]> sigrok.org Git - libsigrokdecode.git/commitdiff
uart: handle zero stop bits configuration
authorGerhard Sittig <redacted>
Wed, 13 Apr 2022 19:10:00 +0000 (21:10 +0200)
committerGerhard Sittig <redacted>
Sat, 23 Apr 2022 17:50:35 +0000 (19:50 +0200)
Use common code to advance internal state during UART frame inspection.
This reduces redundancy, and improves robustness. Data bits collectors
need not worry about the optional presence of subsequent fields (parity,
stop bits, both can be absent).

Improve the separation of implementation details of the lower layer UART
frame decoding from upper layer protocol handling. Concentrate the post
processing of UART frames, BREAK and IDLE conditions in the source, and
keep the ss/es determination at the caller which detected the condition
by arbitrary means.

This unbreaks the decoder's operation when 0 stop bits are configured.
The implementation still assumes that the line goes idle between frames
even when zero stop bits are configured. Strictly speaking this decoder
now copes with traffic that uses "less than half a stop bit".

decoders/uart/pd.py

index 0c501b0dc46d63c5321c60472b750def28525c9d..a2e1f7f0ea608c77a183b12c30031a7fce4d4d7c 100644 (file)
@@ -252,7 +252,7 @@ class Decoder(srd.Decoder):
         self.frame_start[rxtx] = self.samplenum
         self.frame_valid[rxtx] = True
 
-        self.state[rxtx] = 'GET START BIT'
+        self.advance_state(rxtx, signal)
 
     def get_start_bit(self, rxtx, signal):
         self.startbit[rxtx] = signal
@@ -266,7 +266,7 @@ class Decoder(srd.Decoder):
             es = self.samplenum + ceil(self.bit_width / 2.0)
             self.putpse(self.frame_start[rxtx], es, ['FRAME', rxtx,
                 (self.datavalue[rxtx], self.frame_valid[rxtx])])
-            self.state[rxtx] = 'WAIT FOR START BIT'
+            self.advance_state(rxtx, signal, fatal = True, idle = es)
             return
 
         self.cur_data_bit[rxtx] = 0
@@ -276,7 +276,7 @@ class Decoder(srd.Decoder):
         self.putp(['STARTBIT', rxtx, self.startbit[rxtx]])
         self.putg([Ann.RX_START + rxtx, ['Start bit', 'Start', 'S']])
 
-        self.state[rxtx] = 'GET DATA BITS'
+        self.advance_state(rxtx, signal)
 
     def handle_packet(self, rxtx):
         d = 'rx' if (rxtx == RX) else 'tx'
@@ -339,11 +339,7 @@ class Decoder(srd.Decoder):
 
         self.databits[rxtx] = []
 
-        # Advance to either reception of the parity bit, or reception of
-        # the STOP bits if parity is not applicable.
-        self.state[rxtx] = 'GET PARITY BIT'
-        if self.options['parity'] == 'none':
-            self.state[rxtx] = 'GET STOP BITS'
+        self.advance_state(rxtx, signal)
 
     def format_value(self, v):
         # Format value 'v' according to configured options.
@@ -400,7 +396,7 @@ class Decoder(srd.Decoder):
             self.putg([Ann.RX_PARITY_ERR + rxtx, ['Parity error', 'Parity err', 'PE']])
             self.frame_valid[rxtx] = False
 
-        self.state[rxtx] = 'GET STOP BITS'
+        self.advance_state(rxtx, signal)
 
     # TODO: Currently only supports 1 stop bit.
     def get_stop_bits(self, rxtx, signal):
@@ -415,19 +411,76 @@ class Decoder(srd.Decoder):
         self.putp(['STOPBIT', rxtx, self.stopbit1[rxtx]])
         self.putg([Ann.RX_STOP + rxtx, ['Stop bit', 'Stop', 'T']])
 
