]>
Commit | Line | Data |
---|---|---|
1 | ## | |
2 | ## This file is part of the sigrok project. | |
3 | ## | |
4 | ## Copyright (C) 2011-2012 Uwe Hermann <uwe@hermann-uwe.de> | |
5 | ## | |
6 | ## This program is free software; you can redistribute it and/or modify | |
7 | ## it under the terms of the GNU General Public License as published by | |
8 | ## the Free Software Foundation; either version 2 of the License, or | |
9 | ## (at your option) any later version. | |
10 | ## | |
11 | ## This program is distributed in the hope that it will be useful, | |
12 | ## but WITHOUT ANY WARRANTY; without even the implied warranty of | |
13 | ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
14 | ## GNU General Public License for more details. | |
15 | ## | |
16 | ## You should have received a copy of the GNU General Public License | |
17 | ## along with this program; if not, write to the Free Software | |
18 | ## Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA | |
19 | ## | |
20 | ||
21 | # 1-Wire protocol decoder | |
22 | ||
23 | import sigrokdecode as srd | |
24 | ||
# Annotation feed formats (indices into Decoder.annotations).
ANN_LINK = 0
ANN_NETWORK = 1
ANN_TRANSPORT = 2

# Dictionary of 1-Wire ROM command codes and their human-readable names.
# NOTE: Overdrive-Match ROM is 0x69 (the value the decoder compares
# against); it was previously listed here as 0x6d, which made the name
# lookup fail for a real Overdrive-Match ROM command.
rom_command = {
    0x33: "READ ROM",
    0x0f: "CONDITIONAL READ ROM",
    0xcc: "SKIP ROM",
    0x55: "MATCH ROM",
    0xf0: "SEARCH ROM",
    0xec: "CONDITIONAL SEARCH ROM",
    0x3c: "OVERDRIVE SKIP ROM",
    0x69: "OVERDRIVE MATCH ROM",
}
39 | ||
class Decoder(srd.Decoder):
    """Sigrok protocol decoder for the 1-Wire bus (link and network layers).

    Every logic sample is run through two chained state machines: the link
    layer turns the level of the OWR line into reset, presence and data-bit
    events, and the network layer assembles those bits into ROM commands,
    64 bit ROM addresses and transport-layer bytes.
    """

    api_version = 1
    id = 'onewire'
    name = '1-Wire'
    longname = ''
    desc = '1-Wire bus and MicroLan'
    license = 'gplv2+'
    inputs = ['logic']
    outputs = ['onewire']
    probes = [
        {'id': 'owr', 'name': 'OWR', 'desc': '1-Wire bus'},
    ]
    optional_probes = [
        {'id': 'pwr', 'name': 'PWR', 'desc': '1-Wire power'},
    ]
    options = {
        'overdrive': ['Overdrive', 1],
        'cnt_normal_bit': ['Time (in samplerate periods) for normal mode sample bit', 0],
        'cnt_normal_presence': ['Time (in samplerate periods) for normal mode sample presence', 0],
        'cnt_normal_reset': ['Time (in samplerate periods) for normal mode reset', 0],
        'cnt_overdrive_bit': ['Time (in samplerate periods) for overdrive mode sample bit', 0],
        'cnt_overdrive_presence': ['Time (in samplerate periods) for overdrive mode sample presence', 0],
        'cnt_overdrive_reset': ['Time (in samplerate periods) for overdrive mode reset', 0],
    }
    annotations = [
        ['Link', 'Link layer events (reset, presence, bit slots)'],
        ['Network', 'Network layer events (device addressing)'],
        ['Transport', 'Transport layer events'],
    ]

    # ROM command byte -> network layer state entered after the command.
    _NEXT_NET_STATE = {
        0x33: "GET ROM",     # READ ROM
        0x0f: "GET ROM",     # CONDITIONAL READ ROM
        0xcc: "TRANSPORT",   # SKIP ROM
        0x55: "GET ROM",     # MATCH ROM
        0xf0: "SEARCH ROM",  # SEARCH ROM
        0xec: "SEARCH ROM",  # CONDITIONAL SEARCH ROM
        0x3c: "TRANSPORT",   # OVERDRIVE SKIP ROM
        0x69: "GET ROM",     # OVERDRIVE MATCH ROM
    }
    # ROM commands that additionally switch the link layer to overdrive
    # timing.
    _OVERDRIVE_COMMANDS = (0x3c, 0x69)

    def __init__(self, **kwargs):
        """Initialise the link and network layer state machines."""
        # Common variables
        self.samplenum = 0
        # Link layer variables
        self.lnk_state = 'WAIT FOR FALLING EDGE'
        self.lnk_event = 'NONE'
        self.lnk_present = 0
        self.lnk_bit = 0
        self.lnk_overdrive = 0
        # Event timing variables (sample numbers)
        self.lnk_fall = 0
        self.lnk_rise = 0
        self.net_beg = 0
        self.net_end = 0
        # Network layer variables
        self.net_state = 'IDLE'
        self.net_cnt = 0
        self.net_search = "P"
        self.net_data_p = 0x0
        self.net_data_n = 0x0
        self.net_data = 0x0
        self.net_rom = 0x0000000000000000

    def start(self, metadata):
        """Register the outputs and derive the sampling points.

        Reads ``metadata['samplerate']``, emits link-layer annotations when
        the samplerate is too low for the selected mode, computes the sample
        counts used by the link layer (unless overridden by the matching
        non-zero ``cnt_*`` option) and warns when a resulting sampling
        window falls outside its allowed range.
        """
        self.out_proto = self.add(srd.OUTPUT_PROTO, 'onewire')
        self.out_ann = self.add(srd.OUTPUT_ANN, 'onewire')

        # Check if the samplerate is appropriate.
        self.samplerate = metadata['samplerate']
        if self.options['overdrive']:
            self._put_note('NOTE: Sample rate checks assume overdrive mode.')
            if self.samplerate < 2000000:
                self._put_note('ERROR: Sampling rate is too low must be above 2MHz for proper overdrive mode decoding.')
            elif self.samplerate < 5000000:
                self._put_note('WARNING: Sampling rate is suggested to be above 5MHz for proper overdrive mode decoding.')
        else:
            self._put_note('NOTE: Sample rate checks assume normal mode only.')
            if self.samplerate < 400000:
                self._put_note('ERROR: Sampling rate is too low must be above 400kHz for proper normal mode decoding.')
            elif self.samplerate < 1000000:
                self._put_note('WARNING: Sampling rate is suggested to be above 1MHz for proper normal mode decoding.')

        # Sampling points, expressed as a number of samples after the
        # reference edge. The defaults below are given in seconds.
        self.cnt_normal_bit = self._sample_count('cnt_normal_bit', 0.000015)  # 15us
        self.cnt_normal_presence = self._sample_count('cnt_normal_presence', 0.000075)  # 75us
        self.cnt_normal_reset = self._sample_count('cnt_normal_reset', 0.000480)  # 480us
        self.cnt_overdrive_bit = self._sample_count('cnt_overdrive_bit', 0.000002)  # 2us
        self.cnt_overdrive_presence = self._sample_count('cnt_overdrive_presence', 0.000010)  # 10us
        self.cnt_overdrive_reset = self._sample_count('cnt_overdrive_reset', 0.000048)  # 48us

        # Check if the sample times are in the allowed range. Each entry is
        # (sample count, lower bound [s], upper bound [s], warning format).
        checks = (
            (self.cnt_normal_bit, 0.000005, 0.000015,
             'WARNING: The normal mode data sample time interval (%2.1fus-%2.1fus) should be inside (5.0us, 15.0us).'),
            (self.cnt_normal_presence, 0.0000681, 0.000075,
             'WARNING: The normal mode presence sample time interval (%2.1fus-%2.1fus) should be inside (68.1us, 75.0us).'),
            (self.cnt_overdrive_bit, 0.000001, 0.000002,
             'WARNING: The overdrive mode data sample time interval (%2.1fus-%2.1fus) should be inside (1.0us, 2.0us).'),
            (self.cnt_overdrive_presence, 0.0000073, 0.000010,
             'WARNING: The overdrive mode presence sample time interval (%2.1fus-%2.1fus) should be inside (7.3us, 10.0us).'),
        )
        for cnt, lower, upper, fmt in checks:
            time_min = float(cnt) / self.samplerate
            time_max = float(cnt + 1) / self.samplerate
            if time_min < lower or time_max > upper:
                self._put_note(fmt % (time_min * 1000000, time_max * 1000000))

    def _put_note(self, text):
        """Emit a link-layer annotation at sample 0 (setup diagnostics)."""
        self.put(0, 0, self.out_ann, [ANN_LINK, [text]])

    def _sample_count(self, option, seconds):
        """Return the sample count for *option*, or a default from *seconds*.

        A non-zero option value is used verbatim; otherwise the count is
        derived from the samplerate and the given time in seconds.
        """
        override = self.options[option]
        if override:
            return override
        return int(float(self.samplerate) * seconds) - 1

    def report(self):
        """No report is produced by this decoder."""
        pass

    def decode(self, ss, es, data):
        """Run the link and network state machines over every sample.

        *data* is iterated as ``(samplenum, (owr, pwr))`` tuples; only the
        ``owr`` level is evaluated. *ss* and *es* are not used here.

        Raises:
            Exception: if either state machine holds an unknown state.
        """
        for (self.samplenum, (owr, pwr)) in data:
            self._decode_link(owr)
            self._decode_network()

    def _decode_link(self, owr):
        """Advance the data link layer by one sample of the OWR line."""
        # Clear events.
        self.lnk_event = "NONE"

        if self.lnk_state == 'WAIT FOR FALLING EDGE':
            # The start of a cycle is a falling edge.
            if owr == 0:
                # Save the sample number for the falling edge.
                self.lnk_fall = self.samplenum
                # Go to waiting for the sample time.
                self.lnk_state = 'WAIT FOR DATA SAMPLE'
        elif self.lnk_state == 'WAIT FOR DATA SAMPLE':
            cnt = self.cnt_overdrive_bit if self.lnk_overdrive else self.cnt_normal_bit
            # '>=' instead of '==': identical for valid counts, but a count
            # of zero or below (very low samplerate) can no longer stall
            # the state machine forever.
            if self.samplenum - self.lnk_fall >= cnt:
                # Sample the data bit.
                self.lnk_bit = owr & 0x1
                self.lnk_event = "DATA BIT"
                if self.lnk_bit:
                    self.lnk_state = 'WAIT FOR FALLING EDGE'
                else:
                    self.lnk_state = 'WAIT FOR RISING EDGE'
                self.put(self.lnk_fall, self.samplenum, self.out_ann, [ANN_LINK, ['BIT: %01x' % self.lnk_bit]])
        elif self.lnk_state == 'WAIT FOR RISING EDGE':
            # The end of a cycle is a rising edge.
            if owr == 1:
                low_time = self.samplenum - self.lnk_fall
                if low_time > self.cnt_normal_reset:
                    # A standard-length reset pulse returns 1-Wire devices
                    # to standard speed (see the device datasheets), so the
                    # overdrive timing must be dropped as well.
                    self.lnk_overdrive = 0
                    self._put_reset('RESET')
                elif low_time > self.cnt_overdrive_reset and self.lnk_overdrive:
                    self._put_reset('RESET OVERDRIVE')
                else:
                    # Otherwise this is assumed to be a data bit.
                    self.lnk_state = "WAIT FOR FALLING EDGE"
        elif self.lnk_state == 'WAIT FOR PRESENCE DETECT':
            cnt = self.cnt_overdrive_presence if self.lnk_overdrive else self.cnt_normal_presence
            if self.samplenum - self.lnk_rise >= cnt:
                # Sample the presence status. lnk_present holds the raw bus
                # level, so a low level (0) is reported as presence "True".
                self.lnk_present = owr & 0x1
                if self.lnk_present:
                    self.lnk_state = 'WAIT FOR FALLING EDGE'
                else:
                    # The bus is low: restart the low-time measurement here
                    # and wait for the bus to be released.
                    self.lnk_fall = self.samplenum
                    self.lnk_state = 'WAIT FOR RISING EDGE'
                present_str = "False" if self.lnk_present else "True"
                self.put(self.lnk_fall, self.samplenum, self.out_ann, [ANN_LINK, ['PRESENCE: ' + present_str]])
                self.put(self.lnk_fall, self.samplenum, self.out_ann, [ANN_NETWORK, ['PRESENCE: ' + present_str]])
        else:
            # '%s', not '%d': the state is a string.
            raise Exception('Invalid lnk_state: %s' % self.lnk_state)

    def _put_reset(self, label):
        """Report a reset pulse ending at the current sample as *label*."""
        # Save the sample number of the rising edge.
        self.lnk_rise = self.samplenum
        # Send a reset event to the next protocol layer.
        self.lnk_event = "RESET"
        self.lnk_state = "WAIT FOR PRESENCE DETECT"
        self.put(self.lnk_fall, self.samplenum, self.out_proto, [label])
        self.put(self.lnk_fall, self.samplenum, self.out_ann, [ANN_LINK, [label]])
        self.put(self.lnk_fall, self.samplenum, self.out_ann, [ANN_NETWORK, [label]])
        # Reset the timer.
        self.lnk_fall = self.samplenum

    def _decode_network(self):
        """Advance the network layer using the current link-layer event."""
        if self.lnk_event == "RESET":
            self.net_state = "COMMAND"
            self.net_search = "P"
            self.net_cnt = 0
        elif self.net_state == "IDLE":
            pass
        elif self.net_state == "COMMAND":
            # Receiving and decoding a ROM command.
            if self.onewire_collect(8):
                # Unknown command bytes are annotated instead of raising
                # KeyError; the network state is left unchanged for them.
                command_name = rom_command.get(self.net_data, 'UNRECOGNIZED')
                self.put(self.net_beg, self.net_end, self.out_ann, [ANN_NETWORK, ["ROM COMMAND: 0x%02x '%s'" % (self.net_data, command_name)]])
                if self.net_data in self._OVERDRIVE_COMMANDS:
                    self.lnk_overdrive = 1
                self.net_state = self._NEXT_NET_STATE.get(self.net_data, self.net_state)
        elif self.net_state == "GET ROM":
            # A 64 bit device address is selected:
            # family code (1B) + serial number (6B) + CRC (1B)
            if self.onewire_collect(64):
                self.net_rom = self.net_data & 0xffffffffffffffff
                self.put(self.net_beg, self.net_end, self.out_ann, [ANN_NETWORK, ['ROM: 0x%016x' % self.net_rom]])
                self.net_state = "TRANSPORT"
        elif self.net_state == "SEARCH ROM":
            # A 64 bit device address is searched for:
            # family code (1B) + serial number (6B) + CRC (1B)
            if self.onewire_search(64):
                self.net_rom = self.net_data & 0xffffffffffffffff
                self.put(self.net_beg, self.net_end, self.out_ann, [ANN_NETWORK, ['ROM: 0x%016x' % self.net_rom]])
                self.net_state = "TRANSPORT"
        elif self.net_state == "TRANSPORT":
            # The transport layer is handled in byte sized units.
            if self.onewire_collect(8):
                self.put(self.net_beg, self.net_end, self.out_ann, [ANN_NETWORK, ['TRANSPORT: 0x%02x' % self.net_data]])
                self.put(self.net_beg, self.net_end, self.out_ann, [ANN_TRANSPORT, ['TRANSPORT: 0x%02x' % self.net_data]])
                self.put(self.net_beg, self.net_end, self.out_proto, ['transfer', self.net_data])
                # TODO: Sending transport layer data to 1-Wire device models
        else:
            raise Exception('Invalid net_state: %s' % self.net_state)

    # Link/Network layer data collector
    def onewire_collect(self, length):
        """Shift the current data bit into ``net_data``, LSB first.

        Does nothing unless the link layer reported a "DATA BIT" event for
        the current sample.

        Returns:
            1 once *length* bits have been collected (``net_data`` is then
            masked to *length* bits and ``net_beg``/``net_end`` bracket the
            sequence), otherwise 0.
        """
        if self.lnk_event != "DATA BIT":
            return 0
        # Store the sample this sequence begins with. The first bit is
        # collected while net_cnt is still 0 (it was compared against 1,
        # which latched the second bit instead).
        if self.net_cnt == 0:
            self.net_beg = self.samplenum
        self.net_data = self.net_data & ~(1 << self.net_cnt) | (self.lnk_bit << self.net_cnt)
        self.net_cnt += 1
        if self.net_cnt != length:
            return 0
        # The full sequence was received: store the sample it ends with.
        self.net_end = self.samplenum
        self.net_data = self.net_data & ((1 << length) - 1)
        self.net_cnt = 0
        return 1

    # Link/Network layer search collector
    def onewire_search(self, length):
        """Collect one bit of a ROM search (three bit slots per address bit).

        The slots cycle through "P" (original address bit, stored in
        ``net_data_p``), "N" (complemented address bit, ``net_data_n``) and
        "D" (bit transmitted by the master, ``net_data``). Does nothing
        unless the link layer reported a "DATA BIT" event.

        Returns:
            1 once *length* address bits have been completed, otherwise 0.
        """
        if self.lnk_event != "DATA BIT":
            return 0
        bit_pos = self.net_cnt
        phase = self.net_search
        # Store the sample this sequence begins with.
        if bit_pos == 0 and phase == "P":
            self.net_beg = self.samplenum
        if phase == "P":
            # Master receives an original address bit.
            self.net_data_p = self.net_data_p & ~(1 << bit_pos) | (self.lnk_bit << bit_pos)
            self.net_search = "N"
        elif phase == "N":
            # Master receives a complemented address bit.
            self.net_data_n = self.net_data_n & ~(1 << bit_pos) | (self.lnk_bit << bit_pos)
            self.net_search = "D"
        elif phase == "D":
            # Master transmits an address bit; this completes the triplet.
            self.net_data = self.net_data & ~(1 << bit_pos) | (self.lnk_bit << bit_pos)
            self.net_search = "P"
            self.net_cnt = bit_pos + 1
        if self.net_cnt != length:
            return 0
        # The full sequence was received: store the sample it ends with.
        mask = (1 << length) - 1
        self.net_end = self.samplenum
        self.net_data_p = self.net_data_p & mask
        self.net_data_n = self.net_data_n & mask
        self.net_data = self.net_data & mask
        self.net_search = "P"
        self.net_cnt = 0
        return 1