class LinFsm:
class State:
- WaitForBreak = 'WAIT FOR BREAK'
+ WaitForBreak = 'WAIT_FOR_BREAK'
Sync = 'SYNC'
Pid = 'PID'
Data = 'DATA'
self.state = LinFsm.State.WaitForBreak
def __init__(self):
- self.allowed_state = dict()
- self.allowed_state[LinFsm.State.WaitForBreak] = (LinFsm.State.Sync,)
- self.allowed_state[LinFsm.State.Sync] = (LinFsm.State.Pid,)
- self.allowed_state[LinFsm.State.Pid] = (LinFsm.State.Data,)
- self.allowed_state[LinFsm.State.Data] = (LinFsm.State.Data, LinFsm.State.Checksum)
- self.allowed_state[LinFsm.State.Checksum] = (LinFsm.State.WaitForBreak,)
- self.allowed_state[LinFsm.State.Error] = (LinFsm.State.Sync,)
+ a = dict()
+ a[LinFsm.State.WaitForBreak] = (LinFsm.State.Sync,)
+ a[LinFsm.State.Sync] = (LinFsm.State.Pid,)
+ a[LinFsm.State.Pid] = (LinFsm.State.Data,)
+ a[LinFsm.State.Data] = (LinFsm.State.Data, LinFsm.State.Checksum)
+ a[LinFsm.State.Checksum] = (LinFsm.State.WaitForBreak,)
+ a[LinFsm.State.Error] = (LinFsm.State.Sync,)
+ self.allowed_state = a
self.state = None
self.reset()
desc = 'Local Interconnect Network (LIN) protocol.'
license = 'gplv2+'
inputs = ['uart']
- outputs = ['lin']
+ outputs = []
+ tags = ['Automotive']
options = (
{'id': 'version', 'desc': 'Protocol version', 'default': 2, 'values': (1, 2)},
)
def wipe_break_null_byte(self, value):
# Upon a break condition a null byte is received which must be ignored.
- if self.fsm.state != LinFsm.State.WaitForBreak and self.fsm.state != LinFsm.State.Error:
+ if self.fsm.state not in (LinFsm.State.WaitForBreak, LinFsm.State.Error):
if len(self.lin_rsp):
value = self.lin_rsp.pop()[2]
else:
if value != 0:
self.fsm.transit(LinFsm.State.Error)
- self.handle_error()
+ self.handle_error(None)
return False
return True
self.wipe_break_null_byte(value)
def handle_break(self, value):
- if self.fsm.state != LinFsm.State.WaitForBreak and self.fsm.state != LinFsm.State.Error:
+ if self.fsm.state not in (LinFsm.State.WaitForBreak, LinFsm.State.Error):
if self.wipe_break_null_byte(value):
self.fsm.transit(LinFsm.State.Checksum)
self.handle_checksum()
self.fsm.reset()
self.fsm.transit(LinFsm.State.Sync)
- data = [1, ['Break condition', 'Break', 'Brk', 'B']]
- self.put(self.ss_block, self.es_block, self.out_ann, data)
+ self.putx([1, ['Break condition', 'Break', 'Brk', 'B']])
def handle_sync(self, value):
self.fsm.transit(LinFsm.State.Pid)
checksum = self.lin_rsp.pop() if len(self.lin_rsp) else None
if pid:
- id = pid[2] & 0x3F
+ id_ = pid[2] & 0x3F
parity = pid[2] >> 6
expected_parity = self.calc_parity(pid[2])
ann_class = 0 if parity_valid else 3
self.put(pid[0], pid[1], self.out_ann, [ann_class, [
- 'ID: %02X Parity: %d (%s)' % (id, parity, 'ok' if parity_valid else 'bad'),
- 'ID: 0x%02X' % id, 'I: %d' % id
+ 'ID: %02X Parity: %d (%s)' % (id_, parity, 'ok' if parity_valid else 'bad'),
+ 'ID: 0x%02X' % id_, 'I: %d' % id_
]])
if len(self.lin_rsp):
self.lin_header.clear()
self.lin_rsp.clear()
- def handle_error(self):
+ def handle_error(self, dummy):
self.putx([3, ['Error', 'Err', 'E']])
def checksum_is_valid(self, pid, data, checksum):
if self.lin_version == 2:
- id = pid & 0x3F
+ id_ = pid & 0x3F
- if id != 60 and id != 61:
+ if id_ != 60 and id_ != 61:
checksum += pid
for d in data:
@staticmethod
def calc_parity(pid):
- id = [((pid & 0x3F) >> i) & 1 for i in range(8)]
+ id_ = [((pid & 0x3F) >> i) & 1 for i in range(8)]
- p0 = id[0] ^ id[1] ^ id[2] ^ id[4]
- p1 = not (id[1] ^ id[3] ^ id[4] ^ id[5])
+ p0 = id_[0] ^ id_[1] ^ id_[2] ^ id_[4]
+ p1 = not (id_[1] ^ id_[3] ^ id_[4] ^ id_[5])
return (p0 << 0) | (p1 << 1)
# - Sync byte is followed by a PID byte (Protected Identifier).
# - PID byte is followed by 1 - 8 data bytes and a final checksum byte.
- if self.fsm.state == LinFsm.State.Error: self.handle_error()
- elif self.fsm.state == LinFsm.State.WaitForBreak: self.handle_wait_for_break(pdata)
- elif self.fsm.state == LinFsm.State.Sync: self.handle_sync(pdata)
- elif self.fsm.state == LinFsm.State.Pid: self.handle_pid(pdata)
- elif self.fsm.state == LinFsm.State.Data: self.handle_data(pdata)
- elif self.fsm.state == LinFsm.State.Checksum: self.handle_checksum()
+ handler = getattr(self, 'handle_%s' % self.fsm.state.lower())
+ handler(pdata)