2 # Simple tests for the ldb python bindings.
3 # Copyright (C) 2007 Jelmer Vernooij <jelmer@samba.org>
6 from unittest import TestCase
13 PY3 = sys.version_info > (3, 0)
19 dir_prefix = os.path.join(os.environ["SELFTEST_PREFIX"], "tmp")
22 return tempfile.mkdtemp(dir=dir_prefix)
class NoContextTests(TestCase):
    """Tests for module-level ldb helpers that take no Ldb handle."""

    def test_valid_attr_name(self):
        """"foo" is accepted as an attribute name, "24foo" is rejected."""
        accepted = ldb.valid_attr_name("foo")
        self.assertTrue(accepted)
        rejected = ldb.valid_attr_name("24foo")
        self.assertFalse(rejected)

    def test_timestring(self):
        """Integer timestamps are rendered as generalized-time strings."""
        at_epoch = ldb.timestring(0)
        self.assertEqual("19700101000000.0Z", at_epoch)
        in_2007 = ldb.timestring(1195499412)
        self.assertEqual("20071119191012.0Z", in_2007)

    def test_string_to_time(self):
        """Generalized-time strings are parsed back to integer timestamps."""
        at_epoch = ldb.string_to_time("19700101000000.0Z")
        self.assertEqual(0, at_epoch)
        in_2007 = ldb.string_to_time("20071119191012.0Z")
        self.assertEqual(1195499412, in_2007)

    def test_binary_encode(self):
        """binary_encode round-trips through binary_decode for bytes and text."""
        raw = b'test\\x'
        from_bytes = ldb.binary_encode(raw)
        round_tripped = ldb.binary_decode(from_bytes)
        self.assertEqual(round_tripped, raw)

        # The text form of the same input must encode identically.
        from_text = ldb.binary_encode('test\\x')
        self.assertEqual(from_text, from_bytes)
47 class SimpleLdb(TestCase):
50 super(SimpleLdb, self).setUp()
51 self.testdir = tempdir()
52 self.filename = os.path.join(self.testdir, "test.ldb")
53 self.ldb = ldb.Ldb(self.filename)
56 shutil.rmtree(self.testdir)
57 super(SimpleLdb, self).tearDown()
def test_connect(self):
    """Open a connection on self.filename; passes when nothing is raised."""
    path = self.filename
    ldb.Ldb(path)
62 def test_connect_none(self):
65 def test_connect_later(self):
67 x.connect(self.filename)
71 self.assertTrue(repr(x).startswith("<ldb connection"))
73 def test_set_create_perms(self):
75 x.set_create_perms(0o600)
77 def test_modules_none(self):
79 self.assertEqual([], x.modules())
def test_modules_tdb(self):
    """The module list of a file-backed connection reprs as a single 'tdb' module."""
    conn = ldb.Ldb(self.filename)
    module_list = conn.modules()
    self.assertEqual("[<ldb module 'tdb'>]", repr(module_list))
85 def test_firstmodule_none(self):
87 self.assertEqual(x.firstmodule, None)
89 def test_firstmodule_tdb(self):
90 x = ldb.Ldb(self.filename)
92 self.assertEqual(repr(mod), "<ldb module 'tdb'>")
def test_search(self):
    """A search with default arguments returns no entries."""
    db = ldb.Ldb(self.filename)
    found = db.search()
    self.assertEqual(len(found), 0)
def test_search_controls(self):
    """A search carrying a paged_results control returns no entries."""
    db = ldb.Ldb(self.filename)
    found = db.search(controls=["paged_results:0:5"])
    self.assertEqual(len(found), 0)
def test_search_attrs(self):
    """A subtree search from an empty Dn with an attribute list returns nothing."""
    db = ldb.Ldb(self.filename)
    root = ldb.Dn(db, "")
    found = db.search(root, ldb.SCOPE_SUBTREE, "(dc=*)", ["dc"])
    self.assertEqual(len(found), 0)
def test_search_string_dn(self):
    """The search base may be given as a plain string instead of a Dn."""
    db = ldb.Ldb(self.filename)
    found = db.search("", ldb.SCOPE_SUBTREE, "(dc=*)", ["dc"])
    self.assertEqual(len(found), 0)
def test_search_attr_string(self):
    """Passing attrs as a bare str or bytes (not a list) raises TypeError."""
    db = ldb.Ldb(self.filename)
    for bad_attrs in ("dc", b"dc"):
        self.assertRaises(TypeError, db.search, attrs=bad_attrs)
def test_opaque(self):
    """A stored opaque value can be read back; an unknown name gives None."""
    db = ldb.Ldb(self.filename)
    db.set_opaque("my_opaque", db)
    stored = db.get_opaque("my_opaque")
    self.assertTrue(stored is not None)
    missing = db.get_opaque("unknown")
    self.assertEqual(None, missing)
121 def test_search_scope_base_empty_db(self):
122 l = ldb.Ldb(self.filename)
123 self.assertEqual(len(l.search(ldb.Dn(l, "dc=foo1"),
def test_search_scope_onelevel_empty_db(self):
    """A one-level search below dc=foo1 returns no entries."""
    db = ldb.Ldb(self.filename)
    base = ldb.Dn(db, "dc=foo1")
    found = db.search(base, ldb.SCOPE_ONELEVEL)
    self.assertEqual(len(found), 0)
def test_delete(self):
    """Deleting dc=foo2, which this test never adds, raises LdbError."""
    db = ldb.Ldb(self.filename)

    def delete_entry():
        # The Dn is built inside the callable, as in the lambda form.
        return db.delete(ldb.Dn(db, "dc=foo2"))

    self.assertRaises(ldb.LdbError, delete_entry)
135 def test_delete_w_unhandled_ctrl(self):
136 l = ldb.Ldb(self.filename)
138 m.dn = ldb.Dn(l, "dc=foo1")
141 self.assertRaises(ldb.LdbError, lambda: l.delete(m.dn, ["search_options:1:2"]))
144 def test_contains(self):
147 self.assertFalse(ldb.Dn(l, "dc=foo3") in l)
150 m.dn = ldb.Dn(l, "dc=foo3")
154 self.assertTrue(ldb.Dn(l, "dc=foo3") in l)
155 self.assertFalse(ldb.Dn(l, "dc=foo4") in l)
def test_get_config_basedn(self):
    """get_config_basedn() returns None on this connection."""
    db = ldb.Ldb(self.filename)
    basedn = db.get_config_basedn()
    self.assertEqual(None, basedn)
def test_get_root_basedn(self):
    """get_root_basedn() returns None on this connection."""
    db = ldb.Ldb(self.filename)
    basedn = db.get_root_basedn()
    self.assertEqual(None, basedn)
def test_get_schema_basedn(self):
    """get_schema_basedn() returns None on this connection."""
    db = ldb.Ldb(self.filename)
    basedn = db.get_schema_basedn()
    self.assertEqual(None, basedn)
def test_get_default_basedn(self):
    """get_default_basedn() returns None on this connection."""
    db = ldb.Ldb(self.filename)
    basedn = db.get_default_basedn()
    self.assertEqual(None, basedn)
176 l = ldb.Ldb(self.filename)
178 m.dn = ldb.Dn(l, "dc=foo4")
180 self.assertEqual(len(l.search()), 0)
183 self.assertEqual(len(l.search()), 1)
185 l.delete(ldb.Dn(l, "dc=foo4"))
187 def test_search_iterator(self):
188 l = ldb.Ldb(self.filename)
189 s = l.search_iterator()
195 except RuntimeError as re:
200 except RuntimeError as re:
205 except RuntimeError as re:
208 s = l.search_iterator()
211 self.assertTrue(isinstance(me, ldb.Message))
214 self.assertEqual(len(r), 0)
215 self.assertEqual(count, 0)
218 m1.dn = ldb.Dn(l, "dc=foo4")
222 s = l.search_iterator()
225 self.assertTrue(isinstance(me, ldb.Message))
229 self.assertEqual(len(r), 0)
230 self.assertEqual(len(msgs), 1)
231 self.assertEqual(msgs[0].dn, m1.dn)
234 m2.dn = ldb.Dn(l, "dc=foo5")
238 s = l.search_iterator()
241 self.assertTrue(isinstance(me, ldb.Message))
245 self.assertEqual(len(r), 0)
246 self.assertEqual(len(msgs), 2)
247 if msgs[0].dn == m1.dn:
248 self.assertEqual(msgs[0].dn, m1.dn)
249 self.assertEqual(msgs[1].dn, m2.dn)
251 self.assertEqual(msgs[0].dn, m2.dn)
252 self.assertEqual(msgs[1].dn, m1.dn)
254 s = l.search_iterator()
257 self.assertTrue(isinstance(me, ldb.Message))
264 except RuntimeError as re:
267 self.assertTrue(isinstance(me, ldb.Message))
275 self.assertEqual(len(r), 0)
276 self.assertEqual(len(msgs), 2)
277 if msgs[0].dn == m1.dn:
278 self.assertEqual(msgs[0].dn, m1.dn)
279 self.assertEqual(msgs[1].dn, m2.dn)
281 self.assertEqual(msgs[0].dn, m2.dn)
282 self.assertEqual(msgs[1].dn, m1.dn)
284 l.delete(ldb.Dn(l, "dc=foo4"))
285 l.delete(ldb.Dn(l, "dc=foo5"))
287 def test_add_text(self):
288 l = ldb.Ldb(self.filename)
290 m.dn = ldb.Dn(l, "dc=foo4")
292 self.assertEqual(len(l.search()), 0)
295 self.assertEqual(len(l.search()), 1)
297 l.delete(ldb.Dn(l, "dc=foo4"))
299 def test_add_w_unhandled_ctrl(self):
300 l = ldb.Ldb(self.filename)
302 m.dn = ldb.Dn(l, "dc=foo4")
304 self.assertEqual(len(l.search()), 0)
305 self.assertRaises(ldb.LdbError, lambda: l.add(m,["search_options:1:2"]))
307 def test_add_dict(self):
308 l = ldb.Ldb(self.filename)
309 m = {"dn": ldb.Dn(l, "dc=foo5"),
311 self.assertEqual(len(l.search()), 0)
314 self.assertEqual(len(l.search()), 1)
316 l.delete(ldb.Dn(l, "dc=foo5"))
318 def test_add_dict_text(self):
319 l = ldb.Ldb(self.filename)
320 m = {"dn": ldb.Dn(l, "dc=foo5"),
322 self.assertEqual(len(l.search()), 0)
325 self.assertEqual(len(l.search()), 1)
327 l.delete(ldb.Dn(l, "dc=foo5"))
329 def test_add_dict_string_dn(self):
330 l = ldb.Ldb(self.filename)
331 m = {"dn": "dc=foo6", "bla": b"bla"}
332 self.assertEqual(len(l.search()), 0)
335 self.assertEqual(len(l.search()), 1)
337 l.delete(ldb.Dn(l, "dc=foo6"))
339 def test_add_dict_bytes_dn(self):
340 l = ldb.Ldb(self.filename)
341 m = {"dn": b"dc=foo6", "bla": b"bla"}
342 self.assertEqual(len(l.search()), 0)
345 self.assertEqual(len(l.search()), 1)
347 l.delete(ldb.Dn(l, "dc=foo6"))
349 def test_rename(self):
350 l = ldb.Ldb(self.filename)
352 m.dn = ldb.Dn(l, "dc=foo7")
354 self.assertEqual(len(l.search()), 0)
357 l.rename(ldb.Dn(l, "dc=foo7"), ldb.Dn(l, "dc=bar"))
358 self.assertEqual(len(l.search()), 1)
360 l.delete(ldb.Dn(l, "dc=bar"))
362 def test_rename_string_dns(self):
363 l = ldb.Ldb(self.filename)
365 m.dn = ldb.Dn(l, "dc=foo8")
367 self.assertEqual(len(l.search()), 0)
369 self.assertEqual(len(l.search()), 1)
371 l.rename("dc=foo8", "dc=bar")
372 self.assertEqual(len(l.search()), 1)
374 l.delete(ldb.Dn(l, "dc=bar"))
376 def test_empty_dn(self):
377 l = ldb.Ldb(self.filename)
378 self.assertEqual(0, len(l.search()))
380 m.dn = ldb.Dn(l, "dc=empty")
383 self.assertEqual(1, len(rm))
384 self.assertEqual(set(["dn", "distinguishedName"]), set(rm[0].keys()))
387 self.assertEqual(1, len(rm))
388 self.assertEqual(set(["dn", "distinguishedName"]), set(rm[0].keys()))
389 rm = l.search(m.dn, attrs=["blah"])
390 self.assertEqual(1, len(rm))
391 self.assertEqual(0, len(rm[0]))
393 def test_modify_delete(self):
394 l = ldb.Ldb(self.filename)
396 m.dn = ldb.Dn(l, "dc=modifydelete")
399 rm = l.search(m.dn)[0]
400 self.assertEqual([b"1234"], list(rm["bla"]))
403 m.dn = ldb.Dn(l, "dc=modifydelete")
404 m["bla"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE, "bla")
405 self.assertEqual(ldb.FLAG_MOD_DELETE, m["bla"].flags())
408 self.assertEqual(1, len(rm))
409 self.assertEqual(set(["dn", "distinguishedName"]), set(rm[0].keys()))
410 rm = l.search(m.dn, attrs=["bla"])
411 self.assertEqual(1, len(rm))
412 self.assertEqual(0, len(rm[0]))
414 l.delete(ldb.Dn(l, "dc=modifydelete"))
416 def test_modify_delete_text(self):
417 l = ldb.Ldb(self.filename)
419 m.dn = ldb.Dn(l, "dc=modifydelete")
420 m.text["bla"] = ["1234"]
422 rm = l.search(m.dn)[0]
423 self.assertEqual(["1234"], list(rm.text["bla"]))
426 m.dn = ldb.Dn(l, "dc=modifydelete")
427 m["bla"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE, "bla")
428 self.assertEqual(ldb.FLAG_MOD_DELETE, m["bla"].flags())
431 self.assertEqual(1, len(rm))
432 self.assertEqual(set(["dn", "distinguishedName"]), set(rm[0].keys()))
433 rm = l.search(m.dn, attrs=["bla"])
434 self.assertEqual(1, len(rm))
435 self.assertEqual(0, len(rm[0]))
437 l.delete(ldb.Dn(l, "dc=modifydelete"))
439 def test_modify_add(self):
440 l = ldb.Ldb(self.filename)
442 m.dn = ldb.Dn(l, "dc=add")
447 m.dn = ldb.Dn(l, "dc=add")
448 m["bla"] = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
449 self.assertEqual(ldb.FLAG_MOD_ADD, m["bla"].flags())
451 rm = l.search(m.dn)[0]
452 self.assertEqual(2, len(rm))
453 self.assertEqual([b"1234", b"456"], list(rm["bla"]))
455 l.delete(ldb.Dn(l, "dc=add"))
457 def test_modify_add_text(self):
458 l = ldb.Ldb(self.filename)
460 m.dn = ldb.Dn(l, "dc=add")
461 m.text["bla"] = ["1234"]
465 m.dn = ldb.Dn(l, "dc=add")
466 m["bla"] = ldb.MessageElement(["456"], ldb.FLAG_MOD_ADD, "bla")
467 self.assertEqual(ldb.FLAG_MOD_ADD, m["bla"].flags())
469 rm = l.search(m.dn)[0]
470 self.assertEqual(2, len(rm))
471 self.assertEqual(["1234", "456"], list(rm.text["bla"]))
473 l.delete(ldb.Dn(l, "dc=add"))
475 def test_modify_replace(self):
476 l = ldb.Ldb(self.filename)
478 m.dn = ldb.Dn(l, "dc=modify2")
479 m["bla"] = [b"1234", b"456"]
483 m.dn = ldb.Dn(l, "dc=modify2")
484 m["bla"] = ldb.MessageElement([b"789"], ldb.FLAG_MOD_REPLACE, "bla")
485 self.assertEqual(ldb.FLAG_MOD_REPLACE, m["bla"].flags())
487 rm = l.search(m.dn)[0]
488 self.assertEqual(2, len(rm))
489 self.assertEqual([b"789"], list(rm["bla"]))
490 rm = l.search(m.dn, attrs=["bla"])[0]
491 self.assertEqual(1, len(rm))
493 l.delete(ldb.Dn(l, "dc=modify2"))
495 def test_modify_replace_text(self):
496 l = ldb.Ldb(self.filename)
498 m.dn = ldb.Dn(l, "dc=modify2")
499 m.text["bla"] = ["1234", "456"]
503 m.dn = ldb.Dn(l, "dc=modify2")
504 m["bla"] = ldb.MessageElement(["789"], ldb.FLAG_MOD_REPLACE, "bla")
505 self.assertEqual(ldb.FLAG_MOD_REPLACE, m["bla"].flags())
507 rm = l.search(m.dn)[0]
508 self.assertEqual(2, len(rm))
509 self.assertEqual(["789"], list(rm.text["bla"]))
510 rm = l.search(m.dn, attrs=["bla"])[0]
511 self.assertEqual(1, len(rm))
513 l.delete(ldb.Dn(l, "dc=modify2"))
515 def test_modify_flags_change(self):
516 l = ldb.Ldb(self.filename)
518 m.dn = ldb.Dn(l, "dc=add")
523 m.dn = ldb.Dn(l, "dc=add")
524 m["bla"] = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
525 self.assertEqual(ldb.FLAG_MOD_ADD, m["bla"].flags())
527 rm = l.search(m.dn)[0]
528 self.assertEqual(2, len(rm))
529 self.assertEqual([b"1234", b"456"], list(rm["bla"]))
531 # Now create another modify, but switch the flags before we do it
532 m["bla"] = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
533 m["bla"].set_flags(ldb.FLAG_MOD_DELETE)
535 rm = l.search(m.dn, attrs=["bla"])[0]
536 self.assertEqual(1, len(rm))
537 self.assertEqual([b"1234"], list(rm["bla"]))
539 l.delete(ldb.Dn(l, "dc=add"))
541 def test_modify_flags_change_text(self):
542 l = ldb.Ldb(self.filename)
544 m.dn = ldb.Dn(l, "dc=add")
545 m.text["bla"] = ["1234"]
549 m.dn = ldb.Dn(l, "dc=add")
550 m["bla"] = ldb.MessageElement(["456"], ldb.FLAG_MOD_ADD, "bla")
551 self.assertEqual(ldb.FLAG_MOD_ADD, m["bla"].flags())
553 rm = l.search(m.dn)[0]
554 self.assertEqual(2, len(rm))
555 self.assertEqual(["1234", "456"], list(rm.text["bla"]))
557 # Now create another modify, but switch the flags before we do it
558 m["bla"] = ldb.MessageElement(["456"], ldb.FLAG_MOD_ADD, "bla")
559 m["bla"].set_flags(ldb.FLAG_MOD_DELETE)
561 rm = l.search(m.dn, attrs=["bla"])[0]
562 self.assertEqual(1, len(rm))
563 self.assertEqual(["1234"], list(rm.text["bla"]))
565 l.delete(ldb.Dn(l, "dc=add"))
567 def test_transaction_commit(self):
568 l = ldb.Ldb(self.filename)
569 l.transaction_start()
570 m = ldb.Message(ldb.Dn(l, "dc=foo9"))
573 l.transaction_commit()
576 def test_transaction_cancel(self):
577 l = ldb.Ldb(self.filename)
578 l.transaction_start()
579 m = ldb.Message(ldb.Dn(l, "dc=foo10"))
582 l.transaction_cancel()
583 self.assertEqual(0, len(l.search(ldb.Dn(l, "dc=foo10"))))
585 def test_set_debug(self):
586 def my_report_fn(level, text):
588 l = ldb.Ldb(self.filename)
589 l.set_debug(my_report_fn)
591 def test_zero_byte_string(self):
592 """Testing we do not get trapped in the \0 byte in a property string."""
593 l = ldb.Ldb(self.filename)
596 "objectclass" : b"user",
597 "cN" : b"LDAPtestUSER",
598 "givenname" : b"ldap",
599 "displayname" : b"foo\0bar",
601 res = l.search(expression="(dn=dc=somedn)")
602 self.assertEqual(b"foo\0bar", res[0]["displayname"][0])
def test_no_crash_broken_expr(self):
    """A search using the filter "&(dc=*)(dn=*)" raises LdbError."""
    db = ldb.Ldb(self.filename)

    def broken_search():
        return db.search("", ldb.SCOPE_SUBTREE, "&(dc=*)(dn=*)", ["dc"])

    self.assertRaises(ldb.LdbError, broken_search)
608 class SearchTests(TestCase):
610 shutil.rmtree(self.testdir)
611 super(SearchTests, self).tearDown()
614 super(SearchTests, self).setUp()
615 self.testdir = tempdir()
616 self.filename = os.path.join(self.testdir, "search_test.ldb")
617 self.l = ldb.Ldb(self.filename, options=["modules:rdn_name"])
619 self.l.add({"dn": "@ATTRIBUTES",
620 "DC": "CASE_INSENSITIVE"})
622 # Note that we can't use the name objectGUID here, as we
623 # want to stay clear of the objectGUID handler in LDB and
624 # instead use just the 16 bytes raw, which we just keep
625 # to printable chars here for ease of handling.
627 self.l.add({"dn": "DC=SAMBA,DC=ORG",
628 "name": b"samba.org",
629 "objectUUID": b"0123456789abcddf"})
630 self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG",
633 "objectUUID": b"0123456789abcde1"})
634 self.l.add({"dn": "OU=USERS,DC=SAMBA,DC=ORG",
637 "objectUUID": b"0123456789abcde2"})
638 self.l.add({"dn": "OU=OU1,DC=SAMBA,DC=ORG",
641 "objectUUID": b"0123456789abcde3"})
642 self.l.add({"dn": "OU=OU2,DC=SAMBA,DC=ORG",
645 "objectUUID": b"0123456789abcde4"})
646 self.l.add({"dn": "OU=OU3,DC=SAMBA,DC=ORG",
649 "objectUUID": b"0123456789abcde5"})
650 self.l.add({"dn": "OU=OU4,DC=SAMBA,DC=ORG",
653 "objectUUID": b"0123456789abcde6"})
654 self.l.add({"dn": "OU=OU5,DC=SAMBA,DC=ORG",
657 "objectUUID": b"0123456789abcde7"})
658 self.l.add({"dn": "OU=OU6,DC=SAMBA,DC=ORG",
661 "objectUUID": b"0123456789abcde8"})
662 self.l.add({"dn": "OU=OU7,DC=SAMBA,DC=ORG",
665 "objectUUID": b"0123456789abcde9"})
666 self.l.add({"dn": "OU=OU8,DC=SAMBA,DC=ORG",
669 "objectUUID": b"0123456789abcde0"})
670 self.l.add({"dn": "OU=OU9,DC=SAMBA,DC=ORG",
673 "objectUUID": b"0123456789abcdea"})
674 self.l.add({"dn": "OU=OU10,DC=SAMBA,DC=ORG",
677 "objectUUID": b"0123456789abcdeb"})
678 self.l.add({"dn": "OU=OU11,DC=SAMBA,DC=ORG",
681 "objectUUID": b"0123456789abcdec"})
682 self.l.add({"dn": "OU=OU12,DC=SAMBA,DC=ORG",
685 "objectUUID": b"0123456789abcded"})
686 self.l.add({"dn": "OU=OU13,DC=SAMBA,DC=ORG",
689 "objectUUID": b"0123456789abcdee"})
690 self.l.add({"dn": "OU=OU14,DC=SAMBA,DC=ORG",
693 "objectUUID": b"0123456789abcd01"})
694 self.l.add({"dn": "OU=OU15,DC=SAMBA,DC=ORG",
697 "objectUUID": b"0123456789abcd02"})
698 self.l.add({"dn": "OU=OU16,DC=SAMBA,DC=ORG",
701 "objectUUID": b"0123456789abcd03"})
702 self.l.add({"dn": "OU=OU17,DC=SAMBA,DC=ORG",
705 "objectUUID": b"0123456789abcd04"})
706 self.l.add({"dn": "OU=OU18,DC=SAMBA,DC=ORG",
709 "objectUUID": b"0123456789abcd05"})
710 self.l.add({"dn": "OU=OU19,DC=SAMBA,DC=ORG",
713 "objectUUID": b"0123456789abcd06"})
714 self.l.add({"dn": "OU=OU20,DC=SAMBA,DC=ORG",
717 "objectUUID": b"0123456789abcd07"})
718 self.l.add({"dn": "OU=OU21,DC=SAMBA,DC=ORG",
721 "objectUUID": b"0123456789abcd08"})
722 self.l.add({"dn": "OU=OU22,DC=SAMBA,DC=ORG",
725 "objectUUID": b"0123456789abcd09"})
728 """Testing a search"""
730 res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
731 scope=ldb.SCOPE_BASE)
732 self.assertEqual(len(res11), 1)
def test_base_lower(self):
    """BASE search on OU11 with lower-case DC values in the base finds one entry."""
    base_dn = "OU=OU11,DC=samba,DC=org"
    hits = self.l.search(base=base_dn, scope=ldb.SCOPE_BASE)
    self.assertEqual(len(hits), 1)
def test_base_or(self):
    """BASE search on OU11 with an OR over ou=ou11 / ou=ou12 finds one entry."""
    query = dict(base="OU=OU11,DC=SAMBA,DC=ORG",
                 scope=ldb.SCOPE_BASE,
                 expression="(|(ou=ou11)(ou=ou12))")
    hits = self.l.search(**query)
    self.assertEqual(len(hits), 1)
def test_base_or2(self):
    """BASE search on OU11 with (|(x=y)(y=b)) finds one entry."""
    query = dict(base="OU=OU11,DC=SAMBA,DC=ORG",
                 scope=ldb.SCOPE_BASE,
                 expression="(|(x=y)(y=b))")
    hits = self.l.search(**query)
    self.assertEqual(len(hits), 1)
def test_base_and(self):
    """BASE search on OU11 with an AND of two different ou values finds nothing."""
    query = dict(base="OU=OU11,DC=SAMBA,DC=ORG",
                 scope=ldb.SCOPE_BASE,
                 expression="(&(ou=ou11)(ou=ou12))")
    hits = self.l.search(**query)
    self.assertEqual(len(hits), 0)
def test_base_and2(self):
    """BASE search on OU11 with (&(x=y)(y=a)) finds one entry."""
    query = dict(base="OU=OU11,DC=SAMBA,DC=ORG",
                 scope=ldb.SCOPE_BASE,
                 expression="(&(x=y)(y=a))")
    hits = self.l.search(**query)
    self.assertEqual(len(hits), 1)
def test_base_false(self):
    """BASE search on OU11 with an OR over ou=ou13 / ou=ou12 finds nothing."""
    query = dict(base="OU=OU11,DC=SAMBA,DC=ORG",
                 scope=ldb.SCOPE_BASE,
                 expression="(|(ou=ou13)(ou=ou12))")
    hits = self.l.search(**query)
    self.assertEqual(len(hits), 0)
def test_check_base_false(self):
    """BASE search on OU11 with a non-matching OR filter finds nothing.

    NOTE(review): this body issues the same query as test_base_false and
    does not itself set checkBaseOnSearch -- confirm that is intended.
    """
    non_matching = "(|(ou=ou13)(ou=ou12))"
    hits = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
                         scope=ldb.SCOPE_BASE,
                         expression=non_matching)
    self.assertEqual(len(hits), 0)
788 def test_check_base_error(self):
789 """Testing a search"""
790 self.l.add({"dn": "@OPTIONS", "checkBaseOnSearch": b"TRUE"})
793 res11 = self.l.search(base="OU=OU11x,DC=SAMBA,DC=ORG",
794 scope=ldb.SCOPE_BASE,
795 expression="(|(ou=ou13)(ou=ou12))")
796 self.fail("Should have failed on missing base")
797 except ldb.LdbError as err:
799 self.assertEqual(enum, ldb.ERR_NO_SUCH_OBJECT)
def test_subtree_and(self):
    """SUBTREE search with an AND of two different ou values finds nothing."""
    contradictory = "(&(ou=ou11)(ou=ou12))"
    hits = self.l.search(base="DC=SAMBA,DC=ORG",
                         scope=ldb.SCOPE_SUBTREE,
                         expression=contradictory)
    self.assertEqual(len(hits), 0)
def test_subtree_and2(self):
    """SUBTREE search with (&(x=y)(|(y=b)(y=c))) finds one entry."""
    nested = "(&(x=y)(|(y=b)(y=c)))"
    hits = self.l.search(base="DC=SAMBA,DC=ORG",
                         scope=ldb.SCOPE_SUBTREE,
                         expression=nested)
    self.assertEqual(len(hits), 1)
def test_subtree_and2_lower(self):
    """Same filter as test_subtree_and2, with a lower-case base DN; one entry."""
    nested = "(&(x=y)(|(y=b)(y=c)))"
    hits = self.l.search(base="DC=samba,DC=org",
                         scope=ldb.SCOPE_SUBTREE,
                         expression=nested)
    self.assertEqual(len(hits), 1)
def test_subtree_or(self):
    """SUBTREE search with an OR over ou=ou11 / ou=ou12 finds two entries."""
    either_ou = "(|(ou=ou11)(ou=ou12))"
    hits = self.l.search(base="DC=SAMBA,DC=ORG",
                         scope=ldb.SCOPE_SUBTREE,
                         expression=either_ou)
    self.assertEqual(len(hits), 2)
def test_subtree_or2(self):
    """SUBTREE search with (|(x=y)(y=b)) finds twenty entries."""
    query = dict(base="DC=SAMBA,DC=ORG",
                 scope=ldb.SCOPE_SUBTREE,
                 expression="(|(x=y)(y=b))")
    hits = self.l.search(**query)
    self.assertEqual(len(hits), 20)
def test_subtree_or3(self):
    """SUBTREE search with (|(x=y)(y=b)(y=c)) finds twenty-two entries."""
    query = dict(base="DC=SAMBA,DC=ORG",
                 scope=ldb.SCOPE_SUBTREE,
                 expression="(|(x=y)(y=b)(y=c))")
    hits = self.l.search(**query)
    self.assertEqual(len(hits), 22)
def test_one_and(self):
    """ONELEVEL search with an AND of two different ou values finds nothing."""
    contradictory = "(&(ou=ou11)(ou=ou12))"
    hits = self.l.search(base="DC=SAMBA,DC=ORG",
                         scope=ldb.SCOPE_ONELEVEL,
                         expression=contradictory)
    self.assertEqual(len(hits), 0)
def test_one_and2(self):
    """ONELEVEL search with (&(x=y)(y=b)) finds one entry."""
    both = "(&(x=y)(y=b))"
    hits = self.l.search(base="DC=SAMBA,DC=ORG",
                         scope=ldb.SCOPE_ONELEVEL,
                         expression=both)
    self.assertEqual(len(hits), 1)
def test_one_or(self):
    """ONELEVEL search with an OR over ou=ou11 / ou=ou12 finds two entries."""
    either_ou = "(|(ou=ou11)(ou=ou12))"
    hits = self.l.search(base="DC=SAMBA,DC=ORG",
                         scope=ldb.SCOPE_ONELEVEL,
                         expression=either_ou)
    self.assertEqual(len(hits), 2)
def test_one_or2(self):
    """ONELEVEL search with (|(x=y)(y=b)) finds twenty entries."""
    query = dict(base="DC=SAMBA,DC=ORG",
                 scope=ldb.SCOPE_ONELEVEL,
                 expression="(|(x=y)(y=b))")
    hits = self.l.search(**query)
    self.assertEqual(len(hits), 20)
def test_one_or2_lower(self):
    """Same filter as test_one_or2, with a lower-case base DN; twenty entries."""
    query = dict(base="DC=samba,DC=org",
                 scope=ldb.SCOPE_ONELEVEL,
                 expression="(|(x=y)(y=b))")
    hits = self.l.search(**query)
    self.assertEqual(len(hits), 20)
def test_subtree_and_or(self):
    """SUBTREE search: an AND whose first term is an OR group finds nothing."""
    or_first = "(&(|(x=z)(y=b))(x=x)(y=c))"
    hits = self.l.search(base="DC=SAMBA,DC=ORG",
                         scope=ldb.SCOPE_SUBTREE,
                         expression=or_first)
    self.assertEqual(len(hits), 0)
def test_subtree_and_or2(self):
    """SUBTREE search: an AND whose last term is an OR group finds nothing."""
    or_last = "(&(x=x)(y=c)(|(x=z)(y=b)))"
    hits = self.l.search(base="DC=SAMBA,DC=ORG",
                         scope=ldb.SCOPE_SUBTREE,
                         expression=or_last)
    self.assertEqual(len(hits), 0)
def test_subtree_and_or3(self):
    """SUBTREE search: AND of an ou OR-group then an x/y OR-group finds two entries."""
    ou_then_xy = "(&(|(ou=ou11)(ou=ou10))(|(x=y)(y=b)(y=c)))"
    hits = self.l.search(base="DC=SAMBA,DC=ORG",
                         scope=ldb.SCOPE_SUBTREE,
                         expression=ou_then_xy)
    self.assertEqual(len(hits), 2)
def test_subtree_and_or4(self):
    """SUBTREE search: AND of an x/y OR-group then an ou OR-group finds two entries."""
    xy_then_ou = "(&(|(x=y)(y=b)(y=c))(|(ou=ou11)(ou=ou10)))"
    hits = self.l.search(base="DC=SAMBA,DC=ORG",
                         scope=ldb.SCOPE_SUBTREE,
                         expression=xy_then_ou)
    self.assertEqual(len(hits), 2)
def test_subtree_and_or5(self):
    """SUBTREE search: AND of an x/y OR-group and ou=ou11 finds one entry."""
    xy_and_ou = "(&(|(x=y)(y=b)(y=c))(ou=ou11))"
    hits = self.l.search(base="DC=SAMBA,DC=ORG",
                         scope=ldb.SCOPE_SUBTREE,
                         expression=xy_and_ou)
    self.assertEqual(len(hits), 1)
def test_subtree_or_and(self):
    """SUBTREE search: an OR containing an AND group finds ten entries."""
    or_with_and = "(|(x=x)(y=c)(&(x=z)(y=b)))"
    hits = self.l.search(base="DC=SAMBA,DC=ORG",
                         scope=ldb.SCOPE_SUBTREE,
                         expression=or_with_and)
    self.assertEqual(len(hits), 10)
def test_subtree_large_and_unique(self):
    """SUBTREE search with (&(ou=ou10)(y=a)) finds one entry."""
    query = dict(base="DC=SAMBA,DC=ORG",
                 scope=ldb.SCOPE_SUBTREE,
                 expression="(&(ou=ou10)(y=a))")
    hits = self.l.search(**query)
    self.assertEqual(len(hits), 1)
def test_subtree_and_none(self):
    """SUBTREE search with (&(ou=ouX)(y=a)) finds nothing."""
    query = dict(base="DC=SAMBA,DC=ORG",
                 scope=ldb.SCOPE_SUBTREE,
                 expression="(&(ou=ouX)(y=a))")
    hits = self.l.search(**query)
    self.assertEqual(len(hits), 0)
954 class IndexedSearchTests(SearchTests):
955 """Test searches using the index, to ensure the index doesn't
958 super(IndexedSearchTests, self).setUp()
959 self.l.add({"dn": "@INDEXLIST",
960 "@IDXATTR": [b"x", b"y", b"ou"],
963 class GUIDIndexedSearchTests(SearchTests):
964 """Test searches using the index, to ensure the index doesn't
967 super(GUIDIndexedSearchTests, self).setUp()
968 self.l.add({"dn": "@INDEXLIST",
969 "@IDXATTR": [b"x", b"y", b"ou"],
971 "@IDXGUID": [b"objectUUID"],
972 "@IDX_DN_GUID": [b"GUID"]})
974 class AddModifyTests(TestCase):
976 shutil.rmtree(self.testdir)
977 super(AddModifyTests, self).tearDown()
980 super(AddModifyTests, self).setUp()
981 self.testdir = tempdir()
982 self.filename = os.path.join(self.testdir, "add_test.ldb")
983 self.l = ldb.Ldb(self.filename, options=["modules:rdn_name"])
984 self.l.add({"dn": "DC=SAMBA,DC=ORG",
985 "name": b"samba.org",
986 "objectUUID": b"0123456789abcdef"})
988 def test_add_dup(self):
989 self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
992 "objectUUID": b"0123456789abcde1"})
994 self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
997 "objectUUID": b"0123456789abcde2"})
998 self.fail("Should have failed adding dupliate entry")
999 except ldb.LdbError as err:
1001 self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)
1003 def test_add_del_add(self):
1004 self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
1007 "objectUUID": b"0123456789abcde1"})
1008 self.l.delete("OU=DUP,DC=SAMBA,DC=ORG")
1009 self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
1012 "objectUUID": b"0123456789abcde2"})
1014 def test_add_move_add(self):
1015 self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
1018 "objectUUID": b"0123456789abcde1"})
1019 self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
1020 "OU=DUP2,DC=SAMBA,DC=ORG")
1021 self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
1024 "objectUUID": b"0123456789abcde2"})
1027 class IndexedAddModifyTests(AddModifyTests):
1028 """Test searches using the index, to ensure the index doesn't
1031 super(IndexedAddModifyTests, self).setUp()
1032 self.l.add({"dn": "@INDEXLIST",
1033 "@IDXATTR": [b"x", b"y", b"ou"],
1036 class GUIDIndexedAddModifyTests(AddModifyTests):
1037 """Test searches using the index, to ensure the index doesn't
1040 super(GUIDIndexedAddModifyTests, self).setUp()
1041 self.l.add({"dn": "@INDEXLIST",
1042 "@IDXATTR": [b"x", b"y", b"ou"],
1044 "@IDXGUID": [b"objectUUID"],
1045 "@IDX_DN_GUID": [b"GUID"]})
1047 def test_duplicate_GUID(self):
1049 self.l.add({"dn": "OU=DUPGUID,DC=SAMBA,DC=ORG",
1052 "objectUUID": b"0123456789abcdef"})
1053 self.fail("Should have failed adding dupliate GUID")
1054 except ldb.LdbError as err:
1056 self.assertEqual(enum, ldb.ERR_CONSTRAINT_VIOLATION)
1060 class DnTests(TestCase):
1063 super(DnTests, self).setUp()
1064 self.testdir = tempdir()
1065 self.filename = os.path.join(self.testdir, "test.ldb")
1066 self.ldb = ldb.Ldb(self.filename)
1069 shutil.rmtree(self.testdir)
1070 super(DnTests, self).tearDown()
1072 def test_set_dn_invalid(self):
1076 self.assertRaises(TypeError, assign)
1079 x = ldb.Dn(self.ldb, "dc=foo11,bar=bloe")
1080 y = ldb.Dn(self.ldb, "dc=foo11,bar=bloe")
1081 self.assertEqual(x, y)
1082 y = ldb.Dn(self.ldb, "dc=foo11,bar=blie")
1083 self.assertNotEqual(x, y)
1086 x = ldb.Dn(self.ldb, "dc=foo12,bar=bloe")
1087 self.assertEqual(x.__str__(), "dc=foo12,bar=bloe")
def test_repr(self):
    """__repr__ of a Dn wraps its string form in Dn('...')."""
    dn = ldb.Dn(self.ldb, "dc=foo13,bla=blie")
    shown = dn.__repr__()
    self.assertEqual(shown, "Dn('dc=foo13,bla=blie')")
def test_get_casefold_dc_bar(self):
    """get_casefold() of "dc=foo14,bar=bloe" is expected to be "DC=FOO14,BAR=bloe".

    This test used to be named test_get_casefold, but a later method in
    the same class has that exact name.  Python keeps only the last
    definition of a name in a class body, so this one was silently
    replaced and never executed.  It now has a unique name so that both
    casefold checks run.
    """
    dn = ldb.Dn(self.ldb, "dc=foo14,bar=bloe")
    self.assertEqual(dn.get_casefold(), "DC=FOO14,BAR=bloe")
def test_validate(self):
    """validate() is truthy for "dc=foo15,bar=bloe"."""
    dn = ldb.Dn(self.ldb, "dc=foo15,bar=bloe")
    outcome = dn.validate()
    self.assertTrue(outcome)
def test_parent(self):
    """parent() of a two-component Dn is the trailing component."""
    child = ldb.Dn(self.ldb, "dc=foo16,bar=bloe")
    parent = child.parent()
    self.assertEqual("bar=bloe", parent.__str__())
def test_parent_nonexistent(self):
    """parent() of the "@BLA" Dn is None."""
    special = ldb.Dn(self.ldb, "@BLA")
    parent = special.parent()
    self.assertEqual(None, parent)
def test_is_valid(self):
    """is_valid() is truthy for a two-component Dn and for the empty Dn."""
    for text in ("dc=foo18,dc=bloe", ""):
        dn = ldb.Dn(self.ldb, text)
        self.assertTrue(dn.is_valid())
def test_is_special(self):
    """is_special() is falsy for a regular Dn and truthy for "@FOOBAR"."""
    regular = ldb.Dn(self.ldb, "dc=foo19,bar=bloe")
    self.assertFalse(regular.is_special())
    special = ldb.Dn(self.ldb, "@FOOBAR")
    self.assertTrue(special.is_special())
def test_check_special(self):
    """check_special() matches "@FOOBAR" only on the Dn built from "@FOOBAR"."""
    regular = ldb.Dn(self.ldb, "dc=foo20,bar=bloe")
    regular_match = regular.check_special("FOOBAR")
    self.assertFalse(regular_match)
    special = ldb.Dn(self.ldb, "@FOOBAR")
    special_match = special.check_special("@FOOBAR")
    self.assertTrue(special_match)
1128 x = ldb.Dn(self.ldb, "dc=foo21,bar=bloe")
1129 self.assertEqual(2, len(x))
1130 x = ldb.Dn(self.ldb, "dc=foo21")
1131 self.assertEqual(1, len(x))
def test_add_child(self):
    """add_child() with a Dn argument prepends it to the Dn in place."""
    dn = ldb.Dn(self.ldb, "dc=foo22,bar=bloe")
    child = ldb.Dn(self.ldb, "bla=bloe")
    self.assertTrue(dn.add_child(child))
    self.assertEqual("bla=bloe,dc=foo22,bar=bloe", dn.__str__())
