]> sigrok.org Git - pulseview.git/blobdiff - pv/data/mathsignal.cpp
Rework all subthread-based workers to make notifications more robust
[pulseview.git] / pv / data / mathsignal.cpp
index 24df0e71985b3688aa3d1953e323e13084d24d32..2ad90212fe01eb5c8cd5cef764aaed6f1b3150b5 100644 (file)
@@ -99,8 +99,6 @@ MathSignal::MathSignal(pv::Session &session) :
 
        connect(&session_, SIGNAL(capture_state_changed(int)),
                this, SLOT(on_capture_state_changed(int)));
-       connect(&session_, SIGNAL(data_received()),
-               this, SLOT(on_data_received()));
 }
 
 MathSignal::~MathSignal()
@@ -201,7 +199,7 @@ uint64_t MathSignal::get_working_sample_count(uint32_t segment_id) const
        return result;
 }
 
-void MathSignal::update_completeness(uint32_t segment_id)
+void MathSignal::update_completeness(uint32_t segment_id, uint64_t output_sample_count)
 {
        bool output_complete = true;
 
@@ -224,9 +222,19 @@ void MathSignal::update_completeness(uint32_t segment_id)
                        }
 
                        const shared_ptr<AnalogSegment> segment = analog_segments.at(segment_id);
-                       if (!segment->is_complete())
+                       if (!segment->is_complete()) {
+                               output_complete = false;
+                               continue;
+                       }
+
+                       if (output_sample_count < segment->get_sample_count())
                                output_complete = false;
                }
+       } else {
+               // We're done when we generated as many samples as the stopped session is long
+               if ((session_.get_capture_state() != Session::Stopped) ||
+                       (output_sample_count < session_.get_segment_sample_count(segment_id)))
+                       output_complete = false;
        }
 
        if (output_complete)
@@ -285,6 +293,8 @@ void MathSignal::begin_generation()
                return;
        }
 
+       disconnect(this, SLOT(on_data_received()));
+
        fnc_sig_sample_ = new sig_sample<double>(*this);
 
        exprtk_unknown_symbol_table_ = new exprtk::symbol_table<double>();
@@ -320,6 +330,11 @@ void MathSignal::begin_generation()
        }
 
        if (error_message_.isEmpty()) {
+               // Connect to the session data notification if we have no input signals
+               if (input_signals_.empty())
+                       connect(&session_, SIGNAL(data_received()),
+                               this, SLOT(on_data_received()));
+
                gen_interrupt_ = false;
                gen_thread_ = std::thread(&MathSignal::generation_proc, this);
        }
@@ -360,8 +375,6 @@ void MathSignal::generate_samples(uint32_t segment_id, const uint64_t start_samp
 
 void MathSignal::generation_proc()
 {
-       uint32_t segment_id = 0;
-
        // Don't do anything until we have a valid sample rate
        do {
                if (use_custom_sample_rate_)
@@ -378,6 +391,7 @@ void MathSignal::generation_proc()
        if (gen_interrupt_)
                return;
 
+       uint32_t segment_id = 0;
        shared_ptr<Analog> analog = analog_data();
 
        // Create initial analog segment
@@ -408,28 +422,26 @@ void MathSignal::generation_proc()
                                processed_samples += sample_count;
 
                                // Notify consumers of this signal's data
-                               // TODO Does this work when a conversion is active?
                                samples_added(segment_id, start_sample, start_sample + processed_samples);
                        } while (!gen_interrupt_ && (processed_samples < samples_to_process));
                }
 
-               if (samples_to_process == 0) {
-                       update_completeness(segment_id);
+               update_completeness(segment_id, output_sample_count);
 
-                       if (segment_id < session_.get_highest_segment_id()) {
+               if (output_segment->is_complete() && (segment_id < session_.get_highest_segment_id())) {
                                // Process next segment
                                segment_id++;
 
                                output_segment =
                                        make_shared<AnalogSegment>(*analog.get(), segment_id, analog->get_samplerate());
                                analog->push_segment(output_segment);
-                       } else {
-                               // All segments have been processed, wait for more input
-                               unique_lock<mutex> gen_input_lock(input_mutex_);
-                               gen_input_cond_.wait(gen_input_lock);
-                       }
                }
 
+               if (!gen_interrupt_ && (samples_to_process == 0)) {
+                       // Wait for more input
+                       unique_lock<mutex> gen_input_lock(input_mutex_);
+                       gen_input_cond_.wait(gen_input_lock);
+               }
        } while (!gen_interrupt_);
 }
 
@@ -450,6 +462,8 @@ signal_data* MathSignal::signal_from_name(const std::string& name)
                                if (!sb->analog_data())
                                        continue;
 
+                               connect(sb->analog_data().get(), SIGNAL(samples_added(SharedPtrToSegment, uint64_t, uint64_t)),
+                                       this, SLOT(on_data_received()));
                                connect(sb->analog_data().get(), SIGNAL(segment_completed()),
                                        this, SLOT(on_data_received()));
 
@@ -479,7 +493,7 @@ void MathSignal::update_signal_sample(signal_data* sig_data, uint32_t segment_id
        sig_data->sample_num = sample_num;
        sig_data->sample_value = segment->get_sample(sample_num);
 
-       // We only have a reference if this signal is used as a scalar,
+       // We only have a reference if this signal is used as a scalar;
        // if it's used by a function, it's null
        if (sig_data->ref)
                *(sig_data->ref) = sig_data->sample_value;
@@ -490,14 +504,9 @@ void MathSignal::on_capture_state_changed(int state)
        if (state == Session::Running)
                begin_generation();
 
-       if (state == Session::Stopped) {
-               // If we have input signals, we use those as the indicators
-               if (input_signals_.empty()) {
-                       shared_ptr<Analog> analog = analog_data();
-                       if (!analog->analog_segments().empty())
-                               analog->analog_segments().back()->set_complete();
-               }
-       }
+       // Make sure we don't miss any input samples, just in case
+       if (state == Session::Stopped)
+               gen_input_cond_.notify_one();
 }
 
 void MathSignal::on_data_received()