2 # subunit: extensions to Python unittest to get test results from subprocesses.
3 # Copyright (C) 2009 Robert Collins <robertc@robertcollins.net>
5 # Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
6 # license at the users choice. A copy of both licenses are available in the
7 # project source as Apache-2.0 and BSD. You may not use this file except in
8 # compliance with one of these two licences.
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
12 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 # license you chose for the specific language governing permissions and
14 # limitations under that license.
"""TestResult helper classes used by subunit."""
import csv
import datetime

import testtools
from testtools.compat import all
from testtools.content import (
    text_content,
    TracebackContent,
    )

from subunit import iso8601
# NOT a TestResult, because we are implementing the interface, not inheriting
# it.
class TestResultDecorator(object):
    """General pass-through decorator.

    This provides a base that other TestResults can inherit from to
    gain basic forwarding functionality. It also takes care of
    handling the case where the target doesn't support newer methods
    or features by degrading them.
    """

    # XXX: Since lp:testtools r250, this is in testtools. Once it's released,
    # we should gut this and just use that.

    def __init__(self, decorated):
        """Create a TestResultDecorator forwarding to decorated."""
        # Make every decorator degrade gracefully.
        self.decorated = testtools.ExtendedToOriginalDecorator(decorated)

    def startTest(self, test):
        """Forward startTest to the decorated result."""
        return self.decorated.startTest(test)

    def startTestRun(self):
        """Forward startTestRun to the decorated result."""
        return self.decorated.startTestRun()

    def stopTest(self, test):
        """Forward stopTest to the decorated result."""
        return self.decorated.stopTest(test)

    def stopTestRun(self):
        """Forward stopTestRun to the decorated result."""
        return self.decorated.stopTestRun()

    def addError(self, test, err=None, details=None):
        """Forward an error outcome to the decorated result."""
        return self.decorated.addError(test, err, details=details)

    def addFailure(self, test, err=None, details=None):
        """Forward a failure outcome to the decorated result."""
        return self.decorated.addFailure(test, err, details=details)

    def addSuccess(self, test, details=None):
        """Forward a success outcome to the decorated result."""
        return self.decorated.addSuccess(test, details=details)

    def addSkip(self, test, reason=None, details=None):
        """Forward a skip outcome to the decorated result."""
        return self.decorated.addSkip(test, reason, details=details)

    def addExpectedFailure(self, test, err=None, details=None):
        """Forward an expected-failure outcome to the decorated result."""
        return self.decorated.addExpectedFailure(test, err, details=details)

    def addUnexpectedSuccess(self, test, details=None):
        """Forward an unexpected-success outcome to the decorated result."""
        return self.decorated.addUnexpectedSuccess(test, details=details)

    def _get_failfast(self):
        # Targets without a failfast attribute are reported as False.
        return getattr(self.decorated, 'failfast', False)

    def _set_failfast(self, value):
        self.decorated.failfast = value
    failfast = property(_get_failfast, _set_failfast)

    def progress(self, offset, whence):
        """Forward a progress event to the decorated result."""
        return self.decorated.progress(offset, whence)

    def wasSuccessful(self):
        """Return the decorated result's wasSuccessful()."""
        return self.decorated.wasSuccessful()

    @property
    def shouldStop(self):
        """The decorated result's shouldStop attribute."""
        return self.decorated.shouldStop

    def stop(self):
        """Forward stop to the decorated result."""
        return self.decorated.stop()

    @property
    def testsRun(self):
        """The decorated result's testsRun attribute."""
        return self.decorated.testsRun

    def tags(self, new_tags, gone_tags):
        """Forward a tags event to the decorated result."""
        return self.decorated.tags(new_tags, gone_tags)

    def time(self, a_datetime):
        """Forward a time event to the decorated result."""
        return self.decorated.time(a_datetime)
class HookedTestResultDecorator(TestResultDecorator):
    """A TestResult which calls a hook on every event.

    Every forwarded event first calls ``self._before_event()``. This class
    does not define that hook itself; subclasses are expected to supply it.
    """

    def __init__(self, decorated):
        # Keep the bound super object around so each event method can
        # delegate to the plain pass-through implementation.
        self.super = super(HookedTestResultDecorator, self)
        self.super.__init__(decorated)

    def startTest(self, test):
        self._before_event()
        return self.super.startTest(test)

    def startTestRun(self):
        self._before_event()
        return self.super.startTestRun()

    def stopTest(self, test):
        self._before_event()
        return self.super.stopTest(test)

    def stopTestRun(self):
        self._before_event()
        return self.super.stopTestRun()

    def addError(self, test, err=None, details=None):
        self._before_event()
        return self.super.addError(test, err, details=details)

    def addFailure(self, test, err=None, details=None):
        self._before_event()
        return self.super.addFailure(test, err, details=details)

    def addSuccess(self, test, details=None):
        self._before_event()
        return self.super.addSuccess(test, details=details)

    def addSkip(self, test, reason=None, details=None):
        self._before_event()
        return self.super.addSkip(test, reason, details=details)

    def addExpectedFailure(self, test, err=None, details=None):
        self._before_event()
        return self.super.addExpectedFailure(test, err, details=details)

    def addUnexpectedSuccess(self, test, details=None):
        self._before_event()
        return self.super.addUnexpectedSuccess(test, details=details)

    def progress(self, offset, whence):
        self._before_event()
        return self.super.progress(offset, whence)

    def wasSuccessful(self):
        self._before_event()
        return self.super.wasSuccessful()

    @property
    def shouldStop(self):
        self._before_event()
        return self.super.shouldStop

    def stop(self):
        self._before_event()
        return self.super.stop()

    def time(self, a_datetime):
        self._before_event()
        return self.super.time(a_datetime)
class AutoTimingTestResultDecorator(HookedTestResultDecorator):
    """Decorate a TestResult to add time events to a test run.

    By default this will cause a time event before every test event,
    but if explicit time data is being provided by the test run, then
    this decorator will turn itself off to prevent causing confusion.
    """

    def __init__(self, decorated):
        # None means "no explicit time seen yet": keep auto-timestamping.
        self._time = None
        super(AutoTimingTestResultDecorator, self).__init__(decorated)

    def _before_event(self):
        """Send the current UTC time downstream unless timing is explicit."""
        if self._time is not None:
            return
        now = datetime.datetime.utcnow().replace(tzinfo=iso8601.Utc())
        self.decorated.time(now)

    def progress(self, offset, whence):
        # Forwarded directly, so no automatic timestamp precedes it.
        return self.decorated.progress(offset, whence)

    @property
    def shouldStop(self):
        # Read directly, so querying the flag emits no timestamp.
        return self.decorated.shouldStop

    def time(self, a_datetime):
        """Provide a timestamp for the current test activity.

        :param a_datetime: If None, automatically add timestamps before every
            event (this is the default behaviour if time() is not called at
            all).  If not None, pass the provided time onto the decorated
            result object and disable automatic timestamps.
        """
        self._time = a_datetime
        return self.decorated.time(a_datetime)
