2 # Simple tests for the ldb python bindings.
3 # Copyright (C) 2007 Jelmer Vernooij <jelmer@samba.org>
6 from unittest import TestCase
13 PY3 = sys.version_info > (3, 0)
22 dir_prefix = os.path.join(os.environ["SELFTEST_PREFIX"], "tmp")
25 return tempfile.mkdtemp(dir=dir_prefix)
class NoContextTests(TestCase):
    """Checks for module-level ldb helpers that need no open database."""

    def test_valid_attr_name(self):
        """valid_attr_name() accepts "foo" and rejects "24foo"."""
        plain_name = "foo"
        self.assertTrue(ldb.valid_attr_name(plain_name))
        digit_led_name = "24foo"
        self.assertFalse(ldb.valid_attr_name(digit_led_name))

    def test_timestring(self):
        """timestring() renders epoch seconds as the expected time string."""
        cases = (
            ("19700101000000.0Z", 0),
            ("20071119191012.0Z", 1195499412),
        )
        for text, seconds in cases:
            self.assertEqual(text, ldb.timestring(seconds))

    def test_string_to_time(self):
        """string_to_time() parses a time string back to epoch seconds."""
        cases = (
            (0, "19700101000000.0Z"),
            (1195499412, "20071119191012.0Z"),
        )
        for seconds, text in cases:
            self.assertEqual(seconds, ldb.string_to_time(text))

    def test_binary_encode(self):
        """binary_encode() round-trips and treats str and bytes alike."""
        from_bytes = ldb.binary_encode(b'test\\x')
        round_tripped = ldb.binary_decode(from_bytes)
        self.assertEqual(round_tripped, b'test\\x')

        # The text form of the same value must encode identically.
        from_text = ldb.binary_encode('test\\x')
        self.assertEqual(from_text, from_bytes)
51 class LdbBaseTest(TestCase):
53 super(LdbBaseTest, self).setUp()
55 if self.prefix is None:
56 self.prefix = TDB_PREFIX
57 except AttributeError:
58 self.prefix = TDB_PREFIX
61 super(LdbBaseTest, self).tearDown()
64 return self.prefix + self.filename
67 if self.prefix == MDB_PREFIX:
73 class SimpleLdb(LdbBaseTest):
76 super(SimpleLdb, self).setUp()
77 self.testdir = tempdir()
78 self.filename = os.path.join(self.testdir, "test.ldb")
79 self.ldb = ldb.Ldb(self.url(), flags=self.flags())
81 self.ldb.add(self.index)
82 except AttributeError:
86 shutil.rmtree(self.testdir)
87 super(SimpleLdb, self).tearDown()
88 # Ensure the LDB is closed now, so we close the FD
def test_connect(self):
    """Opening the test URL with the backend flags must not raise."""
    target = self.url()
    ldb.Ldb(target, flags=self.flags())
94 def test_connect_none(self):
97 def test_connect_later(self):
99 x.connect(self.url(), flags=self.flags())
103 self.assertTrue(repr(x).startswith("<ldb connection"))
105 def test_set_create_perms(self):
107 x.set_create_perms(0o600)
109 def test_modules_none(self):
111 self.assertEqual([], x.modules())
def test_modules_tdb(self):
    """modules() on a connected database reprs as a single 'tdb' module."""
    db = ldb.Ldb(self.url(), flags=self.flags())
    loaded = db.modules()
    self.assertEqual("[<ldb module 'tdb'>]", repr(loaded))
117 def test_firstmodule_none(self):
119 self.assertEqual(x.firstmodule, None)
121 def test_firstmodule_tdb(self):
122 x = ldb.Ldb(self.url(), flags=self.flags())
124 self.assertEqual(repr(mod), "<ldb module 'tdb'>")
def test_search(self):
    """A search with no arguments returns no entries."""
    db = ldb.Ldb(self.url(), flags=self.flags())
    found = db.search()
    self.assertEqual(len(found), 0)
def test_search_controls(self):
    """A search carrying a paged_results control returns no entries."""
    db = ldb.Ldb(self.url(), flags=self.flags())
    found = db.search(controls=["paged_results:0:5"])
    self.assertEqual(len(found), 0)
def test_search_attrs(self):
    """A subtree search from an empty Dn asking for "dc" finds nothing."""
    db = ldb.Ldb(self.url(), flags=self.flags())
    root = ldb.Dn(db, "")
    found = db.search(root, ldb.SCOPE_SUBTREE, "(dc=*)", ["dc"])
    self.assertEqual(len(found), 0)
def test_search_string_dn(self):
    """search() accepts the base DN as a plain string; nothing is found."""
    db = ldb.Ldb(self.url(), flags=self.flags())
    found = db.search("", ldb.SCOPE_SUBTREE, "(dc=*)", ["dc"])
    self.assertEqual(len(found), 0)
def test_search_attr_string(self):
    """search() raises TypeError when attrs is a bare str or bytes value."""
    db = ldb.Ldb(self.url(), flags=self.flags())
    for bad_attrs in ("dc", b"dc"):
        with self.assertRaises(TypeError):
            db.search(attrs=bad_attrs)
def test_opaque(self):
    """A stored opaque value can be fetched; an unknown name gives None."""
    db = ldb.Ldb(self.url(), flags=self.flags())
    db.set_opaque("my_opaque", db)
    stored = db.get_opaque("my_opaque")
    self.assertIsNotNone(stored)
    missing = db.get_opaque("unknown")
    self.assertEqual(None, missing)
