]> sigrok.org Git - libsigrok.git/commitdiff
session: Allow multiple poll FDs per event source
authorDaniel Elstner <redacted>
Tue, 1 Sep 2015 01:36:03 +0000 (03:36 +0200)
committerDaniel Elstner <redacted>
Thu, 3 Sep 2015 17:37:09 +0000 (19:37 +0200)
Turns out that having one event source per libusb poll FD is
a bad idea. There is only a single callback for all poll FDs,
and libusb expects to be called only once per poll iteration,
no matter how many FDs triggered.

Also, they should all share the same timeout, which should get
reset on events from any polled FD. The new timeout handling made
this problem apparent, as it caused the callback to be invoked
multiple times on timeouts, once for each separate event source.

In order to fix this, change the implementation to allow for an
arbitrary number of poll FDs per event source. This number is
zero for timer FDs, one for normal I/O sources, and one or more
for libusb sources (Unix only).

Also, on Windows, do not get an additional timeout from libusb
in the event loop. This is only appropriate when polling the
libusb FDs directly, which we aren't doing on Windows.

src/libsigrok-internal.h
src/session.c
src/usb.c

index 7023c491cb1591fbfdec85dcdc25757fed41f619..86a6326a76f52e881315cea54f3ccdb9b26084bd 100644 (file)
@@ -722,10 +722,11 @@ struct sr_session {
        gboolean running;
 
        /*
-        * Each I/O source has an entry with the same index in both "sources"
-        * and "pollfds". The "sources" array may be larger than "pollfds",
-        * in which case the excess sources are pure timer sources.
-        * We can not embed the GPollFD into the source struct since we want
+        * Event sources and poll FDs are stored in the same order in the
+        * the sources and pollfds arrays. However, each source may cover
+        * any number of associated poll FDs, so the indices do not match.
+        *
+        * We cannot embed the GPollFD into the source struct since we want
         * to be able to pass the array of all poll descriptors to g_poll().
         */
        GArray *sources;
@@ -743,8 +744,11 @@ struct sr_session {
 };
 
 SR_PRIV int sr_session_source_add_internal(struct sr_session *session,
-               GPollFD *pollfd, int timeout, sr_receive_data_callback cb,
-               void *cb_data, gintptr poll_object, gboolean is_usb);
+               const GPollFD *pollfds, int num_fds, int timeout,
+               sr_receive_data_callback cb, void *cb_data,
+               gintptr poll_object);
+SR_PRIV int sr_session_source_remove_internal(struct sr_session *session,
+               gintptr poll_object);
 SR_PRIV int sr_session_send(const struct sr_dev_inst *sdi,
                const struct sr_datafeed_packet *packet);
 SR_PRIV int sr_session_stop_sync(struct sr_session *session);
index 263c4c1dd3e749ddb943b838f5663535a7335704..276f328a1b5ef9cd1ef11298a113fe6708fcd74e 100644 (file)
@@ -47,6 +47,7 @@
 struct source {
        int64_t timeout;        /* microseconds */
        int64_t due;            /* microseconds */
+
        sr_receive_data_callback cb;
        void *cb_data;
 
@@ -55,7 +56,12 @@ struct source {
         */
        gintptr poll_object;
 
-       gboolean is_usb;
+       /* Number of fds to poll for this source. This will be 0 for timer
+        * sources, 1 for normal I/O sources, and 1 or more for libusb I/O
+        * sources on Unix platforms.
+        */
+       int num_fds;
+
        gboolean triggered;
 };
 
@@ -383,8 +389,6 @@ static gboolean sr_session_check_aborted(struct sr_session *session)
        return stop;
 }
 
-static int _sr_session_source_remove(struct sr_session *session, gintptr poll_object);
-
 /**
  * Poll the session's event sources.
  *
@@ -397,6 +401,7 @@ static int sr_session_iteration(struct sr_session *session)
        int64_t start_time, stop_time, min_due, due;
        int timeout_ms;
        unsigned int i;
+       int k, fd_index;
        int ret;
        int fd;
        int revents;
@@ -404,7 +409,7 @@ static int sr_session_iteration(struct sr_session *session)
        struct source *source;
        GPollFD *pollfd;
        gintptr poll_object;
-#ifdef HAVE_LIBUSB_1_0
+#if HAVE_LIBUSB_1_0 && !defined(G_OS_WIN32)
        int64_t usb_timeout;
        int64_t usb_due;
        struct timeval tv;
@@ -422,7 +427,7 @@ static int sr_session_iteration(struct sr_session *session)
                        min_due = source->due;
                source->triggered = FALSE;
        }
-#ifdef HAVE_LIBUSB_1_0
+#if HAVE_LIBUSB_1_0 && !defined(G_OS_WIN32)
        usb_due = INT64_MAX;
        if (session->ctx->usb_source_present) {
                ret = libusb_get_next_timeout(session->ctx->libusb_ctx, &tv);
@@ -472,28 +477,38 @@ static int sr_session_iteration(struct sr_session *session)
 
        triggered = FALSE;
        stopped = FALSE;
+       fd_index = 0;
 
        for (i = 0; i < session->sources->len; ++i) {
                source = &g_array_index(session->sources, struct source, i);
-               if (source->triggered)
-                       continue; /* already handled */
 
                poll_object = source->poll_object;
                fd = (int)poll_object;
                revents = 0;
 
