]> sigrok.org Git - libsigrokdecode.git/blobdiff - decoders/ieee488/pd.py
uart: handle zero stop bits configuration
[libsigrokdecode.git] / decoders / ieee488 / pd.py
index b4efac03b565a35b3691e02ff88b9b7ee86077a8..b0948a685b40bdf08999a98179c57be3264208b8 100644 (file)
@@ -69,6 +69,8 @@ GPIB level byte fields (commands, addresses, pieces of data):
    when addressing channels within the device.
  - 'DATA_BYTE': <addr> is the talker address (when available), <pdata>
    is the raw data byte (transport layer, ATN inactive).
+ - 'PPOLL': <addr> is not applicable, <pdata> is a list of bit indices
+   (DIO1 to DIO8 order) which responded to the PP request.
 
 Extracted payload information (peers and their communicated data):
  - 'TALK_LISTEN': <addr> is the current talker, <pdata> is the list of
@@ -239,11 +241,12 @@ PIN_DATA = PIN_DIO1
     ANN_RAW_BIT, ANN_RAW_BYTE,
     ANN_CMD, ANN_LADDR, ANN_TADDR, ANN_SADDR, ANN_DATA,
     ANN_EOI,
+    ANN_PP,
     ANN_TEXT,
     # TODO Want to provide one annotation class per talker address (0-30)?
     ANN_IEC_PERIPH,
     ANN_WARN,
