]> sigrok.org Git - libsigrokdecode.git/commitdiff
sbus_futaba: add decoder for SBUS hobby remote control by Futaba
authorGerhard Sittig <redacted>
Wed, 20 Apr 2022 05:58:28 +0000 (07:58 +0200)
committerGerhard Sittig <redacted>
Sat, 23 Apr 2022 16:58:48 +0000 (18:58 +0200)
This is the SBUS remote control by Futaba, the 25 bytes on top of UART.
Not the computer peripheral bus. Hence the suffix in the decoder name.

The implementation was tested with synthetic data. Example captures with
real world data have yet to become available. This implementation shows
the message framing, the proportional and digital channels' values, and
the flags. Several warnings for short and long and invalid messages are
implemented, as are user adjustable channel value range limits.

The boilerplate may need adjustment. All naming was made up by me based
on what information was available (vendor doc was missing).

decoders/sbus_futaba/__init__.py [new file with mode: 0644]
decoders/sbus_futaba/pd.py [new file with mode: 0644]

diff --git a/decoders/sbus_futaba/__init__.py b/decoders/sbus_futaba/__init__.py
new file mode 100644 (file)
index 0000000..9404f4f
--- /dev/null
@@ -0,0 +1,35 @@
+##
+## This file is part of the libsigrokdecode project.
+##
+## Copyright (C) 2022 Gerhard Sittig <gerhard.sittig@gmx.net>
+##
+## This program is free software; you can redistribute it and/or modify
+## it under the terms of the GNU General Public License as published by
+## the Free Software Foundation; either version 2 of the License, or
+## (at your option) any later version.
+##
+## This program is distributed in the hope that it will be useful,
+## but WITHOUT ANY WARRANTY; without even the implied warranty of
+## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+## GNU General Public License for more details.
+##
+## You should have received a copy of the GNU General Public License
+## along with this program; if not, see <http://www.gnu.org/licenses/>.
+##
+
+'''
+SBUS by Futaba, a hobby remote control protocol on top of UART.
+Sometimes referred to as "Serial BUS" or S-BUS.
+
+UART communication typically runs at 100kbps with 8e2 frame format and
+inverted signals (high voltage level is logic low).
+
+SBUS messages take 3ms to transfer, and typically repeat in intervals
+of 7ms or 14ms. An SBUS message consists of 25 UART bytes, and carries
+16 proportional channels with 11 bits each, and 2 digital channels
+(boolean, 1 bit), and flags which represent current communication state.
+Proportional channel values typically are in the 192..1792 range, but
+individual implementations may differ.
+'''
+
+from .pd import Decoder
diff --git a/decoders/sbus_futaba/pd.py b/decoders/sbus_futaba/pd.py
new file mode 100644 (file)
index 0000000..75c2cfb
--- /dev/null
@@ -0,0 +1,273 @@
+##
+## This file is part of the libsigrokdecode project.
+##
+## Copyright (C) 2022 Gerhard Sittig <gerhard.sittig@gmx.net>
+##
+## This program is free software; you can redistribute it and/or modify
+## it under the terms of the GNU General Public License as published by
+## the Free Software Foundation; either version 2 of the License, or
+## (at your option) any later version.
+##
+## This program is distributed in the hope that it will be useful,
+## but WITHOUT ANY WARRANTY; without even the implied warranty of
+## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+## GNU General Public License for more details.
+##
+## You should have received a copy of the GNU General Public License
+## along with this program; if not, see <http://www.gnu.org/licenses/>.
+##
+
+"""
+OUTPUT_PYTHON format:
+
+Packet:
+(<ptype>, <pdata>)
+
+This is the list of <ptype> codes and their respective <pdata> values:
+ - 'HEADER': The data is the header byte's value.
+ - 'PROPORTIONAL': The data is a tuple of the channel number (1-based)
+   and the channel's value.
+ - 'DIGITAL': The data is a tuple of the channel number (1-based)
+   and the channel's value.
+ - 'FLAG': The data is a tuple of the flag's name, and the flag's value.
+ - 'FOOTER': The data is the footer byte's value.
+"""
+
+import sigrokdecode as srd
+from common.srdhelper import bitpack_lsb
+
+class Ann:
+    HEADER, PROPORTIONAL, DIGITAL, FRAME_LOST, FAILSAFE, FOOTER, \
+    WARN = range(7)
+    FLAG_LSB = FRAME_LOST
+
+class Decoder(srd.Decoder):
+    api_version = 3
+    id = 'sbus_futaba'
+    name = 'SBUS (Futaba)'
+    longname = 'Futaba SBUS (Serial bus)'
+    desc = 'Serial bus for hobby remote control by Futaba'
+    license = 'gplv2+'
+    inputs = ['uart']
+    outputs = ['sbus_futaba']
+    tags = ['Remote Control']
+    options = (
+        {'id': 'prop_val_min', 'desc': 'Proportional value lower boundary', 'default': 0},
+        {'id': 'prop_val_max', 'desc': 'Proportional value upper boundary', 'default': 2047},
+    )
+    annotations = (
+        ('header', 'Header'),
+        ('proportional', 'Proportional'),
+        ('digital', 'Digital'),
+        ('framelost', 'Frame Lost'),
+        ('failsafe', 'Failsafe'),
+        ('footer', 'Footer'),
+        ('warning', 'Warning'),
+    )
+    annotation_rows = (
+        ('framing', 'Framing', (Ann.HEADER, Ann.FOOTER,
+            Ann.FRAME_LOST, Ann.FAILSAFE)),
+        ('channels', 'Channels', (Ann.PROPORTIONAL, Ann.DIGITAL)),
+        ('warnings', 'Warnings', (Ann.WARN,)),
+    )
+
+    def __init__(self):
+        self.bits_accum = []
+        self.sent_fields = None
+        self.msg_complete = None
+        self.failed = None
+        self.reset()
+
+    def reset(self):
+        self.bits_accum.clear()
+        self.sent_fields = 0
+        self.msg_complete = False
+        self.failed = None
+
+    def start(self):
+        self.out_ann = self.register(srd.OUTPUT_ANN)
+        self.out_py = self.register(srd.OUTPUT_PYTHON)
+
+    def putg(self, ss, es, data):
+        # Put a graphical annotation.
+        self.put(ss, es, self.out_ann, data)
+
+    def putpy(self, ss, es, data):
+        # Pass Python to upper layers.
+        self.put(ss, es, self.out_py, data)
+
+    def get_ss_es_bits(self, bitcount):
+        # Get start/end times, and bit values of given length.
+        # Gets all remaining data when 'bitcount' is None.
+        if bitcount is None:
+            bitcount = len(self.bits_accum)
+        if len(self.bits_accum) < bitcount:
+            return None, None, None
+        bits = self.bits_accum[:bitcount]
+        self.bits_accum = self.bits_accum[bitcount:]
+        ss, es = bits[0][1], bits[-1][2]
+        bits = [b[0] for b in bits]
+        return ss, es, bits
+
+    def flush_accum_bits(self):
+        # Valid data was queued. See if we got full SBUS fields so far.
+        # Annotate them early, cease inspection of failed messages. The
+        # implementation is phrased to reduce the potential for clipboard
+        # errors: 'upto' is the next supported field count, 'want' is one
+        # field's bit count. Grab as many as we find in an invocation.
+        upto = 0
+        if self.failed:
+            return
+        # Annotate the header byte. Not seeing the expected bit pattern
+        # emits a warning annotation, but by design won't fail the SBUS
+        # message. It's considered more useful to present the channels'
+        # values instead. The warning still raises awareness.
+        upto += 1
+        want = 8
+        while self.sent_fields < upto:
+            if len(self.bits_accum) < want:
+                return
+            ss, es, bits = self.get_ss_es_bits(want)
+            value = bitpack_lsb(bits)
+            text = ['0x{:02x}'.format(value)]
+            self.putg(ss, es, [Ann.HEADER, text])
+            if value != 0x0f:
+                text = ['Unexpected header', 'Header']
+                self.putg(ss, es, [Ann.WARN, text])
+            self.putpy(ss, es, ['HEADER', value])
+            self.sent_fields += 1
+        # Annotate the proportional channels' data. Check for user
+        # provided value range violations. Channel numbers are in
+        # the 1..18 range (1-based).
+        upto += 16
+        want = 11
+        while self.sent_fields < upto:
+            if len(self.bits_accum) < want:
+                return
+            ss, es, bits = self.get_ss_es_bits(want)
+            value = bitpack_lsb(bits)
+            text = ['{:d}'.format(value)]
+            self.putg(ss, es, [Ann.PROPORTIONAL, text])
+            if value < self.options['prop_val_min']:
+                text = ['Low proportional value', 'Low value', 'Low']
+                self.putg(ss, es, [Ann.WARN, text])
+            if value > self.options['prop_val_max']:
+                text = ['High proportional value', 'High value', 'High']
+                self.putg(ss, es, [Ann.WARN, text])
+            idx = self.sent_fields - (upto - 16)
+            ch_nr = 1 + idx
+            self.putpy(ss, es, ['PROPORTIONAL', (ch_nr, value)])
+            self.sent_fields += 1
+        # Annotate the digital channels' data.
+        upto += 2
+        want = 1
+        while self.sent_fields < upto:
+            if len(self.bits_accum) < want:
+                return
+            ss, es, bits = self.get_ss_es_bits(want)
+            value = bitpack_lsb(bits)
+            text = ['{:d}'.format(value)]
+            self.putg(ss, es, [Ann.DIGITAL, text])
+            idx = self.sent_fields - (upto - 2)
+            ch_nr = 17 + idx
+            self.putpy(ss, es, ['DIGITAL', (ch_nr, value)])
+            self.sent_fields += 1
+        # Annotate the flags' state. Index starts from LSB.
+        flag_names = ['framelost', 'failsafe', 'msb']
+        upto += 2
+        want = 1
+        while self.sent_fields < upto:
+            if len(self.bits_accum) < want:
+                return
+            ss, es, bits = self.get_ss_es_bits(want)
+            value = bitpack_lsb(bits)
+            text = ['{:d}'.format(value)]
+            idx = self.sent_fields - (upto - 2)
+            cls = Ann.FLAG_LSB + idx
+            self.putg(ss, es, [cls, text])
+            flg_name = flag_names[idx]
+            self.putpy(ss, es, ['FLAG', (flg_name, value)])
+            self.sent_fields += 1
+        # Warn when flags' padding (bits [7:4]) is unexpexted.
+        upto += 1
+        want = 4
+        while self.sent_fields < upto:
+            if len(self.bits_accum) < want:
+                return
+            ss, es, bits = self.get_ss_es_bits(want)
+            value = bitpack_lsb(bits)
+            if value != 0x0:
+                text = ['Unexpected MSB flags', 'Flags']
+                self.putg(ss, es, [Ann.WARN, text])
+            flg_name = flag_names[-1]
+            self.putpy(ss, es, ['FLAG', (flg_name, value)])
+            self.sent_fields += 1
+        # Annotate the footer byte. Warn when unexpected.
+        upto += 1
+        want = 8
+        while self.sent_fields < upto:
+            if len(self.bits_accum) < want:
+                return
+            ss, es, bits = self.get_ss_es_bits(want)
+            value = bitpack_lsb(bits)
+            text = ['0x{:02x}'.format(value)]
+            self.putg(ss, es, [Ann.FOOTER, text])
+            if value != 0x00:
+                text = ['Unexpected footer', 'Footer']
+                self.putg(ss, es, [Ann.WARN, text])
+            self.putpy(ss, es, ['FOOTER', value])
+            self.sent_fields += 1
+        # Check for the completion of an SBUS message. Warn when more
+        # UART data is seen after the message. Defer the warning until
+        # more bits were collected, flush at next IDLE or BREAK, which
+        # spans all unprocessed data, and improves perception.
+        if self.sent_fields >= upto:
+            self.msg_complete = True
+        if self.msg_complete and self.bits_accum:
+            self.failed = ['Excess data bits', 'Excess']
+
+    def handle_bits(self, ss, es, bits):
+        # UART data bits were seen. Store them, validity is yet unknown.
+        self.bits_accum.extend(bits)
+
+    def handle_frame(self, ss, es, value, valid):
+        # A UART frame became complete. Get its validity. Process its bits.
+        if not valid:
+            self.failed = ['Invalid data', 'Invalid']
+        self.flush_accum_bits()
+
+    def handle_idle(self, ss, es):
+        # An IDLE period was seen in the UART level. Flush, reset state.
+        if self.bits_accum and not self.failed:
+            self.failed = ['Unprocessed data bits', 'Unprocessed']
+        if self.bits_accum and self.failed:
+            ss, es, _ = self.get_ss_es_bits(None)
+            self.putg(ss, es, [Ann.WARN, self.failed])
+        self.reset()
+
+    def handle_break(self, ss, es):
+        # A BREAK period was seen in the UART level. Warn, reset state.
+        break_ss, break_es = ss, es
+        if not self.failed:
+            self.failed = ['BREAK condition', 'Break']
+        # Re-use logic for "annotated bits warning".
+        self.handle_idle(None, None)
+        # Unconditionally annotate BREAK as warning.
+        text = ['BREAK condition', 'Break']
+        self.putg(ss, es, [Ann.WARN, text])
+        self.reset()
+
+    def decode(self, ss, es, data):
+        # Implementor's note: Expects DATA bits to arrive before FRAME
+        # validity. Either of IDLE or BREAK terminates an SBUS message.
+        ptype, rxtx, pdata = data
+        if ptype == 'DATA':
+            _, bits = pdata
+            self.handle_bits(ss, es, bits)
+        elif ptype == 'FRAME':
+            value, valid = pdata
+            self.handle_frame(ss, es, value, valid)
+        elif ptype == 'IDLE':
+            self.handle_idle(ss, es)
+        elif ptype == 'BREAK':
+            self.handle_break(ss, es)