pdtest: Minor help text improvements.
[sigrok-test.git] / decoder / pdtest
1 #!/usr/bin/env python3
2 ##
3 ## This file is part of the sigrok-test project.
4 ##
5 ## Copyright (C) 2013 Bert Vermeulen <bert@biot.com>
6 ##
7 ## This program is free software: you can redistribute it and/or modify
8 ## it under the terms of the GNU General Public License as published by
9 ## the Free Software Foundation, either version 3 of the License, or
10 ## (at your option) any later version.
11 ##
12 ## This program is distributed in the hope that it will be useful,
13 ## but WITHOUT ANY WARRANTY; without even the implied warranty of
14 ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 ## GNU General Public License for more details.
16 ##
17 ## You should have received a copy of the GNU General Public License
18 ## along with this program.  If not, see <http://www.gnu.org/licenses/>.
19 ##
20
21 import os
22 import sys
23 import re
24 from getopt import getopt
25 from tempfile import mkstemp
26 from subprocess import Popen, PIPE
27 from difflib import Differ
28 from hashlib import md5
29 from shutil import copy
30
31 DEBUG = 0
32 VERBOSE = False
33
34
35 class E_syntax(Exception):
36     pass
37 class E_badline(Exception):
38     pass
39
40 def INFO(msg, end='\n'):
41     if VERBOSE:
42         print(msg, end=end)
43         sys.stdout.flush()
44
45
46 def DBG(msg):
47     if DEBUG:
48         print(msg)
49
50
51 def ERR(msg):
52     print(msg, file=sys.stderr)
53
54
55 def usage(msg=None):
56     if msg:
57         print(msg.strip() + '\n')
58     print("""Usage: testpd [-dvalsrfcR] [<test1> <test2> ...]
59   -d  Turn on debugging
60   -v  Verbose
61   -a  All tests
62   -l  List test(s)
63   -s  Show test(s)
64   -r  Run test(s)
65   -f  Fix failed test(s)
66   -c  Report decoder code coverage
67   -R <directory>  Save test reports to <directory>
68   <test>  Protocol decoder name ("i2c") and optionally test name ("i2c/rtc")""")
69     sys.exit()
70
71
72 def check_tclist(tc):
73     if 'pdlist' not in tc or not tc['pdlist']:
74         return("No protocol decoders")
75     if 'input' not in tc or not tc['input']:
76         return("No input")
77     if 'output' not in tc or not tc['output']:
78         return("No output")
79     for op in tc['output']:
80         if 'match' not in op:
81             return("No match in output")
82
83     return None
84
85
86 def parse_testfile(path, pd, tc, op_type, op_class):
87     DBG("Opening '%s'" % path)
88     tclist = []
89     for line in open(path).read().split('\n'):
90         try:
91             line = line.strip()
92             if len(line) == 0 or line[0] == "#":
93                 continue
94             f = line.split()
95             if not tclist and f[0] != "test":
96                 # That can't be good.
97                 raise E_badline
98             key = f.pop(0)
99             if key == 'test':
100                 if len(f) != 1:
101                     raise E_syntax
102                 # new testcase
103                 tclist.append({
104                     'pd': pd,
105                     'name': f[0],
106                     'pdlist': [],
107                     'output': [],
108                 })
109             elif key == 'protocol-decoder':
110                 if len(f) < 1:
111                     raise E_syntax
112                 pd_spec = {
113                     'name': f.pop(0),
114                     'channels': [],
115                     'options': [],
116                 }
117                 while len(f):
118                     if len(f) == 1:
119                         # Always needs <key> <value>
120                         raise E_syntax
121                     a, b = f[:2]
122                     f = f[2:]
123                     if '=' not in b:
124                         raise E_syntax
125                     opt, val = b.split('=')
126                     if a == 'channel':
127                         try:
128                             val = int(val)
129                         except:
130                             raise E_syntax
131                         pd_spec['channels'].append([opt, val])
132                     elif a == 'option':
133                         pd_spec['options'].append([opt, val])
134                     else:
135                         raise E_syntax
136                 tclist[-1]['pdlist'].append(pd_spec)
137             elif key == 'stack':
138                 if len(f) < 2:
139                     raise E_syntax
140                 tclist[-1]['stack'] = f
141             elif key == 'input':
142                 if len(f) != 1:
143                     raise E_syntax
144                 tclist[-1]['input'] = f[0]
145             elif key == 'output':
146                 op_spec = {
147                     'pd': f.pop(0),
148                     'type': f.pop(0),
149                 }
150                 while len(f):
151                     if len(f) == 1:
152                         # Always needs <key> <value>
153                         raise E_syntax
154                     a, b = f[:2]
155                     f = f[2:]
156                     if a == 'class':
157                         op_spec['class'] = b
158                     elif a == 'match':
159                         op_spec['match'] = b
160                     else:
161                         raise E_syntax
162                 tclist[-1]['output'].append(op_spec)
163             else:
164                 raise E_badline
165         except E_badline as e:
166             ERR("Invalid syntax in %s: line '%s'" % (path, line))
167             return []
168         except E_syntax as e:
169             ERR("Unable to parse %s: unknown line '%s'" % (path, line))
170             return []
171
172     # If a specific testcase was requested, keep only that one.
173     if tc is not None:
174         target_tc = None
175         for t in tclist:
176             if t['name'] == tc:
177                 target_tc = t
178                 break
179         # ...and a specific output type
180         if op_type is not None:
181             target_oplist = []
182             for op in target_tc['output']:
183                 if op['type'] == op_type:
184                     # ...and a specific output class
185                     if op_class is None or ('class' in op and op['class'] == op_class):
186                         target_oplist.append(op)
187                         DBG("match on [%s]" % str(op))
188             target_tc['output'] = target_oplist
189         if target_tc is None:
190             tclist = []
191         else:
192             tclist = [target_tc]
193     for t in tclist:
194         error = check_tclist(t)
195         if error:
196             ERR("Error in %s: %s" % (path, error))
197             return []
198
199     return tclist
200
201
202 def get_tests(testnames):
203     tests = {}
204     for testspec in testnames:
205         # Optional testspec in the form pd/testcase/type/class
206         tc = op_type = op_class = None
207         ts = testspec.strip("/").split("/")
208         pd = ts.pop(0)
209         tests[pd] = []
210         if ts:
211             tc = ts.pop(0)
212         if ts:
213             op_type = ts.pop(0)
214         if ts:
215             op_class = ts.pop(0)
216         path = os.path.join(tests_dir, pd)
217         if not os.path.isdir(path):
218             # User specified non-existent PD
219             raise Exception("%s not found." % path)
220         path = os.path.join(tests_dir, pd, "test.conf")
221         if not os.path.exists(path):
222             # PD doesn't have any tests yet
223             continue
224         tests[pd].append(parse_testfile(path, pd, tc, op_type, op_class))
225
226     return tests
227
228
229 def diff_text(f1, f2):
230     t1 = open(f1).readlines()
231     t2 = open(f2).readlines()
232     diff = []
233     d = Differ()
234     for line in d.compare(t1, t2):
235         if line[:2] in ('- ', '+ '):
236             diff.append(line.strip())
237
238     return diff
239
240
241 def compare_binary(f1, f2):
242     h1 = md5()
243     h1.update(open(f1, 'rb').read())
244     h2 = md5()
245     h2.update(open(f2, 'rb').read())
246     if h1.digest() == h2.digest():
247         result = None
248     else:
249         result = ["Binary output does not match."]
250
251     return result
252
253
254 # runtc's stdout can have lines like:
255 # coverage: lines=161 missed=2 coverage=99%
256 def parse_stats(text):
257     stats = {}
258     for line in text.strip().split('\n'):
259         fields = line.split()
260         key = fields.pop(0).strip(':')
261         if key not in stats:
262             stats[key] = []
263         stats[key].append({})
264         for f in fields:
265             k, v = f.split('=')
266             stats[key][-1][k] = v
267
268     return stats
269
270
271 # take result set of all tests in a PD, and summarize which lines
272 # were not covered by any of the tests.
273 def coverage_sum(cvglist):
274     lines = 0
275     missed = 0
276     missed_lines = {}
277     for record in cvglist:
278         lines = int(record['lines'])
279         missed += int(record['missed'])
280         if 'missed_lines' not in record:
281             continue
282         for linespec in record['missed_lines'].split(','):
283             if linespec not in missed_lines:
284                 missed_lines[linespec] = 1
285             else:
286                 missed_lines[linespec] += 1
287
288     # keep only those lines that didn't show up in every non-summary record
289     final_missed = []
290     for linespec in missed_lines:
291         if missed_lines[linespec] != len(cvglist):
292             continue
293         final_missed.append(linespec)
294
295     return lines, final_missed
296
297
298 def run_tests(tests, fix=False):
299     errors = 0
300     results = []
301     cmd = [os.path.join(runtc_dir, 'runtc')]
302     if opt_coverage:
303         fd, coverage = mkstemp()
304         os.close(fd)
305         cmd.extend(['-c', coverage])
306     else:
307         coverage = None
308     for pd in sorted(tests.keys()):
309         pd_cvg = []
310         for tclist in tests[pd]:
311             for tc in tclist:
312                 args = cmd[:]
313                 if DEBUG > 1:
314                     args.append('-d')
315                 # Set up PD stack for this test.
316                 for spd in tc['pdlist']:
317                     args.extend(['-P', spd['name']])
318                     for label, channel in spd['channels']:
319                         args.extend(['-p', "%s=%d" % (label, channel)])
320                     for option, value in spd['options']:
321                         args.extend(['-o', "%s=%s" % (option, value)])
322                 args.extend(['-i', os.path.join(dumps_dir, tc['input'])])
323                 for op in tc['output']:
324                     name = "%s/%s/%s" % (pd, tc['name'], op['type'])
325                     opargs = ['-O', "%s:%s" % (op['pd'], op['type'])]
326                     if 'class' in op:
327                         opargs[-1] += ":%s" % op['class']
328                         name += "/%s" % op['class']
329                     if VERBOSE:
330                         dots = '.' * (77 - len(name) - 2)
331                         INFO("%s %s " % (name, dots), end='')
332                     results.append({
333                         'testcase': name,
334                     })
335                     try:
336                         fd, outfile = mkstemp()
337                         os.close(fd)
338                         opargs.extend(['-f', outfile])
339                         DBG("Running %s" % (' '.join(args + opargs)))
340                         p = Popen(args + opargs, stdout=PIPE, stderr=PIPE)
341                         stdout, stderr = p.communicate()
342                         if stdout:
343                             # statistics and coverage data on stdout
344                             results[-1].update(parse_stats(stdout.decode('utf-8')))
345                         if stderr:
346                             results[-1]['error'] = stderr.decode('utf-8').strip()
347                             errors += 1
348                         elif p.returncode != 0:
349                             # runtc indicated an error, but didn't output a
350                             # message on stderr about it
351                             results[-1]['error'] = "Unknown error: runtc %d" % p.returncode
352                         if 'error' not in results[-1]:
353                             matchfile = os.path.join(tests_dir, op['pd'], op['match'])
354                             DBG("Comparing with %s" % matchfile)
355                             try:
356                                 diff = diff_error = None
357                                 if op['type'] in ('annotation', 'python'):
358                                     diff = diff_text(matchfile, outfile)
359                                 elif op['type'] == 'binary':
360                                     diff = compare_binary(matchfile, outfile)
361                                 else:
362                                     diff = ["Unsupported output type '%s'." % op['type']]
363                             except Exception as e:
364                                 diff_error = e
365                             if fix:
366                                 if diff or diff_error:
367                                     copy(outfile, matchfile)
368                                     DBG("Wrote %s" % matchfile)
369                             else:
370                                 if diff:
371                                     results[-1]['diff'] = diff
372                                 elif diff_error is not None:
373                                     raise diff_error
374                     except Exception as e:
375                         results[-1]['error'] = str(e)
376                     finally:
377                         if coverage:
378                             results[-1]['coverage_report'] = coverage
379                         os.unlink(outfile)
380                     if op['type'] == 'exception' and 'error' in results[-1]:
381                         # filter out the exception we were looking for
382                         reg = "^Error: srd: %s:" % op['match']
383                         if re.match(reg, results[-1]['error']):
384                             # found it, not an error
385                             results[-1].pop('error')
386                             errors -= 1
387                     if VERBOSE:
388                         if 'diff' in results[-1]:
389                             INFO("Output mismatch")
390                         elif 'error' in results[-1]:
391                             error = results[-1]['error']
392                             if len(error) > 20:
393                                 error = error[:17] + '...'
394                             INFO(error)
395                         elif 'coverage' in results[-1]:
396                             # report coverage of this PD
397                             for record in results[-1]['coverage']:
398                                 # but not others used in the stack
399                                 # as part of the test.
400                                 if record['scope'] == pd:
401                                     INFO(record['coverage'])
402                                     break
403                         else:
404                             INFO("OK")
405                     gen_report(results[-1])
406                     if coverage:
407                         os.unlink(coverage)
408                         # only keep track of coverage records for this PD,
409                         # not others in the stack just used for testing.
410                         for cvg in results[-1]['coverage']:
411                             if cvg['scope'] == pd:
412                                 pd_cvg.append(cvg)
413         if opt_coverage and len(pd_cvg) > 1:
414             # report total coverage of this PD, across all the tests
415             # that were done on it.
416             total_lines, missed_lines = coverage_sum(pd_cvg)
417             pd_coverage = 100 - (float(len(missed_lines)) / total_lines * 100)
418             if VERBOSE:
419                 dots = '.' * (54 - len(pd) - 2)
420                 INFO("%s total %s %d%%" % (pd, dots, pd_coverage))
421             if report_dir:
422                 # generate a missing lines list across all the files in
423                 # the PD
424                 files = {}
425                 for entry in missed_lines:
426                     filename, line = entry.split(':')
427                     if filename not in files:
428                         files[filename] = []
429                     files[filename].append(line)
430                 text = ''
431                 for filename in sorted(files.keys()):
432                     line_list = ','.join(sorted(files[filename], key=int))
433                     text += "%s: %s\n" % (filename, line_list)
434                 open(os.path.join(report_dir, pd + "_total"), 'w').write(text)
435
436
437     return results, errors
438
439
440 def gen_report(result):
441     out = []
442     if 'error' in result:
443         out.append("Error:")
444         out.append(result['error'])
445         out.append('')
446     if 'diff' in result:
447         out.append("Test output mismatch:")
448         out.extend(result['diff'])
449         out.append('')
450     if 'coverage_report' in result:
451         out.append(open(result['coverage_report'], 'r').read())
452         out.append('')
453
454     if out:
455         text = "Testcase: %s\n" % result['testcase']
456         text += '\n'.join(out)
457     else:
458         return
459
460     if report_dir:
461         filename = result['testcase'].replace('/', '_')
462         open(os.path.join(report_dir, filename), 'w').write(text)
463     else:
464         print(text)
465
466
467 def show_tests(tests):
468     for pd in sorted(tests.keys()):
469         for tclist in tests[pd]:
470             for tc in tclist:
471                 print("Testcase: %s/%s" % (tc['pd'], tc['name']))
472                 for pd in tc['pdlist']:
473                     print("  Protocol decoder: %s" % pd['name'])
474                     for label, channel in pd['channels']:
475                         print("    Channel %s=%d" % (label, channel))
476                     for option, value in pd['options']:
477                         print("    Option %s=%d" % (option, value))
478                 if 'stack' in tc:
479                     print("  Stack: %s" % ' '.join(tc['stack']))
480                 print("  Input: %s" % tc['input'])
481                 for op in tc['output']:
482                     print("  Output:\n    Protocol decoder: %s" % op['pd'])
483                     print("    Type: %s" % op['type'])
484                     if 'class' in op:
485                         print("    Class: %s" % op['class'])
486                     print("    Match: %s" % op['match'])
487             print()
488
489
490 def list_tests(tests):
491     for pd in sorted(tests.keys()):
492         for tclist in tests[pd]:
493             for tc in tclist:
494                 for op in tc['output']:
495                     line = "%s/%s/%s" % (tc['pd'], tc['name'], op['type'])
496                     if 'class' in op:
497                         line += "/%s" % op['class']
498                     print(line)
499
500
501 #
502 # main
503 #
504
505 # project root
506 runtc_dir = os.path.abspath(os.path.dirname(sys.argv[0]))
507 base_dir = os.path.abspath(os.path.join(os.curdir, runtc_dir, os.path.pardir))
508 dumps_dir = os.path.abspath(os.path.join(base_dir, os.path.pardir, 'sigrok-dumps'))
509 tests_dir = os.path.abspath(os.path.join(runtc_dir, 'test'))
510
511 if len(sys.argv) == 1:
512     usage()
513
514 opt_all = opt_run = opt_show = opt_list = opt_fix = opt_coverage = False
515 report_dir = None
516 try:
517     opts, args = getopt(sys.argv[1:], "dvarslfcR:S:")
518 except Exception as e:
519     usage('error while parsing command line arguments: {}'.format(e))
520 for opt, arg in opts:
521     if opt == '-d':
522         DEBUG += 1
523     if opt == '-v':
524         VERBOSE = True
525     elif opt == '-a':
526         opt_all = True
527     elif opt == '-r':
528         opt_run = True
529     elif opt == '-s':
530         opt_show = True
531     elif opt == '-l':
532         opt_list = True
533     elif opt == '-f':
534         opt_fix = True
535     elif opt == '-c':
536         opt_coverage = True
537     elif opt == '-R':
538         report_dir = arg
539     elif opt == '-S':
540         dumps_dir = arg
541
542 if opt_run and opt_show:
543     usage("Use either -s or -r, not both.")
544 if args and opt_all:
545     usage("Specify either -a or tests, not both.")
546 if report_dir is not None and not os.path.isdir(report_dir):
547     usage("%s is not a directory" % report_dir)
548
549 ret = 0
550 try:
551     if args:
552         testlist = get_tests(args)
553     elif opt_all:
554         testlist = get_tests(os.listdir(tests_dir))
555     else:
556         usage("Specify either -a or tests.")
557
558     if opt_run:
559         if not os.path.isdir(dumps_dir):
560             ERR("Could not find sigrok-dumps repository at %s" % dumps_dir)
561             sys.exit(1)
562         results, errors = run_tests(testlist, fix=opt_fix)
563         ret = errors
564     elif opt_show:
565         show_tests(testlist)
566     elif opt_list:
567         list_tests(testlist)
568     elif opt_fix:
569         run_tests(testlist, fix=True)
570     else:
571         usage()
572 except Exception as e:
573     print("Error: %s" % str(e))
574     if DEBUG:
575         raise
576
577 sys.exit(ret)
578