c55d6973f1f3a4062273a01ca8612c1e18408922
[metze/samba/wip.git] / lib / ldb / tests / python / api.py
1 #!/usr/bin/env python
2 # Simple tests for the ldb python bindings.
3 # Copyright (C) 2007 Jelmer Vernooij <jelmer@samba.org>
4
5 import os
6 from unittest import TestCase
7 import sys
8 import gc
9 import time
10 import ldb
11 import shutil
12
# True when the interpreter's major version is 3 or newer.
PY3 = sys.version_info[0] >= 3

# URL scheme prefixes that get prepended to the database filename.
TDB_PREFIX = "tdb://"
MDB_PREFIX = "mdb://"
17
18
def tempdir():
    """Create a new temporary directory and return its path.

    When the ``SELFTEST_PREFIX`` environment variable is set, the directory
    is created beneath ``$SELFTEST_PREFIX/tmp``; otherwise the platform
    default location chosen by ``tempfile.mkdtemp`` is used.
    """
    import tempfile

    selftest_prefix = os.environ.get("SELFTEST_PREFIX")
    if selftest_prefix is None:
        parent = None
    else:
        parent = os.path.join(selftest_prefix, "tmp")
    return tempfile.mkdtemp(dir=parent)
26
27
class NoContextTests(TestCase):
    """Tests for module-level ldb helpers that need no Ldb connection."""

    def test_valid_attr_name(self):
        """A plain name is accepted; a name starting with digits is not."""
        accepted = ldb.valid_attr_name("foo")
        self.assertTrue(accepted)
        rejected = ldb.valid_attr_name("24foo")
        self.assertFalse(rejected)

    def test_timestring(self):
        """Epoch seconds are rendered as the expected time strings."""
        cases = (
            (0, "19700101000000.0Z"),
            (1195499412, "20071119191012.0Z"),
        )
        for seconds, expected in cases:
            self.assertEqual(expected, ldb.timestring(seconds))

    def test_string_to_time(self):
        """Time strings are parsed back into epoch seconds."""
        cases = (
            ("19700101000000.0Z", 0),
            ("20071119191012.0Z", 1195499412),
        )
        for text, expected in cases:
            self.assertEqual(expected, ldb.string_to_time(text))

    def test_binary_encode(self):
        """binary_encode round-trips and treats bytes and text alike."""
        raw = b'test\\x'
        from_bytes = ldb.binary_encode(raw)
        round_tripped = ldb.binary_decode(from_bytes)
        self.assertEqual(round_tripped, raw)

        from_text = ldb.binary_encode('test\\x')
        self.assertEqual(from_text, from_bytes)
49
50
class LdbBaseTest(TestCase):
    """Common base for tests that open a database through a prefixed URL.

    Subclasses provide ``self.filename`` and may set ``self.prefix`` before
    calling ``setUp``; when no prefix has been chosen, ``TDB_PREFIX`` is used.
    """

    def setUp(self):
        """Run the parent set-up and fall back to the tdb prefix."""
        super(LdbBaseTest, self).setUp()
        # A missing attribute and an explicit None are treated the same way.
        if getattr(self, "prefix", None) is None:
            self.prefix = TDB_PREFIX

    def tearDown(self):
        """Delegate to the parent tear-down."""
        super(LdbBaseTest, self).tearDown()

    def url(self):
        """Return the database URL: the prefix followed by the filename."""
        scheme = self.prefix
        path = self.filename
        return scheme + path

    def flags(self):
        """Return the flags for opening the database.

        ``ldb.FLG_NOSYNC`` is returned for the mdb prefix, 0 otherwise.
        """
        return ldb.FLG_NOSYNC if self.prefix == MDB_PREFIX else 0
