]> sigrok.org Git - libsigrokdecode.git/blob - decoders/onewire_link/pd.py
Add a CFP decoder.
[libsigrokdecode.git] / decoders / onewire_link / pd.py
1 ##
2 ## This file is part of the libsigrokdecode project.
3 ##
4 ## Copyright (C) 2017 Kevin Redon <kingkevin@cuvoodoo.info>
5 ##
6 ## This program is free software; you can redistribute it and/or modify
7 ## it under the terms of the GNU General Public License as published by
8 ## the Free Software Foundation; either version 2 of the License, or
9 ## (at your option) any later version.
10 ##
11 ## This program is distributed in the hope that it will be useful,
12 ## but WITHOUT ANY WARRANTY; without even the implied warranty of
13 ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 ## GNU General Public License for more details.
15 ##
16 ## You should have received a copy of the GNU General Public License
17 ## along with this program; if not, see <http://www.gnu.org/licenses/>.
18 ##
19
20 import sigrokdecode as srd
21
22 class SamplerateError(Exception):
23     pass
24
25 # Timing values in us for the signal at regular and overdrive speed.
26 timing = {
27     'RSTL': {
28         'min': {
29             False: 480.0,
30             True: 48.0,
31         },
32         'max': {
33             False: 960.0,
34             True: 80.0,
35         },
36     },
37     'RSTH': {
38         'min': {
39             False: 480.0,
40             True: 48.0,
41         },
42     },
43     'PDH': {
44         'min': {
45             False: 15.0,
46             True: 2.0,
47         },
48         'max': {
49             False: 60.0,
50             True: 6.0,
51         },
52     },
53     'PDL': {
54         'min': {
55             False: 60.0,
56             True: 8.0,
57         },
58         'max': {
59             False: 240.0,
60             True: 24.0,
61         },
62     },
63     'SLOT': {
64         'min': {
65             False: 60.0,
66             True: 6.0,
67         },
68         'max': {
69             False: 120.0,
70             True: 16.0,
71         },
72     },
73     'REC': {
74         'min': {
75             False: 1.0,
76             True: 1.0,
77         },
78     },
79     'LOWR': {
80         'min': {
81             False: 1.0,
82             True: 1.0,
83         },
84         'max': {
85             False: 15.0,
86             True: 2.0,
87         },
88     },
89 }
90
91 class Decoder(srd.Decoder):
92     api_version = 3
93     id = 'onewire_link'
94     name = '1-Wire link layer'
95     longname = '1-Wire serial communication bus (link layer)'
96     desc = 'Bidirectional, half-duplex, asynchronous serial bus.'
97     license = 'gplv2+'
98     inputs = ['logic']
99     outputs = ['onewire_link']
100     channels = (
101         {'id': 'owr', 'name': 'OWR', 'desc': '1-Wire signal line'},
102     )
103     options = (
104         {'id': 'overdrive', 'desc': 'Start in overdrive speed',
105             'default': 'no', 'values': ('yes', 'no')},
106     )
107     annotations = (
108         ('bit', 'Bit'),
109         ('warnings', 'Warnings'),
110         ('reset', 'Reset'),
111         ('presence', 'Presence'),
112         ('overdrive', 'Overdrive speed notifications'),
113     )
114     annotation_rows = (
115         ('bits', 'Bits', (0, 2, 3)),
116         ('info', 'Info', (4,)),
117         ('warnings', 'Warnings', (1,)),
118     )
119
120     def __init__(self):
121         self.reset()
122
123     def reset(self):
124         self.samplerate = None
125         self.state = 'INITIAL'
126         self.present = 0
127         self.bit = 0
128         self.bit_count = -1
129         self.command = 0
130         self.overdrive = False
131         self.fall = 0
132         self.rise = 0
133
134     def start(self):
135         self.out_python = self.register(srd.OUTPUT_PYTHON)
136         self.out_ann = self.register(srd.OUTPUT_ANN)
137         self.overdrive = (self.options['overdrive'] == 'yes')
138         self.fall = 0
139         self.rise = 0
140         self.bit_count = -1
141
142     def putm(self, data):
143         self.put(0, 0, self.out_ann, data)
144
145     def putpfs(self, data):
146         self.put(self.fall, self.samplenum, self.out_python, data)
147
148     def putfs(self, data):
149         self.put(self.fall, self.samplenum, self.out_ann, data)
150
151     def putfr(self, data):
152         self.put(self.fall, self.rise, self.out_ann, data)
153
154     def putprs(self, data):
155         self.put(self.rise, self.samplenum, self.out_python, data)
156
157     def putrs(self, data):
158         self.put(self.rise, self.samplenum, self.out_ann, data)
159
160     def checks(self):
161         # Check if samplerate is appropriate.
162         if self.options['overdrive'] == 'yes':
163             if self.samplerate < 2000000:
164                 self.putm([1, ['Sampling rate is too low. Must be above ' +
165                                '2MHz for proper overdrive mode decoding.']])
166             elif self.samplerate < 5000000:
167                 self.putm([1, ['Sampling rate is suggested to be above 5MHz ' +
168                                'for proper overdrive mode decoding.']])
169         else:
170             if self.samplerate < 400000:
171                 self.putm([1, ['Sampling rate is too low. Must be above ' +
172                                '400kHz for proper normal mode decoding.']])
173             elif self.samplerate < 1000000:
174                 self.putm([1, ['Sampling rate is suggested to be above ' +
175                                '1MHz for proper normal mode decoding.']])
176
177     def metadata(self, key, value):
178         if key != srd.SRD_CONF_SAMPLERATE:
179             return
180         self.samplerate = value
181
182     def wait_falling_timeout(self, start, t):
183         # Wait until either a falling edge is seen, and/or the specified
184         # number of samples have been skipped (i.e. time has passed).
185         cnt = int((t[self.overdrive] / 1000000.0) * self.samplerate)
186         samples_to_skip = (start + cnt) - self.samplenum
187         samples_to_skip = samples_to_skip if (samples_to_skip > 0) else 0
188         return self.wait([{0: 'f'}, {'skip': samples_to_skip}])
189
190     def decode(self):
191         if not self.samplerate:
192             raise SamplerateError('Cannot decode without samplerate.')
193         self.checks()
194         while True:
195             # State machine.
196             if self.state == 'INITIAL': # Unknown initial state.
197                 # Wait until we reach the idle high state.
198                 self.wait({0: 'h'})
199                 self.rise = self.samplenum
200                 self.state = 'IDLE'
201             elif self.state == 'IDLE': # Idle high state.
202                 # Wait for falling edge.
203                 self.wait({0: 'f'})
204                 self.fall = self.samplenum
205                 # Get time since last rising edge.
206                 time = ((self.fall - self.rise) / self.samplerate) * 1000000.0
207                 if self.rise > 0 and \
208                     time < timing['REC']['min'][self.overdrive]:
209                     self.putfr([1, ['Recovery time not long enough'
210                         'Recovery too short',
211                         'REC < ' + str(timing['REC']['min'][self.overdrive])]])
212                 # A reset pulse or slot can start on a falling edge.
213                 self.state = 'LOW'
214                 # TODO: Check minimum recovery time.
215             elif self.state == 'LOW': # Reset pulse or slot.
216                 # Wait for rising edge.
217                 self.wait({0: 'r'})
218                 self.rise = self.samplenum
219                 # Detect reset or slot base on timing.
220                 time = ((self.rise - self.fall) / self.samplerate) * 1000000.0
221                 if time >= timing['RSTL']['min'][False]: # Normal reset pulse.
222                     if time > timing['RSTL']['max'][False]:
223                         self.putfr([1, ['Too long reset pulse might mask interrupt ' +
224                             'signalling by other devices',
225                             'Reset pulse too long',
226                             'RST > ' + str(timing['RSTL']['max'][False])]])
227                     # Regular reset pulse clears overdrive speed.
228                     if self.overdrive:
229                         self.putfr([4, ['Exiting overdrive mode', 'Overdrive off']])
230                     self.overdrive = False
231                     self.putfr([2, ['Reset', 'Rst', 'R']])
232                     self.state = 'PRESENCE DETECT HIGH'
233                 elif self.overdrive == True and \
234                     time >= timing['RSTL']['min'][self.overdrive] and \
235                     time < timing['RSTL']['max'][self.overdrive]:
236                     # Overdrive reset pulse.
237                     self.putfr([2, ['Reset', 'Rst', 'R']])
238                     self.state = 'PRESENCE DETECT HIGH'
239                 elif time < timing['SLOT']['max'][self.overdrive]:
240                     # Read/write time slot.
241                     if time < timing['LOWR']['min'][self.overdrive]:
242                         self.putfr([1, ['Low signal not long enough',
243                             'Low too short',
244                             'LOW < ' + str(timing['LOWR']['min'][self.overdrive])]])
245                     if time < timing['LOWR']['max'][self.overdrive]:
246                         self.bit = 1 # Short pulse is a 1 bit.
247                     else:
248                         self.bit = 0 # Long pulse is a 0 bit.
249                     # Wait for end of slot.
250                     self.state = 'SLOT'
251                 else:
252                     # Timing outside of known states.
253                     self.putfr([1, ['Erroneous signal', 'Error', 'Err', 'E']])
254                     self.state = 'IDLE'
255             elif self.state == 'PRESENCE DETECT HIGH': # Wait for slave presence signal.
256                 # Wait for a falling edge and/or presence detect signal.
257                 self.wait_falling_timeout(self.rise, timing['PDH']['max'])
258
259                 # Calculate time since rising edge.
260                 time = ((self.samplenum - self.rise) / self.samplerate) * 1000000.0
261
262                 if self.matched[0] and not self.matched[1]:
263                     # Presence detected.
264                     if time < timing['PDH']['min'][self.overdrive]:
265                         self.putrs([1, ['Presence detect signal is too early',
266                             'Presence detect too early',
267                             'PDH < ' + str(timing['PDH']['min'][self.overdrive])]])
268                     self.fall = self.samplenum
269                     self.state = 'PRESENCE DETECT LOW'
270                 else: # No presence detected.
271                     self.putrs([3, ['Presence: false', 'Presence', 'Pres', 'P']])
272                     self.putprs(['RESET/PRESENCE', False])
273                     self.state = 'IDLE'
274             elif self.state == 'PRESENCE DETECT LOW': # Slave presence signalled.
275                 # Wait for end of presence signal (on rising edge).
276                 self.wait({0: 'r'})
277                 # Calculate time since start of presence signal.
278                 time = ((self.samplenum - self.fall) / self.samplerate) * 1000000.0
279                 if time < timing['PDL']['min'][self.overdrive]:
280                     self.putfs([1, ['Presence detect signal is too short',
281                         'Presence detect too short',
282                         'PDL < ' + str(timing['PDL']['min'][self.overdrive])]])
283                 elif time > timing['PDL']['max'][self.overdrive]:
284                     self.putfs([1, ['Presence detect signal is too long',
285                         'Presence detect too long',
286                         'PDL > ' + str(timing['PDL']['max'][self.overdrive])]])
287                 if time > timing['RSTH']['min'][self.overdrive]:
288                     self.rise = self.samplenum
289                 # Wait for end of presence detect.
290                 self.state = 'PRESENCE DETECT'
291
292             # End states (for additional checks).
293             if self.state == 'SLOT': # Wait for end of time slot.
294                 # Wait for a falling edge and/or end of timeslot.
295                 self.wait_falling_timeout(self.fall, timing['SLOT']['min'])
296
297                 if self.matched[0] and not self.matched[1]:
298                     # Low detected before end of slot.
299                     self.putfs([1, ['Time slot not long enough',
300                         'Slot too short',
301                         'SLOT < ' + str(timing['SLOT']['min'][self.overdrive])]])
302                     # Don't output invalid bit.
303                     self.fall = self.samplenum
304                     self.state = 'LOW'
305                 else: # End of time slot.
306                     # Output bit.
307                     self.putfs([0, ['Bit: %d' % self.bit, '%d' % self.bit]])
308                     self.putpfs(['BIT', self.bit])
309                     # Save command bits.
310                     if self.bit_count >= 0:
311                         self.command += (self.bit << self.bit_count)
312                         self.bit_count += 1
313                     # Check for overdrive ROM command.
314                     if self.bit_count >= 8:
315                         if self.command == 0x3c or self.command == 0x69:
316                             self.overdrive = True
317                             self.put(self.samplenum, self.samplenum,
318                                 self.out_ann,
319                                 [4, ['Entering overdrive mode', 'Overdrive on']])
320                         self.bit_count = -1
321                     self.state = 'IDLE'
322
323             if self.state == 'PRESENCE DETECT':
324                 # Wait for a falling edge and/or end of presence detect.
325                 self.wait_falling_timeout(self.rise, timing['RSTH']['min'])
326
327                 if self.matched[0] and not self.matched[1]:
328                     # Low detected before end of presence detect.
329                     self.putfs([1, ['Presence detect not long enough',
330                         'Presence detect too short',
331                         'RTSH < ' + str(timing['RSTH']['min'][self.overdrive])]])
332                     # Inform about presence detected.
333                     self.putrs([3, ['Slave presence detected', 'Slave present',
334                         'Present', 'P']])
335                     self.putprs(['RESET/PRESENCE', True])
336                     self.fall = self.samplenum
337                     self.state = 'LOW'
338                 else: # End of time slot.
339                     # Inform about presence detected.
340                     self.putrs([3, ['Presence: true', 'Presence', 'Pres', 'P']])
341                     self.putprs(['RESET/PRESENCE', True])
342                     self.rise = self.samplenum
343                     # Start counting the first 8 bits to get the ROM command.
344                     self.bit_count = 0
345                     self.command = 0
346                     self.state = 'IDLE'