]> sigrok.org Git - libsigrokdecode.git/blobdiff - decoders/usb_signalling/pd.py
decoders: Don't set self.samplenum.
[libsigrokdecode.git] / decoders / usb_signalling / pd.py
index f514c480e32079f0f3e648d92ae06dfb497a48a4..626dcd0a3d00c40809227802d7f4f61fb3f813c1 100644 (file)
@@ -15,8 +15,7 @@
 ## GNU General Public License for more details.
 ##
 ## You should have received a copy of the GNU General Public License
 ## GNU General Public License for more details.
 ##
 ## You should have received a copy of the GNU General Public License
-## along with this program; if not, write to the Free Software
-## Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
+## along with this program; if not, see <http://www.gnu.org/licenses/>.
 ##
 
 import sigrokdecode as srd
 ##
 
 import sigrokdecode as srd
@@ -33,12 +32,15 @@ Packet:
  - 'BIT', <bit>
  - 'STUFF BIT', None
  - 'EOP', None
  - 'BIT', <bit>
  - 'STUFF BIT', None
  - 'EOP', None
+ - 'ERR', None
+ - 'KEEP ALIVE', None
+ - 'RESET', None
 
 <sym>:
  - 'J', 'K', 'SE0', or 'SE1'
 
 <bit>:
 
 <sym>:
  - 'J', 'K', 'SE0', or 'SE1'
 
 <bit>:
- - 0 or 1
+ - '0' or '1'
  - Note: Symbols like SE0, SE1, and the J that's part of EOP don't yield 'BIT'.
 '''
 
  - Note: Symbols like SE0, SE1, and the J that's part of EOP don't yield 'BIT'.
 '''
 
@@ -59,55 +61,96 @@ symbols = {
         (0, 1): 'K',
         (1, 1): 'SE1',
     },
         (0, 1): 'K',
         (1, 1): 'SE1',
     },
+    'automatic': {
+        # (<dp>, <dm>): <symbol/state>
+        (0, 0): 'SE0',
+        (1, 0): 'FS_J',
+        (0, 1): 'LS_J',
+        (1, 1): 'SE1',
+    },
+    # After a PREamble PID, the bus segment between Host and Hub uses LS
+    # signalling rate and FS signalling polarity (USB 2.0 spec, 11.8.4: "For
+    # both upstream and downstream low-speed data, the hub is responsible for
+    # inverting the polarity of the data before transmitting to/from a
+    # low-speed port.").
+    'low-speed-rp': {
+        # (<dp>, <dm>): <symbol/state>
+        (0, 0): 'SE0',
+        (1, 0): 'J',
+        (0, 1): 'K',
+        (1, 1): 'SE1',
+    },
 }
 
 bitrates = {
 }
 
 bitrates = {
-    'low-speed': 1500000,   # 1.5Mb/s (+/- 1.5%)
+    'low-speed': 1500000, # 1.5Mb/s (+/- 1.5%)
+    'low-speed-rp': 1500000, # 1.5Mb/s (+/- 1.5%)
     'full-speed': 12000000, # 12Mb/s (+/- 0.25%)
     'full-speed': 12000000, # 12Mb/s (+/- 0.25%)
+    'automatic': None
 }
 
 }
 
+sym_annotation = {
+    'J': [0, ['J']],
+    'K': [1, ['K']],
+    'SE0': [2, ['SE0', '0']],
+    'SE1': [3, ['SE1', '1']],
+}
+
+class SamplerateError(Exception):
+    pass
+
 class Decoder(srd.Decoder):
 class Decoder(srd.Decoder):
-    api_version = 1
+    api_version = 3
     id = 'usb_signalling'
     name = 'USB signalling'
     longname = 'Universal Serial Bus (LS/FS) signalling'
     id = 'usb_signalling'
     name = 'USB signalling'
     longname = 'Universal Serial Bus (LS/FS) signalling'
-    desc = 'USB (low-speed and full-speed) signalling protocol.'
+    desc = 'USB (low-speed/full-speed) signalling protocol.'
     license = 'gplv2+'
     inputs = ['logic']
     outputs = ['usb_signalling']
     license = 'gplv2+'
     inputs = ['logic']
     outputs = ['usb_signalling']
