X-Git-Url: https://sigrok.org/gitweb/?p=libsigrokdecode.git;a=blobdiff_plain;f=decoders%2Flin%2Fpd.py;h=426cbf0520f4f0208fc905ac8a5af1addb0a6e56;hp=d1c14814b681ea0563146d68f334645a47c10148;hb=d871cc2d11dba2a57b747a940beace0d9456c166;hpb=fe9160d567fbb4c890ac7b8bc2e1f2f9143f0935 diff --git a/decoders/lin/pd.py b/decoders/lin/pd.py index d1c1481..426cbf0 100644 --- a/decoders/lin/pd.py +++ b/decoders/lin/pd.py @@ -21,7 +21,7 @@ import sigrokdecode as srd class LinFsm: class State: - WaitForBreak = 'WAIT FOR BREAK' + WaitForBreak = 'WAIT_FOR_BREAK' Sync = 'SYNC' Pid = 'PID' Data = 'DATA' @@ -37,24 +37,24 @@ class LinFsm: def _transition_allowed(self, target_state): if target_state == LinFsm.State.Error: return True - for s in self.allowed_state[self.state]: - if s == target_state: - return True - return False + return target_state in self.allowed_state[self.state] def reset(self): self.state = LinFsm.State.WaitForBreak + self.uart_idle_count = 0 def __init__(self): - self.allowed_state = dict() - self.allowed_state[LinFsm.State.WaitForBreak] = (LinFsm.State.Sync,) - self.allowed_state[LinFsm.State.Sync] = (LinFsm.State.Pid,) - self.allowed_state[LinFsm.State.Pid] = (LinFsm.State.Data,) - self.allowed_state[LinFsm.State.Data] = (LinFsm.State.Data, LinFsm.State.Checksum) - self.allowed_state[LinFsm.State.Checksum] = (LinFsm.State.WaitForBreak,) - self.allowed_state[LinFsm.State.Error] = (LinFsm.State.Sync,) + a = dict() + a[LinFsm.State.WaitForBreak] = (LinFsm.State.Sync,) + a[LinFsm.State.Sync] = (LinFsm.State.Pid,) + a[LinFsm.State.Pid] = (LinFsm.State.Data,) + a[LinFsm.State.Data] = (LinFsm.State.Data, LinFsm.State.Checksum) + a[LinFsm.State.Checksum] = (LinFsm.State.WaitForBreak,) + a[LinFsm.State.Error] = (LinFsm.State.Sync,) + self.allowed_state = a self.state = None + self.uart_idle_count = 0 self.reset() class Decoder(srd.Decoder): @@ -65,7 +65,8 @@ class Decoder(srd.Decoder): desc = 'Local Interconnect Network (LIN) protocol.' license = 'gplv2+' inputs = ['uart'] - outputs = ['lin'] + outputs = [] + tags = ['Automotive'] options = ( {'id': 'version', 'desc': 'Protocol version', 'default': 2, 'values': (1, 2)}, ) @@ -101,7 +102,7 @@ class Decoder(srd.Decoder): def wipe_break_null_byte(self, value): # Upon a break condition a null byte is received which must be ignored. - if self.fsm.state != LinFsm.State.WaitForBreak and self.fsm.state != LinFsm.State.Error: + if self.fsm.state not in (LinFsm.State.WaitForBreak, LinFsm.State.Error): if len(self.lin_rsp): value = self.lin_rsp.pop()[2] else: @@ -109,16 +110,25 @@ class Decoder(srd.Decoder): if value != 0: self.fsm.transit(LinFsm.State.Error) - self.handle_error() + self.handle_error(None) return False return True + def handle_uart_idle(self): + if self.fsm.state not in (LinFsm.State.WaitForBreak, LinFsm.State.Error): + self.fsm.uart_idle_count += 1 + + if self.fsm.uart_idle_count == 2: + self.fsm.transit(LinFsm.State.Checksum) + self.handle_checksum() + self.fsm.reset() + def handle_wait_for_break(self, value): self.wipe_break_null_byte(value) def handle_break(self, value): - if self.fsm.state != LinFsm.State.WaitForBreak and self.fsm.state != LinFsm.State.Error: + if self.fsm.state not in (LinFsm.State.WaitForBreak, LinFsm.State.Error): if self.wipe_break_null_byte(value): self.fsm.transit(LinFsm.State.Checksum) self.handle_checksum() @@ -126,8 +136,7 @@ class Decoder(srd.Decoder): self.fsm.reset() self.fsm.transit(LinFsm.State.Sync) - data = [1, ['Break condition', 'Break', 'Brk', 'B']] - self.put(self.ss_block, self.es_block, self.out_ann, data) + self.putx([1, ['Break condition', 'Break', 'Brk', 'B']]) def handle_sync(self, value): self.fsm.transit(LinFsm.State.Pid) @@ -153,7 +162,7 @@ class Decoder(srd.Decoder): checksum = self.lin_rsp.pop() if len(self.lin_rsp) else None if pid: - id = pid[2] & 0x3F + id_ = pid[2] & 0x3F parity = pid[2] >> 6 expected_parity = self.calc_parity(pid[2]) @@ -164,8 +173,8 @@ class Decoder(srd.Decoder): ann_class = 0 if parity_valid else 3 self.put(pid[0], pid[1], self.out_ann, [ann_class, [ - 'ID: %02X Parity: %d (%s)' % (id, parity, 'ok' if parity_valid else 'bad'), - 'ID: 0x%02X' % id, 'I: %d' % id + 'ID: %02X Parity: %d (%s)' % (id_, parity, 'ok' if parity_valid else 'bad'), + 'ID: 0x%02X' % id_, 'I: %d' % id_ ]]) if len(self.lin_rsp): @@ -186,14 +195,14 @@ class Decoder(srd.Decoder): self.lin_header.clear() self.lin_rsp.clear() - def handle_error(self): + def handle_error(self, dummy): self.putx([3, ['Error', 'Err', 'E']]) def checksum_is_valid(self, pid, data, checksum): if self.lin_version == 2: - id = pid & 0x3F + id_ = pid & 0x3F - if id != 60 and id != 61: + if id_ != 60 and id_ != 61: checksum += pid for d in data: @@ -206,10 +215,10 @@ class Decoder(srd.Decoder): @staticmethod def calc_parity(pid): - id = [((pid & 0x3F) >> i) & 1 for i in range(8)] + id_ = [((pid & 0x3F) >> i) & 1 for i in range(8)] - p0 = id[0] ^ id[1] ^ id[2] ^ id[4] - p1 = not (id[1] ^ id[3] ^ id[4] ^ id[5]) + p0 = id_[0] ^ id_[1] ^ id_[2] ^ id_[4] + p1 = not (id_[1] ^ id_[3] ^ id_[4] ^ id_[5]) return (p0 << 0) | (p1 << 1) @@ -219,6 +228,8 @@ class Decoder(srd.Decoder): self.ss_block, self.es_block = ss, es # Ignore all UART packets except the actual data packets or BREAK. + if ptype == 'IDLE': + self.handle_uart_idle() if ptype == 'BREAK': self.handle_break(pdata) if ptype != 'DATA': @@ -233,9 +244,5 @@ class Decoder(srd.Decoder): # - Sync byte is followed by a PID byte (Protected Identifier). # - PID byte is followed by 1 - 8 data bytes and a final checksum byte. - if self.fsm.state == LinFsm.State.Error: self.handle_error() - elif self.fsm.state == LinFsm.State.WaitForBreak: self.handle_wait_for_break(pdata) - elif self.fsm.state == LinFsm.State.Sync: self.handle_sync(pdata) - elif self.fsm.state == LinFsm.State.Pid: self.handle_pid(pdata) - elif self.fsm.state == LinFsm.State.Data: self.handle_data(pdata) - elif self.fsm.state == LinFsm.State.Checksum: self.handle_checksum() + handler = getattr(self, 'handle_%s' % self.fsm.state.lower()) + handler(pdata)