SQ??? ldb_mdb/tests: Run api and index test also on lmdb WHY not the same dict for...
[metze/samba/wip.git] / lib / ldb / tests / python / api.py
1 #!/usr/bin/env python
2 # Simple tests for the ldb python bindings.
3 # Copyright (C) 2007 Jelmer Vernooij <jelmer@samba.org>
4
5 import os
6 from unittest import TestCase
7 import sys
8 import gc
9 import time
10 import ldb
11 import shutil
12
13 PY3 = sys.version_info > (3, 0)
14
15 TDB_PREFIX = "tdb://"
16 MDB_PREFIX = "mdb://"
17
18
19 def tempdir():
20     import tempfile
21     try:
22         dir_prefix = os.path.join(os.environ["SELFTEST_PREFIX"], "tmp")
23     except KeyError:
24         dir_prefix = None
25     return tempfile.mkdtemp(dir=dir_prefix)
26
27
28 class NoContextTests(TestCase):
29
30     def test_valid_attr_name(self):
31         self.assertTrue(ldb.valid_attr_name("foo"))
32         self.assertFalse(ldb.valid_attr_name("24foo"))
33
34     def test_timestring(self):
35         self.assertEqual("19700101000000.0Z", ldb.timestring(0))
36         self.assertEqual("20071119191012.0Z", ldb.timestring(1195499412))
37
38     def test_string_to_time(self):
39         self.assertEqual(0, ldb.string_to_time("19700101000000.0Z"))
40         self.assertEqual(1195499412, ldb.string_to_time("20071119191012.0Z"))
41
42     def test_binary_encode(self):
43         encoded = ldb.binary_encode(b'test\\x')
44         decoded = ldb.binary_decode(encoded)
45         self.assertEqual(decoded, b'test\\x')
46
47         encoded2 = ldb.binary_encode('test\\x')
48         self.assertEqual(encoded2, encoded)
49
50
51 class LdbBaseTest(TestCase):
52     def setUp(self):
53         super(LdbBaseTest, self).setUp()
54         try:
55             if self.prefix is None:
56                 self.prefix = TDB_PREFIX
57         except AttributeError:
58             self.prefix = TDB_PREFIX
59
60     def tearDown(self):
61         super(LdbBaseTest, self).tearDown()
62
63     def url(self):
64         return self.prefix + self.filename
65
66     def flags(self):
67         if self.prefix == MDB_PREFIX:
68             return ldb.FLG_NOSYNC
69         else:
70             return 0
71
72
73 class SimpleLdb(LdbBaseTest):
74
75     def setUp(self):
76         super(SimpleLdb, self).setUp()
77         self.testdir = tempdir()
78         self.filename = os.path.join(self.testdir, "test.ldb")
79         self.ldb = ldb.Ldb(self.url(), flags=self.flags())
80         try:
81             self.ldb.add(self.index)
82         except AttributeError:
83             pass
84
85     def tearDown(self):
86         shutil.rmtree(self.testdir)
87         super(SimpleLdb, self).tearDown()
88         # Ensure the LDB is closed now, so we close the FD
89         del(self.ldb)
90
91     def test_connect(self):
92         ldb.Ldb(self.url(), flags=self.flags())
93
94     def test_connect_none(self):
95         ldb.Ldb()
96
97     def test_connect_later(self):
98         x = ldb.Ldb()
99         x.connect(self.url(), flags=self.flags())
100
101     def test_repr(self):
102         x = ldb.Ldb()
103         self.assertTrue(repr(x).startswith("<ldb connection"))
104
105     def test_set_create_perms(self):
106         x = ldb.Ldb()
107         x.set_create_perms(0o600)
108
109     def test_modules_none(self):
110         x = ldb.Ldb()
111         self.assertEqual([], x.modules())
112
113     def test_modules_tdb(self):
114         x = ldb.Ldb(self.url(), flags=self.flags())
115         self.assertEqual("[<ldb module 'tdb'>]", repr(x.modules()))
116
117     def test_firstmodule_none(self):
118         x = ldb.Ldb()
119         self.assertEqual(x.firstmodule, None)
120
121     def test_firstmodule_tdb(self):
122         x = ldb.Ldb(self.url(), flags=self.flags())
123         mod = x.firstmodule
124         self.assertEqual(repr(mod), "<ldb module 'tdb'>")
125
126     def test_search(self):
127         l = ldb.Ldb(self.url(), flags=self.flags())
128         self.assertEqual(len(l.search()), 0)
129
130     def test_search_controls(self):
131         l = ldb.Ldb(self.url(), flags=self.flags())
132         self.assertEqual(len(l.search(controls=["paged_results:0:5"])), 0)
133
134     def test_search_attrs(self):
135         l = ldb.Ldb(self.url(), flags=self.flags())
136         self.assertEqual(len(l.search(ldb.Dn(l, ""), ldb.SCOPE_SUBTREE, "(dc=*)", ["dc"])), 0)
137
138     def test_search_string_dn(self):
139         l = ldb.Ldb(self.url(), flags=self.flags())
140         self.assertEqual(len(l.search("", ldb.SCOPE_SUBTREE, "(dc=*)", ["dc"])), 0)
141
142     def test_search_attr_string(self):
143         l = ldb.Ldb(self.url(), flags=self.flags())
144         self.assertRaises(TypeError, l.search, attrs="dc")
145         self.assertRaises(TypeError, l.search, attrs=b"dc")
146
147     def test_opaque(self):
148         l = ldb.Ldb(self.url(), flags=self.flags())
149         l.set_opaque("my_opaque", l)
150         self.assertTrue(l.get_opaque("my_opaque") is not None)
151         self.assertEqual(None, l.get_opaque("unknown"))
152
153     def test_search_scope_base_empty_db(self):
154         l = ldb.Ldb(self.url(), flags=self.flags())
155         self.assertEqual(len(l.search(ldb.Dn(l, "dc=foo1"),
156                           ldb.SCOPE_BASE)), 0)
157
158     def test_search_scope_onelevel_empty_db(self):
159         l = ldb.Ldb(self.url(), flags=self.flags())
160         self.assertEqual(len(l.search(ldb.Dn(l, "dc=foo1"),
161                           ldb.SCOPE_ONELEVEL)), 0)
162
163     def test_delete(self):
164         l = ldb.Ldb(self.url(), flags=self.flags())
165         self.assertRaises(ldb.LdbError, lambda: l.delete(ldb.Dn(l, "dc=foo2")))
166
167     def test_delete_w_unhandled_ctrl(self):
168         l = ldb.Ldb(self.url(), flags=self.flags())
169         m = ldb.Message()
170         m.dn = ldb.Dn(l, "dc=foo1")
171         m["b"] = [b"a"]
172         m["objectUUID"] = b"0123456789abcdef"
173         l.add(m)
174         self.assertRaises(ldb.LdbError, lambda: l.delete(m.dn, ["search_options:1:2"]))
175         l.delete(m.dn)
176
177     def test_contains(self):
178         name = self.url()
179         l = ldb.Ldb(name, flags=self.flags())
180         self.assertFalse(ldb.Dn(l, "dc=foo3") in l)
181         l = ldb.Ldb(name, flags=self.flags())
182         m = ldb.Message()
183         m.dn = ldb.Dn(l, "dc=foo3")
184         m["b"] = ["a"]
185         m["objectUUID"] = b"0123456789abcdef"
186         l.add(m)
187         try:
188             self.assertTrue(ldb.Dn(l, "dc=foo3") in l)
189             self.assertFalse(ldb.Dn(l, "dc=foo4") in l)
190         finally:
191             l.delete(m.dn)
192
193     def test_get_config_basedn(self):
194         l = ldb.Ldb(self.url(), flags=self.flags())
195         self.assertEqual(None, l.get_config_basedn())
196
197     def test_get_root_basedn(self):
198         l = ldb.Ldb(self.url(), flags=self.flags())
199         self.assertEqual(None, l.get_root_basedn())
200
201     def test_get_schema_basedn(self):
202         l = ldb.Ldb(self.url(), flags=self.flags())
203         self.assertEqual(None, l.get_schema_basedn())
204
205     def test_get_default_basedn(self):
206         l = ldb.Ldb(self.url(), flags=self.flags())
207         self.assertEqual(None, l.get_default_basedn())
208
209     def test_add(self):
210         l = ldb.Ldb(self.url(), flags=self.flags())
211         m = ldb.Message()
212         m.dn = ldb.Dn(l, "dc=foo4")
213         m["bla"] = b"bla"
214         m["objectUUID"] = b"0123456789abcdef"
215         self.assertEqual(len(l.search()), 0)
216         l.add(m)
217         try:
218             self.assertEqual(len(l.search()), 1)
219         finally:
220             l.delete(ldb.Dn(l, "dc=foo4"))
221
222     def test_search_iterator(self):
223         l = ldb.Ldb(self.url(), flags=self.flags())
224         s = l.search_iterator()
225         s.abandon()
226         try:
227             for me in s:
228                 self.fail()
229             self.fail()
230         except RuntimeError as re:
231             pass
232         try:
233             s.abandon()
234             self.fail()
235         except RuntimeError as re:
236             pass
237         try:
238             s.result()
239             self.fail()
240         except RuntimeError as re:
241             pass
242
243         s = l.search_iterator()
244         count = 0
245         for me in s:
246             self.assertTrue(isinstance(me, ldb.Message))
247             count += 1
248         r = s.result()
249         self.assertEqual(len(r), 0)
250         self.assertEqual(count, 0)
251
252         m1 = ldb.Message()
253         m1.dn = ldb.Dn(l, "dc=foo4")
254         m1["bla"] = b"bla"
255         m1["objectUUID"] = b"0123456789abcdef"
256         l.add(m1)
257         try:
258             s = l.search_iterator()
259             msgs = []
260             for me in s:
261                 self.assertTrue(isinstance(me, ldb.Message))
262                 count += 1
263                 msgs.append(me)
264             r = s.result()
265             self.assertEqual(len(r), 0)
266             self.assertEqual(len(msgs), 1)
267             self.assertEqual(msgs[0].dn, m1.dn)
268
269             m2 = ldb.Message()
270             m2.dn = ldb.Dn(l, "dc=foo5")
271             m2["bla"] = b"bla"
272             m2["objectUUID"] = b"0123456789abcdee"
273             l.add(m2)
274
275             s = l.search_iterator()
276             msgs = []
277             for me in s:
278                 self.assertTrue(isinstance(me, ldb.Message))
279                 count += 1
280                 msgs.append(me)
281             r = s.result()
282             self.assertEqual(len(r), 0)
283             self.assertEqual(len(msgs), 2)
284             if msgs[0].dn == m1.dn:
285                 self.assertEqual(msgs[0].dn, m1.dn)
286                 self.assertEqual(msgs[1].dn, m2.dn)
287             else:
288                 self.assertEqual(msgs[0].dn, m2.dn)
289                 self.assertEqual(msgs[1].dn, m1.dn)
290
291             s = l.search_iterator()
292             msgs = []
293             for me in s:
294                 self.assertTrue(isinstance(me, ldb.Message))
295                 count += 1
296                 msgs.append(me)
297                 break
298             try:
299                 s.result()
300                 self.fail()
301             except RuntimeError as re:
302                 pass
303             for me in s:
304                 self.assertTrue(isinstance(me, ldb.Message))
305                 count += 1
306                 msgs.append(me)
307                 break
308             for me in s:
309                 self.fail()
310
311             r = s.result()
312             self.assertEqual(len(r), 0)
313             self.assertEqual(len(msgs), 2)
314             if msgs[0].dn == m1.dn:
315                 self.assertEqual(msgs[0].dn, m1.dn)
316                 self.assertEqual(msgs[1].dn, m2.dn)
317             else:
318                 self.assertEqual(msgs[0].dn, m2.dn)
319                 self.assertEqual(msgs[1].dn, m1.dn)
320         finally:
321             l.delete(ldb.Dn(l, "dc=foo4"))
322             l.delete(ldb.Dn(l, "dc=foo5"))
323
324     def test_add_text(self):
325         l = ldb.Ldb(self.url(), flags=self.flags())
326         m = ldb.Message()
327         m.dn = ldb.Dn(l, "dc=foo4")
328         m["bla"] = "bla"
329         m["objectUUID"] = b"0123456789abcdef"
330         self.assertEqual(len(l.search()), 0)
331         l.add(m)
332         try:
333             self.assertEqual(len(l.search()), 1)
334         finally:
335             l.delete(ldb.Dn(l, "dc=foo4"))
336
337     def test_add_w_unhandled_ctrl(self):
338         l = ldb.Ldb(self.url(), flags=self.flags())
339         m = ldb.Message()
340         m.dn = ldb.Dn(l, "dc=foo4")
341         m["bla"] = b"bla"
342         self.assertEqual(len(l.search()), 0)
343         self.assertRaises(ldb.LdbError, lambda: l.add(m,["search_options:1:2"]))
344
345     def test_add_dict(self):
346         l = ldb.Ldb(self.url(), flags=self.flags())
347         m = {"dn": ldb.Dn(l, "dc=foo5"),
348              "bla": b"bla",
349              "objectUUID": b"0123456789abcdef"}
350         self.assertEqual(len(l.search()), 0)
351         l.add(m)
352         try:
353             self.assertEqual(len(l.search()), 1)
354         finally:
355             l.delete(ldb.Dn(l, "dc=foo5"))
356
357     def test_add_dict_text(self):
358         l = ldb.Ldb(self.url(), flags=self.flags())
359         m = {"dn": ldb.Dn(l, "dc=foo5"),
360              "bla": "bla",
361              "objectUUID": b"0123456789abcdef"}
362         self.assertEqual(len(l.search()), 0)
363         l.add(m)
364         try:
365             self.assertEqual(len(l.search()), 1)
366         finally:
367             l.delete(ldb.Dn(l, "dc=foo5"))
368
369     def test_add_dict_string_dn(self):
370         l = ldb.Ldb(self.url(), flags=self.flags())
371         m = {"dn": "dc=foo6", "bla": b"bla",
372              "objectUUID": b"0123456789abcdef"}
373         self.assertEqual(len(l.search()), 0)
374         l.add(m)
375         try:
376             self.assertEqual(len(l.search()), 1)
377         finally:
378             l.delete(ldb.Dn(l, "dc=foo6"))
379
380     def test_add_dict_bytes_dn(self):
381         l = ldb.Ldb(self.url(), flags=self.flags())
382         m = {"dn": b"dc=foo6", "bla": b"bla",
383               "objectUUID": b"0123456789abcdef"}
384         self.assertEqual(len(l.search()), 0)
385         l.add(m)
386         try:
387             self.assertEqual(len(l.search()), 1)
388         finally:
389             l.delete(ldb.Dn(l, "dc=foo6"))
390
391     def test_rename(self):
392         l = ldb.Ldb(self.url(), flags=self.flags())
393         m = ldb.Message()
394         m.dn = ldb.Dn(l, "dc=foo7")
395         m["bla"] = b"bla"
396         m["objectUUID"] = b"0123456789abcdef"
397         self.assertEqual(len(l.search()), 0)
398         l.add(m)
399         try:
400             l.rename(ldb.Dn(l, "dc=foo7"), ldb.Dn(l, "dc=bar"))
401             self.assertEqual(len(l.search()), 1)
402         finally:
403             l.delete(ldb.Dn(l, "dc=bar"))
404
405     def test_rename_string_dns(self):
406         l = ldb.Ldb(self.url(), flags=self.flags())
407         m = ldb.Message()
408         m.dn = ldb.Dn(l, "dc=foo8")
409         m["bla"] = b"bla"
410         m["objectUUID"] = b"0123456789abcdef"
411         self.assertEqual(len(l.search()), 0)
412         l.add(m)
413         self.assertEqual(len(l.search()), 1)
414         try:
415             l.rename("dc=foo8", "dc=bar")
416             self.assertEqual(len(l.search()), 1)
417         finally:
418             l.delete(ldb.Dn(l, "dc=bar"))
419
420     def test_empty_dn(self):
421         l = ldb.Ldb(self.url(), flags=self.flags())
422         self.assertEqual(0, len(l.search()))
423         m = ldb.Message()
424         m.dn = ldb.Dn(l, "dc=empty")
425         m["objectUUID"] = b"0123456789abcdef"
426         l.add(m)
427         rm = l.search()
428         self.assertEqual(1, len(rm))
429         self.assertEqual(set(["dn", "distinguishedName", "objectUUID"]),
430                          set(rm[0].keys()))
431
432         rm = l.search(m.dn)
433         self.assertEqual(1, len(rm))
434         self.assertEqual(set(["dn", "distinguishedName", "objectUUID"]),
435                          set(rm[0].keys()))
436         rm = l.search(m.dn, attrs=["blah"])
437         self.assertEqual(1, len(rm))
438         self.assertEqual(0, len(rm[0]))
439
440     def test_modify_delete(self):
441         l = ldb.Ldb(self.url(), flags=self.flags())
442         m = ldb.Message()
443         m.dn = ldb.Dn(l, "dc=modifydelete")
444         m["bla"] = [b"1234"]
445         m["objectUUID"] = b"0123456789abcdef"
446         l.add(m)
447         rm = l.search(m.dn)[0]
448         self.assertEqual([b"1234"], list(rm["bla"]))
449         try:
450             m = ldb.Message()
451             m.dn = ldb.Dn(l, "dc=modifydelete")
452             m["bla"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE, "bla")
453             self.assertEqual(ldb.FLAG_MOD_DELETE, m["bla"].flags())
454             l.modify(m)
455             rm = l.search(m.dn)
456             self.assertEqual(1, len(rm))
457             self.assertEqual(set(["dn", "distinguishedName", "objectUUID"]),
458                              set(rm[0].keys()))
459             rm = l.search(m.dn, attrs=["bla"])
460             self.assertEqual(1, len(rm))
461             self.assertEqual(0, len(rm[0]))
462         finally:
463             l.delete(ldb.Dn(l, "dc=modifydelete"))
464
465     def test_modify_delete_text(self):
466         l = ldb.Ldb(self.url(), flags=self.flags())
467         m = ldb.Message()
468         m.dn = ldb.Dn(l, "dc=modifydelete")
469         m.text["bla"] = ["1234"]
470         m["objectUUID"] = b"0123456789abcdef"
471         l.add(m)
472         rm = l.search(m.dn)[0]
473         self.assertEqual(["1234"], list(rm.text["bla"]))
474         try:
475             m = ldb.Message()
476             m.dn = ldb.Dn(l, "dc=modifydelete")
477             m["bla"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE, "bla")
478             self.assertEqual(ldb.FLAG_MOD_DELETE, m["bla"].flags())
479             l.modify(m)
480             rm = l.search(m.dn)
481             self.assertEqual(1, len(rm))
482             self.assertEqual(set(["dn", "distinguishedName", "objectUUID"]),
483                              set(rm[0].keys()))
484             rm = l.search(m.dn, attrs=["bla"])
485             self.assertEqual(1, len(rm))
486             self.assertEqual(0, len(rm[0]))
487         finally:
488             l.delete(ldb.Dn(l, "dc=modifydelete"))
489
490     def test_modify_add(self):
491         l = ldb.Ldb(self.url(), flags=self.flags())
492         m = ldb.Message()
493         m.dn = ldb.Dn(l, "dc=add")
494         m["bla"] = [b"1234"]
495         m["objectUUID"] = b"0123456789abcdef"
496         l.add(m)
497         try:
498             m = ldb.Message()
499             m.dn = ldb.Dn(l, "dc=add")
500             m["bla"] = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
501             self.assertEqual(ldb.FLAG_MOD_ADD, m["bla"].flags())
502             l.modify(m)
503             rm = l.search(m.dn)[0]
504             self.assertEqual(3, len(rm))
505             self.assertEqual([b"1234", b"456"], list(rm["bla"]))
506         finally:
507             l.delete(ldb.Dn(l, "dc=add"))
508
509     def test_modify_add_text(self):
510         l = ldb.Ldb(self.url(), flags=self.flags())
511         m = ldb.Message()
512         m.dn = ldb.Dn(l, "dc=add")
513         m.text["bla"] = ["1234"]
514         m["objectUUID"] = b"0123456789abcdef"
515         l.add(m)
516         try:
517             m = ldb.Message()
518             m.dn = ldb.Dn(l, "dc=add")
519             m["bla"] = ldb.MessageElement(["456"], ldb.FLAG_MOD_ADD, "bla")
520             self.assertEqual(ldb.FLAG_MOD_ADD, m["bla"].flags())
521             l.modify(m)
522             rm = l.search(m.dn)[0]
523             self.assertEqual(3, len(rm))
524             self.assertEqual(["1234", "456"], list(rm.text["bla"]))
525         finally:
526             l.delete(ldb.Dn(l, "dc=add"))
527
528     def test_modify_replace(self):
529         l = ldb.Ldb(self.url(), flags=self.flags())
530         m = ldb.Message()
531         m.dn = ldb.Dn(l, "dc=modify2")
532         m["bla"] = [b"1234", b"456"]
533         m["objectUUID"] = b"0123456789abcdef"
534         l.add(m)
535         try:
536             m = ldb.Message()
537             m.dn = ldb.Dn(l, "dc=modify2")
538             m["bla"] = ldb.MessageElement([b"789"], ldb.FLAG_MOD_REPLACE, "bla")
539             self.assertEqual(ldb.FLAG_MOD_REPLACE, m["bla"].flags())
540             l.modify(m)
541             rm = l.search(m.dn)[0]
542             self.assertEqual(3, len(rm))
543             self.assertEqual([b"789"], list(rm["bla"]))
544             rm = l.search(m.dn, attrs=["bla"])[0]
545             self.assertEqual(1, len(rm))
546         finally:
547             l.delete(ldb.Dn(l, "dc=modify2"))
548
549     def test_modify_replace_text(self):
550         l = ldb.Ldb(self.url(), flags=self.flags())
551         m = ldb.Message()
552         m.dn = ldb.Dn(l, "dc=modify2")
553         m.text["bla"] = ["1234", "456"]
554         m["objectUUID"] = b"0123456789abcdef"
555         l.add(m)
556         try:
557             m = ldb.Message()
558             m.dn = ldb.Dn(l, "dc=modify2")
559             m["bla"] = ldb.MessageElement(["789"], ldb.FLAG_MOD_REPLACE, "bla")
560             self.assertEqual(ldb.FLAG_MOD_REPLACE, m["bla"].flags())
561             l.modify(m)
562             rm = l.search(m.dn)[0]
563             self.assertEqual(3, len(rm))
564             self.assertEqual(["789"], list(rm.text["bla"]))
565             rm = l.search(m.dn, attrs=["bla"])[0]
566             self.assertEqual(1, len(rm))
567         finally:
568             l.delete(ldb.Dn(l, "dc=modify2"))
569
570     def test_modify_flags_change(self):
571         l = ldb.Ldb(self.url(), flags=self.flags())
572         m = ldb.Message()
573         m.dn = ldb.Dn(l, "dc=add")
574         m["bla"] = [b"1234"]
575         m["objectUUID"] = b"0123456789abcdef"
576         l.add(m)
577         try:
578             m = ldb.Message()
579             m.dn = ldb.Dn(l, "dc=add")
580             m["bla"] = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
581             self.assertEqual(ldb.FLAG_MOD_ADD, m["bla"].flags())
582             l.modify(m)
583             rm = l.search(m.dn)[0]
584             self.assertEqual(3, len(rm))
585             self.assertEqual([b"1234", b"456"], list(rm["bla"]))
586
587             # Now create another modify, but switch the flags before we do it
588             m["bla"] = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
589             m["bla"].set_flags(ldb.FLAG_MOD_DELETE)
590             l.modify(m)
591             rm = l.search(m.dn, attrs=["bla"])[0]
592             self.assertEqual(1, len(rm))
593             self.assertEqual([b"1234"], list(rm["bla"]))
594         finally:
595             l.delete(ldb.Dn(l, "dc=add"))
596
597     def test_modify_flags_change_text(self):
598         l = ldb.Ldb(self.url(), flags=self.flags())
599         m = ldb.Message()
600         m.dn = ldb.Dn(l, "dc=add")
601         m.text["bla"] = ["1234"]
602         m["objectUUID"] = b"0123456789abcdef"
603         l.add(m)
604         try:
605             m = ldb.Message()
606             m.dn = ldb.Dn(l, "dc=add")
607             m["bla"] = ldb.MessageElement(["456"], ldb.FLAG_MOD_ADD, "bla")
608             self.assertEqual(ldb.FLAG_MOD_ADD, m["bla"].flags())
609             l.modify(m)
610             rm = l.search(m.dn)[0]
611             self.assertEqual(3, len(rm))
612             self.assertEqual(["1234", "456"], list(rm.text["bla"]))
613
614             # Now create another modify, but switch the flags before we do it
615             m["bla"] = ldb.MessageElement(["456"], ldb.FLAG_MOD_ADD, "bla")
616             m["bla"].set_flags(ldb.FLAG_MOD_DELETE)
617             l.modify(m)
618             rm = l.search(m.dn, attrs=["bla"])[0]
619             self.assertEqual(1, len(rm))
620             self.assertEqual(["1234"], list(rm.text["bla"]))
621         finally:
622             l.delete(ldb.Dn(l, "dc=add"))
623
624     def test_transaction_commit(self):
625         l = ldb.Ldb(self.url(), flags=self.flags())
626         l.transaction_start()
627         m = ldb.Message(ldb.Dn(l, "dc=foo9"))
628         m["foo"] = [b"bar"]
629         m["objectUUID"] = b"0123456789abcdef"
630         l.add(m)
631         l.transaction_commit()
632         l.delete(m.dn)
633
634     def test_transaction_cancel(self):
635         l = ldb.Ldb(self.url(), flags=self.flags())
636         l.transaction_start()
637         m = ldb.Message(ldb.Dn(l, "dc=foo10"))
638         m["foo"] = [b"bar"]
639         m["objectUUID"] = b"0123456789abcdee"
640         l.add(m)
641         l.transaction_cancel()
642         self.assertEqual(0, len(l.search(ldb.Dn(l, "dc=foo10"))))
643
644     def test_set_debug(self):
645         def my_report_fn(level, text):
646             pass
647         l = ldb.Ldb(self.url(), flags=self.flags())
648         l.set_debug(my_report_fn)
649
650     def test_zero_byte_string(self):
651         """Testing we do not get trapped in the \0 byte in a property string."""
652         l = ldb.Ldb(self.url(), flags=self.flags())
653         l.add({
654             "dn" : b"dc=somedn",
655             "objectclass" : b"user",
656             "cN" : b"LDAPtestUSER",
657             "givenname" : b"ldap",
658             "displayname" : b"foo\0bar",
659             "objectUUID" : b"0123456789abcdef"
660         })
661         res = l.search(expression="(dn=dc=somedn)")
662         self.assertEqual(b"foo\0bar", res[0]["displayname"][0])
663
664     def test_no_crash_broken_expr(self):
665         l = ldb.Ldb(self.url(), flags=self.flags())
666         self.assertRaises(ldb.LdbError,lambda: l.search("", ldb.SCOPE_SUBTREE, "&(dc=*)(dn=*)", ["dc"]))
667
668 # Run the SimpleLdb tests against an lmdb backend
669 class SimpleLdbLmdb(SimpleLdb):
670
671     def setUp(self):
672         self.prefix = MDB_PREFIX
673         self.index = {"dn": "@INDEXLIST",
674                       "@IDXONE": [b"1"],
675                       "@IDXGUID": [b"objectUUID"],
676                       "@IDX_DN_GUID": [b"GUID"]}
677         super(SimpleLdbLmdb, self).setUp()
678
679     def tearDown(self):
680         super(SimpleLdbLmdb, self).tearDown()
681
682 class SearchTests(LdbBaseTest):
683     def tearDown(self):
684         shutil.rmtree(self.testdir)
685         super(SearchTests, self).tearDown()
686
687         # Ensure the LDB is closed now, so we close the FD
688         del(self.l)
689
690
691     def setUp(self):
692         super(SearchTests, self).setUp()
693         self.testdir = tempdir()
694         self.filename = os.path.join(self.testdir, "search_test.ldb")
695         self.l = ldb.Ldb(self.url(),
696                          flags=self.flags(),
697                          options=["modules:rdn_name"])
698         try:
699             self.l.add(self.index)
700         except AttributeError:
701             pass
702
703         self.l.add({"dn": "@ATTRIBUTES",
704                     "DC": "CASE_INSENSITIVE"})
705
706         # Note that we can't use the name objectGUID here, as we
707         # want to stay clear of the objectGUID handler in LDB and
708         # instead use just the 16 bytes raw, which we just keep
709         # to printable chars here for ease of handling.
710
711         self.l.add({"dn": "DC=SAMBA,DC=ORG",
712                     "name": b"samba.org",
713                     "objectUUID": b"0123456789abcdef"})
714         self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG",
715                     "name": b"Admins",
716                     "x": "z", "y": "a",
717                     "objectUUID": b"0123456789abcde1"})
718         self.l.add({"dn": "OU=USERS,DC=SAMBA,DC=ORG",
719                     "name": b"Users",
720                     "x": "z", "y": "a",
721                     "objectUUID": b"0123456789abcde2"})
722         self.l.add({"dn": "OU=OU1,DC=SAMBA,DC=ORG",
723                     "name": b"OU #1",
724                     "x": "y", "y": "a",
725                     "objectUUID": b"0123456789abcde3"})
726         self.l.add({"dn": "OU=OU2,DC=SAMBA,DC=ORG",
727                     "name": b"OU #2",
728                     "x": "y", "y": "a",
729                     "objectUUID": b"0123456789abcde4"})
730         self.l.add({"dn": "OU=OU3,DC=SAMBA,DC=ORG",
731                     "name": b"OU #3",
732                     "x": "y", "y": "a",
733                     "objectUUID": b"0123456789abcde5"})
734         self.l.add({"dn": "OU=OU4,DC=SAMBA,DC=ORG",
735                     "name": b"OU #4",
736                     "x": "y", "y": "a",
737                     "objectUUID": b"0123456789abcde6"})
738         self.l.add({"dn": "OU=OU5,DC=SAMBA,DC=ORG",
739                     "name": b"OU #5",
740                     "x": "y", "y": "a",
741                     "objectUUID": b"0123456789abcde7"})
742         self.l.add({"dn": "OU=OU6,DC=SAMBA,DC=ORG",
743                     "name": b"OU #6",
744                     "x": "y", "y": "a",
745                     "objectUUID": b"0123456789abcde8"})
746         self.l.add({"dn": "OU=OU7,DC=SAMBA,DC=ORG",
747                     "name": b"OU #7",
748                     "x": "y", "y": "a",
749                     "objectUUID": b"0123456789abcde9"})
750         self.l.add({"dn": "OU=OU8,DC=SAMBA,DC=ORG",
751                     "name": b"OU #8",
752                     "x": "y", "y": "a",
753                     "objectUUID": b"0123456789abcde0"})
754         self.l.add({"dn": "OU=OU9,DC=SAMBA,DC=ORG",
755                     "name": b"OU #9",
756                     "x": "y", "y": "a",
757                     "objectUUID": b"0123456789abcdea"})
758         self.l.add({"dn": "OU=OU10,DC=SAMBA,DC=ORG",
759                     "name": b"OU #10",
760                     "x": "y", "y": "a",
761                     "objectUUID": b"0123456789abcdeb"})
762         self.l.add({"dn": "OU=OU11,DC=SAMBA,DC=ORG",
763                     "name": b"OU #10",
764                     "x": "y", "y": "a",
765                     "objectUUID": b"0123456789abcdec"})
766         self.l.add({"dn": "OU=OU12,DC=SAMBA,DC=ORG",
767                     "name": b"OU #10",
768                     "x": "y", "y": "b",
769                     "objectUUID": b"0123456789abcded"})
770         self.l.add({"dn": "OU=OU13,DC=SAMBA,DC=ORG",
771                     "name": b"OU #10",
772                     "x": "x", "y": "b",
773                     "objectUUID": b"0123456789abcdee"})
774         self.l.add({"dn": "OU=OU14,DC=SAMBA,DC=ORG",
775                     "name": b"OU #10",
776                     "x": "x", "y": "b",
777                     "objectUUID": b"0123456789abcd01"})
778         self.l.add({"dn": "OU=OU15,DC=SAMBA,DC=ORG",
779                     "name": b"OU #10",
780                     "x": "x", "y": "b",
781                     "objectUUID": b"0123456789abcd02"})
782         self.l.add({"dn": "OU=OU16,DC=SAMBA,DC=ORG",
783                     "name": b"OU #10",
784                     "x": "x", "y": "b",
785                     "objectUUID": b"0123456789abcd03"})
786         self.l.add({"dn": "OU=OU17,DC=SAMBA,DC=ORG",
787                     "name": b"OU #10",
788                     "x": "x", "y": "b",
789                     "objectUUID": b"0123456789abcd04"})
790         self.l.add({"dn": "OU=OU18,DC=SAMBA,DC=ORG",
791                     "name": b"OU #10",
792                     "x": "x", "y": "b",
793                     "objectUUID": b"0123456789abcd05"})
794         self.l.add({"dn": "OU=OU19,DC=SAMBA,DC=ORG",
795                     "name": b"OU #10",
796                     "x": "x", "y": "b",
797                     "objectUUID": b"0123456789abcd06"})
798         self.l.add({"dn": "OU=OU20,DC=SAMBA,DC=ORG",
799                     "name": b"OU #10",
800                     "x": "x", "y": "b",
801                     "objectUUID": b"0123456789abcd07"})
802         self.l.add({"dn": "OU=OU21,DC=SAMBA,DC=ORG",
803                     "name": b"OU #10",
804                     "x": "x", "y": "c",
805                     "objectUUID": b"0123456789abcd08"})
806         self.l.add({"dn": "OU=OU22,DC=SAMBA,DC=ORG",
807                     "name": b"OU #10",
808                     "x": "x", "y": "c",
809                     "objectUUID": b"0123456789abcd09"})
810
811     def test_base(self):
812         """Testing a search"""
813
814         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
815                               scope=ldb.SCOPE_BASE)
816         self.assertEqual(len(res11), 1)
817
818     def test_base_lower(self):
819         """Testing a search"""
820
821         res11 = self.l.search(base="OU=OU11,DC=samba,DC=org",
822                               scope=ldb.SCOPE_BASE)
823         self.assertEqual(len(res11), 1)
824
825     def test_base_or(self):
826         """Testing a search"""
827
828         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
829                               scope=ldb.SCOPE_BASE,
830                               expression="(|(ou=ou11)(ou=ou12))")
831         self.assertEqual(len(res11), 1)
832
833     def test_base_or2(self):
834         """Testing a search"""
835
836         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
837                               scope=ldb.SCOPE_BASE,
838                               expression="(|(x=y)(y=b))")
839         self.assertEqual(len(res11), 1)
840
841     def test_base_and(self):
842         """Testing a search"""
843
844         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
845                               scope=ldb.SCOPE_BASE,
846                               expression="(&(ou=ou11)(ou=ou12))")
847         self.assertEqual(len(res11), 0)
848
849     def test_base_and2(self):
850         """Testing a search"""
851
852         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
853                               scope=ldb.SCOPE_BASE,
854                               expression="(&(x=y)(y=a))")
855         self.assertEqual(len(res11), 1)
856
857     def test_base_false(self):
858         """Testing a search"""
859
860         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
861                               scope=ldb.SCOPE_BASE,
862                               expression="(|(ou=ou13)(ou=ou12))")
863         self.assertEqual(len(res11), 0)
864
865     def test_check_base_false(self):
866         """Testing a search"""
867         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
868                               scope=ldb.SCOPE_BASE,
869                               expression="(|(ou=ou13)(ou=ou12))")
870         self.assertEqual(len(res11), 0)
871
872     def test_check_base_error(self):
873         """Testing a search"""
874         checkbaseonsearch = {"dn": "@OPTIONS",
875                              "checkBaseOnSearch": b"TRUE"}
876         try:
877             self.l.add(checkbaseonsearch)
878         except ldb.LdbError as err:
879             enum = err.args[0]
880             self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)
881             m = ldb.Message.from_dict(self.l,
882                                       checkbaseonsearch)
883             self.l.modify(m)
884
885         try:
886             res11 = self.l.search(base="OU=OU11x,DC=SAMBA,DC=ORG",
887                                   scope=ldb.SCOPE_BASE,
888                                   expression="(|(ou=ou13)(ou=ou12))")
889             self.fail("Should have failed on missing base")
890         except ldb.LdbError as err:
891             enum = err.args[0]
892             self.assertEqual(enum, ldb.ERR_NO_SUCH_OBJECT)
893
894     def test_subtree_and(self):
895         """Testing a search"""
896
897         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
898                               scope=ldb.SCOPE_SUBTREE,
899                               expression="(&(ou=ou11)(ou=ou12))")
900         self.assertEqual(len(res11), 0)
901
902     def test_subtree_and2(self):
903         """Testing a search"""
904
905         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
906                               scope=ldb.SCOPE_SUBTREE,
907                               expression="(&(x=y)(|(y=b)(y=c)))")
908         self.assertEqual(len(res11), 1)
909
910     def test_subtree_and2_lower(self):
911         """Testing a search"""
912
913         res11 = self.l.search(base="DC=samba,DC=org",
914                               scope=ldb.SCOPE_SUBTREE,
915                               expression="(&(x=y)(|(y=b)(y=c)))")
916         self.assertEqual(len(res11), 1)
917
918     def test_subtree_or(self):
919         """Testing a search"""
920
921         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
922                               scope=ldb.SCOPE_SUBTREE,
923                               expression="(|(ou=ou11)(ou=ou12))")
924         self.assertEqual(len(res11), 2)
925
926     def test_subtree_or2(self):
927         """Testing a search"""
928
929         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
930                               scope=ldb.SCOPE_SUBTREE,
931                               expression="(|(x=y)(y=b))")
932         self.assertEqual(len(res11), 20)
933
934     def test_subtree_or3(self):
935         """Testing a search"""
936
937         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
938                               scope=ldb.SCOPE_SUBTREE,
939                               expression="(|(x=y)(y=b)(y=c))")
940         self.assertEqual(len(res11), 22)
941
942     def test_one_and(self):
943         """Testing a search"""
944
945         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
946                               scope=ldb.SCOPE_ONELEVEL,
947                               expression="(&(ou=ou11)(ou=ou12))")
948         self.assertEqual(len(res11), 0)
949
950     def test_one_and2(self):
951         """Testing a search"""
952
953         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
954                               scope=ldb.SCOPE_ONELEVEL,
955                               expression="(&(x=y)(y=b))")
956         self.assertEqual(len(res11), 1)
957
958     def test_one_or(self):
959         """Testing a search"""
960
961         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
962                               scope=ldb.SCOPE_ONELEVEL,
963                               expression="(|(ou=ou11)(ou=ou12))")
964         self.assertEqual(len(res11), 2)
965
966     def test_one_or2(self):
967         """Testing a search"""
968
969         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
970                               scope=ldb.SCOPE_ONELEVEL,
971                               expression="(|(x=y)(y=b))")
972         self.assertEqual(len(res11), 20)
973
974     def test_one_or2_lower(self):
975         """Testing a search"""
976
977         res11 = self.l.search(base="DC=samba,DC=org",
978                               scope=ldb.SCOPE_ONELEVEL,
979                               expression="(|(x=y)(y=b))")
980         self.assertEqual(len(res11), 20)
981
982     def test_subtree_and_or(self):
983         """Testing a search"""
984
985         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
986                               scope=ldb.SCOPE_SUBTREE,
987                               expression="(&(|(x=z)(y=b))(x=x)(y=c))")
988         self.assertEqual(len(res11), 0)
989
990     def test_subtree_and_or2(self):
991         """Testing a search"""
992
993         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
994                               scope=ldb.SCOPE_SUBTREE,
995                               expression="(&(x=x)(y=c)(|(x=z)(y=b)))")
996         self.assertEqual(len(res11), 0)
997
998     def test_subtree_and_or3(self):
999         """Testing a search"""
1000
1001         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1002                               scope=ldb.SCOPE_SUBTREE,
1003                               expression="(&(|(ou=ou11)(ou=ou10))(|(x=y)(y=b)(y=c)))")
1004         self.assertEqual(len(res11), 2)
1005
1006     def test_subtree_and_or4(self):
1007         """Testing a search"""
1008
1009         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1010                               scope=ldb.SCOPE_SUBTREE,
1011                               expression="(&(|(x=y)(y=b)(y=c))(|(ou=ou11)(ou=ou10)))")
1012         self.assertEqual(len(res11), 2)
1013
1014     def test_subtree_and_or5(self):
1015         """Testing a search"""
1016
1017         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1018                               scope=ldb.SCOPE_SUBTREE,
1019                               expression="(&(|(x=y)(y=b)(y=c))(ou=ou11))")
1020         self.assertEqual(len(res11), 1)
1021
1022     def test_subtree_or_and(self):
1023         """Testing a search"""
1024
1025         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1026                               scope=ldb.SCOPE_SUBTREE,
1027                               expression="(|(x=x)(y=c)(&(x=z)(y=b)))")
1028         self.assertEqual(len(res11), 10)
1029
1030     def test_subtree_large_and_unique(self):
1031         """Testing a search"""
1032
1033         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1034                               scope=ldb.SCOPE_SUBTREE,
1035                               expression="(&(ou=ou10)(y=a))")
1036         self.assertEqual(len(res11), 1)
1037
1038     def test_subtree_and_none(self):
1039         """Testing a search"""
1040
1041         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1042                               scope=ldb.SCOPE_SUBTREE,
1043                               expression="(&(ou=ouX)(y=a))")
1044         self.assertEqual(len(res11), 0)
1045
1046     def test_subtree_and_idx_record(self):
1047         """Testing a search against the index record"""
1048
1049         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1050                               scope=ldb.SCOPE_SUBTREE,
1051                               expression="(@IDXDN=DC=SAMBA,DC=ORG)")
1052         self.assertEqual(len(res11), 0)
1053
1054     def test_subtree_and_idxone_record(self):
1055         """Testing a search against the index record"""
1056
1057         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1058                               scope=ldb.SCOPE_SUBTREE,
1059                               expression="(@IDXONE=DC=SAMBA,DC=ORG)")
1060         self.assertEqual(len(res11), 0)
1061
1062     def test_dn_filter_one(self):
1063         """Testing that a dn= filter succeeds
1064         (or fails with disallowDNFilter
1065         set and IDXGUID or (IDX and not IDXONE) mode)
1066         when the scope is SCOPE_ONELEVEL.
1067
1068         This should be made more consistent, but for now lock in
1069         the behaviour
1070
1071         """
1072
1073         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1074                               scope=ldb.SCOPE_ONELEVEL,
1075                               expression="(dn=OU=OU1,DC=SAMBA,DC=ORG)")
1076         if hasattr(self, 'disallowDNFilter') and \
1077            hasattr(self, 'IDX') and \
1078            (hasattr(self, 'IDXGUID') or \
1079             ((hasattr(self, 'IDXONE') == False and hasattr(self, 'IDX')))):
1080             self.assertEqual(len(res11), 0)
1081         else:
1082             self.assertEqual(len(res11), 1)
1083
1084     def test_dn_filter_subtree(self):
1085         """Testing that a dn= filter succeeds
1086         (or fails with disallowDNFilter set)
1087         when the scope is SCOPE_SUBTREE"""
1088
1089         res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1090                               scope=ldb.SCOPE_SUBTREE,
1091                               expression="(dn=OU=OU1,DC=SAMBA,DC=ORG)")
1092         if hasattr(self, 'disallowDNFilter') \
1093            and hasattr(self, 'IDX'):
1094             self.assertEqual(len(res11), 0)
1095         else:
1096             self.assertEqual(len(res11), 1)
1097
1098     def test_dn_filter_base(self):
1099         """Testing that (incorrectly) a dn= filter works
1100         when the scope is SCOPE_BASE"""
1101
1102         res11 = self.l.search(base="OU=OU1,DC=SAMBA,DC=ORG",
1103                               scope=ldb.SCOPE_BASE,
1104                               expression="(dn=OU=OU1,DC=SAMBA,DC=ORG)")
1105
1106         # At some point we should fix this, but it isn't trivial
1107         self.assertEqual(len(res11), 1)
1108
1109
1110 # Run the search tests against an lmdb backend
1111 class SearchTestsLmdb(SearchTests):
1112
1113     def setUp(self):
1114         self.prefix = MDB_PREFIX
1115         self.index = {"dn": "@INDEXLIST",
1116                       "@IDXGUID": [b"objectUUID"],
1117                       "@IDX_DN_GUID": [b"GUID"]}
1118         super(SearchTestsLmdb, self).setUp()
1119
1120     def tearDown(self):
1121         super(SearchTestsLmdb, self).tearDown()
1122
1123
1124 class IndexedSearchTests(SearchTests):
1125     """Test searches using the index, to ensure the index doesn't
1126        break things"""
1127     def setUp(self):
1128         super(IndexedSearchTests, self).setUp()
1129         self.l.add({"dn": "@INDEXLIST",
1130                     "@IDXATTR": [b"x", b"y", b"ou"]})
1131         self.IDX = True
1132
1133 class IndexedSearchDnFilterTests(SearchTests):
1134     """Test searches using the index, to ensure the index doesn't
1135        break things"""
1136     def setUp(self):
1137         super(IndexedSearchDnFilterTests, self).setUp()
1138         self.l.add({"dn": "@OPTIONS",
1139                     "disallowDNFilter": "TRUE"})
1140         self.disallowDNFilter = True
1141
1142         self.l.add({"dn": "@INDEXLIST",
1143                     "@IDXATTR": [b"x", b"y", b"ou"]})
1144         self.IDX = True
1145
1146 class IndexedAndOneLevelSearchTests(SearchTests):
1147     """Test searches using the index including @IDXONE, to ensure
1148        the index doesn't break things"""
1149     def setUp(self):
1150         super(IndexedAndOneLevelSearchTests, self).setUp()
1151         self.l.add({"dn": "@INDEXLIST",
1152                     "@IDXATTR": [b"x", b"y", b"ou"],
1153                     "@IDXONE": [b"1"]})
1154         self.IDX = True
1155
1156 class IndexedAndOneLevelDNFilterSearchTests(SearchTests):
1157     """Test searches using the index including @IDXONE, to ensure
1158        the index doesn't break things"""
1159     def setUp(self):
1160         super(IndexedAndOneLevelDNFilterSearchTests, self).setUp()
1161         self.l.add({"dn": "@OPTIONS",
1162                     "disallowDNFilter": "TRUE"})
1163         self.disallowDNFilter = True
1164
1165         self.l.add({"dn": "@INDEXLIST",
1166                     "@IDXATTR": [b"x", b"y", b"ou"],
1167                     "@IDXONE": [b"1"]})
1168         self.IDX = True
1169         self.IDXONE = True
1170
1171 class GUIDIndexedSearchTests(SearchTests):
1172     """Test searches using the index, to ensure the index doesn't
1173        break things"""
1174     def setUp(self):
1175         self.index = {"dn": "@INDEXLIST",
1176                       "@IDXATTR": [b"x", b"y", b"ou"],
1177                       "@IDXGUID": [b"objectUUID"],
1178                       "@IDX_DN_GUID": [b"GUID"]}
1179         super(GUIDIndexedSearchTests, self).setUp()
1180
1181         self.IDXGUID = True
1182         self.IDXONE = True
1183
1184
1185 class GUIDIndexedDNFilterSearchTests(SearchTests):
1186     """Test searches using the index, to ensure the index doesn't
1187        break things"""
1188     def setUp(self):
1189         self.index = {"dn": "@INDEXLIST",
1190                       "@IDXATTR": [b"x", b"y", b"ou"],
1191                       "@IDXGUID": [b"objectUUID"],
1192                       "@IDX_DN_GUID": [b"GUID"]}
1193         super(GUIDIndexedDNFilterSearchTests, self).setUp()
1194         self.l.add({"dn": "@OPTIONS",
1195                     "disallowDNFilter": "TRUE"})
1196         self.disallowDNFilter = True
1197         self.IDX = True
1198         self.IDXGUID = True
1199
1200 class GUIDAndOneLevelIndexedSearchTests(SearchTests):
1201     """Test searches using the index including @IDXONE, to ensure
1202        the index doesn't break things"""
1203     def setUp(self):
1204         self.index = {"dn": "@INDEXLIST",
1205                       "@IDXATTR": [b"x", b"y", b"ou"],
1206                       "@IDXGUID": [b"objectUUID"],
1207                       "@IDX_DN_GUID": [b"GUID"]}
1208         super(GUIDAndOneLevelIndexedSearchTests, self).setUp()
1209         self.l.add({"dn": "@OPTIONS",
1210                     "disallowDNFilter": "TRUE"})
1211         self.disallowDNFilter = True
1212         self.IDX = True
1213         self.IDXGUID = True
1214         self.IDXONE = True
1215
1216 class GUIDIndexedSearchTestsLmdb(GUIDIndexedSearchTests):
1217
1218     def setUp(self):
1219         self.prefix = MDB_PREFIX
1220         super(GUIDIndexedSearchTestsLmdb, self).setUp()
1221
1222     def tearDown(self):
1223         super(GUIDIndexedSearchTestsLmdb, self).tearDown()
1224
1225
1226 class GUIDIndexedDNFilterSearchTestsLmdb(GUIDIndexedDNFilterSearchTests):
1227
1228     def setUp(self):
1229         self.prefix = MDB_PREFIX
1230         super(GUIDIndexedDNFilterSearchTestsLmdb, self).setUp()
1231
1232     def tearDown(self):
1233         super(GUIDIndexedDNFilterSearchTestsLmdb, self).tearDown()
1234
1235
1236 class GUIDAndOneLevelIndexedSearchTestsLmdb(GUIDAndOneLevelIndexedSearchTests):
1237
1238     def setUp(self):
1239         self.prefix = MDB_PREFIX
1240         super(GUIDAndOneLevelIndexedSearchTestsLmdb, self).setUp()
1241
1242     def tearDown(self):
1243         super(GUIDAndOneLevelIndexedSearchTestsLmdb, self).tearDown()
1244
1245
1246 class AddModifyTests(LdbBaseTest):
1247     def tearDown(self):
1248         shutil.rmtree(self.testdir)
1249         super(AddModifyTests, self).tearDown()
1250
1251         # Ensure the LDB is closed now, so we close the FD
1252         del(self.l)
1253
1254     def setUp(self):
1255         super(AddModifyTests, self).setUp()
1256         self.testdir = tempdir()
1257         self.filename = os.path.join(self.testdir, "add_test.ldb")
1258         self.l = ldb.Ldb(self.url(),
1259                          flags=self.flags(),
1260                          options=["modules:rdn_name"])
1261         try:
1262             self.l.add(self.index)
1263         except AttributeError:
1264             pass
1265
1266         self.l.add({"dn": "DC=SAMBA,DC=ORG",
1267                     "name": b"samba.org",
1268                     "objectUUID": b"0123456789abcdef"})
1269         self.l.add({"dn": "@ATTRIBUTES",
1270                     "objectUUID": "UNIQUE_INDEX"})
1271
1272     def test_add_dup(self):
1273         self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
1274                     "name": b"Admins",
1275                     "x": "z", "y": "a",
1276                     "objectUUID": b"0123456789abcde1"})
1277         try:
1278             self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
1279                         "name": b"Admins",
1280                         "x": "z", "y": "a",
1281                         "objectUUID": b"0123456789abcde2"})
1282             self.fail("Should have failed adding dupliate entry")
1283         except ldb.LdbError as err:
1284             enum = err.args[0]
1285             self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)
1286
1287     def test_add_del_add(self):
1288         self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
1289                     "name": b"Admins",
1290                     "x": "z", "y": "a",
1291                     "objectUUID": b"0123456789abcde1"})
1292         self.l.delete("OU=DUP,DC=SAMBA,DC=ORG")
1293         self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
1294                     "name": b"Admins",
1295                     "x": "z", "y": "a",
1296                     "objectUUID": b"0123456789abcde2"})
1297
1298     def test_add_move_add(self):
1299         self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
1300                     "name": b"Admins",
1301                     "x": "z", "y": "a",
1302                     "objectUUID": b"0123456789abcde1"})
1303         self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
1304                       "OU=DUP2,DC=SAMBA,DC=ORG")
1305         self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
1306                     "name": b"Admins",
1307                     "x": "z", "y": "a",
1308                     "objectUUID": b"0123456789abcde2"})
1309
1310     def test_add_move_fail_move_move(self):
1311         self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
1312                     "name": b"Admins",
1313                     "x": "z", "y": "a",
1314                     "objectUUID": b"0123456789abcde1"})
1315         self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
1316                     "name": b"Admins",
1317                     "x": "z", "y": "a",
1318                     "objectUUID": b"0123456789abcde2"})
1319
1320         res2 = self.l.search(base="DC=SAMBA,DC=ORG",
1321                              scope=ldb.SCOPE_SUBTREE,
1322                              expression="(objectUUID=0123456789abcde1)")
1323         self.assertEqual(len(res2), 1)
1324         self.assertEqual(str(res2[0].dn), "OU=DUP,DC=SAMBA,DC=ORG")
1325
1326         res3 = self.l.search(base="DC=SAMBA,DC=ORG",
1327                              scope=ldb.SCOPE_SUBTREE,
1328                              expression="(objectUUID=0123456789abcde2)")
1329         self.assertEqual(len(res3), 1)
1330         self.assertEqual(str(res3[0].dn), "OU=DUP2,DC=SAMBA,DC=ORG")
1331
1332         try:
1333             self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
1334                           "OU=DUP2,DC=SAMBA,DC=ORG")
1335             self.fail("Should have failed on duplicate DN")
1336         except ldb.LdbError as err:
1337             enum = err.args[0]
1338             self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)
1339
1340         self.l.rename("OU=DUP2,DC=SAMBA,DC=ORG",
1341                       "OU=DUP3,DC=SAMBA,DC=ORG")
1342
1343         self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
1344                       "OU=DUP2,DC=SAMBA,DC=ORG")
1345
1346         res2 = self.l.search(base="DC=SAMBA,DC=ORG",
1347                              scope=ldb.SCOPE_SUBTREE,
1348                              expression="(objectUUID=0123456789abcde1)")
1349         self.assertEqual(len(res2), 1)
1350         self.assertEqual(str(res2[0].dn), "OU=DUP2,DC=SAMBA,DC=ORG")
1351
1352         res3 = self.l.search(base="DC=SAMBA,DC=ORG",
1353                              scope=ldb.SCOPE_SUBTREE,
1354                              expression="(objectUUID=0123456789abcde2)")
1355         self.assertEqual(len(res3), 1)
1356         self.assertEqual(str(res3[0].dn), "OU=DUP3,DC=SAMBA,DC=ORG")
1357
1358     def test_move_missing(self):
1359         try:
1360             self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
1361                           "OU=DUP2,DC=SAMBA,DC=ORG")
1362             self.fail("Should have failed on missing")
1363         except ldb.LdbError as err:
1364             enum = err.args[0]
1365             self.assertEqual(enum, ldb.ERR_NO_SUCH_OBJECT)
1366
1367     def test_move_missing2(self):
1368         self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
1369                     "name": b"Admins",
1370                     "x": "z", "y": "a",
1371                     "objectUUID": b"0123456789abcde2"})
1372
1373         try:
1374             self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
1375                           "OU=DUP2,DC=SAMBA,DC=ORG")
1376             self.fail("Should have failed on missing")
1377         except ldb.LdbError as err:
1378             enum = err.args[0]
1379             self.assertEqual(enum, ldb.ERR_NO_SUCH_OBJECT)
1380
1381     def test_move_fail_move_add(self):
1382         self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
1383                     "name": b"Admins",
1384                     "x": "z", "y": "a",
1385                     "objectUUID": b"0123456789abcde1"})
1386         self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
1387                     "name": b"Admins",
1388                     "x": "z", "y": "a",
1389                     "objectUUID": b"0123456789abcde2"})
1390         try:
1391             self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
1392                           "OU=DUP2,DC=SAMBA,DC=ORG")
1393             self.fail("Should have failed on duplicate DN")
1394         except ldb.LdbError as err:
1395             enum = err.args[0]
1396             self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)
1397
1398         self.l.rename("OU=DUP2,DC=SAMBA,DC=ORG",
1399                       "OU=DUP3,DC=SAMBA,DC=ORG")
1400
1401         self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
1402                     "name": b"Admins",
1403                     "x": "z", "y": "a",
1404                     "objectUUID": b"0123456789abcde3"})
1405
1406
1407 class AddModifyTestsLmdb(AddModifyTests):
1408
1409     def setUp(self):
1410         self.prefix = MDB_PREFIX
1411         # TODO self.index =
1412         super(AddModifyTestsLmdb, self).setUp()
1413
1414     def tearDown(self):
1415         super(AddModifyTestsLmdb, self).tearDown()
1416
1417 class IndexedAddModifyTests(AddModifyTests):
1418     """Test searches using the index, to ensure the index doesn't
1419        break things"""
1420     def setUp(self):
1421         if not hasattr(self, 'index'):
1422             self.index = {"dn": "@INDEXLIST",
1423                           "@IDXATTR": [b"x", b"y", b"ou", b"objectUUID"],
1424                           "@IDXONE": [b"1"]}
1425         super(IndexedAddModifyTests, self).setUp()
1426
1427     def test_duplicate_GUID(self):
1428         try:
1429             self.l.add({"dn": "OU=DUPGUID,DC=SAMBA,DC=ORG",
1430                         "name": b"Admins",
1431                         "x": "z", "y": "a",
1432                         "objectUUID": b"0123456789abcdef"})
1433             self.fail("Should have failed adding dupliate GUID")
1434         except ldb.LdbError as err:
1435             enum = err.args[0]
1436             self.assertEqual(enum, ldb.ERR_CONSTRAINT_VIOLATION)
1437
1438     def test_duplicate_name_dup_GUID(self):
1439         self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG",
1440                     "name": b"Admins",
1441                     "x": "z", "y": "a",
1442                     "objectUUID": b"a123456789abcdef"})
1443         try:
1444             self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG",
1445                         "name": b"Admins",
1446                         "x": "z", "y": "a",
1447                         "objectUUID": b"a123456789abcdef"})
1448             self.fail("Should have failed adding dupliate GUID")
1449         except ldb.LdbError as err:
1450             enum = err.args[0]
1451             self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)
1452
1453     def test_duplicate_name_dup_GUID2(self):
1454         self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG",
1455                     "name": b"Admins",
1456                     "x": "z", "y": "a",
1457                     "objectUUID": b"abc3456789abcdef"})
1458         try:
1459             self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG",
1460                         "name": b"Admins",
1461                         "x": "z", "y": "a",
1462                         "objectUUID": b"aaa3456789abcdef"})
1463             self.fail("Should have failed adding dupliate DN")
1464         except ldb.LdbError as err:
1465             enum = err.args[0]
1466             self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)
1467
1468         # Checking the GUID didn't stick in the index
1469         self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
1470                     "name": b"Admins",
1471                     "x": "z", "y": "a",
1472                     "objectUUID": b"aaa3456789abcdef"})
1473
1474     def test_add_dup_guid_add(self):
1475         self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
1476                     "name": b"Admins",
1477                     "x": "z", "y": "a",
1478                     "objectUUID": b"0123456789abcde1"})
1479         try:
1480             self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
1481                         "name": b"Admins",
1482                         "x": "z", "y": "a",
1483                         "objectUUID": b"0123456789abcde1"})
1484             self.fail("Should have failed on duplicate GUID")
1485
1486         except ldb.LdbError as err:
1487             enum = err.args[0]
1488             self.assertEqual(enum, ldb.ERR_CONSTRAINT_VIOLATION)
1489
1490         self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
1491                     "name": b"Admins",
1492                     "x": "z", "y": "a",
1493                     "objectUUID": b"0123456789abcde2"})
1494
1495 class GUIDIndexedAddModifyTests(IndexedAddModifyTests):
1496     """Test searches using the index, to ensure the index doesn't
1497        break things"""
1498     def setUp(self):
1499         self.index = {"dn": "@INDEXLIST",
1500                       "@IDXATTR": [b"x", b"y", b"ou"],
1501                       "@IDXONE": [b"1"],
1502                       "@IDXGUID": [b"objectUUID"],
1503                       "@IDX_DN_GUID": [b"GUID"]}
1504         super(GUIDIndexedAddModifyTests, self).setUp()
1505
1506
1507 class GUIDTransIndexedAddModifyTests(GUIDIndexedAddModifyTests):
1508     """Test GUID index behaviour insdie the transaction"""
1509     def setUp(self):
1510         super(GUIDTransIndexedAddModifyTests, self).setUp()
1511         self.l.transaction_start()
1512
1513     def tearDown(self):
1514         self.l.transaction_commit()
1515         super(GUIDTransIndexedAddModifyTests, self).tearDown()
1516
1517 class TransIndexedAddModifyTests(IndexedAddModifyTests):
1518     """Test index behaviour insdie the transaction"""
1519     def setUp(self):
1520         super(TransIndexedAddModifyTests, self).setUp()
1521         self.l.transaction_start()
1522
1523     def tearDown(self):
1524         self.l.transaction_commit()
1525         super(TransIndexedAddModifyTests, self).tearDown()
1526
1527 class GuidIndexedAddModifyTestsLmdb(GUIDIndexedAddModifyTests):
1528
1529     def setUp(self):
1530         self.prefix = MDB_PREFIX
1531         super(GuidIndexedAddModifyTestsLmdb, self).setUp()
1532
1533     def tearDown(self):
1534         super(GuidIndexedAddModifyTestsLmdb, self).tearDown()
1535
1536 class GuidTransIndexedAddModifyTestsLmdb(GUIDTransIndexedAddModifyTests):
1537
1538     def setUp(self):
1539         self.prefix = MDB_PREFIX
1540         super(GuidTransIndexedAddModifyTestsLmdb, self).setUp()
1541
1542     def tearDown(self):
1543         super(GuidTransIndexedAddModifyTestsLmdb, self).tearDown()
1544
1545 class BadIndexTests(LdbBaseTest):
1546     def setUp(self):
1547         super(BadIndexTests, self).setUp()
1548         self.testdir = tempdir()
1549         self.filename = os.path.join(self.testdir, "test.ldb")
1550         self.ldb = ldb.Ldb(self.url(), flags=self.flags())
1551         if hasattr(self, 'IDXGUID'):
1552             self.ldb.add({"dn": "@INDEXLIST",
1553                           "@IDXATTR": [b"x", b"y", b"ou"],
1554                           "@IDXGUID": [b"objectUUID"],
1555                           "@IDX_DN_GUID": [b"GUID"]})
1556         else:
1557             self.ldb.add({"dn": "@INDEXLIST",
1558                           "@IDXATTR": [b"x", b"y", b"ou"]})
1559
1560         super(BadIndexTests, self).setUp()
1561
1562     def test_unique(self):
1563         self.ldb.add({"dn": "x=x,dc=samba,dc=org",
1564                       "objectUUID": b"0123456789abcde1",
1565                       "y": "1"})
1566         self.ldb.add({"dn": "x=y,dc=samba,dc=org",
1567                       "objectUUID": b"0123456789abcde2",
1568                       "y": "1"})
1569         self.ldb.add({"dn": "x=z,dc=samba,dc=org",
1570                       "objectUUID": b"0123456789abcde3",
1571                       "y": "1"})
1572
1573         res = self.ldb.search(expression="(y=1)",
1574                               base="dc=samba,dc=org")
1575         self.assertEquals(len(res), 3)
1576
1577         # Now set this to unique index, but forget to check the result
1578         try:
1579             self.ldb.add({"dn": "@ATTRIBUTES",
1580                         "y": "UNIQUE_INDEX"})
1581             self.fail()
1582         except ldb.LdbError:
1583             pass
1584
1585         # We must still have a working index
1586         res = self.ldb.search(expression="(y=1)",
1587                               base="dc=samba,dc=org")
1588         self.assertEquals(len(res), 3)
1589
1590     def test_unique_transaction(self):
1591         self.ldb.add({"dn": "x=x,dc=samba,dc=org",
1592                       "objectUUID": b"0123456789abcde1",
1593                       "y": "1"})
1594         self.ldb.add({"dn": "x=y,dc=samba,dc=org",
1595                       "objectUUID": b"0123456789abcde2",
1596                       "y": "1"})
1597         self.ldb.add({"dn": "x=z,dc=samba,dc=org",
1598                       "objectUUID": b"0123456789abcde3",
1599                       "y": "1"})
1600
1601         res = self.ldb.search(expression="(y=1)",
1602                               base="dc=samba,dc=org")
1603         self.assertEquals(len(res), 3)
1604
1605         self.ldb.transaction_start()
1606
1607         # Now set this to unique index, but forget to check the result
1608         try:
1609             self.ldb.add({"dn": "@ATTRIBUTES",
1610                         "y": "UNIQUE_INDEX"})
1611         except ldb.LdbError:
1612             pass
1613
1614         try:
1615             self.ldb.transaction_commit()
1616             self.fail()
1617
1618         except ldb.LdbError as err:
1619             enum = err.args[0]
1620             self.assertEqual(enum, ldb.ERR_OPERATIONS_ERROR)
1621
1622         # We must still have a working index
1623         res = self.ldb.search(expression="(y=1)",
1624                               base="dc=samba,dc=org")
1625
1626         self.assertEquals(len(res), 3)
1627
1628     def test_casefold(self):
1629         self.ldb.add({"dn": "x=x,dc=samba,dc=org",
1630                       "objectUUID": b"0123456789abcde1",
1631                       "y": "a"})
1632         self.ldb.add({"dn": "x=y,dc=samba,dc=org",
1633                       "objectUUID": b"0123456789abcde2",
1634                       "y": "A"})
1635         self.ldb.add({"dn": "x=z,dc=samba,dc=org",
1636                       "objectUUID": b"0123456789abcde3",
1637                       "y": ["a", "A"]})
1638
1639         res = self.ldb.search(expression="(y=a)",
1640                               base="dc=samba,dc=org")
1641         self.assertEquals(len(res), 2)
1642
1643         self.ldb.add({"dn": "@ATTRIBUTES",
1644                       "y": "CASE_INSENSITIVE"})
1645
1646         # We must still have a working index
1647         res = self.ldb.search(expression="(y=a)",
1648                               base="dc=samba,dc=org")
1649
1650         if hasattr(self, 'IDXGUID'):
1651             self.assertEquals(len(res), 3)
1652         else:
1653             # We should not return this entry twice, but sadly
1654             # we have not yet fixed
1655             # https://bugzilla.samba.org/show_bug.cgi?id=13361
1656             self.assertEquals(len(res), 4)
1657
1658     def test_casefold_transaction(self):
1659         self.ldb.add({"dn": "x=x,dc=samba,dc=org",
1660                       "objectUUID": b"0123456789abcde1",
1661                       "y": "a"})
1662         self.ldb.add({"dn": "x=y,dc=samba,dc=org",
1663                       "objectUUID": b"0123456789abcde2",
1664                       "y": "A"})
1665         self.ldb.add({"dn": "x=z,dc=samba,dc=org",
1666                       "objectUUID": b"0123456789abcde3",
1667                       "y": ["a", "A"]})
1668
1669         res = self.ldb.search(expression="(y=a)",
1670                               base="dc=samba,dc=org")
1671         self.assertEquals(len(res), 2)
1672
1673         self.ldb.transaction_start()
1674
1675         self.ldb.add({"dn": "@ATTRIBUTES",
1676                       "y": "CASE_INSENSITIVE"})
1677
1678         self.ldb.transaction_commit()
1679
1680         # We must still have a working index
1681         res = self.ldb.search(expression="(y=a)",
1682                               base="dc=samba,dc=org")
1683
1684         if hasattr(self, 'IDXGUID'):
1685             self.assertEquals(len(res), 3)
1686         else:
1687             # We should not return this entry twice, but sadly
1688             # we have not yet fixed
1689             # https://bugzilla.samba.org/show_bug.cgi?id=13361
1690             self.assertEquals(len(res), 4)
1691
1692
1693     def tearDown(self):
1694         super(BadIndexTests, self).tearDown()
1695
1696
1697 class GUIDBadIndexTests(BadIndexTests):
1698     """Test Bad index things with GUID index mode"""
1699     def setUp(self):
1700         self.IDXGUID = True
1701
1702         super(GUIDBadIndexTests, self).setUp()
1703
1704 class DnTests(TestCase):
1705
1706     def setUp(self):
1707         super(DnTests, self).setUp()
1708         self.ldb = ldb.Ldb()
1709
1710     def tearDown(self):
1711         super(DnTests, self).tearDown()
1712         del(self.ldb)
1713
1714     def test_set_dn_invalid(self):
1715         x = ldb.Message()
1716         def assign():
1717             x.dn = "astring"
1718         self.assertRaises(TypeError, assign)
1719
1720     def test_eq(self):
1721         x = ldb.Dn(self.ldb, "dc=foo11,bar=bloe")
1722         y = ldb.Dn(self.ldb, "dc=foo11,bar=bloe")
1723         self.assertEqual(x, y)
1724         y = ldb.Dn(self.ldb, "dc=foo11,bar=blie")
1725         self.assertNotEqual(x, y)
1726
1727     def test_str(self):
1728         x = ldb.Dn(self.ldb, "dc=foo12,bar=bloe")
1729         self.assertEqual(x.__str__(), "dc=foo12,bar=bloe")
1730
1731     def test_repr(self):
1732         x = ldb.Dn(self.ldb, "dc=foo13,bla=blie")
1733         self.assertEqual(x.__repr__(), "Dn('dc=foo13,bla=blie')")
1734
1735     def test_get_casefold(self):
1736         x = ldb.Dn(self.ldb, "dc=foo14,bar=bloe")
1737         self.assertEqual(x.get_casefold(), "DC=FOO14,BAR=bloe")
1738
1739     def test_validate(self):
1740         x = ldb.Dn(self.ldb, "dc=foo15,bar=bloe")
1741         self.assertTrue(x.validate())
1742
1743     def test_parent(self):
1744         x = ldb.Dn(self.ldb, "dc=foo16,bar=bloe")
1745         self.assertEqual("bar=bloe", x.parent().__str__())
1746
1747     def test_parent_nonexistent(self):
1748         x = ldb.Dn(self.ldb, "@BLA")
1749         self.assertEqual(None, x.parent())
1750
1751     def test_is_valid(self):
1752         x = ldb.Dn(self.ldb, "dc=foo18,dc=bloe")
1753         self.assertTrue(x.is_valid())
1754         x = ldb.Dn(self.ldb, "")
1755         self.assertTrue(x.is_valid())
1756
1757     def test_is_special(self):
1758         x = ldb.Dn(self.ldb, "dc=foo19,bar=bloe")
1759         self.assertFalse(x.is_special())
1760         x = ldb.Dn(self.ldb, "@FOOBAR")
1761         self.assertTrue(x.is_special())
1762
1763     def test_check_special(self):
1764         x = ldb.Dn(self.ldb, "dc=foo20,bar=bloe")
1765         self.assertFalse(x.check_special("FOOBAR"))
1766         x = ldb.Dn(self.ldb, "@FOOBAR")
1767         self.assertTrue(x.check_special("@FOOBAR"))
1768
1769     def test_len(self):
1770         x = ldb.Dn(self.ldb, "dc=foo21,bar=bloe")
1771         self.assertEqual(2, len(x))
1772         x = ldb.Dn(self.ldb, "dc=foo21")
1773         self.assertEqual(1, len(x))
1774
1775     def test_add_child(self):
1776         x = ldb.Dn(self.ldb, "dc=foo22,bar=bloe")
1777         self.assertTrue(x.add_child(ldb.Dn(self.ldb, "bla=bloe")))
1778         self.assertEqual("bla=bloe,dc=foo22,bar=bloe", x.__str__())
1779
1780     def test_add_base(self):
1781         x = ldb.Dn(self.ldb, "dc=foo23,bar=bloe")
1782         base = ldb.Dn(self.ldb, "bla=bloe")
1783         self.assertTrue(x.add_base(base))
1784         self.assertEqual("dc=foo23,bar=bloe,bla=bloe", x.__str__())
1785
1786     def test_add_child_str(self):
1787         x = ldb.Dn(self.ldb, "dc=foo22,bar=bloe")
1788         self.assertTrue(x.add_child("bla=bloe"))
1789         self.assertEqual("bla=bloe,dc=foo22,bar=bloe", x.__str__())
1790
1791     def test_add_base_str(self):
1792         x = ldb.Dn(self.ldb, "dc=foo23,bar=bloe")
1793         base = "bla=bloe"
1794         self.assertTrue(x.add_base(base))
1795         self.assertEqual("dc=foo23,bar=bloe,bla=bloe", x.__str__())
1796
1797     def test_add(self):
1798         x = ldb.Dn(self.ldb, "dc=foo24")
1799         y = ldb.Dn(self.ldb, "bar=bla")
1800         self.assertEqual("dc=foo24,bar=bla", str(x + y))
1801
1802     def test_remove_base_components(self):
1803         x = ldb.Dn(self.ldb, "dc=foo24,dc=samba,dc=org")
1804         x.remove_base_components(len(x)-1)
1805         self.assertEqual("dc=foo24", str(x))
1806
1807     def test_parse_ldif(self):
1808         msgs = self.ldb.parse_ldif("dn: foo=bar\n")
1809         msg = next(msgs)
1810         self.assertEqual("foo=bar", str(msg[1].dn))
1811         self.assertTrue(isinstance(msg[1], ldb.Message))
1812         ldif = self.ldb.write_ldif(msg[1], ldb.CHANGETYPE_NONE)
1813         self.assertEqual("dn: foo=bar\n\n", ldif)
1814
1815     def test_parse_ldif_more(self):
1816         msgs = self.ldb.parse_ldif("dn: foo=bar\n\n\ndn: bar=bar")
1817         msg = next(msgs)
1818         self.assertEqual("foo=bar", str(msg[1].dn))
1819         msg = next(msgs)
1820         self.assertEqual("bar=bar", str(msg[1].dn))
1821
1822     def test_canonical_string(self):
1823         x = ldb.Dn(self.ldb, "dc=foo25,bar=bloe")
1824         self.assertEqual("/bloe/foo25", x.canonical_str())
1825
1826     def test_canonical_ex_string(self):
1827         x = ldb.Dn(self.ldb, "dc=foo26,bar=bloe")
1828         self.assertEqual("/bloe\nfoo26", x.canonical_ex_str())
1829
1830     def test_ldb_is_child_of(self):
1831         """Testing ldb_dn_compare_dn"""
1832         dn1 = ldb.Dn(self.ldb, "dc=base")
1833         dn2 = ldb.Dn(self.ldb, "cn=foo,dc=base")
1834         dn3 = ldb.Dn(self.ldb, "cn=bar,dc=base")
1835         dn4 = ldb.Dn(self.ldb, "cn=baz,cn=bar,dc=base")
1836
1837         self.assertTrue(dn1.is_child_of(dn1))
1838         self.assertTrue(dn2.is_child_of(dn1))
1839         self.assertTrue(dn4.is_child_of(dn1))
1840         self.assertTrue(dn4.is_child_of(dn3))
1841         self.assertTrue(dn4.is_child_of(dn4))
1842         self.assertFalse(dn3.is_child_of(dn2))
1843         self.assertFalse(dn1.is_child_of(dn4))
1844
1845     def test_ldb_is_child_of_str(self):
1846         """Testing ldb_dn_compare_dn"""
1847         dn1_str = "dc=base"
1848         dn2_str = "cn=foo,dc=base"
1849         dn3_str = "cn=bar,dc=base"
1850         dn4_str = "cn=baz,cn=bar,dc=base"
1851
1852         dn1 = ldb.Dn(self.ldb, dn1_str)
1853         dn2 = ldb.Dn(self.ldb, dn2_str)
1854         dn3 = ldb.Dn(self.ldb, dn3_str)
1855         dn4 = ldb.Dn(self.ldb, dn4_str)
1856
1857         self.assertTrue(dn1.is_child_of(dn1_str))
1858         self.assertTrue(dn2.is_child_of(dn1_str))
1859         self.assertTrue(dn4.is_child_of(dn1_str))
1860         self.assertTrue(dn4.is_child_of(dn3_str))
1861         self.assertTrue(dn4.is_child_of(dn4_str))
1862         self.assertFalse(dn3.is_child_of(dn2_str))
1863         self.assertFalse(dn1.is_child_of(dn4_str))
1864
1865     def test_get_component_name(self):
1866         dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
1867         self.assertEqual(dn.get_component_name(0), 'cn')
1868         self.assertEqual(dn.get_component_name(1), 'dc')
1869         self.assertEqual(dn.get_component_name(2), None)
1870         self.assertEqual(dn.get_component_name(-1), None)
1871
1872     def test_get_component_value(self):
1873         dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
1874         self.assertEqual(dn.get_component_value(0), 'foo')
1875         self.assertEqual(dn.get_component_value(1), 'base')
1876         self.assertEqual(dn.get_component_name(2), None)
1877         self.assertEqual(dn.get_component_name(-1), None)
1878
1879     def test_set_component(self):
1880         dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
1881         dn.set_component(0, 'cn', 'bar')
1882         self.assertEqual(str(dn), "cn=bar,dc=base")
1883         dn.set_component(1, 'o', 'asep')
1884         self.assertEqual(str(dn), "cn=bar,o=asep")
1885         self.assertRaises(TypeError, dn.set_component, 2, 'dc', 'base')
1886         self.assertEqual(str(dn), "cn=bar,o=asep")
1887         dn.set_component(1, 'o', 'a,b+c')
1888         self.assertEqual(str(dn), r"cn=bar,o=a\,b\+c")
1889
1890     def test_set_component_bytes(self):
1891         dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
1892         dn.set_component(0, 'cn', b'bar')
1893         self.assertEqual(str(dn), "cn=bar,dc=base")
1894         dn.set_component(1, 'o', b'asep')
1895         self.assertEqual(str(dn), "cn=bar,o=asep")
1896
1897     def test_set_component_none(self):
1898         dn = ldb.Dn(self.ldb, "cn=foo,cn=bar,dc=base")
1899         self.assertRaises(TypeError, dn.set_component, 1, 'cn', None)
1900
1901     def test_get_extended_component_null(self):
1902         dn = ldb.Dn(self.ldb, "cn=foo,cn=bar,dc=base")
1903         self.assertEqual(dn.get_extended_component("TEST"), None)
1904
1905     def test_get_extended_component(self):
1906         self.ldb._register_test_extensions()
1907         dn = ldb.Dn(self.ldb, "<TEST=foo>;cn=bar,dc=base")
1908         self.assertEqual(dn.get_extended_component("TEST"), b"foo")
1909
1910     def test_set_extended_component(self):
1911         self.ldb._register_test_extensions()
1912         dn = ldb.Dn(self.ldb, "dc=base")
1913         dn.set_extended_component("TEST", "foo")
1914         self.assertEqual(dn.get_extended_component("TEST"), b"foo")
1915         dn.set_extended_component("TEST", b"bar")
1916         self.assertEqual(dn.get_extended_component("TEST"), b"bar")
1917
1918     def test_extended_str(self):
1919         self.ldb._register_test_extensions()
1920         dn = ldb.Dn(self.ldb, "<TEST=foo>;cn=bar,dc=base")
1921         self.assertEqual(dn.extended_str(), "<TEST=foo>;cn=bar,dc=base")
1922
1923     def test_get_rdn_name(self):
1924         dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
1925         self.assertEqual(dn.get_rdn_name(), 'cn')
1926
1927     def test_get_rdn_value(self):
1928         dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
1929         self.assertEqual(dn.get_rdn_value(), 'foo')
1930
1931     def test_get_casefold(self):
1932         dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
1933         self.assertEqual(dn.get_casefold(), 'CN=FOO,DC=BASE')
1934
1935     def test_get_linearized(self):
1936         dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
1937         self.assertEqual(dn.get_linearized(), 'cn=foo,dc=base')
1938
1939     def test_is_null(self):
1940         dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
1941         self.assertFalse(dn.is_null())
1942
1943         dn = ldb.Dn(self.ldb, '')
1944         self.assertTrue(dn.is_null())
1945
1946 class LdbMsgTests(TestCase):
1947
1948     def setUp(self):
1949         super(LdbMsgTests, self).setUp()
1950         self.msg = ldb.Message()
1951
1952     def test_init_dn(self):
1953         self.msg = ldb.Message(ldb.Dn(ldb.Ldb(), "dc=foo27"))
1954         self.assertEqual("dc=foo27", str(self.msg.dn))
1955
1956     def test_iter_items(self):
1957         self.assertEqual(0, len(self.msg.items()))
1958         self.msg.dn = ldb.Dn(ldb.Ldb(), "dc=foo28")
1959         self.assertEqual(1, len(self.msg.items()))
1960
1961     def test_repr(self):
1962         self.msg.dn = ldb.Dn(ldb.Ldb(), "dc=foo29")
1963         self.msg["dc"] = b"foo"
1964         if PY3:
1965             self.assertIn(repr(self.msg), [
1966                 "Message({'dn': Dn('dc=foo29'), 'dc': MessageElement([b'foo'])})",
1967                 "Message({'dc': MessageElement([b'foo']), 'dn': Dn('dc=foo29')})",
1968             ])
1969             self.assertIn(repr(self.msg.text), [
1970                 "Message({'dn': Dn('dc=foo29'), 'dc': MessageElement([b'foo'])}).text",
1971                 "Message({'dc': MessageElement([b'foo']), 'dn': Dn('dc=foo29')}).text",
1972             ])
1973         else:
1974             self.assertEqual(
1975                 repr(self.msg),
1976                 "Message({'dn': Dn('dc=foo29'), 'dc': MessageElement(['foo'])})")
1977             self.assertEqual(
1978                 repr(self.msg.text),
1979                 "Message({'dn': Dn('dc=foo29'), 'dc': MessageElement(['foo'])}).text")
1980
1981     def test_len(self):
1982         self.assertEqual(0, len(self.msg))
1983
1984     def test_notpresent(self):
1985         self.assertRaises(KeyError, lambda: self.msg["foo"])
1986
1987     def test_del(self):
1988         del self.msg["foo"]
1989
1990     def test_add(self):
1991         self.msg.add(ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla"))
1992
1993     def test_add_text(self):
1994         self.msg.add(ldb.MessageElement(["456"], ldb.FLAG_MOD_ADD, "bla"))
1995
1996     def test_elements_empty(self):
1997         self.assertEqual([], self.msg.elements())
1998
1999     def test_elements(self):
2000         el = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
2001         self.msg.add(el)
2002         self.assertEqual([el], self.msg.elements())
2003         self.assertEqual([el.text], self.msg.text.elements())
2004
2005     def test_add_value(self):
2006         self.assertEqual(0, len(self.msg))
2007         self.msg["foo"] = [b"foo"]
2008         self.assertEqual(1, len(self.msg))
2009
2010     def test_add_value_text(self):
2011         self.assertEqual(0, len(self.msg))
2012         self.msg["foo"] = ["foo"]
2013         self.assertEqual(1, len(self.msg))
2014
2015     def test_add_value_multiple(self):
2016         self.assertEqual(0, len(self.msg))
2017         self.msg["foo"] = [b"foo", b"bla"]
2018         self.assertEqual(1, len(self.msg))
2019         self.assertEqual([b"foo", b"bla"], list(self.msg["foo"]))
2020
2021     def test_add_value_multiple_text(self):
2022         self.assertEqual(0, len(self.msg))
2023         self.msg["foo"] = ["foo", "bla"]
2024         self.assertEqual(1, len(self.msg))
2025         self.assertEqual(["foo", "bla"], list(self.msg.text["foo"]))
2026
2027     def test_set_value(self):
2028         self.msg["foo"] = [b"fool"]
2029         self.assertEqual([b"fool"], list(self.msg["foo"]))
2030         self.msg["foo"] = [b"bar"]
2031         self.assertEqual([b"bar"], list(self.msg["foo"]))
2032
2033     def test_set_value_text(self):
2034         self.msg["foo"] = ["fool"]
2035         self.assertEqual(["fool"], list(self.msg.text["foo"]))
2036         self.msg["foo"] = ["bar"]
2037         self.assertEqual(["bar"], list(self.msg.text["foo"]))
2038
2039     def test_keys(self):
2040         self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
2041         self.msg["foo"] = [b"bla"]
2042         self.msg["bar"] = [b"bla"]
2043         self.assertEqual(["dn", "foo", "bar"], self.msg.keys())
2044
2045     def test_keys_text(self):
2046         self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
2047         self.msg["foo"] = ["bla"]
2048         self.msg["bar"] = ["bla"]
2049         self.assertEqual(["dn", "foo", "bar"], self.msg.text.keys())
2050
2051     def test_dn(self):
2052         self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
2053         self.assertEqual("@BASEINFO", self.msg.dn.__str__())
2054
2055     def test_get_dn(self):
2056         self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
2057         self.assertEqual("@BASEINFO", self.msg.get("dn").__str__())
2058
2059     def test_dn_text(self):
2060         self.msg.text.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
2061         self.assertEqual("@BASEINFO", str(self.msg.dn))
2062         self.assertEqual("@BASEINFO", str(self.msg.text.dn))
2063
2064     def test_get_dn_text(self):
2065         self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
2066         self.assertEqual("@BASEINFO", str(self.msg.get("dn")))
2067         self.assertEqual("@BASEINFO", str(self.msg.text.get("dn")))
2068
2069     def test_get_invalid(self):
2070         self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
2071         self.assertRaises(TypeError, self.msg.get, 42)
2072
2073     def test_get_other(self):
2074         self.msg["foo"] = [b"bar"]
2075         self.assertEqual(b"bar", self.msg.get("foo")[0])
2076         self.assertEqual(b"bar", self.msg.get("foo", idx=0))
2077         self.assertEqual(None, self.msg.get("foo", idx=1))
2078         self.assertEqual("", self.msg.get("foo", default='', idx=1))
2079
2080     def test_get_other_text(self):
2081         self.msg["foo"] = ["bar"]
2082         self.assertEqual(["bar"], list(self.msg.text.get("foo")))
2083         self.assertEqual("bar", self.msg.text.get("foo")[0])
2084         self.assertEqual("bar", self.msg.text.get("foo", idx=0))
2085         self.assertEqual(None, self.msg.get("foo", idx=1))
2086         self.assertEqual("", self.msg.get("foo", default='', idx=1))
2087
2088     def test_get_default(self):
2089         self.assertEqual(None, self.msg.get("tatayoyo", idx=0))
2090         self.assertEqual("anniecordie", self.msg.get("tatayoyo", "anniecordie"))
2091
2092     def test_get_default_text(self):
2093         self.assertEqual(None, self.msg.text.get("tatayoyo", idx=0))
2094         self.assertEqual("anniecordie", self.msg.text.get("tatayoyo", "anniecordie"))
2095
2096     def test_get_unknown(self):
2097         self.assertEqual(None, self.msg.get("lalalala"))
2098
2099     def test_get_unknown_text(self):
2100         self.assertEqual(None, self.msg.text.get("lalalala"))
2101
2102     def test_msg_diff(self):
2103         l = ldb.Ldb()
2104         msgs = l.parse_ldif("dn: foo=bar\nfoo: bar\nbaz: do\n\ndn: foo=bar\nfoo: bar\nbaz: dont\n")
2105         msg1 = next(msgs)[1]
2106         msg2 = next(msgs)[1]
2107         msgdiff = l.msg_diff(msg1, msg2)
2108         self.assertEqual("foo=bar", msgdiff.get("dn").__str__())
2109         self.assertRaises(KeyError, lambda: msgdiff["foo"])
2110         self.assertEqual(1, len(msgdiff))
2111
2112     def test_equal_empty(self):
2113         msg1 = ldb.Message()
2114         msg2 = ldb.Message()
2115         self.assertEqual(msg1, msg2)
2116
2117     def test_equal_simplel(self):
2118         db = ldb.Ldb()
2119         msg1 = ldb.Message()
2120         msg1.dn = ldb.Dn(db, "foo=bar")
2121         msg2 = ldb.Message()
2122         msg2.dn = ldb.Dn(db, "foo=bar")
2123         self.assertEqual(msg1, msg2)
2124         msg1['foo'] = b'bar'
2125         msg2['foo'] = b'bar'
2126         self.assertEqual(msg1, msg2)
2127         msg2['foo'] = b'blie'
2128         self.assertNotEqual(msg1, msg2)
2129         msg2['foo'] = b'blie'
2130
2131     def test_from_dict(self):
2132         rec = {"dn": "dc=fromdict",
2133                "a1": [b"a1-val1", b"a1-val1"]}
2134         l = ldb.Ldb()
2135         # check different types of input Flags
2136         for flags in [ldb.FLAG_MOD_ADD, ldb.FLAG_MOD_REPLACE, ldb.FLAG_MOD_DELETE]:
2137             m = ldb.Message.from_dict(l, rec, flags)
2138             self.assertEqual(rec["a1"], list(m["a1"]))
2139             self.assertEqual(flags, m["a1"].flags())
2140         # check input params
2141         self.assertRaises(TypeError, ldb.Message.from_dict, dict(), rec, ldb.FLAG_MOD_REPLACE)
2142         self.assertRaises(TypeError, ldb.Message.from_dict, l, list(), ldb.FLAG_MOD_REPLACE)
2143         self.assertRaises(ValueError, ldb.Message.from_dict, l, rec, 0)
2144         # Message.from_dict expects dictionary with 'dn'
2145         err_rec = {"a1": [b"a1-val1", b"a1-val1"]}
2146         self.assertRaises(TypeError, ldb.Message.from_dict, l, err_rec, ldb.FLAG_MOD_REPLACE)
2147
2148     def test_from_dict_text(self):
2149         rec = {"dn": "dc=fromdict",
2150                "a1": ["a1-val1", "a1-val1"]}
2151         l = ldb.Ldb()
2152         # check different types of input Flags
2153         for flags in [ldb.FLAG_MOD_ADD, ldb.FLAG_MOD_REPLACE, ldb.FLAG_MOD_DELETE]:
2154             m = ldb.Message.from_dict(l, rec, flags)
2155             self.assertEqual(rec["a1"], list(m.text["a1"]))
2156             self.assertEqual(flags, m.text["a1"].flags())
2157         # check input params
2158         self.assertRaises(TypeError, ldb.Message.from_dict, dict(), rec, ldb.FLAG_MOD_REPLACE)
2159         self.assertRaises(TypeError, ldb.Message.from_dict, l, list(), ldb.FLAG_MOD_REPLACE)
2160         self.assertRaises(ValueError, ldb.Message.from_dict, l, rec, 0)
2161         # Message.from_dict expects dictionary with 'dn'
2162         err_rec = {"a1": ["a1-val1", "a1-val1"]}
2163         self.assertRaises(TypeError, ldb.Message.from_dict, l, err_rec, ldb.FLAG_MOD_REPLACE)
2164
2165     def test_copy_add_message_element(self):
2166         m = ldb.Message()
2167         m["1"] = ldb.MessageElement([b"val 111"], ldb.FLAG_MOD_ADD, "1")
2168         m["2"] = ldb.MessageElement([b"val 222"], ldb.FLAG_MOD_ADD, "2")
2169         mto = ldb.Message()
2170         mto["1"] = m["1"]
2171         mto["2"] = m["2"]
2172         self.assertEqual(mto["1"], m["1"])
2173         self.assertEqual(mto["2"], m["2"])
2174         mto = ldb.Message()
2175         mto.add(m["1"])
2176         mto.add(m["2"])
2177         self.assertEqual(mto["1"], m["1"])
2178         self.assertEqual(mto["2"], m["2"])
2179
2180     def test_copy_add_message_element_text(self):
2181         m = ldb.Message()
2182         m["1"] = ldb.MessageElement(["val 111"], ldb.FLAG_MOD_ADD, "1")
2183         m["2"] = ldb.MessageElement(["val 222"], ldb.FLAG_MOD_ADD, "2")
2184         mto = ldb.Message()
2185         mto["1"] = m["1"]
2186         mto["2"] = m["2"]
2187         self.assertEqual(mto["1"], m.text["1"])
2188         self.assertEqual(mto["2"], m.text["2"])
2189         mto = ldb.Message()
2190         mto.add(m["1"])
2191         mto.add(m["2"])
2192         self.assertEqual(mto.text["1"], m.text["1"])
2193         self.assertEqual(mto.text["2"], m.text["2"])
2194         self.assertEqual(mto["1"], m["1"])
2195         self.assertEqual(mto["2"], m["2"])
2196
2197
2198 class MessageElementTests(TestCase):
2199
2200     def test_cmp_element(self):
2201         x = ldb.MessageElement([b"foo"])
2202         y = ldb.MessageElement([b"foo"])
2203         z = ldb.MessageElement([b"bzr"])
2204         self.assertEqual(x, y)
2205         self.assertNotEqual(x, z)
2206
2207     def test_cmp_element_text(self):
2208         x = ldb.MessageElement([b"foo"])
2209         y = ldb.MessageElement(["foo"])
2210         self.assertEqual(x, y)
2211
2212     def test_create_iterable(self):
2213         x = ldb.MessageElement([b"foo"])
2214         self.assertEqual([b"foo"], list(x))
2215         self.assertEqual(["foo"], list(x.text))
2216
2217     def test_repr(self):
2218         x = ldb.MessageElement([b"foo"])
2219         if PY3:
2220             self.assertEqual("MessageElement([b'foo'])", repr(x))
2221             self.assertEqual("MessageElement([b'foo']).text", repr(x.text))
2222         else:
2223             self.assertEqual("MessageElement(['foo'])", repr(x))
2224             self.assertEqual("MessageElement(['foo']).text", repr(x.text))
2225         x = ldb.MessageElement([b"foo", b"bla"])
2226         self.assertEqual(2, len(x))
2227         if PY3:
2228             self.assertEqual("MessageElement([b'foo',b'bla'])", repr(x))
2229             self.assertEqual("MessageElement([b'foo',b'bla']).text", repr(x.text))
2230         else:
2231             self.assertEqual("MessageElement(['foo','bla'])", repr(x))
2232             self.assertEqual("MessageElement(['foo','bla']).text", repr(x.text))
2233
2234     def test_get_item(self):
2235         x = ldb.MessageElement([b"foo", b"bar"])
2236         self.assertEqual(b"foo", x[0])
2237         self.assertEqual(b"bar", x[1])
2238         self.assertEqual(b"bar", x[-1])
2239         self.assertRaises(IndexError, lambda: x[45])
2240
2241     def test_get_item_text(self):
2242         x = ldb.MessageElement(["foo", "bar"])
2243         self.assertEqual("foo", x.text[0])
2244         self.assertEqual("bar", x.text[1])
2245         self.assertEqual("bar", x.text[-1])
2246         self.assertRaises(IndexError, lambda: x[45])
2247
2248     def test_len(self):
2249         x = ldb.MessageElement([b"foo", b"bar"])
2250         self.assertEqual(2, len(x))
2251
2252     def test_eq(self):
2253         x = ldb.MessageElement([b"foo", b"bar"])
2254         y = ldb.MessageElement([b"foo", b"bar"])
2255         self.assertEqual(y, x)
2256         x = ldb.MessageElement([b"foo"])
2257         self.assertNotEqual(y, x)
2258         y = ldb.MessageElement([b"foo"])
2259         self.assertEqual(y, x)
2260
2261     def test_extended(self):
2262         el = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
2263         if PY3:
2264             self.assertEqual("MessageElement([b'456'])", repr(el))
2265             self.assertEqual("MessageElement([b'456']).text", repr(el.text))
2266         else:
2267             self.assertEqual("MessageElement(['456'])", repr(el))
2268             self.assertEqual("MessageElement(['456']).text", repr(el.text))
2269
2270     def test_bad_text(self):
2271         el = ldb.MessageElement(b'\xba\xdd')
2272         self.assertRaises(UnicodeDecodeError, el.text.__getitem__, 0)
2273
2274
2275 class ModuleTests(TestCase):
2276
2277     def setUp(self):
2278         super(ModuleTests, self).setUp()
2279         self.testdir = tempdir()
2280         self.filename = os.path.join(self.testdir, "test.ldb")
2281         self.ldb = ldb.Ldb(self.filename)
2282
2283     def tearDown(self):
2284         shutil.rmtree(self.testdir)
2285         super(ModuleTests, self).setUp()
2286
2287     def test_register_module(self):
2288         class ExampleModule:
2289             name = "example"
2290         ldb.register_module(ExampleModule)
2291
2292     def test_use_module(self):
2293         ops = []
2294         class ExampleModule:
2295             name = "bla"
2296
2297             def __init__(self, ldb, next):
2298                 ops.append("init")
2299                 self.next = next
2300
2301             def search(self, *args, **kwargs):
2302                 return self.next.search(*args, **kwargs)
2303
2304             def request(self, *args, **kwargs):
2305                 pass
2306
2307         ldb.register_module(ExampleModule)
2308         l = ldb.Ldb(self.filename)
2309         l.add({"dn": "@MODULES", "@LIST": "bla"})
2310         self.assertEqual([], ops)
2311         l = ldb.Ldb(self.filename)
2312         self.assertEqual(["init"], ops)
2313
2314 class LdbResultTests(LdbBaseTest):
2315
2316     def setUp(self):
2317         super(LdbResultTests, self).setUp()
2318         self.testdir = tempdir()
2319         self.filename = os.path.join(self.testdir, "test.ldb")
2320         self.l = ldb.Ldb(self.url(), flags=self.flags())
2321         try:
2322             self.l.add(self.index)
2323         except AttributeError:
2324             pass
2325         self.l.add({"dn": "DC=SAMBA,DC=ORG", "name": b"samba.org",
2326                     "objectUUID": b"0123456789abcde0"})
2327         self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG", "name": b"Admins",
2328                     "objectUUID": b"0123456789abcde1"})
2329         self.l.add({"dn": "OU=USERS,DC=SAMBA,DC=ORG", "name": b"Users",
2330                     "objectUUID": b"0123456789abcde2"})
2331         self.l.add({"dn": "OU=OU1,DC=SAMBA,DC=ORG", "name": b"OU #1",
2332                     "objectUUID": b"0123456789abcde3"})
2333         self.l.add({"dn": "OU=OU2,DC=SAMBA,DC=ORG", "name": b"OU #2",
2334                     "objectUUID": b"0123456789abcde4"})
2335         self.l.add({"dn": "OU=OU3,DC=SAMBA,DC=ORG", "name": b"OU #3",
2336                     "objectUUID": b"0123456789abcde5"})
2337         self.l.add({"dn": "OU=OU4,DC=SAMBA,DC=ORG", "name": b"OU #4",
2338                     "objectUUID": b"0123456789abcde6"})
2339         self.l.add({"dn": "OU=OU5,DC=SAMBA,DC=ORG", "name": b"OU #5",
2340                     "objectUUID": b"0123456789abcde7"})
2341         self.l.add({"dn": "OU=OU6,DC=SAMBA,DC=ORG", "name": b"OU #6",
2342                     "objectUUID": b"0123456789abcde8"})
2343         self.l.add({"dn": "OU=OU7,DC=SAMBA,DC=ORG", "name": b"OU #7",
2344                     "objectUUID": b"0123456789abcde9"})
2345         self.l.add({"dn": "OU=OU8,DC=SAMBA,DC=ORG", "name": b"OU #8",
2346                     "objectUUID": b"0123456789abcdea"})
2347         self.l.add({"dn": "OU=OU9,DC=SAMBA,DC=ORG", "name": b"OU #9",
2348                     "objectUUID": b"0123456789abcdeb"})
2349         self.l.add({"dn": "OU=OU10,DC=SAMBA,DC=ORG", "name": b"OU #10",
2350                     "objectUUID": b"0123456789abcdec"})
2351
2352     def tearDown(self):
2353         shutil.rmtree(self.testdir)
2354         super(LdbResultTests, self).tearDown()
2355         # Ensure the LDB is closed now, so we close the FD
2356         del(self.l)
2357
2358     def test_return_type(self):
2359         res = self.l.search()
2360         self.assertEqual(str(res), "<ldb result>")
2361
2362     def test_get_msgs(self):
2363         res = self.l.search()
2364         list = res.msgs
2365
2366     def test_get_controls(self):
2367         res = self.l.search()
2368         list = res.controls
2369
2370     def test_get_referals(self):
2371         res = self.l.search()
2372         list = res.referals
2373
2374     def test_iter_msgs(self):
2375         found = False
2376         for l in self.l.search().msgs:
2377             if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
2378                 found = True
2379         self.assertTrue(found)
2380
2381     def test_iter_msgs_count(self):
2382         self.assertTrue(self.l.search().count > 0)
2383         # 13 objects has been added to the DC=SAMBA, DC=ORG
2384         self.assertEqual(self.l.search(base="DC=SAMBA,DC=ORG").count, 13)
2385
2386     def test_iter_controls(self):
2387         res = self.l.search().controls
2388         it = iter(res)
2389
2390     def test_create_control(self):
2391         self.assertRaises(ValueError, ldb.Control, self.l, "tatayoyo:0")
2392         c = ldb.Control(self.l, "relax:1")
2393         self.assertEqual(c.critical, True)
2394         self.assertEqual(c.oid, "1.3.6.1.4.1.4203.666.5.12")
2395
2396     def test_iter_refs(self):
2397         res = self.l.search().referals
2398         it = iter(res)
2399
2400     def test_search_sequence_msgs(self):
2401         found = False
2402         res = self.l.search().msgs
2403
2404         for i in range(0, len(res)):
2405             l = res[i]
2406             if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
2407                 found = True
2408         self.assertTrue(found)
2409
2410     def test_search_as_iter(self):
2411         found = False
2412         res = self.l.search()
2413
2414         for l in res:
2415             if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
2416                 found = True
2417         self.assertTrue(found)
2418
2419     def test_search_iter(self):
2420         found = False
2421         res = self.l.search_iterator()
2422
2423         for l in res:
2424             if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
2425                 found = True
2426         self.assertTrue(found)
2427
2428
2429     # Show that search results can't see into a transaction
2430     def test_search_against_trans(self):
2431         found11 = False
2432
2433         (r1, w1) = os.pipe()
2434
2435         (r2, w2) = os.pipe()
2436
2437         # For the first element, fork a child that will
2438         # write to the DB
2439         pid = os.fork()
2440         if pid == 0:
2441             # In the child, re-open
2442             del(self.l)
2443             gc.collect()
2444
2445             child_ldb = ldb.Ldb(self.url(), flags=self.flags())
2446             # start a transaction
2447             child_ldb.transaction_start()
2448
2449             # write to it
2450             child_ldb.add({"dn": "OU=OU11,DC=SAMBA,DC=ORG",
2451                            "name": b"samba.org",
2452                            "objectUUID": b"o123456789acbdef"})
2453
2454             os.write(w1, b"added")
2455
2456             # Now wait for the search to be done
2457             os.read(r2, 6)
2458
2459             # and commit
2460             try:
2461                 child_ldb.transaction_commit()
2462             except LdbError as err:
2463                 # We print this here to see what went wrong in the child
2464                 print(err)
2465                 os._exit(1)
2466
2467             os.write(w1, b"transaction")
2468             os._exit(0)
2469
2470         self.assertEqual(os.read(r1, 5), b"added")
2471
2472         # This should not turn up until the transaction is concluded
2473         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
2474                             scope=ldb.SCOPE_BASE)
2475         self.assertEqual(len(res11), 0)
2476
2477         os.write(w2, b"search")
2478
2479         # Now wait for the transaction to be done.  This should
2480         # deadlock, but the search doesn't hold a read lock for the
2481         # iterator lifetime currently.
2482         self.assertEqual(os.read(r1, 11), b"transaction")
2483
2484         # This should now turn up, as the transaction is over
2485         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
2486                             scope=ldb.SCOPE_BASE)
2487         self.assertEqual(len(res11), 1)
2488
2489         self.assertFalse(found11)
2490
2491         (got_pid, status) = os.waitpid(pid, 0)
2492         self.assertEqual(got_pid, pid)
2493
2494
2495     def test_search_iter_against_trans(self):
2496         found = False
2497         found11 = False
2498
2499         # We need to hold this iterator open to hold the all-record
2500         # lock
2501         res = self.l.search_iterator()
2502
2503         (r1, w1) = os.pipe()
2504
2505         (r2, w2) = os.pipe()
2506
2507         # For the first element, with the sequence open (which
2508         # means with ldb locks held), fork a child that will
2509         # write to the DB
2510         pid = os.fork()
2511         if pid == 0:
2512             # In the child, re-open
2513             del(res)
2514             del(self.l)
2515             gc.collect()
2516
2517             child_ldb = ldb.Ldb(self.url(), flags=self.flags())
2518             # start a transaction
2519             child_ldb.transaction_start()
2520
2521             # write to it
2522             child_ldb.add({"dn": "OU=OU11,DC=SAMBA,DC=ORG",
2523                            "name": b"samba.org",
2524                            "objectUUID": b"o123456789acbdef"})
2525
2526             os.write(w1, b"added")
2527
2528             # Now wait for the search to be done
2529             os.read(r2, 6)
2530
2531             # and commit
2532             try:
2533                 child_ldb.transaction_commit()
2534             except LdbError as err:
2535                 # We print this here to see what went wrong in the child
2536                 print(err)
2537                 os._exit(1)
2538
2539             os.write(w1, b"transaction")
2540             os._exit(0)
2541
2542         self.assertEqual(os.read(r1, 5), b"added")
2543
2544         # This should not turn up until the transaction is concluded
2545         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
2546                             scope=ldb.SCOPE_BASE)
2547         self.assertEqual(len(res11), 0)
2548
2549         os.write(w2, b"search")
2550
2551         # allow the transaction to start
2552         time.sleep(1)
2553
2554         # This should not turn up until the search finishes and
2555         # removed the read lock, but for ldb_tdb that happened as soon
2556         # as we called the first res.next()
2557         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
2558                             scope=ldb.SCOPE_BASE)
2559         self.assertEqual(len(res11), 0)
2560
2561         # These results are all collected at the first next(res) call
2562         for l in res:
2563             if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
2564                 found = True
2565             if str(l.dn) == "OU=OU11,DC=SAMBA,DC=ORG":
2566                 found11 = True
2567
2568         # Now wait for the transaction to be done.
2569         self.assertEqual(os.read(r1, 11), b"transaction")
2570
2571         # This should now turn up, as the transaction is over and all
2572         # read locks are gone
2573         res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
2574                             scope=ldb.SCOPE_BASE)
2575         self.assertEqual(len(res11), 1)
2576
2577         self.assertTrue(found)
2578         self.assertFalse(found11)
2579
2580         (got_pid, status) = os.waitpid(pid, 0)
2581         self.assertEqual(got_pid, pid)
2582
2583
2584 class LdbResultTestsLmdb(LdbResultTests):
2585
2586     def setUp(self):
2587         self.prefix = MDB_PREFIX
2588         self.index = {"dn": "@INDEXLIST",
2589                       "@IDXGUID": [b"objectUUID"],
2590                       "@IDX_DN_GUID": [b"GUID"]}
2591         super(LdbResultTestsLmdb, self).setUp()
2592
2593     def tearDown(self):
2594         super(LdbResultTestsLmdb, self).tearDown()
2595
2596
2597 class BadTypeTests(TestCase):
2598     def test_control(self):
2599         l = ldb.Ldb()
2600         self.assertRaises(TypeError, ldb.Control, '<bad type>', 'relax:1')
2601         self.assertRaises(TypeError, ldb.Control, ldb, 1234)
2602
2603     def test_modify(self):
2604         l = ldb.Ldb()
2605         dn = ldb.Dn(l, 'a=b')
2606         m = ldb.Message(dn)
2607         self.assertRaises(TypeError, l.modify, '<bad type>')
2608         self.assertRaises(TypeError, l.modify, m, '<bad type>')
2609
2610     def test_add(self):
2611         l = ldb.Ldb()
2612         dn = ldb.Dn(l, 'a=b')
2613         m = ldb.Message(dn)
2614         self.assertRaises(TypeError, l.add, '<bad type>')
2615         self.assertRaises(TypeError, l.add, m, '<bad type>')
2616
2617     def test_delete(self):
2618         l = ldb.Ldb()
2619         dn = ldb.Dn(l, 'a=b')
2620         self.assertRaises(TypeError, l.add, '<bad type>')
2621         self.assertRaises(TypeError, l.add, dn, '<bad type>')
2622
2623     def test_rename(self):
2624         l = ldb.Ldb()
2625         dn = ldb.Dn(l, 'a=b')
2626         self.assertRaises(TypeError, l.add, '<bad type>', dn)
2627         self.assertRaises(TypeError, l.add, dn, '<bad type>')
2628         self.assertRaises(TypeError, l.add, dn, dn, '<bad type>')
2629
2630     def test_search(self):
2631         l = ldb.Ldb()
2632         self.assertRaises(TypeError, l.search, base=1234)
2633         self.assertRaises(TypeError, l.search, scope='<bad type>')
2634         self.assertRaises(TypeError, l.search, expression=1234)
2635         self.assertRaises(TypeError, l.search, attrs='<bad type>')
2636         self.assertRaises(TypeError, l.search, controls='<bad type>')
2637
2638
2639 class VersionTests(TestCase):
2640
2641     def test_version(self):
2642         self.assertTrue(isinstance(ldb.__version__, str))
2643
2644
2645 if __name__ == '__main__':
2646     import unittest
2647     unittest.TestProgram()