]> sigrok.org Git - sigrok-test.git/blame - decoder/pdtest
pdtest: add support for input format specs in test.conf files
[sigrok-test.git] / decoder / pdtest
CommitLineData
dd37a782
UH
#!/usr/bin/env python3
##
## This file is part of the sigrok-test project.
##
## Copyright (C) 2013 Bert Vermeulen <bert@biot.com>
##
## This program is free software: you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation, either version 3 of the License, or
## (at your option) any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with this program. If not, see <http://www.gnu.org/licenses/>.
##
20
21import os
22import sys
23import re
24from getopt import getopt
25from tempfile import mkstemp
26from subprocess import Popen, PIPE
6a789f0a 27from difflib import unified_diff
dd37a782
UH
28from hashlib import md5
29from shutil import copy
30
# Debug level: DBG() prints when this is non-zero, and run_tests() passes
# '-d' on to runtc when it is greater than 1.
DEBUG = 0
# When true, INFO() prints progress output to stdout.
VERBOSE = False
33
34
class E_syntax(Exception):
    """Raised by parse_testfile() when a known directive has bad arguments."""
    pass
class E_badline(Exception):
    """Raised by parse_testfile() for an unknown or misplaced directive."""
    pass
39
def INFO(msg, end='\n'):
    """Print a progress message to stdout, but only in verbose mode.

    msg -- the text to print.
    end -- line terminator handed to print(); callers pass '' to keep the
           cursor on the same line so a result can be appended later.

    The stream is flushed right away so partial lines show up immediately.
    """
    if VERBOSE:
        print(msg, end=end)
        sys.stdout.flush()
44
45
def DBG(msg):
    """Print a debug message to stdout when DEBUG is non-zero."""
    if DEBUG:
        print(msg)
49
50
def ERR(msg):
    """Print an error message to stderr, regardless of verbosity."""
    print(msg, file=sys.stderr)
53
54
55def usage(msg=None):
56 if msg:
57 print(msg.strip() + '\n')
121614a0 58 print("""Usage: testpd [-dvalsrfcR] [<test1> <test2> ...]
dd37a782
UH
59 -d Turn on debugging
60 -v Verbose
61 -a All tests
121614a0 62 -l List test(s)
dd37a782
UH
63 -s Show test(s)
64 -r Run test(s)
810494f4 65 -f Fix failed test(s) / create initial output for new test(s)
dd37a782
UH
66 -c Report decoder code coverage
67 -R <directory> Save test reports to <directory>
121614a0 68 <test> Protocol decoder name ("i2c") and optionally test name ("i2c/rtc")""")
dd37a782
UH
69 sys.exit()
70
71
def check_tclist(tc):
    """Validate one parsed testcase dictionary.

    tc -- testcase dict as built by parse_testfile().

    Returns a short error string naming the first problem found (missing or
    empty 'pdlist', 'input' or 'output', or an output spec without 'match'),
    or None when the testcase is complete.
    """
    if not tc.get('pdlist'):
        return "No protocol decoders"
    if not tc.get('input'):
        return "No input"
    if not tc.get('output'):
        return "No output"
    if any('match' not in op for op in tc['output']):
        return "No match in output"

    return None
