]> sigrok.org Git - sigrok-test.git/blame - decoder/pdtest
ir_nec: add tests for extended NEC protocol (failed and passed)
[sigrok-test.git] / decoder / pdtest
CommitLineData
dd37a782
UH
1#!/usr/bin/env python3
2##
3## This file is part of the sigrok-test project.
4##
5## Copyright (C) 2013 Bert Vermeulen <bert@biot.com>
6##
7## This program is free software: you can redistribute it and/or modify
8## it under the terms of the GNU General Public License as published by
9## the Free Software Foundation, either version 3 of the License, or
10## (at your option) any later version.
11##
12## This program is distributed in the hope that it will be useful,
13## but WITHOUT ANY WARRANTY; without even the implied warranty of
14## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15## GNU General Public License for more details.
16##
17## You should have received a copy of the GNU General Public License
18## along with this program. If not, see <http://www.gnu.org/licenses/>.
19##
20
21import os
22import sys
23import re
24from getopt import getopt
25from tempfile import mkstemp
26from subprocess import Popen, PIPE
27from difflib import Differ
28from hashlib import md5
29from shutil import copy
30
# Debug level; raised by one for every -d given on the command line.
DEBUG = 0
# When set (by -v), INFO() prints per-test progress.
VERBOSE = False
33
34
class E_syntax(Exception):
    """Raised when a known test.conf keyword has malformed arguments."""
class E_badline(Exception):
    """Raised when a test.conf line starts with an unexpected keyword."""
39
def INFO(msg, end='\n'):
    """Print *msg* (terminated by *end*) and flush, only when VERBOSE is set."""
    if not VERBOSE:
        return
    print(msg, end=end, flush=True)
44
45
def DBG(msg):
    """Print *msg* when the DEBUG level is non-zero."""
    if not DEBUG:
        return
    print(msg)
49
50
def ERR(msg):
    """Write *msg*, followed by a newline, to standard error."""
    stream = sys.stderr
    print(msg, file=stream)
53
54
55def usage(msg=None):
56 if msg:
57 print(msg.strip() + '\n')
121614a0 58 print("""Usage: testpd [-dvalsrfcR] [<test1> <test2> ...]
dd37a782
UH
59 -d Turn on debugging
60 -v Verbose
61 -a All tests
121614a0 62 -l List test(s)
dd37a782
UH
63 -s Show test(s)
64 -r Run test(s)
810494f4 65 -f Fix failed test(s) / create initial output for new test(s)
dd37a782
UH
66 -c Report decoder code coverage
67 -R <directory> Save test reports to <directory>
121614a0 68 <test> Protocol decoder name ("i2c") and optionally test name ("i2c/rtc")""")
dd37a782
UH
69 sys.exit()
70
71
def check_tclist(tc):
    """Validate one parsed testcase dict.

    Returns a short description of the first problem found, or None when
    the testcase has protocol decoders, an input, at least one output,
    and every output carries a 'match' entry.
    """
    required = (
        ('pdlist', "No protocol decoders"),
        ('input', "No input"),
        ('output', "No output"),
    )
    for field, complaint in required:
        # A missing key and an empty value are reported the same way.
        if not tc.get(field):
            return complaint
    if any('match' not in op for op in tc['output']):
        return "No match in output"
    return None
84
85
def _parse_pd_spec(fields):
    # Build one protocol-decoder spec dict from the words after the keyword.
    if len(fields) < 1:
        raise E_syntax
    pd_spec = {
        'name': fields[0],
        'channels': [],
        'options': [],
        'initial_pins': [],
    }
    rest = fields[1:]
    if len(rest) % 2:
        # Always needs <key> <value>
        raise E_syntax
    targets = {
        'channel': 'channels',
        'option': 'options',
        'initial_pin': 'initial_pins',
    }
    for a, b in zip(rest[::2], rest[1::2]):
        if a not in targets or '=' not in b:
            raise E_syntax
        # Split on the first '=' only, so values may themselves contain '='.
        opt, val = b.split('=', 1)
        if a != 'option':
            # Channel numbers and initial pin values must be integers.
            try:
                val = int(val)
            except ValueError:
                raise E_syntax
        pd_spec[targets[a]].append([opt, val])
    return pd_spec


