2 # Simple tests for the ldb python bindings.
3 # Copyright (C) 2007 Jelmer Vernooij <jelmer@samba.org>
6 from unittest import TestCase
13 PY3 = sys.version_info > (3, 0)
22 dir_prefix = os.path.join(os.environ["SELFTEST_PREFIX"], "tmp")
25 return tempfile.mkdtemp(dir=dir_prefix)
class NoContextTests(TestCase):
    """Checks for module-level ldb helpers that are called without an Ldb handle."""

    def test_valid_attr_name(self):
        """"foo" is accepted as an attribute name, "24foo" is rejected."""
        accepted = ldb.valid_attr_name("foo")
        self.assertTrue(accepted)
        rejected = ldb.valid_attr_name("24foo")
        self.assertFalse(rejected)

    def test_timestring(self):
        """Epoch seconds are rendered as generalized-time strings."""
        samples = (
            (0, "19700101000000.0Z"),
            (1195499412, "20071119191012.0Z"),
        )
        for seconds, rendered in samples:
            self.assertEqual(rendered, ldb.timestring(seconds))

    def test_string_to_time(self):
        """Generalized-time strings are parsed back to epoch seconds."""
        samples = (
            ("19700101000000.0Z", 0),
            ("20071119191012.0Z", 1195499412),
        )
        for rendered, seconds in samples:
            self.assertEqual(seconds, ldb.string_to_time(rendered))

    def test_binary_encode(self):
        """binary_encode round-trips through binary_decode and treats str like bytes."""
        raw = b'test\\x'
        from_bytes = ldb.binary_encode(raw)
        round_trip = ldb.binary_decode(from_bytes)
        self.assertEqual(round_trip, raw)

        # The text form of the same value must encode identically.
        from_text = ldb.binary_encode('test\\x')
        self.assertEqual(from_text, from_bytes)
51 class LdbBaseTest(TestCase):
53 super(LdbBaseTest, self).setUp()
55 if self.prefix is None:
56 self.prefix = TDB_PREFIX
57 except AttributeError:
58 self.prefix = TDB_PREFIX
61 super(LdbBaseTest, self).tearDown()
64 return self.prefix + self.filename
67 if self.prefix == MDB_PREFIX:
73 class SimpleLdb(LdbBaseTest):
76 super(SimpleLdb, self).setUp()
77 self.testdir = tempdir()
78 self.filename = os.path.join(self.testdir, "test.ldb")
79 self.ldb = ldb.Ldb(self.url(), flags=self.flags())
81 self.ldb.add(self.index)
82 except AttributeError:
86 shutil.rmtree(self.testdir)
87 super(SimpleLdb, self).tearDown()
88 # Ensure the LDB is closed now, so we close the FD
def test_connect(self):
    """Opening the test URL through the Ldb constructor must not raise."""
    target = self.url()
    ldb.Ldb(target, flags=self.flags())
94 def test_connect_none(self):
97 def test_connect_later(self):
99 x.connect(self.url(), flags=self.flags())
103 self.assertTrue(repr(x).startswith("<ldb connection"))
105 def test_set_create_perms(self):
107 x.set_create_perms(0o600)
109 def test_modules_none(self):
111 self.assertEqual([], x.modules())
def test_modules_tdb(self):
    """A connected handle reports exactly the 'tdb' module in modules()."""
    conn = ldb.Ldb(self.url(), flags=self.flags())
    listing = repr(conn.modules())
    self.assertEqual("[<ldb module 'tdb'>]", listing)
117 def test_firstmodule_none(self):
119 self.assertEqual(x.firstmodule, None)
121 def test_firstmodule_tdb(self):
122 x = ldb.Ldb(self.url(), flags=self.flags())
124 self.assertEqual(repr(mod), "<ldb module 'tdb'>")
def test_search(self):
    """An argument-less search on the test database returns no entries."""
    db = ldb.Ldb(self.url(), flags=self.flags())
    found = db.search()
    self.assertEqual(len(found), 0)
def test_search_controls(self):
    """A search carrying a paged_results control is accepted and returns no entries."""
    db = ldb.Ldb(self.url(), flags=self.flags())
    found = db.search(controls=["paged_results:0:5"])
    self.assertEqual(len(found), 0)
def test_search_attrs(self):
    """A subtree search from an empty Dn object with an attribute list returns no entries."""
    db = ldb.Ldb(self.url(), flags=self.flags())
    root = ldb.Dn(db, "")
    found = db.search(root, ldb.SCOPE_SUBTREE, "(dc=*)", ["dc"])
    self.assertEqual(len(found), 0)
def test_search_string_dn(self):
    """search() accepts the base DN as a plain string instead of an ldb.Dn."""
    db = ldb.Ldb(self.url(), flags=self.flags())
    found = db.search("", ldb.SCOPE_SUBTREE, "(dc=*)", ["dc"])
    self.assertEqual(len(found), 0)
def test_search_attr_string(self):
    """Passing attrs as a bare str or bytes (not a list) raises TypeError."""
    db = ldb.Ldb(self.url(), flags=self.flags())
    for bad_attrs in ("dc", b"dc"):
        with self.assertRaises(TypeError):
            db.search(attrs=bad_attrs)
def test_opaque(self):
    """A stored opaque value can be fetched back; an unknown name yields None."""
    db = ldb.Ldb(self.url(), flags=self.flags())
    # The handle itself serves as the opaque payload.
    db.set_opaque("my_opaque", db)
    stored = db.get_opaque("my_opaque")
    self.assertIsNotNone(stored)
    missing = db.get_opaque("unknown")
    self.assertEqual(None, missing)
153 def test_search_scope_base_empty_db(self):
154 l = ldb.Ldb(self.url(), flags=self.flags())
155 self.assertEqual(len(l.search(ldb.Dn(l, "dc=foo1"),
def test_search_scope_onelevel_empty_db(self):
    """A ONELEVEL search below dc=foo1 is expected to return no entries."""
    db = ldb.Ldb(self.url(), flags=self.flags())
    parent = ldb.Dn(db, "dc=foo1")
    children = db.search(parent, ldb.SCOPE_ONELEVEL)
    self.assertEqual(len(children), 0)
def test_delete(self):
    """Deleting dc=foo2, which this test never adds, raises LdbError."""
    db = ldb.Ldb(self.url(), flags=self.flags())
    with self.assertRaises(ldb.LdbError):
        db.delete(ldb.Dn(db, "dc=foo2"))
167 def test_delete_w_unhandled_ctrl(self):
168 l = ldb.Ldb(self.url(), flags=self.flags())
170 m.dn = ldb.Dn(l, "dc=foo1")
172 m["objectUUID"] = b"0123456789abcdef"
174 self.assertRaises(ldb.LdbError, lambda: l.delete(m.dn, ["search_options:1:2"]))
177 def test_contains(self):
179 l = ldb.Ldb(name, flags=self.flags())
180 self.assertFalse(ldb.Dn(l, "dc=foo3") in l)
181 l = ldb.Ldb(name, flags=self.flags())
183 m.dn = ldb.Dn(l, "dc=foo3")
185 m["objectUUID"] = b"0123456789abcdef"
188 self.assertTrue(ldb.Dn(l, "dc=foo3") in l)
189 self.assertFalse(ldb.Dn(l, "dc=foo4") in l)
def test_get_config_basedn(self):
    """get_config_basedn() returns None on the test database."""
    db = ldb.Ldb(self.url(), flags=self.flags())
    basedn = db.get_config_basedn()
    self.assertEqual(None, basedn)
def test_get_root_basedn(self):
    """get_root_basedn() returns None on the test database."""
    db = ldb.Ldb(self.url(), flags=self.flags())
    basedn = db.get_root_basedn()
    self.assertEqual(None, basedn)
def test_get_schema_basedn(self):
    """get_schema_basedn() returns None on the test database."""
    db = ldb.Ldb(self.url(), flags=self.flags())
    basedn = db.get_schema_basedn()
    self.assertEqual(None, basedn)
def test_get_default_basedn(self):
    """get_default_basedn() returns None on the test database."""
    db = ldb.Ldb(self.url(), flags=self.flags())
    basedn = db.get_default_basedn()
    self.assertEqual(None, basedn)
210 l = ldb.Ldb(self.url(), flags=self.flags())
212 m.dn = ldb.Dn(l, "dc=foo4")
214 m["objectUUID"] = b"0123456789abcdef"
215 self.assertEqual(len(l.search()), 0)
218 self.assertEqual(len(l.search()), 1)
220 l.delete(ldb.Dn(l, "dc=foo4"))
222 def test_search_iterator(self):
223 l = ldb.Ldb(self.url(), flags=self.flags())
224 s = l.search_iterator()
230 except RuntimeError as re:
235 except RuntimeError as re:
240 except RuntimeError as re:
243 s = l.search_iterator()
246 self.assertTrue(isinstance(me, ldb.Message))
249 self.assertEqual(len(r), 0)
250 self.assertEqual(count, 0)
253 m1.dn = ldb.Dn(l, "dc=foo4")
255 m1["objectUUID"] = b"0123456789abcdef"
258 s = l.search_iterator()
261 self.assertTrue(isinstance(me, ldb.Message))
265 self.assertEqual(len(r), 0)
266 self.assertEqual(len(msgs), 1)
267 self.assertEqual(msgs[0].dn, m1.dn)
270 m2.dn = ldb.Dn(l, "dc=foo5")
272 m2["objectUUID"] = b"0123456789abcdee"
275 s = l.search_iterator()
278 self.assertTrue(isinstance(me, ldb.Message))
282 self.assertEqual(len(r), 0)
283 self.assertEqual(len(msgs), 2)
284 if msgs[0].dn == m1.dn:
285 self.assertEqual(msgs[0].dn, m1.dn)
286 self.assertEqual(msgs[1].dn, m2.dn)
288 self.assertEqual(msgs[0].dn, m2.dn)
289 self.assertEqual(msgs[1].dn, m1.dn)
291 s = l.search_iterator()
294 self.assertTrue(isinstance(me, ldb.Message))
301 except RuntimeError as re:
304 self.assertTrue(isinstance(me, ldb.Message))
312 self.assertEqual(len(r), 0)
313 self.assertEqual(len(msgs), 2)
314 if msgs[0].dn == m1.dn:
315 self.assertEqual(msgs[0].dn, m1.dn)
316 self.assertEqual(msgs[1].dn, m2.dn)
318 self.assertEqual(msgs[0].dn, m2.dn)
319 self.assertEqual(msgs[1].dn, m1.dn)
321 l.delete(ldb.Dn(l, "dc=foo4"))
322 l.delete(ldb.Dn(l, "dc=foo5"))
324 def test_add_text(self):
325 l = ldb.Ldb(self.url(), flags=self.flags())
327 m.dn = ldb.Dn(l, "dc=foo4")
329 m["objectUUID"] = b"0123456789abcdef"
330 self.assertEqual(len(l.search()), 0)
333 self.assertEqual(len(l.search()), 1)
335 l.delete(ldb.Dn(l, "dc=foo4"))
337 def test_add_w_unhandled_ctrl(self):
338 l = ldb.Ldb(self.url(), flags=self.flags())
340 m.dn = ldb.Dn(l, "dc=foo4")
342 self.assertEqual(len(l.search()), 0)
343 self.assertRaises(ldb.LdbError, lambda: l.add(m,["search_options:1:2"]))
345 def test_add_dict(self):
346 l = ldb.Ldb(self.url(), flags=self.flags())
347 m = {"dn": ldb.Dn(l, "dc=foo5"),
349 "objectUUID": b"0123456789abcdef"}
350 self.assertEqual(len(l.search()), 0)
353 self.assertEqual(len(l.search()), 1)
355 l.delete(ldb.Dn(l, "dc=foo5"))
357 def test_add_dict_text(self):
358 l = ldb.Ldb(self.url(), flags=self.flags())
359 m = {"dn": ldb.Dn(l, "dc=foo5"),
361 "objectUUID": b"0123456789abcdef"}
362 self.assertEqual(len(l.search()), 0)
365 self.assertEqual(len(l.search()), 1)
367 l.delete(ldb.Dn(l, "dc=foo5"))
369 def test_add_dict_string_dn(self):
370 l = ldb.Ldb(self.url(), flags=self.flags())
371 m = {"dn": "dc=foo6", "bla": b"bla",
372 "objectUUID": b"0123456789abcdef"}
373 self.assertEqual(len(l.search()), 0)
376 self.assertEqual(len(l.search()), 1)
378 l.delete(ldb.Dn(l, "dc=foo6"))
380 def test_add_dict_bytes_dn(self):
381 l = ldb.Ldb(self.url(), flags=self.flags())
382 m = {"dn": b"dc=foo6", "bla": b"bla",
383 "objectUUID": b"0123456789abcdef"}
384 self.assertEqual(len(l.search()), 0)
387 self.assertEqual(len(l.search()), 1)
389 l.delete(ldb.Dn(l, "dc=foo6"))
391 def test_rename(self):
392 l = ldb.Ldb(self.url(), flags=self.flags())
394 m.dn = ldb.Dn(l, "dc=foo7")
396 m["objectUUID"] = b"0123456789abcdef"
397 self.assertEqual(len(l.search()), 0)
400 l.rename(ldb.Dn(l, "dc=foo7"), ldb.Dn(l, "dc=bar"))
401 self.assertEqual(len(l.search()), 1)
403 l.delete(ldb.Dn(l, "dc=bar"))
405 def test_rename_string_dns(self):
406 l = ldb.Ldb(self.url(), flags=self.flags())
408 m.dn = ldb.Dn(l, "dc=foo8")
410 m["objectUUID"] = b"0123456789abcdef"
411 self.assertEqual(len(l.search()), 0)
413 self.assertEqual(len(l.search()), 1)
415 l.rename("dc=foo8", "dc=bar")
416 self.assertEqual(len(l.search()), 1)
418 l.delete(ldb.Dn(l, "dc=bar"))
420 def test_empty_dn(self):
421 l = ldb.Ldb(self.url(), flags=self.flags())
422 self.assertEqual(0, len(l.search()))
424 m.dn = ldb.Dn(l, "dc=empty")
425 m["objectUUID"] = b"0123456789abcdef"
428 self.assertEqual(1, len(rm))
429 self.assertEqual(set(["dn", "distinguishedName", "objectUUID"]),
433 self.assertEqual(1, len(rm))
434 self.assertEqual(set(["dn", "distinguishedName", "objectUUID"]),
436 rm = l.search(m.dn, attrs=["blah"])
437 self.assertEqual(1, len(rm))
438 self.assertEqual(0, len(rm[0]))
440 def test_modify_delete(self):
441 l = ldb.Ldb(self.url(), flags=self.flags())
443 m.dn = ldb.Dn(l, "dc=modifydelete")
445 m["objectUUID"] = b"0123456789abcdef"
447 rm = l.search(m.dn)[0]
448 self.assertEqual([b"1234"], list(rm["bla"]))
451 m.dn = ldb.Dn(l, "dc=modifydelete")
452 m["bla"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE, "bla")
453 self.assertEqual(ldb.FLAG_MOD_DELETE, m["bla"].flags())
456 self.assertEqual(1, len(rm))
457 self.assertEqual(set(["dn", "distinguishedName", "objectUUID"]),
459 rm = l.search(m.dn, attrs=["bla"])
460 self.assertEqual(1, len(rm))
461 self.assertEqual(0, len(rm[0]))
463 l.delete(ldb.Dn(l, "dc=modifydelete"))
465 def test_modify_delete_text(self):
466 l = ldb.Ldb(self.url(), flags=self.flags())
468 m.dn = ldb.Dn(l, "dc=modifydelete")
469 m.text["bla"] = ["1234"]
470 m["objectUUID"] = b"0123456789abcdef"
472 rm = l.search(m.dn)[0]
473 self.assertEqual(["1234"], list(rm.text["bla"]))
476 m.dn = ldb.Dn(l, "dc=modifydelete")
477 m["bla"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE, "bla")
478 self.assertEqual(ldb.FLAG_MOD_DELETE, m["bla"].flags())
481 self.assertEqual(1, len(rm))
482 self.assertEqual(set(["dn", "distinguishedName", "objectUUID"]),
484 rm = l.search(m.dn, attrs=["bla"])
485 self.assertEqual(1, len(rm))
486 self.assertEqual(0, len(rm[0]))
488 l.delete(ldb.Dn(l, "dc=modifydelete"))
490 def test_modify_add(self):
491 l = ldb.Ldb(self.url(), flags=self.flags())
493 m.dn = ldb.Dn(l, "dc=add")
495 m["objectUUID"] = b"0123456789abcdef"
499 m.dn = ldb.Dn(l, "dc=add")
500 m["bla"] = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
501 self.assertEqual(ldb.FLAG_MOD_ADD, m["bla"].flags())
503 rm = l.search(m.dn)[0]
504 self.assertEqual(3, len(rm))
505 self.assertEqual([b"1234", b"456"], list(rm["bla"]))
507 l.delete(ldb.Dn(l, "dc=add"))
509 def test_modify_add_text(self):
510 l = ldb.Ldb(self.url(), flags=self.flags())
512 m.dn = ldb.Dn(l, "dc=add")
513 m.text["bla"] = ["1234"]
514 m["objectUUID"] = b"0123456789abcdef"
518 m.dn = ldb.Dn(l, "dc=add")
519 m["bla"] = ldb.MessageElement(["456"], ldb.FLAG_MOD_ADD, "bla")
520 self.assertEqual(ldb.FLAG_MOD_ADD, m["bla"].flags())
522 rm = l.search(m.dn)[0]
523 self.assertEqual(3, len(rm))
524 self.assertEqual(["1234", "456"], list(rm.text["bla"]))
526 l.delete(ldb.Dn(l, "dc=add"))
528 def test_modify_replace(self):
529 l = ldb.Ldb(self.url(), flags=self.flags())
531 m.dn = ldb.Dn(l, "dc=modify2")
532 m["bla"] = [b"1234", b"456"]
533 m["objectUUID"] = b"0123456789abcdef"
537 m.dn = ldb.Dn(l, "dc=modify2")
538 m["bla"] = ldb.MessageElement([b"789"], ldb.FLAG_MOD_REPLACE, "bla")
539 self.assertEqual(ldb.FLAG_MOD_REPLACE, m["bla"].flags())
541 rm = l.search(m.dn)[0]
542 self.assertEqual(3, len(rm))
543 self.assertEqual([b"789"], list(rm["bla"]))
544 rm = l.search(m.dn, attrs=["bla"])[0]
545 self.assertEqual(1, len(rm))
547 l.delete(ldb.Dn(l, "dc=modify2"))
549 def test_modify_replace_text(self):
550 l = ldb.Ldb(self.url(), flags=self.flags())
552 m.dn = ldb.Dn(l, "dc=modify2")
553 m.text["bla"] = ["1234", "456"]
554 m["objectUUID"] = b"0123456789abcdef"
558 m.dn = ldb.Dn(l, "dc=modify2")
559 m["bla"] = ldb.MessageElement(["789"], ldb.FLAG_MOD_REPLACE, "bla")
560 self.assertEqual(ldb.FLAG_MOD_REPLACE, m["bla"].flags())
562 rm = l.search(m.dn)[0]
563 self.assertEqual(3, len(rm))
564 self.assertEqual(["789"], list(rm.text["bla"]))
565 rm = l.search(m.dn, attrs=["bla"])[0]
566 self.assertEqual(1, len(rm))
568 l.delete(ldb.Dn(l, "dc=modify2"))
570 def test_modify_flags_change(self):
571 l = ldb.Ldb(self.url(), flags=self.flags())
573 m.dn = ldb.Dn(l, "dc=add")
575 m["objectUUID"] = b"0123456789abcdef"
579 m.dn = ldb.Dn(l, "dc=add")
580 m["bla"] = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
581 self.assertEqual(ldb.FLAG_MOD_ADD, m["bla"].flags())
583 rm = l.search(m.dn)[0]
584 self.assertEqual(3, len(rm))
585 self.assertEqual([b"1234", b"456"], list(rm["bla"]))
587 # Now create another modify, but switch the flags before we do it
588 m["bla"] = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
589 m["bla"].set_flags(ldb.FLAG_MOD_DELETE)
591 rm = l.search(m.dn, attrs=["bla"])[0]
592 self.assertEqual(1, len(rm))
593 self.assertEqual([b"1234"], list(rm["bla"]))
595 l.delete(ldb.Dn(l, "dc=add"))
597 def test_modify_flags_change_text(self):
598 l = ldb.Ldb(self.url(), flags=self.flags())
600 m.dn = ldb.Dn(l, "dc=add")
601 m.text["bla"] = ["1234"]
602 m["objectUUID"] = b"0123456789abcdef"
606 m.dn = ldb.Dn(l, "dc=add")
607 m["bla"] = ldb.MessageElement(["456"], ldb.FLAG_MOD_ADD, "bla")
608 self.assertEqual(ldb.FLAG_MOD_ADD, m["bla"].flags())
610 rm = l.search(m.dn)[0]
611 self.assertEqual(3, len(rm))
612 self.assertEqual(["1234", "456"], list(rm.text["bla"]))
614 # Now create another modify, but switch the flags before we do it
615 m["bla"] = ldb.MessageElement(["456"], ldb.FLAG_MOD_ADD, "bla")
616 m["bla"].set_flags(ldb.FLAG_MOD_DELETE)
618 rm = l.search(m.dn, attrs=["bla"])[0]
619 self.assertEqual(1, len(rm))
620 self.assertEqual(["1234"], list(rm.text["bla"]))
622 l.delete(ldb.Dn(l, "dc=add"))
624 def test_transaction_commit(self):
625 l = ldb.Ldb(self.url(), flags=self.flags())
626 l.transaction_start()
627 m = ldb.Message(ldb.Dn(l, "dc=foo9"))
629 m["objectUUID"] = b"0123456789abcdef"
631 l.transaction_commit()
634 def test_transaction_cancel(self):
635 l = ldb.Ldb(self.url(), flags=self.flags())
636 l.transaction_start()
637 m = ldb.Message(ldb.Dn(l, "dc=foo10"))
639 m["objectUUID"] = b"0123456789abcdee"
641 l.transaction_cancel()
642 self.assertEqual(0, len(l.search(ldb.Dn(l, "dc=foo10"))))
644 def test_set_debug(self):
645 def my_report_fn(level, text):
647 l = ldb.Ldb(self.url(), flags=self.flags())
648 l.set_debug(my_report_fn)
650 def test_zero_byte_string(self):
651 """Testing we do not get trapped in the \0 byte in a property string."""
652 l = ldb.Ldb(self.url(), flags=self.flags())
655 "objectclass" : b"user",
656 "cN" : b"LDAPtestUSER",
657 "givenname" : b"ldap",
658 "displayname" : b"foo\0bar",
659 "objectUUID" : b"0123456789abcdef"
661 res = l.search(expression="(dn=dc=somedn)")
662 self.assertEqual(b"foo\0bar", res[0]["displayname"][0])
def test_no_crash_broken_expr(self):
    """A filter missing its outer parentheses is reported as LdbError."""
    db = ldb.Ldb(self.url(), flags=self.flags())
    with self.assertRaises(ldb.LdbError):
        db.search("", ldb.SCOPE_SUBTREE, "&(dc=*)(dn=*)", ["dc"])
668 # Run the SimpleLdb tests against an lmdb backend
669 class SimpleLdbLmdb(SimpleLdb):
672 self.prefix = MDB_PREFIX
673 self.index = {"dn": "@INDEXLIST",
675 "@IDXGUID": [b"objectUUID"],
676 "@IDX_DN_GUID": [b"GUID"]}
677 super(SimpleLdbLmdb, self).setUp()
680 super(SimpleLdbLmdb, self).tearDown()
682 class SearchTests(LdbBaseTest):
684 shutil.rmtree(self.testdir)
685 super(SearchTests, self).tearDown()
687 # Ensure the LDB is closed now, so we close the FD
692 super(SearchTests, self).setUp()
693 self.testdir = tempdir()
694 self.filename = os.path.join(self.testdir, "search_test.ldb")
695 self.l = ldb.Ldb(self.url(),
697 options=["modules:rdn_name"])
699 self.l.add(self.index)
700 except AttributeError:
703 self.l.add({"dn": "@ATTRIBUTES",
704 "DC": "CASE_INSENSITIVE"})
706 # Note that we can't use the name objectGUID here, as we
707 # want to stay clear of the objectGUID handler in LDB and
708 # instead use just the 16 bytes raw, which we just keep
709 # to printable chars here for ease of handling.
711 self.l.add({"dn": "DC=SAMBA,DC=ORG",
712 "name": b"samba.org",
713 "objectUUID": b"0123456789abcdef"})
714 self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG",
717 "objectUUID": b"0123456789abcde1"})
718 self.l.add({"dn": "OU=USERS,DC=SAMBA,DC=ORG",
721 "objectUUID": b"0123456789abcde2"})
722 self.l.add({"dn": "OU=OU1,DC=SAMBA,DC=ORG",
725 "objectUUID": b"0123456789abcde3"})
726 self.l.add({"dn": "OU=OU2,DC=SAMBA,DC=ORG",
729 "objectUUID": b"0123456789abcde4"})
730 self.l.add({"dn": "OU=OU3,DC=SAMBA,DC=ORG",
733 "objectUUID": b"0123456789abcde5"})
734 self.l.add({"dn": "OU=OU4,DC=SAMBA,DC=ORG",
737 "objectUUID": b"0123456789abcde6"})
738 self.l.add({"dn": "OU=OU5,DC=SAMBA,DC=ORG",
741 "objectUUID": b"0123456789abcde7"})
742 self.l.add({"dn": "OU=OU6,DC=SAMBA,DC=ORG",
745 "objectUUID": b"0123456789abcde8"})
746 self.l.add({"dn": "OU=OU7,DC=SAMBA,DC=ORG",
749 "objectUUID": b"0123456789abcde9"})
750 self.l.add({"dn": "OU=OU8,DC=SAMBA,DC=ORG",
753 "objectUUID": b"0123456789abcde0"})
754 self.l.add({"dn": "OU=OU9,DC=SAMBA,DC=ORG",
757 "objectUUID": b"0123456789abcdea"})
758 self.l.add({"dn": "OU=OU10,DC=SAMBA,DC=ORG",
761 "objectUUID": b"0123456789abcdeb"})
762 self.l.add({"dn": "OU=OU11,DC=SAMBA,DC=ORG",
765 "objectUUID": b"0123456789abcdec"})
766 self.l.add({"dn": "OU=OU12,DC=SAMBA,DC=ORG",
769 "objectUUID": b"0123456789abcded"})
770 self.l.add({"dn": "OU=OU13,DC=SAMBA,DC=ORG",
773 "objectUUID": b"0123456789abcdee"})
774 self.l.add({"dn": "OU=OU14,DC=SAMBA,DC=ORG",
777 "objectUUID": b"0123456789abcd01"})
778 self.l.add({"dn": "OU=OU15,DC=SAMBA,DC=ORG",
781 "objectUUID": b"0123456789abcd02"})
782 self.l.add({"dn": "OU=OU16,DC=SAMBA,DC=ORG",
785 "objectUUID": b"0123456789abcd03"})
786 self.l.add({"dn": "OU=OU17,DC=SAMBA,DC=ORG",
789 "objectUUID": b"0123456789abcd04"})
790 self.l.add({"dn": "OU=OU18,DC=SAMBA,DC=ORG",
793 "objectUUID": b"0123456789abcd05"})
794 self.l.add({"dn": "OU=OU19,DC=SAMBA,DC=ORG",
797 "objectUUID": b"0123456789abcd06"})
798 self.l.add({"dn": "OU=OU20,DC=SAMBA,DC=ORG",
801 "objectUUID": b"0123456789abcd07"})
802 self.l.add({"dn": "OU=OU21,DC=SAMBA,DC=ORG",
805 "objectUUID": b"0123456789abcd08"})
806 self.l.add({"dn": "OU=OU22,DC=SAMBA,DC=ORG",
809 "objectUUID": b"0123456789abcd09"})
812 """Testing a search"""
814 res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
815 scope=ldb.SCOPE_BASE)
816 self.assertEqual(len(res11), 1)
def test_base_lower(self):
    """BASE search on OU=OU11 with lower-case DC components finds one entry."""
    base_dn = "OU=OU11,DC=samba,DC=org"
    hits = self.l.search(base=base_dn, scope=ldb.SCOPE_BASE)
    self.assertEqual(len(hits), 1)
def test_base_or(self):
    """BASE search on OU=OU11 with (ou=ou11 OR ou=ou12) finds one entry."""
    query = "(|(ou=ou11)(ou=ou12))"
    hits = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
                         scope=ldb.SCOPE_BASE,
                         expression=query)
    self.assertEqual(len(hits), 1)
def test_base_or2(self):
    """BASE search on OU=OU11 with (x=y OR y=b) finds one entry."""
    query = "(|(x=y)(y=b))"
    hits = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
                         scope=ldb.SCOPE_BASE,
                         expression=query)
    self.assertEqual(len(hits), 1)
def test_base_and(self):
    """BASE search on OU=OU11 with (ou=ou11 AND ou=ou12) finds nothing."""
    query = "(&(ou=ou11)(ou=ou12))"
    hits = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
                         scope=ldb.SCOPE_BASE,
                         expression=query)
    self.assertEqual(len(hits), 0)
def test_base_and2(self):
    """BASE search on OU=OU11 with (x=y AND y=a) finds one entry."""
    query = "(&(x=y)(y=a))"
    hits = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
                         scope=ldb.SCOPE_BASE,
                         expression=query)
    self.assertEqual(len(hits), 1)
def test_base_false(self):
    """BASE search on OU=OU11 with (ou=ou13 OR ou=ou12) finds nothing."""
    query = "(|(ou=ou13)(ou=ou12))"
    hits = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
                         scope=ldb.SCOPE_BASE,
                         expression=query)
    self.assertEqual(len(hits), 0)
def test_check_base_false(self):
    """BASE search on an existing base with a non-matching OR filter finds nothing."""
    # NOTE(review): issues the same search and expectation as test_base_false.
    query = "(|(ou=ou13)(ou=ou12))"
    hits = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
                         scope=ldb.SCOPE_BASE,
                         expression=query)
    self.assertEqual(len(hits), 0)
872 def test_check_base_error(self):
873 """Testing a search"""
874 checkbaseonsearch = {"dn": "@OPTIONS",
875 "checkBaseOnSearch": b"TRUE"}
877 self.l.add(checkbaseonsearch)
878 except ldb.LdbError as err:
880 self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)
881 m = ldb.Message.from_dict(self.l,
886 res11 = self.l.search(base="OU=OU11x,DC=SAMBA,DC=ORG",
887 scope=ldb.SCOPE_BASE,
888 expression="(|(ou=ou13)(ou=ou12))")
889 self.fail("Should have failed on missing base")
890 except ldb.LdbError as err:
892 self.assertEqual(enum, ldb.ERR_NO_SUCH_OBJECT)
def test_subtree_and(self):
    """SUBTREE search with (ou=ou11 AND ou=ou12) finds nothing."""
    query = "(&(ou=ou11)(ou=ou12))"
    hits = self.l.search(base="DC=SAMBA,DC=ORG",
                         scope=ldb.SCOPE_SUBTREE,
                         expression=query)
    self.assertEqual(len(hits), 0)
def test_subtree_and2(self):
    """SUBTREE search with x=y AND (y=b OR y=c) finds one entry."""
    query = "(&(x=y)(|(y=b)(y=c)))"
    hits = self.l.search(base="DC=SAMBA,DC=ORG",
                         scope=ldb.SCOPE_SUBTREE,
                         expression=query)
    self.assertEqual(len(hits), 1)
def test_subtree_and2_lower(self):
    """Same filter as x=y AND (y=b OR y=c), but with a lower-case base DN."""
    query = "(&(x=y)(|(y=b)(y=c)))"
    hits = self.l.search(base="DC=samba,DC=org",
                         scope=ldb.SCOPE_SUBTREE,
                         expression=query)
    self.assertEqual(len(hits), 1)
def test_subtree_or(self):
    """SUBTREE search with (ou=ou11 OR ou=ou12) finds two entries."""
    query = "(|(ou=ou11)(ou=ou12))"
    hits = self.l.search(base="DC=SAMBA,DC=ORG",
                         scope=ldb.SCOPE_SUBTREE,
                         expression=query)
    self.assertEqual(len(hits), 2)
def test_subtree_or2(self):
    """SUBTREE search with (x=y OR y=b) finds twenty entries."""
    query = "(|(x=y)(y=b))"
    hits = self.l.search(base="DC=SAMBA,DC=ORG",
                         scope=ldb.SCOPE_SUBTREE,
                         expression=query)
    self.assertEqual(len(hits), 20)
def test_subtree_or3(self):
    """SUBTREE search with (x=y OR y=b OR y=c) finds twenty-two entries."""
    query = "(|(x=y)(y=b)(y=c))"
    hits = self.l.search(base="DC=SAMBA,DC=ORG",
                         scope=ldb.SCOPE_SUBTREE,
                         expression=query)
    self.assertEqual(len(hits), 22)
def test_one_and(self):
    """ONELEVEL search with (ou=ou11 AND ou=ou12) finds nothing."""
    query = "(&(ou=ou11)(ou=ou12))"
    hits = self.l.search(base="DC=SAMBA,DC=ORG",
                         scope=ldb.SCOPE_ONELEVEL,
                         expression=query)
    self.assertEqual(len(hits), 0)
def test_one_and2(self):
    """ONELEVEL search with (x=y AND y=b) finds one entry."""
    query = "(&(x=y)(y=b))"
    hits = self.l.search(base="DC=SAMBA,DC=ORG",
                         scope=ldb.SCOPE_ONELEVEL,
                         expression=query)
    self.assertEqual(len(hits), 1)
def test_one_or(self):
    """ONELEVEL search with (ou=ou11 OR ou=ou12) finds two entries."""
    query = "(|(ou=ou11)(ou=ou12))"
    hits = self.l.search(base="DC=SAMBA,DC=ORG",
                         scope=ldb.SCOPE_ONELEVEL,
                         expression=query)
    self.assertEqual(len(hits), 2)
def test_one_or2(self):
    """ONELEVEL search with (x=y OR y=b) finds twenty entries."""
    query = "(|(x=y)(y=b))"
    hits = self.l.search(base="DC=SAMBA,DC=ORG",
                         scope=ldb.SCOPE_ONELEVEL,
                         expression=query)
    self.assertEqual(len(hits), 20)
def test_one_or2_lower(self):
    """ONELEVEL (x=y OR y=b) search with a lower-case base DN finds twenty entries."""
    query = "(|(x=y)(y=b))"
    hits = self.l.search(base="DC=samba,DC=org",
                         scope=ldb.SCOPE_ONELEVEL,
                         expression=query)
    self.assertEqual(len(hits), 20)
def test_subtree_and_or(self):
    """SUBTREE search ANDing an OR clause (first) with x=x and y=c finds nothing."""
    query = "(&(|(x=z)(y=b))(x=x)(y=c))"
    hits = self.l.search(base="DC=SAMBA,DC=ORG",
                         scope=ldb.SCOPE_SUBTREE,
                         expression=query)
    self.assertEqual(len(hits), 0)
def test_subtree_and_or2(self):
    """SUBTREE search ANDing x=x and y=c with an OR clause (last) finds nothing."""
    query = "(&(x=x)(y=c)(|(x=z)(y=b)))"
    hits = self.l.search(base="DC=SAMBA,DC=ORG",
                         scope=ldb.SCOPE_SUBTREE,
                         expression=query)
    self.assertEqual(len(hits), 0)
def test_subtree_and_or3(self):
    """SUBTREE search ANDing an ou OR clause with an x/y OR clause finds two entries."""
    query = "(&(|(ou=ou11)(ou=ou10))(|(x=y)(y=b)(y=c)))"
    hits = self.l.search(base="DC=SAMBA,DC=ORG",
                         scope=ldb.SCOPE_SUBTREE,
                         expression=query)
    self.assertEqual(len(hits), 2)
def test_subtree_and_or4(self):
    """SUBTREE search ANDing an x/y OR clause with an ou OR clause finds two entries."""
    query = "(&(|(x=y)(y=b)(y=c))(|(ou=ou11)(ou=ou10)))"
    hits = self.l.search(base="DC=SAMBA,DC=ORG",
                         scope=ldb.SCOPE_SUBTREE,
                         expression=query)
    self.assertEqual(len(hits), 2)
def test_subtree_and_or5(self):
    """SUBTREE search ANDing an x/y OR clause with ou=ou11 finds one entry."""
    query = "(&(|(x=y)(y=b)(y=c))(ou=ou11))"
    hits = self.l.search(base="DC=SAMBA,DC=ORG",
                         scope=ldb.SCOPE_SUBTREE,
                         expression=query)
    self.assertEqual(len(hits), 1)
def test_subtree_or_and(self):
    """SUBTREE search ORing x=x, y=c and (x=z AND y=b) finds ten entries."""
    query = "(|(x=x)(y=c)(&(x=z)(y=b)))"
    hits = self.l.search(base="DC=SAMBA,DC=ORG",
                         scope=ldb.SCOPE_SUBTREE,
                         expression=query)
    self.assertEqual(len(hits), 10)
def test_subtree_large_and_unique(self):
    """SUBTREE search with (ou=ou10 AND y=a) finds one entry."""
    query = "(&(ou=ou10)(y=a))"
    hits = self.l.search(base="DC=SAMBA,DC=ORG",
                         scope=ldb.SCOPE_SUBTREE,
                         expression=query)
    self.assertEqual(len(hits), 1)
def test_subtree_and_none(self):
    """SUBTREE search with (ou=ouX AND y=a) finds nothing."""
    query = "(&(ou=ouX)(y=a))"
    hits = self.l.search(base="DC=SAMBA,DC=ORG",
                         scope=ldb.SCOPE_SUBTREE,
                         expression=query)
    self.assertEqual(len(hits), 0)
def test_subtree_and_idx_record(self):
    """Filtering on the @IDXDN index attribute must not return any entry."""
    query = "(@IDXDN=DC=SAMBA,DC=ORG)"
    hits = self.l.search(base="DC=SAMBA,DC=ORG",
                         scope=ldb.SCOPE_SUBTREE,
                         expression=query)
    self.assertEqual(len(hits), 0)
def test_subtree_and_idxone_record(self):
    """Filtering on the @IDXONE index attribute must not return any entry."""
    query = "(@IDXONE=DC=SAMBA,DC=ORG)"
    hits = self.l.search(base="DC=SAMBA,DC=ORG",
                         scope=ldb.SCOPE_SUBTREE,
                         expression=query)
    self.assertEqual(len(hits), 0)
1062 def test_dn_filter_one(self):
1063 """Testing that a dn= filter succeeds
1064 (or fails with disallowDNFilter
1065 set and IDXGUID or (IDX and not IDXONE) mode)
1066 when the scope is SCOPE_ONELEVEL.
1068 This should be made more consistent, but for now lock in
1073 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1074 scope=ldb.SCOPE_ONELEVEL,
1075 expression="(dn=OU=OU1,DC=SAMBA,DC=ORG)")
1076 if hasattr(self, 'disallowDNFilter') and \
1077 hasattr(self, 'IDX') and \
1078 (hasattr(self, 'IDXGUID') or \
1079 ((hasattr(self, 'IDXONE') == False and hasattr(self, 'IDX')))):
1080 self.assertEqual(len(res11), 0)
1082 self.assertEqual(len(res11), 1)
1084 def test_dn_filter_subtree(self):
1085 """Testing that a dn= filter succeeds
1086 (or fails with disallowDNFilter set)
1087 when the scope is SCOPE_SUBTREE"""
1089 res11 = self.l.search(base="DC=SAMBA,DC=ORG",
1090 scope=ldb.SCOPE_SUBTREE,
1091 expression="(dn=OU=OU1,DC=SAMBA,DC=ORG)")
1092 if hasattr(self, 'disallowDNFilter') \
1093 and hasattr(self, 'IDX'):
1094 self.assertEqual(len(res11), 0)
1096 self.assertEqual(len(res11), 1)
def test_dn_filter_base(self):
    """Lock in that a dn= filter (incorrectly) matches under SCOPE_BASE."""
    target = "OU=OU1,DC=SAMBA,DC=ORG"
    hits = self.l.search(base=target,
                         scope=ldb.SCOPE_BASE,
                         expression="(dn=OU=OU1,DC=SAMBA,DC=ORG)")

    # This ought to be fixed eventually, but doing so is not trivial.
    self.assertEqual(len(hits), 1)
1110 # Run the search tests against an lmdb backend
1111 class SearchTestsLmdb(SearchTests):
1114 self.prefix = MDB_PREFIX
1115 self.index = {"dn": "@INDEXLIST",
1116 "@IDXGUID": [b"objectUUID"],
1117 "@IDX_DN_GUID": [b"GUID"]}
1118 super(SearchTestsLmdb, self).setUp()
1121 super(SearchTestsLmdb, self).tearDown()
1124 class IndexedSearchTests(SearchTests):
1125 """Test searches using the index, to ensure the index doesn't
1128 super(IndexedSearchTests, self).setUp()
1129 self.l.add({"dn": "@INDEXLIST",
1130 "@IDXATTR": [b"x", b"y", b"ou"]})
1133 class IndexedSearchDnFilterTests(SearchTests):
1134 """Test searches using the index, to ensure the index doesn't
1137 super(IndexedSearchDnFilterTests, self).setUp()
1138 self.l.add({"dn": "@OPTIONS",
1139 "disallowDNFilter": "TRUE"})
1140 self.disallowDNFilter = True
1142 self.l.add({"dn": "@INDEXLIST",
1143 "@IDXATTR": [b"x", b"y", b"ou"]})
1146 class IndexedAndOneLevelSearchTests(SearchTests):
1147 """Test searches using the index including @IDXONE, to ensure
1148 the index doesn't break things"""
1150 super(IndexedAndOneLevelSearchTests, self).setUp()
1151 self.l.add({"dn": "@INDEXLIST",
1152 "@IDXATTR": [b"x", b"y", b"ou"],
1156 class IndexedAndOneLevelDNFilterSearchTests(SearchTests):
1157 """Test searches using the index including @IDXONE, to ensure
1158 the index doesn't break things"""
1160 super(IndexedAndOneLevelDNFilterSearchTests, self).setUp()
1161 self.l.add({"dn": "@OPTIONS",
1162 "disallowDNFilter": "TRUE"})
1163 self.disallowDNFilter = True
1165 self.l.add({"dn": "@INDEXLIST",
1166 "@IDXATTR": [b"x", b"y", b"ou"],
1171 class GUIDIndexedSearchTests(SearchTests):
1172 """Test searches using the index, to ensure the index doesn't
1175 self.index = {"dn": "@INDEXLIST",
1176 "@IDXATTR": [b"x", b"y", b"ou"],
1177 "@IDXGUID": [b"objectUUID"],
1178 "@IDX_DN_GUID": [b"GUID"]}
1179 super(GUIDIndexedSearchTests, self).setUp()
1185 class GUIDIndexedDNFilterSearchTests(SearchTests):
1186 """Test searches using the index, to ensure the index doesn't
1189 self.index = {"dn": "@INDEXLIST",
1190 "@IDXATTR": [b"x", b"y", b"ou"],
1191 "@IDXGUID": [b"objectUUID"],
1192 "@IDX_DN_GUID": [b"GUID"]}
1193 super(GUIDIndexedDNFilterSearchTests, self).setUp()
1194 self.l.add({"dn": "@OPTIONS",
1195 "disallowDNFilter": "TRUE"})
1196 self.disallowDNFilter = True
1200 class GUIDAndOneLevelIndexedSearchTests(SearchTests):
1201 """Test searches using the index including @IDXONE, to ensure
1202 the index doesn't break things"""
1204 self.index = {"dn": "@INDEXLIST",
1205 "@IDXATTR": [b"x", b"y", b"ou"],
1206 "@IDXGUID": [b"objectUUID"],
1207 "@IDX_DN_GUID": [b"GUID"]}
1208 super(GUIDAndOneLevelIndexedSearchTests, self).setUp()
1209 self.l.add({"dn": "@OPTIONS",
1210 "disallowDNFilter": "TRUE"})
1211 self.disallowDNFilter = True
1216 class GUIDIndexedSearchTestsLmdb(GUIDIndexedSearchTests):
1219 self.prefix = MDB_PREFIX
1220 super(GUIDIndexedSearchTestsLmdb, self).setUp()
1223 super(GUIDIndexedSearchTestsLmdb, self).tearDown()
1226 class GUIDIndexedDNFilterSearchTestsLmdb(GUIDIndexedDNFilterSearchTests):
1229 self.prefix = MDB_PREFIX
1230 super(GUIDIndexedDNFilterSearchTestsLmdb, self).setUp()
1233 super(GUIDIndexedDNFilterSearchTestsLmdb, self).tearDown()
1236 class GUIDAndOneLevelIndexedSearchTestsLmdb(GUIDAndOneLevelIndexedSearchTests):
1239 self.prefix = MDB_PREFIX
1240 super(GUIDAndOneLevelIndexedSearchTestsLmdb, self).setUp()
1243 super(GUIDAndOneLevelIndexedSearchTestsLmdb, self).tearDown()
# NOTE(review): several statement lines (method headers, ``try:``, the error
# code binding) were restored here; record literals keep only the attributes
# that were present, so compare the test data with upstream before relying
# on it.
class AddModifyTests(LdbBaseTest):
    """Add, delete and rename records in a database opened with rdn_name.

    setUp adds ``self.index`` first when a subclass has defined one, then a
    base record and an @ATTRIBUTES record marking objectUUID as UNIQUE_INDEX.
    """

    def tearDown(self):
        shutil.rmtree(self.testdir)
        super(AddModifyTests, self).tearDown()

        # Ensure the LDB is closed now, so we close the FD
        del self.l

    def setUp(self):
        super(AddModifyTests, self).setUp()
        self.testdir = tempdir()
        self.filename = os.path.join(self.testdir, "add_test.ldb")
        # NOTE(review): flags= follows the other Ldb() calls in this file
        # (url() + flags()); confirm against upstream.
        self.l = ldb.Ldb(self.url(),
                         flags=self.flags(),
                         options=["modules:rdn_name"])
        # Only subclasses define self.index; this class has none, hence the
        # AttributeError guard.
        try:
            self.l.add(self.index)
        except AttributeError:
            pass

        self.l.add({"dn": "DC=SAMBA,DC=ORG",
                    "name": b"samba.org",
                    "objectUUID": b"0123456789abcdef"})
        self.l.add({"dn": "@ATTRIBUTES",
                    "objectUUID": "UNIQUE_INDEX"})

    def test_add_dup(self):
        # A second add under an existing DN must be rejected.
        self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
                    "objectUUID": b"0123456789abcde1"})
        try:
            self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
                        "objectUUID": b"0123456789abcde2"})
            self.fail("Should have failed adding dupliate entry")
        except ldb.LdbError as err:
            # The first exception argument is compared with the ldb.ERR_*
            # constants throughout these tests.
            enum = err.args[0]
            self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)

    def test_add_del_add(self):
        # Once deleted, the DN can be used again.
        self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
                    "objectUUID": b"0123456789abcde1"})
        self.l.delete("OU=DUP,DC=SAMBA,DC=ORG")
        self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
                    "objectUUID": b"0123456789abcde2"})

    def test_add_move_add(self):
        # Once renamed away, the old DN can be used again.
        self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
                    "objectUUID": b"0123456789abcde1"})
        self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
                      "OU=DUP2,DC=SAMBA,DC=ORG")
        self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
                    "objectUUID": b"0123456789abcde2"})

    def test_add_move_fail_move_move(self):
        self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
                    "objectUUID": b"0123456789abcde1"})
        self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
                    "objectUUID": b"0123456789abcde2"})

        res2 = self.l.search(base="DC=SAMBA,DC=ORG",
                             scope=ldb.SCOPE_SUBTREE,
                             expression="(objectUUID=0123456789abcde1)")
        self.assertEqual(len(res2), 1)
        self.assertEqual(str(res2[0].dn), "OU=DUP,DC=SAMBA,DC=ORG")

        res3 = self.l.search(base="DC=SAMBA,DC=ORG",
                             scope=ldb.SCOPE_SUBTREE,
                             expression="(objectUUID=0123456789abcde2)")
        self.assertEqual(len(res3), 1)
        self.assertEqual(str(res3[0].dn), "OU=DUP2,DC=SAMBA,DC=ORG")

        # Renaming onto an occupied DN must fail ...
        try:
            self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
                          "OU=DUP2,DC=SAMBA,DC=ORG")
            self.fail("Should have failed on duplicate DN")
        except ldb.LdbError as err:
            enum = err.args[0]
            self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)

        # ... and both renames must work once the target has been vacated.
        self.l.rename("OU=DUP2,DC=SAMBA,DC=ORG",
                      "OU=DUP3,DC=SAMBA,DC=ORG")

        self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
                      "OU=DUP2,DC=SAMBA,DC=ORG")

        res2 = self.l.search(base="DC=SAMBA,DC=ORG",
                             scope=ldb.SCOPE_SUBTREE,
                             expression="(objectUUID=0123456789abcde1)")
        self.assertEqual(len(res2), 1)
        self.assertEqual(str(res2[0].dn), "OU=DUP2,DC=SAMBA,DC=ORG")

        res3 = self.l.search(base="DC=SAMBA,DC=ORG",
                             scope=ldb.SCOPE_SUBTREE,
                             expression="(objectUUID=0123456789abcde2)")
        self.assertEqual(len(res3), 1)
        self.assertEqual(str(res3[0].dn), "OU=DUP3,DC=SAMBA,DC=ORG")

    def test_move_missing(self):
        # Renaming a record that was never added.
        try:
            self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
                          "OU=DUP2,DC=SAMBA,DC=ORG")
            self.fail("Should have failed on missing")
        except ldb.LdbError as err:
            enum = err.args[0]
            self.assertEqual(enum, ldb.ERR_NO_SUCH_OBJECT)

    def test_move_missing2(self):
        # As test_move_missing, but the rename target already exists.
        self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
                    "objectUUID": b"0123456789abcde2"})

        try:
            self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
                          "OU=DUP2,DC=SAMBA,DC=ORG")
            self.fail("Should have failed on missing")
        except ldb.LdbError as err:
            enum = err.args[0]
            self.assertEqual(enum, ldb.ERR_NO_SUCH_OBJECT)

    def test_move_fail_move_add(self):
        self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
                    "objectUUID": b"0123456789abcde1"})
        self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
                    "objectUUID": b"0123456789abcde2"})
        try:
            self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
                          "OU=DUP2,DC=SAMBA,DC=ORG")
            self.fail("Should have failed on duplicate DN")
        except ldb.LdbError as err:
            enum = err.args[0]
            self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)

        self.l.rename("OU=DUP2,DC=SAMBA,DC=ORG",
                      "OU=DUP3,DC=SAMBA,DC=ORG")

        # The failed rename must not have left OU=DUP2 reserved.
        self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
                    "objectUUID": b"0123456789abcde3"})
