when addressing channels within the device.
- 'DATA_BYTE': <addr> is the talker address (when available), <pdata>
is the raw data byte (transport layer, ATN inactive).
+ - 'PPOLL': <addr> is not applicable, <pdata> is a list of bit indices
+ (DIO1 to DIO8 order) which responded to the PP request.
Extracted payload information (peers and their communicated data):
- 'TALK_LISTEN': <addr> is the current talker, <pdata> is the list of
ANN_RAW_BIT, ANN_RAW_BYTE,
ANN_CMD, ANN_LADDR, ANN_TADDR, ANN_SADDR, ANN_DATA,
ANN_EOI,
+ ANN_PP,
ANN_TEXT,
# TODO Want to provide one annotation class per talker address (0-30)?
ANN_IEC_PERIPH,
ANN_WARN,
-) = range(11)
+) = range(12)
(
BIN_RAW,
('saddr', 'Secondary address'),
('data', 'Data byte'),
('eoi', 'EOI'),
+ ('pp', 'Parallel poll'),
('text', 'Talker text'),
('periph', 'IEC bus peripherals'),
('warning', 'Warning'),
('raws', 'Raw bytes', (ANN_RAW_BYTE,)),
('gpib', 'Commands/data', (ANN_CMD, ANN_LADDR, ANN_TADDR, ANN_SADDR, ANN_DATA,)),
('eois', 'EOI', (ANN_EOI,)),
+ ('polls', 'Polls', (ANN_PP,)),
('texts', 'Talker texts', (ANN_TEXT,)),
('periphs', 'IEC peripherals', (ANN_IEC_PERIPH,)),
('warnings', 'Warnings', (ANN_WARN,)),
self.es_eoi = None
self.ss_text = None
self.es_text = None
+ self.ss_pp = None
self.last_talker = None
self.last_listener = []
self.last_iec_addr = None
if had_eol and not is_eol:
self.flush_bytes_text_accu()
+ def check_pp(self, dio = None):
+ # The combination of ATN and EOI means PP (parallel poll). Track
+ # this condition's start and end, and keep grabing the DIO lines'
+ # state as long as the condition is seen, since DAV is not used
+ # in the PP communication.
+ capture_in_pp = self.curr_atn and self.curr_eoi
+ decoder_in_pp = self.ss_pp is not None
+ if capture_in_pp and not decoder_in_pp:
+ # Phase starts. Track its ss. Start collecting DIO state.
+ self.ss_pp = self.samplenum
+ self.dio_pp = []
+ return 'enter'
+ if not capture_in_pp and decoder_in_pp:
+ # Phase ends. Void its ss. Process collected DIO state.
+ ss, es = self.ss_pp, self.samplenum
+ dio = self.dio_pp or []
+ self.ss_pp, self.dio_pp = None, None
+ if ss == es:
+ # False positive, caused by low oversampling.
+ return 'leave'
+ # Emit its annotation. Translate bit indices 0..7 for the
+ # DIO1..DIO8 signals to display text. Pass bit indices in
+ # the Python output for upper layers.
+ #
+ # TODO The presentation of this information may need more
+ # adjustment. The bit positions need not translate to known
+ # device addresses. Bits need not even belong to a single
+ # device. Participants and their location in the DIO pattern
+ # is configurable. Leave the interpretation to upper layers.
+ bits = [i for i, b in enumerate(dio) if b]
+ bits_text = ' '.join(['{}'.format(i + 1) for i in bits])
+ dios = ['DIO{}'.format(i + 1) for i in bits]
+ dios_text = ' '.join(dios or ['-'])
+ text = [
+ 'PPOLL {}'.format(dios_text),
+ 'PP {}'.format(bits_text),
+ 'PP',
+ ]
+ self.emit_data_ann(ss, es, ANN_PP, text)
+ self.putpy(ss, es, 'PPOLL', None, bits)
+ # Cease collecting DIO state.
+ return 'leave'
+ if decoder_in_pp:
+ # Keep collecting DIO state for each individual sample in
+ # the PP phase. Logically OR all DIO values that were seen.
+ # This increases robustness for low oversampling captures,
+ # where DIO may no longer be asserted when ATN/EOI deassert,
+ # and DIO was not asserted yet when ATN/EOI start asserting.
+ if dio is None:
+ dio = []
+ if len(dio) > len(self.dio_pp):
+ self.dio_pp.extend([ 0, ] * (len(dio) - len(self.dio_pp)))
+ for i, b in enumerate(dio):
+ self.dio_pp[i] |= b
+ return 'keep'
+ return 'idle'
+
def handle_ifc_change(self, ifc):
# Track IFC line for parallel input.
# Assertion of IFC de-selects all talkers and listeners.
# low signal levels, i.e. won't include the initial falling edge.
# Scan for ATN/EOI edges as well (including the trick which works
# around initial pin state).
+ #
+ # Use efficient edge based wait conditions for most activities,
+ # though some phases may require individual inspection of each
+ # sample (think parallel poll in combination with slow sampling).
+ #
# Map low-active physical transport lines to positive logic here,
# to simplify logical inspection/decoding of communicated data,
# and to avoid redundancy and inconsistency in later code paths.
if has_ifc:
idx_ifc = len(waitcond)
waitcond.append({PIN_IFC: 'l'})
+ idx_pp_check = None
+ def add_data_cond(conds):
+ idx = len(conds)
+ conds.append({'skip': 1})
+ return idx
+ def del_data_cond(conds, idx):
+ conds.pop(idx)
+ return None
while True:
pins = self.wait(waitcond)
pins = self.invert_pins(pins)
# captures, many edges fall onto the same sample number. So
# we process active edges of flags early (before processing
# data bits), and inactive edges late (after data got processed).
+ want_pp_check = False
if idx_ifc is not None and self.matched[idx_ifc] and pins[PIN_IFC] == 1:
self.handle_ifc_change(pins[PIN_IFC])
if idx_eoi is not None and self.matched[idx_eoi] and pins[PIN_EOI] == 1:
self.handle_eoi_change(pins[PIN_EOI])
+ want_pp_check = True
if self.matched[idx_atn] and pins[PIN_ATN] == 1:
self.handle_atn_change(pins[PIN_ATN])
+ want_pp_check = True
+ if want_pp_check and not idx_pp_check:
+ pp = self.check_pp()
+ if pp in ('enter',):
+ idx_pp_check = add_data_cond(waitcond)
if self.matched[idx_dav]:
self.handle_dav_change(pins[PIN_DAV], pins[PIN_DIO1:PIN_DIO8 + 1])
+ if idx_pp_check:
+ pp = self.check_pp(pins[PIN_DIO1:PIN_DIO8 + 1])
+ want_pp_check = False
if self.matched[idx_atn] and pins[PIN_ATN] == 0:
self.handle_atn_change(pins[PIN_ATN])
+ want_pp_check = True
if idx_eoi is not None and self.matched[idx_eoi] and pins[PIN_EOI] == 0:
self.handle_eoi_change(pins[PIN_EOI])
+ want_pp_check = True
+ if idx_pp_check is not None and want_pp_check:
+ pp = self.check_pp(pins[PIN_DIO1:PIN_DIO8 + 1])
+ if pp in ('leave',) and idx_pp_check is not None:
+ idx_pp_check = del_data_cond(waitcond, idx_pp_check)
if idx_ifc is not None and self.matched[idx_ifc] and pins[PIN_IFC] == 0:
self.handle_ifc_change(pins[PIN_IFC])