2a90f2c2e965f4e84d0b5037dccbad2b7be559b0
[libsigrokdecode.git] / tests / pdtest
1 #!/usr/bin/env /usr/bin/python3
2
3 import os
4 import sys
5 from getopt import getopt
6 from tempfile import mkstemp
7 from subprocess import Popen, PIPE
8 from difflib import Differ
9 from hashlib import md5
10 from shutil import copy
11
# Debug level: 0 disables DBG() output; every -d on the command line adds one,
# and a level above 1 also passes -d on to the runtc helper.
DEBUG = 0
# When True, INFO() prints per-testcase progress while tests are running.
VERBOSE = False
14
15
class E_syntax(Exception):
    """Signals a known test.conf directive whose arguments are malformed."""
class E_badline(Exception):
    """Signals a test.conf line that is unknown or appears before any 'test'."""
20
def INFO(msg, end='\n'):
    """Print a progress message to stdout, but only in verbose mode.

    The output is flushed immediately so that a partial line (end='')
    is visible before the rest of the line gets printed.
    """
    if not VERBOSE:
        return
    print(msg, end=end, flush=True)
25
26
def DBG(msg):
    """Print a debug message to stdout when DEBUG is non-zero."""
    if not DEBUG:
        return
    print(msg)
30
31
def ERR(msg):
    """Report an error message on stderr, followed by a newline."""
    err_stream = sys.stderr
    print(msg, file=err_stream)
34
35
36 def usage(msg=None):
37     if msg:
38         print(msg.strip() + '\n')
39     print("""Usage: testpd [-dvarslR] [test, ...]
40   -d  Turn on debugging
41   -v  Verbose
42   -a  All tests
43   -l  List all tests
44   -s  Show test(s)
45   -r  Run test(s)
46   -f  Fix failed test(s)
47   -R <directory>  Save test reports to <directory>
48   <test>  Protocol decoder name ("i2c") and optionally test name ("i2c/icc")""")
49     sys.exit()
50
51
def check_tclist(tc):
    """Validate a single parsed testcase dict.

    A testcase needs a non-empty 'pdlist', an 'input' and a non-empty
    'output' list in which every entry carries a 'match' key.

    Returns a short description of the first problem found, or None when
    the testcase is complete.
    """
    required = (
        ('pdlist', "No protocol decoders"),
        ('input', "No input"),
        ('output', "No output"),
    )
    for field, complaint in required:
        # A missing key and an empty value are both treated as absent.
        if not tc.get(field):
            return complaint

    if any('match' not in op for op in tc['output']):
        return "No match in output"

    return None
64
65
def parse_testfile(path, pd, tc, op_type, op_class):
    """Parse a decoder's test.conf file into a list of testcase dicts.

    Blank lines and lines starting with '#' are skipped. Every other line
    is a whitespace-separated directive:

      test <name>
      protocol-decoder <name> [probe <label>=<number>] [option <key>=<value>] ...
      stack <pd> <pd> [...]
      input <file>
      output <pd> <type> [class <class>] [match <file>]

    Each 'test' line starts a new testcase dict with the keys 'pd', 'name',
    'pdlist' and 'output'; the following directives fill in that testcase
    ('input' and 'stack' are added as keys when present).

    Arguments:
      path      Path of the test.conf file to read.
      pd        Protocol decoder name stored in every testcase.
      tc        If not None, keep only the testcase with this name.
      op_type   If not None (and tc is given), keep only outputs of this type.
      op_class  If not None (and op_type is given), keep only outputs of
                this class.

    Returns the list of testcases. An empty list is returned, after
    reporting the problem through ERR(), when a line cannot be parsed or a
    testcase fails check_tclist(). An empty list is also returned when the
    requested testcase does not exist in the file.
    """
    DBG("Opening '%s'" % path)
    tclist = []
    with open(path) as conf:
        lines = conf.read().split('\n')
    for line in lines:
        try:
            line = line.strip()
            if not line or line[0] == "#":
                continue
            f = line.split()
            if not tclist and f[0] != "test":
                # That can't be good.
                raise E_badline
            key = f.pop(0)
            if key == 'test':
                if len(f) != 1:
                    raise E_syntax
                # new testcase
                tclist.append({
                    'pd': pd,
                    'name': f[0],
                    'pdlist': [],
                    'output': [],
                })
            elif key == 'protocol-decoder':
                if len(f) < 1:
                    raise E_syntax
                pd_spec = {
                    'name': f.pop(0),
                    'probes': [],
                    'options': [],
                }
                while f:
                    if len(f) == 1:
                        # Always needs <key> <value>
                        raise E_syntax
                    a, b = f[:2]
                    f = f[2:]
                    if '=' not in b:
                        raise E_syntax
                    # Split on the first '=' only, so that an option value
                    # may itself contain '=' characters.
                    opt, val = b.split('=', 1)
                    if a == 'probe':
                        try:
                            val = int(val)
                        except ValueError:
                            raise E_syntax
                        pd_spec['probes'].append([opt, val])
                    elif a == 'option':
                        pd_spec['options'].append([opt, val])
                    else:
                        raise E_syntax
                tclist[-1]['pdlist'].append(pd_spec)
            elif key == 'stack':
                if len(f) < 2:
                    raise E_syntax
                tclist[-1]['stack'] = f
            elif key == 'input':
                if len(f) != 1:
                    raise E_syntax
                tclist[-1]['input'] = f[0]
            elif key == 'output':
                # <pd> and <type> are mandatory.
                if len(f) < 2:
                    raise E_syntax
                op_spec = {
                    'pd': f.pop(0),
                    'type': f.pop(0),
                }
                while f:
                    if len(f) == 1:
                        # Always needs <key> <value>
                        raise E_syntax
                    a, b = f[:2]
                    f = f[2:]
                    if a == 'class':
                        op_spec['class'] = b
                    elif a == 'match':
                        op_spec['match'] = b
                    else:
                        raise E_syntax
                tclist[-1]['output'].append(op_spec)
            else:
                raise E_badline
        except E_badline:
            ERR("Invalid syntax in %s: line '%s'" % (path, line))
            return []
        except E_syntax:
            ERR("Unable to parse %s: unknown line '%s'" % (path, line))
            return []

    # If a specific testcase was requested, keep only that one.
    if tc is not None:
        target_tc = None
        for t in tclist:
            if t['name'] == tc:
                target_tc = t
                break
        if target_tc is None:
            # The requested testcase is not in this file; there is nothing
            # to filter any further.
            tclist = []
        else:
            # ...and a specific output type
            if op_type is not None:
                target_oplist = []
                for op in target_tc['output']:
                    if op['type'] != op_type:
                        continue
                    # ...and a specific output class
                    if op_class is None or op.get('class') == op_class:
                        target_oplist.append(op)
                        DBG("match on [%s]" % str(op))
                target_tc['output'] = target_oplist
            tclist = [target_tc]
    for t in tclist:
        error = check_tclist(t)
        if error:
            ERR("Error in %s: %s" % (path, error))
            return []

    return tclist
180
181
def get_tests(testnames):
    """Collect the testcases selected by a list of test specifications.

    Each entry of testnames has the form
    <pd>[/<testcase>[/<output type>[/<output class>]]], for example "i2c"
    or "i2c/rtc". Leading and trailing slashes are ignored, as are any
    components beyond the fourth.

    Returns a list holding one parse_testfile() result per decoder that
    has a test/test.conf file; decoders without one are skipped.

    Raises Exception when a named decoder directory does not exist below
    decoders_dir.
    """
    collected = []
    for testspec in testnames:
        parts = testspec.strip("/").split("/")
        pd = parts[0]
        # Pad the optional components with None so they can be unpacked.
        tc, op_type, op_class = (parts[1:4] + [None] * 3)[:3]

        pd_dir = os.path.join(decoders_dir, pd)
        if not os.path.isdir(pd_dir):
            # User specified non-existent PD
            raise Exception("%s not found." % pd_dir)

        conf = os.path.join(decoders_dir, pd, "test/test.conf")
        if os.path.exists(conf):
            collected.append(parse_testfile(conf, pd, tc, op_type, op_class))
        # else: PD doesn't have any tests yet

    return collected