153 def test_search_scope_base_empty_db(self):
154 l = ldb.Ldb(self.url(), flags=self.flags())
155 self.assertEqual(len(l.search(ldb.Dn(l, "dc=foo1"),
def test_search_scope_onelevel_empty_db(self):
    """A one-level search below dc=foo1 returns no entries."""
    db = ldb.Ldb(self.url(), flags=self.flags())
    base = ldb.Dn(db, "dc=foo1")
    found = db.search(base, ldb.SCOPE_ONELEVEL)
    self.assertEqual(len(found), 0)
def test_delete(self):
    """Deleting dc=foo2, which this test never adds, raises LdbError."""
    db = ldb.Ldb(self.url(), flags=self.flags())
    with self.assertRaises(ldb.LdbError):
        db.delete(ldb.Dn(db, "dc=foo2"))
167 def test_delete_w_unhandled_ctrl(self):
168 l = ldb.Ldb(self.url(), flags=self.flags())
170 m.dn = ldb.Dn(l, "dc=foo1")
172 m["objectUUID"] = b"0123456789abcdef"
174 self.assertRaises(ldb.LdbError, lambda: l.delete(m.dn, ["search_options:1:2"]))
177 def test_contains(self):
179 l = ldb.Ldb(name, flags=self.flags())
180 self.assertFalse(ldb.Dn(l, "dc=foo3") in l)
181 l = ldb.Ldb(name, flags=self.flags())
183 m.dn = ldb.Dn(l, "dc=foo3")
185 m["objectUUID"] = b"0123456789abcdef"
188 self.assertTrue(ldb.Dn(l, "dc=foo3") in l)
189 self.assertFalse(ldb.Dn(l, "dc=foo4") in l)
def test_get_config_basedn(self):
    """get_config_basedn() returns None for the test database."""
    db = ldb.Ldb(self.url(), flags=self.flags())
    config_dn = db.get_config_basedn()
    self.assertEqual(None, config_dn)
def test_get_root_basedn(self):
    """get_root_basedn() returns None for the test database."""
    db = ldb.Ldb(self.url(), flags=self.flags())
    root_dn = db.get_root_basedn()
    self.assertEqual(None, root_dn)
def test_get_schema_basedn(self):
    """get_schema_basedn() returns None for the test database."""
    db = ldb.Ldb(self.url(), flags=self.flags())
    schema_dn = db.get_schema_basedn()
    self.assertEqual(None, schema_dn)
def test_get_default_basedn(self):
    """get_default_basedn() returns None for the test database."""
    db = ldb.Ldb(self.url(), flags=self.flags())
    default_dn = db.get_default_basedn()
    self.assertEqual(None, default_dn)
210 l = ldb.Ldb(self.url(), flags=self.flags())
212 m.dn = ldb.Dn(l, "dc=foo4")
214 m["objectUUID"] = b"0123456789abcdef"
215 self.assertEqual(len(l.search()), 0)
218 self.assertEqual(len(l.search()), 1)
220 l.delete(ldb.Dn(l, "dc=foo4"))
222 def test_search_iterator(self):
223 l = ldb.Ldb(self.url(), flags=self.flags())
224 s = l.search_iterator()
230 except RuntimeError as re:
235 except RuntimeError as re:
240 except RuntimeError as re:
243 s = l.search_iterator()
246 self.assertTrue(isinstance(me, ldb.Message))
249 self.assertEqual(len(r), 0)
250 self.assertEqual(count, 0)
253 m1.dn = ldb.Dn(l, "dc=foo4")
255 m1["objectUUID"] = b"0123456789abcdef"
258 s = l.search_iterator()
261 self.assertTrue(isinstance(me, ldb.Message))
265 self.assertEqual(len(r), 0)
266 self.assertEqual(len(msgs), 1)
267 self.assertEqual(msgs[0].dn, m1.dn)
270 m2.dn = ldb.Dn(l, "dc=foo5")
272 m2["objectUUID"] = b"0123456789abcdee"
275 s = l.search_iterator()
278 self.assertTrue(isinstance(me, ldb.Message))
282 self.assertEqual(len(r), 0)
283 self.assertEqual(len(msgs), 2)
284 if msgs[0].dn == m1.dn:
285 self.assertEqual(msgs[0].dn, m1.dn)
286 self.assertEqual(msgs[1].dn, m2.dn)
288 self.assertEqual(msgs[0].dn, m2.dn)
289 self.assertEqual(msgs[1].dn, m1.dn)
291 s = l.search_iterator()
294 self.assertTrue(isinstance(me, ldb.Message))
301 except RuntimeError as re:
304 self.assertTrue(isinstance(me, ldb.Message))
312 self.assertEqual(len(r), 0)
313 self.assertEqual(len(msgs), 2)
314 if msgs[0].dn == m1.dn:
315 self.assertEqual(msgs[0].dn, m1.dn)
316 self.assertEqual(msgs[1].dn, m2.dn)
318 self.assertEqual(msgs[0].dn, m2.dn)
319 self.assertEqual(msgs[1].dn, m1.dn)
321 l.delete(ldb.Dn(l, "dc=foo4"))
322 l.delete(ldb.Dn(l, "dc=foo5"))
324 def test_add_text(self):
325 l = ldb.Ldb(self.url(), flags=self.flags())
327 m.dn = ldb.Dn(l, "dc=foo4")
329 m["objectUUID"] = b"0123456789abcdef"
330 self.assertEqual(len(l.search()), 0)
333 self.assertEqual(len(l.search()), 1)
335 l.delete(ldb.Dn(l, "dc=foo4"))
337 def test_add_w_unhandled_ctrl(self):
338 l = ldb.Ldb(self.url(), flags=self.flags())
340 m.dn = ldb.Dn(l, "dc=foo4")
342 self.assertEqual(len(l.search()), 0)
343 self.assertRaises(ldb.LdbError, lambda: l.add(m,["search_options:1:2"]))
345 def test_add_dict(self):
346 l = ldb.Ldb(self.url(), flags=self.flags())
347 m = {"dn": ldb.Dn(l, "dc=foo5"),
349 "objectUUID": b"0123456789abcdef"}
350 self.assertEqual(len(l.search()), 0)
353 self.assertEqual(len(l.search()), 1)
355 l.delete(ldb.Dn(l, "dc=foo5"))
357 def test_add_dict_text(self):
358 l = ldb.Ldb(self.url(), flags=self.flags())
359 m = {"dn": ldb.Dn(l, "dc=foo5"),
361 "objectUUID": b"0123456789abcdef"}
362 self.assertEqual(len(l.search()), 0)
365 self.assertEqual(len(l.search()), 1)
367 l.delete(ldb.Dn(l, "dc=foo5"))
369 def test_add_dict_string_dn(self):
370 l = ldb.Ldb(self.url(), flags=self.flags())
371 m = {"dn": "dc=foo6", "bla": b"bla",
372 "objectUUID": b"0123456789abcdef"}
373 self.assertEqual(len(l.search()), 0)
376 self.assertEqual(len(l.search()), 1)
378 l.delete(ldb.Dn(l, "dc=foo6"))
380 def test_add_dict_bytes_dn(self):
381 l = ldb.Ldb(self.url(), flags=self.flags())
382 m = {"dn": b"dc=foo6", "bla": b"bla",
383 "objectUUID": b"0123456789abcdef"}
384 self.assertEqual(len(l.search()), 0)
387 self.assertEqual(len(l.search()), 1)
389 l.delete(ldb.Dn(l, "dc=foo6"))
391 def test_rename(self):
392 l = ldb.Ldb(self.url(), flags=self.flags())
394 m.dn = ldb.Dn(l, "dc=foo7")
396 m["objectUUID"] = b"0123456789abcdef"
397 self.assertEqual(len(l.search()), 0)
400 l.rename(ldb.Dn(l, "dc=foo7"), ldb.Dn(l, "dc=bar"))
401 self.assertEqual(len(l.search()), 1)
403 l.delete(ldb.Dn(l, "dc=bar"))
405 def test_rename_string_dns(self):
406 l = ldb.Ldb(self.url(), flags=self.flags())
408 m.dn = ldb.Dn(l, "dc=foo8")
410 m["objectUUID"] = b"0123456789abcdef"
411 self.assertEqual(len(l.search()), 0)
413 self.assertEqual(len(l.search()), 1)
415 l.rename("dc=foo8", "dc=bar")
416 self.assertEqual(len(l.search()), 1)
418 l.delete(ldb.Dn(l, "dc=bar"))
420 def test_empty_dn(self):
421 l = ldb.Ldb(self.url(), flags=self.flags())
422 self.assertEqual(0, len(l.search()))
424 m.dn = ldb.Dn(l, "dc=empty")
425 m["objectUUID"] = b"0123456789abcdef"
428 self.assertEqual(1, len(rm))
429 self.assertEqual(set(["dn", "distinguishedName", "objectUUID"]),
433 self.assertEqual(1, len(rm))
434 self.assertEqual(set(["dn", "distinguishedName", "objectUUID"]),
436 rm = l.search(m.dn, attrs=["blah"])
437 self.assertEqual(1, len(rm))
438 self.assertEqual(0, len(rm[0]))
440 def test_modify_delete(self):
441 l = ldb.Ldb(self.url(), flags=self.flags())
443 m.dn = ldb.Dn(l, "dc=modifydelete")
445 m["objectUUID"] = b"0123456789abcdef"
447 rm = l.search(m.dn)[0]
448 self.assertEqual([b"1234"], list(rm["bla"]))
451 m.dn = ldb.Dn(l, "dc=modifydelete")
452 m["bla"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE, "bla")
453 self.assertEqual(ldb.FLAG_MOD_DELETE, m["bla"].flags())
456 self.assertEqual(1, len(rm))
457 self.assertEqual(set(["dn", "distinguishedName", "objectUUID"]),
459 rm = l.search(m.dn, attrs=["bla"])
460 self.assertEqual(1, len(rm))
461 self.assertEqual(0, len(rm[0]))
463 l.delete(ldb.Dn(l, "dc=modifydelete"))
465 def test_modify_delete_text(self):
466 l = ldb.Ldb(self.url(), flags=self.flags())
468 m.dn = ldb.Dn(l, "dc=modifydelete")
469 m.text["bla"] = ["1234"]
470 m["objectUUID"] = b"0123456789abcdef"
472 rm = l.search(m.dn)[0]
473 self.assertEqual(["1234"], list(rm.text["bla"]))
476 m.dn = ldb.Dn(l, "dc=modifydelete")
477 m["bla"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE, "bla")
478 self.assertEqual(ldb.FLAG_MOD_DELETE, m["bla"].flags())
481 self.assertEqual(1, len(rm))
482 self.assertEqual(set(["dn", "distinguishedName", "objectUUID"]),
484 rm = l.search(m.dn, attrs=["bla"])
485 self.assertEqual(1, len(rm))
486 self.assertEqual(0, len(rm[0]))
488 l.delete(ldb.Dn(l, "dc=modifydelete"))
490 def test_modify_add(self):
491 l = ldb.Ldb(self.url(), flags=self.flags())
493 m.dn = ldb.Dn(l, "dc=add")
495 m["objectUUID"] = b"0123456789abcdef"
499 m.dn = ldb.Dn(l, "dc=add")
500 m["bla"] = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
501 self.assertEqual(ldb.FLAG_MOD_ADD, m["bla"].flags())
503 rm = l.search(m.dn)[0]
504 self.assertEqual(3, len(rm))
505 self.assertEqual([b"1234", b"456"], list(rm["bla"]))
507 l.delete(ldb.Dn(l, "dc=add"))
509 def test_modify_add_text(self):
510 l = ldb.Ldb(self.url(), flags=self.flags())
512 m.dn = ldb.Dn(l, "dc=add")
513 m.text["bla"] = ["1234"]
514 m["objectUUID"] = b"0123456789abcdef"
518 m.dn = ldb.Dn(l, "dc=add")
519 m["bla"] = ldb.MessageElement(["456"], ldb.FLAG_MOD_ADD, "bla")
520 self.assertEqual(ldb.FLAG_MOD_ADD, m["bla"].flags())
522 rm = l.search(m.dn)[0]
523 self.assertEqual(3, len(rm))
524 self.assertEqual(["1234", "456"], list(rm.text["bla"]))
526 l.delete(ldb.Dn(l, "dc=add"))
528 def test_modify_replace(self):
529 l = ldb.Ldb(self.url(), flags=self.flags())
531 m.dn = ldb.Dn(l, "dc=modify2")
532 m["bla"] = [b"1234", b"456"]
533 m["objectUUID"] = b"0123456789abcdef"
537 m.dn = ldb.Dn(l, "dc=modify2")
538 m["bla"] = ldb.MessageElement([b"789"], ldb.FLAG_MOD_REPLACE, "bla")
539 self.assertEqual(ldb.FLAG_MOD_REPLACE, m["bla"].flags())
541 rm = l.search(m.dn)[0]
542 self.assertEqual(3, len(rm))
543 self.assertEqual([b"789"], list(rm["bla"]))
544 rm = l.search(m.dn, attrs=["bla"])[0]
545 self.assertEqual(1, len(rm))
547 l.delete(ldb.Dn(l, "dc=modify2"))
549 def test_modify_replace_text(self):
550 l = ldb.Ldb(self.url(), flags=self.flags())
552 m.dn = ldb.Dn(l, "dc=modify2")
553 m.text["bla"] = ["1234", "456"]
554 m["objectUUID"] = b"0123456789abcdef"
558 m.dn = ldb.Dn(l, "dc=modify2")
559 m["bla"] = ldb.MessageElement(["789"], ldb.FLAG_MOD_REPLACE, "bla")
560 self.assertEqual(ldb.FLAG_MOD_REPLACE, m["bla"].flags())
562 rm = l.search(m.dn)[0]
563 self.assertEqual(3, len(rm))
564 self.assertEqual(["789"], list(rm.text["bla"]))
565 rm = l.search(m.dn, attrs=["bla"])[0]
566 self.assertEqual(1, len(rm))
568 l.delete(ldb.Dn(l, "dc=modify2"))
570 def test_modify_flags_change(self):
571 l = ldb.Ldb(self.url(), flags=self.flags())
573 m.dn = ldb.Dn(l, "dc=add")
575 m["objectUUID"] = b"0123456789abcdef"
579 m.dn = ldb.Dn(l, "dc=add")
580 m["bla"] = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
581 self.assertEqual(ldb.FLAG_MOD_ADD, m["bla"].flags())
583 rm = l.search(m.dn)[0]
584 self.assertEqual(3, len(rm))
585 self.assertEqual([b"1234", b"456"], list(rm["bla"]))
587 # Now create another modify, but switch the flags before we do it
588 m["bla"] = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
589 m["bla"].set_flags(ldb.FLAG_MOD_DELETE)
591 rm = l.search(m.dn, attrs=["bla"])[0]
592 self.assertEqual(1, len(rm))
593 self.assertEqual([b"1234"], list(rm["bla"]))
595 l.delete(ldb.Dn(l, "dc=add"))
597 def test_modify_flags_change_text(self):
598 l = ldb.Ldb(self.url(), flags=self.flags())
600 m.dn = ldb.Dn(l, "dc=add")
601 m.text["bla"] = ["1234"]
602 m["objectUUID"] = b"0123456789abcdef"
606 m.dn = ldb.Dn(l, "dc=add")
607 m["bla"] = ldb.MessageElement(["456"], ldb.FLAG_MOD_ADD, "bla")
608 self.assertEqual(ldb.FLAG_MOD_ADD, m["bla"].flags())
610 rm = l.search(m.dn)[0]
611 self.assertEqual(3, len(rm))
612 self.assertEqual(["1234", "456"], list(rm.text["bla"]))
614 # Now create another modify, but switch the flags before we do it
615 m["bla"] = ldb.MessageElement(["456"], ldb.FLAG_MOD_ADD, "bla")
616 m["bla"].set_flags(ldb.FLAG_MOD_DELETE)
618 rm = l.search(m.dn, attrs=["bla"])[0]
619 self.assertEqual(1, len(rm))
620 self.assertEqual(["1234"], list(rm.text["bla"]))
622 l.delete(ldb.Dn(l, "dc=add"))
624 def test_transaction_commit(self):
625 l = ldb.Ldb(self.url(), flags=self.flags())
626 l.transaction_start()
627 m = ldb.Message(ldb.Dn(l, "dc=foo9"))
629 m["objectUUID"] = b"0123456789abcdef"
631 l.transaction_commit()
634 def test_transaction_cancel(self):
635 l = ldb.Ldb(self.url(), flags=self.flags())
636 l.transaction_start()
637 m = ldb.Message(ldb.Dn(l, "dc=foo10"))
639 m["objectUUID"] = b"0123456789abcdee"
641 l.transaction_cancel()
642 self.assertEqual(0, len(l.search(ldb.Dn(l, "dc=foo10"))))
644 def test_set_debug(self):
645 def my_report_fn(level, text):
647 l = ldb.Ldb(self.url(), flags=self.flags())
648 l.set_debug(my_report_fn)
650 def test_zero_byte_string(self):
651 """Testing we do not get trapped in the \0 byte in a property string."""
652 l = ldb.Ldb(self.url(), flags=self.flags())
655 "objectclass" : b"user",
656 "cN" : b"LDAPtestUSER",
657 "givenname" : b"ldap",
658 "displayname" : b"foo\0bar",
659 "objectUUID" : b"0123456789abcdef"
661 res = l.search(expression="(dn=dc=somedn)")
662 self.assertEqual(b"foo\0bar", res[0]["displayname"][0])
def test_no_crash_broken_expr(self):
    """A filter lacking its outer parentheses raises LdbError."""
    db = ldb.Ldb(self.url(), flags=self.flags())
    with self.assertRaises(ldb.LdbError):
        db.search("", ldb.SCOPE_SUBTREE, "&(dc=*)(dn=*)", ["dc"])
668 # Run the SimpleLdb tests against an lmdb backend
669 class SimpleLdbLmdb(SimpleLdb):
672 self.prefix = MDB_PREFIX
673 super(SimpleLdbLmdb, self).setUp()
676 super(SimpleLdbLmdb, self).tearDown()
678 class SearchTests(LdbBaseTest):
680 shutil.rmtree(self.testdir)
681 super(SearchTests, self).tearDown()
683 # Ensure the LDB is closed now, so we close the FD
688 super(SearchTests, self).setUp()
689 self.testdir = tempdir()
690 self.filename = os.path.join(self.testdir, "search_test.ldb")
691 self.l = ldb.Ldb(self.url(),
693 options=["modules:rdn_name"])
695 self.l.add(self.index)
696 except AttributeError:
699 self.l.add({"dn": "@ATTRIBUTES",
700 "DC": "CASE_INSENSITIVE"})
702 # Note that we can't use the name objectGUID here, as we
703 # want to stay clear of the objectGUID handler in LDB and
704 # instead use just the 16 bytes raw, which we just keep
705 # to printable chars here for ease of handling.
707 self.l.add({"dn": "DC=SAMBA,DC=ORG",
708 "name": b"samba.org",
709 "objectUUID": b"0123456789abcdef"})
710 self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG",
713 "objectUUID": b"0123456789abcde1"})
714 self.l.add({"dn": "OU=USERS,DC=SAMBA,DC=ORG",
717 "objectUUID": b"0123456789abcde2"})
718 self.l.add({"dn": "OU=OU1,DC=SAMBA,DC=ORG",
721 "objectUUID": b"0123456789abcde3"})
722 self.l.add({"dn": "OU=OU2,DC=SAMBA,DC=ORG",
725 "objectUUID": b"0123456789abcde4"})
726 self.l.add({"dn": "OU=OU3,DC=SAMBA,DC=ORG",
729 "objectUUID": b"0123456789abcde5"})
730 self.l.add({"dn": "OU=OU4,DC=SAMBA,DC=ORG",
733 "objectUUID": b"0123456789abcde6"})
734 self.l.add({"dn": "OU=OU5,DC=SAMBA,DC=ORG",
737 "objectUUID": b"0123456789abcde7"})
738 self.l.add({"dn": "OU=OU6,DC=SAMBA,DC=ORG",
741 "objectUUID": b"0123456789abcde8"})
742 self.l.add({"dn": "OU=OU7,DC=SAMBA,DC=ORG",
745 "objectUUID": b"0123456789abcde9"})
746 self.l.add({"dn": "OU=OU8,DC=SAMBA,DC=ORG",
749 "objectUUID": b"0123456789abcde0"})
750 self.l.add({"dn": "OU=OU9,DC=SAMBA,DC=ORG",
753 "objectUUID": b"0123456789abcdea"})
754 self.l.add({"dn": "OU=OU10,DC=SAMBA,DC=ORG",
757 "objectUUID": b"0123456789abcdeb"})
758 self.l.add({"dn": "OU=OU11,DC=SAMBA,DC=ORG",
761 "objectUUID": b"0123456789abcdec"})
762 self.l.add({"dn": "OU=OU12,DC=SAMBA,DC=ORG",
765 "objectUUID": b"0123456789abcded"})
766 self.l.add({"dn": "OU=OU13,DC=SAMBA,DC=ORG",
769 "objectUUID": b"0123456789abcdee"})
770 self.l.add({"dn": "OU=OU14,DC=SAMBA,DC=ORG",
773 "objectUUID": b"0123456789abcd01"})
774 self.l.add({"dn": "OU=OU15,DC=SAMBA,DC=ORG",
777 "objectUUID": b"0123456789abcd02"})
778 self.l.add({"dn": "OU=OU16,DC=SAMBA,DC=ORG",
781 "objectUUID": b"0123456789abcd03"})
782 self.l.add({"dn": "OU=OU17,DC=SAMBA,DC=ORG",
785 "objectUUID": b"0123456789abcd04"})
786 self.l.add({"dn": "OU=OU18,DC=SAMBA,DC=ORG",
789 "objectUUID": b"0123456789abcd05"})
790 self.l.add({"dn": "OU=OU19,DC=SAMBA,DC=ORG",
793 "objectUUID": b"0123456789abcd06"})
794 self.l.add({"dn": "OU=OU20,DC=SAMBA,DC=ORG",
797 "objectUUID": b"0123456789abcd07"})
798 self.l.add({"dn": "OU=OU21,DC=SAMBA,DC=ORG",
801 "objectUUID": b"0123456789abcd08"})
802 self.l.add({"dn": "OU=OU22,DC=SAMBA,DC=ORG",
805 "objectUUID": b"0123456789abcd09"})
808 """Testing a search"""
810 res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
811 scope=ldb.SCOPE_BASE)
812 self.assertEqual(len(res11), 1)
def test_base_lower(self):
    """A base search finds OU11 when the DC components are lower case."""
    base_dn = "OU=OU11,DC=samba,DC=org"
    matches = self.l.search(base=base_dn, scope=ldb.SCOPE_BASE)
    self.assertEqual(len(matches), 1)
def test_base_or(self):
    """Base search on OU11 with (ou=ou11) OR (ou=ou12) yields one entry."""
    base_dn = "OU=OU11,DC=SAMBA,DC=ORG"
    filter_expr = "(|(ou=ou11)(ou=ou12))"
    matches = self.l.search(base=base_dn,
                            scope=ldb.SCOPE_BASE,
                            expression=filter_expr)
    self.assertEqual(len(matches), 1)
def test_base_or2(self):
    """Base search on OU11 with (x=y) OR (y=b) yields one entry."""
    base_dn = "OU=OU11,DC=SAMBA,DC=ORG"
    filter_expr = "(|(x=y)(y=b))"
    matches = self.l.search(base=base_dn,
                            scope=ldb.SCOPE_BASE,
                            expression=filter_expr)
    self.assertEqual(len(matches), 1)
def test_base_and(self):
    """Base search on OU11 with (ou=ou11) AND (ou=ou12) yields nothing."""
    base_dn = "OU=OU11,DC=SAMBA,DC=ORG"
    filter_expr = "(&(ou=ou11)(ou=ou12))"
    matches = self.l.search(base=base_dn,
                            scope=ldb.SCOPE_BASE,
                            expression=filter_expr)
    self.assertEqual(len(matches), 0)
def test_base_and2(self):
    """Base search on OU11 with (x=y) AND (y=a) yields one entry."""
    base_dn = "OU=OU11,DC=SAMBA,DC=ORG"
    filter_expr = "(&(x=y)(y=a))"
    matches = self.l.search(base=base_dn,
                            scope=ldb.SCOPE_BASE,
                            expression=filter_expr)
    self.assertEqual(len(matches), 1)
def test_base_false(self):
    """Base search on OU11 with (ou=ou13) OR (ou=ou12) yields nothing."""
    base_dn = "OU=OU11,DC=SAMBA,DC=ORG"
    filter_expr = "(|(ou=ou13)(ou=ou12))"
    matches = self.l.search(base=base_dn,
                            scope=ldb.SCOPE_BASE,
                            expression=filter_expr)
    self.assertEqual(len(matches), 0)
def test_check_base_false(self):
    """Base search on the existing OU11 with a non-matching OR is empty.

    The query is the same one test_base_false issues.
    """
    base_dn = "OU=OU11,DC=SAMBA,DC=ORG"
    filter_expr = "(|(ou=ou13)(ou=ou12))"
    matches = self.l.search(base=base_dn,
                            scope=ldb.SCOPE_BASE,
                            expression=filter_expr)
    self.assertEqual(len(matches), 0)
868 def test_check_base_error(self):
869 """Testing a search"""
870 checkbaseonsearch = {"dn": "@OPTIONS",
871 "checkBaseOnSearch": b"TRUE"}
873 self.l.add(checkbaseonsearch)
874 except ldb.LdbError as err:
876 self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)
877 m = ldb.Message.from_dict(self.l,
882 res11 = self.l.search(base="OU=OU11x,DC=SAMBA,DC=ORG",
883 scope=ldb.SCOPE_BASE,
884 expression="(|(ou=ou13)(ou=ou12))")
885 self.fail("Should have failed on missing base")
886 except ldb.LdbError as err:
888 self.assertEqual(enum, ldb.ERR_NO_SUCH_OBJECT)
def test_subtree_and(self):
    """Subtree search with (ou=ou11) AND (ou=ou12) yields nothing."""
    filter_expr = "(&(ou=ou11)(ou=ou12))"
    matches = self.l.search(base="DC=SAMBA,DC=ORG",
                            scope=ldb.SCOPE_SUBTREE,
                            expression=filter_expr)
    self.assertEqual(len(matches), 0)
def test_subtree_and2(self):
    """Subtree search with (x=y) AND (y=b OR y=c) yields one entry."""
    filter_expr = "(&(x=y)(|(y=b)(y=c)))"
    matches = self.l.search(base="DC=SAMBA,DC=ORG",
                            scope=ldb.SCOPE_SUBTREE,
                            expression=filter_expr)
    self.assertEqual(len(matches), 1)
def test_subtree_and2_lower(self):
    """As test_subtree_and2's query, but with a lower-case base DN."""
    filter_expr = "(&(x=y)(|(y=b)(y=c)))"
    matches = self.l.search(base="DC=samba,DC=org",
                            scope=ldb.SCOPE_SUBTREE,
                            expression=filter_expr)
    self.assertEqual(len(matches), 1)
def test_subtree_or(self):
    """Subtree search with (ou=ou11) OR (ou=ou12) yields two entries."""
    filter_expr = "(|(ou=ou11)(ou=ou12))"
    matches = self.l.search(base="DC=SAMBA,DC=ORG",
                            scope=ldb.SCOPE_SUBTREE,
                            expression=filter_expr)
    self.assertEqual(len(matches), 2)
def test_subtree_or2(self):
    """Subtree search with (x=y) OR (y=b) yields twenty entries."""
    filter_expr = "(|(x=y)(y=b))"
    matches = self.l.search(base="DC=SAMBA,DC=ORG",
                            scope=ldb.SCOPE_SUBTREE,
                            expression=filter_expr)
    self.assertEqual(len(matches), 20)
def test_subtree_or3(self):
    """Subtree search with (x=y) OR (y=b) OR (y=c) yields 22 entries."""
    filter_expr = "(|(x=y)(y=b)(y=c))"
    matches = self.l.search(base="DC=SAMBA,DC=ORG",
                            scope=ldb.SCOPE_SUBTREE,
                            expression=filter_expr)
    self.assertEqual(len(matches), 22)
def test_one_and(self):
    """One-level search with (ou=ou11) AND (ou=ou12) yields nothing."""
    filter_expr = "(&(ou=ou11)(ou=ou12))"
    matches = self.l.search(base="DC=SAMBA,DC=ORG",
                            scope=ldb.SCOPE_ONELEVEL,
                            expression=filter_expr)
    self.assertEqual(len(matches), 0)
def test_one_and2(self):
    """One-level search with (x=y) AND (y=b) yields one entry."""
    filter_expr = "(&(x=y)(y=b))"
    matches = self.l.search(base="DC=SAMBA,DC=ORG",
                            scope=ldb.SCOPE_ONELEVEL,
                            expression=filter_expr)
    self.assertEqual(len(matches), 1)
def test_one_or(self):
    """One-level search with (ou=ou11) OR (ou=ou12) yields two entries."""
    filter_expr = "(|(ou=ou11)(ou=ou12))"
    matches = self.l.search(base="DC=SAMBA,DC=ORG",
                            scope=ldb.SCOPE_ONELEVEL,
                            expression=filter_expr)
    self.assertEqual(len(matches), 2)
def test_one_or2(self):
    """One-level search with (x=y) OR (y=b) yields twenty entries."""
    filter_expr = "(|(x=y)(y=b))"
    matches = self.l.search(base="DC=SAMBA,DC=ORG",
                            scope=ldb.SCOPE_ONELEVEL,
                            expression=filter_expr)
    self.assertEqual(len(matches), 20)
def test_one_or2_lower(self):
    """As test_one_or2's query, but with a lower-case base DN."""
    filter_expr = "(|(x=y)(y=b))"
    matches = self.l.search(base="DC=samba,DC=org",
                            scope=ldb.SCOPE_ONELEVEL,
                            expression=filter_expr)
    self.assertEqual(len(matches), 20)
def test_subtree_and_or(self):
    """Subtree AND with the OR clause first, then x=x and y=c: no match."""
    filter_expr = "(&(|(x=z)(y=b))(x=x)(y=c))"
    matches = self.l.search(base="DC=SAMBA,DC=ORG",
                            scope=ldb.SCOPE_SUBTREE,
                            expression=filter_expr)
    self.assertEqual(len(matches), 0)
def test_subtree_and_or2(self):
    """Subtree AND of x=x and y=c with the OR clause last: no match."""
    filter_expr = "(&(x=x)(y=c)(|(x=z)(y=b)))"
    matches = self.l.search(base="DC=SAMBA,DC=ORG",
                            scope=ldb.SCOPE_SUBTREE,
                            expression=filter_expr)
    self.assertEqual(len(matches), 0)
def test_subtree_and_or3(self):
    """Subtree AND of an ou OR clause and an x/y OR clause: two entries."""
    filter_expr = "(&(|(ou=ou11)(ou=ou10))(|(x=y)(y=b)(y=c)))"
    matches = self.l.search(base="DC=SAMBA,DC=ORG",
                            scope=ldb.SCOPE_SUBTREE,
                            expression=filter_expr)
    self.assertEqual(len(matches), 2)
def test_subtree_and_or4(self):
    """Subtree AND of an x/y OR clause and an ou OR clause: two entries."""
    filter_expr = "(&(|(x=y)(y=b)(y=c))(|(ou=ou11)(ou=ou10)))"
    matches = self.l.search(base="DC=SAMBA,DC=ORG",
                            scope=ldb.SCOPE_SUBTREE,
                            expression=filter_expr)
    self.assertEqual(len(matches), 2)
def test_subtree_and_or5(self):
    """Subtree AND of an x/y OR clause and (ou=ou11): one entry."""
    filter_expr = "(&(|(x=y)(y=b)(y=c))(ou=ou11))"
    matches = self.l.search(base="DC=SAMBA,DC=ORG",
                            scope=ldb.SCOPE_SUBTREE,
                            expression=filter_expr)
    self.assertEqual(len(matches), 1)
def test_subtree_or_and(self):
    """Subtree OR of x=x, y=c and (x=z AND y=b) yields ten entries."""
    filter_expr = "(|(x=x)(y=c)(&(x=z)(y=b)))"
    matches = self.l.search(base="DC=SAMBA,DC=ORG",
                            scope=ldb.SCOPE_SUBTREE,
                            expression=filter_expr)
    self.assertEqual(len(matches), 10)
def test_subtree_large_and_unique(self):
    """Subtree search with (ou=ou10) AND (y=a) yields one entry."""
    filter_expr = "(&(ou=ou10)(y=a))"
    matches = self.l.search(base="DC=SAMBA,DC=ORG",
                            scope=ldb.SCOPE_SUBTREE,
                            expression=filter_expr)
    self.assertEqual(len(matches), 1)
def test_subtree_and_none(self):
    """Subtree search with (ou=ouX) AND (y=a) yields nothing."""
    filter_expr = "(&(ou=ouX)(y=a))"
    matches = self.l.search(base="DC=SAMBA,DC=ORG",
                            scope=ldb.SCOPE_SUBTREE,
                            expression=filter_expr)
    self.assertEqual(len(matches), 0)
def test_subtree_and_idx_record(self):
    """A subtree search filtering on @IDXDN must not return any entry."""
    filter_expr = "(@IDXDN=DC=SAMBA,DC=ORG)"
    matches = self.l.search(base="DC=SAMBA,DC=ORG",
                            scope=ldb.SCOPE_SUBTREE,
                            expression=filter_expr)
    self.assertEqual(len(matches), 0)
def test_subtree_and_idxone_record(self):
    """A subtree search filtering on @IDXONE must not return any entry."""
    filter_expr = "(@IDXONE=DC=SAMBA,DC=ORG)"
    matches = self.l.search(base="DC=SAMBA,DC=ORG",
                            scope=ldb.SCOPE_SUBTREE,
                            expression=filter_expr)
    self.assertEqual(len(matches), 0)
1058 def test_dn_filter_one(self):
1059 """Testing that a dn= filter succeeds
1060 (or fails with disallowDNFilter
1061 set and IDXGUID or (IDX and not IDXONE) mode)
1062 when the scope is SCOPE_ONELEVEL.
1064 This should be made more consistent, but for now lock in
1069 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1070 scope=ldb.SCOPE_ONELEVEL,
1071 expression="(dn=OU=OU1,DC=SAMBA,DC=ORG)")
1072 if hasattr(self, 'disallowDNFilter') and \
1073 hasattr(self, 'IDX') and \
1074 (hasattr(self, 'IDXGUID') or \
1075 ((hasattr(self, 'IDXONE') == False and hasattr(self, 'IDX')))):
1076 self.assertEqual(len(res11), 0)
1078 self.assertEqual(len(res11), 1)
1080 def test_dn_filter_subtree(self):
1081 """Testing that a dn= filter succeeds
1082 (or fails with disallowDNFilter set)
1083 when the scope is SCOPE_SUBTREE"""
1085 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1086 scope=ldb.SCOPE_SUBTREE,
1087 expression="(dn=OU=OU1,DC=SAMBA,DC=ORG)")
1088 if hasattr(self, 'disallowDNFilter') \
1089 and hasattr(self, 'IDX'):
1090 self.assertEqual(len(res11), 0)
1092 self.assertEqual(len(res11), 1)
def test_dn_filter_base(self):
    """Lock in that a dn= filter (incorrectly) matches under SCOPE_BASE."""
    target_dn = "OU=OU1,DC=SAMBA,DC=ORG"
    matches = self.l.search(base=target_dn,
                            scope=ldb.SCOPE_BASE,
                            expression="(dn=OU=OU1,DC=SAMBA,DC=ORG)")

    # This match should eventually go away, but fixing it is not trivial.
    self.assertEqual(len(matches), 1)
1106 # Run the search tests against an lmdb backend
1107 class SearchTestsLmdb(SearchTests):
1110 self.prefix = MDB_PREFIX
1111 super(SearchTestsLmdb, self).setUp()
1114 super(SearchTestsLmdb, self).tearDown()
1117 class IndexedSearchTests(SearchTests):
1118 """Test searches using the index, to ensure the index doesn't
1121 super(IndexedSearchTests, self).setUp()
1122 self.l.add({"dn": "@INDEXLIST",
1123 "@IDXATTR": [b"x", b"y", b"ou"]})
1126 class IndexedSearchDnFilterTests(SearchTests):
1127 """Test searches using the index, to ensure the index doesn't
1130 super(IndexedSearchDnFilterTests, self).setUp()
1131 self.l.add({"dn": "@OPTIONS",
1132 "disallowDNFilter": "TRUE"})
1133 self.disallowDNFilter = True
1135 self.l.add({"dn": "@INDEXLIST",
1136 "@IDXATTR": [b"x", b"y", b"ou"]})
1139 class IndexedAndOneLevelSearchTests(SearchTests):
1140 """Test searches using the index including @IDXONE, to ensure
1141 the index doesn't break things"""
1143 super(IndexedAndOneLevelSearchTests, self).setUp()
1144 self.l.add({"dn": "@INDEXLIST",
1145 "@IDXATTR": [b"x", b"y", b"ou"],
1149 class IndexedAndOneLevelDNFilterSearchTests(SearchTests):
1150 """Test searches using the index including @IDXONE, to ensure
1151 the index doesn't break things"""
1153 super(IndexedAndOneLevelDNFilterSearchTests, self).setUp()
1154 self.l.add({"dn": "@OPTIONS",
1155 "disallowDNFilter": "TRUE"})
1156 self.disallowDNFilter = True
1158 self.l.add({"dn": "@INDEXLIST",
1159 "@IDXATTR": [b"x", b"y", b"ou"],
1164 class GUIDIndexedSearchTests(SearchTests):
1165 """Test searches using the index, to ensure the index doesn't
1168 self.index = {"dn": "@INDEXLIST",
1169 "@IDXATTR": [b"x", b"y", b"ou"],
1170 "@IDXGUID": [b"objectUUID"],
1171 "@IDX_DN_GUID": [b"GUID"]}
1172 super(GUIDIndexedSearchTests, self).setUp()
1178 class GUIDIndexedDNFilterSearchTests(SearchTests):
1179 """Test searches using the index, to ensure the index doesn't
1182 self.index = {"dn": "@INDEXLIST",
1183 "@IDXATTR": [b"x", b"y", b"ou"],
1184 "@IDXGUID": [b"objectUUID"],
1185 "@IDX_DN_GUID": [b"GUID"]}
1186 super(GUIDIndexedDNFilterSearchTests, self).setUp()
1187 self.l.add({"dn": "@OPTIONS",
1188 "disallowDNFilter": "TRUE"})
1189 self.disallowDNFilter = True
1193 class GUIDAndOneLevelIndexedSearchTests(SearchTests):
1194 """Test searches using the index including @IDXONE, to ensure
1195 the index doesn't break things"""
1197 self.index = {"dn": "@INDEXLIST",
1198 "@IDXATTR": [b"x", b"y", b"ou"],
1199 "@IDXGUID": [b"objectUUID"],
1200 "@IDX_DN_GUID": [b"GUID"]}
1201 super(GUIDAndOneLevelIndexedSearchTests, self).setUp()
1202 self.l.add({"dn": "@OPTIONS",
1203 "disallowDNFilter": "TRUE"})
1204 self.disallowDNFilter = True
1209 class GUIDIndexedSearchTestsLmdb(GUIDIndexedSearchTests):
1212 self.prefix = MDB_PREFIX
1213 super(GUIDIndexedSearchTestsLmdb, self).setUp()
1216 super(GUIDIndexedSearchTestsLmdb, self).tearDown()
1219 class GUIDIndexedDNFilterSearchTestsLmdb(GUIDIndexedDNFilterSearchTests):
1222 self.prefix = MDB_PREFIX
1223 super(GUIDIndexedDNFilterSearchTestsLmdb, self).setUp()
1226 super(GUIDIndexedDNFilterSearchTestsLmdb, self).tearDown()
1229 class GUIDAndOneLevelIndexedSearchTestsLmdb(GUIDAndOneLevelIndexedSearchTests):
1232 self.prefix = MDB_PREFIX
1233 super(GUIDAndOneLevelIndexedSearchTestsLmdb, self).setUp()
1236 super(GUIDAndOneLevelIndexedSearchTestsLmdb, self).tearDown()
class AddModifyTests(LdbBaseTest):
    """Add, delete and rename tests against a database opened with the
    rdn_name module and with objectUUID declared as UNIQUE_INDEX."""

    def tearDown(self):
        shutil.rmtree(self.testdir)
        super(AddModifyTests, self).tearDown()

        # Ensure the LDB is closed now, so we close the FD
        del(self.l)

    def setUp(self):
        super(AddModifyTests, self).setUp()
        self.testdir = tempdir()
        self.filename = os.path.join(self.testdir, "add_test.ldb")
        self.l = ldb.Ldb(self.url(),
                         flags=self.flags(),
                         options=["modules:rdn_name"])
        # self.index is only defined by subclasses that want an index
        # record; without it the attribute lookup raises AttributeError.
        try:
            self.l.add(self.index)
        except AttributeError:
            pass

        self.l.add({"dn": "DC=SAMBA,DC=ORG",
                    "name": b"samba.org",
                    "objectUUID": b"0123456789abcdef"})
        self.l.add({"dn": "@ATTRIBUTES",
                    "objectUUID": "UNIQUE_INDEX"})

    def _add_ou(self, dn, guid):
        """Add a record at *dn* whose objectUUID is *guid*."""
        # NOTE(review): the "name", "x" and "y" values were not visible in
        # the reviewed excerpt and are reconstructed - confirm upstream.
        self.l.add({"dn": dn,
                    "name": b"Admins",
                    "x": "z", "y": "a",
                    "objectUUID": guid})

    def _assert_ldb_error(self, expected, fail_msg, func, *args):
        """Call func(*args) and require an ldb.LdbError whose first
        argument equals *expected*; fail with *fail_msg* if none is raised."""
        try:
            func(*args)
        except ldb.LdbError as err:
            self.assertEqual(err.args[0], expected)
        else:
            self.fail(fail_msg)

    def _assert_guid_at(self, guid, dn):
        """Require that exactly one record has objectUUID *guid* and that
        its DN is *dn*."""
        res = self.l.search(base="DC=SAMBA,DC=ORG",
                            scope=ldb.SCOPE_SUBTREE,
                            expression="(objectUUID=%s)" % guid)
        self.assertEqual(len(res), 1)
        self.assertEqual(str(res[0].dn), dn)

    def test_add_dup(self):
        self._add_ou("OU=DUP,DC=SAMBA,DC=ORG", b"0123456789abcde1")
        self._assert_ldb_error(ldb.ERR_ENTRY_ALREADY_EXISTS,
                               "Should have failed adding duplicate entry",
                               self._add_ou,
                               "OU=DUP,DC=SAMBA,DC=ORG",
                               b"0123456789abcde2")

    def test_add_del_add(self):
        self._add_ou("OU=DUP,DC=SAMBA,DC=ORG", b"0123456789abcde1")
        self.l.delete("OU=DUP,DC=SAMBA,DC=ORG")
        self._add_ou("OU=DUP,DC=SAMBA,DC=ORG", b"0123456789abcde2")

    def test_add_move_add(self):
        self._add_ou("OU=DUP,DC=SAMBA,DC=ORG", b"0123456789abcde1")
        self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
                      "OU=DUP2,DC=SAMBA,DC=ORG")
        self._add_ou("OU=DUP,DC=SAMBA,DC=ORG", b"0123456789abcde2")

    def test_add_move_fail_move_move(self):
        self._add_ou("OU=DUP,DC=SAMBA,DC=ORG", b"0123456789abcde1")
        self._add_ou("OU=DUP2,DC=SAMBA,DC=ORG", b"0123456789abcde2")

        self._assert_guid_at("0123456789abcde1", "OU=DUP,DC=SAMBA,DC=ORG")
        self._assert_guid_at("0123456789abcde2", "OU=DUP2,DC=SAMBA,DC=ORG")

        self._assert_ldb_error(ldb.ERR_ENTRY_ALREADY_EXISTS,
                               "Should have failed on duplicate DN",
                               self.l.rename,
                               "OU=DUP,DC=SAMBA,DC=ORG",
                               "OU=DUP2,DC=SAMBA,DC=ORG")

        # After the failed rename both moves must still work, and each
        # GUID must follow its record to the new DN.
        self.l.rename("OU=DUP2,DC=SAMBA,DC=ORG",
                      "OU=DUP3,DC=SAMBA,DC=ORG")

        self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
                      "OU=DUP2,DC=SAMBA,DC=ORG")

        self._assert_guid_at("0123456789abcde1", "OU=DUP2,DC=SAMBA,DC=ORG")
        self._assert_guid_at("0123456789abcde2", "OU=DUP3,DC=SAMBA,DC=ORG")

    def test_move_missing(self):
        self._assert_ldb_error(ldb.ERR_NO_SUCH_OBJECT,
                               "Should have failed on missing",
                               self.l.rename,
                               "OU=DUP,DC=SAMBA,DC=ORG",
                               "OU=DUP2,DC=SAMBA,DC=ORG")

    def test_move_missing2(self):
        self._add_ou("OU=DUP2,DC=SAMBA,DC=ORG", b"0123456789abcde2")

        self._assert_ldb_error(ldb.ERR_NO_SUCH_OBJECT,
                               "Should have failed on missing",
                               self.l.rename,
                               "OU=DUP,DC=SAMBA,DC=ORG",
                               "OU=DUP2,DC=SAMBA,DC=ORG")

    def test_move_fail_move_add(self):
        self._add_ou("OU=DUP,DC=SAMBA,DC=ORG", b"0123456789abcde1")
        self._add_ou("OU=DUP2,DC=SAMBA,DC=ORG", b"0123456789abcde2")
        self._assert_ldb_error(ldb.ERR_ENTRY_ALREADY_EXISTS,
                               "Should have failed on duplicate DN",
                               self.l.rename,
                               "OU=DUP,DC=SAMBA,DC=ORG",
                               "OU=DUP2,DC=SAMBA,DC=ORG")

        self.l.rename("OU=DUP2,DC=SAMBA,DC=ORG",
                      "OU=DUP3,DC=SAMBA,DC=ORG")

        self._add_ou("OU=DUP2,DC=SAMBA,DC=ORG", b"0123456789abcde3")
class AddModifyTestsLmdb(AddModifyTests):
    """Re-run AddModifyTests with the MDB_PREFIX URL prefix."""

    def setUp(self):
        # Set the prefix before delegating: LdbBaseTest.setUp() only falls
        # back to TDB_PREFIX when no prefix has been set.
        self.prefix = MDB_PREFIX
        super(AddModifyTestsLmdb, self).setUp()

    def tearDown(self):
        super(AddModifyTestsLmdb, self).tearDown()
class IndexedAddModifyTests(AddModifyTests):
    """Test adds using the index, to ensure the index doesn't
       break things"""

    # NOTE(review): the "name", "x" and "y" values in the records below and
    # the "@IDXONE" entry were not visible in the reviewed excerpt and are
    # reconstructed - confirm against the upstream test.

    def setUp(self):
        # Subclasses may set their own index record before delegating here.
        if not hasattr(self, 'index'):
            self.index = {"dn": "@INDEXLIST",
                          "@IDXATTR": [b"x", b"y", b"ou", b"objectUUID"],
                          "@IDXONE": [b"1"]}
        super(IndexedAddModifyTests, self).setUp()

    def test_duplicate_GUID(self):
        # The objectUUID below is the one the parent setUp() gave to
        # DC=SAMBA,DC=ORG.
        try:
            self.l.add({"dn": "OU=DUPGUID,DC=SAMBA,DC=ORG",
                        "name": b"Admins",
                        "x": "z", "y": "a",
                        "objectUUID": b"0123456789abcdef"})
            self.fail("Should have failed adding duplicate GUID")
        except ldb.LdbError as err:
            enum = err.args[0]
            self.assertEqual(enum, ldb.ERR_CONSTRAINT_VIOLATION)

    def test_duplicate_name_dup_GUID(self):
        self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG",
                    "name": b"Admins",
                    "x": "z", "y": "a",
                    "objectUUID": b"a123456789abcdef"})
        # Same DN and same GUID: the expected error is the duplicate DN.
        try:
            self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG",
                        "name": b"Admins",
                        "x": "z", "y": "a",
                        "objectUUID": b"a123456789abcdef"})
            self.fail("Should have failed adding duplicate GUID")
        except ldb.LdbError as err:
            enum = err.args[0]
            self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)

    def test_duplicate_name_dup_GUID2(self):
        self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG",
                    "name": b"Admins",
                    "x": "z", "y": "a",
                    "objectUUID": b"abc3456789abcdef"})
        try:
            self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG",
                        "name": b"Admins",
                        "x": "z", "y": "a",
                        "objectUUID": b"aaa3456789abcdef"})
            self.fail("Should have failed adding duplicate DN")
        except ldb.LdbError as err:
            enum = err.args[0]
            self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)

        # Checking the GUID didn't stick in the index
        self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
                    "name": b"Admins",
                    "x": "z", "y": "a",
                    "objectUUID": b"aaa3456789abcdef"})

    def test_add_dup_guid_add(self):
        self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
                    "name": b"Admins",
                    "x": "z", "y": "a",
                    "objectUUID": b"0123456789abcde1"})
        try:
            self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
                        "name": b"Admins",
                        "x": "z", "y": "a",
                        "objectUUID": b"0123456789abcde1"})
            self.fail("Should have failed on duplicate GUID")

        except ldb.LdbError as err:
            enum = err.args[0]
            self.assertEqual(enum, ldb.ERR_CONSTRAINT_VIOLATION)

        # The rejected add must not block the same DN with a fresh GUID.
        self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
                    "name": b"Admins",
                    "x": "z", "y": "a",
                    "objectUUID": b"0123456789abcde2"})