71
72
class SimpleLdb(LdbBaseTest):
    """Basic API tests run against a freshly created on-disk database.

    Every test gets its own temporary directory containing ``test.ldb``.
    The URL and open flags come from ``url()`` and ``flags()`` on the base
    class.  If an ``index`` attribute is present on the instance when
    ``setUp`` runs, that record is added to the new database.
    """

    def setUp(self):
        """Create the temporary directory and open ``self.ldb`` in it."""
        super(SimpleLdb, self).setUp()
        self.testdir = tempdir()
        self.filename = os.path.join(self.testdir, "test.ldb")
        self.ldb = ldb.Ldb(self.url(), flags=self.flags())
        # Only the *absence* of the optional attribute is tolerated; an
        # AttributeError raised inside add() itself must not be swallowed.
        if hasattr(self, "index"):
            self.ldb.add(self.index)

    def tearDown(self):
        """Release the database handle, then remove the temporary directory."""
        # Ensure the LDB is closed first, so the FD is released before the
        # files backing it are removed.
        del self.ldb
        shutil.rmtree(self.testdir)
        super(SimpleLdb, self).tearDown()

    def test_connect(self):
        """Opening the database by URL must not raise."""
        ldb.Ldb(self.url(), flags=self.flags())

    def test_connect_none(self):
        """An Ldb object can be constructed without a URL."""
        ldb.Ldb()

    def test_connect_later(self):
        """An Ldb constructed without a URL can be connected afterwards."""
        x = ldb.Ldb()
        x.connect(self.url(), flags=self.flags())

    def test_repr(self):
        """repr() of an Ldb starts with ``<ldb connection``."""
        x = ldb.Ldb()
        self.assertTrue(repr(x).startswith("<ldb connection"))

    def test_set_create_perms(self):
        """set_create_perms() accepts an octal mode without raising."""
        x = ldb.Ldb()
        x.set_create_perms(0o600)

    def test_modules_none(self):
        """An unconnected Ldb reports an empty module list."""
        x = ldb.Ldb()
        self.assertEqual([], x.modules())

    def test_modules_tdb(self):
        """A connected Ldb reports a single 'tdb' module."""
        x = ldb.Ldb(self.url(), flags=self.flags())
        self.assertEqual("[<ldb module 'tdb'>]", repr(x.modules()))

    def test_firstmodule_none(self):
        """firstmodule is None on an unconnected Ldb."""
        x = ldb.Ldb()
        self.assertEqual(x.firstmodule, None)

    def test_firstmodule_tdb(self):
        """firstmodule of a connected Ldb is the 'tdb' module."""
        x = ldb.Ldb(self.url(), flags=self.flags())
        mod = x.firstmodule
        self.assertEqual(repr(mod), "<ldb module 'tdb'>")

    def test_search(self):
        """A default search on the new database returns nothing."""
        l = ldb.Ldb(self.url(), flags=self.flags())
        self.assertEqual(len(l.search()), 0)

    def test_search_controls(self):
        """A search with a paged_results control returns nothing."""
        l = ldb.Ldb(self.url(), flags=self.flags())
        self.assertEqual(len(l.search(controls=["paged_results:0:5"])), 0)

    def test_search_attrs(self):
        """Search accepts a Dn base, scope, expression and attribute list."""
        l = ldb.Ldb(self.url(), flags=self.flags())
        self.assertEqual(
            len(l.search(ldb.Dn(l, ""), ldb.SCOPE_SUBTREE, "(dc=*)", ["dc"])),
            0)

    def test_search_string_dn(self):
        """Search accepts the base DN as a plain string."""
        l = ldb.Ldb(self.url(), flags=self.flags())
        self.assertEqual(
            len(l.search("", ldb.SCOPE_SUBTREE, "(dc=*)", ["dc"])), 0)

    def test_search_attr_string(self):
        """Passing a single str/bytes as ``attrs`` raises TypeError."""
        l = ldb.Ldb(self.url(), flags=self.flags())
        self.assertRaises(TypeError, l.search, attrs="dc")
        self.assertRaises(TypeError, l.search, attrs=b"dc")

    def test_opaque(self):
        """A stored opaque can be fetched; an unknown name yields None."""
        l = ldb.Ldb(self.url(), flags=self.flags())
        l.set_opaque("my_opaque", l)
        self.assertIsNotNone(l.get_opaque("my_opaque"))
        self.assertEqual(None, l.get_opaque("unknown"))

    def test_search_scope_base_empty_db(self):
        """A base-scope search on the new database returns nothing."""
        l = ldb.Ldb(self.url(), flags=self.flags())
        self.assertEqual(len(l.search(ldb.Dn(l, "dc=foo1"),
                                      ldb.SCOPE_BASE)), 0)

    def test_search_scope_onelevel_empty_db(self):
        """A one-level search on the new database returns nothing."""
        l = ldb.Ldb(self.url(), flags=self.flags())
        self.assertEqual(len(l.search(ldb.Dn(l, "dc=foo1"),
                                      ldb.SCOPE_ONELEVEL)), 0)

    def test_delete(self):
        """Deleting a DN that was never added raises LdbError."""
        l = ldb.Ldb(self.url(), flags=self.flags())
        with self.assertRaises(ldb.LdbError):
            l.delete(ldb.Dn(l, "dc=foo2"))

    def test_delete_w_unhandled_ctrl(self):
        """Delete with the search_options control raises; plain delete works."""
        l = ldb.Ldb(self.url(), flags=self.flags())
        m = ldb.Message()
        m.dn = ldb.Dn(l, "dc=foo1")
        m["b"] = [b"a"]
        m["objectUUID"] = b"0123456789abcdef"
        l.add(m)
        with self.assertRaises(ldb.LdbError):
            l.delete(m.dn, ["search_options:1:2"])
        l.delete(m.dn)

    def test_contains(self):
        """``dn in ldb`` is true only for DNs that have been added."""
        name = self.url()
        l = ldb.Ldb(name, flags=self.flags())
        self.assertNotIn(ldb.Dn(l, "dc=foo3"), l)
        l = ldb.Ldb(name, flags=self.flags())
        m = ldb.Message()
        m.dn = ldb.Dn(l, "dc=foo3")
        m["b"] = ["a"]
        m["objectUUID"] = b"0123456789abcdef"
        l.add(m)
        try:
            self.assertIn(ldb.Dn(l, "dc=foo3"), l)
            self.assertNotIn(ldb.Dn(l, "dc=foo4"), l)
        finally:
            l.delete(m.dn)

    def test_get_config_basedn(self):
        """get_config_basedn() is None on the new database."""
        l = ldb.Ldb(self.url(), flags=self.flags())
        self.assertEqual(None, l.get_config_basedn())

    def test_get_root_basedn(self):
        """get_root_basedn() is None on the new database."""
        l = ldb.Ldb(self.url(), flags=self.flags())
        self.assertEqual(None, l.get_root_basedn())

    def test_get_schema_basedn(self):
        """get_schema_basedn() is None on the new database."""
        l = ldb.Ldb(self.url(), flags=self.flags())
        self.assertEqual(None, l.get_schema_basedn())

    def test_get_default_basedn(self):
        """get_default_basedn() is None on the new database."""
        l = ldb.Ldb(self.url(), flags=self.flags())
        self.assertEqual(None, l.get_default_basedn())

    def test_add(self):
        """Adding a Message with a bytes value makes it searchable."""
        l = ldb.Ldb(self.url(), flags=self.flags())
        m = ldb.Message()
        m.dn = ldb.Dn(l, "dc=foo4")
        m["bla"] = b"bla"
        m["objectUUID"] = b"0123456789abcdef"
        self.assertEqual(len(l.search()), 0)
        l.add(m)
        try:
            self.assertEqual(len(l.search()), 1)
        finally:
            l.delete(ldb.Dn(l, "dc=foo4"))

    def test_search_iterator(self):
        """Exercise search_iterator(): abandon, full and partial iteration."""
        l = ldb.Ldb(self.url(), flags=self.flags())

        # Once abandoned, iterating, abandoning again and asking for the
        # result must each raise RuntimeError.
        s = l.search_iterator()
        s.abandon()
        with self.assertRaises(RuntimeError):
            for _ in s:
                self.fail()
        self.assertRaises(RuntimeError, s.abandon)
        self.assertRaises(RuntimeError, s.result)

        # Empty database: no messages and an empty result.
        s = l.search_iterator()
        count = 0
        for me in s:
            self.assertIsInstance(me, ldb.Message)
            count += 1
        r = s.result()
        self.assertEqual(len(r), 0)
        self.assertEqual(count, 0)

        m1 = ldb.Message()
        m1.dn = ldb.Dn(l, "dc=foo4")
        m1["bla"] = b"bla"
        m1["objectUUID"] = b"0123456789abcdef"
        l.add(m1)
        # Track what was really added so that clean-up never tries to delete
        # a record that does not exist (which would mask the real failure).
        added = [m1.dn]
        try:
            s = l.search_iterator()
            msgs = []
            for me in s:
                self.assertIsInstance(me, ldb.Message)
                msgs.append(me)
            r = s.result()
            self.assertEqual(len(r), 0)
            self.assertEqual(len(msgs), 1)
            self.assertEqual(msgs[0].dn, m1.dn)

            m2 = ldb.Message()
            m2.dn = ldb.Dn(l, "dc=foo5")
            m2["bla"] = b"bla"
            m2["objectUUID"] = b"0123456789abcdee"
            l.add(m2)
            added.append(m2.dn)

            s = l.search_iterator()
            msgs = []
            for me in s:
                self.assertIsInstance(me, ldb.Message)
                msgs.append(me)
            r = s.result()
            self.assertEqual(len(r), 0)
            self.assertEqual(len(msgs), 2)
            # The two records may come back in either order.
            if msgs[0].dn == m1.dn:
                self.assertEqual(msgs[0].dn, m1.dn)
                self.assertEqual(msgs[1].dn, m2.dn)
            else:
                self.assertEqual(msgs[0].dn, m2.dn)
                self.assertEqual(msgs[1].dn, m1.dn)

            # Consume one message at a time; result() must raise while the
            # iteration is still incomplete.
            s = l.search_iterator()
            msgs = []
            for me in s:
                self.assertIsInstance(me, ldb.Message)
                msgs.append(me)
                break
            self.assertRaises(RuntimeError, s.result)
            for me in s:
                self.assertIsInstance(me, ldb.Message)
                msgs.append(me)
                break
            for me in s:
                self.fail()

            r = s.result()
            self.assertEqual(len(r), 0)
            self.assertEqual(len(msgs), 2)
            if msgs[0].dn == m1.dn:
                self.assertEqual(msgs[0].dn, m1.dn)
                self.assertEqual(msgs[1].dn, m2.dn)
            else:
                self.assertEqual(msgs[0].dn, m2.dn)
                self.assertEqual(msgs[1].dn, m1.dn)
        finally:
            for dn in added:
                l.delete(dn)

    def test_add_text(self):
        """Adding a Message with a text value makes it searchable."""
        l = ldb.Ldb(self.url(), flags=self.flags())
        m = ldb.Message()
        m.dn = ldb.Dn(l, "dc=foo4")
        m["bla"] = "bla"
        m["objectUUID"] = b"0123456789abcdef"
        self.assertEqual(len(l.search()), 0)
        l.add(m)
        try:
            self.assertEqual(len(l.search()), 1)
        finally:
            l.delete(ldb.Dn(l, "dc=foo4"))

    def test_add_w_unhandled_ctrl(self):
        """add() with the search_options control raises LdbError."""
        l = ldb.Ldb(self.url(), flags=self.flags())
        m = ldb.Message()
        m.dn = ldb.Dn(l, "dc=foo4")
        m["bla"] = b"bla"
        self.assertEqual(len(l.search()), 0)
        with self.assertRaises(ldb.LdbError):
            l.add(m, ["search_options:1:2"])

    def test_add_dict(self):
        """add() accepts a dict with a Dn object and bytes values."""
        l = ldb.Ldb(self.url(), flags=self.flags())
        m = {"dn": ldb.Dn(l, "dc=foo5"),
             "bla": b"bla",
             "objectUUID": b"0123456789abcdef"}
        self.assertEqual(len(l.search()), 0)
        l.add(m)
        try:
            self.assertEqual(len(l.search()), 1)
        finally:
            l.delete(ldb.Dn(l, "dc=foo5"))

    def test_add_dict_text(self):
        """add() accepts a dict with a Dn object and a text value."""
        l = ldb.Ldb(self.url(), flags=self.flags())
        m = {"dn": ldb.Dn(l, "dc=foo5"),
             "bla": "bla",
             "objectUUID": b"0123456789abcdef"}
        self.assertEqual(len(l.search()), 0)
        l.add(m)
        try:
            self.assertEqual(len(l.search()), 1)
        finally:
            l.delete(ldb.Dn(l, "dc=foo5"))

    def test_add_dict_string_dn(self):
        """add() accepts a dict whose "dn" is a text string."""
        l = ldb.Ldb(self.url(), flags=self.flags())
        m = {"dn": "dc=foo6", "bla": b"bla",
             "objectUUID": b"0123456789abcdef"}
        self.assertEqual(len(l.search()), 0)
        l.add(m)
        try:
            self.assertEqual(len(l.search()), 1)
        finally:
            l.delete(ldb.Dn(l, "dc=foo6"))

    def test_add_dict_bytes_dn(self):
        """add() accepts a dict whose "dn" is a bytes string."""
        l = ldb.Ldb(self.url(), flags=self.flags())
        m = {"dn": b"dc=foo6", "bla": b"bla",
             "objectUUID": b"0123456789abcdef"}
        self.assertEqual(len(l.search()), 0)
        l.add(m)
        try:
            self.assertEqual(len(l.search()), 1)
        finally:
            l.delete(ldb.Dn(l, "dc=foo6"))

    def test_rename(self):
        """rename() with Dn objects keeps exactly one record."""
        l = ldb.Ldb(self.url(), flags=self.flags())
        m = ldb.Message()
        m.dn = ldb.Dn(l, "dc=foo7")
        m["bla"] = b"bla"
        m["objectUUID"] = b"0123456789abcdef"
        self.assertEqual(len(l.search()), 0)
        l.add(m)
        try:
            l.rename(ldb.Dn(l, "dc=foo7"), ldb.Dn(l, "dc=bar"))
            self.assertEqual(len(l.search()), 1)
        finally:
            l.delete(ldb.Dn(l, "dc=bar"))

    def test_rename_string_dns(self):
        """rename() with string DNs keeps exactly one record."""
        l = ldb.Ldb(self.url(), flags=self.flags())
        m = ldb.Message()
        m.dn = ldb.Dn(l, "dc=foo8")
        m["bla"] = b"bla"
        m["objectUUID"] = b"0123456789abcdef"
        self.assertEqual(len(l.search()), 0)
        l.add(m)
        self.assertEqual(len(l.search()), 1)
        try:
            l.rename("dc=foo8", "dc=bar")
            self.assertEqual(len(l.search()), 1)
        finally:
            l.delete(ldb.Dn(l, "dc=bar"))

    def test_empty_dn(self):
        """Check the keys returned for a record holding only objectUUID."""
        l = ldb.Ldb(self.url(), flags=self.flags())
        self.assertEqual(0, len(l.search()))
        m = ldb.Message()
        m.dn = ldb.Dn(l, "dc=empty")
        m["objectUUID"] = b"0123456789abcdef"
        l.add(m)
        rm = l.search()
        self.assertEqual(1, len(rm))
        self.assertEqual({"dn", "distinguishedName", "objectUUID"},
                         set(rm[0].keys()))

        rm = l.search(m.dn)
        self.assertEqual(1, len(rm))
        self.assertEqual({"dn", "distinguishedName", "objectUUID"},
                         set(rm[0].keys()))
        # Asking only for an attribute the record lacks yields an empty
        # message.
        rm = l.search(m.dn, attrs=["blah"])
        self.assertEqual(1, len(rm))
        self.assertEqual(0, len(rm[0]))

    def test_modify_delete(self):
        """FLAG_MOD_DELETE removes an attribute that held bytes values."""
        l = ldb.Ldb(self.url(), flags=self.flags())
        m = ldb.Message()
        m.dn = ldb.Dn(l, "dc=modifydelete")
        m["bla"] = [b"1234"]
        m["objectUUID"] = b"0123456789abcdef"
        l.add(m)
        rm = l.search(m.dn)[0]
        self.assertEqual([b"1234"], list(rm["bla"]))
        try:
            m = ldb.Message()
            m.dn = ldb.Dn(l, "dc=modifydelete")
            m["bla"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE, "bla")
            self.assertEqual(ldb.FLAG_MOD_DELETE, m["bla"].flags())
            l.modify(m)
            rm = l.search(m.dn)
            self.assertEqual(1, len(rm))
            self.assertEqual({"dn", "distinguishedName", "objectUUID"},
                             set(rm[0].keys()))
            rm = l.search(m.dn, attrs=["bla"])
            self.assertEqual(1, len(rm))
            self.assertEqual(0, len(rm[0]))
        finally:
            l.delete(ldb.Dn(l, "dc=modifydelete"))

    def test_modify_delete_text(self):
        """FLAG_MOD_DELETE removes an attribute that was set via ``.text``."""
        l = ldb.Ldb(self.url(), flags=self.flags())
        m = ldb.Message()
        m.dn = ldb.Dn(l, "dc=modifydelete")
        m.text["bla"] = ["1234"]
        m["objectUUID"] = b"0123456789abcdef"
        l.add(m)
        rm = l.search(m.dn)[0]
        self.assertEqual(["1234"], list(rm.text["bla"]))
        try:
            m = ldb.Message()
            m.dn = ldb.Dn(l, "dc=modifydelete")
            m["bla"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE, "bla")
            self.assertEqual(ldb.FLAG_MOD_DELETE, m["bla"].flags())
            l.modify(m)
            rm = l.search(m.dn)
            self.assertEqual(1, len(rm))
            self.assertEqual({"dn", "distinguishedName", "objectUUID"},
                             set(rm[0].keys()))
            rm = l.search(m.dn, attrs=["bla"])
            self.assertEqual(1, len(rm))
            self.assertEqual(0, len(rm[0]))
        finally:
            l.delete(ldb.Dn(l, "dc=modifydelete"))

    def test_modify_add(self):
        """FLAG_MOD_ADD appends a bytes value to an existing attribute."""
        l = ldb.Ldb(self.url(), flags=self.flags())
        m = ldb.Message()
        m.dn = ldb.Dn(l, "dc=add")
        m["bla"] = [b"1234"]
        m["objectUUID"] = b"0123456789abcdef"
        l.add(m)
        try:
            m = ldb.Message()
            m.dn = ldb.Dn(l, "dc=add")
            m["bla"] = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
            self.assertEqual(ldb.FLAG_MOD_ADD, m["bla"].flags())
            l.modify(m)
            rm = l.search(m.dn)[0]
            self.assertEqual(3, len(rm))
            self.assertEqual([b"1234", b"456"], list(rm["bla"]))
        finally:
            l.delete(ldb.Dn(l, "dc=add"))

    def test_modify_add_text(self):
        """FLAG_MOD_ADD appends a text value to an existing attribute."""
        l = ldb.Ldb(self.url(), flags=self.flags())
        m = ldb.Message()
        m.dn = ldb.Dn(l, "dc=add")
        m.text["bla"] = ["1234"]
        m["objectUUID"] = b"0123456789abcdef"
        l.add(m)
        try:
            m = ldb.Message()
            m.dn = ldb.Dn(l, "dc=add")
            m["bla"] = ldb.MessageElement(["456"], ldb.FLAG_MOD_ADD, "bla")
            self.assertEqual(ldb.FLAG_MOD_ADD, m["bla"].flags())
            l.modify(m)
            rm = l.search(m.dn)[0]
            self.assertEqual(3, len(rm))
            self.assertEqual(["1234", "456"], list(rm.text["bla"]))
        finally:
            l.delete(ldb.Dn(l, "dc=add"))

    def test_modify_replace(self):
        """FLAG_MOD_REPLACE swaps all bytes values of an attribute."""
        l = ldb.Ldb(self.url(), flags=self.flags())
        m = ldb.Message()
        m.dn = ldb.Dn(l, "dc=modify2")
        m["bla"] = [b"1234", b"456"]
        m["objectUUID"] = b"0123456789abcdef"
        l.add(m)
        try:
            m = ldb.Message()
            m.dn = ldb.Dn(l, "dc=modify2")
            m["bla"] = ldb.MessageElement([b"789"], ldb.FLAG_MOD_REPLACE,
                                          "bla")
            self.assertEqual(ldb.FLAG_MOD_REPLACE, m["bla"].flags())
            l.modify(m)
            rm = l.search(m.dn)[0]
            self.assertEqual(3, len(rm))
            self.assertEqual([b"789"], list(rm["bla"]))
            rm = l.search(m.dn, attrs=["bla"])[0]
            self.assertEqual(1, len(rm))
        finally:
            l.delete(ldb.Dn(l, "dc=modify2"))

    def test_modify_replace_text(self):
        """FLAG_MOD_REPLACE swaps all text values of an attribute."""
        l = ldb.Ldb(self.url(), flags=self.flags())
        m = ldb.Message()
        m.dn = ldb.Dn(l, "dc=modify2")
        m.text["bla"] = ["1234", "456"]
        m["objectUUID"] = b"0123456789abcdef"
        l.add(m)
        try:
            m = ldb.Message()
            m.dn = ldb.Dn(l, "dc=modify2")
            m["bla"] = ldb.MessageElement(["789"], ldb.FLAG_MOD_REPLACE,
                                          "bla")
            self.assertEqual(ldb.FLAG_MOD_REPLACE, m["bla"].flags())
            l.modify(m)
            rm = l.search(m.dn)[0]
            self.assertEqual(3, len(rm))
            self.assertEqual(["789"], list(rm.text["bla"]))
            rm = l.search(m.dn, attrs=["bla"])[0]
            self.assertEqual(1, len(rm))
        finally:
            l.delete(ldb.Dn(l, "dc=modify2"))

    def test_modify_flags_change(self):
        """set_flags() on an element changes what modify() does (bytes)."""
        l = ldb.Ldb(self.url(), flags=self.flags())
        m = ldb.Message()
        m.dn = ldb.Dn(l, "dc=add")
        m["bla"] = [b"1234"]
        m["objectUUID"] = b"0123456789abcdef"
        l.add(m)
        try:
            m = ldb.Message()
            m.dn = ldb.Dn(l, "dc=add")
            m["bla"] = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
            self.assertEqual(ldb.FLAG_MOD_ADD, m["bla"].flags())
            l.modify(m)
            rm = l.search(m.dn)[0]
            self.assertEqual(3, len(rm))
            self.assertEqual([b"1234", b"456"], list(rm["bla"]))

            # Now create another modify, but switch the flags before we do it
            m["bla"] = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
            m["bla"].set_flags(ldb.FLAG_MOD_DELETE)
            l.modify(m)
            rm = l.search(m.dn, attrs=["bla"])[0]
            self.assertEqual(1, len(rm))
            self.assertEqual([b"1234"], list(rm["bla"]))
        finally:
            l.delete(ldb.Dn(l, "dc=add"))

    def test_modify_flags_change_text(self):
        """set_flags() on an element changes what modify() does (text)."""
        l = ldb.Ldb(self.url(), flags=self.flags())
        m = ldb.Message()
        m.dn = ldb.Dn(l, "dc=add")
        m.text["bla"] = ["1234"]
        m["objectUUID"] = b"0123456789abcdef"
        l.add(m)
        try:
            m = ldb.Message()
            m.dn = ldb.Dn(l, "dc=add")
            m["bla"] = ldb.MessageElement(["456"], ldb.FLAG_MOD_ADD, "bla")
            self.assertEqual(ldb.FLAG_MOD_ADD, m["bla"].flags())
            l.modify(m)
            rm = l.search(m.dn)[0]
            self.assertEqual(3, len(rm))
            self.assertEqual(["1234", "456"], list(rm.text["bla"]))

            # Now create another modify, but switch the flags before we do it
            m["bla"] = ldb.MessageElement(["456"], ldb.FLAG_MOD_ADD, "bla")
            m["bla"].set_flags(ldb.FLAG_MOD_DELETE)
            l.modify(m)
            rm = l.search(m.dn, attrs=["bla"])[0]
            self.assertEqual(1, len(rm))
            self.assertEqual(["1234"], list(rm.text["bla"]))
        finally:
            l.delete(ldb.Dn(l, "dc=add"))

    def test_transaction_commit(self):
        """A record added inside a committed transaction can be deleted."""
        l = ldb.Ldb(self.url(), flags=self.flags())
        l.transaction_start()
        m = ldb.Message(ldb.Dn(l, "dc=foo9"))
        m["foo"] = [b"bar"]
        m["objectUUID"] = b"0123456789abcdef"
        l.add(m)
        l.transaction_commit()
        l.delete(m.dn)

    def test_transaction_cancel(self):
        """A record added inside a cancelled transaction is not found."""
        l = ldb.Ldb(self.url(), flags=self.flags())
        l.transaction_start()
        m = ldb.Message(ldb.Dn(l, "dc=foo10"))
        m["foo"] = [b"bar"]
        m["objectUUID"] = b"0123456789abcdee"
        l.add(m)
        l.transaction_cancel()
        self.assertEqual(0, len(l.search(ldb.Dn(l, "dc=foo10"))))

    def test_set_debug(self):
        """set_debug() accepts a two-argument Python callable."""
        def my_report_fn(level, text):
            pass
        l = ldb.Ldb(self.url(), flags=self.flags())
        l.set_debug(my_report_fn)

    def test_zero_byte_string(self):
        r"""Testing we do not get trapped in the \0 byte in a property string."""
        l = ldb.Ldb(self.url(), flags=self.flags())
        l.add({
            "dn": b"dc=somedn",
            "objectclass": b"user",
            "cN": b"LDAPtestUSER",
            "givenname": b"ldap",
            "displayname": b"foo\0bar",
            "objectUUID": b"0123456789abcdef"
        })
        res = l.search(expression="(dn=dc=somedn)")
        self.assertEqual(b"foo\0bar", res[0]["displayname"][0])

    def test_no_crash_broken_expr(self):
        """A malformed search expression raises LdbError."""
        l = ldb.Ldb(self.url(), flags=self.flags())
        with self.assertRaises(ldb.LdbError):
            l.search("", ldb.SCOPE_SUBTREE, "&(dc=*)(dn=*)", ["dc"])
