- def decode(self, ss, es, data):
- if self.samplerate is None:
- raise Exception("Cannot decode without samplerate.")
- for (self.samplenum, pins) in data:
-
- # Note: Ignoring identical samples here for performance reasons
- # is not possible for this PD, at least not in the current state.
- # if self.oldpins == pins:
- # continue
- self.oldpins, (rx, tx) = pins, pins
-
- # Either RX or TX (but not both) can be omitted.
- has_pin = [rx in (0, 1), tx in (0, 1)]
- if has_pin == [False, False]:
- raise Exception('Either TX or RX (or both) pins required.')
-
- # State machine.
- for rxtx in (RX, TX):
- # Don't try to handle RX (or TX) if not supplied.
- if not has_pin[rxtx]:
- continue
-
- signal = rx if (rxtx == RX) else tx
-
- if self.state[rxtx] == 'WAIT FOR START BIT':
- self.wait_for_start_bit(rxtx, self.oldbit[rxtx], signal)
- elif self.state[rxtx] == 'GET START BIT':
- self.get_start_bit(rxtx, signal)
- elif self.state[rxtx] == 'GET DATA BITS':
- self.get_data_bits(rxtx, signal)
- elif self.state[rxtx] == 'GET PARITY BIT':
- self.get_parity_bit(rxtx, signal)
- elif self.state[rxtx] == 'GET STOP BITS':
- self.get_stop_bits(rxtx, signal)
- else:
- raise Exception('Invalid state: %s' % self.state[rxtx])
-
- # Save current RX/TX values for the next round.
- self.oldbit[rxtx] = signal
-
+ def get_wait_cond(self, rxtx, inv):
+ """
+ Determine Decoder.wait() condition for specified UART line.
+
+ Returns condititions that are suitable for Decoder.wait(). Those
+ conditions either match the falling edge of the START bit, or
+ the sample point of the next bit time.
+ """
+
+ state = self.state[rxtx]
+ if state == 'WAIT FOR START BIT':
+ return {rxtx: 'r' if inv else 'f'}
+ if state == 'GET START BIT':
+ bitnum = 0
+ elif state == 'GET DATA BITS':
+ bitnum = 1 + self.cur_data_bit[rxtx]
+ elif state == 'GET PARITY BIT':
+ bitnum = 1 + self.options['num_data_bits']
+ elif state == 'GET STOP BITS':
+ bitnum = 1 + self.options['num_data_bits']
+ bitnum += 0 if self.options['parity_type'] == 'none' else 1
+ want_num = self.get_sample_point(rxtx, bitnum)
+ # want_num = int(want_num + 0.5)
+ want_num = ceil(want_num)
+ cond = {'skip': want_num - self.samplenum}
+ return cond
+
+ def inspect_sample(self, rxtx, signal, inv):
+ """Inspect a sample returned by .wait() for the specified UART line."""
+
+ if inv:
+ signal = not signal
+
+ state = self.state[rxtx]
+ if state == 'WAIT FOR START BIT':
+ self.wait_for_start_bit(rxtx, signal)
+ elif state == 'GET START BIT':
+ self.get_start_bit(rxtx, signal)
+ elif state == 'GET DATA BITS':
+ self.get_data_bits(rxtx, signal)
+ elif state == 'GET PARITY BIT':
+ self.get_parity_bit(rxtx, signal)
+ elif state == 'GET STOP BITS':
+ self.get_stop_bits(rxtx, signal)
+
+ def decode(self):
+ if not self.samplerate:
+ raise SamplerateError('Cannot decode without samplerate.')
+
+ has_pin = [self.has_channel(ch) for ch in (RX, TX)]
+ if has_pin == [False, False]:
+ raise ChannelError('Either TX or RX (or both) pins required.')
+
+ opt = self.options
+ inv = [opt['invert_rx'] == 'yes', opt['invert_tx'] == 'yes']
+ cond_idx = [None] * len(has_pin)
+
+ while True:
+ conds = []
+ if has_pin[RX]:
+ cond_idx[RX] = len(conds)
+ conds.append(self.get_wait_cond(RX, inv[RX]))
+ if has_pin[TX]:
+ cond_idx[TX] = len(conds)
+ conds.append(self.get_wait_cond(TX, inv[TX]))
+ (rx, tx) = self.wait(conds)
+ if cond_idx[RX] is not None and self.matched[cond_idx[RX]]:
+ self.inspect_sample(RX, rx, inv[RX])
+ if cond_idx[TX] is not None and self.matched[cond_idx[TX]]:
+ self.inspect_sample(TX, tx, inv[TX])