]> sigrok.org Git - libsigrok.git/commitdiff
rdtech-um: rework serial reception and processing, including re-sync
authorGerhard Sittig <redacted>
Wed, 15 Mar 2023 19:39:10 +0000 (20:39 +0100)
committerGerhard Sittig <redacted>
Thu, 16 Mar 2023 13:29:30 +0000 (14:29 +0100)
The previous implementation was expensive on the serial transport and
kept calling into this layer for individual bytes, to simplify the logic
which synchronized to the packetized input stream. And it used to block
in main loop calls until reception of a packet completes, which could
take some 200ms in the RFCOMM setup, and not much less for USB CDC.

Rework the call chain from main loop receive callbacks to processing
packets. Grab receive data in large chunks from the serial layer. Try
to rate limit the synchronization to the packet stream and associated
diagnostics messages. Process more return values and stop acquisition
upon errors. Move the checksum test to the packet content processor.
Check acquisition limits in the spot which just submitted more data
(keep another check in the periodic callback to handle time limits).
Strive for fast yet rate limited transmission of periodic measurement
requests, to initiate the reception of more sample values.

Shorten more lines (and avoid continuations and line breaks) by renaming
routines and variables. Pass packet content references such that content
handlers need not know about reception details, and reception can chunk
data differently in future versions.

This implementation became rather complex, most of all because of UI
concerns and robustness, as well as potential for future extension of
the scope of these code paths. The loss of synchronization and slow
reception of packets in chunks in combination with periodic request
transmission based on intervals results in "interesting" behaviour.
Maybe something simpler is desirable which is more expensive to run
but easier to maintain and reason about?

src/hardware/rdtech-um/protocol.c

index 5d3ab6ea0211bc4c92df6fd1988cbff25566e448..333c1e69a5f871f102f3aad26ec6cea3cf1c2bf7 100644 (file)
@@ -161,8 +161,12 @@ SR_PRIV int rdtech_um_poll(const struct sr_dev_inst *sdi, gboolean force)
        uint8_t req;
        int ret;
 
-       /* Check for expired intervals or forced requests. */
+       /* Don't send request when receive data is being accumulated. */
        devc = sdi->priv;
+       if (!force && devc->buflen)
+               return SR_OK;
+
+       /* Check for expired intervals or forced requests. */
        now = g_get_monotonic_time() / 1000;
        elapsed = now - devc->cmd_sent_at;
        if (!force && elapsed < POLL_PERIOD_MS)
@@ -181,62 +185,145 @@ SR_PRIV int rdtech_um_poll(const struct sr_dev_inst *sdi, gboolean force)
        return SR_OK;
 }
 