667
668 # Run the SimpleLdb tests against an lmdb backend
class SimpleLdbLmdb(SimpleLdb):
    """Re-run every SimpleLdb test with the mdb URL prefix."""

    def setUp(self):
        """Select the mdb prefix, then perform the inherited set-up."""
        # The prefix must be assigned before the parent setUp reads it.
        self.prefix = MDB_PREFIX
        super(SimpleLdbLmdb, self).setUp()

    def tearDown(self):
        """Perform the inherited tear-down unchanged."""
        super(SimpleLdbLmdb, self).tearDown()
677
678 class SearchTests(LdbBaseTest):
679     def tearDown(self):
680         shutil.rmtree(self.testdir)
681         super(SearchTests, self).tearDown()
682
683         # Ensure the LDB is closed now, so we close the FD
684         del(self.l)
685
686
687     def setUp(self):
688         super(SearchTests, self).setUp()
689         self.testdir = tempdir()
690         self.filename = os.path.join(self.testdir, "search_test.ldb")
691         self.l = ldb.Ldb(self.url(),
692                          flags=self.flags(),
693                          options=["modules:rdn_name"])
694         try:
695             self.l.add(self.index)
696         except AttributeError:
697             pass
698
699         self.l.add({"dn": "@ATTRIBUTES",
700                     "DC": "CASE_INSENSITIVE"})
701
702         # Note that we can't use the name objectGUID here, as we
703         # want to stay clear of the objectGUID handler in LDB and
704         # instead use just the 16 bytes raw, which we just keep
705         # to printable chars here for ease of handling.
706
707         self.l.add({"dn": "DC=SAMBA,DC=ORG",
708                     "name": b"samba.org",
709                     "objectUUID": b"0123456789abcdef"})
710         self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG",
711                     "name": b"Admins",
712                     "x": "z", "y": "a",
713                     "objectUUID": b"0123456789abcde1"})
714         self.l.add({"dn": "OU=USERS,DC=SAMBA,DC=ORG",
715                     "name": b"Users",
716                     "x": "z", "y": "a",
717                     "objectUUID": b"0123456789abcde2"})
718         self.l.add({"dn": "OU=OU1,DC=SAMBA,DC=ORG",
719                     "name": b"OU #1",
720                     "x": "y", "y": "a",
721                     "objectUUID": b"0123456789abcde3"})
722         self.l.add({"dn": "OU=OU2,DC=SAMBA,DC=ORG",
723                     "name": b"OU #2",
724                     "x": "y", "y": "a",
725                     "objectUUID": b"0123456789abcde4"})
726         self.l.add({"dn": "OU=OU3,DC=SAMBA,DC=ORG",
727                     "name": b"OU #3",
728                     "x": "y", "y": "a",
729                     "objectUUID": b"0123456789abcde5"})
730         self.l.add({"dn": "OU=OU4,DC=SAMBA,DC=ORG",
731                     "name": b"OU #4",
732                     "x": "y", "y": "a",
733                     "objectUUID": b"0123456789abcde6"})
734         self.l.add({"dn": "OU=OU5,DC=SAMBA,DC=ORG",
735                     "name": b"OU #5",
736                     "x": "y", "y": "a",
737                     "objectUUID": b"0123456789abcde7"})
738         self.l.add({"dn": "OU=OU6,DC=SAMBA,DC=ORG",
739                     "name": b"OU #6",
740                     "x": "y", "y": "a",
741                     "objectUUID": b"0123456789abcde8"})
742         self.l.add({"dn": "OU=OU7,DC=SAMBA,DC=ORG",
743                     "name": b"OU #7",
744                     "x": "y", "y": "a",
745                     "objectUUID": b"0123456789abcde9"})
746         self.l.add({"dn": "OU=OU8,DC=SAMBA,DC=ORG",
747                     "name": b"OU #8",
748                     "x": "y", "y": "a",
749                     "objectUUID": b"0123456789abcde0"})
750         self.l.add({"dn": "OU=OU9,DC=SAMBA,DC=ORG",
751                     "name": b"OU #9",
752                     "x": "y", "y": "a",
753                     "objectUUID": b"0123456789abcdea"})
754         self.l.add({"dn": "OU=OU10,DC=SAMBA,DC=ORG",
755                     "name": b"OU #10",
756                     "x": "y", "y": "a",
757                     "objectUUID": b"0123456789abcdeb"})
758         self.l.add({"dn": "OU=OU11,DC=SAMBA,DC=ORG",
759                     "name": b"OU #10",
760                     "x": "y", "y": "a",
761                     "objectUUID": b"0123456789abcdec"})
762         self.l.add({"dn": "OU=OU12,DC=SAMBA,DC=ORG",
763                     "name": b"OU #10",
764                     "x": "y", "y": "b",
765                     "objectUUID": b"0123456789abcded"})
766         self.l.add({"dn": "OU=OU13,DC=SAMBA,DC=ORG",
767                     "name": b"OU #10",
768                     "x": "x", "y": "b",
769                     "objectUUID": b"0123456789abcdee"})
770         self.l.add({"dn": "OU=OU14,DC=SAMBA,DC=ORG",
771                     "name": b"OU #10",
772                     "x": "x", "y": "b",
773                     "objectUUID": b"0123456789abcd01"})
774         self.l.add({"dn": "OU=OU15,DC=SAMBA,DC=ORG",
775                     "name": b"OU #10",
776                     "x": "x", "y": "b",
777                     "objectUUID": b"0123456789abcd02"})
778         self.l.add({"dn": "OU=OU16,DC=SAMBA,DC=ORG",
779                     "name": b"OU #10",
780                     "x": "x", "y": "b",
781                     "objectUUID": b"0123456789abcd03"})
782         self.l.add({"dn": "OU=OU17,DC=SAMBA,DC=ORG",
783                     "name": b"OU #10",
784                     "x": "x", "y": "b",
785                     "objectUUID": b"0123456789abcd04"})
786         self.l.add({"dn": "OU=OU18,DC=SAMBA,DC=ORG",
787                     "name": b"OU #10",
788                     "x": "x", "y": "b",
789                     "objectUUID": b"0123456789abcd05"})
790         self.l.add({"dn": "OU=OU19,DC=SAMBA,DC=ORG",
791                     "name": b"OU #10",
792                     "x": "x", "y": "b",
793                     "objectUUID": b"0123456789abcd06"})
794         self.l.add({"dn": "OU=OU20,DC=SAMBA,DC=ORG",
795                     "name": b"OU #10",
796                     "x": "x", "y": "b",
797                     "objectUUID": b"0123456789abcd07"})
798         self.l.add({"dn": "OU=OU21,DC=SAMBA,DC=ORG",
799                     "name": b"OU #10",
800                     "x": "x", "y": "c",
801                     "objectUUID": b"0123456789abcd08"})
802         self.l.add({"dn": "OU=OU22,DC=SAMBA,DC=ORG",
803                     "name": b"OU #10",
804                     "x": "x", "y": "c",
805                     "objectUUID": b"0123456789abcd09"})
806
807     def test_base(self):
808         """Testing a search"""
809
810         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
811                               scope=ldb.SCOPE_BASE)
812         self.assertEqual(len(res11), 1)
813
814     def test_base_lower(self):
815         """Testing a search"""
816
817         res11 = self.l.search(base="OU=OU11,DC=samba,DC=org",
818                               scope=ldb.SCOPE_BASE)
819         self.assertEqual(len(res11), 1)
820
821     def test_base_or(self):
822         """Testing a search"""
823
824         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
825                               scope=ldb.SCOPE_BASE,
826                               expression="(|(ou=ou11)(ou=ou12))")
827         self.assertEqual(len(res11), 1)
828
829     def test_base_or2(self):
830         """Testing a search"""
831
832         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
833                               scope=ldb.SCOPE_BASE,
834                               expression="(|(x=y)(y=b))")
835         self.assertEqual(len(res11), 1)
836
837     def test_base_and(self):
838         """Testing a search"""
839
840         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
841                               scope=ldb.SCOPE_BASE,
842                               expression="(&(ou=ou11)(ou=ou12))")
843         self.assertEqual(len(res11), 0)
844
845     def test_base_and2(self):
846         """Testing a search"""
847
848         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
849                               scope=ldb.SCOPE_BASE,
850                               expression="(&(x=y)(y=a))")
851         self.assertEqual(len(res11), 1)
852
853     def test_base_false(self):
854         """Testing a search"""
855
856         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
857                               scope=ldb.SCOPE_BASE,
858                               expression="(|(ou=ou13)(ou=ou12))")
859         self.assertEqual(len(res11), 0)
860
861     def test_check_base_false(self):
862         """Testing a search"""
863         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
864                               scope=ldb.SCOPE_BASE,
865                               expression="(|(ou=ou13)(ou=ou12))")
866         self.assertEqual(len(res11), 0)
867
868     def test_check_base_error(self):
869         """Testing a search"""
870         checkbaseonsearch = {"dn": "@OPTIONS",
871                              "checkBaseOnSearch": b"TRUE"}
872         try:
873             self.l.add(checkbaseonsearch)
874         except ldb.LdbError as err:
875             enum = err.args[0]
876             self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)
877             m = ldb.Message.from_dict(self.l,
878                                       checkbaseonsearch)
879             self.l.modify(m)
880
881         try:
882             res11 = self.l.search(base="OU=OU11x,DC=SAMBA,DC=ORG",
883                                   scope=ldb.SCOPE_BASE,
884                                   expression="(|(ou=ou13)(ou=ou12))")
885             self.fail("Should have failed on missing base")
886         except ldb.LdbError as err:
887             enum = err.args[0]
888             self.assertEqual(enum, ldb.ERR_NO_SUCH_OBJECT)
889
890     def test_subtree_and(self):
891         """Testing a search"""
892
893         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
894                               scope=ldb.SCOPE_SUBTREE,
895                               expression="(&(ou=ou11)(ou=ou12))")
896         self.assertEqual(len(res11), 0)
897
898     def test_subtree_and2(self):
899         """Testing a search"""
900
901         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
902                               scope=ldb.SCOPE_SUBTREE,
903                               expression="(&(x=y)(|(y=b)(y=c)))")
904         self.assertEqual(len(res11), 1)
905
906     def test_subtree_and2_lower(self):
907         """Testing a search"""
908
909         res11 = self.l.search(base="DC=samba,DC=org",
910                               scope=ldb.SCOPE_SUBTREE,
911                               expression="(&(x=y)(|(y=b)(y=c)))")
912         self.assertEqual(len(res11), 1)
913
914     def test_subtree_or(self):
915         """Testing a search"""
916
917         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
918                               scope=ldb.SCOPE_SUBTREE,
919                               expression="(|(ou=ou11)(ou=ou12))")
920         self.assertEqual(len(res11), 2)
921
922     def test_subtree_or2(self):
923         """Testing a search"""
924
925         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
926                               scope=ldb.SCOPE_SUBTREE,
927                               expression="(|(x=y)(y=b))")
928         self.assertEqual(len(res11), 20)
929
930     def test_subtree_or3(self):
931         """Testing a search"""
932
933         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
934                               scope=ldb.SCOPE_SUBTREE,
935                               expression="(|(x=y)(y=b)(y=c))")
936         self.assertEqual(len(res11), 22)
937
938     def test_one_and(self):
939         """Testing a search"""
940
941         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
942                               scope=ldb.SCOPE_ONELEVEL,
943                               expression="(&(ou=ou11)(ou=ou12))")
944         self.assertEqual(len(res11), 0)
945
946     def test_one_and2(self):
947         """Testing a search"""
948
949         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
950                               scope=ldb.SCOPE_ONELEVEL,
951                               expression="(&(x=y)(y=b))")
952         self.assertEqual(len(res11), 1)
953
954     def test_one_or(self):
955         """Testing a search"""
956
957         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
958                               scope=ldb.SCOPE_ONELEVEL,
959                               expression="(|(ou=ou11)(ou=ou12))")
960         self.assertEqual(len(res11), 2)
961
962     def test_one_or2(self):
963         """Testing a search"""
964
965         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
966                               scope=ldb.SCOPE_ONELEVEL,
967                               expression="(|(x=y)(y=b))")
968         self.assertEqual(len(res11), 20)
969
970     def test_one_or2_lower(self):
971         """Testing a search"""
972
973         res11 = self.l.search(base="DC=samba,DC=org",
974                               scope=ldb.SCOPE_ONELEVEL,
975                               expression="(|(x=y)(y=b))")
976         self.assertEqual(len(res11), 20)
977
978     def test_subtree_and_or(self):
979         """Testing a search"""
980
981         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
982                               scope=ldb.SCOPE_SUBTREE,
983                               expression="(&(|(x=z)(y=b))(x=x)(y=c))")
984         self.assertEqual(len(res11), 0)
985
986     def test_subtree_and_or2(self):
987         """Testing a search"""
988
989         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
990                               scope=ldb.SCOPE_SUBTREE,
991                               expression="(&(x=x)(y=c)(|(x=z)(y=b)))")
992         self.assertEqual(len(res11), 0)
993
994     def test_subtree_and_or3(self):
995         """Testing a search"""
996
997         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
998                               scope=ldb.SCOPE_SUBTREE,
999                               expression="(&(|(ou=ou11)(ou=ou10))(|(x=y)(y=b)(y=c)))")
1000         self.assertEqual(len(res11), 2)
1001
1002     def test_subtree_and_or4(self):
1003         """Testing a search"""
1004
1005         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1006                               scope=ldb.SCOPE_SUBTREE,
1007                               expression="(&(|(x=y)(y=b)(y=c))(|(ou=ou11)(ou=ou10)))")
1008         self.assertEqual(len(res11), 2)
1009
1010     def test_subtree_and_or5(self):
1011         """Testing a search"""
1012
1013         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1014                               scope=ldb.SCOPE_SUBTREE,
1015                               expression="(&(|(x=y)(y=b)(y=c))(ou=ou11))")
1016         self.assertEqual(len(res11), 1)
1017
1018     def test_subtree_or_and(self):
1019         """Testing a search"""
1020
1021         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1022                               scope=ldb.SCOPE_SUBTREE,
1023                               expression="(|(x=x)(y=c)(&(x=z)(y=b)))")
1024         self.assertEqual(len(res11), 10)
1025
1026     def test_subtree_large_and_unique(self):
1027         """Testing a search"""
1028
1029         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1030                               scope=ldb.SCOPE_SUBTREE,
1031                               expression="(&(ou=ou10)(y=a))")
1032         self.assertEqual(len(res11), 1)
1033
1034     def test_subtree_and_none(self):
1035         """Testing a search"""
1036
1037         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1038                               scope=ldb.SCOPE_SUBTREE,
1039                               expression="(&(ou=ouX)(y=a))")
1040         self.assertEqual(len(res11), 0)
1041
1042     def test_subtree_and_idx_record(self):
1043         """Testing a search against the index record"""
1044
1045         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1046                               scope=ldb.SCOPE_SUBTREE,
1047                               expression="(@IDXDN=DC=SAMBA,DC=ORG)")
1048         self.assertEqual(len(res11), 0)
1049
1050     def test_subtree_and_idxone_record(self):
1051         """Testing a search against the index record"""
1052
1053         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1054                               scope=ldb.SCOPE_SUBTREE,
1055                               expression="(@IDXONE=DC=SAMBA,DC=ORG)")
1056         self.assertEqual(len(res11), 0)
1057
1058     def test_dn_filter_one(self):
1059         """Testing that a dn= filter succeeds
1060         (or fails with disallowDNFilter
1061         set and IDXGUID or (IDX and not IDXONE) mode)
1062         when the scope is SCOPE_ONELEVEL.
1063
1064         This should be made more consistent, but for now lock in
1065         the behaviour
1066
1067         """
1068
1069         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1070                               scope=ldb.SCOPE_ONELEVEL,
1071                               expression="(dn=OU=OU1,DC=SAMBA,DC=ORG)")
1072         if hasattr(self, 'disallowDNFilter') and \
1073            hasattr(self, 'IDX') and \
1074            (hasattr(self, 'IDXGUID') or \
1075             ((hasattr(self, 'IDXONE') == False and hasattr(self, 'IDX')))):
1076             self.assertEqual(len(res11), 0)
1077         else:
1078             self.assertEqual(len(res11), 1)
1079
1080     def test_dn_filter_subtree(self):
1081         """Testing that a dn= filter succeeds
1082         (or fails with disallowDNFilter set)
1083         when the scope is SCOPE_SUBTREE"""
1084
1085         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1086                               scope=ldb.SCOPE_SUBTREE,
1087                               expression="(dn=OU=OU1,DC=SAMBA,DC=ORG)")
1088         if hasattr(self, 'disallowDNFilter') \
1089            and hasattr(self, 'IDX'):
1090             self.assertEqual(len(res11), 0)
1091         else:
1092             self.assertEqual(len(res11), 1)
1093
1094     def test_dn_filter_base(self):
1095         """Testing that (incorrectly) a dn= filter works
1096         when the scope is SCOPE_BASE"""
1097
1098         res11 = self.l.search(base="OU=OU1,DC=SAMBA,DC=ORG",
1099                               scope=ldb.SCOPE_BASE,
1100                               expression="(dn=OU=OU1,DC=SAMBA,DC=ORG)")
1101
1102         # At some point we should fix this, but it isn't trivial
1103         self.assertEqual(len(res11), 1)
1104
1105
1106 # Run the search tests against an lmdb backend
class SearchTestsLmdb(SearchTests):
    """The SearchTests cases, run with the mdb:// URL prefix"""

    def setUp(self):
        """Select MDB_PREFIX, then run the inherited setUp"""
        # The prefix is assigned first so the inherited setUp sees it
        self.prefix = MDB_PREFIX
        super(SearchTestsLmdb, self).setUp()

    def tearDown(self):
        """Delegate straight to the inherited tearDown"""
        super(SearchTestsLmdb, self).tearDown()
1115
1116
class IndexedSearchTests(SearchTests):
    """Repeat the search tests with x, y and ou listed in @INDEXLIST,
       to ensure the index doesn't break things"""
    def setUp(self):
        """Run the inherited setUp, then add the @INDEXLIST record"""
        super(IndexedSearchTests, self).setUp()
        index_record = {"dn": "@INDEXLIST",
                        "@IDXATTR": [b"x", b"y", b"ou"]}
        self.l.add(index_record)
        # Marker attribute; the dn= filter tests probe it with hasattr()
        self.IDX = True
1125
class IndexedSearchDnFilterTests(SearchTests):
    """Repeat the search tests with an attribute index and with
       disallowDNFilter set in @OPTIONS"""
    def setUp(self):
        """Run the inherited setUp, then add @OPTIONS and @INDEXLIST"""
        super(IndexedSearchDnFilterTests, self).setUp()

        options_record = {"dn": "@OPTIONS",
                          "disallowDNFilter": "TRUE"}
        self.l.add(options_record)
        self.disallowDNFilter = True

        index_record = {"dn": "@INDEXLIST",
                        "@IDXATTR": [b"x", b"y", b"ou"]}
        self.l.add(index_record)
        # Marker attributes; the dn= filter tests probe them with hasattr()
        self.IDX = True
1138
class IndexedAndOneLevelSearchTests(SearchTests):
    """Repeat the search tests with an attribute index that also
       enables @IDXONE, to ensure the index doesn't break things"""
    def setUp(self):
        """Run the inherited setUp, then add the @INDEXLIST record"""
        super(IndexedAndOneLevelSearchTests, self).setUp()
        index_record = {"dn": "@INDEXLIST",
                        "@IDXATTR": [b"x", b"y", b"ou"],
                        "@IDXONE": [b"1"]}
        self.l.add(index_record)
        # NOTE(review): only the IDX marker is set here even though the
        # record contains @IDXONE - confirm whether that is intentional
        self.IDX = True
1148
class IndexedAndOneLevelDNFilterSearchTests(SearchTests):
    """Repeat the search tests with an attribute index that also
       enables @IDXONE, and with disallowDNFilter set in @OPTIONS"""
    def setUp(self):
        """Run the inherited setUp, then add @OPTIONS and @INDEXLIST"""
        super(IndexedAndOneLevelDNFilterSearchTests, self).setUp()

        options_record = {"dn": "@OPTIONS",
                          "disallowDNFilter": "TRUE"}
        self.l.add(options_record)
        self.disallowDNFilter = True

        index_record = {"dn": "@INDEXLIST",
                        "@IDXATTR": [b"x", b"y", b"ou"],
                        "@IDXONE": [b"1"]}
        self.l.add(index_record)
        # Marker attributes; the dn= filter tests probe them with hasattr()
        self.IDX = True
        self.IDXONE = True
1163
class GUIDIndexedSearchTests(SearchTests):
    """Repeat the search tests with an @INDEXLIST that declares
       @IDXGUID and @IDX_DN_GUID, to ensure the index doesn't
       break things"""
    def setUp(self):
        """Define self.index, run the inherited setUp, set the markers"""
        # self.index is assigned before delegating; presumably the
        # inherited setUp adds it to the database - TODO confirm
        guid_index = {"dn": "@INDEXLIST",
                      "@IDXATTR": [b"x", b"y", b"ou"],
                      "@IDXGUID": [b"objectUUID"],
                      "@IDX_DN_GUID": [b"GUID"]}
        self.index = guid_index
        super(GUIDIndexedSearchTests, self).setUp()

        # Marker attributes; the dn= filter tests probe them with hasattr()
        self.IDXGUID = True
        self.IDXONE = True
1176
1177
class GUIDIndexedDNFilterSearchTests(SearchTests):
    """Repeat the search tests with an @INDEXLIST that declares
       @IDXGUID and @IDX_DN_GUID, and with disallowDNFilter set
       in @OPTIONS"""
    def setUp(self):
        """Define self.index, run the inherited setUp, add @OPTIONS"""
        # self.index is assigned before delegating; presumably the
        # inherited setUp adds it to the database - TODO confirm
        guid_index = {"dn": "@INDEXLIST",
                      "@IDXATTR": [b"x", b"y", b"ou"],
                      "@IDXGUID": [b"objectUUID"],
                      "@IDX_DN_GUID": [b"GUID"]}
        self.index = guid_index
        super(GUIDIndexedDNFilterSearchTests, self).setUp()

        options_record = {"dn": "@OPTIONS",
                          "disallowDNFilter": "TRUE"}
        self.l.add(options_record)

        # Marker attributes; the dn= filter tests probe them with hasattr()
        self.disallowDNFilter = True
        self.IDX = True
        self.IDXGUID = True
1192
class GUIDAndOneLevelIndexedSearchTests(SearchTests):
    """Repeat the search tests with a GUID index, disallowDNFilter
       set in @OPTIONS and the IDXONE marker set, to ensure the
       index doesn't break things"""
    def setUp(self):
        """Define self.index, run the inherited setUp, add @OPTIONS"""
        # self.index is assigned before delegating; presumably the
        # inherited setUp adds it to the database - TODO confirm.
        # NOTE(review): the record itself carries no @IDXONE element,
        # although the IDXONE marker is set below - confirm intent.
        guid_index = {"dn": "@INDEXLIST",
                      "@IDXATTR": [b"x", b"y", b"ou"],
                      "@IDXGUID": [b"objectUUID"],
                      "@IDX_DN_GUID": [b"GUID"]}
        self.index = guid_index
        super(GUIDAndOneLevelIndexedSearchTests, self).setUp()

        options_record = {"dn": "@OPTIONS",
                          "disallowDNFilter": "TRUE"}
        self.l.add(options_record)

        # Marker attributes; the dn= filter tests probe them with hasattr()
        self.disallowDNFilter = True
        self.IDX = True
        self.IDXGUID = True
        self.IDXONE = True
1208
class GUIDIndexedSearchTestsLmdb(GUIDIndexedSearchTests):
    """The GUIDIndexedSearchTests cases, run with the mdb:// URL prefix"""

    def setUp(self):
        """Select MDB_PREFIX, then run the inherited setUp"""
        # The prefix is assigned first so the inherited setUp sees it
        self.prefix = MDB_PREFIX
        super(GUIDIndexedSearchTestsLmdb, self).setUp()

    def tearDown(self):
        """Delegate straight to the inherited tearDown"""
        super(GUIDIndexedSearchTestsLmdb, self).tearDown()
1217
1218
class GUIDIndexedDNFilterSearchTestsLmdb(GUIDIndexedDNFilterSearchTests):
    """The GUIDIndexedDNFilterSearchTests cases, run with the mdb://
       URL prefix"""

    def setUp(self):
        """Select MDB_PREFIX, then run the inherited setUp"""
        # The prefix is assigned first so the inherited setUp sees it
        self.prefix = MDB_PREFIX
        super(GUIDIndexedDNFilterSearchTestsLmdb, self).setUp()

    def tearDown(self):
        """Delegate straight to the inherited tearDown"""
        super(GUIDIndexedDNFilterSearchTestsLmdb, self).tearDown()
1227
1228
class GUIDAndOneLevelIndexedSearchTestsLmdb(GUIDAndOneLevelIndexedSearchTests):
    """The GUIDAndOneLevelIndexedSearchTests cases, run with the mdb://
       URL prefix"""

    def setUp(self):
        """Select MDB_PREFIX, then run the inherited setUp"""
        # The prefix is assigned first so the inherited setUp sees it
        self.prefix = MDB_PREFIX
        super(GUIDAndOneLevelIndexedSearchTestsLmdb, self).setUp()

    def tearDown(self):
        """Delegate straight to the inherited tearDown"""
        super(GUIDAndOneLevelIndexedSearchTestsLmdb, self).tearDown()
1237
1238
class AddModifyTests(LdbBaseTest):
    """Add, delete and rename tests against a freshly created database.

    A subclass may assign ``self.index`` before calling setUp; when the
    attribute is present it is added to the database before any other
    record.
    """

    def tearDown(self):
        """Remove the temporary directory and drop the Ldb handle"""
        shutil.rmtree(self.testdir)
        super(AddModifyTests, self).tearDown()

        # Ensure the LDB is closed now, so we close the FD
        del(self.l)

    def setUp(self):
        """Open add_test.ldb in a new temporary directory (with the
        rdn_name module) and add the base records"""
        super(AddModifyTests, self).setUp()
        self.testdir = tempdir()
        self.filename = os.path.join(self.testdir, "add_test.ldb")
        self.l = ldb.Ldb(self.url(),
                         flags=self.flags(),
                         options=["modules:rdn_name"])

        # Only the absence of self.index means "no index record".  An
        # explicit check keeps an AttributeError raised from inside
        # add() from being silently swallowed.
        if hasattr(self, 'index'):
            self.l.add(self.index)

        self.l.add({"dn": "DC=SAMBA,DC=ORG",
                    "name": b"samba.org",
                    "objectUUID": b"0123456789abcdef"})
        self.l.add({"dn": "@ATTRIBUTES",
                    "objectUUID": "UNIQUE_INDEX"})

    def _add_admins(self, dn, guid):
        """Add the standard "Admins" test record at dn with objectUUID guid"""
        self.l.add({"dn": dn,
                    "name": b"Admins",
                    "x": "z", "y": "a",
                    "objectUUID": guid})

    def test_add_dup(self):
        """Adding a second record with an existing DN must fail"""
        self._add_admins("OU=DUP,DC=SAMBA,DC=ORG", b"0123456789abcde1")
        try:
            self._add_admins("OU=DUP,DC=SAMBA,DC=ORG", b"0123456789abcde2")
        except ldb.LdbError as err:
            self.assertEqual(err.args[0], ldb.ERR_ENTRY_ALREADY_EXISTS)
        else:
            self.fail("Should have failed adding dupliate entry")

    def test_add_del_add(self):
        """A DN can be added again once the first record is deleted"""
        self._add_admins("OU=DUP,DC=SAMBA,DC=ORG", b"0123456789abcde1")
        self.l.delete("OU=DUP,DC=SAMBA,DC=ORG")
        self._add_admins("OU=DUP,DC=SAMBA,DC=ORG", b"0123456789abcde2")

    def test_add_move_add(self):
        """A DN can be added again once the first record is renamed away"""
        self._add_admins("OU=DUP,DC=SAMBA,DC=ORG", b"0123456789abcde1")
        self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
                      "OU=DUP2,DC=SAMBA,DC=ORG")
        self._add_admins("OU=DUP,DC=SAMBA,DC=ORG", b"0123456789abcde2")

    def test_add_move_fail_move_move(self):
        """A rename onto an existing DN fails and leaves both records
        in place; after the target is moved away the rename succeeds"""
        self._add_admins("OU=DUP,DC=SAMBA,DC=ORG", b"0123456789abcde1")
        self._add_admins("OU=DUP2,DC=SAMBA,DC=ORG", b"0123456789abcde2")

        res2 = self.l.search(base="DC=SAMBA,DC=ORG",
                             scope=ldb.SCOPE_SUBTREE,
                             expression="(objectUUID=0123456789abcde1)")
        self.assertEqual(len(res2), 1)
        self.assertEqual(str(res2[0].dn), "OU=DUP,DC=SAMBA,DC=ORG")

        res3 = self.l.search(base="DC=SAMBA,DC=ORG",
                             scope=ldb.SCOPE_SUBTREE,
                             expression="(objectUUID=0123456789abcde2)")
        self.assertEqual(len(res3), 1)
        self.assertEqual(str(res3[0].dn), "OU=DUP2,DC=SAMBA,DC=ORG")

        try:
            self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
                          "OU=DUP2,DC=SAMBA,DC=ORG")
        except ldb.LdbError as err:
            self.assertEqual(err.args[0], ldb.ERR_ENTRY_ALREADY_EXISTS)
        else:
            self.fail("Should have failed on duplicate DN")

        self.l.rename("OU=DUP2,DC=SAMBA,DC=ORG",
                      "OU=DUP3,DC=SAMBA,DC=ORG")

        self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
                      "OU=DUP2,DC=SAMBA,DC=ORG")

        # Each objectUUID must now be found under its new DN
        res2 = self.l.search(base="DC=SAMBA,DC=ORG",
                             scope=ldb.SCOPE_SUBTREE,
                             expression="(objectUUID=0123456789abcde1)")
        self.assertEqual(len(res2), 1)
        self.assertEqual(str(res2[0].dn), "OU=DUP2,DC=SAMBA,DC=ORG")

        res3 = self.l.search(base="DC=SAMBA,DC=ORG",
                             scope=ldb.SCOPE_SUBTREE,
                             expression="(objectUUID=0123456789abcde2)")
        self.assertEqual(len(res3), 1)
        self.assertEqual(str(res3[0].dn), "OU=DUP3,DC=SAMBA,DC=ORG")

    def test_move_missing(self):
        """Renaming a DN that does not exist must fail"""
        try:
            self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
                          "OU=DUP2,DC=SAMBA,DC=ORG")
        except ldb.LdbError as err:
            self.assertEqual(err.args[0], ldb.ERR_NO_SUCH_OBJECT)
        else:
            self.fail("Should have failed on missing")

    def test_move_missing2(self):
        """Renaming a missing DN onto an existing one reports the
        missing source, not the existing target"""
        self._add_admins("OU=DUP2,DC=SAMBA,DC=ORG", b"0123456789abcde2")

        try:
            self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
                          "OU=DUP2,DC=SAMBA,DC=ORG")
        except ldb.LdbError as err:
            self.assertEqual(err.args[0], ldb.ERR_NO_SUCH_OBJECT)
        else:
            self.fail("Should have failed on missing")

    def test_move_fail_move_add(self):
        """After a failed rename onto an existing DN, that DN can be
        vacated by a rename and then added afresh"""
        self._add_admins("OU=DUP,DC=SAMBA,DC=ORG", b"0123456789abcde1")
        self._add_admins("OU=DUP2,DC=SAMBA,DC=ORG", b"0123456789abcde2")
        try:
            self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
                          "OU=DUP2,DC=SAMBA,DC=ORG")
        except ldb.LdbError as err:
            self.assertEqual(err.args[0], ldb.ERR_ENTRY_ALREADY_EXISTS)
        else:
            self.fail("Should have failed on duplicate DN")

        self.l.rename("OU=DUP2,DC=SAMBA,DC=ORG",
                      "OU=DUP3,DC=SAMBA,DC=ORG")

        self._add_admins("OU=DUP2,DC=SAMBA,DC=ORG", b"0123456789abcde3")
1398
1399
class AddModifyTestsLmdb(AddModifyTests):
    """The AddModifyTests cases, run with the mdb:// URL prefix"""

    def setUp(self):
        """Select MDB_PREFIX, then run the inherited setUp"""
        # The prefix is assigned first so the inherited setUp sees it
        self.prefix = MDB_PREFIX
        super(AddModifyTestsLmdb, self).setUp()

    def tearDown(self):
        """Delegate straight to the inherited tearDown"""
        super(AddModifyTestsLmdb, self).tearDown()
1408
class IndexedAddModifyTests(AddModifyTests):
    """Repeat the add/modify tests with an index record defined, and
       add tests for duplicate objectUUID values"""
    def setUp(self):
        """Supply a default self.index unless one is already set,
        then run the inherited setUp"""
        if not hasattr(self, 'index'):
            default_index = {"dn": "@INDEXLIST",
                             "@IDXATTR": [b"x", b"y", b"ou", b"objectUUID"],
                             "@IDXONE": [b"1"]}
            self.index = default_index
        super(IndexedAddModifyTests, self).setUp()

    def test_duplicate_GUID(self):
        """Adding a new DN that reuses objectUUID 0123456789abcdef
        is a constraint violation"""
        record = {"dn": "OU=DUPGUID,DC=SAMBA,DC=ORG",
                  "name": b"Admins",
                  "x": "z", "y": "a",
                  "objectUUID": b"0123456789abcdef"}
        try:
            self.l.add(record)
        except ldb.LdbError as err:
            self.assertEqual(err.args[0], ldb.ERR_CONSTRAINT_VIOLATION)
        else:
            self.fail("Should have failed adding dupliate GUID")

    def test_duplicate_name_dup_GUID(self):
        """Adding the very same DN and objectUUID twice reports that
        the entry already exists"""
        self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG",
                    "name": b"Admins",
                    "x": "z", "y": "a",
                    "objectUUID": b"a123456789abcdef"})
        repeat = {"dn": "OU=ADMIN,DC=SAMBA,DC=ORG",
                  "name": b"Admins",
                  "x": "z", "y": "a",
                  "objectUUID": b"a123456789abcdef"}
        try:
            self.l.add(repeat)
        except ldb.LdbError as err:
            self.assertEqual(err.args[0], ldb.ERR_ENTRY_ALREADY_EXISTS)
        else:
            self.fail("Should have failed adding dupliate GUID")

    def test_duplicate_name_dup_GUID2(self):
        """A rejected add of an existing DN must not leave its new
        objectUUID behind in the index"""
        self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG",
                    "name": b"Admins",
                    "x": "z", "y": "a",
                    "objectUUID": b"abc3456789abcdef"})
        same_dn = {"dn": "OU=ADMIN,DC=SAMBA,DC=ORG",
                   "name": b"Admins",
                   "x": "z", "y": "a",
                   "objectUUID": b"aaa3456789abcdef"}
        try:
            self.l.add(same_dn)
        except ldb.LdbError as err:
            self.assertEqual(err.args[0], ldb.ERR_ENTRY_ALREADY_EXISTS)
        else:
            self.fail("Should have failed adding dupliate DN")

        # The objectUUID from the rejected add must still be usable
        self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
                    "name": b"Admins",
                    "x": "z", "y": "a",
                    "objectUUID": b"aaa3456789abcdef"})

    def test_add_dup_guid_add(self):
        """A DN rejected for a duplicate objectUUID can be added
        afterwards with a different objectUUID"""
        self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
                    "name": b"Admins",
                    "x": "z", "y": "a",
                    "objectUUID": b"0123456789abcde1"})
        clashing = {"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
                    "name": b"Admins",
                    "x": "z", "y": "a",
                    "objectUUID": b"0123456789abcde1"}
        try:
            self.l.add(clashing)
        except ldb.LdbError as err:
            self.assertEqual(err.args[0], ldb.ERR_CONSTRAINT_VIOLATION)
        else:
            self.fail("Should have failed on duplicate GUID")

        self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
                    "name": b"Admins",
                    "x": "z", "y": "a",
                    "objectUUID": b"0123456789abcde2"})
1486
class GUIDIndexedAddModifyTests(IndexedAddModifyTests):
    """Repeat the indexed add/modify tests with an @INDEXLIST that
       declares @IDXGUID and @IDX_DN_GUID"""
    def setUp(self):
        """Define the GUID index record, then run the inherited setUp"""
        guid_index = {"dn": "@INDEXLIST",
                      "@IDXATTR": [b"x", b"y", b"ou"],
                      "@IDXONE": [b"1"],
                      "@IDXGUID": [b"objectUUID"],
                      "@IDX_DN_GUID": [b"GUID"]}
        # Assigned before delegating, so the parent's default index
        # (only used when self.index is missing) is not applied
        self.index = guid_index
        super(GUIDIndexedAddModifyTests, self).setUp()
1497
1498
class GUIDTransIndexedAddModifyTests(GUIDIndexedAddModifyTests):
    """Test GUID index behaviour inside a transaction"""
    def setUp(self):
        """Run the inherited setUp, then open a transaction"""
        super(GUIDTransIndexedAddModifyTests, self).setUp()
        # Every test body then runs with this transaction open
        self.l.transaction_start()

    def tearDown(self):
        """Commit the transaction before the inherited tearDown"""
        self.l.transaction_commit()
        super(GUIDTransIndexedAddModifyTests, self).tearDown()
1508
class TransIndexedAddModifyTests(IndexedAddModifyTests):
    """Test index behaviour inside a transaction"""
    def setUp(self):
        """Run the inherited setUp, then open a transaction"""
        super(TransIndexedAddModifyTests, self).setUp()
        # Every test body then runs with this transaction open
        self.l.transaction_start()

    def tearDown(self):
        """Commit the transaction before the inherited tearDown"""
        self.l.transaction_commit()
        super(TransIndexedAddModifyTests, self).tearDown()
1518
class GuidIndexedAddModifyTestsLmdb(GUIDIndexedAddModifyTests):
    """The GUIDIndexedAddModifyTests cases, run with the mdb:// URL
       prefix"""

    def setUp(self):
        """Select MDB_PREFIX, then run the inherited setUp"""
        # The prefix is assigned first so the inherited setUp sees it
        self.prefix = MDB_PREFIX
        super(GuidIndexedAddModifyTestsLmdb, self).setUp()

    def tearDown(self):
        """Delegate straight to the inherited tearDown"""
        super(GuidIndexedAddModifyTestsLmdb, self).tearDown()
1527
class GuidTransIndexedAddModifyTestsLmdb(GUIDTransIndexedAddModifyTests):
    """The GUIDTransIndexedAddModifyTests cases, run with the mdb://
       URL prefix"""

    def setUp(self):
        """Select MDB_PREFIX, then run the inherited setUp"""
        # The prefix is assigned first so the inherited setUp sees it
        self.prefix = MDB_PREFIX
        super(GuidTransIndexedAddModifyTestsLmdb, self).setUp()

    def tearDown(self):
        """Delegate straight to the inherited tearDown"""
        super(GuidTransIndexedAddModifyTestsLmdb, self).tearDown()
1536
1537 class BadIndexTests(LdbBaseTest):
1538     def setUp(self):
1539         super(BadIndexTests, self).setUp()
1540         self.testdir = tempdir()
1541         self.filename = os.path.join(self.testdir, "test.ldb")
1542         self.ldb = ldb.Ldb(self.url(), flags=self.flags())
1543         if hasattr(self, 'IDXGUID'):
1544             self.ldb.add({"dn": "@INDEXLIST",
1545                           "@IDXATTR": [b"x", b"y", b"ou"],
1546                           "@IDXGUID": [b"objectUUID"],
1547                           "@IDX_DN_GUID": [b"GUID"]})
1548         else:
1549             self.ldb.add({"dn": "@INDEXLIST",
1550                           "@IDXATTR": [b"x", b"y", b"ou"]})
1551
1552         super(BadIndexTests, self).setUp()
1553
1554     def test_unique(self):
1555         self.ldb.add({"dn": "x=x,dc=samba,dc=org",
1556                       "objectUUID": b"0123456789abcde1",
1557                       "y": "1"})
1558         self.ldb.add({"dn": "x=y,dc=samba,dc=org",
1559                       "objectUUID": b"0123456789abcde2",
1560                       "y": "1"})
1561         self.ldb.add({"dn": "x=z,dc=samba,dc=org",
1562                       "objectUUID": b"0123456789abcde3",
1563                       "y": "1"})
1564
1565         res = self.ldb.search(expression="(y=1)",
1566                               base="dc=samba,dc=org")
1567         self.assertEquals(len(res), 3)
1568
1569         # Now set this to unique index, but forget to check the result
1570         try:
1571             self.ldb.add({"dn": "@ATTRIBUTES",
1572                         "y": "UNIQUE_INDEX"})
1573             self.fail()
1574         except ldb.LdbError:
1575             pass
1576
1577         # We must still have a working index
1578         res = self.ldb.search(expression="(y=1)",
1579                               base="dc=samba,dc=org")
1580         self.assertEquals(len(res), 3)
1581
1582     def test_unique_transaction(self):
1583         self.ldb.add({"dn": "x=x,dc=samba,dc=org",
1584                       "objectUUID": b"0123456789abcde1",
1585                       "y": "1"})
1586         self.ldb.add({"dn": "x=y,dc=samba,dc=org",
1587                       "objectUUID": b"0123456789abcde2",
1588                       "y": "1"})
1589         self.ldb.add({"dn": "x=z,dc=samba,dc=org",
1590                       "objectUUID": b"0123456789abcde3",
1591                       "y": "1"})
1592
1593         res = self.ldb.search(expression="(y=1)",
1594                               base="dc=samba,dc=org")
1595         self.assertEquals(len(res), 3)
1596
1597         self.ldb.transaction_start()
1598
1599         # Now set this to unique index, but forget to check the result
1600         try:
1601             self.ldb.add({"dn": "@ATTRIBUTES",
1602                         "y": "UNIQUE_INDEX"})
1603         except ldb.LdbError:
1604             pass
1605
1606         try:
1607             self.ldb.transaction_commit()
1608             self.fail()
1609
1610         except ldb.LdbError as err:
1611             enum = err.args[0]
1612             self.assertEqual(enum, ldb.ERR_OPERATIONS_ERROR)
1613
1614         # We must still have a working index
1615         res = self.ldb.search(expression="(y=1)",
1616                               base="dc=samba,dc=org")
1617
1618         self.assertEquals(len(res), 3)
1619
1620     def test_casefold(self):
1621         self.ldb.add({"dn": "x=x,dc=samba,dc=org",
1622                       "objectUUID": b"0123456789abcde1",
1623                       "y": "a"})
1624         self.ldb.add({"dn": "x=y,dc=samba,dc=org",
1625                       "objectUUID": b"0123456789abcde2",
1626                       "y": "A"})
1627         self.ldb.add({"dn": "x=z,dc=samba,dc=org",
1628                       "objectUUID": b"0123456789abcde3",
1629                       "y": ["a", "A"]})
1630
1631         res = self.ldb.search(expression="(y=a)",
1632                               base="dc=samba,dc=org")
1633         self.assertEquals(len(res), 2)
1634
1635         self.ldb.add({"dn": "@ATTRIBUTES",
1636                       "y": "CASE_INSENSITIVE"})
1637
1638         # We must still have a working index
1639         res = self.ldb.search(expression="(y=a)",
1640                               base="dc=samba,dc=org")
1641
1642         if hasattr(self, 'IDXGUID'):
1643             self.assertEquals(len(res), 3)
1644         else:
1645             # We should not return this entry twice, but sadly
1646             # we have not yet fixed
1647             # https://bugzilla.samba.org/show_bug.cgi?id=13361
1648             self.assertEquals(len(res), 4)
1649
1650     def test_casefold_transaction(self):
1651         self.ldb.add({"dn": "x=x,dc=samba,dc=org",
1652                       "objectUUID": b"0123456789abcde1",
1653                       "y": "a"})
1654         self.ldb.add({"dn": "x=y,dc=samba,dc=org",
1655                       "objectUUID": b"0123456789abcde2",
1656                       "y": "A"})
1657         self.ldb.add({"dn": "x=z,dc=samba,dc=org",
1658                       "objectUUID": b"0123456789abcde3",
1659                       "y": ["a", "A"]})
1660
1661         res = self.ldb.search(expression="(y=a)",
1662                               base="dc=samba,dc=org")
1663         self.assertEquals(len(res), 2)
1664
1665         self.ldb.transaction_start()
1666
1667         self.ldb.add({"dn": "@ATTRIBUTES",
1668                       "y": "CASE_INSENSITIVE"})
1669
1670         self.ldb.transaction_commit()
1671
1672         # We must still have a working index
1673         res = self.ldb.search(expression="(y=a)",
1674                               base="dc=samba,dc=org")
1675
1676         if hasattr(self, 'IDXGUID'):
1677             self.assertEquals(len(res), 3)
1678         else:
1679             # We should not return this entry twice, but sadly
1680             # we have not yet fixed
1681             # https://bugzilla.samba.org/show_bug.cgi?id=13361
1682             self.assertEquals(len(res), 4)
1683
1684
    def tearDown(self):
        """Run the inherited teardown; nothing else is cleaned up here."""
        super(BadIndexTests, self).tearDown()
