sigrok.org Git - libsigrokdecode.git/blob - tests/pdtest
Rename 'probe' to 'channel' everywhere.
[libsigrokdecode.git] / tests / pdtest
1 #!/usr/bin/env python3
2 ##
3 ## This file is part of the libsigrokdecode project.
4 ##
5 ## Copyright (C) 2013 Bert Vermeulen <bert@biot.com>
6 ##
7 ## This program is free software: you can redistribute it and/or modify
8 ## it under the terms of the GNU General Public License as published by
9 ## the Free Software Foundation, either version 3 of the License, or
10 ## (at your option) any later version.
11 ##
12 ## This program is distributed in the hope that it will be useful,
13 ## but WITHOUT ANY WARRANTY; without even the implied warranty of
14 ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 ## GNU General Public License for more details.
16 ##
17 ## You should have received a copy of the GNU General Public License
18 ## along with this program.  If not, see <http://www.gnu.org/licenses/>.
19 ##
20
21 import os
22 import sys
23 from getopt import getopt
24 from tempfile import mkstemp
25 from subprocess import Popen, PIPE
26 from difflib import Differ
27 from hashlib import md5
28 from shutil import copy
29
# Debug level: 0 disables DBG() output. Each -d on the command line adds
# one; above 1, run_tests() also passes -d on to runtc.
DEBUG = 0
# When true, INFO() prints progress messages (switched on by -v).
VERBOSE = False
32
33
class E_syntax(Exception):
    """Raised by the test file parser for a malformed directive line."""
class E_badline(Exception):
    """Raised by the test file parser for an unknown or misplaced line."""
38
def INFO(msg, end='\n'):
    """Print msg on stdout, but only in verbose mode.

    end is handed to print() unchanged. stdout is flushed right away so
    that a partial line (end='') becomes visible immediately.
    """
    if not VERBOSE:
        return
    print(msg, end=end, flush=True)
43
44
def DBG(msg):
    """Print msg on stdout when debugging is enabled (DEBUG is non-zero)."""
    if not DEBUG:
        return
    print(msg)
48
49
def ERR(msg):
    """Write msg, followed by a newline, to standard error."""
    sys.stderr.write("%s\n" % (msg,))
52
53
54 def usage(msg=None):
55     if msg:
56         print(msg.strip() + '\n')
57     print("""Usage: testpd [-dvarslR] [test, ...]
58   -d  Turn on debugging
59   -v  Verbose
60   -a  All tests
61   -l  List all tests
62   -s  Show test(s)
63   -r  Run test(s)
64   -f  Fix failed test(s)
65   -c  Report decoder code coverage
66   -R <directory>  Save test reports to <directory>
67   <test>  Protocol decoder name ("i2c") and optionally test name ("i2c/icc")""")
68     sys.exit()
69
70
def check_tclist(tc):
    """Validate one parsed testcase dictionary.

    Returns a short message naming the first problem found, or None when
    the testcase lists protocol decoders, an input and at least one
    output, and every output names a match file.
    """
    required = (
        ('pdlist', "No protocol decoders"),
        ('input', "No input"),
        ('output', "No output"),
    )
    for field, complaint in required:
        # A missing key and an empty value are rejected alike.
        if not tc.get(field):
            return complaint
    if any('match' not in op for op in tc['output']):
        return "No match in output"
    return None
