]> sigrok.org Git - libsigrokdecode.git/blame - decoders/sbus_futaba/pd.py
sbus_futaba: add decoder for SBUS hobby remote control by Futaba
[libsigrokdecode.git] / decoders / sbus_futaba / pd.py
CommitLineData
f0356672
GS
1##
2## This file is part of the libsigrokdecode project.
3##
4## Copyright (C) 2022 Gerhard Sittig <gerhard.sittig@gmx.net>
5##
6## This program is free software; you can redistribute it and/or modify
7## it under the terms of the GNU General Public License as published by
8## the Free Software Foundation; either version 2 of the License, or
9## (at your option) any later version.
10##
11## This program is distributed in the hope that it will be useful,
12## but WITHOUT ANY WARRANTY; without even the implied warranty of
13## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14## GNU General Public License for more details.
15##
16## You should have received a copy of the GNU General Public License
17## along with this program; if not, see <http://www.gnu.org/licenses/>.
18##
19
20"""
21OUTPUT_PYTHON format:
22
23Packet:
24(<ptype>, <pdata>)
25
26This is the list of <ptype> codes and their respective <pdata> values:
27 - 'HEADER': The data is the header byte's value.
28 - 'PROPORTIONAL': The data is a tuple of the channel number (1-based)
29 and the channel's value.
30 - 'DIGITAL': The data is a tuple of the channel number (1-based)
31 and the channel's value.
32 - 'FLAG': The data is a tuple of the flag's name, and the flag's value.
33 - 'FOOTER': The data is the footer byte's value.
34"""
35
36import sigrokdecode as srd
37from common.srdhelper import bitpack_lsb
38
class Ann:
    """Annotation class indices used by this decoder.

    The numeric values are positions within the decoder's 'annotations'
    tuple, so the order here has to agree with that declaration.
    """

    HEADER = 0
    PROPORTIONAL = 1
    DIGITAL = 2
    FRAME_LOST = 3
    FAILSAFE = 4
    FOOTER = 5
    WARN = 6
    # First (least significant) flag's annotation class. Subsequent flags
    # are addressed as an offset relative to this index.
    FLAG_LSB = FRAME_LOST