-    probes = (
+    tags = ['PC']
+    channels = (
         {'id': 'dp', 'name': 'D+', 'desc': 'USB D+ signal'},
         {'id': 'dm', 'name': 'D-', 'desc': 'USB D- signal'},
     )
     options = (
         {'id': 'signalling', 'desc': 'Signalling',
         {'id': 'dp', 'name': 'D+', 'desc': 'USB D+ signal'},
         {'id': 'dm', 'name': 'D-', 'desc': 'USB D- signal'},
     )
     options = (
         {'id': 'signalling', 'desc': 'Signalling',
-            'default': 'full-speed', 'values': ('full-speed', 'low-speed')},
+            'default': 'automatic', 'values': ('automatic', 'full-speed', 'low-speed')},
     )
     annotations = (
     )
     annotations = (
-        ('sym', 'Symbol'),
+        ('sym-j', 'J symbol'),
+        ('sym-k', 'K symbol'),
+        ('sym-se0', 'SE0 symbol'),
+        ('sym-se1', 'SE1 symbol'),
         ('sop', 'Start of packet (SOP)'),
         ('eop', 'End of packet (EOP)'),
         ('bit', 'Bit'),
         ('stuffbit', 'Stuff bit'),
         ('sop', 'Start of packet (SOP)'),
         ('eop', 'End of packet (EOP)'),
         ('bit', 'Bit'),
         ('stuffbit', 'Stuff bit'),
+        ('error', 'Error'),
+        ('keep-alive', 'Low-speed keep-alive'),
+        ('reset', 'Reset'),
     )
     annotation_rows = (
     )
     annotation_rows = (
-        ('bits', 'Bits', (1, 2, 3, 4)),
-        ('symbols', 'Symbols', (0,)),
+        ('bits', 'Bits', (4, 5, 6, 7, 8, 9, 10)),
+        ('symbols', 'Symbols', (0, 1, 2, 3)),
     )
 
     def __init__(self):
     )
 
     def __init__(self):
+        self.reset()
+
+    def reset(self):
         self.samplerate = None
         self.oldsym = 'J' # The "idle" state is J.
         self.samplerate = None
         self.oldsym = 'J' # The "idle" state is J.
-        self.ss_sop = None
         self.ss_block = None
         self.ss_block = None
-        self.samplenum = 0
-        self.syms = []
         self.bitrate = None
         self.bitwidth = None
         self.bitrate = None
         self.bitwidth = None
-        self.bitnum = 0
+        self.samplepos = None
         self.samplenum_target = None
         self.samplenum_target = None
-        self.oldpins = None
+        self.samplenum_edge = None
+        self.samplenum_lastedge = 0
+        self.edgepins = None
         self.consecutive_ones = 0
         self.consecutive_ones = 0
+        self.bits = None
         self.state = 'IDLE'
 
     def start(self):
         self.state = 'IDLE'
 
     def start(self):
@@ -117,60 +160,72 @@ class Decoder(srd.Decoder):
     def metadata(self, key, value):
         if key == srd.SRD_CONF_SAMPLERATE:
             self.samplerate = value
     def metadata(self, key, value):
         if key == srd.SRD_CONF_SAMPLERATE:
             self.samplerate = value
-            self.bitrate = bitrates[self.options['signalling']]
-            self.bitwidth = float(self.samplerate) / float(self.bitrate)
-            self.halfbit = int(self.bitwidth / 2)
+            self.signalling = self.options['signalling']
+            if self.signalling != 'automatic':
+                self.update_bitrate()
+
+    def update_bitrate(self):
+        self.bitrate = bitrates[self.signalling]
+        self.bitwidth = float(self.samplerate) / float(self.bitrate)
 
     def putpx(self, data):
 
     def putpx(self, data):
-        self.put(self.samplenum, self.samplenum, self.out_python, data)
+        s = self.samplenum_edge
+        self.put(s, s, self.out_python, data)
 
     def putx(self, data):
 
     def putx(self, data):
-        self.put(self.samplenum, self.samplenum, self.out_ann, data)
+        s = self.samplenum_edge
+        self.put(s, s, self.out_ann, data)
 
     def putpm(self, data):
 
     def putpm(self, data):
-        s, h = self.samplenum, self.halfbit
-        self.put(self.ss_block - h, s + h, self.out_python, data)
+        e = self.samplenum_edge
+        self.put(self.ss_block, e, self.out_python, data)
 
     def putm(self, data):
 
     def putm(self, data):
-        s, h = self.samplenum, self.halfbit
-        self.put(self.ss_block - h, s + h, self.out_ann, data)
+        e = self.samplenum_edge
+        self.put(self.ss_block, e, self.out_ann, data)
 
     def putpb(self, data):
 
     def putpb(self, data):
-        s, h = self.samplenum, self.halfbit
-        self.put(s - h, s + h, self.out_python, data)
+        s, e = self.samplenum_lastedge, self.samplenum_edge
+        self.put(s, e, self.out_python, data)
 
     def putb(self, data):
 
     def putb(self, data):
-        s, h = self.samplenum, self.halfbit
-        self.put(s - h, s + h, self.out_ann, data)
+        s, e = self.samplenum_lastedge, self.samplenum_edge
+        self.put(s, e, self.out_ann, data)
 
     def set_new_target_samplenum(self):
 
     def set_new_target_samplenum(self):
