]> sigrok.org Git - libsigrokdecode.git/blob - decoders/dmx512/pd.py
decoders: Add/update tags for each PD.
[libsigrokdecode.git] / decoders / dmx512 / pd.py
1 ##
2 ## This file is part of the libsigrokdecode project.
3 ##
4 ## Copyright (C) 2016 Fabian J. Stumpf <sigrok@fabianstumpf.de>
5 ##
6 ## This program is free software; you can redistribute it and/or modify
7 ## it under the terms of the GNU General Public License as published by
8 ## the Free Software Foundation; either version 2 of the License, or
9 ## (at your option) any later version.
10 ##
11 ## This program is distributed in the hope that it will be useful,
12 ## but WITHOUT ANY WARRANTY; without even the implied warranty of
13 ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 ## GNU General Public License for more details.
15 ##
16 ## You should have received a copy of the GNU General Public License
17 ## along with this program; if not, see <http://www.gnu.org/licenses/>.
18 ##
19
20 import sigrokdecode as srd
21
22 class Decoder(srd.Decoder):
23     api_version = 3
24     id = 'dmx512'
25     name = 'DMX512'
26     longname = 'Digital MultipleX 512'
27     desc = 'Professional lighting control protocol.'
28     license = 'gplv2+'
29     inputs = ['logic']
30     outputs = ['dmx512']
31     tags = ['Embedded/industrial', 'Lighting']
32     channels = (
33         {'id': 'dmx', 'name': 'DMX data', 'desc': 'Any DMX data line'},
34     )
35     annotations = (
36         ('bit', 'Bit'),
37         ('break', 'Break'),
38         ('mab', 'Mark after break'),
39         ('startbit', 'Start bit'),
40         ('stopbits', 'Stop bit'),
41         ('startcode', 'Start code'),
42         ('channel', 'Channel'),
43         ('interframe', 'Interframe'),
44         ('interpacket', 'Interpacket'),
45         ('data', 'Data'),
46         ('error', 'Error'),
47     )
48     annotation_rows = (
49         ('name', 'Logical', (1, 2, 5, 6, 7, 8)),
50         ('data', 'Data', (9,)),
51         ('bits', 'Bits', (0, 3, 4)),
52         ('errors', 'Errors', (10,)),
53     )
54
55     def __init__(self):
56         self.reset()
57
58     def reset(self):
59         self.samplerate = None
60         self.sample_usec = None
61         self.run_start = -1
62         self.run_bit = 0
63         self.state = 'FIND BREAK'
64
65     def start(self):
66         self.out_ann = self.register(srd.OUTPUT_ANN)
67
68     def metadata(self, key, value):
69         if key == srd.SRD_CONF_SAMPLERATE:
70             self.samplerate = value
71             self.sample_usec = 1 / value * 1000000
72             self.skip_per_bit = int(4 / self.sample_usec)
73
74     def putr(self, data):
75         self.put(self.run_start, self.samplenum, self.out_ann, data)
76
77     def decode(self):
78         if not self.samplerate:
79             raise SamplerateError('Cannot decode without samplerate.')
80         while True:
81             # Seek for an interval with no state change with a length between
82             # 88 and 1000000 us (BREAK).
83             if self.state == 'FIND BREAK':
84                 (dmx,) = self.wait({0: 'h' if self.run_bit == 0 else 'l'})
85                 runlen = (self.samplenum - self.run_start) * self.sample_usec
86                 if runlen > 88 and runlen < 1000000:
87                     self.putr([1, ['Break']])
88                     self.bit_break = self.run_bit
89                     self.state = 'MARK MAB'
90                     self.channel = 0
91                 elif runlen >= 1000000:
92                     # Error condition.
93                     self.putr([10, ['Invalid break length']])
94                 self.run_bit = dmx
95                 self.run_start = self.samplenum
96             # Directly following the BREAK is the MARK AFTER BREAK.
97             elif self.state == 'MARK MAB':
98                 (dmx,) = self.wait({0: 'h' if self.run_bit == 0 else 'l'})
99                 self.putr([2, ['MAB']])
100                 self.state = 'READ BYTE'
101                 self.channel = 0
102                 self.bit = 0
103                 self.aggreg = dmx
104                 self.run_start = self.samplenum
105             # Mark and read a single transmitted byte
106             # (start bit, 8 data bits, 2 stop bits).
107             elif self.state == 'READ BYTE':
108                 (dmx,) = self.wait()
109                 self.next_sample = self.run_start + (self.bit + 1) * self.skip_per_bit
110                 self.aggreg += dmx
111                 if self.samplenum != self.next_sample:
112                     continue
113                 bit_value = 0 if round(self.aggreg/self.skip_per_bit) == self.bit_break else 1
114
115                 if self.bit == 0:
116                     self.byte = 0
117                     self.putr([3, ['Start bit']])
118                     if bit_value != 0:
119                         # (Possibly) invalid start bit, mark but don't fail.
120                         self.put(self.samplenum, self.samplenum,
121                                  self.out_ann, [10, ['Invalid start bit']])
122                 elif self.bit >= 9:
123                     self.put(self.samplenum - self.skip_per_bit,
124                         self.samplenum, self.out_ann, [4, ['Stop bit']])
125                     if bit_value != 1:
126                         # Invalid stop bit, mark.
127                         self.put(self.samplenum, self.samplenum,
128                             self.out_ann, [10, ['Invalid stop bit']])
129                         if self.bit == 10:
130                             # On invalid 2nd stop bit, search for new break.
131                             self.run_bit = dmx
132                             self.state = 'FIND BREAK'
133                 else:
134                     # Label and process one bit.
135                     self.put(self.samplenum - self.skip_per_bit,
136                         self.samplenum, self.out_ann, [0, [str(bit_value)]])
137                     self.byte |= bit_value << (self.bit - 1)
138
139                 # Label a complete byte.
140                 if self.bit == 10:
141                     if self.channel == 0:
142                         d = [5, ['Start code']]
143                     else:
144                         d = [6, ['Channel ' + str(self.channel)]]
145                     self.put(self.run_start, self.next_sample, self.out_ann, d)
146                     self.put(self.run_start + self.skip_per_bit,
147                         self.next_sample - 2 * self.skip_per_bit,
148                         self.out_ann, [9, [str(self.byte) + ' / ' + \
149                         str(hex(self.byte))]])
150                     # Continue by scanning the IFT.
151                     self.channel += 1
152                     self.run_start = self.samplenum
153                     self.run_bit = dmx
154                     self.state = 'MARK IFT'
155
156                 self.aggreg = dmx
157                 self.bit += 1
158             # Mark the INTERFRAME-TIME between bytes / INTERPACKET-TIME between packets.
159             elif self.state == 'MARK IFT':
160                 (dmx,) = self.wait({0: 'h' if self.run_bit == 0 else 'l'})
161                 if self.channel > 512:
162                     self.putr([8, ['Interpacket']])
163                     self.state = 'FIND BREAK'
164                     self.run_bit = dmx
165                     self.run_start = self.samplenum
166                 else:
167                     self.putr([7, ['Interframe']])
168                     self.state = 'READ BYTE'
169                     self.bit = 0
170                     self.run_start = self.samplenum