Add StreamTagger, for tagging StreamResult events.
authorRobert Collins <robertc@robertcollins.net>
Sat, 16 Mar 2013 06:45:05 +0000 (19:45 +1300)
committerRobert Collins <robertc@robertcollins.net>
Sat, 16 Mar 2013 06:45:09 +0000 (19:45 +1300)
NEWS
doc/for-framework-folk.rst
testtools/__init__.py
testtools/testresult/__init__.py
testtools/testresult/real.py
testtools/tests/test_testresult.py

diff --git a/NEWS b/NEWS
index 4f997596d5c465dbac73fe54cb58778a51b268c4..b781bd05578eb5189b2f4a53a7133427cb4fe8c2 100644 (file)
--- a/NEWS
+++ b/NEWS
@@ -31,6 +31,9 @@ Improvements
 * New support class ``StreamSummary`` which summarises a ``StreamResult``
   stream compatibly with ``TestResult`` code. (Robert Collins)
 
+* New support class ``StreamTagger`` which adds or removes tags from
+  ``StreamResult`` events. (RobertCollins)
+
 * New support class ``StreamToDict`` which converts a ``StreamResult`` to a
   series of dicts describing a test. Useful for writing trivial stream
   analysers. (Robert Collins)
index 5c9ffe83fdf9ffd0b0fed052548feb554c5a5a60..407be3641023aadcfc5914b7a487cd8b27e5e2f0 100644 (file)
@@ -187,6 +187,18 @@ Aborting multiple workers in a distributed environment requires hooking
 whatever signalling mechanism the distributed environment has up to a
 ``TestControl`` in each worker process.
 
+StreamTagger
+------------
+
+A ``StreamResult`` filter that adds or removes tags from events::
+
+    >>> from testtools import StreamTagger
+    >>> sink = StreamResult()
+    >>> result = StreamTagger([sink], set(['add']), set(['discard']))
+    >>> result.startTestRun()
+    >>> # Run tests against result here.
+    >>> result.stopTestRun()
+
 StreamToDict
 ------------
 
index 2bde458d05d2da351cfa93cd99efc21ca9a8206a..faab2fb15abd12ecc62ceba3ae2e1ea39f23669b 100644 (file)
@@ -31,6 +31,7 @@ __all__ = [
     'StreamFailFast',
     'StreamResult',
     'StreamSummary',
+    'StreamTagger',
     'StreamToDict',
     'StreamToExtendedDecorator',
     'TestControl',
@@ -81,6 +82,7 @@ else:
         StreamFailFast,
         StreamResult,
         StreamSummary,
+        StreamTagger,
         StreamToDict,
         StreamToExtendedDecorator,
         Tagger,
index 404aa3c6def9859e65552d891f0d1a9512701d91..f4f302e76ef559dde0ef3072274ee76c93ffa45f 100644 (file)
@@ -10,6 +10,7 @@ __all__ = [
     'StreamFailFast',
     'StreamResult',
     'StreamSummary',
+    'StreamTagger',
     'StreamToDict',
     'StreamToExtendedDecorator',
     'Tagger',
@@ -29,6 +30,7 @@ from testtools.testresult.real import (
     StreamFailFast,
     StreamResult,
     StreamSummary,
+    StreamTagger,
     StreamToDict,
     StreamToExtendedDecorator,
     Tagger,
index d9369d27f1bad1cb5ff36c3a5cdc26798bc80cc6..f1379cbb04a602e5ce44c95121daa558a0286cf9 100644 (file)
@@ -10,6 +10,7 @@ __all__ = [
     'StreamFailFast',
     'StreamResult',
     'StreamSummary',
+    'StreamTagger',
     'StreamToDict',
     'StreamToExtendedDecorator',
     'Tagger',
@@ -423,6 +424,29 @@ class StreamFailFast(StreamResult):
             self.on_error()
 
 
+class StreamTagger(CopyStreamResult):
+    """Adds or discards tags from StreamResult events."""
+
+    def __init__(self, targets, add=None, discard=None):
+        """Create a StreamTagger.
+
+        :param targets: A list of targets to forward events onto.
+        :param add: Either None or an iterable of tags to add to each event.
+        :param discard: Either None or an iterable of tags to discard from each
+            event.
+        """
+        super(StreamTagger, self).__init__(targets)
+        self.add = frozenset(add or ())
+        self.discard = frozenset(discard or ())
+
+    def status(self, *args, **kwargs):
+        test_tags = kwargs.get('test_tags') or set()
+        test_tags.update(self.add)
+        test_tags.difference_update(self.discard)
+        kwargs['test_tags'] = test_tags or None
+        super(StreamTagger, self).status(*args, **kwargs)
+
+
 class StreamToDict(StreamResult):
     """A specialised StreamResult that emits a callback as tests complete.
 
index c35a231869ae0b82096442e468d7411724802a8b..99a804eac049268bc7d246715a123499e37eea21 100644 (file)
@@ -27,6 +27,7 @@ from testtools import (
     StreamFailFast,
     StreamResult,
     StreamSummary,
+    StreamTagger,
     StreamToDict,
     StreamToExtendedDecorator,
     Tagger,
@@ -563,6 +564,12 @@ class TestStreamSummaryResultContract(TestCase, TestStreamResultContract):
         return StreamSummary()
 
 
+class TestStreamTaggerContract(TestCase, TestStreamResultContract):
+
+    def _make_result(self):
+        return StreamTagger([StreamResult()], add=set(), discard=set())
+
+
 class TestStreamToDictContract(TestCase, TestStreamResultContract):
 
     def _make_result(self):
@@ -649,6 +656,45 @@ class TestCopyStreamResultCopies(TestCase):
                 ])))
 
 
+class TestStreamTagger(TestCase):
+
+    def test_adding(self):
+        log = LoggingStreamResult()
+        result = StreamTagger([log], add=['foo'])
+        result.startTestRun()
+        result.status()
+        result.status(test_tags=set(['bar']))
+        result.status(test_tags=None)
+        result.stopTestRun()
+        self.assertEqual([
+            ('startTestRun',),
+            ('status', None, None, set(['foo']), True, None, None, False, None, None, None),
+            ('status', None, None, set(['foo', 'bar']), True, None, None, False, None, None, None),
+            ('status', None, None, set(['foo']), True, None, None, False, None, None, None),
+            ('stopTestRun',),
+            ], log._events)
+
+    def test_discarding(self):
+        log = LoggingStreamResult()
+        result = StreamTagger([log], discard=['foo'])
+        result.startTestRun()
+        result.status()
+        result.status(test_tags=None)
+        result.status(test_tags=set(['foo']))
+        result.status(test_tags=set(['bar']))
+        result.status(test_tags=set(['foo', 'bar']))
+        result.stopTestRun()
+        self.assertEqual([
+            ('startTestRun',),
+            ('status', None, None, None, True, None, None, False, None, None, None),
+            ('status', None, None, None, True, None, None, False, None, None, None),
+            ('status', None, None, None, True, None, None, False, None, None, None),
+            ('status', None, None, set(['bar']), True, None, None, False, None, None, None),
+            ('status', None, None, set(['bar']), True, None, None, False, None, None, None),
+            ('stopTestRun',),
+            ], log._events)
+
+
 class TestStreamToDict(TestCase):
 
     def test_hung_test(self):