class AddModifyTestsLmdb(AddModifyTests):
    """Run the AddModifyTests cases with the MDB_PREFIX URL prefix."""

    def setUp(self):
        # Must be assigned before the base setUp runs: LdbBaseTest.setUp only
        # falls back to TDB_PREFIX when no prefix has been set.
        self.prefix = MDB_PREFIX
        # NOTE(review): the embedded numbering skips one line at this point;
        # confirm against upstream whether another assignment (for example an
        # index definition) belongs here.
        super(AddModifyTestsLmdb, self).setUp()

    def tearDown(self):
        super(AddModifyTestsLmdb, self).tearDown()
class IndexedAddModifyTests(AddModifyTests):
    """Re-run the add/modify tests with an @INDEXLIST record in place, and
    check that duplicate objectUUID values are rejected."""

    def setUp(self):
        # A subclass may already have chosen its own index definition.
        if not hasattr(self, 'index'):
            # NOTE(review): the embedded numbering skips one line inside this
            # literal; confirm against upstream that no further index entry
            # is expected.
            self.index = {"dn": "@INDEXLIST",
                          "@IDXATTR": [b"x", b"y", b"ou", b"objectUUID"]}
        super(IndexedAddModifyTests, self).setUp()

    def test_duplicate_GUID(self):
        # This objectUUID is already held by the base record added in setUp.
        try:
            self.l.add({"dn": "OU=DUPGUID,DC=SAMBA,DC=ORG",
                        "objectUUID": b"0123456789abcdef"})
            self.fail("Should have failed adding dupliate GUID")
        except ldb.LdbError as err:
            enum = err.args[0]
            self.assertEqual(enum, ldb.ERR_CONSTRAINT_VIOLATION)

    def test_duplicate_name_dup_GUID(self):
        # Same DN and same objectUUID: the DN clash is what gets reported.
        self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG",
                    "objectUUID": b"a123456789abcdef"})
        try:
            self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG",
                        "objectUUID": b"a123456789abcdef"})
            self.fail("Should have failed adding dupliate GUID")
        except ldb.LdbError as err:
            enum = err.args[0]
            self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)

    def test_duplicate_name_dup_GUID2(self):
        self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG",
                    "objectUUID": b"abc3456789abcdef"})
        try:
            self.l.add({"dn": "OU=ADMIN,DC=SAMBA,DC=ORG",
                        "objectUUID": b"aaa3456789abcdef"})
            self.fail("Should have failed adding dupliate DN")
        except ldb.LdbError as err:
            enum = err.args[0]
            self.assertEqual(enum, ldb.ERR_ENTRY_ALREADY_EXISTS)

        # Checking the GUID didn't stick in the index
        self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
                    "objectUUID": b"aaa3456789abcdef"})

    def test_add_dup_guid_add(self):
        self.l.add({"dn": "OU=DUP,DC=SAMBA,DC=ORG",
                    "objectUUID": b"0123456789abcde1"})
        try:
            self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
                        "objectUUID": b"0123456789abcde1"})
            self.fail("Should have failed on duplicate GUID")

        except ldb.LdbError as err:
            enum = err.args[0]
            self.assertEqual(enum, ldb.ERR_CONSTRAINT_VIOLATION)

        # The rejected add must not block the same DN with a fresh GUID.
        self.l.add({"dn": "OU=DUP2,DC=SAMBA,DC=ORG",
                    "objectUUID": b"0123456789abcde2"})