83
84
def parse_testfile(path, pd, tc, op_type, op_class):
    """Parse a test configuration file into a list of testcase dictionaries.

    path: test configuration file to read.
    pd: decoder name, stored as 'pd' in every testcase.
    tc: if not None, keep only the testcase with this name.
    op_type: if not None (and tc is given), keep only that testcase's
        outputs of this type.
    op_class: if not None (and op_type is given), keep only outputs whose
        class equals this.

    Returns the list of testcases. An empty list is returned, after
    reporting through ERR(), when a line cannot be parsed or when a
    remaining testcase fails check_tclist(). It is also returned when the
    requested testcase does not exist in the file.
    """
    DBG("Opening '%s'" % path)
    tclist = []
    # Read everything up front so the file is closed before parsing starts.
    with open(path) as conf:
        lines = conf.read().split('\n')
    for line in lines:
        try:
            line = line.strip()
            if not line or line[0] == "#":
                continue
            f = line.split()
            if not tclist and f[0] != "test":
                # Every directive has to follow a "test" line.
                raise E_badline
            key = f.pop(0)
            if key == 'test':
                if len(f) != 1:
                    raise E_syntax
                # new testcase
                tclist.append({
                    'pd': pd,
                    'name': f[0],
                    'pdlist': [],
                    'output': [],
                })
            elif key == 'protocol-decoder':
                if len(f) < 1:
                    raise E_syntax
                pd_spec = {
                    'name': f.pop(0),
                    'channels': [],
                    'options': [],
                }
                while len(f):
                    if len(f) == 1:
                        # Always needs <key> <value>
                        raise E_syntax
                    a, b = f[:2]
                    f = f[2:]
                    if '=' not in b:
                        raise E_syntax
                    # Split on the first '=' only: an option value may
                    # itself contain '='.
                    opt, val = b.split('=', 1)
                    if a == 'channel':
                        try:
                            val = int(val)
                        except ValueError:
                            raise E_syntax
                        pd_spec['channels'].append([opt, val])
                    elif a == 'option':
                        pd_spec['options'].append([opt, val])
                    else:
                        raise E_syntax
                tclist[-1]['pdlist'].append(pd_spec)
            elif key == 'stack':
                if len(f) < 2:
                    raise E_syntax
                tclist[-1]['stack'] = f
            elif key == 'input':
                if len(f) != 1:
                    raise E_syntax
                tclist[-1]['input'] = f[0]
            elif key == 'output':
                if len(f) < 2:
                    # Always needs <pd> <type>
                    raise E_syntax
                op_spec = {
                    'pd': f.pop(0),
                    'type': f.pop(0),
                }
                while len(f):
                    if len(f) == 1:
                        # Always needs <key> <value>
                        raise E_syntax
                    a, b = f[:2]
                    f = f[2:]
                    if a == 'class':
                        op_spec['class'] = b
                    elif a == 'match':
                        op_spec['match'] = b
                    else:
                        raise E_syntax
                tclist[-1]['output'].append(op_spec)
            else:
                raise E_badline
        except E_badline:
            # Unknown keyword, or a directive before the first "test".
            ERR("Unable to parse %s: unknown line '%s'" % (path, line))
            return []
        except E_syntax:
            # Known keyword with malformed arguments.
            ERR("Invalid syntax in %s: line '%s'" % (path, line))
            return []

    # If a specific testcase was requested, keep only that one.
    if tc is not None:
        target_tc = None
        for t in tclist:
            if t['name'] == tc:
                target_tc = t
                break
        if target_tc is None:
            # No such testcase: nothing to filter, nothing to return.
            tclist = []
        else:
            # ...and a specific output type
            if op_type is not None:
                target_oplist = []
                for op in target_tc['output']:
                    if op['type'] != op_type:
                        continue
                    # ...and a specific output class
                    if op_class is None or op.get('class') == op_class:
                        target_oplist.append(op)
                        DBG("match on [%s]" % str(op))
                target_tc['output'] = target_oplist
            tclist = [target_tc]
    for t in tclist:
        error = check_tclist(t)
        if error:
            ERR("Error in %s: %s" % (path, error))
            return []

    return tclist
