2 # Simple tests for the ldb python bindings.
3 # Copyright (C) 2007 Jelmer Vernooij <jelmer@samba.org>
6 from unittest import TestCase
13 PY3 = sys.version_info > (3, 0)
22 dir_prefix = os.path.join(os.environ["SELFTEST_PREFIX"], "tmp")
25 return tempfile.mkdtemp(dir=dir_prefix)
28 class NoContextTests(TestCase):
30 def test_valid_attr_name(self):
31 self.assertTrue(ldb.valid_attr_name("foo"))
32 self.assertFalse(ldb.valid_attr_name("24foo"))
34 def test_timestring(self):
35 self.assertEqual("19700101000000.0Z", ldb.timestring(0))
36 self.assertEqual("20071119191012.0Z", ldb.timestring(1195499412))
38 def test_string_to_time(self):
39 self.assertEqual(0, ldb.string_to_time("19700101000000.0Z"))
40 self.assertEqual(1195499412, ldb.string_to_time("20071119191012.0Z"))
42 def test_binary_encode(self):
43 encoded = ldb.binary_encode(b'test\\x')
44 decoded = ldb.binary_decode(encoded)
45 self.assertEqual(decoded, b'test\\x')
47 encoded2 = ldb.binary_encode('test\\x')
48 self.assertEqual(encoded2, encoded)
51 class LdbBaseTest(TestCase):
53 super(LdbBaseTest, self).setUp()
55 if self.prefix is None:
56 self.prefix = TDB_PREFIX
57 except AttributeError:
58 self.prefix = TDB_PREFIX
61 super(LdbBaseTest, self).tearDown()
64 return self.prefix + self.filename
67 if self.prefix == MDB_PREFIX:
73 class SimpleLdb(LdbBaseTest):
76 super(SimpleLdb, self).setUp()
77 self.testdir = tempdir()
78 self.filename = os.path.join(self.testdir, "test.ldb")
79 self.ldb = ldb.Ldb(self.url(), flags=self.flags())
82 shutil.rmtree(self.testdir)
83 super(SimpleLdb, self).tearDown()
84 # Ensure the LDB is closed now, so we close the FD
87 def test_connect(self):
88 ldb.Ldb(self.url(), flags=self.flags())
90 def test_connect_none(self):
93 def test_connect_later(self):
95 x.connect(self.url(), flags=self.flags())
99 self.assertTrue(repr(x).startswith("<ldb connection"))
101 def test_set_create_perms(self):
103 x.set_create_perms(0o600)
105 def test_modules_none(self):
107 self.assertEqual([], x.modules())
109 def test_modules_tdb(self):
110 x = ldb.Ldb(self.url(), flags=self.flags())
111 self.assertEqual("[<ldb module 'tdb'>]", repr(x.modules()))
113 def test_firstmodule_none(self):
115 self.assertEqual(x.firstmodule, None)
117 def test_firstmodule_tdb(self):
118 x = ldb.Ldb(self.url(), flags=self.flags())
120 self.assertEqual(repr(mod), "<ldb module 'tdb'>")
122 def test_search(self):
123 l = ldb.Ldb(self.url(), flags=self.flags())
124 self.assertEqual(len(l.search()), 0)
126 def test_search_controls(self):
127 l = ldb.Ldb(self.url(), flags=self.flags())
128 self.assertEqual(len(l.search(controls=["paged_results:0:5"])), 0)
130 def test_search_attrs(self):
131 l = ldb.Ldb(self.url(), flags=self.flags())
132 self.assertEqual(len(l.search(ldb.Dn(l, ""), ldb.SCOPE_SUBTREE, "(dc=*)", ["dc"])), 0)
134 def test_search_string_dn(self):
135 l = ldb.Ldb(self.url(), flags=self.flags())
136 self.assertEqual(len(l.search("", ldb.SCOPE_SUBTREE, "(dc=*)", ["dc"])), 0)
138 def test_search_attr_string(self):
139 l = ldb.Ldb(self.url(), flags=self.flags())
140 self.assertRaises(TypeError, l.search, attrs="dc")
141 self.assertRaises(TypeError, l.search, attrs=b"dc")
143 def test_opaque(self):
144 l = ldb.Ldb(self.url(), flags=self.flags())
145 l.set_opaque("my_opaque", l)
146 self.assertTrue(l.get_opaque("my_opaque") is not None)
147 self.assertEqual(None, l.get_opaque("unknown"))
149 def test_search_scope_base_empty_db(self):
150 l = ldb.Ldb(self.url(), flags=self.flags())
151 self.assertEqual(len(l.search(ldb.Dn(l, "dc=foo1"),
154 def test_search_scope_onelevel_empty_db(self):
155 l = ldb.Ldb(self.url(), flags=self.flags())
156 self.assertEqual(len(l.search(ldb.Dn(l, "dc=foo1"),
157 ldb.SCOPE_ONELEVEL)), 0)
159 def test_delete(self):
160 l = ldb.Ldb(self.url(), flags=self.flags())
161 self.assertRaises(ldb.LdbError, lambda: l.delete(ldb.Dn(l, "dc=foo2")))
163 def test_delete_w_unhandled_ctrl(self):
164 l = ldb.Ldb(self.url(), flags=self.flags())
166 m.dn = ldb.Dn(l, "dc=foo1")
169 self.assertRaises(ldb.LdbError, lambda: l.delete(m.dn, ["search_options:1:2"]))
172 def test_contains(self):
174 l = ldb.Ldb(name, flags=self.flags())
175 self.assertFalse(ldb.Dn(l, "dc=foo3") in l)
176 l = ldb.Ldb(name, flags=self.flags())
178 m.dn = ldb.Dn(l, "dc=foo3")
182 self.assertTrue(ldb.Dn(l, "dc=foo3") in l)
183 self.assertFalse(ldb.Dn(l, "dc=foo4") in l)
187 def test_get_config_basedn(self):
188 l = ldb.Ldb(self.url(), flags=self.flags())
189 self.assertEqual(None, l.get_config_basedn())
191 def test_get_root_basedn(self):
192 l = ldb.Ldb(self.url(), flags=self.flags())
193 self.assertEqual(None, l.get_root_basedn())
195 def test_get_schema_basedn(self):
196 l = ldb.Ldb(self.url(), flags=self.flags())
197 self.assertEqual(None, l.get_schema_basedn())
199 def test_get_default_basedn(self):
200 l = ldb.Ldb(self.url(), flags=self.flags())
201 self.assertEqual(None, l.get_default_basedn())
204 l = ldb.Ldb(self.url(), flags=self.flags())
206 m.dn = ldb.Dn(l, "dc=foo4")
208 self.assertEqual(len(l.search()), 0)
211 self.assertEqual(len(l.search()), 1)
213 l.delete(ldb.Dn(l, "dc=foo4"))
215 def test_search_iterator(self):
216 l = ldb.Ldb(self.url(), flags=self.flags())
217 s = l.search_iterator()
223 except RuntimeError as re:
228 except RuntimeError as re:
233 except RuntimeError as re:
236 s = l.search_iterator()
239 self.assertTrue(isinstance(me, ldb.Message))
242 self.assertEqual(len(r), 0)
243 self.assertEqual(count, 0)
246 m1.dn = ldb.Dn(l, "dc=foo4")
250 s = l.search_iterator()
253 self.assertTrue(isinstance(me, ldb.Message))
257 self.assertEqual(len(r), 0)
258 self.assertEqual(len(msgs), 1)
259 self.assertEqual(msgs[0].dn, m1.dn)
262 m2.dn = ldb.Dn(l, "dc=foo5")
266 s = l.search_iterator()
269 self.assertTrue(isinstance(me, ldb.Message))
273 self.assertEqual(len(r), 0)
274 self.assertEqual(len(msgs), 2)
275 if msgs[0].dn == m1.dn:
276 self.assertEqual(msgs[0].dn, m1.dn)
277 self.assertEqual(msgs[1].dn, m2.dn)
279 self.assertEqual(msgs[0].dn, m2.dn)
280 self.assertEqual(msgs[1].dn, m1.dn)
282 s = l.search_iterator()
285 self.assertTrue(isinstance(me, ldb.Message))
292 except RuntimeError as re:
295 self.assertTrue(isinstance(me, ldb.Message))
303 self.assertEqual(len(r), 0)
304 self.assertEqual(len(msgs), 2)
305 if msgs[0].dn == m1.dn:
306 self.assertEqual(msgs[0].dn, m1.dn)
307 self.assertEqual(msgs[1].dn, m2.dn)
309 self.assertEqual(msgs[0].dn, m2.dn)
310 self.assertEqual(msgs[1].dn, m1.dn)
312 l.delete(ldb.Dn(l, "dc=foo4"))
313 l.delete(ldb.Dn(l, "dc=foo5"))
315 def test_add_text(self):
316 l = ldb.Ldb(self.url(), flags=self.flags())
318 m.dn = ldb.Dn(l, "dc=foo4")
320 self.assertEqual(len(l.search()), 0)
323 self.assertEqual(len(l.search()), 1)
325 l.delete(ldb.Dn(l, "dc=foo4"))
327 def test_add_w_unhandled_ctrl(self):
328 l = ldb.Ldb(self.url(), flags=self.flags())
330 m.dn = ldb.Dn(l, "dc=foo4")
332 self.assertEqual(len(l.search()), 0)
333 self.assertRaises(ldb.LdbError, lambda: l.add(m,["search_options:1:2"]))
335 def test_add_dict(self):
336 l = ldb.Ldb(self.url(), flags=self.flags())
337 m = {"dn": ldb.Dn(l, "dc=foo5"),
339 self.assertEqual(len(l.search()), 0)
342 self.assertEqual(len(l.search()), 1)
344 l.delete(ldb.Dn(l, "dc=foo5"))
346 def test_add_dict_text(self):
347 l = ldb.Ldb(self.url(), flags=self.flags())
348 m = {"dn": ldb.Dn(l, "dc=foo5"),
350 self.assertEqual(len(l.search()), 0)
353 self.assertEqual(len(l.search()), 1)
355 l.delete(ldb.Dn(l, "dc=foo5"))
357 def test_add_dict_string_dn(self):
358 l = ldb.Ldb(self.url(), flags=self.flags())
359 m = {"dn": "dc=foo6", "bla": b"bla"}
360 self.assertEqual(len(l.search()), 0)
363 self.assertEqual(len(l.search()), 1)
365 l.delete(ldb.Dn(l, "dc=foo6"))
367 def test_add_dict_bytes_dn(self):
368 l = ldb.Ldb(self.url(), flags=self.flags())
369 m = {"dn": b"dc=foo6", "bla": b"bla"}
370 self.assertEqual(len(l.search()), 0)
373 self.assertEqual(len(l.search()), 1)
375 l.delete(ldb.Dn(l, "dc=foo6"))
377 def test_rename(self):
378 l = ldb.Ldb(self.url(), flags=self.flags())
380 m.dn = ldb.Dn(l, "dc=foo7")
382 self.assertEqual(len(l.search()), 0)
385 l.rename(ldb.Dn(l, "dc=foo7"), ldb.Dn(l, "dc=bar"))
386 self.assertEqual(len(l.search()), 1)
388 l.delete(ldb.Dn(l, "dc=bar"))
390 def test_rename_string_dns(self):
391 l = ldb.Ldb(self.url(), flags=self.flags())
393 m.dn = ldb.Dn(l, "dc=foo8")
395 self.assertEqual(len(l.search()), 0)
397 self.assertEqual(len(l.search()), 1)
399 l.rename("dc=foo8", "dc=bar")
400 self.assertEqual(len(l.search()), 1)
402 l.delete(ldb.Dn(l, "dc=bar"))
404 def test_empty_dn(self):
405 l = ldb.Ldb(self.url(), flags=self.flags())
406 self.assertEqual(0, len(l.search()))
408 m.dn = ldb.Dn(l, "dc=empty")
411 self.assertEqual(1, len(rm))
412 self.assertEqual(set(["dn", "distinguishedName"]), set(rm[0].keys()))
415 self.assertEqual(1, len(rm))
416 self.assertEqual(set(["dn", "distinguishedName"]), set(rm[0].keys()))
417 rm = l.search(m.dn, attrs=["blah"])
418 self.assertEqual(1, len(rm))
419 self.assertEqual(0, len(rm[0]))
421 def test_modify_delete(self):
422 l = ldb.Ldb(self.url(), flags=self.flags())
424 m.dn = ldb.Dn(l, "dc=modifydelete")
427 rm = l.search(m.dn)[0]
428 self.assertEqual([b"1234"], list(rm["bla"]))
431 m.dn = ldb.Dn(l, "dc=modifydelete")
432 m["bla"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE, "bla")
433 self.assertEqual(ldb.FLAG_MOD_DELETE, m["bla"].flags())
436 self.assertEqual(1, len(rm))
437 self.assertEqual(set(["dn", "distinguishedName"]), set(rm[0].keys()))
438 rm = l.search(m.dn, attrs=["bla"])
439 self.assertEqual(1, len(rm))
440 self.assertEqual(0, len(rm[0]))
442 l.delete(ldb.Dn(l, "dc=modifydelete"))
444 def test_modify_delete_text(self):
445 l = ldb.Ldb(self.url(), flags=self.flags())
447 m.dn = ldb.Dn(l, "dc=modifydelete")
448 m.text["bla"] = ["1234"]
450 rm = l.search(m.dn)[0]
451 self.assertEqual(["1234"], list(rm.text["bla"]))
454 m.dn = ldb.Dn(l, "dc=modifydelete")
455 m["bla"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE, "bla")
456 self.assertEqual(ldb.FLAG_MOD_DELETE, m["bla"].flags())
459 self.assertEqual(1, len(rm))
460 self.assertEqual(set(["dn", "distinguishedName"]), set(rm[0].keys()))
461 rm = l.search(m.dn, attrs=["bla"])
462 self.assertEqual(1, len(rm))
463 self.assertEqual(0, len(rm[0]))
465 l.delete(ldb.Dn(l, "dc=modifydelete"))
467 def test_modify_add(self):
468 l = ldb.Ldb(self.url(), flags=self.flags())
470 m.dn = ldb.Dn(l, "dc=add")
475 m.dn = ldb.Dn(l, "dc=add")
476 m["bla"] = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
477 self.assertEqual(ldb.FLAG_MOD_ADD, m["bla"].flags())
479 rm = l.search(m.dn)[0]
480 self.assertEqual(2, len(rm))
481 self.assertEqual([b"1234", b"456"], list(rm["bla"]))
483 l.delete(ldb.Dn(l, "dc=add"))
485 def test_modify_add_text(self):
486 l = ldb.Ldb(self.url(), flags=self.flags())
488 m.dn = ldb.Dn(l, "dc=add")
489 m.text["bla"] = ["1234"]
493 m.dn = ldb.Dn(l, "dc=add")
494 m["bla"] = ldb.MessageElement(["456"], ldb.FLAG_MOD_ADD, "bla")
495 self.assertEqual(ldb.FLAG_MOD_ADD, m["bla"].flags())
497 rm = l.search(m.dn)[0]
498 self.assertEqual(2, len(rm))
499 self.assertEqual(["1234", "456"], list(rm.text["bla"]))
501 l.delete(ldb.Dn(l, "dc=add"))
503 def test_modify_replace(self):
504 l = ldb.Ldb(self.url(), flags=self.flags())
506 m.dn = ldb.Dn(l, "dc=modify2")
507 m["bla"] = [b"1234", b"456"]
511 m.dn = ldb.Dn(l, "dc=modify2")
512 m["bla"] = ldb.MessageElement([b"789"], ldb.FLAG_MOD_REPLACE, "bla")
513 self.assertEqual(ldb.FLAG_MOD_REPLACE, m["bla"].flags())
515 rm = l.search(m.dn)[0]
516 self.assertEqual(2, len(rm))
517 self.assertEqual([b"789"], list(rm["bla"]))
518 rm = l.search(m.dn, attrs=["bla"])[0]
519 self.assertEqual(1, len(rm))
521 l.delete(ldb.Dn(l, "dc=modify2"))
523 def test_modify_replace_text(self):
524 l = ldb.Ldb(self.url(), flags=self.flags())
526 m.dn = ldb.Dn(l, "dc=modify2")
527 m.text["bla"] = ["1234", "456"]
531 m.dn = ldb.Dn(l, "dc=modify2")
532 m["bla"] = ldb.MessageElement(["789"], ldb.FLAG_MOD_REPLACE, "bla")
533 self.assertEqual(ldb.FLAG_MOD_REPLACE, m["bla"].flags())
535 rm = l.search(m.dn)[0]
536 self.assertEqual(2, len(rm))
537 self.assertEqual(["789"], list(rm.text["bla"]))
538 rm = l.search(m.dn, attrs=["bla"])[0]
539 self.assertEqual(1, len(rm))
541 l.delete(ldb.Dn(l, "dc=modify2"))
543 def test_modify_flags_change(self):
544 l = ldb.Ldb(self.url(), flags=self.flags())
546 m.dn = ldb.Dn(l, "dc=add")
551 m.dn = ldb.Dn(l, "dc=add")
552 m["bla"] = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
553 self.assertEqual(ldb.FLAG_MOD_ADD, m["bla"].flags())
555 rm = l.search(m.dn)[0]
556 self.assertEqual(2, len(rm))
557 self.assertEqual([b"1234", b"456"], list(rm["bla"]))
559 # Now create another modify, but switch the flags before we do it
560 m["bla"] = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
561 m["bla"].set_flags(ldb.FLAG_MOD_DELETE)
563 rm = l.search(m.dn, attrs=["bla"])[0]
564 self.assertEqual(1, len(rm))
565 self.assertEqual([b"1234"], list(rm["bla"]))
567 l.delete(ldb.Dn(l, "dc=add"))
569 def test_modify_flags_change_text(self):
570 l = ldb.Ldb(self.url(), flags=self.flags())
572 m.dn = ldb.Dn(l, "dc=add")
573 m.text["bla"] = ["1234"]
577 m.dn = ldb.Dn(l, "dc=add")
578 m["bla"] = ldb.MessageElement(["456"], ldb.FLAG_MOD_ADD, "bla")
579 self.assertEqual(ldb.FLAG_MOD_ADD, m["bla"].flags())
581 rm = l.search(m.dn)[0]
582 self.assertEqual(2, len(rm))
583 self.assertEqual(["1234", "456"], list(rm.text["bla"]))
585 # Now create another modify, but switch the flags before we do it
586 m["bla"] = ldb.MessageElement(["456"], ldb.FLAG_MOD_ADD, "bla")
587 m["bla"].set_flags(ldb.FLAG_MOD_DELETE)
589 rm = l.search(m.dn, attrs=["bla"])[0]
590 self.assertEqual(1, len(rm))
591 self.assertEqual(["1234"], list(rm.text["bla"]))
593 l.delete(ldb.Dn(l, "dc=add"))
595 def test_transaction_commit(self):
596 l = ldb.Ldb(self.url(), flags=self.flags())
597 l.transaction_start()
598 m = ldb.Message(ldb.Dn(l, "dc=foo9"))
601 l.transaction_commit()
604 def test_transaction_cancel(self):
605 l = ldb.Ldb(self.url(), flags=self.flags())
606 l.transaction_start()
607 m = ldb.Message(ldb.Dn(l, "dc=foo10"))
610 l.transaction_cancel()
611 self.assertEqual(0, len(l.search(ldb.Dn(l, "dc=foo10"))))
613 def test_set_debug(self):
614 def my_report_fn(level, text):
616 l = ldb.Ldb(self.url(), flags=self.flags())
617 l.set_debug(my_report_fn)
619 def test_zero_byte_string(self):
620 """Testing we do not get trapped in the \0 byte in a property string."""
621 l = ldb.Ldb(self.url(), flags=self.flags())
624 "objectclass" : b"user",
625 "cN" : b"LDAPtestUSER",
626 "givenname" : b"ldap",
627 "displayname" : b"foo\0bar",
629 res = l.search(expression="(dn=dc=somedn)")
630 self.assertEqual(b"foo\0bar", res[0]["displayname"][0])
632 def test_no_crash_broken_expr(self):
633 l = ldb.Ldb(self.url(), flags=self.flags())
634 self.assertRaises(ldb.LdbError,lambda: l.search("", ldb.SCOPE_SUBTREE, "&(dc=*)(dn=*)", ["dc"]))
636 class SearchTests(LdbBaseTest):
638 shutil.rmtree(self.testdir)
639 super(SearchTests, self).tearDown()
641 # Ensure the LDB is closed now, so we close the FD
646 super(SearchTests, self).setUp()
647 self.testdir = tempdir()
648 self.filename = os.path.join(self.testdir, "search_test.ldb")
649 self.l = ldb.Ldb(self.url(),
651 options=["modules:rdn_name"])
653 self.l.add({"dn": "@ATTRIBUTES",
654 "DC": "CASE_INSENSITIVE"})
656 # Note that we can't use the name objectGUID here, as we
657 # want to stay clear of the objectGUID handler in LDB and
658 # instead use just the 16 bytes raw, which we just keep
659 # to printable chars here for ease of handling.
661 self.l.add({"dn": "DC=SAMBA,DC=ORG",
662 "name": b"samba.org",
663 "objectUUID": b"0123456789abcddf"})
664 self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG",
667 "objectUUID": b"0123456789abcde1"})
668 self.l.add({"dn": "OU=USERS,DC=SAMBA,DC=ORG",
671 "objectUUID": b"0123456789abcde2"})
672 self.l.add({"dn": "OU=OU1,DC=SAMBA,DC=ORG",
675 "objectUUID": b"0123456789abcde3"})
676 self.l.add({"dn": "OU=OU2,DC=SAMBA,DC=ORG",
679 "objectUUID": b"0123456789abcde4"})
680 self.l.add({"dn": "OU=OU3,DC=SAMBA,DC=ORG",
683 "objectUUID": b"0123456789abcde5"})
684 self.l.add({"dn": "OU=OU4,DC=SAMBA,DC=ORG",
687 "objectUUID": b"0123456789abcde6"})
688 self.l.add({"dn": "OU=OU5,DC=SAMBA,DC=ORG",
691 "objectUUID": b"0123456789abcde7"})
692 self.l.add({"dn": "OU=OU6,DC=SAMBA,DC=ORG",
695 "objectUUID": b"0123456789abcde8"})
696 self.l.add({"dn": "OU=OU7,DC=SAMBA,DC=ORG",
699 "objectUUID": b"0123456789abcde9"})
700 self.l.add({"dn": "OU=OU8,DC=SAMBA,DC=ORG",
703 "objectUUID": b"0123456789abcde0"})
704 self.l.add({"dn": "OU=OU9,DC=SAMBA,DC=ORG",
707 "objectUUID": b"0123456789abcdea"})
708 self.l.add({"dn": "OU=OU10,DC=SAMBA,DC=ORG",
711 "objectUUID": b"0123456789abcdeb"})
712 self.l.add({"dn": "OU=OU11,DC=SAMBA,DC=ORG",
715 "objectUUID": b"0123456789abcdec"})
716 self.l.add({"dn": "OU=OU12,DC=SAMBA,DC=ORG",
719 "objectUUID": b"0123456789abcded"})
720 self.l.add({"dn": "OU=OU13,DC=SAMBA,DC=ORG",
723 "objectUUID": b"0123456789abcdee"})
724 self.l.add({"dn": "OU=OU14,DC=SAMBA,DC=ORG",
727 "objectUUID": b"0123456789abcd01"})
728 self.l.add({"dn": "OU=OU15,DC=SAMBA,DC=ORG",
731 "objectUUID": b"0123456789abcd02"})
732 self.l.add({"dn": "OU=OU16,DC=SAMBA,DC=ORG",
735 "objectUUID": b"0123456789abcd03"})
736 self.l.add({"dn": "OU=OU17,DC=SAMBA,DC=ORG",
739 "objectUUID": b"0123456789abcd04"})
740 self.l.add({"dn": "OU=OU18,DC=SAMBA,DC=ORG",
743 "objectUUID": b"0123456789abcd05"})
744 self.l.add({"dn": "OU=OU19,DC=SAMBA,DC=ORG",
747 "objectUUID": b"0123456789abcd06"})
748 self.l.add({"dn": "OU=OU20,DC=SAMBA,DC=ORG",
751 "objectUUID": b"0123456789abcd07"})
752 self.l.add({"dn": "OU=OU21,DC=SAMBA,DC=ORG",
755 "objectUUID": b"0123456789abcd08"})
756 self.l.add({"dn": "OU=OU22,DC=SAMBA,DC=ORG",
759 "objectUUID": b"0123456789abcd09"})
762 """Testing a search"""
764 res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
765 scope=ldb.SCOPE_BASE)
766 self.assertEqual(len(res11), 1)
768 def test_base_lower(self):
769 """Testing a search"""
771 res11 = self.l.search(base="OU=OU11,DC=samba,DC=org",
772 scope=ldb.SCOPE_BASE)
773 self.assertEqual(len(res11), 1)
775 def test_base_or(self):
776 """Testing a search"""
778 res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
779 scope=ldb.SCOPE_BASE,
780 expression="(|(ou=ou11)(ou=ou12))")
781 self.assertEqual(len(res11), 1)
783 def test_base_or2(self):
784 """Testing a search"""
786 res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
787 scope=ldb.SCOPE_BASE,
788 expression="(|(x=y)(y=b))")
789 self.assertEqual(len(res11), 1)
791 def test_base_and(self):
792 """Testing a search"""
794 res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
795 scope=ldb.SCOPE_BASE,
796 expression="(&(ou=ou11)(ou=ou12))")
797 self.assertEqual(len(res11), 0)
799 def test_base_and2(self):
800 """Testing a search"""
802 res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
803 scope=ldb.SCOPE_BASE,
804 expression="(&(x=y)(y=a))")
805 self.assertEqual(len(res11), 1)
807 def test_base_false(self):
808 """Testing a search"""
810 res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
811 scope=ldb.SCOPE_BASE,
812 expression="(|(ou=ou13)(ou=ou12))")
813 self.assertEqual(len(res11), 0)
815 def test_check_base_false(self):
816 """Testing a search"""
817 res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
818 scope=ldb.SCOPE_BASE,
819 expression="(|(ou=ou13)(ou=ou12))")
820 self.assertEqual(len(res11), 0)
822 def test_check_base_error(self):
823 """Testing a search"""
824 checkbaseonsearch = {"dn": "@OPTIONS",
825 "checkBaseOnSearch": b"TRUE"}
827 self.l.add(checkbaseonsearch)
828 except ldb.LdbError as err:
830 self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)
831 m = ldb.Message.from_dict(self.l,
836 res11 = self.l.search(base="OU=OU11x,DC=SAMBA,DC=ORG",
837 scope=ldb.SCOPE_BASE,
838 expression="(|(ou=ou13)(ou=ou12))")
839 self.fail("Should have failed on missing base")
840 except ldb.LdbError as err:
842 self.assertEqual(enum, ldb.ERR_NO_SUCH_OBJECT)
844 def test_subtree_and(self):
845 """Testing a search"""
847 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
848 scope=ldb.SCOPE_SUBTREE,
849 expression="(&(ou=ou11)(ou=ou12))")
850 self.assertEqual(len(res11), 0)
852 def test_subtree_and2(self):
853 """Testing a search"""
855 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
856 scope=ldb.SCOPE_SUBTREE,
857 expression="(&(x=y)(|(y=b)(y=c)))")
858 self.assertEqual(len(res11), 1)
860 def test_subtree_and2_lower(self):
861 """Testing a search"""
863 res11 = self.l.search(base="DC=samba,DC=org",
864 scope=ldb.SCOPE_SUBTREE,
865 expression="(&(x=y)(|(y=b)(y=c)))")
866 self.assertEqual(len(res11), 1)
868 def test_subtree_or(self):
869 """Testing a search"""
871 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
872 scope=ldb.SCOPE_SUBTREE,
873 expression="(|(ou=ou11)(ou=ou12))")
874 self.assertEqual(len(res11), 2)
876 def test_subtree_or2(self):
877 """Testing a search"""
879 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
880 scope=ldb.SCOPE_SUBTREE,
881 expression="(|(x=y)(y=b))")
882 self.assertEqual(len(res11), 20)
884 def test_subtree_or3(self):
885 """Testing a search"""
887 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
888 scope=ldb.SCOPE_SUBTREE,
889 expression="(|(x=y)(y=b)(y=c))")
890 self.assertEqual(len(res11), 22)
892 def test_one_and(self):
893 """Testing a search"""
895 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
896 scope=ldb.SCOPE_ONELEVEL,
897 expression="(&(ou=ou11)(ou=ou12))")
898 self.assertEqual(len(res11), 0)
900 def test_one_and2(self):
901 """Testing a search"""
903 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
904 scope=ldb.SCOPE_ONELEVEL,
905 expression="(&(x=y)(y=b))")
906 self.assertEqual(len(res11), 1)
908 def test_one_or(self):
909 """Testing a search"""
911 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
912 scope=ldb.SCOPE_ONELEVEL,
913 expression="(|(ou=ou11)(ou=ou12))")
914 self.assertEqual(len(res11), 2)
916 def test_one_or2(self):
917 """Testing a search"""
919 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
920 scope=ldb.SCOPE_ONELEVEL,
921 expression="(|(x=y)(y=b))")
922 self.assertEqual(len(res11), 20)
924 def test_one_or2_lower(self):
925 """Testing a search"""
927 res11 = self.l.search(base="DC=samba,DC=org",
928 scope=ldb.SCOPE_ONELEVEL,
929 expression="(|(x=y)(y=b))")
930 self.assertEqual(len(res11), 20)
932 def test_subtree_and_or(self):
933 """Testing a search"""
935 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
936 scope=ldb.SCOPE_SUBTREE,
937 expression="(&(|(x=z)(y=b))(x=x)(y=c))")
938 self.assertEqual(len(res11), 0)
940 def test_subtree_and_or2(self):
941 """Testing a search"""
943 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
944 scope=ldb.SCOPE_SUBTREE,
945 expression="(&(x=x)(y=c)(|(x=z)(y=b)))")
946 self.assertEqual(len(res11), 0)
948 def test_subtree_and_or3(self):
949 """Testing a search"""
951 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
952 scope=ldb.SCOPE_SUBTREE,
953 expression="(&(|(ou=ou11)(ou=ou10))(|(x=y)(y=b)(y=c)))")
954 self.assertEqual(len(res11), 2)
956 def test_subtree_and_or4(self):
957 """Testing a search"""
959 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
960 scope=ldb.SCOPE_SUBTREE,
961 expression="(&(|(x=y)(y=b)(y=c))(|(ou=ou11)(ou=ou10)))")
962 self.assertEqual(len(res11), 2)
964 def test_subtree_and_or5(self):
965 """Testing a search"""
967 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
968 scope=ldb.SCOPE_SUBTREE,
969 expression="(&(|(x=y)(y=b)(y=c))(ou=ou11))")
970 self.assertEqual(len(res11), 1)
972 def test_subtree_or_and(self):
973 """Testing a search"""
975 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
976 scope=ldb.SCOPE_SUBTREE,
977 expression="(|(x=x)(y=c)(&(x=z)(y=b)))")
978 self.assertEqual(len(res11), 10)
980 def test_subtree_large_and_unique(self):
981 """Testing a search"""
983 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
984 scope=ldb.SCOPE_SUBTREE,
985 expression="(&(ou=ou10)(y=a))")
986 self.assertEqual(len(res11), 1)
988 def test_subtree_and_none(self):
989 """Testing a search"""
991 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
992 scope=ldb.SCOPE_SUBTREE,
993 expression="(&(ou=ouX)(y=a))")
994 self.assertEqual(len(res11), 0)
996 def test_subtree_and_idx_record(self):
997 """Testing a search against the index record"""
999 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1000 scope=ldb.SCOPE_SUBTREE,
1001 expression="(@IDXDN=DC=SAMBA,DC=ORG)")
1002 self.assertEqual(len(res11), 0)
1004 def test_subtree_and_idxone_record(self):
1005 """Testing a search against the index record"""
1007 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1008 scope=ldb.SCOPE_SUBTREE,
1009 expression="(@IDXONE=DC=SAMBA,DC=ORG)")
1010 self.assertEqual(len(res11), 0)
1012 def test_dn_filter_one(self):
1013 """Testing that a dn= filter succeeds
1014 (or fails with disallowDNFilter
1015 set and IDXGUID or (IDX and not IDXONE) mode)
1016 when the scope is SCOPE_ONELEVEL.
1018 This should be made more consistent, but for now lock in
1023 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1024 scope=ldb.SCOPE_ONELEVEL,
1025 expression="(dn=OU=OU1,DC=SAMBA,DC=ORG)")
1026 if hasattr(self, 'disallowDNFilter') and \
1027 hasattr(self, 'IDX') and \
1028 (hasattr(self, 'IDXGUID') or \
1029 ((hasattr(self, 'IDXONE') == False and hasattr(self, 'IDX')))):
1030 self.assertEqual(len(res11), 0)
1032 self.assertEqual(len(res11), 1)
1034 def test_dn_filter_subtree(self):
1035 """Testing that a dn= filter succeeds
1036 (or fails with disallowDNFilter set)
1037 when the scope is SCOPE_SUBTREE"""
1039 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1040 scope=ldb.SCOPE_SUBTREE,
1041 expression="(dn=OU=OU1,DC=SAMBA,DC=ORG)")
1042 if hasattr(self, 'disallowDNFilter') \
1043 and hasattr(self, 'IDX'):
1044 self.assertEqual(len(res11), 0)
1046 self.assertEqual(len(res11), 1)
1048 def test_dn_filter_base(self):
1049 """Testing that (incorrectly) a dn= filter works
1050 when the scope is SCOPE_BASE"""
1052 res11 = self.l.search(base="OU=OU1,DC=SAMBA,DC=ORG",
1053 scope=ldb.SCOPE_BASE,
1054 expression="(dn=OU=OU1,DC=SAMBA,DC=ORG)")
1056 # At some point we should fix this, but it isn't trivial
1057 self.assertEqual(len(res11), 1)
1060 class IndexedSearchTests(SearchTests):
1061 """Test searches using the index, to ensure the index doesn't
1064 super(IndexedSearchTests, self).setUp()
1065 self.l.add({"dn": "@INDEXLIST",
1066 "@IDXATTR": [b"x", b"y", b"ou"]})
1069 class IndexedSearchDnFilterTests(SearchTests):
1070 """Test searches using the index, to ensure the index doesn't
1073 super(IndexedSearchDnFilterTests, self).setUp()
1074 self.l.add({"dn": "@OPTIONS",
1075 "disallowDNFilter": "TRUE"})
1076 self.disallowDNFilter = True
1078 self.l.add({"dn": "@INDEXLIST",
1079 "@IDXATTR": [b"x", b"y", b"ou"]})
1082 class IndexedAndOneLevelSearchTests(SearchTests):
1083 """Test searches using the index including @IDXONE, to ensure
1084 the index doesn't break things"""
1086 super(IndexedAndOneLevelSearchTests, self).setUp()
1087 self.l.add({"dn": "@INDEXLIST",
1088 "@IDXATTR": [b"x", b"y", b"ou"],
1092 class IndexedAndOneLevelDNFilterSearchTests(SearchTests):
1093 """Test searches using the index including @IDXONE, to ensure
1094 the index doesn't break things"""
1096 super(IndexedAndOneLevelDNFilterSearchTests, self).setUp()
1097 self.l.add({"dn": "@OPTIONS",
1098 "disallowDNFilter": "TRUE"})
1099 self.disallowDNFilter = True
1101 self.l.add({"dn": "@INDEXLIST",
1102 "@IDXATTR": [b"x", b"y", b"ou"],
1107 class GUIDIndexedSearchTests(SearchTests):
1108 """Test searches using the index, to ensure the index doesn't
1111 super(GUIDIndexedSearchTests, self).setUp()
1113 self.l.add({"dn": "@INDEXLIST",
1114 "@IDXATTR": [b"x", b"y", b"ou"],
1115 "@IDXGUID": [b"objectUUID"],
1116 "@IDX_DN_GUID": [b"GUID"]})
1121 class GUIDIndexedDNFilterSearchTests(SearchTests):
1122 """Test searches using the index, to ensure the index doesn't
1125 super(GUIDIndexedDNFilterSearchTests, self).setUp()
1126 self.l.add({"dn": "@OPTIONS",
1127 "disallowDNFilter": "TRUE"})
1128 self.disallowDNFilter = True
1130 self.l.add({"dn": "@INDEXLIST",
1131 "@IDXATTR": [b"x", b"y", b"ou"],
1132 "@IDXGUID": [b"objectUUID"],
1133 "@IDX_DN_GUID": [b"GUID"]})
1137 class GUIDAndOneLevelIndexedSearchTests(SearchTests):
1138 """Test searches using the index including @IDXONE, to ensure
1139 the index doesn't break things"""
1141 super(GUIDAndOneLevelIndexedSearchTests, self).setUp()
1142 self.l.add({"dn": "@OPTIONS",
1143 "disallowDNFilter": "TRUE"})
1144 self.disallowDNFilter = True
1146 self.l.add({"dn": "@INDEXLIST",
1147 "@IDXATTR": [b"x", b"y", b"ou"],
1149 "@IDXGUID": [b"objectUUID"],
1150 "@IDX_DN_GUID": [b"GUID"]})
1156 class AddModifyTests(LdbBaseTest):
1158 shutil.rmtree(self.testdir)
1159 super(AddModifyTests, self).tearDown()
1161 # Ensure the LDB is closed now, so we close the FD
1165 super(AddModifyTests, self).setUp()
1166 self.testdir = tempdir()
1167 self.filename = os.path.join(self.testdir, "add_test.ldb")
1168 self.l = ldb.Ldb(self.url(),
1170 options=["modules:rdn_name"])
1171 self.l.add({"dn": "DC=SAMBA,DC=ORG",
1172 "name": b"samba.org",
1173 "objectUUID": b"0123456789abcdef"})
1174 self.l.add({"dn": "@ATTRIBUTES",
1175 "objectUUID": "UNIQUE_INDEX"})
1177 def test_add_dup(self):
1178 self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
1181 "objectUUID": b"0123456789abcde1"})
1183 self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
1186 "objectUUID": b"0123456789abcde2"})
1187 self.fail("Should have failed adding dupliate entry")
1188 except ldb.LdbError as err:
1190 self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)
1192 def test_add_del_add(self):
1193 self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
1196 "objectUUID": b"0123456789abcde1"})
1197 self.l.delete("OU=DUP,DC=SAMBA,DC=ORG")
1198 self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
1201 "objectUUID": b"0123456789abcde2"})
1203 def test_add_move_add(self):
1204 self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
1207 "objectUUID": b"0123456789abcde1"})
1208 self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
1209 "OU=DUP2,DC=SAMBA,DC=ORG")
1210 self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
1213 "objectUUID": b"0123456789abcde2"})
1215 def test_add_move_fail_move_move(self):
1216 self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
1219 "objectUUID": b"0123456789abcde1"})
1220 self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
1223 "objectUUID": b"0123456789abcde2"})
1225 self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
1226 "OU=DUP2,DC=SAMBA,DC=ORG")
1227 self.fail("Should have failed on duplicate DN")
1228 except ldb.LdbError as err:
1230 self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)
1232 self.l.rename("OU=DUP2,DC=SAMBA,DC=ORG",
1233 "OU=DUP3,DC=SAMBA,DC=ORG")
1235 self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
1236 "OU=DUP2,DC=SAMBA,DC=ORG")
1238 res2 = self.l.search(base="DC=SAMBA,DC=ORG",
1239 scope=ldb.SCOPE_SUBTREE,
1240 expression="(objectUUID=0123456789abcde1)")
1241 self.assertEqual(len(res2), 1)
1242 self.assertEqual(str(res2[0].dn), "OU=DUP2,DC=SAMBA,DC=ORG")
1244 res3 = self.l.search(base="DC=SAMBA,DC=ORG",
1245 scope=ldb.SCOPE_SUBTREE,
1246 expression="(objectUUID=0123456789abcde2)")
1247 self.assertEqual(len(res3), 1)
1248 self.assertEqual(str(res3[0].dn), "OU=DUP3,DC=SAMBA,DC=ORG")
1250 def test_move_missing(self):
1252 self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
1253 "OU=DUP2,DC=SAMBA,DC=ORG")
1254 self.fail("Should have failed on missing")
1255 except ldb.LdbError as err:
1257 self.assertEqual(enum, ldb.ERR_NO_SUCH_OBJECT)
1259 def test_move_missing2(self):
1260 self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
1263 "objectUUID": b"0123456789abcde2"})
1266 self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
1267 "OU=DUP2,DC=SAMBA,DC=ORG")
1268 self.fail("Should have failed on missing")
1269 except ldb.LdbError as err:
1271 self.assertEqual(enum, ldb.ERR_NO_SUCH_OBJECT)
1273 def test_move_fail_move_add(self):
1274 self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
1277 "objectUUID": b"0123456789abcde1"})
1278 self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
1281 "objectUUID": b"0123456789abcde2"})
1283 self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
1284 "OU=DUP2,DC=SAMBA,DC=ORG")
1285 self.fail("Should have failed on duplicate DN")
1286 except ldb.LdbError as err:
1288 self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)
1290 self.l.rename("OU=DUP2,DC=SAMBA,DC=ORG",
1291 "OU=DUP3,DC=SAMBA,DC=ORG")
1293 self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
1296 "objectUUID": b"0123456789abcde3"})
1299 class IndexedAddModifyTests(AddModifyTests):
1300 """Test searches using the index, to ensure the index doesn't
1303 super(IndexedAddModifyTests, self).setUp()
1304 self.l.add({"dn": "@INDEXLIST",
1305 "@IDXATTR": [b"x", b"y", b"ou", b"objectUUID"],
1308 def test_duplicate_GUID(self):
1310 self.l.add({"dn": "OU=DUPGUID,DC=SAMBA,DC=ORG",
1313 "objectUUID": b"0123456789abcdef"})
1314 self.fail("Should have failed adding dupliate GUID")
1315 except ldb.LdbError as err:
1317 self.assertEqual(enum, ldb.ERR_CONSTRAINT_VIOLATION)
1319 def test_duplicate_name_dup_GUID(self):
1320 self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG",
1323 "objectUUID": b"a123456789abcdef"})
1325 self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG",
1328 "objectUUID": b"a123456789abcdef"})
1329 self.fail("Should have failed adding dupliate GUID")
1330 except ldb.LdbError as err:
1332 self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)
1334 def test_duplicate_name_dup_GUID2(self):
1335 self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG",
1338 "objectUUID": b"abc3456789abcdef"})
1340 self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG",
1343 "objectUUID": b"aaa3456789abcdef"})
1344 self.fail("Should have failed adding dupliate DN")
1345 except ldb.LdbError as err:
1347 self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)
1349 # Checking the GUID didn't stick in the index
1350 self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
1353 "objectUUID": b"aaa3456789abcdef"})
1355 def test_add_dup_guid_add(self):
1356 self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
1359 "objectUUID": b"0123456789abcde1"})
1361 self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
1364 "objectUUID": b"0123456789abcde1"})
1365 self.fail("Should have failed on duplicate GUID")
1367 except ldb.LdbError as err:
1369 self.assertEqual(enum, ldb.ERR_CONSTRAINT_VIOLATION)
1371 self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
1374 "objectUUID": b"0123456789abcde2"})
1376 class GUIDIndexedAddModifyTests(IndexedAddModifyTests):
1377 """Test searches using the index, to ensure the index doesn't
1380 super(GUIDIndexedAddModifyTests, self).setUp()
1381 indexlist = {"dn": "@INDEXLIST",
1382 "@IDXATTR": [b"x", b"y", b"ou"],
1384 "@IDXGUID": [b"objectUUID"],
1385 "@IDX_DN_GUID": [b"GUID"]}
1386 m = ldb.Message.from_dict(self.l, indexlist, ldb.FLAG_MOD_REPLACE)
1390 class GUIDTransIndexedAddModifyTests(GUIDIndexedAddModifyTests):
1391 """Test GUID index behaviour insdie the transaction"""
1393 super(GUIDTransIndexedAddModifyTests, self).setUp()
1394 self.l.transaction_start()
1397 self.l.transaction_commit()
1398 super(GUIDTransIndexedAddModifyTests, self).tearDown()
1400 class TransIndexedAddModifyTests(IndexedAddModifyTests):
1401 """Test index behaviour insdie the transaction"""
1403 super(TransIndexedAddModifyTests, self).setUp()
1404 self.l.transaction_start()
1407 self.l.transaction_commit()
1408 super(TransIndexedAddModifyTests, self).tearDown()
1411 class BadIndexTests(LdbBaseTest):
1413 super(BadIndexTests, self).setUp()
1414 self.testdir = tempdir()
1415 self.filename = os.path.join(self.testdir, "test.ldb")
1416 self.ldb = ldb.Ldb(self.url(), flags=self.flags())
1417 if hasattr(self, 'IDXGUID'):
1418 self.ldb.add({"dn": "@INDEXLIST",
1419 "@IDXATTR": [b"x", b"y", b"ou"],
1420 "@IDXGUID": [b"objectUUID"],
1421 "@IDX_DN_GUID": [b"GUID"]})
1423 self.ldb.add({"dn": "@INDEXLIST",
1424 "@IDXATTR": [b"x", b"y", b"ou"]})
1426 super(BadIndexTests, self).setUp()
1428 def test_unique(self):
1429 self.ldb.add({"dn": "x=x,dc=samba,dc=org",
1430 "objectUUID": b"0123456789abcde1",
1432 self.ldb.add({"dn": "x=y,dc=samba,dc=org",
1433 "objectUUID": b"0123456789abcde2",
1435 self.ldb.add({"dn": "x=z,dc=samba,dc=org",
1436 "objectUUID": b"0123456789abcde3",
1439 res = self.ldb.search(expression="(y=1)",
1440 base="dc=samba,dc=org")
1441 self.assertEquals(len(res), 3)
1443 # Now set this to unique index, but forget to check the result
1445 self.ldb.add({"dn": "@ATTRIBUTES",
1446 "y": "UNIQUE_INDEX"})
1448 except ldb.LdbError:
1451 # We must still have a working index
1452 res = self.ldb.search(expression="(y=1)",
1453 base="dc=samba,dc=org")
1454 self.assertEquals(len(res), 3)
1456 def test_unique_transaction(self):
1457 self.ldb.add({"dn": "x=x,dc=samba,dc=org",
1458 "objectUUID": b"0123456789abcde1",
1460 self.ldb.add({"dn": "x=y,dc=samba,dc=org",
1461 "objectUUID": b"0123456789abcde2",
1463 self.ldb.add({"dn": "x=z,dc=samba,dc=org",
1464 "objectUUID": b"0123456789abcde3",
1467 res = self.ldb.search(expression="(y=1)",
1468 base="dc=samba,dc=org")
1469 self.assertEquals(len(res), 3)
1471 self.ldb.transaction_start()
1473 # Now set this to unique index, but forget to check the result
1475 self.ldb.add({"dn": "@ATTRIBUTES",
1476 "y": "UNIQUE_INDEX"})
1477 except ldb.LdbError:
1481 self.ldb.transaction_commit()
1484 except ldb.LdbError as err:
1486 self.assertEqual(enum, ldb.ERR_OPERATIONS_ERROR)
1488 # We must still have a working index
1489 res = self.ldb.search(expression="(y=1)",
1490 base="dc=samba,dc=org")
1492 self.assertEquals(len(res), 3)
1494 def test_casefold(self):
1495 self.ldb.add({"dn": "x=x,dc=samba,dc=org",
1496 "objectUUID": b"0123456789abcde1",
1498 self.ldb.add({"dn": "x=y,dc=samba,dc=org",
1499 "objectUUID": b"0123456789abcde2",
1501 self.ldb.add({"dn": "x=z,dc=samba,dc=org",
1502 "objectUUID": b"0123456789abcde3",
1505 res = self.ldb.search(expression="(y=a)",
1506 base="dc=samba,dc=org")
1507 self.assertEquals(len(res), 2)
1509 self.ldb.add({"dn": "@ATTRIBUTES",
1510 "y": "CASE_INSENSITIVE"})
1512 # We must still have a working index
1513 res = self.ldb.search(expression="(y=a)",
1514 base="dc=samba,dc=org")
1516 if hasattr(self, 'IDXGUID'):
1517 self.assertEquals(len(res), 3)
1519 # We should not return this entry twice, but sadly
1520 # we have not yet fixed
1521 # https://bugzilla.samba.org/show_bug.cgi?id=13361
1522 self.assertEquals(len(res), 4)
1524 def test_casefold_transaction(self):
1525 self.ldb.add({"dn": "x=x,dc=samba,dc=org",
1526 "objectUUID": b"0123456789abcde1",
1528 self.ldb.add({"dn": "x=y,dc=samba,dc=org",
1529 "objectUUID": b"0123456789abcde2",
1531 self.ldb.add({"dn": "x=z,dc=samba,dc=org",
1532 "objectUUID": b"0123456789abcde3",
1535 res = self.ldb.search(expression="(y=a)",
1536 base="dc=samba,dc=org")
1537 self.assertEquals(len(res), 2)
1539 self.ldb.transaction_start()
1541 self.ldb.add({"dn": "@ATTRIBUTES",
1542 "y": "CASE_INSENSITIVE"})
1544 self.ldb.transaction_commit()
1546 # We must still have a working index
1547 res = self.ldb.search(expression="(y=a)",
1548 base="dc=samba,dc=org")
1550 if hasattr(self, 'IDXGUID'):
1551 self.assertEquals(len(res), 3)
1553 # We should not return this entry twice, but sadly
1554 # we have not yet fixed
1555 # https://bugzilla.samba.org/show_bug.cgi?id=13361
1556 self.assertEquals(len(res), 4)
1560 super(BadIndexTests, self).tearDown()
1563 class GUIDBadIndexTests(BadIndexTests):
1564 """Test Bad index things with GUID index mode"""
1568 super(GUIDBadIndexTests, self).setUp()
1571 class DnTests(TestCase):
1574 super(DnTests, self).setUp()
1575 self.ldb = ldb.Ldb()
1578 super(DnTests, self).tearDown()
1581 def test_set_dn_invalid(self):
1585 self.assertRaises(TypeError, assign)
1588 x = ldb.Dn(self.ldb, "dc=foo11,bar=bloe")
1589 y = ldb.Dn(self.ldb, "dc=foo11,bar=bloe")
1590 self.assertEqual(x, y)
1591 y = ldb.Dn(self.ldb, "dc=foo11,bar=blie")
1592 self.assertNotEqual(x, y)
1595 x = ldb.Dn(self.ldb, "dc=foo12,bar=bloe")
1596 self.assertEqual(x.__str__(), "dc=foo12,bar=bloe")
1598 def test_repr(self):
1599 x = ldb.Dn(self.ldb, "dc=foo13,bla=blie")
1600 self.assertEqual(x.__repr__(), "Dn('dc=foo13,bla=blie')")
1602 def test_get_casefold(self):
1603 x = ldb.Dn(self.ldb, "dc=foo14,bar=bloe")
1604 self.assertEqual(x.get_casefold(), "DC=FOO14,BAR=bloe")
1606 def test_validate(self):
1607 x = ldb.Dn(self.ldb, "dc=foo15,bar=bloe")
1608 self.assertTrue(x.validate())
1610 def test_parent(self):
1611 x = ldb.Dn(self.ldb, "dc=foo16,bar=bloe")
1612 self.assertEqual("bar=bloe", x.parent().__str__())
1614 def test_parent_nonexistent(self):
1615 x = ldb.Dn(self.ldb, "@BLA")
1616 self.assertEqual(None, x.parent())
1618 def test_is_valid(self):
1619 x = ldb.Dn(self.ldb, "dc=foo18,dc=bloe")
1620 self.assertTrue(x.is_valid())
1621 x = ldb.Dn(self.ldb, "")
1622 self.assertTrue(x.is_valid())
1624 def test_is_special(self):
1625 x = ldb.Dn(self.ldb, "dc=foo19,bar=bloe")
1626 self.assertFalse(x.is_special())
1627 x = ldb.Dn(self.ldb, "@FOOBAR")
1628 self.assertTrue(x.is_special())
1630 def test_check_special(self):
1631 x = ldb.Dn(self.ldb, "dc=foo20,bar=bloe")
1632 self.assertFalse(x.check_special("FOOBAR"))
1633 x = ldb.Dn(self.ldb, "@FOOBAR")
1634 self.assertTrue(x.check_special("@FOOBAR"))
1637 x = ldb.Dn(self.ldb, "dc=foo21,bar=bloe")
1638 self.assertEqual(2, len(x))
1639 x = ldb.Dn(self.ldb, "dc=foo21")
1640 self.assertEqual(1, len(x))
1642 def test_add_child(self):
1643 x = ldb.Dn(self.ldb, "dc=foo22,bar=bloe")
1644 self.assertTrue(x.add_child(ldb.Dn(self.ldb, "bla=bloe")))
1645 self.assertEqual("bla=bloe,dc=foo22,bar=bloe", x.__str__())
1647 def test_add_base(self):
1648 x = ldb.Dn(self.ldb, "dc=foo23,bar=bloe")
1649 base = ldb.Dn(self.ldb, "bla=bloe")
1650 self.assertTrue(x.add_base(base))
1651 self.assertEqual("dc=foo23,bar=bloe,bla=bloe", x.__str__())
1653 def test_add_child_str(self):
1654 x = ldb.Dn(self.ldb, "dc=foo22,bar=bloe")
1655 self.assertTrue(x.add_child("bla=bloe"))
1656 self.assertEqual("bla=bloe,dc=foo22,bar=bloe", x.__str__())
1658 def test_add_base_str(self):
1659 x = ldb.Dn(self.ldb, "dc=foo23,bar=bloe")
1661 self.assertTrue(x.add_base(base))
1662 self.assertEqual("dc=foo23,bar=bloe,bla=bloe", x.__str__())
1665 x = ldb.Dn(self.ldb, "dc=foo24")
1666 y = ldb.Dn(self.ldb, "bar=bla")
1667 self.assertEqual("dc=foo24,bar=bla", str(x + y))
1669 def test_remove_base_components(self):
1670 x = ldb.Dn(self.ldb, "dc=foo24,dc=samba,dc=org")
1671 x.remove_base_components(len(x)-1)
1672 self.assertEqual("dc=foo24", str(x))
1674 def test_parse_ldif(self):
1675 msgs = self.ldb.parse_ldif("dn: foo=bar\n")
1677 self.assertEqual("foo=bar", str(msg[1].dn))
1678 self.assertTrue(isinstance(msg[1], ldb.Message))
1679 ldif = self.ldb.write_ldif(msg[1], ldb.CHANGETYPE_NONE)
1680 self.assertEqual("dn: foo=bar\n\n", ldif)
1682 def test_parse_ldif_more(self):
1683 msgs = self.ldb.parse_ldif("dn: foo=bar\n\n\ndn: bar=bar")
1685 self.assertEqual("foo=bar", str(msg[1].dn))
1687 self.assertEqual("bar=bar", str(msg[1].dn))
1689 def test_canonical_string(self):
1690 x = ldb.Dn(self.ldb, "dc=foo25,bar=bloe")
1691 self.assertEqual("/bloe/foo25", x.canonical_str())
1693 def test_canonical_ex_string(self):
1694 x = ldb.Dn(self.ldb, "dc=foo26,bar=bloe")
1695 self.assertEqual("/bloe\nfoo26", x.canonical_ex_str())
1697 def test_ldb_is_child_of(self):
1698 """Testing ldb_dn_compare_dn"""
1699 dn1 = ldb.Dn(self.ldb, "dc=base")
1700 dn2 = ldb.Dn(self.ldb, "cn=foo,dc=base")
1701 dn3 = ldb.Dn(self.ldb, "cn=bar,dc=base")
1702 dn4 = ldb.Dn(self.ldb, "cn=baz,cn=bar,dc=base")
1704 self.assertTrue(dn1.is_child_of(dn1))
1705 self.assertTrue(dn2.is_child_of(dn1))
1706 self.assertTrue(dn4.is_child_of(dn1))
1707 self.assertTrue(dn4.is_child_of(dn3))
1708 self.assertTrue(dn4.is_child_of(dn4))
1709 self.assertFalse(dn3.is_child_of(dn2))
1710 self.assertFalse(dn1.is_child_of(dn4))
1712 def test_ldb_is_child_of_str(self):
1713 """Testing ldb_dn_compare_dn"""
1715 dn2_str = "cn=foo,dc=base"
1716 dn3_str = "cn=bar,dc=base"
1717 dn4_str = "cn=baz,cn=bar,dc=base"
1719 dn1 = ldb.Dn(self.ldb, dn1_str)
1720 dn2 = ldb.Dn(self.ldb, dn2_str)
1721 dn3 = ldb.Dn(self.ldb, dn3_str)
1722 dn4 = ldb.Dn(self.ldb, dn4_str)
1724 self.assertTrue(dn1.is_child_of(dn1_str))
1725 self.assertTrue(dn2.is_child_of(dn1_str))
1726 self.assertTrue(dn4.is_child_of(dn1_str))
1727 self.assertTrue(dn4.is_child_of(dn3_str))
1728 self.assertTrue(dn4.is_child_of(dn4_str))
1729 self.assertFalse(dn3.is_child_of(dn2_str))
1730 self.assertFalse(dn1.is_child_of(dn4_str))
1732 def test_get_component_name(self):
1733 dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
1734 self.assertEqual(dn.get_component_name(0), 'cn')
1735 self.assertEqual(dn.get_component_name(1), 'dc')
1736 self.assertEqual(dn.get_component_name(2), None)
1737 self.assertEqual(dn.get_component_name(-1), None)
1739 def test_get_component_value(self):
1740 dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
1741 self.assertEqual(dn.get_component_value(0), 'foo')
1742 self.assertEqual(dn.get_component_value(1), 'base')
1743 self.assertEqual(dn.get_component_name(2), None)
1744 self.assertEqual(dn.get_component_name(-1), None)
1746 def test_set_component(self):
1747 dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
1748 dn.set_component(0, 'cn', 'bar')
1749 self.assertEqual(str(dn), "cn=bar,dc=base")
1750 dn.set_component(1, 'o', 'asep')
1751 self.assertEqual(str(dn), "cn=bar,o=asep")
1752 self.assertRaises(TypeError, dn.set_component, 2, 'dc', 'base')
1753 self.assertEqual(str(dn), "cn=bar,o=asep")
1754 dn.set_component(1, 'o', 'a,b+c')
1755 self.assertEqual(str(dn), r"cn=bar,o=a\,b\+c")
1757 def test_set_component_bytes(self):
1758 dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
1759 dn.set_component(0, 'cn', b'bar')
1760 self.assertEqual(str(dn), "cn=bar,dc=base")
1761 dn.set_component(1, 'o', b'asep')
1762 self.assertEqual(str(dn), "cn=bar,o=asep")
1764 def test_set_component_none(self):
1765 dn = ldb.Dn(self.ldb, "cn=foo,cn=bar,dc=base")
1766 self.assertRaises(TypeError, dn.set_component, 1, 'cn', None)
1768 def test_get_extended_component_null(self):
1769 dn = ldb.Dn(self.ldb, "cn=foo,cn=bar,dc=base")
1770 self.assertEqual(dn.get_extended_component("TEST"), None)
1772 def test_get_extended_component(self):
1773 self.ldb._register_test_extensions()
1774 dn = ldb.Dn(self.ldb, "<TEST=foo>;cn=bar,dc=base")
1775 self.assertEqual(dn.get_extended_component("TEST"), b"foo")
1777 def test_set_extended_component(self):
1778 self.ldb._register_test_extensions()
1779 dn = ldb.Dn(self.ldb, "dc=base")
1780 dn.set_extended_component("TEST", "foo")
1781 self.assertEqual(dn.get_extended_component("TEST"), b"foo")
1782 dn.set_extended_component("TEST", b"bar")
1783 self.assertEqual(dn.get_extended_component("TEST"), b"bar")
1785 def test_extended_str(self):
1786 self.ldb._register_test_extensions()
1787 dn = ldb.Dn(self.ldb, "<TEST=foo>;cn=bar,dc=base")
1788 self.assertEqual(dn.extended_str(), "<TEST=foo>;cn=bar,dc=base")
1790 def test_get_rdn_name(self):
1791 dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
1792 self.assertEqual(dn.get_rdn_name(), 'cn')
1794 def test_get_rdn_value(self):
1795 dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
1796 self.assertEqual(dn.get_rdn_value(), 'foo')
1798 def test_get_casefold(self):
1799 dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
1800 self.assertEqual(dn.get_casefold(), 'CN=FOO,DC=BASE')
1802 def test_get_linearized(self):
1803 dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
1804 self.assertEqual(dn.get_linearized(), 'cn=foo,dc=base')
1806 def test_is_null(self):
1807 dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
1808 self.assertFalse(dn.is_null())
1810 dn = ldb.Dn(self.ldb, '')
1811 self.assertTrue(dn.is_null())
1813 class LdbMsgTests(TestCase):
1816 super(LdbMsgTests, self).setUp()
1817 self.msg = ldb.Message()
1819 def test_init_dn(self):
1820 self.msg = ldb.Message(ldb.Dn(ldb.Ldb(), "dc=foo27"))
1821 self.assertEqual("dc=foo27", str(self.msg.dn))
1823 def test_iter_items(self):
1824 self.assertEqual(0, len(self.msg.items()))
1825 self.msg.dn = ldb.Dn(ldb.Ldb(), "dc=foo28")
1826 self.assertEqual(1, len(self.msg.items()))
1828 def test_repr(self):
1829 self.msg.dn = ldb.Dn(ldb.Ldb(), "dc=foo29")
1830 self.msg["dc"] = b"foo"
1832 self.assertIn(repr(self.msg), [
1833 "Message({'dn': Dn('dc=foo29'), 'dc': MessageElement([b'foo'])})",
1834 "Message({'dc': MessageElement([b'foo']), 'dn': Dn('dc=foo29')})",
1836 self.assertIn(repr(self.msg.text), [
1837 "Message({'dn': Dn('dc=foo29'), 'dc': MessageElement([b'foo'])}).text",
1838 "Message({'dc': MessageElement([b'foo']), 'dn': Dn('dc=foo29')}).text",
1843 "Message({'dn': Dn('dc=foo29'), 'dc': MessageElement(['foo'])})")
1845 repr(self.msg.text),
1846 "Message({'dn': Dn('dc=foo29'), 'dc': MessageElement(['foo'])}).text")
1849 self.assertEqual(0, len(self.msg))
1851 def test_notpresent(self):
1852 self.assertRaises(KeyError, lambda: self.msg["foo"])
1858 self.msg.add(ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla"))
1860 def test_add_text(self):
1861 self.msg.add(ldb.MessageElement(["456"], ldb.FLAG_MOD_ADD, "bla"))
1863 def test_elements_empty(self):
1864 self.assertEqual([], self.msg.elements())
1866 def test_elements(self):
1867 el = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
1869 self.assertEqual([el], self.msg.elements())
1870 self.assertEqual([el.text], self.msg.text.elements())
1872 def test_add_value(self):
1873 self.assertEqual(0, len(self.msg))
1874 self.msg["foo"] = [b"foo"]
1875 self.assertEqual(1, len(self.msg))
1877 def test_add_value_text(self):
1878 self.assertEqual(0, len(self.msg))
1879 self.msg["foo"] = ["foo"]
1880 self.assertEqual(1, len(self.msg))
1882 def test_add_value_multiple(self):
1883 self.assertEqual(0, len(self.msg))
1884 self.msg["foo"] = [b"foo", b"bla"]
1885 self.assertEqual(1, len(self.msg))
1886 self.assertEqual([b"foo", b"bla"], list(self.msg["foo"]))
1888 def test_add_value_multiple_text(self):
1889 self.assertEqual(0, len(self.msg))
1890 self.msg["foo"] = ["foo", "bla"]
1891 self.assertEqual(1, len(self.msg))
1892 self.assertEqual(["foo", "bla"], list(self.msg.text["foo"]))
1894 def test_set_value(self):
1895 self.msg["foo"] = [b"fool"]
1896 self.assertEqual([b"fool"], list(self.msg["foo"]))
1897 self.msg["foo"] = [b"bar"]
1898 self.assertEqual([b"bar"], list(self.msg["foo"]))
1900 def test_set_value_text(self):
1901 self.msg["foo"] = ["fool"]
1902 self.assertEqual(["fool"], list(self.msg.text["foo"]))
1903 self.msg["foo"] = ["bar"]
1904 self.assertEqual(["bar"], list(self.msg.text["foo"]))
1906 def test_keys(self):
1907 self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
1908 self.msg["foo"] = [b"bla"]
1909 self.msg["bar"] = [b"bla"]
1910 self.assertEqual(["dn", "foo", "bar"], self.msg.keys())
1912 def test_keys_text(self):
1913 self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
1914 self.msg["foo"] = ["bla"]
1915 self.msg["bar"] = ["bla"]
1916 self.assertEqual(["dn", "foo", "bar"], self.msg.text.keys())
1919 self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
1920 self.assertEqual("@BASEINFO", self.msg.dn.__str__())
1922 def test_get_dn(self):
1923 self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
1924 self.assertEqual("@BASEINFO", self.msg.get("dn").__str__())
1926 def test_dn_text(self):
1927 self.msg.text.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
1928 self.assertEqual("@BASEINFO", str(self.msg.dn))
1929 self.assertEqual("@BASEINFO", str(self.msg.text.dn))
1931 def test_get_dn_text(self):
1932 self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
1933 self.assertEqual("@BASEINFO", str(self.msg.get("dn")))
1934 self.assertEqual("@BASEINFO", str(self.msg.text.get("dn")))
1936 def test_get_invalid(self):
1937 self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
1938 self.assertRaises(TypeError, self.msg.get, 42)
1940 def test_get_other(self):
1941 self.msg["foo"] = [b"bar"]
1942 self.assertEqual(b"bar", self.msg.get("foo")[0])
1943 self.assertEqual(b"bar", self.msg.get("foo", idx=0))
1944 self.assertEqual(None, self.msg.get("foo", idx=1))
1945 self.assertEqual("", self.msg.get("foo", default='', idx=1))
1947 def test_get_other_text(self):
1948 self.msg["foo"] = ["bar"]
1949 self.assertEqual(["bar"], list(self.msg.text.get("foo")))
1950 self.assertEqual("bar", self.msg.text.get("foo")[0])
1951 self.assertEqual("bar", self.msg.text.get("foo", idx=0))
1952 self.assertEqual(None, self.msg.get("foo", idx=1))
1953 self.assertEqual("", self.msg.get("foo", default='', idx=1))
1955 def test_get_default(self):
1956 self.assertEqual(None, self.msg.get("tatayoyo", idx=0))
1957 self.assertEqual("anniecordie", self.msg.get("tatayoyo", "anniecordie"))
1959 def test_get_default_text(self):
1960 self.assertEqual(None, self.msg.text.get("tatayoyo", idx=0))
1961 self.assertEqual("anniecordie", self.msg.text.get("tatayoyo", "anniecordie"))
1963 def test_get_unknown(self):
1964 self.assertEqual(None, self.msg.get("lalalala"))
1966 def test_get_unknown_text(self):
1967 self.assertEqual(None, self.msg.text.get("lalalala"))
1969 def test_msg_diff(self):
1971 msgs = l.parse_ldif("dn: foo=bar\nfoo: bar\nbaz: do\n\ndn: foo=bar\nfoo: bar\nbaz: dont\n")
1972 msg1 = next(msgs)[1]
1973 msg2 = next(msgs)[1]
1974 msgdiff = l.msg_diff(msg1, msg2)
1975 self.assertEqual("foo=bar", msgdiff.get("dn").__str__())
1976 self.assertRaises(KeyError, lambda: msgdiff["foo"])
1977 self.assertEqual(1, len(msgdiff))
1979 def test_equal_empty(self):
1980 msg1 = ldb.Message()
1981 msg2 = ldb.Message()
1982 self.assertEqual(msg1, msg2)
1984 def test_equal_simplel(self):
1986 msg1 = ldb.Message()
1987 msg1.dn = ldb.Dn(db, "foo=bar")
1988 msg2 = ldb.Message()
1989 msg2.dn = ldb.Dn(db, "foo=bar")
1990 self.assertEqual(msg1, msg2)
1991 msg1['foo'] = b'bar'
1992 msg2['foo'] = b'bar'
1993 self.assertEqual(msg1, msg2)
1994 msg2['foo'] = b'blie'
1995 self.assertNotEqual(msg1, msg2)
1996 msg2['foo'] = b'blie'
1998 def test_from_dict(self):
1999 rec = {"dn": "dc=fromdict",
2000 "a1": [b"a1-val1", b"a1-val1"]}
2002 # check different types of input Flags
2003 for flags in [ldb.FLAG_MOD_ADD, ldb.FLAG_MOD_REPLACE, ldb.FLAG_MOD_DELETE]:
2004 m = ldb.Message.from_dict(l, rec, flags)
2005 self.assertEqual(rec["a1"], list(m["a1"]))
2006 self.assertEqual(flags, m["a1"].flags())
2007 # check input params
2008 self.assertRaises(TypeError, ldb.Message.from_dict, dict(), rec, ldb.FLAG_MOD_REPLACE)
2009 self.assertRaises(TypeError, ldb.Message.from_dict, l, list(), ldb.FLAG_MOD_REPLACE)
2010 self.assertRaises(ValueError, ldb.Message.from_dict, l, rec, 0)
2011 # Message.from_dict expects dictionary with 'dn'
2012 err_rec = {"a1": [b"a1-val1", b"a1-val1"]}
2013 self.assertRaises(TypeError, ldb.Message.from_dict, l, err_rec, ldb.FLAG_MOD_REPLACE)
2015 def test_from_dict_text(self):
2016 rec = {"dn": "dc=fromdict",
2017 "a1": ["a1-val1", "a1-val1"]}
2019 # check different types of input Flags
2020 for flags in [ldb.FLAG_MOD_ADD, ldb.FLAG_MOD_REPLACE, ldb.FLAG_MOD_DELETE]:
2021 m = ldb.Message.from_dict(l, rec, flags)
2022 self.assertEqual(rec["a1"], list(m.text["a1"]))
2023 self.assertEqual(flags, m.text["a1"].flags())
2024 # check input params
2025 self.assertRaises(TypeError, ldb.Message.from_dict, dict(), rec, ldb.FLAG_MOD_REPLACE)
2026 self.assertRaises(TypeError, ldb.Message.from_dict, l, list(), ldb.FLAG_MOD_REPLACE)
2027 self.assertRaises(ValueError, ldb.Message.from_dict, l, rec, 0)
2028 # Message.from_dict expects dictionary with 'dn'
2029 err_rec = {"a1": ["a1-val1", "a1-val1"]}
2030 self.assertRaises(TypeError, ldb.Message.from_dict, l, err_rec, ldb.FLAG_MOD_REPLACE)
2032 def test_copy_add_message_element(self):
2034 m["1"] = ldb.MessageElement([b"val 111"], ldb.FLAG_MOD_ADD, "1")
2035 m["2"] = ldb.MessageElement([b"val 222"], ldb.FLAG_MOD_ADD, "2")
2039 self.assertEqual(mto["1"], m["1"])
2040 self.assertEqual(mto["2"], m["2"])
2044 self.assertEqual(mto["1"], m["1"])
2045 self.assertEqual(mto["2"], m["2"])
2047 def test_copy_add_message_element_text(self):
2049 m["1"] = ldb.MessageElement(["val 111"], ldb.FLAG_MOD_ADD, "1")
2050 m["2"] = ldb.MessageElement(["val 222"], ldb.FLAG_MOD_ADD, "2")
2054 self.assertEqual(mto["1"], m.text["1"])
2055 self.assertEqual(mto["2"], m.text["2"])
2059 self.assertEqual(mto.text["1"], m.text["1"])
2060 self.assertEqual(mto.text["2"], m.text["2"])
2061 self.assertEqual(mto["1"], m["1"])
2062 self.assertEqual(mto["2"], m["2"])
2065 class MessageElementTests(TestCase):
2067 def test_cmp_element(self):
2068 x = ldb.MessageElement([b"foo"])
2069 y = ldb.MessageElement([b"foo"])
2070 z = ldb.MessageElement([b"bzr"])
2071 self.assertEqual(x, y)
2072 self.assertNotEqual(x, z)
2074 def test_cmp_element_text(self):
2075 x = ldb.MessageElement([b"foo"])
2076 y = ldb.MessageElement(["foo"])
2077 self.assertEqual(x, y)
2079 def test_create_iterable(self):
2080 x = ldb.MessageElement([b"foo"])
2081 self.assertEqual([b"foo"], list(x))
2082 self.assertEqual(["foo"], list(x.text))
2084 def test_repr(self):
2085 x = ldb.MessageElement([b"foo"])
2087 self.assertEqual("MessageElement([b'foo'])", repr(x))
2088 self.assertEqual("MessageElement([b'foo']).text", repr(x.text))
2090 self.assertEqual("MessageElement(['foo'])", repr(x))
2091 self.assertEqual("MessageElement(['foo']).text", repr(x.text))
2092 x = ldb.MessageElement([b"foo", b"bla"])
2093 self.assertEqual(2, len(x))
2095 self.assertEqual("MessageElement([b'foo',b'bla'])", repr(x))
2096 self.assertEqual("MessageElement([b'foo',b'bla']).text", repr(x.text))
2098 self.assertEqual("MessageElement(['foo','bla'])", repr(x))
2099 self.assertEqual("MessageElement(['foo','bla']).text", repr(x.text))
2101 def test_get_item(self):
2102 x = ldb.MessageElement([b"foo", b"bar"])
2103 self.assertEqual(b"foo", x[0])
2104 self.assertEqual(b"bar", x[1])
2105 self.assertEqual(b"bar", x[-1])
2106 self.assertRaises(IndexError, lambda: x[45])
2108 def test_get_item_text(self):
2109 x = ldb.MessageElement(["foo", "bar"])
2110 self.assertEqual("foo", x.text[0])
2111 self.assertEqual("bar", x.text[1])
2112 self.assertEqual("bar", x.text[-1])
2113 self.assertRaises(IndexError, lambda: x[45])
2116 x = ldb.MessageElement([b"foo", b"bar"])
2117 self.assertEqual(2, len(x))
2120 x = ldb.MessageElement([b"foo", b"bar"])
2121 y = ldb.MessageElement([b"foo", b"bar"])
2122 self.assertEqual(y, x)
2123 x = ldb.MessageElement([b"foo"])
2124 self.assertNotEqual(y, x)
2125 y = ldb.MessageElement([b"foo"])
2126 self.assertEqual(y, x)
2128 def test_extended(self):
2129 el = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
2131 self.assertEqual("MessageElement([b'456'])", repr(el))
2132 self.assertEqual("MessageElement([b'456']).text", repr(el.text))
2134 self.assertEqual("MessageElement(['456'])", repr(el))
2135 self.assertEqual("MessageElement(['456']).text", repr(el.text))
2137 def test_bad_text(self):
2138 el = ldb.MessageElement(b'\xba\xdd')
2139 self.assertRaises(UnicodeDecodeError, el.text.__getitem__, 0)
2142 class ModuleTests(TestCase):
2145 super(ModuleTests, self).setUp()
2146 self.testdir = tempdir()
2147 self.filename = os.path.join(self.testdir, "test.ldb")
2148 self.ldb = ldb.Ldb(self.filename)
2151 shutil.rmtree(self.testdir)
2152 super(ModuleTests, self).setUp()
2154 def test_register_module(self):
2155 class ExampleModule:
2157 ldb.register_module(ExampleModule)
2159 def test_use_module(self):
2161 class ExampleModule:
2164 def __init__(self, ldb, next):
2168 def search(self, *args, **kwargs):
2169 return self.next.search(*args, **kwargs)
2171 def request(self, *args, **kwargs):
2174 ldb.register_module(ExampleModule)
2175 l = ldb.Ldb(self.filename)
2176 l.add({"dn": "@MODULES", "@LIST": "bla"})
2177 self.assertEqual([], ops)
2178 l = ldb.Ldb(self.filename)
2179 self.assertEqual(["init"], ops)
2181 class LdbResultTests(LdbBaseTest):
2184 super(LdbResultTests, self).setUp()
2185 self.testdir = tempdir()
2186 self.filename = os.path.join(self.testdir, "test.ldb")
2187 self.l = ldb.Ldb(self.url(), flags=self.flags())
2188 self.l.add({"dn": "DC=SAMBA,DC=ORG", "name": b"samba.org"})
2189 self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG", "name": b"Admins"})
2190 self.l.add({"dn": "OU=USERS,DC=SAMBA,DC=ORG", "name": b"Users"})
2191 self.l.add({"dn": "OU=OU1,DC=SAMBA,DC=ORG", "name": b"OU #1"})
2192 self.l.add({"dn": "OU=OU2,DC=SAMBA,DC=ORG", "name": b"OU #2"})
2193 self.l.add({"dn": "OU=OU3,DC=SAMBA,DC=ORG", "name": b"OU #3"})
2194 self.l.add({"dn": "OU=OU4,DC=SAMBA,DC=ORG", "name": b"OU #4"})
2195 self.l.add({"dn": "OU=OU5,DC=SAMBA,DC=ORG", "name": b"OU #5"})
2196 self.l.add({"dn": "OU=OU6,DC=SAMBA,DC=ORG", "name": b"OU #6"})
2197 self.l.add({"dn": "OU=OU7,DC=SAMBA,DC=ORG", "name": b"OU #7"})
2198 self.l.add({"dn": "OU=OU8,DC=SAMBA,DC=ORG", "name": b"OU #8"})
2199 self.l.add({"dn": "OU=OU9,DC=SAMBA,DC=ORG", "name": b"OU #9"})
2200 self.l.add({"dn": "OU=OU10,DC=SAMBA,DC=ORG", "name": b"OU #10"})
2203 shutil.rmtree(self.testdir)
2204 super(LdbResultTests, self).tearDown()
2205 # Ensure the LDB is closed now, so we close the FD
2208 def test_return_type(self):
2209 res = self.l.search()
2210 self.assertEqual(str(res), "<ldb result>")
2212 def test_get_msgs(self):
2213 res = self.l.search()
2216 def test_get_controls(self):
2217 res = self.l.search()
2220 def test_get_referals(self):
2221 res = self.l.search()
2224 def test_iter_msgs(self):
2226 for l in self.l.search().msgs:
2227 if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
2229 self.assertTrue(found)
2231 def test_iter_msgs_count(self):
2232 self.assertTrue(self.l.search().count > 0)
2233 # 13 objects has been added to the DC=SAMBA, DC=ORG
2234 self.assertEqual(self.l.search(base="DC=SAMBA,DC=ORG").count, 13)
2236 def test_iter_controls(self):
2237 res = self.l.search().controls
2240 def test_create_control(self):
2241 self.assertRaises(ValueError, ldb.Control, self.l, "tatayoyo:0")
2242 c = ldb.Control(self.l, "relax:1")
2243 self.assertEqual(c.critical, True)
2244 self.assertEqual(c.oid, "1.3.6.1.4.1.4203.666.5.12")
2246 def test_iter_refs(self):
2247 res = self.l.search().referals
2250 def test_search_sequence_msgs(self):
2252 res = self.l.search().msgs
2254 for i in range(0, len(res)):
2256 if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
2258 self.assertTrue(found)
2260 def test_search_as_iter(self):
2262 res = self.l.search()
2265 if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
2267 self.assertTrue(found)
2269 def test_search_iter(self):
2271 res = self.l.search_iterator()
2274 if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
2276 self.assertTrue(found)
2279 # Show that search results can't see into a transaction
2280 def test_search_against_trans(self):
2283 (r1, w1) = os.pipe()
2285 (r2, w2) = os.pipe()
2287 # For the first element, fork a child that will
2291 # In the child, re-open
2295 child_ldb = ldb.Ldb(self.url(), flags=self.flags())
2296 # start a transaction
2297 child_ldb.transaction_start()
2300 child_ldb.add({"dn": "OU=OU11,DC=SAMBA,DC=ORG",
2301 "name": b"samba.org"})
2303 os.write(w1, b"added")
2305 # Now wait for the search to be done
2310 child_ldb.transaction_commit()
2311 except LdbError as err:
2312 # We print this here to see what went wrong in the child
2316 os.write(w1, b"transaction")
2319 self.assertEqual(os.read(r1, 5), b"added")
2321 # This should not turn up until the transaction is concluded
2322 res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
2323 scope=ldb.SCOPE_BASE)
2324 self.assertEqual(len(res11), 0)
2326 os.write(w2, b"search")
2328 # Now wait for the transaction to be done. This should
2329 # deadlock, but the search doesn't hold a read lock for the
2330 # iterator lifetime currently.
2331 self.assertEqual(os.read(r1, 11), b"transaction")
2333 # This should now turn up, as the transaction is over
2334 res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
2335 scope=ldb.SCOPE_BASE)
2336 self.assertEqual(len(res11), 1)
2338 self.assertFalse(found11)
2340 (got_pid, status) = os.waitpid(pid, 0)
2341 self.assertEqual(got_pid, pid)
2344 def test_search_iter_against_trans(self):
2348 # We need to hold this iterator open to hold the all-record
2350 res = self.l.search_iterator()
2352 (r1, w1) = os.pipe()
2354 (r2, w2) = os.pipe()
2356 # For the first element, with the sequence open (which
2357 # means with ldb locks held), fork a child that will
2361 # In the child, re-open
2366 child_ldb = ldb.Ldb(self.url(), flags=self.flags())
2367 # start a transaction
2368 child_ldb.transaction_start()
2371 child_ldb.add({"dn": "OU=OU11,DC=SAMBA,DC=ORG",
2372 "name": b"samba.org"})
2374 os.write(w1, b"added")
2376 # Now wait for the search to be done
2381 child_ldb.transaction_commit()
2382 except LdbError as err:
2383 # We print this here to see what went wrong in the child
2387 os.write(w1, b"transaction")
2390 self.assertEqual(os.read(r1, 5), b"added")
2392 # This should not turn up until the transaction is concluded
2393 res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
2394 scope=ldb.SCOPE_BASE)
2395 self.assertEqual(len(res11), 0)
2397 os.write(w2, b"search")
2399 # allow the transaction to start
2402 # This should not turn up until the search finishes and
2403 # removed the read lock, but for ldb_tdb that happened as soon
2404 # as we called the first res.next()
2405 res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
2406 scope=ldb.SCOPE_BASE)
2407 self.assertEqual(len(res11), 0)
2409 # These results are all collected at the first next(res) call
2411 if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
2413 if str(l.dn) == "OU=OU11,DC=SAMBA,DC=ORG":
2416 # Now wait for the transaction to be done.
2417 self.assertEqual(os.read(r1, 11), b"transaction")
2419 # This should now turn up, as the transaction is over and all
2420 # read locks are gone
2421 res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
2422 scope=ldb.SCOPE_BASE)
2423 self.assertEqual(len(res11), 1)
2425 self.assertTrue(found)
2426 self.assertFalse(found11)
2428 (got_pid, status) = os.waitpid(pid, 0)
2429 self.assertEqual(got_pid, pid)
2432 class BadTypeTests(TestCase):
2433 def test_control(self):
2435 self.assertRaises(TypeError, ldb.Control, '<bad type>', 'relax:1')
2436 self.assertRaises(TypeError, ldb.Control, ldb, 1234)
2438 def test_modify(self):
2440 dn = ldb.Dn(l, 'a=b')
2442 self.assertRaises(TypeError, l.modify, '<bad type>')
2443 self.assertRaises(TypeError, l.modify, m, '<bad type>')
2447 dn = ldb.Dn(l, 'a=b')
2449 self.assertRaises(TypeError, l.add, '<bad type>')
2450 self.assertRaises(TypeError, l.add, m, '<bad type>')
2452 def test_delete(self):
2454 dn = ldb.Dn(l, 'a=b')
2455 self.assertRaises(TypeError, l.add, '<bad type>')
2456 self.assertRaises(TypeError, l.add, dn, '<bad type>')
2458 def test_rename(self):
2460 dn = ldb.Dn(l, 'a=b')
2461 self.assertRaises(TypeError, l.add, '<bad type>', dn)
2462 self.assertRaises(TypeError, l.add, dn, '<bad type>')
2463 self.assertRaises(TypeError, l.add, dn, dn, '<bad type>')
2465 def test_search(self):
2467 self.assertRaises(TypeError, l.search, base=1234)
2468 self.assertRaises(TypeError, l.search, scope='<bad type>')
2469 self.assertRaises(TypeError, l.search, expression=1234)
2470 self.assertRaises(TypeError, l.search, attrs='<bad type>')
2471 self.assertRaises(TypeError, l.search, controls='<bad type>')
2474 class VersionTests(TestCase):
2476 def test_version(self):
2477 self.assertTrue(isinstance(ldb.__version__, str))
2480 if __name__ == '__main__':
2482 unittest.TestProgram()