673f89d6a560f6e1a1e19e33d9683f6273431ae2
[third_party/subunit] / python / subunit / tests / test_output_filter.py
1 #
2 #  subunit: extensions to python unittest to get test results from subprocesses.
3 #  Copyright (C) 2013 Subunit Contributors
4 #
5 #  Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
6 #  license at the users choice. A copy of both licenses are available in the
7 #  project source as Apache-2.0 and BSD. You may not use this file except in
8 #  compliance with one of these two licences.
9 #
10 #  Unless required by applicable law or agreed to in writing, software
11 #  distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
12 #  WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
13 #  license you chose for the specific language governing permissions and
14 #  limitations under that license.
15 #
16
17 import datetime
18 from functools import partial
19 from io import BytesIO, StringIO, TextIOWrapper
20 import optparse
21 import sys
22 from tempfile import NamedTemporaryFile
23
24 from contextlib import contextmanager
25 from testscenarios import WithScenarios
26 from testtools import TestCase
27 from testtools.compat import _u
28 from testtools.matchers import (
29     Equals,
30     Matcher,
31     MatchesListwise,
32     Mismatch,
33     raises,
34 )
35 from testtools.testresult.doubles import StreamResult
36
37 from subunit.iso8601 import UTC
38 from subunit.v2 import StreamResultToBytes, ByteStreamToStreamResult
39 from subunit._output import (
40     _ALL_ACTIONS,
41     _FINAL_ACTIONS,
42     generate_stream_results,
43     parse_arguments,
44 )
45 import subunit._output as _o
46
47
class SafeOptionParser(optparse.OptionParser):
    """An OptionParser subclass whose exit() raises instead of exiting.

    The message that would have accompanied the exit is carried by the
    RuntimeError, so callers can inspect it.
    """

    def exit(self, status=0, message=""):
        # 'status' is accepted to keep the call signature but is not used.
        failure = RuntimeError(message)
        raise failure
53
54
# parse_arguments pre-bound to SafeOptionParser, so a parser exit is raised
# as RuntimeError (see SafeOptionParser.exit) and can be asserted on.
safe_parse_arguments = partial(parse_arguments, ParserClass=SafeOptionParser)
56
57
class TestCaseWithPatchedStderr(TestCase):
    """TestCase that patches the 'stderr' attribute of optparse.sys.

    The replacement is an in-memory buffer, kept on ``self._stderr``: a
    StringIO when the interpreter's major version is above 2, otherwise a
    BytesIO.
    """

    def setUp(self):
        super(TestCaseWithPatchedStderr, self).setUp()
        # prevent OptionParser from printing to stderr:
        # Compare the numeric major version. The string form
        # ``sys.version[0] > '2'`` only inspects the first character and so
        # misclassifies any two-digit major version.
        if sys.version_info[0] > 2:
            self._stderr = StringIO()
        else:
            self._stderr = BytesIO()
        self.patch(optparse.sys, 'stderr', self._stderr)
68
69
class TestStatusArgParserTests(WithScenarios, TestCaseWithPatchedStderr):
    """Argument-parsing checks repeated for every action in _ALL_ACTIONS.

    Each scenario supplies ``command`` (the action name) and ``option`` (the
    same name prefixed with ``--``).
    """

    scenarios = [
        (cmd, {'command': cmd, 'option': '--' + cmd})
        for cmd in _ALL_ACTIONS
    ]

    def test_can_parse_all_commands_with_test_id(self):
        unique_id = self.getUniqueString()
        parsed = safe_parse_arguments(args=[self.option, unique_id])

        self.assertThat(parsed.action, Equals(self.command))
        self.assertThat(parsed.test_id, Equals(unique_id))

    def test_all_commands_parse_file_attachment(self):
        with NamedTemporaryFile() as attachment:
            argv = [self.option, 'foo', '--attach-file', attachment.name]
            parsed = safe_parse_arguments(args=argv)
            self.assertThat(parsed.attach_file.name, Equals(attachment.name))

    def test_all_commands_accept_mimetype_argument(self):
        with NamedTemporaryFile() as attachment:
            argv = [
                self.option, 'foo',
                '--attach-file', attachment.name,
                '--mimetype', "text/plain",
            ]
            parsed = safe_parse_arguments(args=argv)
            self.assertThat(parsed.mimetype, Equals("text/plain"))

    def test_all_commands_accept_file_name_argument(self):
        with NamedTemporaryFile() as attachment:
            argv = [
                self.option, 'foo',
                '--attach-file', attachment.name,
                '--file-name', "foo",
            ]
            parsed = safe_parse_arguments(args=argv)
            self.assertThat(parsed.file_name, Equals("foo"))

    def test_all_commands_accept_tags_argument(self):
        argv = [
            self.option, 'foo',
            '--tag', "foo",
            "--tag", "bar",
            "--tag", "baz",
        ]
        parsed = safe_parse_arguments(args=argv)
        self.assertThat(parsed.tags, Equals(["foo", "bar", "baz"]))

    def test_attach_file_with_hyphen_opens_stdin(self):
        # Stand-in stdin: a text wrapper over a bytes buffer holding b"Hello".
        fake_stdin = TextIOWrapper(BytesIO(b"Hello"))
        self.patch(_o.sys, 'stdin', fake_stdin)
        argv = [self.option, "foo", "--attach-file", "-"]
        parsed = safe_parse_arguments(args=argv)

        self.assertThat(parsed.attach_file.read(), Equals(b"Hello"))

    def test_attach_file_with_hyphen_sets_filename_to_stdin(self):
        argv = [self.option, "foo", "--attach-file", "-"]
        parsed = safe_parse_arguments(args=argv)

        self.assertThat(parsed.file_name, Equals("stdin"))

    def test_can_override_stdin_filename(self):
        argv = [self.option, "foo", "--attach-file", "-", '--file-name', 'foo']
        parsed = safe_parse_arguments(args=argv)

        self.assertThat(parsed.file_name, Equals("foo"))

    def test_requires_test_id(self):
        # The option is given without the TEST_ID it requires.
        expected_error = RuntimeError(
            'subunit-output: error: argument %s: must '
            'specify a single TEST_ID.\n' % self.option
        )
        self.assertThat(
            lambda: safe_parse_arguments(args=[self.option]),
            raises(expected_error)
        )
139
140
class ArgParserTests(TestCaseWithPatchedStderr):
    """Argument-parsing checks that are not repeated per status action."""

    def _assert_parse_error(self, argv, message):
        """Assert that parsing 'argv' raises RuntimeError(message)."""
        self.assertThat(
            lambda: safe_parse_arguments(argv),
            raises(RuntimeError(message))
        )

    def test_can_parse_attach_file_without_test_id(self):
        with NamedTemporaryFile() as tmp_file:
            args = safe_parse_arguments(
                args=["--attach-file", tmp_file.name]
            )
            self.assertThat(args.attach_file.name, Equals(tmp_file.name))

    def test_can_run_without_args(self):
        # Passes as long as parsing an empty argument list does not raise;
        # the parsed result itself is not inspected.
        safe_parse_arguments([])

    def test_cannot_specify_more_than_one_status_command(self):
        self._assert_parse_error(
            ['--fail', 'foo', '--skip', 'bar'],
            'subunit-output: error: argument --skip: '
            'Only one status may be specified at once.\n'
        )

    def test_cannot_specify_mimetype_without_attach_file(self):
        self._assert_parse_error(
            ['--mimetype', 'foo'],
            'subunit-output: error: Cannot specify '
            '--mimetype without --attach-file\n'
        )

    def test_cannot_specify_filename_without_attach_file(self):
        self._assert_parse_error(
            ['--file-name', 'foo'],
            'subunit-output: error: Cannot specify '
            '--file-name without --attach-file\n'
        )

    def test_can_specify_tags_without_status_command(self):
        args = safe_parse_arguments(['--tag', 'foo'])
        self.assertEqual(['foo'], args.tags)

    def test_must_specify_tags_with_tags_options(self):
        # '--tag' is the last argument, so its value is missing.
        self._assert_parse_error(
            ['--fail', 'foo', '--tag'],
            'subunit-output: error: --tag option requires 1 argument\n'
        )
