asix-omega-rtm-cli: Implement RTM for ASIX OMEGA via external CLI process
authorGerhard Sittig <gerhard.sittig@gmx.net>
Mon, 11 Oct 2021 20:09:44 +0000 (22:09 +0200)
committerGerhard Sittig <gerhard.sittig@gmx.net>
Wed, 13 Oct 2021 16:14:17 +0000 (18:14 +0200)
The ASIX OMEGA vendor software provides a commandline application which
puts the device in "real time mode". The process' stdout provides a
continuous RLE compressed stream of samples for the 16 input signals,
recorded at 200MHz. The sigrok driver starts and terminates this process
for the duration of the acquisition.

This simple approach makes the OMEGA device available in a basic mode of
operation. The samplerate is fixed, hardware triggers are not available.
The binary data format is used to reduce the amount of inter process
communication. The vendor's Windows software also executes in Linux
under wine(1). All device detection, USB communication via FTDI FIFO
mode, firmware download, etc are transparently dealt with. This mode of
operation is an officially supported and documented feature.

The sigrok driver accepts the OMEGARTMCLI environment variable as the
specification of the vendor application's location, or falls back to the
omegartmcli executable name which should be in PATH. The conn= spec in
the sn= format can select one out of multiple connected devices.

The driver was tested on Linux with --samples and --time as well as with
Pulseview and manual acquisition stop while a huge limit was configured
that would not take effect.

src/hardware/asix-omega-rtm-cli/api.c
src/hardware/asix-omega-rtm-cli/protocol.c
src/hardware/asix-omega-rtm-cli/protocol.h

index 79d887bf47da596f01d00df9eb7686b3fef46d0c..71e922543f1a4c453233f6b9cc3d78cfc2342f45 100644 (file)
  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
  */
 
+/*
+ * This sigrok driver implementation uses the vendor's CLI application
+ * for the ASIX OMEGA to operate the device in real time mode. The
+ * external process handles the device detection, USB communication
+ * (FTDI FIFO), FPGA netlist download, and device control. The process'
+ * stdout provides a continuous RLE compressed stream of 16bit samples
+ * taken at 200MHz.
+ *
+ * Known limitations: The samplerate is fixed. Hardware triggers are not
+ * available in this mode. The start of the acquisition takes a few
+ * seconds, but the device's native protocol is unknown and its firmware
+ * is unavailable, so that a native sigrok driver is in some distant
+ * future. Users need to initiate the acquisition in sigrok early so
+ * that the device is capturing when the event of interest happens.
+ *
+ * The vendor application's executable either must be named omegartmcli
+ * and must be found in PATH, or the OMEGARTMCLI environment variable
+ * must contain its location. A scan option could be used when a
+ * suitable SR_CONF key gets identified which communicates executable
+ * locations.
+ *
+ * When multiple devices are connected, then a conn=sn=... specification
+ * can select one of the devices. The serial number should contain six
+ * or eight hex digits (this follows the vendor's approach for the CLI
+ * application).
+ */
+
+/*
+ * Implementor's notes. Examples of program output which gets parsed by
+ * this sigrok driver.
+ *
+ *   $ ./omegartmcli.exe -version
+ *   omegartmcli.exe Omega Real-Time Mode
+ *   Version 2016-12-14
+ *   Copyright (c) 1991-2016 ASIX s.r.o.
+ *   Email: support@asix.net
+ *
+ *   $ ./omegartmcli.exe -bin [-serial SERNO] <NULL>
+ *   (five command line words including the terminator)
+ *
+ * The RTM CLI application terminates when its stdin closes, or when
+ * CTRL-C is pressed. The former is more portable across platforms. The
+ * stderr output should get ignored, it's rather noisy here under wine,
+ * communicates non-fatal diagnostics, and may communicate "progress"
+ * which we don't care about.
+ *
+ * Ideally the external process could get started earlier, and gets
+ * re-used across several sigrok acquisition activities. Unfortunately
+ * the driver's open/close actions lack a sigrok session, and cannot
+ * register the receive callback (or needs to duplicate common support
+ * code). When such an approach gets implemented, the external process'
+ * output must get drained even outside of sigrok acquisition phases,
+ * the cost of which is yet to get determined (depends on the input
+ * signals, may be expensive).
+ *
+ * The binary data format is used to reduce the amount of inter process
+ * communication. The format is rather simple: Three 16bit items (LE
+ * format) carry a timestamp (10ns resolution), and two 16bit samples
+ * (taken at 5ns intervals). The timestamp may translate to a repetition
+ * of the last sample a given number of times (RLE compression of idle
+ * phases where inputs don't change). The first timestamp after program
+ * startup is to get ignored. Chunks are sent after at most 32Ki 10ns
+ * ticks, to not overflow the 16bit counter. Which translates to a data
+ * volume of 6 bytes each 328us for idle inputs, higher for changing
+ * input signals.
+ *
+ * Is it useful to implement a set of samplerates in the sigrok driver,
+ * and downsample the data which is provided by the Asix application?
+ * This would not avoid the pressure of receiving the acquisition
+ * process' output, but may result in reduced cost on the sigrok side
+ * when users want to inspect slow signals, or export to "expensive"
+ * file formats.
+ *
+ * This driver implementation may benefit from software trigger support.
+ */
+
 #include <config.h>
