]>
Commit | Line | Data |
---|---|---|
1 | ## | |
2 | ## This file is part of the libsigrokdecode project. | |
3 | ## | |
4 | ## Copyright (C) 2022 Gerhard Sittig <gerhard.sittig@gmx.net> | |
5 | ## | |
6 | ## This program is free software; you can redistribute it and/or modify | |
7 | ## it under the terms of the GNU General Public License as published by | |
8 | ## the Free Software Foundation; either version 2 of the License, or | |
9 | ## (at your option) any later version. | |
10 | ## | |
11 | ## This program is distributed in the hope that it will be useful, | |
12 | ## but WITHOUT ANY WARRANTY; without even the implied warranty of | |
13 | ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
14 | ## GNU General Public License for more details. | |
15 | ## | |
16 | ## You should have received a copy of the GNU General Public License | |
17 | ## along with this program; if not, see <http://www.gnu.org/licenses/>. | |
18 | ## | |
19 | ||
20 | """ | |
21 | OUTPUT_PYTHON format: | |
22 | ||
23 | Packet: | |
24 | (<ptype>, <pdata>) | |
25 | ||
26 | This is the list of <ptype> codes and their respective <pdata> values: | |
27 | - 'HEADER': The data is the header byte's value. | |
28 | - 'PROPORTIONAL': The data is a tuple of the channel number (1-based) | |
29 | and the channel's value. | |
30 | - 'DIGITAL': The data is a tuple of the channel number (1-based) | |
31 | and the channel's value. | |
32 | - 'FLAG': The data is a tuple of the flag's name, and the flag's value. | |
33 | - 'FOOTER': The data is the footer byte's value. | |
34 | """ | |
35 | ||
36 | import sigrokdecode as srd | |
37 | from common.srdhelper import bitpack_lsb | |
38 | ||
class Ann:
    # Annotation class indices. The numbering follows the order of the
    # entries in Decoder.annotations, and is what gets passed as the
    # first element of graphical annotation payloads.
    HEADER = 0
    PROPORTIONAL = 1
    DIGITAL = 2
    FRAME_LOST = 3
    FAILSAFE = 4
    FOOTER = 5
    WARN = 6
    # The flag annotations are addressed as FLAG_LSB plus the flag's
    # position, so FRAME_LOST and FAILSAFE must remain adjacent.
    FLAG_LSB = FRAME_LOST
