sr_hexdump_free(text);
}
-static void handle_packet(const uint8_t *buf, struct sr_dev_inst *sdi,
- void *info)
+static void handle_packet(struct sr_dev_inst *sdi,
+ const uint8_t *buf, size_t len, void *info)
{
struct dmm_info *dmm;
+ struct dev_context *devc;
float floatval;
+ double doubleval;
struct sr_datafeed_packet packet;
struct sr_datafeed_analog analog;
struct sr_analog_encoding encoding;
struct sr_analog_meaning meaning;
struct sr_analog_spec spec;
- struct dev_context *devc;
gboolean sent_sample;
struct sr_channel *channel;
size_t ch_idx;
dmm = (struct dmm_info *)sdi->driver;
- log_dmm_packet(buf, dmm->packet_size);
+ log_dmm_packet(buf, len);
devc = sdi->priv;
sent_sample = FALSE;
analog.num_samples = 1;
analog.meaning->mq = 0;
- dmm->packet_parse(buf, &floatval, &analog, info);
- analog.data = &floatval;
+ if (dmm->packet_parse) {
+ dmm->packet_parse(buf, &floatval, &analog, info);
+ analog.data = &floatval;
+ analog.encoding->unitsize = sizeof(floatval);
+ } else if (dmm->packet_parse_len) {
+ dmm->packet_parse_len(dmm->dmm_state, buf, len,
+ &doubleval, &analog, info);
+ analog.data = &doubleval;
+ analog.encoding->unitsize = sizeof(doubleval);
+ }
/* If this DMM needs additional handling, call the resp. function. */
if (dmm->dmm_details)
struct dmm_info *dmm;
struct dev_context *devc;
struct sr_serial_dev_inst *serial;
+ uint64_t now, left, next;
int ret;
dmm = (struct dmm_info *)sdi->driver;
-
if (!dmm->packet_request)
return SR_OK;
devc = sdi->priv;
serial = sdi->conn;
- if (devc->req_next_at && (devc->req_next_at > g_get_monotonic_time())) {
- sr_spew("Not requesting new packet yet, %" PRIi64 " ms left.",
- ((devc->req_next_at - g_get_monotonic_time()) / 1000));
+ now = g_get_monotonic_time();
+ if (devc->req_next_at && now < devc->req_next_at) {
+ left = (devc->req_next_at - now) / 1000;
+ sr_spew("Not re-requesting yet, %" PRIu64 "ms left.", left);
return SR_OK;
}
+ sr_spew("Requesting next packet.");
ret = dmm->packet_request(serial);
if (ret < 0) {
sr_err("Failed to request packet: %d.", ret);
return ret;
}
- if (dmm->req_timeout_ms)
- devc->req_next_at = g_get_monotonic_time() + (dmm->req_timeout_ms * 1000);
+ if (dmm->req_timeout_ms) {
+ next = now + dmm->req_timeout_ms * 1000;
+ devc->req_next_at = next;
+ }
return SR_OK;
}
{
struct dmm_info *dmm;
struct dev_context *devc;
- int len, offset;
struct sr_serial_dev_inst *serial;
+ int ret;
+ size_t read_len, check_pos, check_len, pkt_size, copy_len;
+ uint8_t *check_ptr;
+ uint64_t deadline;
dmm = (struct dmm_info *)sdi->driver;
devc = sdi->priv;
serial = sdi->conn;
- /* Try to get as much data as the buffer can hold. */
- len = DMM_BUFSIZE - devc->buflen;
- len = serial_read_nonblocking(serial, devc->buf + devc->buflen, len);
- if (len == 0)
+ /* Add the maximum available RX data we can get to the local buffer. */
+ read_len = DMM_BUFSIZE - devc->buflen;
+ ret = serial_read_nonblocking(serial, &devc->buf[devc->buflen], read_len);
+ if (ret == 0)
return; /* No new bytes, nothing to do. */
- if (len < 0) {
- sr_err("Serial port read error: %d.", len);
+ if (ret < 0) {
+ sr_err("Serial port read error: %d.", ret);
return;
}
- devc->buflen += len;
-
- /* Now look for packets in that data. */
- offset = 0;
- while ((devc->buflen - offset) >= dmm->packet_size) {
- if (dmm->packet_valid(devc->buf + offset)) {
- handle_packet(devc->buf + offset, sdi, info);
- offset += dmm->packet_size;
-
- /* Request next packet, if required. */
- if (!dmm->packet_request)
+ devc->buflen += ret;
+
+ /*
+ * Process packets when their reception has completed, or keep
+ * trying to synchronize to the stream of input data.
+ */
+ check_pos = 0;
+ while (check_pos < devc->buflen) {
+ /* Got the (minimum) amount of receive data for a packet? */
+ check_len = devc->buflen - check_pos;
+ if (check_len < dmm->packet_size)
+ break;
+ sr_dbg("Checking: pos %zu, len %zu.", check_pos, check_len);
+
+ /* Is it a valid packet? */
+ check_ptr = &devc->buf[check_pos];
+ if (dmm->packet_valid_len) {
+ ret = dmm->packet_valid_len(dmm->dmm_state,
+ check_ptr, check_len, &pkt_size);
+ if (ret == SR_PACKET_NEED_RX) {
+ sr_dbg("Need more RX data.");
break;
- if (dmm->req_timeout_ms || dmm->req_delay_ms)
- devc->req_next_at = g_get_monotonic_time() +
- dmm->req_delay_ms * 1000;
- req_packet(sdi);
- } else {
- offset++;
+ }
+ if (ret == SR_PACKET_INVALID) {
+ sr_dbg("Not a valid packet, searching.");
+ check_pos++;
+ continue;
+ }
+ } else if (dmm->packet_valid) {
+ if (!dmm->packet_valid(check_ptr)) {
+ sr_dbg("Not a valid packet, searching.");
+ check_pos++;
+ continue;
+ }
+ pkt_size = dmm->packet_size;
+ }
+
+ /* Process the package. */
+ sr_dbg("Valid packet, size %zu, processing", pkt_size);
+ handle_packet(sdi, check_ptr, pkt_size, info);
+ check_pos += pkt_size;
+
+ /* Arrange for the next packet request if needed. */
+ if (!dmm->packet_request)
+ continue;
+ if (dmm->req_timeout_ms || dmm->req_delay_ms) {
+ deadline = g_get_monotonic_time();
+ deadline += dmm->req_delay_ms * 1000;
+ devc->req_next_at = deadline;
}
+ req_packet(sdi);
+ continue;
}
/* If we have any data left, move it to the beginning of our buffer. */
- if (devc->buflen > offset)
- memmove(devc->buf, devc->buf + offset, devc->buflen - offset);
- devc->buflen -= offset;
+ if (devc->buflen > check_pos) {
+ copy_len = devc->buflen - check_pos;
+ memmove(&devc->buf[0], &devc->buf[check_pos], copy_len);
+ }
+ devc->buflen -= check_pos;
+
+ /*
+ * If the complete buffer filled up and none of it got processed,
+ * discard the unprocessed buffer, re-sync to the stream in later
+ * calls again.
+ */
+ if (devc->buflen == sizeof(devc->buf)) {
+ sr_info("Drop unprocessed RX data, try to re-sync to stream.");
+ devc->buflen = 0;
+ }
}
int receive_data(int fd, int revents, void *cb_data)