3 ## This file is part of the sigrok-test project.
5 ## Copyright (C) 2013 Bert Vermeulen <bert@biot.com>
7 ## This program is free software: you can redistribute it and/or modify
8 ## it under the terms of the GNU General Public License as published by
9 ## the Free Software Foundation, either version 3 of the License, or
10 ## (at your option) any later version.
12 ## This program is distributed in the hope that it will be useful,
13 ## but WITHOUT ANY WARRANTY; without even the implied warranty of
14 ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 ## GNU General Public License for more details.
17 ## You should have received a copy of the GNU General Public License
18 ## along with this program. If not, see <http://www.gnu.org/licenses/>.
24 from getopt import getopt
25 from tempfile import mkstemp
26 from subprocess import Popen, PIPE
27 from difflib import Differ
28 from hashlib import md5
29 from shutil import copy
# Exception raised while parsing a test.conf file when a line's syntax is
# unrecognized (caught in parse_testfile, which reports "Unable to parse ...").
# NOTE(review): class body (presumably a bare 'pass') is elided in this excerpt.
35 class E_syntax(Exception):
# Exception raised while parsing a test.conf file when a known keyword is used
# with invalid arguments (caught in parse_testfile: "Invalid syntax in ...").
# NOTE(review): class body (presumably a bare 'pass') is elided in this excerpt.
37 class E_badline(Exception):
# Console-output helpers. Only fragments are visible in this excerpt: the
# def lines for the DBG/ERR/usage helpers are elided, so the lines below are
# interleaved pieces of several functions — TODO confirm against full source.
# INFO(): print a progress message; 'end' allows suppressing the newline so a
# PASS/FAIL verdict can be appended on the same line later.
40 def INFO(msg, end='\n'):
# NOTE(review): this stderr print presumably belongs to an ERR() helper.
52 print(msg, file=sys.stderr)
# NOTE(review): the lines below presumably belong to a usage() helper that
# prints an optional error message followed by the usage text (and likely
# exits; the exit call is elided here).
57 print(msg.strip() + '\n')
58 print("""Usage: testpd [-dvalsrfcR] [<test1> <test2> ...]
65 -f Fix failed test(s) / create initial output for new test(s)
66 -c Report decoder code coverage
67 -R <directory> Save test reports to <directory>
68 <test> Protocol decoder name ("i2c") and optionally test name ("i2c/rtc")""")
# check_tclist(tc): sanity-check a parsed testcase dict. Returns a short
# error string describing the first missing piece (decoders, input, output,
# match file), or presumably a falsy value when the testcase is complete.
# NOTE(review): the 'def' line and several return branches are elided in this
# excerpt — TODO confirm against full source.
73 if 'pdlist' not in tc or not tc['pdlist']:
74 return("No protocol decoders")
75 if 'input' not in tc or not tc['input']:
77 if 'output' not in tc or not tc['output']:
# Every output entry must name a match file to compare against.
79 for op in tc['output']:
81 return("No match in output")
# Parse a test.conf file into a list of testcase dicts.
#
# path:     filesystem path of the test.conf to read
# pd:       protocol decoder name the file belongs to
# tc:       optional testcase name; when given, only that testcase is kept
# op_type:  optional output type filter (e.g. 'annotation', 'binary')
# op_class: optional output class filter (only meaningful with op_type)
#
# NOTE(review): many interior lines (the keyword tokenizer, the 'test'/'stack'/
# 'input' handlers, op_spec construction, and the final return) are elided in
# this excerpt, so the control flow below is incomplete — verify against the
# full source before editing.
86 def parse_testfile(path, pd, tc, op_type, op_class):
87 DBG("Opening '%s'" % path)
# Read the whole file and walk it line by line.
89 for line in open(path).read().split('\n'):
# Skip blank lines and '#' comments.
92 if len(line) == 0 or line[0] == "#":
# The first keyword in a file must start a testcase ('test').
95 if not tclist and f[0] != "test":
109 elif key == 'protocol-decoder':
120 # Always needs <key> <value>
# 'channel'/'option'/'initial_pin' arguments are name=value pairs.
126 opt, val = b.split('=')
132 pd_spec['channels'].append([opt, val])
134 pd_spec['options'].append([opt, val])
135 elif a == 'initial_pin':
140 pd_spec['initial_pins'].append([opt, val])
# Attach the completed decoder spec to the current testcase.
143 tclist[-1]['pdlist'].append(pd_spec)
147 tclist[-1]['stack'] = f
151 tclist[-1]['input'] = f[0]
152 elif key == 'output':
159 # Always needs <key> <value>
169 tclist[-1]['output'].append(op_spec)
# Parse errors are reported but do not abort the whole run here.
172 except E_badline as e:
173 ERR("Invalid syntax in %s: line '%s'" % (path, line))
175 except E_syntax as e:
176 ERR("Unable to parse %s: unknown line '%s'" % (path, line))
179 # If a specific testcase was requested, keep only that one.
186 # ...and a specific output type
187 if op_type is not None:
189 for op in target_tc['output']:
190 if op['type'] == op_type:
191 # ...and a specific output class
192 if op_class is None or ('class' in op and op['class'] == op_class):
193 target_oplist.append(op)
194 DBG("match on [%s]" % str(op))
195 target_tc['output'] = target_oplist
196 if target_tc is None:
# Validate every surviving testcase; report problems per file.
201 error = check_tclist(t)
203 ERR("Error in %s: %s" % (path, error))
# Build the test dictionary for a list of user-supplied test specs.
# Each spec is "pd" optionally extended to "pd/testcase/type/class"; the
# optional parts narrow what parse_testfile() keeps.
# NOTE(review): the spec-splitting branches that assign pd/tc/op_type/op_class
# from 'ts', the 'tests' dict initialization, and the return are elided in
# this excerpt — TODO confirm against full source.
209 def get_tests(testnames):
211 for testspec in testnames:
212 # Optional testspec in the form pd/testcase/type/class
213 tc = op_type = op_class = None
214 ts = testspec.strip("/").split("/")
223 path = os.path.join(tests_dir, pd)
224 if not os.path.isdir(path):
225 # User specified non-existent PD
226 raise Exception("%s not found." % path)
227 path = os.path.join(tests_dir, pd, "test.conf")
228 if not os.path.exists(path):
229 # PD doesn't have any tests yet
231 tests[pd].append(parse_testfile(path, pd, tc, op_type, op_class))
# Compare two text files line by line; collect only added/removed lines
# ('- ' / '+ ' prefixed output from difflib.Differ), ignoring context and
# '?' hint lines.
# NOTE(review): the Differ() construction, the 'diff' list initialization and
# the return statement are elided in this excerpt.
236 def diff_text(f1, f2):
237 t1 = open(f1).readlines()
238 t2 = open(f2).readlines()
241 for line in d.compare(t1, t2):
242 if line[:2] in ('- ', '+ '):
243 diff.append(line.strip())
# Compare two binary files by md5 digest (md5 is imported from hashlib at the
# top of the file). Equality of digests is treated as file equality; on
# mismatch a one-line human-readable result is produced.
# NOTE(review): the md5() constructor calls and the return statements are
# elided in this excerpt.
248 def compare_binary(f1, f2):
250 h1.update(open(f1, 'rb').read())
252 h2.update(open(f2, 'rb').read())
253 if h1.digest() == h2.digest():
256 result = ["Binary output does not match."]
261 # runtc's stdout can have lines like:
262 # coverage: lines=161 missed=2 coverage=99%
# Parse such lines into a dict keyed by the leading word (e.g. 'coverage'),
# each value being a list of records of k=v fields.
# NOTE(review): the 'stats' initialization, the k/v splitting of each field,
# and the return are elided in this excerpt.
263 def parse_stats(text):
265 for line in text.strip().split('\n'):
266 fields = line.split()
# First whitespace-separated token (minus trailing ':') is the record key.
267 key = fields.pop(0).strip(':')
270 stats[key].append({})
273 stats[key][-1][k] = v
278 # take result set of all tests in a PD, and summarize which lines
279 # were not covered by any of the tests.
# Input: list of coverage records as produced by parse_stats() (each with
# 'lines', 'missed' and optionally a comma-separated 'missed_lines' spec).
# Returns (total line count, list of linespecs missed by every test).
# NOTE(review): initialization of 'missed'/'missed_lines'/'final_missed' is
# elided in this excerpt, as is the else-branch incrementing seen linespecs.
280 def coverage_sum(cvglist):
284 for record in cvglist:
# 'lines' is the same per-PD total in every record; keep the last seen.
285 lines = int(record['lines'])
286 missed += int(record['missed'])
287 if 'missed_lines' not in record:
# Count in how many records each missed linespec appears.
289 for linespec in record['missed_lines'].split(','):
290 if linespec not in missed_lines:
291 missed_lines[linespec] = 1
293 missed_lines[linespec] += 1
295 # keep only those lines that didn't show up in every non-summary record
297 for linespec in missed_lines:
298 if missed_lines[linespec] != len(cvglist):
300 final_missed.append(linespec)
302 return lines, final_missed
# Run every testcase in 'tests' by invoking the external 'runtc' binary once
# per requested output, comparing its output against the stored match file.
#
# tests: dict pd-name -> list of testcase lists (from get_tests())
# fix:   when True, a mismatching output overwrites the match file instead of
#        being reported as a failure ("fix" / bootstrap mode)
#
# Returns (results, errors): per-output result dicts and an error count.
# NOTE(review): this excerpt elides many interior lines (results/errors
# initialization, the per-testcase loop header, PASS printing, temp-file
# cleanup, else-branches). Statement order here is load-bearing — do not
# restructure without the full source.
305 def run_tests(tests, fix=False):
308 cmd = [os.path.join(runtc_dir, 'runtc')]
# Coverage data is written by runtc into a temp file given via -c.
310 fd, coverage = mkstemp()
312 cmd.extend(['-c', coverage])
315 for pd in sorted(tests.keys()):
317 for tclist in tests[pd]:
322 # Set up PD stack for this test.
323 for spd in tc['pdlist']:
324 args.extend(['-P', spd['name']])
325 for label, channel in spd['channels']:
326 args.extend(['-p', "%s=%d" % (label, channel)])
327 for option, value in spd['options']:
328 args.extend(['-o', "%s=%s" % (option, value)])
329 for label, initial_pin in spd['initial_pins']:
330 args.extend(['-N', "%s=%d" % (label, initial_pin)])
331 args.extend(['-i', os.path.join(dumps_dir, tc['input'])])
# One runtc invocation per requested output type/class.
332 for op in tc['output']:
333 name = "%s/%s/%s" % (pd, tc['name'], op['type'])
334 opargs = ['-O', "%s:%s" % (op['pd'], op['type'])]
336 opargs[-1] += ":%s" % op['class']
337 name += "/%s" % op['class']
# Progress line, padded with dots; verdict is appended later (end='').
339 dots = '.' * (77 - len(name) - 2)
340 INFO("%s %s " % (name, dots), end='')
# runtc writes the decoder output into a fresh temp file.
345 fd, outfile = mkstemp()
347 opargs.extend(['-f', outfile])
348 DBG("Running %s" % (' '.join(args + opargs)))
349 p = Popen(args + opargs, stdout=PIPE, stderr=PIPE)
350 stdout, stderr = p.communicate()
352 # statistics and coverage data on stdout
353 results[-1].update(parse_stats(stdout.decode('utf-8')))
# Anything on stderr is treated as an error message.
355 results[-1]['error'] = stderr.decode('utf-8').strip()
357 elif p.returncode != 0:
358 # runtc indicated an error, but didn't output a
359 # message on stderr about it
360 results[-1]['error'] = "Unknown error: runtc %d" % p.returncode
361 if 'error' not in results[-1]:
362 matchfile = os.path.join(tests_dir, op['pd'], op['match'])
363 DBG("Comparing with %s" % matchfile)
365 diff = diff_error = None
366 if op['type'] in ('annotation', 'python'):
367 diff = diff_text(matchfile, outfile)
368 elif op['type'] == 'binary':
369 diff = compare_binary(matchfile, outfile)
371 diff = ["Unsupported output type '%s'." % op['type']]
372 except Exception as e:
# In fix mode a mismatch overwrites the stored match file.
375 if diff or diff_error:
376 copy(outfile, matchfile)
377 DBG("Wrote %s" % matchfile)
380 results[-1]['diff'] = diff
381 elif diff_error is not None:
383 except Exception as e:
384 results[-1]['error'] = str(e)
387 results[-1]['coverage_report'] = coverage
# 'exception' tests expect a specific srd error; seeing it means PASS.
389 if op['type'] == 'exception' and 'error' in results[-1]:
390 # filter out the exception we were looking for
391 reg = "^Error: srd: %s:" % op['match']
392 if re.match(reg, results[-1]['error']):
393 # found it, not an error
394 results[-1].pop('error')
# Append the verdict to the progress line printed above.
397 if 'diff' in results[-1]:
398 INFO("Output mismatch")
399 elif 'error' in results[-1]:
400 error = results[-1]['error']
# Truncate long error messages for the one-line verdict.
402 error = error[:17] + '...'
404 elif 'coverage' in results[-1]:
405 # report coverage of this PD
406 for record in results[-1]['coverage']:
407 # but not others used in the stack
408 # as part of the test.
409 if record['scope'] == pd:
410 INFO(record['coverage'])
414 gen_report(results[-1])
417 # only keep track of coverage records for this PD,
418 # not others in the stack just used for testing.
419 for cvg in results[-1]['coverage']:
420 if cvg['scope'] == pd:
# With multiple tests per PD, also print an aggregate coverage line.
422 if opt_coverage and len(pd_cvg) > 1:
423 # report total coverage of this PD, across all the tests
424 # that were done on it.
425 total_lines, missed_lines = coverage_sum(pd_cvg)
426 pd_coverage = 100 - (float(len(missed_lines)) / total_lines * 100)
428 dots = '.' * (54 - len(pd) - 2)
429 INFO("%s total %s %d%%" % (pd, dots, pd_coverage))
431 # generate a missing lines list across all the files in
# Group 'file:line' entries by file for the per-PD "_total" report.
434 for entry in missed_lines:
435 filename, line = entry.split(':')
436 if filename not in files:
438 files[filename].append(line)
440 for filename in sorted(files.keys()):
441 line_list = ','.join(sorted(files[filename], key=int))
442 text += "%s: %s\n" % (filename, line_list)
443 open(os.path.join(report_dir, pd + "_total"), 'w').write(text)
446 return results, errors
# NOTE(review): the counter initialization, the diff branch, and the return
# of the two counters are elided in this excerpt.
448 def get_run_tests_error_diff_counts(results):
449 """Get error and diff counters from run_tests() results."""
452 for result in results:
453 if 'error' in result:
# Write a per-testcase failure report into report_dir (module-level, set from
# the -R command-line option). The report contains the error message and/or
# the output diff, plus the raw coverage report when one was collected.
# NOTE(review): initialization of 'out', the 'diff' branch condition, and the
# guard around actually writing (presumably only when out is non-empty and
# report_dir is set) are elided in this excerpt.
460 def gen_report(result):
462 if 'error' in result:
464 out.append(result['error'])
467 out.append("Test output mismatch:")
468 out.extend(result['diff'])
470 if 'coverage_report' in result:
471 out.append(open(result['coverage_report'], 'r').read())
475 text = "Testcase: %s\n" % result['testcase']
476 text += '\n'.join(out)
# Report filename is the testcase id with '/' flattened to '_'.
481 filename = result['testcase'].replace('/', '_')
482 open(os.path.join(report_dir, filename), 'w').write(text)
# Pretty-print every parsed testcase (the -s option): decoders with their
# channels/options/initial pins, the stack, the input dump, and each output.
# NOTE(review): the inner 'for pd in tc['pdlist']' rebinds the outer loop
# variable 'pd' — harmless for pure printing, but worth renaming when the
# full source is available. Some loop headers and guards are elided here.
487 def show_tests(tests):
488 for pd in sorted(tests.keys()):
489 for tclist in tests[pd]:
491 print("Testcase: %s/%s" % (tc['pd'], tc['name']))
492 for pd in tc['pdlist']:
493 print(" Protocol decoder: %s" % pd['name'])
494 for label, channel in pd['channels']:
495 print(" Channel %s=%d" % (label, channel))
496 for option, value in pd['options']:
497 print(" Option %s=%s" % (option, value))
498 for label, initial_pin in pd['initial_pins']:
499 print(" Initial pin %s=%d" % (label, initial_pin))
501 print(" Stack: %s" % ' '.join(tc['stack']))
502 print(" Input: %s" % tc['input'])
503 for op in tc['output']:
504 print(" Output:\n Protocol decoder: %s" % op['pd'])
505 print(" Type: %s" % op['type'])
507 print(" Class: %s" % op['class'])
508 print(" Match: %s" % op['match'])
# Print one "pd/testcase/type[/class]" line per output (the -l option),
# i.e. the same spec format accepted on the command line.
# NOTE(review): the testcase loop header, the class guard, and the final
# print of 'line' are elided in this excerpt.
512 def list_tests(tests):
513 for pd in sorted(tests.keys()):
514 for tclist in tests[pd]:
516 for op in tc['output']:
517 line = "%s/%s/%s" % (tc['pd'], tc['name'], op['type'])
519 line += "/%s" % op['class']
# Top-level script body: derive directory locations from the script's own
# path, parse command-line options, then dispatch to show/list/run.
# NOTE(review): the option-dispatch branches inside 'for opt, arg in opts:',
# several guards, the summary printing, and the surrounding try block opener
# are elided in this excerpt.
# Directory layout: runtc lives next to this script; sigrok-dumps is expected
# as a sibling of the base directory; tests live under <runtc_dir>/test.
528 runtc_dir = os.path.abspath(os.path.dirname(sys.argv[0]))
529 base_dir = os.path.abspath(os.path.join(os.curdir, runtc_dir, os.path.pardir))
530 dumps_dir = os.path.abspath(os.path.join(base_dir, os.path.pardir, 'sigrok-dumps'))
531 tests_dir = os.path.abspath(os.path.join(runtc_dir, 'test'))
# No arguments at all: presumably print usage (elided).
533 if len(sys.argv) == 1:
536 opt_all = opt_run = opt_show = opt_list = opt_fix = opt_coverage = False
539 opts, args = getopt(sys.argv[1:], "dvarslfcR:S:")
540 except Exception as e:
541 usage('error while parsing command line arguments: {}'.format(e))
542 for opt, arg in opts:
# Mutually exclusive / sanity checks on the parsed options.
564 if opt_run and opt_show:
565 usage("Use either -s or -r, not both.")
567 usage("Specify either -a or tests, not both.")
568 if report_dir is not None and not os.path.isdir(report_dir):
569 usage("%s is not a directory" % report_dir)
# Build the test list either from explicit specs or from all test dirs.
574 testlist = get_tests(args)
575 elif opt_all or opt_list:
576 testlist = get_tests(os.listdir(tests_dir))
578 usage("Specify either -a or tests.")
# Running tests requires the sigrok-dumps capture repository on disk.
581 if not os.path.isdir(dumps_dir):
582 ERR("Could not find sigrok-dumps repository at %s" % dumps_dir)
584 results, errors = run_tests(testlist, fix=opt_fix)
586 errs, diffs = get_run_tests_error_diff_counts(results)
# Fix mode (elided branch): re-run with fix=True to (re)write match files.
596 run_tests(testlist, fix=True)
599 except Exception as e:
600 print("Error: %s" % str(e))