class GUIDIndexedAddModifyTests(IndexedAddModifyTests):
    """Re-run the indexed add/modify tests with an @INDEXLIST record that
    also carries @IDXGUID and @IDX_DN_GUID."""

    def setUp(self):
        # Assigned before the parent setUp so its hasattr(self, 'index')
        # check keeps this definition instead of installing the default.
        # NOTE(review): the embedded numbering skips one line inside this
        # literal; confirm against upstream that no further index entry is
        # expected.
        self.index = {"dn": "@INDEXLIST",
                      "@IDXATTR": [b"x", b"y", b"ou"],
                      "@IDXGUID": [b"objectUUID"],
                      "@IDX_DN_GUID": [b"GUID"]}
        super(GUIDIndexedAddModifyTests, self).setUp()
class GUIDTransIndexedAddModifyTests(GUIDIndexedAddModifyTests):
    """Test GUID index behaviour inside the transaction"""

    def setUp(self):
        super(GUIDTransIndexedAddModifyTests, self).setUp()
        # Every inherited test body now runs inside this open transaction.
        self.l.transaction_start()

    def tearDown(self):
        # Commit before the parent tearDown removes the test directory.
        self.l.transaction_commit()
        super(GUIDTransIndexedAddModifyTests, self).tearDown()
class TransIndexedAddModifyTests(IndexedAddModifyTests):
    """Test index behaviour inside the transaction"""

    def setUp(self):
        super(TransIndexedAddModifyTests, self).setUp()
        # Every inherited test body now runs inside this open transaction.
        self.l.transaction_start()

    def tearDown(self):
        # Commit before the parent tearDown removes the test directory.
        self.l.transaction_commit()
        super(TransIndexedAddModifyTests, self).tearDown()
class GuidIndexedAddModifyTestsLmdb(GUIDIndexedAddModifyTests):
    """Run the GUIDIndexedAddModifyTests cases with the MDB_PREFIX URL
    prefix."""

    def setUp(self):
        # Must be assigned before the base setUp runs: LdbBaseTest.setUp only
        # falls back to TDB_PREFIX when no prefix has been set.
        self.prefix = MDB_PREFIX
        super(GuidIndexedAddModifyTestsLmdb, self).setUp()

    def tearDown(self):
        super(GuidIndexedAddModifyTestsLmdb, self).tearDown()
class GuidTransIndexedAddModifyTestsLmdb(GUIDTransIndexedAddModifyTests):
    """Run the GUIDTransIndexedAddModifyTests cases with the MDB_PREFIX URL
    prefix."""

    def setUp(self):
        # Must be assigned before the base setUp runs: LdbBaseTest.setUp only
        # falls back to TDB_PREFIX when no prefix has been set.
        self.prefix = MDB_PREFIX
        super(GuidTransIndexedAddModifyTestsLmdb, self).setUp()

    def tearDown(self):
        super(GuidTransIndexedAddModifyTestsLmdb, self).tearDown()
# NOTE(review): the "y" values of the test records were restored from the
# assertions that follow them (match counts per search expression); confirm
# against upstream.
class BadIndexTests(LdbBaseTest):
    """Check that searches keep working after @ATTRIBUTES is added to a
    database that already holds indexed records."""

    def setUp(self):
        super(BadIndexTests, self).setUp()
        self.testdir = tempdir()
        self.filename = os.path.join(self.testdir, "test.ldb")
        self.ldb = ldb.Ldb(self.url(), flags=self.flags())
        # Subclasses opt in to the GUID-style index by defining IDXGUID
        # before calling this setUp.
        if hasattr(self, 'IDXGUID'):
            self.ldb.add({"dn": "@INDEXLIST",
                          "@IDXATTR": [b"x", b"y", b"ou"],
                          "@IDXGUID": [b"objectUUID"],
                          "@IDX_DN_GUID": [b"GUID"]})
        else:
            self.ldb.add({"dn": "@INDEXLIST",
                          "@IDXATTR": [b"x", b"y", b"ou"]})

    def test_unique(self):
        self.ldb.add({"dn": "x=x,dc=samba,dc=org",
                      "objectUUID": b"0123456789abcde1",
                      "y": "1"})
        self.ldb.add({"dn": "x=y,dc=samba,dc=org",
                      "objectUUID": b"0123456789abcde2",
                      "y": "1"})
        self.ldb.add({"dn": "x=z,dc=samba,dc=org",
                      "objectUUID": b"0123456789abcde3",
                      "y": "1"})

        res = self.ldb.search(expression="(y=1)",
                              base="dc=samba,dc=org")
        self.assertEqual(len(res), 3)

        # Now set this to unique index, but forget to check the result
        try:
            self.ldb.add({"dn": "@ATTRIBUTES",
                          "y": "UNIQUE_INDEX"})
        except ldb.LdbError:
            # NOTE(review): the embedded numbering skips one line between the
            # add and this handler; confirm whether upstream requires the add
            # to fail.
            pass

        # We must still have a working index
        res = self.ldb.search(expression="(y=1)",
                              base="dc=samba,dc=org")
        self.assertEqual(len(res), 3)

    def test_unique_transaction(self):
        self.ldb.add({"dn": "x=x,dc=samba,dc=org",
                      "objectUUID": b"0123456789abcde1",
                      "y": "1"})
        self.ldb.add({"dn": "x=y,dc=samba,dc=org",
                      "objectUUID": b"0123456789abcde2",
                      "y": "1"})
        self.ldb.add({"dn": "x=z,dc=samba,dc=org",
                      "objectUUID": b"0123456789abcde3",
                      "y": "1"})

        res = self.ldb.search(expression="(y=1)",
                              base="dc=samba,dc=org")
        self.assertEqual(len(res), 3)

        self.ldb.transaction_start()

        # Now set this to unique index, but forget to check the result
        try:
            self.ldb.add({"dn": "@ATTRIBUTES",
                          "y": "UNIQUE_INDEX"})
        except ldb.LdbError:
            pass

        try:
            self.ldb.transaction_commit()
            # NOTE(review): the embedded numbering skips two lines after the
            # commit; confirm whether upstream requires the commit to fail.
        except ldb.LdbError as err:
            enum = err.args[0]
            self.assertEqual(enum, ldb.ERR_OPERATIONS_ERROR)

        # We must still have a working index
        res = self.ldb.search(expression="(y=1)",
                              base="dc=samba,dc=org")

        self.assertEqual(len(res), 3)

    def test_casefold(self):
        self.ldb.add({"dn": "x=x,dc=samba,dc=org",
                      "objectUUID": b"0123456789abcde1",
                      "y": "a"})
        self.ldb.add({"dn": "x=y,dc=samba,dc=org",
                      "objectUUID": b"0123456789abcde2",
                      "y": "A"})
        self.ldb.add({"dn": "x=z,dc=samba,dc=org",
                      "objectUUID": b"0123456789abcde3",
                      "y": ["a", "A"]})

        res = self.ldb.search(expression="(y=a)",
                              base="dc=samba,dc=org")
        self.assertEqual(len(res), 2)

        self.ldb.add({"dn": "@ATTRIBUTES",
                      "y": "CASE_INSENSITIVE"})

        # We must still have a working index
        res = self.ldb.search(expression="(y=a)",
                              base="dc=samba,dc=org")

        if hasattr(self, 'IDXGUID'):
            self.assertEqual(len(res), 3)
        else:
            # We should not return this entry twice, but sadly
            # we have not yet fixed
            # https://bugzilla.samba.org/show_bug.cgi?id=13361
            self.assertEqual(len(res), 4)

    def test_casefold_transaction(self):
        self.ldb.add({"dn": "x=x,dc=samba,dc=org",
                      "objectUUID": b"0123456789abcde1",
                      "y": "a"})
        self.ldb.add({"dn": "x=y,dc=samba,dc=org",
                      "objectUUID": b"0123456789abcde2",
                      "y": "A"})
        self.ldb.add({"dn": "x=z,dc=samba,dc=org",
                      "objectUUID": b"0123456789abcde3",
                      "y": ["a", "A"]})

        res = self.ldb.search(expression="(y=a)",
                              base="dc=samba,dc=org")
        self.assertEqual(len(res), 2)

        self.ldb.transaction_start()

        self.ldb.add({"dn": "@ATTRIBUTES",
                      "y": "CASE_INSENSITIVE"})

        self.ldb.transaction_commit()

        # We must still have a working index
        res = self.ldb.search(expression="(y=a)",
                              base="dc=samba,dc=org")

        if hasattr(self, 'IDXGUID'):
            self.assertEqual(len(res), 3)
        else:
            # We should not return this entry twice, but sadly
            # we have not yet fixed
            # https://bugzilla.samba.org/show_bug.cgi?id=13361
            self.assertEqual(len(res), 4)

    def tearDown(self):
        # Remove the directory created by tempdir() in setUp.
        shutil.rmtree(self.testdir)
        super(BadIndexTests, self).tearDown()
class GUIDBadIndexTests(BadIndexTests):
    """Test Bad index things with GUID index mode"""

    def setUp(self):
        # BadIndexTests checks hasattr(self, 'IDXGUID') to pick the GUID-style
        # @INDEXLIST and the expected result counts, so only the attribute's
        # presence matters. NOTE(review): the value True is assumed.
        self.IDXGUID = True
        super(GUIDBadIndexTests, self).setUp()