+        # Postprocess the UART frame
+        self.advance_state(rxtx, signal)
+
+    def advance_state(self, rxtx, signal = None, fatal = False, idle = None):
+        # Advances the protocol decoder's internal state for all regular
+        # UART frame inspection. Deals with either edges, sample points,
+        # or other .wait() conditions. Also gracefully handles extreme
+        # undersampling. Each turn takes one .wait() call which in turn
+        # corresponds to at least one sample. That is why as many state
+        # transitions are done here as required within a single call.
+        frame_end = self.frame_start[rxtx] + self.frame_len_sample_count
+        if idle is not None:
+            # When requested by the caller, start another (potential)
+            # IDLE period after the caller specified position.
+            self.idle_start[rxtx] = idle
+        if fatal:
+            # When requested by the caller, don't advance to the next
+            # UART frame's field, but to the start of the next START bit
+            # instead.
+            self.state[rxtx] = 'WAIT FOR START BIT'
+            return
+        # Advance to the next UART frame's field that we expect. Cope
+        # with absence of optional fields. Force scan for next IDLE
+        # after the (optional) STOP bit field, so that callers need
+        # not deal with optional field presence. Also handles the cases
+        # where the decoder navigates to edges which are not strictly
+        # a field's sampling point.
+        if self.state[rxtx] == 'WAIT FOR START BIT':
+            self.state[rxtx] = 'GET START BIT'
+            return
+        if self.state[rxtx] == 'GET START BIT':
+            self.state[rxtx] = 'GET DATA BITS'
+            return
+        if self.state[rxtx] == 'GET DATA BITS':
+            self.state[rxtx] = 'GET PARITY BIT'
+            if self.options['parity'] != 'none':
+                return
+            # FALLTHROUGH
+        if self.state[rxtx] == 'GET PARITY BIT':
+            self.state[rxtx] = 'GET STOP BITS'
+            if self.options['stop_bits']:
+                return
+            # FALLTHROUGH
+        if self.state[rxtx] == 'GET STOP BITS':
+            # Postprocess the previously received UART frame. Advance
+            # the read position to after the frame's last bit time. So
+            # that the start of the next START bit won't fall into the
+            # end of the previously received UART frame. This improves
+            # robustness in the presence of glitchy input data.
+            ss = self.frame_start[rxtx]
+            es = self.samplenum + ceil(self.bit_width / 2.0)
+            self.handle_frame(rxtx, ss, es)
+            self.state[rxtx] = 'WAIT FOR START BIT'
+            self.idle_start[rxtx] = frame_end
+            return
+        # Unhandled state, actually a programming error. Emit diagnostics?
+        self.state[rxtx] = 'WAIT FOR START BIT'
+
+    def handle_frame(self, rxtx, ss, es):
         # Pass the complete UART frame to upper layers.
-        es = self.samplenum + ceil(self.bit_width / 2.0)
-        self.putpse(self.frame_start[rxtx], es, ['FRAME', rxtx,
+        self.putpse(ss, es, ['FRAME', rxtx,
             (self.datavalue[rxtx], self.frame_valid[rxtx])])
 
-        self.state[rxtx] = 'WAIT FOR START BIT'
-        self.idle_start[rxtx] = self.frame_start[rxtx] + self.frame_len_sample_count
+    def handle_idle(self, rxtx, ss, es):
+        self.putpse(ss, es, ['IDLE', rxtx, 0])
 
-    def handle_break(self, rxtx):
-        self.putpse(self.frame_start[rxtx], self.samplenum,
-                ['BREAK', rxtx, 0])
-        self.putgse(self.frame_start[rxtx], self.samplenum,
-                [Ann.RX_BREAK + rxtx, ['Break condition', 'Break', 'Brk', 'B']])
+    def handle_break(self, rxtx, ss, es):
+        self.putpse(ss, es, ['BREAK', rxtx, 0])
+        self.putgse(ss, es, [Ann.RX_BREAK + rxtx,
+                ['Break condition', 'Break', 'Brk', 'B']])
         self.state[rxtx] = 'WAIT FOR START BIT'
 
     def get_wait_cond(self, rxtx, inv):
@@ -490,7 +543,8 @@ class Decoder(srd.Decoder):
             return
         diff = self.samplenum - self.break_start[rxtx]
         if diff >= self.break_min_sample_count:
-            self.handle_break(rxtx)
+            ss, es = self.frame_start[rxtx], self.samplenum
+            self.handle_break(rxtx, ss, es)
         self.break_start[rxtx] = None
 
     def inspect_idle(self, rxtx, signal, inv):
@@ -509,8 +563,8 @@ class Decoder(srd.Decoder):
         if diff < self.frame_len_sample_count:
             return
         ss, es = self.idle_start[rxtx], self.samplenum
-        self.putpse(ss, es, ['IDLE', rxtx, 0])
-        self.idle_start[rxtx] = self.samplenum
+        self.handle_idle(rxtx, ss, es)
+        self.idle_start[rxtx] = es
 
     def decode(self):
         if not self.samplerate: