3 # Simple subunit testrunner for python
4 # Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2014
6 # Cobbled together from testtools and subunit:
7 # Copyright (C) 2005-2011 Robert Collins <robertc@robertcollins.net>
8 # Copyright (c) 2008-2011 testtools developers.
10 # Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
11 # license at the users choice. A copy of both licenses are available in the
12 # project source as Apache-2.0 and BSD. You may not use this file except in
13 # compliance with one of these two licences.
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
17 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18 # license you chose for the specific language governing permissions and
19 # limitations under that license.
"""Run a unittest testcase reporting results as Subunit.

  $ python -m samba.subunit.run mylib.tests.test_suite
"""
# From http://docs.python.org/library/datetime.html
_ZERO = datetime.timedelta(0)


class UTC(datetime.tzinfo):
    """A fixed UTC timezone: zero offset, no DST.

    datetime.astimezone() requires utcoffset() and dst(); tzname() is
    provided for completeness.  (datetime.timezone.utc would do the same
    job on modern Pythons, but this class keeps the module free of
    version assumptions.)
    """

    def utcoffset(self, dt):
        # Always zero offset from UTC, regardless of the datetime.
        return _ZERO

    def tzname(self, dt):
        return "UTC"

    def dst(self, dt):
        # UTC never observes daylight saving time.
        return _ZERO
# Whether or not to hide layers of the stack trace that are
# unittest/testtools internal code.  Defaults to True since the
# system-under-test is rarely unittest or testtools.
HIDE_INTERNAL_STACK = True


def write_traceback(stream, err, test):
    """Write a formatted traceback for ``err`` to ``stream``.

    Copied from Python 2.7's unittest.TestResult._exc_info_to_string.

    :param stream: A file-like object the formatted traceback is written to.
    :param err: A sys.exc_info()-style (exctype, value, tb) tuple.
    :param test: The test the error belongs to; its failureException is
        used to decide whether assert*() frames should be trimmed.
    """
    def _is_relevant_tb_level(tb):
        # unittest marks its own frames by defining '__unittest' in the
        # frame's globals; such frames are "internal" and may be hidden.
        return '__unittest' in tb.tb_frame.f_globals

    def _count_relevant_tb_levels(tb):
        # Count frames up to (but not including) the first internal one.
        length = 0
        while tb and not _is_relevant_tb_level(tb):
            length += 1
            tb = tb.tb_next
        return length

    exctype, value, tb = err
    # Skip test runner traceback levels
    if HIDE_INTERNAL_STACK:
        while tb and _is_relevant_tb_level(tb):
            tb = tb.tb_next

    format_exception = traceback.format_exception
    if (HIDE_INTERNAL_STACK and test.failureException
        and isinstance(value, test.failureException)):
        # Skip assert*() traceback levels
        length = _count_relevant_tb_levels(tb)
        msgLines = format_exception(exctype, value, tb, length)
    else:
        msgLines = format_exception(exctype, value, tb)
    stream.writelines(msgLines)