class GUIDIndexedAddModifyTests(IndexedAddModifyTests):
    """Run the indexed add/modify tests with an index record that also
       carries @IDXGUID and @IDX_DN_GUID"""

    def setUp(self):
        # NOTE(review): the "@IDXONE" entry was not visible in the reviewed
        # excerpt and is reconstructed - confirm against upstream.
        index_record = {"dn": "@INDEXLIST",
                        "@IDXATTR": [b"x", b"y", b"ou"],
                        "@IDXONE": [b"1"],
                        "@IDXGUID": [b"objectUUID"],
                        "@IDX_DN_GUID": [b"GUID"]}
        # Must be in place before the parent setUp(), which only supplies a
        # default when self.index is missing.
        self.index = index_record
        super(GUIDIndexedAddModifyTests, self).setUp()
class GUIDTransIndexedAddModifyTests(GUIDIndexedAddModifyTests):
    """Test GUID index behaviour inside the transaction"""

    def setUp(self):
        super(GUIDTransIndexedAddModifyTests, self).setUp()
        # Opened after the fixture is built, so each test body runs inside
        # this transaction.
        self.l.transaction_start()

    def tearDown(self):
        # Commit before the parent tearDown() runs.
        self.l.transaction_commit()
        super(GUIDTransIndexedAddModifyTests, self).tearDown()
