]> sigrok.org Git - libsigrokdecode.git/blob - decoders/nrf905/pd.py
0e5d808158e2d2c0c0245897b8b8e2b2cd79fb8d
[libsigrokdecode.git] / decoders / nrf905 / pd.py
1 ##
2 ## This file is part of the libsigrokdecode project.
3 ##
4 ## Copyright (C) 2020 Jorge Solla Rubiales <jorgesolla@gmail.com>
5 ##
6 ## Permission is hereby granted, free of charge, to any person obtaining a copy
7 ## of this software and associated documentation files (the "Software"), to deal
8 ## in the Software without restriction, including without limitation the rights
9 ## to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10 ## copies of the Software, and to permit persons to whom the Software is
11 ## furnished to do so, subject to the following conditions:
12 ##
13 ## The above copyright notice and this permission notice shall be included in all
14 ## copies or substantial portions of the Software.
15 ##
16 ## THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 ## IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 ## FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 ## AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 ## LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21 ## OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22 ## SOFTWARE.
23
24 import sigrokdecode as srd
25 from common.srdhelper import SrdIntEnum
26
27 CFG_REGS = {
28     0: [{'name': 'CH_NO', 'stbit': 7, 'nbits': 8}],
29     1: [
30         {'name': 'AUTO_RETRAN', 'stbit': 5, 'nbits': 1,
31          'opts': {0: 'No retransmission', 1: 'Retransmission of data packet'}},
32         {'name': 'RX_RED_PWR', 'stbit': 4, 'nbits': 1,
33          'opts': {0: 'Normal operation', 1: 'Reduced power'}},
34         {'name': 'PA_PWR', 'stbit': 3, 'nbits': 2,
35          'opts': {0: '-10 dBm', 1: '-2 dBm', 2: '+6 dBm', 3: '+10 dBm'}},
36         {'name': 'HFREQ_PLL', 'stbit': 1, 'nbits': 1,
37          'opts': {0: '433 MHz', 1: '868 / 915 MHz'}},
38         {'name': 'CH_NO_8', 'stbit': 0, 'nbits': 1},
39     ],
40     2: [
41         {'name': 'TX_AFW (TX addr width)', 'stbit': 6, 'nbits': 3},
42         {'name': 'RX_AFW (RX addr width)', 'stbit': 2, 'nbits': 3},
43     ],
44     3: [{'name': 'RW_PW (RX payload width)', 'stbit': 5, 'nbits': 6}],
45     4: [{'name': 'TX_PW (TX payload width)', 'stbit': 5, 'nbits': 6}],
46     5: [{'name': 'RX_ADDR_0', 'stbit': 7, 'nbits': 8}],
47     6: [{'name': 'RX_ADDR_1', 'stbit': 7, 'nbits': 8}],
48     7: [{'name': 'RX_ADDR_2', 'stbit': 7, 'nbits': 8}],
49     8: [{'name': 'RX_ADDR_3', 'stbit': 7, 'nbits': 8}],
50     9: [
51         {'name': 'CRC_MODE', 'stbit': 7, 'nbits': 1,
52          'opts': {0: '8 CRC check bit', 1: '16 CRC check bit'}},
53         {'name': 'CRC_EN', 'stbit': 6, 'nbits': 1,
54          'opts': {0: 'Disabled', 1: 'Enabled'}},
55         {'name': 'XOR', 'stbit': 5, 'nbits': 3,
56          'opts': {0: '4 MHz', 1: '8 MHz', 2: '12 MHz',
57                   3: '16 MHz', 4: '20 MHz'}},
58         {'name': 'UP_CLK_EN', 'stbit': 2, 'nbits': 1,
59          'opts': {0: 'No external clock signal avail.',
60                   1: 'External clock signal enabled'}},
61         {'name': 'UP_CLK_FREQ', 'stbit': 1, 'nbits': 2,
62          'opts': {0: '4 MHz', 1: '2 MHz', 2: '1 MHz', 3: '500 kHz'}},
63     ],
64 }
65
66 CHN_CFG = [
67     {'name': 'PA_PWR', 'stbit': 3, 'nbits': 2,
68      'opts': {0: '-10 dBm', 1: '-2 dBm', 2: '+6 dBm', 3: '+10 dBm'}},
69     {'name': 'HFREQ_PLL', 'stbit': 1, 'nbits': 1,
70      'opts': {0: '433 MHz', 1: '868 / 915 MHz'}},
71 ]
72
73 STAT_REG = [
74     {'name': 'AM', 'stbit': 7, 'nbits': 1},
75     {'name': 'DR', 'stbit': 5, 'nbits': 1},
76 ]
77
78 Ann = SrdIntEnum.from_str('Ann', 'CMD REG_WR REG_RD TX RX RESP WARN')
79
80 class Decoder(srd.Decoder):
81     api_version = 3
82     id = 'nrf905'
83     name = 'nRF905'
84     longname = 'Nordic Semiconductor nRF905'
85     desc = '433/868/933MHz transceiver chip.'
86     license = 'mit'
87     inputs = ['spi']
88     outputs = ['nrf905']
89     tags = ['IC', 'Wireless/RF']
90     annotations = (
91         ('cmd', 'Command sent to the device'),
92         ('reg-write', 'Config register written to the device'),
93         ('reg-read', 'Config register read from the device'),
94         ('tx-data', 'Payload sent to the device'),
95         ('rx-data', 'Payload read from the device'),
96         ('resp', 'Response to commands received from the device'),
97         ('warning', 'Warning'),
98     )
99     annotation_rows = (
100         ('commands', 'Commands', (Ann.CMD,)),
101         ('responses', 'Responses', (Ann.RESP,)),
102         ('registers', 'Registers', (Ann.REG_WR, Ann.REG_RD)),
103         ('tx', 'Transmitted data', (Ann.TX,)),
104         ('rx', 'Received data', (Ann.RX,)),
105         ('warnings', 'Warnings', (Ann.WARN,)),
106     )
107
108     def __init__(self):
109         self.ss_cmd, self.es_cmd = 0, 0
110         self.cs_asserted = False
111         self.reset()
112
113     def reset(self):
114         self.mosi_bytes, self.miso_bytes = [], []
115         self.cmd_samples = {'ss': 0, 'es': 0}
116
117     def start(self):
118         self.out_ann = self.register(srd.OUTPUT_ANN)
119
120     def extract_bits(self, byte, start_bit, num_bits):
121         begin = 7 - start_bit
122         end = begin + num_bits
123
124         if begin < 0 or end > 8:
125             return 0
126
127         binary = format(byte, '08b')[begin:end]
128         return int(binary, 2)
129
130     def extract_vars(self, reg_vars, reg_value):
131         # Iterate all vars on current register.
132         data = ''
133         for var in reg_vars:
134             var_value = self.extract_bits(reg_value, var['stbit'],
135                                           var['nbits'])
136             data += var['name'] + ' = ' + str(var_value)
137             opt = ''
138
139             # If var has options, just add the option meaning.
140             if 'opts' in var:
141                 if var_value in var['opts']:
142                     opt = var['opts'][var_value]
143                 else:
144                     opt = 'unknown'
145                 data += ' (' + opt + ')'
146
147             # Add var separator.
148             if reg_vars.index(var) != len(reg_vars) - 1:
149                 data = data + ' | '
150         return data
151
152     def parse_config_register(self, addr, value, is_write):
153         reg_value = value[0]
154         data = 'CFG_REG[' + hex(addr) + '] -> '
155
156         # Get register vars for this register.
157         if addr in CFG_REGS:
158             reg_vars = CFG_REGS[addr]
159         else:
160             # Invalid register address.
161             self.put(value[1], value[2],
162                      self.out_ann, [Ann.WARN, ['Invalid reg. addr']])
163             return
164
165         data += self.extract_vars(reg_vars, reg_value)
166
167         ann = Ann.REG_WR if is_write else Ann.REG_RD
168         self.put(value[1], value[2], self.out_ann, [ann, [data]])
169
170     def parse_config_registers(self, addr, registers, is_write):
171         i = 0
172         while i < len(registers):
173             reg_addr = i + addr
174             if reg_addr <= 9:
175                 self.parse_config_register(reg_addr, registers[i], is_write)
176             else:
177                 print('INVALID REGISTER ADDR ' + hex(reg_addr))
178             i += 1
179
180     def dump_cmd_bytes(self, prefix, cmd_bytes, ann):
181         ss = cmd_bytes[1][1]
182         es = 0
183         data = ''
184         for byte in cmd_bytes[1:]:
185             data += '0x' + format(byte[0], '02x') + ' '
186             es = byte[2]
187
188         self.put(ss, es, self.out_ann, [ann, [prefix + data]])
189
190     def handle_WC(self):
191         start_addr = self.mosi_bytes[0][0] & 0x0F
192
193         if start_addr > 9:
194             print('ERROR: WRONG OFFSET')
195             return
196
197         self.parse_config_registers(start_addr, self.mosi_bytes[1:], True)
198
199     def handle_RC(self):
200         start_addr = self.mosi_bytes[0][0] & 0x0F
201
202         if start_addr > 9:
203             print('ERROR: WRONG OFFSET')
204             return
205         self.parse_config_registers(start_addr, self.miso_bytes[1:], False)
206
207     def handle_WTP(self):
208         self.dump_cmd_bytes('Write TX payload.: ', self.mosi_bytes, Ann.TX)
209
210     def handle_RTP(self):
211         self.dump_cmd_bytes('Read TX payload: ', self.miso_bytes, Ann.RESP)
212
213     def handle_WTA(self):
214         self.dump_cmd_bytes('Write TX addr: ', self.mosi_bytes, Ann.REG_WR)
215
216     def handle_RTA(self):
217         self.dump_cmd_bytes('Read TX addr: ', self.miso_bytes, Ann.RESP)
218
219     def handle_RRP(self):
220         self.dump_cmd_bytes('Read RX payload: ', self.miso_bytes, Ann.RX)
221
222     def handle_CC(self):
223         cmd = self.mosi_bytes[0]
224         dta = self.mosi_bytes[1]
225
226         channel = (cmd[0] & 0x01) << 8
227         channel = channel + dta[0]
228
229         data = self.extract_vars(CHN_CFG, cmd[0])
230
231         data = data + '| CHN = ' + str(channel)
232         self.put(self.mosi_bytes[0][1], self.mosi_bytes[1][2],
233                  self.out_ann, [Ann.REG_WR, [data]])
234
235     def handle_STAT(self):
236         status = 'STAT = ' + self.extract_vars(STAT_REG, self.miso_bytes[0][0])
237         self.put(self.miso_bytes[0][1], self.miso_bytes[0][2],
238                  self.out_ann, [Ann.REG_RD, [status]])
239
240     def process_cmd(self):
241         cmd = ''
242         cmd_name = ''
243         cmd_hnd = None
244
245         for byte in self.mosi_bytes:
246             cmd += hex(byte[0]) + ' '
247
248         cmd = self.mosi_bytes[0][0]
249
250         if (cmd & 0xF0) == 0x00:
251             cmd_name = 'CMD: W_CONFIG (WC)'
252             cmd_hnd = self.handle_WC
253
254         elif (cmd & 0xF0) == 0x10:
255             cmd_name = 'CMD: R_CONFIG (RC)'
256             cmd_hnd = self.handle_RC
257
258         elif cmd == 0x20:
259             cmd_name = 'CMD: W_TX_PAYLOAD (WTP)'
260             cmd_hnd = self.handle_WTP
261
262         elif cmd == 0x21:
263             cmd_name = 'CMD: R_TX_PAYLOAD (RTP)'
264             cmd_hnd = self.handle_RTP
265
266         elif cmd == 0x22:
267             cmd_name = 'CMD: W_TX_ADDRESS (WTA)'
268             cmd_hnd = self.handle_WTA
269
270         elif cmd == 0x23:
271             cmd_name = 'CMD: R_TX_ADDRESS (RTA)'
272             cmd_hnd = self.handle_RTA
273
274         elif cmd == 0x24:
275             cmd_name = 'CMD: R_RX_PAYLOAD (RRP)'
276             cmd_hnd = self.handle_RRP
277
278         elif (cmd & 0xF0 == 0x80):
279             cmd_name = 'CMD: CHANNEL_CONFIG (CC)'
280             cmd_hnd = self.handle_CC
281
282         # Report command name.
283         self.put(self.cmd_samples['ss'], self.cmd_samples['es'],
284                  self.out_ann, [Ann.CMD, [cmd_name]])
285
286         # Handle status byte.
287         self.handle_STAT()
288
289         # Handle command.
290         if cmd_hnd is not None:
291             cmd_hnd()
292
293     def set_cs_status(self, sample, asserted):
294         if self.cs_asserted == asserted:
295             return
296
297         if asserted:
298             self.cmd_samples['ss'] = sample
299             self.cmd_samples['es'] = -1
300         else:
301             self.cmd_samples['es'] = sample
302
303         self.cs_asserted = asserted
304
305     def decode(self, ss, es, data):
306         ptype, data1, data2 = data
307
308         if ptype == 'CS-CHANGE':
309             if data1 is None and data2 is None:
310                 self.requirements_met = False
311                 raise ChannelError('CS# pin required.')
312
313             if data1 is None and data2 == 0:
314                 self.set_cs_status(ss, True)
315
316             elif data1 is None and data2 == 1:
317                 self.set_cs_status(ss, False)
318
319             elif data1 == 1 and data2 == 0:
320                 self.set_cs_status(ss, True)
321
322             elif data1 == 0 and data2 == 1:
323                 self.set_cs_status(ss, False)
324                 if len(self.mosi_bytes):
325                     self.process_cmd()
326                     self.reset()
327
328         elif ptype == 'DATA':
329             # Ignore traffic if CS is not asserted.
330             if self.cs_asserted is False:
331                 return
332
333             mosi, miso = data1, data2
334             if miso is None or mosi is None:
335                 raise ChannelError('Both MISO and MOSI pins required.')
336
337             self.mosi_bytes.append((mosi, ss, es))
338             self.miso_bytes.append((miso, ss, es))