]> sigrok.org Git - libsigrokdecode.git/blame - decoders/ieee488/pd.py
ieee488: shorten an option description for consistency across dialogs
[libsigrokdecode.git] / decoders / ieee488 / pd.py
CommitLineData
92ee3b28
GS
1##
2## This file is part of the libsigrokdecode project.
3##
4## Copyright (C) 2016 Rudolf Reuter <reuterru@arcor.de>
5## Copyright (C) 2017 Marcus Comstedt <marcus@mc.pp.se>
6## Copyright (C) 2019 Gerhard Sittig <gerhard.sittig@gmx.net>
7##
8## This program is free software; you can redistribute it and/or modify
9## it under the terms of the GNU General Public License as published by
10## the Free Software Foundation; either version 2 of the License, or
11## (at your option) any later version.
12##
13## This program is distributed in the hope that it will be useful,
14## but WITHOUT ANY WARRANTY; without even the implied warranty of
15## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16## GNU General Public License for more details.
17##
18## You should have received a copy of the GNU General Public License
19## along with this program; if not, see <http://www.gnu.org/licenses/>.
20##
21
22# This file was created from earlier implementations of the 'gpib' and
23# the 'iec' protocol decoders. It combines the parallel and the serial
24# transmission variants in a single instance with optional inputs for
25# maximum code re-use.
26
27# TODO
28# - Extend annotations for improved usability.
29# - Keep talkers' data streams on separate annotation rows? Is this useful
30# here at the GPIB level, or shall stacked decoders dispatch these? May
31# depend on how often captures get inspected which involve multiple peers.
32# - Make serial bit annotations optional? Could slow down interactive
33# exploration for long captures (see USB).
34# - Move the inlined Commodore IEC peripherals support to a stacked decoder
35# when more peripherals get added.
36# - SCPI over GPIB may "represent somewhat naturally" here already when
37# text lines are a single run of data at the GPIB layer (each line between
38# the address spec and either EOI or ATN). So a stacked SCPI decoder may
39# only become necessary when the text lines' content shall get inspected.
40
41import sigrokdecode as srd
42from common.srdhelper import bitpack
43
44'''
45OUTPUT_PYTHON format for stacked decoders:
46
47General packet format:
48[<ptype>, <addr>, <pdata>]
49
50This is the list of <ptype>s and their respective <pdata> values:
51
52Raw bits and bytes at the physical transport level:
53 - 'IEC_BIT': <addr> is not applicable, <pdata> is the transport's bit value.
54 - 'GPIB_RAW': <addr> is not applicable, <pdata> is the transport's
55 byte value. Data bytes are in the 0x00-0xff range, command/address
56 bytes are in the 0x100-0x1ff range.
57
58GPIB level byte fields (commands, addresses, pieces of data):
59 - 'COMMAND': <addr> is not applicable, <pdata> is the command's byte value.
60 - 'LISTEN': <addr> is the listener address (0-30), <pdata> is the raw
61 byte value (including the 0x20 offset).
62 - 'TALK': <addr> is the talker address (0-30), <pdata> is the raw byte
63 value (including the 0x40 offset).
64 - 'SECONDARY': <addr> is the secondary address (0-31), <pdata> is the
65 raw byte value (including the 0x60 offset).
66 - 'MSB_SET': <addr> as well as <pdata> are the raw byte value (including
67 the 0x80 offset). This usually does not happen for GPIB bytes with ATN
68 active, but was observed with the IEC bus and Commodore floppy drives,
69 when addressing channels within the device.
70 - 'DATA_BYTE': <addr> is the talker address (when available), <pdata>
71 is the raw data byte (transport layer, ATN inactive).
72
73Extracted payload information (peers and their communicated data):
74 - 'TALK_LISTEN': <addr> is the current talker, <pdata> is the list of
75 current listeners. These updates for the current "connected peers"
76 are sent when the set of peers changes, i.e. after talkers/listeners
77 got selected or deselected. Of course the data only covers what could
78 be gathered from the input data. Some controllers may not explicitly
79 address themselves, or captures may not include an early setup phase.
80 - 'TALKER_BYTES': <addr> is the talker address (when available), <pdata>
81 is the accumulated byte sequence between addressing a talker and EOI,
82 or the next command/address.
83 - 'TALKER_TEXT': <addr> is the talker address (when available), <pdata>
84 is the accumulated text sequence between addressing a talker and EOI,
85 or the next command/address.
86'''
87
class ChannelError(Exception):
    """Raised when the set of connected input channels is insufficient."""