206
207
def diff_textfiles(f1, f2):
    """Compare two text files line by line.

    Arguments:
      f1, f2  Paths of the files to compare.

    Returns a list with one entry per differing line: lines found only in
    f1 are prefixed with '- ', lines found only in f2 with '+ '.
    Surrounding whitespace (including the newline) is stripped from each
    entry. The list is empty when the files have identical lines.
    """
    # Context managers make sure both files are closed again right away.
    with open(f1) as fh:
        t1 = fh.readlines()
    with open(f2) as fh:
        t2 = fh.readlines()

    # Differ also emits unchanged ('  ') and hint ('? ') lines; only real
    # removals and additions are of interest here.
    return [line.strip() for line in Differ().compare(t1, t2)
            if line[:2] in ('- ', '+ ')]
218
219
def compare_binfiles(f1, f2):
    """Check whether two files have byte-for-byte identical contents.

    Arguments:
      f1, f2  Paths of the files to compare.

    Returns None when the contents are identical, otherwise a one-element
    list holding a mismatch message (the same shape diff_textfiles()
    uses for its result).
    """
    with open(f1, 'rb') as fh:
        data1 = fh.read()
    with open(f2, 'rb') as fh:
        data2 = fh.read()

    # Both files are fully in memory anyway, so compare the bytes directly
    # rather than going through a digest.
    if data1 == data2:
        return None

    return ["Binary output does not match."]
231
232
def run_tests(tests, fix=False):
    """Run every output check of every testcase through the runtc helper.

    For each output of each testcase, runtc (expected in tests_dir) is
    started with the testcase's decoders, probes, options and input file,
    writing its output to a temporary file. That file is then compared
    with the expected 'match' file: as text for 'annotation' output, as
    binary data for 'binary' output.

    Arguments:
      tests  List of testcase lists, as returned by get_tests().
      fix    When True, a differing or uncomparable match file is
             overwritten with the freshly generated output instead of
             being reported as a mismatch.

    Returns a (results, errors) tuple. results holds one dict per output
    check with the key 'testcase' plus, when applicable, 'statistics'
    (runtc's stdout), 'error' (runtc's stderr, a non-zero exit code, or an
    exception message) and 'diff' (the list of differences). errors is the
    number of results that carry an 'error' key.

    A report is generated for each result through gen_report().
    """
    errors = 0
    results = []
    cmd = os.path.join(tests_dir, 'runtc')
    for tclist in tests:
        for tc in tclist:
            args = [cmd]
            if DEBUG > 1:
                args.append('-d')
            for pd in tc['pdlist']:
                args.extend(['-P', pd['name']])
                for label, probe in pd['probes']:
                    args.extend(['-p', "%s=%d" % (label, probe)])
                for option, value in pd['options']:
                    args.extend(['-o', "%s=%s" % (option, value)])
            args.extend(['-i', os.path.join(dumps_dir, tc['input'])])
            for op in tc['output']:
                name = "%s/%s/%s" % (tc['pd'], tc['name'], op['type'])
                opargs = ['-O', "%s:%s" % (op['pd'], op['type'])]
                if 'class' in op:
                    opargs[-1] += ":%s" % op['class']
                    name += "/%s" % op['class']
                if VERBOSE:
                    dots = '.' * (60 - len(name) - 2)
                    INFO("%s %s " % (name, dots), end='')
                result = {
                    'testcase': name,
                }
                results.append(result)
                # Stays None if mkstemp() itself fails, so that the cleanup
                # below never touches a missing or stale file name.
                outfile = None
                try:
                    fd, outfile = mkstemp()
                    os.close(fd)
                    opargs.extend(['-f', outfile])
                    DBG("Running %s" % (' '.join(args + opargs)))
                    p = Popen(args + opargs, stdout=PIPE, stderr=PIPE)
                    stdout, stderr = p.communicate()
                    if stdout:
                        result['statistics'] = stdout.decode('utf-8').strip()
                    if stderr:
                        result['error'] = stderr.decode('utf-8').strip()
                    elif p.returncode != 0:
                        # runtc indicated an error, but didn't output a
                        # message on stderr about it
                        result['error'] = "Unknown error: runtc %d" % p.returncode
                    if 'error' not in result:
                        match = os.path.join(decoders_dir, op['pd'], 'test', op['match'])
                        try:
                            diff = diff_error = None
                            if op['type'] == 'annotation':
                                diff = diff_textfiles(match, outfile)
                            elif op['type'] == 'binary':
                                diff = compare_binfiles(match, outfile)
                            else:
                                diff = ["Unsupported output type '%s'." % op['type']]
                        except Exception as e:
                            # Deferred: in fix mode a failed comparison
                            # (e.g. no match file yet) is repaired instead.
                            diff_error = e
                        if fix:
                            if diff or diff_error:
                                copy(outfile, match)
                                DBG("Wrote %s" % match)
                        else:
                            if diff:
                                result['diff'] = diff
                            elif diff_error is not None:
                                raise diff_error
                except Exception as e:
                    result['error'] = str(e)
                finally:
                    if outfile is not None:
                        os.unlink(outfile)
                # Count every kind of error exactly once: stderr output,
                # a non-zero runtc exit code, or an exception.
                if 'error' in result:
                    errors += 1
                if VERBOSE:
                    if 'diff' in result:
                        INFO("Output mismatch")
                    elif 'error' in result:
                        error = result['error']
                        if len(error) > 20:
                            error = error[:17] + '...'
                        INFO(error)
                    else:
                        INFO("OK")
                gen_report(result)

    return results, errors
