]> sigrok.org Git - sigrok-test.git/blob - decoder/pdtest
pdtest: Make exit code communicate the termination cause not counts
[sigrok-test.git] / decoder / pdtest
1 #!/usr/bin/env python3
2 ##
3 ## This file is part of the sigrok-test project.
4 ##
5 ## Copyright (C) 2013 Bert Vermeulen <bert@biot.com>
6 ##
7 ## This program is free software: you can redistribute it and/or modify
8 ## it under the terms of the GNU General Public License as published by
9 ## the Free Software Foundation, either version 3 of the License, or
10 ## (at your option) any later version.
11 ##
12 ## This program is distributed in the hope that it will be useful,
13 ## but WITHOUT ANY WARRANTY; without even the implied warranty of
14 ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 ## GNU General Public License for more details.
16 ##
17 ## You should have received a copy of the GNU General Public License
18 ## along with this program.  If not, see <http://www.gnu.org/licenses/>.
19 ##
20
21 import os
22 import sys
23 import re
24 from getopt import getopt
25 from tempfile import mkstemp
26 from subprocess import Popen, PIPE
27 from difflib import Differ
28 from hashlib import md5
29 from shutil import copy
30
31 DEBUG = 0
32 VERBOSE = False
33
34
35 class E_syntax(Exception):
36     pass
37 class E_badline(Exception):
38     pass
39
40 def INFO(msg, end='\n'):
41     if VERBOSE:
42         print(msg, end=end)
43         sys.stdout.flush()
44
45
46 def DBG(msg):
47     if DEBUG:
48         print(msg)
49
50
51 def ERR(msg):
52     print(msg, file=sys.stderr)
53
54
55 def usage(msg=None):
56     if msg:
57         print(msg.strip() + '\n')
58     print("""Usage: testpd [-dvalsrfcR] [<test1> <test2> ...]
59   -d  Turn on debugging
60   -v  Verbose
61   -a  All tests
62   -l  List test(s)
63   -s  Show test(s)
64   -r  Run test(s)
65   -f  Fix failed test(s)
66   -c  Report decoder code coverage
67   -R <directory>  Save test reports to <directory>
68   <test>  Protocol decoder name ("i2c") and optionally test name ("i2c/rtc")""")
69     sys.exit()
70
71
72 def check_tclist(tc):
73     if 'pdlist' not in tc or not tc['pdlist']:
74         return("No protocol decoders")
75     if 'input' not in tc or not tc['input']:
76         return("No input")
77     if 'output' not in tc or not tc['output']:
78         return("No output")
79     for op in tc['output']:
80         if 'match' not in op:
81             return("No match in output")
82
83     return None
84
85
86 def parse_testfile(path, pd, tc, op_type, op_class):
87     DBG("Opening '%s'" % path)
88     tclist = []
89     for line in open(path).read().split('\n'):
90         try:
91             line = line.strip()
92             if len(line) == 0 or line[0] == "#":
93                 continue
94             f = line.split()
95             if not tclist and f[0] != "test":
96                 # That can't be good.
97                 raise E_badline
98             key = f.pop(0)
99             if key == 'test':
100                 if len(f) != 1:
101                     raise E_syntax
102                 # new testcase
103                 tclist.append({
104                     'pd': pd,
105                     'name': f[0],
106                     'pdlist': [],
107                     'output': [],
108                 })
109             elif key == 'protocol-decoder':
110                 if len(f) < 1:
111                     raise E_syntax
112                 pd_spec = {
113                     'name': f.pop(0),
114                     'channels': [],
115                     'options': [],
116                 }
117                 while len(f):
118                     if len(f) == 1:
119                         # Always needs <key> <value>
120                         raise E_syntax
121                     a, b = f[:2]
122                     f = f[2:]
123                     if '=' not in b:
124                         raise E_syntax
125                     opt, val = b.split('=')
126                     if a == 'channel':
127                         try:
128                             val = int(val)
129                         except:
130                             raise E_syntax
131                         pd_spec['channels'].append([opt, val])
132                     elif a == 'option':
133                         pd_spec['options'].append([opt, val])
134                     else:
135                         raise E_syntax
136                 tclist[-1]['pdlist'].append(pd_spec)
137             elif key == 'stack':
138                 if len(f) < 2:
139                     raise E_syntax
140                 tclist[-1]['stack'] = f
141             elif key == 'input':
142                 if len(f) != 1:
143                     raise E_syntax
144                 tclist[-1]['input'] = f[0]
145             elif key == 'output':
146                 op_spec = {
147                     'pd': f.pop(0),
148                     'type': f.pop(0),
149                 }
150                 while len(f):
151                     if len(f) == 1:
152                         # Always needs <key> <value>
153                         raise E_syntax
154                     a, b = f[:2]
155                     f = f[2:]
156                     if a == 'class':
157                         op_spec['class'] = b
158                     elif a == 'match':
159                         op_spec['match'] = b
160                     else:
161                         raise E_syntax
162                 tclist[-1]['output'].append(op_spec)
163             else:
164                 raise E_badline
165         except E_badline as e:
166             ERR("Invalid syntax in %s: line '%s'" % (path, line))
167             return []
168         except E_syntax as e:
169             ERR("Unable to parse %s: unknown line '%s'" % (path, line))
170             return []
171
172     # If a specific testcase was requested, keep only that one.
173     if tc is not None:
174         target_tc = None
175         for t in tclist:
176             if t['name'] == tc:
177                 target_tc = t
178                 break
179         # ...and a specific output type
180         if op_type is not None:
181             target_oplist = []
182             for op in target_tc['output']:
183                 if op['type'] == op_type:
184                     # ...and a specific output class
185                     if op_class is None or ('class' in op and op['class'] == op_class):
186                         target_oplist.append(op)
187                         DBG("match on [%s]" % str(op))
188             target_tc['output'] = target_oplist
189         if target_tc is None:
190             tclist = []
191         else:
192             tclist = [target_tc]
193     for t in tclist:
194         error = check_tclist(t)
195         if error:
196             ERR("Error in %s: %s" % (path, error))
197             return []
198
199     return tclist
200
201
202 def get_tests(testnames):
203     tests = {}
204     for testspec in testnames:
205         # Optional testspec in the form pd/testcase/type/class
206         tc = op_type = op_class = None
207         ts = testspec.strip("/").split("/")
208         pd = ts.pop(0)
209         tests[pd] = []
210         if ts:
211             tc = ts.pop(0)
212         if ts:
213             op_type = ts.pop(0)
214         if ts:
215             op_class = ts.pop(0)
216         path = os.path.join(tests_dir, pd)
217         if not os.path.isdir(path):
218             # User specified non-existent PD
219             raise Exception("%s not found." % path)
220         path = os.path.join(tests_dir, pd, "test.conf")
221         if not os.path.exists(path):
222             # PD doesn't have any tests yet
223             continue
224         tests[pd].append(parse_testfile(path, pd, tc, op_type, op_class))
225
226     return tests
227
228
229 def diff_text(f1, f2):
230     t1 = open(f1).readlines()
231     t2 = open(f2).readlines()
232     diff = []
233     d = Differ()
234     for line in d.compare(t1, t2):
235         if line[:2] in ('- ', '+ '):
236             diff.append(line.strip())
237
238     return diff
239
240
241 def compare_binary(f1, f2):
242     h1 = md5()
243     h1.update(open(f1, 'rb').read())
244     h2 = md5()
245     h2.update(open(f2, 'rb').read())
246     if h1.digest() == h2.digest():
247         result = None
248     else:
249         result = ["Binary output does not match."]
250
251     return result
252
253
254 # runtc's stdout can have lines like:
255 # coverage: lines=161 missed=2 coverage=99%
256 def parse_stats(text):
257     stats = {}
258     for line in text.strip().split('\n'):
259         fields = line.split()
260         key = fields.pop(0).strip(':')
261         if key not in stats:
262             stats[key] = []
263         stats[key].append({})
264         for f in fields:
265             k, v = f.split('=')
266             stats[key][-1][k] = v
267
268     return stats
269
270
271 # take result set of all tests in a PD, and summarize which lines
272 # were not covered by any of the tests.
273 def coverage_sum(cvglist):
274     lines = 0
275     missed = 0
276     missed_lines = {}
277     for record in cvglist:
278         lines = int(record['lines'])
279         missed += int(record['missed'])
280         if 'missed_lines' not in record:
281             continue
282         for linespec in record['missed_lines'].split(','):
283             if linespec not in missed_lines:
284                 missed_lines[linespec] = 1
285             else:
286                 missed_lines[linespec] += 1
287
288     # keep only those lines that didn't show up in every non-summary record
289     final_missed = []
290     for linespec in missed_lines:
291         if missed_lines[linespec] != len(cvglist):
292             continue
293         final_missed.append(linespec)
294
295     return lines, final_missed
296
297
298 def run_tests(tests, fix=False):
299     errors = 0
300     results = []
301     cmd = [os.path.join(runtc_dir, 'runtc')]
302     if opt_coverage:
303         fd, coverage = mkstemp()
304         os.close(fd)
305         cmd.extend(['-c', coverage])
306     else:
307         coverage = None
308     for pd in sorted(tests.keys()):
309         pd_cvg = []
310         for tclist in tests[pd]:
311             for tc in tclist:
312                 args = cmd[:]
313                 if DEBUG > 1:
314                     args.append('-d')
315                 # Set up PD stack for this test.
316                 for spd in tc['pdlist']:
317                     args.extend(['-P', spd['name']])
318                     for label, channel in spd['channels']:
319                         args.extend(['-p', "%s=%d" % (label, channel)])
320                     for option, value in spd['options']:
321                         args.extend(['-o', "%s=%s" % (option, value)])
322                 args.extend(['-i', os.path.join(dumps_dir, tc['input'])])
323                 for op in tc['output']:
324                     name = "%s/%s/%s" % (pd, tc['name'], op['type'])
325                     opargs = ['-O', "%s:%s" % (op['pd'], op['type'])]
326                     if 'class' in op:
327                         opargs[-1] += ":%s" % op['class']
328                         name += "/%s" % op['class']
329                     if VERBOSE:
330                         dots = '.' * (77 - len(name) - 2)
331                         INFO("%s %s " % (name, dots), end='')
332                     results.append({
333                         'testcase': name,
334                     })
335                     try:
336                         fd, outfile = mkstemp()
337                         os.close(fd)
338                         opargs.extend(['-f', outfile])
339                         DBG("Running %s" % (' '.join(args + opargs)))
340                         p = Popen(args + opargs, stdout=PIPE, stderr=PIPE)
341                         stdout, stderr = p.communicate()
342                         if stdout:
343                             # statistics and coverage data on stdout
344                             results[-1].update(parse_stats(stdout.decode('utf-8')))
345                         if stderr:
346                             results[-1]['error'] = stderr.decode('utf-8').strip()
347                             errors += 1
348                         elif p.returncode != 0:
349                             # runtc indicated an error, but didn't output a
350                             # message on stderr about it
351                             results[-1]['error'] = "Unknown error: runtc %d" % p.returncode
352                         if 'error' not in results[-1]:
353                             matchfile = os.path.join(tests_dir, op['pd'], op['match'])
354                             DBG("Comparing with %s" % matchfile)
355                             try:
356                                 diff = diff_error = None
357                                 if op['type'] in ('annotation', 'python'):
358                                     diff = diff_text(matchfile, outfile)
359                                 elif op['type'] == 'binary':
360                                     diff = compare_binary(matchfile, outfile)
361                                 else:
362                                     diff = ["Unsupported output type '%s'." % op['type']]
363                             except Exception as e:
364                                 diff_error = e
365                             if fix:
366                                 if diff or diff_error:
367                                     copy(outfile, matchfile)
368                                     DBG("Wrote %s" % matchfile)
369                             else:
370                                 if diff:
371                                     results[-1]['diff'] = diff
372                                 elif diff_error is not None:
373                                     raise diff_error
374                     except Exception as e:
375                         results[-1]['error'] = str(e)
376                     finally:
377                         if coverage:
378                             results[-1]['coverage_report'] = coverage
379                         os.unlink(outfile)
380                     if op['type'] == 'exception' and 'error' in results[-1]:
381                         # filter out the exception we were looking for
382                         reg = "^Error: srd: %s:" % op['match']
383                         if re.match(reg, results[-1]['error']):
384                             # found it, not an error
385                             results[-1].pop('error')
386                             errors -= 1
387                     if VERBOSE:
388                         if 'diff' in results[-1]:
389                             INFO("Output mismatch")
390                         elif 'error' in results[-1]:
391                             error = results[-1]['error']
392                             if len(error) > 20:
393                                 error = error[:17] + '...'
394                             INFO(error)
395                         elif 'coverage' in results[-1]:
396                             # report coverage of this PD
397                             for record in results[-1]['coverage']:
398                                 # but not others used in the stack
399                                 # as part of the test.
400                                 if record['scope'] == pd:
401                                     INFO(record['coverage'])
402                                     break
403                         else:
404                             INFO("OK")
405                     gen_report(results[-1])
406                     if coverage:
407                         os.unlink(coverage)
408                         # only keep track of coverage records for this PD,
409                         # not others in the stack just used for testing.
410                         for cvg in results[-1]['coverage']:
411                             if cvg['scope'] == pd:
412                                 pd_cvg.append(cvg)
413         if opt_coverage and len(pd_cvg) > 1:
414             # report total coverage of this PD, across all the tests
415             # that were done on it.
416             total_lines, missed_lines = coverage_sum(pd_cvg)
417             pd_coverage = 100 - (float(len(missed_lines)) / total_lines * 100)
418             if VERBOSE:
419                 dots = '.' * (54 - len(pd) - 2)
420                 INFO("%s total %s %d%%" % (pd, dots, pd_coverage))
421             if report_dir:
422                 # generate a missing lines list across all the files in
423                 # the PD
424                 files = {}
425                 for entry in missed_lines:
426                     filename, line = entry.split(':')
427                     if filename not in files:
428                         files[filename] = []
429                     files[filename].append(line)
430                 text = ''
431                 for filename in sorted(files.keys()):
432                     line_list = ','.join(sorted(files[filename], key=int))
433                     text += "%s: %s\n" % (filename, line_list)
434                 open(os.path.join(report_dir, pd + "_total"), 'w').write(text)
435
436
437     return results, errors
438
439 def get_run_tests_error_diff_counts(results):
440     """Get error and diff counters from run_tests() results."""
441     errs = 0
442     diffs = 0
443     for result in results:
444         if 'error' in result:
445             errs += 1
446         if 'diff' in result:
447             diffs += 1
448     return errs, diffs
449
450
451 def gen_report(result):
452     out = []
453     if 'error' in result:
454         out.append("Error:")
455         out.append(result['error'])
456         out.append('')
457     if 'diff' in result:
458         out.append("Test output mismatch:")
459         out.extend(result['diff'])
460         out.append('')
461     if 'coverage_report' in result:
462         out.append(open(result['coverage_report'], 'r').read())
463         out.append('')
464
465     if out:
466         text = "Testcase: %s\n" % result['testcase']
467         text += '\n'.join(out)
468     else:
469         return
470
471     if report_dir:
472         filename = result['testcase'].replace('/', '_')
473         open(os.path.join(report_dir, filename), 'w').write(text)
474     else:
475         print(text)
476
477
478 def show_tests(tests):
479     for pd in sorted(tests.keys()):
480         for tclist in tests[pd]:
481             for tc in tclist:
482                 print("Testcase: %s/%s" % (tc['pd'], tc['name']))
483                 for pd in tc['pdlist']:
484                     print("  Protocol decoder: %s" % pd['name'])
485                     for label, channel in pd['channels']:
486                         print("    Channel %s=%d" % (label, channel))
487                     for option, value in pd['options']:
488                         print("    Option %s=%d" % (option, value))
489                 if 'stack' in tc:
490                     print("  Stack: %s" % ' '.join(tc['stack']))
491                 print("  Input: %s" % tc['input'])
492                 for op in tc['output']:
493                     print("  Output:\n    Protocol decoder: %s" % op['pd'])
494                     print("    Type: %s" % op['type'])
495                     if 'class' in op:
496                         print("    Class: %s" % op['class'])
497                     print("    Match: %s" % op['match'])
498             print()
499
500
501 def list_tests(tests):
502     for pd in sorted(tests.keys()):
503         for tclist in tests[pd]:
504             for tc in tclist:
505                 for op in tc['output']:
506                     line = "%s/%s/%s" % (tc['pd'], tc['name'], op['type'])
507                     if 'class' in op:
508                         line += "/%s" % op['class']
509                     print(line)
510
511
512 #
513 # main
514 #
515
516 # project root
517 runtc_dir = os.path.abspath(os.path.dirname(sys.argv[0]))
518 base_dir = os.path.abspath(os.path.join(os.curdir, runtc_dir, os.path.pardir))
519 dumps_dir = os.path.abspath(os.path.join(base_dir, os.path.pardir, 'sigrok-dumps'))
520 tests_dir = os.path.abspath(os.path.join(runtc_dir, 'test'))
521
522 if len(sys.argv) == 1:
523     usage()
524
525 opt_all = opt_run = opt_show = opt_list = opt_fix = opt_coverage = False
526 report_dir = None
527 try:
528     opts, args = getopt(sys.argv[1:], "dvarslfcR:S:")
529 except Exception as e:
530     usage('error while parsing command line arguments: {}'.format(e))
531 for opt, arg in opts:
532     if opt == '-d':
533         DEBUG += 1
534     if opt == '-v':
535         VERBOSE = True
536     elif opt == '-a':
537         opt_all = True
538     elif opt == '-r':
539         opt_run = True
540     elif opt == '-s':
541         opt_show = True
542     elif opt == '-l':
543         opt_list = True
544     elif opt == '-f':
545         opt_fix = True
546     elif opt == '-c':
547         opt_coverage = True
548     elif opt == '-R':
549         report_dir = arg
550     elif opt == '-S':
551         dumps_dir = arg
552
553 if opt_run and opt_show:
554     usage("Use either -s or -r, not both.")
555 if args and opt_all:
556     usage("Specify either -a or tests, not both.")
557 if report_dir is not None and not os.path.isdir(report_dir):
558     usage("%s is not a directory" % report_dir)
559
560 ret = 0
561 try:
562     if args:
563         testlist = get_tests(args)
564     elif opt_all:
565         testlist = get_tests(os.listdir(tests_dir))
566     else:
567         usage("Specify either -a or tests.")
568
569     if opt_run:
570         if not os.path.isdir(dumps_dir):
571             ERR("Could not find sigrok-dumps repository at %s" % dumps_dir)
572             sys.exit(1)
573         results, errors = run_tests(testlist, fix=opt_fix)
574         ret = 0
575         errs, diffs = get_run_tests_error_diff_counts(results)
576         if errs:
577             ret = 1
578         elif diffs:
579             ret = 2
580     elif opt_show:
581         show_tests(testlist)
582     elif opt_list:
583         list_tests(testlist)
584     elif opt_fix:
585         run_tests(testlist, fix=True)
586     else:
587         usage()
588 except Exception as e:
589     print("Error: %s" % str(e))
590     if DEBUG:
591         raise
592
593 sys.exit(ret)
594