90
91def _format_ann_texts(fmts, **args):
92 if not fmts:
93 return None
94 return [fmt.format(**args) for fmt in fmts]
95
96_cmd_table = {
97 # Command codes in the 0x00-0x1f range.
98 0x01: ['Go To Local', 'GTL'],
99 0x04: ['Selected Device Clear', 'SDC'],
100 0x05: ['Parallel Poll Configure', 'PPC'],
101 0x08: ['Global Execute Trigger', 'GET'],
102 0x09: ['Take Control', 'TCT'],
103 0x11: ['Local Lock Out', 'LLO'],
104 0x14: ['Device Clear', 'DCL'],
105 0x15: ['Parallel Poll Unconfigure', 'PPU'],
106 0x18: ['Serial Poll Enable', 'SPE'],
107 0x19: ['Serial Poll Disable', 'SPD'],
108 # Unknown type of command.
109 None: ['Unknown command 0x{cmd:02x}', 'command 0x{cmd:02x}', 'cmd {cmd:02x}', 'C{cmd_ord:c}'],
110 # Special listener/talker "addresses" (deselecting previous peers).
111 0x3f: ['Unlisten', 'UNL'],
112 0x5f: ['Untalk', 'UNT'],
113}
114
115def _is_command(b):
116 # Returns a tuple of booleans (or None when not applicable) whether
117 # the raw GPIB byte is: a command, an un-listen, an un-talk command.
118 if b in range(0x00, 0x20):
119 return True, None, None
120 if b in range(0x20, 0x40) and (b & 0x1f) == 31:
121 return True, True, False
122 if b in range(0x40, 0x60) and (b & 0x1f) == 31:
123 return True, False, True
124 return False, None, None
125
126def _is_listen_addr(b):
127 if b in range(0x20, 0x40):
128 return b & 0x1f
129 return None
130
131def _is_talk_addr(b):
132 if b in range(0x40, 0x60):
133 return b & 0x1f
134 return None
135
136def _is_secondary_addr(b):
137 if b in range(0x60, 0x80):
138 return b & 0x1f
139 return None
140
141def _is_msb_set(b):
142 if b & 0x80:
143 return b
144 return None
145
146def _get_raw_byte(b, atn):
147 # "Decorate" raw byte values for stacked decoders.
08198247 148 return b | 0x100 if atn else b
92ee3b28
GS
149
150def _get_raw_text(b, atn):
151 return ['{leader}{data:02x}'.format(leader = '/' if atn else '', data = b)]
152
def _get_command_texts(b):
    """Look up annotation texts for a command byte.

    Returns a tuple of a boolean which tells whether the command code
    is listed in the command table, and the list of annotation texts
    (or None when no texts are available). Unlisted codes fall back to
    the table's generic templates for unknown commands.
    """
    listed = _cmd_table.get(b)
    known = listed is not None
    templates = listed or _cmd_table.get(None)
    if not templates:
        return known, None
    texts = _format_ann_texts(templates, cmd=b, cmd_ord=ord('0') + b)
    return known, texts
161
def _get_address_texts(b):
    """Return annotation texts for an address byte.

    The byte is checked for being a listener address, a talker address,
    a secondary address, or a byte with the MSB set (in this order, the
    first match wins). Returns the list of annotation texts, or None
    when the byte falls into none of these categories. The previous
    implementation raised an UnboundLocalError in the latter case.
    """
    candidates = (
        (_is_listen_addr(b),
            ['Listen {addr:d}', 'L {addr:d}', 'L{addr_ord:c}']),
        (_is_talk_addr(b),
            ['Talk {addr:d}', 'T {addr:d}', 'T{addr_ord:c}']),
        (_is_secondary_addr(b),
            ['Secondary {addr:d}', 'S {addr:d}', 'S{addr_ord:c}']),
        # For IEC bus compat.
        (_is_msb_set(b),
            ['Secondary {addr:d}', 'S {addr:d}', 'S{addr_ord:c}']),
    )
    for addr, fmts in candidates:
        if addr is not None:
            return _format_ann_texts(fmts, addr=addr, addr_ord=ord('0') + addr)
    return None
