Allow customisation of argument parser class used, so we can write failing tests...
[third_party/subunit] / python / subunit / tests / test_output_filter.py
1 #
2 #  subunit: extensions to python unittest to get test results from subprocesses.
3 #  Copyright (C) 2005  Thomi Richards <thomi.richards@canonical.com>
4 #
5 #  Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
6 #  license at the users choice. A copy of both licenses are available in the
7 #  project source as Apache-2.0 and BSD. You may not use this file except in
8 #  compliance with one of these two licences.
9 #
10 #  Unless required by applicable law or agreed to in writing, software
11 #  distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
12 #  WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
13 #  license you chose for the specific language governing permissions and
14 #  limitations under that license.
15 #
16
17
18 import argparse
19 from collections import namedtuple
20 import datetime
21 from functools import partial
22 from io import BytesIO
23 from testtools import TestCase
24 from testtools.matchers import (
25     Equals,
26     Matcher,
27     Mismatch,
28 )
29 from testtools.testresult.doubles import StreamResult
30
31 from subunit.v2 import StreamResultToBytes, ByteStreamToStreamResult
32 from subunit._output import (
33     generate_bytestream,
34     parse_arguments,
35     translate_command_name,
36     utc,
37 )
38 import subunit._output as _o
39
40
class SafeArgumentParser(argparse.ArgumentParser):
    """An ArgumentParser that raises instead of terminating the process.

    Overriding ``exit`` lets tests observe a parser failure as an ordinary
    exception rather than having the test run itself exit.
    """

    def exit(self, status=0, message=""):
        """Raise RuntimeError describing the exit the parser asked for.

        :param status: the exit status the parser requested.
        :param message: the message the parser wanted to emit on exit.
        :raises RuntimeError: always; the text carries both arguments.
        """
        raise RuntimeError(
            "ArgumentParser requested to exit with status %d and message %r"
            % (status, message)
        )
46
47
# parse_arguments pre-bound to SafeArgumentParser, so that a parser-requested
# exit surfaces in tests as a RuntimeError.
safe_parse_arguments = partial(
    parse_arguments,
    ParserClass=SafeArgumentParser,
)
49
50
class OutputFilterArgumentTests(TestCase):

    """Tests for the command line argument parser."""

    _all_supported_commands = ('start', 'pass', 'fail', 'skip', 'exists')

    def _test_command(self, command, test_id):
        """Parse ``[command, test_id]`` and check both come back unchanged."""
        parsed = safe_parse_arguments(args=[command, test_id])

        self.assertThat(parsed.action, Equals(command))
        self.assertThat(parsed.test_id, Equals(test_id))

    def test_can_parse_all_commands_with_test_id(self):
        """Every supported command is accepted together with a test id."""
        for supported in self._all_supported_commands:
            self._test_command(supported, self.getUniqueString())

    def test_command_translation(self):
        """'start' and 'pass' are renamed; the other commands map to themselves."""
        expected_translations = (
            ('start', 'inprogress'),
            ('pass', 'success'),
            ('fail', 'fail'),
            ('skip', 'skip'),
            ('exists', 'exists'),
        )
        for command, translated in expected_translations:
            self.assertThat(translate_command_name(command), Equals(translated))

    def test_all_commands_parse_file_attachment(self):
        """Every supported command accepts the --attach-file option."""
        for supported in self._all_supported_commands:
            argv = [supported, 'foo', '--attach-file', '/some/path']
            parsed = safe_parse_arguments(args=argv)
            self.assertThat(parsed.attach_file, Equals('/some/path'))
79
80
class ByteStreamCompatibilityTests(TestCase):

    """Check the status event generated for each supported command."""

    _dummy_timestamp = datetime.datetime(2013, 1, 1, 0, 0, 0, 0, utc)

    def setUp(self):
        super(ByteStreamCompatibilityTests, self).setUp()
        # Replace create_timestamp with a fixed value so the expected events
        # below can name the exact timestamp.
        self.patch(_o, 'create_timestamp', lambda: self._dummy_timestamp)

    def _get_result_for(self, *commands):
        """Return a StreamResult built from the given command lists.

        Each element of *commands is an argument list, parsed as though it
        had been given on the command line and passed to
        'generate_bytestream' from subunit._output. All commands write into
        one shared byte buffer, which is then read back through
        ByteStreamToStreamResult into the returned result object.

        """
        buffer = BytesIO()

        for argv in commands:
            parsed = safe_parse_arguments(argv)
            writer = StreamResultToBytes(output_stream=buffer)
            generate_bytestream(parsed, writer)

        # Rewind so that decoding starts from the first written byte.
        buffer.seek(0)

        result = StreamResult()
        ByteStreamToStreamResult(source=buffer).run(result)
        return result

    def _assert_first_event_status(self, command, expected_status):
        """Run ``command foo`` and check the first recorded event.

        The event must be a 'status' call for test id 'foo' carrying
        expected_status and the fixed dummy timestamp.
        """
        result = self._get_result_for(
            [command, 'foo'],
        )

        self.assertThat(
            result._events[0],
            MatchesCall(
                call='status',
                test_id='foo',
                test_status=expected_status,
                timestamp=self._dummy_timestamp,
            )
        )

    def test_start_generates_inprogress(self):
        self._assert_first_event_status('start', 'inprogress')

    def test_pass_generates_success(self):
        self._assert_first_event_status('pass', 'success')

    def test_fail_generates_fail(self):
        self._assert_first_event_status('fail', 'fail')

    def test_skip_generates_skip(self):
        self._assert_first_event_status('skip', 'skip')

    def test_exists_generates_exists(self):
        self._assert_first_event_status('exists', 'exists')
186
187
class MatchesCall(Matcher):
    """Match selected fields of a recorded call tuple.

    Keyword arguments name the fields to compare; every field that is not
    named is ignored. In this module the tuples being matched are entries of
    ``StreamResult._events``.
    """

    # Index of each named field within the call tuple.
    _position_lookup = {
            'call': 0,
            'test_id': 1,
            'test_status': 2,
            'test_tags': 3,
            'runnable': 4,
            'file_name': 5,
            'file_bytes': 6,
            'eof': 7,
            'mime_type': 8,
            'route_code': 9,
            'timestamp': 10,
        }

    def __init__(self, **kwargs):
        """Store the expected field values.

        :param kwargs: expected value for each field, keyed by field name.
        :raises ValueError: if a keyword is not a known field name.
        """
        # Build a real list: on Python 3 a filter() object is always truthy,
        # which would make this check reject every call, valid or not.
        unknown_kwargs = [k for k in kwargs if k not in self._position_lookup]
        if unknown_kwargs:
            raise ValueError("Unknown keywords: %s" % ','.join(unknown_kwargs))
        self._filters = kwargs

    def match(self, call_tuple):
        """Compare call_tuple against the expected field values.

        :param call_tuple: indexable sequence holding the recorded call.
        :return: None when every expected field matches, otherwise a
            Mismatch for the first field that differs or is missing.
        """
        for k, v in self._filters.items():
            try:
                actual = call_tuple[self._position_lookup[k]]
            except IndexError:
                # The tuple is too short to contain this field at all.
                return Mismatch("Key %s is not present." % k)
            if actual != v:
                # Report the observed value, not its position in the tuple.
                return Mismatch("Value for key is %r, not %r" % (actual, v))
        return None

    def __str__(self):
        return "<MatchesCall %r>" % self._filters
223