def _parse_output_spec(fields):
    # Build one output spec dict from the words after the "output" keyword.
    if len(fields) < 2 or len(fields) % 2:
        # Needs <pd> <type> followed by complete <key> <value> pairs.
        raise E_syntax
    op_spec = {
        'pd': fields[0],
        'type': fields[1],
    }
    for a, b in zip(fields[2::2], fields[3::2]):
        if a not in ('class', 'match'):
            raise E_syntax
        op_spec[a] = b
    return op_spec


def parse_testfile(path, pd, tc, op_type, op_class):
    """Parse a test.conf file into a list of testcase dicts.

    path     -- test.conf file to read.
    pd       -- protocol decoder name stored in every testcase ('pd' key).
    tc       -- if not None, keep only the testcase with this name.
    op_type  -- if not None (and tc is given), keep only outputs of this type.
    op_class -- if not None (and op_type is given), keep only outputs whose
                'class' equals this value.

    Returns the list of testcases. An empty list is returned when the
    requested testcase does not exist, and also (after reporting through
    ERR) when a line cannot be parsed or a testcase fails check_tclist().
    """
    DBG("Opening '%s'" % path)
    tclist = []
    with open(path) as conf:
        for line in conf:
            try:
                line = line.strip()
                if not line or line[0] == "#":
                    continue
                f = line.split()
                if not tclist and f[0] != "test":
                    # That can't be good.
                    raise E_badline
                key = f.pop(0)
                if key == 'test':
                    if len(f) != 1:
                        raise E_syntax
                    # new testcase
                    tclist.append({
                        'pd': pd,
                        'name': f[0],
                        'pdlist': [],
                        'output': [],
                    })
                elif key == 'protocol-decoder':
                    tclist[-1]['pdlist'].append(_parse_pd_spec(f))
                elif key == 'stack':
                    if len(f) < 2:
                        raise E_syntax
                    tclist[-1]['stack'] = f
                elif key == 'input':
                    if len(f) != 1:
                        raise E_syntax
                    tclist[-1]['input'] = f[0]
                elif key == 'output':
                    tclist[-1]['output'].append(_parse_output_spec(f))
                else:
                    raise E_badline
            except E_badline:
                # Unknown keyword, or content before the first "test" line.
                ERR("Unable to parse %s: unknown line '%s'" % (path, line))
                return []
            except E_syntax:
                # Known keyword, but its arguments are malformed.
                ERR("Invalid syntax in %s: line '%s'" % (path, line))
                return []

    # If a specific testcase was requested, keep only that one.
    if tc is not None:
        target_tc = next((t for t in tclist if t['name'] == tc), None)
        if target_tc is None:
            # Nothing to filter further; the output filter below must not
            # be applied to a testcase that was never found.
            tclist = []
        else:
            # ...and a specific output type
            if op_type is not None:
                target_oplist = []
                for op in target_tc['output']:
                    if op['type'] != op_type:
                        continue
                    # ...and a specific output class
                    if op_class is None or op.get('class') == op_class:
                        target_oplist.append(op)
                        DBG("match on [%s]" % str(op))
                target_tc['output'] = target_oplist
            tclist = [target_tc]
    for t in tclist:
        error = check_tclist(t)
        if error:
            ERR("Error in %s: %s" % (path, error))
            return []

    return tclist
207
208
def get_tests(testnames):
    """Collect the testcases selected by a list of testspecs.

    testnames -- iterable of testspecs in the form pd[/testcase[/type[/class]]].

    Returns a dict mapping each protocol decoder name to a list holding one
    parse_testfile() result per testspec that named it. A decoder without
    a test.conf maps to an empty list.

    Raises Exception when a decoder has no directory below tests_dir.
    """
    tests = {}
    for testspec in testnames:
        # Optional testspec in the form pd/testcase/type/class
        ts = testspec.strip("/").split("/")
        # Missing trailing components become None; extra ones are ignored.
        pd, tc, op_type, op_class = (ts + [None] * 3)[:4]
        # Do not reset the list: several testspecs may name the same
        # decoder (e.g. "i2c/rtc i2c/eeprom") and all of them must be kept.
        tests.setdefault(pd, [])
        path = os.path.join(tests_dir, pd)
        if not os.path.isdir(path):
            # User specified non-existent PD
            raise Exception("%s not found." % path)
        path = os.path.join(tests_dir, pd, "test.conf")
        if not os.path.exists(path):
            # PD doesn't have any tests yet
            continue
        tests[pd].append(parse_testfile(path, pd, tc, op_type, op_class))

    return tests
