##
## This file is part of the libsigrokdecode project.
##
-## Copyright (C) 2012 Iztok Jeras <iztok.jeras@gmail.com>
+## Copyright (C) 2017 Kevin Redon <kingkevin@cuvoodoo.info>
##
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
-## along with this program; if not, write to the Free Software
-## Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+## along with this program; if not, see <http://www.gnu.org/licenses/>.
##
-# 1-Wire protocol decoder (link layer)
-
import sigrokdecode as srd
+class SamplerateError(Exception):
+ pass
+
+# Timing values in us for the signal at regular and overdrive speed.
+timing = {
+ 'RSTL': {
+ 'min': {
+ False: 480.0,
+ True: 48.0,
+ },
+ 'max': {
+ False: 960.0,
+ True: 80.0,
+ },
+ },
+ 'RSTH': {
+ 'min': {
+ False: 480.0,
+ True: 48.0,
+ },
+ },
+ 'PDH': {
+ 'min': {
+ False: 15.0,
+ True: 2.0,
+ },
+ 'max': {
+ False: 60.0,
+ True: 6.0,
+ },
+ },
+ 'PDL': {
+ 'min': {
+ False: 60.0,
+ True: 8.0,
+ },
+ 'max': {
+ False: 240.0,
+ True: 24.0,
+ },
+ },
+ 'SLOT': {
+ 'min': {
+ False: 60.0,
+ True: 6.0,
+ },
+ 'max': {
+ False: 120.0,
+ True: 16.0,
+ },
+ },
+ 'REC': {
+ 'min': {
+ False: 1.0,
+ True: 1.0,
+ },
+ },
+ 'LOWR': {
+ 'min': {
+ False: 1.0,
+ True: 1.0,
+ },
+ 'max': {
+ False: 15.0,
+ True: 2.0,
+ },
+ },
+}
+
class Decoder(srd.Decoder):
- api_version = 1
+ api_version = 3
id = 'onewire_link'
name = '1-Wire link layer'
longname = '1-Wire serial communication bus (link layer)'
license = 'gplv2+'
inputs = ['logic']
outputs = ['onewire_link']
- probes = [
+ tags = ['Embedded/industrial']
+ channels = (
{'id': 'owr', 'name': 'OWR', 'desc': '1-Wire signal line'},
- ]
- optional_probes = [
- {'id': 'pwr', 'name': 'PWR', 'desc': '1-Wire power supply pin'},
- ]
- options = {
- 'overdrive': ['Overdrive', 1],
- # Time options (specified in number of samplerate periods):
- 'cnt_normal_bit': ['Normal mode sample bit time', 0],
- 'cnt_normal_slot': ['Normal mode data slot time', 0],
- 'cnt_normal_presence': ['Normal mode sample presence time', 0],
- 'cnt_normal_reset': ['Normal mode reset time', 0],
- 'cnt_overdrive_bit': ['Overdrive mode sample bit time', 0],
- 'cnt_overdrive_slot': ['Overdrive mode data slot time', 0],
- 'cnt_overdrive_presence': ['Overdrive mode sample presence time', 0],
- 'cnt_overdrive_reset': ['Overdrive mode reset time', 0],
- }
- annotations = [
- ['Text', 'Human-readable text'],
- ['Warnings', 'Human-readable warnings'],
- ]
+ )
+ options = (
+ {'id': 'overdrive', 'desc': 'Start in overdrive speed',
+ 'default': 'no', 'values': ('yes', 'no')},
+ )
+ annotations = (
+ ('bit', 'Bit'),
+ ('warnings', 'Warnings'),
+ ('reset', 'Reset'),
+ ('presence', 'Presence'),
+ ('overdrive', 'Overdrive speed notifications'),
+ )
+ annotation_rows = (
+ ('bits', 'Bits', (0, 2, 3)),
+ ('info', 'Info', (4,)),
+ ('warnings', 'Warnings', (1,)),
+ )
- def __init__(self, **kwargs):
- self.samplenum = 0
- self.state = 'WAIT FOR FALLING EDGE'
+ def __init__(self):
+ self.reset()
+
+ def reset(self):
+ self.samplerate = None
+ self.state = 'INITIAL'
self.present = 0
self.bit = 0
- self.bit_cnt = 0
+ self.bit_count = -1
self.command = 0
- self.overdrive = 0
+ self.overdrive = False
self.fall = 0
self.rise = 0
- def start(self, metadata):
- self.out_proto = self.add(srd.OUTPUT_PROTO, 'onewire_link')
- self.out_ann = self.add(srd.OUTPUT_ANN, 'onewire_link')
+ def start(self):
+ self.out_python = self.register(srd.OUTPUT_PYTHON)
+ self.out_ann = self.register(srd.OUTPUT_ANN)
+ self.overdrive = (self.options['overdrive'] == 'yes')
+ self.fall = 0
+ self.rise = 0
+ self.bit_count = -1
- self.samplerate = metadata['samplerate']
+ def putm(self, data):
+ self.put(0, 0, self.out_ann, data)
- # Check if samplerate is appropriate.
- if self.options['overdrive']:
- if self.samplerate < 2000000:
- self.put(0, 0, self.out_ann, [1,
- ['ERROR: Sampling rate is too low. Must be above 2MHz ' +
- 'for proper overdrive mode decoding.']])
- elif self.samplerate < 5000000:
- self.put(0, 0, self.out_ann, [1,
- ['WARNING: Sampling rate is suggested to be above 5MHz ' +
- 'for proper overdrive mode decoding.']])
- else:
- if self.samplerate < 400000:
- self.put(0, 0, self.out_ann, [1,
- ['ERROR: Sampling rate is too low. Must be above ' +
- '400kHz for proper normal mode decoding.']])
- elif (self.samplerate < 1000000):
- self.put(0, 0, self.out_ann, [1,
- ['WARNING: Sampling rate is suggested to be above ' +
- '1MHz for proper normal mode decoding.']])
+ def putpfs(self, data):
+ self.put(self.fall, self.samplenum, self.out_python, data)
- # The default 1-Wire time base is 30us. This is used to calculate
- # sampling times.
- samplerate = float(self.samplerate)
- if self.options['cnt_normal_bit']:
- self.cnt_normal_bit = self.options['cnt_normal_bit']
- else:
- self.cnt_normal_bit = int(samplerate * 0.000015) - 1 # 15ns
- if self.options['cnt_normal_slot']:
- self.cnt_normal_slot = self.options['cnt_normal_slot']
- else:
- self.cnt_normal_slot = int(samplerate * 0.000060) - 1 # 60ns
- if self.options['cnt_normal_presence']:
- self.cnt_normal_presence = self.options['cnt_normal_presence']
- else:
- self.cnt_normal_presence = int(samplerate * 0.000075) - 1 # 75ns
- if self.options['cnt_normal_reset']:
- self.cnt_normal_reset = self.options['cnt_normal_reset']
- else:
- self.cnt_normal_reset = int(samplerate * 0.000480) - 1 # 480ns
- if self.options['cnt_overdrive_bit']:
- self.cnt_overdrive_bit = self.options['cnt_overdrive_bit']
- else:
- self.cnt_overdrive_bit = int(samplerate * 0.000002) - 1 # 2ns
- if self.options['cnt_overdrive_slot']:
- self.cnt_overdrive_slot = self.options['cnt_overdrive_slot']
- else:
- self.cnt_overdrive_slot = int(samplerate * 0.0000073) - 1 # 6ns+1.3ns
- if self.options['cnt_overdrive_presence']:
- self.cnt_overdrive_presence = self.options['cnt_overdrive_presence']
- else:
- self.cnt_overdrive_presence = int(samplerate * 0.000010) - 1 # 10ns
- if self.options['cnt_overdrive_reset']:
- self.cnt_overdrive_reset = self.options['cnt_overdrive_reset']
- else:
- self.cnt_overdrive_reset = int(samplerate * 0.000048) - 1 # 48ns
-
- # Organize values into lists.
- self.cnt_bit = [self.cnt_normal_bit, self.cnt_overdrive_bit]
- self.cnt_presence = [self.cnt_normal_presence, self.cnt_overdrive_presence]
- self.cnt_reset = [self.cnt_normal_reset, self.cnt_overdrive_reset]
- self.cnt_slot = [self.cnt_normal_slot, self.cnt_overdrive_slot]
+ def putfs(self, data):
+ self.put(self.fall, self.samplenum, self.out_ann, data)
- # Check if sample times are in the allowed range.
+ def putfr(self, data):
+ self.put(self.fall, self.rise, self.out_ann, data)
- time_min = float(self.cnt_normal_bit) / self.samplerate
- time_max = float(self.cnt_normal_bit + 1) / self.samplerate
- if (time_min < 0.000005) or (time_max > 0.000015):
- self.put(0, 0, self.out_ann, [1,
- ['WARNING: The normal mode data sample time interval ' +
- '(%2.1fus-%2.1fus) should be inside (5.0us, 15.0us).'
- % (time_min * 1000000, time_max * 1000000)]])
+ def putprs(self, data):
+ self.put(self.rise, self.samplenum, self.out_python, data)
- time_min = float(self.cnt_normal_presence) / self.samplerate
- time_max = float(self.cnt_normal_presence + 1) / self.samplerate
- if (time_min < 0.0000681) or (time_max > 0.000075):
- self.put(0, 0, self.out_ann, [1,
- ['WARNING: The normal mode presence sample time interval ' +
- '(%2.1fus-%2.1fus) should be inside (68.1us, 75.0us).'
- % (time_min * 1000000, time_max * 1000000)]])
+ def putrs(self, data):
+ self.put(self.rise, self.samplenum, self.out_ann, data)
- time_min = float(self.cnt_overdrive_bit) / self.samplerate
- time_max = float(self.cnt_overdrive_bit + 1) / self.samplerate
- if (time_min < 0.000001) or (time_max > 0.000002):
- self.put(0, 0, self.out_ann, [1,
- ['WARNING: The overdrive mode data sample time interval ' +
- '(%2.1fus-%2.1fus) should be inside (1.0us, 2.0us).'
- % (time_min * 1000000, time_max * 1000000)]])
+ def checks(self):
+ # Check if samplerate is appropriate.
+ if self.options['overdrive'] == 'yes':
+ if self.samplerate < 2000000:
+ self.putm([1, ['Sampling rate is too low. Must be above ' +
+ '2MHz for proper overdrive mode decoding.']])
+ elif self.samplerate < 5000000:
+ self.putm([1, ['Sampling rate is suggested to be above 5MHz ' +
+ 'for proper overdrive mode decoding.']])
+ else:
+ if self.samplerate < 400000:
+ self.putm([1, ['Sampling rate is too low. Must be above ' +
+ '400kHz for proper normal mode decoding.']])
+ elif self.samplerate < 1000000:
+ self.putm([1, ['Sampling rate is suggested to be above ' +
+ '1MHz for proper normal mode decoding.']])
- time_min = float(self.cnt_overdrive_presence) / self.samplerate
- time_max = float(self.cnt_overdrive_presence + 1) / self.samplerate
- if (time_min < 0.0000073) or (time_max > 0.000010):
- self.put(0, 0, self.out_ann, [1,
- ['WARNING: The overdrive mode presence sample time interval ' +
- '(%2.1fus-%2.1fus) should be inside (7.3us, 10.0us).'
- % (time_min*1000000, time_max*1000000)]])
+ def metadata(self, key, value):
+ if key != srd.SRD_CONF_SAMPLERATE:
+ return
+ self.samplerate = value
- def report(self):
- pass
+ def wait_falling_timeout(self, start, t):
+ # Wait until either a falling edge is seen, and/or the specified
+ # number of samples have been skipped (i.e. time has passed).
+ cnt = int((t[self.overdrive] / 1000000.0) * self.samplerate)
+ samples_to_skip = (start + cnt) - self.samplenum
+ samples_to_skip = samples_to_skip if (samples_to_skip > 0) else 0
+ return self.wait([{0: 'f'}, {'skip': samples_to_skip}])
- def decode(self, ss, es, data):
- for (self.samplenum, (owr, pwr)) in data:
+ def decode(self):
+ if not self.samplerate:
+ raise SamplerateError('Cannot decode without samplerate.')
+ self.checks()
+ while True:
# State machine.
- if self.state == 'WAIT FOR FALLING EDGE':
- # The start of a cycle is a falling edge.
- if owr != 0:
- continue
- # Save the sample number for the falling edge.
+ if self.state == 'INITIAL': # Unknown initial state.
+ # Wait until we reach the idle high state.
+ self.wait({0: 'h'})
+ self.rise = self.samplenum
+ self.state = 'IDLE'
+ elif self.state == 'IDLE': # Idle high state.
+ # Wait for falling edge.
+ self.wait({0: 'f'})
self.fall = self.samplenum
- # Go to waiting for sample time.
- self.state = 'WAIT FOR DATA SAMPLE'
- elif self.state == 'WAIT FOR DATA SAMPLE':
- # Sample data bit.
- t = self.samplenum - self.fall
- if t == self.cnt_bit[self.overdrive]:
- self.bit = owr
- self.state = 'WAIT FOR DATA SLOT END'
- elif self.state == 'WAIT FOR DATA SLOT END':
- # A data slot ends in a recovery period, otherwise, this is
- # probably a reset.
- t = self.samplenum - self.fall
- if t != self.cnt_slot[self.overdrive]:
- continue
+ # Get time since last rising edge.
+ time = ((self.fall - self.rise) / self.samplerate) * 1000000.0
+ if self.rise > 0 and \
+ time < timing['REC']['min'][self.overdrive]:
+ self.putfr([1, ['Recovery time not long enough'
+ 'Recovery too short',
+ 'REC < ' + str(timing['REC']['min'][self.overdrive])]])
+ # A reset pulse or slot can start on a falling edge.
+ self.state = 'LOW'
+ # TODO: Check minimum recovery time.
+ elif self.state == 'LOW': # Reset pulse or slot.
+ # Wait for rising edge.
+ self.wait({0: 'r'})
+ self.rise = self.samplenum
+ # Detect reset or slot base on timing.
+ time = ((self.rise - self.fall) / self.samplerate) * 1000000.0
+ if time >= timing['RSTL']['min'][False]: # Normal reset pulse.
+ if time > timing['RSTL']['max'][False]:
+ self.putfr([1, ['Too long reset pulse might mask interrupt ' +
+ 'signalling by other devices',
+ 'Reset pulse too long',
+ 'RST > ' + str(timing['RSTL']['max'][False])]])
+ # Regular reset pulse clears overdrive speed.
+ if self.overdrive:
+ self.putfr([4, ['Exiting overdrive mode', 'Overdrive off']])
+ self.overdrive = False
+ self.putfr([2, ['Reset', 'Rst', 'R']])
+ self.state = 'PRESENCE DETECT HIGH'
+ elif self.overdrive == True and \
+ time >= timing['RSTL']['min'][self.overdrive] and \
+ time < timing['RSTL']['max'][self.overdrive]:
+ # Overdrive reset pulse.
+ self.putfr([2, ['Reset', 'Rst', 'R']])
+ self.state = 'PRESENCE DETECT HIGH'
+ elif time < timing['SLOT']['max'][self.overdrive]:
+ # Read/write time slot.
+ if time < timing['LOWR']['min'][self.overdrive]:
+ self.putfr([1, ['Low signal not long enough',
+ 'Low too short',
+ 'LOW < ' + str(timing['LOWR']['min'][self.overdrive])]])
+ if time < timing['LOWR']['max'][self.overdrive]:
+ self.bit = 1 # Short pulse is a 1 bit.
+ else:
+ self.bit = 0 # Long pulse is a 0 bit.
+ # Wait for end of slot.
+ self.state = 'SLOT'
+ else:
+ # Timing outside of known states.
+ self.putfr([1, ['Erroneous signal', 'Error', 'Err', 'E']])
+ self.state = 'IDLE'
+ elif self.state == 'PRESENCE DETECT HIGH': # Wait for slave presence signal.
+ # Wait for a falling edge and/or presence detect signal.
+ self.wait_falling_timeout(self.rise, timing['PDH']['max'])
- if owr == 0:
- # This seems to be a reset slot, wait for its end.
- self.state = 'WAIT FOR RISING EDGE'
- continue
+ # Calculate time since rising edge.
+ time = ((self.samplenum - self.rise) / self.samplerate) * 1000000.0
- self.put(self.fall, self.samplenum, self.out_ann,
- [0, ['Bit: %d' % self.bit]])
- self.put(self.fall, self.samplenum, self.out_proto,
- ['BIT', self.bit])
+ if self.matched[0] and not self.matched[1]:
+ # Presence detected.
+ if time < timing['PDH']['min'][self.overdrive]:
+ self.putrs([1, ['Presence detect signal is too early',
+ 'Presence detect too early',
+ 'PDH < ' + str(timing['PDH']['min'][self.overdrive])]])
+ self.fall = self.samplenum
+ self.state = 'PRESENCE DETECT LOW'
+ else: # No presence detected.
+ self.putrs([3, ['Presence: false', 'Presence', 'Pres', 'P']])
+ self.putprs(['RESET/PRESENCE', False])
+ self.state = 'IDLE'
+ elif self.state == 'PRESENCE DETECT LOW': # Slave presence signalled.
+ # Wait for end of presence signal (on rising edge).
+ self.wait({0: 'r'})
+ # Calculate time since start of presence signal.
+ time = ((self.samplenum - self.fall) / self.samplerate) * 1000000.0
+ if time < timing['PDL']['min'][self.overdrive]:
+ self.putfs([1, ['Presence detect signal is too short',
+ 'Presence detect too short',
+ 'PDL < ' + str(timing['PDL']['min'][self.overdrive])]])
+ elif time > timing['PDL']['max'][self.overdrive]:
+ self.putfs([1, ['Presence detect signal is too long',
+ 'Presence detect too long',
+ 'PDL > ' + str(timing['PDL']['max'][self.overdrive])]])
+ if time > timing['RSTH']['min'][self.overdrive]:
+ self.rise = self.samplenum
+ # Wait for end of presence detect.
+ self.state = 'PRESENCE DETECT'
- # Checking the first command to see if overdrive mode
- # should be entered.
- if self.bit_cnt <= 8:
- self.command |= (self.bit << self.bit_cnt)
- elif self.bit_cnt == 8 and self.command in [0x3c, 0x69]:
- self.put(self.fall, self.cnt_bit[self.overdrive],
- self.out_ann, [0, ['Entering overdrive mode']])
- # Increment the bit counter.
- self.bit_cnt += 1
- # Wait for next slot.
- self.state = 'WAIT FOR FALLING EDGE'
- elif self.state == 'WAIT FOR RISING EDGE':
- # The end of a cycle is a rising edge.
- if owr != 1:
- continue
+ # End states (for additional checks).
+ if self.state == 'SLOT': # Wait for end of time slot.
+ # Wait for a falling edge and/or end of timeslot.
+ self.wait_falling_timeout(self.fall, timing['SLOT']['min'])
- # Check if this was a reset cycle.
- t = self.samplenum - self.fall
- if t > self.cnt_normal_reset:
- # Save the sample number for the falling edge.
- self.rise = self.samplenum
- self.state = 'WAIT FOR PRESENCE DETECT'
- # Exit overdrive mode.
- if self.overdrive:
- self.put(self.fall, self.cnt_bit[self.overdrive],
- self.out_ann, [0, ['Exiting overdrive mode']])
- self.overdrive = 0
- # Clear command bit counter and data register.
- self.bit_cnt = 0
- self.command = 0
- elif (t > self.cnt_overdrive_reset) and self.overdrive:
- # Save the sample number for the falling edge.
- self.rise = self.samplenum
- self.state = "WAIT FOR PRESENCE DETECT"
- # Otherwise this is assumed to be a data bit.
- else:
- self.state = "WAIT FOR FALLING EDGE"
- elif self.state == 'WAIT FOR PRESENCE DETECT':
- # Sample presence status.
- t = self.samplenum - self.rise
- if t == self.cnt_presence[self.overdrive]:
- self.present = owr
- self.state = 'WAIT FOR RESET SLOT END'
- elif self.state == 'WAIT FOR RESET SLOT END':
- # A reset slot ends in a long recovery period.
- t = self.samplenum - self.rise
- if t != self.cnt_reset[self.overdrive]:
- continue
+ if self.matched[0] and not self.matched[1]:
+ # Low detected before end of slot.
+ self.putfs([1, ['Time slot not long enough',
+ 'Slot too short',
+ 'SLOT < ' + str(timing['SLOT']['min'][self.overdrive])]])
+ # Don't output invalid bit.
+ self.fall = self.samplenum
+ self.state = 'LOW'
+ else: # End of time slot.
+ # Output bit.
+ self.putfs([0, ['Bit: %d' % self.bit, '%d' % self.bit]])
+ self.putpfs(['BIT', self.bit])
+ # Save command bits.
+ if self.bit_count >= 0:
+ self.command += (self.bit << self.bit_count)
+ self.bit_count += 1
+ # Check for overdrive ROM command.
+ if self.bit_count >= 8:
+ if self.command == 0x3c or self.command == 0x69:
+ self.overdrive = True
+ self.put(self.samplenum, self.samplenum,
+ self.out_ann,
+ [4, ['Entering overdrive mode', 'Overdrive on']])
+ self.bit_count = -1
+ self.state = 'IDLE'
- if owr == 0:
- # This seems to be a reset slot, wait for its end.
- self.state = 'WAIT FOR RISING EDGE'
- continue
+ if self.state == 'PRESENCE DETECT':
+ # Wait for a falling edge and/or end of presence detect.
+ self.wait_falling_timeout(self.rise, timing['RSTH']['min'])
- self.put(self.fall, self.samplenum, self.out_ann,
- [0, ['Reset/presence: %s'
- % ('false' if self.present else 'true')]])
- self.put(self.fall, self.samplenum, self.out_proto,
- ['RESET/PRESENCE', not self.present])
- # Wait for next slot.
- self.state = 'WAIT FOR FALLING EDGE'
- else:
- raise Exception('Invalid state: %s' % self.state)
+ if self.matched[0] and not self.matched[1]:
+ # Low detected before end of presence detect.
+ self.putfs([1, ['Presence detect not long enough',
+ 'Presence detect too short',
+ 'RTSH < ' + str(timing['RSTH']['min'][self.overdrive])]])
+ # Inform about presence detected.
+ self.putrs([3, ['Slave presence detected', 'Slave present',
+ 'Present', 'P']])
+ self.putprs(['RESET/PRESENCE', True])
+ self.fall = self.samplenum
+ self.state = 'LOW'
+ else: # End of time slot.
+ # Inform about presence detected.
+ self.putrs([3, ['Presence: true', 'Presence', 'Pres', 'P']])
+ self.putprs(['RESET/PRESENCE', True])
+ self.rise = self.samplenum
+ # Start counting the first 8 bits to get the ROM command.
+ self.bit_count = 0
+ self.command = 0
+ self.state = 'IDLE'