]> sigrok.org Git - libsigrokdecode.git/blob - decoders/onewire_link/pd.py
onewire_link: (Re-)convert to PD API version 3.
[libsigrokdecode.git] / decoders / onewire_link / pd.py
1 ##
2 ## This file is part of the libsigrokdecode project.
3 ##
4 ## Copyright (C) 2017 Kevin Redon <kingkevin@cuvoodoo.info>
5 ##
6 ## This program is free software; you can redistribute it and/or modify
7 ## it under the terms of the GNU General Public License as published by
8 ## the Free Software Foundation; either version 2 of the License, or
9 ## (at your option) any later version.
10 ##
11 ## This program is distributed in the hope that it will be useful,
12 ## but WITHOUT ANY WARRANTY; without even the implied warranty of
13 ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 ## GNU General Public License for more details.
15 ##
16 ## You should have received a copy of the GNU General Public License
17 ## along with this program; if not, see <http://www.gnu.org/licenses/>.
18 ##
19
20 import sigrokdecode as srd
21
22 class SamplerateError(Exception):
23     pass
24
25 # Timing values in us for the signal at regular and overdrive speed.
26 timing = {
27     'RSTL': {
28         'min': {
29             False: 480.0,
30             True: 48.0,
31         },
32         'max': {
33             False: 960.0,
34             True: 80.0,
35         },
36     },
37     'RSTH': {
38         'min': {
39             False: 480.0,
40             True: 48.0,
41         },
42     },
43     'PDH': {
44         'min': {
45             False: 15.0,
46             True: 2.0,
47         },
48         'max': {
49             False: 60.0,
50             True: 6.0,
51         },
52     },
53     'PDL': {
54         'min': {
55             False: 60.0,
56             True: 8.0,
57         },
58         'max': {
59             False: 240.0,
60             True: 24.0,
61         },
62     },
63     'SLOT': {
64         'min': {
65             False: 60.0,
66             True: 6.0,
67         },
68         'max': {
69             False: 120.0,
70             True: 16.0,
71         },
72     },
73     'REC': {
74         'min': {
75             False: 1.0,
76             True: 1.0,
77         },
78     },
79     'LOWR': {
80         'min': {
81             False: 1.0,
82             True: 1.0,
83         },
84         'max': {
85             False: 15.0,
86             True: 2.0,
87         },
88     },
89 }
90
91 class Decoder(srd.Decoder):
92     api_version = 3
93     id = 'onewire_link'
94     name = '1-Wire link layer'
95     longname = '1-Wire serial communication bus (link layer)'
96     desc = 'Bidirectional, half-duplex, asynchronous serial bus.'
97     license = 'gplv2+'
98     inputs = ['logic']
99     outputs = ['onewire_link']
100     channels = (
101         {'id': 'owr', 'name': 'OWR', 'desc': '1-Wire signal line'},
102     )
103     options = (
104         {'id': 'overdrive', 'desc': 'Start in overdrive speed',
105             'default': 'no', 'values': ('yes', 'no')},
106     )
107     annotations = (
108         ('bit', 'Bit'),
109         ('warnings', 'Warnings'),
110         ('reset', 'Reset'),
111         ('presence', 'Presence'),
112         ('overdrive', 'Overdrive speed notifications'),
113     )
114     annotation_rows = (
115         ('bits', 'Bits', (0, 2, 3)),
116         ('info', 'Info', (4,)),
117         ('warnings', 'Warnings', (1,)),
118     )
119
120     def __init__(self):
121         self.samplerate = None
122         self.state = 'INITIAL'
123         self.present = 0
124         self.bit = 0
125         self.bit_count = -1
126         self.command = 0
127         self.overdrive = False
128         self.fall = 0
129         self.rise = 0
130
131     def start(self):
132         self.out_python = self.register(srd.OUTPUT_PYTHON)
133         self.out_ann = self.register(srd.OUTPUT_ANN)
134         self.overdrive = (self.options['overdrive'] == 'yes')
135         self.fall = 0
136         self.rise = 0
137         self.bit_count = -1
138
139     def putm(self, data):
140         self.put(0, 0, self.out_ann, data)
141
142     def putpfs(self, data):
143         self.put(self.fall, self.samplenum, self.out_python, data)
144
145     def putfs(self, data):
146         self.put(self.fall, self.samplenum, self.out_ann, data)
147
148     def putfr(self, data):
149         self.put(self.fall, self.rise, self.out_ann, data)
150
151     def putprs(self, data):
152         self.put(self.rise, self.samplenum, self.out_python, data)
153
154     def putrs(self, data):
155         self.put(self.rise, self.samplenum, self.out_ann, data)
156
157     def checks(self):
158         # Check if samplerate is appropriate.
159         if self.options['overdrive'] == 'yes':
160             if self.samplerate < 2000000:
161                 self.putm([1, ['Sampling rate is too low. Must be above ' +
162                                '2MHz for proper overdrive mode decoding.']])
163             elif self.samplerate < 5000000:
164                 self.putm([1, ['Sampling rate is suggested to be above 5MHz ' +
165                                'for proper overdrive mode decoding.']])
166         else:
167             if self.samplerate < 400000:
168                 self.putm([1, ['Sampling rate is too low. Must be above ' +
169                                '400kHz for proper normal mode decoding.']])
170             elif self.samplerate < 1000000:
171                 self.putm([1, ['Sampling rate is suggested to be above ' +
172                                '1MHz for proper normal mode decoding.']])
173
174     def metadata(self, key, value):
175         if key != srd.SRD_CONF_SAMPLERATE:
176             return
177         self.samplerate = value
178
179     def wait_falling_timeout(self, start, t):
180         # Wait until either a falling edge is seen, and/or the specified
181         # number of samples have been skipped (i.e. time has passed).
182         cnt = int((t[self.overdrive] / 1000000.0) * self.samplerate)
183         samples_to_skip = (start + cnt) - self.samplenum
184         samples_to_skip = samples_to_skip if (samples_to_skip > 0) else 0
185         return self.wait([{0: 'f'}, {'skip': samples_to_skip}])
186
187     def decode(self):
188         if not self.samplerate:
189             raise SamplerateError('Cannot decode without samplerate.')
190         self.checks()
191         while True:
192             # State machine.
193             if self.state == 'INITIAL': # Unknown initial state.
194                 # Wait until we reach the idle high state.
195                 self.wait({0: 'h'})
196                 self.rise = self.samplenum
197                 self.state = 'IDLE'
198             elif self.state == 'IDLE': # Idle high state.
199                 # Wait for falling edge.
200                 self.wait({0: 'f'})
201                 self.fall = self.samplenum
202                 # Get time since last rising edge.
203                 time = ((self.fall - self.rise) / self.samplerate) * 1000000.0
204                 if self.rise > 0 and \
205                     time < timing['REC']['min'][self.overdrive]:
206                     self.putfr([1, ['Recovery time not long enough'
207                         'Recovery too short',
208                         'REC < ' + str(timing['REC']['min'][self.overdrive])]])
209                 # A reset pulse or slot can start on a falling edge.
210                 self.state = 'LOW'
211                 # TODO: Check minimum recovery time.
212             elif self.state == 'LOW': # Reset pulse or slot.
213                 # Wait for rising edge.
214                 self.wait({0: 'r'})
215                 self.rise = self.samplenum
216                 # Detect reset or slot base on timing.
217                 time = ((self.rise - self.fall) / self.samplerate) * 1000000.0
218                 if time >= timing['RSTL']['min'][False]: # Normal reset pulse.
219                     if time > timing['RSTL']['max'][False]:
220                         self.putfr([1, ['Too long reset pulse might mask interrupt ' +
221                             'signalling by other devices',
222                             'Reset pulse too long',
223                             'RST > ' + str(timing['RSTL']['max'][False])]])
224                     # Regular reset pulse clears overdrive speed.
225                     if self.overdrive:
226                         self.putfr([4, ['Exiting overdrive mode', 'Overdrive off']])
227                     self.overdrive = False
228                     self.putfr([2, ['Reset', 'Rst', 'R']])
229                     self.state = 'PRESENCE DETECT HIGH'
230                 elif self.overdrive == True and \
231                     time >= timing['RSTL']['min'][self.overdrive] and \
232                     time < timing['RSTL']['max'][self.overdrive]:
233                     # Overdrive reset pulse.
234                     self.putfr([2, ['Reset', 'Rst', 'R']])
235                     self.state = 'PRESENCE DETECT HIGH'
236                 elif time < timing['SLOT']['max'][self.overdrive]:
237                     # Read/write time slot.
238                     if time < timing['LOWR']['min'][self.overdrive]:
239                         self.putfr([1, ['Low signal not long enough',
240                             'Low too short',
241                             'LOW < ' + str(timing['LOWR']['min'][self.overdrive])]])
242                     if time < timing['LOWR']['max'][self.overdrive]:
243                         self.bit = 1 # Short pulse is a 1 bit.
244                     else:
245                         self.bit = 0 # Long pulse is a 0 bit.
246                     # Wait for end of slot.
247                     self.state = 'SLOT'
248                 else:
249                     # Timing outside of known states.
250                     self.putfr([1, ['Erroneous signal', 'Error', 'Err', 'E']])
251                     self.state = 'IDLE'
252             elif self.state == 'PRESENCE DETECT HIGH': # Wait for slave presence signal.
253                 # Wait for a falling edge and/or presence detect signal.
254                 self.wait_falling_timeout(self.rise, timing['PDH']['max'])
255
256                 # Calculate time since rising edge.
257                 time = ((self.samplenum - self.rise) / self.samplerate) * 1000000.0
258
259                 if self.matched[0] and not self.matched[1]:
260                     # Presence detected.
261                     if time < timing['PDH']['min'][self.overdrive]:
262                         self.putrs([1, ['Presence detect signal is too early',
263                             'Presence detect too early',
264                             'PDH < ' + str(timing['PDH']['min'][self.overdrive])]])
265                     self.fall = self.samplenum
266                     self.state = 'PRESENCE DETECT LOW'
267                 else: # No presence detected.
268                     self.putrs([3, ['Presence: false', 'Presence', 'Pres', 'P']])
269                     self.putprs(['RESET/PRESENCE', False])
270                     self.state = 'IDLE'
271             elif self.state == 'PRESENCE DETECT LOW': # Slave presence signalled.
272                 # Wait for end of presence signal (on rising edge).
273                 self.wait({0: 'r'})
274                 # Calculate time since start of presence signal.
275                 time = ((self.samplenum - self.fall) / self.samplerate) * 1000000.0
276                 if time < timing['PDL']['min'][self.overdrive]:
277                     self.putfs([1, ['Presence detect signal is too short',
278                         'Presence detect too short',
279                         'PDL < ' + str(timing['PDL']['min'][self.overdrive])]])
280                 elif time > timing['PDL']['max'][self.overdrive]:
281                     self.putfs([1, ['Presence detect signal is too long',
282                         'Presence detect too long',
283                         'PDL > ' + str(timing['PDL']['max'][self.overdrive])]])
284                 if time > timing['RSTH']['min'][self.overdrive]:
285                     self.rise = self.samplenum
286                 # Wait for end of presence detect.
287                 self.state = 'PRESENCE DETECT'
288
289             # End states (for additional checks).
290             if self.state == 'SLOT': # Wait for end of time slot.
291                 # Wait for a falling edge and/or end of timeslot.
292                 self.wait_falling_timeout(self.fall, timing['SLOT']['min'])
293
294                 if self.matched[0] and not self.matched[1]:
295                     # Low detected before end of slot.
296                     self.putfs([1, ['Time slot not long enough',
297                         'Slot too short',
298                         'SLOT < ' + str(timing['SLOT']['min'][self.overdrive])]])
299                     # Don't output invalid bit.
300                     self.fall = self.samplenum
301                     self.state = 'LOW'
302                 else: # End of time slot.
303                     # Output bit.
304                     self.putfs([0, ['Bit: %d' % self.bit, '%d' % self.bit]])
305                     self.putpfs(['BIT', self.bit])
306                     # Save command bits.
307                     if self.bit_count >= 0:
308                         self.command += (self.bit << self.bit_count)
309                         self.bit_count += 1
310                     # Check for overdrive ROM command.
311                     if self.bit_count >= 8:
312                         if self.command == 0x3c or self.command == 0x69:
313                             self.overdrive = True
314                             self.put(self.samplenum, self.samplenum,
315                                 self.out_ann,
316                                 [4, ['Entering overdrive mode', 'Overdrive on']])
317                         self.bit_count = -1
318                     self.state = 'IDLE'
319
320             if self.state == 'PRESENCE DETECT':
321                 # Wait for a falling edge and/or end of presence detect.
322                 self.wait_falling_timeout(self.rise, timing['RSTH']['min'])
323
324                 if self.matched[0] and not self.matched[1]:
325                     # Low detected before end of presence detect.
326                     self.putfs([1, ['Presence detect not long enough',
327                         'Presence detect too short',
328                         'RTSH < ' + str(timing['RSTH']['min'][self.overdrive])]])
329                     # Inform about presence detected.
330                     self.putrs([3, ['Slave presence detected', 'Slave present',
331                         'Present', 'P']])
332                     self.putprs(['RESET/PRESENCE', True])
333                     self.fall = self.samplenum
334                     self.state = 'LOW'
335                 else: # End of time slot.
336                     # Inform about presence detected.
337                     self.putrs([3, ['Presence: true', 'Presence', 'Pres', 'P']])
338                     self.putprs(['RESET/PRESENCE', True])
339                     self.rise = self.samplenum
340                     # Start counting the first 8 bits to get the ROM command.
341                     self.bit_count = 0
342                     self.command = 0
343                     self.state = 'IDLE'