From: Gerhard Sittig Date: Wed, 13 Apr 2022 19:10:00 +0000 (+0200) Subject: uart: handle zero stop bits configuration X-Git-Url: http://sigrok.org/gitweb/?p=libsigrokdecode.git;a=commitdiff_plain;h=42d4d65c3d34ae9bfa74c40fafd1ca657d05a91b uart: handle zero stop bits configuration Use common code to advance internal state during UART frame inspection. This reduces redundancy, and improves robustness. Data bits collectors need not worry about the optional presence of subsequent fields (parity, stop bits, both can be absent). Improve the separation of implementation details of the lower layer UART frame decoding from upper layer protocol handling. Concentrate the post processing of UART frames, BREAK and IDLE conditions in the source, and keep the ss/es determination at the caller which detected the condition by arbitrary means. This unbreaks the decoder's operation when 0 stop bits are configured. The implementation still assumes that the line goes idle between frames even when zero stop bits are configured. Strictly speaking this decoder now copes with traffic that uses "less than half a stop bit". --- diff --git a/decoders/uart/pd.py b/decoders/uart/pd.py index 0c501b0..a2e1f7f 100644 --- a/decoders/uart/pd.py +++ b/decoders/uart/pd.py @@ -252,7 +252,7 @@ class Decoder(srd.Decoder): self.frame_start[rxtx] = self.samplenum self.frame_valid[rxtx] = True - self.state[rxtx] = 'GET START BIT' + self.advance_state(rxtx, signal) def get_start_bit(self, rxtx, signal): self.startbit[rxtx] = signal @@ -266,7 +266,7 @@ class Decoder(srd.Decoder): es = self.samplenum + ceil(self.bit_width / 2.0) self.putpse(self.frame_start[rxtx], es, ['FRAME', rxtx, (self.datavalue[rxtx], self.frame_valid[rxtx])]) - self.state[rxtx] = 'WAIT FOR START BIT' + self.advance_state(rxtx, signal, fatal = True, idle = es) return self.cur_data_bit[rxtx] = 0 @@ -276,7 +276,7 @@ class Decoder(srd.Decoder): self.putp(['STARTBIT', rxtx, self.startbit[rxtx]]) self.putg([Ann.RX_START + rxtx, ['Start bit', 'Start', 'S']]) - self.state[rxtx] = 'GET DATA BITS' + self.advance_state(rxtx, signal) def handle_packet(self, rxtx): d = 'rx' if (rxtx == RX) else 'tx' @@ -339,11 +339,7 @@ class Decoder(srd.Decoder): self.databits[rxtx] = [] - # Advance to either reception of the parity bit, or reception of - # the STOP bits if parity is not applicable. - self.state[rxtx] = 'GET PARITY BIT' - if self.options['parity'] == 'none': - self.state[rxtx] = 'GET STOP BITS' + self.advance_state(rxtx, signal) def format_value(self, v): # Format value 'v' according to configured options. @@ -400,7 +396,7 @@ class Decoder(srd.Decoder): self.putg([Ann.RX_PARITY_ERR + rxtx, ['Parity error', 'Parity err', 'PE']]) self.frame_valid[rxtx] = False - self.state[rxtx] = 'GET STOP BITS' + self.advance_state(rxtx, signal) # TODO: Currently only supports 1 stop bit. def get_stop_bits(self, rxtx, signal): @@ -415,19 +411,76 @@ class Decoder(srd.Decoder): self.putp(['STOPBIT', rxtx, self.stopbit1[rxtx]]) self.putg([Ann.RX_STOP + rxtx, ['Stop bit', 'Stop', 'T']]) + # Postprocess the UART frame + self.advance_state(rxtx, signal) + + def advance_state(self, rxtx, signal = None, fatal = False, idle = None): + # Advances the protocol decoder's internal state for all regular + # UART frame inspection. Deals with either edges, sample points, + # or other .wait() conditions. Also gracefully handles extreme + # undersampling. Each turn takes one .wait() call which in turn + # corresponds to at least one sample. That is why as many state + # transitions are done here as required within a single call. + frame_end = self.frame_start[rxtx] + self.frame_len_sample_count + if idle is not None: + # When requested by the caller, start another (potential) + # IDLE period after the caller specified position. + self.idle_start[rxtx] = idle + if fatal: + # When requested by the caller, don't advance to the next + # UART frame's field, but to the start of the next START bit + # instead. + self.state[rxtx] = 'WAIT FOR START BIT' + return + # Advance to the next UART frame's field that we expect. Cope + # with absence of optional fields. Force scan for next IDLE + # after the (optional) STOP bit field, so that callers need + # not deal with optional field presence. Also handles the cases + # where the decoder navigates to edges which are not strictly + # a field's sampling point. + if self.state[rxtx] == 'WAIT FOR START BIT': + self.state[rxtx] = 'GET START BIT' + return + if self.state[rxtx] == 'GET START BIT': + self.state[rxtx] = 'GET DATA BITS' + return + if self.state[rxtx] == 'GET DATA BITS': + self.state[rxtx] = 'GET PARITY BIT' + if self.options['parity'] != 'none': + return + # FALLTHROUGH + if self.state[rxtx] == 'GET PARITY BIT': + self.state[rxtx] = 'GET STOP BITS' + if self.options['stop_bits']: + return + # FALLTHROUGH + if self.state[rxtx] == 'GET STOP BITS': + # Postprocess the previously received UART frame. Advance + # the read position to after the frame's last bit time. So + # that the start of the next START bit won't fall into the + # end of the previously received UART frame. This improves + # robustness in the presence of glitchy input data. + ss = self.frame_start[rxtx] + es = self.samplenum + ceil(self.bit_width / 2.0) + self.handle_frame(rxtx, ss, es) + self.state[rxtx] = 'WAIT FOR START BIT' + self.idle_start[rxtx] = frame_end + return + # Unhandled state, actually a programming error. Emit diagnostics? + self.state[rxtx] = 'WAIT FOR START BIT' + + def handle_frame(self, rxtx, ss, es): # Pass the complete UART frame to upper layers. - es = self.samplenum + ceil(self.bit_width / 2.0) - self.putpse(self.frame_start[rxtx], es, ['FRAME', rxtx, + self.putpse(ss, es, ['FRAME', rxtx, (self.datavalue[rxtx], self.frame_valid[rxtx])]) - self.state[rxtx] = 'WAIT FOR START BIT' - self.idle_start[rxtx] = self.frame_start[rxtx] + self.frame_len_sample_count + def handle_idle(self, rxtx, ss, es): + self.putpse(ss, es, ['IDLE', rxtx, 0]) - def handle_break(self, rxtx): - self.putpse(self.frame_start[rxtx], self.samplenum, - ['BREAK', rxtx, 0]) - self.putgse(self.frame_start[rxtx], self.samplenum, - [Ann.RX_BREAK + rxtx, ['Break condition', 'Break', 'Brk', 'B']]) + def handle_break(self, rxtx, ss, es): + self.putpse(ss, es, ['BREAK', rxtx, 0]) + self.putgse(ss, es, [Ann.RX_BREAK + rxtx, + ['Break condition', 'Break', 'Brk', 'B']]) self.state[rxtx] = 'WAIT FOR START BIT' def get_wait_cond(self, rxtx, inv): @@ -490,7 +543,8 @@ class Decoder(srd.Decoder): return diff = self.samplenum - self.break_start[rxtx] if diff >= self.break_min_sample_count: - self.handle_break(rxtx) + ss, es = self.frame_start[rxtx], self.samplenum + self.handle_break(rxtx, ss, es) self.break_start[rxtx] = None def inspect_idle(self, rxtx, signal, inv): @@ -509,8 +563,8 @@ class Decoder(srd.Decoder): if diff < self.frame_len_sample_count: return ss, es = self.idle_start[rxtx], self.samplenum - self.putpse(ss, es, ['IDLE', rxtx, 0]) - self.idle_start[rxtx] = self.samplenum + self.handle_idle(rxtx, ss, es) + self.idle_start[rxtx] = es def decode(self): if not self.samplerate: