]> sigrok.org Git - libsigrok.git/commitdiff
output/srzip: queue samples before ZIP operation
authorGerhard Sittig <redacted>
Sun, 28 Jun 2020 01:30:58 +0000 (03:30 +0200)
committerGerhard Sittig <redacted>
Fri, 24 Jul 2020 07:13:43 +0000 (09:13 +0200)
Accumulate samples from multiple session feed packets before sending
them off to ZIP archive operations. This improves throughput for those
setups where acquisition devices or input modules provide only few
samples per session feed send call.

This version also splits large packets from applications into smaller
ZIP members (if the application's packet size is larger than the output
module's local buffer size). If that is not desired, the implementation
needs adjustment to immediately pass larger blocks to ZIP operations
(after potentially flushing previously queued data) instead of looping.

This fixes bug #974.

src/output/srzip.c

index 0c5ebf684a6c83ab471e01fe53c43f0e1af695ea..fb6d5a392e979c9cd894e70693c01366c5429701 100644 (file)
 #include "libsigrok-internal.h"
 
 #define LOG_PREFIX "output/srzip"
+#define CHUNK_SIZE (4 * 1024 * 1024)
 
 struct out_context {
        gboolean zip_created;
        uint64_t samplerate;
        char *filename;
-       gint first_analog_index;
+       size_t first_analog_index;
+       size_t analog_ch_count;
        gint *analog_index_map;
+       struct logic_buff {
+               size_t unit_size;
+               size_t alloc_size;
+               uint8_t *samples;
+               size_t fill_size;
+       } logic_buff;
+       struct analog_buff {
+               size_t alloc_size;
+               float *samples;
+               size_t fill_size;
+       } *analog_buff;
 };
 
 static int init(struct sr_output *o, GHashTable *options)
@@ -61,14 +74,16 @@ static int zip_create(const struct sr_output *o)
        struct zip *zipfile;
        struct zip_source *versrc, *metasrc;
        struct sr_channel *ch;
+       size_t ch_nr;
+       size_t alloc_size;
        GVariant *gvar;
        GKeyFile *meta;
        GSList *l;
        const char *devgroup;
        char *s, *metabuf;
        gsize metalen;
-       guint logic_channels = 0, enabled_logic_channels = 0;
-       guint enabled_analog_channels = 0;
+       guint logic_channels, enabled_logic_channels;
+       guint enabled_analog_channels;
        guint index;
 
        outc = o->priv;
@@ -103,6 +118,9 @@ static int zip_create(const struct sr_output *o)
 
        devgroup = "device 1";
 
+       logic_channels = 0;
+       enabled_logic_channels = 0;
+       enabled_analog_channels = 0;
        for (l = o->sdi->channels; l; l = l->next) {
                ch = l->data;
 
@@ -139,10 +157,9 @@ static int zip_create(const struct sr_output *o)
 
        g_key_file_set_integer(meta, devgroup, "total analog", enabled_analog_channels);
 
-       /* Make the array one entry larger than needed so we can use the final
-        * entry as terminator, which is set to -1. */
-       outc->analog_index_map = g_malloc0(sizeof(gint) * (enabled_analog_channels + 1));
-       outc->analog_index_map[enabled_analog_channels] = -1;
+       outc->analog_ch_count = enabled_analog_channels;
+       alloc_size = sizeof(gint) * outc->analog_ch_count;
+       outc->analog_index_map = g_malloc0(alloc_size);
 
        index = 0;
        for (l = o->sdi->channels; l; l = l->next) {
@@ -153,11 +170,13 @@ static int zip_create(const struct sr_output *o)
                s = NULL;
                switch (ch->type) {
                case SR_CHANNEL_LOGIC:
-                       s = g_strdup_printf("probe%d", ch->index + 1);
+                       ch_nr = ch->index + 1;
+                       s = g_strdup_printf("probe%zu", ch_nr);
                        break;
                case SR_CHANNEL_ANALOG:
+                       ch_nr = outc->first_analog_index + index;
                        outc->analog_index_map[index] = ch->index;
-                       s = g_strdup_printf("analog%d", outc->first_analog_index + index);
+                       s = g_strdup_printf("analog%zu", ch_nr);
                        index++;
                        break;
                }
@@ -167,6 +186,41 @@ static int zip_create(const struct sr_output *o)
                }
        }
 
+       /*
+        * Allocate one samples buffer for all logic channels, and
+        * several samples buffers for the analog channels. Allocate
+        * buffers of CHUNK_SIZE size (in bytes), and determine the
+        * sample counts from the respective channel counts and data
+        * type widths.
+        *
+        * These buffers are intended to reduce the number of ZIP
+        * archive update calls, and decouple the srzip output module
+        * from implementation details in other acquisition device
+        * drivers and input modules.
+        */
+       alloc_size = CHUNK_SIZE;
+       outc->logic_buff.unit_size = logic_channels;
+       outc->logic_buff.unit_size += 8 - 1;
+       outc->logic_buff.unit_size /= 8;
+       outc->logic_buff.samples = g_try_malloc0(alloc_size);
+       if (!outc->logic_buff.samples)
+               return SR_ERR_MALLOC;
+       alloc_size /= outc->logic_buff.unit_size;
+       outc->logic_buff.alloc_size = alloc_size;
+       outc->logic_buff.fill_size = 0;
+
+       alloc_size = sizeof(outc->analog_buff[0]) * outc->analog_ch_count;
+       outc->analog_buff = g_malloc0(alloc_size);
+       for (index = 0; index < outc->analog_ch_count; index++) {
+               alloc_size = CHUNK_SIZE;
+               outc->analog_buff[index].samples = g_try_malloc0(alloc_size);
+               if (!outc->analog_buff[index].samples)
+                       return SR_ERR_MALLOC;
+               alloc_size /= sizeof(outc->analog_buff[0].samples[0]);
+               outc->analog_buff[index].alloc_size = alloc_size;
+               outc->analog_buff[index].fill_size = 0;
+       }
+
        metabuf = g_key_file_to_data(meta, &metalen, NULL);
        g_key_file_free(meta);
 
@@ -191,8 +245,18 @@ static int zip_create(const struct sr_output *o)
        return SR_OK;
 }
 
