word <worditemcount> is 7, and so on.
'''
-def channel_list(num_channels):
- l = [{'id': 'clk', 'name': 'CLK', 'desc': 'Clock line'}]
- for i in range(num_channels):
- d = {'id': 'd%d' % i, 'name': 'D%d' % i, 'desc': 'Data line %d' % i}
- l.append(d)
- return tuple(l)
+NUM_CHANNELS = 16
+
+class Pin:
+ CLOCK = 0
+ DATA_0 = CLOCK + 1
+ DATA_N = DATA_0 + NUM_CHANNELS
+ # BEWARE! DATA_N points _beyond_ the data partition (Python range(3)
+ # semantics, useful to have to simplify other code locations).
+ RESET = DATA_N
+
+class Ann:
+ ITEM, WORD, WARN = range(3)
class ChannelError(Exception):
pass
-NUM_CHANNELS = 8
-
class Decoder(srd.Decoder):
api_version = 3
id = 'parallel'
license = 'gplv2+'
inputs = ['logic']
outputs = ['parallel']
- tags = ['Logic', 'Bus']
- optional_channels = channel_list(NUM_CHANNELS)
+ tags = ['Util']
+ optional_channels = tuple(
+ [{'id': 'clk', 'name': 'CLK', 'desc': 'Clock line'}] +
+ [
+ {'id': 'd%d' % i, 'name': 'D%d' % i, 'desc': 'Data line %d' % i}
+ for i in range(NUM_CHANNELS)
+ ] +
+ [{'id': 'rst', 'name': 'RST', 'desc': 'RESET line'}]
+ )
options = (
{'id': 'clock_edge', 'desc': 'Clock edge to sample on',
- 'default': 'rising', 'values': ('rising', 'falling')},
+ 'default': 'rising', 'values': ('rising', 'falling', 'either')},
+ {'id': 'reset_polarity', 'desc': 'Reset line polarity',
+ 'default': 'low-active', 'values': ('low-active', 'high-active')},
{'id': 'wordsize', 'desc': 'Data wordsize (# bus cycles)',
'default': 0},
{'id': 'endianness', 'desc': 'Data endianness',
'default': 'little', 'values': ('little', 'big')},
)
annotations = (
- ('items', 'Items'),
- ('words', 'Words'),
+ ('item', 'Item'),
+ ('word', 'Word'),
+ ('warning', 'Warning'),
)
annotation_rows = (
- ('items', 'Items', (0,)),
- ('words', 'Words', (1,)),
+ ('items', 'Items', (Ann.ITEM,)),
+ ('words', 'Words', (Ann.WORD,)),
+ ('warnings', 'Warnings', (Ann.WARN,)),
)
def __init__(self):
self.reset()
def reset(self):
- self.items = []
- self.saved_item = None
- self.ss_item = self.es_item = None
- self.saved_word = None
- self.ss_word = self.es_word = None
- self.first = True
+ self.pend_item = None
+ self.word_items = []
def start(self):
self.out_python = self.register(srd.OUTPUT_PYTHON)
self.out_ann = self.register(srd.OUTPUT_ANN)
- def putpb(self, data):
- self.put(self.ss_item, self.es_item, self.out_python, data)
-
- def putb(self, data):
- self.put(self.ss_item, self.es_item, self.out_ann, data)
-
- def putpw(self, data):
- self.put(self.ss_word, self.es_word, self.out_python, data)
-
- def putw(self, data):
- self.put(self.ss_word, self.es_word, self.out_ann, data)
-
- def handle_bits(self, item, used_pins):
-
- # If a word was previously accumulated, then emit its annotation
- # now after its end samplenumber became available.
- if self.saved_word is not None:
- if self.options['wordsize'] > 0:
- self.es_word = self.samplenum
- self.putw([1, [self.fmt_word.format(self.saved_word)]])
- self.putpw(['WORD', self.saved_word])
- self.saved_word = None
-
- # Defer annotations for individual items until the next sample
- # is taken, and the previous sample's end samplenumber has
- # become available.
- if self.first:
- # Save the start sample and item for later (no output yet).
- self.ss_item = self.samplenum
- self.first = False
- self.saved_item = item
- else:
- # Output the saved item (from the last CLK edge to the current).
- self.es_item = self.samplenum
- self.putpb(['ITEM', self.saved_item])
- self.putb([0, [self.fmt_item.format(self.saved_item)]])
- self.ss_item = self.samplenum
- self.saved_item = item
-
- # Get as many items as the configured wordsize specifies.
- if not self.items:
- self.ss_word = self.samplenum
- self.items.append(item)
- ws = self.options['wordsize']
- if len(self.items) < ws:
+ def putg(self, ss, es, ann, txts):
+ self.put(ss, es, self.out_ann, [ann, txts])
+
+ def putpy(self, ss, es, ann, data):
+ self.put(ss, es, self.out_python, [ann, data])
+
+ def flush_word(self, bus_width):
+ if not self.word_items:
+ return
+ word_size = self.options['wordsize']
+
+ items = self.word_items
+ ss, es = items[0][0], items[-1][1]
+ items = [i[2] for i in items]
+ if self.options['endianness'] == 'big':
+ items.reverse()
+ word = sum([d << (i * bus_width) for i, d in enumerate(items)])
+
+ txts = [self.fmt_word.format(word)]
+ self.putg(ss, es, Ann.WORD, txts)
+ self.putpy(ss, es, 'WORD', (word, bus_width, word_size))
+
+ if len(items) != word_size:
+ txts = ['incomplete word size', 'word size', 'ws']
+ self.putg(ss, es, Ann.WARN, txts)
+
+ self.word_items.clear()
+
+ def queue_word(self, now, item, bus_width):
+ wordsize = self.options['wordsize']
+ if not wordsize:
return
- # Collect words and prepare annotation details, but defer emission
- # until the end samplenumber becomes available.
- endian = self.options['endianness']
- if endian == 'big':
- self.items.reverse()
- word = sum([self.items[i] << (i * used_pins) for i in range(ws)])
- self.saved_word = word
- self.items = []
+ # Terminate a previously seen item of a word first. Emit the
+ # word's annotation when the last item's end was seen.
+ if self.word_items:
+ ss, _, data = self.word_items[-1]
+ es = now
+ self.word_items[-1] = (ss, es, data)
+ if len(self.word_items) == wordsize:
+ self.flush_word(bus_width)
+
+ # Start tracking the currently seen item (yet unknown end time).
+ if item is not None:
+ pend = (now, None, item)
+ self.word_items.append(pend)
+
+ def handle_bits(self, now, item, bus_width):
+
+ # Optionally flush a previously started item.
+ if self.pend_item:
+ ss, _, data = self.pend_item
+ self.pend_item = None
+ es = now
+ txts = [self.fmt_item.format(data)]
+ self.putg(ss, es, Ann.ITEM, txts)
+ self.putpy(ss, es, 'ITEM', (data, bus_width))
+
+ # Optionally queue the currently seen item.
+ if item is not None:
+ self.pend_item = (now, None, item)
+
+ # Pass the current item to the word accumulation logic.
+ self.queue_word(now, item, bus_width)
def decode(self):
# Determine which (optional) channels have input data. Insist in
# a non-empty input data set. Cope with sparse connection maps.
# Store enough state to later "compress" sampled input data.
- max_possible = len(self.optional_channels)
- idx_channels = [
+ data_indices = [
idx if self.has_channel(idx) else None
- for idx in range(max_possible)
+ for idx in range(Pin.DATA_0, Pin.DATA_N)
]
- has_channels = [idx for idx in idx_channels if idx is not None]
- if not has_channels:
- raise ChannelError('At least one channel has to be supplied.')
- max_connected = max(has_channels)
-
- # Determine .wait() conditions, depending on the presence of a
- # clock signal. Either inspect samples on the configured edge of
- # the clock, or inspect samples upon ANY edge of ANY of the pins
- # which provide input data.
- if self.has_channel(0):
- edge = self.options['clock_edge'][0]
- conds = {0: edge}
- else:
- conds = [{idx: 'e'} for idx in has_channels]
+ has_data = [idx for idx in data_indices if idx is not None]
+ if not has_data:
+ raise ChannelError('Need at least one data channel.')
+ max_connected = max(has_data)
# Pre-determine which input data to strip off, the width of
# individual items and multiplexed words, as well as format
# strings here. This simplifies call sites which run in tight
# loops later.
- idx_strip = max_connected + 1
- num_item_bits = idx_strip - 1
+ upper_data_bound = max_connected + 1
+ num_item_bits = upper_data_bound - Pin.DATA_0
num_word_items = self.options['wordsize']
num_word_bits = num_item_bits * num_word_items
- num_digits = (num_item_bits + 3) // 4
+ num_digits = (num_item_bits + 4 - 1) // 4
self.fmt_item = "{{:0{}x}}".format(num_digits)
- num_digits = (num_word_bits + 3) // 4
+ num_digits = (num_word_bits + 4 - 1) // 4
self.fmt_word = "{{:0{}x}}".format(num_digits)
+ # Determine .wait() conditions, depending on the presence of a
+ # clock signal. Either inspect samples on the configured edge of
+ # the clock, or inspect samples upon ANY edge of ANY of the pins
+ # which provide input data.
+ conds = []
+ cond_idx_clock = None
+ cond_idx_data_0 = None
+ cond_idx_data_N = None
+ cond_idx_reset = None
+ has_clock = self.has_channel(Pin.CLOCK)
+ if has_clock:
+ cond_idx_clock = len(conds)
+ edge = {
+ 'rising': 'r',
+ 'falling': 'f',
+ 'either': 'e',
+ }.get(self.options['clock_edge'])
+ conds.append({Pin.CLOCK: edge})
+ else:
+ cond_idx_data_0 = len(conds)
+ conds.extend([{idx: 'e'} for idx in has_data])
+ cond_idx_data_N = len(conds)
+ has_reset = self.has_channel(Pin.RESET)
+ if has_reset:
+ cond_idx_reset = len(conds)
+ conds.append({Pin.RESET: 'e'})
+ reset_active = {
+ 'low-active': 0,
+ 'high-active': 1,
+ }.get(self.options['reset_polarity'])
+
# Keep processing the input stream. Assume "always zero" for
# not-connected input lines. Pass data bits (all inputs except
- # clock) to the handle_bits() method.
+ # clock and reset) to the handle_bits() method. Handle reset
+ # edges first and data changes then, within the same iteration.
+ # This results in robust operation for low-oversampled input.
+ in_reset = False
while True:
pins = self.wait(conds)
- bits = [0 if idx is None else pins[idx] for idx in idx_channels]
- item = bitpack(bits[1:idx_strip])
- self.handle_bits(item, num_item_bits)
+ clock_edge = cond_idx_clock is not None and self.matched[cond_idx_clock]
+ data_edge = cond_idx_data_0 is not None and [idx for idx in range(cond_idx_data_0, cond_idx_data_N) if self.matched[idx]]
+ reset_edge = cond_idx_reset is not None and self.matched[cond_idx_reset]
+
+ if reset_edge:
+ in_reset = pins[Pin.RESET] == reset_active
+ if in_reset:
+ self.handle_bits(self.samplenum, None, num_item_bits)
+ self.flush_word(num_item_bits)
+ if in_reset:
+ continue
+
+ if clock_edge or data_edge:
+ data_bits = [0 if idx is None else pins[idx] for idx in data_indices]
+ data_bits = data_bits[:num_item_bits]
+ item = bitpack(data_bits)
+ self.handle_bits(self.samplenum, item, num_item_bits)