]> sigrok.org Git - libsigrokdecode.git/blob - decoders/sle44xx/pd.py
sle44xx: optionally use samplerate to show processing durations
[libsigrokdecode.git] / decoders / sle44xx / pd.py
1 ##
2 ## This file is part of the libsigrokdecode project.
3 ##
4 ## Copyright (C) 2019 Federico Cerutti <federico@ceres-c.it>
5 ##
6 ## This program is free software; you can redistribute it and/or modify
7 ## it under the terms of the GNU General Public License as published by
8 ## the Free Software Foundation; either version 2 of the License, or
9 ## (at your option) any later version.
10 ##
11 ## This program is distributed in the hope that it will be useful,
12 ## but WITHOUT ANY WARRANTY; without even the implied warranty of
13 ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 ## GNU General Public License for more details.
15 ##
16 ## You should have received a copy of the GNU General Public License
17 ## along with this program; if not, see <http://www.gnu.org/licenses/>.
18 ##
19
20 from common.srdhelper import bitpack_lsb
21 import sigrokdecode as srd
22
23 class Pin:
24     RST, CLK, IO, = range(3)
25
26 class Ann:
27     RESET_SYM, INTR_SYM, START_SYM, STOP_SYM, BIT_SYM, \
28     ATR_BYTE, CMD_BYTE, OUT_BYTE, PROC_BYTE, \
29     ATR_DATA, CMD_DATA, OUT_DATA, PROC_DATA, \
30     = range(13)
31
32 class Bin:
33     BYTES, = range(1)
34
35 class Decoder(srd.Decoder):
36     api_version = 3
37     id = 'sle44xx'
38     name = 'SLE 44xx'
39     longname = 'SLE44xx memory card'
40     desc = 'SLE 4418/28/32/42 memory card serial protocol'
41     license = 'gplv2+'
42     inputs = ['logic']
43     outputs = []
44     tags = ['Memory']
45     channels = (
46         {'id': 'rst', 'name': 'RST', 'desc': 'Reset line'},
47         {'id': 'clk', 'name': 'CLK', 'desc': 'Clock line'},
48         {'id': 'io', 'name': 'I/O', 'desc': 'I/O data line'},
49     )
50     annotations = (
51         ('reset_sym', 'Reset Symbol'),
52         ('intr_sym', 'Interrupt Symbol'),
53         ('start_sym', 'Start Symbol'),
54         ('stop_sym', 'Stop Symbol'),
55         ('bit_sym', 'Bit Symbol'),
56         ('atr_byte', 'ATR Byte'),
57         ('cmd_byte', 'Command Byte'),
58         ('out_byte', 'Outgoing Byte'),
59         ('proc_byte', 'Processing Byte'),
60         ('atr_data', 'ATR data'),
61         ('cmd_data', 'Command data'),
62         ('out_data', 'Outgoing data'),
63         ('proc_data', 'Processing data'),
64     )
65     annotation_rows = (
66         ('symbols', 'Symbols', (Ann.RESET_SYM, Ann.INTR_SYM,
67             Ann.START_SYM, Ann.STOP_SYM, Ann.BIT_SYM,)),
68         ('fields', 'Fields', (Ann.ATR_BYTE,
69             Ann.CMD_BYTE, Ann.OUT_BYTE, Ann.PROC_BYTE,)),
70         ('operations', 'Operations', (Ann.ATR_DATA,
71             Ann.CMD_DATA, Ann.OUT_DATA, Ann.PROC_DATA,)),
72     )
73     binary = (
74         ('bytes', 'Bytes'),
75     )
76
77     def __init__(self):
78         self.reset()
79
80     def reset(self):
81         self.samplerate = None
82         self.bits = []
83         self.atr_bytes = []
84         self.cmd_bytes = []
85         self.cmd_proc = None
86         self.out_len = None
87         self.out_bytes = []
88         self.proc_state = None
89         self.state = None
90
91     def metadata(self, key, value):
92         if key == srd.SRD_CONF_SAMPLERATE:
93             self.samplerate = value
94
95     def start(self):
96         self.out_ann = self.register(srd.OUTPUT_ANN)
97         self.out_binary = self.register(srd.OUTPUT_BINARY)
98
99     def putx(self, ss, es, cls, data):
100         self.put(ss, es, self.out_ann, [cls, data,])
101
102     def putb(self, ss, es, cls , data):
103         self.put(ss, es, self.out_binary, [cls, data,])
104
105     def snums_to_usecs(self, snum_count):
106         if not self.samplerate:
107             return None
108         snums_per_usec = self.samplerate / 1e6
109         usecs = snum_count / snums_per_usec
110         return usecs
111
112     def lookup_proto_ann_txt(self, key, variables):
113         ann = {
114             'RESET_SYM': [Ann.RESET_SYM, 'Reset', 'R',],
115             'INTR_SYM': [Ann.INTR_SYM, 'Interrupt', 'Intr', 'I',],
116             'START_SYM': [Ann.START_SYM, 'Start', 'ST', 'S',],
117             'STOP_SYM': [Ann.STOP_SYM, 'Stop', 'SP', 'P',],
118             'BIT_SYM': [Ann.BIT_SYM, '{bit}',],
119             'ATR_BYTE': [Ann.ATR_BYTE,
120                 'Answer To Reset: {data:02x}',
121                 'ATR: {data:02x}',
122                 '{data:02x}',
123             ],
124             'CMD_BYTE': [Ann.CMD_BYTE,
125                 'Command: {data:02x}',
126                 'Cmd: {data:02x}',
127                 '{data:02x}',
128             ],
129             'OUT_BYTE': [Ann.OUT_BYTE,
130                 'Outgoing data: {data:02x}',
131                 'Data: {data:02x}',
132                 '{data:02x}',
133             ],
134             'PROC_BYTE': [Ann.PROC_BYTE,
135                 'Internal processing: {data:02x}',
136                 'Proc: {data:02x}',
137                 '{data:02x}',
138             ],
139             'ATR_DATA': [Ann.ATR_DATA,
140                 'Answer To Reset: {data}',
141                 'ATR: {data}',
142                 '{data}',
143             ],
144             'CMD_DATA': [Ann.CMD_DATA,
145                 'Command: {data}',
146                 'Cmd: {data}',
147                 '{data}',
148             ],
149             'OUT_DATA': [Ann.OUT_DATA,
150                 'Outgoing: {data}',
151                 'Out: {data}',
152                 '{data}',
153             ],
154             'PROC_DATA': [Ann.PROC_DATA,
155                 'Processing: {data}',
156                 'Proc: {data}',
157                 '{data}',
158             ],
159         }.get(key, None)
160         if ann is None:
161             return None, []
162         cls, texts = ann[0], ann[1:]
163         texts = [t.format(**variables) for t in texts]
164         return cls, texts
165
166     def text_for_accu_bytes(self, accu):
167         if not accu:
168             return None, None, None, None
169         ss, es = accu[0][1], accu[-1][2]
170         data = [a[0] for a in accu]
171         text = " ".join(['{:02x}'.format(a) for a in data])
172         return ss, es, data, text
173
174     def flush_queued(self):
175         '''Flush previously accumulated operations details.'''
176
177         # Can be called when either the completion of an operation got
178         # detected (reliably), or when some kind of reset condition was
179         # met while a potential previously observed operation has not
180         # been postprocessed yet (best effort). Should not harm when the
181         # routine gets invoked while no data was collected yet, or was
182         # flushed already.
183         # BEWARE! Will void internal state. Should really only get called
184         # "between operations", NOT between fields of an operation.
185
186         if self.atr_bytes:
187             key = 'ATR_DATA'
188             ss, es, _, text = self.text_for_accu_bytes(self.atr_bytes)
189             cls, texts = self.lookup_proto_ann_txt(key, {'data': text})
190             self.putx(ss, es, cls, texts)
191
192         if self.cmd_bytes:
193             key = 'CMD_DATA'
194             ss, es, _, text = self.text_for_accu_bytes(self.cmd_bytes)
195             cls, texts = self.lookup_proto_ann_txt(key, {'data': text})
196             self.putx(ss, es, cls, texts)
197
198         if self.out_bytes:
199             key = 'OUT_DATA'
200             ss, es, _, text = self.text_for_accu_bytes(self.out_bytes)
201             cls, texts = self.lookup_proto_ann_txt(key, {'data': text})
202             self.putx(ss, es, cls, texts)
203
204         if self.proc_state:
205             key = 'PROC_DATA'
206             ss = self.proc_state['ss']
207             es = self.proc_state['es']
208             clk = self.proc_state['clk']
209             high = self.proc_state['io1']
210             text = '{clk} clocks, I/O {high}'.format(clk = clk, high = int(high))
211             usecs = self.snums_to_usecs(es - ss)
212             if usecs:
213                 msecs = usecs / 1000
214                 text = '{msecs:.2f} ms, {text}'.format(msecs = msecs, text = text)
215             cls, texts = self.lookup_proto_ann_txt(key, {'data': text})
216             self.putx(ss, es, cls, texts)
217
218         self.atr_bytes = None
219         self.cmd_bytes = None
220         self.cmd_proc = None
221         self.out_len = None
222         self.out_bytes = None
223         self.proc_state = None
224         self.state = None
225
226     def handle_reset(self, ss, es, has_clk):
227         self.flush_queued()
228         key = '{}_SYM'.format('RESET' if has_clk else 'INTR')
229         cls, texts = self.lookup_proto_ann_txt(key, {})
230         self.putx(ss, es, cls, texts)
231         self.bits = []
232         self.state = 'ATR' if has_clk else None
233
234     def handle_command(self, ss, is_start):
235         if is_start:
236             self.flush_queued()
237         key = '{}_SYM'.format('START' if is_start else 'STOP')
238         cls, texts = self.lookup_proto_ann_txt(key, {})
239         self.putx(ss, ss, cls, texts)
240         self.bits = []
241         self.state = 'CMD' if is_start else 'DATA'
242
243     def command_check(self, ctrl, addr, data):
244         '''Interpret CTRL/ADDR/DATA command entry.'''
245
246         # See the Siemens Datasheet section 2.3 Commands. The abbreviated
247         # text variants are my guesses, terse for readability at coarser
248         # zoom levels.
249         codes_table = {
250             0x30: {
251                 'fmt': [
252                     'read main memory, addr {addr:02x}',
253                     'RD-M @{addr:02x}',
254                 ],
255             },
256             0x31: {
257                 'fmt': [
258                     'read security memory',
259                     'RD-S',
260                 ],
261                 'len': 4,
262             },
263             0x33: {
264                 'fmt': [
265                     'compare verification data, addr {addr:02x}, data {data:02x}',
266                     'CMP-V @{addr:02x} ={data:02x}',
267                 ],
268                 'proc': True,
269             },
270             0x34: {
271                 'fmt': [
272                     'read protection memory, addr {addr:02x}',
273                     'RD-P @{addr:02x}',
274                 ],
275                 'len': 4,
276             },
277             0x38: {
278                 'fmt': [
279                     'update main memory, addr {addr:02x}, data {data:02x}',
280                     'WR-M @{addr:02x} ={data:02x}',
281                 ],
282                 'proc': True,
283             },
284             0x39: {
285                 'fmt': [
286                     'update security memory, addr {addr:02x}, data {data:02x}',
287                     'WR-S @{addr:02x} ={data:02x}',
288                 ],
289                 'proc': True,
290             },
291             0x3c: {
292                 'fmt': [
293                     'write protection memory, addr {addr:02x}, data {data:02x}',
294                     'WR-P @{addr:02x} ={data:02x}',
295                 ],
296                 'proc': True,
297             },
298         }
299         code = codes_table.get(ctrl, {})
300         dflt_fmt = [
301             'unknown, ctrl {ctrl:02x}, addr {addr:02x}, data {data:02x}',
302             'UNK-{ctrl:02x} @{addr:02x}, ={data:02x}',
303         ]
304         fmt = code.get('fmt', dflt_fmt)
305         if not isinstance(fmt, (list, tuple,)):
306             fmt = [fmt,]
307         texts = [f.format(ctrl = ctrl, addr = addr, data = data) for f in fmt]
308         length = code.get('len', None)
309         is_proc = code.get('proc', False)
310         return texts, length, is_proc
311
312     def processing_start(self, ss, es, io_high):
313         self.proc_state = {
314             'ss': ss or es,
315             'es': es or ss,
316             'clk': 0,
317             'io1': bool(io_high),
318         }
319
320     def processing_update(self, es, clk_inc, io_high):
321         if es is not None and es > self.proc_state['es']:
322             self.proc_state['es'] = es
323         self.proc_state['clk'] += clk_inc
324         if io_high:
325             self.proc_state['io1'] = True
326
327     def handle_data_byte(self, ss, es, data, bits):
328         '''Accumulate CMD or OUT data bytes.'''
329
330         if self.state == 'ATR':
331             if not self.atr_bytes:
332                 self.atr_bytes = []
333             self.atr_bytes.append([data, ss, es, bits,])
334             if len(self.atr_bytes) == 4:
335                 self.flush_queued()
336             return
337
338         if self.state == 'CMD':
339             if not self.cmd_bytes:
340                 self.cmd_bytes = []
341             self.cmd_bytes.append([data, ss, es, bits,])
342             if len(self.cmd_bytes) == 3:
343                 ctrl, addr, data = [c[0] for c in self.cmd_bytes]
344                 texts, length, proc = self.command_check(ctrl, addr, data)
345                 # Immediately emit the annotation to not lose the text,
346                 # and to support zoom levels for this specific case.
347                 ss, es = self.cmd_bytes[0][1], self.cmd_bytes[-1][2]
348                 cls = Ann.CMD_DATA
349                 self.putx(ss, es, cls, texts)
350                 self.cmd_bytes = []
351                 # Prepare to continue either at OUT or PROC after CMD.
352                 self.out_len = length
353                 self.cmd_proc = bool(proc)
354                 self.state = None
355             return
356
357         if self.state == 'OUT':
358             if not self.out_bytes:
359                 self.out_bytes = []
360             self.out_bytes.append([data, ss, es, bits,])
361             if self.out_len is not None and len(self.out_bytes) == self.out_len:
362                 self.flush_queued()
363             return
364
365     def handle_data_bit(self, ss, es, bit):
366         '''Gather 8 bits of data (or track processing progress).'''
367
368         # Switch late from DATA to either OUT or PROC. We can tell the
369         # type and potentially fixed length at the end of CMD already,
370         # but a START/STOP condition may void this information. So we
371         # do the switch at the first data bit after CMD.
372         # In the OUT case data bytes get accumulated, until either the
373         # expected byte count is reached, or another CMD starts. In the
374         # PROC case a high I/O level terminates execution.
375         if self.state == 'DATA':
376             if self.out_len:
377                 self.state = 'OUT'
378             elif self.cmd_proc:
379                 self.state = 'PROC'
380                 self.processing_start(ss or es, es or ss, bit == 1)
381             else:
382                 # Implementor's note: Handle unknown situations like
383                 # outgoing data bytes, for the user's convenience. This
384                 # will show OUT bytes even if it's just processing CLK
385                 # cycles with constant or irrelevant I/O bit patterns.
386                 self.state = 'OUT'
387         if self.state == 'PROC':
388             high = bit == 1
389             if ss is not None:
390                 self.processing_update(ss, 0, high)
391             if es is not None:
392                 self.processing_update(es, 1, high)
393             if high:
394                 self.flush_queued()
395             return
396
397         # This routine gets called two times per bit value. Track the
398         # bit's value and ss timestamp when the bit period starts. And
399         # update the es timestamp at the end of the bit's validity.
400         if ss is not None:
401             self.bits.append([bit, ss, es or ss])
402             return
403         if es is None:
404             # Unexpected invocation. Could be a glitch or invalid input
405             # data, or an interaction with RESET/START/STOP conditions.
406             self.bits = []
407             return
408         if not self.bits:
409             return
410         if bit is not None:
411             self.bits[-1][0] = bit
412             # TODO Check for consistent bit level at ss and es when
413             # the information was available? Is bit data sampled at
414             # different clock edges depending whether data is sent
415             # or received?
416         self.bits[-1][2] = es
417         # Emit the bit's annotation. See if a byte was received.
418         bit, ss, es = self.bits[-1]
419         cls, texts = self.lookup_proto_ann_txt('BIT_SYM', {'bit': bit})
420         self.putx(ss, es, cls, texts)
421         if len(self.bits) < 8:
422             return
423
424         # Get the data byte value, and the byte's ss/es. Emit the byte's
425         # annotation and binary output. Pass the byte to upper layers.
426         # TODO Vary annotation classes with the byte's position within
427         # a field? To tell CTRL/ADDR/DATA of a CMD entry apart?
428         bits = self.bits
429         self.bits = []
430         data = bitpack_lsb(bits, 0)
431         ss = bits[0][1]
432         es = bits[-1][2]
433
434         key = '{}_BYTE'.format(self.state)
435         cls, texts = self.lookup_proto_ann_txt(key, {'data': data})
436         if cls:
437             self.putx(ss, es, cls, texts)
438         self.putb(ss, es, Bin.BYTES, bytes([data]))
439
440         self.handle_data_byte(ss, es, data, bits)
441
442     def decode(self):
443         '''Decoder's main data interpretation loop.'''
444
445         # Signal conditions tracked by the protocol decoder:
446         # - Rising and falling RST edges, which span the width of a
447         #   high-active RESET pulse. RST has highest priority, no
448         #   other activity can take place in this period.
449         # - Rising and falling CLK edges when RST is active. The
450         #   CLK pulse when RST is asserted will reset the card's
451         #   address counter. RST alone can terminate memory reads.
452         # - Rising and falling CLK edges when RST is inactive. This
453         #   determines the period where BIT values are valid.
454         # - I/O edges during high CLK. These are START and STOP
455         #   conditions that tell COMMAND and DATA phases apart.
456         # - Rise of I/O during internal processing. This expression
457         #   is an unconditional part of the .wait() condition set. It
458         #   is assumed that skipping this match in many cases is more
459         #   efficient than the permanent re-construction of the .wait()
460         #   condition list in every loop iteration, and preferrable to
461         #   the maintainance cost of duplicating RST and CLK handling
462         #   when checking I/O during internal processing.
463         (
464             COND_RESET_START, COND_RESET_STOP,
465             COND_RSTCLK_START, COND_RSTCLK_STOP,
466             COND_DATA_START, COND_DATA_STOP,
467             COND_CMD_START, COND_CMD_STOP,
468             COND_PROC_IOH,
469         ) = range(9)
470         conditions = [
471             {Pin.RST: 'r'},
472             {Pin.RST: 'f'},
473             {Pin.RST: 'h', Pin.CLK: 'r'},
474             {Pin.RST: 'h', Pin.CLK: 'f'},
475             {Pin.RST: 'l', Pin.CLK: 'r'},
476             {Pin.RST: 'l', Pin.CLK: 'f'},
477             {Pin.CLK: 'h', Pin.IO: 'f'},
478             {Pin.CLK: 'h', Pin.IO: 'r'},
479             {Pin.RST: 'l', Pin.IO: 'r'},
480         ]
481
482         ss_reset = es_reset = ss_clk = es_clk = None
483         while True:
484
485             is_outgoing = self.state == 'OUT'
486             is_processing = self.state == 'PROC'
487             pins = self.wait(conditions)
488             io = pins[Pin.IO]
489
490             # Handle RESET conditions, including an optional CLK pulse
491             # while RST is asserted.
492             if self.matched[COND_RESET_START]:
493                 self.flush_queued()
494                 ss_reset = self.samplenum
495                 es_reset = ss_clk = es_clk = None
496                 continue
497             if self.matched[COND_RESET_STOP]:
498                 es_reset = self.samplenum
499                 self.handle_reset(ss_reset or 0, es_reset, ss_clk and es_clk)
500                 ss_reset = es_reset = ss_clk = es_clk = None
501                 continue
502             if self.matched[COND_RSTCLK_START]:
503                 ss_clk = self.samplenum
504                 es_clk = None
505                 continue
506             if self.matched[COND_RSTCLK_STOP]:
507                 es_clk = self.samplenum
508                 continue
509
510             # Handle data bits' validity boundaries. Also covers the
511             # periodic check for high I/O level and update of details
512             # during internal processing.
513             if self.matched[COND_DATA_START]:
514                 self.handle_data_bit(self.samplenum, None, io)
515                 continue
516             if self.matched[COND_DATA_STOP]:
517                 self.handle_data_bit(None, self.samplenum, None)
518                 continue
519
520             # Additional check for idle I/O during internal processing,
521             # independent of CLK edges this time. This assures that the
522             # decoder ends processing intervals as soon as possible, at
523             # the most precise timestamp.
524             if is_processing and self.matched[COND_PROC_IOH]:
525                 self.handle_data_bit(self.samplenum, self.samplenum, io)
526                 continue
527
528             # The START/STOP conditions are only applicable outside of
529             # "outgoing data" or "internal processing" periods. This is
530             # what the data sheet specifies.
531             # TODO There is the decoder's inability to reliably detect
532             # where memory reads are done because they reached the end
533             # of the chip's capacity. Which makes the decoder miss the
534             # next START symbol, and lose synchronization to the BIT
535             # stream (bit counts are off, which breaks the accumulation
536             # of bytes). That's why this decoder unconditionally keeps
537             # detecting the START condition although it should not.
538             if not is_outgoing and not is_processing:
539                 if self.matched[COND_CMD_START]:
540                     self.handle_command(self.samplenum, True)
541                     continue
542                 if self.matched[COND_CMD_STOP]:
543                     self.handle_command(self.samplenum, False)
544                     continue
545             if True: # HACK See the comment above.
546                 if self.matched[COND_CMD_START]:
547                     self.handle_command(self.samplenum, True)
548                     continue