43 | ||
class Decoder(srd.Decoder):
    # Stacked decoder which consumes the 'uart' decoder's Python output
    # and interprets the accumulated data bits as SBUS message fields:
    # a header byte, 16 proportional channels of 11 bits each, 2 digital
    # channels, 2 flag bits, 4 padding bits, and a footer byte.
    api_version = 3
    id = 'sbus_futaba'
    name = 'SBUS (Futaba)'
    longname = 'Futaba SBUS (Serial bus)'
    desc = 'Serial bus for hobby remote control by Futaba'
    license = 'gplv2+'
    inputs = ['uart']
    outputs = ['sbus_futaba']
    tags = ['Remote Control']
    options = (
        {'id': 'prop_val_min', 'desc': 'Proportional value lower boundary', 'default': 0},
        {'id': 'prop_val_max', 'desc': 'Proportional value upper boundary', 'default': 2047},
    )
    annotations = (
        ('header', 'Header'),
        ('proportional', 'Proportional'),
        ('digital', 'Digital'),
        ('framelost', 'Frame Lost'),
        ('failsafe', 'Failsafe'),
        ('footer', 'Footer'),
        ('warning', 'Warning'),
    )
    annotation_rows = (
        ('framing', 'Framing', (Ann.HEADER, Ann.FOOTER,
            Ann.FRAME_LOST, Ann.FAILSAFE)),
        ('channels', 'Channels', (Ann.PROPORTIONAL, Ann.DIGITAL)),
        ('warnings', 'Warnings', (Ann.WARN,)),
    )

    def __init__(self):
        # Create all state attributes, then bring them to their initial
        # values by means of reset().
        self.bits_accum = []
        self.sent_fields = None
        self.msg_complete = None
        self.failed = None
        self.reset()

    def reset(self):
        # Forget queued bits and per-message progress. 'sent_fields'
        # counts the SBUS fields which were already emitted for the
        # current message. 'failed' holds the warning texts of the first
        # detected failure, or None while the message is fine.
        self.bits_accum.clear()
        self.sent_fields = 0
        self.msg_complete = False
        self.failed = None

    def start(self):
        # Register the graphical annotation and the Python outputs.
        self.out_ann = self.register(srd.OUTPUT_ANN)
        self.out_py = self.register(srd.OUTPUT_PYTHON)

    def putg(self, ss, es, data):
        # Put a graphical annotation.
        self.put(ss, es, self.out_ann, data)

    def putpy(self, ss, es, data):
        # Pass Python to upper layers.
        self.put(ss, es, self.out_py, data)

    def get_ss_es_bits(self, bitcount):
        # Remove 'bitcount' items from the front of the bit queue, and
        # return a (ss, es, bits) tuple: the start of the first bit, the
        # end of the last bit, and the list of bit values. Queued items
        # are indexed as [0] bit value, [1] start, [2] end.
        # Gets all remaining data when 'bitcount' is None.
        # Returns (None, None, None) when the queue holds fewer bits
        # than requested, or when there is nothing to get at all. The
        # latter avoids an IndexError on an empty selection.
        if bitcount is None:
            bitcount = len(self.bits_accum)
        if bitcount <= 0 or len(self.bits_accum) < bitcount:
            return None, None, None
        taken = self.bits_accum[:bitcount]
        self.bits_accum = self.bits_accum[bitcount:]
        ss, es = taken[0][1], taken[-1][2]
        return ss, es, [item[0] for item in taken]

    # The _put_*() helpers each emit one decoded SBUS field. They share
    # the (ss, es, value, idx) signature so that flush_accum_bits() can
    # drive them from a table. 'idx' is the 0-based position of the
    # field within its group (e.g. channel index within the 16
    # proportional channels).

    def _put_header(self, ss, es, value, idx):
        # Annotate the header byte. Not seeing the expected bit pattern
        # emits a warning annotation, but by design won't fail the SBUS
        # message. It's considered more useful to present the channels'
        # values instead. The warning still raises awareness.
        self.putg(ss, es, [Ann.HEADER, ['0x{:02x}'.format(value)]])
        if value != 0x0f:
            self.putg(ss, es, [Ann.WARN, ['Unexpected header', 'Header']])
        self.putpy(ss, es, ['HEADER', value])

    def _put_proportional(self, ss, es, value, idx):
        # Annotate one proportional channel. Check for violations of
        # the user provided value range. Channel numbers are 1-based.
        self.putg(ss, es, [Ann.PROPORTIONAL, ['{:d}'.format(value)]])
        if value < self.options['prop_val_min']:
            text = ['Low proportional value', 'Low value', 'Low']
            self.putg(ss, es, [Ann.WARN, text])
        if value > self.options['prop_val_max']:
            text = ['High proportional value', 'High value', 'High']
            self.putg(ss, es, [Ann.WARN, text])
        self.putpy(ss, es, ['PROPORTIONAL', (1 + idx, value)])

    def _put_digital(self, ss, es, value, idx):
        # Annotate one digital channel. These follow the proportional
        # channels, their numbers start at 17.
        self.putg(ss, es, [Ann.DIGITAL, ['{:d}'.format(value)]])
        self.putpy(ss, es, ['DIGITAL', (17 + idx, value)])

    def _put_flag(self, ss, es, value, idx):
        # Annotate one flag bit. Index starts from LSB, and selects
        # both the annotation class and the flag's name.
        flag_names = ['framelost', 'failsafe']
        self.putg(ss, es, [Ann.FLAG_LSB + idx, ['{:d}'.format(value)]])
        self.putpy(ss, es, ['FLAG', (flag_names[idx], value)])

    def _put_flag_padding(self, ss, es, value, idx):
        # Warn when flags' padding (bits [7:4]) is unexpected. There is
        # no regular annotation for the padding, only the Python output.
        if value != 0x0:
            self.putg(ss, es, [Ann.WARN, ['Unexpected MSB flags', 'Flags']])
        self.putpy(ss, es, ['FLAG', ('msb', value)])

    def _put_footer(self, ss, es, value, idx):
        # Annotate the footer byte. Warn when unexpected.
        self.putg(ss, es, [Ann.FOOTER, ['0x{:02x}'.format(value)]])
        if value != 0x00:
            self.putg(ss, es, [Ann.WARN, ['Unexpected footer', 'Footer']])
        self.putpy(ss, es, ['FOOTER', value])

    def flush_accum_bits(self):
        # Valid data was queued. See if we got full SBUS fields so far.
        # Annotate them early, cease inspection of failed messages.
        # Grab as many fields as we find in an invocation, and return
        # early when the next field is not completely available yet.
        if self.failed:
            return
        # The message layout, in transmission order. Each entry is the
        # number of fields in the group, one field's bit count, and the
        # routine which emits a field of that group.
        layout = (
            (1, 8, self._put_header),
            (16, 11, self._put_proportional),
            (2, 1, self._put_digital),
            (2, 1, self._put_flag),
            (1, 4, self._put_flag_padding),
            (1, 8, self._put_footer),
        )
        # 'upto' is the field count up to and including the current
        # group. Groups which were completely sent in earlier calls are
        # skipped, since 'sent_fields' already reached their 'upto'.
        upto = 0
        for count, want, emit in layout:
            first = upto
            upto += count
            while self.sent_fields < upto:
                if len(self.bits_accum) < want:
                    return
                ss, es, bits = self.get_ss_es_bits(want)
                emit(ss, es, bitpack_lsb(bits), self.sent_fields - first)
                self.sent_fields += 1
        # Getting here means that all fields of the SBUS message were
        # sent. Warn when more UART data is seen after the message.
        # Defer the warning until more bits were collected, flush at
        # next IDLE or BREAK, which spans all unprocessed data, and
        # improves perception.
        self.msg_complete = True
        if self.bits_accum:
            self.failed = ['Excess data bits', 'Excess']

    def handle_bits(self, ss, es, bits):
        # UART data bits were seen. Store them, validity is yet unknown.
        self.bits_accum.extend(bits)

    def handle_frame(self, ss, es, value, valid):
        # A UART frame became complete. Get its validity. Process its bits.
        # An invalid frame fails the message, which makes the flush a
        # no-op and keeps the queued bits for the IDLE/BREAK warning.
        if not valid:
            self.failed = ['Invalid data', 'Invalid']
        self.flush_accum_bits()

    def handle_idle(self, ss, es):
        # An IDLE period was seen in the UART level. Flush, reset state.
        # Bits which remained in the queue are covered by one warning,
        # which carries the earlier failure's text when there was one.
        if self.bits_accum and not self.failed:
            self.failed = ['Unprocessed data bits', 'Unprocessed']
        if self.bits_accum and self.failed:
            ss, es, _ = self.get_ss_es_bits(None)
            self.putg(ss, es, [Ann.WARN, self.failed])
        self.reset()

    def handle_break(self, ss, es):
        # A BREAK period was seen in the UART level. Warn, reset state.
        if not self.failed:
            self.failed = ['BREAK condition', 'Break']
        # Re-use logic for "annotated bits warning". The IDLE handler
        # does not use the passed in positions.
        self.handle_idle(None, None)
        # Unconditionally annotate BREAK as warning.
        self.putg(ss, es, [Ann.WARN, ['BREAK condition', 'Break']])
        self.reset()

    def decode(self, ss, es, data):
        # Implementor's note: Expects DATA bits to arrive before FRAME
        # validity. Either of IDLE or BREAK terminates an SBUS message.
        # The direction item of the input packet is not inspected.
        ptype, _rxtx, pdata = data
        if ptype == 'DATA':
            _, bits = pdata
            self.handle_bits(ss, es, bits)
        elif ptype == 'FRAME':
            value, valid = pdata
            self.handle_frame(ss, es, value, valid)
        elif ptype == 'IDLE':
            self.handle_idle(ss, es)
        elif ptype == 'BREAK':
            self.handle_break(ss, es)