84
85
def parse_testfile(path, pd, tc, op_type, op_class):
    """Parse a test.conf file into a list of testcase dictionaries.

    path     -- path of the test.conf file to read.
    pd       -- value stored under 'pd' in every testcase parsed.
    tc       -- if not None, keep only the testcase with this name.
    op_type  -- if not None (and tc is given), keep only outputs of this type.
    op_class -- if not None (and op_type is given), keep only outputs whose
                'class' equals this value.

    Recognized directives are 'test', 'protocol-decoder', 'stack', 'input'
    and 'output'; blank lines and lines starting with '#' are skipped.

    Returns the list of testcases. An empty list is returned, after printing
    a message with ERR(), when a line cannot be parsed or when a remaining
    testcase fails check_tclist(). An empty list is also returned when the
    requested testcase does not exist.
    """
    DBG("Opening '%s'" % path)
    tclist = []
    # Read everything up front so the file is closed before parsing starts.
    with open(path) as conf:
        lines = conf.read().split('\n')
    for line in lines:
        try:
            line = line.strip()
            if not line or line[0] == "#":
                continue
            f = line.split()
            if not tclist and f[0] != "test":
                # That can't be good: every directive has to follow a
                # 'test' line, which opens the testcase it belongs to.
                raise E_badline
            key = f.pop(0)
            if key == 'test':
                if len(f) != 1:
                    raise E_syntax
                # new testcase
                tclist.append({
                    'pd': pd,
                    'name': f[0],
                    'pdlist': [],
                    'output': [],
                })
            elif key == 'protocol-decoder':
                if len(f) < 1:
                    raise E_syntax
                pd_spec = {
                    'name': f.pop(0),
                    'channels': [],
                    'options': [],
                    'initial_pins': [],
                }
                while f:
                    if len(f) == 1:
                        # Always needs <key> <value>
                        raise E_syntax
                    a, b = f[:2]
                    f = f[2:]
                    if '=' not in b:
                        raise E_syntax
                    # Split on the first '=' only, so a value that itself
                    # contains '=' does not break the unpacking.
                    opt, val = b.split('=', 1)
                    if a == 'channel':
                        try:
                            val = int(val)
                        except ValueError:
                            raise E_syntax
                        pd_spec['channels'].append([opt, val])
                    elif a == 'option':
                        pd_spec['options'].append([opt, val])
                    elif a == 'initial_pin':
                        try:
                            val = int(val)
                        except ValueError:
                            raise E_syntax
                        pd_spec['initial_pins'].append([opt, val])
                    else:
                        raise E_syntax
                tclist[-1]['pdlist'].append(pd_spec)
            elif key == 'stack':
                if len(f) < 2:
                    raise E_syntax
                tclist[-1]['stack'] = f
            elif key == 'input':
                if len(f) < 1:
                    raise E_syntax
                input_spec = {
                    'name': f.pop(0),
                    'format': None,
                    'options': [],
                }
                while f:
                    if len(f) < 2:
                        # Always needs <key> <value>
                        raise E_syntax
                    a, b = f[:2]
                    f = f[2:]
                    if a == 'format':
                        input_spec['format'] = b
                    elif a == 'option':
                        input_spec['options'].append(b)
                    else:
                        raise E_syntax
                tclist[-1]['input'] = input_spec
            elif key == 'output':
                if len(f) < 2:
                    # Needs at least <pd> <type>; report it as a syntax
                    # error instead of failing on the pop() calls below.
                    raise E_syntax
                op_spec = {
                    'pd': f.pop(0),
                    'type': f.pop(0),
                }
                while f:
                    if len(f) == 1:
                        # Always needs <key> <value>
                        raise E_syntax
                    a, b = f[:2]
                    f = f[2:]
                    if a == 'class':
                        op_spec['class'] = b
                    elif a == 'match':
                        op_spec['match'] = b
                    else:
                        raise E_syntax
                tclist[-1]['output'].append(op_spec)
            else:
                raise E_badline
        except E_badline:
            # Unknown directive, or a directive before the first 'test'.
            ERR("Unable to parse %s: unknown line '%s'" % (path, line))
            return []
        except E_syntax:
            # Known directive with malformed arguments.
            ERR("Invalid syntax in %s: line '%s'" % (path, line))
            return []

    # If a specific testcase was requested, keep only that one.
    if tc is not None:
        target_tc = None
        for t in tclist:
            if t['name'] == tc:
                target_tc = t
                break
        if target_tc is None:
            # Requested testcase does not exist in this file.
            tclist = []
        else:
            # ...and a specific output type
            if op_type is not None:
                target_oplist = []
                for op in target_tc['output']:
                    if op['type'] != op_type:
                        continue
                    # ...and a specific output class
                    if op_class is None or op.get('class') == op_class:
                        target_oplist.append(op)
                        DBG("match on [%s]" % str(op))
                target_tc['output'] = target_oplist
            tclist = [target_tc]
    for t in tclist:
        error = check_tclist(t)
        if error:
            ERR("Error in %s: %s" % (path, error))
            return []

    return tclist
224
225
def get_tests(testnames):
    """Load the testcases selected by a list of test specifications.

    testnames -- iterable of specs of the form pd[/testcase[/type[/class]]].

    Returns a dict mapping each protocol decoder name to a list holding one
    parse_testfile() result (itself a list of testcases) per spec naming
    that decoder. A decoder without a test.conf file maps to an empty list.

    Raises Exception when a decoder's directory does not exist below
    tests_dir.
    """
    tests = {}
    for testspec in testnames:
        # Optional testspec in the form pd/testcase/type/class
        ts = testspec.strip("/").split("/")
        pd = ts.pop(0)
        # Pad with None so missing components unpack cleanly; components
        # beyond the class are ignored.
        tc, op_type, op_class = (ts + [None] * 3)[:3]
        # Do not reset an existing entry: the same decoder may be named in
        # several specs (e.g. "i2c/a i2c/b") and each must be kept.
        tests.setdefault(pd, [])
        path = os.path.join(tests_dir, pd)
        if not os.path.isdir(path):
            # User specified non-existent PD
            raise Exception("%s not found." % path)
        path = os.path.join(tests_dir, pd, "test.conf")
        if not os.path.exists(path):
            # PD doesn't have any tests yet
            continue
        tests[pd].append(parse_testfile(path, pd, tc, op_type, op_class))

    return tests
