]> sigrok.org Git - libsigrokdecode.git/blob - decoders/swim/pd.py
2f0c12162ba925dda6d7113b86b08f1dbccb7f32
[libsigrokdecode.git] / decoders / swim / pd.py
1 ##
2 ## This file is part of the libsigrokdecode project.
3 ##
4 ## Copyright (C) 2018 Mike Jagdis <mjagdis@eris-associates.co.uk>
5 ##
6 ## This program is free software; you can redistribute it and/or modify
7 ## it under the terms of the GNU General Public License as published by
8 ## the Free Software Foundation; either version 2 of the License, or
9 ## (at your option) any later version.
10 ##
11 ## This program is distributed in the hope that it will be useful,
12 ## but WITHOUT ANY WARRANTY; without even the implied warranty of
13 ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 ## GNU General Public License for more details.
15 ##
16 ## You should have received a copy of the GNU General Public License
17 ## along with this program; if not, write to the Free Software
18 ## Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
19 ##
20
21 import math
22 import sigrokdecode as srd
23
24 class SamplerateError(Exception):
25     pass
26
27 class Decoder(srd.Decoder):
28     api_version = 3
29     id = 'swim'
30     name = 'SWIM'
31     longname = 'STM8 SWIM bus'
32     desc = 'STM8 Single Wire Interface Module (SWIM) protocol.'
33     license = 'gplv2+'
34     inputs = ['logic']
35     outputs = []
36     tags = ['Debug/trace']
37     options = (
38         {'id': 'debug', 'desc': 'Debug', 'default': 'no', 'values': ('yes', 'no') },
39     )
40     channels = (
41         {'id': 'swim', 'name': 'SWIM', 'desc': 'SWIM data line'},
42     )
43     annotations = (
44         ('bit', 'Bit'),
45         ('enterseq', 'SWIM enter sequence'),
46         ('start-host', 'Start bit (host)'),
47         ('start-target', 'Start bit (target)'),
48         ('parity', 'Parity bit'),
49         ('ack', 'Acknowledgement'),
50         ('nack', 'Negative acknowledgement'),
51         ('byte-write', 'Byte write'),
52         ('byte-read', 'Byte read'),
53         ('cmd-unknown', 'Unknown SWIM command'),
54         ('cmd', 'SWIM command'),
55         ('bytes', 'Byte count'),
56         ('address', 'Address'),
57         ('data-write', 'Data write'),
58         ('data-read', 'Data read'),
59         ('debug', 'Debug'),
60     )
61     annotation_rows = (
62         ('bits', 'Bits', (0,)),
63         ('framing', 'Framing', (2, 3, 4, 5, 6, 7, 8)),
64         ('protocol', 'Protocol', (1, 9, 10, 11, 12, 13, 14)),
65         ('debug', 'Debug', (15,)),
66     )
67     binary = (
68         ('tx', 'Dump of data written to target'),
69         ('rx', 'Dump of data read from target'),
70     )
71
72     def __init__(self):
73         # SWIM clock for the target is normally HSI/2 where HSI is 8MHz +- 5%
74         # although the divisor can be removed by setting the SWIMCLK bit in
75         # the CLK_SWIMCCR register. There is no standard for the host so we
76         # will be generous and assume it is using an 8MHz +- 10% oscillator.
77         # We do not need to be accurate. We just need to avoid treating enter
78         # sequence pulses as bits. A synchronization frame will cause this
79         # to be adjusted.
80         self.HSI = 8000000
81         self.HSI_min = self.HSI * 0.9
82         self.HSI_max = self.HSI * 1.1
83         self.swim_clock = self.HSI_min / 2
84
85         self.eseq_edge = [[-1, None], [-1, None]]
86         self.eseq_pairnum = 0
87         self.eseq_pairstart = None
88
89         self.reset()
90
91     def reset(self):
92         self.bit_edge = [[-1, None], [-1, None]]
93         self.bit_maxlen = -1
94         self.bitseq_len = 0
95         self.bitseq_end = None
96         self.proto_state = 'CMD'
97
98     def metadata(self, key, value):
99         if key == srd.SRD_CONF_SAMPLERATE:
100             self.samplerate = value
101
102     def adjust_timings(self):
103         # A low-speed bit is 22 SWIM clocks long.
104         # There are options to shorten bits to 10 clocks or use HSI rather
105         # than HSI/2 as the SWIM clock but the longest valid bit should be no
106         # more than this many samples. This does not need to be accurate.
107         # It exists simply to prevent bits extending unecessarily far into
108         # trailing bus-idle periods. This will be adjusted every time we see
109         # a synchronization frame or start bit in order to show idle periods
110         # as accurately as possible.
111         self.bit_reflen = math.ceil(self.samplerate * 22 / self.swim_clock)
112
113     def start(self):
114         self.out_ann = self.register(srd.OUTPUT_ANN)
115         self.out_binary = self.register(srd.OUTPUT_BINARY)
116
117         if not self.samplerate:
118             raise SamplerateError('Cannot decode without samplerate.')
119
120         # A synchronization frame is a low that lasts for more than 64 but no
121         # more than 128 SWIM clock periods based on the standard SWIM clock.
122         # Note: we also allow for the possibility that the SWIM clock divisor
123         # has been disabled here.
124         self.sync_reflen_min = math.floor(self.samplerate * 64 / self.HSI_max)
125         self.sync_reflen_max = math.ceil(self.samplerate * 128 / (self.HSI_min / 2))
126
127         self.debug = True if self.options['debug'] == 'yes' else False
128
129         # The SWIM entry sequence is 4 pulses at 2kHz followed by 4 at 1kHz.
130         self.eseq_reflen = math.ceil(self.samplerate / 2048)
131
132         self.adjust_timings()
133
134     def protocol(self):
135         if self.proto_state == 'CMD':
136             # Command
137             if self.bitseq_value == 0x00:
138                 self.put(self.bitseq_start, self.bitseq_end, self.out_ann, [10, ['system reset', 'SRST', '!']])
139             elif self.bitseq_value == 0x01:
140                 self.proto_state = 'N'
141                 self.put(self.bitseq_start, self.bitseq_end, self.out_ann, [10, ['read on-the-fly', 'ROTF', 'r']])
142             elif self.bitseq_value == 0x02:
143                 self.proto_state = 'N'
144                 self.put(self.bitseq_start, self.bitseq_end, self.out_ann, [10, ['write on-the-fly', 'WOTF', 'w']])
145             else:
146                 self.put(self.bitseq_start, self.bitseq_end, self.out_ann, [9, ['unknown', 'UNK']])
147         elif self.proto_state == 'N':
148             # Number of bytes
149             self.proto_byte_count = self.bitseq_value
150             self.proto_state = '@E'
151             self.put(self.bitseq_start, self.bitseq_end, self.out_ann, [11, ['byte count 0x%02x' % self.bitseq_value, 'bytes 0x%02x' % self.bitseq_value, '0x%02x' % self.bitseq_value, '%02x' % self.bitseq_value, '%x' % self.bitseq_value]])
152         elif self.proto_state == '@E':
153             # Address byte 1
154             self.proto_addr = self.bitseq_value
155             self.proto_addr_start = self.bitseq_start
156             self.proto_state = '@H'
157         elif self.proto_state == '@H':
158             # Address byte 2
159             self.proto_addr = (self.proto_addr << 8) | self.bitseq_value
160             self.proto_state = '@L'
161         elif self.proto_state == '@L':
162             # Address byte 3
163             self.proto_addr = (self.proto_addr << 8) | self.bitseq_value
164             self.proto_state = 'D'
165             self.put(self.proto_addr_start, self.bitseq_end, self.out_ann, [12, ['address 0x%06x' % self.proto_addr, 'addr 0x%06x' % self.proto_addr, '0x%06x' % self.proto_addr, '%06x' %self.proto_addr, '%x' % self.proto_addr]])
166         else:
167             if self.proto_byte_count > 0:
168                 self.proto_byte_count -= 1
169                 if self.proto_byte_count == 0:
170                     self.proto_state = 'CMD'
171
172             self.put(self.bitseq_start, self.bitseq_end, self.out_ann, [13 + self.bitseq_dir, ['0x%02x' % self.bitseq_value, '%02x' % self.bitseq_value, '%x' % self.bitseq_value]])
173             self.put(self.bitseq_start, self.bitseq_end, self.out_binary, [0 + self.bitseq_dir, bytes([self.bitseq_value])])
174             if self.debug:
175                 self.put(self.bitseq_start, self.bitseq_end, self.out_ann, [15, ['%d more' % self.proto_byte_count, '%d' % self.proto_byte_count]])
176
177     def bitseq(self, bitstart, bitend, bit):
178         if self.bitseq_len == 0:
179             # Looking for start of a bit sequence (command or byte).
180             self.bit_reflen = bitend - bitstart
181             self.bitseq_value = 0
182             self.bitseq_dir = bit
183             self.bitseq_len = 1
184             self.put(bitstart, bitend, self.out_ann, [2 + self.bitseq_dir, ['start', 's']])
185         elif (self.proto_state == 'CMD' and self.bitseq_len == 4) or (self.proto_state != 'CMD' and self.bitseq_len == 9):
186             # Parity bit
187             self.bitseq_end = bitstart
188             self.bitseq_len += 1
189
190             self.put(bitstart, bitend, self.out_ann, [4, ['parity', 'par', 'p']])
191
192             # The start bit is not data but was used for parity calculation.
193             self.bitseq_value &= 0xff
194             self.put(self.bitseq_start, self.bitseq_end, self.out_ann, [7 + self.bitseq_dir, ['0x%02x' % self.bitseq_value, '%02x' % self.bitseq_value, '%x' % self.bitseq_value]])
195         elif (self.proto_state == 'CMD' and self.bitseq_len == 5) or (self.proto_state != 'CMD' and self.bitseq_len == 10):
196             # ACK/NACK bit.
197             if bit:
198                 self.put(bitstart, bitend, self.out_ann, [5, ['ack', 'a']])
199             else:
200                 self.put(bitstart, bitend, self.out_ann, [6, ['nack', 'n']])
201
202             # We only pass data that was ack'd up the stack.
203             if bit:
204                 self.protocol()
205
206             self.bitseq_len = 0
207         else:
208             if self.bitseq_len == 1:
209                 self.bitseq_start = bitstart
210             self.bitseq_value = (self.bitseq_value << 1) | bit
211             self.bitseq_len += 1
212
213     def bit(self, start, mid, end):
214         if mid - start >= end - mid:
215             self.put(start, end, self.out_ann, [0, ['0']])
216             bit = 0
217         else:
218             self.put(start, end, self.out_ann, [0, ['1']])
219             bit = 1
220
221         self.bitseq(start, end, bit)
222
223     def detect_synchronize_frame(self, start, end):
224         # Strictly speaking, synchronization frames are only recognised when
225         # SWIM is active. A falling edge on reset disables SWIM and an enter
226         # sequence is needed to re-enable it. However we do not want to be
227         # reliant on seeing the NRST pin just for that and we also want to be
228         # able to decode SWIM even if we just sample parts of the dialogue.
229         # For this reason we limit ourselves to only recognizing
230         # synchronization frames that have believable lengths based on our
231         # knowledge of the range of possible SWIM clocks.
232         if self.samplenum - self.eseq_edge[1][1] >= self.sync_reflen_min and self.samplenum - self.eseq_edge[1][1] <= self.sync_reflen_max:
233             self.put(self.eseq_edge[1][1], self.samplenum, self.out_ann, [1, ['synchronization frame', 'synchronization', 'sync', 's']])
234
235             # A low that lasts for more than 64 SWIM clock periods causes a
236             # reset of the SWIM communication state machine and will switch
237             # the SWIM to low-speed mode (SWIM_CSR.HS is cleared).
238             self.reset()
239
240             # The low SHOULD last 128 SWIM clocks. This is used to
241             # resynchronize in order to allow for variation in the frequency
242             # of the internal RC oscillator.
243             self.swim_clock = 128 * (self.samplerate / (self.samplenum - self.eseq_edge[1][1]))
244             self.adjust_timings()
245
246     def eseq_potential_start(self, start, end):
247         self.eseq_pairstart = start
248         self.eseq_reflen = end - start
249         self.eseq_pairnum = 1
250
251     def detect_enter_sequence(self, start, end):
252         # According to the spec the enter sequence is four pulses at 2kHz
253         # followed by four at 1kHz. We do not check the frequency but simply
254         # check the lengths of successive pulses against the first. This means
255         # we have no need to account for the accuracy (or lack of) of the
256         # host's oscillator.
257         if self.eseq_pairnum == 0 or abs(self.eseq_reflen - (end - start)) > 2:
258             self.eseq_potential_start(start, end)
259
260         elif self.eseq_pairnum < 4:
261             # The next three pulses should be the same length as the first.
262             self.eseq_pairnum += 1
263
264             if self.eseq_pairnum == 4:
265                 self.eseq_reflen /= 2
266         else:
267             # The final four pulses should each be half the length of the
268             # initial pair. Again, a mismatch causes us to reset and use the
269             # current pulse as a new potential enter sequence start.
270             self.eseq_pairnum += 1
271             if self.eseq_pairnum == 8:
272                 # Four matching pulses followed by four more that match each
273                 # other but are half the length of the first 4. SWIM is active!
274                 self.put(self.eseq_pairstart, end, self.out_ann, [1, ['enter sequence', 'enter seq', 'enter', 'ent', 'e']])
275                 self.eseq_pairnum = 0
276
277     def decode(self):
278         while True:
279             if self.bit_maxlen >= 0:
280                 (swim,) = self.wait()
281                 self.bit_maxlen -= 1
282             else:
283                 (swim,) = self.wait({0: 'e'})
284
285             if swim != self.eseq_edge[1][0]:
286                 if swim == 1 and self.eseq_edge[1][1] is not None:
287                     self.detect_synchronize_frame(self.eseq_edge[1][1], self.samplenum)
288                     if self.eseq_edge[0][1] is not None:
289                         self.detect_enter_sequence(self.eseq_edge[0][1], self.samplenum)
290                 self.eseq_edge.pop(0)
291                 self.eseq_edge.append([swim, self.samplenum])
292
293             if (swim != self.bit_edge[1][0] and (swim != 1 or self.bit_edge[1][0] != -1)) or self.bit_maxlen == 0:
294                 if self.bit_maxlen == 0 and self.bit_edge[1][0] == 1:
295                     swim = -1
296
297                 if self.bit_edge[1][0] != 0 and swim == 0:
298                     self.bit_maxlen = self.bit_reflen
299
300                 if self.bit_edge[0][0] == 0 and self.bit_edge[1][0] == 1 and self.samplenum - self.bit_edge[0][1] <= self.bit_reflen + 2:
301                     self.bit(self.bit_edge[0][1], self.bit_edge[1][1], self.samplenum)
302
303                 self.bit_edge.pop(0)
304                 self.bit_edge.append([swim, self.samplenum])