]> sigrok.org Git - sigrok-test.git/blob - decoder/pdtest
parallel: adjust hex number format of expected PD output
[sigrok-test.git] / decoder / pdtest
1 #!/usr/bin/env python3
2 ##
3 ## This file is part of the sigrok-test project.
4 ##
5 ## Copyright (C) 2013 Bert Vermeulen <bert@biot.com>
6 ##
7 ## This program is free software: you can redistribute it and/or modify
8 ## it under the terms of the GNU General Public License as published by
9 ## the Free Software Foundation, either version 3 of the License, or
10 ## (at your option) any later version.
11 ##
12 ## This program is distributed in the hope that it will be useful,
13 ## but WITHOUT ANY WARRANTY; without even the implied warranty of
14 ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 ## GNU General Public License for more details.
16 ##
17 ## You should have received a copy of the GNU General Public License
18 ## along with this program.  If not, see <http://www.gnu.org/licenses/>.
19 ##
20
21 import os
22 import sys
23 import re
24 from getopt import getopt
25 from tempfile import mkstemp
26 from subprocess import Popen, PIPE
27 from difflib import Differ
28 from hashlib import md5
29 from shutil import copy
30
31 DEBUG = 0
32 VERBOSE = False
33
34
35 class E_syntax(Exception):
36     pass
37 class E_badline(Exception):
38     pass
39
40 def INFO(msg, end='\n'):
41     if VERBOSE:
42         print(msg, end=end)
43         sys.stdout.flush()
44
45
46 def DBG(msg):
47     if DEBUG:
48         print(msg)
49
50
51 def ERR(msg):
52     print(msg, file=sys.stderr)
53
54
55 def usage(msg=None):
56     if msg:
57         print(msg.strip() + '\n')
58     print("""Usage: testpd [-dvalsrfcR] [<test1> <test2> ...]
59   -d  Turn on debugging
60   -v  Verbose
61   -a  All tests
62   -l  List test(s)
63   -s  Show test(s)
64   -r  Run test(s)
65   -f  Fix failed test(s) / create initial output for new test(s)
66   -c  Report decoder code coverage
67   -R <directory>  Save test reports to <directory>
68   <test>  Protocol decoder name ("i2c") and optionally test name ("i2c/rtc")""")
69     sys.exit()
70
71
72 def check_tclist(tc):
73     if 'pdlist' not in tc or not tc['pdlist']:
74         return("No protocol decoders")
75     if 'input' not in tc or not tc['input']:
76         return("No input")
77     if 'output' not in tc or not tc['output']:
78         return("No output")
79     for op in tc['output']:
80         if 'match' not in op:
81             return("No match in output")
82
83     return None
84
85
86 def parse_testfile(path, pd, tc, op_type, op_class):
87     DBG("Opening '%s'" % path)
88     tclist = []
89     for line in open(path).read().split('\n'):
90         try:
91             line = line.strip()
92             if len(line) == 0 or line[0] == "#":
93                 continue
94             f = line.split()
95             if not tclist and f[0] != "test":
96                 # That can't be good.
97                 raise E_badline
98             key = f.pop(0)
99             if key == 'test':
100                 if len(f) != 1:
101                     raise E_syntax
102                 # new testcase
103                 tclist.append({
104                     'pd': pd,
105                     'name': f[0],
106                     'pdlist': [],
107                     'output': [],
108                 })
109             elif key == 'protocol-decoder':
110                 if len(f) < 1:
111                     raise E_syntax
112                 pd_spec = {
113                     'name': f.pop(0),
114                     'channels': [],
115                     'options': [],
116                     'initial_pins': [],
117                 }
118                 while len(f):
119                     if len(f) == 1:
120                         # Always needs <key> <value>
121                         raise E_syntax
122                     a, b = f[:2]
123                     f = f[2:]
124                     if '=' not in b:
125                         raise E_syntax
126                     opt, val = b.split('=')
127                     if a == 'channel':
128                         try:
129                             val = int(val)
130                         except:
131                             raise E_syntax
132                         pd_spec['channels'].append([opt, val])
133                     elif a == 'option':
134                         pd_spec['options'].append([opt, val])
135                     elif a == 'initial_pin':
136                         try:
137                             val = int(val)
138                         except:
139                             raise E_syntax
140                         pd_spec['initial_pins'].append([opt, val])
141                     else:
142                         raise E_syntax
143                 tclist[-1]['pdlist'].append(pd_spec)
144             elif key == 'stack':
145                 if len(f) < 2:
146                     raise E_syntax
147                 tclist[-1]['stack'] = f
148             elif key == 'input':
149                 if len(f) != 1:
150                     raise E_syntax
151                 tclist[-1]['input'] = f[0]
152             elif key == 'output':
153                 op_spec = {
154                     'pd': f.pop(0),
155                     'type': f.pop(0),
156                 }
157                 while len(f):
158                     if len(f) == 1:
159                         # Always needs <key> <value>
160                         raise E_syntax
161                     a, b = f[:2]
162                     f = f[2:]
163                     if a == 'class':
164                         op_spec['class'] = b
165                     elif a == 'match':
166                         op_spec['match'] = b
167                     else:
168                         raise E_syntax
169                 tclist[-1]['output'].append(op_spec)
170             else:
171                 raise E_badline
172         except E_badline as e:
173             ERR("Invalid syntax in %s: line '%s'" % (path, line))
174             return []
175         except E_syntax as e:
176             ERR("Unable to parse %s: unknown line '%s'" % (path, line))
177             return []
178
179     # If a specific testcase was requested, keep only that one.
180     if tc is not None:
181         target_tc = None
182         for t in tclist:
183             if t['name'] == tc:
184                 target_tc = t
185                 break
186         # ...and a specific output type
187         if op_type is not None:
188             target_oplist = []
189             for op in target_tc['output']:
190                 if op['type'] == op_type:
191                     # ...and a specific output class
192                     if op_class is None or ('class' in op and op['class'] == op_class):
193                         target_oplist.append(op)
194                         DBG("match on [%s]" % str(op))
195             target_tc['output'] = target_oplist
196         if target_tc is None:
197             tclist = []
198         else:
199             tclist = [target_tc]
200     for t in tclist:
201         error = check_tclist(t)
202         if error:
203             ERR("Error in %s: %s" % (path, error))
204             return []
205
206     return tclist
207
208
209 def get_tests(testnames):
210     tests = {}
211     for testspec in testnames:
212         # Optional testspec in the form pd/testcase/type/class
213         tc = op_type = op_class = None
214         ts = testspec.strip("/").split("/")
215         pd = ts.pop(0)
216         tests[pd] = []
217         if ts:
218             tc = ts.pop(0)
219         if ts:
220             op_type = ts.pop(0)
221         if ts:
222             op_class = ts.pop(0)
223         path = os.path.join(tests_dir, pd)
224         if not os.path.isdir(path):
225             # User specified non-existent PD
226             raise Exception("%s not found." % path)
227         path = os.path.join(tests_dir, pd, "test.conf")
228         if not os.path.exists(path):
229             # PD doesn't have any tests yet
230             continue
231         tests[pd].append(parse_testfile(path, pd, tc, op_type, op_class))
232
233     return tests
234
235
236 def diff_text(f1, f2):
237     t1 = open(f1).readlines()
238     t2 = open(f2).readlines()
239     diff = []
240     d = Differ()
241     for line in d.compare(t1, t2):
242         if line[:2] in ('- ', '+ '):
243             diff.append(line.strip())
244
245     return diff
246
247
248 def compare_binary(f1, f2):
249     h1 = md5()
250     h1.update(open(f1, 'rb').read())
251     h2 = md5()
252     h2.update(open(f2, 'rb').read())
253     if h1.digest() == h2.digest():
254         result = None
255     else:
256         result = ["Binary output does not match."]
257
258     return result
259
260
261 # runtc's stdout can have lines like:
262 # coverage: lines=161 missed=2 coverage=99%
263 def parse_stats(text):
264     stats = {}
265     for line in text.strip().split('\n'):
266         fields = line.split()
267         key = fields.pop(0).strip(':')
268         if key not in stats:
269             stats[key] = []
270         stats[key].append({})
271         for f in fields:
272             k, v = f.split('=')
273             stats[key][-1][k] = v
274
275     return stats
276
277
278 # take result set of all tests in a PD, and summarize which lines
279 # were not covered by any of the tests.
280 def coverage_sum(cvglist):
281     lines = 0
282     missed = 0
283     missed_lines = {}
284     for record in cvglist:
285         lines = int(record['lines'])
286         missed += int(record['missed'])
287         if 'missed_lines' not in record:
288             continue
289         for linespec in record['missed_lines'].split(','):
290             if linespec not in missed_lines:
291                 missed_lines[linespec] = 1
292             else:
293                 missed_lines[linespec] += 1
294
295     # keep only those lines that didn't show up in every non-summary record
296     final_missed = []
297     for linespec in missed_lines:
298         if missed_lines[linespec] != len(cvglist):
299             continue
300         final_missed.append(linespec)
301
302     return lines, final_missed
303
304
305 def run_tests(tests, fix=False):
306     errors = 0
307     results = []
308     cmd = [os.path.join(runtc_dir, 'runtc')]
309     if opt_coverage:
310         fd, coverage = mkstemp()
311         os.close(fd)
312         cmd.extend(['-c', coverage])
313     else:
314         coverage = None
315     for pd in sorted(tests.keys()):
316         pd_cvg = []
317         for tclist in tests[pd]:
318             for tc in tclist:
319                 args = cmd[:]
320                 if DEBUG > 1:
321                     args.append('-d')
322                 # Set up PD stack for this test.
323                 for spd in tc['pdlist']:
324                     args.extend(['-P', spd['name']])
325                     for label, channel in spd['channels']:
326                         args.extend(['-p', "%s=%d" % (label, channel)])
327                     for option, value in spd['options']:
328                         args.extend(['-o', "%s=%s" % (option, value)])
329                     for label, initial_pin in spd['initial_pins']:
330                         args.extend(['-N', "%s=%d" % (label, initial_pin)])
331                 args.extend(['-i', os.path.join(dumps_dir, tc['input'])])
332                 for op in tc['output']:
333                     name = "%s/%s/%s" % (pd, tc['name'], op['type'])
334                     opargs = ['-O', "%s:%s" % (op['pd'], op['type'])]
335                     if 'class' in op:
336                         opargs[-1] += ":%s" % op['class']
337                         name += "/%s" % op['class']
338                     if VERBOSE:
339                         dots = '.' * (77 - len(name) - 2)
340                         INFO("%s %s " % (name, dots), end='')
341                     results.append({
342                         'testcase': name,
343                     })
344                     try:
345                         fd, outfile = mkstemp()
346                         os.close(fd)
347                         opargs.extend(['-f', outfile])
348                         DBG("Running %s" % (' '.join(args + opargs)))
349                         p = Popen(args + opargs, stdout=PIPE, stderr=PIPE)
350                         stdout, stderr = p.communicate()
351                         if stdout:
352                             # statistics and coverage data on stdout
353                             results[-1].update(parse_stats(stdout.decode('utf-8')))
354                         if stderr:
355                             results[-1]['error'] = stderr.decode('utf-8').strip()
356                             errors += 1
357                         elif p.returncode != 0:
358                             # runtc indicated an error, but didn't output a
359                             # message on stderr about it
360                             results[-1]['error'] = "Unknown error: runtc %d" % p.returncode
361                         if 'error' not in results[-1]:
362                             matchfile = os.path.join(tests_dir, op['pd'], op['match'])
363                             DBG("Comparing with %s" % matchfile)
364                             try:
365                                 diff = diff_error = None
366                                 if op['type'] in ('annotation', 'python'):
367                                     diff = diff_text(matchfile, outfile)
368                                 elif op['type'] == 'binary':
369                                     diff = compare_binary(matchfile, outfile)
370                                 else:
371                                     diff = ["Unsupported output type '%s'." % op['type']]
372                             except Exception as e:
373                                 diff_error = e
374                             if fix:
375                                 if diff or diff_error:
376                                     copy(outfile, matchfile)
377                                     DBG("Wrote %s" % matchfile)
378                             else:
379                                 if diff:
380                                     results[-1]['diff'] = diff
381                                 elif diff_error is not None:
382                                     raise diff_error
383                     except Exception as e:
384                         results[-1]['error'] = str(e)
385                     finally:
386                         if coverage:
387                             results[-1]['coverage_report'] = coverage
388                         os.unlink(outfile)
389                     if op['type'] == 'exception' and 'error' in results[-1]:
390                         # filter out the exception we were looking for
391                         reg = "^Error: srd: %s:" % op['match']
392                         if re.match(reg, results[-1]['error']):
393                             # found it, not an error
394                             results[-1].pop('error')
395                             errors -= 1
396                     if VERBOSE:
397                         if 'diff' in results[-1]:
398                             INFO("Output mismatch")
399                         elif 'error' in results[-1]:
400                             error = results[-1]['error']
401                             if len(error) > 20:
402                                 error = error[:17] + '...'
403                             INFO(error)
404                         elif 'coverage' in results[-1]:
405                             # report coverage of this PD
406                             for record in results[-1]['coverage']:
407                                 # but not others used in the stack
408                                 # as part of the test.
409                                 if record['scope'] == pd:
410                                     INFO(record['coverage'])
411                                     break
412                         else:
413                             INFO("OK")
414                     gen_report(results[-1])
415                     if coverage:
416                         os.unlink(coverage)
417                         # only keep track of coverage records for this PD,
418                         # not others in the stack just used for testing.
419                         for cvg in results[-1]['coverage']:
420                             if cvg['scope'] == pd:
421                                 pd_cvg.append(cvg)
422         if opt_coverage and len(pd_cvg) > 1:
423             # report total coverage of this PD, across all the tests
424             # that were done on it.
425             total_lines, missed_lines = coverage_sum(pd_cvg)
426             pd_coverage = 100 - (float(len(missed_lines)) / total_lines * 100)
427             if VERBOSE:
428                 dots = '.' * (54 - len(pd) - 2)
429                 INFO("%s total %s %d%%" % (pd, dots, pd_coverage))
430             if report_dir:
431                 # generate a missing lines list across all the files in
432                 # the PD
433                 files = {}
434                 for entry in missed_lines:
435                     filename, line = entry.split(':')
436                     if filename not in files:
437                         files[filename] = []
438                     files[filename].append(line)
439                 text = ''
440                 for filename in sorted(files.keys()):
441                     line_list = ','.join(sorted(files[filename], key=int))
442                     text += "%s: %s\n" % (filename, line_list)
443                 open(os.path.join(report_dir, pd + "_total"), 'w').write(text)
444
445
446     return results, errors
447
448 def get_run_tests_error_diff_counts(results):
449     """Get error and diff counters from run_tests() results."""
450     errs = 0
451     diffs = 0
452     for result in results:
453         if 'error' in result:
454             errs += 1
455         if 'diff' in result:
456             diffs += 1
457     return errs, diffs
458
459
460 def gen_report(result):
461     out = []
462     if 'error' in result:
463         out.append("Error:")
464         out.append(result['error'])
465         out.append('')
466     if 'diff' in result:
467         out.append("Test output mismatch:")
468         out.extend(result['diff'])
469         out.append('')
470     if 'coverage_report' in result:
471         out.append(open(result['coverage_report'], 'r').read())
472         out.append('')
473
474     if out:
475         text = "Testcase: %s\n" % result['testcase']
476         text += '\n'.join(out)
477     else:
478         return
479
480     if report_dir:
481         filename = result['testcase'].replace('/', '_')
482         open(os.path.join(report_dir, filename), 'w').write(text)
483     else:
484         print(text)
485
486
487 def show_tests(tests):
488     for pd in sorted(tests.keys()):
489         for tclist in tests[pd]:
490             for tc in tclist:
491                 print("Testcase: %s/%s" % (tc['pd'], tc['name']))
492                 for pd in tc['pdlist']:
493                     print("  Protocol decoder: %s" % pd['name'])
494                     for label, channel in pd['channels']:
495                         print("    Channel %s=%d" % (label, channel))
496                     for option, value in pd['options']:
497                         print("    Option %s=%s" % (option, value))
498                     for label, initial_pin in pd['initial_pins']:
499                         print("    Initial pin %s=%d" % (label, initial_pin))
500                 if 'stack' in tc:
501                     print("  Stack: %s" % ' '.join(tc['stack']))
502                 print("  Input: %s" % tc['input'])
503                 for op in tc['output']:
504                     print("  Output:\n    Protocol decoder: %s" % op['pd'])
505                     print("    Type: %s" % op['type'])
506                     if 'class' in op:
507                         print("    Class: %s" % op['class'])
508                     print("    Match: %s" % op['match'])
509             print()
510
511
512 def list_tests(tests):
513     for pd in sorted(tests.keys()):
514         for tclist in tests[pd]:
515             for tc in tclist:
516                 for op in tc['output']:
517                     line = "%s/%s/%s" % (tc['pd'], tc['name'], op['type'])
518                     if 'class' in op:
519                         line += "/%s" % op['class']
520                     print(line)
521
522
523 #
524 # main
525 #
526
527 # project root
528 runtc_dir = os.path.abspath(os.path.dirname(sys.argv[0]))
529 base_dir = os.path.abspath(os.path.join(os.curdir, runtc_dir, os.path.pardir))
530 dumps_dir = os.path.abspath(os.path.join(base_dir, os.path.pardir, 'sigrok-dumps'))
531 tests_dir = os.path.abspath(os.path.join(runtc_dir, 'test'))
532
533 if len(sys.argv) == 1:
534     usage()
535
536 opt_all = opt_run = opt_show = opt_list = opt_fix = opt_coverage = False
537 report_dir = None
538 try:
539     opts, args = getopt(sys.argv[1:], "dvarslfcR:S:")
540 except Exception as e:
541     usage('error while parsing command line arguments: {}'.format(e))
542 for opt, arg in opts:
543     if opt == '-d':
544         DEBUG += 1
545     if opt == '-v':
546         VERBOSE = True
547     elif opt == '-a':
548         opt_all = True
549     elif opt == '-r':
550         opt_run = True
551     elif opt == '-s':
552         opt_show = True
553     elif opt == '-l':
554         opt_list = True
555     elif opt == '-f':
556         opt_fix = True
557     elif opt == '-c':
558         opt_coverage = True
559     elif opt == '-R':
560         report_dir = arg
561     elif opt == '-S':
562         dumps_dir = arg
563
564 if opt_run and opt_show:
565     usage("Use either -s or -r, not both.")
566 if args and opt_all:
567     usage("Specify either -a or tests, not both.")
568 if report_dir is not None and not os.path.isdir(report_dir):
569     usage("%s is not a directory" % report_dir)
570
571 ret = 0
572 try:
573     if args:
574         testlist = get_tests(args)
575     elif opt_all or opt_list:
576         testlist = get_tests(os.listdir(tests_dir))
577     else:
578         usage("Specify either -a or tests.")
579
580     if opt_run:
581         if not os.path.isdir(dumps_dir):
582             ERR("Could not find sigrok-dumps repository at %s" % dumps_dir)
583             sys.exit(1)
584         results, errors = run_tests(testlist, fix=opt_fix)
585         ret = 0
586         errs, diffs = get_run_tests_error_diff_counts(results)
587         if errs:
588             ret = 1
589         elif diffs:
590             ret = 2
591     elif opt_show:
592         show_tests(testlist)
593     elif opt_list:
594         list_tests(testlist)
595     elif opt_fix:
596         run_tests(testlist, fix=True)
597     else:
598         usage()
599 except Exception as e:
600     print("Error: %s" % str(e))
601     if DEBUG:
602         raise
603
604 sys.exit(ret)