subunit: Initial work on using the standard TestResult class.
[kamenim/samba.git] / selftest / subunithelper.py
1 # Python module for parsing and generating the Subunit protocol
2 # (Samba-specific)
3 # Copyright (C) 2008-2009 Jelmer Vernooij <jelmer@samba.org>
4 #
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
9
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14
15 # You should have received a copy of the GNU General Public License
16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18 __all__ = ['parse_results']
19
20 import re
21 import sys
22 import subunit
23 import testtools
24 import time
25
VALID_RESULTS = ['success', 'successful', 'failure', 'fail', 'skip', 'knownfail', 'error', 'xfail', 'skip-testsuite', 'testsuite-failure', 'testsuite-xfail', 'testsuite-success', 'testsuite-error']


def _parse_time_arg(arg):
    """Convert the argument of a ``time:`` line into a timestamp.

    Accepts ``YYYY-MM-DD HH:MM:SS[.fraction]`` with an optional trailing
    ``Z`` and optional trailing whitespace/newline.  The fields are handed
    to ``time.mktime`` unchanged (fractional seconds are truncated), so the
    value is interpreted by ``mktime`` regardless of the ``Z`` suffix.

    :param arg: Text following the ``time:`` command.
    :return: The ``time.mktime`` result, or None if *arg* does not match.
    """
    grp = re.match(r'(\d+)-(\d+)-(\d+) (\d+):(\d+):([.0-9]+)Z?\s*$', arg)
    if grp is None:
        return None
    fields = [int(grp.group(i)) for i in range(1, 6)]
    fields.append(int(float(grp.group(6))))
    return time.mktime(tuple(fields) + (0, 0, 0))


def _read_reason(msg_ops, fh):
    """Read the lines of a bracketed reason up to the closing ``]`` line.

    Every line read (including the terminator) is passed to
    ``msg_ops.control_msg``.

    :param msg_ops: Object receiving the ``control_msg`` callbacks.
    :param fh: Object with a ``readline()`` method returning "" at EOF.
    :return: The concatenated reason lines, or None if the input ended
        before a terminating ``]`` line was seen.
    """
    lines = []
    while True:
        l = fh.readline()
        if l == "":
            return None
        msg_ops.control_msg(l)
        # A bare "]" can only be the very last line of input (no newline).
        if l in ("]\n", "]"):
            return "".join(lines)
        lines.append(l)


def _finish_started_test(msg_ops, statistics, open_tests, testname,
                         counter, result, unexpected, reason):
    """Close a test that must have been announced with ``test:`` before.

    If *testname* is not currently open, an error is counted and reported
    instead of the given result.

    :param counter: Key in *statistics* to increment on success.
    """
    try:
        open_tests.remove(testname)
    except ValueError:
        statistics['TESTS_ERROR'] += 1
        msg_ops.end_test(testname, "error", True, "Test was never started")
    else:
        statistics[counter] += 1
        msg_ops.end_test(testname, result, unexpected, reason)