1687
1688
class GUIDBadIndexTests(BadIndexTests):
    """Run the BadIndexTests cases with the GUID index mode enabled."""
    def setUp(self):
        # The inherited setUp and the casefold tests check
        # hasattr(self, 'IDXGUID'), so the flag has to exist before the
        # parent setUp runs.
        self.IDXGUID = True

        super(GUIDBadIndexTests, self).setUp()
1695
class DnTests(TestCase):
    """Tests for ldb.Dn: parsing, comparison, components and extensions.

    Every test gets a fresh in-memory ldb.Ldb() as self.ldb, which is only
    used as the context object required to construct Dn instances.
    """

    def setUp(self):
        super(DnTests, self).setUp()
        self.ldb = ldb.Ldb()

    def tearDown(self):
        super(DnTests, self).tearDown()
        del self.ldb

    def test_set_dn_invalid(self):
        x = ldb.Message()

        def assign():
            x.dn = "astring"
        self.assertRaises(TypeError, assign)

    def test_eq(self):
        x = ldb.Dn(self.ldb, "dc=foo11,bar=bloe")
        y = ldb.Dn(self.ldb, "dc=foo11,bar=bloe")
        self.assertEqual(x, y)
        y = ldb.Dn(self.ldb, "dc=foo11,bar=blie")
        self.assertNotEqual(x, y)

    def test_str(self):
        x = ldb.Dn(self.ldb, "dc=foo12,bar=bloe")
        self.assertEqual(str(x), "dc=foo12,bar=bloe")

    def test_repr(self):
        x = ldb.Dn(self.ldb, "dc=foo13,bla=blie")
        self.assertEqual(repr(x), "Dn('dc=foo13,bla=blie')")

    def test_get_casefold_mixed(self):
        # This test used to be called test_get_casefold as well, so the
        # later definition of that name replaced it and it never ran.
        x = ldb.Dn(self.ldb, "dc=foo14,bar=bloe")
        self.assertEqual(x.get_casefold(), "DC=FOO14,BAR=bloe")

    def test_validate(self):
        x = ldb.Dn(self.ldb, "dc=foo15,bar=bloe")
        self.assertTrue(x.validate())

    def test_parent(self):
        x = ldb.Dn(self.ldb, "dc=foo16,bar=bloe")
        self.assertEqual("bar=bloe", str(x.parent()))

    def test_parent_nonexistent(self):
        x = ldb.Dn(self.ldb, "@BLA")
        self.assertEqual(None, x.parent())

    def test_is_valid(self):
        x = ldb.Dn(self.ldb, "dc=foo18,dc=bloe")
        self.assertTrue(x.is_valid())
        x = ldb.Dn(self.ldb, "")
        self.assertTrue(x.is_valid())

    def test_is_special(self):
        x = ldb.Dn(self.ldb, "dc=foo19,bar=bloe")
        self.assertFalse(x.is_special())
        x = ldb.Dn(self.ldb, "@FOOBAR")
        self.assertTrue(x.is_special())

    def test_check_special(self):
        x = ldb.Dn(self.ldb, "dc=foo20,bar=bloe")
        self.assertFalse(x.check_special("FOOBAR"))
        x = ldb.Dn(self.ldb, "@FOOBAR")
        self.assertTrue(x.check_special("@FOOBAR"))

    def test_len(self):
        x = ldb.Dn(self.ldb, "dc=foo21,bar=bloe")
        self.assertEqual(2, len(x))
        x = ldb.Dn(self.ldb, "dc=foo21")
        self.assertEqual(1, len(x))

    def test_add_child(self):
        x = ldb.Dn(self.ldb, "dc=foo22,bar=bloe")
        self.assertTrue(x.add_child(ldb.Dn(self.ldb, "bla=bloe")))
        self.assertEqual("bla=bloe,dc=foo22,bar=bloe", str(x))

    def test_add_base(self):
        x = ldb.Dn(self.ldb, "dc=foo23,bar=bloe")
        base = ldb.Dn(self.ldb, "bla=bloe")
        self.assertTrue(x.add_base(base))
        self.assertEqual("dc=foo23,bar=bloe,bla=bloe", str(x))

    def test_add_child_str(self):
        x = ldb.Dn(self.ldb, "dc=foo22,bar=bloe")
        self.assertTrue(x.add_child("bla=bloe"))
        self.assertEqual("bla=bloe,dc=foo22,bar=bloe", str(x))

    def test_add_base_str(self):
        x = ldb.Dn(self.ldb, "dc=foo23,bar=bloe")
        base = "bla=bloe"
        self.assertTrue(x.add_base(base))
        self.assertEqual("dc=foo23,bar=bloe,bla=bloe", str(x))

    def test_add(self):
        x = ldb.Dn(self.ldb, "dc=foo24")
        y = ldb.Dn(self.ldb, "bar=bla")
        self.assertEqual("dc=foo24,bar=bla", str(x + y))

    def test_remove_base_components(self):
        x = ldb.Dn(self.ldb, "dc=foo24,dc=samba,dc=org")
        x.remove_base_components(len(x) - 1)
        self.assertEqual("dc=foo24", str(x))

    def test_parse_ldif(self):
        msgs = self.ldb.parse_ldif("dn: foo=bar\n")
        msg = next(msgs)
        # Each parsed item is indexed at [1] to reach the ldb.Message.
        self.assertEqual("foo=bar", str(msg[1].dn))
        self.assertTrue(isinstance(msg[1], ldb.Message))
        ldif = self.ldb.write_ldif(msg[1], ldb.CHANGETYPE_NONE)
        self.assertEqual("dn: foo=bar\n\n", ldif)

    def test_parse_ldif_more(self):
        msgs = self.ldb.parse_ldif("dn: foo=bar\n\n\ndn: bar=bar")
        msg = next(msgs)
        self.assertEqual("foo=bar", str(msg[1].dn))
        msg = next(msgs)
        self.assertEqual("bar=bar", str(msg[1].dn))

    def test_canonical_string(self):
        x = ldb.Dn(self.ldb, "dc=foo25,bar=bloe")
        self.assertEqual("/bloe/foo25", x.canonical_str())

    def test_canonical_ex_string(self):
        x = ldb.Dn(self.ldb, "dc=foo26,bar=bloe")
        self.assertEqual("/bloe\nfoo26", x.canonical_ex_str())

    def test_ldb_is_child_of(self):
        """Testing ldb_dn_compare_dn"""
        dn1 = ldb.Dn(self.ldb, "dc=base")
        dn2 = ldb.Dn(self.ldb, "cn=foo,dc=base")
        dn3 = ldb.Dn(self.ldb, "cn=bar,dc=base")
        dn4 = ldb.Dn(self.ldb, "cn=baz,cn=bar,dc=base")

        self.assertTrue(dn1.is_child_of(dn1))
        self.assertTrue(dn2.is_child_of(dn1))
        self.assertTrue(dn4.is_child_of(dn1))
        self.assertTrue(dn4.is_child_of(dn3))
        self.assertTrue(dn4.is_child_of(dn4))
        self.assertFalse(dn3.is_child_of(dn2))
        self.assertFalse(dn1.is_child_of(dn4))

    def test_ldb_is_child_of_str(self):
        """Testing ldb_dn_compare_dn"""
        dn1_str = "dc=base"
        dn2_str = "cn=foo,dc=base"
        dn3_str = "cn=bar,dc=base"
        dn4_str = "cn=baz,cn=bar,dc=base"

        dn1 = ldb.Dn(self.ldb, dn1_str)
        dn2 = ldb.Dn(self.ldb, dn2_str)
        dn3 = ldb.Dn(self.ldb, dn3_str)
        dn4 = ldb.Dn(self.ldb, dn4_str)

        self.assertTrue(dn1.is_child_of(dn1_str))
        self.assertTrue(dn2.is_child_of(dn1_str))
        self.assertTrue(dn4.is_child_of(dn1_str))
        self.assertTrue(dn4.is_child_of(dn3_str))
        self.assertTrue(dn4.is_child_of(dn4_str))
        self.assertFalse(dn3.is_child_of(dn2_str))
        self.assertFalse(dn1.is_child_of(dn4_str))

    def test_get_component_name(self):
        dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
        self.assertEqual(dn.get_component_name(0), 'cn')
        self.assertEqual(dn.get_component_name(1), 'dc')
        self.assertEqual(dn.get_component_name(2), None)
        self.assertEqual(dn.get_component_name(-1), None)

    def test_get_component_value(self):
        dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
        self.assertEqual(dn.get_component_value(0), 'foo')
        self.assertEqual(dn.get_component_value(1), 'base')
        # Out-of-range indexes: check the value accessor itself (the
        # earlier version re-tested get_component_name here).
        self.assertEqual(dn.get_component_value(2), None)
        self.assertEqual(dn.get_component_value(-1), None)

    def test_set_component(self):
        dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
        dn.set_component(0, 'cn', 'bar')
        self.assertEqual(str(dn), "cn=bar,dc=base")
        dn.set_component(1, 'o', 'asep')
        self.assertEqual(str(dn), "cn=bar,o=asep")
        # Index 2 is past the end; the DN must be left untouched.
        self.assertRaises(TypeError, dn.set_component, 2, 'dc', 'base')
        self.assertEqual(str(dn), "cn=bar,o=asep")
        dn.set_component(1, 'o', 'a,b+c')
        self.assertEqual(str(dn), r"cn=bar,o=a\,b\+c")

    def test_set_component_bytes(self):
        dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
        dn.set_component(0, 'cn', b'bar')
        self.assertEqual(str(dn), "cn=bar,dc=base")
        dn.set_component(1, 'o', b'asep')
        self.assertEqual(str(dn), "cn=bar,o=asep")

    def test_set_component_none(self):
        dn = ldb.Dn(self.ldb, "cn=foo,cn=bar,dc=base")
        self.assertRaises(TypeError, dn.set_component, 1, 'cn', None)

    def test_get_extended_component_null(self):
        dn = ldb.Dn(self.ldb, "cn=foo,cn=bar,dc=base")
        self.assertEqual(dn.get_extended_component("TEST"), None)

    def test_get_extended_component(self):
        self.ldb._register_test_extensions()
        dn = ldb.Dn(self.ldb, "<TEST=foo>;cn=bar,dc=base")
        self.assertEqual(dn.get_extended_component("TEST"), b"foo")

    def test_set_extended_component(self):
        self.ldb._register_test_extensions()
        dn = ldb.Dn(self.ldb, "dc=base")
        dn.set_extended_component("TEST", "foo")
        self.assertEqual(dn.get_extended_component("TEST"), b"foo")
        dn.set_extended_component("TEST", b"bar")
        self.assertEqual(dn.get_extended_component("TEST"), b"bar")

    def test_extended_str(self):
        self.ldb._register_test_extensions()
        dn = ldb.Dn(self.ldb, "<TEST=foo>;cn=bar,dc=base")
        self.assertEqual(dn.extended_str(), "<TEST=foo>;cn=bar,dc=base")

    def test_get_rdn_name(self):
        dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
        self.assertEqual(dn.get_rdn_name(), 'cn')

    def test_get_rdn_value(self):
        dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
        self.assertEqual(dn.get_rdn_value(), 'foo')

    def test_get_casefold(self):
        dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
        self.assertEqual(dn.get_casefold(), 'CN=FOO,DC=BASE')

    def test_get_linearized(self):
        dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
        self.assertEqual(dn.get_linearized(), 'cn=foo,dc=base')

    def test_is_null(self):
        dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
        self.assertFalse(dn.is_null())

        dn = ldb.Dn(self.ldb, '')
        self.assertTrue(dn.is_null())
