3 ## This file is part of the libsigrokdecode project.
5 ## Copyright (C) 2013 Bert Vermeulen <bert@biot.com>
7 ## This program is free software: you can redistribute it and/or modify
8 ## it under the terms of the GNU General Public License as published by
9 ## the Free Software Foundation, either version 3 of the License, or
10 ## (at your option) any later version.
12 ## This program is distributed in the hope that it will be useful,
13 ## but WITHOUT ANY WARRANTY; without even the implied warranty of
14 ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 ## GNU General Public License for more details.
17 ## You should have received a copy of the GNU General Public License
18 ## along with this program. If not, see <http://www.gnu.org/licenses/>.
23 from getopt import getopt
24 from tempfile import mkstemp
25 from subprocess import Popen, PIPE
26 from difflib import Differ
27 from hashlib import md5
28 from shutil import copy
# Parser exception. The test.conf parsing code catches it and reports
# "Unable to parse <path>: unknown line '<line>'".
34 class E_syntax(Exception):
# Parser exception. The test.conf parsing code catches it and reports
# "Invalid syntax in <path>: line '<line>'".
36 class E_badline(Exception):
# Status/progress message helper.
#   msg: text to emit.
#   end: line terminator, newline by default. run_tests() passes end='' when
#        it prints the "<name> ..... " prefix of a test line, and then calls
#        INFO() again with the outcome.
39 def INFO(msg, end='\n'):
51 print(msg, file=sys.stderr)
56 print(msg.strip() + '\n')
57 print("""Usage: testpd [-dvarslR] [test, ...]
65 -c Report decoder code coverage
66 -R <directory> Save test reports to <directory>
67 <test> Protocol decoder name ("i2c") and optionally test name ("i2c/icc")""")
72 if 'pdlist' not in tc or not tc['pdlist']:
73 return("No protocol decoders")
74 if 'input' not in tc or not tc['input']:
76 if 'output' not in tc or not tc['output']:
78 for op in tc['output']:
80 return("No match in output")
# Parse one protocol decoder's test.conf into a list of testcase dicts.
#
#   path:     location of the test.conf file to read.
#   pd:       protocol decoder name the file belongs to.
#   tc:       if a specific testcase was requested, only that one is kept.
#   op_type:  if not None, keep only outputs whose 'type' equals it.
#   op_class: if not None, additionally require the output's 'class' to match.
#
# The visible assignments fill these keys on the most recent testcase
# (tclist[-1]):
#   'pdlist' - list of pd_spec dicts, each holding 'channels' and 'options'
#              lists of [name, value] pairs
#   'stack'  - the remaining fields of the line
#   'input'  - the first field of the line
#   'output' - list of op_spec dicts (filtered later on 'type' / 'class')
# Each resulting testcase is validated with check_tclist(), and a failure is
# reported through ERR().
85 def parse_testfile(path, pd, tc, op_type, op_class):
86 DBG("Opening '%s'" % path)
# NOTE(review): the file object returned by open() is never closed explicitly.
88 for line in open(path).read().split('\n'):
# Empty lines and lines starting with '#' are singled out here.
91 if len(line) == 0 or line[0] == "#":
# Nothing may precede the first "test" line: this fires while no testcase has
# been started yet and the line's first field is something else.
94 if not tclist and f[0] != "test":
108 elif key == 'protocol-decoder':
118 # Always needs <key> <value>
# Exactly one '=' is required; any other count makes the unpacking raise
# ValueError.
124 opt, val = b.split('=')
130 pd_spec['channels'].append([opt, val])
132 pd_spec['options'].append([opt, val])
135 tclist[-1]['pdlist'].append(pd_spec)
139 tclist[-1]['stack'] = f
143 tclist[-1]['input'] = f[0]
144 elif key == 'output':
151 # Always needs <key> <value>
161 tclist[-1]['output'].append(op_spec)
# Both parser exceptions are reported together with the offending line.
164 except E_badline as e:
165 ERR("Invalid syntax in %s: line '%s'" % (path, line))
167 except E_syntax as e:
168 ERR("Unable to parse %s: unknown line '%s'" % (path, line))
171 # If a specific testcase was requested, keep only that one.
178 # ...and a specific output type
179 if op_type is not None:
181 for op in target_tc['output']:
182 if op['type'] == op_type:
183 # ...and a specific output class
# An output without a 'class' key only matches when no class was requested.
184 if op_class is None or ('class' in op and op['class'] == op_class):
185 target_oplist.append(op)
186 DBG("match on [%s]" % str(op))
# The testcase's output list is replaced by the filtered one.
187 target_tc['output'] = target_oplist
188 if target_tc is None:
# check_tclist() returns a reason string, which is reported with the path.
193 error = check_tclist(t)
195 ERR("Error in %s: %s" % (path, error))
# Collect the parsed testcases for every requested test specification.
#
#   testnames: iterable of specs of the form "pd[/testcase[/type[/class]]]".
#
# For each spec the decoder's <decoders_dir>/<pd>/test/test.conf is parsed
# with parse_testfile() and the result is appended to tests[pd].
# Raises Exception("<path> not found.") when <decoders_dir>/<pd> is not a
# directory.
201 def get_tests(testnames):
203 for testspec in testnames:
204 # Optional testspec in the form pd/testcase/type/class
# The optional selectors default to None, i.e. "no filtering".
205 tc = op_type = op_class = None
# Leading and trailing slashes are dropped before splitting, so "i2c/" is
# treated like "i2c".
206 ts = testspec.strip("/").split("/")
215 path = os.path.join(decoders_dir, pd)
216 if not os.path.isdir(path):
217 # User specified non-existent PD
218 raise Exception("%s not found." % path)
# `path` is reused: from here on it names the decoder's test.conf.
219 path = os.path.join(decoders_dir, pd, "test/test.conf")
220 if not os.path.exists(path):
221 # PD doesn't have any tests yet
223 tests[pd].append(parse_testfile(path, pd, tc, op_type, op_class))
def diff_text(f1, f2):
    """Return the lines that differ between two text files.

    Both files are read completely and compared with difflib.Differ. Only
    the '- ' records (present in f1 only) and '+ ' records (present in f2
    only) are kept, each stripped of surrounding whitespace. Unchanged
    lines and Differ's '? ' hint lines are dropped.

    f1, f2: paths of the files to compare.

    Returns a list of strings, which is empty when the files match.
    """
    # Context managers close both handles even if reading fails; the files
    # were previously left for the garbage collector to close.
    with open(f1) as first, open(f2) as second:
        t1 = first.readlines()
        t2 = second.readlines()

    return [
        line.strip()
        for line in Differ().compare(t1, t2)
        if line[:2] in ('- ', '+ ')
    ]
def compare_binary(f1, f2):
    """Compare the contents of two files through their MD5 digests.

    f1, f2: paths of the files to compare; both are read in binary mode.

    Returns None when the digests are equal, otherwise a one-element list
    holding the mismatch message.
    """
    digests = []
    for path in (f1, f2):
        # Close each handle as soon as its content has been hashed.
        with open(path, 'rb') as stream:
            h = md5()
            h.update(stream.read())
            digests.append(h.digest())

    if digests[0] == digests[1]:
        return None
    return ["Binary output does not match."]
# runtc's stdout can have lines like:
# coverage: lines=161 missed=2 coverage=99%
def parse_stats(text):
    """Parse runtc's statistics output into a dict of record lists.

    Each non-blank line has the form "<key>: <name>=<value> ...". The
    leading word, with any ':' stripped from its ends, becomes the key.
    The remaining whitespace-separated fields become one record dict
    mapping name to value. Values are kept as strings.

    text: the decoded stdout of runtc.

    Returns a dict mapping each key to the list of its records, in input
    order. Blank input yields an empty dict.
    """
    stats = {}
    for line in text.strip().split('\n'):
        fields = line.split()
        if not fields:
            # Blank line, or empty input: there is no key to pop.
            continue
        key = fields.pop(0).strip(':')
        record = {}
        for field in fields:
            # Split on the first '=' only, so a value may contain '='.
            name, _, value = field.partition('=')
            record[name] = value
        stats.setdefault(key, []).append(record)

    return stats
