]> sigrok.org Git - libsigrok.git/commitdiff
session: Unify handling of I/O and timer sources
authorDaniel Elstner <redacted>
Mon, 31 Aug 2015 00:35:57 +0000 (02:35 +0200)
committerDaniel Elstner <redacted>
Thu, 3 Sep 2015 17:37:09 +0000 (19:37 +0200)
Handle I/O sources and timer ("dummy") sources within the same
polling loop, so that both may be used together. Slightly change
the API to improve consistency: a timeout value of -1 now disables
the timeout, and 0 makes the source always time out immediately.
The "dummy" sources already behaved that way, although it wasn't
documented as such.

Make sure that I/O events are processed preferentially: Skip any
timeout callbacks if an I/O event occurred within the same poll
iteration. This applies to both timer/idle sources and timeouts
of I/O sources.

Do not create dummy GPollFDs for timer/idle sources. Instead,
split the sources array into an I/O section and a timer section,
and create corresponding GPollFDs only for the I/O section. Use
GArray to simplify the handling of the dynamic arrays.

src/libsigrok-internal.h
src/session.c

index a937d046ff8854399bae7111b471ca8741acf797..7023c491cb1591fbfdec85dcdc25757fed41f619 100644 (file)
@@ -721,16 +721,15 @@ struct sr_session {
 
        gboolean running;
 
-       unsigned int num_sources;
-
        /*
-        * Both "sources" and "pollfds" are of the same size and contain pairs
-        * of descriptor and callback function. We can not embed the GPollFD
-        * into the source struct since we want to be able to pass the array
-        * of all poll descriptors to g_poll().
+        * Each I/O source has an entry with the same index in both "sources"
+        * and "pollfds". The "sources" array may be larger than "pollfds",
+        * in which case the excess sources are pure timer sources.
+        * We can not embed the GPollFD into the source struct since we want
+        * to be able to pass the array of all poll descriptors to g_poll().
         */
-       struct source *sources;
-       GPollFD *pollfds;
+       GArray *sources;
+       GArray *pollfds;
 
        /*
         * These are our synchronization primitives for stopping the session in
index e000dd0ef7efb7be12e2d4ab4a9887c92fcce46f..aaeda1cc09ed199eebd59dd79078b6e0e6ef9bcf 100644 (file)
@@ -45,7 +45,8 @@
  */
 
 struct source {
-       int64_t due;
+       int64_t timeout;        /* microseconds */
+       int64_t due;            /* microseconds */
        sr_receive_data_callback cb;
        void *cb_data;
 
@@ -54,8 +55,8 @@ struct source {
         */
        gintptr poll_object;
 
-       int timeout;
        gboolean is_usb;
+       gboolean triggered;
 };
 
 struct datafeed_callback {
@@ -87,8 +88,10 @@ SR_API int sr_session_new(struct sr_context *ctx,
        session = g_malloc0(sizeof(struct sr_session));
 
        session->ctx = ctx;
-       session->running = FALSE;
-       session->abort_session = FALSE;
+
+       session->sources = g_array_new(FALSE, FALSE, sizeof(struct source));
+       session->pollfds = g_array_new(FALSE, FALSE, sizeof(GPollFD));
+
        g_mutex_init(&session->stop_mutex);
 
        *new_session = session;
@@ -121,6 +124,9 @@ SR_API int sr_session_destroy(struct sr_session *session)
 
        g_slist_free_full(session->owned_devs, (GDestroyNotify)sr_dev_inst_free);
 
+       g_array_unref(session->pollfds);
+       g_array_unref(session->sources);
+
        g_free(session);
 
        return SR_OK;
@@ -377,12 +383,10 @@ static gboolean sr_session_check_aborted(struct sr_session *session)
        return stop;
 }
 
+static int _sr_session_source_remove(struct sr_session *session, gintptr poll_object);
+
 /**
- * Call every device in the current session's callback.
- *
- * For sessions not driven by select loops such as sr_session_run(),
- * but driven by another scheduler, this can be used to poll the devices
- * from within that scheduler.
+ * Poll the session's event sources.
  *
  * @param session The session to use. Must not be NULL.
  * @retval SR_OK Success.
@@ -390,15 +394,13 @@ static gboolean sr_session_check_aborted(struct sr_session *session)
  */
 static int sr_session_iteration(struct sr_session *session)
 {
-       int64_t start_time;
-       int64_t stop_time;
-       int64_t min_due;
-       int64_t due;
+       int64_t start_time, stop_time, min_due, due;
+       int timeout_ms;
        unsigned int i;
-       int ret, timeout;
+       int ret;
+       int fd;
        int revents;
-       gboolean stop_checked;
-       gboolean stopped;
+       gboolean triggered, stopped;
        struct source *source;
        GPollFD *pollfd;
        gintptr poll_object;
@@ -406,16 +408,18 @@ static int sr_session_iteration(struct sr_session *session)
        int64_t usb_due;
        struct timeval tv;
 #endif
-       if (session->num_sources <= 0) {
+       if (session->sources->len == 0) {
                sr_session_check_aborted(session);
                return SR_OK;
        }
        start_time = g_get_monotonic_time();
        min_due = INT64_MAX;
 
-       for (i = 0; i < session->num_sources; ++i) {
-               if (session->sources[i].due < min_due)
-                       min_due = session->sources[i].due;
+       for (i = 0; i < session->sources->len; ++i) {
+               source = &g_array_index(session->sources, struct source, i);
+               if (source->due < min_due)
+                       min_due = source->due;
+               source->triggered = FALSE;
        }
 #ifdef HAVE_LIBUSB_1_0
        usb_due = INT64_MAX;
@@ -433,13 +437,15 @@ static int sr_session_iteration(struct sr_session *session)
                }
        }
 #endif
-       if (min_due > start_time)
-               timeout = (min_due == INT64_MAX) ? -1
-                       : MIN((min_due - start_time + 999) / 1000, INT_MAX);
+       if (min_due == INT64_MAX)
+               timeout_ms = -1;
+       else if (min_due > start_time)
+               timeout_ms = MIN((min_due - start_time + 999) / 1000, INT_MAX);
        else
-               timeout = 0;
+               timeout_ms = 0;
 
-       ret = g_poll(session->pollfds, session->num_sources, timeout);
+       ret = g_poll((GPollFD *)session->pollfds->data,
+                       session->pollfds->len, timeout_ms);
 #ifdef G_OS_UNIX
        if (ret < 0 && errno != EINTR) {
                sr_err("Error in poll: %s", g_strerror(errno));
@@ -452,13 +458,27 @@ static int sr_session_iteration(struct sr_session *session)
        }
 #endif
        stop_time = g_get_monotonic_time();
-       stop_checked = FALSE;
+       triggered = FALSE;
        stopped = FALSE;
 
-       for (i = 0; i < session->num_sources; i++) {
-               source = &session->sources[i];
-               pollfd = &session->pollfds[i];
-               revents = (ret > 0) ? pollfd->revents : 0;
+       for (i = 0; i < session->sources->len; ++i) {
+               source = &g_array_index(session->sources, struct source, i);
+               if (source->triggered)
+                       continue; /* already handled */
+
+               poll_object = source->poll_object;
+               fd = (int)poll_object;
+               revents = 0;
+
+               if (i < session->pollfds->len) {
+                       pollfd = &g_array_index(session->pollfds, GPollFD, i);
+                       fd = pollfd->fd;
+                       if (ret > 0)
+                               revents = pollfd->revents;
+               }
+               if (ret > 0 && revents == 0)
+                       continue; /* skip timeouts if any I/O event occurred */
+
                due = source->due;
 #ifdef HAVE_LIBUSB_1_0
                if (source->is_usb && usb_due < due)
@@ -470,29 +490,30 @@ static int sr_session_iteration(struct sr_session *session)
                 * The source may be gone after the callback returns,
                 * so access any data now that needs accessing.
                 */
-               poll_object = source->poll_object;
-               if (source->timeout > 0)
-                       source->due = stop_time + INT64_C(1000) * source->timeout;
-               pollfd->revents = 0;
+               if (source->timeout >= 0)
+                       source->due = stop_time + source->timeout;
+               source->triggered = TRUE;
+               triggered = TRUE;
                /*
                 * Invoke the source's callback on an event or timeout.
                 */
-               if (!source->cb(pollfd->fd, revents, source->cb_data))
-                       sr_session_source_remove(session, poll_object);
+               if (!source->cb(fd, revents, source->cb_data))
+                       _sr_session_source_remove(session, poll_object);
                /*
                 * We want to take as little time as possible to stop
                 * the session if we have been told to do so. Therefore,
                 * we check the flag after processing every source, not
                 * just once per main event loop.
                 */
-               if (!stopped) {
+               if (!stopped)
                        stopped = sr_session_check_aborted(session);
-                       stop_checked = TRUE;
-               }
+
                /* Restart loop as the sources list may have changed. */
                i = 0;
        }
-       if (!stop_checked)
+
+       /* Check for abort at least once per iteration. */
+       if (!triggered)
                sr_session_check_aborted(session);
 
        return SR_OK;
@@ -609,11 +630,14 @@ SR_API int sr_session_start(struct sr_session *session)
  *
  * @retval SR_OK Success.
  * @retval SR_ERR_ARG Invalid session passed.
+ * @retval SR_ERR Error during event processing.
  *
  * @since 0.4.0
  */
 SR_API int sr_session_run(struct sr_session *session)
 {
+       int ret;
+
        if (!session) {
                sr_err("%s: session was NULL", __func__);
                return SR_ERR_ARG;
@@ -629,17 +653,12 @@ SR_API int sr_session_run(struct sr_session *session)
 
        sr_info("Running.");
 
-       /* Do we have real sources? */
-       if (session->num_sources == 1 && session->pollfds[0].fd == -1) {
-               /* Dummy source, freewheel over it. */
-               while (session->num_sources)
-                       session->sources[0].cb(-1, 0, session->sources[0].cb_data);
-       } else {
-               /* Real sources, use g_poll() main loop. */
-               while (session->num_sources)
-                       sr_session_iteration(session);
+       /* Poll event sources until none are left. */
+       while (session->sources->len > 0) {
+               ret = sr_session_iteration(session);
+               if (ret != SR_OK)
+                       return ret;
        }
-
        return SR_OK;
 }
 
@@ -854,8 +873,8 @@ SR_PRIV int sr_session_send(const struct sr_dev_inst *sdi,
  *
  * @param session The session to use. Must not be NULL.
  * @param pollfd The GPollFD.
- * @param[in] timeout Max time to wait before the callback is called,
- *              ignored if 0.
+ * @param[in] timeout Max time in ms to wait before the callback is called,
+ *                    or -1 to wait indefinitely.
  * @param cb Callback function to add. Must not be NULL.
  * @param cb_data Data for the callback function. Can be NULL.
  * @param poll_object Handle by which the source is identified
@@ -868,34 +887,36 @@ SR_PRIV int sr_session_source_add_internal(struct sr_session *session,
                GPollFD *pollfd, int timeout, sr_receive_data_callback cb,
                void *cb_data, gintptr poll_object, gboolean is_usb)
 {
-       struct source *new_sources, *s;
-       GPollFD *new_pollfds;
+       struct source src;
 
        if (!cb) {
                sr_err("%s: cb was NULL", __func__);
                return SR_ERR_ARG;
        }
-
        /* Note: cb_data can be NULL, that's not a bug. */
 
-       new_pollfds = g_realloc(session->pollfds,
-                       sizeof(GPollFD) * (session->num_sources + 1));
-       new_sources = g_realloc(session->sources, sizeof(struct source) *
-                       (session->num_sources + 1));
+       src.cb = cb;
+       src.cb_data = cb_data;
+       src.poll_object = poll_object;
+       src.is_usb = is_usb;
+       src.triggered = FALSE;
 
-       new_pollfds[session->num_sources] = *pollfd;
-       s = &new_sources[session->num_sources++];
-       if (timeout > 0)
-               s->due = g_get_monotonic_time() + INT64_C(1000) * timeout;
-       else
-               s->due = INT64_MAX;
-       s->cb = cb;
-       s->cb_data = cb_data;
-       s->poll_object = poll_object;
-       s->timeout = timeout;
-       s->is_usb = is_usb;
-       session->pollfds = new_pollfds;
-       session->sources = new_sources;
+       if (timeout >= 0) {
+               src.timeout = INT64_C(1000) * timeout;
+               src.due = g_get_monotonic_time() + src.timeout;
+       } else {
+               src.timeout = -1;
+               src.due = INT64_MAX;
+       }
+
+       if (pollfd) {
+               /* I/O source */
+               g_array_insert_val(session->sources, session->pollfds->len, src);
+               g_array_append_vals(session->pollfds, pollfd, 1);
+       } else {
+               /* Timer source */
+               g_array_append_val(session->sources, src);
+       }
 
        return SR_OK;
 }
@@ -906,7 +927,8 @@ SR_PRIV int sr_session_source_add_internal(struct sr_session *session,
  * @param session The session to use. Must not be NULL.
  * @param fd The file descriptor.
  * @param events Events to check for.
- * @param timeout Max time to wait before the callback is called, ignored if 0.
+ * @param timeout Max time in ms to wait before the callback is called,
+ *                or -1 to wait indefinitely.
  * @param cb Callback function to add. Must not be NULL.
  * @param cb_data Data for the callback function. Can be NULL.
  *
@@ -920,20 +942,25 @@ SR_API int sr_session_source_add(struct sr_session *session, int fd,
 {
        GPollFD p;
 
+       if (fd < 0 && timeout < 0) {
+               sr_err("Timer source without timeout would block indefinitely");
+               return SR_ERR_ARG;
+       }
        p.fd = fd;
        p.events = events;
        p.revents = 0;
 
-       return sr_session_source_add_internal(session, &p, timeout,
-                       cb, cb_data, fd, FALSE);
+       return sr_session_source_add_internal(session,
+               (fd < 0) ? NULL : &p, timeout, cb, cb_data, fd, FALSE);
 }
 
 /**
  * Add an event source for a GPollFD.
  *
  * @param session The session to use. Must not be NULL.
- * @param pollfd The GPollFD.
- * @param timeout Max time to wait before the callback is called, ignored if 0.
+ * @param pollfd The GPollFD. Must not be NULL.
+ * @param timeout Max time in ms to wait before the callback is called,
+ *                or -1 to wait indefinitely.
  * @param cb Callback function to add. Must not be NULL.
  * @param cb_data Data for the callback function. Can be NULL.
  *
@@ -946,6 +973,10 @@ SR_API int sr_session_source_add_pollfd(struct sr_session *session,
                GPollFD *pollfd, int timeout, sr_receive_data_callback cb,
                void *cb_data)
 {
+       if (!pollfd) {
+               sr_err("%s: pollfd was NULL", __func__);
+               return SR_ERR_ARG;
+       }
        return sr_session_source_add_internal(session, pollfd, timeout,
                        cb, cb_data, (gintptr)pollfd, FALSE);
 }
@@ -956,7 +987,8 @@ SR_API int sr_session_source_add_pollfd(struct sr_session *session,
  * @param session The session to use. Must not be NULL.
  * @param channel The GIOChannel.
  * @param events Events to poll on.
- * @param timeout Max time to wait before the callback is called, ignored if 0.
+ * @param timeout Max time in ms to wait before the callback is called,
+ *                or -1 to wait indefinitely.
  * @param cb Callback function to add. Must not be NULL.
  * @param cb_data Data for the callback function. Can be NULL.
  *
@@ -971,7 +1003,7 @@ SR_API int sr_session_source_add_channel(struct sr_session *session,
 {
        GPollFD p;
 
-#ifdef _WIN32
+#ifdef G_OS_WIN32
        g_io_channel_win32_make_pollfd(channel, events, &p);
 #else
        p.fd = g_io_channel_unix_get_fd(channel);
@@ -994,34 +1026,18 @@ SR_API int sr_session_source_add_channel(struct sr_session *session,
  */
 static int _sr_session_source_remove(struct sr_session *session, gintptr poll_object)
 {
-       unsigned int old;
+       unsigned int i;
 
-       if (!session->sources || !session->num_sources) {
-               sr_err("%s: sources was NULL", __func__);
-               return SR_ERR_BUG;
-       }
+       for (i = 0; i < session->sources->len; ++i) {
+               if (g_array_index(session->sources, struct source, i)
+                                       .poll_object == poll_object) {
 
-       for (old = 0; old < session->num_sources; old++) {
-               if (session->sources[old].poll_object == poll_object)
+                       g_array_remove_index(session->sources, i);
+                       if (i < session->pollfds->len)
+                               g_array_remove_index(session->pollfds, i);
                        break;
+               }
        }
-
-       /* fd not found, nothing to do */
-       if (old == session->num_sources)
-               return SR_OK;
-
-       session->num_sources--;
-
-       if (old != session->num_sources) {
-               memmove(&session->pollfds[old], &session->pollfds[old + 1],
-                       (session->num_sources - old) * sizeof(GPollFD));
-               memmove(&session->sources[old], &session->sources[old + 1],
-                       (session->num_sources - old) * sizeof(struct source));
-       }
-
-       session->pollfds = g_realloc(session->pollfds, sizeof(GPollFD) * session->num_sources);
-       session->sources = g_realloc(session->sources, sizeof(struct source) * session->num_sources);
-
        return SR_OK;
 }
 
@@ -1039,7 +1055,7 @@ static int _sr_session_source_remove(struct sr_session *session, gintptr poll_ob
  */
 SR_API int sr_session_source_remove(struct sr_session *session, int fd)
 {
-       return _sr_session_source_remove(session, (gintptr)fd);
+       return _sr_session_source_remove(session, fd);
 }
 
 /**
@@ -1047,7 +1063,7 @@ SR_API int sr_session_source_remove(struct sr_session *session, int fd)
  *
  * @param session The session to use. Must not be NULL.
  * @param pollfd The poll descriptor for which the source should be removed.
- *
+ *               Must not be NULL.
  * @return SR_OK upon success, SR_ERR_ARG upon invalid arguments, or
  *         SR_ERR_MALLOC upon memory allocation errors, SR_ERR_BUG upon
  *         internal errors.
@@ -1057,6 +1073,10 @@ SR_API int sr_session_source_remove(struct sr_session *session, int fd)
 SR_API int sr_session_source_remove_pollfd(struct sr_session *session,
                GPollFD *pollfd)
 {
+       if (!pollfd) {
+               sr_err("%s: pollfd was NULL", __func__);
+               return SR_ERR_ARG;
+       }
        return _sr_session_source_remove(session, (gintptr)pollfd);
 }
 
@@ -1065,7 +1085,7 @@ SR_API int sr_session_source_remove_pollfd(struct sr_session *session,
  *
  * @param session The session to use. Must not be NULL.
  * @param channel The channel for which the source should be removed.
- *
+ *                Must not be NULL.
  * @retval SR_OK Success.
  * @retval SR_ERR_ARG Invalid argument.
  * @return SR_ERR_BUG Internal error.
@@ -1075,6 +1095,10 @@ SR_API int sr_session_source_remove_pollfd(struct sr_session *session,
 SR_API int sr_session_source_remove_channel(struct sr_session *session,
                GIOChannel *channel)
 {
+       if (!channel) {
+               sr_err("%s: channel was NULL", __func__);
+               return SR_ERR_ARG;
+       }
        return _sr_session_source_remove(session, (gintptr)channel);
 }