1 #!/usr/bin/env /usr/bin/python3
5 from getopt import getopt
6 from tempfile import mkstemp
7 from subprocess import Popen, PIPE
8 from difflib import Differ
9 from hashlib import md5
10 from shutil import copy
16 class E_syntax(Exception):
18 class E_badline(Exception):
21 def INFO(msg, end='\n'):
33 print(msg, file=sys.stderr)
38 print(msg.strip() + '\n')
39 print("""Usage: testpd [-dvarslR] [test, ...]
47 -R <directory> Save test reports to <directory>
48 <test> Protocol decoder name ("i2c") and optionally test name ("i2c/icc")""")
53 if 'pdlist' not in tc or not tc['pdlist']:
54 return("No protocol decoders")
55 if 'input' not in tc or not tc['input']:
57 if 'output' not in tc or not tc['output']:
59 for op in tc['output']:
61 return("No match in output")
66 def parse_testfile(path, pd, tc, op_type, op_class):
67 DBG("Opening '%s'" % path)
69 for line in open(path).read().split('\n'):
72 if len(line) == 0 or line[0] == "#":
75 if not tclist and f[0] != "test":
89 elif key == 'protocol-decoder':
99 # Always needs <key> <value>
105 opt, val = b.split('=')
111 pd_spec['probes'].append([opt, val])
113 pd_spec['options'].append([opt, val])
116 tclist[-1]['pdlist'].append(pd_spec)
120 tclist[-1]['stack'] = f
124 tclist[-1]['input'] = f[0]
125 elif key == 'output':
132 # Always needs <key> <value>
142 tclist[-1]['output'].append(op_spec)
145 except E_badline as e:
146 ERR("Invalid syntax in %s: line '%s'" % (path, line))
148 except E_syntax as e:
149 ERR("Unable to parse %s: unknown line '%s'" % (path, line))
152 # If a specific testcase was requested, keep only that one.
159 # ...and a specific output type
160 if op_type is not None:
162 for op in target_tc['output']:
163 if op['type'] == op_type:
164 # ...and a specific output class
165 if op_class is None or ('class' in op and op['class'] == op_class):
166 target_oplist.append(op)
167 DBG("match on [%s]" % str(op))
168 target_tc['output'] = target_oplist
169 if target_tc is None:
174 error = check_tclist(t)
176 ERR("Error in %s: %s" % (path, error))
182 def get_tests(testnames):
184 for testspec in testnames:
185 # Optional testspec in the form i2c/rtc
186 tc = op_type = op_class = None
187 ts = testspec.strip("/").split("/")
195 path = os.path.join(decoders_dir, pd)
196 if not os.path.isdir(path):
197 # User specified non-existent PD
198 raise Exception("%s not found." % path)
199 path = os.path.join(decoders_dir, pd, "test/test.conf")
200 if not os.path.exists(path):
201 # PD doesn't have any tests yet
203 tests.append(parse_testfile(path, pd, tc, op_type, op_class))
208 def diff_text(f1, f2):
209 t1 = open(f1).readlines()
210 t2 = open(f2).readlines()
213 for line in d.compare(t1, t2):
214 if line[:2] in ('- ', '+ '):
215 diff.append(line.strip())
220 def compare_binary(f1, f2):
222 h1.update(open(f1, 'rb').read())
224 h2.update(open(f2, 'rb').read())
225 if h1.digest() == h2.digest():
228 result = ["Binary output does not match."]
233 def run_tests(tests, fix=False):
236 cmd = os.path.join(tests_dir, 'runtc')
242 for pd in tc['pdlist']:
243 args.extend(['-P', pd['name']])
244 for label, probe in pd['probes']:
245 args.extend(['-p', "%s=%d" % (label, probe)])
246 for option, value in pd['options']:
247 args.extend(['-o', "%s=%s" % (option, value)])
248 args.extend(['-i', os.path.join(dumps_dir, tc['input'])])
249 for op in tc['output']:
250 name = "%s/%s/%s" % (tc['pd'], tc['name'], op['type'])
251 opargs = ['-O', "%s:%s" % (op['pd'], op['type'])]
253 opargs[-1] += ":%s" % op['class']
254 name += "/%s" % op['class']
256 dots = '.' * (60 - len(name) - 2)
257 INFO("%s %s " % (name, dots), end='')
262 fd, outfile = mkstemp()
264 opargs.extend(['-f', outfile])
265 DBG("Running %s" % (' '.join(args + opargs)))
266 p = Popen(args + opargs, stdout=PIPE, stderr=PIPE)
267 stdout, stderr = p.communicate()
269 results[-1]['statistics'] = stdout.decode('utf-8').strip()
271 results[-1]['error'] = stderr.decode('utf-8').strip()
273 elif p.returncode != 0:
274 # runtc indicated an error, but didn't output a
275 # message on stderr about it
276 results[-1]['error'] = "Unknown error: runtc %d" % p.returncode
277 if 'error' not in results[-1]:
278 match = os.path.join(decoders_dir, op['pd'], 'test', op['match'])
280 diff = diff_error = None
281 if op['type'] in ('annotation', 'python'):
282 diff = diff_text(match, outfile)
283 elif op['type'] == 'binary':
284 diff = compare_binary(match, outfile)
286 diff = ["Unsupported output type '%s'." % op['type']]
287 except Exception as e:
290 if diff or diff_error:
292 DBG("Wrote %s" % match)
295 results[-1]['diff'] = diff
296 elif diff_error is not None:
298 except Exception as e:
299 results[-1]['error'] = str(e)
303 if 'diff' in results[-1]:
304 INFO("Output mismatch")
305 elif 'error' in results[-1]:
306 error = results[-1]['error']
308 error = error[:17] + '...'
312 gen_report(results[-1])
314 return results, errors
317 def gen_report(result):
319 if 'error' in result:
321 out.append(result['error'])
324 out.append("Test output mismatch:")
325 out.extend(result['diff'])
327 if 'statistics' in result:
328 out.extend(["Statistics:", result['statistics']])
332 text = "Testcase: %s\n" % result['testcase']
333 text += '\n'.join(out)
338 filename = result['testcase'].replace('/', '_')
339 open(os.path.join(report_dir, filename), 'w').write(text)
344 def show_tests(tests):
347 print("Testcase: %s/%s" % (tc['pd'], tc['name']))
348 for pd in tc['pdlist']:
349 print(" Protocol decoder: %s" % pd['name'])
350 for label, probe in pd['probes']:
351 print(" Probe %s=%d" % (label, probe))
352 for option, value in pd['options']:
353 print(" Option %s=%d" % (option, value))
355 print(" Stack: %s" % ' '.join(tc['stack']))
356 print(" Input: %s" % tc['input'])
357 for op in tc['output']:
358 print(" Output:\n Protocol decoder: %s" % op['pd'])
359 print(" Type: %s" % op['type'])
361 print(" Class: %s" % op['class'])
362 print(" Match: %s" % op['match'])
366 def list_tests(tests):
369 for op in tc['output']:
370 line = "%s/%s/%s" % (tc['pd'], tc['name'], op['type'])
372 line += "/%s" % op['class']
381 tests_dir = os.path.abspath(os.path.dirname(sys.argv[0]))
382 base_dir = os.path.abspath(os.path.join(os.curdir, tests_dir, os.path.pardir))
383 dumps_dir = os.path.abspath(os.path.join(base_dir, os.path.pardir, 'sigrok-dumps'))
384 decoders_dir = os.path.abspath(os.path.join(base_dir, 'decoders'))
386 if len(sys.argv) == 1:
389 opt_all = opt_run = opt_show = opt_list = opt_fix = False
391 opts, args = getopt(sys.argv[1:], "dvarslfRS:")
392 for opt, arg in opts:
412 if opt_run and opt_show:
413 usage("Use either -s or -r, not both.")
415 usage("Specify either -a or tests, not both.")
416 if report_dir is not None and not os.path.isdir(report_dir):
417 usage("%s is not a directory" % report_dir)
422 testlist = get_tests(args)
424 testlist = get_tests(os.listdir(decoders_dir))
426 usage("Specify either -a or tests.")
429 if not os.path.isdir(dumps_dir):
430 ERR("Could not find sigrok-dumps repository at %s" % dumps_dir)
432 results, errors = run_tests(testlist)
439 run_tests(testlist, fix=True)
442 except Exception as e:
443 print("Error: %s" % str(e))