199
200
def get_tests(testnames):
    """Collect the testcases selected by a list of test specifications.

    testnames: iterable of specs of the form pd[/testcase[/type[/class]]].

    Returns a dictionary mapping each decoder name to a list holding one
    parse_testfile() result per spec that named it. A decoder without a
    test/test.conf file maps to an empty list.

    Raises Exception when a named decoder directory does not exist.
    """
    tests = {}
    for testspec in testnames:
        # Optional testspec in the form pd/testcase/type/class
        tc = op_type = op_class = None
        ts = testspec.strip("/").split("/")
        pd = ts.pop(0)
        # setdefault rather than assignment: several specs may name the
        # same decoder and the earlier ones must not be thrown away.
        tests.setdefault(pd, [])
        if ts:
            tc = ts.pop(0)
        if ts:
            op_type = ts.pop(0)
        if ts:
            op_class = ts.pop(0)
        path = os.path.join(decoders_dir, pd)
        if not os.path.isdir(path):
            # User specified non-existent PD
            raise Exception("%s not found." % path)
        path = os.path.join(decoders_dir, pd, "test", "test.conf")
        if not os.path.exists(path):
            # PD doesn't have any tests yet
            continue
        tests[pd].append(parse_testfile(path, pd, tc, op_type, op_class))

    return tests
226
227
def diff_text(f1, f2):
    """Compare two text files line by line.

    f1, f2: paths of the files to compare.

    Returns the list of differing lines as produced by difflib.Differ:
    lines only in f1 are prefixed with '- ', lines only in f2 with '+ ',
    each with surrounding whitespace stripped. The list is empty when the
    files are identical.
    """
    # Context managers make sure both files are closed again.
    with open(f1) as left:
        t1 = left.readlines()
    with open(f2) as right:
        t2 = right.readlines()
    return [
        line.strip()
        for line in Differ().compare(t1, t2)
        if line[:2] in ('- ', '+ ')
    ]
238
239
def compare_binary(f1, f2):
    """Compare the contents of two files byte for byte.

    f1, f2: paths of the files to compare.

    Returns None when the contents are equal, otherwise a one-element
    list holding a mismatch message (the same shape diff_text() uses for
    its result).
    """
    # The bytes are compared directly; hashing both files first adds
    # nothing, and the context manager closes both handles.
    with open(f1, 'rb') as left, open(f2, 'rb') as right:
        same = left.read() == right.read()
    if same:
        return None
    return ["Binary output does not match."]
251
252
# runtc's stdout can have lines like:
# coverage: lines=161 missed=2 coverage=99%
def parse_stats(text):
    """Parse statistics lines into a dictionary of record lists.

    text: lines of the form "<key>: <name>=<value> <name>=<value> ...".

    Returns a dictionary mapping each key (trailing ':' removed) to a
    list with one {name: value} dictionary per line carrying that key.
    All values stay strings. Blank lines are ignored.

    Raises ValueError for a field that contains no '='.
    """
    stats = {}
    for line in text.strip().split('\n'):
        fields = line.split()
        if not fields:
            # Empty or whitespace-only line: nothing to record.
            continue
        key = fields.pop(0).strip(':')
        record = {}
        for f in fields:
            # Split on the first '=' only, so values may contain '='.
            k, v = f.split('=', 1)
            record[k] = v
        stats.setdefault(key, []).append(record)

    return stats
268
269
# Combine the coverage records of all tests run on one PD: a line only
# counts as missed if every single record lists it as missed.
def coverage_sum(cvglist):
    """Summarize a list of coverage records.

    cvglist: records with string fields 'lines' and 'missed' and an
        optional comma-separated 'missed_lines'.

    Returns (lines, missed), where lines is the 'lines' value of the last
    record (0 for an empty list) and missed lists the line specs that
    appear in the 'missed_lines' of every record.
    """
    total = 0
    missed_total = 0
    seen = {}
    for rec in cvglist:
        total = int(rec['lines'])
        # Tallied, but not part of the return value.
        missed_total += int(rec['missed'])
        if 'missed_lines' in rec:
            for spec in rec['missed_lines'].split(','):
                seen[spec] = seen.get(spec, 0) + 1

    # A spec counted once per record was missed by all of them.
    runs = len(cvglist)
    uncovered = [spec for spec, count in seen.items() if count == runs]

    return total, uncovered
