]> sigrok.org Git - libsigrokdecode.git/blob - tests/pdtest
pdtest: Small fix.
[libsigrokdecode.git] / tests / pdtest
1 #!/usr/bin/env python3
2 ##
3 ## This file is part of the libsigrokdecode project.
4 ##
5 ## Copyright (C) 2013 Bert Vermeulen <bert@biot.com>
6 ##
7 ## This program is free software: you can redistribute it and/or modify
8 ## it under the terms of the GNU General Public License as published by
9 ## the Free Software Foundation, either version 3 of the License, or
10 ## (at your option) any later version.
11 ##
12 ## This program is distributed in the hope that it will be useful,
13 ## but WITHOUT ANY WARRANTY; without even the implied warranty of
14 ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 ## GNU General Public License for more details.
16 ##
17 ## You should have received a copy of the GNU General Public License
18 ## along with this program.  If not, see <http://www.gnu.org/licenses/>.
19 ##
20
21 import os
22 import sys
23 from getopt import getopt
24 from tempfile import mkstemp
25 from subprocess import Popen, PIPE
26 from difflib import Differ
27 from hashlib import md5
28 from shutil import copy
29
30 DEBUG = 0
31 VERBOSE = False
32
33
34 class E_syntax(Exception):
35     pass
36 class E_badline(Exception):
37     pass
38
39 def INFO(msg, end='\n'):
40     if VERBOSE:
41         print(msg, end=end)
42         sys.stdout.flush()
43
44
45 def DBG(msg):
46     if DEBUG:
47         print(msg)
48
49
50 def ERR(msg):
51     print(msg, file=sys.stderr)
52
53
54 def usage(msg=None):
55     if msg:
56         print(msg.strip() + '\n')
57     print("""Usage: testpd [-dvarslR] [test, ...]
58   -d  Turn on debugging
59   -v  Verbose
60   -a  All tests
61   -l  List all tests
62   -s  Show test(s)
63   -r  Run test(s)
64   -f  Fix failed test(s)
65   -R <directory>  Save test reports to <directory>
66   <test>  Protocol decoder name ("i2c") and optionally test name ("i2c/icc")""")
67     sys.exit()
68
69
70 def check_tclist(tc):
71     if 'pdlist' not in tc or not tc['pdlist']:
72         return("No protocol decoders")
73     if 'input' not in tc or not tc['input']:
74         return("No input")
75     if 'output' not in tc or not tc['output']:
76         return("No output")
77     for op in tc['output']:
78         if 'match' not in op:
79             return("No match in output")
80
81     return None
82
83
84 def parse_testfile(path, pd, tc, op_type, op_class):
85     DBG("Opening '%s'" % path)
86     tclist = []
87     for line in open(path).read().split('\n'):
88         try:
89             line = line.strip()
90             if len(line) == 0 or line[0] == "#":
91                 continue
92             f = line.split()
93             if not tclist and f[0] != "test":
94                 # That can't be good.
95                 raise E_badline
96             key = f.pop(0)
97             if key == 'test':
98                 if len(f) != 1:
99                     raise E_syntax
100                 # new testcase
101                 tclist.append({
102                     'pd': pd,
103                     'name': f[0],
104                     'pdlist': [],
105                     'output': [],
106                 })
107             elif key == 'protocol-decoder':
108                 if len(f) < 1:
109                     raise E_syntax
110                 pd_spec = {
111                     'name': f.pop(0),
112                     'probes': [],
113                     'options': [],
114                 }
115                 while len(f):
116                     if len(f) == 1:
117                         # Always needs <key> <value>
118                         raise E_syntax
119                     a, b = f[:2]
120                     f = f[2:]
121                     if '=' not in b:
122                         raise E_syntax
123                     opt, val = b.split('=')
124                     if a == 'probe':
125                         try:
126                             val = int(val)
127                         except:
128                             raise E_syntax
129                         pd_spec['probes'].append([opt, val])
130                     elif a == 'option':
131                         pd_spec['options'].append([opt, val])
132                     else:
133                         raise E_syntax
134                 tclist[-1]['pdlist'].append(pd_spec)
135             elif key == 'stack':
136                 if len(f) < 2:
137                     raise E_syntax
138                 tclist[-1]['stack'] = f
139             elif key == 'input':
140                 if len(f) != 1:
141                     raise E_syntax
142                 tclist[-1]['input'] = f[0]
143             elif key == 'output':
144                 op_spec = {
145                     'pd': f.pop(0),
146                     'type': f.pop(0),
147                 }
148                 while len(f):
149                     if len(f) == 1:
150                         # Always needs <key> <value>
151                         raise E_syntax
152                     a, b = f[:2]
153                     f = f[2:]
154                     if a == 'class':
155                         op_spec['class'] = b
156                     elif a == 'match':
157                         op_spec['match'] = b
158                     else:
159                         raise E_syntax
160                 tclist[-1]['output'].append(op_spec)
161             else:
162                 raise E_badline
163         except E_badline as e:
164             ERR("Invalid syntax in %s: line '%s'" % (path, line))
165             return []
166         except E_syntax as e:
167             ERR("Unable to parse %s: unknown line '%s'" % (path, line))
168             return []
169
170     # If a specific testcase was requested, keep only that one.
171     if tc is not None:
172         target_tc = None
173         for t in tclist:
174             if t['name'] == tc:
175                 target_tc = t
176                 break
177         # ...and a specific output type
178         if op_type is not None:
179             target_oplist = []
180             for op in target_tc['output']:
181                 if op['type'] == op_type:
182                     # ...and a specific output class
183                     if op_class is None or ('class' in op and op['class'] == op_class):
184                         target_oplist.append(op)
185                         DBG("match on [%s]" % str(op))
186             target_tc['output'] = target_oplist
187         if target_tc is None:
188             tclist = []
189         else:
190             tclist = [target_tc]
191     for t in tclist:
192         error = check_tclist(t)
193         if error:
194             ERR("Error in %s: %s" % (path, error))
195             return []
196
197     return tclist
198
199
200 def get_tests(testnames):
201     tests = []
202     for testspec in testnames:
203         # Optional testspec in the form i2c/rtc
204         tc = op_type = op_class = None
205         ts = testspec.strip("/").split("/")
206         pd = ts.pop(0)
207         if ts:
208             tc = ts.pop(0)
209         if ts:
210             op_type = ts.pop(0)
211         if ts:
212             op_class = ts.pop(0)
213         path = os.path.join(decoders_dir, pd)
214         if not os.path.isdir(path):
215             # User specified non-existent PD
216             raise Exception("%s not found." % path)
217         path = os.path.join(decoders_dir, pd, "test/test.conf")
218         if not os.path.exists(path):
219             # PD doesn't have any tests yet
220             continue
221         tests.append(parse_testfile(path, pd, tc, op_type, op_class))
222
223     return tests
224
225
226 def diff_text(f1, f2):
227     t1 = open(f1).readlines()
228     t2 = open(f2).readlines()
229     diff = []
230     d = Differ()
231     for line in d.compare(t1, t2):
232         if line[:2] in ('- ', '+ '):
233             diff.append(line.strip())
234
235     return diff
236
237
238 def compare_binary(f1, f2):
239     h1 = md5()
240     h1.update(open(f1, 'rb').read())
241     h2 = md5()
242     h2.update(open(f2, 'rb').read())
243     if h1.digest() == h2.digest():
244         result = None
245     else:
246         result = ["Binary output does not match."]
247
248     return result
249
250
251 def run_tests(tests, fix=False):
252     errors = 0
253     results = []
254     cmd = os.path.join(tests_dir, 'runtc')
255     for tclist in tests:
256         for tc in tclist:
257             args = [cmd]
258             if DEBUG > 1:
259                 args.append('-d')
260             for pd in tc['pdlist']:
261                 args.extend(['-P', pd['name']])
262                 for label, probe in pd['probes']:
263                     args.extend(['-p', "%s=%d" % (label, probe)])
264                 for option, value in pd['options']:
265                     args.extend(['-o', "%s=%s" % (option, value)])
266             args.extend(['-i', os.path.join(dumps_dir, tc['input'])])
267             for op in tc['output']:
268                 name = "%s/%s/%s" % (tc['pd'], tc['name'], op['type'])
269                 opargs = ['-O', "%s:%s" % (op['pd'], op['type'])]
270                 if 'class' in op:
271                     opargs[-1] += ":%s" % op['class']
272                     name += "/%s" % op['class']
273                 if VERBOSE:
274                     dots = '.' * (60 - len(name) - 2)
275                     INFO("%s %s " % (name, dots), end='')
276                 results.append({
277                     'testcase': name,
278                 })
279                 try:
280                     fd, outfile = mkstemp()
281                     os.close(fd)
282                     opargs.extend(['-f', outfile])
283                     DBG("Running %s" % (' '.join(args + opargs)))
284                     p = Popen(args + opargs, stdout=PIPE, stderr=PIPE)
285                     stdout, stderr = p.communicate()
286                     if stdout:
287                         results[-1]['statistics'] = stdout.decode('utf-8').strip()
288                     if stderr:
289                         results[-1]['error'] = stderr.decode('utf-8').strip()
290                         errors += 1
291                     elif p.returncode != 0:
292                         # runtc indicated an error, but didn't output a
293                         # message on stderr about it
294                         results[-1]['error'] = "Unknown error: runtc %d" % p.returncode
295                     if 'error' not in results[-1]:
296                         matchfile = os.path.join(decoders_dir, op['pd'], 'test', op['match'])
297                         DBG("Comparing with %s" % matchfile)
298                         try:
299                             diff = diff_error = None
300                             if op['type'] in ('annotation', 'python'):
301                                 diff = diff_text(matchfile, outfile)
302                             elif op['type'] == 'binary':
303                                 diff = compare_binary(matchfile, outfile)
304                             else:
305                                 diff = ["Unsupported output type '%s'." % op['type']]
306                         except Exception as e:
307                             diff_error = e
308                         if fix:
309                             if diff or diff_error:
310                                 copy(outfile, matchfile)
311                                 DBG("Wrote %s" % matchfile)
312                         else:
313                             if diff:
314                                 results[-1]['diff'] = diff
315                             elif diff_error is not None:
316                                 raise diff_error
317                 except Exception as e:
318                     results[-1]['error'] = str(e)
319                 finally:
320                     os.unlink(outfile)
321                 if VERBOSE:
322                     if 'diff' in results[-1]:
323                         INFO("Output mismatch")
324                     elif 'error' in results[-1]:
325                         error = results[-1]['error']
326                         if len(error) > 20:
327                             error = error[:17] + '...'
328                         INFO(error)
329                     else:
330                         INFO("OK")
331                 gen_report(results[-1])
332
333     return results, errors
334
335
336 def gen_report(result):
337     out = []
338     if 'error' in result:
339         out.append("Error:")
340         out.append(result['error'])
341         out.append('')
342     if 'diff' in result:
343         out.append("Test output mismatch:")
344         out.extend(result['diff'])
345         out.append('')
346     if 'statistics' in result:
347         out.extend(["Statistics:", result['statistics']])
348         out.append('')
349
350     if out:
351         text = "Testcase: %s\n" % result['testcase']
352         text += '\n'.join(out)
353     else:
354         return
355
356     if report_dir:
357         filename = result['testcase'].replace('/', '_')
358         open(os.path.join(report_dir, filename), 'w').write(text)
359     else:
360         print(text)
361
362
363 def show_tests(tests):
364     for tclist in tests:
365         for tc in tclist:
366             print("Testcase: %s/%s" % (tc['pd'], tc['name']))
367             for pd in tc['pdlist']:
368                 print("  Protocol decoder: %s" % pd['name'])
369                 for label, probe in pd['probes']:
370                     print("    Probe %s=%d" % (label, probe))
371                 for option, value in pd['options']:
372                     print("    Option %s=%d" % (option, value))
373             if 'stack' in tc:
374                 print("  Stack: %s" % ' '.join(tc['stack']))
375             print("  Input: %s" % tc['input'])
376             for op in tc['output']:
377                 print("  Output:\n    Protocol decoder: %s" % op['pd'])
378                 print("    Type: %s" % op['type'])
379                 if 'class' in op:
380                     print("    Class: %s" % op['class'])
381                 print("    Match: %s" % op['match'])
382         print()
383
384
385 def list_tests(tests):
386     for tclist in tests:
387         for tc in tclist:
388             for op in tc['output']:
389                 line = "%s/%s/%s" % (tc['pd'], tc['name'], op['type'])
390                 if 'class' in op:
391                     line += "/%s" % op['class']
392                 print(line)
393
394
395 #
396 # main
397 #
398
399 # project root
400 tests_dir = os.path.abspath(os.path.dirname(sys.argv[0]))
401 base_dir = os.path.abspath(os.path.join(os.curdir, tests_dir, os.path.pardir))
402 dumps_dir = os.path.abspath(os.path.join(base_dir, os.path.pardir, 'sigrok-dumps'))
403 decoders_dir = os.path.abspath(os.path.join(base_dir, 'decoders'))
404
405 if len(sys.argv) == 1:
406     usage()
407
408 opt_all = opt_run = opt_show = opt_list = opt_fix = False
409 report_dir = None
410 opts, args = getopt(sys.argv[1:], "dvarslfR:S:")
411 for opt, arg in opts:
412     if opt == '-d':
413         DEBUG += 1
414     if opt == '-v':
415         VERBOSE = True
416     elif opt == '-a':
417         opt_all = True
418     elif opt == '-r':
419         opt_run = True
420     elif opt == '-s':
421         opt_show = True
422     elif opt == '-l':
423         opt_list = True
424     elif opt == '-f':
425         opt_fix = True
426     elif opt == '-R':
427         report_dir = arg
428     elif opt == '-S':
429         dumps_dir = arg
430
431 if opt_run and opt_show:
432     usage("Use either -s or -r, not both.")
433 if args and opt_all:
434     usage("Specify either -a or tests, not both.")
435 if report_dir is not None and not os.path.isdir(report_dir):
436     usage("%s is not a directory" % report_dir)
437
438 ret = 0
439 try:
440     if args:
441         testlist = get_tests(args)
442     elif opt_all:
443         testlist = get_tests(os.listdir(decoders_dir))
444     else:
445         usage("Specify either -a or tests.")
446
447     if opt_run:
448         if not os.path.isdir(dumps_dir):
449             ERR("Could not find sigrok-dumps repository at %s" % dumps_dir)
450             sys.exit(1)
451         results, errors = run_tests(testlist, fix=opt_fix)
452         ret = errors
453     elif opt_show:
454         show_tests(testlist)
455     elif opt_list:
456         list_tests(testlist)
457     elif opt_fix:
458         run_tests(testlist, fix=True)
459     else:
460         usage()
461 except Exception as e:
462     print("Error: %s" % str(e))
463     if DEBUG:
464         raise
465
466 sys.exit(ret)
467