class TransIndexedAddModifyTests(IndexedAddModifyTests):
    """Test index behaviour inside the transaction"""

    def setUp(self):
        super(TransIndexedAddModifyTests, self).setUp()
        # Opened after the fixture is built, so each test body runs inside
        # this transaction.
        self.l.transaction_start()

    def tearDown(self):
        # Commit before the parent tearDown() runs.
        self.l.transaction_commit()
        super(TransIndexedAddModifyTests, self).tearDown()
class GuidIndexedAddModifyTestsLmdb(GUIDIndexedAddModifyTests):
    """Re-run GUIDIndexedAddModifyTests with the MDB_PREFIX URL prefix."""

    def setUp(self):
        # Set the prefix before delegating: LdbBaseTest.setUp() only falls
        # back to TDB_PREFIX when no prefix has been set.
        self.prefix = MDB_PREFIX
        super(GuidIndexedAddModifyTestsLmdb, self).setUp()

    def tearDown(self):
        super(GuidIndexedAddModifyTestsLmdb, self).tearDown()
class GuidTransIndexedAddModifyTestsLmdb(GUIDTransIndexedAddModifyTests):
    """Re-run GUIDTransIndexedAddModifyTests with the MDB_PREFIX prefix."""

    def setUp(self):
        # Set the prefix before delegating: LdbBaseTest.setUp() only falls
        # back to TDB_PREFIX when no prefix has been set.
        self.prefix = MDB_PREFIX
        super(GuidTransIndexedAddModifyTestsLmdb, self).setUp()

    def tearDown(self):
        super(GuidTransIndexedAddModifyTestsLmdb, self).tearDown()
