]> sigrok.org Git - libsigrokdecode.git/blobdiff - decoders/parallel/pd.py
avr_isp: Add more parts
[libsigrokdecode.git] / decoders / parallel / pd.py
index 9c203543f21a65593df2eea5b753dc1a1c32b6f4..1e3120858a7a4f750cd1ee3810a803c4439b7435 100644 (file)
@@ -1,7 +1,7 @@
 ##
 ## This file is part of the libsigrokdecode project.
 ##
-## Copyright (C) 2013 Uwe Hermann <uwe@hermann-uwe.de>
+## Copyright (C) 2013-2016 Uwe Hermann <uwe@hermann-uwe.de>
 ##
 ## This program is free software; you can redistribute it and/or modify
 ## it under the terms of the GNU General Public License as published by
 ## GNU General Public License for more details.
 ##
 ## You should have received a copy of the GNU General Public License
-## along with this program; if not, write to the Free Software
-## Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
+## along with this program; if not, see <http://www.gnu.org/licenses/>.
 ##
 
-# Parallel (sync) bus protocol decoder
-
 import sigrokdecode as srd
+from common.srdhelper import bitpack
 
 '''
-Protocol output format:
+OUTPUT_PYTHON format:
 
 Packet:
 [<ptype>, <pdata>]
@@ -56,15 +54,24 @@ Packet:
    word <worditemcount> is 7, and so on.
 '''
 
-def probe_list(num_probes):
-    l = []
-    for i in range(num_probes):
-        d = {'id': 'd%d' % i, 'name': 'D%d' % i, 'desc': 'Data line %d' % i}
-        l.append(d)
-    return l
+NUM_CHANNELS = 16
+
+class Pin:
+    CLOCK = 0
+    DATA_0 = CLOCK + 1
+    DATA_N = DATA_0 + NUM_CHANNELS
+    # BEWARE! DATA_N points _beyond_ the data partition (Python range(3)
+    # semantics, useful to have to simplify other code locations).
+    RESET = DATA_N
+
+class Ann:
+    ITEM, WORD, WARN = range(3)
+
+class ChannelError(Exception):
+    pass
 
 class Decoder(srd.Decoder):
-    api_version = 1
+    api_version = 3
     id = 'parallel'
     name = 'Parallel'
     longname = 'Parallel sync bus'
@@ -72,125 +79,207 @@ class Decoder(srd.Decoder):
     license = 'gplv2+'
     inputs = ['logic']
     outputs = ['parallel']
-    probes = [
-        {'id': 'clk', 'name': 'CLK', 'desc': 'Clock line'},
-    ]
-    optional_probes = probe_list(8)
-    options = {
-        'clock_edge': ['Clock edge to sample on', 'rising'],
-        'wordsize': ['Word size of the data', 1],
-        'endianness': ['Endianness of the data', 'little'],
-        'format': ['Data format', 'hex'],
-    }
-    annotations = [
-        ['items', 'Items'],
-        ['words', 'Words'],
-    ]
+    tags = ['Util']
+    optional_channels = tuple(
+        [{'id': 'clk', 'name': 'CLK', 'desc': 'Clock line'}] +
+        [
+            {'id': 'd%d' % i, 'name': 'D%d' % i, 'desc': 'Data line %d' % i}
+            for i in range(NUM_CHANNELS)
+        ] +
+        [{'id': 'rst', 'name': 'RST', 'desc': 'RESET line'}]
+    )
+    options = (
+        {'id': 'clock_edge', 'desc': 'Clock edge to sample on',
+            'default': 'rising', 'values': ('rising', 'falling', 'either')},
+        {'id': 'reset_polarity', 'desc': 'Reset line polarity',
+            'default': 'low-active', 'values': ('low-active', 'high-active')},
+        {'id': 'wordsize', 'desc': 'Data wordsize (# bus cycles)',
+            'default': 0},
+        {'id': 'endianness', 'desc': 'Data endianness',
+            'default': 'little', 'values': ('little', 'big')},
+    )
+    annotations = (
+        ('item', 'Item'),
+        ('word', 'Word'),
+        ('warning', 'Warning'),
+    )
+    annotation_rows = (
+        ('items', 'Items', (Ann.ITEM,)),
+        ('words', 'Words', (Ann.WORD,)),
+        ('warnings', 'Warnings', (Ann.WARN,)),
+    )
+    binary = (
+        ('binary', 'Binary'),
+    )
 
     def __init__(self):
