SR_API int sr_input_scan_file(const char *filename, const struct sr_input **in);
SR_API struct sr_dev_inst *sr_input_dev_inst_get(const struct sr_input *in);
SR_API int sr_input_send(const struct sr_input *in, GString *buf);
+SR_API int sr_input_end(const struct sr_input *in);
SR_API int sr_input_free(const struct sr_input *in);
/*--- output/output.c -------------------------------------------------------*/
sr_err("No receive in module %d ('%s').", i, d);
errors++;
}
+ if (!inputs[i]->end) {
+ sr_err("No end in module %d ('%s').", i, d);
+ errors++;
+ }
if (errors == 0)
continue;
return SR_OK;
}
-static int receive(struct sr_input *in, GString *buf)
+static int process_buffer(struct sr_input *in)
{
struct sr_datafeed_packet packet;
struct sr_datafeed_meta meta;
struct sr_config *src;
struct context *inc;
gsize chunk_size, i;
- int chunk, num_channels;
+ int chunk;
inc = in->priv;
-
- g_string_append_len(in->buf, buf->str, buf->len);
-
- num_channels = g_slist_length(in->sdi->channels);
-
- if (!in->sdi_ready) {
- /* sdi is ready, notify frontend. */
- in->sdi_ready = TRUE;
- return SR_OK;
- }
-
if (!inc->started) {
std_session_send_df_header(in->sdi, LOG_PREFIX);
packet.type = SR_DF_LOGIC;
packet.payload = &logic;
- logic.unitsize = (num_channels + 7) / 8;
+ logic.unitsize = (g_slist_length(in->sdi->channels) + 7) / 8;
/* Cut off at multiple of unitsize. */
chunk_size = in->buf->len / logic.unitsize * logic.unitsize;
return SR_OK;
}
-static int cleanup(struct sr_input *in)
+static int receive(struct sr_input *in, GString *buf)
+{
+ int ret;
+
+ g_string_append_len(in->buf, buf->str, buf->len);
+
+ if (!in->sdi_ready) {
+ /* sdi is ready, notify frontend. */
+ in->sdi_ready = TRUE;
+ return SR_OK;
+ }
+
+ ret = process_buffer(in);
+
+ return ret;
+}
+
+static int end(struct sr_input *in)
{
struct context *inc;
struct sr_datafeed_packet packet;
+ int ret;
- inc = in->priv;
- if (!inc)
- return SR_OK;
+ if (in->sdi_ready)
+ ret = process_buffer(in);
+ else
+ ret = SR_OK;
+ inc = in->priv;
if (inc->started) {
packet.type = SR_DF_END;
sr_session_send(in->sdi, &packet);
}
- return SR_OK;
+ return ret;
}
static struct sr_option options[] = {
.options = get_options,
.init = init,
.receive = receive,
- .cleanup = cleanup,
+ .end = end,
};
return SR_OK;
}
-static int receive(struct sr_input *in, GString *buf)
+static int process_buffer(struct sr_input *in)
{
struct sr_datafeed_packet packet;
struct sr_datafeed_meta meta;
struct sr_config *src;
struct context *inc;
gsize chunk_size, i;
- int chunk, num_channels;
+ int chunk;
inc = in->priv;
-
- g_string_append_len(in->buf, buf->str, buf->len);
-
- num_channels = g_slist_length(in->sdi->channels);
-
- if (!in->sdi_ready) {
- /* sdi is ready, notify frontend. */
- in->sdi_ready = TRUE;
- return SR_OK;
- }
-
if (!inc->started) {
std_session_send_df_header(in->sdi, LOG_PREFIX);
packet.type = SR_DF_LOGIC;
packet.payload = &logic;
- logic.unitsize = (num_channels + 7) / 8;
+ logic.unitsize = (g_slist_length(in->sdi->channels) + 7) / 8;
/* Cut off at multiple of unitsize. */
chunk_size = in->buf->len / logic.unitsize * logic.unitsize;
return SR_OK;
}
-static int cleanup(struct sr_input *in)
+static int receive(struct sr_input *in, GString *buf)
+{
+ int ret;
+
+ g_string_append_len(in->buf, buf->str, buf->len);
+
+ if (!in->sdi_ready) {
+ /* sdi is ready, notify frontend. */
+ in->sdi_ready = TRUE;
+ return SR_OK;
+ }
+
+ ret = process_buffer(in);
+
+ return ret;
+}
+
+static int end(struct sr_input *in)
{
struct context *inc;
struct sr_datafeed_packet packet;
+ int ret;
- inc = in->priv;
- if (!inc)
- return SR_OK;
+ if (in->sdi_ready)
+ ret = process_buffer(in);
+ else
+ ret = SR_OK;
+ inc = in->priv;
if (inc->started) {
packet.type = SR_DF_END;
sr_session_send(in->sdi, &packet);
}
- return SR_OK;
+ return ret;
}
static struct sr_option options[] = {
.format_match = format_match,
.init = init,
.receive = receive,
- .cleanup = cleanup,
+ .end = end,
};
return ret;
}
-static int receive(struct sr_input *in, GString *buf)
+static int process_buffer(struct sr_input *in)
{
struct sr_datafeed_packet packet;
struct sr_datafeed_meta meta;
int max_columns, ret, l;
char *p, **lines, **columns;
- g_string_append_len(in->buf, buf->str, buf->len);
-
inc = in->priv;
- if (!inc->termination) {
- if ((ret = initial_receive(in)) == SR_ERR_NA)
- /* Not enough data yet. */
- return SR_OK;
- else if (ret != SR_OK)
- return SR_ERR;
-
- /* sdi is ready, notify frontend. */
- in->sdi_ready = TRUE;
- return SR_OK;
- }
-
if (!inc->started) {
std_session_send_df_header(in->sdi, LOG_PREFIX);
g_strfreev(lines);
g_string_erase(in->buf, 0, p - in->buf->str + 1);
- return SR_OK;
+ return ret;
}
-static int cleanup(struct sr_input *in)
+static int receive(struct sr_input *in, GString *buf)
{
struct context *inc;
- struct sr_datafeed_packet packet;
+ int ret;
+
+ g_string_append_len(in->buf, buf->str, buf->len);
inc = in->priv;
- if (!inc)
+ if (!inc->termination) {
+ if ((ret = initial_receive(in)) == SR_ERR_NA)
+ /* Not enough data yet. */
+ return SR_OK;
+ else if (ret != SR_OK)
+ return SR_ERR;
+
+ /* sdi is ready, notify frontend. */
+ in->sdi_ready = TRUE;
return SR_OK;
+ }
+
+ ret = process_buffer(in);
+
+ return ret;
+}
+
+static int end(struct sr_input *in)
+{
+ struct context *inc;
+ struct sr_datafeed_packet packet;
+ int ret;
+ if (in->sdi_ready)
+ ret = process_buffer(in);
+ else
+ ret = SR_OK;
+
+ inc = in->priv;
if (inc->started) {
/* End of stream. */
packet.type = SR_DF_END;
sr_session_send(in->sdi, &packet);
}
+ return ret;
+}
+
+static int cleanup(struct sr_input *in)
+{
+ struct context *inc;
+
+ inc = in->priv;
+
if (inc->delimiter)
g_string_free(inc->delimiter, TRUE);
.format_match = format_match,
.init = init,
.receive = receive,
+ .end = end,
.cleanup = cleanup,
};
return in->module->receive((struct sr_input *)in, buf);
}
+/**
+ * Signal the input module no more data will come.
+ *
+ * This will cause the module to process any data it may have buffered.
+ * The SR_DF_END packet will also typically be sent at this time.
+ *
+ * @since 0.4.0
+ */
+SR_API int sr_input_end(const struct sr_input *in)
+{
+ sr_spew("Calling end() on %s module.", in->module->id);
+ return in->module->end((struct sr_input *)in);
+}
+
/**
* Free the specified input instance and all associated resources.
*
return FALSE;
}
-static int receive(struct sr_input *in, GString *buf)
+static int process_buffer(struct sr_input *in)
{
struct sr_datafeed_packet packet;
struct sr_datafeed_meta meta;
uint64_t samplerate;
char *p;
- g_string_append_len(in->buf, buf->str, buf->len);
-
inc = in->priv;
- if (!inc->got_header) {
- if (!have_header(in->buf))
- return SR_OK;
- if (!parse_header(in, in->buf) != SR_OK)
- /* There was a header in there, but it was malformed. */
- return SR_ERR;
-
- in->sdi_ready = TRUE;
- /* sdi is ready, notify frontend. */
- return SR_OK;
- }
-
if (!inc->started) {
std_session_send_df_header(in->sdi, LOG_PREFIX);
return SR_OK;
}
-static int cleanup(struct sr_input *in)
+static int receive(struct sr_input *in, GString *buf)
+{
+ struct context *inc;
+ int ret;
+
+ g_string_append_len(in->buf, buf->str, buf->len);
+
+ inc = in->priv;
+ if (!inc->got_header) {
+ if (!have_header(in->buf))
+ return SR_OK;
+ if (!parse_header(in, in->buf) != SR_OK)
+ /* There was a header in there, but it was malformed. */
+ return SR_ERR;
+
+ in->sdi_ready = TRUE;
+ /* sdi is ready, notify frontend. */
+ return SR_OK;
+ }
+
+ ret = process_buffer(in);
+
+ return ret;
+}
+
+static int end(struct sr_input *in)
{
struct sr_datafeed_packet packet;
struct context *inc;
+ int ret;
+
+ if (in->sdi_ready)
+ ret = process_buffer(in);
+ else
+ ret = SR_OK;
inc = in->priv;
if (inc->started) {
- /* End of stream. */
packet.type = SR_DF_END;
sr_session_send(in->sdi, &packet);
}
+ return ret;
+}
+
+static int cleanup(struct sr_input *in)
+{
+ struct context *inc;
+
+ inc = in->priv;
g_slist_free_full(inc->channels, free_channel);
return SR_OK;
.format_match = format_match,
.init = init,
.receive = receive,
+ .end = end,
.cleanup = cleanup,
};
sr_session_send(in->sdi, &packet);
}
-static int receive(struct sr_input *in, GString *buf)
+static int process_buffer(struct sr_input *in)
{
+ struct context *inc;
struct sr_datafeed_packet packet;
struct sr_datafeed_meta meta;
struct sr_channel *ch;
struct sr_config *src;
- struct context *inc;
int offset, chunk_samples, total_samples, processed, max_chunk_samples;
- int num_samples, ret, i;
+ int num_samples, i;
char channelname[8];
- g_string_append_len(in->buf, buf->str, buf->len);
-
- if (in->buf->len < MIN_DATA_CHUNK_OFFSET) {
- /*
- * Don't even try until there's enough room
- * for the data segment to start.
- */
- return SR_OK;
- }
-
inc = in->priv;
- if (!in->sdi_ready) {
- if ((ret = parse_wav_header(in->buf, inc)) == SR_ERR_NA)
- /* Not enough data yet. */
- return SR_OK;
- else if (ret != SR_OK)
- return ret;
-
- /* sdi is ready, notify frontend. */
- in->sdi_ready = TRUE;
- return SR_OK;
- }
-
if (!inc->started) {
for (i = 0; i < inc->num_channels; i++) {
snprintf(channelname, 8, "CH%d", i + 1);
return SR_OK;
}
-static int cleanup(struct sr_input *in)
+static int receive(struct sr_input *in, GString *buf)
+{
+ struct context *inc;
+ int ret;
+
+ g_string_append_len(in->buf, buf->str, buf->len);
+
+ if (in->buf->len < MIN_DATA_CHUNK_OFFSET) {
+ /*
+ * Don't even try until there's enough room
+ * for the data segment to start.
+ */
+ return SR_OK;
+ }
+
+ inc = in->priv;
+ if (!in->sdi_ready) {
+ if ((ret = parse_wav_header(in->buf, inc)) == SR_ERR_NA)
+ /* Not enough data yet. */
+ return SR_OK;
+ else if (ret != SR_OK)
+ return ret;
+
+ /* sdi is ready, notify frontend. */
+ in->sdi_ready = TRUE;
+ return SR_OK;
+ }
+
+ ret = process_buffer(in);
+
+ return SR_OK;
+}
+
+static int end(struct sr_input *in)
{
struct sr_datafeed_packet packet;
struct context *inc;
+ int ret;
+
+ if (in->sdi_ready)
+ ret = process_buffer(in);
+ else
+ ret = SR_OK;
inc = in->priv;
if (inc->started) {
- /* End of stream. */
packet.type = SR_DF_END;
sr_session_send(in->sdi, &packet);
}
- return SR_OK;
+ return ret;
}
SR_PRIV struct sr_input_module input_wav = {
.format_match = format_match,
.init = init,
.receive = receive,
- .cleanup = cleanup,
+ .end = end,
};
*/
int (*receive) (struct sr_input *in, GString *buf);
+ int (*end) (struct sr_input *in);
+
/**
* This function is called after the caller is finished using
* the input module, and can be used to free any internal