word <worditemcount> is 7, and so on.
'''
-def channel_list(num_channels):
- l = [{'id': 'clk', 'name': 'CLK', 'desc': 'Clock line'}]
- for i in range(num_channels):
- d = {'id': 'd%d' % i, 'name': 'D%d' % i, 'desc': 'Data line %d' % i}
- l.append(d)
- return tuple(l)
+NUM_CHANNELS = 16
+
+class Pin:
+ CLOCK = 0
+ DATA_0 = CLOCK + 1
+ DATA_N = DATA_0 + NUM_CHANNELS
+ # BEWARE! DATA_N points _beyond_ the data partition (Python range(3)
+ # semantics, useful to have to simplify other code locations).
+ RESET = DATA_N
+
+class Ann:
+ ITEM, WORD, WARN = range(3)
class ChannelError(Exception):
pass
-NUM_CHANNELS = 8
-
class Decoder(srd.Decoder):
api_version = 3
id = 'parallel'
license = 'gplv2+'
inputs = ['logic']
outputs = ['parallel']
- optional_channels = channel_list(NUM_CHANNELS)
+ tags = ['Util']
+ optional_channels = tuple(
+ [{'id': 'clk', 'name': 'CLK', 'desc': 'Clock line'}] +
+ [
+ {'id': 'd%d' % i, 'name': 'D%d' % i, 'desc': 'Data line %d' % i}
+ for i in range(NUM_CHANNELS)
+ ] +
+ [{'id': 'rst', 'name': 'RST', 'desc': 'RESET line'}]
+ )
options = (
{'id': 'clock_edge', 'desc': 'Clock edge to sample on',
- 'default': 'rising', 'values': ('rising', 'falling')},
- {'id': 'wordsize', 'desc': 'Data wordsize', 'default': 1},
+ 'default': 'rising', 'values': ('rising', 'falling', 'either')},
+ {'id': 'reset_polarity', 'desc': 'Reset line polarity',
+ 'default': 'low-active', 'values': ('low-active', 'high-active')},
+ {'id': 'wordsize', 'desc': 'Data wordsize (# bus cycles)',
+ 'default': 0},
{'id': 'endianness', 'desc': 'Data endianness',
'default': 'little', 'values': ('little', 'big')},
)
annotations = (
- ('items', 'Items'),
- ('words', 'Words'),
+ ('item', 'Item'),
+ ('word', 'Word'),
+ ('warning', 'Warning'),
+ )
+ annotation_rows = (
+ ('items', 'Items', (Ann.ITEM,)),
+ ('words', 'Words', (Ann.WORD,)),
+ ('warnings', 'Warnings', (Ann.WARN,)),
+ )
+ binary = (
+ ('binary', 'Binary'),
)
def __init__(self):
self.reset()
def reset(self):
- self.items = []
- self.saved_item = None
- self.ss_item = self.es_item = None
- self.first = True
+ self.pend_item = None
+ self.word_items = []
def start(self):
self.out_python = self.register(srd.OUTPUT_PYTHON)
+ self.out_binary = self.register(srd.OUTPUT_BINARY)
self.out_ann = self.register(srd.OUTPUT_ANN)
- def putpb(self, data):
- self.put(self.ss_item, self.es_item, self.out_python, data)
+ def putg(self, ss, es, ann, txts):
+ self.put(ss, es, self.out_ann, [ann, txts])
+
+ def putpy(self, ss, es, ann, data):
+ self.put(ss, es, self.out_python, [ann, data])
- def putb(self, data):
- self.put(self.ss_item, self.es_item, self.out_ann, data)
+ def putbin(self, ss, es, ann_class, data):
+ self.put(ss, es, self.out_binary, [ann_class, data])
- def putpw(self, data):
- self.put(self.ss_word, self.es_word, self.out_python, data)
+ def flush_word(self, bus_width):
+ if not self.word_items:
+ return
+ word_size = self.options['wordsize']
- def putw(self, data):
- self.put(self.ss_word, self.es_word, self.out_ann, data)
+ items = self.word_items
+ ss, es = items[0][0], items[-1][1]
+ items = [i[2] for i in items]
+ if self.options['endianness'] == 'big':
+ items.reverse()
+ word = sum([d << (i * bus_width) for i, d in enumerate(items)])
- def handle_bits(self, item, used_pins):
- # Save the item, and its sample number if it's the first part of a word.
- if not self.items:
- self.ss_word = self.samplenum
- self.items.append(item)
+ txts = [self.fmt_word.format(word)]
+ self.putg(ss, es, Ann.WORD, txts)
+ self.putpy(ss, es, 'WORD', (word, bus_width, word_size))
- if self.first:
- # Save the start sample and item for later (no output yet).
- self.ss_item = self.samplenum
- self.first = False
- self.saved_item = item
- else:
- # Output the saved item (from the last CLK edge to the current).
- self.es_item = self.samplenum
- self.putpb(['ITEM', self.saved_item])
- self.putb([0, ['%X' % self.saved_item]])
- self.ss_item = self.samplenum
- self.saved_item = item
-
- # Get as many items as the configured wordsize says.
- ws = self.options['wordsize']
- if len(self.items) < ws:
+ if len(items) != word_size:
+ txts = ['incomplete word size', 'word size', 'ws']
+ self.putg(ss, es, Ann.WARN, txts)
+
+ self.word_items.clear()
+
+ def queue_word(self, now, item, bus_width):
+ wordsize = self.options['wordsize']
+ if not wordsize:
return
- # Output annotations/python for a word (a collection of items).
- # NOTE that this feature is currently not effective. The emission
- # of Python annotations is commented out.
- endian = self.options['endianness']
- if endian == 'little':
- self.items.reverse()
- word = 0
- for i in range(ws):
- word |= self.items[i] << (i * used_pins)
+ # Terminate a previously seen item of a word first. Emit the
+ # word's annotation when the last item's end was seen.
+ if self.word_items:
+ ss, _, data = self.word_items[-1]
+ es = now
+ self.word_items[-1] = (ss, es, data)
+ if len(self.word_items) == wordsize:
+ self.flush_word(bus_width)
+
+ # Start tracking the currently seen item (yet unknown end time).
+ if item is not None:
+ pend = (now, None, item)
+ self.word_items.append(pend)
+
+ def handle_bits(self, now, item, bus_width):
+
+ # Optionally flush a previously started item.
+ if self.pend_item:
+ ss, _, data = self.pend_item
+ self.pend_item = None
+ es = now
+ txts = [self.fmt_item.format(data)]
+ self.putg(ss, es, Ann.ITEM, txts)
+ self.putpy(ss, es, 'ITEM', (data, bus_width))
+ self.putbin(ss, es, 0, data.to_bytes(1, byteorder='big'))
- self.es_word = self.samplenum
- # self.putpw(['WORD', word])
- # self.putw([1, ['%X' % word]])
- self.ss_word = self.samplenum
+ # Optionally queue the currently seen item.
+ if item is not None:
+ self.pend_item = (now, None, item)
- self.items = []
+ # Pass the current item to the word accumulation logic.
+ self.queue_word(now, item, bus_width)
def decode(self):
# Determine which (optional) channels have input data. Insist in
# a non-empty input data set. Cope with sparse connection maps.
# Store enough state to later "compress" sampled input data.
- max_possible = len(self.optional_channels)
- idx_channels = [
+ data_indices = [
idx if self.has_channel(idx) else None
- for idx in range(max_possible)
+ for idx in range(Pin.DATA_0, Pin.DATA_N)
]
- has_channels = [idx for idx in idx_channels if idx is not None]
- if not has_channels:
- raise ChannelError('At least one channel has to be supplied.')
- max_connected = max(has_channels)
- idx_strip = max_connected + 1
+ has_data = [idx for idx in data_indices if idx is not None]
+ if not has_data:
+ raise ChannelError('Need at least one data channel.')
+ max_connected = max(has_data)
+
+ # Pre-determine which input data to strip off, the width of
+ # individual items and multiplexed words, as well as format
+ # strings here. This simplifies call sites which run in tight
+ # loops later.
+ upper_data_bound = max_connected + 1
+ num_item_bits = upper_data_bound - Pin.DATA_0
+ num_word_items = self.options['wordsize']
+ num_word_bits = num_item_bits * num_word_items
+ num_digits = (num_item_bits + 4 - 1) // 4
+ self.fmt_item = "{{:0{}x}}".format(num_digits)
+ num_digits = (num_word_bits + 4 - 1) // 4
+ self.fmt_word = "{{:0{}x}}".format(num_digits)
# Determine .wait() conditions, depending on the presence of a
# clock signal. Either inspect samples on the configured edge of
# the clock, or inspect samples upon ANY edge of ANY of the pins
# which provide input data.
- if self.has_channel(0):
- edge = self.options['clock_edge'][0]
- conds = {0: edge}
+ conds = []
+ cond_idx_clock = None
+ cond_idx_data_0 = None
+ cond_idx_data_N = None
+ cond_idx_reset = None
+ has_clock = self.has_channel(Pin.CLOCK)
+ if has_clock:
+ cond_idx_clock = len(conds)
+ edge = {
+ 'rising': 'r',
+ 'falling': 'f',
+ 'either': 'e',
+ }.get(self.options['clock_edge'])
+ conds.append({Pin.CLOCK: edge})
else:
- conds = [{idx: 'e'} for idx in has_channels]
+ cond_idx_data_0 = len(conds)
+ conds.extend([{idx: 'e'} for idx in has_data])
+ cond_idx_data_N = len(conds)
+ has_reset = self.has_channel(Pin.RESET)
+ if has_reset:
+ cond_idx_reset = len(conds)
+ conds.append({Pin.RESET: 'e'})
+ reset_active = {
+ 'low-active': 0,
+ 'high-active': 1,
+ }.get(self.options['reset_polarity'])
# Keep processing the input stream. Assume "always zero" for
# not-connected input lines. Pass data bits (all inputs except
- # clock) to the handle_bits() method.
+ # clock and reset) to the handle_bits() method. Handle reset
+ # edges first and data changes then, within the same iteration.
+ # This results in robust operation for low-oversampled input.
+ in_reset = False
while True:
- pins = self.wait(conds)
- bits = [0 if idx is None else pins[idx] for idx in idx_channels]
- bits = bits[1:idx_strip]
- self.handle_bits(bitpack(bits), len(bits))
+ try:
+ pins = self.wait(conds)
+ except EOFError as e:
+ break
+ clock_edge = cond_idx_clock is not None and self.matched[cond_idx_clock]
+ data_edge = cond_idx_data_0 is not None and [idx for idx in range(cond_idx_data_0, cond_idx_data_N) if self.matched[idx]]
+ reset_edge = cond_idx_reset is not None and self.matched[cond_idx_reset]
+
+ if reset_edge:
+ in_reset = pins[Pin.RESET] == reset_active
+ if in_reset:
+ self.handle_bits(self.samplenum, None, num_item_bits)
+ self.flush_word(num_item_bits)
+ if in_reset:
+ continue
+
+ if clock_edge or data_edge:
+ data_bits = [0 if idx is None else pins[idx] for idx in data_indices]
+ data_bits = data_bits[:num_item_bits]
+ item = bitpack(data_bits)
+ self.handle_bits(self.samplenum, item, num_item_bits)
+
+ self.handle_bits(self.samplenum, None, num_item_bits)
+ # TODO Determine whether a WARN annotation needs to get emitted.
+ # The decoder has not seen the end of the last accumulated item.
+ # Instead it just ran out of input data.