315
316
def gen_report(result):
    """Emit a report for a single test result, if there is anything to say.

    result is one of the dicts built by run_tests(). A report is produced
    only when it has at least one of the keys 'error', 'diff' or
    'statistics'; otherwise nothing happens.

    When report_dir is set, the report is written to a file in that
    directory, named after the testcase with '/' replaced by '_'.
    Otherwise it is printed to stdout.
    """
    out = []
    if 'error' in result:
        out.append("Error:")
        out.append(result['error'])
        out.append('')
    if 'diff' in result:
        out.append("Test output mismatch:")
        out.extend(result['diff'])
        out.append('')
    if 'statistics' in result:
        out.extend(["Statistics:", result['statistics']])
        out.append('')

    if not out:
        return

    text = "Testcase: %s\n" % result['testcase']
    text += '\n'.join(out)

    if report_dir:
        filename = result['testcase'].replace('/', '_')
        # Close the file explicitly so the report is fully on disk as soon
        # as this function returns.
        with open(os.path.join(report_dir, filename), 'w') as report:
            report.write(text)
    else:
        print(text)
342
343
def show_tests(tests):
    """Print a human-readable description of the given testcases to stdout.

    tests is a list of testcase lists, as returned by get_tests(). For
    every testcase the decoders (with their probes and options), the
    optional stack, the input file and all outputs are listed. A blank
    line is printed after each testcase list.
    """
    for tclist in tests:
        for tc in tclist:
            print("Testcase: %s/%s" % (tc['pd'], tc['name']))
            for pd in tc['pdlist']:
                print("  Protocol decoder: %s" % pd['name'])
                for label, probe in pd['probes']:
                    print("    Probe %s=%d" % (label, probe))
                for option, value in pd['options']:
                    # Option values are kept as strings by the parser, so
                    # they must be formatted with %s, not %d.
                    print("    Option %s=%s" % (option, value))
            if 'stack' in tc:
                print("  Stack: %s" % ' '.join(tc['stack']))
            print("  Input: %s" % tc['input'])
            for op in tc['output']:
                print("  Output:\n    Protocol decoder: %s" % op['pd'])
                print("    Type: %s" % op['type'])
                if 'class' in op:
                    print("    Class: %s" % op['class'])
                print("    Match: %s" % op['match'])
        print()
