Remove unnecessary python path updates for bundled subunit/testtools.
[obnox/samba/samba-obnox.git] / lib / subunit / python / subunit / test_results.py
1 #
2 #  subunit: extensions to Python unittest to get test results from subprocesses.
3 #  Copyright (C) 2009  Robert Collins <robertc@robertcollins.net>
4 #
5 #  Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
6 #  license at the users choice. A copy of both licenses are available in the
7 #  project source as Apache-2.0 and BSD. You may not use this file except in
8 #  compliance with one of these two licences.
9 #
10 #  Unless required by applicable law or agreed to in writing, software
11 #  distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
12 #  WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
13 #  license you chose for the specific language governing permissions and
14 #  limitations under that license.
15 #
16
17 """TestResult helper classes used by subunit."""
18
19 import csv
20 import datetime
21
22 import testtools
23 from testtools.compat import all
24 from testtools.content import (
25     text_content,
26     TracebackContent,
27     )
28
29 from subunit import iso8601
30
31
32 # NOT a TestResult, because we are implementing the interface, not inheriting
33 # it.
class TestResultDecorator(object):
    """Pass-through wrapper around another TestResult.

    Every event received here is relayed unchanged to ``self.decorated``,
    so subclasses only need to override the events they want to intercept.
    The wrapped result is itself placed inside
    ``testtools.ExtendedToOriginalDecorator`` so that targets which do not
    support newer methods or features are handled by degrading them.
    """

    # XXX: Since lp:testtools r250, this is in testtools. Once it's released,
    # we should gut this and just use that.

    def __init__(self, decorated):
        """Wrap ``decorated`` and remember it as ``self.decorated``.

        :param decorated: The TestResult that events are forwarded to.
        """
        # Wrapping here means every decorator degrades gracefully.
        wrapped = testtools.ExtendedToOriginalDecorator(decorated)
        self.decorated = wrapped

    # -- test run / test lifecycle ------------------------------------

    def startTestRun(self):
        """Forward ``startTestRun`` to the decorated result."""
        return self.decorated.startTestRun()

    def stopTestRun(self):
        """Forward ``stopTestRun`` to the decorated result."""
        return self.decorated.stopTestRun()

    def startTest(self, test):
        """Forward ``startTest`` to the decorated result."""
        return self.decorated.startTest(test)

    def stopTest(self, test):
        """Forward ``stopTest`` to the decorated result."""
        return self.decorated.stopTest(test)

    # -- outcomes -----------------------------------------------------

    def addError(self, test, err=None, details=None):
        """Forward an error outcome (``err`` positional, ``details`` kw)."""
        return self.decorated.addError(test, err, details=details)

    def addFailure(self, test, err=None, details=None):
        """Forward a failure outcome (``err`` positional, ``details`` kw)."""
        return self.decorated.addFailure(test, err, details=details)

    def addSuccess(self, test, details=None):
        """Forward a success outcome."""
        return self.decorated.addSuccess(test, details=details)

    def addSkip(self, test, reason=None, details=None):
        """Forward a skip outcome (``reason`` positional, ``details`` kw)."""
        return self.decorated.addSkip(test, reason, details=details)

    def addExpectedFailure(self, test, err=None, details=None):
        """Forward an expected-failure outcome."""
        return self.decorated.addExpectedFailure(test, err, details=details)

    def addUnexpectedSuccess(self, test, details=None):
        """Forward an unexpected-success outcome."""
        return self.decorated.addUnexpectedSuccess(test, details=details)

    # -- state mirrored from the decorated result ---------------------

    def _get_failfast(self):
        # The decorated object may not define failfast at all; treat that
        # as "off".
        return getattr(self.decorated, 'failfast', False)

    def _set_failfast(self, value):
        self.decorated.failfast = value
    failfast = property(_get_failfast, _set_failfast)

    @property
    def shouldStop(self):
        """The decorated result's ``shouldStop`` attribute (read-only)."""
        return self.decorated.shouldStop

    @property
    def testsRun(self):
        """The decorated result's ``testsRun`` attribute (read-only)."""
        return self.decorated.testsRun

    def wasSuccessful(self):
        """Forward ``wasSuccessful`` to the decorated result."""
        return self.decorated.wasSuccessful()

    def stop(self):
        """Forward ``stop`` to the decorated result."""
        return self.decorated.stop()

    # -- subunit extensions -------------------------------------------

    def progress(self, offset, whence):
        """Forward a progress event."""
        return self.decorated.progress(offset, whence)

    def tags(self, new_tags, gone_tags):
        """Forward a tags event."""
        return self.decorated.tags(new_tags, gone_tags)

    def time(self, a_datetime):
        """Forward a time event."""
        return self.decorated.time(a_datetime)
110
111
class HookedTestResultDecorator(TestResultDecorator):
    """A TestResult which calls a hook on every event.

    Before each event listed below is forwarded, ``self._before_event()``
    is invoked.  This class does not define ``_before_event`` itself;
    subclasses are expected to supply it.  ``tags``, ``testsRun`` and
    ``failfast`` are not overridden here and therefore do not trigger the
    hook.
    """

    def __init__(self, decorated):
        # The parent-class proxy is kept on the instance and reused by
        # every hooked method.
        self.super = super(HookedTestResultDecorator, self)
        self.super.__init__(decorated)

    def _hook_then_super(self):
        """Run the hook, then return the proxy used for the upcall."""
        self._before_event()
        return self.super

    def startTest(self, test):
        return self._hook_then_super().startTest(test)

    def startTestRun(self):
        return self._hook_then_super().startTestRun()

    def stopTest(self, test):
        return self._hook_then_super().stopTest(test)

    def stopTestRun(self):
        return self._hook_then_super().stopTestRun()

    def addError(self, test, err=None, details=None):
        return self._hook_then_super().addError(test, err, details=details)

    def addFailure(self, test, err=None, details=None):
        return self._hook_then_super().addFailure(test, err, details=details)

    def addSuccess(self, test, details=None):
        return self._hook_then_super().addSuccess(test, details=details)

    def addSkip(self, test, reason=None, details=None):
        return self._hook_then_super().addSkip(test, reason, details=details)

    def addExpectedFailure(self, test, err=None, details=None):
        parent = self._hook_then_super()
        return parent.addExpectedFailure(test, err, details=details)

    def addUnexpectedSuccess(self, test, details=None):
        parent = self._hook_then_super()
        return parent.addUnexpectedSuccess(test, details=details)

    def progress(self, offset, whence):
        return self._hook_then_super().progress(offset, whence)

    def wasSuccessful(self):
        return self._hook_then_super().wasSuccessful()

    @property
    def shouldStop(self):
        # Reading the attribute counts as an event: the hook fires first.
        return self._hook_then_super().shouldStop

    def stop(self):
        return self._hook_then_super().stop()

    def time(self, a_datetime):
        return self._hook_then_super().time(a_datetime)
179
180
class AutoTimingTestResultDecorator(HookedTestResultDecorator):
    """Decorate a TestResult to add time events to a test run.

    While no explicit time has been supplied through ``time()``, a time
    event carrying the current UTC time is sent to the decorated result
    before every hooked event.  Once the test run supplies its own time
    data the automatic events stop, to prevent causing confusion.
    """

    def __init__(self, decorated):
        # None means "no explicit time seen yet": keep auto-stamping.
        self._time = None
        super(AutoTimingTestResultDecorator, self).__init__(decorated)

    def _before_event(self):
        """Emit a timestamp unless explicit time data is in effect."""
        if self._time is None:
            utc_now = datetime.datetime.utcnow()
            stamped = utc_now.replace(tzinfo=iso8601.Utc())
            self.decorated.time(stamped)

    def progress(self, offset, whence):
        # Forwarded directly, so no time event precedes a progress event.
        return self.decorated.progress(offset, whence)

    @property
    def shouldStop(self):
        # Read directly, so querying the flag does not emit a time event.
        return self.decorated.shouldStop

    def time(self, a_datetime):
        """Provide a timestamp for the current test activity.

        :param a_datetime: If None, automatically add timestamps before every
            event (this is the default behaviour if time() is not called at
            all).  If not None, pass the provided time onto the decorated
            result object and disable automatic timestamps.
        """
        self._time = a_datetime
        return self.decorated.time(a_datetime)
217
218
class TagsMixin(object):
    """Track tag additions and removals at global and per-test scope.

    Two pairs of sets are kept, each of the form ``(new_tags, gone_tags)``:
    ``_global_tags`` and ``_test_tags``.  ``_test_tags`` is None whenever no
    test is in progress.
    """

    def __init__(self):
        self._clear_tags()

    def _clear_tags(self):
        """Reset to empty global tags and no test scope."""
        self._test_tags = None
        self._global_tags = set(), set()

    def _get_active_tags(self):
        """Return a new set holding the tags currently in effect."""
        global_new, _unused_gone = self._global_tags
        if self._test_tags is None:
            return set(global_new)
        test_new, test_gone = self._test_tags
        # Test-scope removals mask global tags; test-scope additions are
        # layered on top.
        remaining = global_new.difference(test_gone)
        return remaining.union(test_new)

    def _get_current_scope(self):
        """Return the test-scope pair if there is one, else the global pair."""
        return self._test_tags or self._global_tags

    def _flush_current_scope(self, tag_receiver):
        """Send pending tags of the current scope on, then empty the scope.

        :param tag_receiver: Object whose ``tags(new_tags, gone_tags)`` is
            called, but only if at least one of the two sets is non-empty.
        """
        pending_new, pending_gone = self._get_current_scope()
        if pending_new or pending_gone:
            tag_receiver.tags(pending_new, pending_gone)
        # Fresh sets are installed rather than clearing in place, so the
        # sets just handed to the receiver are left untouched.
        emptied = set(), set()
        if self._test_tags:
            self._test_tags = emptied
        else:
            self._global_tags = emptied

    def startTestRun(self):
        self._clear_tags()

    def startTest(self, test):
        # Open an empty per-test scope.
        self._test_tags = set(), set()

    def stopTest(self, test):
        # Drop the per-test scope.
        self._test_tags = None

    def tags(self, new_tags, gone_tags):
        """Handle tag instructions.

        Adds and removes tags as appropriate. If a test is currently running,
        tags are not affected for subsequent tests.

        :param new_tags: Tags to add,
        :param gone_tags: Tags to remove.
        """
        scope_new, scope_gone = self._get_current_scope()
        # Order matters: a tag named in both arguments ends up in neither
        # "new" (removed by the second step) nor "gone" (removed by the
        # fourth).
        scope_new.update(new_tags)
        scope_new.difference_update(gone_tags)
        scope_gone.update(gone_tags)
        scope_gone.difference_update(new_tags)
272
273
class TagCollapsingDecorator(HookedTestResultDecorator, TagsMixin):
    """Collapses many 'tags' calls into one where possible.

    Incoming ``tags`` calls are only accumulated (via ``TagsMixin``); the
    accumulated tags are flushed to the decorated result just before the
    next hooked event is forwarded.
    """

    # NOTE(review): HookedTestResultDecorator precedes TagsMixin in the
    # bases, and its startTest/stopTest/startTestRun do not chain on to
    # TagsMixin, so the mixin's per-test scope handling does not appear to
    # be reached through this class -- confirm whether that is intended.

    def __init__(self, result):
        super(TagCollapsingDecorator, self).__init__(result)
        # The decorator chain above does not run TagsMixin.__init__, so the
        # tag state is initialised explicitly.
        self._clear_tags()

    def _before_event(self):
        """Hook: push any accumulated tags to the decorated result."""
        receiver = self.decorated
        self._flush_current_scope(receiver)

    def tags(self, new_tags, gone_tags):
        """Accumulate the tag change instead of forwarding it right away."""
        TagsMixin.tags(self, new_tags, gone_tags)
286
287
class TimeCollapsingDecorator(HookedTestResultDecorator):
    """Only pass on the first and last of a consecutive sequence of times.

    The first time of a run is forwarded immediately.  Later times in the
    same run are only remembered; the most recent one is forwarded just
    before the next non-time event, unless it equals the time already sent.
    """

    def __init__(self, decorated):
        super(TimeCollapsingDecorator, self).__init__(decorated)
        self._last_sent_time = None
        self._last_received_time = None

    def _before_event(self):
        """Hook: flush the held-back time, if any, and end the run."""
        held = self._last_received_time
        if held is None:
            # No time event since the last non-time event.
            return
        if held != self._last_sent_time:
            self.decorated.time(held)
            self._last_sent_time = held
        self._last_received_time = None

    def time(self, a_time):
        # Don't upcall, because we don't want to call _before_event, it's only
        # for non-time events.
        starts_new_run = self._last_received_time is None
        if starts_new_run:
            self.decorated.time(a_time)
            self._last_sent_time = a_time
        self._last_received_time = a_time
311
312
def and_predicates(predicates):
    """Return a predicate that is true iff all predicates are true.

    The returned callable passes its positional and keyword arguments to
    each predicate in turn and stops at the first one that returns a false
    value.  With no predicates at all the result is True.
    """
    # XXX: Should probably be in testtools to be better used by matchers. jml
    def conjunction(*args, **kwargs):
        for predicate in predicates:
            if not predicate(*args, **kwargs):
                return False
        return True
    return conjunction
317
318
def make_tag_filter(with_tags, without_tags):
    """Make a callback that checks tests against tags.

    :param with_tags: Iterable of tags that must all be present, or a false
        value (None, empty) for no such requirement.
    :param without_tags: Iterable of tags none of which may be present, or
        a false value for no such restriction.
    :return: A callable ``check_tags(test, outcome, err, details, tags)``
        returning True if ``tags`` satisfies both constraints.  ``tags`` may
        be a set or any other re-iterable collection of tags (list, tuple,
        frozenset, ...).  Only ``tags`` is examined; the other arguments
        are accepted so the callable can be used as a filter predicate.
    """
    required = set(with_tags) if with_tags else None
    excluded = set(without_tags) if without_tags else None

    def check_tags(test, outcome, err, details, tags):
        # issubset/isdisjoint accept any iterable, whereas the <= and &
        # operators raise TypeError unless ``tags`` is itself a set.
        if required and not required.issubset(tags):
            return False
        if excluded and not excluded.isdisjoint(tags):
            return False
        return True

    return check_tags
333
334
class _PredicateFilter(TestResultDecorator, TagsMixin):
    """Forward a test's events only if its outcome passes a predicate.

    Events for the test in progress are buffered.  At ``stopTest`` the
    buffer is replayed to the decorated result unless an outcome was
    rejected by the predicate, in which case the buffer is discarded.
    """

    def __init__(self, result, predicate):
        """Create a filter forwarding accepted tests to ``result``.

        :param result: The TestResult that receives accepted tests.
        :param predicate: Callable taking (test, outcome, err, details,
            tags); a true return value accepts the outcome.
        """
        super(_PredicateFilter, self).__init__(result)
        self._clear_tags()
        # Replayed events go through tag collapsing and then time
        # collapsing on their way to the real result.
        tag_collapser = TagCollapsingDecorator(self.decorated)
        self.decorated = TimeCollapsingDecorator(tag_collapser)
        self._predicate = predicate
        # The current test (for filtering tags)
        self._current_test = None
        # Has the current test been filtered (for outputting test tags)
        self._current_test_filtered = None
        # Calls to this result that we don't know whether to forward on yet.
        self._buffered_calls = []

    def filter_predicate(self, test, outcome, error, details):
        """Evaluate the predicate with the currently active tags."""
        active_tags = self._get_active_tags()
        return self._predicate(test, outcome, error, details, active_tags)

    def _buffer_or_filter(self, accepted, method, args, details):
        """Buffer the outcome call if accepted, else mark the test filtered."""
        if accepted:
            self._buffered_calls.append((method, args, {'details': details}))
        else:
            self._filtered()

    def addError(self, test, err=None, details=None):
        accepted = self.filter_predicate(test, 'error', err, details)
        self._buffer_or_filter(accepted, 'addError', [test, err], details)

    def addFailure(self, test, err=None, details=None):
        accepted = self.filter_predicate(test, 'failure', err, details)
        self._buffer_or_filter(accepted, 'addFailure', [test, err], details)

    def addSkip(self, test, reason=None, details=None):
        accepted = self.filter_predicate(test, 'skip', reason, details)
        self._buffer_or_filter(accepted, 'addSkip', [test, reason], details)

    def addExpectedFailure(self, test, err=None, details=None):
        accepted = self.filter_predicate(
            test, 'expectedfailure', err, details)
        self._buffer_or_filter(
            accepted, 'addExpectedFailure', [test, err], details)

    def addUnexpectedSuccess(self, test, details=None):
        # Buffered without consulting the predicate.
        call = ('addUnexpectedSuccess', [test], {'details': details})
        self._buffered_calls.append(call)

    def addSuccess(self, test, details=None):
        accepted = self.filter_predicate(test, 'success', None, details)
        self._buffer_or_filter(accepted, 'addSuccess', [test], details)

    def _filtered(self):
        """Record that the current test's outcome was rejected."""
        self._current_test_filtered = True

    def startTest(self, test):
        """Start a test.

        Not directly passed to the client, but used for handling of tags
        correctly.
        """
        TagsMixin.startTest(self, test)
        self._current_test_filtered = False
        self._current_test = test
        self._buffered_calls.append(('startTest', [test], {}))

    def stopTest(self, test):
        """Stop a test.

        Not directly passed to the client, but used for handling of tags
        correctly.
        """
        if not self._current_test_filtered:
            # Accepted: replay everything buffered for this test, in order,
            # then close the test on the decorated result.
            target = self.decorated
            for name, positional, keywords in self._buffered_calls:
                getattr(target, name)(*positional, **keywords)
            target.stopTest(test)
        self._buffered_calls = []
        self._current_test_filtered = None
        self._current_test = None
        TagsMixin.stopTest(self, test)

    def tags(self, new_tags, gone_tags):
        """Track the tag change; buffer it in a test, else forward it."""
        TagsMixin.tags(self, new_tags, gone_tags)
        if self._current_test is None:
            return super(_PredicateFilter, self).tags(new_tags, gone_tags)
        self._buffered_calls.append(('tags', [new_tags, gone_tags], {}))

    def time(self, a_time):
        # Time events are never buffered.
        return self.decorated.time(a_time)

    def id_to_orig_id(self, id):
        """Return ``id`` without a leading "subunit.RemotedTestCase."."""
        prefix = "subunit.RemotedTestCase."
        if id.startswith(prefix):
            return id[len(prefix):]
        return id
436
437
class TestResultFilter(TestResultDecorator):
    """A pyunit TestResult interface implementation which filters tests.

    Tests that pass the filter are handed on to another TestResult instance
    for further processing/reporting. To obtain the filtered results,
    the other instance must be interrogated.

    :ivar result: The result that tests are passed to after filtering.
    :ivar filter_predicate: The callback run to decide whether to pass
        a result.
    """

    def __init__(self, result, filter_error=False, filter_failure=False,
        filter_success=True, filter_skip=False, filter_xfail=False,
        filter_predicate=None, fixup_expected_failures=None):
        """Create a FilterResult object filtering to result.

        :param filter_error: Filter out errors.
        :param filter_failure: Filter out failures.
        :param filter_success: Filter out successful tests.
        :param filter_skip: Filter out skipped tests.
        :param filter_xfail: Filter out expected failure tests.
        :param filter_predicate: A callable taking (test, outcome, err,
            details, tags) and returning True if the result should be passed
            through.  err and details may be none if no error or extra
            metadata is available. outcome is the name of the outcome such
            as 'success' or 'failure'. tags is new in 0.0.8; 0.0.7 filters
            are still supported but should be updated to accept the tags
            parameter for efficiency.
        :param fixup_expected_failures: Set of test ids to consider known
            failing.
        """
        def reject(unwanted):
            # Predicate that accepts every outcome except ``unwanted``.
            return lambda t, outcome, e, d, tags: outcome != unwanted

        switches = (
            (filter_error, 'error'),
            (filter_failure, 'failure'),
            (filter_success, 'success'),
            (filter_skip, 'skip'),
            (filter_xfail, 'expectedfailure'),
        )
        predicates = [reject(name) for enabled, name in switches if enabled]

        if filter_predicate is not None:
            def compat(test, outcome, error, details, tags):
                # 0.0.7 and earlier did not support the 'tags' parameter.
                try:
                    return filter_predicate(
                        test, outcome, error, details, tags)
                except TypeError:
                    return filter_predicate(test, outcome, error, details)
            predicates.append(compat)

        combined = and_predicates(predicates)
        super(TestResultFilter, self).__init__(
            _PredicateFilter(result, combined))

        if fixup_expected_failures is None:
            fixup_expected_failures = frozenset()
        self._fixup_expected_failures = fixup_expected_failures

    def addError(self, test, err=None, details=None):
        """Report an error, or an expected failure for known-failing ids."""
        if self._failure_expected(test):
            self.addExpectedFailure(test, err=err, details=details)
            return
        super(TestResultFilter, self).addError(
            test, err=err, details=details)

    def addFailure(self, test, err=None, details=None):
        """Report a failure, or an expected failure for known-failing ids."""
        if self._failure_expected(test):
            self.addExpectedFailure(test, err=err, details=details)
            return
        super(TestResultFilter, self).addFailure(
            test, err=err, details=details)

    def addSuccess(self, test, details=None):
        """Report a success, or an unexpected one for known-failing ids."""
        if self._failure_expected(test):
            self.addUnexpectedSuccess(test, details=details)
            return
        super(TestResultFilter, self).addSuccess(test, details=details)

    def _failure_expected(self, test):
        """Return True if ``test.id()`` is among the known-failing ids."""
        test_id = test.id()
        return test_id in self._fixup_expected_failures
525
526
class TestIdPrintingResult(testtools.TestResult):
    """Write the id of every finished test to a stream, one per line.

    When ``show_times`` is true and time events were received, each id is
    followed by the test's duration in seconds.  Errors, failures and
    unexpected successes are counted in ``failed_tests``.
    """

    def __init__(self, stream, show_times=False):
        """Create a TestIdPrintingResult outputting to stream.

        :param stream: Object with a ``write`` method that receives the
            output lines.
        :param show_times: If true, append each test's duration to its id.
        """
        super(TestIdPrintingResult, self).__init__()
        self._stream = stream
        self.failed_tests = 0
        self.__time = None
        self.show_times = show_times
        self._test = None
        # Not read anywhere in this class; kept for compatibility.
        self._test_duration = 0
        # Defined up front so stopTest never hits a missing attribute.
        self._start_time = None

    # The outcome methods all accept the optional ``details`` keyword so
    # that callers using the extended API can call any of them the same
    # way; ``err``/``reason``/``details`` themselves are not used.

    def addError(self, test, err=None, details=None):
        self.failed_tests += 1
        self._test = test

    def addFailure(self, test, err=None, details=None):
        self.failed_tests += 1
        self._test = test

    def addSuccess(self, test, details=None):
        self._test = test

    def addSkip(self, test, reason=None, details=None):
        self._test = test

    def addUnexpectedSuccess(self, test, details=None):
        self.failed_tests += 1
        self._test = test

    def addExpectedFailure(self, test, err=None, details=None):
        self._test = test

    def reportTest(self, test, duration):
        """Write one line for ``test``.

        :param test: The test whose ``id()`` is written.
        :param duration: A ``timedelta``, or None if no timing information
            is available, in which case only the id is written.
        """
        if self.show_times and duration is not None:
            seconds = duration.seconds
            seconds += duration.days * 3600 * 24
            seconds += duration.microseconds / 1000000.0
            self._stream.write(test.id() + ' %0.3f\n' % seconds)
        else:
            self._stream.write(test.id() + '\n')

    def startTest(self, test):
        self._start_time = self._time()
        # Forget the previous test so a test that records no outcome is not
        # reported under the previous test's id.
        self._test = None

    def stopTest(self, test):
        started = self._start_time
        stopped = self._time()
        # Without time events both values are None and cannot be
        # subtracted; report the test without a duration instead.
        if started is None or stopped is None:
            test_duration = None
        else:
            test_duration = stopped - started
        reported = self._test
        if reported is None:
            # No outcome was recorded for this test; fall back to the test
            # being stopped.
            reported = test
        self.reportTest(reported, test_duration)

    def time(self, time):
        """Remember the most recent time event."""
        self.__time = time

    def _time(self):
        """Return the most recent time event, or None if there was none."""
        return self.__time

    def wasSuccessful(self):
        "Tells whether or not this result was a success"
        return self.failed_tests == 0
585
586
class TestByTestResult(testtools.TestResult):
    """Call something every time a test completes."""

    # XXX: In testtools since lp:testtools r249.  Once that's released, just
    # import that.

    def __init__(self, on_test):
        """Construct a ``TestByTestResult``.

        :param on_test: A callable that take a test case, a status (one of
            "success", "failure", "error", "skip", or "xfail"), a start time
            (a ``datetime`` with timezone), a stop time, an iterable of tags,
            and a details dict. Is called at the end of each test (i.e. on
            ``stopTest``) with the accumulated values for that test.
        """
        super(TestByTestResult, self).__init__()
        self._on_test = on_test

    def startTest(self, test):
        """Record the start time and clear the per-test state."""
        super(TestByTestResult, self).startTest(test)
        self._start_time = self._now()
        # There's no supported (i.e. tested) behaviour that relies on these
        # being set, but it makes me more comfortable all the same. -- jml
        self._stop_time = None
        self._details = None
        self._status = None

    def stopTest(self, test):
        """Record the stop time and invoke the ``on_test`` callback."""
        self._stop_time = self._now()
        super(TestByTestResult, self).stopTest(test)
        # current_tags is new in testtools 0.9.13.
        current_tags = getattr(self, 'current_tags', None)
        self._on_test(
            test=test,
            status=self._status,
            start_time=self._start_time,
            stop_time=self._stop_time,
            tags=current_tags,
            details=self._details)

    def _err_to_details(self, test, err, details):
        """Return ``details`` if non-empty, else a dict wrapping ``err``."""
        return details or {'traceback': TracebackContent(err, test)}

    def _remember(self, status, details):
        """Store the outcome that ``stopTest`` will report."""
        self._status = status
        self._details = details

    def addSuccess(self, test, details=None):
        super(TestByTestResult, self).addSuccess(test)
        self._remember('success', details)

    def addFailure(self, test, err=None, details=None):
        super(TestByTestResult, self).addFailure(test, err, details)
        self._status = 'failure'
        self._details = self._err_to_details(test, err, details)

    def addError(self, test, err=None, details=None):
        super(TestByTestResult, self).addError(test, err, details)
        self._status = 'error'
        self._details = self._err_to_details(test, err, details)

    def addSkip(self, test, reason=None, details=None):
        super(TestByTestResult, self).addSkip(test, reason, details)
        self._status = 'skip'
        if details is None:
            details = {'reason': text_content(reason)}
        elif reason:
            # XXX: What if details already has 'reason' key?
            details['reason'] = text_content(reason)
        self._details = details

    def addExpectedFailure(self, test, err=None, details=None):
        super(TestByTestResult, self).addExpectedFailure(test, err, details)
        self._status = 'xfail'
        self._details = self._err_to_details(test, err, details)

    def addUnexpectedSuccess(self, test, details=None):
        super(TestByTestResult, self).addUnexpectedSuccess(test, details)
        # Reported to the callback with the same status as a plain success.
        self._remember('success', details)
665
666
class CsvResult(TestByTestResult):
    """Write one CSV row per completed test to a stream.

    A header row is written at ``startTestRun``; each test then contributes
    a row holding its id, status, start time and stop time.
    """

    def __init__(self, stream):
        """Create a CsvResult writing rows to ``stream``."""
        super(CsvResult, self).__init__(self._on_test)
        writer = csv.writer(stream)
        self._write_row = writer.writerow

    def _on_test(self, test, status, start_time, stop_time, tags, details):
        # tags and details are accepted but not written out.
        row = [test.id(), status, start_time, stop_time]
        self._write_row(row)

    def startTestRun(self):
        super(CsvResult, self).startTestRun()
        header = ['test', 'status', 'start_time', 'stop_time']
        self._write_row(header)