ldb: Add test to show a reindex failure must not leave the DB corrupt
[metze/samba/wip.git] / lib / ldb / tests / python / api.py
1 #!/usr/bin/env python
2 # Simple tests for the ldb python bindings.
3 # Copyright (C) 2007 Jelmer Vernooij <jelmer@samba.org>
4
5 import os
6 from unittest import TestCase
7 import sys
8 import gc
9 import time
10 import ldb
11 import shutil
12
# True when the running interpreter is Python 3 or newer.
PY3 = sys.version_info[0] >= 3

# Scheme prefixes placed in front of the database filename to build the
# URL handed to ldb.Ldb(); presumably each one selects a storage backend.
TDB_PREFIX = "tdb://"
MDB_PREFIX = "mdb://"
17
18
def tempdir():
    """Create a new temporary directory and return its path.

    When the SELFTEST_PREFIX environment variable is set the directory is
    created under "$SELFTEST_PREFIX/tmp"; otherwise the tempfile module
    picks its default location.
    """
    import tempfile
    selftest_prefix = os.environ.get("SELFTEST_PREFIX")
    if selftest_prefix is None:
        parent = None
    else:
        parent = os.path.join(selftest_prefix, "tmp")
    return tempfile.mkdtemp(dir=parent)
26
27
class NoContextTests(TestCase):
    """Checks of module-level ldb functions that need no database handle."""

    def test_valid_attr_name(self):
        # A plain name is accepted, one starting with digits is rejected.
        accepted = ldb.valid_attr_name("foo")
        self.assertTrue(accepted)
        rejected = ldb.valid_attr_name("24foo")
        self.assertFalse(rejected)

    def test_timestring(self):
        # Seconds-since-epoch values and the strings they should format to.
        cases = (
            (0, "19700101000000.0Z"),
            (1195499412, "20071119191012.0Z"),
        )
        for seconds, stamp in cases:
            self.assertEqual(stamp, ldb.timestring(seconds))

    def test_string_to_time(self):
        # The reverse direction of test_timestring.
        cases = (
            ("19700101000000.0Z", 0),
            ("20071119191012.0Z", 1195499412),
        )
        for stamp, seconds in cases:
            self.assertEqual(seconds, ldb.string_to_time(stamp))

    def test_binary_encode(self):
        # Encoding then decoding bytes must give back the input.
        raw = b'test\\x'
        from_bytes = ldb.binary_encode(raw)
        round_trip = ldb.binary_decode(from_bytes)
        self.assertEqual(round_trip, raw)

        # A text argument must encode the same way as the bytes one.
        from_text = ldb.binary_encode('test\\x')
        self.assertEqual(from_text, from_bytes)
49
50
class LdbBaseTest(TestCase):
    """Common base supplying the database URL and open flags to tests.

    Subclasses are expected to set self.filename; self.prefix falls back
    to TDB_PREFIX when it is missing or None.
    """

    def setUp(self):
        """Run the parent setUp, then default self.prefix to TDB_PREFIX."""
        super(LdbBaseTest, self).setUp()
        # Covers both "attribute never set" and "attribute set to None".
        if getattr(self, "prefix", None) is None:
            self.prefix = TDB_PREFIX

    def tearDown(self):
        """Delegate straight to the parent tearDown."""
        super(LdbBaseTest, self).tearDown()

    def url(self):
        """Return self.prefix followed by self.filename."""
        location = self.prefix + self.filename
        return location

    def flags(self):
        """Return ldb.FLG_NOSYNC for the mdb prefix, otherwise 0."""
        return ldb.FLG_NOSYNC if self.prefix == MDB_PREFIX else 0
71
72
class SimpleLdb(LdbBaseTest):
    """API tests that each run against a newly created, empty database."""

    def setUp(self):
        """Make a scratch directory and open self.ldb on test.ldb inside it."""
        super(SimpleLdb, self).setUp()
        self.testdir = tempdir()
        self.filename = os.path.join(self.testdir, "test.ldb")
        self.ldb = ldb.Ldb(self.url(), flags=self.flags())

    def tearDown(self):
        """Remove the scratch directory and drop the self.ldb handle."""
        shutil.rmtree(self.testdir)
        super(SimpleLdb, self).tearDown()
        # Ensure the LDB is closed now, so we close the FD
        del self.ldb

    def test_connect(self):
        # Opening a second handle on the same URL must not raise.
        ldb.Ldb(self.url(), flags=self.flags())

    def test_connect_none(self):
        # A handle can be created without any URL.
        ldb.Ldb()

    def test_connect_later(self):
        # Create unconnected, then attach to the database afterwards.
        conn = ldb.Ldb()
        conn.connect(self.url(), flags=self.flags())

    def test_repr(self):
        conn = ldb.Ldb()
        text = repr(conn)
        self.assertTrue(text.startswith("<ldb connection"))

    def test_set_create_perms(self):
        conn = ldb.Ldb()
        conn.set_create_perms(0o600)

    def test_modules_none(self):
        # An unconnected handle reports no modules.
        conn = ldb.Ldb()
        self.assertEqual([], conn.modules())

    def test_modules_tdb(self):
        conn = ldb.Ldb(self.url(), flags=self.flags())
        listed = repr(conn.modules())
        self.assertEqual("[<ldb module 'tdb'>]", listed)

    def test_firstmodule_none(self):
        conn = ldb.Ldb()
        self.assertEqual(conn.firstmodule, None)

    def test_firstmodule_tdb(self):
        conn = ldb.Ldb(self.url(), flags=self.flags())
        first = conn.firstmodule
        self.assertEqual(repr(first), "<ldb module 'tdb'>")

    def test_search(self):
        # A new database has nothing to return.
        db = ldb.Ldb(self.url(), flags=self.flags())
        found = db.search()
        self.assertEqual(len(found), 0)

    def test_search_controls(self):
        db = ldb.Ldb(self.url(), flags=self.flags())
        found = db.search(controls=["paged_results:0:5"])
        self.assertEqual(len(found), 0)

    def test_search_attrs(self):
        db = ldb.Ldb(self.url(), flags=self.flags())
        found = db.search(ldb.Dn(db, ""), ldb.SCOPE_SUBTREE, "(dc=*)", ["dc"])
        self.assertEqual(len(found), 0)

    def test_search_string_dn(self):
        # Same as test_search_attrs, but the base is a plain string.
        db = ldb.Ldb(self.url(), flags=self.flags())
        found = db.search("", ldb.SCOPE_SUBTREE, "(dc=*)", ["dc"])
        self.assertEqual(len(found), 0)

    def test_search_attr_string(self):
        # attrs must be a list; a bare str or bytes value is a TypeError.
        db = ldb.Ldb(self.url(), flags=self.flags())
        with self.assertRaises(TypeError):
            db.search(attrs="dc")
        with self.assertRaises(TypeError):
            db.search(attrs=b"dc")

    def test_opaque(self):
        db = ldb.Ldb(self.url(), flags=self.flags())
        db.set_opaque("my_opaque", db)
        self.assertIsNotNone(db.get_opaque("my_opaque"))
        self.assertEqual(None, db.get_opaque("unknown"))

    def test_search_scope_base_empty_db(self):
        db = ldb.Ldb(self.url(), flags=self.flags())
        found = db.search(ldb.Dn(db, "dc=foo1"), ldb.SCOPE_BASE)
        self.assertEqual(len(found), 0)

    def test_search_scope_onelevel_empty_db(self):
        db = ldb.Ldb(self.url(), flags=self.flags())
        found = db.search(ldb.Dn(db, "dc=foo1"), ldb.SCOPE_ONELEVEL)
        self.assertEqual(len(found), 0)

    def test_delete(self):
        # Deleting a record that was never added must raise.
        db = ldb.Ldb(self.url(), flags=self.flags())
        with self.assertRaises(ldb.LdbError):
            db.delete(ldb.Dn(db, "dc=foo2"))

    def test_delete_w_unhandled_ctrl(self):
        db = ldb.Ldb(self.url(), flags=self.flags())
        msg = ldb.Message()
        msg.dn = ldb.Dn(db, "dc=foo1")
        msg["b"] = [b"a"]
        db.add(msg)
        # The delete carrying the extra control is expected to fail ...
        with self.assertRaises(ldb.LdbError):
            db.delete(msg.dn, ["search_options:1:2"])
        # ... while the plain delete goes through.
        db.delete(msg.dn)

    def test_contains(self):
        location = self.url()
        db = ldb.Ldb(location, flags=self.flags())
        self.assertNotIn(ldb.Dn(db, "dc=foo3"), db)
        # Deliberately rebinds the same name, as the first handle is no
        # longer needed once the second one exists.
        db = ldb.Ldb(location, flags=self.flags())
        msg = ldb.Message()
        msg.dn = ldb.Dn(db, "dc=foo3")
        msg["b"] = ["a"]
        db.add(msg)
        try:
            self.assertIn(ldb.Dn(db, "dc=foo3"), db)
            self.assertNotIn(ldb.Dn(db, "dc=foo4"), db)
        finally:
            db.delete(msg.dn)

    def test_get_config_basedn(self):
        db = ldb.Ldb(self.url(), flags=self.flags())
        self.assertEqual(None, db.get_config_basedn())

    def test_get_root_basedn(self):
        db = ldb.Ldb(self.url(), flags=self.flags())
        self.assertEqual(None, db.get_root_basedn())

    def test_get_schema_basedn(self):
        db = ldb.Ldb(self.url(), flags=self.flags())
        self.assertEqual(None, db.get_schema_basedn())

    def test_get_default_basedn(self):
        db = ldb.Ldb(self.url(), flags=self.flags())
        self.assertEqual(None, db.get_default_basedn())

    def test_add(self):
        db = ldb.Ldb(self.url(), flags=self.flags())
        msg = ldb.Message()
        msg.dn = ldb.Dn(db, "dc=foo4")
        msg["bla"] = b"bla"
        self.assertEqual(len(db.search()), 0)
        db.add(msg)
        try:
            self.assertEqual(len(db.search()), 1)
        finally:
            db.delete(ldb.Dn(db, "dc=foo4"))

    def test_search_iterator(self):
        db = ldb.Ldb(self.url(), flags=self.flags())

        # After abandon() the test expects iteration, a second abandon()
        # and result() to each raise RuntimeError.
        cursor = db.search_iterator()
        cursor.abandon()
        with self.assertRaises(RuntimeError):
            for entry in cursor:
                self.fail()
        with self.assertRaises(RuntimeError):
            cursor.abandon()
        with self.assertRaises(RuntimeError):
            cursor.result()

        # Empty database: nothing is yielded and the result is empty.
        cursor = db.search_iterator()
        count = 0
        for entry in cursor:
            self.assertTrue(isinstance(entry, ldb.Message))
            count += 1
        outcome = cursor.result()
        self.assertEqual(len(outcome), 0)
        self.assertEqual(count, 0)

        m1 = ldb.Message()
        m1.dn = ldb.Dn(db, "dc=foo4")
        m1["bla"] = b"bla"
        db.add(m1)
        try:
            # One record stored: exactly that record is yielded.
            cursor = db.search_iterator()
            seen = []
            for entry in cursor:
                self.assertTrue(isinstance(entry, ldb.Message))
                count += 1
                seen.append(entry)
            outcome = cursor.result()
            self.assertEqual(len(outcome), 0)
            self.assertEqual(len(seen), 1)
            self.assertEqual(seen[0].dn, m1.dn)

            m2 = ldb.Message()
            m2.dn = ldb.Dn(db, "dc=foo5")
            m2["bla"] = b"bla"
            db.add(m2)

            # Two records stored: both are yielded, in either order.
            cursor = db.search_iterator()
            seen = []
            for entry in cursor:
                self.assertTrue(isinstance(entry, ldb.Message))
                count += 1
                seen.append(entry)
            outcome = cursor.result()
            self.assertEqual(len(outcome), 0)
            self.assertEqual(len(seen), 2)
            if seen[0].dn == m1.dn:
                wanted = (m1.dn, m2.dn)
            else:
                wanted = (m2.dn, m1.dn)
            self.assertEqual(seen[0].dn, wanted[0])
            self.assertEqual(seen[1].dn, wanted[1])

            # Consume the iterator one record at a time; result() is
            # expected to raise while records are still outstanding.
            cursor = db.search_iterator()
            seen = []
            for entry in cursor:
                self.assertTrue(isinstance(entry, ldb.Message))
                count += 1
                seen.append(entry)
                break
            with self.assertRaises(RuntimeError):
                cursor.result()
            for entry in cursor:
                self.assertTrue(isinstance(entry, ldb.Message))
                count += 1
                seen.append(entry)
                break
            for entry in cursor:
                self.fail()

            outcome = cursor.result()
            self.assertEqual(len(outcome), 0)
            self.assertEqual(len(seen), 2)
            if seen[0].dn == m1.dn:
                wanted = (m1.dn, m2.dn)
            else:
                wanted = (m2.dn, m1.dn)
            self.assertEqual(seen[0].dn, wanted[0])
            self.assertEqual(seen[1].dn, wanted[1])
        finally:
            db.delete(ldb.Dn(db, "dc=foo4"))
            db.delete(ldb.Dn(db, "dc=foo5"))

    def test_add_text(self):
        # As test_add, with a text value instead of bytes.
        db = ldb.Ldb(self.url(), flags=self.flags())
        msg = ldb.Message()
        msg.dn = ldb.Dn(db, "dc=foo4")
        msg["bla"] = "bla"
        self.assertEqual(len(db.search()), 0)
        db.add(msg)
        try:
            self.assertEqual(len(db.search()), 1)
        finally:
            db.delete(ldb.Dn(db, "dc=foo4"))

    def test_add_w_unhandled_ctrl(self):
        db = ldb.Ldb(self.url(), flags=self.flags())
        msg = ldb.Message()
        msg.dn = ldb.Dn(db, "dc=foo4")
        msg["bla"] = b"bla"
        self.assertEqual(len(db.search()), 0)
        with self.assertRaises(ldb.LdbError):
            db.add(msg, ["search_options:1:2"])

    def test_add_dict(self):
        # add() also takes a plain dict with a "dn" key.
        db = ldb.Ldb(self.url(), flags=self.flags())
        record = {"dn": ldb.Dn(db, "dc=foo5"),
                  "bla": b"bla"}
        self.assertEqual(len(db.search()), 0)
        db.add(record)
        try:
            self.assertEqual(len(db.search()), 1)
        finally:
            db.delete(ldb.Dn(db, "dc=foo5"))

    def test_add_dict_text(self):
        db = ldb.Ldb(self.url(), flags=self.flags())
        record = {"dn": ldb.Dn(db, "dc=foo5"),
                  "bla": "bla"}
        self.assertEqual(len(db.search()), 0)
        db.add(record)
        try:
            self.assertEqual(len(db.search()), 1)
        finally:
            db.delete(ldb.Dn(db, "dc=foo5"))

    def test_add_dict_string_dn(self):
        db = ldb.Ldb(self.url(), flags=self.flags())
        record = {"dn": "dc=foo6", "bla": b"bla"}
        self.assertEqual(len(db.search()), 0)
        db.add(record)
        try:
            self.assertEqual(len(db.search()), 1)
        finally:
            db.delete(ldb.Dn(db, "dc=foo6"))

    def test_add_dict_bytes_dn(self):
        db = ldb.Ldb(self.url(), flags=self.flags())
        record = {"dn": b"dc=foo6", "bla": b"bla"}
        self.assertEqual(len(db.search()), 0)
        db.add(record)
        try:
            self.assertEqual(len(db.search()), 1)
        finally:
            db.delete(ldb.Dn(db, "dc=foo6"))

    def test_rename(self):
        db = ldb.Ldb(self.url(), flags=self.flags())
        msg = ldb.Message()
        msg.dn = ldb.Dn(db, "dc=foo7")
        msg["bla"] = b"bla"
        self.assertEqual(len(db.search()), 0)
        db.add(msg)
        try:
            db.rename(ldb.Dn(db, "dc=foo7"), ldb.Dn(db, "dc=bar"))
            self.assertEqual(len(db.search()), 1)
        finally:
            db.delete(ldb.Dn(db, "dc=bar"))

    def test_rename_string_dns(self):
        db = ldb.Ldb(self.url(), flags=self.flags())
        msg = ldb.Message()
        msg.dn = ldb.Dn(db, "dc=foo8")
        msg["bla"] = b"bla"
        self.assertEqual(len(db.search()), 0)
        db.add(msg)
        self.assertEqual(len(db.search()), 1)
        try:
            db.rename("dc=foo8", "dc=bar")
            self.assertEqual(len(db.search()), 1)
        finally:
            db.delete(ldb.Dn(db, "dc=bar"))

    def test_empty_dn(self):
        db = ldb.Ldb(self.url(), flags=self.flags())
        self.assertEqual(0, len(db.search()))
        msg = ldb.Message()
        msg.dn = ldb.Dn(db, "dc=empty")
        db.add(msg)
        res = db.search()
        self.assertEqual(1, len(res))
        self.assertEqual({"dn", "distinguishedName"}, set(res[0].keys()))

        res = db.search(msg.dn)
        self.assertEqual(1, len(res))
        self.assertEqual({"dn", "distinguishedName"}, set(res[0].keys()))
        res = db.search(msg.dn, attrs=["blah"])
        self.assertEqual(1, len(res))
        self.assertEqual(0, len(res[0]))

    def test_modify_delete(self):
        db = ldb.Ldb(self.url(), flags=self.flags())
        msg = ldb.Message()
        msg.dn = ldb.Dn(db, "dc=modifydelete")
        msg["bla"] = [b"1234"]
        db.add(msg)
        res = db.search(msg.dn)[0]
        self.assertEqual([b"1234"], list(res["bla"]))
        try:
            msg = ldb.Message()
            msg.dn = ldb.Dn(db, "dc=modifydelete")
            msg["bla"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE, "bla")
            self.assertEqual(ldb.FLAG_MOD_DELETE, msg["bla"].flags())
            db.modify(msg)
            res = db.search(msg.dn)
            self.assertEqual(1, len(res))
            self.assertEqual({"dn", "distinguishedName"}, set(res[0].keys()))
            res = db.search(msg.dn, attrs=["bla"])
            self.assertEqual(1, len(res))
            self.assertEqual(0, len(res[0]))
        finally:
            db.delete(ldb.Dn(db, "dc=modifydelete"))

    def test_modify_delete_text(self):
        db = ldb.Ldb(self.url(), flags=self.flags())
        msg = ldb.Message()
        msg.dn = ldb.Dn(db, "dc=modifydelete")
        msg.text["bla"] = ["1234"]
        db.add(msg)
        res = db.search(msg.dn)[0]
        self.assertEqual(["1234"], list(res.text["bla"]))
        try:
            msg = ldb.Message()
            msg.dn = ldb.Dn(db, "dc=modifydelete")
            msg["bla"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE, "bla")
            self.assertEqual(ldb.FLAG_MOD_DELETE, msg["bla"].flags())
            db.modify(msg)
            res = db.search(msg.dn)
            self.assertEqual(1, len(res))
            self.assertEqual({"dn", "distinguishedName"}, set(res[0].keys()))
            res = db.search(msg.dn, attrs=["bla"])
            self.assertEqual(1, len(res))
            self.assertEqual(0, len(res[0]))
        finally:
            db.delete(ldb.Dn(db, "dc=modifydelete"))

    def test_modify_add(self):
        db = ldb.Ldb(self.url(), flags=self.flags())
        msg = ldb.Message()
        msg.dn = ldb.Dn(db, "dc=add")
        msg["bla"] = [b"1234"]
        db.add(msg)
        try:
            msg = ldb.Message()
            msg.dn = ldb.Dn(db, "dc=add")
            msg["bla"] = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
            self.assertEqual(ldb.FLAG_MOD_ADD, msg["bla"].flags())
            db.modify(msg)
            res = db.search(msg.dn)[0]
            self.assertEqual(2, len(res))
            self.assertEqual([b"1234", b"456"], list(res["bla"]))
        finally:
            db.delete(ldb.Dn(db, "dc=add"))

    def test_modify_add_text(self):
        db = ldb.Ldb(self.url(), flags=self.flags())
        msg = ldb.Message()
        msg.dn = ldb.Dn(db, "dc=add")
        msg.text["bla"] = ["1234"]
        db.add(msg)
        try:
            msg = ldb.Message()
            msg.dn = ldb.Dn(db, "dc=add")
            msg["bla"] = ldb.MessageElement(["456"], ldb.FLAG_MOD_ADD, "bla")
            self.assertEqual(ldb.FLAG_MOD_ADD, msg["bla"].flags())
            db.modify(msg)
            res = db.search(msg.dn)[0]
            self.assertEqual(2, len(res))
            self.assertEqual(["1234", "456"], list(res.text["bla"]))
        finally:
            db.delete(ldb.Dn(db, "dc=add"))

    def test_modify_replace(self):
        db = ldb.Ldb(self.url(), flags=self.flags())
        msg = ldb.Message()
        msg.dn = ldb.Dn(db, "dc=modify2")
        msg["bla"] = [b"1234", b"456"]
        db.add(msg)
        try:
            msg = ldb.Message()
            msg.dn = ldb.Dn(db, "dc=modify2")
            msg["bla"] = ldb.MessageElement([b"789"], ldb.FLAG_MOD_REPLACE, "bla")
            self.assertEqual(ldb.FLAG_MOD_REPLACE, msg["bla"].flags())
            db.modify(msg)
            res = db.search(msg.dn)[0]
            self.assertEqual(2, len(res))
            self.assertEqual([b"789"], list(res["bla"]))
            res = db.search(msg.dn, attrs=["bla"])[0]
            self.assertEqual(1, len(res))
        finally:
            db.delete(ldb.Dn(db, "dc=modify2"))

    def test_modify_replace_text(self):
        db = ldb.Ldb(self.url(), flags=self.flags())
        msg = ldb.Message()
        msg.dn = ldb.Dn(db, "dc=modify2")
        msg.text["bla"] = ["1234", "456"]
        db.add(msg)
        try:
            msg = ldb.Message()
            msg.dn = ldb.Dn(db, "dc=modify2")
            msg["bla"] = ldb.MessageElement(["789"], ldb.FLAG_MOD_REPLACE, "bla")
            self.assertEqual(ldb.FLAG_MOD_REPLACE, msg["bla"].flags())
            db.modify(msg)
            res = db.search(msg.dn)[0]
            self.assertEqual(2, len(res))
            self.assertEqual(["789"], list(res.text["bla"]))
            res = db.search(msg.dn, attrs=["bla"])[0]
            self.assertEqual(1, len(res))
        finally:
            db.delete(ldb.Dn(db, "dc=modify2"))

    def test_modify_flags_change(self):
        db = ldb.Ldb(self.url(), flags=self.flags())
        msg = ldb.Message()
        msg.dn = ldb.Dn(db, "dc=add")
        msg["bla"] = [b"1234"]
        db.add(msg)
        try:
            msg = ldb.Message()
            msg.dn = ldb.Dn(db, "dc=add")
            msg["bla"] = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
            self.assertEqual(ldb.FLAG_MOD_ADD, msg["bla"].flags())
            db.modify(msg)
            res = db.search(msg.dn)[0]
            self.assertEqual(2, len(res))
            self.assertEqual([b"1234", b"456"], list(res["bla"]))

            # Now create another modify, but switch the flags before we do it
            msg["bla"] = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
            msg["bla"].set_flags(ldb.FLAG_MOD_DELETE)
            db.modify(msg)
            res = db.search(msg.dn, attrs=["bla"])[0]
            self.assertEqual(1, len(res))
            self.assertEqual([b"1234"], list(res["bla"]))
        finally:
            db.delete(ldb.Dn(db, "dc=add"))

    def test_modify_flags_change_text(self):
        db = ldb.Ldb(self.url(), flags=self.flags())
        msg = ldb.Message()
        msg.dn = ldb.Dn(db, "dc=add")
        msg.text["bla"] = ["1234"]
        db.add(msg)
        try:
            msg = ldb.Message()
            msg.dn = ldb.Dn(db, "dc=add")
            msg["bla"] = ldb.MessageElement(["456"], ldb.FLAG_MOD_ADD, "bla")
            self.assertEqual(ldb.FLAG_MOD_ADD, msg["bla"].flags())
            db.modify(msg)
            res = db.search(msg.dn)[0]
            self.assertEqual(2, len(res))
            self.assertEqual(["1234", "456"], list(res.text["bla"]))

            # Now create another modify, but switch the flags before we do it
            msg["bla"] = ldb.MessageElement(["456"], ldb.FLAG_MOD_ADD, "bla")
            msg["bla"].set_flags(ldb.FLAG_MOD_DELETE)
            db.modify(msg)
            res = db.search(msg.dn, attrs=["bla"])[0]
            self.assertEqual(1, len(res))
            self.assertEqual(["1234"], list(res.text["bla"]))
        finally:
            db.delete(ldb.Dn(db, "dc=add"))

    def test_transaction_commit(self):
        # A record added inside a committed transaction can be deleted
        # afterwards.
        db = ldb.Ldb(self.url(), flags=self.flags())
        db.transaction_start()
        msg = ldb.Message(ldb.Dn(db, "dc=foo9"))
        msg["foo"] = [b"bar"]
        db.add(msg)
        db.transaction_commit()
        db.delete(msg.dn)

    def test_transaction_cancel(self):
        # A record added inside a cancelled transaction must not be found.
        db = ldb.Ldb(self.url(), flags=self.flags())
        db.transaction_start()
        msg = ldb.Message(ldb.Dn(db, "dc=foo10"))
        msg["foo"] = [b"bar"]
        db.add(msg)
        db.transaction_cancel()
        found = db.search(ldb.Dn(db, "dc=foo10"))
        self.assertEqual(0, len(found))

    def test_set_debug(self):
        def my_report_fn(level, text):
            # Intentionally ignores everything it is handed.
            pass
        db = ldb.Ldb(self.url(), flags=self.flags())
        db.set_debug(my_report_fn)

    def test_zero_byte_string(self):
        """Testing we do not get trapped in the \0 byte in a property string."""
        db = ldb.Ldb(self.url(), flags=self.flags())
        record = {
            "dn" : b"dc=somedn",
            "objectclass" : b"user",
            "cN" : b"LDAPtestUSER",
            "givenname" : b"ldap",
            "displayname" : b"foo\0bar",
        }
        db.add(record)
        found = db.search(expression="(dn=dc=somedn)")
        self.assertEqual(b"foo\0bar", found[0]["displayname"][0])

    def test_no_crash_broken_expr(self):
        # The filter lacks its outer parentheses; an LdbError is expected.
        db = ldb.Ldb(self.url(), flags=self.flags())
        with self.assertRaises(ldb.LdbError):
            db.search("", ldb.SCOPE_SUBTREE, "&(dc=*)(dn=*)", ["dc"])
