From: Daniel Elstner Date: Sun, 30 Aug 2015 19:43:30 +0000 (+0200) Subject: session: Properly accumulate event source timeouts X-Git-Tag: libsigrok-0.4.0~354 X-Git-Url: https://sigrok.org/gitaction?a=commitdiff_plain;h=62d7945f8059ccbf56dfa2e5eb60671dd5bc959b;p=libsigrok.git session: Properly accumulate event source timeouts Keep track of when source timeouts are due and properly compare against accumulated elapsed time between invocations. This prevents sources with short timeouts from blocking other sources with longer timeouts indefinitely. --- diff --git a/src/libsigrok-internal.h b/src/libsigrok-internal.h index 5004fc7b..a937d046 100644 --- a/src/libsigrok-internal.h +++ b/src/libsigrok-internal.h @@ -718,7 +718,7 @@ struct sr_session { GSList *datafeed_callbacks; GSList *transforms; struct sr_trigger *trigger; - GTimeVal starttime; + gboolean running; unsigned int num_sources; @@ -731,7 +731,6 @@ struct sr_session { */ struct source *sources; GPollFD *pollfds; - int source_timeout; /* * These are our synchronization primitives for stopping the session in @@ -744,6 +743,9 @@ struct sr_session { gboolean abort_session; }; +SR_PRIV int sr_session_source_add_internal(struct sr_session *session, + GPollFD *pollfd, int timeout, sr_receive_data_callback cb, + void *cb_data, gintptr poll_object, gboolean is_usb); SR_PRIV int sr_session_send(const struct sr_dev_inst *sdi, const struct sr_datafeed_packet *packet); SR_PRIV int sr_session_stop_sync(struct sr_session *session); diff --git a/src/session.c b/src/session.c index 138f4222..e000dd0e 100644 --- a/src/session.c +++ b/src/session.c @@ -45,7 +45,7 @@ */ struct source { - int timeout; + int64_t due; sr_receive_data_callback cb; void *cb_data; @@ -53,6 +53,9 @@ struct source { * being polled and will be used to match the source when removing it again. */ gintptr poll_object; + + int timeout; + gboolean is_usb; }; struct datafeed_callback { @@ -84,7 +87,6 @@ SR_API int sr_session_new(struct sr_context *ctx, session = g_malloc0(sizeof(struct sr_session)); session->ctx = ctx; - session->source_timeout = -1; session->running = FALSE; session->abort_session = FALSE; g_mutex_init(&session->stop_mutex); @@ -383,17 +385,15 @@ static gboolean sr_session_check_aborted(struct sr_session *session) * from within that scheduler. * * @param session The session to use. Must not be NULL. - * @param block If TRUE, this call will wait for any of the session's - * sources to fire an event on the file descriptors, or - * any of their timeouts to activate. In other words, this - * can be used as a select loop. - * If FALSE, return immediately if none of the sources has - * events pending. * @retval SR_OK Success. * @retval SR_ERR Error occurred. */ -static int sr_session_iteration(struct sr_session *session, gboolean block) +static int sr_session_iteration(struct sr_session *session) { + int64_t start_time; + int64_t stop_time; + int64_t min_due; + int64_t due; unsigned int i; int ret, timeout; int revents; @@ -401,14 +401,24 @@ static int sr_session_iteration(struct sr_session *session, gboolean block) gboolean stopped; struct source *source; GPollFD *pollfd; + gintptr poll_object; #ifdef HAVE_LIBUSB_1_0 - int usb_timeout; + int64_t usb_due; struct timeval tv; #endif + if (session->num_sources <= 0) { + sr_session_check_aborted(session); + return SR_OK; + } + start_time = g_get_monotonic_time(); + min_due = INT64_MAX; - timeout = block ? session->source_timeout : 0; - + for (i = 0; i < session->num_sources; ++i) { + if (session->sources[i].due < min_due) + min_due = session->sources[i].due; + } #ifdef HAVE_LIBUSB_1_0 + usb_due = INT64_MAX; if (session->ctx->usb_source_present) { ret = libusb_get_next_timeout(session->ctx->libusb_ctx, &tv); if (ret < 0) { @@ -416,11 +426,18 @@ static int sr_session_iteration(struct sr_session *session, gboolean block) libusb_error_name(ret)); return SR_ERR; } else if (ret == 1) { - usb_timeout = tv.tv_sec * 1000 + tv.tv_usec / 1000; - timeout = MIN(timeout, usb_timeout); + usb_due = start_time + tv.tv_usec + + (int64_t)tv.tv_sec * G_USEC_PER_SEC; + if (usb_due < min_due) + min_due = usb_due; } } #endif + if (min_due > start_time) + timeout = (min_due == INT64_MAX) ? -1 + : MIN((min_due - start_time + 999) / 1000, INT_MAX); + else + timeout = 0; ret = g_poll(session->pollfds, session->num_sources, timeout); #ifdef G_OS_UNIX @@ -434,6 +451,7 @@ static int sr_session_iteration(struct sr_session *session, gboolean block) return SR_ERR; } #endif + stop_time = g_get_monotonic_time(); stop_checked = FALSE; stopped = FALSE; @@ -441,28 +459,38 @@ static int sr_session_iteration(struct sr_session *session, gboolean block) source = &session->sources[i]; pollfd = &session->pollfds[i]; revents = (ret > 0) ? pollfd->revents : 0; - - if (revents > 0 || (ret == 0 - && session->source_timeout == source->timeout)) { - /* - * Invoke the source's callback on an event, - * or if the poll timed out and this source - * asked for that timeout. - */ - if (!source->cb(pollfd->fd, revents, source->cb_data)) - sr_session_source_remove(session, - source->poll_object); - /* - * We want to take as little time as possible to stop - * the session if we have been told to do so. Therefore, - * we check the flag after processing every source, not - * just once per main event loop. - */ - if (!stopped) { - stopped = sr_session_check_aborted(session); - stop_checked = TRUE; - } + due = source->due; +#ifdef HAVE_LIBUSB_1_0 + if (source->is_usb && usb_due < due) + due = usb_due; +#endif + if (revents == 0 && stop_time < due) + continue; + /* + * The source may be gone after the callback returns, + * so access any data now that needs accessing. + */ + poll_object = source->poll_object; + if (source->timeout > 0) + source->due = stop_time + INT64_C(1000) * source->timeout; + pollfd->revents = 0; + /* + * Invoke the source's callback on an event or timeout. + */ + if (!source->cb(pollfd->fd, revents, source->cb_data)) + sr_session_source_remove(session, poll_object); + /* + * We want to take as little time as possible to stop + * the session if we have been told to do so. Therefore, + * we check the flag after processing every source, not + * just once per main event loop. + */ + if (!stopped) { + stopped = sr_session_check_aborted(session); + stop_checked = TRUE; } + /* Restart loop as the sources list may have changed. */ + i = 0; } if (!stop_checked) sr_session_check_aborted(session); @@ -609,7 +637,7 @@ SR_API int sr_session_run(struct sr_session *session) } else { /* Real sources, use g_poll() main loop. */ while (session->num_sources) - sr_session_iteration(session, TRUE); + sr_session_iteration(session); } return SR_OK; @@ -830,13 +858,15 @@ SR_PRIV int sr_session_send(const struct sr_dev_inst *sdi, * ignored if 0. * @param cb Callback function to add. Must not be NULL. * @param cb_data Data for the callback function. Can be NULL. - * @param poll_object TODO. + * @param poll_object Handle by which the source is identified + * @param is_usb TRUE for a libusb polling source * * @retval SR_OK Success. * @retval SR_ERR_ARG Invalid argument. */ -static int _sr_session_source_add(struct sr_session *session, GPollFD *pollfd, - int timeout, sr_receive_data_callback cb, void *cb_data, gintptr poll_object) +SR_PRIV int sr_session_source_add_internal(struct sr_session *session, + GPollFD *pollfd, int timeout, sr_receive_data_callback cb, + void *cb_data, gintptr poll_object, gboolean is_usb) { struct source *new_sources, *s; GPollFD *new_pollfds; @@ -855,17 +885,18 @@ static int _sr_session_source_add(struct sr_session *session, GPollFD *pollfd, new_pollfds[session->num_sources] = *pollfd; s = &new_sources[session->num_sources++]; - s->timeout = timeout; + if (timeout > 0) + s->due = g_get_monotonic_time() + INT64_C(1000) * timeout; + else + s->due = INT64_MAX; s->cb = cb; s->cb_data = cb_data; s->poll_object = poll_object; + s->timeout = timeout; + s->is_usb = is_usb; session->pollfds = new_pollfds; session->sources = new_sources; - if (timeout != session->source_timeout && timeout > 0 - && (session->source_timeout == -1 || timeout < session->source_timeout)) - session->source_timeout = timeout; - return SR_OK; } @@ -893,7 +924,8 @@ SR_API int sr_session_source_add(struct sr_session *session, int fd, p.events = events; p.revents = 0; - return _sr_session_source_add(session, &p, timeout, cb, cb_data, (gintptr)fd); + return sr_session_source_add_internal(session, &p, timeout, + cb, cb_data, fd, FALSE); } /** @@ -914,8 +946,8 @@ SR_API int sr_session_source_add_pollfd(struct sr_session *session, GPollFD *pollfd, int timeout, sr_receive_data_callback cb, void *cb_data) { - return _sr_session_source_add(session, pollfd, timeout, cb, - cb_data, (gintptr)pollfd); + return sr_session_source_add_internal(session, pollfd, timeout, + cb, cb_data, (gintptr)pollfd, FALSE); } /** @@ -946,8 +978,8 @@ SR_API int sr_session_source_add_channel(struct sr_session *session, p.events = events; p.revents = 0; #endif - - return _sr_session_source_add(session, &p, timeout, cb, cb_data, (gintptr)channel); + return sr_session_source_add_internal(session, &p, timeout, cb, + cb_data, (gintptr)channel, FALSE); } /** diff --git a/src/usb.c b/src/usb.c index 5038441c..96cff396 100644 --- a/src/usb.c +++ b/src/usb.c @@ -254,16 +254,23 @@ SR_PRIV int usb_source_add(struct sr_session *session, struct sr_context *ctx, ctx->usb_pollfd.revents = 0; ctx->usb_cb = cb; ctx->usb_cb_data = cb_data; - sr_session_source_add_pollfd(session, &ctx->usb_pollfd, timeout, - usb_callback, ctx); + sr_session_source_add_internal(session, &ctx->usb_pollfd, timeout, + usb_callback, ctx, (gintptr)&ctx->usb_pollfd, TRUE); #else const struct libusb_pollfd **lupfd; unsigned int i; lupfd = libusb_get_pollfds(ctx->libusb_ctx); - for (i = 0; lupfd[i]; i++) - sr_session_source_add(session, lupfd[i]->fd, lupfd[i]->events, - timeout, cb, cb_data); + for (i = 0; lupfd[i]; i++) { + GPollFD p; + + p.fd = lupfd[i]->fd; + p.events = lupfd[i]->events; + p.revents = 0; + + sr_session_source_add_internal(session, &p, timeout, + cb, cb_data, p.fd, TRUE); + } free(lupfd); #endif ctx->usb_source_present = TRUE;