X-Git-Url: http://sigrok.org/gitweb/?p=libsigrokdecode.git;a=blobdiff_plain;f=decoders%2Fieee488%2Fpd.py;fp=decoders%2Fieee488%2Fpd.py;h=b0948a685b40bdf08999a98179c57be3264208b8;hp=7d91a07fdd61c1161579d6ab4a85bae74561a613;hb=c70c28c896ad0509a7fe1b2129c60259c202cb98;hpb=bcb8698eb151b51c17f9205e8752c3ee1413535a diff --git a/decoders/ieee488/pd.py b/decoders/ieee488/pd.py index 7d91a07..b0948a6 100644 --- a/decoders/ieee488/pd.py +++ b/decoders/ieee488/pd.py @@ -69,6 +69,8 @@ GPIB level byte fields (commands, addresses, pieces of data): when addressing channels within the device. - 'DATA_BYTE': is the talker address (when available), is the raw data byte (transport layer, ATN inactive). + - 'PPOLL': is not applicable, is a list of bit indices + (DIO1 to DIO8 order) which responded to the PP request. Extracted payload information (peers and their communicated data): - 'TALK_LISTEN': is the current talker, is the list of @@ -239,11 +241,12 @@ PIN_DATA = PIN_DIO1 ANN_RAW_BIT, ANN_RAW_BYTE, ANN_CMD, ANN_LADDR, ANN_TADDR, ANN_SADDR, ANN_DATA, ANN_EOI, + ANN_PP, ANN_TEXT, # TODO Want to provide one annotation class per talker address (0-30)? ANN_IEC_PERIPH, ANN_WARN, -) = range(11) +) = range(12) ( BIN_RAW, @@ -300,6 +303,7 @@ class Decoder(srd.Decoder): ('saddr', 'Secondary address'), ('data', 'Data byte'), ('eoi', 'EOI'), + ('pp', 'Parallel poll'), ('text', 'Talker text'), ('periph', 'IEC bus peripherals'), ('warning', 'Warning'), @@ -309,6 +313,7 @@ class Decoder(srd.Decoder): ('raws', 'Raw bytes', (ANN_RAW_BYTE,)), ('gpib', 'Commands/data', (ANN_CMD, ANN_LADDR, ANN_TADDR, ANN_SADDR, ANN_DATA,)), ('eois', 'EOI', (ANN_EOI,)), + ('polls', 'Polls', (ANN_PP,)), ('texts', 'Talker texts', (ANN_TEXT,)), ('periphs', 'IEC peripherals', (ANN_IEC_PERIPH,)), ('warnings', 'Warnings', (ANN_WARN,)), @@ -335,6 +340,7 @@ class Decoder(srd.Decoder): self.es_eoi = None self.ss_text = None self.es_text = None + self.ss_pp = None self.last_talker = None self.last_listener = [] self.last_iec_addr = None @@ -403,6 +409,63 @@ class Decoder(srd.Decoder): if had_eol and not is_eol: self.flush_bytes_text_accu() + def check_pp(self, dio = None): + # The combination of ATN and EOI means PP (parallel poll). Track + # this condition's start and end, and keep grabing the DIO lines' + # state as long as the condition is seen, since DAV is not used + # in the PP communication. + capture_in_pp = self.curr_atn and self.curr_eoi + decoder_in_pp = self.ss_pp is not None + if capture_in_pp and not decoder_in_pp: + # Phase starts. Track its ss. Start collecting DIO state. + self.ss_pp = self.samplenum + self.dio_pp = [] + return 'enter' + if not capture_in_pp and decoder_in_pp: + # Phase ends. Void its ss. Process collected DIO state. + ss, es = self.ss_pp, self.samplenum + dio = self.dio_pp or [] + self.ss_pp, self.dio_pp = None, None + if ss == es: + # False positive, caused by low oversampling. + return 'leave' + # Emit its annotation. Translate bit indices 0..7 for the + # DIO1..DIO8 signals to display text. Pass bit indices in + # the Python output for upper layers. + # + # TODO The presentation of this information may need more + # adjustment. The bit positions need not translate to known + # device addresses. Bits need not even belong to a single + # device. Participants and their location in the DIO pattern + # is configurable. Leave the interpretation to upper layers. + bits = [i for i, b in enumerate(dio) if b] + bits_text = ' '.join(['{}'.format(i + 1) for i in bits]) + dios = ['DIO{}'.format(i + 1) for i in bits] + dios_text = ' '.join(dios or ['-']) + text = [ + 'PPOLL {}'.format(dios_text), + 'PP {}'.format(bits_text), + 'PP', + ] + self.emit_data_ann(ss, es, ANN_PP, text) + self.putpy(ss, es, 'PPOLL', None, bits) + # Cease collecting DIO state. + return 'leave' + if decoder_in_pp: + # Keep collecting DIO state for each individual sample in + # the PP phase. Logically OR all DIO values that were seen. + # This increases robustness for low oversampling captures, + # where DIO may no longer be asserted when ATN/EOI deassert, + # and DIO was not asserted yet when ATN/EOI start asserting. + if dio is None: + dio = [] + if len(dio) > len(self.dio_pp): + self.dio_pp.extend([ 0, ] * (len(dio) - len(self.dio_pp))) + for i, b in enumerate(dio): + self.dio_pp[i] |= b + return 'keep' + return 'idle' + def handle_ifc_change(self, ifc): # Track IFC line for parallel input. # Assertion of IFC de-selects all talkers and listeners. @@ -692,6 +755,11 @@ class Decoder(srd.Decoder): # low signal levels, i.e. won't include the initial falling edge. # Scan for ATN/EOI edges as well (including the trick which works # around initial pin state). + # + # Use efficient edge based wait conditions for most activities, + # though some phases may require individual inspection of each + # sample (think parallel poll in combination with slow sampling). + # # Map low-active physical transport lines to positive logic here, # to simplify logical inspection/decoding of communicated data, # and to avoid redundancy and inconsistency in later code paths. @@ -708,6 +776,14 @@ class Decoder(srd.Decoder): if has_ifc: idx_ifc = len(waitcond) waitcond.append({PIN_IFC: 'l'}) + idx_pp_check = None + def add_data_cond(conds): + idx = len(conds) + conds.append({'skip': 1}) + return idx + def del_data_cond(conds, idx): + conds.pop(idx) + return None while True: pins = self.wait(waitcond) pins = self.invert_pins(pins) @@ -716,18 +792,34 @@ class Decoder(srd.Decoder): # captures, many edges fall onto the same sample number. So # we process active edges of flags early (before processing # data bits), and inactive edges late (after data got processed). + want_pp_check = False if idx_ifc is not None and self.matched[idx_ifc] and pins[PIN_IFC] == 1: self.handle_ifc_change(pins[PIN_IFC]) if idx_eoi is not None and self.matched[idx_eoi] and pins[PIN_EOI] == 1: self.handle_eoi_change(pins[PIN_EOI]) + want_pp_check = True if self.matched[idx_atn] and pins[PIN_ATN] == 1: self.handle_atn_change(pins[PIN_ATN]) + want_pp_check = True + if want_pp_check and not idx_pp_check: + pp = self.check_pp() + if pp in ('enter',): + idx_pp_check = add_data_cond(waitcond) if self.matched[idx_dav]: self.handle_dav_change(pins[PIN_DAV], pins[PIN_DIO1:PIN_DIO8 + 1]) + if idx_pp_check: + pp = self.check_pp(pins[PIN_DIO1:PIN_DIO8 + 1]) + want_pp_check = False if self.matched[idx_atn] and pins[PIN_ATN] == 0: self.handle_atn_change(pins[PIN_ATN]) + want_pp_check = True if idx_eoi is not None and self.matched[idx_eoi] and pins[PIN_EOI] == 0: self.handle_eoi_change(pins[PIN_EOI]) + want_pp_check = True + if idx_pp_check is not None and want_pp_check: + pp = self.check_pp(pins[PIN_DIO1:PIN_DIO8 + 1]) + if pp in ('leave',) and idx_pp_check is not None: + idx_pp_check = del_data_cond(waitcond, idx_pp_check) if idx_ifc is not None and self.matched[idx_ifc] and pins[PIN_IFC] == 0: self.handle_ifc_change(pins[PIN_IFC])