]> sigrok.org Git - libsigrokdecode.git/blob - decoders/onewire/onewire.py
ef80b350c54e429cdc99af58ddfd54634235a914
[libsigrokdecode.git] / decoders / onewire / onewire.py
1 ##
2 ## This file is part of the sigrok project.
3 ##
4 ## Copyright (C) 2011-2012 Uwe Hermann <uwe@hermann-uwe.de>
5 ##
6 ## This program is free software; you can redistribute it and/or modify
7 ## it under the terms of the GNU General Public License as published by
8 ## the Free Software Foundation; either version 2 of the License, or
9 ## (at your option) any later version.
10 ##
11 ## This program is distributed in the hope that it will be useful,
12 ## but WITHOUT ANY WARRANTY; without even the implied warranty of
13 ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 ## GNU General Public License for more details.
15 ##
16 ## You should have received a copy of the GNU General Public License
17 ## along with this program; if not, write to the Free Software
18 ## Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
19 ##
20
21 # 1-Wire protocol decoder
22
23 import sigrokdecode as srd
24
25 # Annotation feed formats
26 ANN_LINK      = 0
27 ANN_NETWORK   = 1
28 ANN_TRANSPORT = 2
29
30 class Decoder(srd.Decoder):
31     api_version = 1
32     id = 'onewire'
33     name = '1-Wire'
34     longname = ''
35     desc = '1-Wire bus and MicroLan'
36     license = 'gplv2+'
37     inputs = ['logic']
38     outputs = ['onewire']
39     probes = [
40         {'id': 'owr', 'name': 'OWR', 'desc': '1-Wire bus'},
41     ]
42     optional_probes = [
43         {'id': 'pwr', 'name': 'PWR', 'desc': '1-Wire power'},
44     ]
45     options = {
46         'overdrive' : ['Overdrive', 1],
47         'cnt_normal_bit'        : ['Time (in samplerate periods) for normal mode sample bit'     , 0],
48         'cnt_normal_presence'   : ['Time (in samplerate periods) for normal mode sample presence', 0],
49         'cnt_overdrive_bit'     : ['Time (in samplerate periods) for overdrive mode sample bit'     , 0],
50         'cnt_overdrive_presence': ['Time (in samplerate periods) for overdrive mode sample presence', 0],
51     }
52     annotations = [
53         ['Link', 'Link layer events (reset, presence, bit slots)'],
54         ['Network', 'Network layer events (device addressing)'],
55         ['Transport', 'Transport layer events'],
56     ]
57
58     def __init__(self, **kwargs):
59         # Common variables
60         self.samplenum = 0
61         # Link layer variables
62         self.lnk_state   = 'WAIT FOR FALLING EDGE'
63         self.lnk_event   = 'NONE'
64         self.lnk_fall    = 0
65         self.lnk_present = 0
66         self.lnk_bit     = 0
67         # Network layer variables
68         self.net_state   = 'IDLE'
69         self.net_cnt     = 0
70         self.net_search  = "P"
71         self.net_data_p  = 0x0
72         self.net_data_n  = 0x0
73         self.net_data    = 0x0
74         self.net_rom     = 0x0000000000000000
75
76     def start(self, metadata):
77         self.out_proto = self.add(srd.OUTPUT_PROTO, 'onewire')
78         self.out_ann   = self.add(srd.OUTPUT_ANN  , 'onewire')
79
80         # check if samplerate is appropriate
81         self.samplerate = metadata['samplerate']
82         if (self.options['overdrive']):
83             self.put(self.lnk_fall, self.samplenum, self.out_ann, [ANN_LINK, ['NOTE: Sample rate checks assume overdrive mode.']])
84             if   (self.samplerate < 2000000):
85                 self.put(self.lnk_fall, self.samplenum, self.out_ann, [ANN_LINK, ['ERROR: Sampling rate is too low must be above 2MHz for proper overdrive mode decoding.']])
86             elif (self.samplerate < 5000000):
87                 self.put(self.lnk_fall, self.samplenum, self.out_ann, [ANN_LINK, ['WARNING: Sampling rate is suggested to be above 5MHz for proper overdrive mode decoding.']])
88         else:
89             self.put(self.lnk_fall, self.samplenum, self.out_ann, [ANN_LINK, ['NOTE: Sample rate checks assume normal mode only.']])
90             if   (self.samplerate <  400000):
91                 self.put(self.lnk_fall, self.samplenum, self.out_ann, [ANN_LINK, ['ERROR: Sampling rate is too low must be above 400kHz for proper normal mode decoding.']])
92             elif (self.samplerate < 1000000):
93                 self.put(self.lnk_fall, self.samplenum, self.out_ann, [ANN_LINK, ['WARNING: Sampling rate is suggested to be above 1MHz for proper normal mode decoding.']])
94
95         # The default 1-Wire time base is 30us, this is used to calculate sampling times.
96         if (self.options['cnt_normal_bit']):      self.cnt_normal_bit      = self.options['cnt_normal_bit']
97         else:                                     self.cnt_normal_bit      = float(self.samplerate) * 0.000015 # 15ns
98         if (self.options['cnt_normal_presence']): self.cnt_normal_presence = self.options['cnt_normal_presence']
99         else:                                     self.cnt_normal_presence = float(self.samplerate) * 0.000075 # 15ns
100         if (self.options['cnt_overdrive_bit']):      self.cnt_overdrive_bit      = self.options['cnt_overdrive_bit']
101         else:                                        self.cnt_overdrive_bit      = float(self.samplerate) * 0.000002 # 2ns
102         if (self.options['cnt_overdrive_presence']): self.cnt_overdrive_presence = self.options['cnt_overdrive_presence']
103         else:                                        self.cnt_overdrive_presence = float(self.samplerate) * 0.000010 # 10ns
104         # Check if sample times are in the allowed range
105         # TODO
106         self.time_base = float(self.samplerate) * float(0.000030)
107         self.put(self.lnk_fall, self.samplenum, self.out_ann, [ANN_LINK, ['time_base = %d' % self.time_base]])
108
109     def report(self):
110         pass
111
112     def decode(self, ss, es, data):
113         for (self.samplenum, (owr, pwr)) in data:
114
115             # Data link layer
116
117             # Clear events.
118             self.lnk_event = "NONE"
119             # State machine.
120             if self.lnk_state == 'WAIT FOR FALLING EDGE':
121                 # The start of a cycle is a falling edge.
122                 if (owr == 0):
123                     # Save the sample number for the falling edge.
124                     self.lnk_fall = self.samplenum
125                     # Go to waiting for sample time
126                     self.lnk_state = 'WAIT FOR DATA SAMPLE'
127             elif self.lnk_state == 'WAIT FOR DATA SAMPLE':
128                 # Data should be sample one 'time unit' after a falling edge
129                 if (self.samplenum - self.lnk_fall == 0.5*self.time_base):
130                     self.lnk_bit  = owr & 0x1
131                     self.lnk_event = "DATA BIT"
132                     if (self.lnk_bit) :  self.lnk_state = 'WAIT FOR FALLING EDGE'
133                     else              :  self.lnk_state = 'WAIT FOR RISING EDGE'
134                     self.put(self.lnk_fall, self.samplenum, self.out_ann, [ANN_LINK, ['BIT: %01x' % self.lnk_bit]])
135             elif self.lnk_state == 'WAIT FOR RISING EDGE':
136                 # The end of a cycle is a rising edge.
137                 if (owr == 1):
138                     # A reset cycle is longer than 8T.
139                     if (self.samplenum - self.lnk_fall > 8*self.time_base):
140                         # Save the sample number for the falling edge.
141                         self.lnk_rise = self.samplenum
142                         # Send a reset event to the next protocol layer.
143                         self.lnk_event = "RESET"
144                         self.lnk_state = "WAIT FOR PRESENCE DETECT"
145                         self.put(self.lnk_fall, self.samplenum, self.out_proto, ['RESET'])
146                         self.put(self.lnk_fall, self.samplenum, self.out_ann, [ANN_LINK     , ['RESET']])
147                         self.put(self.lnk_fall, self.samplenum, self.out_ann, [ANN_NETWORK  , ['RESET']])
148                         # Reset the timer.
149                         self.lnk_fall = self.samplenum
150                     # Otherwise this is assumed to be a data bit.
151                     else :
152                         self.lnk_state = "WAIT FOR FALLING EDGE"
153             elif self.lnk_state == 'WAIT FOR PRESENCE DETECT':
154                 # Data should be sample one 'time unit' after a falling edge
155                 if (self.samplenum - self.lnk_rise == 2.5*self.time_base):
156                     self.lnk_present = owr & 0x1
157                     # Save the sample number for the falling edge.
158                     if not (self.lnk_present) :  self.lnk_fall = self.samplenum
159                     # create presence detect event
160                     #self.lnk_event   = "PRESENCE DETECT"
161                     if (self.lnk_present) :  self.lnk_state = 'WAIT FOR FALLING EDGE'
162                     else                  :  self.lnk_state = 'WAIT FOR RISING EDGE'
163                     present_str = "False" if self.lnk_present else "True"
164                     self.put(self.lnk_fall, self.samplenum, self.out_ann, [ANN_LINK   , ['PRESENCE: ' + present_str]])
165                     self.put(self.lnk_fall, self.samplenum, self.out_ann, [ANN_NETWORK, ['PRESENCE: ' + present_str]])
166             else:
167                 raise Exception('Invalid lnk_state: %d' % self.lnk_state)
168
169             # Network layer
170             
171             # State machine.
172             if (self.lnk_event == "RESET"):
173                 self.net_state = "COMMAND"
174                 self.net_search = "P"
175                 self.net_cnt    = 0
176             elif (self.net_state == "IDLE"):
177                 pass
178             elif (self.net_state == "TRANSPORT"):
179                 if (self.collect_data(8)):
180                     self.put(self.lnk_fall, self.samplenum, self.out_ann, [ANN_NETWORK  , ['TRANSPORT: 0x%02x' % self.net_data]])
181                     self.put(self.lnk_fall, self.samplenum, self.out_ann, [ANN_TRANSPORT, ['TRANSPORT: 0x%02x' % self.net_data]])
182                     self.put(self.lnk_fall, self.samplenum, self.out_proto, ['transfer', self.net_data])
183             elif (self.net_state == "COMMAND"):
184                 if (self.collect_data(8)):
185                     self.put(self.lnk_fall, self.samplenum, self.out_ann, [ANN_NETWORK, ['ROM COMMAND: 0x%02x' % self.net_data]])
186                     if   (self.net_data == 0x33):
187                         # READ ROM
188                         self.put(self.lnk_fall, self.samplenum, self.out_ann, [ANN_NETWORK, ['ROM COMMAND: \'READ ROM\'']])
189                         self.net_state = "GET ROM"
190                     elif (self.net_data == 0x0f):
191                         # CONDITIONAL READ ROM
192                         self.put(self.lnk_fall, self.samplenum, self.out_ann, [ANN_NETWORK, ['ROM COMMAND: \'CONDITIONAL READ ROM\'']])
193                         self.net_state = "GET ROM"
194                     elif (self.net_data == 0xcc):
195                         # SKIP ROM
196                         self.put(self.lnk_fall, self.samplenum, self.out_ann, [ANN_NETWORK, ['ROM COMMAND: \'SKIP ROM\'']])
197                         self.net_state = "TRANSPORT"
198                     elif (self.net_data == 0x55):
199                         # MATCH ROM
200                         self.put(self.lnk_fall, self.samplenum, self.out_ann, [ANN_NETWORK, ['ROM COMMAND: \'MATCH ROM\'']])
201                         self.net_state = "GET ROM"
202                     elif (self.net_data == 0xf0):
203                         # SEARCH ROM
204                         self.put(self.lnk_fall, self.samplenum, self.out_ann, [ANN_NETWORK, ['ROM COMMAND: \'SEARCH ROM\'']])
205                         self.net_state = "SEARCH ROM"
206                     elif (self.net_data == 0xec):
207                         # CONDITIONAL SEARCH ROM
208                         self.put(self.lnk_fall, self.samplenum, self.out_ann, [ANN_NETWORK, ['ROM COMMAND: \'CONDITIONAL SEARCH ROM\'']])
209                         self.net_state = "SEARCH ROM"
210                     elif (self.net_data == 0x3c):
211                         # OVERDRIVE SKIP ROM
212                         self.put(self.lnk_fall, self.samplenum, self.out_ann, [ANN_NETWORK, ['ROM COMMAND: \'OVERDRIVE SKIP ROM\'']])
213                         self.net_state = "TRANSPORT"
214                     elif (self.net_data == 0x69):
215                         # OVERDRIVE MATCH ROM
216                         self.put(self.lnk_fall, self.samplenum, self.out_ann, [ANN_NETWORK, ['ROM COMMAND: \'OVERDRIVE MATCH ROM\'']])
217                         self.net_state = "GET ROM"
218             elif (self.net_state == "GET ROM"):
219                 # family code (1B) + serial number (6B) + CRC (1B)
220                 if (self.collect_data(64)):
221                     self.net_rom = self.net_data & 0xffffffffffffffff
222                     self.put(self.lnk_fall, self.samplenum, self.out_ann, [ANN_NETWORK, ['ROM: 0x%016x' % self.net_rom]])
223                     self.net_state = "TRANSPORT"
224             elif (self.net_state == "SEARCH ROM"):
225                 # family code (1B) + serial number (6B) + CRC (1B)
226                 if (self.collect_search(64)):
227                     self.net_rom = self.net_data & 0xffffffffffffffff
228                     self.put(self.lnk_fall, self.samplenum, self.out_ann, [ANN_NETWORK, ['ROM: 0x%016x' % self.net_rom]])
229                     self.net_state = "TRANSPORT"
230             else:
231                 raise Exception('Invalid net_state: %s' % self.net_state)
232
233
234     # Link/Network layer data collector
235     def collect_data (self, length):
236         if (self.lnk_event == "DATA BIT"):
237             #print ("DEBUG: BIT=%d t0=%d t+=%d" % (self.lnk_bit, self.lnk_fall, self.samplenum))
238             self.net_data = self.net_data & ~(1 << self.net_cnt) | (self.lnk_bit << self.net_cnt)
239             self.net_cnt  = self.net_cnt + 1
240             if (self.net_cnt == length):
241                 self.net_data = self.net_data & ((1<<length)-1)
242                 self.net_cnt  = 0
243                 return (1)
244             else:
245                 return (0)
246         else:
247             return (0)
248
249     # Link/Network layer search collector
250     def collect_search (self, length):
251         if (self.lnk_event == "DATA BIT"):
252             #print ("DEBUG: SEARCH=%s BIT=%d t0=%d t+=%d" % (self.net_search, self.lnk_bit, self.lnk_fall, self.samplenum))
253             if   (self.net_search == "P"):
254               self.net_data_p = self.net_data_p & ~(1 << self.net_cnt) | (self.lnk_bit << self.net_cnt)
255               self.net_search = "N"
256             elif (self.net_search == "N"):
257               self.net_data_n = self.net_data_n & ~(1 << self.net_cnt) | (self.lnk_bit << self.net_cnt)
258               self.net_search = "D"
259             elif (self.net_search == "D"):
260               self.net_data   = self.net_data   & ~(1 << self.net_cnt) | (self.lnk_bit << self.net_cnt)
261               self.net_search = "P"
262               self.net_cnt    = self.net_cnt + 1
263             if (self.net_cnt == length):
264                 self.net_data_p = self.net_data_p & ((1<<length)-1)
265                 self.net_data_n = self.net_data_n & ((1<<length)-1)
266                 self.net_data   = self.net_data   & ((1<<length)-1)
267                 self.net_search = "P"
268                 self.net_cnt    = 0
269                 return (1)
270             else:
271                 return (0)
272         else:
273             return (0)
274
275
276 #            # Transport layer
277 #            
278 #            # State machine.
279 #            if (self.lnk_event == "RESET"):
280 #                self.trn_state = "IDLE"
281 #            elif (self.trn_state == "IDLE"):
282 #                pass
283 #            elif (self.trn_state == "COMMAND"):
284 #                if (self.collect_data(8)):
285 ##                    self.put(self.lnk_fall, self.samplenum, self.out_proto, ['FUNCTION COMMAND', self.net_data])
286 #                    self.put(self.lnk_fall, self.samplenum, self.out_ann, [ANN_NETWORK  , ['FUNCTION COMMAND: 0x%02x' % self.net_data]])
287 #                    self.put(self.lnk_fall, self.samplenum, self.out_ann, [ANN_TRANSPORT, ['FUNCTION COMMAND: 0x%02x' % self.net_data]])
288 #                    if   (self.net_data == 0x48):
289 #                        # COPY SCRATCHPAD
290 #                        self.trn_state = "TODO"
291 #                    elif (self.net_data == 0x4e):
292 #                        # WRITE SCRATCHPAD
293 #                        self.trn_state = "TODO"
294 #                    elif (self.net_data == 0xbe):
295 #                        # READ SCRATCHPAD
296 #                        self.trn_state = "TODO"
297 #                    elif (self.net_data == 0xb8):
298 #                        # RECALL E2
299 #                        self.trn_state = "TODO"
300 #                    elif (self.net_data == 0xb4):
301 #                        # READ POWER SUPPLY
302 #                        self.trn_state = "TODO"
303 #                    else:
304 #                        # unsupported commands
305 #                        self.trn_state = "TODO"
306 #            elif (self.trn_state == "TODO"):
307 #                if (self.collect_data(8)):
308 #                    self.put(self.lnk_fall, self.samplenum, self.out_ann, [ANN_NETWORK  , ['TRANSPORT DATA: 0x%02x' % self.net_data]])
309 #                    self.put(self.lnk_fall, self.samplenum, self.out_ann, [ANN_TRANSPORT, ['TRANSPORT DATA: 0x%02x' % self.net_data]])
310 ##                self.put(self.lnk_fall, self.samplenum, self.out_ann, [ANN_NETWORK  , ['TODO unsupported transport state: %s' % self.trn_state]])
311 ##                self.put(self.lnk_fall, self.samplenum, self.out_ann, [ANN_TRANSPORT, ['TODO unsupported transport state: %s' % self.trn_state]])
312 #                pass
313 #            else:
314 #                raise Exception('Invalid trn_state: %s' % self.trn_state)
315
316
317 #class onewire_device ():
318 #    def __init__ (self, scratchpad_size = 8):
319 #        pass
320 #    def reset (self):
321 #        pass
322 #    def data (self, data):
323 #        self.put(self.lnk_fall, self.samplenum, self.out_ann, [ANN_NETWORK  , ['TRANSPORT DATA: 0x%02x' % self.net_data]])
324 #        self.put(self.lnk_fall, self.samplenum, self.out_ann, [ANN_TRANSPORT, ['TRANSPORT DATA: 0x%02x' % self.net_data]])