- cmd = os.path.join(tests_dir, 'runtc')
- for tclist in tests:
- for tc in tclist:
- args = [cmd]
- for pd in tc['pdlist']:
- args.extend(['-P', pd['name']])
- for label, probe in pd['probes']:
- args.extend(['-p', "%s=%d" % (label, probe)])
- for option, value in pd['options']:
- args.extend(['-o', "%s=%s" % (option, value)])
- args.extend(['-i', os.path.join(dumps_dir, tc['input'])])
- for op in tc['output']:
- name = "%s/%s/%s" % (tc['pd'], tc['name'], op['type'])
- opargs = ['-O', "%s:%s" % (op['pd'], op['type'])]
- if 'class' in op:
- opargs[-1] += ":%s" % op['class']
- name += "/%s" % op['class']
- if VERBOSE:
- dots = '.' * (60 - len(name) - 2)
- INFO("%s %s " % (name, dots), end='')
- results.append({
- 'testcase': name,
- })
- try:
- fd, outfile = mkstemp()
- os.close(fd)
- opargs.extend(['-f', outfile])
- DBG("Running %s" % (' '.join(args + opargs)))
- p = Popen(args + opargs, stdout=PIPE, stderr=PIPE)
- stdout, stderr = p.communicate()
- if stdout:
- results[-1]['statistics'] = stdout.decode('utf-8').strip()
- if stderr:
- results[-1]['error'] = stderr.decode('utf-8').strip()
- errors += 1
- elif p.returncode != 0:
- # runtc indicated an error, but didn't output a
- # message on stderr about it
- results[-1]['error'] = "Unknown error: runtc %d" % p.returncode
- # Only bother with the diff if it all worked.
- if 'error' not in results[-1]:
- match = "%s/%s/test/%s" % (decoders_dir, op['pd'], op['match'])
- diff = diff_files(match, outfile)
- if diff:
- results[-1]['diff'] = diff
- except Exception as e:
- results[-1]['error'] = str(e)
- finally:
- os.unlink(outfile)
- if VERBOSE:
- if 'diff' in results[-1]:
- INFO("Output mismatch")
- elif 'error' in results[-1]:
- error = results[-1]['error']
- if len(error) > 20:
- error = error[:17] + '...'
- INFO(error)
- else:
- INFO("OK")
- gen_report(results[-1])
+ cmd = [os.path.join(tests_dir, 'runtc')]
+ if opt_coverage:
+ fd, coverage = mkstemp()
+ os.close(fd)
+ cmd.extend(['-c', coverage])
+ else:
+ coverage = None
+ for pd in sorted(tests.keys()):
+ pd_cvg = []
+ for tclist in tests[pd]:
+ for tc in tclist:
+ args = cmd.copy()
+ if DEBUG > 1:
+ args.append('-d')
+ # Set up PD stack for this test.
+ for spd in tc['pdlist']:
+ args.extend(['-P', spd['name']])
+ for label, channel in spd['channels']:
+ args.extend(['-p', "%s=%d" % (label, channel)])
+ for option, value in spd['options']:
+ args.extend(['-o', "%s=%s" % (option, value)])
+ args.extend(['-i', os.path.join(dumps_dir, tc['input'])])
+ for op in tc['output']:
+ name = "%s/%s/%s" % (pd, tc['name'], op['type'])
+ opargs = ['-O', "%s:%s" % (op['pd'], op['type'])]
+ if 'class' in op:
+ opargs[-1] += ":%s" % op['class']
+ name += "/%s" % op['class']
+ if VERBOSE:
+ dots = '.' * (60 - len(name) - 2)
+ INFO("%s %s " % (name, dots), end='')
+ results.append({
+ 'testcase': name,
+ })
+ try:
+ fd, outfile = mkstemp()
+ os.close(fd)
+ opargs.extend(['-f', outfile])
+ DBG("Running %s" % (' '.join(args + opargs)))
+ p = Popen(args + opargs, stdout=PIPE, stderr=PIPE)
+ stdout, stderr = p.communicate()
+ if stdout:
+ # statistics and coverage data on stdout
+ results[-1].update(parse_stats(stdout.decode('utf-8')))
+ if stderr:
+ results[-1]['error'] = stderr.decode('utf-8').strip()
+ errors += 1
+ elif p.returncode != 0:
+ # runtc indicated an error, but didn't output a
+ # message on stderr about it
+ results[-1]['error'] = "Unknown error: runtc %d" % p.returncode
+ if 'error' not in results[-1]:
+ matchfile = os.path.join(decoders_dir, op['pd'], 'test', op['match'])
+ DBG("Comparing with %s" % matchfile)
+ try:
+ diff = diff_error = None
+ if op['type'] in ('annotation', 'python'):
+ diff = diff_text(matchfile, outfile)
+ elif op['type'] == 'binary':
+ diff = compare_binary(matchfile, outfile)
+ else:
+ diff = ["Unsupported output type '%s'." % op['type']]
+ except Exception as e:
+ diff_error = e
+ if fix:
+ if diff or diff_error:
+ copy(outfile, matchfile)
+ DBG("Wrote %s" % matchfile)
+ else:
+ if diff:
+ results[-1]['diff'] = diff
+ elif diff_error is not None:
+ raise diff_error
+ except Exception as e:
+ results[-1]['error'] = str(e)
+ finally:
+ if coverage:
+ results[-1]['coverage_report'] = coverage
+ os.unlink(outfile)
+ if VERBOSE:
+ if 'diff' in results[-1]:
+ INFO("Output mismatch")
+ elif 'error' in results[-1]:
+ error = results[-1]['error']
+ if len(error) > 20:
+ error = error[:17] + '...'
+ INFO(error)
+ elif 'coverage' in results[-1]:
+ # report coverage of this PD
+ for record in results[-1]['coverage']:
+ # but not others used in the stack
+ # as part of the test.
+ if record['scope'] == pd:
+ INFO(record['coverage'])
+ break
+ else:
+ INFO("OK")
+ gen_report(results[-1])
+ if coverage:
+ os.unlink(coverage)
+ # only keep track of coverage records for this PD,
+ # not others in the stack just used for testing.
+ for cvg in results[-1]['coverage']:
+ if cvg['scope'] == pd:
+ pd_cvg.append(cvg)
+ if VERBOSE and opt_coverage and len(pd_cvg) > 1:
+ # report total coverage of this PD, across all the tests
+ # that were done on it.
+ total_lines, missed_lines = coverage_sum(pd_cvg)
+ pd_coverage = 100 - (float(len(missed_lines)) / total_lines * 100)
+ if VERBOSE:
+ dots = '.' * (54 - len(pd) - 2)
+ INFO("%s total %s %d%%" % (pd, dots, pd_coverage))