635
636 class SearchTests(LdbBaseTest):
637     def tearDown(self):
638         shutil.rmtree(self.testdir)
639         super(SearchTests, self).tearDown()
640
641         # Ensure the LDB is closed now, so we close the FD
642         del(self.l)
643
644
645     def setUp(self):
646         super(SearchTests, self).setUp()
647         self.testdir = tempdir()
648         self.filename = os.path.join(self.testdir, "search_test.ldb")
649         self.l = ldb.Ldb(self.url(),
650                          flags=self.flags(),
651                          options=["modules:rdn_name"])
652
653         self.l.add({"dn": "@ATTRIBUTES",
654                     "DC": "CASE_INSENSITIVE"})
655
656         # Note that we can't use the name objectGUID here, as we
657         # want to stay clear of the objectGUID handler in LDB and
658         # instead use just the 16 bytes raw, which we just keep
659         # to printable chars here for ease of handling.
660
661         self.l.add({"dn": "DC=SAMBA,DC=ORG",
662                     "name": b"samba.org",
663                     "objectUUID": b"0123456789abcddf"})
664         self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG",
665                     "name": b"Admins",
666                     "x": "z", "y": "a",
667                     "objectUUID": b"0123456789abcde1"})
668         self.l.add({"dn": "OU=USERS,DC=SAMBA,DC=ORG",
669                     "name": b"Users",
670                     "x": "z", "y": "a",
671                     "objectUUID": b"0123456789abcde2"})
672         self.l.add({"dn": "OU=OU1,DC=SAMBA,DC=ORG",
673                     "name": b"OU #1",
674                     "x": "y", "y": "a",
675                     "objectUUID": b"0123456789abcde3"})
676         self.l.add({"dn": "OU=OU2,DC=SAMBA,DC=ORG",
677                     "name": b"OU #2",
678                     "x": "y", "y": "a",
679                     "objectUUID": b"0123456789abcde4"})
680         self.l.add({"dn": "OU=OU3,DC=SAMBA,DC=ORG",
681                     "name": b"OU #3",
682                     "x": "y", "y": "a",
683                     "objectUUID": b"0123456789abcde5"})
684         self.l.add({"dn": "OU=OU4,DC=SAMBA,DC=ORG",
685                     "name": b"OU #4",
686                     "x": "y", "y": "a",
687                     "objectUUID": b"0123456789abcde6"})
688         self.l.add({"dn": "OU=OU5,DC=SAMBA,DC=ORG",
689                     "name": b"OU #5",
690                     "x": "y", "y": "a",
691                     "objectUUID": b"0123456789abcde7"})
692         self.l.add({"dn": "OU=OU6,DC=SAMBA,DC=ORG",
693                     "name": b"OU #6",
694                     "x": "y", "y": "a",
695                     "objectUUID": b"0123456789abcde8"})
696         self.l.add({"dn": "OU=OU7,DC=SAMBA,DC=ORG",
697                     "name": b"OU #7",
698                     "x": "y", "y": "a",
699                     "objectUUID": b"0123456789abcde9"})
700         self.l.add({"dn": "OU=OU8,DC=SAMBA,DC=ORG",
701                     "name": b"OU #8",
702                     "x": "y", "y": "a",
703                     "objectUUID": b"0123456789abcde0"})
704         self.l.add({"dn": "OU=OU9,DC=SAMBA,DC=ORG",
705                     "name": b"OU #9",
706                     "x": "y", "y": "a",
707                     "objectUUID": b"0123456789abcdea"})
708         self.l.add({"dn": "OU=OU10,DC=SAMBA,DC=ORG",
709                     "name": b"OU #10",
710                     "x": "y", "y": "a",
711                     "objectUUID": b"0123456789abcdeb"})
712         self.l.add({"dn": "OU=OU11,DC=SAMBA,DC=ORG",
713                     "name": b"OU #10",
714                     "x": "y", "y": "a",
715                     "objectUUID": b"0123456789abcdec"})
716         self.l.add({"dn": "OU=OU12,DC=SAMBA,DC=ORG",
717                     "name": b"OU #10",
718                     "x": "y", "y": "b",
719                     "objectUUID": b"0123456789abcded"})
720         self.l.add({"dn": "OU=OU13,DC=SAMBA,DC=ORG",
721                     "name": b"OU #10",
722                     "x": "x", "y": "b",
723                     "objectUUID": b"0123456789abcdee"})
724         self.l.add({"dn": "OU=OU14,DC=SAMBA,DC=ORG",
725                     "name": b"OU #10",
726                     "x": "x", "y": "b",
727                     "objectUUID": b"0123456789abcd01"})
728         self.l.add({"dn": "OU=OU15,DC=SAMBA,DC=ORG",
729                     "name": b"OU #10",
730                     "x": "x", "y": "b",
731                     "objectUUID": b"0123456789abcd02"})
732         self.l.add({"dn": "OU=OU16,DC=SAMBA,DC=ORG",
733                     "name": b"OU #10",
734                     "x": "x", "y": "b",
735                     "objectUUID": b"0123456789abcd03"})
736         self.l.add({"dn": "OU=OU17,DC=SAMBA,DC=ORG",
737                     "name": b"OU #10",
738                     "x": "x", "y": "b",
739                     "objectUUID": b"0123456789abcd04"})
740         self.l.add({"dn": "OU=OU18,DC=SAMBA,DC=ORG",
741                     "name": b"OU #10",
742                     "x": "x", "y": "b",
743                     "objectUUID": b"0123456789abcd05"})
744         self.l.add({"dn": "OU=OU19,DC=SAMBA,DC=ORG",
745                     "name": b"OU #10",
746                     "x": "x", "y": "b",
747                     "objectUUID": b"0123456789abcd06"})
748         self.l.add({"dn": "OU=OU20,DC=SAMBA,DC=ORG",
749                     "name": b"OU #10",
750                     "x": "x", "y": "b",
751                     "objectUUID": b"0123456789abcd07"})
752         self.l.add({"dn": "OU=OU21,DC=SAMBA,DC=ORG",
753                     "name": b"OU #10",
754                     "x": "x", "y": "c",
755                     "objectUUID": b"0123456789abcd08"})
756         self.l.add({"dn": "OU=OU22,DC=SAMBA,DC=ORG",
757                     "name": b"OU #10",
758                     "x": "x", "y": "c",
759                     "objectUUID": b"0123456789abcd09"})
760
761     def test_base(self):
762         """Testing a search"""
763
764         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
765                               scope=ldb.SCOPE_BASE)
766         self.assertEqual(len(res11), 1)
767
768     def test_base_lower(self):
769         """Testing a search"""
770
771         res11 = self.l.search(base="OU=OU11,DC=samba,DC=org",
772                               scope=ldb.SCOPE_BASE)
773         self.assertEqual(len(res11), 1)
774
775     def test_base_or(self):
776         """Testing a search"""
777
778         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
779                               scope=ldb.SCOPE_BASE,
780                               expression="(|(ou=ou11)(ou=ou12))")
781         self.assertEqual(len(res11), 1)
782
783     def test_base_or2(self):
784         """Testing a search"""
785
786         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
787                               scope=ldb.SCOPE_BASE,
788                               expression="(|(x=y)(y=b))")
789         self.assertEqual(len(res11), 1)
790
791     def test_base_and(self):
792         """Testing a search"""
793
794         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
795                               scope=ldb.SCOPE_BASE,
796                               expression="(&(ou=ou11)(ou=ou12))")
797         self.assertEqual(len(res11), 0)
798
799     def test_base_and2(self):
800         """Testing a search"""
801
802         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
803                               scope=ldb.SCOPE_BASE,
804                               expression="(&(x=y)(y=a))")
805         self.assertEqual(len(res11), 1)
806
807     def test_base_false(self):
808         """Testing a search"""
809
810         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
811                               scope=ldb.SCOPE_BASE,
812                               expression="(|(ou=ou13)(ou=ou12))")
813         self.assertEqual(len(res11), 0)
814
815     def test_check_base_false(self):
816         """Testing a search"""
817         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
818                               scope=ldb.SCOPE_BASE,
819                               expression="(|(ou=ou13)(ou=ou12))")
820         self.assertEqual(len(res11), 0)
821
822     def test_check_base_error(self):
823         """Testing a search"""
824         checkbaseonsearch = {"dn": "@OPTIONS",
825                              "checkBaseOnSearch": b"TRUE"}
826         try:
827             self.l.add(checkbaseonsearch)
828         except ldb.LdbError as err:
829             enum = err.args[0]
830             self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)
831             m = ldb.Message.from_dict(self.l,
832                                       checkbaseonsearch)
833             self.l.modify(m)
834
835         try:
836             res11 = self.l.search(base="OU=OU11x,DC=SAMBA,DC=ORG",
837                                   scope=ldb.SCOPE_BASE,
838                                   expression="(|(ou=ou13)(ou=ou12))")
839             self.fail("Should have failed on missing base")
840         except ldb.LdbError as err:
841             enum = err.args[0]
842             self.assertEqual(enum, ldb.ERR_NO_SUCH_OBJECT)
843
844     def test_subtree_and(self):
845         """Testing a search"""
846
847         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
848                               scope=ldb.SCOPE_SUBTREE,
849                               expression="(&(ou=ou11)(ou=ou12))")
850         self.assertEqual(len(res11), 0)
851
852     def test_subtree_and2(self):
853         """Testing a search"""
854
855         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
856                               scope=ldb.SCOPE_SUBTREE,
857                               expression="(&(x=y)(|(y=b)(y=c)))")
858         self.assertEqual(len(res11), 1)
859
860     def test_subtree_and2_lower(self):
861         """Testing a search"""
862
863         res11 = self.l.search(base="DC=samba,DC=org",
864                               scope=ldb.SCOPE_SUBTREE,
865                               expression="(&(x=y)(|(y=b)(y=c)))")
866         self.assertEqual(len(res11), 1)
867
868     def test_subtree_or(self):
869         """Testing a search"""
870
871         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
872                               scope=ldb.SCOPE_SUBTREE,
873                               expression="(|(ou=ou11)(ou=ou12))")
874         self.assertEqual(len(res11), 2)
875
876     def test_subtree_or2(self):
877         """Testing a search"""
878
879         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
880                               scope=ldb.SCOPE_SUBTREE,
881                               expression="(|(x=y)(y=b))")
882         self.assertEqual(len(res11), 20)
883
884     def test_subtree_or3(self):
885         """Testing a search"""
886
887         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
888                               scope=ldb.SCOPE_SUBTREE,
889                               expression="(|(x=y)(y=b)(y=c))")
890         self.assertEqual(len(res11), 22)
891
892     def test_one_and(self):
893         """Testing a search"""
894
895         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
896                               scope=ldb.SCOPE_ONELEVEL,
897                               expression="(&(ou=ou11)(ou=ou12))")
898         self.assertEqual(len(res11), 0)
899
900     def test_one_and2(self):
901         """Testing a search"""
902
903         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
904                               scope=ldb.SCOPE_ONELEVEL,
905                               expression="(&(x=y)(y=b))")
906         self.assertEqual(len(res11), 1)
907
908     def test_one_or(self):
909         """Testing a search"""
910
911         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
912                               scope=ldb.SCOPE_ONELEVEL,
913                               expression="(|(ou=ou11)(ou=ou12))")
914         self.assertEqual(len(res11), 2)
915
916     def test_one_or2(self):
917         """Testing a search"""
918
919         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
920                               scope=ldb.SCOPE_ONELEVEL,
921                               expression="(|(x=y)(y=b))")
922         self.assertEqual(len(res11), 20)
923
924     def test_one_or2_lower(self):
925         """Testing a search"""
926
927         res11 = self.l.search(base="DC=samba,DC=org",
928                               scope=ldb.SCOPE_ONELEVEL,
929                               expression="(|(x=y)(y=b))")
930         self.assertEqual(len(res11), 20)
931
932     def test_subtree_and_or(self):
933         """Testing a search"""
934
935         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
936                               scope=ldb.SCOPE_SUBTREE,
937                               expression="(&(|(x=z)(y=b))(x=x)(y=c))")
938         self.assertEqual(len(res11), 0)
939
940     def test_subtree_and_or2(self):
941         """Testing a search"""
942
943         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
944                               scope=ldb.SCOPE_SUBTREE,
945                               expression="(&(x=x)(y=c)(|(x=z)(y=b)))")
946         self.assertEqual(len(res11), 0)
947
948     def test_subtree_and_or3(self):
949         """Testing a search"""
950
951         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
952                               scope=ldb.SCOPE_SUBTREE,
953                               expression="(&(|(ou=ou11)(ou=ou10))(|(x=y)(y=b)(y=c)))")
954         self.assertEqual(len(res11), 2)
955
956     def test_subtree_and_or4(self):
957         """Testing a search"""
958
959         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
960                               scope=ldb.SCOPE_SUBTREE,
961                               expression="(&(|(x=y)(y=b)(y=c))(|(ou=ou11)(ou=ou10)))")
962         self.assertEqual(len(res11), 2)
963
964     def test_subtree_and_or5(self):
965         """Testing a search"""
966
967         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
968                               scope=ldb.SCOPE_SUBTREE,
969                               expression="(&(|(x=y)(y=b)(y=c))(ou=ou11))")
970         self.assertEqual(len(res11), 1)
971
972     def test_subtree_or_and(self):
973         """Testing a search"""
974
975         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
976                               scope=ldb.SCOPE_SUBTREE,
977                               expression="(|(x=x)(y=c)(&(x=z)(y=b)))")
978         self.assertEqual(len(res11), 10)
979
980     def test_subtree_large_and_unique(self):
981         """Testing a search"""
982
983         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
984                               scope=ldb.SCOPE_SUBTREE,
985                               expression="(&(ou=ou10)(y=a))")
986         self.assertEqual(len(res11), 1)
987
988     def test_subtree_and_none(self):
989         """Testing a search"""
990
991         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
992                               scope=ldb.SCOPE_SUBTREE,
993                               expression="(&(ou=ouX)(y=a))")
994         self.assertEqual(len(res11), 0)
995
996     def test_subtree_and_idx_record(self):
997         """Testing a search against the index record"""
998
999         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1000                               scope=ldb.SCOPE_SUBTREE,
1001                               expression="(@IDXDN=DC=SAMBA,DC=ORG)")
1002         self.assertEqual(len(res11), 0)
1003
1004     def test_subtree_and_idxone_record(self):
1005         """Testing a search against the index record"""
1006
1007         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1008                               scope=ldb.SCOPE_SUBTREE,
1009                               expression="(@IDXONE=DC=SAMBA,DC=ORG)")
1010         self.assertEqual(len(res11), 0)
1011
1012     def test_dn_filter_one(self):
1013         """Testing that a dn= filter succeeds
1014         (or fails with disallowDNFilter
1015         set and IDXGUID or (IDX and not IDXONE) mode)
1016         when the scope is SCOPE_ONELEVEL.
1017
1018         This should be made more consistent, but for now lock in
1019         the behaviour
1020
1021         """
1022
1023         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1024                               scope=ldb.SCOPE_ONELEVEL,
1025                               expression="(dn=OU=OU1,DC=SAMBA,DC=ORG)")
1026         if hasattr(self, 'disallowDNFilter') and \
1027            hasattr(self, 'IDX') and \
1028            (hasattr(self, 'IDXGUID') or \
1029             ((hasattr(self, 'IDXONE') == False and hasattr(self, 'IDX')))):
1030             self.assertEqual(len(res11), 0)
1031         else:
1032             self.assertEqual(len(res11), 1)
1033
1034     def test_dn_filter_subtree(self):
1035         """Testing that a dn= filter succeeds
1036         (or fails with disallowDNFilter set)
1037         when the scope is SCOPE_SUBTREE"""
1038
1039         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1040                               scope=ldb.SCOPE_SUBTREE,
1041                               expression="(dn=OU=OU1,DC=SAMBA,DC=ORG)")
1042         if hasattr(self, 'disallowDNFilter') \
1043            and hasattr(self, 'IDX'):
1044             self.assertEqual(len(res11), 0)
1045         else:
1046             self.assertEqual(len(res11), 1)
1047
1048     def test_dn_filter_base(self):
1049         """Testing that (incorrectly) a dn= filter works
1050         when the scope is SCOPE_BASE"""
1051
1052         res11 = self.l.search(base="OU=OU1,DC=SAMBA,DC=ORG",
1053                               scope=ldb.SCOPE_BASE,
1054                               expression="(dn=OU=OU1,DC=SAMBA,DC=ORG)")
1055
1056         # At some point we should fix this, but it isn't trivial
1057         self.assertEqual(len(res11), 1)
1058
1059
class IndexedSearchTests(SearchTests):
    """Run the SearchTests cases against a database with an
       attribute index on x, y and ou"""

    def setUp(self):
        super(IndexedSearchTests, self).setUp()
        index_record = {"dn": "@INDEXLIST",
                        "@IDXATTR": [b"x", b"y", b"ou"]}
        self.l.add(index_record)
        # Marker attribute: inherited tests probe it with hasattr()
        self.IDX = True

