X-Git-Url: https://sigrok.org/gitweb/?a=blobdiff_plain;f=decoders%2Fi2c%2Fi2c.py;h=5f67d3576ab49d70eb0dbc15e107de67b3d8edea;hb=271acd3bde96474c7ed5f822d470f555af8e7d93;hp=f0a95b79fd334b1dc6d9e2ca875b0dee988694fe;hpb=64c29e28e0efa184319f7831b3eca18c7f73f7d0;p=libsigrokdecode.git diff --git a/decoders/i2c/i2c.py b/decoders/i2c/i2c.py index f0a95b7..5f67d35 100644 --- a/decoders/i2c/i2c.py +++ b/decoders/i2c/i2c.py @@ -18,46 +18,7 @@ ## Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA ## -# # I2C protocol decoder -# - -# -# The Inter-Integrated Circuit (I2C) bus is a bidirectional, multi-master -# bus using two signals (SCL = serial clock line, SDA = serial data line). -# -# There can be many devices on the same bus. Each device can potentially be -# master or slave (and that can change during runtime). Both slave and master -# can potentially play the transmitter or receiver role (this can also -# change at runtime). -# -# Possible maximum data rates: -# - Standard mode: 100 kbit/s -# - Fast mode: 400 kbit/s -# - Fast-mode Plus: 1 Mbit/s -# - High-speed mode: 3.4 Mbit/s -# -# START condition (S): SDA = falling, SCL = high -# Repeated START condition (Sr): same as S -# Data bit sampling: SCL = rising -# STOP condition (P): SDA = rising, SCL = high -# -# All data bytes on SDA are exactly 8 bits long (transmitted MSB-first). -# Each byte has to be followed by a 9th ACK/NACK bit. If that bit is low, -# that indicates an ACK, if it's high that indicates a NACK. -# -# After the first START condition, a master sends the device address of the -# slave it wants to talk to. Slave addresses are 7 bits long (MSB-first). -# After those 7 bits, a data direction bit is sent. If the bit is low that -# indicates a WRITE operation, if it's high that indicates a READ operation. -# -# Later an optional 10bit slave addressing scheme was added. -# -# Documentation: -# http://www.nxp.com/acrobat/literature/9398/39340011.pdf (v2.1 spec) -# http://www.nxp.com/acrobat/usermanuals/UM10204_3.pdf (v3 spec) -# http://en.wikipedia.org/wiki/I2C -# # TODO: Look into arbitration, collision detection, clock synchronisation, etc. # TODO: Handle clock stretching. @@ -69,36 +30,15 @@ # TODO: Handle multiple different I2C devices on same bus # -> we need to decode multiple protocols at the same time. -''' -Protocol output format: - -I2C packet: -[, , ] - - is one of: - - 'START' (START condition) - - 'START REPEAT' (Repeated START) - - 'ADDRESS READ' (Address, read) - - 'ADDRESS WRITE' (Address, write) - - 'DATA READ' (Data, read) - - 'DATA WRITE' (Data, write) - - 'STOP' (STOP condition) - - is the data or address byte associated with the 'ADDRESS*' and 'DATA*' -command. For 'START', 'START REPEAT' and 'STOP', this is None. - - is either 'ACK' or 'NACK', but may also be None. -''' - import sigrokdecode as srd # Annotation feed formats -ANN_SHIFTED = 0 +ANN_SHIFTED = 0 ANN_SHIFTED_SHORT = 1 -ANN_RAW = 2 +ANN_RAW = 2 # Values are verbose and short annotation, respectively. -protocol = { +proto = { 'START': ['START', 'S'], 'START REPEAT': ['START REPEAT', 'Sr'], 'STOP': ['STOP', 'P'], @@ -110,17 +50,12 @@ protocol = { 'DATA WRITE': ['DATA WRITE', 'DW'], } -# States -FIND_START = 0 -FIND_ADDRESS = 1 -FIND_DATA = 2 - class Decoder(srd.Decoder): + api_version = 1 id = 'i2c' name = 'I2C' longname = 'Inter-Integrated Circuit' desc = 'I2C is a two-wire, multi-master, serial bus.' - longdesc = '...' license = 'gplv2+' inputs = ['logic'] outputs = ['i2c'] @@ -128,6 +63,7 @@ class Decoder(srd.Decoder): {'id': 'scl', 'name': 'SCL', 'desc': 'Serial clock line'}, {'id': 'sda', 'name': 'SDA', 'desc': 'Serial data line'}, ] + optional_probes = [] options = { 'addressing': ['Slave addressing (in bits)', 7], # 7 or 10 } @@ -143,19 +79,16 @@ class Decoder(srd.Decoder): ] def __init__(self, **kwargs): - self.samplecnt = 0 + self.startsample = -1 + self.samplenum = None self.bitcount = 0 self.databyte = 0 self.wr = -1 - self.startsample = -1 self.is_repeat_start = 0 - self.state = FIND_START + self.state = 'FIND START' self.oldscl = None self.oldsda = None - # Set protocol decoder option defaults. - self.addressing = Decoder.options['addressing'][1] - def start(self, metadata): self.out_proto = self.add(srd.OUTPUT_PROTO, 'i2c') self.out_ann = self.add(srd.OUTPUT_ANN, 'i2c') @@ -182,95 +115,90 @@ class Decoder(srd.Decoder): return False def found_start(self, scl, sda): - cmd = 'START REPEAT' if (self.is_repeat_start == 1) else 'START' + self.startsample = self.samplenum - self.put(self.out_proto, [cmd, None, None]) - self.put(self.out_ann, [ANN_SHIFTED, [protocol[cmd][0]]]) - self.put(self.out_ann, [ANN_SHIFTED_SHORT, [protocol[cmd][1]]]) + cmd = 'START REPEAT' if (self.is_repeat_start == 1) else 'START' + self.put(self.out_proto, [cmd, None]) + self.put(self.out_ann, [ANN_SHIFTED, [proto[cmd][0]]]) + self.put(self.out_ann, [ANN_SHIFTED_SHORT, [proto[cmd][1]]]) - self.state = FIND_ADDRESS + self.state = 'FIND ADDRESS' self.bitcount = self.databyte = 0 self.is_repeat_start = 1 self.wr = -1 + # Gather 8 bits of data plus the ACK/NACK bit. def found_address_or_data(self, scl, sda): - # Gather 8 bits of data plus the ACK/NACK bit. - - if self.startsample == -1: - # TODO: Should be samplenum, as received from the feed. - self.startsample = self.samplecnt - self.bitcount += 1 - # Address and data are transmitted MSB-first. self.databyte <<= 1 self.databyte |= sda + if self.bitcount == 0: + self.startsample = self.samplenum + # Return if we haven't collected all 8 + 1 bits, yet. - if self.bitcount != 9: + self.bitcount += 1 + if self.bitcount != 8: return + # We triggered on the ACK/NACK bit, but won't report that until later. + self.startsample -= 1 + # Send raw output annotation before we start shifting out - # read/write and ack/nack bits. + # read/write and ACK/NACK bits. self.put(self.out_ann, [ANN_RAW, ['0x%.2x' % self.databyte]]) - # We received 8 address/data bits and the ACK/NACK bit. - self.databyte >>= 1 # Shift out unwanted ACK/NACK bit here. - - if self.state == FIND_ADDRESS: + if self.state == 'FIND ADDRESS': # The READ/WRITE bit is only in address bytes, not data bytes. self.wr = 0 if (self.databyte & 1) else 1 d = self.databyte >> 1 - elif self.state == FIND_DATA: + elif self.state == 'FIND DATA': d = self.databyte - else: - # TODO: Error? - pass - # Last bit that came in was the ACK/NACK bit (1 = NACK). - ack_bit = 'NACK' if (sda == 1) else 'ACK' - - if self.state == FIND_ADDRESS and self.wr == 1: + if self.state == 'FIND ADDRESS' and self.wr == 1: cmd = 'ADDRESS WRITE' - elif self.state == FIND_ADDRESS and self.wr == 0: + elif self.state == 'FIND ADDRESS' and self.wr == 0: cmd = 'ADDRESS READ' - elif self.state == FIND_DATA and self.wr == 1: + elif self.state == 'FIND DATA' and self.wr == 1: cmd = 'DATA WRITE' - elif self.state == FIND_DATA and self.wr == 0: + elif self.state == 'FIND DATA' and self.wr == 0: cmd = 'DATA READ' - self.put(self.out_proto, [cmd, d, ack_bit]) - self.put(self.out_ann, [ANN_SHIFTED, - [protocol[cmd][0], '0x%02x' % d, protocol[ack_bit][0]]]) - self.put(self.out_ann, [ANN_SHIFTED_SHORT, - [protocol[cmd][1], '0x%02x' % d, protocol[ack_bit][1]]]) + self.put(self.out_proto, [cmd, d]) + self.put(self.out_ann, [ANN_SHIFTED, [proto[cmd][0], '0x%02x' % d]]) + self.put(self.out_ann, [ANN_SHIFTED_SHORT, [proto[cmd][1], '0x%02x' % d]]) - self.bitcount = self.databyte = 0 + # Done with this packet. self.startsample = -1 + self.bitcount = self.databyte = 0 + self.state = 'FIND ACK' - if self.state == FIND_ADDRESS: - self.state = FIND_DATA - elif self.state == FIND_DATA: - # There could be multiple data bytes in a row. - # So, either find a STOP condition or another data byte next. - pass + def get_ack(self, scl, sda): + self.startsample = self.samplenum + ack_bit = 'NACK' if (sda == 1) else 'ACK' + self.put(self.out_proto, [ack_bit, None]) + self.put(self.out_ann, [ANN_SHIFTED, [proto[ack_bit][0]]]) + self.put(self.out_ann, [ANN_SHIFTED_SHORT, [proto[ack_bit][1]]]) + # There could be multiple data bytes in a row, so either find + # another data byte or a STOP condition next. + self.state = 'FIND DATA' def found_stop(self, scl, sda): - self.put(self.out_proto, ['STOP', None, None]) - self.put(self.out_ann, [ANN_SHIFTED, [protocol['STOP'][0]]]) - self.put(self.out_ann, [ANN_SHIFTED_SHORT, [protocol['STOP'][1]]]) + self.startsample = self.samplenum + self.put(self.out_proto, ['STOP', None]) + self.put(self.out_ann, [ANN_SHIFTED, [proto['STOP'][0]]]) + self.put(self.out_ann, [ANN_SHIFTED_SHORT, [proto['STOP'][1]]]) - self.state = FIND_START + self.state = 'FIND START' self.is_repeat_start = 0 self.wr = -1 def put(self, output_id, data): # Inject sample range into the call up to sigrok. - # TODO: 0-0 sample range for now. - super(Decoder, self).put(0, 0, output_id, data) + super(Decoder, self).put(self.startsample, self.samplenum, output_id, data) def decode(self, ss, es, data): - for samplenum, (scl, sda) in data: - self.samplecnt += 1 + for (self.samplenum, (scl, sda)) in data: # First sample: Save SCL/SDA value. if self.oldscl == None: @@ -281,22 +209,24 @@ class Decoder(srd.Decoder): # TODO: Wait until the bus is idle (SDA = SCL = 1) first? # State machine. - if self.state == FIND_START: + if self.state == 'FIND START': if self.is_start_condition(scl, sda): self.found_start(scl, sda) - elif self.state == FIND_ADDRESS: + elif self.state == 'FIND ADDRESS': if self.is_data_bit(scl, sda): self.found_address_or_data(scl, sda) - elif self.state == FIND_DATA: + elif self.state == 'FIND DATA': if self.is_data_bit(scl, sda): self.found_address_or_data(scl, sda) elif self.is_start_condition(scl, sda): self.found_start(scl, sda) elif self.is_stop_condition(scl, sda): self.found_stop(scl, sda) + elif self.state == 'FIND ACK': + if self.is_data_bit(scl, sda): + self.get_ack(scl, sda) else: - # TODO: Error? - pass + raise Exception('Invalid state %d' % self.STATE) # Save current SDA/SCL values for the next round. self.oldscl = scl