2 ## This file is part of the libsigrokdecode project.
4 ## Copyright (C) 2011-2014 Uwe Hermann <uwe@hermann-uwe.de>
6 ## This program is free software; you can redistribute it and/or modify
7 ## it under the terms of the GNU General Public License as published by
8 ## the Free Software Foundation; either version 2 of the License, or
9 ## (at your option) any later version.
11 ## This program is distributed in the hope that it will be useful,
12 ## but WITHOUT ANY WARRANTY; without even the implied warranty of
13 ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 ## GNU General Public License for more details.
16 ## You should have received a copy of the GNU General Public License
17 ## along with this program; if not, see <http://www.gnu.org/licenses/>.
20 import sigrokdecode as srd
21 from math import floor, ceil
27 [<ptype>, <rxtx>, <pdata>]
29 This is the list of <ptype>s and their respective <pdata> values:
30 - 'STARTBIT': The data is the (integer) value of the start bit (0/1).
31 - 'DATA': This is always a tuple containing two items:
32 - 1st item: the (integer) value of the UART data. Valid values
33 range from 0 to 511 (as the data can be up to 9 bits in size).
34 - 2nd item: the list of individual data bits and their ss/es numbers.
35 - 'PARITYBIT': The data is the (integer) value of the parity bit (0/1).
36 - 'STOPBIT': The data is the (integer) value of the stop bit (0 or 1).
37 - 'INVALID STARTBIT': The data is the (integer) value of the start bit (0/1).
38 - 'INVALID STOPBIT': The data is the (integer) value of the stop bit (0/1).
39 - 'PARITY ERROR': The data is a tuple with two entries. The first one is
40 the expected parity value, the second is the actual parity value.
43 The <rxtx> field is 0 for RX packets, 1 for TX packets.
46 # Used for differentiating between the two data directions.
50 # Given a parity type to check (odd, even, zero, one), the value of the
51 # parity bit, the value of the data, and the length of the data (5-9 bits,
52 # usually 8 bits) return True if the parity is correct, False otherwise.
53 # 'none' is _not_ allowed as value for 'parity_type'.
def parity_ok(parity_type, parity_bit, data, num_data_bits):
    """Check a received parity bit against the data bits of a frame.

    parity_type is one of 'odd', 'even', 'zero' or 'one' ('none' is not
    a valid value here). parity_bit is the received parity bit (0/1),
    data is the received data value. num_data_bits is accepted for
    interface compatibility but is not needed for the check.

    Returns True if the parity is correct, False if it is not, and None
    for any parity type which is not handled.
    """
    # Fixed-value parity ("stick" parity) only depends on the parity bit.
    if parity_type == 'zero':
        return parity_bit == 0
    if parity_type == 'one':
        return parity_bit == 1

    # Number of high bits in the data, including the parity bit itself.
    high_bits = bin(data).count('1') + parity_bit
    remainder = high_bits % 2

    # Odd parity wants an odd total, even parity wants an even total.
    if parity_type == 'odd':
        return remainder == 1
    if parity_type == 'even':
        return remainder == 0
    return None
class SamplerateError(Exception):
    """Raised when decoding is attempted without a known samplerate."""
class ChannelError(Exception):
    """Raised when neither the RX nor the TX channel is available."""
