uart: add support for break condition detection
authorGerhard Sittig <gerhard.sittig@gmx.net>
Sun, 14 Oct 2018 18:17:35 +0000 (20:17 +0200)
committerUwe Hermann <uwe@hermann-uwe.de>
Tue, 16 Oct 2018 19:49:00 +0000 (21:49 +0200)
There are the "traffic inspecting" wait() conditions, which check an
edge to find the start of START, then wait for sample points to grab the
bit values. Bit times are sampled in their respective center, potential
glitches around sample points get ignored.

Add another independent set of wait() conditions which check _all_ edges
regardless of any data communication. This results in the most reliable
and maintainable detection of break conditions, regardless of how they
align to data frames. Break is defined as a period of low input signal
which spans at least one frame's length. Run the edge inspection after
data inspection, which results in the most appropriate annotation output
like leading data bits (of incomplete frames), frame errors (violated
STOP bit expectations), then break conditions. This approach is most
robust in the presence of incomplete input streams.

decoders/uart/pd.py

index 6c3d85cf9b06c9f30db1d59c33aeab85b9ecb631..ffacad9acfb6c83c65058b59a4cbda159d6a2b01 100644 (file)
@@ -124,14 +124,18 @@ class Decoder(srd.Decoder):
         ('tx-warnings', 'TX warnings'),
         ('rx-data-bits', 'RX data bits'),
         ('tx-data-bits', 'TX data bits'),
+        ('rx-break', 'RX break'),
+        ('tx-break', 'TX break'),
     )
     annotation_rows = (
         ('rx-data', 'RX', (0, 2, 4, 6, 8)),
         ('rx-data-bits', 'RX bits', (12,)),
         ('rx-warnings', 'RX warnings', (10,)),
+        ('rx-break', 'RX break', (14,)),
         ('tx-data', 'TX', (1, 3, 5, 7, 9)),
         ('tx-data-bits', 'TX bits', (13,)),
         ('tx-warnings', 'TX warnings', (11,)),
+        ('tx-break', 'TX break', (15,)),
     )
     binary = (
         ('rx', 'RX dump'),
@@ -156,6 +160,12 @@ class Decoder(srd.Decoder):
         s, halfbit = self.samplenum, self.bit_width / 2.0
         self.put(s - floor(halfbit), s + ceil(halfbit), self.out_python, data)
 
+    def putgse(self, ss, es, data):
+        self.put(ss, es, self.out_ann, data)
+
+    def putpse(self, ss, es, data):
+        self.put(ss, es, self.out_python, data)
+
     def putbin(self, rxtx, data):
         s, halfbit = self.startsample[rxtx], self.bit_width / 2.0
         self.put(s - floor(halfbit), self.samplenum + ceil(halfbit), self.out_binary, data)
@@ -175,6 +185,7 @@ class Decoder(srd.Decoder):
         self.startsample = [-1, -1]
         self.state = ['WAIT FOR START BIT', 'WAIT FOR START BIT']
         self.databits = [[], []]
+        self.break_start = [None, None]
 
     def start(self):
         self.out_python = self.register(srd.OUTPUT_PYTHON)
@@ -337,6 +348,13 @@ class Decoder(srd.Decoder):
 
         self.state[rxtx] = 'WAIT FOR START BIT'
 
+    def handle_break(self, rxtx):
+        self.putpse(self.frame_start[rxtx], self.samplenum,
+                ['BREAK', rxtx, 0])
+        self.putgse(self.frame_start[rxtx], self.samplenum,
+                [rxtx + 14, ['Break condition', 'Break', 'Brk', 'B']])
+        self.state[rxtx] = 'WAIT FOR START BIT'
+
     def get_wait_cond(self, rxtx, inv):
         # Return condititions that are suitable for Decoder.wait(). Those
         # conditions either match the falling edge of the START bit, or
@@ -373,6 +391,22 @@ class Decoder(srd.Decoder):
         elif state == 'GET STOP BITS':
             self.get_stop_bits(rxtx, signal)
 
+    def inspect_edge(self, rxtx, signal, inv):
+        # Inspect edges, independently from traffic, to detect break conditions.
+        if inv:
+            signal = not signal
+        if not signal:
+            # Signal went low. Start another interval.
+            self.break_start[rxtx] = self.samplenum
+            return
+        # Signal went high. Was there an extended period with low signal?
+        if self.break_start[rxtx] is None:
+            return
+        diff = self.samplenum - self.break_start[rxtx]
+        if diff >= self.break_min_sample_count:
+            self.handle_break(rxtx)
+        self.break_start[rxtx] = None
+
     def decode(self):
         if not self.samplerate:
             raise SamplerateError('Cannot decode without samplerate.')
@@ -383,18 +417,36 @@ class Decoder(srd.Decoder):
 
         opt = self.options
         inv = [opt['invert_rx'] == 'yes', opt['invert_tx'] == 'yes']
-        cond_idx = [None] * len(has_pin)
+        cond_data_idx = [None] * len(has_pin)
+
+        # Determine the number of samples for a complete frame's time span.
+        # A period of low signal (at least) that long is a break condition.
+        frame_samples = 1 # START
+        frame_samples += self.options['num_data_bits']
+        frame_samples += 0 if self.options['parity_type'] == 'none' else 1
+        frame_samples += self.options['num_stop_bits']
+        frame_samples *= self.bit_width
+        self.break_min_sample_count = ceil(frame_samples)
+        cond_edge_idx = [None] * len(has_pin)
 
         while True:
             conds = []
             if has_pin[RX]:
-                cond_idx[RX] = len(conds)
+                cond_data_idx[RX] = len(conds)
                 conds.append(self.get_wait_cond(RX, inv[RX]))
+                cond_edge_idx[RX] = len(conds)
+                conds.append({RX: 'e'})
             if has_pin[TX]:
-                cond_idx[TX] = len(conds)
+                cond_data_idx[TX] = len(conds)
                 conds.append(self.get_wait_cond(TX, inv[TX]))
+                cond_edge_idx[TX] = len(conds)
+                conds.append({TX: 'e'})
             (rx, tx) = self.wait(conds)
-            if cond_idx[RX] is not None and self.matched[cond_idx[RX]]:
+            if cond_data_idx[RX] is not None and self.matched[cond_data_idx[RX]]:
                 self.inspect_sample(RX, rx, inv[RX])
-            if cond_idx[TX] is not None and self.matched[cond_idx[TX]]:
+            if cond_edge_idx[RX] is not None and self.matched[cond_edge_idx[RX]]:
+                self.inspect_edge(RX, rx, inv[RX])
+            if cond_data_idx[TX] is not None and self.matched[cond_data_idx[TX]]:
                 self.inspect_sample(TX, tx, inv[TX])
+            if cond_edge_idx[TX] is not None and self.matched[cond_edge_idx[TX]]:
+                self.inspect_edge(TX, tx, inv[TX])