X-Git-Url: https://sigrok.org/gitweb/?a=blobdiff_plain;f=decoders%2Fieee488%2Fpd.py;h=b0948a685b40bdf08999a98179c57be3264208b8;hb=adb8233a0bf30b1d9ee9176e1caa5dc8ae1830dd;hp=5c25c668b50c38e18d1501e3e9dcb1b00c1b3e04;hpb=92ee3b2841960d3ac8ca736943cf23d73a41874e;p=libsigrokdecode.git diff --git a/decoders/ieee488/pd.py b/decoders/ieee488/pd.py index 5c25c66..b0948a6 100644 --- a/decoders/ieee488/pd.py +++ b/decoders/ieee488/pd.py @@ -69,6 +69,8 @@ GPIB level byte fields (commands, addresses, pieces of data): when addressing channels within the device. - 'DATA_BYTE': is the talker address (when available), is the raw data byte (transport layer, ATN inactive). + - 'PPOLL': is not applicable, is a list of bit indices + (DIO1 to DIO8 order) which responded to the PP request. Extracted payload information (peers and their communicated data): - 'TALK_LISTEN': is the current talker, is the list of @@ -145,10 +147,7 @@ def _is_msb_set(b): def _get_raw_byte(b, atn): # "Decorate" raw byte values for stacked decoders. - raw_byte = b - if atn: - raw_byte |= 0x100 - return raw_byte + return b | 0x100 if atn else b def _get_raw_text(b, atn): return ['{leader}{data:02x}'.format(leader = '/' if atn else '', data = b)] @@ -242,11 +241,12 @@ PIN_DATA = PIN_DIO1 ANN_RAW_BIT, ANN_RAW_BYTE, ANN_CMD, ANN_LADDR, ANN_TADDR, ANN_SADDR, ANN_DATA, ANN_EOI, + ANN_PP, ANN_TEXT, # TODO Want to provide one annotation class per talker address (0-30)? ANN_IEC_PERIPH, ANN_WARN, -) = range(11) +) = range(12) ( BIN_RAW, @@ -258,7 +258,7 @@ class Decoder(srd.Decoder): api_version = 3 id = 'ieee488' name = 'IEEE-488' - longname = 'General Purpose Interface Bus' + longname = 'IEEE-488 GPIB/HPIB/IEC' desc = 'IEEE-488 General Purpose Interface Bus (GPIB/HPIB or IEC).' license = 'gplv2+' inputs = ['logic'] @@ -287,7 +287,11 @@ class Decoder(srd.Decoder): {'id': 'clk', 'name': 'CLK', 'desc': 'Serial clock'}, ) options = ( - {'id': 'iec_periph', 'desc': 'Decode Commodore IEC bus peripherals details', + {'id': 'iec_periph', 'desc': 'Decode Commodore IEC peripherals', + 'default': 'no', 'values': ('no', 'yes')}, + {'id': 'delim', 'desc': 'Payload data delimiter', + 'default': 'eol', 'values': ('none', 'eol')}, + {'id': 'atn_parity', 'desc': 'ATN commands use parity', 'default': 'no', 'values': ('no', 'yes')}, ) annotations = ( @@ -299,18 +303,20 @@ class Decoder(srd.Decoder): ('saddr', 'Secondary address'), ('data', 'Data byte'), ('eoi', 'EOI'), + ('pp', 'Parallel poll'), ('text', 'Talker text'), ('periph', 'IEC bus peripherals'), - ('warn', 'Warning'), + ('warning', 'Warning'), ) annotation_rows = ( ('bits', 'IEC bits', (ANN_RAW_BIT,)), ('raws', 'Raw bytes', (ANN_RAW_BYTE,)), ('gpib', 'Commands/data', (ANN_CMD, ANN_LADDR, ANN_TADDR, ANN_SADDR, ANN_DATA,)), ('eois', 'EOI', (ANN_EOI,)), + ('polls', 'Polls', (ANN_PP,)), ('texts', 'Talker texts', (ANN_TEXT,)), ('periphs', 'IEC peripherals', (ANN_IEC_PERIPH,)), - ('warns', 'Warnings', (ANN_WARN,)), + ('warnings', 'Warnings', (ANN_WARN,)), ) binary = ( ('raw', 'Raw bytes'), @@ -334,6 +340,7 @@ class Decoder(srd.Decoder): self.es_eoi = None self.ss_text = None self.es_text = None + self.ss_pp = None self.last_talker = None self.last_listener = [] self.last_iec_addr = None @@ -377,12 +384,95 @@ class Decoder(srd.Decoder): self.accu_text = [] self.ss_text = self.es_text = None + def check_extra_flush(self, b): + # Optionally flush previously accumulated runs of payload data + # according to user specified conditions. + if self.options['delim'] == 'none': + return + if not self.accu_bytes: + return + + # This implementation exlusively handles "text lines", but adding + # support for more variants here is straight forward. + # + # Search for the first data byte _after_ a user specified text + # line termination sequence was seen. The termination sequence's + # alphabet may be variable, and the sequence may span multiple + # data bytes. We accept either CR or LF, and combine the CR+LF + # sequence to strive for maximum length annotations for improved + # readability at different zoom levels. It's acceptable that this + # implementation would also combine multiple line terminations + # like LF+LF. + term_chars = (10, 13) + is_eol = b in term_chars + had_eol = self.accu_bytes[-1] in term_chars + if had_eol and not is_eol: + self.flush_bytes_text_accu() + + def check_pp(self, dio = None): + # The combination of ATN and EOI means PP (parallel poll). Track + # this condition's start and end, and keep grabing the DIO lines' + # state as long as the condition is seen, since DAV is not used + # in the PP communication. + capture_in_pp = self.curr_atn and self.curr_eoi + decoder_in_pp = self.ss_pp is not None + if capture_in_pp and not decoder_in_pp: + # Phase starts. Track its ss. Start collecting DIO state. + self.ss_pp = self.samplenum + self.dio_pp = [] + return 'enter' + if not capture_in_pp and decoder_in_pp: + # Phase ends. Void its ss. Process collected DIO state. + ss, es = self.ss_pp, self.samplenum + dio = self.dio_pp or [] + self.ss_pp, self.dio_pp = None, None + if ss == es: + # False positive, caused by low oversampling. + return 'leave' + # Emit its annotation. Translate bit indices 0..7 for the + # DIO1..DIO8 signals to display text. Pass bit indices in + # the Python output for upper layers. + # + # TODO The presentation of this information may need more + # adjustment. The bit positions need not translate to known + # device addresses. Bits need not even belong to a single + # device. Participants and their location in the DIO pattern + # is configurable. Leave the interpretation to upper layers. + bits = [i for i, b in enumerate(dio) if b] + bits_text = ' '.join(['{}'.format(i + 1) for i in bits]) + dios = ['DIO{}'.format(i + 1) for i in bits] + dios_text = ' '.join(dios or ['-']) + text = [ + 'PPOLL {}'.format(dios_text), + 'PP {}'.format(bits_text), + 'PP', + ] + self.emit_data_ann(ss, es, ANN_PP, text) + self.putpy(ss, es, 'PPOLL', None, bits) + # Cease collecting DIO state. + return 'leave' + if decoder_in_pp: + # Keep collecting DIO state for each individual sample in + # the PP phase. Logically OR all DIO values that were seen. + # This increases robustness for low oversampling captures, + # where DIO may no longer be asserted when ATN/EOI deassert, + # and DIO was not asserted yet when ATN/EOI start asserting. + if dio is None: + dio = [] + if len(dio) > len(self.dio_pp): + self.dio_pp.extend([ 0, ] * (len(dio) - len(self.dio_pp))) + for i, b in enumerate(dio): + self.dio_pp[i] |= b + return 'keep' + return 'idle' + def handle_ifc_change(self, ifc): # Track IFC line for parallel input. # Assertion of IFC de-selects all talkers and listeners. if ifc: self.last_talker = None self.last_listener = [] + self.flush_bytes_text_accu() def handle_eoi_change(self, eoi): # Track EOI line for parallel and serial input. @@ -449,6 +539,8 @@ class Decoder(srd.Decoder): # TODO Process data depending on peripheral type and channel? def handle_data_byte(self): + if not self.curr_atn: + self.check_extra_flush(self.curr_raw) b = self.curr_raw texts = _get_raw_text(b, self.curr_atn) self.emit_data_ann(self.ss_raw, self.es_raw, ANN_RAW_BYTE, texts) @@ -459,6 +551,13 @@ class Decoder(srd.Decoder): upd_iec = False, py_type = None py_peers = False + if self.options['atn_parity'] == 'yes': + par = 1 if b & 0x80 else 0 + b &= ~0x80 + ones = bin(b).count('1') + par + if ones % 2: + warn_texts = ['Command parity error', 'parity', 'PAR'] + self.emit_warn_ann(self.ss_raw, self.es_raw, warn_texts) is_cmd, is_unl, is_unt = _is_command(b) laddr = _is_listen_addr(b) taddr = _is_talk_addr(b) @@ -656,6 +755,11 @@ class Decoder(srd.Decoder): # low signal levels, i.e. won't include the initial falling edge. # Scan for ATN/EOI edges as well (including the trick which works # around initial pin state). + # + # Use efficient edge based wait conditions for most activities, + # though some phases may require individual inspection of each + # sample (think parallel poll in combination with slow sampling). + # # Map low-active physical transport lines to positive logic here, # to simplify logical inspection/decoding of communicated data, # and to avoid redundancy and inconsistency in later code paths. @@ -672,6 +776,14 @@ class Decoder(srd.Decoder): if has_ifc: idx_ifc = len(waitcond) waitcond.append({PIN_IFC: 'l'}) + idx_pp_check = None + def add_data_cond(conds): + idx = len(conds) + conds.append({'skip': 1}) + return idx + def del_data_cond(conds, idx): + conds.pop(idx) + return None while True: pins = self.wait(waitcond) pins = self.invert_pins(pins) @@ -680,18 +792,34 @@ class Decoder(srd.Decoder): # captures, many edges fall onto the same sample number. So # we process active edges of flags early (before processing # data bits), and inactive edges late (after data got processed). + want_pp_check = False if idx_ifc is not None and self.matched[idx_ifc] and pins[PIN_IFC] == 1: self.handle_ifc_change(pins[PIN_IFC]) if idx_eoi is not None and self.matched[idx_eoi] and pins[PIN_EOI] == 1: self.handle_eoi_change(pins[PIN_EOI]) + want_pp_check = True if self.matched[idx_atn] and pins[PIN_ATN] == 1: self.handle_atn_change(pins[PIN_ATN]) + want_pp_check = True + if want_pp_check and not idx_pp_check: + pp = self.check_pp() + if pp in ('enter',): + idx_pp_check = add_data_cond(waitcond) if self.matched[idx_dav]: self.handle_dav_change(pins[PIN_DAV], pins[PIN_DIO1:PIN_DIO8 + 1]) + if idx_pp_check: + pp = self.check_pp(pins[PIN_DIO1:PIN_DIO8 + 1]) + want_pp_check = False if self.matched[idx_atn] and pins[PIN_ATN] == 0: self.handle_atn_change(pins[PIN_ATN]) + want_pp_check = True if idx_eoi is not None and self.matched[idx_eoi] and pins[PIN_EOI] == 0: self.handle_eoi_change(pins[PIN_EOI]) + want_pp_check = True + if idx_pp_check is not None and want_pp_check: + pp = self.check_pp(pins[PIN_DIO1:PIN_DIO8 + 1]) + if pp in ('leave',) and idx_pp_check is not None: + idx_pp_check = del_data_cond(waitcond, idx_pp_check) if idx_ifc is not None and self.matched[idx_ifc] and pins[PIN_IFC] == 0: self.handle_ifc_change(pins[PIN_IFC])