2 # subunit: extensions to python unittest to get test results from subprocesses.
3 # Copyright (C) 2005 Thomi Richards <thomi.richards@canonical.com>
5 # Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
6 # license at the users choice. A copy of both licenses are available in the
7 # project source as Apache-2.0 and BSD. You may not use this file except in
8 # compliance with one of these two licences.
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
12 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 # license you chose for the specific language governing permissions and
14 # limitations under that license.
19 from collections import namedtuple
21 from functools import partial
22 from io import BytesIO
23 from testtools import TestCase
24 from testtools.matchers import (
29 from testtools.testresult.doubles import StreamResult
31 from subunit.v2 import StreamResultToBytes, ByteStreamToStreamResult
32 from subunit._output import (
35 translate_command_name,
38 import subunit._output as _o
class SafeArgumentParser(argparse.ArgumentParser):
    """ArgumentParser whose ``exit`` raises instead of leaving the process.

    Any request to exit is reported as a RuntimeError carrying the
    requested status and message, so a test can observe it.
    """

    def exit(self, status=0, message=""):
        """Raise RuntimeError describing the requested exit.

        :param status: Exit status the parser asked for.
        :param message: Message the parser wanted to emit on exit.
        :raises RuntimeError: Always.
        """
        details = (status, message)
        template = (
            "ArgumentParser requested to exit with status  %d and message %r"
        )
        raise RuntimeError(template % details)
# parse_arguments with SafeArgumentParser pre-bound as its ParserClass keyword,
# so a parser exit shows up as a RuntimeError rather than ending the test run.
# NOTE(review): assumes parse_arguments builds its parser from ParserClass --
# confirm against subunit._output.
safe_parse_arguments = partial(parse_arguments, ParserClass=SafeArgumentParser)
class OutputFilterArgumentTests(TestCase):

    """Checks that command line arguments parse into the expected values."""

    _all_supported_commands = ('start', 'pass', 'fail', 'skip', 'exists')

    def _test_command(self, command, test_id):
        """Parse ``[command, test_id]`` and check both values come back."""
        parsed = safe_parse_arguments(args=[command, test_id])

        self.assertThat(parsed.action, Equals(command))
        self.assertThat(parsed.test_id, Equals(test_id))

    def test_can_parse_all_commands_with_test_id(self):
        # Each supported command is parsed together with a fresh unique id.
        for name in self._all_supported_commands:
            unique_id = self.getUniqueString()
            self._test_command(name, unique_id)

    def test_command_translation(self):
        # 'start' and 'pass' are renamed; the remaining commands map to
        # themselves.
        expected_translations = (
            ('start', 'inprogress'),
            ('pass', 'success'),
            ('fail', 'fail'),
            ('skip', 'skip'),
            ('exists', 'exists'),
        )
        for name, translated in expected_translations:
            self.assertThat(translate_command_name(name), Equals(translated))

    def test_all_commands_parse_file_attachment(self):
        # --attach-file is accepted after any supported command.
        for name in self._all_supported_commands:
            argv = [name, 'foo', '--attach-file', '/some/path']
            parsed = safe_parse_arguments(args=argv)
            self.assertThat(parsed.attach_file, Equals('/some/path'))
81 class ByteStreamCompatibilityTests(TestCase):
83 _dummy_timestamp = datetime.datetime(2013, 1, 1, 0, 0, 0, 0, utc)
86 super(ByteStreamCompatibilityTests, self).setUp()
87 self.patch(_o, 'create_timestamp', lambda: self._dummy_timestamp)
89 def _get_result_for(self, *commands):
90 """Get a result object from *commands.
92 Runs the 'generate_bytestream' function from subunit._output after
93 parsing *commands as if they were specified on the command line. The
94 resulting bytestream is then converted back into a result object and
100 for command_list in commands:
101 args = safe_parse_arguments(command_list)
102 output_writer = StreamResultToBytes(output_stream=stream)
103 generate_bytestream(args, output_writer)
107 case = ByteStreamToStreamResult(source=stream)
108 result = StreamResult()
112 def test_start_generates_inprogress(self):
113 result = self._get_result_for(
122 test_status='inprogress',
123 timestamp=self._dummy_timestamp,
127 def test_pass_generates_success(self):
128 result = self._get_result_for(
137 test_status='success',
138 timestamp=self._dummy_timestamp,
142 def test_fail_generates_fail(self):
143 result = self._get_result_for(
153 timestamp=self._dummy_timestamp,
157 def test_skip_generates_skip(self):
158 result = self._get_result_for(
168 timestamp=self._dummy_timestamp,
172 def test_exists_generates_exists(self):
173 result = self._get_result_for(
182 test_status='exists',
183 timestamp=self._dummy_timestamp,
188 class MatchesCall(Matcher):
def __init__(self, **kwargs):
    """Store the keyword filters that a call tuple has to satisfy.

    :param kwargs: Expected values, keyed by names that appear in
        ``self._position_lookup``.
    :raises ValueError: If any keyword is not in ``self._position_lookup``.
    """
    # Build a concrete list: on Python 3 ``filter`` returns a lazy iterator
    # that is always truthy, so testing it would raise for valid keywords.
    unknown_kwargs = [k for k in kwargs if k not in self._position_lookup]
    if unknown_kwargs:
        raise ValueError("Unknown keywords: %s" % ','.join(unknown_kwargs))
    self._filters = kwargs
def match(self, call_tuple):
    """Compare *call_tuple* against the stored filters.

    :param call_tuple: Indexable call record; the value for each filter key
        is read at index ``self._position_lookup[key]``.
    :return: A Mismatch for the first filter that fails. Falls through
        (returning None) when every filter is satisfied.
    """
    for k, v in self._filters.items():
        try:
            actual = call_tuple[self._position_lookup[k]]
        except IndexError:
            # The call tuple is too short to hold this key's position.
            return Mismatch("Key %s is not present." % k)
        if actual != v:
            # Report the value that was found, not its index in the tuple.
            return Mismatch("Value for key is %r, not %r" % (actual, v))
222 return "<MatchesCall %r>" % self._filters