1937
class LdbMsgTests(TestCase):
    """Tests for ldb.Message and its text view (Message.text).

    Each test starts from an empty ldb.Message() stored in self.msg.  The
    *_text variants repeat a check using str values and the .text
    accessors instead of bytes.
    """

    def setUp(self):
        super(LdbMsgTests, self).setUp()
        self.msg = ldb.Message()

    def test_init_dn(self):
        self.msg = ldb.Message(ldb.Dn(ldb.Ldb(), "dc=foo27"))
        self.assertEqual("dc=foo27", str(self.msg.dn))

    def test_iter_items(self):
        self.assertEqual(0, len(self.msg.items()))
        self.msg.dn = ldb.Dn(ldb.Ldb(), "dc=foo28")
        self.assertEqual(1, len(self.msg.items()))

    def test_repr(self):
        self.msg.dn = ldb.Dn(ldb.Ldb(), "dc=foo29")
        self.msg["dc"] = b"foo"
        if PY3:
            # Either key order is accepted on Python 3.
            self.assertIn(repr(self.msg), [
                "Message({'dn': Dn('dc=foo29'), 'dc': MessageElement([b'foo'])})",
                "Message({'dc': MessageElement([b'foo']), 'dn': Dn('dc=foo29')})",
            ])
            self.assertIn(repr(self.msg.text), [
                "Message({'dn': Dn('dc=foo29'), 'dc': MessageElement([b'foo'])}).text",
                "Message({'dc': MessageElement([b'foo']), 'dn': Dn('dc=foo29')}).text",
            ])
        else:
            self.assertEqual(
                repr(self.msg),
                "Message({'dn': Dn('dc=foo29'), 'dc': MessageElement(['foo'])})")
            self.assertEqual(
                repr(self.msg.text),
                "Message({'dn': Dn('dc=foo29'), 'dc': MessageElement(['foo'])}).text")

    def test_len(self):
        self.assertEqual(0, len(self.msg))

    def test_notpresent(self):
        self.assertRaises(KeyError, lambda: self.msg["foo"])

    def test_del(self):
        # Deleting an attribute that was never set must not raise.
        del self.msg["foo"]

    def test_add(self):
        self.msg.add(ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla"))

    def test_add_text(self):
        self.msg.add(ldb.MessageElement(["456"], ldb.FLAG_MOD_ADD, "bla"))

    def test_elements_empty(self):
        self.assertEqual([], self.msg.elements())

    def test_elements(self):
        el = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
        self.msg.add(el)
        self.assertEqual([el], self.msg.elements())
        self.assertEqual([el.text], self.msg.text.elements())

    def test_add_value(self):
        self.assertEqual(0, len(self.msg))
        self.msg["foo"] = [b"foo"]
        self.assertEqual(1, len(self.msg))

    def test_add_value_text(self):
        self.assertEqual(0, len(self.msg))
        self.msg["foo"] = ["foo"]
        self.assertEqual(1, len(self.msg))

    def test_add_value_multiple(self):
        self.assertEqual(0, len(self.msg))
        self.msg["foo"] = [b"foo", b"bla"]
        self.assertEqual(1, len(self.msg))
        self.assertEqual([b"foo", b"bla"], list(self.msg["foo"]))

    def test_add_value_multiple_text(self):
        self.assertEqual(0, len(self.msg))
        self.msg["foo"] = ["foo", "bla"]
        self.assertEqual(1, len(self.msg))
        self.assertEqual(["foo", "bla"], list(self.msg.text["foo"]))

    def test_set_value(self):
        self.msg["foo"] = [b"fool"]
        self.assertEqual([b"fool"], list(self.msg["foo"]))
        self.msg["foo"] = [b"bar"]
        self.assertEqual([b"bar"], list(self.msg["foo"]))

    def test_set_value_text(self):
        self.msg["foo"] = ["fool"]
        self.assertEqual(["fool"], list(self.msg.text["foo"]))
        self.msg["foo"] = ["bar"]
        self.assertEqual(["bar"], list(self.msg.text["foo"]))

    def test_keys(self):
        self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
        self.msg["foo"] = [b"bla"]
        self.msg["bar"] = [b"bla"]
        self.assertEqual(["dn", "foo", "bar"], self.msg.keys())

    def test_keys_text(self):
        self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
        self.msg["foo"] = ["bla"]
        self.msg["bar"] = ["bla"]
        self.assertEqual(["dn", "foo", "bar"], self.msg.text.keys())

    def test_dn(self):
        self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
        self.assertEqual("@BASEINFO", str(self.msg.dn))

    def test_get_dn(self):
        self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
        self.assertEqual("@BASEINFO", str(self.msg.get("dn")))

    def test_dn_text(self):
        self.msg.text.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
        self.assertEqual("@BASEINFO", str(self.msg.dn))
        self.assertEqual("@BASEINFO", str(self.msg.text.dn))

    def test_get_dn_text(self):
        self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
        self.assertEqual("@BASEINFO", str(self.msg.get("dn")))
        self.assertEqual("@BASEINFO", str(self.msg.text.get("dn")))

    def test_get_invalid(self):
        self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
        self.assertRaises(TypeError, self.msg.get, 42)

    def test_get_other(self):
        self.msg["foo"] = [b"bar"]
        self.assertEqual(b"bar", self.msg.get("foo")[0])
        self.assertEqual(b"bar", self.msg.get("foo", idx=0))
        self.assertEqual(None, self.msg.get("foo", idx=1))
        self.assertEqual("", self.msg.get("foo", default='', idx=1))

    def test_get_other_text(self):
        self.msg["foo"] = ["bar"]
        self.assertEqual(["bar"], list(self.msg.text.get("foo")))
        self.assertEqual("bar", self.msg.text.get("foo")[0])
        self.assertEqual("bar", self.msg.text.get("foo", idx=0))
        # Out-of-range lookups go through the text view as well, so this
        # test does not just repeat test_get_other.
        self.assertEqual(None, self.msg.text.get("foo", idx=1))
        self.assertEqual("", self.msg.text.get("foo", default='', idx=1))

    def test_get_default(self):
        self.assertEqual(None, self.msg.get("tatayoyo", idx=0))
        self.assertEqual("anniecordie", self.msg.get("tatayoyo", "anniecordie"))

    def test_get_default_text(self):
        self.assertEqual(None, self.msg.text.get("tatayoyo", idx=0))
        self.assertEqual("anniecordie", self.msg.text.get("tatayoyo", "anniecordie"))

    def test_get_unknown(self):
        self.assertEqual(None, self.msg.get("lalalala"))

    def test_get_unknown_text(self):
        self.assertEqual(None, self.msg.text.get("lalalala"))

    def test_msg_diff(self):
        db = ldb.Ldb()
        msgs = db.parse_ldif("dn: foo=bar\nfoo: bar\nbaz: do\n\ndn: foo=bar\nfoo: bar\nbaz: dont\n")
        msg1 = next(msgs)[1]
        msg2 = next(msgs)[1]
        msgdiff = db.msg_diff(msg1, msg2)
        self.assertEqual("foo=bar", str(msgdiff.get("dn")))
        # "foo" is identical in both messages, so it is absent from the diff.
        self.assertRaises(KeyError, lambda: msgdiff["foo"])
        self.assertEqual(1, len(msgdiff))

    def test_equal_empty(self):
        msg1 = ldb.Message()
        msg2 = ldb.Message()
        self.assertEqual(msg1, msg2)

    def test_equal_simplel(self):
        db = ldb.Ldb()
        msg1 = ldb.Message()
        msg1.dn = ldb.Dn(db, "foo=bar")
        msg2 = ldb.Message()
        msg2.dn = ldb.Dn(db, "foo=bar")
        self.assertEqual(msg1, msg2)
        msg1['foo'] = b'bar'
        msg2['foo'] = b'bar'
        self.assertEqual(msg1, msg2)
        msg2['foo'] = b'blie'
        self.assertNotEqual(msg1, msg2)

    def test_from_dict(self):
        rec = {"dn": "dc=fromdict",
               "a1": [b"a1-val1", b"a1-val1"]}
        db = ldb.Ldb()
        # check different types of input Flags
        for flags in [ldb.FLAG_MOD_ADD, ldb.FLAG_MOD_REPLACE, ldb.FLAG_MOD_DELETE]:
            m = ldb.Message.from_dict(db, rec, flags)
            self.assertEqual(rec["a1"], list(m["a1"]))
            self.assertEqual(flags, m["a1"].flags())
        # check input params
        self.assertRaises(TypeError, ldb.Message.from_dict, dict(), rec, ldb.FLAG_MOD_REPLACE)
        self.assertRaises(TypeError, ldb.Message.from_dict, db, list(), ldb.FLAG_MOD_REPLACE)
        self.assertRaises(ValueError, ldb.Message.from_dict, db, rec, 0)
        # Message.from_dict expects dictionary with 'dn'
        err_rec = {"a1": [b"a1-val1", b"a1-val1"]}
        self.assertRaises(TypeError, ldb.Message.from_dict, db, err_rec, ldb.FLAG_MOD_REPLACE)

    def test_from_dict_text(self):
        rec = {"dn": "dc=fromdict",
               "a1": ["a1-val1", "a1-val1"]}
        db = ldb.Ldb()
        # check different types of input Flags
        for flags in [ldb.FLAG_MOD_ADD, ldb.FLAG_MOD_REPLACE, ldb.FLAG_MOD_DELETE]:
            m = ldb.Message.from_dict(db, rec, flags)
            self.assertEqual(rec["a1"], list(m.text["a1"]))
            self.assertEqual(flags, m.text["a1"].flags())
        # check input params
        self.assertRaises(TypeError, ldb.Message.from_dict, dict(), rec, ldb.FLAG_MOD_REPLACE)
        self.assertRaises(TypeError, ldb.Message.from_dict, db, list(), ldb.FLAG_MOD_REPLACE)
        self.assertRaises(ValueError, ldb.Message.from_dict, db, rec, 0)
        # Message.from_dict expects dictionary with 'dn'
        err_rec = {"a1": ["a1-val1", "a1-val1"]}
        self.assertRaises(TypeError, ldb.Message.from_dict, db, err_rec, ldb.FLAG_MOD_REPLACE)

    def test_copy_add_message_element(self):
        m = ldb.Message()
        m["1"] = ldb.MessageElement([b"val 111"], ldb.FLAG_MOD_ADD, "1")
        m["2"] = ldb.MessageElement([b"val 222"], ldb.FLAG_MOD_ADD, "2")
        # Copy by item assignment ...
        mto = ldb.Message()
        mto["1"] = m["1"]
        mto["2"] = m["2"]
        self.assertEqual(mto["1"], m["1"])
        self.assertEqual(mto["2"], m["2"])
        # ... and by Message.add().
        mto = ldb.Message()
        mto.add(m["1"])
        mto.add(m["2"])
        self.assertEqual(mto["1"], m["1"])
        self.assertEqual(mto["2"], m["2"])

    def test_copy_add_message_element_text(self):
        m = ldb.Message()
        m["1"] = ldb.MessageElement(["val 111"], ldb.FLAG_MOD_ADD, "1")
        m["2"] = ldb.MessageElement(["val 222"], ldb.FLAG_MOD_ADD, "2")
        mto = ldb.Message()
        mto["1"] = m["1"]
        mto["2"] = m["2"]
        self.assertEqual(mto["1"], m.text["1"])
        self.assertEqual(mto["2"], m.text["2"])
        mto = ldb.Message()
        mto.add(m["1"])
        mto.add(m["2"])
        self.assertEqual(mto.text["1"], m.text["1"])
        self.assertEqual(mto.text["2"], m.text["2"])
        self.assertEqual(mto["1"], m["1"])
        self.assertEqual(mto["2"], m["2"])
2188
2189
class MessageElementTests(TestCase):
    """Tests for ldb.MessageElement and its text view (MessageElement.text)."""

    def test_cmp_element(self):
        x = ldb.MessageElement([b"foo"])
        y = ldb.MessageElement([b"foo"])
        z = ldb.MessageElement([b"bzr"])
        self.assertEqual(x, y)
        self.assertNotEqual(x, z)

    def test_cmp_element_text(self):
        # An element built from bytes equals one built from the same text.
        x = ldb.MessageElement([b"foo"])
        y = ldb.MessageElement(["foo"])
        self.assertEqual(x, y)

    def test_create_iterable(self):
        x = ldb.MessageElement([b"foo"])
        self.assertEqual([b"foo"], list(x))
        self.assertEqual(["foo"], list(x.text))

    def test_repr(self):
        # The expected repr differs between Python 2 and 3 only in the
        # b'' prefix of the values.
        x = ldb.MessageElement([b"foo"])
        if PY3:
            self.assertEqual("MessageElement([b'foo'])", repr(x))
            self.assertEqual("MessageElement([b'foo']).text", repr(x.text))
        else:
            self.assertEqual("MessageElement(['foo'])", repr(x))
            self.assertEqual("MessageElement(['foo']).text", repr(x.text))
        x = ldb.MessageElement([b"foo", b"bla"])
        self.assertEqual(2, len(x))
        if PY3:
            self.assertEqual("MessageElement([b'foo',b'bla'])", repr(x))
            self.assertEqual("MessageElement([b'foo',b'bla']).text", repr(x.text))
        else:
            self.assertEqual("MessageElement(['foo','bla'])", repr(x))
            self.assertEqual("MessageElement(['foo','bla']).text", repr(x.text))

    def test_get_item(self):
        x = ldb.MessageElement([b"foo", b"bar"])
        self.assertEqual(b"foo", x[0])
        self.assertEqual(b"bar", x[1])
        self.assertEqual(b"bar", x[-1])
        self.assertRaises(IndexError, lambda: x[45])

    def test_get_item_text(self):
        x = ldb.MessageElement(["foo", "bar"])
        self.assertEqual("foo", x.text[0])
        self.assertEqual("bar", x.text[1])
        self.assertEqual("bar", x.text[-1])
        # Index the text view here; the raw element is already covered by
        # test_get_item.
        self.assertRaises(IndexError, lambda: x.text[45])

    def test_len(self):
        x = ldb.MessageElement([b"foo", b"bar"])
        self.assertEqual(2, len(x))

    def test_eq(self):
        x = ldb.MessageElement([b"foo", b"bar"])
        y = ldb.MessageElement([b"foo", b"bar"])
        self.assertEqual(y, x)
        x = ldb.MessageElement([b"foo"])
        self.assertNotEqual(y, x)
        y = ldb.MessageElement([b"foo"])
        self.assertEqual(y, x)

    def test_extended(self):
        # Flags and attribute name passed to the constructor do not show
        # up in the repr.
        el = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
        if PY3:
            self.assertEqual("MessageElement([b'456'])", repr(el))
            self.assertEqual("MessageElement([b'456']).text", repr(el.text))
        else:
            self.assertEqual("MessageElement(['456'])", repr(el))
            self.assertEqual("MessageElement(['456']).text", repr(el.text))

    def test_bad_text(self):
        # Reading a value that cannot be decoded through the text view is
        # expected to raise UnicodeDecodeError.
        el = ldb.MessageElement(b'\xba\xdd')
        self.assertRaises(UnicodeDecodeError, el.text.__getitem__, 0)
2265
2266
class ModuleTests(TestCase):
    """Tests for registering and loading Python-implemented ldb modules.

    Each test works on a database file inside a fresh temporary directory,
    which is removed again in tearDown.
    """

    def setUp(self):
        super(ModuleTests, self).setUp()
        self.testdir = tempdir()
        self.filename = os.path.join(self.testdir, "test.ldb")
        self.ldb = ldb.Ldb(self.filename)

    def tearDown(self):
        shutil.rmtree(self.testdir)
        # Chain to the parent's tearDown (this used to call setUp by
        # mistake, so the inherited teardown never ran).
        super(ModuleTests, self).tearDown()

    def test_register_module(self):
        class ExampleModule:
            name = "example"
        ldb.register_module(ExampleModule)

    def test_use_module(self):
        # Records which module hooks were invoked.
        ops = []

        class ExampleModule:
            name = "bla"

            def __init__(self, ldb, next):
                ops.append("init")
                self.next = next

            def search(self, *args, **kwargs):
                return self.next.search(*args, **kwargs)

            def request(self, *args, **kwargs):
                pass

        ldb.register_module(ExampleModule)
        db = ldb.Ldb(self.filename)
        db.add({"dn": "@MODULES", "@LIST": "bla"})
        # Listing the module in @MODULES does not instantiate it yet ...
        self.assertEqual([], ops)
        # ... but opening the database again does.
        db = ldb.Ldb(self.filename)
        self.assertEqual(["init"], ops)
2305
2306 class LdbResultTests(LdbBaseTest):
2307
2308     def setUp(self):
2309         super(LdbResultTests, self).setUp()
2310         self.testdir = tempdir()
2311         self.filename = os.path.join(self.testdir, "test.ldb")
2312         self.l = ldb.Ldb(self.url(), flags=self.flags())
2313         try:
2314             self.l.add(self.index)
2315         except AttributeError:
2316             pass
2317         self.l.add({"dn": "DC=SAMBA,DC=ORG", "name": b"samba.org",
2318                     "objectUUID": b"0123456789abcde0"})
2319         self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG", "name": b"Admins",
2320                     "objectUUID": b"0123456789abcde1"})
2321         self.l.add({"dn": "OU=USERS,DC=SAMBA,DC=ORG", "name": b"Users",
2322                     "objectUUID": b"0123456789abcde2"})
2323         self.l.add({"dn": "OU=OU1,DC=SAMBA,DC=ORG", "name": b"OU #1",
2324                     "objectUUID": b"0123456789abcde3"})
2325         self.l.add({"dn": "OU=OU2,DC=SAMBA,DC=ORG", "name": b"OU #2",
2326                     "objectUUID": b"0123456789abcde4"})
2327         self.l.add({"dn": "OU=OU3,DC=SAMBA,DC=ORG", "name": b"OU #3",
2328                     "objectUUID": b"0123456789abcde5"})
2329         self.l.add({"dn": "OU=OU4,DC=SAMBA,DC=ORG", "name": b"OU #4",
2330                     "objectUUID": b"0123456789abcde6"})
2331         self.l.add({"dn": "OU=OU5,DC=SAMBA,DC=ORG", "name": b"OU #5",
2332                     "objectUUID": b"0123456789abcde7"})
2333         self.l.add({"dn": "OU=OU6,DC=SAMBA,DC=ORG", "name": b"OU #6",
2334                     "objectUUID": b"0123456789abcde8"})
2335         self.l.add({"dn": "OU=OU7,DC=SAMBA,DC=ORG", "name": b"OU #7",
2336                     "objectUUID": b"0123456789abcde9"})
2337         self.l.add({"dn": "OU=OU8,DC=SAMBA,DC=ORG", "name": b"OU #8",
2338                     "objectUUID": b"0123456789abcdea"})
2339         self.l.add({"dn": "OU=OU9,DC=SAMBA,DC=ORG", "name": b"OU #9",
2340                     "objectUUID": b"0123456789abcdeb"})
2341         self.l.add({"dn": "OU=OU10,DC=SAMBA,DC=ORG", "name": b"OU #10",
2342                     "objectUUID": b"0123456789abcdec"})
2343
2344     def tearDown(self):
2345         shutil.rmtree(self.testdir)
2346         super(LdbResultTests, self).tearDown()
2347         # Ensure the LDB is closed now, so we close the FD
2348         del(self.l)
2349
2350     def test_return_type(self):
2351         res = self.l.search()
2352         self.assertEqual(str(res), "<ldb result>")
2353
2354     def test_get_msgs(self):
2355         res = self.l.search()
2356         list = res.msgs
2357
2358     def test_get_controls(self):
2359         res = self.l.search()
2360         list = res.controls
2361
2362     def test_get_referals(self):
2363         res = self.l.search()
2364         list = res.referals
2365
2366     def test_iter_msgs(self):
2367         found = False
2368         for l in self.l.search().msgs:
2369             if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
2370                 found = True
2371         self.assertTrue(found)
2372
2373     def test_iter_msgs_count(self):
2374         self.assertTrue(self.l.search().count > 0)
2375         # 13 objects has been added to the DC=SAMBA, DC=ORG
2376         self.assertEqual(self.l.search(base="DC=SAMBA,DC=ORG").count, 13)
2377
2378     def test_iter_controls(self):
2379         res = self.l.search().controls
2380         it = iter(res)
2381
2382     def test_create_control(self):
2383         self.assertRaises(ValueError, ldb.Control, self.l, "tatayoyo:0")
2384         c = ldb.Control(self.l, "relax:1")
2385         self.assertEqual(c.critical, True)
2386         self.assertEqual(c.oid, "1.3.6.1.4.1.4203.666.5.12")
2387
2388     def test_iter_refs(self):
2389         res = self.l.search().referals
2390         it = iter(res)
2391
2392     def test_search_sequence_msgs(self):
2393         found = False
2394         res = self.l.search().msgs
2395
2396         for i in range(0, len(res)):
2397             l = res[i]
2398             if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
2399                 found = True
2400         self.assertTrue(found)
2401
2402     def test_search_as_iter(self):
2403         found = False
2404         res = self.l.search()
2405
2406         for l in res:
2407             if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
2408                 found = True
2409         self.assertTrue(found)
2410
2411     def test_search_iter(self):
2412         found = False
2413         res = self.l.search_iterator()
2414
2415         for l in res:
2416             if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
2417                 found = True
2418         self.assertTrue(found)
2419
2420
2421     # Show that search results can't see into a transaction
2422     def test_search_against_trans(self):
2423         found11 = False
2424
2425         (r1, w1) = os.pipe()
2426
2427         (r2, w2) = os.pipe()
2428
2429         # For the first element, fork a child that will
2430         # write to the DB
2431         pid = os.fork()
2432         if pid == 0:
2433             # In the child, re-open
2434             del(self.l)
2435             gc.collect()
2436
2437             child_ldb = ldb.Ldb(self.url(), flags=self.flags())
2438             # start a transaction
2439             child_ldb.transaction_start()
2440
2441             # write to it
2442             child_ldb.add({"dn": "OU=OU11,DC=SAMBA,DC=ORG",
2443                            "name": b"samba.org",
2444                            "objectUUID": b"o123456789acbdef"})
2445
2446             os.write(w1, b"added")
2447
2448             # Now wait for the search to be done
2449             os.read(r2, 6)
2450
2451             # and commit
2452             try:
2453                 child_ldb.transaction_commit()
2454             except LdbError as err:
2455                 # We print this here to see what went wrong in the child
2456                 print(err)
2457                 os._exit(1)
2458
2459             os.write(w1, b"transaction")
2460             os._exit(0)
2461
2462         self.assertEqual(os.read(r1, 5), b"added")
2463
2464         # This should not turn up until the transaction is concluded
2465         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
2466                             scope=ldb.SCOPE_BASE)
2467         self.assertEqual(len(res11), 0)
2468
2469         os.write(w2, b"search")
2470
2471         # Now wait for the transaction to be done.  This should
2472         # deadlock, but the search doesn't hold a read lock for the
2473         # iterator lifetime currently.
2474         self.assertEqual(os.read(r1, 11), b"transaction")
2475
2476         # This should now turn up, as the transaction is over
2477         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
2478                             scope=ldb.SCOPE_BASE)
2479         self.assertEqual(len(res11), 1)
2480
2481         self.assertFalse(found11)
2482
2483         (got_pid, status) = os.waitpid(pid, 0)
2484         self.assertEqual(got_pid, pid)
2485
2486
2487     def test_search_iter_against_trans(self):
2488         found = False
2489         found11 = False
2490
2491         # We need to hold this iterator open to hold the all-record
2492         # lock
2493         res = self.l.search_iterator()
2494
2495         (r1, w1) = os.pipe()
2496
2497         (r2, w2) = os.pipe()
2498
2499         # For the first element, with the sequence open (which
2500         # means with ldb locks held), fork a child that will
2501         # write to the DB
2502         pid = os.fork()
2503         if pid == 0:
2504             # In the child, re-open
2505             del(res)
2506             del(self.l)
2507             gc.collect()
2508
2509             child_ldb = ldb.Ldb(self.url(), flags=self.flags())
2510             # start a transaction
2511             child_ldb.transaction_start()
2512
2513             # write to it
2514             child_ldb.add({"dn": "OU=OU11,DC=SAMBA,DC=ORG",
2515                            "name": b"samba.org",
2516                            "objectUUID": b"o123456789acbdef"})
2517
2518             os.write(w1, b"added")
2519
2520             # Now wait for the search to be done
2521             os.read(r2, 6)
2522
2523             # and commit
2524             try:
2525                 child_ldb.transaction_commit()
2526             except LdbError as err:
2527                 # We print this here to see what went wrong in the child
2528                 print(err)
2529                 os._exit(1)
2530
2531             os.write(w1, b"transaction")
2532             os._exit(0)
2533
2534         self.assertEqual(os.read(r1, 5), b"added")
2535
2536         # This should not turn up until the transaction is concluded
2537         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
2538                             scope=ldb.SCOPE_BASE)
2539         self.assertEqual(len(res11), 0)
2540
2541         os.write(w2, b"search")
2542
2543         # allow the transaction to start
2544         time.sleep(1)
2545
2546         # This should not turn up until the search finishes and
2547         # removed the read lock, but for ldb_tdb that happened as soon
2548         # as we called the first res.next()
2549         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
2550                             scope=ldb.SCOPE_BASE)
2551         self.assertEqual(len(res11), 0)
2552
2553         # These results are all collected at the first next(res) call
2554         for l in res:
2555             if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
2556                 found = True
2557             if str(l.dn) == "OU=OU11,DC=SAMBA,DC=ORG":
2558                 found11 = True
2559
2560         # Now wait for the transaction to be done.
2561         self.assertEqual(os.read(r1, 11), b"transaction")
2562
2563         # This should now turn up, as the transaction is over and all
2564         # read locks are gone
2565         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
2566                             scope=ldb.SCOPE_BASE)
2567         self.assertEqual(len(res11), 1)
2568
2569         self.assertTrue(found)
2570         self.assertFalse(found11)
2571
2572         (got_pid, status) = os.waitpid(pid, 0)
2573         self.assertEqual(got_pid, pid)
2574
2575
class LdbResultTestsLmdb(LdbResultTests):
    """Re-run every LdbResultTests case with the mdb:// URL prefix."""

    def setUp(self):
        # The prefix has to be chosen before the inherited setUp runs,
        # because that is where the database URL is built and opened.
        self.prefix = MDB_PREFIX
        super(LdbResultTestsLmdb, self).setUp()

    def tearDown(self):
        # Nothing backend-specific to clean up; defer to the parent.
        super(LdbResultTestsLmdb, self).tearDown()
