]> sigrok.org Git - libsigrokdecode.git/blob - decoders/onewire/onewire.py
srd: onewire: Fix copyright line, and PD name/description.
[libsigrokdecode.git] / decoders / onewire / onewire.py
1 ##
2 ## This file is part of the sigrok project.
3 ##
4 ## Copyright (C) 2012 Iztok Jeras <iztok.jeras@gmail.com>
5 ##
6 ## This program is free software; you can redistribute it and/or modify
7 ## it under the terms of the GNU General Public License as published by
8 ## the Free Software Foundation; either version 2 of the License, or
9 ## (at your option) any later version.
10 ##
11 ## This program is distributed in the hope that it will be useful,
12 ## but WITHOUT ANY WARRANTY; without even the implied warranty of
13 ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 ## GNU General Public License for more details.
15 ##
16 ## You should have received a copy of the GNU General Public License
17 ## along with this program; if not, write to the Free Software
18 ## Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
19 ##
20
21 # 1-Wire protocol decoder
22
23 import sigrokdecode as srd
24
25 # Annotation feed formats
26 ANN_LINK      = 0
27 ANN_NETWORK   = 1
28 ANN_TRANSPORT = 2
29
30 # a dictionary of ROM commands and their names
31 rom_command = {0x33: "READ ROM",
32                0x0f: "CONDITIONAL READ ROM",
33                0xcc: "SKIP ROM",
34                0x55: "MATCH ROM",
35                0xf0: "SEARCH ROM",
36                0xec: "CONDITIONAL SEARCH ROM",
37                0x3c: "OVERDRIVE SKIP ROM",
38                0x6d: "OVERDRIVE MATCH ROM"}
39
40 class Decoder(srd.Decoder):
41     api_version = 1
42     id = 'onewire'
43     name = '1-Wire'
44     longname = '1-Wire serial communication bus'
45     desc = 'Bidirectional, half-duplex, asynchronous serial bus.'
46     license = 'gplv2+'
47     inputs = ['logic']
48     outputs = ['onewire']
49     probes = [
50         {'id': 'owr', 'name': 'OWR', 'desc': '1-Wire bus'},
51     ]
52     optional_probes = [
53         {'id': 'pwr', 'name': 'PWR', 'desc': '1-Wire power'},
54     ]
55     options = {
56         'overdrive' : ['Overdrive', 1],
57         'cnt_normal_bit'        : ['Time (in samplerate periods) for normal mode sample bit'     , 0],
58         'cnt_normal_presence'   : ['Time (in samplerate periods) for normal mode sample presence', 0],
59         'cnt_normal_reset'      : ['Time (in samplerate periods) for normal mode reset'          , 0],
60         'cnt_overdrive_bit'     : ['Time (in samplerate periods) for overdrive mode sample bit'     , 0],
61         'cnt_overdrive_presence': ['Time (in samplerate periods) for overdrive mode sample presence', 0],
62         'cnt_overdrive_reset'   : ['Time (in samplerate periods) for overdrive mode reset'          , 0],
63     }
64     annotations = [
65         ['Link', 'Link layer events (reset, presence, bit slots)'],
66         ['Network', 'Network layer events (device addressing)'],
67         ['Transport', 'Transport layer events'],
68     ]
69
70     def __init__(self, **kwargs):
71         # Common variables
72         self.samplenum = 0
73         # Link layer variables
74         self.lnk_state   = 'WAIT FOR FALLING EDGE'
75         self.lnk_event   = 'NONE'
76         self.lnk_present = 0
77         self.lnk_bit     = 0
78         self.lnk_overdrive = 0
79         # Event timing variables
80         self.lnk_fall    = 0
81         self.lnk_rise    = 0
82         self.net_beg     = 0
83         self.net_end     = 0
84         self.net_len     = 0
85         # Network layer variables
86         self.net_state   = 'IDLE'
87         self.net_cnt     = 0
88         self.net_search  = "P"
89         self.net_data_p  = 0x0
90         self.net_data_n  = 0x0
91         self.net_data    = 0x0
92         self.net_rom     = 0x0000000000000000
93
94     def start(self, metadata):
95         self.out_proto = self.add(srd.OUTPUT_PROTO, 'onewire')
96         self.out_ann   = self.add(srd.OUTPUT_ANN  , 'onewire')
97
98         # check if samplerate is appropriate
99         self.samplerate = metadata['samplerate']
100         if (self.options['overdrive']):
101             self.put(0, 0, self.out_ann, [ANN_LINK,
102               ['NOTE: Sample rate checks assume overdrive mode.']])
103             if   (self.samplerate < 2000000):
104                 self.put(0, 0, self.out_ann, [ANN_LINK,
105                   ['ERROR: Sampling rate is too low must be above 2MHz for proper overdrive mode decoding.']])
106             elif (self.samplerate < 5000000):
107                 self.put(0, 0, self.out_ann, [ANN_LINK,
108                   ['WARNING: Sampling rate is suggested to be above 5MHz for proper overdrive mode decoding.']])
109         else:
110             self.put(0, 0, self.out_ann, [ANN_LINK,
111               ['NOTE: Sample rate checks assume normal mode only.']])
112             if   (self.samplerate <  400000):
113                 self.put(0, 0, self.out_ann, [ANN_LINK,
114                   ['ERROR: Sampling rate is too low must be above 400kHz for proper normal mode decoding.']])
115             elif (self.samplerate < 1000000):
116                 self.put(0, 0, self.out_ann, [ANN_LINK,
117                   ['WARNING: Sampling rate is suggested to be above 1MHz for proper normal mode decoding.']])
118
119         # The default 1-Wire time base is 30us, this is used to calculate sampling times.
120         if (self.options['cnt_normal_bit']):
121             self.cnt_normal_bit = self.options['cnt_normal_bit']
122         else:
123             self.cnt_normal_bit = int(float(self.samplerate) * 0.000015) - 1 # 15ns
124         if (self.options['cnt_normal_presence']):
125             self.cnt_normal_presence = self.options['cnt_normal_presence']
126         else:
127             self.cnt_normal_presence = int(float(self.samplerate) * 0.000075) - 1 # 75ns
128         if (self.options['cnt_normal_reset']):
129             self.cnt_normal_reset = self.options['cnt_normal_reset']
130         else:
131             self.cnt_normal_reset = int(float(self.samplerate) * 0.000480) - 1 # 480ns
132         if (self.options['cnt_overdrive_bit']):
133             self.cnt_overdrive_bit = self.options['cnt_overdrive_bit']
134         else:
135             self.cnt_overdrive_bit = int(float(self.samplerate) * 0.000002) - 1 # 2ns
136         if (self.options['cnt_overdrive_presence']):
137             self.cnt_overdrive_presence = self.options['cnt_overdrive_presence']
138         else:
139             self.cnt_overdrive_presence = int(float(self.samplerate) * 0.000010) - 1 # 10ns
140         if (self.options['cnt_overdrive_reset']):
141             self.cnt_overdrive_reset = self.options['cnt_overdrive_reset']
142         else:
143             self.cnt_overdrive_reset = int(float(self.samplerate) * 0.000048) - 1 # 48ns
144
145         # calculating the slot size
146         self.cnt_normal_slot    = int(float(self.samplerate) * 0.000060) - 1 # 60ns
147         self.cnt_overdrive_slot = int(float(self.samplerate) * 0.000006) - 1 #  6ns
148
149         # organize values into lists
150         self.cnt_bit      = [self.cnt_normal_bit     , self.cnt_overdrive_bit     ]
151         self.cnt_presence = [self.cnt_normal_presence, self.cnt_overdrive_presence]
152         self.cnt_reset    = [self.cnt_normal_reset   , self.cnt_overdrive_reset   ]
153         self.cnt_slot     = [self.cnt_normal_slot    , self.cnt_overdrive_slot    ]
154
155         # Check if sample times are in the allowed range
156         time_min = float(self.cnt_normal_bit  ) / self.samplerate
157         time_max = float(self.cnt_normal_bit+1) / self.samplerate
158         if ( (time_min < 0.000005) or (time_max > 0.000015) ) :
159            self.put(0, 0, self.out_ann, [ANN_LINK,
160              ['WARNING: The normal mode data sample time interval (%2.1fus-%2.1fus) should be inside (5.0us, 15.0us).'
161                % (time_min*1000000, time_max*1000000)]])
162         time_min = float(self.cnt_normal_presence  ) / self.samplerate
163         time_max = float(self.cnt_normal_presence+1) / self.samplerate
164         if ( (time_min < 0.0000681) or (time_max > 0.000075) ) :
165            self.put(0, 0, self.out_ann, [ANN_LINK,
166              ['WARNING: The normal mode presence sample time interval (%2.1fus-%2.1fus) should be inside (68.1us, 75.0us).'
167                % (time_min*1000000, time_max*1000000)]])
168         time_min = float(self.cnt_overdrive_bit  ) / self.samplerate
169         time_max = float(self.cnt_overdrive_bit+1) / self.samplerate
170         if ( (time_min < 0.000001) or (time_max > 0.000002) ) :
171            self.put(0, 0, self.out_ann, [ANN_LINK,
172              ['WARNING: The overdrive mode data sample time interval (%2.1fus-%2.1fus) should be inside (1.0us, 2.0us).'
173                % (time_min*1000000, time_max*1000000)]])
174         time_min = float(self.cnt_overdrive_presence  ) / self.samplerate
175         time_max = float(self.cnt_overdrive_presence+1) / self.samplerate
176         if ( (time_min < 0.0000073) or (time_max > 0.000010) ) :
177            self.put(0, 0, self.out_ann, [ANN_LINK,
178              ['WARNING: The overdrive mode presence sample time interval (%2.1fus-%2.1fus) should be inside (7.3us, 10.0us).'
179                % (time_min*1000000, time_max*1000000)]])
180
181     def report(self):
182         pass
183
184     def decode(self, ss, es, data):
185         for (self.samplenum, (owr, pwr)) in data:
186
187             # Data link layer
188
189             # Clear events.
190             self.lnk_event = "NONE"
191             # State machine.
192             if self.lnk_state == 'WAIT FOR FALLING EDGE':
193                 # The start of a cycle is a falling edge.
194                 if (owr == 0):
195                     # Save the sample number for the falling edge.
196                     self.lnk_fall = self.samplenum
197                     # Go to waiting for sample time
198                     self.lnk_state = 'WAIT FOR DATA SAMPLE'
199             elif self.lnk_state == 'WAIT FOR DATA SAMPLE':
200                 # Sample data bit
201                 if (self.samplenum - self.lnk_fall == self.cnt_bit[self.lnk_overdrive]):
202                     self.lnk_bit  = owr & 0x1
203                     self.lnk_event = "DATA BIT"
204                     if (self.lnk_bit):  self.lnk_state = 'WAIT FOR FALLING EDGE'
205                     else             :  self.lnk_state = 'WAIT FOR RISING EDGE'
206                     self.put(self.lnk_fall, self.cnt_bit[self.lnk_overdrive], self.out_ann, [ANN_LINK, ['BIT: %01x' % self.lnk_bit]])
207             elif self.lnk_state == 'WAIT FOR RISING EDGE':
208                 # The end of a cycle is a rising edge.
209                 if (owr == 1):
210                     # Check if this was a reset cycle
211                     if (self.samplenum - self.lnk_fall > self.cnt_normal_reset):
212                         # Save the sample number for the falling edge.
213                         self.lnk_rise = self.samplenum
214                         # Send a reset event to the next protocol layer.
215                         self.lnk_event = "RESET"
216                         self.lnk_state = "WAIT FOR PRESENCE DETECT"
217                         self.put(self.lnk_fall, self.lnk_rise, self.out_proto, ['RESET'])
218                         self.put(self.lnk_fall, self.lnk_rise, self.out_ann, [ANN_LINK     , ['RESET']])
219                         self.put(self.lnk_fall, self.lnk_rise, self.out_ann, [ANN_NETWORK  , ['RESET']])
220                         # Reset the timer.
221                         self.lnk_fall = self.samplenum
222                     elif ((self.samplenum - self.lnk_fall > self.cnt_overdrive_reset) and (self.lnk_overdrive)):
223                         # Save the sample number for the falling edge.
224                         self.lnk_rise = self.samplenum
225                         # Send a reset event to the next protocol layer.
226                         self.lnk_event = "RESET"
227                         self.lnk_state = "WAIT FOR PRESENCE DETECT"
228                         self.put(self.lnk_fall, self.lnk_rise, self.out_proto, ['RESET OVERDRIVE'])
229                         self.put(self.lnk_fall, self.lnk_rise, self.out_ann, [ANN_LINK     , ['RESET OVERDRIVE']])
230                         self.put(self.lnk_fall, self.lnk_rise, self.out_ann, [ANN_NETWORK  , ['RESET OVERDRIVE']])
231                         # Reset the timer.
232                         self.lnk_fall = self.samplenum
233                     # Otherwise this is assumed to be a data bit.
234                     else :
235                         self.lnk_state = "WAIT FOR FALLING EDGE"
236             elif self.lnk_state == 'WAIT FOR PRESENCE DETECT':
237                 # Sample presence status
238                 if (self.samplenum - self.lnk_rise == self.cnt_presence[self.lnk_overdrive]):
239                     self.lnk_present = owr & 0x1
240                     # Save the sample number for the falling edge.
241                     if not (self.lnk_present) :  self.lnk_fall = self.samplenum
242                     # create presence detect event
243                     #self.lnk_event   = "PRESENCE DETECT"
244                     if (self.lnk_present) :  self.lnk_state = 'WAIT FOR FALLING EDGE'
245                     else                  :  self.lnk_state = 'WAIT FOR RISING EDGE'
246                     present_str = "False" if self.lnk_present else "True"
247                     self.put(self.samplenum, 0, self.out_ann, [ANN_LINK   , ['PRESENCE: ' + present_str]])
248                     self.put(self.samplenum, 0, self.out_ann, [ANN_NETWORK, ['PRESENCE: ' + present_str]])
249             else:
250                 raise Exception('Invalid lnk_state: %d' % self.lnk_state)
251
252             # Network layer
253
254             # State machine.
255             if (self.lnk_event == "RESET"):
256                 self.net_state = "COMMAND"
257                 self.net_search = "P"
258                 self.net_cnt    = 0
259             elif (self.net_state == "IDLE"):
260                 pass
261             elif (self.net_state == "COMMAND"):
262                 # Receiving and decoding a ROM command
263                 if (self.onewire_collect(8)):
264                     self.put(self.net_beg, self.net_len, self.out_ann, [ANN_NETWORK,
265                       ['ROM COMMAND: 0x%02x \'%s\'' % (self.net_data, rom_command[self.net_data])]])
266                     if   (self.net_data == 0x33):  # READ ROM
267                         self.net_state = "GET ROM"
268                     elif (self.net_data == 0x0f):  # CONDITIONAL READ ROM
269                         self.net_state = "GET ROM"
270                     elif (self.net_data == 0xcc):  # SKIP ROM
271                         self.net_state = "TRANSPORT"
272                     elif (self.net_data == 0x55):  # MATCH ROM
273                         self.net_state = "GET ROM"
274                     elif (self.net_data == 0xf0):  # SEARCH ROM
275                         self.net_state = "SEARCH ROM"
276                     elif (self.net_data == 0xec):  # CONDITIONAL SEARCH ROM
277                         self.net_state = "SEARCH ROM"
278                     elif (self.net_data == 0x3c):  # OVERDRIVE SKIP ROM
279                         self.lnk_overdrive = 1
280                         self.net_state = "TRANSPORT"
281                     elif (self.net_data == 0x69):  # OVERDRIVE MATCH ROM
282                         self.lnk_overdrive = 1
283                         self.net_state = "GET ROM"
284             elif (self.net_state == "GET ROM"):
285                 # A 64 bit device address is selected
286                 # family code (1B) + serial number (6B) + CRC (1B)
287                 if (self.onewire_collect(64)):
288                     self.net_rom = self.net_data & 0xffffffffffffffff
289                     self.put(self.net_beg, self.net_len, self.out_ann, [ANN_NETWORK, ['ROM: 0x%016x' % self.net_rom]])
290                     self.net_state = "TRANSPORT"
291             elif (self.net_state == "SEARCH ROM"):
292                 # A 64 bit device address is searched for
293                 # family code (1B) + serial number (6B) + CRC (1B)
294                 if (self.onewire_search(64)):
295                     self.net_rom = self.net_data & 0xffffffffffffffff
296                     self.put(self.net_beg, self.net_len, self.out_ann, [ANN_NETWORK, ['ROM: 0x%016x' % self.net_rom]])
297                     self.net_state = "TRANSPORT"
298             elif (self.net_state == "TRANSPORT"):
299                 # The transport layer is handled in byte sized units
300                 if (self.onewire_collect(8)):
301                     self.put(self.net_beg, self.net_len, self.out_ann, [ANN_NETWORK  , ['TRANSPORT: 0x%02x' % self.net_data]])
302                     self.put(self.net_beg, self.net_len, self.out_ann, [ANN_TRANSPORT, ['TRANSPORT: 0x%02x' % self.net_data]])
303                     self.put(self.net_beg, self.net_len, self.out_proto, ['transfer', self.net_data])
304                     # TODO: Sending translort layer data to 1-Wire device models
305             else:
306                 raise Exception('Invalid net_state: %s' % self.net_state)
307
308
309     # Link/Network layer data collector
310     def onewire_collect (self, length):
311         if (self.lnk_event == "DATA BIT"):
312             # Storing the sampe this sequence begins with
313             if (self.net_cnt == 1):
314                 self.net_beg = self.lnk_fall
315             self.net_data = self.net_data & ~(1 << self.net_cnt) | (self.lnk_bit << self.net_cnt)
316             self.net_cnt  = self.net_cnt + 1
317             # Storing the sampe this sequence ends with
318             # In case the full length of the sequence is received, return 1
319             if (self.net_cnt == length):
320                 self.net_end  = self.lnk_fall + self.cnt_slot[self.lnk_overdrive]
321                 self.net_len  = self.net_end - self.net_beg
322                 self.net_data = self.net_data & ((1<<length)-1)
323                 self.net_cnt  = 0
324                 return (1)
325             else:
326                 return (0)
327         else:
328             return (0)
329
330     # Link/Network layer search collector
331     def onewire_search (self, length):
332         if (self.lnk_event == "DATA BIT"):
333             # Storing the sampe this sequence begins with
334             if ((self.net_cnt == 0) and (self.net_search == "P")):
335                 self.net_beg  = self.lnk_fall
336             # Master receives an original address bit
337             if   (self.net_search == "P"):
338               self.net_data_p = self.net_data_p & ~(1 << self.net_cnt) | (self.lnk_bit << self.net_cnt)
339               self.net_search = "N"
340             # Master receives a complemented address bit
341             elif (self.net_search == "N"):
342               self.net_data_n = self.net_data_n & ~(1 << self.net_cnt) | (self.lnk_bit << self.net_cnt)
343               self.net_search = "D"
344             # Master transmits an address bit
345             elif (self.net_search == "D"):
346               self.net_data   = self.net_data   & ~(1 << self.net_cnt) | (self.lnk_bit << self.net_cnt)
347               self.net_search = "P"
348               self.net_cnt    = self.net_cnt + 1
349             # Storing the sampe this sequence ends with
350             # In case the full length of the sequence is received, return 1
351             if (self.net_cnt == length):
352                 self.net_end    = self.lnk_fall + self.cnt_slot[self.lnk_overdrive]
353                 self.net_len    = self.net_end - self.net_beg
354                 self.net_data_p = self.net_data_p & ((1<<length)-1)
355                 self.net_data_n = self.net_data_n & ((1<<length)-1)
356                 self.net_data   = self.net_data   & ((1<<length)-1)
357                 self.net_search = "P"
358                 self.net_cnt    = 0
359                 return (1)
360             else:
361                 return (0)
362         else:
363             return (0)