251
252
def diff_text(f1, f2):
    """Return the changed lines between two text files.

    f1, f2 -- paths of the files to compare.

    Returns a list of stripped unified-diff lines that start with '+' or
    '-', with the two leading file-name header lines removed. The list is
    empty when the files have identical content.
    """
    with open(f1) as first:
        t1 = first.readlines()
    with open(f2) as second:
        t2 = second.readlines()
    diff = list(unified_diff(t1, t2))
    diff = diff[2:] # Strip two from/to filename lines with "+++"/"---".
    return [d.strip() for d in diff if d[0] in ('+', '-')]
260
261
def compare_binary(f1, f2):
    """Compare two files byte-wise via their MD5 digests.

    f1, f2 -- paths of the files to compare.

    Returns None when the digests match, otherwise a one-element list
    holding a mismatch message.
    """
    def _digest(path):
        # Close the file as soon as its content has been hashed.
        with open(path, 'rb') as handle:
            return md5(handle.read()).digest()

    if _digest(f1) == _digest(f2):
        return None

    return ["Binary output does not match."]
273
274
# runtc's stdout can have lines like:
# coverage: lines=161 missed=2 coverage=99%
def parse_stats(text):
    """Parse runtc statistics output into a dictionary.

    text -- decoded stdout of runtc; each non-blank line is a key (with an
            optional ':' around it) followed by whitespace-separated
            name=value fields.

    Returns a dict mapping each key to a list with one dict of fields per
    line carrying that key. Values are kept as strings. Blank lines are
    ignored, so empty or whitespace-only text yields an empty dict.

    Raises ValueError when a field contains no '='.
    """
    stats = {}
    for line in text.strip().split('\n'):
        fields = line.split()
        if not fields:
            # Blank line: there is no key to pop.
            continue
        key = fields.pop(0).strip(':')
        record = {}
        stats.setdefault(key, []).append(record)
        for f in fields:
            # Split on the first '=' only so values may contain '='.
            k, v = f.split('=', 1)
            record[k] = v

    return stats
290
291
# take result set of all tests in a PD, and summarize which lines
# were not covered by any of the tests.
def coverage_sum(cvglist):
    """Combine the coverage records of all tests run on one decoder.

    cvglist -- list of coverage record dicts; each has a 'lines' entry and
               optionally a comma-separated 'missed_lines' entry.

    Returns a tuple (lines, final_missed): 'lines' is the integer 'lines'
    value of the last record (0 for an empty list), and 'final_missed' lists
    the line specs reported as missed by every record, i.e. lines that no
    test covered.
    """
    lines = 0
    missed_counts = {}
    for record in cvglist:
        lines = int(record['lines'])
        if 'missed_lines' not in record:
            continue
        for linespec in record['missed_lines'].split(','):
            if not linespec:
                # An empty 'missed_lines' value splits into [''].
                continue
            missed_counts[linespec] = missed_counts.get(linespec, 0) + 1

    # A line stays uncovered only if every record reported it as missed; a
    # record without 'missed_lines' therefore clears the whole list.
    final_missed = [linespec for linespec, count in missed_counts.items()
                    if count == len(cvglist)]

    return lines, final_missed