186         )
187
188
def get_result_for(commands):
    """Round-trip *commands through subunit._output and back to a result.

    *commands is parsed with safe_parse_arguments as though it had been given
    on the command line, 'generate_stream_results' writes the corresponding
    byte stream into an in-memory buffer, and that buffer is then decoded
    into a StreamResult double, which is returned.
    """
    raw = BytesIO()
    parsed_args = safe_parse_arguments(commands)
    generate_stream_results(parsed_args, StreamResultToBytes(output_stream=raw))

    # Rewind so decoding starts from the first byte that was written.
    raw.seek(0)

    decoder = ByteStreamToStreamResult(source=raw)
    decoded = StreamResult()
    decoder.run(decoded)
    return decoded
209
210
@contextmanager
def temp_file_contents(data):
    """Yield a named temporary file on disk that holds 'data'.

    The file is positioned back at offset 0 before it is yielded, and it is
    closed when the context exits.
    """
    with NamedTemporaryFile() as handle:
        handle.write(data)
        # Rewind so that reads through the yielded object start at the top.
        handle.seek(0)
        yield handle
218
219
class StatusStreamResultTests(WithScenarios, TestCase):
    """Checks on the packets generated for each action in _ALL_ACTIONS.

    Every scenario provides ``status`` (the action name) and ``option`` (the
    same name prefixed with ``--``). setUp patches 'create_timestamp' in
    subunit._output to return a fixed value so timestamps can be compared
    exactly.
    """

    scenarios = [
        (s, {'status': s, 'option': '--' + s}) for s in _ALL_ACTIONS
    ]

    _dummy_timestamp = datetime.datetime(2013, 1, 1, 0, 0, 0, 0, UTC)

    def setUp(self):
        super(StatusStreamResultTests, self).setUp()
        self.patch(_o, 'create_timestamp', lambda: self._dummy_timestamp)
        self.test_id = self.getUniqueString()

    # -- helpers (not collected as tests) ---------------------------------

    def _status_events(self, *extra_args):
        """Return the events for this scenario's option, test id and extras."""
        argv = [self.option, self.test_id] + list(extra_args)
        return get_result_for(argv)._events

    def _attachment_events(self, path, *extra_args):
        """Like _status_events, with '--attach-file path' placed first."""
        return self._status_events('--attach-file', path, *extra_args)

    def _assert_events(self, events, *matchers):
        """Assert that 'events' matches 'matchers' one-to-one, in order."""
        self.assertThat(events, MatchesListwise(list(matchers)))

    # -- tests --------------------------------------------------------------

    def test_only_one_packet_is_generated(self):
        events = self._status_events()
        self.assertThat(len(events), Equals(1))

    def test_correct_status_is_generated(self):
        events = self._status_events()

        self.assertThat(
            events[0],
            MatchesStatusCall(test_status=self.status)
        )

    def test_all_commands_generate_tags(self):
        events = self._status_events('--tag', 'hello', '--tag', 'world')
        self.assertThat(
            events[0],
            MatchesStatusCall(test_tags=set(['hello', 'world']))
        )

    def test_all_commands_generate_timestamp(self):
        events = self._status_events()

        self.assertThat(
            events[0],
            MatchesStatusCall(timestamp=self._dummy_timestamp)
        )

    def test_all_commands_generate_correct_test_id(self):
        events = self._status_events()

        self.assertThat(
            events[0],
            MatchesStatusCall(test_id=self.test_id)
        )

    def test_file_is_sent_in_single_packet(self):
        with temp_file_contents(b"Hello") as attachment:
            events = self._attachment_events(attachment.name)

            self._assert_events(
                events,
                MatchesStatusCall(file_bytes=b'Hello', eof=True),
            )

    def test_can_read_binary_files(self):
        payload = b"\xDE\xAD\xBE\xEF"
        with temp_file_contents(payload) as attachment:
            events = self._attachment_events(attachment.name)

            self._assert_events(
                events,
                MatchesStatusCall(file_bytes=b"\xDE\xAD\xBE\xEF", eof=True),
            )

    def test_can_read_empty_files(self):
        with temp_file_contents(b"") as attachment:
            events = self._attachment_events(attachment.name)

            self._assert_events(
                events,
                MatchesStatusCall(
                    file_bytes=b"", file_name=attachment.name, eof=True),
            )

    def test_can_read_stdin(self):
        # Stand-in stdin: a text wrapper over a buffer of non-text bytes.
        fake_stdin = TextIOWrapper(BytesIO(b"\xFE\xED\xFA\xCE"))
        self.patch(_o.sys, 'stdin', fake_stdin)
        events = self._attachment_events('-')

        self._assert_events(
            events,
            MatchesStatusCall(
                file_bytes=b"\xFE\xED\xFA\xCE", file_name='stdin', eof=True),
        )

    def test_file_is_sent_with_test_id(self):
        with temp_file_contents(b"Hello") as attachment:
            events = self._attachment_events(attachment.name)

            self._assert_events(
                events,
                MatchesStatusCall(
                    test_id=self.test_id, file_bytes=b'Hello', eof=True),
            )

    def test_file_is_sent_with_test_status(self):
        with temp_file_contents(b"Hello") as attachment:
            events = self._attachment_events(attachment.name)

            self._assert_events(
                events,
                MatchesStatusCall(
                    test_status=self.status, file_bytes=b'Hello', eof=True),
            )

    def test_file_chunk_size_is_honored(self):
        with temp_file_contents(b"Hello") as attachment:
            # One byte per chunk: five bytes should give five packets.
            self.patch(_o, '_CHUNK_SIZE', 1)
            events = self._attachment_events(attachment.name)

            self._assert_events(
                events,
                MatchesStatusCall(test_id=self.test_id, file_bytes=b'H', eof=False),
                MatchesStatusCall(test_id=self.test_id, file_bytes=b'e', eof=False),
                MatchesStatusCall(test_id=self.test_id, file_bytes=b'l', eof=False),
                MatchesStatusCall(test_id=self.test_id, file_bytes=b'l', eof=False),
                MatchesStatusCall(test_id=self.test_id, file_bytes=b'o', eof=True),
            )

    def test_file_mimetype_specified_once_only(self):
        with temp_file_contents(b"Hi") as attachment:
            self.patch(_o, '_CHUNK_SIZE', 1)
            events = self._attachment_events(
                attachment.name, '--mimetype', 'text/plain')

            self._assert_events(
                events,
                MatchesStatusCall(
                    test_id=self.test_id, mime_type='text/plain',
                    file_bytes=b'H', eof=False),
                MatchesStatusCall(
                    test_id=self.test_id, mime_type=None,
                    file_bytes=b'i', eof=True),
            )

    def test_tags_specified_once_only(self):
        with temp_file_contents(b"Hi") as attachment:
            self.patch(_o, '_CHUNK_SIZE', 1)
            events = self._attachment_events(
                attachment.name, '--tag', 'foo', '--tag', 'bar')

            self._assert_events(
                events,
                MatchesStatusCall(
                    test_id=self.test_id, test_tags=set(['foo', 'bar'])),
                MatchesStatusCall(test_id=self.test_id, test_tags=None),
            )

    def test_timestamp_specified_once_only(self):
        with temp_file_contents(b"Hi") as attachment:
            self.patch(_o, '_CHUNK_SIZE', 1)
            events = self._attachment_events(attachment.name)

            self._assert_events(
                events,
                MatchesStatusCall(
                    test_id=self.test_id, timestamp=self._dummy_timestamp),
                MatchesStatusCall(test_id=self.test_id, timestamp=None),
            )

    def test_test_status_specified_once_only(self):
        with temp_file_contents(b"Hi") as attachment:
            self.patch(_o, '_CHUNK_SIZE', 1)
            events = self._attachment_events(attachment.name)

            with_status = MatchesStatusCall(
                test_id=self.test_id, test_status=self.status)
            without_status = MatchesStatusCall(
                test_id=self.test_id, test_status=None)

            # 'inprogress' status should be on the first packet only, all other
            # statuses should be on the last packet.
            if self.status in _FINAL_ACTIONS:
                expected = [without_status, with_status]
            else:
                expected = [with_status, without_status]
            self.assertThat(events, MatchesListwise(expected))

    def test_filename_can_be_overridden(self):
        with temp_file_contents(b"Hello") as attachment:
            specified_file_name = self.getUniqueString()
            events = self._attachment_events(
                attachment.name, '--file-name', specified_file_name)

            self._assert_events(
                events,
                MatchesStatusCall(
                    file_name=specified_file_name, file_bytes=b'Hello'),
            )

    def test_file_name_is_used_by_default(self):
        with temp_file_contents(b"Hello") as attachment:
            events = self._attachment_events(attachment.name)

            self._assert_events(
                events,
                MatchesStatusCall(
                    file_name=attachment.name, file_bytes=b'Hello', eof=True),
            )
462                 ])
463             )
464
465
class FileDataTests(TestCase):
    """Checks on the packets generated when no status option is given."""

    def test_can_attach_file_without_test_id(self):
        with temp_file_contents(b"Hello") as f:
            result = get_result_for(['--attach-file', f.name])

            self.assertThat(
                result._events,
                MatchesListwise([
                    MatchesStatusCall(test_id=None, file_bytes=b'Hello', eof=True),
                ])
            )

    def test_file_name_is_used_by_default(self):
        with temp_file_contents(b"Hello") as f:
            result = get_result_for(['--attach-file', f.name])

            self.assertThat(
                result._events,
                MatchesListwise([
                    MatchesStatusCall(file_name=f.name, file_bytes=b'Hello', eof=True),
                ])
            )

    def test_filename_can_be_overridden(self):
        with temp_file_contents(b"Hello") as f:
            specified_file_name = self.getUniqueString()
            result = get_result_for([
                '--attach-file',
                f.name,
                '--file-name',
                specified_file_name
            ])

            self.assertThat(
                result._events,
                MatchesListwise([
                    MatchesStatusCall(file_name=specified_file_name, file_bytes=b'Hello'),
                ])
            )

    def test_files_have_timestamp(self):
        # Pin the timestamp source so the expected value is known exactly.
        fixed_timestamp = datetime.datetime(2013, 1, 1, 0, 0, 0, 0, UTC)
        self.patch(_o, 'create_timestamp', lambda: fixed_timestamp)

        with temp_file_contents(b"Hello") as f:
            result = get_result_for([
                '--attach-file',
                f.name,
            ])

            self.assertThat(
                result._events,
                MatchesListwise([
                    MatchesStatusCall(file_bytes=b'Hello', timestamp=fixed_timestamp),
                ])
            )

    def test_can_specify_tags_without_test_status(self):
        result = get_result_for([
            '--tag',
            'foo',
        ])

        self.assertThat(
            result._events,
            MatchesListwise([
                MatchesStatusCall(test_tags=set(['foo'])),
            ])
        )