class DnTests(TestCase):
    """Tests for ldb.Dn objects created from a bare ldb.Ldb()."""

    def setUp(self):
        super(DnTests, self).setUp()
        self.ldb = ldb.Ldb()

    def tearDown(self):
        super(DnTests, self).tearDown()

    def test_set_dn_invalid(self):
        # NOTE(review): only the assertRaises line of this test was present;
        # the message and the assignment were restored from the test name --
        # confirm against upstream.
        x = ldb.Message()

        def assign():
            x.dn = "astring"
        self.assertRaises(TypeError, assign)

    def test_eq(self):
        x = ldb.Dn(self.ldb, "dc=foo11,bar=bloe")
        y = ldb.Dn(self.ldb, "dc=foo11,bar=bloe")
        self.assertEqual(x, y)
        y = ldb.Dn(self.ldb, "dc=foo11,bar=blie")
        self.assertNotEqual(x, y)

    def test_str(self):
        x = ldb.Dn(self.ldb, "dc=foo12,bar=bloe")
        self.assertEqual(x.__str__(), "dc=foo12,bar=bloe")

    def test_repr(self):
        x = ldb.Dn(self.ldb, "dc=foo13,bla=blie")
        self.assertEqual(x.__repr__(), "Dn('dc=foo13,bla=blie')")

    def test_get_casefold(self):
        x = ldb.Dn(self.ldb, "dc=foo14,bar=bloe")
        self.assertEqual(x.get_casefold(), "DC=FOO14,BAR=bloe")

    def test_validate(self):
        x = ldb.Dn(self.ldb, "dc=foo15,bar=bloe")
        self.assertTrue(x.validate())

    def test_parent(self):
        x = ldb.Dn(self.ldb, "dc=foo16,bar=bloe")
        self.assertEqual("bar=bloe", x.parent().__str__())

    def test_parent_nonexistent(self):
        x = ldb.Dn(self.ldb, "@BLA")
        self.assertEqual(None, x.parent())

    def test_is_valid(self):
        x = ldb.Dn(self.ldb, "dc=foo18,dc=bloe")
        self.assertTrue(x.is_valid())
        x = ldb.Dn(self.ldb, "")
        self.assertTrue(x.is_valid())

    def test_is_special(self):
        x = ldb.Dn(self.ldb, "dc=foo19,bar=bloe")
        self.assertFalse(x.is_special())
        x = ldb.Dn(self.ldb, "@FOOBAR")
        self.assertTrue(x.is_special())

    def test_check_special(self):
        x = ldb.Dn(self.ldb, "dc=foo20,bar=bloe")
        self.assertFalse(x.check_special("FOOBAR"))
        x = ldb.Dn(self.ldb, "@FOOBAR")
        self.assertTrue(x.check_special("@FOOBAR"))

    def test_len(self):
        x = ldb.Dn(self.ldb, "dc=foo21,bar=bloe")
        self.assertEqual(2, len(x))
        x = ldb.Dn(self.ldb, "dc=foo21")
        self.assertEqual(1, len(x))

    def test_add_child(self):
        x = ldb.Dn(self.ldb, "dc=foo22,bar=bloe")
        self.assertTrue(x.add_child(ldb.Dn(self.ldb, "bla=bloe")))
        self.assertEqual("bla=bloe,dc=foo22,bar=bloe", x.__str__())

    def test_add_base(self):
        x = ldb.Dn(self.ldb, "dc=foo23,bar=bloe")
        base = ldb.Dn(self.ldb, "bla=bloe")
        self.assertTrue(x.add_base(base))
        self.assertEqual("dc=foo23,bar=bloe,bla=bloe", x.__str__())

    def test_add_child_str(self):
        x = ldb.Dn(self.ldb, "dc=foo22,bar=bloe")
        self.assertTrue(x.add_child("bla=bloe"))
        self.assertEqual("bla=bloe,dc=foo22,bar=bloe", x.__str__())

    def test_add_base_str(self):
        x = ldb.Dn(self.ldb, "dc=foo23,bar=bloe")
        # Plain-string counterpart of test_add_base.
        base = "bla=bloe"
        self.assertTrue(x.add_base(base))
        self.assertEqual("dc=foo23,bar=bloe,bla=bloe", x.__str__())

    def test_add(self):
        x = ldb.Dn(self.ldb, "dc=foo24")
        y = ldb.Dn(self.ldb, "bar=bla")
        self.assertEqual("dc=foo24,bar=bla", str(x + y))

    def test_remove_base_components(self):
        x = ldb.Dn(self.ldb, "dc=foo24,dc=samba,dc=org")
        x.remove_base_components(len(x)-1)
        self.assertEqual("dc=foo24", str(x))

    def test_parse_ldif(self):
        msgs = self.ldb.parse_ldif("dn: foo=bar\n")
        # The message is the second element of each parsed item.
        msg = next(msgs)
        self.assertEqual("foo=bar", str(msg[1].dn))
        self.assertTrue(isinstance(msg[1], ldb.Message))
        ldif = self.ldb.write_ldif(msg[1], ldb.CHANGETYPE_NONE)
        self.assertEqual("dn: foo=bar\n\n", ldif)

    def test_parse_ldif_more(self):
        msgs = self.ldb.parse_ldif("dn: foo=bar\n\n\ndn: bar=bar")
        msg = next(msgs)
        self.assertEqual("foo=bar", str(msg[1].dn))
        msg = next(msgs)
        self.assertEqual("bar=bar", str(msg[1].dn))

    def test_canonical_string(self):
        x = ldb.Dn(self.ldb, "dc=foo25,bar=bloe")
        self.assertEqual("/bloe/foo25", x.canonical_str())

    def test_canonical_ex_string(self):
        x = ldb.Dn(self.ldb, "dc=foo26,bar=bloe")
        self.assertEqual("/bloe\nfoo26", x.canonical_ex_str())

    def test_ldb_is_child_of(self):
        """Testing ldb_dn_compare_dn"""
        dn1 = ldb.Dn(self.ldb, "dc=base")
        dn2 = ldb.Dn(self.ldb, "cn=foo,dc=base")
        dn3 = ldb.Dn(self.ldb, "cn=bar,dc=base")
        dn4 = ldb.Dn(self.ldb, "cn=baz,cn=bar,dc=base")

        self.assertTrue(dn1.is_child_of(dn1))
        self.assertTrue(dn2.is_child_of(dn1))
        self.assertTrue(dn4.is_child_of(dn1))
        self.assertTrue(dn4.is_child_of(dn3))
        self.assertTrue(dn4.is_child_of(dn4))
        self.assertFalse(dn3.is_child_of(dn2))
        self.assertFalse(dn1.is_child_of(dn4))

    def test_ldb_is_child_of_str(self):
        """Testing ldb_dn_compare_dn"""
        dn1_str = "dc=base"
        dn2_str = "cn=foo,dc=base"
        dn3_str = "cn=bar,dc=base"
        dn4_str = "cn=baz,cn=bar,dc=base"

        dn1 = ldb.Dn(self.ldb, dn1_str)
        dn2 = ldb.Dn(self.ldb, dn2_str)
        dn3 = ldb.Dn(self.ldb, dn3_str)
        dn4 = ldb.Dn(self.ldb, dn4_str)

        self.assertTrue(dn1.is_child_of(dn1_str))
        self.assertTrue(dn2.is_child_of(dn1_str))
        self.assertTrue(dn4.is_child_of(dn1_str))
        self.assertTrue(dn4.is_child_of(dn3_str))
        self.assertTrue(dn4.is_child_of(dn4_str))
        self.assertFalse(dn3.is_child_of(dn2_str))
        self.assertFalse(dn1.is_child_of(dn4_str))

    def test_get_component_name(self):
        dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
        self.assertEqual(dn.get_component_name(0), 'cn')
        self.assertEqual(dn.get_component_name(1), 'dc')
        self.assertEqual(dn.get_component_name(2), None)
        self.assertEqual(dn.get_component_name(-1), None)

    def test_get_component_value(self):
        dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
        self.assertEqual(dn.get_component_value(0), 'foo')
        self.assertEqual(dn.get_component_value(1), 'base')
        # Out-of-range indexes: these two used to call get_component_name(),
        # so the value accessor was never exercised here.
        self.assertEqual(dn.get_component_value(2), None)
        self.assertEqual(dn.get_component_value(-1), None)

    def test_set_component(self):
        dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
        dn.set_component(0, 'cn', 'bar')
        self.assertEqual(str(dn), "cn=bar,dc=base")
        dn.set_component(1, 'o', 'asep')
        self.assertEqual(str(dn), "cn=bar,o=asep")
        # A rejected set_component must leave the DN unchanged.
        self.assertRaises(TypeError, dn.set_component, 2, 'dc', 'base')
        self.assertEqual(str(dn), "cn=bar,o=asep")
        dn.set_component(1, 'o', 'a,b+c')
        self.assertEqual(str(dn), r"cn=bar,o=a\,b\+c")

    def test_set_component_bytes(self):
        dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
        dn.set_component(0, 'cn', b'bar')
        self.assertEqual(str(dn), "cn=bar,dc=base")
        dn.set_component(1, 'o', b'asep')
        self.assertEqual(str(dn), "cn=bar,o=asep")

    def test_set_component_none(self):
        dn = ldb.Dn(self.ldb, "cn=foo,cn=bar,dc=base")
        self.assertRaises(TypeError, dn.set_component, 1, 'cn', None)

    def test_get_extended_component_null(self):
        dn = ldb.Dn(self.ldb, "cn=foo,cn=bar,dc=base")
        self.assertEqual(dn.get_extended_component("TEST"), None)

    def test_get_extended_component(self):
        self.ldb._register_test_extensions()
        dn = ldb.Dn(self.ldb, "<TEST=foo>;cn=bar,dc=base")
        self.assertEqual(dn.get_extended_component("TEST"), b"foo")

    def test_set_extended_component(self):
        self.ldb._register_test_extensions()
        dn = ldb.Dn(self.ldb, "dc=base")
        dn.set_extended_component("TEST", "foo")
        self.assertEqual(dn.get_extended_component("TEST"), b"foo")
        dn.set_extended_component("TEST", b"bar")
        self.assertEqual(dn.get_extended_component("TEST"), b"bar")

    def test_extended_str(self):
        self.ldb._register_test_extensions()
        dn = ldb.Dn(self.ldb, "<TEST=foo>;cn=bar,dc=base")
        self.assertEqual(dn.extended_str(), "<TEST=foo>;cn=bar,dc=base")

    def test_get_rdn_name(self):
        dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
        self.assertEqual(dn.get_rdn_name(), 'cn')

    def test_get_rdn_value(self):
        dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
        self.assertEqual(dn.get_rdn_value(), 'foo')

    # Renamed from a second "test_get_casefold", which replaced the method of
    # the same name defined earlier in this class so that one never ran.
    def test_get_casefold_2(self):
        dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
        self.assertEqual(dn.get_casefold(), 'CN=FOO,DC=BASE')

    def test_get_linearized(self):
        dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
        self.assertEqual(dn.get_linearized(), 'cn=foo,dc=base')

    def test_is_null(self):
        dn = ldb.Dn(self.ldb, "cn=foo,dc=base")
        self.assertFalse(dn.is_null())

        dn = ldb.Dn(self.ldb, '')
        self.assertTrue(dn.is_null())
