X-Git-Url: https://sigrok.org/gitweb/?p=libsigrokdecode.git;a=blobdiff_plain;f=decoders%2Fsae_j1850_vpw%2Fpd.py;fp=decoders%2Fsae_j1850_vpw%2Fpd.py;h=3655f964747aa59219c02108b30da96b748d00c9;hp=fd2389ec55e38534a616f9ceed6b03aac10cf82d;hb=df3a4a3bd1763324765f53932d878525a4a20102;hpb=bb5af7af06b8fb1b37a88a03a94db3e35a7d15de diff --git a/decoders/sae_j1850_vpw/pd.py b/decoders/sae_j1850_vpw/pd.py index fd2389e..3655f96 100644 --- a/decoders/sae_j1850_vpw/pd.py +++ b/decoders/sae_j1850_vpw/pd.py @@ -2,6 +2,7 @@ ## This file is part of the libsigrokdecode project. ## ## Copyright (C) 2016 Anthony Symons +## Copyright (C) 2023 Gerhard Sittig ## ## This program is free software; you can redistribute it and/or modify ## it under the terms of the GNU General Public License as published by @@ -18,16 +19,30 @@ ## import sigrokdecode as srd +from common.srdhelper import bitpack_msb + +# VPW Timings. From the SAE J1850 1995 rev section 23.406 documentation. +# Ideal, minimum and maximum tolerances. +VPW_SOF = 200 +VPW_SOFL = 164 +VPW_SOFH = 245 # 240 by the spec, 245 so a 60us 4x sample will pass +VPW_LONG = 128 +VPW_LONGL = 97 +VPW_LONGH = 170 # 164 by the spec but 170 for low sample rate tolerance. +VPW_SHORT = 64 +VPW_SHORTL = 24 # 35 by the spec, 24 to allow down to 6us as measured in practice for 4x @ 1mhz sampling +VPW_SHORTH = 97 +VPW_IFS = 240 class SamplerateError(Exception): pass -def timeuf(t): - return int (t * 1000.0 * 1000.0) - -class Ann: - ANN_RAW, ANN_SOF, ANN_IFS, ANN_DATA, \ - ANN_PACKET = range(5) +( + ANN_SOF, ANN_BIT, ANN_IFS, ANN_BYTE, + ANN_PRIO, ANN_DEST, ANN_SRC, ANN_MODE, ANN_DATA, ANN_CSUM, + ANN_M1_PID, + ANN_WARN, +) = range(12) class Decoder(srd.Decoder): api_version = 3 @@ -43,123 +58,239 @@ class Decoder(srd.Decoder): {'id': 'data', 'name': 'Data', 'desc': 'Data line'}, ) annotations = ( - ('raw', 'Raw'), ('sof', 'SOF'), + ('bit', 'Bit'), ('ifs', 'EOF/IFS'), + ('byte', 'Byte'), + ('prio', 'Priority'), + ('dest', 'Destination'), + ('src', 'Source'), + ('mode', 'Mode'), ('data', 'Data'), - ('packet', 'Packet'), + ('csum', 'Checksum'), + ('m1_pid', 'Pid'), + ('warn', 'Warning'), ) annotation_rows = ( - ('raws', 'Raws', (Ann.ANN_RAW, Ann.ANN_SOF, Ann.ANN_IFS,)), - ('bytes', 'Bytes', (Ann.ANN_DATA,)), - ('packets', 'Packets', (Ann.ANN_PACKET,)), + ('bits', 'Bits', (ANN_SOF, ANN_BIT, ANN_IFS,)), + ('bytes', 'Bytes', (ANN_BYTE,)), + ('fields', 'Fields', (ANN_PRIO, ANN_DEST, ANN_SRC, ANN_MODE, ANN_DATA, ANN_CSUM,)), + ('values', 'Values', (ANN_M1_PID,)), + ('warns', 'Warnings', (ANN_WARN,)), ) + # TODO Add support for options? Polarity. Glitch length. def __init__(self): self.reset() def reset(self): - self.state = 'IDLE' self.samplerate = None - self.byte = 0 # the byte offset in the packet - self.mode = 0 # for by packet decode - self.data = 0 # the current byte - self.datastart = 0 # sample number this byte started at - self.csa = 0 # track the last byte seperately to retrospectively add the CS marker - self.csb = 0 - self.count = 0 # which bit number we are up to - self.active = 0 # which logic level is considered active - - # vpw timings. ideal, min and max tollerances. - # From SAE J1850 1995 rev section 23.406 - - self.sof = 200 - self.sofl = 164 - self.sofh = 245 # 240 by the spec, 245 so a 60us 4x sample will pass - self.long = 128 - self.longl = 97 - self.longh = 170 # 164 by the spec but 170 for low sample rate tolerance. - self.short = 64 - self.shortl = 24 # 35 by the spec, 24 to allow down to 6us as measured in practice for 4x @ 1mhz sampling - self.shorth = 97 - self.ifs = 240 - self.spd = 1 # set to 4 when a 4x SOF is detected (VPW high speed frame) + self.active = 0 # Signal polarity. Needs to become an option? + self.bits = [] + self.fields = {} - def handle_bit(self, ss, es, b): - self.data |= (b << 7-self.count) # MSB-first - self.put(ss, es, self.out_ann, [Ann.ANN_RAW, ["%d" % b]]) - if self.count == 0: - self.datastart = ss - if self.count == 7: - self.csa = self.datastart # for CS - self.csb = self.samplenum # for CS - self.put(self.datastart, self.samplenum, self.out_ann, [Ann.ANN_DATA, ["%02X" % self.data]]) - # add protocol parsing here - if self.byte == 0: - self.put(self.datastart, self.samplenum, self.out_ann, [Ann.ANN_PACKET, ['Priority','Prio','P']]) - elif self.byte == 1: - self.put(self.datastart, self.samplenum, self.out_ann, [Ann.ANN_PACKET, ['Destination','Dest','D']]) - elif self.byte == 2: - self.put(self.datastart, self.samplenum, self.out_ann, [Ann.ANN_PACKET, ['Source','Src','S']]) - elif self.byte == 3: - self.put(self.datastart, self.samplenum, self.out_ann, [Ann.ANN_PACKET, ['Mode','M']]) - self.mode = self.data - elif self.mode == 1 and self.byte == 4: # mode 1 payload - self.put(self.datastart, self.samplenum, self.out_ann, [Ann.ANN_PACKET, ['Pid','P']]) - - # prepare for next byte - self.count = -1 - self.data = 0 - self.byte = self.byte + 1 # track packet offset - self.count = self.count + 1 + def start(self): + self.out_ann = self.register(srd.OUTPUT_ANN) def metadata(self, key, value): if key == srd.SRD_CONF_SAMPLERATE: self.samplerate = value - def start(self): - self.out_ann = self.register(srd.OUTPUT_ANN) + def putg(self, ss, es, cls, texts): + self.put(ss, es, self.out_ann, [cls, texts]) + + def invalidate_frame_details(self): + self.bits.clear() + self.fields.clear() + + def handle_databytes(self, fields, data): + # TODO Deep inspection of header fields and data values, including + # checksum verification results. + mode = fields.get('mode', None) + if mode is None: + return + if mode == 1: + # An earlier implementation commented that for mode 1 the + # first data byte would be the PID. But example captures + # have no data bytes in packets for that mode. This position + # is taken by the checksum. Is this correct? + pid = data[0] if data else fields.get('csum', None) + if pid is None: + text = ['PID missing'] + self.putg(ss, es, ANN_WARN, text) + else: + byte_text = '{:02x}'.format(pid) + self.putg(ss, es, ANN_M1_PID, [byte_text]) + + def handle_byte(self, ss, es, b): + # Annotate all raw byte values. Inspect and process the first + # bytes in a frame already. Cease inspection and only accumulate + # all other bytes after the mode. The checksum's position and + # thus the data bytes' span will only be known when EOF or IFS + # were seen. Implementor's note: This method just identifies + # header fields. Processing is left to the .handle_databytes() + # method. Until then validity will have been checked, too (CS). + byte_text = '{:02x}'.format(b) + self.putg(ss, es, ANN_BYTE, [byte_text]) + + if not 'prio' in self.fields: + self.fields.update({'prio': b}) + self.putg(ss, es, ANN_PRIO, [byte_text]) + return + if not 'dest' in self.fields: + self.fields.update({'dest': b}) + self.putg(ss, es, ANN_DEST, [byte_text]) + return + if not 'src' in self.fields: + self.fields.update({'src': b}) + self.putg(ss, es, ANN_SRC, [byte_text]) + return + if not 'mode' in self.fields: + self.fields.update({'mode': b}) + self.putg(ss, es, ANN_MODE, [byte_text]) + return + if not 'data' in self.fields: + self.fields.update({'data': [], 'csum': None}) + self.fields['data'].append((b, ss, es)) + + def handle_sof(self, ss, es, speed): + text = ['{speed:d}x SOF', 'S{speed:d}', 'S'] + text = [f.format(speed = speed) for f in text] + self.putg(ss, es, ANN_SOF, text) + self.invalidate_frame_details() + self.fields.update({'speed': speed}) + + def handle_bit(self, ss, es, b): + self.bits.append((b, ss, es)) + self.putg(ss, es, ANN_BIT, ['{:d}'.format(b)]) + if len(self.bits) < 8: + return + ss, es = self.bits[0][1], self.bits[-1][2] + b = bitpack_msb(self.bits, 0) + self.bits.clear() + self.handle_byte(ss, es, b) + + def handle_eof(self, ss, es, is_ifs = False): + # EOF or IFS were seen. Post process the data bytes sequence. + # Separate the checksum from the data bytes. Emit annotations. + # Pass data bytes and header fields to deeper inspection. + data = self.fields.get('data', {}) + if not data: + text = ['Short data phase', 'Data'] + self.putg(ss, es, ANN_WARN, text) + csum = None + if len(data) >= 1: + csum, ss_csum, es_csum = data.pop() + self.fields.update({'csum': csum}) + # TODO Verify checksum's correctness? + if data: + ss_data, es_data = data[0][1], data[-1][2] + text = ' '.join(['{:02x}'.format(b[0]) for b in data]) + self.putg(ss_data, es_data, ANN_DATA, [text]) + if csum is not None: + text = '{:02x}'.format(csum) + self.putg(ss_csum, es_csum, ANN_CSUM, [text]) + text = ['IFS', 'I'] if is_ifs else ['EOF', 'E'] + self.putg(ss, es, ANN_IFS, text) + self.handle_databytes(self.fields, data); + self.invalidate_frame_details() + + def handle_unknown(self, ss, es): + text = ['Unknown condition', 'Unknown', 'UNK'] + self.putg(ss, es, ANN_WARN, text) + self.invalidate_frame_details() + + def usecs_to_samples(self, us): + us *= 1e-6 + us *= self.samplerate + return int(us) + + def samples_to_usecs(self, n): + n /= self.samplerate + n *= 1000.0 * 1000.0 + return int(n) def decode(self): if not self.samplerate: raise SamplerateError('Cannot decode without samplerate.') - self.wait({0: 'e'}) + # Get the distance between edges. Classify the distance + # to derive symbols and data bit values. Prepare waiting + # for an interframe gap as well, while this part of the + # condition is optional (switches in and out at runtime). + conds_edge = {0: 'e'} + conds_edge_only = [conds_edge] + conds_edge_idle = [conds_edge, {'skip': 0}] + conds = conds_edge_only + self.wait(conds) es = self.samplenum + spd = None while True: ss = es - pin, = self.wait({0: 'e'}) + pin, = self.wait(conds) es = self.samplenum + count = es - ss + t = self.samples_to_usecs(count) + + # Synchronization to the next frame. Wait for SOF. + # Silently keep synchronizing until SOF was seen. + if spd is None: + if not self.matched[0]: + continue + if pin != self.active: + continue + + # Detect the frame's speed from the SOF length. Adjust + # the expected BIT lengths to the SOF derived speed. + # Arrange for the additional supervision of EOF/IFS. + if t in range(VPW_SOFL // 1, VPW_SOFH // 1): + spd = 1 + elif t in range(VPW_SOFL // 4, VPW_SOFH // 4): + spd = 4 + else: + continue + short_lower, short_upper = VPW_SHORTL // spd, VPW_SHORTH // spd + long_lower, long_upper = VPW_LONGL // spd, VPW_LONGH // spd + samples = self.usecs_to_samples(VPW_IFS // spd) + conds_edge_idle[-1]['skip'] = samples + conds = conds_edge_idle + + # Emit the SOF annotation. Start collecting DATA. + self.handle_sof(ss, es, spd) + continue + + # Inside the DATA phase. Get data bits. Handle EOF/IFS. + if len(conds) > 1 and self.matched[1]: + # TODO The current implementation gets here after a + # pre-determined minimum wait time. Does not differ + # between EOF and IFS. An earlier implementation had + # this developer note: EOF=239-280 IFS=281+ + self.handle_eof(ss, es) + # Enter the IDLE phase. Wait for the next SOF. + spd = None + conds = conds_edge_only + continue + if t in range(short_lower, short_upper): + value = 1 if pin == self.active else 0 + self.handle_bit(ss, es, value) + continue + if t in range(long_lower, long_upper): + value = 0 if pin == self.active else 1 + self.handle_bit(ss, es, value) + continue - samples = es - ss - t = timeuf(samples / self.samplerate) - if self.state == 'IDLE': # detect and set speed from the size of sof - if pin == self.active and t in range(self.sofl , self.sofh): - self.put(ss, es, self.out_ann, [Ann.ANN_RAW, ['1X SOF', 'S1', 'S']]) - self.spd = 1 - self.data = 0 - self.count = 0 - self.state = 'DATA' - elif pin == self.active and t in range(int(self.sofl / 4) , int(self.sofh / 4)): - self.put(ss, es, self.out_ann, [Ann.ANN_RAW, ['4X SOF', 'S4', '4']]) - self.spd = 4 - self.data = 0 - self.count = 0 - self.state = 'DATA' - - elif self.state == 'DATA': - if t >= int(self.ifs / self.spd): - self.state = 'IDLE' - self.put(ss, es, self.out_ann, [Ann.ANN_RAW, ["EOF/IFS", "E"]]) # EOF=239-280 IFS=281+ - self.put(self.csa, self.csb, self.out_ann, [Ann.ANN_PACKET, ['Checksum','CS','C']]) # retrospective print of CS - self.byte = 0 # reset packet offset - elif t in range(int(self.shortl / self.spd), int(self.shorth / self.spd)): - if pin == self.active: - self.handle_bit(ss, es, 1) - else: - self.handle_bit(ss, es, 0) - elif t in range(int(self.longl / self.spd), int(self.longh / self.spd)): - if pin == self.active: - self.handle_bit(ss, es, 0) - else: - self.handle_bit(ss, es, 1) + # Implementation detail: An earlier implementation used to + # ignore everything that was not handled above. This would + # be motivated by the noisy environment the protocol is + # typically used in. This more recent implementation accepts + # short glitches, but by design falls back to synchronization + # to the input stream for other unhandled conditions. This + # wants to improve usability of the decoder, by presenting + # potential issues to the user. The threshold (microseconds + # between edges that are not valid symbols that are handled + # above) is an arbitrary choice. + if t <= 2: + continue + self.handle_unknown(ss, es) + spd = None + conds = conds_edge_only