]> sigrok.org Git - libsigrokdecode.git/blobdiff - decoders/eeprom24xx/pd.py
eeprom24xx: avoid access to caller's data after validity ends
[libsigrokdecode.git] / decoders / eeprom24xx / pd.py
index c259dc10d4af1b44e9a492323d58d40bbfbbe280..7491f581c572e15fec3864f6e2f92b98bcb9310e 100644 (file)
 ## along with this program; if not, see <http://www.gnu.org/licenses/>.
 ##
 
+import copy
 import sigrokdecode as srd
 from .lists import *
 
 class Decoder(srd.Decoder):
-    api_version = 2
+    api_version = 3
     id = 'eeprom24xx'
     name = '24xx EEPROM'
     longname = '24xx I²C EEPROM'
     desc = '24xx series I²C EEPROM protocol.'
     license = 'gplv2+'
     inputs = ['i2c']
-    outputs = ['eeprom24xx']
+    outputs = []
+    tags = ['IC', 'Memory']
     options = (
         {'id': 'chip', 'desc': 'Chip', 'default': 'generic',
             'values': tuple(chips.keys())},
@@ -37,7 +39,7 @@ class Decoder(srd.Decoder):
     )
     annotations = (
         # Warnings
-        ('warnings', 'Warnings'),
+        ('warning', 'Warning'),
         # Bits/bytes
         ('control-code', 'Control code'),
         ('address-pin', 'Address pin (A0/A1/A2)'),
@@ -75,6 +77,9 @@ class Decoder(srd.Decoder):
     def __init__(self):
         self.reset()
 
+    def reset(self):
+        self.reset_variables()
+
     def start(self):
         self.out_ann = self.register(srd.OUTPUT_ANN)
         self.out_binary = self.register(srd.OUTPUT_BINARY)
@@ -90,7 +95,7 @@ class Decoder(srd.Decoder):
     def putbits(self, bit1, bit2, bits, data):
         self.put(bits[bit1][1], bits[bit2][2], self.out_ann, data)
 
-    def reset(self):
+    def reset_variables(self):
         self.state = 'WAIT FOR START'
         self.packets = []
         self.bytebuf = []
@@ -179,7 +184,7 @@ class Decoder(srd.Decoder):
 
     def decide_on_seq_or_rnd_read(self):
         if len(self.bytebuf) < 2:
-            self.reset()
+            self.reset_variables()
             return
         if len(self.bytebuf) == 2:
             self.is_random_access_read = True
@@ -237,7 +242,7 @@ class Decoder(srd.Decoder):
     def handle_get_control_word(self):
         # The packet after START must be an ADDRESS READ or ADDRESS WRITE.
         if self.cmd not in ('ADDRESS READ', 'ADDRESS WRITE'):
-            self.reset()
+            self.reset_variables()
             return
         self.packet_append()
         self.put_control_word(self.bits)
@@ -249,18 +254,18 @@ class Decoder(srd.Decoder):
         elif self.cmd == 'NACK':
             self.es_block = self.es
             self.putb([0, ['Warning: No reply from slave!']])
-            self.reset()
+            self.reset_variables()
         else:
-            self.reset()
+            self.reset_variables()
 
     def handle_r_get_word_addr_or_byte(self):
         if self.cmd == 'STOP':
             self.es_block = self.es
             self.putb([0, ['Warning: Slave replied, but master aborted!']])
-            self.reset()
+            self.reset_variables()
             return
         elif self.cmd != 'DATA READ':
-            self.reset()
+            self.reset_variables()
             return
         self.packet_append()
         self.state = 'R GET ACK NACK AFTER WORD ADDR OR BYTE'
@@ -272,20 +277,20 @@ class Decoder(srd.Decoder):
             self.is_cur_addr_read = True
             self.state = 'GET STOP AFTER LAST BYTE'
         else:
-            self.reset()
+            self.reset_variables()
 
     def handle_r_get_restart(self):
         if self.cmd == 'RESTART':
             self.state = 'R READ BYTE'
         else:
-            self.reset()
+            self.reset_variables()
 
     def handle_r_read_byte(self):
         if self.cmd == 'DATA READ':
             self.packet_append()
             self.state = 'R GET ACK NACK AFTER BYTE WAS READ'
         else:
-            self.reset()
+            self.reset_variables()
 
     def handle_r_get_ack_nack_after_byte_was_read(self):
         if self.cmd == 'ACK':
@@ -294,7 +299,7 @@ class Decoder(srd.Decoder):
             # It's either a RANDOM READ or a SEQUENTIAL READ.
             self.state = 'GET STOP AFTER LAST BYTE'
         else:
-            self.reset()
+            self.reset_variables()
 
     def handle_w_get_ack_nack_after_control_word(self):
         if self.cmd == 'ACK':
@@ -302,18 +307,18 @@ class Decoder(srd.Decoder):
         elif self.cmd == 'NACK':
             self.es_block = self.es
             self.putb([0, ['Warning: No reply from slave!']])
-            self.reset()
+            self.reset_variables()
         else:
-            self.reset()
+            self.reset_variables()
 
     def handle_w_get_word_addr(self):
         if self.cmd == 'STOP':
             self.es_block = self.es
             self.putb([0, ['Warning: Slave replied, but master aborted!']])
-            self.reset()
+            self.reset_variables()
             return
         elif self.cmd != 'DATA WRITE':
-            self.reset()
+            self.reset_variables()
             return
         self.packet_append()
         self.state = 'W GET ACK AFTER WORD ADDR'
@@ -322,7 +327,7 @@ class Decoder(srd.Decoder):
         if self.cmd == 'ACK':
             self.state = 'W DETERMINE EEPROM READ OR WRITE'
         else:
-            self.reset()
+            self.reset_variables()
 
     def handle_w_determine_eeprom_read_or_write(self):
         if self.cmd == 'START REPEAT':
@@ -332,7 +337,7 @@ class Decoder(srd.Decoder):
             self.packet_append()
             self.state = 'W GET ACK NACK AFTER BYTE WAS WRITTEN'
         else:
-            self.reset()
+            self.reset_variables()
 
     def handle_w_write_byte(self):
         if self.cmd == 'DATA WRITE':
@@ -340,7 +345,7 @@ class Decoder(srd.Decoder):
             self.state = 'W GET ACK NACK AFTER BYTE WAS WRITTEN'
         elif self.cmd == 'STOP':
             if len(self.bytebuf) < 2:
-                self.reset()
+                self.reset_variables()
                 return
             self.es_block = self.es
             if len(self.bytebuf) == 2:
@@ -348,31 +353,31 @@ class Decoder(srd.Decoder):
             else:
                 self.is_page_write = True
             self.put_operation()
-            self.reset()
+            self.reset_variables()
         elif self.cmd == 'START REPEAT':
             # It's either a RANDOM ACCESS READ or SEQUENTIAL RANDOM READ.
             self.state = 'R2 GET CONTROL WORD'
         else:
-            self.reset()
+            self.reset_variables()
 
     def handle_w_get_ack_nack_after_byte_was_written(self):
         if self.cmd == 'ACK':
             self.state = 'W WRITE BYTE'
         else:
-            self.reset()
+            self.reset_variables()
 
     def handle_r2_get_control_word(self):
         if self.cmd == 'ADDRESS READ':
             self.packet_append()
             self.state = 'R2 GET ACK AFTER ADDR READ'
         else:
-            self.reset()
+            self.reset_variables()
 
     def handle_r2_get_ack_after_addr_read(self):
         if self.cmd == 'ACK':
             self.state = 'R2 READ BYTE'
         else:
-            self.reset()
+            self.reset_variables()
 
     def handle_r2_read_byte(self):
         if self.cmd == 'DATA READ':
@@ -383,9 +388,9 @@ class Decoder(srd.Decoder):
             self.es_block = self.es
             self.putb([0, ['Warning: STOP expected after a NACK (not ACK)']])
             self.put_operation()
-            self.reset()
+            self.reset_variables()
         else:
-            self.reset()
+            self.reset_variables()
 
     def handle_r2_get_ack_nack_after_byte_was_read(self):
         if self.cmd == 'ACK':
@@ -394,34 +399,43 @@ class Decoder(srd.Decoder):
             self.decide_on_seq_or_rnd_read()
             self.state = 'GET STOP AFTER LAST BYTE'
         else:
-            self.reset()
+            self.reset_variables()
 
     def handle_get_stop_after_last_byte(self):
         if self.cmd == 'STOP':
             self.es_block = self.es
             self.put_operation()
-            self.reset()
+            self.reset_variables()
         elif self.cmd == 'START REPEAT':
             self.es_block = self.es
             self.putb([0, ['Warning: STOP expected (not RESTART)']])
             self.put_operation()
-            self.reset()
+            self.reset_variables()
             self.ss_block = self.ss
             self.state = 'GET CONTROL WORD'
         else:
-            self.reset()
+            self.reset_variables()
 
     def decode(self, ss, es, data):
-        self.cmd, self.databyte = data
+        cmd, _ = data
 
         # Collect the 'BITS' packet, then return. The next packet is
         # guaranteed to belong to these bits we just stored.
-        if self.cmd == 'BITS':
-            self.bits = self.databyte
+        if cmd == 'BITS':
+            _, databits = data
+            self.bits = copy.deepcopy(databits)
             return
 
-        # Store the start/end samples of this I²C packet.
+        # Store the start/end samples of this I²C packet. Deep copy
+        # caller's data, assuming that implementation details of the
+        # above complex methods can access the data after returning
+        # from the .decode() invocation, with the data having become
+        # invalid by that time of access. This conservative approach
+        # can get weakened after close inspection of those methods.
         self.ss, self.es = ss, es
+        _, databyte = data
+        databyte = copy.deepcopy(databyte)
+        self.cmd, self.databyte = cmd, databyte
 
         # State machine.
         s = 'handle_%s' % self.state.lower().replace(' ', '_')