-        bitpos = self.ss_sop + (self.bitwidth / 2)
-        bitpos += self.bitnum * self.bitwidth
-        self.samplenum_target = int(bitpos)
+        self.samplepos += self.bitwidth
+        self.samplenum_target = int(self.samplepos)
+        self.samplenum_lastedge = self.samplenum_edge
+        self.samplenum_edge = int(self.samplepos - (self.bitwidth / 2))
 
     def wait_for_sop(self, sym):
         # Wait for a Start of Packet (SOP), i.e. a J->K symbol change.
 
     def wait_for_sop(self, sym):
         # Wait for a Start of Packet (SOP), i.e. a J->K symbol change.
-        if sym != 'K':
-            self.oldsym = sym
+        if sym != 'K' or self.oldsym != 'J':
             return
             return
-        self.ss_sop = self.samplenum
+        self.consecutive_ones = 0
+        self.bits = ''
+        self.update_bitrate()
+        self.samplepos = self.samplenum - (self.bitwidth / 2) + 0.5
         self.set_new_target_samplenum()
         self.putpx(['SOP', None])
         self.set_new_target_samplenum()
         self.putpx(['SOP', None])
-        self.putx([1, ['SOP', 'S']])
+        self.putx([4, ['SOP', 'S']])
         self.state = 'GET BIT'
 
         self.state = 'GET BIT'
 
-    def handle_bit(self, sym, b):
-        if self.consecutive_ones == 6 and b == '0':
-            # Stuff bit.
-            self.putpb(['STUFF BIT', None])
-            self.putb([4, ['Stuff bit: %s' % b, 'SB: %s' % b, '%s' % b]])
-            self.putb([0, ['%s' % sym]])
-            self.consecutive_ones = 0
+    def handle_bit(self, b):
+        if self.consecutive_ones == 6:
+            if b == '0':
+                # Stuff bit.
+                self.putpb(['STUFF BIT', None])
+                self.putb([7, ['Stuff bit: 0', 'SB: 0', '0']])
+                self.consecutive_ones = 0
+            else:
+                self.putpb(['ERR', None])
+                self.putb([8, ['Bit stuff error', 'BS ERR', 'B']])
+                self.state = 'IDLE'
         else:
             # Normal bit (not a stuff bit).
             self.putpb(['BIT', b])
         else:
             # Normal bit (not a stuff bit).
             self.putpb(['BIT', b])
-            self.putb([3, ['%s' % b]])
-            self.putb([0, ['%s' % sym]])
+            self.putb([6, ['%s' % b]])
             if b == '1':
                 self.consecutive_ones += 1
             else:
             if b == '1':
                 self.consecutive_ones += 1
             else:
@@ -178,55 +233,113 @@ class Decoder(srd.Decoder):
 
     def get_eop(self, sym):
         # EOP: SE0 for >= 1 bittime (usually 2 bittimes), then J.
 
     def get_eop(self, sym):
         # EOP: SE0 for >= 1 bittime (usually 2 bittimes), then J.
-        self.syms.append(sym)
-        self.putpb(['SYM', sym])
-        self.putb([0, ['%s' % sym, '%s' % sym[0]]])
-        self.bitnum += 1
         self.set_new_target_samplenum()
         self.set_new_target_samplenum()
+        self.putpb(['SYM', sym])
+        self.putb(sym_annotation[sym])
         self.oldsym = sym
         self.oldsym = sym
-        if self.syms[-2:] == ['SE0', 'J']:
+        if sym == 'SE0':
+            pass
+        elif sym == 'J':
             # Got an EOP.
             self.putpm(['EOP', None])
             # Got an EOP.
             self.putpm(['EOP', None])
-            self.putm([2, ['EOP', 'E']])
-            self.bitnum, self.syms, self.state = 0, [], 'IDLE'
-            self.consecutive_ones = 0
+            self.putm([5, ['EOP', 'E']])
+            self.state = 'WAIT IDLE'
+        else:
+            self.putpm(['ERR', None])
+            self.putm([8, ['EOP Error', 'EErr', 'E']])
+            self.state = 'IDLE'
 
     def get_bit(self, sym):
 
     def get_bit(self, sym):
+        self.set_new_target_samplenum()
+        b = '0' if self.oldsym != sym else '1'
+        self.oldsym = sym
         if sym == 'SE0':
         if sym == 'SE0':
-            # Start of an EOP. Change state, run get_eop() for this bit.
+            # Start of an EOP. Change state, save edge
             self.state = 'GET EOP'
             self.state = 'GET EOP'
-            self.ss_block = self.samplenum
-            self.get_eop(sym)
-            return
-        self.syms.append(sym)
+            self.ss_block = self.samplenum_lastedge
+        else:
+            self.handle_bit(b)
         self.putpb(['SYM', sym])
         self.putpb(['SYM', sym])