181
182def _get_data_text(b):
183 # TODO Move the table of ASCII control characters to a common location?
184 # TODO Move the "printable with escapes" logic to a common helper?
185 _control_codes = {
186 0x00: 'NUL',
187 0x01: 'SOH',
188 0x02: 'STX',
189 0x03: 'ETX',
190 0x04: 'EOT',
191 0x05: 'ENQ',
192 0x06: 'ACK',
193 0x07: 'BEL',
194 0x08: 'BS',
195 0x09: 'TAB',
196 0x0a: 'LF',
197 0x0b: 'VT',
198 0x0c: 'FF',
199 0x0d: 'CR',
200 0x0e: 'SO',
201 0x0f: 'SI',
202 0x10: 'DLE',
203 0x11: 'DC1',
204 0x12: 'DC2',
205 0x13: 'DC3',
206 0x14: 'DC4',
207 0x15: 'NAK',
208 0x16: 'SYN',
209 0x17: 'ETB',
210 0x18: 'CAN',
211 0x19: 'EM',
212 0x1a: 'SUB',
213 0x1b: 'ESC',
214 0x1c: 'FS',
215 0x1d: 'GS',
216 0x1e: 'RS',
217 0x1f: 'US',
218 }
219 # Yes, exclude 0x7f (DEL) here. It's considered non-printable.
220 if b in range(0x20, 0x7f) and b not in ('[', ']'):
221 return '{:s}'.format(chr(b))
222 elif b in _control_codes:
223 return '[{:s}]'.format(_control_codes[b])
224 # Use a compact yet readable and unambigous presentation for bytes
225 # which contain non-printables. The format that is used here is
226 # compatible with 93xx EEPROM and UART decoders.
227 return '[{:02x}]'.format(b)
228
# Indices of the decoder's input pins, in the order in which the
# required and optional channels get declared by the decoder class.
(
    PIN_DIO1,
    PIN_DIO2,
    PIN_DIO3,
    PIN_DIO4,
    PIN_DIO5,
    PIN_DIO6,
    PIN_DIO7,
    PIN_DIO8,
    PIN_EOI,
    PIN_DAV,
    PIN_NRFD,
    PIN_NDAC,
    PIN_IFC,
    PIN_SRQ,
    PIN_ATN,
    PIN_REN,
    PIN_CLK,
) = range(17)
# The serial transport's data line shares the first data pin.
PIN_DATA = PIN_DIO1

# Indices of annotation classes, in the order of their declaration
# in the decoder class.
(
    ANN_RAW_BIT,
    ANN_RAW_BYTE,
    ANN_CMD,
    ANN_LADDR,
    ANN_TADDR,
    ANN_SADDR,
    ANN_DATA,
    ANN_EOI,
    ANN_TEXT,
    # TODO Want to provide one annotation class per talker address (0-30)?
    ANN_IEC_PERIPH,
    ANN_WARN,
) = range(11)

