subunit: Update to latest upstream version.
[metze/samba/wip.git] / lib / subunit / python / subunit / tests / test_test_results.py
index fe82c04b065d81502de6977ad60fc25fd63b4895..236dfa22e51f0b15474ff5aae236b7feef86c48a 100644 (file)
@@ -6,7 +6,7 @@
 #  license at the users choice. A copy of both licenses are available in the
 #  project source as Apache-2.0 and BSD. You may not use this file except in
 #  compliance with one of these two licences.
-#  
+#
 #  Unless required by applicable law or agreed to in writing, software
 #  distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
 #  WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
 #  limitations under that license.
 #
 
+import csv
 import datetime
-import unittest
-from StringIO import StringIO
-import os
 import sys
+import unittest
 
-from testtools.content_type import ContentType
-from testtools.content import Content
+from testtools import TestCase
+from testtools.compat import StringIO
+from testtools.content import (
+    text_content,
+    TracebackContent,
+    )
+from testtools.testresult.doubles import ExtendedTestResult
 
 import subunit
 import subunit.iso8601 as iso8601
 import subunit.test_results
 
+import testtools
+
 
 class LoggingDecorator(subunit.test_results.HookedTestResultDecorator):
 
@@ -82,22 +88,22 @@ class TestHookedTestResultDecorator(unittest.TestCase):
 
     def test_startTest(self):
         self.result.startTest(self)
-        
+
     def test_startTestRun(self):
         self.result.startTestRun()
-        
+
     def test_stopTest(self):
         self.result.stopTest(self)
-        
+
     def test_stopTestRun(self):
         self.result.stopTestRun()
 
     def test_addError(self):
         self.result.addError(self, subunit.RemoteError())
-        
+
     def test_addError_details(self):
         self.result.addError(self, details={})
-        
+
     def test_addFailure(self):
         self.result.addFailure(self, subunit.RemoteError())
 
@@ -142,7 +148,7 @@ class TestHookedTestResultDecorator(unittest.TestCase):
 
     def test_time(self):
         self.result.time(None)
+
 
 class TestAutoTimingTestResultDecorator(unittest.TestCase):
 
@@ -193,6 +199,367 @@ class TestAutoTimingTestResultDecorator(unittest.TestCase):
         self.assertNotEqual(None, self.decorated._calls[2])
 
 