77 class Decoder(srd.Decoder):
81 longname = 'Universal Asynchronous Receiver/Transmitter'
82 desc = 'Asynchronous, serial bus.'
87 # Allow specifying only one of the signals, e.g. if only one data
88 # direction exists (or is relevant).
89 {'id': 'rx', 'name': 'RX', 'desc': 'UART receive line'},
90 {'id': 'tx', 'name': 'TX', 'desc': 'UART transmit line'},
93 {'id': 'baudrate', 'desc': 'Baud rate', 'default': 115200},
94 {'id': 'num_data_bits', 'desc': 'Data bits', 'default': 8,
95 'values': (5, 6, 7, 8, 9)},
96 {'id': 'parity_type', 'desc': 'Parity type', 'default': 'none',
97 'values': ('none', 'odd', 'even', 'zero', 'one')},
98 {'id': 'parity_check', 'desc': 'Check parity?', 'default': 'yes',
99 'values': ('yes', 'no')},
100 {'id': 'num_stop_bits', 'desc': 'Stop bits', 'default': 1.0,
101 'values': (0.0, 0.5, 1.0, 1.5)},
102 {'id': 'bit_order', 'desc': 'Bit order', 'default': 'lsb-first',
103 'values': ('lsb-first', 'msb-first')},
104 {'id': 'format', 'desc': 'Data format', 'default': 'hex',
105 'values': ('ascii', 'dec', 'hex', 'oct', 'bin')},
106 {'id': 'invert_rx', 'desc': 'Invert RX?', 'default': 'no',
107 'values': ('yes', 'no')},
108 {'id': 'invert_tx', 'desc': 'Invert TX?', 'default': 'no',
109 'values': ('yes', 'no')},
112 ('rx-data', 'RX data'),
113 ('tx-data', 'TX data'),
114 ('rx-start', 'RX start bits'),
115 ('tx-start', 'TX start bits'),
116 ('rx-parity-ok', 'RX parity OK bits'),
117 ('tx-parity-ok', 'TX parity OK bits'),
118 ('rx-parity-err', 'RX parity error bits'),
119 ('tx-parity-err', 'TX parity error bits'),
120 ('rx-stop', 'RX stop bits'),
121 ('tx-stop', 'TX stop bits'),
122 ('rx-warnings', 'RX warnings'),
123 ('tx-warnings', 'TX warnings'),
124 ('rx-data-bits', 'RX data bits'),
125 ('tx-data-bits', 'TX data bits'),
128 ('rx-data', 'RX', (0, 2, 4, 6, 8)),
129 ('rx-data-bits', 'RX bits', (12,)),
130 ('rx-warnings', 'RX warnings', (10,)),
131 ('tx-data', 'TX', (1, 3, 5, 7, 9)),
132 ('tx-data-bits', 'TX bits', (13,)),
133 ('tx-warnings', 'TX warnings', (11,)),
138 ('rxtx', 'RX/TX dump'),
140 idle_state = ['WAIT FOR START BIT', 'WAIT FOR START BIT']
def putx(self, rxtx, data):
    """Emit an annotation for the given UART line.

    The annotation starts half a bit width before the start sample
    recorded for line rxtx and ends half a bit width after the current
    sample number.
    """
    first = self.startsample[rxtx]
    half = self.bit_width / 2.0
    # Round the half bit down on the left and up on the right edge.
    left_edge = first - floor(half)
    right_edge = self.samplenum + ceil(half)
    self.put(left_edge, right_edge, self.out_ann, data)
def putpx(self, rxtx, data):
    """Emit a Python output packet for the given UART line.

    The packet starts half a bit width before the start sample
    recorded for line rxtx and ends half a bit width after the current
    sample number.
    """
    first = self.startsample[rxtx]
    half = self.bit_width / 2.0
    # Round the half bit down on the left and up on the right edge.
    left_edge = first - floor(half)
    right_edge = self.samplenum + ceil(half)
    self.put(left_edge, right_edge, self.out_python, data)
def putg(self, data):
    """Emit an annotation centered on the current sample.

    The annotation extends half a bit width to either side of the
    current sample number.
    """
    center = self.samplenum
    half = self.bit_width / 2.0
    # Round the half bit down on the left and up on the right edge.
    left_edge = center - floor(half)
    right_edge = center + ceil(half)
    self.put(left_edge, right_edge, self.out_ann, data)
def putp(self, data):
    """Emit a Python output packet centered on the current sample.

    The packet extends half a bit width to either side of the current
    sample number.
    """
    center = self.samplenum
    half = self.bit_width / 2.0
    # Round the half bit down on the left and up on the right edge.
    left_edge = center - floor(half)
    right_edge = center + ceil(half)
    self.put(left_edge, right_edge, self.out_python, data)
def putbin(self, rxtx, data):
    """Emit a binary output packet for the given UART line.

    The packet starts half a bit width before the start sample
    recorded for line rxtx and ends half a bit width after the current
    sample number.
    """
    first = self.startsample[rxtx]
    half = self.bit_width / 2.0
    # Round the half bit down on the left and up on the right edge.
    left_edge = first - floor(half)
    right_edge = self.samplenum + ceil(half)
    self.put(left_edge, right_edge, self.out_binary, data)