def parse_results(msg_ops, statistics, fh):
    """Parse a subunit stream and forward the events to *msg_ops*.

    :param msg_ops: Object receiving callbacks: ``control_msg``,
        ``output_msg``, ``start_test``, ``end_test(name, result,
        unexpected, reason)``, ``report_time``, ``progress``,
        ``start_testsuite``, ``skip_testsuite`` and ``end_testsuite``.
    :param statistics: Mapping whose ``TESTS_ERROR``, ``TESTS_EXPECTED_OK``,
        ``TESTS_EXPECTED_FAIL``, ``TESTS_UNEXPECTED_FAIL`` and
        ``TESTS_SKIP`` entries are incremented in place.
    :param fh: Object with a ``readline()`` method returning "" at EOF.
    :return: 1 if any error or unexpected failure was counted (or a reason
        block was cut short), 0 otherwise.
    """
    open_tests = []

    while fh:
        l = fh.readline()
        if l == "":
            break
        parts = l.split(None, 1)
        # Lines that are not "<command> <argument>" (or that start with
        # whitespace) are plain test output.
        if len(parts) != 2 or not l.startswith(parts[0]):
            msg_ops.output_msg(l)
            continue
        command = parts[0].rstrip(":")
        arg = parts[1]
        if command in ("test", "testing"):
            msg_ops.control_msg(l)
            testname = arg.rstrip()
            msg_ops.start_test(testname)
            open_tests.append(testname)
        elif command == "time":
            msg_ops.control_msg(l)
            timestamp = _parse_time_arg(arg)
            if timestamp is None:
                print("Unable to parse time line: %s" % arg)
            else:
                msg_ops.report_time(timestamp)
        elif command in VALID_RESULTS:
            msg_ops.control_msg(l)
            result = command
            # The trailing newline is optional so that a final line without
            # one is still parsed; this pattern matches any single line.
            grp = re.match(r"(.*?)( \[)?([ \t]*)( multipart)?\n?$", arg)
            (testname, hasreason) = (grp.group(1), grp.group(2))
            reason = None
            if hasreason:
                # reason is specified in the following lines
                reason = _read_reason(msg_ops, fh)
                if reason is None:
                    statistics['TESTS_ERROR'] += 1
                    msg_ops.end_test(testname, "error", True,
                                     "reason (%s) interrupted" % result)
                    return 1
            if result in ("success", "successful"):
                _finish_started_test(msg_ops, statistics, open_tests,
                                     testname, 'TESTS_EXPECTED_OK',
                                     "success", False, reason)
            elif result in ("xfail", "knownfail"):
                _finish_started_test(msg_ops, statistics, open_tests,
                                     testname, 'TESTS_EXPECTED_FAIL',
                                     "xfail", False, reason)
            elif result in ("failure", "fail"):
                _finish_started_test(msg_ops, statistics, open_tests,
                                     testname, 'TESTS_UNEXPECTED_FAIL',
                                     "failure", True, reason)
            elif result == "skip":
                statistics['TESTS_SKIP'] += 1
                # Allow tests to be skipped without prior announcement of
                # test: close it if it is open, otherwise leave the list of
                # open tests untouched.
                try:
                    open_tests.remove(testname)
                except ValueError:
                    pass
                msg_ops.end_test(testname, "skip", False, reason)
            elif result == "error":
                statistics['TESTS_ERROR'] += 1
                try:
                    open_tests.remove(testname)
                except ValueError:
                    pass
                msg_ops.end_test(testname, "error", True, reason)
            elif result == "skip-testsuite":
                msg_ops.skip_testsuite(testname)
            elif result == "testsuite-success":
                msg_ops.end_testsuite(testname, "success", reason)
            elif result == "testsuite-failure":
                msg_ops.end_testsuite(testname, "failure", reason)
            elif result == "testsuite-xfail":
                msg_ops.end_testsuite(testname, "xfail", reason)
            elif result == "testsuite-error":
                msg_ops.end_testsuite(testname, "error", reason)
            else:
                raise AssertionError("Recognized but unhandled result %r" %
                    result)
        elif command == "testsuite":
            msg_ops.start_testsuite(arg.strip())
        elif command == "progress":
            arg = arg.strip()
            if arg == "pop":
                msg_ops.progress(None, subunit.PROGRESS_POP)
            elif arg == "push":
                msg_ops.progress(None, subunit.PROGRESS_PUSH)
            elif arg[0] in '+-':
                msg_ops.progress(int(arg), subunit.PROGRESS_CUR)
            else:
                msg_ops.progress(int(arg), subunit.PROGRESS_SET)
        else:
            msg_ops.output_msg(l)

    # Anything still open at EOF never reported a result.
    while open_tests:
        msg_ops.end_test(open_tests.pop(), "error", True,
                   "was started but never finished!")
        statistics['TESTS_ERROR'] += 1

    if statistics['TESTS_ERROR'] > 0:
        return 1
    if statistics['TESTS_UNEXPECTED_FAIL'] > 0:
        return 1
    return 0
