From: Gerhard Sittig Date: Sun, 26 Dec 2021 06:42:27 +0000 (+0100) Subject: session: introduce the public "send EOF" API routine X-Git-Url: https://sigrok.org/gitaction?a=commitdiff_plain;h=819e508972da02a0dcff7f81178f0283546a9afd;p=libsigrokdecode.git session: introduce the public "send EOF" API routine Introduce the public srd_session_send_eof() routine which is backed by the internal srd_inst_send_eof() helper. Applications can tell decoders when the input stream of sample data is exhausted, so that decoders can optionally "flush" their previously accumulated information when desired. Previous implementations just kept decoders in blocking .wait() calls and somehow terminated them at arbitrary times afterwards. When EOF is sent to the decoder session, then calls to the .wait() method which typically are done from .decode() or its descendents will end with an EOFError Python exception. Termination of .decode() with the EOFError exception is non-fatal for backwards compatibility and to keep the convenience for current decoder implementations. Decoders can either catch the exception, or use context managers, or do nothing. This API extension is motivated by research for bug #1581 and provides the infrastructure to address bug #292. Decoders need to remain careful, and should not claim that protocol activities would have completed when their end condition was not even seen in the capture. Marking incomplete activities with warnings is the most appropriate reaction. --- diff --git a/instance.c b/instance.c index c30b46b..067e98d 100644 --- a/instance.c +++ b/instance.c @@ -426,6 +426,7 @@ SRD_API struct srd_decoder_inst *srd_inst_new(struct srd_session *sess, di->got_new_samples = FALSE; di->handled_all_samples = FALSE; di->want_wait_terminate = FALSE; + di->communicate_eof = FALSE; di->decoder_state = SRD_OK; /* @@ -500,6 +501,7 @@ static void srd_inst_reset_state(struct srd_decoder_inst *di) di->got_new_samples = FALSE; di->handled_all_samples = FALSE; di->want_wait_terminate = FALSE; + di->communicate_eof = FALSE; di->decoder_state = SRD_OK; /* Conditions and mutex got reset after joining the thread. */ } @@ -1058,6 +1060,17 @@ static gpointer di_thread(gpointer data) py_res = PyObject_CallMethod(di->py_inst, "decode", NULL); srd_dbg("%s: decode() terminated.", di->inst_id); + /* + * Termination with an EOFError exception is accepted to simplify + * the implementation of decoders and for backwards compatibility. + */ + if (PyErr_Occurred() && PyErr_ExceptionMatches(PyExc_EOFError)) { + srd_dbg("%s: ignoring EOFError during decode() execution.", + di->inst_id); + PyErr_Clear(); + if (!py_res) + py_res = Py_None; + } if (!py_res) di->decoder_state = SRD_ERR; @@ -1281,6 +1294,64 @@ SRD_PRIV int srd_inst_flush(struct srd_decoder_inst *di) return di->decoder_state; } +/** + * Communicate the end of the stream of sample data to a decoder instance. + * + * @param[in] di The decoder instance to call. Must not be NULL. + * + * @return SRD_OK upon success, a (negative) error code otherwise. + * + * @private + */ +SRD_PRIV int srd_inst_send_eof(struct srd_decoder_inst *di) +{ + GSList *l; + int ret; + + if (!di) + return SRD_ERR_ARG; + + /* + * Send EOF to the caller specified decoder instance. Only + * communicate EOF to currently executing decoders. Never + * started or previously finished is perfectly acceptable. + */ + srd_dbg("End of sample data: instance %s.", di->inst_id); + if (!di->thread_handle) { + srd_dbg("No worker thread, nothing to do."); + return SRD_OK; + } + + /* Signal the thread about the EOF condition. */ + g_mutex_lock(&di->data_mutex); + di->inbuf = NULL; + di->inbuflen = 0; + di->got_new_samples = TRUE; + di->handled_all_samples = FALSE; + di->want_wait_terminate = TRUE; + di->communicate_eof = TRUE; + g_cond_signal(&di->got_new_samples_cond); + g_mutex_unlock(&di->data_mutex); + + /* Only return from here when the condition was handled. */ + g_mutex_lock(&di->data_mutex); + while (!di->handled_all_samples && !di->want_wait_terminate) + g_cond_wait(&di->handled_all_samples_cond, &di->data_mutex); + g_mutex_unlock(&di->data_mutex); + + /* Flush the decoder instance which handled EOF. */ + srd_inst_flush(di); + + /* Pass EOF to all stacked decoders. */ + for (l = di->next_di; l; l = l->next) { + ret = srd_inst_send_eof(l->data); + if (ret != SRD_OK) + return ret; + } + + return SRD_OK; +} + /** * Terminate current decoder work, prepare for re-use on new input data. * diff --git a/libsigrokdecode-internal.h b/libsigrokdecode-internal.h index 453e14b..0e3cb64 100644 --- a/libsigrokdecode-internal.h +++ b/libsigrokdecode-internal.h @@ -94,6 +94,7 @@ SRD_PRIV int srd_inst_decode(struct srd_decoder_inst *di, const uint8_t *inbuf, uint64_t inbuflen, uint64_t unitsize); SRD_PRIV int process_samples_until_condition_match(struct srd_decoder_inst *di, gboolean *found_match); SRD_PRIV int srd_inst_flush(struct srd_decoder_inst *di); +SRD_PRIV int srd_inst_send_eof(struct srd_decoder_inst *di); SRD_PRIV int srd_inst_terminate_reset(struct srd_decoder_inst *di); SRD_PRIV void srd_inst_free(struct srd_decoder_inst *di); SRD_PRIV void srd_inst_free_all(struct srd_session *sess); diff --git a/libsigrokdecode.h b/libsigrokdecode.h index 3ce6ae5..cf6479c 100644 --- a/libsigrokdecode.h +++ b/libsigrokdecode.h @@ -291,6 +291,9 @@ struct srd_decoder_inst { /** Requests termination of wait() and decode(). */ gboolean want_wait_terminate; + /** Requests that .wait() terminates a Python iteration. */ + gboolean communicate_eof; + /** Indicates the current state of the decoder stack. */ int decoder_state; @@ -353,6 +356,7 @@ SRD_API int srd_session_metadata_set(struct srd_session *sess, int key, SRD_API int srd_session_send(struct srd_session *sess, uint64_t abs_start_samplenum, uint64_t abs_end_samplenum, const uint8_t *inbuf, uint64_t inbuflen, uint64_t unitsize); +SRD_API int srd_session_send_eof(struct srd_session *sess); SRD_API int srd_session_terminate_reset(struct srd_session *sess); SRD_API int srd_session_destroy(struct srd_session *sess); SRD_API int srd_pd_output_callback_add(struct srd_session *sess, diff --git a/session.c b/session.c index 386fb71..ad08407 100644 --- a/session.c +++ b/session.c @@ -278,6 +278,32 @@ SRD_API int srd_session_send(struct srd_session *sess, return SRD_OK; } +/** + * Communicate the end of the stream of sample data to the session. + * + * @param[in] sess The session. Must not be NULL. + * + * @return SRD_OK upon success. A (negative) error code otherwise. + * + * @since 0.6.0 + */ +SRD_API int srd_session_send_eof(struct srd_session *sess) +{ + GSList *d; + int ret; + + if (!sess) + return SRD_ERR_ARG; + + for (d = sess->di_list; d; d = d->next) { + ret = srd_inst_send_eof(d->data); + if (ret != SRD_OK) + return ret; + } + + return SRD_OK; +} + /** * Terminate currently executing decoders in a session, reset internal state. * diff --git a/type_decoder.c b/type_decoder.c index 0a92a45..6c6eab6 100644 --- a/type_decoder.c +++ b/type_decoder.c @@ -1089,6 +1089,22 @@ static PyObject *Decoder_wait(PyObject *self, PyObject *args) /* Signal the main thread that we handled all samples. */ g_cond_signal(&di->handled_all_samples_cond); + /* + * When EOF was provided externally, communicate the + * Python EOFError exception to .decode() and return + * from the .wait() method call. This is motivated by + * the use of Python context managers, so that .decode() + * methods can "close" incompletely accumulated data + * when the sample data is exhausted. + */ + if (di->communicate_eof) { + srd_dbg("%s: %s: Raising EOF from wait().", + di->inst_id, __func__); + g_mutex_unlock(&di->data_mutex); + PyErr_SetString(PyExc_EOFError, "samples exhausted"); + goto err; + } + /* * When termination of wait() and decode() was requested, * then exit the loop after releasing the mutex.