1068
class IndexedSearchDnFilterTests(SearchTests):
    """Run the SearchTests cases with an attribute index on x, y and
       ou and with disallowDNFilter set in @OPTIONS"""

    def setUp(self):
        super(IndexedSearchDnFilterTests, self).setUp()

        options_record = {"dn": "@OPTIONS",
                          "disallowDNFilter": "TRUE"}
        self.l.add(options_record)
        self.disallowDNFilter = True

        index_record = {"dn": "@INDEXLIST",
                        "@IDXATTR": [b"x", b"y", b"ou"]}
        self.l.add(index_record)
        self.IDX = True

1081
class IndexedAndOneLevelSearchTests(SearchTests):
    """Run the SearchTests cases with an attribute index on x, y and
       ou plus the @IDXONE one-level index"""

    def setUp(self):
        super(IndexedAndOneLevelSearchTests, self).setUp()
        index_record = {"dn": "@INDEXLIST",
                        "@IDXATTR": [b"x", b"y", b"ou"],
                        "@IDXONE": [b"1"]}
        self.l.add(index_record)
        # NOTE(review): only the IDX marker is set although @IDXONE is
        # configured above - confirm that no IDXONE marker is wanted.
        self.IDX = True

1091
class IndexedAndOneLevelDNFilterSearchTests(SearchTests):
    """Run the SearchTests cases with the attribute and @IDXONE
       indexes and with disallowDNFilter set in @OPTIONS"""

    def setUp(self):
        super(IndexedAndOneLevelDNFilterSearchTests, self).setUp()

        options_record = {"dn": "@OPTIONS",
                          "disallowDNFilter": "TRUE"}
        self.l.add(options_record)
        self.disallowDNFilter = True

        index_record = {"dn": "@INDEXLIST",
                        "@IDXATTR": [b"x", b"y", b"ou"],
                        "@IDXONE": [b"1"]}
        self.l.add(index_record)
        # Marker attributes: inherited tests probe them with hasattr()
        self.IDX = True
        self.IDXONE = True

1106
class GUIDIndexedSearchTests(SearchTests):
    """Run the SearchTests cases with the attribute index keyed by
       the objectUUID GUID (@IDXGUID / @IDX_DN_GUID)"""

    def setUp(self):
        super(GUIDIndexedSearchTests, self).setUp()

        index_record = {"dn": "@INDEXLIST",
                        "@IDXATTR": [b"x", b"y", b"ou"],
                        "@IDXGUID": [b"objectUUID"],
                        "@IDX_DN_GUID": [b"GUID"]}
        self.l.add(index_record)
        # NOTE(review): the IDXONE marker is set although no @IDXONE
        # value is written above, and IDX is left unset - confirm.
        self.IDXGUID = True
        self.IDXONE = True

1119
1120
class GUIDIndexedDNFilterSearchTests(SearchTests):
    """Run the SearchTests cases with the GUID-keyed index and with
       disallowDNFilter set in @OPTIONS"""

    def setUp(self):
        super(GUIDIndexedDNFilterSearchTests, self).setUp()

        options_record = {"dn": "@OPTIONS",
                          "disallowDNFilter": "TRUE"}
        self.l.add(options_record)
        self.disallowDNFilter = True

        index_record = {"dn": "@INDEXLIST",
                        "@IDXATTR": [b"x", b"y", b"ou"],
                        "@IDXGUID": [b"objectUUID"],
                        "@IDX_DN_GUID": [b"GUID"]}
        self.l.add(index_record)
        # Marker attributes: inherited tests probe them with hasattr()
        self.IDX = True
        self.IDXGUID = True

1136
class GUIDAndOneLevelIndexedSearchTests(SearchTests):
    """Run the SearchTests cases with the GUID-keyed index, the
       @IDXONE one-level index and disallowDNFilter set in @OPTIONS"""

    def setUp(self):
        super(GUIDAndOneLevelIndexedSearchTests, self).setUp()

        options_record = {"dn": "@OPTIONS",
                          "disallowDNFilter": "TRUE"}
        self.l.add(options_record)
        self.disallowDNFilter = True

        index_record = {"dn": "@INDEXLIST",
                        "@IDXATTR": [b"x", b"y", b"ou"],
                        "@IDXONE": [b"1"],
                        "@IDXGUID": [b"objectUUID"],
                        "@IDX_DN_GUID": [b"GUID"]}
        self.l.add(index_record)
        # Marker attributes: inherited tests probe them with hasattr()
        self.IDX = True
        self.IDXGUID = True
        self.IDXONE = True

1154
1155
class AddModifyTests(LdbBaseTest):
    """Add, delete and rename tests on a database using rdn_name.

    Each test starts with DC=SAMBA,DC=ORG present and with objectUUID
    declared UNIQUE_INDEX in @ATTRIBUTES.
    """

    def setUp(self):
        super(AddModifyTests, self).setUp()
        self.testdir = tempdir()
        self.filename = os.path.join(self.testdir, "add_test.ldb")
        self.l = ldb.Ldb(self.url(),
                         flags=self.flags(),
                         options=["modules:rdn_name"])
        base_record = {"dn": "DC=SAMBA,DC=ORG",
                       "name": b"samba.org",
                       "objectUUID": b"0123456789abcdef"}
        self.l.add(base_record)
        unique_guid = {"dn": "@ATTRIBUTES",
                       "objectUUID": "UNIQUE_INDEX"}
        self.l.add(unique_guid)

    def tearDown(self):
        shutil.rmtree(self.testdir)
        super(AddModifyTests, self).tearDown()

        # Drop our reference now so the LDB is closed and its FD released
        del self.l

    def _add_admins(self, dn, guid):
        """Add an "Admins" record at dn with x=z, y=a and objectUUID guid."""
        self.l.add({"dn": dn,
                    "name": b"Admins",
                    "x": "z", "y": "a",
                    "objectUUID": guid})

    def test_add_dup(self):
        """A second add at an existing DN fails with ENTRY_ALREADY_EXISTS."""
        dn = "OU=DUP,DC=SAMBA,DC=ORG"
        self._add_admins(dn, b"0123456789abcde1")
        try:
            self._add_admins(dn, b"0123456789abcde2")
        except ldb.LdbError as err:
            self.assertEqual(err.args[0], ldb.ERR_ENTRY_ALREADY_EXISTS)
        else:
            self.fail("Should have failed adding dupliate entry")

    def test_add_del_add(self):
        """A DN can be added again once the first record is deleted."""
        dn = "OU=DUP,DC=SAMBA,DC=ORG"
        self._add_admins(dn, b"0123456789abcde1")
        self.l.delete(dn)
        self._add_admins(dn, b"0123456789abcde2")

    def test_add_move_add(self):
        """A DN can be added again once the first record is renamed away."""
        dn = "OU=DUP,DC=SAMBA,DC=ORG"
        self._add_admins(dn, b"0123456789abcde1")
        self.l.rename(dn, "OU=DUP2,DC=SAMBA,DC=ORG")
        self._add_admins(dn, b"0123456789abcde2")

    def test_add_move_fail_move_move(self):
        """A refused rename must not stop later renames from working.

        Renaming DUP onto the existing DUP2 has to fail; afterwards
        DUP2 moves to DUP3 and DUP to DUP2, and each objectUUID must
        be found at its new DN.
        """
        first = "OU=DUP,DC=SAMBA,DC=ORG"
        second = "OU=DUP2,DC=SAMBA,DC=ORG"
        third = "OU=DUP3,DC=SAMBA,DC=ORG"

        self._add_admins(first, b"0123456789abcde1")
        self._add_admins(second, b"0123456789abcde2")
        try:
            self.l.rename(first, second)
        except ldb.LdbError as err:
            self.assertEqual(err.args[0], ldb.ERR_ENTRY_ALREADY_EXISTS)
        else:
            self.fail("Should have failed on duplicate DN")

        self.l.rename(second, third)
        self.l.rename(first, second)

        by_guid1 = self.l.search(base="DC=SAMBA,DC=ORG",
                                 scope=ldb.SCOPE_SUBTREE,
                                 expression="(objectUUID=0123456789abcde1)")
        self.assertEqual(len(by_guid1), 1)
        self.assertEqual(str(by_guid1[0].dn), "OU=DUP2,DC=SAMBA,DC=ORG")

        by_guid2 = self.l.search(base="DC=SAMBA,DC=ORG",
                                 scope=ldb.SCOPE_SUBTREE,
                                 expression="(objectUUID=0123456789abcde2)")
        self.assertEqual(len(by_guid2), 1)
        self.assertEqual(str(by_guid2[0].dn), "OU=DUP3,DC=SAMBA,DC=ORG")

    def test_move_missing(self):
        """Renaming a DN that was never added fails with NO_SUCH_OBJECT."""
        try:
            self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
                          "OU=DUP2,DC=SAMBA,DC=ORG")
        except ldb.LdbError as err:
            self.assertEqual(err.args[0], ldb.ERR_NO_SUCH_OBJECT)
        else:
            self.fail("Should have failed on missing")

    def test_move_missing2(self):
        """Renaming a missing DN onto an existing one is NO_SUCH_OBJECT."""
        self._add_admins("OU=DUP2,DC=SAMBA,DC=ORG", b"0123456789abcde2")

        try:
            self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
                          "OU=DUP2,DC=SAMBA,DC=ORG")
        except ldb.LdbError as err:
            self.assertEqual(err.args[0], ldb.ERR_NO_SUCH_OBJECT)
        else:
            self.fail("Should have failed on missing")

    def test_move_fail_move_add(self):
        """After a refused rename, the freed DN can still be re-added."""
        first = "OU=DUP,DC=SAMBA,DC=ORG"
        second = "OU=DUP2,DC=SAMBA,DC=ORG"

        self._add_admins(first, b"0123456789abcde1")
        self._add_admins(second, b"0123456789abcde2")
        try:
            self.l.rename(first, second)
        except ldb.LdbError as err:
            self.assertEqual(err.args[0], ldb.ERR_ENTRY_ALREADY_EXISTS)
        else:
            self.fail("Should have failed on duplicate DN")

        self.l.rename(second, "OU=DUP3,DC=SAMBA,DC=ORG")

        self._add_admins(second, b"0123456789abcde3")

