// Receive notifications when new sample data is available
connect_input_notifiers();
- if (get_input_segment_count() == 0) {
+ if (get_input_segment_count() == 0)
set_error_message(tr("No input data"));
- return;
- }
// Make sure the logic output data is complete and up-to-date
logic_mux_interrupt_ = false;
qDebug().nospace() << name() << ": " << msg;
}
+bool DecodeSignal::all_input_segments_complete(uint32_t segment_id) const
+{
+ bool all_complete = true;
+
+ for (const decode::DecodeChannel& ch : channels_)
+ if (ch.assigned_signal) {
+ if (!ch.assigned_signal->logic_data())
+ continue;
+
+ const shared_ptr<Logic> logic_data = ch.assigned_signal->logic_data();
+ if (logic_data->logic_segments().empty())
+ return false;
+
+ if (segment_id >= logic_data->logic_segments().size())
+ return false;
+
+ const shared_ptr<LogicSegment> segment = logic_data->logic_segments()[segment_id];
+ if (!segment->is_complete())
+ all_complete = false;
+ }
+
+ return all_complete;
+}
+
uint32_t DecodeSignal::get_input_segment_count() const
{
uint64_t count = std::numeric_limits<uint64_t>::max();
void DecodeSignal::logic_mux_proc()
{
- uint32_t segment_id = 0;
+ uint32_t input_segment_count;
+ do {
+ input_segment_count = get_input_segment_count();
+ if (input_segment_count == 0) {
+ // Wait for input data
+ unique_lock<mutex> logic_mux_lock(logic_mux_mutex_);
+ logic_mux_cond_.wait(logic_mux_lock);
+ }
+ } while ((!logic_mux_interrupt_) && (input_segment_count == 0));
+
+ if (logic_mux_interrupt_)
+ return;
assert(logic_mux_data_);
+ uint32_t segment_id = 0;
+
// Create initial logic mux segment
shared_ptr<LogicSegment> output_segment =
make_shared<LogicSegment>(*logic_mux_data_, segment_id,
output_segment->set_samplerate(get_input_samplerate(0));
+ // Logic mux data is being updated
+ logic_mux_data_invalid_ = false;
+
+ uint64_t samples_to_process;
do {
- const uint64_t input_sample_count = get_working_sample_count(segment_id);
- const uint64_t output_sample_count = output_segment->get_sample_count();
+ do {
+ const uint64_t input_sample_count = get_working_sample_count(segment_id);
+ const uint64_t output_sample_count = output_segment->get_sample_count();
- const uint64_t samples_to_process =
- (input_sample_count > output_sample_count) ?
- (input_sample_count - output_sample_count) : 0;
+ samples_to_process =
+ (input_sample_count > output_sample_count) ?
+ (input_sample_count - output_sample_count) : 0;
- // Process the samples if necessary...
- if (samples_to_process > 0) {
- const uint64_t unit_size = output_segment->unit_size();
- const uint64_t chunk_sample_count = DecodeChunkLength / unit_size;
+ if (samples_to_process > 0) {
+ const uint64_t unit_size = output_segment->unit_size();
+ const uint64_t chunk_sample_count = DecodeChunkLength / unit_size;
- uint64_t processed_samples = 0;
- do {
- const uint64_t start_sample = output_sample_count + processed_samples;
- const uint64_t sample_count =
- min(samples_to_process - processed_samples, chunk_sample_count);
+ uint64_t processed_samples = 0;
+ do {
+ const uint64_t start_sample = output_sample_count + processed_samples;
+ const uint64_t sample_count =
+ min(samples_to_process - processed_samples, chunk_sample_count);
- mux_logic_samples(segment_id, start_sample, start_sample + sample_count);
- processed_samples += sample_count;
+ mux_logic_samples(segment_id, start_sample, start_sample + sample_count);
+ processed_samples += sample_count;
- // ...and process the newly muxed logic data
- decode_input_cond_.notify_one();
- } while (!logic_mux_interrupt_ && (processed_samples < samples_to_process));
- }
+ // ...and process the newly muxed logic data
+ decode_input_cond_.notify_one();
+ } while (!logic_mux_interrupt_ && (processed_samples < samples_to_process));
+ }
+ } while (!logic_mux_interrupt_ && (samples_to_process > 0));
- if (samples_to_process == 0) {
- // TODO Optimize this by caching the input segment count and only
- // querying it when the cached value was reached
- if (segment_id < get_input_segment_count() - 1) {
- // Process next segment
- segment_id++;
+ if (!logic_mux_interrupt_) {
+ // samples_to_process is now 0, we've exhausted the currently available input data
- output_segment =
- make_shared<LogicSegment>(*logic_mux_data_, segment_id,
- logic_mux_unit_size_, 0);
- logic_mux_data_->push_segment(output_segment);
+ // If the input segments are complete, we've completed this segment
+ if (all_input_segments_complete(segment_id)) {
+ if (!output_segment->is_complete())
+ output_segment->set_complete();
- output_segment->set_samplerate(get_input_samplerate(segment_id));
- } else {
- // All segments have been processed
- logic_mux_data_invalid_ = false;
+ if (segment_id < get_input_segment_count() - 1) {
+ // Process next segment
+ segment_id++;
+
+ output_segment =
+ make_shared<LogicSegment>(*logic_mux_data_, segment_id,
+ logic_mux_unit_size_, 0);
+ logic_mux_data_->push_segment(output_segment);
+ output_segment->set_samplerate(get_input_samplerate(segment_id));
+ }
+ } else {
// Wait for more input
unique_lock<mutex> logic_mux_lock(logic_mux_mutex_);
logic_mux_cond_.wait(logic_mux_lock);
}
}
-
} while (!logic_mux_interrupt_);
}
input_segment->get_samples(i, chunk_end, chunk);
if (srd_session_send(srd_session_, i, chunk_end, chunk,
- data_size, unit_size) != SRD_OK)
+ data_size, unit_size) != SRD_OK) {
set_error_message(tr("Decoder reported an error"));
+ decode_interrupt_ = true;
+ }
delete[] chunk;
current_segment_id_ = 0;
// If there is no input data available yet, wait until it is or we're interrupted
- if (logic_mux_data_->logic_segments().size() == 0) {
- unique_lock<mutex> input_wait_lock(input_mutex_);
- decode_input_cond_.wait(input_wait_lock);
- }
+ do {
+ if (logic_mux_data_->logic_segments().size() == 0) {
+ // Wait for input data
+ unique_lock<mutex> input_wait_lock(input_mutex_);
+ decode_input_cond_.wait(input_wait_lock);
+ }
+ } while ((!decode_interrupt_) && (logic_mux_data_->logic_segments().size() == 0));
if (decode_interrupt_)
return;
start_srd_session();
- uint64_t sample_count = 0;
+ uint64_t samples_to_process = 0;
uint64_t abs_start_samplenum = 0;
do {
// Keep processing new samples until we exhaust the input data
do {
- lock_guard<mutex> input_lock(input_mutex_);
- sample_count = input_segment->get_sample_count() - abs_start_samplenum;
+ samples_to_process = input_segment->get_sample_count() - abs_start_samplenum;
- if (sample_count > 0) {
- decode_data(abs_start_samplenum, sample_count, input_segment);
- abs_start_samplenum += sample_count;
+ if (samples_to_process > 0) {
+ decode_data(abs_start_samplenum, samples_to_process, input_segment);
+ abs_start_samplenum += samples_to_process;
}
- } while (error_message_.isEmpty() && (sample_count > 0) && !decode_interrupt_);
-
- if (error_message_.isEmpty() && !decode_interrupt_ && sample_count == 0) {
- if (current_segment_id_ < logic_mux_data_->logic_segments().size() - 1) {
- // Process next segment
- current_segment_id_++;
-
- try {
- input_segment = logic_mux_data_->logic_segments().at(current_segment_id_);
- } catch (out_of_range&) {
- qDebug() << "Decode error for" << name() << ": no logic mux segment" \
- << current_segment_id_ << "in decode_proc(), mux segments size is" \
- << logic_mux_data_->logic_segments().size();
- return;
+ } while (!decode_interrupt_ && (samples_to_process > 0));
+
+ if (!decode_interrupt_) {
+ // samples_to_process is now 0, we've exhausted the currently available input data
+
+ // If the input segment is complete, we've exhausted this segment
+ if (input_segment->is_complete()) {
+ if (current_segment_id_ < logic_mux_data_->logic_segments().size() - 1) {
+ // Process next segment
+ current_segment_id_++;
+
+ try {
+ input_segment = logic_mux_data_->logic_segments().at(current_segment_id_);
+ } catch (out_of_range&) {
+ qDebug() << "Decode error for" << name() << ": no logic mux segment" \
+ << current_segment_id_ << "in decode_proc(), mux segments size is" \
+ << logic_mux_data_->logic_segments().size();
+ decode_interrupt_ = true;
+ return;
+ }
+ abs_start_samplenum = 0;
+
+ // Create the next segment and set its metadata
+ create_decode_segment();
+ segments_.at(current_segment_id_).samplerate = input_segment->samplerate();
+ segments_.at(current_segment_id_).start_time = input_segment->start_time();
+
+ // Reset decoder state but keep the decoder stack intact
+ terminate_srd_session();
+ } else {
+ // All segments have been processed
+ decode_finished();
}
- abs_start_samplenum = 0;
-
- // Create the next segment and set its metadata
- create_decode_segment();
- segments_.at(current_segment_id_).samplerate = input_segment->samplerate();
- segments_.at(current_segment_id_).start_time = input_segment->start_time();
-
- // Reset decoder state but keep the decoder stack intact
- terminate_srd_session();
} else {
- // All segments have been processed
- decode_finished();
-
- // Wait for new input data or an interrupt was requested
+ // Wait for more input data
unique_lock<mutex> input_wait_lock(input_mutex_);
decode_input_cond_.wait(input_wait_lock);
}
}
- } while (error_message_.isEmpty() && !decode_interrupt_);
+ } while (!decode_interrupt_);
// Potentially reap decoders when the application no longer is
// interested in their (pending) results.
continue;
const data::SignalBase *signal = ch.assigned_signal;
- connect(signal, SIGNAL(samples_cleared()),
- this, SLOT(on_data_cleared()));
- connect(signal, SIGNAL(samples_added(uint64_t, uint64_t, uint64_t)),
- this, SLOT(on_data_received()));
+ connect(signal, &data::SignalBase::samples_cleared,
+ this, &DecodeSignal::on_data_cleared);
+ connect(signal, &data::SignalBase::samples_added,
+ this, &DecodeSignal::on_data_received);
}
}
connect(&session_, SIGNAL(capture_state_changed(int)),
this, SLOT(on_capture_state_changed(int)));
- connect(&session_, SIGNAL(data_received()),
- this, SLOT(on_data_received()));
}
MathSignal::~MathSignal()
return result;
}
-void MathSignal::update_completeness(uint32_t segment_id)
+void MathSignal::update_completeness(uint32_t segment_id, uint64_t output_sample_count)
{
bool output_complete = true;
}
const shared_ptr<AnalogSegment> segment = analog_segments.at(segment_id);
- if (!segment->is_complete())
+ if (!segment->is_complete()) {
+ output_complete = false;
+ continue;
+ }
+
+ if (output_sample_count < segment->get_sample_count())
output_complete = false;
}
+ } else {
+ // We're done when we generated as many samples as the stopped session is long
+ if ((session_.get_capture_state() != Session::Stopped) ||
+ (output_sample_count < session_.get_segment_sample_count(segment_id)))
+ output_complete = false;
}
if (output_complete)
return;
}
+ disconnect(this, SLOT(on_data_received()));
+
fnc_sig_sample_ = new sig_sample<double>(*this);
exprtk_unknown_symbol_table_ = new exprtk::symbol_table<double>();
}
if (error_message_.isEmpty()) {
+ // Connect to the session data notification if we have no input signals
+ if (input_signals_.empty())
+ connect(&session_, SIGNAL(data_received()),
+ this, SLOT(on_data_received()));
+
gen_interrupt_ = false;
gen_thread_ = std::thread(&MathSignal::generation_proc, this);
}
void MathSignal::generation_proc()
{
- uint32_t segment_id = 0;
-
// Don't do anything until we have a valid sample rate
do {
if (use_custom_sample_rate_)
if (gen_interrupt_)
return;
+ uint32_t segment_id = 0;
shared_ptr<Analog> analog = analog_data();
// Create initial analog segment
processed_samples += sample_count;
// Notify consumers of this signal's data
- // TODO Does this work when a conversion is active?
samples_added(segment_id, start_sample, start_sample + processed_samples);
} while (!gen_interrupt_ && (processed_samples < samples_to_process));
}
- if (samples_to_process == 0) {
- update_completeness(segment_id);
+ update_completeness(segment_id, output_sample_count);
- if (segment_id < session_.get_highest_segment_id()) {
+ if (output_segment->is_complete() && (segment_id < session_.get_highest_segment_id())) {
// Process next segment
segment_id++;
output_segment =
make_shared<AnalogSegment>(*analog.get(), segment_id, analog->get_samplerate());
analog->push_segment(output_segment);
- } else {
- // All segments have been processed, wait for more input
- unique_lock<mutex> gen_input_lock(input_mutex_);
- gen_input_cond_.wait(gen_input_lock);
- }
}
+ if (!gen_interrupt_ && (samples_to_process == 0)) {
+ // Wait for more input
+ unique_lock<mutex> gen_input_lock(input_mutex_);
+ gen_input_cond_.wait(gen_input_lock);
+ }
} while (!gen_interrupt_);
}
if (!sb->analog_data())
continue;
+ connect(sb->analog_data().get(), SIGNAL(samples_added(SharedPtrToSegment, uint64_t, uint64_t)),
+ this, SLOT(on_data_received()));
connect(sb->analog_data().get(), SIGNAL(segment_completed()),
this, SLOT(on_data_received()));
sig_data->sample_num = sample_num;
sig_data->sample_value = segment->get_sample(sample_num);
- // We only have a reference if this signal is used as a scalar,
+ // We only have a reference if this signal is used as a scalar;
// if it's used by a function, it's null
if (sig_data->ref)
*(sig_data->ref) = sig_data->sample_value;
if (state == Session::Running)
begin_generation();
- if (state == Session::Stopped) {
- // If we have input signals, we use those as the indicators
- if (input_signals_.empty()) {
- shared_ptr<Analog> analog = analog_data();
- if (!analog->analog_segments().empty())
- analog->analog_segments().back()->set_complete();
- }
- }
+ // Make sure we don't miss any input samples, just in case
+ if (state == Session::Stopped)
+ gen_input_cond_.notify_one();
}
void MathSignal::on_data_received()