-static int zip_append(const struct sr_output *o, unsigned char *buf,
-               int unitsize, int length)
+/**
+ * Append a block of logic data to an srzip archive.
+ *
+ * @param[in] o Output module instance.
+ * @param[in] buf Logic data samples as byte sequence.
+ * @param[in] unitsize Logic data unit size (bytes per sample).
+ * @param[in] length Byte sequence length (in bytes, not samples).
+ *
+ * @returns SR_OK et al error codes.
+ */
+static int zip_append(const struct sr_output *o,
+       uint8_t *buf, size_t unitsize, size_t length)
 {
        struct out_context *outc;
        struct zip *archive;
@@ -209,6 +273,9 @@ static int zip_append(const struct sr_output *o, unsigned char *buf,
        char *chunkname;
        unsigned int next_chunk_num;
 
+       if (!length)
+               return SR_OK;
+
        outc = o->priv;
        if (!(archive = zip_open(outc->filename, 0, NULL)))
                return SR_ERR;
@@ -285,8 +352,8 @@ static int zip_append(const struct sr_output *o, unsigned char *buf,
        }
 
        if (length % unitsize != 0) {
-               sr_warn("Chunk size %d not a multiple of the"
-                       " unit size %d.", length, unitsize);
+               sr_warn("Chunk size %zu not a multiple of the"
+                       " unit size %zu.", length, unitsize);
        }
        logicsrc = zip_source_buffer(archive, buf, length, FALSE);
        chunkname = g_strdup_printf("logic-1-%u", next_chunk_num);
@@ -311,53 +378,108 @@ static int zip_append(const struct sr_output *o, unsigned char *buf,
        return SR_OK;
 }
 
+/**
+ * Queue a block of logic data for srzip archive writes.
+ *
+ * @param[in] o Output module instance.
+ * @param[in] buf Logic data samples as byte sequence.
+ * @param[in] unitsize Logic data unit size (bytes per sample).
+ * @param[in] length Number of bytes of sample data.
+ * @param[in] flush Force ZIP archive update (queue by default).
+ *
+ * @returns SR_OK et al error codes.
+ */
+static int zip_append_queue(const struct sr_output *o,
+       uint8_t *buf, size_t unitsize, size_t length, gboolean flush)
+{
+       struct out_context *outc;
+       struct logic_buff *buff;
+       size_t send_size, remain, copy_size;
+       uint8_t *wrptr, *rdptr;
+       int ret;
+
+       outc = o->priv;
+       buff = &outc->logic_buff;
+       if (length && unitsize != buff->unit_size)
+               return SR_ERR_ARG;
+
+       /*
+        * Queue most recently received samples to the local buffer.
+        * Flush to the ZIP archive when the buffer space is exhausted.
+        */
+       rdptr = buf;
+       send_size = length / buff->unit_size;
+       while (send_size) {
+               remain = buff->alloc_size - buff->fill_size;
+               if (remain) {
+                       wrptr = &buff->samples[buff->fill_size * buff->unit_size];
+                       copy_size = MIN(send_size, remain);
+                       send_size -= copy_size;
+                       buff->fill_size += copy_size;
+                       memcpy(wrptr, rdptr, copy_size * buff->unit_size);
+                       rdptr += copy_size * buff->unit_size;
+                       remain -= copy_size;
+               }
+               if (send_size && !remain) {
+                       ret = zip_append(o, buff->samples, buff->unit_size,
+                               buff->fill_size * buff->unit_size);
+                       if (ret != SR_OK)
+                               return ret;
+                       buff->fill_size = 0;
+                       remain = buff->alloc_size - buff->fill_size;
+               }
+       }
+
+       /* Flush to the ZIP archive if the caller wants us to. */
+       if (flush && buff->fill_size) {
+               ret = zip_append(o, buff->samples, buff->unit_size,
+                       buff->fill_size * buff->unit_size);
+               if (ret != SR_OK)
+                       return ret;
+               buff->fill_size = 0;
+       }
+
+       return SR_OK;
+}
+
+/**
+ * Append analog data of a channel to an srzip archive.
+ *
+ * @param[in] o Output module instance.
+ * @param[in] values Sample data as array of floating point values.
+ * @param[in] count Number of samples (float items, not bytes).
+ * @param[in] ch_nr 1-based channel number.
+ *
+ * @returns SR_OK et al error codes.
+ */
 static int zip_append_analog(const struct sr_output *o,
-               const struct sr_datafeed_analog *analog)
+       const float *values, size_t count, size_t ch_nr)
 {
        struct out_context *outc;
        struct zip *archive;
        struct zip_source *analogsrc;
        int64_t i, num_files;
+       size_t size;
        struct zip_stat zs;
        uint64_t chunk_num;
        const char *entry_name;
        char *basename;
        gsize baselen;
-       struct sr_channel *channel;
-       float *chunkbuf;
-       gsize chunksize;
        char *chunkname;
-       unsigned int next_chunk_num, index;
+       unsigned int next_chunk_num;
 
        outc = o->priv;
 
-       /* TODO: support packets covering multiple channels */
-       if (g_slist_length(analog->meaning->channels) != 1) {
-               sr_err("Analog packets covering multiple channels not supported yet");
-               return SR_ERR;
-       }
-       channel = analog->meaning->channels->data;
-
-       /* When reading the file, analog channels must be consecutive.
-        * Thus we need a global channel index map as we don't know in
-        * which order the channel data comes in. */
-       for (index = 0; outc->analog_index_map[index] != -1; index++)
-               if (outc->analog_index_map[index] == channel->index)
-                       break;
-       if (outc->analog_index_map[index] == -1)
-               return SR_ERR_ARG; /* Channel index was not in the list */
-
-       index += outc->first_analog_index;
-
        if (!(archive = zip_open(outc->filename, 0, NULL)))
                return SR_ERR;
 
        if (zip_stat(archive, "metadata", 0, &zs) < 0) {
                sr_err("Failed to open metadata: %s", zip_strerror(archive));
-               goto err_zip_discard;
+               zip_discard(archive);
+               return SR_ERR;
        }
 
-       basename = g_strdup_printf("analog-1-%u", index);
+       basename = g_strdup_printf("analog-1-%zu", ch_nr);
        baselen = strlen(basename);
        next_chunk_num = 1;
        num_files = zip_get_num_entries(archive, 0);
@@ -372,41 +494,134 @@ static int zip_append_analog(const struct sr_output *o,
                }
        }
 
-       chunksize = sizeof(float) * analog->num_samples;
-       if (!(chunkbuf = g_try_malloc(chunksize)))
-               goto err_free_basename;
-
-       if (sr_analog_to_float(analog, chunkbuf) != SR_OK)
-               goto err_free_chunkbuf;
-
-       analogsrc = zip_source_buffer(archive, chunkbuf, chunksize, FALSE);
+       size = sizeof(values[0]) * count;
+       analogsrc = zip_source_buffer(archive, values, size, FALSE);
        chunkname = g_strdup_printf("%s-%u", basename, next_chunk_num);
        i = zip_add(archive, chunkname, analogsrc);
        if (i < 0) {
                sr_err("Failed to add chunk '%s': %s", chunkname, zip_strerror(archive));
                g_free(chunkname);
+               g_free(basename);
                zip_source_free(analogsrc);
-               goto err_free_chunkbuf;
+               zip_discard(archive);
+               return SR_ERR;
        }
        g_free(chunkname);
        if (zip_close(archive) < 0) {
                sr_err("Error saving session file: %s", zip_strerror(archive));
-               goto err_free_chunkbuf;
+               g_free(basename);
+               zip_discard(archive);
+               return SR_ERR;
        }
 
        g_free(basename);
-       g_free(chunkbuf);
 
        return SR_OK;
+}
 