43
class Decoder(srd.Decoder):
    """Decode SBUS messages from the output of the 'uart' decoder.

    UART data bits are accumulated across UART frames and are split into
    SBUS fields as soon as enough bits are available: one 8-bit header,
    sixteen 11-bit proportional channels, two 1-bit digital channels, two
    1-bit flags, four padding bits, and one 8-bit footer. Every field is
    emitted as a graphical annotation and as an OUTPUT_PYTHON packet. An
    IDLE or BREAK condition at the UART level ends the current message.
    """

    api_version = 3
    id = 'sbus_futaba'
    name = 'SBUS (Futaba)'
    longname = 'Futaba SBUS (Serial bus)'
    desc = 'Serial bus for hobby remote control by Futaba'
    license = 'gplv2+'
    inputs = ['uart']
    outputs = ['sbus_futaba']
    tags = ['Remote Control']
    options = (
        {'id': 'prop_val_min', 'desc': 'Proportional value lower boundary', 'default': 0},
        {'id': 'prop_val_max', 'desc': 'Proportional value upper boundary', 'default': 2047},
    )
    annotations = (
        ('header', 'Header'),
        ('proportional', 'Proportional'),
        ('digital', 'Digital'),
        ('framelost', 'Frame Lost'),
        ('failsafe', 'Failsafe'),
        ('footer', 'Footer'),
        ('warning', 'Warning'),
    )
    annotation_rows = (
        ('framing', 'Framing', (Ann.HEADER, Ann.FOOTER,
            Ann.FRAME_LOST, Ann.FAILSAFE)),
        ('channels', 'Channels', (Ann.PROPORTIONAL, Ann.DIGITAL)),
        ('warnings', 'Warnings', (Ann.WARN,)),
    )

    # Names which are passed in 'FLAG' Python packets. The first two are
    # the individual flag bits (LSB first), the last one is the padding.
    _FLAG_NAMES = ('framelost', 'failsafe', 'msb')

    def __init__(self):
        # Queue of UART bits, items are indexed as (value, ss, es).
        self.bits_accum = []
        # Number of SBUS fields which were emitted for the current message.
        self.sent_fields = None
        # Whether all fields of the current message were emitted.
        self.msg_complete = None
        # None while the message is fine, otherwise the list of warning
        # texts which gets emitted for the remaining bits.
        self.failed = None
        self.reset()

    def reset(self):
        """Drop all per-message state, get ready for the next message."""
        self.bits_accum.clear()
        self.sent_fields = 0
        self.msg_complete = False
        self.failed = None

    def start(self):
        """Register the annotation and the Python output streams."""
        self.out_ann = self.register(srd.OUTPUT_ANN)
        self.out_py = self.register(srd.OUTPUT_PYTHON)

    def putg(self, ss, es, data):
        """Put a graphical annotation."""
        self.put(ss, es, self.out_ann, data)

    def putpy(self, ss, es, data):
        """Pass Python data to upper layers."""
        self.put(ss, es, self.out_py, data)

    def get_ss_es_bits(self, bitcount):
        """Remove bits from the front of the queue.

        Takes 'bitcount' bits, or all remaining bits when 'bitcount' is
        None. Returns a tuple of the first bit's start, the last bit's
        end, and the list of bit values. Returns (None, None, None) and
        leaves the queue untouched when the request cannot be satisfied,
        which includes requests for zero bits.
        """
        if bitcount is None:
            bitcount = len(self.bits_accum)
        # An empty selection has no first/last item to take times from.
        if bitcount <= 0 or len(self.bits_accum) < bitcount:
            return None, None, None
        taken = self.bits_accum[:bitcount]
        self.bits_accum = self.bits_accum[bitcount:]
        ss, es = taken[0][1], taken[-1][2]
        return ss, es, [item[0] for item in taken]

    def _emit_header(self, ss, es, value, idx):
        # Annotate the header byte. An unexpected value only warns, and
        # by design won't fail the message, channel values remain useful.
        self.putg(ss, es, [Ann.HEADER, ['0x{:02x}'.format(value)]])
        if value != 0x0f:
            self.putg(ss, es, [Ann.WARN, ['Unexpected header', 'Header']])
        self.putpy(ss, es, ['HEADER', value])

    def _emit_proportional(self, ss, es, value, idx):
        # Annotate one proportional channel (number 1 + idx). Warn when
        # the value violates the user provided range.
        self.putg(ss, es, [Ann.PROPORTIONAL, ['{:d}'.format(value)]])
        if value < self.options['prop_val_min']:
            text = ['Low proportional value', 'Low value', 'Low']
            self.putg(ss, es, [Ann.WARN, text])
        if value > self.options['prop_val_max']:
            text = ['High proportional value', 'High value', 'High']
            self.putg(ss, es, [Ann.WARN, text])
        self.putpy(ss, es, ['PROPORTIONAL', (1 + idx, value)])

    def _emit_digital(self, ss, es, value, idx):
        # Annotate one digital channel (number 17 + idx).
        self.putg(ss, es, [Ann.DIGITAL, ['{:d}'.format(value)]])
        self.putpy(ss, es, ['DIGITAL', (17 + idx, value)])

    def _emit_flag(self, ss, es, value, idx):
        # Annotate one flag bit. The index starts from the LSB and
        # selects both the annotation class and the flag's name.
        self.putg(ss, es, [Ann.FLAG_LSB + idx, ['{:d}'.format(value)]])
        self.putpy(ss, es, ['FLAG', (self._FLAG_NAMES[idx], value)])

    def _emit_flag_padding(self, ss, es, value, idx):
        # Warn when the flags' padding (bits [7:4]) is unexpected.
        if value != 0x0:
            self.putg(ss, es, [Ann.WARN, ['Unexpected MSB flags', 'Flags']])
        self.putpy(ss, es, ['FLAG', (self._FLAG_NAMES[-1], value)])

    def _emit_footer(self, ss, es, value, idx):
        # Annotate the footer byte. Warn when unexpected.
        self.putg(ss, es, [Ann.FOOTER, ['0x{:02x}'.format(value)]])
        if value != 0x00:
            self.putg(ss, es, [Ann.WARN, ['Unexpected footer', 'Footer']])
        self.putpy(ss, es, ['FOOTER', value])

    def flush_accum_bits(self):
        """Emit all SBUS fields which the queued bits completely cover.

        Valid data was queued. Fields get annotated early, as soon as
        their bits are available. Inspection of failed messages ceases.
        Returns as soon as the next pending field lacks bits, the then
        pending bits remain queued for a later invocation.
        """
        if self.failed:
            return
        # Message layout in transmission order. Items are the number of
        # consecutive fields, the bit count of one field, and the
        # routine which emits a field of that kind.
        layout = (
            (1, 8, self._emit_header),
            (16, 11, self._emit_proportional),
            (2, 1, self._emit_digital),
            (2, 1, self._emit_flag),
            (1, 4, self._emit_flag_padding),
            (1, 8, self._emit_footer),
        )
        upto = 0
        for count, want, emit in layout:
            # 'first' and 'upto' bound this group in terms of the total
            # field count. Groups of an earlier invocation get skipped.
            first = upto
            upto += count
            while self.sent_fields < upto:
                if len(self.bits_accum) < want:
                    return
                ss, es, bits = self.get_ss_es_bits(want)
                emit(ss, es, bitpack_lsb(bits), self.sent_fields - first)
                self.sent_fields += 1
        # Getting here means that all fields were sent, the SBUS message
        # is complete. Warn when more UART data is seen after the
        # message. Defer the warning until more bits were collected,
        # flush at next IDLE or BREAK, which spans all unprocessed data,
        # and improves perception.
        self.msg_complete = True
        if self.bits_accum:
            self.failed = ['Excess data bits', 'Excess']

    def handle_bits(self, ss, es, bits):
        """Queue the data bits of a UART frame, validity is yet unknown."""
        self.bits_accum.extend(bits)

    def handle_frame(self, ss, es, value, valid):
        """Process queued bits after a UART frame became complete.

        An invalid UART frame fails the SBUS message, its bits remain
        queued until the next IDLE or BREAK emits the warning.
        """
        if not valid:
            self.failed = ['Invalid data', 'Invalid']
        self.flush_accum_bits()

    def handle_idle(self, ss, es):
        """Flush pending bits with a warning, and reset the state.

        Called for an IDLE period at the UART level. The 'ss' and 'es'
        arguments are not used, the warning spans the queued bits.
        """
        if self.bits_accum:
            # Bits which no SBUS field consumed are an error of their
            # own when nothing else was wrong before.
            if not self.failed:
                self.failed = ['Unprocessed data bits', 'Unprocessed']
            bits_ss, bits_es, _ = self.get_ss_es_bits(None)
            self.putg(bits_ss, bits_es, [Ann.WARN, self.failed])
        self.reset()

    def handle_break(self, ss, es):
        """Warn about a BREAK condition at the UART level, reset state."""
        if not self.failed:
            self.failed = ['BREAK condition', 'Break']
        # Re-use logic for "annotated bits warning". Also resets state.
        self.handle_idle(None, None)
        # Unconditionally annotate BREAK as warning.
        self.putg(ss, es, [Ann.WARN, ['BREAK condition', 'Break']])

    def decode(self, ss, es, data):
        """Dispatch one packet which the UART decoder provided.

        Implementor's note: Expects DATA bits to arrive before FRAME
        validity. Either of IDLE or BREAK terminates an SBUS message.
        Other packet types are ignored.
        """
        ptype, rxtx, pdata = data
        if ptype == 'DATA':
            _, bits = pdata
            self.handle_bits(ss, es, bits)
        elif ptype == 'FRAME':
            value, valid = pdata
            self.handle_frame(ss, es, value, valid)
        elif ptype == 'IDLE':
            self.handle_idle(ss, es)
        elif ptype == 'BREAK':
            self.handle_break(ss, es)