class TestProtocolClient(unittest.TestResult):
    """A TestResult which generates a subunit stream for a test run.

    Example use::

        # Get a TestSuite or TestCase to run
        # Create a stream (any object with a 'write' method). This should accept
        # bytes not strings: subunit is a byte orientated protocol.
        stream = file('tests.log', 'wb')
        # Create a subunit result object which will output to the stream
        result = subunit.TestProtocolClient(stream)
        # Optionally, to get timing data for performance analysis, wrap the
        # serialiser with a timing decorator
        result = subunit.test_results.AutoTimingTestResultDecorator(result)
        # Run the test suite reporting to the subunit result object
    """

    def __init__(self, stream):
        unittest.TestResult.__init__(self)
        # Stream the subunit protocol events are written to.
        self._stream = stream

    def addError(self, test, error=None):
        """Report an error in test test.

        :param error: Standard unittest positional argument form - an
            exc_info tuple.
        """
        self._addOutcome("error", test, error=error)

    def addExpectedFailure(self, test, error=None):
        """Report an expected failure in test test.

        :param error: Standard unittest positional argument form - an
            exc_info tuple.
        """
        self._addOutcome("xfail", test, error=error)

    def addFailure(self, test, error=None):
        """Report a failure in test test.

        :param error: Standard unittest positional argument form - an
            exc_info tuple.
        """
        self._addOutcome("failure", test, error=error)

    def _addOutcome(self, outcome, test, error=None, error_permitted=True):
        """Report an outcome in test test.

        :param outcome: A string describing the outcome - used as the
            event name in the subunit stream.
        :param error: Standard unittest positional argument form - an
            exc_info tuple.
        :param error_permitted: If True then error must be supplied.
            If False then error must not be supplied.
        """
        self._stream.write(("%s: " % outcome) + self._test_id(test))
        # NOTE(review): the error_permitted validation branches appear to
        # be missing from this copy of the file (the first 'if' below has
        # no body) - confirm against the upstream subunit source.
        if error is not None:
        if error is not None:
            # Error details are framed in '[' ... ']' per the subunit
            # v1 protocol.
            self._stream.write(" [\n")
            write_traceback(self._stream, error, test)
            self._stream.write("\n")
        if error is not None:
            self._stream.write("]\n")

    def addSkip(self, test, reason=None):
        """Report a skipped test."""
        # NOTE(review): upstream branches on whether a reason was given;
        # the branch lines are not visible in this copy.
        self._addOutcome("skip", test, error=None)
        self._stream.write("skip: %s [\n" % test.id())
        self._stream.write("%s\n" % reason)
        self._stream.write("]\n")

    def addSuccess(self, test):
        """Report a success in a test."""
        self._addOutcome("successful", test, error_permitted=False)

    def addUnexpectedSuccess(self, test):
        """Report an unexpected success in test test."""
        self._addOutcome("uxsuccess", test, error_permitted=False)

    def _test_id(self, test):
        """Return the id of test, encoded to UTF-8 bytes if needed."""
        # NOTE(review): the initial 'result = test.id()' assignment and
        # the trailing return are not visible in this copy of the file.
        if type(result) is not bytes:
            result = result.encode('utf8')

    def startTest(self, test):
        """Mark a test as starting its test run."""
        super(TestProtocolClient, self).startTest(test)
        self._stream.write("test: " + self._test_id(test) + "\n")

    def stopTest(self, test):
        super(TestProtocolClient, self).stopTest(test)

    def time(self, a_datetime):
        """Inform the client of the time.

        :param a_datetime: A datetime.datetime object.
        """
        # Normalise to UTC before serialising, so the stream timestamp
        # is unambiguous.
        time = a_datetime.astimezone(UTC())
        self._stream.write("time: %04d-%02d-%02d %02d:%02d:%02d.%06dZ\n" % (
            time.year, time.month, time.day, time.hour, time.minute,
            time.second, time.microsecond))
