]> sigrok.org Git - libsigrokdecode.git/blob - decoders/dmx512/pd.py
e83d943e15f5a8e9dbf893b241bb9bce47cf72c3
[libsigrokdecode.git] / decoders / dmx512 / pd.py
1 ##
2 ## This file is part of the libsigrokdecode project.
3 ##
4 ## Copyright (C) 2016 Fabian J. Stumpf <sigrok@fabianstumpf.de>
5 ##
6 ## This program is free software; you can redistribute it and/or modify
7 ## it under the terms of the GNU General Public License as published by
8 ## the Free Software Foundation; either version 2 of the License, or
9 ## (at your option) any later version.
10 ##
11 ## This program is distributed in the hope that it will be useful,
12 ## but WITHOUT ANY WARRANTY; without even the implied warranty of
13 ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 ## GNU General Public License for more details.
15 ##
16 ## You should have received a copy of the GNU General Public License
17 ## along with this program; if not, see <http://www.gnu.org/licenses/>.
18 ##
19
20 import sigrokdecode as srd
21
22 class Decoder(srd.Decoder):
23     api_version = 2
24     id = 'dmx512'
25     name = 'DMX512'
26     longname = 'Digital MultipleX 512'
27     desc = 'Professional lighting control protocol.'
28     license = 'gplv2+'
29     inputs = ['logic']
30     outputs = ['dmx512']
31     channels = (
32         {'id': 'dmx', 'name': 'DMX data', 'desc': 'Any DMX data line'},
33     )
34     annotations = (
35         ('bit', 'Bit'),
36         ('break', 'Break'),
37         ('mab', 'Mark after break'),
38         ('startbit', 'Start bit'),
39         ('stopbits', 'Stop bit'),
40         ('startcode', 'Start code'),
41         ('channel', 'Channel'),
42         ('interframe', 'Interframe'),
43         ('interpacket', 'Interpacket'),
44         ('data', 'Data'),
45         ('error', 'Error'),
46     )
47     annotation_rows = (
48         ('name', 'Logical', (1, 2, 5, 6, 7, 8)),
49         ('data', 'Data', (9,)),
50         ('bits', 'Bits', (0, 3, 4)),
51         ('errors', 'Errors', (10,)),
52     )
53
54     def __init__(self):
55         self.samplerate = None
56         self.sample_usec = None
57         self.samplenum = -1
58         self.run_start = -1
59         self.run_bit = 0
60         self.state = 'FIND BREAK'
61
62     def start(self):
63         self.out_ann = self.register(srd.OUTPUT_ANN)
64
65     def metadata(self, key, value):
66         if key == srd.SRD_CONF_SAMPLERATE:
67             self.samplerate = value
68             self.sample_usec = 1 / value * 1000000
69             self.skip_per_bit = int(4 / self.sample_usec)
70
71     def putr(self, data):
72         self.put(self.run_start, self.samplenum, self.out_ann, data)
73
74     def decode(self, ss, es, data):
75         if not self.samplerate:
76             raise SamplerateError('Cannot decode without samplerate.')
77         for (self.samplenum, pins) in data:
78             # Seek for an interval with no state change with a length between
79             # 88 and 1000000 us (BREAK).
80             if self.state == 'FIND BREAK':
81                 if self.run_bit == pins[0]:
82                     continue
83                 runlen = (self.samplenum - self.run_start) * self.sample_usec
84                 if runlen > 88 and runlen < 1000000:
85                     self.putr([1, ['Break']])
86                     self.bit_break = self.run_bit
87                     self.state = 'MARK MAB'
88                     self.channel = 0
89                 elif runlen >= 1000000:
90                     # Error condition.
91                     self.putr([10, ['Invalid break length']])
92                 self.run_bit = pins[0]
93                 self.run_start = self.samplenum
94             # Directly following the BREAK is the MARK AFTER BREAK.
95             elif self.state == 'MARK MAB':
96                 if self.run_bit == pins[0]:
97                     continue
98                 self.putr([2, ['MAB']])
99                 self.state = 'READ BYTE'
100                 self.channel = 0
101                 self.bit = 0
102                 self.aggreg = pins[0]
103                 self.run_start = self.samplenum
104             # Mark and read a single transmitted byte
105             # (start bit, 8 data bits, 2 stop bits).
106             elif self.state == 'READ BYTE':
107                 self.next_sample = self.run_start + (self.bit + 1) * self.skip_per_bit
108                 self.aggreg += pins[0]
109                 if self.samplenum != self.next_sample:
110                     continue
111                 bit_value = 0 if round(self.aggreg/self.skip_per_bit) == self.bit_break else 1
112
113                 if self.bit == 0:
114                     self.byte = 0
115                     self.putr([3, ['Start bit']])
116                     if bit_value != 0:
117                         # (Possibly) invalid start bit, mark but don't fail.
118                         self.put(self.samplenum, self.samplenum,
119                                  self.out_ann, [10, ['Invalid start bit']])
120                 elif self.bit >= 9:
121                     self.put(self.samplenum - self.skip_per_bit,
122                         self.samplenum, self.out_ann, [4, ['Stop bit']])
123                     if bit_value != 1:
124                         # Invalid stop bit, mark.
125                         self.put(self.samplenum, self.samplenum,
126                             self.out_ann, [10, ['Invalid stop bit']])
127                         if self.bit == 10:
128                             # On invalid 2nd stop bit, search for new break.
129                             self.run_bit = pins[0]
130                             self.state = 'FIND BREAK'
131                 else:
132                     # Label and process one bit.
133                     self.put(self.samplenum - self.skip_per_bit,
134                         self.samplenum, self.out_ann, [0, [str(bit_value)]])
135                     self.byte |= bit_value << (self.bit - 1)
136
137                 # Label a complete byte.
138                 if self.bit == 10:
139                     if self.channel == 0:
140                         d = [5, ['Start code']]
141                     else:
142                         d = [6, ['Channel ' + str(self.channel)]]
143                     self.put(self.run_start, self.next_sample, self.out_ann, d)
144                     self.put(self.run_start + self.skip_per_bit,
145                         self.next_sample - 2 * self.skip_per_bit,
146                         self.out_ann, [9, [str(self.byte) + ' / ' + \
147                         str(hex(self.byte))]])
148                     # Continue by scanning the IFT.
149                     self.channel += 1
150                     self.run_start = self.samplenum
151                     self.run_bit = pins[0]
152                     self.state = 'MARK IFT'
153
154                 self.aggreg = pins[0]
155                 self.bit += 1
156             # Mark the INTERFRAME-TIME between bytes / INTERPACKET-TIME between packets.
157             elif self.state == 'MARK IFT':
158                 if self.run_bit == pins[0]:
159                     continue
160                 if self.channel > 512:
161                     self.putr([8, ['Interpacket']])
162                     self.state = 'FIND BREAK'
163                     self.run_bit = pins[0]
164                     self.run_start = self.samplenum
165                 else:
166                     self.putr([7, ['Interframe']])
167                     self.state = 'READ BYTE'
168                     self.bit = 0
169                     self.run_start = self.samplenum