]> sigrok.org Git - libsigrokdecode.git/blame - decoders/dmx512/pd.py
license: remove FSF postal address from boiler plate license text
[libsigrokdecode.git] / decoders / dmx512 / pd.py
CommitLineData
##
## This file is part of the libsigrokdecode project.
##
## Copyright (C) 2016 Fabian J. Stumpf <sigrok@fabianstumpf.de>
##
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 2 of the License, or
## (at your option) any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with this program; if not, see <http://www.gnu.org/licenses/>.
##
19
20import sigrokdecode as srd
21
class SamplerateError(Exception):
    """Raised when decoding is attempted without a usable samplerate."""


class Decoder(srd.Decoder):
    """DMX512 protocol decoder (sigrok decoder API version 2).

    The decoder is a small state machine driven one sample at a time by
    decode(). self.state takes one of these values:

    'FIND BREAK' - wait for a run without level change lasting between
                   88 us and 1 s; that run is annotated as the BREAK.
    'MARK MAB'   - annotate the run that follows the BREAK as the
                   MARK AFTER BREAK.
    'READ BYTE'  - sample 11 bit slots (start bit, 8 data bits, 2 stop
                   bits), 4 us each, and annotate the resulting byte.
    'MARK IFT'   - annotate the idle run after a byte as interframe
                   time, or as interpacket time once more than 512
                   slots were read.
    """
    api_version = 2
    id = 'dmx512'
    name = 'DMX512'
    longname = 'Digital MultipleX 512'
    desc = 'Professional lighting control protocol.'
    license = 'gplv2+'
    inputs = ['logic']
    outputs = ['dmx512']
    channels = (
        {'id': 'dmx', 'name': 'DMX data', 'desc': 'Any DMX data line'},
    )
    # The position in this tuple is the annotation class index used in
    # annotation_rows and in the put() calls below.
    annotations = (
        ('bit', 'Bit'),                     # 0
        ('break', 'Break'),                 # 1
        ('mab', 'Mark after break'),        # 2
        ('startbit', 'Start bit'),          # 3
        ('stopbits', 'Stop bit'),           # 4
        ('startcode', 'Start code'),        # 5
        ('channel', 'Channel'),             # 6
        ('interframe', 'Interframe'),       # 7
        ('interpacket', 'Interpacket'),     # 8
        ('data', 'Data'),                   # 9
        ('error', 'Error'),                 # 10
    )
    annotation_rows = (
        ('name', 'Logical', (1, 2, 5, 6, 7, 8)),
        ('data', 'Data', (9,)),
        ('bits', 'Bits', (0, 3, 4)),
        ('errors', 'Errors', (10,)),
    )

    def __init__(self):
        """Set up the decoder state; no samplerate is known yet."""
        self.samplerate = None
        self.sample_usec = None
        # Number of samples per 4 us bit slot; 0 until metadata() ran.
        self.skip_per_bit = 0
        self.samplenum = -1
        # Start sample and logic level of the run currently being measured.
        self.run_start = -1
        self.run_bit = 0
        self.state = 'FIND BREAK'
        # Logic level seen during the BREAK; decides the bit polarity.
        self.bit_break = 0
        # Index of the slot being read (0 is the start code).
        self.channel = 0
        # Position within the current byte frame (0 = start bit,
        # 1..8 = data bits, 9..10 = stop bits) and the byte built so far.
        self.bit = 0
        self.byte = 0
        # Sum of the pin values seen during the current bit slot.
        self.aggreg = 0
        self.next_sample = 0

    def start(self):
        """Register the annotation output."""
        self.out_ann = self.register(srd.OUTPUT_ANN)

    def metadata(self, key, value):
        """Record the samplerate and derive the per-bit timing from it."""
        if key == srd.SRD_CONF_SAMPLERATE:
            self.samplerate = value
            if not value:
                # decode() rejects a missing samplerate; avoid dividing
                # by zero here.
                return
            self.sample_usec = 1 / value * 1000000
            # Samples per 4 us bit. Derived from the product directly
            # rather than from sample_usec so that float rounding cannot
            # push an exact result just below the next integer.
            self.skip_per_bit = int(4 * value / 1000000)

    def putr(self, data):
        """Emit an annotation spanning the current run up to this sample."""
        self.put(self.run_start, self.samplenum, self.out_ann, data)

    def decode(self, ss, es, data):
        """Feed a chunk of samples through the DMX512 state machine.

        data yields (samplenum, pins) tuples; pins[0] is the DMX line.
        ss and es are not used by this decoder.

        Raises SamplerateError when no samplerate is known or when it is
        too low to place at least one sample in a 4 us bit slot.
        """
        if not self.samplerate:
            raise SamplerateError('Cannot decode without samplerate.')
        if self.skip_per_bit < 1:
            # With zero samples per bit the READ BYTE state could never
            # reach its sampling point and decoding would stall silently.
            raise SamplerateError('Samplerate too low to decode 4 us bits.')

        for (self.samplenum, pins) in data:
            # Seek for an interval with no state change with a length
            # between 88 and 1000000 us (BREAK).
            if self.state == 'FIND BREAK':
                if self.run_bit == pins[0]:
                    continue
                runlen = (self.samplenum - self.run_start) * self.sample_usec
                if 88 < runlen < 1000000:
                    self.putr([1, ['Break']])
                    self.bit_break = self.run_bit
                    self.state = 'MARK MAB'
                    self.channel = 0
                elif runlen >= 1000000:
                    # Error condition.
                    self.putr([10, ['Invalid break length']])
                self.run_bit = pins[0]
                self.run_start = self.samplenum
            # Directly following the BREAK is the MARK AFTER BREAK.
            elif self.state == 'MARK MAB':
                if self.run_bit == pins[0]:
                    continue
                self.putr([2, ['MAB']])
                self.state = 'READ BYTE'
                self.channel = 0
                self.bit = 0
                self.aggreg = pins[0]
                self.run_start = self.samplenum
            # Mark and read a single transmitted byte
            # (start bit, 8 data bits, 2 stop bits).
            elif self.state == 'READ BYTE':
                self.next_sample = (self.run_start +
                                    (self.bit + 1) * self.skip_per_bit)
                self.aggreg += pins[0]
                if self.samplenum != self.next_sample:
                    continue
                # Majority level over the bit slot; the BREAK level
                # represents a logical 0.
                level = round(self.aggreg / self.skip_per_bit)
                bit_value = 0 if level == self.bit_break else 1

                if self.bit == 0:
                    self.byte = 0
                    self.putr([3, ['Start bit']])
                    if bit_value != 0:
                        # (Possibly) invalid start bit, mark but don't fail.
                        self.put(self.samplenum, self.samplenum,
                                 self.out_ann, [10, ['Invalid start bit']])
                elif self.bit >= 9:
                    self.put(self.samplenum - self.skip_per_bit,
                             self.samplenum, self.out_ann, [4, ['Stop bit']])
                    if bit_value != 1:
                        # Invalid stop bit, mark.
                        self.put(self.samplenum, self.samplenum,
                                 self.out_ann, [10, ['Invalid stop bit']])
                        if self.bit == 10:
                            # On invalid 2nd stop bit, search for new break.
                            # run_start is left at the start of this frame
                            # so the low run is measured from there.
                            self.run_bit = pins[0]
                            self.state = 'FIND BREAK'
                else:
                    # Label and process one bit (LSB is transmitted first).
                    self.put(self.samplenum - self.skip_per_bit,
                             self.samplenum, self.out_ann,
                             [0, [str(bit_value)]])
                    self.byte |= bit_value << (self.bit - 1)

                # Label a complete byte, unless the frame was just
                # abandoned because of an invalid 2nd stop bit. Without
                # the state check the recovery above would be overwritten
                # by the switch to 'MARK IFT' below.
                if self.state == 'READ BYTE' and self.bit == 10:
                    if self.channel == 0:
                        d = [5, ['Start code']]
                    else:
                        d = [6, ['Channel ' + str(self.channel)]]
                    self.put(self.run_start, self.next_sample, self.out_ann, d)
                    self.put(self.run_start + self.skip_per_bit,
                             self.next_sample - 2 * self.skip_per_bit,
                             self.out_ann,
                             [9, ['{} / {}'.format(self.byte,
                                                   hex(self.byte))]])
                    # Continue by scanning the IFT.
                    self.channel += 1
                    self.run_start = self.samplenum
                    self.run_bit = pins[0]
                    self.state = 'MARK IFT'

                # Start accumulating the next bit slot with this sample.
                self.aggreg = pins[0]
                self.bit += 1
            # Mark the INTERFRAME-TIME between bytes / INTERPACKET-TIME
            # between packets.
            elif self.state == 'MARK IFT':
                if self.run_bit == pins[0]:
                    continue
                if self.channel > 512:
                    self.putr([8, ['Interpacket']])
                    self.state = 'FIND BREAK'
                    self.run_bit = pins[0]
                    self.run_start = self.samplenum
                else:
                    self.putr([7, ['Interframe']])
                    self.state = 'READ BYTE'
                    self.bit = 0
                    # Seed the accumulator with the first sample of the
                    # start bit, as done after the MAB; otherwise the
                    # stale stop-bit sample would leak into the start bit.
                    self.aggreg = pins[0]
                    self.run_start = self.samplenum