]> sigrok.org Git - libsigrok.git/blobdiff - src/session.c
session: Unify handling of I/O and timer sources
[libsigrok.git] / src / session.c
index 11da1378b56508c67c804d66bbb3169f429ea7ad..aaeda1cc09ed199eebd59dd79078b6e0e6ef9bcf 100644 (file)
  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
  */
 
+#include <errno.h>
 #include <stdio.h>
 #include <stdlib.h>
 #include <unistd.h>
 #include <string.h>
 #include <glib.h>
-#include "libsigrok.h"
+#include <libsigrok/libsigrok.h>
 #include "libsigrok-internal.h"
 
 /** @cond PRIVATE */
@@ -44,7 +45,8 @@
  */
 
 struct source {
-       int timeout;
+       int64_t timeout;        /* microseconds */
+       int64_t due;            /* microseconds */
        sr_receive_data_callback cb;
        void *cb_data;
 
@@ -52,6 +54,9 @@ struct source {
         * being polled and will be used to match the source when removing it again.
         */
        gintptr poll_object;
+
+       gboolean is_usb;
+       gboolean triggered;
 };
 
 struct datafeed_callback {
@@ -62,6 +67,7 @@ struct datafeed_callback {
 /**
  * Create a new session.
  *
+ * @param ctx         The context in which to create the new session.
  * @param new_session This will contain a pointer to the newly created
  *                    session if the return value is SR_OK, otherwise the value
  *                    is undefined and should not be used. Must not be NULL.
@@ -71,7 +77,8 @@ struct datafeed_callback {
  *
  * @since 0.4.0
  */
-SR_API int sr_session_new(struct sr_session **new_session)
+SR_API int sr_session_new(struct sr_context *ctx,
+               struct sr_session **new_session)
 {
        struct sr_session *session;
 
@@ -80,9 +87,11 @@ SR_API int sr_session_new(struct sr_session **new_session)
 
        session = g_malloc0(sizeof(struct sr_session));
 
-       session->source_timeout = -1;
-       session->running = FALSE;
-       session->abort_session = FALSE;
+       session->ctx = ctx;
+
+       session->sources = g_array_new(FALSE, FALSE, sizeof(struct source));
+       session->pollfds = g_array_new(FALSE, FALSE, sizeof(GPollFD));
+
        g_mutex_init(&session->stop_mutex);
 
        *new_session = session;
@@ -113,6 +122,11 @@ SR_API int sr_session_destroy(struct sr_session *session)
        if (session->trigger)
                sr_trigger_free(session->trigger);
 
+       g_slist_free_full(session->owned_devs, (GDestroyNotify)sr_dev_inst_free);
+
+       g_array_unref(session->pollfds);
+       g_array_unref(session->sources);
+
        g_free(session);
 
        return SR_OK;
@@ -304,9 +318,7 @@ SR_API int sr_session_datafeed_callback_add(struct sr_session *session,
                return SR_ERR_ARG;
        }
 
-       if (!(cb_struct = g_try_malloc0(sizeof(struct datafeed_callback))))
-               return SR_ERR_MALLOC;
-
+       cb_struct = g_malloc0(sizeof(struct datafeed_callback));
        cb_struct->cb = cb;
        cb_struct->cb_data = cb_data;
 
@@ -316,76 +328,197 @@ SR_API int sr_session_datafeed_callback_add(struct sr_session *session,
        return SR_OK;
 }
 
+/**
+ * Get the trigger assigned to this session.
+ *
+ * @param session The session to use.
+ *
+ * @retval NULL Invalid (NULL) session was passed to the function.
+ * @retval other The trigger assigned to this session (can be NULL).
+ *
+ * @since 0.4.0
+ */
 SR_API struct sr_trigger *sr_session_trigger_get(struct sr_session *session)
 {
+       if (!session)
+               return NULL;
+
        return session->trigger;
 }
 
+/**
+ * Set the trigger of this session.
+ *
+ * @param session The session to use. Must not be NULL.
+ * @param trig The trigger to assign to this session. Can be NULL.
+ *
+ * @retval SR_OK Success.
+ * @retval SR_ERR_ARG Invalid argument.
+ *
+ * @since 0.4.0
+ */
 SR_API int sr_session_trigger_set(struct sr_session *session, struct sr_trigger *trig)
 {
+       if (!session)
+               return SR_ERR_ARG;
+
        session->trigger = trig;
 
        return SR_OK;
 }
 
+static gboolean sr_session_check_aborted(struct sr_session *session)
+{
+       gboolean stop;
+
+       g_mutex_lock(&session->stop_mutex);
+       stop = session->abort_session;
+       if (stop) {
+               sr_session_stop_sync(session);
+               /* But once is enough. */
+               session->abort_session = FALSE;
+       }
+       g_mutex_unlock(&session->stop_mutex);
+
+       return stop;
+}
+
+static int _sr_session_source_remove(struct sr_session *session, gintptr poll_object);
+
 /**
- * Call every device in the current session's callback.
- *
- * For sessions not driven by select loops such as sr_session_run(),
- * but driven by another scheduler, this can be used to poll the devices
- * from within that scheduler.
+ * Poll the session's event sources.
  *
  * @param session The session to use. Must not be NULL.
- * @param block If TRUE, this call will wait for any of the session's
- *              sources to fire an event on the file descriptors, or
- *              any of their timeouts to activate. In other words, this
- *              can be used as a select loop.
- *              If FALSE, all sources have their callback run, regardless
- *              of file descriptor or timeout status.
- *
  * @retval SR_OK Success.
- * @retval SR_ERR Error occured.
+ * @retval SR_ERR Error occurred.
  */
-static int sr_session_iteration(struct sr_session *session, gboolean block)
+static int sr_session_iteration(struct sr_session *session)
 {
+       int64_t start_time, stop_time, min_due, due;
+       int timeout_ms;
        unsigned int i;
        int ret;
-
-       ret = g_poll(session->pollfds, session->num_sources,
-                       block ? session->source_timeout : 0);
-       for (i = 0; i < session->num_sources; i++) {
-               if (session->pollfds[i].revents > 0 || (ret == 0
-                       && session->source_timeout == session->sources[i].timeout)) {
-                       /*
-                        * Invoke the source's callback on an event,
-                        * or if the poll timed out and this source
-                        * asked for that timeout.
-                        */
-                       if (!session->sources[i].cb(session->pollfds[i].fd,
-                                       session->pollfds[i].revents,
-                                       session->sources[i].cb_data))
-                               sr_session_source_remove(session,
-                                               session->sources[i].poll_object);
+       int fd;
+       int revents;
+       gboolean triggered, stopped;
+       struct source *source;
+       GPollFD *pollfd;
+       gintptr poll_object;
+#ifdef HAVE_LIBUSB_1_0
+       int64_t usb_due;
+       struct timeval tv;
+#endif
+       if (session->sources->len == 0) {
+               sr_session_check_aborted(session);
+               return SR_OK;
+       }
+       start_time = g_get_monotonic_time();
+       min_due = INT64_MAX;
+
+       for (i = 0; i < session->sources->len; ++i) {
+               source = &g_array_index(session->sources, struct source, i);
+               if (source->due < min_due)
+                       min_due = source->due;
+               source->triggered = FALSE;
+       }
+#ifdef HAVE_LIBUSB_1_0
+       usb_due = INT64_MAX;
+       if (session->ctx->usb_source_present) {
+               ret = libusb_get_next_timeout(session->ctx->libusb_ctx, &tv);
+               if (ret < 0) {
+                       sr_err("Error getting libusb timeout: %s",
+                               libusb_error_name(ret));
+                       return SR_ERR;
+               } else if (ret == 1) {
+                       usb_due = start_time + tv.tv_usec
+                               + (int64_t)tv.tv_sec * G_USEC_PER_SEC;
+                       if (usb_due < min_due)
+                               min_due = usb_due;
                }
+       }
+#endif
+       if (min_due == INT64_MAX)
+               timeout_ms = -1;
+       else if (min_due > start_time)
+               timeout_ms = MIN((min_due - start_time + 999) / 1000, INT_MAX);
+       else
+               timeout_ms = 0;
+
+       ret = g_poll((GPollFD *)session->pollfds->data,
+                       session->pollfds->len, timeout_ms);
+#ifdef G_OS_UNIX
+       if (ret < 0 && errno != EINTR) {
+               sr_err("Error in poll: %s", g_strerror(errno));
+               return SR_ERR;
+       }
+#else
+       if (ret < 0) {
+               sr_err("Error in poll: %d", ret);
+               return SR_ERR;
+       }
+#endif
+       stop_time = g_get_monotonic_time();
+       triggered = FALSE;
+       stopped = FALSE;
+
+       for (i = 0; i < session->sources->len; ++i) {
+               source = &g_array_index(session->sources, struct source, i);
+               if (source->triggered)
+                       continue; /* already handled */
+
+               poll_object = source->poll_object;
+               fd = (int)poll_object;
+               revents = 0;
+
+               if (i < session->pollfds->len) {
+                       pollfd = &g_array_index(session->pollfds, GPollFD, i);
+                       fd = pollfd->fd;
+                       if (ret > 0)
+                               revents = pollfd->revents;
+               }
+               if (ret > 0 && revents == 0)
+                       continue; /* skip timeouts if any I/O event occurred */
+
+               due = source->due;
+#ifdef HAVE_LIBUSB_1_0
+               if (source->is_usb && usb_due < due)
+                       due = usb_due;
+#endif
+               if (revents == 0 && stop_time < due)
+                       continue;
+               /*
+                * The source may be gone after the callback returns,
+                * so access any data now that needs accessing.
+                */
+               if (source->timeout >= 0)
+                       source->due = stop_time + source->timeout;
+               source->triggered = TRUE;
+               triggered = TRUE;
+               /*
+                * Invoke the source's callback on an event or timeout.
+                */
+               if (!source->cb(fd, revents, source->cb_data))
+                       _sr_session_source_remove(session, poll_object);
                /*
                 * We want to take as little time as possible to stop
                 * the session if we have been told to do so. Therefore,
                 * we check the flag after processing every source, not
                 * just once per main event loop.
                 */
-               g_mutex_lock(&session->stop_mutex);
-               if (session->abort_session) {
-                       sr_session_stop_sync(session);
-                       /* But once is enough. */
-                       session->abort_session = FALSE;
-               }
-               g_mutex_unlock(&session->stop_mutex);
+               if (!stopped)
+                       stopped = sr_session_check_aborted(session);
+
+               /* Restart loop as the sources list may have changed. */
+               i = 0;
        }
 
+       /* Check for abort at least once per iteration. */
+       if (!triggered)
+               sr_session_check_aborted(session);
+
        return SR_OK;
 }
 
-
 static int verify_trigger(struct sr_trigger *trigger)
 {
        struct sr_trigger_stage *stage;
@@ -421,6 +554,7 @@ static int verify_trigger(struct sr_trigger *trigger)
 
        return SR_OK;
 }
+
 /**
  * Start a session.
  *
@@ -496,11 +630,14 @@ SR_API int sr_session_start(struct sr_session *session)
  *
  * @retval SR_OK Success.
  * @retval SR_ERR_ARG Invalid session passed.
+ * @retval SR_ERR Error during event processing.
  *
  * @since 0.4.0
  */
 SR_API int sr_session_run(struct sr_session *session)
 {
+       int ret;
+
        if (!session) {
                sr_err("%s: session was NULL", __func__);
                return SR_ERR_ARG;
@@ -516,17 +653,12 @@ SR_API int sr_session_run(struct sr_session *session)
 
        sr_info("Running.");
 
-       /* Do we have real sources? */
-       if (session->num_sources == 1 && session->pollfds[0].fd == -1) {
-               /* Dummy source, freewheel over it. */
-               while (session->num_sources)
-                       session->sources[0].cb(-1, 0, session->sources[0].cb_data);
-       } else {
-               /* Real sources, use g_poll() main loop. */
-               while (session->num_sources)
-                       sr_session_iteration(session, TRUE);
+       /* Poll event sources until none are left. */
+       while (session->sources->len > 0) {
+               ret = sr_session_iteration(session);
+               if (ret != SR_OK)
+                       return ret;
        }
-
        return SR_OK;
 }
 
@@ -611,17 +743,22 @@ static void datafeed_dump(const struct sr_datafeed_packet *packet)
 {
        const struct sr_datafeed_logic *logic;
        const struct sr_datafeed_analog *analog;
+       const struct sr_datafeed_analog2 *analog2;
 
+       /* Please use the same order as in libsigrok.h. */
        switch (packet->type) {
        case SR_DF_HEADER:
                sr_dbg("bus: Received SR_DF_HEADER packet.");
                break;
-       case SR_DF_TRIGGER:
-               sr_dbg("bus: Received SR_DF_TRIGGER packet.");
+       case SR_DF_END:
+               sr_dbg("bus: Received SR_DF_END packet.");
                break;
        case SR_DF_META:
                sr_dbg("bus: Received SR_DF_META packet.");
                break;
+       case SR_DF_TRIGGER:
+               sr_dbg("bus: Received SR_DF_TRIGGER packet.");
+               break;
        case SR_DF_LOGIC:
                logic = packet->payload;
                sr_dbg("bus: Received SR_DF_LOGIC packet (%" PRIu64 " bytes, "
@@ -632,15 +769,17 @@ static void datafeed_dump(const struct sr_datafeed_packet *packet)
                sr_dbg("bus: Received SR_DF_ANALOG packet (%d samples).",
                       analog->num_samples);
                break;
-       case SR_DF_END:
-               sr_dbg("bus: Received SR_DF_END packet.");
-               break;
        case SR_DF_FRAME_BEGIN:
                sr_dbg("bus: Received SR_DF_FRAME_BEGIN packet.");
                break;
        case SR_DF_FRAME_END:
                sr_dbg("bus: Received SR_DF_FRAME_END packet.");
                break;
+       case SR_DF_ANALOG2:
+               analog2 = packet->payload;
+               sr_dbg("bus: Received SR_DF_ANALOG2 packet (%d samples).",
+                      analog2->num_samples);
+               break;
        default:
                sr_dbg("bus: Received unknown packet type: %d.", packet->type);
                break;
@@ -665,6 +804,9 @@ SR_PRIV int sr_session_send(const struct sr_dev_inst *sdi,
 {
        GSList *l;
        struct datafeed_callback *cb_struct;
+       struct sr_datafeed_packet *packet_in, *packet_out;
+       struct sr_transform *t;
+       int ret;
 
        if (!sdi) {
                sr_err("%s: sdi was NULL", __func__);
@@ -681,6 +823,41 @@ SR_PRIV int sr_session_send(const struct sr_dev_inst *sdi,
                return SR_ERR_BUG;
        }
 
+       /*
+        * Pass the packet to the first transform module. If that returns
+        * another packet (instead of NULL), pass that packet to the next
+        * transform module in the list, and so on.
+        */
+       packet_in = (struct sr_datafeed_packet *)packet;
+       for (l = sdi->session->transforms; l; l = l->next) {
+               t = l->data;
+               sr_spew("Running transform module '%s'.", t->module->id);
+               ret = t->module->receive(t, packet_in, &packet_out);
+               if (ret < 0) {
+                       sr_err("Error while running transform module: %d.", ret);
+                       return SR_ERR;
+               }
+               if (!packet_out) {
+                       /*
+                        * If any of the transforms don't return an output
+                        * packet, abort.
+                        */
+                       sr_spew("Transform module didn't return a packet, aborting.");
+                       return SR_OK;
+               } else {
+                       /*
+                        * Use this transform module's output packet as input
+                        * for the next transform module.
+                        */
+                       packet_in = packet_out;
+               }
+       }
+       packet = packet_in;
+
+       /*
+        * If the last transform did output a packet, pass it to all datafeed
+        * callbacks.
+        */
        for (l = sdi->session->datafeed_callbacks; l; l = l->next) {
                if (sr_log_loglevel_get() >= SR_LOG_DBG)
                        datafeed_dump(packet);
@@ -696,55 +873,50 @@ SR_PRIV int sr_session_send(const struct sr_dev_inst *sdi,
  *
  * @param session The session to use. Must not be NULL.
  * @param pollfd The GPollFD.
- * @param[in] timeout Max time to wait before the callback is called,
- *              ignored if 0.
+ * @param[in] timeout Max time in ms to wait before the callback is called,
+ *                    or -1 to wait indefinitely.
  * @param cb Callback function to add. Must not be NULL.
  * @param cb_data Data for the callback function. Can be NULL.
- * @param poll_object TODO.
+ * @param poll_object Handle by which the source is identified
+ * @param is_usb TRUE for a libusb polling source
  *
  * @retval SR_OK Success.
  * @retval SR_ERR_ARG Invalid argument.
- * @retval SR_ERR_MALLOC Memory allocation error.
  */
-static int _sr_session_source_add(struct sr_session *session, GPollFD *pollfd,
-               int timeout, sr_receive_data_callback cb, void *cb_data, gintptr poll_object)
+SR_PRIV int sr_session_source_add_internal(struct sr_session *session,
+               GPollFD *pollfd, int timeout, sr_receive_data_callback cb,
+               void *cb_data, gintptr poll_object, gboolean is_usb)
 {
-       struct source *new_sources, *s;
-       GPollFD *new_pollfds;
+       struct source src;
 
        if (!cb) {
                sr_err("%s: cb was NULL", __func__);
                return SR_ERR_ARG;
        }
-
        /* Note: cb_data can be NULL, that's not a bug. */
 
-       new_pollfds = g_try_realloc(session->pollfds,
-                       sizeof(GPollFD) * (session->num_sources + 1));
-       if (!new_pollfds) {
-               sr_err("%s: new_pollfds malloc failed", __func__);
-               return SR_ERR_MALLOC;
-       }
+       src.cb = cb;
+       src.cb_data = cb_data;
+       src.poll_object = poll_object;
+       src.is_usb = is_usb;
+       src.triggered = FALSE;
 
-       new_sources = g_try_realloc(session->sources, sizeof(struct source) *
-                       (session->num_sources + 1));
-       if (!new_sources) {
-               sr_err("%s: new_sources malloc failed", __func__);
-               return SR_ERR_MALLOC;
+       if (timeout >= 0) {
+               src.timeout = INT64_C(1000) * timeout;
+               src.due = g_get_monotonic_time() + src.timeout;
+       } else {
+               src.timeout = -1;
+               src.due = INT64_MAX;
        }
 
-       new_pollfds[session->num_sources] = *pollfd;
-       s = &new_sources[session->num_sources++];
-       s->timeout = timeout;
-       s->cb = cb;
-       s->cb_data = cb_data;
-       s->poll_object = poll_object;
-       session->pollfds = new_pollfds;
-       session->sources = new_sources;
-
-       if (timeout != session->source_timeout && timeout > 0
-           && (session->source_timeout == -1 || timeout < session->source_timeout))
-               session->source_timeout = timeout;
+       if (pollfd) {
+               /* I/O source */
+               g_array_insert_val(session->sources, session->pollfds->len, src);
+               g_array_append_vals(session->pollfds, pollfd, 1);
+       } else {
+               /* Timer source */
+               g_array_append_val(session->sources, src);
+       }
 
        return SR_OK;
 }
@@ -755,13 +927,13 @@ static int _sr_session_source_add(struct sr_session *session, GPollFD *pollfd,
  * @param session The session to use. Must not be NULL.
  * @param fd The file descriptor.
  * @param events Events to check for.
- * @param timeout Max time to wait before the callback is called, ignored if 0.
+ * @param timeout Max time in ms to wait before the callback is called,
+ *                or -1 to wait indefinitely.
  * @param cb Callback function to add. Must not be NULL.
  * @param cb_data Data for the callback function. Can be NULL.
  *
  * @retval SR_OK Success.
  * @retval SR_ERR_ARG Invalid argument.
- * @retval SR_ERR_MALLOC Memory allocation error.
  *
  * @since 0.3.0
  */
@@ -770,24 +942,30 @@ SR_API int sr_session_source_add(struct sr_session *session, int fd,
 {
        GPollFD p;
 
+       if (fd < 0 && timeout < 0) {
+               sr_err("Timer source without timeout would block indefinitely");
+               return SR_ERR_ARG;
+       }
        p.fd = fd;
        p.events = events;
+       p.revents = 0;
 
-       return _sr_session_source_add(session, &p, timeout, cb, cb_data, (gintptr)fd);
+       return sr_session_source_add_internal(session,
+               (fd < 0) ? NULL : &p, timeout, cb, cb_data, fd, FALSE);
 }
 
 /**
  * Add an event source for a GPollFD.
  *
  * @param session The session to use. Must not be NULL.
- * @param pollfd The GPollFD.
- * @param timeout Max time to wait before the callback is called, ignored if 0.
+ * @param pollfd The GPollFD. Must not be NULL.
+ * @param timeout Max time in ms to wait before the callback is called,
+ *                or -1 to wait indefinitely.
  * @param cb Callback function to add. Must not be NULL.
  * @param cb_data Data for the callback function. Can be NULL.
  *
  * @retval SR_OK Success.
  * @retval SR_ERR_ARG Invalid argument.
- * @retval SR_ERR_MALLOC Memory allocation error.
  *
  * @since 0.3.0
  */
@@ -795,8 +973,12 @@ SR_API int sr_session_source_add_pollfd(struct sr_session *session,
                GPollFD *pollfd, int timeout, sr_receive_data_callback cb,
                void *cb_data)
 {
-       return _sr_session_source_add(session, pollfd, timeout, cb,
-                       cb_data, (gintptr)pollfd);
+       if (!pollfd) {
+               sr_err("%s: pollfd was NULL", __func__);
+               return SR_ERR_ARG;
+       }
+       return sr_session_source_add_internal(session, pollfd, timeout,
+                       cb, cb_data, (gintptr)pollfd, FALSE);
 }
 
 /**
@@ -805,13 +987,13 @@ SR_API int sr_session_source_add_pollfd(struct sr_session *session,
  * @param session The session to use. Must not be NULL.
  * @param channel The GIOChannel.
  * @param events Events to poll on.
- * @param timeout Max time to wait before the callback is called, ignored if 0.
+ * @param timeout Max time in ms to wait before the callback is called,
+ *                or -1 to wait indefinitely.
  * @param cb Callback function to add. Must not be NULL.
  * @param cb_data Data for the callback function. Can be NULL.
  *
  * @retval SR_OK Success.
  * @retval SR_ERR_ARG Invalid argument.
- * @retval SR_ERR_MALLOC Memory allocation error.
  *
  * @since 0.3.0
  */
@@ -821,73 +1003,41 @@ SR_API int sr_session_source_add_channel(struct sr_session *session,
 {
        GPollFD p;
 
-#ifdef _WIN32
+#ifdef G_OS_WIN32
        g_io_channel_win32_make_pollfd(channel, events, &p);
 #else
        p.fd = g_io_channel_unix_get_fd(channel);
        p.events = events;
+       p.revents = 0;
 #endif
-
-       return _sr_session_source_add(session, &p, timeout, cb, cb_data, (gintptr)channel);
+       return sr_session_source_add_internal(session, &p, timeout, cb,
+                       cb_data, (gintptr)channel, FALSE);
 }
 
 /**
  * Remove the source belonging to the specified channel.
  *
- * @todo Add more error checks and logging.
- *
  * @param session The session to use. Must not be NULL.
  * @param poll_object The channel for which the source should be removed.
  *
  * @retval SR_OK Success
  * @retval SR_ERR_ARG Invalid arguments
- * @retval SR_ERR_MALLOC Memory allocation error
  * @retval SR_ERR_BUG Internal error
  */
 static int _sr_session_source_remove(struct sr_session *session, gintptr poll_object)
 {
-       struct source *new_sources;
-       GPollFD *new_pollfds;
-       unsigned int old;
+       unsigned int i;
 
-       if (!session->sources || !session->num_sources) {
-               sr_err("%s: sources was NULL", __func__);
-               return SR_ERR_BUG;
-       }
+       for (i = 0; i < session->sources->len; ++i) {
+               if (g_array_index(session->sources, struct source, i)
+                                       .poll_object == poll_object) {
 
-       for (old = 0; old < session->num_sources; old++) {
-               if (session->sources[old].poll_object == poll_object)
+                       g_array_remove_index(session->sources, i);
+                       if (i < session->pollfds->len)
+                               g_array_remove_index(session->pollfds, i);
                        break;
+               }
        }
-
-       /* fd not found, nothing to do */
-       if (old == session->num_sources)
-               return SR_OK;
-
-       session->num_sources -= 1;
-
-       if (old != session->num_sources) {
-               memmove(&session->pollfds[old], &session->pollfds[old+1],
-                       (session->num_sources - old) * sizeof(GPollFD));
-               memmove(&session->sources[old], &session->sources[old+1],
-                       (session->num_sources - old) * sizeof(struct source));
-       }
-
-       new_pollfds = g_try_realloc(session->pollfds, sizeof(GPollFD) * session->num_sources);
-       if (!new_pollfds && session->num_sources > 0) {
-               sr_err("%s: new_pollfds malloc failed", __func__);
-               return SR_ERR_MALLOC;
-       }
-
-       new_sources = g_try_realloc(session->sources, sizeof(struct source) * session->num_sources);
-       if (!new_sources && session->num_sources > 0) {
-               sr_err("%s: new_sources malloc failed", __func__);
-               return SR_ERR_MALLOC;
-       }
-
-       session->pollfds = new_pollfds;
-       session->sources = new_sources;
-
        return SR_OK;
 }
 
@@ -899,14 +1049,13 @@ static int _sr_session_source_remove(struct sr_session *session, gintptr poll_ob
  *
  * @retval SR_OK Success
  * @retval SR_ERR_ARG Invalid argument
- * @retval SR_ERR_MALLOC Memory allocation error.
  * @retval SR_ERR_BUG Internal error.
  *
  * @since 0.3.0
  */
 SR_API int sr_session_source_remove(struct sr_session *session, int fd)
 {
-       return _sr_session_source_remove(session, (gintptr)fd);
+       return _sr_session_source_remove(session, fd);
 }
 
 /**
@@ -914,7 +1063,7 @@ SR_API int sr_session_source_remove(struct sr_session *session, int fd)
  *
  * @param session The session to use. Must not be NULL.
  * @param pollfd The poll descriptor for which the source should be removed.
- *
+ *               Must not be NULL.
  * @return SR_OK upon success, SR_ERR_ARG upon invalid arguments, or
  *         SR_ERR_MALLOC upon memory allocation errors, SR_ERR_BUG upon
  *         internal errors.
@@ -924,6 +1073,10 @@ SR_API int sr_session_source_remove(struct sr_session *session, int fd)
 SR_API int sr_session_source_remove_pollfd(struct sr_session *session,
                GPollFD *pollfd)
 {
+       if (!pollfd) {
+               sr_err("%s: pollfd was NULL", __func__);
+               return SR_ERR_ARG;
+       }
        return _sr_session_source_remove(session, (gintptr)pollfd);
 }
 
@@ -932,10 +1085,9 @@ SR_API int sr_session_source_remove_pollfd(struct sr_session *session,
  *
  * @param session The session to use. Must not be NULL.
  * @param channel The channel for which the source should be removed.
- *
+ *                Must not be NULL.
  * @retval SR_OK Success.
  * @retval SR_ERR_ARG Invalid argument.
- * @retval SR_ERR_MALLOC Memory allocation error.
  * @return SR_ERR_BUG Internal error.
  *
  * @since 0.2.0
@@ -943,18 +1095,18 @@ SR_API int sr_session_source_remove_pollfd(struct sr_session *session,
 SR_API int sr_session_source_remove_channel(struct sr_session *session,
                GIOChannel *channel)
 {
+       if (!channel) {
+               sr_err("%s: channel was NULL", __func__);
+               return SR_ERR_ARG;
+       }
        return _sr_session_source_remove(session, (gintptr)channel);
 }
 
-static void *copy_src(struct sr_config *src)
+static void copy_src(struct sr_config *src, struct sr_datafeed_meta *meta_copy)
 {
-       struct sr_config *new_src;
-
-       new_src = g_malloc(sizeof(struct sr_config));
-       memcpy(new_src, src, sizeof(struct sr_config));
        g_variant_ref(src->data);
-
-       return new_src;
+       meta_copy->config = g_slist_append(meta_copy->config,
+                                          g_memdup(src, sizeof(struct sr_config)));
 }
 
 SR_PRIV int sr_packet_copy(const struct sr_datafeed_packet *packet,
@@ -983,9 +1135,8 @@ SR_PRIV int sr_packet_copy(const struct sr_datafeed_packet *packet,
                break;
        case SR_DF_META:
                meta = packet->payload;
-               meta_copy = g_malloc(sizeof(struct sr_datafeed_meta));
-               meta_copy->config = g_slist_copy_deep(meta->config,
-                               (GCopyFunc)copy_src, NULL);
+               meta_copy = g_malloc0(sizeof(struct sr_datafeed_meta));
+               g_slist_foreach(meta->config, (GFunc)copy_src, meta_copy->config);
                (*copy)->payload = meta_copy;
                break;
        case SR_DF_LOGIC: