X-Git-Url: https://sigrok.org/gitweb/?p=libsigrokdecode.git;a=blobdiff_plain;f=decoders%2Fusb_power_delivery%2Fpd.py;h=936842921ba7463283c8f921e992ea0ea916d30f;hp=c6d7c6cd5d07102f61fa2bc5151d94e919d5dbdd;hb=43047b89962667436f6ed11c5da5e6f4a1ccfbde;hpb=bc6f82bb2856c0b5175e6858a13dc2e071cf2fee diff --git a/decoders/usb_power_delivery/pd.py b/decoders/usb_power_delivery/pd.py index c6d7c6c..9368429 100644 --- a/decoders/usb_power_delivery/pd.py +++ b/decoders/usb_power_delivery/pd.py @@ -2,6 +2,8 @@ ## This file is part of the libsigrokdecode project. ## ## Copyright (C) 2015 Google, Inc +## Copyright (C) 2018 davidanger +## Copyright (C) 2018 Peter Hazenberg ## ## This program is free software; you can redistribute it and/or modify ## it under the terms of the GNU General Public License as published by @@ -14,8 +16,7 @@ ## GNU General Public License for more details. ## ## You should have received a copy of the GNU General Public License -## along with this program; if not, write to the Free Software -## Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA +## along with this program; if not, see . ## import sigrokdecode as srd @@ -45,7 +46,13 @@ CTRL_TYPES = { 12: 'WAIT', 13: 'SOFT RESET', 14: 'reserved', - 15: 'reserved' + 15: 'reserved', + 16: 'Not Supported', + 17: 'Get_Source_Cap_Extended', + 18: 'Get_Status', + 19: 'FR_Swap', + 20: 'Get_PPS_Status', + 21: 'Get_Country_Codes', } # Data message type @@ -54,6 +61,9 @@ DATA_TYPES = { 2: 'REQUEST', 3: 'BIST', 4: 'SINK CAP', + 5: 'Battery_Status', + 6: 'Alert', + 7: 'Get_Country_Info', 15: 'VDM' } @@ -102,14 +112,23 @@ EOP = 0x16 SYNC_CODES = [SYNC1, SYNC2, SYNC3] HRST_CODES = [RST1, RST1, RST1, RST2] +SOP_SEQUENCES = [ + (SYNC1, SYNC1, SYNC1, SYNC2), + (SYNC1, SYNC1, SYNC3, SYNC3), + (SYNC1, SYNC3, SYNC1, SYNC3), + (SYNC1, RST2, RST2, SYNC3), + (SYNC1, RST2, SYNC3, SYNC2), + (RST1, SYNC1, RST1, SYNC3), + (RST1, RST1, RST1, RST2), +] START_OF_PACKETS = { - (SYNC1, SYNC1, SYNC1, SYNC2): 'SOP', - (SYNC1, SYNC1, SYNC3, SYNC3): "SOP'", - (SYNC1, SYNC3, SYNC1, SYNC3): 'SOP"', - (SYNC1, RST2, RST2, SYNC3): "SOP' Debug", - (SYNC1, RST2, SYNC3, SYNC2): 'SOP" Debug', - (RST1, SYNC1, RST1, SYNC3): 'Cable Reset', - (RST1, RST1, RST1, RST2): 'Hard Reset', + SOP_SEQUENCES[0]: 'SOP', + SOP_SEQUENCES[1]: "SOP'", + SOP_SEQUENCES[2]: 'SOP"', + SOP_SEQUENCES[3]: "SOP' Debug", + SOP_SEQUENCES[4]: 'SOP" Debug', + SOP_SEQUENCES[5]: 'Cable Reset', + SOP_SEQUENCES[6]: 'Hard Reset', } SYM_NAME = [ @@ -139,19 +158,12 @@ SYM_NAME = [ ] RDO_FLAGS = { + (1 << 23): 'unchunked', (1 << 24): 'no_suspend', (1 << 25): 'comm_cap', (1 << 26): 'cap_mismatch', (1 << 27): 'give_back' } -PDO_TYPE = ['', 'BATT:', 'VAR:', ''] -PDO_FLAGS = { - (1 << 29): 'dual_role_power', - (1 << 28): 'suspend', - (1 << 27): 'ext', - (1 << 26): 'comm_cap', - (1 << 25): 'dual_role_data' -} BIST_MODES = { 0: 'Receiver', @@ -178,6 +190,7 @@ VDM_CMDS = { } VDM_ACK = ['REQ', 'ACK', 'NAK', 'BSY'] + class SamplerateError(Exception): pass @@ -191,21 +204,24 @@ class Decoder(srd.Decoder): inputs = ['logic'] outputs = ['usb_pd'] channels = ( - {'id': 'cc', 'name': 'CC', 'desc': 'Control channel'}, + {'id': 'cc1', 'name': 'CC1', 'desc': 'Configuration Channel 1'}, + ) + optional_channels = ( + {'id': 'cc2', 'name': 'CC2', 'desc': 'Configuration Channel 2'}, ) options = ( - {'id': 'fulltext', 'desc': 'full text decoding of the packet', + {'id': 'fulltext', 'desc': 'Full text decoding of packets', 'default': 'no', 'values': ('yes', 'no')}, ) annotations = ( ('type', 'Packet Type'), - ('Preamble', 'Preamble'), - ('SOP', 'Start of Packet'), - ('Head', 'Header'), - ('Data', 'Data'), - ('CRC', 'Checksum'), - ('EOP', 'End Of Packet'), - ('Sym', '4b5b symbols'), + ('preamble', 'Preamble'), + ('sop', 'Start of Packet'), + ('header', 'Header'), + ('data', 'Data'), + ('crc', 'Checksum'), + ('eop', 'End Of Packet'), + ('sym', '4b5b symbols'), ('warnings', 'Warnings'), ('src', 'Source Message'), ('snk', 'Sink Message'), @@ -213,81 +229,128 @@ class Decoder(srd.Decoder): ('text', 'Plain text'), ) annotation_rows = ( - ('4B5B', 'symbols', (7, )), - ('Phase', 'parts', (1, 2, 3, 4, 5, 6, )), - ('payload', 'Payload', (11, )), - ('type', 'Type', (0, 9, 10, )), - ('warnings', 'Warnings', (8, )), - ('text', 'Full text', (12, )), + ('4b5b', 'Symbols', (7,)), + ('phase', 'Parts', (1, 2, 3, 4, 5, 6)), + ('payload', 'Payload', (11,)), + ('type', 'Type', (0, 9, 10)), + ('warnings', 'Warnings', (8,)), + ('text', 'Full text', (12,)), ) binary = ( ('raw-data', 'RAW binary data'), ) + stored_pdos = {} + def get_request(self, rdo): pos = (rdo >> 28) & 7 - op_ma = ((rdo >> 10) & 0x3ff) * 10 - max_ma = (rdo & 0x3ff) * 10 - flags = '' - for f in RDO_FLAGS.keys(): - if rdo & f: - flags += ' ' + RDO_FLAGS[f] - return '[%d]%d/%d mA%s' % (pos, op_ma, max_ma, flags) - - def get_source_cap(self, pdo): - t = (pdo >> 30) & 3 - if t == 0: - mv = ((pdo >> 10) & 0x3ff) * 50 - ma = ((pdo >> 0) & 0x3ff) * 10 - p = '%.1fV %.1fA' % (mv/1000.0, ma/1000.0) - elif t == 1: - minv = ((pdo >> 10) & 0x3ff) * 50 - maxv = ((pdo >> 20) & 0x3ff) * 50 - mw = ((pdo >> 0) & 0x3ff) * 250 - p = '%.1f/%.1fV %.1fW' % (minv/1000.0, maxv/1000.0, mw/1000.0) - elif t == 2: - minv = ((pdo >> 10) & 0x3ff) * 50 - maxv = ((pdo >> 20) & 0x3ff) * 50 - ma = ((pdo >> 0) & 0x3ff) * 10 - p = '%.1f/%.1fV %.1fA' % (minv/1000.0, maxv/1000.0, ma/1000.0) + + op_ma = ((rdo >> 10) & 0x3ff) * 0.01 + max_ma = (rdo & 0x3ff) * 0.01 + + mark = self.cap_mark[pos] + if mark == 3: + op_v = ((rdo >> 9) & 0x7ff) * 0.02 + op_a = (rdo & 0x3f) * 0.05 + t_settings = '%gV %gA' % (op_v, op_a) + elif mark == 2: + op_w = ((rdo >> 10) & 0x3ff) * 0.25 + mp_w = (rdo & 0x3ff) * 0.25 + t_settings = '%gW (operating)' % op_w else: - p = '' - flags = '' - for f in PDO_FLAGS.keys(): - if pdo & f: - flags += ' ' + PDO_FLAGS[f] - return '%s%s%s' % (PDO_TYPE[t], p, flags) - - def get_sink_cap(self, pdo): - t = (pdo >> 30) & 3 - if t == 0: - mv = ((pdo >> 10) & 0x3ff) * 50 - ma = ((pdo >> 0) & 0x3ff) * 10 - p = '%.1fV %.1fA' % (mv/1000.0, ma/1000.0) - elif t == 1: - minv = ((pdo >> 10) & 0x3ff) * 50 - maxv = ((pdo >> 20) & 0x3ff) * 50 - mw = ((pdo >> 0) & 0x3ff) * 250 - p = '%.1f/%.1fV %.1fW' % (minv/1000.0, maxv/1000.0, mw/1000.0) - elif t == 2: - minv = ((pdo >> 10) & 0x3ff) * 50 - maxv = ((pdo >> 20) & 0x3ff) * 50 - ma = ((pdo >> 0) & 0x3ff) * 10 - p = '%.1f/%.1fV %.1fA' % (minv/1000.0, maxv/1000.0, ma/1000.0) + op_a = ((rdo >> 10) & 0x3ff) * 0.01 + max_a = (rdo & 0x3ff) * 0.01 + t_settings = '%gA (operating) / %gA (max)' % (op_a, max_a) + + t_flags = '' + for f in sorted(RDO_FLAGS.keys(), reverse = True): + if rdo & f: + t_flags += ' [' + RDO_FLAGS[f] + ']' + + if pos in self.stored_pdos.keys(): + t_pdo = '#%d: %s' % (pos, self.stored_pdos[pos]) else: - p = '' - flags = '' - for f in PDO_FLAGS.keys(): + t_pdo = '#d' % (pos) + + return '(PDO %s) %s%s' % (t_pdo, t_settings, t_flags) + + def get_source_sink_cap(self, pdo, idx, source): + t1 = (pdo >> 30) & 3 + self.cap_mark[idx] = t1 + + flags = {} + if t1 == 0: + t_name = 'Fixed' + if source: + flags = { + (1 << 29): 'dual_role_power', + (1 << 28): 'suspend', + (1 << 27): 'unconstrained', + (1 << 26): 'comm_cap', + (1 << 25): 'dual_role_data', + (1 << 24): 'unchunked', + } + else: # Sink + flags = { + (1 << 29): 'dual_role_power', + (1 << 28): 'high_capability', + (1 << 27): 'unconstrained', + (1 << 26): 'comm_cap', + (1 << 25): 'dual_role_data', + (0b01 << 23): 'fr_swap default power', + (0b10 << 23): 'fr_swap 1.5 A', + (0b11 << 23): 'fr_swap 3.0 A', + } + mv = ((pdo >> 10) & 0x3ff) * 0.05 + ma = ((pdo >> 0) & 0x3ff) * 0.01 + p = '%gV %gA (%gW)' % (mv, ma, mv*ma) + self.stored_pdos[idx] = '%s %gV' % (t_name, mv) + elif t1 == 1: + t_name = 'Battery' + flags = {} # No flags defined for Battery PDO in PD 3.0 spec + minv = ((pdo >> 10) & 0x3ff) * 0.05 + maxv = ((pdo >> 20) & 0x3ff) * 0.05 + mw = ((pdo >> 0) & 0x3ff) * 0.25 + p = '%g/%gV %gW' % (minv, maxv, mw) + self.stored_pdos[idx] = '%s %g/%gV' % (t_name, minv, maxv) + elif t1 == 2: + t_name = 'Variable' + flags = {} # No flags defined for Variable PDO in PD 3.0 spec + minv = ((pdo >> 10) & 0x3ff) * 0.05 + maxv = ((pdo >> 20) & 0x3ff) * 0.05 + ma = ((pdo >> 0) & 0x3ff) * 0.01 + p = '%g/%gV %gA' % (minv, maxv, ma) + self.stored_pdos[idx] = '%s %g/%gV' % (t_name, minv, maxv) + elif t1 == 3: + t2 = (pdo >> 28) & 3 + if t2 == 0: + t_name = 'Programmable|PPS' + flags = { + (1 << 29): 'power_limited', + } + minv = ((pdo >> 8) & 0xff) * 0.1 + maxv = ((pdo >> 17) & 0xff) * 0.1 + ma = ((pdo >> 0) & 0xff) * 0.05 + p = '%g/%gV %gA' % (minv, maxv, ma) + if (pdo >> 27) & 0x1: + p += ' [limited]' + self.stored_pdos[idx] = '%s %g/%gV' % (t_name, minv, maxv) + else: + t_name = 'Reserved APDO: '+bin(t2) + p = '[raw: %s]' % (bin(pdo)) + self.stored_pdos[idx] = '%s %s' % (t_name, p) + t_flags = '' + for f in sorted(flags.keys(), reverse = True): if pdo & f: - flags += ' ' + PDO_FLAGS[f] - return '%s%s%s' % (PDO_TYPE[t], p, flags) + t_flags += ' [' + flags[f] + ']' + return '[%s] %s%s' % (t_name, p, t_flags) def get_vdm(self, idx, data): - if idx == 0: # VDM header + if idx == 0: # VDM header vid = data >> 16 struct = data & (1 << 15) txt = 'VDM' - if struct: # Structured VDM + if struct: # Structured VDM cmd = data & 0x1f src = data & (1 << 5) ack = (data >> 6) & 3 @@ -296,10 +359,10 @@ class Decoder(srd.Decoder): txt = VDM_ACK[ack] + ' ' txt += VDM_CMDS[cmd] if cmd in VDM_CMDS else 'cmd?' txt += ' pos %d' % (pos) if pos else ' ' - else: # Unstructured VDM + else: # Unstructured VDM txt = 'unstruct [%04x]' % (data & 0x7fff) txt += ' SVID:%04x' % (vid) - else: # VDM payload + else: # VDM payload txt = 'VDO:%08x' % (data) return txt @@ -309,22 +372,20 @@ class Decoder(srd.Decoder): mode_name = BIST_MODES[mode] if mode in BIST_MODES else 'INVALID' if mode == 2: mode_name = 'Counter[= %d]' % (counter) - # TODO check all 0 bits are 0 / emit warnings + # TODO: Check all 0 bits are 0 / emit warnings. return 'mode %s' % (mode_name) if idx == 0 else 'invalid BRO' def putpayload(self, s0, s1, idx): t = self.head_type() - txt = '???' + txt = '['+str(idx+1)+'] ' if t == 2: - txt = self.get_request(self.data[idx]) - elif t == 1: - txt = self.get_source_cap(self.data[idx]) - elif t == 4: - txt = self.get_sink_cap(self.data[idx]) + txt += self.get_request(self.data[idx]) + elif t == 1 or t == 4: + txt += self.get_source_sink_cap(self.data[idx], idx+1, t==1) elif t == 15: - txt = self.get_vdm(idx, self.data[idx]) + txt += self.get_vdm(idx, self.data[idx]) elif t == 3: - txt = self.get_bist(idx, self.data[idx]) + txt += self.get_bist(idx, self.data[idx]) self.putx(s0, s1, [11, [txt, txt]]) self.text += ' - ' + txt @@ -339,7 +400,7 @@ class Decoder(srd.Decoder): else: shortm = DATA_TYPES[t] if t in DATA_TYPES else 'DAT???' - longm = '{:s}[{:d}]:{:s}'.format(role, self.head_id(), shortm) + longm = '(r{:d}) {:s}[{:d}]: {:s}'.format(self.head_rev(), role, self.head_id(), shortm) self.putx(0, -1, [ann_type, [longm, shortm]]) self.text += longm @@ -385,13 +446,13 @@ class Decoder(srd.Decoder): def get_short(self): i = self.idx - # Check it's not a truncated packet + # Check it's not a truncated packet. if len(self.bits) - i <= 20: self.putwarn('Truncated', '!') return 0x0BAD k = [self.get_sym(i), self.get_sym(i+5), self.get_sym(i+10), self.get_sym(i+15)] - # TODO check bad symbols + # TODO: Check bad symbols. val = k[0] | (k[1] << 4) | (k[2] << 8) | (k[3] << 12) self.idx += 20 return val @@ -404,7 +465,7 @@ class Decoder(srd.Decoder): def find_corrupted_sop(self, k): # Start of packet are valid even if they have only 3 correct symbols # out of 4. - for seq in START_OF_PACKETS.keys(): + for seq in SOP_SEQUENCES: if [k[i] == seq[i] for i in range(len(k))].count(True) >= 3: return START_OF_PACKETS[seq] return None @@ -413,33 +474,36 @@ class Decoder(srd.Decoder): for i in range(len(self.bits) - 19): k = (self.get_sym(i, rec=False), self.get_sym(i+5, rec=False), self.get_sym(i+10, rec=False), self.get_sym(i+15, rec=False)) - sym = START_OF_PACKETS[k] if k in START_OF_PACKETS else None + sym = START_OF_PACKETS.get(k, None) if not sym: sym = self.find_corrupted_sop(k) - # We have an interesting symbol sequence + # We have an interesting symbol sequence. if sym: - # annotate the preamble + # Annotate the preamble. self.putx(0, i, [1, ['Preamble', '...']]) - # annotate each symbol + # Annotate each symbol. self.rec_sym(i, k[0]) self.rec_sym(i+5, k[1]) self.rec_sym(i+10, k[2]) self.rec_sym(i+15, k[3]) if sym == 'Hard Reset': self.text += 'HRST' - return -1 # Hard reset + return -1 # Hard reset elif sym == 'Cable Reset': self.text += 'CRST' - return -1 # Cable reset + return -1 # Cable reset else: self.putx(i, i+20, [2, [sym, 'S']]) return i+20 self.putx(0, len(self.bits), [1, ['Junk???', 'XXX']]) self.text += 'Junk???' self.putwarn('No start of packet found', 'XXX') - return -1 # No Start Of Packet + return -1 # No Start Of Packet def __init__(self): + self.reset() + + def reset(self): self.samplerate = None self.idx = 0 self.packet_seq = 0 @@ -450,17 +514,18 @@ class Decoder(srd.Decoder): self.bad = [] self.half_one = False self.start_one = 0 + self.stored_pdos = {} + self.cap_mark = [0, 0, 0, 0, 0, 0, 0, 0] def metadata(self, key, value): if key == srd.SRD_CONF_SAMPLERATE: self.samplerate = value - # 0 is 2 UI, space larger than 1.5x 0 is definitely wrong + # 0 is 2 UI, space larger than 1.5x 0 is definitely wrong. self.maxbit = self.us2samples(3 * UI_US) - # duration threshold between half 1 and 0 + # Duration threshold between half 1 and 0. self.threshold = self.us2samples(THRESHOLD_US) def start(self): - self.out_python = self.register(srd.OUTPUT_PYTHON) self.out_ann = self.register(srd.OUTPUT_ANN) self.out_binary = self.register(srd.OUTPUT_BINARY) self.out_bitrate = self.register( @@ -477,7 +542,7 @@ class Decoder(srd.Decoder): self.text = '' if len(self.edges) < 50: - return # Not a real PD packet + return # Not a real PD packet self.packet_seq += 1 tstamp = float(self.startsample) / self.samplerate @@ -485,9 +550,9 @@ class Decoder(srd.Decoder): self.idx = self.scan_eop() if self.idx < 0: - # Full text trace of the issue + # Full text trace of the issue. self.putx(0, self.idx, [12, [self.text, '...']]) - return # No real packet: ABORT + return # No real packet: ABORT. # Packet header self.head = self.get_short() @@ -529,9 +594,9 @@ class Decoder(srd.Decoder): if not self.samplerate: raise SamplerateError('Cannot decode without samplerate.') while True: - self.wait({0: 'e'}) + pins = self.wait([{0: 'e'}, {1: 'e'}, {'skip': int(self.samplerate/1e3)}]) - # First sample of the packet, just record the start date + # First sample of the packet, just record the start date. if not self.startsample: self.startsample = self.samplenum self.previous = self.samplenum @@ -539,20 +604,20 @@ class Decoder(srd.Decoder): diff = self.samplenum - self.previous - # Large idle: use it as the end of packet + # Large idle: use it as the end of packet. if diff > self.maxbit: - # the last edge of the packet + # The last edge of the packet. self.edges.append(self.previous) - # Export the packet + # Export the packet. self.decode_packet() - # Reset for next packet + # Reset for next packet. self.startsample = self.samplenum self.bits = [] self.edges = [] self.bad = [] self.half_one = False self.start_one = 0 - else: # add the bit to the packet + else: # Add the bit to the packet. is_zero = diff > self.threshold if is_zero and not self.half_one: self.bits.append(0) @@ -564,9 +629,9 @@ class Decoder(srd.Decoder): elif not is_zero and not self.half_one: self.half_one = True self.start_one = self.previous - else: # Invalid BMC sequence + else: # Invalid BMC sequence self.bad.append((self.start_one, self.previous)) - # TODO try to recover + # TODO: Try to recover. self.bits.append(0) self.edges.append(self.previous) self.half_one = False