3 # Unit tests for dirsync control
4 # Copyright (C) Matthieu Patou <mat@matws.net> 2011
5 # Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2014
7 # This program is free software; you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
17 # You should have received a copy of the GNU General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
23 sys.path.insert(0, "bin/python")
25 from samba.tests.subunitrun import TestProgram, SubunitOptions
27 import samba.getopt as options
31 from ldb import LdbError, SCOPE_BASE
32 from ldb import Message, MessageElement, Dn
33 from ldb import FLAG_MOD_ADD, FLAG_MOD_DELETE
34 from samba.dcerpc import security, misc, drsblobs
35 from samba.ndr import ndr_unpack, ndr_pack
37 from samba.auth import system_session
38 from samba import gensec, sd_utils
39 from samba.samdb import SamDB
40 from samba.credentials import Credentials, DONT_USE_KERBEROS
42 from samba.tests import delete_force
# Command-line interface.  The option groups are registered in a fixed
# order: Samba options, version options, credentials, subunit options.
parser = optparse.OptionParser(usage="dirsync.py [options] <host>")

sambaopts = options.SambaOptions(parser)
parser.add_option_group(sambaopts)

parser.add_option_group(options.VersionOptions(parser))

# Credentials given on the command line are used when available.
credopts = options.CredentialsOptions(parser)
parser.add_option_group(credopts)

subunitopts = SubunitOptions(parser)
parser.add_option_group(subunitopts)

(opts, args) = parser.parse_args()
62 ldaphost = "ldap://%s" % host
63 ldapshost = "ldaps://%s" % host
# Drop the "<scheme>://" prefix by slicing.  str.lstrip() expects a string
# of characters to strip (or None), not a count, so passing an int raises
# TypeError instead of removing the prefix.
start = host.rindex("://")
host = host[start + 3:]
69 lp = sambaopts.get_loadparm()
70 creds = credopts.get_credentials(lp)
class DirsyncBaseTests(samba.tests.TestCase):
    """Shared fixture for the dirsync control tests.

    Opens an administrative SamDB connection and offers helpers to build
    user DNs and to open additional connections as other accounts.
    """

    def setUp(self):
        """Connect with the command-line credentials and cache domain data."""
        super(DirsyncBaseTests, self).setUp()
        self.ldb_admin = SamDB(ldapshost,
                               credentials=creds,
                               session_info=system_session(lp),
                               lp=lp)
        self.base_dn = self.ldb_admin.domain_dn()
        self.domain_sid = security.dom_sid(self.ldb_admin.get_domain_sid())
        # One random password shared by every account the tests create.
        self.user_pass = samba.generate_random_password(12, 16)
        self.configuration_dn = self.ldb_admin.get_config_basedn().get_linearized()
        self.sd_utils = sd_utils.SDUtils(self.ldb_admin)
        # Report the domain DN the tests are running against.
        print("baseDN: %s" % self.base_dn)

    def get_user_dn(self, name):
        """Return the DN of user *name* under CN=Users of the domain.

        :param name: value used for the CN component
        :return: DN string of the form ``CN=<name>,CN=Users,<base_dn>``
        """
        return "CN=%s,CN=Users,%s" % (name, self.base_dn)

    def get_ldb_connection(self, target_username, target_password):
        """Open a new LDAP connection authenticated as the given user.

        Domain, realm and workstation are copied from the command-line
        credentials; sealing is requested and Kerberos is disabled.

        :param target_username: account name to authenticate as
        :param target_password: password of that account
        :return: a SamDB connected to ``ldaphost`` with those credentials
        """
        creds_tmp = Credentials()
        creds_tmp.set_username(target_username)
        creds_tmp.set_password(target_password)
        creds_tmp.set_domain(creds.get_domain())
        creds_tmp.set_realm(creds.get_realm())
        creds_tmp.set_workstation(creds.get_workstation())
        creds_tmp.set_gensec_features(creds_tmp.get_gensec_features()
                                      | gensec.FEATURE_SEAL)
        # kinit is too expensive to use in a tight loop
        creds_tmp.set_kerberos_state(DONT_USE_KERBEROS)
        # Callers search on the returned object, so the connection must be
        # handed back rather than only bound to a local name.
        return SamDB(url=ldaphost, credentials=creds_tmp, lp=lp)
