From: atoomnetmarc Date: Mon, 4 Mar 2024 19:36:46 +0000 (+0100) Subject: avr_isp: Add more parts X-Git-Url: https://sigrok.org/gitweb/?p=libsigrokdecode.git;a=commitdiff_plain;h=HEAD;hp=ec5fc441e38e59a57cd508abf420e30fc3c28e0d avr_isp: Add more parts --- diff --git a/configure.ac b/configure.ac index 7a2795f..dc04166 100644 --- a/configure.ac +++ b/configure.ac @@ -100,7 +100,10 @@ SR_PKG_CHECK_SUMMARY([srd_pkglibs_summary]) # first, since usually only that variant will add "-lpython3.8". # https://docs.python.org/3/whatsnew/3.8.html#debug-build-uses-the-same-abi-as-release-build SR_PKG_CHECK([python3], [SRD_PKGLIBS], - [python-3.10-embed], [python-3.9-embed], [python-3.8-embed], [python3-embed], [python-3.8 >= 3.8], [python-3.7 >= 3.7], [python-3.6 >= 3.6], [python-3.5 >= 3.5], [python-3.4 >= 3.4], [python-3.3 >= 3.3], [python-3.2 >= 3.2], [python3 >= 3.2]) + [python-3.12-embed], [python-3.11-embed], + [python-3.10-embed], [python-3.9-embed], [python-3.8-embed], [python3-embed], + [python-3.8 >= 3.8], [python-3.7 >= 3.7], [python-3.6 >= 3.6], [python-3.5 >= 3.5], + [python-3.4 >= 3.4], [python-3.3 >= 3.3], [python-3.2 >= 3.2], [python3 >= 3.2]) AS_IF([test "x$sr_have_python3" = xno], [AC_MSG_ERROR([Cannot find Python 3 development headers.])]) diff --git a/decoders/adf435x/pd.py b/decoders/adf435x/pd.py index c60ed4e..ca61fbc 100644 --- a/decoders/adf435x/pd.py +++ b/decoders/adf435x/pd.py @@ -18,74 +18,102 @@ ## import sigrokdecode as srd +from common.srdhelper import bitpack_lsb def disabled_enabled(v): return ['Disabled', 'Enabled'][v] def output_power(v): - return '%+ddBm' % [-4, -1, 2, 5][v] + return '{:+d}dBm'.format([-4, -1, 2, 5][v]) +# Notes on the implementation: +# - A register's description is an iterable of tuples which contain: +# The starting bit position, the bit count, the name of a field, and +# an optional parser which interprets the field's content. Parser are +# expected to yield a single text string when they exist. Other types +# of output are passed to Python's .format() routine as is. +# - Bit fields' width in registers determines the range of indices in +# table/tuple lookups. Keep the implementation as robust as possible +# during future maintenance. Avoid Python runtime errors when adjusting +# the decoder. regs = { -# reg: name offset width parser - 0: [ - ('FRAC', 3, 12, None), - ('INT', 15, 16, lambda v: 'Not Allowed' if v < 32 else v) - ], - 1: [ - ('MOD', 3, 12, None), - ('Phase', 15, 12, None), - ('Prescalar', 27, 1, lambda v: ['4/5', '8/9'][v]), - ('Phase Adjust', 28, 1, lambda v: ['Off', 'On'][v]), - ], - 2: [ - ('Counter Reset', 3, 1, disabled_enabled), - ('Charge Pump Three-State', 4, 1, disabled_enabled), - ('Power-Down', 5, 1, disabled_enabled), - ('PD Polarity', 6, 1, lambda v: ['Negative', 'Positive'][v]), - ('LDP', 7, 1, lambda v: ['10ns', '6ns'][v]), - ('LDF', 8, 1, lambda v: ['FRAC-N', 'INT-N'][v]), - ('Charge Pump Current Setting', 9, 4, lambda v: '%0.2fmA @ 5.1kΩ' % - [0.31, 0.63, 0.94, 1.25, 1.56, 1.88, 2.19, 2.50, - 2.81, 3.13, 3.44, 3.75, 4.06, 4.38, 4.69, 5.00][v]), - ('Double Buffer', 13, 1, disabled_enabled), - ('R Counter', 14, 10, None), - ('RDIV2', 24, 1, disabled_enabled), - ('Reference Doubler', 25, 1, disabled_enabled), - ('MUXOUT', 26, 3, lambda v: - ['Three-State Output', 'DVdd', 'DGND', 'R Counter Output', 'N Divider Output', - 'Analog Lock Detect', 'Digital Lock Detect', 'Reserved'][v]), - ('Low Noise and Low Spur Modes', 29, 2, lambda v: - ['Low Noise Mode', 'Reserved', 'Reserved', 'Low Spur Mode'][v]) - ], - 3: [ - ('Clock Divider', 3, 12, None), - ('Clock Divider Mode', 15, 2, lambda v: - ['Clock Divider Off', 'Fast Lock Enable', 'Resync Enable', 'Reserved'][v]), - ('CSR Enable', 18, 1, disabled_enabled), - ('Charge Cancellation', 21, 1, disabled_enabled), - ('ABP', 22, 1, lambda v: ['6ns (FRAC-N)', '3ns (INT-N)'][v]), - ('Band Select Clock Mode', 23, 1, lambda v: ['Low', 'High'][v]) - ], - 4: [ - ('Output Power', 3, 2, output_power), - ('Output Enable', 5, 1, disabled_enabled), - ('AUX Output Power', 6, 2, output_power), - ('AUX Output Select', 8, 1, lambda v: ['Divided Output', 'Fundamental'][v]), - ('AUX Output Enable', 9, 1, disabled_enabled), - ('MTLD', 10, 1, disabled_enabled), - ('VCO Power-Down', 11, 1, lambda v: - 'VCO Powered ' + ('Down' if v == 1 else 'Up')), - ('Band Select Clock Divider', 12, 8, None), - ('RF Divider Select', 20, 3, lambda v: '÷' + str(2**v)), - ('Feedback Select', 23, 1, lambda v: ['Divided', 'Fundamental'][v]), - ], - 5: [ - ('LD Pin Mode', 22, 2, lambda v: - ['Low', 'Digital Lock Detect', 'Low', 'High'][v]) - ] + # Register description fields: + # offset, width, name, parser. + 0: ( + ( 3, 12, 'FRAC'), + (15, 16, 'INT', + None, lambda v: 'Not Allowed' if v < 23 else None, + ), + ), + 1: ( + ( 3, 12, 'MOD'), + (15, 12, 'Phase'), + (27, 1, 'Prescalar', lambda v: ('4/5', '8/9',)[v]), + (28, 1, 'Phase Adjust', lambda v: ('Off', 'On',)[v]), + ), + 2: ( + ( 3, 1, 'Counter Reset', disabled_enabled), + ( 4, 1, 'Charge Pump Three-State', disabled_enabled), + ( 5, 1, 'Power-Down', disabled_enabled), + ( 6, 1, 'PD Polarity', lambda v: ('Negative', 'Positive',)[v]), + ( 7, 1, 'LDP', lambda v: ('10ns', '6ns',)[v]), + ( 8, 1, 'LDF', lambda v: ('FRAC-N', 'INT-N',)[v]), + ( 9, 4, 'Charge Pump Current Setting', + lambda v: '{curr:0.2f}mA @ 5.1kΩ'.format(curr = ( + 0.31, 0.63, 0.94, 1.25, 1.56, 1.88, 2.19, 2.50, + 2.81, 3.13, 3.44, 3.75, 4.06, 4.38, 4.69, 5.00, + )[v])), + (13, 1, 'Double Buffer', disabled_enabled), + (14, 10, 'R Counter'), + (24, 1, 'RDIV2', disabled_enabled), + (25, 1, 'Reference Doubler', disabled_enabled), + (26, 3, 'MUXOUT', + lambda v: '{text}'.format(text = ( + 'Three-State Output', 'DVdd', 'DGND', + 'R Counter Output', 'N Divider Output', + 'Analog Lock Detect', 'Digital Lock Detect', + 'Reserved', + )[v])), + (29, 2, 'Low Noise and Low Spur Modes', + lambda v: '{text}'.format(text = ( + 'Low Noise Mode', 'Reserved', 'Reserved', 'Low Spur Mode', + )[v])), + ), + 3: ( + ( 3, 12, 'Clock Divider'), + (15, 2, 'Clock Divider Mode', + lambda v: '{text}'.format(text = ( + 'Clock Divider Off', 'Fast Lock Enable', + 'Resync Enable', 'Reserved', + )[v])), + (18, 1, 'CSR Enable', disabled_enabled), + (21, 1, 'Charge Cancellation', disabled_enabled), + (22, 1, 'ABP', lambda v: ('6ns (FRAC-N)', '3ns (INT-N)',)[v]), + (23, 1, 'Band Select Clock Mode', lambda v: ('Low', 'High',)[v]), + ), + 4: ( + ( 3, 2, 'Output Power', output_power), + ( 5, 1, 'Output Enable', disabled_enabled), + ( 6, 2, 'AUX Output Power', output_power), + ( 8, 1, 'AUX Output Select', + lambda v: ('Divided Output', 'Fundamental',)[v]), + ( 9, 1, 'AUX Output Enable', disabled_enabled), + (10, 1, 'MTLD', disabled_enabled), + (11, 1, 'VCO Power-Down', + lambda v: 'VCO Powered {ud}'.format(ud = 'Down' if v else 'Up')), + (12, 8, 'Band Select Clock Divider'), + (20, 3, 'RF Divider Select', lambda v: '÷{:d}'.format(2 ** v)), + (23, 1, 'Feedback Select', lambda v: ('Divided', 'Fundamental',)[v]), + ), + 5: ( + (22, 2, 'LD Pin Mode', + lambda v: '{text}'.format(text = ( + 'Low', 'Digital Lock Detect', 'Low', 'High', + )[v])), + ), } -ANN_REG = 0 +( ANN_REG, ANN_WARN, ) = range(2) class Decoder(srd.Decoder): api_version = 3 @@ -100,9 +128,11 @@ class Decoder(srd.Decoder): annotations = ( # Sent from the host to the chip. ('write', 'Register write'), + ('warning', "Warnings"), ) annotation_rows = ( ('writes', 'Register writes', (ANN_REG,)), + ('warnings', 'Warnings', (ANN_WARN,)), ) def __init__(self): @@ -114,31 +144,87 @@ class Decoder(srd.Decoder): def start(self): self.out_ann = self.register(srd.OUTPUT_ANN) + def putg(self, ss, es, cls, data): + self.put(ss, es, self.out_ann, [ cls, data, ]) + def decode_bits(self, offset, width): - return (sum([(1 << i) if self.bits[offset + i][0] else 0 for i in range(width)]), - (self.bits[offset + width - 1][1], self.bits[offset][2])) + '''Extract a bit field. Expects LSB input data.''' + bits = self.bits[offset:][:width] + ss, es = bits[-1][1], bits[0][2] + value = bitpack_lsb(bits, 0) + return ( value, ( ss, es, )) + + def decode_field(self, name, offset, width, parser = None, checker = None): + '''Interpret a bit field. Emits an annotation.''' + # Get the register field's content and position. + val, ( ss, es, ) = self.decode_bits(offset, width) + # Have the field's content formatted, emit an annotation. + formatted = parser(val) if parser else '{}'.format(val) + if formatted is not None: + text = ['{name}: {val}'.format(name = name, val = formatted)] + else: + text = ['{name}'.format(name = name)] + if text: + self.putg(ss, es, ANN_REG, text) + # Have the field's content checked, emit an optional warning. + warn = checker(val) if checker else None + if warn: + text = ['{}'.format(warn)] + self.putg(ss, es, ANN_WARN, text) - def decode_field(self, name, offset, width, parser): - val, pos = self.decode_bits(offset, width) - self.put(pos[0], pos[1], self.out_ann, [ANN_REG, - ['%s: %s' % (name, parser(val) if parser else str(val))]]) - return val + def decode_word(self, ss, es, bits): + '''Interpret a 32bit word after accumulation completes.''' + # SPI transfer content must be exactly one 32bit word. + count = len(self.bits) + if count != 32: + text = [ + 'Frame error: Bit count: want 32, got {}'.format(count), + 'Frame error: Bit count', + 'Frame error', + ] + self.putg(ss, es, ANN_WARN, text) + return + # Holding bits in LSB order during interpretation simplifies + # bit field extraction. And annotation emitting routines expect + # this reverse order of bits' timestamps. + self.bits.reverse() + # Determine which register was accessed. + reg_addr, ( reg_ss, reg_es, ) = self.decode_bits(0, 3) + text = [ + 'Register: {addr}'.format(addr = reg_addr), + 'Reg: {addr}'.format(addr = reg_addr), + '[{addr}]'.format(addr = reg_addr), + ] + self.putg(reg_ss, reg_es, ANN_REG, text) + # Interpret the register's content (when parsers are available). + field_descs = regs.get(reg_addr, None) + if not field_descs: + return + for field_desc in field_descs: + parser = None + checker = None + if len(field_desc) == 3: + start, count, name, = field_desc + elif len(field_desc) == 4: + start, count, name, parser = field_desc + elif len(field_desc) == 5: + start, count, name, parser, checker = field_desc + else: + # Unsupported regs{} syntax, programmer's error. + return + self.decode_field(name, start, count, parser, checker) def decode(self, ss, es, data): + ptype, _, _ = data + + if ptype == 'TRANSFER': + # Process accumulated bits after completion of a transfer. + self.decode_word(ss, es, self.bits) + self.bits.clear() - ptype, data1, data2 = data - - if ptype == 'CS-CHANGE': - if data1 == 1: - if len(self.bits) == 32: - reg_value, reg_pos = self.decode_bits(0, 3) - self.put(reg_pos[0], reg_pos[1], self.out_ann, [ANN_REG, - ['Register: %d' % reg_value, 'Reg: %d' % reg_value, - '[%d]' % reg_value]]) - if reg_value < len(regs): - field_descs = regs[reg_value] - for field_desc in field_descs: - field = self.decode_field(*field_desc) - self.bits = [] if ptype == 'BITS': - self.bits = data1 + self.bits + _, mosi_bits, miso_bits = data + # Accumulate bits in MSB order as they are seen in SPI frames. + msb_bits = mosi_bits.copy() + msb_bits.reverse() + self.bits.extend(msb_bits) diff --git a/decoders/avr_isp/parts.py b/decoders/avr_isp/parts.py index 0767789..fee4d9b 100644 --- a/decoders/avr_isp/parts.py +++ b/decoders/avr_isp/parts.py @@ -22,20 +22,76 @@ # Vendor code vendor_code = { - 0x1e: 'Atmel', + 0x1E: 'Atmel', 0x00: 'Device locked', } # (Part family + flash size, part number) part = { (0x90, 0x01): 'AT90S1200', + (0x90, 0x05): 'ATtiny12', + (0x90, 0x06): 'ATtiny15', + (0x90, 0x07): 'ATtiny13', (0x91, 0x01): 'AT90S2313', + (0x91, 0x02): 'AT90S2323', + (0x91, 0x03): 'AT90S2343', + (0x91, 0x05): 'AT90S2333', + (0x91, 0x06): 'ATtiny22', + (0x91, 0x07): 'ATtiny28', + (0x91, 0x08): 'ATtiny25', + (0x91, 0x09): 'ATtiny26', + (0x91, 0x0A): 'ATtiny2313', + (0x91, 0x0B): 'ATtiny24', + (0x91, 0x0C): 'ATtiny261', (0x92, 0x01): 'AT90S4414', - (0x92, 0x05): 'ATmega48', # 4kB flash + (0x92, 0x03): 'AT90S4433', + (0x92, 0x05): 'ATmega48(A)', + (0x92, 0x06): 'ATtiny45', + (0x92, 0x08): 'ATtiny461', + (0x92, 0x09): 'ATtiny48', + (0x92, 0x0A): 'ATmega48PA', + (0x92, 0x0D): 'ATtiny4313', + (0x92, 0x10): 'ATmega48PB', (0x93, 0x01): 'AT90S8515', - (0x93, 0x0a): 'ATmega88', # 8kB flash - (0x94, 0x06): 'ATmega168', # 16kB flash - (0xff, 0xff): 'Device code erased, or target missing', + (0x93, 0x03): 'AT90S8535', + (0x93, 0x07): 'ATmega8', + (0x93, 0x0A): 'ATmega88(A)', + (0x93, 0x0B): 'ATtiny85', + (0x93, 0x0D): 'ATtiny861', + (0x93, 0x0F): 'ATmega88PA', + (0x93, 0x11): 'ATtiny88', + (0x93, 0x16): 'ATmega88PB', + (0x93, 0x89): 'ATmega8U2', + (0x94, 0x01): 'ATmega161', + (0x94, 0x02): 'ATmega163', + (0x94, 0x03): 'ATmega16', + (0x94, 0x04): 'ATmega162', + (0x94, 0x06): 'ATmega168(A)', + (0x94, 0x0A): 'ATmega164PA', + (0x94, 0x0B): 'ATmega168PA', + (0x94, 0x0F): 'ATmega164A', + (0x94, 0x12): 'ATtiny1634', + (0x94, 0x15): 'ATmega168PB', + (0x94, 0x88): 'ATmega16U4', + (0x94, 0x89): 'ATmega16U2', + (0x95, 0x01): 'ATmega32', + (0x95, 0x01): 'ATmega323', + (0x95, 0x0F): 'ATmega328P', + (0x95, 0x11): 'ATmega324PA', + (0x95, 0x14): 'ATmega328', + (0x95, 0x15): 'ATmega324A', + (0x95, 0x87): 'ATmega32U4', + (0x95, 0x8A): 'ATmega32U2', + (0x96, 0x08): 'ATmega640', + (0x96, 0x09): 'ATmega644(A)', + (0x96, 0x0A): 'ATmega644PA', + (0x97, 0x01): 'ATmega103', + (0x97, 0x03): 'ATmega1280', + (0x97, 0x04): 'ATmega1281', + (0x97, 0x05): 'ATmega1284P', + (0x97, 0x06): 'ATmega1284', + (0x98, 0x01): 'ATmega2560', + (0x98, 0x02): 'ATmega2561', + (0xFF, 0xFF): 'Device code erased, or target missing', (0x01, 0x02): 'Device locked', - # TODO: Lots more entries. } diff --git a/decoders/avr_isp/pd.py b/decoders/avr_isp/pd.py index e3af4d6..9e3c5df 100644 --- a/decoders/avr_isp/pd.py +++ b/decoders/avr_isp/pd.py @@ -123,9 +123,12 @@ class Decoder(srd.Decoder): self.part_number = ret[3] self.putx([Ann.RSB2, ['Part number: 0x%02x' % ret[3]]]) - p = part[(self.part_fam_flash_size, self.part_number)] - data = [Ann.DEV, ['Device: Atmel %s' % p]] - self.put(self.ss_device, self.es_cmd, self.out_ann, data) + # Part name if known + key = (self.part_fam_flash_size, self.part_number) + if key in part: + p = part[key] + data = [Ann.DEV, ['Device: Atmel %s' % p]] + self.put(self.ss_device, self.es_cmd, self.out_ann, data) # Sanity check on reply. if ret[1] != 0x30 or ret[2] != self.xx or ret[0] != self.mm: diff --git a/decoders/eeprom24xx/pd.py b/decoders/eeprom24xx/pd.py index 549ee2d..7491f58 100644 --- a/decoders/eeprom24xx/pd.py +++ b/decoders/eeprom24xx/pd.py @@ -17,6 +17,7 @@ ## along with this program; if not, see . ## +import copy import sigrokdecode as srd from .lists import * @@ -416,16 +417,25 @@ class Decoder(srd.Decoder): self.reset_variables() def decode(self, ss, es, data): - self.cmd, self.databyte = data + cmd, _ = data # Collect the 'BITS' packet, then return. The next packet is # guaranteed to belong to these bits we just stored. - if self.cmd == 'BITS': - self.bits = self.databyte + if cmd == 'BITS': + _, databits = data + self.bits = copy.deepcopy(databits) return - # Store the start/end samples of this I²C packet. + # Store the start/end samples of this I²C packet. Deep copy + # caller's data, assuming that implementation details of the + # above complex methods can access the data after returning + # from the .decode() invocation, with the data having become + # invalid by that time of access. This conservative approach + # can get weakened after close inspection of those methods. self.ss, self.es = ss, es + _, databyte = data + databyte = copy.deepcopy(databyte) + self.cmd, self.databyte = cmd, databyte # State machine. s = 'handle_%s' % self.state.lower().replace(' ', '_') diff --git a/decoders/i2c/pd.py b/decoders/i2c/pd.py index 8297662..2259b45 100644 --- a/decoders/i2c/pd.py +++ b/decoders/i2c/pd.py @@ -21,6 +21,7 @@ # TODO: Implement support for inverting SDA/SCL levels (0->1 and 1->0). # TODO: Implement support for detecting various bus errors. +from common.srdhelper import bitpack_msb import sigrokdecode as srd ''' @@ -45,20 +46,24 @@ Packet: command. Slave addresses do not include bit 0 (the READ/WRITE indication bit). For example, a slave address field could be 0x51 (instead of 0xa2). For 'START', 'START REPEAT', 'STOP', 'ACK', and 'NACK' is None. +For 'BITS' is a sequence of tuples of bit values and their start and +stop positions, in LSB first order (although the I2C protocol is MSB first). ''' -# CMD: [annotation-type-index, long annotation, short annotation] +# Meaning of table items: +# command -> [annotation class, annotation text in order of decreasing length] proto = { - 'START': [0, 'Start', 'S'], - 'START REPEAT': [1, 'Start repeat', 'Sr'], - 'STOP': [2, 'Stop', 'P'], - 'ACK': [3, 'ACK', 'A'], - 'NACK': [4, 'NACK', 'N'], - 'BIT': [5, 'Bit', 'B'], - 'ADDRESS READ': [6, 'Address read', 'AR'], - 'ADDRESS WRITE': [7, 'Address write', 'AW'], - 'DATA READ': [8, 'Data read', 'DR'], - 'DATA WRITE': [9, 'Data write', 'DW'], + 'START': [0, 'Start', 'S'], + 'START REPEAT': [1, 'Start repeat', 'Sr'], + 'STOP': [2, 'Stop', 'P'], + 'ACK': [3, 'ACK', 'A'], + 'NACK': [4, 'NACK', 'N'], + 'BIT': [5, '{b:1d}'], + 'ADDRESS READ': [6, 'Address read: {b:02X}', 'AR: {b:02X}', '{b:02X}'], + 'ADDRESS WRITE': [7, 'Address write: {b:02X}', 'AW: {b:02X}', '{b:02X}'], + 'DATA READ': [8, 'Data read: {b:02X}', 'DR: {b:02X}', '{b:02X}'], + 'DATA WRITE': [9, 'Data write: {b:02X}', 'DW: {b:02X}', '{b:02X}'], + 'WARN': [10, '{text}'], } class Decoder(srd.Decoder): @@ -109,15 +114,15 @@ class Decoder(srd.Decoder): def reset(self): self.samplerate = None - self.ss = self.es = self.ss_byte = -1 - self.bitcount = 0 - self.databyte = 0 - self.wr = -1 - self.is_repeat_start = 0 - self.state = 'FIND START' + self.is_write = None + self.rem_addr_bytes = None + self.slave_addr_7 = None + self.slave_addr_10 = None + self.is_repeat_start = False self.pdu_start = None self.pdu_bits = 0 - self.bits = [] + self.data_bits = [] + self.bitwidth = 0 def metadata(self, key, value): if key == srd.SRD_CONF_SAMPLERATE: @@ -130,136 +135,210 @@ class Decoder(srd.Decoder): self.out_bitrate = self.register(srd.OUTPUT_META, meta=(int, 'Bitrate', 'Bitrate from Start bit to Stop bit')) - def putx(self, data): - self.put(self.ss, self.es, self.out_ann, data) - - def putp(self, data): - self.put(self.ss, self.es, self.out_python, data) - - def putb(self, data): - self.put(self.ss, self.es, self.out_binary, data) - - def handle_start(self, pins): - self.ss, self.es = self.samplenum, self.samplenum - self.pdu_start = self.samplenum - self.pdu_bits = 0 - cmd = 'START REPEAT' if (self.is_repeat_start == 1) else 'START' - self.putp([cmd, None]) - self.putx([proto[cmd][0], proto[cmd][1:]]) - self.state = 'FIND ADDRESS' - self.bitcount = self.databyte = 0 - self.is_repeat_start = 1 - self.wr = -1 - self.bits = [] + def putg(self, ss, es, cls, text): + self.put(ss, es, self.out_ann, [cls, text]) + + def putp(self, ss, es, data): + self.put(ss, es, self.out_python, data) + + def putb(self, ss, es, data): + self.put(ss, es, self.out_binary, data) + + def _wants_start(self): + # Check whether START is required (to sync to the input stream). + return self.pdu_start is None + + def _collects_address(self): + # Check whether the transfer still is in the address phase (is + # still collecting address and r/w details, or has not started + # collecting it). + return self.rem_addr_bytes is None or self.rem_addr_bytes != 0 + + def _collects_byte(self): + # Check whether bits of a byte are being collected. Outside of + # the data byte, the bit is the ACK/NAK slot. + return self.data_bits is None or len(self.data_bits) < 8 + + def handle_start(self, ss, es): + if self.is_repeat_start: + cmd = 'START REPEAT' + else: + cmd = 'START' + self.pdu_start = ss + self.pdu_bits = 0 + self.putp(ss, es, [cmd, None]) + cls, texts = proto[cmd][0], proto[cmd][1:] + self.putg(ss, es, cls, texts) + self.is_repeat_start = True + self.is_write = None + self.slave_addr_7 = None + self.slave_addr_10 = None + self.rem_addr_bytes = None + self.data_bits.clear() + self.bitwidth = 0 # Gather 8 bits of data plus the ACK/NACK bit. - def handle_address_or_data(self, pins): - scl, sda = pins + def handle_address_or_data(self, ss, es, value): self.pdu_bits += 1 - # Address and data are transmitted MSB-first. - self.databyte <<= 1 - self.databyte |= sda - - # Remember the start of the first data/address bit. - if self.bitcount == 0: - self.ss_byte = self.samplenum - - # Store individual bits and their start/end samplenumbers. - # In the list, index 0 represents the LSB (I²C transmits MSB-first). - self.bits.insert(0, [sda, self.samplenum, self.samplenum]) - if self.bitcount > 0: - self.bits[1][2] = self.samplenum - if self.bitcount == 7: - self.bitwidth = self.bits[1][2] - self.bits[2][2] - self.bits[0][2] += self.bitwidth - - # Return if we haven't collected all 8 + 1 bits, yet. - if self.bitcount < 7: - self.bitcount += 1 + # Accumulate a byte's bits, including its start position. + # Accumulate individual bits and their start/end sample numbers + # as we see them. Get the start sample number at the time when + # the bit value gets sampled. Assume the start of the next bit + # as the end sample number of the previous bit. Guess the last + # bit's end sample number from the second last bit's width. + # Keep the bits in receive order (MSB first) during accumulation. + # (gsi: Strictly speaking falling SCL would be the end of the + # bit value's validity. That'd break compatibility though.) + if self.data_bits: + self.data_bits[-1][2] = ss + self.data_bits.append([value, ss, es]) + if len(self.data_bits) < 8: return - - d = self.databyte - if self.state == 'FIND ADDRESS': - # The READ/WRITE bit is only in address bytes, not data bytes. - self.wr = 0 if (self.databyte & 1) else 1 - if self.options['address_format'] == 'shifted': - d = d >> 1 - + self.bitwidth = self.data_bits[-2][2] - self.data_bits[-3][2] + self.data_bits[-1][2] = self.data_bits[-1][1] + self.bitwidth + + # Get the byte value. Address and data are transmitted MSB-first. + d = bitpack_msb(self.data_bits, 0) + ss_byte, es_byte = self.data_bits[0][1], self.data_bits[-1][2] + + # Process the address bytes at the start of a transfer. The + # first byte will carry the R/W bit, and all of the 7bit address + # or part of a 10bit address. Bit pattern 0b11110xxx signals + # that another byte follows which carries the remaining bits of + # a 10bit slave address. + is_address = self._collects_address() + if is_address: + addr_byte = d + if self.rem_addr_bytes is None: + if (addr_byte & 0xf8) == 0xf0: + self.rem_addr_bytes = 2 + self.slave_addr_7 = None + self.slave_addr_10 = addr_byte & 0x06 + self.slave_addr_10 <<= 7 + else: + self.rem_addr_bytes = 1 + self.slave_addr_7 = addr_byte >> 1 + self.slave_addr_10 = None + has_rw_bit = self.is_write is None + if self.is_write is None: + read_bit = bool(addr_byte & 1) + if self.options['address_format'] == 'shifted': + d >>= 1 + self.is_write = False if read_bit else True + elif self.slave_addr_10 is not None: + self.slave_addr_10 |= addr_byte + else: + cls, texts = proto['WARN'][0], proto['WARN'][1:] + msg = 'Unhandled address byte' + texts = [t.format(text = msg) for t in texts] + self.putg(ss_byte, es_byte, cls, texts) + is_write = self.is_write + is_seven = self.slave_addr_7 is not None + + # Determine annotation classes depending on whether the byte is + # an address or payload data, and whether it's written or read. bin_class = -1 - if self.state == 'FIND ADDRESS' and self.wr == 1: + if is_address and is_write: cmd = 'ADDRESS WRITE' bin_class = 1 - elif self.state == 'FIND ADDRESS' and self.wr == 0: + elif is_address and not is_write: cmd = 'ADDRESS READ' bin_class = 0 - elif self.state == 'FIND DATA' and self.wr == 1: + elif not is_address and is_write: cmd = 'DATA WRITE' bin_class = 3 - elif self.state == 'FIND DATA' and self.wr == 0: + elif not is_address and not is_write: cmd = 'DATA READ' bin_class = 2 - self.ss, self.es = self.ss_byte, self.samplenum + self.bitwidth - - self.putp(['BITS', self.bits]) - self.putp([cmd, d]) - - self.putb([bin_class, bytes([d])]) - - for bit in self.bits: - self.put(bit[1], bit[2], self.out_ann, [5, ['%d' % bit[0]]]) - - if cmd.startswith('ADDRESS'): - self.ss, self.es = self.samplenum, self.samplenum + self.bitwidth - w = ['Write', 'Wr', 'W'] if self.wr else ['Read', 'Rd', 'R'] - self.putx([proto[cmd][0], w]) - self.ss, self.es = self.ss_byte, self.samplenum - - self.putx([proto[cmd][0], ['%s: %02X' % (proto[cmd][1], d), - '%s: %02X' % (proto[cmd][2], d), '%02X' % d]]) - - # Done with this packet. - self.bitcount = self.databyte = 0 - self.bits = [] - self.state = 'FIND ACK' - - def get_ack(self, pins): - scl, sda = pins - self.ss, self.es = self.samplenum, self.samplenum + self.bitwidth - cmd = 'NACK' if (sda == 1) else 'ACK' - self.putp([cmd, None]) - self.putx([proto[cmd][0], proto[cmd][1:]]) - # There could be multiple data bytes in a row, so either find - # another data byte or a STOP condition next. - self.state = 'FIND DATA' - - def handle_stop(self, pins): + # Reverse the list of bits to LSB first order before emitting + # annotations and passing bits to upper layers. This may be + # unexpected because the protocol is MSB first, but it keeps + # backwards compatibility. + lsb_bits = self.data_bits[:] + lsb_bits.reverse() + self.putp(ss_byte, es_byte, ['BITS', lsb_bits]) + self.putp(ss_byte, es_byte, [cmd, d]) + + self.putb(ss_byte, es_byte, [bin_class, bytes([d])]) + + for bit_value, ss_bit, es_bit in lsb_bits: + cls, texts = proto['BIT'][0], proto['BIT'][1:] + texts = [t.format(b = bit_value) for t in texts] + self.putg(ss_bit, es_bit, cls, texts) + + if is_address and has_rw_bit: + # Assign the last bit's location to the R/W annotation. + # Adjust the address value's location to the left. + ss_bit, es_bit = self.data_bits[-1][1], self.data_bits[-1][2] + es_byte = self.data_bits[-2][2] + cls = proto[cmd][0] + w = ['Write', 'Wr', 'W'] if self.is_write else ['Read', 'Rd', 'R'] + self.putg(ss_bit, es_bit, cls, w) + + cls, texts = proto[cmd][0], proto[cmd][1:] + texts = [t.format(b = d) for t in texts] + self.putg(ss_byte, es_byte, cls, texts) + + def get_ack(self, ss, es, value): + ss_bit, es_bit = ss, es + cmd = 'ACK' if value == 0 else 'NACK' + self.putp(ss_bit, es_bit, [cmd, None]) + cls, texts = proto[cmd][0], proto[cmd][1:] + self.putg(ss_bit, es_bit, cls, texts) + # Slave addresses can span one or two bytes, before data bytes + # follow. There can be an arbitrary number of data bytes. Stick + # with getting more address bytes if applicable, or enter or + # remain in the data phase of the transfer otherwise. + if self.rem_addr_bytes: + self.rem_addr_bytes -= 1 + self.data_bits.clear() + + def handle_stop(self, ss, es): # Meta bitrate - if self.samplerate: - elapsed = 1 / float(self.samplerate) * (self.samplenum - self.pdu_start + 1) + if self.samplerate and self.pdu_start: + elapsed = es - self.pdu_start + 1 + elapsed /= self.samplerate bitrate = int(1 / elapsed * self.pdu_bits) - self.put(self.ss_byte, self.samplenum, self.out_bitrate, bitrate) + ss_meta, es_meta = self.pdu_start, es + self.put(ss_meta, es_meta, self.out_bitrate, bitrate) + self.pdu_start = None + self.pdu_bits = 0 cmd = 'STOP' - self.ss, self.es = self.samplenum, self.samplenum - self.putp([cmd, None]) - self.putx([proto[cmd][0], proto[cmd][1:]]) - self.state = 'FIND START' - self.is_repeat_start = 0 - self.wr = -1 - self.bits = [] + self.putp(ss, es, [cmd, None]) + cls, texts = proto[cmd][0], proto[cmd][1:] + self.putg(ss, es, cls, texts) + self.is_repeat_start = False + self.is_write = None + self.data_bits.clear() def decode(self): + # Check for several bus conditions. Determine sample numbers + # here and pass ss, es, and bit values to handling routines. while True: # State machine. - if self.state == 'FIND START': + # BEWARE! This implementation expects to see valid traffic, + # is rather picky in which phase which symbols get handled. + # This attempts to support severely undersampled captures, + # which a previous implementation happened to read instead + # of rejecting the inadequate input data. + # NOTE that handling bits at the start of their validity, + # and assuming that they remain valid until the next bit + # starts, is also done for backwards compatibility. + if self._wants_start(): # Wait for a START condition (S): SCL = high, SDA = falling. - self.handle_start(self.wait({0: 'h', 1: 'f'})) - elif self.state == 'FIND ADDRESS': + pins = self.wait({0: 'h', 1: 'f'}) + ss, es = self.samplenum, self.samplenum + self.handle_start(ss, es) + elif self._collects_address() and self._collects_byte(): # Wait for a data bit: SCL = rising. - self.handle_address_or_data(self.wait({0: 'r'})) - elif self.state == 'FIND DATA': + pins = self.wait({0: 'r'}) + _, sda = pins + ss, es = self.samplenum, self.samplenum + self.bitwidth + self.handle_address_or_data(ss, es, sda) + elif self._collects_byte(): # Wait for any of the following conditions (or combinations): # a) Data sampling of receiver: SCL = rising, and/or # b) START condition (S): SCL = high, SDA = falling, and/or @@ -268,11 +347,18 @@ class Decoder(srd.Decoder): # Check which of the condition(s) matched and handle them. if self.matched[0]: - self.handle_address_or_data(pins) + _, sda = pins + ss, es = self.samplenum, self.samplenum + self.bitwidth + self.handle_address_or_data(ss, es, sda) elif self.matched[1]: - self.handle_start(pins) + ss, es = self.samplenum, self.samplenum + self.handle_start(ss, es) elif self.matched[2]: - self.handle_stop(pins) - elif self.state == 'FIND ACK': + ss, es = self.samplenum, self.samplenum + self.handle_stop(ss, es) + else: # Wait for a data/ack bit: SCL = rising. - self.get_ack(self.wait({0: 'r'})) + pins = self.wait({0: 'r'}) + _, sda = pins + ss, es = self.samplenum, self.samplenum + self.bitwidth + self.get_ack(ss, es, sda) diff --git a/decoders/i2cfilter/pd.py b/decoders/i2cfilter/pd.py index a54baab..877c467 100644 --- a/decoders/i2cfilter/pd.py +++ b/decoders/i2cfilter/pd.py @@ -18,8 +18,12 @@ ## along with this program; if not, see . ## -# TODO: Support for filtering out multiple slave/direction pairs? +# TODO +# - Accept other slave address forms than decimal numbers? +# - Support for filtering out multiple slave/direction pairs? +# - Support 10bit slave addresses? +import copy import sigrokdecode as srd class Decoder(srd.Decoder): @@ -43,51 +47,60 @@ class Decoder(srd.Decoder): self.reset() def reset(self): - self.curslave = -1 - self.curdirection = None - self.packets = [] # Local cache of I²C packets + self.seen_packets = [] + self.do_forward = None def start(self): self.out_python = self.register(srd.OUTPUT_PYTHON, proto_id='i2c') if self.options['address'] not in range(0, 127 + 1): raise Exception('Invalid slave (must be 0..127).') + self.want_addrs = [] + if self.options['address']: + self.want_addrs.append(self.options['address']) + self.want_dir = { + 'read': 'READ', 'write': 'WRITE', + }.get(self.options['direction'], None) - # Grab I²C packets into a local cache, until an I²C STOP condition - # packet comes along. At some point before that STOP condition, there - # will have been an ADDRESS READ or ADDRESS WRITE which contains the - # I²C address of the slave that the master wants to talk to. - # If that slave shall be filtered, output the cache (all packets from - # START to STOP) as proto 'i2c', otherwise drop it. - def decode(self, ss, es, data): + def _need_to_forward(self, slave_addr, direction): + if self.want_addrs and slave_addr not in self.want_addrs: + return False + if self.want_dir and direction != self.want_dir: + return False + return True - cmd, databyte = data + # Accumulate observed I2C packets until a STOP or REPEATED START + # condition is seen. These are conditions where transfers end or + # where direction potentially changes. Forward all previously + # accumulated traffic if it passes the slave address and direction + # filter. This assumes that the slave address as well as the read + # or write direction was part of the observed traffic. There should + # be no surprise when incomplete traffic does not match the filter + # condition. + def decode(self, ss, es, data): - # Add the I²C packet to our local cache. - self.packets.append([ss, es, data]) + # Unconditionally accumulate every lower layer packet we see. + # Keep deep copies for later, only reference caller's values + # as long as this .decode() invocation executes. + self.seen_packets.append([ss, es, copy.deepcopy(data)]) + cmd, _ = data + # Check the slave address and transfer direction early when + # we see them. Keep accumulating packets while it's already + # known here whether to forward them. This simplifies other + # code paths. Including future handling of 10bit addresses. if cmd in ('ADDRESS READ', 'ADDRESS WRITE'): - self.curslave = databyte - self.curdirection = cmd[8:].lower() - elif cmd in ('STOP', 'START REPEAT'): - # If this chunk was not for the correct slave, drop it. - if self.options['address'] == 0: - pass - elif self.curslave != self.options['address']: - self.packets = [] - return - - # If this chunk was not in the right direction, drop it. - if self.options['direction'] == 'both': - pass - elif self.options['direction'] != self.curdirection: - self.packets = [] - return - - # TODO: START->STOP chunks with both read and write (Repeat START) - # Otherwise, send out the whole chunk of I²C packets. - for p in self.packets: - self.put(p[0], p[1], self.out_python, p[2]) + direction = cmd[len('ADDRESS '):] + _, slave_addr = data + self.do_forward = self._need_to_forward(slave_addr, direction) + return - self.packets = [] - else: - pass # Do nothing, only add the I²C packet to our cache. + # Forward previously accumulated packets as we see their + # completion, and when they pass the filter condition. Prepare + # to handle the next transfer (the next read/write part of it). + if cmd in ('STOP', 'START REPEAT'): + if self.do_forward: + for ss, es, data in self.seen_packets: + self.put(ss, es, self.out_python, data) + self.seen_packets.clear() + self.do_forward = None + return diff --git a/decoders/nes_gamepad/pd.py b/decoders/nes_gamepad/pd.py index b276e5d..a393abf 100644 --- a/decoders/nes_gamepad/pd.py +++ b/decoders/nes_gamepad/pd.py @@ -51,24 +51,22 @@ class Decoder(srd.Decoder): def reset(self): self.variant = None - self.ss_block = None - self.es_block = None def start(self): self.out_ann = self.register(srd.OUTPUT_ANN) self.variant = self.options['variant'] - def putx(self, data): - self.put(self.ss_block, self.es_block, self.out_ann, data) + def putg(self, ss, es, cls, text): + self.put(ss, es, self.out_ann, [cls, [text]]) - def handle_data(self, value): - if value == 0xFF: - self.putx([1, ['No button is pressed']]) - return + def handle_data(self, ss, es, value): + if value == 0xff: + self.putg(ss, es, 1, 'No button is pressed') + return if value == 0x00: - self.putx([2, ['Gamepad is not connected']]) - return + self.putg(ss, es, 2, 'Gamepad is not connected') + return buttons = [ 'A', @@ -78,28 +76,17 @@ class Decoder(srd.Decoder): 'North', 'South', 'West', - 'East' + 'East', ] - bits = format(value, '08b') - button_str = '' - - for b in enumerate(bits): - button_index = b[0] - button_is_pressed = b[1] == '0' - - if button_is_pressed: - if button_str != '': - button_str += ' + ' - button_str += buttons[button_index] - - self.putx([0, ['%s' % button_str]]) + bits = '{:08b}'.format(value) + text = [buttons[i] for i, b in enumerate(bits) if b == '0'] + text = ' + '.join(text) + self.putg(ss, es, 0, text) def decode(self, ss, es, data): - ptype, mosi, miso = data - self.ss_block, self.es_block = ss, es - - if ptype != 'DATA': - return - - self.handle_data(miso) + ptype, _, _ = data + if ptype == 'DATA': + _, _, miso = data + self.handle_data(ss, es, miso) + return diff --git a/decoders/parallel/pd.py b/decoders/parallel/pd.py index 96741e7..1e31208 100644 --- a/decoders/parallel/pd.py +++ b/decoders/parallel/pd.py @@ -257,7 +257,10 @@ class Decoder(srd.Decoder): # This results in robust operation for low-oversampled input. in_reset = False while True: - pins = self.wait(conds) + try: + pins = self.wait(conds) + except EOFError as e: + break clock_edge = cond_idx_clock is not None and self.matched[cond_idx_clock] data_edge = cond_idx_data_0 is not None and [idx for idx in range(cond_idx_data_0, cond_idx_data_N) if self.matched[idx]] reset_edge = cond_idx_reset is not None and self.matched[cond_idx_reset] @@ -275,3 +278,8 @@ class Decoder(srd.Decoder): data_bits = data_bits[:num_item_bits] item = bitpack(data_bits) self.handle_bits(self.samplenum, item, num_item_bits) + + self.handle_bits(self.samplenum, None, num_item_bits) + # TODO Determine whether a WARN annotation needs to get emitted. + # The decoder has not seen the end of the last accumulated item. + # Instead it just ran out of input data. diff --git a/decoders/rgb_led_spi/pd.py b/decoders/rgb_led_spi/pd.py index 82877b3..899a64a 100644 --- a/decoders/rgb_led_spi/pd.py +++ b/decoders/rgb_led_spi/pd.py @@ -19,6 +19,8 @@ import sigrokdecode as srd +( ANN_RGB, ) = range(1) + class Decoder(srd.Decoder): api_version = 3 id = 'rgb_led_spi' @@ -37,34 +39,34 @@ class Decoder(srd.Decoder): self.reset() def reset(self): - self.ss_cmd, self.es_cmd = 0, 0 + self.ss_cmd = None self.mosi_bytes = [] def start(self): self.out_ann = self.register(srd.OUTPUT_ANN) - def putx(self, data): - self.put(self.ss_cmd, self.es_cmd, self.out_ann, data) + def putg(self, ss, es, cls, text): + self.put(ss, es, self.out_ann, [cls, text]) def decode(self, ss, es, data): - ptype, mosi, miso = data + ptype = data[0] - # Only care about data packets. + # Grab the payload of three DATA packets. These hold the + # RGB values (in this very order). if ptype != 'DATA': return - self.ss, self.es = ss, es - - if len(self.mosi_bytes) == 0: + _, mosi, _ = data + if not self.mosi_bytes: self.ss_cmd = ss self.mosi_bytes.append(mosi) - - # RGB value == 3 bytes - if len(self.mosi_bytes) != 3: + if len(self.mosi_bytes) < 3: return - red, green, blue = self.mosi_bytes + # Emit annotations. Invalidate accumulated details as soon as + # they were processed, to prepare the next iteration. + ss_cmd, es_cmd = self.ss_cmd, es + self.ss_cmd = None + red, green, blue = self.mosi_bytes[:3] + self.mosi_bytes.clear() rgb_value = int(red) << 16 | int(green) << 8 | int(blue) - - self.es_cmd = es - self.putx([0, ['#%.6x' % rgb_value]]) - self.mosi_bytes = [] + self.putg(ss_cmd, es_cmd, ANN_RGB, ['#{:06x}'.format(rgb_value)]) diff --git a/decoders/rgb_led_ws281x/pd.py b/decoders/rgb_led_ws281x/pd.py index 43fbce4..099a2ce 100644 --- a/decoders/rgb_led_ws281x/pd.py +++ b/decoders/rgb_led_ws281x/pd.py @@ -17,12 +17,48 @@ ## along with this program; if not, see . ## +# Implementor's notes on the wire format: +# - World Semi vendor, (Adafruit copy of the) datasheet +# https://cdn-shop.adafruit.com/datasheets/WS2812.pdf +# - reset pulse is 50us (or more) of low pin level +# - 24bits per WS281x item, 3x 8bits, MSB first, GRB sequence, +# cascaded WS281x items, all "excess bits" are passed through +# - bit time starts with high period, continues with low period, +# high to low periods' ratio determines bit value, datasheet +# mentions 0.35us/0.8us for value 0, 0.7us/0.6us for value 1 +# (huge 150ns tolerances, un-even 0/1 value length, hmm) +# - experience suggests the timing "is variable", rough estimation +# often is good enough, microcontroller firmware got away with +# four quanta per bit time, or even with three quanta (30%/60%), +# Adafruit learn article suggests 1.2us total and 0.4/0.8 or +# 0.8/0.4 high/low parts, four quanta are easier to handle when +# the bit stream is sent via SPI to avoid MCU bit banging and its +# inaccurate timing (when interrupts are used in the firmware) +# - RGBW datasheet (Adafruit copy) for SK6812 +# https://cdn-shop.adafruit.com/product-files/2757/p2757_SK6812RGBW_REV01.pdf +# also 1.2us total, shared across 0.3/0.9 for 0, 0.6/0.6 for 1, +# 80us reset pulse, R8/G8/B8/W8 format per 32bits +# - WS2815, RGB LED, uses GRB wire format, 280us RESET pulse width +# - more vendors and models available and in popular use, +# suggests "one third" or "two thirds" ratio would be most robust, +# sample "a little before" the bit half? reset pulse width may need +# to become an option? matrices and/or fast refresh environments +# may want to experiment with back to back pixel streams + import sigrokdecode as srd -from functools import reduce +from common.srdhelper import bitpack_msb class SamplerateError(Exception): pass +class DecoderError(Exception): + pass + +( + ANN_BIT, ANN_RESET, ANN_RGB, + ANN_COMP_R, ANN_COMP_G, ANN_COMP_B, ANN_COMP_W, +) = range(7) + class Decoder(srd.Decoder): api_version = 3 id = 'rgb_led_ws281x' @@ -40,14 +76,22 @@ class Decoder(srd.Decoder): ('bit', 'Bit'), ('reset', 'RESET'), ('rgb', 'RGB'), + ('r', 'R'), + ('g', 'G'), + ('b', 'B'), + ('w', 'W'), ) annotation_rows = ( - ('bits', 'Bits', (0, 1)), - ('rgb-vals', 'RGB values', (2,)), + ('bits', 'Bits', (ANN_BIT, ANN_RESET,)), + ('rgb-comps', 'RGB components', (ANN_COMP_R, ANN_COMP_G, ANN_COMP_B, ANN_COMP_W,)), + ('rgb-vals', 'RGB values', (ANN_RGB,)), ) options = ( - {'id': 'type', 'desc': 'RGB or RGBW', 'default': 'RGB', - 'values': ('RGB', 'RGBW')}, + {'id': 'wireorder', 'desc': 'colour components order (wire)', + 'default': 'GRB', + 'values': ('BGR', 'BRG', 'GBR', 'GRB', 'RBG', 'RGB', 'RWBG', 'RGBW')}, + {'id': 'textorder', 'desc': 'components output order (text)', + 'default': 'RGB[W]', 'values': ('wire', 'RGB[W]', 'RGB', 'RGBW', 'RGWB')}, ) def __init__(self): @@ -55,12 +99,7 @@ class Decoder(srd.Decoder): def reset(self): self.samplerate = None - self.oldpin = None - self.ss_packet = None - self.ss = None - self.es = None self.bits = [] - self.inreset = False def start(self): self.out_ann = self.register(srd.OUTPUT_ANN) @@ -69,80 +108,160 @@ class Decoder(srd.Decoder): if key == srd.SRD_CONF_SAMPLERATE: self.samplerate = value - def handle_bits(self, samplenum): - if self.options['type'] == 'RGB': - if len(self.bits) == 24: - grb = reduce(lambda a, b: (a << 1) | b, self.bits) - rgb = (grb & 0xff0000) >> 8 | (grb & 0x00ff00) << 8 | (grb & 0x0000ff) - self.put(self.ss_packet, samplenum, self.out_ann, - [2, ['#%06x' % rgb]]) - self.bits = [] - self.ss_packet = None + def putg(self, ss, es, cls, text): + self.put(ss, es, self.out_ann, [cls, text]) + + def handle_bits(self): + if len(self.bits) < self.need_bits: + return + ss_packet, es_packet = self.bits[0][1], self.bits[-1][2] + r, g, b, w = 0, 0, 0, None + comps = [] + for i, c in enumerate(self.wireformat): + first_idx, after_idx = 8 * i, 8 * i + 8 + comp_bits = self.bits[first_idx:after_idx] + comp_ss, comp_es = comp_bits[0][1], comp_bits[-1][2] + comp_value = bitpack_msb(comp_bits, 0) + comp_text = '{:02x}'.format(comp_value) + comp_ann = { + 'r': ANN_COMP_R, 'g': ANN_COMP_G, + 'b': ANN_COMP_B, 'w': ANN_COMP_W, + }.get(c.lower(), None) + comp_item = (comp_ss, comp_es, comp_ann, comp_value, comp_text) + comps.append(comp_item) + if c.lower() == 'r': + r = comp_value + elif c.lower() == 'g': + g = comp_value + elif c.lower() == 'b': + b = comp_value + elif c.lower() == 'w': + w = comp_value + wt = '' if w is None else '{:02x}'.format(w) + if self.textformat == 'wire': + rgb_text = '#' + ''.join([c[-1] for c in comps]) else: - if len(self.bits) == 32: - grb = reduce(lambda a, b: (a << 1) | b, self.bits) - rgb = (grb & 0xff0000) >> 8 | (grb & 0x00ff00) << 8 | (grb & 0xff0000ff) - self.put(self.ss_packet, samplenum, self.out_ann, - [2, ['#%08x' % rgb]]) - self.bits = [] - self.ss_packet = None + rgb_text = self.textformat.format(r = r, g = g, b = b, w = w, wt = wt) + for ss_comp, es_comp, cls_comp, value_comp, text_comp in comps: + self.putg(ss_comp, es_comp, cls_comp, [text_comp]) + if rgb_text: + self.putg(ss_packet, es_packet, ANN_RGB, [rgb_text]) + self.bits.clear() + + def handle_bit(self, ss, es, value, ann_late = False): + if not ann_late: + text = ['{:d}'.format(value)] + self.putg(ss, es, ANN_BIT, text) + item = (value, ss, es) + self.bits.append(item) + self.handle_bits() + if ann_late: + text = ['{:d}'.format(value)] + self.putg(ss, es, ANN_BIT, text) def decode(self): if not self.samplerate: raise SamplerateError('Cannot decode without samplerate.') - while True: - # TODO: Come up with more appropriate self.wait() conditions. - (pin,) = self.wait() - - if self.oldpin is None: - self.oldpin = pin - continue - - # Check RESET condition (manufacturer recommends 50 usec minimal, - # but real minimum is ~10 usec). - if not self.inreset and not pin and self.es is not None and \ - self.ss is not None and \ - (self.samplenum - self.es) / self.samplerate > 50e-6: - - # Decode last bit value. - tH = (self.es - self.ss) / self.samplerate - bit_ = True if tH >= 625e-9 else False - - self.bits.append(bit_) - self.handle_bits(self.es) - - self.put(self.ss, self.es, self.out_ann, [0, ['%d' % bit_]]) - self.put(self.es, self.samplenum, self.out_ann, - [1, ['RESET', 'RST', 'R']]) - - self.inreset = True - self.bits = [] - self.ss_packet = None - self.ss = None - - if not self.oldpin and pin: - # Rising edge. - if self.ss and self.es: - period = self.samplenum - self.ss - duty = self.es - self.ss - # Ideal duty for T0H: 33%, T1H: 66%. - bit_ = (duty / period) > 0.5 + # Preprocess options here, to simplify logic which executes + # much later in loops while settings have the same values. + wireorder = self.options['wireorder'].lower() + self.wireformat = [c for c in wireorder if c in 'rgbw'] + self.need_bits = len(self.wireformat) * 8 + textorder = self.options['textorder'].lower() + if textorder == 'wire': + self.textformat = 'wire' + elif textorder == 'rgb[w]': + self.textformat = '#{r:02x}{g:02x}{b:02x}{wt:s}' + else: + self.textformat = { + # "Obvious" permutations of R/G/B. + 'bgr': '#{b:02x}{g:02x}{r:02x}', + 'brg': '#{b:02x}{r:02x}{g:02x}', + 'gbr': '#{g:02x}{b:02x}{r:02x}', + 'grb': '#{g:02x}{r:02x}{b:02x}', + 'rbg': '#{r:02x}{b:02x}{g:02x}', + 'rgb': '#{r:02x}{g:02x}{b:02x}', + # RGB plus White. Only one of them useful? + 'rgbw': '#{r:02x}{g:02x}{b:02x}{w:02x}', + # Weird RGBW permutation for compatibility to test case. + # Neither used RGBW nor the 'wire' order. Obsolete now? + 'rgwb': '#{r:02x}{g:02x}{w:02x}{b:02x}', + }.get(textorder, None) + if self.textformat is None: + raise DecoderError('Unsupported text output format.') - self.put(self.ss, self.samplenum, self.out_ann, - [0, ['%d' % bit_]]) + # Either check for edges which communicate bit values, or for + # long periods of idle level which represent a reset pulse. + # Track the left-most, right-most, and inner edge positions of + # a bit. The positive period's width determines the bit's value. + # Initially synchronize to the input stream by searching for a + # low period, which preceeds a data bit or starts a reset pulse. + # Don't annotate the very first reset pulse, but process it. We + # may not see the right-most edge of a data bit when reset is + # adjacent to that bit time. + cond_bit_starts = {0: 'r'} + cond_inbit_edge = {0: 'f'} + samples_625ns = int(self.samplerate * 625e-9) + samples_50us = round(self.samplerate * 50e-6) + cond_reset_pulse = {'skip': samples_50us + 1} + conds = [cond_bit_starts, cond_inbit_edge, cond_reset_pulse] + ss_bit, inv_bit, es_bit = None, None, None + pin, = self.wait({0: 'l'}) + inv_bit = self.samplenum + check_reset = False + while True: + pin, = self.wait(conds) - self.bits.append(bit_) - self.handle_bits(self.samplenum) + # Check RESET condition. Manufacturers may disagree on the + # minimal pulse width. 50us are recommended in datasheets, + # experiments suggest the limit is around 10us. + # When the RESET pulse is adjacent to the low phase of the + # last bit time, we have no appropriate condition for the + # bit time's end location. That's why this BIT's annotation + # is shorter (only spans the high phase), and the RESET + # annotation immediately follows (spans from the falling edge + # to the end of the minimum RESET pulse width). + if check_reset and self.matched[2]: + es_bit = inv_bit + ss_rst, es_rst = inv_bit, self.samplenum - if self.ss_packet is None: - self.ss_packet = self.samplenum + if ss_bit and inv_bit and es_bit: + # Decode last bit value. Use the last processed bit's + # width for comparison when available. Fallback to an + # arbitrary threshold otherwise (which can result in + # false detection of value 1 for those captures where + # high and low pulses are of similar width). + duty = inv_bit - ss_bit + thres = samples_625ns + if self.bits: + period = self.bits[-1][2] - self.bits[-1][1] + thres = period * 0.5 + bit_value = 1 if duty >= thres else 0 + self.handle_bit(ss_bit, inv_bit, bit_value, True) - self.ss = self.samplenum + if ss_rst and es_rst: + text = ['RESET', 'RST', 'R'] + self.putg(ss_rst, es_rst, ANN_RESET, text) + check_reset = False - elif self.oldpin and not pin: - # Falling edge. - self.inreset = False - self.es = self.samplenum + self.bits.clear() + ss_bit, inv_bit, es_bit = None, None, None - self.oldpin = pin + # Rising edge starts a bit time. Falling edge ends its high + # period. Get the previous bit's duty cycle and thus its + # bit value when the next bit starts. + if self.matched[0]: # and pin: + check_reset = False + if ss_bit and inv_bit: + # Got a previous bit? Handle it. + es_bit = self.samplenum + period = es_bit - ss_bit + duty = inv_bit - ss_bit + # Ideal duty for T0H: 33%, T1H: 66%. + bit_value = 1 if (duty / period) > 0.5 else 0 + self.handle_bit(ss_bit, es_bit, bit_value) + ss_bit, inv_bit, es_bit = self.samplenum, None, None + if self.matched[1]: # and not pin: + check_reset = True + inv_bit = self.samplenum diff --git a/decoders/sae_j1850_vpw/pd.py b/decoders/sae_j1850_vpw/pd.py index fd2389e..3655f96 100644 --- a/decoders/sae_j1850_vpw/pd.py +++ b/decoders/sae_j1850_vpw/pd.py @@ -2,6 +2,7 @@ ## This file is part of the libsigrokdecode project. ## ## Copyright (C) 2016 Anthony Symons +## Copyright (C) 2023 Gerhard Sittig ## ## This program is free software; you can redistribute it and/or modify ## it under the terms of the GNU General Public License as published by @@ -18,16 +19,30 @@ ## import sigrokdecode as srd +from common.srdhelper import bitpack_msb + +# VPW Timings. From the SAE J1850 1995 rev section 23.406 documentation. +# Ideal, minimum and maximum tolerances. +VPW_SOF = 200 +VPW_SOFL = 164 +VPW_SOFH = 245 # 240 by the spec, 245 so a 60us 4x sample will pass +VPW_LONG = 128 +VPW_LONGL = 97 +VPW_LONGH = 170 # 164 by the spec but 170 for low sample rate tolerance. +VPW_SHORT = 64 +VPW_SHORTL = 24 # 35 by the spec, 24 to allow down to 6us as measured in practice for 4x @ 1mhz sampling +VPW_SHORTH = 97 +VPW_IFS = 240 class SamplerateError(Exception): pass -def timeuf(t): - return int (t * 1000.0 * 1000.0) - -class Ann: - ANN_RAW, ANN_SOF, ANN_IFS, ANN_DATA, \ - ANN_PACKET = range(5) +( + ANN_SOF, ANN_BIT, ANN_IFS, ANN_BYTE, + ANN_PRIO, ANN_DEST, ANN_SRC, ANN_MODE, ANN_DATA, ANN_CSUM, + ANN_M1_PID, + ANN_WARN, +) = range(12) class Decoder(srd.Decoder): api_version = 3 @@ -43,123 +58,239 @@ class Decoder(srd.Decoder): {'id': 'data', 'name': 'Data', 'desc': 'Data line'}, ) annotations = ( - ('raw', 'Raw'), ('sof', 'SOF'), + ('bit', 'Bit'), ('ifs', 'EOF/IFS'), + ('byte', 'Byte'), + ('prio', 'Priority'), + ('dest', 'Destination'), + ('src', 'Source'), + ('mode', 'Mode'), ('data', 'Data'), - ('packet', 'Packet'), + ('csum', 'Checksum'), + ('m1_pid', 'Pid'), + ('warn', 'Warning'), ) annotation_rows = ( - ('raws', 'Raws', (Ann.ANN_RAW, Ann.ANN_SOF, Ann.ANN_IFS,)), - ('bytes', 'Bytes', (Ann.ANN_DATA,)), - ('packets', 'Packets', (Ann.ANN_PACKET,)), + ('bits', 'Bits', (ANN_SOF, ANN_BIT, ANN_IFS,)), + ('bytes', 'Bytes', (ANN_BYTE,)), + ('fields', 'Fields', (ANN_PRIO, ANN_DEST, ANN_SRC, ANN_MODE, ANN_DATA, ANN_CSUM,)), + ('values', 'Values', (ANN_M1_PID,)), + ('warns', 'Warnings', (ANN_WARN,)), ) + # TODO Add support for options? Polarity. Glitch length. def __init__(self): self.reset() def reset(self): - self.state = 'IDLE' self.samplerate = None - self.byte = 0 # the byte offset in the packet - self.mode = 0 # for by packet decode - self.data = 0 # the current byte - self.datastart = 0 # sample number this byte started at - self.csa = 0 # track the last byte seperately to retrospectively add the CS marker - self.csb = 0 - self.count = 0 # which bit number we are up to - self.active = 0 # which logic level is considered active - - # vpw timings. ideal, min and max tollerances. - # From SAE J1850 1995 rev section 23.406 - - self.sof = 200 - self.sofl = 164 - self.sofh = 245 # 240 by the spec, 245 so a 60us 4x sample will pass - self.long = 128 - self.longl = 97 - self.longh = 170 # 164 by the spec but 170 for low sample rate tolerance. - self.short = 64 - self.shortl = 24 # 35 by the spec, 24 to allow down to 6us as measured in practice for 4x @ 1mhz sampling - self.shorth = 97 - self.ifs = 240 - self.spd = 1 # set to 4 when a 4x SOF is detected (VPW high speed frame) + self.active = 0 # Signal polarity. Needs to become an option? + self.bits = [] + self.fields = {} - def handle_bit(self, ss, es, b): - self.data |= (b << 7-self.count) # MSB-first - self.put(ss, es, self.out_ann, [Ann.ANN_RAW, ["%d" % b]]) - if self.count == 0: - self.datastart = ss - if self.count == 7: - self.csa = self.datastart # for CS - self.csb = self.samplenum # for CS - self.put(self.datastart, self.samplenum, self.out_ann, [Ann.ANN_DATA, ["%02X" % self.data]]) - # add protocol parsing here - if self.byte == 0: - self.put(self.datastart, self.samplenum, self.out_ann, [Ann.ANN_PACKET, ['Priority','Prio','P']]) - elif self.byte == 1: - self.put(self.datastart, self.samplenum, self.out_ann, [Ann.ANN_PACKET, ['Destination','Dest','D']]) - elif self.byte == 2: - self.put(self.datastart, self.samplenum, self.out_ann, [Ann.ANN_PACKET, ['Source','Src','S']]) - elif self.byte == 3: - self.put(self.datastart, self.samplenum, self.out_ann, [Ann.ANN_PACKET, ['Mode','M']]) - self.mode = self.data - elif self.mode == 1 and self.byte == 4: # mode 1 payload - self.put(self.datastart, self.samplenum, self.out_ann, [Ann.ANN_PACKET, ['Pid','P']]) - - # prepare for next byte - self.count = -1 - self.data = 0 - self.byte = self.byte + 1 # track packet offset - self.count = self.count + 1 + def start(self): + self.out_ann = self.register(srd.OUTPUT_ANN) def metadata(self, key, value): if key == srd.SRD_CONF_SAMPLERATE: self.samplerate = value - def start(self): - self.out_ann = self.register(srd.OUTPUT_ANN) + def putg(self, ss, es, cls, texts): + self.put(ss, es, self.out_ann, [cls, texts]) + + def invalidate_frame_details(self): + self.bits.clear() + self.fields.clear() + + def handle_databytes(self, fields, data): + # TODO Deep inspection of header fields and data values, including + # checksum verification results. + mode = fields.get('mode', None) + if mode is None: + return + if mode == 1: + # An earlier implementation commented that for mode 1 the + # first data byte would be the PID. But example captures + # have no data bytes in packets for that mode. This position + # is taken by the checksum. Is this correct? + pid = data[0] if data else fields.get('csum', None) + if pid is None: + text = ['PID missing'] + self.putg(ss, es, ANN_WARN, text) + else: + byte_text = '{:02x}'.format(pid) + self.putg(ss, es, ANN_M1_PID, [byte_text]) + + def handle_byte(self, ss, es, b): + # Annotate all raw byte values. Inspect and process the first + # bytes in a frame already. Cease inspection and only accumulate + # all other bytes after the mode. The checksum's position and + # thus the data bytes' span will only be known when EOF or IFS + # were seen. Implementor's note: This method just identifies + # header fields. Processing is left to the .handle_databytes() + # method. Until then validity will have been checked, too (CS). + byte_text = '{:02x}'.format(b) + self.putg(ss, es, ANN_BYTE, [byte_text]) + + if not 'prio' in self.fields: + self.fields.update({'prio': b}) + self.putg(ss, es, ANN_PRIO, [byte_text]) + return + if not 'dest' in self.fields: + self.fields.update({'dest': b}) + self.putg(ss, es, ANN_DEST, [byte_text]) + return + if not 'src' in self.fields: + self.fields.update({'src': b}) + self.putg(ss, es, ANN_SRC, [byte_text]) + return + if not 'mode' in self.fields: + self.fields.update({'mode': b}) + self.putg(ss, es, ANN_MODE, [byte_text]) + return + if not 'data' in self.fields: + self.fields.update({'data': [], 'csum': None}) + self.fields['data'].append((b, ss, es)) + + def handle_sof(self, ss, es, speed): + text = ['{speed:d}x SOF', 'S{speed:d}', 'S'] + text = [f.format(speed = speed) for f in text] + self.putg(ss, es, ANN_SOF, text) + self.invalidate_frame_details() + self.fields.update({'speed': speed}) + + def handle_bit(self, ss, es, b): + self.bits.append((b, ss, es)) + self.putg(ss, es, ANN_BIT, ['{:d}'.format(b)]) + if len(self.bits) < 8: + return + ss, es = self.bits[0][1], self.bits[-1][2] + b = bitpack_msb(self.bits, 0) + self.bits.clear() + self.handle_byte(ss, es, b) + + def handle_eof(self, ss, es, is_ifs = False): + # EOF or IFS were seen. Post process the data bytes sequence. + # Separate the checksum from the data bytes. Emit annotations. + # Pass data bytes and header fields to deeper inspection. + data = self.fields.get('data', {}) + if not data: + text = ['Short data phase', 'Data'] + self.putg(ss, es, ANN_WARN, text) + csum = None + if len(data) >= 1: + csum, ss_csum, es_csum = data.pop() + self.fields.update({'csum': csum}) + # TODO Verify checksum's correctness? + if data: + ss_data, es_data = data[0][1], data[-1][2] + text = ' '.join(['{:02x}'.format(b[0]) for b in data]) + self.putg(ss_data, es_data, ANN_DATA, [text]) + if csum is not None: + text = '{:02x}'.format(csum) + self.putg(ss_csum, es_csum, ANN_CSUM, [text]) + text = ['IFS', 'I'] if is_ifs else ['EOF', 'E'] + self.putg(ss, es, ANN_IFS, text) + self.handle_databytes(self.fields, data); + self.invalidate_frame_details() + + def handle_unknown(self, ss, es): + text = ['Unknown condition', 'Unknown', 'UNK'] + self.putg(ss, es, ANN_WARN, text) + self.invalidate_frame_details() + + def usecs_to_samples(self, us): + us *= 1e-6 + us *= self.samplerate + return int(us) + + def samples_to_usecs(self, n): + n /= self.samplerate + n *= 1000.0 * 1000.0 + return int(n) def decode(self): if not self.samplerate: raise SamplerateError('Cannot decode without samplerate.') - self.wait({0: 'e'}) + # Get the distance between edges. Classify the distance + # to derive symbols and data bit values. Prepare waiting + # for an interframe gap as well, while this part of the + # condition is optional (switches in and out at runtime). + conds_edge = {0: 'e'} + conds_edge_only = [conds_edge] + conds_edge_idle = [conds_edge, {'skip': 0}] + conds = conds_edge_only + self.wait(conds) es = self.samplenum + spd = None while True: ss = es - pin, = self.wait({0: 'e'}) + pin, = self.wait(conds) es = self.samplenum + count = es - ss + t = self.samples_to_usecs(count) + + # Synchronization to the next frame. Wait for SOF. + # Silently keep synchronizing until SOF was seen. + if spd is None: + if not self.matched[0]: + continue + if pin != self.active: + continue + + # Detect the frame's speed from the SOF length. Adjust + # the expected BIT lengths to the SOF derived speed. + # Arrange for the additional supervision of EOF/IFS. + if t in range(VPW_SOFL // 1, VPW_SOFH // 1): + spd = 1 + elif t in range(VPW_SOFL // 4, VPW_SOFH // 4): + spd = 4 + else: + continue + short_lower, short_upper = VPW_SHORTL // spd, VPW_SHORTH // spd + long_lower, long_upper = VPW_LONGL // spd, VPW_LONGH // spd + samples = self.usecs_to_samples(VPW_IFS // spd) + conds_edge_idle[-1]['skip'] = samples + conds = conds_edge_idle + + # Emit the SOF annotation. Start collecting DATA. + self.handle_sof(ss, es, spd) + continue + + # Inside the DATA phase. Get data bits. Handle EOF/IFS. + if len(conds) > 1 and self.matched[1]: + # TODO The current implementation gets here after a + # pre-determined minimum wait time. Does not differ + # between EOF and IFS. An earlier implementation had + # this developer note: EOF=239-280 IFS=281+ + self.handle_eof(ss, es) + # Enter the IDLE phase. Wait for the next SOF. + spd = None + conds = conds_edge_only + continue + if t in range(short_lower, short_upper): + value = 1 if pin == self.active else 0 + self.handle_bit(ss, es, value) + continue + if t in range(long_lower, long_upper): + value = 0 if pin == self.active else 1 + self.handle_bit(ss, es, value) + continue - samples = es - ss - t = timeuf(samples / self.samplerate) - if self.state == 'IDLE': # detect and set speed from the size of sof - if pin == self.active and t in range(self.sofl , self.sofh): - self.put(ss, es, self.out_ann, [Ann.ANN_RAW, ['1X SOF', 'S1', 'S']]) - self.spd = 1 - self.data = 0 - self.count = 0 - self.state = 'DATA' - elif pin == self.active and t in range(int(self.sofl / 4) , int(self.sofh / 4)): - self.put(ss, es, self.out_ann, [Ann.ANN_RAW, ['4X SOF', 'S4', '4']]) - self.spd = 4 - self.data = 0 - self.count = 0 - self.state = 'DATA' - - elif self.state == 'DATA': - if t >= int(self.ifs / self.spd): - self.state = 'IDLE' - self.put(ss, es, self.out_ann, [Ann.ANN_RAW, ["EOF/IFS", "E"]]) # EOF=239-280 IFS=281+ - self.put(self.csa, self.csb, self.out_ann, [Ann.ANN_PACKET, ['Checksum','CS','C']]) # retrospective print of CS - self.byte = 0 # reset packet offset - elif t in range(int(self.shortl / self.spd), int(self.shorth / self.spd)): - if pin == self.active: - self.handle_bit(ss, es, 1) - else: - self.handle_bit(ss, es, 0) - elif t in range(int(self.longl / self.spd), int(self.longh / self.spd)): - if pin == self.active: - self.handle_bit(ss, es, 0) - else: - self.handle_bit(ss, es, 1) + # Implementation detail: An earlier implementation used to + # ignore everything that was not handled above. This would + # be motivated by the noisy environment the protocol is + # typically used in. This more recent implementation accepts + # short glitches, but by design falls back to synchronization + # to the input stream for other unhandled conditions. This + # wants to improve usability of the decoder, by presenting + # potential issues to the user. The threshold (microseconds + # between edges that are not valid symbols that are handled + # above) is an arbitrary choice. + if t <= 2: + continue + self.handle_unknown(ss, es) + spd = None + conds = conds_edge_only diff --git a/decoders/spiflash/lists.py b/decoders/spiflash/lists.py index 80ca27d..e31daf7 100644 --- a/decoders/spiflash/lists.py +++ b/decoders/spiflash/lists.py @@ -60,6 +60,7 @@ device_name = { 0x15: 'FM25Q32', }, 'macronix': { + 0x13: 'MX25L8006', 0x14: 'MX25L1605D', 0x15: 'MX25L3205D', 0x16: 'MX25L6405D', @@ -151,6 +152,17 @@ chips = { 'sector_size': 4 * 1024, 'block_size': 64 * 1024, }, + 'macronix_mx25l8006': { + 'vendor': 'Macronix', + 'model': 'MX25L8006', + 'res_id': 0x13, + 'rems_id': 0xc213, + 'rems2_id': 0xc213, + 'rdid_id': 0xc22013, + 'page_size': 256, + 'sector_size': 4 * 1024, + 'block_size': 64 * 1024, + }, # Winbond 'winbond_w25q80dv': { 'vendor': 'Winbond', diff --git a/decoders/st25r39xx_spi/pd.py b/decoders/st25r39xx_spi/pd.py index 1b0df07..a6f55b9 100644 --- a/decoders/st25r39xx_spi/pd.py +++ b/decoders/st25r39xx_spi/pd.py @@ -1,7 +1,7 @@ ## ## This file is part of the libsigrokdecode project. ## -## Copyright (C) 2019-2020 Benjamin Vernoux +## Copyright (C) 2019-2021 Benjamin Vernoux ## ## This program is free software; you can redistribute it and/or modify ## it under the terms of the GNU General Public License as published by @@ -16,6 +16,16 @@ ## You should have received a copy of the GNU General Public License ## along with this program; if not, see . ## +## v0.1 - 17 September 2019 B.VERNOUX +### Use ST25R3916 Datasheet DS12484 Rev 1 (January 2019) +## v0.2 - 28 April 2020 B.VERNOUX +### Use ST25R3916 Datasheet DS12484 Rev 2 (December 2019) +## v0.3 - 17 June 2020 B.VERNOUX +### Use ST25R3916 Datasheet DS12484 Rev 3 (04 June 2020) +## v0.4 - 10 Aug 2021 B.VERNOUX +### Fix FIFOR/FIFOW issues with Pulseview (with "Tabular Output View") +### because of FIFO Read/FIFO Write commands, was not returning the +### annotations short name FIFOR/FIFOW import sigrokdecode as srd from collections import namedtuple @@ -127,7 +137,7 @@ class Decoder(srd.Decoder): def format_command(self): '''Returns the label for the current command.''' - if self.cmd in ('Write', 'Read', 'WriteB', 'ReadB', 'WriteT', 'ReadT', 'FIFO Write', 'FIFO Read'): + if self.cmd in ('Write', 'Read', 'WriteB', 'ReadB', 'WriteT', 'ReadT', 'FIFOW', 'FIFOR'): return self.cmd if self.cmd == 'Cmd': reg = dir_cmd.get(self.dat, 'Unknown direct command') @@ -182,7 +192,7 @@ class Decoder(srd.Decoder): # Register Space-B Access 0b11111011 0xFB => 'Space B' # Register Test Access 0b11111100 0xFC => 'TestAccess' if b == 0x80: - return ('FIFO Write', b, 1, 99999) + return ('FIFOW', b, 1, 99999) if b == 0xA0: return ('Write', b, 1, 99999) if b == 0xA8: @@ -192,7 +202,7 @@ class Decoder(srd.Decoder): if b == 0xBF: return ('Read', b, 1, 99999) if b == 0x9F: - return ('FIFO Read', b, 1, 99999) + return ('FIFOR', b, 1, 99999) if (b >= 0x0C and b <= 0xE8) : return ('Cmd', b, 0, 0) if b == 0xFB: @@ -273,9 +283,9 @@ class Decoder(srd.Decoder): self.decode_reg(pos, Ann.BURST_WRITET, self.dat, self.mosi_bytes()) elif self.cmd == 'ReadT': self.decode_reg(pos, Ann.BURST_READT, self.dat, self.miso_bytes()) - elif self.cmd == 'FIFO Write': + elif self.cmd == 'FIFOW': self.decode_reg(pos, Ann.FIFO_WRITE, self.dat, self.mosi_bytes()) - elif self.cmd == 'FIFO Read': + elif self.cmd == 'FIFOR': self.decode_reg(pos, Ann.FIFO_READ, self.dat, self.miso_bytes()) elif self.cmd == 'Cmd': self.decode_reg(pos, Ann.DIRECTCMD, self.dat, self.mosi_bytes()) diff --git a/srd.c b/srd.c index 6bff918..10dfaf6 100644 --- a/srd.c +++ b/srd.c @@ -285,9 +285,31 @@ SRD_API int srd_init(const char *path) return ret; } } + env_path = g_getenv("SIGROKDECODE_PATH"); + if (env_path) { + char **dir_list, **dir_iter, *dir_item; + dir_list = g_strsplit(env_path, G_SEARCHPATH_SEPARATOR_S, 0); + for (dir_iter = dir_list; *dir_iter; dir_iter++) { + dir_item = *dir_iter; + if (!dir_item || !*dir_item) + continue; + ret = srd_decoder_searchpath_add(dir_item); + if (ret != SRD_OK) { + Py_Finalize(); + return ret; + } + } + g_strfreev(dir_list); + } - /* Initialize the Python GIL (this also happens to acquire it). */ +#if PY_VERSION_HEX < 0x03090000 + /* + * Initialize and acquire the Python GIL. In Python 3.7+ this + * will be done implicitly as part of the Py_InitializeEx() + * call above. PyEval_InitThreads() was deprecated in 3.9. + */ PyEval_InitThreads(); +#endif /* Release the GIL (ignore return value, we don't need it here). */ (void)PyEval_SaveThread(); diff --git a/type_decoder.c b/type_decoder.c index 6c6eab6..6932cde 100644 --- a/type_decoder.c +++ b/type_decoder.c @@ -67,16 +67,16 @@ static int convert_annotation(struct srd_decoder_inst *di, PyObject *obj, /* Should be a list of [annotation class, [string, ...]]. */ if (!PyList_Check(obj)) { - srd_err("Protocol decoder %s submitted an annotation that" - " is not a list", di->decoder->name); + srd_err("Protocol decoder %s submitted an annotation that is not a list", + di->decoder->name); goto err; } /* Should have 2 elements. */ if (PyList_Size(obj) != 2) { - srd_err("Protocol decoder %s submitted annotation list with " - "%zd elements instead of 2", di->decoder->name, - PyList_Size(obj)); + ssize_t sz = PyList_Size(obj); + srd_err("Protocol decoder %s submitted annotation list with %zd elements instead of 2", + di->decoder->name, sz); goto err; } @@ -86,27 +86,27 @@ static int convert_annotation(struct srd_decoder_inst *di, PyObject *obj, */ py_tmp = PyList_GetItem(obj, 0); if (!PyLong_Check(py_tmp)) { - srd_err("Protocol decoder %s submitted annotation list, but " - "first element was not an integer.", di->decoder->name); + srd_err("Protocol decoder %s submitted annotation list, but first element was not an integer.", + di->decoder->name); goto err; } ann_class = PyLong_AsLong(py_tmp); if (!(pdo = g_slist_nth_data(di->decoder->annotations, ann_class))) { - srd_err("Protocol decoder %s submitted data to unregistered " - "annotation class %d.", di->decoder->name, ann_class); + srd_err("Protocol decoder %s submitted data to unregistered annotation class %d.", + di->decoder->name, ann_class); goto err; } /* Second element must be a list. */ py_tmp = PyList_GetItem(obj, 1); if (!PyList_Check(py_tmp)) { - srd_err("Protocol decoder %s submitted annotation list, but " - "second element was not a list.", di->decoder->name); + srd_err("Protocol decoder %s submitted annotation list, but second element was not a list.", + di->decoder->name); goto err; } if (py_strseq_to_char(py_tmp, &ann_text) != SRD_OK) { - srd_err("Protocol decoder %s submitted annotation list, but " - "second element was malformed.", di->decoder->name); + srd_err("Protocol decoder %s submitted annotation list, but second element was malformed.", + di->decoder->name); goto err; } @@ -235,38 +235,38 @@ static int convert_binary(struct srd_decoder_inst *di, PyObject *obj, /* Should have 2 elements. */ if (PyList_Size(obj) != 2) { - srd_err("Protocol decoder %s submitted SRD_OUTPUT_BINARY list " - "with %zd elements instead of 2", di->decoder->name, - PyList_Size(obj)); + ssize_t sz = PyList_Size(obj); + srd_err("Protocol decoder %s submitted SRD_OUTPUT_BINARY list with %zd elements instead of 2", + di->decoder->name, sz); goto err; } /* The first element should be an integer. */ py_tmp = PyList_GetItem(obj, 0); if (!PyLong_Check(py_tmp)) { - srd_err("Protocol decoder %s submitted SRD_OUTPUT_BINARY list, " - "but first element was not an integer.", di->decoder->name); + srd_err("Protocol decoder %s submitted SRD_OUTPUT_BINARY list, but first element was not an integer.", + di->decoder->name); goto err; } bin_class = PyLong_AsLong(py_tmp); if (!(class_name = g_slist_nth_data(di->decoder->binary, bin_class))) { - srd_err("Protocol decoder %s submitted SRD_OUTPUT_BINARY with " - "unregistered binary class %d.", di->decoder->name, bin_class); + srd_err("Protocol decoder %s submitted SRD_OUTPUT_BINARY with unregistered binary class %d.", + di->decoder->name, bin_class); goto err; } /* Second element should be bytes. */ py_tmp = PyList_GetItem(obj, 1); if (!PyBytes_Check(py_tmp)) { - srd_err("Protocol decoder %s submitted SRD_OUTPUT_BINARY list, " - "but second element was not bytes.", di->decoder->name); + srd_err("Protocol decoder %s submitted SRD_OUTPUT_BINARY list, but second element was not bytes.", + di->decoder->name); goto err; } /* Consider an empty set of bytes a bug. */ if (PyBytes_Size(py_tmp) == 0) { - srd_err("Protocol decoder %s submitted SRD_OUTPUT_BINARY " - "with empty data set.", di->decoder->name); + srd_err("Protocol decoder %s submitted SRD_OUTPUT_BINARY with empty data set.", + di->decoder->name); goto err; } @@ -359,8 +359,8 @@ static int convert_meta(struct srd_proto_data *pdata, PyObject *obj) if (g_variant_type_equal(pdata->pdo->meta_type, G_VARIANT_TYPE_INT64)) { if (!PyLong_Check(obj)) { - PyErr_Format(PyExc_TypeError, "This output was registered " - "as 'int', but something else was passed."); + PyErr_Format(PyExc_TypeError, + "This output was registered as 'int', but something else was passed."); goto err; } intvalue = PyLong_AsLongLong(obj); @@ -369,8 +369,8 @@ static int convert_meta(struct srd_proto_data *pdata, PyObject *obj) pdata->data = g_variant_new_int64(intvalue); } else if (g_variant_type_equal(pdata->pdo->meta_type, G_VARIANT_TYPE_DOUBLE)) { if (!PyFloat_Check(obj)) { - PyErr_Format(PyExc_TypeError, "This output was registered " - "as 'float', but something else was passed."); + PyErr_Format(PyExc_TypeError, + "This output was registered as 'float', but something else was passed."); goto err; } dvalue = PyFloat_AsDouble(obj); @@ -482,9 +482,9 @@ static PyObject *Decoder_put(PyObject *self, PyObject *args) start_sample, end_sample, output_type_name(pdo->output_type), output_id, pdo->proto_id, next_di->inst_id); - if (!(py_res = PyObject_CallMethod( - next_di->py_inst, "decode", "KKO", start_sample, - end_sample, py_data))) { + py_res = PyObject_CallMethod(next_di->py_inst, "decode", + "KKO", start_sample, end_sample, py_data); + if (!py_res) { srd_exception_catch("Calling %s decode() failed", next_di->inst_id); } @@ -1098,6 +1098,11 @@ static PyObject *Decoder_wait(PyObject *self, PyObject *args) * when the sample data is exhausted. */ if (di->communicate_eof) { + /* Advance self.samplenum to the (absolute) last sample number. */ + py_samplenum = PyLong_FromUnsignedLongLong(di->abs_cur_samplenum); + PyObject_SetAttrString(di->py_inst, "samplenum", py_samplenum); + Py_DECREF(py_samplenum); + /* Raise an EOFError Python exception. */ srd_dbg("%s: %s: Raising EOF from wait().", di->inst_id, __func__); g_mutex_unlock(&di->data_mutex); diff --git a/util.c b/util.c index 1e914e3..3a5e336 100644 --- a/util.c +++ b/util.c @@ -115,7 +115,7 @@ err: SRD_PRIV int py_attr_as_strlist(PyObject *py_obj, const char *attr, GSList **outstrlist) { PyObject *py_list; - Py_ssize_t i; + ssize_t idx; int ret; char *outstr; PyGILState_STATE gstate; @@ -139,10 +139,10 @@ SRD_PRIV int py_attr_as_strlist(PyObject *py_obj, const char *attr, GSList **out *outstrlist = NULL; - for (i = 0; i < PyList_Size(py_list); i++) { - ret = py_listitem_as_str(py_list, i, &outstr); + for (idx = 0; idx < PyList_Size(py_list); idx++) { + ret = py_listitem_as_str(py_list, idx, &outstr); if (ret < 0) { - srd_dbg("Couldn't get item %" PY_FORMAT_SIZE_T "d.", i); + srd_dbg("Couldn't get item %zd.", idx); goto err; } *outstrlist = g_slist_append(*outstrlist, outstr); @@ -217,8 +217,9 @@ err: SRD_PRIV int py_listitem_as_str(PyObject *py_obj, Py_ssize_t idx, char **outstr) { - PyObject *py_value; PyGILState_STATE gstate; + ssize_t item_idx; + PyObject *py_value; gstate = PyGILState_Ensure(); @@ -227,8 +228,9 @@ SRD_PRIV int py_listitem_as_str(PyObject *py_obj, Py_ssize_t idx, goto err; } - if (!(py_value = PyList_GetItem(py_obj, idx))) { - srd_dbg("Couldn't get list item %" PY_FORMAT_SIZE_T "d.", idx); + item_idx = idx; + if (!(py_value = PyList_GetItem(py_obj, item_idx))) { + srd_dbg("Couldn't get list item %zd.", item_idx); goto err; }