-static void handle_poll_data(const struct sr_dev_inst *sdi)
+static int process_data(struct sr_dev_inst *sdi,
+       const uint8_t *data, size_t dlen)
 {
        struct dev_context *devc;
+       const struct rdtech_um_profile *p;
        size_t ch_idx;
        GSList *ch;
+       int ret;
 
        devc = sdi->priv;
-       sr_spew("Received poll packet (len: %zu).", devc->buflen);
-       if (devc->buflen != POLL_RECV_LEN) {
-               sr_err("Unexpected poll packet length: %zu", devc->buflen);
-               return;
+       p = devc->profile;
+
+       sr_spew("Received poll packet (len: %zu).", dlen);
+       if (dlen < POLL_RECV_LEN) {
+               sr_err("Insufficient response data length: %zu", dlen);
+               return SR_ERR_DATA;
+       }
+
+       if (!p->csum_ok(data, POLL_RECV_LEN)) {
+               sr_err("Packet checksum verification failed.");
+               return SR_ERR_DATA;
        }
 
        ch_idx = 0;
        for (ch = sdi->channels; ch; ch = g_slist_next(ch)) {
-               bv_send_analog_channel(sdi, ch->data,
+               ret = bv_send_analog_channel(sdi, ch->data,
                        &devc->profile->channels[ch_idx],
-                       devc->buf, devc->buflen);
+                       data, dlen);
                ch_idx++;
+               if (ret != SR_OK)
+                       return ret;
        }
 
        sr_sw_limits_update_samples_read(&devc->limits, 1);
+       if (sr_sw_limits_check(&devc->limits))
+               sr_dev_acquisition_stop(sdi);
+
+       return SR_OK;
 }
 
-static void recv_poll_data(struct sr_dev_inst *sdi, struct sr_serial_dev_inst *serial)
+static int accum_data(struct sr_dev_inst *sdi, struct sr_serial_dev_inst *serial)
 {
        struct dev_context *devc;
        const struct rdtech_um_profile *p;
-       int len;
-
-       /* Serial data arrived. */
+       uint8_t *rdptr;
+       size_t space, rcvd, rdlen;
+       int ret;
+       gboolean do_sync_check;
+       size_t sync_len, sync_idx;
+
+       /*
+        * Receive data became available. Drain the serial transport.
+        * Grab incoming data in as large a chunk as possible. Also
+        * copes with zero receive data length, as some transports may
+        * trigger periodically without data really being available.
+        */
        devc = sdi->priv;
        p = devc->profile;
-       while (devc->buflen < POLL_RECV_LEN) {
-               len = serial_read_nonblocking(serial, devc->buf + devc->buflen, 1);
-               if (len < 1)
-                       return;
-               devc->buflen += len;
-
-               /* Check if the poll model ID matches the profile. */
-               if (devc->buflen == 2 && RB16(devc->buf) != p->model_id) {
-                       sr_warn("Illegal model ID in poll response (0x%.4" PRIx16 "),"
-                               " skipping 1 byte.",
-                               RB16(devc->buf));
-                       devc->buflen--;
-                       memmove(devc->buf, devc->buf + 1, devc->buflen);
+       rdptr = &devc->buf[devc->buflen];
+       space = sizeof(devc->buf) - devc->buflen;
+       do_sync_check = FALSE;
+       sync_len = sizeof(uint16_t);
+       while (space) {
+               ret = serial_read_nonblocking(serial, rdptr, space);
+               if (ret < 0)
+                       return SR_ERR_IO;
+               rcvd = (size_t)ret;
+               if (rcvd == 0)
+                       break;
+               if (rcvd > space)
+                       return SR_ERR_BUG;
+               if (devc->buflen < sync_len)
+                       do_sync_check = TRUE;
+               devc->buflen += rcvd;
+               if (devc->buflen < sync_len)
+                       do_sync_check = FALSE;
+               space -= rcvd;
+               rdptr += rcvd;
+       }
+
+       /*
+        * Synchronize to the packetized input stream. Check the model
+        * ID at the start of receive data. Which is a weak condition,
+        * but going out of sync should be rare, and repeated attempts
+        * to synchronize should eventually succeed. Try to rate limit
+        * the emission of diagnostics messages. (Re-)run this logic
+        * at the first reception which makes enough data available,
+        * but not during subsequent accumulation of more data.
+        *
+        * Reducing redundancy in the implementation at the same time as
+        * increasing robustness would involve the creation of a checker
+        * routine, which just gets called for every byte position until
+        * it succeeds. Similar to what a previous implementation of the
+        * read loop did, which was expensive on the serial transport.
+        */
+       sync_idx = 0;
+       if (do_sync_check && read_u16be(&devc->buf[sync_idx]) != p->model_id)
+               sr_warn("Unexpected response data, trying to synchronize.");
+       while (do_sync_check) {
+               if (sync_idx + sync_len >= devc->buflen)
+                       break;
+               if (read_u16be(&devc->buf[sync_idx]) == p->model_id)
+                       break;
+               sync_idx++;
+       }
+       if (do_sync_check && sync_idx) {
+               sr_dbg("Skipping %zu bytes in attempt to sync.", sync_idx);
+               sync_len = devc->buflen - sync_idx;
+               if (sync_len)
+                       memmove(&devc->buf[0], &devc->buf[sync_idx], sync_len);
+               devc->buflen -= sync_idx;
+       }
+
+       /*
+        * Process packets as their reception completes. Periodically
+        * re-transmit poll requests. Discard consumed data after all
+        * processing has completed.
+        */
+       rdptr = devc->buf;
+       rdlen = devc->buflen;
+       ret = SR_OK;
+       while (ret == SR_OK && rdlen >= POLL_RECV_LEN) {
+               ret = process_data(sdi, rdptr, rdlen);
+               if (ret != SR_OK) {
+                       sr_err("Processing response packet failed.");
+                       break;
                }
+               rdptr += POLL_RECV_LEN;
+               rdlen -= POLL_RECV_LEN;
+
+               if (0 && !sr_sw_limits_check(&devc->limits))
+                       (void)rdtech_um_poll(sdi, FALSE);
        }
+       rcvd = rdptr - devc->buf;
+       devc->buflen -= rcvd;
+       if (devc->buflen)
+               memmove(&devc->buf[0], rdptr, devc->buflen);
 
-       if (devc->buflen != POLL_RECV_LEN)
-               sr_warn("Skipping packet, unexpected receive length.");
-       else if (!p->csum_ok(devc->buf, devc->buflen))
-               sr_warn("Skipping packet, checksum verification failed.");
-       else
-               handle_poll_data(sdi);
-       devc->buflen = 0;
+       return ret;
 }
 
 SR_PRIV int rdtech_um_receive_data(int fd, int revents, void *cb_data)
@@ -244,6 +331,7 @@ SR_PRIV int rdtech_um_receive_data(int fd, int revents, void *cb_data)
        struct sr_dev_inst *sdi;
        struct dev_context *devc;
        struct sr_serial_dev_inst *serial;
+       int ret;
 
        (void)fd;
 
@@ -252,10 +340,18 @@ SR_PRIV int rdtech_um_receive_data(int fd, int revents, void *cb_data)
        if (!(devc = sdi->priv))
                return TRUE;
 
-       /* Drain and process receive data as it becomes available. */
+       /*
+        * Drain and process receive data as it becomes available.
+        * Terminate acquisition upon receive or processing error.
+        */
        serial = sdi->conn;
-       if (revents == G_IO_IN)
-               recv_poll_data(sdi, serial);
+       if (revents == G_IO_IN) {
+               ret = accum_data(sdi, serial);
+               if (ret != SR_OK) {
+                       sr_dev_acquisition_stop(sdi);
+                       return TRUE;
+               }
+       }
 
        /* Check configured acquisition limits. */
        if (sr_sw_limits_check(&devc->limits)) {
@@ -263,7 +359,7 @@ SR_PRIV int rdtech_um_receive_data(int fd, int revents, void *cb_data)
                return TRUE;
        }
 
-       /* Periodically emit measurement requests. */
+       /* Periodically retransmit measurement requests. */
        (void)rdtech_um_poll(sdi, FALSE);
 
        return TRUE;