X-Git-Url: https://sigrok.org/gitweb/?a=blobdiff_plain;f=decoders%2Fspdif%2Fpd.py;h=1b7f6e92a7cf196f3e1e5edc4953d8c32d997f4c;hb=499bf266989634a02e33e5361a720f853934fe03;hp=964638655b4c12fcdb05235a21673cb99cc1d4eb;hpb=586361052b414d9e17fdecf54c5db0282c25339f;p=libsigrokdecode.git diff --git a/decoders/spdif/pd.py b/decoders/spdif/pd.py index 9646386..1b7f6e9 100644 --- a/decoders/spdif/pd.py +++ b/decoders/spdif/pd.py @@ -14,8 +14,7 @@ ## GNU General Public License for more details. ## ## You should have received a copy of the GNU General Public License -## along with this program; if not, write to the Free Software -## Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA +## along with this program; if not, see . ## import sigrokdecode as srd @@ -24,31 +23,32 @@ class SamplerateError(Exception): pass class Decoder(srd.Decoder): - api_version = 2 + api_version = 3 id = 'spdif' name = 'S/PDIF' longname = 'Sony/Philips Digital Interface Format' desc = 'Serial bus for connecting digital audio devices.' license = 'gplv2+' inputs = ['logic'] - outputs = ['spdif'] + outputs = [] + tags = ['Audio', 'PC'] channels = ( {'id': 'data', 'name': 'Data', 'desc': 'Data line'}, ) annotations = ( ('bitrate', 'Bitrate / baudrate'), ('preamble', 'Preamble'), - ('bits', 'Bits'), - ('aux', 'Auxillary-audio-databits'), - ('samples', 'Audio Samples'), + ('bit', 'Bit'), + ('aux', 'Auxillary-audio-databit'), + ('sample', 'Audio Sample'), ('validity', 'Data Valid'), ('subcode', 'Subcode data'), ('chan_stat', 'Channnel Status'), ('parity', 'Parity Bit'), ) annotation_rows = ( - ('info', 'Info', (0, 1, 3, 5, 6, 7, 8)), ('bits', 'Bits', (2,)), + ('info', 'Info', (0, 1, 3, 5, 6, 7, 8)), ('samples', 'Samples', (4,)), ) @@ -58,11 +58,14 @@ class Decoder(srd.Decoder): def puty(self, data): self.put(self.ss_edge, self.samplenum, self.out_ann, data) - def __init__(self, **kwargs): + def __init__(self): + self.reset() + + def reset(self): self.state = 'GET FIRST PULSE WIDTH' - self.olddata = None self.ss_edge = None self.first_edge = True + self.samplenum_prev_edge = 0 self.pulse_width = 0 self.clocks = [] @@ -74,6 +77,14 @@ class Decoder(srd.Decoder): self.seen_preamble = False self.last_preamble = 0 + self.bitrate_message_start = 0 + self.bitrate_message_end = 0 + self.frame_counter = 0 + self.frame_start = 0 + self.frame_length = 0 + + self.sampleratetmp = 1 + self.first_one = True self.subframe = [] @@ -85,8 +96,6 @@ class Decoder(srd.Decoder): self.samplerate = value def get_pulse_type(self): - if self.range1 == 0 or self.range2 == 0: - return -1 if self.pulse_width >= self.range2: return 2 elif self.pulse_width >= self.range1: @@ -98,32 +107,54 @@ class Decoder(srd.Decoder): if self.pulse_width != 0: self.clocks.append(self.pulse_width) self.state = 'GET SECOND PULSE WIDTH' + self.puty([2, ['Found width 1: %d' % self.pulse_width, 'W1: %d' % self.pulse_width]]) + self.ss_edge = self.samplenum def find_second_pulse_width(self): if self.pulse_width > (self.clocks[0] * 1.3) or \ - self.pulse_width < (self.clocks[0] * 0.7): + self.pulse_width <= (self.clocks[0] * 0.75): + self.puty([2, ['Found width 2: %d' % self.pulse_width, 'W2: %d' % self.pulse_width]]) self.clocks.append(self.pulse_width) self.state = 'GET THIRD PULSE WIDTH' + else: + self.puty([2, ['Search width 2: %d' % self.pulse_width, 'SW2: %d' % self.pulse_width]]) + self.ss_edge = self.samplenum def find_third_pulse_width(self): if not ((self.pulse_width > (self.clocks[0] * 1.3) or \ - self.pulse_width < (self.clocks[0] * 0.7)) \ + self.pulse_width <= (self.clocks[0] * 0.75)) \ and (self.pulse_width > (self.clocks[1] * 1.3) or \ - self.pulse_width < (self.clocks[1] * 0.7))): + self.pulse_width <= (self.clocks[1] * 0.75))): + self.puty([2, ['Search width 3: %d' % self.pulse_width, 'SW3: %d' % self.pulse_width]]) + self.ss_edge = self.samplenum return + else: + self.puty([2, ['Found width 3: %d' % self.pulse_width, 'W3: %d' % self.pulse_width]]) + self.ss_edge = self.samplenum + # The message of the calculated bitrate should start at this sample + # (right after the synchronisation). + self.bitrate_message_start = self.samplenum self.clocks.append(self.pulse_width) self.clocks.sort() self.range1 = (self.clocks[0] + self.clocks[1]) / 2 self.range2 = (self.clocks[1] + self.clocks[2]) / 2 - spdif_bitrate = int(self.samplerate / (self.clocks[2] / 1.5)) + # Give some feedback during synchronisation and inform if sample rate + # is too low. + if self.clocks[0] <= 3: + self.putx(0, self.samplenum, [0, ['Short pulses detected. Increase sample rate!']]) + raise SamplerateError('Short pulses detected') + else: + self.putx(0, self.samplenum, [0, ['Synchronisation']]) self.ss_edge = 0 - self.puty([0, ['Signal Bitrate: %d Mbit/s (=> %d kHz)' % \ - (spdif_bitrate, (spdif_bitrate/ (2 * 32)))]]) - - clock_period_nsec = 1000000000 / spdif_bitrate + # Mostly, the synchronisation ends with a long pulse because they + # appear rarely. A skip of the next pulse will then prevent a 'M' + # frame to be labeled an unknown preamble for the first decoded frame. + (data,) = self.wait({0: 'e'}) + self.pulse_width = self.samplenum - self.samplenum_prev_edge + self.samplenum_prev_edge = self.samplenum self.last_preamble = self.samplenum # We are done recovering the clock, now let's decode the data stream. @@ -137,24 +168,39 @@ class Decoder(srd.Decoder): if pulse == 2: self.preamble.append(self.get_pulse_type()) self.state = 'DECODE PREAMBLE' - self.ss_edge = self.samplenum - self.pulse_width - 1 + self.ss_edge = self.samplenum - self.pulse_width + # Use the first ten frames to calculate bit rates + if self.frame_counter == 0: + # This is the first preamble to be decoded. Measurement of + # bit rates starts here. + self.frame_start = self.samplenum + # The bit rate message should end here. + self.bitrate_message_end = self.ss_edge + elif self.frame_counter == 10: + self.frame_length = self.samplenum - self.frame_start + # Use section between end of synchronisation and start of + # first preamble to show measured bit rates. + if self.samplerate: + self.putx(self.bitrate_message_start, self.bitrate_message_end,\ + [0, ['Audio samplingrate: %6.2f kHz; Bit rate: %6.3f MBit/s' %\ + ((self.samplerate / 200 / self.frame_length), (self.samplerate / 200 * 64 / 1000 / self.frame_length))]]) + else: + self.putx(self.bitrate_message_start, self.bitrate_message_end, [0, ['No sample rate given']]) + self.frame_counter += 1 return # We've seen a preamble. if pulse == 1 and self.first_one: self.first_one = False - self.subframe.append([pulse, self.samplenum - \ - self.pulse_width - 1, self.samplenum]) + self.subframe.append([pulse, self.samplenum - self.pulse_width, self.samplenum]) elif pulse == 1 and not self.first_one: self.subframe[-1][2] = self.samplenum self.putx(self.subframe[-1][1], self.samplenum, [2, ['1']]) self.bitcount += 1 self.first_one = True else: - self.subframe.append([pulse, self.samplenum - \ - self.pulse_width - 1, self.samplenum]) - self.putx(self.samplenum - self.pulse_width - 1, - self.samplenum, [2, ['0']]) + self.subframe.append([pulse, self.samplenum - self.pulse_width, self.samplenum]) + self.putx(self.samplenum - self.pulse_width, self.samplenum, [2, ['0']]) self.bitcount += 1 if self.bitcount == 28: @@ -210,7 +256,7 @@ class Decoder(srd.Decoder): elif self.preamble == [2, 1, 1, 2]: self.puty([1, ['Preamble B', 'B']]) else: - self.puty([1, ['Unknown Preamble', 'Unkown Prea.', 'U']]) + self.puty([1, ['Unknown Preamble', 'Unknown Prea.', 'U']]) self.preamble = [] self.seen_preamble = True self.bitcount = 0 @@ -218,40 +264,32 @@ class Decoder(srd.Decoder): self.last_preamble = self.samplenum - def decode(self, ss, es, data): + def decode(self): + # Set samplerate to 0 if it is not given. Decoding is still possible. if not self.samplerate: - raise SamplerateError('Cannot decode without samplerate.') - - for (self.samplenum, pins) in data: - data = pins[0] - - # Initialize self.olddata with the first sample value. - if self.olddata == None: - self.olddata = data - continue - - # First we need to recover the clock. - if self.olddata == data: - self.pulse_width += 1 - continue - - # Found rising or falling edge. - if self.first_edge: - # Throw away first detected edge as it might be mangled data. - self.first_edge = False - self.pulse_width = 0 - else: - if self.state == 'GET FIRST PULSE WIDTH': - self.find_first_pulse_width() - elif self.state == 'GET SECOND PULSE WIDTH': - self.find_second_pulse_width() - elif self.state == 'GET THIRD PULSE WIDTH': - self.find_third_pulse_width() - elif self.state == 'DECODE STREAM': - self.decode_stream() - elif self.state == 'DECODE PREAMBLE': - self.decode_preamble() - - self.pulse_width = 0 - - self.olddata = data + self.samplerate = 0 + + # Throw away first two edges as it might be mangled data. + self.wait({0: 'e'}) + self.wait({0: 'e'}) + self.ss_edge = 0 + self.puty([2, ['Skip']]) + self.ss_edge = self.samplenum + self.samplenum_prev_edge = self.samplenum + + while True: + # Wait for any edge (rising or falling). + (data,) = self.wait({0: 'e'}) + self.pulse_width = self.samplenum - self.samplenum_prev_edge + self.samplenum_prev_edge = self.samplenum + + if self.state == 'GET FIRST PULSE WIDTH': + self.find_first_pulse_width() + elif self.state == 'GET SECOND PULSE WIDTH': + self.find_second_pulse_width() + elif self.state == 'GET THIRD PULSE WIDTH': + self.find_third_pulse_width() + elif self.state == 'DECODE STREAM': + self.decode_stream() + elif self.state == 'DECODE PREAMBLE': + self.decode_preamble()