From 144e72c9ec677e7df35d37d7de6e8a18bb3f2ba1 Mon Sep 17 00:00:00 2001 From: Soeren Apel Date: Sat, 22 Aug 2020 21:38:37 +0200 Subject: [PATCH] Rework all subthread-based workers to make notifications more robust --- pv/data/decodesignal.cpp | 216 ++++++++++++++++++++++++--------------- pv/data/decodesignal.hpp | 1 + pv/data/mathsignal.cpp | 57 ++++++----- pv/data/mathsignal.hpp | 2 +- pv/data/signalbase.cpp | 18 ++-- 5 files changed, 179 insertions(+), 115 deletions(-) diff --git a/pv/data/decodesignal.cpp b/pv/data/decodesignal.cpp index a17ea693..256c29b8 100644 --- a/pv/data/decodesignal.cpp +++ b/pv/data/decodesignal.cpp @@ -235,10 +235,8 @@ void DecodeSignal::begin_decode() // Receive notifications when new sample data is available connect_input_notifiers(); - if (get_input_segment_count() == 0) { + if (get_input_segment_count() == 0) set_error_message(tr("No input data")); - return; - } // Make sure the logic output data is complete and up-to-date logic_mux_interrupt_ = false; @@ -838,6 +836,30 @@ void DecodeSignal::set_error_message(QString msg) qDebug().nospace() << name() << ": " << msg; } +bool DecodeSignal::all_input_segments_complete(uint32_t segment_id) const +{ + bool all_complete = true; + + for (const decode::DecodeChannel& ch : channels_) + if (ch.assigned_signal) { + if (!ch.assigned_signal->logic_data()) + continue; + + const shared_ptr logic_data = ch.assigned_signal->logic_data(); + if (logic_data->logic_segments().empty()) + return false; + + if (segment_id >= logic_data->logic_segments().size()) + return false; + + const shared_ptr segment = logic_data->logic_segments()[segment_id]; + if (!segment->is_complete()) + all_complete = false; + } + + return all_complete; +} + uint32_t DecodeSignal::get_input_segment_count() const { uint64_t count = std::numeric_limits::max(); @@ -1077,10 +1099,23 @@ void DecodeSignal::mux_logic_samples(uint32_t segment_id, const int64_t start, c void DecodeSignal::logic_mux_proc() { - uint32_t segment_id = 0; + uint32_t input_segment_count; + do { + input_segment_count = get_input_segment_count(); + if (input_segment_count == 0) { + // Wait for input data + unique_lock logic_mux_lock(logic_mux_mutex_); + logic_mux_cond_.wait(logic_mux_lock); + } + } while ((!logic_mux_interrupt_) && (input_segment_count == 0)); + + if (logic_mux_interrupt_) + return; assert(logic_mux_data_); + uint32_t segment_id = 0; + // Create initial logic mux segment shared_ptr output_segment = make_shared(*logic_mux_data_, segment_id, @@ -1089,56 +1124,63 @@ void DecodeSignal::logic_mux_proc() output_segment->set_samplerate(get_input_samplerate(0)); + // Logic mux data is being updated + logic_mux_data_invalid_ = false; + + uint64_t samples_to_process; do { - const uint64_t input_sample_count = get_working_sample_count(segment_id); - const uint64_t output_sample_count = output_segment->get_sample_count(); + do { + const uint64_t input_sample_count = get_working_sample_count(segment_id); + const uint64_t output_sample_count = output_segment->get_sample_count(); - const uint64_t samples_to_process = - (input_sample_count > output_sample_count) ? - (input_sample_count - output_sample_count) : 0; + samples_to_process = + (input_sample_count > output_sample_count) ? + (input_sample_count - output_sample_count) : 0; - // Process the samples if necessary... - if (samples_to_process > 0) { - const uint64_t unit_size = output_segment->unit_size(); - const uint64_t chunk_sample_count = DecodeChunkLength / unit_size; + if (samples_to_process > 0) { + const uint64_t unit_size = output_segment->unit_size(); + const uint64_t chunk_sample_count = DecodeChunkLength / unit_size; - uint64_t processed_samples = 0; - do { - const uint64_t start_sample = output_sample_count + processed_samples; - const uint64_t sample_count = - min(samples_to_process - processed_samples, chunk_sample_count); + uint64_t processed_samples = 0; + do { + const uint64_t start_sample = output_sample_count + processed_samples; + const uint64_t sample_count = + min(samples_to_process - processed_samples, chunk_sample_count); - mux_logic_samples(segment_id, start_sample, start_sample + sample_count); - processed_samples += sample_count; + mux_logic_samples(segment_id, start_sample, start_sample + sample_count); + processed_samples += sample_count; - // ...and process the newly muxed logic data - decode_input_cond_.notify_one(); - } while (!logic_mux_interrupt_ && (processed_samples < samples_to_process)); - } + // ...and process the newly muxed logic data + decode_input_cond_.notify_one(); + } while (!logic_mux_interrupt_ && (processed_samples < samples_to_process)); + } + } while (!logic_mux_interrupt_ && (samples_to_process > 0)); - if (samples_to_process == 0) { - // TODO Optimize this by caching the input segment count and only - // querying it when the cached value was reached - if (segment_id < get_input_segment_count() - 1) { - // Process next segment - segment_id++; + if (!logic_mux_interrupt_) { + // samples_to_process is now 0, we've exhausted the currently available input data - output_segment = - make_shared(*logic_mux_data_, segment_id, - logic_mux_unit_size_, 0); - logic_mux_data_->push_segment(output_segment); + // If the input segments are complete, we've completed this segment + if (all_input_segments_complete(segment_id)) { + if (!output_segment->is_complete()) + output_segment->set_complete(); - output_segment->set_samplerate(get_input_samplerate(segment_id)); - } else { - // All segments have been processed - logic_mux_data_invalid_ = false; + if (segment_id < get_input_segment_count() - 1) { + // Process next segment + segment_id++; + + output_segment = + make_shared(*logic_mux_data_, segment_id, + logic_mux_unit_size_, 0); + logic_mux_data_->push_segment(output_segment); + output_segment->set_samplerate(get_input_samplerate(segment_id)); + } + } else { // Wait for more input unique_lock logic_mux_lock(logic_mux_mutex_); logic_mux_cond_.wait(logic_mux_lock); } } - } while (!logic_mux_interrupt_); } @@ -1168,8 +1210,10 @@ void DecodeSignal::decode_data( input_segment->get_samples(i, chunk_end, chunk); if (srd_session_send(srd_session_, i, chunk_end, chunk, - data_size, unit_size) != SRD_OK) + data_size, unit_size) != SRD_OK) { set_error_message(tr("Decoder reported an error")); + decode_interrupt_ = true; + } delete[] chunk; @@ -1195,10 +1239,13 @@ void DecodeSignal::decode_proc() current_segment_id_ = 0; // If there is no input data available yet, wait until it is or we're interrupted - if (logic_mux_data_->logic_segments().size() == 0) { - unique_lock input_wait_lock(input_mutex_); - decode_input_cond_.wait(input_wait_lock); - } + do { + if (logic_mux_data_->logic_segments().size() == 0) { + // Wait for input data + unique_lock input_wait_lock(input_mutex_); + decode_input_cond_.wait(input_wait_lock); + } + } while ((!decode_interrupt_) && (logic_mux_data_->logic_segments().size() == 0)); if (decode_interrupt_) return; @@ -1213,52 +1260,57 @@ void DecodeSignal::decode_proc() start_srd_session(); - uint64_t sample_count = 0; + uint64_t samples_to_process = 0; uint64_t abs_start_samplenum = 0; do { // Keep processing new samples until we exhaust the input data do { - lock_guard input_lock(input_mutex_); - sample_count = input_segment->get_sample_count() - abs_start_samplenum; + samples_to_process = input_segment->get_sample_count() - abs_start_samplenum; - if (sample_count > 0) { - decode_data(abs_start_samplenum, sample_count, input_segment); - abs_start_samplenum += sample_count; + if (samples_to_process > 0) { + decode_data(abs_start_samplenum, samples_to_process, input_segment); + abs_start_samplenum += samples_to_process; } - } while (error_message_.isEmpty() && (sample_count > 0) && !decode_interrupt_); - - if (error_message_.isEmpty() && !decode_interrupt_ && sample_count == 0) { - if (current_segment_id_ < logic_mux_data_->logic_segments().size() - 1) { - // Process next segment - current_segment_id_++; - - try { - input_segment = logic_mux_data_->logic_segments().at(current_segment_id_); - } catch (out_of_range&) { - qDebug() << "Decode error for" << name() << ": no logic mux segment" \ - << current_segment_id_ << "in decode_proc(), mux segments size is" \ - << logic_mux_data_->logic_segments().size(); - return; + } while (!decode_interrupt_ && (samples_to_process > 0)); + + if (!decode_interrupt_) { + // samples_to_process is now 0, we've exhausted the currently available input data + + // If the input segment is complete, we've exhausted this segment + if (input_segment->is_complete()) { + if (current_segment_id_ < logic_mux_data_->logic_segments().size() - 1) { + // Process next segment + current_segment_id_++; + + try { + input_segment = logic_mux_data_->logic_segments().at(current_segment_id_); + } catch (out_of_range&) { + qDebug() << "Decode error for" << name() << ": no logic mux segment" \ + << current_segment_id_ << "in decode_proc(), mux segments size is" \ + << logic_mux_data_->logic_segments().size(); + decode_interrupt_ = true; + return; + } + abs_start_samplenum = 0; + + // Create the next segment and set its metadata + create_decode_segment(); + segments_.at(current_segment_id_).samplerate = input_segment->samplerate(); + segments_.at(current_segment_id_).start_time = input_segment->start_time(); + + // Reset decoder state but keep the decoder stack intact + terminate_srd_session(); + } else { + // All segments have been processed + decode_finished(); } - abs_start_samplenum = 0; - - // Create the next segment and set its metadata - create_decode_segment(); - segments_.at(current_segment_id_).samplerate = input_segment->samplerate(); - segments_.at(current_segment_id_).start_time = input_segment->start_time(); - - // Reset decoder state but keep the decoder stack intact - terminate_srd_session(); } else { - // All segments have been processed - decode_finished(); - - // Wait for new input data or an interrupt was requested + // Wait for more input data unique_lock input_wait_lock(input_mutex_); decode_input_cond_.wait(input_wait_lock); } } - } while (error_message_.isEmpty() && !decode_interrupt_); + } while (!decode_interrupt_); // Potentially reap decoders when the application no longer is // interested in their (pending) results. @@ -1381,10 +1433,10 @@ void DecodeSignal::connect_input_notifiers() continue; const data::SignalBase *signal = ch.assigned_signal; - connect(signal, SIGNAL(samples_cleared()), - this, SLOT(on_data_cleared())); - connect(signal, SIGNAL(samples_added(uint64_t, uint64_t, uint64_t)), - this, SLOT(on_data_received())); + connect(signal, &data::SignalBase::samples_cleared, + this, &DecodeSignal::on_data_cleared); + connect(signal, &data::SignalBase::samples_added, + this, &DecodeSignal::on_data_received); } } diff --git a/pv/data/decodesignal.hpp b/pv/data/decodesignal.hpp index 3fa8317e..bfbf5ee0 100644 --- a/pv/data/decodesignal.hpp +++ b/pv/data/decodesignal.hpp @@ -191,6 +191,7 @@ public: private: void set_error_message(QString msg); + bool all_input_segments_complete(uint32_t segment_id) const; uint32_t get_input_segment_count() const; double get_input_samplerate(uint32_t segment_id) const; diff --git a/pv/data/mathsignal.cpp b/pv/data/mathsignal.cpp index 24df0e71..2ad90212 100644 --- a/pv/data/mathsignal.cpp +++ b/pv/data/mathsignal.cpp @@ -99,8 +99,6 @@ MathSignal::MathSignal(pv::Session &session) : connect(&session_, SIGNAL(capture_state_changed(int)), this, SLOT(on_capture_state_changed(int))); - connect(&session_, SIGNAL(data_received()), - this, SLOT(on_data_received())); } MathSignal::~MathSignal() @@ -201,7 +199,7 @@ uint64_t MathSignal::get_working_sample_count(uint32_t segment_id) const return result; } -void MathSignal::update_completeness(uint32_t segment_id) +void MathSignal::update_completeness(uint32_t segment_id, uint64_t output_sample_count) { bool output_complete = true; @@ -224,9 +222,19 @@ void MathSignal::update_completeness(uint32_t segment_id) } const shared_ptr segment = analog_segments.at(segment_id); - if (!segment->is_complete()) + if (!segment->is_complete()) { + output_complete = false; + continue; + } + + if (output_sample_count < segment->get_sample_count()) output_complete = false; } + } else { + // We're done when we generated as many samples as the stopped session is long + if ((session_.get_capture_state() != Session::Stopped) || + (output_sample_count < session_.get_segment_sample_count(segment_id))) + output_complete = false; } if (output_complete) @@ -285,6 +293,8 @@ void MathSignal::begin_generation() return; } + disconnect(this, SLOT(on_data_received())); + fnc_sig_sample_ = new sig_sample(*this); exprtk_unknown_symbol_table_ = new exprtk::symbol_table(); @@ -320,6 +330,11 @@ void MathSignal::begin_generation() } if (error_message_.isEmpty()) { + // Connect to the session data notification if we have no input signals + if (input_signals_.empty()) + connect(&session_, SIGNAL(data_received()), + this, SLOT(on_data_received())); + gen_interrupt_ = false; gen_thread_ = std::thread(&MathSignal::generation_proc, this); } @@ -360,8 +375,6 @@ void MathSignal::generate_samples(uint32_t segment_id, const uint64_t start_samp void MathSignal::generation_proc() { - uint32_t segment_id = 0; - // Don't do anything until we have a valid sample rate do { if (use_custom_sample_rate_) @@ -378,6 +391,7 @@ void MathSignal::generation_proc() if (gen_interrupt_) return; + uint32_t segment_id = 0; shared_ptr analog = analog_data(); // Create initial analog segment @@ -408,28 +422,26 @@ void MathSignal::generation_proc() processed_samples += sample_count; // Notify consumers of this signal's data - // TODO Does this work when a conversion is active? samples_added(segment_id, start_sample, start_sample + processed_samples); } while (!gen_interrupt_ && (processed_samples < samples_to_process)); } - if (samples_to_process == 0) { - update_completeness(segment_id); + update_completeness(segment_id, output_sample_count); - if (segment_id < session_.get_highest_segment_id()) { + if (output_segment->is_complete() && (segment_id < session_.get_highest_segment_id())) { // Process next segment segment_id++; output_segment = make_shared(*analog.get(), segment_id, analog->get_samplerate()); analog->push_segment(output_segment); - } else { - // All segments have been processed, wait for more input - unique_lock gen_input_lock(input_mutex_); - gen_input_cond_.wait(gen_input_lock); - } } + if (!gen_interrupt_ && (samples_to_process == 0)) { + // Wait for more input + unique_lock gen_input_lock(input_mutex_); + gen_input_cond_.wait(gen_input_lock); + } } while (!gen_interrupt_); } @@ -450,6 +462,8 @@ signal_data* MathSignal::signal_from_name(const std::string& name) if (!sb->analog_data()) continue; + connect(sb->analog_data().get(), SIGNAL(samples_added(SharedPtrToSegment, uint64_t, uint64_t)), + this, SLOT(on_data_received())); connect(sb->analog_data().get(), SIGNAL(segment_completed()), this, SLOT(on_data_received())); @@ -479,7 +493,7 @@ void MathSignal::update_signal_sample(signal_data* sig_data, uint32_t segment_id sig_data->sample_num = sample_num; sig_data->sample_value = segment->get_sample(sample_num); - // We only have a reference if this signal is used as a scalar, + // We only have a reference if this signal is used as a scalar; // if it's used by a function, it's null if (sig_data->ref) *(sig_data->ref) = sig_data->sample_value; @@ -490,14 +504,9 @@ void MathSignal::on_capture_state_changed(int state) if (state == Session::Running) begin_generation(); - if (state == Session::Stopped) { - // If we have input signals, we use those as the indicators - if (input_signals_.empty()) { - shared_ptr analog = analog_data(); - if (!analog->analog_segments().empty()) - analog->analog_segments().back()->set_complete(); - } - } + // Make sure we don't miss any input samples, just in case + if (state == Session::Stopped) + gen_input_cond_.notify_one(); } void MathSignal::on_data_received() diff --git a/pv/data/mathsignal.hpp b/pv/data/mathsignal.hpp index a1fb9ffb..6127b177 100644 --- a/pv/data/mathsignal.hpp +++ b/pv/data/mathsignal.hpp @@ -89,7 +89,7 @@ private: */ uint64_t get_working_sample_count(uint32_t segment_id) const; - void update_completeness(uint32_t segment_id); + void update_completeness(uint32_t segment_id, uint64_t output_sample_count); void reset_generation(); void begin_generation(); diff --git a/pv/data/signalbase.cpp b/pv/data/signalbase.cpp index 274de1a1..104243b1 100644 --- a/pv/data/signalbase.cpp +++ b/pv/data/signalbase.cpp @@ -676,6 +676,7 @@ void SignalBase::convert_single_segment_range(shared_ptr asegment lsegment->append_payload(logic->data_pointer(), logic->data_length()); samples_added(lsegment->segment_id(), i, i + ConversionBlockSize); + i += ConversionBlockSize; } @@ -778,6 +779,7 @@ void SignalBase::conversion_thread_proc() // Only advance to next segment if the current input segment is complete if (asegment->is_complete() && analog_data->analog_segments().size() > logic_data->logic_segments().size()) { + // There are more segments to process segment_id++; @@ -794,12 +796,12 @@ void SignalBase::conversion_thread_proc() logic_data->push_segment(new_segment); lsegment = logic_data->logic_segments().back(); - } else { - // No more samples/segments to process, wait for data or interrupt - if (!conversion_interrupt_) { - unique_lock input_lock(conversion_input_mutex_); - conversion_input_cond_.wait(input_lock); - } + } + + // No more samples/segments to process, wait for data or interrupt + if (!conversion_interrupt_) { + unique_lock input_lock(conversion_input_mutex_); + conversion_input_cond_.wait(input_lock); } } while (!conversion_interrupt_); } @@ -815,11 +817,11 @@ void SignalBase::start_conversion(bool delayed_start) if (converted_data_) converted_data_->clear(); + samples_cleared(); conversion_interrupt_ = false; - conversion_thread_ = std::thread( - &SignalBase::conversion_thread_proc, this); + conversion_thread_ = std::thread(&SignalBase::conversion_thread_proc, this); } void SignalBase::stop_conversion() -- 2.30.2