## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
-## along with this program; if not, write to the Free Software
-## Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+## along with this program; if not, see <http://www.gnu.org/licenses/>.
##
import sigrokdecode as srd
pass
class Decoder(srd.Decoder):
- api_version = 2
+ api_version = 3
id = 'spdif'
name = 'S/PDIF'
longname = 'Sony/Philips Digital Interface Format'
desc = 'Serial bus for connecting digital audio devices.'
license = 'gplv2+'
inputs = ['logic']
- outputs = ['spdif']
+ outputs = []
+ tags = ['Audio', 'PC']
channels = (
{'id': 'data', 'name': 'Data', 'desc': 'Data line'},
)
annotations = (
('bitrate', 'Bitrate / baudrate'),
('preamble', 'Preamble'),
- ('bits', 'Bits'),
- ('aux', 'Auxillary-audio-databits'),
- ('samples', 'Audio Samples'),
+ ('bit', 'Bit'),
+ ('aux', 'Auxillary-audio-databit'),
+ ('sample', 'Audio Sample'),
('validity', 'Data Valid'),
('subcode', 'Subcode data'),
('chan_stat', 'Channnel Status'),
('parity', 'Parity Bit'),
)
annotation_rows = (
- ('info', 'Info', (0, 1, 3, 5, 6, 7, 8)),
('bits', 'Bits', (2,)),
+ ('info', 'Info', (0, 1, 3, 5, 6, 7, 8)),
('samples', 'Samples', (4,)),
)
self.put(self.ss_edge, self.samplenum, self.out_ann, data)
def __init__(self):
+ self.reset()
+
+ def reset(self):
self.state = 'GET FIRST PULSE WIDTH'
- self.olddata = None
self.ss_edge = None
self.first_edge = True
+ self.samplenum_prev_edge = 0
self.pulse_width = 0
self.clocks = []
self.seen_preamble = False
self.last_preamble = 0
+ self.bitrate_message_start = 0
+ self.bitrate_message_end = 0
+ self.frame_counter = 0
+ self.frame_start = 0
+ self.frame_length = 0
+
+ self.sampleratetmp = 1
+
self.first_one = True
self.subframe = []
self.samplerate = value
def get_pulse_type(self):
- if self.range1 == 0 or self.range2 == 0:
- return -1
if self.pulse_width >= self.range2:
return 2
elif self.pulse_width >= self.range1:
if self.pulse_width != 0:
self.clocks.append(self.pulse_width)
self.state = 'GET SECOND PULSE WIDTH'
+ self.puty([2, ['Found width 1: %d' % self.pulse_width, 'W1: %d' % self.pulse_width]])
+ self.ss_edge = self.samplenum
def find_second_pulse_width(self):
if self.pulse_width > (self.clocks[0] * 1.3) or \
- self.pulse_width < (self.clocks[0] * 0.7):
+ self.pulse_width <= (self.clocks[0] * 0.75):
+ self.puty([2, ['Found width 2: %d' % self.pulse_width, 'W2: %d' % self.pulse_width]])
self.clocks.append(self.pulse_width)
self.state = 'GET THIRD PULSE WIDTH'
+ else:
+ self.puty([2, ['Search width 2: %d' % self.pulse_width, 'SW2: %d' % self.pulse_width]])
+ self.ss_edge = self.samplenum
def find_third_pulse_width(self):
if not ((self.pulse_width > (self.clocks[0] * 1.3) or \
- self.pulse_width < (self.clocks[0] * 0.7)) \
+ self.pulse_width <= (self.clocks[0] * 0.75)) \
and (self.pulse_width > (self.clocks[1] * 1.3) or \
- self.pulse_width < (self.clocks[1] * 0.7))):
+ self.pulse_width <= (self.clocks[1] * 0.75))):
+ self.puty([2, ['Search width 3: %d' % self.pulse_width, 'SW3: %d' % self.pulse_width]])
+ self.ss_edge = self.samplenum
return
+ else:
+ self.puty([2, ['Found width 3: %d' % self.pulse_width, 'W3: %d' % self.pulse_width]])
+ self.ss_edge = self.samplenum
+ # The message of the calculated bitrate should start at this sample
+ # (right after the synchronisation).
+ self.bitrate_message_start = self.samplenum
self.clocks.append(self.pulse_width)
self.clocks.sort()
self.range1 = (self.clocks[0] + self.clocks[1]) / 2
self.range2 = (self.clocks[1] + self.clocks[2]) / 2
- spdif_bitrate = int(self.samplerate / (self.clocks[2] / 1.5))
+ # Give some feedback during synchronisation and inform if sample rate
+ # is too low.
+ if self.clocks[0] <= 3:
+ self.putx(0, self.samplenum, [0, ['Short pulses detected. Increase sample rate!']])
+ raise SamplerateError('Short pulses detected')
+ else:
+ self.putx(0, self.samplenum, [0, ['Synchronisation']])
self.ss_edge = 0
- self.puty([0, ['Signal Bitrate: %d Mbit/s (=> %d kHz)' % \
- (spdif_bitrate, (spdif_bitrate/ (2 * 32)))]])
-
- clock_period_nsec = 1000000000 / spdif_bitrate
+ # Mostly, the synchronisation ends with a long pulse because they
+ # appear rarely. A skip of the next pulse will then prevent a 'M'
+ # frame to be labeled an unknown preamble for the first decoded frame.
+ (data,) = self.wait({0: 'e'})
+ self.pulse_width = self.samplenum - self.samplenum_prev_edge
+ self.samplenum_prev_edge = self.samplenum
self.last_preamble = self.samplenum
# We are done recovering the clock, now let's decode the data stream.
if pulse == 2:
self.preamble.append(self.get_pulse_type())
self.state = 'DECODE PREAMBLE'
- self.ss_edge = self.samplenum - self.pulse_width - 1
+ self.ss_edge = self.samplenum - self.pulse_width
+ # Use the first ten frames to calculate bit rates
+ if self.frame_counter == 0:
+ # This is the first preamble to be decoded. Measurement of
+ # bit rates starts here.
+ self.frame_start = self.samplenum
+ # The bit rate message should end here.
+ self.bitrate_message_end = self.ss_edge
+ elif self.frame_counter == 10:
+ self.frame_length = self.samplenum - self.frame_start
+ # Use section between end of synchronisation and start of
+ # first preamble to show measured bit rates.
+ if self.samplerate:
+ self.putx(self.bitrate_message_start, self.bitrate_message_end,\
+ [0, ['Audio samplingrate: %6.2f kHz; Bit rate: %6.3f MBit/s' %\
+ ((self.samplerate / 200 / self.frame_length), (self.samplerate / 200 * 64 / 1000 / self.frame_length))]])
+ else:
+ self.putx(self.bitrate_message_start, self.bitrate_message_end, [0, ['No sample rate given']])
+ self.frame_counter += 1
return
# We've seen a preamble.
if pulse == 1 and self.first_one:
self.first_one = False
- self.subframe.append([pulse, self.samplenum - \
- self.pulse_width - 1, self.samplenum])
+ self.subframe.append([pulse, self.samplenum - self.pulse_width, self.samplenum])
elif pulse == 1 and not self.first_one:
self.subframe[-1][2] = self.samplenum
self.putx(self.subframe[-1][1], self.samplenum, [2, ['1']])
self.bitcount += 1
self.first_one = True
else:
- self.subframe.append([pulse, self.samplenum - \
- self.pulse_width - 1, self.samplenum])
- self.putx(self.samplenum - self.pulse_width - 1,
- self.samplenum, [2, ['0']])
+ self.subframe.append([pulse, self.samplenum - self.pulse_width, self.samplenum])
+ self.putx(self.samplenum - self.pulse_width, self.samplenum, [2, ['0']])
self.bitcount += 1
if self.bitcount == 28:
self.last_preamble = self.samplenum
- def decode(self, ss, es, data):
+ def decode(self):
+ # Set samplerate to 0 if it is not given. Decoding is still possible.
if not self.samplerate:
- raise SamplerateError('Cannot decode without samplerate.')
-
- for (self.samplenum, pins) in data:
- data = pins[0]
-
- # Initialize self.olddata with the first sample value.
- if self.olddata is None:
- self.olddata = data
- continue
-
- # First we need to recover the clock.
- if self.olddata == data:
- self.pulse_width += 1
- continue
-
- # Found rising or falling edge.
- if self.first_edge:
- # Throw away first detected edge as it might be mangled data.
- self.first_edge = False
- self.pulse_width = 0
- else:
- if self.state == 'GET FIRST PULSE WIDTH':
- self.find_first_pulse_width()
- elif self.state == 'GET SECOND PULSE WIDTH':
- self.find_second_pulse_width()
- elif self.state == 'GET THIRD PULSE WIDTH':
- self.find_third_pulse_width()
- elif self.state == 'DECODE STREAM':
- self.decode_stream()
- elif self.state == 'DECODE PREAMBLE':
- self.decode_preamble()
-
- self.pulse_width = 0
-
- self.olddata = data
+ self.samplerate = 0
+
+ # Throw away first two edges as it might be mangled data.
+ self.wait({0: 'e'})
+ self.wait({0: 'e'})
+ self.ss_edge = 0
+ self.puty([2, ['Skip']])
+ self.ss_edge = self.samplenum
+ self.samplenum_prev_edge = self.samplenum
+
+ while True:
+ # Wait for any edge (rising or falling).
+ (data,) = self.wait({0: 'e'})
+ self.pulse_width = self.samplenum - self.samplenum_prev_edge
+ self.samplenum_prev_edge = self.samplenum
+
+ if self.state == 'GET FIRST PULSE WIDTH':
+ self.find_first_pulse_width()
+ elif self.state == 'GET SECOND PULSE WIDTH':
+ self.find_second_pulse_width()
+ elif self.state == 'GET THIRD PULSE WIDTH':
+ self.find_third_pulse_width()
+ elif self.state == 'DECODE STREAM':
+ self.decode_stream()
+ elif self.state == 'DECODE PREAMBLE':
+ self.decode_preamble()