]> sigrok.org Git - libsigrokdecode.git/blob - decoders/qi/pd.py
Use consistent __init__() format across all PDs.
[libsigrokdecode.git] / decoders / qi / pd.py
1 ##
2 ## This file is part of the libsigrokdecode project.
3 ##
4 ## Copyright (C) 2015 Josef Gajdusek <atx@atx.name>
5 ##
6 ## This program is free software; you can redistribute it and/or modify
7 ## it under the terms of the GNU General Public License as published by
8 ## the Free Software Foundation; either version 2 of the License, or
9 ## (at your option) any later version.
10 ##
11 ## This program is distributed in the hope that it will be useful,
12 ## but WITHOUT ANY WARRANTY; without even the implied warranty of
13 ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 ## GNU General Public License for more details.
15 ##
16 ## You should have received a copy of the GNU General Public License
17 ## along with this program; if not, write to the Free Software
18 ## Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
19 ##
20
21 import sigrokdecode as srd
22 import operator
23 import collections
24 from functools import reduce
25
26 end_codes = (
27     'Unknown',
28     'Charge Complete',
29     'Internal Fault',
30     'Over Temperature',
31     'Over Voltage',
32     'Over Current',
33     'Battery Failure',
34     'Reconfigure',
35     'No Response',
36 )
37
38 class SamplerateError(Exception):
39     pass
40
41 def calc_checksum(packet):
42     return reduce(operator.xor, packet[:-1])
43
44 def bits_to_uint(bits):
45     # LSB first
46     return reduce(lambda i, v: (i >> 1) | (v << (len(bits) - 1)), bits, 0)
47
48 class Decoder(srd.Decoder):
49     api_version = 2
50     id = 'qi'
51     name = 'Qi'
52     longname = 'Qi charger protocol'
53     desc = 'Protocol used by Qi receiver'
54     license = 'gplv2+'
55     inputs = ['logic']
56     outputs = ['qi']
57     channels = (
58         {'id': 'qi', 'name': 'Qi', 'desc': 'Demodulated Qi data line'},
59     )
60     annotations = (
61         ('bits', 'Bits'),
62         ('bytes-errors', 'Bit errors'),
63         ('bytes-start', 'Start bits'),
64         ('bytes-info', 'Info bits'),
65         ('bytes-data', 'Data bytes'),
66         ('packets-data', 'Packet data'),
67         ('packets-checksum-ok', 'Packet checksum'),
68         ('packets-checksum-err', 'Packet checksum'),
69     )
70     annotation_rows = (
71         ('bits', 'Bits', (0,)),
72         ('bytes', 'Bytes', (1, 2, 3, 4)),
73         ('packets', 'Packets', (5, 6, 7)),
74     )
75
76     def __init__(self):
77         self.samplerate = None
78         self.reset_variables()
79
80     def reset_variables(self):
81         self.counter = 0
82         self.prev = None
83         self.state = 'IDLE'
84         self.lastbit = 0
85         self.bytestart = 0
86         self.deq = collections.deque(maxlen = 2)
87         self.bits = []
88         self.bitsi = [0]
89         self.bytesi = []
90         self.packet = []
91
92     def metadata(self, key, value):
93         if key == srd.SRD_CONF_SAMPLERATE:
94             self.samplerate = value
95             self.bit_width = float(self.samplerate) / 2e3
96
97     def start(self):
98         self.out_ann = self.register(srd.OUTPUT_ANN)
99         self.reset_variables()
100
101     def packet_len(self, byte):
102         if 0x00 <= byte <= 0x1f:
103             return int(1 + (byte - 0) / 32)
104         if 0x20 <= byte <= 0x7f:
105             return int(2 + (byte - 32) / 16)
106         if 0x80 <= byte <= 0xdf:
107             return int(8 + (byte - 128) / 8)
108         if 0xe0 <= byte <= 0xff:
109             return int(20 + (byte - 224) / 4)
110
111     def in_tolerance(self, l):
112         return (0.75 * self.bit_width) < l < (1.25 * self.bit_width)
113
114     def putp(self, data):
115         self.put(self.bytesi[0], self.bytesi[-1], self.out_ann, [5, data])
116
117     def process_packet(self):
118         if self.packet[0] == 0x01: # Signal Strength
119             self.putp(['Signal Strength: %d' % self.packet[1],
120                        'SS: %d' % self.packet[1], 'SS'])
121         elif self.packet[0] == 0x02: # End Power Transfer
122             reason = end_codes[self.packet[1]] if self.packet[1] < len(end_codes) else 'Reserved'
123             self.putp(['End Power Transfer: %s' % reason,
124                        'EPT: %s' % reason, 'EPT'])
125         elif self.packet[0] == 0x03: # Control Error
126             val = self.packet[1] if self.packet[1] < 128 else (self.packet[1] & 0x7f) - 128
127             self.putp(['Control Error: %d' % val, 'CE: %d' % val, 'CE'])
128         elif self.packet[0] == 0x04: # Received Power
129             self.putp(['Received Power: %d' % self.packet[1],
130                        'RP: %d' % self.packet[1], 'RP'])
131         elif self.packet[0] == 0x05: # Charge Status
132             self.putp(['Charge Status: %d' % self.packet[1],
133                        'CS: %d' % self.packet[1], 'CS'])
134         elif self.packet[0] == 0x06: # Power Control Hold-off
135             self.putp(['Power Control Hold-off: %dms' % self.packet[1],
136                        'PCH: %d' % self.packet[1]], 'PCH')
137         elif self.packet[0] == 0x51: # Configuration
138             powerclass = (self.packet[1] & 0xc0) >> 7
139             maxpower = self.packet[1] & 0x3f
140             prop = (self.packet[3] & 0x80) >> 7
141             count = self.packet[3] & 0x07
142             winsize = (self.packet[4] & 0xf8) >> 3
143             winoff = self.packet[4] & 0x07
144             self.putp(['Configuration: Power Class = %d, Maximum Power = %d, Prop = %d,'
145                        'Count = %d, Window Size = %d, Window Offset = %d' %
146                        (powerclass, maxpower, prop, count, winsize, winoff),
147                        'C: PC = %d MP = %d P = %d C = %d WS = %d WO = %d' %
148                        (powerclass, maxpower, prop, count, winsize, winoff),
149                        'Configuration', 'C'])
150         elif self.packet[0] == 0x71: # Identification 
151             version = '%d.%d' % ((self.packet[1] & 0xf0) >> 4, self.packet[1] & 0x0f)
152             mancode = '%02x%02x' % (self.packet[2], self.packet[3])
153             devid = '%02x%02x%02x%02x' % (self.packet[4] & ~0x80,
154                     self.packet[5], self.packet[6], self.packet[7])
155             self.putp(['Identification: Version = %s, Manufacturer = %s, ' \
156                        'Device = %s' % (version, mancode, devid),
157                        'ID: %s %s %s' % (version, mancode, devid), 'ID'])
158         elif self.packet[0] == 0x81: # Extended Identification
159             edevid = '%02x%02x%02x%02x%02x%02x%02x%02x' % self.packet[1:-1]
160             self.putp(['Extended Identification: %s' % edevid,
161                        'EI: %s' % edevid, 'EI'])
162         elif self.packet[0] in (0x18, 0x19, 0x28, 0x29, 0x38, 0x48, 0x58, 0x68,
163                 0x78, 0x85, 0xa4, 0xc4, 0xe2): # Proprietary
164             self.putp(['Proprietary', 'P'])
165         else: # Unknown
166             self.putp(['Unknown', '?'])
167         self.put(self.bytesi[-1], self.samplenum, self.out_ann,
168                  [6, ['Checksum OK', 'OK']] if \
169                  calc_checksum(self.packet) == self.packet[-1]
170                  else [6, ['Checksum error', 'ERR']])
171
172     def process_byte(self):
173         self.put(self.bytestart, self.bitsi[0], self.out_ann,
174                  ([2, ['Start bit', 'Start', 'S']]) if self.bits[0] == 0 else
175                  ([1, ['Start error', 'Start err', 'SE']]))
176         databits = self.bits[1:9]
177         data = bits_to_uint(databits)
178         parity = reduce(lambda i, v: (i + v) % 2, databits, 1)
179         self.put(self.bitsi[0], self.bitsi[8], self.out_ann, [4, ['%02x' % data]])
180         self.put(self.bitsi[8], self.bitsi[9], self.out_ann,
181                  ([3, ['Parity bit', 'Parity', 'P']]) if self.bits[9] == parity else
182                  ([1, ['Parity error', 'Parity err', 'PE']]))
183         self.put(self.bitsi[9], self.bitsi[10], self.out_ann,
184                  ([3, ['Stop bit', 'Stop', 'S']]) if self.bits[10] == 1 else
185                  ([1, ['Stop error', 'Stop err', 'SE']]))
186
187         self.bytesi.append(self.bytestart)
188         self.packet.append(data)
189         if self.packet_len(self.packet[0]) + 2 == len(self.packet):
190             self.process_packet()
191             self.bytesi.clear()
192             self.packet.clear()
193
194     def add_bit(self, bit):
195         self.bits.append(bit)
196         self.bitsi.append(self.samplenum)
197
198         if self.state == 'IDLE' and len(self.bits) >= 5 and \
199                                     self.bits[-5:] == [1, 1, 1, 1, 0]:
200             self.state = 'DATA'
201             self.bytestart = self.bitsi[-2]
202             self.bits = [0]
203             self.bitsi = [self.samplenum]
204             self.packet.clear()
205         elif self.state == 'DATA' and len(self.bits) == 11:
206             self.process_byte()
207             self.bytestart = self.samplenum
208             self.bits.clear()
209             self.bitsi.clear()
210         if self.state != 'IDLE':
211             self.put(self.lastbit, self.samplenum, self.out_ann, [0, ['%d' % bit]])
212         self.lastbit = self.samplenum
213
214     def handle_transition(self, l, htl):
215         self.deq.append(l)
216         if len(self.deq) >= 2 and \
217                 (self.in_tolerance(self.deq[-1] + self.deq[-2]) or \
218                 htl and self.in_tolerance(l * 2) and \
219                 self.deq[-2] > 1.25 * self.bit_width):
220             self.add_bit(1)
221             self.deq.clear()
222         elif self.in_tolerance(l):
223             self.add_bit(0)
224             self.deq.clear()
225         elif l > (1.25 * self.bit_width):
226             self.state = 'IDLE'
227             self.bytesi.clear()
228             self.packet.clear()
229             self.bits.clear()
230             self.bitsi.clear()
231
232     def next_sample(self, s):
233         if s == self.prev:
234             self.counter += 1
235         else:
236             self.handle_transition(self.counter, s == 0)
237             self.prev = s
238             self.counter = 1
239
240     def decode(self, ss, es, data):
241         if not self.samplerate:
242             raise SamplerateError('Cannot decode without samplerate.')
243         for (self.samplenum, (qi,)) in data:
244             self.next_sample(qi)