X-Git-Url: https://sigrok.org/gitweb/?a=blobdiff_plain;f=pv%2Fdata%2Fdecoderstack.cpp;h=c197e019fd16c6b1e1d5d35ff2aa02d62b9e2bfa;hb=df4c1a0691f6d3a457a475e496186d5a9decc54a;hp=3bb13284b425503c56786df5337e7c8018dfbb9d;hpb=28b9cc08aa62c02fd64dfb09eff4b9bfdb01f518;p=pulseview.git diff --git a/pv/data/decoderstack.cpp b/pv/data/decoderstack.cpp index 3bb13284..c197e019 100644 --- a/pv/data/decoderstack.cpp +++ b/pv/data/decoderstack.cpp @@ -38,7 +38,9 @@ using boost::lock_guard; using boost::mutex; +using boost::optional; using boost::shared_ptr; +using boost::unique_lock; using std::deque; using std::make_pair; using std::max; @@ -62,9 +64,16 @@ mutex DecoderStack::_global_decode_mutex; DecoderStack::DecoderStack(pv::SigSession &session, const srd_decoder *const dec) : _session(session), + _sample_count(0), + _frame_complete(false), _samples_decoded(0) { - connect(&_session, SIGNAL(frame_began()), this, SLOT(on_new_frame())); + connect(&_session, SIGNAL(frame_began()), + this, SLOT(on_new_frame())); + connect(&_session, SIGNAL(data_received()), + this, SLOT(on_data_received())); + connect(&_session, SIGNAL(frame_ended()), + this, SLOT(on_frame_ended())); _stack.push_back(shared_ptr( new decode::Decoder(dec))); @@ -164,6 +173,8 @@ QString DecoderStack::error_message() void DecoderStack::clear() { + _sample_count = 0; + _frame_complete = false; _samples_decoded = 0; _error_message = QString(); _rows.clear(); @@ -182,6 +193,14 @@ void DecoderStack::begin_decode() clear(); + // Check that all decoders have the required probes + BOOST_FOREACH(const shared_ptr &dec, _stack) + if (!dec->have_required_probes()) { + _error_message = tr("One or more required probes " + "have not been specified"); + return; + } + // Add classes BOOST_FOREACH (const shared_ptr &dec, _stack) { @@ -225,14 +244,20 @@ void DecoderStack::begin_decode() if (!data) return; + // Check we have a snapshot of data + const deque< shared_ptr > &snapshots = + data->get_snapshots(); + if (snapshots.empty()) + return; + _snapshot = snapshots.front(); + // Get the samplerate and start time _start_time = data->get_start_time(); _samplerate = data->samplerate(); if (_samplerate == 0.0) _samplerate = 1.0; - _decode_thread = boost::thread(&DecoderStack::decode_proc, this, - data); + _decode_thread = boost::thread(&DecoderStack::decode_proc, this); } uint64_t DecoderStack::get_max_sample_count() const @@ -247,16 +272,26 @@ uint64_t DecoderStack::get_max_sample_count() const return max_sample_count; } +optional DecoderStack::wait_for_data() const +{ + unique_lock input_lock(_input_mutex); + while(!boost::this_thread::interruption_requested() && + !_frame_complete && _samples_decoded >= _sample_count) + _input_cond.wait(input_lock); + return boost::make_optional( + !boost::this_thread::interruption_requested() && + (_samples_decoded < _sample_count || !_frame_complete), + _sample_count); +} + void DecoderStack::decode_data( - const shared_ptr &snapshot, + const int64_t sample_count, const unsigned int unit_size, srd_session *const session) { uint8_t chunk[DecodeChunkLength]; - const int64_t sample_count = snapshot->get_sample_count(); - const unsigned int unit_size = snapshot->unit_size(); const unsigned int chunk_sample_count = - DecodeChunkLength / snapshot->unit_size(); + DecodeChunkLength / _snapshot->unit_size(); for (int64_t i = 0; !boost::this_thread::interruption_requested() && @@ -267,7 +302,7 @@ void DecoderStack::decode_data( const int64_t chunk_end = min( i + chunk_sample_count, sample_count); - snapshot->get_samples(chunk, i, chunk_end); + _snapshot->get_samples(chunk, i, chunk_end); if (srd_session_send(session, i, i + sample_count, chunk, (chunk_end - i) * unit_size) != SRD_OK) { @@ -281,33 +316,24 @@ void DecoderStack::decode_data( } } + new_decode_data(); } -void DecoderStack::decode_proc(shared_ptr data) +void DecoderStack::decode_proc() { + optional sample_count; srd_session *session; srd_decoder_inst *prev_di = NULL; assert(data); - - // Check we have a snapshot of data - const deque< shared_ptr > &snapshots = - data->get_snapshots(); - if (snapshots.empty()) - return; - - // Check that all decoders have the required probes - BOOST_FOREACH(const shared_ptr &dec, _stack) - if (!dec->have_required_probes()) - return; + assert(_snapshot); // Create the session srd_session_new(&session); assert(session); // Create the decoders - const shared_ptr &snapshot = snapshots.front(); - const unsigned int unit_size = snapshot->unit_size(); + const unsigned int unit_size = _snapshot->unit_size(); BOOST_FOREACH(const shared_ptr &dec, _stack) { @@ -326,6 +352,12 @@ void DecoderStack::decode_proc(shared_ptr data) prev_di = di; } + // Get the intial sample count + { + unique_lock input_lock(_input_mutex); + sample_count = _sample_count = _snapshot->get_sample_count(); + } + // Start the session srd_session_metadata_set(session, SRD_CONF_SAMPLERATE, g_variant_new_uint64((uint64_t)_samplerate)); @@ -335,7 +367,9 @@ void DecoderStack::decode_proc(shared_ptr data) srd_session_start(session); - decode_data(snapshot, session); + do { + decode_data(*sample_count, unit_size, session); + } while(_error_message.isEmpty() && (sample_count = wait_for_data())); // Destroy the session srd_session_destroy(session); @@ -382,8 +416,6 @@ void DecoderStack::annotation_callback(srd_proto_data *pdata, void *decoder) // Add the annotation (*row_iter).second.push_annotation(a); - - d->new_decode_data(); } void DecoderStack::on_new_frame() @@ -391,5 +423,23 @@ void DecoderStack::on_new_frame() begin_decode(); } +void DecoderStack::on_data_received() +{ + { + unique_lock lock(_input_mutex); + _sample_count = _snapshot->get_sample_count(); + } + _input_cond.notify_one(); +} + +void DecoderStack::on_frame_ended() +{ + { + unique_lock lock(_input_mutex); + _frame_complete = true; + } + _input_cond.notify_one(); +} + } // namespace data } // namespace pv