From: Gerhard Sittig Date: Sat, 7 Nov 2020 12:46:56 +0000 (+0100) Subject: parallel: rephrase handling of data lines, symbolic upper bound X-Git-Url: http://sigrok.org/gitweb/?p=libsigrokdecode.git;a=commitdiff_plain;h=615f86f6bf7f7166f2975c9efe7e727bf2b65734 parallel: rephrase handling of data lines, symbolic upper bound Strictly speaking this decoder considers all input signals as optional. The previous version accepted clock alone. Though reading values from zero data bits is of limited. Tighten the check for connected inputs. Inline the declaration of all channels in the decoder boiler plate, the helper routine was only used in a single spot. Change the order of the data lines stripe details and the .wait() conditions, improve locality of assignment and use of related variables. Don't assume that "all channels but clock" are data lines. Use a symbolic upper bound for the data lines partition, to prepare the introduction of a reset/enable signal. --- diff --git a/decoders/parallel/pd.py b/decoders/parallel/pd.py index dd356b4..10255f3 100644 --- a/decoders/parallel/pd.py +++ b/decoders/parallel/pd.py @@ -58,18 +58,12 @@ NUM_CHANNELS = 8 class Pin: CLOCK = 0 - DATA = range(1, NUM_CHANNELS + 1) + DATA_0 = CLOCK + 1 + DATA_N = DATA_0 + NUM_CHANNELS class Ann: ITEM, WORD = range(2) -def channel_list(): - l = [{'id': 'clk', 'name': 'CLK', 'desc': 'Clock line'}] - for i in range(NUM_CHANNELS): - d = {'id': 'd%d' % i, 'name': 'D%d' % i, 'desc': 'Data line %d' % i} - l.append(d) - return tuple(l) - class ChannelError(Exception): pass @@ -83,7 +77,13 @@ class Decoder(srd.Decoder): inputs = ['logic'] outputs = ['parallel'] tags = ['Util'] - optional_channels = channel_list() + optional_channels = tuple( + [{'id': 'clk', 'name': 'CLK', 'desc': 'Clock line'}] + + [ + {'id': 'd%d' % i, 'name': 'D%d' % i, 'desc': 'Data line %d' % i} + for i in range(NUM_CHANNELS) + ] + ) options = ( {'id': 'clock_edge', 'desc': 'Clock edge to sample on', 'default': 'rising', 'values': ('rising', 'falling')}, @@ -176,15 +176,27 @@ class Decoder(srd.Decoder): # Determine which (optional) channels have input data. Insist in # a non-empty input data set. Cope with sparse connection maps. # Store enough state to later "compress" sampled input data. - max_possible = len(self.optional_channels) - idx_channels = [ + data_indices = [ idx if self.has_channel(idx) else None - for idx in range(max_possible) + for idx in range(Pin.DATA_0, Pin.DATA_N) ] - has_channels = [idx for idx in idx_channels if idx is not None] - if not has_channels: - raise ChannelError('At least one channel has to be supplied.') - max_connected = max(has_channels) + has_data = [idx for idx in data_indices if idx is not None] + if not has_data: + raise ChannelError('Need at least one data channel.') + max_connected = max(has_data) + + # Pre-determine which input data to strip off, the width of + # individual items and multiplexed words, as well as format + # strings here. This simplifies call sites which run in tight + # loops later. + upper_data_bound = max_connected + 1 + num_item_bits = upper_data_bound - Pin.DATA_0 + num_word_items = self.options['wordsize'] + num_word_bits = num_item_bits * num_word_items + num_digits = (num_item_bits + 4 - 1) // 4 + self.fmt_item = "{{:0{}x}}".format(num_digits) + num_digits = (num_word_bits + 4 - 1) // 4 + self.fmt_word = "{{:0{}x}}".format(num_digits) # Determine .wait() conditions, depending on the presence of a # clock signal. Either inspect samples on the configured edge of @@ -195,26 +207,14 @@ class Decoder(srd.Decoder): edge = self.options['clock_edge'][0] conds = [{Pin.CLOCK: edge}] else: - conds = [{idx: 'e'} for idx in has_channels] - - # Pre-determine which input data to strip off, the width of - # individual items and multiplexed words, as well as format - # strings here. This simplifies call sites which run in tight - # loops later. - idx_strip = max_connected + 1 - num_item_bits = idx_strip - 1 - num_word_items = self.options['wordsize'] - num_word_bits = num_item_bits * num_word_items - num_digits = (num_item_bits + 3) // 4 - self.fmt_item = "{{:0{}x}}".format(num_digits) - num_digits = (num_word_bits + 3) // 4 - self.fmt_word = "{{:0{}x}}".format(num_digits) + conds = [{idx: 'e'} for idx in has_data] # Keep processing the input stream. Assume "always zero" for # not-connected input lines. Pass data bits (all inputs except # clock) to the handle_bits() method. while True: pins = self.wait(conds) - bits = [0 if idx is None else pins[idx] for idx in idx_channels] - item = bitpack(bits[Pin.DATA[0]:idx_strip]) + data_bits = [0 if idx is None else pins[idx] for idx in data_indices] + data_bits = data_bits[:num_item_bits] + item = bitpack(data_bits) self.handle_bits(item, num_item_bits)