irmp/irmpsystem.h \
irmp/irmpprotocols.h
noinst_HEADERS += irmp/irmp.c
+libirmp_la_CFLAGS = $(LIBIRMP_CFLAGS)
+libirmp_la_LIBADD = $(LIBIRMP_LIBS)
libirmp_la_LDFLAGS = -no-undefined -version-info 0:0:0
endif
IRC
---
-You can find the sigrok developers in the #sigrok IRC channel on Freenode.
+You can find the sigrok developers in the #sigrok IRC channel on Libera.Chat.
Website
# first, since usually only that variant will add "-lpython3.8".
# https://docs.python.org/3/whatsnew/3.8.html#debug-build-uses-the-same-abi-as-release-build
SR_PKG_CHECK([python3], [SRD_PKGLIBS],
- [python-3.9-embed], [python-3.8-embed], [python-3.8 >= 3.8], [python-3.7 >= 3.7], [python-3.6 >= 3.6], [python-3.5 >= 3.5], [python-3.4 >= 3.4], [python-3.3 >= 3.3], [python-3.2 >= 3.2], [python3 >= 3.2])
+ [python-3.12-embed], [python-3.11-embed],
+ [python-3.10-embed], [python-3.9-embed], [python-3.8-embed], [python3-embed],
+ [python-3.8 >= 3.8], [python-3.7 >= 3.7], [python-3.6 >= 3.6], [python-3.5 >= 3.5],
+ [python-3.4 >= 3.4], [python-3.3 >= 3.3], [python-3.2 >= 3.2], [python3 >= 3.2])
AS_IF([test "x$sr_have_python3" = xno],
[AC_MSG_ERROR([Cannot find Python 3 development headers.])])
# Retrieve the compile and link flags for all modules combined.
# Also, bail out at this point if any module dependency is not met.
PKG_CHECK_MODULES([LIBSIGROKDECODE], [glib-2.0 >= 2.34 $SRD_PKGLIBS])
+PKG_CHECK_MODULES([LIBIRMP], [glib-2.0 >= 2.34 $SRD_PKGLIBS])
PKG_CHECK_MODULES([TESTS], [$SRD_PKGLIBS_TESTS glib-2.0 $SRD_PKGLIBS])
srd_glib_version=`$PKG_CONFIG --modversion glib-2.0 2>&AS_MESSAGE_LOG_FD`
g_free(row);
}
+static void logic_output_channel_free(void *data)
+{
+ struct srd_decoder_logic_output_channel *logic_out_ch = data;
+
+ if (!logic_out_ch)
+ return;
+
+ g_free(logic_out_ch->desc);
+ g_free(logic_out_ch->id);
+ g_free(logic_out_ch);
+}
+
static void decoder_option_free(void *data)
{
struct srd_decoder_option *opt = data;
PyObject *py_channellist, *py_entry;
struct srd_channel *pdch;
GSList *pdchl;
- ssize_t i;
+ ssize_t ch_idx;
PyGILState_STATE gstate;
gstate = PyGILState_Ensure();
goto err_out;
}
- for (i = PyTuple_Size(py_channellist) - 1; i >= 0; i--) {
- py_entry = PyTuple_GetItem(py_channellist, i);
+ ch_idx = PyTuple_Size(py_channellist);
+ while (ch_idx--) {
+ py_entry = PyTuple_GetItem(py_channellist, ch_idx);
if (!py_entry)
goto except_out;
if (py_dictitem_as_str(py_entry, "desc", &pdch->desc) != SRD_OK)
goto err_out;
- pdch->order = offset + i;
+ pdch->order = offset + ch_idx;
}
Py_DECREF(py_channellist);
GSList *options;
struct srd_decoder_option *o;
GVariant *gvar;
- ssize_t opt, i;
+ ssize_t opt, val_idx;
PyGILState_STATE gstate;
gstate = PyGILState_Ensure();
goto err_out;
}
- for (i = PyTuple_Size(py_values) - 1; i >= 0; i--) {
- py_item = PyTuple_GetItem(py_values, i);
+ val_idx = PyTuple_Size(py_values);
+ while (val_idx--) {
+ py_item = PyTuple_GetItem(py_values, val_idx);
if (!py_item)
goto except_out;
}
/* Convert annotation class attribute to GSList of char **. */
-static int get_annotations(struct srd_decoder *dec)
+static int get_annotations(struct srd_decoder *dec, size_t *ret_count)
{
PyObject *py_annlist, *py_ann;
GSList *annotations;
char **annpair;
- ssize_t i;
+ ssize_t ann_idx;
PyGILState_STATE gstate;
+ if (ret_count)
+ *ret_count = 0;
+
gstate = PyGILState_Ensure();
if (!PyObject_HasAttrString(dec->py_dec, "annotations")) {
goto except_out;
if (!PyTuple_Check(py_annlist)) {
- srd_err("Protocol decoder %s annotations should "
- "be a tuple.", dec->name);
+ srd_err("Protocol decoder %s annotations should be a tuple.",
+ dec->name);
goto err_out;
}
- for (i = PyTuple_Size(py_annlist) - 1; i >= 0; i--) {
- py_ann = PyTuple_GetItem(py_annlist, i);
+ ann_idx = PyTuple_Size(py_annlist);
+ if (ret_count)
+ *ret_count = ann_idx;
+ while (ann_idx--) {
+ py_ann = PyTuple_GetItem(py_annlist, ann_idx);
if (!py_ann)
goto except_out;
if (!PyTuple_Check(py_ann) || PyTuple_Size(py_ann) != 2) {
- srd_err("Protocol decoder %s annotation %zd should "
- "be a tuple with two elements.",
- dec->name, i + 1);
+ srd_err("Protocol decoder %s annotation %zd should be a tuple with two elements.",
+ dec->name, ann_idx + 1);
goto err_out;
}
if (py_strseq_to_char(py_ann, &annpair) != SRD_OK)
}
/* Convert annotation_rows to GSList of 'struct srd_decoder_annotation_row'. */
-static int get_annotation_rows(struct srd_decoder *dec)
+static int get_annotation_rows(struct srd_decoder *dec, size_t cls_count)
{
+ const char *py_member_name = "annotation_rows";
+
PyObject *py_ann_rows, *py_ann_row, *py_ann_classes, *py_item;
GSList *annotation_rows;
struct srd_decoder_annotation_row *ann_row;
- ssize_t i, k;
+ ssize_t row_idx, item_idx;
size_t class_idx;
PyGILState_STATE gstate;
gstate = PyGILState_Ensure();
- if (!PyObject_HasAttrString(dec->py_dec, "annotation_rows")) {
+ if (!PyObject_HasAttrString(dec->py_dec, py_member_name)) {
PyGILState_Release(gstate);
return SRD_OK;
}
annotation_rows = NULL;
- py_ann_rows = PyObject_GetAttrString(dec->py_dec, "annotation_rows");
+ py_ann_rows = PyObject_GetAttrString(dec->py_dec, py_member_name);
if (!py_ann_rows)
goto except_out;
if (!PyTuple_Check(py_ann_rows)) {
- srd_err("Protocol decoder %s annotation_rows "
- "must be a tuple.", dec->name);
+ srd_err("Protocol decoder %s %s must be a tuple.",
+ dec->name, py_member_name);
goto err_out;
}
- for (i = PyTuple_Size(py_ann_rows) - 1; i >= 0; i--) {
- py_ann_row = PyTuple_GetItem(py_ann_rows, i);
+ row_idx = PyTuple_Size(py_ann_rows);
+ while (row_idx--) {
+ py_ann_row = PyTuple_GetItem(py_ann_rows, row_idx);
if (!py_ann_row)
goto except_out;
if (!PyTuple_Check(py_ann_row) || PyTuple_Size(py_ann_row) != 3) {
- srd_err("Protocol decoder %s annotation_rows "
- "must contain only tuples of 3 elements.",
- dec->name);
+ srd_err("Protocol decoder %s %s must contain only tuples of 3 elements.",
+ dec->name, py_member_name);
goto err_out;
}
ann_row = g_malloc0(sizeof(struct srd_decoder_annotation_row));
goto except_out;
if (!PyTuple_Check(py_ann_classes)) {
- srd_err("Protocol decoder %s annotation_rows tuples "
- "must have a tuple of numbers as 3rd element.",
- dec->name);
+ srd_err("Protocol decoder %s %s tuples must have a tuple of numbers as 3rd element.",
+ dec->name, py_member_name);
goto err_out;
}
- for (k = PyTuple_Size(py_ann_classes) - 1; k >= 0; k--) {
- py_item = PyTuple_GetItem(py_ann_classes, k);
+ item_idx = PyTuple_Size(py_ann_classes);
+ while (item_idx--) {
+ py_item = PyTuple_GetItem(py_ann_classes, item_idx);
if (!py_item)
goto except_out;
if (!PyLong_Check(py_item)) {
- srd_err("Protocol decoder %s annotation row "
- "class tuple must only contain numbers.",
+ srd_err("Protocol decoder %s annotation row class tuple must only contain numbers.",
dec->name);
goto err_out;
}
class_idx = PyLong_AsSize_t(py_item);
if (PyErr_Occurred())
goto except_out;
+ if (class_idx >= cls_count) {
+ srd_err("Protocol decoder %s annotation row %zd references invalid class %zu.",
+ dec->name, row_idx, class_idx);
+ goto err_out;
+ }
ann_row->ann_classes = g_slist_prepend(ann_row->ann_classes,
GSIZE_TO_POINTER(class_idx));
PyObject *py_bin_classes, *py_bin_class;
GSList *bin_classes;
char **bin;
- ssize_t i;
+ ssize_t bin_idx;
PyGILState_STATE gstate;
gstate = PyGILState_Ensure();
goto err_out;
}
- for (i = PyTuple_Size(py_bin_classes) - 1; i >= 0; i--) {
- py_bin_class = PyTuple_GetItem(py_bin_classes, i);
+ bin_idx = PyTuple_Size(py_bin_classes);
+ while (bin_idx--) {
+ py_bin_class = PyTuple_GetItem(py_bin_classes, bin_idx);
if (!py_bin_class)
goto except_out;
return SRD_ERR_PYTHON;
}
+/* Convert logic_output_channels to GSList of 'struct srd_decoder_logic_output_channel'. */
+static int get_logic_output_channels(struct srd_decoder *dec)
+{
+ PyObject *py_logic_out_chs, *py_logic_out_ch, *py_item;
+ GSList *logic_out_chs;
+ struct srd_decoder_logic_output_channel *logic_out_ch;
+ ssize_t i;
+ PyGILState_STATE gstate;
+
+ gstate = PyGILState_Ensure();
+
+ if (!PyObject_HasAttrString(dec->py_dec, "logic_output_channels")) {
+ PyGILState_Release(gstate);
+ return SRD_OK;
+ }
+
+ logic_out_chs = NULL;
+
+ py_logic_out_chs = PyObject_GetAttrString(dec->py_dec, "logic_output_channels");
+ if (!py_logic_out_chs)
+ goto except_out;
+
+ if (!PyTuple_Check(py_logic_out_chs)) {
+ srd_err("Protocol decoder %s logic_output_channels "
+ "must be a tuple.", dec->name);
+ goto err_out;
+ }
+
+ for (i = PyTuple_Size(py_logic_out_chs) - 1; i >= 0; i--) {
+ py_logic_out_ch = PyTuple_GetItem(py_logic_out_chs, i);
+ if (!py_logic_out_ch)
+ goto except_out;
+
+ if (!PyTuple_Check(py_logic_out_ch) || PyTuple_Size(py_logic_out_ch) != 2) {
+ srd_err("Protocol decoder %s logic_output_channels "
+ "must contain only tuples of 2 elements.",
+ dec->name);
+ goto err_out;
+ }
+ logic_out_ch = g_malloc0(sizeof(*logic_out_ch));
+ /* Add to list right away so it doesn't get lost. */
+ logic_out_chs = g_slist_prepend(logic_out_chs, logic_out_ch);
+
+ py_item = PyTuple_GetItem(py_logic_out_ch, 0);
+ if (!py_item)
+ goto except_out;
+ if (py_str_as_str(py_item, &logic_out_ch->id) != SRD_OK)
+ goto err_out;
+
+ py_item = PyTuple_GetItem(py_logic_out_ch, 1);
+ if (!py_item)
+ goto except_out;
+ if (py_str_as_str(py_item, &logic_out_ch->desc) != SRD_OK)
+ goto err_out;
+ }
+ dec->logic_output_channels = logic_out_chs;
+ Py_DECREF(py_logic_out_chs);
+ PyGILState_Release(gstate);
+
+ return SRD_OK;
+
+except_out:
+ srd_exception_catch("Failed to get %s decoder logic output channels",
+ dec->name);
+
+err_out:
+ g_slist_free_full(logic_out_chs, &logic_output_channel_free);
+ Py_XDECREF(py_logic_out_chs);
+ PyGILState_Release(gstate);
+
+ return SRD_ERR_PYTHON;
+}
+
/* Check whether the Decoder class defines the named method. */
static int check_method(PyObject *py_dec, const char *mod_name,
const char *method_name)
int is_subclass;
const char *fail_txt;
PyGILState_STATE gstate;
+ size_t ann_cls_count;
if (!srd_check_init())
return SRD_ERR;
goto err_out;
}
- if (get_annotations(d) != SRD_OK) {
+ if (get_annotations(d, &ann_cls_count) != SRD_OK) {
fail_txt = "cannot get annotations";
goto err_out;
}
- if (get_annotation_rows(d) != SRD_OK) {
+ if (get_annotation_rows(d, ann_cls_count) != SRD_OK) {
fail_txt = "cannot get annotation rows";
goto err_out;
}
goto err_out;
}
+ if (get_logic_output_channels(d) != SRD_OK) {
+ fail_txt = "cannot get logic output channels";
+ goto err_out;
+ }
+
PyGILState_Release(gstate);
/* Append it to the list of loaded decoders. */
##
import sigrokdecode as srd
+from common.srdhelper import bitpack_lsb
def disabled_enabled(v):
return ['Disabled', 'Enabled'][v]
def output_power(v):
- return '%+ddBm' % [-4, -1, 2, 5][v]
+ return '{:+d}dBm'.format([-4, -1, 2, 5][v])
+# Notes on the implementation:
+# - A register's description is an iterable of tuples which contain:
+# The starting bit position, the bit count, the name of a field, and
+# an optional parser which interprets the field's content. Parser are
+# expected to yield a single text string when they exist. Other types
+# of output are passed to Python's .format() routine as is.
+# - Bit fields' width in registers determines the range of indices in
+# table/tuple lookups. Keep the implementation as robust as possible
+# during future maintenance. Avoid Python runtime errors when adjusting
+# the decoder.
regs = {
-# reg: name offset width parser
- 0: [
- ('FRAC', 3, 12, None),
- ('INT', 15, 16, lambda v: 'Not Allowed' if v < 32 else v)
- ],
- 1: [
- ('MOD', 3, 12, None),
- ('Phase', 15, 12, None),
- ('Prescalar', 27, 1, lambda v: ['4/5', '8/9'][v]),
- ('Phase Adjust', 28, 1, lambda v: ['Off', 'On'][v]),
- ],
- 2: [
- ('Counter Reset', 3, 1, disabled_enabled),
- ('Charge Pump Three-State', 4, 1, disabled_enabled),
- ('Power-Down', 5, 1, disabled_enabled),
- ('PD Polarity', 6, 1, lambda v: ['Negative', 'Positive'][v]),
- ('LDP', 7, 1, lambda v: ['10ns', '6ns'][v]),
- ('LDF', 8, 1, lambda v: ['FRAC-N', 'INT-N'][v]),
- ('Charge Pump Current Setting', 9, 4, lambda v: '%0.2fmA @ 5.1kΩ' %
- [0.31, 0.63, 0.94, 1.25, 1.56, 1.88, 2.19, 2.50,
- 2.81, 3.13, 3.44, 3.75, 4.06, 4.38, 4.69, 5.00][v]),
- ('Double Buffer', 13, 1, disabled_enabled),
- ('R Counter', 14, 10, None),
- ('RDIV2', 24, 1, disabled_enabled),
- ('Reference Doubler', 25, 1, disabled_enabled),
- ('MUXOUT', 26, 3, lambda v:
- ['Three-State Output', 'DVdd', 'DGND', 'R Counter Output', 'N Divider Output',
- 'Analog Lock Detect', 'Digital Lock Detect', 'Reserved'][v]),
- ('Low Noise and Low Spur Modes', 29, 2, lambda v:
- ['Low Noise Mode', 'Reserved', 'Reserved', 'Low Spur Mode'][v])
- ],
- 3: [
- ('Clock Divider', 3, 12, None),
- ('Clock Divider Mode', 15, 2, lambda v:
- ['Clock Divider Off', 'Fast Lock Enable', 'Resync Enable', 'Reserved'][v]),
- ('CSR Enable', 18, 1, disabled_enabled),
- ('Charge Cancellation', 21, 1, disabled_enabled),
- ('ABP', 22, 1, lambda v: ['6ns (FRAC-N)', '3ns (INT-N)'][v]),
- ('Band Select Clock Mode', 23, 1, lambda v: ['Low', 'High'][v])
- ],
- 4: [
- ('Output Power', 3, 2, output_power),
- ('Output Enable', 5, 1, disabled_enabled),
- ('AUX Output Power', 6, 2, output_power),
- ('AUX Output Select', 8, 1, lambda v: ['Divided Output', 'Fundamental'][v]),
- ('AUX Output Enable', 9, 1, disabled_enabled),
- ('MTLD', 10, 1, disabled_enabled),
- ('VCO Power-Down', 11, 1, lambda v:
- 'VCO Powered ' + ('Down' if v == 1 else 'Up')),
- ('Band Select Clock Divider', 12, 8, None),
- ('RF Divider Select', 20, 3, lambda v: '÷' + str(2**v)),
- ('Feedback Select', 23, 1, lambda v: ['Divided', 'Fundamental'][v]),
- ],
- 5: [
- ('LD Pin Mode', 22, 2, lambda v:
- ['Low', 'Digital Lock Detect', 'Low', 'High'][v])
- ]
+ # Register description fields:
+ # offset, width, name, parser.
+ 0: (
+ ( 3, 12, 'FRAC'),
+ (15, 16, 'INT',
+ None, lambda v: 'Not Allowed' if v < 23 else None,
+ ),
+ ),
+ 1: (
+ ( 3, 12, 'MOD'),
+ (15, 12, 'Phase'),
+ (27, 1, 'Prescalar', lambda v: ('4/5', '8/9',)[v]),
+ (28, 1, 'Phase Adjust', lambda v: ('Off', 'On',)[v]),
+ ),
+ 2: (
+ ( 3, 1, 'Counter Reset', disabled_enabled),
+ ( 4, 1, 'Charge Pump Three-State', disabled_enabled),
+ ( 5, 1, 'Power-Down', disabled_enabled),
+ ( 6, 1, 'PD Polarity', lambda v: ('Negative', 'Positive',)[v]),
+ ( 7, 1, 'LDP', lambda v: ('10ns', '6ns',)[v]),
+ ( 8, 1, 'LDF', lambda v: ('FRAC-N', 'INT-N',)[v]),
+ ( 9, 4, 'Charge Pump Current Setting',
+ lambda v: '{curr:0.2f}mA @ 5.1kΩ'.format(curr = (
+ 0.31, 0.63, 0.94, 1.25, 1.56, 1.88, 2.19, 2.50,
+ 2.81, 3.13, 3.44, 3.75, 4.06, 4.38, 4.69, 5.00,
+ )[v])),
+ (13, 1, 'Double Buffer', disabled_enabled),
+ (14, 10, 'R Counter'),
+ (24, 1, 'RDIV2', disabled_enabled),
+ (25, 1, 'Reference Doubler', disabled_enabled),
+ (26, 3, 'MUXOUT',
+ lambda v: '{text}'.format(text = (
+ 'Three-State Output', 'DVdd', 'DGND',
+ 'R Counter Output', 'N Divider Output',
+ 'Analog Lock Detect', 'Digital Lock Detect',
+ 'Reserved',
+ )[v])),
+ (29, 2, 'Low Noise and Low Spur Modes',
+ lambda v: '{text}'.format(text = (
+ 'Low Noise Mode', 'Reserved', 'Reserved', 'Low Spur Mode',
+ )[v])),
+ ),
+ 3: (
+ ( 3, 12, 'Clock Divider'),
+ (15, 2, 'Clock Divider Mode',
+ lambda v: '{text}'.format(text = (
+ 'Clock Divider Off', 'Fast Lock Enable',
+ 'Resync Enable', 'Reserved',
+ )[v])),
+ (18, 1, 'CSR Enable', disabled_enabled),
+ (21, 1, 'Charge Cancellation', disabled_enabled),
+ (22, 1, 'ABP', lambda v: ('6ns (FRAC-N)', '3ns (INT-N)',)[v]),
+ (23, 1, 'Band Select Clock Mode', lambda v: ('Low', 'High',)[v]),
+ ),
+ 4: (
+ ( 3, 2, 'Output Power', output_power),
+ ( 5, 1, 'Output Enable', disabled_enabled),
+ ( 6, 2, 'AUX Output Power', output_power),
+ ( 8, 1, 'AUX Output Select',
+ lambda v: ('Divided Output', 'Fundamental',)[v]),
+ ( 9, 1, 'AUX Output Enable', disabled_enabled),
+ (10, 1, 'MTLD', disabled_enabled),
+ (11, 1, 'VCO Power-Down',
+ lambda v: 'VCO Powered {ud}'.format(ud = 'Down' if v else 'Up')),
+ (12, 8, 'Band Select Clock Divider'),
+ (20, 3, 'RF Divider Select', lambda v: '÷{:d}'.format(2 ** v)),
+ (23, 1, 'Feedback Select', lambda v: ('Divided', 'Fundamental',)[v]),
+ ),
+ 5: (
+ (22, 2, 'LD Pin Mode',
+ lambda v: '{text}'.format(text = (
+ 'Low', 'Digital Lock Detect', 'Low', 'High',
+ )[v])),
+ ),
}
-ANN_REG = 0
+( ANN_REG, ANN_WARN, ) = range(2)
class Decoder(srd.Decoder):
api_version = 3
annotations = (
# Sent from the host to the chip.
('write', 'Register write'),
+ ('warning', "Warnings"),
)
annotation_rows = (
('writes', 'Register writes', (ANN_REG,)),
+ ('warnings', 'Warnings', (ANN_WARN,)),
)
def __init__(self):
def start(self):
self.out_ann = self.register(srd.OUTPUT_ANN)
+ def putg(self, ss, es, cls, data):
+ self.put(ss, es, self.out_ann, [ cls, data, ])
+
def decode_bits(self, offset, width):
- return (sum([(1 << i) if self.bits[offset + i][0] else 0 for i in range(width)]),
- (self.bits[offset + width - 1][1], self.bits[offset][2]))
+ '''Extract a bit field. Expects LSB input data.'''
+ bits = self.bits[offset:][:width]
+ ss, es = bits[-1][1], bits[0][2]
+ value = bitpack_lsb(bits, 0)
+ return ( value, ( ss, es, ))
+
+ def decode_field(self, name, offset, width, parser = None, checker = None):
+ '''Interpret a bit field. Emits an annotation.'''
+ # Get the register field's content and position.
+ val, ( ss, es, ) = self.decode_bits(offset, width)
+ # Have the field's content formatted, emit an annotation.
+ formatted = parser(val) if parser else '{}'.format(val)
+ if formatted is not None:
+ text = ['{name}: {val}'.format(name = name, val = formatted)]
+ else:
+ text = ['{name}'.format(name = name)]
+ if text:
+ self.putg(ss, es, ANN_REG, text)
+ # Have the field's content checked, emit an optional warning.
+ warn = checker(val) if checker else None
+ if warn:
+ text = ['{}'.format(warn)]
+ self.putg(ss, es, ANN_WARN, text)
- def decode_field(self, name, offset, width, parser):
- val, pos = self.decode_bits(offset, width)
- self.put(pos[0], pos[1], self.out_ann, [ANN_REG,
- ['%s: %s' % (name, parser(val) if parser else str(val))]])
- return val
+ def decode_word(self, ss, es, bits):
+ '''Interpret a 32bit word after accumulation completes.'''
+ # SPI transfer content must be exactly one 32bit word.
+ count = len(self.bits)
+ if count != 32:
+ text = [
+ 'Frame error: Bit count: want 32, got {}'.format(count),
+ 'Frame error: Bit count',
+ 'Frame error',
+ ]
+ self.putg(ss, es, ANN_WARN, text)
+ return
+ # Holding bits in LSB order during interpretation simplifies
+ # bit field extraction. And annotation emitting routines expect
+ # this reverse order of bits' timestamps.
+ self.bits.reverse()
+ # Determine which register was accessed.
+ reg_addr, ( reg_ss, reg_es, ) = self.decode_bits(0, 3)
+ text = [
+ 'Register: {addr}'.format(addr = reg_addr),
+ 'Reg: {addr}'.format(addr = reg_addr),
+ '[{addr}]'.format(addr = reg_addr),
+ ]
+ self.putg(reg_ss, reg_es, ANN_REG, text)
+ # Interpret the register's content (when parsers are available).
+ field_descs = regs.get(reg_addr, None)
+ if not field_descs:
+ return
+ for field_desc in field_descs:
+ parser = None
+ checker = None
+ if len(field_desc) == 3:
+ start, count, name, = field_desc
+ elif len(field_desc) == 4:
+ start, count, name, parser = field_desc
+ elif len(field_desc) == 5:
+ start, count, name, parser, checker = field_desc
+ else:
+ # Unsupported regs{} syntax, programmer's error.
+ return
+ self.decode_field(name, start, count, parser, checker)
def decode(self, ss, es, data):
+ ptype, _, _ = data
+
+ if ptype == 'TRANSFER':
+ # Process accumulated bits after completion of a transfer.
+ self.decode_word(ss, es, self.bits)
+ self.bits.clear()
- ptype, data1, data2 = data
-
- if ptype == 'CS-CHANGE':
- if data1 == 1:
- if len(self.bits) == 32:
- reg_value, reg_pos = self.decode_bits(0, 3)
- self.put(reg_pos[0], reg_pos[1], self.out_ann, [ANN_REG,
- ['Register: %d' % reg_value, 'Reg: %d' % reg_value,
- '[%d]' % reg_value]])
- if reg_value < len(regs):
- field_descs = regs[reg_value]
- for field_desc in field_descs:
- field = self.decode_field(*field_desc)
- self.bits = []
if ptype == 'BITS':
- self.bits = data1 + self.bits
+ _, mosi_bits, miso_bits = data
+ # Accumulate bits in MSB order as they are seen in SPI frames.
+ msb_bits = mosi_bits.copy()
+ msb_bits.reverse()
+ self.bits.extend(msb_bits)
# Vendor code
vendor_code = {
- 0x1e: 'Atmel',
+ 0x1E: 'Atmel',
0x00: 'Device locked',
}
# (Part family + flash size, part number)
part = {
(0x90, 0x01): 'AT90S1200',
+ (0x90, 0x05): 'ATtiny12',
+ (0x90, 0x06): 'ATtiny15',
+ (0x90, 0x07): 'ATtiny13',
(0x91, 0x01): 'AT90S2313',
+ (0x91, 0x02): 'AT90S2323',
+ (0x91, 0x03): 'AT90S2343',
+ (0x91, 0x05): 'AT90S2333',
+ (0x91, 0x06): 'ATtiny22',
+ (0x91, 0x07): 'ATtiny28',
+ (0x91, 0x08): 'ATtiny25',
+ (0x91, 0x09): 'ATtiny26',
+ (0x91, 0x0A): 'ATtiny2313',
+ (0x91, 0x0B): 'ATtiny24',
+ (0x91, 0x0C): 'ATtiny261',
(0x92, 0x01): 'AT90S4414',
- (0x92, 0x05): 'ATmega48', # 4kB flash
+ (0x92, 0x03): 'AT90S4433',
+ (0x92, 0x05): 'ATmega48(A)',
+ (0x92, 0x06): 'ATtiny45',
+ (0x92, 0x08): 'ATtiny461',
+ (0x92, 0x09): 'ATtiny48',
+ (0x92, 0x0A): 'ATmega48PA',
+ (0x92, 0x0D): 'ATtiny4313',
+ (0x92, 0x10): 'ATmega48PB',
(0x93, 0x01): 'AT90S8515',
- (0x93, 0x0a): 'ATmega88', # 8kB flash
- (0x94, 0x06): 'ATmega168', # 16kB flash
- (0xff, 0xff): 'Device code erased, or target missing',
+ (0x93, 0x03): 'AT90S8535',
+ (0x93, 0x07): 'ATmega8',
+ (0x93, 0x0A): 'ATmega88(A)',
+ (0x93, 0x0B): 'ATtiny85',
+ (0x93, 0x0D): 'ATtiny861',
+ (0x93, 0x0F): 'ATmega88PA',
+ (0x93, 0x11): 'ATtiny88',
+ (0x93, 0x16): 'ATmega88PB',
+ (0x93, 0x89): 'ATmega8U2',
+ (0x94, 0x01): 'ATmega161',
+ (0x94, 0x02): 'ATmega163',
+ (0x94, 0x03): 'ATmega16',
+ (0x94, 0x04): 'ATmega162',
+ (0x94, 0x06): 'ATmega168(A)',
+ (0x94, 0x0A): 'ATmega164PA',
+ (0x94, 0x0B): 'ATmega168PA',
+ (0x94, 0x0F): 'ATmega164A',
+ (0x94, 0x12): 'ATtiny1634',
+ (0x94, 0x15): 'ATmega168PB',
+ (0x94, 0x88): 'ATmega16U4',
+ (0x94, 0x89): 'ATmega16U2',
+ (0x95, 0x01): 'ATmega32',
+ (0x95, 0x01): 'ATmega323',
+ (0x95, 0x0F): 'ATmega328P',
+ (0x95, 0x11): 'ATmega324PA',
+ (0x95, 0x14): 'ATmega328',
+ (0x95, 0x15): 'ATmega324A',
+ (0x95, 0x87): 'ATmega32U4',
+ (0x95, 0x8A): 'ATmega32U2',
+ (0x96, 0x08): 'ATmega640',
+ (0x96, 0x09): 'ATmega644(A)',
+ (0x96, 0x0A): 'ATmega644PA',
+ (0x97, 0x01): 'ATmega103',
+ (0x97, 0x03): 'ATmega1280',
+ (0x97, 0x04): 'ATmega1281',
+ (0x97, 0x05): 'ATmega1284P',
+ (0x97, 0x06): 'ATmega1284',
+ (0x98, 0x01): 'ATmega2560',
+ (0x98, 0x02): 'ATmega2561',
+ (0xFF, 0xFF): 'Device code erased, or target missing',
(0x01, 0x02): 'Device locked',
- # TODO: Lots more entries.
}
self.part_number = ret[3]
self.putx([Ann.RSB2, ['Part number: 0x%02x' % ret[3]]])
- p = part[(self.part_fam_flash_size, self.part_number)]
- data = [Ann.DEV, ['Device: Atmel %s' % p]]
- self.put(self.ss_device, self.es_cmd, self.out_ann, data)
+ # Part name if known
+ key = (self.part_fam_flash_size, self.part_number)
+ if key in part:
+ p = part[key]
+ data = [Ann.DEV, ['Device: Atmel %s' % p]]
+ self.put(self.ss_device, self.es_cmd, self.out_ann, data)
# Sanity check on reply.
if ret[1] != 0x30 or ret[2] != self.xx or ret[0] != self.mm:
## along with this program; if not, see <http://www.gnu.org/licenses/>.
##
+import copy
import sigrokdecode as srd
from .lists import *
self.reset_variables()
def decode(self, ss, es, data):
- self.cmd, self.databyte = data
+ cmd, _ = data
# Collect the 'BITS' packet, then return. The next packet is
# guaranteed to belong to these bits we just stored.
- if self.cmd == 'BITS':
- self.bits = self.databyte
+ if cmd == 'BITS':
+ _, databits = data
+ self.bits = copy.deepcopy(databits)
return
- # Store the start/end samples of this I²C packet.
+ # Store the start/end samples of this I²C packet. Deep copy
+ # caller's data, assuming that implementation details of the
+ # above complex methods can access the data after returning
+ # from the .decode() invocation, with the data having become
+ # invalid by that time of access. This conservative approach
+ # can get weakened after close inspection of those methods.
self.ss, self.es = ss, es
+ _, databyte = data
+ databyte = copy.deepcopy(databyte)
+ self.cmd, self.databyte = cmd, databyte
# State machine.
s = 'handle_%s' % self.state.lower().replace(' ', '_')
# TODO: Implement support for inverting SDA/SCL levels (0->1 and 1->0).
# TODO: Implement support for detecting various bus errors.
+from common.srdhelper import bitpack_msb
import sigrokdecode as srd
'''
command. Slave addresses do not include bit 0 (the READ/WRITE indication bit).
For example, a slave address field could be 0x51 (instead of 0xa2).
For 'START', 'START REPEAT', 'STOP', 'ACK', and 'NACK' <pdata> is None.
+For 'BITS' <pdata> is a sequence of tuples of bit values and their start and
+stop positions, in LSB first order (although the I2C protocol is MSB first).
'''
-# CMD: [annotation-type-index, long annotation, short annotation]
+# Meaning of table items:
+# command -> [annotation class, annotation text in order of decreasing length]
proto = {
- 'START': [0, 'Start', 'S'],
- 'START REPEAT': [1, 'Start repeat', 'Sr'],
- 'STOP': [2, 'Stop', 'P'],
- 'ACK': [3, 'ACK', 'A'],
- 'NACK': [4, 'NACK', 'N'],
- 'BIT': [5, 'Bit', 'B'],
- 'ADDRESS READ': [6, 'Address read', 'AR'],
- 'ADDRESS WRITE': [7, 'Address write', 'AW'],
- 'DATA READ': [8, 'Data read', 'DR'],
- 'DATA WRITE': [9, 'Data write', 'DW'],
+ 'START': [0, 'Start', 'S'],
+ 'START REPEAT': [1, 'Start repeat', 'Sr'],
+ 'STOP': [2, 'Stop', 'P'],
+ 'ACK': [3, 'ACK', 'A'],
+ 'NACK': [4, 'NACK', 'N'],
+ 'BIT': [5, '{b:1d}'],
+ 'ADDRESS READ': [6, 'Address read: {b:02X}', 'AR: {b:02X}', '{b:02X}'],
+ 'ADDRESS WRITE': [7, 'Address write: {b:02X}', 'AW: {b:02X}', '{b:02X}'],
+ 'DATA READ': [8, 'Data read: {b:02X}', 'DR: {b:02X}', '{b:02X}'],
+ 'DATA WRITE': [9, 'Data write: {b:02X}', 'DW: {b:02X}', '{b:02X}'],
+ 'WARN': [10, '{text}'],
}
class Decoder(srd.Decoder):
def reset(self):
self.samplerate = None
- self.ss = self.es = self.ss_byte = -1
- self.bitcount = 0
- self.databyte = 0
- self.wr = -1
- self.is_repeat_start = 0
- self.state = 'FIND START'
+ self.is_write = None
+ self.rem_addr_bytes = None
+ self.slave_addr_7 = None
+ self.slave_addr_10 = None
+ self.is_repeat_start = False
self.pdu_start = None
self.pdu_bits = 0
- self.bits = []
+ self.data_bits = []
+ self.bitwidth = 0
def metadata(self, key, value):
if key == srd.SRD_CONF_SAMPLERATE:
self.out_bitrate = self.register(srd.OUTPUT_META,
meta=(int, 'Bitrate', 'Bitrate from Start bit to Stop bit'))
- def putx(self, data):
- self.put(self.ss, self.es, self.out_ann, data)
-
- def putp(self, data):
- self.put(self.ss, self.es, self.out_python, data)
-
- def putb(self, data):
- self.put(self.ss, self.es, self.out_binary, data)
-
- def handle_start(self, pins):
- self.ss, self.es = self.samplenum, self.samplenum
- self.pdu_start = self.samplenum
- self.pdu_bits = 0
- cmd = 'START REPEAT' if (self.is_repeat_start == 1) else 'START'
- self.putp([cmd, None])
- self.putx([proto[cmd][0], proto[cmd][1:]])
- self.state = 'FIND ADDRESS'
- self.bitcount = self.databyte = 0
- self.is_repeat_start = 1
- self.wr = -1
- self.bits = []
+ def putg(self, ss, es, cls, text):
+ self.put(ss, es, self.out_ann, [cls, text])
+
+ def putp(self, ss, es, data):
+ self.put(ss, es, self.out_python, data)
+
+ def putb(self, ss, es, data):
+ self.put(ss, es, self.out_binary, data)
+
+ def _wants_start(self):
+ # Check whether START is required (to sync to the input stream).
+ return self.pdu_start is None
+
+ def _collects_address(self):
+ # Check whether the transfer still is in the address phase (is
+ # still collecting address and r/w details, or has not started
+ # collecting it).
+ return self.rem_addr_bytes is None or self.rem_addr_bytes != 0
+
+ def _collects_byte(self):
+ # Check whether bits of a byte are being collected. Outside of
+ # the data byte, the bit is the ACK/NAK slot.
+ return self.data_bits is None or len(self.data_bits) < 8
+
+ def handle_start(self, ss, es):
+ if self.is_repeat_start:
+ cmd = 'START REPEAT'
+ else:
+ cmd = 'START'
+ self.pdu_start = ss
+ self.pdu_bits = 0
+ self.putp(ss, es, [cmd, None])
+ cls, texts = proto[cmd][0], proto[cmd][1:]
+ self.putg(ss, es, cls, texts)
+ self.is_repeat_start = True
+ self.is_write = None
+ self.slave_addr_7 = None
+ self.slave_addr_10 = None
+ self.rem_addr_bytes = None
+ self.data_bits.clear()
+ self.bitwidth = 0
# Gather 8 bits of data plus the ACK/NACK bit.
- def handle_address_or_data(self, pins):
- scl, sda = pins
+ def handle_address_or_data(self, ss, es, value):
self.pdu_bits += 1
- # Address and data are transmitted MSB-first.
- self.databyte <<= 1
- self.databyte |= sda
-
- # Remember the start of the first data/address bit.
- if self.bitcount == 0:
- self.ss_byte = self.samplenum
-
- # Store individual bits and their start/end samplenumbers.
- # In the list, index 0 represents the LSB (I²C transmits MSB-first).
- self.bits.insert(0, [sda, self.samplenum, self.samplenum])
- if self.bitcount > 0:
- self.bits[1][2] = self.samplenum
- if self.bitcount == 7:
- self.bitwidth = self.bits[1][2] - self.bits[2][2]
- self.bits[0][2] += self.bitwidth
-
- # Return if we haven't collected all 8 + 1 bits, yet.
- if self.bitcount < 7:
- self.bitcount += 1
+ # Accumulate a byte's bits, including its start position.
+ # Accumulate individual bits and their start/end sample numbers
+ # as we see them. Get the start sample number at the time when
+ # the bit value gets sampled. Assume the start of the next bit
+ # as the end sample number of the previous bit. Guess the last
+ # bit's end sample number from the second last bit's width.
+ # Keep the bits in receive order (MSB first) during accumulation.
+ # (gsi: Strictly speaking falling SCL would be the end of the
+ # bit value's validity. That'd break compatibility though.)
+ if self.data_bits:
+ self.data_bits[-1][2] = ss
+ self.data_bits.append([value, ss, es])
+ if len(self.data_bits) < 8:
return
-
- d = self.databyte
- if self.state == 'FIND ADDRESS':
- # The READ/WRITE bit is only in address bytes, not data bytes.
- self.wr = 0 if (self.databyte & 1) else 1
- if self.options['address_format'] == 'shifted':
- d = d >> 1
-
+ self.bitwidth = self.data_bits[-2][2] - self.data_bits[-3][2]
+ self.data_bits[-1][2] = self.data_bits[-1][1] + self.bitwidth
+
+ # Get the byte value. Address and data are transmitted MSB-first.
+ d = bitpack_msb(self.data_bits, 0)
+ ss_byte, es_byte = self.data_bits[0][1], self.data_bits[-1][2]
+
+ # Process the address bytes at the start of a transfer. The
+ # first byte will carry the R/W bit, and all of the 7bit address
+ # or part of a 10bit address. Bit pattern 0b11110xxx signals
+ # that another byte follows which carries the remaining bits of
+ # a 10bit slave address.
+ is_address = self._collects_address()
+ if is_address:
+ addr_byte = d
+ if self.rem_addr_bytes is None:
+ if (addr_byte & 0xf8) == 0xf0:
+ self.rem_addr_bytes = 2
+ self.slave_addr_7 = None
+ self.slave_addr_10 = addr_byte & 0x06
+ self.slave_addr_10 <<= 7
+ else:
+ self.rem_addr_bytes = 1
+ self.slave_addr_7 = addr_byte >> 1
+ self.slave_addr_10 = None
+ has_rw_bit = self.is_write is None
+ if self.is_write is None:
+ read_bit = bool(addr_byte & 1)
+ if self.options['address_format'] == 'shifted':
+ d >>= 1
+ self.is_write = False if read_bit else True
+ elif self.slave_addr_10 is not None:
+ self.slave_addr_10 |= addr_byte
+ else:
+ cls, texts = proto['WARN'][0], proto['WARN'][1:]
+ msg = 'Unhandled address byte'
+ texts = [t.format(text = msg) for t in texts]
+ self.putg(ss_byte, es_byte, cls, texts)
+ is_write = self.is_write
+ is_seven = self.slave_addr_7 is not None
+
+ # Determine annotation classes depending on whether the byte is
+ # an address or payload data, and whether it's written or read.
bin_class = -1
- if self.state == 'FIND ADDRESS' and self.wr == 1:
+ if is_address and is_write:
cmd = 'ADDRESS WRITE'
bin_class = 1
- elif self.state == 'FIND ADDRESS' and self.wr == 0:
+ elif is_address and not is_write:
cmd = 'ADDRESS READ'
bin_class = 0
- elif self.state == 'FIND DATA' and self.wr == 1:
+ elif not is_address and is_write:
cmd = 'DATA WRITE'
bin_class = 3
- elif self.state == 'FIND DATA' and self.wr == 0:
+ elif not is_address and not is_write:
cmd = 'DATA READ'
bin_class = 2
- self.ss, self.es = self.ss_byte, self.samplenum + self.bitwidth
-
- self.putp(['BITS', self.bits])
- self.putp([cmd, d])
-
- self.putb([bin_class, bytes([d])])
-
- for bit in self.bits:
- self.put(bit[1], bit[2], self.out_ann, [5, ['%d' % bit[0]]])
-
- if cmd.startswith('ADDRESS'):
- self.ss, self.es = self.samplenum, self.samplenum + self.bitwidth
- w = ['Write', 'Wr', 'W'] if self.wr else ['Read', 'Rd', 'R']
- self.putx([proto[cmd][0], w])
- self.ss, self.es = self.ss_byte, self.samplenum
-
- self.putx([proto[cmd][0], ['%s: %02X' % (proto[cmd][1], d),
- '%s: %02X' % (proto[cmd][2], d), '%02X' % d]])
-
- # Done with this packet.
- self.bitcount = self.databyte = 0
- self.bits = []
- self.state = 'FIND ACK'
-
- def get_ack(self, pins):
- scl, sda = pins
- self.ss, self.es = self.samplenum, self.samplenum + self.bitwidth
- cmd = 'NACK' if (sda == 1) else 'ACK'
- self.putp([cmd, None])
- self.putx([proto[cmd][0], proto[cmd][1:]])
- # There could be multiple data bytes in a row, so either find
- # another data byte or a STOP condition next.
- self.state = 'FIND DATA'
-
- def handle_stop(self, pins):
+ # Reverse the list of bits to LSB first order before emitting
+ # annotations and passing bits to upper layers. This may be
+ # unexpected because the protocol is MSB first, but it keeps
+ # backwards compatibility.
+ lsb_bits = self.data_bits[:]
+ lsb_bits.reverse()
+ self.putp(ss_byte, es_byte, ['BITS', lsb_bits])
+ self.putp(ss_byte, es_byte, [cmd, d])
+
+ self.putb(ss_byte, es_byte, [bin_class, bytes([d])])
+
+ for bit_value, ss_bit, es_bit in lsb_bits:
+ cls, texts = proto['BIT'][0], proto['BIT'][1:]
+ texts = [t.format(b = bit_value) for t in texts]
+ self.putg(ss_bit, es_bit, cls, texts)
+
+ if is_address and has_rw_bit:
+ # Assign the last bit's location to the R/W annotation.
+ # Adjust the address value's location to the left.
+ ss_bit, es_bit = self.data_bits[-1][1], self.data_bits[-1][2]
+ es_byte = self.data_bits[-2][2]
+ cls = proto[cmd][0]
+ w = ['Write', 'Wr', 'W'] if self.is_write else ['Read', 'Rd', 'R']
+ self.putg(ss_bit, es_bit, cls, w)
+
+ cls, texts = proto[cmd][0], proto[cmd][1:]
+ texts = [t.format(b = d) for t in texts]
+ self.putg(ss_byte, es_byte, cls, texts)
+
+ def get_ack(self, ss, es, value):
+ ss_bit, es_bit = ss, es
+ cmd = 'ACK' if value == 0 else 'NACK'
+ self.putp(ss_bit, es_bit, [cmd, None])
+ cls, texts = proto[cmd][0], proto[cmd][1:]
+ self.putg(ss_bit, es_bit, cls, texts)
+ # Slave addresses can span one or two bytes, before data bytes
+ # follow. There can be an arbitrary number of data bytes. Stick
+ # with getting more address bytes if applicable, or enter or
+ # remain in the data phase of the transfer otherwise.
+ if self.rem_addr_bytes:
+ self.rem_addr_bytes -= 1
+ self.data_bits.clear()
+
+ def handle_stop(self, ss, es):
# Meta bitrate
- if self.samplerate:
- elapsed = 1 / float(self.samplerate) * (self.samplenum - self.pdu_start + 1)
+ if self.samplerate and self.pdu_start:
+ elapsed = es - self.pdu_start + 1
+ elapsed /= self.samplerate
bitrate = int(1 / elapsed * self.pdu_bits)
- self.put(self.ss_byte, self.samplenum, self.out_bitrate, bitrate)
+ ss_meta, es_meta = self.pdu_start, es
+ self.put(ss_meta, es_meta, self.out_bitrate, bitrate)
+ self.pdu_start = None
+ self.pdu_bits = 0
cmd = 'STOP'
- self.ss, self.es = self.samplenum, self.samplenum
- self.putp([cmd, None])
- self.putx([proto[cmd][0], proto[cmd][1:]])
- self.state = 'FIND START'
- self.is_repeat_start = 0
- self.wr = -1
- self.bits = []
+ self.putp(ss, es, [cmd, None])
+ cls, texts = proto[cmd][0], proto[cmd][1:]
+ self.putg(ss, es, cls, texts)
+ self.is_repeat_start = False
+ self.is_write = None
+ self.data_bits.clear()
def decode(self):
+ # Check for several bus conditions. Determine sample numbers
+ # here and pass ss, es, and bit values to handling routines.
while True:
# State machine.
- if self.state == 'FIND START':
+ # BEWARE! This implementation expects to see valid traffic,
+ # is rather picky in which phase which symbols get handled.
+ # This attempts to support severely undersampled captures,
+ # which a previous implementation happened to read instead
+ # of rejecting the inadequate input data.
+ # NOTE that handling bits at the start of their validity,
+ # and assuming that they remain valid until the next bit
+ # starts, is also done for backwards compatibility.
+ if self._wants_start():
# Wait for a START condition (S): SCL = high, SDA = falling.
- self.handle_start(self.wait({0: 'h', 1: 'f'}))
- elif self.state == 'FIND ADDRESS':
+ pins = self.wait({0: 'h', 1: 'f'})
+ ss, es = self.samplenum, self.samplenum
+ self.handle_start(ss, es)
+ elif self._collects_address() and self._collects_byte():
# Wait for a data bit: SCL = rising.
- self.handle_address_or_data(self.wait({0: 'r'}))
- elif self.state == 'FIND DATA':
+ pins = self.wait({0: 'r'})
+ _, sda = pins
+ ss, es = self.samplenum, self.samplenum + self.bitwidth
+ self.handle_address_or_data(ss, es, sda)
+ elif self._collects_byte():
# Wait for any of the following conditions (or combinations):
# a) Data sampling of receiver: SCL = rising, and/or
# b) START condition (S): SCL = high, SDA = falling, and/or
# Check which of the condition(s) matched and handle them.
if self.matched[0]:
- self.handle_address_or_data(pins)
+ _, sda = pins
+ ss, es = self.samplenum, self.samplenum + self.bitwidth
+ self.handle_address_or_data(ss, es, sda)
elif self.matched[1]:
- self.handle_start(pins)
+ ss, es = self.samplenum, self.samplenum
+ self.handle_start(ss, es)
elif self.matched[2]:
- self.handle_stop(pins)
- elif self.state == 'FIND ACK':
+ ss, es = self.samplenum, self.samplenum
+ self.handle_stop(ss, es)
+ else:
# Wait for a data/ack bit: SCL = rising.
- self.get_ack(self.wait({0: 'r'}))
+ pins = self.wait({0: 'r'})
+ _, sda = pins
+ ss, es = self.samplenum, self.samplenum + self.bitwidth
+ self.get_ack(ss, es, sda)
## along with this program; if not, see <http://www.gnu.org/licenses/>.
##
-# TODO: Support for filtering out multiple slave/direction pairs?
+# TODO
+# - Accept other slave address forms than decimal numbers?
+# - Support for filtering out multiple slave/direction pairs?
+# - Support 10bit slave addresses?
+import copy
import sigrokdecode as srd
class Decoder(srd.Decoder):
self.reset()
def reset(self):
- self.curslave = -1
- self.curdirection = None
- self.packets = [] # Local cache of I²C packets
+ self.seen_packets = []
+ self.do_forward = None
def start(self):
self.out_python = self.register(srd.OUTPUT_PYTHON, proto_id='i2c')
if self.options['address'] not in range(0, 127 + 1):
raise Exception('Invalid slave (must be 0..127).')
+ self.want_addrs = []
+ if self.options['address']:
+ self.want_addrs.append(self.options['address'])
+ self.want_dir = {
+ 'read': 'READ', 'write': 'WRITE',
+ }.get(self.options['direction'], None)
- # Grab I²C packets into a local cache, until an I²C STOP condition
- # packet comes along. At some point before that STOP condition, there
- # will have been an ADDRESS READ or ADDRESS WRITE which contains the
- # I²C address of the slave that the master wants to talk to.
- # If that slave shall be filtered, output the cache (all packets from
- # START to STOP) as proto 'i2c', otherwise drop it.
- def decode(self, ss, es, data):
+ def _need_to_forward(self, slave_addr, direction):
+ if self.want_addrs and slave_addr not in self.want_addrs:
+ return False
+ if self.want_dir and direction != self.want_dir:
+ return False
+ return True
- cmd, databyte = data
+ # Accumulate observed I2C packets until a STOP or REPEATED START
+ # condition is seen. These are conditions where transfers end or
+ # where direction potentially changes. Forward all previously
+ # accumulated traffic if it passes the slave address and direction
+ # filter. This assumes that the slave address as well as the read
+ # or write direction was part of the observed traffic. There should
+ # be no surprise when incomplete traffic does not match the filter
+ # condition.
+ def decode(self, ss, es, data):
- # Add the I²C packet to our local cache.
- self.packets.append([ss, es, data])
+ # Unconditionally accumulate every lower layer packet we see.
+ # Keep deep copies for later, only reference caller's values
+ # as long as this .decode() invocation executes.
+ self.seen_packets.append([ss, es, copy.deepcopy(data)])
+ cmd, _ = data
+ # Check the slave address and transfer direction early when
+ # we see them. Keep accumulating packets while it's already
+ # known here whether to forward them. This simplifies other
+ # code paths. Including future handling of 10bit addresses.
if cmd in ('ADDRESS READ', 'ADDRESS WRITE'):
- self.curslave = databyte
- self.curdirection = cmd[8:].lower()
- elif cmd in ('STOP', 'START REPEAT'):
- # If this chunk was not for the correct slave, drop it.
- if self.options['address'] == 0:
- pass
- elif self.curslave != self.options['address']:
- self.packets = []
- return
-
- # If this chunk was not in the right direction, drop it.
- if self.options['direction'] == 'both':
- pass
- elif self.options['direction'] != self.curdirection:
- self.packets = []
- return
-
- # TODO: START->STOP chunks with both read and write (Repeat START)
- # Otherwise, send out the whole chunk of I²C packets.
- for p in self.packets:
- self.put(p[0], p[1], self.out_python, p[2])
+ direction = cmd[len('ADDRESS '):]
+ _, slave_addr = data
+ self.do_forward = self._need_to_forward(slave_addr, direction)
+ return
- self.packets = []
- else:
- pass # Do nothing, only add the I²C packet to our cache.
+ # Forward previously accumulated packets as we see their
+ # completion, and when they pass the filter condition. Prepare
+ # to handle the next transfer (the next read/write part of it).
+ if cmd in ('STOP', 'START REPEAT'):
+ if self.do_forward:
+ for ss, es, data in self.seen_packets:
+ self.put(ss, es, self.out_python, data)
+ self.seen_packets.clear()
+ self.do_forward = None
+ return
when addressing channels within the device.
- 'DATA_BYTE': <addr> is the talker address (when available), <pdata>
is the raw data byte (transport layer, ATN inactive).
+ - 'PPOLL': <addr> is not applicable, <pdata> is a list of bit indices
+ (DIO1 to DIO8 order) which responded to the PP request.
Extracted payload information (peers and their communicated data):
- 'TALK_LISTEN': <addr> is the current talker, <pdata> is the list of
ANN_RAW_BIT, ANN_RAW_BYTE,
ANN_CMD, ANN_LADDR, ANN_TADDR, ANN_SADDR, ANN_DATA,
ANN_EOI,
+ ANN_PP,
ANN_TEXT,
# TODO Want to provide one annotation class per talker address (0-30)?
ANN_IEC_PERIPH,
ANN_WARN,
-) = range(11)
+) = range(12)
(
BIN_RAW,
{'id': 'clk', 'name': 'CLK', 'desc': 'Serial clock'},
)
options = (
- {'id': 'iec_periph', 'desc': 'Decode Commodore IEC bus peripherals details',
+ {'id': 'iec_periph', 'desc': 'Decode Commodore IEC peripherals',
'default': 'no', 'values': ('no', 'yes')},
{'id': 'delim', 'desc': 'Payload data delimiter',
'default': 'eol', 'values': ('none', 'eol')},
+ {'id': 'atn_parity', 'desc': 'ATN commands use parity',
+ 'default': 'no', 'values': ('no', 'yes')},
)
annotations = (
('bit', 'IEC bit'),
('saddr', 'Secondary address'),
('data', 'Data byte'),
('eoi', 'EOI'),
+ ('pp', 'Parallel poll'),
('text', 'Talker text'),
('periph', 'IEC bus peripherals'),
('warning', 'Warning'),
('raws', 'Raw bytes', (ANN_RAW_BYTE,)),
('gpib', 'Commands/data', (ANN_CMD, ANN_LADDR, ANN_TADDR, ANN_SADDR, ANN_DATA,)),
('eois', 'EOI', (ANN_EOI,)),
+ ('polls', 'Polls', (ANN_PP,)),
('texts', 'Talker texts', (ANN_TEXT,)),
('periphs', 'IEC peripherals', (ANN_IEC_PERIPH,)),
('warnings', 'Warnings', (ANN_WARN,)),
self.es_eoi = None
self.ss_text = None
self.es_text = None
+ self.ss_pp = None
self.last_talker = None
self.last_listener = []
self.last_iec_addr = None
if had_eol and not is_eol:
self.flush_bytes_text_accu()
+ def check_pp(self, dio = None):
+ # The combination of ATN and EOI means PP (parallel poll). Track
+ # this condition's start and end, and keep grabing the DIO lines'
+ # state as long as the condition is seen, since DAV is not used
+ # in the PP communication.
+ capture_in_pp = self.curr_atn and self.curr_eoi
+ decoder_in_pp = self.ss_pp is not None
+ if capture_in_pp and not decoder_in_pp:
+ # Phase starts. Track its ss. Start collecting DIO state.
+ self.ss_pp = self.samplenum
+ self.dio_pp = []
+ return 'enter'
+ if not capture_in_pp and decoder_in_pp:
+ # Phase ends. Void its ss. Process collected DIO state.
+ ss, es = self.ss_pp, self.samplenum
+ dio = self.dio_pp or []
+ self.ss_pp, self.dio_pp = None, None
+ if ss == es:
+ # False positive, caused by low oversampling.
+ return 'leave'
+ # Emit its annotation. Translate bit indices 0..7 for the
+ # DIO1..DIO8 signals to display text. Pass bit indices in
+ # the Python output for upper layers.
+ #
+ # TODO The presentation of this information may need more
+ # adjustment. The bit positions need not translate to known
+ # device addresses. Bits need not even belong to a single
+ # device. Participants and their location in the DIO pattern
+ # is configurable. Leave the interpretation to upper layers.
+ bits = [i for i, b in enumerate(dio) if b]
+ bits_text = ' '.join(['{}'.format(i + 1) for i in bits])
+ dios = ['DIO{}'.format(i + 1) for i in bits]
+ dios_text = ' '.join(dios or ['-'])
+ text = [
+ 'PPOLL {}'.format(dios_text),
+ 'PP {}'.format(bits_text),
+ 'PP',
+ ]
+ self.emit_data_ann(ss, es, ANN_PP, text)
+ self.putpy(ss, es, 'PPOLL', None, bits)
+ # Cease collecting DIO state.
+ return 'leave'
+ if decoder_in_pp:
+ # Keep collecting DIO state for each individual sample in
+ # the PP phase. Logically OR all DIO values that were seen.
+ # This increases robustness for low oversampling captures,
+ # where DIO may no longer be asserted when ATN/EOI deassert,
+ # and DIO was not asserted yet when ATN/EOI start asserting.
+ if dio is None:
+ dio = []
+ if len(dio) > len(self.dio_pp):
+ self.dio_pp.extend([ 0, ] * (len(dio) - len(self.dio_pp)))
+ for i, b in enumerate(dio):
+ self.dio_pp[i] |= b
+ return 'keep'
+ return 'idle'
+
def handle_ifc_change(self, ifc):
# Track IFC line for parallel input.
# Assertion of IFC de-selects all talkers and listeners.
upd_iec = False,
py_type = None
py_peers = False
+ if self.options['atn_parity'] == 'yes':
+ par = 1 if b & 0x80 else 0
+ b &= ~0x80
+ ones = bin(b).count('1') + par
+ if ones % 2:
+ warn_texts = ['Command parity error', 'parity', 'PAR']
+ self.emit_warn_ann(self.ss_raw, self.es_raw, warn_texts)
is_cmd, is_unl, is_unt = _is_command(b)
laddr = _is_listen_addr(b)
taddr = _is_talk_addr(b)
# low signal levels, i.e. won't include the initial falling edge.
# Scan for ATN/EOI edges as well (including the trick which works
# around initial pin state).
+ #
+ # Use efficient edge based wait conditions for most activities,
+ # though some phases may require individual inspection of each
+ # sample (think parallel poll in combination with slow sampling).
+ #
# Map low-active physical transport lines to positive logic here,
# to simplify logical inspection/decoding of communicated data,
# and to avoid redundancy and inconsistency in later code paths.
if has_ifc:
idx_ifc = len(waitcond)
waitcond.append({PIN_IFC: 'l'})
+ idx_pp_check = None
+ def add_data_cond(conds):
+ idx = len(conds)
+ conds.append({'skip': 1})
+ return idx
+ def del_data_cond(conds, idx):
+ conds.pop(idx)
+ return None
while True:
pins = self.wait(waitcond)
pins = self.invert_pins(pins)
# captures, many edges fall onto the same sample number. So
# we process active edges of flags early (before processing
# data bits), and inactive edges late (after data got processed).
+ want_pp_check = False
if idx_ifc is not None and self.matched[idx_ifc] and pins[PIN_IFC] == 1:
self.handle_ifc_change(pins[PIN_IFC])
if idx_eoi is not None and self.matched[idx_eoi] and pins[PIN_EOI] == 1:
self.handle_eoi_change(pins[PIN_EOI])
+ want_pp_check = True
if self.matched[idx_atn] and pins[PIN_ATN] == 1:
self.handle_atn_change(pins[PIN_ATN])
+ want_pp_check = True
+ if want_pp_check and not idx_pp_check:
+ pp = self.check_pp()
+ if pp in ('enter',):
+ idx_pp_check = add_data_cond(waitcond)
if self.matched[idx_dav]:
self.handle_dav_change(pins[PIN_DAV], pins[PIN_DIO1:PIN_DIO8 + 1])
+ if idx_pp_check:
+ pp = self.check_pp(pins[PIN_DIO1:PIN_DIO8 + 1])
+ want_pp_check = False
if self.matched[idx_atn] and pins[PIN_ATN] == 0:
self.handle_atn_change(pins[PIN_ATN])
+ want_pp_check = True
if idx_eoi is not None and self.matched[idx_eoi] and pins[PIN_EOI] == 0:
self.handle_eoi_change(pins[PIN_EOI])
+ want_pp_check = True
+ if idx_pp_check is not None and want_pp_check:
+ pp = self.check_pp(pins[PIN_DIO1:PIN_DIO8 + 1])
+ if pp in ('leave',) and idx_pp_check is not None:
+ idx_pp_check = del_data_cond(waitcond, idx_pp_check)
if idx_ifc is not None and self.matched[idx_ifc] and pins[PIN_IFC] == 0:
self.handle_ifc_change(pins[PIN_IFC])
## This file is part of the libsigrokdecode project.
##
## Copyright (C) 2019 Rene Staffen
+## Copyright (C) 2020-2021 Gerhard Sittig <gerhard.sittig@gmx.net>
##
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
Lookup the C library's API routines. Declare their prototypes.
'''
- if not self._lib:
- return False
-
self._lib.irmp_get_sample_rate.restype = ctypes.c_uint32
self._lib.irmp_get_sample_rate.argtypes = []
+ self._lib.irmp_instance_alloc.restype = ctypes.c_void_p
+ self._lib.irmp_instance_alloc.argtypes = []
+
+ self._lib.irmp_instance_free.restype = None
+ self._lib.irmp_instance_free.argtypes = [ ctypes.c_void_p, ]
+
+ self._lib.irmp_instance_id.restype = ctypes.c_size_t
+ self._lib.irmp_instance_id.argtypes = [ ctypes.c_void_p, ]
+
+ self._lib.irmp_instance_lock.restype = ctypes.c_int
+ self._lib.irmp_instance_lock.argtypes = [ ctypes.c_void_p, ctypes.c_int, ]
+
+ self._lib.irmp_instance_unlock.restype = None
+ self._lib.irmp_instance_unlock.argtypes = [ ctypes.c_void_p, ]
+
self._lib.irmp_reset_state.restype = None
self._lib.irmp_reset_state.argtypes = []
# Create a result buffer that's local to the library instance.
self._data = self.ResultData()
+ self._inst = None
return True
Create a library instance.
'''
- # Only create a working instance for the first invocation.
- # Degrade all other instances, make them fail "late" during
- # execution, so that users will see the errors.
- self._lib = None
- self._data = None
- if IrmpLibrary.__usable_instance is None:
- filename = self._library_filename()
- self._lib = ctypes.cdll.LoadLibrary(filename)
- self._library_setup_api()
- IrmpLibrary.__usable_instance = self
+ filename = self._library_filename()
+ self._lib = ctypes.cdll.LoadLibrary(filename)
+ self._library_setup_api()
+
+ def __del__(self):
+ '''
+ Release a disposed library instance.
+ '''
+
+ if self._inst:
+ self._lib.irmp_instance_free(self._inst)
+ self._inst = None
+
+ def __enter__(self):
+ '''
+ Enter a context (lock management).
+ '''
+
+ if self._inst is None:
+ self._inst = self._lib.irmp_instance_alloc()
+ self._lib.irmp_instance_lock(self._inst, 1)
+ return self
+
+ def __exit__(self, extype, exvalue, trace):
+ '''
+ Leave a context (lock management).
+ '''
+
+ self._lib.irmp_instance_unlock(self._inst)
+ return False
+
+ def client_id(self):
+ return self._lib.irmp_instance_id(self._inst)
def get_sample_rate(self):
- if not self._lib:
- return None
return self._lib.irmp_get_sample_rate()
def reset_state(self):
- if not self._lib:
- return None
self._lib.irmp_reset_state()
def add_one_sample(self, level):
- if not self._lib:
- raise Exception("IRMP library limited to a single instance.")
if not self._lib.irmp_add_one_sample(int(level)):
return False
self._lib.irmp_get_result_data(ctypes.byref(self._data))
##
## Copyright (C) 2014 Gump Yang <gump.yang@gmail.com>
## Copyright (C) 2019 Rene Staffen
+## Copyright (C) 2020-2021 Gerhard Sittig <gerhard.sittig@gmx.net>
##
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
self.reset()
def reset(self):
- self.want_reset = True
+ pass
def start(self):
self.out_ann = self.register(srd.OUTPUT_ANN)
except Exception as e:
txt = e.args[0]
raise LibraryError(txt)
- if self.irmp:
- self.lib_rate = self.irmp.get_sample_rate()
- if not self.irmp or not self.lib_rate:
- raise LibraryError('Cannot access IRMP library. One instance limit exceeded?')
+ if not self.irmp:
+ raise LibraryError('Cannot access IRMP library.')
if not self.samplerate:
raise SamplerateError('Cannot decode without samplerate.')
- if self.samplerate % self.lib_rate:
- raise SamplerateError('Capture samplerate must be multiple of library samplerate ({})'.format(self.lib_rate))
- self.rate_factor = int(self.samplerate / self.lib_rate)
- if self.want_reset:
- self.irmp.reset_state()
- self.want_reset = False
+ lib_rate = self.irmp.get_sample_rate()
+ if not lib_rate:
+ raise LibraryError('Cannot determine IRMP library\'s samplerate.')
+ if self.samplerate % lib_rate:
+ raise SamplerateError('Capture samplerate must be multiple of library samplerate ({})'.format(lib_rate))
+
+ self.rate_factor = int(self.samplerate / lib_rate)
+ active = 0 if self.options['polarity'] == 'active-low' else 1
- self.active = 0 if self.options['polarity'] == 'active-low' else 1
ir, = self.wait()
- while True:
- if self.active == 1:
- ir = 1 - ir
- if self.irmp.add_one_sample(ir):
- data = self.irmp.get_result_data()
- self.putframe(data)
- ir, = self.wait([{'skip': self.rate_factor}])
+ with self.irmp:
+ self.irmp.reset_state()
+ while True:
+ if active == 1:
+ ir = 1 - ir
+ if self.irmp.add_one_sample(ir):
+ data = self.irmp.get_result_data()
+ self.putframe(data)
+ ir, = self.wait([{'skip': self.rate_factor}])
def reset(self):
self.variant = None
- self.ss_block = None
- self.es_block = None
def start(self):
self.out_ann = self.register(srd.OUTPUT_ANN)
self.variant = self.options['variant']
- def putx(self, data):
- self.put(self.ss_block, self.es_block, self.out_ann, data)
+ def putg(self, ss, es, cls, text):
+ self.put(ss, es, self.out_ann, [cls, [text]])
- def handle_data(self, value):
- if value == 0xFF:
- self.putx([1, ['No button is pressed']])
- return
+ def handle_data(self, ss, es, value):
+ if value == 0xff:
+ self.putg(ss, es, 1, 'No button is pressed')
+ return
if value == 0x00:
- self.putx([2, ['Gamepad is not connected']])
- return
+ self.putg(ss, es, 2, 'Gamepad is not connected')
+ return
buttons = [
'A',
'North',
'South',
'West',
- 'East'
+ 'East',
]
- bits = format(value, '08b')
- button_str = ''
-
- for b in enumerate(bits):
- button_index = b[0]
- button_is_pressed = b[1] == '0'
-
- if button_is_pressed:
- if button_str != '':
- button_str += ' + '
- button_str += buttons[button_index]
-
- self.putx([0, ['%s' % button_str]])
+ bits = '{:08b}'.format(value)
+ text = [buttons[i] for i, b in enumerate(bits) if b == '0']
+ text = ' + '.join(text)
+ self.putg(ss, es, 0, text)
def decode(self, ss, es, data):
- ptype, mosi, miso = data
- self.ss_block, self.es_block = ss, es
-
- if ptype != 'DATA':
- return
-
- self.handle_data(miso)
+ ptype, _, _ = data
+ if ptype == 'DATA':
+ _, _, miso = data
+ self.handle_data(ss, es, miso)
+ return
('words', 'Words', (Ann.WORD,)),
('warnings', 'Warnings', (Ann.WARN,)),
)
+ binary = (
+ ('binary', 'Binary'),
+ )
def __init__(self):
self.reset()
def start(self):
self.out_python = self.register(srd.OUTPUT_PYTHON)
+ self.out_binary = self.register(srd.OUTPUT_BINARY)
self.out_ann = self.register(srd.OUTPUT_ANN)
def putg(self, ss, es, ann, txts):
def putpy(self, ss, es, ann, data):
self.put(ss, es, self.out_python, [ann, data])
+ def putbin(self, ss, es, ann_class, data):
+ self.put(ss, es, self.out_binary, [ann_class, data])
+
def flush_word(self, bus_width):
if not self.word_items:
return
txts = [self.fmt_item.format(data)]
self.putg(ss, es, Ann.ITEM, txts)
self.putpy(ss, es, 'ITEM', (data, bus_width))
+ self.putbin(ss, es, 0, data.to_bytes(1, byteorder='big'))
# Optionally queue the currently seen item.
if item is not None:
# This results in robust operation for low-oversampled input.
in_reset = False
while True:
- pins = self.wait(conds)
+ try:
+ pins = self.wait(conds)
+ except EOFError as e:
+ break
clock_edge = cond_idx_clock is not None and self.matched[cond_idx_clock]
data_edge = cond_idx_data_0 is not None and [idx for idx in range(cond_idx_data_0, cond_idx_data_N) if self.matched[idx]]
reset_edge = cond_idx_reset is not None and self.matched[cond_idx_reset]
data_bits = data_bits[:num_item_bits]
item = bitpack(data_bits)
self.handle_bits(self.samplenum, item, num_item_bits)
+
+ self.handle_bits(self.samplenum, None, num_item_bits)
+ # TODO Determine whether a WARN annotation needs to get emitted.
+ # The decoder has not seen the end of the last accumulated item.
+ # Instead it just ran out of input data.
# TODO: Other I²C functions: general call / reset address, device ID address.
+def logic_channels(num_channels):
+ l = []
+ for i in range(num_channels):
+ l.append(tuple(['p%d' % i, 'P%d' % i]))
+ return tuple(l)
+
class Decoder(srd.Decoder):
api_version = 3
id = 'pca9571'
('value', 'Register value'),
('warning', 'Warning'),
)
+ logic_output_channels = logic_channels(NUM_OUTPUT_CHANNELS)
annotation_rows = (
('regs', 'Registers', (0, 1)),
('warnings', 'Warnings', (2,)),
def reset(self):
self.state = 'IDLE'
self.last_write = 0xFF # Chip port default state is high.
+ self.last_write_es = 0
def start(self):
self.out_ann = self.register(srd.OUTPUT_ANN)
+ self.out_logic = self.register(srd.OUTPUT_LOGIC)
+
+ def flush(self):
+ self.put_logic_states()
def putx(self, data):
self.put(self.ss, self.es, self.out_ann, data)
+ def put_logic_states(self):
+ if (self.es > self.last_write_es):
+ data = bytes([self.last_write])
+ self.put(self.last_write_es, self.es, self.out_logic, [0, data])
+ self.last_write_es = self.es
+
def handle_io(self, b):
if self.state == 'READ DATA':
operation = ['Outputs read', 'R']
'(%02X) are different' % self.last_write]])
else:
operation = ['Outputs set', 'W']
+ self.put_logic_states()
self.last_write = b
+
self.putx([1, [operation[0] + ': %02X' % b,
operation[1] + ': %02X' % b]])
transmitted over whatever frequency and modulation that the designer
chooses. These devices operate at a number of frequencies including 433MHz.
-This PD should also decode the HX2262 and SC5262 which are equivalents.
+This PD should also decode the HX2262 and SC5262 which are equivalents, as
+well as the 2272 variants of these ICs. Support for the EV1527, RT1527, FP1527
+and HS1527 is also present.
-The decoder also contains some additional decoding for a Maplin L95AR
-remote control and will turn the received signal into which button was
-pressed and what the address code DIP switches are set to.
+The decoder can additionaly decoding the Maplin L95AR remote control and will
+turn the received signal into which button was pressed and what the address
+code DIP switches are set to.
+Please contact the sigrok team if you want decoding for further remote
+controls to be added.
'''
from .pd import Decoder
bitvals = ('0', '1', 'f', 'U')
-def decode_bit(edges):
- # Datasheet says long pulse is 3 times short pulse.
- lmin = 2 # long min multiplier
- lmax = 5 # long max multiplier
- eqmin = 0.5 # equal min multiplier
- eqmax = 1.5 # equal max multiplier
- if ( # 0 -___-___
- (edges[1] >= edges[0] * lmin and edges[1] <= edges[0] * lmax) and
- (edges[2] >= edges[0] * eqmin and edges[2] <= edges[0] * eqmax) and
- (edges[3] >= edges[0] * lmin and edges[3] <= edges[0] * lmax)):
- return '0'
- elif ( # 1 ---_---_
- (edges[0] >= edges[1] * lmin and edges[0] <= edges[1] * lmax) and
- (edges[0] >= edges[2] * eqmin and edges[0] <= edges[2] * eqmax) and
- (edges[0] >= edges[3] * lmin and edges[0] <= edges[3] * lmax)):
- return '1'
- elif ( # float ---_-___
- (edges[1] >= edges[0] * lmin and edges[1] <= edges[0] * lmax) and
- (edges[2] >= edges[0] * lmin and edges[2] <= edges[0]* lmax) and
- (edges[3] >= edges[0] * eqmin and edges[3] <= edges[0] * eqmax)):
- return 'f'
- else:
- return 'U'
-
-def pinlabels(bit_count):
- if bit_count <= 6:
- return 'A%i' % (bit_count - 1)
- else:
- return 'A%i/D%i' % (bit_count - 1, 12 - bit_count)
+def decode_bit(edges, pulses_per_bit):
+ if pulses_per_bit == 2:
+ # Datasheet says long pulse is 3 times short pulse.
+ lmin = 1.5 # long min multiplier
+ lmax = 5 # long max multiplier
+ if (edges[1] >= edges[0] * lmin and edges[1] <= edges[0] * lmax): # 0 -___
+ return '0'
+ elif (edges[0] >= edges[1] * lmin and edges[0] <= edges[1] * lmax): # 1 ---_
+ return '1'
+ # No float type for this line encoding
+ else:
+ return 'U'
+
+ if pulses_per_bit == 4:
+ # Datasheet says long pulse is 3 times short pulse.
+ lmin = 2 # long min multiplier
+ lmax = 5 # long max multiplier
+ eqmin = 0.5 # equal min multiplier
+ eqmax = 1.5 # equal max multiplier
+ if ( # 0 -___-___
+ (edges[1] >= edges[0] * lmin and edges[1] <= edges[0] * lmax) and
+ (edges[2] >= edges[0] * eqmin and edges[2] <= edges[0] * eqmax) and
+ (edges[3] >= edges[0] * lmin and edges[3] <= edges[0] * lmax)):
+ return '0'
+ elif ( # 1 ---_---_
+ (edges[0] >= edges[1] * lmin and edges[0] <= edges[1] * lmax) and
+ (edges[0] >= edges[2] * eqmin and edges[0] <= edges[2] * eqmax) and
+ (edges[0] >= edges[3] * lmin and edges[0] <= edges[3] * lmax)):
+ return '1'
+ elif ( # float ---_-___
+ (edges[1] >= edges[0] * lmin and edges[1] <= edges[0] * lmax) and
+ (edges[2] >= edges[0] * lmin and edges[2] <= edges[0]* lmax) and
+ (edges[3] >= edges[0] * eqmin and edges[3] <= edges[0] * eqmax)):
+ return 'f'
+ else:
+ return 'U'
+
+def pinlabels(bit_count, packet_bit_count):
+ if packet_bit_count == 12:
+ if bit_count <= 6:
+ return 'A%i' % (bit_count - 1)
+ else:
+ return 'A%i/D%i' % (bit_count - 1, 12 - bit_count)
+
+ if packet_bit_count == 24:
+ if bit_count <= 20:
+ return 'A%i' % (bit_count - 1)
+ else:
+ return 'D%i' % (bit_count - 21)
def decode_model(model, bits):
if model == 'maplin_l95ar':
- address = 'Addr' # Address pins A0 to A5
+ address = 'Addr' # Address bits A0 to A5
for i in range(0, 6):
address += ' %i:' % (i + 1) + ('on' if bits[i][0] == '0' else 'off')
button = 'Button'
- # Button pins A6/D5 to A11/D0
+ # Button bits A6/D5 to A11/D0
if bits[6][0] == '0' and bits[11][0] == '0':
button += ' A ON/OFF'
elif bits[7][0] == '0' and bits[11][0] == '0':
button += ' D ON/OFF'
else:
button += ' Unknown'
- return ['%s' % address, bits[0][1], bits[5][2], \
- '%s' % button, bits[6][1], bits[11][2]]
+ return [address, bits[0][1], bits[5][2], \
+ button, bits[6][1], bits[11][2]]
+
+ if model == 'xx1527':
+ addr = 0
+ addr_valid = 1
+ for i in range(0, 20):
+ if bits[i][0] != 'U':
+ addr += int(bits[i][0]) * 2 ** i
+ else:
+ addr_valid = 0
+
+ if addr_valid == 1:
+ address = 'Address 0x%X %X %X' % (addr & 0xFF, (addr >> 8) & 0xFF, addr >> 16)
+ else:
+ address = 'Invalid address as not all bits are 0 or 1'
+
+ output = ' K0 = ' + bits[20][0] + ','
+ output += ' K1 = ' + bits[21][0] + ','
+ output += ' K2 = ' + bits[22][0] + ','
+ output += ' K3 = ' + bits[23][0]
+ return [address, bits[0][1], bits[19][2], \
+ output, bits[20][1], bits[23][2]]
class Decoder(srd.Decoder):
api_version = 3
id = 'rc_encode'
name = 'RC encode'
longname = 'Remote control encoder'
- desc = 'PT2262/HX2262/SC5262 remote control encoder protocol.'
+ desc = 'PT22x2/HX22x2/SC52x2 and xx1527 remote control encoder protocol.'
license = 'gplv2+'
inputs = ['logic']
outputs = []
('code-words', 'Code words', (6, 7)),
)
options = (
- {'id': 'remote', 'desc': 'Remote', 'default': 'none',
- 'values': ('none', 'maplin_l95ar')},
+ {'id': 'linecoding', 'desc': 'Encoding', 'default': 'SC52x2/HX22x2', 'values': ('SC52x2/HX22x2', 'xx1527')},
+ {'id': 'remote', 'desc': 'Remote', 'default': 'none', 'values': ('none', 'maplin_l95ar')},
)
def __init__(self):
def start(self):
self.out_ann = self.register(srd.OUTPUT_ANN)
self.model = self.options['remote']
+ if self.options['linecoding'] == 'xx1527':
+ self.pulses_per_bit = 2
+ self.packet_bits = 24
+ self.model = 'xx1527'
+ else:
+ self.pulses_per_bit = 4 # Each bit is repeated
+ self.packet_bits = 12
def putx(self, data):
self.put(self.ss, self.es, self.out_ann, data)
self.ss = self.samplenum
continue
- if self.bit_count < 12: # Decode A0 to A11.
+ if self.bit_count < self.packet_bits: # Decode A0 to A11 / A23.
self.bit_count += 1
- for i in range(0, 4): # Get four pulses for each bit.
+ for i in range(0, self.pulses_per_bit):
if i > 0:
- pin = self.wait({0: 'e'}) # Get next 3 edges.
+ pin = self.wait({0: 'e'}) # Get next edges if we need more.
samples = self.samplenum - self.samplenumber_last
self.pulses.append(samples) # Save the pulse width.
self.samplenumber_last = self.samplenum
self.es = self.samplenum
- self.bits.append([decode_bit(self.pulses), self.ss,
+ self.bits.append([decode_bit(self.pulses, self.pulses_per_bit), self.ss,
self.es]) # Save states and times.
- idx = bitvals.index(decode_bit(self.pulses))
- self.putx([idx, [decode_bit(self.pulses)]]) # Write decoded bit.
- self.putx([5, [pinlabels(self.bit_count)]]) # Write pin labels.
+ idx = bitvals.index(decode_bit(self.pulses, self.pulses_per_bit))
+ self.putx([idx, [decode_bit(self.pulses, self.pulses_per_bit)]]) # Write decoded bit.
+ self.putx([5, [pinlabels(self.bit_count, self.packet_bits)]]) # Write pin labels.
self.pulses = []
self.ss = self.samplenum
else:
import sigrokdecode as srd
+( ANN_RGB, ) = range(1)
+
class Decoder(srd.Decoder):
api_version = 3
id = 'rgb_led_spi'
self.reset()
def reset(self):
- self.ss_cmd, self.es_cmd = 0, 0
+ self.ss_cmd = None
self.mosi_bytes = []
def start(self):
self.out_ann = self.register(srd.OUTPUT_ANN)
- def putx(self, data):
- self.put(self.ss_cmd, self.es_cmd, self.out_ann, data)
+ def putg(self, ss, es, cls, text):
+ self.put(ss, es, self.out_ann, [cls, text])
def decode(self, ss, es, data):
- ptype, mosi, miso = data
+ ptype = data[0]
- # Only care about data packets.
+ # Grab the payload of three DATA packets. These hold the
+ # RGB values (in this very order).
if ptype != 'DATA':
return
- self.ss, self.es = ss, es
-
- if len(self.mosi_bytes) == 0:
+ _, mosi, _ = data
+ if not self.mosi_bytes:
self.ss_cmd = ss
self.mosi_bytes.append(mosi)
-
- # RGB value == 3 bytes
- if len(self.mosi_bytes) != 3:
+ if len(self.mosi_bytes) < 3:
return
- red, green, blue = self.mosi_bytes
+ # Emit annotations. Invalidate accumulated details as soon as
+ # they were processed, to prepare the next iteration.
+ ss_cmd, es_cmd = self.ss_cmd, es
+ self.ss_cmd = None
+ red, green, blue = self.mosi_bytes[:3]
+ self.mosi_bytes.clear()
rgb_value = int(red) << 16 | int(green) << 8 | int(blue)
-
- self.es_cmd = es
- self.putx([0, ['#%.6x' % rgb_value]])
- self.mosi_bytes = []
+ self.putg(ss_cmd, es_cmd, ANN_RGB, ['#{:06x}'.format(rgb_value)])
## along with this program; if not, see <http://www.gnu.org/licenses/>.
##
+# Implementor's notes on the wire format:
+# - World Semi vendor, (Adafruit copy of the) datasheet
+# https://cdn-shop.adafruit.com/datasheets/WS2812.pdf
+# - reset pulse is 50us (or more) of low pin level
+# - 24bits per WS281x item, 3x 8bits, MSB first, GRB sequence,
+# cascaded WS281x items, all "excess bits" are passed through
+# - bit time starts with high period, continues with low period,
+# high to low periods' ratio determines bit value, datasheet
+# mentions 0.35us/0.8us for value 0, 0.7us/0.6us for value 1
+# (huge 150ns tolerances, un-even 0/1 value length, hmm)
+# - experience suggests the timing "is variable", rough estimation
+# often is good enough, microcontroller firmware got away with
+# four quanta per bit time, or even with three quanta (30%/60%),
+# Adafruit learn article suggests 1.2us total and 0.4/0.8 or
+# 0.8/0.4 high/low parts, four quanta are easier to handle when
+# the bit stream is sent via SPI to avoid MCU bit banging and its
+# inaccurate timing (when interrupts are used in the firmware)
+# - RGBW datasheet (Adafruit copy) for SK6812
+# https://cdn-shop.adafruit.com/product-files/2757/p2757_SK6812RGBW_REV01.pdf
+# also 1.2us total, shared across 0.3/0.9 for 0, 0.6/0.6 for 1,
+# 80us reset pulse, R8/G8/B8/W8 format per 32bits
+# - WS2815, RGB LED, uses GRB wire format, 280us RESET pulse width
+# - more vendors and models available and in popular use,
+# suggests "one third" or "two thirds" ratio would be most robust,
+# sample "a little before" the bit half? reset pulse width may need
+# to become an option? matrices and/or fast refresh environments
+# may want to experiment with back to back pixel streams
+
import sigrokdecode as srd
-from functools import reduce
+from common.srdhelper import bitpack_msb
class SamplerateError(Exception):
pass
+class DecoderError(Exception):
+ pass
+
+(
+ ANN_BIT, ANN_RESET, ANN_RGB,
+ ANN_COMP_R, ANN_COMP_G, ANN_COMP_B, ANN_COMP_W,
+) = range(7)
+
class Decoder(srd.Decoder):
api_version = 3
id = 'rgb_led_ws281x'
('bit', 'Bit'),
('reset', 'RESET'),
('rgb', 'RGB'),
+ ('r', 'R'),
+ ('g', 'G'),
+ ('b', 'B'),
+ ('w', 'W'),
)
annotation_rows = (
- ('bits', 'Bits', (0, 1)),
- ('rgb-vals', 'RGB values', (2,)),
+ ('bits', 'Bits', (ANN_BIT, ANN_RESET,)),
+ ('rgb-comps', 'RGB components', (ANN_COMP_R, ANN_COMP_G, ANN_COMP_B, ANN_COMP_W,)),
+ ('rgb-vals', 'RGB values', (ANN_RGB,)),
)
options = (
- {'id': 'type', 'desc': 'RGB or RGBW', 'default': 'RGB',
- 'values': ('RGB', 'RGBW')},
+ {'id': 'wireorder', 'desc': 'colour components order (wire)',
+ 'default': 'GRB',
+ 'values': ('BGR', 'BRG', 'GBR', 'GRB', 'RBG', 'RGB', 'RWBG', 'RGBW')},
+ {'id': 'textorder', 'desc': 'components output order (text)',
+ 'default': 'RGB[W]', 'values': ('wire', 'RGB[W]', 'RGB', 'RGBW', 'RGWB')},
)
def __init__(self):
def reset(self):
self.samplerate = None
- self.oldpin = None
- self.ss_packet = None
- self.ss = None
- self.es = None
self.bits = []
- self.inreset = False
def start(self):
self.out_ann = self.register(srd.OUTPUT_ANN)
if key == srd.SRD_CONF_SAMPLERATE:
self.samplerate = value
- def handle_bits(self, samplenum):
- if self.options['type'] == 'RGB':
- if len(self.bits) == 24:
- grb = reduce(lambda a, b: (a << 1) | b, self.bits)
- rgb = (grb & 0xff0000) >> 8 | (grb & 0x00ff00) << 8 | (grb & 0x0000ff)
- self.put(self.ss_packet, samplenum, self.out_ann,
- [2, ['#%06x' % rgb]])
- self.bits = []
- self.ss_packet = None
+ def putg(self, ss, es, cls, text):
+ self.put(ss, es, self.out_ann, [cls, text])
+
+ def handle_bits(self):
+ if len(self.bits) < self.need_bits:
+ return
+ ss_packet, es_packet = self.bits[0][1], self.bits[-1][2]
+ r, g, b, w = 0, 0, 0, None
+ comps = []
+ for i, c in enumerate(self.wireformat):
+ first_idx, after_idx = 8 * i, 8 * i + 8
+ comp_bits = self.bits[first_idx:after_idx]
+ comp_ss, comp_es = comp_bits[0][1], comp_bits[-1][2]
+ comp_value = bitpack_msb(comp_bits, 0)
+ comp_text = '{:02x}'.format(comp_value)
+ comp_ann = {
+ 'r': ANN_COMP_R, 'g': ANN_COMP_G,
+ 'b': ANN_COMP_B, 'w': ANN_COMP_W,
+ }.get(c.lower(), None)
+ comp_item = (comp_ss, comp_es, comp_ann, comp_value, comp_text)
+ comps.append(comp_item)
+ if c.lower() == 'r':
+ r = comp_value
+ elif c.lower() == 'g':
+ g = comp_value
+ elif c.lower() == 'b':
+ b = comp_value
+ elif c.lower() == 'w':
+ w = comp_value
+ wt = '' if w is None else '{:02x}'.format(w)
+ if self.textformat == 'wire':
+ rgb_text = '#' + ''.join([c[-1] for c in comps])
else:
- if len(self.bits) == 32:
- grb = reduce(lambda a, b: (a << 1) | b, self.bits)
- rgb = (grb & 0xff0000) >> 8 | (grb & 0x00ff00) << 8 | (grb & 0xff0000ff)
- self.put(self.ss_packet, samplenum, self.out_ann,
- [2, ['#%08x' % rgb]])
- self.bits = []
- self.ss_packet = None
+ rgb_text = self.textformat.format(r = r, g = g, b = b, w = w, wt = wt)
+ for ss_comp, es_comp, cls_comp, value_comp, text_comp in comps:
+ self.putg(ss_comp, es_comp, cls_comp, [text_comp])
+ if rgb_text:
+ self.putg(ss_packet, es_packet, ANN_RGB, [rgb_text])
+ self.bits.clear()
+
+ def handle_bit(self, ss, es, value, ann_late = False):
+ if not ann_late:
+ text = ['{:d}'.format(value)]
+ self.putg(ss, es, ANN_BIT, text)
+ item = (value, ss, es)
+ self.bits.append(item)
+ self.handle_bits()
+ if ann_late:
+ text = ['{:d}'.format(value)]
+ self.putg(ss, es, ANN_BIT, text)
def decode(self):
if not self.samplerate:
raise SamplerateError('Cannot decode without samplerate.')
- while True:
- # TODO: Come up with more appropriate self.wait() conditions.
- (pin,) = self.wait()
-
- if self.oldpin is None:
- self.oldpin = pin
- continue
-
- # Check RESET condition (manufacturer recommends 50 usec minimal,
- # but real minimum is ~10 usec).
- if not self.inreset and not pin and self.es is not None and \
- self.ss is not None and \
- (self.samplenum - self.es) / self.samplerate > 50e-6:
-
- # Decode last bit value.
- tH = (self.es - self.ss) / self.samplerate
- bit_ = True if tH >= 625e-9 else False
-
- self.bits.append(bit_)
- self.handle_bits(self.es)
-
- self.put(self.ss, self.es, self.out_ann, [0, ['%d' % bit_]])
- self.put(self.es, self.samplenum, self.out_ann,
- [1, ['RESET', 'RST', 'R']])
-
- self.inreset = True
- self.bits = []
- self.ss_packet = None
- self.ss = None
-
- if not self.oldpin and pin:
- # Rising edge.
- if self.ss and self.es:
- period = self.samplenum - self.ss
- duty = self.es - self.ss
- # Ideal duty for T0H: 33%, T1H: 66%.
- bit_ = (duty / period) > 0.5
+ # Preprocess options here, to simplify logic which executes
+ # much later in loops while settings have the same values.
+ wireorder = self.options['wireorder'].lower()
+ self.wireformat = [c for c in wireorder if c in 'rgbw']
+ self.need_bits = len(self.wireformat) * 8
+ textorder = self.options['textorder'].lower()
+ if textorder == 'wire':
+ self.textformat = 'wire'
+ elif textorder == 'rgb[w]':
+ self.textformat = '#{r:02x}{g:02x}{b:02x}{wt:s}'
+ else:
+ self.textformat = {
+ # "Obvious" permutations of R/G/B.
+ 'bgr': '#{b:02x}{g:02x}{r:02x}',
+ 'brg': '#{b:02x}{r:02x}{g:02x}',
+ 'gbr': '#{g:02x}{b:02x}{r:02x}',
+ 'grb': '#{g:02x}{r:02x}{b:02x}',
+ 'rbg': '#{r:02x}{b:02x}{g:02x}',
+ 'rgb': '#{r:02x}{g:02x}{b:02x}',
+ # RGB plus White. Only one of them useful?
+ 'rgbw': '#{r:02x}{g:02x}{b:02x}{w:02x}',
+ # Weird RGBW permutation for compatibility to test case.
+ # Neither used RGBW nor the 'wire' order. Obsolete now?
+ 'rgwb': '#{r:02x}{g:02x}{w:02x}{b:02x}',
+ }.get(textorder, None)
+ if self.textformat is None:
+ raise DecoderError('Unsupported text output format.')
- self.put(self.ss, self.samplenum, self.out_ann,
- [0, ['%d' % bit_]])
+ # Either check for edges which communicate bit values, or for
+ # long periods of idle level which represent a reset pulse.
+ # Track the left-most, right-most, and inner edge positions of
+ # a bit. The positive period's width determines the bit's value.
+ # Initially synchronize to the input stream by searching for a
+ # low period, which preceeds a data bit or starts a reset pulse.
+ # Don't annotate the very first reset pulse, but process it. We
+ # may not see the right-most edge of a data bit when reset is
+ # adjacent to that bit time.
+ cond_bit_starts = {0: 'r'}
+ cond_inbit_edge = {0: 'f'}
+ samples_625ns = int(self.samplerate * 625e-9)
+ samples_50us = round(self.samplerate * 50e-6)
+ cond_reset_pulse = {'skip': samples_50us + 1}
+ conds = [cond_bit_starts, cond_inbit_edge, cond_reset_pulse]
+ ss_bit, inv_bit, es_bit = None, None, None
+ pin, = self.wait({0: 'l'})
+ inv_bit = self.samplenum
+ check_reset = False
+ while True:
+ pin, = self.wait(conds)
- self.bits.append(bit_)
- self.handle_bits(self.samplenum)
+ # Check RESET condition. Manufacturers may disagree on the
+ # minimal pulse width. 50us are recommended in datasheets,
+ # experiments suggest the limit is around 10us.
+ # When the RESET pulse is adjacent to the low phase of the
+ # last bit time, we have no appropriate condition for the
+ # bit time's end location. That's why this BIT's annotation
+ # is shorter (only spans the high phase), and the RESET
+ # annotation immediately follows (spans from the falling edge
+ # to the end of the minimum RESET pulse width).
+ if check_reset and self.matched[2]:
+ es_bit = inv_bit
+ ss_rst, es_rst = inv_bit, self.samplenum
- if self.ss_packet is None:
- self.ss_packet = self.samplenum
+ if ss_bit and inv_bit and es_bit:
+ # Decode last bit value. Use the last processed bit's
+ # width for comparison when available. Fallback to an
+ # arbitrary threshold otherwise (which can result in
+ # false detection of value 1 for those captures where
+ # high and low pulses are of similar width).
+ duty = inv_bit - ss_bit
+ thres = samples_625ns
+ if self.bits:
+ period = self.bits[-1][2] - self.bits[-1][1]
+ thres = period * 0.5
+ bit_value = 1 if duty >= thres else 0
+ self.handle_bit(ss_bit, inv_bit, bit_value, True)
- self.ss = self.samplenum
+ if ss_rst and es_rst:
+ text = ['RESET', 'RST', 'R']
+ self.putg(ss_rst, es_rst, ANN_RESET, text)
+ check_reset = False
- elif self.oldpin and not pin:
- # Falling edge.
- self.inreset = False
- self.es = self.samplenum
+ self.bits.clear()
+ ss_bit, inv_bit, es_bit = None, None, None
- self.oldpin = pin
+ # Rising edge starts a bit time. Falling edge ends its high
+ # period. Get the previous bit's duty cycle and thus its
+ # bit value when the next bit starts.
+ if self.matched[0]: # and pin:
+ check_reset = False
+ if ss_bit and inv_bit:
+ # Got a previous bit? Handle it.
+ es_bit = self.samplenum
+ period = es_bit - ss_bit
+ duty = inv_bit - ss_bit
+ # Ideal duty for T0H: 33%, T1H: 66%.
+ bit_value = 1 if (duty / period) > 0.5 else 0
+ self.handle_bit(ss_bit, es_bit, bit_value)
+ ss_bit, inv_bit, es_bit = self.samplenum, None, None
+ if self.matched[1]: # and not pin:
+ check_reset = True
+ inv_bit = self.samplenum
## This file is part of the libsigrokdecode project.
##
## Copyright (C) 2016 Anthony Symons <antus@pcmhacking.net>
+## Copyright (C) 2023 Gerhard Sittig <gerhard.sittig@gmx.net>
##
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
##
import sigrokdecode as srd
+from common.srdhelper import bitpack_msb
+
+# VPW Timings. From the SAE J1850 1995 rev section 23.406 documentation.
+# Ideal, minimum and maximum tolerances.
+VPW_SOF = 200
+VPW_SOFL = 164
+VPW_SOFH = 245 # 240 by the spec, 245 so a 60us 4x sample will pass
+VPW_LONG = 128
+VPW_LONGL = 97
+VPW_LONGH = 170 # 164 by the spec but 170 for low sample rate tolerance.
+VPW_SHORT = 64
+VPW_SHORTL = 24 # 35 by the spec, 24 to allow down to 6us as measured in practice for 4x @ 1mhz sampling
+VPW_SHORTH = 97
+VPW_IFS = 240
class SamplerateError(Exception):
pass
-def timeuf(t):
- return int (t * 1000.0 * 1000.0)
-
-class Ann:
- ANN_RAW, ANN_SOF, ANN_IFS, ANN_DATA, \
- ANN_PACKET = range(5)
+(
+ ANN_SOF, ANN_BIT, ANN_IFS, ANN_BYTE,
+ ANN_PRIO, ANN_DEST, ANN_SRC, ANN_MODE, ANN_DATA, ANN_CSUM,
+ ANN_M1_PID,
+ ANN_WARN,
+) = range(12)
class Decoder(srd.Decoder):
api_version = 3
{'id': 'data', 'name': 'Data', 'desc': 'Data line'},
)
annotations = (
- ('raw', 'Raw'),
('sof', 'SOF'),
+ ('bit', 'Bit'),
('ifs', 'EOF/IFS'),
+ ('byte', 'Byte'),
+ ('prio', 'Priority'),
+ ('dest', 'Destination'),
+ ('src', 'Source'),
+ ('mode', 'Mode'),
('data', 'Data'),
- ('packet', 'Packet'),
+ ('csum', 'Checksum'),
+ ('m1_pid', 'Pid'),
+ ('warn', 'Warning'),
)
annotation_rows = (
- ('raws', 'Raws', (Ann.ANN_RAW, Ann.ANN_SOF, Ann.ANN_IFS,)),
- ('bytes', 'Bytes', (Ann.ANN_DATA,)),
- ('packets', 'Packets', (Ann.ANN_PACKET,)),
+ ('bits', 'Bits', (ANN_SOF, ANN_BIT, ANN_IFS,)),
+ ('bytes', 'Bytes', (ANN_BYTE,)),
+ ('fields', 'Fields', (ANN_PRIO, ANN_DEST, ANN_SRC, ANN_MODE, ANN_DATA, ANN_CSUM,)),
+ ('values', 'Values', (ANN_M1_PID,)),
+ ('warns', 'Warnings', (ANN_WARN,)),
)
+ # TODO Add support for options? Polarity. Glitch length.
def __init__(self):
self.reset()
def reset(self):
- self.state = 'IDLE'
self.samplerate = None
- self.byte = 0 # the byte offset in the packet
- self.mode = 0 # for by packet decode
- self.data = 0 # the current byte
- self.datastart = 0 # sample number this byte started at
- self.csa = 0 # track the last byte seperately to retrospectively add the CS marker
- self.csb = 0
- self.count = 0 # which bit number we are up to
- self.active = 0 # which logic level is considered active
-
- # vpw timings. ideal, min and max tollerances.
- # From SAE J1850 1995 rev section 23.406
-
- self.sof = 200
- self.sofl = 164
- self.sofh = 245 # 240 by the spec, 245 so a 60us 4x sample will pass
- self.long = 128
- self.longl = 97
- self.longh = 170 # 164 by the spec but 170 for low sample rate tolerance.
- self.short = 64
- self.shortl = 24 # 35 by the spec, 24 to allow down to 6us as measured in practice for 4x @ 1mhz sampling
- self.shorth = 97
- self.ifs = 240
- self.spd = 1 # set to 4 when a 4x SOF is detected (VPW high speed frame)
+ self.active = 0 # Signal polarity. Needs to become an option?
+ self.bits = []
+ self.fields = {}
- def handle_bit(self, ss, es, b):
- self.data |= (b << 7-self.count) # MSB-first
- self.put(ss, es, self.out_ann, [Ann.ANN_RAW, ["%d" % b]])
- if self.count == 0:
- self.datastart = ss
- if self.count == 7:
- self.csa = self.datastart # for CS
- self.csb = self.samplenum # for CS
- self.put(self.datastart, self.samplenum, self.out_ann, [Ann.ANN_DATA, ["%02X" % self.data]])
- # add protocol parsing here
- if self.byte == 0:
- self.put(self.datastart, self.samplenum, self.out_ann, [Ann.ANN_PACKET, ['Priority','Prio','P']])
- elif self.byte == 1:
- self.put(self.datastart, self.samplenum, self.out_ann, [Ann.ANN_PACKET, ['Destination','Dest','D']])
- elif self.byte == 2:
- self.put(self.datastart, self.samplenum, self.out_ann, [Ann.ANN_PACKET, ['Source','Src','S']])
- elif self.byte == 3:
- self.put(self.datastart, self.samplenum, self.out_ann, [Ann.ANN_PACKET, ['Mode','M']])
- self.mode = self.data
- elif self.mode == 1 and self.byte == 4: # mode 1 payload
- self.put(self.datastart, self.samplenum, self.out_ann, [Ann.ANN_PACKET, ['Pid','P']])
-
- # prepare for next byte
- self.count = -1
- self.data = 0
- self.byte = self.byte + 1 # track packet offset
- self.count = self.count + 1
+ def start(self):
+ self.out_ann = self.register(srd.OUTPUT_ANN)
def metadata(self, key, value):
if key == srd.SRD_CONF_SAMPLERATE:
self.samplerate = value
- def start(self):
- self.out_ann = self.register(srd.OUTPUT_ANN)
+ def putg(self, ss, es, cls, texts):
+ self.put(ss, es, self.out_ann, [cls, texts])
+
+ def invalidate_frame_details(self):
+ self.bits.clear()
+ self.fields.clear()
+
+ def handle_databytes(self, fields, data):
+ # TODO Deep inspection of header fields and data values, including
+ # checksum verification results.
+ mode = fields.get('mode', None)
+ if mode is None:
+ return
+ if mode == 1:
+ # An earlier implementation commented that for mode 1 the
+ # first data byte would be the PID. But example captures
+ # have no data bytes in packets for that mode. This position
+ # is taken by the checksum. Is this correct?
+ pid = data[0] if data else fields.get('csum', None)
+ if pid is None:
+ text = ['PID missing']
+ self.putg(ss, es, ANN_WARN, text)
+ else:
+ byte_text = '{:02x}'.format(pid)
+ self.putg(ss, es, ANN_M1_PID, [byte_text])
+
+ def handle_byte(self, ss, es, b):
+ # Annotate all raw byte values. Inspect and process the first
+ # bytes in a frame already. Cease inspection and only accumulate
+ # all other bytes after the mode. The checksum's position and
+ # thus the data bytes' span will only be known when EOF or IFS
+ # were seen. Implementor's note: This method just identifies
+ # header fields. Processing is left to the .handle_databytes()
+ # method. Until then validity will have been checked, too (CS).
+ byte_text = '{:02x}'.format(b)
+ self.putg(ss, es, ANN_BYTE, [byte_text])
+
+ if not 'prio' in self.fields:
+ self.fields.update({'prio': b})
+ self.putg(ss, es, ANN_PRIO, [byte_text])
+ return
+ if not 'dest' in self.fields:
+ self.fields.update({'dest': b})
+ self.putg(ss, es, ANN_DEST, [byte_text])
+ return
+ if not 'src' in self.fields:
+ self.fields.update({'src': b})
+ self.putg(ss, es, ANN_SRC, [byte_text])
+ return
+ if not 'mode' in self.fields:
+ self.fields.update({'mode': b})
+ self.putg(ss, es, ANN_MODE, [byte_text])
+ return
+ if not 'data' in self.fields:
+ self.fields.update({'data': [], 'csum': None})
+ self.fields['data'].append((b, ss, es))
+
+ def handle_sof(self, ss, es, speed):
+ text = ['{speed:d}x SOF', 'S{speed:d}', 'S']
+ text = [f.format(speed = speed) for f in text]
+ self.putg(ss, es, ANN_SOF, text)
+ self.invalidate_frame_details()
+ self.fields.update({'speed': speed})
+
+ def handle_bit(self, ss, es, b):
+ self.bits.append((b, ss, es))
+ self.putg(ss, es, ANN_BIT, ['{:d}'.format(b)])
+ if len(self.bits) < 8:
+ return
+ ss, es = self.bits[0][1], self.bits[-1][2]
+ b = bitpack_msb(self.bits, 0)
+ self.bits.clear()
+ self.handle_byte(ss, es, b)
+
+ def handle_eof(self, ss, es, is_ifs = False):
+ # EOF or IFS were seen. Post process the data bytes sequence.
+ # Separate the checksum from the data bytes. Emit annotations.
+ # Pass data bytes and header fields to deeper inspection.
+ data = self.fields.get('data', {})
+ if not data:
+ text = ['Short data phase', 'Data']
+ self.putg(ss, es, ANN_WARN, text)
+ csum = None
+ if len(data) >= 1:
+ csum, ss_csum, es_csum = data.pop()
+ self.fields.update({'csum': csum})
+ # TODO Verify checksum's correctness?
+ if data:
+ ss_data, es_data = data[0][1], data[-1][2]
+ text = ' '.join(['{:02x}'.format(b[0]) for b in data])
+ self.putg(ss_data, es_data, ANN_DATA, [text])
+ if csum is not None:
+ text = '{:02x}'.format(csum)
+ self.putg(ss_csum, es_csum, ANN_CSUM, [text])
+ text = ['IFS', 'I'] if is_ifs else ['EOF', 'E']
+ self.putg(ss, es, ANN_IFS, text)
+ self.handle_databytes(self.fields, data);
+ self.invalidate_frame_details()
+
+ def handle_unknown(self, ss, es):
+ text = ['Unknown condition', 'Unknown', 'UNK']
+ self.putg(ss, es, ANN_WARN, text)
+ self.invalidate_frame_details()
+
+ def usecs_to_samples(self, us):
+ us *= 1e-6
+ us *= self.samplerate
+ return int(us)
+
+ def samples_to_usecs(self, n):
+ n /= self.samplerate
+ n *= 1000.0 * 1000.0
+ return int(n)
def decode(self):
if not self.samplerate:
raise SamplerateError('Cannot decode without samplerate.')
- self.wait({0: 'e'})
+ # Get the distance between edges. Classify the distance
+ # to derive symbols and data bit values. Prepare waiting
+ # for an interframe gap as well, while this part of the
+ # condition is optional (switches in and out at runtime).
+ conds_edge = {0: 'e'}
+ conds_edge_only = [conds_edge]
+ conds_edge_idle = [conds_edge, {'skip': 0}]
+ conds = conds_edge_only
+ self.wait(conds)
es = self.samplenum
+ spd = None
while True:
ss = es
- pin, = self.wait({0: 'e'})
+ pin, = self.wait(conds)
es = self.samplenum
+ count = es - ss
+ t = self.samples_to_usecs(count)
+
+ # Synchronization to the next frame. Wait for SOF.
+ # Silently keep synchronizing until SOF was seen.
+ if spd is None:
+ if not self.matched[0]:
+ continue
+ if pin != self.active:
+ continue
+
+ # Detect the frame's speed from the SOF length. Adjust
+ # the expected BIT lengths to the SOF derived speed.
+ # Arrange for the additional supervision of EOF/IFS.
+ if t in range(VPW_SOFL // 1, VPW_SOFH // 1):
+ spd = 1
+ elif t in range(VPW_SOFL // 4, VPW_SOFH // 4):
+ spd = 4
+ else:
+ continue
+ short_lower, short_upper = VPW_SHORTL // spd, VPW_SHORTH // spd
+ long_lower, long_upper = VPW_LONGL // spd, VPW_LONGH // spd
+ samples = self.usecs_to_samples(VPW_IFS // spd)
+ conds_edge_idle[-1]['skip'] = samples
+ conds = conds_edge_idle
+
+ # Emit the SOF annotation. Start collecting DATA.
+ self.handle_sof(ss, es, spd)
+ continue
+
+ # Inside the DATA phase. Get data bits. Handle EOF/IFS.
+ if len(conds) > 1 and self.matched[1]:
+ # TODO The current implementation gets here after a
+ # pre-determined minimum wait time. Does not differ
+ # between EOF and IFS. An earlier implementation had
+ # this developer note: EOF=239-280 IFS=281+
+ self.handle_eof(ss, es)
+ # Enter the IDLE phase. Wait for the next SOF.
+ spd = None
+ conds = conds_edge_only
+ continue
+ if t in range(short_lower, short_upper):
+ value = 1 if pin == self.active else 0
+ self.handle_bit(ss, es, value)
+ continue
+ if t in range(long_lower, long_upper):
+ value = 0 if pin == self.active else 1
+ self.handle_bit(ss, es, value)
+ continue
- samples = es - ss
- t = timeuf(samples / self.samplerate)
- if self.state == 'IDLE': # detect and set speed from the size of sof
- if pin == self.active and t in range(self.sofl , self.sofh):
- self.put(ss, es, self.out_ann, [Ann.ANN_RAW, ['1X SOF', 'S1', 'S']])
- self.spd = 1
- self.data = 0
- self.count = 0
- self.state = 'DATA'
- elif pin == self.active and t in range(int(self.sofl / 4) , int(self.sofh / 4)):
- self.put(ss, es, self.out_ann, [Ann.ANN_RAW, ['4X SOF', 'S4', '4']])
- self.spd = 4
- self.data = 0
- self.count = 0
- self.state = 'DATA'
-
- elif self.state == 'DATA':
- if t >= int(self.ifs / self.spd):
- self.state = 'IDLE'
- self.put(ss, es, self.out_ann, [Ann.ANN_RAW, ["EOF/IFS", "E"]]) # EOF=239-280 IFS=281+
- self.put(self.csa, self.csb, self.out_ann, [Ann.ANN_PACKET, ['Checksum','CS','C']]) # retrospective print of CS
- self.byte = 0 # reset packet offset
- elif t in range(int(self.shortl / self.spd), int(self.shorth / self.spd)):
- if pin == self.active:
- self.handle_bit(ss, es, 1)
- else:
- self.handle_bit(ss, es, 0)
- elif t in range(int(self.longl / self.spd), int(self.longh / self.spd)):
- if pin == self.active:
- self.handle_bit(ss, es, 0)
- else:
- self.handle_bit(ss, es, 1)
+ # Implementation detail: An earlier implementation used to
+ # ignore everything that was not handled above. This would
+ # be motivated by the noisy environment the protocol is
+ # typically used in. This more recent implementation accepts
+ # short glitches, but by design falls back to synchronization
+ # to the input stream for other unhandled conditions. This
+ # wants to improve usability of the decoder, by presenting
+ # potential issues to the user. The threshold (microseconds
+ # between edges that are not valid symbols that are handled
+ # above) is an arbitrary choice.
+ if t <= 2:
+ continue
+ self.handle_unknown(ss, es)
+ spd = None
+ conds = conds_edge_only
--- /dev/null
+##
+## This file is part of the libsigrokdecode project.
+##
+## Copyright (C) 2022 Gerhard Sittig <gerhard.sittig@gmx.net>
+##
+## This program is free software; you can redistribute it and/or modify
+## it under the terms of the GNU General Public License as published by
+## the Free Software Foundation; either version 2 of the License, or
+## (at your option) any later version.
+##
+## This program is distributed in the hope that it will be useful,
+## but WITHOUT ANY WARRANTY; without even the implied warranty of
+## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+## GNU General Public License for more details.
+##
+## You should have received a copy of the GNU General Public License
+## along with this program; if not, see <http://www.gnu.org/licenses/>.
+##
+
+'''
+SBUS by Futaba, a hobby remote control protocol on top of UART.
+Sometimes referred to as "Serial BUS" or S-BUS.
+
+UART communication typically runs at 100kbps with 8e2 frame format and
+inverted signals (high voltage level is logic low).
+
+SBUS messages take 3ms to transfer, and typically repeat in intervals
+of 7ms or 14ms. An SBUS message consists of 25 UART bytes, and carries
+16 proportional channels with 11 bits each, and 2 digital channels
+(boolean, 1 bit), and flags which represent current communication state.
+Proportional channel values typically are in the 192..1792 range, but
+individual implementations may differ.
+'''
+
+from .pd import Decoder
--- /dev/null
+##
+## This file is part of the libsigrokdecode project.
+##
+## Copyright (C) 2022 Gerhard Sittig <gerhard.sittig@gmx.net>
+##
+## This program is free software; you can redistribute it and/or modify
+## it under the terms of the GNU General Public License as published by
+## the Free Software Foundation; either version 2 of the License, or
+## (at your option) any later version.
+##
+## This program is distributed in the hope that it will be useful,
+## but WITHOUT ANY WARRANTY; without even the implied warranty of
+## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+## GNU General Public License for more details.
+##
+## You should have received a copy of the GNU General Public License
+## along with this program; if not, see <http://www.gnu.org/licenses/>.
+##
+
+"""
+OUTPUT_PYTHON format:
+
+Packet:
+(<ptype>, <pdata>)
+
+This is the list of <ptype> codes and their respective <pdata> values:
+ - 'HEADER': The data is the header byte's value.
+ - 'PROPORTIONAL': The data is a tuple of the channel number (1-based)
+ and the channel's value.
+ - 'DIGITAL': The data is a tuple of the channel number (1-based)
+ and the channel's value.
+ - 'FLAG': The data is a tuple of the flag's name, and the flag's value.
+ - 'FOOTER': The data is the footer byte's value.
+"""
+
+import sigrokdecode as srd
+from common.srdhelper import bitpack_lsb
+
+class Ann:
+ HEADER, PROPORTIONAL, DIGITAL, FRAME_LOST, FAILSAFE, FOOTER, \
+ WARN = range(7)
+ FLAG_LSB = FRAME_LOST
+
+class Decoder(srd.Decoder):
+ api_version = 3
+ id = 'sbus_futaba'
+ name = 'SBUS (Futaba)'
+ longname = 'Futaba SBUS (Serial bus)'
+ desc = 'Serial bus for hobby remote control by Futaba'
+ license = 'gplv2+'
+ inputs = ['uart']
+ outputs = ['sbus_futaba']
+ tags = ['Remote Control']
+ options = (
+ {'id': 'prop_val_min', 'desc': 'Proportional value lower boundary', 'default': 0},
+ {'id': 'prop_val_max', 'desc': 'Proportional value upper boundary', 'default': 2047},
+ )
+ annotations = (
+ ('header', 'Header'),
+ ('proportional', 'Proportional'),
+ ('digital', 'Digital'),
+ ('framelost', 'Frame Lost'),
+ ('failsafe', 'Failsafe'),
+ ('footer', 'Footer'),
+ ('warning', 'Warning'),
+ )
+ annotation_rows = (
+ ('framing', 'Framing', (Ann.HEADER, Ann.FOOTER,
+ Ann.FRAME_LOST, Ann.FAILSAFE)),
+ ('channels', 'Channels', (Ann.PROPORTIONAL, Ann.DIGITAL)),
+ ('warnings', 'Warnings', (Ann.WARN,)),
+ )
+
+ def __init__(self):
+ self.bits_accum = []
+ self.sent_fields = None
+ self.msg_complete = None
+ self.failed = None
+ self.reset()
+
+ def reset(self):
+ self.bits_accum.clear()
+ self.sent_fields = 0
+ self.msg_complete = False
+ self.failed = None
+
+ def start(self):
+ self.out_ann = self.register(srd.OUTPUT_ANN)
+ self.out_py = self.register(srd.OUTPUT_PYTHON)
+
+ def putg(self, ss, es, data):
+ # Put a graphical annotation.
+ self.put(ss, es, self.out_ann, data)
+
+ def putpy(self, ss, es, data):
+ # Pass Python to upper layers.
+ self.put(ss, es, self.out_py, data)
+
+ def get_ss_es_bits(self, bitcount):
+ # Get start/end times, and bit values of given length.
+ # Gets all remaining data when 'bitcount' is None.
+ if bitcount is None:
+ bitcount = len(self.bits_accum)
+ if len(self.bits_accum) < bitcount:
+ return None, None, None
+ bits = self.bits_accum[:bitcount]
+ self.bits_accum = self.bits_accum[bitcount:]
+ ss, es = bits[0][1], bits[-1][2]
+ bits = [b[0] for b in bits]
+ return ss, es, bits
+
+ def flush_accum_bits(self):
+ # Valid data was queued. See if we got full SBUS fields so far.
+ # Annotate them early, cease inspection of failed messages. The
+ # implementation is phrased to reduce the potential for clipboard
+ # errors: 'upto' is the next supported field count, 'want' is one
+ # field's bit count. Grab as many as we find in an invocation.
+ upto = 0
+ if self.failed:
+ return
+ # Annotate the header byte. Not seeing the expected bit pattern
+ # emits a warning annotation, but by design won't fail the SBUS
+ # message. It's considered more useful to present the channels'
+ # values instead. The warning still raises awareness.
+ upto += 1
+ want = 8
+ while self.sent_fields < upto:
+ if len(self.bits_accum) < want:
+ return
+ ss, es, bits = self.get_ss_es_bits(want)
+ value = bitpack_lsb(bits)
+ text = ['0x{:02x}'.format(value)]
+ self.putg(ss, es, [Ann.HEADER, text])
+ if value != 0x0f:
+ text = ['Unexpected header', 'Header']
+ self.putg(ss, es, [Ann.WARN, text])
+ self.putpy(ss, es, ['HEADER', value])
+ self.sent_fields += 1
+ # Annotate the proportional channels' data. Check for user
+ # provided value range violations. Channel numbers are in
+ # the 1..18 range (1-based).
+ upto += 16
+ want = 11
+ while self.sent_fields < upto:
+ if len(self.bits_accum) < want:
+ return
+ ss, es, bits = self.get_ss_es_bits(want)
+ value = bitpack_lsb(bits)
+ text = ['{:d}'.format(value)]
+ self.putg(ss, es, [Ann.PROPORTIONAL, text])
+ if value < self.options['prop_val_min']:
+ text = ['Low proportional value', 'Low value', 'Low']
+ self.putg(ss, es, [Ann.WARN, text])
+ if value > self.options['prop_val_max']:
+ text = ['High proportional value', 'High value', 'High']
+ self.putg(ss, es, [Ann.WARN, text])
+ idx = self.sent_fields - (upto - 16)
+ ch_nr = 1 + idx
+ self.putpy(ss, es, ['PROPORTIONAL', (ch_nr, value)])
+ self.sent_fields += 1
+ # Annotate the digital channels' data.
+ upto += 2
+ want = 1
+ while self.sent_fields < upto:
+ if len(self.bits_accum) < want:
+ return
+ ss, es, bits = self.get_ss_es_bits(want)
+ value = bitpack_lsb(bits)
+ text = ['{:d}'.format(value)]
+ self.putg(ss, es, [Ann.DIGITAL, text])
+ idx = self.sent_fields - (upto - 2)
+ ch_nr = 17 + idx
+ self.putpy(ss, es, ['DIGITAL', (ch_nr, value)])
+ self.sent_fields += 1
+ # Annotate the flags' state. Index starts from LSB.
+ flag_names = ['framelost', 'failsafe', 'msb']
+ upto += 2
+ want = 1
+ while self.sent_fields < upto:
+ if len(self.bits_accum) < want:
+ return
+ ss, es, bits = self.get_ss_es_bits(want)
+ value = bitpack_lsb(bits)
+ text = ['{:d}'.format(value)]
+ idx = self.sent_fields - (upto - 2)
+ cls = Ann.FLAG_LSB + idx
+ self.putg(ss, es, [cls, text])
+ flg_name = flag_names[idx]
+ self.putpy(ss, es, ['FLAG', (flg_name, value)])
+ self.sent_fields += 1
+ # Warn when flags' padding (bits [7:4]) is unexpexted.
+ upto += 1
+ want = 4
+ while self.sent_fields < upto:
+ if len(self.bits_accum) < want:
+ return
+ ss, es, bits = self.get_ss_es_bits(want)
+ value = bitpack_lsb(bits)
+ if value != 0x0:
+ text = ['Unexpected MSB flags', 'Flags']
+ self.putg(ss, es, [Ann.WARN, text])
+ flg_name = flag_names[-1]
+ self.putpy(ss, es, ['FLAG', (flg_name, value)])
+ self.sent_fields += 1
+ # Annotate the footer byte. Warn when unexpected.
+ upto += 1
+ want = 8
+ while self.sent_fields < upto:
+ if len(self.bits_accum) < want:
+ return
+ ss, es, bits = self.get_ss_es_bits(want)
+ value = bitpack_lsb(bits)
+ text = ['0x{:02x}'.format(value)]
+ self.putg(ss, es, [Ann.FOOTER, text])
+ if value != 0x00:
+ text = ['Unexpected footer', 'Footer']
+ self.putg(ss, es, [Ann.WARN, text])
+ self.putpy(ss, es, ['FOOTER', value])
+ self.sent_fields += 1
+ # Check for the completion of an SBUS message. Warn when more
+ # UART data is seen after the message. Defer the warning until
+ # more bits were collected, flush at next IDLE or BREAK, which
+ # spans all unprocessed data, and improves perception.
+ if self.sent_fields >= upto:
+ self.msg_complete = True
+ if self.msg_complete and self.bits_accum:
+ self.failed = ['Excess data bits', 'Excess']
+
+ def handle_bits(self, ss, es, bits):
+ # UART data bits were seen. Store them, validity is yet unknown.
+ self.bits_accum.extend(bits)
+
+ def handle_frame(self, ss, es, value, valid):
+ # A UART frame became complete. Get its validity. Process its bits.
+ if not valid:
+ self.failed = ['Invalid data', 'Invalid']
+ self.flush_accum_bits()
+
+ def handle_idle(self, ss, es):
+ # An IDLE period was seen in the UART level. Flush, reset state.
+ if self.bits_accum and not self.failed:
+ self.failed = ['Unprocessed data bits', 'Unprocessed']
+ if self.bits_accum and self.failed:
+ ss, es, _ = self.get_ss_es_bits(None)
+ self.putg(ss, es, [Ann.WARN, self.failed])
+ self.reset()
+
+ def handle_break(self, ss, es):
+ # A BREAK period was seen in the UART level. Warn, reset state.
+ break_ss, break_es = ss, es
+ if not self.failed:
+ self.failed = ['BREAK condition', 'Break']
+ # Re-use logic for "annotated bits warning".
+ self.handle_idle(None, None)
+ # Unconditionally annotate BREAK as warning.
+ text = ['BREAK condition', 'Break']
+ self.putg(ss, es, [Ann.WARN, text])
+ self.reset()
+
+ def decode(self, ss, es, data):
+ # Implementor's note: Expects DATA bits to arrive before FRAME
+ # validity. Either of IDLE or BREAK terminates an SBUS message.
+ ptype, rxtx, pdata = data
+ if ptype == 'DATA':
+ _, bits = pdata
+ self.handle_bits(ss, es, bits)
+ elif ptype == 'FRAME':
+ value, valid = pdata
+ self.handle_frame(ss, es, value, valid)
+ elif ptype == 'IDLE':
+ self.handle_idle(ss, es)
+ elif ptype == 'BREAK':
+ self.handle_break(ss, es)
def decode(self):
oldpins = self.wait()
-
- # Check if at least the 7 signals are present.
- if False in [p in (0, 1) for p in oldpins[:7]]:
- raise ChannelError('7 or 8 pins have to be present.')
-
lastpos = self.samplenum
+ # Check mandatory and optional decoder input signals.
+ if False in [p in (0, 1) for p in oldpins[:7]]:
+ raise ChannelError('Need at least segments A-G.')
self.have_dp = self.has_channel(7)
+ seg_count = 8 if self.have_dp else 7
- conditions = [{0: 'e'}, {1: 'e'}, {2: 'e'}, {3: 'e'}, {4: 'e'}, {5: 'e'}, {6: 'e'}]
-
- if self.have_dp:
- conditions.append({7: 'e'})
-
+ conditions = [{i: 'e'} for i in range(seg_count)]
while True:
# Wait for any change.
pins = self.wait(conditions)
+ # Invert all data lines if a common anode display is used.
if self.options['polarity'] == 'common-anode':
- # Invert all data lines if a common anode display is used.
- if self.have_dp:
- oldpins = tuple((1 - state for state in oldpins))
- else:
- oldpins = tuple((1 - state for state in oldpins[:7]))
+ oldpins = tuple((1 - state for state in oldpins[:seg_count]))
# Convert to character string.
digit = self.pins_to_hex(oldpins[:7])
-
if digit is None and self.options['show_unknown'] == 'yes':
digit = '#'
+ # Emit annotation when conversion succeeded.
+ # Optionally present the decimal point when active.
if digit is not None:
- dp = oldpins[7]
-
- # Check if decimal point is present and active.
- if self.have_dp and dp == 1:
- digit += '.'
-
+ if self.have_dp:
+ dp = oldpins[7]
+ if dp == 1:
+ digit += '.'
self.putb(lastpos, self.samplenum, [0, [digit]])
- lastpos = self.samplenum
-
oldpins = pins
+ lastpos = self.samplenum
0x15: 'FM25Q32',
},
'macronix': {
+ 0x13: 'MX25L8006',
0x14: 'MX25L1605D',
0x15: 'MX25L3205D',
0x16: 'MX25L6405D',
'sector_size': 4 * 1024,
'block_size': 64 * 1024,
},
+ 'macronix_mx25l8006': {
+ 'vendor': 'Macronix',
+ 'model': 'MX25L8006',
+ 'res_id': 0x13,
+ 'rems_id': 0xc213,
+ 'rems2_id': 0xc213,
+ 'rdid_id': 0xc22013,
+ 'page_size': 256,
+ 'sector_size': 4 * 1024,
+ 'block_size': 64 * 1024,
+ },
# Winbond
'winbond_w25q80dv': {
'vendor': 'Winbond',
##
## This file is part of the libsigrokdecode project.
##
-## Copyright (C) 2019-2020 Benjamin Vernoux <bvernoux@gmail.com>
+## Copyright (C) 2019-2021 Benjamin Vernoux <bvernoux@gmail.com>
##
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## You should have received a copy of the GNU General Public License
## along with this program; if not, see <http://www.gnu.org/licenses/>.
##
+## v0.1 - 17 September 2019 B.VERNOUX
+### Use ST25R3916 Datasheet DS12484 Rev 1 (January 2019)
+## v0.2 - 28 April 2020 B.VERNOUX
+### Use ST25R3916 Datasheet DS12484 Rev 2 (December 2019)
+## v0.3 - 17 June 2020 B.VERNOUX
+### Use ST25R3916 Datasheet DS12484 Rev 3 (04 June 2020)
+## v0.4 - 10 Aug 2021 B.VERNOUX
+### Fix FIFOR/FIFOW issues with Pulseview (with "Tabular Output View")
+### because of FIFO Read/FIFO Write commands, was not returning the
+### annotations short name FIFOR/FIFOW
import sigrokdecode as srd
from collections import namedtuple
def format_command(self):
'''Returns the label for the current command.'''
- if self.cmd in ('Write', 'Read', 'WriteB', 'ReadB', 'WriteT', 'ReadT', 'FIFO Write', 'FIFO Read'):
+ if self.cmd in ('Write', 'Read', 'WriteB', 'ReadB', 'WriteT', 'ReadT', 'FIFOW', 'FIFOR'):
return self.cmd
if self.cmd == 'Cmd':
reg = dir_cmd.get(self.dat, 'Unknown direct command')
# Register Space-B Access 0b11111011 0xFB => 'Space B'
# Register Test Access 0b11111100 0xFC => 'TestAccess'
if b == 0x80:
- return ('FIFO Write', b, 1, 99999)
+ return ('FIFOW', b, 1, 99999)
if b == 0xA0:
return ('Write', b, 1, 99999)
if b == 0xA8:
if b == 0xBF:
return ('Read', b, 1, 99999)
if b == 0x9F:
- return ('FIFO Read', b, 1, 99999)
+ return ('FIFOR', b, 1, 99999)
if (b >= 0x0C and b <= 0xE8) :
return ('Cmd', b, 0, 0)
if b == 0xFB:
self.decode_reg(pos, Ann.BURST_WRITET, self.dat, self.mosi_bytes())
elif self.cmd == 'ReadT':
self.decode_reg(pos, Ann.BURST_READT, self.dat, self.miso_bytes())
- elif self.cmd == 'FIFO Write':
+ elif self.cmd == 'FIFOW':
self.decode_reg(pos, Ann.FIFO_WRITE, self.dat, self.mosi_bytes())
- elif self.cmd == 'FIFO Read':
+ elif self.cmd == 'FIFOR':
self.decode_reg(pos, Ann.FIFO_READ, self.dat, self.miso_bytes())
elif self.cmd == 'Cmd':
self.decode_reg(pos, Ann.DIRECTCMD, self.dat, self.mosi_bytes())
import sigrokdecode as srd
+NUM_OUTPUT_CHANNELS = 8
+
+def logic_channels(num_channels):
+ l = []
+ for i in range(num_channels):
+ l.append(tuple(['p%d' % i, 'P-port input/output %d' % i]))
+ return tuple(l)
+
class Decoder(srd.Decoder):
api_version = 3
id = 'tca6408a'
('value', 'Register value'),
('warning', 'Warning'),
)
+ logic_output_channels = logic_channels(NUM_OUTPUT_CHANNELS)
annotation_rows = (
('regs', 'Registers', (0, 1)),
('warnings', 'Warnings', (2,)),
self.state = 'IDLE'
self.chip = -1
+ self.logic_output_es = 0
+ self.logic_value = 0
+
def start(self):
self.out_ann = self.register(srd.OUTPUT_ANN)
+ self.out_logic = self.register(srd.OUTPUT_LOGIC)
+
+ def flush(self):
+ self.put_logic_states()
def putx(self, data):
self.put(self.ss, self.es, self.out_ann, data)
+ def put_logic_states(self):
+ if (self.es > self.logic_output_es):
+ data = bytes([self.logic_value])
+ self.put(self.logic_output_es, self.es, self.out_logic, [0, data])
+ self.logic_output_es = self.es
+
def handle_reg_0x00(self, b):
self.putx([1, ['State of inputs: %02X' % b]])
+ # TODO
def handle_reg_0x01(self, b):
- self.putx([1, ['Outputs set: %02X' % b ]])
+ self.put_logic_states()
+ self.putx([1, ['Outputs set: %02X' % b]])
+ self.logic_value = b
def handle_reg_0x02(self, b):
self.putx([1, ['Polarity inverted: %02X' % b]])
{'id': 'parity', 'desc': 'Parity', 'default': 'none',
'values': ('none', 'odd', 'even', 'zero', 'one', 'ignore')},
{'id': 'stop_bits', 'desc': 'Stop bits', 'default': 1.0,
- 'values': (0.0, 0.5, 1.0, 1.5)},
+ 'values': (0.0, 0.5, 1.0, 1.5, 2.0)},
{'id': 'bit_order', 'desc': 'Bit order', 'default': 'lsb-first',
'values': ('lsb-first', 'msb-first')},
{'id': 'format', 'desc': 'Data format', 'default': 'hex',
self.samplerate = None
self.frame_start = [-1, -1]
self.frame_valid = [None, None]
+ self.cur_frame_bit = [None, None]
self.startbit = [-1, -1]
self.cur_data_bit = [0, 0]
self.datavalue = [0, 0]
self.paritybit = [-1, -1]
- self.stopbit1 = [-1, -1]
+ self.stopbits = [[], []]
self.startsample = [-1, -1]
self.state = ['WAIT FOR START BIT', 'WAIT FOR START BIT']
self.databits = [[], []]
# Save the sample number where the start bit begins.
self.frame_start[rxtx] = self.samplenum
self.frame_valid[rxtx] = True
+ self.cur_frame_bit[rxtx] = 0
- self.state[rxtx] = 'GET START BIT'
+ self.advance_state(rxtx, signal)
def get_start_bit(self, rxtx, signal):
self.startbit[rxtx] = signal
+ self.cur_frame_bit[rxtx] += 1
# The startbit must be 0. If not, we report an error and wait
# for the next start bit (assuming this one was spurious).
es = self.samplenum + ceil(self.bit_width / 2.0)
self.putpse(self.frame_start[rxtx], es, ['FRAME', rxtx,
(self.datavalue[rxtx], self.frame_valid[rxtx])])
- self.state[rxtx] = 'WAIT FOR START BIT'
+ self.advance_state(rxtx, signal, fatal = True, idle = es)
return
+ # Reset internal state for the pending UART frame.
self.cur_data_bit[rxtx] = 0
self.datavalue[rxtx] = 0
+ self.paritybit[rxtx] = -1
+ self.stopbits[rxtx].clear()
self.startsample[rxtx] = -1
+ self.databits[rxtx].clear()
self.putp(['STARTBIT', rxtx, self.startbit[rxtx]])
self.putg([Ann.RX_START + rxtx, ['Start bit', 'Start', 'S']])
- self.state[rxtx] = 'GET DATA BITS'
+ self.advance_state(rxtx, signal)
def handle_packet(self, rxtx):
d = 'rx' if (rxtx == RX) else 'tx'
# Store individual data bits and their start/end samplenumbers.
s, halfbit = self.samplenum, int(self.bit_width / 2)
self.databits[rxtx].append([signal, s - halfbit, s + halfbit])
+ self.cur_frame_bit[rxtx] += 1
# Return here, unless we already received all data bits.
self.cur_data_bit[rxtx] += 1
self.databits[rxtx] = []
- # Advance to either reception of the parity bit, or reception of
- # the STOP bits if parity is not applicable.
- self.state[rxtx] = 'GET PARITY BIT'
- if self.options['parity'] == 'none':
- self.state[rxtx] = 'GET STOP BITS'
+ self.advance_state(rxtx, signal)
def format_value(self, v):
# Format value 'v' according to configured options.
def get_parity_bit(self, rxtx, signal):
self.paritybit[rxtx] = signal
+ self.cur_frame_bit[rxtx] += 1
if parity_ok(self.options['parity'], self.paritybit[rxtx],
self.datavalue[rxtx], self.options['data_bits']):
self.putg([Ann.RX_PARITY_ERR + rxtx, ['Parity error', 'Parity err', 'PE']])
self.frame_valid[rxtx] = False
- self.state[rxtx] = 'GET STOP BITS'
+ self.advance_state(rxtx, signal)
- # TODO: Currently only supports 1 stop bit.
def get_stop_bits(self, rxtx, signal):
- self.stopbit1[rxtx] = signal
+ self.stopbits[rxtx].append(signal)
+ self.cur_frame_bit[rxtx] += 1
# Stop bits must be 1. If not, we report an error.
- if self.stopbit1[rxtx] != 1:
- self.putp(['INVALID STOPBIT', rxtx, self.stopbit1[rxtx]])
+ if signal != 1:
+ self.putp(['INVALID STOPBIT', rxtx, signal])
self.putg([Ann.RX_WARN + rxtx, ['Frame error', 'Frame err', 'FE']])
self.frame_valid[rxtx] = False
- self.putp(['STOPBIT', rxtx, self.stopbit1[rxtx]])
+ self.putp(['STOPBIT', rxtx, signal])
self.putg([Ann.RX_STOP + rxtx, ['Stop bit', 'Stop', 'T']])
+ # Postprocess the UART frame after all STOP bits were seen.
+ if len(self.stopbits[rxtx]) < self.options['stop_bits']:
+ return
+ self.advance_state(rxtx, signal)
+
+ def advance_state(self, rxtx, signal = None, fatal = False, idle = None):
+ # Advances the protocol decoder's internal state for all regular
+ # UART frame inspection. Deals with either edges, sample points,
+ # or other .wait() conditions. Also gracefully handles extreme
+ # undersampling. Each turn takes one .wait() call which in turn
+ # corresponds to at least one sample. That is why as many state
+ # transitions are done here as required within a single call.
+ frame_end = self.frame_start[rxtx] + self.frame_len_sample_count
+ if idle is not None:
+ # When requested by the caller, start another (potential)
+ # IDLE period after the caller specified position.
+ self.idle_start[rxtx] = idle
+ if fatal:
+ # When requested by the caller, don't advance to the next
+ # UART frame's field, but to the start of the next START bit
+ # instead.
+ self.state[rxtx] = 'WAIT FOR START BIT'
+ return
+ # Advance to the next UART frame's field that we expect. Cope
+ # with absence of optional fields. Force scan for next IDLE
+ # after the (optional) STOP bit field, so that callers need
+ # not deal with optional field presence. Also handles the cases
+ # where the decoder navigates to edges which are not strictly
+ # a field's sampling point.
+ if self.state[rxtx] == 'WAIT FOR START BIT':
+ self.state[rxtx] = 'GET START BIT'
+ return
+ if self.state[rxtx] == 'GET START BIT':
+ self.state[rxtx] = 'GET DATA BITS'
+ return
+ if self.state[rxtx] == 'GET DATA BITS':
+ self.state[rxtx] = 'GET PARITY BIT'
+ if self.options['parity'] != 'none':
+ return
+ # FALLTHROUGH
+ if self.state[rxtx] == 'GET PARITY BIT':
+ self.state[rxtx] = 'GET STOP BITS'
+ if self.options['stop_bits']:
+ return
+ # FALLTHROUGH
+ if self.state[rxtx] == 'GET STOP BITS':
+ # Postprocess the previously received UART frame. Advance
+ # the read position to after the frame's last bit time. So
+ # that the start of the next START bit won't fall into the
+ # end of the previously received UART frame. This improves
+ # robustness in the presence of glitchy input data.
+ ss = self.frame_start[rxtx]
+ es = self.samplenum + ceil(self.bit_width / 2.0)
+ self.handle_frame(rxtx, ss, es)
+ self.state[rxtx] = 'WAIT FOR START BIT'
+ self.idle_start[rxtx] = frame_end
+ return
+ # Unhandled state, actually a programming error. Emit diagnostics?
+ self.state[rxtx] = 'WAIT FOR START BIT'
+
+ def handle_frame(self, rxtx, ss, es):
# Pass the complete UART frame to upper layers.
- es = self.samplenum + ceil(self.bit_width / 2.0)
- self.putpse(self.frame_start[rxtx], es, ['FRAME', rxtx,
+ self.putpse(ss, es, ['FRAME', rxtx,
(self.datavalue[rxtx], self.frame_valid[rxtx])])
- self.state[rxtx] = 'WAIT FOR START BIT'
- self.idle_start[rxtx] = self.frame_start[rxtx] + self.frame_len_sample_count
+ def handle_idle(self, rxtx, ss, es):
+ self.putpse(ss, es, ['IDLE', rxtx, 0])
- def handle_break(self, rxtx):
- self.putpse(self.frame_start[rxtx], self.samplenum,
- ['BREAK', rxtx, 0])
- self.putgse(self.frame_start[rxtx], self.samplenum,
- [Ann.RX_BREAK + rxtx, ['Break condition', 'Break', 'Brk', 'B']])
+ def handle_break(self, rxtx, ss, es):
+ self.putpse(ss, es, ['BREAK', rxtx, 0])
+ self.putgse(ss, es, [Ann.RX_BREAK + rxtx,
+ ['Break condition', 'Break', 'Brk', 'B']])
self.state[rxtx] = 'WAIT FOR START BIT'
def get_wait_cond(self, rxtx, inv):
state = self.state[rxtx]
if state == 'WAIT FOR START BIT':
return {rxtx: 'r' if inv else 'f'}
- if state == 'GET START BIT':
- bitnum = 0
- elif state == 'GET DATA BITS':
- bitnum = 1 + self.cur_data_bit[rxtx]
- elif state == 'GET PARITY BIT':
- bitnum = 1 + self.options['data_bits']
- elif state == 'GET STOP BITS':
- bitnum = 1 + self.options['data_bits']
- bitnum += 0 if self.options['parity'] == 'none' else 1
- want_num = ceil(self.get_sample_point(rxtx, bitnum))
- return {'skip': want_num - self.samplenum}
+ if state in ('GET START BIT', 'GET DATA BITS',
+ 'GET PARITY BIT', 'GET STOP BITS'):
+ bitnum = self.cur_frame_bit[rxtx]
+ # TODO: Currently does not support half STOP bits.
+ want_num = ceil(self.get_sample_point(rxtx, bitnum))
+ return {'skip': want_num - self.samplenum}
def get_idle_cond(self, rxtx, inv):
# Return a condition that corresponds to the (expected) end of
return
diff = self.samplenum - self.break_start[rxtx]
if diff >= self.break_min_sample_count:
- self.handle_break(rxtx)
+ ss, es = self.frame_start[rxtx], self.samplenum
+ self.handle_break(rxtx, ss, es)
self.break_start[rxtx] = None
def inspect_idle(self, rxtx, signal, inv):
if diff < self.frame_len_sample_count:
return
ss, es = self.idle_start[rxtx], self.samplenum
- self.putpse(ss, es, ['IDLE', rxtx, 0])
- self.idle_start[rxtx] = self.samplenum
+ self.handle_idle(rxtx, ss, es)
+ self.idle_start[rxtx] = es
def decode(self):
if not self.samplerate:
di->got_new_samples = FALSE;
di->handled_all_samples = FALSE;
di->want_wait_terminate = FALSE;
+ di->communicate_eof = FALSE;
di->decoder_state = SRD_OK;
/*
di->got_new_samples = FALSE;
di->handled_all_samples = FALSE;
di->want_wait_terminate = FALSE;
+ di->communicate_eof = FALSE;
di->decoder_state = SRD_OK;
/* Conditions and mutex got reset after joining the thread. */
}
py_res = PyObject_CallMethod(di->py_inst, "decode", NULL);
srd_dbg("%s: decode() terminated.", di->inst_id);
+ /*
+ * Termination with an EOFError exception is accepted to simplify
+ * the implementation of decoders and for backwards compatibility.
+ */
+ if (PyErr_Occurred() && PyErr_ExceptionMatches(PyExc_EOFError)) {
+ srd_dbg("%s: ignoring EOFError during decode() execution.",
+ di->inst_id);
+ PyErr_Clear();
+ if (!py_res)
+ py_res = Py_None;
+ }
if (!py_res)
di->decoder_state = SRD_ERR;
g_cond_wait(&di->handled_all_samples_cond, &di->data_mutex);
g_mutex_unlock(&di->data_mutex);
+ /* Flush all PDs in the stack that can be flushed */
+ srd_inst_flush(di);
+
if (di->want_wait_terminate)
return SRD_ERR_TERM_REQ;
return SRD_OK;
}
+
+/**
+ * Flush all data that is pending, bottom decoder first up to the top of the stack.
+ *
+ * @param di The decoder instance to call. Must not be NULL.
+ *
+ * @return SRD_OK upon success, a (negative) error code otherwise.
+ *
+ * @private
+ */
+SRD_PRIV int srd_inst_flush(struct srd_decoder_inst *di)
+{
+ PyGILState_STATE gstate;
+ PyObject *py_ret;
+ GSList *l;
+ int ret;
+
+ if (!di)
+ return SRD_ERR_ARG;
+
+ gstate = PyGILState_Ensure();
+ if (PyObject_HasAttrString(di->py_inst, "flush")) {
+ srd_dbg("Calling flush() of instance %s", di->inst_id);
+ py_ret = PyObject_CallMethod(di->py_inst, "flush", NULL);
+ Py_XDECREF(py_ret);
+ }
+ PyGILState_Release(gstate);
+
+ /* Pass the "flush" request to all stacked decoders. */
+ for (l = di->next_di; l; l = l->next) {
+ ret = srd_inst_flush(l->data);
+ if (ret != SRD_OK)
+ return ret;
+ }
+
+ return di->decoder_state;
+}
+
+/**
+ * Communicate the end of the stream of sample data to a decoder instance.
+ *
+ * @param[in] di The decoder instance to call. Must not be NULL.
+ *
+ * @return SRD_OK upon success, a (negative) error code otherwise.
+ *
+ * @private
+ */
+SRD_PRIV int srd_inst_send_eof(struct srd_decoder_inst *di)
+{
+ GSList *l;
+ int ret;
+
+ if (!di)
+ return SRD_ERR_ARG;
+
+ /*
+ * Send EOF to the caller specified decoder instance. Only
+ * communicate EOF to currently executing decoders. Never
+ * started or previously finished is perfectly acceptable.
+ */
+ srd_dbg("End of sample data: instance %s.", di->inst_id);
+ if (!di->thread_handle) {
+ srd_dbg("No worker thread, nothing to do.");
+ return SRD_OK;
+ }
+
+ /* Signal the thread about the EOF condition. */
+ g_mutex_lock(&di->data_mutex);
+ di->inbuf = NULL;
+ di->inbuflen = 0;
+ di->got_new_samples = TRUE;
+ di->handled_all_samples = FALSE;
+ di->want_wait_terminate = TRUE;
+ di->communicate_eof = TRUE;
+ g_cond_signal(&di->got_new_samples_cond);
+ g_mutex_unlock(&di->data_mutex);
+
+ /* Only return from here when the condition was handled. */
+ g_mutex_lock(&di->data_mutex);
+ while (!di->handled_all_samples && !di->want_wait_terminate)
+ g_cond_wait(&di->handled_all_samples_cond, &di->data_mutex);
+ g_mutex_unlock(&di->data_mutex);
+
+ /* Flush the decoder instance which handled EOF. */
+ srd_inst_flush(di);
+
+ /* Pass EOF to all stacked decoders. */
+ for (l = di->next_di; l; l = l->next) {
+ ret = srd_inst_send_eof(l->data);
+ if (ret != SRD_OK)
+ return ret;
+ }
+
+ return SRD_OK;
+}
+
/**
* Terminate current decoder work, prepare for re-use on new input data.
*
*
* Copyright (c) 2009-2019 Frank Meyer - frank(at)fli4l.de
* Copyright (c) 2009-2019 René Staffen - r.staffen(at)gmx.de
+ * Copyright (c) 2020-2021 Gerhard Sittig <gerhard.sittig@gmx.net>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
*/
#include "irmp-main-sharedlib.h"
+#include <errno.h>
+#include <glib.h>
+#include <Python.h>
#include <stdlib.h>
#include <string.h>
* upstream project.
*/
#if defined _WIN32
-# define WIN32
+# if !defined WIN32
+# define WIN32
+# endif
#else
-# define unix
+# if !defined unix
+# define unix
+# endif
#endif
#include "irmp.h"
#include "irmp.c"
# define ARRAY_SIZE(x) (sizeof(x) / sizeof(x[0]))
#endif
+static int irmp_lib_initialized;
+static size_t irmp_lib_client_id;
+static GMutex irmp_lib_mutex;
+
+struct irmp_instance {
+ size_t client_id;
+ GMutex *mutex;
+};
+
+static void irmp_lib_autoinit(void)
+{
+ if (irmp_lib_initialized)
+ return;
+
+ irmp_lib_client_id = 0;
+ g_mutex_init(&irmp_lib_mutex);
+
+ irmp_lib_initialized = 1;
+}
+
+static size_t irmp_next_client_id(void)
+{
+ size_t id;
+
+ do {
+ id = ++irmp_lib_client_id;
+ } while (!id);
+
+ return id;
+}
+
+IRMP_DLLEXPORT struct irmp_instance *irmp_instance_alloc(void)
+{
+ struct irmp_instance *inst;
+
+ irmp_lib_autoinit();
+
+ inst = g_malloc0(sizeof(*inst));
+ if (!inst)
+ return NULL;
+
+ inst->client_id = irmp_next_client_id();
+ inst->mutex = &irmp_lib_mutex;
+
+ return inst;
+}
+
+IRMP_DLLEXPORT void irmp_instance_free(struct irmp_instance *state)
+{
+
+ irmp_lib_autoinit();
+
+ if (!state)
+ return;
+
+ g_free(state);
+}
+
+IRMP_DLLEXPORT size_t irmp_instance_id(struct irmp_instance *state)
+{
+
+ irmp_lib_autoinit();
+
+ return state ? state->client_id : 0;
+}
+
+IRMP_DLLEXPORT int irmp_instance_lock(struct irmp_instance *state, int wait)
+{
+ int rc;
+ PyGILState_STATE pyst;
+
+ irmp_lib_autoinit();
+
+ if (!state || !state->mutex)
+ return -EINVAL;
+
+ pyst = PyGILState_Ensure();
+ Py_BEGIN_ALLOW_THREADS
+ if (wait) {
+ g_mutex_lock(state->mutex);
+ rc = 0;
+ } else {
+ rc = g_mutex_trylock(state->mutex);
+ }
+ Py_END_ALLOW_THREADS
+ PyGILState_Release(pyst);
+ if (rc != 0)
+ return rc;
+
+ return 0;
+}
+
+IRMP_DLLEXPORT void irmp_instance_unlock(struct irmp_instance *state)
+{
+
+ irmp_lib_autoinit();
+
+ if (!state || !state->mutex)
+ return;
+
+ g_mutex_unlock(state->mutex);
+}
+
static uint32_t s_end_sample;
IRMP_DLLEXPORT uint32_t irmp_get_sample_rate(void)
return "unknown";
return name;
}
+
+static __attribute__((constructor)) void init(void)
+{
+ irmp_lib_autoinit();
+}
*
* Copyright (c) 2009-2019 Frank Meyer - frank(at)fli4l.de
* Copyright (c) 2009-2019 René Staffen - r.staffen(at)gmx.de
+ * Copyright (c) 2020-2021 Gerhard Sittig <gerhard.sittig@gmx.net>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
/* Part of the library API is optional. */
#define WITH_IRMP_DETECT_BUFFER 0
+/**
+ * @brief State container for a decoder core instance. Opaque to clients.
+ */
+struct irmp_instance;
+
+/**
+ * @brief Allocate a decoder instance.
+ *
+ * @returns Reference to the allocated instance state.
+ */
+IRMP_DLLEXPORT struct irmp_instance *irmp_instance_alloc(void);
+
+/**
+ * @brief Release a decoder instance.
+ *
+ * @param[in] state Reference to the instance's state.
+ */
+IRMP_DLLEXPORT void irmp_instance_free(struct irmp_instance *state);
+
+/**
+ * @brief Get the client ID of an IRMP decoder core instance.
+ */
+IRMP_DLLEXPORT size_t irmp_instance_id(struct irmp_instance *state);
+
+/**
+ * @brief Acquire a decoder instance's lock.
+ *
+ * @param[in] state Reference to the instance's state.
+ * @param[in] wait Whether to block until the lock is acquired.
+ *
+ * @returns 0 upon success, non-zero upon failure
+ */
+IRMP_DLLEXPORT int irmp_instance_lock(struct irmp_instance *state, int wait);
+
+/**
+ * @brief Release a decoder instance's lock.
+ *
+ * @param[in] state Reference to the instance's state.
+ *
+ * @returns 0 upon success, non-zero upon failure
+ */
+IRMP_DLLEXPORT void irmp_instance_unlock(struct irmp_instance *state);
+
/**
* @brief IR decoder result data at the library's public API.
*/
#include <Python.h> /* First, so we avoid a _POSIX_C_SOURCE warning. */
#include "libsigrokdecode.h"
+/*
+ * Static definition of tables ending with an all-zero sentinel entry
+ * may raise warnings when compiling with -Wmissing-field-initializers.
+ * GCC suppresses the warning only with { 0 }, clang wants { } instead.
+ */
+#ifdef __clang__
+# define ALL_ZERO { }
+#else
+# define ALL_ZERO { 0 }
+#endif
+
enum {
SRD_TERM_ALWAYS_FALSE,
SRD_TERM_HIGH,
uint64_t abs_start_samplenum, uint64_t abs_end_samplenum,
const uint8_t *inbuf, uint64_t inbuflen, uint64_t unitsize);
SRD_PRIV int process_samples_until_condition_match(struct srd_decoder_inst *di, gboolean *found_match);
+SRD_PRIV int srd_inst_flush(struct srd_decoder_inst *di);
+SRD_PRIV int srd_inst_send_eof(struct srd_decoder_inst *di);
SRD_PRIV int srd_inst_terminate_reset(struct srd_decoder_inst *di);
SRD_PRIV void srd_inst_free(struct srd_decoder_inst *di);
SRD_PRIV void srd_inst_free_all(struct srd_session *sess);
*/
/* Marks public libsigrokdecode API symbols. */
-#ifndef _WIN32
-#define SRD_API __attribute__((visibility("default")))
+#if defined _WIN32
+# if defined DLL_EXPORT
+# define SRD_API __declspec(dllexport)
+# else
+# define SRD_API extern
+# endif
#else
-#define SRD_API
+# define SRD_API __attribute__((visibility("default")))
#endif
/* Marks private, non-public libsigrokdecode symbols (not part of the API). */
-#ifndef _WIN32
-#define SRD_PRIV __attribute__((visibility("hidden")))
+#if defined _WIN32
+# define SRD_PRIV /* EMPTY */
#else
-#define SRD_PRIV
+# define SRD_PRIV __attribute__((visibility("hidden")))
#endif
/*
* When adding an output type, don't forget to...
- * - expose it to PDs in controller.c:PyInit_sigrokdecode()
- * - add a check in module_sigrokdecode.c:Decoder_put()
- * - add a debug string in type_decoder.c:OUTPUT_TYPES
+ * - expose it to PDs in module_sigrokdecode.c:PyInit_sigrokdecode()
+ * - add a check in type_decoder.c:Decoder_put()
+ * - add a debug string in type_decoder.c:output_type_name()
*/
enum srd_output_type {
SRD_OUTPUT_ANN,
SRD_OUTPUT_PYTHON,
SRD_OUTPUT_BINARY,
+ SRD_OUTPUT_LOGIC,
SRD_OUTPUT_META,
};
*/
GSList *binary;
+ /**
+ * List of logic output channels (item: id, description).
+ */
+ GSList *logic_output_channels;
+
/** List of decoder options. */
GSList *options;
GSList *ann_classes;
};
+struct srd_decoder_logic_output_channel {
+ char *id;
+ char *desc;
+};
+
struct srd_decoder_inst {
struct srd_decoder *decoder;
struct srd_session *sess;
/** Requests termination of wait() and decode(). */
gboolean want_wait_terminate;
+ /** Requests that .wait() terminates a Python iteration. */
+ gboolean communicate_eof;
+
/** Indicates the current state of the decoder stack. */
int decoder_state;
struct srd_proto_data_binary {
int bin_class; /* Index into "struct srd_decoder"->binary. */
uint64_t size;
- const unsigned char *data;
+ const uint8_t *data;
+};
+struct srd_proto_data_logic {
+ int logic_group;
+ uint64_t repeat_count; /* Number of times the value in data was repeated. */
+ const uint8_t *data; /* Bitfield containing the states of the logic outputs */
};
typedef void (*srd_pd_output_callback)(struct srd_proto_data *pdata,
SRD_API int srd_session_send(struct srd_session *sess,
uint64_t abs_start_samplenum, uint64_t abs_end_samplenum,
const uint8_t *inbuf, uint64_t inbuflen, uint64_t unitsize);
+SRD_API int srd_session_send_eof(struct srd_session *sess);
SRD_API int srd_session_terminate_reset(struct srd_session *sess);
SRD_API int srd_session_destroy(struct srd_session *sess);
SRD_API int srd_pd_output_callback_add(struct srd_session *sess,
goto err_out;
if (PyModule_AddIntConstant(mod, "OUTPUT_BINARY", SRD_OUTPUT_BINARY) < 0)
goto err_out;
+ if (PyModule_AddIntConstant(mod, "OUTPUT_LOGIC", SRD_OUTPUT_LOGIC) < 0)
+ goto err_out;
if (PyModule_AddIntConstant(mod, "OUTPUT_META", SRD_OUTPUT_META) < 0)
goto err_out;
/* Expose meta input symbols. */
return SRD_OK;
}
+/**
+ * Communicate the end of the stream of sample data to the session.
+ *
+ * @param[in] sess The session. Must not be NULL.
+ *
+ * @return SRD_OK upon success. A (negative) error code otherwise.
+ *
+ * @since 0.6.0
+ */
+SRD_API int srd_session_send_eof(struct srd_session *sess)
+{
+ GSList *d;
+ int ret;
+
+ if (!sess)
+ return SRD_ERR_ARG;
+
+ for (d = sess->di_list; d; d = d->next) {
+ ret = srd_inst_send_eof(d->data);
+ if (ret != SRD_OK)
+ return ret;
+ }
+
+ return SRD_OK;
+}
+
/**
* Terminate currently executing decoders in a session, reset internal state.
*
* @section sec_irc IRC
*
* You can find the sigrok developers in the
- * <a href="irc://chat.freenode.net/sigrok">\#sigrok</a>
- * IRC channel on Freenode.
+ * <a href="ircs://irc.libera.chat/#sigrok">\#sigrok</a>
+ * IRC channel on Libera.Chat.
*
* @section sec_website Website
*
}
/* Environment variable overrides everything, for debugging. */
+ /*
+ * TODO
+ * Is the comment still applicable and correct or up to date?
+ * This implementation adds paths which were specified by the
+ * env var. Which can shadow files in other locations, or can
+ * extend the set of available decoders. Which need not only
+ * serve for development, it is as beneficial to regular users.
+ * Without shadowing all other files still are found.
+ */
if ((env_path = g_getenv("SIGROKDECODE_DIR"))) {
if ((ret = srd_decoder_searchpath_add(env_path)) != SRD_OK) {
Py_Finalize();
return ret;
}
}
+ env_path = g_getenv("SIGROKDECODE_PATH");
+ if (env_path) {
+ char **dir_list, **dir_iter, *dir_item;
+ dir_list = g_strsplit(env_path, G_SEARCHPATH_SEPARATOR_S, 0);
+ for (dir_iter = dir_list; *dir_iter; dir_iter++) {
+ dir_item = *dir_iter;
+ if (!dir_item || !*dir_item)
+ continue;
+ ret = srd_decoder_searchpath_add(dir_item);
+ if (ret != SRD_OK) {
+ Py_Finalize();
+ return ret;
+ }
+ }
+ g_strfreev(dir_list);
+ }
- /* Initialize the Python GIL (this also happens to acquire it). */
+#if PY_VERSION_HEX < 0x03090000
+ /*
+ * Initialize and acquire the Python GIL. In Python 3.7+ this
+ * will be done implicitly as part of the Py_InitializeEx()
+ * call above. PyEval_InitThreads() was deprecated in 3.9.
+ */
PyEval_InitThreads();
+#endif
/* Release the GIL (ignore return value, we don't need it here). */
(void)PyEval_SaveThread();
/* This is only used for nicer srd_dbg() output. */
SRD_PRIV const char *output_type_name(unsigned int idx)
{
- static const char names[][16] = {
+ static const char *names[] = {
"OUTPUT_ANN",
"OUTPUT_PYTHON",
"OUTPUT_BINARY",
+ "OUTPUT_LOGIC",
"OUTPUT_META",
"(invalid)"
};
/* Should be a list of [annotation class, [string, ...]]. */
if (!PyList_Check(obj)) {
- srd_err("Protocol decoder %s submitted an annotation that"
- " is not a list", di->decoder->name);
+ srd_err("Protocol decoder %s submitted an annotation that is not a list",
+ di->decoder->name);
goto err;
}
/* Should have 2 elements. */
if (PyList_Size(obj) != 2) {
- srd_err("Protocol decoder %s submitted annotation list with "
- "%zd elements instead of 2", di->decoder->name,
- PyList_Size(obj));
+ ssize_t sz = PyList_Size(obj);
+ srd_err("Protocol decoder %s submitted annotation list with %zd elements instead of 2",
+ di->decoder->name, sz);
goto err;
}
*/
py_tmp = PyList_GetItem(obj, 0);
if (!PyLong_Check(py_tmp)) {
- srd_err("Protocol decoder %s submitted annotation list, but "
- "first element was not an integer.", di->decoder->name);
+ srd_err("Protocol decoder %s submitted annotation list, but first element was not an integer.",
+ di->decoder->name);
goto err;
}
ann_class = PyLong_AsLong(py_tmp);
if (!(pdo = g_slist_nth_data(di->decoder->annotations, ann_class))) {
- srd_err("Protocol decoder %s submitted data to unregistered "
- "annotation class %d.", di->decoder->name, ann_class);
+ srd_err("Protocol decoder %s submitted data to unregistered annotation class %d.",
+ di->decoder->name, ann_class);
goto err;
}
/* Second element must be a list. */
py_tmp = PyList_GetItem(obj, 1);
if (!PyList_Check(py_tmp)) {
- srd_err("Protocol decoder %s submitted annotation list, but "
- "second element was not a list.", di->decoder->name);
+ srd_err("Protocol decoder %s submitted annotation list, but second element was not a list.",
+ di->decoder->name);
goto err;
}
if (py_strseq_to_char(py_tmp, &ann_text) != SRD_OK) {
- srd_err("Protocol decoder %s submitted annotation list, but "
- "second element was malformed.", di->decoder->name);
+ srd_err("Protocol decoder %s submitted annotation list, but second element was malformed.",
+ di->decoder->name);
goto err;
}
return SRD_ERR_PYTHON;
}
+static void release_logic(struct srd_proto_data_logic *pdl)
+{
+ if (!pdl)
+ return;
+ g_free((void *)pdl->data);
+}
+
+static int convert_logic(struct srd_decoder_inst *di, PyObject *obj,
+ struct srd_proto_data *pdata)
+{
+ struct srd_proto_data_logic *pdl;
+ PyObject *py_tmp;
+ Py_ssize_t size;
+ int logic_group;
+ char *group_name, *buf;
+ PyGILState_STATE gstate;
+
+ gstate = PyGILState_Ensure();
+
+ /* Should be a list of [logic group, bytes]. */
+ if (!PyList_Check(obj)) {
+ srd_err("Protocol decoder %s submitted non-list for SRD_OUTPUT_LOGIC.",
+ di->decoder->name);
+ goto err;
+ }
+
+ /* Should have 2 elements. */
+ if (PyList_Size(obj) != 2) {
+ srd_err("Protocol decoder %s submitted SRD_OUTPUT_LOGIC list "
+ "with %zd elements instead of 2", di->decoder->name,
+ PyList_Size(obj));
+ goto err;
+ }
+
+ /* The first element should be an integer. */
+ py_tmp = PyList_GetItem(obj, 0);
+ if (!PyLong_Check(py_tmp)) {
+ srd_err("Protocol decoder %s submitted SRD_OUTPUT_LOGIC list, "
+ "but first element was not an integer.", di->decoder->name);
+ goto err;
+ }
+ logic_group = PyLong_AsLong(py_tmp);
+ if (!(group_name = g_slist_nth_data(di->decoder->logic_output_channels, logic_group))) {
+ srd_err("Protocol decoder %s submitted SRD_OUTPUT_LOGIC with "
+ "unregistered logic group %d.", di->decoder->name, logic_group);
+ goto err;
+ }
+
+ /* Second element should be bytes. */
+ py_tmp = PyList_GetItem(obj, 1);
+ if (!PyBytes_Check(py_tmp)) {
+ srd_err("Protocol decoder %s submitted SRD_OUTPUT_LOGIC list, "
+ "but second element was not bytes.", di->decoder->name);
+ goto err;
+ }
+
+ /* Consider an empty set of bytes a bug. */
+ if (PyBytes_Size(py_tmp) == 0) {
+ srd_err("Protocol decoder %s submitted SRD_OUTPUT_LOGIC "
+ "with empty data set.", di->decoder->name);
+ goto err;
+ }
+
+ if (PyBytes_AsStringAndSize(py_tmp, &buf, &size) == -1)
+ goto err;
+
+ PyGILState_Release(gstate);
+
+ pdl = pdata->data;
+ pdl->logic_group = logic_group;
+ /* pdl->repeat_count is set by the caller as it depends on the sample range */
+ if (!(pdl->data = g_try_malloc(size)))
+ return SRD_ERR_MALLOC;
+ memcpy((void *)pdl->data, (const void *)buf, size);
+
+ return SRD_OK;
+
+err:
+ PyGILState_Release(gstate);
+
+ return SRD_ERR_PYTHON;
+}
+
static void release_binary(struct srd_proto_data_binary *pdb)
{
if (!pdb)
/* Should have 2 elements. */
if (PyList_Size(obj) != 2) {
- srd_err("Protocol decoder %s submitted SRD_OUTPUT_BINARY list "
- "with %zd elements instead of 2", di->decoder->name,
- PyList_Size(obj));
+ ssize_t sz = PyList_Size(obj);
+ srd_err("Protocol decoder %s submitted SRD_OUTPUT_BINARY list with %zd elements instead of 2",
+ di->decoder->name, sz);
goto err;
}
/* The first element should be an integer. */
py_tmp = PyList_GetItem(obj, 0);
if (!PyLong_Check(py_tmp)) {
- srd_err("Protocol decoder %s submitted SRD_OUTPUT_BINARY list, "
- "but first element was not an integer.", di->decoder->name);
+ srd_err("Protocol decoder %s submitted SRD_OUTPUT_BINARY list, but first element was not an integer.",
+ di->decoder->name);
goto err;
}
bin_class = PyLong_AsLong(py_tmp);
if (!(class_name = g_slist_nth_data(di->decoder->binary, bin_class))) {
- srd_err("Protocol decoder %s submitted SRD_OUTPUT_BINARY with "
- "unregistered binary class %d.", di->decoder->name, bin_class);
+ srd_err("Protocol decoder %s submitted SRD_OUTPUT_BINARY with unregistered binary class %d.",
+ di->decoder->name, bin_class);
goto err;
}
/* Second element should be bytes. */
py_tmp = PyList_GetItem(obj, 1);
if (!PyBytes_Check(py_tmp)) {
- srd_err("Protocol decoder %s submitted SRD_OUTPUT_BINARY list, "
- "but second element was not bytes.", di->decoder->name);
+ srd_err("Protocol decoder %s submitted SRD_OUTPUT_BINARY list, but second element was not bytes.",
+ di->decoder->name);
goto err;
}
/* Consider an empty set of bytes a bug. */
if (PyBytes_Size(py_tmp) == 0) {
- srd_err("Protocol decoder %s submitted SRD_OUTPUT_BINARY "
- "with empty data set.", di->decoder->name);
+ srd_err("Protocol decoder %s submitted SRD_OUTPUT_BINARY with empty data set.",
+ di->decoder->name);
goto err;
}
if (g_variant_type_equal(pdata->pdo->meta_type, G_VARIANT_TYPE_INT64)) {
if (!PyLong_Check(obj)) {
- PyErr_Format(PyExc_TypeError, "This output was registered "
- "as 'int', but something else was passed.");
+ PyErr_Format(PyExc_TypeError,
+ "This output was registered as 'int', but something else was passed.");
goto err;
}
intvalue = PyLong_AsLongLong(obj);
pdata->data = g_variant_new_int64(intvalue);
} else if (g_variant_type_equal(pdata->pdo->meta_type, G_VARIANT_TYPE_DOUBLE)) {
if (!PyFloat_Check(obj)) {
- PyErr_Format(PyExc_TypeError, "This output was registered "
- "as 'float', but something else was passed.");
+ PyErr_Format(PyExc_TypeError,
+ "This output was registered as 'float', but something else was passed.");
goto err;
}
dvalue = PyFloat_AsDouble(obj);
g_variant_unref(gvar);
}
+PyDoc_STRVAR(Decoder_put_doc,
+ "Put an annotation for the specified span of samples.\n"
+ "\n"
+ "Arguments: start and end sample number, stream id, annotation data.\n"
+ "Annotation data's layout depends on the output stream type."
+);
+
static PyObject *Decoder_put(PyObject *self, PyObject *args)
{
GSList *l;
struct srd_proto_data pdata;
struct srd_proto_data_annotation pda;
struct srd_proto_data_binary pdb;
+ struct srd_proto_data_logic pdl;
uint64_t start_sample, end_sample;
int output_id;
struct srd_pd_callback *cb;
start_sample,
end_sample, output_type_name(pdo->output_type),
output_id, pdo->proto_id, next_di->inst_id);
- if (!(py_res = PyObject_CallMethod(
- next_di->py_inst, "decode", "KKO", start_sample,
- end_sample, py_data))) {
+ py_res = PyObject_CallMethod(next_di->py_inst, "decode",
+ "KKO", start_sample, end_sample, py_data);
+ if (!py_res) {
srd_exception_catch("Calling %s decode() failed",
next_di->inst_id);
}
release_binary(pdata.data);
}
break;
+ case SRD_OUTPUT_LOGIC:
+ if ((cb = srd_pd_output_callback_find(di->sess, pdo->output_type))) {
+ pdata.data = &pdl;
+ /* Convert from PyDict to srd_proto_data_logic. */
+ if (convert_logic(di, py_data, &pdata) != SRD_OK) {
+ /* An error was already logged. */
+ break;
+ }
+ if (end_sample <= start_sample) {
+ srd_err("Ignored SRD_OUTPUT_LOGIC with invalid sample range.");
+ break;
+ }
+ pdl.repeat_count = (end_sample - start_sample) - 1;
+ Py_BEGIN_ALLOW_THREADS
+ cb->cb(&pdata, cb->cb_data);
+ Py_END_ALLOW_THREADS
+ release_logic(pdata.data);
+ }
+ break;
case SRD_OUTPUT_META:
if ((cb = srd_pd_output_callback_find(di->sess, pdo->output_type))) {
/* Annotations need converting from PyObject. */
return NULL;
}
-static PyObject *Decoder_register(PyObject *self, PyObject *args,
- PyObject *kwargs)
+PyDoc_STRVAR(Decoder_register_doc,
+ "Register a new output stream."
+);
+
+static PyObject *Decoder_register(PyObject *self,
+ PyObject *args, PyObject *kwargs)
{
struct srd_decoder_inst *di;
struct srd_pd_output *pdo;
return SRD_OK;
}
+PyDoc_STRVAR(Decoder_wait_doc,
+ "Wait for one or more conditions to occur.\n"
+ "\n"
+ "Returns the sample data at the next position where the condition\n"
+ "is seen. When the optional condition is missing or empty, the next\n"
+ "sample number is used. The condition can be a dictionary with one\n"
+ "condition's details, or a list of dictionaries specifying multiple\n"
+ "conditions of which at least one condition must be true. Dicts can\n"
+ "contain one or more key/value pairs, all of which must be true for\n"
+ "the dict's condition to be considered true. The key either is a\n"
+ "channel index or a keyword, the value is the operation's parameter.\n"
+ "\n"
+ "Supported parameters for channel number keys: 'h', 'l', 'r', 'f',\n"
+ "or 'e' for level or edge conditions. Other supported keywords:\n"
+ "'skip' to advance over the given number of samples.\n"
+);
+
static PyObject *Decoder_wait(PyObject *self, PyObject *args)
{
int ret;
/* Signal the main thread that we handled all samples. */
g_cond_signal(&di->handled_all_samples_cond);
+ /*
+ * When EOF was provided externally, communicate the
+ * Python EOFError exception to .decode() and return
+ * from the .wait() method call. This is motivated by
+ * the use of Python context managers, so that .decode()
+ * methods can "close" incompletely accumulated data
+ * when the sample data is exhausted.
+ */
+ if (di->communicate_eof) {
+ /* Advance self.samplenum to the (absolute) last sample number. */
+ py_samplenum = PyLong_FromUnsignedLongLong(di->abs_cur_samplenum);
+ PyObject_SetAttrString(di->py_inst, "samplenum", py_samplenum);
+ Py_DECREF(py_samplenum);
+ /* Raise an EOFError Python exception. */
+ srd_dbg("%s: %s: Raising EOF from wait().",
+ di->inst_id, __func__);
+ g_mutex_unlock(&di->data_mutex);
+ PyErr_SetString(PyExc_EOFError, "samples exhausted");
+ goto err;
+ }
+
/*
* When termination of wait() and decode() was requested,
* then exit the loop after releasing the mutex.
return NULL;
}
+PyDoc_STRVAR(Decoder_has_channel_doc,
+ "Check whether input data is supplied for a given channel.\n"
+ "\n"
+ "Argument: A channel index.\n"
+ "Returns: A boolean, True if the channel is connected,\n"
+ "False if the channel is open (won't see any input data).\n"
+);
+
/**
* Return whether the specified channel was supplied to the decoder.
*
int idx, count;
struct srd_decoder_inst *di;
PyGILState_STATE gstate;
+ PyObject *bool_ret;
if (!self || !args)
return NULL;
PyGILState_Release(gstate);
- return (di->dec_channelmap[idx] == -1) ? Py_False : Py_True;
+ bool_ret = (di->dec_channelmap[idx] == -1) ? Py_False : Py_True;
+ Py_INCREF(bool_ret);
+ return bool_ret;
err:
PyGILState_Release(gstate);
return NULL;
}
+PyDoc_STRVAR(Decoder_doc, "sigrok Decoder base class");
+
static PyMethodDef Decoder_methods[] = {
- { "put", Decoder_put, METH_VARARGS,
- "Accepts a dictionary with the following keys: startsample, endsample, data" },
- { "register", (PyCFunction)(void(*)(void))Decoder_register, METH_VARARGS|METH_KEYWORDS,
- "Register a new output stream" },
- { "wait", Decoder_wait, METH_VARARGS,
- "Wait for one or more conditions to occur" },
- { "has_channel", Decoder_has_channel, METH_VARARGS,
- "Report whether a channel was supplied" },
- {NULL, NULL, 0, NULL}
+ { "put",
+ Decoder_put, METH_VARARGS,
+ Decoder_put_doc,
+ },
+ { "register",
+ (PyCFunction)(void(*)(void))Decoder_register, METH_VARARGS | METH_KEYWORDS,
+ Decoder_register_doc,
+ },
+ { "wait",
+ Decoder_wait, METH_VARARGS,
+ Decoder_wait_doc,
+ },
+ { "has_channel",
+ Decoder_has_channel, METH_VARARGS,
+ Decoder_has_channel_doc,
+ },
+ ALL_ZERO,
};
/**
{
PyType_Spec spec;
PyType_Slot slots[] = {
- { Py_tp_doc, "sigrok Decoder base class" },
+ { Py_tp_doc, Decoder_doc },
{ Py_tp_methods, Decoder_methods },
{ Py_tp_new, (void *)&PyType_GenericNew },
- { 0, NULL }
+ ALL_ZERO,
};
PyObject *py_obj;
PyGILState_STATE gstate;
SRD_PRIV int py_attr_as_strlist(PyObject *py_obj, const char *attr, GSList **outstrlist)
{
PyObject *py_list;
- Py_ssize_t i;
+ ssize_t idx;
int ret;
char *outstr;
PyGILState_STATE gstate;
*outstrlist = NULL;
- for (i = 0; i < PyList_Size(py_list); i++) {
- ret = py_listitem_as_str(py_list, i, &outstr);
+ for (idx = 0; idx < PyList_Size(py_list); idx++) {
+ ret = py_listitem_as_str(py_list, idx, &outstr);
if (ret < 0) {
- srd_dbg("Couldn't get item %" PY_FORMAT_SIZE_T "d.", i);
+ srd_dbg("Couldn't get item %zd.", idx);
goto err;
}
*outstrlist = g_slist_append(*outstrlist, outstr);
SRD_PRIV int py_listitem_as_str(PyObject *py_obj, Py_ssize_t idx,
char **outstr)
{
- PyObject *py_value;
PyGILState_STATE gstate;
+ ssize_t item_idx;
+ PyObject *py_value;
gstate = PyGILState_Ensure();
goto err;
}
- if (!(py_value = PyList_GetItem(py_obj, idx))) {
- srd_dbg("Couldn't get list item %" PY_FORMAT_SIZE_T "d.", idx);
+ item_idx = idx;
+ if (!(py_value = PyList_GetItem(py_obj, item_idx))) {
+ srd_dbg("Couldn't get list item %zd.", item_idx);
goto err;
}