234
235
def diff_text(f1, f2):
    """Return the lines that differ between two text files.

    f1, f2 -- paths of the files to compare.

    Returns a list of stripped difflib.Differ lines, restricted to those
    starting with '- ' (only in f1) or '+ ' (only in f2). The list is
    empty when the files have identical lines.
    """
    # Context managers make sure both handles are closed again, even if
    # reading one of the files fails.
    with open(f1) as first, open(f2) as second:
        t1 = first.readlines()
        t2 = second.readlines()
    return [
        line.strip()
        for line in Differ().compare(t1, t2)
        if line[:2] in ('- ', '+ ')
    ]
246
247
def compare_binary(f1, f2):
    """Compare two files byte for byte.

    f1, f2 -- paths of the files to compare.

    Returns None when the contents are identical, otherwise a one-element
    list with a message describing the mismatch.
    """
    # Both files are read completely anyway, so the contents are compared
    # directly instead of through a digest; the handles are closed by the
    # context managers.
    with open(f1, 'rb') as first, open(f2, 'rb') as second:
        identical = first.read() == second.read()
    if identical:
        return None
    return ["Binary output does not match."]
259
260
# runtc's stdout can have lines like:
# coverage: lines=161 missed=2 coverage=99%
def parse_stats(text):
    """Parse statistics lines into a dict of record lists.

    text -- lines of the form "<key>: <name>=<value> <name>=<value> ...".

    Returns a dict mapping each key (without the colon) to a list with one
    {name: value} dict per input line carrying that key. All values are
    kept as strings. Blank lines are ignored.

    Raises ValueError when a field does not contain '='.
    """
    stats = {}
    for line in text.splitlines():
        fields = line.split()
        if not fields:
            # Empty or whitespace-only line: nothing to record.
            continue
        key = fields.pop(0).strip(':')
        record = {}
        for field in fields:
            # Split on the first '=' only, so values may contain '='.
            k, v = field.split('=', 1)
            record[k] = v
        stats.setdefault(key, []).append(record)

    return stats
276
277
# take result set of all tests in a PD, and summarize which lines
# were not covered by any of the tests.
def coverage_sum(cvglist):
    """Summarize the coverage records of all tests of one decoder.

    cvglist -- list of coverage record dicts. Each has a 'lines' entry and
               optionally a 'missed_lines' entry holding a comma-separated
               list of line specs.

    Returns (lines, final_missed): 'lines' is the integer 'lines' value of
    the last record (0 for an empty list), and 'final_missed' lists the
    line specs that appear in the 'missed_lines' of every record.
    """
    # NOTE(review): presumably every record of a decoder reports the same
    # 'lines' total, which is why the last value is used -- confirm.
    lines = 0
    missed_counts = {}
    for record in cvglist:
        lines = int(record['lines'])
        if 'missed_lines' not in record:
            continue
        for linespec in record['missed_lines'].split(','):
            if not linespec:
                # An empty 'missed_lines' value yields '' here; skip it.
                continue
            missed_counts[linespec] = missed_counts.get(linespec, 0) + 1

    # keep only those lines that were missed in every single record, i.e.
    # the lines that none of the tests covered
    final_missed = [
        linespec
        for linespec, count in missed_counts.items()
        if count == len(cvglist)
    ]

    return lines, final_missed
303
304
def _runtc_stack_args(tc):
    # Translate a testcase's decoder stack and input file into runtc options.
    args = []
    for spd in tc['pdlist']:
        args.extend(['-P', spd['name']])
        for label, channel in spd['channels']:
            args.extend(['-p', "%s=%d" % (label, channel)])
        for option, value in spd['options']:
            args.extend(['-o', "%s=%s" % (option, value)])
        for label, initial_pin in spd['initial_pins']:
            args.extend(['-N', "%s=%d" % (label, initial_pin)])
    args.extend(['-i', os.path.join(dumps_dir, tc['input'])])
    return args


