subunit: Update to latest upstream version.
[metze/samba/wip.git] / lib / subunit / python / subunit / test_results.py
index 33fb50e07306ad60a27739473d2cbe9bf579c4c8..c00a2d3e9706cf3c2011b1c35856e63ff08402b7 100644 (file)
 
 """TestResult helper classes used to by subunit."""
 
+import csv
 import datetime
 
 import testtools
+from testtools.compat import all
+from testtools.content import (
+    text_content,
+    TracebackContent,
+    )
 
 from subunit import iso8601
 
@@ -34,6 +40,9 @@ class TestResultDecorator(object):
     or features by degrading them.
     """
 
+    # XXX: Since lp:testtools r250, this is in testtools. Once it's released,
+    # we should gut this and just use that.
+
     def __init__(self, decorated):
         """Create a TestResultDecorator forwarding to decorated."""
         # Make every decorator degrade gracefully.
@@ -200,34 +209,44 @@ class AutoTimingTestResultDecorator(HookedTestResultDecorator):
         return self.decorated.time(a_datetime)
 
 
-class TagCollapsingDecorator(TestResultDecorator):
-    """Collapses many 'tags' calls into one where possible."""
+class TagsMixin(object):
 
-    def __init__(self, result):
-        super(TagCollapsingDecorator, self).__init__(result)
-        # The (new, gone) tags for the current test.
-        self._current_test_tags = None
+    def __init__(self):
+        self._clear_tags()
 
-    def startTest(self, test):
-        """Start a test.
+    def _clear_tags(self):
+        self._global_tags = set(), set()
+        self._test_tags = None
 
-        Not directly passed to the client, but used for handling of tags
-        correctly.
-        """
-        self.decorated.startTest(test)
-        self._current_test_tags = set(), set()
+    def _get_active_tags(self):
+        global_new, global_gone = self._global_tags
+        if self._test_tags is None:
+            return set(global_new)
+        test_new, test_gone = self._test_tags
+        return global_new.difference(test_gone).union(test_new)
 
-    def stopTest(self, test):
-        """Stop a test.
+    def _get_current_scope(self):
+        if self._test_tags:
+            return self._test_tags
+        return self._global_tags
 
-        Not directly passed to the client, but used for handling of tags
-        correctly.
-        """
-        # Tags to output for this test.
-        if self._current_test_tags[0] or self._current_test_tags[1]:
-            self.decorated.tags(*self._current_test_tags)
-        self.decorated.stopTest(test)
-        self._current_test_tags = None
+    def _flush_current_scope(self, tag_receiver):
+        new_tags, gone_tags = self._get_current_scope()
+        if new_tags or gone_tags:
+            tag_receiver.tags(new_tags, gone_tags)
+        if self._test_tags:
+            self._test_tags = set(), set()
+        else:
+            self._global_tags = set(), set()
+
+    def startTestRun(self):
+        self._clear_tags()
+
+    def startTest(self, test):
+        self._test_tags = set(), set()
+
+    def stopTest(self, test):
+        self._test_tags = None
 
     def tags(self, new_tags, gone_tags):
         """Handle tag instructions.
@@ -238,14 +257,25 @@ class TagCollapsingDecorator(TestResultDecorator):
         :param new_tags: Tags to add,
         :param gone_tags: Tags to remove.
         """
-        if self._current_test_tags is not None:
-            # gather the tags until the test stops.
-            self._current_test_tags[0].update(new_tags)
-            self._current_test_tags[0].difference_update(gone_tags)
-            self._current_test_tags[1].update(gone_tags)
-            self._current_test_tags[1].difference_update(new_tags)
-        else:
-            return self.decorated.tags(new_tags, gone_tags)
+        current_new_tags, current_gone_tags = self._get_current_scope()
+        current_new_tags.update(new_tags)
+        current_new_tags.difference_update(gone_tags)
+        current_gone_tags.update(gone_tags)
+        current_gone_tags.difference_update(new_tags)
+
+
+class TagCollapsingDecorator(HookedTestResultDecorator, TagsMixin):
+    """Collapses many 'tags' calls into one where possible."""
+
+    def __init__(self, result):
+        super(TagCollapsingDecorator, self).__init__(result)
+        self._clear_tags()
+
+    def _before_event(self):
+        self._flush_current_scope(self.decorated)
+
+    def tags(self, new_tags, gone_tags):
+        TagsMixin.tags(self, new_tags, gone_tags)
 
 
 class TimeCollapsingDecorator(HookedTestResultDecorator):
@@ -273,93 +303,58 @@ class TimeCollapsingDecorator(HookedTestResultDecorator):
         self._last_received_time = a_time
 
 
-def all_true(bools):
-    """Return True if all of 'bools' are True. False otherwise."""
-    for b in bools:
-        if not b:
-            return False
-    return True
+def and_predicates(predicates):
+    """Return a predicate that is true iff all predicates are true."""
+    # XXX: Should probably be in testtools to be better used by matchers. jml
+    return lambda *args, **kwargs: all(p(*args, **kwargs) for p in predicates)
 
 
-class TestResultFilter(TestResultDecorator):
-    """A pyunit TestResult interface implementation which filters tests.
+def make_tag_filter(with_tags, without_tags):
+    """Make a callback that checks tests against tags."""
 
-    Tests that pass the filter are handed on to another TestResult instance
-    for further processing/reporting. To obtain the filtered results,
-    the other instance must be interrogated.
+    with_tags = with_tags and set(with_tags) or None
+    without_tags = without_tags and set(without_tags) or None
 
-    :ivar result: The result that tests are passed to after filtering.
-    :ivar filter_predicate: The callback run to decide whether to pass
-        a result.
-    """
+    def check_tags(test, outcome, err, details, tags):
+        if with_tags and not with_tags <= tags:
+            return False
+        if without_tags and bool(without_tags & tags):
+            return False
+        return True
 
-    def __init__(self, result, filter_error=False, filter_failure=False,
-        filter_success=True, filter_skip=False, filter_xfail=False,
-        filter_predicate=None, fixup_expected_failures=None):
-        """Create a FilterResult object filtering to result.
+    return check_tags
 
-        :param filter_error: Filter out errors.
-        :param filter_failure: Filter out failures.
-        :param filter_success: Filter out successful tests.
-        :param filter_skip: Filter out skipped tests.
-        :param filter_xfail: Filter out expected failure tests.
-        :param filter_predicate: A callable taking (test, outcome, err,
-            details) and returning True if the result should be passed
-            through.  err and details may be none if no error or extra
-            metadata is available. outcome is the name of the outcome such
-            as 'success' or 'failure'.
-        :param fixup_expected_failures: Set of test ids to consider known
-            failing.
-        """
-        super(TestResultFilter, self).__init__(result)
+
+class _PredicateFilter(TestResultDecorator, TagsMixin):
+
+    def __init__(self, result, predicate):
+        super(_PredicateFilter, self).__init__(result)
+        self._clear_tags()
         self.decorated = TimeCollapsingDecorator(
             TagCollapsingDecorator(self.decorated))
-        predicates = []
-        if filter_error:
-            predicates.append(lambda t, outcome, e, d: outcome != 'error')
-        if filter_failure:
-            predicates.append(lambda t, outcome, e, d: outcome != 'failure')
-        if filter_success:
-            predicates.append(lambda t, outcome, e, d: outcome != 'success')
-        if filter_skip:
-            predicates.append(lambda t, outcome, e, d: outcome != 'skip')
-        if filter_xfail:
-            predicates.append(lambda t, outcome, e, d: outcome != 'expectedfailure')
-        if filter_predicate is not None:
-            predicates.append(filter_predicate)
-        self.filter_predicate = (
-            lambda test, outcome, err, details:
-                all_true(p(test, outcome, err, details) for p in predicates))
+        self._predicate = predicate
         # The current test (for filtering tags)
         self._current_test = None
         # Has the current test been filtered (for outputting test tags)
         self._current_test_filtered = None
         # Calls to this result that we don't know whether to forward on yet.
         self._buffered_calls = []
-        if fixup_expected_failures is None:
-            self._fixup_expected_failures = frozenset()
-        else:
-            self._fixup_expected_failures = fixup_expected_failures
+
+    def filter_predicate(self, test, outcome, error, details):
+        return self._predicate(
+            test, outcome, error, details, self._get_active_tags())
 
     def addError(self, test, err=None, details=None):
         if (self.filter_predicate(test, 'error', err, details)):
-            if self._failure_expected(test):
-                self._buffered_calls.append(
-                    ('addExpectedFailure', [test, err], {'details': details}))
-            else:
-                self._buffered_calls.append(
-                    ('addError', [test, err], {'details': details}))
+            self._buffered_calls.append(
+                ('addError', [test, err], {'details': details}))
         else:
             self._filtered()
 
     def addFailure(self, test, err=None, details=None):
         if (self.filter_predicate(test, 'failure', err, details)):
-            if self._failure_expected(test):
-                self._buffered_calls.append(
-                    ('addExpectedFailure', [test, err], {'details': details}))
-            else:
-                self._buffered_calls.append(
-                    ('addFailure', [test, err], {'details': details}))
+            self._buffered_calls.append(
+                ('addFailure', [test, err], {'details': details}))
         else:
             self._filtered()
 
@@ -370,17 +365,6 @@ class TestResultFilter(TestResultDecorator):
         else:
             self._filtered()
 
-    def addSuccess(self, test, details=None):
-        if (self.filter_predicate(test, 'success', None, details)):
-            if self._failure_expected(test):
-                self._buffered_calls.append(
-                    ('addUnexpectedSuccess', [test], {'details': details}))
-            else:
-                self._buffered_calls.append(
-                    ('addSuccess', [test], {'details': details}))
-        else:
-            self._filtered()
-
     def addExpectedFailure(self, test, err=None, details=None):
         if self.filter_predicate(test, 'expectedfailure', err, details):
             self._buffered_calls.append(
@@ -392,18 +376,23 @@ class TestResultFilter(TestResultDecorator):
         self._buffered_calls.append(
             ('addUnexpectedSuccess', [test], {'details': details}))
 
+    def addSuccess(self, test, details=None):
+        if (self.filter_predicate(test, 'success', None, details)):
+            self._buffered_calls.append(
+                ('addSuccess', [test], {'details': details}))
+        else:
+            self._filtered()
+
     def _filtered(self):
         self._current_test_filtered = True
 
-    def _failure_expected(self, test):
-        return (test.id() in self._fixup_expected_failures)
-
     def startTest(self, test):
         """Start a test.
 
         Not directly passed to the client, but used for handling of tags
         correctly.
         """
+        TagsMixin.startTest(self, test)
         self._current_test = test
         self._current_test_filtered = False
         self._buffered_calls.append(('startTest', [test], {}))
@@ -415,19 +404,23 @@ class TestResultFilter(TestResultDecorator):
         correctly.
         """
         if not self._current_test_filtered:
-            # Tags to output for this test.
             for method, args, kwargs in self._buffered_calls:
                 getattr(self.decorated, method)(*args, **kwargs)
             self.decorated.stopTest(test)
         self._current_test = None
         self._current_test_filtered = None
         self._buffered_calls = []
+        TagsMixin.stopTest(self, test)
 
-    def time(self, a_time):
+    def tags(self, new_tags, gone_tags):
+        TagsMixin.tags(self, new_tags, gone_tags)
         if self._current_test is not None:
-            self._buffered_calls.append(('time', [a_time], {}))
+            self._buffered_calls.append(('tags', [new_tags, gone_tags], {}))
         else:
-            return self.decorated.time(a_time)
+            return super(_PredicateFilter, self).tags(new_tags, gone_tags)
+
+    def time(self, a_time):
+        return self.decorated.time(a_time)
 
     def id_to_orig_id(self, id):
         if id.startswith("subunit.RemotedTestCase."):
@@ -435,6 +428,95 @@ class TestResultFilter(TestResultDecorator):
         return id
 
 
+class TestResultFilter(TestResultDecorator):
+    """A pyunit TestResult interface implementation which filters tests.
+
+    Tests that pass the filter are handed on to another TestResult instance
+    for further processing/reporting. To obtain the filtered results,
+    the other instance must be interrogated.
+
+    :ivar result: The result that tests are passed to after filtering.
+    :ivar filter_predicate: The callback run to decide whether to pass
+        a result.
+    """
+
+    def __init__(self, result, filter_error=False, filter_failure=False,
+        filter_success=True, filter_skip=False, filter_xfail=False,
+        filter_predicate=None, fixup_expected_failures=None):
+        """Create a FilterResult object filtering to result.
+
+        :param filter_error: Filter out errors.
+        :param filter_failure: Filter out failures.
+        :param filter_success: Filter out successful tests.
+        :param filter_skip: Filter out skipped tests.
+        :param filter_xfail: Filter out expected failure tests.
+        :param filter_predicate: A callable taking (test, outcome, err,
+            details, tags) and returning True if the result should be passed
+            through.  err and details may be none if no error or extra
+            metadata is available. outcome is the name of the outcome such
+            as 'success' or 'failure'. tags is new in 0.0.8; 0.0.7 filters
+            are still supported but should be updated to accept the tags
+            parameter for efficiency.
+        :param fixup_expected_failures: Set of test ids to consider known
+            failing.
+        """
+        predicates = []
+        if filter_error:
+            predicates.append(
+                lambda t, outcome, e, d, tags: outcome != 'error')
+        if filter_failure:
+            predicates.append(
+                lambda t, outcome, e, d, tags: outcome != 'failure')
+        if filter_success:
+            predicates.append(
+                lambda t, outcome, e, d, tags: outcome != 'success')
+        if filter_skip:
+            predicates.append(
+                lambda t, outcome, e, d, tags: outcome != 'skip')
+        if filter_xfail:
+            predicates.append(
+                lambda t, outcome, e, d, tags: outcome != 'expectedfailure')
+        if filter_predicate is not None:
+            def compat(test, outcome, error, details, tags):
+                # 0.0.7 and earlier did not support the 'tags' parameter.
+                try:
+                    return filter_predicate(
+                        test, outcome, error, details, tags)
+                except TypeError:
+                    return filter_predicate(test, outcome, error, details)
+            predicates.append(compat)
+        predicate = and_predicates(predicates)
+        super(TestResultFilter, self).__init__(
+            _PredicateFilter(result, predicate))
+        if fixup_expected_failures is None:
+            self._fixup_expected_failures = frozenset()
+        else:
+            self._fixup_expected_failures = fixup_expected_failures
+
+    def addError(self, test, err=None, details=None):
+        if self._failure_expected(test):
+            self.addExpectedFailure(test, err=err, details=details)
+        else:
+            super(TestResultFilter, self).addError(
+                test, err=err, details=details)
+
+    def addFailure(self, test, err=None, details=None):
+        if self._failure_expected(test):
+            self.addExpectedFailure(test, err=err, details=details)
+        else:
+            super(TestResultFilter, self).addFailure(
+                test, err=err, details=details)
+
+    def addSuccess(self, test, details=None):
+        if self._failure_expected(test):
+            self.addUnexpectedSuccess(test, details=details)
+        else:
+            super(TestResultFilter, self).addSuccess(test, details=details)
+
+    def _failure_expected(self, test):
+        return (test.id() in self._fixup_expected_failures)
+
+
 class TestIdPrintingResult(testtools.TestResult):
 
     def __init__(self, stream, show_times=False):