-               if (i < session->pollfds->len) {
-                       pollfd = &g_array_index(session->pollfds, GPollFD, i);
+               for (k = 0; k < source->num_fds; ++k) {
+                       pollfd = &g_array_index(session->pollfds,
+                                       GPollFD, fd_index + k);
                        fd = pollfd->fd;
-                       if (ret > 0)
-                               revents = pollfd->revents;
+                       revents |= pollfd->revents;
                }
+               fd_index += source->num_fds;
+
+               if (source->triggered)
+                       continue; /* already handled */
                if (ret > 0 && revents == 0)
                        continue; /* skip timeouts if any I/O event occurred */
 
+               /* Make invalid to avoid confusion in case of multiple FDs. */
+               if (source->num_fds > 1)
+                       fd = -1;
+               if (ret <= 0)
+                       revents = 0;
+
                due = source->due;
-#ifdef HAVE_LIBUSB_1_0
-               if (source->is_usb && usb_due < due)
+#if HAVE_LIBUSB_1_0 && !defined(G_OS_WIN32)
+               if (usb_due < due && source->poll_object
+                               == (gintptr)session->ctx->libusb_ctx)
                        due = usb_due;
 #endif
                if (revents == 0 && stop_time < due)
@@ -510,7 +525,7 @@ static int sr_session_iteration(struct sr_session *session)
                 * Invoke the source's callback on an event or timeout.
                 */
                if (!source->cb(fd, revents, source->cb_data))
-                       _sr_session_source_remove(session, poll_object);
+                       sr_session_source_remove_internal(session, poll_object);
                /*
                 * We want to take as little time as possible to stop
                 * the session if we have been told to do so. Therefore,
@@ -521,6 +536,7 @@ static int sr_session_iteration(struct sr_session *session)
                        stopped = sr_session_check_aborted(session);
 
                /* Restart loop as the sources list may have changed. */
+               fd_index = 0;
                i = 0;
        }
 
