]> sigrok.org Git - libsigrokdecode.git/blobdiff - decoders/sle44xx/pd.py
sle44xx: support memory read "to end of capacity"
[libsigrokdecode.git] / decoders / sle44xx / pd.py
index 25b1cc22e467e4bde3aaa2c6a501c9d22d81e1c3..9f33207789636b5bbb668df8df5ad1d61ad99321 100644 (file)
 ## along with this program; if not, see <http://www.gnu.org/licenses/>.
 ##
 
+from common.srdhelper import bitpack_lsb
 import sigrokdecode as srd
 
 class Pin:
     RST, CLK, IO, = range(3)
 
 class Ann:
-    BIT, ATR, CMD, DATA, RESET, = range(5)
+    RESET_SYM, INTR_SYM, START_SYM, STOP_SYM, BIT_SYM, \
+    ATR_BYTE, CMD_BYTE, OUT_BYTE, PROC_BYTE, \
+    ATR_DATA, CMD_DATA, OUT_DATA, PROC_DATA, \
+    = range(13)
 
 class Bin:
-    SEND_DATA, = range(1)
-
-# CMD: [annotation class index, long annotation, short annotation]
-proto = {
-    'ATR':   [Ann.ATR,   'ATR',     'ATR'],
-    'CMD':   [Ann.CMD,   'Command', 'C'],
-    'DATA':  [Ann.DATA,  'Data',    'D'],
-    'RESET': [Ann.RESET, 'Reset',   'R'],
-}
+    BYTES, = range(1)
 
 class Decoder(srd.Decoder):
     api_version = 3
@@ -52,30 +48,46 @@ class Decoder(srd.Decoder):
         {'id': 'io', 'name': 'I/O', 'desc': 'I/O data line'},
     )
     annotations = (
-        ('bit', 'Bit'),
-        ('atr', 'ATR'),
-        ('cmd', 'Command'),
-        ('data', 'Data exchange'),
-        ('reset', 'Reset'),
+        ('reset_sym', 'Reset Symbol'),
+        ('intr_sym', 'Interrupt Symbol'),
+        ('start_sym', 'Start Symbol'),
+        ('stop_sym', 'Stop Symbol'),
+        ('bit_sym', 'Bit Symbol'),
+        ('atr_byte', 'ATR Byte'),
+        ('cmd_byte', 'Command Byte'),
+        ('out_byte', 'Outgoing Byte'),
+        ('proc_byte', 'Processing Byte'),
+        ('atr_data', 'ATR data'),
+        ('cmd_data', 'Command data'),
+        ('out_data', 'Outgoing data'),
+        ('proc_data', 'Processing data'),
     )
     annotation_rows = (
-        ('bits', 'Bits', (Ann.BIT,)),
-        ('fields', 'Fields', (Ann.ATR, Ann.CMD, Ann.DATA)),
-        ('interrupts', 'Interrupts', (Ann.RESET,)),
+        ('symbols', 'Symbols', (Ann.RESET_SYM, Ann.INTR_SYM,
+            Ann.START_SYM, Ann.STOP_SYM, Ann.BIT_SYM,)),
+        ('fields', 'Fields', (Ann.ATR_BYTE,
+            Ann.CMD_BYTE, Ann.OUT_BYTE, Ann.PROC_BYTE,)),
+        ('operations', 'Operations', (Ann.ATR_DATA,
+            Ann.CMD_DATA, Ann.OUT_DATA, Ann.PROC_DATA,)),
     )
     binary = (
-        ('send-data', 'Send data'),
+        ('bytes', 'Bytes'),
     )
 
     def __init__(self):
         self.reset()
 
     def reset(self):
-        self.ss = self.es = self.ss_byte = -1
-        self.bitcount = 0
-        self.databyte = 0
+        self.samplerate = None
+        self.max_addr = 256
         self.bits = []
-        self.cmd = 'RESET'
+        self.atr_bytes = []
+        self.cmd_bytes = []
+        self.cmd_proc = None
+        self.out_len = None
+        self.out_bytes = []
+        self.proc_state = None
+        self.state = None
 
     def metadata(self, key, value):
         if key == srd.SRD_CONF_SAMPLERATE:
