]> sigrok.org Git - libsigrokdecode.git/blob - decoders/sbus_futaba/pd.py
sbus_futaba: add decoder for SBUS hobby remote control by Futaba
[libsigrokdecode.git] / decoders / sbus_futaba / pd.py
1 ##
2 ## This file is part of the libsigrokdecode project.
3 ##
4 ## Copyright (C) 2022 Gerhard Sittig <gerhard.sittig@gmx.net>
5 ##
6 ## This program is free software; you can redistribute it and/or modify
7 ## it under the terms of the GNU General Public License as published by
8 ## the Free Software Foundation; either version 2 of the License, or
9 ## (at your option) any later version.
10 ##
11 ## This program is distributed in the hope that it will be useful,
12 ## but WITHOUT ANY WARRANTY; without even the implied warranty of
13 ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 ## GNU General Public License for more details.
15 ##
16 ## You should have received a copy of the GNU General Public License
17 ## along with this program; if not, see <http://www.gnu.org/licenses/>.
18 ##
19
20 """
21 OUTPUT_PYTHON format:
22
23 Packet:
24 (<ptype>, <pdata>)
25
26 This is the list of <ptype> codes and their respective <pdata> values:
27  - 'HEADER': The data is the header byte's value.
28  - 'PROPORTIONAL': The data is a tuple of the channel number (1-based)
29    and the channel's value.
30  - 'DIGITAL': The data is a tuple of the channel number (1-based)
31    and the channel's value.
32  - 'FLAG': The data is a tuple of the flag's name, and the flag's value.
33  - 'FOOTER': The data is the footer byte's value.
34 """
35
36 import sigrokdecode as srd
37 from common.srdhelper import bitpack_lsb
38
39 class Ann:
40     HEADER, PROPORTIONAL, DIGITAL, FRAME_LOST, FAILSAFE, FOOTER, \
41     WARN = range(7)
42     FLAG_LSB = FRAME_LOST
43
44 class Decoder(srd.Decoder):
45     api_version = 3
46     id = 'sbus_futaba'
47     name = 'SBUS (Futaba)'
48     longname = 'Futaba SBUS (Serial bus)'
49     desc = 'Serial bus for hobby remote control by Futaba'
50     license = 'gplv2+'
51     inputs = ['uart']
52     outputs = ['sbus_futaba']
53     tags = ['Remote Control']
54     options = (
55         {'id': 'prop_val_min', 'desc': 'Proportional value lower boundary', 'default': 0},
56         {'id': 'prop_val_max', 'desc': 'Proportional value upper boundary', 'default': 2047},
57     )
58     annotations = (
59         ('header', 'Header'),
60         ('proportional', 'Proportional'),
61         ('digital', 'Digital'),
62         ('framelost', 'Frame Lost'),
63         ('failsafe', 'Failsafe'),
64         ('footer', 'Footer'),
65         ('warning', 'Warning'),
66     )
67     annotation_rows = (
68         ('framing', 'Framing', (Ann.HEADER, Ann.FOOTER,
69             Ann.FRAME_LOST, Ann.FAILSAFE)),
70         ('channels', 'Channels', (Ann.PROPORTIONAL, Ann.DIGITAL)),
71         ('warnings', 'Warnings', (Ann.WARN,)),
72     )
73
74     def __init__(self):
75         self.bits_accum = []
76         self.sent_fields = None
77         self.msg_complete = None
78         self.failed = None
79         self.reset()
80
81     def reset(self):
82         self.bits_accum.clear()
83         self.sent_fields = 0
84         self.msg_complete = False
85         self.failed = None
86
87     def start(self):
88         self.out_ann = self.register(srd.OUTPUT_ANN)
89         self.out_py = self.register(srd.OUTPUT_PYTHON)
90
91     def putg(self, ss, es, data):
92         # Put a graphical annotation.
93         self.put(ss, es, self.out_ann, data)
94
95     def putpy(self, ss, es, data):
96         # Pass Python to upper layers.
97         self.put(ss, es, self.out_py, data)
98
99     def get_ss_es_bits(self, bitcount):
100         # Get start/end times, and bit values of given length.
101         # Gets all remaining data when 'bitcount' is None.
102         if bitcount is None:
103             bitcount = len(self.bits_accum)
104         if len(self.bits_accum) < bitcount:
105             return None, None, None
106         bits = self.bits_accum[:bitcount]
107         self.bits_accum = self.bits_accum[bitcount:]
108         ss, es = bits[0][1], bits[-1][2]
109         bits = [b[0] for b in bits]
110         return ss, es, bits
111
112     def flush_accum_bits(self):
113         # Valid data was queued. See if we got full SBUS fields so far.
114         # Annotate them early, cease inspection of failed messages. The
115         # implementation is phrased to reduce the potential for clipboard
116         # errors: 'upto' is the next supported field count, 'want' is one
117         # field's bit count. Grab as many as we find in an invocation.
118         upto = 0
119         if self.failed:
120             return
121         # Annotate the header byte. Not seeing the expected bit pattern
122         # emits a warning annotation, but by design won't fail the SBUS
123         # message. It's considered more useful to present the channels'
124         # values instead. The warning still raises awareness.
125         upto += 1
126         want = 8
127         while self.sent_fields < upto:
128             if len(self.bits_accum) < want:
129                 return
130             ss, es, bits = self.get_ss_es_bits(want)
131             value = bitpack_lsb(bits)
132             text = ['0x{:02x}'.format(value)]
133             self.putg(ss, es, [Ann.HEADER, text])
134             if value != 0x0f:
135                 text = ['Unexpected header', 'Header']
136                 self.putg(ss, es, [Ann.WARN, text])
137             self.putpy(ss, es, ['HEADER', value])
138             self.sent_fields += 1
139         # Annotate the proportional channels' data. Check for user
140         # provided value range violations. Channel numbers are in
141         # the 1..18 range (1-based).
142         upto += 16
143         want = 11
144         while self.sent_fields < upto:
145             if len(self.bits_accum) < want:
146                 return
147             ss, es, bits = self.get_ss_es_bits(want)
148             value = bitpack_lsb(bits)
149             text = ['{:d}'.format(value)]
150             self.putg(ss, es, [Ann.PROPORTIONAL, text])
151             if value < self.options['prop_val_min']:
152                 text = ['Low proportional value', 'Low value', 'Low']
153                 self.putg(ss, es, [Ann.WARN, text])
154             if value > self.options['prop_val_max']:
155                 text = ['High proportional value', 'High value', 'High']
156                 self.putg(ss, es, [Ann.WARN, text])
157             idx = self.sent_fields - (upto - 16)
158             ch_nr = 1 + idx
159             self.putpy(ss, es, ['PROPORTIONAL', (ch_nr, value)])
160             self.sent_fields += 1
161         # Annotate the digital channels' data.
162         upto += 2
163         want = 1
164         while self.sent_fields < upto:
165             if len(self.bits_accum) < want:
166                 return
167             ss, es, bits = self.get_ss_es_bits(want)
168             value = bitpack_lsb(bits)
169             text = ['{:d}'.format(value)]
170             self.putg(ss, es, [Ann.DIGITAL, text])
171             idx = self.sent_fields - (upto - 2)
172             ch_nr = 17 + idx
173             self.putpy(ss, es, ['DIGITAL', (ch_nr, value)])
174             self.sent_fields += 1
175         # Annotate the flags' state. Index starts from LSB.
176         flag_names = ['framelost', 'failsafe', 'msb']
177         upto += 2
178         want = 1
179         while self.sent_fields < upto:
180             if len(self.bits_accum) < want:
181                 return
182             ss, es, bits = self.get_ss_es_bits(want)
183             value = bitpack_lsb(bits)
184             text = ['{:d}'.format(value)]
185             idx = self.sent_fields - (upto - 2)
186             cls = Ann.FLAG_LSB + idx
187             self.putg(ss, es, [cls, text])
188             flg_name = flag_names[idx]
189             self.putpy(ss, es, ['FLAG', (flg_name, value)])
190             self.sent_fields += 1
191         # Warn when flags' padding (bits [7:4]) is unexpexted.
192         upto += 1
193         want = 4
194         while self.sent_fields < upto:
195             if len(self.bits_accum) < want:
196                 return
197             ss, es, bits = self.get_ss_es_bits(want)
198             value = bitpack_lsb(bits)
199             if value != 0x0:
200                 text = ['Unexpected MSB flags', 'Flags']
201                 self.putg(ss, es, [Ann.WARN, text])
202             flg_name = flag_names[-1]
203             self.putpy(ss, es, ['FLAG', (flg_name, value)])
204             self.sent_fields += 1
205         # Annotate the footer byte. Warn when unexpected.
206         upto += 1
207         want = 8
208         while self.sent_fields < upto:
209             if len(self.bits_accum) < want:
210                 return
211             ss, es, bits = self.get_ss_es_bits(want)
212             value = bitpack_lsb(bits)
213             text = ['0x{:02x}'.format(value)]
214             self.putg(ss, es, [Ann.FOOTER, text])
215             if value != 0x00:
216                 text = ['Unexpected footer', 'Footer']
217                 self.putg(ss, es, [Ann.WARN, text])
218             self.putpy(ss, es, ['FOOTER', value])
219             self.sent_fields += 1
220         # Check for the completion of an SBUS message. Warn when more
221         # UART data is seen after the message. Defer the warning until
222         # more bits were collected, flush at next IDLE or BREAK, which
223         # spans all unprocessed data, and improves perception.
224         if self.sent_fields >= upto:
225             self.msg_complete = True
226         if self.msg_complete and self.bits_accum:
227             self.failed = ['Excess data bits', 'Excess']
228
229     def handle_bits(self, ss, es, bits):
230         # UART data bits were seen. Store them, validity is yet unknown.
231         self.bits_accum.extend(bits)
232
233     def handle_frame(self, ss, es, value, valid):
234         # A UART frame became complete. Get its validity. Process its bits.
235         if not valid:
236             self.failed = ['Invalid data', 'Invalid']
237         self.flush_accum_bits()
238
239     def handle_idle(self, ss, es):
240         # An IDLE period was seen in the UART level. Flush, reset state.
241         if self.bits_accum and not self.failed:
242             self.failed = ['Unprocessed data bits', 'Unprocessed']
243         if self.bits_accum and self.failed:
244             ss, es, _ = self.get_ss_es_bits(None)
245             self.putg(ss, es, [Ann.WARN, self.failed])
246         self.reset()
247
248     def handle_break(self, ss, es):
249         # A BREAK period was seen in the UART level. Warn, reset state.
250         break_ss, break_es = ss, es
251         if not self.failed:
252             self.failed = ['BREAK condition', 'Break']
253         # Re-use logic for "annotated bits warning".
254         self.handle_idle(None, None)
255         # Unconditionally annotate BREAK as warning.
256         text = ['BREAK condition', 'Break']
257         self.putg(ss, es, [Ann.WARN, text])
258         self.reset()
259
260     def decode(self, ss, es, data):
261         # Implementor's note: Expects DATA bits to arrive before FRAME
262         # validity. Either of IDLE or BREAK terminates an SBUS message.
263         ptype, rxtx, pdata = data
264         if ptype == 'DATA':
265             _, bits = pdata
266             self.handle_bits(ss, es, bits)
267         elif ptype == 'FRAME':
268             value, valid = pdata
269             self.handle_frame(ss, es, value, valid)
270         elif ptype == 'IDLE':
271             self.handle_idle(ss, es)
272         elif ptype == 'BREAK':
273             self.handle_break(ss, es)