def test_add_base(self):
    """add_base() with a Dn argument appends it to the Dn in place."""
    dn = ldb.Dn(self.ldb, "dc=foo23,bar=bloe")
    suffix = ldb.Dn(self.ldb, "bla=bloe")
    appended = dn.add_base(suffix)
    self.assertTrue(appended)
    self.assertEqual("dc=foo23,bar=bloe,bla=bloe", dn.__str__())
def test_add_child_str(self):
    """add_child() also accepts the child as a plain string."""
    dn = ldb.Dn(self.ldb, "dc=foo22,bar=bloe")
    prepended = dn.add_child("bla=bloe")
    self.assertTrue(prepended)
    self.assertEqual("bla=bloe,dc=foo22,bar=bloe", dn.__str__())
1149 def test_add_base_str(self):
1150 x = ldb.Dn(self.ldb, "dc=foo23,bar=bloe")
1152 self.assertTrue(x.add_base(base))
1153 self.assertEqual("dc=foo23,bar=bloe,bla=bloe", x.__str__())
1156 x = ldb.Dn(self.ldb, "dc=foo24")
1157 y = ldb.Dn(self.ldb, "bar=bla")
1158 self.assertEqual("dc=foo24,bar=bla", str(x + y))
def test_remove_base_components(self):
    """Removing all but one base component leaves only the leading component."""
    dn = ldb.Dn(self.ldb, "dc=foo24,dc=samba,dc=org")
    to_strip = len(dn) - 1
    dn.remove_base_components(to_strip)
    self.assertEqual("dc=foo24", str(dn))
1165 def test_parse_ldif(self):
1166 msgs = self.ldb.parse_ldif("dn: foo=bar\n")
1168 self.assertEqual("foo=bar", str(msg[1].dn))
1169 self.assertTrue(isinstance(msg[1], ldb.Message))
1170 ldif = self.ldb.write_ldif(msg[1], ldb.CHANGETYPE_NONE)
1171 self.assertEqual("dn: foo=bar\n\n", ldif)
1173 def test_parse_ldif_more(self):
1174 msgs = self.ldb.parse_ldif("dn: foo=bar\n\n\ndn: bar=bar")
1176 self.assertEqual("foo=bar", str(msg[1].dn))
1178 self.assertEqual("bar=bar", str(msg[1].dn))
def test_canonical_string(self):
    """canonical_str() of "dc=foo25,bar=bloe" is "/bloe/foo25"."""
    dn = ldb.Dn(self.ldb, "dc=foo25,bar=bloe")
    canonical = dn.canonical_str()
    self.assertEqual("/bloe/foo25", canonical)
def test_canonical_ex_string(self):
    """canonical_ex_str() puts a newline before the last path element."""
    dn = ldb.Dn(self.ldb, "dc=foo26,bar=bloe")
    canonical = dn.canonical_ex_str()
    self.assertEqual("/bloe\nfoo26", canonical)
def test_ldb_is_child_of(self):
    """is_child_of() with Dn arguments: descendants match, siblings and ancestors do not."""
    base = ldb.Dn(self.ldb, "dc=base")
    foo = ldb.Dn(self.ldb, "cn=foo,dc=base")
    bar = ldb.Dn(self.ldb, "cn=bar,dc=base")
    baz_under_bar = ldb.Dn(self.ldb, "cn=baz,cn=bar,dc=base")

    # Direct child, grandchild, and child of an intermediate entry.
    self.assertTrue(foo.is_child_of(base))
    self.assertTrue(baz_under_bar.is_child_of(base))
    self.assertTrue(baz_under_bar.is_child_of(bar))
    # A sibling and an ancestor are not children.
    self.assertFalse(bar.is_child_of(foo))
    self.assertFalse(base.is_child_of(baz_under_bar))
1201 def test_ldb_is_child_of_str(self):
1202 """Testing ldb_dn_compare_dn"""
1204 dn2_str = "cn=foo,dc=base"
1205 dn3_str = "cn=bar,dc=base"
1206 dn4_str = "cn=baz,cn=bar,dc=base"
1208 dn1 = ldb.Dn(self.ldb, dn1_str)
1209 dn2 = ldb.Dn(self.ldb, dn2_str)
1210 dn3 = ldb.Dn(self.ldb, dn3_str)
1211 dn4 = ldb.Dn(self.ldb, dn4_str)
1213 self.assertTrue(dn2.is_child_of(dn1_str))
1214 self.assertTrue(dn4.is_child_of(dn1_str))
1215 self.assertTrue(dn4.is_child_of(dn3_str))
1216 self.assertFalse(dn3.is_child_of(dn2_str))
1217 self.assertFalse(dn1.is_child_of(dn4_str))
def test_get_component_name(self):
    """get_component_name() gives the attribute name by index, None when out of range."""
    dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
    expectations = ((0, 'cn'), (1, 'dc'), (2, None), (-1, None))
    for index, expected in expectations:
        self.assertEqual(dn.get_component_name(index), expected)
def test_get_component_value(self):
    """get_component_value() gives the component value by index.

    Indexes 0 and 1 must give 'foo' and 'base' for "cn=foo,dc=base".
    The out-of-range indexes 2 and -1 are expected to give None.
    """
    dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
    self.assertEqual(dn.get_component_value(0), 'foo')
    self.assertEqual(dn.get_component_value(1), 'base')
    # These two checks previously called get_component_name(), which
    # only repeated test_get_component_name and left the out-of-range
    # behaviour of get_component_value() untested.
    # NOTE(review): assumes get_component_value() returns None for an
    # out-of-range index, as get_component_name() does -- confirm.
    self.assertEqual(dn.get_component_value(2), None)
    self.assertEqual(dn.get_component_value(-1), None)
def test_set_component(self):
    """set_component() replaces a component in place and rejects index 2."""
    dn = ldb.Dn(self.ldb, "cn=foo,dc=base")

    dn.set_component(0, 'cn', 'bar')
    self.assertEqual(str(dn), "cn=bar,dc=base")

    dn.set_component(1, 'o', 'asep')
    self.assertEqual(str(dn), "cn=bar,o=asep")

    # Index 2 raises TypeError and leaves the Dn unchanged.
    self.assertRaises(TypeError, dn.set_component, 2, 'dc', 'base')
    self.assertEqual(str(dn), "cn=bar,o=asep")

    # ',' and '+' in a value are backslash-escaped in the string form.
    dn.set_component(1, 'o', 'a,b+c')
    self.assertEqual(str(dn), r"cn=bar,o=a\,b\+c")
def test_set_component_bytes(self):
    """set_component() accepts the component value as bytes."""
    dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
    steps = (
        (0, 'cn', b'bar', "cn=bar,dc=base"),
        (1, 'o', b'asep', "cn=bar,o=asep"),
    )
    for index, attr, value, expected in steps:
        dn.set_component(index, attr, value)
        self.assertEqual(str(dn), expected)
def test_set_component_none(self):
    """set_component() with None as the value raises TypeError."""
    dn = ldb.Dn(self.ldb, "cn=foo,cn=bar,dc=base")
    setter = dn.set_component
    self.assertRaises(TypeError, setter, 1, 'cn', None)
