Remove bundled subunit.
[obnox/samba/samba-obnox.git] / lib / testtools / testtools / tests / test_content.py
1 # Copyright (c) 2008-2012 testtools developers. See LICENSE for details.
2
3 import json
4 import os
5 import tempfile
6 import unittest
7
8 from testtools import TestCase
9 from testtools.compat import (
10     _b,
11     _u,
12     BytesIO,
13     StringIO,
14     )
15 from testtools.content import (
16     attach_file,
17     Content,
18     content_from_file,
19     content_from_stream,
20     JSON,
21     json_content,
22     TracebackContent,
23     text_content,
24     )
25 from testtools.content_type import (
26     ContentType,
27     UTF8_TEXT,
28     )
29 from testtools.matchers import (
30     Equals,
31     MatchesException,
32     Raises,
33     raises,
34     )
35 from testtools.tests.helpers import an_exc_info
36
37
# Shared matcher used with assertThat() on a callable: it matches when calling
# that callable raises ValueError.
raises_value_error = Raises(MatchesException(ValueError))
39
40
class TestContent(TestCase):
    """Tests for ``Content`` and the helper functions that construct it.

    Covers construction and equality of ``Content`` objects, text decoding,
    and the ``content_from_file``, ``content_from_stream``, ``text_content``
    and ``json_content`` helpers.
    """

    def test___init___None_errors(self):
        # A missing content type, a missing byte source, or both are rejected.
        self.assertThat(lambda: Content(None, None), raises_value_error)
        self.assertThat(
            lambda: Content(None, lambda: ["traceback"]), raises_value_error)
        self.assertThat(
            lambda: Content(ContentType("text", "traceback"), None),
            raises_value_error)

    def test___init___sets_ivars(self):
        content_type = ContentType("foo", "bar")
        content = Content(content_type, lambda: ["bytes"])
        self.assertEqual(content_type, content.content_type)
        self.assertEqual(["bytes"], list(content.iter_bytes()))

    def test___eq__(self):
        content_type = ContentType("foo", "bar")
        one_chunk = lambda: [_b("bytes")]
        two_chunk = lambda: [_b("by"), _b("tes")]
        content1 = Content(content_type, one_chunk)
        content2 = Content(content_type, one_chunk)
        content3 = Content(content_type, two_chunk)
        content4 = Content(content_type, lambda: [_b("by"), _b("te")])
        content5 = Content(ContentType("f", "b"), two_chunk)
        self.assertEqual(content1, content2)
        # Same bytes split into different chunks still compare equal.
        self.assertEqual(content1, content3)
        # Different bytes, or a different content type, compare unequal.
        self.assertNotEqual(content1, content4)
        self.assertNotEqual(content1, content5)

    def test___repr__(self):
        content = Content(ContentType("application", "octet-stream"),
            lambda: [_b("\x00bin"), _b("ary\xff")])
        # The repr shows the joined chunks with non-printable bytes escaped.
        self.assertIn("\\x00binary\\xff", repr(content))

    def test_iter_text_not_text_errors(self):
        content_type = ContentType("foo", "bar")
        content = Content(content_type, lambda: ["bytes"])
        self.assertThat(content.iter_text, raises_value_error)

    def test_iter_text_decodes(self):
        content_type = ContentType("text", "strange", {"charset": "utf8"})
        content = Content(
            content_type, lambda: [_u("bytes\xea").encode("utf8")])
        self.assertEqual([_u("bytes\xea")], list(content.iter_text()))

    def test_iter_text_default_charset_iso_8859_1(self):
        # No charset parameter is given, yet ISO-8859-1 bytes decode cleanly.
        content_type = ContentType("text", "strange")
        text = _u("bytes\xea")
        iso_version = text.encode("ISO-8859-1")
        content = Content(content_type, lambda: [iso_version])
        self.assertEqual([text], list(content.iter_text()))

    def test_as_text(self):
        content_type = ContentType("text", "strange", {"charset": "utf8"})
        content = Content(
            content_type, lambda: [_u("bytes\xea").encode("utf8")])
        self.assertEqual(_u("bytes\xea"), content.as_text())

    def test_from_file(self):
        fd, path = tempfile.mkstemp()
        self.addCleanup(os.remove, path)
        try:
            os.write(fd, _b('some data'))
        finally:
            # Close the descriptor even if the write fails.
            os.close(fd)
        content = content_from_file(path, UTF8_TEXT, chunk_size=2)
        self.assertThat(
            list(content.iter_bytes()),
            Equals([_b('so'), _b('me'), _b(' d'), _b('at'), _b('a')]))

    def test_from_nonexistent_file(self):
        directory = tempfile.mkdtemp()
        # mkdtemp() leaves removal to the caller; the directory stays empty
        # because the file inside it is never created.
        self.addCleanup(os.rmdir, directory)
        nonexistent = os.path.join(directory, 'nonexistent-file')
        content = content_from_file(nonexistent)
        # Creating the content succeeds; the error appears on iter_bytes().
        self.assertThat(content.iter_bytes, raises(IOError))

    def test_from_file_default_type(self):
        content = content_from_file('/nonexistent/path')
        self.assertThat(content.content_type, Equals(UTF8_TEXT))

    def test_from_file_eager_loading(self):
        fd, path = tempfile.mkstemp()
        try:
            try:
                os.write(fd, _b('some data'))
            finally:
                os.close(fd)
            content = content_from_file(path, UTF8_TEXT, buffer_now=True)
        finally:
            # The file is deleted before the content is read, so the read
            # below can only succeed if the data was buffered up front.
            # Doing it in a finally also avoids leaving the file behind when
            # an earlier step fails.
            os.remove(path)
        self.assertThat(
            ''.join(content.iter_text()), Equals('some data'))

    def test_from_file_with_simple_seek(self):
        f = tempfile.NamedTemporaryFile()
        # Register the close first so a failed write cannot leak the file.
        self.addCleanup(f.close)
        f.write(_b('some data'))
        f.flush()
        content = content_from_file(
            f.name, UTF8_TEXT, chunk_size=50, seek_offset=5)
        self.assertThat(
            list(content.iter_bytes()), Equals([_b('data')]))

    def test_from_file_with_whence_seek(self):
        f = tempfile.NamedTemporaryFile()
        # Register the close first so a failed write cannot leak the file.
        self.addCleanup(f.close)
        f.write(_b('some data'))
        f.flush()
        # seek_whence=2 makes the offset relative to the end of the file.
        content = content_from_file(
            f.name, UTF8_TEXT, chunk_size=50, seek_offset=-4, seek_whence=2)
        self.assertThat(
            list(content.iter_bytes()), Equals([_b('data')]))

    def test_from_stream(self):
        data = StringIO('some data')
        content = content_from_stream(data, UTF8_TEXT, chunk_size=2)
        self.assertThat(
            list(content.iter_bytes()), Equals(['so', 'me', ' d', 'at', 'a']))

    def test_from_stream_default_type(self):
        data = StringIO('some data')
        content = content_from_stream(data)
        self.assertThat(content.content_type, Equals(UTF8_TEXT))

    def test_from_stream_eager_loading(self):
        fd, path = tempfile.mkstemp()
        self.addCleanup(os.remove, path)
        self.addCleanup(os.close, fd)
        os.write(fd, _b('some data'))
        stream = open(path, 'rb')
        self.addCleanup(stream.close)
        content = content_from_stream(stream, UTF8_TEXT, buffer_now=True)
        # Data appended after the content was created must not show up.
        os.write(fd, _b('more data'))
        self.assertThat(
            ''.join(content.iter_text()), Equals('some data'))

    def test_from_stream_with_simple_seek(self):
        data = BytesIO(_b('some data'))
        content = content_from_stream(
            data, UTF8_TEXT, chunk_size=50, seek_offset=5)
        self.assertThat(
            list(content.iter_bytes()), Equals([_b('data')]))

    def test_from_stream_with_whence_seek(self):
        data = BytesIO(_b('some data'))
        # seek_whence=2 makes the offset relative to the end of the stream.
        content = content_from_stream(
            data, UTF8_TEXT, chunk_size=50, seek_offset=-4, seek_whence=2)
        self.assertThat(
            list(content.iter_bytes()), Equals([_b('data')]))

    def test_from_text(self):
        data = _u("some data")
        expected = Content(UTF8_TEXT, lambda: [data.encode('utf8')])
        self.assertEqual(expected, text_content(data))

    def test_json_content(self):
        data = {'foo': 'bar'}
        expected = Content(JSON, lambda: [_b('{"foo": "bar"}')])
        self.assertEqual(expected, json_content(data))
