#!/usr/bin/env python3
##
## This file is part of the sigrok-test project.
##
## Copyright (C) 2013 Bert Vermeulen <bert@biot.com>
##
## This program is free software: you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation, either version 3 of the License, or
## (at your option) any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with this program.  If not, see <http://www.gnu.org/licenses/>.
##
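##
## Testcases are defined in test/<pd>/test.conf files. The sketch below
## illustrates the grammar accepted by parse_testfile(); the decoder,
## channel, file, and output names are examples only. protocol-decoder
## lines may also carry "option <key>=<value>" and
## "initial_pin <pin>=<value>" pairs, and input lines may carry
## "option <opt>" entries, all of which are handed through to runtc.
##
##   test rtc_dump
##   protocol-decoder i2c channel scl=0 channel sda=1
##   protocol-decoder ds1307
##   stack i2c ds1307
##   input i2c/rtc_dump.vcd format vcd
##   output ds1307 annotation match rtc_dump.output
##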

import os
import sys
import re
from getopt import getopt
from tempfile import mkstemp
from subprocess import Popen, PIPE
from difflib import unified_diff
from hashlib import md5
from shutil import copy

DEBUG = 0
VERBOSE = False


class E_syntax(Exception):
    pass
class E_badline(Exception):
    pass

def INFO(msg, end='\n'):
    if VERBOSE:
        print(msg, end=end)
        sys.stdout.flush()


def DBG(msg):
    if DEBUG:
        print(msg)


def ERR(msg):
    print(msg, file=sys.stderr)


def usage(msg=None):
    if msg:
        print(msg.strip() + '\n')
    print("""Usage: pdtest [-dvalsrfc] [-R <directory>] [-S <directory>] [<test1> <test2> ...]
  -d  Turn on debugging
  -v  Verbose
  -a  All tests
  -l  List test(s)
  -s  Show test(s)
  -r  Run test(s)
  -f  Fix failed test(s) / create initial output for new test(s)
  -c  Report decoder code coverage
  -R <directory>  Save test reports to <directory>
  -S <directory>  Use <directory> as the sigrok-dumps location
  <test>  Protocol decoder name ("i2c") and optionally test name ("i2c/rtc")""")
    sys.exit()


def check_tclist(tc):
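    """Return an error string if a testcase dict is incomplete, else None."""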
    if 'pdlist' not in tc or not tc['pdlist']:
        return "No protocol decoders"
    if 'input' not in tc or not tc['input']:
        return "No input"
    if 'output' not in tc or not tc['output']:
        return "No output"
    for op in tc['output']:
        if 'match' not in op:
            return "No match in output"

    return None


def parse_testfile(path, pd, tc, op_type, op_class):
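    """Parse a test.conf file into a list of testcase dicts.

    Each dict carries the keys 'pd', 'name', 'pdlist', 'input' and
    'output' (plus 'stack' for stacked decoders); see check_tclist()
    for the mandatory ones.
    """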
    DBG("Opening '%s'" % path)
    tclist = []
    for line in open(path).read().split('\n'):
        try:
            line = line.strip()
            if len(line) == 0 or line[0] == "#":
                continue
            f = line.split()
            if not tclist and f[0] != "test":
                # That can't be good.
                raise E_badline
            key = f.pop(0)
            if key == 'test':
                if len(f) != 1:
                    raise E_syntax
                # new testcase
                tclist.append({
                    'pd': pd,
                    'name': f[0],
                    'pdlist': [],
                    'output': [],
                })
            elif key == 'protocol-decoder':
                if len(f) < 1:
                    raise E_syntax
                pd_spec = {
                    'name': f.pop(0),
                    'channels': [],
                    'options': [],
                    'initial_pins': [],
                }
                while len(f):
                    if len(f) == 1:
                        # Always needs <key> <value>
                        raise E_syntax
                    a, b = f[:2]
                    f = f[2:]
                    if '=' not in b:
                        raise E_syntax
                    opt, val = b.split('=')
                    if a == 'channel':
                        try:
                            val = int(val)
                        except ValueError:
                            raise E_syntax
                        pd_spec['channels'].append([opt, val])
                    elif a == 'option':
                        pd_spec['options'].append([opt, val])
                    elif a == 'initial_pin':
                        try:
                            val = int(val)
                        except ValueError:
                            raise E_syntax
                        pd_spec['initial_pins'].append([opt, val])
                    else:
                        raise E_syntax
                tclist[-1]['pdlist'].append(pd_spec)
            elif key == 'stack':
                if len(f) < 2:
                    raise E_syntax
                tclist[-1]['stack'] = f
            elif key == 'input':
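                # Syntax: input <file> [format <fmt>] [option <opt>] ...
                # The format and any options are passed to runtc as -I
                # arguments when the test runs.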
                if len(f) < 1:
                    raise E_syntax
                input_spec = {
                    'name': f.pop(0),
                    'format': None,
                    'options': [],
                }
                while len(f):
                    if len(f) < 2:
                        # Always needs <key> <value>
                        raise E_syntax
                    a, b = f[:2]
                    f = f[2:]
                    if a == 'format':
                        input_spec['format'] = b
                    elif a == 'option':
                        input_spec['options'].append(b)
                    else:
                        raise E_syntax
                tclist[-1]['input'] = input_spec
            elif key == 'output':
                op_spec = {
                    'pd': f.pop(0),
                    'type': f.pop(0),
                }
                while len(f):
                    if len(f) == 1:
                        # Always needs <key> <value>
                        raise E_syntax
                    a, b = f[:2]
                    f = f[2:]
                    if a == 'class':
                        op_spec['class'] = b
                    elif a == 'match':
                        op_spec['match'] = b
                    else:
                        raise E_syntax
                tclist[-1]['output'].append(op_spec)
            else:
                raise E_badline
        except E_badline:
            ERR("Invalid syntax in %s: line '%s'" % (path, line))
            return []
        except E_syntax:
            ERR("Unable to parse %s: unknown line '%s'" % (path, line))
            return []

    # If a specific testcase was requested, keep only that one.
    if tc is not None:
        target_tc = None
        for t in tclist:
            if t['name'] == tc:
                target_tc = t
                break
        # ...and a specific output type
        if target_tc is not None and op_type is not None:
            target_oplist = []
            for op in target_tc['output']:
                if op['type'] == op_type:
                    # ...and a specific output class
                    if op_class is None or ('class' in op and op['class'] == op_class):
                        target_oplist.append(op)
                        DBG("match on [%s]" % str(op))
            target_tc['output'] = target_oplist
        if target_tc is None:
            tclist = []
        else:
            tclist = [target_tc]
    for t in tclist:
        error = check_tclist(t)
        if error:
            ERR("Error in %s: %s" % (path, error))
            return []

    return tclist


def get_tests(testnames):
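    """Map testspecs of the form pd[/testcase[/type[/class]]] to lists
    of parsed testcases, keyed by protocol decoder name."""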
    tests = {}
    for testspec in testnames:
        # Optional testspec in the form pd/testcase/type/class
        tc = op_type = op_class = None
        ts = testspec.strip("/").split("/")
        pd = ts.pop(0)
        tests[pd] = []
        if ts:
            tc = ts.pop(0)
        if ts:
            op_type = ts.pop(0)
        if ts:
            op_class = ts.pop(0)
        path = os.path.join(tests_dir, pd)
        if not os.path.isdir(path):
            # User specified non-existent PD
            raise Exception("%s not found." % path)
        path = os.path.join(tests_dir, pd, "test.conf")
        if not os.path.exists(path):
            # PD doesn't have any tests yet
            continue
        tests[pd].append(parse_testfile(path, pd, tc, op_type, op_class))

    return tests


def diff_text(f1, f2):
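    """Return the added/removed lines of a unified diff of two text files."""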
    t1 = open(f1).readlines()
    t2 = open(f2).readlines()
    diff = list(unified_diff(t1, t2))
    diff = diff[2:] # Strip the two "---"/"+++" from/to filename header lines.
    diff = [d.strip() for d in diff if d[0] in ('+', '-')]
    return diff


def compare_binary(f1, f2):
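    """Return None if the two files match (by MD5), else an error message list."""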
    h1 = md5()
    h1.update(open(f1, 'rb').read())
    h2 = md5()
    h2.update(open(f2, 'rb').read())
    if h1.digest() == h2.digest():
        result = None
    else:
        result = ["Binary output does not match."]

    return result


# runtc's stdout can have lines like:
# coverage: lines=161 missed=2 coverage=99%
def parse_stats(text):
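    """Parse runtc statistics output into a dict of key/value records.

    The example line above yields:
    {'coverage': [{'lines': '161', 'missed': '2', 'coverage': '99%'}]}
    """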
    stats = {}
    for line in text.strip().split('\n'):
        fields = line.split()
        key = fields.pop(0).strip(':')
        if key not in stats:
            stats[key] = []
        stats[key].append({})
        for f in fields:
            k, v = f.split('=')
            stats[key][-1][k] = v

    return stats


# take result set of all tests in a PD, and summarize which lines
# were not covered by any of the tests.
def coverage_sum(cvglist):
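    """Return the PD's source line count (as reported by runtc) and the
    list of "file:line" specs that were missed by every record."""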
    lines = 0
    missed = 0
    missed_lines = {}
    for record in cvglist:
        lines = int(record['lines'])
        missed += int(record['missed'])
        if 'missed_lines' not in record:
            continue
        for linespec in record['missed_lines'].split(','):
            if linespec not in missed_lines:
                missed_lines[linespec] = 1
            else:
                missed_lines[linespec] += 1
    # Keep only the lines that every record reported as missed, i.e.
    # lines not covered by any of the tests.
    final_missed = []
    for linespec in missed_lines:
        if missed_lines[linespec] != len(cvglist):
            continue
        final_missed.append(linespec)

    return lines, final_missed


def run_tests(tests, fix=False):
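    """Run all testcases by invoking runtc once per output spec.

    With fix=True, write runtc's output to the match files instead of
    comparing against them. Returns (results, errors): one result dict
    per output spec, and a count of runs that reported errors.
    """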
    errors = 0
    results = []
    cmd = [os.path.join(runtc_dir, 'runtc')]
    if opt_coverage:
        fd, coverage = mkstemp()
        os.close(fd)
        cmd.extend(['-c', coverage])
    else:
        coverage = None
    for pd in sorted(tests.keys()):
        pd_cvg = []
        for tclist in tests[pd]:
            for tc in tclist:
                args = cmd[:]
                if DEBUG > 1:
                    args.append('-d')
                # Set up PD stack for this test.
                for spd in tc['pdlist']:
                    args.extend(['-P', spd['name']])
                    for label, channel in spd['channels']:
                        args.extend(['-p', "%s=%d" % (label, channel)])
                    for option, value in spd['options']:
                        args.extend(['-o', "%s=%s" % (option, value)])
                    for label, initial_pin in spd['initial_pins']:
                        args.extend(['-N', "%s=%d" % (label, initial_pin)])
                # Set up input spec for this test (optional format spec).
                in_spec = tc['input']
                infile = os.path.join(dumps_dir, in_spec['name'])
                args.extend(['-i', infile])
                if in_spec['format']:
                    args.extend(['-I', in_spec['format']])
                    for opt in in_spec['options']:
                        args.extend(['-I', opt])
                # Set up output spec for this test.
                for op in tc['output']:
                    name = "%s/%s/%s" % (pd, tc['name'], op['type'])
                    opargs = ['-O', "%s:%s" % (op['pd'], op['type'])]
                    if 'class' in op:
                        opargs[-1] += ":%s" % op['class']
                        name += "/%s" % op['class']
                    if VERBOSE:
                        dots = '.' * (77 - len(name) - 2)
                        INFO("%s %s " % (name, dots), end='')
                    results.append({
                        'testcase': name,
                    })
                    try:
                        fd, outfile = mkstemp()
                        os.close(fd)
                        opargs.extend(['-f', outfile])
                        DBG("Running %s" % (' '.join(args + opargs)))
                        p = Popen(args + opargs, stdout=PIPE, stderr=PIPE)
                        stdout, stderr = p.communicate()
                        if stdout:
                            # statistics and coverage data on stdout
                            results[-1].update(parse_stats(stdout.decode('utf-8')))
                        if stderr:
                            results[-1]['error'] = stderr.decode('utf-8').strip()
                            errors += 1
                        elif p.returncode != 0:
                            # runtc indicated an error, but didn't output a
                            # message on stderr about it
                            results[-1]['error'] = "Unknown error: runtc %d" % p.returncode
                        if 'error' not in results[-1]:
                            matchfile = os.path.join(tests_dir, op['pd'], op['match'])
                            DBG("Comparing with %s" % matchfile)
                            try:
                                diff = diff_error = None
                                if op['type'] in ('annotation', 'python'):
                                    diff = diff_text(matchfile, outfile)
                                elif op['type'] == 'binary':
                                    diff = compare_binary(matchfile, outfile)
                                else:
                                    diff = ["Unsupported output type '%s'." % op['type']]
                            except Exception as e:
                                diff_error = e
                            if fix:
                                if diff or diff_error:
                                    copy(outfile, matchfile)
                                    DBG("Wrote %s" % matchfile)
                            else:
                                if diff:
                                    results[-1]['diff'] = diff
                                elif diff_error is not None:
                                    raise diff_error
                    except Exception as e:
                        results[-1]['error'] = str(e)
                    finally:
                        if coverage:
                            results[-1]['coverage_report'] = coverage
                        os.unlink(outfile)
                    if op['type'] == 'exception' and 'error' in results[-1]:
                        # filter out the exception we were looking for
                        reg = "^Error: srd: %s:" % op['match']
                        if re.match(reg, results[-1]['error']):
                            # found it, not an error
                            results[-1].pop('error')
                            errors -= 1
                    if VERBOSE:
                        if 'diff' in results[-1]:
                            INFO("Output mismatch")
                        elif 'error' in results[-1]:
                            error = results[-1]['error']
                            if len(error) > 20:
                                error = error[:17] + '...'
                            INFO(error)
                        elif 'coverage' in results[-1]:
                            # report coverage of this PD
                            for record in results[-1]['coverage']:
                                # but not others used in the stack
                                # as part of the test.
                                if record['scope'] == pd:
                                    INFO(record['coverage'])
                                    break
                        else:
                            INFO("OK")
                    gen_report(results[-1])
                    if coverage:
                        os.unlink(coverage)
                        # only keep track of coverage records for this PD,
                        # not others in the stack just used for testing.
                        for cvg in results[-1].get('coverage', []):
                            if cvg['scope'] == pd:
                                pd_cvg.append(cvg)
        if opt_coverage and len(pd_cvg) > 1:
            # report total coverage of this PD, across all the tests
            # that were done on it.
            total_lines, missed_lines = coverage_sum(pd_cvg)
            pd_coverage = 100 - (float(len(missed_lines)) / total_lines * 100)
            if VERBOSE:
                dots = '.' * (54 - len(pd) - 2)
                INFO("%s total %s %d%%" % (pd, dots, pd_coverage))
            if report_dir:
                # generate a missing lines list across all the files in
                # the PD
                files = {}
                for entry in missed_lines:
                    filename, line = entry.split(':')
                    if filename not in files:
                        files[filename] = []
                    files[filename].append(line)
                text = ''
                for filename in sorted(files.keys()):
                    line_list = ','.join(sorted(files[filename], key=int))
                    text += "%s: %s\n" % (filename, line_list)
                open(os.path.join(report_dir, pd + "_total"), 'w').write(text)

    return results, errors

def get_run_tests_error_diff_counts(results):
    """Get error and diff counters from run_tests() results."""
    errs = 0
    diffs = 0
    for result in results:
        if 'error' in result:
            errs += 1
        if 'diff' in result:
            diffs += 1
    return errs, diffs


def gen_report(result):
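    """Write a plain-text report (error, diff, coverage) for one result,
    to a file in report_dir if set, otherwise to stdout."""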
    out = []
    if 'error' in result:
        out.append("Error:")
        out.append(result['error'])
        out.append('')
    if 'diff' in result:
        out.append("Test output mismatch:")
        out.extend(result['diff'])
        out.append('')
    if 'coverage_report' in result:
        out.append(open(result['coverage_report'], 'r').read())
        out.append('')

    if out:
        text = "Testcase: %s\n" % result['testcase']
        text += '\n'.join(out)
    else:
        return

    if report_dir:
        filename = result['testcase'].replace('/', '_')
        open(os.path.join(report_dir, filename), 'w').write(text)
    else:
        print(text)


def show_tests(tests):
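    """Print a human-readable summary of each testcase."""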
    for pd in sorted(tests.keys()):
        for tclist in tests[pd]:
            for tc in tclist:
                print("Testcase: %s/%s" % (tc['pd'], tc['name']))
                for spd in tc['pdlist']:
                    print("  Protocol decoder: %s" % spd['name'])
                    for label, channel in spd['channels']:
                        print("    Channel %s=%d" % (label, channel))
                    for option, value in spd['options']:
                        print("    Option %s=%s" % (option, value))
                    for label, initial_pin in spd['initial_pins']:
                        print("    Initial pin %s=%d" % (label, initial_pin))
                if 'stack' in tc:
                    print("  Stack: %s" % ' '.join(tc['stack']))
                in_spec = tc['input']
                print("  Input: %s" % in_spec['name'])
                if in_spec['format']:
                    print("    Format: %s" % in_spec['format'])
                for opt in in_spec['options']:
                    print("    Option: %s" % opt)
                for op in tc['output']:
                    print("  Output:\n    Protocol decoder: %s" % op['pd'])
                    print("    Type: %s" % op['type'])
                    if 'class' in op:
                        print("    Class: %s" % op['class'])
                    print("    Match: %s" % op['match'])
            print()


def list_tests(tests):
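    """Print one pd/testcase/type[/class] line per output spec."""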
    for pd in sorted(tests.keys()):
        for tclist in tests[pd]:
            for tc in tclist:
                for op in tc['output']:
                    line = "%s/%s/%s" % (tc['pd'], tc['name'], op['type'])
                    if 'class' in op:
                        line += "/%s" % op['class']
                    print(line)


#
# main
#
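# Typical invocations (illustrative):
#   pdtest -l               list all testcases
#   pdtest -r -a            run all tests
#   pdtest -r i2c/rtc       run a single testcase
#   pdtest -f i2c/rtc       (re)create its expected output file
#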

# project root
runtc_dir = os.path.abspath(os.path.dirname(sys.argv[0]))
base_dir = os.path.abspath(os.path.join(runtc_dir, os.path.pardir))
dumps_dir = os.path.abspath(os.path.join(base_dir, os.path.pardir, 'sigrok-dumps'))
tests_dir = os.path.abspath(os.path.join(runtc_dir, 'test'))

if len(sys.argv) == 1:
    usage()

opt_all = opt_run = opt_show = opt_list = opt_fix = opt_coverage = False
report_dir = None
try:
    opts, args = getopt(sys.argv[1:], "dvarslfcR:S:")
except Exception as e:
    usage('error while parsing command line arguments: {}'.format(e))
for opt, arg in opts:
    if opt == '-d':
        DEBUG += 1
    elif opt == '-v':
        VERBOSE = True
    elif opt == '-a':
        opt_all = True
    elif opt == '-r':
        opt_run = True
    elif opt == '-s':
        opt_show = True
    elif opt == '-l':
        opt_list = True
    elif opt == '-f':
        opt_fix = True
    elif opt == '-c':
        opt_coverage = True
    elif opt == '-R':
        report_dir = arg
    elif opt == '-S':
        dumps_dir = arg

if opt_run and opt_show:
    usage("Use either -s or -r, not both.")
if args and opt_all:
    usage("Specify either -a or tests, not both.")
if report_dir is not None and not os.path.isdir(report_dir):
    usage("%s is not a directory" % report_dir)

ret = 0
try:
    if args:
        testlist = get_tests(args)
    elif opt_all or opt_list:
        testlist = get_tests(os.listdir(tests_dir))
    else:
        usage("Specify either -a or tests.")

    if opt_run:
        if not os.path.isdir(dumps_dir):
            ERR("Could not find sigrok-dumps repository at %s" % dumps_dir)
            sys.exit(1)
        results, errors = run_tests(testlist, fix=opt_fix)
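        # Exit code: 1 if any test reported an error, 2 if only output
        # mismatches were seen.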
        ret = 0
        errs, diffs = get_run_tests_error_diff_counts(results)
        if errs:
            ret = 1
        elif diffs:
            ret = 2
    elif opt_show:
        show_tests(testlist)
    elif opt_list:
        list_tests(testlist)
    elif opt_fix:
        run_tests(testlist, fix=True)
    else:
        usage()
except Exception as e:
    print("Error: %s" % str(e))
    if DEBUG:
        raise

sys.exit(ret)