class TagsMixin(object):
    """Track tag state at two scopes: the whole run and the current test.

    Each scope is a ``(new_tags, gone_tags)`` pair of sets. The per-test
    scope is ``None`` whenever no test is running.
    """

    def __init__(self):
        self._clear_tags()

    def _clear_tags(self):
        """Reset to empty global tags and no active test scope."""
        self._global_tags = set(), set()
        self._test_tags = None

    def _get_active_tags(self):
        """Return the set of tags in effect right now."""
        global_new, global_gone = self._global_tags
        if self._test_tags is None:
            return set(global_new)
        test_new, test_gone = self._test_tags
        return global_new.difference(test_gone).union(test_new)

    def _get_current_scope(self):
        """Return the per-test pair while a test runs, else the global pair."""
        if self._test_tags is not None:
            return self._test_tags
        return self._global_tags

    def _flush_current_scope(self, tag_receiver):
        """Send pending tag changes to tag_receiver and empty the scope.

        :param tag_receiver: An object with a ``tags(new_tags, gone_tags)``
            method. It is only called when there is something to report.
        """
        new_tags, gone_tags = self._get_current_scope()
        if new_tags or gone_tags:
            tag_receiver.tags(new_tags, gone_tags)
        # Rebind rather than clear, so the receiver keeps the sets it got.
        if self._test_tags is not None:
            self._test_tags = set(), set()
        else:
            self._global_tags = set(), set()

    def startTestRun(self):
        self._clear_tags()

    def startTest(self, test):
        self._test_tags = set(), set()

    def stopTest(self, test):
        self._test_tags = None

    def tags(self, new_tags, gone_tags):
        """Handle tag instructions.

        Adds and removes tags as appropriate. If a test is currently running,
        tags are not affected for subsequent tests.

        :param new_tags: Tags to add,
        :param gone_tags: Tags to remove.
        """
        current_new_tags, current_gone_tags = self._get_current_scope()
        current_new_tags.update(new_tags)
        current_new_tags.difference_update(gone_tags)
        current_gone_tags.update(gone_tags)
        current_gone_tags.difference_update(new_tags)
class TagCollapsingDecorator(HookedTestResultDecorator, TagsMixin):
    """Collapses many 'tags' calls into one where possible."""

    def __init__(self, result):
        super(TagCollapsingDecorator, self).__init__(result)
        self._clear_tags()

    def _before_event(self):
        # Emit the accumulated tag changes just ahead of the next event.
        self._flush_current_scope(self.decorated)

    def tags(self, new_tags, gone_tags):
        # Only record the change here; it is forwarded by _before_event.
        TagsMixin.tags(self, new_tags, gone_tags)
class TimeCollapsingDecorator(HookedTestResultDecorator):
    """Only pass on the first and last of a consecutive sequence of times."""

    def __init__(self, decorated):
        super(TimeCollapsingDecorator, self).__init__(decorated)
        self._last_received_time = None
        self._last_sent_time = None

    def _before_event(self):
        """Forward the last time of a run if it has not been sent yet."""
        if self._last_received_time is None:
            return
        if self._last_received_time != self._last_sent_time:
            self.decorated.time(self._last_received_time)
            self._last_sent_time = self._last_received_time
        self._last_received_time = None

    def time(self, a_time):
        # Don't upcall, because we don't want to call _before_event, it's only
        # for non-time events.
        if self._last_received_time is None:
            # First time of a consecutive run: forward it immediately.
            self.decorated.time(a_time)
            self._last_sent_time = a_time
        self._last_received_time = a_time
def and_predicates(predicates):
    """Return a predicate that is true iff all predicates are true."""
    # XXX: Should probably be in testtools to be better used by matchers. jml
    def conjunction(*args, **kwargs):
        return all(check(*args, **kwargs) for check in predicates)
    return conjunction
def make_tag_filter(with_tags, without_tags):
    """Make a callback that checks tests against tags.

    :param with_tags: Tags a test must all carry; empty or None means no
        requirement.
    :param without_tags: Tags a test must not carry; empty or None means no
        exclusion.
    :return: A callable taking (test, outcome, err, details, tags) that
        returns True when the tags satisfy both constraints.
    """
    required = set(with_tags) if with_tags else None
    excluded = set(without_tags) if without_tags else None

    def check_tags(test, outcome, err, details, tags):
        if required and not required <= tags:
            return False
        if excluded and excluded & tags:
            return False
        return True

    return check_tags
