]> sigrok.org Git - libsigrokdecode.git/blame_incremental - decoders/dmx512/pd.py
dmx512: Use a nicer 'dmx' pin name variable.
[libsigrokdecode.git] / decoders / dmx512 / pd.py
... / ...
CommitLineData
1##
2## This file is part of the libsigrokdecode project.
3##
4## Copyright (C) 2016 Fabian J. Stumpf <sigrok@fabianstumpf.de>
5##
6## This program is free software; you can redistribute it and/or modify
7## it under the terms of the GNU General Public License as published by
8## the Free Software Foundation; either version 2 of the License, or
9## (at your option) any later version.
10##
11## This program is distributed in the hope that it will be useful,
12## but WITHOUT ANY WARRANTY; without even the implied warranty of
13## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14## GNU General Public License for more details.
15##
16## You should have received a copy of the GNU General Public License
17## along with this program; if not, see <http://www.gnu.org/licenses/>.
18##
19
20import sigrokdecode as srd
21
22class Decoder(srd.Decoder):
23 api_version = 3
24 id = 'dmx512'
25 name = 'DMX512'
26 longname = 'Digital MultipleX 512'
27 desc = 'Professional lighting control protocol.'
28 license = 'gplv2+'
29 inputs = ['logic']
30 outputs = ['dmx512']
31 channels = (
32 {'id': 'dmx', 'name': 'DMX data', 'desc': 'Any DMX data line'},
33 )
34 annotations = (
35 ('bit', 'Bit'),
36 ('break', 'Break'),
37 ('mab', 'Mark after break'),
38 ('startbit', 'Start bit'),
39 ('stopbits', 'Stop bit'),
40 ('startcode', 'Start code'),
41 ('channel', 'Channel'),
42 ('interframe', 'Interframe'),
43 ('interpacket', 'Interpacket'),
44 ('data', 'Data'),
45 ('error', 'Error'),
46 )
47 annotation_rows = (
48 ('name', 'Logical', (1, 2, 5, 6, 7, 8)),
49 ('data', 'Data', (9,)),
50 ('bits', 'Bits', (0, 3, 4)),
51 ('errors', 'Errors', (10,)),
52 )
53
54 def __init__(self):
55 self.samplerate = None
56 self.sample_usec = None
57 self.run_start = -1
58 self.run_bit = 0
59 self.state = 'FIND BREAK'
60
61 def start(self):
62 self.out_ann = self.register(srd.OUTPUT_ANN)
63
64 def metadata(self, key, value):
65 if key == srd.SRD_CONF_SAMPLERATE:
66 self.samplerate = value
67 self.sample_usec = 1 / value * 1000000
68 self.skip_per_bit = int(4 / self.sample_usec)
69
70 def putr(self, data):
71 self.put(self.run_start, self.samplenum, self.out_ann, data)
72
73 def decode(self):
74 if not self.samplerate:
75 raise SamplerateError('Cannot decode without samplerate.')
76 while True:
77 # Seek for an interval with no state change with a length between
78 # 88 and 1000000 us (BREAK).
79 if self.state == 'FIND BREAK':
80 (dmx,) = self.wait({0: 'h' if self.run_bit == 0 else 'l'})
81 runlen = (self.samplenum - self.run_start) * self.sample_usec
82 if runlen > 88 and runlen < 1000000:
83 self.putr([1, ['Break']])
84 self.bit_break = self.run_bit
85 self.state = 'MARK MAB'
86 self.channel = 0
87 elif runlen >= 1000000:
88 # Error condition.
89 self.putr([10, ['Invalid break length']])
90 self.run_bit = dmx
91 self.run_start = self.samplenum
92 # Directly following the BREAK is the MARK AFTER BREAK.
93 elif self.state == 'MARK MAB':
94 (dmx,) = self.wait({0: 'h' if self.run_bit == 0 else 'l'})
95 self.putr([2, ['MAB']])
96 self.state = 'READ BYTE'
97 self.channel = 0
98 self.bit = 0
99 self.aggreg = dmx
100 self.run_start = self.samplenum
101 # Mark and read a single transmitted byte
102 # (start bit, 8 data bits, 2 stop bits).
103 elif self.state == 'READ BYTE':
104 (dmx,) = self.wait({'skip': 1})
105 self.next_sample = self.run_start + (self.bit + 1) * self.skip_per_bit
106 self.aggreg += dmx
107 if self.samplenum != self.next_sample:
108 continue
109 bit_value = 0 if round(self.aggreg/self.skip_per_bit) == self.bit_break else 1
110
111 if self.bit == 0:
112 self.byte = 0
113 self.putr([3, ['Start bit']])
114 if bit_value != 0:
115 # (Possibly) invalid start bit, mark but don't fail.
116 self.put(self.samplenum, self.samplenum,
117 self.out_ann, [10, ['Invalid start bit']])
118 elif self.bit >= 9:
119 self.put(self.samplenum - self.skip_per_bit,
120 self.samplenum, self.out_ann, [4, ['Stop bit']])
121 if bit_value != 1:
122 # Invalid stop bit, mark.
123 self.put(self.samplenum, self.samplenum,
124 self.out_ann, [10, ['Invalid stop bit']])
125 if self.bit == 10:
126 # On invalid 2nd stop bit, search for new break.
127 self.run_bit = dmx
128 self.state = 'FIND BREAK'
129 else:
130 # Label and process one bit.
131 self.put(self.samplenum - self.skip_per_bit,
132 self.samplenum, self.out_ann, [0, [str(bit_value)]])
133 self.byte |= bit_value << (self.bit - 1)
134
135 # Label a complete byte.
136 if self.bit == 10:
137 if self.channel == 0:
138 d = [5, ['Start code']]
139 else:
140 d = [6, ['Channel ' + str(self.channel)]]
141 self.put(self.run_start, self.next_sample, self.out_ann, d)
142 self.put(self.run_start + self.skip_per_bit,
143 self.next_sample - 2 * self.skip_per_bit,
144 self.out_ann, [9, [str(self.byte) + ' / ' + \
145 str(hex(self.byte))]])
146 # Continue by scanning the IFT.
147 self.channel += 1
148 self.run_start = self.samplenum
149 self.run_bit = dmx
150 self.state = 'MARK IFT'
151
152 self.aggreg = dmx
153 self.bit += 1
154 # Mark the INTERFRAME-TIME between bytes / INTERPACKET-TIME between packets.
155 elif self.state == 'MARK IFT':
156 (dmx,) = self.wait({0: 'h' if self.run_bit == 0 else 'l'})
157 if self.channel > 512:
158 self.putr([8, ['Interpacket']])
159 self.state = 'FIND BREAK'
160 self.run_bit = dmx
161 self.run_start = self.samplenum
162 else:
163 self.putr([7, ['Interframe']])
164 self.state = 'READ BYTE'
165 self.bit = 0
166 self.run_start = self.samplenum