From: Chris Date: Mon, 12 Sep 2016 01:14:44 +0000 (-0700) Subject: midi: Add support for complex MIDI message formats (inter-related). X-Git-Tag: libsigrokdecode-0.5.0~144 X-Git-Url: https://sigrok.org/gitweb/?p=libsigrokdecode.git;a=commitdiff_plain;h=6aa03e3aeb9054122e52016c3344927d5c0f02de midi: Add support for complex MIDI message formats (inter-related). - Handle "Running Status" where status byte can be omitted - SysEx message can be terminated by status byte, not just EOX - Handle SysReal messages that interrupt in-progress message transmission - Identify and print garbage / truncated data - Add 2 new annotations: text-sysreal-verbose and text-error --- diff --git a/decoders/midi/pd.py b/decoders/midi/pd.py index 989ada7..70c7120 100644 --- a/decoders/midi/pd.py +++ b/decoders/midi/pd.py @@ -35,11 +35,19 @@ class Decoder(srd.Decoder): outputs = ['midi'] annotations = ( ('text-verbose', 'Human-readable text (verbose)'), + ('text-sysreal-verbose', 'Human-readable SysReal text (verbose)'), + ('text-error', 'Human-readable Error text'), + ) + annotation_rows = ( + ('normal', 'Normal', (0, 2)), + ('sys-real', 'SysReal', (1,)), ) def __init__(self): - self.cmd = [] self.state = 'IDLE' + self.status_byte = 0 + self.explicit_status_byte = False + self.cmd = [] self.ss = None self.es = None self.ss_block = None @@ -57,14 +65,33 @@ class Decoder(srd.Decoder): else: return 'assuming ' + percussion_notes.get(note, 'undefined') - def handle_channel_msg_0x80(self): + def check_for_garbage_flush(self, is_flushed): + if is_flushed == True: + if self.explicit_status_byte == True: + self.cmd.insert(0, self.status_byte) + self.handle_garbage_msg(None) + + def soft_clear_status_byte(self): + self.explicit_status_byte = False + + def hard_clear_status_byte(self): + self.status_byte = 0 + self.explicit_status_byte = False + + def set_status_byte(self, newbyte): + self.status_byte = newbyte + self.explicit_status_byte = True + + def handle_channel_msg_0x80(self, is_flushed): # Note off: 8n kk vv # n = channel, kk = note, vv = velocity c = self.cmd - if len(c) < 3: + if len(c) < 2: + self.check_for_garbage_flush(is_flushed) return self.es_block = self.es - msg, chan, note, velocity = c[0] & 0xf0, (c[0] & 0x0f) + 1, c[1], c[2] + msg, chan = self.status_byte & 0xf0, (self.status_byte & 0x0f) + 1 + note, velocity = c[0], c[1] note_name = self.get_note_name(chan, note) self.putx([0, ['Channel %d: %s (note = %d \'%s\', velocity = %d)' % \ (chan, status_bytes[msg][0], note, note_name, velocity), @@ -73,16 +100,19 @@ class Decoder(srd.Decoder): '%d: %s %d, vel %d' % \ (chan, status_bytes[msg][2], note, velocity)]]) self.cmd, self.state = [], 'IDLE' + self.soft_clear_status_byte() - def handle_channel_msg_0x90(self): + def handle_channel_msg_0x90(self, is_flushed): # Note on: 9n kk vv # n = channel, kk = note, vv = velocity # If velocity == 0 that actually means 'note off', though. c = self.cmd - if len(c) < 3: + if len(c) < 2: + self.check_for_garbage_flush(is_flushed) return self.es_block = self.es - msg, chan, note, velocity = c[0] & 0xf0, (c[0] & 0x0f) + 1, c[1], c[2] + msg, chan = self.status_byte & 0xf0, (self.status_byte & 0x0f) + 1 + note, velocity = c[0], c[1] s = status_bytes[0x80] if (velocity == 0) else status_bytes[msg] note_name = self.get_note_name(chan, note) self.putx([0, ['Channel %d: %s (note = %d \'%s\', velocity = %d)' % \ @@ -92,15 +122,18 @@ class Decoder(srd.Decoder): '%d: %s %d, vel %d' % \ (chan, s[2], note, velocity)]]) self.cmd, self.state = [], 'IDLE' + self.soft_clear_status_byte() - def handle_channel_msg_0xa0(self): + def handle_channel_msg_0xa0(self, is_flushed): # Polyphonic key pressure / aftertouch: An kk vv # n = channel, kk = polyphonic key pressure, vv = pressure value c = self.cmd - if len(c) < 3: + if len(c) < 2: + self.check_for_garbage_flush(is_flushed) return self.es_block = self.es - msg, chan, note, pressure = c[0] & 0xf0, (c[0] & 0x0f) + 1, c[1], c[2] + msg, chan = self.status_byte & 0xf0, (self.status_byte & 0x0f) + 1 + note, pressure = c[0], c[1] note_name = self.get_note_name(chan, note) self.putx([0, ['Channel %d: %s of %d for note = %d \'%s\'' % \ (chan, status_bytes[msg][0], pressure, note, note_name), @@ -109,12 +142,14 @@ class Decoder(srd.Decoder): '%d: %s %d, N %d' % \ (chan, status_bytes[msg][2], pressure, note)]]) self.cmd, self.state = [], 'IDLE' + self.soft_clear_status_byte() def handle_controller_0x44(self): # Legato footswitch: Bn 44 vv # n = channel, vv = value (<= 0x3f: normal, > 0x3f: legato) c = self.cmd - msg, chan, vv = c[0] & 0xf0, (c[0] & 0x0f) + 1, c[2] + msg, chan = self.status_byte & 0xf0, (self.status_byte & 0x0f) + 1 + vv = c[1] t = ('normal', 'no') if vv <= 0x3f else ('legato', 'yes') self.putx([0, ['Channel %d: %s \'%s\' = %s' % \ (chan, status_bytes[msg][0], @@ -130,7 +165,8 @@ class Decoder(srd.Decoder): # Portamento control (PTC): Bn 54 kk # n = channel, kk = source note for pitch reference c = self.cmd - msg, chan, kk = c[0] & 0xf0, (c[0] & 0x0f) + 1, c[2] + msg, chan = self.status_byte & 0xf0, (self.status_byte & 0x0f) + 1 + kk = c[1] kk_name = self.get_note_name(chan, kk) self.putx([0, ['Channel %d: %s \'%s\' (source note = %d / %s)' % \ (chan, status_bytes[msg][0], @@ -144,7 +180,8 @@ class Decoder(srd.Decoder): def handle_controller_generic(self): c = self.cmd - msg, chan, fn, param = c[0] & 0xf0, (c[0] & 0x0f) + 1, c[1], c[2] + msg, chan = self.status_byte & 0xf0, (self.status_byte & 0x0f) + 1 + fn, param = c[0], c[1] default_name = 'undefined' ctrl_fn = control_functions.get(fn, default_name) if ctrl_fn == default_name: @@ -160,7 +197,8 @@ class Decoder(srd.Decoder): # Channel Mode: Bn mm vv # n = channel, mm = mode number (120 - 127), vv = value c = self.cmd - msg, chan, mm, vv = c[0] & 0xf0, (c[0] & 0x0f) + 1, c[1], c[2] + msg, chan = self.status_byte & 0xf0, (self.status_byte & 0x0f) + 1 + mm, vv = c[0], c[1] mode_fn = control_functions.get(mm, ('undefined', 'undef', 'undef')) # Decode the value based on the mode number. vv_string = ('', '') @@ -188,31 +226,35 @@ class Decoder(srd.Decoder): '%d: %s \'%s\' %s' % \ (chan, status_bytes[msg][2], mode_fn[2], vv_string[1])]]) self.cmd, self.state = [], 'IDLE' + self.soft_clear_status_byte() - def handle_channel_msg_0xb0(self): + def handle_channel_msg_0xb0(self, is_flushed): # Control change (or channel mode messages): Bn cc vv # n = channel, cc = control number (0 - 119), vv = control value c = self.cmd - if len(c) < 3: + if len(c) < 2: + self.check_for_garbage_flush(is_flushed) return self.es_block = self.es - if c[1] in range(0x78, 0x7f + 1): + if c[0] in range(0x78, 0x7f + 1): self.handle_channel_mode() return - handle_ctrl = getattr(self, 'handle_controller_0x%02x' % c[1], + handle_ctrl = getattr(self, 'handle_controller_0x%02x' % c[0], self.handle_controller_generic) handle_ctrl() self.cmd, self.state = [], 'IDLE' + self.soft_clear_status_byte() - def handle_channel_msg_0xc0(self): + def handle_channel_msg_0xc0(self, is_flushed): # Program change: Cn pp # n = channel, pp = program number (0 - 127) c = self.cmd - if len(c) < 2: + if len(c) < 1: + self.check_for_garbage_flush(is_flushed) return self.es_block = self.es - msg, chan, pp = self.cmd[0] & 0xf0, (self.cmd[0] & 0x0f) + 1, \ - self.cmd[1] + 1 + msg, chan = self.status_byte & 0xf0, (self.status_byte & 0x0f) + 1 + pp = self.cmd[0] + 1 change_type = 'instrument' name = '' if chan != 10: # channel != percussion @@ -227,30 +269,34 @@ class Decoder(srd.Decoder): '%d: %s %d' % \ (chan, status_bytes[msg][2], pp)]]) self.cmd, self.state = [], 'IDLE' + self.soft_clear_status_byte() - def handle_channel_msg_0xd0(self): + def handle_channel_msg_0xd0(self, is_flushed): # Channel pressure / aftertouch: Dn vv # n = channel, vv = pressure value c = self.cmd - if len(c) < 2: + if len(c) < 1: + self.check_for_garbage_flush(is_flushed) return self.es_block = self.es - msg, chan, vv = self.cmd[0] & 0xf0, (self.cmd[0] & 0x0f) + 1, \ - self.cmd[1] + msg, chan = self.status_byte & 0xf0, (self.status_byte & 0x0f) + 1 + vv = self.cmd[0] self.putx([0, ['Channel %d: %s %d' % (chan, status_bytes[msg][0], vv), 'ch %d: %s %d' % (chan, status_bytes[msg][1], vv), '%d: %s %d' % (chan, status_bytes[msg][2], vv)]]) self.cmd, self.state = [], 'IDLE' + self.soft_clear_status_byte() - def handle_channel_msg_0xe0(self): + def handle_channel_msg_0xe0(self, is_flushed): # Pitch bend change: En ll mm # n = channel, ll = pitch bend change LSB, mm = pitch bend change MSB c = self.cmd - if len(c) < 3: + if len(c) < 2: + self.check_for_garbage_flush(is_flushed) return self.es_block = self.es - msg, chan, ll, mm = self.cmd[0] & 0xf0, (self.cmd[0] & 0x0f) + 1, \ - self.cmd[1], self.cmd[2] + msg, chan = self.status_byte & 0xf0, (self.status_byte & 0x0f) + 1 + ll, mm = self.cmd[0], self.cmd[1] decimal = (mm << 7) + ll self.putx([0, ['Channel %d: %s 0x%02x 0x%02x (%d)' % \ (chan, status_bytes[msg][0], ll, mm, decimal), @@ -259,33 +305,44 @@ class Decoder(srd.Decoder): '%d: %s (%d)' % \ (chan, status_bytes[msg][2], decimal)]]) self.cmd, self.state = [], 'IDLE' + self.soft_clear_status_byte() - def handle_channel_msg_generic(self): + def handle_channel_msg_generic(self, is_flushed): # TODO: It should not be possible to hit this code. # It currently can not be unit tested. - msg_type = self.cmd[0] & 0xf0 + msg_type = self.status_byte & 0xf0 self.es_block = self.es - self.putx([0, ['Unknown channel message type: 0x%02x' % msg_type]]) + self.putx([2, ['Unknown channel message type: 0x%02x' % msg_type]]) self.cmd, self.state = [], 'IDLE' + self.soft_clear_status_byte() def handle_channel_msg(self, newbyte): - self.cmd.append(newbyte) - msg_type = self.cmd[0] & 0xf0 + if newbyte != None: + if newbyte >= 0x80: + self.set_status_byte(newbyte) + else: + self.cmd.append(newbyte) + msg_type = self.status_byte & 0xf0 handle_msg = getattr(self, 'handle_channel_msg_0x%02x' % msg_type, self.handle_channel_msg_generic) - handle_msg() + handle_msg(newbyte == None) def handle_sysex_msg(self, newbyte): # SysEx message: 1 status byte, 1-3 manuf. bytes, x data bytes, EOX byte - self.cmd.append(newbyte) - if newbyte != 0xf7: # EOX + # + # SysEx message are variable length, can be terminated by EOX byte or + # by any non-SysReal status byte, and it clears self.status_byte. + # Note: all System message code doesn't utilize self.status_byte + self.hard_clear_status_byte() + if newbyte != 0xf7 and newbyte != None: # EOX + self.cmd.append(newbyte) return self.es_block = self.es # Note: Unlike other methods, this code pops bytes out of self.cmd # to isolate the data. - msg, eox = self.cmd.pop(0), self.cmd.pop() + msg = self.cmd.pop(0) if len(self.cmd) < 1: - self.putx([0, ['%s: truncated manufacturer code (<1 bytes)' % \ + self.putx([2, ['%s: truncated manufacturer code (<1 bytes)' % \ status_bytes[msg][0], '%s: truncated manufacturer (<1 bytes)' % \ status_bytes[msg][1], @@ -297,7 +354,7 @@ class Decoder(srd.Decoder): manu = (m1,) if m1 == 0x00: # If byte == 0, then 2 more manufacturer bytes follow. if len(self.cmd) < 2: - self.putx([0, ['%s: truncated manufacturer code (<3 bytes)' % \ + self.putx([2, ['%s: truncated manufacturer code (<3 bytes)' % \ status_bytes[msg][0], '%s: truncated manufacturer (<3 bytes)' % \ status_bytes[msg][1], @@ -341,11 +398,16 @@ class Decoder(srd.Decoder): # MIDI time code quarter frame: F1 nd # n = message type # d = values + # + # Note: all System message code don't utilize self.status_byte, + # and System Exclusive and System Common clear it. c = self.cmd if len(c) < 2: + if newbyte == None: + self.handle_garbage_msg(None) return - msg = self.cmd[0] - nn, dd = (self.cmd[1] & 0x70) >> 4, self.cmd[1] & 0x0f + msg = c[0] + nn, dd = (c[1] & 0x70) >> 4, c[1] & 0x0f group = ('System Common', 'SysCom', 'SC') self.es_block = self.es if nn != 7: # If message type does not contain SMPTE type. @@ -380,9 +442,12 @@ class Decoder(srd.Decoder): # # Note: While the MIDI lists 0xf7 as a "system common" message, it # is actually only used with SysEx messages so it is processed there. - self.cmd.append(newbyte) - msg = self.cmd[0] + # Note 2: all System message code doesn't utilize self.status_byte + self.hard_clear_status_byte() + if newbyte != None: + self.cmd.append(newbyte) c = self.cmd + msg = c[0] group = ('System Common', 'SysCom', 'SC') if msg == 0xf1: # MIDI time code quarter frame @@ -392,8 +457,10 @@ class Decoder(srd.Decoder): # Song position pointer: F2 ll mm # ll = LSB position, mm = MSB position if len(c) < 3: + if newbyte == None: + self.handle_garbage_msg(None) return - ll, mm = self.cmd[1], self.cmd[2] + ll, mm = c[1], c[2] decimal = (mm << 7) + ll self.es_block = self.es self.putx([0, ['%s: %s 0x%02x 0x%02x (%d)' % \ @@ -406,8 +473,10 @@ class Decoder(srd.Decoder): # Song select: F3 ss # ss = song selection number if len(c) < 2: + if newbyte == None: + self.handle_garbage_msg(None) return - ss = self.cmd[1] + ss = c[1] self.es_block = self.es self.putx([0, ['%s: %s number %d' % \ (group[0], status_bytes[msg][0], ss), @@ -426,22 +495,91 @@ class Decoder(srd.Decoder): def handle_sysrealtime_msg(self, newbyte): # System realtime message: 0b11111ttt (t = message type) + # + # Important: these messages are handled different from all others + # because they are allowed to temporarily interrupt other messages. + # The interrupted messages resume after the realtime message is done. + # Thus, they mostly leave 'self' the way it was found. + # Note: all System message code doesn't utilize self.status_byte + old_ss_block = self.ss_block + old_es_block = self.es_block + self.ss_block = self.ss self.es_block = self.es group = ('System Realtime', 'SysReal', 'SR') - self.putx([0, ['%s: %s' % (group[0], status_bytes[newbyte][0]), + self.putx([1, ['%s: %s' % (group[0], status_bytes[newbyte][0]), '%s: %s' % (group[1], status_bytes[newbyte][1]), '%s: %s' % (group[2], status_bytes[newbyte][2])]]) + self.ss_block = old_ss_block + self.es_block = old_es_block + # Deliberately not resetting self.cmd or self.state + + def handle_garbage_msg(self, newbyte): + # Handles messages that are either not handled or are corrupt + self.es_block = self.es + if newbyte != None: + self.cmd.append(newbyte) + return + payload = '' + max_bytes = 16 # Put a limit on the length on the hex dump + for index in range( 0, len(self.cmd) ): + if index == max_bytes: + payload = payload + ' ...' + break + if index == 0: + payload = '0x%02x' % self.cmd[index] + else: + payload = payload + ' 0x%02x' % self.cmd[index] + self.putx([2, ['UNHANDLED DATA: %s' % payload, + 'UNHANDLED', + '???', + '?']]) self.cmd, self.state = [], 'IDLE' + self.hard_clear_status_byte() + + def handle_state(self, state, newbyte): + # 'newbyte' can either be: + # 1. Value between 0x00-0xff, deal with the byte normally + # 2. Value of 'None' which means flush any buffered data. + if state == 'HANDLE CHANNEL MSG': + self.handle_channel_msg(newbyte) + elif state == 'HANDLE SYSEX MSG': + self.handle_sysex_msg(newbyte) + elif state == 'HANDLE SYSCOMMON MSG': + self.handle_syscommon_msg(newbyte) + elif state == 'HANDLE SYSREALTIME MSG': + self.handle_sysrealtime_msg(newbyte) + elif state == 'BUFFER GARBAGE MSG': + self.handle_garbage_msg(newbyte) + + def get_next_state(self, newbyte): + # 'newbyte' must be a valid byte between 0x00 and 0xff + # + # Try to determine the state based off of 'newbyte' parameter + # ... if it is >= 0x80. + if newbyte in range(0x80, 0xef + 1): + return 'HANDLE CHANNEL MSG' + if newbyte == 0xf0: + return 'HANDLE SYSEX MSG' + if newbyte in range(0xf1, 0xf7): + return'HANDLE SYSCOMMON MSG' + if newbyte in range(0xf8, 0xff + 1): + return 'HANDLE SYSREALTIME MSG' + # Passing 0xf7 is an error; messages don't start with 0xf7 + if newbyte == 0xf7: + return 'BUFFER GARBAGE MSG' + # Next, base the state off of self.status_byte + if self.status_byte < 0x80: + return 'BUFFER GARBAGE MSG' + return self.get_next_state(self.status_byte) def decode(self, ss, es, data): ptype, rxtx, pdata = data + state = 'IDLE' # For now, ignore all UART packets except the actual data packets. if ptype != 'DATA': return - self.ss, self.es = ss, es - # We're only interested in the byte value (not individual bits). pdata = pdata[0] @@ -451,28 +589,36 @@ class Decoder(srd.Decoder): # - Real-time system messages: always 1 byte. # - SysEx messages: 1 status byte, n data bytes, EOX byte. + # Aspects of the MIDI protocol that complicate decoding: + # - MIDI System Realtime messages can briefly interrupt other + # message already in progress. + # - "Running Status" allows for omitting the status byte in most + # scenarios if sequential messages have the same status byte. + # - System Exclusive (SysEx) messages can be terminated by ANY + # status byte (not limited to EOX byte). + # State machine. - if self.state == 'IDLE': - # Wait until we see a status byte (bit 7 must be set). - if pdata < 0x80: - return # TODO: How to handle? Ignore? + if pdata >= 0x80 and pdata != 0xf7: + state = self.get_next_state(pdata) + if state != 'HANDLE SYSREALTIME MSG' and self.state != 'IDLE': + # Flush the previous data since a new message is starting + self.handle_state(self.state, None) + # Cache ss and es -after- flushing previous data + self.ss, self.es = ss, es # This is a status byte, remember the start sample. - self.ss_block = ss - if pdata in range(0x80, 0xef + 1): - self.state = 'HANDLE CHANNEL MSG' - elif pdata == 0xf0 or pdata == 0xf7: - self.state = 'HANDLE SYSEX MSG' - elif pdata in range(0xf1, 0xf7): - self.state = 'HANDLE SYSCOMMON MSG' - elif pdata in range(0xf8, 0xff + 1): - self.state = 'HANDLE SYSREALTIME MSG' + if state != 'HANDLE SYSREALTIME MSG': self.ss_block = ss + elif self.state == 'IDLE' or self.state == 'BUFFER GARBAGE MSG': + # Deal with "running status" or that we're buffering garbage + self.ss, self.es = ss, es + if self.state == 'IDLE': self.ss_block = ss + state = self.get_next_state(pdata) + else: + self.ss, self.es = ss, es + state = self.state # Yes, this is intentionally _not_ an 'elif' here. - if self.state == 'HANDLE CHANNEL MSG': - self.handle_channel_msg(pdata) - elif self.state == 'HANDLE SYSEX MSG': - self.handle_sysex_msg(pdata) - elif self.state == 'HANDLE SYSCOMMON MSG': - self.handle_syscommon_msg(pdata) - elif self.state == 'HANDLE SYSREALTIME MSG': - self.handle_sysrealtime_msg(pdata) + if state != 'HANDLE SYSREALTIME MSG': + self.state = state + if state == 'BUFFER GARBAGE MSG': + self.status_byte = 0 + self.handle_state(state, pdata)