]> sigrok.org Git - libsigrokdecode.git/blob - decoders/dmx512/pd.py
Add a CFP decoder.
[libsigrokdecode.git] / decoders / dmx512 / pd.py
1 ##
2 ## This file is part of the libsigrokdecode project.
3 ##
4 ## Copyright (C) 2016 Fabian J. Stumpf <sigrok@fabianstumpf.de>
5 ##
6 ## This program is free software; you can redistribute it and/or modify
7 ## it under the terms of the GNU General Public License as published by
8 ## the Free Software Foundation; either version 2 of the License, or
9 ## (at your option) any later version.
10 ##
11 ## This program is distributed in the hope that it will be useful,
12 ## but WITHOUT ANY WARRANTY; without even the implied warranty of
13 ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 ## GNU General Public License for more details.
15 ##
16 ## You should have received a copy of the GNU General Public License
17 ## along with this program; if not, see <http://www.gnu.org/licenses/>.
18 ##
19
20 import sigrokdecode as srd
21
22 class Decoder(srd.Decoder):
23     api_version = 3
24     id = 'dmx512'
25     name = 'DMX512'
26     longname = 'Digital MultipleX 512'
27     desc = 'Professional lighting control protocol.'
28     license = 'gplv2+'
29     inputs = ['logic']
30     outputs = ['dmx512']
31     channels = (
32         {'id': 'dmx', 'name': 'DMX data', 'desc': 'Any DMX data line'},
33     )
34     annotations = (
35         ('bit', 'Bit'),
36         ('break', 'Break'),
37         ('mab', 'Mark after break'),
38         ('startbit', 'Start bit'),
39         ('stopbits', 'Stop bit'),
40         ('startcode', 'Start code'),
41         ('channel', 'Channel'),
42         ('interframe', 'Interframe'),
43         ('interpacket', 'Interpacket'),
44         ('data', 'Data'),
45         ('error', 'Error'),
46     )
47     annotation_rows = (
48         ('name', 'Logical', (1, 2, 5, 6, 7, 8)),
49         ('data', 'Data', (9,)),
50         ('bits', 'Bits', (0, 3, 4)),
51         ('errors', 'Errors', (10,)),
52     )
53
54     def __init__(self):
55         self.reset()
56
57     def reset(self):
58         self.samplerate = None
59         self.sample_usec = None
60         self.run_start = -1
61         self.run_bit = 0
62         self.state = 'FIND BREAK'
63
64     def start(self):
65         self.out_ann = self.register(srd.OUTPUT_ANN)
66
67     def metadata(self, key, value):
68         if key == srd.SRD_CONF_SAMPLERATE:
69             self.samplerate = value
70             self.sample_usec = 1 / value * 1000000
71             self.skip_per_bit = int(4 / self.sample_usec)
72
73     def putr(self, data):
74         self.put(self.run_start, self.samplenum, self.out_ann, data)
75
76     def decode(self):
77         if not self.samplerate:
78             raise SamplerateError('Cannot decode without samplerate.')
79         while True:
80             # Seek for an interval with no state change with a length between
81             # 88 and 1000000 us (BREAK).
82             if self.state == 'FIND BREAK':
83                 (dmx,) = self.wait({0: 'h' if self.run_bit == 0 else 'l'})
84                 runlen = (self.samplenum - self.run_start) * self.sample_usec
85                 if runlen > 88 and runlen < 1000000:
86                     self.putr([1, ['Break']])
87                     self.bit_break = self.run_bit
88                     self.state = 'MARK MAB'
89                     self.channel = 0
90                 elif runlen >= 1000000:
91                     # Error condition.
92                     self.putr([10, ['Invalid break length']])
93                 self.run_bit = dmx
94                 self.run_start = self.samplenum
95             # Directly following the BREAK is the MARK AFTER BREAK.
96             elif self.state == 'MARK MAB':
97                 (dmx,) = self.wait({0: 'h' if self.run_bit == 0 else 'l'})
98                 self.putr([2, ['MAB']])
99                 self.state = 'READ BYTE'
100                 self.channel = 0
101                 self.bit = 0
102                 self.aggreg = dmx
103                 self.run_start = self.samplenum
104             # Mark and read a single transmitted byte
105             # (start bit, 8 data bits, 2 stop bits).
106             elif self.state == 'READ BYTE':
107                 (dmx,) = self.wait()
108                 self.next_sample = self.run_start + (self.bit + 1) * self.skip_per_bit
109                 self.aggreg += dmx
110                 if self.samplenum != self.next_sample:
111                     continue
112                 bit_value = 0 if round(self.aggreg/self.skip_per_bit) == self.bit_break else 1
113
114                 if self.bit == 0:
115                     self.byte = 0
116                     self.putr([3, ['Start bit']])
117                     if bit_value != 0:
118                         # (Possibly) invalid start bit, mark but don't fail.
119                         self.put(self.samplenum, self.samplenum,
120                                  self.out_ann, [10, ['Invalid start bit']])
121                 elif self.bit >= 9:
122                     self.put(self.samplenum - self.skip_per_bit,
123                         self.samplenum, self.out_ann, [4, ['Stop bit']])
124                     if bit_value != 1:
125                         # Invalid stop bit, mark.
126                         self.put(self.samplenum, self.samplenum,
127                             self.out_ann, [10, ['Invalid stop bit']])
128                         if self.bit == 10:
129                             # On invalid 2nd stop bit, search for new break.
130                             self.run_bit = dmx
131                             self.state = 'FIND BREAK'
132                 else:
133                     # Label and process one bit.
134                     self.put(self.samplenum - self.skip_per_bit,
135                         self.samplenum, self.out_ann, [0, [str(bit_value)]])
136                     self.byte |= bit_value << (self.bit - 1)
137
138                 # Label a complete byte.
139                 if self.bit == 10:
140                     if self.channel == 0:
141                         d = [5, ['Start code']]
142                     else:
143                         d = [6, ['Channel ' + str(self.channel)]]
144                     self.put(self.run_start, self.next_sample, self.out_ann, d)
145                     self.put(self.run_start + self.skip_per_bit,
146                         self.next_sample - 2 * self.skip_per_bit,
147                         self.out_ann, [9, [str(self.byte) + ' / ' + \
148                         str(hex(self.byte))]])
149                     # Continue by scanning the IFT.
150                     self.channel += 1
151                     self.run_start = self.samplenum
152                     self.run_bit = dmx
153                     self.state = 'MARK IFT'
154
155                 self.aggreg = dmx
156                 self.bit += 1
157             # Mark the INTERFRAME-TIME between bytes / INTERPACKET-TIME between packets.
158             elif self.state == 'MARK IFT':
159                 (dmx,) = self.wait({0: 'h' if self.run_bit == 0 else 'l'})
160                 if self.channel > 512:
161                     self.putr([8, ['Interpacket']])
162                     self.state = 'FIND BREAK'
163                     self.run_bit = dmx
164                     self.run_start = self.samplenum
165                 else:
166                     self.putr([7, ['Interframe']])
167                     self.state = 'READ BYTE'
168                     self.bit = 0
169                     self.run_start = self.samplenum