# Extracted from the sigrok.org gitweb view of libsigrokdecode.git,
# file decoders/uart/pd.py (commit "uart: Convert to PD API version 3").
##
## This file is part of the libsigrokdecode project.
##
## Copyright (C) 2011-2014 Uwe Hermann <uwe@hermann-uwe.de>
##
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 2 of the License, or
## (at your option) any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with this program; if not, see <http://www.gnu.org/licenses/>.
##
19
import sigrokdecode as srd
from math import floor, ceil
22
'''
OUTPUT_PYTHON format:

Packet:
[<ptype>, <rxtx>, <pdata>]

This is the list of <ptype>s and their respective <pdata> values:
 - 'STARTBIT': The data is the (integer) value of the start bit (0/1).
 - 'DATA': This is always a tuple containing two items:
   - 1st item: the (integer) value of the UART data. Valid values
     range from 0 to 511 (as the data can be up to 9 bits in size).
   - 2nd item: the list of individual data bits and their ss/es numbers.
 - 'PARITYBIT': The data is the (integer) value of the parity bit (0/1).
 - 'STOPBIT': The data is the (integer) value of the stop bit (0 or 1).
 - 'INVALID STARTBIT': The data is the (integer) value of the start bit (0/1).
 - 'INVALID STOPBIT': The data is the (integer) value of the stop bit (0/1).
 - 'PARITY ERROR': The data is a tuple with two entries. The first one is
   the expected parity value, the second is the actual parity value.
 - TODO: Frame error?

The <rxtx> field is 0 for RX packets, 1 for TX packets.
'''

# Used for differentiating between the two data directions. The values
# double as indices into the decoder's per-direction state lists and as
# offsets into the annotation class table.
RX = 0
TX = 1
49
def parity_ok(parity_type, parity_bit, data, num_data_bits):
    """Check a received parity bit against the received data value.

    Parameters:
     - parity_type: One of 'odd', 'even', 'zero', 'one'. The value 'none'
       is _not_ allowed here.
     - parity_bit: The value of the received parity bit (0/1).
     - data: The (integer) value of the received data bits.
     - num_data_bits: The length of the data (5-9 bits, usually 8 bits).
       Not needed for the computation, kept for interface compatibility.

    Returns True if the parity is correct, False otherwise (this includes
    unsupported 'parity_type' values).
    """

    # Handle easy cases first (parity bit is always 1 or 0).
    if parity_type == 'zero':
        return parity_bit == 0
    if parity_type == 'one':
        return parity_bit == 1

    # Count number of 1 (high) bits in the data (and the parity bit itself!).
    ones = bin(data).count('1') + parity_bit

    # Check for odd/even parity.
    if parity_type == 'odd':
        return (ones % 2) == 1
    if parity_type == 'even':
        return (ones % 2) == 0

    # Unsupported parity type: never report the parity as correct.
    return False
70
class SamplerateError(Exception):
    """Raised by Decoder.decode() when no samplerate is known."""
73
class ChannelError(Exception):
    """Raised by Decoder.decode() when neither RX nor TX is connected."""