class _PredicateFilter(TestResultDecorator, TagsMixin):
    """Buffer the events of each test and forward them only if it passes.

    Events between startTest and stopTest are held back. At stopTest they
    are replayed to the decorated result, unless an outcome was rejected by
    the predicate, in which case the whole test is dropped.
    """

    def __init__(self, result, predicate):
        super(_PredicateFilter, self).__init__(result)
        self._clear_tags()
        self.decorated = TimeCollapsingDecorator(
            TagCollapsingDecorator(self.decorated))
        self._predicate = predicate
        # The current test (for filtering tags)
        self._current_test = None
        # Has the current test been filtered (for outputting test tags)
        self._current_test_filtered = None
        # Calls to this result that we don't know whether to forward on yet.
        self._buffered_calls = []

    def filter_predicate(self, test, outcome, error, details):
        """Ask the predicate about an outcome, passing the active tags."""
        return self._predicate(
            test, outcome, error, details, self._get_active_tags())

    def _buffer_or_filter(self, method, outcome, test, error, details):
        # Shared by the outcomes that carry an error/reason argument.
        if self.filter_predicate(test, outcome, error, details):
            self._buffered_calls.append(
                (method, [test, error], {'details': details}))
        else:
            self._filtered()

    def addError(self, test, err=None, details=None):
        self._buffer_or_filter('addError', 'error', test, err, details)

    def addFailure(self, test, err=None, details=None):
        self._buffer_or_filter('addFailure', 'failure', test, err, details)

    def addSkip(self, test, reason=None, details=None):
        self._buffer_or_filter('addSkip', 'skip', test, reason, details)

    def addExpectedFailure(self, test, err=None, details=None):
        self._buffer_or_filter(
            'addExpectedFailure', 'expectedfailure', test, err, details)

    def addUnexpectedSuccess(self, test, details=None):
        # Buffered without consulting the predicate.
        self._buffered_calls.append(
            ('addUnexpectedSuccess', [test], {'details': details}))

    def addSuccess(self, test, details=None):
        if self.filter_predicate(test, 'success', None, details):
            self._buffered_calls.append(
                ('addSuccess', [test], {'details': details}))
        else:
            self._filtered()

    def _filtered(self):
        self._current_test_filtered = True

    def startTest(self, test):
        """Start a test.

        Not directly passed to the client, but used for handling of tags
        correctly.
        """
        TagsMixin.startTest(self, test)
        self._current_test = test
        self._current_test_filtered = False
        self._buffered_calls.append(('startTest', [test], {}))

    def stopTest(self, test):
        """Stop a test.

        Not directly passed to the client, but used for handling of tags
        correctly.
        """
        if not self._current_test_filtered:
            # The test passed the filter: replay what was held back.
            for method, args, kwargs in self._buffered_calls:
                getattr(self.decorated, method)(*args, **kwargs)
            self.decorated.stopTest(test)
        self._current_test = None
        self._current_test_filtered = None
        self._buffered_calls = []
        TagsMixin.stopTest(self, test)

    def tags(self, new_tags, gone_tags):
        TagsMixin.tags(self, new_tags, gone_tags)
        if self._current_test is not None:
            # Inside a test: hold the tags until the test's fate is known.
            self._buffered_calls.append(('tags', [new_tags, gone_tags], {}))
        else:
            return super(_PredicateFilter, self).tags(new_tags, gone_tags)

    def time(self, a_time):
        return self.decorated.time(a_time)

    def id_to_orig_id(self, id):
        """Strip a leading "subunit.RemotedTestCase." from a test id."""
        if id.startswith("subunit.RemotedTestCase."):
            return id[len("subunit.RemotedTestCase."):]
        return id
class TestResultFilter(TestResultDecorator):
    """A pyunit TestResult interface implementation which filters tests.

    Tests that pass the filter are handed on to another TestResult instance
    for further processing/reporting. To obtain the filtered results,
    the other instance must be interrogated.

    :ivar result: The result that tests are passed to after filtering.
    :ivar filter_predicate: The callback run to decide whether to pass
        a result.
    """

    def __init__(self, result, filter_error=False, filter_failure=False,
        filter_success=True, filter_skip=False, filter_xfail=False,
        filter_predicate=None, fixup_expected_failures=None):
        """Create a FilterResult object filtering to result.

        :param filter_error: Filter out errors.
        :param filter_failure: Filter out failures.
        :param filter_success: Filter out successful tests.
        :param filter_skip: Filter out skipped tests.
        :param filter_xfail: Filter out expected failure tests.
        :param filter_predicate: A callable taking (test, outcome, err,
            details, tags) and returning True if the result should be passed
            through.  err and details may be none if no error or extra
            metadata is available. outcome is the name of the outcome such
            as 'success' or 'failure'. tags is new in 0.0.8; 0.0.7 filters
            are still supported but should be updated to accept the tags
            parameter for efficiency.
        :param fixup_expected_failures: Set of test ids to consider known
            failing.
        """
        def reject_outcome(excluded):
            # A factory, so each predicate binds its own outcome name.
            return lambda t, outcome, e, d, tags: outcome != excluded

        predicates = []
        for enabled, outcome_name in (
                (filter_error, 'error'),
                (filter_failure, 'failure'),
                (filter_success, 'success'),
                (filter_skip, 'skip'),
                (filter_xfail, 'expectedfailure')):
            if enabled:
                predicates.append(reject_outcome(outcome_name))
        if filter_predicate is not None:
            def compat(test, outcome, error, details, tags):
                # 0.0.7 and earlier did not support the 'tags' parameter.
                # NOTE(review): a TypeError raised inside a five-argument
                # predicate also triggers the four-argument retry.
                try:
                    return filter_predicate(
                        test, outcome, error, details, tags)
                except TypeError:
                    return filter_predicate(test, outcome, error, details)
            predicates.append(compat)
        predicate = and_predicates(predicates)
        super(TestResultFilter, self).__init__(
            _PredicateFilter(result, predicate))
        if fixup_expected_failures is None:
            self._fixup_expected_failures = frozenset()
        else:
            self._fixup_expected_failures = fixup_expected_failures

    def addError(self, test, err=None, details=None):
        # Errors in known-failing tests are reported as expected failures.
        if self._failure_expected(test):
            self.addExpectedFailure(test, err=err, details=details)
        else:
            super(TestResultFilter, self).addError(
                test, err=err, details=details)

    def addFailure(self, test, err=None, details=None):
        # Failures in known-failing tests are reported as expected failures.
        if self._failure_expected(test):
            self.addExpectedFailure(test, err=err, details=details)
        else:
            super(TestResultFilter, self).addFailure(
                test, err=err, details=details)

    def addSuccess(self, test, details=None):
        # A known-failing test that passes is an unexpected success.
        if self._failure_expected(test):
            self.addUnexpectedSuccess(test, details=details)
        else:
            super(TestResultFilter, self).addSuccess(test, details=details)

    def _failure_expected(self, test):
        """Return True if the test's id is in the known-failing set."""
        return (test.id() in self._fixup_expected_failures)
class TestIdPrintingResult(testtools.TestResult):
    """Write the id of each finished test to a stream, one per line.

    NOTE(review): the ``self._test = test`` assignments and the
    ``time``/``_time`` pair were rebuilt from how ``stopTest`` uses them;
    confirm against the real file.
    """

    def __init__(self, stream, show_times=False):
        """Create a result object outputting to stream.

        :param stream: Object with a ``write`` method receiving the lines.
        :param show_times: If true, append each test's duration in seconds.
        """
        super(TestIdPrintingResult, self).__init__()
        self._stream = stream
        self.failed_tests = 0
        self.__time = None
        self.show_times = show_times
        self._test = None
        self._test_duration = 0

    def addError(self, test, err):
        self.failed_tests += 1
        self._test = test

    def addFailure(self, test, err):
        self.failed_tests += 1
        self._test = test

    def addSuccess(self, test):
        self._test = test

    def addSkip(self, test, reason=None, details=None):
        self._test = test

    def addUnexpectedSuccess(self, test, details=None):
        self.failed_tests += 1
        self._test = test

    def addExpectedFailure(self, test, err=None, details=None):
        self._test = test

    def reportTest(self, test, duration):
        """Write the test id, plus the duration when show_times is set.

        :param duration: A timedelta-like object; only read when show_times
            is true.
        """
        if self.show_times:
            seconds = duration.seconds
            seconds += duration.days * 3600 * 24
            seconds += duration.microseconds / 1000000.0
            self._stream.write(test.id() + ' %0.3f\n' % seconds)
        else:
            self._stream.write(test.id() + '\n')

    def startTest(self, test):
        self._start_time = self._time()

    def stopTest(self, test):
        # Reports the test recorded by the last outcome call, using the
        # most recent time() values as the clock.
        test_duration = self._time() - self._start_time
        self.reportTest(self._test, test_duration)

    def time(self, time):
        self.__time = time

    def _time(self):
        return self.__time

    def wasSuccessful(self):
        "Tells whether or not this result was a success"
        return self.failed_tests == 0
