3 # Simple subunit testrunner for python
4 # Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2014
6 # Cobbled together from testtools and subunit:
7 # Copyright (C) 2005-2011 Robert Collins <robertc@robertcollins.net>
8 # Copyright (c) 2008-2011 testtools developers.
10 # Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
11 # license at the users choice. A copy of both licenses are available in the
12 # project source as Apache-2.0 and BSD. You may not use this file except in
13 # compliance with one of these two licences.
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
17 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
18 # license you chose for the specific language governing permissions and
19 # limitations under that license.
22 """Run a unittest testcase reporting results as Subunit.
24 $ python -m samba.subunit.run mylib.tests.test_suite
27 from iso8601.iso8601 import Utc
36 # Whether or not to hide layers of the stack trace that are
37 # unittest/testtools internal code. Defaults to True since the
38 # system-under-test is rarely unittest or testtools.
39 HIDE_INTERNAL_STACK = True
42 def write_traceback(stream, err, test):
43 """Converts a sys.exc_info()-style tuple of values into a string.
45 Copied from Python 2.7's unittest.TestResult._exc_info_to_string.
47 def _is_relevant_tb_level(tb):
48 return '__unittest' in tb.tb_frame.f_globals
50 def _count_relevant_tb_levels(tb):
52 while tb and not _is_relevant_tb_level(tb):
57 exctype, value, tb = err
58 # Skip test runner traceback levels
59 if HIDE_INTERNAL_STACK:
60 while tb and _is_relevant_tb_level(tb):
63 format_exception = traceback.format_exception
65 if (HIDE_INTERNAL_STACK and test.failureException
66 and isinstance(value, test.failureException)):
67 # Skip assert*() traceback levels
68 length = _count_relevant_tb_levels(tb)
69 msgLines = format_exception(exctype, value, tb, length)
71 msgLines = format_exception(exctype, value, tb)
72 stream.writelines(msgLines)
75 class TestProtocolClient(unittest.TestResult):
76 """A TestResult which generates a subunit stream for a test run.
78 # Get a TestSuite or TestCase to run
80 # Create a stream (any object with a 'write' method). This should accept
81 # bytes not strings: subunit is a byte orientated protocol.
82 stream = file('tests.log', 'wb')
83 # Create a subunit result object which will output to the stream
84 result = subunit.TestProtocolClient(stream)
85 # Optionally, to get timing data for performance analysis, wrap the
86 # serialiser with a timing decorator
87 result = subunit.test_results.AutoTimingTestResultDecorator(result)
88 # Run the test suite reporting to the subunit result object
94 def __init__(self, stream):
95 unittest.TestResult.__init__(self)
98 def addError(self, test, error=None):
99 """Report an error in test test.
101 :param error: Standard unittest positional argument form - an
104 self._addOutcome("error", test, error=error)
106 def addExpectedFailure(self, test, error=None):
107 """Report an expected failure in test test.
109 :param error: Standard unittest positional argument form - an
112 self._addOutcome("xfail", test, error=error)
114 def addFailure(self, test, error=None):
115 """Report a failure in test test.
117 :param error: Standard unittest positional argument form - an
120 self._addOutcome("failure", test, error=error)
122 def _addOutcome(self, outcome, test, error=None, error_permitted=True):
123 """Report a failure in test test.
125 :param outcome: A string describing the outcome - used as the
126 event name in the subunit stream.
127 :param error: Standard unittest positional argument form - an
129 :param error_permitted: If True then error must be supplied.
130 If False then error must not be supplied.
132 self._stream.write(("%s: " % outcome) + self._test_id(test))
137 if error is not None:
139 if error is not None:
140 self._stream.write(" [\n")
141 write_traceback(self._stream, error, test)
143 self._stream.write("\n")
144 if error is not None:
145 self._stream.write("]\n")
147 def addSkip(self, test, reason=None):
148 """Report a skipped test."""
150 self._addOutcome("skip", test, error=None)
152 self._stream.write("skip: %s [\n" % test.id())
153 self._stream.write("%s\n" % reason)
154 self._stream.write("]\n")
156 def addSuccess(self, test):
157 """Report a success in a test."""
158 self._addOutcome("successful", test, error_permitted=False)
160 def addUnexpectedSuccess(self, test):
161 """Report an unexpected success in test test.
163 self._addOutcome("uxsuccess", test, error_permitted=False)
165 def _test_id(self, test):
167 if type(result) is not bytes:
168 result = result.encode('utf8')
171 def startTest(self, test):
172 """Mark a test as starting its test run."""
173 super(TestProtocolClient, self).startTest(test)
174 self._stream.write("test: " + self._test_id(test) + "\n")
177 def stopTest(self, test):
178 super(TestProtocolClient, self).stopTest(test)
181 def time(self, a_datetime):
182 """Inform the client of the time.
184 ":param datetime: A datetime.datetime object.
186 time = a_datetime.astimezone(Utc())
187 self._stream.write("time: %04d-%02d-%02d %02d:%02d:%02d.%06dZ\n" % (
188 time.year, time.month, time.day, time.hour, time.minute,
189 time.second, time.microsecond))
192 def _flatten_tests(suite_or_case, unpack_outer=False):
194 tests = iter(suite_or_case)
196 # Not iterable, assume it's a test case.
197 return [(suite_or_case.id(), suite_or_case)]
198 if (type(suite_or_case) in (unittest.TestSuite,) or
200 # Plain old test suite (or any others we may add).
203 # Recurse to flatten.
204 result.extend(_flatten_tests(test))
207 # Find any old actual test and grab its id.
209 tests = iterate_tests(suite_or_case)
213 # If it has a sort_tests method, call that.
214 if getattr(suite_or_case, 'sort_tests', None) is not None:
215 suite_or_case.sort_tests()
216 return [(suite_id, suite_or_case)]
219 def sorted_tests(suite_or_case, unpack_outer=False):
220 """Sort suite_or_case while preserving non-vanilla TestSuites."""
221 tests = _flatten_tests(suite_or_case, unpack_outer=unpack_outer)
223 return unittest.TestSuite([test for (sort_key, test) in tests])
226 def iterate_tests(test_suite_or_case):
227 """Iterate through all of the test cases in 'test_suite_or_case'."""
229 suite = iter(test_suite_or_case)
231 yield test_suite_or_case
234 for subtest in iterate_tests(test):
238 defaultTestLoader = unittest.defaultTestLoader
239 defaultTestLoaderCls = unittest.TestLoader
241 if getattr(defaultTestLoader, 'discover', None) is None:
244 defaultTestLoader = discover.DiscoveringTestLoader()
245 defaultTestLoaderCls = discover.DiscoveringTestLoader
248 have_discover = False
254 # Taken from python 2.7 and slightly modified for compatibility with
255 # older versions. Delete when 2.7 is the oldest supported version.
257 # - Use have_discover to raise an error if the user tries to use
258 # discovery on an old version and doesn't have discover installed.
259 # - If --catch is given check that installHandler is available, as
260 # it won't be on old python versions.
261 # - print calls have been been made single-source python3 compatibile.
262 # - exception handling likewise.
263 # - The default help has been changed to USAGE_AS_MAIN and USAGE_FROM_MODULE
265 # - A tweak has been added to detect 'python -m *.run' and use a
266 # better progName in that case.
267 # - self.module is more comprehensively set to None when being invoked from
268 # the commandline - __name__ is used as a sentinel value.
269 # - --list has been added which can list tests (should be upstreamed).
270 # - --load-list has been added which can reduce the tests used (should be
272 # - The limitation of using getopt is declared to the user.
273 # - http://bugs.python.org/issue16709 is worked around, by sorting tests when
276 CATCHBREAK = " -c, --catch Catch control-C and display results\n"
277 BUFFEROUTPUT = " -b, --buffer Buffer stdout and stderr during test runs\n"
280 Usage: %(progName)s [options] [tests]
283 -h, --help Show this message
284 -v, --verbose Verbose output
285 -q, --quiet Minimal output
286 -l, --list List tests rather than executing them.
287 --load-list Specifies a file containing test ids, only tests matching
288 those ids are executed.
289 %(catchbreak)s%(buffer)s
291 %(progName)s test_module - run tests from test_module
292 %(progName)s module.TestClass - run tests from module.TestClass
293 %(progName)s module.Class.test_method - run specified test method
295 All options must come before [tests]. [tests] can be a list of any number of
296 test modules, classes and test methods.
298 Alternative Usage: %(progName)s discover [options]
301 -v, --verbose Verbose output
302 s%(catchbreak)s%(buffer)s -s directory Directory to start discovery ('.' default)
303 -p pattern Pattern to match test files ('test*.py' default)
304 -t directory Top level directory of project (default to
306 -l, --list List tests rather than executing them.
307 --load-list Specifies a file containing test ids, only tests matching
308 those ids are executed.
310 For test discovery all test modules must be importable from the top
311 level directory of the project.
315 # NOT a TestResult, because we are implementing the interface, not inheriting
317 class TestResultDecorator(object):
318 """General pass-through decorator.
320 This provides a base that other TestResults can inherit from to
321 gain basic forwarding functionality. It also takes care of
322 handling the case where the target doesn't support newer methods
323 or features by degrading them.
326 def __init__(self, decorated):
327 """Create a TestResultDecorator forwarding to decorated."""
328 # Make every decorator degrade gracefully.
329 self.decorated = decorated
331 def startTest(self, test):
332 return self.decorated.startTest(test)
334 def startTestRun(self):
335 return self.decorated.startTestRun()
337 def stopTest(self, test):
338 return self.decorated.stopTest(test)
340 def stopTestRun(self):
341 return self.decorated.stopTestRun()
343 def addError(self, test, err=None):
344 return self.decorated.addError(test, err)
346 def addFailure(self, test, err=None):
347 return self.decorated.addFailure(test, err)
349 def addSuccess(self, test):
350 return self.decorated.addSuccess(test)
352 def addSkip(self, test, reason=None):
353 return self.decorated.addSkip(test, reason)
355 def addExpectedFailure(self, test, err=None):
356 return self.decorated.addExpectedFailure(test, err)
358 def addUnexpectedSuccess(self, test):
359 return self.decorated.addUnexpectedSuccess(test)
361 def wasSuccessful(self):
362 return self.decorated.wasSuccessful()
365 def shouldStop(self):
366 return self.decorated.shouldStop
369 return self.decorated.stop()
373 return self.decorated.testsRun
375 def time(self, a_datetime):
376 return self.decorated.time(a_datetime)
379 class HookedTestResultDecorator(TestResultDecorator):
380 """A TestResult which calls a hook on every event."""
382 def __init__(self, decorated):
383 self.super = super(HookedTestResultDecorator, self)
384 self.super.__init__(decorated)
386 def startTest(self, test):
388 return self.super.startTest(test)
390 def startTestRun(self):
392 return self.super.startTestRun()
394 def stopTest(self, test):
396 return self.super.stopTest(test)
398 def stopTestRun(self):
400 return self.super.stopTestRun()
402 def addError(self, test, err=None):
404 return self.super.addError(test, err)
406 def addFailure(self, test, err=None):
408 return self.super.addFailure(test, err)
410 def addSuccess(self, test):
412 return self.super.addSuccess(test)
414 def addSkip(self, test, reason=None):
416 return self.super.addSkip(test, reason)
418 def addExpectedFailure(self, test, err=None):
420 return self.super.addExpectedFailure(test, err)
422 def addUnexpectedSuccess(self, test):
424 return self.super.addUnexpectedSuccess(test)
426 def wasSuccessful(self):
428 return self.super.wasSuccessful()
431 def shouldStop(self):
433 return self.super.shouldStop
437 return self.super.stop()
439 def time(self, a_datetime):
441 return self.super.time(a_datetime)
444 class AutoTimingTestResultDecorator(HookedTestResultDecorator):
445 """Decorate a TestResult to add time events to a test run.
447 By default this will cause a time event before every test event,
448 but if explicit time data is being provided by the test run, then
449 this decorator will turn itself off to prevent causing confusion.
452 def __init__(self, decorated):
454 super(AutoTimingTestResultDecorator, self).__init__(decorated)
456 def _before_event(self):
460 time = datetime.datetime.utcnow().replace(tzinfo=Utc())
461 self.decorated.time(time)
464 def shouldStop(self):
465 return self.decorated.shouldStop
467 def time(self, a_datetime):
468 """Provide a timestamp for the current test activity.
470 :param a_datetime: If None, automatically add timestamps before every
471 event (this is the default behaviour if time() is not called at
472 all). If not None, pass the provided time onto the decorated
473 result object and disable automatic timestamps.
475 self._time = a_datetime
476 return self.decorated.time(a_datetime)
479 class SubunitTestRunner(object):
481 def __init__(self, verbosity=None, buffer=None, stream=None):
482 """Create a SubunitTestRunner.
484 :param verbosity: Ignored.
485 :param buffer: Ignored.
487 self.stream = stream or sys.stdout
490 "Run the given test case or test suite."
491 result = TestProtocolClient(self.stream)
492 result = AutoTimingTestResultDecorator(result)
497 class TestProgram(object):
498 """A command-line program that runs a set of tests; this is primarily
499 for making test modules conveniently executable.
501 USAGE = USAGE_AS_MAIN
503 # defaults for testing
504 catchbreak = buffer = progName = None
506 def __init__(self, module=__name__, defaultTest=None, argv=None,
507 testRunner=None, testLoader=defaultTestLoader,
508 exit=True, verbosity=1, catchbreak=None,
509 buffer=None, stdout=None):
510 if module == __name__:
512 elif isinstance(module, str):
513 self.module = __import__(module)
514 for part in module.split('.')[1:]:
515 self.module = getattr(self.module, part)
522 if testRunner is None:
523 testRunner = SubunitTestRunner()
526 self.catchbreak = catchbreak
527 self.verbosity = verbosity
529 self.defaultTest = defaultTest
530 self.listtests = False
531 self.load_list = None
532 self.testRunner = testRunner
533 self.testLoader = testLoader
535 if progName.endswith('%srun.py' % os.path.sep):
536 elements = progName.split(os.path.sep)
537 progName = '%s.run' % elements[-2]
539 progName = os.path.basename(argv[0])
540 self.progName = progName
543 # TODO: preserve existing suites (like testresources does in
544 # OptimisingTestSuite.add, but with a standard protocol).
545 # This is needed because the load_tests hook allows arbitrary
546 # suites, even if that is rarely used.
547 source = open(self.load_list, 'rb')
549 lines = source.readlines()
552 test_ids = set(line.strip().decode('utf-8') for line in lines)
553 filtered = unittest.TestSuite()
554 for test in iterate_tests(self.test):
555 if test.id() in test_ids:
556 filtered.addTest(test)
558 if not self.listtests:
561 for test in iterate_tests(self.test):
562 stdout.write('%s\n' % test.id())
564 def parseArgs(self, argv):
565 if len(argv) > 1 and argv[1].lower() == 'discover':
566 self._do_discovery(argv[2:])
570 long_opts = ['help', 'verbose', 'quiet', 'catch', 'buffer',
571 'list', 'load-list=']
573 options, args = getopt.getopt(argv[1:], 'hHvqfcbl', long_opts)
574 for opt, value in options:
575 if opt in ('-h','-H','--help'):
577 if opt in ('-q','--quiet'):
579 if opt in ('-v','--verbose'):
581 if opt in ('-c','--catch'):
582 if self.catchbreak is None:
583 self.catchbreak = True
584 # Should this raise an exception if -c is not valid?
585 if opt in ('-b','--buffer'):
586 if self.buffer is None:
588 # Should this raise an exception if -b is not valid?
589 if opt in ('-l', '--list'):
590 self.listtests = True
591 if opt == '--load-list':
592 self.load_list = value
593 if len(args) == 0 and self.defaultTest is None:
594 # createTests will load tests from self.module
595 self.testNames = None
597 self.testNames = args
599 self.testNames = (self.defaultTest,)
602 self.usageExit(sys.exc_info()[1])
604 def createTests(self):
605 if self.testNames is None:
606 self.test = self.testLoader.loadTestsFromModule(self.module)
608 self.test = self.testLoader.loadTestsFromNames(self.testNames,
611 def _do_discovery(self, argv, Loader=defaultTestLoaderCls):
612 # handle command line args for test discovery
613 if not have_discover:
614 raise AssertionError("Unable to use discovery, must use python 2.7 "
615 "or greater, or install the discover package.")
616 self.progName = '%s discover' % self.progName
618 parser = optparse.OptionParser()
619 parser.prog = self.progName
620 parser.add_option('-v', '--verbose', dest='verbose', default=False,
621 help='Verbose output', action='store_true')
622 if self.catchbreak != False:
623 parser.add_option('-c', '--catch', dest='catchbreak', default=False,
624 help='Catch ctrl-C and display results so far',
626 if self.buffer != False:
627 parser.add_option('-b', '--buffer', dest='buffer', default=False,
628 help='Buffer stdout and stderr during tests',
630 parser.add_option('-s', '--start-directory', dest='start', default='.',
631 help="Directory to start discovery ('.' default)")
632 parser.add_option('-p', '--pattern', dest='pattern', default='test*.py',
633 help="Pattern to match tests ('test*.py' default)")
634 parser.add_option('-t', '--top-level-directory', dest='top', default=None,
635 help='Top level directory of project (defaults to start directory)')
636 parser.add_option('-l', '--list', dest='listtests', default=False, action="store_true",
637 help='List tests rather than running them.')
638 parser.add_option('--load-list', dest='load_list', default=None,
639 help='Specify a filename containing the test ids to use.')
641 options, args = parser.parse_args(argv)
645 for name, value in zip(('start', 'pattern', 'top'), args):
646 setattr(options, name, value)
648 # only set options from the parsing here
649 # if they weren't set explicitly in the constructor
650 if self.catchbreak is None:
651 self.catchbreak = options.catchbreak
652 if self.buffer is None:
653 self.buffer = options.buffer
654 self.listtests = options.listtests
655 self.load_list = options.load_list
660 start_dir = options.start
661 pattern = options.pattern
662 top_level_dir = options.top
665 # See http://bugs.python.org/issue16709
666 # While sorting here is intrusive, its better than being random.
667 # Rules for the sort:
668 # - standard suites are flattened, and the resulting tests sorted by
670 # - non-standard suites are preserved as-is, and sorted into position
671 # by the first test found by iterating the suite.
672 # We do this by a DSU process: flatten and grab a key, sort, strip the
674 loaded = loader.discover(start_dir, pattern, top_level_dir)
675 self.test = sorted_tests(loaded)
679 and getattr(unittest, 'installHandler', None) is not None):
680 unittest.installHandler()
681 self.result = self.testRunner.run(self.test)
683 sys.exit(not self.result.wasSuccessful())
685 def usageExit(self, msg=None):
688 usage = {'progName': self.progName, 'catchbreak': '',
690 if self.catchbreak != False:
691 usage['catchbreak'] = CATCHBREAK
692 if self.buffer != False:
693 usage['buffer'] = BUFFEROUTPUT
694 usage_text = self.USAGE % usage
695 usage_lines = usage_text.split('\n')
696 usage_lines.insert(2, "Run a test suite with a subunit reporter.")
697 usage_lines.insert(3, "")
698 print('\n'.join(usage_lines))
702 if __name__ == '__main__':
703 TestProgram(module=None, argv=sys.argv, stdout=sys.stdout)