1297
1298
class IndexedAddModifyTests(AddModifyTests):
    """Repeat the add/modify tests with x, y, ou and objectUUID indexed
       (plus @IDXONE), and add duplicate-GUID checks"""

    def setUp(self):
        super(IndexedAddModifyTests, self).setUp()
        index_record = {"dn": "@INDEXLIST",
                        "@IDXATTR": [b"x", b"y", b"ou", b"objectUUID"],
                        "@IDXONE": [b"1"]}
        self.l.add(index_record)

    def _add_indexed_admins(self, dn, guid):
        """Add an "Admins" record at dn with x=z, y=a and objectUUID guid."""
        self.l.add({"dn": dn,
                    "name": b"Admins",
                    "x": "z", "y": "a",
                    "objectUUID": guid})

    def test_duplicate_GUID(self):
        """Re-using the objectUUID added in AddModifyTests.setUp
           is a constraint violation"""
        try:
            self._add_indexed_admins("OU=DUPGUID,DC=SAMBA,DC=ORG",
                                     b"0123456789abcdef")
        except ldb.LdbError as err:
            self.assertEqual(err.args[0], ldb.ERR_CONSTRAINT_VIOLATION)
        else:
            self.fail("Should have failed adding dupliate GUID")

    def test_duplicate_name_dup_GUID(self):
        """Same DN and same objectUUID twice: ENTRY_ALREADY_EXISTS"""
        dn = "OU=ADMIN,DC=SAMBA,DC=ORG"
        guid = b"a123456789abcdef"
        self._add_indexed_admins(dn, guid)
        try:
            self._add_indexed_admins(dn, guid)
        except ldb.LdbError as err:
            self.assertEqual(err.args[0], ldb.ERR_ENTRY_ALREADY_EXISTS)
        else:
            self.fail("Should have failed adding dupliate GUID")

    def test_duplicate_name_dup_GUID2(self):
        """A refused duplicate-DN add must not leave its GUID in the index"""
        dn = "OU=ADMIN,DC=SAMBA,DC=ORG"
        rejected_guid = b"aaa3456789abcdef"
        self._add_indexed_admins(dn, b"abc3456789abcdef")
        try:
            self._add_indexed_admins(dn, rejected_guid)
        except ldb.LdbError as err:
            self.assertEqual(err.args[0], ldb.ERR_ENTRY_ALREADY_EXISTS)
        else:
            self.fail("Should have failed adding dupliate DN")

        # The GUID from the refused add must still be free to use
        self._add_indexed_admins("OU=DUP,DC=SAMBA,DC=ORG", rejected_guid)

    def test_add_dup_guid_add(self):
        """After a duplicate-GUID failure the same DN can be added
           with a fresh GUID"""
        second_dn = "OU=DUP2,DC=SAMBA,DC=ORG"
        self._add_indexed_admins("OU=DUP,DC=SAMBA,DC=ORG",
                                 b"0123456789abcde1")
        try:
            self._add_indexed_admins(second_dn, b"0123456789abcde1")
        except ldb.LdbError as err:
            self.assertEqual(err.args[0], ldb.ERR_CONSTRAINT_VIOLATION)
        else:
            self.fail("Should have failed on duplicate GUID")

        self._add_indexed_admins(second_dn, b"0123456789abcde2")

1375
class GUIDIndexedAddModifyTests(IndexedAddModifyTests):
    """Repeat the indexed add/modify tests after replacing @INDEXLIST
       with a GUID-keyed (@IDXGUID / @IDX_DN_GUID) configuration"""

    def setUp(self):
        super(GUIDIndexedAddModifyTests, self).setUp()
        # The parent setUp already added @INDEXLIST, so replace its
        # contents with a modify instead of adding the record again
        guid_index = {"dn": "@INDEXLIST",
                      "@IDXATTR": [b"x", b"y", b"ou"],
                      "@IDXONE": [b"1"],
                      "@IDXGUID": [b"objectUUID"],
                      "@IDX_DN_GUID": [b"GUID"]}
        replacement = ldb.Message.from_dict(self.l,
                                            guid_index,
                                            ldb.FLAG_MOD_REPLACE)
        self.l.modify(replacement)

1388
1389
class GUIDTransIndexedAddModifyTests(GUIDIndexedAddModifyTests):
    """Test GUID index behaviour inside the transaction"""

    def setUp(self):
        super(GUIDTransIndexedAddModifyTests, self).setUp()
        # Each inherited test body runs inside this open transaction
        db = self.l
        db.transaction_start()

    def tearDown(self):
        # Commit before the parent tearDown removes the database
        db = self.l
        db.transaction_commit()
        super(GUIDTransIndexedAddModifyTests, self).tearDown()

1399
class TransIndexedAddModifyTests(IndexedAddModifyTests):
    """Test index behaviour inside the transaction"""

    def setUp(self):
        super(TransIndexedAddModifyTests, self).setUp()
        # Each inherited test body runs inside this open transaction
        db = self.l
        db.transaction_start()

    def tearDown(self):
        # Commit before the parent tearDown removes the database
        db = self.l
        db.transaction_commit()
        super(TransIndexedAddModifyTests, self).tearDown()

1409
1410
class BadIndexTests(LdbBaseTest):
    """Check that the index stays usable after @ATTRIBUTES is changed.

    Each test adds three records below dc=samba,dc=org, then adds an
    @ATTRIBUTES record (UNIQUE_INDEX or CASE_INSENSITIVE for "y") and
    verifies that a search on "y" still returns the expected number
    of entries.

    A subclass may set self.IDXGUID before calling setUp() to run the
    same tests with the GUID-keyed index configuration.
    """

    def setUp(self):
        super(BadIndexTests, self).setUp()
        self.testdir = tempdir()
        self.filename = os.path.join(self.testdir, "test.ldb")
        self.ldb = ldb.Ldb(self.url(), flags=self.flags())
        if hasattr(self, 'IDXGUID'):
            self.ldb.add({"dn": "@INDEXLIST",
                          "@IDXATTR": [b"x", b"y", b"ou"],
                          "@IDXGUID": [b"objectUUID"],
                          "@IDX_DN_GUID": [b"GUID"]})
        else:
            self.ldb.add({"dn": "@INDEXLIST",
                          "@IDXATTR": [b"x", b"y", b"ou"]})

    def tearDown(self):
        # Drop our reference first so the LDB is closed and its FD
        # released, then remove the per-test directory so test runs do
        # not accumulate temporary databases.
        del self.ldb
        shutil.rmtree(self.testdir)
        super(BadIndexTests, self).tearDown()

    def test_unique(self):
        """A refused UNIQUE_INDEX on a duplicated value keeps the index."""
        self.ldb.add({"dn": "x=x,dc=samba,dc=org",
                      "objectUUID": b"0123456789abcde1",
                      "y": "1"})
        self.ldb.add({"dn": "x=y,dc=samba,dc=org",
                      "objectUUID": b"0123456789abcde2",
                      "y": "1"})
        self.ldb.add({"dn": "x=z,dc=samba,dc=org",
                      "objectUUID": b"0123456789abcde3",
                      "y": "1"})

        res = self.ldb.search(expression="(y=1)",
                              base="dc=samba,dc=org")
        self.assertEqual(len(res), 3)

        # Now set this to unique index, but forget to check the result
        try:
            self.ldb.add({"dn": "@ATTRIBUTES",
                          "y": "UNIQUE_INDEX"})
            self.fail()
        except ldb.LdbError:
            pass

        # We must still have a working index
        res = self.ldb.search(expression="(y=1)",
                              base="dc=samba,dc=org")
        self.assertEqual(len(res), 3)

    def test_unique_transaction(self):
        """As test_unique, but with the failure surfacing at commit time."""
        self.ldb.add({"dn": "x=x,dc=samba,dc=org",
                      "objectUUID": b"0123456789abcde1",
                      "y": "1"})
        self.ldb.add({"dn": "x=y,dc=samba,dc=org",
                      "objectUUID": b"0123456789abcde2",
                      "y": "1"})
        self.ldb.add({"dn": "x=z,dc=samba,dc=org",
                      "objectUUID": b"0123456789abcde3",
                      "y": "1"})

        res = self.ldb.search(expression="(y=1)",
                              base="dc=samba,dc=org")
        self.assertEqual(len(res), 3)

        self.ldb.transaction_start()

        # Now set this to unique index, but forget to check the result.
        # Ignoring the error here is deliberate: the test mimics a
        # caller that carries on and tries to commit anyway.
        try:
            self.ldb.add({"dn": "@ATTRIBUTES",
                          "y": "UNIQUE_INDEX"})
        except ldb.LdbError:
            pass

        try:
            self.ldb.transaction_commit()
            self.fail()

        except ldb.LdbError as err:
            enum = err.args[0]
            self.assertEqual(enum, ldb.ERR_OPERATIONS_ERROR)

        # We must still have a working index
        res = self.ldb.search(expression="(y=1)",
                              base="dc=samba,dc=org")

        self.assertEqual(len(res), 3)

    def test_casefold(self):
        """Marking "y" CASE_INSENSITIVE must leave a working index."""
        self.ldb.add({"dn": "x=x,dc=samba,dc=org",
                      "objectUUID": b"0123456789abcde1",
                      "y": "a"})
        self.ldb.add({"dn": "x=y,dc=samba,dc=org",
                      "objectUUID": b"0123456789abcde2",
                      "y": "A"})
        self.ldb.add({"dn": "x=z,dc=samba,dc=org",
                      "objectUUID": b"0123456789abcde3",
                      "y": ["a", "A"]})

        res = self.ldb.search(expression="(y=a)",
                              base="dc=samba,dc=org")
        self.assertEqual(len(res), 2)

        self.ldb.add({"dn": "@ATTRIBUTES",
                      "y": "CASE_INSENSITIVE"})

        # We must still have a working index
        res = self.ldb.search(expression="(y=a)",
                              base="dc=samba,dc=org")

        if hasattr(self, 'IDXGUID'):
            self.assertEqual(len(res), 3)
        else:
            # We should not return this entry twice, but sadly
            # we have not yet fixed
            # https://bugzilla.samba.org/show_bug.cgi?id=13361
            self.assertEqual(len(res), 4)

    def test_casefold_transaction(self):
        """As test_casefold, with the change made inside a transaction."""
        self.ldb.add({"dn": "x=x,dc=samba,dc=org",
                      "objectUUID": b"0123456789abcde1",
                      "y": "a"})
        self.ldb.add({"dn": "x=y,dc=samba,dc=org",
                      "objectUUID": b"0123456789abcde2",
                      "y": "A"})
        self.ldb.add({"dn": "x=z,dc=samba,dc=org",
                      "objectUUID": b"0123456789abcde3",
                      "y": ["a", "A"]})

        res = self.ldb.search(expression="(y=a)",
                              base="dc=samba,dc=org")
        self.assertEqual(len(res), 2)

        self.ldb.transaction_start()

        self.ldb.add({"dn": "@ATTRIBUTES",
                      "y": "CASE_INSENSITIVE"})

        self.ldb.transaction_commit()

        # We must still have a working index
        res = self.ldb.search(expression="(y=a)",
                              base="dc=samba,dc=org")

        if hasattr(self, 'IDXGUID'):
            self.assertEqual(len(res), 3)
        else:
            # We should not return this entry twice, but sadly
            # we have not yet fixed
            # https://bugzilla.samba.org/show_bug.cgi?id=13361
            self.assertEqual(len(res), 4)

1561
1562
class GUIDBadIndexTests(BadIndexTests):
    """Run the BadIndexTests cases in GUID index mode"""

    def setUp(self):
        # The marker must exist before the parent setUp runs, because
        # that is where the @INDEXLIST variant is chosen
        setattr(self, 'IDXGUID', True)
        super(GUIDBadIndexTests, self).setUp()

