]> sigrok.org Git - libsigrokdecode.git/blob - decoders/nrf905/pd.py
Add decoder for Nordic Semiconductor nRF905 chip
[libsigrokdecode.git] / decoders / nrf905 / pd.py
1 ##
2 ## This file is part of the libsigrokdecode project.
3 ##
4 ## Copyright (C) 2020 Jorge Solla Rubiales <jorgesolla@gmail.com>
5 ##
6 ## Permission is hereby granted, free of charge, to any person obtaining a copy
7 ## of this software and associated documentation files (the "Software"), to deal
8 ## in the Software without restriction, including without limitation the rights
9 ## to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10 ## copies of the Software, and to permit persons to whom the Software is
11 ## furnished to do so, subject to the following conditions:
12 ##
13 ## The above copyright notice and this permission notice shall be included in all
14 ## copies or substantial portions of the Software.
15 ##
16 ## THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 ## IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 ## FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 ## AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 ## LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21 ## OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22 ## SOFTWARE.
23
24 import sigrokdecode as srd
25
26 CFG_REGS = {
27     0: [{'name': 'CH_NO', 'stbit': 7, 'nbits': 8}],
28     1: [
29         {'name': 'AUTO_RETRAN', 'stbit': 5, 'nbits': 1,
30          'opts': {0: 'No retransmission', 1: 'Retransmission of data packet'}},
31         {'name': 'RX_RED_PWR', 'stbit': 4, 'nbits': 1,
32          'opts': {0: 'Normal operation', 1: 'Reduced power'}},
33         {'name': 'PA_PWR', 'stbit': 3, 'nbits': 2,
34          'opts': {0: '-10 dBm', 1: '-2 dBm', 2: '+6 dBm', 3: '+10 dBm'}},
35         {'name': 'HFREQ_PLL', 'stbit': 1, 'nbits': 1,
36          'opts': {0: '433 MHz', 1: '868 / 915 MHz'}},
37         {'name': 'CH_NO_8', 'stbit': 0, 'nbits': 1},
38     ],
39     2: [
40         {'name': 'TX_AFW (TX addr width)', 'stbit': 6, 'nbits': 3},
41         {'name': 'RX_AFW (RX addr width)', 'stbit': 2, 'nbits': 3},
42     ],
43     3: [{'name': 'RW_PW (RX payload width)', 'stbit': 5, 'nbits': 6}],
44     4: [{'name': 'TX_PW (TX payload width)', 'stbit': 5, 'nbits': 6}],
45     5: [{'name': 'RX_ADDR_0', 'stbit': 7, 'nbits': 8}],
46     6: [{'name': 'RX_ADDR_1', 'stbit': 7, 'nbits': 8}],
47     7: [{'name': 'RX_ADDR_2', 'stbit': 7, 'nbits': 8}],
48     8: [{'name': 'RX_ADDR_3', 'stbit': 7, 'nbits': 8}],
49     9: [
50         {'name': 'CRC_MODE', 'stbit': 7, 'nbits': 1,
51          'opts': {0: '8 CRC check bit', 1: '16 CRC check bit'}},
52         {'name': 'CRC_EN', 'stbit': 6, 'nbits': 1,
53          'opts': {0: 'Disabled', 1: 'Enabled'}},
54         {'name': 'XOR', 'stbit': 5, 'nbits': 3,
55          'opts': {0: '4 MHz', 1: '8 MHz', 2: '12 MHz',
56                   3: '16 MHz', 4: '20 MHz'}},
57         {'name': 'UP_CLK_EN', 'stbit': 2, 'nbits': 1,
58          'opts': {0: 'No external clock signal avail.',
59                   1: 'External clock signal enabled'}},
60         {'name': 'UP_CLK_FREQ', 'stbit': 1, 'nbits': 2,
61          'opts': {0: '4 MHz', 1: '2 MHz', 2: '1 MHz', 3: '500 kHz'}},
62     ],
63 }
64
65 CHN_CFG = [
66     {'name': 'PA_PWR', 'stbit': 3, 'nbits': 2,
67      'opts': {0: '-10 dBm', 1: '-2 dBm', 2: '+6 dBm', 3: '+10 dBm'}},
68     {'name': 'HFREQ_PLL', 'stbit': 1, 'nbits': 1,
69      'opts': {0: '433 MHz', 1: '868 / 915 MHz'}},
70 ]
71
72 STAT_REG = [
73     {'name': 'AM', 'stbit': 7, 'nbits': 1},
74     {'name': 'DR', 'stbit': 5, 'nbits': 1},
75 ]
76
77 class Decoder(srd.Decoder):
78     api_version = 3
79     id = 'nrf905'
80     name = 'nRF905'
81     longname = 'Nordic Semiconductor nRF905'
82     desc = '433/868/933MHz transceiver chip.'
83     license = 'mit'
84     inputs = ['spi']
85     outputs = ['nrf905']
86
87     annotations = (
88         ('cmd', 'Command sent to the device'),
89         ('reg-write', 'Config register written to the device'),
90         ('reg-read', 'Config register read from the device'),
91         ('tx-data', 'Payload sent to the device'),
92         ('rx-data', 'Payload read from the device'),
93         ('resp', 'Response to commands received from the device'),
94         ('warning', 'Warning')
95     )
96
97     ann_cmd = 0
98     ann_reg_wr = 1
99     ann_reg_rd = 2
100     ann_tx = 3
101     ann_rx = 4
102     ann_resp = 5
103     ann_warn = 6
104
105     annotation_rows = (
106         ('commands', 'Commands', (ann_cmd,)),
107         ('responses', 'Responses', (ann_resp,)),
108         ('registers', 'Registers', (ann_reg_wr, ann_reg_rd)),
109         ('tx', 'Transmitted data', (ann_tx,)),
110         ('rx', 'Received data', (ann_rx,)),
111         ('warnings', 'Warnings', (ann_warn,))
112     )
113
114     def reset_data(self):
115         self.mosi_bytes, self.miso_bytes = [], []
116         self.cmd_samples = {'ss': 0, 'es': 0}
117
118     def __init__(self):
119         self.ss_cmd, self.es_cmd = 0, 0
120         self.cs_asserted = False
121         self.reset_data()
122
123     def start(self):
124         self.out_ann = self.register(srd.OUTPUT_ANN)
125
126     def extract_bits(self, byte, start_bit, num_bits):
127         begin = 7 - start_bit
128         end = begin + num_bits
129
130         if begin < 0 or end > 8:
131             return 0
132
133         binary = format(byte, '08b')[begin:end]
134         return int(binary, 2)
135
136     def extract_vars(self, reg_vars, reg_value):
137         # Iterate all vars on current register.
138         data = ''
139         for var in reg_vars:
140             var_value = self.extract_bits(reg_value, var['stbit'],
141                                           var['nbits'])
142             data += var['name'] + ' = ' + str(var_value)
143             opt = ''
144
145             # If var has options, just add the option meaning.
146             if 'opts' in var:
147                 if var_value in var['opts']:
148                     opt = var['opts'][var_value]
149                 else:
150                     opt = 'unknown'
151                 data += ' (' + opt + ')'
152
153             # Add var separator.
154             if reg_vars.index(var) != len(reg_vars) - 1:
155                 data = data + ' | '
156         return data
157
158     def parse_config_register(self, addr, value, is_write):
159         reg_value = value[0]
160         data = 'CFG_REG[' + hex(addr) + '] -> '
161
162         # Get register vars for this register.
163         if addr in CFG_REGS:
164             reg_vars = CFG_REGS[addr]
165         else:
166             # Invalid register address.
167             self.put(value[1], value[2],
168                      self.out_ann, [self.ann_warn, ['Invalid reg. addr']])
169             return
170
171         data += self.extract_vars(reg_vars, reg_value)
172
173         if is_write:
174             ann = self.ann_reg_wr
175         else:
176             ann = self.ann_reg_rd
177
178         self.put(value[1], value[2], self.out_ann, [ann, [data]])
179
180     def parse_config_registers(self, addr, registers, is_write):
181         i = 0
182         while i < len(registers):
183             reg_addr = i + addr
184             if reg_addr <= 9:
185                 self.parse_config_register(reg_addr, registers[i], is_write)
186             else:
187                 print('INVALID REGISTER ADDR ' + hex(reg_addr))
188             i += 1
189
190     def dump_cmd_bytes(self, prefix, cmd_bytes, ann):
191         ss = cmd_bytes[1][1]
192         es = 0
193         data = ''
194         for byte in cmd_bytes[1:]:
195             data += '0x' + format(byte[0], '02x') + ' '
196             es = byte[2]
197
198         self.put(ss, es, self.out_ann, [ann, [prefix + data]])
199
200     def handle_WC(self):
201         start_addr = self.mosi_bytes[0][0] & 0x0F
202
203         if start_addr > 9:
204             print('ERROR: WRONG OFFSET')
205             return
206
207         self.parse_config_registers(start_addr, self.mosi_bytes[1:], True)
208
209     def handle_RC(self):
210         start_addr = self.mosi_bytes[0][0] & 0x0F
211
212         if start_addr > 9:
213             print('ERROR: WRONG OFFSET')
214             return
215         self.parse_config_registers(start_addr, self.miso_bytes[1:], False)
216
217     def handle_WTP(self):
218         self.dump_cmd_bytes('Write TX payload.: ',
219                             self.mosi_bytes, self.ann_tx)
220
221     def handle_RTP(self):
222         self.dump_cmd_bytes('Read TX payload: ',
223                             self.miso_bytes, self.ann_resp)
224
225     def handle_WTA(self):
226         self.dump_cmd_bytes('Write TX addr: ',
227                             self.mosi_bytes, self.ann_reg_wr)
228
229     def handle_RTA(self):
230         self.dump_cmd_bytes('Read TX addr: ',
231                             self.miso_bytes, self.ann_resp)
232
233     def handle_RRP(self):
234         self.dump_cmd_bytes('Read RX payload: ',
235                             self.miso_bytes, self.ann_rx)
236
237     def handle_CC(self):
238         cmd = self.mosi_bytes[0]
239         dta = self.mosi_bytes[1]
240
241         channel = (cmd[0] & 0x01) << 8
242         channel = channel + dta[0]
243
244         data = self.extract_vars(CHN_CFG, cmd[0])
245
246         data = data + '| CHN = ' + str(channel)
247         self.put(self.mosi_bytes[0][1], self.mosi_bytes[1][2],
248                  self.out_ann, [self.ann_reg_wr, [data]])
249
250     def handle_STAT(self):
251         status = 'STAT = ' + self.extract_vars(STAT_REG, self.miso_bytes[0][0])
252         self.put(self.miso_bytes[0][1], self.miso_bytes[0][2],
253                  self.out_ann, [self.ann_reg_rd, [status]])
254
255     def process_cmd(self):
256         cmd = ''
257         cmd_name = ''
258         cmd_hnd = None
259
260         for byte in self.mosi_bytes:
261             cmd += hex(byte[0]) + ' '
262
263         cmd = self.mosi_bytes[0][0]
264
265         if (cmd & 0xF0) == 0x00:
266             cmd_name = 'CMD: W_CONFIG (WC)'
267             cmd_hnd = self.handle_WC
268
269         elif (cmd & 0xF0) == 0x10:
270             cmd_name = 'CMD: R_CONFIG (RC)'
271             cmd_hnd = self.handle_RC
272
273         elif cmd == 0x20:
274             cmd_name = 'CMD: W_TX_PAYLOAD (WTP)'
275             cmd_hnd = self.handle_WTP
276
277         elif cmd == 0x21:
278             cmd_name = 'CMD: R_TX_PAYLOAD (RTP)'
279             cmd_hnd = self.handle_RTP
280
281         elif cmd == 0x22:
282             cmd_name = 'CMD: W_TX_ADDRESS (WTA)'
283             cmd_hnd = self.handle_WTA
284
285         elif cmd == 0x23:
286             cmd_name = 'CMD: R_TX_ADDRESS (RTA)'
287             cmd_hnd = self.handle_RTA
288
289         elif cmd == 0x24:
290             cmd_name = 'CMD: R_RX_PAYLOAD (RRP)'
291             cmd_hnd = self.handle_RRP
292
293         elif (cmd & 0xF0 == 0x80):
294             cmd_name = 'CMD: CHANNEL_CONFIG (CC)'
295             cmd_hnd = self.handle_CC
296
297         # Report command name.
298         self.put(self.cmd_samples['ss'], self.cmd_samples['es'],
299                  self.out_ann, [self.ann_cmd, [cmd_name]])
300
301         # Handle status byte.
302         self.handle_STAT()
303
304         # Handle command.
305         if cmd_hnd is not None:
306             cmd_hnd()
307
308     def set_cs_status(self, sample, asserted):
309         if self.cs_asserted == asserted:
310             return
311
312         if asserted:
313             self.cmd_samples['ss'] = sample
314             self.cmd_samples['es'] = -1
315         else:
316             self.cmd_samples['es'] = sample
317
318         self.cs_asserted = asserted
319
320     def decode(self, ss, es, data):
321         ptype, data1, data2 = data
322
323         if ptype == 'CS-CHANGE':
324             if data1 is None and data2 is None:
325                 self.requirements_met = False
326                 raise ChannelError('CS# pin required.')
327
328             if data1 is None and data2 == 0:
329                 self.set_cs_status(ss, True)
330
331             elif data1 is None and data2 == 1:
332                 self.set_cs_status(ss, False)
333
334             elif data1 == 1 and data2 == 0:
335                 self.set_cs_status(ss, True)
336
337             elif data1 == 0 and data2 == 1:
338                 self.set_cs_status(ss, False)
339                 if len(self.mosi_bytes):
340                     self.process_cmd()
341                     self.reset_data()
342
343         elif ptype == 'DATA':
344             # Ignore traffic if CS is not asserted.
345             if self.cs_asserted is False:
346                 return
347
348             mosi, miso = data1, data2
349             if miso is None or mosi is None:
350                 raise ChannelError('Both MISO and MOSI pins required.')
351
352             self.mosi_bytes.append((mosi, ss, es))
353             self.miso_bytes.append((miso, ss, es))