uint8_t req;
int ret;
- /* Check for expired intervals or forced requests. */
+ /* Don't send request when receive data is being accumulated. */
devc = sdi->priv;
+ if (!force && devc->buflen)
+ return SR_OK;
+
+ /* Check for expired intervals or forced requests. */
now = g_get_monotonic_time() / 1000;
elapsed = now - devc->cmd_sent_at;
if (!force && elapsed < POLL_PERIOD_MS)
return SR_OK;
}
-static void handle_poll_data(const struct sr_dev_inst *sdi)
+static int process_data(struct sr_dev_inst *sdi,
+ const uint8_t *data, size_t dlen)
{
struct dev_context *devc;
+ const struct rdtech_um_profile *p;
size_t ch_idx;
GSList *ch;
+ int ret;
devc = sdi->priv;
- sr_spew("Received poll packet (len: %zu).", devc->buflen);
- if (devc->buflen != POLL_RECV_LEN) {
- sr_err("Unexpected poll packet length: %zu", devc->buflen);
- return;
+ p = devc->profile;
+
+ sr_spew("Received poll packet (len: %zu).", dlen);
+ if (dlen < POLL_RECV_LEN) {
+ sr_err("Insufficient response data length: %zu", dlen);
+ return SR_ERR_DATA;
+ }
+
+ if (!p->csum_ok(data, POLL_RECV_LEN)) {
+ sr_err("Packet checksum verification failed.");
+ return SR_ERR_DATA;
}
ch_idx = 0;
for (ch = sdi->channels; ch; ch = g_slist_next(ch)) {
- bv_send_analog_channel(sdi, ch->data,
+ ret = bv_send_analog_channel(sdi, ch->data,
&devc->profile->channels[ch_idx],
- devc->buf, devc->buflen);
+ data, dlen);
ch_idx++;
+ if (ret != SR_OK)
+ return ret;
}
sr_sw_limits_update_samples_read(&devc->limits, 1);
+ if (sr_sw_limits_check(&devc->limits))
+ sr_dev_acquisition_stop(sdi);
+
+ return SR_OK;
}
-static void recv_poll_data(struct sr_dev_inst *sdi, struct sr_serial_dev_inst *serial)
+static int accum_data(struct sr_dev_inst *sdi, struct sr_serial_dev_inst *serial)
{
struct dev_context *devc;
const struct rdtech_um_profile *p;
- int len;
-
- /* Serial data arrived. */
+ uint8_t *rdptr;
+ size_t space, rcvd, rdlen;
+ int ret;
+ gboolean do_sync_check;
+ size_t sync_len, sync_idx;
+
+ /*
+ * Receive data became available. Drain the serial transport.
+ * Grab incoming data in as large a chunk as possible. Also
+ * copes with zero receive data length, as some transports may
+ * trigger periodically without data really being available.
+ */
devc = sdi->priv;
p = devc->profile;
- while (devc->buflen < POLL_RECV_LEN) {
- len = serial_read_nonblocking(serial, devc->buf + devc->buflen, 1);
- if (len < 1)
- return;
- devc->buflen += len;
-
- /* Check if the poll model ID matches the profile. */
- if (devc->buflen == 2 && RB16(devc->buf) != p->model_id) {
- sr_warn("Illegal model ID in poll response (0x%.4" PRIx16 "),"
- " skipping 1 byte.",
- RB16(devc->buf));
- devc->buflen--;
- memmove(devc->buf, devc->buf + 1, devc->buflen);
+ rdptr = &devc->buf[devc->buflen];
+ space = sizeof(devc->buf) - devc->buflen;
+ do_sync_check = FALSE;
+ sync_len = sizeof(uint16_t);
+ while (space) {
+ ret = serial_read_nonblocking(serial, rdptr, space);
+ if (ret < 0)
+ return SR_ERR_IO;
+ rcvd = (size_t)ret;
+ if (rcvd == 0)
+ break;
+ if (rcvd > space)
+ return SR_ERR_BUG;
+ if (devc->buflen < sync_len)
+ do_sync_check = TRUE;
+ devc->buflen += rcvd;
+ if (devc->buflen < sync_len)
+ do_sync_check = FALSE;
+ space -= rcvd;
+ rdptr += rcvd;
+ }
+
+ /*
+ * Synchronize to the packetized input stream. Check the model
+ * ID at the start of receive data. Which is a weak condition,
+ * but going out of sync should be rare, and repeated attempts
+ * to synchronize should eventually succeed. Try to rate limit
+ * the emission of diagnostics messages. (Re-)run this logic
+ * at the first reception which makes enough data available,
+ * but not during subsequent accumulation of more data.
+ *
+ * Reducing redundancy in the implementation at the same time as
+ * increasing robustness would involve the creation of a checker
+ * routine, which just gets called for every byte position until
+ * it succeeds. Similar to what a previous implementation of the
+ * read loop did, which was expensive on the serial transport.
+ */
+ sync_idx = 0;
+ if (do_sync_check && read_u16be(&devc->buf[sync_idx]) != p->model_id)
+ sr_warn("Unexpected response data, trying to synchronize.");
+ while (do_sync_check) {
+ if (sync_idx + sync_len >= devc->buflen)
+ break;
+ if (read_u16be(&devc->buf[sync_idx]) == p->model_id)
+ break;
+ sync_idx++;
+ }
+ if (do_sync_check && sync_idx) {
+ sr_dbg("Skipping %zu bytes in attempt to sync.", sync_idx);
+ sync_len = devc->buflen - sync_idx;
+ if (sync_len)
+ memmove(&devc->buf[0], &devc->buf[sync_idx], sync_len);
+ devc->buflen -= sync_idx;
+ }
+
+ /*
+ * Process packets as their reception completes. Periodically
+ * re-transmit poll requests. Discard consumed data after all
+ * processing has completed.
+ */
+ rdptr = devc->buf;
+ rdlen = devc->buflen;
+ ret = SR_OK;
+ while (ret == SR_OK && rdlen >= POLL_RECV_LEN) {
+ ret = process_data(sdi, rdptr, rdlen);
+ if (ret != SR_OK) {
+ sr_err("Processing response packet failed.");
+ break;
}
+ rdptr += POLL_RECV_LEN;
+ rdlen -= POLL_RECV_LEN;
+
+ if (0 && !sr_sw_limits_check(&devc->limits))
+ (void)rdtech_um_poll(sdi, FALSE);
}
+ rcvd = rdptr - devc->buf;
+ devc->buflen -= rcvd;
+ if (devc->buflen)
+ memmove(&devc->buf[0], rdptr, devc->buflen);
- if (devc->buflen != POLL_RECV_LEN)
- sr_warn("Skipping packet, unexpected receive length.");
- else if (!p->csum_ok(devc->buf, devc->buflen))
- sr_warn("Skipping packet, checksum verification failed.");
- else
- handle_poll_data(sdi);
- devc->buflen = 0;
+ return ret;
}
SR_PRIV int rdtech_um_receive_data(int fd, int revents, void *cb_data)
struct sr_dev_inst *sdi;
struct dev_context *devc;
struct sr_serial_dev_inst *serial;
+ int ret;
(void)fd;
if (!(devc = sdi->priv))
return TRUE;
- /* Drain and process receive data as it becomes available. */
+ /*
+ * Drain and process receive data as it becomes available.
+ * Terminate acquisition upon receive or processing error.
+ */
serial = sdi->conn;
- if (revents == G_IO_IN)
- recv_poll_data(sdi, serial);
+ if (revents == G_IO_IN) {
+ ret = accum_data(sdi, serial);
+ if (ret != SR_OK) {
+ sr_dev_acquisition_stop(sdi);
+ return TRUE;
+ }
+ }
/* Check configured acquisition limits. */
if (sr_sw_limits_check(&devc->limits)) {
return TRUE;
}
- /* Periodically emit measurement requests. */
+ /* Periodically retransmit measurement requests. */
(void)rdtech_um_poll(sdi, FALSE);
return TRUE;