- ]
- optional_probes = []
- options = {
- 'address_format': ['Displayed slave address format', 'shifted'],
- }
- annotations = [
- ['Start', 'Start condition'],
- ['Repeat start', 'Repeat start condition'],
- ['Stop', 'Stop condition'],
- ['ACK', 'ACK'],
- ['NACK', 'NACK'],
- ['Address read', 'Address read'],
- ['Address write', 'Address write'],
- ['Data read', 'Data read'],
- ['Data write', 'Data write'],
- ['Warnings', 'Human-readable warnings'],
- ]
-
- def __init__(self, **kwargs):
- self.startsample = -1
- self.samplenum = None
- self.bitcount = 0
- self.databyte = 0
- self.wr = -1
- self.is_repeat_start = 0
- self.state = 'FIND START'
- self.oldscl = 1
- self.oldsda = 1
- self.oldpins = [1, 1]
-
- def start(self, metadata):
- self.out_proto = self.add(srd.OUTPUT_PROTO, 'i2c')
- self.out_ann = self.add(srd.OUTPUT_ANN, 'i2c')
-
- def report(self):
- pass
-
- def putx(self, data):
- self.put(self.startsample, self.samplenum, self.out_ann, data)
-
- def putp(self, data):
- self.put(self.startsample, self.samplenum, self.out_proto, data)
-
- def is_start_condition(self, scl, sda):
- # START condition (S): SDA = falling, SCL = high
- if (self.oldsda == 1 and sda == 0) and scl == 1:
- return True
- return False
-
- def is_data_bit(self, scl, sda):
- # Data sampling of receiver: SCL = rising
- if self.oldscl == 0 and scl == 1:
- return True
- return False
-
- def is_stop_condition(self, scl, sda):
- # STOP condition (P): SDA = rising, SCL = high
- if (self.oldsda == 0 and sda == 1) and scl == 1:
- return True
- return False
-
- def found_start(self, scl, sda):
- self.startsample = self.samplenum
- cmd = 'START REPEAT' if (self.is_repeat_start == 1) else 'START'
- self.putp([cmd, None])
- self.putx([proto[cmd][0], proto[cmd][1:]])
- self.state = 'FIND ADDRESS'
- self.bitcount = self.databyte = 0
- self.is_repeat_start = 1
- self.wr = -1
+ )
+ options = (
+ {'id': 'address_format', 'desc': 'Displayed slave address format',
+ 'default': 'shifted', 'values': ('shifted', 'unshifted')},
+ )
+ annotations = (
+ ('start', 'Start condition'),
+ ('repeat-start', 'Repeat start condition'),
+ ('stop', 'Stop condition'),
+ ('ack', 'ACK'),
+ ('nack', 'NACK'),
+ ('bit', 'Data/address bit'),
+ ('address-read', 'Address read'),
+ ('address-write', 'Address write'),
+ ('data-read', 'Data read'),
+ ('data-write', 'Data write'),
+ ('warning', 'Warning'),
+ )
+ annotation_rows = (
+ ('bits', 'Bits', (5,)),
+ ('addr-data', 'Address/data', (0, 1, 2, 3, 4, 6, 7, 8, 9)),
+ ('warnings', 'Warnings', (10,)),
+ )
+ binary = (
+ ('address-read', 'Address read'),
+ ('address-write', 'Address write'),
+ ('data-read', 'Data read'),
+ ('data-write', 'Data write'),
+ )
+
+ def __init__(self):
+ self.reset()
+
+ def reset(self):
+ self.samplerate = None
+ self.is_write = None
+ self.rem_addr_bytes = None
+ self.slave_addr_7 = None
+ self.slave_addr_10 = None
+ self.is_repeat_start = False
+ self.pdu_start = None
+ self.pdu_bits = 0
+ self.data_bits = []
+ self.bitwidth = 0
+
+ def metadata(self, key, value):
+ if key == srd.SRD_CONF_SAMPLERATE:
+ self.samplerate = value
+
+ def start(self):
+ self.out_python = self.register(srd.OUTPUT_PYTHON)
+ self.out_ann = self.register(srd.OUTPUT_ANN)
+ self.out_binary = self.register(srd.OUTPUT_BINARY)
+ self.out_bitrate = self.register(srd.OUTPUT_META,
+ meta=(int, 'Bitrate', 'Bitrate from Start bit to Stop bit'))
+
+ def putg(self, ss, es, cls, text):
+ self.put(ss, es, self.out_ann, [cls, text])
+
+ def putp(self, ss, es, data):
+ self.put(ss, es, self.out_python, data)
+
+ def putb(self, ss, es, data):
+ self.put(ss, es, self.out_binary, data)
+
+ def _wants_start(self):
+ # Check whether START is required (to sync to the input stream).
+ return self.pdu_start is None
+
+ def _collects_address(self):
+ # Check whether the transfer still is in the address phase (is
+ # still collecting address and r/w details, or has not started
+ # collecting it).
+ return self.rem_addr_bytes is None or self.rem_addr_bytes != 0
+
+ def _collects_byte(self):
+ # Check whether bits of a byte are being collected. Outside of
+ # the data byte, the bit is the ACK/NAK slot.
+ return self.data_bits is None or len(self.data_bits) < 8
+
+ def handle_start(self, ss, es):
+ if self.is_repeat_start:
+ cmd = 'START REPEAT'
+ else:
+ cmd = 'START'
+ self.pdu_start = ss
+ self.pdu_bits = 0
+ self.putp(ss, es, [cmd, None])
+ cls, texts = proto[cmd][0], proto[cmd][1:]
+ self.putg(ss, es, cls, texts)
+ self.is_repeat_start = True
+ self.is_write = None
+ self.slave_addr_7 = None
+ self.slave_addr_10 = None
+ self.rem_addr_bytes = None
+ self.data_bits.clear()
+ self.bitwidth = 0