-#include "protocol.h"
 
-static struct sr_dev_driver asix_omega_rtm_cli_driver_info;
+#include <stdlib.h>
+#include <string.h>
 
-static GSList *scan(struct sr_dev_driver *di, GSList *options)
-{
-       struct drv_context *drvc;
-       GSList *devices;
+#include "protocol.h"
 
-       (void)options;
+static const char *channel_names[] = {
+       "1", "2", "3", "4", "5", "6", "7", "8",
+       "9", "10", "11", "12", "13", "14", "15", "16",
+};
 
-       devices = NULL;
-       drvc = di->context;
-       drvc->instances = NULL;
+static const uint64_t samplerates[] = {
+       SR_MHZ(200),
+};
 
-       /* TODO: scan for devices, either based on a SR_CONF_CONN option
-        * or on a USB scan. */
+static const uint32_t scanopts[] = {
+       SR_CONF_CONN, /* Accepts serial number specs. */
+};
 
-       return devices;
-}
+static const uint32_t drvopts[] = {
+       SR_CONF_LOGIC_ANALYZER,
+};
+
+static const uint32_t devopts[] = {
+       SR_CONF_LIMIT_MSEC | SR_CONF_GET | SR_CONF_SET,
+       SR_CONF_LIMIT_SAMPLES | SR_CONF_GET | SR_CONF_SET,
+       SR_CONF_CONN | SR_CONF_GET,
+       SR_CONF_SAMPLERATE | SR_CONF_GET | SR_CONF_LIST,
+};
 