# Take the coverage records of all tests run on a PD, and summarize which
# lines were not covered by any of the tests.
def coverage_sum(cvglist):
    """Summarize coverage across several test runs of one decoder.

    cvglist: list of coverage records (dicts of strings). Each record has a
             'lines' entry and may have a 'missed_lines' entry holding a
             comma-separated list of line specs.

    Returns a tuple (lines, final_missed):
      lines        - the 'lines' value of the last record as an int, or 0
                     for an empty list
      final_missed - the line specs reported as missed by every record,
                     in order of first appearance
    """
    from collections import Counter

    lines = 0
    miss_counts = Counter()
    for record in cvglist:
        lines = int(record['lines'])
        # A record without 'missed_lines' contributes no counts, but it
        # still counts towards len(cvglist) below.
        if 'missed_lines' in record:
            miss_counts.update(record['missed_lines'].split(','))

    # A line is uncovered overall only if every single record missed it.
    total = len(cvglist)
    final_missed = [spec for spec, count in miss_counts.items() if count == total]

    return lines, final_missed
# Run every output check of every testcase through the external `runtc`
# helper and collect one result dict per check.
#
#   tests: dict keyed by protocol decoder name; each value is a list of
#          testcase lists.
#   fix:   when set, a differing or failed comparison makes the freshly
#          generated output be copied over the expected-output file.
#
# Returns the tuple (results, errors). The visible code fills these result
# keys: 'error', 'diff', 'coverage_report', plus whatever parse_stats()
# extracts from runtc's stdout.
297 def run_tests(tests, fix=False):
# The helper binary is expected next to this script.
300 cmd = [os.path.join(tests_dir, 'runtc')]
# Temporary file whose path is handed to runtc via -c.
302 fd, coverage = mkstemp()
304 cmd.extend(['-c', coverage])
307 for pd in sorted(tests.keys()):
309 for tclist in tests[pd]:
314 # Set up PD stack for this test.
315 for spd in tc['pdlist']:
316 args.extend(['-P', spd['name']])
# Channel numbers are formatted with %d, so they must be numeric here.
317 for label, channel in spd['channels']:
318 args.extend(['-p', "%s=%d" % (label, channel)])
# Option values are passed through as text.
319 for option, value in spd['options']:
320 args.extend(['-o', "%s=%s" % (option, value)])
# Input files are resolved relative to the sigrok-dumps checkout.
321 args.extend(['-i', os.path.join(dumps_dir, tc['input'])])
322 for op in tc['output']:
# `name` is the display name "pd/testcase/type[/class]".
# The -O argument is "<pd>:<type>[:<class>]".
323 name = "%s/%s/%s" % (pd, tc['name'], op['type'])
324 opargs = ['-O', "%s:%s" % (op['pd'], op['type'])]
326 opargs[-1] += ":%s" % op['class']
327 name += "/%s" % op['class']
# Pad the name with dots; the outcome is printed after it on the same line.
329 dots = '.' * (60 - len(name) - 2)
330 INFO("%s %s " % (name, dots), end='')
# runtc writes the decoder output to this temporary file (-f).
335 fd, outfile = mkstemp()
337 opargs.extend(['-f', outfile])
338 DBG("Running %s" % (' '.join(args + opargs)))
339 p = Popen(args + opargs, stdout=PIPE, stderr=PIPE)
340 stdout, stderr = p.communicate()
342 # statistics and coverage data on stdout
343 results[-1].update(parse_stats(stdout.decode('utf-8')))
# The text runtc wrote to stderr becomes the error message.
345 results[-1]['error'] = stderr.decode('utf-8').strip()
347 elif p.returncode != 0:
348 # runtc indicated an error, but didn't output a
349 # message on stderr about it
350 results[-1]['error'] = "Unknown error: runtc %d" % p.returncode
# Only compare against the expected output when runtc itself succeeded.
351 if 'error' not in results[-1]:
352 matchfile = os.path.join(decoders_dir, op['pd'], 'test', op['match'])
353 DBG("Comparing with %s" % matchfile)
# `diff` holds a list describing the mismatch, if any.
# `diff_error` is for an exception raised while comparing.
355 diff = diff_error = None
356 if op['type'] in ('annotation', 'python'):
357 diff = diff_text(matchfile, outfile)
358 elif op['type'] == 'binary':
359 diff = compare_binary(matchfile, outfile)
361 diff = ["Unsupported output type '%s'." % op['type']]
362 except Exception as e:
# Overwrite the expected output with what was just produced.
365 if diff or diff_error:
366 copy(outfile, matchfile)
367 DBG("Wrote %s" % matchfile)
370 results[-1]['diff'] = diff
371 elif diff_error is not None:
# Any exception in this section is turned into an error result.
373 except Exception as e:
374 results[-1]['error'] = str(e)
377 results[-1]['coverage_report'] = coverage
# Report the outcome: mismatch first, then error, then coverage.
380 if 'diff' in results[-1]:
381 INFO("Output mismatch")
382 elif 'error' in results[-1]:
383 error = results[-1]['error']
# Shorten the message to 17 characters plus an ellipsis.
385 error = error[:17] + '...'
387 elif 'coverage' in results[-1]:
388 # report coverage of this PD
389 for record in results[-1]['coverage']:
390 # but not others used in the stack
391 # as part of the test.
392 if record['scope'] == pd:
393 INFO(record['coverage'])
397 gen_report(results[-1])
400 # only keep track of coverage records for this PD,
401 # not others in the stack just used for testing.
402 for cvg in results[-1]['coverage']:
403 if cvg['scope'] == pd:
# A per-PD total is only worth printing when more than one record exists.
405 if VERBOSE and opt_coverage and len(pd_cvg) > 1:
406 # report total coverage of this PD, across all the tests
407 # that were done on it.
408 total_lines, missed_lines = coverage_sum(pd_cvg)
# NOTE(review): this divides by total_lines; a record reporting lines=0 would
# raise ZeroDivisionError here.
409 pd_coverage = 100 - (float(len(missed_lines)) / total_lines * 100)
411 dots = '.' * (54 - len(pd) - 2)
412 INFO("%s total %s %d%%" % (pd, dots, pd_coverage))
414 return results, errors
# Write a plain-text report for one test result into report_dir.
#
#   result: result dict. Keys read here are 'testcase' (required), and
#           'error', 'diff' and 'coverage_report' (optional).
#
# The report is "Testcase: <name>" followed by the collected lines, stored
# under the testcase name with every '/' replaced by '_'.
417 def gen_report(result):
419 if 'error' in result:
421 out.append(result['error'])
424 out.append("Test output mismatch:")
425 out.extend(result['diff'])
427 if 'coverage_report' in result:
# 'coverage_report' holds a file path; its whole content is embedded.
# NOTE(review): this handle is never closed explicitly.
428 out.append(open(result['coverage_report'], 'r').read())
432 text = "Testcase: %s\n" % result['testcase']
433 text += '\n'.join(out)
# Testcase names contain '/', which cannot appear in a file name.
438 filename = result['testcase'].replace('/', '_')
# NOTE(review): this handle is never closed explicitly either, so the write
# is only flushed when the file object is finalized.
439 open(os.path.join(report_dir, filename), 'w').write(text)
# Print a human-readable description of every testcase to stdout.
#
#   tests: dict keyed by protocol decoder name; each value is a list of
#          testcase lists.
#
# For each testcase this prints its decoders (with channels and options),
# the stack, the input file, and each expected output (decoder, type, class,
# match file).
444 def show_tests(tests):
445 for pd in sorted(tests.keys()):
446 for tclist in tests[pd]:
448 print("Testcase: %s/%s" % (tc['pd'], tc['name']))
# NOTE(review): this loop variable reuses the name `pd` of the outer loop.
449 for pd in tc['pdlist']:
450 print(" Protocol decoder: %s" % pd['name'])
451 for label, channel in pd['channels']:
452 print(" Channel %s=%d" % (label, channel))
# NOTE(review): option values are formatted with %d here, but run_tests()
# formats the same values with %s. A non-numeric option value would raise
# TypeError on this line; confirm and switch to %s.
453 for option, value in pd['options']:
454 print(" Option %s=%d" % (option, value))
456 print(" Stack: %s" % ' '.join(tc['stack']))
457 print(" Input: %s" % tc['input'])
458 for op in tc['output']:
459 print(" Output:\n Protocol decoder: %s" % op['pd'])
460 print(" Type: %s" % op['type'])
462 print(" Class: %s" % op['class'])
463 print(" Match: %s" % op['match'])
# Build one "pd/testcase/type[/class]" line per expected output of every
# testcase. This is the same form get_tests() accepts as a test
# specification.
#
#   tests: dict keyed by protocol decoder name; each value is a list of
#          testcase lists.
467 def list_tests(tests):
468 for pd in sorted(tests.keys()):
469 for tclist in tests[pd]:
471 for op in tc['output']:
472 line = "%s/%s/%s" % (tc['pd'], tc['name'], op['type'])
# The class component is appended to the line when the output has one.
474 line += "/%s" % op['class']
# Script entry: work out the directory layout, parse the command line, then
# show, list, run or fix the selected tests.
#
# Directory layout, derived from the location of this script:
#   tests_dir    - directory containing the script (and the runtc helper)
#   base_dir     - parent of tests_dir
#   dumps_dir    - 'sigrok-dumps' directory next to base_dir
#   decoders_dir - 'decoders' directory inside base_dir
483 tests_dir = os.path.abspath(os.path.dirname(sys.argv[0]))
# tests_dir is already absolute, so os.path.join() discards the leading
# os.curdir component.
484 base_dir = os.path.abspath(os.path.join(os.curdir, tests_dir, os.path.pardir))
485 dumps_dir = os.path.abspath(os.path.join(base_dir, os.path.pardir, 'sigrok-dumps'))
486 decoders_dir = os.path.abspath(os.path.join(base_dir, 'decoders'))
# Invoked without any arguments.
488 if len(sys.argv) == 1:
491 opt_all = opt_run = opt_show = opt_list = opt_fix = opt_coverage = False
# -d -v -a -r -s -l -f -c are plain flags; -R and -S each take an argument.
493 opts, args = getopt(sys.argv[1:], "dvarslfcR:S:")
494 for opt, arg in opts:
# Reject contradictory or unusable option combinations.
516 if opt_run and opt_show:
517 usage("Use either -s or -r, not both.")
519 usage("Specify either -a or tests, not both.")
520 if report_dir is not None and not os.path.isdir(report_dir):
521 usage("%s is not a directory" % report_dir)
# Tests come either from the names given on the command line...
526 testlist = get_tests(args)
# ...or from every entry found in the decoders directory.
528 testlist = get_tests(os.listdir(decoders_dir))
530 usage("Specify either -a or tests.")
# Running tests needs the input captures from the sigrok-dumps checkout.
533 if not os.path.isdir(dumps_dir):
534 ERR("Could not find sigrok-dumps repository at %s" % dumps_dir)
536 results, errors = run_tests(testlist, fix=opt_fix)
# Fix mode: rerun with fix=True so expected-output files get rewritten.
543 run_tests(testlist, fix=True)
# Any exception is reduced to a one-line message.
546 except Exception as e:
547 print("Error: %s" % str(e))