295
296
def run_tests(tests, fix=False):
    """Run the selected testcases through runtc and check their output.

    tests: dictionary as built by get_tests(): decoder name -> list of
        testcase lists.
    fix: when true, an output file that differs from its match file (or
        could not be compared with it) is copied over the match file
        instead of being reported as a mismatch.

    For every output of every testcase, the runtc program from the tests
    directory is started with the testcase's decoder stack, its input
    file and a temporary output file. One result dictionary is recorded
    per output. It holds 'testcase' plus, where applicable, the
    statistics parsed from runtc's stdout, 'error', 'diff' and
    'coverage_report'. Each result is passed to gen_report().

    Returns (results, errors), where errors counts the runtc invocations
    that wrote to stderr or exited with a non-zero status.
    """
    errors = 0
    results = []
    cmd = [os.path.join(tests_dir, 'runtc')]
    if opt_coverage:
        fd, coverage = mkstemp()
        os.close(fd)
        cmd.extend(['-c', coverage])
    else:
        coverage = None
    for pd in sorted(tests.keys()):
        pd_cvg = []
        for tclist in tests[pd]:
            for tc in tclist:
                args = cmd.copy()
                if DEBUG > 1:
                    args.append('-d')
                # Set up PD stack for this test.
                for spd in tc['pdlist']:
                    args.extend(['-P', spd['name']])
                    for label, channel in spd['channels']:
                        args.extend(['-p', "%s=%d" % (label, channel)])
                    for option, value in spd['options']:
                        args.extend(['-o', "%s=%s" % (option, value)])
                args.extend(['-i', os.path.join(dumps_dir, tc['input'])])
                for op in tc['output']:
                    name = "%s/%s/%s" % (pd, tc['name'], op['type'])
                    opargs = ['-O', "%s:%s" % (op['pd'], op['type'])]
                    if 'class' in op:
                        opargs[-1] += ":%s" % op['class']
                        name += "/%s" % op['class']
                    if VERBOSE:
                        dots = '.' * (60 - len(name) - 2)
                        INFO("%s %s " % (name, dots), end='')
                    results.append({
                        'testcase': name,
                    })
                    # Stays None if mkstemp() itself fails, so that the
                    # cleanup below does not trip over an unbound name.
                    outfile = None
                    try:
                        fd, outfile = mkstemp()
                        os.close(fd)
                        opargs.extend(['-f', outfile])
                        DBG("Running %s" % (' '.join(args + opargs)))
                        p = Popen(args + opargs, stdout=PIPE, stderr=PIPE)
                        stdout, stderr = p.communicate()
                        if stdout:
                            # statistics and coverage data on stdout
                            results[-1].update(parse_stats(stdout.decode('utf-8')))
                        if stderr:
                            results[-1]['error'] = stderr.decode('utf-8').strip()
                            errors += 1
                        elif p.returncode != 0:
                            # runtc indicated an error, but didn't output a
                            # message on stderr about it
                            results[-1]['error'] = "Unknown error: runtc %d" % p.returncode
                            errors += 1
                        if 'error' not in results[-1]:
                            matchfile = os.path.join(decoders_dir, op['pd'], 'test', op['match'])
                            DBG("Comparing with %s" % matchfile)
                            try:
                                diff = diff_error = None
                                if op['type'] in ('annotation', 'python'):
                                    diff = diff_text(matchfile, outfile)
                                elif op['type'] == 'binary':
                                    diff = compare_binary(matchfile, outfile)
                                else:
                                    diff = ["Unsupported output type '%s'." % op['type']]
                            except Exception as e:
                                # Held back: in fix mode a failed compare
                                # (e.g. no match file yet) is repaired
                                # rather than reported.
                                diff_error = e
                            if fix:
                                if diff or diff_error:
                                    copy(outfile, matchfile)
                                    DBG("Wrote %s" % matchfile)
                            else:
                                if diff:
                                    results[-1]['diff'] = diff
                                elif diff_error is not None:
                                    raise diff_error
                    except Exception as e:
                        results[-1]['error'] = str(e)
                    finally:
                        # Only offer the coverage report to gen_report()
                        # when the file is really there to be read.
                        if coverage and os.path.exists(coverage):
                            results[-1]['coverage_report'] = coverage
                        if outfile is not None:
                            os.unlink(outfile)
                    if VERBOSE:
                        if 'diff' in results[-1]:
                            INFO("Output mismatch")
                        elif 'error' in results[-1]:
                            error = results[-1]['error']
                            if len(error) > 20:
                                error = error[:17] + '...'
                            INFO(error)
                        elif 'coverage' in results[-1]:
                            # report coverage of this PD
                            for record in results[-1]['coverage']:
                                # but not others used in the stack
                                # as part of the test.
                                if record['scope'] == pd:
                                    INFO(record['coverage'])
                                    break
                        else:
                            INFO("OK")
                    gen_report(results[-1])
                    if coverage:
                        if os.path.exists(coverage):
                            os.unlink(coverage)
                        # only keep track of coverage records for this PD,
                        # not others in the stack just used for testing.
                        # A failed run may not have produced any records.
                        for cvg in results[-1].get('coverage', []):
                            if cvg['scope'] == pd:
                                pd_cvg.append(cvg)
        if VERBOSE and opt_coverage and len(pd_cvg) > 1:
            # report total coverage of this PD, across all the tests
            # that were done on it.
            total_lines, missed_lines = coverage_sum(pd_cvg)
            if total_lines:
                pd_coverage = 100 - (float(len(missed_lines)) / total_lines * 100)
                dots = '.' * (54 - len(pd) - 2)
                INFO("%s total %s %d%%" % (pd, dots, pd_coverage))

    # Remove the temporary coverage file if no run got to remove it.
    if coverage and os.path.exists(coverage):
        os.unlink(coverage)

    return results, errors
