From: Gerhard Sittig Date: Tue, 14 Mar 2017 17:00:06 +0000 (+0100) Subject: uart: Convert to PD API version 3 X-Git-Tag: libsigrokdecode-0.5.0~61 X-Git-Url: https://sigrok.org/gitaction?a=commitdiff_plain;h=dcd3d6262c57d86581864e642ec00dda5342d1af;p=libsigrokdecode.git uart: Convert to PD API version 3 Adjust the UART protocol decoder, to make use of the query based API. Have edges detected and unrelated samples skipped by common code. This implementation keeps some redundancy in place (like checking for having reached specific sample numbers, while the backend managed that for us). This approach reduces the diff and shall simplify review. Only some common checks in decode() were moved to the start of the routine, outside of the sample inspection loop. --- diff --git a/decoders/uart/pd.py b/decoders/uart/pd.py index b081724..8db3e17 100644 --- a/decoders/uart/pd.py +++ b/decoders/uart/pd.py @@ -75,7 +75,7 @@ class ChannelError(Exception): pass class Decoder(srd.Decoder): - api_version = 2 + api_version = 3 id = 'uart' name = 'UART' longname = 'Universal Asynchronous Receiver/Transmitter' @@ -170,8 +170,6 @@ class Decoder(srd.Decoder): self.stopbit1 = [-1, -1] self.startsample = [-1, -1] self.state = ['WAIT FOR START BIT', 'WAIT FOR START BIT'] - self.oldbit = [1, 1] - self.oldpins = [-1, -1] self.databits = [[], []] def start(self): @@ -186,8 +184,8 @@ class Decoder(srd.Decoder): # The width of one UART bit in number of samples. self.bit_width = float(self.samplerate) / float(self.options['baudrate']) - # Return true if we reached the middle of the desired bit, false otherwise. - def reached_bit(self, rxtx, bitnum): + def get_sample_point(self, rxtx, bitnum): + """Determine absolute sample number of a bit slot's sample point.""" # bitpos is the samplenumber which is in the middle of the # specified UART bit (0 = start bit, 1..x = data, x+1 = parity bit # (if used) or the first stop bit, and so on). @@ -195,14 +193,20 @@ class Decoder(srd.Decoder): # index of the middle sample within bit window is (bit_width - 1) / 2. bitpos = self.frame_start[rxtx] + (self.bit_width - 1) / 2.0 bitpos += bitnum * self.bit_width + return bitpos + + # Return true if we reached the middle of the desired bit, false otherwise. + def reached_bit(self, rxtx, bitnum): + bitpos = self.get_sample_point(rxtx, bitnum) if self.samplenum >= bitpos: return True return False - def wait_for_start_bit(self, rxtx, old_signal, signal): - # The start bit is always 0 (low). As the idle UART (and the stop bit) - # level is 1 (high), the beginning of a start bit is a falling edge. - if not (old_signal == 1 and signal == 0): + def wait_for_start_bit(self, rxtx, signal): + # The caller already has detected an edge. Strictly speaking this + # check on the current signal level is redundant. But it does not + # harm either. + if signal != 0: return # Save the sample number where the start bit begins. @@ -364,29 +368,56 @@ class Decoder(srd.Decoder): self.putp(['STOPBIT', rxtx, self.stopbit1[rxtx]]) self.putg([rxtx + 4, ['Stop bit', 'Stop', 'T']]) - def decode(self, ss, es, data): + def get_wait_cond(self, rxtx, inv): + """ + Determine Decoder.wait() condition for specified UART line. + + Returns condititions that are suitable for Decoder.wait(). Those + conditions either match the falling edge of the START bit, or + the sample point of the next bit time. + """ + + state = self.state[rxtx] + if state == 'WAIT FOR START BIT': + return {rxtx: 'r' if inv else 'f'} + if state == 'GET START BIT': + bitnum = 0 + elif state == 'GET DATA BITS': + bitnum = 1 + self.cur_data_bit[rxtx] + elif state == 'GET PARITY BIT': + bitnum = 1 + self.options['num_data_bits'] + elif state == 'GET STOP BITS': + bitnum = 1 + self.options['num_data_bits'] + bitnum += 0 if self.options['parity_type'] == 'none' else 1 + want_num = self.get_sample_point(rxtx, bitnum) + # want_num = int(want_num + 0.5) + want_num = ceil(want_num) + cond = {'skip': want_num - self.samplenum} + return cond + + def decode(self): if not self.samplerate: raise SamplerateError('Cannot decode without samplerate.') - for (self.samplenum, pins) in data: - # We want to skip identical samples for performance reasons but, - # for now, we can only do that when we are in the idle state - # (meaning both channels are waiting for the start bit). - if self.state == self.idle_state and self.oldpins == pins: - continue - - self.oldpins, (rx, tx) = pins, pins - - if self.options['invert_rx'] == 'yes': + has_pin = [self.has_channel(ch) for ch in (RX, TX)] + if has_pin == [False, False]: + raise ChannelError('Either TX or RX (or both) pins required.') + + opt = self.options + inv = [opt['invert_rx'] == 'yes', opt['invert_tx'] == 'yes'] + + while True: + conds = [] + if has_pin[RX]: + conds.append(self.get_wait_cond(RX, inv[RX])) + if has_pin[TX]: + conds.append(self.get_wait_cond(TX, inv[TX])) + (rx, tx) = self.wait(conds) + if inv[RX]: rx = not rx - if self.options['invert_tx'] == 'yes': + if inv[TX]: tx = not tx - # Either RX or TX (but not both) can be omitted. - has_pin = [rx in (0, 1), tx in (0, 1)] - if has_pin == [False, False]: - raise ChannelError('Either TX or RX (or both) pins required.') - # State machine. for rxtx in (RX, TX): # Don't try to handle RX (or TX) if not supplied. @@ -396,7 +427,7 @@ class Decoder(srd.Decoder): signal = rx if (rxtx == RX) else tx if self.state[rxtx] == 'WAIT FOR START BIT': - self.wait_for_start_bit(rxtx, self.oldbit[rxtx], signal) + self.wait_for_start_bit(rxtx, signal) elif self.state[rxtx] == 'GET START BIT': self.get_start_bit(rxtx, signal) elif self.state[rxtx] == 'GET DATA BITS': @@ -405,6 +436,3 @@ class Decoder(srd.Decoder): self.get_parity_bit(rxtx, signal) elif self.state[rxtx] == 'GET STOP BITS': self.get_stop_bits(rxtx, signal) - - # Save current RX/TX values for the next round. - self.oldbit[rxtx] = signal