535             ])
536         )
537
class MatchesStatusCall(Matcher):
    """Matcher comparing selected positions of a recorded call tuple.

    Keyword arguments name the fields to check; each name is translated to
    an index into the call tuple through ``_position_lookup``. Fields that
    are not named are not compared.
    """

    # Field name -> index within the call tuple.
    _position_lookup = dict(
        (field, index) for index, field in enumerate((
            'call',
            'test_id',
            'test_status',
            'test_tags',
            'runnable',
            'file_name',
            'file_bytes',
            'eof',
            'mime_type',
            'route_code',
            'timestamp',
        ))
    )

    def __init__(self, **kwargs):
        # Reject any keyword that has no known position in the tuple.
        unrecognised = [
            name for name in kwargs if name not in self._position_lookup
        ]
        if unrecognised:
            raise ValueError("Unknown keywords: %s" % ','.join(unrecognised))
        self._filters = kwargs

    def match(self, call_tuple):
        """Return a Mismatch for the first differing field, else None."""
        for key, expected in self._filters.items():
            index = self._position_lookup[key]
            try:
                actual = call_tuple[index]
            except IndexError:
                # The tuple is too short to contain this field.
                return Mismatch("Key %s is not present." % key)
            if actual != expected:
                return Mismatch(
                    "Value for key is %r, not %r" % (actual, expected)
                )
        return None

    def __str__(self):
        description = "<MatchesStatusCall %r>" % self._filters
        return description
574     def __str__(self):
575         return "<MatchesStatusCall %r>" % self._filters