163 self.samplerate = None
165 self.frame_start = [-1, -1]
166 self.startbit = [-1, -1]
167 self.cur_data_bit = [0, 0]
168 self.datavalue = [0, 0]
169 self.paritybit = [-1, -1]
170 self.stopbit1 = [-1, -1]
171 self.startsample = [-1, -1]
172 self.state = ['WAIT FOR START BIT', 'WAIT FOR START BIT']
173 self.databits = [[], []]
176 self.out_python = self.register(srd.OUTPUT_PYTHON)
177 self.out_binary = self.register(srd.OUTPUT_BINARY)
178 self.out_ann = self.register(srd.OUTPUT_ANN)
179 self.bw = (self.options['num_data_bits'] + 7) // 8
def metadata(self, key, value):
    """Handle a metadata item; only the samplerate is of interest.

    When the samplerate is received it is stored, and the width of one
    UART bit is derived from it and the configured baud rate.
    """
    if key != srd.SRD_CONF_SAMPLERATE:
        return
    self.samplerate = value
    # The width of one UART bit in number of samples.
    samples_per_second = float(self.samplerate)
    bits_per_second = float(self.options['baudrate'])
    self.bit_width = samples_per_second / bits_per_second
def get_sample_point(self, rxtx, bitnum):
    """Determine absolute sample number of a bit slot's sample point.

    rxtx selects the UART line, bitnum is the bit slot within the frame
    (0 = start bit, 1..x = data, x+1 = parity bit (if used) or the
    first stop bit, and so on).

    Returns the (possibly fractional) sample number which is in the
    middle of the specified bit, relative to the start of the capture.
    """
    # The samples within bit are 0, 1, ..., (bit_width - 1), therefore
    # index of the middle sample within bit window is (bit_width - 1) / 2.
    bitpos = self.frame_start[rxtx] + (self.bit_width - 1) / 2.0
    bitpos += bitnum * self.bit_width
    # Callers round this value and derive a skip count from it, so the
    # position must be handed back.
    return bitpos
def wait_for_start_bit(self, rxtx, signal):
    """Record the start of a frame on the given UART line.

    The current sample number is remembered as the frame start, and
    the line advances to the 'GET START BIT' state. The signal value
    is not inspected here.
    """
    # The sample number where the start bit begins.
    edge_sample = self.samplenum
    self.frame_start[rxtx] = edge_sample

    self.state[rxtx] = 'GET START BIT'
def get_start_bit(self, rxtx, signal):
    """Process the sample taken in the middle of the start bit.

    A valid start bit (0) resets the per-frame data state, emits the
    'STARTBIT' packet and annotation, and advances the line to the
    'GET DATA BITS' state. An invalid start bit emits 'INVALID STARTBIT'
    plus a frame error warning, and returns the line to the
    'WAIT FOR START BIT' state without emitting anything else.
    """
    self.startbit[rxtx] = signal

    # The startbit must be 0. If not, we report an error and wait
    # for the next start bit (assuming this one was spurious).
    if self.startbit[rxtx] != 0:
        self.putp(['INVALID STARTBIT', rxtx, self.startbit[rxtx]])
        self.putg([rxtx + 10, ['Frame error', 'Frame err', 'FE']])
        self.state[rxtx] = 'WAIT FOR START BIT'
        # Do not fall through into the handling of a valid start bit.
        return

    # Start a fresh frame: no data bits received yet.
    self.cur_data_bit[rxtx] = 0
    self.datavalue[rxtx] = 0
    self.startsample[rxtx] = -1

    self.putp(['STARTBIT', rxtx, self.startbit[rxtx]])
    self.putg([rxtx + 2, ['Start bit', 'Start', 'S']])

    self.state[rxtx] = 'GET DATA BITS'