@@ -85,88 +97,445 @@ class Decoder(srd.Decoder):
         self.out_ann = self.register(srd.OUTPUT_ANN)
         self.out_binary = self.register(srd.OUTPUT_BINARY)
 
-    def putx(self, data):
-        self.put(self.ss, self.es, self.out_ann, data)
+    def putx(self, ss, es, cls, data):
+        self.put(ss, es, self.out_ann, [cls, data,])
+
+    def putb(self, ss, es, cls , data):
+        self.put(ss, es, self.out_binary, [cls, data,])
+
+    def snums_to_usecs(self, snum_count):
+        if not self.samplerate:
+            return None
+        snums_per_usec = self.samplerate / 1e6
+        usecs = snum_count / snums_per_usec
+        return usecs
+
+    def lookup_proto_ann_txt(self, key, variables):
+        ann = {
+            'RESET_SYM': [Ann.RESET_SYM, 'Reset', 'R',],
+            'INTR_SYM': [Ann.INTR_SYM, 'Interrupt', 'Intr', 'I',],
+            'START_SYM': [Ann.START_SYM, 'Start', 'ST', 'S',],
+            'STOP_SYM': [Ann.STOP_SYM, 'Stop', 'SP', 'P',],
+            'BIT_SYM': [Ann.BIT_SYM, '{bit}',],
+            'ATR_BYTE': [Ann.ATR_BYTE,
+                'Answer To Reset: {data:02x}',
+                'ATR: {data:02x}',
+                '{data:02x}',
+            ],
+            'CMD_BYTE': [Ann.CMD_BYTE,
+                'Command: {data:02x}',
+                'Cmd: {data:02x}',
+                '{data:02x}',
+            ],
+            'OUT_BYTE': [Ann.OUT_BYTE,
+                'Outgoing data: {data:02x}',
+                'Data: {data:02x}',
+                '{data:02x}',
+            ],
+            'PROC_BYTE': [Ann.PROC_BYTE,
+                'Internal processing: {data:02x}',
+                'Proc: {data:02x}',
+                '{data:02x}',
+            ],
+            'ATR_DATA': [Ann.ATR_DATA,
+                'Answer To Reset: {data}',
+                'ATR: {data}',
+                '{data}',
+            ],
+            'CMD_DATA': [Ann.CMD_DATA,
+                'Command: {data}',
+                'Cmd: {data}',
+                '{data}',
+            ],
+            'OUT_DATA': [Ann.OUT_DATA,
+                'Outgoing: {data}',
+                'Out: {data}',
+                '{data}',
+            ],
+            'PROC_DATA': [Ann.PROC_DATA,
+                'Processing: {data}',
+                'Proc: {data}',
+                '{data}',
+            ],
+        }.get(key, None)
+        if ann is None:
+            return None, []
+        cls, texts = ann[0], ann[1:]
+        texts = [t.format(**variables) for t in texts]
+        return cls, texts
+
+    def text_for_accu_bytes(self, accu):
+        if not accu:
+            return None, None, None, None
+        ss, es = accu[0][1], accu[-1][2]
+        data = [a[0] for a in accu]
+        text = " ".join(['{:02x}'.format(a) for a in data])
+        return ss, es, data, text
+
+    def flush_queued(self):
+        '''Flush previously accumulated operations details.'''
+
+        # Can be called when either the completion of an operation got
+        # detected (reliably), or when some kind of reset condition was
+        # met while a potential previously observed operation has not
+        # been postprocessed yet (best effort). Should not harm when the
+        # routine gets invoked while no data was collected yet, or was
+        # flushed already.
+        # BEWARE! Will void internal state. Should really only get called
+        # "between operations", NOT between fields of an operation.
+
+        if self.atr_bytes:
+            key = 'ATR_DATA'
+            ss, es, _, text = self.text_for_accu_bytes(self.atr_bytes)
+            cls, texts = self.lookup_proto_ann_txt(key, {'data': text})
+            self.putx(ss, es, cls, texts)
+
+        if self.cmd_bytes:
+            key = 'CMD_DATA'
+            ss, es, _, text = self.text_for_accu_bytes(self.cmd_bytes)
+            cls, texts = self.lookup_proto_ann_txt(key, {'data': text})
+            self.putx(ss, es, cls, texts)
 