317
318
def run_tests(tests, fix=False):
    """Run every output spec of every testcase through the runtc helper.

    tests -- dict as returned by get_tests(): decoder name -> list of
             testcase lists.
    fix   -- when true, a mismatching or unreadable match file is
             overwritten with the freshly generated output instead of being
             reported as a diff.

    For each output spec one result dict is appended to the result list. It
    always has 'testcase' and may have 'error', 'diff', 'coverage_report'
    and whatever keys parse_stats() extracted from runtc's stdout. Each
    result is handed to gen_report(). With opt_coverage set, a per-decoder
    total is reported once more than one coverage record was collected.

    Returns a tuple (results, errors) where errors is the number of results
    that ended up with an 'error' entry.
    """
    errors = 0
    results = []
    cmd = [os.path.join(runtc_dir, 'runtc')]
    if opt_coverage:
        fd, coverage = mkstemp()
        os.close(fd)
        cmd.extend(['-c', coverage])
    else:
        coverage = None
    for pd in sorted(tests.keys()):
        pd_cvg = []
        for tclist in tests[pd]:
            for tc in tclist:
                args = cmd[:]
                if DEBUG > 1:
                    args.append('-d')
                # Set up PD stack for this test.
                for spd in tc['pdlist']:
                    args.extend(['-P', spd['name']])
                    for label, channel in spd['channels']:
                        args.extend(['-p', "%s=%d" % (label, channel)])
                    for option, value in spd['options']:
                        args.extend(['-o', "%s=%s" % (option, value)])
                    for label, initial_pin in spd['initial_pins']:
                        args.extend(['-N', "%s=%d" % (label, initial_pin)])
                # Setup input spec for this test (optional format spec).
                in_spec = tc['input']
                infile = os.path.join(dumps_dir, in_spec['name'])
                args.extend(['-i', infile])
                if in_spec['format']:
                    args.extend(['-I', in_spec['format']])
                    # NOTE(review): input options are assumed to be passed
                    # only together with a format -- confirm the nesting.
                    for in_opt in in_spec['options']:
                        args.extend(['-I', in_opt])
                # Setup output spec for this test.
                for op in tc['output']:
                    name = "%s/%s/%s" % (pd, tc['name'], op['type'])
                    opargs = ['-O', "%s:%s" % (op['pd'], op['type'])]
                    if 'class' in op:
                        opargs[-1] += ":%s" % op['class']
                        name += "/%s" % op['class']
                    if VERBOSE:
                        dots = '.' * (77 - len(name) - 2)
                        INFO("%s %s " % (name, dots), end='')
                    result = {
                        'testcase': name,
                    }
                    results.append(result)
                    # Stays None if mkstemp() fails, so the cleanup below
                    # never touches an unbound or stale file name.
                    outfile = None
                    try:
                        fd, outfile = mkstemp()
                        os.close(fd)
                        opargs.extend(['-f', outfile])
                        DBG("Running %s" % (' '.join(args + opargs)))
                        p = Popen(args + opargs, stdout=PIPE, stderr=PIPE)
                        stdout, stderr = p.communicate()
                        if stdout:
                            # statistics and coverage data on stdout
                            result.update(parse_stats(stdout.decode('utf-8')))
                        if stderr:
                            result['error'] = stderr.decode('utf-8').strip()
                        elif p.returncode != 0:
                            # runtc indicated an error, but didn't output a
                            # message on stderr about it
                            result['error'] = "Unknown error: runtc %d" % p.returncode
                        if 'error' not in result:
                            matchfile = os.path.join(tests_dir, op['pd'], op['match'])
                            DBG("Comparing with %s" % matchfile)
                            try:
                                diff = diff_error = None
                                if op['type'] in ('annotation', 'python'):
                                    diff = diff_text(matchfile, outfile)
                                elif op['type'] == 'binary':
                                    diff = compare_binary(matchfile, outfile)
                                else:
                                    diff = ["Unsupported output type '%s'." % op['type']]
                            except Exception as e:
                                # Remember the failure; in fix mode it just
                                # means the match file gets (re)written.
                                diff_error = e
                            if fix:
                                if diff or diff_error:
                                    copy(outfile, matchfile)
                                    DBG("Wrote %s" % matchfile)
                            else:
                                if diff:
                                    result['diff'] = diff
                                elif diff_error is not None:
                                    raise diff_error
                    except Exception as e:
                        result['error'] = str(e)
                    finally:
                        if coverage:
                            result['coverage_report'] = coverage
                        if outfile is not None:
                            os.unlink(outfile)
                    if op['type'] == 'exception' and 'error' in result:
                        # filter out the exception we were looking for
                        reg = "^Error: srd: %s:" % op['match']
                        if re.match(reg, result['error']):
                            # found it, not an error
                            result.pop('error')
                    # Count once per failed result, whichever path recorded
                    # the error (stderr, exit status or an exception).
                    if 'error' in result:
                        errors += 1
                    if VERBOSE:
                        if 'diff' in result:
                            INFO("Output mismatch")
                        elif 'error' in result:
                            error = result['error']
                            if len(error) > 20:
                                error = error[:17] + '...'
                            INFO(error)
                        elif 'coverage' in result:
                            # report coverage of this PD
                            for record in result['coverage']:
                                # but not others used in the stack
                                # as part of the test.
                                if record['scope'] == pd:
                                    INFO(record['coverage'])
                                    break
                        else:
                            INFO("OK")
                    gen_report(result)
                    if coverage:
                        # The report may be missing if runtc failed before
                        # writing it.
                        if os.path.exists(coverage):
                            os.unlink(coverage)
                        # only keep track of coverage records for this PD,
                        # not others in the stack just used for testing.
                        # A failed run has no 'coverage' entry at all.
                        for cvg in result.get('coverage', []):
                            if cvg['scope'] == pd:
                                pd_cvg.append(cvg)
        if opt_coverage and len(pd_cvg) > 1:
            # report total coverage of this PD, across all the tests
            # that were done on it.
            total_lines, missed_lines = coverage_sum(pd_cvg)
            pd_coverage = 100 - (float(len(missed_lines)) / total_lines * 100)
            if VERBOSE:
                dots = '.' * (54 - len(pd) - 2)
                INFO("%s total %s %d%%" % (pd, dots, pd_coverage))
            if report_dir:
                # generate a missing lines list across all the files in
                # the PD
                files = {}
                for entry in missed_lines:
                    filename, line = entry.split(':')
                    files.setdefault(filename, []).append(line)
                text = ''
                for filename in sorted(files.keys()):
                    line_list = ','.join(sorted(files[filename], key=int))
                    text += "%s: %s\n" % (filename, line_list)
                with open(os.path.join(report_dir, pd + "_total"), 'w') as total:
                    total.write(text)

    # Remove the coverage temp file if no test ran and consumed it.
    if coverage and os.path.exists(coverage):
        os.unlink(coverage)

    return results, errors