-err_free_chunkbuf:
-       g_free(chunkbuf);
-err_free_basename:
-       g_free(basename);
-err_zip_discard:
-       zip_discard(archive);
+/**
+ * Queue analog data of a channel for srzip archive writes.
+ *
+ * @param[in] o Output module instance.
+ * @param[in] analog Sample data (session feed packet format).
+ * @param[in] flush Force ZIP archive update (queue by default).
+ *
+ * @returns SR_OK et al error codes.
+ */
+static int zip_append_analog_queue(const struct sr_output *o,
+       const struct sr_datafeed_analog *analog, gboolean flush)
+{
+       struct out_context *outc;
+       const struct sr_channel *ch;
+       size_t idx, nr;
+       struct analog_buff *buff;
+       float *values, *wrptr, *rdptr;
+       size_t send_size, remain, copy_size;
+       int ret;
 
-       return SR_ERR;
+       outc = o->priv;
+
+       /* Is this the DF_END flush call without samples submission? */
+       if (!analog && flush) {
+               for (idx = 0; idx < outc->analog_ch_count; idx++) {
+                       nr = outc->first_analog_index + idx;
+                       buff = &outc->analog_buff[idx];
+                       if (!buff->fill_size)
+                               continue;
+                       ret = zip_append_analog(o,
+                               buff->samples, buff->fill_size, nr);
+                       if (ret != SR_OK)
+                               return ret;
+                       buff->fill_size = 0;
+               }
+               return SR_OK;
+       }
+
+       /* Lookup index and number of the analog channel. */
+       /* TODO: support packets covering multiple channels */
+       if (g_slist_length(analog->meaning->channels) != 1) {
+               sr_err("Analog packets covering multiple channels not supported yet");
+               return SR_ERR;
+       }
+       ch = g_slist_nth_data(analog->meaning->channels, 0);
+       for (idx = 0; idx < outc->analog_ch_count; idx++) {
+               if (outc->analog_index_map[idx] == ch->index)
+                       break;
+       }
+       if (idx == outc->analog_ch_count)
+               return SR_ERR_ARG;
+       nr = outc->first_analog_index + idx;
+       buff = &outc->analog_buff[idx];
+
+       /* Convert the analog data to an array of float values. */
+       values = g_try_malloc0(analog->num_samples * sizeof(values[0]));
+       if (!values)
+               return SR_ERR_MALLOC;
+       ret = sr_analog_to_float(analog, values);
+       if (ret != SR_OK) {
+               g_free(values);
+               return ret;
+       }
+
+       /*
+        * Queue most recently received samples to the local buffer.
+        * Flush to the ZIP archive when the buffer space is exhausted.
+        */
+       rdptr = values;
+       send_size = analog->num_samples;
+       while (send_size) {
+               remain = buff->alloc_size - buff->fill_size;
+               if (remain) {
+                       wrptr = &buff->samples[buff->fill_size];
+                       copy_size = MIN(send_size, remain);
+                       send_size -= copy_size;
+                       buff->fill_size += copy_size;
+                       memcpy(wrptr, rdptr, copy_size * sizeof(values[0]));
+                       rdptr += copy_size;
+                       remain -= copy_size;
+               }
+               if (send_size && !remain) {
+                       ret = zip_append_analog(o,
+                               buff->samples, buff->fill_size, nr);
+                       if (ret != SR_OK) {
+                               g_free(values);
+                               return ret;
+                       }
+                       buff->fill_size = 0;
+                       remain = buff->alloc_size - buff->fill_size;
+               }
+       }
+       g_free(values);
+
+       /* Flush to the ZIP archive if the caller wants us to. */
+       if (flush && buff->fill_size) {
+               ret = zip_append_analog(o, buff->samples, buff->fill_size, nr);
+               if (ret != SR_OK)
+                       return ret;
+               buff->fill_size = 0;
+       }
+
+       return SR_OK;
 }
 
 static int receive(const struct sr_output *o, const struct sr_datafeed_packet *packet,
@@ -418,7 +633,6 @@ static int receive(const struct sr_output *o, const struct sr_datafeed_packet *p
        const struct sr_datafeed_analog *analog;
        const struct sr_config *src;
        GSList *l;
-
        int ret;
 
        *out = NULL;
@@ -442,7 +656,8 @@ static int receive(const struct sr_output *o, const struct sr_datafeed_packet *p
                        outc->zip_created = TRUE;
                }
                logic = packet->payload;
-               ret = zip_append(o, logic->data, logic->unitsize, logic->length);
+               ret = zip_append_queue(o, logic->data,
+                       logic->unitsize, logic->length, FALSE);
                if (ret != SR_OK)
                        return ret;
                break;
@@ -453,10 +668,20 @@ static int receive(const struct sr_output *o, const struct sr_datafeed_packet *p
                        outc->zip_created = TRUE;
                }
                analog = packet->payload;
-               ret = zip_append_analog(o, analog);
+               ret = zip_append_analog_queue(o, analog, FALSE);
                if (ret != SR_OK)
                        return ret;
                break;
+       case SR_DF_END:
+               if (outc->zip_created) {
+                       ret = zip_append_queue(o, NULL, 0, 0, TRUE);
+                       if (ret != SR_OK)
+                               return ret;
+                       ret = zip_append_analog_queue(o, NULL, TRUE);
+                       if (ret != SR_OK)
+                               return ret;
+               }
+               break;
        }
 
        return SR_OK;
@@ -474,10 +699,17 @@ static const struct sr_option *get_options(void)
 static int cleanup(struct sr_output *o)
 {
        struct out_context *outc;
+       size_t idx;
 
        outc = o->priv;
+
        g_free(outc->analog_index_map);
        g_free(outc->filename);
+       g_free(outc->logic_buff.samples);
+       for (idx = 0; idx < outc->analog_ch_count; idx++)
+               g_free(outc->analog_buff[idx].samples);
+       g_free(outc->analog_buff);
+
        g_free(outc);
        o->priv = NULL;