@@ -493,3 +575,97 @@ class TestIdPrintingResult(testtools.TestResult):
     def wasSuccessful(self):
         "Tells whether or not this result was a success"
         return self.failed_tests == 0
+
+
+class TestByTestResult(testtools.TestResult):
+    """Call something every time a test completes."""
+
+# XXX: In testtools since lp:testtools r249.  Once that's released, just
+# import that.
+
+    def __init__(self, on_test):
+        """Construct a ``TestByTestResult``.
+
+        :param on_test: A callable that take a test case, a status (one of
+            "success", "failure", "error", "skip", or "xfail"), a start time
+            (a ``datetime`` with timezone), a stop time, an iterable of tags,
+            and a details dict. Is called at the end of each test (i.e. on
+            ``stopTest``) with the accumulated values for that test.
+        """
+        super(TestByTestResult, self).__init__()
+        self._on_test = on_test
+
+    def startTest(self, test):
+        super(TestByTestResult, self).startTest(test)
+        self._start_time = self._now()
+        # There's no supported (i.e. tested) behaviour that relies on these
+        # being set, but it makes me more comfortable all the same. -- jml
+        self._status = None
+        self._details = None
+        self._stop_time = None
+
+    def stopTest(self, test):
+        self._stop_time = self._now()
+        super(TestByTestResult, self).stopTest(test)
+        self._on_test(
+            test=test,
+            status=self._status,
+            start_time=self._start_time,
+            stop_time=self._stop_time,
+            # current_tags is new in testtools 0.9.13.
+            tags=getattr(self, 'current_tags', None),
+            details=self._details)
+
+    def _err_to_details(self, test, err, details):
+        if details:
+            return details
+        return {'traceback': TracebackContent(err, test)}
+
+    def addSuccess(self, test, details=None):
+        super(TestByTestResult, self).addSuccess(test)
+        self._status = 'success'
+        self._details = details
+
+    def addFailure(self, test, err=None, details=None):
+        super(TestByTestResult, self).addFailure(test, err, details)
+        self._status = 'failure'
+        self._details = self._err_to_details(test, err, details)
+
+    def addError(self, test, err=None, details=None):
+        super(TestByTestResult, self).addError(test, err, details)
+        self._status = 'error'
+        self._details = self._err_to_details(test, err, details)
+
+    def addSkip(self, test, reason=None, details=None):
+        super(TestByTestResult, self).addSkip(test, reason, details)
+        self._status = 'skip'
+        if details is None:
+            details = {'reason': text_content(reason)}
+        elif reason:
+            # XXX: What if details already has 'reason' key?
+            details['reason'] = text_content(reason)
+        self._details = details
+
+    def addExpectedFailure(self, test, err=None, details=None):
+        super(TestByTestResult, self).addExpectedFailure(test, err, details)
+        self._status = 'xfail'
+        self._details = self._err_to_details(test, err, details)
+
+    def addUnexpectedSuccess(self, test, details=None):
+        super(TestByTestResult, self).addUnexpectedSuccess(test, details)
+        self._status = 'success'
+        self._details = details
+
+
+class CsvResult(TestByTestResult):
+
+    def __init__(self, stream):
+        super(CsvResult, self).__init__(self._on_test)
+        self._write_row = csv.writer(stream).writerow
+
+    def _on_test(self, test, status, start_time, stop_time, tags, details):
+        self._write_row([test.id(), status, start_time, stop_time])
+
+    def startTestRun(self):
+        super(CsvResult, self).startTestRun()
+        self._write_row(['test', 'status', 'start_time', 'stop_time'])