class BadIndexTests(LdbBaseTest):
    """Check that searches keep working after @ATTRIBUTES is changed in a
    way that affects how the existing index would be built."""

    # NOTE(review): the "y" values of the records added in each test and the
    # bare self.fail() calls were not visible in the reviewed excerpt; they
    # are reconstructed from the asserted result counts - confirm upstream.

    def setUp(self):
        # The original called super().setUp() a second time at the end of
        # this method; one call is enough.
        super(BadIndexTests, self).setUp()
        self.testdir = tempdir()
        self.filename = os.path.join(self.testdir, "test.ldb")
        self.ldb = ldb.Ldb(self.url(), flags=self.flags())
        # Subclasses opt in to the GUID index by defining self.IDXGUID.
        if hasattr(self, 'IDXGUID'):
            self.ldb.add({"dn": "@INDEXLIST",
                          "@IDXATTR": [b"x", b"y", b"ou"],
                          "@IDXGUID": [b"objectUUID"],
                          "@IDX_DN_GUID": [b"GUID"]})
        else:
            self.ldb.add({"dn": "@INDEXLIST",
                          "@IDXATTR": [b"x", b"y", b"ou"]})

    def test_unique(self):
        self.ldb.add({"dn": "x=x,dc=samba,dc=org",
                      "objectUUID": b"0123456789abcde1",
                      "y": "1"})
        self.ldb.add({"dn": "x=y,dc=samba,dc=org",
                      "objectUUID": b"0123456789abcde2",
                      "y": "1"})
        self.ldb.add({"dn": "x=z,dc=samba,dc=org",
                      "objectUUID": b"0123456789abcde3",
                      "y": "1"})

        res = self.ldb.search(expression="(y=1)",
                              base="dc=samba,dc=org")
        self.assertEqual(len(res), 3)

        # Now set this to unique index, but forget to check the result
        try:
            self.ldb.add({"dn": "@ATTRIBUTES",
                          "y": "UNIQUE_INDEX"})
            self.fail()
        except ldb.LdbError:
            pass

        # We must still have a working index
        res = self.ldb.search(expression="(y=1)",
                              base="dc=samba,dc=org")
        self.assertEqual(len(res), 3)

    def test_unique_transaction(self):
        self.ldb.add({"dn": "x=x,dc=samba,dc=org",
                      "objectUUID": b"0123456789abcde1",
                      "y": "1"})
        self.ldb.add({"dn": "x=y,dc=samba,dc=org",
                      "objectUUID": b"0123456789abcde2",
                      "y": "1"})
        self.ldb.add({"dn": "x=z,dc=samba,dc=org",
                      "objectUUID": b"0123456789abcde3",
                      "y": "1"})

        res = self.ldb.search(expression="(y=1)",
                              base="dc=samba,dc=org")
        self.assertEqual(len(res), 3)

        self.ldb.transaction_start()

        # Now set this to unique index, but forget to check the result
        try:
            self.ldb.add({"dn": "@ATTRIBUTES",
                          "y": "UNIQUE_INDEX"})
        except ldb.LdbError:
            pass

        try:
            self.ldb.transaction_commit()
            self.fail()

        except ldb.LdbError as err:
            enum = err.args[0]
            self.assertEqual(enum, ldb.ERR_OPERATIONS_ERROR)

        # We must still have a working index
        res = self.ldb.search(expression="(y=1)",
                              base="dc=samba,dc=org")

        self.assertEqual(len(res), 3)

    def test_casefold(self):
        self.ldb.add({"dn": "x=x,dc=samba,dc=org",
                      "objectUUID": b"0123456789abcde1",
                      "y": "a"})
        self.ldb.add({"dn": "x=y,dc=samba,dc=org",
                      "objectUUID": b"0123456789abcde2",
                      "y": "A"})
        self.ldb.add({"dn": "x=z,dc=samba,dc=org",
                      "objectUUID": b"0123456789abcde3",
                      "y": ["a", "A"]})

        res = self.ldb.search(expression="(y=a)",
                              base="dc=samba,dc=org")
        self.assertEqual(len(res), 2)

        self.ldb.add({"dn": "@ATTRIBUTES",
                      "y": "CASE_INSENSITIVE"})

        # We must still have a working index
        res = self.ldb.search(expression="(y=a)",
                              base="dc=samba,dc=org")

        if hasattr(self, 'IDXGUID'):
            self.assertEqual(len(res), 3)
        else:
            # We should not return this entry twice, but sadly
            # we have not yet fixed
            # https://bugzilla.samba.org/show_bug.cgi?id=13361
            self.assertEqual(len(res), 4)

    def test_casefold_transaction(self):
        self.ldb.add({"dn": "x=x,dc=samba,dc=org",
                      "objectUUID": b"0123456789abcde1",
                      "y": "a"})
        self.ldb.add({"dn": "x=y,dc=samba,dc=org",
                      "objectUUID": b"0123456789abcde2",
                      "y": "A"})
        self.ldb.add({"dn": "x=z,dc=samba,dc=org",
                      "objectUUID": b"0123456789abcde3",
                      "y": ["a", "A"]})

        res = self.ldb.search(expression="(y=a)",
                              base="dc=samba,dc=org")
        self.assertEqual(len(res), 2)

        self.ldb.transaction_start()

        self.ldb.add({"dn": "@ATTRIBUTES",
                      "y": "CASE_INSENSITIVE"})

        self.ldb.transaction_commit()

        # We must still have a working index
        res = self.ldb.search(expression="(y=a)",
                              base="dc=samba,dc=org")

        if hasattr(self, 'IDXGUID'):
            self.assertEqual(len(res), 3)
        else:
            # We should not return this entry twice, but sadly
            # we have not yet fixed
            # https://bugzilla.samba.org/show_bug.cgi?id=13361
            self.assertEqual(len(res), 4)

    def tearDown(self):
        # Remove the per-test directory created in setUp(), as the other
        # test classes in this file do.
        shutil.rmtree(self.testdir)
        super(BadIndexTests, self).tearDown()
        # Ensure the LDB is closed now, so we close the FD
        del(self.ldb)
class GUIDBadIndexTests(BadIndexTests):
    """Test Bad index things with GUID index mode"""

    def setUp(self):
        # BadIndexTests.setUp() only checks that the attribute exists.
        # NOTE(review): the assigned value was not visible in the reviewed
        # excerpt - confirm against upstream.
        self.IDXGUID = True

        super(GUIDBadIndexTests, self).setUp()
