X-Git-Url: https://sigrok.org/gitweb/?a=blobdiff_plain;f=decoders%2Fcaliper%2Fpd.py;h=20a2a5555d01d735df723d0f0b8ef8c42ab80f92;hb=cc75bd231ee4878c08fed6b352caaf340d84728e;hp=3db047309be104c2223fe8032974dce7495ad7c3;hpb=5ed5cea580fe8bc9405043b216eb420792889283;p=libsigrokdecode.git diff --git a/decoders/caliper/pd.py b/decoders/caliper/pd.py index 3db0473..20a2a55 100644 --- a/decoders/caliper/pd.py +++ b/decoders/caliper/pd.py @@ -22,40 +22,42 @@ ## SOFTWARE. import sigrokdecode as srd +from common.srdhelper import bitpack + +# Millimeters per inch. +mm_per_inch = 25.4 class Decoder(srd.Decoder): api_version = 3 id = 'caliper' name = 'Caliper' longname = 'Digital calipers' - desc = 'Protocol of cheap generic digital calipers' + desc = 'Protocol of cheap generic digital calipers.' license = 'mit' inputs = ['logic'] outputs = [] channels = ( - {'id': 'clk', 'name': 'CLK', 'desc': 'Serial clock line'}, - {'id': 'data', 'name': 'DATA', 'desc': 'Serial data line'}, + {'id': 'clk', 'name': 'CLK', 'desc': 'Serial clock line'}, + {'id': 'data', 'name': 'DATA', 'desc': 'Serial data line'}, ) options = ( - {'id': 'timeout_ms', 'desc': 'Timeout packet after X ms, 0 to disable', 'default': 10}, - {'id': 'unit', 'desc': 'Convert units', 'default': 'keep', 'values': ('keep', 'mm', 'inch')}, - {'id': 'changes', 'desc': 'Changes only', 'default': 'no', 'values': ('no', 'yes')}, + {'id': 'timeout_ms', 'desc': 'Packet timeout in ms, 0 to disable', + 'default': 10}, + {'id': 'unit', 'desc': 'Convert units', 'default': 'keep', + 'values': ('keep', 'mm', 'inch')}, + {'id': 'changes', 'desc': 'Changes only', 'default': 'no', + 'values': ('no', 'yes')}, ) - tags = ['Analog/digital', 'IC', 'Sensor'] + tags = ['Analog/digital', 'Sensor'] annotations = ( - ('measurements', 'Measurements'), - ('warning', 'Warnings'), + ('measurement', 'Measurement'), + ('warning', 'Warning'), ) annotation_rows = ( ('measurements', 'Measurements', (0,)), ('warnings', 'Warnings', (1,)), ) - def reset_data(self): - self.bits = 0 - self.number = 0 - self.flags = 0 - def metadata(self, key, value): if key == srd.SRD_CONF_SAMPLERATE: self.samplerate = value @@ -64,90 +66,81 @@ class Decoder(srd.Decoder): self.reset() def reset(self): - self.ss_cmd, self.es_cmd = 0, 0 - self.reset_data() + self.ss, self.es = 0, 0 + self.number_bits = [] + self.flags_bits = [] def start(self): self.out_ann = self.register(srd.OUTPUT_ANN) - #Switch bit order of variable x, which is l bit long - def bitr(self,x,l): - return int(bin(x)[2:].zfill(l)[::-1], 2) + def putg(self, ss, es, cls, data): + self.put(ss, es, self.out_ann, [cls, data]) def decode(self): - self.last_measurement = None + last_sent = None + timeout_ms = self.options['timeout_ms'] + want_unit = self.options['unit'] + show_all = self.options['changes'] == 'no' + wait_cond = [{0: 'r'}] + if timeout_ms: + snum_per_ms = self.samplerate / 1000 + timeout_snum = timeout_ms * snum_per_ms + wait_cond.append({'skip': round(timeout_snum)}) while True: - clk, data = self.wait([{0: 'r'},{'skip': round(self.samplerate/1000)}]) - #print([clk,data]) - - #Timeout after inactivity - if(self.options['timeout_ms'] > 0): - if self.samplenum > self.es_cmd + (self.samplerate/(1000/self.options['timeout_ms'])): - if self.bits > 0: - self.put(self.ss_cmd, self.samplenum, self.out_ann, [1, ['timeout with %s bits in buffer'%(self.bits),'timeout']]) - self.reset() - - #Do nothing if there was timeout without rising clock edge - if self.matched == (False, True): + # Sample data at the rising clock edge. Optionally timeout + # after inactivity for a user specified period. Present the + # number of unprocessed bits to the user for diagnostics. + clk, data = self.wait(wait_cond) + if timeout_ms and not self.matched[0]: + if self.number_bits or self.flags_bits: + count = len(self.number_bits) + len(self.flags_bits) + self.putg(self.ss, self.samplenum, 1, [ + 'timeout with {} bits in buffer'.format(count), + 'timeout ({} bits)'.format(count), + 'timeout', + ]) + self.reset() continue - #Store position of last activity - self.es_cmd = self.samplenum - - #Store position of first bit - if self.ss_cmd == 0: - self.ss_cmd = self.samplenum - - #Shift in measured number - if self.bits < 16: - self.number = (self.number << 1) | (data & 0b1) - self.bits+=1 + # Store position of first bit and last activity. + # Shift in measured number and flag bits. + if not self.ss: + self.ss = self.samplenum + self.es = self.samplenum + if len(self.number_bits) < 16: + self.number_bits.append(data) continue - - #Shift in flag bits - if self.bits < 24: - self.flags = (self.flags << 1) | (data & 0b1) - self.bits+=1 - if self.bits < 24: + if len(self.flags_bits) < 8: + self.flags_bits.append(data) + if len(self.flags_bits) < 8: continue - #Hooray! We got last bit of data - self.es_cmd = self.samplenum - - #Do actual decoding - - #print(format(self.flags, '08b')); - negative = ((self.flags & 0b00001000) >> 3) - inch = (self.flags & 0b00000001) - - number = self.bitr(self.number, 16) - - #print(format(number, '016b')) - - if negative > 0: + # Get raw values from received data bits. Run the number + # conversion, controlled by flags and/or user specs. + negative = bool(self.flags_bits[4]) + is_inch = bool(self.flags_bits[7]) + number = bitpack(self.number_bits) + if negative: number = -number - - inchmm = 25.4 #how many mms in inch - - if inch: - number = number/2000 - if self.options['unit'] == 'mm': - number *= inchmm - inch = 0 + if is_inch: + number /= 2000 + if want_unit == 'mm': + number *= mm_per_inch + is_inch = False else: - number = number/100 - if self.options['unit'] == 'inch': - number = round(number/inchmm,4) - inch = 1 - - units = "in" if inch else "mm" - - measurement = (str(number)+units) - #print(measurement) - - if ((self.options['changes'] == 'no') or (self.last_measurement != measurement)): - self.put(self.ss_cmd, self.es_cmd, self.out_ann, [0, [measurement, str(number)]]) - self.last_measurement = measurement - - #Prepare for next packet + number /= 100 + if want_unit == 'inch': + number = round(number / mm_per_inch, 4) + is_inch = True + unit = 'in' if is_inch else 'mm' + + # Construct and emit an annotation. + if show_all or (number, unit) != last_sent: + self.putg(self.ss, self.es, 0, [ + '{number}{unit}'.format(**locals()), + '{number}'.format(**locals()), + ]) + last_sent = (number, unit) + + # Reset internal state for the start of the next packet. self.reset()