/* module_sigrokdecode.c */
extern SRD_PRIV PyObject *srd_logic_type;
+static void srd_inst_join_decode_thread(struct srd_decoder_inst *di);
+static void srd_inst_reset_state(struct srd_decoder_inst *di);
+SRD_PRIV void oldpins_array_free(struct srd_decoder_inst *di);
+
/** @endcond */
/**
di->thread_handle = NULL;
di->got_new_samples = FALSE;
di->handled_all_samples = FALSE;
+ di->want_wait_terminate = FALSE;
+
+ /*
+ * Strictly speaking initialization of statically allocated
+ * condition and mutex variables (or variables allocated on the
+ * stack) is not required, but won't harm either. Explicitly
+ * running init() will better match subsequent clear() calls.
+ */
+ g_cond_init(&di->got_new_samples_cond);
+ g_cond_init(&di->handled_all_samples_cond);
+ g_mutex_init(&di->data_mutex);
/* Instance takes input from a frontend by default. */
sess->di_list = g_slist_append(sess->di_list, di);
return di;
}
+static void srd_inst_join_decode_thread(struct srd_decoder_inst *di)
+{
+ if (!di)
+ return;
+ if (!di->thread_handle)
+ return;
+
+ srd_dbg("%s: Joining decoder thread.", di->inst_id);
+
+ /*
+ * Terminate potentially running threads which still
+ * execute the decoder instance's decode() method.
+ */
+ srd_dbg("%s: Raising want_term, sending got_new.", di->inst_id);
+ g_mutex_lock(&di->data_mutex);
+ di->want_wait_terminate = TRUE;
+ g_cond_signal(&di->got_new_samples_cond);
+ g_mutex_unlock(&di->data_mutex);
+
+ srd_dbg("%s: Running join().", di->inst_id);
+ (void)g_thread_join(di->thread_handle);
+ srd_dbg("%s: Call to join() done.", di->inst_id);
+ di->thread_handle = NULL;
+
+ /*
+ * Reset condition and mutex variables, such that next
+ * operations on them will find them in a clean state.
+ */
+ g_cond_clear(&di->got_new_samples_cond);
+ g_cond_init(&di->got_new_samples_cond);
+ g_cond_clear(&di->handled_all_samples_cond);
+ g_cond_init(&di->handled_all_samples_cond);
+ g_mutex_clear(&di->data_mutex);
+ g_mutex_init(&di->data_mutex);
+}
+
+static void srd_inst_reset_state(struct srd_decoder_inst *di)
+{
+ if (!di)
+ return;
+
+ srd_dbg("%s: Resetting decoder state.", di->inst_id);
+
+ /*
+ * Reset internal state of the decoder.
+ */
+ condition_list_free(di);
+ match_array_free(di);
+ di->abs_start_samplenum = 0;
+ di->abs_end_samplenum = 0;
+ di->inbuf = NULL;
+ di->inbuflen = 0;
+ di->abs_cur_samplenum = 0;
+ oldpins_array_free(di);
+ di->got_new_samples = FALSE;
+ di->handled_all_samples = FALSE;
+ di->want_wait_terminate = FALSE;
+ /* Conditions and mutex got reset after joining the thread. */
+}
+
/**
* Stack a decoder instance on top of another.
*
g_string_free(s, TRUE);
}
+SRD_PRIV void oldpins_array_free(struct srd_decoder_inst *di)
+{
+ if (!di)
+ return;
+ if (!di->old_pins_array)
+ return;
+
+ srd_dbg("%s: Releasing initial pin state.", di->inst_id);
+
+ g_array_free(di->old_pins_array, TRUE);
+ di->old_pins_array = NULL;
+}
+
/** @private */
SRD_PRIV int srd_inst_start(struct srd_decoder_inst *di)
{
*
* This function returns if there is an error, or when a match is found, or
* when all samples have been processed (whether a match was found or not).
+ * This function immediately terminates when the decoder's wait() method
+ * invocation shall get terminated.
*
* @param di The decoder instance to use. Must not be NULL.
* @param found_match Will be set to TRUE if at least one condition matched,
if (!di || !found_match)
return SRD_ERR_ARG;
+ *found_match = FALSE;
+ if (di->want_wait_terminate)
+ return SRD_OK;
+
/* Check if any of the current condition(s) match. */
while (TRUE) {
/* Feed the (next chunk of the) buffer to find_match(). */
{
PyObject *py_res;
struct srd_decoder_inst *di;
+ int wanted_term;
if (!data)
return NULL;
di = data;
- /* Call self.decode(). Only returns if the PD throws an exception. */
+ srd_dbg("%s: Starting thread routine for decoder.", di->inst_id);
+
+ /*
+ * Call self.decode(). Only returns if the PD throws an exception.
+ * "Regular" termination of the decode() method is not expected.
+ */
Py_IncRef(di->py_inst);
- if (!(py_res = PyObject_CallMethod(di->py_inst, "decode", NULL))) {
+ srd_dbg("%s: Calling decode() method.", di->inst_id);
+ py_res = PyObject_CallMethod(di->py_inst, "decode", NULL);
+ srd_dbg("%s: decode() method terminated.", di->inst_id);
+
+ /*
+ * Make sure to unblock potentially pending srd_inst_decode()
+ * calls in application threads after the decode() method might
+ * have terminated, while it neither has processed sample data
+ * nor has terminated upon request. This happens e.g. when "need
+ * a samplerate to decode" exception is thrown.
+ */
+ g_mutex_lock(&di->data_mutex);
+ wanted_term = di->want_wait_terminate;
+ di->want_wait_terminate = TRUE;
+ di->handled_all_samples = TRUE;
+ g_cond_signal(&di->handled_all_samples_cond);
+ g_mutex_unlock(&di->data_mutex);
+
+ /*
+ * Check for the termination cause of the decode() method.
+ * Though this is mostly for information.
+ */
+ if (!py_res && wanted_term) {
+ /*
+ * Silently ignore errors upon return from decode() calls
+ * when termination was requested. Terminate the thread
+ * which executed this instance's decode() logic.
+ */
+ srd_dbg("%s: Thread done (!res, want_term).", di->inst_id);
+ PyErr_Clear();
+ return NULL;
+ }
+ if (!py_res) {
+ /*
+ * The decode() invocation terminated unexpectedly. Have
+ * the back trace printed, and terminate the thread which
+ * executed the decode() method.
+ */
+ srd_dbg("%s: decode() terminated unrequested.", di->inst_id);
srd_exception_catch("Protocol decoder instance %s: ", di->inst_id);
- exit(1); /* TODO: Proper shutdown. This is a hack. */
+ srd_dbg("%s: Thread done (!res, !want_term).", di->inst_id);
return NULL;
}
+
+ /*
+ * TODO: By design the decode() method is not supposed to terminate.
+ * Nevertheless we have the thread joined, and srd backend calls to
+ * decode() will re-start another thread transparently.
+ */
+ srd_dbg("%s: decode() terminated (req %d).", di->inst_id, wanted_term);
Py_DecRef(py_res);
+ PyErr_Clear();
+
+ srd_dbg("%s: Thread done (with res).", di->inst_id);
return NULL;
}
di->inbuflen = inbuflen;
di->got_new_samples = TRUE;
di->handled_all_samples = FALSE;
+ di->want_wait_terminate = FALSE;
/* Signal the thread that we have new data. */
g_cond_signal(&di->got_new_samples_cond);
/* When all samples in this chunk were handled, return. */
g_mutex_lock(&di->data_mutex);
- while (!di->handled_all_samples)
+ while (!di->handled_all_samples && !di->want_wait_terminate)
g_cond_wait(&di->handled_all_samples_cond, &di->data_mutex);
g_mutex_unlock(&di->data_mutex);
}
srd_dbg("Freeing instance %s", di->inst_id);
+ srd_inst_join_decode_thread(di);
+ srd_inst_reset_state(di);
+
Py_DecRef(di->py_inst);
g_free(di->inst_id);
g_free(di->dec_channelmap);
return SRD_ERR;
}
+ /*
+ * Return an error condition from .wait() when termination is
+ * requested, such that decode() will terminate.
+ */
+ if (di->want_wait_terminate) {
+ srd_dbg("%s: %s: Skip (want_term).", di->inst_id, __func__);
+ return SRD_ERR;
+ }
+
/* Parse the argument of self.wait() into 'py_conds'. */
if (!PyArg_ParseTuple(args, "O", &py_conds)) {
/* Let Python raise this exception. */
}
ret = set_new_condition_list(self, args);
-
+ if (ret < 0) {
+ srd_dbg("%s: %s: Aborting wait().", di->inst_id, __func__);
+ return NULL;
+ }
if (ret == 9999) {
/* Empty condition list, automatic match. */
PyObject_SetAttrString(di->py_inst, "matched", Py_None);
}
while (1) {
- /* Wait for new samples to process. */
+ /* Wait for new samples to process, or termination request. */
g_mutex_lock(&di->data_mutex);
- while (!di->got_new_samples)
+ while (!di->got_new_samples && !di->want_wait_terminate)
g_cond_wait(&di->got_new_samples_cond, &di->data_mutex);
- /* Check whether any of the current condition(s) match. */
+ /*
+ * Check whether any of the current condition(s) match.
+ * Arrange for termination requests to take a code path which
+ * won't find new samples to process, pretends to have processed
+ * previously stored samples, and returns to the main thread,
+ * while the termination request still gets signalled.
+ */
+ found_match = FALSE;
ret = process_samples_until_condition_match(di, &found_match);
/* If there's a match, set self.samplenum etc. and return. */
/* Signal the main thread that we handled all samples. */
g_cond_signal(&di->handled_all_samples_cond);
+ /*
+ * When termination of wait() and decode() was requested,
+ * then exit the loop after releasing the mutex.
+ */
+ if (di->want_wait_terminate) {
+ srd_dbg("%s: %s: Will return from wait().",
+ di->inst_id, __func__);
+ g_mutex_unlock(&di->data_mutex);
+ return NULL;
+ }
+
g_mutex_unlock(&di->data_mutex);
}