469
a1c10c43
GS
def get_run_tests_error_diff_counts(results):
    """Get error and diff counters from run_tests() results.

    results -- iterable of result dicts.

    Returns a tuple (errs, diffs): the number of results carrying an
    'error' key and the number carrying a 'diff' key. A result with both
    keys is counted in both.
    """
    errs = 0
    diffs = 0
    for result in results:
        # bool is an int subclass, so the membership tests add 0 or 1.
        errs += 'error' in result
        diffs += 'diff' in result
    return errs, diffs
480
dd37a782
UH
481
def gen_report(result):
    """Write or print the report for a single test result.

    result -- result dict; 'testcase' names the test, and the optional
              'error', 'diff' and 'coverage_report' entries each contribute
              a section ('coverage_report' is the path of a file whose
              content is included).

    Nothing is produced when the result has none of the optional entries.
    Otherwise the text goes to a file in report_dir, named after the
    testcase with '/' replaced by '_', or to stdout when report_dir is not
    set.
    """
    out = []
    if 'error' in result:
        out.extend(["Error:", result['error'], ''])
    if 'diff' in result:
        out.append("Test output mismatch:")
        out.extend(result['diff'])
        out.append('')
    if 'coverage_report' in result:
        cvg_path = result['coverage_report']
        # The coverage file can be absent when the run failed before it was
        # written; skip the section rather than abort the whole test run.
        if os.path.exists(cvg_path):
            with open(cvg_path, 'r') as cvg_file:
                out.append(cvg_file.read())
            out.append('')

    if not out:
        return

    text = "Testcase: %s\n" % result['testcase']
    text += '\n'.join(out)

    if report_dir:
        filename = result['testcase'].replace('/', '_')
        with open(os.path.join(report_dir, filename), 'w') as report:
            report.write(text)
    else:
        print(text)
