Drop support for failfast mode, rather than adding support for it everywhere.
[obnox/samba/samba-obnox.git] / python / samba / subunit / run.py
1 #!/usr/bin/python
2 #
3 # Simple subunit testrunner for python
4 # Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2014
5
6 # Cobbled together from testtools and subunit:
7 # Copyright (C) 2005-2011 Robert Collins <robertc@robertcollins.net>
8 # Copyright (c) 2008-2011 testtools developers.
9 #
10 #  Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
11 #  license at the users choice. A copy of both licenses are available in the
12 #  project source as Apache-2.0 and BSD. You may not use this file except in
13 #  compliance with one of these two licences.
14 #
15 #  Unless required by applicable law or agreed to in writing, software
16 #  distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
17 #  WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
18 #  license you chose for the specific language governing permissions and
19 #  limitations under that license.
20 #
21
22 """Run a unittest testcase reporting results as Subunit.
23
24   $ python -m samba.subunit.run mylib.tests.test_suite
25 """
26
27 from iso8601.iso8601 import Utc
28
29 import datetime
30 import os
31 import sys
32 import traceback
33 import unittest
34
35
36 # Whether or not to hide layers of the stack trace that are
37 # unittest/testtools internal code.  Defaults to True since the
38 # system-under-test is rarely unittest or testtools.
39 HIDE_INTERNAL_STACK = True
40
41
42 def write_traceback(stream, err, test):
43     """Converts a sys.exc_info()-style tuple of values into a string.
44
45     Copied from Python 2.7's unittest.TestResult._exc_info_to_string.
46     """
47     def _is_relevant_tb_level(tb):
48         return '__unittest' in tb.tb_frame.f_globals
49
50     def _count_relevant_tb_levels(tb):
51         length = 0
52         while tb and not _is_relevant_tb_level(tb):
53             length += 1
54             tb = tb.tb_next
55         return length
56
57     exctype, value, tb = err
58     # Skip test runner traceback levels
59     if HIDE_INTERNAL_STACK:
60         while tb and _is_relevant_tb_level(tb):
61             tb = tb.tb_next
62
63     format_exception = traceback.format_exception
64
65     if (HIDE_INTERNAL_STACK and test.failureException
66         and isinstance(value, test.failureException)):
67         # Skip assert*() traceback levels
68         length = _count_relevant_tb_levels(tb)
69         msgLines = format_exception(exctype, value, tb, length)
70     else:
71         msgLines = format_exception(exctype, value, tb)
72     stream.writelines(msgLines)
73
74
75 class TestProtocolClient(unittest.TestResult):
76     """A TestResult which generates a subunit stream for a test run.
77
78     # Get a TestSuite or TestCase to run
79     suite = make_suite()
80     # Create a stream (any object with a 'write' method). This should accept
81     # bytes not strings: subunit is a byte orientated protocol.
82     stream = file('tests.log', 'wb')
83     # Create a subunit result object which will output to the stream
84     result = subunit.TestProtocolClient(stream)
85     # Optionally, to get timing data for performance analysis, wrap the
86     # serialiser with a timing decorator
87     result = subunit.test_results.AutoTimingTestResultDecorator(result)
88     # Run the test suite reporting to the subunit result object
89     suite.run(result)
90     # Close the stream.
91     stream.close()
92     """
93
94     def __init__(self, stream):
95         unittest.TestResult.__init__(self)
96         self._stream = stream
97
98     def addError(self, test, error=None):
99         """Report an error in test test.
100
101         :param error: Standard unittest positional argument form - an
102             exc_info tuple.
103         """
104         self._addOutcome("error", test, error=error)
105
106     def addExpectedFailure(self, test, error=None):
107         """Report an expected failure in test test.
108
109         :param error: Standard unittest positional argument form - an
110             exc_info tuple.
111         """
112         self._addOutcome("xfail", test, error=error)
113
114     def addFailure(self, test, error=None):
115         """Report a failure in test test.
116
117         :param error: Standard unittest positional argument form - an
118             exc_info tuple.
119         """
120         self._addOutcome("failure", test, error=error)
121
122     def _addOutcome(self, outcome, test, error=None, error_permitted=True):
123         """Report a failure in test test.
124
125         :param outcome: A string describing the outcome - used as the
126             event name in the subunit stream.
127         :param error: Standard unittest positional argument form - an
128             exc_info tuple.
129         :param error_permitted: If True then error must be supplied.
130             If False then error must not be supplied.
131         """
132         self._stream.write(("%s: " % outcome) + self._test_id(test))
133         if error_permitted:
134             if error is None:
135                 raise ValueError
136         else:
137             if error is not None:
138                 raise ValueError
139         if error is not None:
140             self._stream.write(" [\n")
141             write_traceback(self._stream, error, test)
142         else:
143             self._stream.write("\n")
144         if error is not None:
145             self._stream.write("]\n")
146
147     def addSkip(self, test, reason=None):
148         """Report a skipped test."""
149         if reason is None:
150             self._addOutcome("skip", test, error=None)
151         else:
152             self._stream.write("skip: %s [\n" % test.id())
153             self._stream.write("%s\n" % reason)
154             self._stream.write("]\n")
155
156     def addSuccess(self, test):
157         """Report a success in a test."""
158         self._addOutcome("successful", test, error_permitted=False)
159
160     def addUnexpectedSuccess(self, test):
161         """Report an unexpected success in test test.
162         """
163         self._addOutcome("uxsuccess", test, error_permitted=False)
164
165     def _test_id(self, test):
166         result = test.id()
167         if type(result) is not bytes:
168             result = result.encode('utf8')
169         return result
170
171     def startTest(self, test):
172         """Mark a test as starting its test run."""
173         super(TestProtocolClient, self).startTest(test)
174         self._stream.write("test: " + self._test_id(test) + "\n")
175         self._stream.flush()
176
177     def stopTest(self, test):
178         super(TestProtocolClient, self).stopTest(test)
179         self._stream.flush()
180
181     def time(self, a_datetime):
182         """Inform the client of the time.
183
184         ":param datetime: A datetime.datetime object.
185         """
186         time = a_datetime.astimezone(Utc())
187         self._stream.write("time: %04d-%02d-%02d %02d:%02d:%02d.%06dZ\n" % (
188             time.year, time.month, time.day, time.hour, time.minute,
189             time.second, time.microsecond))
190
191
192 def _flatten_tests(suite_or_case, unpack_outer=False):
193     try:
194         tests = iter(suite_or_case)
195     except TypeError:
196         # Not iterable, assume it's a test case.
197         return [(suite_or_case.id(), suite_or_case)]
198     if (type(suite_or_case) in (unittest.TestSuite,) or
199         unpack_outer):
200         # Plain old test suite (or any others we may add).
201         result = []
202         for test in tests:
203             # Recurse to flatten.
204             result.extend(_flatten_tests(test))
205         return result
206     else:
207         # Find any old actual test and grab its id.
208         suite_id = None
209         tests = iterate_tests(suite_or_case)
210         for test in tests:
211             suite_id = test.id()
212             break
213         # If it has a sort_tests method, call that.
214         if getattr(suite_or_case, 'sort_tests', None) is not None:
215             suite_or_case.sort_tests()
216         return [(suite_id, suite_or_case)]
217
218
219 def sorted_tests(suite_or_case, unpack_outer=False):
220     """Sort suite_or_case while preserving non-vanilla TestSuites."""
221     tests = _flatten_tests(suite_or_case, unpack_outer=unpack_outer)
222     tests.sort()
223     return unittest.TestSuite([test for (sort_key, test) in tests])
224
225
226 def iterate_tests(test_suite_or_case):
227     """Iterate through all of the test cases in 'test_suite_or_case'."""
228     try:
229         suite = iter(test_suite_or_case)
230     except TypeError:
231         yield test_suite_or_case
232     else:
233         for test in suite:
234             for subtest in iterate_tests(test):
235                 yield subtest
236
237
238 defaultTestLoader = unittest.defaultTestLoader
239 defaultTestLoaderCls = unittest.TestLoader
240
241 if getattr(defaultTestLoader, 'discover', None) is None:
242     try:
243         import discover
244         defaultTestLoader = discover.DiscoveringTestLoader()
245         defaultTestLoaderCls = discover.DiscoveringTestLoader
246         have_discover = True
247     except ImportError:
248         have_discover = False
249 else:
250     have_discover = True
251
252
253 ####################
254 # Taken from python 2.7 and slightly modified for compatibility with
255 # older versions. Delete when 2.7 is the oldest supported version.
256 # Modifications:
257 #  - Use have_discover to raise an error if the user tries to use
258 #    discovery on an old version and doesn't have discover installed.
259 #  - If --catch is given check that installHandler is available, as
260 #    it won't be on old python versions.
261 #  - print calls have been been made single-source python3 compatibile.
262 #  - exception handling likewise.
263 #  - The default help has been changed to USAGE_AS_MAIN and USAGE_FROM_MODULE
264 #    removed.
265 #  - A tweak has been added to detect 'python -m *.run' and use a
266 #    better progName in that case.
267 #  - self.module is more comprehensively set to None when being invoked from
268 #    the commandline - __name__ is used as a sentinel value.
269 #  - --list has been added which can list tests (should be upstreamed).
270 #  - --load-list has been added which can reduce the tests used (should be
271 #    upstreamed).
272 #  - The limitation of using getopt is declared to the user.
273 #  - http://bugs.python.org/issue16709 is worked around, by sorting tests when
274 #    discover is used.
275
276 CATCHBREAK   = "  -c, --catch      Catch control-C and display results\n"
277 BUFFEROUTPUT = "  -b, --buffer     Buffer stdout and stderr during test runs\n"
278
279 USAGE_AS_MAIN = """\
280 Usage: %(progName)s [options] [tests]
281
282 Options:
283   -h, --help       Show this message
284   -v, --verbose    Verbose output
285   -q, --quiet      Minimal output
286   -l, --list       List tests rather than executing them.
287   --load-list      Specifies a file containing test ids, only tests matching
288                    those ids are executed.
289 %(catchbreak)s%(buffer)s
290 Examples:
291   %(progName)s test_module               - run tests from test_module
292   %(progName)s module.TestClass          - run tests from module.TestClass
293   %(progName)s module.Class.test_method  - run specified test method
294
295 All options must come before [tests].  [tests] can be a list of any number of
296 test modules, classes and test methods.
297
298 Alternative Usage: %(progName)s discover [options]
299
300 Options:
301   -v, --verbose    Verbose output
302 s%(catchbreak)s%(buffer)s  -s directory     Directory to start discovery ('.' default)
303   -p pattern       Pattern to match test files ('test*.py' default)
304   -t directory     Top level directory of project (default to
305                    start directory)
306   -l, --list       List tests rather than executing them.
307   --load-list      Specifies a file containing test ids, only tests matching
308                    those ids are executed.
309
310 For test discovery all test modules must be importable from the top
311 level directory of the project.
312 """
313
314
315 # NOT a TestResult, because we are implementing the interface, not inheriting
316 # it.
317 class TestResultDecorator(object):
318     """General pass-through decorator.
319
320     This provides a base that other TestResults can inherit from to
321     gain basic forwarding functionality. It also takes care of
322     handling the case where the target doesn't support newer methods
323     or features by degrading them.
324     """
325
326     def __init__(self, decorated):
327         """Create a TestResultDecorator forwarding to decorated."""
328         # Make every decorator degrade gracefully.
329         self.decorated = decorated
330
331     def startTest(self, test):
332         return self.decorated.startTest(test)
333
334     def startTestRun(self):
335         return self.decorated.startTestRun()
336
337     def stopTest(self, test):
338         return self.decorated.stopTest(test)
339
340     def stopTestRun(self):
341         return self.decorated.stopTestRun()
342
343     def addError(self, test, err=None):
344         return self.decorated.addError(test, err)
345
346     def addFailure(self, test, err=None):
347         return self.decorated.addFailure(test, err)
348
349     def addSuccess(self, test):
350         return self.decorated.addSuccess(test)
351
352     def addSkip(self, test, reason=None):
353         return self.decorated.addSkip(test, reason)
354
355     def addExpectedFailure(self, test, err=None):
356         return self.decorated.addExpectedFailure(test, err)
357
358     def addUnexpectedSuccess(self, test):
359         return self.decorated.addUnexpectedSuccess(test)
360
361     def wasSuccessful(self):
362         return self.decorated.wasSuccessful()
363
364     @property
365     def shouldStop(self):
366         return self.decorated.shouldStop
367
368     def stop(self):
369         return self.decorated.stop()
370
371     @property
372     def testsRun(self):
373         return self.decorated.testsRun
374
375     def time(self, a_datetime):
376         return self.decorated.time(a_datetime)
377
378
379 class HookedTestResultDecorator(TestResultDecorator):
380     """A TestResult which calls a hook on every event."""
381
382     def __init__(self, decorated):
383         self.super = super(HookedTestResultDecorator, self)
384         self.super.__init__(decorated)
385
386     def startTest(self, test):
387         self._before_event()
388         return self.super.startTest(test)
389
390     def startTestRun(self):
391         self._before_event()
392         return self.super.startTestRun()
393
394     def stopTest(self, test):
395         self._before_event()
396         return self.super.stopTest(test)
397
398     def stopTestRun(self):
399         self._before_event()
400         return self.super.stopTestRun()
401
402     def addError(self, test, err=None):
403         self._before_event()
404         return self.super.addError(test, err)
405
406     def addFailure(self, test, err=None):
407         self._before_event()
408         return self.super.addFailure(test, err)
409
410     def addSuccess(self, test):
411         self._before_event()
412         return self.super.addSuccess(test)
413
414     def addSkip(self, test, reason=None):
415         self._before_event()
416         return self.super.addSkip(test, reason)
417
418     def addExpectedFailure(self, test, err=None):
419         self._before_event()
420         return self.super.addExpectedFailure(test, err)
421
422     def addUnexpectedSuccess(self, test):
423         self._before_event()
424         return self.super.addUnexpectedSuccess(test)
425
426     def wasSuccessful(self):
427         self._before_event()
428         return self.super.wasSuccessful()
429
430     @property
431     def shouldStop(self):
432         self._before_event()
433         return self.super.shouldStop
434
435     def stop(self):
436         self._before_event()
437         return self.super.stop()
438
439     def time(self, a_datetime):
440         self._before_event()
441         return self.super.time(a_datetime)
442
443
444 class AutoTimingTestResultDecorator(HookedTestResultDecorator):
445     """Decorate a TestResult to add time events to a test run.
446
447     By default this will cause a time event before every test event,
448     but if explicit time data is being provided by the test run, then
449     this decorator will turn itself off to prevent causing confusion.
450     """
451
452     def __init__(self, decorated):
453         self._time = None
454         super(AutoTimingTestResultDecorator, self).__init__(decorated)
455
456     def _before_event(self):
457         time = self._time
458         if time is not None:
459             return
460         time = datetime.datetime.utcnow().replace(tzinfo=Utc())
461         self.decorated.time(time)
462
463     @property
464     def shouldStop(self):
465         return self.decorated.shouldStop
466
467     def time(self, a_datetime):
468         """Provide a timestamp for the current test activity.
469
470         :param a_datetime: If None, automatically add timestamps before every
471             event (this is the default behaviour if time() is not called at
472             all).  If not None, pass the provided time onto the decorated
473             result object and disable automatic timestamps.
474         """
475         self._time = a_datetime
476         return self.decorated.time(a_datetime)
477
478
479 class SubunitTestRunner(object):
480
481     def __init__(self, verbosity=None, buffer=None, stream=None):
482         """Create a SubunitTestRunner.
483
484         :param verbosity: Ignored.
485         :param buffer: Ignored.
486         """
487         self.stream = stream or sys.stdout
488
489     def run(self, test):
490         "Run the given test case or test suite."
491         result = TestProtocolClient(self.stream)
492         result = AutoTimingTestResultDecorator(result)
493         test(result)
494         return result
495
496
497 class TestProgram(object):
498     """A command-line program that runs a set of tests; this is primarily
499        for making test modules conveniently executable.
500     """
501     USAGE = USAGE_AS_MAIN
502
503     # defaults for testing
504     catchbreak = buffer = progName = None
505
506     def __init__(self, module=__name__, defaultTest=None, argv=None,
507                     testRunner=None, testLoader=defaultTestLoader,
508                     exit=True, verbosity=1, catchbreak=None,
509                     buffer=None, stdout=None):
510         if module == __name__:
511             self.module = None
512         elif isinstance(module, str):
513             self.module = __import__(module)
514             for part in module.split('.')[1:]:
515                 self.module = getattr(self.module, part)
516         else:
517             self.module = module
518         if argv is None:
519             argv = sys.argv
520         if stdout is None:
521             stdout = sys.stdout
522         if testRunner is None:
523             testRunner = SubunitTestRunner()
524
525         self.exit = exit
526         self.catchbreak = catchbreak
527         self.verbosity = verbosity
528         self.buffer = buffer
529         self.defaultTest = defaultTest
530         self.listtests = False
531         self.load_list = None
532         self.testRunner = testRunner
533         self.testLoader = testLoader
534         progName = argv[0]
535         if progName.endswith('%srun.py' % os.path.sep):
536             elements = progName.split(os.path.sep)
537             progName = '%s.run' % elements[-2]
538         else:
539             progName = os.path.basename(argv[0])
540         self.progName = progName
541         self.parseArgs(argv)
542         if self.load_list:
543             # TODO: preserve existing suites (like testresources does in
544             # OptimisingTestSuite.add, but with a standard protocol).
545             # This is needed because the load_tests hook allows arbitrary
546             # suites, even if that is rarely used.
547             source = open(self.load_list, 'rb')
548             try:
549                 lines = source.readlines()
550             finally:
551                 source.close()
552             test_ids = set(line.strip().decode('utf-8') for line in lines)
553             filtered = unittest.TestSuite()
554             for test in iterate_tests(self.test):
555                 if test.id() in test_ids:
556                     filtered.addTest(test)
557             self.test = filtered
558         if not self.listtests:
559             self.runTests()
560         else:
561             for test in iterate_tests(self.test):
562                 stdout.write('%s\n' % test.id())
563
564     def parseArgs(self, argv):
565         if len(argv) > 1 and argv[1].lower() == 'discover':
566             self._do_discovery(argv[2:])
567             return
568
569         import getopt
570         long_opts = ['help', 'verbose', 'quiet', 'catch', 'buffer',
571             'list', 'load-list=']
572         try:
573             options, args = getopt.getopt(argv[1:], 'hHvqfcbl', long_opts)
574             for opt, value in options:
575                 if opt in ('-h','-H','--help'):
576                     self.usageExit()
577                 if opt in ('-q','--quiet'):
578                     self.verbosity = 0
579                 if opt in ('-v','--verbose'):
580                     self.verbosity = 2
581                 if opt in ('-c','--catch'):
582                     if self.catchbreak is None:
583                         self.catchbreak = True
584                     # Should this raise an exception if -c is not valid?
585                 if opt in ('-b','--buffer'):
586                     if self.buffer is None:
587                         self.buffer = True
588                     # Should this raise an exception if -b is not valid?
589                 if opt in ('-l', '--list'):
590                     self.listtests = True
591                 if opt == '--load-list':
592                     self.load_list = value
593             if len(args) == 0 and self.defaultTest is None:
594                 # createTests will load tests from self.module
595                 self.testNames = None
596             elif len(args) > 0:
597                 self.testNames = args
598             else:
599                 self.testNames = (self.defaultTest,)
600             self.createTests()
601         except getopt.error:
602             self.usageExit(sys.exc_info()[1])
603
604     def createTests(self):
605         if self.testNames is None:
606             self.test = self.testLoader.loadTestsFromModule(self.module)
607         else:
608             self.test = self.testLoader.loadTestsFromNames(self.testNames,
609                                                            self.module)
610
611     def _do_discovery(self, argv, Loader=defaultTestLoaderCls):
612         # handle command line args for test discovery
613         if not have_discover:
614             raise AssertionError("Unable to use discovery, must use python 2.7 "
615                     "or greater, or install the discover package.")
616         self.progName = '%s discover' % self.progName
617         import optparse
618         parser = optparse.OptionParser()
619         parser.prog = self.progName
620         parser.add_option('-v', '--verbose', dest='verbose', default=False,
621                           help='Verbose output', action='store_true')
622         if self.catchbreak != False:
623             parser.add_option('-c', '--catch', dest='catchbreak', default=False,
624                               help='Catch ctrl-C and display results so far',
625                               action='store_true')
626         if self.buffer != False:
627             parser.add_option('-b', '--buffer', dest='buffer', default=False,
628                               help='Buffer stdout and stderr during tests',
629                               action='store_true')
630         parser.add_option('-s', '--start-directory', dest='start', default='.',
631                           help="Directory to start discovery ('.' default)")
632         parser.add_option('-p', '--pattern', dest='pattern', default='test*.py',
633                           help="Pattern to match tests ('test*.py' default)")
634         parser.add_option('-t', '--top-level-directory', dest='top', default=None,
635                           help='Top level directory of project (defaults to start directory)')
636         parser.add_option('-l', '--list', dest='listtests', default=False, action="store_true",
637                           help='List tests rather than running them.')
638         parser.add_option('--load-list', dest='load_list', default=None,
639                           help='Specify a filename containing the test ids to use.')
640
641         options, args = parser.parse_args(argv)
642         if len(args) > 3:
643             self.usageExit()
644
645         for name, value in zip(('start', 'pattern', 'top'), args):
646             setattr(options, name, value)
647
648         # only set options from the parsing here
649         # if they weren't set explicitly in the constructor
650         if self.catchbreak is None:
651             self.catchbreak = options.catchbreak
652         if self.buffer is None:
653             self.buffer = options.buffer
654         self.listtests = options.listtests
655         self.load_list = options.load_list
656
657         if options.verbose:
658             self.verbosity = 2
659
660         start_dir = options.start
661         pattern = options.pattern
662         top_level_dir = options.top
663
664         loader = Loader()
665         # See http://bugs.python.org/issue16709
666         # While sorting here is intrusive, its better than being random.
667         # Rules for the sort:
668         # - standard suites are flattened, and the resulting tests sorted by
669         #   id.
670         # - non-standard suites are preserved as-is, and sorted into position
671         #   by the first test found by iterating the suite.
672         # We do this by a DSU process: flatten and grab a key, sort, strip the
673         # keys.
674         loaded = loader.discover(start_dir, pattern, top_level_dir)
675         self.test = sorted_tests(loaded)
676
677     def runTests(self):
678         if (self.catchbreak
679             and getattr(unittest, 'installHandler', None) is not None):
680             unittest.installHandler()
681         self.result = self.testRunner.run(self.test)
682         if self.exit:
683             sys.exit(not self.result.wasSuccessful())
684
685     def usageExit(self, msg=None):
686         if msg:
687             print (msg)
688         usage = {'progName': self.progName, 'catchbreak': '',
689                  'buffer': ''}
690         if self.catchbreak != False:
691             usage['catchbreak'] = CATCHBREAK
692         if self.buffer != False:
693             usage['buffer'] = BUFFEROUTPUT
694         usage_text = self.USAGE % usage
695         usage_lines = usage_text.split('\n')
696         usage_lines.insert(2, "Run a test suite with a subunit reporter.")
697         usage_lines.insert(3, "")
698         print('\n'.join(usage_lines))
699         sys.exit(2)
700
701
702 if __name__ == '__main__':
703     TestProgram(module=None, argv=sys.argv, stdout=sys.stdout)