aec6edb03209ba53a8abae23332e52d87a992045
[kai/samba.git] / lib / testtools / testtools / testresult / real.py
1 # Copyright (c) 2008 testtools developers. See LICENSE for details.
2
3 """Test results and related things."""
4
5 __metaclass__ = type
6 __all__ = [
7     'ExtendedToOriginalDecorator',
8     'MultiTestResult',
9     'TestResult',
10     'ThreadsafeForwardingResult',
11     ]
12
13 import datetime
14 import sys
15 import unittest
16
17 from testtools.compat import all, _format_exc_info, str_is_unicode, _u
18
19 # From http://docs.python.org/library/datetime.html
20 _ZERO = datetime.timedelta(0)
21
22 # A UTC class.
23
24 class UTC(datetime.tzinfo):
25     """UTC"""
26
27     def utcoffset(self, dt):
28         return _ZERO
29
30     def tzname(self, dt):
31         return "UTC"
32
33     def dst(self, dt):
34         return _ZERO
35
36 utc = UTC()
37
38
39 class TestResult(unittest.TestResult):
40     """Subclass of unittest.TestResult extending the protocol for flexability.
41
42     This test result supports an experimental protocol for providing additional
43     data to in test outcomes. All the outcome methods take an optional dict
44     'details'. If supplied any other detail parameters like 'err' or 'reason'
45     should not be provided. The details dict is a mapping from names to
46     MIME content objects (see testtools.content). This permits attaching
47     tracebacks, log files, or even large objects like databases that were
48     part of the test fixture. Until this API is accepted into upstream
49     Python it is considered experimental: it may be replaced at any point
50     by a newer version more in line with upstream Python. Compatibility would
51     be aimed for in this case, but may not be possible.
52
53     :ivar skip_reasons: A dict of skip-reasons -> list of tests. See addSkip.
54     """
55
56     def __init__(self):
57         # startTestRun resets all attributes, and older clients don't know to
58         # call startTestRun, so it is called once here.
59         # Because subclasses may reasonably not expect this, we call the
60         # specific version we want to run.
61         TestResult.startTestRun(self)
62
63     def addExpectedFailure(self, test, err=None, details=None):
64         """Called when a test has failed in an expected manner.
65
66         Like with addSuccess and addError, testStopped should still be called.
67
68         :param test: The test that has been skipped.
69         :param err: The exc_info of the error that was raised.
70         :return: None
71         """
72         # This is the python 2.7 implementation
73         self.expectedFailures.append(
74             (test, self._err_details_to_string(test, err, details)))
75
76     def addError(self, test, err=None, details=None):
77         """Called when an error has occurred. 'err' is a tuple of values as
78         returned by sys.exc_info().
79
80         :param details: Alternative way to supply details about the outcome.
81             see the class docstring for more information.
82         """
83         self.errors.append((test,
84             self._err_details_to_string(test, err, details)))
85
86     def addFailure(self, test, err=None, details=None):
87         """Called when an error has occurred. 'err' is a tuple of values as
88         returned by sys.exc_info().
89
90         :param details: Alternative way to supply details about the outcome.
91             see the class docstring for more information.
92         """
93         self.failures.append((test,
94             self._err_details_to_string(test, err, details)))
95
96     def addSkip(self, test, reason=None, details=None):
97         """Called when a test has been skipped rather than running.
98
99         Like with addSuccess and addError, testStopped should still be called.
100
101         This must be called by the TestCase. 'addError' and 'addFailure' will
102         not call addSkip, since they have no assumptions about the kind of
103         errors that a test can raise.
104
105         :param test: The test that has been skipped.
106         :param reason: The reason for the test being skipped. For instance,
107             u"pyGL is not available".
108         :param details: Alternative way to supply details about the outcome.
109             see the class docstring for more information.
110         :return: None
111         """
112         if reason is None:
113             reason = details.get('reason')
114             if reason is None:
115                 reason = 'No reason given'
116             else:
117                 reason = ''.join(reason.iter_text())
118         skip_list = self.skip_reasons.setdefault(reason, [])
119         skip_list.append(test)
120
121     def addSuccess(self, test, details=None):
122         """Called when a test succeeded."""
123
124     def addUnexpectedSuccess(self, test, details=None):
125         """Called when a test was expected to fail, but succeed."""
126         self.unexpectedSuccesses.append(test)
127
128     def wasSuccessful(self):
129         """Has this result been successful so far?
130
131         If there have been any errors, failures or unexpected successes,
132         return False.  Otherwise, return True.
133
134         Note: This differs from standard unittest in that we consider
135         unexpected successes to be equivalent to failures, rather than
136         successes.
137         """
138         return not (self.errors or self.failures or self.unexpectedSuccesses)
139
140     if str_is_unicode:
141         # Python 3 and IronPython strings are unicode, use parent class method
142         _exc_info_to_unicode = unittest.TestResult._exc_info_to_string
143     else:
144         # For Python 2, need to decode components of traceback according to
145         # their source, so can't use traceback.format_exception
146         # Here follows a little deep magic to copy the existing method and
147         # replace the formatter with one that returns unicode instead
148         from types import FunctionType as __F, ModuleType as __M
149         __f = unittest.TestResult._exc_info_to_string.im_func
150         __g = dict(__f.func_globals)
151         __m = __M("__fake_traceback")
152         __m.format_exception = _format_exc_info
153         __g["traceback"] = __m
154         _exc_info_to_unicode = __F(__f.func_code, __g, "_exc_info_to_unicode")
155         del __F, __M, __f, __g, __m
156
157     def _err_details_to_string(self, test, err=None, details=None):
158         """Convert an error in exc_info form or a contents dict to a string."""
159         if err is not None:
160             return self._exc_info_to_unicode(err, test)
161         return _details_to_str(details, special='traceback')
162
163     def _now(self):
164         """Return the current 'test time'.
165
166         If the time() method has not been called, this is equivalent to
167         datetime.now(), otherwise its the last supplied datestamp given to the
168         time() method.
169         """
170         if self.__now is None:
171             return datetime.datetime.now(utc)
172         else:
173             return self.__now
174
175     def startTestRun(self):
176         """Called before a test run starts.
177
178         New in Python 2.7. The testtools version resets the result to a
179         pristine condition ready for use in another test run.  Note that this
180         is different from Python 2.7's startTestRun, which does nothing.
181         """
182         super(TestResult, self).__init__()
183         self.skip_reasons = {}
184         self.__now = None
185         # -- Start: As per python 2.7 --
186         self.expectedFailures = []
187         self.unexpectedSuccesses = []
188         # -- End:   As per python 2.7 --
189
190     def stopTestRun(self):
191         """Called after a test run completes
192
193         New in python 2.7
194         """
195
196     def time(self, a_datetime):
197         """Provide a timestamp to represent the current time.
198
199         This is useful when test activity is time delayed, or happening
200         concurrently and getting the system time between API calls will not
201         accurately represent the duration of tests (or the whole run).
202
203         Calling time() sets the datetime used by the TestResult object.
204         Time is permitted to go backwards when using this call.
205
206         :param a_datetime: A datetime.datetime object with TZ information or
207             None to reset the TestResult to gathering time from the system.
208         """
209         self.__now = a_datetime
210
211     def done(self):
212         """Called when the test runner is done.
213
214         deprecated in favour of stopTestRun.
215         """
216
217
218 class MultiTestResult(TestResult):
219     """A test result that dispatches to many test results."""
220
221     def __init__(self, *results):
222         TestResult.__init__(self)
223         self._results = list(map(ExtendedToOriginalDecorator, results))
224
225     def __repr__(self):
226         return '<%s (%s)>' % (
227             self.__class__.__name__, ', '.join(map(repr, self._results)))
228
229     def _dispatch(self, message, *args, **kwargs):
230         return tuple(
231             getattr(result, message)(*args, **kwargs)
232             for result in self._results)
233
234     def startTest(self, test):
235         return self._dispatch('startTest', test)
236
237     def stopTest(self, test):
238         return self._dispatch('stopTest', test)
239
240     def addError(self, test, error=None, details=None):
241         return self._dispatch('addError', test, error, details=details)
242
243     def addExpectedFailure(self, test, err=None, details=None):
244         return self._dispatch(
245             'addExpectedFailure', test, err, details=details)
246
247     def addFailure(self, test, err=None, details=None):
248         return self._dispatch('addFailure', test, err, details=details)
249
250     def addSkip(self, test, reason=None, details=None):
251         return self._dispatch('addSkip', test, reason, details=details)
252
253     def addSuccess(self, test, details=None):
254         return self._dispatch('addSuccess', test, details=details)
255
256     def addUnexpectedSuccess(self, test, details=None):
257         return self._dispatch('addUnexpectedSuccess', test, details=details)
258
259     def startTestRun(self):
260         return self._dispatch('startTestRun')
261
262     def stopTestRun(self):
263         return self._dispatch('stopTestRun')
264
265     def time(self, a_datetime):
266         return self._dispatch('time', a_datetime)
267
268     def done(self):
269         return self._dispatch('done')
270
271     def wasSuccessful(self):
272         """Was this result successful?
273
274         Only returns True if every constituent result was successful.
275         """
276         return all(self._dispatch('wasSuccessful'))
277
278
279 class TextTestResult(TestResult):
280     """A TestResult which outputs activity to a text stream."""
281
282     def __init__(self, stream):
283         """Construct a TextTestResult writing to stream."""
284         super(TextTestResult, self).__init__()
285         self.stream = stream
286         self.sep1 = '=' * 70 + '\n'
287         self.sep2 = '-' * 70 + '\n'
288
289     def _delta_to_float(self, a_timedelta):
290         return (a_timedelta.days * 86400.0 + a_timedelta.seconds +
291             a_timedelta.microseconds / 1000000.0)
292
293     def _show_list(self, label, error_list):
294         for test, output in error_list:
295             self.stream.write(self.sep1)
296             self.stream.write("%s: %s\n" % (label, test.id()))
297             self.stream.write(self.sep2)
298             self.stream.write(output)
299
300     def startTestRun(self):
301         super(TextTestResult, self).startTestRun()
302         self.__start = self._now()
303         self.stream.write("Tests running...\n")
304
305     def stopTestRun(self):
306         if self.testsRun != 1:
307             plural = 's'
308         else:
309             plural = ''
310         stop = self._now()
311         self._show_list('ERROR', self.errors)
312         self._show_list('FAIL', self.failures)
313         for test in self.unexpectedSuccesses:
314             self.stream.write(
315                 "%sUNEXPECTED SUCCESS: %s\n%s" % (
316                     self.sep1, test.id(), self.sep2))
317         self.stream.write("\nRan %d test%s in %.3fs\n" %
318             (self.testsRun, plural,
319              self._delta_to_float(stop - self.__start)))
320         if self.wasSuccessful():
321             self.stream.write("OK\n")
322         else:
323             self.stream.write("FAILED (")
324             details = []
325             details.append("failures=%d" % (
326                 sum(map(len, (
327                     self.failures, self.errors, self.unexpectedSuccesses)))))
328             self.stream.write(", ".join(details))
329             self.stream.write(")\n")
330         super(TextTestResult, self).stopTestRun()
331
332
333 class ThreadsafeForwardingResult(TestResult):
334     """A TestResult which ensures the target does not receive mixed up calls.
335
336     This is used when receiving test results from multiple sources, and batches
337     up all the activity for a single test into a thread-safe batch where all
338     other ThreadsafeForwardingResult objects sharing the same semaphore will be
339     locked out.
340
341     Typical use of ThreadsafeForwardingResult involves creating one
342     ThreadsafeForwardingResult per thread in a ConcurrentTestSuite. These
343     forward to the TestResult that the ConcurrentTestSuite run method was
344     called with.
345
346     target.done() is called once for each ThreadsafeForwardingResult that
347     forwards to the same target. If the target's done() takes special action,
348     care should be taken to accommodate this.
349     """
350
351     def __init__(self, target, semaphore):
352         """Create a ThreadsafeForwardingResult forwarding to target.
353
354         :param target: A TestResult.
355         :param semaphore: A threading.Semaphore with limit 1.
356         """
357         TestResult.__init__(self)
358         self.result = ExtendedToOriginalDecorator(target)
359         self.semaphore = semaphore
360
361     def __repr__(self):
362         return '<%s %r>' % (self.__class__.__name__, self.result)
363
364     def _add_result_with_semaphore(self, method, test, *args, **kwargs):
365         self.semaphore.acquire()
366         try:
367             self.result.time(self._test_start)
368             self.result.startTest(test)
369             self.result.time(self._now())
370             try:
371                 method(test, *args, **kwargs)
372             finally:
373                 self.result.stopTest(test)
374         finally:
375             self.semaphore.release()
376
377     def addError(self, test, err=None, details=None):
378         self._add_result_with_semaphore(self.result.addError,
379             test, err, details=details)
380
381     def addExpectedFailure(self, test, err=None, details=None):
382         self._add_result_with_semaphore(self.result.addExpectedFailure,
383             test, err, details=details)
384
385     def addFailure(self, test, err=None, details=None):
386         self._add_result_with_semaphore(self.result.addFailure,
387             test, err, details=details)
388
389     def addSkip(self, test, reason=None, details=None):
390         self._add_result_with_semaphore(self.result.addSkip,
391             test, reason, details=details)
392
393     def addSuccess(self, test, details=None):
394         self._add_result_with_semaphore(self.result.addSuccess,
395             test, details=details)
396
397     def addUnexpectedSuccess(self, test, details=None):
398         self._add_result_with_semaphore(self.result.addUnexpectedSuccess,
399             test, details=details)
400
401     def startTestRun(self):
402         self.semaphore.acquire()
403         try:
404             self.result.startTestRun()
405         finally:
406             self.semaphore.release()
407
408     def stopTestRun(self):
409         self.semaphore.acquire()
410         try:
411             self.result.stopTestRun()
412         finally:
413             self.semaphore.release()
414
415     def done(self):
416         self.semaphore.acquire()
417         try:
418             self.result.done()
419         finally:
420             self.semaphore.release()
421
422     def startTest(self, test):
423         self._test_start = self._now()
424         super(ThreadsafeForwardingResult, self).startTest(test)
425
426     def wasSuccessful(self):
427         return self.result.wasSuccessful()
428
429
430 class ExtendedToOriginalDecorator(object):
431     """Permit new TestResult API code to degrade gracefully with old results.
432
433     This decorates an existing TestResult and converts missing outcomes
434     such as addSkip to older outcomes such as addSuccess. It also supports
435     the extended details protocol. In all cases the most recent protocol
436     is attempted first, and fallbacks only occur when the decorated result
437     does not support the newer style of calling.
438     """
439
440     def __init__(self, decorated):
441         self.decorated = decorated
442
443     def __repr__(self):
444         return '<%s %r>' % (self.__class__.__name__, self.decorated)
445
446     def __getattr__(self, name):
447         return getattr(self.decorated, name)
448
449     def addError(self, test, err=None, details=None):
450         self._check_args(err, details)
451         if details is not None:
452             try:
453                 return self.decorated.addError(test, details=details)
454             except TypeError:
455                 # have to convert
456                 err = self._details_to_exc_info(details)
457         return self.decorated.addError(test, err)
458
459     def addExpectedFailure(self, test, err=None, details=None):
460         self._check_args(err, details)
461         addExpectedFailure = getattr(
462             self.decorated, 'addExpectedFailure', None)
463         if addExpectedFailure is None:
464             return self.addSuccess(test)
465         if details is not None:
466             try:
467                 return addExpectedFailure(test, details=details)
468             except TypeError:
469                 # have to convert
470                 err = self._details_to_exc_info(details)
471         return addExpectedFailure(test, err)
472
473     def addFailure(self, test, err=None, details=None):
474         self._check_args(err, details)
475         if details is not None:
476             try:
477                 return self.decorated.addFailure(test, details=details)
478             except TypeError:
479                 # have to convert
480                 err = self._details_to_exc_info(details)
481         return self.decorated.addFailure(test, err)
482
483     def addSkip(self, test, reason=None, details=None):
484         self._check_args(reason, details)
485         addSkip = getattr(self.decorated, 'addSkip', None)
486         if addSkip is None:
487             return self.decorated.addSuccess(test)
488         if details is not None:
489             try:
490                 return addSkip(test, details=details)
491             except TypeError:
492                 # extract the reason if it's available
493                 try:
494                     reason = ''.join(details['reason'].iter_text())
495                 except KeyError:
496                     reason = _details_to_str(details)
497         return addSkip(test, reason)
498
499     def addUnexpectedSuccess(self, test, details=None):
500         outcome = getattr(self.decorated, 'addUnexpectedSuccess', None)
501         if outcome is None:
502             try:
503                 test.fail("")
504             except test.failureException:
505                 return self.addFailure(test, sys.exc_info())
506         if details is not None:
507             try:
508                 return outcome(test, details=details)
509             except TypeError:
510                 pass
511         return outcome(test)
512
513     def addSuccess(self, test, details=None):
514         if details is not None:
515             try:
516                 return self.decorated.addSuccess(test, details=details)
517             except TypeError:
518                 pass
519         return self.decorated.addSuccess(test)
520
521     def _check_args(self, err, details):
522         param_count = 0
523         if err is not None:
524             param_count += 1
525         if details is not None:
526             param_count += 1
527         if param_count != 1:
528             raise ValueError("Must pass only one of err '%s' and details '%s"
529                 % (err, details))
530
531     def _details_to_exc_info(self, details):
532         """Convert a details dict to an exc_info tuple."""
533         return (
534             _StringException,
535             _StringException(_details_to_str(details, special='traceback')),
536             None)
537
538     def done(self):
539         try:
540             return self.decorated.done()
541         except AttributeError:
542             return
543
544     def progress(self, offset, whence):
545         method = getattr(self.decorated, 'progress', None)
546         if method is None:
547             return
548         return method(offset, whence)
549
550     @property
551     def shouldStop(self):
552         return self.decorated.shouldStop
553
554     def startTest(self, test):
555         return self.decorated.startTest(test)
556
557     def startTestRun(self):
558         try:
559             return self.decorated.startTestRun()
560         except AttributeError:
561             return
562
563     def stop(self):
564         return self.decorated.stop()
565
566     def stopTest(self, test):
567         return self.decorated.stopTest(test)
568
569     def stopTestRun(self):
570         try:
571             return self.decorated.stopTestRun()
572         except AttributeError:
573             return
574
575     def tags(self, new_tags, gone_tags):
576         method = getattr(self.decorated, 'tags', None)
577         if method is None:
578             return
579         return method(new_tags, gone_tags)
580
581     def time(self, a_datetime):
582         method = getattr(self.decorated, 'time', None)
583         if method is None:
584             return
585         return method(a_datetime)
586
587     def wasSuccessful(self):
588         return self.decorated.wasSuccessful()
589
590
591 class _StringException(Exception):
592     """An exception made from an arbitrary string."""
593
594     if not str_is_unicode:
595         def __init__(self, string):
596             if type(string) is not unicode:
597                 raise TypeError("_StringException expects unicode, got %r" %
598                     (string,))
599             Exception.__init__(self, string)
600
601         def __str__(self):
602             return self.args[0].encode("utf-8")
603
604         def __unicode__(self):
605             return self.args[0]
606     # For 3.0 and above the default __str__ is fine, so we don't define one.
607
608     def __hash__(self):
609         return id(self)
610
611     def __eq__(self, other):
612         try:
613             return self.args == other.args
614         except AttributeError:
615             return False
616
617
618 def _format_text_attachment(name, text):
619     if '\n' in text:
620         return "%s: {{{\n%s\n}}}\n" % (name, text)
621     return "%s: {{{%s}}}" % (name, text)
622
623
624 def _details_to_str(details, special=None):
625     """Convert a details dict to a string.
626
627     :param details: A dictionary mapping short names to ``Content`` objects.
628     :param special: If specified, an attachment that should have special
629         attention drawn to it. The primary attachment. Normally it's the
630         traceback that caused the test to fail.
631     :return: A formatted string that can be included in text test results.
632     """
633     empty_attachments = []
634     binary_attachments = []
635     text_attachments = []
636     special_content = None
637     # sorted is for testing, may want to remove that and use a dict
638     # subclass with defined order for items instead.
639     for key, content in sorted(details.items()):
640         if content.content_type.type != 'text':
641             binary_attachments.append((key, content.content_type))
642             continue
643         text = _u('').join(content.iter_text()).strip()
644         if not text:
645             empty_attachments.append(key)
646             continue
647         # We want the 'special' attachment to be at the bottom.
648         if key == special:
649             special_content = '%s\n' % (text,)
650             continue
651         text_attachments.append(_format_text_attachment(key, text))
652     if text_attachments and not text_attachments[-1].endswith('\n'):
653         text_attachments.append('')
654     if special_content:
655         text_attachments.append(special_content)
656     lines = []
657     if binary_attachments:
658         lines.append('Binary content:\n')
659         for name, content_type in binary_attachments:
660             lines.append('  %s (%s)\n' % (name, content_type))
661     if empty_attachments:
662         lines.append('Empty attachments:\n')
663         for name in empty_attachments:
664             lines.append('  %s\n' % (name,))
665     if (binary_attachments or empty_attachments) and text_attachments:
666         lines.append('\n')
667     lines.append('\n'.join(text_attachments))
668     return _u('').join(lines)