+ def putg(self, ss, es, cls, texts):
+ self.put(ss, es, self.out_ann, [cls, texts])
+
+ def invalidate_frame_details(self):
+ self.bits.clear()
+ self.fields.clear()
+
+ def handle_databytes(self, fields, data):
+ # TODO Deep inspection of header fields and data values, including
+ # checksum verification results.
+ mode = fields.get('mode', None)
+ if mode is None:
+ return
+ if mode == 1:
+ # An earlier implementation commented that for mode 1 the
+ # first data byte would be the PID. But example captures
+ # have no data bytes in packets for that mode. This position
+ # is taken by the checksum. Is this correct?
+ pid = data[0] if data else fields.get('csum', None)
+ if pid is None:
+ text = ['PID missing']
+ self.putg(ss, es, ANN_WARN, text)
+ else:
+ byte_text = '{:02x}'.format(pid)
+ self.putg(ss, es, ANN_M1_PID, [byte_text])
+
+ def handle_byte(self, ss, es, b):
+ # Annotate all raw byte values. Inspect and process the first
+ # bytes in a frame already. Cease inspection and only accumulate
+ # all other bytes after the mode. The checksum's position and
+ # thus the data bytes' span will only be known when EOF or IFS
+ # were seen. Implementor's note: This method just identifies
+ # header fields. Processing is left to the .handle_databytes()
+ # method. Until then validity will have been checked, too (CS).
+ byte_text = '{:02x}'.format(b)
+ self.putg(ss, es, ANN_BYTE, [byte_text])
+
+ if not 'prio' in self.fields:
+ self.fields.update({'prio': b})
+ self.putg(ss, es, ANN_PRIO, [byte_text])
+ return
+ if not 'dest' in self.fields:
+ self.fields.update({'dest': b})
+ self.putg(ss, es, ANN_DEST, [byte_text])
+ return
+ if not 'src' in self.fields:
+ self.fields.update({'src': b})
+ self.putg(ss, es, ANN_SRC, [byte_text])
+ return
+ if not 'mode' in self.fields:
+ self.fields.update({'mode': b})
+ self.putg(ss, es, ANN_MODE, [byte_text])
+ return
+ if not 'data' in self.fields:
+ self.fields.update({'data': [], 'csum': None})
+ self.fields['data'].append((b, ss, es))
+
+ def handle_sof(self, ss, es, speed):
+ text = ['{speed:d}x SOF', 'S{speed:d}', 'S']
+ text = [f.format(speed = speed) for f in text]
+ self.putg(ss, es, ANN_SOF, text)
+ self.invalidate_frame_details()
+ self.fields.update({'speed': speed})
+
+ def handle_bit(self, ss, es, b):
+ self.bits.append((b, ss, es))
+ self.putg(ss, es, ANN_BIT, ['{:d}'.format(b)])
+ if len(self.bits) < 8:
+ return
+ ss, es = self.bits[0][1], self.bits[-1][2]
+ b = bitpack_msb(self.bits, 0)
+ self.bits.clear()
+ self.handle_byte(ss, es, b)
+
+ def handle_eof(self, ss, es, is_ifs = False):
+ # EOF or IFS were seen. Post process the data bytes sequence.
+ # Separate the checksum from the data bytes. Emit annotations.
+ # Pass data bytes and header fields to deeper inspection.
+ data = self.fields.get('data', {})
+ if not data:
+ text = ['Short data phase', 'Data']
+ self.putg(ss, es, ANN_WARN, text)
+ csum = None
+ if len(data) >= 1:
+ csum, ss_csum, es_csum = data.pop()
+ self.fields.update({'csum': csum})
+ # TODO Verify checksum's correctness?
+ if data:
+ ss_data, es_data = data[0][1], data[-1][2]
+ text = ' '.join(['{:02x}'.format(b[0]) for b in data])
+ self.putg(ss_data, es_data, ANN_DATA, [text])
+ if csum is not None:
+ text = '{:02x}'.format(csum)
+ self.putg(ss_csum, es_csum, ANN_CSUM, [text])
+ text = ['IFS', 'I'] if is_ifs else ['EOF', 'E']
+ self.putg(ss, es, ANN_IFS, text)
+ self.handle_databytes(self.fields, data);
+ self.invalidate_frame_details()
+
+ def handle_unknown(self, ss, es):
+ text = ['Unknown condition', 'Unknown', 'UNK']
+ self.putg(ss, es, ANN_WARN, text)
+ self.invalidate_frame_details()
+
+ def usecs_to_samples(self, us):
+ us *= 1e-6
+ us *= self.samplerate
+ return int(us)
+
+ def samples_to_usecs(self, n):
+ n /= self.samplerate
+ n *= 1000.0 * 1000.0
+ return int(n)