# Tests of LDAP searches that use the dirsync control
108 class SimpleDirsyncTests(DirsyncBaseTests):
111 super(SimpleDirsyncTests, self).setUp()
113 self.dirsync_user = "test_dirsync_user"
114 self.simple_user = "test_simple_user"
115 self.admin_user = "test_admin_user"
118 self.ldb_admin.newuser(self.dirsync_user, self.user_pass)
119 self.ldb_admin.newuser(self.simple_user, self.user_pass)
120 self.ldb_admin.newuser(self.admin_user, self.user_pass)
121 self.desc_sddl = self.sd_utils.get_sd_as_sddl(self.base_dn)
123 user_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.dirsync_user))
124 mod = "(OA;;CR;%s;;%s)" % (security.GUID_DRS_GET_CHANGES,
126 self.sd_utils.dacl_add_ace(self.base_dn, mod)
128 # add admins to the Domain Admins group
129 self.ldb_admin.add_remove_group_members("Domain Admins", [self.admin_user],
130 add_members_operation=True)
133 super(SimpleDirsyncTests, self).tearDown()
134 delete_force(self.ldb_admin, self.get_user_dn(self.dirsync_user))
135 delete_force(self.ldb_admin, self.get_user_dn(self.simple_user))
136 delete_force(self.ldb_admin, self.get_user_dn(self.admin_user))
138 delete_force(self.ldb_admin, self.ouname)
139 self.sd_utils.modify_sd_on_dn(self.base_dn, self.desc_sddl)
141 # def test_dirsync_errors(self):
def test_dirsync_supported(self):
    """Check who is allowed to use the dirsync control on the domain.

    The admin connection and the ``dirsync_user`` connection must be able
    to run a dirsync search; the same search as ``simple_user`` must be
    refused with LDAP_INSUFFICIENT_ACCESS_RIGHTS.
    """
    self.ldb_dirsync = self.get_ldb_connection(self.dirsync_user, self.user_pass)
    self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass)

    # These two searches only need to succeed; an LdbError raised here
    # propagates and fails the test.
    self.ldb_admin.search(self.base_dn,
                          expression="samaccountname=*",
                          controls=["dirsync:1:0:1"])
    self.ldb_dirsync.search(self.base_dn,
                            expression="samaccountname=*",
                            controls=["dirsync:1:0:1"])

    # assertRaises makes the test fail when the search unexpectedly
    # succeeds; a bare try/except would let that case pass silently.
    with self.assertRaises(LdbError) as ctx:
        self.ldb_simple.search(self.base_dn,
                               expression="samaccountname=*",
                               controls=["dirsync:1:0:1"])
    self.assertIn("LDAP_INSUFFICIENT_ACCESS_RIGHTS", str(ctx.exception))
def test_parentGUID_referrals(self):
    """Compare the parentGUID of the Configuration entry with the domain GUID.

    A base-scope search fetches the objectGUID of the domain DN; a dirsync
    search for ``name=Configuration`` must return an entry whose
    parentGUID equals that value.
    """
    domain_res = self.ldb_admin.search(self.base_dn,
                                       scope=SCOPE_BASE,
                                       attrs=["objectGUID"])
    config_res = self.ldb_admin.search(self.base_dn,
                                       expression="name=Configuration",
                                       controls=["dirsync:1:0:1"])

    domain_guid = domain_res[0].get("objectGUID")
    reported_parent = config_res[0].get("parentGUID")
    self.assertEqual(domain_guid, reported_parent)
def test_ok_not_rootdc(self):
    """Run a dirsync search rooted at the configuration base DN.

    The result is not inspected: the test passes as long as the search
    does not raise.
    """
    config_base = self.ldb_admin.get_config_basedn()
    self.ldb_admin.search(config_base,
                          expression="samaccountname=*",
                          controls=["dirsync:1:0:1"])