-static int dev_open(struct sr_dev_inst *sdi)
+static GSList *scan(struct sr_dev_driver *di, GSList *options)
 {
-       (void)sdi;
+       const char *conn, *serno, *exe;
+       GSList *devices;
+       size_t argc, chidx;
+       gchar **argv, *output, *vers_text, *eol;
+       GSpawnFlags flags;
+       GError *error;
+       gboolean ok;
+       char serno_buff[10];
+       struct sr_dev_inst *sdi;
+       struct dev_context *devc;
+
+       /* Extract optional serial number from conn= spec. */
+       conn = NULL;
+       (void)sr_serial_extract_options(options, &conn, NULL);
+       if (!conn || !*conn)
+               conn = NULL;
+       serno = NULL;
+       if (conn) {
+               if (!g_str_has_prefix(conn, "sn=")) {
+                       sr_err("conn= must specify a serial number.");
+                       return NULL;
+               }
+               serno = conn + strlen("sn=");
+               if (!*serno)
+                       serno = NULL;
+       }
+       if (serno)
+               sr_dbg("User specified serial number: %s", serno);
+       if (serno && strlen(serno) == 4) {
+               sr_dbg("Adding 03 prefix to user specified serial number");
+               snprintf(serno_buff, sizeof(serno_buff), "03%s", serno);
+               serno = serno_buff;
+       }
+       if (serno && strlen(serno) != 6 && strlen(serno) != 8) {
+               sr_err("Serial number must be 03xxxx or A603xxxx");
+               serno = NULL;
+       }
 
-       /* TODO: get handle from sdi->conn and open it. */
+       devices = NULL;
 
-       return SR_OK;
-}
+       /*
+        * Check availability of the external executable. Notice that
+        * failure is non-fatal, the scan can take place even when users
+        * don't request and don't expect to use Asix Omega devices.
+        */
+       exe = getenv("OMEGARTMCLI");
+       if (!exe || !*exe)
+               exe = "omegartmcli";
+       sr_dbg("Vendor application executable: %s", exe);
+       argv = g_malloc0(5 * sizeof(argv[0]));
+       argc = 0;
+       argv[argc++] = g_strdup(exe);
+       argv[argc++] = g_strdup("-version");
+       argv[argc++] = NULL;
+       flags = G_SPAWN_SEARCH_PATH | G_SPAWN_STDERR_TO_DEV_NULL;
+       output = NULL;
+       error = NULL;
+       ok = g_spawn_sync(NULL, argv, NULL, flags, NULL, NULL,
+               &output, NULL, NULL, &error);
+       if (error && error->code != G_SPAWN_ERROR_NOENT)
+               sr_err("Cannot execute RTM CLI process: %s", error->message);
+       if (error) {
+               ok = FALSE;
+               g_error_free(error);
+       }
+       if (!output || !*output)
+               ok = FALSE;
+       if (!ok) {
+               sr_dbg("External RTM CLI execution failed.");
+               g_free(output);
+               g_strfreev(argv);
+               return NULL;
+       }
 
