]> sigrok.org Git - libsigrokdecode.git/blob - decoders/dmx512/pd.py
decoders: Rephrase condition-less .wait() calls (self documentation)
[libsigrokdecode.git] / decoders / dmx512 / pd.py
1 ##
2 ## This file is part of the libsigrokdecode project.
3 ##
4 ## Copyright (C) 2016 Fabian J. Stumpf <sigrok@fabianstumpf.de>
5 ##
6 ## This program is free software; you can redistribute it and/or modify
7 ## it under the terms of the GNU General Public License as published by
8 ## the Free Software Foundation; either version 2 of the License, or
9 ## (at your option) any later version.
10 ##
11 ## This program is distributed in the hope that it will be useful,
12 ## but WITHOUT ANY WARRANTY; without even the implied warranty of
13 ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 ## GNU General Public License for more details.
15 ##
16 ## You should have received a copy of the GNU General Public License
17 ## along with this program; if not, see <http://www.gnu.org/licenses/>.
18 ##
19
20 import sigrokdecode as srd
21
22 class Decoder(srd.Decoder):
23     api_version = 3
24     id = 'dmx512'
25     name = 'DMX512'
26     longname = 'Digital MultipleX 512'
27     desc = 'Professional lighting control protocol.'
28     license = 'gplv2+'
29     inputs = ['logic']
30     outputs = ['dmx512']
31     channels = (
32         {'id': 'dmx', 'name': 'DMX data', 'desc': 'Any DMX data line'},
33     )
34     annotations = (
35         ('bit', 'Bit'),
36         ('break', 'Break'),
37         ('mab', 'Mark after break'),
38         ('startbit', 'Start bit'),
39         ('stopbits', 'Stop bit'),
40         ('startcode', 'Start code'),
41         ('channel', 'Channel'),
42         ('interframe', 'Interframe'),
43         ('interpacket', 'Interpacket'),
44         ('data', 'Data'),
45         ('error', 'Error'),
46     )
47     annotation_rows = (
48         ('name', 'Logical', (1, 2, 5, 6, 7, 8)),
49         ('data', 'Data', (9,)),
50         ('bits', 'Bits', (0, 3, 4)),
51         ('errors', 'Errors', (10,)),
52     )
53
54     def __init__(self):
55         self.samplerate = None
56         self.sample_usec = None
57         self.run_start = -1
58         self.run_bit = 0
59         self.state = 'FIND BREAK'
60
61     def start(self):
62         self.out_ann = self.register(srd.OUTPUT_ANN)
63
64     def metadata(self, key, value):
65         if key == srd.SRD_CONF_SAMPLERATE:
66             self.samplerate = value
67             self.sample_usec = 1 / value * 1000000
68             self.skip_per_bit = int(4 / self.sample_usec)
69
70     def putr(self, data):
71         self.put(self.run_start, self.samplenum, self.out_ann, data)
72
73     def decode(self):
74         if not self.samplerate:
75             raise SamplerateError('Cannot decode without samplerate.')
76         while True:
77             # Seek for an interval with no state change with a length between
78             # 88 and 1000000 us (BREAK).
79             if self.state == 'FIND BREAK':
80                 (dmx,) = self.wait({0: 'h' if self.run_bit == 0 else 'l'})
81                 runlen = (self.samplenum - self.run_start) * self.sample_usec
82                 if runlen > 88 and runlen < 1000000:
83                     self.putr([1, ['Break']])
84                     self.bit_break = self.run_bit
85                     self.state = 'MARK MAB'
86                     self.channel = 0
87                 elif runlen >= 1000000:
88                     # Error condition.
89                     self.putr([10, ['Invalid break length']])
90                 self.run_bit = dmx
91                 self.run_start = self.samplenum
92             # Directly following the BREAK is the MARK AFTER BREAK.
93             elif self.state == 'MARK MAB':
94                 (dmx,) = self.wait({0: 'h' if self.run_bit == 0 else 'l'})
95                 self.putr([2, ['MAB']])
96                 self.state = 'READ BYTE'
97                 self.channel = 0
98                 self.bit = 0
99                 self.aggreg = dmx
100                 self.run_start = self.samplenum
101             # Mark and read a single transmitted byte
102             # (start bit, 8 data bits, 2 stop bits).
103             elif self.state == 'READ BYTE':
104                 (dmx,) = self.wait()
105                 self.next_sample = self.run_start + (self.bit + 1) * self.skip_per_bit
106                 self.aggreg += dmx
107                 if self.samplenum != self.next_sample:
108                     continue
109                 bit_value = 0 if round(self.aggreg/self.skip_per_bit) == self.bit_break else 1
110
111                 if self.bit == 0:
112                     self.byte = 0
113                     self.putr([3, ['Start bit']])
114                     if bit_value != 0:
115                         # (Possibly) invalid start bit, mark but don't fail.
116                         self.put(self.samplenum, self.samplenum,
117                                  self.out_ann, [10, ['Invalid start bit']])
118                 elif self.bit >= 9:
119                     self.put(self.samplenum - self.skip_per_bit,
120                         self.samplenum, self.out_ann, [4, ['Stop bit']])
121                     if bit_value != 1:
122                         # Invalid stop bit, mark.
123                         self.put(self.samplenum, self.samplenum,
124                             self.out_ann, [10, ['Invalid stop bit']])
125                         if self.bit == 10:
126                             # On invalid 2nd stop bit, search for new break.
127                             self.run_bit = dmx
128                             self.state = 'FIND BREAK'
129                 else:
130                     # Label and process one bit.
131                     self.put(self.samplenum - self.skip_per_bit,
132                         self.samplenum, self.out_ann, [0, [str(bit_value)]])
133                     self.byte |= bit_value << (self.bit - 1)
134
135                 # Label a complete byte.
136                 if self.bit == 10:
137                     if self.channel == 0:
138                         d = [5, ['Start code']]
139                     else:
140                         d = [6, ['Channel ' + str(self.channel)]]
141                     self.put(self.run_start, self.next_sample, self.out_ann, d)
142                     self.put(self.run_start + self.skip_per_bit,
143                         self.next_sample - 2 * self.skip_per_bit,
144                         self.out_ann, [9, [str(self.byte) + ' / ' + \
145                         str(hex(self.byte))]])
146                     # Continue by scanning the IFT.
147                     self.channel += 1
148                     self.run_start = self.samplenum
149                     self.run_bit = dmx
150                     self.state = 'MARK IFT'
151
152                 self.aggreg = dmx
153                 self.bit += 1
154             # Mark the INTERFRAME-TIME between bytes / INTERPACKET-TIME between packets.
155             elif self.state == 'MARK IFT':
156                 (dmx,) = self.wait({0: 'h' if self.run_bit == 0 else 'l'})
157                 if self.channel > 512:
158                     self.putr([8, ['Interpacket']])
159                     self.state = 'FIND BREAK'
160                     self.run_bit = dmx
161                     self.run_start = self.samplenum
162                 else:
163                     self.putr([7, ['Interframe']])
164                     self.state = 'READ BYTE'
165                     self.bit = 0
166                     self.run_start = self.samplenum