def test_get_extended_component_null(self):
    """get_extended_component("TEST") is None on a Dn built without one."""
    dn = ldb.Dn(self.ldb, "cn=foo,cn=bar,dc=base")
    component = dn.get_extended_component("TEST")
    self.assertEqual(component, None)
def test_get_extended_component(self):
    """An extended component parsed from "<TEST=foo>;..." is read back as bytes."""
    self.ldb._register_test_extensions()
    dn = ldb.Dn(self.ldb, "<TEST=foo>;cn=bar,dc=base")
    component = dn.get_extended_component("TEST")
    self.assertEqual(component, b"foo")
def test_set_extended_component(self):
    """Set the "TEST" extended component from text and from bytes."""
    self.ldb._register_test_extensions()
    target = ldb.Dn(self.ldb, "dc=base")
    # Either input type is read back as bytes.
    for new_value, stored in (("foo", b"foo"), (b"bar", b"bar")):
        target.set_extended_component("TEST", new_value)
        self.assertEqual(target.get_extended_component("TEST"), stored)
def test_extended_str(self):
    """extended_str() renders the extended component along with the DN."""
    self.ldb._register_test_extensions()
    source_text = "<TEST=foo>;cn=bar,dc=base"
    extended_dn = ldb.Dn(self.ldb, source_text)
    rendered = extended_dn.extended_str()
    self.assertEqual(rendered, "<TEST=foo>;cn=bar,dc=base")
def test_get_rdn_name(self):
    """get_rdn_name() gives the attribute name of the leading component."""
    subject = ldb.Dn(self.ldb, "cn=foo,dc=base")
    rdn_name = subject.get_rdn_name()
    self.assertEqual(rdn_name, 'cn')
def test_get_rdn_value(self):
    """get_rdn_value() gives the value of the leading component."""
    subject = ldb.Dn(self.ldb, "cn=foo,dc=base")
    rdn_value = subject.get_rdn_value()
    self.assertEqual(rdn_value, 'foo')
def test_get_casefold(self):
    """get_casefold() gives the upper-cased form of this DN."""
    subject = ldb.Dn(self.ldb, "cn=foo,dc=base")
    folded = subject.get_casefold()
    self.assertEqual(folded, 'CN=FOO,DC=BASE')
def test_get_linearized(self):
    """get_linearized() gives back the DN string it was built from."""
    subject = ldb.Dn(self.ldb, "cn=foo,dc=base")
    linear = subject.get_linearized()
    self.assertEqual(linear, 'cn=foo,dc=base')
def test_is_null(self):
    """is_null() is false for a populated DN and true for an empty one."""
    populated = ldb.Dn(self.ldb, "cn=foo,dc=base")
    self.assertFalse(populated.is_null())

    empty = ldb.Dn(self.ldb, '')
    self.assertTrue(empty.is_null())
1300 class LdbMsgTests(TestCase):
1303 super(LdbMsgTests, self).setUp()
1304 self.msg = ldb.Message()
1305 self.testdir = tempdir()
1306 self.filename = os.path.join(self.testdir, "test.ldb")
1309 shutil.rmtree(self.testdir)
1310 super(LdbMsgTests, self).tearDown()
def test_init_dn(self):
    """A Message constructed from a Dn exposes that Dn through .dn."""
    initial_dn = ldb.Dn(ldb.Ldb(), "dc=foo27")
    self.msg = ldb.Message(initial_dn)
    self.assertEqual("dc=foo27", str(self.msg.dn))
def test_iter_items(self):
    """items() is empty on a fresh message and has one entry once dn is set."""
    self.assertEqual(0, len(self.msg.items()))
    db = ldb.Ldb(self.filename)
    self.msg.dn = ldb.Dn(db, "dc=foo28")
    self.assertEqual(1, len(self.msg.items()))