def get_data_bits(self, rxtx, signal):
    """Process the sample taken in the middle of a data bit.

    The bit is merged into the data value of line rxtx according to the
    configured bit order, and annotated individually. Once the last
    data bit of the frame was received, the 'DATA' packet, the formatted
    value annotation and the binary output are emitted, and the line
    advances to 'GET PARITY BIT' (or 'GET STOP BITS' when no parity is
    in use).
    """
    # Save the sample number of the middle of the first data bit.
    if self.startsample[rxtx] == -1:
        self.startsample[rxtx] = self.samplenum

    num_data_bits = self.options['num_data_bits']

    # Get the next data bit in LSB-first or MSB-first fashion.
    if self.options['bit_order'] == 'lsb-first':
        # New bits enter at the top and are shifted down towards bit 0.
        self.datavalue[rxtx] >>= 1
        self.datavalue[rxtx] |= signal << (num_data_bits - 1)
    else:
        # New bits enter at bit 0 and are shifted up.
        self.datavalue[rxtx] <<= 1
        self.datavalue[rxtx] |= signal << 0

    self.putg([rxtx + 12, ['%d' % signal]])

    # Store individual data bits and their start/end samplenumbers.
    s, halfbit = self.samplenum, int(self.bit_width / 2)
    self.databits[rxtx].append([signal, s - halfbit, s + halfbit])

    # Return here, unless we already received all data bits.
    self.cur_data_bit[rxtx] += 1
    if self.cur_data_bit[rxtx] < num_data_bits:
        return

    self.putpx(rxtx, ['DATA', rxtx,
        (self.datavalue[rxtx], self.databits[rxtx])])

    b = self.datavalue[rxtx]
    formatted = self.format_value(b)
    if formatted is not None:
        self.putx(rxtx, [rxtx, [formatted]])

    # Binary output: once for this direction, once for the combined dump.
    bdata = b.to_bytes(self.bw, byteorder='big')
    self.putbin(rxtx, [rxtx, bdata])
    self.putbin(rxtx, [2, bdata])

    self.databits[rxtx] = []

    # Advance to either reception of the parity bit, or reception of
    # the STOP bits if parity is not applicable.
    self.state[rxtx] = 'GET PARITY BIT'
    if self.options['parity_type'] == 'none':
        self.state[rxtx] = 'GET STOP BITS'
def format_value(self, v):
    """Format value 'v' according to configured options.

    Reflects the user selected kind of representation ('format'
    option), as well as the number of data bits in the UART frames
    ('num_data_bits' option).

    Returns the text for the value, or None when the configured format
    is not one of 'ascii', 'dec', 'hex', 'oct' or 'bin'.
    """
    fmt, bits = self.options['format'], self.options['num_data_bits']

    # Assume "is printable" for values from 32 to including 126,
    # below 32 is "control" and thus not printable, above 127 is
    # "not ASCII" in its strict sense, 127 (DEL) is not printable,
    # fall back to hex representation for non-printables.
    if fmt == 'ascii':
        if v in range(32, 126 + 1):
            return chr(v)
        hexfmt = "[{:02X}]" if bits <= 8 else "[{:03X}]"
        return hexfmt.format(v)

    # Mere number to text conversion without prefix and padding
    # for the "decimal" output format.
    if fmt == 'dec':
        return "{:d}".format(v)

    # Padding with leading zeroes for hex/oct/bin formats, but
    # without a prefix for density -- since the format is user
    # specified, there is no ambiguity.
    if fmt == 'hex':
        # One hex digit covers four bits, round up.
        digits = (bits + 4 - 1) // 4
        fmtchar = "X"
    elif fmt == 'oct':
        # One octal digit covers three bits, round up.
        digits = (bits + 3 - 1) // 3
        fmtchar = "o"
    elif fmt == 'bin':
        digits = bits
        fmtchar = "b"
    else:
        fmtchar = None
    if fmtchar is not None:
        fmt = "{{:0{:d}{:s}}}".format(digits, fmtchar)
        return fmt.format(v)

    return None
def get_parity_bit(self, rxtx, signal):
    """Process the sample taken in the middle of the parity bit.

    Emits either the 'PARITYBIT' packet and "parity OK" annotation, or
    the 'PARITY ERROR' packet and "parity error" annotation, never
    both. The 'PARITY ERROR' packet carries a tuple of the expected and
    the actual parity bit value. Afterwards the line advances to the
    'GET STOP BITS' state.
    """
    self.paritybit[rxtx] = signal
    actual = self.paritybit[rxtx]

    if parity_ok(self.options['parity_type'], actual,
                 self.datavalue[rxtx], self.options['num_data_bits']):
        self.putp(['PARITYBIT', rxtx, actual])
        self.putg([rxtx + 4, ['Parity bit', 'Parity', 'P']])
    else:
        # The parity bit is a single bit, so when the received value
        # failed the check, the expected value is its complement.
        expected = 1 - actual
        self.putp(['PARITY ERROR', rxtx, (expected, actual)])
        self.putg([rxtx + 6, ['Parity error', 'Parity err', 'PE']])

    self.state[rxtx] = 'GET STOP BITS'