def _write_total_missed_report(pd, missed_lines):
    # Write the per-file list of never-covered lines to <report_dir>/<pd>_total.
    files = {}
    for entry in missed_lines:
        filename, line = entry.split(':')
        files.setdefault(filename, []).append(line)
    text = ''
    for filename in sorted(files):
        line_list = ','.join(sorted(files[filename], key=int))
        text += "%s: %s\n" % (filename, line_list)
    with open(os.path.join(report_dir, pd + "_total"), 'w') as report:
        report.write(text)


def run_tests(tests, fix=False):
    """Run every output of every testcase through runtc and check it.

    tests -- dict as returned by get_tests().
    fix   -- when true, a mismatching (or unreadable) match file is
             overwritten with the freshly generated output instead of
             being reported as a diff.

    Returns (results, errors). 'results' holds one dict per output with a
    'testcase' name, the statistics parsed from runtc's stdout, and
    optionally 'error', 'diff' and 'coverage_report' entries. 'errors' is
    the number of results that ended up with an 'error' entry.

    Reads the module-level settings runtc_dir, tests_dir, dumps_dir,
    report_dir, opt_coverage, DEBUG and VERBOSE.
    """
    errors = 0
    results = []
    cmd = [os.path.join(runtc_dir, 'runtc')]
    if opt_coverage:
        fd, coverage = mkstemp()
        os.close(fd)
        cmd.extend(['-c', coverage])
    else:
        coverage = None
    for pd in sorted(tests.keys()):
        pd_cvg = []
        for tclist in tests[pd]:
            for tc in tclist:
                args = cmd[:]
                if DEBUG > 1:
                    args.append('-d')
                # Set up PD stack for this test.
                args.extend(_runtc_stack_args(tc))
                for op in tc['output']:
                    name = "%s/%s/%s" % (pd, tc['name'], op['type'])
                    opargs = ['-O', "%s:%s" % (op['pd'], op['type'])]
                    if 'class' in op:
                        opargs[-1] += ":%s" % op['class']
                        name += "/%s" % op['class']
                    if VERBOSE:
                        dots = '.' * (77 - len(name) - 2)
                        INFO("%s %s " % (name, dots), end='')
                    result = {
                        'testcase': name,
                    }
                    results.append(result)
                    # Stays None if mkstemp() itself fails, so the cleanup
                    # below does not trip over an unbound name.
                    outfile = None
                    try:
                        fd, outfile = mkstemp()
                        os.close(fd)
                        opargs.extend(['-f', outfile])
                        DBG("Running %s" % (' '.join(args + opargs)))
                        p = Popen(args + opargs, stdout=PIPE, stderr=PIPE)
                        stdout, stderr = p.communicate()
                        if stdout:
                            # statistics and coverage data on stdout
                            result.update(parse_stats(stdout.decode('utf-8')))
                        if stderr:
                            result['error'] = stderr.decode('utf-8').strip()
                        elif p.returncode != 0:
                            # runtc indicated an error, but didn't output a
                            # message on stderr about it
                            result['error'] = "Unknown error: runtc %d" % p.returncode
                        if 'error' not in result:
                            matchfile = os.path.join(tests_dir, op['pd'], op['match'])
                            DBG("Comparing with %s" % matchfile)
                            try:
                                diff = diff_error = None
                                if op['type'] in ('annotation', 'python'):
                                    diff = diff_text(matchfile, outfile)
                                elif op['type'] == 'binary':
                                    diff = compare_binary(matchfile, outfile)
                                else:
                                    diff = ["Unsupported output type '%s'." % op['type']]
                            except Exception as e:
                                # Remember the failure; in fix mode it means
                                # the match file has to be (re)created.
                                diff_error = e
                            if fix:
                                if diff or diff_error:
                                    copy(outfile, matchfile)
                                    DBG("Wrote %s" % matchfile)
                            else:
                                if diff:
                                    result['diff'] = diff
                                elif diff_error is not None:
                                    raise diff_error
                    except Exception as e:
                        result['error'] = str(e)
                    finally:
                        # Only advertise a coverage report that really exists;
                        # gen_report() opens the file unconditionally.
                        if coverage and os.path.exists(coverage):
                            result['coverage_report'] = coverage
                        if outfile is not None:
                            os.unlink(outfile)
                    if op['type'] == 'exception' and 'error' in result:
                        # filter out the exception we were looking for
                        reg = "^Error: srd: %s:" % op['match']
                        if re.match(reg, result['error']):
                            # found it, not an error
                            result.pop('error')
                    # Count once, after the expected-exception filter, so
                    # every kind of error is counted and the total can
                    # never become negative.
                    if 'error' in result:
                        errors += 1
                    if VERBOSE:
                        if 'diff' in result:
                            INFO("Output mismatch")
                        elif 'error' in result:
                            error = result['error']
                            if len(error) > 20:
                                error = error[:17] + '...'
                            INFO(error)
                        elif 'coverage' in result:
                            # report coverage of this PD
                            for record in result['coverage']:
                                # but not others used in the stack
                                # as part of the test.
                                if record['scope'] == pd:
                                    INFO(record['coverage'])
                                    break
                        else:
                            INFO("OK")
                    gen_report(result)
                    if coverage:
                        if os.path.exists(coverage):
                            os.unlink(coverage)
                        # only keep track of coverage records for this PD,
                        # not others in the stack just used for testing.
                        # A failed run may not have produced any record.
                        for cvg in result.get('coverage', []):
                            if cvg['scope'] == pd:
                                pd_cvg.append(cvg)
        if opt_coverage and len(pd_cvg) > 1:
            # report total coverage of this PD, across all the tests
            # that were done on it.
            total_lines, missed_lines = coverage_sum(pd_cvg)
            pd_coverage = 100 - (float(len(missed_lines)) / total_lines * 100)
            if VERBOSE:
                dots = '.' * (54 - len(pd) - 2)
                INFO("%s total %s %d%%" % (pd, dots, pd_coverage))
            if report_dir:
                # generate a missing lines list across all the files in
                # the PD
                _write_total_missed_report(pd, missed_lines)

    # The temporary coverage file is still around when no output was run.
    if coverage and os.path.exists(coverage):
        os.unlink(coverage)

    return results, errors