415
416
def gen_report(result):
    """Write out the report for one test result, if there is anything to say.

    result: result dictionary with 'testcase' and optionally 'error',
        'diff' (list of lines) and 'coverage_report' (path of a file
        whose contents are included).

    Nothing is produced for a result with none of the optional fields.
    Otherwise the report is written to a file in report_dir, named after
    the testcase with '/' replaced by '_'. Without a report_dir it is
    printed on stdout instead.
    """
    out = []
    if 'error' in result:
        out.append("Error:")
        out.append(result['error'])
        out.append('')
    if 'diff' in result:
        out.append("Test output mismatch:")
        out.extend(result['diff'])
        out.append('')
    if 'coverage_report' in result:
        with open(result['coverage_report'], 'r') as report:
            out.append(report.read())
        out.append('')

    if not out:
        return

    text = "Testcase: %s\n" % result['testcase']
    text += '\n'.join(out)

    if report_dir:
        filename = result['testcase'].replace('/', '_')
        # The context manager flushes and closes the report file.
        with open(os.path.join(report_dir, filename), 'w') as report:
            report.write(text)
    else:
        print(text)
442
443
def show_tests(tests):
    """Print a readable description of every selected testcase.

    tests: dictionary as built by get_tests(): decoder name -> list of
        testcase lists. A blank line is printed after each testcase list.
    """
    for pd in sorted(tests.keys()):
        for tclist in tests[pd]:
            for tc in tclist:
                print("Testcase: %s/%s" % (tc['pd'], tc['name']))
                # spd, not pd: don't clobber the outer loop variable.
                for spd in tc['pdlist']:
                    print("  Protocol decoder: %s" % spd['name'])
                    for label, channel in spd['channels']:
                        print("    Channel %s=%d" % (label, channel))
                    for option, value in spd['options']:
                        # Option values are kept as strings by the
                        # parser, so they must be formatted with %s.
                        print("    Option %s=%s" % (option, value))
                if 'stack' in tc:
                    print("  Stack: %s" % ' '.join(tc['stack']))
                print("  Input: %s" % tc['input'])
                for op in tc['output']:
                    print("  Output:\n    Protocol decoder: %s" % op['pd'])
                    print("    Type: %s" % op['type'])
                    if 'class' in op:
                        print("    Class: %s" % op['class'])
                    print("    Match: %s" % op['match'])
            print()