# NOTE(review): several local bindings in this class (the Ldb handles, the
# source/target messages of the copy tests, the PY3 branches) were restored
# from how the following statements use them; confirm against upstream.
class LdbMsgTests(TestCase):
    """Tests for ldb.Message and its ``.text`` view."""

    def setUp(self):
        super(LdbMsgTests, self).setUp()
        self.msg = ldb.Message()

    def test_init_dn(self):
        self.msg = ldb.Message(ldb.Dn(ldb.Ldb(), "dc=foo27"))
        self.assertEqual("dc=foo27", str(self.msg.dn))

    def test_iter_items(self):
        self.assertEqual(0, len(self.msg.items()))
        self.msg.dn = ldb.Dn(ldb.Ldb(), "dc=foo28")
        self.assertEqual(1, len(self.msg.items()))

    def test_repr(self):
        self.msg.dn = ldb.Dn(ldb.Ldb(), "dc=foo29")
        self.msg["dc"] = b"foo"
        if PY3:
            # Either ordering of the two keys is accepted.
            self.assertIn(repr(self.msg), [
                "Message({'dn': Dn('dc=foo29'), 'dc': MessageElement([b'foo'])})",
                "Message({'dc': MessageElement([b'foo']), 'dn': Dn('dc=foo29')})",
            ])
            self.assertIn(repr(self.msg.text), [
                "Message({'dn': Dn('dc=foo29'), 'dc': MessageElement([b'foo'])}).text",
                "Message({'dc': MessageElement([b'foo']), 'dn': Dn('dc=foo29')}).text",
            ])
        else:
            self.assertEqual(
                repr(self.msg),
                "Message({'dn': Dn('dc=foo29'), 'dc': MessageElement(['foo'])})")
            self.assertEqual(
                repr(self.msg.text),
                "Message({'dn': Dn('dc=foo29'), 'dc': MessageElement(['foo'])}).text")

    def test_len(self):
        self.assertEqual(0, len(self.msg))

    def test_notpresent(self):
        self.assertRaises(KeyError, lambda: self.msg["foo"])

    def test_add(self):
        self.msg.add(ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla"))

    def test_add_text(self):
        self.msg.add(ldb.MessageElement(["456"], ldb.FLAG_MOD_ADD, "bla"))

    def test_elements_empty(self):
        self.assertEqual([], self.msg.elements())

    def test_elements(self):
        el = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
        self.msg.add(el)
        self.assertEqual([el], self.msg.elements())
        self.assertEqual([el.text], self.msg.text.elements())

    def test_add_value(self):
        self.assertEqual(0, len(self.msg))
        self.msg["foo"] = [b"foo"]
        self.assertEqual(1, len(self.msg))

    def test_add_value_text(self):
        self.assertEqual(0, len(self.msg))
        self.msg["foo"] = ["foo"]
        self.assertEqual(1, len(self.msg))

    def test_add_value_multiple(self):
        self.assertEqual(0, len(self.msg))
        self.msg["foo"] = [b"foo", b"bla"]
        self.assertEqual(1, len(self.msg))
        self.assertEqual([b"foo", b"bla"], list(self.msg["foo"]))

    def test_add_value_multiple_text(self):
        self.assertEqual(0, len(self.msg))
        self.msg["foo"] = ["foo", "bla"]
        self.assertEqual(1, len(self.msg))
        self.assertEqual(["foo", "bla"], list(self.msg.text["foo"]))

    def test_set_value(self):
        self.msg["foo"] = [b"fool"]
        self.assertEqual([b"fool"], list(self.msg["foo"]))
        self.msg["foo"] = [b"bar"]
        self.assertEqual([b"bar"], list(self.msg["foo"]))

    def test_set_value_text(self):
        self.msg["foo"] = ["fool"]
        self.assertEqual(["fool"], list(self.msg.text["foo"]))
        self.msg["foo"] = ["bar"]
        self.assertEqual(["bar"], list(self.msg.text["foo"]))

    def test_keys(self):
        self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
        self.msg["foo"] = [b"bla"]
        self.msg["bar"] = [b"bla"]
        self.assertEqual(["dn", "foo", "bar"], self.msg.keys())

    def test_keys_text(self):
        self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
        self.msg["foo"] = ["bla"]
        self.msg["bar"] = ["bla"]
        self.assertEqual(["dn", "foo", "bar"], self.msg.text.keys())

    def test_dn(self):
        self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
        self.assertEqual("@BASEINFO", self.msg.dn.__str__())

    def test_get_dn(self):
        self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
        self.assertEqual("@BASEINFO", self.msg.get("dn").__str__())

    def test_dn_text(self):
        self.msg.text.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
        self.assertEqual("@BASEINFO", str(self.msg.dn))
        self.assertEqual("@BASEINFO", str(self.msg.text.dn))

    def test_get_dn_text(self):
        self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
        self.assertEqual("@BASEINFO", str(self.msg.get("dn")))
        self.assertEqual("@BASEINFO", str(self.msg.text.get("dn")))

    def test_get_invalid(self):
        self.msg.dn = ldb.Dn(ldb.Ldb(), "@BASEINFO")
        self.assertRaises(TypeError, self.msg.get, 42)

    def test_get_other(self):
        self.msg["foo"] = [b"bar"]
        self.assertEqual(b"bar", self.msg.get("foo")[0])
        self.assertEqual(b"bar", self.msg.get("foo", idx=0))
        self.assertEqual(None, self.msg.get("foo", idx=1))
        self.assertEqual("", self.msg.get("foo", default='', idx=1))

    def test_get_other_text(self):
        self.msg["foo"] = ["bar"]
        self.assertEqual(["bar"], list(self.msg.text.get("foo")))
        self.assertEqual("bar", self.msg.text.get("foo")[0])
        self.assertEqual("bar", self.msg.text.get("foo", idx=0))
        self.assertEqual(None, self.msg.get("foo", idx=1))
        self.assertEqual("", self.msg.get("foo", default='', idx=1))

    def test_get_default(self):
        self.assertEqual(None, self.msg.get("tatayoyo", idx=0))
        self.assertEqual("anniecordie", self.msg.get("tatayoyo", "anniecordie"))

    def test_get_default_text(self):
        self.assertEqual(None, self.msg.text.get("tatayoyo", idx=0))
        self.assertEqual("anniecordie", self.msg.text.get("tatayoyo", "anniecordie"))

    def test_get_unknown(self):
        self.assertEqual(None, self.msg.get("lalalala"))

    def test_get_unknown_text(self):
        self.assertEqual(None, self.msg.text.get("lalalala"))

    def test_msg_diff(self):
        db = ldb.Ldb()
        msgs = db.parse_ldif("dn: foo=bar\nfoo: bar\nbaz: do\n\ndn: foo=bar\nfoo: bar\nbaz: dont\n")
        msg1 = next(msgs)[1]
        msg2 = next(msgs)[1]
        msgdiff = db.msg_diff(msg1, msg2)
        self.assertEqual("foo=bar", msgdiff.get("dn").__str__())
        # "foo" is the same in both messages, so it is absent from the diff.
        self.assertRaises(KeyError, lambda: msgdiff["foo"])
        self.assertEqual(1, len(msgdiff))

    def test_equal_empty(self):
        msg1 = ldb.Message()
        msg2 = ldb.Message()
        self.assertEqual(msg1, msg2)

    def test_equal_simplel(self):
        db = ldb.Ldb()
        msg1 = ldb.Message()
        msg1.dn = ldb.Dn(db, "foo=bar")
        msg2 = ldb.Message()
        msg2.dn = ldb.Dn(db, "foo=bar")
        self.assertEqual(msg1, msg2)
        msg1['foo'] = b'bar'
        msg2['foo'] = b'bar'
        self.assertEqual(msg1, msg2)
        msg2['foo'] = b'blie'
        self.assertNotEqual(msg1, msg2)

    def test_from_dict(self):
        rec = {"dn": "dc=fromdict",
               "a1": [b"a1-val1", b"a1-val1"]}
        db = ldb.Ldb()
        # check different types of input Flags
        for flags in [ldb.FLAG_MOD_ADD, ldb.FLAG_MOD_REPLACE, ldb.FLAG_MOD_DELETE]:
            m = ldb.Message.from_dict(db, rec, flags)
            self.assertEqual(rec["a1"], list(m["a1"]))
            self.assertEqual(flags, m["a1"].flags())
        # check input params
        self.assertRaises(TypeError, ldb.Message.from_dict, dict(), rec, ldb.FLAG_MOD_REPLACE)
        self.assertRaises(TypeError, ldb.Message.from_dict, db, list(), ldb.FLAG_MOD_REPLACE)
        self.assertRaises(ValueError, ldb.Message.from_dict, db, rec, 0)
        # Message.from_dict expects dictionary with 'dn'
        err_rec = {"a1": [b"a1-val1", b"a1-val1"]}
        self.assertRaises(TypeError, ldb.Message.from_dict, db, err_rec, ldb.FLAG_MOD_REPLACE)

    def test_from_dict_text(self):
        rec = {"dn": "dc=fromdict",
               "a1": ["a1-val1", "a1-val1"]}
        db = ldb.Ldb()
        # check different types of input Flags
        for flags in [ldb.FLAG_MOD_ADD, ldb.FLAG_MOD_REPLACE, ldb.FLAG_MOD_DELETE]:
            m = ldb.Message.from_dict(db, rec, flags)
            self.assertEqual(rec["a1"], list(m.text["a1"]))
            self.assertEqual(flags, m.text["a1"].flags())
        # check input params
        self.assertRaises(TypeError, ldb.Message.from_dict, dict(), rec, ldb.FLAG_MOD_REPLACE)
        self.assertRaises(TypeError, ldb.Message.from_dict, db, list(), ldb.FLAG_MOD_REPLACE)
        self.assertRaises(ValueError, ldb.Message.from_dict, db, rec, 0)
        # Message.from_dict expects dictionary with 'dn'
        err_rec = {"a1": ["a1-val1", "a1-val1"]}
        self.assertRaises(TypeError, ldb.Message.from_dict, db, err_rec, ldb.FLAG_MOD_REPLACE)

    def test_copy_add_message_element(self):
        m = ldb.Message()
        m["1"] = ldb.MessageElement([b"val 111"], ldb.FLAG_MOD_ADD, "1")
        m["2"] = ldb.MessageElement([b"val 222"], ldb.FLAG_MOD_ADD, "2")
        # Copy by item assignment ...
        mto = ldb.Message()
        mto["1"] = m["1"]
        mto["2"] = m["2"]
        self.assertEqual(mto["1"], m["1"])
        self.assertEqual(mto["2"], m["2"])
        # ... and with add().
        mto = ldb.Message()
        mto.add(m["1"])
        mto.add(m["2"])
        self.assertEqual(mto["1"], m["1"])
        self.assertEqual(mto["2"], m["2"])

    def test_copy_add_message_element_text(self):
        m = ldb.Message()
        m["1"] = ldb.MessageElement(["val 111"], ldb.FLAG_MOD_ADD, "1")
        m["2"] = ldb.MessageElement(["val 222"], ldb.FLAG_MOD_ADD, "2")
        # Copy by item assignment ...
        mto = ldb.Message()
        mto["1"] = m["1"]
        mto["2"] = m["2"]
        self.assertEqual(mto["1"], m.text["1"])
        self.assertEqual(mto["2"], m.text["2"])
        # ... and with add().
        mto = ldb.Message()
        mto.add(m["1"])
        mto.add(m["2"])
        self.assertEqual(mto.text["1"], m.text["1"])
        self.assertEqual(mto.text["2"], m.text["2"])
        self.assertEqual(mto["1"], m["1"])
        self.assertEqual(mto["2"], m["2"])
class MessageElementTests(TestCase):
    """Tests for ldb.MessageElement and its ``.text`` view."""

    def test_cmp_element(self):
        x = ldb.MessageElement([b"foo"])
        y = ldb.MessageElement([b"foo"])
        z = ldb.MessageElement([b"bzr"])
        self.assertEqual(x, y)
        self.assertNotEqual(x, z)

    def test_cmp_element_text(self):
        # An element built from bytes equals one built from text.
        x = ldb.MessageElement([b"foo"])
        y = ldb.MessageElement(["foo"])
        self.assertEqual(x, y)

    def test_create_iterable(self):
        x = ldb.MessageElement([b"foo"])
        self.assertEqual([b"foo"], list(x))
        self.assertEqual(["foo"], list(x.text))

    def test_repr(self):
        # The expected repr carries a b'' prefix only when PY3 is set.
        x = ldb.MessageElement([b"foo"])
        if PY3:
            self.assertEqual("MessageElement([b'foo'])", repr(x))
            self.assertEqual("MessageElement([b'foo']).text", repr(x.text))
        else:
            self.assertEqual("MessageElement(['foo'])", repr(x))
            self.assertEqual("MessageElement(['foo']).text", repr(x.text))
        x = ldb.MessageElement([b"foo", b"bla"])
        self.assertEqual(2, len(x))
        if PY3:
            self.assertEqual("MessageElement([b'foo',b'bla'])", repr(x))
            self.assertEqual("MessageElement([b'foo',b'bla']).text", repr(x.text))
        else:
            self.assertEqual("MessageElement(['foo','bla'])", repr(x))
            self.assertEqual("MessageElement(['foo','bla']).text", repr(x.text))

    def test_get_item(self):
        x = ldb.MessageElement([b"foo", b"bar"])
        self.assertEqual(b"foo", x[0])
        self.assertEqual(b"bar", x[1])
        self.assertEqual(b"bar", x[-1])
        self.assertRaises(IndexError, lambda: x[45])

    def test_get_item_text(self):
        x = ldb.MessageElement(["foo", "bar"])
        self.assertEqual("foo", x.text[0])
        self.assertEqual("bar", x.text[1])
        self.assertEqual("bar", x.text[-1])
        self.assertRaises(IndexError, lambda: x[45])

    def test_len(self):
        x = ldb.MessageElement([b"foo", b"bar"])
        self.assertEqual(2, len(x))

    def test_eq(self):
        x = ldb.MessageElement([b"foo", b"bar"])
        y = ldb.MessageElement([b"foo", b"bar"])
        self.assertEqual(y, x)
        x = ldb.MessageElement([b"foo"])
        self.assertNotEqual(y, x)
        y = ldb.MessageElement([b"foo"])
        self.assertEqual(y, x)

    def test_extended(self):
        # Built with explicit flags and a name; neither shows up in the repr.
        el = ldb.MessageElement([b"456"], ldb.FLAG_MOD_ADD, "bla")
        if PY3:
            self.assertEqual("MessageElement([b'456'])", repr(el))
            self.assertEqual("MessageElement([b'456']).text", repr(el.text))
        else:
            self.assertEqual("MessageElement(['456'])", repr(el))
            self.assertEqual("MessageElement(['456']).text", repr(el.text))

    def test_bad_text(self):
        # The bytes are not valid text, so reading through .text must raise.
        el = ldb.MessageElement(b'\xba\xdd')
        self.assertRaises(UnicodeDecodeError, el.text.__getitem__, 0)
# NOTE(review): the bodies of the nested ExampleModule classes were restored
# from the surrounding assertions (module name "bla" from the @LIST value,
# "init" from the expected ops list); confirm against upstream.
class ModuleTests(TestCase):
    """Tests for registering Python classes as ldb modules."""

    def setUp(self):
        super(ModuleTests, self).setUp()
        self.testdir = tempdir()
        self.filename = os.path.join(self.testdir, "test.ldb")
        self.ldb = ldb.Ldb(self.filename)

    def tearDown(self):
        shutil.rmtree(self.testdir)
        # This used to call setUp() again by mistake instead of unwinding the
        # base class.
        super(ModuleTests, self).tearDown()

    def test_register_module(self):
        class ExampleModule:
            name = "example"
        ldb.register_module(ExampleModule)

    def test_use_module(self):
        # Records the hooks the module sees, in call order.
        ops = []

        class ExampleModule:
            name = "bla"

            def __init__(self, ldb, next):
                ops.append("init")
                self.next = next

            def search(self, *args, **kwargs):
                return self.next.search(*args, **kwargs)

            def request(self, *args, **kwargs):
                pass

        ldb.register_module(ExampleModule)
        db = ldb.Ldb(self.filename)
        db.add({"dn": "@MODULES", "@LIST": "bla"})
        # Listing the module in @MODULES does not instantiate it ...
        self.assertEqual([], ops)
        # ... reopening the database does.
        db = ldb.Ldb(self.filename)
        self.assertEqual(["init"], ops)
class LdbResultTests(LdbBaseTest):
    """Tests for the result objects returned by Ldb.search() and
    Ldb.search_iterator(), including their interaction with a
    transaction running in another process."""

    def setUp(self):
        """Create a database holding DC=SAMBA,DC=ORG plus twelve OUs."""
        super(LdbResultTests, self).setUp()
        self.testdir = tempdir()
        self.filename = os.path.join(self.testdir, "test.ldb")
        self.l = ldb.Ldb(self.url(), flags=self.flags())
        try:
            # self.index is only set by subclasses that need an index
            # record; without it the attribute lookup raises.
            self.l.add(self.index)
        except AttributeError:
            pass

        # (dn, name, objectUUID) of the 13 records the tests rely on.
        records = (
            ("DC=SAMBA,DC=ORG", b"samba.org", b"0123456789abcde0"),
            ("OU=ADMIN,DC=SAMBA,DC=ORG", b"Admins", b"0123456789abcde1"),
            ("OU=USERS,DC=SAMBA,DC=ORG", b"Users", b"0123456789abcde2"),
            ("OU=OU1,DC=SAMBA,DC=ORG", b"OU #1", b"0123456789abcde3"),
            ("OU=OU2,DC=SAMBA,DC=ORG", b"OU #2", b"0123456789abcde4"),
            ("OU=OU3,DC=SAMBA,DC=ORG", b"OU #3", b"0123456789abcde5"),
            ("OU=OU4,DC=SAMBA,DC=ORG", b"OU #4", b"0123456789abcde6"),
            ("OU=OU5,DC=SAMBA,DC=ORG", b"OU #5", b"0123456789abcde7"),
            ("OU=OU6,DC=SAMBA,DC=ORG", b"OU #6", b"0123456789abcde8"),
            ("OU=OU7,DC=SAMBA,DC=ORG", b"OU #7", b"0123456789abcde9"),
            ("OU=OU8,DC=SAMBA,DC=ORG", b"OU #8", b"0123456789abcdea"),
            ("OU=OU9,DC=SAMBA,DC=ORG", b"OU #9", b"0123456789abcdeb"),
            ("OU=OU10,DC=SAMBA,DC=ORG", b"OU #10", b"0123456789abcdec"),
        )
        for dn, name, object_uuid in records:
            self.l.add({"dn": dn, "name": name, "objectUUID": object_uuid})

    def tearDown(self):
        """Remove the temporary directory and release the database handle."""
        shutil.rmtree(self.testdir)
        super(LdbResultTests, self).tearDown()
        # Ensure the LDB is closed now, so we close the FD
        del self.l

    def test_return_type(self):
        res = self.l.search()
        self.assertEqual(str(res), "<ldb result>")

    def test_get_msgs(self):
        res = self.l.search()
        list(res)

    def test_get_controls(self):
        res = self.l.search()
        list(res.controls)

    def test_get_referals(self):
        res = self.l.search()
        list(res.referals)

    def test_iter_msgs(self):
        found = False
        for msg in self.l.search().msgs:
            if str(msg.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
                found = True
        self.assertTrue(found)

    def test_iter_msgs_count(self):
        self.assertTrue(self.l.search().count > 0)
        # 13 objects have been added under DC=SAMBA,DC=ORG by setUp
        self.assertEqual(self.l.search(base="DC=SAMBA,DC=ORG").count, 13)

    def test_iter_controls(self):
        res = self.l.search().controls
        list(res)

    def test_create_control(self):
        self.assertRaises(ValueError, ldb.Control, self.l, "tatayoyo:0")
        c = ldb.Control(self.l, "relax:1")
        self.assertEqual(c.critical, True)
        self.assertEqual(c.oid, "1.3.6.1.4.1.4203.666.5.12")

    def test_iter_refs(self):
        res = self.l.search().referals
        list(res)

    def test_search_sequence_msgs(self):
        found = False
        res = self.l.search().msgs

        # Walk the messages by index rather than by iteration, so that
        # len() and item access on the result are what is exercised.
        for i in range(0, len(res)):
            msg = res[i]
            if str(msg.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
                found = True
        self.assertTrue(found)

    def test_search_as_iter(self):
        found = False
        res = self.l.search()

        for msg in res:
            if str(msg.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
                found = True
        self.assertTrue(found)

    def test_search_iter(self):
        found = False
        res = self.l.search_iterator()

        for msg in res:
            if str(msg.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
                found = True
        self.assertTrue(found)

    # Show that search results can't see into a transaction
    def test_search_against_trans(self):
        found11 = False

        # r1/w1 carries child -> parent messages, r2/w2 parent -> child.
        (r1, w1) = os.pipe()
        (r2, w2) = os.pipe()
        # Close the parent's pipe ends once the test is over; the child
        # leaves via os._exit() and never runs these cleanups.
        for fd in (r1, w1, r2, w2):
            self.addCleanup(os.close, fd)

        # For the first element, fork a child that will
        # write to the DB
        pid = os.fork()
        if pid == 0:
            # In the child, re-open
            del self.l
            gc.collect()

            child_ldb = ldb.Ldb(self.url(), flags=self.flags())
            # start a transaction
            child_ldb.transaction_start()

            # write to it
            child_ldb.add({"dn": "OU=OU11,DC=SAMBA,DC=ORG",
                           "name": b"samba.org",
                           "objectUUID": b"o123456789acbdef"})

            os.write(w1, b"added")

            # Now wait for the search to be done
            os.read(r2, 6)

            # and commit
            try:
                child_ldb.transaction_commit()
            except ldb.LdbError as err:
                # We print this here to see what went wrong in the child
                print(err)
                os._exit(1)

            os.write(w1, b"transaction")
            os._exit(0)

        self.assertEqual(os.read(r1, 5), b"added")

        # This should not turn up until the transaction is concluded
        res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
                              scope=ldb.SCOPE_BASE)
        self.assertEqual(len(res11), 0)

        os.write(w2, b"search")

        # Now wait for the transaction to be done.  This should
        # deadlock, but the search doesn't hold a read lock for the
        # iterator lifetime currently.
        self.assertEqual(os.read(r1, 11), b"transaction")

        # This should now turn up, as the transaction is over
        res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
                              scope=ldb.SCOPE_BASE)
        self.assertEqual(len(res11), 1)

        self.assertFalse(found11)

        (got_pid, status) = os.waitpid(pid, 0)
        self.assertEqual(got_pid, pid)

    def test_search_iter_against_trans(self):
        found = False
        found11 = False

        # We need to hold this iterator open to hold the all-record
        # lock
        res = self.l.search_iterator()

        # r1/w1 carries child -> parent messages, r2/w2 parent -> child.
        (r1, w1) = os.pipe()
        (r2, w2) = os.pipe()
        # Close the parent's pipe ends once the test is over; the child
        # leaves via os._exit() and never runs these cleanups.
        for fd in (r1, w1, r2, w2):
            self.addCleanup(os.close, fd)

        # For the first element, with the sequence open (which
        # means with ldb locks held), fork a child that will
        # write to the DB
        pid = os.fork()
        if pid == 0:
            # In the child, re-open
            del res
            del self.l
            gc.collect()

            child_ldb = ldb.Ldb(self.url(), flags=self.flags())
            # start a transaction
            child_ldb.transaction_start()

            # write to it
            child_ldb.add({"dn": "OU=OU11,DC=SAMBA,DC=ORG",
                           "name": b"samba.org",
                           "objectUUID": b"o123456789acbdef"})

            os.write(w1, b"added")

            # Now wait for the search to be done
            os.read(r2, 6)

            # and commit
            try:
                child_ldb.transaction_commit()
            except ldb.LdbError as err:
                # We print this here to see what went wrong in the child
                print(err)
                os._exit(1)

            os.write(w1, b"transaction")
            os._exit(0)

        self.assertEqual(os.read(r1, 5), b"added")

        # This should not turn up until the transaction is concluded
        res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
                              scope=ldb.SCOPE_BASE)
        self.assertEqual(len(res11), 0)

        os.write(w2, b"search")

        # allow the transaction to start
        time.sleep(1)

        # This should not turn up until the search finishes and
        # removed the read lock, but for ldb_tdb that happened as soon
        # as we called the first res.next()
        res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
                              scope=ldb.SCOPE_BASE)
        self.assertEqual(len(res11), 0)

        # These results are all collected at the first next(res) call
        for msg in res:
            if str(msg.dn) == "OU=OU10,DC=SAMBA,DC=ORG":
                found = True
            if str(msg.dn) == "OU=OU11,DC=SAMBA,DC=ORG":
                found11 = True

        # Now wait for the transaction to be done.
        self.assertEqual(os.read(r1, 11), b"transaction")

        # This should now turn up, as the transaction is over and all
        # read locks are gone
        res11 = self.l.search(base="OU=OU11,DC=SAMBA,DC=ORG",
                              scope=ldb.SCOPE_BASE)
        self.assertEqual(len(res11), 1)

        self.assertTrue(found)
        self.assertFalse(found11)

        (got_pid, status) = os.waitpid(pid, 0)
        self.assertEqual(got_pid, pid)
class LdbResultTestsLmdb(LdbResultTests):
    """Run the LdbResultTests suite with MDB_PREFIX and a GUID index record."""

    def setUp(self):
        # The inherited setUp reads both self.prefix and self.index, so
        # they have to be in place before chaining up.
        index_record = {"dn": "@INDEXLIST"}
        index_record["@IDXGUID"] = [b"objectUUID"]
        index_record["@IDX_DN_GUID"] = [b"GUID"]

        self.prefix = MDB_PREFIX
        self.index = index_record
        super(LdbResultTestsLmdb, self).setUp()

    def tearDown(self):
        super(LdbResultTestsLmdb, self).tearDown()
class BadTypeTests(TestCase):
    """Check that wrongly-typed arguments are rejected with TypeError."""

    def test_control(self):
        db = ldb.Ldb()  # not passed to the calls below
        with self.assertRaises(TypeError):
            ldb.Control('<bad type>', 'relax:1')
        with self.assertRaises(TypeError):
            # Note: the first argument is the ldb module itself.
            ldb.Control(ldb, 1234)

    def test_modify(self):
        db = ldb.Ldb()
        target = ldb.Dn(db, 'a=b')
        msg = ldb.Message(target)
        with self.assertRaises(TypeError):
            db.modify('<bad type>')
        with self.assertRaises(TypeError):
            db.modify(msg, '<bad type>')

    def test_add(self):
        db = ldb.Ldb()
        target = ldb.Dn(db, 'a=b')
        msg = ldb.Message(target)
        with self.assertRaises(TypeError):
            db.add('<bad type>')
        with self.assertRaises(TypeError):
            db.add(msg, '<bad type>')

    def test_delete(self):
        # NOTE(review): despite its name this test calls add(), not
        # delete() -- looks like copy/paste.  Confirm how delete() treats
        # a plain string before switching; it may not raise TypeError.
        db = ldb.Ldb()
        target = ldb.Dn(db, 'a=b')
        with self.assertRaises(TypeError):
            db.add('<bad type>')
        with self.assertRaises(TypeError):
            db.add(target, '<bad type>')

    def test_rename(self):
        # NOTE(review): despite its name this test calls add(), not
        # rename() -- looks like copy/paste.  Confirm how rename() treats
        # a plain string before switching; it may not raise TypeError.
        db = ldb.Ldb()
        target = ldb.Dn(db, 'a=b')
        with self.assertRaises(TypeError):
            db.add('<bad type>', target)
        with self.assertRaises(TypeError):
            db.add(target, '<bad type>')
        with self.assertRaises(TypeError):
            db.add(target, target, '<bad type>')

    def test_search(self):
        db = ldb.Ldb()
        with self.assertRaises(TypeError):
            db.search(base=1234)
        with self.assertRaises(TypeError):
            db.search(scope='<bad type>')
        with self.assertRaises(TypeError):
            db.search(expression=1234)
        with self.assertRaises(TypeError):
            db.search(attrs='<bad type>')
        with self.assertRaises(TypeError):
            db.search(controls='<bad type>')
class VersionTests(TestCase):
    """Sanity check on the module's version attribute."""

    def test_version(self):
        # assertIsInstance reports the offending value and its type on
        # failure, unlike a bare assertTrue(isinstance(...)).
        self.assertIsInstance(ldb.__version__, str)
if __name__ == '__main__':
    # Run every test in this module when the file is executed directly.
    # unittest.main is the same object as unittest.TestProgram.
    import unittest
    unittest.main()