]> sigrok.org Git - libsigrokdecode.git/blame - decoders/sle44xx/pd.py
sle44xx: optionally use samplerate to show processing durations
[libsigrokdecode.git] / decoders / sle44xx / pd.py
CommitLineData
7e87b2f7
FC
1##
2## This file is part of the libsigrokdecode project.
3##
4## Copyright (C) 2019 Federico Cerutti <federico@ceres-c.it>
5##
6## This program is free software; you can redistribute it and/or modify
7## it under the terms of the GNU General Public License as published by
8## the Free Software Foundation; either version 2 of the License, or
9## (at your option) any later version.
10##
11## This program is distributed in the hope that it will be useful,
12## but WITHOUT ANY WARRANTY; without even the implied warranty of
13## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14## GNU General Public License for more details.
15##
16## You should have received a copy of the GNU General Public License
17## along with this program; if not, see <http://www.gnu.org/licenses/>.
18##
19
80c76d20 20from common.srdhelper import bitpack_lsb
7e87b2f7
FC
21import sigrokdecode as srd
22
5c47b179
GS
23class Pin:
24 RST, CLK, IO, = range(3)
25
52f08e6d 26class Ann:
7f2fb1b3
GS
27 RESET_SYM, INTR_SYM, START_SYM, STOP_SYM, BIT_SYM, \
28 ATR_BYTE, CMD_BYTE, OUT_BYTE, PROC_BYTE, \
29 ATR_DATA, CMD_DATA, OUT_DATA, PROC_DATA, \
30 = range(13)
52f08e6d
GS
31
32class Bin:
7f2fb1b3 33 BYTES, = range(1)
c328c181 34
7e87b2f7
FC
35class Decoder(srd.Decoder):
36 api_version = 3
37 id = 'sle44xx'
38 name = 'SLE 44xx'
b46b88f3 39 longname = 'SLE44xx memory card'
7e87b2f7
FC
40 desc = 'SLE 4418/28/32/42 memory card serial protocol'
41 license = 'gplv2+'
42 inputs = ['logic']
0411c41c 43 outputs = []
ead00318 44 tags = ['Memory']
7e87b2f7
FC
45 channels = (
46 {'id': 'rst', 'name': 'RST', 'desc': 'Reset line'},
47 {'id': 'clk', 'name': 'CLK', 'desc': 'Clock line'},
48 {'id': 'io', 'name': 'I/O', 'desc': 'I/O data line'},
49 )
50 annotations = (
7f2fb1b3
GS
51 ('reset_sym', 'Reset Symbol'),
52 ('intr_sym', 'Interrupt Symbol'),
53 ('start_sym', 'Start Symbol'),
54 ('stop_sym', 'Stop Symbol'),
55 ('bit_sym', 'Bit Symbol'),
56 ('atr_byte', 'ATR Byte'),
57 ('cmd_byte', 'Command Byte'),
58 ('out_byte', 'Outgoing Byte'),
59 ('proc_byte', 'Processing Byte'),
60 ('atr_data', 'ATR data'),
61 ('cmd_data', 'Command data'),
62 ('out_data', 'Outgoing data'),
63 ('proc_data', 'Processing data'),
7e87b2f7
FC
64 )
65 annotation_rows = (
7f2fb1b3
GS
66 ('symbols', 'Symbols', (Ann.RESET_SYM, Ann.INTR_SYM,
67 Ann.START_SYM, Ann.STOP_SYM, Ann.BIT_SYM,)),
68 ('fields', 'Fields', (Ann.ATR_BYTE,
69 Ann.CMD_BYTE, Ann.OUT_BYTE, Ann.PROC_BYTE,)),
70 ('operations', 'Operations', (Ann.ATR_DATA,
71 Ann.CMD_DATA, Ann.OUT_DATA, Ann.PROC_DATA,)),
7e87b2f7
FC
72 )
73 binary = (
7f2fb1b3 74 ('bytes', 'Bytes'),
7e87b2f7
FC
75 )
76
77 def __init__(self):
78 self.reset()
79
80 def reset(self):
6bddf39e 81 self.samplerate = None
7e87b2f7 82 self.bits = []
7f2fb1b3
GS
83 self.atr_bytes = []
84 self.cmd_bytes = []
85 self.cmd_proc = None
86 self.out_len = None
87 self.out_bytes = []
88 self.proc_state = None
89 self.state = None
7e87b2f7
FC
90
91 def metadata(self, key, value):
92 if key == srd.SRD_CONF_SAMPLERATE:
93 self.samplerate = value
94
95 def start(self):
7e87b2f7
FC
96 self.out_ann = self.register(srd.OUTPUT_ANN)
97 self.out_binary = self.register(srd.OUTPUT_BINARY)
98
1dfaf1e8
GS
99 def putx(self, ss, es, cls, data):
100 self.put(ss, es, self.out_ann, [cls, data,])
7e87b2f7 101
1dfaf1e8
GS
102 def putb(self, ss, es, cls , data):
103 self.put(ss, es, self.out_binary, [cls, data,])
7e87b2f7 104
6bddf39e
GS
105 def snums_to_usecs(self, snum_count):
106 if not self.samplerate:
107 return None
108 snums_per_usec = self.samplerate / 1e6
109 usecs = snum_count / snums_per_usec
110 return usecs
111
7f2fb1b3
GS
112 def lookup_proto_ann_txt(self, key, variables):
113 ann = {
114 'RESET_SYM': [Ann.RESET_SYM, 'Reset', 'R',],
115 'INTR_SYM': [Ann.INTR_SYM, 'Interrupt', 'Intr', 'I',],
116 'START_SYM': [Ann.START_SYM, 'Start', 'ST', 'S',],
117 'STOP_SYM': [Ann.STOP_SYM, 'Stop', 'SP', 'P',],
118 'BIT_SYM': [Ann.BIT_SYM, '{bit}',],
119 'ATR_BYTE': [Ann.ATR_BYTE,
120 'Answer To Reset: {data:02x}',
121 'ATR: {data:02x}',
122 '{data:02x}',
123 ],
124 'CMD_BYTE': [Ann.CMD_BYTE,
125 'Command: {data:02x}',
126 'Cmd: {data:02x}',
127 '{data:02x}',
128 ],
129 'OUT_BYTE': [Ann.OUT_BYTE,
130 'Outgoing data: {data:02x}',
131 'Data: {data:02x}',
132 '{data:02x}',
133 ],
134 'PROC_BYTE': [Ann.PROC_BYTE,
135 'Internal processing: {data:02x}',
136 'Proc: {data:02x}',
137 '{data:02x}',
138 ],
139 'ATR_DATA': [Ann.ATR_DATA,
140 'Answer To Reset: {data}',
141 'ATR: {data}',
142 '{data}',
143 ],
144 'CMD_DATA': [Ann.CMD_DATA,
145 'Command: {data}',
146 'Cmd: {data}',
147 '{data}',
148 ],
149 'OUT_DATA': [Ann.OUT_DATA,
150 'Outgoing: {data}',
151 'Out: {data}',
152 '{data}',
153 ],
154 'PROC_DATA': [Ann.PROC_DATA,
155 'Processing: {data}',
156 'Proc: {data}',
157 '{data}',
158 ],
159 }.get(key, None)
160 if ann is None:
161 return None, []
162 cls, texts = ann[0], ann[1:]
163 texts = [t.format(**variables) for t in texts]
164 return cls, texts
165
166 def text_for_accu_bytes(self, accu):
167 if not accu:
168 return None, None, None, None
169 ss, es = accu[0][1], accu[-1][2]
170 data = [a[0] for a in accu]
171 text = " ".join(['{:02x}'.format(a) for a in data])
172 return ss, es, data, text
173
174 def flush_queued(self):
175 '''Flush previously accumulated operations details.'''
176
177 # Can be called when either the completion of an operation got
178 # detected (reliably), or when some kind of reset condition was
179 # met while a potential previously observed operation has not
180 # been postprocessed yet (best effort). Should not harm when the
181 # routine gets invoked while no data was collected yet, or was
182 # flushed already.
183 # BEWARE! Will void internal state. Should really only get called
184 # "between operations", NOT between fields of an operation.
185
186 if self.atr_bytes:
187 key = 'ATR_DATA'
188 ss, es, _, text = self.text_for_accu_bytes(self.atr_bytes)
189 cls, texts = self.lookup_proto_ann_txt(key, {'data': text})
190 self.putx(ss, es, cls, texts)
191
192 if self.cmd_bytes:
193 key = 'CMD_DATA'
194 ss, es, _, text = self.text_for_accu_bytes(self.cmd_bytes)
195 cls, texts = self.lookup_proto_ann_txt(key, {'data': text})
196 self.putx(ss, es, cls, texts)
197
198 if self.out_bytes:
199 key = 'OUT_DATA'
200 ss, es, _, text = self.text_for_accu_bytes(self.out_bytes)
201 cls, texts = self.lookup_proto_ann_txt(key, {'data': text})
202 self.putx(ss, es, cls, texts)
203
204 if self.proc_state:
205 key = 'PROC_DATA'
206 ss = self.proc_state['ss']
207 es = self.proc_state['es']
208 clk = self.proc_state['clk']
209 high = self.proc_state['io1']
210 text = '{clk} clocks, I/O {high}'.format(clk = clk, high = int(high))
6bddf39e
GS
211 usecs = self.snums_to_usecs(es - ss)
212 if usecs:
213 msecs = usecs / 1000
214 text = '{msecs:.2f} ms, {text}'.format(msecs = msecs, text = text)
7f2fb1b3
GS
215 cls, texts = self.lookup_proto_ann_txt(key, {'data': text})
216 self.putx(ss, es, cls, texts)
217
218 self.atr_bytes = None
219 self.cmd_bytes = None
220 self.cmd_proc = None
221 self.out_len = None
222 self.out_bytes = None
223 self.proc_state = None
224 self.state = None
225
226 def handle_reset(self, ss, es, has_clk):
227 self.flush_queued()
228 key = '{}_SYM'.format('RESET' if has_clk else 'INTR')
229 cls, texts = self.lookup_proto_ann_txt(key, {})
230 self.putx(ss, es, cls, texts)
7e87b2f7 231 self.bits = []
7f2fb1b3
GS
232 self.state = 'ATR' if has_clk else None
233
234 def handle_command(self, ss, is_start):
235 if is_start:
236 self.flush_queued()
237 key = '{}_SYM'.format('START' if is_start else 'STOP')
238 cls, texts = self.lookup_proto_ann_txt(key, {})
239 self.putx(ss, ss, cls, texts)
7e87b2f7 240 self.bits = []
7f2fb1b3 241 self.state = 'CMD' if is_start else 'DATA'
7e87b2f7 242
7f2fb1b3
GS
243 def command_check(self, ctrl, addr, data):
244 '''Interpret CTRL/ADDR/DATA command entry.'''
245
246 # See the Siemens Datasheet section 2.3 Commands. The abbreviated
247 # text variants are my guesses, terse for readability at coarser
248 # zoom levels.
249 codes_table = {
250 0x30: {
251 'fmt': [
252 'read main memory, addr {addr:02x}',
253 'RD-M @{addr:02x}',
254 ],
255 },
256 0x31: {
257 'fmt': [
258 'read security memory',
259 'RD-S',
260 ],
261 'len': 4,
262 },
263 0x33: {
264 'fmt': [
265 'compare verification data, addr {addr:02x}, data {data:02x}',
266 'CMP-V @{addr:02x} ={data:02x}',
267 ],
268 'proc': True,
269 },
270 0x34: {
271 'fmt': [
272 'read protection memory, addr {addr:02x}',
273 'RD-P @{addr:02x}',
274 ],
275 'len': 4,
276 },
277 0x38: {
278 'fmt': [
279 'update main memory, addr {addr:02x}, data {data:02x}',
280 'WR-M @{addr:02x} ={data:02x}',
281 ],
282 'proc': True,
283 },
284 0x39: {
285 'fmt': [
286 'update security memory, addr {addr:02x}, data {data:02x}',
287 'WR-S @{addr:02x} ={data:02x}',
288 ],
289 'proc': True,
290 },
291 0x3c: {
292 'fmt': [
293 'write protection memory, addr {addr:02x}, data {data:02x}',
294 'WR-P @{addr:02x} ={data:02x}',
295 ],
296 'proc': True,
297 },
298 }
299 code = codes_table.get(ctrl, {})
300 dflt_fmt = [
301 'unknown, ctrl {ctrl:02x}, addr {addr:02x}, data {data:02x}',
302 'UNK-{ctrl:02x} @{addr:02x}, ={data:02x}',
303 ]
304 fmt = code.get('fmt', dflt_fmt)
305 if not isinstance(fmt, (list, tuple,)):
306 fmt = [fmt,]
307 texts = [f.format(ctrl = ctrl, addr = addr, data = data) for f in fmt]
308 length = code.get('len', None)
309 is_proc = code.get('proc', False)
310 return texts, length, is_proc
311
312 def processing_start(self, ss, es, io_high):
313 self.proc_state = {
314 'ss': ss or es,
315 'es': es or ss,
316 'clk': 0,
317 'io1': bool(io_high),
318 }
319
320 def processing_update(self, es, clk_inc, io_high):
321 if es is not None and es > self.proc_state['es']:
322 self.proc_state['es'] = es
323 self.proc_state['clk'] += clk_inc
324 if io_high:
325 self.proc_state['io1'] = True
326
327 def handle_data_byte(self, ss, es, data, bits):
328 '''Accumulate CMD or OUT data bytes.'''
329
330 if self.state == 'ATR':
331 if not self.atr_bytes:
332 self.atr_bytes = []
333 self.atr_bytes.append([data, ss, es, bits,])
334 if len(self.atr_bytes) == 4:
335 self.flush_queued()
336 return
337
338 if self.state == 'CMD':
339 if not self.cmd_bytes:
340 self.cmd_bytes = []
341 self.cmd_bytes.append([data, ss, es, bits,])
342 if len(self.cmd_bytes) == 3:
343 ctrl, addr, data = [c[0] for c in self.cmd_bytes]
344 texts, length, proc = self.command_check(ctrl, addr, data)
345 # Immediately emit the annotation to not lose the text,
346 # and to support zoom levels for this specific case.
347 ss, es = self.cmd_bytes[0][1], self.cmd_bytes[-1][2]
348 cls = Ann.CMD_DATA
349 self.putx(ss, es, cls, texts)
350 self.cmd_bytes = []
351 # Prepare to continue either at OUT or PROC after CMD.
352 self.out_len = length
353 self.cmd_proc = bool(proc)
354 self.state = None
7e87b2f7
FC
355 return
356
7f2fb1b3
GS
357 if self.state == 'OUT':
358 if not self.out_bytes:
359 self.out_bytes = []
360 self.out_bytes.append([data, ss, es, bits,])
361 if self.out_len is not None and len(self.out_bytes) == self.out_len:
362 self.flush_queued()
363 return
7e87b2f7 364
7f2fb1b3
GS
365 def handle_data_bit(self, ss, es, bit):
366 '''Gather 8 bits of data (or track processing progress).'''
7e87b2f7 367
7f2fb1b3
GS
368 # Switch late from DATA to either OUT or PROC. We can tell the
369 # type and potentially fixed length at the end of CMD already,
370 # but a START/STOP condition may void this information. So we
371 # do the switch at the first data bit after CMD.
372 # In the OUT case data bytes get accumulated, until either the
373 # expected byte count is reached, or another CMD starts. In the
374 # PROC case a high I/O level terminates execution.
375 if self.state == 'DATA':
376 if self.out_len:
377 self.state = 'OUT'
378 elif self.cmd_proc:
379 self.state = 'PROC'
380 self.processing_start(ss or es, es or ss, bit == 1)
381 else:
382 # Implementor's note: Handle unknown situations like
383 # outgoing data bytes, for the user's convenience. This
384 # will show OUT bytes even if it's just processing CLK
385 # cycles with constant or irrelevant I/O bit patterns.
386 self.state = 'OUT'
387 if self.state == 'PROC':
388 high = bit == 1
389 if ss is not None:
390 self.processing_update(ss, 0, high)
391 if es is not None:
392 self.processing_update(es, 1, high)
393 if high:
394 self.flush_queued()
395 return
7e87b2f7 396
7f2fb1b3
GS
397 # This routine gets called two times per bit value. Track the
398 # bit's value and ss timestamp when the bit period starts. And
399 # update the es timestamp at the end of the bit's validity.
400 if ss is not None:
401 self.bits.append([bit, ss, es or ss])
402 return
403 if es is None:
404 # Unexpected invocation. Could be a glitch or invalid input
405 # data, or an interaction with RESET/START/STOP conditions.
406 self.bits = []
407 return
408 if not self.bits:
409 return
410 if bit is not None:
411 self.bits[-1][0] = bit
412 # TODO Check for consistent bit level at ss and es when
413 # the information was available? Is bit data sampled at
414 # different clock edges depending whether data is sent
415 # or received?
416 self.bits[-1][2] = es
417 # Emit the bit's annotation. See if a byte was received.
418 bit, ss, es = self.bits[-1]
419 cls, texts = self.lookup_proto_ann_txt('BIT_SYM', {'bit': bit})
420 self.putx(ss, es, cls, texts)
421 if len(self.bits) < 8:
422 return
7e87b2f7 423
7f2fb1b3
GS
424 # Get the data byte value, and the byte's ss/es. Emit the byte's
425 # annotation and binary output. Pass the byte to upper layers.
426 # TODO Vary annotation classes with the byte's position within
427 # a field? To tell CTRL/ADDR/DATA of a CMD entry apart?
428 bits = self.bits
7e87b2f7 429 self.bits = []
7f2fb1b3
GS
430 data = bitpack_lsb(bits, 0)
431 ss = bits[0][1]
432 es = bits[-1][2]
433
434 key = '{}_BYTE'.format(self.state)
435 cls, texts = self.lookup_proto_ann_txt(key, {'data': data})
436 if cls:
437 self.putx(ss, es, cls, texts)
438 self.putb(ss, es, Bin.BYTES, bytes([data]))
439
440 self.handle_data_byte(ss, es, data, bits)
7e87b2f7
FC
441
442 def decode(self):
7f2fb1b3
GS
443 '''Decoder's main data interpretation loop.'''
444
445 # Signal conditions tracked by the protocol decoder:
446 # - Rising and falling RST edges, which span the width of a
447 # high-active RESET pulse. RST has highest priority, no
448 # other activity can take place in this period.
449 # - Rising and falling CLK edges when RST is active. The
450 # CLK pulse when RST is asserted will reset the card's
451 # address counter. RST alone can terminate memory reads.
452 # - Rising and falling CLK edges when RST is inactive. This
453 # determines the period where BIT values are valid.
454 # - I/O edges during high CLK. These are START and STOP
455 # conditions that tell COMMAND and DATA phases apart.
456 # - Rise of I/O during internal processing. This expression
457 # is an unconditional part of the .wait() condition set. It
458 # is assumed that skipping this match in many cases is more
459 # efficient than the permanent re-construction of the .wait()
460 # condition list in every loop iteration, and preferrable to
461 # the maintainance cost of duplicating RST and CLK handling
462 # when checking I/O during internal processing.
463 (
464 COND_RESET_START, COND_RESET_STOP,
465 COND_RSTCLK_START, COND_RSTCLK_STOP,
466 COND_DATA_START, COND_DATA_STOP,
467 COND_CMD_START, COND_CMD_STOP,
468 COND_PROC_IOH,
469 ) = range(9)
470 conditions = [
471 {Pin.RST: 'r'},
472 {Pin.RST: 'f'},
473 {Pin.RST: 'h', Pin.CLK: 'r'},
474 {Pin.RST: 'h', Pin.CLK: 'f'},
475 {Pin.RST: 'l', Pin.CLK: 'r'},
476 {Pin.RST: 'l', Pin.CLK: 'f'},
477 {Pin.CLK: 'h', Pin.IO: 'f'},
478 {Pin.CLK: 'h', Pin.IO: 'r'},
479 {Pin.RST: 'l', Pin.IO: 'r'},
480 ]
481
482 ss_reset = es_reset = ss_clk = es_clk = None
7e87b2f7 483 while True:
7f2fb1b3
GS
484
485 is_outgoing = self.state == 'OUT'
486 is_processing = self.state == 'PROC'
5c47b179 487 pins = self.wait(conditions)
7f2fb1b3
GS
488 io = pins[Pin.IO]
489
490 # Handle RESET conditions, including an optional CLK pulse
491 # while RST is asserted.
492 if self.matched[COND_RESET_START]:
493 self.flush_queued()
494 ss_reset = self.samplenum
495 es_reset = ss_clk = es_clk = None
496 continue
497 if self.matched[COND_RESET_STOP]:
498 es_reset = self.samplenum
499 self.handle_reset(ss_reset or 0, es_reset, ss_clk and es_clk)
500 ss_reset = es_reset = ss_clk = es_clk = None
501 continue
502 if self.matched[COND_RSTCLK_START]:
503 ss_clk = self.samplenum
504 es_clk = None
505 continue
506 if self.matched[COND_RSTCLK_STOP]:
507 es_clk = self.samplenum
508 continue
509
510 # Handle data bits' validity boundaries. Also covers the
511 # periodic check for high I/O level and update of details
512 # during internal processing.
513 if self.matched[COND_DATA_START]:
514 self.handle_data_bit(self.samplenum, None, io)
515 continue
516 if self.matched[COND_DATA_STOP]:
517 self.handle_data_bit(None, self.samplenum, None)
518 continue
519
520 # Additional check for idle I/O during internal processing,
521 # independent of CLK edges this time. This assures that the
522 # decoder ends processing intervals as soon as possible, at
523 # the most precise timestamp.
524 if is_processing and self.matched[COND_PROC_IOH]:
525 self.handle_data_bit(self.samplenum, self.samplenum, io)
526 continue
527
528 # The START/STOP conditions are only applicable outside of
529 # "outgoing data" or "internal processing" periods. This is
530 # what the data sheet specifies.
531 # TODO There is the decoder's inability to reliably detect
532 # where memory reads are done because they reached the end
533 # of the chip's capacity. Which makes the decoder miss the
534 # next START symbol, and lose synchronization to the BIT
535 # stream (bit counts are off, which breaks the accumulation
536 # of bytes). That's why this decoder unconditionally keeps
537 # detecting the START condition although it should not.
538 if not is_outgoing and not is_processing:
539 if self.matched[COND_CMD_START]:
540 self.handle_command(self.samplenum, True)
541 continue
542 if self.matched[COND_CMD_STOP]:
543 self.handle_command(self.samplenum, False)
544 continue
545 if True: # HACK See the comment above.
546 if self.matched[COND_CMD_START]:
547 self.handle_command(self.samplenum, True)
548 continue