465
466
def list_tests(tests):
    """Print one line per output check, as pd/testcase/type[/class].

    tests: dictionary as built by get_tests(): decoder name -> list of
        testcase lists.
    """
    for pd_name in sorted(tests):
        for group in tests[pd_name]:
            for case in group:
                for out in case['output']:
                    # The class part is only present for outputs that
                    # were given one.
                    suffix = "/%s" % out['class'] if 'class' in out else ''
                    print("%s/%s/%s%s" % (case['pd'], case['name'], out['type'], suffix))
476
477
#
# main
#

# Needed to turn an invalid command line into a usage message.
from getopt import GetoptError

# All directories are located relative to this script: it lives in the
# tests directory, whose parent is the project root.
tests_dir = os.path.abspath(os.path.dirname(sys.argv[0]))
base_dir = os.path.abspath(os.path.join(os.curdir, tests_dir, os.path.pardir))
dumps_dir = os.path.abspath(os.path.join(base_dir, os.path.pardir, 'sigrok-dumps'))
decoders_dir = os.path.abspath(os.path.join(base_dir, 'decoders'))

if len(sys.argv) == 1:
    usage()

opt_all = opt_run = opt_show = opt_list = opt_fix = opt_coverage = False
report_dir = None
try:
    opts, args = getopt(sys.argv[1:], "dvarslfcR:S:")
except GetoptError as e:
    # Unknown option or missing argument: explain instead of a traceback.
    usage(str(e))
for opt, arg in opts:
    if opt == '-d':
        # May be given more than once to raise the debug level.
        DEBUG += 1
    elif opt == '-v':
        VERBOSE = True
    elif opt == '-a':
        opt_all = True
    elif opt == '-r':
        opt_run = True
    elif opt == '-s':
        opt_show = True
    elif opt == '-l':
        opt_list = True
    elif opt == '-f':
        opt_fix = True
    elif opt == '-c':
        opt_coverage = True
    elif opt == '-R':
        report_dir = arg
    elif opt == '-S':
        dumps_dir = arg

if opt_run and opt_show:
    usage("Use either -s or -r, not both.")
if args and opt_all:
    usage("Specify either -a or tests, not both.")
if report_dir is not None and not os.path.isdir(report_dir):
    usage("%s is not a directory" % report_dir)

ret = 0
try:
    if args:
        testlist = get_tests(args)
    elif opt_all:
        # Only directories can be decoders; get_tests() would raise on
        # any plain file that happens to live in the decoders directory.
        testlist = get_tests([
            entry for entry in os.listdir(decoders_dir)
            if os.path.isdir(os.path.join(decoders_dir, entry))
        ])
    else:
        usage("Specify either -a or tests.")

    if opt_run:
        if not os.path.isdir(dumps_dir):
            ERR("Could not find sigrok-dumps repository at %s" % dumps_dir)
            sys.exit(1)
        results, errors = run_tests(testlist, fix=opt_fix)
        # Exit statuses wrap around at 256; cap the count so that a large
        # number of errors can never be reported as success.
        ret = min(errors, 255)
    elif opt_show:
        show_tests(testlist)
    elif opt_list:
        list_tests(testlist)
    elif opt_fix:
        run_tests(testlist, fix=True)
    else:
        usage()
except Exception as e:
    print("Error: %s" % str(e))
    # A failure must not be reported as a successful run.
    ret = 1
    if DEBUG:
        raise

sys.exit(ret)
552