+class TestTagCollapsingDecorator(TestCase):
+
+    def test_tags_collapsed_outside_of_tests(self):
+        result = ExtendedTestResult()
+        tag_collapser = subunit.test_results.TagCollapsingDecorator(result)
+        tag_collapser.tags(set(['a']), set())
+        tag_collapser.tags(set(['b']), set())
+        tag_collapser.startTest(self)
+        self.assertEquals(
+            [('tags', set(['a', 'b']), set([])),
+             ('startTest', self),
+             ], result._events)
+
+    def test_tags_collapsed_outside_of_tests_are_flushed(self):
+        result = ExtendedTestResult()
+        tag_collapser = subunit.test_results.TagCollapsingDecorator(result)
+        tag_collapser.startTestRun()
+        tag_collapser.tags(set(['a']), set())
+        tag_collapser.tags(set(['b']), set())
+        tag_collapser.startTest(self)
+        tag_collapser.addSuccess(self)
+        tag_collapser.stopTest(self)
+        tag_collapser.stopTestRun()
+        self.assertEquals(
+            [('startTestRun',),
+             ('tags', set(['a', 'b']), set([])),
+             ('startTest', self),
+             ('addSuccess', self),
+             ('stopTest', self),
+             ('stopTestRun',),
+             ], result._events)
+
+    def test_tags_forwarded_after_tests(self):
+        test = subunit.RemotedTestCase('foo')
+        result = ExtendedTestResult()
+        tag_collapser = subunit.test_results.TagCollapsingDecorator(result)
+        tag_collapser.startTestRun()
+        tag_collapser.startTest(test)
+        tag_collapser.addSuccess(test)
+        tag_collapser.stopTest(test)
+        tag_collapser.tags(set(['a']), set(['b']))
+        tag_collapser.stopTestRun()
+        self.assertEqual(
+            [('startTestRun',),
+             ('startTest', test),
+             ('addSuccess', test),
+             ('stopTest', test),
+             ('tags', set(['a']), set(['b'])),
+             ('stopTestRun',),
+             ],
+            result._events)
+
+    def test_tags_collapsed_inside_of_tests(self):
+        result = ExtendedTestResult()
+        tag_collapser = subunit.test_results.TagCollapsingDecorator(result)
+        test = subunit.RemotedTestCase('foo')
+        tag_collapser.startTest(test)
+        tag_collapser.tags(set(['a']), set())
+        tag_collapser.tags(set(['b']), set(['a']))
+        tag_collapser.tags(set(['c']), set())
+        tag_collapser.stopTest(test)
+        self.assertEquals(
+            [('startTest', test),
+             ('tags', set(['b', 'c']), set(['a'])),
+             ('stopTest', test)],
+            result._events)
+
+    def test_tags_collapsed_inside_of_tests_different_ordering(self):
+        result = ExtendedTestResult()
+        tag_collapser = subunit.test_results.TagCollapsingDecorator(result)
+        test = subunit.RemotedTestCase('foo')
+        tag_collapser.startTest(test)
+        tag_collapser.tags(set(), set(['a']))
+        tag_collapser.tags(set(['a', 'b']), set())
+        tag_collapser.tags(set(['c']), set())
+        tag_collapser.stopTest(test)
+        self.assertEquals(
+            [('startTest', test),
+             ('tags', set(['a', 'b', 'c']), set()),
+             ('stopTest', test)],
+            result._events)
+
+    def test_tags_sent_before_result(self):
+        # Because addSuccess and friends tend to send subunit output
+        # immediately, and because 'tags:' before a result line means
+        # something different to 'tags:' after a result line, we need to be
+        # sure that tags are emitted before 'addSuccess' (or whatever).
+        result = ExtendedTestResult()
+        tag_collapser = subunit.test_results.TagCollapsingDecorator(result)
+        test = subunit.RemotedTestCase('foo')
+        tag_collapser.startTest(test)
+        tag_collapser.tags(set(['a']), set())
+        tag_collapser.addSuccess(test)
+        tag_collapser.stopTest(test)
+        self.assertEquals(
+            [('startTest', test),
+             ('tags', set(['a']), set()),
+             ('addSuccess', test),
+             ('stopTest', test)],
+            result._events)
+
+
+class TestTimeCollapsingDecorator(TestCase):
+
+    def make_time(self):
+        # Heh heh.
+        return datetime.datetime(
+            2000, 1, self.getUniqueInteger(), tzinfo=iso8601.UTC)
+
+    def test_initial_time_forwarded(self):
+        # We always forward the first time event we see.
+        result = ExtendedTestResult()
+        tag_collapser = subunit.test_results.TimeCollapsingDecorator(result)
+        a_time = self.make_time()
+        tag_collapser.time(a_time)
+        self.assertEquals([('time', a_time)], result._events)
+
+    def test_time_collapsed_to_first_and_last(self):
+        # If there are many consecutive time events, only the first and last
+        # are sent through.
+        result = ExtendedTestResult()
+        tag_collapser = subunit.test_results.TimeCollapsingDecorator(result)
+        times = [self.make_time() for i in range(5)]
+        for a_time in times:
+            tag_collapser.time(a_time)
+        tag_collapser.startTest(subunit.RemotedTestCase('foo'))
+        self.assertEquals(
+            [('time', times[0]), ('time', times[-1])], result._events[:-1])
+
+    def test_only_one_time_sent(self):
+        # If we receive a single time event followed by a non-time event, we
+        # send exactly one time event.
+        result = ExtendedTestResult()
+        tag_collapser = subunit.test_results.TimeCollapsingDecorator(result)
+        a_time = self.make_time()
+        tag_collapser.time(a_time)
+        tag_collapser.startTest(subunit.RemotedTestCase('foo'))
+        self.assertEquals([('time', a_time)], result._events[:-1])
+
+    def test_duplicate_times_not_sent(self):
+        # Many time events with the exact same time are collapsed into one
+        # time event.
+        result = ExtendedTestResult()
+        tag_collapser = subunit.test_results.TimeCollapsingDecorator(result)
+        a_time = self.make_time()
+        for i in range(5):
+            tag_collapser.time(a_time)
+        tag_collapser.startTest(subunit.RemotedTestCase('foo'))
+        self.assertEquals([('time', a_time)], result._events[:-1])
+
+    def test_no_times_inserted(self):
+        result = ExtendedTestResult()
+        tag_collapser = subunit.test_results.TimeCollapsingDecorator(result)
+        a_time = self.make_time()
+        tag_collapser.time(a_time)
+        foo = subunit.RemotedTestCase('foo')
+        tag_collapser.startTest(foo)
+        tag_collapser.addSuccess(foo)
+        tag_collapser.stopTest(foo)
+        self.assertEquals(
+            [('time', a_time),
+             ('startTest', foo),
+             ('addSuccess', foo),
+             ('stopTest', foo)], result._events)
+
+
+class TestByTestResultTests(testtools.TestCase):
+
+    def setUp(self):
+        super(TestByTestResultTests, self).setUp()
+        self.log = []
+        self.result = subunit.test_results.TestByTestResult(self.on_test)
+        if sys.version_info >= (3, 0):
+            self.result._now = iter(range(5)).__next__
+        else:
+            self.result._now = iter(range(5)).next
+
+    def assertCalled(self, **kwargs):
+        defaults = {
+            'test': self,
+            'tags': set(),
+            'details': None,
+            'start_time': 0,
+            'stop_time': 1,
+            }
+        defaults.update(kwargs)
+        self.assertEqual([defaults], self.log)
+
+    def on_test(self, **kwargs):
+        self.log.append(kwargs)
+
+    def test_no_tests_nothing_reported(self):
+        self.result.startTestRun()
+        self.result.stopTestRun()
+        self.assertEqual([], self.log)
+
+    def test_add_success(self):
+        self.result.startTest(self)
+        self.result.addSuccess(self)
+        self.result.stopTest(self)
+        self.assertCalled(status='success')
+
+    def test_add_success_details(self):
+        self.result.startTest(self)
+        details = {'foo': 'bar'}
+        self.result.addSuccess(self, details=details)
+        self.result.stopTest(self)
+        self.assertCalled(status='success', details=details)
+
+    def test_tags(self):
+        if not getattr(self.result, 'tags', None):
+            self.skipTest("No tags in testtools")
+        self.result.tags(['foo'], [])
+        self.result.startTest(self)
+        self.result.addSuccess(self)
+        self.result.stopTest(self)
+        self.assertCalled(status='success', tags=set(['foo']))
+
+    def test_add_error(self):
+        self.result.startTest(self)
+        try:
+            1/0
+        except ZeroDivisionError:
+            error = sys.exc_info()
+        self.result.addError(self, error)
+        self.result.stopTest(self)
+        self.assertCalled(
+            status='error',
+            details={'traceback': TracebackContent(error, self)})
+
+    def test_add_error_details(self):
+        self.result.startTest(self)
+        details = {"foo": text_content("bar")}
+        self.result.addError(self, details=details)
+        self.result.stopTest(self)
+        self.assertCalled(status='error', details=details)
+
+    def test_add_failure(self):
+        self.result.startTest(self)
+        try:
+            self.fail("intentional failure")
+        except self.failureException:
+            failure = sys.exc_info()
+        self.result.addFailure(self, failure)
+        self.result.stopTest(self)
+        self.assertCalled(
+            status='failure',
+            details={'traceback': TracebackContent(failure, self)})
+
+    def test_add_failure_details(self):
+        self.result.startTest(self)
+        details = {"foo": text_content("bar")}
+        self.result.addFailure(self, details=details)
+        self.result.stopTest(self)
+        self.assertCalled(status='failure', details=details)
+
+    def test_add_xfail(self):
+        self.result.startTest(self)
+        try:
+            1/0
+        except ZeroDivisionError:
+            error = sys.exc_info()
+        self.result.addExpectedFailure(self, error)
+        self.result.stopTest(self)
+        self.assertCalled(
+            status='xfail',
+            details={'traceback': TracebackContent(error, self)})
+
+    def test_add_xfail_details(self):
+        self.result.startTest(self)
+        details = {"foo": text_content("bar")}
+        self.result.addExpectedFailure(self, details=details)
+        self.result.stopTest(self)
+        self.assertCalled(status='xfail', details=details)
+
+    def test_add_unexpected_success(self):
+        self.result.startTest(self)
+        details = {'foo': 'bar'}
+        self.result.addUnexpectedSuccess(self, details=details)
+        self.result.stopTest(self)
+        self.assertCalled(status='success', details=details)
+
+    def test_add_skip_reason(self):
+        self.result.startTest(self)
+        reason = self.getUniqueString()
+        self.result.addSkip(self, reason)
+        self.result.stopTest(self)
+        self.assertCalled(
+            status='skip', details={'reason': text_content(reason)})
+
+    def test_add_skip_details(self):
+        self.result.startTest(self)
+        details = {'foo': 'bar'}
+        self.result.addSkip(self, details=details)
+        self.result.stopTest(self)
+        self.assertCalled(status='skip', details=details)
+
+    def test_twice(self):
+        self.result.startTest(self)
+        self.result.addSuccess(self, details={'foo': 'bar'})
+        self.result.stopTest(self)
+        self.result.startTest(self)
+        self.result.addSuccess(self)
+        self.result.stopTest(self)
+        self.assertEqual(
+            [{'test': self,
+              'status': 'success',
+              'start_time': 0,
+              'stop_time': 1,
+              'tags': set(),
+              'details': {'foo': 'bar'}},
+             {'test': self,
+              'status': 'success',
+              'start_time': 2,
+              'stop_time': 3,
+              'tags': set(),
+              'details': None},
+             ],
+            self.log)
+
+
+class TestCsvResult(testtools.TestCase):
+
+    def parse_stream(self, stream):
+        stream.seek(0)
+        reader = csv.reader(stream)
+        return list(reader)
+
+    def test_csv_output(self):
+        stream = StringIO()
+        result = subunit.test_results.CsvResult(stream)
+        if sys.version_info >= (3, 0):
+            result._now = iter(range(5)).__next__
+        else:
+            result._now = iter(range(5)).next
+        result.startTestRun()
+        result.startTest(self)
+        result.addSuccess(self)
+        result.stopTest(self)
+        result.stopTestRun()
+        self.assertEqual(
+            [['test', 'status', 'start_time', 'stop_time'],
+             [self.id(), 'success', '0', '1'],
+             ],
+            self.parse_stream(stream))
+
+    def test_just_header_when_no_tests(self):
+        stream = StringIO()
+        result = subunit.test_results.CsvResult(stream)
+        result.startTestRun()
+        result.stopTestRun()
+        self.assertEqual(
+            [['test', 'status', 'start_time', 'stop_time']],
+            self.parse_stream(stream))
+
+    def test_no_output_before_events(self):
+        stream = StringIO()
+        subunit.test_results.CsvResult(stream)
+        self.assertEqual([], self.parse_stream(stream))
+
+
 def test_suite():
     loader = subunit.tests.TestUtil.TestLoader()
     result = loader.loadTestsFromName(__name__)