364
365
def list_tests(tests):
    """Print one line per output check, as <pd>/<testcase>/<type>[/<class>].

    tests is a list of testcase lists, as returned by get_tests().
    """
    for tclist in tests:
        for tc in tclist:
            for op in tc['output']:
                parts = [tc['pd'], tc['name'], op['type']]
                # The class component is optional.
                if 'class' in op:
                    parts.append(op['class'])
                print('/'.join(map(str, parts)))
374
375
#
# main
#

# Directory layout, all derived from the location of this script:
#   tests_dir     directory containing this script (and the runtc helper)
#   base_dir      project root, the parent of tests_dir
#   dumps_dir     'sigrok-dumps' next to the project root (override with -S)
#   decoders_dir  'decoders' inside the project root
tests_dir = os.path.abspath(os.path.dirname(sys.argv[0]))
base_dir = os.path.abspath(os.path.join(os.curdir, tests_dir, os.path.pardir))
dumps_dir = os.path.abspath(os.path.join(base_dir, os.path.pardir, 'sigrok-dumps'))
decoders_dir = os.path.abspath(os.path.join(base_dir, 'decoders'))

if len(sys.argv) == 1:
    usage()

opt_all = opt_run = opt_show = opt_list = opt_fix = False
report_dir = None
# Imported here because the file-level import only brings in getopt() itself.
from getopt import GetoptError
try:
    # Both -R and -S take a directory argument, hence the trailing colons.
    opts, args = getopt(sys.argv[1:], "dvarslfR:S:")
except GetoptError as e:
    # Unknown option or missing argument: explain instead of a traceback.
    usage(str(e))
for opt, arg in opts:
    if opt == '-d':
        DEBUG += 1
    elif opt == '-v':
        VERBOSE = True
    elif opt == '-a':
        opt_all = True
    elif opt == '-r':
        opt_run = True
    elif opt == '-s':
        opt_show = True
    elif opt == '-l':
        opt_list = True
    elif opt == '-f':
        opt_fix = True
    elif opt == '-R':
        report_dir = arg
    elif opt == '-S':
        dumps_dir = arg

if opt_run and opt_show:
    usage("Use either -s or -r, not both.")
if args and opt_all:
    usage("Specify either -a or tests, not both.")
if report_dir is not None and not os.path.isdir(report_dir):
    usage("%s is not a directory" % report_dir)

ret = 0
try:
    if args:
        testlist = get_tests(args)
    elif opt_all:
        # Only directories can be decoders; a stray file in the decoders
        # directory would otherwise abort the whole run with "not found".
        # Sorting makes the order of the tests reproducible.
        all_pds = [d for d in sorted(os.listdir(decoders_dir))
                   if os.path.isdir(os.path.join(decoders_dir, d))]
        testlist = get_tests(all_pds)
    else:
        usage("Specify either -a or tests.")

    if opt_run:
        if not os.path.isdir(dumps_dir):
            ERR("Could not find sigrok-dumps repository at %s" % dumps_dir)
            sys.exit(1)
        results, errors = run_tests(testlist)
        # The exit status reflects errors as well as output mismatches,
        # and is capped so that it cannot wrap around to 0 (success).
        failed = sum(1 for r in results if 'error' in r or 'diff' in r)
        ret = min(max(errors, failed), 255)
    elif opt_show:
        show_tests(testlist)
    elif opt_list:
        list_tests(testlist)
    elif opt_fix:
        run_tests(testlist, fix=True)
    else:
        usage()
except Exception as e:
    print("Error: %s" % str(e))
    if DEBUG:
        raise
    # Make the failure visible to the caller of the script as well.
    ret = 1

sys.exit(ret)
448