-static int dev_close(struct sr_dev_inst *sdi)
-{
-       (void)sdi;
+       /*
+        * Get the executable's version from second stdout line. This
+        * only executes when the executable is found, failure to get
+        * the version information is considered fatal.
+        */
+       vers_text = strstr(output, "Version ");
+       if (!vers_text)
+               ok = FALSE;
+       if (ok) {
+               vers_text += strlen("Version ");
+               eol = strchr(vers_text, '\n');
+               if (eol)
+                       *eol = '\0';
+               eol = strchr(vers_text, '\r');
+               if (eol)
+                       *eol = '\0';
+               if (!vers_text || !*vers_text)
+                       ok = FALSE;
+       }
+       if (!ok) {
+               sr_err("Cannot get RTM CLI executable's version.");
+               g_free(output);
+               g_strfreev(argv);
+               return NULL;
+       }
+       sr_info("RTM CLI executable version: %s", vers_text);
+
+       /*
+        * Create a device instance, add it to the result set. Create a
+        * device context. Change the -version command into the command
+        * for acquisition for later use in the driver's lifetime.
+        */
+       sdi = g_malloc0(sizeof(*sdi));
+       devices = g_slist_append(devices, sdi);
+       sdi->status = SR_ST_INITIALIZING;
+       sdi->vendor = g_strdup("ASIX");
+       sdi->model = g_strdup("OMEGA RTM CLI");
+       sdi->version = g_strdup(vers_text);
+       if (serno)
+               sdi->serial_num = g_strdup(serno);
+       if (conn)
+               sdi->connection_id = g_strdup(conn);
+       for (chidx = 0; chidx < ARRAY_SIZE(channel_names); chidx++) {
+               sr_channel_new(sdi, chidx, SR_CHANNEL_LOGIC,
+                       TRUE, channel_names[chidx]);
+       }
 
-       /* TODO: get handle from sdi->conn and close it. */
+       devc = g_malloc0(sizeof(*devc));
+       sdi->priv = devc;
+       sr_sw_limits_init(&devc->limits);
+       argc = 1;
+       g_free(argv[argc]);
+       argv[argc++] = g_strdup("-bin");
+       if (serno) {
+               argv[argc++] = g_strdup("-serial");
+               argv[argc++] = g_strdup(serno);
+       }
+       argv[argc++] = NULL;
+       devc->child.argv = argv;
+       devc->child.flags = flags | G_SPAWN_CLOEXEC_PIPES;
+       devc->child.fd_stdin_write = -1;
+       devc->child.fd_stdout_read = -1;
 
-       return SR_OK;
+       return std_scan_complete(di, devices);
 }
 
 static int config_get(uint32_t key, GVariant **data,
        const struct sr_dev_inst *sdi, const struct sr_channel_group *cg)
 {
-       int ret;
+       struct dev_context *devc;
 
-       (void)sdi;
-       (void)data;
        (void)cg;
 
-       ret = SR_OK;
+       if (!sdi)
+               return SR_ERR_ARG;
+       devc = sdi->priv;
+
        switch (key) {
-       /* TODO */
+       case SR_CONF_CONN:
+               if (!sdi->connection_id)
+                       return SR_ERR_NA;
+               *data = g_variant_new_string(sdi->connection_id);
+               break;
+       case SR_CONF_SAMPLERATE:
+               *data = g_variant_new_uint64(samplerates[0]);
+               break;
+       case SR_CONF_LIMIT_MSEC:
+       case SR_CONF_LIMIT_SAMPLES:
+               return sr_sw_limits_config_get(&devc->limits, key, data);
        default:
                return SR_ERR_NA;
        }
 
-       return ret;
+       return SR_OK;
 }
 
 static int config_set(uint32_t key, GVariant *data,
        const struct sr_dev_inst *sdi, const struct sr_channel_group *cg)
 {
-       int ret;
+       struct dev_context *devc;
 
-       (void)sdi;
-       (void)data;
        (void)cg;
 
-       ret = SR_OK;
+       if (!sdi)
+               return SR_ERR_ARG;
+       devc = sdi->priv;
+
        switch (key) {
-       /* TODO */
+       case SR_CONF_LIMIT_MSEC:
+       case SR_CONF_LIMIT_SAMPLES:
+               return sr_sw_limits_config_set(&devc->limits, key, data);
        default:
-               ret = SR_ERR_NA;
+               return SR_ERR_NA;
        }
 
-       return ret;
+       return SR_OK;
 }
 
 static int config_list(uint32_t key, GVariant **data,
        const struct sr_dev_inst *sdi, const struct sr_channel_group *cg)
 {
-       int ret;
-
-       (void)sdi;
-       (void)data;
-       (void)cg;
 
-       ret = SR_OK;
        switch (key) {
-       /* TODO */
+       case SR_CONF_SCAN_OPTIONS:
+       case SR_CONF_DEVICE_OPTIONS:
+               if (cg)
+                       return SR_ERR_NA;
+               return STD_CONFIG_LIST(key, data, sdi, cg,
+                       scanopts, drvopts, devopts);
+       case SR_CONF_SAMPLERATE:
+               *data = std_gvar_samplerates(ARRAY_AND_SIZE(samplerates));
+               break;
        default:
                return SR_ERR_NA;
        }
 
-       return ret;
+       return SR_OK;
 }
 
 static int dev_acquisition_start(const struct sr_dev_inst *sdi)
 {
-       /* TODO: configure hardware, reset acquisition state, set up
-        * callbacks and send header packet. */
+       struct dev_context *devc;
+       int ret;
+       int fd, events;
+       uint64_t remain_count;
+
+       devc = sdi->priv;
+
+       /* Start the external acquisition process. */
+       ret = omega_rtm_cli_open(sdi);
+       if (ret != SR_OK)
+               return ret;
+       fd = devc->child.fd_stdout_read;
+       events = G_IO_IN | G_IO_ERR;
+
+       /*
+        * Start supervising acquisition limits. Arrange for a stricter
+        * "samples count" check than supported by the common approach.
+        */
+       sr_sw_limits_acquisition_start(&devc->limits);
+       ret = sr_sw_limits_get_remain(&devc->limits,
+               &remain_count, NULL, NULL, NULL);
+       if (ret != SR_OK)
+               return ret;
+       if (remain_count) {
+               devc->samples.remain_count = remain_count;
+               devc->samples.check_count = TRUE;
+       }
 
-       (void)sdi;
+       /* Send the session feed header. */
+       ret = std_session_send_df_header(sdi);
+       if (ret != SR_OK)
+               return ret;
+
+       /* Start processing the external process' output. */
+       ret = sr_session_source_add(sdi->session, fd, events, 10,
+               omega_rtm_cli_receive_data, (void *)sdi); /* Un-const. */
+       if (ret != SR_OK)
+               return ret;
 
        return SR_OK;
 }
 
 static int dev_acquisition_stop(struct sr_dev_inst *sdi)
 {
-       /* TODO: stop acquisition. */
+       struct dev_context *devc;
+       int ret;
+       int fd;
+
+       devc = sdi->priv;
+
+       /*
+        * Implementor's note: Do run all stop activities even if
+        * some of them may fail. Emit diagnostics messages as errors
+        * are seen, but don't return early.
+        */
+
+       /* Stop processing the external process' output. */
+       fd = devc->child.fd_stdout_read;
+       if (fd >= 0) {
+               ret = sr_session_source_remove(sdi->session, fd);
+               if (ret != SR_OK) {
+                       sr_err("Cannot stop reading acquisition data");
+               }
+       }
+
+       ret = std_session_send_df_end(sdi);
+       (void)ret;
 
-       (void)sdi;
+       ret = omega_rtm_cli_close(sdi);
+       if (ret != SR_OK) {
+               sr_err("Could not terminate acquisition process");
+       }
+       (void)ret;
 
        return SR_OK;
 }
