From: Daniel Elstner Date: Tue, 1 Sep 2015 01:36:03 +0000 (+0200) Subject: session: Allow multiple poll FDs per event source X-Git-Tag: libsigrok-0.4.0~347 X-Git-Url: https://sigrok.org/gitaction?a=commitdiff_plain;h=92248e7821d6ed98390088dab3a4edd329ef414a;p=libsigrok.git session: Allow multiple poll FDs per event source Turns out that having one event source per libusb poll FD is a bad idea. There is only a single callback for all poll FDs, and libusb expects to be called only once per poll iteration, no matter how many FDs triggered. Also, they should all share the same timeout, which should get reset on events from any polled FD. The new timeout handling made this problem apparent, as it caused the callback to be invoked multiple times on timeouts, once for each separate event source. In order to fix this, change the implementation to allow for an arbitrary number of poll FDs per event source. This number is zero for timer FDs, one for normal I/O sources, and one or more for libusb sources (Unix only). Also, on Windows, do not get an additional timeout from libusb in the event loop. This is only appropriate when polling the libusb FDs directly, which we aren't doing on Windows. --- diff --git a/src/libsigrok-internal.h b/src/libsigrok-internal.h index 7023c491..86a6326a 100644 --- a/src/libsigrok-internal.h +++ b/src/libsigrok-internal.h @@ -722,10 +722,11 @@ struct sr_session { gboolean running; /* - * Each I/O source has an entry with the same index in both "sources" - * and "pollfds". The "sources" array may be larger than "pollfds", - * in which case the excess sources are pure timer sources. - * We can not embed the GPollFD into the source struct since we want + * Event sources and poll FDs are stored in the same order in the + * the sources and pollfds arrays. However, each source may cover + * any number of associated poll FDs, so the indices do not match. + * + * We cannot embed the GPollFD into the source struct since we want * to be able to pass the array of all poll descriptors to g_poll(). */ GArray *sources; @@ -743,8 +744,11 @@ struct sr_session { }; SR_PRIV int sr_session_source_add_internal(struct sr_session *session, - GPollFD *pollfd, int timeout, sr_receive_data_callback cb, - void *cb_data, gintptr poll_object, gboolean is_usb); + const GPollFD *pollfds, int num_fds, int timeout, + sr_receive_data_callback cb, void *cb_data, + gintptr poll_object); +SR_PRIV int sr_session_source_remove_internal(struct sr_session *session, + gintptr poll_object); SR_PRIV int sr_session_send(const struct sr_dev_inst *sdi, const struct sr_datafeed_packet *packet); SR_PRIV int sr_session_stop_sync(struct sr_session *session); diff --git a/src/session.c b/src/session.c index 263c4c1d..276f328a 100644 --- a/src/session.c +++ b/src/session.c @@ -47,6 +47,7 @@ struct source { int64_t timeout; /* microseconds */ int64_t due; /* microseconds */ + sr_receive_data_callback cb; void *cb_data; @@ -55,7 +56,12 @@ struct source { */ gintptr poll_object; - gboolean is_usb; + /* Number of fds to poll for this source. This will be 0 for timer + * sources, 1 for normal I/O sources, and 1 or more for libusb I/O + * sources on Unix platforms. + */ + int num_fds; + gboolean triggered; }; @@ -383,8 +389,6 @@ static gboolean sr_session_check_aborted(struct sr_session *session) return stop; } -static int _sr_session_source_remove(struct sr_session *session, gintptr poll_object); - /** * Poll the session's event sources. * @@ -397,6 +401,7 @@ static int sr_session_iteration(struct sr_session *session) int64_t start_time, stop_time, min_due, due; int timeout_ms; unsigned int i; + int k, fd_index; int ret; int fd; int revents; @@ -404,7 +409,7 @@ static int sr_session_iteration(struct sr_session *session) struct source *source; GPollFD *pollfd; gintptr poll_object; -#ifdef HAVE_LIBUSB_1_0 +#if HAVE_LIBUSB_1_0 && !defined(G_OS_WIN32) int64_t usb_timeout; int64_t usb_due; struct timeval tv; @@ -422,7 +427,7 @@ static int sr_session_iteration(struct sr_session *session) min_due = source->due; source->triggered = FALSE; } -#ifdef HAVE_LIBUSB_1_0 +#if HAVE_LIBUSB_1_0 && !defined(G_OS_WIN32) usb_due = INT64_MAX; if (session->ctx->usb_source_present) { ret = libusb_get_next_timeout(session->ctx->libusb_ctx, &tv); @@ -472,28 +477,38 @@ static int sr_session_iteration(struct sr_session *session) triggered = FALSE; stopped = FALSE; + fd_index = 0; for (i = 0; i < session->sources->len; ++i) { source = &g_array_index(session->sources, struct source, i); - if (source->triggered) - continue; /* already handled */ poll_object = source->poll_object; fd = (int)poll_object; revents = 0; - if (i < session->pollfds->len) { - pollfd = &g_array_index(session->pollfds, GPollFD, i); + for (k = 0; k < source->num_fds; ++k) { + pollfd = &g_array_index(session->pollfds, + GPollFD, fd_index + k); fd = pollfd->fd; - if (ret > 0) - revents = pollfd->revents; + revents |= pollfd->revents; } + fd_index += source->num_fds; + + if (source->triggered) + continue; /* already handled */ if (ret > 0 && revents == 0) continue; /* skip timeouts if any I/O event occurred */ + /* Make invalid to avoid confusion in case of multiple FDs. */ + if (source->num_fds > 1) + fd = -1; + if (ret <= 0) + revents = 0; + due = source->due; -#ifdef HAVE_LIBUSB_1_0 - if (source->is_usb && usb_due < due) +#if HAVE_LIBUSB_1_0 && !defined(G_OS_WIN32) + if (usb_due < due && source->poll_object + == (gintptr)session->ctx->libusb_ctx) due = usb_due; #endif if (revents == 0 && stop_time < due) @@ -510,7 +525,7 @@ static int sr_session_iteration(struct sr_session *session) * Invoke the source's callback on an event or timeout. */ if (!source->cb(fd, revents, source->cb_data)) - _sr_session_source_remove(session, poll_object); + sr_session_source_remove_internal(session, poll_object); /* * We want to take as little time as possible to stop * the session if we have been told to do so. Therefore, @@ -521,6 +536,7 @@ static int sr_session_iteration(struct sr_session *session) stopped = sr_session_check_aborted(session); /* Restart loop as the sources list may have changed. */ + fd_index = 0; i = 0; } @@ -884,33 +900,49 @@ SR_PRIV int sr_session_send(const struct sr_dev_inst *sdi, * Add an event source for a file descriptor. * * @param session The session to use. Must not be NULL. - * @param pollfd The GPollFD. + * @param[in] pollfds The FDs to poll, or NULL if @a num_fds is 0. + * @param[in] num_fds Number of FDs in the array. * @param[in] timeout Max time in ms to wait before the callback is called, * or -1 to wait indefinitely. * @param cb Callback function to add. Must not be NULL. * @param cb_data Data for the callback function. Can be NULL. * @param poll_object Handle by which the source is identified - * @param is_usb TRUE for a libusb polling source * * @retval SR_OK Success. * @retval SR_ERR_ARG Invalid argument. + * @retval SR_ERR An event source for @a poll_object is already installed. */ SR_PRIV int sr_session_source_add_internal(struct sr_session *session, - GPollFD *pollfd, int timeout, sr_receive_data_callback cb, - void *cb_data, gintptr poll_object, gboolean is_usb) + const GPollFD *pollfds, int num_fds, int timeout, + sr_receive_data_callback cb, void *cb_data, + gintptr poll_object) { struct source src; + unsigned int i; + /* Note: cb_data can be NULL, that's not a bug. */ if (!cb) { sr_err("%s: cb was NULL", __func__); return SR_ERR_ARG; } - /* Note: cb_data can be NULL, that's not a bug. */ - + if (!pollfds && num_fds != 0) { + sr_err("%s: pollfds was NULL", __func__); + return SR_ERR_ARG; + } + /* Make sure that poll_object is unique. + */ + for (i = 0; i < session->sources->len; ++i) { + if (g_array_index(session->sources, struct source, i) + .poll_object == poll_object) { + sr_err("Event source for object %" G_GINTPTR_FORMAT + " already installed.", poll_object); + return SR_ERR; + } + } src.cb = cb; src.cb_data = cb_data; src.poll_object = poll_object; - src.is_usb = is_usb; + src.num_fds = num_fds; src.triggered = FALSE; if (timeout >= 0) { @@ -920,15 +952,10 @@ SR_PRIV int sr_session_source_add_internal(struct sr_session *session, src.timeout = -1; src.due = INT64_MAX; } + g_array_append_val(session->sources, src); - if (pollfd) { - /* I/O source */ - g_array_insert_val(session->sources, session->pollfds->len, src); - g_array_append_vals(session->pollfds, pollfd, 1); - } else { - /* Timer source */ - g_array_append_val(session->sources, src); - } + if (num_fds > 0) + g_array_append_vals(session->pollfds, pollfds, num_fds); return SR_OK; } @@ -963,7 +990,7 @@ SR_API int sr_session_source_add(struct sr_session *session, int fd, p.revents = 0; return sr_session_source_add_internal(session, - (fd < 0) ? NULL : &p, timeout, cb, cb_data, fd, FALSE); + &p, (fd < 0) ? 0 : 1, timeout, cb, cb_data, fd); } /** @@ -989,8 +1016,8 @@ SR_API int sr_session_source_add_pollfd(struct sr_session *session, sr_err("%s: pollfd was NULL", __func__); return SR_ERR_ARG; } - return sr_session_source_add_internal(session, pollfd, timeout, - cb, cb_data, (gintptr)pollfd, FALSE); + return sr_session_source_add_internal(session, pollfd, 1, + timeout, cb, cb_data, (gintptr)pollfd); } /** @@ -1022,35 +1049,45 @@ SR_API int sr_session_source_add_channel(struct sr_session *session, p.events = events; p.revents = 0; #endif - return sr_session_source_add_internal(session, &p, timeout, cb, - cb_data, (gintptr)channel, FALSE); + return sr_session_source_add_internal(session, &p, 1, + timeout, cb, cb_data, (gintptr)channel); } /** - * Remove the source belonging to the specified channel. + * Remove the source identified by the specified poll object. * * @param session The session to use. Must not be NULL. * @param poll_object The channel for which the source should be removed. * * @retval SR_OK Success - * @retval SR_ERR_ARG Invalid arguments - * @retval SR_ERR_BUG Internal error + * @retval SR_ERR_BUG No event source for poll_object found. */ -static int _sr_session_source_remove(struct sr_session *session, gintptr poll_object) +SR_PRIV int sr_session_source_remove_internal(struct sr_session *session, + gintptr poll_object) { + struct source *source; unsigned int i; + int fd_index = 0; for (i = 0; i < session->sources->len; ++i) { - if (g_array_index(session->sources, struct source, i) - .poll_object == poll_object) { + source = &g_array_index(session->sources, struct source, i); + if (source->poll_object == poll_object) { + if (source->num_fds > 0) + g_array_remove_range(session->pollfds, + fd_index, source->num_fds); g_array_remove_index(session->sources, i); - if (i < session->pollfds->len) - g_array_remove_index(session->pollfds, i); - break; + return SR_OK; } + fd_index += source->num_fds; } - return SR_OK; + /* Trying to remove an already removed event source is problematic + * since the poll_object handle may have been reused in the meantime. + */ + sr_warn("Cannot remove non-existing event source for object %" + G_GINTPTR_FORMAT ".", poll_object); + + return SR_ERR_BUG; } /** @@ -1067,7 +1104,7 @@ static int _sr_session_source_remove(struct sr_session *session, gintptr poll_ob */ SR_API int sr_session_source_remove(struct sr_session *session, int fd) { - return _sr_session_source_remove(session, fd); + return sr_session_source_remove_internal(session, fd); } /** @@ -1089,7 +1126,7 @@ SR_API int sr_session_source_remove_pollfd(struct sr_session *session, sr_err("%s: pollfd was NULL", __func__); return SR_ERR_ARG; } - return _sr_session_source_remove(session, (gintptr)pollfd); + return sr_session_source_remove_internal(session, (gintptr)pollfd); } /** @@ -1111,7 +1148,7 @@ SR_API int sr_session_source_remove_channel(struct sr_session *session, sr_err("%s: channel was NULL", __func__); return SR_ERR_ARG; } - return _sr_session_source_remove(session, (gintptr)channel); + return sr_session_source_remove_internal(session, (gintptr)channel); } static void copy_src(struct sr_config *src, struct sr_datafeed_meta *meta_copy) diff --git a/src/usb.c b/src/usb.c index 96cff396..da192ec1 100644 --- a/src/usb.c +++ b/src/usb.c @@ -236,6 +236,8 @@ static int usb_callback(int fd, int revents, void *cb_data) SR_PRIV int usb_source_add(struct sr_session *session, struct sr_context *ctx, int timeout, sr_receive_data_callback cb, void *cb_data) { + int ret; + if (ctx->usb_source_present) { sr_err("A USB event source is already present."); return SR_ERR; @@ -254,32 +256,43 @@ SR_PRIV int usb_source_add(struct sr_session *session, struct sr_context *ctx, ctx->usb_pollfd.revents = 0; ctx->usb_cb = cb; ctx->usb_cb_data = cb_data; - sr_session_source_add_internal(session, &ctx->usb_pollfd, timeout, - usb_callback, ctx, (gintptr)&ctx->usb_pollfd, TRUE); + ret = sr_session_source_add_pollfd(session, &ctx->usb_pollfd, + timeout, usb_callback, ctx); #else const struct libusb_pollfd **lupfd; - unsigned int i; + GPollFD *pollfds; + int i; + int num_fds = 0; lupfd = libusb_get_pollfds(ctx->libusb_ctx); - for (i = 0; lupfd[i]; i++) { - GPollFD p; - - p.fd = lupfd[i]->fd; - p.events = lupfd[i]->events; - p.revents = 0; - - sr_session_source_add_internal(session, &p, timeout, - cb, cb_data, p.fd, TRUE); + if (!lupfd || !lupfd[0]) { + free(lupfd); + sr_err("Failed to get libusb file descriptors."); + return SR_ERR; + } + while (lupfd[num_fds]) + ++num_fds; + pollfds = g_new(GPollFD, num_fds); + + for (i = 0; i < num_fds; ++i) { + pollfds[i].fd = lupfd[i]->fd; + pollfds[i].events = lupfd[i]->events; + pollfds[i].revents = 0; } free(lupfd); + ret = sr_session_source_add_internal(session, pollfds, num_fds, + timeout, cb, cb_data, (gintptr)ctx->libusb_ctx); + g_free(pollfds); #endif - ctx->usb_source_present = TRUE; + ctx->usb_source_present = (ret == SR_OK); - return SR_OK; + return ret; } SR_PRIV int usb_source_remove(struct sr_session *session, struct sr_context *ctx) { + int ret; + if (!ctx->usb_source_present) return SR_OK; @@ -291,22 +304,17 @@ SR_PRIV int usb_source_remove(struct sr_session *session, struct sr_context *ctx /* Wait for USB wait thread to terminate. */ g_thread_join(ctx->usb_thread); /* Remove USB event from session poll set. */ - sr_session_source_remove_pollfd(session, &ctx->usb_pollfd); + ret = sr_session_source_remove_pollfd(session, &ctx->usb_pollfd); /* Close event handles that were used between threads. */ CloseHandle(ctx->usb_wait_request_event); CloseHandle(ctx->usb_wait_complete_event); #else - const struct libusb_pollfd **lupfd; - unsigned int i; - - lupfd = libusb_get_pollfds(ctx->libusb_ctx); - for (i = 0; lupfd[i]; i++) - sr_session_source_remove(session, lupfd[i]->fd); - free(lupfd); + ret = sr_session_source_remove_internal(session, + (gintptr)ctx->libusb_ctx); #endif ctx->usb_source_present = FALSE; - return SR_OK; + return ret; } SR_PRIV int usb_get_port_path(libusb_device *dev, char *path, int path_len)