@@ -884,33 +900,49 @@ SR_PRIV int sr_session_send(const struct sr_dev_inst *sdi,
  * Add an event source for a file descriptor.
  *
  * @param session The session to use. Must not be NULL.
- * @param pollfd The GPollFD.
+ * @param[in] pollfds The FDs to poll, or NULL if @a num_fds is 0.
+ * @param[in] num_fds Number of FDs in the array.
  * @param[in] timeout Max time in ms to wait before the callback is called,
  *                    or -1 to wait indefinitely.
  * @param cb Callback function to add. Must not be NULL.
  * @param cb_data Data for the callback function. Can be NULL.
  * @param poll_object Handle by which the source is identified
- * @param is_usb TRUE for a libusb polling source
  *
  * @retval SR_OK Success.
  * @retval SR_ERR_ARG Invalid argument.
+ * @retval SR_ERR An event source for @a poll_object is already installed.
  */
 SR_PRIV int sr_session_source_add_internal(struct sr_session *session,
-               GPollFD *pollfd, int timeout, sr_receive_data_callback cb,
-               void *cb_data, gintptr poll_object, gboolean is_usb)
+               const GPollFD *pollfds, int num_fds, int timeout,
+               sr_receive_data_callback cb, void *cb_data,
+               gintptr poll_object)
 {
        struct source src;
+       unsigned int i;
 
+       /* Note: cb_data can be NULL, that's not a bug. */
        if (!cb) {
                sr_err("%s: cb was NULL", __func__);
                return SR_ERR_ARG;
        }
-       /* Note: cb_data can be NULL, that's not a bug. */
-
+       if (!pollfds && num_fds != 0) {
+               sr_err("%s: pollfds was NULL", __func__);
+               return SR_ERR_ARG;
+       }
+       /* Make sure that poll_object is unique.
+        */
+       for (i = 0; i < session->sources->len; ++i) {
+               if (g_array_index(session->sources, struct source, i)
+                               .poll_object == poll_object) {
+                       sr_err("Event source for object %" G_GINTPTR_FORMAT
+                               " already installed.", poll_object);
+                       return SR_ERR;
+               }
+       }
        src.cb = cb;
        src.cb_data = cb_data;
        src.poll_object = poll_object;
-       src.is_usb = is_usb;
+       src.num_fds = num_fds;
        src.triggered = FALSE;
 
        if (timeout >= 0) {
@@ -920,15 +952,10 @@ SR_PRIV int sr_session_source_add_internal(struct sr_session *session,
                src.timeout = -1;
                src.due = INT64_MAX;
        }
+       g_array_append_val(session->sources, src);
 
-       if (pollfd) {
-               /* I/O source */
-               g_array_insert_val(session->sources, session->pollfds->len, src);
-               g_array_append_vals(session->pollfds, pollfd, 1);
-       } else {
-               /* Timer source */
-               g_array_append_val(session->sources, src);
-       }
+       if (num_fds > 0)
+               g_array_append_vals(session->pollfds, pollfds, num_fds);
 
        return SR_OK;
 }
@@ -963,7 +990,7 @@ SR_API int sr_session_source_add(struct sr_session *session, int fd,
        p.revents = 0;
 
        return sr_session_source_add_internal(session,
-               (fd < 0) ? NULL : &p, timeout, cb, cb_data, fd, FALSE);
+               &p, (fd < 0) ? 0 : 1, timeout, cb, cb_data, fd);
 }
 
 /**
@@ -989,8 +1016,8 @@ SR_API int sr_session_source_add_pollfd(struct sr_session *session,
                sr_err("%s: pollfd was NULL", __func__);
                return SR_ERR_ARG;
        }
-       return sr_session_source_add_internal(session, pollfd, timeout,
-                       cb, cb_data, (gintptr)pollfd, FALSE);
+       return sr_session_source_add_internal(session, pollfd, 1,
+                       timeout, cb, cb_data, (gintptr)pollfd);
 }
 
 /**
@@ -1022,35 +1049,45 @@ SR_API int sr_session_source_add_channel(struct sr_session *session,
        p.events = events;
        p.revents = 0;
 #endif
-       return sr_session_source_add_internal(session, &p, timeout, cb,
-                       cb_data, (gintptr)channel, FALSE);
+       return sr_session_source_add_internal(session, &p, 1,
+                       timeout, cb, cb_data, (gintptr)channel);
 }
 
 /**
- * Remove the source belonging to the specified channel.
+ * Remove the source identified by the specified poll object.
  *
  * @param session The session to use. Must not be NULL.
  * @param poll_object The channel for which the source should be removed.
  *
  * @retval SR_OK Success
- * @retval SR_ERR_ARG Invalid arguments
- * @retval SR_ERR_BUG Internal error
+ * @retval SR_ERR_BUG No event source for poll_object found.
  */