@@ -145,8 +430,8 @@ static struct sr_dev_driver asix_omega_rtm_cli_driver_info = {
        .config_get = config_get,
        .config_set = config_set,
        .config_list = config_list,
-       .dev_open = dev_open,
-       .dev_close = dev_close,
+       .dev_open = std_dummy_dev_open,
+       .dev_close = std_dummy_dev_close,
        .dev_acquisition_start = dev_acquisition_start,
        .dev_acquisition_stop = dev_acquisition_stop,
        .context = NULL,
index aa9ed8101309b84c3cb7a2fec4c39c1f8de2b40c..e72036bfb32338e32a02955736613a089459f40b 100644 (file)
  */
 
 #include <config.h>
+
+#include <string.h>
+#include <unistd.h>
+
 #include "protocol.h"
 
-SR_PRIV int asix_omega_rtm_cli_receive_data(int fd, int revents, void *cb_data)
+/*
+ * Start the external acquisition process (vendor's CLI application).
+ * Get the initial response to verify its operation.
+ */
+SR_PRIV int omega_rtm_cli_open(const struct sr_dev_inst *sdi)
 {
-       const struct sr_dev_inst *sdi;
        struct dev_context *devc;
+       gboolean ok;
+       gchar **argv;
+       GSpawnFlags flags;
+       GPid pid;
+       gint fd_in, fd_out;
+       GError *error;
+       GString *txt;
+       ssize_t rcvd;
+       uint8_t rsp[3 * sizeof(uint16_t)];
+       const uint8_t *rdptr;
+       uint16_t stamp, sample1, sample2;
 
-       (void)fd;
+       if (!sdi)
+               return SR_ERR_ARG;
+       devc = sdi->priv;
+       if (!devc)
+               return SR_ERR_ARG;
 
-       if (!(sdi = cb_data))
-               return TRUE;
+       if (devc->child.running) {
+               sr_err("Vendor application already running.");
+               return SR_ERR_BUG;
+       }
+
+       /* Prepare to feed sample data to the session. */
+       memset(&devc->rawdata, 0, sizeof(devc->rawdata));
+       memset(&devc->samples, 0, sizeof(devc->samples));
+       devc->samples.queue = feed_queue_logic_alloc(sdi,
+               FEED_QUEUE_DEPTH, sizeof(devc->samples.last_sample));
+
+       /*
+        * Start the background process. May take considerable time
+        * before actual acquisition starts.
+        */
+       sr_dbg("Starting vendor application");
+       argv = devc->child.argv;
+       flags = devc->child.flags;
+       error = NULL;
+       ok = g_spawn_async_with_pipes(NULL, argv, NULL, flags, NULL, NULL,
+               &pid, &fd_in, &fd_out, NULL, &error);
+       if (error) {
+               sr_err("Cannot execute RTM CLI process: %s", error->message);
+               g_error_free(error);
+               ok = FALSE;
+       }
+       if (fd_in < 0 || fd_out < 0)
+               ok = FALSE;
+       if (!ok) {
+               sr_err("Vendor application start failed.");
+               return SR_ERR_IO;
+       }
+       devc->child.pid = pid;
+       devc->child.fd_stdin_write = fd_in;
+       devc->child.fd_stdout_read = fd_out;
+       devc->child.running = TRUE;
+       sr_dbg("Started vendor application, in %d, out %d", fd_in, fd_out);
+       txt = sr_hexdump_new((const uint8_t *)&pid, sizeof(pid));
+       sr_dbg("Vendor application PID (OS dependent): %s", txt->str);
+       sr_hexdump_free(txt);
+
+       /*
+        * Get the initial response. Verifies its operation, and only
+        * returns with success when acquisition became operational.
+        */
+       rcvd = read(fd_out, rsp, sizeof(rsp));
+       sr_dbg("Read from vendor application, ret %zd", rcvd);
+       if (rcvd < 0)
+               ok = FALSE;
+       if (ok && (size_t)rcvd != sizeof(rsp))
+               ok = FALSE;
+       if (!ok) {
+               omega_rtm_cli_close(sdi);
+               return SR_ERR_IO;
+       }
+
+       /*
+        * Ignore the first timestamp. Grab the most recent sample data
+        * to feed the session from it upon later repetition.
+        */
+       rdptr = rsp;
+       stamp = read_u16le_inc(&rdptr);
+       sample1 = read_u16le_inc(&rdptr);
+       sample2 = read_u16le_inc(&rdptr);
+       sr_dbg("stamp %u, samples %x %x", stamp, sample1, sample2);
+       write_u16le(devc->samples.last_sample, sample2);
+
+       return SR_OK;
+}
+
+/*
+ * Terminate the external acquisition process (vendor's CLI application).
+ */
+SR_PRIV int omega_rtm_cli_close(const struct sr_dev_inst *sdi)
+{
+       struct dev_context *devc;
+
+       if (!sdi)
+               return SR_ERR_ARG;
+       devc = sdi->priv;
+       if (!devc)
+               return SR_ERR_ARG;
+
+       /* Close the external process' stdin. Discard its stdout. */
+       sr_dbg("Closing vendor application file descriptors.");
+       if (devc->child.fd_stdin_write >= 0) {
+               sr_dbg("Closing vendor application stdin descriptor.");
+               close(devc->child.fd_stdin_write);
+               devc->child.fd_stdin_write = -1;
+       }
+       if (devc->child.fd_stdout_read >= 0) {
+               sr_dbg("Closing vendor application stdout descriptor.");
+               close(devc->child.fd_stdout_read);
+               devc->child.fd_stdout_read = -1;
+       }
+
+       /* Terminate the external process. */
+       if (devc->child.running) {
+               sr_dbg("Closing vendor application process.");
+               (void)g_spawn_close_pid(devc->child.pid);
+               memset(&devc->child.pid, 0, sizeof(devc->child.pid));
+               devc->child.running = FALSE;
+       }
+
+       /* Release the session feed queue. */
+       if (devc->samples.queue) {
+               feed_queue_logic_free(devc->samples.queue);
+               devc->samples.queue = NULL;
+       }
+
+       sr_dbg("Done closing vendor application.");
 
-       if (!(devc = sdi->priv))
+       return SR_OK;
+}
+
+/*
+ * Process received sample data, which comes in 6-byte chunks.
+ * Uncompress the RLE stream. Strictly enforce user specified sample
+ * count limits in the process, cap the submission when an uncompressed
+ * chunk would exceed the limit.
+ */
+static int omega_rtm_cli_process_rawdata(const struct sr_dev_inst *sdi)
+{
+       static const size_t chunk_size = 3 * sizeof(uint16_t);
+
+       struct dev_context *devc;
+       const uint8_t *rdptr;
+       size_t avail, taken, count;
+       uint16_t stamp, sample1, sample2;
+       int ret;
+
+       devc = sdi->priv;
+       rdptr = &devc->rawdata.buff[0];
+       avail = devc->rawdata.fill;
+       taken = 0;
+       ret = SR_OK;
+
+       /* Cope with previous errors, silently discard RX data. */
+       if (!devc->samples.queue)
+               ret = SR_ERR_DATA;
+
+       /* Process those chunks whose reception has completed. */
+       while (ret == SR_OK && avail >= chunk_size) {
+               stamp = read_u16le_inc(&rdptr);
+               sample1 = read_u16le_inc(&rdptr);
+               sample2 = read_u16le_inc(&rdptr);
+               avail -= chunk_size;
+               taken += chunk_size;
+
+               /*
+                * Uncompress the RLE stream by repeating the last
+                * sample value when necessary. Notice that the stamp
+                * has a resolution of 10ns and thus covers two times
+                * the number of samples, these are taken each 5ns (at
+                * 200MHz rate). A stamp value of 1 is immediately
+                * adjacent to the last chunk.
+                */
+               if (stamp)
+                       stamp--;
+               count = stamp * 2;
+               if (devc->samples.check_count) {
+                       if (count > devc->samples.remain_count)
+                               count = devc->samples.remain_count;
+                       devc->samples.remain_count -= count;
+               }
+               if (count) {
+                       ret = feed_queue_logic_submit(devc->samples.queue,
+                               devc->samples.last_sample, count);
+                       if (ret != SR_OK)
+                               break;
+                       sr_sw_limits_update_samples_read(&devc->limits, count);
+               }
+               if (devc->samples.check_count && !devc->samples.remain_count)
+                       break;
+
+               /*
+                * Also send the current samples. Keep the last value at
+                * hand because future chunks might repeat it.
+                */
+               write_u16le(devc->samples.last_sample, sample1);
+               ret = feed_queue_logic_submit(devc->samples.queue,
+                       devc->samples.last_sample, 1);
+               if (ret != SR_OK)
+                       break;
+
+               write_u16le(devc->samples.last_sample, sample2);
+               ret = feed_queue_logic_submit(devc->samples.queue,
+                       devc->samples.last_sample, 1);
+               if (ret != SR_OK)
+                       break;
+
+               count = 2;
+               sr_sw_limits_update_samples_read(&devc->limits, count);
+               if (devc->samples.check_count) {
+                       if (count > devc->samples.remain_count)
+                               count = devc->samples.remain_count;
+                       devc->samples.remain_count -= count;
+                       if (!devc->samples.remain_count)
+                               break;
+               }
+       }
+
+       /*
+        * Silently consume all chunks which were successfully received.
+        * These either completely got processed, or we are in an error
+        * path and discard unprocessed but complete sample data before
+        * propagating the error condition. This simplifies the logic
+        * above, and allows to keep draining the acquisition process'
+        * output, perhaps even resynchronize to it in a later attempt.
+        * The cost of this rare operation does not matter, robustness
+        * does.
+        */
+       while (avail >= chunk_size) {
+               avail -= chunk_size;
+               taken += chunk_size;
+       }
+
+       /*
+        * Shift remainders (incomplete chunks) down to the start of the
+        * receive buffer.
+        */
+       if (taken && avail) {
+               memmove(&devc->rawdata.buff[0],
+                       &devc->rawdata.buff[taken], avail);
+       }
+       devc->rawdata.fill -= taken;
+
+       return ret;
+}
+
+SR_PRIV int omega_rtm_cli_receive_data(int fd, int revents, void *cb_data)
+{
+       const struct sr_dev_inst *sdi;
+       struct dev_context *devc;
+       uint8_t *buff;
+       size_t space;
+       ssize_t rcvd;
+       int ret;
+
+       sdi = cb_data;
+       if (!sdi)
+               return TRUE;
+       devc = sdi->priv;
+       if (!devc)
                return TRUE;
 
-       if (revents == G_IO_IN) {
-               /* TODO */
+       /* Process receive data when available. */
+       if (revents & G_IO_IN) do {
+               buff = &devc->rawdata.buff[devc->rawdata.fill];
+               space = sizeof(devc->rawdata.buff) - devc->rawdata.fill;
+               rcvd = read(fd, buff, space);
+               sr_spew("Read from vendor application, ret %zd", rcvd);
+               if (rcvd <= 0)
+                       break;
+               devc->rawdata.fill += (size_t)rcvd;
+               ret = omega_rtm_cli_process_rawdata(sdi);
+               if (ret != SR_OK) {
+                       sr_err("Could not process sample data.");
+               }
+       } while (0);
+
+       /* Handle receive errors. */
+       if (revents & G_IO_ERR) {
+               (void)feed_queue_logic_flush(devc->samples.queue);
+               (void)sr_dev_acquisition_stop((struct sr_dev_inst *)sdi);
+       }
+
+       /* Handle optional acquisition limits. */
+       if (sr_sw_limits_check(&devc->limits)) {
+               (void)feed_queue_logic_flush(devc->samples.queue);
+               (void)sr_dev_acquisition_stop((struct sr_dev_inst *)sdi);
        }
 
        return TRUE;
index 74f8b1bd0bf72f02c8804b16fb0cffbc1ed4d03b..2f4641c8a371871140bbb30d4fe575fb29bdf8a1 100644 (file)
 #ifndef LIBSIGROK_HARDWARE_ASIX_OMEGA_RTM_CLI_PROTOCOL_H
 #define LIBSIGROK_HARDWARE_ASIX_OMEGA_RTM_CLI_PROTOCOL_H
 
-#include <stdint.h>
 #include <glib.h>
 #include <libsigrok/libsigrok.h>
+#include <stdint.h>
+
 #include "libsigrok-internal.h"
 
 #define LOG_PREFIX "asix-omega-rtm-cli"
 
+#define RTMCLI_STDOUT_CHUNKSIZE (1024 * 1024)
+#define FEED_QUEUE_DEPTH (256 * 1024)
+
 struct dev_context {
+       struct sr_sw_limits limits;
+       struct {
+               gchar **argv;
+               GSpawnFlags flags;
+               gboolean running;
+               GPid pid;
+               gint fd_stdin_write;
+               gint fd_stdout_read;
+       } child;
+       struct {
+               uint8_t buff[RTMCLI_STDOUT_CHUNKSIZE];
+               size_t fill;
+       } rawdata;
+       struct {
+               struct feed_queue_logic *queue;
+               uint8_t last_sample[sizeof(uint16_t)];
+               uint64_t remain_count;
+               gboolean check_count;
+       } samples;
 };
 
-SR_PRIV int asix_omega_rtm_cli_receive_data(int fd, int revents, void *cb_data);
+SR_PRIV int omega_rtm_cli_open(const struct sr_dev_inst *sdi);
+SR_PRIV int omega_rtm_cli_close(const struct sr_dev_inst *sdi);
+SR_PRIV int omega_rtm_cli_receive_data(int fd, int revents, void *cb_data);
 
 #endif