2 # subunit: extensions to Python unittest to get test results from subprocesses.
3 # Copyright (C) 2005 Robert Collins <robertc@robertcollins.net>
5 # Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
6 # license at the users choice. A copy of both licenses are available in the
7 # project source as Apache-2.0 and BSD. You may not use this file except in
8 # compliance with one of these two licences.
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
12 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 # license you chose for the specific language governing permissions and
14 # limitations under that license.
17 """Subunit - a streaming test protocol
22 The ``subunit`` Python package provides a number of ``unittest`` extensions
23 which can be used to cause tests to output Subunit, to parse Subunit streams
24 into test activity, perform seamless test isolation within a regular test
25 case and variously sort, filter and report on test runs.
31 The ``subunit.TestProtocolClient`` class is a ``unittest.TestResult``
32 extension which will translate a test run into a Subunit stream.
34 The ``subunit.ProtocolTestCase`` class is an adapter between the Subunit wire
35 protocol and the ``unittest.TestCase`` object protocol. It is used to translate
36 a stream into a test run, which regular ``unittest.TestResult`` objects can
37 process and report/inspect.
39 Subunit has support for non-blocking usage too, for use with asyncore or
40 Twisted. See the ``TestProtocolServer`` parser class for more details.
42 Subunit includes extensions to the Python ``TestResult`` protocol. These are
43 all done in a compatible manner: ``TestResult`` objects that do not implement
44 the extension methods will not cause errors to be raised, instead the extension
45 will either lose fidelity (for instance, folding expected failures to success
46 in Python versions < 2.7 or 3.1), or discard the extended data (for extra
47 details, tags, timestamping and progress markers).
49 The test outcome methods ``addSuccess``, ``addError``, ``addExpectedFailure``,
50 ``addFailure``, ``addSkip`` take an optional keyword parameter ``details``
51 which can be used instead of the usual python unittest parameter.
52 When used the value of details should be a dict from ``string`` to
53 ``testtools.content.Content`` objects. This is a draft API being worked on with
54 the Python Testing In Python mail list, with the goal of permitting a common
55 way to provide additional data beyond a traceback, such as captured data from
56 disk, logging messages etc. The reference for this API is in testtools (0.9.0
59 The ``tags(new_tags, gone_tags)`` method is called (if present) to add or
60 remove tags in the test run that is currently executing. If called when no
61 test is in progress (that is, if called outside of the ``startTest``,
62 ``stopTest`` pair), the tags apply to all subsequent tests. If called
63 when a test is in progress, then the tags only apply to that test.
65 The ``time(a_datetime)`` method is called (if present) when a ``time:``
66 directive is encountered in a Subunit stream. This is used to tell a TestResult
67 about the time that events in the stream occurred at, to allow reconstructing
68 test timing from a stream.
70 The ``progress(offset, whence)`` method controls progress data for a stream.
71 The offset parameter is an int, and whence is one of subunit.PROGRESS_CUR,
72 subunit.PROGRESS_SET, PROGRESS_PUSH, PROGRESS_POP. Push and pop operations
73 ignore the offset parameter.
79 ``subunit.run`` is a convenience wrapper to run a Python test suite via
80 the command line, reporting via Subunit::
82 $ python -m subunit.run mylib.tests.test_suite
84 The ``IsolatedTestSuite`` class is a TestSuite that forks before running its
85 tests, allowing isolation between the test runner and some tests.
87 Similarly, ``IsolatedTestCase`` is a base class which can be subclassed to get
88 tests that will fork() before that individual test is run.
90 ``ExecTestCase`` is a convenience wrapper for running an external
91 program to get a Subunit stream and then report that back to an arbitrary
94 class AggregateTests(subunit.ExecTestCase):
96 def test_script_one(self):
99 def test_script_two(self):
102 # Normally your normal test loading would take care of this automatically,
103 # It is only spelt out in detail here for clarity.
104 suite = unittest.TestSuite([AggregateTests("test_script_one"),
105 AggregateTests("test_script_two")])
106 # Create any TestResult class you like.
107 result = unittest._TextTestResult(sys.stdout)
108 # And run your suite as normal, Subunit will exec each external script as
109 # needed and report to your result object.
115 * subunit.chunked contains HTTP chunked encoding/decoding logic.
116 * subunit.test_results contains TestResult helper classes.
125 from testtools import content, content_type, ExtendedToOriginalDecorator
126 from testtools.compat import _b, _u, BytesIO, StringIO
128 from testtools.testresult.real import _StringException
129 RemoteException = _StringException
130 # For testing: different pythons have different str() implementations.
131 if sys.version_info > (3, 0):
132 _remote_exception_str = "testtools.testresult.real._StringException"
133 _remote_exception_str_chunked = "34\r\n" + _remote_exception_str
135 _remote_exception_str = "_StringException"
136 _remote_exception_str_chunked = "1A\r\n" + _remote_exception_str
138 raise ImportError ("testtools.testresult.real does not contain "
139 "_StringException, check your version.")
140 from testtools import testresult
142 from subunit import chunked, details, iso8601, test_results
def test_suite():
    """Return the test suite of the ``subunit.tests`` package.

    The tests package is imported inside the function, so it is only loaded
    when the suite is actually requested.
    """
    from subunit import tests
    return tests.test_suite()
def join_dir(base_path, path):
    """Resolve ``path`` against the directory that contains ``base_path``.

    :param base_path: A path to a file or directory. It is made absolute and
        its final component is dropped to obtain the anchor directory.
    :param path: An absolute path, or a path relative to the directory
        containing ``base_path``.
    :return: ``path`` joined onto that directory with ``os.path.join``.
    """
    anchor = os.path.abspath(base_path)
    containing_dir = os.path.dirname(anchor)
    return os.path.join(containing_dir, path)
def tags_to_new_gone(tags):
    """Partition tag directives into tags to add and tags to remove.

    :param tags: The tag strings to split. A leading ``-`` marks a tag that
        is being removed.
    :return: A ``(new_tags, gone_tags)`` tuple of sets. Entries in
        ``gone_tags`` have the leading ``-`` stripped.
    """
    new_tags, gone_tags = set(), set()
    for tag in tags:
        is_removal = tag[0] == '-'
        if not is_removal:
            new_tags.add(tag)
            continue
        gone_tags.add(tag[1:])
    return new_tags, gone_tags
class DiscardStream(object):
    """A write-only sink: whatever is written to it is thrown away."""

    def write(self, bytes):
        """Accept ``bytes`` and do nothing with it."""
        return None
class _ParserState(object):
    """Base state for the subunit parser.

    ``lineReceived`` splits a line into a directive and its argument and
    calls the handler for that directive. Every handler defined here passes
    the line to the parser's ``stdOutLineReceived``; subclasses override the
    handlers that carry meaning in their state.
    """

    def __init__(self, parser):
        """Create a state bound to ``parser``.

        :param parser: The object whose ``stdOutLineReceived``,
            ``subunitLineReceived``, ``_handleProgress``, ``_handleTags``,
            ``_handleTime`` and ``_lostConnectionInTest`` methods this state
            calls.
        """
        self.parser = parser
        # Directive names are converted with _b() once here rather than on
        # every line. They are tuples so that ``cmd in ...`` in lineReceived
        # compares whole names.
        self._test_sym = (_b('test'), _b('testing'))
        self._colon_sym = _b(':')
        self._error_sym = (_b('error'),)
        self._failure_sym = (_b('failure'),)
        self._progress_sym = (_b('progress'),)
        # This must be a tuple: with a bare _b('skip') the membership test is
        # a substring match, so '', 's', 'ki', ... are dispatched as 'skip'.
        self._skip_sym = (_b('skip'),)
        self._success_sym = (_b('success'), _b('successful'))
        self._tags_sym = (_b('tags'),)
        self._time_sym = (_b('time'),)
        self._xfail_sym = (_b('xfail'),)
        self._uxsuccess_sym = (_b('uxsuccess'),)
        self._start_simple = _u(" [")
        self._start_multipart = _u(" [ multipart")

    def addError(self, offset, line):
        """An 'error:' directive has been read."""
        self.parser.stdOutLineReceived(line)

    def addExpectedFail(self, offset, line):
        """An 'xfail:' directive has been read."""
        self.parser.stdOutLineReceived(line)

    def addUnexpectedSuccess(self, offset, line):
        """A 'uxsuccess:' directive has been read.

        lineReceived dispatches to this method, so the base state has to
        provide it; without it a 'uxsuccess' line read in a state that does
        not override the handler raises AttributeError.
        """
        self.parser.stdOutLineReceived(line)

    def addFailure(self, offset, line):
        """A 'failure:' directive has been read."""
        self.parser.stdOutLineReceived(line)

    def addSkip(self, offset, line):
        """A 'skip:' directive has been read."""
        self.parser.stdOutLineReceived(line)

    def addSuccess(self, offset, line):
        """A 'success:' directive has been read."""
        self.parser.stdOutLineReceived(line)

    def lineReceived(self, line):
        """A line has been received.

        The first whitespace-delimited word, with trailing colons removed, is
        looked up as a directive. Handlers receive ``offset``, the length of
        that word (before the colons are removed) plus one, and the whole
        line. Lines that have no argument, that start with whitespace, or
        whose first word is not a known directive go to
        ``stdOutLineReceived``.

        :param line: One line of the stream.
        """
        parts = line.split(None, 1)
        if len(parts) != 2 or not line.startswith(parts[0]):
            self.parser.stdOutLineReceived(line)
            return
        cmd = parts[0]
        offset = len(cmd) + 1
        cmd = cmd.rstrip(self._colon_sym)
        if cmd in self._test_sym:
            self.startTest(offset, line)
        elif cmd in self._error_sym:
            self.addError(offset, line)
        elif cmd in self._failure_sym:
            self.addFailure(offset, line)
        elif cmd in self._progress_sym:
            self.parser._handleProgress(offset, line)
        elif cmd in self._skip_sym:
            self.addSkip(offset, line)
        elif cmd in self._success_sym:
            self.addSuccess(offset, line)
        elif cmd in self._tags_sym:
            self.parser._handleTags(offset, line)
            self.parser.subunitLineReceived(line)
        elif cmd in self._time_sym:
            self.parser._handleTime(offset, line)
            self.parser.subunitLineReceived(line)
        elif cmd in self._xfail_sym:
            self.addExpectedFail(offset, line)
        elif cmd in self._uxsuccess_sym:
            self.addUnexpectedSuccess(offset, line)
        else:
            self.parser.stdOutLineReceived(line)

    def lostConnection(self):
        """Connection lost."""
        self.parser._lostConnectionInTest(_u('unknown state of '))

    def startTest(self, offset, line):
        """A test start command received."""
        self.parser.stdOutLineReceived(line)
class _InTest(_ParserState):
    """Parser state entered once a ``test:`` directive has been read."""

    def _outcome(self, offset, line, no_details, details_state):
        """Handle an outcome directive for the running test.

        The text after the directive is compared with the description of the
        current test. An exact match reports the outcome immediately and
        returns the parser to its outside-test state. The description
        followed by `` [`` or `` [ multipart`` switches the parser to
        ``details_state``. Anything else is passed to ``stdOutLineReceived``.

        :param offset: Index in ``line`` at which the test name starts.
        :param line: The complete line that was received.
        :param no_details: Callable to call when no details are presented.
        :param details_state: The state to switch to for details
            processing of this outcome.
        """
        parser = self.parser
        test_name = line[offset:-1].decode('utf8')
        current = parser.current_test_description
        if current == test_name:
            parser._state = parser._outside_test
            parser.current_test_description = None
            no_details()
            parser.client.stopTest(parser._current_test)
            parser._current_test = None
            parser.subunitLineReceived(line)
            return
        if current + self._start_simple == test_name:
            begin_details = details_state.set_simple
        elif current + self._start_multipart == test_name:
            begin_details = details_state.set_multipart
        else:
            parser.stdOutLineReceived(line)
            return
        parser._state = details_state
        begin_details()
        parser.subunitLineReceived(line)

    def _error(self):
        """Report an error without details for the current test."""
        parser = self.parser
        parser.client.addError(parser._current_test, details={})

    def addError(self, offset, line):
        """An 'error:' directive has been read."""
        details_state = self.parser._reading_error_details
        self._outcome(offset, line, self._error, details_state)

    def _xfail(self):
        """Report an expected failure without details for the current test."""
        parser = self.parser
        parser.client.addExpectedFailure(parser._current_test, details={})

    def addExpectedFail(self, offset, line):
        """An 'xfail:' directive has been read."""
        details_state = self.parser._reading_xfail_details
        self._outcome(offset, line, self._xfail, details_state)

    def _uxsuccess(self):
        """Report an unexpected success for the current test."""
        parser = self.parser
        parser.client.addUnexpectedSuccess(parser._current_test)

    def addUnexpectedSuccess(self, offset, line):
        """A 'uxsuccess:' directive has been read."""
        details_state = self.parser._reading_uxsuccess_details
        self._outcome(offset, line, self._uxsuccess, details_state)

    def _failure(self):
        """Report a failure without details for the current test."""
        parser = self.parser
        parser.client.addFailure(parser._current_test, details={})

    def addFailure(self, offset, line):
        """A 'failure:' directive has been read."""
        details_state = self.parser._reading_failure_details
        self._outcome(offset, line, self._failure, details_state)

    def _skip(self):
        """Report a skip without details for the current test."""
        parser = self.parser
        parser.client.addSkip(parser._current_test, details={})

    def addSkip(self, offset, line):
        """A 'skip:' directive has been read."""
        details_state = self.parser._reading_skip_details
        self._outcome(offset, line, self._skip, details_state)

    def _succeed(self):
        """Report a success without details for the current test."""
        parser = self.parser
        parser.client.addSuccess(parser._current_test, details={})

    def addSuccess(self, offset, line):
        """A 'success:' directive has been read."""
        details_state = self.parser._reading_success_details
        self._outcome(offset, line, self._succeed, details_state)

    def lostConnection(self):
        """Connection lost."""
        no_qualifier = _u('')
        self.parser._lostConnectionInTest(no_qualifier)
class _OutSideTest(_ParserState):
    """Parser state used while no test is running."""

    def lostConnection(self):
        """Connection lost; nothing is reported from this state."""

    def startTest(self, offset, line):
        """Begin the test named by a test start command.

        :param offset: Index in ``line`` at which the test name starts.
        :param line: The complete line that was received.
        """
        parser = self.parser
        parser._state = parser._in_test
        name = line[offset:-1].decode('utf8')
        remote_test = RemotedTestCase(name)
        parser._current_test = remote_test
        parser.current_test_description = name
        parser.client.startTest(remote_test)
        parser.subunitLineReceived(line)
class _ReadingDetails(_ParserState):
    """Shared behaviour for the states that read the details of an outcome.

    Subclasses supply ``_report_outcome`` and ``_outcome_label``.
    """

    def endDetails(self):
        """The end of a details section has been reached."""
        parser = self.parser
        parser._state = parser._outside_test
        parser.current_test_description = None
        self._report_outcome()
        parser.client.stopTest(parser._current_test)

    def lineReceived(self, line):
        """Feed a line to the details parser, then forward it."""
        self.details_parser.lineReceived(line)
        self.parser.subunitLineReceived(line)

    def lostConnection(self):
        """Connection lost."""
        label = self._outcome_label()
        self.parser._lostConnectionInTest(_u('%s report of ') % label)

    def _outcome_label(self):
        """The label to describe this outcome."""
        raise NotImplementedError(self._outcome_label)

    def set_simple(self):
        """Start a simple details parser."""
        simple_parser = details.SimpleDetailsParser(self)
        self.details_parser = simple_parser

    def set_multipart(self):
        """Start a multipart details parser."""
        multipart_parser = details.MultipartDetailsParser(self)
        self.details_parser = multipart_parser
class _ReadingFailureDetails(_ReadingDetails):
    """Parser state used while the details of a failure are being read."""

    def _report_outcome(self):
        """Report the failure, with the details read, to the client."""
        parser = self.parser
        collected = self.details_parser.get_details()
        parser.client.addFailure(parser._current_test, details=collected)

    def _outcome_label(self):
        """The label to describe this outcome."""
        return "failure"
class _ReadingErrorDetails(_ReadingDetails):
    """Parser state used while the details of an error are being read."""

    def _report_outcome(self):
        """Report the error, with the details read, to the client."""
        parser = self.parser
        collected = self.details_parser.get_details()
        parser.client.addError(parser._current_test, details=collected)

    def _outcome_label(self):
        """The label to describe this outcome."""
        return "error"
class _ReadingExpectedFailureDetails(_ReadingDetails):
    """Parser state used while the details of an xfail are being read."""

    def _report_outcome(self):
        """Report the expected failure, with the details read, to the client."""
        parser = self.parser
        collected = self.details_parser.get_details()
        parser.client.addExpectedFailure(parser._current_test,
            details=collected)

    def _outcome_label(self):
        """The label to describe this outcome."""
        return "xfail"
class _ReadingUnexpectedSuccessDetails(_ReadingDetails):
    """Parser state used while the details of a uxsuccess are being read."""

    def _report_outcome(self):
        """Report the unexpected success, with the details read, to the client."""
        parser = self.parser
        collected = self.details_parser.get_details()
        parser.client.addUnexpectedSuccess(parser._current_test,
            details=collected)

    def _outcome_label(self):
        """The label to describe this outcome."""
        return "uxsuccess"
class _ReadingSkipDetails(_ReadingDetails):
    """Parser state used while the details of a skip are being read."""

    def _report_outcome(self):
        """Report the skip, with the details read, to the client."""
        parser = self.parser
        collected = self.details_parser.get_details("skip")
        parser.client.addSkip(parser._current_test, details=collected)

    def _outcome_label(self):
        """The label to describe this outcome."""
        return "skip"
class _ReadingSuccessDetails(_ReadingDetails):
    """Parser state used while the details of a success are being read."""

    def _report_outcome(self):
        """Report the success, with the details read, to the client."""
        parser = self.parser
        collected = self.details_parser.get_details("success")
        parser.client.addSuccess(parser._current_test, details=collected)

    def _outcome_label(self):
        """The label to describe this outcome."""
        return "success"
class TestProtocolServer(object):
    """A parser for subunit.

    Lines are handed to the current state object, which reports test
    activity to ``client``, forwards protocol lines to the forward stream
    and writes every other line to the output stream.
    """

    def __init__(self, client, stream=None, forward_stream=None):
        """Create a TestProtocolServer instance.

        :param client: An object meeting the unittest.TestResult protocol.
        :param stream: The stream that lines received which are not part of
            the subunit protocol should be written to. This allows custom
            handling of mixed protocols. By default, sys.stdout will be used
            for convenience. It should accept bytes to its write() method.
        :param forward_stream: A stream to forward subunit lines to. This
            allows a filter to forward the entire stream while still parsing
            and acting on it. By default forward_stream is set to
            DiscardStream() and no forwarding happens.
        """
        self.client = ExtendedToOriginalDecorator(client)
        if stream is None:
            stream = sys.stdout
            if sys.version_info > (3, 0):
                # Bytes are written, so use the underlying binary buffer.
                stream = stream.buffer
        self._stream = stream
        self._forward_stream = forward_stream or DiscardStream()
        # state objects we can switch to
        self._in_test = _InTest(self)
        self._outside_test = _OutSideTest(self)
        self._reading_error_details = _ReadingErrorDetails(self)
        self._reading_failure_details = _ReadingFailureDetails(self)
        self._reading_skip_details = _ReadingSkipDetails(self)
        self._reading_success_details = _ReadingSuccessDetails(self)
        self._reading_xfail_details = _ReadingExpectedFailureDetails(self)
        self._reading_uxsuccess_details = _ReadingUnexpectedSuccessDetails(self)
        # start with outside test.
        self._state = self._outside_test
        # Avoid casts on every call
        self._plusminus = _b('+-')
        self._push_sym = _b('push')
        self._pop_sym = _b('pop')

    def _handleProgress(self, offset, line):
        """Process a progress directive.

        A value starting with ``+`` or ``-`` is reported with PROGRESS_CUR,
        ``push`` and ``pop`` with PROGRESS_PUSH and PROGRESS_POP (and no
        offset), and anything else with PROGRESS_SET.

        :param offset: Index in ``line`` at which the value starts.
        :param line: The complete line that was received.
        """
        line = line[offset:].strip()
        if line[0] in self._plusminus:
            whence = PROGRESS_CUR
            delta = int(line)
        elif line == self._push_sym:
            whence = PROGRESS_PUSH
            delta = None
        elif line == self._pop_sym:
            whence = PROGRESS_POP
            delta = None
        else:
            whence = PROGRESS_SET
            delta = int(line)
        self.client.progress(delta, whence)

    def _handleTags(self, offset, line):
        """Process a tags command.

        :param offset: Index in ``line`` at which the tag list starts.
        :param line: The complete line that was received.
        """
        tags = line[offset:].decode('utf8').split()
        new_tags, gone_tags = tags_to_new_gone(tags)
        self.client.tags(new_tags, gone_tags)

    def _handleTime(self, offset, line):
        """Process a time directive and pass the parsed time to the client.

        :param offset: Index in ``line`` at which the timestamp starts.
        :param line: The complete line that was received.
        :raises TypeError: If parsing the timestamp raised TypeError; the
            new message includes the line and the original error.
        """
        try:
            event_time = iso8601.parse_date(line[offset:-1])
        except TypeError:
            # sys.exc_info() is a function; the old ``sys.exec_info[1]``
            # raised AttributeError and hid the real parse failure.
            raise TypeError(_u("Failed to parse %r, got %r")
                % (line, sys.exc_info()[1]))
        self.client.time(event_time)

    def lineReceived(self, line):
        """Call the appropriate local method for the received line."""
        self._state.lineReceived(line)

    def _lostConnectionInTest(self, state_string):
        """Report the current test as an error caused by a lost connection.

        :param state_string: Text inserted before ``test`` in the error
            message to describe what was being read.
        """
        error_string = _u("lost connection during %stest '%s'") % (
            state_string, self.current_test_description)
        self.client.addError(self._current_test, RemoteError(error_string))
        self.client.stopTest(self._current_test)

    def lostConnection(self):
        """The input connection has finished."""
        self._state.lostConnection()

    def readFrom(self, pipe):
        """Blocking convenience API to parse an entire stream.

        Every line is passed to ``lineReceived`` and ``lostConnection`` is
        called once the input is exhausted.

        :param pipe: A file-like object supporting readlines().
        :return: None.
        """
        for line in pipe.readlines():
            self.lineReceived(line)
        self.lostConnection()

    def _startTest(self, offset, line):
        """Internal call to change state machine. Override startTest()."""
        self._state.startTest(offset, line)

    def subunitLineReceived(self, line):
        """Write a line that belongs to the subunit protocol to the forward stream."""
        self._forward_stream.write(line)

    def stdOutLineReceived(self, line):
        """Write a line that is not subunit protocol to the output stream."""
        self._stream.write(line)
class TestProtocolClient(testresult.TestResult):
    """A TestResult which generates a subunit stream for a test run.

    Typical usage::

        # Get a TestSuite or TestCase to run
        suite = make_suite()
        # Create a stream (any object with a 'write' method). This should
        # accept bytes not strings: subunit is a byte orientated protocol.
        stream = open('tests.log', 'wb')
        # Create a subunit result object which will output to the stream
        result = subunit.TestProtocolClient(stream)
        # Optionally, to get timing data for performance analysis, wrap the
        # serialiser with a timing decorator
        result = subunit.test_results.AutoTimingTestResultDecorator(result)
        # Run the test suite reporting to the subunit result object
        suite.run(result)
        # Close the stream.
        stream.close()
    """

    def __init__(self, stream):
        """Create a client that writes the subunit stream to ``stream``.

        :param stream: An object with ``write`` and ``flush`` methods that
            accepts bytes.
        """
        testresult.TestResult.__init__(self)
        self._stream = stream
        _make_stream_binary(stream)
        # Byte constants are built once rather than on every call.
        self._progress_fmt = _b("progress: ")
        self._bytes_eol = _b("\n")
        self._progress_plus = _b("+")
        self._progress_push = _b("push")
        self._progress_pop = _b("pop")
        self._empty_bytes = _b("")
        self._start_simple = _b(" [\n")
        self._end_simple = _b("]\n")

    def addError(self, test, error=None, details=None):
        """Report an error in test test.

        Only one of error and details should be provided: conceptually there
        are two separate methods:
            addError(self, test, error)
            addError(self, test, details)

        :param error: Standard unittest positional argument form - an
            exc_info tuple.
        :param details: New Testing-in-python drafted API; a dict from string
            to subunit.Content objects.
        """
        self._addOutcome("error", test, error=error, details=details)

    def addExpectedFailure(self, test, error=None, details=None):
        """Report an expected failure in test test.

        Only one of error and details should be provided: conceptually there
        are two separate methods:
            addExpectedFailure(self, test, error)
            addExpectedFailure(self, test, details)

        :param error: Standard unittest positional argument form - an
            exc_info tuple.
        :param details: New Testing-in-python drafted API; a dict from string
            to subunit.Content objects.
        """
        self._addOutcome("xfail", test, error=error, details=details)

    def addFailure(self, test, error=None, details=None):
        """Report a failure in test test.

        Only one of error and details should be provided: conceptually there
        are two separate methods:
            addFailure(self, test, error)
            addFailure(self, test, details)

        :param error: Standard unittest positional argument form - an
            exc_info tuple.
        :param details: New Testing-in-python drafted API; a dict from string
            to subunit.Content objects.
        """
        self._addOutcome("failure", test, error=error, details=details)

    def _addOutcome(self, outcome, test, error=None, details=None,
        error_permitted=True):
        """Write an outcome directive for test to the stream.

        Only one of error and details should be provided: conceptually there
        are two separate methods:
            addOutcome(self, test, error)
            addOutcome(self, test, details)

        :param outcome: A string describing the outcome - used as the
            event name in the subunit stream.
        :param error: Standard unittest positional argument form - an
            exc_info tuple.
        :param details: New Testing-in-python drafted API; a dict from string
            to subunit.Content objects.
        :param error_permitted: If True then error or details must be
            supplied. If False then error must not be supplied and details
            is still optional.
        :raises ValueError: If the arguments break the error_permitted rule.
            Nothing is written to the stream in that case.
        """
        # Validate before writing, so that a rejected call does not leave a
        # truncated directive in the stream.
        if error_permitted:
            if error is None and details is None:
                raise ValueError("one of error or details must be supplied")
        else:
            if error is not None:
                raise ValueError("error must not be supplied")
        self._stream.write(_b("%s: %s" % (outcome, test.id())))
        if error is not None:
            self._stream.write(self._start_simple)
            # XXX: this needs to be made much stricter, along the lines of
            # Martin[gz]'s work in testtools. Perhaps subunit can use that?
            for line in self._exc_info_to_unicode(error, test).splitlines():
                self._stream.write(("%s\n" % line).encode('utf8'))
        elif details is not None:
            self._write_details(details)
        else:
            self._stream.write(_b("\n"))
        if details is not None or error is not None:
            self._stream.write(self._end_simple)

    def addSkip(self, test, reason=None, details=None):
        """Report a skipped test.

        :param reason: Text explaining the skip. When given it is written as
            a simple details block and ``details`` is not used.
        :param details: A dict from string to content objects, written when
            ``reason`` is None.
        """
        if reason is None:
            self._addOutcome("skip", test, error=None, details=details)
        else:
            self._stream.write(_b("skip: %s [\n" % test.id()))
            self._stream.write(_b("%s\n" % reason))
            self._stream.write(self._end_simple)

    def addSuccess(self, test, details=None):
        """Report a success in a test."""
        self._addOutcome("successful", test, details=details, error_permitted=False)

    def addUnexpectedSuccess(self, test, details=None):
        """Report an unexpected success in test test.

        Details can optionally be provided: conceptually there
        are two separate methods:
            addUnexpectedSuccess(self, test)
            addUnexpectedSuccess(self, test, details)

        :param details: New Testing-in-python drafted API; a dict from string
            to subunit.Content objects.
        """
        self._addOutcome("uxsuccess", test, details=details,
            error_permitted=False)

    def startTest(self, test):
        """Mark a test as starting its test run."""
        super(TestProtocolClient, self).startTest(test)
        self._stream.write(_b("test: %s\n" % test.id()))
        self._stream.flush()

    def stopTest(self, test):
        """Mark a test as finished and flush the stream."""
        super(TestProtocolClient, self).stopTest(test)
        self._stream.flush()

    def progress(self, offset, whence):
        """Provide indication about the progress/length of the test run.

        :param offset: Information about the number of tests remaining. If
            whence is PROGRESS_CUR, then offset increases/decreases the
            remaining test count. If whence is PROGRESS_SET, then offset
            specifies exactly the remaining test count.
        :param whence: One of PROGRESS_CUR, PROGRESS_SET, PROGRESS_PUSH,
            PROGRESS_POP.
        """
        if whence == PROGRESS_CUR and offset > -1:
            # Negative offsets already carry their sign; only non-negative
            # relative offsets need an explicit '+'.
            prefix = self._progress_plus
            offset = _b(str(offset))
        elif whence == PROGRESS_PUSH:
            prefix = self._empty_bytes
            offset = self._progress_push
        elif whence == PROGRESS_POP:
            prefix = self._empty_bytes
            offset = self._progress_pop
        else:
            prefix = self._empty_bytes
            offset = _b(str(offset))
        self._stream.write(self._progress_fmt + prefix + offset +
            self._bytes_eol)

    def time(self, a_datetime):
        """Inform the client of the time.

        The time is converted to UTC before it is written.

        :param a_datetime: A datetime.datetime object.
        """
        utc_time = a_datetime.astimezone(iso8601.Utc())
        self._stream.write(_b("time: %04d-%02d-%02d %02d:%02d:%02d.%06dZ\n" % (
            utc_time.year, utc_time.month, utc_time.day, utc_time.hour,
            utc_time.minute, utc_time.second, utc_time.microsecond)))

    def _write_details(self, details):
        """Output details to the stream.

        Each entry is written, in order of its name, as a Content-Type
        line, the name, and the content passed through a chunked encoder.

        :param details: An extended details dict for a test outcome.
        """
        write = self._stream.write
        write(_b(" [ multipart\n"))
        for name, content in sorted(details.items()):
            content_type = content.content_type
            write(_b("Content-Type: %s/%s" %
                (content_type.type, content_type.subtype)))
            parameters = content_type.parameters
            if parameters:
                write(_b(";"))
                param_strs = ["%s=%s" % (param, value)
                    for param, value in parameters.items()]
                write(_b(",".join(param_strs)))
            write(_b("\n%s\n" % name))
            encoder = chunked.Encoder(self._stream)
            for chunk in content.iter_bytes():
                encoder.write(chunk)
            encoder.close()

    def done(self):
        """Obey the testtools result.done() interface."""
def RemoteError(description=_u("")):
    """Build an exc_info style triple for an error described by text.

    :param description: The text passed to ``_StringException``.
    :return: A ``(_StringException, instance, None)`` tuple.
    """
    error = _StringException(description)
    return (_StringException, error, None)
class RemotedTestCase(unittest.TestCase):
    """A class to represent test cases run in child processes.

    Instances of this class are used to provide the Python test API a TestCase
    that can be printed to the screen, introspected for metadata and so on.
    However, as they are simply a memoisation of a test that was actually
    run in the past by a separate process, they cannot perform any interactive
    actions: setUp and tearDown raise, and run reports an error.
    """

    def __eq__(self, other):
        """Two remoted tests are equal when their descriptions are equal."""
        try:
            return self.__description == other.__description
        except AttributeError:
            return False

    def __ne__(self, other):
        """Inverse of __eq__; Python 2 does not derive this automatically."""
        return not self.__eq__(other)

    def __hash__(self):
        """Hash on the description, consistent with __eq__.

        Defining __eq__ alone makes instances unhashable on Python 3, which
        stops them being used in sets or as dict keys.
        """
        return hash(self.__description)

    def __init__(self, description):
        """Create a pseudo test case with description description."""
        # unittest.TestCase.__init__ is not called: there is no test method
        # on this class for it to look up.
        self.__description = description

    def error(self, label):
        """Raise NotImplementedError naming the operation that was attempted."""
        raise NotImplementedError("%s on RemotedTestCases is not permitted." %
            label)

    def setUp(self):
        self.error("setUp")

    def tearDown(self):
        self.error("tearDown")

    def shortDescription(self):
        return self.__description

    def id(self):
        return "%s" % (self.__description,)

    def __str__(self):
        return "%s (%s)" % (self.__description, self._strclass())

    def __repr__(self):
        return "<%s description='%s'>" % \
               (self._strclass(), self.__description)

    def run(self, result=None):
        """Report to result that a remoted test cannot be run."""
        if result is None:
            result = self.defaultTestResult()
        result.startTest(self)
        result.addError(self, RemoteError(_u("Cannot run RemotedTestCases.\n")))
        result.stopTest(self)

    def _strclass(self):
        """Return the dotted module and class name of this instance."""
        cls = self.__class__
        return "%s.%s" % (cls.__module__, cls.__name__)
class ExecTestCase(unittest.TestCase):
    """A test case which runs external scripts for test fixtures."""

    def __init__(self, methodName='runTest'):
        """Create an instance of the class that will use the named test
           method when executed. Raises a ValueError if the instance does
           not have a method with the specified name.

        The docstring of the named method is taken as the path of the script
        to run, resolved against the directory of the module that defines
        this class.
        """
        unittest.TestCase.__init__(self, methodName)
        script_path = getattr(self, methodName).__doc__
        defining_module = sys.modules[self.__class__.__module__]
        self.script = join_dir(defining_module.__file__, script_path)

    def countTestCases(self):
        return 1

    def run(self, result=None):
        """Run the script, reporting its subunit output to result."""
        if result is None:
            result = self.defaultTestResult()
        self._run(result)

    def debug(self):
        """Run the test without collecting errors in a TestResult"""
        throwaway = testresult.TestResult()
        self._run(throwaway)

    def _run(self, result):
        """Execute the script through the shell and parse its stdout.

        :param result: The result object that the parsed stream is
            reported to.
        """
        protocol = TestProtocolServer(result)
        child = subprocess.Popen(self.script, shell=True,
            stdout=subprocess.PIPE)
        _make_stream_binary(child.stdout)
        captured, _ = child.communicate()
        protocol.readFrom(BytesIO(captured))
class IsolatedTestCase(unittest.TestCase):
    """A TestCase which executes in a forked process.

    Each test gets its own process, which has a performance overhead but will
    provide excellent isolation from global state (such as django configs,
    zope utilities and so on).
    """

    def run(self, result=None):
        """Run this test through run_isolated, reporting to result."""
        if result is None:
            result = self.defaultTestResult()
        run_isolated(unittest.TestCase, self, result)
class IsolatedTestSuite(unittest.TestSuite):
    """A TestSuite which runs its tests in a forked process.

    The suite forks before running its tests and the results are reported
    from the child process using a Subunit stream. This is useful for
    handling tests that mutate global state, or that exercise C extensions.
    """

    def run(self, result=None):
        """Run the suite through run_isolated, reporting to result."""
        if result is None:
            result = testresult.TestResult()
        run_isolated(unittest.TestSuite, self, result)
def run_isolated(klass, self, result):
    """Run a test suite or case in a subprocess, using the run method on klass.

    The process forks. The child runs ``klass.run(self, ...)`` with a
    TestProtocolClient writing to a pipe on file descriptor 1 and then exits
    without returning. The parent parses that pipe into ``result`` and waits
    for the child.

    :param klass: The class whose ``run`` method is called in the child.
    :param self: The test case or suite to run.
    :param result: The result object the parent reports to.
    :return: ``result``.
    """
    c2pread, c2pwrite = os.pipe()
    # fixme - error -> result
    # now fork
    pid = os.fork()
    if pid == 0:
        # Child
        # Close parent's pipe ends
        os.close(c2pread)
        # Dup fds for child
        os.dup2(c2pwrite, 1)
        # Close pipe fds.
        os.close(c2pwrite)

        # at this point, sys.stdin is redirected, now we want
        # to filter it to escape ]'s.
        ### XXX: test and write that bit.
        stream = os.fdopen(1, 'wb')
        result = TestProtocolClient(stream)
        klass.run(self, result)
        stream.flush()
        sys.stderr.flush()
        # exit HARD, exit NOW.
        os._exit(0)
    else:
        # Parent
        # Close child pipe ends
        os.close(c2pwrite)
        # hookup a protocol engine
        protocol = TestProtocolServer(result)
        fileobj = os.fdopen(c2pread, 'rb')
        try:
            protocol.readFrom(fileobj)
        finally:
            # Release the read end; otherwise every isolated run leaks a
            # file descriptor in the parent.
            fileobj.close()
        os.waitpid(pid, 0)
        # TODO return code evaluation.
    return result
def TAP2SubUnit(tap, subunit):
    """Filter a TAP pipe into a subunit pipe.

    Each TAP test line becomes a ``test`` line followed by an outcome line;
    comment lines and directive comments collected for a test are written as
    its details. Test numbers that are skipped over, or that the plan
    promises but never appear, are reported as errors. A ``Bail out!`` line
    is reported as an error test. Lines that are not recognised are copied
    through unchanged.

    :param tap: A tap pipe/stream/file object.
    :param subunit: A pipe/stream/file object to write subunit results to.
    :return: The exit code to exit with; always 0.
    """
    BEFORE_PLAN = 0
    AFTER_PLAN = 1
    SKIP_STREAM = 2
    state = BEFORE_PLAN
    plan_start = 1
    plan_stop = 0
    # Raw strings keep the regex escapes (\d, \., \#, \!) from being read as
    # invalid string escapes; compiling once avoids a lookup per line.
    plan_re = re.compile(r"(\d+)\.\.(\d+)\s*(?:\#\s+(.*))?\n")
    test_re = re.compile(
        r"(ok|not ok)(?:\s+(\d+)?)?(?:\s+([^#]*[^#\s]+)\s*)?"
        r"(?:\s+#\s+(TODO|SKIP|skip|todo)(?:\s+(.*))?)?\n")
    bail_re = re.compile(r"Bail out\!(?:\s*(.*))?\n")
    comment_re = re.compile(r"\#.*\n")

    def _skipped_test(subunit, plan_start):
        # Some tests were skipped.
        subunit.write('test test %d\n' % plan_start)
        subunit.write('error test %d [\n' % plan_start)
        subunit.write('test missing from TAP output\n')
        subunit.write(']\n')
        return plan_start + 1

    # Test data for the next test to emit
    test_name = None
    log = []
    result = None

    def _emit_test():
        "write out a test"
        if test_name is None:
            return
        subunit.write("test %s\n" % test_name)
        if not log:
            subunit.write("%s %s\n" % (result, test_name))
        else:
            subunit.write("%s %s [\n" % (result, test_name))
        if log:
            for line in log:
                subunit.write("%s\n" % line)
            subunit.write("]\n")
        del log[:]

    for line in tap:
        if state == BEFORE_PLAN:
            match = plan_re.match(line)
            if match:
                state = AFTER_PLAN
                _, plan_stop, comment = match.groups()
                plan_stop = int(plan_stop)
                if plan_start > plan_stop and plan_stop == 0:
                    # skipped file
                    state = SKIP_STREAM
                    subunit.write("test file skip\n")
                    subunit.write("skip file skip [\n")
                    subunit.write("%s\n" % comment)
                    subunit.write("]\n")
                continue
        # not a plan line, or have seen one before
        match = test_re.match(line)
        if match:
            # new test, emit current one.
            _emit_test()
            status, number, description, directive, directive_comment = match.groups()
            if status == 'ok':
                result = 'success'
            else:
                result = "failure"
            if description is None:
                description = ''
            else:
                description = ' ' + description
            if directive is not None:
                if directive.upper() == 'TODO':
                    result = 'xfail'
                elif directive.upper() == 'SKIP':
                    result = 'skip'
                if directive_comment is not None:
                    log.append(directive_comment)
            if number is not None:
                number = int(number)
                while plan_start < number:
                    plan_start = _skipped_test(subunit, plan_start)
            test_name = "test %d%s" % (plan_start, description)
            plan_start += 1
            continue
        match = bail_re.match(line)
        if match:
            reason, = match.groups()
            if reason is None:
                extra = ''
            else:
                extra = ' %s' % reason
            _emit_test()
            test_name = "Bail out!%s" % extra
            result = "error"
            state = SKIP_STREAM
            continue
        match = comment_re.match(line)
        if match:
            log.append(line[:-1])
            continue
        subunit.write(line)
    _emit_test()
    while plan_start <= plan_stop:
        # record missed tests
        plan_start = _skipped_test(subunit, plan_start)
    return 0
def tag_stream(original, filtered, tags):
    """Alter tags on a stream.

    :param original: The input stream.
    :param filtered: The output stream.
    :param tags: The tags to apply. As in a normal stream - a list of 'TAG' or
        '-TAG' commands.

        A 'TAG' command will add the tag to the output stream,
        and override any existing '-TAG' command in that stream.
        Specifically:
         * A global 'tags: TAG' will be added to the start of the stream.
         * Any tags commands with -TAG will have the -TAG removed.

        A '-TAG' command will remove the TAG command from the stream.
        Specifically:
         * A 'tags: -TAG' command will be added to the start of the stream.
         * Any 'tags: TAG' command will have 'TAG' removed from it.
        Additionally, any redundant tagging commands (adding a tag globally
        present, or removing a tag globally removed) are stripped as a
        by-product of the filtering.
    :return: 0
    """
    new_tags, gone_tags = tags_to_new_gone(tags)

    def write_tags(new_tags, gone_tags):
        # Emit nothing at all when there is no tag change to report.
        if new_tags or gone_tags:
            # Every entry, added or removed, must be space separated;
            # sorting keeps the output stable for unordered collections.
            entries = sorted(new_tags)
            entries.extend("-" + tag for tag in sorted(gone_tags))
            filtered.write("tags: " + ' '.join(entries))
            filtered.write("\n")

    write_tags(new_tags, gone_tags)
    # TODO: use the protocol parser and thus don't mangle test comments.
    for line in original:
        if line.startswith("tags:"):
            line_tags = line[5:].split()
            line_new, line_gone = tags_to_new_gone(line_tags)
            # Drop anything the global command already overrides.
            line_new = line_new - gone_tags
            line_gone = line_gone - new_tags
            write_tags(line_new, line_gone)
        else:
            filtered.write(line)
    return 0
class ProtocolTestCase(object):
    """Subunit wire protocol to unittest.TestCase adapter.

    ProtocolTestCase honours the core of ``unittest.TestCase`` protocol -
    calling a ProtocolTestCase or invoking the run() method will make a 'test
    run' happen. The 'test run' will simply be a replay of the test activity
    that has been encoded into the stream. The ``unittest.TestCase`` ``debug``
    and ``countTestCases`` methods are not supported because there isn't a
    sensible mapping for those methods.

    # Get a stream (any object with a readline() method), in this case the
    # stream output by the example from ``subunit.TestProtocolClient``.
    stream = open('tests.log', 'rb')
    # Create a parser which will read from the stream and emit
    # activity to a unittest.TestResult when run() is called.
    suite = subunit.ProtocolTestCase(stream)
    # Create a result object to accept the contents of that stream.
    result = unittest._TextTestResult(sys.stdout)
    # 'run' the tests - process the stream and feed its contents to result.
    suite.run(result)
    stream.close()

    :seealso: TestProtocolServer (the subunit wire protocol parser).
    """

    def __init__(self, stream, passthrough=None, forward=False):
        """Create a ProtocolTestCase reading from stream.

        :param stream: A filelike object which a subunit stream can be read
            from.
        :param passthrough: A stream pass non subunit input on to. If not
            supplied, the TestProtocolServer default is used.
        :param forward: A stream to pass subunit input on to. If not supplied
            subunit input is not forwarded.
        """
        self._stream = stream
        _make_stream_binary(stream)
        self._passthrough = passthrough
        self._forward = forward

    def __call__(self, result=None):
        """Replay the stream into ``result``; equivalent to ``run(result)``."""
        return self.run(result)

    def defaultTestResult(self):
        """Return the result object used when run() is given none."""
        # Imported locally so this method does not rely on module imports.
        import unittest
        return unittest.TestResult()

    def run(self, result=None):
        """Read the stream line by line and replay it into ``result``.

        :param result: The TestResult to report to. When None, the object
            returned by defaultTestResult() is used.
        """
        if result is None:
            result = self.defaultTestResult()
        protocol = TestProtocolServer(result, self._passthrough, self._forward)
        line = self._stream.readline()
        # readline() returns an empty (falsy) value at end of stream.
        while line:
            protocol.lineReceived(line)
            line = self._stream.readline()
        protocol.lostConnection()
class TestResultStats(testresult.TestResult):
    """A pyunit TestResult interface implementation for making statistics.

    :ivar total_tests: The total tests seen.
    :ivar passed_tests: The tests that passed.
    :ivar failed_tests: The tests that failed.
    :ivar skipped_tests: The tests that were skipped.
    :ivar seen_tags: The tags seen across all tests.
    """

    def __init__(self, stream):
        """Create a TestResultStats which outputs to stream."""
        testresult.TestResult.__init__(self)
        self._stream = stream
        self.failed_tests = 0
        self.skipped_tests = 0
        self.seen_tags = set()

    @property
    def total_tests(self):
        """The number of tests run, as counted by the base class."""
        return self.testsRun

    def addError(self, test, err, details=None):
        """Count an errored test as a failure."""
        self.failed_tests += 1

    def addFailure(self, test, err, details=None):
        """Count a failed test."""
        self.failed_tests += 1

    def addSkip(self, test, reason, details=None):
        """Count a skipped test."""
        self.skipped_tests += 1

    def formatStats(self):
        """Write the collected statistics to the output stream."""
        self._stream.write("Total tests: %5d\n" % self.total_tests)
        self._stream.write("Passed tests: %5d\n" % self.passed_tests)
        self._stream.write("Failed tests: %5d\n" % self.failed_tests)
        self._stream.write("Skipped tests: %5d\n" % self.skipped_tests)
        tags = sorted(self.seen_tags)
        self._stream.write("Seen tags: %s\n" % (", ".join(tags)))

    @property
    def passed_tests(self):
        """Tests that were neither failed nor skipped."""
        return self.total_tests - self.failed_tests - self.skipped_tests

    def tags(self, new_tags, gone_tags):
        """Accumulate the seen tags."""
        # Only added tags are recorded; gone_tags is not consulted.
        self.seen_tags.update(new_tags)

    def wasSuccessful(self):
        """Tells whether or not this result was a success"""
        return self.failed_tests == 0
def get_default_formatter():
    """Obtain the default formatter to write to.

    When the ``SUBUNIT_FORMATTER`` environment variable is set to a non-empty
    value it is run as a command via ``os.popen`` and the write end of its
    pipe is returned. Otherwise standard output is returned; on Python 3 this
    is the underlying binary buffer rather than the text wrapper.

    :return: A file-like object.
    """
    formatter = os.getenv("SUBUNIT_FORMATTER")
    if formatter:
        return os.popen(formatter, "w")
    else:
        stream = sys.stdout
        if sys.version_info > (3, 0):
            stream = stream.buffer
        return stream
1220 if sys.version_info > (3, 0):
1221 from io import UnsupportedOperation as _NoFilenoError
1223 _NoFilenoError = AttributeError
def read_test_list(path):
    """Read a list of test ids from a file on disk.

    The file holds one test id per line; the trailing newline of each line is
    removed and nothing else is stripped.

    :param path: Path to the file
    :return: Sequence of test ids
    """
    # Text mode: stripping "\n" from bytes lines fails on Python 3.
    with open(path, 'r') as f:
        return [line.rstrip("\n") for line in f]
def _make_stream_binary(stream):
    """Ensure that a stream will be binary safe. See _make_binary_on_windows.

    Streams that cannot report a file descriptor are left untouched.

    :param stream: A file-like object, ideally providing ``fileno()``.
    """
    try:
        fileno = stream.fileno()
    except (_NoFilenoError, AttributeError):
        # AttributeError covers objects with no fileno attribute at all,
        # which _NoFilenoError alone does not catch on Python 3.
        return
    _make_binary_on_windows(fileno)
1246 def _make_binary_on_windows(fileno):
1247 """Win32 mangles \r\n to \n and that breaks streams. See bug lp:505078."""
1248 if sys.platform == "win32":
1250 msvcrt.setmode(fileno, os.O_BINARY)