325 # TODO: Currently only supports 1 stop bit.
def get_stop_bits(self, rxtx, signal):
    """Process the sample taken in the middle of the (first) stop bit.

    A valid stop bit (1) emits the 'STOPBIT' packet and a stop bit
    annotation. An invalid stop bit emits 'INVALID STOPBIT' and a frame
    error warning instead. In both cases the line returns to the
    'WAIT FOR START BIT' state.
    """
    self.stopbit1[rxtx] = signal

    # Stop bits must be 1. If not, we report an error.
    if self.stopbit1[rxtx] != 1:
        self.putp(['INVALID STOPBIT', rxtx, self.stopbit1[rxtx]])
        self.putg([rxtx + 10, ['Frame error', 'Frame err', 'FE']])
        # TODO: Abort? Ignore the frame? Other?
    else:
        self.putp(['STOPBIT', rxtx, self.stopbit1[rxtx]])
        # Annotation classes 8/9 are the RX/TX stop bits (4/5 are the
        # "parity OK" classes and must not be used here).
        self.putg([rxtx + 8, ['Stop bit', 'Stop', 'T']])

    self.state[rxtx] = 'WAIT FOR START BIT'
def get_wait_cond(self, rxtx, inv):
    """
    Determine Decoder.wait() condition for specified UART line.

    Returns conditions that are suitable for Decoder.wait(). Those
    conditions either match the falling edge of the START bit (the
    rising edge when the line is inverted), or the sample point of the
    next bit time, expressed as a number of samples to skip.
    """
    state = self.state[rxtx]
    if state == 'WAIT FOR START BIT':
        return {rxtx: 'r' if inv else 'f'}

    # Determine the bit slot within the frame which is sampled next.
    if state == 'GET START BIT':
        bitnum = 0
    elif state == 'GET DATA BITS':
        bitnum = 1 + self.cur_data_bit[rxtx]
    elif state == 'GET PARITY BIT':
        bitnum = 1 + self.options['num_data_bits']
    elif state == 'GET STOP BITS':
        bitnum = 1 + self.options['num_data_bits']
        bitnum += 0 if self.options['parity_type'] == 'none' else 1

    # Round the (fractional) sample point up to a whole sample number,
    # and express it relative to the current position.
    want_num = ceil(self.get_sample_point(rxtx, bitnum))
    cond = {'skip': want_num - self.samplenum}
    return cond
def inspect_sample(self, rxtx, signal, inv):
    """Inspect a sample returned by .wait() for the specified UART line.

    rxtx selects the UART line, signal is the sampled pin value, and
    inv tells whether the line is inverted. The (polarity corrected)
    sample is passed to the handler of the line's current state. A
    state without a handler is ignored.
    """
    # Undo the inversion of the line, so that all state handlers see
    # regular UART polarity. Keep the value an integer (0/1).
    if inv:
        signal = int(not signal)

    handlers = {
        'WAIT FOR START BIT': self.wait_for_start_bit,
        'GET START BIT': self.get_start_bit,
        'GET DATA BITS': self.get_data_bits,
        'GET PARITY BIT': self.get_parity_bit,
        'GET STOP BITS': self.get_stop_bits,
    }
    handler = handlers.get(self.state[rxtx])
    if handler is not None:
        handler(rxtx, signal)
386 if not self.samplerate:
387 raise SamplerateError('Cannot decode without samplerate.')
389 has_pin = [self.has_channel(ch) for ch in (RX, TX)]
390 if has_pin == [False, False]:
391 raise ChannelError('Either TX or RX (or both) pins required.')
394 inv = [opt['invert_rx'] == 'yes', opt['invert_tx'] == 'yes']
395 cond_idx = [None] * len(has_pin)
400 cond_idx[RX] = len(conds)
401 conds.append(self.get_wait_cond(RX, inv[RX]))
403 cond_idx[TX] = len(conds)
404 conds.append(self.get_wait_cond(TX, inv[TX]))
405 (rx, tx) = self.wait(conds)
406 if cond_idx[RX] is not None and self.matched[cond_idx[RX]]:
407 self.inspect_sample(RX, rx, inv[RX])
408 if cond_idx[TX] is not None and self.matched[cond_idx[TX]]:
409 self.inspect_sample(TX, tx, inv[TX])