From: Joel Holdsworth Date: Sun, 2 Mar 2014 17:27:12 +0000 (+0000) Subject: Implemented threaded decode X-Git-Tag: pulseview-0.2.0~37 X-Git-Url: http://sigrok.org/gitweb/?p=pulseview.git;a=commitdiff_plain;h=f70d8673a56471c7e4b22159b99684da7d6c4be1 Implemented threaded decode --- diff --git a/pv/data/decoderstack.cpp b/pv/data/decoderstack.cpp index 3bb13284..87ac3261 100644 --- a/pv/data/decoderstack.cpp +++ b/pv/data/decoderstack.cpp @@ -38,7 +38,9 @@ using boost::lock_guard; using boost::mutex; +using boost::optional; using boost::shared_ptr; +using boost::unique_lock; using std::deque; using std::make_pair; using std::max; @@ -62,9 +64,16 @@ mutex DecoderStack::_global_decode_mutex; DecoderStack::DecoderStack(pv::SigSession &session, const srd_decoder *const dec) : _session(session), + _sample_count(0), + _frame_complete(false), _samples_decoded(0) { - connect(&_session, SIGNAL(frame_began()), this, SLOT(on_new_frame())); + connect(&_session, SIGNAL(frame_began()), + this, SLOT(on_new_frame())); + connect(&_session, SIGNAL(data_received()), + this, SLOT(on_data_received())); + connect(&_session, SIGNAL(frame_ended()), + this, SLOT(on_frame_ended())); _stack.push_back(shared_ptr( new decode::Decoder(dec))); @@ -164,6 +173,8 @@ QString DecoderStack::error_message() void DecoderStack::clear() { + _sample_count = 0; + _frame_complete = false; _samples_decoded = 0; _error_message = QString(); _rows.clear(); @@ -225,6 +236,13 @@ void DecoderStack::begin_decode() if (!data) return; + // Check we have a snapshot of data + const deque< shared_ptr > &snapshots = + data->get_snapshots(); + if (snapshots.empty()) + return; + _snapshot = snapshots.front(); + // Get the samplerate and start time _start_time = data->get_start_time(); _samplerate = data->samplerate(); @@ -247,16 +265,26 @@ uint64_t DecoderStack::get_max_sample_count() const return max_sample_count; } +optional DecoderStack::wait_for_data() const +{ + unique_lock input_lock(_input_mutex); + while(!boost::this_thread::interruption_requested() && + !_frame_complete && _samples_decoded >= _sample_count) + _input_cond.wait(input_lock); + return boost::make_optional( + !boost::this_thread::interruption_requested() && + (_samples_decoded < _sample_count || !_frame_complete), + _sample_count); +} + void DecoderStack::decode_data( - const shared_ptr &snapshot, + const int64_t sample_count, const unsigned int unit_size, srd_session *const session) { uint8_t chunk[DecodeChunkLength]; - const int64_t sample_count = snapshot->get_sample_count(); - const unsigned int unit_size = snapshot->unit_size(); const unsigned int chunk_sample_count = - DecodeChunkLength / snapshot->unit_size(); + DecodeChunkLength / _snapshot->unit_size(); for (int64_t i = 0; !boost::this_thread::interruption_requested() && @@ -267,7 +295,7 @@ void DecoderStack::decode_data( const int64_t chunk_end = min( i + chunk_sample_count, sample_count); - snapshot->get_samples(chunk, i, chunk_end); + _snapshot->get_samples(chunk, i, chunk_end); if (srd_session_send(session, i, i + sample_count, chunk, (chunk_end - i) * unit_size) != SRD_OK) { @@ -285,16 +313,12 @@ void DecoderStack::decode_data( void DecoderStack::decode_proc(shared_ptr data) { + optional sample_count; srd_session *session; srd_decoder_inst *prev_di = NULL; assert(data); - - // Check we have a snapshot of data - const deque< shared_ptr > &snapshots = - data->get_snapshots(); - if (snapshots.empty()) - return; + assert(_snapshot); // Check that all decoders have the required probes BOOST_FOREACH(const shared_ptr &dec, _stack) @@ -306,8 +330,7 @@ void DecoderStack::decode_proc(shared_ptr data) assert(session); // Create the decoders - const shared_ptr &snapshot = snapshots.front(); - const unsigned int unit_size = snapshot->unit_size(); + const unsigned int unit_size = _snapshot->unit_size(); BOOST_FOREACH(const shared_ptr &dec, _stack) { @@ -326,6 +349,12 @@ void DecoderStack::decode_proc(shared_ptr data) prev_di = di; } + // Get the intial sample count + { + unique_lock input_lock(_input_mutex); + sample_count = _sample_count = _snapshot->get_sample_count(); + } + // Start the session srd_session_metadata_set(session, SRD_CONF_SAMPLERATE, g_variant_new_uint64((uint64_t)_samplerate)); @@ -335,7 +364,9 @@ void DecoderStack::decode_proc(shared_ptr data) srd_session_start(session); - decode_data(snapshot, session); + do { + decode_data(*sample_count, unit_size, session); + } while(_error_message.isEmpty() && (sample_count = wait_for_data())); // Destroy the session srd_session_destroy(session); @@ -391,5 +422,23 @@ void DecoderStack::on_new_frame() begin_decode(); } +void DecoderStack::on_data_received() +{ + { + unique_lock lock(_input_mutex); + _sample_count = _snapshot->get_sample_count(); + } + _input_cond.notify_one(); +} + +void DecoderStack::on_frame_ended() +{ + { + unique_lock lock(_input_mutex); + _frame_complete = true; + } + _input_cond.notify_one(); +} + } // namespace data } // namespace pv diff --git a/pv/data/decoderstack.h b/pv/data/decoderstack.h index 308dce6e..073f2692 100644 --- a/pv/data/decoderstack.h +++ b/pv/data/decoderstack.h @@ -25,6 +25,7 @@ #include +#include #include #include @@ -103,9 +104,10 @@ public: void begin_decode(); private: - void decode_data( - const boost::shared_ptr &snapshot, - srd_session *const session); + boost::optional wait_for_data() const; + + void decode_data(const int64_t sample_count, + const unsigned int unit_size, srd_session *const session); void decode_proc(boost::shared_ptr data); @@ -115,6 +117,10 @@ private: private slots: void on_new_frame(); + void on_data_received(); + + void on_frame_ended(); + signals: void new_decode_data(); @@ -131,6 +137,13 @@ private: std::list< boost::shared_ptr > _stack; + boost::shared_ptr _snapshot; + + mutable boost::mutex _input_mutex; + mutable boost::condition_variable _input_cond; + int64_t _sample_count; + bool _frame_complete; + mutable boost::mutex _output_mutex; int64_t _samples_decoded;