-static int _sr_session_source_remove(struct sr_session *session, gintptr poll_object)
+SR_PRIV int sr_session_source_remove_internal(struct sr_session *session,
+               gintptr poll_object)
 {
+       struct source *source;
        unsigned int i;
+       int fd_index = 0;
 
        for (i = 0; i < session->sources->len; ++i) {
-               if (g_array_index(session->sources, struct source, i)
-                                       .poll_object == poll_object) {
+               source = &g_array_index(session->sources, struct source, i);
 
+               if (source->poll_object == poll_object) {
+                       if (source->num_fds > 0)
+                               g_array_remove_range(session->pollfds,
+                                               fd_index, source->num_fds);
                        g_array_remove_index(session->sources, i);
-                       if (i < session->pollfds->len)
-                               g_array_remove_index(session->pollfds, i);
-                       break;
+                       return SR_OK;
                }
+               fd_index += source->num_fds;
        }
-       return SR_OK;
+       /* Trying to remove an already removed event source is problematic
+        * since the poll_object handle may have been reused in the meantime.
+        */
+       sr_warn("Cannot remove non-existing event source for object %"
+               G_GINTPTR_FORMAT ".", poll_object);
+
+       return SR_ERR_BUG;
 }
 
 /**
@@ -1067,7 +1104,7 @@ static int _sr_session_source_remove(struct sr_session *session, gintptr poll_ob
  */
 SR_API int sr_session_source_remove(struct sr_session *session, int fd)
 {
-       return _sr_session_source_remove(session, fd);
+       return sr_session_source_remove_internal(session, fd);
 }
 
 /**
@@ -1089,7 +1126,7 @@ SR_API int sr_session_source_remove_pollfd(struct sr_session *session,
                sr_err("%s: pollfd was NULL", __func__);
                return SR_ERR_ARG;
        }
-       return _sr_session_source_remove(session, (gintptr)pollfd);
+       return sr_session_source_remove_internal(session, (gintptr)pollfd);
 }
 
 /**
@@ -1111,7 +1148,7 @@ SR_API int sr_session_source_remove_channel(struct sr_session *session,
                sr_err("%s: channel was NULL", __func__);
                return SR_ERR_ARG;
        }
-       return _sr_session_source_remove(session, (gintptr)channel);
+       return sr_session_source_remove_internal(session, (gintptr)channel);
 }
 
 static void copy_src(struct sr_config *src, struct sr_datafeed_meta *meta_copy)
index 96cff39662abe0bc88db48759c81d249faac8c7a..da192ec1710f2e6c3514ce58f8f85b45b499d76a 100644 (file)
--- a/src/usb.c
+++ b/src/usb.c
@@ -236,6 +236,8 @@ static int usb_callback(int fd, int revents, void *cb_data)
 SR_PRIV int usb_source_add(struct sr_session *session, struct sr_context *ctx,
                int timeout, sr_receive_data_callback cb, void *cb_data)
 {
+       int ret;
+
        if (ctx->usb_source_present) {
                sr_err("A USB event source is already present.");
                return SR_ERR;
@@ -254,32 +256,43 @@ SR_PRIV int usb_source_add(struct sr_session *session, struct sr_context *ctx,
        ctx->usb_pollfd.revents = 0;
        ctx->usb_cb = cb;
        ctx->usb_cb_data = cb_data;
-       sr_session_source_add_internal(session, &ctx->usb_pollfd, timeout,
-                       usb_callback, ctx, (gintptr)&ctx->usb_pollfd, TRUE);
+       ret = sr_session_source_add_pollfd(session, &ctx->usb_pollfd,
+                       timeout, usb_callback, ctx);
 #else
        const struct libusb_pollfd **lupfd;
-       unsigned int i;
+       GPollFD *pollfds;
+       int i;
+       int num_fds = 0;
 
        lupfd = libusb_get_pollfds(ctx->libusb_ctx);
-       for (i = 0; lupfd[i]; i++) {
-               GPollFD p;
-
-               p.fd = lupfd[i]->fd;
-               p.events = lupfd[i]->events;
-               p.revents = 0;
-
-               sr_session_source_add_internal(session, &p, timeout,
-                               cb, cb_data, p.fd, TRUE);
+       if (!lupfd || !lupfd[0]) {
+               free(lupfd);
+               sr_err("Failed to get libusb file descriptors.");
+               return SR_ERR;
+       }
+       while (lupfd[num_fds])
+               ++num_fds;
+       pollfds = g_new(GPollFD, num_fds);
+
+       for (i = 0; i < num_fds; ++i) {
+               pollfds[i].fd = lupfd[i]->fd;
+               pollfds[i].events = lupfd[i]->events;
+               pollfds[i].revents = 0;
        }
        free(lupfd);
+       ret = sr_session_source_add_internal(session, pollfds, num_fds,
+                       timeout, cb, cb_data, (gintptr)ctx->libusb_ctx);
+       g_free(pollfds);
 #endif
-       ctx->usb_source_present = TRUE;
+       ctx->usb_source_present = (ret == SR_OK);
 
-       return SR_OK;
+       return ret;
 }
 
 SR_PRIV int usb_source_remove(struct sr_session *session, struct sr_context *ctx)
 {
+       int ret;
+
        if (!ctx->usb_source_present)
                return SR_OK;
 
@@ -291,22 +304,17 @@ SR_PRIV int usb_source_remove(struct sr_session *session, struct sr_context *ctx
        /* Wait for USB wait thread to terminate. */
        g_thread_join(ctx->usb_thread);
        /* Remove USB event from session poll set. */
-       sr_session_source_remove_pollfd(session, &ctx->usb_pollfd);
+       ret = sr_session_source_remove_pollfd(session, &ctx->usb_pollfd);
        /* Close event handles that were used between threads. */
        CloseHandle(ctx->usb_wait_request_event);
        CloseHandle(ctx->usb_wait_complete_event);
 #else
-       const struct libusb_pollfd **lupfd;
-       unsigned int i;
-
-       lupfd = libusb_get_pollfds(ctx->libusb_ctx);
-       for (i = 0; lupfd[i]; i++)
-               sr_session_source_remove(session, lupfd[i]->fd);
-       free(lupfd);
+       ret = sr_session_source_remove_internal(session,
+                       (gintptr)ctx->libusb_ctx);
 #endif
        ctx->usb_source_present = FALSE;
 
-       return SR_OK;
+       return ret;
 }
 
 SR_PRIV int usb_get_port_path(libusb_device *dev, char *path, int path_len)