]> sigrok.org Git - libsigrokdecode.git/blob - tests/pdtest
tests/pdtest: Small fix to make it work with Python 3.2.
[libsigrokdecode.git] / tests / pdtest
1 #!/usr/bin/env python3
2 ##
3 ## This file is part of the libsigrokdecode project.
4 ##
5 ## Copyright (C) 2013 Bert Vermeulen <bert@biot.com>
6 ##
7 ## This program is free software: you can redistribute it and/or modify
8 ## it under the terms of the GNU General Public License as published by
9 ## the Free Software Foundation, either version 3 of the License, or
10 ## (at your option) any later version.
11 ##
12 ## This program is distributed in the hope that it will be useful,
13 ## but WITHOUT ANY WARRANTY; without even the implied warranty of
14 ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 ## GNU General Public License for more details.
16 ##
17 ## You should have received a copy of the GNU General Public License
18 ## along with this program.  If not, see <http://www.gnu.org/licenses/>.
19 ##
20
21 import os
22 import sys
23 import re
24 from getopt import getopt
25 from tempfile import mkstemp
26 from subprocess import Popen, PIPE
27 from difflib import Differ
28 from hashlib import md5
29 from shutil import copy
30
31 DEBUG = 0
32 VERBOSE = False
33
34
35 class E_syntax(Exception):
36     pass
37 class E_badline(Exception):
38     pass
39
40 def INFO(msg, end='\n'):
41     if VERBOSE:
42         print(msg, end=end)
43         sys.stdout.flush()
44
45
46 def DBG(msg):
47     if DEBUG:
48         print(msg)
49
50
51 def ERR(msg):
52     print(msg, file=sys.stderr)
53
54
55 def usage(msg=None):
56     if msg:
57         print(msg.strip() + '\n')
58     print("""Usage: testpd [-dvarslR] [test, ...]
59   -d  Turn on debugging
60   -v  Verbose
61   -a  All tests
62   -l  List all tests
63   -s  Show test(s)
64   -r  Run test(s)
65   -f  Fix failed test(s)
66   -c  Report decoder code coverage
67   -R <directory>  Save test reports to <directory>
68   <test>  Protocol decoder name ("i2c") and optionally test name ("i2c/icc")""")
69     sys.exit()
70
71
72 def check_tclist(tc):
73     if 'pdlist' not in tc or not tc['pdlist']:
74         return("No protocol decoders")
75     if 'input' not in tc or not tc['input']:
76         return("No input")
77     if 'output' not in tc or not tc['output']:
78         return("No output")
79     for op in tc['output']:
80         if 'match' not in op:
81             return("No match in output")
82
83     return None
84
85
86 def parse_testfile(path, pd, tc, op_type, op_class):
87     DBG("Opening '%s'" % path)
88     tclist = []
89     for line in open(path).read().split('\n'):
90         try:
91             line = line.strip()
92             if len(line) == 0 or line[0] == "#":
93                 continue
94             f = line.split()
95             if not tclist and f[0] != "test":
96                 # That can't be good.
97                 raise E_badline
98             key = f.pop(0)
99             if key == 'test':
100                 if len(f) != 1:
101                     raise E_syntax
102                 # new testcase
103                 tclist.append({
104                     'pd': pd,
105                     'name': f[0],
106                     'pdlist': [],
107                     'output': [],
108                 })
109             elif key == 'protocol-decoder':
110                 if len(f) < 1:
111                     raise E_syntax
112                 pd_spec = {
113                     'name': f.pop(0),
114                     'channels': [],
115                     'options': [],
116                 }
117                 while len(f):
118                     if len(f) == 1:
119                         # Always needs <key> <value>
120                         raise E_syntax
121                     a, b = f[:2]
122                     f = f[2:]
123                     if '=' not in b:
124                         raise E_syntax
125                     opt, val = b.split('=')
126                     if a == 'channel':
127                         try:
128                             val = int(val)
129                         except:
130                             raise E_syntax
131                         pd_spec['channels'].append([opt, val])
132                     elif a == 'option':
133                         pd_spec['options'].append([opt, val])
134                     else:
135                         raise E_syntax
136                 tclist[-1]['pdlist'].append(pd_spec)
137             elif key == 'stack':
138                 if len(f) < 2:
139                     raise E_syntax
140                 tclist[-1]['stack'] = f
141             elif key == 'input':
142                 if len(f) != 1:
143                     raise E_syntax
144                 tclist[-1]['input'] = f[0]
145             elif key == 'output':
146                 op_spec = {
147                     'pd': f.pop(0),
148                     'type': f.pop(0),
149                 }
150                 while len(f):
151                     if len(f) == 1:
152                         # Always needs <key> <value>
153                         raise E_syntax
154                     a, b = f[:2]
155                     f = f[2:]
156                     if a == 'class':
157                         op_spec['class'] = b
158                     elif a == 'match':
159                         op_spec['match'] = b
160                     else:
161                         raise E_syntax
162                 tclist[-1]['output'].append(op_spec)
163             else:
164                 raise E_badline
165         except E_badline as e:
166             ERR("Invalid syntax in %s: line '%s'" % (path, line))
167             return []
168         except E_syntax as e:
169             ERR("Unable to parse %s: unknown line '%s'" % (path, line))
170             return []
171
172     # If a specific testcase was requested, keep only that one.
173     if tc is not None:
174         target_tc = None
175         for t in tclist:
176             if t['name'] == tc:
177                 target_tc = t
178                 break
179         # ...and a specific output type
180         if op_type is not None:
181             target_oplist = []
182             for op in target_tc['output']:
183                 if op['type'] == op_type:
184                     # ...and a specific output class
185                     if op_class is None or ('class' in op and op['class'] == op_class):
186                         target_oplist.append(op)
187                         DBG("match on [%s]" % str(op))
188             target_tc['output'] = target_oplist
189         if target_tc is None:
190             tclist = []
191         else:
192             tclist = [target_tc]
193     for t in tclist:
194         error = check_tclist(t)
195         if error:
196             ERR("Error in %s: %s" % (path, error))
197             return []
198
199     return tclist
200
201
202 def get_tests(testnames):
203     tests = {}
204     for testspec in testnames:
205         # Optional testspec in the form pd/testcase/type/class
206         tc = op_type = op_class = None
207         ts = testspec.strip("/").split("/")
208         pd = ts.pop(0)
209         tests[pd] = []
210         if ts:
211             tc = ts.pop(0)
212         if ts:
213             op_type = ts.pop(0)
214         if ts:
215             op_class = ts.pop(0)
216         path = os.path.join(decoders_dir, pd)
217         if not os.path.isdir(path):
218             # User specified non-existent PD
219             raise Exception("%s not found." % path)
220         path = os.path.join(decoders_dir, pd, "test/test.conf")
221         if not os.path.exists(path):
222             # PD doesn't have any tests yet
223             continue
224         tests[pd].append(parse_testfile(path, pd, tc, op_type, op_class))
225
226     return tests
227
228
229 def diff_text(f1, f2):
230     t1 = open(f1).readlines()
231     t2 = open(f2).readlines()
232     diff = []
233     d = Differ()
234     for line in d.compare(t1, t2):
235         if line[:2] in ('- ', '+ '):
236             diff.append(line.strip())
237
238     return diff
239
240
241 def compare_binary(f1, f2):
242     h1 = md5()
243     h1.update(open(f1, 'rb').read())
244     h2 = md5()
245     h2.update(open(f2, 'rb').read())
246     if h1.digest() == h2.digest():
247         result = None
248     else:
249         result = ["Binary output does not match."]
250
251     return result
252
253
254 # runtc's stdout can have lines like:
255 # coverage: lines=161 missed=2 coverage=99%
256 def parse_stats(text):
257     stats = {}
258     for line in text.strip().split('\n'):
259         fields = line.split()
260         key = fields.pop(0).strip(':')
261         if key not in stats:
262             stats[key] = []
263         stats[key].append({})
264         for f in fields:
265             k, v = f.split('=')
266             stats[key][-1][k] = v
267
268     return stats
269
270
271 # take result set of all tests in a PD, and summarize which lines
272 # were not covered by any of the tests.
273 def coverage_sum(cvglist):
274     lines = 0
275     missed = 0
276     missed_lines = {}
277     for record in cvglist:
278         lines = int(record['lines'])
279         missed += int(record['missed'])
280         if 'missed_lines' not in record:
281             continue
282         for linespec in record['missed_lines'].split(','):
283             if linespec not in missed_lines:
284                 missed_lines[linespec] = 1
285             else:
286                 missed_lines[linespec] += 1
287
288     # keep only those lines that didn't show up in every non-summary record
289     final_missed = []
290     for linespec in missed_lines:
291         if missed_lines[linespec] != len(cvglist):
292             continue
293         final_missed.append(linespec)
294
295     return lines, final_missed
296
297
298 def run_tests(tests, fix=False):
299     errors = 0
300     results = []
301     cmd = [os.path.join(tests_dir, 'runtc')]
302     if opt_coverage:
303         fd, coverage = mkstemp()
304         os.close(fd)
305         cmd.extend(['-c', coverage])
306     else:
307         coverage = None
308     for pd in sorted(tests.keys()):
309         pd_cvg = []
310         for tclist in tests[pd]:
311             for tc in tclist:
312                 args = cmd[:]
313                 if DEBUG > 1:
314                     args.append('-d')
315                 # Set up PD stack for this test.
316                 for spd in tc['pdlist']:
317                     args.extend(['-P', spd['name']])
318                     for label, channel in spd['channels']:
319                         args.extend(['-p', "%s=%d" % (label, channel)])
320                     for option, value in spd['options']:
321                         args.extend(['-o', "%s=%s" % (option, value)])
322                 args.extend(['-i', os.path.join(dumps_dir, tc['input'])])
323                 for op in tc['output']:
324                     name = "%s/%s/%s" % (pd, tc['name'], op['type'])
325                     opargs = ['-O', "%s:%s" % (op['pd'], op['type'])]
326                     if 'class' in op:
327                         opargs[-1] += ":%s" % op['class']
328                         name += "/%s" % op['class']
329                     if VERBOSE:
330                         dots = '.' * (60 - len(name) - 2)
331                         INFO("%s %s " % (name, dots), end='')
332                     results.append({
333                         'testcase': name,
334                     })
335                     try:
336                         fd, outfile = mkstemp()
337                         os.close(fd)
338                         opargs.extend(['-f', outfile])
339                         DBG("Running %s" % (' '.join(args + opargs)))
340                         p = Popen(args + opargs, stdout=PIPE, stderr=PIPE)
341                         stdout, stderr = p.communicate()
342                         if stdout:
343                             # statistics and coverage data on stdout
344                             results[-1].update(parse_stats(stdout.decode('utf-8')))
345                         if stderr:
346                             results[-1]['error'] = stderr.decode('utf-8').strip()
347                             errors += 1
348                         elif p.returncode != 0:
349                             # runtc indicated an error, but didn't output a
350                             # message on stderr about it
351                             results[-1]['error'] = "Unknown error: runtc %d" % p.returncode
352                         if 'error' not in results[-1]:
353                             matchfile = os.path.join(decoders_dir, op['pd'], 'test', op['match'])
354                             DBG("Comparing with %s" % matchfile)
355                             try:
356                                 diff = diff_error = None
357                                 if op['type'] in ('annotation', 'python'):
358                                     diff = diff_text(matchfile, outfile)
359                                 elif op['type'] == 'binary':
360                                     diff = compare_binary(matchfile, outfile)
361                                 else:
362                                     diff = ["Unsupported output type '%s'." % op['type']]
363                             except Exception as e:
364                                 diff_error = e
365                             if fix:
366                                 if diff or diff_error:
367                                     copy(outfile, matchfile)
368                                     DBG("Wrote %s" % matchfile)
369                             else:
370                                 if diff:
371                                     results[-1]['diff'] = diff
372                                 elif diff_error is not None:
373                                     raise diff_error
374                     except Exception as e:
375                         results[-1]['error'] = str(e)
376                     finally:
377                         if coverage:
378                             results[-1]['coverage_report'] = coverage
379                         os.unlink(outfile)
380                     if op['type'] == 'exception' and 'error' in results[-1]:
381                         # filter out the exception we were looking for
382                         reg = "^Error: srd: %s:" % op['match']
383                         if re.match(reg, results[-1]['error']):
384                             # found it, not an error
385                             results[-1].pop('error')
386                     if VERBOSE:
387                         if 'diff' in results[-1]:
388                             INFO("Output mismatch")
389                         elif 'error' in results[-1]:
390                             error = results[-1]['error']
391                             if len(error) > 20:
392                                 error = error[:17] + '...'
393                             INFO(error)
394                         elif 'coverage' in results[-1]:
395                             # report coverage of this PD
396                             for record in results[-1]['coverage']:
397                                 # but not others used in the stack
398                                 # as part of the test.
399                                 if record['scope'] == pd:
400                                     INFO(record['coverage'])
401                                     break
402                         else:
403                             INFO("OK")
404                     gen_report(results[-1])
405                     if coverage:
406                         os.unlink(coverage)
407                         # only keep track of coverage records for this PD,
408                         # not others in the stack just used for testing.
409                         for cvg in results[-1]['coverage']:
410                             if cvg['scope'] == pd:
411                                 pd_cvg.append(cvg)
412         if opt_coverage and len(pd_cvg) > 1:
413             # report total coverage of this PD, across all the tests
414             # that were done on it.
415             total_lines, missed_lines = coverage_sum(pd_cvg)
416             pd_coverage = 100 - (float(len(missed_lines)) / total_lines * 100)
417             if VERBOSE:
418                 dots = '.' * (54 - len(pd) - 2)
419                 INFO("%s total %s %d%%" % (pd, dots, pd_coverage))
420             if report_dir:
421                 # generate a missing lines list across all the files in
422                 # the PD
423                 files = {}
424                 for entry in missed_lines:
425                     filename, line = entry.split(':')
426                     if filename not in files:
427                         files[filename] = []
428                     files[filename].append(line)
429                 text = ''
430                 for filename in sorted(files.keys()):
431                     line_list = ','.join(sorted(files[filename], key=int))
432                     text += "%s: %s\n" % (filename, line_list)
433                 open(os.path.join(report_dir, pd + "_total"), 'w').write(text)
434
435
436     return results, errors
437
438
439 def gen_report(result):
440     out = []
441     if 'error' in result:
442         out.append("Error:")
443         out.append(result['error'])
444         out.append('')
445     if 'diff' in result:
446         out.append("Test output mismatch:")
447         out.extend(result['diff'])
448         out.append('')
449     if 'coverage_report' in result:
450         out.append(open(result['coverage_report'], 'r').read())
451         out.append('')
452
453     if out:
454         text = "Testcase: %s\n" % result['testcase']
455         text += '\n'.join(out)
456     else:
457         return
458
459     if report_dir:
460         filename = result['testcase'].replace('/', '_')
461         open(os.path.join(report_dir, filename), 'w').write(text)
462     else:
463         print(text)
464
465
466 def show_tests(tests):
467     for pd in sorted(tests.keys()):
468         for tclist in tests[pd]:
469             for tc in tclist:
470                 print("Testcase: %s/%s" % (tc['pd'], tc['name']))
471                 for pd in tc['pdlist']:
472                     print("  Protocol decoder: %s" % pd['name'])
473                     for label, channel in pd['channels']:
474                         print("    Channel %s=%d" % (label, channel))
475                     for option, value in pd['options']:
476                         print("    Option %s=%d" % (option, value))
477                 if 'stack' in tc:
478                     print("  Stack: %s" % ' '.join(tc['stack']))
479                 print("  Input: %s" % tc['input'])
480                 for op in tc['output']:
481                     print("  Output:\n    Protocol decoder: %s" % op['pd'])
482                     print("    Type: %s" % op['type'])
483                     if 'class' in op:
484                         print("    Class: %s" % op['class'])
485                     print("    Match: %s" % op['match'])
486             print()
487
488
489 def list_tests(tests):
490     for pd in sorted(tests.keys()):
491         for tclist in tests[pd]:
492             for tc in tclist:
493                 for op in tc['output']:
494                     line = "%s/%s/%s" % (tc['pd'], tc['name'], op['type'])
495                     if 'class' in op:
496                         line += "/%s" % op['class']
497                     print(line)
498
499
500 #
501 # main
502 #
503
504 # project root
505 tests_dir = os.path.abspath(os.path.dirname(sys.argv[0]))
506 base_dir = os.path.abspath(os.path.join(os.curdir, tests_dir, os.path.pardir))
507 dumps_dir = os.path.abspath(os.path.join(base_dir, os.path.pardir, 'sigrok-dumps'))
508 decoders_dir = os.path.abspath(os.path.join(base_dir, 'decoders'))
509
510 if len(sys.argv) == 1:
511     usage()
512
513 opt_all = opt_run = opt_show = opt_list = opt_fix = opt_coverage = False
514 report_dir = None
515 opts, args = getopt(sys.argv[1:], "dvarslfcR:S:")
516 for opt, arg in opts:
517     if opt == '-d':
518         DEBUG += 1
519     if opt == '-v':
520         VERBOSE = True
521     elif opt == '-a':
522         opt_all = True
523     elif opt == '-r':
524         opt_run = True
525     elif opt == '-s':
526         opt_show = True
527     elif opt == '-l':
528         opt_list = True
529     elif opt == '-f':
530         opt_fix = True
531     elif opt == '-c':
532         opt_coverage = True
533     elif opt == '-R':
534         report_dir = arg
535     elif opt == '-S':
536         dumps_dir = arg
537
538 if opt_run and opt_show:
539     usage("Use either -s or -r, not both.")
540 if args and opt_all:
541     usage("Specify either -a or tests, not both.")
542 if report_dir is not None and not os.path.isdir(report_dir):
543     usage("%s is not a directory" % report_dir)
544
545 ret = 0
546 try:
547     if args:
548         testlist = get_tests(args)
549     elif opt_all:
550         testlist = get_tests(os.listdir(decoders_dir))
551     else:
552         usage("Specify either -a or tests.")
553
554     if opt_run:
555         if not os.path.isdir(dumps_dir):
556             ERR("Could not find sigrok-dumps repository at %s" % dumps_dir)
557             sys.exit(1)
558         results, errors = run_tests(testlist, fix=opt_fix)
559         ret = errors
560     elif opt_show:
561         show_tests(testlist)
562     elif opt_list:
563         list_tests(testlist)
564     elif opt_fix:
565         run_tests(testlist, fix=True)
566     else:
567         usage()
568 except Exception as e:
569     print("Error: %s" % str(e))
570     if DEBUG:
571         raise
572
573 sys.exit(ret)
574