507
508
def show_tests(tests):
    """Print a human-readable description of every testcase to stdout.

    tests -- dict as returned by get_tests(): decoder name -> list of
             testcase lists.

    For each testcase this prints its decoders (with channels, options and
    initial pins), the optional stack, the input spec and all output specs,
    followed by a blank line.
    """
    for pd_name in sorted(tests.keys()):
        for tclist in tests[pd_name]:
            for tc in tclist:
                print("Testcase: %s/%s" % (tc['pd'], tc['name']))
                # Separate name for the decoder spec so the outer loop
                # variable is not shadowed.
                for spd in tc['pdlist']:
                    print(" Protocol decoder: %s" % spd['name'])
                    for label, channel in spd['channels']:
                        print(" Channel %s=%d" % (label, channel))
                    for option, value in spd['options']:
                        print(" Option %s=%s" % (option, value))
                    for label, initial_pin in spd['initial_pins']:
                        print(" Initial pin %s=%d" % (label, initial_pin))
                if 'stack' in tc:
                    print(" Stack: %s" % ' '.join(tc['stack']))
                in_spec = tc['input']
                if isinstance(in_spec, dict):
                    # The input is a spec dict (name, format, options);
                    # show its parts instead of the raw dict repr.
                    print(" Input: %s" % in_spec['name'])
                    if in_spec.get('format'):
                        print(" Format: %s" % in_spec['format'])
                    for in_opt in in_spec.get('options', []):
                        print(" Option: %s" % in_opt)
                else:
                    print(" Input: %s" % in_spec)
                for op in tc['output']:
                    print(" Output:\n Protocol decoder: %s" % op['pd'])
                    print(" Type: %s" % op['type'])
                    if 'class' in op:
                        print(" Class: %s" % op['class'])
                    print(" Match: %s" % op['match'])
                print()
532
533
def list_tests(tests):
    """Print one line per output spec, in the form pd/testcase/type[/class].

    tests -- dict as returned by get_tests(): decoder name -> list of
             testcase lists.
    """
    for pd in sorted(tests.keys()):
        for tclist in tests[pd]:
            for tc in tclist:
                for op in tc['output']:
                    parts = [tc['pd'], tc['name'], op['type']]
                    if 'class' in op:
                        parts.append(op['class'])
                    print("%s" % '/'.join(parts))
543
544
#
# main
#

# Directory layout, all derived from the location of this script:
# runtc_dir holds the script (and the runtc helper), base_dir is its parent,
# the dumps are expected in a 'sigrok-dumps' directory next to base_dir, and
# the testcases live in the 'test' directory below runtc_dir.
runtc_dir = os.path.abspath(os.path.dirname(sys.argv[0]))
base_dir = os.path.abspath(os.path.join(os.curdir, runtc_dir, os.path.pardir))
dumps_dir = os.path.abspath(os.path.join(base_dir, os.path.pardir, 'sigrok-dumps'))
tests_dir = os.path.abspath(os.path.join(runtc_dir, 'test'))

if len(sys.argv) == 1:
    usage()

opt_all = opt_run = opt_show = opt_list = opt_fix = opt_coverage = False
report_dir = None
try:
    opts, args = getopt(sys.argv[1:], "dvarslfcR:S:")
except Exception as e:
    usage('error while parsing command line arguments: {}'.format(e))
for opt, arg in opts:
    if opt == '-d':
        # May be given more than once to raise the debug level.
        DEBUG += 1
    elif opt == '-v':
        VERBOSE = True
    elif opt == '-a':
        opt_all = True
    elif opt == '-r':
        opt_run = True
    elif opt == '-s':
        opt_show = True
    elif opt == '-l':
        opt_list = True
    elif opt == '-f':
        opt_fix = True
    elif opt == '-c':
        opt_coverage = True
    elif opt == '-R':
        report_dir = arg
    elif opt == '-S':
        # Overrides the default location of the dumps.
        dumps_dir = arg

if opt_run and opt_show:
    usage("Use either -s or -r, not both.")
if args and opt_all:
    usage("Specify either -a or tests, not both.")
if report_dir is not None and not os.path.isdir(report_dir):
    usage("%s is not a directory" % report_dir)

# Exit status: 0 on success, 1 when a test reported an error (or the run
# itself failed), 2 when the only problems were output mismatches.
ret = 0
try:
    if args:
        testlist = get_tests(args)
    elif opt_all or opt_list:
        testlist = get_tests(os.listdir(tests_dir))
    else:
        usage("Specify either -a or tests.")

    if opt_run:
        if not os.path.isdir(dumps_dir):
            ERR("Could not find sigrok-dumps repository at %s" % dumps_dir)
            sys.exit(1)
        results, errors = run_tests(testlist, fix=opt_fix)
        errs, diffs = get_run_tests_error_diff_counts(results)
        if errs:
            ret = 1
        elif diffs:
            ret = 2
    elif opt_show:
        show_tests(testlist)
    elif opt_list:
        list_tests(testlist)
    elif opt_fix:
        run_tests(testlist, fix=True)
    else:
        usage()
except Exception as e:
    print("Error: %s" % str(e))
    # A failed run must not look like success to the calling shell.
    ret = 1
    if DEBUG:
        raise

sys.exit(ret)