]> sigrok.org Git - libsigrokdecode.git/blob - tests/pdtest
pdtest/runtc: Support for output type 'exception'.
[libsigrokdecode.git] / tests / pdtest
1 #!/usr/bin/env python3
2 ##
3 ## This file is part of the libsigrokdecode project.
4 ##
5 ## Copyright (C) 2013 Bert Vermeulen <bert@biot.com>
6 ##
7 ## This program is free software: you can redistribute it and/or modify
8 ## it under the terms of the GNU General Public License as published by
9 ## the Free Software Foundation, either version 3 of the License, or
10 ## (at your option) any later version.
11 ##
12 ## This program is distributed in the hope that it will be useful,
13 ## but WITHOUT ANY WARRANTY; without even the implied warranty of
14 ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 ## GNU General Public License for more details.
16 ##
17 ## You should have received a copy of the GNU General Public License
18 ## along with this program.  If not, see <http://www.gnu.org/licenses/>.
19 ##
20
21 import os
22 import sys
23 import re
24 from getopt import getopt
25 from tempfile import mkstemp
26 from subprocess import Popen, PIPE
27 from difflib import Differ
28 from hashlib import md5
29 from shutil import copy
30
31 DEBUG = 0
32 VERBOSE = False
33
34
35 class E_syntax(Exception):
36     pass
37 class E_badline(Exception):
38     pass
39
40 def INFO(msg, end='\n'):
41     if VERBOSE:
42         print(msg, end=end)
43         sys.stdout.flush()
44
45
46 def DBG(msg):
47     if DEBUG:
48         print(msg)
49
50
51 def ERR(msg):
52     print(msg, file=sys.stderr)
53
54
55 def usage(msg=None):
56     if msg:
57         print(msg.strip() + '\n')
58     print("""Usage: testpd [-dvarslR] [test, ...]
59   -d  Turn on debugging
60   -v  Verbose
61   -a  All tests
62   -l  List all tests
63   -s  Show test(s)
64   -r  Run test(s)
65   -f  Fix failed test(s)
66   -c  Report decoder code coverage
67   -R <directory>  Save test reports to <directory>
68   <test>  Protocol decoder name ("i2c") and optionally test name ("i2c/icc")""")
69     sys.exit()
70
71
72 def check_tclist(tc):
73     if 'pdlist' not in tc or not tc['pdlist']:
74         return("No protocol decoders")
75     if 'input' not in tc or not tc['input']:
76         return("No input")
77     if 'output' not in tc or not tc['output']:
78         return("No output")
79     for op in tc['output']:
80         if 'match' not in op:
81             return("No match in output")
82
83     return None
84
85
86 def parse_testfile(path, pd, tc, op_type, op_class):
87     DBG("Opening '%s'" % path)
88     tclist = []
89     for line in open(path).read().split('\n'):
90         try:
91             line = line.strip()
92             if len(line) == 0 or line[0] == "#":
93                 continue
94             f = line.split()
95             if not tclist and f[0] != "test":
96                 # That can't be good.
97                 raise E_badline
98             key = f.pop(0)
99             if key == 'test':
100                 if len(f) != 1:
101                     raise E_syntax
102                 # new testcase
103                 tclist.append({
104                     'pd': pd,
105                     'name': f[0],
106                     'pdlist': [],
107                     'output': [],
108                 })
109             elif key == 'protocol-decoder':
110                 if len(f) < 1:
111                     raise E_syntax
112                 pd_spec = {
113                     'name': f.pop(0),
114                     'channels': [],
115                     'options': [],
116                 }
117                 while len(f):
118                     if len(f) == 1:
119                         # Always needs <key> <value>
120                         raise E_syntax
121                     a, b = f[:2]
122                     f = f[2:]
123                     if '=' not in b:
124                         raise E_syntax
125                     opt, val = b.split('=')
126                     if a == 'channel':
127                         try:
128                             val = int(val)
129                         except:
130                             raise E_syntax
131                         pd_spec['channels'].append([opt, val])
132                     elif a == 'option':
133                         pd_spec['options'].append([opt, val])
134                     else:
135                         raise E_syntax
136                 tclist[-1]['pdlist'].append(pd_spec)
137             elif key == 'stack':
138                 if len(f) < 2:
139                     raise E_syntax
140                 tclist[-1]['stack'] = f
141             elif key == 'input':
142                 if len(f) != 1:
143                     raise E_syntax
144                 tclist[-1]['input'] = f[0]
145             elif key == 'output':
146                 op_spec = {
147                     'pd': f.pop(0),
148                     'type': f.pop(0),
149                 }
150                 while len(f):
151                     if len(f) == 1:
152                         # Always needs <key> <value>
153                         raise E_syntax
154                     a, b = f[:2]
155                     f = f[2:]
156                     if a == 'class':
157                         op_spec['class'] = b
158                     elif a == 'match':
159                         op_spec['match'] = b
160                     else:
161                         raise E_syntax
162                 tclist[-1]['output'].append(op_spec)
163             else:
164                 raise E_badline
165         except E_badline as e:
166             ERR("Invalid syntax in %s: line '%s'" % (path, line))
167             return []
168         except E_syntax as e:
169             ERR("Unable to parse %s: unknown line '%s'" % (path, line))
170             return []
171
172     # If a specific testcase was requested, keep only that one.
173     if tc is not None:
174         target_tc = None
175         for t in tclist:
176             if t['name'] == tc:
177                 target_tc = t
178                 break
179         # ...and a specific output type
180         if op_type is not None:
181             target_oplist = []
182             for op in target_tc['output']:
183                 if op['type'] == op_type:
184                     # ...and a specific output class
185                     if op_class is None or ('class' in op and op['class'] == op_class):
186                         target_oplist.append(op)
187                         DBG("match on [%s]" % str(op))
188             target_tc['output'] = target_oplist
189         if target_tc is None:
190             tclist = []
191         else:
192             tclist = [target_tc]
193     for t in tclist:
194         error = check_tclist(t)
195         if error:
196             ERR("Error in %s: %s" % (path, error))
197             return []
198
199     return tclist
200
201
202 def get_tests(testnames):
203     tests = {}
204     for testspec in testnames:
205         # Optional testspec in the form pd/testcase/type/class
206         tc = op_type = op_class = None
207         ts = testspec.strip("/").split("/")
208         pd = ts.pop(0)
209         tests[pd] = []
210         if ts:
211             tc = ts.pop(0)
212         if ts:
213             op_type = ts.pop(0)
214         if ts:
215             op_class = ts.pop(0)
216         path = os.path.join(decoders_dir, pd)
217         if not os.path.isdir(path):
218             # User specified non-existent PD
219             raise Exception("%s not found." % path)
220         path = os.path.join(decoders_dir, pd, "test/test.conf")
221         if not os.path.exists(path):
222             # PD doesn't have any tests yet
223             continue
224         tests[pd].append(parse_testfile(path, pd, tc, op_type, op_class))
225
226     return tests
227
228
229 def diff_text(f1, f2):
230     t1 = open(f1).readlines()
231     t2 = open(f2).readlines()
232     diff = []
233     d = Differ()
234     for line in d.compare(t1, t2):
235         if line[:2] in ('- ', '+ '):
236             diff.append(line.strip())
237
238     return diff
239
240
241 def compare_binary(f1, f2):
242     h1 = md5()
243     h1.update(open(f1, 'rb').read())
244     h2 = md5()
245     h2.update(open(f2, 'rb').read())
246     if h1.digest() == h2.digest():
247         result = None
248     else:
249         result = ["Binary output does not match."]
250
251     return result
252
253
254 # runtc's stdout can have lines like:
255 # coverage: lines=161 missed=2 coverage=99%
256 def parse_stats(text):
257     stats = {}
258     for line in text.strip().split('\n'):
259         fields = line.split()
260         key = fields.pop(0).strip(':')
261         if key not in stats:
262             stats[key] = []
263         stats[key].append({})
264         for f in fields:
265             k, v = f.split('=')
266             stats[key][-1][k] = v
267
268     return stats
269
270
271 # take result set of all tests in a PD, and summarize which lines
272 # were not covered by any of the tests.
273 def coverage_sum(cvglist):
274     lines = 0
275     missed = 0
276     missed_lines = {}
277     for record in cvglist:
278         lines = int(record['lines'])
279         missed += int(record['missed'])
280         if 'missed_lines' not in record:
281             continue
282         for linespec in record['missed_lines'].split(','):
283             if linespec not in missed_lines:
284                 missed_lines[linespec] = 1
285             else:
286                 missed_lines[linespec] += 1
287
288     # keep only those lines that didn't show up in every non-summary record
289     final_missed = []
290     for linespec in missed_lines:
291         if missed_lines[linespec] != len(cvglist):
292             continue
293         final_missed.append(linespec)
294
295     return lines, final_missed
296
297
298 def run_tests(tests, fix=False):
299     errors = 0
300     results = []
301     cmd = [os.path.join(tests_dir, 'runtc')]
302     if opt_coverage:
303         fd, coverage = mkstemp()
304         os.close(fd)
305         cmd.extend(['-c', coverage])
306     else:
307         coverage = None
308     for pd in sorted(tests.keys()):
309         pd_cvg = []
310         for tclist in tests[pd]:
311             for tc in tclist:
312                 args = cmd.copy()
313                 if DEBUG > 1:
314                     args.append('-d')
315                 # Set up PD stack for this test.
316                 for spd in tc['pdlist']:
317                     args.extend(['-P', spd['name']])
318                     for label, channel in spd['channels']:
319                         args.extend(['-p', "%s=%d" % (label, channel)])
320                     for option, value in spd['options']:
321                         args.extend(['-o', "%s=%s" % (option, value)])
322                 args.extend(['-i', os.path.join(dumps_dir, tc['input'])])
323                 for op in tc['output']:
324                     name = "%s/%s/%s" % (pd, tc['name'], op['type'])
325                     opargs = ['-O', "%s:%s" % (op['pd'], op['type'])]
326                     if 'class' in op:
327                         opargs[-1] += ":%s" % op['class']
328                         name += "/%s" % op['class']
329                     if VERBOSE:
330                         dots = '.' * (60 - len(name) - 2)
331                         INFO("%s %s " % (name, dots), end='')
332                     results.append({
333                         'testcase': name,
334                     })
335                     try:
336                         fd, outfile = mkstemp()
337                         os.close(fd)
338                         opargs.extend(['-f', outfile])
339                         DBG("Running %s" % (' '.join(args + opargs)))
340                         p = Popen(args + opargs, stdout=PIPE, stderr=PIPE)
341                         stdout, stderr = p.communicate()
342                         if stdout:
343                             # statistics and coverage data on stdout
344                             results[-1].update(parse_stats(stdout.decode('utf-8')))
345                         if stderr:
346                             results[-1]['error'] = stderr.decode('utf-8').strip()
347                             errors += 1
348                         elif p.returncode != 0:
349                             # runtc indicated an error, but didn't output a
350                             # message on stderr about it
351                             results[-1]['error'] = "Unknown error: runtc %d" % p.returncode
352                         if 'error' not in results[-1]:
353                             matchfile = os.path.join(decoders_dir, op['pd'], 'test', op['match'])
354                             DBG("Comparing with %s" % matchfile)
355                             try:
356                                 diff = diff_error = None
357                                 if op['type'] in ('annotation', 'python'):
358                                     diff = diff_text(matchfile, outfile)
359                                 elif op['type'] == 'binary':
360                                     diff = compare_binary(matchfile, outfile)
361                                 else:
362                                     diff = ["Unsupported output type '%s'." % op['type']]
363                             except Exception as e:
364                                 diff_error = e
365                             if fix:
366                                 if diff or diff_error:
367                                     copy(outfile, matchfile)
368                                     DBG("Wrote %s" % matchfile)
369                             else:
370                                 if diff:
371                                     results[-1]['diff'] = diff
372                                 elif diff_error is not None:
373                                     raise diff_error
374                     except Exception as e:
375                         results[-1]['error'] = str(e)
376                     finally:
377                         if coverage:
378                             results[-1]['coverage_report'] = coverage
379                         os.unlink(outfile)
380                     if op['type'] == 'exception' and 'error' in results[-1]:
381                         # filter out the exception we were looking for
382                         reg = "^Error: srd: Protocol decoder instance %s: %s:" % (op['pd'], op['match'])
383                         if re.match(reg, results[-1]['error']):
384                             # found it, not an error
385                             results[-1].pop('error')
386                     if VERBOSE:
387                         if 'diff' in results[-1]:
388                             INFO("Output mismatch")
389                         elif 'error' in results[-1]:
390                             error = results[-1]['error']
391                             if len(error) > 20:
392                                 error = error[:17] + '...'
393                             INFO(error)
394                         elif 'coverage' in results[-1]:
395                             # report coverage of this PD
396                             for record in results[-1]['coverage']:
397                                 # but not others used in the stack
398                                 # as part of the test.
399                                 if record['scope'] == pd:
400                                     INFO(record['coverage'])
401                                     break
402                         else:
403                             INFO("OK")
404                     gen_report(results[-1])
405                     if coverage:
406                         os.unlink(coverage)
407                         # only keep track of coverage records for this PD,
408                         # not others in the stack just used for testing.
409                         for cvg in results[-1]['coverage']:
410                             if cvg['scope'] == pd:
411                                 pd_cvg.append(cvg)
412         if VERBOSE and opt_coverage and len(pd_cvg) > 1:
413             # report total coverage of this PD, across all the tests
414             # that were done on it.
415             total_lines, missed_lines = coverage_sum(pd_cvg)
416             pd_coverage = 100 - (float(len(missed_lines)) / total_lines * 100)
417             if VERBOSE:
418                 dots = '.' * (54 - len(pd) - 2)
419             INFO("%s total %s %d%%" % (pd, dots, pd_coverage))
420
421     return results, errors
422
423
424 def gen_report(result):
425     out = []
426     if 'error' in result:
427         out.append("Error:")
428         out.append(result['error'])
429         out.append('')
430     if 'diff' in result:
431         out.append("Test output mismatch:")
432         out.extend(result['diff'])
433         out.append('')
434     if 'coverage_report' in result:
435         out.append(open(result['coverage_report'], 'r').read())
436         out.append('')
437
438     if out:
439         text = "Testcase: %s\n" % result['testcase']
440         text += '\n'.join(out)
441     else:
442         return
443
444     if report_dir:
445         filename = result['testcase'].replace('/', '_')
446         open(os.path.join(report_dir, filename), 'w').write(text)
447     else:
448         print(text)
449
450
451 def show_tests(tests):
452     for pd in sorted(tests.keys()):
453         for tclist in tests[pd]:
454             for tc in tclist:
455                 print("Testcase: %s/%s" % (tc['pd'], tc['name']))
456                 for pd in tc['pdlist']:
457                     print("  Protocol decoder: %s" % pd['name'])
458                     for label, channel in pd['channels']:
459                         print("    Channel %s=%d" % (label, channel))
460                     for option, value in pd['options']:
461                         print("    Option %s=%d" % (option, value))
462                 if 'stack' in tc:
463                     print("  Stack: %s" % ' '.join(tc['stack']))
464                 print("  Input: %s" % tc['input'])
465                 for op in tc['output']:
466                     print("  Output:\n    Protocol decoder: %s" % op['pd'])
467                     print("    Type: %s" % op['type'])
468                     if 'class' in op:
469                         print("    Class: %s" % op['class'])
470                     print("    Match: %s" % op['match'])
471             print()
472
473
474 def list_tests(tests):
475     for pd in sorted(tests.keys()):
476         for tclist in tests[pd]:
477             for tc in tclist:
478                 for op in tc['output']:
479                     line = "%s/%s/%s" % (tc['pd'], tc['name'], op['type'])
480                     if 'class' in op:
481                         line += "/%s" % op['class']
482                     print(line)
483
484
485 #
486 # main
487 #
488
489 # project root
490 tests_dir = os.path.abspath(os.path.dirname(sys.argv[0]))
491 base_dir = os.path.abspath(os.path.join(os.curdir, tests_dir, os.path.pardir))
492 dumps_dir = os.path.abspath(os.path.join(base_dir, os.path.pardir, 'sigrok-dumps'))
493 decoders_dir = os.path.abspath(os.path.join(base_dir, 'decoders'))
494
495 if len(sys.argv) == 1:
496     usage()
497
498 opt_all = opt_run = opt_show = opt_list = opt_fix = opt_coverage = False
499 report_dir = None
500 opts, args = getopt(sys.argv[1:], "dvarslfcR:S:")
501 for opt, arg in opts:
502     if opt == '-d':
503         DEBUG += 1
504     if opt == '-v':
505         VERBOSE = True
506     elif opt == '-a':
507         opt_all = True
508     elif opt == '-r':
509         opt_run = True
510     elif opt == '-s':
511         opt_show = True
512     elif opt == '-l':
513         opt_list = True
514     elif opt == '-f':
515         opt_fix = True
516     elif opt == '-c':
517         opt_coverage = True
518     elif opt == '-R':
519         report_dir = arg
520     elif opt == '-S':
521         dumps_dir = arg
522
523 if opt_run and opt_show:
524     usage("Use either -s or -r, not both.")
525 if args and opt_all:
526     usage("Specify either -a or tests, not both.")
527 if report_dir is not None and not os.path.isdir(report_dir):
528     usage("%s is not a directory" % report_dir)
529
530 ret = 0
531 try:
532     if args:
533         testlist = get_tests(args)
534     elif opt_all:
535         testlist = get_tests(os.listdir(decoders_dir))
536     else:
537         usage("Specify either -a or tests.")
538
539     if opt_run:
540         if not os.path.isdir(dumps_dir):
541             ERR("Could not find sigrok-dumps repository at %s" % dumps_dir)
542             sys.exit(1)
543         results, errors = run_tests(testlist, fix=opt_fix)
544         ret = errors
545     elif opt_show:
546         show_tests(testlist)
547     elif opt_list:
548         list_tests(testlist)
549     elif opt_fix:
550         run_tests(testlist, fix=True)
551     else:
552         usage()
553 except Exception as e:
554     print("Error: %s" % str(e))
555     if DEBUG:
556         raise
557
558 sys.exit(ret)
559