# first, since usually only that variant will add "-lpython3.8".
# https://docs.python.org/3/whatsnew/3.8.html#debug-build-uses-the-same-abi-as-release-build
SR_PKG_CHECK([python3], [SRD_PKGLIBS],
- [python-3.10-embed], [python-3.9-embed], [python-3.8-embed], [python3-embed], [python-3.8 >= 3.8], [python-3.7 >= 3.7], [python-3.6 >= 3.6], [python-3.5 >= 3.5], [python-3.4 >= 3.4], [python-3.3 >= 3.3], [python-3.2 >= 3.2], [python3 >= 3.2])
+ [python-3.12-embed], [python-3.11-embed],
+ [python-3.10-embed], [python-3.9-embed], [python-3.8-embed], [python3-embed],
+ [python-3.8 >= 3.8], [python-3.7 >= 3.7], [python-3.6 >= 3.6], [python-3.5 >= 3.5],
+ [python-3.4 >= 3.4], [python-3.3 >= 3.3], [python-3.2 >= 3.2], [python3 >= 3.2])
AS_IF([test "x$sr_have_python3" = xno],
[AC_MSG_ERROR([Cannot find Python 3 development headers.])])
##
import sigrokdecode as srd
+from common.srdhelper import bitpack_lsb
def disabled_enabled(v):
return ['Disabled', 'Enabled'][v]
def output_power(v):
- return '%+ddBm' % [-4, -1, 2, 5][v]
+ return '{:+d}dBm'.format([-4, -1, 2, 5][v])
+# Notes on the implementation:
+# - A register's description is an iterable of tuples which contain:
+# The starting bit position, the bit count, the name of a field, and
+# an optional parser which interprets the field's content. Parser are
+# expected to yield a single text string when they exist. Other types
+# of output are passed to Python's .format() routine as is.
+# - Bit fields' width in registers determines the range of indices in
+# table/tuple lookups. Keep the implementation as robust as possible
+# during future maintenance. Avoid Python runtime errors when adjusting
+# the decoder.
regs = {
-# reg: name offset width parser
- 0: [
- ('FRAC', 3, 12, None),
- ('INT', 15, 16, lambda v: 'Not Allowed' if v < 32 else v)
- ],
- 1: [
- ('MOD', 3, 12, None),
- ('Phase', 15, 12, None),
- ('Prescalar', 27, 1, lambda v: ['4/5', '8/9'][v]),
- ('Phase Adjust', 28, 1, lambda v: ['Off', 'On'][v]),
- ],
- 2: [
- ('Counter Reset', 3, 1, disabled_enabled),
- ('Charge Pump Three-State', 4, 1, disabled_enabled),
- ('Power-Down', 5, 1, disabled_enabled),
- ('PD Polarity', 6, 1, lambda v: ['Negative', 'Positive'][v]),
- ('LDP', 7, 1, lambda v: ['10ns', '6ns'][v]),
- ('LDF', 8, 1, lambda v: ['FRAC-N', 'INT-N'][v]),
- ('Charge Pump Current Setting', 9, 4, lambda v: '%0.2fmA @ 5.1kΩ' %
- [0.31, 0.63, 0.94, 1.25, 1.56, 1.88, 2.19, 2.50,
- 2.81, 3.13, 3.44, 3.75, 4.06, 4.38, 4.69, 5.00][v]),
- ('Double Buffer', 13, 1, disabled_enabled),
- ('R Counter', 14, 10, None),
- ('RDIV2', 24, 1, disabled_enabled),
- ('Reference Doubler', 25, 1, disabled_enabled),
- ('MUXOUT', 26, 3, lambda v:
- ['Three-State Output', 'DVdd', 'DGND', 'R Counter Output', 'N Divider Output',
- 'Analog Lock Detect', 'Digital Lock Detect', 'Reserved'][v]),
- ('Low Noise and Low Spur Modes', 29, 2, lambda v:
- ['Low Noise Mode', 'Reserved', 'Reserved', 'Low Spur Mode'][v])
- ],
- 3: [
- ('Clock Divider', 3, 12, None),
- ('Clock Divider Mode', 15, 2, lambda v:
- ['Clock Divider Off', 'Fast Lock Enable', 'Resync Enable', 'Reserved'][v]),
- ('CSR Enable', 18, 1, disabled_enabled),
- ('Charge Cancellation', 21, 1, disabled_enabled),
- ('ABP', 22, 1, lambda v: ['6ns (FRAC-N)', '3ns (INT-N)'][v]),
- ('Band Select Clock Mode', 23, 1, lambda v: ['Low', 'High'][v])
- ],
- 4: [
- ('Output Power', 3, 2, output_power),
- ('Output Enable', 5, 1, disabled_enabled),
- ('AUX Output Power', 6, 2, output_power),
- ('AUX Output Select', 8, 1, lambda v: ['Divided Output', 'Fundamental'][v]),
- ('AUX Output Enable', 9, 1, disabled_enabled),
- ('MTLD', 10, 1, disabled_enabled),
- ('VCO Power-Down', 11, 1, lambda v:
- 'VCO Powered ' + ('Down' if v == 1 else 'Up')),
- ('Band Select Clock Divider', 12, 8, None),
- ('RF Divider Select', 20, 3, lambda v: '÷' + str(2**v)),
- ('Feedback Select', 23, 1, lambda v: ['Divided', 'Fundamental'][v]),
- ],
- 5: [
- ('LD Pin Mode', 22, 2, lambda v:
- ['Low', 'Digital Lock Detect', 'Low', 'High'][v])
- ]
+ # Register description fields:
+ # offset, width, name, parser.
+ 0: (
+ ( 3, 12, 'FRAC'),
+ (15, 16, 'INT',
+ None, lambda v: 'Not Allowed' if v < 23 else None,
+ ),
+ ),
+ 1: (
+ ( 3, 12, 'MOD'),
+ (15, 12, 'Phase'),
+ (27, 1, 'Prescalar', lambda v: ('4/5', '8/9',)[v]),
+ (28, 1, 'Phase Adjust', lambda v: ('Off', 'On',)[v]),
+ ),
+ 2: (
+ ( 3, 1, 'Counter Reset', disabled_enabled),
+ ( 4, 1, 'Charge Pump Three-State', disabled_enabled),
+ ( 5, 1, 'Power-Down', disabled_enabled),
+ ( 6, 1, 'PD Polarity', lambda v: ('Negative', 'Positive',)[v]),
+ ( 7, 1, 'LDP', lambda v: ('10ns', '6ns',)[v]),
+ ( 8, 1, 'LDF', lambda v: ('FRAC-N', 'INT-N',)[v]),
+ ( 9, 4, 'Charge Pump Current Setting',
+ lambda v: '{curr:0.2f}mA @ 5.1kΩ'.format(curr = (
+ 0.31, 0.63, 0.94, 1.25, 1.56, 1.88, 2.19, 2.50,
+ 2.81, 3.13, 3.44, 3.75, 4.06, 4.38, 4.69, 5.00,
+ )[v])),
+ (13, 1, 'Double Buffer', disabled_enabled),
+ (14, 10, 'R Counter'),
+ (24, 1, 'RDIV2', disabled_enabled),
+ (25, 1, 'Reference Doubler', disabled_enabled),
+ (26, 3, 'MUXOUT',
+ lambda v: '{text}'.format(text = (
+ 'Three-State Output', 'DVdd', 'DGND',
+ 'R Counter Output', 'N Divider Output',
+ 'Analog Lock Detect', 'Digital Lock Detect',
+ 'Reserved',
+ )[v])),
+ (29, 2, 'Low Noise and Low Spur Modes',
+ lambda v: '{text}'.format(text = (
+ 'Low Noise Mode', 'Reserved', 'Reserved', 'Low Spur Mode',
+ )[v])),
+ ),
+ 3: (
+ ( 3, 12, 'Clock Divider'),
+ (15, 2, 'Clock Divider Mode',
+ lambda v: '{text}'.format(text = (
+ 'Clock Divider Off', 'Fast Lock Enable',
+ 'Resync Enable', 'Reserved',
+ )[v])),
+ (18, 1, 'CSR Enable', disabled_enabled),
+ (21, 1, 'Charge Cancellation', disabled_enabled),
+ (22, 1, 'ABP', lambda v: ('6ns (FRAC-N)', '3ns (INT-N)',)[v]),
+ (23, 1, 'Band Select Clock Mode', lambda v: ('Low', 'High',)[v]),
+ ),
+ 4: (
+ ( 3, 2, 'Output Power', output_power),
+ ( 5, 1, 'Output Enable', disabled_enabled),
+ ( 6, 2, 'AUX Output Power', output_power),
+ ( 8, 1, 'AUX Output Select',
+ lambda v: ('Divided Output', 'Fundamental',)[v]),
+ ( 9, 1, 'AUX Output Enable', disabled_enabled),
+ (10, 1, 'MTLD', disabled_enabled),
+ (11, 1, 'VCO Power-Down',
+ lambda v: 'VCO Powered {ud}'.format(ud = 'Down' if v else 'Up')),
+ (12, 8, 'Band Select Clock Divider'),
+ (20, 3, 'RF Divider Select', lambda v: '÷{:d}'.format(2 ** v)),
+ (23, 1, 'Feedback Select', lambda v: ('Divided', 'Fundamental',)[v]),
+ ),
+ 5: (
+ (22, 2, 'LD Pin Mode',
+ lambda v: '{text}'.format(text = (
+ 'Low', 'Digital Lock Detect', 'Low', 'High',
+ )[v])),
+ ),
}
-ANN_REG = 0
+( ANN_REG, ANN_WARN, ) = range(2)
class Decoder(srd.Decoder):
api_version = 3
annotations = (
# Sent from the host to the chip.
('write', 'Register write'),
+ ('warning', "Warnings"),
)
annotation_rows = (
('writes', 'Register writes', (ANN_REG,)),
+ ('warnings', 'Warnings', (ANN_WARN,)),
)
def __init__(self):
def start(self):
self.out_ann = self.register(srd.OUTPUT_ANN)
+ def putg(self, ss, es, cls, data):
+ self.put(ss, es, self.out_ann, [ cls, data, ])
+
def decode_bits(self, offset, width):
- return (sum([(1 << i) if self.bits[offset + i][0] else 0 for i in range(width)]),
- (self.bits[offset + width - 1][1], self.bits[offset][2]))
+ '''Extract a bit field. Expects LSB input data.'''
+ bits = self.bits[offset:][:width]
+ ss, es = bits[-1][1], bits[0][2]
+ value = bitpack_lsb(bits, 0)
+ return ( value, ( ss, es, ))
+
+ def decode_field(self, name, offset, width, parser = None, checker = None):
+ '''Interpret a bit field. Emits an annotation.'''
+ # Get the register field's content and position.
+ val, ( ss, es, ) = self.decode_bits(offset, width)
+ # Have the field's content formatted, emit an annotation.
+ formatted = parser(val) if parser else '{}'.format(val)
+ if formatted is not None:
+ text = ['{name}: {val}'.format(name = name, val = formatted)]
+ else:
+ text = ['{name}'.format(name = name)]
+ if text:
+ self.putg(ss, es, ANN_REG, text)
+ # Have the field's content checked, emit an optional warning.
+ warn = checker(val) if checker else None
+ if warn:
+ text = ['{}'.format(warn)]
+ self.putg(ss, es, ANN_WARN, text)
- def decode_field(self, name, offset, width, parser):
- val, pos = self.decode_bits(offset, width)
- self.put(pos[0], pos[1], self.out_ann, [ANN_REG,
- ['%s: %s' % (name, parser(val) if parser else str(val))]])
- return val
+ def decode_word(self, ss, es, bits):
+ '''Interpret a 32bit word after accumulation completes.'''
+ # SPI transfer content must be exactly one 32bit word.
+ count = len(self.bits)
+ if count != 32:
+ text = [
+ 'Frame error: Bit count: want 32, got {}'.format(count),
+ 'Frame error: Bit count',
+ 'Frame error',
+ ]
+ self.putg(ss, es, ANN_WARN, text)
+ return
+ # Holding bits in LSB order during interpretation simplifies
+ # bit field extraction. And annotation emitting routines expect
+ # this reverse order of bits' timestamps.
+ self.bits.reverse()
+ # Determine which register was accessed.
+ reg_addr, ( reg_ss, reg_es, ) = self.decode_bits(0, 3)
+ text = [
+ 'Register: {addr}'.format(addr = reg_addr),
+ 'Reg: {addr}'.format(addr = reg_addr),
+ '[{addr}]'.format(addr = reg_addr),
+ ]
+ self.putg(reg_ss, reg_es, ANN_REG, text)
+ # Interpret the register's content (when parsers are available).
+ field_descs = regs.get(reg_addr, None)
+ if not field_descs:
+ return
+ for field_desc in field_descs:
+ parser = None
+ checker = None
+ if len(field_desc) == 3:
+ start, count, name, = field_desc
+ elif len(field_desc) == 4:
+ start, count, name, parser = field_desc
+ elif len(field_desc) == 5:
+ start, count, name, parser, checker = field_desc
+ else:
+ # Unsupported regs{} syntax, programmer's error.
+ return
+ self.decode_field(name, start, count, parser, checker)
def decode(self, ss, es, data):
+ ptype, _, _ = data
+
+ if ptype == 'TRANSFER':
+ # Process accumulated bits after completion of a transfer.
+ self.decode_word(ss, es, self.bits)
+ self.bits.clear()
- ptype, data1, data2 = data
-
- if ptype == 'CS-CHANGE':
- if data1 == 1:
- if len(self.bits) == 32:
- reg_value, reg_pos = self.decode_bits(0, 3)
- self.put(reg_pos[0], reg_pos[1], self.out_ann, [ANN_REG,
- ['Register: %d' % reg_value, 'Reg: %d' % reg_value,
- '[%d]' % reg_value]])
- if reg_value < len(regs):
- field_descs = regs[reg_value]
- for field_desc in field_descs:
- field = self.decode_field(*field_desc)
- self.bits = []
if ptype == 'BITS':
- self.bits = data1 + self.bits
+ _, mosi_bits, miso_bits = data
+ # Accumulate bits in MSB order as they are seen in SPI frames.
+ msb_bits = mosi_bits.copy()
+ msb_bits.reverse()
+ self.bits.extend(msb_bits)
# Vendor code
vendor_code = {
- 0x1e: 'Atmel',
+ 0x1E: 'Atmel',
0x00: 'Device locked',
}
# (Part family + flash size, part number)
part = {
(0x90, 0x01): 'AT90S1200',
+ (0x90, 0x05): 'ATtiny12',
+ (0x90, 0x06): 'ATtiny15',
+ (0x90, 0x07): 'ATtiny13',
(0x91, 0x01): 'AT90S2313',
+ (0x91, 0x02): 'AT90S2323',
+ (0x91, 0x03): 'AT90S2343',
+ (0x91, 0x05): 'AT90S2333',
+ (0x91, 0x06): 'ATtiny22',
+ (0x91, 0x07): 'ATtiny28',
+ (0x91, 0x08): 'ATtiny25',
+ (0x91, 0x09): 'ATtiny26',
+ (0x91, 0x0A): 'ATtiny2313',
+ (0x91, 0x0B): 'ATtiny24',
+ (0x91, 0x0C): 'ATtiny261',
(0x92, 0x01): 'AT90S4414',
- (0x92, 0x05): 'ATmega48', # 4kB flash
+ (0x92, 0x03): 'AT90S4433',
+ (0x92, 0x05): 'ATmega48(A)',
+ (0x92, 0x06): 'ATtiny45',
+ (0x92, 0x08): 'ATtiny461',
+ (0x92, 0x09): 'ATtiny48',
+ (0x92, 0x0A): 'ATmega48PA',
+ (0x92, 0x0D): 'ATtiny4313',
+ (0x92, 0x10): 'ATmega48PB',
(0x93, 0x01): 'AT90S8515',
- (0x93, 0x0a): 'ATmega88', # 8kB flash
- (0x94, 0x06): 'ATmega168', # 16kB flash
- (0xff, 0xff): 'Device code erased, or target missing',
+ (0x93, 0x03): 'AT90S8535',
+ (0x93, 0x07): 'ATmega8',
+ (0x93, 0x0A): 'ATmega88(A)',
+ (0x93, 0x0B): 'ATtiny85',
+ (0x93, 0x0D): 'ATtiny861',
+ (0x93, 0x0F): 'ATmega88PA',
+ (0x93, 0x11): 'ATtiny88',
+ (0x93, 0x16): 'ATmega88PB',
+ (0x93, 0x89): 'ATmega8U2',
+ (0x94, 0x01): 'ATmega161',
+ (0x94, 0x02): 'ATmega163',
+ (0x94, 0x03): 'ATmega16',
+ (0x94, 0x04): 'ATmega162',
+ (0x94, 0x06): 'ATmega168(A)',
+ (0x94, 0x0A): 'ATmega164PA',
+ (0x94, 0x0B): 'ATmega168PA',
+ (0x94, 0x0F): 'ATmega164A',
+ (0x94, 0x12): 'ATtiny1634',
+ (0x94, 0x15): 'ATmega168PB',
+ (0x94, 0x88): 'ATmega16U4',
+ (0x94, 0x89): 'ATmega16U2',
+ (0x95, 0x01): 'ATmega32',
+ (0x95, 0x01): 'ATmega323',
+ (0x95, 0x0F): 'ATmega328P',
+ (0x95, 0x11): 'ATmega324PA',
+ (0x95, 0x14): 'ATmega328',
+ (0x95, 0x15): 'ATmega324A',
+ (0x95, 0x87): 'ATmega32U4',
+ (0x95, 0x8A): 'ATmega32U2',
+ (0x96, 0x08): 'ATmega640',
+ (0x96, 0x09): 'ATmega644(A)',
+ (0x96, 0x0A): 'ATmega644PA',
+ (0x97, 0x01): 'ATmega103',
+ (0x97, 0x03): 'ATmega1280',
+ (0x97, 0x04): 'ATmega1281',
+ (0x97, 0x05): 'ATmega1284P',
+ (0x97, 0x06): 'ATmega1284',
+ (0x98, 0x01): 'ATmega2560',
+ (0x98, 0x02): 'ATmega2561',
+ (0xFF, 0xFF): 'Device code erased, or target missing',
(0x01, 0x02): 'Device locked',
- # TODO: Lots more entries.
}
self.part_number = ret[3]
self.putx([Ann.RSB2, ['Part number: 0x%02x' % ret[3]]])
- p = part[(self.part_fam_flash_size, self.part_number)]
- data = [Ann.DEV, ['Device: Atmel %s' % p]]
- self.put(self.ss_device, self.es_cmd, self.out_ann, data)
+ # Part name if known
+ key = (self.part_fam_flash_size, self.part_number)
+ if key in part:
+ p = part[key]
+ data = [Ann.DEV, ['Device: Atmel %s' % p]]
+ self.put(self.ss_device, self.es_cmd, self.out_ann, data)
# Sanity check on reply.
if ret[1] != 0x30 or ret[2] != self.xx or ret[0] != self.mm:
## along with this program; if not, see <http://www.gnu.org/licenses/>.
##
+import copy
import sigrokdecode as srd
from .lists import *
self.reset_variables()
def decode(self, ss, es, data):
- self.cmd, self.databyte = data
+ cmd, _ = data
# Collect the 'BITS' packet, then return. The next packet is
# guaranteed to belong to these bits we just stored.
- if self.cmd == 'BITS':
- self.bits = self.databyte
+ if cmd == 'BITS':
+ _, databits = data
+ self.bits = copy.deepcopy(databits)
return
- # Store the start/end samples of this I²C packet.
+ # Store the start/end samples of this I²C packet. Deep copy
+ # caller's data, assuming that implementation details of the
+ # above complex methods can access the data after returning
+ # from the .decode() invocation, with the data having become
+ # invalid by that time of access. This conservative approach
+ # can get weakened after close inspection of those methods.
self.ss, self.es = ss, es
+ _, databyte = data
+ databyte = copy.deepcopy(databyte)
+ self.cmd, self.databyte = cmd, databyte
# State machine.
s = 'handle_%s' % self.state.lower().replace(' ', '_')
# TODO: Implement support for inverting SDA/SCL levels (0->1 and 1->0).
# TODO: Implement support for detecting various bus errors.
+from common.srdhelper import bitpack_msb
import sigrokdecode as srd
'''
command. Slave addresses do not include bit 0 (the READ/WRITE indication bit).
For example, a slave address field could be 0x51 (instead of 0xa2).
For 'START', 'START REPEAT', 'STOP', 'ACK', and 'NACK' <pdata> is None.
+For 'BITS' <pdata> is a sequence of tuples of bit values and their start and
+stop positions, in LSB first order (although the I2C protocol is MSB first).
'''
-# CMD: [annotation-type-index, long annotation, short annotation]
+# Meaning of table items:
+# command -> [annotation class, annotation text in order of decreasing length]
proto = {
- 'START': [0, 'Start', 'S'],
- 'START REPEAT': [1, 'Start repeat', 'Sr'],
- 'STOP': [2, 'Stop', 'P'],
- 'ACK': [3, 'ACK', 'A'],
- 'NACK': [4, 'NACK', 'N'],
- 'BIT': [5, 'Bit', 'B'],
- 'ADDRESS READ': [6, 'Address read', 'AR'],
- 'ADDRESS WRITE': [7, 'Address write', 'AW'],
- 'DATA READ': [8, 'Data read', 'DR'],
- 'DATA WRITE': [9, 'Data write', 'DW'],
+ 'START': [0, 'Start', 'S'],
+ 'START REPEAT': [1, 'Start repeat', 'Sr'],
+ 'STOP': [2, 'Stop', 'P'],
+ 'ACK': [3, 'ACK', 'A'],
+ 'NACK': [4, 'NACK', 'N'],
+ 'BIT': [5, '{b:1d}'],
+ 'ADDRESS READ': [6, 'Address read: {b:02X}', 'AR: {b:02X}', '{b:02X}'],
+ 'ADDRESS WRITE': [7, 'Address write: {b:02X}', 'AW: {b:02X}', '{b:02X}'],
+ 'DATA READ': [8, 'Data read: {b:02X}', 'DR: {b:02X}', '{b:02X}'],
+ 'DATA WRITE': [9, 'Data write: {b:02X}', 'DW: {b:02X}', '{b:02X}'],
+ 'WARN': [10, '{text}'],
}
class Decoder(srd.Decoder):
def reset(self):
self.samplerate = None
- self.ss = self.es = self.ss_byte = -1
- self.bitcount = 0
- self.databyte = 0
- self.wr = -1
- self.is_repeat_start = 0
- self.state = 'FIND START'
+ self.is_write = None
+ self.rem_addr_bytes = None
+ self.slave_addr_7 = None
+ self.slave_addr_10 = None
+ self.is_repeat_start = False
self.pdu_start = None
self.pdu_bits = 0
- self.bits = []
+ self.data_bits = []
+ self.bitwidth = 0
def metadata(self, key, value):
if key == srd.SRD_CONF_SAMPLERATE:
self.out_bitrate = self.register(srd.OUTPUT_META,
meta=(int, 'Bitrate', 'Bitrate from Start bit to Stop bit'))
- def putx(self, data):
- self.put(self.ss, self.es, self.out_ann, data)
-
- def putp(self, data):
- self.put(self.ss, self.es, self.out_python, data)
-
- def putb(self, data):
- self.put(self.ss, self.es, self.out_binary, data)
-
- def handle_start(self, pins):
- self.ss, self.es = self.samplenum, self.samplenum
- self.pdu_start = self.samplenum
- self.pdu_bits = 0
- cmd = 'START REPEAT' if (self.is_repeat_start == 1) else 'START'
- self.putp([cmd, None])
- self.putx([proto[cmd][0], proto[cmd][1:]])
- self.state = 'FIND ADDRESS'
- self.bitcount = self.databyte = 0
- self.is_repeat_start = 1
- self.wr = -1
- self.bits = []
+ def putg(self, ss, es, cls, text):
+ self.put(ss, es, self.out_ann, [cls, text])
+
+ def putp(self, ss, es, data):
+ self.put(ss, es, self.out_python, data)
+
+ def putb(self, ss, es, data):
+ self.put(ss, es, self.out_binary, data)
+
+ def _wants_start(self):
+ # Check whether START is required (to sync to the input stream).
+ return self.pdu_start is None
+
+ def _collects_address(self):
+ # Check whether the transfer still is in the address phase (is
+ # still collecting address and r/w details, or has not started
+ # collecting it).
+ return self.rem_addr_bytes is None or self.rem_addr_bytes != 0
+
+ def _collects_byte(self):
+ # Check whether bits of a byte are being collected. Outside of
+ # the data byte, the bit is the ACK/NAK slot.
+ return self.data_bits is None or len(self.data_bits) < 8
+
+ def handle_start(self, ss, es):
+ if self.is_repeat_start:
+ cmd = 'START REPEAT'
+ else:
+ cmd = 'START'
+ self.pdu_start = ss
+ self.pdu_bits = 0
+ self.putp(ss, es, [cmd, None])
+ cls, texts = proto[cmd][0], proto[cmd][1:]
+ self.putg(ss, es, cls, texts)
+ self.is_repeat_start = True
+ self.is_write = None
+ self.slave_addr_7 = None
+ self.slave_addr_10 = None
+ self.rem_addr_bytes = None
+ self.data_bits.clear()
+ self.bitwidth = 0
# Gather 8 bits of data plus the ACK/NACK bit.
- def handle_address_or_data(self, pins):
- scl, sda = pins
+ def handle_address_or_data(self, ss, es, value):
self.pdu_bits += 1
- # Address and data are transmitted MSB-first.
- self.databyte <<= 1
- self.databyte |= sda
-
- # Remember the start of the first data/address bit.
- if self.bitcount == 0:
- self.ss_byte = self.samplenum
-
- # Store individual bits and their start/end samplenumbers.
- # In the list, index 0 represents the LSB (I²C transmits MSB-first).
- self.bits.insert(0, [sda, self.samplenum, self.samplenum])
- if self.bitcount > 0:
- self.bits[1][2] = self.samplenum
- if self.bitcount == 7:
- self.bitwidth = self.bits[1][2] - self.bits[2][2]
- self.bits[0][2] += self.bitwidth
-
- # Return if we haven't collected all 8 + 1 bits, yet.
- if self.bitcount < 7:
- self.bitcount += 1
+ # Accumulate a byte's bits, including its start position.
+ # Accumulate individual bits and their start/end sample numbers
+ # as we see them. Get the start sample number at the time when
+ # the bit value gets sampled. Assume the start of the next bit
+ # as the end sample number of the previous bit. Guess the last
+ # bit's end sample number from the second last bit's width.
+ # Keep the bits in receive order (MSB first) during accumulation.
+ # (gsi: Strictly speaking falling SCL would be the end of the
+ # bit value's validity. That'd break compatibility though.)
+ if self.data_bits:
+ self.data_bits[-1][2] = ss
+ self.data_bits.append([value, ss, es])
+ if len(self.data_bits) < 8:
return
-
- d = self.databyte
- if self.state == 'FIND ADDRESS':
- # The READ/WRITE bit is only in address bytes, not data bytes.
- self.wr = 0 if (self.databyte & 1) else 1
- if self.options['address_format'] == 'shifted':
- d = d >> 1
-
+ self.bitwidth = self.data_bits[-2][2] - self.data_bits[-3][2]
+ self.data_bits[-1][2] = self.data_bits[-1][1] + self.bitwidth
+
+ # Get the byte value. Address and data are transmitted MSB-first.
+ d = bitpack_msb(self.data_bits, 0)
+ ss_byte, es_byte = self.data_bits[0][1], self.data_bits[-1][2]
+
+ # Process the address bytes at the start of a transfer. The
+ # first byte will carry the R/W bit, and all of the 7bit address
+ # or part of a 10bit address. Bit pattern 0b11110xxx signals
+ # that another byte follows which carries the remaining bits of
+ # a 10bit slave address.
+ is_address = self._collects_address()
+ if is_address:
+ addr_byte = d
+ if self.rem_addr_bytes is None:
+ if (addr_byte & 0xf8) == 0xf0:
+ self.rem_addr_bytes = 2
+ self.slave_addr_7 = None
+ self.slave_addr_10 = addr_byte & 0x06
+ self.slave_addr_10 <<= 7
+ else:
+ self.rem_addr_bytes = 1
+ self.slave_addr_7 = addr_byte >> 1
+ self.slave_addr_10 = None
+ has_rw_bit = self.is_write is None
+ if self.is_write is None:
+ read_bit = bool(addr_byte & 1)
+ if self.options['address_format'] == 'shifted':
+ d >>= 1
+ self.is_write = False if read_bit else True
+ elif self.slave_addr_10 is not None:
+ self.slave_addr_10 |= addr_byte
+ else:
+ cls, texts = proto['WARN'][0], proto['WARN'][1:]
+ msg = 'Unhandled address byte'
+ texts = [t.format(text = msg) for t in texts]
+ self.putg(ss_byte, es_byte, cls, texts)
+ is_write = self.is_write
+ is_seven = self.slave_addr_7 is not None
+
+ # Determine annotation classes depending on whether the byte is
+ # an address or payload data, and whether it's written or read.
bin_class = -1
- if self.state == 'FIND ADDRESS' and self.wr == 1:
+ if is_address and is_write:
cmd = 'ADDRESS WRITE'
bin_class = 1
- elif self.state == 'FIND ADDRESS' and self.wr == 0:
+ elif is_address and not is_write:
cmd = 'ADDRESS READ'
bin_class = 0
- elif self.state == 'FIND DATA' and self.wr == 1:
+ elif not is_address and is_write:
cmd = 'DATA WRITE'
bin_class = 3
- elif self.state == 'FIND DATA' and self.wr == 0:
+ elif not is_address and not is_write:
cmd = 'DATA READ'
bin_class = 2
- self.ss, self.es = self.ss_byte, self.samplenum + self.bitwidth
-
- self.putp(['BITS', self.bits])
- self.putp([cmd, d])
-
- self.putb([bin_class, bytes([d])])
-
- for bit in self.bits:
- self.put(bit[1], bit[2], self.out_ann, [5, ['%d' % bit[0]]])
-
- if cmd.startswith('ADDRESS'):
- self.ss, self.es = self.samplenum, self.samplenum + self.bitwidth
- w = ['Write', 'Wr', 'W'] if self.wr else ['Read', 'Rd', 'R']
- self.putx([proto[cmd][0], w])
- self.ss, self.es = self.ss_byte, self.samplenum
-
- self.putx([proto[cmd][0], ['%s: %02X' % (proto[cmd][1], d),
- '%s: %02X' % (proto[cmd][2], d), '%02X' % d]])
-
- # Done with this packet.
- self.bitcount = self.databyte = 0
- self.bits = []
- self.state = 'FIND ACK'
-
- def get_ack(self, pins):
- scl, sda = pins
- self.ss, self.es = self.samplenum, self.samplenum + self.bitwidth
- cmd = 'NACK' if (sda == 1) else 'ACK'
- self.putp([cmd, None])
- self.putx([proto[cmd][0], proto[cmd][1:]])
- # There could be multiple data bytes in a row, so either find
- # another data byte or a STOP condition next.
- self.state = 'FIND DATA'
-
- def handle_stop(self, pins):
+ # Reverse the list of bits to LSB first order before emitting
+ # annotations and passing bits to upper layers. This may be
+ # unexpected because the protocol is MSB first, but it keeps
+ # backwards compatibility.
+ lsb_bits = self.data_bits[:]
+ lsb_bits.reverse()
+ self.putp(ss_byte, es_byte, ['BITS', lsb_bits])
+ self.putp(ss_byte, es_byte, [cmd, d])
+
+ self.putb(ss_byte, es_byte, [bin_class, bytes([d])])
+
+ for bit_value, ss_bit, es_bit in lsb_bits:
+ cls, texts = proto['BIT'][0], proto['BIT'][1:]
+ texts = [t.format(b = bit_value) for t in texts]
+ self.putg(ss_bit, es_bit, cls, texts)
+
+ if is_address and has_rw_bit:
+ # Assign the last bit's location to the R/W annotation.
+ # Adjust the address value's location to the left.
+ ss_bit, es_bit = self.data_bits[-1][1], self.data_bits[-1][2]
+ es_byte = self.data_bits[-2][2]
+ cls = proto[cmd][0]
+ w = ['Write', 'Wr', 'W'] if self.is_write else ['Read', 'Rd', 'R']
+ self.putg(ss_bit, es_bit, cls, w)
+
+ cls, texts = proto[cmd][0], proto[cmd][1:]
+ texts = [t.format(b = d) for t in texts]
+ self.putg(ss_byte, es_byte, cls, texts)
+
+ def get_ack(self, ss, es, value):
+ ss_bit, es_bit = ss, es
+ cmd = 'ACK' if value == 0 else 'NACK'
+ self.putp(ss_bit, es_bit, [cmd, None])
+ cls, texts = proto[cmd][0], proto[cmd][1:]
+ self.putg(ss_bit, es_bit, cls, texts)
+ # Slave addresses can span one or two bytes, before data bytes
+ # follow. There can be an arbitrary number of data bytes. Stick
+ # with getting more address bytes if applicable, or enter or
+ # remain in the data phase of the transfer otherwise.
+ if self.rem_addr_bytes:
+ self.rem_addr_bytes -= 1
+ self.data_bits.clear()
+
+ def handle_stop(self, ss, es):
# Meta bitrate
- if self.samplerate:
- elapsed = 1 / float(self.samplerate) * (self.samplenum - self.pdu_start + 1)
+ if self.samplerate and self.pdu_start:
+ elapsed = es - self.pdu_start + 1
+ elapsed /= self.samplerate
bitrate = int(1 / elapsed * self.pdu_bits)
- self.put(self.ss_byte, self.samplenum, self.out_bitrate, bitrate)
+ ss_meta, es_meta = self.pdu_start, es
+ self.put(ss_meta, es_meta, self.out_bitrate, bitrate)
+ self.pdu_start = None
+ self.pdu_bits = 0
cmd = 'STOP'
- self.ss, self.es = self.samplenum, self.samplenum
- self.putp([cmd, None])
- self.putx([proto[cmd][0], proto[cmd][1:]])
- self.state = 'FIND START'
- self.is_repeat_start = 0
- self.wr = -1
- self.bits = []
+ self.putp(ss, es, [cmd, None])
+ cls, texts = proto[cmd][0], proto[cmd][1:]
+ self.putg(ss, es, cls, texts)
+ self.is_repeat_start = False
+ self.is_write = None
+ self.data_bits.clear()
def decode(self):
+ # Check for several bus conditions. Determine sample numbers
+ # here and pass ss, es, and bit values to handling routines.
while True:
# State machine.
- if self.state == 'FIND START':
+ # BEWARE! This implementation expects to see valid traffic,
+ # is rather picky in which phase which symbols get handled.
+ # This attempts to support severely undersampled captures,
+ # which a previous implementation happened to read instead
+ # of rejecting the inadequate input data.
+ # NOTE that handling bits at the start of their validity,
+ # and assuming that they remain valid until the next bit
+ # starts, is also done for backwards compatibility.
+ if self._wants_start():
# Wait for a START condition (S): SCL = high, SDA = falling.
- self.handle_start(self.wait({0: 'h', 1: 'f'}))
- elif self.state == 'FIND ADDRESS':
+ pins = self.wait({0: 'h', 1: 'f'})
+ ss, es = self.samplenum, self.samplenum
+ self.handle_start(ss, es)
+ elif self._collects_address() and self._collects_byte():
# Wait for a data bit: SCL = rising.
- self.handle_address_or_data(self.wait({0: 'r'}))
- elif self.state == 'FIND DATA':
+ pins = self.wait({0: 'r'})
+ _, sda = pins
+ ss, es = self.samplenum, self.samplenum + self.bitwidth
+ self.handle_address_or_data(ss, es, sda)
+ elif self._collects_byte():
# Wait for any of the following conditions (or combinations):
# a) Data sampling of receiver: SCL = rising, and/or
# b) START condition (S): SCL = high, SDA = falling, and/or
# Check which of the condition(s) matched and handle them.
if self.matched[0]:
- self.handle_address_or_data(pins)
+ _, sda = pins
+ ss, es = self.samplenum, self.samplenum + self.bitwidth
+ self.handle_address_or_data(ss, es, sda)
elif self.matched[1]:
- self.handle_start(pins)
+ ss, es = self.samplenum, self.samplenum
+ self.handle_start(ss, es)
elif self.matched[2]:
- self.handle_stop(pins)
- elif self.state == 'FIND ACK':
+ ss, es = self.samplenum, self.samplenum
+ self.handle_stop(ss, es)
+ else:
# Wait for a data/ack bit: SCL = rising.
- self.get_ack(self.wait({0: 'r'}))
+ pins = self.wait({0: 'r'})
+ _, sda = pins
+ ss, es = self.samplenum, self.samplenum + self.bitwidth
+ self.get_ack(ss, es, sda)
## along with this program; if not, see <http://www.gnu.org/licenses/>.
##
-# TODO: Support for filtering out multiple slave/direction pairs?
+# TODO
+# - Accept other slave address forms than decimal numbers?
+# - Support for filtering out multiple slave/direction pairs?
+# - Support 10bit slave addresses?
+import copy
import sigrokdecode as srd
class Decoder(srd.Decoder):
self.reset()
def reset(self):
- self.curslave = -1
- self.curdirection = None
- self.packets = [] # Local cache of I²C packets
+ self.seen_packets = []
+ self.do_forward = None
def start(self):
self.out_python = self.register(srd.OUTPUT_PYTHON, proto_id='i2c')
if self.options['address'] not in range(0, 127 + 1):
raise Exception('Invalid slave (must be 0..127).')
+ self.want_addrs = []
+ if self.options['address']:
+ self.want_addrs.append(self.options['address'])
+ self.want_dir = {
+ 'read': 'READ', 'write': 'WRITE',
+ }.get(self.options['direction'], None)
- # Grab I²C packets into a local cache, until an I²C STOP condition
- # packet comes along. At some point before that STOP condition, there
- # will have been an ADDRESS READ or ADDRESS WRITE which contains the
- # I²C address of the slave that the master wants to talk to.
- # If that slave shall be filtered, output the cache (all packets from
- # START to STOP) as proto 'i2c', otherwise drop it.
- def decode(self, ss, es, data):
+ def _need_to_forward(self, slave_addr, direction):
+ if self.want_addrs and slave_addr not in self.want_addrs:
+ return False
+ if self.want_dir and direction != self.want_dir:
+ return False
+ return True
- cmd, databyte = data
+ # Accumulate observed I2C packets until a STOP or REPEATED START
+ # condition is seen. These are conditions where transfers end or
+ # where direction potentially changes. Forward all previously
+ # accumulated traffic if it passes the slave address and direction
+ # filter. This assumes that the slave address as well as the read
+ # or write direction was part of the observed traffic. There should
+ # be no surprise when incomplete traffic does not match the filter
+ # condition.
+ def decode(self, ss, es, data):
- # Add the I²C packet to our local cache.
- self.packets.append([ss, es, data])
+ # Unconditionally accumulate every lower layer packet we see.
+ # Keep deep copies for later, only reference caller's values
+ # as long as this .decode() invocation executes.
+ self.seen_packets.append([ss, es, copy.deepcopy(data)])
+ cmd, _ = data
+ # Check the slave address and transfer direction early when
+ # we see them. Keep accumulating packets while it's already
+ # known here whether to forward them. This simplifies other
+ # code paths. Including future handling of 10bit addresses.
if cmd in ('ADDRESS READ', 'ADDRESS WRITE'):
- self.curslave = databyte
- self.curdirection = cmd[8:].lower()
- elif cmd in ('STOP', 'START REPEAT'):
- # If this chunk was not for the correct slave, drop it.
- if self.options['address'] == 0:
- pass
- elif self.curslave != self.options['address']:
- self.packets = []
- return
-
- # If this chunk was not in the right direction, drop it.
- if self.options['direction'] == 'both':
- pass
- elif self.options['direction'] != self.curdirection:
- self.packets = []
- return
-
- # TODO: START->STOP chunks with both read and write (Repeat START)
- # Otherwise, send out the whole chunk of I²C packets.
- for p in self.packets:
- self.put(p[0], p[1], self.out_python, p[2])
+ direction = cmd[len('ADDRESS '):]
+ _, slave_addr = data
+ self.do_forward = self._need_to_forward(slave_addr, direction)
+ return
- self.packets = []
- else:
- pass # Do nothing, only add the I²C packet to our cache.
+ # Forward previously accumulated packets as we see their
+ # completion, and when they pass the filter condition. Prepare
+ # to handle the next transfer (the next read/write part of it).
+ if cmd in ('STOP', 'START REPEAT'):
+ if self.do_forward:
+ for ss, es, data in self.seen_packets:
+ self.put(ss, es, self.out_python, data)
+ self.seen_packets.clear()
+ self.do_forward = None
+ return
def reset(self):
self.variant = None
- self.ss_block = None
- self.es_block = None
def start(self):
self.out_ann = self.register(srd.OUTPUT_ANN)
self.variant = self.options['variant']
- def putx(self, data):
- self.put(self.ss_block, self.es_block, self.out_ann, data)
+ def putg(self, ss, es, cls, text):
+ self.put(ss, es, self.out_ann, [cls, [text]])
- def handle_data(self, value):
- if value == 0xFF:
- self.putx([1, ['No button is pressed']])
- return
+ def handle_data(self, ss, es, value):
+ if value == 0xff:
+ self.putg(ss, es, 1, 'No button is pressed')
+ return
if value == 0x00:
- self.putx([2, ['Gamepad is not connected']])
- return
+ self.putg(ss, es, 2, 'Gamepad is not connected')
+ return
buttons = [
'A',
'North',
'South',
'West',
- 'East'
+ 'East',
]
- bits = format(value, '08b')
- button_str = ''
-
- for b in enumerate(bits):
- button_index = b[0]
- button_is_pressed = b[1] == '0'
-
- if button_is_pressed:
- if button_str != '':
- button_str += ' + '
- button_str += buttons[button_index]
-
- self.putx([0, ['%s' % button_str]])
+ bits = '{:08b}'.format(value)
+ text = [buttons[i] for i, b in enumerate(bits) if b == '0']
+ text = ' + '.join(text)
+ self.putg(ss, es, 0, text)
def decode(self, ss, es, data):
- ptype, mosi, miso = data
- self.ss_block, self.es_block = ss, es
-
- if ptype != 'DATA':
- return
-
- self.handle_data(miso)
+ ptype, _, _ = data
+ if ptype == 'DATA':
+ _, _, miso = data
+ self.handle_data(ss, es, miso)
+ return
# This results in robust operation for low-oversampled input.
in_reset = False
while True:
- pins = self.wait(conds)
+ try:
+ pins = self.wait(conds)
+ except EOFError as e:
+ break
clock_edge = cond_idx_clock is not None and self.matched[cond_idx_clock]
data_edge = cond_idx_data_0 is not None and [idx for idx in range(cond_idx_data_0, cond_idx_data_N) if self.matched[idx]]
reset_edge = cond_idx_reset is not None and self.matched[cond_idx_reset]
data_bits = data_bits[:num_item_bits]
item = bitpack(data_bits)
self.handle_bits(self.samplenum, item, num_item_bits)
+
+ self.handle_bits(self.samplenum, None, num_item_bits)
+ # TODO Determine whether a WARN annotation needs to get emitted.
+ # The decoder has not seen the end of the last accumulated item.
+ # Instead it just ran out of input data.
import sigrokdecode as srd
+( ANN_RGB, ) = range(1)
+
class Decoder(srd.Decoder):
api_version = 3
id = 'rgb_led_spi'
self.reset()
def reset(self):
- self.ss_cmd, self.es_cmd = 0, 0
+ self.ss_cmd = None
self.mosi_bytes = []
def start(self):
self.out_ann = self.register(srd.OUTPUT_ANN)
- def putx(self, data):
- self.put(self.ss_cmd, self.es_cmd, self.out_ann, data)
+ def putg(self, ss, es, cls, text):
+ self.put(ss, es, self.out_ann, [cls, text])
def decode(self, ss, es, data):
- ptype, mosi, miso = data
+ ptype = data[0]
- # Only care about data packets.
+ # Grab the payload of three DATA packets. These hold the
+ # RGB values (in this very order).
if ptype != 'DATA':
return
- self.ss, self.es = ss, es
-
- if len(self.mosi_bytes) == 0:
+ _, mosi, _ = data
+ if not self.mosi_bytes:
self.ss_cmd = ss
self.mosi_bytes.append(mosi)
-
- # RGB value == 3 bytes
- if len(self.mosi_bytes) != 3:
+ if len(self.mosi_bytes) < 3:
return
- red, green, blue = self.mosi_bytes
+ # Emit annotations. Invalidate accumulated details as soon as
+ # they were processed, to prepare the next iteration.
+ ss_cmd, es_cmd = self.ss_cmd, es
+ self.ss_cmd = None
+ red, green, blue = self.mosi_bytes[:3]
+ self.mosi_bytes.clear()
rgb_value = int(red) << 16 | int(green) << 8 | int(blue)
-
- self.es_cmd = es
- self.putx([0, ['#%.6x' % rgb_value]])
- self.mosi_bytes = []
+ self.putg(ss_cmd, es_cmd, ANN_RGB, ['#{:06x}'.format(rgb_value)])
## along with this program; if not, see <http://www.gnu.org/licenses/>.
##
+# Implementor's notes on the wire format:
+# - World Semi vendor, (Adafruit copy of the) datasheet
+# https://cdn-shop.adafruit.com/datasheets/WS2812.pdf
+# - reset pulse is 50us (or more) of low pin level
+# - 24bits per WS281x item, 3x 8bits, MSB first, GRB sequence,
+# cascaded WS281x items, all "excess bits" are passed through
+# - bit time starts with high period, continues with low period,
+# high to low periods' ratio determines bit value, datasheet
+# mentions 0.35us/0.8us for value 0, 0.7us/0.6us for value 1
+# (huge 150ns tolerances, un-even 0/1 value length, hmm)
+# - experience suggests the timing "is variable", rough estimation
+# often is good enough, microcontroller firmware got away with
+# four quanta per bit time, or even with three quanta (30%/60%),
+# Adafruit learn article suggests 1.2us total and 0.4/0.8 or
+# 0.8/0.4 high/low parts, four quanta are easier to handle when
+# the bit stream is sent via SPI to avoid MCU bit banging and its
+# inaccurate timing (when interrupts are used in the firmware)
+# - RGBW datasheet (Adafruit copy) for SK6812
+# https://cdn-shop.adafruit.com/product-files/2757/p2757_SK6812RGBW_REV01.pdf
+# also 1.2us total, shared across 0.3/0.9 for 0, 0.6/0.6 for 1,
+# 80us reset pulse, R8/G8/B8/W8 format per 32bits
+# - WS2815, RGB LED, uses GRB wire format, 280us RESET pulse width
+# - more vendors and models available and in popular use,
+# suggests "one third" or "two thirds" ratio would be most robust,
+# sample "a little before" the bit half? reset pulse width may need
+# to become an option? matrices and/or fast refresh environments
+# may want to experiment with back to back pixel streams
+
import sigrokdecode as srd
-from functools import reduce
+from common.srdhelper import bitpack_msb
class SamplerateError(Exception):
pass
+class DecoderError(Exception):
+ pass
+
+(
+ ANN_BIT, ANN_RESET, ANN_RGB,
+ ANN_COMP_R, ANN_COMP_G, ANN_COMP_B, ANN_COMP_W,
+) = range(7)
+
class Decoder(srd.Decoder):
api_version = 3
id = 'rgb_led_ws281x'
('bit', 'Bit'),
('reset', 'RESET'),
('rgb', 'RGB'),
+ ('r', 'R'),
+ ('g', 'G'),
+ ('b', 'B'),
+ ('w', 'W'),
)
annotation_rows = (
- ('bits', 'Bits', (0, 1)),
- ('rgb-vals', 'RGB values', (2,)),
+ ('bits', 'Bits', (ANN_BIT, ANN_RESET,)),
+ ('rgb-comps', 'RGB components', (ANN_COMP_R, ANN_COMP_G, ANN_COMP_B, ANN_COMP_W,)),
+ ('rgb-vals', 'RGB values', (ANN_RGB,)),
)
options = (
- {'id': 'type', 'desc': 'RGB or RGBW', 'default': 'RGB',
- 'values': ('RGB', 'RGBW')},
+ {'id': 'wireorder', 'desc': 'colour components order (wire)',
+ 'default': 'GRB',
+ 'values': ('BGR', 'BRG', 'GBR', 'GRB', 'RBG', 'RGB', 'RWBG', 'RGBW')},
+ {'id': 'textorder', 'desc': 'components output order (text)',
+ 'default': 'RGB[W]', 'values': ('wire', 'RGB[W]', 'RGB', 'RGBW', 'RGWB')},
)
def __init__(self):
def reset(self):
self.samplerate = None
- self.oldpin = None
- self.ss_packet = None
- self.ss = None
- self.es = None
self.bits = []
- self.inreset = False
def start(self):
self.out_ann = self.register(srd.OUTPUT_ANN)
if key == srd.SRD_CONF_SAMPLERATE:
self.samplerate = value
- def handle_bits(self, samplenum):
- if self.options['type'] == 'RGB':
- if len(self.bits) == 24:
- grb = reduce(lambda a, b: (a << 1) | b, self.bits)
- rgb = (grb & 0xff0000) >> 8 | (grb & 0x00ff00) << 8 | (grb & 0x0000ff)
- self.put(self.ss_packet, samplenum, self.out_ann,
- [2, ['#%06x' % rgb]])
- self.bits = []
- self.ss_packet = None
+ def putg(self, ss, es, cls, text):
+ self.put(ss, es, self.out_ann, [cls, text])
+
+ def handle_bits(self):
+ if len(self.bits) < self.need_bits:
+ return
+ ss_packet, es_packet = self.bits[0][1], self.bits[-1][2]
+ r, g, b, w = 0, 0, 0, None
+ comps = []
+ for i, c in enumerate(self.wireformat):
+ first_idx, after_idx = 8 * i, 8 * i + 8
+ comp_bits = self.bits[first_idx:after_idx]
+ comp_ss, comp_es = comp_bits[0][1], comp_bits[-1][2]
+ comp_value = bitpack_msb(comp_bits, 0)
+ comp_text = '{:02x}'.format(comp_value)
+ comp_ann = {
+ 'r': ANN_COMP_R, 'g': ANN_COMP_G,
+ 'b': ANN_COMP_B, 'w': ANN_COMP_W,
+ }.get(c.lower(), None)
+ comp_item = (comp_ss, comp_es, comp_ann, comp_value, comp_text)
+ comps.append(comp_item)
+ if c.lower() == 'r':
+ r = comp_value
+ elif c.lower() == 'g':
+ g = comp_value
+ elif c.lower() == 'b':
+ b = comp_value
+ elif c.lower() == 'w':
+ w = comp_value
+ wt = '' if w is None else '{:02x}'.format(w)
+ if self.textformat == 'wire':
+ rgb_text = '#' + ''.join([c[-1] for c in comps])
else:
- if len(self.bits) == 32:
- grb = reduce(lambda a, b: (a << 1) | b, self.bits)
- rgb = (grb & 0xff0000) >> 8 | (grb & 0x00ff00) << 8 | (grb & 0xff0000ff)
- self.put(self.ss_packet, samplenum, self.out_ann,
- [2, ['#%08x' % rgb]])
- self.bits = []
- self.ss_packet = None
+ rgb_text = self.textformat.format(r = r, g = g, b = b, w = w, wt = wt)
+ for ss_comp, es_comp, cls_comp, value_comp, text_comp in comps:
+ self.putg(ss_comp, es_comp, cls_comp, [text_comp])
+ if rgb_text:
+ self.putg(ss_packet, es_packet, ANN_RGB, [rgb_text])
+ self.bits.clear()
+
+ def handle_bit(self, ss, es, value, ann_late = False):
+ if not ann_late:
+ text = ['{:d}'.format(value)]
+ self.putg(ss, es, ANN_BIT, text)
+ item = (value, ss, es)
+ self.bits.append(item)
+ self.handle_bits()
+ if ann_late:
+ text = ['{:d}'.format(value)]
+ self.putg(ss, es, ANN_BIT, text)
def decode(self):
if not self.samplerate:
raise SamplerateError('Cannot decode without samplerate.')
- while True:
- # TODO: Come up with more appropriate self.wait() conditions.
- (pin,) = self.wait()
-
- if self.oldpin is None:
- self.oldpin = pin
- continue
-
- # Check RESET condition (manufacturer recommends 50 usec minimal,
- # but real minimum is ~10 usec).
- if not self.inreset and not pin and self.es is not None and \
- self.ss is not None and \
- (self.samplenum - self.es) / self.samplerate > 50e-6:
-
- # Decode last bit value.
- tH = (self.es - self.ss) / self.samplerate
- bit_ = True if tH >= 625e-9 else False
-
- self.bits.append(bit_)
- self.handle_bits(self.es)
-
- self.put(self.ss, self.es, self.out_ann, [0, ['%d' % bit_]])
- self.put(self.es, self.samplenum, self.out_ann,
- [1, ['RESET', 'RST', 'R']])
-
- self.inreset = True
- self.bits = []
- self.ss_packet = None
- self.ss = None
-
- if not self.oldpin and pin:
- # Rising edge.
- if self.ss and self.es:
- period = self.samplenum - self.ss
- duty = self.es - self.ss
- # Ideal duty for T0H: 33%, T1H: 66%.
- bit_ = (duty / period) > 0.5
+ # Preprocess options here, to simplify logic which executes
+ # much later in loops while settings have the same values.
+ wireorder = self.options['wireorder'].lower()
+ self.wireformat = [c for c in wireorder if c in 'rgbw']
+ self.need_bits = len(self.wireformat) * 8
+ textorder = self.options['textorder'].lower()
+ if textorder == 'wire':
+ self.textformat = 'wire'
+ elif textorder == 'rgb[w]':
+ self.textformat = '#{r:02x}{g:02x}{b:02x}{wt:s}'
+ else:
+ self.textformat = {
+ # "Obvious" permutations of R/G/B.
+ 'bgr': '#{b:02x}{g:02x}{r:02x}',
+ 'brg': '#{b:02x}{r:02x}{g:02x}',
+ 'gbr': '#{g:02x}{b:02x}{r:02x}',
+ 'grb': '#{g:02x}{r:02x}{b:02x}',
+ 'rbg': '#{r:02x}{b:02x}{g:02x}',
+ 'rgb': '#{r:02x}{g:02x}{b:02x}',
+ # RGB plus White. Only one of them useful?
+ 'rgbw': '#{r:02x}{g:02x}{b:02x}{w:02x}',
+ # Weird RGBW permutation for compatibility to test case.
+ # Neither used RGBW nor the 'wire' order. Obsolete now?
+ 'rgwb': '#{r:02x}{g:02x}{w:02x}{b:02x}',
+ }.get(textorder, None)
+ if self.textformat is None:
+ raise DecoderError('Unsupported text output format.')
- self.put(self.ss, self.samplenum, self.out_ann,
- [0, ['%d' % bit_]])
+ # Either check for edges which communicate bit values, or for
+ # long periods of idle level which represent a reset pulse.
+ # Track the left-most, right-most, and inner edge positions of
+ # a bit. The positive period's width determines the bit's value.
+ # Initially synchronize to the input stream by searching for a
+ # low period, which preceeds a data bit or starts a reset pulse.
+ # Don't annotate the very first reset pulse, but process it. We
+ # may not see the right-most edge of a data bit when reset is
+ # adjacent to that bit time.
+ cond_bit_starts = {0: 'r'}
+ cond_inbit_edge = {0: 'f'}
+ samples_625ns = int(self.samplerate * 625e-9)
+ samples_50us = round(self.samplerate * 50e-6)
+ cond_reset_pulse = {'skip': samples_50us + 1}
+ conds = [cond_bit_starts, cond_inbit_edge, cond_reset_pulse]
+ ss_bit, inv_bit, es_bit = None, None, None
+ pin, = self.wait({0: 'l'})
+ inv_bit = self.samplenum
+ check_reset = False
+ while True:
+ pin, = self.wait(conds)
- self.bits.append(bit_)
- self.handle_bits(self.samplenum)
+ # Check RESET condition. Manufacturers may disagree on the
+ # minimal pulse width. 50us are recommended in datasheets,
+ # experiments suggest the limit is around 10us.
+ # When the RESET pulse is adjacent to the low phase of the
+ # last bit time, we have no appropriate condition for the
+ # bit time's end location. That's why this BIT's annotation
+ # is shorter (only spans the high phase), and the RESET
+ # annotation immediately follows (spans from the falling edge
+ # to the end of the minimum RESET pulse width).
+ if check_reset and self.matched[2]:
+ es_bit = inv_bit
+ ss_rst, es_rst = inv_bit, self.samplenum
- if self.ss_packet is None:
- self.ss_packet = self.samplenum
+ if ss_bit and inv_bit and es_bit:
+ # Decode last bit value. Use the last processed bit's
+ # width for comparison when available. Fallback to an
+ # arbitrary threshold otherwise (which can result in
+ # false detection of value 1 for those captures where
+ # high and low pulses are of similar width).
+ duty = inv_bit - ss_bit
+ thres = samples_625ns
+ if self.bits:
+ period = self.bits[-1][2] - self.bits[-1][1]
+ thres = period * 0.5
+ bit_value = 1 if duty >= thres else 0
+ self.handle_bit(ss_bit, inv_bit, bit_value, True)
- self.ss = self.samplenum
+ if ss_rst and es_rst:
+ text = ['RESET', 'RST', 'R']
+ self.putg(ss_rst, es_rst, ANN_RESET, text)
+ check_reset = False
- elif self.oldpin and not pin:
- # Falling edge.
- self.inreset = False
- self.es = self.samplenum
+ self.bits.clear()
+ ss_bit, inv_bit, es_bit = None, None, None
- self.oldpin = pin
+ # Rising edge starts a bit time. Falling edge ends its high
+ # period. Get the previous bit's duty cycle and thus its
+ # bit value when the next bit starts.
+ if self.matched[0]: # and pin:
+ check_reset = False
+ if ss_bit and inv_bit:
+ # Got a previous bit? Handle it.
+ es_bit = self.samplenum
+ period = es_bit - ss_bit
+ duty = inv_bit - ss_bit
+ # Ideal duty for T0H: 33%, T1H: 66%.
+ bit_value = 1 if (duty / period) > 0.5 else 0
+ self.handle_bit(ss_bit, es_bit, bit_value)
+ ss_bit, inv_bit, es_bit = self.samplenum, None, None
+ if self.matched[1]: # and not pin:
+ check_reset = True
+ inv_bit = self.samplenum
## This file is part of the libsigrokdecode project.
##
## Copyright (C) 2016 Anthony Symons <antus@pcmhacking.net>
+## Copyright (C) 2023 Gerhard Sittig <gerhard.sittig@gmx.net>
##
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
##
import sigrokdecode as srd
+from common.srdhelper import bitpack_msb
+
+# VPW Timings. From the SAE J1850 1995 rev section 23.406 documentation.
+# Ideal, minimum and maximum tolerances.
+VPW_SOF = 200
+VPW_SOFL = 164
+VPW_SOFH = 245 # 240 by the spec, 245 so a 60us 4x sample will pass
+VPW_LONG = 128
+VPW_LONGL = 97
+VPW_LONGH = 170 # 164 by the spec but 170 for low sample rate tolerance.
+VPW_SHORT = 64
+VPW_SHORTL = 24 # 35 by the spec, 24 to allow down to 6us as measured in practice for 4x @ 1mhz sampling
+VPW_SHORTH = 97
+VPW_IFS = 240
class SamplerateError(Exception):
pass
-def timeuf(t):
- return int (t * 1000.0 * 1000.0)
-
-class Ann:
- ANN_RAW, ANN_SOF, ANN_IFS, ANN_DATA, \
- ANN_PACKET = range(5)
+(
+ ANN_SOF, ANN_BIT, ANN_IFS, ANN_BYTE,
+ ANN_PRIO, ANN_DEST, ANN_SRC, ANN_MODE, ANN_DATA, ANN_CSUM,
+ ANN_M1_PID,
+ ANN_WARN,
+) = range(12)
class Decoder(srd.Decoder):
api_version = 3
{'id': 'data', 'name': 'Data', 'desc': 'Data line'},
)
annotations = (
- ('raw', 'Raw'),
('sof', 'SOF'),
+ ('bit', 'Bit'),
('ifs', 'EOF/IFS'),
+ ('byte', 'Byte'),
+ ('prio', 'Priority'),
+ ('dest', 'Destination'),
+ ('src', 'Source'),
+ ('mode', 'Mode'),
('data', 'Data'),
- ('packet', 'Packet'),
+ ('csum', 'Checksum'),
+ ('m1_pid', 'Pid'),
+ ('warn', 'Warning'),
)
annotation_rows = (
- ('raws', 'Raws', (Ann.ANN_RAW, Ann.ANN_SOF, Ann.ANN_IFS,)),
- ('bytes', 'Bytes', (Ann.ANN_DATA,)),
- ('packets', 'Packets', (Ann.ANN_PACKET,)),
+ ('bits', 'Bits', (ANN_SOF, ANN_BIT, ANN_IFS,)),
+ ('bytes', 'Bytes', (ANN_BYTE,)),
+ ('fields', 'Fields', (ANN_PRIO, ANN_DEST, ANN_SRC, ANN_MODE, ANN_DATA, ANN_CSUM,)),
+ ('values', 'Values', (ANN_M1_PID,)),
+ ('warns', 'Warnings', (ANN_WARN,)),
)
+ # TODO Add support for options? Polarity. Glitch length.
def __init__(self):
self.reset()
def reset(self):
- self.state = 'IDLE'
self.samplerate = None
- self.byte = 0 # the byte offset in the packet
- self.mode = 0 # for by packet decode
- self.data = 0 # the current byte
- self.datastart = 0 # sample number this byte started at
- self.csa = 0 # track the last byte seperately to retrospectively add the CS marker
- self.csb = 0
- self.count = 0 # which bit number we are up to
- self.active = 0 # which logic level is considered active
-
- # vpw timings. ideal, min and max tollerances.
- # From SAE J1850 1995 rev section 23.406
-
- self.sof = 200
- self.sofl = 164
- self.sofh = 245 # 240 by the spec, 245 so a 60us 4x sample will pass
- self.long = 128
- self.longl = 97
- self.longh = 170 # 164 by the spec but 170 for low sample rate tolerance.
- self.short = 64
- self.shortl = 24 # 35 by the spec, 24 to allow down to 6us as measured in practice for 4x @ 1mhz sampling
- self.shorth = 97
- self.ifs = 240
- self.spd = 1 # set to 4 when a 4x SOF is detected (VPW high speed frame)
+ self.active = 0 # Signal polarity. Needs to become an option?
+ self.bits = []
+ self.fields = {}
- def handle_bit(self, ss, es, b):
- self.data |= (b << 7-self.count) # MSB-first
- self.put(ss, es, self.out_ann, [Ann.ANN_RAW, ["%d" % b]])
- if self.count == 0:
- self.datastart = ss
- if self.count == 7:
- self.csa = self.datastart # for CS
- self.csb = self.samplenum # for CS
- self.put(self.datastart, self.samplenum, self.out_ann, [Ann.ANN_DATA, ["%02X" % self.data]])
- # add protocol parsing here
- if self.byte == 0:
- self.put(self.datastart, self.samplenum, self.out_ann, [Ann.ANN_PACKET, ['Priority','Prio','P']])
- elif self.byte == 1:
- self.put(self.datastart, self.samplenum, self.out_ann, [Ann.ANN_PACKET, ['Destination','Dest','D']])
- elif self.byte == 2:
- self.put(self.datastart, self.samplenum, self.out_ann, [Ann.ANN_PACKET, ['Source','Src','S']])
- elif self.byte == 3:
- self.put(self.datastart, self.samplenum, self.out_ann, [Ann.ANN_PACKET, ['Mode','M']])
- self.mode = self.data
- elif self.mode == 1 and self.byte == 4: # mode 1 payload
- self.put(self.datastart, self.samplenum, self.out_ann, [Ann.ANN_PACKET, ['Pid','P']])
-
- # prepare for next byte
- self.count = -1
- self.data = 0
- self.byte = self.byte + 1 # track packet offset
- self.count = self.count + 1
+ def start(self):
+ self.out_ann = self.register(srd.OUTPUT_ANN)
def metadata(self, key, value):
if key == srd.SRD_CONF_SAMPLERATE:
self.samplerate = value
- def start(self):
- self.out_ann = self.register(srd.OUTPUT_ANN)
+ def putg(self, ss, es, cls, texts):
+ self.put(ss, es, self.out_ann, [cls, texts])
+
+ def invalidate_frame_details(self):
+ self.bits.clear()
+ self.fields.clear()
+
+ def handle_databytes(self, fields, data):
+ # TODO Deep inspection of header fields and data values, including
+ # checksum verification results.
+ mode = fields.get('mode', None)
+ if mode is None:
+ return
+ if mode == 1:
+ # An earlier implementation commented that for mode 1 the
+ # first data byte would be the PID. But example captures
+ # have no data bytes in packets for that mode. This position
+ # is taken by the checksum. Is this correct?
+ pid = data[0] if data else fields.get('csum', None)
+ if pid is None:
+ text = ['PID missing']
+ self.putg(ss, es, ANN_WARN, text)
+ else:
+ byte_text = '{:02x}'.format(pid)
+ self.putg(ss, es, ANN_M1_PID, [byte_text])
+
+ def handle_byte(self, ss, es, b):
+ # Annotate all raw byte values. Inspect and process the first
+ # bytes in a frame already. Cease inspection and only accumulate
+ # all other bytes after the mode. The checksum's position and
+ # thus the data bytes' span will only be known when EOF or IFS
+ # were seen. Implementor's note: This method just identifies
+ # header fields. Processing is left to the .handle_databytes()
+ # method. Until then validity will have been checked, too (CS).
+ byte_text = '{:02x}'.format(b)
+ self.putg(ss, es, ANN_BYTE, [byte_text])
+
+ if not 'prio' in self.fields:
+ self.fields.update({'prio': b})
+ self.putg(ss, es, ANN_PRIO, [byte_text])
+ return
+ if not 'dest' in self.fields:
+ self.fields.update({'dest': b})
+ self.putg(ss, es, ANN_DEST, [byte_text])
+ return
+ if not 'src' in self.fields:
+ self.fields.update({'src': b})
+ self.putg(ss, es, ANN_SRC, [byte_text])
+ return
+ if not 'mode' in self.fields:
+ self.fields.update({'mode': b})
+ self.putg(ss, es, ANN_MODE, [byte_text])
+ return
+ if not 'data' in self.fields:
+ self.fields.update({'data': [], 'csum': None})
+ self.fields['data'].append((b, ss, es))
+
+ def handle_sof(self, ss, es, speed):
+ text = ['{speed:d}x SOF', 'S{speed:d}', 'S']
+ text = [f.format(speed = speed) for f in text]
+ self.putg(ss, es, ANN_SOF, text)
+ self.invalidate_frame_details()
+ self.fields.update({'speed': speed})
+
+ def handle_bit(self, ss, es, b):
+ self.bits.append((b, ss, es))
+ self.putg(ss, es, ANN_BIT, ['{:d}'.format(b)])
+ if len(self.bits) < 8:
+ return
+ ss, es = self.bits[0][1], self.bits[-1][2]
+ b = bitpack_msb(self.bits, 0)
+ self.bits.clear()
+ self.handle_byte(ss, es, b)
+
+ def handle_eof(self, ss, es, is_ifs = False):
+ # EOF or IFS were seen. Post process the data bytes sequence.
+ # Separate the checksum from the data bytes. Emit annotations.
+ # Pass data bytes and header fields to deeper inspection.
+ data = self.fields.get('data', {})
+ if not data:
+ text = ['Short data phase', 'Data']
+ self.putg(ss, es, ANN_WARN, text)
+ csum = None
+ if len(data) >= 1:
+ csum, ss_csum, es_csum = data.pop()
+ self.fields.update({'csum': csum})
+ # TODO Verify checksum's correctness?
+ if data:
+ ss_data, es_data = data[0][1], data[-1][2]
+ text = ' '.join(['{:02x}'.format(b[0]) for b in data])
+ self.putg(ss_data, es_data, ANN_DATA, [text])
+ if csum is not None:
+ text = '{:02x}'.format(csum)
+ self.putg(ss_csum, es_csum, ANN_CSUM, [text])
+ text = ['IFS', 'I'] if is_ifs else ['EOF', 'E']
+ self.putg(ss, es, ANN_IFS, text)
+ self.handle_databytes(self.fields, data);
+ self.invalidate_frame_details()
+
+ def handle_unknown(self, ss, es):
+ text = ['Unknown condition', 'Unknown', 'UNK']
+ self.putg(ss, es, ANN_WARN, text)
+ self.invalidate_frame_details()
+
+ def usecs_to_samples(self, us):
+ us *= 1e-6
+ us *= self.samplerate
+ return int(us)
+
+ def samples_to_usecs(self, n):
+ n /= self.samplerate
+ n *= 1000.0 * 1000.0
+ return int(n)
def decode(self):
if not self.samplerate:
raise SamplerateError('Cannot decode without samplerate.')
- self.wait({0: 'e'})
+ # Get the distance between edges. Classify the distance
+ # to derive symbols and data bit values. Prepare waiting
+ # for an interframe gap as well, while this part of the
+ # condition is optional (switches in and out at runtime).
+ conds_edge = {0: 'e'}
+ conds_edge_only = [conds_edge]
+ conds_edge_idle = [conds_edge, {'skip': 0}]
+ conds = conds_edge_only
+ self.wait(conds)
es = self.samplenum
+ spd = None
while True:
ss = es
- pin, = self.wait({0: 'e'})
+ pin, = self.wait(conds)
es = self.samplenum
+ count = es - ss
+ t = self.samples_to_usecs(count)
+
+ # Synchronization to the next frame. Wait for SOF.
+ # Silently keep synchronizing until SOF was seen.
+ if spd is None:
+ if not self.matched[0]:
+ continue
+ if pin != self.active:
+ continue
+
+ # Detect the frame's speed from the SOF length. Adjust
+ # the expected BIT lengths to the SOF derived speed.
+ # Arrange for the additional supervision of EOF/IFS.
+ if t in range(VPW_SOFL // 1, VPW_SOFH // 1):
+ spd = 1
+ elif t in range(VPW_SOFL // 4, VPW_SOFH // 4):
+ spd = 4
+ else:
+ continue
+ short_lower, short_upper = VPW_SHORTL // spd, VPW_SHORTH // spd
+ long_lower, long_upper = VPW_LONGL // spd, VPW_LONGH // spd
+ samples = self.usecs_to_samples(VPW_IFS // spd)
+ conds_edge_idle[-1]['skip'] = samples
+ conds = conds_edge_idle
+
+ # Emit the SOF annotation. Start collecting DATA.
+ self.handle_sof(ss, es, spd)
+ continue
+
+ # Inside the DATA phase. Get data bits. Handle EOF/IFS.
+ if len(conds) > 1 and self.matched[1]:
+ # TODO The current implementation gets here after a
+ # pre-determined minimum wait time. Does not differ
+ # between EOF and IFS. An earlier implementation had
+ # this developer note: EOF=239-280 IFS=281+
+ self.handle_eof(ss, es)
+ # Enter the IDLE phase. Wait for the next SOF.
+ spd = None
+ conds = conds_edge_only
+ continue
+ if t in range(short_lower, short_upper):
+ value = 1 if pin == self.active else 0
+ self.handle_bit(ss, es, value)
+ continue
+ if t in range(long_lower, long_upper):
+ value = 0 if pin == self.active else 1
+ self.handle_bit(ss, es, value)
+ continue
- samples = es - ss
- t = timeuf(samples / self.samplerate)
- if self.state == 'IDLE': # detect and set speed from the size of sof
- if pin == self.active and t in range(self.sofl , self.sofh):
- self.put(ss, es, self.out_ann, [Ann.ANN_RAW, ['1X SOF', 'S1', 'S']])
- self.spd = 1
- self.data = 0
- self.count = 0
- self.state = 'DATA'
- elif pin == self.active and t in range(int(self.sofl / 4) , int(self.sofh / 4)):
- self.put(ss, es, self.out_ann, [Ann.ANN_RAW, ['4X SOF', 'S4', '4']])
- self.spd = 4
- self.data = 0
- self.count = 0
- self.state = 'DATA'
-
- elif self.state == 'DATA':
- if t >= int(self.ifs / self.spd):
- self.state = 'IDLE'
- self.put(ss, es, self.out_ann, [Ann.ANN_RAW, ["EOF/IFS", "E"]]) # EOF=239-280 IFS=281+
- self.put(self.csa, self.csb, self.out_ann, [Ann.ANN_PACKET, ['Checksum','CS','C']]) # retrospective print of CS
- self.byte = 0 # reset packet offset
- elif t in range(int(self.shortl / self.spd), int(self.shorth / self.spd)):
- if pin == self.active:
- self.handle_bit(ss, es, 1)
- else:
- self.handle_bit(ss, es, 0)
- elif t in range(int(self.longl / self.spd), int(self.longh / self.spd)):
- if pin == self.active:
- self.handle_bit(ss, es, 0)
- else:
- self.handle_bit(ss, es, 1)
+ # Implementation detail: An earlier implementation used to
+ # ignore everything that was not handled above. This would
+ # be motivated by the noisy environment the protocol is
+ # typically used in. This more recent implementation accepts
+ # short glitches, but by design falls back to synchronization
+ # to the input stream for other unhandled conditions. This
+ # wants to improve usability of the decoder, by presenting
+ # potential issues to the user. The threshold (microseconds
+ # between edges that are not valid symbols that are handled
+ # above) is an arbitrary choice.
+ if t <= 2:
+ continue
+ self.handle_unknown(ss, es)
+ spd = None
+ conds = conds_edge_only
0x15: 'FM25Q32',
},
'macronix': {
+ 0x13: 'MX25L8006',
0x14: 'MX25L1605D',
0x15: 'MX25L3205D',
0x16: 'MX25L6405D',
'sector_size': 4 * 1024,
'block_size': 64 * 1024,
},
+ 'macronix_mx25l8006': {
+ 'vendor': 'Macronix',
+ 'model': 'MX25L8006',
+ 'res_id': 0x13,
+ 'rems_id': 0xc213,
+ 'rems2_id': 0xc213,
+ 'rdid_id': 0xc22013,
+ 'page_size': 256,
+ 'sector_size': 4 * 1024,
+ 'block_size': 64 * 1024,
+ },
# Winbond
'winbond_w25q80dv': {
'vendor': 'Winbond',
##
## This file is part of the libsigrokdecode project.
##
-## Copyright (C) 2019-2020 Benjamin Vernoux <bvernoux@gmail.com>
+## Copyright (C) 2019-2021 Benjamin Vernoux <bvernoux@gmail.com>
##
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## You should have received a copy of the GNU General Public License
## along with this program; if not, see <http://www.gnu.org/licenses/>.
##
+## v0.1 - 17 September 2019 B.VERNOUX
+### Use ST25R3916 Datasheet DS12484 Rev 1 (January 2019)
+## v0.2 - 28 April 2020 B.VERNOUX
+### Use ST25R3916 Datasheet DS12484 Rev 2 (December 2019)
+## v0.3 - 17 June 2020 B.VERNOUX
+### Use ST25R3916 Datasheet DS12484 Rev 3 (04 June 2020)
+## v0.4 - 10 Aug 2021 B.VERNOUX
+### Fix FIFOR/FIFOW issues with Pulseview (with "Tabular Output View")
+### because of FIFO Read/FIFO Write commands, was not returning the
+### annotations short name FIFOR/FIFOW
import sigrokdecode as srd
from collections import namedtuple
def format_command(self):
'''Returns the label for the current command.'''
- if self.cmd in ('Write', 'Read', 'WriteB', 'ReadB', 'WriteT', 'ReadT', 'FIFO Write', 'FIFO Read'):
+ if self.cmd in ('Write', 'Read', 'WriteB', 'ReadB', 'WriteT', 'ReadT', 'FIFOW', 'FIFOR'):
return self.cmd
if self.cmd == 'Cmd':
reg = dir_cmd.get(self.dat, 'Unknown direct command')
# Register Space-B Access 0b11111011 0xFB => 'Space B'
# Register Test Access 0b11111100 0xFC => 'TestAccess'
if b == 0x80:
- return ('FIFO Write', b, 1, 99999)
+ return ('FIFOW', b, 1, 99999)
if b == 0xA0:
return ('Write', b, 1, 99999)
if b == 0xA8:
if b == 0xBF:
return ('Read', b, 1, 99999)
if b == 0x9F:
- return ('FIFO Read', b, 1, 99999)
+ return ('FIFOR', b, 1, 99999)
if (b >= 0x0C and b <= 0xE8) :
return ('Cmd', b, 0, 0)
if b == 0xFB:
self.decode_reg(pos, Ann.BURST_WRITET, self.dat, self.mosi_bytes())
elif self.cmd == 'ReadT':
self.decode_reg(pos, Ann.BURST_READT, self.dat, self.miso_bytes())
- elif self.cmd == 'FIFO Write':
+ elif self.cmd == 'FIFOW':
self.decode_reg(pos, Ann.FIFO_WRITE, self.dat, self.mosi_bytes())
- elif self.cmd == 'FIFO Read':
+ elif self.cmd == 'FIFOR':
self.decode_reg(pos, Ann.FIFO_READ, self.dat, self.miso_bytes())
elif self.cmd == 'Cmd':
self.decode_reg(pos, Ann.DIRECTCMD, self.dat, self.mosi_bytes())
return ret;
}
}
+ env_path = g_getenv("SIGROKDECODE_PATH");
+ if (env_path) {
+ char **dir_list, **dir_iter, *dir_item;
+ dir_list = g_strsplit(env_path, G_SEARCHPATH_SEPARATOR_S, 0);
+ for (dir_iter = dir_list; *dir_iter; dir_iter++) {
+ dir_item = *dir_iter;
+ if (!dir_item || !*dir_item)
+ continue;
+ ret = srd_decoder_searchpath_add(dir_item);
+ if (ret != SRD_OK) {
+ Py_Finalize();
+ return ret;
+ }
+ }
+ g_strfreev(dir_list);
+ }
- /* Initialize the Python GIL (this also happens to acquire it). */
+#if PY_VERSION_HEX < 0x03090000
+ /*
+ * Initialize and acquire the Python GIL. In Python 3.7+ this
+ * will be done implicitly as part of the Py_InitializeEx()
+ * call above. PyEval_InitThreads() was deprecated in 3.9.
+ */
PyEval_InitThreads();
+#endif
/* Release the GIL (ignore return value, we don't need it here). */
(void)PyEval_SaveThread();
/* Should be a list of [annotation class, [string, ...]]. */
if (!PyList_Check(obj)) {
- srd_err("Protocol decoder %s submitted an annotation that"
- " is not a list", di->decoder->name);
+ srd_err("Protocol decoder %s submitted an annotation that is not a list",
+ di->decoder->name);
goto err;
}
/* Should have 2 elements. */
if (PyList_Size(obj) != 2) {
- srd_err("Protocol decoder %s submitted annotation list with "
- "%zd elements instead of 2", di->decoder->name,
- PyList_Size(obj));
+ ssize_t sz = PyList_Size(obj);
+ srd_err("Protocol decoder %s submitted annotation list with %zd elements instead of 2",
+ di->decoder->name, sz);
goto err;
}
*/
py_tmp = PyList_GetItem(obj, 0);
if (!PyLong_Check(py_tmp)) {
- srd_err("Protocol decoder %s submitted annotation list, but "
- "first element was not an integer.", di->decoder->name);
+ srd_err("Protocol decoder %s submitted annotation list, but first element was not an integer.",
+ di->decoder->name);
goto err;
}
ann_class = PyLong_AsLong(py_tmp);
if (!(pdo = g_slist_nth_data(di->decoder->annotations, ann_class))) {
- srd_err("Protocol decoder %s submitted data to unregistered "
- "annotation class %d.", di->decoder->name, ann_class);
+ srd_err("Protocol decoder %s submitted data to unregistered annotation class %d.",
+ di->decoder->name, ann_class);
goto err;
}
/* Second element must be a list. */
py_tmp = PyList_GetItem(obj, 1);
if (!PyList_Check(py_tmp)) {
- srd_err("Protocol decoder %s submitted annotation list, but "
- "second element was not a list.", di->decoder->name);
+ srd_err("Protocol decoder %s submitted annotation list, but second element was not a list.",
+ di->decoder->name);
goto err;
}
if (py_strseq_to_char(py_tmp, &ann_text) != SRD_OK) {
- srd_err("Protocol decoder %s submitted annotation list, but "
- "second element was malformed.", di->decoder->name);
+ srd_err("Protocol decoder %s submitted annotation list, but second element was malformed.",
+ di->decoder->name);
goto err;
}
/* Should have 2 elements. */
if (PyList_Size(obj) != 2) {
- srd_err("Protocol decoder %s submitted SRD_OUTPUT_BINARY list "
- "with %zd elements instead of 2", di->decoder->name,
- PyList_Size(obj));
+ ssize_t sz = PyList_Size(obj);
+ srd_err("Protocol decoder %s submitted SRD_OUTPUT_BINARY list with %zd elements instead of 2",
+ di->decoder->name, sz);
goto err;
}
/* The first element should be an integer. */
py_tmp = PyList_GetItem(obj, 0);
if (!PyLong_Check(py_tmp)) {
- srd_err("Protocol decoder %s submitted SRD_OUTPUT_BINARY list, "
- "but first element was not an integer.", di->decoder->name);
+ srd_err("Protocol decoder %s submitted SRD_OUTPUT_BINARY list, but first element was not an integer.",
+ di->decoder->name);
goto err;
}
bin_class = PyLong_AsLong(py_tmp);
if (!(class_name = g_slist_nth_data(di->decoder->binary, bin_class))) {
- srd_err("Protocol decoder %s submitted SRD_OUTPUT_BINARY with "
- "unregistered binary class %d.", di->decoder->name, bin_class);
+ srd_err("Protocol decoder %s submitted SRD_OUTPUT_BINARY with unregistered binary class %d.",
+ di->decoder->name, bin_class);
goto err;
}
/* Second element should be bytes. */
py_tmp = PyList_GetItem(obj, 1);
if (!PyBytes_Check(py_tmp)) {
- srd_err("Protocol decoder %s submitted SRD_OUTPUT_BINARY list, "
- "but second element was not bytes.", di->decoder->name);
+ srd_err("Protocol decoder %s submitted SRD_OUTPUT_BINARY list, but second element was not bytes.",
+ di->decoder->name);
goto err;
}
/* Consider an empty set of bytes a bug. */
if (PyBytes_Size(py_tmp) == 0) {
- srd_err("Protocol decoder %s submitted SRD_OUTPUT_BINARY "
- "with empty data set.", di->decoder->name);
+ srd_err("Protocol decoder %s submitted SRD_OUTPUT_BINARY with empty data set.",
+ di->decoder->name);
goto err;
}
if (g_variant_type_equal(pdata->pdo->meta_type, G_VARIANT_TYPE_INT64)) {
if (!PyLong_Check(obj)) {
- PyErr_Format(PyExc_TypeError, "This output was registered "
- "as 'int', but something else was passed.");
+ PyErr_Format(PyExc_TypeError,
+ "This output was registered as 'int', but something else was passed.");
goto err;
}
intvalue = PyLong_AsLongLong(obj);
pdata->data = g_variant_new_int64(intvalue);
} else if (g_variant_type_equal(pdata->pdo->meta_type, G_VARIANT_TYPE_DOUBLE)) {
if (!PyFloat_Check(obj)) {
- PyErr_Format(PyExc_TypeError, "This output was registered "
- "as 'float', but something else was passed.");
+ PyErr_Format(PyExc_TypeError,
+ "This output was registered as 'float', but something else was passed.");
goto err;
}
dvalue = PyFloat_AsDouble(obj);
start_sample,
end_sample, output_type_name(pdo->output_type),
output_id, pdo->proto_id, next_di->inst_id);
- if (!(py_res = PyObject_CallMethod(
- next_di->py_inst, "decode", "KKO", start_sample,
- end_sample, py_data))) {
+ py_res = PyObject_CallMethod(next_di->py_inst, "decode",
+ "KKO", start_sample, end_sample, py_data);
+ if (!py_res) {
srd_exception_catch("Calling %s decode() failed",
next_di->inst_id);
}
* when the sample data is exhausted.
*/
if (di->communicate_eof) {
+ /* Advance self.samplenum to the (absolute) last sample number. */
+ py_samplenum = PyLong_FromUnsignedLongLong(di->abs_cur_samplenum);
+ PyObject_SetAttrString(di->py_inst, "samplenum", py_samplenum);
+ Py_DECREF(py_samplenum);
+ /* Raise an EOFError Python exception. */
srd_dbg("%s: %s: Raising EOF from wait().",
di->inst_id, __func__);
g_mutex_unlock(&di->data_mutex);
SRD_PRIV int py_attr_as_strlist(PyObject *py_obj, const char *attr, GSList **outstrlist)
{
PyObject *py_list;
- Py_ssize_t i;
+ ssize_t idx;
int ret;
char *outstr;
PyGILState_STATE gstate;
*outstrlist = NULL;
- for (i = 0; i < PyList_Size(py_list); i++) {
- ret = py_listitem_as_str(py_list, i, &outstr);
+ for (idx = 0; idx < PyList_Size(py_list); idx++) {
+ ret = py_listitem_as_str(py_list, idx, &outstr);
if (ret < 0) {
- srd_dbg("Couldn't get item %" PY_FORMAT_SIZE_T "d.", i);
+ srd_dbg("Couldn't get item %zd.", idx);
goto err;
}
*outstrlist = g_slist_append(*outstrlist, outstr);
SRD_PRIV int py_listitem_as_str(PyObject *py_obj, Py_ssize_t idx,
char **outstr)
{
- PyObject *py_value;
PyGILState_STATE gstate;
+ ssize_t item_idx;
+ PyObject *py_value;
gstate = PyGILState_Ensure();
goto err;
}
- if (!(py_value = PyList_GetItem(py_obj, idx))) {
- srd_dbg("Couldn't get list item %" PY_FORMAT_SIZE_T "d.", idx);
+ item_idx = idx;
+ if (!(py_value = PyList_GetItem(py_obj, item_idx))) {
+ srd_dbg("Couldn't get list item %zd.", item_idx);
goto err;
}