class TestByTestResult(testtools.TestResult):
    """Call something every time a test completes."""

    # XXX: In testtools since lp:testtools r249.  Once that's released, just
    # import that.

    def __init__(self, on_test):
        """Construct a ``TestByTestResult``.

        :param on_test: A callable that take a test case, a status (one of
            "success", "failure", "error", "skip", or "xfail"), a start time
            (a ``datetime`` with timezone), a stop time, an iterable of tags,
            and a details dict. Is called at the end of each test (i.e. on
            ``stopTest``) with the accumulated values for that test.
        """
        super(TestByTestResult, self).__init__()
        self._on_test = on_test

    def startTest(self, test):
        super(TestByTestResult, self).startTest(test)
        self._start_time = self._now()
        # There's no supported (i.e. tested) behaviour that relies on these
        # being set, but it makes me more comfortable all the same. -- jml
        self._status = None
        self._details = None
        self._stop_time = None

    def stopTest(self, test):
        self._stop_time = self._now()
        super(TestByTestResult, self).stopTest(test)
        self._on_test(
            test=test,
            status=self._status,
            start_time=self._start_time,
            stop_time=self._stop_time,
            # current_tags is new in testtools 0.9.13.
            tags=getattr(self, 'current_tags', None),
            details=self._details)

    def _err_to_details(self, test, err, details):
        """Return details if non-empty, else wrap err as a traceback."""
        if details:
            return details
        return {'traceback': TracebackContent(err, test)}

    def addSuccess(self, test, details=None):
        super(TestByTestResult, self).addSuccess(test)
        self._status = 'success'
        self._details = details

    def addFailure(self, test, err=None, details=None):
        super(TestByTestResult, self).addFailure(test, err, details)
        self._status = 'failure'
        self._details = self._err_to_details(test, err, details)

    def addError(self, test, err=None, details=None):
        super(TestByTestResult, self).addError(test, err, details)
        self._status = 'error'
        self._details = self._err_to_details(test, err, details)

    def addSkip(self, test, reason=None, details=None):
        super(TestByTestResult, self).addSkip(test, reason, details)
        self._status = 'skip'
        if details is None:
            details = {'reason': text_content(reason)}
        elif reason:
            # XXX: What if details already has 'reason' key?
            details['reason'] = text_content(reason)
        self._details = details

    def addExpectedFailure(self, test, err=None, details=None):
        super(TestByTestResult, self).addExpectedFailure(test, err, details)
        self._status = 'xfail'
        self._details = self._err_to_details(test, err, details)

    def addUnexpectedSuccess(self, test, details=None):
        super(TestByTestResult, self).addUnexpectedSuccess(test, details)
        # Reported to on_test under the plain 'success' status.
        self._status = 'success'
        self._details = details
class CsvResult(TestByTestResult):
    """Write one CSV row per completed test, after a header row."""

    def __init__(self, stream):
        super(CsvResult, self).__init__(self._on_test)
        writer = csv.writer(stream)
        self._write_row = writer.writerow

    def _on_test(self, test, status, start_time, stop_time, tags, details):
        # tags and details are accepted but not written out.
        row = [test.id(), status, start_time, stop_time]
        self._write_row(row)

    def startTestRun(self):
        super(CsvResult, self).startTestRun()
        header = ['test', 'status', 'start_time', 'stop_time']
        self._write_row(header)