167
168
class SubunitOps(object):
    """Writes subunit protocol lines to standard output."""

    def start_test(self, testname):
        """Announce that *testname* has started."""
        print("test: %s" % testname)

    def end_test(self, name, result, reason=None):
        """Emit ``<result>: <name>``, with a bracketed reason if one is given."""
        if not reason:
            print("%s: %s" % (result, name))
            return
        print("%s: %s [\n%s\n]" % (result, name, reason))

    def skip_test(self, name, reason=None):
        """Report *name* as skipped."""
        self.end_test(name, "skip", reason)

    def fail_test(self, name, reason=None):
        """Report *name* as failed."""
        self.end_test(name, "fail", reason)

    def success_test(self, name, reason=None):
        """Report *name* as successful."""
        self.end_test(name, "success", reason)

    def xfail_test(self, name, reason=None):
        """Report *name* as an expected failure."""
        self.end_test(name, "xfail", reason)

    def report_time(self, t):
        """Emit a ``time:`` line for timestamp *t*, formatted in local time."""
        stamp = time.localtime(t)
        print("time: %04d-%02d-%02d %02d:%02d:%02d" % tuple(stamp[:6]))

    def progress(self, offset, whence):
        """Emit a ``progress:`` line.

        Push/pop are written as the words ``push``/``pop``; a relative
        offset greater than -1 gets an explicit ``+`` sign; anything else
        is written as-is.
        """
        sign = ""
        value = offset
        if whence == subunit.PROGRESS_CUR and offset > -1:
            sign = "+"
        elif whence == subunit.PROGRESS_PUSH:
            value = "push"
        elif whence == subunit.PROGRESS_POP:
            value = "pop"
        print("progress: %s%s" % (sign, value))

    # The following are Samba extensions:
    def start_testsuite(self, name):
        """Announce the start of testsuite *name*."""
        print("testsuite: %s" % name)

    def skip_testsuite(self, name, reason=None):
        """Report testsuite *name* as skipped, optionally with a reason."""
        if not reason:
            print("skip-testsuite: %s" % name)
            return
        print("skip-testsuite: %s [" % name)
        print("%s" % reason)
        print("]")

    def end_testsuite(self, name, result, reason=None):
        """Emit ``testsuite-<result>: <name>``, optionally with a reason."""
        if not reason:
            print("testsuite-%s: %s" % (result, name))
            return
        print("testsuite-%s: %s [\n%s\n]" % (result, name, reason))
228
229
def read_test_regexes(name):
    """Load a regex list file into a dictionary.

    Blank lines and lines starting with ``#`` are ignored.  On any other
    line, text after the first ``#`` is taken as the reason for the regex
    before it; lines without ``#`` map the whole line to None.

    :param name: Path of the file to read.
    :return: Dictionary mapping regex string to reason string (or None).
    """
    patterns = {}
    with open(name, 'r') as source:
        for raw in source:
            entry = raw.strip()
            if not entry or entry.startswith("#"):
                continue
            regex, marker, reason = entry.partition("#")
            if marker:
                patterns[regex.strip()] = reason.strip()
            else:
                patterns[entry] = None
    return patterns
246
247
def find_in_list(regexes, fullname):
    """Look up the reason attached to the first regex matching *fullname*.

    Each key of *regexes* is tried with ``re.match`` (anchored at the start
    of *fullname*).

    :param regexes: Mapping of regex string to reason string or None; a
        None or empty mapping matches nothing.
    :param fullname: Test name to look up.
    :return: The reason of the first matching regex ("" when that reason
        is None), or None when no regex matches.
    """
    if not regexes:
        return None
    # .items() rather than .iteritems() so this runs on Python 2 and 3.
    for regex, reason in regexes.items():
        if re.match(regex, fullname):
            if reason is None:
                return ""
            return reason
    return None
