]> sigrok.org Git - libsigrokdecode.git/blob - decoders/onewire_link/pd.py
564d6f0bc86895725b37116f683dbfe15bfae9eb
[libsigrokdecode.git] / decoders / onewire_link / pd.py
1 ##
2 ## This file is part of the libsigrokdecode project.
3 ##
4 ## Copyright (C) 2017 Kevin Redon <kingkevin@cuvoodoo.info>
5 ##
6 ## This program is free software; you can redistribute it and/or modify
7 ## it under the terms of the GNU General Public License as published by
8 ## the Free Software Foundation; either version 2 of the License, or
9 ## (at your option) any later version.
10 ##
11 ## This program is distributed in the hope that it will be useful,
12 ## but WITHOUT ANY WARRANTY; without even the implied warranty of
13 ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 ## GNU General Public License for more details.
15 ##
16 ## You should have received a copy of the GNU General Public License
17 ## along with this program; if not, see <http://www.gnu.org/licenses/>.
18 ##
19
20 import sigrokdecode as srd
21
22 class SamplerateError(Exception):
23     pass
24
25 # Timing values in us for the signal at regular and overdrive speed.
26 timing = {
27     'RSTL': {
28         'min': {
29             False: 480.0,
30             True: 48.0,
31         },
32         'max': {
33             False: 960.0,
34             True: 80.0,
35         },
36     },
37     'RSTH': {
38         'min': {
39             False: 480.0,
40             True: 48.0,
41         },
42     },
43     'PDH': {
44         'min': {
45             False: 15.0,
46             True: 2.0,
47         },
48         'max': {
49             False: 60.0,
50             True: 6.0,
51         },
52     },
53     'PDL': {
54         'min': {
55             False: 60.0,
56             True: 8.0,
57         },
58         'max': {
59             False: 240.0,
60             True: 24.0,
61         },
62     },
63     'SLOT': {
64         'min': {
65             False: 60.0,
66             True: 6.0,
67         },
68         'max': {
69             False: 120.0,
70             True: 16.0,
71         },
72     },
73     'REC': {
74         'min': {
75             False: 1.0,
76             True: 1.0,
77         },
78     },
79     'LOWR': {
80         'min': {
81             False: 1.0,
82             True: 1.0,
83         },
84         'max': {
85             False: 15.0,
86             True: 2.0,
87         },
88     },
89 }
90
91 class Decoder(srd.Decoder):
92     api_version = 3
93     id = 'onewire_link'
94     name = '1-Wire link layer'
95     longname = '1-Wire serial communication bus (link layer)'
96     desc = 'Bidirectional, half-duplex, asynchronous serial bus.'
97     license = 'gplv2+'
98     inputs = ['logic']
99     outputs = ['onewire_link']
100     tags = ['Embedded/industrial']
101     channels = (
102         {'id': 'owr', 'name': 'OWR', 'desc': '1-Wire signal line'},
103     )
104     options = (
105         {'id': 'overdrive', 'desc': 'Start in overdrive speed',
106             'default': 'no', 'values': ('yes', 'no')},
107     )
108     annotations = (
109         ('bit', 'Bit'),
110         ('warnings', 'Warnings'),
111         ('reset', 'Reset'),
112         ('presence', 'Presence'),
113         ('overdrive', 'Overdrive speed notifications'),
114     )
115     annotation_rows = (
116         ('bits', 'Bits', (0, 2, 3)),
117         ('info', 'Info', (4,)),
118         ('warnings', 'Warnings', (1,)),
119     )
120
121     def __init__(self):
122         self.reset()
123
124     def reset(self):
125         self.samplerate = None
126         self.state = 'INITIAL'
127         self.present = 0
128         self.bit = 0
129         self.bit_count = -1
130         self.command = 0
131         self.overdrive = False
132         self.fall = 0
133         self.rise = 0
134
135     def start(self):
136         self.out_python = self.register(srd.OUTPUT_PYTHON)
137         self.out_ann = self.register(srd.OUTPUT_ANN)
138         self.overdrive = (self.options['overdrive'] == 'yes')
139         self.fall = 0
140         self.rise = 0
141         self.bit_count = -1
142
143     def putm(self, data):
144         self.put(0, 0, self.out_ann, data)
145
146     def putpfs(self, data):
147         self.put(self.fall, self.samplenum, self.out_python, data)
148
149     def putfs(self, data):
150         self.put(self.fall, self.samplenum, self.out_ann, data)
151
152     def putfr(self, data):
153         self.put(self.fall, self.rise, self.out_ann, data)
154
155     def putprs(self, data):
156         self.put(self.rise, self.samplenum, self.out_python, data)
157
158     def putrs(self, data):
159         self.put(self.rise, self.samplenum, self.out_ann, data)
160
161     def checks(self):
162         # Check if samplerate is appropriate.
163         if self.options['overdrive'] == 'yes':
164             if self.samplerate < 2000000:
165                 self.putm([1, ['Sampling rate is too low. Must be above ' +
166                                '2MHz for proper overdrive mode decoding.']])
167             elif self.samplerate < 5000000:
168                 self.putm([1, ['Sampling rate is suggested to be above 5MHz ' +
169                                'for proper overdrive mode decoding.']])
170         else:
171             if self.samplerate < 400000:
172                 self.putm([1, ['Sampling rate is too low. Must be above ' +
173                                '400kHz for proper normal mode decoding.']])
174             elif self.samplerate < 1000000:
175                 self.putm([1, ['Sampling rate is suggested to be above ' +
176                                '1MHz for proper normal mode decoding.']])
177
178     def metadata(self, key, value):
179         if key != srd.SRD_CONF_SAMPLERATE:
180             return
181         self.samplerate = value
182
183     def wait_falling_timeout(self, start, t):
184         # Wait until either a falling edge is seen, and/or the specified
185         # number of samples have been skipped (i.e. time has passed).
186         cnt = int((t[self.overdrive] / 1000000.0) * self.samplerate)
187         samples_to_skip = (start + cnt) - self.samplenum
188         samples_to_skip = samples_to_skip if (samples_to_skip > 0) else 0
189         return self.wait([{0: 'f'}, {'skip': samples_to_skip}])
190
191     def decode(self):
192         if not self.samplerate:
193             raise SamplerateError('Cannot decode without samplerate.')
194         self.checks()
195         while True:
196             # State machine.
197             if self.state == 'INITIAL': # Unknown initial state.
198                 # Wait until we reach the idle high state.
199                 self.wait({0: 'h'})
200                 self.rise = self.samplenum
201                 self.state = 'IDLE'
202             elif self.state == 'IDLE': # Idle high state.
203                 # Wait for falling edge.
204                 self.wait({0: 'f'})
205                 self.fall = self.samplenum
206                 # Get time since last rising edge.
207                 time = ((self.fall - self.rise) / self.samplerate) * 1000000.0
208                 if self.rise > 0 and \
209                     time < timing['REC']['min'][self.overdrive]:
210                     self.putfr([1, ['Recovery time not long enough'
211                         'Recovery too short',
212                         'REC < ' + str(timing['REC']['min'][self.overdrive])]])
213                 # A reset pulse or slot can start on a falling edge.
214                 self.state = 'LOW'
215                 # TODO: Check minimum recovery time.
216             elif self.state == 'LOW': # Reset pulse or slot.
217                 # Wait for rising edge.
218                 self.wait({0: 'r'})
219                 self.rise = self.samplenum
220                 # Detect reset or slot base on timing.
221                 time = ((self.rise - self.fall) / self.samplerate) * 1000000.0
222                 if time >= timing['RSTL']['min'][False]: # Normal reset pulse.
223                     if time > timing['RSTL']['max'][False]:
224                         self.putfr([1, ['Too long reset pulse might mask interrupt ' +
225                             'signalling by other devices',
226                             'Reset pulse too long',
227                             'RST > ' + str(timing['RSTL']['max'][False])]])
228                     # Regular reset pulse clears overdrive speed.
229                     if self.overdrive:
230                         self.putfr([4, ['Exiting overdrive mode', 'Overdrive off']])
231                     self.overdrive = False
232                     self.putfr([2, ['Reset', 'Rst', 'R']])
233                     self.state = 'PRESENCE DETECT HIGH'
234                 elif self.overdrive == True and \
235                     time >= timing['RSTL']['min'][self.overdrive] and \
236                     time < timing['RSTL']['max'][self.overdrive]:
237                     # Overdrive reset pulse.
238                     self.putfr([2, ['Reset', 'Rst', 'R']])
239                     self.state = 'PRESENCE DETECT HIGH'
240                 elif time < timing['SLOT']['max'][self.overdrive]:
241                     # Read/write time slot.
242                     if time < timing['LOWR']['min'][self.overdrive]:
243                         self.putfr([1, ['Low signal not long enough',
244                             'Low too short',
245                             'LOW < ' + str(timing['LOWR']['min'][self.overdrive])]])
246                     if time < timing['LOWR']['max'][self.overdrive]:
247                         self.bit = 1 # Short pulse is a 1 bit.
248                     else:
249                         self.bit = 0 # Long pulse is a 0 bit.
250                     # Wait for end of slot.
251                     self.state = 'SLOT'
252                 else:
253                     # Timing outside of known states.
254                     self.putfr([1, ['Erroneous signal', 'Error', 'Err', 'E']])
255                     self.state = 'IDLE'
256             elif self.state == 'PRESENCE DETECT HIGH': # Wait for slave presence signal.
257                 # Wait for a falling edge and/or presence detect signal.
258                 self.wait_falling_timeout(self.rise, timing['PDH']['max'])
259
260                 # Calculate time since rising edge.
261                 time = ((self.samplenum - self.rise) / self.samplerate) * 1000000.0
262
263                 if self.matched[0] and not self.matched[1]:
264                     # Presence detected.
265                     if time < timing['PDH']['min'][self.overdrive]:
266                         self.putrs([1, ['Presence detect signal is too early',
267                             'Presence detect too early',
268                             'PDH < ' + str(timing['PDH']['min'][self.overdrive])]])
269                     self.fall = self.samplenum
270                     self.state = 'PRESENCE DETECT LOW'
271                 else: # No presence detected.
272                     self.putrs([3, ['Presence: false', 'Presence', 'Pres', 'P']])
273                     self.putprs(['RESET/PRESENCE', False])
274                     self.state = 'IDLE'
275             elif self.state == 'PRESENCE DETECT LOW': # Slave presence signalled.
276                 # Wait for end of presence signal (on rising edge).
277                 self.wait({0: 'r'})
278                 # Calculate time since start of presence signal.
279                 time = ((self.samplenum - self.fall) / self.samplerate) * 1000000.0
280                 if time < timing['PDL']['min'][self.overdrive]:
281                     self.putfs([1, ['Presence detect signal is too short',
282                         'Presence detect too short',
283                         'PDL < ' + str(timing['PDL']['min'][self.overdrive])]])
284                 elif time > timing['PDL']['max'][self.overdrive]:
285                     self.putfs([1, ['Presence detect signal is too long',
286                         'Presence detect too long',
287                         'PDL > ' + str(timing['PDL']['max'][self.overdrive])]])
288                 if time > timing['RSTH']['min'][self.overdrive]:
289                     self.rise = self.samplenum
290                 # Wait for end of presence detect.
291                 self.state = 'PRESENCE DETECT'
292
293             # End states (for additional checks).
294             if self.state == 'SLOT': # Wait for end of time slot.
295                 # Wait for a falling edge and/or end of timeslot.
296                 self.wait_falling_timeout(self.fall, timing['SLOT']['min'])
297
298                 if self.matched[0] and not self.matched[1]:
299                     # Low detected before end of slot.
300                     self.putfs([1, ['Time slot not long enough',
301                         'Slot too short',
302                         'SLOT < ' + str(timing['SLOT']['min'][self.overdrive])]])
303                     # Don't output invalid bit.
304                     self.fall = self.samplenum
305                     self.state = 'LOW'
306                 else: # End of time slot.
307                     # Output bit.
308                     self.putfs([0, ['Bit: %d' % self.bit, '%d' % self.bit]])
309                     self.putpfs(['BIT', self.bit])
310                     # Save command bits.
311                     if self.bit_count >= 0:
312                         self.command += (self.bit << self.bit_count)
313                         self.bit_count += 1
314                     # Check for overdrive ROM command.
315                     if self.bit_count >= 8:
316                         if self.command == 0x3c or self.command == 0x69:
317                             self.overdrive = True
318                             self.put(self.samplenum, self.samplenum,
319                                 self.out_ann,
320                                 [4, ['Entering overdrive mode', 'Overdrive on']])
321                         self.bit_count = -1
322                     self.state = 'IDLE'
323
324             if self.state == 'PRESENCE DETECT':
325                 # Wait for a falling edge and/or end of presence detect.
326                 self.wait_falling_timeout(self.rise, timing['RSTH']['min'])
327
328                 if self.matched[0] and not self.matched[1]:
329                     # Low detected before end of presence detect.
330                     self.putfs([1, ['Presence detect not long enough',
331                         'Presence detect too short',
332                         'RTSH < ' + str(timing['RSTH']['min'][self.overdrive])]])
333                     # Inform about presence detected.
334                     self.putrs([3, ['Slave presence detected', 'Slave present',
335                         'Present', 'P']])
336                     self.putprs(['RESET/PRESENCE', True])
337                     self.fall = self.samplenum
338                     self.state = 'LOW'
339                 else: # End of time slot.
340                     # Inform about presence detected.
341                     self.putrs([3, ['Presence: true', 'Presence', 'Pres', 'P']])
342                     self.putprs(['RESET/PRESENCE', True])
343                     self.rise = self.samplenum
344                     # Start counting the first 8 bits to get the ROM command.
345                     self.bit_count = 0
346                     self.command = 0
347                     self.state = 'IDLE'