## SOFTWARE.
import sigrokdecode as srd
+from common.srdhelper import bitpack
+
+# Millimeters per inch.
+mm_per_inch = 25.4
class Decoder(srd.Decoder):
api_version = 3
id = 'caliper'
name = 'Caliper'
longname = 'Digital calipers'
- desc = 'Protocol of cheap generic digital calipers'
+ desc = 'Protocol of cheap generic digital calipers.'
license = 'mit'
inputs = ['logic']
outputs = []
channels = (
- {'id': 'clk', 'name': 'CLK', 'desc': 'Serial clock line'},
- {'id': 'data', 'name': 'DATA', 'desc': 'Serial data line'},
+ {'id': 'clk', 'name': 'CLK', 'desc': 'Serial clock line'},
+ {'id': 'data', 'name': 'DATA', 'desc': 'Serial data line'},
)
options = (
- {'id': 'timeout_ms', 'desc': 'Timeout packet after X ms, 0 to disable', 'default': 10},
- {'id': 'unit', 'desc': 'Convert units', 'default': 'keep', 'values': ('keep', 'mm', 'inch')},
- {'id': 'changes', 'desc': 'Changes only', 'default': 'no', 'values': ('no', 'yes')},
+ {'id': 'timeout_ms', 'desc': 'Packet timeout in ms, 0 to disable',
+ 'default': 10},
+ {'id': 'unit', 'desc': 'Convert units', 'default': 'keep',
+ 'values': ('keep', 'mm', 'inch')},
+ {'id': 'changes', 'desc': 'Changes only', 'default': 'no',
+ 'values': ('no', 'yes')},
)
- tags = ['Analog/digital', 'IC', 'Sensor']
+ tags = ['Analog/digital', 'Sensor']
annotations = (
- ('measurements', 'Measurements'),
- ('warning', 'Warnings'),
+ ('measurement', 'Measurement'),
+ ('warning', 'Warning'),
)
annotation_rows = (
('measurements', 'Measurements', (0,)),
('warnings', 'Warnings', (1,)),
)
- def reset_data(self):
- self.bits = 0
- self.number = 0
- self.flags = 0
-
def metadata(self, key, value):
if key == srd.SRD_CONF_SAMPLERATE:
self.samplerate = value
self.reset()
def reset(self):
- self.ss_cmd, self.es_cmd = 0, 0
- self.reset_data()
+ self.ss, self.es = 0, 0
+ self.number_bits = []
+ self.flags_bits = []
def start(self):
self.out_ann = self.register(srd.OUTPUT_ANN)
- #Switch bit order of variable x, which is l bit long
- def bitr(self,x,l):
- return int(bin(x)[2:].zfill(l)[::-1], 2)
+ def putg(self, ss, es, cls, data):
+ self.put(ss, es, self.out_ann, [cls, data])
def decode(self):
- self.last_measurement = None
+ last_sent = None
+ timeout_ms = self.options['timeout_ms']
+ want_unit = self.options['unit']
+ show_all = self.options['changes'] == 'no'
+ wait_cond = [{0: 'r'}]
+ if timeout_ms:
+ snum_per_ms = self.samplerate / 1000
+ timeout_snum = timeout_ms * snum_per_ms
+ wait_cond.append({'skip': round(timeout_snum)})
while True:
- clk, data = self.wait([{0: 'r'},{'skip': round(self.samplerate/1000)}])
- #print([clk,data])
-
- #Timeout after inactivity
- if(self.options['timeout_ms'] > 0):
- if self.samplenum > self.es_cmd + (self.samplerate/(1000/self.options['timeout_ms'])):
- if self.bits > 0:
- self.put(self.ss_cmd, self.samplenum, self.out_ann, [1, ['timeout with %s bits in buffer'%(self.bits),'timeout']])
- self.reset()
-
- #Do nothing if there was timeout without rising clock edge
- if self.matched == (False, True):
+ # Sample data at the rising clock edge. Optionally timeout
+ # after inactivity for a user specified period. Present the
+ # number of unprocessed bits to the user for diagnostics.
+ clk, data = self.wait(wait_cond)
+ if timeout_ms and not self.matched[0]:
+ if self.number_bits or self.flags_bits:
+ count = len(self.number_bits) + len(self.flags_bits)
+ self.putg(self.ss, self.samplenum, 1, [
+ 'timeout with {} bits in buffer'.format(count),
+ 'timeout ({} bits)'.format(count),
+ 'timeout',
+ ])
+ self.reset()
continue
- #Store position of last activity
- self.es_cmd = self.samplenum
-
- #Store position of first bit
- if self.ss_cmd == 0:
- self.ss_cmd = self.samplenum
-
- #Shift in measured number
- if self.bits < 16:
- self.number = (self.number << 1) | (data & 0b1)
- self.bits+=1
+ # Store position of first bit and last activity.
+ # Shift in measured number and flag bits.
+ if not self.ss:
+ self.ss = self.samplenum
+ self.es = self.samplenum
+ if len(self.number_bits) < 16:
+ self.number_bits.append(data)
continue
-
- #Shift in flag bits
- if self.bits < 24:
- self.flags = (self.flags << 1) | (data & 0b1)
- self.bits+=1
- if self.bits < 24:
+ if len(self.flags_bits) < 8:
+ self.flags_bits.append(data)
+ if len(self.flags_bits) < 8:
continue
- #Hooray! We got last bit of data
- self.es_cmd = self.samplenum
-
- #Do actual decoding
-
- #print(format(self.flags, '08b'));
- negative = ((self.flags & 0b00001000) >> 3)
- inch = (self.flags & 0b00000001)
-
- number = self.bitr(self.number, 16)
-
- #print(format(number, '016b'))
-
- if negative > 0:
+ # Get raw values from received data bits. Run the number
+ # conversion, controlled by flags and/or user specs.
+ negative = bool(self.flags_bits[4])
+ is_inch = bool(self.flags_bits[7])
+ number = bitpack(self.number_bits)
+ if negative:
number = -number
-
- inchmm = 25.4 #how many mms in inch
-
- if inch:
- number = number/2000
- if self.options['unit'] == 'mm':
- number *= inchmm
- inch = 0
+ if is_inch:
+ number /= 2000
+ if want_unit == 'mm':
+ number *= mm_per_inch
+ is_inch = False
else:
- number = number/100
- if self.options['unit'] == 'inch':
- number = round(number/inchmm,4)
- inch = 1
-
- units = "in" if inch else "mm"
-
- measurement = (str(number)+units)
- #print(measurement)
-
- if ((self.options['changes'] == 'no') or (self.last_measurement != measurement)):
- self.put(self.ss_cmd, self.es_cmd, self.out_ann, [0, [measurement, str(number)]])
- self.last_measurement = measurement
-
- #Prepare for next packet
+ number /= 100
+ if want_unit == 'inch':
+ number = round(number / mm_per_inch, 4)
+ is_inch = True
+ unit = 'in' if is_inch else 'mm'
+
+ # Construct and emit an annotation.
+ if show_all or (number, unit) != last_sent:
+ self.putg(self.ss, self.es, 0, [
+ '{number}{unit}'.format(**locals()),
+ '{number}'.format(**locals()),
+ ])
+ last_sent = (number, unit)
+
+ # Reset internal state for the start of the next packet.
self.reset()