-) = range(11)
+) = range(12)
 
 (
     BIN_RAW,
@@ -284,7 +287,11 @@ class Decoder(srd.Decoder):
         {'id': 'clk', 'name': 'CLK', 'desc': 'Serial clock'},
     )
     options = (
-        {'id': 'iec_periph', 'desc': 'Decode Commodore IEC bus peripherals details',
+        {'id': 'iec_periph', 'desc': 'Decode Commodore IEC peripherals',
+            'default': 'no', 'values': ('no', 'yes')},
+        {'id': 'delim', 'desc': 'Payload data delimiter',
+            'default': 'eol', 'values': ('none', 'eol')},
+        {'id': 'atn_parity', 'desc': 'ATN commands use parity',
             'default': 'no', 'values': ('no', 'yes')},
     )
     annotations = (
@@ -296,18 +303,20 @@ class Decoder(srd.Decoder):
         ('saddr', 'Secondary address'),
         ('data', 'Data byte'),
         ('eoi', 'EOI'),
+        ('pp', 'Parallel poll'),
         ('text', 'Talker text'),
         ('periph', 'IEC bus peripherals'),
-        ('warn', 'Warning'),
+        ('warning', 'Warning'),
     )
     annotation_rows = (
         ('bits', 'IEC bits', (ANN_RAW_BIT,)),
         ('raws', 'Raw bytes', (ANN_RAW_BYTE,)),
         ('gpib', 'Commands/data', (ANN_CMD, ANN_LADDR, ANN_TADDR, ANN_SADDR, ANN_DATA,)),
         ('eois', 'EOI', (ANN_EOI,)),
+        ('polls', 'Polls', (ANN_PP,)),
         ('texts', 'Talker texts', (ANN_TEXT,)),
         ('periphs', 'IEC peripherals', (ANN_IEC_PERIPH,)),
-        ('warns', 'Warnings', (ANN_WARN,)),
+        ('warnings', 'Warnings', (ANN_WARN,)),
     )
     binary = (
         ('raw', 'Raw bytes'),
@@ -331,6 +340,7 @@ class Decoder(srd.Decoder):
         self.es_eoi = None
         self.ss_text = None
         self.es_text = None
+        self.ss_pp = None
         self.last_talker = None
         self.last_listener = []
         self.last_iec_addr = None
@@ -374,12 +384,95 @@ class Decoder(srd.Decoder):
             self.accu_text = []
         self.ss_text = self.es_text = None
 
+    def check_extra_flush(self, b):
+        # Optionally flush previously accumulated runs of payload data
+        # according to user specified conditions.
+        if self.options['delim'] == 'none':
+            return
+        if not self.accu_bytes:
+            return
+
+        # This implementation exlusively handles "text lines", but adding
+        # support for more variants here is straight forward.
+        #
+        # Search for the first data byte _after_ a user specified text
+        # line termination sequence was seen. The termination sequence's
+        # alphabet may be variable, and the sequence may span multiple
+        # data bytes. We accept either CR or LF, and combine the CR+LF
+        # sequence to strive for maximum length annotations for improved
+        # readability at different zoom levels. It's acceptable that this
+        # implementation would also combine multiple line terminations
+        # like LF+LF.
+        term_chars = (10, 13)
+        is_eol = b in term_chars
+        had_eol = self.accu_bytes[-1] in term_chars
+        if had_eol and not is_eol:
+            self.flush_bytes_text_accu()
+
+    def check_pp(self, dio = None):
+        # The combination of ATN and EOI means PP (parallel poll). Track
+        # this condition's start and end, and keep grabing the DIO lines'
+        # state as long as the condition is seen, since DAV is not used
+        # in the PP communication.
+        capture_in_pp = self.curr_atn and self.curr_eoi
+        decoder_in_pp = self.ss_pp is not None
+        if capture_in_pp and not decoder_in_pp:
+            # Phase starts. Track its ss. Start collecting DIO state.
+            self.ss_pp = self.samplenum
+            self.dio_pp = []
+            return 'enter'
+        if not capture_in_pp and decoder_in_pp:
+            # Phase ends. Void its ss. Process collected DIO state.
+            ss, es = self.ss_pp, self.samplenum
+            dio = self.dio_pp or []
+            self.ss_pp, self.dio_pp = None, None
+            if ss == es:
+                # False positive, caused by low oversampling.
+                return 'leave'
+            # Emit its annotation. Translate bit indices 0..7 for the
+            # DIO1..DIO8 signals to display text. Pass bit indices in
+            # the Python output for upper layers.
+            #
+            # TODO The presentation of this information may need more
+            # adjustment. The bit positions need not translate to known
+            # device addresses. Bits need not even belong to a single
+            # device. Participants and their location in the DIO pattern
+            # is configurable. Leave the interpretation to upper layers.
+            bits = [i for i, b in enumerate(dio) if b]
+            bits_text = ' '.join(['{}'.format(i + 1) for i in bits])
+            dios = ['DIO{}'.format(i + 1) for i in bits]
+            dios_text = ' '.join(dios or ['-'])
+            text = [
+                'PPOLL {}'.format(dios_text),
+                'PP {}'.format(bits_text),
+                'PP',
+            ]
+            self.emit_data_ann(ss, es, ANN_PP, text)
+            self.putpy(ss, es, 'PPOLL', None, bits)
+            # Cease collecting DIO state.
+            return 'leave'
+        if decoder_in_pp:
+            # Keep collecting DIO state for each individual sample in
+            # the PP phase. Logically OR all DIO values that were seen.
+            # This increases robustness for low oversampling captures,
+            # where DIO may no longer be asserted when ATN/EOI deassert,
+            # and DIO was not asserted yet when ATN/EOI start asserting.
+            if dio is None:
+                dio = []
+            if len(dio) > len(self.dio_pp):
+                self.dio_pp.extend([ 0, ] * (len(dio) - len(self.dio_pp)))
+            for i, b in enumerate(dio):
+                self.dio_pp[i] |= b
+            return 'keep'
+        return 'idle'
+
     def handle_ifc_change(self, ifc):
         # Track IFC line for parallel input.
         # Assertion of IFC de-selects all talkers and listeners.
         if ifc:
             self.last_talker = None
             self.last_listener = []
+            self.flush_bytes_text_accu()
 
     def handle_eoi_change(self, eoi):
         # Track EOI line for parallel and serial input.
@@ -446,6 +539,8 @@ class Decoder(srd.Decoder):
             # TODO Process data depending on peripheral type and channel?
 
     def handle_data_byte(self):
+        if not self.curr_atn:
+            self.check_extra_flush(self.curr_raw)
         b = self.curr_raw
         texts = _get_raw_text(b, self.curr_atn)
         self.emit_data_ann(self.ss_raw, self.es_raw, ANN_RAW_BYTE, texts)
@@ -456,6 +551,13 @@ class Decoder(srd.Decoder):
             upd_iec = False,
             py_type = None
             py_peers = False
+            if self.options['atn_parity'] == 'yes':
+                par = 1 if b & 0x80 else 0
+                b &= ~0x80
+                ones = bin(b).count('1') + par
+                if ones % 2:
+                    warn_texts = ['Command parity error', 'parity', 'PAR']
+                    self.emit_warn_ann(self.ss_raw, self.es_raw, warn_texts)
             is_cmd, is_unl, is_unt = _is_command(b)
             laddr = _is_listen_addr(b)
             taddr = _is_talk_addr(b)
@@ -653,6 +755,11 @@ class Decoder(srd.Decoder):
         # low signal levels, i.e. won't include the initial falling edge.
         # Scan for ATN/EOI edges as well (including the trick which works
         # around initial pin state).
+        #
+        # Use efficient edge based wait conditions for most activities,
+        # though some phases may require individual inspection of each
+        # sample (think parallel poll in combination with slow sampling).
+        #
         # Map low-active physical transport lines to positive logic here,
         # to simplify logical inspection/decoding of communicated data,
         # and to avoid redundancy and inconsistency in later code paths.
@@ -669,6 +776,14 @@ class Decoder(srd.Decoder):
         if has_ifc:
             idx_ifc = len(waitcond)
             waitcond.append({PIN_IFC: 'l'})
+        idx_pp_check = None
+        def add_data_cond(conds):
+            idx = len(conds)
+            conds.append({'skip': 1})
+            return idx
+        def del_data_cond(conds, idx):
+            conds.pop(idx)
+            return None
         while True:
             pins = self.wait(waitcond)
             pins = self.invert_pins(pins)
@@ -677,18 +792,34 @@ class Decoder(srd.Decoder):
             # captures, many edges fall onto the same sample number. So
             # we process active edges of flags early (before processing
             # data bits), and inactive edges late (after data got processed).
+            want_pp_check = False
             if idx_ifc is not None and self.matched[idx_ifc] and pins[PIN_IFC] == 1:
                 self.handle_ifc_change(pins[PIN_IFC])
             if idx_eoi is not None and self.matched[idx_eoi] and pins[PIN_EOI] == 1:
                 self.handle_eoi_change(pins[PIN_EOI])
+                want_pp_check = True
             if self.matched[idx_atn] and pins[PIN_ATN] == 1:
                 self.handle_atn_change(pins[PIN_ATN])
+                want_pp_check = True
+            if want_pp_check and not idx_pp_check:
+                pp = self.check_pp()
+                if pp in ('enter',):
+                    idx_pp_check = add_data_cond(waitcond)
             if self.matched[idx_dav]:
                 self.handle_dav_change(pins[PIN_DAV], pins[PIN_DIO1:PIN_DIO8 + 1])
+            if idx_pp_check:
+                pp = self.check_pp(pins[PIN_DIO1:PIN_DIO8 + 1])
+            want_pp_check = False
             if self.matched[idx_atn] and pins[PIN_ATN] == 0:
                 self.handle_atn_change(pins[PIN_ATN])
+                want_pp_check = True
             if idx_eoi is not None and self.matched[idx_eoi] and pins[PIN_EOI] == 0:
                 self.handle_eoi_change(pins[PIN_EOI])
+                want_pp_check = True
+            if idx_pp_check is not None and want_pp_check:
+                pp = self.check_pp(pins[PIN_DIO1:PIN_DIO8 + 1])
+                if pp in ('leave',) and idx_pp_check is not None:
+                    idx_pp_check = del_data_cond(waitcond, idx_pp_check)
             if idx_ifc is not None and self.matched[idx_ifc] and pins[PIN_IFC] == 0:
                 self.handle_ifc_change(pins[PIN_IFC])