-        b = '0' if self.oldsym != sym else '1'
-        self.handle_bit(sym, b)
-        self.bitnum += 1
-        self.set_new_target_samplenum()
-        self.oldsym = sym
+        self.putb(sym_annotation[sym])
+        if len(self.bits) <= 16:
+            self.bits += b
+        if len(self.bits) == 16 and self.bits == '0000000100111100':
+            # Sync and low-speed PREamble seen
+            self.putpx(['EOP', None])
+            self.state = 'IDLE'
+            self.signalling = 'low-speed-rp'
+            self.update_bitrate()
+            self.oldsym = 'J'
+        if b == '0':
+            edgesym = symbols[self.signalling][tuple(self.edgepins)]
+            if edgesym not in ('SE0', 'SE1'):
+                if edgesym == sym:
+                    self.bitwidth = self.bitwidth - (0.001 * self.bitwidth)
+                    self.samplepos = self.samplepos - (0.01 * self.bitwidth)
+                else:
+                    self.bitwidth = self.bitwidth + (0.001 * self.bitwidth)
+                    self.samplepos = self.samplepos + (0.01 * self.bitwidth)
+
+    def handle_idle(self, sym):
+        self.samplenum_edge = self.samplenum
+        se0_length = float(self.samplenum - self.samplenum_lastedge) / self.samplerate
+        if se0_length > 2.5e-6: # 2.5us
+            self.putpb(['RESET', None])
+            self.putb([10, ['Reset', 'Res', 'R']])
+            self.signalling = self.options['signalling']
+        elif se0_length > 1.2e-6 and self.signalling == 'low-speed':
+            self.putpb(['KEEP ALIVE', None])
+            self.putb([9, ['Keep-alive', 'KA', 'A']])
+
+        if sym == 'FS_J':
+            self.signalling = 'full-speed'
+            self.update_bitrate()
+        elif sym == 'LS_J':
+            self.signalling = 'low-speed'
+            self.update_bitrate()
+        self.oldsym = 'J'
+        self.state = 'IDLE'
+
+    def decode(self):
+        if not self.samplerate:
+            raise SamplerateError('Cannot decode without samplerate.')
 
 
-    def decode(self, ss, es, data):
-        if self.samplerate is None:
-            raise Exception("Cannot decode without samplerate.")
-        for (self.samplenum, pins) in data:
+        # Seed internal state from the very first sample.
+        pins = self.wait()
+        sym = symbols[self.options['signalling']][pins]
+        self.handle_idle(sym)
+
+        while True:
             # State machine.
             if self.state == 'IDLE':
             # State machine.
             if self.state == 'IDLE':
-                # Ignore identical samples early on (for performance reasons).
-                if self.oldpins == pins:
-                    continue
-                self.oldpins = pins
-                sym = symbols[self.options['signalling']][tuple(pins)]
-                self.wait_for_sop(sym)
+                # Wait for any edge on either DP and/or DM.
+                pins = self.wait([{0: 'e'}, {1: 'e'}])
+                sym = symbols[self.signalling][pins]
+                if sym == 'SE0':
+                    self.samplenum_lastedge = self.samplenum
+                    self.state = 'WAIT IDLE'
+                else:
+                    self.wait_for_sop(sym)
+                self.edgepins = pins
             elif self.state in ('GET BIT', 'GET EOP'):
                 # Wait until we're in the middle of the desired bit.
             elif self.state in ('GET BIT', 'GET EOP'):
                 # Wait until we're in the middle of the desired bit.
-                if self.samplenum < self.samplenum_target:
-                    continue
-                sym = symbols[self.options['signalling']][tuple(pins)]
+                self.edgepins = self.wait([{'skip': self.samplenum_edge - self.samplenum}])
+                pins = self.wait([{'skip': self.samplenum_target - self.samplenum}])
+
+                sym = symbols[self.signalling][pins]
                 if self.state == 'GET BIT':
                     self.get_bit(sym)
                 elif self.state == 'GET EOP':
                     self.get_eop(sym)
                 if self.state == 'GET BIT':
                     self.get_bit(sym)
                 elif self.state == 'GET EOP':
                     self.get_eop(sym)
-            else:
-                raise Exception('Invalid state: %s' % self.state)
-
+            elif self.state == 'WAIT IDLE':
+                # Skip "all-low" input. Wait for high level on either DP or DM.
+                pins = self.wait()
+                while not pins[0] and not pins[1]:
+                    pins = self.wait([{0: 'h'}, {1: 'h'}])
+                if self.samplenum - self.samplenum_lastedge > 1:
+                    sym = symbols[self.options['signalling']][pins]
+                    self.handle_idle(sym)
+                else:
+                    sym = symbols[self.signalling][pins]
+                    self.wait_for_sop(sym)
+                self.edgepins = pins