-        self.oldclk = None
-        self.items = []
-        self.itemcount = 0
-        self.saved_item = None
-        self.samplenum = 0
-        self.oldpins = None
-        self.ss_item = self.es_item = None
-        self.first = True
-        self.state = 'IDLE'
-
-    def start(self, metadata):
-        self.out_proto = self.register(srd.OUTPUT_PYTHON)
-        self.out_ann = self.register(srd.OUTPUT_ANN)
-
-    def report(self):
-        pass
-
-    def putpb(self, data):
-        self.put(self.ss_item, self.es_item, self.out_proto, data)
-
-    def putb(self, data):
-        self.put(self.ss_item, self.es_item, self.out_ann, data)
-
-    def putpw(self, data):
-        self.put(self.ss_word, self.es_word, self.out_proto, data)
+        self.reset()
 
-    def putw(self, data):
-        self.put(self.ss_word, self.es_word, self.out_ann, data)
+    def reset(self):
+        self.pend_item = None
+        self.word_items = []
 
-    def handle_bits(self, datapins):
-        # If this is the first item in a word, save its sample number.
-        if self.itemcount == 0:
-            self.ss_word = self.samplenum
-
-        # Get the bits for this item.
-        item, used_pins = 0, datapins.count(b'\x01') + datapins.count(b'\x00')
-        for i in range(used_pins):
-            item |= datapins[i] << i
+    def start(self):
+        self.out_python = self.register(srd.OUTPUT_PYTHON)
+        self.out_binary = self.register(srd.OUTPUT_BINARY)
+        self.out_ann = self.register(srd.OUTPUT_ANN)
 
-        self.items.append(item)
-        self.itemcount += 1
+    def putg(self, ss, es, ann, txts):
+        self.put(ss, es, self.out_ann, [ann, txts])
 
-        if self.first == True:
-            # Save the start sample and item for later (no output yet).
-            self.ss_item = self.samplenum
-            self.first = False
-            self.saved_item = item
-        else:
-            # Output the saved item (from the last CLK edge to the current).
-            self.es_item = self.samplenum
-            self.putpb(['ITEM', self.saved_item])
-            self.putb([0, ['%X' % self.saved_item]])
-            self.ss_item = self.samplenum
-            self.saved_item = item
+    def putpy(self, ss, es, ann, data):
+        self.put(ss, es, self.out_python, [ann, data])
 
-        endian, ws = self.options['endianness'], self.options['wordsize']
+    def putbin(self, ss, es, ann_class, data):
+        self.put(ss, es, self.out_binary, [ann_class, data])
 
-        # Get as many items as the configured wordsize says.
-        if self.itemcount < ws:
+    def flush_word(self, bus_width):
+        if not self.word_items:
             return
+        word_size = self.options['wordsize']
 
-        # Output annotations/proto for a word (a collection of items).
-        word = 0
-        for i in range(ws):
-            if endian == 'little':
-                word |= self.items[i] << ((ws - 1 - i) * used_pins)
-            elif endian == 'big':
-                word |= self.items[i] << (i * used_pins)
+        items = self.word_items
+        ss, es = items[0][0], items[-1][1]
+        items = [i[2] for i in items]
+        if self.options['endianness'] == 'big':
+            items.reverse()
+        word = sum([d << (i * bus_width) for i, d in enumerate(items)])
 
-        self.es_word = self.samplenum
-        # self.putpw(['WORD', word])
-        # self.putw([1, ['%X' % word]])
-        self.ss_word = self.samplenum
+        txts = [self.fmt_word.format(word)]
+        self.putg(ss, es, Ann.WORD, txts)
+        self.putpy(ss, es, 'WORD', (word, bus_width, word_size))
 
-        self.itemcount, self.items = 0, []
+        if len(items) != word_size:
+            txts = ['incomplete word size', 'word size', 'ws']
+            self.putg(ss, es, Ann.WARN, txts)
 
-    def find_clk_edge(self, clk, datapins):
-        # Ignore sample if the clock pin hasn't changed.
-        if clk == self.oldclk:
-            return
-        self.oldclk = clk
+        self.word_items.clear()
 
-        # Sample data on rising/falling clock edge (depends on config).
-        c = self.options['clock_edge']
-        if c == 'rising' and clk == 0: # Sample on rising clock edge.
+    def queue_word(self, now, item, bus_width):
+        wordsize = self.options['wordsize']
+        if not wordsize:
             return
-        elif c == 'falling' and clk == 1: # Sample on falling clock edge.
-            return
-
-        # Found the correct clock edge, now get the bits.
-        self.handle_bits(datapins)
 