170 def test_dirsync_errors(self):
171 """Test if dirsync returns the correct LDAP errors in case of pb"""
172 self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass)
173 self.ldb_dirsync = self.get_ldb_connection(self.dirsync_user, self.user_pass)
175 self.ldb_simple.search(self.base_dn,
176 expression="samaccountname=*",
177 controls=["dirsync:1:0:1"])
178 except LdbError as l:
180 self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
183 self.ldb_simple.search("CN=Users,%s" % self.base_dn,
184 expression="samaccountname=*",
185 controls=["dirsync:1:0:1"])
186 except LdbError as l:
188 self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
191 self.ldb_simple.search("CN=Users,%s" % self.base_dn,
192 expression="samaccountname=*",
193 controls=["dirsync:1:1:1"])
194 except LdbError as l:
196 self.assertTrue(str(l).find("LDAP_UNWILLING_TO_PERFORM") != -1)
199 self.ldb_dirsync.search("CN=Users,%s" % self.base_dn,
200 expression="samaccountname=*",
201 controls=["dirsync:1:0:1"])
202 except LdbError as l:
204 self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
207 self.ldb_admin.search("CN=Users,%s" % self.base_dn,
208 expression="samaccountname=*",
209 controls=["dirsync:1:0:1"])
210 except LdbError as l:
212 self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
215 self.ldb_admin.search("CN=Users,%s" % self.base_dn,
216 expression="samaccountname=*",
217 controls=["dirsync:1:1:1"])
218 except LdbError as l:
220 self.assertTrue(str(l).find("LDAP_UNWILLING_TO_PERFORM") != -1)
222 def test_dirsync_attributes(self):
223 """Check behavior with some attributes """
224 res = self.ldb_admin.search(self.base_dn,
225 expression="samaccountname=*",
226 controls=["dirsync:1:0:1"])
227 # Check that nTSecurityDescriptor is returned as it's the case when doing dirsync
228 self.assertTrue(res.msgs[0].get("ntsecuritydescriptor") is not None)
229 # Check that non replicated attributes are not returned
230 self.assertTrue(res.msgs[0].get("badPwdCount") is None)
231 # Check that non forward link are not returned
232 self.assertTrue(res.msgs[0].get("memberof") is None)
234 # Asking for instanceType will return also objectGUID
235 res = self.ldb_admin.search(self.base_dn,
236 expression="samaccountname=Administrator",
237 attrs=["instanceType"],
238 controls=["dirsync:1:0:1"])
239 self.assertTrue(res.msgs[0].get("objectGUID") is not None)
240 self.assertTrue(res.msgs[0].get("instanceType") is not None)
242 # We don't return an entry if asked for objectGUID
243 res = self.ldb_admin.search(self.base_dn,
244 expression="(distinguishedName=%s)" % str(self.base_dn),
245 attrs=["objectGUID"],
246 controls=["dirsync:1:0:1"])
247 self.assertEqual(len(res.msgs), 0)
249 # a request on the root of a NC didn't return parentGUID
250 res = self.ldb_admin.search(self.base_dn,
251 expression="(distinguishedName=%s)" % str(self.base_dn),
253 controls=["dirsync:1:0:1"])
254 self.assertTrue(res.msgs[0].get("objectGUID") is not None)
255 self.assertTrue(res.msgs[0].get("name") is not None)
256 self.assertTrue(res.msgs[0].get("parentGUID") is None)
257 self.assertTrue(res.msgs[0].get("instanceType") is not None)
259 # Asking for name will return also objectGUID and parentGUID
260 # and instanceType and of course name
261 res = self.ldb_admin.search(self.base_dn,
262 expression="samaccountname=Administrator",
264 controls=["dirsync:1:0:1"])
265 self.assertTrue(res.msgs[0].get("objectGUID") is not None)
266 self.assertTrue(res.msgs[0].get("name") is not None)
267 self.assertTrue(res.msgs[0].get("parentGUID") is not None)
268 self.assertTrue(res.msgs[0].get("instanceType") is not None)
270 # Asking for dn will not return not only DN but more like if attrs=*
271 # parentGUID should be returned
272 res = self.ldb_admin.search(self.base_dn,
273 expression="samaccountname=Administrator",
275 controls=["dirsync:1:0:1"])
276 count = len(res.msgs[0])
277 res2 = self.ldb_admin.search(self.base_dn,
278 expression="samaccountname=Administrator",
279 controls=["dirsync:1:0:1"])
280 count2 = len(res2.msgs[0])
281 self.assertEqual(count, count2)
283 # Asking for cn will return nothing on objects that have CN as RDN
284 res = self.ldb_admin.search(self.base_dn,
285 expression="samaccountname=Administrator",
287 controls=["dirsync:1:0:1"])
288 self.assertEqual(len(res.msgs), 0)
289 # Asking for parentGUID will return nothing too
290 res = self.ldb_admin.search(self.base_dn,
291 expression="samaccountname=Administrator",
292 attrs=["parentGUID"],
293 controls=["dirsync:1:0:1"])
294 self.assertEqual(len(res.msgs), 0)
295 ouname = "OU=testou,%s" % self.base_dn
297 self.ldb_admin.create_ou(ouname)
299 delta.dn = Dn(self.ldb_admin, str(ouname))
300 delta["cn"] = MessageElement("test ou",
303 self.ldb_admin.modify(delta)
304 res = self.ldb_admin.search(self.base_dn,
305 expression="name=testou",
307 controls=["dirsync:1:0:1"])
309 self.assertEqual(len(res.msgs), 1)
310 self.assertEqual(len(res.msgs[0]), 3)
311 delete_force(self.ldb_admin, ouname)
313 def test_dirsync_with_controls(self):
314 """Check that dirsync return correct information when dealing with the NC"""
315 res = self.ldb_admin.search(self.base_dn,
316 expression="(distinguishedName=%s)" % str(self.base_dn),
318 controls=["dirsync:1:0:10000", "extended_dn:1", "show_deleted:1"])
320 def test_dirsync_basenc(self):
321 """Check that dirsync return correct information when dealing with the NC"""
322 res = self.ldb_admin.search(self.base_dn,
323 expression="(distinguishedName=%s)" % str(self.base_dn),
325 controls=["dirsync:1:0:10000"])
326 self.assertEqual(len(res.msgs), 1)
327 self.assertEqual(len(res.msgs[0]), 3)
329 res = self.ldb_admin.search(self.base_dn,
330 expression="(distinguishedName=%s)" % str(self.base_dn),
331 attrs=["ntSecurityDescriptor"],
332 controls=["dirsync:1:0:10000"])
333 self.assertEqual(len(res.msgs), 1)
334 self.assertEqual(len(res.msgs[0]), 3)
336 def test_dirsync_othernc(self):
337 """Check that dirsync return information for entries that are normally referrals (ie. other NCs)"""
338 res = self.ldb_admin.search(self.base_dn,
339 expression="(objectclass=configuration)",
341 controls=["dirsync:1:0:10000"])
342 self.assertEqual(len(res.msgs), 1)
343 self.assertEqual(len(res.msgs[0]), 4)
345 res = self.ldb_admin.search(self.base_dn,
346 expression="(objectclass=configuration)",
347 attrs=["ntSecurityDescriptor"],
348 controls=["dirsync:1:0:10000"])
349 self.assertEqual(len(res.msgs), 1)
350 self.assertEqual(len(res.msgs[0]), 3)
352 res = self.ldb_admin.search(self.base_dn,
353 expression="(objectclass=domaindns)",
354 attrs=["ntSecurityDescriptor"],
355 controls=["dirsync:1:0:10000"])
358 # only sub nc returns a result when asked for objectGUID
359 res = self.ldb_admin.search(self.base_dn,
360 expression="(objectclass=domaindns)",
361 attrs=["objectGUID"],
362 controls=["dirsync:1:0:0"])
363 self.assertEqual(len(res.msgs), nb - 1)
365 self.assertTrue(res.msgs[0].get("objectGUID") is not None)
367 res = self.ldb_admin.search(self.base_dn,
368 expression="(objectclass=configuration)",
369 attrs=["objectGUID"],
370 controls=["dirsync:1:0:0"])
372 def test_dirsync_send_delta(self):
373 """Check that dirsync return correct delta when sending the last cookie"""
374 res = self.ldb_admin.search(self.base_dn,
375 expression="(&(samaccountname=test*)(!(isDeleted=*)))",
376 controls=["dirsync:1:0:10000"])
377 ctl = str(res.controls[0]).split(":")
381 control = str(":".join(ctl))
382 res = self.ldb_admin.search(self.base_dn,
383 expression="(&(samaccountname=test*)(!(isDeleted=*)))",
385 self.assertEqual(len(res), 0)
387 res = self.ldb_admin.search(self.base_dn,
388 expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
389 controls=["dirsync:1:0:100000"])
391 ctl = str(res.controls[0]).split(":")
395 control2 = str(":".join(ctl))
398 ouname = "OU=testou2,%s" % self.base_dn
400 self.ldb_admin.create_ou(ouname)
401 res = self.ldb_admin.search(self.base_dn,
402 expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
404 self.assertEqual(len(res), 1)
405 ctl = str(res.controls[0]).split(":")
409 control3 = str(":".join(ctl))
412 delta.dn = Dn(self.ldb_admin, str(ouname))
414 delta["cn"] = MessageElement("test ou",
417 self.ldb_admin.modify(delta)
418 res = self.ldb_admin.search(self.base_dn,
419 expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
422 self.assertEqual(len(res.msgs), 1)
423 # 3 attributes: instanceType, cn and objectGUID
424 self.assertEqual(len(res.msgs[0]), 3)
427 delta.dn = Dn(self.ldb_admin, str(ouname))
428 delta["cn"] = MessageElement([],
431 self.ldb_admin.modify(delta)
432 res = self.ldb_admin.search(self.base_dn,
433 expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
436 self.assertEqual(len(res.msgs), 1)
437 # So we won't have much attribute returned but instanceType and GUID
439 # 3 attributes: instanceType and objectGUID and cn but empty
440 self.assertEqual(len(res.msgs[0]), 3)
441 ouname = "OU=newouname,%s" % self.base_dn
442 self.ldb_admin.rename(str(res[0].dn), str(Dn(self.ldb_admin, ouname)))
444 ctl = str(res.controls[0]).split(":")
448 control4 = str(":".join(ctl))
449 res = self.ldb_admin.search(self.base_dn,
450 expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
453 self.assertTrue(res[0].get("parentGUID") is not None)
454 self.assertTrue(res[0].get("name") is not None)
455 delete_force(self.ldb_admin, ouname)
457 def test_dirsync_linkedattributes_OBJECT_SECURITY(self):
458 """Check that dirsync returned deleted objects too"""
459 # Let's search for members
460 self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass)
461 res = self.ldb_simple.search(self.base_dn,
462 expression="(name=Administrators)",
463 controls=["dirsync:1:1:1"])
465 self.assertTrue(len(res[0].get("member")) > 0)
466 size = len(res[0].get("member"))
468 ctl = str(res.controls[0]).split(":")
472 control1 = str(":".join(ctl))
473 self.ldb_admin.add_remove_group_members("Administrators", [self.simple_user],
474 add_members_operation=True)
476 res = self.ldb_simple.search(self.base_dn,
477 expression="(name=Administrators)",
480 self.assertEqual(len(res[0].get("member")), size + 1)
481 ctl = str(res.controls[0]).split(":")
485 control1 = str(":".join(ctl))
487 # remove the user from the group
488 self.ldb_admin.add_remove_group_members("Administrators", [self.simple_user],
489 add_members_operation=False)
491 res = self.ldb_simple.search(self.base_dn,
492 expression="(name=Administrators)",
495 self.assertEqual(len(res[0].get("member")), size)
497 self.ldb_admin.newgroup("testgroup")
498 self.addCleanup(self.ldb_admin.deletegroup, "testgroup")
499 self.ldb_admin.add_remove_group_members("testgroup", [self.simple_user],
500 add_members_operation=True)
502 res = self.ldb_admin.search(self.base_dn,
503 expression="(name=testgroup)",
504 controls=["dirsync:1:0:1"])
506 self.assertEqual(len(res[0].get("member")), 1)
507 self.assertTrue(res[0].get("member") != "")
509 ctl = str(res.controls[0]).split(":")
513 control1 = str(":".join(ctl))
515 # Check that reasking the same question but with an updated cookie
516 # didn't return any results.
518 res = self.ldb_admin.search(self.base_dn,
519 expression="(name=testgroup)",
521 self.assertEqual(len(res), 0)
523 ctl = str(res.controls[0]).split(":")
527 control1 = str(":".join(ctl))
529 self.ldb_admin.add_remove_group_members("testgroup", [self.simple_user],
530 add_members_operation=False)
532 res = self.ldb_admin.search(self.base_dn,
533 expression="(name=testgroup)",
537 self.assertEqual(len(res[0].get("member")), 0)
539 def test_dirsync_deleted_items(self):
540 """Check that dirsync returned deleted objects too"""
542 ouname = "OU=testou3,%s" % self.base_dn
544 self.ldb_admin.create_ou(ouname)
545 res = self.ldb_admin.search(self.base_dn,
546 expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
547 controls=["dirsync:1:0:1"])
550 if str(e["name"]) == "testou3":
551 guid = str(ndr_unpack(misc.GUID, e.get("objectGUID")[0]))
553 ctl = str(res.controls[0]).split(":")
557 control1 = str(":".join(ctl))
559 # So now delete the object and check that
560 # we can see the object but deleted when admin
561 delete_force(self.ldb_admin, ouname)
563 res = self.ldb_admin.search(self.base_dn,
564 expression="(objectClass=organizationalUnit)",
566 self.assertEqual(len(res), 1)
567 guid2 = str(ndr_unpack(misc.GUID, res[0].get("objectGUID")[0]))
568 self.assertEqual(guid2, guid)
569 self.assertTrue(res[0].get("isDeleted"))
570 self.assertTrue(res[0].get("name") is not None)
572 def test_cookie_from_others(self):
573 res = self.ldb_admin.search(self.base_dn,
574 expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
575 controls=["dirsync:1:0:1"])
576 ctl = str(res.controls[0]).split(":")
577 cookie = ndr_unpack(drsblobs.ldapControlDirSyncCookie, base64.b64decode(str(ctl[4])))
578 cookie.blob.guid1 = misc.GUID("128a99bf-abcd-1234-abcd-1fb625e530db")
579 controls = ["dirsync:1:0:0:%s" % base64.b64encode(ndr_pack(cookie)).decode('utf8')]
580 res = self.ldb_admin.search(self.base_dn,
581 expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
def test_dirsync_linkedattributes_range(self):
    """Request a ranged ``member`` attribute through dirsync as admin.

    The Administrators entry must come back with a non-empty plain
    ``member`` attribute and without a ``member;range=1-1`` attribute.
    """
    self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass)
    res = self.ldb_admin.search(self.base_dn,
                                attrs=["member;range=1-1"],
                                expression="(name=Administrators)",
                                controls=["dirsync:1:0:0"])

    self.assertGreater(len(res), 0)
    entry = res[0]
    self.assertIsNone(entry.get("member;range=1-1"))
    members = entry.get("member")
    self.assertIsNotNone(members)
    self.assertGreater(len(members), 0)
596 def test_dirsync_linkedattributes_range_user(self):
597 self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass)
599 res = self.ldb_simple.search(self.base_dn,
600 attrs=["member;range=1-1"],
601 expression="(name=Administrators)",
602 controls=["dirsync:1:0:0"])
603 except LdbError as e:
605 self.assertEqual(num, ldb.ERR_INSUFFICIENT_ACCESS_RIGHTS)
609 def test_dirsync_linkedattributes(self):
610 flag_incr_linked = 2147483648
611 self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass)
612 res = self.ldb_admin.search(self.base_dn,
614 expression="(name=Administrators)",
615 controls=["dirsync:1:%d:1" % flag_incr_linked])
617 self.assertTrue(res[0].get("member;range=1-1") is not None)
618 self.assertTrue(len(res[0].get("member;range=1-1")) > 0)
619 size = len(res[0].get("member;range=1-1"))
621 ctl = str(res.controls[0]).split(":")
623 ctl[2] = "%d" % flag_incr_linked
625 control1 = str(":".join(ctl))
626 self.ldb_admin.add_remove_group_members("Administrators", [self.simple_user],
627 add_members_operation=True)
628 self.ldb_admin.add_remove_group_members("Administrators", [self.dirsync_user],
629 add_members_operation=True)
631 res = self.ldb_admin.search(self.base_dn,
632 expression="(name=Administrators)",
635 self.assertEqual(len(res[0].get("member;range=1-1")), 2)
636 ctl = str(res.controls[0]).split(":")
638 ctl[2] = "%d" % flag_incr_linked
640 control1 = str(":".join(ctl))
642 # remove the user from the group
643 self.ldb_admin.add_remove_group_members("Administrators", [self.simple_user],
644 add_members_operation=False)
646 res = self.ldb_admin.search(self.base_dn,
647 expression="(name=Administrators)",
650 self.assertEqual(res[0].get("member;range=1-1"), None)
651 self.assertEqual(len(res[0].get("member;range=0-0")), 1)
653 ctl = str(res.controls[0]).split(":")
655 ctl[2] = "%d" % flag_incr_linked
657 control2 = str(":".join(ctl))
659 self.ldb_admin.add_remove_group_members("Administrators", [self.dirsync_user],
660 add_members_operation=False)
662 res = self.ldb_admin.search(self.base_dn,
663 expression="(name=Administrators)",
666 self.assertEqual(res[0].get("member;range=1-1"), None)
667 self.assertEqual(len(res[0].get("member;range=0-0")), 1)
669 res = self.ldb_admin.search(self.base_dn,
670 expression="(name=Administrators)",
673 self.assertEqual(res[0].get("member;range=1-1"), None)
674 self.assertEqual(len(res[0].get("member;range=0-0")), 2)
def test_dirsync_extended_dn(self):
    """Verify that dirsync results honour the extended_dn control.

    The members of Administrators are read once without extended_dn and
    then once per extended_dn variant.  Each variant must return the same
    number of members, and its first member value must contain the plain
    first member value, a ``<GUID=`` component and the SID prefix expected
    for that variant.
    """
    self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass)

    # Reference search: member values without extended DN components.
    res = self.ldb_simple.search(self.base_dn,
                                 expression="(name=Administrators)",
                                 controls=["dirsync:1:1:1"])
    self.assertTrue(len(res[0].get("member")) > 0)
    size = len(res[0].get("member"))

    # (extended_dn control, SID prefix expected in the member value)
    variants = (
        ("extended_dn:1:1", b">;<SID=S-1-5-21-"),
        ("extended_dn:1:0", b">;<SID=010500000000000515"),
    )
    for extended_ctrl, sid_prefix in variants:
        res_ext = self.ldb_simple.search(self.base_dn,
                                         expression="(name=Administrators)",
                                         controls=["dirsync:1:1:1", extended_ctrl])
        self.assertTrue(len(res_ext[0].get("member")) > 0)
        self.assertEqual(len(res_ext[0].get("member")), size)

        first_ext_member = res_ext[0]["member"][0]
        self.assertIn(res[0]["member"][0], first_ext_member)
        self.assertIn(b"<GUID=", first_ext_member)
        self.assertIn(sid_prefix, first_ext_member)
707 def test_dirsync_deleted_items_OBJECT_SECURITY(self):
708 """Check that dirsync returned deleted objects too"""
710 self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass)
711 ouname = "OU=testou3,%s" % self.base_dn
713 self.ldb_admin.create_ou(ouname)
715 # Specify LDAP_DIRSYNC_OBJECT_SECURITY
716 res = self.ldb_simple.search(self.base_dn,
717 expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
718 controls=["dirsync:1:1:1"])
722 if str(e["name"]) == "testou3":
723 guid = str(ndr_unpack(misc.GUID, e.get("objectGUID")[0]))
725 self.assertTrue(guid is not None)
726 ctl = str(res.controls[0]).split(":")
730 control1 = str(":".join(ctl))
732 # So now delete the object and check that
733 # we can see the object but deleted when admin
734 # we just see the objectGUID when simple user
735 delete_force(self.ldb_admin, ouname)
737 res = self.ldb_simple.search(self.base_dn,
738 expression="(objectClass=organizationalUnit)",
740 self.assertEqual(len(res), 1)
741 guid2 = str(ndr_unpack(misc.GUID, res[0].get("objectGUID")[0]))
742 self.assertEqual(guid2, guid)
743 self.assertEqual(str(res[0].dn), "")
def test_dirsync_unicodePwd(self):
    """Check that dirsync does not return password attributes for krbtgt.

    The search asks for unicodePwd, supplementalCredentials and
    samAccountName; only samAccountName may be present in the result.
    """
    secret_attrs = ["unicodePwd", "supplementalCredentials"]
    res = self.ldb_admin.search(self.base_dn,
                                attrs=secret_attrs + ["samAccountName"],
                                expression="(samAccountName=krbtgt)",
                                controls=["dirsync:1:0:0"])

    self.assertEqual(len(res), 1)
    krbtgt = res[0]
    # Membership test on the message: this form ensures the comparison
    # is case insensitive.
    self.assertIn("samAccountName", krbtgt)
    self.assertTrue(krbtgt.get("samAccountName"))
    for secret_name in secret_attrs:
        self.assertIsNone(krbtgt.get(secret_name))
# Credentials are resolved only when the "listtests" option is unset or false.
_listing_only = getattr(opts, "listtests", False)
if not _listing_only:
    lp = sambaopts.get_loadparm()
    samba.tests.cmdline_credentials = credopts.get_credentials(lp)

# Hand control to the subunit-aware test runner for this module.
TestProgram(module=__name__, opts=subunitopts)