-    def putb(self, data):
-        self.put(self.ss, self.es, self.out_binary, data)
+        if self.out_bytes:
+            key = 'OUT_DATA'
+            ss, es, _, text = self.text_for_accu_bytes(self.out_bytes)
+            cls, texts = self.lookup_proto_ann_txt(key, {'data': text})
+            self.putx(ss, es, cls, texts)
 
-    def handle_reset(self, pins):
-        self.ss, self.es = self.samplenum, self.samplenum
-        cmd = 'RESET' # No need to set the global self.cmd as this command is atomic
-        self.putx([proto[cmd][0], proto[cmd][1:]])
-        self.bitcount = self.databyte = 0
+        if self.proc_state:
+            key = 'PROC_DATA'
+            ss = self.proc_state['ss']
+            es = self.proc_state['es']
+            clk = self.proc_state['clk']
+            high = self.proc_state['io1']
+            text = '{clk} clocks, I/O {high}'.format(clk = clk, high = int(high))
+            usecs = self.snums_to_usecs(es - ss)
+            if usecs:
+                msecs = usecs / 1000
+                text = '{msecs:.2f} ms, {text}'.format(msecs = msecs, text = text)
+            cls, texts = self.lookup_proto_ann_txt(key, {'data': text})
+            self.putx(ss, es, cls, texts)
+
+        self.atr_bytes = None
+        self.cmd_bytes = None
+        self.cmd_proc = None
+        self.out_len = None
+        self.out_bytes = None
+        self.proc_state = None
+        self.state = None
+
+    def handle_reset(self, ss, es, has_clk):
+        self.flush_queued()
+        key = '{}_SYM'.format('RESET' if has_clk else 'INTR')
+        cls, texts = self.lookup_proto_ann_txt(key, {})
+        self.putx(ss, es, cls, texts)
         self.bits = []
-        self.cmd = 'ATR' # Next data bytes will be ATR
-
-    def handle_command(self, pins):
-        rst, clk, io = pins
-        self.ss, self.es = self.samplenum, self.samplenum
-        # If I/O is rising -> command START
-        # if I/O is falling -> command STOP and response data incoming
-        self.cmd = 'CMD' if (io == 0) else 'DATA'
-        self.bitcount = self.databyte = 0
+        self.state = 'ATR' if has_clk else None
+
+    def handle_command(self, ss, is_start):
+        if is_start:
+            self.flush_queued()
+        key = '{}_SYM'.format('START' if is_start else 'STOP')
+        cls, texts = self.lookup_proto_ann_txt(key, {})
+        self.putx(ss, ss, cls, texts)
         self.bits = []
+        self.state = 'CMD' if is_start else 'DATA'
+
+    def command_check(self, ctrl, addr, data):
+        '''Interpret CTRL/ADDR/DATA command entry.'''
+
+        # See the Siemens Datasheet section 2.3 Commands. The abbreviated
+        # text variants are my guesses, terse for readability at coarser
+        # zoom levels.
+        codes_table = {
+            0x30: {
+                'fmt': [
+                    'read main memory, addr {addr:02x}',
+                    'RD-M @{addr:02x}',
+                ],
+                'len': lambda ctrl, addr, data: self.max_addr - addr,
+            },
+            0x31: {
+                'fmt': [
+                    'read security memory',
+                    'RD-S',
+                ],
+                'len': 4,
+            },
+            0x33: {
+                'fmt': [
+                    'compare verification data, addr {addr:02x}, data {data:02x}',
+                    'CMP-V @{addr:02x} ={data:02x}',
+                ],
+                'proc': True,
+            },
+            0x34: {
+                'fmt': [
+                    'read protection memory, addr {addr:02x}',
+                    'RD-P @{addr:02x}',
+                ],
+                'len': 4,
+            },
+            0x38: {
+                'fmt': [
+                    'update main memory, addr {addr:02x}, data {data:02x}',
+                    'WR-M @{addr:02x} ={data:02x}',
+                ],
+                'proc': True,
+            },
+            0x39: {
+                'fmt': [
+                    'update security memory, addr {addr:02x}, data {data:02x}',
+                    'WR-S @{addr:02x} ={data:02x}',
+                ],
+                'proc': True,
+            },
+            0x3c: {
+                'fmt': [
+                    'write protection memory, addr {addr:02x}, data {data:02x}',
+                    'WR-P @{addr:02x} ={data:02x}',
+                ],
+                'proc': True,
+            },
+        }
+        code = codes_table.get(ctrl, {})
+        dflt_fmt = [
+            'unknown, ctrl {ctrl:02x}, addr {addr:02x}, data {data:02x}',
+            'UNK-{ctrl:02x} @{addr:02x}, ={data:02x}',
+        ]
+        fmt = code.get('fmt', dflt_fmt)
+        if not isinstance(fmt, (list, tuple,)):
+            fmt = [fmt,]
+        texts = [f.format(ctrl = ctrl, addr = addr, data = data) for f in fmt]
+        length = code.get('len', None)
+        if callable(length):
+            length = length(ctrl, addr, data)
+        is_proc = code.get('proc', False)
+        return texts, length, is_proc
+
+    def processing_start(self, ss, es, io_high):
+        self.proc_state = {
+            'ss': ss or es,
+            'es': es or ss,
+            'clk': 0,
+            'io1': bool(io_high),
+        }
+
+    def processing_update(self, es, clk_inc, io_high):
+        if es is not None and es > self.proc_state['es']:
+            self.proc_state['es'] = es
+        self.proc_state['clk'] += clk_inc
+        if io_high:
+            self.proc_state['io1'] = True
+
+    def handle_data_byte(self, ss, es, data, bits):
+        '''Accumulate CMD or OUT data bytes.'''
 