195
196
class TestTracebackContent(TestCase):
    """Tests for ``TracebackContent``."""

    def test___init___None_errors(self):
        build_without_args = lambda: TracebackContent(None, None)
        self.assertThat(build_without_args, raises_value_error)

    def test___init___sets_ivars(self):
        content = TracebackContent(an_exc_info, self)
        expected_type = ContentType(
            "text", "x-traceback", {"language": "python", "charset": "utf8"})
        self.assertEqual(expected_type, content.content_type)
        # The text must match what unittest's own TestResult produces for the
        # same exc_info and test.
        expected_text = unittest.TestResult()._exc_info_to_string(
            an_exc_info, self)
        actual_text = ''.join(content.iter_text())
        self.assertEqual(expected_text, actual_text)
211
212
class TestAttachFile(TestCase):
    """Tests for ``attach_file``."""

    def make_file(self, data):
        """Create a temporary file holding ``data`` and return its path.

        The file is removed again through a cleanup registered on this test.

        :param data: Text to write; it is converted with ``_b`` first.
        :return: The path of the newly created file.
        """
        # GZ 2011-04-21: This helper could be useful for methods above trying
        #                to use mkstemp.
        fd, path = tempfile.mkstemp()
        self.addCleanup(os.remove, path)
        try:
            os.write(fd, _b(data))
        finally:
            # Always close the descriptor, even when the write fails.
            os.close(fd)
        return path

    def _overwrite_file(self, path, data):
        """Replace the contents of the file at ``path`` with ``data``."""
        content_file = open(path, 'w')
        try:
            content_file.write(data)
        finally:
            content_file.close()

    def test_simple(self):
        class SomeTest(TestCase):
            def test_foo(self):
                pass
        test = SomeTest('test_foo')
        data = 'some data'
        path = self.make_file(data)
        my_content = text_content(data)
        attach_file(test, path, name='foo')
        self.assertEqual({'foo': my_content}, test.getDetails())

    def test_optional_name(self):
        # If no name is provided, attach_file just uses the base name of the
        # file.
        class SomeTest(TestCase):
            def test_foo(self):
                pass
        test = SomeTest('test_foo')
        path = self.make_file('some data')
        base_path = os.path.basename(path)
        attach_file(test, path)
        self.assertEqual([base_path], list(test.getDetails()))

    def test_lazy_read(self):
        class SomeTest(TestCase):
            def test_foo(self):
                pass
        test = SomeTest('test_foo')
        path = self.make_file('some data')
        attach_file(test, path, name='foo', buffer_now=False)
        content = test.getDetails()['foo']
        # With buffer_now=False the file is read on demand, so a change made
        # after attaching is what the content reports.
        self._overwrite_file(path, 'new data')
        self.assertEqual(''.join(content.iter_text()), 'new data')

    def test_eager_read_by_default(self):
        class SomeTest(TestCase):
            def test_foo(self):
                pass
        test = SomeTest('test_foo')
        path = self.make_file('some data')
        attach_file(test, path, name='foo')
        content = test.getDetails()['foo']
        # Without buffer_now the original data is still reported after the
        # file has been rewritten.
        self._overwrite_file(path, 'new data')
        self.assertEqual(''.join(content.iter_text()), 'some data')
273
274
def test_suite():
    """Return a suite with every test that can be loaded from this module."""
    import unittest
    loader = unittest.TestLoader()
    return loader.loadTestsFromName(__name__)