class DnTests(TestCase):
    """Tests for ldb.Dn, using an ldb.Ldb() that is never connected."""

    def setUp(self):
        super(DnTests, self).setUp()
        self.ldb = ldb.Ldb()

    def tearDown(self):
        super(DnTests, self).tearDown()
        del(self.ldb)

    def test_set_dn_invalid(self):
        # NOTE(review): the three lines before the assertion were not
        # visible in the reviewed excerpt and are reconstructed.
        x = ldb.Message()

        def assign():
            x.dn = "astring"
        self.assertRaises(TypeError, assign)

    def test_eq(self):
        x = ldb.Dn(self.ldb, "dc=foo11,bar=bloe")
        y = ldb.Dn(self.ldb, "dc=foo11,bar=bloe")
        self.assertEqual(x, y)
        y = ldb.Dn(self.ldb, "dc=foo11,bar=blie")
        self.assertNotEqual(x, y)

    def test_str(self):
        x = ldb.Dn(self.ldb, "dc=foo12,bar=bloe")
        self.assertEqual(x.__str__(), "dc=foo12,bar=bloe")

    def test_repr(self):
        x = ldb.Dn(self.ldb, "dc=foo13,bla=blie")
        self.assertEqual(x.__repr__(), "Dn('dc=foo13,bla=blie')")

    def test_get_casefold(self):
        x = ldb.Dn(self.ldb, "dc=foo14,bar=bloe")
        self.assertEqual(x.get_casefold(), "DC=FOO14,BAR=bloe")

    def test_validate(self):
        x = ldb.Dn(self.ldb, "dc=foo15,bar=bloe")
        self.assertTrue(x.validate())

    def test_parent(self):
        x = ldb.Dn(self.ldb, "dc=foo16,bar=bloe")
        self.assertEqual("bar=bloe", x.parent().__str__())

    def test_parent_nonexistent(self):
        x = ldb.Dn(self.ldb, "@BLA")
        self.assertEqual(None, x.parent())

    def test_is_valid(self):
        x = ldb.Dn(self.ldb, "dc=foo18,dc=bloe")
        self.assertTrue(x.is_valid())
        x = ldb.Dn(self.ldb, "")
        self.assertTrue(x.is_valid())

    def test_is_special(self):
        x = ldb.Dn(self.ldb, "dc=foo19,bar=bloe")
        self.assertFalse(x.is_special())
        x = ldb.Dn(self.ldb, "@FOOBAR")
        self.assertTrue(x.is_special())

    def test_check_special(self):
        x = ldb.Dn(self.ldb, "dc=foo20,bar=bloe")
        self.assertFalse(x.check_special("FOOBAR"))
        x = ldb.Dn(self.ldb, "@FOOBAR")
        self.assertTrue(x.check_special("@FOOBAR"))

    def test_len(self):
        x = ldb.Dn(self.ldb, "dc=foo21,bar=bloe")
        self.assertEqual(2, len(x))
        x = ldb.Dn(self.ldb, "dc=foo21")
        self.assertEqual(1, len(x))

    def test_add_child(self):
        x = ldb.Dn(self.ldb, "dc=foo22,bar=bloe")
        self.assertTrue(x.add_child(ldb.Dn(self.ldb, "bla=bloe")))
        self.assertEqual("bla=bloe,dc=foo22,bar=bloe", x.__str__())

    def test_add_base(self):
        x = ldb.Dn(self.ldb, "dc=foo23,bar=bloe")
        base = ldb.Dn(self.ldb, "bla=bloe")
        self.assertTrue(x.add_base(base))
        self.assertEqual("dc=foo23,bar=bloe,bla=bloe", x.__str__())

    def test_add_child_str(self):
        x = ldb.Dn(self.ldb, "dc=foo22,bar=bloe")
        self.assertTrue(x.add_child("bla=bloe"))
        self.assertEqual("bla=bloe,dc=foo22,bar=bloe", x.__str__())

    def test_add_base_str(self):
        x = ldb.Dn(self.ldb, "dc=foo23,bar=bloe")
        base = "bla=bloe"
        self.assertTrue(x.add_base(base))
        self.assertEqual("dc=foo23,bar=bloe,bla=bloe", x.__str__())

    def test_add(self):
        x = ldb.Dn(self.ldb, "dc=foo24")
        y = ldb.Dn(self.ldb, "bar=bla")
        self.assertEqual("dc=foo24,bar=bla", str(x + y))

    def test_remove_base_components(self):
        x = ldb.Dn(self.ldb, "dc=foo24,dc=samba,dc=org")
        x.remove_base_components(len(x)-1)
        self.assertEqual("dc=foo24", str(x))

    def test_parse_ldif(self):
        msgs = self.ldb.parse_ldif("dn: foo=bar\n")
        msg = next(msgs)
        self.assertEqual("foo=bar", str(msg[1].dn))
        self.assertTrue(isinstance(msg[1], ldb.Message))
        ldif = self.ldb.write_ldif(msg[1], ldb.CHANGETYPE_NONE)
        self.assertEqual("dn: foo=bar\n\n", ldif)

    def test_parse_ldif_more(self):
        msgs = self.ldb.parse_ldif("dn: foo=bar\n\n\ndn: bar=bar")
        msg = next(msgs)
        self.assertEqual("foo=bar", str(msg[1].dn))
        msg = next(msgs)
        self.assertEqual("bar=bar", str(msg[1].dn))

    def test_canonical_string(self):
        x = ldb.Dn(self.ldb, "dc=foo25,bar=bloe")
        self.assertEqual("/bloe/foo25", x.canonical_str())

    def test_canonical_ex_string(self):
        x = ldb.Dn(self.ldb, "dc=foo26,bar=bloe")
        self.assertEqual("/bloe\nfoo26", x.canonical_ex_str())

    def test_ldb_is_child_of(self):
        """Testing ldb_dn_compare_dn"""
        dn1 = ldb.Dn(self.ldb, "dc=base")
        dn2 = ldb.Dn(self.ldb, "cn=foo,dc=base")
        dn3 = ldb.Dn(self.ldb, "cn=bar,dc=base")
        dn4 = ldb.Dn(self.ldb, "cn=baz,cn=bar,dc=base")

        self.assertTrue(dn1.is_child_of(dn1))
        self.assertTrue(dn2.is_child_of(dn1))
        self.assertTrue(dn4.is_child_of(dn1))
        self.assertTrue(dn4.is_child_of(dn3))
        self.assertTrue(dn4.is_child_of(dn4))
        self.assertFalse(dn3.is_child_of(dn2))
        self.assertFalse(dn1.is_child_of(dn4))

    def test_ldb_is_child_of_str(self):
        """Testing ldb_dn_compare_dn"""
        dn1_str = "dc=base"
        dn2_str = "cn=foo,dc=base"
        dn3_str = "cn=bar,dc=base"
        dn4_str = "cn=baz,cn=bar,dc=base"

        dn1 = ldb.Dn(self.ldb, dn1_str)
        dn2 = ldb.Dn(self.ldb, dn2_str)
        dn3 = ldb.Dn(self.ldb, dn3_str)
        dn4 = ldb.Dn(self.ldb, dn4_str)

        self.assertTrue(dn1.is_child_of(dn1_str))
        self.assertTrue(dn2.is_child_of(dn1_str))
        self.assertTrue(dn4.is_child_of(dn1_str))
        self.assertTrue(dn4.is_child_of(dn3_str))
        self.assertTrue(dn4.is_child_of(dn4_str))
        self.assertFalse(dn3.is_child_of(dn2_str))
        self.assertFalse(dn1.is_child_of(dn4_str))

    def test_get_component_name(self):
        dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
        self.assertEqual(dn.get_component_name(0), 'cn')
        self.assertEqual(dn.get_component_name(1), 'dc')
        self.assertEqual(dn.get_component_name(2), None)
        self.assertEqual(dn.get_component_name(-1), None)

    def test_get_component_value(self):
        dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
        self.assertEqual(dn.get_component_value(0), 'foo')
        self.assertEqual(dn.get_component_value(1), 'base')
        # The out-of-range cases previously called get_component_name(),
        # so the value accessor was never checked for them.
        # NOTE(review): assumes get_component_value() returns None for
        # out-of-range indexes as get_component_name() does - confirm.
        self.assertEqual(dn.get_component_value(2), None)
        self.assertEqual(dn.get_component_value(-1), None)

    def test_set_component(self):
        dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
        dn.set_component(0, 'cn', 'bar')
        self.assertEqual(str(dn), "cn=bar,dc=base")
        dn.set_component(1, 'o', 'asep')
        self.assertEqual(str(dn), "cn=bar,o=asep")
        self.assertRaises(TypeError, dn.set_component, 2, 'dc', 'base')
        self.assertEqual(str(dn), "cn=bar,o=asep")
        dn.set_component(1, 'o', 'a,b+c')
        self.assertEqual(str(dn), r"cn=bar,o=a\,b\+c")

    def test_set_component_bytes(self):
        dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
        dn.set_component(0, 'cn', b'bar')
        self.assertEqual(str(dn), "cn=bar,dc=base")
        dn.set_component(1, 'o', b'asep')
        self.assertEqual(str(dn), "cn=bar,o=asep")

    def test_set_component_none(self):
        dn = ldb.Dn(self.ldb, "cn=foo,cn=bar,dc=base")
        self.assertRaises(TypeError, dn.set_component, 1, 'cn', None)

    def test_get_extended_component_null(self):
        dn = ldb.Dn(self.ldb, "cn=foo,cn=bar,dc=base")
        self.assertEqual(dn.get_extended_component("TEST"), None)

    def test_get_extended_component(self):
        self.ldb._register_test_extensions()
        dn = ldb.Dn(self.ldb, "<TEST=foo>;cn=bar,dc=base")
        self.assertEqual(dn.get_extended_component("TEST"), b"foo")

    def test_set_extended_component(self):
        self.ldb._register_test_extensions()
        dn = ldb.Dn(self.ldb, "dc=base")
        dn.set_extended_component("TEST", "foo")
        self.assertEqual(dn.get_extended_component("TEST"), b"foo")
        dn.set_extended_component("TEST", b"bar")
        self.assertEqual(dn.get_extended_component("TEST"), b"bar")

    def test_extended_str(self):
        self.ldb._register_test_extensions()
        dn = ldb.Dn(self.ldb, "<TEST=foo>;cn=bar,dc=base")
        self.assertEqual(dn.extended_str(), "<TEST=foo>;cn=bar,dc=base")

    def test_get_rdn_name(self):
        dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
        self.assertEqual(dn.get_rdn_name(), 'cn')

    def test_get_rdn_value(self):
        dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
        self.assertEqual(dn.get_rdn_value(), 'foo')

    # Renamed from a second "test_get_casefold": the duplicate name replaced
    # the earlier definition in the class body, so that test never ran.
    def test_get_casefold_cn_dc(self):
        dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
        self.assertEqual(dn.get_casefold(), 'CN=FOO,DC=BASE')

    def test_get_linearized(self):
        dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
        self.assertEqual(dn.get_linearized(), 'cn=foo,dc=base')

    def test_is_null(self):
        dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
        self.assertFalse(dn.is_null())

        dn = ldb.Dn(self.ldb, '')
        self.assertTrue(dn.is_null())