1569
1570
1571 class DnTests(TestCase):
1572
1573     def setUp(self):
1574         super(DnTests, self).setUp()
1575         self.ldb = ldb.Ldb()
1576
1577     def tearDown(self):
1578         super(DnTests, self).tearDown()
1579         del(self.ldb)
1580
1581     def test_set_dn_invalid(self):
1582         x = ldb.Message()
1583         def assign():
1584             x.dn = "astring"
1585         self.assertRaises(TypeError, assign)
1586
1587     def test_eq(self):
1588         x = ldb.Dn(self.ldb, "dc=foo11,bar=bloe")
1589         y = ldb.Dn(self.ldb, "dc=foo11,bar=bloe")
1590         self.assertEqual(x, y)
1591         y = ldb.Dn(self.ldb, "dc=foo11,bar=blie")
1592         self.assertNotEqual(x, y)
1593
1594     def test_str(self):
1595         x = ldb.Dn(self.ldb, "dc=foo12,bar=bloe")
1596         self.assertEqual(x.__str__(), "dc=foo12,bar=bloe")
1597
1598     def test_repr(self):
1599         x = ldb.Dn(self.ldb, "dc=foo13,bla=blie")
1600         self.assertEqual(x.__repr__(), "Dn('dc=foo13,bla=blie')")
1601
1602     def test_get_casefold(self):
1603         x = ldb.Dn(self.ldb, "dc=foo14,bar=bloe")
1604         self.assertEqual(x.get_casefold(), "DC=FOO14,BAR=bloe")
1605
1606     def test_validate(self):
1607         x = ldb.Dn(self.ldb, "dc=foo15,bar=bloe")
1608         self.assertTrue(x.validate())
1609
1610     def test_parent(self):
1611         x = ldb.Dn(self.ldb, "dc=foo16,bar=bloe")
1612         self.assertEqual("bar=bloe", x.parent().__str__())
1613
1614     def test_parent_nonexistent(self):
1615         x = ldb.Dn(self.ldb, "@BLA")
1616         self.assertEqual(None, x.parent())
1617
1618     def test_is_valid(self):
1619         x = ldb.Dn(self.ldb, "dc=foo18,dc=bloe")
1620         self.assertTrue(x.is_valid())
1621         x = ldb.Dn(self.ldb, "")
1622         self.assertTrue(x.is_valid())
1623
1624     def test_is_special(self):
1625         x = ldb.Dn(self.ldb, "dc=foo19,bar=bloe")
1626         self.assertFalse(x.is_special())
1627         x = ldb.Dn(self.ldb, "@FOOBAR")
1628         self.assertTrue(x.is_special())
1629
1630     def test_check_special(self):
1631         x = ldb.Dn(self.ldb, "dc=foo20,bar=bloe")
1632         self.assertFalse(x.check_special("FOOBAR"))
1633         x = ldb.Dn(self.ldb, "@FOOBAR")
1634         self.assertTrue(x.check_special("@FOOBAR"))
1635
1636     def test_len(self):
1637         x = ldb.Dn(self.ldb, "dc=foo21,bar=bloe")
1638         self.assertEqual(2, len(x))
1639         x = ldb.Dn(self.ldb, "dc=foo21")
1640         self.assertEqual(1, len(x))
1641
1642     def test_add_child(self):
1643         x = ldb.Dn(self.ldb, "dc=foo22,bar=bloe")
1644         self.assertTrue(x.add_child(ldb.Dn(self.ldb, "bla=bloe")))
1645         self.assertEqual("bla=bloe,dc=foo22,bar=bloe", x.__str__())
1646
1647     def test_add_base(self):
1648         x = ldb.Dn(self.ldb, "dc=foo23,bar=bloe")
1649         base = ldb.Dn(self.ldb, "bla=bloe")
1650         self.assertTrue(x.add_base(base))
1651         self.assertEqual("dc=foo23,bar=bloe,bla=bloe", x.__str__())
1652
1653     def test_add_child_str(self):
1654         x = ldb.Dn(self.ldb, "dc=foo22,bar=bloe")
1655         self.assertTrue(x.add_child("bla=bloe"))
1656         self.assertEqual("bla=bloe,dc=foo22,bar=bloe", x.__str__())
1657
1658     def test_add_base_str(self):
1659         x = ldb.Dn(self.ldb, "dc=foo23,bar=bloe")
1660         base = "bla=bloe"
1661         self.assertTrue(x.add_base(base))
1662         self.assertEqual("dc=foo23,bar=bloe,bla=bloe", x.__str__())
1663
1664     def test_add(self):
1665         x = ldb.Dn(self.ldb, "dc=foo24")
1666         y = ldb.Dn(self.ldb, "bar=bla")
1667         self.assertEqual("dc=foo24,bar=bla", str(x + y))
1668
1669     def test_remove_base_components(self):
1670         x = ldb.Dn(self.ldb, "dc=foo24,dc=samba,dc=org")
1671         x.remove_base_components(len(x)-1)
1672         self.assertEqual("dc=foo24", str(x))
1673
1674     def test_parse_ldif(self):
1675         msgs = self.ldb.parse_ldif("dn: foo=bar\n")
1676         msg = next(msgs)
1677         self.assertEqual("foo=bar", str(msg[1].dn))
1678         self.assertTrue(isinstance(msg[1], ldb.Message))
1679         ldif = self.ldb.write_ldif(msg[1], ldb.CHANGETYPE_NONE)
1680         self.assertEqual("dn: foo=bar\n\n", ldif)
1681
1682     def test_parse_ldif_more(self):
1683         msgs = self.ldb.parse_ldif("dn: foo=bar\n\n\ndn: bar=bar")
1684         msg = next(msgs)
1685         self.assertEqual("foo=bar", str(msg[1].dn))
1686         msg = next(msgs)
1687         self.assertEqual("bar=bar", str(msg[1].dn))
1688
1689     def test_canonical_string(self):
1690         x = ldb.Dn(self.ldb, "dc=foo25,bar=bloe")
1691         self.assertEqual("/bloe/foo25", x.canonical_str())
1692
1693     def test_canonical_ex_string(self):
1694         x = ldb.Dn(self.ldb, "dc=foo26,bar=bloe")
1695         self.assertEqual("/bloe\nfoo26", x.canonical_ex_str())
1696
1697     def test_ldb_is_child_of(self):
1698         """Testing ldb_dn_compare_dn"""
1699         dn1 = ldb.Dn(self.ldb, "dc=base")
1700         dn2 = ldb.Dn(self.ldb, "cn=foo,dc=base")
1701         dn3 = ldb.Dn(self.ldb, "cn=bar,dc=base")
1702         dn4 = ldb.Dn(self.ldb, "cn=baz,cn=bar,dc=base")
1703
1704         self.assertTrue(dn1.is_child_of(dn1))
1705         self.assertTrue(dn2.is_child_of(dn1))
1706         self.assertTrue(dn4.is_child_of(dn1))
1707         self.assertTrue(dn4.is_child_of(dn3))
1708         self.assertTrue(dn4.is_child_of(dn4))
1709         self.assertFalse(dn3.is_child_of(dn2))
1710         self.assertFalse(dn1.is_child_of(dn4))
1711
1712     def test_ldb_is_child_of_str(self):
1713         """Testing ldb_dn_compare_dn"""
1714         dn1_str = "dc=base"
1715         dn2_str = "cn=foo,dc=base"
1716         dn3_str = "cn=bar,dc=base"
1717         dn4_str = "cn=baz,cn=bar,dc=base"
1718
1719         dn1 = ldb.Dn(self.ldb, dn1_str)
1720         dn2 = ldb.Dn(self.ldb, dn2_str)
1721         dn3 = ldb.Dn(self.ldb, dn3_str)
1722         dn4 = ldb.Dn(self.ldb, dn4_str)
1723
1724         self.assertTrue(dn1.is_child_of(dn1_str))
1725         self.assertTrue(dn2.is_child_of(dn1_str))
1726         self.assertTrue(dn4.is_child_of(dn1_str))
1727         self.assertTrue(dn4.is_child_of(dn3_str))
1728         self.assertTrue(dn4.is_child_of(dn4_str))
1729         self.assertFalse(dn3.is_child_of(dn2_str))
1730         self.assertFalse(dn1.is_child_of(dn4_str))
1731
1732     def test_get_component_name(self):
1733         dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
1734         self.assertEqual(dn.get_component_name(0), 'cn')
1735         self.assertEqual(dn.get_component_name(1), 'dc')
1736         self.assertEqual(dn.get_component_name(2), None)
1737         self.assertEqual(dn.get_component_name(-1), None)
1738
1739     def test_get_component_value(self):
1740         dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
1741         self.assertEqual(dn.get_component_value(0), 'foo')
1742         self.assertEqual(dn.get_component_value(1), 'base')
1743         self.assertEqual(dn.get_component_name(2), None)
1744         self.assertEqual(dn.get_component_name(-1), None)
1745
1746     def test_set_component(self):
1747         dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
1748         dn.set_component(0, 'cn', 'bar')
1749         self.assertEqual(str(dn), "cn=bar,dc=base")
1750         dn.set_component(1, 'o', 'asep')
1751         self.assertEqual(str(dn), "cn=bar,o=asep")
1752         self.assertRaises(TypeError, dn.set_component, 2, 'dc', 'base')
1753         self.assertEqual(str(dn), "cn=bar,o=asep")
1754         dn.set_component(1, 'o', 'a,b+c')
1755         self.assertEqual(str(dn), r"cn=bar,o=a\,b\+c")
1756
1757     def test_set_component_bytes(self):
1758         dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
1759         dn.set_component(0, 'cn', b'bar')
1760         self.assertEqual(str(dn), "cn=bar,dc=base")
1761         dn.set_component(1, 'o', b'asep')
1762         self.assertEqual(str(dn), "cn=bar,o=asep")
1763
1764     def test_set_component_none(self):
1765         dn = ldb.Dn(self.ldb, "cn=foo,cn=bar,dc=base")
1766         self.assertRaises(TypeError, dn.set_component, 1, 'cn', None)
1767
1768     def test_get_extended_component_null(self):
1769         dn = ldb.Dn(self.ldb, "cn=foo,cn=bar,dc=base")
1770         self.assertEqual(dn.get_extended_component("TEST"), None)
1771
1772     def test_get_extended_component(self):
1773         self.ldb._register_test_extensions()
1774         dn = ldb.Dn(self.ldb, "<TEST=foo>;cn=bar,dc=base")
1775         self.assertEqual(dn.get_extended_component("TEST"), b"foo")
1776
1777     def test_set_extended_component(self):
1778         self.ldb._register_test_extensions()
1779         dn = ldb.Dn(self.ldb, "dc=base")
1780         dn.set_extended_component("TEST", "foo")
1781         self.assertEqual(dn.get_extended_component("TEST"), b"foo")
1782         dn.set_extended_component("TEST", b"bar")
1783         self.assertEqual(dn.get_extended_component("TEST"), b"bar")
1784
1785     def test_extended_str(self):
1786         self.ldb._register_test_extensions()
1787         dn = ldb.Dn(self.ldb, "<TEST=foo>;cn=bar,dc=base")
1788         self.assertEqual(dn.extended_str(), "<TEST=foo>;cn=bar,dc=base")
1789
1790     def test_get_rdn_name(self):
1791         dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
1792         self.assertEqual(dn.get_rdn_name(), 'cn')
1793
1794     def test_get_rdn_value(self):
1795         dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
1796         self.assertEqual(dn.get_rdn_value(), 'foo')
1797
1798     def test_get_casefold(self):
1799         dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
1800         self.assertEqual(dn.get_casefold(), 'CN=FOO,DC=BASE')
1801
1802     def test_get_linearized(self):
1803         dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
1804         self.assertEqual(dn.get_linearized(), 'cn=foo,dc=base')
1805
1806     def test_is_null(self):
1807         dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
1808         self.assertFalse(dn.is_null())
1809
1810         dn = ldb.Dn(self.ldb, '')
1811         self.assertTrue(dn.is_null())
1812
1813 class LdbMsgTests(TestCase):
1814
1815     def setUp(self):
1816         super(LdbMsgTests, self).setUp()
1817         self.msg = ldb.Message()
1818
1819     def test_init_dn(self):
1820         self.msg = ldb.Message(ldb.Dn(ldb.Ldb(), "dc=foo27"))
1821         self.assertEqual("dc=foo27", str(self.msg.dn))
1822
1823     def test_iter_items(self):
1824         self.assertEqual(0, len(self.msg.items()))
1825         self.msg.dn = ldb.Dn(ldb.Ldb(), "dc=foo28")
1826         self.assertEqual(1, len(self.msg.items()))
1827
1828     def test_repr(self):
1829         self.msg.dn = ldb.Dn(ldb.Ldb(), "dc=foo29")
1830         self.msg["dc"] = b"foo"
1831         if PY3:
1832             self.assertIn(repr(self.msg), [
1833                 "Message({'dn': Dn('dc=foo29'), 'dc': MessageElement([b'foo'])})",
1834                 "Message({'dc': MessageElement([b'foo']), 'dn': Dn('dc=foo29')})",
1835             ])
1836             self.assertIn(repr(self.msg.text), [
1837                 "Message({'dn': Dn('dc=foo29'), 'dc': MessageElement([b'foo'])}).text",
1838                 "Message({'dc': MessageElement([b'foo']), 'dn': Dn('dc=foo29')}).text",
1839             ])
1840         else:
1841             self.assertEqual(
1842                 repr(self.msg),
1843                 "Message({'dn': Dn('dc=foo29'), 'dc': MessageElement(['foo'])})")
1844             self.assertEqual(
1845                 repr(self.msg.text),
1846                 "Message({'dn': Dn('dc=foo29'), 'dc': MessageElement(['foo'])}).text")
1847
1848     def test_len(self):
1849         self.assertEqual(0, len(self.msg))
1850
1851     def test_notpresent(self):
1852         self.assertRaises(KeyError, lambda: self.msg["foo"])
1853
1854     def test_del(self):
1855         del self.msg["foo"]
1856
1857     def test_add(self):
1858         self.msg.add(ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla"))
1859
1860     def test_add_text(self):
1861         self.msg.add(ldb.MessageElement(["456"], ldb.FLAG_MOD_ADD, "bla"))
1862
1863     def test_elements_empty(self):
1864         self.assertEqual([], self.msg.elements())
1865
1866     def test_elements(self):
1867         el = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
1868         self.msg.add(el)
1869         self.assertEqual([el], self.msg.elements())
1870         self.assertEqual([el.text], self.msg.text.elements())
1871
1872     def test_add_value(self):
1873         self.assertEqual(0, len(self.msg))
1874         self.msg["foo"] = [b"foo"]
1875         self.assertEqual(1, len(self.msg))
1876
1877     def test_add_value_text(self):
1878         self.assertEqual(0, len(self.msg))
1879         self.msg["foo"] = ["foo"]
1880         self.assertEqual(1, len(self.msg))
1881
1882     def test_add_value_multiple(self):
1883         self.assertEqual(0, len(self.msg))
1884         self.msg["foo"] = [b"foo", b"bla"]
1885         self.assertEqual(1, len(self.msg))
1886         self.assertEqual([b"foo", b"bla"], list(self.msg["foo"]))
1887
1888     def test_add_value_multiple_text(self):
1889         self.assertEqual(0, len(self.msg))
1890         self.msg["foo"] = ["foo", "bla"]
1891         self.assertEqual(1, len(self.msg))
1892         self.assertEqual(["foo", "bla"], list(self.msg.text["foo"]))
1893
1894     def test_set_value(self):
1895         self.msg["foo"] = [b"fool"]
1896         self.assertEqual([b"fool"], list(self.msg["foo"]))
1897         self.msg["foo"] = [b"bar"]
1898         self.assertEqual([b"bar"], list(self.msg["foo"]))
1899
1900     def test_set_value_text(self):
1901         self.msg["foo"] = ["fool"]
1902         self.assertEqual(["fool"], list(self.msg.text["foo"]))
1903         self.msg["foo"] = ["bar"]
1904         self.assertEqual(["bar"], list(self.msg.text["foo"]))
1905
1906     def test_keys(self):
1907         self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
1908         self.msg["foo"] = [b"bla"]
1909         self.msg["bar"] = [b"bla"]
1910         self.assertEqual(["dn", "foo", "bar"], self.msg.keys())
1911
1912     def test_keys_text(self):
1913         self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
1914         self.msg["foo"] = ["bla"]
1915         self.msg["bar"] = ["bla"]
1916         self.assertEqual(["dn", "foo", "bar"], self.msg.text.keys())
1917
1918     def test_dn(self):
1919         self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
1920         self.assertEqual("@BASEINFO", self.msg.dn.__str__())
1921
1922     def test_get_dn(self):
1923         self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
1924         self.assertEqual("@BASEINFO", self.msg.get("dn").__str__())
1925
1926     def test_dn_text(self):
1927         self.msg.text.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
1928         self.assertEqual("@BASEINFO", str(self.msg.dn))
1929         self.assertEqual("@BASEINFO", str(self.msg.text.dn))
1930
1931     def test_get_dn_text(self):
1932         self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
1933         self.assertEqual("@BASEINFO", str(self.msg.get("dn")))
1934         self.assertEqual("@BASEINFO", str(self.msg.text.get("dn")))
1935
1936     def test_get_invalid(self):
1937         self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
1938         self.assertRaises(TypeError, self.msg.get, 42)
1939
1940     def test_get_other(self):
1941         self.msg["foo"] = [b"bar"]
1942         self.assertEqual(b"bar", self.msg.get("foo")[0])
1943         self.assertEqual(b"bar", self.msg.get("foo", idx=0))
1944         self.assertEqual(None, self.msg.get("foo", idx=1))
1945         self.assertEqual("", self.msg.get("foo", default='', idx=1))
1946
1947     def test_get_other_text(self):
1948         self.msg["foo"] = ["bar"]
1949         self.assertEqual(["bar"], list(self.msg.text.get("foo")))
1950         self.assertEqual("bar", self.msg.text.get("foo")[0])
1951         self.assertEqual("bar", self.msg.text.get("foo", idx=0))
1952         self.assertEqual(None, self.msg.get("foo", idx=1))
1953         self.assertEqual("", self.msg.get("foo", default='', idx=1))
1954
1955     def test_get_default(self):
1956         self.assertEqual(None, self.msg.get("tatayoyo", idx=0))
1957         self.assertEqual("anniecordie", self.msg.get("tatayoyo", "anniecordie"))
1958
1959     def test_get_default_text(self):
1960         self.assertEqual(None, self.msg.text.get("tatayoyo", idx=0))
1961         self.assertEqual("anniecordie", self.msg.text.get("tatayoyo", "anniecordie"))
1962
1963     def test_get_unknown(self):
1964         self.assertEqual(None, self.msg.get("lalalala"))
1965
1966     def test_get_unknown_text(self):
1967         self.assertEqual(None, self.msg.text.get("lalalala"))
1968
1969     def test_msg_diff(self):
1970         l = ldb.Ldb()
1971         msgs = l.parse_ldif("dn: foo=bar\nfoo: bar\nbaz: do\n\ndn: foo=bar\nfoo: bar\nbaz: dont\n")
1972         msg1 = next(msgs)[1]
1973         msg2 = next(msgs)[1]
1974         msgdiff = l.msg_diff(msg1, msg2)
1975         self.assertEqual("foo=bar", msgdiff.get("dn").__str__())
1976         self.assertRaises(KeyError, lambda: msgdiff["foo"])
1977         self.assertEqual(1, len(msgdiff))
1978
1979     def test_equal_empty(self):
1980         msg1 = ldb.Message()
1981         msg2 = ldb.Message()
1982         self.assertEqual(msg1, msg2)
1983
1984     def test_equal_simplel(self):
1985         db = ldb.Ldb()
1986         msg1 = ldb.Message()
1987         msg1.dn = ldb.Dn(db, "foo=bar")
1988         msg2 = ldb.Message()
1989         msg2.dn = ldb.Dn(db, "foo=bar")
1990         self.assertEqual(msg1, msg2)
1991         msg1['foo'] = b'bar'
1992         msg2['foo'] = b'bar'
1993         self.assertEqual(msg1, msg2)
1994         msg2['foo'] = b'blie'
1995         self.assertNotEqual(msg1, msg2)
1996         msg2['foo'] = b'blie'
1997
1998     def test_from_dict(self):
1999         rec = {"dn": "dc=fromdict",
2000                "a1": [b"a1-val1", b"a1-val1"]}
2001         l = ldb.Ldb()
2002         # check different types of input Flags
2003         for flags in [ldb.FLAG_MOD_ADD, ldb.FLAG_MOD_REPLACE, ldb.FLAG_MOD_DELETE]:
2004             m = ldb.Message.from_dict(l, rec, flags)
2005             self.assertEqual(rec["a1"], list(m["a1"]))
2006             self.assertEqual(flags, m["a1"].flags())
2007         # check input params
2008         self.assertRaises(TypeError, ldb.Message.from_dict, dict(), rec, ldb.FLAG_MOD_REPLACE)
2009         self.assertRaises(TypeError, ldb.Message.from_dict, l, list(), ldb.FLAG_MOD_REPLACE)
2010         self.assertRaises(ValueError, ldb.Message.from_dict, l, rec, 0)
2011         # Message.from_dict expects dictionary with 'dn'
2012         err_rec = {"a1": [b"a1-val1", b"a1-val1"]}
2013         self.assertRaises(TypeError, ldb.Message.from_dict, l, err_rec, ldb.FLAG_MOD_REPLACE)
2014
2015     def test_from_dict_text(self):
2016         rec = {"dn": "dc=fromdict",
2017                "a1": ["a1-val1", "a1-val1"]}
2018         l = ldb.Ldb()
2019         # check different types of input Flags
2020         for flags in [ldb.FLAG_MOD_ADD, ldb.FLAG_MOD_REPLACE, ldb.FLAG_MOD_DELETE]:
2021             m = ldb.Message.from_dict(l, rec, flags)
2022             self.assertEqual(rec["a1"], list(m.text["a1"]))
2023             self.assertEqual(flags, m.text["a1"].flags())
2024         # check input params
2025         self.assertRaises(TypeError, ldb.Message.from_dict, dict(), rec, ldb.FLAG_MOD_REPLACE)
2026         self.assertRaises(TypeError, ldb.Message.from_dict, l, list(), ldb.FLAG_MOD_REPLACE)
2027         self.assertRaises(ValueError, ldb.Message.from_dict, l, rec, 0)
2028         # Message.from_dict expects dictionary with 'dn'
2029         err_rec = {"a1": ["a1-val1", "a1-val1"]}
2030         self.assertRaises(TypeError, ldb.Message.from_dict, l, err_rec, ldb.FLAG_MOD_REPLACE)
2031
2032     def test_copy_add_message_element(self):
2033         m = ldb.Message()
2034         m["1"] = ldb.MessageElement([b"val 111"], ldb.FLAG_MOD_ADD, "1")
2035         m["2"] = ldb.MessageElement([b"val 222"], ldb.FLAG_MOD_ADD, "2")
2036         mto = ldb.Message()
2037         mto["1"] = m["1"]
2038         mto["2"] = m["2"]
2039         self.assertEqual(mto["1"], m["1"])
2040         self.assertEqual(mto["2"], m["2"])
2041         mto = ldb.Message()
2042         mto.add(m["1"])
2043         mto.add(m["2"])
2044         self.assertEqual(mto["1"], m["1"])
2045         self.assertEqual(mto["2"], m["2"])
2046
2047     def test_copy_add_message_element_text(self):
2048         m = ldb.Message()
2049         m["1"] = ldb.MessageElement(["val 111"], ldb.FLAG_MOD_ADD, "1")
2050         m["2"] = ldb.MessageElement(["val 222"], ldb.FLAG_MOD_ADD, "2")
2051         mto = ldb.Message()
2052         mto["1"] = m["1"]
2053         mto["2"] = m["2"]
2054         self.assertEqual(mto["1"], m.text["1"])
2055         self.assertEqual(mto["2"], m.text["2"])
2056         mto = ldb.Message()
2057         mto.add(m["1"])
2058         mto.add(m["2"])
2059         self.assertEqual(mto.text["1"], m.text["1"])
2060         self.assertEqual(mto.text["2"], m.text["2"])
2061         self.assertEqual(mto["1"], m["1"])
2062         self.assertEqual(mto["2"], m["2"])
2063
2064
2065 class MessageElementTests(TestCase):
2066
2067     def test_cmp_element(self):
2068         x = ldb.MessageElement([b"foo"])
2069         y = ldb.MessageElement([b"foo"])
2070         z = ldb.MessageElement([b"bzr"])
2071         self.assertEqual(x, y)
2072         self.assertNotEqual(x, z)
2073
2074     def test_cmp_element_text(self):
2075         x = ldb.MessageElement([b"foo"])
2076         y = ldb.MessageElement(["foo"])
2077         self.assertEqual(x, y)
2078
2079     def test_create_iterable(self):
2080         x = ldb.MessageElement([b"foo"])
2081         self.assertEqual([b"foo"], list(x))
2082         self.assertEqual(["foo"], list(x.text))
2083
2084     def test_repr(self):
2085         x = ldb.MessageElement([b"foo"])
2086         if PY3:
2087             self.assertEqual("MessageElement([b'foo'])", repr(x))
2088             self.assertEqual("MessageElement([b'foo']).text", repr(x.text))
2089         else:
2090             self.assertEqual("MessageElement(['foo'])", repr(x))
2091             self.assertEqual("MessageElement(['foo']).text", repr(x.text))
2092         x = ldb.MessageElement([b"foo", b"bla"])
2093         self.assertEqual(2, len(x))
2094         if PY3:
2095             self.assertEqual("MessageElement([b'foo',b'bla'])", repr(x))
2096             self.assertEqual("MessageElement([b'foo',b'bla']).text", repr(x.text))
2097         else:
2098             self.assertEqual("MessageElement(['foo','bla'])", repr(x))
2099             self.assertEqual("MessageElement(['foo','bla']).text", repr(x.text))
2100
2101     def test_get_item(self):
2102         x = ldb.MessageElement([b"foo", b"bar"])
2103         self.assertEqual(b"foo", x[0])
2104         self.assertEqual(b"bar", x[1])
2105         self.assertEqual(b"bar", x[-1])
2106         self.assertRaises(IndexError, lambda: x[45])
2107
2108     def test_get_item_text(self):
2109         x = ldb.MessageElement(["foo", "bar"])
2110         self.assertEqual("foo", x.text[0])
2111         self.assertEqual("bar", x.text[1])
2112         self.assertEqual("bar", x.text[-1])
2113         self.assertRaises(IndexError, lambda: x[45])
2114
2115     def test_len(self):
2116         x = ldb.MessageElement([b"foo", b"bar"])
2117         self.assertEqual(2, len(x))
2118
2119     def test_eq(self):
2120         x = ldb.MessageElement([b"foo", b"bar"])
2121         y = ldb.MessageElement([b"foo", b"bar"])
2122         self.assertEqual(y, x)
2123         x = ldb.MessageElement([b"foo"])
2124         self.assertNotEqual(y, x)
2125         y = ldb.MessageElement([b"foo"])
2126         self.assertEqual(y, x)
2127
2128     def test_extended(self):
2129         el = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
2130         if PY3:
2131             self.assertEqual("MessageElement([b'456'])", repr(el))
2132             self.assertEqual("MessageElement([b'456']).text", repr(el.text))
2133         else:
2134             self.assertEqual("MessageElement(['456'])", repr(el))
2135             self.assertEqual("MessageElement(['456']).text", repr(el.text))
2136
2137     def test_bad_text(self):
2138         el = ldb.MessageElement(b'\xba\xdd')
2139         self.assertRaises(UnicodeDecodeError, el.text.__getitem__, 0)
2140
2141
2142 class ModuleTests(TestCase):
2143
2144     def setUp(self):
2145         super(ModuleTests, self).setUp()
2146         self.testdir = tempdir()
2147         self.filename = os.path.join(self.testdir, "test.ldb")
2148         self.ldb = ldb.Ldb(self.filename)
2149
2150     def tearDown(self):
2151         shutil.rmtree(self.testdir)
2152         super(ModuleTests, self).setUp()
2153
2154     def test_register_module(self):
2155         class ExampleModule:
2156             name = "example"
2157         ldb.register_module(ExampleModule)
2158
2159     def test_use_module(self):
2160         ops = []
2161         class ExampleModule:
2162             name = "bla"
2163
2164             def __init__(self, ldb, next):
2165                 ops.append("init")
2166                 self.next = next
2167
2168             def search(self, *args, **kwargs):
2169                 return self.next.search(*args, **kwargs)
2170
2171             def request(self, *args, **kwargs):
2172                 pass
2173
2174         ldb.register_module(ExampleModule)
2175         l = ldb.Ldb(self.filename)
2176         l.add({"dn": "@MODULES", "@LIST": "bla"})
2177         self.assertEqual([], ops)
2178         l = ldb.Ldb(self.filename)
2179         self.assertEqual(["init"], ops)
2180
2181 class LdbResultTests(LdbBaseTest):
2182
2183     def setUp(self):
2184         super(LdbResultTests, self).setUp()
2185         self.testdir = tempdir()
2186         self.filename = os.path.join(self.testdir, "test.ldb")
2187         self.l = ldb.Ldb(self.url(), flags=self.flags())
2188         self.l.add({"dn": "DC=SAMBA,DC=ORG", "name": b"samba.org"})
2189         self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG", "name": b"Admins"})
2190         self.l.add({"dn": "OU=USERS,DC=SAMBA,DC=ORG", "name": b"Users"})
2191         self.l.add({"dn": "OU=OU1,DC=SAMBA,DC=ORG", "name": b"OU #1"})
2192         self.l.add({"dn": "OU=OU2,DC=SAMBA,DC=ORG", "name": b"OU #2"})
2193         self.l.add({"dn": "OU=OU3,DC=SAMBA,DC=ORG", "name": b"OU #3"})
2194         self.l.add({"dn": "OU=OU4,DC=SAMBA,DC=ORG", "name": b"OU #4"})
2195         self.l.add({"dn": "OU=OU5,DC=SAMBA,DC=ORG", "name": b"OU #5"})
2196         self.l.add({"dn": "OU=OU6,DC=SAMBA,DC=ORG", "name": b"OU #6"})
2197         self.l.add({"dn": "OU=OU7,DC=SAMBA,DC=ORG", "name": b"OU #7"})
2198         self.l.add({"dn": "OU=OU8,DC=SAMBA,DC=ORG", "name": b"OU #8"})
2199         self.l.add({"dn": "OU=OU9,DC=SAMBA,DC=ORG", "name": b"OU #9"})
2200         self.l.add({"dn": "OU=OU10,DC=SAMBA,DC=ORG", "name": b"OU #10"})
2201
2202     def tearDown(self):
2203         shutil.rmtree(self.testdir)
2204         super(LdbResultTests, self).tearDown()
2205         # Ensure the LDB is closed now, so we close the FD
2206         del(self.l)
2207
2208     def test_return_type(self):
2209         res = self.l.search()
2210         self.assertEqual(str(res), "<ldb result>")
2211
2212     def test_get_msgs(self):
2213         res = self.l.search()
2214         list = res.msgs
2215
2216     def test_get_controls(self):
2217         res = self.l.search()
2218         list = res.controls
2219
2220     def test_get_referals(self):
2221         res = self.l.search()
2222         list = res.referals
2223
2224     def test_iter_msgs(self):
2225         found = False
2226         for l in self.l.search().msgs:
2227             if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
2228                 found = True
2229         self.assertTrue(found)
2230
2231     def test_iter_msgs_count(self):
2232         self.assertTrue(self.l.search().count > 0)
2233         # 13 objects has been added to the DC=SAMBA, DC=ORG
2234         self.assertEqual(self.l.search(base="DC=SAMBA,DC=ORG").count, 13)
2235
2236     def test_iter_controls(self):
2237         res = self.l.search().controls
2238         it = iter(res)
2239
2240     def test_create_control(self):
2241         self.assertRaises(ValueError, ldb.Control, self.l, "tatayoyo:0")
2242         c = ldb.Control(self.l, "relax:1")
2243         self.assertEqual(c.critical, True)
2244         self.assertEqual(c.oid, "1.3.6.1.4.1.4203.666.5.12")
2245
2246     def test_iter_refs(self):
2247         res = self.l.search().referals
2248         it = iter(res)
2249
2250     def test_search_sequence_msgs(self):
2251         found = False
2252         res = self.l.search().msgs
2253
2254         for i in range(0, len(res)):
2255             l = res[i]
2256             if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
2257                 found = True
2258         self.assertTrue(found)
2259
2260     def test_search_as_iter(self):
2261         found = False
2262         res = self.l.search()
2263
2264         for l in res:
2265             if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
2266                 found = True
2267         self.assertTrue(found)
2268
2269     def test_search_iter(self):
2270         found = False
2271         res = self.l.search_iterator()
2272
2273         for l in res:
2274             if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
2275                 found = True
2276         self.assertTrue(found)
2277
2278
2279     # Show that search results can't see into a transaction
2280     def test_search_against_trans(self):
2281         found11 = False
2282
2283         (r1, w1) = os.pipe()
2284
2285         (r2, w2) = os.pipe()
2286
2287         # For the first element, fork a child that will
2288         # write to the DB
2289         pid = os.fork()
2290         if pid == 0:
2291             # In the child, re-open
2292             del(self.l)
2293             gc.collect()
2294
2295             child_ldb = ldb.Ldb(self.url(), flags=self.flags())
2296             # start a transaction
2297             child_ldb.transaction_start()
2298
2299             # write to it
2300             child_ldb.add({"dn": "OU=OU11,DC=SAMBA,DC=ORG",
2301                            "name": b"samba.org"})
2302
2303             os.write(w1, b"added")
2304
2305             # Now wait for the search to be done
2306             os.read(r2, 6)
2307
2308             # and commit
2309             try:
2310                 child_ldb.transaction_commit()
2311             except LdbError as err:
2312                 # We print this here to see what went wrong in the child
2313                 print(err)
2314                 os._exit(1)
2315
2316             os.write(w1, b"transaction")
2317             os._exit(0)
2318
2319         self.assertEqual(os.read(r1, 5), b"added")
2320
2321         # This should not turn up until the transaction is concluded
2322         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
2323                             scope=ldb.SCOPE_BASE)
2324         self.assertEqual(len(res11), 0)
2325
2326         os.write(w2, b"search")
2327
2328         # Now wait for the transaction to be done.  This should
2329         # deadlock, but the search doesn't hold a read lock for the
2330         # iterator lifetime currently.
2331         self.assertEqual(os.read(r1, 11), b"transaction")
2332
2333         # This should now turn up, as the transaction is over
2334         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
2335                             scope=ldb.SCOPE_BASE)
2336         self.assertEqual(len(res11), 1)
2337
2338         self.assertFalse(found11)
2339
2340         (got_pid, status) = os.waitpid(pid, 0)
2341         self.assertEqual(got_pid, pid)
2342
2343
2344     def test_search_iter_against_trans(self):
2345         found = False
2346         found11 = False
2347
2348         # We need to hold this iterator open to hold the all-record
2349         # lock
2350         res = self.l.search_iterator()
2351
2352         (r1, w1) = os.pipe()
2353
2354         (r2, w2) = os.pipe()
2355
2356         # For the first element, with the sequence open (which
2357         # means with ldb locks held), fork a child that will
2358         # write to the DB
2359         pid = os.fork()
2360         if pid == 0:
2361             # In the child, re-open
2362             del(res)
2363             del(self.l)
2364             gc.collect()
2365
2366             child_ldb = ldb.Ldb(self.url(), flags=self.flags())
2367             # start a transaction
2368             child_ldb.transaction_start()
2369
2370             # write to it
2371             child_ldb.add({"dn": "OU=OU11,DC=SAMBA,DC=ORG",
2372                            "name": b"samba.org"})
2373
2374             os.write(w1, b"added")
2375
2376             # Now wait for the search to be done
2377             os.read(r2, 6)
2378
2379             # and commit
2380             try:
2381                 child_ldb.transaction_commit()
2382             except LdbError as err:
2383                 # We print this here to see what went wrong in the child
2384                 print(err)
2385                 os._exit(1)
2386
2387             os.write(w1, b"transaction")
2388             os._exit(0)
2389
2390         self.assertEqual(os.read(r1, 5), b"added")
2391
2392         # This should not turn up until the transaction is concluded
2393         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
2394                             scope=ldb.SCOPE_BASE)
2395         self.assertEqual(len(res11), 0)
2396
2397         os.write(w2, b"search")
2398
2399         # allow the transaction to start
2400         time.sleep(1)
2401
2402         # This should not turn up until the search finishes and
2403         # removed the read lock, but for ldb_tdb that happened as soon
2404         # as we called the first res.next()
2405         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
2406                             scope=ldb.SCOPE_BASE)
2407         self.assertEqual(len(res11), 0)
2408
2409         # These results are all collected at the first next(res) call
2410         for l in res:
2411             if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
2412                 found = True
2413             if str(l.dn) == "OU=OU11,DC=SAMBA,DC=ORG":
2414                 found11 = True
2415
2416         # Now wait for the transaction to be done.
2417         self.assertEqual(os.read(r1, 11), b"transaction")
2418
2419         # This should now turn up, as the transaction is over and all
2420         # read locks are gone
2421         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
2422                             scope=ldb.SCOPE_BASE)
2423         self.assertEqual(len(res11), 1)
2424
2425         self.assertTrue(found)
2426         self.assertFalse(found11)
2427
2428         (got_pid, status) = os.waitpid(pid, 0)
2429         self.assertEqual(got_pid, pid)
2430
2431
class BadTypeTests(TestCase):
    """Calls into the ldb bindings with wrongly-typed arguments.

    Each check expects the call under test to raise TypeError.
    """

    def test_control(self):
        # An Ldb is created here as in the other tests, although the
        # checks below never pass it to ldb.Control.
        db = ldb.Ldb()
        with self.assertRaises(TypeError):
            ldb.Control('<bad type>', 'relax:1')
        with self.assertRaises(TypeError):
            ldb.Control(ldb, 1234)

    def test_modify(self):
        db = ldb.Ldb()
        dn = ldb.Dn(db, 'a=b')
        msg = ldb.Message(dn)
        # A plain string in place of the message.
        with self.assertRaises(TypeError):
            db.modify('<bad type>')
        # A valid message followed by a string second argument.
        with self.assertRaises(TypeError):
            db.modify(msg, '<bad type>')

    def test_add(self):
        db = ldb.Ldb()
        dn = ldb.Dn(db, 'a=b')
        msg = ldb.Message(dn)
        # A plain string in place of the message.
        with self.assertRaises(TypeError):
            db.add('<bad type>')
        # A valid message followed by a string second argument.
        with self.assertRaises(TypeError):
            db.add(msg, '<bad type>')

    def test_delete(self):
        # NOTE(review): despite the test name, both checks call add(),
        # not delete() - presumably copied from test_add.  Confirm that
        # delete() raises TypeError for these arguments before
        # switching; string DNs are accepted elsewhere in this file
        # (e.g. search(base="...")), so it may not.
        db = ldb.Ldb()
        dn = ldb.Dn(db, 'a=b')
        with self.assertRaises(TypeError):
            db.add('<bad type>')
        with self.assertRaises(TypeError):
            db.add(dn, '<bad type>')

    def test_rename(self):
        # NOTE(review): as in test_delete, every check here calls add()
        # rather than rename().  Confirm what rename() raises for these
        # arguments before changing the method under test.
        db = ldb.Ldb()
        dn = ldb.Dn(db, 'a=b')
        with self.assertRaises(TypeError):
            db.add('<bad type>', dn)
        with self.assertRaises(TypeError):
            db.add(dn, '<bad type>')
        with self.assertRaises(TypeError):
            db.add(dn, dn, '<bad type>')

    def test_search(self):
        db = ldb.Ldb()
        # One wrongly-typed keyword argument per call.
        with self.assertRaises(TypeError):
            db.search(base=1234)
        with self.assertRaises(TypeError):
            db.search(scope='<bad type>')
        with self.assertRaises(TypeError):
            db.search(expression=1234)
        with self.assertRaises(TypeError):
            db.search(attrs='<bad type>')
        with self.assertRaises(TypeError):
            db.search(controls='<bad type>')
2472
2473
class VersionTests(TestCase):
    """Sanity check of the version attribute exported by ldb."""

    def test_version(self):
        # The module-level __version__ must be a str.
        self.assertIsInstance(ldb.__version__, str)
2478
2479
if __name__ == '__main__':
    # Run the tests of this module when the file is executed directly.
    # unittest.main is the documented alias of unittest.TestProgram.
    import unittest
    unittest.main()