]> sigrok.org Git - libsigrokdecode.git/blob - decoders/hdcp/pd.py
All PDs: Consistently use singular/plural for annotation classes/rows.
[libsigrokdecode.git] / decoders / hdcp / pd.py
1 ##
2 ## This file is part of the libsigrokdecode project.
3 ##
4 ## Copyright (C) 2018 Dave Craig <dcraig@brightsign.biz>
5 ##
6 ## This program is free software; you can redistribute it and/or modify
7 ## it under the terms of the GNU General Public License as published by
8 ## the Free Software Foundation; either version 2 of the License, or
9 ## (at your option) any later version.
10 ##
11 ## This program is distributed in the hope that it will be useful,
12 ## but WITHOUT ANY WARRANTY; without even the implied warranty of
13 ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 ## GNU General Public License for more details.
15 ##
16 ## You should have received a copy of the GNU General Public License
17 ## along with this program; if not, see <http://www.gnu.org/licenses/>.
18 ##
19
20 import sigrokdecode as srd
21
22 msg_ids = {
23     2: 'AKE_Init',
24     3: 'AKE_Send_Cert',
25     4: 'AKE_No_stored_km',
26     5: 'AKE_Stored_km',
27
28     7: 'AKE_Send_H_prime',
29     8: 'AKE_Send_Pairing_Info',
30
31     9: 'LC_Init',
32     10: 'LC_Send_L_prime',
33
34     11: 'SKE_Send_Eks',
35     12: 'RepeaterAuth_Send_ReceiverID_List',
36
37     15: 'RepeaterAuth_Send_Ack',
38     16: 'RepeaterAuth_Stream_Manage',
39     17: 'RepeaterAuth_Stream_Ready',
40 }
41
42 write_items = {
43     0x00: '1.4 Bksv - Receiver KSV',
44     0x08: '1.4 Ri\' - Link Verification',
45     0x0a: '1.4 Pj\' - Enhanced Link Verification',
46     0x10: '1.4 Aksv - Transmitter KSV',
47     0x15: '1.4 Ainfo - Transmitter KSV',
48     0x18: '1.4 An - Session random number',
49     0x20: '1.4 V\'H0',
50     0x24: '1.4 V\'H1',
51     0x28: '1.4 V\'H2',
52     0x2c: '1.4 V\'H3',
53     0x30: '1.4 V\'H4',
54     0x40: '1.4 Bcaps',
55     0x41: '1.4 Bstatus',
56     0x43: '1.4 KSV FIFO',
57     0x50: 'HDCP2Version',
58     0x60: 'Write_Message',
59     0x70: 'RxStatus',
60     0x80: 'Read_Message',
61 }
62
63 class Decoder(srd.Decoder):
64     api_version = 3
65     id = 'hdcp'
66     name = 'HDCP'
67     longname = 'HDCP over HDMI'
68     desc = 'HDCP protocol over HDMI.'
69     license = 'gplv2+'
70     inputs = ['i2c']
71     outputs = ['hdcp']
72     tags = ['PC', 'Security/crypto']
73     annotations = \
74         tuple(('message-0x%02X' % i, 'Message 0x%02X' % i) for i in range(18)) + (
75         ('summary', 'Summary'),
76         ('warning', 'Warning'),
77     )
78     annotation_rows = (
79         ('messages', 'Messages', tuple(range(18))),
80         ('summaries', 'Summaries', (18,)),
81         ('warnings', 'Warnings', (19,)),
82     )
83
84     def __init__(self):
85         self.reset()
86
87     def reset(self):
88         self.state = 'IDLE'
89         self.stack = []
90         self.msg = -1
91         self.ss = self.es = self.ss_block = self.es_block = 0
92         self.init_seq = []
93         self.valid = 0
94         self.type = ''
95
96     def start(self):
97         self.out_ann = self.register(srd.OUTPUT_ANN)
98
99     def putb(self, data):
100         self.put(self.ss_block, self.es_block, self.out_ann, data)
101
102     def decode(self, ss, es, data):
103         cmd, databyte = data
104
105         # Collect the 'BITS' packet, then return. The next packet is
106         # guaranteed to belong to these bits we just stored.
107         if cmd == 'BITS':
108             self.bits = databyte
109             return
110
111         self.ss, self.es = ss, es
112
113         # State machine.
114         if self.state == 'IDLE':
115             # Wait for an I2C START condition.
116             if cmd == 'START':
117                 self.reset()
118                 self.ss_block = ss
119             elif cmd != 'START REPEAT':
120                 return
121             self.state = 'GET SLAVE ADDR'
122         elif self.state == 'GET SLAVE ADDR':
123             if cmd == 'ADDRESS READ':
124                 self.state = 'BUFFER DATA'
125                 if databyte != 0x3a:
126                     self.state = 'IDLE'
127             elif cmd == 'ADDRESS WRITE':
128                 self.state = 'WRITE OFFSET'
129                 if databyte != 0x3a:
130                     self.state = 'IDLE'
131         elif self.state == 'WRITE OFFSET':
132             if cmd == 'DATA WRITE':
133                 if databyte in write_items:
134                     self.type = write_items[databyte]
135                 if databyte in (0x10, 0x15, 0x18, 0x60):
136                     self.state = 'BUFFER DATA'
137                 # If we are reading, then jump back to IDLE for a start repeat.
138                 # If we are writing, then just continue onwards.
139                 if self.state == 'BUFFER DATA':
140                     pass
141                 elif self.type != '':
142                     self.state = 'IDLE'
143         elif self.state == 'BUFFER DATA':
144             if cmd in ('STOP', 'NACK'):
145                 self.es_block = es
146                 self.state = 'IDLE'
147                 if self.type == '':
148                     return
149                 if not self.stack:
150                     self.putb([18, ['%s' % (self.type)]])
151                     return
152                 if self.type == 'RxStatus':
153                     rxstatus = (self.stack.pop() << 8) | self.stack.pop()
154                     reauth_req = (rxstatus & 0x800) != 0
155                     ready = (rxstatus & 0x400) != 0
156                     length = rxstatus & 0x3ff
157                     text = '%s, reauth %s, ready %s, length %s' % \
158                         (self.type, reauth_req, ready, length)
159                     self.putb([18, [text]])
160                 elif self.type == '1.4 Bstatus':
161                     bstatus = (self.stack.pop() << 8) | self.stack.pop()
162                     device_count = bstatus & 0x7f
163                     max_devs_exceeded = (bstatus & 0x80) != 0
164                     depth = ((bstatus & 0x700) >> 8)
165                     max_cascase_exceeded = bstatus & 0x800
166                     hdmi_mode = (bstatus & 0x1000) != 0
167                     text = '%s, %s devices, depth %s, hdmi mode %s' % \
168                         (self.type, device_count, depth, hdmi_mode)
169                     self.putb([18, [text]])
170                 elif self.type == 'Read_Message':
171                     msg = self.stack.pop(0)
172                     self.putb([msg, ['%s, %s' % (self.type,
173                         msg_ids.get(msg, 'Invalid'))]])
174                 elif self.type == 'Write_Message':
175                     msg = self.stack.pop(0)
176                     self.putb([msg, ['%s, %s' % (self.type,
177                         msg_ids.get(msg, 'Invalid'))]])
178                 elif self.type == 'HDCP2Version':
179                     version = self.stack.pop(0)
180                     if (version & 0x4):
181                         self.putb([18, ['HDCP2']])
182                     else:
183                         self.putb([18, ['NOT HDCP2']])
184                 else:
185                     self.putb([18, ['%s' % (self.type)]])
186             elif cmd == 'DATA READ':
187                 # Stack up our data bytes.
188                 self.stack.append(databyte)
189             elif cmd == 'DATA WRITE':
190                 # Stack up our data bytes.
191                 self.stack.append(databyte)