255
256
class FilterOps(testtools.testresult.TestResult):
    """Filter test events before re-emitting them through SubunitOps.

    Test names get an optional prefix, failures matching the expected
    failure list are turned into ``xfail``, and output of passing tests
    can be dropped.  Per-testsuite and total counters are kept on the
    instance.
    """

    def __init__(self, prefix, expected_failures, strip_ok_output):
        """
        :param prefix: String prepended to every test name, or None.
        :param expected_failures: Mapping of regex to reason, as consumed
            by ``find_in_list``.
        :param strip_ok_output: If true, test output is buffered and only
            printed for tests that did not succeed, xfail or skip.
        """
        # NOTE(review): the base class __init__ is not invoked here (it was
        # not in the original either) -- confirm whether that is intended.
        self._ops = SubunitOps()
        self.output = None
        self.prefix = prefix
        self.expected_failures = expected_failures
        self.strip_ok_output = strip_ok_output
        # Per-testsuite counters; also reset by start_testsuite().  They
        # are initialised here so end_test()/end_testsuite() work even
        # when no testsuite was announced first.
        self.error_added = 0
        self.fail_added = 0
        self.xfail_added = 0
        self.total_xfail = 0
        self.total_error = 0
        self.total_fail = 0

    def control_msg(self, msg):
        pass # We regenerate control messages, so ignore this

    def report_time(self, time):
        """Forward a timestamp unchanged."""
        self._ops.report_time(time)

    def progress(self, delta, whence):
        """Forward a progress event unchanged."""
        self._ops.progress(delta, whence)

    def output_msg(self, msg):
        """Write *msg* to stdout, or buffer it while output is captured."""
        if self.output is None:
            sys.stdout.write(msg)
        else:
            self.output+=msg

    def start_test(self, testname):
        """Announce a test, starting output capture if stripping is on."""
        if self.prefix is not None:
            testname = self.prefix + testname

        if self.strip_ok_output:
           self.output = ""

        self._ops.start_test(testname)

    def end_test(self, testname, result, unexpected, reason):
        """Report the (possibly rewritten) result of a test.

        :param testname: Test name without prefix.
        :param result: Result string as reported by the test.
        :param unexpected: False if the caller already knows a failure was
            expected; such failures become ``xfail``.
        :param reason: Reason text or None.
        """
        if self.prefix is not None:
            testname = self.prefix + testname

        if result in ("fail", "failure") and not unexpected:
            result = "xfail"
            self.xfail_added+=1
            self.total_xfail+=1
        xfail_reason = find_in_list(self.expected_failures, testname)
        if xfail_reason is not None and result in ("fail", "failure"):
            result = "xfail"
            self.xfail_added+=1
            self.total_xfail+=1
            # reason is None when the failure carried no bracketed text.
            reason = (reason or "") + xfail_reason

        if result in ("fail", "failure"):
            self.fail_added+=1
            self.total_fail+=1

        if result == "error":
            self.error_added+=1
            self.total_error+=1

        if self.strip_ok_output:
            # Only show captured output for tests that went wrong; skip
            # the print when nothing was captured (no start_test seen).
            if result not in ("success", "xfail", "skip"):
                if self.output is not None:
                    print(self.output)
        self.output = None

        self._ops.end_test(testname, result, reason)

    def skip_testsuite(self, name, reason=None):
        """Forward a skipped testsuite unchanged."""
        self._ops.skip_testsuite(name, reason)

    def start_testsuite(self, name):
        """Announce a testsuite and reset the per-testsuite counters."""
        self._ops.start_testsuite(name)

        self.error_added = 0
        self.fail_added = 0
        self.xfail_added = 0

    def end_testsuite(self, name, result, reason=None):
        """Report the end of a testsuite, adjusting its result.

        A failed suite in which only expected failures were recorded is
        reported as ``xfail``; recorded failures or errors force the
        result to ``failure``/``error`` and are appended to the reason.
        """
        xfail = False

        if self.xfail_added > 0:
            xfail = True
        if self.fail_added > 0 or self.error_added > 0:
            xfail = False

        if xfail and result in ("fail", "failure"):
            result = "xfail"

        if self.fail_added > 0 and result != "failure":
            result = "failure"
            if reason is None:
                reason = "Subunit/Filter Reason"
            reason += "\n failures[%d]" % self.fail_added

        if self.error_added > 0 and result != "error":
            result = "error"
            if reason is None:
                reason = "Subunit/Filter Reason"
            reason += "\n errors[%d]" % self.error_added

        self._ops.end_testsuite(name, result, reason)