2584
2585
class BadTypeTests(TestCase):
    """Wrongly-typed arguments to the bindings must raise TypeError."""

    def test_control(self):
        db = ldb.Ldb()
        with self.assertRaises(TypeError):
            ldb.Control('<bad type>', 'relax:1')
        # NOTE(review): the first argument is the ldb *module*, not the
        # Ldb handle created above -- confirm that this is intended.
        with self.assertRaises(TypeError):
            ldb.Control(ldb, 1234)

    def test_modify(self):
        db = ldb.Ldb()
        msg = ldb.Message(ldb.Dn(db, 'a=b'))
        with self.assertRaises(TypeError):
            db.modify('<bad type>')
        with self.assertRaises(TypeError):
            db.modify(msg, '<bad type>')

    def test_add(self):
        db = ldb.Ldb()
        msg = ldb.Message(ldb.Dn(db, 'a=b'))
        with self.assertRaises(TypeError):
            db.add('<bad type>')
        with self.assertRaises(TypeError):
            db.add(msg, '<bad type>')

    def test_delete(self):
        # NOTE(review): despite its name this test calls add(), not
        # delete() -- looks like a copy/paste slip; confirm before changing.
        db = ldb.Ldb()
        dn = ldb.Dn(db, 'a=b')
        with self.assertRaises(TypeError):
            db.add('<bad type>')
        with self.assertRaises(TypeError):
            db.add(dn, '<bad type>')

    def test_rename(self):
        # NOTE(review): despite its name this test calls add(), not
        # rename() -- looks like a copy/paste slip; confirm before changing.
        db = ldb.Ldb()
        dn = ldb.Dn(db, 'a=b')
        with self.assertRaises(TypeError):
            db.add('<bad type>', dn)
        with self.assertRaises(TypeError):
            db.add(dn, '<bad type>')
        with self.assertRaises(TypeError):
            db.add(dn, dn, '<bad type>')

    def test_search(self):
        db = ldb.Ldb()
        with self.assertRaises(TypeError):
            db.search(base=1234)
        with self.assertRaises(TypeError):
            db.search(scope='<bad type>')
        with self.assertRaises(TypeError):
            db.search(expression=1234)
        with self.assertRaises(TypeError):
            db.search(attrs='<bad type>')
        with self.assertRaises(TypeError):
            db.search(controls='<bad type>')
2626
2627
class VersionTests(TestCase):
    """Checks on the module-level version attribute."""

    def test_version(self):
        """``ldb.__version__`` is exposed as a ``str``."""
        self.assertIsInstance(ldb.__version__, str)
2632
2633
# Run the whole suite when this file is executed as a script.
if __name__ == '__main__':
    import unittest
    unittest.main()