#include <libsigrokcxx/libsigrokcxx.hpp>
+#ifdef ENABLE_FLOW
+#include <gstreamermm.h>
+#include <libsigrokflow/libsigrokflow.hpp>
+#endif
+
#ifdef ENABLE_DECODE
#include <libsigrokdecode/libsigrokdecode.h>
#include "data/decodesignal.hpp"
using std::runtime_error;
using std::shared_ptr;
using std::string;
+#ifdef ENABLE_FLOW
+using std::unique_lock;
+#endif
using std::unique_ptr;
using std::unordered_set;
using std::vector;
using Glib::VariantBase;
+#ifdef ENABLE_FLOW
+using Gst::Bus;
+using Gst::ElementFactory;
+using Gst::Pipeline;
+#endif
+
namespace pv {
shared_ptr<sigrok::Context> Session::sr_context;
{
assert(error_handler);
+#ifdef ENABLE_FLOW
+ pipeline_ = Pipeline::create();
+
+ source_ = ElementFactory::create_element("filesrc", "source");
+ sink_ = RefPtr<AppSink>::cast_dynamic(ElementFactory::create_element("appsink", "sink"));
+
+ pipeline_->add(source_)->add(sink_);
+ source_->link(sink_);
+
+ source_->set_property("location", Glib::ustring("/tmp/dummy_binary"));
+
+ sink_->set_property("emit-signals", TRUE);
+ sink_->signal_new_sample().connect(sigc::mem_fun(*this, &Session::on_gst_new_sample));
+
+ // Get the bus from the pipeline and add a bus watch to the default main context
+ RefPtr<Bus> bus = pipeline_->get_bus();
+ bus->add_watch(sigc::mem_fun(this, &Session::on_gst_bus_message));
+
+ // Start pipeline and Wait until it finished processing
+ pipeline_done_interrupt_ = false;
+ pipeline_->set_state(Gst::STATE_PLAYING);
+
+ unique_lock<mutex> pipeline_done_lock_(pipeline_done_mutex_);
+ pipeline_done_cond_.wait(pipeline_done_lock_);
+
+ // Let the pipeline free all resources
+ pipeline_->set_state(Gst::STATE_NULL);
+
+#else
if (!device_)
return;
// Confirm that SR_DF_END was received
if (cur_logic_segment_)
qDebug() << "WARNING: SR_DF_END was not received.";
+#endif
// Optimize memory usage
free_unused_memory();
segment_completed(segment_id);
}
+#ifdef ENABLE_FLOW
+bool Session::on_gst_bus_message(const Glib::RefPtr<Gst::Bus>& bus, const Glib::RefPtr<Gst::Message>& message)
+{
+ (void)bus;
+
+ if ((message->get_source() == pipeline_) && \
+ ((message->get_message_type() == Gst::MESSAGE_EOS)))
+ pipeline_done_cond_.notify_one();
+
+ // TODO Also evaluate MESSAGE_STREAM_STATUS to receive error notifications
+
+ return true;
+}
+
+Gst::FlowReturn Session::on_gst_new_sample()
+{
+ RefPtr<Gst::Sample> sample = sink_->pull_sample();
+ RefPtr<Gst::Buffer> buf = sample->get_buffer();
+
+ for (uint32_t block_id = 0; block_id < buf->n_memory(); block_id++) {
+ RefPtr<Gst::Memory> buf_mem = buf->get_memory(block_id);
+ Gst::MapInfo mapinfo;
+ buf_mem->map(mapinfo, Gst::MAP_READ);
+
+ shared_ptr<sigrok::Packet> logic_packet =
+ sr_context->create_logic_packet(mapinfo.get_data(), buf->get_size(), 1);
+
+ try {
+ feed_in_logic(dynamic_pointer_cast<sigrok::Logic>(logic_packet->payload()));
+ } catch (bad_alloc&) {
+ out_of_memory_ = true;
+ device_->stop();
+ buf_mem->unmap(mapinfo);
+ return Gst::FLOW_ERROR;
+ }
+
+ buf_mem->unmap(mapinfo);
+ }
+
+ return Gst::FLOW_OK;
+}
+#endif
+
void Session::feed_in_header()
{
// Nothing to do here for now
#ifndef PULSEVIEW_PV_SESSION_HPP
#define PULSEVIEW_PV_SESSION_HPP
+#ifdef ENABLE_FLOW
+#include <atomic>
+#include <condition_variable>
+#endif
+
#include <functional>
#include <map>
#include <memory>
#include <QSettings>
#include <QString>
+#ifdef ENABLE_FLOW
+#include <gstreamermm.h>
+#include <libsigrokflow/libsigrokflow.hpp>
+#endif
+
#include "util.hpp"
#include "views/viewbase.hpp"
+
using std::function;
using std::list;
using std::map;
using std::string;
using std::unordered_set;
+#ifdef ENABLE_FLOW
+using Glib::RefPtr;
+using Gst::AppSink;
+using Gst::Element;
+using Gst::Pipeline;
+#endif
+
struct srd_decoder;
struct srd_channel;
void signal_new_segment();
void signal_segment_completed();
+#ifdef ENABLE_FLOW
+ bool on_gst_bus_message(const Glib::RefPtr<Gst::Bus>& bus, const Glib::RefPtr<Gst::Message>& message);
+
+ Gst::FlowReturn on_gst_new_sample();
+#endif
+
void feed_in_header();
void feed_in_meta(shared_ptr<sigrok::Meta> meta);
bool out_of_memory_;
bool data_saved_;
bool frame_began_;
+
+#ifdef ENABLE_FLOW
+ RefPtr<Pipeline> pipeline_;
+ RefPtr<Element> source_;
+ RefPtr<AppSink> sink_;
+
+ mutable mutex pipeline_done_mutex_;
+ mutable condition_variable pipeline_done_cond_;
+ atomic<bool> pipeline_done_interrupt_;
+#endif
};
} // namespace pv