]> sigrok.org Git - libsigrokdecode.git/blob - decoders/dmx512/pd.py
e684b13f2dc1f8aadb709f8fe3bc968544cd2db3
[libsigrokdecode.git] / decoders / dmx512 / pd.py
1 ##
2 ## This file is part of the libsigrokdecode project.
3 ##
4 ## Copyright (C) 2016 Fabian J. Stumpf <sigrok@fabianstumpf.de>
5 ##
6 ## This program is free software; you can redistribute it and/or modify
7 ## it under the terms of the GNU General Public License as published by
8 ## the Free Software Foundation; either version 2 of the License, or
9 ## (at your option) any later version.
10 ##
11 ## This program is distributed in the hope that it will be useful,
12 ## but WITHOUT ANY WARRANTY; without even the implied warranty of
13 ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 ## GNU General Public License for more details.
15 ##
16 ## You should have received a copy of the GNU General Public License
17 ## along with this program; if not, write to the Free Software
18 ## Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
19 ##
20
21 import sigrokdecode as srd
22
23 class Decoder(srd.Decoder):
24     api_version = 2
25     id = 'dmx512'
26     name = 'DMX512'
27     longname = 'Digital MultipleX 512'
28     desc = 'Professional lighting control protocol.'
29     license = 'gplv2+'
30     inputs = ['logic']
31     outputs = ['dmx512']
32     channels = (
33         {'id': 'dmx', 'name': 'DMX data', 'desc': 'Any DMX data line'},
34     )
35     annotations = (
36         ('bit', 'Bit'),
37         ('break', 'Break'),
38         ('mab', 'Mark after break'),
39         ('startbit', 'Start bit'),
40         ('stopbits', 'Stop bit'),
41         ('startcode', 'Start code'),
42         ('channel', 'Channel'),
43         ('interframe', 'Interframe'),
44         ('interpacket', 'Interpacket'),
45         ('data', 'Data'),
46         ('error', 'Error'),
47     )
48     annotation_rows = (
49         ('name', 'Logical', (1, 2, 5, 6, 7, 8)),
50         ('data', 'Data', (9,)),
51         ('bits', 'Bits', (0, 3, 4)),
52         ('errors', 'Errors', (10,)),
53     )
54
55     def __init__(self):
56         self.samplerate = None
57         self.sample_usec = None
58         self.samplenum = -1
59         self.run_start = -1
60         self.run_bit = 0
61         self.state = 'FIND BREAK'
62
63     def start(self):
64         self.out_ann = self.register(srd.OUTPUT_ANN)
65
66     def metadata(self, key, value):
67         if key == srd.SRD_CONF_SAMPLERATE:
68             self.samplerate = value
69             self.sample_usec = 1 / value * 1000000
70             self.skip_per_bit = int(4 / self.sample_usec)
71
72     def putr(self, data):
73         self.put(self.run_start, self.samplenum, self.out_ann, data)
74
75     def decode(self, ss, es, data):
76         if not self.samplerate:
77             raise SamplerateError('Cannot decode without samplerate.')
78         for (self.samplenum, pins) in data:
79             # Seek for an interval with no state change with a length between
80             # 88 and 1000000 us (BREAK).
81             if self.state == 'FIND BREAK':
82                 if self.run_bit == pins[0]:
83                     continue
84                 runlen = (self.samplenum - self.run_start) * self.sample_usec
85                 if runlen > 88 and runlen < 1000000:
86                     self.putr([1, ['Break']])
87                     self.bit_break = self.run_bit
88                     self.state = 'MARK MAB'
89                     self.channel = 0
90                 elif runlen >= 1000000:
91                     # Error condition.
92                     self.putr([10, ['Invalid break length']])
93                 self.run_bit = pins[0]
94                 self.run_start = self.samplenum
95             # Directly following the BREAK is the MARK AFTER BREAK.
96             elif self.state == 'MARK MAB':
97                 if self.run_bit == pins[0]:
98                     continue
99                 self.putr([2, ['MAB']])
100                 self.state = 'READ BYTE'
101                 self.channel = 0
102                 self.bit = 0
103                 self.aggreg = pins[0]
104                 self.run_start = self.samplenum
105             # Mark and read a single transmitted byte
106             # (start bit, 8 data bits, 2 stop bits).
107             elif self.state == 'READ BYTE':
108                 self.next_sample = self.run_start + (self.bit + 1) * self.skip_per_bit
109                 self.aggreg += pins[0]
110                 if self.samplenum != self.next_sample:
111                     continue
112                 bit_value = 0 if round(self.aggreg/self.skip_per_bit) == self.bit_break else 1
113
114                 if self.bit == 0:
115                     self.byte = 0
116                     self.putr([3, ['Start bit']])
117                     if bit_value != 0:
118                         # (Possibly) invalid start bit, mark but don't fail.
119                         self.put(self.samplenum, self.samplenum,
120                                  self.out_ann, [10, ['Invalid start bit']])
121                 elif self.bit >= 9:
122                     self.put(self.samplenum - self.skip_per_bit,
123                         self.samplenum, self.out_ann, [4, ['Stop bit']])
124                     if bit_value != 1:
125                         # Invalid stop bit, mark.
126                         self.put(self.samplenum, self.samplenum,
127                             self.out_ann, [10, ['Invalid stop bit']])
128                         if self.bit == 10:
129                             # On invalid 2nd stop bit, search for new break.
130                             self.run_bit = pins[0]
131                             self.state = 'FIND BREAK'
132                 else:
133                     # Label and process one bit.
134                     self.put(self.samplenum - self.skip_per_bit,
135                         self.samplenum, self.out_ann, [0, [str(bit_value)]])
136                     self.byte |= bit_value << (self.bit - 1)
137
138                 # Label a complete byte.
139                 if self.bit == 10:
140                     if self.channel == 0:
141                         d = [5, ['Start code']]
142                     else:
143                         d = [6, ['Channel ' + str(self.channel)]]
144                     self.put(self.run_start, self.next_sample, self.out_ann, d)
145                     self.put(self.run_start + self.skip_per_bit,
146                         self.next_sample - 2 * self.skip_per_bit,
147                         self.out_ann, [9, [str(self.byte) + ' / ' + \
148                         str(hex(self.byte))]])
149                     # Continue by scanning the IFT.
150                     self.channel += 1
151                     self.run_start = self.samplenum
152                     self.run_bit = pins[0]
153                     self.state = 'MARK IFT'
154
155                 self.aggreg = pins[0]
156                 self.bit += 1
157             # Mark the INTERFRAME-TIME between bytes / INTERPACKET-TIME between packets.
158             elif self.state == 'MARK IFT':
159                 if self.run_bit == pins[0]:
160                     continue
161                 if self.channel > 512:
162                     self.putr([8, ['Interpacket']])
163                     self.state = 'FIND BREAK'
164                     self.run_bit = pins[0]
165                     self.run_start = self.samplenum
166                 else:
167                     self.putr([7, ['Interframe']])
168                     self.state = 'READ BYTE'
169                     self.bit = 0
170                     self.run_start = self.samplenum