class LdbMsgTests(TestCase):
    """Tests for ldb.Message through both the bytes API and the ``.text``
    wrapper; each test starts from an empty message in self.msg."""

    def setUp(self):
        super(LdbMsgTests, self).setUp()
        self.msg = ldb.Message()

    def test_init_dn(self):
        self.msg = ldb.Message(ldb.Dn(ldb.Ldb(), "dc=foo27"))
        self.assertEqual("dc=foo27", str(self.msg.dn))

    def test_iter_items(self):
        self.assertEqual(0, len(self.msg.items()))
        self.msg.dn = ldb.Dn(ldb.Ldb(), "dc=foo28")
        self.assertEqual(1, len(self.msg.items()))

    def test_repr(self):
        self.msg.dn = ldb.Dn(ldb.Ldb(), "dc=foo29")
        self.msg["dc"] = b"foo"
        if PY3:
            # Either key order is accepted.
            self.assertIn(repr(self.msg), [
                "Message({'dn': Dn('dc=foo29'), 'dc': MessageElement([b'foo'])})",
                "Message({'dc': MessageElement([b'foo']), 'dn': Dn('dc=foo29')})",
            ])
            self.assertIn(repr(self.msg.text), [
                "Message({'dn': Dn('dc=foo29'), 'dc': MessageElement([b'foo'])}).text",
                "Message({'dc': MessageElement([b'foo']), 'dn': Dn('dc=foo29')}).text",
            ])
        else:
            self.assertEqual(
                repr(self.msg),
                "Message({'dn': Dn('dc=foo29'), 'dc': MessageElement(['foo'])})")
            self.assertEqual(
                repr(self.msg.text),
                "Message({'dn': Dn('dc=foo29'), 'dc': MessageElement(['foo'])}).text")

    def test_len(self):
        self.assertEqual(0, len(self.msg))

    def test_notpresent(self):
        self.assertRaises(KeyError, lambda: self.msg["foo"])

    # NOTE(review): test_del was not visible in the reviewed excerpt and is
    # reconstructed - confirm against upstream.
    def test_del(self):
        del self.msg["foo"]

    def test_add(self):
        self.msg.add(ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla"))

    def test_add_text(self):
        self.msg.add(ldb.MessageElement(["456"], ldb.FLAG_MOD_ADD, "bla"))

    def test_elements_empty(self):
        self.assertEqual([], self.msg.elements())

    def test_elements(self):
        el = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
        self.msg.add(el)
        self.assertEqual([el], self.msg.elements())
        self.assertEqual([el.text], self.msg.text.elements())

    def test_add_value(self):
        self.assertEqual(0, len(self.msg))
        self.msg["foo"] = [b"foo"]
        self.assertEqual(1, len(self.msg))

    def test_add_value_text(self):
        self.assertEqual(0, len(self.msg))
        self.msg["foo"] = ["foo"]
        self.assertEqual(1, len(self.msg))

    def test_add_value_multiple(self):
        self.assertEqual(0, len(self.msg))
        self.msg["foo"] = [b"foo", b"bla"]
        self.assertEqual(1, len(self.msg))
        self.assertEqual([b"foo", b"bla"], list(self.msg["foo"]))

    def test_add_value_multiple_text(self):
        self.assertEqual(0, len(self.msg))
        self.msg["foo"] = ["foo", "bla"]
        self.assertEqual(1, len(self.msg))
        self.assertEqual(["foo", "bla"], list(self.msg.text["foo"]))

    def test_set_value(self):
        self.msg["foo"] = [b"fool"]
        self.assertEqual([b"fool"], list(self.msg["foo"]))
        self.msg["foo"] = [b"bar"]
        self.assertEqual([b"bar"], list(self.msg["foo"]))

    def test_set_value_text(self):
        self.msg["foo"] = ["fool"]
        self.assertEqual(["fool"], list(self.msg.text["foo"]))
        self.msg["foo"] = ["bar"]
        self.assertEqual(["bar"], list(self.msg.text["foo"]))

    def test_keys(self):
        self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
        self.msg["foo"] = [b"bla"]
        self.msg["bar"] = [b"bla"]
        self.assertEqual(["dn", "foo", "bar"], self.msg.keys())

    def test_keys_text(self):
        self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
        self.msg["foo"] = ["bla"]
        self.msg["bar"] = ["bla"]
        self.assertEqual(["dn", "foo", "bar"], self.msg.text.keys())

    def test_dn(self):
        self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
        self.assertEqual("@BASEINFO", self.msg.dn.__str__())

    def test_get_dn(self):
        self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
        self.assertEqual("@BASEINFO", self.msg.get("dn").__str__())

    def test_dn_text(self):
        self.msg.text.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
        self.assertEqual("@BASEINFO", str(self.msg.dn))
        self.assertEqual("@BASEINFO", str(self.msg.text.dn))

    def test_get_dn_text(self):
        self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
        self.assertEqual("@BASEINFO", str(self.msg.get("dn")))
        self.assertEqual("@BASEINFO", str(self.msg.text.get("dn")))

    def test_get_invalid(self):
        self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
        self.assertRaises(TypeError, self.msg.get, 42)

    def test_get_other(self):
        self.msg["foo"] = [b"bar"]
        self.assertEqual(b"bar", self.msg.get("foo")[0])
        self.assertEqual(b"bar", self.msg.get("foo", idx=0))
        self.assertEqual(None, self.msg.get("foo", idx=1))
        self.assertEqual("", self.msg.get("foo", default='', idx=1))

    def test_get_other_text(self):
        self.msg["foo"] = ["bar"]
        self.assertEqual(["bar"], list(self.msg.text.get("foo")))
        self.assertEqual("bar", self.msg.text.get("foo")[0])
        self.assertEqual("bar", self.msg.text.get("foo", idx=0))
        # These two checks previously went through self.msg.get(), repeating
        # test_get_other instead of exercising the text wrapper.
        # NOTE(review): assumes text.get() accepts the "default" keyword
        # like Message.get() - confirm.
        self.assertEqual(None, self.msg.text.get("foo", idx=1))
        self.assertEqual("", self.msg.text.get("foo", default='', idx=1))

    def test_get_default(self):
        self.assertEqual(None, self.msg.get("tatayoyo", idx=0))
        self.assertEqual("anniecordie", self.msg.get("tatayoyo", "anniecordie"))

    def test_get_default_text(self):
        self.assertEqual(None, self.msg.text.get("tatayoyo", idx=0))
        self.assertEqual("anniecordie", self.msg.text.get("tatayoyo", "anniecordie"))

    def test_get_unknown(self):
        self.assertEqual(None, self.msg.get("lalalala"))

    def test_get_unknown_text(self):
        self.assertEqual(None, self.msg.text.get("lalalala"))

    def test_msg_diff(self):
        l = ldb.Ldb()
        msgs = l.parse_ldif("dn: foo=bar\nfoo: bar\nbaz: do\n\ndn: foo=bar\nfoo: bar\nbaz: dont\n")
        msg1 = next(msgs)[1]
        msg2 = next(msgs)[1]
        msgdiff = l.msg_diff(msg1, msg2)
        self.assertEqual("foo=bar", msgdiff.get("dn").__str__())
        self.assertRaises(KeyError, lambda: msgdiff["foo"])
        self.assertEqual(1, len(msgdiff))

    def test_equal_empty(self):
        msg1 = ldb.Message()
        msg2 = ldb.Message()
        self.assertEqual(msg1, msg2)

    def test_equal_simplel(self):
        db = ldb.Ldb()
        msg1 = ldb.Message()
        msg1.dn = ldb.Dn(db, "foo=bar")
        msg2 = ldb.Message()
        msg2.dn = ldb.Dn(db, "foo=bar")
        self.assertEqual(msg1, msg2)
        msg1['foo'] = b'bar'
        msg2['foo'] = b'bar'
        self.assertEqual(msg1, msg2)
        msg2['foo'] = b'blie'
        self.assertNotEqual(msg1, msg2)
        # A trailing repeat of the assignment above, with no assertion
        # after it, has been dropped.

    def test_from_dict(self):
        rec = {"dn": "dc=fromdict",
               "a1": [b"a1-val1", b"a1-val1"]}
        l = ldb.Ldb()
        # check different types of input Flags
        for flags in [ldb.FLAG_MOD_ADD, ldb.FLAG_MOD_REPLACE, ldb.FLAG_MOD_DELETE]:
            m = ldb.Message.from_dict(l, rec, flags)
            self.assertEqual(rec["a1"], list(m["a1"]))
            self.assertEqual(flags, m["a1"].flags())
        # check input params
        self.assertRaises(TypeError, ldb.Message.from_dict, dict(), rec, ldb.FLAG_MOD_REPLACE)
        self.assertRaises(TypeError, ldb.Message.from_dict, l, list(), ldb.FLAG_MOD_REPLACE)
        self.assertRaises(ValueError, ldb.Message.from_dict, l, rec, 0)
        # Message.from_dict expects dictionary with 'dn'
        err_rec = {"a1": [b"a1-val1", b"a1-val1"]}
        self.assertRaises(TypeError, ldb.Message.from_dict, l, err_rec, ldb.FLAG_MOD_REPLACE)

    def test_from_dict_text(self):
        rec = {"dn": "dc=fromdict",
               "a1": ["a1-val1", "a1-val1"]}
        l = ldb.Ldb()
        # check different types of input Flags
        for flags in [ldb.FLAG_MOD_ADD, ldb.FLAG_MOD_REPLACE, ldb.FLAG_MOD_DELETE]:
            m = ldb.Message.from_dict(l, rec, flags)
            self.assertEqual(rec["a1"], list(m.text["a1"]))
            self.assertEqual(flags, m.text["a1"].flags())
        # check input params
        self.assertRaises(TypeError, ldb.Message.from_dict, dict(), rec, ldb.FLAG_MOD_REPLACE)
        self.assertRaises(TypeError, ldb.Message.from_dict, l, list(), ldb.FLAG_MOD_REPLACE)
        self.assertRaises(ValueError, ldb.Message.from_dict, l, rec, 0)
        # Message.from_dict expects dictionary with 'dn'
        err_rec = {"a1": ["a1-val1", "a1-val1"]}
        self.assertRaises(TypeError, ldb.Message.from_dict, l, err_rec, ldb.FLAG_MOD_REPLACE)

    def test_copy_add_message_element(self):
        # NOTE(review): the statements that build "m" and "mto" in this test
        # and the next were not visible in the reviewed excerpt and are
        # reconstructed - confirm against upstream.
        m = ldb.Message()
        m["1"] = ldb.MessageElement([b"val 111"], ldb.FLAG_MOD_ADD, "1")
        m["2"] = ldb.MessageElement([b"val 222"], ldb.FLAG_MOD_ADD, "2")
        mto = ldb.Message()
        mto["1"] = m["1"]
        mto["2"] = m["2"]
        self.assertEqual(mto["1"], m["1"])
        self.assertEqual(mto["2"], m["2"])
        mto = ldb.Message()
        mto.add(m["1"])
        mto.add(m["2"])
        self.assertEqual(mto["1"], m["1"])
        self.assertEqual(mto["2"], m["2"])

    def test_copy_add_message_element_text(self):
        m = ldb.Message()
        m["1"] = ldb.MessageElement(["val 111"], ldb.FLAG_MOD_ADD, "1")
        m["2"] = ldb.MessageElement(["val 222"], ldb.FLAG_MOD_ADD, "2")
        mto = ldb.Message()
        mto["1"] = m["1"]
        mto["2"] = m["2"]
        self.assertEqual(mto["1"], m.text["1"])
        self.assertEqual(mto["2"], m.text["2"])
        mto = ldb.Message()
        mto.add(m["1"])
        mto.add(m["2"])
        self.assertEqual(mto.text["1"], m.text["1"])
        self.assertEqual(mto.text["2"], m.text["2"])
        self.assertEqual(mto["1"], m["1"])
        self.assertEqual(mto["2"], m["2"])
class MessageElementTests(TestCase):
    """Tests for ldb.MessageElement and its ``.text`` view."""

    def test_cmp_element(self):
        x = ldb.MessageElement([b"foo"])
        y = ldb.MessageElement([b"foo"])
        z = ldb.MessageElement([b"bzr"])
        self.assertEqual(x, y)
        self.assertNotEqual(x, z)

    def test_cmp_element_text(self):
        x = ldb.MessageElement([b"foo"])
        y = ldb.MessageElement(["foo"])
        self.assertEqual(x, y)

    def test_create_iterable(self):
        x = ldb.MessageElement([b"foo"])
        self.assertEqual([b"foo"], list(x))
        self.assertEqual(["foo"], list(x.text))

    def test_repr(self):
        x = ldb.MessageElement([b"foo"])
        if PY3:
            self.assertEqual("MessageElement([b'foo'])", repr(x))
            self.assertEqual("MessageElement([b'foo']).text", repr(x.text))
        else:
            self.assertEqual("MessageElement(['foo'])", repr(x))
            self.assertEqual("MessageElement(['foo']).text", repr(x.text))
        x = ldb.MessageElement([b"foo", b"bla"])
        self.assertEqual(2, len(x))
        if PY3:
            self.assertEqual("MessageElement([b'foo',b'bla'])", repr(x))
            self.assertEqual("MessageElement([b'foo',b'bla']).text", repr(x.text))
        else:
            self.assertEqual("MessageElement(['foo','bla'])", repr(x))
            self.assertEqual("MessageElement(['foo','bla']).text", repr(x.text))

    def test_get_item(self):
        x = ldb.MessageElement([b"foo", b"bar"])
        self.assertEqual(b"foo", x[0])
        self.assertEqual(b"bar", x[1])
        self.assertEqual(b"bar", x[-1])
        self.assertRaises(IndexError, lambda: x[45])

    def test_get_item_text(self):
        x = ldb.MessageElement(["foo", "bar"])
        self.assertEqual("foo", x.text[0])
        self.assertEqual("bar", x.text[1])
        self.assertEqual("bar", x.text[-1])
        # Previously indexed x directly, repeating test_get_item instead of
        # checking the text view.
        # NOTE(review): assumes the text view raises IndexError for an
        # out-of-range index, as the bytes element does - confirm.
        self.assertRaises(IndexError, lambda: x.text[45])

    def test_len(self):
        x = ldb.MessageElement([b"foo", b"bar"])
        self.assertEqual(2, len(x))

    def test_eq(self):
        x = ldb.MessageElement([b"foo", b"bar"])
        y = ldb.MessageElement([b"foo", b"bar"])
        self.assertEqual(y, x)
        x = ldb.MessageElement([b"foo"])
        self.assertNotEqual(y, x)
        y = ldb.MessageElement([b"foo"])
        self.assertEqual(y, x)

    def test_extended(self):
        el = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
        if PY3:
            self.assertEqual("MessageElement([b'456'])", repr(el))
            self.assertEqual("MessageElement([b'456']).text", repr(el.text))
        else:
            self.assertEqual("MessageElement(['456'])", repr(el))
            self.assertEqual("MessageElement(['456']).text", repr(el.text))

    def test_bad_text(self):
        el = ldb.MessageElement(b'\xba\xdd')
        self.assertRaises(UnicodeDecodeError, el.text.__getitem__, 0)
class ModuleTests(TestCase):
    """Tests for registering Python classes as ldb modules."""

    def setUp(self):
        super(ModuleTests, self).setUp()
        self.testdir = tempdir()
        self.filename = os.path.join(self.testdir, "test.ldb")
        self.ldb = ldb.Ldb(self.filename)

    def tearDown(self):
        shutil.rmtree(self.testdir)
        # Chain to the parent's tearDown(); this previously called setUp().
        super(ModuleTests, self).tearDown()

    def test_register_module(self):
        class ExampleModule:
            # NOTE(review): the class body was not visible in the reviewed
            # excerpt; this attribute is reconstructed - confirm upstream.
            name = "example"
        ldb.register_module(ExampleModule)

    def test_use_module(self):
        # Records the hooks the module runs, for the assertions below.
        # NOTE(review): "ops", the "name" attribute and the bodies of
        # __init__ and request were not visible in the reviewed excerpt and
        # are reconstructed from the assertions - confirm upstream.
        ops = []

        class ExampleModule:
            # Matches the "@LIST" value written to @MODULES below.
            name = "bla"

            def __init__(self, ldb, next):
                ops.append("init")
                self.next = next

            def search(self, *args, **kwargs):
                return self.next.search(*args, **kwargs)

            def request(self, *args, **kwargs):
                pass

        ldb.register_module(ExampleModule)
        l = ldb.Ldb(self.filename)
        l.add({"dn": "@MODULES", "@LIST": "bla"})
        # Adding the @MODULES record must not instantiate the module ...
        self.assertEqual([], ops)
        # ... but opening the database again must.
        l = ldb.Ldb(self.filename)
        self.assertEqual(["init"], ops)
2306 class LdbResultTests(LdbBaseTest):
# Fixture setup: create a temporary directory, open a database at
# <testdir>/test.ldb using self.url() and self.flags(), and populate it
# with the entries the tests search for.
2309 super(LdbResultTests, self).setUp()
2310 self.testdir = tempdir()
2311 self.filename = os.path.join(self.testdir, "test.ldb")
2312 self.l = ldb.Ldb(self.url(), flags=self.flags())
# self.index is added to the database; the AttributeError handler
# presumably covers test classes that do not define `index`.
# NOTE(review): the `try:` line and the handler body are not visible in
# this listing -- confirm.
2314 self.l.add(self.index)
2315 except AttributeError:
# 13 entries follow: the base DC=SAMBA,DC=ORG, OU=ADMIN, OU=USERS and
# OU1..OU10. Each gets a distinct 16-byte objectUUID value.
2317 self.l.add({"dn": "DC=SAMBA,DC=ORG", "name": b"samba.org",
2318 "objectUUID": b"0123456789abcde0"})
2319 self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG", "name": b"Admins",
2320 "objectUUID": b"0123456789abcde1"})
2321 self.l.add({"dn": "OU=USERS,DC=SAMBA,DC=ORG", "name": b"Users",
2322 "objectUUID": b"0123456789abcde2"})
2323 self.l.add({"dn": "OU=OU1,DC=SAMBA,DC=ORG", "name": b"OU #1",
2324 "objectUUID": b"0123456789abcde3"})
2325 self.l.add({"dn": "OU=OU2,DC=SAMBA,DC=ORG", "name": b"OU #2",
2326 "objectUUID": b"0123456789abcde4"})
2327 self.l.add({"dn": "OU=OU3,DC=SAMBA,DC=ORG", "name": b"OU #3",
2328 "objectUUID": b"0123456789abcde5"})
2329 self.l.add({"dn": "OU=OU4,DC=SAMBA,DC=ORG", "name": b"OU #4",
2330 "objectUUID": b"0123456789abcde6"})
2331 self.l.add({"dn": "OU=OU5,DC=SAMBA,DC=ORG", "name": b"OU #5",
2332 "objectUUID": b"0123456789abcde7"})
2333 self.l.add({"dn": "OU=OU6,DC=SAMBA,DC=ORG", "name": b"OU #6",
2334 "objectUUID": b"0123456789abcde8"})
2335 self.l.add({"dn": "OU=OU7,DC=SAMBA,DC=ORG", "name": b"OU #7",
2336 "objectUUID": b"0123456789abcde9"})
2337 self.l.add({"dn": "OU=OU8,DC=SAMBA,DC=ORG", "name": b"OU #8",
2338 "objectUUID": b"0123456789abcdea"})
2339 self.l.add({"dn": "OU=OU9,DC=SAMBA,DC=ORG", "name": b"OU #9",
2340 "objectUUID": b"0123456789abcdeb"})
2341 self.l.add({"dn": "OU=OU10,DC=SAMBA,DC=ORG", "name": b"OU #10",
2342 "objectUUID": b"0123456789abcdec"})
# Fixture teardown: remove the temporary directory stored in self.testdir,
# then run the parent class's tearDown.
2345 shutil.rmtree(self.testdir)
2346 super(LdbResultTests, self).tearDown()
2347 # Ensure the LDB is closed now, so we close the FD
# NOTE(review): the statement the comment above refers to is not visible
# in this listing -- confirm it releases self.l.
# The object returned by a search() with no arguments must have the string
# form "<ldb result>".
2350 def test_return_type(self):
2351 res = self.l.search()
2352 self.assertEqual(str(res), "<ldb result>")
# Runs a search() with no arguments.
# NOTE(review): what is then done with `res` is not visible in this
# listing; going by the test name it presumably reads res.msgs -- confirm.
2354 def test_get_msgs(self):
2355 res = self.l.search()
# Runs a search() with no arguments.
# NOTE(review): what is then done with `res` is not visible in this
# listing; going by the test name it presumably reads res.controls --
# confirm.
2358 def test_get_controls(self):
2359 res = self.l.search()
# Runs a search() with no arguments.
# NOTE(review): what is then done with `res` is not visible in this
# listing; going by the test name it presumably reads res.referals --
# confirm.
2362 def test_get_referals(self):
2363 res = self.l.search()
# Iterates over the .msgs attribute of a default search() result, looking
# for the entry whose DN is OU=OU10,DC=SAMBA,DC=ORG, and asserts that
# `found` is true afterwards.
# NOTE(review): the assignments to `found` are not visible in this listing.
2366 def test_iter_msgs(self):
2368 for l in self.l.search().msgs:
2369 if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
2371 self.assertTrue(found)
# Checks the .count attribute of search results: positive for a default
# search, and exactly 13 when the search base is DC=SAMBA,DC=ORG.
2373 def test_iter_msgs_count(self):
2374 self.assertTrue(self.l.search().count > 0)
2375 # 13 objects have been added at or below DC=SAMBA,DC=ORG
2376 self.assertEqual(self.l.search(base="DC=SAMBA,DC=ORG").count, 13)
# Reads the .controls attribute of a default search() result.
# NOTE(review): what is then done with `res` is not visible in this
# listing; going by the test name it is presumably iterated -- confirm.
2378 def test_iter_controls(self):
2379 res = self.l.search().controls
# Constructing ldb.Control from the string "tatayoyo:0" must raise
# ValueError, while "relax:1" must yield a control whose `critical` is True
# and whose `oid` is "1.3.6.1.4.1.4203.666.5.12".
2382 def test_create_control(self):
2383 self.assertRaises(ValueError, ldb.Control, self.l, "tatayoyo:0")
2384 c = ldb.Control(self.l, "relax:1")
2385 self.assertEqual(c.critical, True)
2386 self.assertEqual(c.oid, "1.3.6.1.4.1.4203.666.5.12")
# Reads the .referals attribute of a default search() result.
# NOTE(review): what is then done with `res` is not visible in this
# listing; going by the test name it is presumably iterated -- confirm.
2388 def test_iter_refs(self):
2389 res = self.l.search().referals
# Walks the .msgs of a default search() result by index (0 .. len(res)-1),
# looking for the entry whose DN is OU=OU10,DC=SAMBA,DC=ORG, and asserts
# that `found` is true afterwards.
# NOTE(review): the lines that bind `l` from `res` and that assign `found`
# are not visible in this listing -- presumably `l = res[i]`; confirm.
2392 def test_search_sequence_msgs(self):
2394 res = self.l.search().msgs
2396 for i in range(0, len(res)):
2398 if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
2400 self.assertTrue(found)
# Looks for the OU=OU10,DC=SAMBA,DC=ORG entry in a default search() result
# and asserts that `found` is true afterwards.
# NOTE(review): the loop header and the assignments to `found` are not
# visible in this listing; going by the test name the result object itself
# is presumably iterated -- confirm.
2402 def test_search_as_iter(self):
2404 res = self.l.search()
2407 if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
2409 self.assertTrue(found)
# Same check as the other iteration tests, but the results come from
# search_iterator() instead of search(): the OU=OU10,DC=SAMBA,DC=ORG entry
# must be seen and `found` must be true afterwards.
# NOTE(review): the loop header and the assignments to `found` are not
# visible in this listing.
2411 def test_search_iter(self):
2413 res = self.l.search_iterator()
2416 if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
2418 self.assertTrue(found)
2421 # Show that search results can't see into a transaction
# A second connection (child_ldb) adds OU=OU11 inside a transaction while
# this connection (self.l) searches for it before and after the commit.
# The two sides are sequenced with two pipes: b"added" and b"transaction"
# are written to w1 and read back from r1, and b"search" is written to w2.
# NOTE(review): the fork call, the branch separating child from parent,
# the read on r2, the `try:` matching the `except`, and the assignments to
# `found11` and `pid` are not visible in this listing.
2422 def test_search_against_trans(self):
2425 (r1, w1) = os.pipe()
2427 (r2, w2) = os.pipe()
2429 # For the first element, fork a child that will
2433 # In the child, re-open
2437 child_ldb = ldb.Ldb(self.url(), flags=self.flags())
2438 # start a transaction
2439 child_ldb.transaction_start()
2442 child_ldb.add({"dn": "OU=OU11,DC=SAMBA,DC=ORG",
2443 "name": b"samba.org",
2444 "objectUUID": b"o123456789acbdef"})
# Signal that the add has been issued inside the still-open transaction.
2446 os.write(w1, b"added")
2448 # Now wait for the search to be done
2453 child_ldb.transaction_commit()
2454 except LdbError as err:
2455 # We print this here to see what went wrong in the child
# Signal that the transaction step is over.
2459 os.write(w1, b"transaction")
# Read exactly 5 bytes, the length of b"added".
2462 self.assertEqual(os.read(r1, 5), b"added")
2464 # This should not turn up until the transaction is concluded
2465 res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
2466 scope=ldb.SCOPE_BASE)
2467 self.assertEqual(len(res11), 0)
# Signal that the first search has been done.
2469 os.write(w2, b"search")
2471 # Now wait for the transaction to be done. This should
2472 # deadlock, but the search doesn't hold a read lock for the
2473 # iterator lifetime currently.
# Read exactly 11 bytes, the length of b"transaction".
2474 self.assertEqual(os.read(r1, 11), b"transaction")
2476 # This should now turn up, as the transaction is over
2477 res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
2478 scope=ldb.SCOPE_BASE)
2479 self.assertEqual(len(res11), 1)
2481 self.assertFalse(found11)
# Wait for the process identified by `pid` and check waitpid reports it.
2483 (got_pid, status) = os.waitpid(pid, 0)
2484 self.assertEqual(got_pid, pid)
# Variant of the transaction-visibility test in which a search_iterator()
# result is created before the second connection starts its transaction
# and is consumed only later. OU=OU11 must be invisible to self.l until
# the b"transaction" message has been received, must be visible after it,
# and must not be among the entries yielded by the iterator (`found11`
# stays false while `found` for OU=OU10 becomes true).
# The two sides are sequenced with two pipes: b"added" and b"transaction"
# are written to w1 and read back from r1, and b"search" is written to w2.
# NOTE(review): the fork call, the branch separating child from parent,
# the read on r2, the `try:` matching the `except`, the loop over `res`,
# and the assignments to `found`, `found11` and `pid` are not visible in
# this listing.
2487 def test_search_iter_against_trans(self):
2491 # We need to hold this iterator open to hold the all-record
2493 res = self.l.search_iterator()
2495 (r1, w1) = os.pipe()
2497 (r2, w2) = os.pipe()
2499 # For the first element, with the sequence open (which
2500 # means with ldb locks held), fork a child that will
2504 # In the child, re-open
2509 child_ldb = ldb.Ldb(self.url(), flags=self.flags())
2510 # start a transaction
2511 child_ldb.transaction_start()
2514 child_ldb.add({"dn": "OU=OU11,DC=SAMBA,DC=ORG",
2515 "name": b"samba.org",
2516 "objectUUID": b"o123456789acbdef"})
# Signal that the add has been issued inside the still-open transaction.
2518 os.write(w1, b"added")
2520 # Now wait for the search to be done
2525 child_ldb.transaction_commit()
2526 except LdbError as err:
2527 # We print this here to see what went wrong in the child
# Signal that the transaction step is over.
2531 os.write(w1, b"transaction")
# Read exactly 5 bytes, the length of b"added".
2534 self.assertEqual(os.read(r1, 5), b"added")
2536 # This should not turn up until the transaction is concluded
2537 res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
2538 scope=ldb.SCOPE_BASE)
2539 self.assertEqual(len(res11), 0)
# Signal that the first search has been done.
2541 os.write(w2, b"search")
2543 # allow the transaction to start
2546 # This should not turn up until the search finishes and
2547 # removed the read lock, but for ldb_tdb that happened as soon
2548 # as we called the first res.next()
2549 res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
2550 scope=ldb.SCOPE_BASE)
2551 self.assertEqual(len(res11), 0)
2553 # These results are all collected at the first next(res) call
2555 if str(l.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
2557 if str(l.dn) == "OU=OU11,DC=SAMBA,DC=ORG":
2560 # Now wait for the transaction to be done.
# Read exactly 11 bytes, the length of b"transaction".
2561 self.assertEqual(os.read(r1, 11), b"transaction")
2563 # This should now turn up, as the transaction is over and all
2564 # read locks are gone
2565 res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
2566 scope=ldb.SCOPE_BASE)
2567 self.assertEqual(len(res11), 1)
2569 self.assertTrue(found)
2570 self.assertFalse(found11)
# Wait for the process identified by `pid` and check waitpid reports it.
2572 (got_pid, status) = os.waitpid(pid, 0)
2573 self.assertEqual(got_pid, pid)
# Subclass of LdbResultTests that sets self.prefix to MDB_PREFIX before
# delegating to the inherited setUp, and delegates tearDown unchanged.
# NOTE(review): the `def setUp` / `def tearDown` lines are not visible in
# this listing.
2576 class LdbResultTestsLmdb(LdbResultTests):
# The prefix is assigned before the inherited setUp runs.
2579 self.prefix = MDB_PREFIX
2580 super(LdbResultTestsLmdb, self).setUp()
2583 super(LdbResultTestsLmdb, self).tearDown()
# Each test passes an argument of the wrong type to an ldb API and expects
# TypeError.
# NOTE(review): the lines binding `l` and `m`, and one `def` line, are not
# visible in this listing (the embedded numbering has gaps).
2586 class BadTypeTests(TestCase):
# ldb.Control: a string as the first argument, then the ldb module object
# itself as the first argument together with an int as the second.
2587 def test_control(self):
2589 self.assertRaises(TypeError, ldb.Control, '<bad type>', 'relax:1')
2590 self.assertRaises(TypeError, ldb.Control, ldb, 1234)
# modify(): a string as the first argument, then `m` with a string as the
# second argument.
2592 def test_modify(self):
2594 dn = ldb.Dn(l, 'a=b')
2596 self.assertRaises(TypeError, l.modify, '<bad type>')
2597 self.assertRaises(TypeError, l.modify, m, '<bad type>')
# add(): same pattern as the modify() checks above. The `def` line for
# this group is not visible; presumably a separate test method -- confirm.
2601 dn = ldb.Dn(l, 'a=b')
2603 self.assertRaises(TypeError, l.add, '<bad type>')
2604 self.assertRaises(TypeError, l.add, m, '<bad type>')
# NOTE(review): despite its name, this test calls l.add, not l.delete, so
# delete() is not exercised by the visible assertions. Confirm whether
# that is intended before changing it; the same arguments are not known to
# raise TypeError from delete().
2606 def test_delete(self):
2608 dn = ldb.Dn(l, 'a=b')
2609 self.assertRaises(TypeError, l.add, '<bad type>')
2610 self.assertRaises(TypeError, l.add, dn, '<bad type>')
# NOTE(review): likewise this test calls l.add, not l.rename, so rename()
# is not exercised by the visible assertions -- confirm whether intended.
2612 def test_rename(self):
2614 dn = ldb.Dn(l, 'a=b')
2615 self.assertRaises(TypeError, l.add, '<bad type>', dn)
2616 self.assertRaises(TypeError, l.add, dn, '<bad type>')
2617 self.assertRaises(TypeError, l.add, dn, dn, '<bad type>')
# search(): one wrongly typed keyword argument per call.
2619 def test_search(self):
2621 self.assertRaises(TypeError, l.search, base=1234)
2622 self.assertRaises(TypeError, l.search, scope='<bad type>')
2623 self.assertRaises(TypeError, l.search, expression=1234)
2624 self.assertRaises(TypeError, l.search, attrs='<bad type>')
2625 self.assertRaises(TypeError, l.search, controls='<bad type>')
# Checks that the ldb module exposes __version__ as a str instance.
2628 class VersionTests(TestCase):
2630 def test_version(self):
2631 self.assertTrue(isinstance(ldb.__version__, str))
# Script entry point: when the file is run directly, hand control to
# unittest.TestProgram().
# NOTE(review): a line between these two is not visible in this listing.
2634 if __name__ == '__main__':
2636 unittest.TestProgram()