]> sigrok.org Git - libsigrokdecode.git/blame - decoders/swim/pd.py
decoders: Add/update tags for each PD.
[libsigrokdecode.git] / decoders / swim / pd.py
CommitLineData
f8cc803b
MJ
1##
2## This file is part of the libsigrokdecode project.
3##
4## Copyright (C) 2018 Mike Jagdis <mjagdis@eris-associates.co.uk>
5##
6## This program is free software; you can redistribute it and/or modify
7## it under the terms of the GNU General Public License as published by
8## the Free Software Foundation; either version 2 of the License, or
9## (at your option) any later version.
10##
11## This program is distributed in the hope that it will be useful,
12## but WITHOUT ANY WARRANTY; without even the implied warranty of
13## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14## GNU General Public License for more details.
15##
16## You should have received a copy of the GNU General Public License
17## along with this program; if not, write to the Free Software
18## Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19##
20
21import math
22import sigrokdecode as srd
23
class SamplerateError(Exception):
    """Raised when decoding is started without a known samplerate."""
26
class Decoder(srd.Decoder):
    """Protocol decoder for the STM8 Single Wire Interface Module (SWIM).

    The decoder works on a single logic channel. It classifies each
    low/high pulse pair as a bit, groups bits into commands and bytes
    (start bit, payload, parity, ACK/NACK), and tracks the command
    sequence (command, byte count, 3 address bytes, data bytes). It also
    annotates SWIM enter sequences and synchronization frames.
    """

    api_version = 3
    id = 'swim'
    name = 'SWIM'
    longname = 'STM8 SWIM bus'
    desc = 'STM8 Single Wire Interface Module (SWIM) protocol.'
    license = 'gplv2+'
    inputs = ['logic']
    outputs = []
    tags = ['Debug/trace']
    options = (
        {'id': 'debug', 'desc': 'Debug', 'default': 'no', 'values': ('yes', 'no') },
    )
    channels = (
        {'id': 'swim', 'name': 'SWIM', 'desc': 'SWIM data line'},
    )
    # The numeric annotation classes used in put() calls throughout this
    # class are indices into this tuple (0 = 'bit' ... 15 = 'debug').
    annotations = (
        ('bit', 'Bit'),
        ('enterseq', 'SWIM enter sequence'),
        ('start-host', 'Start bit (host)'),
        ('start-target', 'Start bit (target)'),
        ('parity', 'Parity bit'),
        ('ack', 'Acknowledgement'),
        ('nack', 'Negative acknowledgement'),
        ('byte-write', 'Byte write'),
        ('byte-read', 'Byte read'),
        ('cmd-unknown', 'Unknown SWIM command'),
        ('cmd', 'SWIM command'),
        ('bytes', 'Byte count'),
        ('address', 'Address'),
        ('data-write', 'Data write'),
        ('data-read', 'Data read'),
        ('debug', 'Debug'),
    )
    annotation_rows = (
        ('bits', 'Bits', (0,)),
        ('framing', 'Framing', (2, 3, 4, 5, 6, 7, 8)),
        ('protocol', 'Protocol', (1, 9, 10, 11, 12, 13, 14)),
        ('debug', 'Debug', (15,)),
    )
    binary = (
        ('tx', 'Dump of data written to target'),
        ('rx', 'Dump of data read from target'),
    )

    def __init__(self):
        """Set up clock assumptions, enter sequence state and decoder state."""
        # SWIM clock for the target is normally HSI/2 where HSI is 8MHz +- 5%
        # although the divisor can be removed by setting the SWIMCLK bit in
        # the CLK_SWIMCCR register. There is no standard for the host so we
        # will be generous and assume it is using an 8MHz +- 10% oscillator.
        # We do not need to be accurate. We just need to avoid treating enter
        # sequence pulses as bits. A synchronization frame will cause this
        # to be adjusted.
        self.HSI = 8000000
        self.HSI_min = self.HSI * 0.9
        self.HSI_max = self.HSI * 1.1
        self.swim_clock = self.HSI_min / 2

        # Make sure start() can report a missing samplerate with a
        # SamplerateError rather than failing with an AttributeError when
        # metadata() was never called. This must live here and not in
        # reset(), because reset() is also invoked in the middle of a
        # decode run (on every synchronization frame).
        self.samplerate = None

        # The last two edges seen, as [level, samplenum] pairs (oldest
        # first), used for enter sequence / synchronization frame detection.
        self.eseq_edge = [[-1, None], [-1, None]]
        self.eseq_pairnum = 0
        self.eseq_pairstart = None

        self.reset()

    def reset(self):
        """Reset the bit, bit sequence and protocol state machines.

        Enter sequence tracking and clock/samplerate information are
        deliberately left untouched.
        """
        # The last two bit-level edges, as [level, samplenum] pairs.
        self.bit_edge = [[-1, None], [-1, None]]
        self.bit_maxlen = -1
        self.bitseq_len = 0
        self.bitseq_end = None
        self.proto_state = 'CMD'

    def metadata(self, key, value):
        """Store the samplerate when it is announced."""
        if key == srd.SRD_CONF_SAMPLERATE:
            self.samplerate = value

    def adjust_timings(self):
        """Recompute the reference bit length from the current SWIM clock."""
        # A low-speed bit is 22 SWIM clocks long.
        # There are options to shorten bits to 10 clocks or use HSI rather
        # than HSI/2 as the SWIM clock but the longest valid bit should be no
        # more than this many samples. This does not need to be accurate.
        # It exists simply to prevent bits extending unnecessarily far into
        # trailing bus-idle periods. This will be adjusted every time we see
        # a synchronization frame or start bit in order to show idle periods
        # as accurately as possible.
        self.bit_reflen = math.ceil(self.samplerate * 22 / self.swim_clock)

    def start(self):
        """Register outputs and derive sample-count limits.

        Raises:
            SamplerateError: if no (non-zero) samplerate is known.
        """
        self.out_ann = self.register(srd.OUTPUT_ANN)
        self.out_binary = self.register(srd.OUTPUT_BINARY)

        if not self.samplerate:
            raise SamplerateError('Cannot decode without samplerate.')

        # A synchronization frame is a low that lasts for more than 64 but no
        # more than 128 SWIM clock periods based on the standard SWIM clock.
        # Note: we also allow for the possibility that the SWIM clock divisor
        # has been disabled here.
        self.sync_reflen_min = math.floor(self.samplerate * 64 / self.HSI_max)
        self.sync_reflen_max = math.ceil(self.samplerate * 128 / (self.HSI_min / 2))

        self.debug = self.options['debug'] == 'yes'

        # Initial reference length for enter sequence pulses. This is only a
        # placeholder: detect_enter_sequence() replaces it with the measured
        # length of the first candidate pulse before comparing against it.
        self.eseq_reflen = math.ceil(self.samplerate / 2048)

        self.adjust_timings()

    def protocol(self):
        """Advance the protocol state machine with one acknowledged value.

        Reads the value from self.bitseq_value and emits command, byte
        count, address and data annotations (plus a binary dump of data
        bytes) depending on self.proto_state.
        """
        value = self.bitseq_value
        ss, es = self.bitseq_start, self.bitseq_end

        if self.proto_state == 'CMD':
            # Command
            if value == 0x00:
                self.put(ss, es, self.out_ann, [10, ['system reset', 'SRST', '!']])
            elif value == 0x01:
                self.proto_state = 'N'
                self.put(ss, es, self.out_ann, [10, ['read on-the-fly', 'ROTF', 'r']])
            elif value == 0x02:
                self.proto_state = 'N'
                self.put(ss, es, self.out_ann, [10, ['write on-the-fly', 'WOTF', 'w']])
            else:
                self.put(ss, es, self.out_ann, [9, ['unknown', 'UNK']])
        elif self.proto_state == 'N':
            # Number of bytes
            self.proto_byte_count = value
            self.proto_state = '@E'
            self.put(ss, es, self.out_ann, [11, ['byte count 0x%02x' % value, 'bytes 0x%02x' % value, '0x%02x' % value, '%02x' % value, '%x' % value]])
        elif self.proto_state == '@E':
            # Address byte 1
            self.proto_addr = value
            self.proto_addr_start = ss
            self.proto_state = '@H'
        elif self.proto_state == '@H':
            # Address byte 2
            self.proto_addr = (self.proto_addr << 8) | value
            self.proto_state = '@L'
        elif self.proto_state == '@L':
            # Address byte 3; the annotation covers all three address bytes.
            self.proto_addr = (self.proto_addr << 8) | value
            self.proto_state = 'D'
            addr = self.proto_addr
            self.put(self.proto_addr_start, es, self.out_ann, [12, ['address 0x%06x' % addr, 'addr 0x%06x' % addr, '0x%06x' % addr, '%06x' % addr, '%x' % addr]])
        else:
            # Data byte. Once the announced number of bytes has been seen
            # the next value is treated as a command again.
            if self.proto_byte_count > 0:
                self.proto_byte_count -= 1
                if self.proto_byte_count == 0:
                    self.proto_state = 'CMD'

            # bitseq_dir (the start bit value) selects between the write and
            # read annotation classes (13/14) and binary classes (0/1).
            self.put(ss, es, self.out_ann, [13 + self.bitseq_dir, ['0x%02x' % value, '%02x' % value, '%x' % value]])
            self.put(ss, es, self.out_binary, [0 + self.bitseq_dir, bytes([value])])
            if self.debug:
                self.put(ss, es, self.out_ann, [15, ['%d more' % self.proto_byte_count, '%d' % self.proto_byte_count]])

    def bitseq(self, bitstart, bitend, bit):
        """Feed one decoded bit into the command/byte framing state machine.

        Args:
            bitstart: sample number at which the bit starts.
            bitend: sample number at which the bit ends.
            bit: the bit value (0 or 1).
        """
        in_cmd = self.proto_state == 'CMD'

        if self.bitseq_len == 0:
            # Looking for start of a bit sequence (command or byte).
            # The start bit also recalibrates the reference bit length.
            self.bit_reflen = bitend - bitstart
            self.bitseq_value = 0
            self.bitseq_dir = bit
            self.bitseq_len = 1
            self.put(bitstart, bitend, self.out_ann, [2 + self.bitseq_dir, ['start', 's']])
        elif (in_cmd and self.bitseq_len == 4) or (not in_cmd and self.bitseq_len == 9):
            # Parity bit. The payload ends where the parity bit begins.
            self.bitseq_end = bitstart
            self.bitseq_len += 1

            self.put(bitstart, bitend, self.out_ann, [4, ['parity', 'par', 'p']])

            # Keep only the low 8 bits of the accumulated value.
            self.bitseq_value &= 0xff
            self.put(self.bitseq_start, self.bitseq_end, self.out_ann, [7 + self.bitseq_dir, ['0x%02x' % self.bitseq_value, '%02x' % self.bitseq_value, '%x' % self.bitseq_value]])
        elif (in_cmd and self.bitseq_len == 5) or (not in_cmd and self.bitseq_len == 10):
            # ACK/NACK bit.
            if bit:
                self.put(bitstart, bitend, self.out_ann, [5, ['ack', 'a']])
                # We only pass data that was ack'd up the stack.
                self.protocol()
            else:
                self.put(bitstart, bitend, self.out_ann, [6, ['nack', 'n']])

            self.bitseq_len = 0
        else:
            # Payload bit, shifted in MSB first. The first payload bit marks
            # the start of the value annotation.
            if self.bitseq_len == 1:
                self.bitseq_start = bitstart
            self.bitseq_value = (self.bitseq_value << 1) | bit
            self.bitseq_len += 1

    def bit(self, start, mid, end):
        """Classify one low/high pulse pair as a bit and pass it on.

        Args:
            start: sample number of the falling edge starting the bit.
            mid: sample number of the rising edge within the bit.
            end: sample number at which the bit ends.
        """
        # A low phase at least as long as the high phase is a 0.
        bit = 0 if mid - start >= end - mid else 1
        self.put(start, end, self.out_ann, [0, ['%d' % bit]])

        self.bitseq(start, end, bit)

    def detect_synchronize_frame(self, start, end):
        """Annotate and act on a low pulse that is a synchronization frame.

        Args:
            start: sample number at which the low pulse began.
            end: sample number at which the low pulse ended.
        """
        # Strictly speaking, synchronization frames are only recognised when
        # SWIM is active. A falling edge on reset disables SWIM and an enter
        # sequence is needed to re-enable it. However we do not want to be
        # reliant on seeing the NRST pin just for that and we also want to be
        # able to decode SWIM even if we just sample parts of the dialogue.
        # For this reason we limit ourselves to only recognizing
        # synchronization frames that have believable lengths based on our
        # knowledge of the range of possible SWIM clocks.
        length = end - start
        if self.sync_reflen_min <= length <= self.sync_reflen_max:
            self.put(start, end, self.out_ann, [1, ['synchronization frame', 'synchronization', 'sync', 's']])

            # A low that lasts for more than 64 SWIM clock periods causes a
            # reset of the SWIM communication state machine and will switch
            # the SWIM to low-speed mode (SWIM_CSR.HS is cleared).
            self.reset()

            # The low SHOULD last 128 SWIM clocks. This is used to
            # resynchronize in order to allow for variation in the frequency
            # of the internal RC oscillator.
            self.swim_clock = 128 * (self.samplerate / length)
            self.adjust_timings()

    def eseq_potential_start(self, start, end):
        """Treat the pulse from start to end as the first of an enter sequence."""
        self.eseq_pairstart = start
        self.eseq_reflen = end - start
        self.eseq_pairnum = 1

    def detect_enter_sequence(self, start, end):
        """Track successive pulses and annotate a complete enter sequence.

        Args:
            start: sample number at which the pulse under test began.
            end: sample number at which the pulse under test ended.
        """
        # The enter sequence is recognised as four pulses of equal length
        # followed by four pulses of half that length. We do not check the
        # frequency but simply check the lengths of successive pulses
        # against the first. This means we have no need to account for the
        # accuracy (or lack of) of the host's oscillator.
        # NOTE(review): an earlier comment here described the sequence as
        # "four pulses at 2kHz followed by four at 1kHz", which contradicts
        # the halving of the reference length below (the frequency doubles).
        # Confirm the order against the ST SWIM documentation.
        if self.eseq_pairnum == 0 or abs(self.eseq_reflen - (end - start)) > 2:
            # Either nothing is being tracked or the pulse does not match
            # (within 2 samples): use it as a new potential start.
            self.eseq_potential_start(start, end)

        elif self.eseq_pairnum < 4:
            # The next three pulses should be the same length as the first.
            self.eseq_pairnum += 1

            if self.eseq_pairnum == 4:
                self.eseq_reflen /= 2
        else:
            # The final four pulses should each be half the length of the
            # initial pair. Again, a mismatch causes us to reset and use the
            # current pulse as a new potential enter sequence start.
            self.eseq_pairnum += 1
            if self.eseq_pairnum == 8:
                # Four matching pulses followed by four more that match each
                # other but are half the length of the first 4. SWIM is active!
                self.put(self.eseq_pairstart, end, self.out_ann, [1, ['enter sequence', 'enter seq', 'enter', 'ent', 'e']])
                self.eseq_pairnum = 0

    def decode(self):
        """Main decode loop: consume samples and drive all detectors."""
        while True:
            if self.bit_maxlen >= 0:
                # Inside a potential bit: step sample by sample so that the
                # bit can be cut off once the maximum bit length is reached.
                (swim,) = self.wait()
                self.bit_maxlen -= 1
            else:
                # Otherwise skip ahead to the next edge on the SWIM line.
                (swim,) = self.wait({0: 'e'})

            if swim != self.eseq_edge[1][0]:
                # Level change: on a rising edge check the preceding low for
                # a synchronization frame and the preceding full pulse
                # (rising edge to rising edge) for the enter sequence.
                if swim == 1 and self.eseq_edge[1][1] is not None:
                    self.detect_synchronize_frame(self.eseq_edge[1][1], self.samplenum)
                    if self.eseq_edge[0][1] is not None:
                        self.detect_enter_sequence(self.eseq_edge[0][1], self.samplenum)
                self.eseq_edge.pop(0)
                self.eseq_edge.append([swim, self.samplenum])

            if (swim != self.bit_edge[1][0] and (swim != 1 or self.bit_edge[1][0] != -1)) or self.bit_maxlen == 0:
                # The maximum bit length ran out while the line was high:
                # record this point with level -1 instead of a real edge.
                if self.bit_maxlen == 0 and self.bit_edge[1][0] == 1:
                    swim = -1

                # A falling edge (re)starts the bit length countdown.
                if self.bit_edge[1][0] != 0 and swim == 0:
                    self.bit_maxlen = self.bit_reflen

                # A low followed by a high that together fit within the
                # reference bit length (plus 2 samples of slack) is a bit.
                if self.bit_edge[0][0] == 0 and self.bit_edge[1][0] == 1 and self.samplenum - self.bit_edge[0][1] <= self.bit_reflen + 2:
                    self.bit(self.bit_edge[0][1], self.bit_edge[1][1], self.samplenum)

                self.bit_edge.pop(0)
                self.bit_edge.append([swim, self.samplenum])
295 swim = -1
296
297 if self.bit_edge[1][0] != 0 and swim == 0:
298 self.bit_maxlen = self.bit_reflen
299
300 if self.bit_edge[0][0] == 0 and self.bit_edge[1][0] == 1 and self.samplenum - self.bit_edge[0][1] <= self.bit_reflen + 2:
301 self.bit(self.bit_edge[0][1], self.bit_edge[1][1], self.samplenum)
302
303 self.bit_edge.pop(0)
304 self.bit_edge.append([swim, self.samplenum])