From: Gerhard Sittig Date: Mon, 17 Jul 2023 18:16:15 +0000 (+0200) Subject: i2c: rephrase state machine, eliminate text matching X-Git-Url: https://sigrok.org/gitaction?a=commitdiff_plain;h=782c35b0158a84bbdbcfd2fc2f0ed7e507f17650;p=libsigrokdecode.git i2c: rephrase state machine, eliminate text matching Rephrase the conditions which drive the I2C decoder's progress. Remove self.state and its space separated string literals which were tedious to search for, and which "lent themselves" to magic string matching. Use other existing conditions instead, which also reduces redundancy. Defer the "forgetting data bits" until after ACK was seen. Comment on the motivation to keep the "protocol violating" implementation, which misses error conditions that happen on the wire, but happens to support heavily undersampled captures to an astonishing degree. Also addresses minor style nits: Prefer to think of "dominant ACK" and "recessive NAK". Rephrase a byte shift for the first address byte. Emit the first warning annotation for the row which was declared before but was not used so far. --- diff --git a/decoders/i2c/pd.py b/decoders/i2c/pd.py index 1f53e87..2259b45 100644 --- a/decoders/i2c/pd.py +++ b/decoders/i2c/pd.py @@ -63,6 +63,7 @@ proto = { 'ADDRESS WRITE': [7, 'Address write: {b:02X}', 'AW: {b:02X}', '{b:02X}'], 'DATA READ': [8, 'Data read: {b:02X}', 'DR: {b:02X}', '{b:02X}'], 'DATA WRITE': [9, 'Data write: {b:02X}', 'DW: {b:02X}', '{b:02X}'], + 'WARN': [10, '{text}'], } class Decoder(srd.Decoder): @@ -115,8 +116,9 @@ class Decoder(srd.Decoder): self.samplerate = None self.is_write = None self.rem_addr_bytes = None + self.slave_addr_7 = None + self.slave_addr_10 = None self.is_repeat_start = False - self.state = 'FIND START' self.pdu_start = None self.pdu_bits = 0 self.data_bits = [] @@ -142,6 +144,21 @@ class Decoder(srd.Decoder): def putb(self, ss, es, data): self.put(ss, es, self.out_binary, data) + def _wants_start(self): + # Check whether START is required (to sync to the input stream). + return self.pdu_start is None + + def _collects_address(self): + # Check whether the transfer still is in the address phase (is + # still collecting address and r/w details, or has not started + # collecting it). + return self.rem_addr_bytes is None or self.rem_addr_bytes != 0 + + def _collects_byte(self): + # Check whether bits of a byte are being collected. Outside of + # the data byte, the bit is the ACK/NAK slot. + return self.data_bits is None or len(self.data_bits) < 8 + def handle_start(self, ss, es): if self.is_repeat_start: cmd = 'START REPEAT' @@ -152,9 +169,10 @@ class Decoder(srd.Decoder): self.putp(ss, es, [cmd, None]) cls, texts = proto[cmd][0], proto[cmd][1:] self.putg(ss, es, cls, texts) - self.state = 'FIND ADDRESS' self.is_repeat_start = True self.is_write = None + self.slave_addr_7 = None + self.slave_addr_10 = None self.rem_addr_bytes = None self.data_bits.clear() self.bitwidth = 0 @@ -169,8 +187,9 @@ class Decoder(srd.Decoder): # the bit value gets sampled. Assume the start of the next bit # as the end sample number of the previous bit. Guess the last # bit's end sample number from the second last bit's width. - # (gsi: Shouldn't falling SCL be the end of the bit value?) # Keep the bits in receive order (MSB first) during accumulation. + # (gsi: Strictly speaking falling SCL would be the end of the + # bit value's validity. That'd break compatibility though.) if self.data_bits: self.data_bits[-1][2] = ss self.data_bits.append([value, ss, es]) @@ -181,12 +200,15 @@ class Decoder(srd.Decoder): # Get the byte value. Address and data are transmitted MSB-first. d = bitpack_msb(self.data_bits, 0) - if self.state == 'FIND ADDRESS': - # The READ/WRITE bit is only in the first address byte, not - # in data bytes. Address bit pattern 0b1111_0xxx means that - # this is a 10bit slave address, another byte follows. Get - # the R/W direction and the address bytes count from the - # first byte in the I2C transfer. + ss_byte, es_byte = self.data_bits[0][1], self.data_bits[-1][2] + + # Process the address bytes at the start of a transfer. The + # first byte will carry the R/W bit, and all of the 7bit address + # or part of a 10bit address. Bit pattern 0b11110xxx signals + # that another byte follows which carries the remaining bits of + # a 10bit slave address. + is_address = self._collects_address() + if is_address: addr_byte = d if self.rem_addr_bytes is None: if (addr_byte & 0xf8) == 0xf0: @@ -202,27 +224,34 @@ class Decoder(srd.Decoder): if self.is_write is None: read_bit = bool(addr_byte & 1) if self.options['address_format'] == 'shifted': - d = d >> 1 + d >>= 1 self.is_write = False if read_bit else True - else: + elif self.slave_addr_10 is not None: self.slave_addr_10 |= addr_byte - + else: + cls, texts = proto['WARN'][0], proto['WARN'][1:] + msg = 'Unhandled address byte' + texts = [t.format(text = msg) for t in texts] + self.putg(ss_byte, es_byte, cls, texts) + is_write = self.is_write + is_seven = self.slave_addr_7 is not None + + # Determine annotation classes depending on whether the byte is + # an address or payload data, and whether it's written or read. bin_class = -1 - if self.state == 'FIND ADDRESS' and self.is_write: + if is_address and is_write: cmd = 'ADDRESS WRITE' bin_class = 1 - elif self.state == 'FIND ADDRESS' and not self.is_write: + elif is_address and not is_write: cmd = 'ADDRESS READ' bin_class = 0 - elif self.state == 'FIND DATA' and self.is_write: + elif not is_address and is_write: cmd = 'DATA WRITE' bin_class = 3 - elif self.state == 'FIND DATA' and not self.is_write: + elif not is_address and not is_write: cmd = 'DATA READ' bin_class = 2 - ss_byte, es_byte = self.data_bits[0][1], self.data_bits[-1][2] - # Reverse the list of bits to LSB first order before emitting # annotations and passing bits to upper layers. This may be # unexpected because the protocol is MSB first, but it keeps @@ -239,7 +268,7 @@ class Decoder(srd.Decoder): texts = [t.format(b = bit_value) for t in texts] self.putg(ss_bit, es_bit, cls, texts) - if cmd.startswith('ADDRESS') and has_rw_bit: + if is_address and has_rw_bit: # Assign the last bit's location to the R/W annotation. # Adjust the address value's location to the left. ss_bit, es_bit = self.data_bits[-1][1], self.data_bits[-1][2] @@ -252,13 +281,9 @@ class Decoder(srd.Decoder): texts = [t.format(b = d) for t in texts] self.putg(ss_byte, es_byte, cls, texts) - # Done with this packet. - self.data_bits.clear() - self.state = 'FIND ACK' - def get_ack(self, ss, es, value): ss_bit, es_bit = ss, es - cmd = 'NACK' if value == 1 else 'ACK' + cmd = 'ACK' if value == 0 else 'NACK' self.putp(ss_bit, es_bit, [cmd, None]) cls, texts = proto[cmd][0], proto[cmd][1:] self.putg(ss_bit, es_bit, cls, texts) @@ -268,10 +293,7 @@ class Decoder(srd.Decoder): # remain in the data phase of the transfer otherwise. if self.rem_addr_bytes: self.rem_addr_bytes -= 1 - if self.rem_addr_bytes: - self.state = 'FIND ADDRESS' - else: - self.state = 'FIND DATA' + self.data_bits.clear() def handle_stop(self, ss, es): # Meta bitrate @@ -288,7 +310,6 @@ class Decoder(srd.Decoder): self.putp(ss, es, [cmd, None]) cls, texts = proto[cmd][0], proto[cmd][1:] self.putg(ss, es, cls, texts) - self.state = 'FIND START' self.is_repeat_start = False self.is_write = None self.data_bits.clear() @@ -298,18 +319,26 @@ class Decoder(srd.Decoder): # here and pass ss, es, and bit values to handling routines. while True: # State machine. - if self.state == 'FIND START': + # BEWARE! This implementation expects to see valid traffic, + # is rather picky in which phase which symbols get handled. + # This attempts to support severely undersampled captures, + # which a previous implementation happened to read instead + # of rejecting the inadequate input data. + # NOTE that handling bits at the start of their validity, + # and assuming that they remain valid until the next bit + # starts, is also done for backwards compatibility. + if self._wants_start(): # Wait for a START condition (S): SCL = high, SDA = falling. pins = self.wait({0: 'h', 1: 'f'}) ss, es = self.samplenum, self.samplenum self.handle_start(ss, es) - elif self.state == 'FIND ADDRESS': + elif self._collects_address() and self._collects_byte(): # Wait for a data bit: SCL = rising. pins = self.wait({0: 'r'}) _, sda = pins - ss, es = self.samplenum, self.samplenum # TODO plus bitwidth + ss, es = self.samplenum, self.samplenum + self.bitwidth self.handle_address_or_data(ss, es, sda) - elif self.state == 'FIND DATA': + elif self._collects_byte(): # Wait for any of the following conditions (or combinations): # a) Data sampling of receiver: SCL = rising, and/or # b) START condition (S): SCL = high, SDA = falling, and/or @@ -319,7 +348,7 @@ class Decoder(srd.Decoder): # Check which of the condition(s) matched and handle them. if self.matched[0]: _, sda = pins - ss, es = self.samplenum, self.samplenum # TODO plus bitwidth + ss, es = self.samplenum, self.samplenum + self.bitwidth self.handle_address_or_data(ss, es, sda) elif self.matched[1]: ss, es = self.samplenum, self.samplenum @@ -327,7 +356,7 @@ class Decoder(srd.Decoder): elif self.matched[2]: ss, es = self.samplenum, self.samplenum self.handle_stop(ss, es) - elif self.state == 'FIND ACK': + else: # Wait for a data/ack bit: SCL = rising. pins = self.wait({0: 'r'}) _, sda = pins