-    # Gather 8 bits of data
-    def handle_data(self, pins):
-        rst, clk, io = pins
-
-        # Data is transmitted LSB-first.
-        self.databyte |= (io << self.bitcount)
-
-        # Remember the start of the first data/address bit.
-        if self.bitcount == 0:
-            self.ss_byte = self.samplenum
-
-        # Store individual bits and their start/end samplenumbers.
-        # In the list, index 0 represents the LSB (SLE44xx transmits LSB-first).
-        self.bits.insert(0, [io, self.samplenum, self.samplenum])
-        if self.bitcount > 0:
-            self.bits[1][2] = self.samplenum
-        if self.bitcount == 7:
-            self.bitwidth = self.bits[1][2] - self.bits[2][2]
-            self.bits[0][2] += self.bitwidth
-
-        # Return if we haven't collected all 8 bits, yet.
-        if self.bitcount < 7:
-            self.bitcount += 1
+        if self.state == 'ATR':
+            if not self.atr_bytes:
+                self.atr_bytes = []
+            self.atr_bytes.append([data, ss, es, bits,])
+            if len(self.atr_bytes) == 4:
+                self.flush_queued()
             return
 
-        self.ss, self.es = self.ss_byte, self.samplenum + self.bitwidth
+        if self.state == 'CMD':
+            if not self.cmd_bytes:
+                self.cmd_bytes = []
+            self.cmd_bytes.append([data, ss, es, bits,])
+            if len(self.cmd_bytes) == 3:
+                ctrl, addr, data = [c[0] for c in self.cmd_bytes]
+                texts, length, proc = self.command_check(ctrl, addr, data)
+                # Immediately emit the annotation to not lose the text,
+                # and to support zoom levels for this specific case.
+                ss, es = self.cmd_bytes[0][1], self.cmd_bytes[-1][2]
+                cls = Ann.CMD_DATA
+                self.putx(ss, es, cls, texts)
+                self.cmd_bytes = []
+                # Prepare to continue either at OUT or PROC after CMD.
+                self.out_len = length
+                self.cmd_proc = bool(proc)
+                self.state = None
+            return
 
-        self.putb([Bin.SEND_DATA, bytes([self.databyte])])
+        if self.state == 'OUT':
+            if not self.out_bytes:
+                self.out_bytes = []
+            self.out_bytes.append([data, ss, es, bits,])
+            if self.out_len is not None and len(self.out_bytes) == self.out_len:
+                self.flush_queued()
+            return
 
-        for bit in self.bits:
-            self.put(bit[1], bit[2], self.out_ann, [Ann.BIT, ['%d' % bit[0]]])
+    def handle_data_bit(self, ss, es, bit):
+        '''Gather 8 bits of data (or track processing progress).'''
 