447
a1c10c43
GS
def get_run_tests_error_diff_counts(results):
    """Count the run_tests() results that carry an 'error' or a 'diff' entry.

    Returns the two counts as an (errors, diffs) tuple.
    """
    tally = {'error': 0, 'diff': 0}
    for entry in results:
        for marker in tally:
            if marker in entry:
                tally[marker] += 1
    return tally['error'], tally['diff']
458
dd37a782
UH
459
def gen_report(result):
    """Emit the report for one run_tests() result.

    result -- result dict with a 'testcase' name and optional 'error',
              'diff' and 'coverage_report' (path of a text file) entries.

    Nothing is emitted when none of the optional entries is present.
    Otherwise the report is written to a file in report_dir, named after
    the testcase with '/' replaced by '_', or printed when report_dir is
    not set.
    """
    out = []
    if 'error' in result:
        out.extend(["Error:", result['error'], ''])
    if 'diff' in result:
        out.append("Test output mismatch:")
        out.extend(result['diff'])
        out.append('')
    if 'coverage_report' in result:
        # Close the handle right after reading instead of leaking it.
        with open(result['coverage_report'], 'r') as report:
            out.append(report.read())
        out.append('')

    if not out:
        return
    text = "Testcase: %s\n" % result['testcase']
    text += '\n'.join(out)

    if report_dir:
        filename = result['testcase'].replace('/', '_')
        with open(os.path.join(report_dir, filename), 'w') as report:
            report.write(text)
    else:
        print(text)
