]> sigrok.org Git - libsigrokdecode.git/blob - decoders/swim/pd.py
Decoder for STM8 series MCUs SWIM protocol.
[libsigrokdecode.git] / decoders / swim / pd.py
1 ##
2 ## This file is part of the libsigrokdecode project.
3 ##
4 ## Copyright (C) 2018 Mike Jagdis <mjagdis@eris-associates.co.uk>
5 ##
6 ## This program is free software; you can redistribute it and/or modify
7 ## it under the terms of the GNU General Public License as published by
8 ## the Free Software Foundation; either version 2 of the License, or
9 ## (at your option) any later version.
10 ##
11 ## This program is distributed in the hope that it will be useful,
12 ## but WITHOUT ANY WARRANTY; without even the implied warranty of
13 ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 ## GNU General Public License for more details.
15 ##
16 ## You should have received a copy of the GNU General Public License
17 ## along with this program; if not, write to the Free Software
18 ## Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
19 ##
20
21 import math
22 import sigrokdecode as srd
23
24 class SamplerateError(Exception):
25     pass
26
27 class Decoder(srd.Decoder):
28     api_version = 3
29     id = 'swim'
30     name = 'SWIM'
31     longname = 'STM8 SWIM bus'
32     desc = 'STM8 Single Wire Interface Module (SWIM) protocol.'
33     license = 'gplv2+'
34     inputs = ['logic']
35     outputs = []
36     options = (
37         {'id': 'debug', 'desc': 'Debug', 'default': 'no', 'values': ('yes', 'no') },
38     )
39     channels = (
40         {'id': 'swim', 'name': 'SWIM', 'desc': 'SWIM data line'},
41     )
42     annotations = (
43         ('bit', 'Bit'),
44         ('enterseq', 'SWIM enter sequence'),
45         ('start-host', 'Start bit (host)'),
46         ('start-target', 'Start bit (target)'),
47         ('parity', 'Parity bit'),
48         ('ack', 'Acknowledgement'),
49         ('nack', 'Negative acknowledgement'),
50         ('byte-write', 'Byte write'),
51         ('byte-read', 'Byte read'),
52         ('cmd-unknown', 'Unknown SWIM command'),
53         ('cmd', 'SWIM command'),
54         ('bytes', 'Byte count'),
55         ('address', 'Address'),
56         ('data-write', 'Data write'),
57         ('data-read', 'Data read'),
58         ('debug', 'Debug'),
59     )
60     annotation_rows = (
61         ('bits', 'Bits', (0,)),
62         ('framing', 'Framing', (2, 3, 4, 5, 6, 7, 8)),
63         ('protocol', 'Protocol', (1, 9, 10, 11, 12, 13, 14)),
64         ('debug', 'Debug', (15,)),
65     )
66     binary = (
67         ('tx', 'Dump of data written to target'),
68         ('rx', 'Dump of data read from target'),
69     )
70
71     def __init__(self):
72         # SWIM clock for the target is normally HSI/2 where HSI is 8MHz +- 5%
73         # although the divisor can be removed by setting the SWIMCLK bit in
74         # the CLK_SWIMCCR register. There is no standard for the host so we
75         # will be generous and assume it is using an 8MHz +- 10% oscillator.
76         # We do not need to be accurate. We just need to avoid treating enter
77         # sequence pulses as bits. A synchronization frame will cause this
78         # to be adjusted.
79         self.HSI = 8000000
80         self.HSI_min = self.HSI * 0.9
81         self.HSI_max = self.HSI * 1.1
82         self.swim_clock = self.HSI_min / 2
83
84         self.eseq_edge = [[-1, None], [-1, None]]
85         self.eseq_pairnum = 0
86         self.eseq_pairstart = None
87
88         self.reset()
89
90     def reset(self):
91         self.bit_edge = [[-1, None], [-1, None]]
92         self.bit_maxlen = -1
93         self.bitseq_len = 0
94         self.bitseq_end = None
95         self.proto_state = 'CMD'
96
97     def metadata(self, key, value):
98         if key == srd.SRD_CONF_SAMPLERATE:
99             self.samplerate = value
100
101     def adjust_timings(self):
102         # A low-speed bit is 22 SWIM clocks long.
103         # There are options to shorten bits to 10 clocks or use HSI rather
104         # than HSI/2 as the SWIM clock but the longest valid bit should be no
105         # more than this many samples. This does not need to be accurate.
106         # It exists simply to prevent bits extending unecessarily far into
107         # trailing bus-idle periods. This will be adjusted every time we see
108         # a synchronization frame or start bit in order to show idle periods
109         # as accurately as possible.
110         self.bit_reflen = math.ceil(self.samplerate * 22 / self.swim_clock)
111
112     def start(self):
113         self.out_ann = self.register(srd.OUTPUT_ANN)
114         self.out_binary = self.register(srd.OUTPUT_BINARY)
115
116         if not self.samplerate:
117             raise SamplerateError('Cannot decode without samplerate.')
118
119         # A synchronization frame is a low that lasts for more than 64 but no
120         # more than 128 SWIM clock periods based on the standard SWIM clock.
121         # Note: we also allow for the possibility that the SWIM clock divisor
122         # has been disabled here.
123         self.sync_reflen_min = math.floor(self.samplerate * 64 / self.HSI_max)
124         self.sync_reflen_max = math.ceil(self.samplerate * 128 / (self.HSI_min / 2))
125
126         if self.options['debug'] == 'yes':
127             self.debug = True
128         else:
129             self.debug = False
130
131         # The SWIM entry sequence is 4 pulses at 2kHz followed by 4 at 1kHz.
132         self.eseq_reflen = math.ceil(self.samplerate / 2048)
133
134         self.adjust_timings()
135
136     def protocol(self):
137         if self.proto_state == 'CMD':
138             # Command
139             if self.bitseq_value == 0x00:
140                 self.put(self.bitseq_start, self.bitseq_end, self.out_ann, [10, ['system reset', 'SRST', '!']])
141             elif self.bitseq_value == 0x01:
142                 self.proto_state = 'N'
143                 self.put(self.bitseq_start, self.bitseq_end, self.out_ann, [10, ['read on-the-fly', 'ROTF', 'r']])
144             elif self.bitseq_value == 0x02:
145                 self.proto_state = 'N'
146                 self.put(self.bitseq_start, self.bitseq_end, self.out_ann, [10, ['write on-the-fly', 'WOTF', 'w']])
147             else:
148                 self.put(self.bitseq_start, self.bitseq_end, self.out_ann, [9, ['unknown', 'UNK']])
149         elif self.proto_state == 'N':
150             # Number of bytes
151             self.proto_byte_count = self.bitseq_value
152             self.proto_state = '@E'
153             self.put(self.bitseq_start, self.bitseq_end, self.out_ann, [11, ['byte count 0x%02x' % self.bitseq_value, 'bytes 0x%02x' % self.bitseq_value, '0x%02x' % self.bitseq_value, '%02x' % self.bitseq_value, '%x' % self.bitseq_value]])
154         elif self.proto_state == '@E':
155             # Address byte 1
156             self.proto_addr = self.bitseq_value
157             self.proto_addr_start = self.bitseq_start
158             self.proto_state = '@H'
159         elif self.proto_state == '@H':
160             # Address byte 2
161             self.proto_addr = (self.proto_addr << 8) | self.bitseq_value
162             self.proto_state = '@L'
163         elif self.proto_state == '@L':
164             # Address byte 3
165             self.proto_addr = (self.proto_addr << 8) | self.bitseq_value
166             self.proto_state = 'D'
167             self.put(self.proto_addr_start, self.bitseq_end, self.out_ann, [12, ['address 0x%06x' % self.proto_addr, 'addr 0x%06x' % self.proto_addr, '0x%06x' % self.proto_addr, '%06x' %self.proto_addr, '%x' % self.proto_addr]])
168         else:
169             if self.proto_byte_count > 0:
170                 self.proto_byte_count -= 1
171                 if self.proto_byte_count == 0:
172                     self.proto_state = 'CMD'
173
174             self.put(self.bitseq_start, self.bitseq_end, self.out_ann, [13 + self.bitseq_dir, ['0x%02x' % self.bitseq_value, '%02x' % self.bitseq_value, '%x' % self.bitseq_value]])
175             self.put(self.bitseq_start, self.bitseq_end, self.out_binary, [0 + self.bitseq_dir, bytes([self.bitseq_value])])
176             if self.debug:
177                 self.put(self.bitseq_start, self.bitseq_end, self.out_ann, [15, ['%d more' % self.proto_byte_count, '%d' % self.proto_byte_count]])
178
179     def bitseq(self, bitstart, bitend, bit):
180         if self.bitseq_len == 0:
181             # Looking for start of a bit sequence (command or byte).
182             self.bit_reflen = bitend - bitstart
183             self.bitseq_value = 0
184             self.bitseq_dir = bit
185             self.bitseq_len = 1
186             self.put(bitstart, bitend, self.out_ann, [2 + self.bitseq_dir, ['start', 's']])
187         elif (self.proto_state == 'CMD' and self.bitseq_len == 4) or (self.proto_state != 'CMD' and self.bitseq_len == 9):
188             # Parity bit
189             self.bitseq_end = bitstart
190             self.bitseq_len += 1
191
192             self.put(bitstart, bitend, self.out_ann, [4, ['parity', 'par', 'p']])
193
194             # The start bit is not data but was used for parity calculation.
195             self.bitseq_value &= 0xff
196             self.put(self.bitseq_start, self.bitseq_end, self.out_ann, [7 + self.bitseq_dir, ['0x%02x' % self.bitseq_value, '%02x' % self.bitseq_value, '%x' % self.bitseq_value]])
197         elif (self.proto_state == 'CMD' and self.bitseq_len == 5) or (self.proto_state != 'CMD' and self.bitseq_len == 10):
198             # ACK/NACK bit.
199             if bit:
200                 self.put(bitstart, bitend, self.out_ann, [5, ['ack', 'a']])
201             else:
202                 self.put(bitstart, bitend, self.out_ann, [6, ['nack', 'n']])
203
204             # We only pass data that was ack'd up the stack.
205             if bit:
206                 self.protocol()
207
208             self.bitseq_len = 0
209         else:
210             if self.bitseq_len == 1:
211                 self.bitseq_start = bitstart
212             self.bitseq_value = (self.bitseq_value << 1) | bit
213             self.bitseq_len += 1
214
215     def bit(self, start, mid, end):
216         if mid - start >= end - mid:
217             self.put(start, end, self.out_ann, [0, ['0']])
218             bit = 0
219         else:
220             self.put(start, end, self.out_ann, [0, ['1']])
221             bit = 1
222
223         self.bitseq(start, end, bit)
224
225     def detect_synchronize_frame(self, start, end):
226         # Strictly speaking, synchronization frames are only recognised when
227         # SWIM is active. A falling edge on reset disables SWIM and an enter
228         # sequence is needed to re-enable it. However we do not want to be
229         # reliant on seeing the NRST pin just for that and we also want to be
230         # able to decode SWIM even if we just sample parts of the dialogue.
231         # For this reason we limit ourselves to only recognizing
232         # synchronization frames that have believable lengths based on our
233         # knowledge of the range of possible SWIM clocks.
234         if self.samplenum - self.eseq_edge[1][1] >= self.sync_reflen_min and self.samplenum - self.eseq_edge[1][1] <= self.sync_reflen_max:
235             self.put(self.eseq_edge[1][1], self.samplenum, self.out_ann, [1, ['synchronization frame', 'synchronization', 'sync', 's']])
236
237             # A low that lasts for more than 64 SWIM clock periods causes a
238             # reset of the SWIM communication state machine and will switch
239             # the SWIM to low-speed mode (SWIM_CSR.HS is cleared).
240             self.reset()
241
242             # The low SHOULD last 128 SWIM clocks. This is used to
243             # resynchronize in order to allow for variation in the frequency
244             # of the internal RC oscillator.
245             self.swim_clock = 128 * (self.samplerate / (self.samplenum - self.eseq_edge[1][1]))
246             self.adjust_timings()
247
248     def eseq_potential_start(self, start, end):
249         self.eseq_pairstart = start
250         self.eseq_reflen = end - start
251         self.eseq_pairnum = 1
252
253     def detect_enter_sequence(self, start, end):
254         # According to the spec the enter sequence is four pulses at 2kHz
255         # followed by four at 1kHz. We do not check the frequency but simply
256         # check the lengths of successive pulses against the first. This means
257         # we have no need to account for the accuracy (or lack of) of the
258         # host's oscillator.
259         if self.eseq_pairnum == 0 or abs(self.eseq_reflen - (end - start)) > 2:
260             self.eseq_potential_start(start, end)
261
262         elif self.eseq_pairnum < 4:
263             # The next three pulses should be the same length as the first.
264             self.eseq_pairnum += 1
265
266             if self.eseq_pairnum == 4:
267                 self.eseq_reflen /= 2
268         else:
269             # The final four pulses should each be half the length of the
270             # initial pair. Again, a mismatch causes us to reset and use the
271             # current pulse as a new potential enter sequence start.
272             self.eseq_pairnum += 1
273             if self.eseq_pairnum == 8:
274                 # Four matching pulses followed by four more that match each
275                 # other but are half the length of the first 4. SWIM is active!
276                 self.put(self.eseq_pairstart, end, self.out_ann, [1, ['enter sequence', 'enter seq', 'enter', 'ent', 'e']])
277                 self.eseq_pairnum = 0
278
279     def decode(self):
280         while True:
281             if self.bit_maxlen >= 0:
282                 (swim,) = self.wait()
283                 self.bit_maxlen -= 1
284             else:
285                 (swim,) = self.wait({0: 'e'})
286
287             if swim != self.eseq_edge[1][0]:
288                 if swim == 1 and self.eseq_edge[1][1] is not None:
289                     self.detect_synchronize_frame(self.eseq_edge[1][1], self.samplenum)
290                     if self.eseq_edge[0][1] is not None:
291                         self.detect_enter_sequence(self.eseq_edge[0][1], self.samplenum)
292                 self.eseq_edge.pop(0)
293                 self.eseq_edge.append([swim, self.samplenum])
294
295             if (swim != self.bit_edge[1][0] and (swim != 1 or self.bit_edge[1][0] != -1)) or self.bit_maxlen == 0:
296                 if self.bit_maxlen == 0 and self.bit_edge[1][0] == 1:
297                     swim = -1
298
299                 if self.bit_edge[1][0] != 0 and swim == 0:
300                     self.bit_maxlen = self.bit_reflen
301
302                 if self.bit_edge[0][0] == 0 and self.bit_edge[1][0] == 1 and self.samplenum - self.bit_edge[0][1] <= self.bit_reflen + 2:
303                     self.bit(self.bit_edge[0][1], self.bit_edge[1][1], self.samplenum)
304
305                 self.bit_edge.pop(0)
306                 self.bit_edge.append([swim, self.samplenum])