-        self.putx([proto[self.cmd][0], ['%s: %02X' % (proto[self.cmd][1], self.databyte),
-                   '%s: %02X' % (proto[self.cmd][2], self.databyte), '%02X' % self.databyte]])
+        # Switch late from DATA to either OUT or PROC. We can tell the
+        # type and potentially fixed length at the end of CMD already,
+        # but a START/STOP condition may void this information. So we
+        # do the switch at the first data bit after CMD.
+        # In the OUT case data bytes get accumulated, until either the
+        # expected byte count is reached, or another CMD starts. In the
+        # PROC case a high I/O level terminates execution.
+        if self.state == 'DATA':
+            if self.out_len:
+                self.state = 'OUT'
+            elif self.cmd_proc:
+                self.state = 'PROC'
+                self.processing_start(ss or es, es or ss, bit == 1)
+            else:
+                # Implementor's note: Handle unknown situations like
+                # outgoing data bytes, for the user's convenience. This
+                # will show OUT bytes even if it's just processing CLK
+                # cycles with constant or irrelevant I/O bit patterns.
+                self.state = 'OUT'
+        if self.state == 'PROC':
+            high = bit == 1
+            if ss is not None:
+                self.processing_update(ss, 0, high)
+            if es is not None:
+                self.processing_update(es, 1, high)
+            if high:
+                self.flush_queued()
+            return
 
-        # Done with this packet.
-        self.bitcount = self.databyte = 0
+        # This routine gets called two times per bit value. Track the
+        # bit's value and ss timestamp when the bit period starts. And
+        # update the es timestamp at the end of the bit's validity.
+        if ss is not None:
+            self.bits.append([bit, ss, es or ss])
+            return
+        if es is None:
+            # Unexpected invocation. Could be a glitch or invalid input
+            # data, or an interaction with RESET/START/STOP conditions.
+            self.bits = []
+            return
+        if not self.bits:
+            return
+        if bit is not None:
+            self.bits[-1][0] = bit
+            # TODO Check for consistent bit level at ss and es when
+            # the information was available? Is bit data sampled at
+            # different clock edges depending whether data is sent
+            # or received?
+        self.bits[-1][2] = es
+        # Emit the bit's annotation. See if a byte was received.
+        bit, ss, es = self.bits[-1]
+        cls, texts = self.lookup_proto_ann_txt('BIT_SYM', {'bit': bit})
+        self.putx(ss, es, cls, texts)
+        if len(self.bits) < 8:
+            return
+
+        # Get the data byte value, and the byte's ss/es. Emit the byte's
+        # annotation and binary output. Pass the byte to upper layers.
+        # TODO Vary annotation classes with the byte's position within
+        # a field? To tell CTRL/ADDR/DATA of a CMD entry apart?
+        bits = self.bits
         self.bits = []
+        data = bitpack_lsb(bits, 0)
+        ss = bits[0][1]
+        es = bits[-1][2]
+
+        key = '{}_BYTE'.format(self.state)
+        cls, texts = self.lookup_proto_ann_txt(key, {'data': data})
+        if cls:
+            self.putx(ss, es, cls, texts)
+        self.putb(ss, es, Bin.BYTES, bytes([data]))
+
+        self.handle_data_byte(ss, es, data, bits)
 
     def decode(self):
