]>
Commit | Line | Data |
---|---|---|
1 | ## | |
2 | ## This file is part of the sigrok project. | |
3 | ## | |
4 | ## Copyright (C) 2012 Iztok Jeras <iztok.jeras@gmail.com> | |
5 | ## | |
6 | ## This program is free software; you can redistribute it and/or modify | |
7 | ## it under the terms of the GNU General Public License as published by | |
8 | ## the Free Software Foundation; either version 2 of the License, or | |
9 | ## (at your option) any later version. | |
10 | ## | |
11 | ## This program is distributed in the hope that it will be useful, | |
12 | ## but WITHOUT ANY WARRANTY; without even the implied warranty of | |
13 | ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
14 | ## GNU General Public License for more details. | |
15 | ## | |
16 | ## You should have received a copy of the GNU General Public License | |
17 | ## along with this program; if not, write to the Free Software | |
18 | ## Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA | |
19 | ## | |
20 | ||
21 | # 1-Wire protocol decoder | |
22 | ||
23 | import sigrokdecode as srd | |
24 | ||
25 | # Annotation feed formats | |
26 | ANN_LINK = 0 | |
27 | ANN_NETWORK = 1 | |
28 | ANN_TRANSPORT = 2 | |
29 | ||
30 | # a dictionary of ROM commands and their names | |
31 | rom_command = {0x33: "READ ROM", | |
32 | 0x0f: "CONDITIONAL READ ROM", | |
33 | 0xcc: "SKIP ROM", | |
34 | 0x55: "MATCH ROM", | |
35 | 0xf0: "SEARCH ROM", | |
36 | 0xec: "CONDITIONAL SEARCH ROM", | |
37 | 0x3c: "OVERDRIVE SKIP ROM", | |
38 | 0x6d: "OVERDRIVE MATCH ROM"} | |
39 | ||
40 | class Decoder(srd.Decoder): | |
41 | api_version = 1 | |
42 | id = 'onewire' | |
43 | name = '1-Wire' | |
44 | longname = '1-Wire serial communication bus' | |
45 | desc = 'Bidirectional, half-duplex, asynchronous serial bus.' | |
46 | license = 'gplv2+' | |
47 | inputs = ['logic'] | |
48 | outputs = ['onewire'] | |
49 | probes = [ | |
50 | {'id': 'owr', 'name': 'OWR', 'desc': '1-Wire bus'}, | |
51 | ] | |
52 | optional_probes = [ | |
53 | {'id': 'pwr', 'name': 'PWR', 'desc': '1-Wire power'}, | |
54 | ] | |
55 | options = { | |
56 | 'overdrive' : ['Overdrive', 1], | |
57 | 'cnt_normal_bit' : ['Time (in samplerate periods) for normal mode sample bit' , 0], | |
58 | 'cnt_normal_presence' : ['Time (in samplerate periods) for normal mode sample presence', 0], | |
59 | 'cnt_normal_reset' : ['Time (in samplerate periods) for normal mode reset' , 0], | |
60 | 'cnt_overdrive_bit' : ['Time (in samplerate periods) for overdrive mode sample bit' , 0], | |
61 | 'cnt_overdrive_presence': ['Time (in samplerate periods) for overdrive mode sample presence', 0], | |
62 | 'cnt_overdrive_reset' : ['Time (in samplerate periods) for overdrive mode reset' , 0], | |
63 | } | |
64 | annotations = [ | |
65 | ['Link', 'Link layer events (reset, presence, bit slots)'], | |
66 | ['Network', 'Network layer events (device addressing)'], | |
67 | ['Transport', 'Transport layer events'], | |
68 | ] | |
69 | ||
70 | def __init__(self, **kwargs): | |
71 | # Common variables | |
72 | self.samplenum = 0 | |
73 | # Link layer variables | |
74 | self.lnk_state = 'WAIT FOR FALLING EDGE' | |
75 | self.lnk_event = 'NONE' | |
76 | self.lnk_present = 0 | |
77 | self.lnk_bit = 0 | |
78 | self.lnk_overdrive = 0 | |
79 | # Event timing variables | |
80 | self.lnk_fall = 0 | |
81 | self.lnk_rise = 0 | |
82 | self.net_beg = 0 | |
83 | self.net_end = 0 | |
84 | self.net_len = 0 | |
85 | # Network layer variables | |
86 | self.net_state = 'IDLE' | |
87 | self.net_cnt = 0 | |
88 | self.net_search = "P" | |
89 | self.net_data_p = 0x0 | |
90 | self.net_data_n = 0x0 | |
91 | self.net_data = 0x0 | |
92 | self.net_rom = 0x0000000000000000 | |
93 | ||
94 | def start(self, metadata): | |
95 | self.out_proto = self.add(srd.OUTPUT_PROTO, 'onewire') | |
96 | self.out_ann = self.add(srd.OUTPUT_ANN , 'onewire') | |
97 | ||
98 | # check if samplerate is appropriate | |
99 | self.samplerate = metadata['samplerate'] | |
100 | if (self.options['overdrive']): | |
101 | self.put(0, 0, self.out_ann, [ANN_LINK, | |
102 | ['NOTE: Sample rate checks assume overdrive mode.']]) | |
103 | if (self.samplerate < 2000000): | |
104 | self.put(0, 0, self.out_ann, [ANN_LINK, | |
105 | ['ERROR: Sampling rate is too low must be above 2MHz for proper overdrive mode decoding.']]) | |
106 | elif (self.samplerate < 5000000): | |
107 | self.put(0, 0, self.out_ann, [ANN_LINK, | |
108 | ['WARNING: Sampling rate is suggested to be above 5MHz for proper overdrive mode decoding.']]) | |
109 | else: | |
110 | self.put(0, 0, self.out_ann, [ANN_LINK, | |
111 | ['NOTE: Sample rate checks assume normal mode only.']]) | |
112 | if (self.samplerate < 400000): | |
113 | self.put(0, 0, self.out_ann, [ANN_LINK, | |
114 | ['ERROR: Sampling rate is too low must be above 400kHz for proper normal mode decoding.']]) | |
115 | elif (self.samplerate < 1000000): | |
116 | self.put(0, 0, self.out_ann, [ANN_LINK, | |
117 | ['WARNING: Sampling rate is suggested to be above 1MHz for proper normal mode decoding.']]) | |
118 | ||
119 | # The default 1-Wire time base is 30us, this is used to calculate sampling times. | |
120 | if (self.options['cnt_normal_bit']): | |
121 | self.cnt_normal_bit = self.options['cnt_normal_bit'] | |
122 | else: | |
123 | self.cnt_normal_bit = int(float(self.samplerate) * 0.000015) - 1 # 15ns | |
124 | if (self.options['cnt_normal_presence']): | |
125 | self.cnt_normal_presence = self.options['cnt_normal_presence'] | |
126 | else: | |
127 | self.cnt_normal_presence = int(float(self.samplerate) * 0.000075) - 1 # 75ns | |
128 | if (self.options['cnt_normal_reset']): | |
129 | self.cnt_normal_reset = self.options['cnt_normal_reset'] | |
130 | else: | |
131 | self.cnt_normal_reset = int(float(self.samplerate) * 0.000480) - 1 # 480ns | |
132 | if (self.options['cnt_overdrive_bit']): | |
133 | self.cnt_overdrive_bit = self.options['cnt_overdrive_bit'] | |
134 | else: | |
135 | self.cnt_overdrive_bit = int(float(self.samplerate) * 0.000002) - 1 # 2ns | |
136 | if (self.options['cnt_overdrive_presence']): | |
137 | self.cnt_overdrive_presence = self.options['cnt_overdrive_presence'] | |
138 | else: | |
139 | self.cnt_overdrive_presence = int(float(self.samplerate) * 0.000010) - 1 # 10ns | |
140 | if (self.options['cnt_overdrive_reset']): | |
141 | self.cnt_overdrive_reset = self.options['cnt_overdrive_reset'] | |
142 | else: | |
143 | self.cnt_overdrive_reset = int(float(self.samplerate) * 0.000048) - 1 # 48ns | |
144 | ||
145 | # calculating the slot size | |
146 | self.cnt_normal_slot = int(float(self.samplerate) * 0.000060) - 1 # 60ns | |
147 | self.cnt_overdrive_slot = int(float(self.samplerate) * 0.000006) - 1 # 6ns | |
148 | ||
149 | # organize values into lists | |
150 | self.cnt_bit = [self.cnt_normal_bit , self.cnt_overdrive_bit ] | |
151 | self.cnt_presence = [self.cnt_normal_presence, self.cnt_overdrive_presence] | |
152 | self.cnt_reset = [self.cnt_normal_reset , self.cnt_overdrive_reset ] | |
153 | self.cnt_slot = [self.cnt_normal_slot , self.cnt_overdrive_slot ] | |
154 | ||
155 | # Check if sample times are in the allowed range | |
156 | time_min = float(self.cnt_normal_bit ) / self.samplerate | |
157 | time_max = float(self.cnt_normal_bit+1) / self.samplerate | |
158 | if ( (time_min < 0.000005) or (time_max > 0.000015) ) : | |
159 | self.put(0, 0, self.out_ann, [ANN_LINK, | |
160 | ['WARNING: The normal mode data sample time interval (%2.1fus-%2.1fus) should be inside (5.0us, 15.0us).' | |
161 | % (time_min*1000000, time_max*1000000)]]) | |
162 | time_min = float(self.cnt_normal_presence ) / self.samplerate | |
163 | time_max = float(self.cnt_normal_presence+1) / self.samplerate | |
164 | if ( (time_min < 0.0000681) or (time_max > 0.000075) ) : | |
165 | self.put(0, 0, self.out_ann, [ANN_LINK, | |
166 | ['WARNING: The normal mode presence sample time interval (%2.1fus-%2.1fus) should be inside (68.1us, 75.0us).' | |
167 | % (time_min*1000000, time_max*1000000)]]) | |
168 | time_min = float(self.cnt_overdrive_bit ) / self.samplerate | |
169 | time_max = float(self.cnt_overdrive_bit+1) / self.samplerate | |
170 | if ( (time_min < 0.000001) or (time_max > 0.000002) ) : | |
171 | self.put(0, 0, self.out_ann, [ANN_LINK, | |
172 | ['WARNING: The overdrive mode data sample time interval (%2.1fus-%2.1fus) should be inside (1.0us, 2.0us).' | |
173 | % (time_min*1000000, time_max*1000000)]]) | |
174 | time_min = float(self.cnt_overdrive_presence ) / self.samplerate | |
175 | time_max = float(self.cnt_overdrive_presence+1) / self.samplerate | |
176 | if ( (time_min < 0.0000073) or (time_max > 0.000010) ) : | |
177 | self.put(0, 0, self.out_ann, [ANN_LINK, | |
178 | ['WARNING: The overdrive mode presence sample time interval (%2.1fus-%2.1fus) should be inside (7.3us, 10.0us).' | |
179 | % (time_min*1000000, time_max*1000000)]]) | |
180 | ||
181 | def report(self): | |
182 | pass | |
183 | ||
184 | def decode(self, ss, es, data): | |
185 | for (self.samplenum, (owr, pwr)) in data: | |
186 | ||
187 | # Data link layer | |
188 | ||
189 | # Clear events. | |
190 | self.lnk_event = "NONE" | |
191 | # State machine. | |
192 | if self.lnk_state == 'WAIT FOR FALLING EDGE': | |
193 | # The start of a cycle is a falling edge. | |
194 | if (owr == 0): | |
195 | # Save the sample number for the falling edge. | |
196 | self.lnk_fall = self.samplenum | |
197 | # Go to waiting for sample time | |
198 | self.lnk_state = 'WAIT FOR DATA SAMPLE' | |
199 | elif self.lnk_state == 'WAIT FOR DATA SAMPLE': | |
200 | # Sample data bit | |
201 | if (self.samplenum - self.lnk_fall == self.cnt_bit[self.lnk_overdrive]): | |
202 | self.lnk_bit = owr & 0x1 | |
203 | self.lnk_event = "DATA BIT" | |
204 | if (self.lnk_bit): self.lnk_state = 'WAIT FOR FALLING EDGE' | |
205 | else : self.lnk_state = 'WAIT FOR RISING EDGE' | |
206 | self.put(self.lnk_fall, self.cnt_bit[self.lnk_overdrive], self.out_ann, [ANN_LINK, ['BIT: %01x' % self.lnk_bit]]) | |
207 | elif self.lnk_state == 'WAIT FOR RISING EDGE': | |
208 | # The end of a cycle is a rising edge. | |
209 | if (owr == 1): | |
210 | # Check if this was a reset cycle | |
211 | if (self.samplenum - self.lnk_fall > self.cnt_normal_reset): | |
212 | # Save the sample number for the falling edge. | |
213 | self.lnk_rise = self.samplenum | |
214 | # Send a reset event to the next protocol layer. | |
215 | self.lnk_event = "RESET" | |
216 | self.lnk_state = "WAIT FOR PRESENCE DETECT" | |
217 | self.put(self.lnk_fall, self.lnk_rise, self.out_proto, ['RESET']) | |
218 | self.put(self.lnk_fall, self.lnk_rise, self.out_ann, [ANN_LINK , ['RESET']]) | |
219 | self.put(self.lnk_fall, self.lnk_rise, self.out_ann, [ANN_NETWORK , ['RESET']]) | |
220 | # Reset the timer. | |
221 | self.lnk_fall = self.samplenum | |
222 | elif ((self.samplenum - self.lnk_fall > self.cnt_overdrive_reset) and (self.lnk_overdrive)): | |
223 | # Save the sample number for the falling edge. | |
224 | self.lnk_rise = self.samplenum | |
225 | # Send a reset event to the next protocol layer. | |
226 | self.lnk_event = "RESET" | |
227 | self.lnk_state = "WAIT FOR PRESENCE DETECT" | |
228 | self.put(self.lnk_fall, self.lnk_rise, self.out_proto, ['RESET OVERDRIVE']) | |
229 | self.put(self.lnk_fall, self.lnk_rise, self.out_ann, [ANN_LINK , ['RESET OVERDRIVE']]) | |
230 | self.put(self.lnk_fall, self.lnk_rise, self.out_ann, [ANN_NETWORK , ['RESET OVERDRIVE']]) | |
231 | # Reset the timer. | |
232 | self.lnk_fall = self.samplenum | |
233 | # Otherwise this is assumed to be a data bit. | |
234 | else : | |
235 | self.lnk_state = "WAIT FOR FALLING EDGE" | |
236 | elif self.lnk_state == 'WAIT FOR PRESENCE DETECT': | |
237 | # Sample presence status | |
238 | if (self.samplenum - self.lnk_rise == self.cnt_presence[self.lnk_overdrive]): | |
239 | self.lnk_present = owr & 0x1 | |
240 | # Save the sample number for the falling edge. | |
241 | if not (self.lnk_present) : self.lnk_fall = self.samplenum | |
242 | # create presence detect event | |
243 | #self.lnk_event = "PRESENCE DETECT" | |
244 | if (self.lnk_present) : self.lnk_state = 'WAIT FOR FALLING EDGE' | |
245 | else : self.lnk_state = 'WAIT FOR RISING EDGE' | |
246 | present_str = "False" if self.lnk_present else "True" | |
247 | self.put(self.samplenum, 0, self.out_ann, [ANN_LINK , ['PRESENCE: ' + present_str]]) | |
248 | self.put(self.samplenum, 0, self.out_ann, [ANN_NETWORK, ['PRESENCE: ' + present_str]]) | |
249 | else: | |
250 | raise Exception('Invalid lnk_state: %d' % self.lnk_state) | |
251 | ||
252 | # Network layer | |
253 | ||
254 | # State machine. | |
255 | if (self.lnk_event == "RESET"): | |
256 | self.net_state = "COMMAND" | |
257 | self.net_search = "P" | |
258 | self.net_cnt = 0 | |
259 | elif (self.net_state == "IDLE"): | |
260 | pass | |
261 | elif (self.net_state == "COMMAND"): | |
262 | # Receiving and decoding a ROM command | |
263 | if (self.onewire_collect(8)): | |
264 | self.put(self.net_beg, self.net_len, self.out_ann, [ANN_NETWORK, | |
265 | ['ROM COMMAND: 0x%02x \'%s\'' % (self.net_data, rom_command[self.net_data])]]) | |
266 | if (self.net_data == 0x33): # READ ROM | |
267 | self.net_state = "GET ROM" | |
268 | elif (self.net_data == 0x0f): # CONDITIONAL READ ROM | |
269 | self.net_state = "GET ROM" | |
270 | elif (self.net_data == 0xcc): # SKIP ROM | |
271 | self.net_state = "TRANSPORT" | |
272 | elif (self.net_data == 0x55): # MATCH ROM | |
273 | self.net_state = "GET ROM" | |
274 | elif (self.net_data == 0xf0): # SEARCH ROM | |
275 | self.net_state = "SEARCH ROM" | |
276 | elif (self.net_data == 0xec): # CONDITIONAL SEARCH ROM | |
277 | self.net_state = "SEARCH ROM" | |
278 | elif (self.net_data == 0x3c): # OVERDRIVE SKIP ROM | |
279 | self.lnk_overdrive = 1 | |
280 | self.net_state = "TRANSPORT" | |
281 | elif (self.net_data == 0x69): # OVERDRIVE MATCH ROM | |
282 | self.lnk_overdrive = 1 | |
283 | self.net_state = "GET ROM" | |
284 | elif (self.net_state == "GET ROM"): | |
285 | # A 64 bit device address is selected | |
286 | # family code (1B) + serial number (6B) + CRC (1B) | |
287 | if (self.onewire_collect(64)): | |
288 | self.net_rom = self.net_data & 0xffffffffffffffff | |
289 | self.put(self.net_beg, self.net_len, self.out_ann, [ANN_NETWORK, ['ROM: 0x%016x' % self.net_rom]]) | |
290 | self.net_state = "TRANSPORT" | |
291 | elif (self.net_state == "SEARCH ROM"): | |
292 | # A 64 bit device address is searched for | |
293 | # family code (1B) + serial number (6B) + CRC (1B) | |
294 | if (self.onewire_search(64)): | |
295 | self.net_rom = self.net_data & 0xffffffffffffffff | |
296 | self.put(self.net_beg, self.net_len, self.out_ann, [ANN_NETWORK, ['ROM: 0x%016x' % self.net_rom]]) | |
297 | self.net_state = "TRANSPORT" | |
298 | elif (self.net_state == "TRANSPORT"): | |
299 | # The transport layer is handled in byte sized units | |
300 | if (self.onewire_collect(8)): | |
301 | self.put(self.net_beg, self.net_len, self.out_ann, [ANN_NETWORK , ['TRANSPORT: 0x%02x' % self.net_data]]) | |
302 | self.put(self.net_beg, self.net_len, self.out_ann, [ANN_TRANSPORT, ['TRANSPORT: 0x%02x' % self.net_data]]) | |
303 | self.put(self.net_beg, self.net_len, self.out_proto, ['transfer', self.net_data]) | |
304 | # TODO: Sending translort layer data to 1-Wire device models | |
305 | else: | |
306 | raise Exception('Invalid net_state: %s' % self.net_state) | |
307 | ||
308 | ||
309 | # Link/Network layer data collector | |
310 | def onewire_collect (self, length): | |
311 | if (self.lnk_event == "DATA BIT"): | |
312 | # Storing the sampe this sequence begins with | |
313 | if (self.net_cnt == 1): | |
314 | self.net_beg = self.lnk_fall | |
315 | self.net_data = self.net_data & ~(1 << self.net_cnt) | (self.lnk_bit << self.net_cnt) | |
316 | self.net_cnt = self.net_cnt + 1 | |
317 | # Storing the sampe this sequence ends with | |
318 | # In case the full length of the sequence is received, return 1 | |
319 | if (self.net_cnt == length): | |
320 | self.net_end = self.lnk_fall + self.cnt_slot[self.lnk_overdrive] | |
321 | self.net_len = self.net_end - self.net_beg | |
322 | self.net_data = self.net_data & ((1<<length)-1) | |
323 | self.net_cnt = 0 | |
324 | return (1) | |
325 | else: | |
326 | return (0) | |
327 | else: | |
328 | return (0) | |
329 | ||
330 | # Link/Network layer search collector | |
331 | def onewire_search (self, length): | |
332 | if (self.lnk_event == "DATA BIT"): | |
333 | # Storing the sampe this sequence begins with | |
334 | if ((self.net_cnt == 0) and (self.net_search == "P")): | |
335 | self.net_beg = self.lnk_fall | |
336 | # Master receives an original address bit | |
337 | if (self.net_search == "P"): | |
338 | self.net_data_p = self.net_data_p & ~(1 << self.net_cnt) | (self.lnk_bit << self.net_cnt) | |
339 | self.net_search = "N" | |
340 | # Master receives a complemented address bit | |
341 | elif (self.net_search == "N"): | |
342 | self.net_data_n = self.net_data_n & ~(1 << self.net_cnt) | (self.lnk_bit << self.net_cnt) | |
343 | self.net_search = "D" | |
344 | # Master transmits an address bit | |
345 | elif (self.net_search == "D"): | |
346 | self.net_data = self.net_data & ~(1 << self.net_cnt) | (self.lnk_bit << self.net_cnt) | |
347 | self.net_search = "P" | |
348 | self.net_cnt = self.net_cnt + 1 | |
349 | # Storing the sampe this sequence ends with | |
350 | # In case the full length of the sequence is received, return 1 | |
351 | if (self.net_cnt == length): | |
352 | self.net_end = self.lnk_fall + self.cnt_slot[self.lnk_overdrive] | |
353 | self.net_len = self.net_end - self.net_beg | |
354 | self.net_data_p = self.net_data_p & ((1<<length)-1) | |
355 | self.net_data_n = self.net_data_n & ((1<<length)-1) | |
356 | self.net_data = self.net_data & ((1<<length)-1) | |
357 | self.net_search = "P" | |
358 | self.net_cnt = 0 | |
359 | return (1) | |
360 | else: | |
361 | return (0) | |
362 | else: | |
363 | return (0) |