# Indices of binary output classes.
(
    BIN_RAW,
    BIN_DATA,
    # TODO Want to provide one binary annotation class per talker address (0-30)?
) = range(2)
253
class Decoder(srd.Decoder):
    """Protocol decoder for IEEE-488 (GPIB/HPIB) and the serial IEC bus.

    The presence of the CLK input selects the serial (IEC) variant,
    otherwise the parallel (GPIB) variant gets decoded. Both variants
    share the byte level interpretation in handle_data_byte().
    """

    api_version = 3
    id = 'ieee488'
    name = 'IEEE-488'
    longname = 'IEEE-488 GPIB/HPIB/IEC'
    desc = 'IEEE-488 General Purpose Interface Bus (GPIB/HPIB or IEC).'
    license = 'gplv2+'
    inputs = ['logic']
    outputs = ['ieee488']
    tags = ['PC', 'Retro computing']
    channels = (
        {'id': 'dio1', 'name': 'DIO1/DATA',
            'desc': 'Data I/O bit 1, or serial data'},
    )
    optional_channels = (
        {'id': 'dio2', 'name': 'DIO2', 'desc': 'Data I/O bit 2'},
        {'id': 'dio3', 'name': 'DIO3', 'desc': 'Data I/O bit 3'},
        {'id': 'dio4', 'name': 'DIO4', 'desc': 'Data I/O bit 4'},
        {'id': 'dio5', 'name': 'DIO5', 'desc': 'Data I/O bit 5'},
        {'id': 'dio6', 'name': 'DIO6', 'desc': 'Data I/O bit 6'},
        {'id': 'dio7', 'name': 'DIO7', 'desc': 'Data I/O bit 7'},
        {'id': 'dio8', 'name': 'DIO8', 'desc': 'Data I/O bit 8'},
        {'id': 'eoi', 'name': 'EOI', 'desc': 'End or identify'},
        {'id': 'dav', 'name': 'DAV', 'desc': 'Data valid'},
        {'id': 'nrfd', 'name': 'NRFD', 'desc': 'Not ready for data'},
        {'id': 'ndac', 'name': 'NDAC', 'desc': 'Not data accepted'},
        {'id': 'ifc', 'name': 'IFC', 'desc': 'Interface clear'},
        {'id': 'srq', 'name': 'SRQ', 'desc': 'Service request'},
        {'id': 'atn', 'name': 'ATN', 'desc': 'Attention'},
        {'id': 'ren', 'name': 'REN', 'desc': 'Remote enable'},
        {'id': 'clk', 'name': 'CLK', 'desc': 'Serial clock'},
    )
    options = (
        {'id': 'iec_periph', 'desc': 'Decode Commodore IEC peripherals',
            'default': 'no', 'values': ('no', 'yes')},
        {'id': 'delim', 'desc': 'Payload data delimiter',
            'default': 'eol', 'values': ('none', 'eol')},
        {'id': 'atn_parity', 'desc': 'ATN commands use parity',
            'default': 'no', 'values': ('no', 'yes')},
    )
    annotations = (
        ('bit', 'IEC bit'),
        ('raw', 'Raw byte'),
        ('cmd', 'Command'),
        ('laddr', 'Listener address'),
        ('taddr', 'Talker address'),
        ('saddr', 'Secondary address'),
        ('data', 'Data byte'),
        ('eoi', 'EOI'),
        ('text', 'Talker text'),
        ('periph', 'IEC bus peripherals'),
        ('warning', 'Warning'),
    )
    annotation_rows = (
        ('bits', 'IEC bits', (ANN_RAW_BIT,)),
        ('raws', 'Raw bytes', (ANN_RAW_BYTE,)),
        ('gpib', 'Commands/data', (ANN_CMD, ANN_LADDR, ANN_TADDR, ANN_SADDR, ANN_DATA,)),
        ('eois', 'EOI', (ANN_EOI,)),
        ('texts', 'Talker texts', (ANN_TEXT,)),
        ('periphs', 'IEC peripherals', (ANN_IEC_PERIPH,)),
        ('warnings', 'Warnings', (ANN_WARN,)),
    )
    binary = (
        ('raw', 'Raw bytes'),
        ('data', 'Talker bytes'),
    )

    def __init__(self):
        """Create the decoder instance with all state cleared."""
        self.reset()

    def reset(self):
        """Clear all decoder state (pin tracking, accumulators, peers)."""
        # Raw byte and flags which currently are being tracked.
        self.curr_raw = None
        self.curr_atn = None
        self.curr_eoi = None
        # ATN and EOI state as of the time when the byte was captured.
        self.latch_atn = None
        self.latch_eoi = None
        # Accumulated talker payload (raw bytes, and their text form).
        self.accu_bytes = []
        self.accu_text = []
        # Start/end sample numbers of raw byte, EOI phase, text run.
        self.ss_raw = None
        self.es_raw = None
        self.ss_eoi = None
        self.es_eoi = None
        self.ss_text = None
        self.es_text = None
        # Most recently addressed peers.
        self.last_talker = None
        self.last_listener = []
        self.last_iec_addr = None
        self.last_iec_sec = None

    def start(self):
        """Register the annotation, binary, and Python output streams."""
        self.out_ann = self.register(srd.OUTPUT_ANN)
        self.out_bin = self.register(srd.OUTPUT_BINARY)
        self.out_python = self.register(srd.OUTPUT_PYTHON)

    def putg(self, ss, es, data):
        """Emit 'data' on the annotation output for samples ss to es."""
        self.put(ss, es, self.out_ann, data)

    def putbin(self, ss, es, data):
        """Emit 'data' on the binary output for samples ss to es."""
        self.put(ss, es, self.out_bin, data)

    def putpy(self, ss, es, ptype, addr, pdata):
        """Emit a [ptype, addr, pdata] packet on the Python output."""
        self.put(ss, es, self.out_python, [ptype, addr, pdata])

    def emit_eoi_ann(self, ss, es):
        """Emit an EOI annotation for samples ss to es."""
        self.putg(ss, es, [ANN_EOI, ['EOI']])

    def emit_bin_ann(self, ss, es, ann_cls, data):
        """Emit 'data' (converted to bytes) in binary class 'ann_cls'."""
        self.putbin(ss, es, [ann_cls, bytes(data)])

    def emit_data_ann(self, ss, es, ann_cls, data):
        """Emit the text list 'data' in annotation class 'ann_cls'."""
        self.putg(ss, es, [ann_cls, data])

    def emit_warn_ann(self, ss, es, data):
        """Emit the text list 'data' in the warnings annotation class."""
        self.putg(ss, es, [ANN_WARN, data])

    def flush_bytes_text_accu(self):
        """Emit and clear the accumulated talker bytes and talker text.

        Nothing gets emitted unless data was accumulated and the text
        run's start and end sample numbers are known. The sample number
        range of the text run is always reset.
        """
        have_range = self.ss_text is not None and self.es_text is not None
        if self.accu_bytes and have_range:
            self.emit_bin_ann(self.ss_text, self.es_text, BIN_DATA, bytearray(self.accu_bytes))
            self.putpy(self.ss_text, self.es_text, 'TALKER_BYTES', self.last_talker, bytearray(self.accu_bytes))
            self.accu_bytes = []
        if self.accu_text and have_range:
            text = ''.join(self.accu_text)
            self.emit_data_ann(self.ss_text, self.es_text, ANN_TEXT, [text])
            self.putpy(self.ss_text, self.es_text, 'TALKER_TEXT', self.last_talker, text)
            self.accu_text = []
        self.ss_text = self.es_text = None

    def check_extra_flush(self, b):
        """Optionally flush accumulated payload before data byte 'b'.

        Flushes previously accumulated runs of payload data according
        to user specified conditions (the 'delim' option).
        """
        if self.options['delim'] == 'none':
            return
        if not self.accu_bytes:
            return

        # This implementation exclusively handles "text lines", but adding
        # support for more variants here is straight forward.
        #
        # Search for the first data byte _after_ a user specified text
        # line termination sequence was seen. The termination sequence's
        # alphabet may be variable, and the sequence may span multiple
        # data bytes. We accept either CR or LF, and combine the CR+LF
        # sequence to strive for maximum length annotations for improved
        # readability at different zoom levels. It's acceptable that this
        # implementation would also combine multiple line terminations
        # like LF+LF.
        term_chars = (10, 13)
        is_eol = b in term_chars
        had_eol = self.accu_bytes[-1] in term_chars
        if had_eol and not is_eol:
            self.flush_bytes_text_accu()

    def handle_ifc_change(self, ifc):
        """Process a change of the IFC line ('ifc' is the logical level)."""
        # Track IFC line for parallel input.
        # Assertion of IFC de-selects all talkers and listeners.
        if ifc:
            self.last_talker = None
            self.last_listener = []
            self.flush_bytes_text_accu()

    def handle_eoi_change(self, eoi):
        """Process a change of the EOI line ('eoi' is the logical level).

        Assertion stores the start of the EOI phase. De-assertion emits
        the EOI annotation (when EOI was active while the last byte got
        captured), and flushes the accumulated payload data.
        """
        # Track EOI line for parallel and serial input.
        if eoi:
            self.ss_eoi = self.samplenum
            self.curr_eoi = eoi
            return
        self.es_eoi = self.samplenum
        # Compare against None, sample number 0 is a valid start position.
        if self.ss_eoi is not None and self.latch_eoi:
            self.emit_eoi_ann(self.ss_eoi, self.es_eoi)
        self.es_text = self.es_eoi
        self.flush_bytes_text_accu()
        self.ss_eoi = self.es_eoi = None
        self.curr_eoi = None

    def handle_atn_change(self, atn):
        """Process a change of the ATN line ('atn' is the logical level)."""
        # Track ATN line for parallel and serial input.
        self.curr_atn = atn
        if atn:
            self.flush_bytes_text_accu()

    def handle_iec_periph(self, ss, es, addr, sec, data):
        """Track and annotate Commodore IEC peripheral communication.

        Receives a primary address, or a secondary address byte, or a
        data byte (the other arguments being None). All of them being
        None voids the previously tracked address information. Does
        nothing unless the 'iec_periph' option is enabled.
        """
        # The annotation is optional.
        if self.options['iec_periph'] != 'yes':
            return
        # Void internal state.
        if addr is None and sec is None and data is None:
            self.last_iec_addr = None
            self.last_iec_sec = None
            return
        # Grab and evaluate new input.
        _iec_addr_names = {
            # TODO Add more items here. See the "Device numbering" section
            # of the https://en.wikipedia.org/wiki/Commodore_bus page.
            8: 'Disk 0',
            9: 'Disk 1',
        }
        _iec_disk_range = range(8, 16)
        if addr is not None:
            self.last_iec_addr = addr
            name = _iec_addr_names.get(addr, None)
            if name:
                self.emit_data_ann(ss, es, ANN_IEC_PERIPH, [name])
        addr = self.last_iec_addr # Simplify subsequent logic.
        if sec is not None:
            # BEWARE! The secondary address is a full byte and includes
            # the 0x60 offset, to also work when the MSB was set.
            self.last_iec_sec = sec
            subcmd, channel = sec & 0xf0, sec & 0x0f
            channel_ord = ord('0') + channel
            if addr is not None and addr in _iec_disk_range:
                subcmd_fmts = {
                    0x60: ['Reopen {ch:d}', 'Re {ch:d}', 'R{ch_ord:c}'],
                    0xe0: ['Close {ch:d}', 'Cl {ch:d}', 'C{ch_ord:c}'],
                    0xf0: ['Open {ch:d}', 'Op {ch:d}', 'O{ch_ord:c}'],
                }.get(subcmd, None)
                if subcmd_fmts:
                    texts = _format_ann_texts(subcmd_fmts, ch = channel, ch_ord = channel_ord)
                    self.emit_data_ann(ss, es, ANN_IEC_PERIPH, texts)
        sec = self.last_iec_sec # Simplify subsequent logic.
        if data is not None:
            if addr is None or sec is None:
                return
            # TODO Process data depending on peripheral type and channel?

    def handle_data_byte(self):
        """Interpret the most recently captured raw byte.

        Always emits the raw byte. With ATN active the byte is taken as
        a command or an address, which updates the set of current peers.
        Otherwise the byte is talker payload which gets accumulated.
        """
        if not self.curr_atn:
            self.check_extra_flush(self.curr_raw)
        b = self.curr_raw
        texts = _get_raw_text(b, self.curr_atn)
        self.emit_data_ann(self.ss_raw, self.es_raw, ANN_RAW_BYTE, texts)
        self.emit_bin_ann(self.ss_raw, self.es_raw, BIN_RAW, b.to_bytes(1, byteorder='big'))
        self.putpy(self.ss_raw, self.es_raw, 'GPIB_RAW', None, _get_raw_byte(b, self.curr_atn))

        if not self.curr_atn:
            # Payload data. Accumulate bytes and their text presentation.
            self.accu_bytes.append(b)
            text = _get_data_text(b)
            if not self.accu_text:
                self.ss_text = self.ss_raw
            self.accu_text.append(text)
            self.es_text = self.es_raw
            self.emit_data_ann(self.ss_raw, self.es_raw, ANN_DATA, [text])
            self.handle_iec_periph(self.ss_raw, self.es_raw, None, None, b)
            self.putpy(self.ss_raw, self.es_raw, 'DATA_BYTE', self.last_talker, b)
            return

        # Command or address byte (ATN is active).
        ann_cls = None
        # Either None (no update), or the (addr, sec, data) arguments
        # for the IEC peripherals handler.
        iec_update = None
        py_type, py_addr = None, None
        py_peers = False
        if self.options['atn_parity'] == 'yes':
            # Bit 7 carries parity. Strip it, and check that the total
            # number of set bits (including parity) is even.
            par = 1 if b & 0x80 else 0
            b &= ~0x80
            ones = bin(b).count('1') + par
            if ones % 2:
                warn_texts = ['Command parity error', 'parity', 'PAR']
                self.emit_warn_ann(self.ss_raw, self.es_raw, warn_texts)
        is_cmd, is_unl, is_unt = _is_command(b)
        laddr = _is_listen_addr(b)
        taddr = _is_talk_addr(b)
        saddr = _is_secondary_addr(b)
        msb = _is_msb_set(b)
        if is_cmd:
            known, texts = _get_command_texts(b)
            if not known:
                warn_texts = ['Unknown GPIB command', 'unknown', 'UNK']
                self.emit_warn_ann(self.ss_raw, self.es_raw, warn_texts)
            ann_cls = ANN_CMD
            py_type, py_addr = 'COMMAND', None
            if is_unl:
                self.last_listener = []
                py_peers = True
            if is_unt:
                self.last_talker = None
                py_peers = True
            if is_unl or is_unt:
                iec_update = (None, None, None)
        elif laddr is not None:
            addr = laddr
            texts = _get_address_texts(b)
            ann_cls = ANN_LADDR
            py_type, py_addr = 'LISTEN', addr
            # A peer cannot talk and listen at the same time.
            if addr == self.last_talker:
                self.last_talker = None
            self.last_listener.append(addr)
            iec_update = (addr, None, None)
            py_peers = True
        elif taddr is not None:
            addr = taddr
            texts = _get_address_texts(b)
            ann_cls = ANN_TADDR
            py_type, py_addr = 'TALK', addr
            # A peer cannot talk and listen at the same time.
            if addr in self.last_listener:
                self.last_listener.remove(addr)
            self.last_talker = addr
            iec_update = (addr, None, None)
            py_peers = True
        elif saddr is not None:
            addr = saddr
            texts = _get_address_texts(b)
            ann_cls = ANN_SADDR
            iec_update = (None, b, None)
            py_type, py_addr = 'SECONDARY', addr
        elif msb is not None:
            # These are not really "secondary addresses", but they
            # are used by the Commodore IEC bus (floppy channels).
            texts = _get_address_texts(b)
            ann_cls = ANN_SADDR
            iec_update = (None, b, None)
            py_type, py_addr = 'MSB_SET', b
        if ann_cls is not None and texts is not None:
            self.emit_data_ann(self.ss_raw, self.es_raw, ann_cls, texts)
        if iec_update is not None:
            self.handle_iec_periph(self.ss_raw, self.es_raw, *iec_update)
        if py_type:
            self.putpy(self.ss_raw, self.es_raw, py_type, py_addr, b)
        if py_peers:
            self.last_listener.sort()
            self.putpy(self.ss_raw, self.es_raw, 'TALK_LISTEN', self.last_talker, self.last_listener)

    def handle_dav_change(self, dav, data):
        """Process a change of the DAV line (parallel input).

        'dav' is the logical level of the line, 'data' is the list of
        logical data bit values (DIO1 first).
        """
        if dav:
            # Data availability starts when the flag goes active.
            self.ss_raw = self.samplenum
            self.curr_raw = bitpack(data)
            self.latch_atn = self.curr_atn
            self.latch_eoi = self.curr_eoi
            return
        # Data availability ends when the flag goes inactive. Handle the
        # previously captured data byte according to associated flags.
        self.es_raw = self.samplenum
        self.handle_data_byte()
        self.ss_raw = self.es_raw = None
        self.curr_raw = None

    def inject_dav_phase(self, ss, es, data):
        """Process one byte which was gathered from serial input.

        'data' is the list of bit values, 'ss' and 'es' are the sample
        numbers of the byte's start and end.
        """
        # Inspection of serial input has resulted in one raw byte which
        # spans a given period of time. Pretend we had seen a DAV active
        # phase, to re-use code for the parallel transmission.
        self.ss_raw = ss
        self.curr_raw = bitpack(data)
        self.latch_atn = self.curr_atn
        self.latch_eoi = self.curr_eoi
        self.es_raw = es
        self.handle_data_byte()
        self.ss_raw = self.es_raw = None
        self.curr_raw = None

    def invert_pins(self, pins):
        """Return a list of the inverted pin values of 'pins'.

        Values other than 0 and 1 are passed on unmodified.
        """
        # All lines (including data bits!) are low active and thus need
        # to get inverted to receive their logical state (high active,
        # regular data bit values). Cope with inputs being optional.
        return [1 - p if p in (0, 1) else p for p in pins]

    def decode_serial(self, has_clk, has_data_1, has_atn, has_srq):
        """Decode the serial (IEC bus) variant. Does not return.

        Raises ChannelError unless CLK, DATA, and ATN are available.
        The 'has_srq' argument is not used.
        """
        if not has_clk or not has_data_1 or not has_atn:
            raise ChannelError('IEC bus needs at least ATN and serial CLK and DATA.')

        # This is a rephrased version of decoders/iec/pd.py:decode().
        # SRQ was not used there either. Magic numbers were eliminated.
        (
            STEP_WAIT_READY_TO_SEND,
            STEP_WAIT_READY_FOR_DATA,
            STEP_PREP_DATA_TEST_EOI,
            STEP_CLOCK_DATA_BITS,
        ) = range(4)
        # Wait conditions per step. The first item always is the falling
        # ATN edge, the CLK edge of the last step is at index 1.
        step_wait_conds = (
            [{PIN_ATN: 'f'}, {PIN_DATA: 'l', PIN_CLK: 'h'}],
            [{PIN_ATN: 'f'}, {PIN_DATA: 'h', PIN_CLK: 'h'}, {PIN_CLK: 'l'}],
            [{PIN_ATN: 'f'}, {PIN_DATA: 'f'}, {PIN_CLK: 'l'}],
            [{PIN_ATN: 'f'}, {PIN_CLK: 'e'}],
        )
        step = STEP_WAIT_READY_TO_SEND
        bits = []

        while True:

            # Sample input pin values. Keep DATA/CLK in verbatim form to
            # re-use 'iec' decoder logic. Turn ATN to positive logic for
            # easier processing. The data bits get handled during byte
            # accumulation.
            pins = self.wait(step_wait_conds[step])
            data, clk = pins[PIN_DATA], pins[PIN_CLK]
            atn, = self.invert_pins([pins[PIN_ATN]])

            if self.matched[0]:
                # Falling edge on ATN, reset step.
                step = STEP_WAIT_READY_TO_SEND

            if step == STEP_WAIT_READY_TO_SEND:
                # Don't use self.matched[1] here since we might come from
                # a step with different conds due to the code above.
                if data == 0 and clk == 1:
                    # Rising edge on CLK while DATA is low: Ready to send.
                    step = STEP_WAIT_READY_FOR_DATA
            elif step == STEP_WAIT_READY_FOR_DATA:
                if data == 1 and clk == 1:
                    # Rising edge on DATA while CLK is high: Ready for data.
                    ss_byte = self.samplenum
                    self.handle_atn_change(atn)
                    if self.curr_eoi:
                        self.handle_eoi_change(False)
                    bits = []
                    step = STEP_PREP_DATA_TEST_EOI
                elif clk == 0:
                    # CLK low again, transfer aborted.
                    step = STEP_WAIT_READY_TO_SEND
            elif step == STEP_PREP_DATA_TEST_EOI:
                if data == 0 and clk == 1:
                    # DATA goes low while CLK is still high, EOI confirmed.
                    self.handle_eoi_change(True)
                elif clk == 0:
                    step = STEP_CLOCK_DATA_BITS
                    ss_bit = self.samplenum
            elif step == STEP_CLOCK_DATA_BITS:
                if self.matched[1]:
                    if clk == 1:
                        # Rising edge on CLK; latch DATA.
                        bits.append(data)
                    elif clk == 0:
                        # Falling edge on CLK; end of bit.
                        es_bit = self.samplenum
                        self.emit_data_ann(ss_bit, es_bit, ANN_RAW_BIT, ['{:d}'.format(bits[-1])])
                        self.putpy(ss_bit, es_bit, 'IEC_BIT', None, bits[-1])
                        ss_bit = self.samplenum
                        if len(bits) == 8:
                            es_byte = self.samplenum
                            self.inject_dav_phase(ss_byte, es_byte, bits)
                            if self.curr_eoi:
                                self.handle_eoi_change(False)
                            step = STEP_WAIT_READY_TO_SEND

    def decode_parallel(self, has_data_n, has_dav, has_atn, has_eoi, has_srq):
        """Decode the parallel (GPIB) variant. Does not return.

        'has_data_n' is a list with one availability flag per DIO line.
        Raises ChannelError unless all eight DIO lines as well as DAV
        and ATN are available. The 'has_srq' argument is not used.
        """
        if False in has_data_n or not has_dav or not has_atn:
            raise ChannelError('IEEE-488 needs at least ATN and DAV and eight DIO lines.')
        has_ifc = self.has_channel(PIN_IFC)

        # Capture data lines at the falling edge of DAV, process their
        # values at rising DAV edge (when data validity ends). Also make
        # sure to start inspection when the capture happens to start with
        # low signal levels, i.e. won't include the initial falling edge.
        # Scan for ATN/EOI edges as well (including the trick which works
        # around initial pin state).
        # Map low-active physical transport lines to positive logic here,
        # to simplify logical inspection/decoding of communicated data,
        # and to avoid redundancy and inconsistency in later code paths.
        waitcond = []
        idx_dav = len(waitcond)
        waitcond.append({PIN_DAV: 'l'})
        idx_atn = len(waitcond)
        waitcond.append({PIN_ATN: 'l'})
        idx_eoi = None
        if has_eoi:
            idx_eoi = len(waitcond)
            waitcond.append({PIN_EOI: 'l'})
        idx_ifc = None
        if has_ifc:
            idx_ifc = len(waitcond)
            waitcond.append({PIN_IFC: 'l'})
        while True:
            pins = self.wait(waitcond)
            pins = self.invert_pins(pins)

            # BEWARE! Order of evaluation does matter. For low samplerate
            # captures, many edges fall onto the same sample number. So
            # we process active edges of flags early (before processing
            # data bits), and inactive edges late (after data got processed).
            if idx_ifc is not None and self.matched[idx_ifc] and pins[PIN_IFC] == 1:
                self.handle_ifc_change(pins[PIN_IFC])
            if idx_eoi is not None and self.matched[idx_eoi] and pins[PIN_EOI] == 1:
                self.handle_eoi_change(pins[PIN_EOI])
            if self.matched[idx_atn] and pins[PIN_ATN] == 1:
                self.handle_atn_change(pins[PIN_ATN])
            if self.matched[idx_dav]:
                self.handle_dav_change(pins[PIN_DAV], pins[PIN_DIO1:PIN_DIO8 + 1])
            if self.matched[idx_atn] and pins[PIN_ATN] == 0:
                self.handle_atn_change(pins[PIN_ATN])
            if idx_eoi is not None and self.matched[idx_eoi] and pins[PIN_EOI] == 0:
                self.handle_eoi_change(pins[PIN_EOI])
            if idx_ifc is not None and self.matched[idx_ifc] and pins[PIN_IFC] == 0:
                self.handle_ifc_change(pins[PIN_IFC])

            # The initial level conditions have served their purpose,
            # only wait for edges from now on.
            waitcond[idx_dav][PIN_DAV] = 'e'
            waitcond[idx_atn][PIN_ATN] = 'e'
            if has_eoi:
                waitcond[idx_eoi][PIN_EOI] = 'e'
            if has_ifc:
                waitcond[idx_ifc][PIN_IFC] = 'e'

    def decode(self):
        """Check the available channels and run the matching variant."""
        # The decoder's boilerplate declares some of the input signals as
        # optional, but only to support both serial and parallel variants.
        # The CLK signal discriminates the two. For either variant some
        # of the "optional" signals are not really optional for proper
        # operation of the decoder. Check these conditions here.
        has_clk = self.has_channel(PIN_CLK)
        has_data_1 = self.has_channel(PIN_DIO1)
        # One flag per DIO line. Convert each individual result (not the
        # generator, which would always be true) so that the parallel
        # variant's check for missing data lines can take effect.
        has_data_n = [bool(self.has_channel(pin)) for pin in range(PIN_DIO1, PIN_DIO8 + 1)]
        has_dav = self.has_channel(PIN_DAV)
        has_atn = self.has_channel(PIN_ATN)
        has_eoi = self.has_channel(PIN_EOI)
        has_srq = self.has_channel(PIN_SRQ)
        if has_clk:
            self.decode_serial(has_clk, has_data_1, has_atn, has_srq)
        else:
            self.decode_parallel(has_data_n, has_dav, has_atn, has_eoi, has_srq)