+        '''Decoder's main data interpretation loop.'''
+
+        # Signal conditions tracked by the protocol decoder:
+        # - Rising and falling RST edges, which span the width of a
+        #   high-active RESET pulse. RST has highest priority, no
+        #   other activity can take place in this period.
+        # - Rising and falling CLK edges when RST is active. The
+        #   CLK pulse when RST is asserted will reset the card's
+        #   address counter. RST alone can terminate memory reads.
+        # - Rising and falling CLK edges when RST is inactive. This
+        #   determines the period where BIT values are valid.
+        # - I/O edges during high CLK. These are START and STOP
+        #   conditions that tell COMMAND and DATA phases apart.
+        # - Rise of I/O during internal processing. This expression
+        #   is an unconditional part of the .wait() condition set. It
+        #   is assumed that skipping this match in many cases is more
+        #   efficient than the permanent re-construction of the .wait()
+        #   condition list in every loop iteration, and preferrable to
+        #   the maintainance cost of duplicating RST and CLK handling
+        #   when checking I/O during internal processing.
+        (
+            COND_RESET_START, COND_RESET_STOP,
+            COND_RSTCLK_START, COND_RSTCLK_STOP,
+            COND_DATA_START, COND_DATA_STOP,
+            COND_CMD_START, COND_CMD_STOP,
+            COND_PROC_IOH,
+        ) = range(9)
+        conditions = [
+            {Pin.RST: 'r'},
+            {Pin.RST: 'f'},
+            {Pin.RST: 'h', Pin.CLK: 'r'},
+            {Pin.RST: 'h', Pin.CLK: 'f'},
+            {Pin.RST: 'l', Pin.CLK: 'r'},
+            {Pin.RST: 'l', Pin.CLK: 'f'},
+            {Pin.CLK: 'h', Pin.IO: 'f'},
+            {Pin.CLK: 'h', Pin.IO: 'r'},
+            {Pin.RST: 'l', Pin.IO: 'r'},
+        ]
+
+        ss_reset = es_reset = ss_clk = es_clk = None
         while True:
-            # Signal conditions tracked by the protocol decoder:
-            # - RESET condition (R): RST = rising
-            # - Incoming data (D): RST = low, CLK = rising.
-            # - Command mode START: CLK = high, I/O = falling.
-            # - Command mode STOP: CLK = high, I/O = rising.
-            (COND_RESET, COND_DATA, COND_CMD_START, COND_CMD_STOP,) = range(4)
-            conditions = [
-                {Pin.RST: 'r'},
-                {Pin.RST: 'l', Pin.CLK: 'r'},
-                {Pin.CLK: 'h', Pin.IO: 'f'},
-                {Pin.CLK: 'h', Pin.IO: 'r'},
-            ]
+
+            is_outgoing = self.state == 'OUT'
+            is_processing = self.state == 'PROC'
             pins = self.wait(conditions)
-            if self.matched[COND_RESET]:
-                self.handle_reset(pins)
-            elif self.matched[COND_DATA]:
-                self.handle_data(pins)
-            elif self.matched[COND_CMD_START]:
-                self.handle_command(pins)
-            elif self.matched[COND_CMD_STOP]:
-                self.handle_command(pins)
+            io = pins[Pin.IO]
+
+            # Handle RESET conditions, including an optional CLK pulse
+            # while RST is asserted.
+            if self.matched[COND_RESET_START]:
+                self.flush_queued()
+                ss_reset = self.samplenum
+                es_reset = ss_clk = es_clk = None
+                continue
+            if self.matched[COND_RESET_STOP]:
+                es_reset = self.samplenum
+                self.handle_reset(ss_reset or 0, es_reset, ss_clk and es_clk)
+                ss_reset = es_reset = ss_clk = es_clk = None
+                continue
+            if self.matched[COND_RSTCLK_START]:
+                ss_clk = self.samplenum
+                es_clk = None
+                continue
+            if self.matched[COND_RSTCLK_STOP]:
+                es_clk = self.samplenum
+                continue
+
+            # Handle data bits' validity boundaries. Also covers the
+            # periodic check for high I/O level and update of details
+            # during internal processing.
+            if self.matched[COND_DATA_START]:
+                self.handle_data_bit(self.samplenum, None, io)
+                continue
+            if self.matched[COND_DATA_STOP]:
+                self.handle_data_bit(None, self.samplenum, None)
+                continue
+
+            # Additional check for idle I/O during internal processing,
+            # independent of CLK edges this time. This assures that the
+            # decoder ends processing intervals as soon as possible, at
+            # the most precise timestamp.
+            if is_processing and self.matched[COND_PROC_IOH]:
+                self.handle_data_bit(self.samplenum, self.samplenum, io)
+                continue
+
+            # The START/STOP conditions are only applicable outside of
+            # "outgoing data" or "internal processing" periods. This is
+            # what the data sheet specifies.
+            if not is_outgoing and not is_processing:
+                if self.matched[COND_CMD_START]:
+                    self.handle_command(self.samplenum, True)
+                    continue
+                if self.matched[COND_CMD_STOP]:
+                    self.handle_command(self.samplenum, False)
+                    continue