b4c939756f268277e07ff92fff000eed7b69d424
[metze/samba/wip.git] / lib / subunit / python / subunit / __init__.py
1 #
2 #  subunit: extensions to Python unittest to get test results from subprocesses.
3 #  Copyright (C) 2005  Robert Collins <robertc@robertcollins.net>
4 #
5 #  Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
6 #  license at the users choice. A copy of both licenses are available in the
7 #  project source as Apache-2.0 and BSD. You may not use this file except in
8 #  compliance with one of these two licences.
9 #
10 #  Unless required by applicable law or agreed to in writing, software
11 #  distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
12 #  WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
13 #  license you chose for the specific language governing permissions and
14 #  limitations under that license.
15 #
16
17 """Subunit - a streaming test protocol
18
19 Overview
20 ++++++++
21
22 The ``subunit`` Python package provides a number of ``unittest`` extensions
23 which can be used to cause tests to output Subunit, to parse Subunit streams
24 into test activity, perform seamless test isolation within a regular test
25 case and variously sort, filter and report on test runs.
26
27
28 Key Classes
29 -----------
30
31 The ``subunit.TestProtocolClient`` class is a ``unittest.TestResult``
32 extension which will translate a test run into a Subunit stream.
33
34 The ``subunit.ProtocolTestCase`` class is an adapter between the Subunit wire
35 protocol and the ``unittest.TestCase`` object protocol. It is used to translate
36 a stream into a test run, which regular ``unittest.TestResult`` objects can
37 process and report/inspect.
38
39 Subunit has support for non-blocking usage too, for use with asyncore or
40 Twisted. See the ``TestProtocolServer`` parser class for more details.
41
42 Subunit includes extensions to the Python ``TestResult`` protocol. These are
43 all done in a compatible manner: ``TestResult`` objects that do not implement
44 the extension methods will not cause errors to be raised, instead the extension
45 will either lose fidelity (for instance, folding expected failures to success
46 in Python versions < 2.7 or 3.1), or discard the extended data (for extra
47 details, tags, timestamping and progress markers).
48
49 The test outcome methods ``addSuccess``, ``addError``, ``addExpectedFailure``,
50 ``addFailure``, ``addSkip`` take an optional keyword parameter ``details``
51 which can be used instead of the usual python unittest parameter.
52 When used the value of details should be a dict from ``string`` to
53 ``testtools.content.Content`` objects. This is a draft API being worked on with
54 the Python Testing In Python mail list, with the goal of permitting a common
55 way to provide additional data beyond a traceback, such as captured data from
56 disk, logging messages etc. The reference for this API is in testtools (0.9.0
57 and newer).
58
59 The ``tags(new_tags, gone_tags)`` method is called (if present) to add or
60 remove tags in the test run that is currently executing. If called when no
61 test is in progress (that is, if called outside of the ``startTest``,
62 ``stopTest`` pair), the the tags apply to all subsequent tests. If called
63 when a test is in progress, then the tags only apply to that test.
64
65 The ``time(a_datetime)`` method is called (if present) when a ``time:``
66 directive is encountered in a Subunit stream. This is used to tell a TestResult
67 about the time that events in the stream occurred at, to allow reconstructing
68 test timing from a stream.
69
70 The ``progress(offset, whence)`` method controls progress data for a stream.
71 The offset parameter is an int, and whence is one of subunit.PROGRESS_CUR,
72 subunit.PROGRESS_SET, PROGRESS_PUSH, PROGRESS_POP. Push and pop operations
73 ignore the offset parameter.
74
75
76 Python test support
77 -------------------
78
79 ``subunit.run`` is a convenience wrapper to run a Python test suite via
80 the command line, reporting via Subunit::
81
82   $ python -m subunit.run mylib.tests.test_suite
83
84 The ``IsolatedTestSuite`` class is a TestSuite that forks before running its
85 tests, allowing isolation between the test runner and some tests.
86
87 Similarly, ``IsolatedTestCase`` is a base class which can be subclassed to get
88 tests that will fork() before that individual test is run.
89
90 `ExecTestCase`` is a convenience wrapper for running an external
91 program to get a Subunit stream and then report that back to an arbitrary
92 result object::
93
94  class AggregateTests(subunit.ExecTestCase):
95
96      def test_script_one(self):
97          './bin/script_one'
98
99      def test_script_two(self):
100          './bin/script_two'
101
102  # Normally your normal test loading would take of this automatically,
103  # It is only spelt out in detail here for clarity.
104  suite = unittest.TestSuite([AggregateTests("test_script_one"),
105      AggregateTests("test_script_two")])
106  # Create any TestResult class you like.
107  result = unittest._TextTestResult(sys.stdout)
108  # And run your suite as normal, Subunit will exec each external script as
109  # needed and report to your result object.
110  suite.run(result)
111
112 Utility modules
113 ---------------
114
115 * subunit.chunked contains HTTP chunked encoding/decoding logic.
116 * subunit.test_results contains TestResult helper classes.
117 """
118
119 import os
120 import re
121 import subprocess
122 import sys
123 import unittest
124
125 from testtools import content, content_type, ExtendedToOriginalDecorator
126 from testtools.compat import _b, _u, BytesIO, StringIO
127 try:
128     from testtools.testresult.real import _StringException
129     RemoteException = _StringException
130     # For testing: different pythons have different str() implementations.
131     if sys.version_info > (3, 0):
132         _remote_exception_str = "testtools.testresult.real._StringException"
133         _remote_exception_str_chunked = "34\r\n" + _remote_exception_str
134     else:
135         _remote_exception_str = "_StringException" 
136         _remote_exception_str_chunked = "1A\r\n" + _remote_exception_str
137 except ImportError:
138     raise ImportError ("testtools.testresult.real does not contain "
139         "_StringException, check your version.")
140 from testtools import testresult
141
142 from subunit import chunked, details, iso8601, test_results
143
144
145 PROGRESS_SET = 0
146 PROGRESS_CUR = 1
147 PROGRESS_PUSH = 2
148 PROGRESS_POP = 3
149
150
151 def test_suite():
152     import subunit.tests
153     return subunit.tests.test_suite()
154
155
156 def join_dir(base_path, path):
157     """
158     Returns an absolute path to C{path}, calculated relative to the parent
159     of C{base_path}.
160
161     @param base_path: A path to a file or directory.
162     @param path: An absolute path, or a path relative to the containing
163     directory of C{base_path}.
164
165     @return: An absolute path to C{path}.
166     """
167     return os.path.join(os.path.dirname(os.path.abspath(base_path)), path)
168
169
170 def tags_to_new_gone(tags):
171     """Split a list of tags into a new_set and a gone_set."""
172     new_tags = set()
173     gone_tags = set()
174     for tag in tags:
175         if tag[0] == '-':
176             gone_tags.add(tag[1:])
177         else:
178             new_tags.add(tag)
179     return new_tags, gone_tags
180
181
182 class DiscardStream(object):
183     """A filelike object which discards what is written to it."""
184
185     def write(self, bytes):
186         pass
187
188
189 class _ParserState(object):
190     """State for the subunit parser."""
191
192     def __init__(self, parser):
193         self.parser = parser
194         self._test_sym = (_b('test'), _b('testing'))
195         self._colon_sym = _b(':')
196         self._error_sym = (_b('error'),)
197         self._failure_sym = (_b('failure'),)
198         self._progress_sym = (_b('progress'),)
199         self._skip_sym = _b('skip')
200         self._success_sym = (_b('success'), _b('successful'))
201         self._tags_sym = (_b('tags'),)
202         self._time_sym = (_b('time'),)
203         self._xfail_sym = (_b('xfail'),)
204         self._uxsuccess_sym = (_b('uxsuccess'),)
205         self._start_simple = _u(" [")
206         self._start_multipart = _u(" [ multipart")
207
208     def addError(self, offset, line):
209         """An 'error:' directive has been read."""
210         self.parser.stdOutLineReceived(line)
211
212     def addExpectedFail(self, offset, line):
213         """An 'xfail:' directive has been read."""
214         self.parser.stdOutLineReceived(line)
215
216     def addFailure(self, offset, line):
217         """A 'failure:' directive has been read."""
218         self.parser.stdOutLineReceived(line)
219
220     def addSkip(self, offset, line):
221         """A 'skip:' directive has been read."""
222         self.parser.stdOutLineReceived(line)
223
224     def addSuccess(self, offset, line):
225         """A 'success:' directive has been read."""
226         self.parser.stdOutLineReceived(line)
227
228     def lineReceived(self, line):
229         """a line has been received."""
230         parts = line.split(None, 1)
231         if len(parts) == 2 and line.startswith(parts[0]):
232             cmd, rest = parts
233             offset = len(cmd) + 1
234             cmd = cmd.rstrip(self._colon_sym)
235             if cmd in self._test_sym:
236                 self.startTest(offset, line)
237             elif cmd in self._error_sym:
238                 self.addError(offset, line)
239             elif cmd in self._failure_sym:
240                 self.addFailure(offset, line)
241             elif cmd in self._progress_sym:
242                 self.parser._handleProgress(offset, line)
243             elif cmd in self._skip_sym:
244                 self.addSkip(offset, line)
245             elif cmd in self._success_sym:
246                 self.addSuccess(offset, line)
247             elif cmd in self._tags_sym:
248                 self.parser._handleTags(offset, line)
249                 self.parser.subunitLineReceived(line)
250             elif cmd in self._time_sym:
251                 self.parser._handleTime(offset, line)
252                 self.parser.subunitLineReceived(line)
253             elif cmd in self._xfail_sym:
254                 self.addExpectedFail(offset, line)
255             elif cmd in self._uxsuccess_sym:
256                 self.addUnexpectedSuccess(offset, line)
257             else:
258                 self.parser.stdOutLineReceived(line)
259         else:
260             self.parser.stdOutLineReceived(line)
261
262     def lostConnection(self):
263         """Connection lost."""
264         self.parser._lostConnectionInTest(_u('unknown state of '))
265
266     def startTest(self, offset, line):
267         """A test start command received."""
268         self.parser.stdOutLineReceived(line)
269
270
271 class _InTest(_ParserState):
272     """State for the subunit parser after reading a test: directive."""
273
274     def _outcome(self, offset, line, no_details, details_state):
275         """An outcome directive has been read.
276
277         :param no_details: Callable to call when no details are presented.
278         :param details_state: The state to switch to for details
279             processing of this outcome.
280         """
281         test_name = line[offset:-1].decode('utf8')
282         if self.parser.current_test_description == test_name:
283             self.parser._state = self.parser._outside_test
284             self.parser.current_test_description = None
285             no_details()
286             self.parser.client.stopTest(self.parser._current_test)
287             self.parser._current_test = None
288             self.parser.subunitLineReceived(line)
289         elif self.parser.current_test_description + self._start_simple == \
290             test_name:
291             self.parser._state = details_state
292             details_state.set_simple()
293             self.parser.subunitLineReceived(line)
294         elif self.parser.current_test_description + self._start_multipart == \
295             test_name:
296             self.parser._state = details_state
297             details_state.set_multipart()
298             self.parser.subunitLineReceived(line)
299         else:
300             self.parser.stdOutLineReceived(line)
301
302     def _error(self):
303         self.parser.client.addError(self.parser._current_test,
304             details={})
305
306     def addError(self, offset, line):
307         """An 'error:' directive has been read."""
308         self._outcome(offset, line, self._error,
309             self.parser._reading_error_details)
310
311     def _xfail(self):
312         self.parser.client.addExpectedFailure(self.parser._current_test,
313             details={})
314
315     def addExpectedFail(self, offset, line):
316         """An 'xfail:' directive has been read."""
317         self._outcome(offset, line, self._xfail,
318             self.parser._reading_xfail_details)
319
320     def _uxsuccess(self):
321         self.parser.client.addUnexpectedSuccess(self.parser._current_test)
322
323     def addUnexpectedSuccess(self, offset, line):
324         """A 'uxsuccess:' directive has been read."""
325         self._outcome(offset, line, self._uxsuccess,
326             self.parser._reading_uxsuccess_details)
327
328     def _failure(self):
329         self.parser.client.addFailure(self.parser._current_test, details={})
330
331     def addFailure(self, offset, line):
332         """A 'failure:' directive has been read."""
333         self._outcome(offset, line, self._failure,
334             self.parser._reading_failure_details)
335
336     def _skip(self):
337         self.parser.client.addSkip(self.parser._current_test, details={})
338
339     def addSkip(self, offset, line):
340         """A 'skip:' directive has been read."""
341         self._outcome(offset, line, self._skip,
342             self.parser._reading_skip_details)
343
344     def _succeed(self):
345         self.parser.client.addSuccess(self.parser._current_test, details={})
346
347     def addSuccess(self, offset, line):
348         """A 'success:' directive has been read."""
349         self._outcome(offset, line, self._succeed,
350             self.parser._reading_success_details)
351
352     def lostConnection(self):
353         """Connection lost."""
354         self.parser._lostConnectionInTest(_u(''))
355
356
357 class _OutSideTest(_ParserState):
358     """State for the subunit parser outside of a test context."""
359
360     def lostConnection(self):
361         """Connection lost."""
362
363     def startTest(self, offset, line):
364         """A test start command received."""
365         self.parser._state = self.parser._in_test
366         test_name = line[offset:-1].decode('utf8')
367         self.parser._current_test = RemotedTestCase(test_name)
368         self.parser.current_test_description = test_name
369         self.parser.client.startTest(self.parser._current_test)
370         self.parser.subunitLineReceived(line)
371
372
373 class _ReadingDetails(_ParserState):
374     """Common logic for readin state details."""
375
376     def endDetails(self):
377         """The end of a details section has been reached."""
378         self.parser._state = self.parser._outside_test
379         self.parser.current_test_description = None
380         self._report_outcome()
381         self.parser.client.stopTest(self.parser._current_test)
382
383     def lineReceived(self, line):
384         """a line has been received."""
385         self.details_parser.lineReceived(line)
386         self.parser.subunitLineReceived(line)
387
388     def lostConnection(self):
389         """Connection lost."""
390         self.parser._lostConnectionInTest(_u('%s report of ') %
391             self._outcome_label())
392
393     def _outcome_label(self):
394         """The label to describe this outcome."""
395         raise NotImplementedError(self._outcome_label)
396
397     def set_simple(self):
398         """Start a simple details parser."""
399         self.details_parser = details.SimpleDetailsParser(self)
400
401     def set_multipart(self):
402         """Start a multipart details parser."""
403         self.details_parser = details.MultipartDetailsParser(self)
404
405
406 class _ReadingFailureDetails(_ReadingDetails):
407     """State for the subunit parser when reading failure details."""
408
409     def _report_outcome(self):
410         self.parser.client.addFailure(self.parser._current_test,
411             details=self.details_parser.get_details())
412
413     def _outcome_label(self):
414         return "failure"
415
416
417 class _ReadingErrorDetails(_ReadingDetails):
418     """State for the subunit parser when reading error details."""
419
420     def _report_outcome(self):
421         self.parser.client.addError(self.parser._current_test,
422             details=self.details_parser.get_details())
423
424     def _outcome_label(self):
425         return "error"
426
427
428 class _ReadingExpectedFailureDetails(_ReadingDetails):
429     """State for the subunit parser when reading xfail details."""
430
431     def _report_outcome(self):
432         self.parser.client.addExpectedFailure(self.parser._current_test,
433             details=self.details_parser.get_details())
434
435     def _outcome_label(self):
436         return "xfail"
437
438
439 class _ReadingUnexpectedSuccessDetails(_ReadingDetails):
440     """State for the subunit parser when reading uxsuccess details."""
441
442     def _report_outcome(self):
443         self.parser.client.addUnexpectedSuccess(self.parser._current_test,
444             details=self.details_parser.get_details())
445
446     def _outcome_label(self):
447         return "uxsuccess"
448
449
450 class _ReadingSkipDetails(_ReadingDetails):
451     """State for the subunit parser when reading skip details."""
452
453     def _report_outcome(self):
454         self.parser.client.addSkip(self.parser._current_test,
455             details=self.details_parser.get_details("skip"))
456
457     def _outcome_label(self):
458         return "skip"
459
460
461 class _ReadingSuccessDetails(_ReadingDetails):
462     """State for the subunit parser when reading success details."""
463
464     def _report_outcome(self):
465         self.parser.client.addSuccess(self.parser._current_test,
466             details=self.details_parser.get_details("success"))
467
468     def _outcome_label(self):
469         return "success"
470
471
472 class TestProtocolServer(object):
473     """A parser for subunit.
474
475     :ivar tags: The current tags associated with the protocol stream.
476     """
477
478     def __init__(self, client, stream=None, forward_stream=None):
479         """Create a TestProtocolServer instance.
480
481         :param client: An object meeting the unittest.TestResult protocol.
482         :param stream: The stream that lines received which are not part of the
483             subunit protocol should be written to. This allows custom handling
484             of mixed protocols. By default, sys.stdout will be used for
485             convenience. It should accept bytes to its write() method.
486         :param forward_stream: A stream to forward subunit lines to. This
487             allows a filter to forward the entire stream while still parsing
488             and acting on it. By default forward_stream is set to
489             DiscardStream() and no forwarding happens.
490         """
491         self.client = ExtendedToOriginalDecorator(client)
492         if stream is None:
493             stream = sys.stdout
494             if sys.version_info > (3, 0):
495                 stream = stream.buffer
496         self._stream = stream
497         self._forward_stream = forward_stream or DiscardStream()
498         # state objects we can switch too
499         self._in_test = _InTest(self)
500         self._outside_test = _OutSideTest(self)
501         self._reading_error_details = _ReadingErrorDetails(self)
502         self._reading_failure_details = _ReadingFailureDetails(self)
503         self._reading_skip_details = _ReadingSkipDetails(self)
504         self._reading_success_details = _ReadingSuccessDetails(self)
505         self._reading_xfail_details = _ReadingExpectedFailureDetails(self)
506         self._reading_uxsuccess_details = _ReadingUnexpectedSuccessDetails(self)
507         # start with outside test.
508         self._state = self._outside_test
509         # Avoid casts on every call
510         self._plusminus = _b('+-')
511         self._push_sym = _b('push')
512         self._pop_sym = _b('pop')
513
514     def _handleProgress(self, offset, line):
515         """Process a progress directive."""
516         line = line[offset:].strip()
517         if line[0] in self._plusminus:
518             whence = PROGRESS_CUR
519             delta = int(line)
520         elif line == self._push_sym:
521             whence = PROGRESS_PUSH
522             delta = None
523         elif line == self._pop_sym:
524             whence = PROGRESS_POP
525             delta = None
526         else:
527             whence = PROGRESS_SET
528             delta = int(line)
529         self.client.progress(delta, whence)
530
531     def _handleTags(self, offset, line):
532         """Process a tags command."""
533         tags = line[offset:].decode('utf8').split()
534         new_tags, gone_tags = tags_to_new_gone(tags)
535         self.client.tags(new_tags, gone_tags)
536
537     def _handleTime(self, offset, line):
538         # Accept it, but do not do anything with it yet.
539         try:
540             event_time = iso8601.parse_date(line[offset:-1])
541         except TypeError:
542             raise TypeError(_u("Failed to parse %r, got %r")
543                 % (line, sys.exec_info[1]))
544         self.client.time(event_time)
545
546     def lineReceived(self, line):
547         """Call the appropriate local method for the received line."""
548         self._state.lineReceived(line)
549
550     def _lostConnectionInTest(self, state_string):
551         error_string = _u("lost connection during %stest '%s'") % (
552             state_string, self.current_test_description)
553         self.client.addError(self._current_test, RemoteError(error_string))
554         self.client.stopTest(self._current_test)
555
556     def lostConnection(self):
557         """The input connection has finished."""
558         self._state.lostConnection()
559
560     def readFrom(self, pipe):
561         """Blocking convenience API to parse an entire stream.
562
563         :param pipe: A file-like object supporting readlines().
564         :return: None.
565         """
566         for line in pipe.readlines():
567             self.lineReceived(line)
568         self.lostConnection()
569
570     def _startTest(self, offset, line):
571         """Internal call to change state machine. Override startTest()."""
572         self._state.startTest(offset, line)
573
574     def subunitLineReceived(self, line):
575         self._forward_stream.write(line)
576
577     def stdOutLineReceived(self, line):
578         self._stream.write(line)
579
580
581 class TestProtocolClient(testresult.TestResult):
582     """A TestResult which generates a subunit stream for a test run.
583
584     # Get a TestSuite or TestCase to run
585     suite = make_suite()
586     # Create a stream (any object with a 'write' method). This should accept
587     # bytes not strings: subunit is a byte orientated protocol.
588     stream = file('tests.log', 'wb')
589     # Create a subunit result object which will output to the stream
590     result = subunit.TestProtocolClient(stream)
591     # Optionally, to get timing data for performance analysis, wrap the
592     # serialiser with a timing decorator
593     result = subunit.test_results.AutoTimingTestResultDecorator(result)
594     # Run the test suite reporting to the subunit result object
595     suite.run(result)
596     # Close the stream.
597     stream.close()
598     """
599
600     def __init__(self, stream):
601         testresult.TestResult.__init__(self)
602         self._stream = stream
603         _make_stream_binary(stream)
604         self._progress_fmt = _b("progress: ")
605         self._bytes_eol = _b("\n")
606         self._progress_plus = _b("+")
607         self._progress_push = _b("push")
608         self._progress_pop = _b("pop")
609         self._empty_bytes = _b("")
610         self._start_simple = _b(" [\n")
611         self._end_simple = _b("]\n")
612
613     def addError(self, test, error=None, details=None):
614         """Report an error in test test.
615
616         Only one of error and details should be provided: conceptually there
617         are two separate methods:
618             addError(self, test, error)
619             addError(self, test, details)
620
621         :param error: Standard unittest positional argument form - an
622             exc_info tuple.
623         :param details: New Testing-in-python drafted API; a dict from string
624             to subunit.Content objects.
625         """
626         self._addOutcome("error", test, error=error, details=details)
627
628     def addExpectedFailure(self, test, error=None, details=None):
629         """Report an expected failure in test test.
630
631         Only one of error and details should be provided: conceptually there
632         are two separate methods:
633             addError(self, test, error)
634             addError(self, test, details)
635
636         :param error: Standard unittest positional argument form - an
637             exc_info tuple.
638         :param details: New Testing-in-python drafted API; a dict from string
639             to subunit.Content objects.
640         """
641         self._addOutcome("xfail", test, error=error, details=details)
642
643     def addFailure(self, test, error=None, details=None):
644         """Report a failure in test test.
645
646         Only one of error and details should be provided: conceptually there
647         are two separate methods:
648             addFailure(self, test, error)
649             addFailure(self, test, details)
650
651         :param error: Standard unittest positional argument form - an
652             exc_info tuple.
653         :param details: New Testing-in-python drafted API; a dict from string
654             to subunit.Content objects.
655         """
656         self._addOutcome("failure", test, error=error, details=details)
657
658     def _addOutcome(self, outcome, test, error=None, details=None,
659         error_permitted=True):
660         """Report a failure in test test.
661
662         Only one of error and details should be provided: conceptually there
663         are two separate methods:
664             addOutcome(self, test, error)
665             addOutcome(self, test, details)
666
667         :param outcome: A string describing the outcome - used as the
668             event name in the subunit stream.
669         :param error: Standard unittest positional argument form - an
670             exc_info tuple.
671         :param details: New Testing-in-python drafted API; a dict from string
672             to subunit.Content objects.
673         :param error_permitted: If True then one and only one of error or
674             details must be supplied. If False then error must not be supplied
675             and details is still optional.  """
676         self._stream.write(_b("%s: %s" % (outcome, test.id())))
677         if error_permitted:
678             if error is None and details is None:
679                 raise ValueError
680         else:
681             if error is not None:
682                 raise ValueError
683         if error is not None:
684             self._stream.write(self._start_simple)
685             # XXX: this needs to be made much stricter, along the lines of
686             # Martin[gz]'s work in testtools. Perhaps subunit can use that?
687             for line in self._exc_info_to_unicode(error, test).splitlines():
688                 self._stream.write(("%s\n" % line).encode('utf8'))
689         elif details is not None:
690             self._write_details(details)
691         else:
692             self._stream.write(_b("\n"))
693         if details is not None or error is not None:
694             self._stream.write(self._end_simple)
695
696     def addSkip(self, test, reason=None, details=None):
697         """Report a skipped test."""
698         if reason is None:
699             self._addOutcome("skip", test, error=None, details=details)
700         else:
701             self._stream.write(_b("skip: %s [\n" % test.id()))
702             self._stream.write(_b("%s\n" % reason))
703             self._stream.write(self._end_simple)
704
705     def addSuccess(self, test, details=None):
706         """Report a success in a test."""
707         self._addOutcome("successful", test, details=details, error_permitted=False)
708
709     def addUnexpectedSuccess(self, test, details=None):
710         """Report an unexpected success in test test.
711
712         Details can optionally be provided: conceptually there
713         are two separate methods:
714             addError(self, test)
715             addError(self, test, details)
716
717         :param details: New Testing-in-python drafted API; a dict from string
718             to subunit.Content objects.
719         """
720         self._addOutcome("uxsuccess", test, details=details,
721             error_permitted=False)
722
723     def startTest(self, test):
724         """Mark a test as starting its test run."""
725         super(TestProtocolClient, self).startTest(test)
726         self._stream.write(_b("test: %s\n" % test.id()))
727         self._stream.flush()
728
729     def stopTest(self, test):
730         super(TestProtocolClient, self).stopTest(test)
731         self._stream.flush()
732
733     def progress(self, offset, whence):
734         """Provide indication about the progress/length of the test run.
735
736         :param offset: Information about the number of tests remaining. If
737             whence is PROGRESS_CUR, then offset increases/decreases the
738             remaining test count. If whence is PROGRESS_SET, then offset
739             specifies exactly the remaining test count.
740         :param whence: One of PROGRESS_CUR, PROGRESS_SET, PROGRESS_PUSH,
741             PROGRESS_POP.
742         """
743         if whence == PROGRESS_CUR and offset > -1:
744             prefix = self._progress_plus
745             offset = _b(str(offset))
746         elif whence == PROGRESS_PUSH:
747             prefix = self._empty_bytes
748             offset = self._progress_push
749         elif whence == PROGRESS_POP:
750             prefix = self._empty_bytes
751             offset = self._progress_pop
752         else:
753             prefix = self._empty_bytes
754             offset = _b(str(offset))
755         self._stream.write(self._progress_fmt + prefix + offset +
756             self._bytes_eol)
757
758     def time(self, a_datetime):
759         """Inform the client of the time.
760
761         ":param datetime: A datetime.datetime object.
762         """
763         time = a_datetime.astimezone(iso8601.Utc())
764         self._stream.write(_b("time: %04d-%02d-%02d %02d:%02d:%02d.%06dZ\n" % (
765             time.year, time.month, time.day, time.hour, time.minute,
766             time.second, time.microsecond)))
767
768     def _write_details(self, details):
769         """Output details to the stream.
770
771         :param details: An extended details dict for a test outcome.
772         """
773         self._stream.write(_b(" [ multipart\n"))
774         for name, content in sorted(details.items()):
775             self._stream.write(_b("Content-Type: %s/%s" %
776                 (content.content_type.type, content.content_type.subtype)))
777             parameters = content.content_type.parameters
778             if parameters:
779                 self._stream.write(_b(";"))
780                 param_strs = []
781                 for param, value in parameters.items():
782                     param_strs.append("%s=%s" % (param, value))
783                 self._stream.write(_b(",".join(param_strs)))
784             self._stream.write(_b("\n%s\n" % name))
785             encoder = chunked.Encoder(self._stream)
786             list(map(encoder.write, content.iter_bytes()))
787             encoder.close()
788
789     def done(self):
790         """Obey the testtools result.done() interface."""
791
792
793 def RemoteError(description=_u("")):
794     return (_StringException, _StringException(description), None)
795
796
797 class RemotedTestCase(unittest.TestCase):
798     """A class to represent test cases run in child processes.
799
800     Instances of this class are used to provide the Python test API a TestCase
801     that can be printed to the screen, introspected for metadata and so on.
802     However, as they are a simply a memoisation of a test that was actually
803     run in the past by a separate process, they cannot perform any interactive
804     actions.
805     """
806
807     def __eq__ (self, other):
808         try:
809             return self.__description == other.__description
810         except AttributeError:
811             return False
812
813     def __init__(self, description):
814         """Create a psuedo test case with description description."""
815         self.__description = description
816
817     def error(self, label):
818         raise NotImplementedError("%s on RemotedTestCases is not permitted." %
819             label)
820
821     def setUp(self):
822         self.error("setUp")
823
824     def tearDown(self):
825         self.error("tearDown")
826
827     def shortDescription(self):
828         return self.__description
829
830     def id(self):
831         return "%s" % (self.__description,)
832
833     def __str__(self):
834         return "%s (%s)" % (self.__description, self._strclass())
835
836     def __repr__(self):
837         return "<%s description='%s'>" % \
838                (self._strclass(), self.__description)
839
840     def run(self, result=None):
841         if result is None: result = self.defaultTestResult()
842         result.startTest(self)
843         result.addError(self, RemoteError(_u("Cannot run RemotedTestCases.\n")))
844         result.stopTest(self)
845
846     def _strclass(self):
847         cls = self.__class__
848         return "%s.%s" % (cls.__module__, cls.__name__)
849
850
851 class ExecTestCase(unittest.TestCase):
852     """A test case which runs external scripts for test fixtures."""
853
854     def __init__(self, methodName='runTest'):
855         """Create an instance of the class that will use the named test
856            method when executed. Raises a ValueError if the instance does
857            not have a method with the specified name.
858         """
859         unittest.TestCase.__init__(self, methodName)
860         testMethod = getattr(self, methodName)
861         self.script = join_dir(sys.modules[self.__class__.__module__].__file__,
862                                testMethod.__doc__)
863
864     def countTestCases(self):
865         return 1
866
867     def run(self, result=None):
868         if result is None: result = self.defaultTestResult()
869         self._run(result)
870
871     def debug(self):
872         """Run the test without collecting errors in a TestResult"""
873         self._run(testresult.TestResult())
874
875     def _run(self, result):
876         protocol = TestProtocolServer(result)
877         process = subprocess.Popen(self.script, shell=True,
878             stdout=subprocess.PIPE)
879         _make_stream_binary(process.stdout)
880         output = process.communicate()[0]
881         protocol.readFrom(BytesIO(output))
882
883
884 class IsolatedTestCase(unittest.TestCase):
885     """A TestCase which executes in a forked process.
886
887     Each test gets its own process, which has a performance overhead but will
888     provide excellent isolation from global state (such as django configs,
889     zope utilities and so on).
890     """
891
892     def run(self, result=None):
893         if result is None: result = self.defaultTestResult()
894         run_isolated(unittest.TestCase, self, result)
895
896
897 class IsolatedTestSuite(unittest.TestSuite):
898     """A TestSuite which runs its tests in a forked process.
899
900     This decorator that will fork() before running the tests and report the
901     results from the child process using a Subunit stream.  This is useful for
902     handling tests that mutate global state, or are testing C extensions that
903     could crash the VM.
904     """
905
906     def run(self, result=None):
907         if result is None: result = testresult.TestResult()
908         run_isolated(unittest.TestSuite, self, result)
909
910
911 def run_isolated(klass, self, result):
912     """Run a test suite or case in a subprocess, using the run method on klass.
913     """
914     c2pread, c2pwrite = os.pipe()
915     # fixme - error -> result
916     # now fork
917     pid = os.fork()
918     if pid == 0:
919         # Child
920         # Close parent's pipe ends
921         os.close(c2pread)
922         # Dup fds for child
923         os.dup2(c2pwrite, 1)
924         # Close pipe fds.
925         os.close(c2pwrite)
926
927         # at this point, sys.stdin is redirected, now we want
928         # to filter it to escape ]'s.
929         ### XXX: test and write that bit.
930         stream = os.fdopen(1, 'wb')
931         result = TestProtocolClient(stream)
932         klass.run(self, result)
933         stream.flush()
934         sys.stderr.flush()
935         # exit HARD, exit NOW.
936         os._exit(0)
937     else:
938         # Parent
939         # Close child pipe ends
940         os.close(c2pwrite)
941         # hookup a protocol engine
942         protocol = TestProtocolServer(result)
943         fileobj = os.fdopen(c2pread, 'rb')
944         protocol.readFrom(fileobj)
945         os.waitpid(pid, 0)
946         # TODO return code evaluation.
947     return result
948
949
950 def TAP2SubUnit(tap, subunit):
951     """Filter a TAP pipe into a subunit pipe.
952
953     :param tap: A tap pipe/stream/file object.
954     :param subunit: A pipe/stream/file object to write subunit results to.
955     :return: The exit code to exit with.
956     """
957     BEFORE_PLAN = 0
958     AFTER_PLAN = 1
959     SKIP_STREAM = 2
960     state = BEFORE_PLAN
961     plan_start = 1
962     plan_stop = 0
963     def _skipped_test(subunit, plan_start):
964         # Some tests were skipped.
965         subunit.write('test test %d\n' % plan_start)
966         subunit.write('error test %d [\n' % plan_start)
967         subunit.write('test missing from TAP output\n')
968         subunit.write(']\n')
969         return plan_start + 1
970     # Test data for the next test to emit
971     test_name = None
972     log = []
973     result = None
974     def _emit_test():
975         "write out a test"
976         if test_name is None:
977             return
978         subunit.write("test %s\n" % test_name)
979         if not log:
980             subunit.write("%s %s\n" % (result, test_name))
981         else:
982             subunit.write("%s %s [\n" % (result, test_name))
983         if log:
984             for line in log:
985                 subunit.write("%s\n" % line)
986             subunit.write("]\n")
987         del log[:]
988     for line in tap:
989         if state == BEFORE_PLAN:
990             match = re.match("(\d+)\.\.(\d+)\s*(?:\#\s+(.*))?\n", line)
991             if match:
992                 state = AFTER_PLAN
993                 _, plan_stop, comment = match.groups()
994                 plan_stop = int(plan_stop)
995                 if plan_start > plan_stop and plan_stop == 0:
996                     # skipped file
997                     state = SKIP_STREAM
998                     subunit.write("test file skip\n")
999                     subunit.write("skip file skip [\n")
1000                     subunit.write("%s\n" % comment)
1001                     subunit.write("]\n")
1002                 continue
1003         # not a plan line, or have seen one before
1004         match = re.match("(ok|not ok)(?:\s+(\d+)?)?(?:\s+([^#]*[^#\s]+)\s*)?(?:\s+#\s+(TODO|SKIP|skip|todo)(?:\s+(.*))?)?\n", line)
1005         if match:
1006             # new test, emit current one.
1007             _emit_test()
1008             status, number, description, directive, directive_comment = match.groups()
1009             if status == 'ok':
1010                 result = 'success'
1011             else:
1012                 result = "failure"
1013             if description is None:
1014                 description = ''
1015             else:
1016                 description = ' ' + description
1017             if directive is not None:
1018                 if directive.upper() == 'TODO':
1019                     result = 'xfail'
1020                 elif directive.upper() == 'SKIP':
1021                     result = 'skip'
1022                 if directive_comment is not None:
1023                     log.append(directive_comment)
1024             if number is not None:
1025                 number = int(number)
1026                 while plan_start < number:
1027                     plan_start = _skipped_test(subunit, plan_start)
1028             test_name = "test %d%s" % (plan_start, description)
1029             plan_start += 1
1030             continue
1031         match = re.match("Bail out\!(?:\s*(.*))?\n", line)
1032         if match:
1033             reason, = match.groups()
1034             if reason is None:
1035                 extra = ''
1036             else:
1037                 extra = ' %s' % reason
1038             _emit_test()
1039             test_name = "Bail out!%s" % extra
1040             result = "error"
1041             state = SKIP_STREAM
1042             continue
1043         match = re.match("\#.*\n", line)
1044         if match:
1045             log.append(line[:-1])
1046             continue
1047         subunit.write(line)
1048     _emit_test()
1049     while plan_start <= plan_stop:
1050         # record missed tests
1051         plan_start = _skipped_test(subunit, plan_start)
1052     return 0
1053
1054
1055 def tag_stream(original, filtered, tags):
1056     """Alter tags on a stream.
1057
1058     :param original: The input stream.
1059     :param filtered: The output stream.
1060     :param tags: The tags to apply. As in a normal stream - a list of 'TAG' or
1061         '-TAG' commands.
1062
1063         A 'TAG' command will add the tag to the output stream,
1064         and override any existing '-TAG' command in that stream.
1065         Specifically:
1066          * A global 'tags: TAG' will be added to the start of the stream.
1067          * Any tags commands with -TAG will have the -TAG removed.
1068
1069         A '-TAG' command will remove the TAG command from the stream.
1070         Specifically:
1071          * A 'tags: -TAG' command will be added to the start of the stream.
1072          * Any 'tags: TAG' command will have 'TAG' removed from it.
1073         Additionally, any redundant tagging commands (adding a tag globally
1074         present, or removing a tag globally removed) are stripped as a
1075         by-product of the filtering.
1076     :return: 0
1077     """
1078     new_tags, gone_tags = tags_to_new_gone(tags)
1079     def write_tags(new_tags, gone_tags):
1080         if new_tags or gone_tags:
1081             filtered.write("tags: " + ' '.join(new_tags))
1082             if gone_tags:
1083                 for tag in gone_tags:
1084                     filtered.write("-" + tag)
1085             filtered.write("\n")
1086     write_tags(new_tags, gone_tags)
1087     # TODO: use the protocol parser and thus don't mangle test comments.
1088     for line in original:
1089         if line.startswith("tags:"):
1090             line_tags = line[5:].split()
1091             line_new, line_gone = tags_to_new_gone(line_tags)
1092             line_new = line_new - gone_tags
1093             line_gone = line_gone - new_tags
1094             write_tags(line_new, line_gone)
1095         else:
1096             filtered.write(line)
1097     return 0
1098
1099
1100 class ProtocolTestCase(object):
1101     """Subunit wire protocol to unittest.TestCase adapter.
1102
1103     ProtocolTestCase honours the core of ``unittest.TestCase`` protocol -
1104     calling a ProtocolTestCase or invoking the run() method will make a 'test
1105     run' happen. The 'test run' will simply be a replay of the test activity
1106     that has been encoded into the stream. The ``unittest.TestCase`` ``debug``
1107     and ``countTestCases`` methods are not supported because there isn't a
1108     sensible mapping for those methods.
1109
1110     # Get a stream (any object with a readline() method), in this case the
1111     # stream output by the example from ``subunit.TestProtocolClient``.
1112     stream = file('tests.log', 'rb')
1113     # Create a parser which will read from the stream and emit
1114     # activity to a unittest.TestResult when run() is called.
1115     suite = subunit.ProtocolTestCase(stream)
1116     # Create a result object to accept the contents of that stream.
1117     result = unittest._TextTestResult(sys.stdout)
1118     # 'run' the tests - process the stream and feed its contents to result.
1119     suite.run(result)
1120     stream.close()
1121
1122     :seealso: TestProtocolServer (the subunit wire protocol parser).
1123     """
1124
1125     def __init__(self, stream, passthrough=None, forward=False):
1126         """Create a ProtocolTestCase reading from stream.
1127
1128         :param stream: A filelike object which a subunit stream can be read
1129             from.
1130         :param passthrough: A stream pass non subunit input on to. If not
1131             supplied, the TestProtocolServer default is used.
1132         :param forward: A stream to pass subunit input on to. If not supplied
1133             subunit input is not forwarded.
1134         """
1135         self._stream = stream
1136         _make_stream_binary(stream)
1137         self._passthrough = passthrough
1138         self._forward = forward
1139
1140     def __call__(self, result=None):
1141         return self.run(result)
1142
1143     def run(self, result=None):
1144         if result is None:
1145             result = self.defaultTestResult()
1146         protocol = TestProtocolServer(result, self._passthrough, self._forward)
1147         line = self._stream.readline()
1148         while line:
1149             protocol.lineReceived(line)
1150             line = self._stream.readline()
1151         protocol.lostConnection()
1152
1153
1154 class TestResultStats(testresult.TestResult):
1155     """A pyunit TestResult interface implementation for making statistics.
1156
1157     :ivar total_tests: The total tests seen.
1158     :ivar passed_tests: The tests that passed.
1159     :ivar failed_tests: The tests that failed.
1160     :ivar seen_tags: The tags seen across all tests.
1161     """
1162
1163     def __init__(self, stream):
1164         """Create a TestResultStats which outputs to stream."""
1165         testresult.TestResult.__init__(self)
1166         self._stream = stream
1167         self.failed_tests = 0
1168         self.skipped_tests = 0
1169         self.seen_tags = set()
1170
1171     @property
1172     def total_tests(self):
1173         return self.testsRun
1174
1175     def addError(self, test, err, details=None):
1176         self.failed_tests += 1
1177
1178     def addFailure(self, test, err, details=None):
1179         self.failed_tests += 1
1180
1181     def addSkip(self, test, reason, details=None):
1182         self.skipped_tests += 1
1183
1184     def formatStats(self):
1185         self._stream.write("Total tests:   %5d\n" % self.total_tests)
1186         self._stream.write("Passed tests:  %5d\n" % self.passed_tests)
1187         self._stream.write("Failed tests:  %5d\n" % self.failed_tests)
1188         self._stream.write("Skipped tests: %5d\n" % self.skipped_tests)
1189         tags = sorted(self.seen_tags)
1190         self._stream.write("Seen tags: %s\n" % (", ".join(tags)))
1191
1192     @property
1193     def passed_tests(self):
1194         return self.total_tests - self.failed_tests - self.skipped_tests
1195
1196     def tags(self, new_tags, gone_tags):
1197         """Accumulate the seen tags."""
1198         self.seen_tags.update(new_tags)
1199
1200     def wasSuccessful(self):
1201         """Tells whether or not this result was a success"""
1202         return self.failed_tests == 0
1203
1204
1205 def get_default_formatter():
1206     """Obtain the default formatter to write to.
1207
1208     :return: A file-like object.
1209     """
1210     formatter = os.getenv("SUBUNIT_FORMATTER")
1211     if formatter:
1212         return os.popen(formatter, "w")
1213     else:
1214         stream = sys.stdout
1215         if sys.version_info > (3, 0):
1216             stream = stream.buffer
1217         return stream
1218
1219
1220 if sys.version_info > (3, 0):
1221     from io import UnsupportedOperation as _NoFilenoError
1222 else:
1223     _NoFilenoError = AttributeError
1224
1225 def read_test_list(path):
1226     """Read a list of test ids from a file on disk.
1227
1228     :param path: Path to the file
1229     :return: Sequence of test ids
1230     """
1231     f = open(path, 'rb')
1232     try:
1233         return [l.rstrip("\n") for l in f.readlines()]
1234     finally:
1235         f.close()
1236
1237
1238 def _make_stream_binary(stream):
1239     """Ensure that a stream will be binary safe. See _make_binary_on_windows."""
1240     try:
1241         fileno = stream.fileno()
1242     except _NoFilenoError:
1243         return
1244     _make_binary_on_windows(fileno)
1245
1246 def _make_binary_on_windows(fileno):
1247     """Win32 mangles \r\n to \n and that breaks streams. See bug lp:505078."""
1248     if sys.platform == "win32":
1249         import msvcrt
1250         msvcrt.setmode(fileno, os.O_BINARY)