215 def _flatten_tests(suite_or_case, unpack_outer=False):
217 tests = iter(suite_or_case)
219 # Not iterable, assume it's a test case.
220 return [(suite_or_case.id(), suite_or_case)]
221 if (type(suite_or_case) in (unittest.TestSuite,) or
223 # Plain old test suite (or any others we may add).
226 # Recurse to flatten.
227 result.extend(_flatten_tests(test))
230 # Find any old actual test and grab its id.
232 tests = iterate_tests(suite_or_case)
236 # If it has a sort_tests method, call that.
237 if getattr(suite_or_case, 'sort_tests', None) is not None:
238 suite_or_case.sort_tests()
239 return [(suite_id, suite_or_case)]
def sorted_tests(suite_or_case, unpack_outer=False):
    """Sort suite_or_case while preserving non-vanilla TestSuites.

    This is a DSU pass: _flatten_tests produces (sort_key, test) pairs,
    the pairs are ordered by key (the first contained test id), and the
    keys are stripped.  Works around http://bugs.python.org/issue16709
    by giving discovery a stable, non-random order.
    """
    tests = _flatten_tests(suite_or_case, unpack_outer=unpack_outer)
    # Sort on the key only: comparing whole tuples would try to order
    # TestCase objects when two keys tie, which TypeErrors on Python 3.
    tests.sort(key=lambda pair: pair[0])
    return unittest.TestSuite([test for (sort_key, test) in tests])
249 def iterate_tests(test_suite_or_case):
250 """Iterate through all of the test cases in 'test_suite_or_case'."""
252 suite = iter(test_suite_or_case)
254 yield test_suite_or_case
257 for subtest in iterate_tests(test):
# Pick a test loader that supports discovery: the stdlib loader on
# Python >= 2.7, otherwise the third-party 'discover' backport if it is
# installed.  have_discover records whether discovery is usable at all.
defaultTestLoader = unittest.defaultTestLoader
defaultTestLoaderCls = unittest.TestLoader

if getattr(defaultTestLoader, 'discover', None) is None:
    try:
        import discover
        defaultTestLoader = discover.DiscoveringTestLoader()
        defaultTestLoaderCls = discover.DiscoveringTestLoader
        have_discover = True
    except ImportError:
        # No discovery available; _do_discovery() raises if requested.
        have_discover = False
else:
    have_discover = True
# Command-line front end, taken from python 2.7 and slightly modified for
# compatibility with older versions. Delete when 2.7 is the oldest
# supported version. Local modifications:
# - Use have_discover to raise an error if the user tries to use
#   discovery on an old version and doesn't have discover installed.
# - If --catch is given check that installHandler is available, as
#   it won't be on old python versions.
# - print calls have been made single-source python3 compatible.
# - exception handling likewise.
# - The default help has been changed to USAGE_AS_MAIN and USAGE_FROM_MODULE
#   removed.
# - A tweak has been added to detect 'python -m *.run' and use a
#   better progName in that case.
# - self.module is more comprehensively set to None when being invoked from
#   the commandline - __name__ is used as a sentinel value.
# - --list has been added which can list tests (should be upstreamed).
# - --load-list has been added which can reduce the tests used (should be
#   upstreamed).
# - The limitation of using getopt is declared to the user.
# - http://bugs.python.org/issue16709 is worked around, by sorting tests when
#   discovery is used.

# Help-text fragments appended to the usage template only when the running
# Python supports the corresponding feature.
FAILFAST = " -f, --failfast Stop on first failure\n"
CATCHBREAK = " -c, --catch Catch control-C and display results\n"
BUFFEROUTPUT = " -b, --buffer Buffer stdout and stderr during test runs\n"

# NOTE(review): the lines below are the body of the USAGE_AS_MAIN
# triple-quoted template used by TestProgram.usageExit(); the opening
# assignment and closing quotes are not visible in this copy of the file.
Usage: %(progName)s [options] [tests]
 -h, --help Show this message
 -v, --verbose Verbose output
 -q, --quiet Minimal output
 -l, --list List tests rather than executing them.
 --load-list Specifies a file containing test ids, only tests matching
 those ids are executed.
%(failfast)s%(catchbreak)s%(buffer)s
 %(progName)s test_module - run tests from test_module
 %(progName)s module.TestClass - run tests from module.TestClass
 %(progName)s module.Class.test_method - run specified test method
All options must come before [tests]. [tests] can be a list of any number of
test modules, classes and test methods.
Alternative Usage: %(progName)s discover [options]
 -v, --verbose Verbose output
%(failfast)s%(catchbreak)s%(buffer)s -s directory Directory to start discovery ('.' default)
 -p pattern Pattern to match test files ('test*.py' default)
 -t directory Top level directory of project (default to
 -l, --list List tests rather than executing them.
 --load-list Specifies a file containing test ids, only tests matching
 those ids are executed.
For test discovery all test modules must be importable from the top
level directory of the project.
# NOT a TestResult, because we are implementing the interface, not inheriting
# it.
class TestResultDecorator(object):
    """General pass-through decorator.

    This provides a base that other TestResults can inherit from to
    gain basic forwarding functionality. It also takes care of
    handling the case where the target doesn't support newer methods
    or features by degrading them.
    """

    def __init__(self, decorated):
        """Create a TestResultDecorator forwarding to decorated."""
        # Make every decorator degrade gracefully.
        self.decorated = decorated

    def startTest(self, test):
        return self.decorated.startTest(test)

    def startTestRun(self):
        return self.decorated.startTestRun()

    def stopTest(self, test):
        return self.decorated.stopTest(test)

    def stopTestRun(self):
        return self.decorated.stopTestRun()

    def addError(self, test, err=None):
        return self.decorated.addError(test, err)

    def addFailure(self, test, err=None):
        return self.decorated.addFailure(test, err)

    def addSuccess(self, test):
        return self.decorated.addSuccess(test)

    def addSkip(self, test, reason=None):
        return self.decorated.addSkip(test, reason)

    def addExpectedFailure(self, test, err=None):
        return self.decorated.addExpectedFailure(test, err)

    def addUnexpectedSuccess(self, test):
        return self.decorated.addUnexpectedSuccess(test)

    def _get_failfast(self):
        # Degrade gracefully when the target has no failfast attribute.
        return getattr(self.decorated, 'failfast', False)

    def _set_failfast(self, value):
        self.decorated.failfast = value
    failfast = property(_get_failfast, _set_failfast)

    def wasSuccessful(self):
        return self.decorated.wasSuccessful()

    @property
    def shouldStop(self):
        return self.decorated.shouldStop

    def stop(self):
        return self.decorated.stop()

    @property
    def testsRun(self):
        return self.decorated.testsRun

    def time(self, a_datetime):
        return self.decorated.time(a_datetime)
class HookedTestResultDecorator(TestResultDecorator):
    """A TestResult which calls a hook on every event.

    Subclasses provide _before_event(); it is invoked immediately before
    each forwarded result event (this is how AutoTimingTestResultDecorator
    injects its timestamps).
    """

    def __init__(self, decorated):
        # Cache the bound super proxy so every method can forward cheaply.
        self.super = super(HookedTestResultDecorator, self)
        self.super.__init__(decorated)

    def startTest(self, test):
        self._before_event()
        return self.super.startTest(test)

    def startTestRun(self):
        self._before_event()
        return self.super.startTestRun()

    def stopTest(self, test):
        self._before_event()
        return self.super.stopTest(test)

    def stopTestRun(self):
        self._before_event()
        return self.super.stopTestRun()

    def addError(self, test, err=None):
        self._before_event()
        return self.super.addError(test, err)

    def addFailure(self, test, err=None):
        self._before_event()
        return self.super.addFailure(test, err)

    def addSuccess(self, test):
        self._before_event()
        return self.super.addSuccess(test)

    def addSkip(self, test, reason=None):
        self._before_event()
        return self.super.addSkip(test, reason)

    def addExpectedFailure(self, test, err=None):
        self._before_event()
        return self.super.addExpectedFailure(test, err)

    def addUnexpectedSuccess(self, test):
        self._before_event()
        return self.super.addUnexpectedSuccess(test)

    def wasSuccessful(self):
        self._before_event()
        return self.super.wasSuccessful()

    @property
    def shouldStop(self):
        self._before_event()
        return self.super.shouldStop

    def stop(self):
        self._before_event()
        return self.super.stop()

    def time(self, a_datetime):
        self._before_event()
        return self.super.time(a_datetime)
class AutoTimingTestResultDecorator(HookedTestResultDecorator):
    """Decorate a TestResult to add time events to a test run.

    By default this will cause a time event before every test event,
    but if explicit time data is being provided by the test run, then
    this decorator will turn itself off to prevent causing confusion.
    """

    def __init__(self, decorated):
        # Last explicitly supplied timestamp; while it stays None we
        # generate timestamps automatically in _before_event().
        self._time = None
        super(AutoTimingTestResultDecorator, self).__init__(decorated)

    def _before_event(self):
        time = self._time
        if time is not None:
            # Explicit time data is being provided: emit nothing.
            return
        time = datetime.datetime.utcnow().replace(tzinfo=UTC())
        self.decorated.time(time)

    @property
    def shouldStop(self):
        # Plain pass-through: querying shouldStop is not a stream event,
        # so no automatic timestamp is emitted.
        return self.decorated.shouldStop

    def time(self, a_datetime):
        """Provide a timestamp for the current test activity.

        :param a_datetime: If None, automatically add timestamps before every
            event (this is the default behaviour if time() is not called at
            all). If not None, pass the provided time onto the decorated
            result object and disable automatic timestamps.
        """
        self._time = a_datetime
        return self.decorated.time(a_datetime)
class SubunitTestRunner(object):
    """A TestRunner that reports results as a subunit stream."""

    def __init__(self, verbosity=None, failfast=None, buffer=None, stream=None):
        """Create a SubunitTestRunner.

        :param verbosity: Ignored.
        :param failfast: Stop running tests at the first failure.
        :param buffer: Ignored.
        :param stream: Stream the subunit protocol is written to;
            defaults to sys.stdout.
        """
        self.failfast = failfast
        self.stream = stream or sys.stdout

    def run(self, test):
        "Run the given test case or test suite."
        result = TestProtocolClient(self.stream)
        result = AutoTimingTestResultDecorator(result)
        if self.failfast is not None:
            result.failfast = self.failfast
        test(result)
        return result
class TestProgram(object):
    """A command-line program that runs a set of tests; this is primarily
    for making test modules conveniently executable.
    """
    # Help template rendered by usageExit().
    USAGE = USAGE_AS_MAIN

    # defaults for testing
    failfast = catchbreak = buffer = progName = None

    def __init__(self, module=__name__, defaultTest=None, argv=None,
                 testRunner=None, testLoader=defaultTestLoader,
                 exit=True, verbosity=1, failfast=None, catchbreak=None,
                 buffer=None, stdout=None):
        # NOTE(review): several constructor lines are missing from this
        # copy of the file (the __name__-sentinel body, argv/stdout
        # defaulting, the parseArgs()/createTests() calls and the
        # load_list guard) - compare with upstream before relying on the
        # control flow shown here.
        if module == __name__:
        elif isinstance(module, str):
            self.module = __import__(module)
            for part in module.split('.')[1:]:
                self.module = getattr(self.module, part)
        self.failfast = failfast
        self.catchbreak = catchbreak
        self.verbosity = verbosity
        self.defaultTest = defaultTest
        self.listtests = False
        self.load_list = None
        self.testRunner = testRunner
        self.testLoader = testLoader
        # Detect 'python -m package.run' and use a nicer program name.
        if progName.endswith('%srun.py' % os.path.sep):
            elements = progName.split(os.path.sep)
            progName = '%s.run' % elements[-2]
        progName = os.path.basename(argv[0])
        self.progName = progName
        # TODO: preserve existing suites (like testresources does in
        # OptimisingTestSuite.add, but with a standard protocol).
        # This is needed because the load_tests hook allows arbitrary
        # suites, even if that is rarely used.
        source = open(self.load_list, 'rb')
        lines = source.readlines()
        # Test ids are stored one per line, UTF-8 encoded.
        test_ids = set(line.strip().decode('utf-8') for line in lines)
        filtered = unittest.TestSuite()
        for test in iterate_tests(self.test):
            if test.id() in test_ids:
                filtered.addTest(test)
        if not self.listtests:
        for test in iterate_tests(self.test):
            stdout.write('%s\n' % test.id())

    def parseArgs(self, argv):
        """Parse command line arguments, dispatching 'discover' first."""
        # NOTE(review): most option-handler bodies and the try/except
        # around getopt are not visible in this copy of the file.
        if len(argv) > 1 and argv[1].lower() == 'discover':
            self._do_discovery(argv[2:])
        long_opts = ['help', 'verbose', 'quiet', 'failfast', 'catch', 'buffer',
                     'list', 'load-list=']
        options, args = getopt.getopt(argv[1:], 'hHvqfcbl', long_opts)
        for opt, value in options:
            if opt in ('-h','-H','--help'):
            if opt in ('-q','--quiet'):
            if opt in ('-v','--verbose'):
            if opt in ('-f','--failfast'):
                if self.failfast is None:
                # Should this raise an exception if -f is not valid?
            if opt in ('-c','--catch'):
                if self.catchbreak is None:
                    self.catchbreak = True
                # Should this raise an exception if -c is not valid?
            if opt in ('-b','--buffer'):
                if self.buffer is None:
                # Should this raise an exception if -b is not valid?
            if opt in ('-l', '--list'):
                self.listtests = True
            if opt == '--load-list':
                self.load_list = value
        if len(args) == 0 and self.defaultTest is None:
            # createTests will load tests from self.module
            self.testNames = None
        self.testNames = args
        self.testNames = (self.defaultTest,)
        # getopt errors are reported via the usage text.
        self.usageExit(sys.exc_info()[1])

    def createTests(self):
        """Load self.test from self.module or from self.testNames."""
        # NOTE(review): the else branch and the continuation of the
        # loadTestsFromNames call are missing from this copy.
        if self.testNames is None:
            self.test = self.testLoader.loadTestsFromModule(self.module)
        self.test = self.testLoader.loadTestsFromNames(self.testNames,

    def _do_discovery(self, argv, Loader=defaultTestLoaderCls):
        # handle command line args for test discovery
        if not have_discover:
            raise AssertionError("Unable to use discovery, must use python 2.7 "
                                 "or greater, or install the discover package.")
        self.progName = '%s discover' % self.progName
        # NOTE(review): some add_option calls below are missing their
        # closing action= argument lines in this copy of the file.
        parser = optparse.OptionParser()
        parser.prog = self.progName
        parser.add_option('-v', '--verbose', dest='verbose', default=False,
                          help='Verbose output', action='store_true')
        if self.failfast != False:
            parser.add_option('-f', '--failfast', dest='failfast', default=False,
                              help='Stop on first fail or error',
        if self.catchbreak != False:
            parser.add_option('-c', '--catch', dest='catchbreak', default=False,
                              help='Catch ctrl-C and display results so far',
        if self.buffer != False:
            parser.add_option('-b', '--buffer', dest='buffer', default=False,
                              help='Buffer stdout and stderr during tests',
        parser.add_option('-s', '--start-directory', dest='start', default='.',
                          help="Directory to start discovery ('.' default)")
        parser.add_option('-p', '--pattern', dest='pattern', default='test*.py',
                          help="Pattern to match tests ('test*.py' default)")
        parser.add_option('-t', '--top-level-directory', dest='top', default=None,
                          help='Top level directory of project (defaults to start directory)')
        parser.add_option('-l', '--list', dest='listtests', default=False, action="store_true",
                          help='List tests rather than running them.')
        parser.add_option('--load-list', dest='load_list', default=None,
                          help='Specify a filename containing the test ids to use.')
        options, args = parser.parse_args(argv)
        # Positional arguments may override -s/-p/-t, in that order.
        for name, value in zip(('start', 'pattern', 'top'), args):
            setattr(options, name, value)
        # only set options from the parsing here
        # if they weren't set explicitly in the constructor
        if self.failfast is None:
            self.failfast = options.failfast
        if self.catchbreak is None:
            self.catchbreak = options.catchbreak
        if self.buffer is None:
            self.buffer = options.buffer
        self.listtests = options.listtests
        self.load_list = options.load_list
        start_dir = options.start
        pattern = options.pattern
        top_level_dir = options.top
        # See http://bugs.python.org/issue16709
        # While sorting here is intrusive, its better than being random.
        # Rules for the sort:
        # - standard suites are flattened, and the resulting tests sorted by
        #   test id.
        # - non-standard suites are preserved as-is, and sorted into position
        #   by the first test found by iterating the suite.
        # We do this by a DSU process: flatten and grab a key, sort, strip the
        # keys.
        loaded = loader.discover(start_dir, pattern, top_level_dir)
        self.test = sorted_tests(loaded)

    # NOTE(review): the 'def runTests(self):' header and the opening of
    # its catchbreak condition are missing from this copy; the following
    # lines are the remainder of that method.
            and getattr(unittest, 'installHandler', None) is not None):
            unittest.installHandler()
        self.result = self.testRunner.run(self.test)
        sys.exit(not self.result.wasSuccessful())

    def usageExit(self, msg=None):
        """Print the usage text (with available feature flags) and exit."""
        # Feature fragments are included only when the corresponding
        # option has not been explicitly disabled.
        usage = {'progName': self.progName, 'catchbreak': '', 'failfast': '',
                 'buffer': ''}
        if self.failfast != False:
            usage['failfast'] = FAILFAST
        if self.catchbreak != False:
            usage['catchbreak'] = CATCHBREAK
        if self.buffer != False:
            usage['buffer'] = BUFFEROUTPUT
        usage_text = self.USAGE % usage
        usage_lines = usage_text.split('\n')
        usage_lines.insert(2, "Run a test suite with a subunit reporter.")
        usage_lines.insert(3, "")
        print('\n'.join(usage_lines))
        # NOTE(review): upstream exits with status 2 here; that line is
        # not visible in this copy of the file.
# Script entry point: run the tests named on the command line under the
# subunit-reporting runner.  module=None makes TestProgram treat argv as
# the sole source of tests.
# NOTE(review): the call's remaining keyword arguments continue on a
# line that is not visible in this copy of the file.
if __name__ == '__main__':
    TestProgram(module=None, argv=sys.argv, testRunner=SubunitTestRunner(),