1321 def test_repr(self):
1322 self.msg.dn = ldb.Dn(ldb.Ldb(self.filename), "dc=foo29")
1323 self.msg["dc"] = b"foo"
1325 self.assertIn(repr(self.msg), [
1326 "Message({'dn': Dn('dc=foo29'), 'dc': MessageElement([b'foo'])})",
1327 "Message({'dc': MessageElement([b'foo']), 'dn': Dn('dc=foo29')})",
1329 self.assertIn(repr(self.msg.text), [
1330 "Message({'dn': Dn('dc=foo29'), 'dc': MessageElement([b'foo'])}).text",
1331 "Message({'dc': MessageElement([b'foo']), 'dn': Dn('dc=foo29')}).text",
1336 "Message({'dn': Dn('dc=foo29'), 'dc': MessageElement(['foo'])})")
1338 repr(self.msg.text),
1339 "Message({'dn': Dn('dc=foo29'), 'dc': MessageElement(['foo'])}).text")
1342 self.assertEqual(0, len(self.msg))
def test_notpresent(self):
    """Indexing the message with a key it does not hold raises KeyError."""
    with self.assertRaises(KeyError):
        self.msg["foo"]
1351 self.msg.add(ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla"))
def test_add_text(self):
    """add() accepts a MessageElement built from a text value."""
    text_element = ldb.MessageElement(["456"], ldb.FLAG_MOD_ADD, "bla")
    self.msg.add(text_element)
def test_elements_empty(self):
    """elements() on the fixture message is an empty list."""
    current = self.msg.elements()
    self.assertEqual([], current)
1359 def test_elements(self):
1360 el = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
1362 self.assertEqual([el], self.msg.elements())
1363 self.assertEqual([el.text], self.msg.text.elements())
def test_add_value(self):
    """Assigning a bytes list to a new key takes len(msg) from 0 to 1."""
    size_before = len(self.msg)
    self.assertEqual(0, size_before)
    self.msg["foo"] = [b"foo"]
    size_after = len(self.msg)
    self.assertEqual(1, size_after)
def test_add_value_text(self):
    """Assigning a text list to a new key takes len(msg) from 0 to 1."""
    size_before = len(self.msg)
    self.assertEqual(0, size_before)
    self.msg["foo"] = ["foo"]
    size_after = len(self.msg)
    self.assertEqual(1, size_after)
def test_add_value_multiple(self):
    """A two-value bytes assignment is one element holding both values."""
    self.assertEqual(0, len(self.msg))
    self.msg["foo"] = [b"foo", b"bla"]
    self.assertEqual(1, len(self.msg))
    stored = list(self.msg["foo"])
    self.assertEqual([b"foo", b"bla"], stored)
def test_add_value_multiple_text(self):
    """A two-value text assignment is readable through the text view."""
    self.assertEqual(0, len(self.msg))
    self.msg["foo"] = ["foo", "bla"]
    self.assertEqual(1, len(self.msg))
    stored = list(self.msg.text["foo"])
    self.assertEqual(["foo", "bla"], stored)
def test_set_value(self):
    """Re-assigning a key replaces the bytes value stored under it."""
    for payload in (b"fool", b"bar"):
        self.msg["foo"] = [payload]
        self.assertEqual([payload], list(self.msg["foo"]))
def test_set_value_text(self):
    """Re-assigning a key replaces the text value seen via the text view."""
    for payload in ("fool", "bar"):
        self.msg["foo"] = [payload]
        self.assertEqual([payload], list(self.msg.text["foo"]))
def test_keys(self):
    """keys() lists "dn" first, then attributes in the order they were set."""
    self.msg.dn = ldb.Dn(ldb.Ldb(self.filename), "@BASEINFO")
    for attr in ("foo", "bar"):
        self.msg[attr] = [b"bla"]
    self.assertEqual(["dn", "foo", "bar"], self.msg.keys())
def test_keys_text(self):
    """The text view's keys() lists "dn" first, then attributes in set order."""
    self.msg.dn = ldb.Dn(ldb.Ldb(self.filename), "@BASEINFO")
    for attr in ("foo", "bar"):
        self.msg[attr] = ["bla"]
    self.assertEqual(["dn", "foo", "bar"], self.msg.text.keys())
1412 self.msg.dn = ldb.Dn(ldb.Ldb(self.filename), "@BASEINFO")
1413 self.assertEqual("@BASEINFO", self.msg.dn.__str__())
def test_get_dn(self):
    """get("dn") returns the Dn that was assigned to the message."""
    self.msg.dn = ldb.Dn(ldb.Ldb(self.filename), "@BASEINFO")
    fetched = self.msg.get("dn")
    self.assertEqual("@BASEINFO", fetched.__str__())
def test_dn_text(self):
    """A dn assigned through the text view is visible on both views."""
    self.msg.text.dn = ldb.Dn(ldb.Ldb(self.filename), "@BASEINFO")
    for view in (self.msg, self.msg.text):
        self.assertEqual("@BASEINFO", str(view.dn))
def test_get_dn_text(self):
    """get("dn") gives the assigned dn on the message and on its text view."""
    self.msg.dn = ldb.Dn(ldb.Ldb(self.filename), "@BASEINFO")
    for view in (self.msg, self.msg.text):
        self.assertEqual("@BASEINFO", str(view.get("dn")))
def test_get_invalid(self):
    """get() with an integer key raises TypeError."""
    self.msg.dn = ldb.Dn(ldb.Ldb(self.filename), "@BASEINFO")
    with self.assertRaises(TypeError):
        self.msg.get(42)
def test_get_other(self):
    """Exercise get() with idx and default on a single-valued attribute."""
    self.msg["foo"] = [b"bar"]
    fetch = self.msg.get
    self.assertEqual(b"bar", fetch("foo")[0])
    self.assertEqual(b"bar", fetch("foo", idx=0))
    # idx=1 is past the only value: None, or the supplied default.
    self.assertEqual(None, fetch("foo", idx=1))
    self.assertEqual("", fetch("foo", default='', idx=1))
def test_get_other_text(self):
    """Exercise the text view's get() with idx and default arguments."""
    self.msg["foo"] = ["bar"]
    self.assertEqual(["bar"], list(self.msg.text.get("foo")))
    self.assertEqual("bar", self.msg.text.get("foo")[0])
    self.assertEqual("bar", self.msg.text.get("foo", idx=0))
    # The out-of-range idx lookups go through the text view as well, so
    # this test covers the text wrapper rather than the bytes API.
    self.assertEqual(None, self.msg.text.get("foo", idx=1))
    self.assertEqual("", self.msg.text.get("foo", default='', idx=1))
def test_get_default(self):
    """get() on a missing key gives None, or the default when one is passed."""
    lookup = self.msg.get
    self.assertEqual(None, lookup("tatayoyo", idx=0))
    self.assertEqual("anniecordie", lookup("tatayoyo", "anniecordie"))
def test_get_default_text(self):
    """The text view's get() on a missing key gives None or the default."""
    self.assertEqual(None, self.msg.text.get("tatayoyo", idx=0))
    fallback = self.msg.text.get("tatayoyo", "anniecordie")
    self.assertEqual("anniecordie", fallback)
def test_get_unknown(self):
    """get() on a key the message does not hold gives None."""
    missing = self.msg.get("lalalala")
    self.assertEqual(None, missing)
def test_get_unknown_text(self):
    """The text view's get() on a key the message does not hold gives None."""
    missing = self.msg.text.get("lalalala")
    self.assertEqual(None, missing)
1462 def test_msg_diff(self):
1464 msgs = l.parse_ldif("dn: foo=bar\nfoo: bar\nbaz: do\n\ndn: foo=bar\nfoo: bar\nbaz: dont\n")
1465 msg1 = next(msgs)[1]
1466 msg2 = next(msgs)[1]
1467 msgdiff = l.msg_diff(msg1, msg2)
1468 self.assertEqual("foo=bar", msgdiff.get("dn").__str__())
1469 self.assertRaises(KeyError, lambda: msgdiff["foo"])
1470 self.assertEqual(1, len(msgdiff))
def test_equal_empty(self):
    """Two freshly constructed messages compare equal."""
    left, right = ldb.Message(), ldb.Message()
    self.assertEqual(left, right)
def test_equal_simplel(self):
    """Message equality follows the dn and the attribute values."""
    db = ldb.Ldb(self.filename)
    msg1 = ldb.Message()
    msg1.dn = ldb.Dn(db, "foo=bar")
    msg2 = ldb.Message()
    msg2.dn = ldb.Dn(db, "foo=bar")
    # Same dn, no attributes.
    self.assertEqual(msg1, msg2)
    # Same dn, same attribute value.
    msg1['foo'] = b'bar'
    msg2['foo'] = b'bar'
    self.assertEqual(msg1, msg2)
    # Same dn, differing attribute value. This is the last check; the
    # repeated assignment that used to follow it had no effect on the test.
    msg2['foo'] = b'blie'
    self.assertNotEqual(msg1, msg2)
1491 def test_from_dict(self):
1492 rec = {"dn": "dc=fromdict",
1493 "a1": [b"a1-val1", b"a1-val1"]}
1495 # check different types of input Flags
1496 for flags in [ldb.FLAG_MOD_ADD, ldb.FLAG_MOD_REPLACE, ldb.FLAG_MOD_DELETE]:
1497 m = ldb.Message.from_dict(l, rec, flags)
1498 self.assertEqual(rec["a1"], list(m["a1"]))
1499 self.assertEqual(flags, m["a1"].flags())
1500 # check input params
1501 self.assertRaises(TypeError, ldb.Message.from_dict, dict(), rec, ldb.FLAG_MOD_REPLACE)
1502 self.assertRaises(TypeError, ldb.Message.from_dict, l, list(), ldb.FLAG_MOD_REPLACE)
1503 self.assertRaises(ValueError, ldb.Message.from_dict, l, rec, 0)
1504 # Message.from_dict expects dictionary with 'dn'
1505 err_rec = {"a1": [b"a1-val1", b"a1-val1"]}
1506 self.assertRaises(TypeError, ldb.Message.from_dict, l, err_rec, ldb.FLAG_MOD_REPLACE)
1508 def test_from_dict_text(self):
1509 rec = {"dn": "dc=fromdict",
1510 "a1": ["a1-val1", "a1-val1"]}
1512 # check different types of input Flags
1513 for flags in [ldb.FLAG_MOD_ADD, ldb.FLAG_MOD_REPLACE, ldb.FLAG_MOD_DELETE]:
1514 m = ldb.Message.from_dict(l, rec, flags)
1515 self.assertEqual(rec["a1"], list(m.text["a1"]))
1516 self.assertEqual(flags, m.text["a1"].flags())
1517 # check input params
1518 self.assertRaises(TypeError, ldb.Message.from_dict, dict(), rec, ldb.FLAG_MOD_REPLACE)
1519 self.assertRaises(TypeError, ldb.Message.from_dict, l, list(), ldb.FLAG_MOD_REPLACE)
1520 self.assertRaises(ValueError, ldb.Message.from_dict, l, rec, 0)
1521 # Message.from_dict expects dictionary with 'dn'
1522 err_rec = {"a1": ["a1-val1", "a1-val1"]}
1523 self.assertRaises(TypeError, ldb.Message.from_dict, l, err_rec, ldb.FLAG_MOD_REPLACE)
1525 def test_copy_add_message_element(self):
1527 m["1"] = ldb.MessageElement([b"val 111"], ldb.FLAG_MOD_ADD, "1")
1528 m["2"] = ldb.MessageElement([b"val 222"], ldb.FLAG_MOD_ADD, "2")
1532 self.assertEqual(mto["1"], m["1"])
1533 self.assertEqual(mto["2"], m["2"])
1537 self.assertEqual(mto["1"], m["1"])
1538 self.assertEqual(mto["2"], m["2"])
1540 def test_copy_add_message_element_text(self):
1542 m["1"] = ldb.MessageElement(["val 111"], ldb.FLAG_MOD_ADD, "1")
1543 m["2"] = ldb.MessageElement(["val 222"], ldb.FLAG_MOD_ADD, "2")
1547 self.assertEqual(mto["1"], m.text["1"])
1548 self.assertEqual(mto["2"], m.text["2"])
1552 self.assertEqual(mto.text["1"], m.text["1"])
1553 self.assertEqual(mto.text["2"], m.text["2"])
1554 self.assertEqual(mto["1"], m["1"])
1555 self.assertEqual(mto["2"], m["2"])
1558 class MessageElementTests(TestCase):
def test_cmp_element(self):
    """Elements with equal values compare equal; differing values do not."""
    first = ldb.MessageElement([b"foo"])
    same = ldb.MessageElement([b"foo"])
    other = ldb.MessageElement([b"bzr"])
    self.assertEqual(first, same)
    self.assertNotEqual(first, other)
def test_cmp_element_text(self):
    """An element built from bytes equals one built from the same text."""
    from_bytes = ldb.MessageElement([b"foo"])
    from_text = ldb.MessageElement(["foo"])
    self.assertEqual(from_bytes, from_text)
def test_create_iterable(self):
    """Iterating an element yields bytes; iterating its text view yields str."""
    element = ldb.MessageElement([b"foo"])
    raw_values = list(element)
    text_values = list(element.text)
    self.assertEqual([b"foo"], raw_values)
    self.assertEqual(["foo"], text_values)
1577 def test_repr(self):
1578 x = ldb.MessageElement([b"foo"])
1580 self.assertEqual("MessageElement([b'foo'])", repr(x))
1581 self.assertEqual("MessageElement([b'foo']).text", repr(x.text))
1583 self.assertEqual("MessageElement(['foo'])", repr(x))
1584 self.assertEqual("MessageElement(['foo']).text", repr(x.text))
1585 x = ldb.MessageElement([b"foo", b"bla"])
1586 self.assertEqual(2, len(x))
1588 self.assertEqual("MessageElement([b'foo',b'bla'])", repr(x))
1589 self.assertEqual("MessageElement([b'foo',b'bla']).text", repr(x.text))
1591 self.assertEqual("MessageElement(['foo','bla'])", repr(x))
1592 self.assertEqual("MessageElement(['foo','bla']).text", repr(x.text))
def test_get_item(self):
    """Index an element positively, negatively and out of range."""
    element = ldb.MessageElement([b"foo", b"bar"])
    for position, expected in ((0, b"foo"), (1, b"bar"), (-1, b"bar")):
        self.assertEqual(expected, element[position])
    with self.assertRaises(IndexError):
        element[45]
def test_get_item_text(self):
    """Index an element's text view positively, negatively and out of range."""
    x = ldb.MessageElement(["foo", "bar"])
    self.assertEqual("foo", x.text[0])
    self.assertEqual("bar", x.text[1])
    self.assertEqual("bar", x.text[-1])
    # Index the text view here too, so the out-of-range case covers the
    # same object as the assertions above.
    self.assertRaises(IndexError, lambda: x.text[45])
1609 x = ldb.MessageElement([b"foo", b"bar"])
1610 self.assertEqual(2, len(x))
1613 x = ldb.MessageElement([b"foo", b"bar"])
1614 y = ldb.MessageElement([b"foo", b"bar"])
1615 self.assertEqual(y, x)
1616 x = ldb.MessageElement([b"foo"])
1617 self.assertNotEqual(y, x)
1618 y = ldb.MessageElement([b"foo"])
1619 self.assertEqual(y, x)
1621 def test_extended(self):
1622 el = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
1624 self.assertEqual("MessageElement([b'456'])", repr(el))
1625 self.assertEqual("MessageElement([b'456']).text", repr(el.text))
1627 self.assertEqual("MessageElement(['456'])", repr(el))
1628 self.assertEqual("MessageElement(['456']).text", repr(el.text))
def test_bad_text(self):
    """Reading a non-UTF-8 value through the text view raises UnicodeDecodeError."""
    undecodable = ldb.MessageElement(b'\xba\xdd')
    # Obtain the text view before the assertion so only the item access
    # is expected to raise.
    text_view = undecodable.text
    with self.assertRaises(UnicodeDecodeError):
        text_view[0]
1635 class ModuleTests(TestCase):
1638 super(ModuleTests, self).setUp()
1639 self.testdir = tempdir()
1640 self.filename = os.path.join(self.testdir, "test.ldb")
1641 self.ldb = ldb.Ldb(self.filename)
# Remove the per-test directory, then chain to the parent's tearDown
# (not setUp) so the base-class cleanup actually runs.
shutil.rmtree(self.testdir)
super(ModuleTests, self).tearDown()
1647 def test_register_module(self):
1648 class ExampleModule:
1650 ldb.register_module(ExampleModule)
1652 def test_use_module(self):
1654 class ExampleModule:
1657 def __init__(self, ldb, next):
1661 def search(self, *args, **kwargs):
1662 return self.next.search(*args, **kwargs)
1664 def request(self, *args, **kwargs):
1667 ldb.register_module(ExampleModule)
1668 l = ldb.Ldb(self.filename)
1669 l.add({"dn": "@MODULES", "@LIST": "bla"})
1670 self.assertEqual([], ops)
1671 l = ldb.Ldb(self.filename)
1672 self.assertEqual(["init"], ops)
1674 class LdbResultTests(TestCase):
1677 super(LdbResultTests, self).setUp()
1678 self.testdir = tempdir()
1679 self.filename = os.path.join(self.testdir, "test.ldb")
1680 self.l = ldb.Ldb(self.filename)
1681 self.l.add({"dn": "DC=SAMBA,DC=ORG", "name": b"samba.org"})
1682 self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG", "name": b"Admins"})
1683 self.l.add({"dn": "OU=USERS,DC=SAMBA,DC=ORG", "name": b"Users"})
1684 self.l.add({"dn": "OU=OU1,DC=SAMBA,DC=ORG", "name": b"OU #1"})
1685 self.l.add({"dn": "OU=OU2,DC=SAMBA,DC=ORG", "name": b"OU #2"})
1686 self.l.add({"dn": "OU=OU3,DC=SAMBA,DC=ORG", "name": b"OU #3"})
1687 self.l.add({"dn": "OU=OU4,DC=SAMBA,DC=ORG", "name": b"OU #4"})
1688 self.l.add({"dn": "OU=OU5,DC=SAMBA,DC=ORG", "name": b"OU #5"})
1689 self.l.add({"dn": "OU=OU6,DC=SAMBA,DC=ORG", "name": b"OU #6"})
1690 self.l.add({"dn": "OU=OU7,DC=SAMBA,DC=ORG", "name": b"OU #7"})
1691 self.l.add({"dn": "OU=OU8,DC=SAMBA,DC=ORG", "name": b"OU #8"})
1692 self.l.add({"dn": "OU=OU9,DC=SAMBA,DC=ORG", "name": b"OU #9"})
1693 self.l.add({"dn": "OU=OU10,DC=SAMBA,DC=ORG", "name": b"OU #10"})
1696 shutil.rmtree(self.testdir)
1697 super(LdbResultTests, self).tearDown()
def test_return_type(self):
    """str() of a search result is the fixed "<ldb result>" marker."""
    everything = self.l.search()
    rendered = str(everything)
    self.assertEqual(rendered, "<ldb result>")
1703 def test_get_msgs(self):
1704 res = self.l.search()
1707 def test_get_controls(self):
1708 res = self.l.search()
1711 def test_get_referals(self):
1712 res = self.l.search()
1715 def test_iter_msgs(self):
1717 for l in self.l.search().msgs:
1718 if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
1720 self.assertTrue(found)
def test_iter_msgs_count(self):
    """The result's count attribute reflects the number of matched records."""
    total = self.l.search().count
    self.assertTrue(total > 0)
    # 13 objects have been added under DC=SAMBA,DC=ORG
    under_base = self.l.search(base="DC=SAMBA,DC=ORG").count
    self.assertEqual(under_base, 13)
1727 def test_iter_controls(self):
1728 res = self.l.search().controls
def test_create_control(self):
    """Build a Control from a string and check the parsed fields."""
    # An unrecognised control string is rejected with ValueError.
    with self.assertRaises(ValueError):
        ldb.Control(self.l, "tatayoyo:0")

    relax = ldb.Control(self.l, "relax:1")
    self.assertEqual(relax.critical, True)
    self.assertEqual(relax.oid, "1.3.6.1.4.1.4203.666.5.12")
1737 def test_iter_refs(self):
1738 res = self.l.search().referals
1741 def test_search_sequence_msgs(self):
1743 res = self.l.search().msgs
1745 for i in range(0, len(res)):
1747 if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
1749 self.assertTrue(found)
1751 def test_search_as_iter(self):
1753 res = self.l.search()
1756 if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
1758 self.assertTrue(found)
1760 def test_search_iter(self):
1762 res = self.l.search_iterator()
1765 if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
1767 self.assertTrue(found)
1770 # Show that search results can't see into a transaction
1771 def test_search_against_trans(self):
1774 (r1, w1) = os.pipe()
1776 (r2, w2) = os.pipe()
1778 # For the first element, fork a child that will
1782 # In the child, re-open
1786 child_ldb = ldb.Ldb(self.filename)
1787 # start a transaction
1788 child_ldb.transaction_start()
1791 child_ldb.add({"dn": "OU=OU11,DC=SAMBA,DC=ORG",
1792 "name": b"samba.org"})
1794 os.write(w1, b"added")
1796 # Now wait for the search to be done
1801 child_ldb.transaction_commit()
1802 except LdbError as err:
1803 # We print this here to see what went wrong in the child
1807 os.write(w1, b"transaction")
1810 self.assertEqual(os.read(r1, 5), b"added")
1812 # This should not turn up until the transaction is concluded
1813 res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
1814 scope=ldb.SCOPE_BASE)
1815 self.assertEqual(len(res11), 0)
1817 os.write(w2, b"search")
1819 # Now wait for the transaction to be done. This should
1820 # deadlock, but the search doesn't hold a read lock for the
1821 # iterator lifetime currently.
1822 self.assertEqual(os.read(r1, 11), b"transaction")
1824 # This should now turn up, as the transaction is over
1825 res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
1826 scope=ldb.SCOPE_BASE)
1827 self.assertEqual(len(res11), 1)
1829 self.assertFalse(found11)
1831 (got_pid, status) = os.waitpid(pid, 0)
1832 self.assertEqual(got_pid, pid)
1835 def test_search_iter_against_trans(self):
1839 # We need to hold this iterator open to hold the all-record
1841 res = self.l.search_iterator()
1843 (r1, w1) = os.pipe()
1845 (r2, w2) = os.pipe()
1847 # For the first element, with the sequence open (which
1848 # means with ldb locks held), fork a child that will
1852 # In the child, re-open
1857 child_ldb = ldb.Ldb(self.filename)
1858 # start a transaction
1859 child_ldb.transaction_start()
1862 child_ldb.add({"dn": "OU=OU11,DC=SAMBA,DC=ORG",
1863 "name": b"samba.org"})
1865 os.write(w1, b"added")
1867 # Now wait for the search to be done
1872 child_ldb.transaction_commit()
1873 except LdbError as err:
1874 # We print this here to see what went wrong in the child
1878 os.write(w1, b"transaction")
1881 self.assertEqual(os.read(r1, 5), b"added")
1883 # This should not turn up until the transaction is concluded
1884 res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
1885 scope=ldb.SCOPE_BASE)
1886 self.assertEqual(len(res11), 0)
1888 os.write(w2, b"search")
1890 # allow the transaction to start
1893 # This should not turn up until the search finishes and
1894 # removed the read lock, but for ldb_tdb that happened as soon
1895 # as we called the first res.next()
1896 res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
1897 scope=ldb.SCOPE_BASE)
1898 self.assertEqual(len(res11), 0)
1900 # These results are all collected at the first next(res) call
1902 if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
1904 if str(l.dn) == "OU=OU11,DC=SAMBA,DC=ORG":
1907 # Now wait for the transaction to be done.
1908 self.assertEqual(os.read(r1, 11), b"transaction")
1910 # This should now turn up, as the transaction is over and all
1911 # read locks are gone
1912 res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
1913 scope=ldb.SCOPE_BASE)
1914 self.assertEqual(len(res11), 1)
1916 self.assertTrue(found)
1917 self.assertFalse(found11)
1919 (got_pid, status) = os.waitpid(pid, 0)
1920 self.assertEqual(got_pid, pid)
1923 class BadTypeTests(TestCase):
1924 def test_control(self):
1926 self.assertRaises(TypeError, ldb.Control, '<bad type>', 'relax:1')
1927 self.assertRaises(TypeError, ldb.Control, ldb, 1234)
1929 def test_modify(self):
1931 dn = ldb.Dn(l, 'a=b')
1933 self.assertRaises(TypeError, l.modify, '<bad type>')
1934 self.assertRaises(TypeError, l.modify, m, '<bad type>')
1938 dn = ldb.Dn(l, 'a=b')
1940 self.assertRaises(TypeError, l.add, '<bad type>')
1941 self.assertRaises(TypeError, l.add, m, '<bad type>')
1943 def test_delete(self):
1945 dn = ldb.Dn(l, 'a=b')
1946 self.assertRaises(TypeError, l.add, '<bad type>')
1947 self.assertRaises(TypeError, l.add, dn, '<bad type>')
1949 def test_rename(self):
1951 dn = ldb.Dn(l, 'a=b')
1952 self.assertRaises(TypeError, l.add, '<bad type>', dn)
1953 self.assertRaises(TypeError, l.add, dn, '<bad type>')
1954 self.assertRaises(TypeError, l.add, dn, dn, '<bad type>')
1956 def test_search(self):
1958 self.assertRaises(TypeError, l.search, base=1234)
1959 self.assertRaises(TypeError, l.search, scope='<bad type>')
1960 self.assertRaises(TypeError, l.search, expression=1234)
1961 self.assertRaises(TypeError, l.search, attrs='<bad type>')
1962 self.assertRaises(TypeError, l.search, controls='<bad type>')
class VersionTests(TestCase):
    """Checks on the module-level version attribute."""

    def test_version(self):
        """ldb.__version__ is a str instance."""
        version = ldb.__version__
        self.assertTrue(isinstance(version, str))
1971 if __name__ == '__main__':
1973 unittest.TestProgram()