2 ## This file is part of the libsigrokdecode project.
4 ## Copyright (C) 2011-2014 Uwe Hermann <uwe@hermann-uwe.de>
6 ## This program is free software; you can redistribute it and/or modify
7 ## it under the terms of the GNU General Public License as published by
8 ## the Free Software Foundation; either version 2 of the License, or
9 ## (at your option) any later version.
11 ## This program is distributed in the hope that it will be useful,
12 ## but WITHOUT ANY WARRANTY; without even the implied warranty of
13 ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 ## GNU General Public License for more details.
16 ## You should have received a copy of the GNU General Public License
17 ## along with this program; if not, see <http://www.gnu.org/licenses/>.
20 import sigrokdecode as srd
21 from math import floor, ceil
27 [<ptype>, <rxtx>, <pdata>]
29 This is the list of <ptype>s and their respective <pdata> values:
30 - 'STARTBIT': The data is the (integer) value of the start bit (0/1).
31 - 'DATA': This is always a tuple containing two items:
32 - 1st item: the (integer) value of the UART data. Valid values
33 range from 0 to 511 (as the data can be up to 9 bits in size).
34 - 2nd item: the list of individual data bits and their ss/es numbers.
35 - 'PARITYBIT': The data is the (integer) value of the parity bit (0/1).
36 - 'STOPBIT': The data is the (integer) value of the stop bit (0 or 1).
37 - 'INVALID STARTBIT': The data is the (integer) value of the start bit (0/1).
38 - 'INVALID STOPBIT': The data is the (integer) value of the stop bit (0/1).
39 - 'PARITY ERROR': The data is a tuple with two entries. The first one is
40 the expected parity value, the second is the actual parity value.
43 The <rxtx> field is 0 for RX packets, 1 for TX packets.
46 # Used for differentiating between the two data directions.
50 # Given a parity type to check (odd, even, zero, one), the value of the
51 # parity bit, the value of the data, and the length of the data (5-9 bits,
52 # usually 8 bits) return True if the parity is correct, False otherwise.
53 # 'none' is _not_ allowed as value for 'parity_type'.
54 def parity_ok(parity_type, parity_bit, data, num_data_bits):
56 # Handle easy cases first (parity bit is always 1 or 0).
57 if parity_type == 'zero':
58 return parity_bit == 0
59 elif parity_type == 'one':
60 return parity_bit == 1
62 # Count number of 1 (high) bits in the data (and the parity bit itself!).
63 ones = bin(data).count('1') + parity_bit
65 # Check for odd/even parity.
66 if parity_type == 'odd':
67 return (ones % 2) == 1
68 elif parity_type == 'even':
69 return (ones % 2) == 0
71 class SamplerateError(Exception):
74 class ChannelError(Exception):
77 class Decoder(srd.Decoder):
81 longname = 'Universal Asynchronous Receiver/Transmitter'
82 desc = 'Asynchronous, serial bus.'
87 # Allow specifying only one of the signals, e.g. if only one data
88 # direction exists (or is relevant).
89 {'id': 'rx', 'name': 'RX', 'desc': 'UART receive line'},
90 {'id': 'tx', 'name': 'TX', 'desc': 'UART transmit line'},
93 {'id': 'baudrate', 'desc': 'Baud rate', 'default': 115200},
94 {'id': 'num_data_bits', 'desc': 'Data bits', 'default': 8,
95 'values': (5, 6, 7, 8, 9)},
96 {'id': 'parity_type', 'desc': 'Parity type', 'default': 'none',
97 'values': ('none', 'odd', 'even', 'zero', 'one')},
98 {'id': 'parity_check', 'desc': 'Check parity?', 'default': 'yes',
99 'values': ('yes', 'no')},
100 {'id': 'num_stop_bits', 'desc': 'Stop bits', 'default': 1.0,
101 'values': (0.0, 0.5, 1.0, 1.5)},
102 {'id': 'bit_order', 'desc': 'Bit order', 'default': 'lsb-first',
103 'values': ('lsb-first', 'msb-first')},
104 {'id': 'format', 'desc': 'Data format', 'default': 'hex',
105 'values': ('ascii', 'dec', 'hex', 'oct', 'bin')},
106 {'id': 'invert_rx', 'desc': 'Invert RX?', 'default': 'no',
107 'values': ('yes', 'no')},
108 {'id': 'invert_tx', 'desc': 'Invert TX?', 'default': 'no',
109 'values': ('yes', 'no')},
112 ('rx-data', 'RX data'),
113 ('tx-data', 'TX data'),
114 ('rx-start', 'RX start bits'),
115 ('tx-start', 'TX start bits'),
116 ('rx-parity-ok', 'RX parity OK bits'),
117 ('tx-parity-ok', 'TX parity OK bits'),
118 ('rx-parity-err', 'RX parity error bits'),
119 ('tx-parity-err', 'TX parity error bits'),
120 ('rx-stop', 'RX stop bits'),
121 ('tx-stop', 'TX stop bits'),
122 ('rx-warnings', 'RX warnings'),
123 ('tx-warnings', 'TX warnings'),
124 ('rx-data-bits', 'RX data bits'),
125 ('tx-data-bits', 'TX data bits'),
128 ('rx-data', 'RX', (0, 2, 4, 6, 8)),
129 ('rx-data-bits', 'RX bits', (12,)),
130 ('rx-warnings', 'RX warnings', (10,)),
131 ('tx-data', 'TX', (1, 3, 5, 7, 9)),
132 ('tx-data-bits', 'TX bits', (13,)),
133 ('tx-warnings', 'TX warnings', (11,)),
138 ('rxtx', 'RX/TX dump'),
140 idle_state = ['WAIT FOR START BIT', 'WAIT FOR START BIT']
142 def putx(self, rxtx, data):
143 s, halfbit = self.startsample[rxtx], self.bit_width / 2.0
144 self.put(s - floor(halfbit), self.samplenum + ceil(halfbit), self.out_ann, data)
146 def putpx(self, rxtx, data):
147 s, halfbit = self.startsample[rxtx], self.bit_width / 2.0
148 self.put(s - floor(halfbit), self.samplenum + ceil(halfbit), self.out_python, data)
150 def putg(self, data):
151 s, halfbit = self.samplenum, self.bit_width / 2.0
152 self.put(s - floor(halfbit), s + ceil(halfbit), self.out_ann, data)
154 def putp(self, data):
155 s, halfbit = self.samplenum, self.bit_width / 2.0
156 self.put(s - floor(halfbit), s + ceil(halfbit), self.out_python, data)
158 def putbin(self, rxtx, data):
159 s, halfbit = self.startsample[rxtx], self.bit_width / 2.0
160 self.put(s - floor(halfbit), self.samplenum + ceil(halfbit), self.out_binary, data)
163 self.samplerate = None
165 self.frame_start = [-1, -1]
166 self.startbit = [-1, -1]
167 self.cur_data_bit = [0, 0]
168 self.datavalue = [0, 0]
169 self.paritybit = [-1, -1]
170 self.stopbit1 = [-1, -1]
171 self.startsample = [-1, -1]
172 self.state = ['WAIT FOR START BIT', 'WAIT FOR START BIT']
173 self.databits = [[], []]
176 self.out_python = self.register(srd.OUTPUT_PYTHON)
177 self.out_binary = self.register(srd.OUTPUT_BINARY)
178 self.out_ann = self.register(srd.OUTPUT_ANN)
179 self.bw = (self.options['num_data_bits'] + 7) // 8
181 def metadata(self, key, value):
182 if key == srd.SRD_CONF_SAMPLERATE:
183 self.samplerate = value
184 # The width of one UART bit in number of samples.
185 self.bit_width = float(self.samplerate) / float(self.options['baudrate'])
187 def get_sample_point(self, rxtx, bitnum):
188 """Determine absolute sample number of a bit slot's sample point."""
189 # bitpos is the samplenumber which is in the middle of the
190 # specified UART bit (0 = start bit, 1..x = data, x+1 = parity bit
191 # (if used) or the first stop bit, and so on).
192 # The samples within bit are 0, 1, ..., (bit_width - 1), therefore
193 # index of the middle sample within bit window is (bit_width - 1) / 2.
194 bitpos = self.frame_start[rxtx] + (self.bit_width - 1) / 2.0
195 bitpos += bitnum * self.bit_width
198 # Return true if we reached the middle of the desired bit, false otherwise.
199 def reached_bit(self, rxtx, bitnum):
200 bitpos = self.get_sample_point(rxtx, bitnum)
201 if self.samplenum >= bitpos:
205 def wait_for_start_bit(self, rxtx, signal):
206 # The caller already has detected an edge. Strictly speaking this
207 # check on the current signal level is redundant. But it does not
212 # Save the sample number where the start bit begins.
213 self.frame_start[rxtx] = self.samplenum
215 self.state[rxtx] = 'GET START BIT'
217 def get_start_bit(self, rxtx, signal):
218 # Skip samples until we're in the middle of the start bit.
219 if not self.reached_bit(rxtx, 0):
222 self.startbit[rxtx] = signal
224 # The startbit must be 0. If not, we report an error and wait
225 # for the next start bit (assuming this one was spurious).
226 if self.startbit[rxtx] != 0:
227 self.putp(['INVALID STARTBIT', rxtx, self.startbit[rxtx]])
228 self.putg([rxtx + 10, ['Frame error', 'Frame err', 'FE']])
229 self.state[rxtx] = 'WAIT FOR START BIT'
232 self.cur_data_bit[rxtx] = 0
233 self.datavalue[rxtx] = 0
234 self.startsample[rxtx] = -1
236 self.state[rxtx] = 'GET DATA BITS'
238 self.putp(['STARTBIT', rxtx, self.startbit[rxtx]])
239 self.putg([rxtx + 2, ['Start bit', 'Start', 'S']])
241 def get_data_bits(self, rxtx, signal):
242 # Skip samples until we're in the middle of the desired data bit.
243 if not self.reached_bit(rxtx, 1 + self.cur_data_bit[rxtx]):
246 # Save the sample number of the middle of the first data bit.
247 if self.startsample[rxtx] == -1:
248 self.startsample[rxtx] = self.samplenum
250 # Get the next data bit in LSB-first or MSB-first fashion.
251 if self.options['bit_order'] == 'lsb-first':
252 self.datavalue[rxtx] >>= 1
253 self.datavalue[rxtx] |= \
254 (signal << (self.options['num_data_bits'] - 1))
256 self.datavalue[rxtx] <<= 1
257 self.datavalue[rxtx] |= (signal << 0)
259 self.putg([rxtx + 12, ['%d' % signal]])
261 # Store individual data bits and their start/end samplenumbers.
262 s, halfbit = self.samplenum, int(self.bit_width / 2)
263 self.databits[rxtx].append([signal, s - halfbit, s + halfbit])
265 # Return here, unless we already received all data bits.
266 self.cur_data_bit[rxtx] += 1
267 if self.cur_data_bit[rxtx] < self.options['num_data_bits']:
270 # Skip to either reception of the parity bit, or reception of
271 # the STOP bits if parity is not applicable.
272 self.state[rxtx] = 'GET PARITY BIT'
273 if self.options['parity_type'] == 'none':
274 self.state[rxtx] = 'GET STOP BITS'
276 self.putpx(rxtx, ['DATA', rxtx,
277 (self.datavalue[rxtx], self.databits[rxtx])])
279 b = self.datavalue[rxtx]
280 formatted = self.format_value(b)
281 if formatted is not None:
282 self.putx(rxtx, [rxtx, [formatted]])
284 bdata = b.to_bytes(self.bw, byteorder='big')
285 self.putbin(rxtx, [rxtx, bdata])
286 self.putbin(rxtx, [2, bdata])
288 self.databits[rxtx] = []
290 def format_value(self, v):
291 # Format value 'v' according to configured options.
292 # Reflects the user selected kind of representation, as well as
293 # the number of data bits in the UART frames.
295 fmt, bits = self.options['format'], self.options['num_data_bits']
297 # Assume "is printable" for values from 32 to including 126,
298 # below 32 is "control" and thus not printable, above 127 is
299 # "not ASCII" in its strict sense, 127 (DEL) is not printable,
300 # fall back to hex representation for non-printables.
302 if v in range(32, 126 + 1):
304 hexfmt = "[{:02X}]" if bits <= 8 else "[{:03X}]"
305 return hexfmt.format(v)
307 # Mere number to text conversion without prefix and padding
308 # for the "decimal" output format.
310 return "{:d}".format(v)
312 # Padding with leading zeroes for hex/oct/bin formats, but
313 # without a prefix for density -- since the format is user
314 # specified, there is no ambiguity.
316 digits = (bits + 4 - 1) // 4
319 digits = (bits + 3 - 1) // 3
326 if fmtchar is not None:
327 fmt = "{{:0{:d}{:s}}}".format(digits, fmtchar)
332 def get_parity_bit(self, rxtx, signal):
333 # Skip samples until we're in the middle of the parity bit.
334 if not self.reached_bit(rxtx, 1 + self.options['num_data_bits']):
337 self.paritybit[rxtx] = signal
339 self.state[rxtx] = 'GET STOP BITS'
341 if parity_ok(self.options['parity_type'], self.paritybit[rxtx],
342 self.datavalue[rxtx], self.options['num_data_bits']):
343 self.putp(['PARITYBIT', rxtx, self.paritybit[rxtx]])
344 self.putg([rxtx + 4, ['Parity bit', 'Parity', 'P']])
346 # TODO: Return expected/actual parity values.
347 self.putp(['PARITY ERROR', rxtx, (0, 1)]) # FIXME: Dummy tuple...
348 self.putg([rxtx + 6, ['Parity error', 'Parity err', 'PE']])
350 # TODO: Currently only supports 1 stop bit.
351 def get_stop_bits(self, rxtx, signal):
352 # Skip samples until we're in the middle of the stop bit(s).
353 skip_parity = 0 if self.options['parity_type'] == 'none' else 1
354 b = 1 + self.options['num_data_bits'] + skip_parity
355 if not self.reached_bit(rxtx, b):
358 self.stopbit1[rxtx] = signal
360 # Stop bits must be 1. If not, we report an error.
361 if self.stopbit1[rxtx] != 1:
362 self.putp(['INVALID STOPBIT', rxtx, self.stopbit1[rxtx]])
363 self.putg([rxtx + 10, ['Frame error', 'Frame err', 'FE']])
364 # TODO: Abort? Ignore the frame? Other?
366 self.state[rxtx] = 'WAIT FOR START BIT'
368 self.putp(['STOPBIT', rxtx, self.stopbit1[rxtx]])
369 self.putg([rxtx + 4, ['Stop bit', 'Stop', 'T']])
371 def get_wait_cond(self, rxtx, inv):
373 Determine Decoder.wait() condition for specified UART line.
375 Returns condititions that are suitable for Decoder.wait(). Those
376 conditions either match the falling edge of the START bit, or
377 the sample point of the next bit time.
380 state = self.state[rxtx]
381 if state == 'WAIT FOR START BIT':
382 return {rxtx: 'r' if inv else 'f'}
383 if state == 'GET START BIT':
385 elif state == 'GET DATA BITS':
386 bitnum = 1 + self.cur_data_bit[rxtx]
387 elif state == 'GET PARITY BIT':
388 bitnum = 1 + self.options['num_data_bits']
389 elif state == 'GET STOP BITS':
390 bitnum = 1 + self.options['num_data_bits']
391 bitnum += 0 if self.options['parity_type'] == 'none' else 1
392 want_num = self.get_sample_point(rxtx, bitnum)
393 # want_num = int(want_num + 0.5)
394 want_num = ceil(want_num)
395 cond = {'skip': want_num - self.samplenum}
398 def inspect_sample(self, rxtx, signal, inv):
399 """Inspect a sample returned by .wait() for the specified UART line."""
404 state = self.state[rxtx]
405 if state == 'WAIT FOR START BIT':
406 self.wait_for_start_bit(rxtx, signal)
407 elif state == 'GET START BIT':
408 self.get_start_bit(rxtx, signal)
409 elif state == 'GET DATA BITS':
410 self.get_data_bits(rxtx, signal)
411 elif state == 'GET PARITY BIT':
412 self.get_parity_bit(rxtx, signal)
413 elif state == 'GET STOP BITS':
414 self.get_stop_bits(rxtx, signal)
417 if not self.samplerate:
418 raise SamplerateError('Cannot decode without samplerate.')
420 has_pin = [self.has_channel(ch) for ch in (RX, TX)]
421 if has_pin == [False, False]:
422 raise ChannelError('Either TX or RX (or both) pins required.')
425 inv = [opt['invert_rx'] == 'yes', opt['invert_tx'] == 'yes']
430 conds.append(self.get_wait_cond(RX, inv[RX]))
432 conds.append(self.get_wait_cond(TX, inv[TX]))
433 (rx, tx) = self.wait(conds)
435 self.inspect_sample(RX, rx, inv[RX])
437 self.inspect_sample(TX, tx, inv[TX])