485
486
def show_tests(tests):
    """Print a readable description of every testcase in *tests*.

    tests -- dict as returned by get_tests().
    """
    for pd_name in sorted(tests):
        for tclist in tests[pd_name]:
            for tc in tclist:
                print("Testcase: %s/%s" % (tc['pd'], tc['name']))
                for spd in tc['pdlist']:
                    print(" Protocol decoder: %s" % spd['name'])
                    for label, channel in spd['channels']:
                        print(" Channel %s=%d" % (label, channel))
                    for option, value in spd['options']:
                        print(" Option %s=%s" % (option, value))
                    for label, initial_pin in spd['initial_pins']:
                        print(" Initial pin %s=%d" % (label, initial_pin))
                if 'stack' in tc:
                    print(" Stack: %s" % ' '.join(tc['stack']))
                print(" Input: %s" % tc['input'])
                for op in tc['output']:
                    print(" Output:\n Protocol decoder: %s" % op['pd'])
                    print(" Type: %s" % op['type'])
                    if 'class' in op:
                        print(" Class: %s" % op['class'])
                    print(" Match: %s" % op['match'])
                # NOTE(review): blank separator assumed to follow each
                # testcase -- confirm against the original indentation.
                print()
510
511
def list_tests(tests):
    """Print one "pd/testcase/type[/class]" line per configured output.

    tests -- dict as returned by get_tests().
    """
    for pd_name in sorted(tests):
        for tclist in tests[pd_name]:
            for tc in tclist:
                for op in tc['output']:
                    # The class component is only present for some outputs.
                    suffix = "/%s" % op['class'] if 'class' in op else ''
                    print("%s/%s/%s%s" % (tc['pd'], tc['name'], op['type'], suffix))
521
522
#
# main
#

# project root
runtc_dir = os.path.abspath(os.path.dirname(sys.argv[0]))
base_dir = os.path.abspath(os.path.join(os.curdir, runtc_dir, os.path.pardir))
dumps_dir = os.path.abspath(os.path.join(base_dir, os.path.pardir, 'sigrok-dumps'))
tests_dir = os.path.abspath(os.path.join(runtc_dir, 'test'))

if len(sys.argv) == 1:
    usage()

opt_all = opt_run = opt_show = opt_list = opt_fix = opt_coverage = False
report_dir = None
try:
    opts, args = getopt(sys.argv[1:], "dvarslfcR:S:")
except Exception as e:
    usage('error while parsing command line arguments: {}'.format(e))
for opt, arg in opts:
    if opt == '-d':
        # May be given more than once; run_tests() passes -d on to runtc
        # when the level exceeds 1.
        DEBUG += 1
    elif opt == '-v':
        VERBOSE = True
    elif opt == '-a':
        opt_all = True
    elif opt == '-r':
        opt_run = True
    elif opt == '-s':
        opt_show = True
    elif opt == '-l':
        opt_list = True
    elif opt == '-f':
        opt_fix = True
    elif opt == '-c':
        opt_coverage = True
    elif opt == '-R':
        report_dir = arg
    elif opt == '-S':
        # Overrides the default location of the sigrok-dumps repository.
        dumps_dir = arg

if opt_run and opt_show:
    usage("Use either -s or -r, not both.")
if args and opt_all:
    usage("Specify either -a or tests, not both.")
if report_dir is not None and not os.path.isdir(report_dir):
    usage("%s is not a directory" % report_dir)

# Exit status: 0 on success, 1 on errors, 2 when only output diffs occurred.
ret = 0
try:
    if args:
        testlist = get_tests(args)
    elif opt_all or opt_list:
        # Only directories are decoders; get_tests() raises for anything
        # else, so a stray file in the test directory must be skipped.
        testlist = get_tests([
            entry for entry in os.listdir(tests_dir)
            if os.path.isdir(os.path.join(tests_dir, entry))
        ])
    else:
        usage("Specify either -a or tests.")

    if opt_run:
        if not os.path.isdir(dumps_dir):
            ERR("Could not find sigrok-dumps repository at %s" % dumps_dir)
            sys.exit(1)
        results, errors = run_tests(testlist, fix=opt_fix)
        errs, diffs = get_run_tests_error_diff_counts(results)
        if errs:
            ret = 1
        elif diffs:
            ret = 2
    elif opt_show:
        show_tests(testlist)
    elif opt_list:
        list_tests(testlist)
    elif opt_fix:
        run_tests(testlist, fix=True)
    else:
        usage()
except Exception as e:
    print("Error: %s" % str(e))
    # A failed run must not look like a success to the caller.
    ret = 1
    if DEBUG:
        raise

sys.exit(ret)