]> sigrok.org Git - libsigrokdecode.git/blobdiff - tests/pdtest
Add protocol decoder testing framework.
[libsigrokdecode.git] / tests / pdtest
diff --git a/tests/pdtest b/tests/pdtest
new file mode 100755 (executable)
index 0000000..59348d6
--- /dev/null
@@ -0,0 +1,399 @@
+#!/usr/bin/env /usr/bin/python3
+
+import os
+import sys
+from getopt import getopt
+from tempfile import mkstemp
+from subprocess import Popen, PIPE
+from difflib import Differ
+
+DEBUG = False
+VERBOSE = False
+
+
+class E_syntax(Exception):
+    pass
+class E_badline(Exception):
+    pass
+
+def INFO(msg, end='\n'):
+    if VERBOSE:
+        print(msg, end=end)
+        sys.stdout.flush()
+
+
+def DBG(msg):
+    if DEBUG:
+        print(msg)
+
+
+def ERR(msg):
+    print(msg, file=sys.stderr)
+
+
+def usage(msg=None):
+    if msg:
+        print(msg.strip() + '\n')
+    print("""Usage: testpd [-dvarslR] [test, ...]
+  -d  Turn on debugging
+  -v  Verbose
+  -a  All tests
+  -l  List all tests
+  -s  Show test(s)
+  -r  Run test(s)
+  -R <directory>  Save test reports to <directory>
+  <test>  Protocol decoder name ("i2c") and optionally test name ("i2c/icc")""")
+    sys.exit()
+
+
+def check_testcase(tc):
+    if 'pdlist' not in tc or not tc['pdlist']:
+        return("No protocol decoders")
+    if 'input' not in tc or not tc['input']:
+        return("No input")
+    if 'output' not in tc or not tc['output']:
+        return("No output")
+    for op in tc['output']:
+        if 'match' not in op:
+            return("No match in output")
+
+    return None
+
+
+def parse_testfile(path, pd, tc, op_type, op_class):
+    DBG("Opening '%s'" % path)
+    tclist = []
+    for line in open(path).read().split('\n'):
+        try:
+            line = line.strip()
+            if len(line) == 0 or line[0] == "#":
+                continue
+            f = line.split()
+            if not tclist and f[0] != "test":
+                # That can't be good.
+                raise E_badline
+            key = f.pop(0)
+            if key == 'test':
+                if len(f) != 1:
+                    raise E_syntax
+                # new testcase
+                tclist.append({
+                    'pd': pd,
+                    'name': f[0],
+                    'pdlist': [],
+                    'output': [],
+                })
+            elif key == 'protocol-decoder':
+                if len(f) < 1:
+                    raise E_syntax
+                pd_spec = {
+                    'name': f.pop(0),
+                    'probes': [],
+                    'options': [],
+                }
+                while len(f):
+                    if len(f) == 1:
+                        # Always needs <key> <value>
+                        raise E_syntax
+                    a, b = f[:2]
+                    f = f[2:]
+                    if '=' not in b:
+                        raise E_syntax
+                    opt, val = b.split('=')
+                    if a == 'probe':
+                        try:
+                            val = int(val)
+                        except:
+                            raise E_syntax
+                        pd_spec['probes'].append([opt, val])
+                    elif a == 'option':
+                        pd_spec['options'].append([opt, val])
+                    else:
+                        raise E_syntax
+                tclist[-1]['pdlist'].append(pd_spec)
+            elif key == 'stack':
+                if len(f) < 2:
+                    raise E_syntax
+                tclist[-1]['stack'] = f
+            elif key == 'input':
+                if len(f) != 1:
+                    raise E_syntax
+                tclist[-1]['input'] = f[0]
+            elif key == 'output':
+                op_spec = {
+                    'pd': f.pop(0),
+                    'type': f.pop(0),
+                }
+                while len(f):
+                    if len(f) == 1:
+                        # Always needs <key> <value>
+                        raise E_syntax
+                    a, b = f[:2]
+                    f = f[2:]
+                    if a == 'class':
+                        op_spec['class'] = b
+                    elif a == 'match':
+                        op_spec['match'] = b
+                    else:
+                        raise E_syntax
+                tclist[-1]['output'].append(op_spec)
+            else:
+                raise E_badline
+        except E_badline as e:
+            ERR("Invalid syntax in %s: line '%s'" % (path, line))
+            return []
+        except E_syntax as e:
+            ERR("Unable to parse %s: unknown line '%s'" % (path, line))
+            return []
+
+    # If a specific testcase was requested, keep only that one.
+    if tc is not None:
+        target_tc = None
+        for t in tclist:
+            if t['name'] == tc:
+                target_tc = t
+                break
+        # ...and a specific output type
+        if op_type is not None:
+            target_oplist = []
+            for op in target_tc['output']:
+                if op['type'] == op_type:
+                    # ...and a specific output class
+                    if op_class is None or ('class' in op and op['class'] == op_class):
+                        target_oplist.append(op)
+                        DBG("match on [%s]" % str(op))
+            target_tc['output'] = target_oplist
+        if target_tc is None:
+            tclist = []
+        else:
+            tclist = [target_tc]
+            for t in tclist:
+                error = check_testcase(t)
+                if error:
+                    ERR("Error in %s: %s" % (path, error))
+                    return []
+
+    return tclist
+
+
+def get_tests(testnames):
+    tests = []
+    for testspec in testnames:
+        # Optional testspec in the form i2c/rtc
+        tc = op_type = op_class = None
+        ts = testspec.strip("/").split("/")
+        pd = ts.pop(0)
+        if ts:
+            tc = ts.pop(0)
+        if ts:
+            op_type = ts.pop(0)
+        if ts:
+            op_class = ts.pop(0)
+        path = os.path.join(decoders_dir, pd)
+        if not os.path.isdir(path):
+            # User specified non-existent PD
+            raise Exception("%s not found." % path)
+        path = os.path.join(decoders_dir, pd, "test/test.conf")
+        if not os.path.exists(path):
+            # PD doesn't have any tests yet
+            continue
+        tests.append(parse_testfile(path, pd, tc, op_type, op_class))
+
+    return tests
+
+
+def diff_files(f1, f2):
+    t1 = open(f1).readlines()
+    t2 = open(f2).readlines()
+    diff = []
+    d = Differ()
+    for line in d.compare(t1, t2):
+        if line[:2] in ('- ', '+ '):
+            diff.append(line.strip())
+
+    return diff
+
+
+def run_tests(tests):
+    errors = 0
+    results = []
+    cmd = os.path.join(tests_dir, 'runtc')
+    for tclist in tests:
+        for tc in tclist:
+            args = [cmd]
+            for pd in tc['pdlist']:
+                args.extend(['-P', pd['name']])
+                for label, probe in pd['probes']:
+                    args.extend(['-p', "%s=%d" % (label, probe)])
+                for option, value in pd['options']:
+                    args.extend(['-o', "%s=%s" % (option, value)])
+            args.extend(['-i', os.path.join(dumps_dir, tc['input'])])
+            for op in tc['output']:
+                name = "%s/%s/%s" % (tc['pd'], tc['name'], op['type'])
+                opargs = ['-O', "%s:%s" % (op['pd'], op['type'])]
+                if 'class' in op:
+                    opargs[-1] += ":%s" % op['class']
+                    name += "/%s" % op['class']
+                if VERBOSE:
+                    dots = '.' * (60 - len(name) - 2)
+                    INFO("%s %s " % (name, dots), end='')
+                results.append({
+                    'testcase': name,
+                })
+                try:
+                    fd, outfile = mkstemp()
+                    os.close(fd)
+                    opargs.extend(['-f', outfile])
+                    DBG("Running %s %s" % (cmd, ' '.join(args + opargs)))
+                    stdout, stderr = Popen(args + opargs, stdout=PIPE, stderr=PIPE).communicate()
+                    if stdout:
+                        results[-1]['statistics'] = stdout.decode('utf-8').strip()
+                    if stderr:
+                        results[-1]['error'] = stderr.decode('utf-8').strip()
+                        errors += 1
+                    match = "%s/%s/test/%s" % (decoders_dir, op['pd'], op['match'])
+                    diff = diff_files(match, outfile)
+                    if diff:
+                        results[-1]['diff'] = diff
+                except Exception as e:
+                    results[-1]['error'] = str(e)
+                finally:
+                    os.unlink(outfile)
+                if VERBOSE:
+                    if 'diff' in results[-1]:
+                        INFO("Output mismatch")
+                    elif 'error' in results[-1]:
+                        error = results[-1]['error']
+                        if len(error) > 20:
+                            error = error[:17] + '...'
+                        INFO(error)
+                    else:
+                        INFO("OK")
+                gen_report(results[-1])
+
+    return results, errors
+
+
+def gen_report(result):
+    out = []
+    if 'error' in result:
+        out.append("Error:")
+        out.append(result['error'])
+        out.append('')
+    if 'diff' in result:
+        out.append("Test output mismatch:")
+        out.extend(result['diff'])
+        out.append('')
+    if 'statistics' in result:
+        out.extend(["Statistics:", result['statistics']])
+        out.append('')
+
+    if out:
+        text = "Testcase: %s\n" % result['testcase']
+        text += '\n'.join(out)
+    else:
+        return
+
+    if report_dir:
+        filename = result['testcase'].replace('/', '_')
+        open(os.path.join(report_dir, filename), 'w').write(text)
+    else:
+        print(text)
+
+
+def show_tests(tests):
+    for tclist in tests:
+        for tc in tclist:
+            print("Testcase: %s/%s" % (tc['pd'], tc['name']))
+            for pd in tc['pdlist']:
+                print("  Protocol decoder: %s" % pd['name'])
+                for label, probe in pd['probes']:
+                    print("    Probe %s=%d" % (label, probe))
+                for option, value in pd['options']:
+                    print("    Option %s=%d" % (option, value))
+            if 'stack' in tc:
+                print("  Stack: %s" % ' '.join(tc['stack']))
+            print("  Input: %s" % tc['input'])
+            for op in tc['output']:
+                print("  Output:\n    Protocol decoder: %s" % op['pd'])
+                print("    Type: %s" % op['type'])
+                if 'class' in op:
+                    print("    Class: %s" % op['class'])
+                print("    Match: %s" % op['match'])
+        print()
+
+
+def list_tests(tests):
+    for tclist in tests:
+        for tc in tclist:
+            for op in tc['output']:
+                line = "%s/%s/%s" % (tc['pd'], tc['name'], op['type'])
+                if 'class' in op:
+                    line += "/%s" % op['class']
+                print(line)
+
+
+#
+# main
+#
+
+# project root
+tests_dir = os.path.abspath(os.path.dirname(sys.argv[0]))
+base_dir = os.path.abspath(os.path.join(os.curdir, tests_dir, os.path.pardir))
+dumps_dir = os.path.abspath(os.path.join(base_dir, os.path.pardir, 'sigrok-dumps'))
+decoders_dir = os.path.abspath(os.path.join(base_dir, 'decoders'))
+
+if len(sys.argv) == 1:
+    usage()
+
+opt_all = opt_run = opt_show = opt_list = False
+report_dir = None
+opts, args = getopt(sys.argv[1:], "dvarslR:")
+for opt, arg in opts:
+    if opt == '-d':
+        DEBUG = True
+    if opt == '-v':
+        VERBOSE = True
+    elif opt == '-a':
+        opt_all = True
+    elif opt == '-r':
+        opt_run = True
+    elif opt == '-s':
+        opt_show = True
+    elif opt == '-l':
+        opt_list = True
+    elif opt == '-R':
+        report_dir = arg
+
+if opt_run and opt_show:
+    usage("Use either -s or -r, not both.")
+if args and opt_all:
+    usage("Specify either -a or tests, not both.")
+if report_dir is not None and not os.path.isdir(report_dir):
+    usage("%s is not a directory" % report_dir)
+
+ret = 0
+try:
+    if args:
+        testlist = get_tests(args)
+    elif opt_all:
+        testlist = get_tests(os.listdir(decoders_dir))
+    else:
+        usage("Specify either -a or tests.")
+
+    if opt_run:
+        results, errors = run_tests(testlist)
+        ret = errors
+    elif opt_show:
+        show_tests(testlist)
+    elif opt_list:
+        list_tests(testlist)
+    else:
+        usage()
+except Exception as e:
+    print("Error: %s" % str(e))
+    if DEBUG:
+        raise
+
+sys.exit(ret)
+