-    def decode(self, ss, es, data):
-        for (self.samplenum, pins) in data:
-
-            # Ignore identical samples early on (for performance reasons).
-            if self.oldpins == pins:
+        # Terminate a previously seen item of a word first. Emit the
+        # word's annotation when the last item's end was seen.
+        if self.word_items:
+            ss, _, data = self.word_items[-1]
+            es = now
+            self.word_items[-1] = (ss, es, data)
+            if len(self.word_items) == wordsize:
+                self.flush_word(bus_width)
+
+        # Start tracking the currently seen item (yet unknown end time).
+        if item is not None:
+            pend = (now, None, item)
+            self.word_items.append(pend)
+
+    def handle_bits(self, now, item, bus_width):
+
+        # Optionally flush a previously started item.
+        if self.pend_item:
+            ss, _, data = self.pend_item
+            self.pend_item = None
+            es = now
+            txts = [self.fmt_item.format(data)]
+            self.putg(ss, es, Ann.ITEM, txts)
+            self.putpy(ss, es, 'ITEM', (data, bus_width))
+            self.putbin(ss, es, 0, data.to_bytes(1, byteorder='big'))
+
+        # Optionally queue the currently seen item.
+        if item is not None:
+            self.pend_item = (now, None, item)
+
+        # Pass the current item to the word accumulation logic.
+        self.queue_word(now, item, bus_width)
+
+    def decode(self):
+        # Determine which (optional) channels have input data. Insist in
+        # a non-empty input data set. Cope with sparse connection maps.
+        # Store enough state to later "compress" sampled input data.
+        data_indices = [
+            idx if self.has_channel(idx) else None
+            for idx in range(Pin.DATA_0, Pin.DATA_N)
+        ]
+        has_data = [idx for idx in data_indices if idx is not None]
+        if not has_data:
+            raise ChannelError('Need at least one data channel.')
+        max_connected = max(has_data)
+
+        # Pre-determine which input data to strip off, the width of
+        # individual items and multiplexed words, as well as format
+        # strings here. This simplifies call sites which run in tight
+        # loops later.
+        upper_data_bound = max_connected + 1
+        num_item_bits = upper_data_bound - Pin.DATA_0
+        num_word_items = self.options['wordsize']
+        num_word_bits = num_item_bits * num_word_items
+        num_digits = (num_item_bits + 4 - 1) // 4
+        self.fmt_item = "{{:0{}x}}".format(num_digits)
+        num_digits = (num_word_bits + 4 - 1) // 4
+        self.fmt_word = "{{:0{}x}}".format(num_digits)
+
+        # Determine .wait() conditions, depending on the presence of a
+        # clock signal. Either inspect samples on the configured edge of
+        # the clock, or inspect samples upon ANY edge of ANY of the pins
+        # which provide input data.
+        conds = []
+        cond_idx_clock = None
+        cond_idx_data_0 = None
+        cond_idx_data_N = None
+        cond_idx_reset = None
+        has_clock = self.has_channel(Pin.CLOCK)
+        if has_clock:
+            cond_idx_clock = len(conds)
+            edge = {
+                'rising': 'r',
+                'falling': 'f',
+                'either': 'e',
+            }.get(self.options['clock_edge'])
+            conds.append({Pin.CLOCK: edge})
+        else:
+            cond_idx_data_0 = len(conds)
+            conds.extend([{idx: 'e'} for idx in has_data])
+            cond_idx_data_N = len(conds)
+        has_reset = self.has_channel(Pin.RESET)
+        if has_reset:
+            cond_idx_reset = len(conds)
+            conds.append({Pin.RESET: 'e'})
+            reset_active = {
+                'low-active': 0,
+                'high-active': 1,
+            }.get(self.options['reset_polarity'])
+
+        # Keep processing the input stream. Assume "always zero" for
+        # not-connected input lines. Pass data bits (all inputs except
+        # clock and reset) to the handle_bits() method. Handle reset
+        # edges first and data changes then, within the same iteration.
+        # This results in robust operation for low-oversampled input.
+        in_reset = False
+        while True:
+            try:
+                pins = self.wait(conds)
+            except EOFError as e:
+                break
+            clock_edge = cond_idx_clock is not None and self.matched[cond_idx_clock]
+            data_edge = cond_idx_data_0 is not None and [idx for idx in range(cond_idx_data_0, cond_idx_data_N) if self.matched[idx]]
+            reset_edge = cond_idx_reset is not None and self.matched[cond_idx_reset]
+
+            if reset_edge:
+                in_reset = pins[Pin.RESET] == reset_active
+                if in_reset:
+                    self.handle_bits(self.samplenum, None, num_item_bits)
+                    self.flush_word(num_item_bits)
+            if in_reset:
                 continue
-            self.oldpins = pins
 
-            # State machine.
-            if self.state == 'IDLE':
-                self.find_clk_edge(pins[0], pins[1:])
-            else:
-                raise Exception('Invalid state: %s' % self.state)
+            if clock_edge or data_edge:
+                data_bits = [0 if idx is None else pins[idx] for idx in data_indices]
+                data_bits = data_bits[:num_item_bits]
+                item = bitpack(data_bits)
+                self.handle_bits(self.samplenum, item, num_item_bits)
 
+        self.handle_bits(self.samplenum, None, num_item_bits)
+        # TODO Determine whether a WARN annotation needs to get emitted.
+        # The decoder has not seen the end of the last accumulated item.
+        # Instead it just ran out of input data.