76
class Decoder(srd.Decoder):
    """UART protocol decoder (PD API version 3).

    Decodes up to two independent UART lines (RX and TX). Each line has
    its own state machine; all per-line state is kept in two-element
    lists that are indexed by the RX/TX constants.
    """

    api_version = 3
    id = 'uart'
    name = 'UART'
    longname = 'Universal Asynchronous Receiver/Transmitter'
    desc = 'Asynchronous, serial bus.'
    license = 'gplv2+'
    inputs = ['logic']
    outputs = ['uart']
    optional_channels = (
        # Allow specifying only one of the signals, e.g. if only one data
        # direction exists (or is relevant).
        {'id': 'rx', 'name': 'RX', 'desc': 'UART receive line'},
        {'id': 'tx', 'name': 'TX', 'desc': 'UART transmit line'},
    )
    options = (
        {'id': 'baudrate', 'desc': 'Baud rate', 'default': 115200},
        {'id': 'num_data_bits', 'desc': 'Data bits', 'default': 8,
            'values': (5, 6, 7, 8, 9)},
        {'id': 'parity_type', 'desc': 'Parity type', 'default': 'none',
            'values': ('none', 'odd', 'even', 'zero', 'one')},
        {'id': 'parity_check', 'desc': 'Check parity?', 'default': 'yes',
            'values': ('yes', 'no')},
        {'id': 'num_stop_bits', 'desc': 'Stop bits', 'default': 1.0,
            'values': (0.0, 0.5, 1.0, 1.5)},
        {'id': 'bit_order', 'desc': 'Bit order', 'default': 'lsb-first',
            'values': ('lsb-first', 'msb-first')},
        {'id': 'format', 'desc': 'Data format', 'default': 'hex',
            'values': ('ascii', 'dec', 'hex', 'oct', 'bin')},
        {'id': 'invert_rx', 'desc': 'Invert RX?', 'default': 'no',
            'values': ('yes', 'no')},
        {'id': 'invert_tx', 'desc': 'Invert TX?', 'default': 'no',
            'values': ('yes', 'no')},
    )
    # Annotation classes come in RX/TX pairs, so the class index for a
    # given direction is always "<RX index> + rxtx".
    annotations = (
        ('rx-data', 'RX data'),
        ('tx-data', 'TX data'),
        ('rx-start', 'RX start bits'),
        ('tx-start', 'TX start bits'),
        ('rx-parity-ok', 'RX parity OK bits'),
        ('tx-parity-ok', 'TX parity OK bits'),
        ('rx-parity-err', 'RX parity error bits'),
        ('tx-parity-err', 'TX parity error bits'),
        ('rx-stop', 'RX stop bits'),
        ('tx-stop', 'TX stop bits'),
        ('rx-warnings', 'RX warnings'),
        ('tx-warnings', 'TX warnings'),
        ('rx-data-bits', 'RX data bits'),
        ('tx-data-bits', 'TX data bits'),
    )
    annotation_rows = (
        ('rx-data', 'RX', (0, 2, 4, 6, 8)),
        ('rx-data-bits', 'RX bits', (12,)),
        ('rx-warnings', 'RX warnings', (10,)),
        ('tx-data', 'TX', (1, 3, 5, 7, 9)),
        ('tx-data-bits', 'TX bits', (13,)),
        ('tx-warnings', 'TX warnings', (11,)),
    )
    binary = (
        ('rx', 'RX dump'),
        ('tx', 'TX dump'),
        ('rxtx', 'RX/TX dump'),
    )
    idle_state = ['WAIT FOR START BIT', 'WAIT FOR START BIT']

    def putx(self, rxtx, data):
        """Emit an annotation that spans all data bits of the frame.

        The span starts half a bit time before the sample point of the
        first data bit and ends half a bit time after the current sample.
        """
        s, halfbit = self.startsample[rxtx], self.bit_width / 2.0
        self.put(s - floor(halfbit), self.samplenum + ceil(halfbit),
                 self.out_ann, data)

    def putpx(self, rxtx, data):
        """Emit an OUTPUT_PYTHON packet that spans all data bits."""
        s, halfbit = self.startsample[rxtx], self.bit_width / 2.0
        self.put(s - floor(halfbit), self.samplenum + ceil(halfbit),
                 self.out_python, data)

    def putg(self, data):
        """Emit an annotation one bit time wide around the current sample."""
        s, halfbit = self.samplenum, self.bit_width / 2.0
        self.put(s - floor(halfbit), s + ceil(halfbit), self.out_ann, data)

    def putp(self, data):
        """Emit an OUTPUT_PYTHON packet one bit time wide around the
        current sample."""
        s, halfbit = self.samplenum, self.bit_width / 2.0
        self.put(s - floor(halfbit), s + ceil(halfbit), self.out_python, data)

    def putbin(self, rxtx, data):
        """Emit an OUTPUT_BINARY chunk that spans all data bits."""
        s, halfbit = self.startsample[rxtx], self.bit_width / 2.0
        self.put(s - floor(halfbit), self.samplenum + ceil(halfbit),
                 self.out_binary, data)

    def __init__(self):
        """Set up the per-direction decoder state ([RX, TX] pairs)."""
        self.samplerate = None
        self.samplenum = 0
        self.frame_start = [-1, -1]
        self.startbit = [-1, -1]
        self.cur_data_bit = [0, 0]
        self.datavalue = [0, 0]
        self.paritybit = [-1, -1]
        self.stopbit1 = [-1, -1]
        self.startsample = [-1, -1]
        self.state = ['WAIT FOR START BIT', 'WAIT FOR START BIT']
        self.databits = [[], []]

    def start(self):
        """Register the decoder's outputs."""
        self.out_python = self.register(srd.OUTPUT_PYTHON)
        self.out_binary = self.register(srd.OUTPUT_BINARY)
        self.out_ann = self.register(srd.OUTPUT_ANN)
        # Number of bytes needed to hold one data value in binary output.
        self.bw = (self.options['num_data_bits'] + 7) // 8

    def metadata(self, key, value):
        """Receive the samplerate and derive the bit width from it."""
        if key == srd.SRD_CONF_SAMPLERATE:
            self.samplerate = value
            # The width of one UART bit in number of samples.
            self.bit_width = \
                float(self.samplerate) / float(self.options['baudrate'])

    def get_sample_point(self, rxtx, bitnum):
        """Determine absolute sample number of a bit slot's sample point."""
        # bitpos is the samplenumber which is in the middle of the
        # specified UART bit (0 = start bit, 1..x = data, x+1 = parity bit
        # (if used) or the first stop bit, and so on).
        # The samples within bit are 0, 1, ..., (bit_width - 1), therefore
        # index of the middle sample within bit window is (bit_width - 1) / 2.
        bitpos = self.frame_start[rxtx] + (self.bit_width - 1) / 2.0
        bitpos += bitnum * self.bit_width
        return bitpos

    def reached_bit(self, rxtx, bitnum):
        """Return True if we reached the middle of the desired bit,
        False otherwise."""
        return self.samplenum >= self.get_sample_point(rxtx, bitnum)

    def wait_for_start_bit(self, rxtx, signal):
        """Handle the (potential) start of a frame on the given line."""
        # The caller already has detected an edge. Strictly speaking this
        # check on the current signal level is redundant. But it does not
        # harm either.
        if signal != 0:
            return

        # Save the sample number where the start bit begins.
        self.frame_start[rxtx] = self.samplenum

        self.state[rxtx] = 'GET START BIT'

    def get_start_bit(self, rxtx, signal):
        """Sample the start bit and validate that it is low."""
        # Skip samples until we're in the middle of the start bit.
        if not self.reached_bit(rxtx, 0):
            return

        self.startbit[rxtx] = signal

        # The startbit must be 0. If not, we report an error and wait
        # for the next start bit (assuming this one was spurious).
        if self.startbit[rxtx] != 0:
            self.putp(['INVALID STARTBIT', rxtx, self.startbit[rxtx]])
            self.putg([rxtx + 10, ['Frame error', 'Frame err', 'FE']])
            self.state[rxtx] = 'WAIT FOR START BIT'
            return

        self.cur_data_bit[rxtx] = 0
        self.datavalue[rxtx] = 0
        self.startsample[rxtx] = -1

        self.state[rxtx] = 'GET DATA BITS'

        self.putp(['STARTBIT', rxtx, self.startbit[rxtx]])
        self.putg([rxtx + 2, ['Start bit', 'Start', 'S']])

    def get_data_bits(self, rxtx, signal):
        """Sample one data bit; emit the data value after the last bit."""
        # Skip samples until we're in the middle of the desired data bit.
        if not self.reached_bit(rxtx, 1 + self.cur_data_bit[rxtx]):
            return

        # Save the sample number of the middle of the first data bit.
        if self.startsample[rxtx] == -1:
            self.startsample[rxtx] = self.samplenum

        # Get the next data bit in LSB-first or MSB-first fashion.
        if self.options['bit_order'] == 'lsb-first':
            self.datavalue[rxtx] >>= 1
            self.datavalue[rxtx] |= \
                (signal << (self.options['num_data_bits'] - 1))
        else:
            self.datavalue[rxtx] <<= 1
            self.datavalue[rxtx] |= (signal << 0)

        self.putg([rxtx + 12, ['%d' % signal]])

        # Store individual data bits and their start/end samplenumbers.
        s, halfbit = self.samplenum, int(self.bit_width / 2)
        self.databits[rxtx].append([signal, s - halfbit, s + halfbit])

        # Return here, unless we already received all data bits.
        self.cur_data_bit[rxtx] += 1
        if self.cur_data_bit[rxtx] < self.options['num_data_bits']:
            return

        # Skip to either reception of the parity bit, or reception of
        # the STOP bits if parity is not applicable.
        self.state[rxtx] = 'GET PARITY BIT'
        if self.options['parity_type'] == 'none':
            self.state[rxtx] = 'GET STOP BITS'

        self.putpx(rxtx, ['DATA', rxtx,
            (self.datavalue[rxtx], self.databits[rxtx])])

        b = self.datavalue[rxtx]
        formatted = self.format_value(b)
        if formatted is not None:
            self.putx(rxtx, [rxtx, [formatted]])

        # Binary class 'rxtx' is the per-direction dump, class 2 is the
        # combined RX/TX dump.
        bdata = b.to_bytes(self.bw, byteorder='big')
        self.putbin(rxtx, [rxtx, bdata])
        self.putbin(rxtx, [2, bdata])

        self.databits[rxtx] = []

    def format_value(self, v):
        """Format value 'v' according to configured options.

        Reflects the user selected kind of representation, as well as
        the number of data bits in the UART frames. Returns the text, or
        None if the configured format is not known.
        """

        fmt, bits = self.options['format'], self.options['num_data_bits']

        # Assume "is printable" for values from 32 to including 126,
        # below 32 is "control" and thus not printable, above 127 is
        # "not ASCII" in its strict sense, 127 (DEL) is not printable,
        # fall back to hex representation for non-printables.
        if fmt == 'ascii':
            if 32 <= v <= 126:
                return chr(v)
            hexfmt = "[{:02X}]" if bits <= 8 else "[{:03X}]"
            return hexfmt.format(v)

        # Mere number to text conversion without prefix and padding
        # for the "decimal" output format.
        if fmt == 'dec':
            return "{:d}".format(v)

        # Padding with leading zeroes for hex/oct/bin formats, but
        # without a prefix for density -- since the format is user
        # specified, there is no ambiguity.
        if fmt == 'hex':
            digits, fmtchar = (bits + 4 - 1) // 4, "X"
        elif fmt == 'oct':
            digits, fmtchar = (bits + 3 - 1) // 3, "o"
        elif fmt == 'bin':
            digits, fmtchar = bits, "b"
        else:
            return None

        return "{{:0{:d}{:s}}}".format(digits, fmtchar).format(v)

    def get_parity_bit(self, rxtx, signal):
        """Sample the parity bit and report whether it is correct."""
        # Skip samples until we're in the middle of the parity bit.
        if not self.reached_bit(rxtx, 1 + self.options['num_data_bits']):
            return

        self.paritybit[rxtx] = signal

        self.state[rxtx] = 'GET STOP BITS'

        # Honour the 'parity_check' option: when checking is disabled
        # the parity bit is reported as is and never flagged as an error.
        check = self.options['parity_check'] == 'yes'
        if not check or parity_ok(self.options['parity_type'],
                self.paritybit[rxtx], self.datavalue[rxtx],
                self.options['num_data_bits']):
            self.putp(['PARITYBIT', rxtx, self.paritybit[rxtx]])
            self.putg([rxtx + 4, ['Parity bit', 'Parity', 'P']])
        else:
            # The parity bit is a single bit and the check failed, so
            # the expected value is the complement of the received one.
            actual = self.paritybit[rxtx]
            expected = 1 - actual
            self.putp(['PARITY ERROR', rxtx, (expected, actual)])
            self.putg([rxtx + 6, ['Parity error', 'Parity err', 'PE']])

    # TODO: Currently only supports 1 stop bit.
    def get_stop_bits(self, rxtx, signal):
        """Sample the (first) stop bit and finish the frame."""
        # Skip samples until we're in the middle of the stop bit(s).
        skip_parity = 0 if self.options['parity_type'] == 'none' else 1
        b = 1 + self.options['num_data_bits'] + skip_parity
        if not self.reached_bit(rxtx, b):
            return

        self.stopbit1[rxtx] = signal

        # Stop bits must be 1. If not, we report an error.
        if self.stopbit1[rxtx] != 1:
            self.putp(['INVALID STOPBIT', rxtx, self.stopbit1[rxtx]])
            self.putg([rxtx + 10, ['Frame error', 'Frame err', 'FE']])
            # TODO: Abort? Ignore the frame? Other?

        self.state[rxtx] = 'WAIT FOR START BIT'

        self.putp(['STOPBIT', rxtx, self.stopbit1[rxtx]])
        # Annotation classes 8/9 are 'rx-stop'/'tx-stop' (classes 4/5
        # are the "parity OK" classes and must not be used here).
        self.putg([rxtx + 8, ['Stop bit', 'Stop', 'T']])

    def get_wait_cond(self, rxtx, inv):
        """
        Determine Decoder.wait() condition for specified UART line.

        Returns conditions that are suitable for Decoder.wait(). Those
        conditions either match the falling edge of the START bit, or
        the sample point of the next bit time.
        """

        state = self.state[rxtx]
        if state == 'WAIT FOR START BIT':
            # An inverted line idles low, its START bit is a rising edge.
            return {rxtx: 'r' if inv else 'f'}
        if state == 'GET START BIT':
            bitnum = 0
        elif state == 'GET DATA BITS':
            bitnum = 1 + self.cur_data_bit[rxtx]
        elif state == 'GET PARITY BIT':
            bitnum = 1 + self.options['num_data_bits']
        elif state == 'GET STOP BITS':
            bitnum = 1 + self.options['num_data_bits']
            bitnum += 0 if self.options['parity_type'] == 'none' else 1
        # Round up, so that reached_bit() is satisfied at the sample
        # which the skip condition lands on.
        want_num = ceil(self.get_sample_point(rxtx, bitnum))
        return {'skip': want_num - self.samplenum}

    def decode(self):
        """Run the RX and TX state machines over the sample stream.

        Raises SamplerateError if no samplerate is known, and
        ChannelError if neither RX nor TX is connected.
        """
        if not self.samplerate:
            raise SamplerateError('Cannot decode without samplerate.')

        has_pin = [self.has_channel(ch) for ch in (RX, TX)]
        if has_pin == [False, False]:
            raise ChannelError('Either TX or RX (or both) pins required.')

        opt = self.options
        inv = [opt['invert_rx'] == 'yes', opt['invert_tx'] == 'yes']

        while True:
            conds = []
            if has_pin[RX]:
                conds.append(self.get_wait_cond(RX, inv[RX]))
            if has_pin[TX]:
                conds.append(self.get_wait_cond(TX, inv[TX]))
            (rx, tx) = self.wait(conds)
            # Keep inverted samples as integers (0/1), since the bit
            # values end up in OUTPUT_PYTHON packets.
            if inv[RX]:
                rx = int(not rx)
            if inv[TX]:
                tx = int(not tx)

            # State machine.
            for rxtx in (RX, TX):
                # Don't try to handle RX (or TX) if not supplied.
                if not has_pin[rxtx]:
                    continue

                signal = rx if (rxtx == RX) else tx

                if self.state[rxtx] == 'WAIT FOR START BIT':
                    self.wait_for_start_bit(rxtx, signal)
                elif self.state[rxtx] == 'GET START BIT':
                    self.get_start_bit(rxtx, signal)
                elif self.state[rxtx] == 'GET DATA BITS':
                    self.get_data_bits(rxtx, signal)
                elif self.state[rxtx] == 'GET PARITY BIT':
                    self.get_parity_bit(rxtx, signal)
                elif self.state[rxtx] == 'GET STOP BITS':
                    self.get_stop_bits(rxtx, signal)