]> sigrok.org Git - libsigrok.git/blob - src/session.c
session: Properly accumulate event source timeouts
[libsigrok.git] / src / session.c
1 /*
2  * This file is part of the libsigrok project.
3  *
4  * Copyright (C) 2010-2012 Bert Vermeulen <bert@biot.com>
5  *
6  * This program is free software: you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation, either version 3 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
18  */
19
20 #include <errno.h>
21 #include <stdio.h>
22 #include <stdlib.h>
23 #include <unistd.h>
24 #include <string.h>
25 #include <glib.h>
26 #include <libsigrok/libsigrok.h>
27 #include "libsigrok-internal.h"
28
29 /** @cond PRIVATE */
30 #define LOG_PREFIX "session"
31 /** @endcond */
32
33 /**
34  * @file
35  *
36  * Creating, using, or destroying libsigrok sessions.
37  */
38
39 /**
40  * @defgroup grp_session Session handling
41  *
42  * Creating, using, or destroying libsigrok sessions.
43  *
44  * @{
45  */
46
47 struct source {
48         int64_t due;
49         sr_receive_data_callback cb;
50         void *cb_data;
51
52         /* This is used to keep track of the object (fd, pollfd or channel) which is
53          * being polled and will be used to match the source when removing it again.
54          */
55         gintptr poll_object;
56
57         int timeout;
58         gboolean is_usb;
59 };
60
61 struct datafeed_callback {
62         sr_datafeed_callback cb;
63         void *cb_data;
64 };
65
66 /**
67  * Create a new session.
68  *
69  * @param ctx         The context in which to create the new session.
70  * @param new_session This will contain a pointer to the newly created
71  *                    session if the return value is SR_OK, otherwise the value
72  *                    is undefined and should not be used. Must not be NULL.
73  *
74  * @retval SR_OK Success.
75  * @retval SR_ERR_ARG Invalid argument.
76  *
77  * @since 0.4.0
78  */
79 SR_API int sr_session_new(struct sr_context *ctx,
80                 struct sr_session **new_session)
81 {
82         struct sr_session *session;
83
84         if (!new_session)
85                 return SR_ERR_ARG;
86
87         session = g_malloc0(sizeof(struct sr_session));
88
89         session->ctx = ctx;
90         session->running = FALSE;
91         session->abort_session = FALSE;
92         g_mutex_init(&session->stop_mutex);
93
94         *new_session = session;
95
96         return SR_OK;
97 }
98
99 /**
100  * Destroy a session.
101  * This frees up all memory used by the session.
102  *
103  * @param session The session to destroy. Must not be NULL.
104  *
105  * @retval SR_OK Success.
106  * @retval SR_ERR_ARG Invalid session passed.
107  *
108  * @since 0.4.0
109  */
110 SR_API int sr_session_destroy(struct sr_session *session)
111 {
112         if (!session) {
113                 sr_err("%s: session was NULL", __func__);
114                 return SR_ERR_ARG;
115         }
116
117         sr_session_dev_remove_all(session);
118         g_mutex_clear(&session->stop_mutex);
119         if (session->trigger)
120                 sr_trigger_free(session->trigger);
121
122         g_slist_free_full(session->owned_devs, (GDestroyNotify)sr_dev_inst_free);
123
124         g_free(session);
125
126         return SR_OK;
127 }
128
129 /**
130  * Remove all the devices from a session.
131  *
132  * The session itself (i.e., the struct sr_session) is not free'd and still
133  * exists after this function returns.
134  *
135  * @param session The session to use. Must not be NULL.
136  *
137  * @retval SR_OK Success.
138  * @retval SR_ERR_BUG Invalid session passed.
139  *
140  * @since 0.4.0
141  */
142 SR_API int sr_session_dev_remove_all(struct sr_session *session)
143 {
144         struct sr_dev_inst *sdi;
145         GSList *l;
146
147         if (!session) {
148                 sr_err("%s: session was NULL", __func__);
149                 return SR_ERR_ARG;
150         }
151
152         for (l = session->devs; l; l = l->next) {
153                 sdi = (struct sr_dev_inst *) l->data;
154                 sdi->session = NULL;
155         }
156
157         g_slist_free(session->devs);
158         session->devs = NULL;
159
160         return SR_OK;
161 }
162
163 /**
164  * Add a device instance to a session.
165  *
166  * @param session The session to add to. Must not be NULL.
167  * @param sdi The device instance to add to a session. Must not
168  *            be NULL. Also, sdi->driver and sdi->driver->dev_open must
169  *            not be NULL.
170  *
171  * @retval SR_OK Success.
172  * @retval SR_ERR_ARG Invalid argument.
173  *
174  * @since 0.4.0
175  */
176 SR_API int sr_session_dev_add(struct sr_session *session,
177                 struct sr_dev_inst *sdi)
178 {
179         int ret;
180
181         if (!sdi) {
182                 sr_err("%s: sdi was NULL", __func__);
183                 return SR_ERR_ARG;
184         }
185
186         if (!session) {
187                 sr_err("%s: session was NULL", __func__);
188                 return SR_ERR_ARG;
189         }
190
191         /* If sdi->session is not NULL, the device is already in this or
192          * another session. */
193         if (sdi->session) {
194                 sr_err("%s: already assigned to session", __func__);
195                 return SR_ERR_ARG;
196         }
197
198         /* If sdi->driver is NULL, this is a virtual device. */
199         if (!sdi->driver) {
200                 /* Just add the device, don't run dev_open(). */
201                 session->devs = g_slist_append(session->devs, (gpointer)sdi);
202                 sdi->session = session;
203                 return SR_OK;
204         }
205
206         /* sdi->driver is non-NULL (i.e. we have a real device). */
207         if (!sdi->driver->dev_open) {
208                 sr_err("%s: sdi->driver->dev_open was NULL", __func__);
209                 return SR_ERR_BUG;
210         }
211
212         session->devs = g_slist_append(session->devs, (gpointer)sdi);
213         sdi->session = session;
214
215         if (session->running) {
216                 /* Adding a device to a running session. Commit settings
217                  * and start acquisition on that device now. */
218                 if ((ret = sr_config_commit(sdi)) != SR_OK) {
219                         sr_err("Failed to commit device settings before "
220                                "starting acquisition in running session (%s)",
221                                sr_strerror(ret));
222                         return ret;
223                 }
224                 if ((ret = sdi->driver->dev_acquisition_start(sdi,
225                                                 (void *)sdi)) != SR_OK) {
226                         sr_err("Failed to start acquisition of device in "
227                                "running session (%s)", sr_strerror(ret));
228                         return ret;
229                 }
230         }
231
232         return SR_OK;
233 }
234
235 /**
236  * List all device instances attached to a session.
237  *
238  * @param session The session to use. Must not be NULL.
239  * @param devlist A pointer where the device instance list will be
240  *                stored on return. If no devices are in the session,
241  *                this will be NULL. Each element in the list points
242  *                to a struct sr_dev_inst *.
243  *                The list must be freed by the caller, but not the
244  *                elements pointed to.
245  *
246  * @retval SR_OK Success.
247  * @retval SR_ERR_ARG Invalid argument.
248  *
249  * @since 0.4.0
250  */
251 SR_API int sr_session_dev_list(struct sr_session *session, GSList **devlist)
252 {
253         if (!session)
254                 return SR_ERR_ARG;
255
256         if (!devlist)
257                 return SR_ERR_ARG;
258
259         *devlist = g_slist_copy(session->devs);
260
261         return SR_OK;
262 }
263
264 /**
265  * Remove all datafeed callbacks in a session.
266  *
267  * @param session The session to use. Must not be NULL.
268  *
269  * @retval SR_OK Success.
270  * @retval SR_ERR_ARG Invalid session passed.
271  *
272  * @since 0.4.0
273  */
274 SR_API int sr_session_datafeed_callback_remove_all(struct sr_session *session)
275 {
276         if (!session) {
277                 sr_err("%s: session was NULL", __func__);
278                 return SR_ERR_ARG;
279         }
280
281         g_slist_free_full(session->datafeed_callbacks, g_free);
282         session->datafeed_callbacks = NULL;
283
284         return SR_OK;
285 }
286
287 /**
288  * Add a datafeed callback to a session.
289  *
290  * @param session The session to use. Must not be NULL.
291  * @param cb Function to call when a chunk of data is received.
292  *           Must not be NULL.
293  * @param cb_data Opaque pointer passed in by the caller.
294  *
295  * @retval SR_OK Success.
296  * @retval SR_ERR_BUG No session exists.
297  *
298  * @since 0.3.0
299  */
300 SR_API int sr_session_datafeed_callback_add(struct sr_session *session,
301                 sr_datafeed_callback cb, void *cb_data)
302 {
303         struct datafeed_callback *cb_struct;
304
305         if (!session) {
306                 sr_err("%s: session was NULL", __func__);
307                 return SR_ERR_BUG;
308         }
309
310         if (!cb) {
311                 sr_err("%s: cb was NULL", __func__);
312                 return SR_ERR_ARG;
313         }
314
315         cb_struct = g_malloc0(sizeof(struct datafeed_callback));
316         cb_struct->cb = cb;
317         cb_struct->cb_data = cb_data;
318
319         session->datafeed_callbacks =
320             g_slist_append(session->datafeed_callbacks, cb_struct);
321
322         return SR_OK;
323 }
324
325 /**
326  * Get the trigger assigned to this session.
327  *
328  * @param session The session to use.
329  *
330  * @retval NULL Invalid (NULL) session was passed to the function.
331  * @retval other The trigger assigned to this session (can be NULL).
332  *
333  * @since 0.4.0
334  */
335 SR_API struct sr_trigger *sr_session_trigger_get(struct sr_session *session)
336 {
337         if (!session)
338                 return NULL;
339
340         return session->trigger;
341 }
342
343 /**
344  * Set the trigger of this session.
345  *
346  * @param session The session to use. Must not be NULL.
347  * @param trig The trigger to assign to this session. Can be NULL.
348  *
349  * @retval SR_OK Success.
350  * @retval SR_ERR_ARG Invalid argument.
351  *
352  * @since 0.4.0
353  */
354 SR_API int sr_session_trigger_set(struct sr_session *session, struct sr_trigger *trig)
355 {
356         if (!session)
357                 return SR_ERR_ARG;
358
359         session->trigger = trig;
360
361         return SR_OK;
362 }
363
364 static gboolean sr_session_check_aborted(struct sr_session *session)
365 {
366         gboolean stop;
367
368         g_mutex_lock(&session->stop_mutex);
369         stop = session->abort_session;
370         if (stop) {
371                 sr_session_stop_sync(session);
372                 /* But once is enough. */
373                 session->abort_session = FALSE;
374         }
375         g_mutex_unlock(&session->stop_mutex);
376
377         return stop;
378 }
379
380 /**
381  * Call every device in the current session's callback.
382  *
383  * For sessions not driven by select loops such as sr_session_run(),
384  * but driven by another scheduler, this can be used to poll the devices
385  * from within that scheduler.
386  *
387  * @param session The session to use. Must not be NULL.
388  * @retval SR_OK Success.
389  * @retval SR_ERR Error occurred.
390  */
391 static int sr_session_iteration(struct sr_session *session)
392 {
393         int64_t start_time;
394         int64_t stop_time;
395         int64_t min_due;
396         int64_t due;
397         unsigned int i;
398         int ret, timeout;
399         int revents;
400         gboolean stop_checked;
401         gboolean stopped;
402         struct source *source;
403         GPollFD *pollfd;
404         gintptr poll_object;
405 #ifdef HAVE_LIBUSB_1_0
406         int64_t usb_due;
407         struct timeval tv;
408 #endif
409         if (session->num_sources <= 0) {
410                 sr_session_check_aborted(session);
411                 return SR_OK;
412         }
413         start_time = g_get_monotonic_time();
414         min_due = INT64_MAX;
415
416         for (i = 0; i < session->num_sources; ++i) {
417                 if (session->sources[i].due < min_due)
418                         min_due = session->sources[i].due;
419         }
420 #ifdef HAVE_LIBUSB_1_0
421         usb_due = INT64_MAX;
422         if (session->ctx->usb_source_present) {
423                 ret = libusb_get_next_timeout(session->ctx->libusb_ctx, &tv);
424                 if (ret < 0) {
425                         sr_err("Error getting libusb timeout: %s",
426                                 libusb_error_name(ret));
427                         return SR_ERR;
428                 } else if (ret == 1) {
429                         usb_due = start_time + tv.tv_usec
430                                 + (int64_t)tv.tv_sec * G_USEC_PER_SEC;
431                         if (usb_due < min_due)
432                                 min_due = usb_due;
433                 }
434         }
435 #endif
436         if (min_due > start_time)
437                 timeout = (min_due == INT64_MAX) ? -1
438                         : MIN((min_due - start_time + 999) / 1000, INT_MAX);
439         else
440                 timeout = 0;
441
442         ret = g_poll(session->pollfds, session->num_sources, timeout);
443 #ifdef G_OS_UNIX
444         if (ret < 0 && errno != EINTR) {
445                 sr_err("Error in poll: %s", g_strerror(errno));
446                 return SR_ERR;
447         }
448 #else
449         if (ret < 0) {
450                 sr_err("Error in poll: %d", ret);
451                 return SR_ERR;
452         }
453 #endif
454         stop_time = g_get_monotonic_time();
455         stop_checked = FALSE;
456         stopped = FALSE;
457
458         for (i = 0; i < session->num_sources; i++) {
459                 source = &session->sources[i];
460                 pollfd = &session->pollfds[i];
461                 revents = (ret > 0) ? pollfd->revents : 0;
462                 due = source->due;
463 #ifdef HAVE_LIBUSB_1_0
464                 if (source->is_usb && usb_due < due)
465                         due = usb_due;
466 #endif
467                 if (revents == 0 && stop_time < due)
468                         continue;
469                 /*
470                  * The source may be gone after the callback returns,
471                  * so access any data now that needs accessing.
472                  */
473                 poll_object = source->poll_object;
474                 if (source->timeout > 0)
475                         source->due = stop_time + INT64_C(1000) * source->timeout;
476                 pollfd->revents = 0;
477                 /*
478                  * Invoke the source's callback on an event or timeout.
479                  */
480                 if (!source->cb(pollfd->fd, revents, source->cb_data))
481                         sr_session_source_remove(session, poll_object);
482                 /*
483                  * We want to take as little time as possible to stop
484                  * the session if we have been told to do so. Therefore,
485                  * we check the flag after processing every source, not
486                  * just once per main event loop.
487                  */
488                 if (!stopped) {
489                         stopped = sr_session_check_aborted(session);
490                         stop_checked = TRUE;
491                 }
492                 /* Restart loop as the sources list may have changed. */
493                 i = 0;
494         }
495         if (!stop_checked)
496                 sr_session_check_aborted(session);
497
498         return SR_OK;
499 }
500
501 static int verify_trigger(struct sr_trigger *trigger)
502 {
503         struct sr_trigger_stage *stage;
504         struct sr_trigger_match *match;
505         GSList *l, *m;
506
507         if (!trigger->stages) {
508                 sr_err("No trigger stages defined.");
509                 return SR_ERR;
510         }
511
512         sr_spew("Checking trigger:");
513         for (l = trigger->stages; l; l = l->next) {
514                 stage = l->data;
515                 if (!stage->matches) {
516                         sr_err("Stage %d has no matches defined.", stage->stage);
517                         return SR_ERR;
518                 }
519                 for (m = stage->matches; m; m = m->next) {
520                         match = m->data;
521                         if (!match->channel) {
522                                 sr_err("Stage %d match has no channel.", stage->stage);
523                                 return SR_ERR;
524                         }
525                         if (!match->match) {
526                                 sr_err("Stage %d match is not defined.", stage->stage);
527                                 return SR_ERR;
528                         }
529                         sr_spew("Stage %d match on channel %s, match %d", stage->stage,
530                                         match->channel->name, match->match);
531                 }
532         }
533
534         return SR_OK;
535 }
536
537 /**
538  * Start a session.
539  *
540  * @param session The session to use. Must not be NULL.
541  *
542  * @retval SR_OK Success.
543  * @retval SR_ERR_ARG Invalid session passed.
544  *
545  * @since 0.4.0
546  */
547 SR_API int sr_session_start(struct sr_session *session)
548 {
549         struct sr_dev_inst *sdi;
550         struct sr_channel *ch;
551         GSList *l, *c;
552         int enabled_channels, ret;
553
554         if (!session) {
555                 sr_err("%s: session was NULL", __func__);
556                 return SR_ERR_ARG;
557         }
558
559         if (!session->devs) {
560                 sr_err("%s: session->devs was NULL; a session "
561                        "cannot be started without devices.", __func__);
562                 return SR_ERR_ARG;
563         }
564
565         if (session->trigger && verify_trigger(session->trigger) != SR_OK)
566                 return SR_ERR;
567
568         sr_info("Starting.");
569
570         ret = SR_OK;
571         for (l = session->devs; l; l = l->next) {
572                 sdi = l->data;
573                 enabled_channels = 0;
574                 for (c = sdi->channels; c; c = c->next) {
575                         ch = c->data;
576                         if (ch->enabled) {
577                                 enabled_channels++;
578                                 break;
579                         }
580                 }
581                 if (enabled_channels == 0) {
582                         ret = SR_ERR;
583                         sr_err("%s using connection %s has no enabled channels!",
584                                         sdi->driver->name, sdi->connection_id);
585                         break;
586                 }
587
588                 if ((ret = sr_config_commit(sdi)) != SR_OK) {
589                         sr_err("Failed to commit device settings before "
590                                "starting acquisition (%s)", sr_strerror(ret));
591                         break;
592                 }
593                 if ((ret = sdi->driver->dev_acquisition_start(sdi, sdi)) != SR_OK) {
594                         sr_err("%s: could not start an acquisition "
595                                "(%s)", __func__, sr_strerror(ret));
596                         break;
597                 }
598         }
599
600         /* TODO: What if there are multiple devices? Which return code? */
601
602         return ret;
603 }
604
605 /**
606  * Run a session.
607  *
608  * @param session The session to use. Must not be NULL.
609  *
610  * @retval SR_OK Success.
611  * @retval SR_ERR_ARG Invalid session passed.
612  *
613  * @since 0.4.0
614  */
615 SR_API int sr_session_run(struct sr_session *session)
616 {
617         if (!session) {
618                 sr_err("%s: session was NULL", __func__);
619                 return SR_ERR_ARG;
620         }
621
622         if (!session->devs) {
623                 /* TODO: Actually the case? */
624                 sr_err("%s: session->devs was NULL; a session "
625                        "cannot be run without devices.", __func__);
626                 return SR_ERR_ARG;
627         }
628         session->running = TRUE;
629
630         sr_info("Running.");
631
632         /* Do we have real sources? */
633         if (session->num_sources == 1 && session->pollfds[0].fd == -1) {
634                 /* Dummy source, freewheel over it. */
635                 while (session->num_sources)
636                         session->sources[0].cb(-1, 0, session->sources[0].cb_data);
637         } else {
638                 /* Real sources, use g_poll() main loop. */
639                 while (session->num_sources)
640                         sr_session_iteration(session);
641         }
642
643         return SR_OK;
644 }
645
646 /**
647  * Stop a session.
648  *
649  * The session is stopped immediately, with all acquisition sessions stopped
650  * and hardware drivers cleaned up.
651  *
652  * This must be called from within the session thread, to prevent freeing
653  * resources that the session thread will try to use.
654  *
655  * @param session The session to use. Must not be NULL.
656  *
657  * @retval SR_OK Success.
658  * @retval SR_ERR_ARG Invalid session passed.
659  *
660  * @private
661  */
662 SR_PRIV int sr_session_stop_sync(struct sr_session *session)
663 {
664         struct sr_dev_inst *sdi;
665         GSList *l;
666
667         if (!session) {
668                 sr_err("%s: session was NULL", __func__);
669                 return SR_ERR_ARG;
670         }
671
672         sr_info("Stopping.");
673
674         for (l = session->devs; l; l = l->next) {
675                 sdi = l->data;
676                 if (sdi->driver) {
677                         if (sdi->driver->dev_acquisition_stop)
678                                 sdi->driver->dev_acquisition_stop(sdi, sdi);
679                 }
680         }
681         session->running = FALSE;
682
683         return SR_OK;
684 }
685
686 /**
687  * Stop a session.
688  *
689  * The session is stopped immediately, with all acquisition sessions being
690  * stopped and hardware drivers cleaned up.
691  *
692  * If the session is run in a separate thread, this function will not block
693  * until the session is finished executing. It is the caller's responsibility
694  * to wait for the session thread to return before assuming that the session is
695  * completely decommissioned.
696  *
697  * @param session The session to use. Must not be NULL.
698  *
699  * @retval SR_OK Success.
700  * @retval SR_ERR_ARG Invalid session passed.
701  *
702  * @since 0.4.0
703  */
704 SR_API int sr_session_stop(struct sr_session *session)
705 {
706         if (!session) {
707                 sr_err("%s: session was NULL", __func__);
708                 return SR_ERR_BUG;
709         }
710
711         g_mutex_lock(&session->stop_mutex);
712         session->abort_session = TRUE;
713         g_mutex_unlock(&session->stop_mutex);
714
715         return SR_OK;
716 }
717
718 /**
719  * Debug helper.
720  *
721  * @param packet The packet to show debugging information for.
722  */
723 static void datafeed_dump(const struct sr_datafeed_packet *packet)
724 {
725         const struct sr_datafeed_logic *logic;
726         const struct sr_datafeed_analog *analog;
727         const struct sr_datafeed_analog2 *analog2;
728
729         /* Please use the same order as in libsigrok.h. */
730         switch (packet->type) {
731         case SR_DF_HEADER:
732                 sr_dbg("bus: Received SR_DF_HEADER packet.");
733                 break;
734         case SR_DF_END:
735                 sr_dbg("bus: Received SR_DF_END packet.");
736                 break;
737         case SR_DF_META:
738                 sr_dbg("bus: Received SR_DF_META packet.");
739                 break;
740         case SR_DF_TRIGGER:
741                 sr_dbg("bus: Received SR_DF_TRIGGER packet.");
742                 break;
743         case SR_DF_LOGIC:
744                 logic = packet->payload;
745                 sr_dbg("bus: Received SR_DF_LOGIC packet (%" PRIu64 " bytes, "
746                        "unitsize = %d).", logic->length, logic->unitsize);
747                 break;
748         case SR_DF_ANALOG:
749                 analog = packet->payload;
750                 sr_dbg("bus: Received SR_DF_ANALOG packet (%d samples).",
751                        analog->num_samples);
752                 break;
753         case SR_DF_FRAME_BEGIN:
754                 sr_dbg("bus: Received SR_DF_FRAME_BEGIN packet.");
755                 break;
756         case SR_DF_FRAME_END:
757                 sr_dbg("bus: Received SR_DF_FRAME_END packet.");
758                 break;
759         case SR_DF_ANALOG2:
760                 analog2 = packet->payload;
761                 sr_dbg("bus: Received SR_DF_ANALOG2 packet (%d samples).",
762                        analog2->num_samples);
763                 break;
764         default:
765                 sr_dbg("bus: Received unknown packet type: %d.", packet->type);
766                 break;
767         }
768 }
769
770 /**
771  * Send a packet to whatever is listening on the datafeed bus.
772  *
773  * Hardware drivers use this to send a data packet to the frontend.
774  *
775  * @param sdi TODO.
776  * @param packet The datafeed packet to send to the session bus.
777  *
778  * @retval SR_OK Success.
779  * @retval SR_ERR_ARG Invalid argument.
780  *
781  * @private
782  */
783 SR_PRIV int sr_session_send(const struct sr_dev_inst *sdi,
784                 const struct sr_datafeed_packet *packet)
785 {
786         GSList *l;
787         struct datafeed_callback *cb_struct;
788         struct sr_datafeed_packet *packet_in, *packet_out;
789         struct sr_transform *t;
790         int ret;
791
792         if (!sdi) {
793                 sr_err("%s: sdi was NULL", __func__);
794                 return SR_ERR_ARG;
795         }
796
797         if (!packet) {
798                 sr_err("%s: packet was NULL", __func__);
799                 return SR_ERR_ARG;
800         }
801
802         if (!sdi->session) {
803                 sr_err("%s: session was NULL", __func__);
804                 return SR_ERR_BUG;
805         }
806
807         /*
808          * Pass the packet to the first transform module. If that returns
809          * another packet (instead of NULL), pass that packet to the next
810          * transform module in the list, and so on.
811          */
812         packet_in = (struct sr_datafeed_packet *)packet;
813         for (l = sdi->session->transforms; l; l = l->next) {
814                 t = l->data;
815                 sr_spew("Running transform module '%s'.", t->module->id);
816                 ret = t->module->receive(t, packet_in, &packet_out);
817                 if (ret < 0) {
818                         sr_err("Error while running transform module: %d.", ret);
819                         return SR_ERR;
820                 }
821                 if (!packet_out) {
822                         /*
823                          * If any of the transforms don't return an output
824                          * packet, abort.
825                          */
826                         sr_spew("Transform module didn't return a packet, aborting.");
827                         return SR_OK;
828                 } else {
829                         /*
830                          * Use this transform module's output packet as input
831                          * for the next transform module.
832                          */
833                         packet_in = packet_out;
834                 }
835         }
836         packet = packet_in;
837
838         /*
839          * If the last transform did output a packet, pass it to all datafeed
840          * callbacks.
841          */
842         for (l = sdi->session->datafeed_callbacks; l; l = l->next) {
843                 if (sr_log_loglevel_get() >= SR_LOG_DBG)
844                         datafeed_dump(packet);
845                 cb_struct = l->data;
846                 cb_struct->cb(sdi, packet, cb_struct->cb_data);
847         }
848
849         return SR_OK;
850 }
851
852 /**
853  * Add an event source for a file descriptor.
854  *
855  * @param session The session to use. Must not be NULL.
856  * @param pollfd The GPollFD.
857  * @param[in] timeout Max time to wait before the callback is called,
858  *              ignored if 0.
859  * @param cb Callback function to add. Must not be NULL.
860  * @param cb_data Data for the callback function. Can be NULL.
861  * @param poll_object Handle by which the source is identified
862  * @param is_usb TRUE for a libusb polling source
863  *
864  * @retval SR_OK Success.
865  * @retval SR_ERR_ARG Invalid argument.
866  */
867 SR_PRIV int sr_session_source_add_internal(struct sr_session *session,
868                 GPollFD *pollfd, int timeout, sr_receive_data_callback cb,
869                 void *cb_data, gintptr poll_object, gboolean is_usb)
870 {
871         struct source *new_sources, *s;
872         GPollFD *new_pollfds;
873
874         if (!cb) {
875                 sr_err("%s: cb was NULL", __func__);
876                 return SR_ERR_ARG;
877         }
878
879         /* Note: cb_data can be NULL, that's not a bug. */
880
881         new_pollfds = g_realloc(session->pollfds,
882                         sizeof(GPollFD) * (session->num_sources + 1));
883         new_sources = g_realloc(session->sources, sizeof(struct source) *
884                         (session->num_sources + 1));
885
886         new_pollfds[session->num_sources] = *pollfd;
887         s = &new_sources[session->num_sources++];
888         if (timeout > 0)
889                 s->due = g_get_monotonic_time() + INT64_C(1000) * timeout;
890         else
891                 s->due = INT64_MAX;
892         s->cb = cb;
893         s->cb_data = cb_data;
894         s->poll_object = poll_object;
895         s->timeout = timeout;
896         s->is_usb = is_usb;
897         session->pollfds = new_pollfds;
898         session->sources = new_sources;
899
900         return SR_OK;
901 }
902
903 /**
904  * Add an event source for a file descriptor.
905  *
906  * @param session The session to use. Must not be NULL.
907  * @param fd The file descriptor.
908  * @param events Events to check for.
909  * @param timeout Max time to wait before the callback is called, ignored if 0.
910  * @param cb Callback function to add. Must not be NULL.
911  * @param cb_data Data for the callback function. Can be NULL.
912  *
913  * @retval SR_OK Success.
914  * @retval SR_ERR_ARG Invalid argument.
915  *
916  * @since 0.3.0
917  */
918 SR_API int sr_session_source_add(struct sr_session *session, int fd,
919                 int events, int timeout, sr_receive_data_callback cb, void *cb_data)
920 {
921         GPollFD p;
922
923         p.fd = fd;
924         p.events = events;
925         p.revents = 0;
926
927         return sr_session_source_add_internal(session, &p, timeout,
928                         cb, cb_data, fd, FALSE);
929 }
930
931 /**
932  * Add an event source for a GPollFD.
933  *
934  * @param session The session to use. Must not be NULL.
935  * @param pollfd The GPollFD.
936  * @param timeout Max time to wait before the callback is called, ignored if 0.
937  * @param cb Callback function to add. Must not be NULL.
938  * @param cb_data Data for the callback function. Can be NULL.
939  *
940  * @retval SR_OK Success.
941  * @retval SR_ERR_ARG Invalid argument.
942  *
943  * @since 0.3.0
944  */
945 SR_API int sr_session_source_add_pollfd(struct sr_session *session,
946                 GPollFD *pollfd, int timeout, sr_receive_data_callback cb,
947                 void *cb_data)
948 {
949         return sr_session_source_add_internal(session, pollfd, timeout,
950                         cb, cb_data, (gintptr)pollfd, FALSE);
951 }
952
953 /**
954  * Add an event source for a GIOChannel.
955  *
956  * @param session The session to use. Must not be NULL.
957  * @param channel The GIOChannel.
958  * @param events Events to poll on.
959  * @param timeout Max time to wait before the callback is called, ignored if 0.
960  * @param cb Callback function to add. Must not be NULL.
961  * @param cb_data Data for the callback function. Can be NULL.
962  *
963  * @retval SR_OK Success.
964  * @retval SR_ERR_ARG Invalid argument.
965  *
966  * @since 0.3.0
967  */
968 SR_API int sr_session_source_add_channel(struct sr_session *session,
969                 GIOChannel *channel, int events, int timeout,
970                 sr_receive_data_callback cb, void *cb_data)
971 {
972         GPollFD p;
973
974 #ifdef _WIN32
975         g_io_channel_win32_make_pollfd(channel, events, &p);
976 #else
977         p.fd = g_io_channel_unix_get_fd(channel);
978         p.events = events;
979         p.revents = 0;
980 #endif
981         return sr_session_source_add_internal(session, &p, timeout, cb,
982                         cb_data, (gintptr)channel, FALSE);
983 }
984
985 /**
986  * Remove the source belonging to the specified channel.
987  *
988  * @param session The session to use. Must not be NULL.
989  * @param poll_object The channel for which the source should be removed.
990  *
991  * @retval SR_OK Success
992  * @retval SR_ERR_ARG Invalid arguments
993  * @retval SR_ERR_BUG Internal error
994  */
995 static int _sr_session_source_remove(struct sr_session *session, gintptr poll_object)
996 {
997         unsigned int old;
998
999         if (!session->sources || !session->num_sources) {
1000                 sr_err("%s: sources was NULL", __func__);
1001                 return SR_ERR_BUG;
1002         }
1003
1004         for (old = 0; old < session->num_sources; old++) {
1005                 if (session->sources[old].poll_object == poll_object)
1006                         break;
1007         }
1008
1009         /* fd not found, nothing to do */
1010         if (old == session->num_sources)
1011                 return SR_OK;
1012
1013         session->num_sources--;
1014
1015         if (old != session->num_sources) {
1016                 memmove(&session->pollfds[old], &session->pollfds[old + 1],
1017                         (session->num_sources - old) * sizeof(GPollFD));
1018                 memmove(&session->sources[old], &session->sources[old + 1],
1019                         (session->num_sources - old) * sizeof(struct source));
1020         }
1021
1022         session->pollfds = g_realloc(session->pollfds, sizeof(GPollFD) * session->num_sources);
1023         session->sources = g_realloc(session->sources, sizeof(struct source) * session->num_sources);
1024
1025         return SR_OK;
1026 }
1027
1028 /**
1029  * Remove the source belonging to the specified file descriptor.
1030  *
1031  * @param session The session to use. Must not be NULL.
1032  * @param fd The file descriptor for which the source should be removed.
1033  *
1034  * @retval SR_OK Success
1035  * @retval SR_ERR_ARG Invalid argument
1036  * @retval SR_ERR_BUG Internal error.
1037  *
1038  * @since 0.3.0
1039  */
1040 SR_API int sr_session_source_remove(struct sr_session *session, int fd)
1041 {
1042         return _sr_session_source_remove(session, (gintptr)fd);
1043 }
1044
1045 /**
1046  * Remove the source belonging to the specified poll descriptor.
1047  *
1048  * @param session The session to use. Must not be NULL.
1049  * @param pollfd The poll descriptor for which the source should be removed.
1050  *
1051  * @return SR_OK upon success, SR_ERR_ARG upon invalid arguments, or
1052  *         SR_ERR_MALLOC upon memory allocation errors, SR_ERR_BUG upon
1053  *         internal errors.
1054  *
1055  * @since 0.2.0
1056  */
1057 SR_API int sr_session_source_remove_pollfd(struct sr_session *session,
1058                 GPollFD *pollfd)
1059 {
1060         return _sr_session_source_remove(session, (gintptr)pollfd);
1061 }
1062
1063 /**
1064  * Remove the source belonging to the specified channel.
1065  *
1066  * @param session The session to use. Must not be NULL.
1067  * @param channel The channel for which the source should be removed.
1068  *
1069  * @retval SR_OK Success.
1070  * @retval SR_ERR_ARG Invalid argument.
1071  * @return SR_ERR_BUG Internal error.
1072  *
1073  * @since 0.2.0
1074  */
1075 SR_API int sr_session_source_remove_channel(struct sr_session *session,
1076                 GIOChannel *channel)
1077 {
1078         return _sr_session_source_remove(session, (gintptr)channel);
1079 }
1080
1081 static void copy_src(struct sr_config *src, struct sr_datafeed_meta *meta_copy)
1082 {
1083         g_variant_ref(src->data);
1084         meta_copy->config = g_slist_append(meta_copy->config,
1085                                            g_memdup(src, sizeof(struct sr_config)));
1086 }
1087
1088 SR_PRIV int sr_packet_copy(const struct sr_datafeed_packet *packet,
1089                 struct sr_datafeed_packet **copy)
1090 {
1091         const struct sr_datafeed_meta *meta;
1092         struct sr_datafeed_meta *meta_copy;
1093         const struct sr_datafeed_logic *logic;
1094         struct sr_datafeed_logic *logic_copy;
1095         const struct sr_datafeed_analog *analog;
1096         struct sr_datafeed_analog *analog_copy;
1097         uint8_t *payload;
1098
1099         *copy = g_malloc0(sizeof(struct sr_datafeed_packet));
1100         (*copy)->type = packet->type;
1101
1102         switch (packet->type) {
1103         case SR_DF_TRIGGER:
1104         case SR_DF_END:
1105                 /* No payload. */
1106                 break;
1107         case SR_DF_HEADER:
1108                 payload = g_malloc(sizeof(struct sr_datafeed_header));
1109                 memcpy(payload, packet->payload, sizeof(struct sr_datafeed_header));
1110                 (*copy)->payload = payload;
1111                 break;
1112         case SR_DF_META:
1113                 meta = packet->payload;
1114                 meta_copy = g_malloc0(sizeof(struct sr_datafeed_meta));
1115                 g_slist_foreach(meta->config, (GFunc)copy_src, meta_copy->config);
1116                 (*copy)->payload = meta_copy;
1117                 break;
1118         case SR_DF_LOGIC:
1119                 logic = packet->payload;
1120                 logic_copy = g_malloc(sizeof(logic));
1121                 logic_copy->length = logic->length;
1122                 logic_copy->unitsize = logic->unitsize;
1123                 memcpy(logic_copy->data, logic->data, logic->length * logic->unitsize);
1124                 (*copy)->payload = logic_copy;
1125                 break;
1126         case SR_DF_ANALOG:
1127                 analog = packet->payload;
1128                 analog_copy = g_malloc(sizeof(analog));
1129                 analog_copy->channels = g_slist_copy(analog->channels);
1130                 analog_copy->num_samples = analog->num_samples;
1131                 analog_copy->mq = analog->mq;
1132                 analog_copy->unit = analog->unit;
1133                 analog_copy->mqflags = analog->mqflags;
1134                 memcpy(analog_copy->data, analog->data,
1135                                 analog->num_samples * sizeof(float));
1136                 (*copy)->payload = analog_copy;
1137                 break;
1138         default:
1139                 sr_err("Unknown packet type %d", packet->type);
1140                 return SR_ERR;
1141         }
1142
1143         return SR_OK;
1144 }
1145
1146 void sr_packet_free(struct sr_datafeed_packet *packet)
1147 {
1148         const struct sr_datafeed_meta *meta;
1149         const struct sr_datafeed_logic *logic;
1150         const struct sr_datafeed_analog *analog;
1151         struct sr_config *src;
1152         GSList *l;
1153
1154         switch (packet->type) {
1155         case SR_DF_TRIGGER:
1156         case SR_DF_END:
1157                 /* No payload. */
1158                 break;
1159         case SR_DF_HEADER:
1160                 /* Payload is a simple struct. */
1161                 g_free((void *)packet->payload);
1162                 break;
1163         case SR_DF_META:
1164                 meta = packet->payload;
1165                 for (l = meta->config; l; l = l->next) {
1166                         src = l->data;
1167                         g_variant_unref(src->data);
1168                         g_free(src);
1169                 }
1170                 g_slist_free(meta->config);
1171                 g_free((void *)packet->payload);
1172                 break;
1173         case SR_DF_LOGIC:
1174                 logic = packet->payload;
1175                 g_free(logic->data);
1176                 g_free((void *)packet->payload);
1177                 break;
1178         case SR_DF_ANALOG:
1179                 analog = packet->payload;
1180                 g_slist_free(analog->channels);
1181                 g_free(analog->data);
1182                 g_free((void *)packet->payload);
1183                 break;
1184         default:
1185                 sr_err("Unknown packet type %d", packet->type);
1186         }
1187         g_free(packet);
1188
1189 }
1190
1191 /** @} */