]> sigrok.org Git - libsigrokdecode.git/commitdiff
session: introduce the public "send EOF" API routine
authorGerhard Sittig <redacted>
Sun, 26 Dec 2021 06:42:27 +0000 (07:42 +0100)
committerGerhard Sittig <redacted>
Sun, 26 Dec 2021 12:45:09 +0000 (13:45 +0100)
Introduce the public srd_session_send_eof() routine which is backed by
the internal srd_inst_send_eof() helper. Applications can tell decoders
when the input stream of sample data is exhausted, so that decoders can
optionally "flush" their previously accumulated information when
desired. Previous implementations just kept decoders in blocking .wait()
calls and somehow terminated them at arbitrary times afterwards.

When EOF is sent to the decoder session, then calls to the .wait()
method which typically are done from .decode() or its descendents will
end with an EOFError Python exception. Termination of .decode() with the
EOFError exception is non-fatal for backwards compatibility and to keep
the convenience for current decoder implementations. Decoders can either
catch the exception, or use context managers, or do nothing.

This API extension is motivated by research for bug #1581 and provides
the infrastructure to address bug #292. Decoders need to remain careful,
and should not claim that protocol activities would have completed when
their end condition was not even seen in the capture. Marking incomplete
activities with warnings is the most appropriate reaction.

instance.c
libsigrokdecode-internal.h
libsigrokdecode.h
session.c
type_decoder.c

index c30b46b7ecd37348c17e9007efce47951214ebe8..067e98da44ca9797c1a4c2be5ad28083b9759860 100644 (file)
@@ -426,6 +426,7 @@ SRD_API struct srd_decoder_inst *srd_inst_new(struct srd_session *sess,
        di->got_new_samples = FALSE;
        di->handled_all_samples = FALSE;
        di->want_wait_terminate = FALSE;
+       di->communicate_eof = FALSE;
        di->decoder_state = SRD_OK;
 
        /*
@@ -500,6 +501,7 @@ static void srd_inst_reset_state(struct srd_decoder_inst *di)
        di->got_new_samples = FALSE;
        di->handled_all_samples = FALSE;
        di->want_wait_terminate = FALSE;
+       di->communicate_eof = FALSE;
        di->decoder_state = SRD_OK;
        /* Conditions and mutex got reset after joining the thread. */
 }
@@ -1058,6 +1060,17 @@ static gpointer di_thread(gpointer data)
        py_res = PyObject_CallMethod(di->py_inst, "decode", NULL);
        srd_dbg("%s: decode() terminated.", di->inst_id);
 
+       /*
+        * Termination with an EOFError exception is accepted to simplify
+        * the implementation of decoders and for backwards compatibility.
+        */
+       if (PyErr_Occurred() && PyErr_ExceptionMatches(PyExc_EOFError)) {
+               srd_dbg("%s: ignoring EOFError during decode() execution.",
+                       di->inst_id);
+               PyErr_Clear();
+               if (!py_res)
+                       py_res = Py_None;
+       }
        if (!py_res)
                di->decoder_state = SRD_ERR;
 
@@ -1281,6 +1294,64 @@ SRD_PRIV int srd_inst_flush(struct srd_decoder_inst *di)
        return di->decoder_state;
 }
 
