RX = 0
TX = 1
+rxtx_channels = ('RX', 'TX')
class No_more_data(Exception):
'''This exception is a signal that we should stop parsing an ADU as there
self.annotation_prefix + 'error',
'Message too short or not finished')
self.hasError = True
- if self.hasError and self.parent.options['channel'] == 'RX':
- # If we are on RX mode (so client->server and server->client
- # messages can be seperated) we like to mark blocks containing
- # errors. We don't do this in TX mode, because then we interpret
- # each frame as both a client->server and server->client frame, and
+ if self.hasError and self.parent.options['scchannel'] != self.parent.options['cschannel']:
+ # If we are decoding different channels (so client->server and
+ # server->client messages can be separated) we like to mark blocks
+ # containing errors. We don't do this when decoding the same
+ # channel as both a client->server and server->client frame, and
# one of those is bound to contain an error, making highlighting
# frames useless.
self.parent.puta(data[0].start, data[-1].end,
'error-indication', 'Frame contains error')
if len(data) > 256:
try:
- self.puti(len(data) - 1, self.annotation_prefix + 'error',
+ self.puti(len(data) - 1, 'error',
'Modbus data frames are limited to 256 bytes')
except No_more_data:
pass
self.check_crc(bytecount + 12)
class Decoder(srd.Decoder):
- api_version = 2
+ api_version = 3
id = 'modbus'
name = 'Modbus'
longname = 'Modbus RTU over RS232/RS485'
desc = 'Modbus RTU protocol for industrial applications.'
- license = 'gplv2+'
+ license = 'gplv3+'
inputs = ['uart']
outputs = ['modbus']
+ tags = ['Embedded/industrial']
annotations = (
- ('sc-server-id', ''),
- ('sc-function', ''),
- ('sc-crc', ''),
- ('sc-address', ''),
- ('sc-data', ''),
- ('sc-length', ''),
- ('sc-error', ''),
- ('cs-server-id', ''),
- ('cs-function', ''),
- ('cs-crc', ''),
- ('cs-address', ''),
- ('cs-data', ''),
- ('cs-length', ''),
- ('cs-error', ''),
- ('error-indication', ''),
+ ('sc-server-id', 'SC server ID'),
+ ('sc-function', 'SC function'),
+ ('sc-crc', 'SC CRC'),
+ ('sc-address', 'SC address'),
+ ('sc-data', 'SC data'),
+ ('sc-length', 'SC length'),
+ ('sc-error', 'SC error'),
+ ('cs-server-id', 'CS server ID'),
+ ('cs-function', 'CS function'),
+ ('cs-crc', 'CS CRC'),
+ ('cs-address', 'CS address'),
+ ('cs-data', 'CS data'),
+ ('cs-length', 'CS length'),
+ ('cs-error', 'CS error'),
+ ('error-indication', 'Error indication'),
)
annotation_rows = (
('sc', 'Server->client', (0, 1, 2, 3, 4, 5, 6)),
('cs', 'Client->server', (7, 8, 9, 10, 11, 12, 13)),
- ('error-indicator', 'Errors in frame', (14,)),
+ ('error-indicators', 'Errors in frame', (14,)),
)
options = (
- {'id': 'channel', 'desc': 'Server -> client channel', 'default': 'RX',
- 'values': ('RX', 'TX')},
+ {'id': 'scchannel', 'desc': 'Server -> client channel',
+ 'default': rxtx_channels[0], 'values': rxtx_channels},
+ {'id': 'cschannel', 'desc': 'Client -> server channel',
+ 'default': rxtx_channels[1], 'values': rxtx_channels},
+ {'id': 'framegap', 'desc': 'Inter-frame bit gap', 'default': 28},
)
- def __init__(self, **kwargs):
+ def __init__(self):
+ self.reset()
+
+ def reset(self):
self.ADUSc = None # Start off with empty slave -> client ADU.
self.ADUCs = None # Start off with empty client -> slave ADU.
# somewhere between seems fine.
# A character is 11 bits long, so (3.5 + 1.5)/2 * 11 ~= 28
# TODO: Display error for too short or too long.
- if (ss - ADU.last_read) <= self.bitlength * 28:
+ if (ss - ADU.last_read) <= self.bitlength * self.options['framegap']:
ADU.add_data(ss, es, data)
else:
# It's been too long since the last part of the ADU!
def decode(self, ss, es, data):
ptype, rxtx, pdata = data
+ # Ignore unknown/unsupported ptypes.
+ if ptype not in ('STARTBIT', 'DATA', 'STOPBIT'):
+ return
+
# Decide what ADU(s) we need this packet to go to.
# Note that it's possible to go to both ADUs.
- if rxtx == TX:
- self.decode_adu(ss, es, data, 'Cs')
- if rxtx == TX and self.options['channel'] == 'TX':
- self.decode_adu(ss, es, data, 'Sc')
- if rxtx == RX and self.options['channel'] == 'RX':
+ if rxtx_channels[rxtx] == self.options['scchannel']:
self.decode_adu(ss, es, data, 'Sc')
+ if rxtx_channels[rxtx] == self.options['cschannel']:
+ self.decode_adu(ss, es, data, 'Cs')