RST, CLK, IO, = range(3)
class Ann:
- BIT, ATR, CMD, DATA, RESET, = range(5)
+ RESET_SYM, INTR_SYM, START_SYM, STOP_SYM, BIT_SYM, \
+ ATR_BYTE, CMD_BYTE, OUT_BYTE, PROC_BYTE, \
+ ATR_DATA, CMD_DATA, OUT_DATA, PROC_DATA, \
+ = range(13)
class Bin:
- SEND_DATA, = range(1)
-
-# CMD: [annotation class index, annotation texts for zoom levels]
-proto = {
- 'BIT': [Ann.BIT, '{bit}',],
- 'ATR': [Ann.ATR, 'Answer To Reset: {data:02x}', 'ATR: {data:02x}', '{data:02x}',],
- 'CMD': [Ann.CMD, 'Command: {data:02x}', 'Cmd: {data:02x}', '{data:02x}',],
- 'DATA': [Ann.DATA, 'Data: {data:02x}', '{data:02x}',],
- 'RESET': [Ann.RESET, 'Reset', 'R',],
-}
-
-def lookup_proto_ann_txt(cmd, variables):
- ann = proto.get(cmd, None)
- if ann is None:
- return None, []
- cls, texts = ann[0], ann[1:]
- texts = [t.format(**variables) for t in texts]
- return cls, texts
+ BYTES, = range(1)
class Decoder(srd.Decoder):
api_version = 3
{'id': 'io', 'name': 'I/O', 'desc': 'I/O data line'},
)
annotations = (
- ('bit', 'Bit'),
- ('atr', 'ATR'),
- ('cmd', 'Command'),
- ('data', 'Data exchange'),
- ('reset', 'Reset'),
+ ('reset_sym', 'Reset Symbol'),
+ ('intr_sym', 'Interrupt Symbol'),
+ ('start_sym', 'Start Symbol'),
+ ('stop_sym', 'Stop Symbol'),
+ ('bit_sym', 'Bit Symbol'),
+ ('atr_byte', 'ATR Byte'),
+ ('cmd_byte', 'Command Byte'),
+ ('out_byte', 'Outgoing Byte'),
+ ('proc_byte', 'Processing Byte'),
+ ('atr_data', 'ATR data'),
+ ('cmd_data', 'Command data'),
+ ('out_data', 'Outgoing data'),
+ ('proc_data', 'Processing data'),
)
annotation_rows = (
- ('bits', 'Bits', (Ann.BIT,)),
- ('fields', 'Fields', (Ann.ATR, Ann.CMD, Ann.DATA)),
- ('interrupts', 'Interrupts', (Ann.RESET,)),
+ ('symbols', 'Symbols', (Ann.RESET_SYM, Ann.INTR_SYM,
+ Ann.START_SYM, Ann.STOP_SYM, Ann.BIT_SYM,)),
+ ('fields', 'Fields', (Ann.ATR_BYTE,
+ Ann.CMD_BYTE, Ann.OUT_BYTE, Ann.PROC_BYTE,)),
+ ('operations', 'Operations', (Ann.ATR_DATA,
+ Ann.CMD_DATA, Ann.OUT_DATA, Ann.PROC_DATA,)),
)
binary = (
- ('send-data', 'Send data'),
+ ('bytes', 'Bytes'),
)
def __init__(self):
def reset(self):
self.bits = []
- self.cmd = None
+ self.atr_bytes = []
+ self.cmd_bytes = []
+ self.cmd_proc = None
+ self.out_len = None
+ self.out_bytes = []
+ self.proc_state = None
+ self.state = None
def metadata(self, key, value):
if key == srd.SRD_CONF_SAMPLERATE:
def putb(self, ss, es, cls , data):
self.put(ss, es, self.out_binary, [cls, data,])
- def handle_reset(self, pins):
- self.cmd = 'RESET'
- cls, texts = lookup_proto_ann_txt(self.cmd, {})
- self.putx(self.samplenum, self.samplenum, cls, texts)
+ def lookup_proto_ann_txt(self, key, variables):
+ ann = {
+ 'RESET_SYM': [Ann.RESET_SYM, 'Reset', 'R',],
+ 'INTR_SYM': [Ann.INTR_SYM, 'Interrupt', 'Intr', 'I',],
+ 'START_SYM': [Ann.START_SYM, 'Start', 'ST', 'S',],
+ 'STOP_SYM': [Ann.STOP_SYM, 'Stop', 'SP', 'P',],
+ 'BIT_SYM': [Ann.BIT_SYM, '{bit}',],
+ 'ATR_BYTE': [Ann.ATR_BYTE,
+ 'Answer To Reset: {data:02x}',
+ 'ATR: {data:02x}',
+ '{data:02x}',
+ ],
+ 'CMD_BYTE': [Ann.CMD_BYTE,
+ 'Command: {data:02x}',
+ 'Cmd: {data:02x}',
+ '{data:02x}',
+ ],
+ 'OUT_BYTE': [Ann.OUT_BYTE,
+ 'Outgoing data: {data:02x}',
+ 'Data: {data:02x}',
+ '{data:02x}',
+ ],
+ 'PROC_BYTE': [Ann.PROC_BYTE,
+ 'Internal processing: {data:02x}',
+ 'Proc: {data:02x}',
+ '{data:02x}',
+ ],
+ 'ATR_DATA': [Ann.ATR_DATA,
+ 'Answer To Reset: {data}',
+ 'ATR: {data}',
+ '{data}',
+ ],
+ 'CMD_DATA': [Ann.CMD_DATA,
+ 'Command: {data}',
+ 'Cmd: {data}',
+ '{data}',
+ ],
+ 'OUT_DATA': [Ann.OUT_DATA,
+ 'Outgoing: {data}',
+ 'Out: {data}',
+ '{data}',
+ ],
+ 'PROC_DATA': [Ann.PROC_DATA,
+ 'Processing: {data}',
+ 'Proc: {data}',
+ '{data}',
+ ],
+ }.get(key, None)
+ if ann is None:
+ return None, []
+ cls, texts = ann[0], ann[1:]
+ texts = [t.format(**variables) for t in texts]
+ return cls, texts
+
+ def text_for_accu_bytes(self, accu):
+ if not accu:
+ return None, None, None, None
+ ss, es = accu[0][1], accu[-1][2]
+ data = [a[0] for a in accu]
+ text = " ".join(['{:02x}'.format(a) for a in data])
+ return ss, es, data, text
+
+ def flush_queued(self):
+ '''Flush previously accumulated operations details.'''
+
+ # Can be called when either the completion of an operation got
+ # detected (reliably), or when some kind of reset condition was
+ # met while a potential previously observed operation has not
+ # been postprocessed yet (best effort). Should not harm when the
+ # routine gets invoked while no data was collected yet, or was
+ # flushed already.
+ # BEWARE! Will void internal state. Should really only get called
+ # "between operations", NOT between fields of an operation.
+
+ if self.atr_bytes:
+ key = 'ATR_DATA'
+ ss, es, _, text = self.text_for_accu_bytes(self.atr_bytes)
+ cls, texts = self.lookup_proto_ann_txt(key, {'data': text})
+ self.putx(ss, es, cls, texts)
+
+ if self.cmd_bytes:
+ key = 'CMD_DATA'
+ ss, es, _, text = self.text_for_accu_bytes(self.cmd_bytes)
+ cls, texts = self.lookup_proto_ann_txt(key, {'data': text})
+ self.putx(ss, es, cls, texts)
+
+ if self.out_bytes:
+ key = 'OUT_DATA'
+ ss, es, _, text = self.text_for_accu_bytes(self.out_bytes)
+ cls, texts = self.lookup_proto_ann_txt(key, {'data': text})
+ self.putx(ss, es, cls, texts)
+
+ if self.proc_state:
+ key = 'PROC_DATA'
+ ss = self.proc_state['ss']
+ es = self.proc_state['es']
+ clk = self.proc_state['clk']
+ high = self.proc_state['io1']
+ text = '{clk} clocks, I/O {high}'.format(clk = clk, high = int(high))
+ cls, texts = self.lookup_proto_ann_txt(key, {'data': text})
+ self.putx(ss, es, cls, texts)
+
+ self.atr_bytes = None
+ self.cmd_bytes = None
+ self.cmd_proc = None
+ self.out_len = None
+ self.out_bytes = None
+ self.proc_state = None
+ self.state = None
+
+ def handle_reset(self, ss, es, has_clk):
+ self.flush_queued()
+ key = '{}_SYM'.format('RESET' if has_clk else 'INTR')
+ cls, texts = self.lookup_proto_ann_txt(key, {})
+ self.putx(ss, es, cls, texts)
self.bits = []
- # Next data bytes will be Answer To Reset.
- self.cmd = 'ATR'
-
- def handle_command(self, pins):
- rst, clk, io = pins
- # XXX Is the comment inverted?
- # If I/O is rising -> command START
- # if I/O is falling -> command STOP and response data incoming
- self.cmd = 'CMD' if io == 0 else 'DATA'
+ self.state = 'ATR' if has_clk else None
+
+ def handle_command(self, ss, is_start):
+ if is_start:
+ self.flush_queued()
+ key = '{}_SYM'.format('START' if is_start else 'STOP')
+ cls, texts = self.lookup_proto_ann_txt(key, {})
+ self.putx(ss, ss, cls, texts)
self.bits = []
+ self.state = 'CMD' if is_start else 'DATA'
- # Gather 8 bits of data
- def handle_data(self, pins):
- rst, clk, io = pins
-
- # Remember the start of the first data/address bit. Collect
- # bits in LSB first order. "Estimate" the bit's width at first,
- # update end times as better data becomes available.
- # TODO This estimation logic is imprecise and fragile. A single
- # slightly stretched clock period throws off the following bit
- # annotation. Better look for more reliable conditions. Available
- # documentation suggests bit values are valid during high CLK.
- bit_val = io
- bit_ss = self.samplenum
- bit_es = bit_ss # self.bitwidth is not known yet.
- if self.bits:
- self.bits[-1][2] = bit_ss
- self.bits.append([bit_val, bit_ss, bit_es])
- if len(self.bits) < 8:
+ def command_check(self, ctrl, addr, data):
+ '''Interpret CTRL/ADDR/DATA command entry.'''
+
+ # See the Siemens Datasheet section 2.3 Commands. The abbreviated
+ # text variants are my guesses, terse for readability at coarser
+ # zoom levels.
+ codes_table = {
+ 0x30: {
+ 'fmt': [
+ 'read main memory, addr {addr:02x}',
+ 'RD-M @{addr:02x}',
+ ],
+ },
+ 0x31: {
+ 'fmt': [
+ 'read security memory',
+ 'RD-S',
+ ],
+ 'len': 4,
+ },
+ 0x33: {
+ 'fmt': [
+ 'compare verification data, addr {addr:02x}, data {data:02x}',
+ 'CMP-V @{addr:02x} ={data:02x}',
+ ],
+ 'proc': True,
+ },
+ 0x34: {
+ 'fmt': [
+ 'read protection memory, addr {addr:02x}',
+ 'RD-P @{addr:02x}',
+ ],
+ 'len': 4,
+ },
+ 0x38: {
+ 'fmt': [
+ 'update main memory, addr {addr:02x}, data {data:02x}',
+ 'WR-M @{addr:02x} ={data:02x}',
+ ],
+ 'proc': True,
+ },
+ 0x39: {
+ 'fmt': [
+ 'update security memory, addr {addr:02x}, data {data:02x}',
+ 'WR-S @{addr:02x} ={data:02x}',
+ ],
+ 'proc': True,
+ },
+ 0x3c: {
+ 'fmt': [
+ 'write protection memory, addr {addr:02x}, data {data:02x}',
+ 'WR-P @{addr:02x} ={data:02x}',
+ ],
+ 'proc': True,
+ },
+ }
+ code = codes_table.get(ctrl, {})
+ dflt_fmt = [
+ 'unknown, ctrl {ctrl:02x}, addr {addr:02x}, data {data:02x}',
+ 'UNK-{ctrl:02x} @{addr:02x}, ={data:02x}',
+ ]
+ fmt = code.get('fmt', dflt_fmt)
+ if not isinstance(fmt, (list, tuple,)):
+ fmt = [fmt,]
+ texts = [f.format(ctrl = ctrl, addr = addr, data = data) for f in fmt]
+ length = code.get('len', None)
+ is_proc = code.get('proc', False)
+ return texts, length, is_proc
+
+ def processing_start(self, ss, es, io_high):
+ self.proc_state = {
+ 'ss': ss or es,
+ 'es': es or ss,
+ 'clk': 0,
+ 'io1': bool(io_high),
+ }
+
+ def processing_update(self, es, clk_inc, io_high):
+ if es is not None and es > self.proc_state['es']:
+ self.proc_state['es'] = es
+ self.proc_state['clk'] += clk_inc
+ if io_high:
+ self.proc_state['io1'] = True
+
+ def handle_data_byte(self, ss, es, data, bits):
+ '''Accumulate CMD or OUT data bytes.'''
+
+ if self.state == 'ATR':
+ if not self.atr_bytes:
+ self.atr_bytes = []
+ self.atr_bytes.append([data, ss, es, bits,])
+ if len(self.atr_bytes) == 4:
+ self.flush_queued()
+ return
+
+ if self.state == 'CMD':
+ if not self.cmd_bytes:
+ self.cmd_bytes = []
+ self.cmd_bytes.append([data, ss, es, bits,])
+ if len(self.cmd_bytes) == 3:
+ ctrl, addr, data = [c[0] for c in self.cmd_bytes]
+ texts, length, proc = self.command_check(ctrl, addr, data)
+ # Immediately emit the annotation to not lose the text,
+ # and to support zoom levels for this specific case.
+ ss, es = self.cmd_bytes[0][1], self.cmd_bytes[-1][2]
+ cls = Ann.CMD_DATA
+ self.putx(ss, es, cls, texts)
+ self.cmd_bytes = []
+ # Prepare to continue either at OUT or PROC after CMD.
+ self.out_len = length
+ self.cmd_proc = bool(proc)
+ self.state = None
return
- bitwidth = self.bits[-1][1] - self.bits[-2][1]
- self.bits[-1][2] += bitwidth
- # Get the data byte value, and byte's ss/es.
- databyte = bitpack_lsb(self.bits, 0)
- byte_ss = self.bits[0][1]
- byte_es = self.bits[-1][2]
+ if self.state == 'OUT':
+ if not self.out_bytes:
+ self.out_bytes = []
+ self.out_bytes.append([data, ss, es, bits,])
+ if self.out_len is not None and len(self.out_bytes) == self.out_len:
+ self.flush_queued()
+ return
- self.putb(byte_ss, byte_es, Bin.SEND_DATA, bytes([databyte]))
+ def handle_data_bit(self, ss, es, bit):
+ '''Gather 8 bits of data (or track processing progress).'''
- # TODO Present bit values earlier. As soon as their es is known.
- for bit_val, bit_ss, bit_es in self.bits:
- cls, texts = lookup_proto_ann_txt('BIT', {'bit': bit_val})
- self.putx(bit_ss, bit_es, cls, texts)
+ # Switch late from DATA to either OUT or PROC. We can tell the
+ # type and potentially fixed length at the end of CMD already,
+ # but a START/STOP condition may void this information. So we
+ # do the switch at the first data bit after CMD.
+ # In the OUT case data bytes get accumulated, until either the
+ # expected byte count is reached, or another CMD starts. In the
+ # PROC case a high I/O level terminates execution.
+ if self.state == 'DATA':
+ if self.out_len:
+ self.state = 'OUT'
+ elif self.cmd_proc:
+ self.state = 'PROC'
+ self.processing_start(ss or es, es or ss, bit == 1)
+ else:
+ # Implementor's note: Handle unknown situations like
+ # outgoing data bytes, for the user's convenience. This
+ # will show OUT bytes even if it's just processing CLK
+ # cycles with constant or irrelevant I/O bit patterns.
+ self.state = 'OUT'
+ if self.state == 'PROC':
+ high = bit == 1
+ if ss is not None:
+ self.processing_update(ss, 0, high)
+ if es is not None:
+ self.processing_update(es, 1, high)
+ if high:
+ self.flush_queued()
+ return
- cls, texts = lookup_proto_ann_txt(self.cmd, {'data': databyte})
- if cls:
- self.putx(byte_ss, byte_es, cls, texts)
+ # This routine gets called two times per bit value. Track the
+ # bit's value and ss timestamp when the bit period starts. And
+ # update the es timestamp at the end of the bit's validity.
+ if ss is not None:
+ self.bits.append([bit, ss, es or ss])
+ return
+ if es is None:
+ # Unexpected invocation. Could be a glitch or invalid input
+ # data, or an interaction with RESET/START/STOP conditions.
+ self.bits = []
+ return
+ if not self.bits:
+ return
+ if bit is not None:
+ self.bits[-1][0] = bit
+ # TODO Check for consistent bit level at ss and es when
+ # the information was available? Is bit data sampled at
+ # different clock edges depending whether data is sent
+ # or received?
+ self.bits[-1][2] = es
+ # Emit the bit's annotation. See if a byte was received.
+ bit, ss, es = self.bits[-1]
+ cls, texts = self.lookup_proto_ann_txt('BIT_SYM', {'bit': bit})
+ self.putx(ss, es, cls, texts)
+ if len(self.bits) < 8:
+ return
- # Done with this packet.
+ # Get the data byte value, and the byte's ss/es. Emit the byte's
+ # annotation and binary output. Pass the byte to upper layers.
+ # TODO Vary annotation classes with the byte's position within
+ # a field? To tell CTRL/ADDR/DATA of a CMD entry apart?
+ bits = self.bits
self.bits = []
+ data = bitpack_lsb(bits, 0)
+ ss = bits[0][1]
+ es = bits[-1][2]
+
+ key = '{}_BYTE'.format(self.state)
+ cls, texts = self.lookup_proto_ann_txt(key, {'data': data})
+ if cls:
+ self.putx(ss, es, cls, texts)
+ self.putb(ss, es, Bin.BYTES, bytes([data]))
+
+ self.handle_data_byte(ss, es, data, bits)
def decode(self):
+ '''Decoder's main data interpretation loop.'''
+
+ # Signal conditions tracked by the protocol decoder:
+ # - Rising and falling RST edges, which span the width of a
+ # high-active RESET pulse. RST has highest priority, no
+ # other activity can take place in this period.
+ # - Rising and falling CLK edges when RST is active. The
+ # CLK pulse when RST is asserted will reset the card's
+ # address counter. RST alone can terminate memory reads.
+ # - Rising and falling CLK edges when RST is inactive. This
+ # determines the period where BIT values are valid.
+ # - I/O edges during high CLK. These are START and STOP
+ # conditions that tell COMMAND and DATA phases apart.
+ # - Rise of I/O during internal processing. This expression
+ # is an unconditional part of the .wait() condition set. It
+ # is assumed that skipping this match in many cases is more
+ # efficient than the permanent re-construction of the .wait()
+ # condition list in every loop iteration, and preferrable to
+ # the maintainance cost of duplicating RST and CLK handling
+ # when checking I/O during internal processing.
+ (
+ COND_RESET_START, COND_RESET_STOP,
+ COND_RSTCLK_START, COND_RSTCLK_STOP,
+ COND_DATA_START, COND_DATA_STOP,
+ COND_CMD_START, COND_CMD_STOP,
+ COND_PROC_IOH,
+ ) = range(9)
+ conditions = [
+ {Pin.RST: 'r'},
+ {Pin.RST: 'f'},
+ {Pin.RST: 'h', Pin.CLK: 'r'},
+ {Pin.RST: 'h', Pin.CLK: 'f'},
+ {Pin.RST: 'l', Pin.CLK: 'r'},
+ {Pin.RST: 'l', Pin.CLK: 'f'},
+ {Pin.CLK: 'h', Pin.IO: 'f'},
+ {Pin.CLK: 'h', Pin.IO: 'r'},
+ {Pin.RST: 'l', Pin.IO: 'r'},
+ ]
+
+ ss_reset = es_reset = ss_clk = es_clk = None
while True:
- # Signal conditions tracked by the protocol decoder:
- # - RESET condition (R): RST = rising
- # - Incoming data (D): RST = low, CLK = rising.
- # TODO Add "RST low, CLK fall" for "end of DATA" here?
- # - Command mode START: CLK = high, I/O = falling.
- # - Command mode STOP: CLK = high, I/O = rising.
- (COND_RESET, COND_DATA, COND_CMD_START, COND_CMD_STOP,) = range(4)
- conditions = [
- {Pin.RST: 'r'},
- {Pin.RST: 'l', Pin.CLK: 'r'},
- {Pin.CLK: 'h', Pin.IO: 'f'},
- {Pin.CLK: 'h', Pin.IO: 'r'},
- ]
+
+ is_outgoing = self.state == 'OUT'
+ is_processing = self.state == 'PROC'
pins = self.wait(conditions)
- if self.matched[COND_RESET]:
- self.handle_reset(pins)
- elif self.matched[COND_DATA]:
- self.handle_data(pins)
- elif self.matched[COND_CMD_START]:
- self.handle_command(pins)
- elif self.matched[COND_CMD_STOP]:
- self.handle_command(pins)
+ io = pins[Pin.IO]
+
+ # Handle RESET conditions, including an optional CLK pulse
+ # while RST is asserted.
+ if self.matched[COND_RESET_START]:
+ self.flush_queued()
+ ss_reset = self.samplenum
+ es_reset = ss_clk = es_clk = None
+ continue
+ if self.matched[COND_RESET_STOP]:
+ es_reset = self.samplenum
+ self.handle_reset(ss_reset or 0, es_reset, ss_clk and es_clk)
+ ss_reset = es_reset = ss_clk = es_clk = None
+ continue
+ if self.matched[COND_RSTCLK_START]:
+ ss_clk = self.samplenum
+ es_clk = None
+ continue
+ if self.matched[COND_RSTCLK_STOP]:
+ es_clk = self.samplenum
+ continue
+
+ # Handle data bits' validity boundaries. Also covers the
+ # periodic check for high I/O level and update of details
+ # during internal processing.
+ if self.matched[COND_DATA_START]:
+ self.handle_data_bit(self.samplenum, None, io)
+ continue
+ if self.matched[COND_DATA_STOP]:
+ self.handle_data_bit(None, self.samplenum, None)
+ continue
+
+ # Additional check for idle I/O during internal processing,
+ # independent of CLK edges this time. This assures that the
+ # decoder ends processing intervals as soon as possible, at
+ # the most precise timestamp.
+ if is_processing and self.matched[COND_PROC_IOH]:
+ self.handle_data_bit(self.samplenum, self.samplenum, io)
+ continue
+
+ # The START/STOP conditions are only applicable outside of
+ # "outgoing data" or "internal processing" periods. This is
+ # what the data sheet specifies.
+ # TODO There is the decoder's inability to reliably detect
+ # where memory reads are done because they reached the end
+ # of the chip's capacity. Which makes the decoder miss the
+ # next START symbol, and lose synchronization to the BIT
+ # stream (bit counts are off, which breaks the accumulation
+ # of bytes). That's why this decoder unconditionally keeps
+ # detecting the START condition although it should not.
+ if not is_outgoing and not is_processing:
+ if self.matched[COND_CMD_START]:
+ self.handle_command(self.samplenum, True)
+ continue
+ if self.matched[COND_CMD_STOP]:
+ self.handle_command(self.samplenum, False)
+ continue
+ if True: # HACK See the comment above.
+ if self.matched[COND_CMD_START]:
+ self.handle_command(self.samplenum, True)
+ continue