+/**
+ * Communicate the end of the stream of sample data to a decoder instance.
+ *
+ * @param[in] di The decoder instance to call. Must not be NULL.
+ *
+ * @return SRD_OK upon success, a (negative) error code otherwise.
+ *
+ * @private
+ */
+SRD_PRIV int srd_inst_send_eof(struct srd_decoder_inst *di)
+{
+       GSList *l;
+       int ret;
+
+       if (!di)
+               return SRD_ERR_ARG;
+
+       /*
+        * Send EOF to the caller specified decoder instance. Only
+        * communicate EOF to currently executing decoders. Never
+        * started or previously finished is perfectly acceptable.
+        */
+       srd_dbg("End of sample data: instance %s.", di->inst_id);
+       if (!di->thread_handle) {
+               srd_dbg("No worker thread, nothing to do.");
+               return SRD_OK;
+       }
+
+       /* Signal the thread about the EOF condition. */
+       g_mutex_lock(&di->data_mutex);
+       di->inbuf = NULL;
+       di->inbuflen = 0;
+       di->got_new_samples = TRUE;
+       di->handled_all_samples = FALSE;
+       di->want_wait_terminate = TRUE;
+       di->communicate_eof = TRUE;
+       g_cond_signal(&di->got_new_samples_cond);
+       g_mutex_unlock(&di->data_mutex);
+
+       /* Only return from here when the condition was handled. */
+       g_mutex_lock(&di->data_mutex);
+       while (!di->handled_all_samples && !di->want_wait_terminate)
+               g_cond_wait(&di->handled_all_samples_cond, &di->data_mutex);
+       g_mutex_unlock(&di->data_mutex);
+
+       /* Flush the decoder instance which handled EOF. */
+       srd_inst_flush(di);
+
+       /* Pass EOF to all stacked decoders. */
+       for (l = di->next_di; l; l = l->next) {
+               ret = srd_inst_send_eof(l->data);
+               if (ret != SRD_OK)
+                       return ret;
+       }
+
+       return SRD_OK;
+}
+
 /**
  * Terminate current decoder work, prepare for re-use on new input data.
  *
index 453e14bf57a3bf45964ff8a59a92c78431dc76d1..0e3cb64231c3af30431056293fc6b60cf28288dd 100644 (file)
@@ -94,6 +94,7 @@ SRD_PRIV int srd_inst_decode(struct srd_decoder_inst *di,
                const uint8_t *inbuf, uint64_t inbuflen, uint64_t unitsize);
 SRD_PRIV int process_samples_until_condition_match(struct srd_decoder_inst *di, gboolean *found_match);
 SRD_PRIV int srd_inst_flush(struct srd_decoder_inst *di);
+SRD_PRIV int srd_inst_send_eof(struct srd_decoder_inst *di);
 SRD_PRIV int srd_inst_terminate_reset(struct srd_decoder_inst *di);
 SRD_PRIV void srd_inst_free(struct srd_decoder_inst *di);
 SRD_PRIV void srd_inst_free_all(struct srd_session *sess);
index 3ce6ae5ee37b798c53c583d7b30cec1a645a2b7a..cf6479c99ff7fb7d6102b7634a3afc3b085d97c7 100644 (file)
@@ -291,6 +291,9 @@ struct srd_decoder_inst {
        /** Requests termination of wait() and decode(). */
        gboolean want_wait_terminate;
 
+       /** Requests that .wait() terminates a Python iteration. */
+       gboolean communicate_eof;
+
        /** Indicates the current state of the decoder stack. */
        int decoder_state;
 
@@ -353,6 +356,7 @@ SRD_API int srd_session_metadata_set(struct srd_session *sess, int key,
 SRD_API int srd_session_send(struct srd_session *sess,
                uint64_t abs_start_samplenum, uint64_t abs_end_samplenum,
                const uint8_t *inbuf, uint64_t inbuflen, uint64_t unitsize);
+SRD_API int srd_session_send_eof(struct srd_session *sess);
 SRD_API int srd_session_terminate_reset(struct srd_session *sess);
 SRD_API int srd_session_destroy(struct srd_session *sess);
 SRD_API int srd_pd_output_callback_add(struct srd_session *sess,
index 386fb710f5cb7a73506a15e5ea047a7b4f715cbb..ad084074889deee4367928d6b20d504fa15343ad 100644 (file)
--- a/session.c
+++ b/session.c
@@ -278,6 +278,32 @@ SRD_API int srd_session_send(struct srd_session *sess,
        return SRD_OK;
 }
 
+/**
+ * Communicate the end of the stream of sample data to the session.
+ *
+ * @param[in] sess The session. Must not be NULL.
+ *
+ * @return SRD_OK upon success. A (negative) error code otherwise.
+ *
+ * @since 0.6.0
+ */
+SRD_API int srd_session_send_eof(struct srd_session *sess)
+{
+       GSList *d;
+       int ret;
+
+       if (!sess)
+               return SRD_ERR_ARG;
+
+       for (d = sess->di_list; d; d = d->next) {
+               ret = srd_inst_send_eof(d->data);
+               if (ret != SRD_OK)
+                       return ret;
+       }
+
+       return SRD_OK;
+}
+
 /**
  * Terminate currently executing decoders in a session, reset internal state.
  *
index 0a92a45df7b1510c98ede2c226779cf8d667c435..6c6eab6b22f584cbcbb0324431cd53e7a0e988fd 100644 (file)
@@ -1089,6 +1089,22 @@ static PyObject *Decoder_wait(PyObject *self, PyObject *args)
                /* Signal the main thread that we handled all samples. */
                g_cond_signal(&di->handled_all_samples_cond);
 
+               /*
+                * When EOF was provided externally, communicate the
+                * Python EOFError exception to .decode() and return
+                * from the .wait() method call. This is motivated by
+                * the use of Python context managers, so that .decode()
+                * methods can "close" incompletely accumulated data
+                * when the sample data is exhausted.
+                */
+               if (di->communicate_eof) {
+                       srd_dbg("%s: %s: Raising EOF from wait().",
+                               di->inst_id, __func__);
+                       g_mutex_unlock(&di->data_mutex);
+                       PyErr_SetString(PyExc_EOFError, "samples exhausted");
+                       goto err;
+               }
+
                /*
                 * When termination of wait() and decode() was requested,
                 * then exit the loop after releasing the mutex.