CVE-2023-4154 dsdb/tests: Check that secret attributes are not visible with DirSync...
[metze/samba-autobuild-v4-18-test/.git] / source4 / dsdb / tests / python / dirsync.py
1 #!/usr/bin/env python3
2 #
3 # Unit tests for dirsync control
4 # Copyright (C) Matthieu Patou <mat@matws.net> 2011
5 # Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2014
6 #
7 # This program is free software; you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 3 of the License, or
10 # (at your option) any later version.
11 #
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License
18 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
19
20
21 import optparse
22 import sys
23 sys.path.insert(0, "bin/python")
24 import samba
25 from samba.tests.subunitrun import TestProgram, SubunitOptions
26
27 import samba.getopt as options
28 import base64
29
30 import ldb
31 from ldb import LdbError, SCOPE_BASE
32 from ldb import Message, MessageElement, Dn
33 from ldb import FLAG_MOD_ADD, FLAG_MOD_DELETE
34 from samba.dcerpc import security, misc, drsblobs
35 from samba.ndr import ndr_unpack, ndr_pack
36
37 from samba.auth import system_session
38 from samba import gensec, sd_utils
39 from samba.samdb import SamDB
40 from samba.credentials import Credentials, DONT_USE_KERBEROS
41 import samba.tests
42 from samba.tests import delete_force
43
# Command-line handling: the usual Samba option groups plus one positional
# argument naming the server (bare host name or full URL) to test against.
parser = optparse.OptionParser("dirsync.py [options] <host>")
sambaopts = options.SambaOptions(parser)
parser.add_option_group(sambaopts)
parser.add_option_group(options.VersionOptions(parser))

# use command line creds if available
credopts = options.CredentialsOptions(parser)
parser.add_option_group(credopts)
subunitopts = SubunitOptions(parser)
parser.add_option_group(subunitopts)
opts, args = parser.parse_args()

if len(args) < 1:
    parser.print_usage()
    sys.exit(1)

host = args.pop()
if "://" not in host:
    ldaphost = "ldap://%s" % host
else:
    ldaphost = host
    # Keep only what follows the scheme separator.  str.lstrip() takes a set
    # of characters, not an offset, so the previous lstrip(start + 3) raised
    # TypeError whenever a URL was given.
    start = host.rindex("://")
    host = host[start + 3:]

# The test fixtures always read ldapshost, so it must be defined whichever
# form the host argument took.
ldapshost = "ldaps://%s" % host

lp = sambaopts.get_loadparm()
creds = credopts.get_credentials(lp)
71
72 #
73 # Tests start here
74 #
75
76
class DirsyncBaseTests(samba.tests.TestCase):
    """Shared fixture for the DirSync tests.

    Opens an administrative SamDB connection in setUp() and offers helpers
    to build user DNs and to connect as another account.
    """

    def setUp(self):
        """Connect as the command-line credentials and cache domain values."""
        super().setUp()
        admin = SamDB(ldapshost, credentials=creds, session_info=system_session(lp), lp=lp)
        self.ldb_admin = admin
        self.base_dn = admin.domain_dn()
        self.domain_sid = security.dom_sid(admin.get_domain_sid())
        self.user_pass = samba.generate_random_password(12, 16)
        self.configuration_dn = admin.get_config_basedn().get_linearized()
        self.sd_utils = sd_utils.SDUtils(admin)
        print("baseDN: %s" % self.base_dn)

    def get_user_dn(self, name):
        """Return the DN of account *name* under CN=Users of the domain."""
        user_dn = "CN=%s,CN=Users,%s" % (name, self.base_dn)
        return user_dn

    def get_ldb_connection(self, target_username, target_password):
        """Return a new SamDB connection to ldaphost for the given account.

        Domain, realm and workstation are copied from the command-line
        credentials; sealing is requested and Kerberos is switched off.
        """
        user_creds = Credentials()
        user_creds.set_username(target_username)
        user_creds.set_password(target_password)
        user_creds.set_domain(creds.get_domain())
        user_creds.set_realm(creds.get_realm())
        user_creds.set_workstation(creds.get_workstation())
        sealed_features = user_creds.get_gensec_features() | gensec.FEATURE_SEAL
        user_creds.set_gensec_features(sealed_features)
        # kinit is too expensive to use in a tight loop
        user_creds.set_kerberos_state(DONT_USE_KERBEROS)
        return SamDB(url=ldaphost, credentials=user_creds, lp=lp)
105
106
107 # tests on ldap add operations
108 class SimpleDirsyncTests(DirsyncBaseTests):
109
110     def setUp(self):
111         super(SimpleDirsyncTests, self).setUp()
112         # Regular user
113         self.dirsync_user = "test_dirsync_user"
114         self.simple_user = "test_simple_user"
115         self.admin_user = "test_admin_user"
116         self.ouname = None
117
118         self.ldb_admin.newuser(self.dirsync_user, self.user_pass)
119         self.ldb_admin.newuser(self.simple_user, self.user_pass)
120         self.ldb_admin.newuser(self.admin_user, self.user_pass)
121         self.desc_sddl = self.sd_utils.get_sd_as_sddl(self.base_dn)
122
123         user_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.dirsync_user))
124         mod = "(OA;;CR;%s;;%s)" % (security.GUID_DRS_GET_CHANGES,
125                                    str(user_sid))
126         self.sd_utils.dacl_add_ace(self.base_dn, mod)
127
128         # add admins to the Domain Admins group
129         self.ldb_admin.add_remove_group_members("Domain Admins", [self.admin_user],
130                                                 add_members_operation=True)
131
132     def tearDown(self):
133         super(SimpleDirsyncTests, self).tearDown()
134         delete_force(self.ldb_admin, self.get_user_dn(self.dirsync_user))
135         delete_force(self.ldb_admin, self.get_user_dn(self.simple_user))
136         delete_force(self.ldb_admin, self.get_user_dn(self.admin_user))
137         if self.ouname:
138             delete_force(self.ldb_admin, self.ouname)
139         self.sd_utils.modify_sd_on_dn(self.base_dn, self.desc_sddl)
140
141     # def test_dirsync_errors(self):
142
143     def test_dirsync_supported(self):
144         """Test the basic of the dirsync is supported"""
145         self.ldb_dirsync = self.get_ldb_connection(self.dirsync_user, self.user_pass)
146         self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass)
147         res = self.ldb_admin.search(self.base_dn, expression="samaccountname=*", controls=["dirsync:1:0:1"])
148         res = self.ldb_dirsync.search(self.base_dn, expression="samaccountname=*", controls=["dirsync:1:0:1"])
149         try:
150             self.ldb_simple.search(self.base_dn,
151                                    expression="samaccountname=*",
152                                    controls=["dirsync:1:0:1"])
153         except LdbError as l:
154             self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
155
156     def test_parentGUID_referrals(self):
157         res2 = self.ldb_admin.search(self.base_dn, scope=SCOPE_BASE, attrs=["objectGUID"])
158
159         res = self.ldb_admin.search(self.base_dn,
160                                     expression="name=Configuration",
161                                     controls=["dirsync:1:0:1"])
162         self.assertEqual(res2[0].get("objectGUID"), res[0].get("parentGUID"))
163
164     def test_ok_not_rootdc(self):
165         """Test if it's ok to do dirsync on another NC that is not the root DC"""
166         self.ldb_admin.search(self.ldb_admin.get_config_basedn(),
167                               expression="samaccountname=*",
168                               controls=["dirsync:1:0:1"])
169
170     def test_dirsync_errors(self):
171         """Test if dirsync returns the correct LDAP errors in case of pb"""
172         self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass)
173         self.ldb_dirsync = self.get_ldb_connection(self.dirsync_user, self.user_pass)
174         try:
175             self.ldb_simple.search(self.base_dn,
176                                    expression="samaccountname=*",
177                                    controls=["dirsync:1:0:1"])
178         except LdbError as l:
179             print(l)
180             self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
181
182         try:
183             self.ldb_simple.search("CN=Users,%s" % self.base_dn,
184                                    expression="samaccountname=*",
185                                    controls=["dirsync:1:0:1"])
186         except LdbError as l:
187             print(l)
188             self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
189
190         try:
191             self.ldb_simple.search("CN=Users,%s" % self.base_dn,
192                                    expression="samaccountname=*",
193                                    controls=["dirsync:1:1:1"])
194         except LdbError as l:
195             print(l)
196             self.assertTrue(str(l).find("LDAP_UNWILLING_TO_PERFORM") != -1)
197
198         try:
199             self.ldb_dirsync.search("CN=Users,%s" % self.base_dn,
200                                     expression="samaccountname=*",
201                                     controls=["dirsync:1:0:1"])
202         except LdbError as l:
203             print(l)
204             self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
205
206         try:
207             self.ldb_admin.search("CN=Users,%s" % self.base_dn,
208                                   expression="samaccountname=*",
209                                   controls=["dirsync:1:0:1"])
210         except LdbError as l:
211             print(l)
212             self.assertTrue(str(l).find("LDAP_INSUFFICIENT_ACCESS_RIGHTS") != -1)
213
214         try:
215             self.ldb_admin.search("CN=Users,%s" % self.base_dn,
216                                   expression="samaccountname=*",
217                                   controls=["dirsync:1:1:1"])
218         except LdbError as l:
219             print(l)
220             self.assertTrue(str(l).find("LDAP_UNWILLING_TO_PERFORM") != -1)
221
222     def test_dirsync_attributes(self):
223         """Check behavior with some attributes """
224         res = self.ldb_admin.search(self.base_dn,
225                                     expression="samaccountname=*",
226                                     controls=["dirsync:1:0:1"])
227         # Check that nTSecurityDescriptor is returned as it's the case when doing dirsync
228         self.assertTrue(res.msgs[0].get("ntsecuritydescriptor") is not None)
229         # Check that non replicated attributes are not returned
230         self.assertTrue(res.msgs[0].get("badPwdCount") is None)
231         # Check that non forward link are not returned
232         self.assertTrue(res.msgs[0].get("memberof") is None)
233
234         # Asking for instanceType will return also objectGUID
235         res = self.ldb_admin.search(self.base_dn,
236                                     expression="samaccountname=Administrator",
237                                     attrs=["instanceType"],
238                                     controls=["dirsync:1:0:1"])
239         self.assertTrue(res.msgs[0].get("objectGUID") is not None)
240         self.assertTrue(res.msgs[0].get("instanceType") is not None)
241
242         # We don't return an entry if asked for objectGUID
243         res = self.ldb_admin.search(self.base_dn,
244                                     expression="(distinguishedName=%s)" % str(self.base_dn),
245                                     attrs=["objectGUID"],
246                                     controls=["dirsync:1:0:1"])
247         self.assertEqual(len(res.msgs), 0)
248
249         # a request on the root of a NC didn't return parentGUID
250         res = self.ldb_admin.search(self.base_dn,
251                                     expression="(distinguishedName=%s)" % str(self.base_dn),
252                                     attrs=["name"],
253                                     controls=["dirsync:1:0:1"])
254         self.assertTrue(res.msgs[0].get("objectGUID") is not None)
255         self.assertTrue(res.msgs[0].get("name") is not None)
256         self.assertTrue(res.msgs[0].get("parentGUID") is None)
257         self.assertTrue(res.msgs[0].get("instanceType") is not None)
258
259         # Asking for name will return also objectGUID and parentGUID
260         # and instanceType and of course name
261         res = self.ldb_admin.search(self.base_dn,
262                                     expression="samaccountname=Administrator",
263                                     attrs=["name"],
264                                     controls=["dirsync:1:0:1"])
265         self.assertTrue(res.msgs[0].get("objectGUID") is not None)
266         self.assertTrue(res.msgs[0].get("name") is not None)
267         self.assertTrue(res.msgs[0].get("parentGUID") is not None)
268         self.assertTrue(res.msgs[0].get("instanceType") is not None)
269
270         # Asking for dn will not return not only DN but more like if attrs=*
271         # parentGUID should be returned
272         res = self.ldb_admin.search(self.base_dn,
273                                     expression="samaccountname=Administrator",
274                                     attrs=["dn"],
275                                     controls=["dirsync:1:0:1"])
276         count = len(res.msgs[0])
277         res2 = self.ldb_admin.search(self.base_dn,
278                                      expression="samaccountname=Administrator",
279                                      controls=["dirsync:1:0:1"])
280         count2 = len(res2.msgs[0])
281         self.assertEqual(count, count2)
282
283         # Asking for cn will return nothing on objects that have CN as RDN
284         res = self.ldb_admin.search(self.base_dn,
285                                     expression="samaccountname=Administrator",
286                                     attrs=["cn"],
287                                     controls=["dirsync:1:0:1"])
288         self.assertEqual(len(res.msgs), 0)
289         # Asking for parentGUID will return nothing too
290         res = self.ldb_admin.search(self.base_dn,
291                                     expression="samaccountname=Administrator",
292                                     attrs=["parentGUID"],
293                                     controls=["dirsync:1:0:1"])
294         self.assertEqual(len(res.msgs), 0)
295         ouname = "OU=testou,%s" % self.base_dn
296         self.ouname = ouname
297         self.ldb_admin.create_ou(ouname)
298         delta = Message()
299         delta.dn = Dn(self.ldb_admin, str(ouname))
300         delta["cn"] = MessageElement("test ou",
301                                      FLAG_MOD_ADD,
302                                      "cn")
303         self.ldb_admin.modify(delta)
304         res = self.ldb_admin.search(self.base_dn,
305                                     expression="name=testou",
306                                     attrs=["cn"],
307                                     controls=["dirsync:1:0:1"])
308
309         self.assertEqual(len(res.msgs), 1)
310         self.assertEqual(len(res.msgs[0]), 3)
311         delete_force(self.ldb_admin, ouname)
312
313     def test_dirsync_with_controls(self):
314         """Check that dirsync return correct information when dealing with the NC"""
315         res = self.ldb_admin.search(self.base_dn,
316                                     expression="(distinguishedName=%s)" % str(self.base_dn),
317                                     attrs=["name"],
318                                     controls=["dirsync:1:0:10000", "extended_dn:1", "show_deleted:1"])
319
320     def test_dirsync_basenc(self):
321         """Check that dirsync return correct information when dealing with the NC"""
322         res = self.ldb_admin.search(self.base_dn,
323                                     expression="(distinguishedName=%s)" % str(self.base_dn),
324                                     attrs=["name"],
325                                     controls=["dirsync:1:0:10000"])
326         self.assertEqual(len(res.msgs), 1)
327         self.assertEqual(len(res.msgs[0]), 3)
328
329         res = self.ldb_admin.search(self.base_dn,
330                                     expression="(distinguishedName=%s)" % str(self.base_dn),
331                                     attrs=["ntSecurityDescriptor"],
332                                     controls=["dirsync:1:0:10000"])
333         self.assertEqual(len(res.msgs), 1)
334         self.assertEqual(len(res.msgs[0]), 3)
335
336     def test_dirsync_othernc(self):
337         """Check that dirsync return information for entries that are normally referrals (ie. other NCs)"""
338         res = self.ldb_admin.search(self.base_dn,
339                                     expression="(objectclass=configuration)",
340                                     attrs=["name"],
341                                     controls=["dirsync:1:0:10000"])
342         self.assertEqual(len(res.msgs), 1)
343         self.assertEqual(len(res.msgs[0]), 4)
344
345         res = self.ldb_admin.search(self.base_dn,
346                                     expression="(objectclass=configuration)",
347                                     attrs=["ntSecurityDescriptor"],
348                                     controls=["dirsync:1:0:10000"])
349         self.assertEqual(len(res.msgs), 1)
350         self.assertEqual(len(res.msgs[0]), 3)
351
352         res = self.ldb_admin.search(self.base_dn,
353                                     expression="(objectclass=domaindns)",
354                                     attrs=["ntSecurityDescriptor"],
355                                     controls=["dirsync:1:0:10000"])
356         nb = len(res.msgs)
357
358         # only sub nc returns a result when asked for objectGUID
359         res = self.ldb_admin.search(self.base_dn,
360                                     expression="(objectclass=domaindns)",
361                                     attrs=["objectGUID"],
362                                     controls=["dirsync:1:0:0"])
363         self.assertEqual(len(res.msgs), nb - 1)
364         if nb > 1:
365             self.assertTrue(res.msgs[0].get("objectGUID") is not None)
366         else:
367             res = self.ldb_admin.search(self.base_dn,
368                                         expression="(objectclass=configuration)",
369                                         attrs=["objectGUID"],
370                                         controls=["dirsync:1:0:0"])
371
372     def test_dirsync_send_delta(self):
373         """Check that dirsync return correct delta when sending the last cookie"""
374         res = self.ldb_admin.search(self.base_dn,
375                                     expression="(&(samaccountname=test*)(!(isDeleted=*)))",
376                                     controls=["dirsync:1:0:10000"])
377         ctl = str(res.controls[0]).split(":")
378         ctl[1] = "1"
379         ctl[2] = "0"
380         ctl[3] = "10000"
381         control = str(":".join(ctl))
382         res = self.ldb_admin.search(self.base_dn,
383                                     expression="(&(samaccountname=test*)(!(isDeleted=*)))",
384                                     controls=[control])
385         self.assertEqual(len(res), 0)
386
387         res = self.ldb_admin.search(self.base_dn,
388                                     expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
389                                     controls=["dirsync:1:0:100000"])
390
391         ctl = str(res.controls[0]).split(":")
392         ctl[1] = "1"
393         ctl[2] = "0"
394         ctl[3] = "10000"
395         control2 = str(":".join(ctl))
396
397         # Let's create an OU
398         ouname = "OU=testou2,%s" % self.base_dn
399         self.ouname = ouname
400         self.ldb_admin.create_ou(ouname)
401         res = self.ldb_admin.search(self.base_dn,
402                                     expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
403                                     controls=[control2])
404         self.assertEqual(len(res), 1)
405         ctl = str(res.controls[0]).split(":")
406         ctl[1] = "1"
407         ctl[2] = "0"
408         ctl[3] = "10000"
409         control3 = str(":".join(ctl))
410
411         delta = Message()
412         delta.dn = Dn(self.ldb_admin, str(ouname))
413
414         delta["cn"] = MessageElement("test ou",
415                                      FLAG_MOD_ADD,
416                                      "cn")
417         self.ldb_admin.modify(delta)
418         res = self.ldb_admin.search(self.base_dn,
419                                     expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
420                                     controls=[control3])
421
422         self.assertEqual(len(res.msgs), 1)
423         # 3 attributes: instanceType, cn and objectGUID
424         self.assertEqual(len(res.msgs[0]), 3)
425
426         delta = Message()
427         delta.dn = Dn(self.ldb_admin, str(ouname))
428         delta["cn"] = MessageElement([],
429                                      FLAG_MOD_DELETE,
430                                      "cn")
431         self.ldb_admin.modify(delta)
432         res = self.ldb_admin.search(self.base_dn,
433                                     expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
434                                     controls=[control3])
435
436         self.assertEqual(len(res.msgs), 1)
437         # So we won't have much attribute returned but instanceType and GUID
438         # are.
439         # 3 attributes: instanceType and objectGUID and cn but empty
440         self.assertEqual(len(res.msgs[0]), 3)
441         ouname = "OU=newouname,%s" % self.base_dn
442         self.ldb_admin.rename(str(res[0].dn), str(Dn(self.ldb_admin, ouname)))
443         self.ouname = ouname
444         ctl = str(res.controls[0]).split(":")
445         ctl[1] = "1"
446         ctl[2] = "0"
447         ctl[3] = "10000"
448         control4 = str(":".join(ctl))
449         res = self.ldb_admin.search(self.base_dn,
450                                     expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
451                                     controls=[control3])
452
453         self.assertTrue(res[0].get("parentGUID") is not None)
454         self.assertTrue(res[0].get("name") is not None)
455         delete_force(self.ldb_admin, ouname)
456
457     def test_dirsync_linkedattributes_OBJECT_SECURITY(self):
458         """Check that dirsync returned deleted objects too"""
459         # Let's search for members
460         self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass)
461         res = self.ldb_simple.search(self.base_dn,
462                                      expression="(name=Administrators)",
463                                      controls=["dirsync:1:1:1"])
464
465         self.assertTrue(len(res[0].get("member")) > 0)
466         size = len(res[0].get("member"))
467
468         ctl = str(res.controls[0]).split(":")
469         ctl[1] = "1"
470         ctl[2] = "1"
471         ctl[3] = "10000"
472         control1 = str(":".join(ctl))
473         self.ldb_admin.add_remove_group_members("Administrators", [self.simple_user],
474                                                 add_members_operation=True)
475
476         res = self.ldb_simple.search(self.base_dn,
477                                      expression="(name=Administrators)",
478                                      controls=[control1])
479
480         self.assertEqual(len(res[0].get("member")), size + 1)
481         ctl = str(res.controls[0]).split(":")
482         ctl[1] = "1"
483         ctl[2] = "1"
484         ctl[3] = "10000"
485         control1 = str(":".join(ctl))
486
487         # remove the user from the group
488         self.ldb_admin.add_remove_group_members("Administrators", [self.simple_user],
489                                                 add_members_operation=False)
490
491         res = self.ldb_simple.search(self.base_dn,
492                                      expression="(name=Administrators)",
493                                      controls=[control1])
494
495         self.assertEqual(len(res[0].get("member")), size)
496
497         self.ldb_admin.newgroup("testgroup")
498         self.addCleanup(self.ldb_admin.deletegroup, "testgroup")
499         self.ldb_admin.add_remove_group_members("testgroup", [self.simple_user],
500                                                 add_members_operation=True)
501
502         res = self.ldb_admin.search(self.base_dn,
503                                     expression="(name=testgroup)",
504                                     controls=["dirsync:1:0:1"])
505
506         self.assertEqual(len(res[0].get("member")), 1)
507         self.assertTrue(res[0].get("member") != "")
508
509         ctl = str(res.controls[0]).split(":")
510         ctl[1] = "1"
511         ctl[2] = "0"
512         ctl[3] = "1"
513         control1 = str(":".join(ctl))
514
515         # Check that reasking the same question but with an updated cookie
516         # didn't return any results.
517         print(control1)
518         res = self.ldb_admin.search(self.base_dn,
519                                     expression="(name=testgroup)",
520                                     controls=[control1])
521         self.assertEqual(len(res), 0)
522
523         ctl = str(res.controls[0]).split(":")
524         ctl[1] = "1"
525         ctl[2] = "1"
526         ctl[3] = "10000"
527         control1 = str(":".join(ctl))
528
529         self.ldb_admin.add_remove_group_members("testgroup", [self.simple_user],
530                                                 add_members_operation=False)
531
532         res = self.ldb_admin.search(self.base_dn,
533                                     expression="(name=testgroup)",
534                                     attrs=["member"],
535                                     controls=[control1])
536
537         self.assertEqual(len(res[0].get("member")), 0)
538
539     def test_dirsync_deleted_items(self):
540         """Check that dirsync returned deleted objects too"""
541         # Let's create an OU
542         ouname = "OU=testou3,%s" % self.base_dn
543         self.ouname = ouname
544         self.ldb_admin.create_ou(ouname)
545         res = self.ldb_admin.search(self.base_dn,
546                                     expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
547                                     controls=["dirsync:1:0:1"])
548         guid = None
549         for e in res:
550             if str(e["name"]) == "testou3":
551                 guid = str(ndr_unpack(misc.GUID, e.get("objectGUID")[0]))
552
553         ctl = str(res.controls[0]).split(":")
554         ctl[1] = "1"
555         ctl[2] = "0"
556         ctl[3] = "10000"
557         control1 = str(":".join(ctl))
558
559         # So now delete the object and check that
560         # we can see the object but deleted when admin
561         delete_force(self.ldb_admin, ouname)
562
563         res = self.ldb_admin.search(self.base_dn,
564                                     expression="(objectClass=organizationalUnit)",
565                                     controls=[control1])
566         self.assertEqual(len(res), 1)
567         guid2 = str(ndr_unpack(misc.GUID, res[0].get("objectGUID")[0]))
568         self.assertEqual(guid2, guid)
569         self.assertTrue(res[0].get("isDeleted"))
570         self.assertTrue(res[0].get("name") is not None)
571
572     def test_cookie_from_others(self):
573         res = self.ldb_admin.search(self.base_dn,
574                                     expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
575                                     controls=["dirsync:1:0:1"])
576         ctl = str(res.controls[0]).split(":")
577         cookie = ndr_unpack(drsblobs.ldapControlDirSyncCookie, base64.b64decode(str(ctl[4])))
578         cookie.blob.guid1 = misc.GUID("128a99bf-abcd-1234-abcd-1fb625e530db")
579         controls = ["dirsync:1:0:0:%s" % base64.b64encode(ndr_pack(cookie)).decode('utf8')]
580         res = self.ldb_admin.search(self.base_dn,
581                                     expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
582                                     controls=controls)
583
584     def test_dirsync_linkedattributes_range(self):
585         self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass)
586         res = self.ldb_admin.search(self.base_dn,
587                                     attrs=["member;range=1-1"],
588                                     expression="(name=Administrators)",
589                                     controls=["dirsync:1:0:0"])
590
591         self.assertTrue(len(res) > 0)
592         self.assertTrue(res[0].get("member;range=1-1") is None)
593         self.assertTrue(res[0].get("member") is not None)
594         self.assertTrue(len(res[0].get("member")) > 0)
595
596     def test_dirsync_linkedattributes_range_user(self):
597         self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass)
598         try:
599             res = self.ldb_simple.search(self.base_dn,
600                                          attrs=["member;range=1-1"],
601                                          expression="(name=Administrators)",
602                                         controls=["dirsync:1:0:0"])
603         except LdbError as e:
604             (num, _) = e.args
605             self.assertEqual(num, ldb.ERR_INSUFFICIENT_ACCESS_RIGHTS)
606         else:
607             self.fail()
608
609     def test_dirsync_linkedattributes(self):
610         flag_incr_linked = 2147483648
611         self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass)
612         res = self.ldb_admin.search(self.base_dn,
613                                     attrs=["member"],
614                                     expression="(name=Administrators)",
615                                     controls=["dirsync:1:%d:1" % flag_incr_linked])
616
617         self.assertTrue(res[0].get("member;range=1-1") is not None)
618         self.assertTrue(len(res[0].get("member;range=1-1")) > 0)
619         size = len(res[0].get("member;range=1-1"))
620
621         ctl = str(res.controls[0]).split(":")
622         ctl[1] = "1"
623         ctl[2] = "%d" % flag_incr_linked
624         ctl[3] = "10000"
625         control1 = str(":".join(ctl))
626         self.ldb_admin.add_remove_group_members("Administrators", [self.simple_user],
627                                                 add_members_operation=True)
628         self.ldb_admin.add_remove_group_members("Administrators", [self.dirsync_user],
629                                                 add_members_operation=True)
630
631         res = self.ldb_admin.search(self.base_dn,
632                                     expression="(name=Administrators)",
633                                     controls=[control1])
634
635         self.assertEqual(len(res[0].get("member;range=1-1")), 2)
636         ctl = str(res.controls[0]).split(":")
637         ctl[1] = "1"
638         ctl[2] = "%d" % flag_incr_linked
639         ctl[3] = "10000"
640         control1 = str(":".join(ctl))
641
642         # remove the user from the group
643         self.ldb_admin.add_remove_group_members("Administrators", [self.simple_user],
644                                                 add_members_operation=False)
645
646         res = self.ldb_admin.search(self.base_dn,
647                                     expression="(name=Administrators)",
648                                     controls=[control1])
649
650         self.assertEqual(res[0].get("member;range=1-1"), None)
651         self.assertEqual(len(res[0].get("member;range=0-0")), 1)
652
653         ctl = str(res.controls[0]).split(":")
654         ctl[1] = "1"
655         ctl[2] = "%d" % flag_incr_linked
656         ctl[3] = "10000"
657         control2 = str(":".join(ctl))
658
659         self.ldb_admin.add_remove_group_members("Administrators", [self.dirsync_user],
660                                                 add_members_operation=False)
661
662         res = self.ldb_admin.search(self.base_dn,
663                                     expression="(name=Administrators)",
664                                     controls=[control2])
665
666         self.assertEqual(res[0].get("member;range=1-1"), None)
667         self.assertEqual(len(res[0].get("member;range=0-0")), 1)
668
669         res = self.ldb_admin.search(self.base_dn,
670                                     expression="(name=Administrators)",
671                                     controls=[control1])
672
673         self.assertEqual(res[0].get("member;range=1-1"), None)
674         self.assertEqual(len(res[0].get("member;range=0-0")), 2)
675
676     def test_dirsync_extended_dn(self):
677         """Check that dirsync works together with the extended_dn control"""
678         # Let's search for members
679         self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass)
680         res = self.ldb_simple.search(self.base_dn,
681                                      expression="(name=Administrators)",
682                                      controls=["dirsync:1:1:1"])
683
684         self.assertTrue(len(res[0].get("member")) > 0)
685         size = len(res[0].get("member"))
686
687         resEX1 = self.ldb_simple.search(self.base_dn,
688                                         expression="(name=Administrators)",
689                                         controls=["dirsync:1:1:1","extended_dn:1:1"])
690         self.assertTrue(len(resEX1[0].get("member")) > 0)
691         sizeEX1 = len(resEX1[0].get("member"))
692         self.assertEqual(sizeEX1, size)
693         self.assertIn(res[0]["member"][0], resEX1[0]["member"][0])
694         self.assertIn(b"<GUID=", resEX1[0]["member"][0])
695         self.assertIn(b">;<SID=S-1-5-21-", resEX1[0]["member"][0])
696
697         resEX0 = self.ldb_simple.search(self.base_dn,
698                                         expression="(name=Administrators)",
699                                         controls=["dirsync:1:1:1","extended_dn:1:0"])
700         self.assertTrue(len(resEX0[0].get("member")) > 0)
701         sizeEX0 = len(resEX0[0].get("member"))
702         self.assertEqual(sizeEX0, size)
703         self.assertIn(res[0]["member"][0], resEX0[0]["member"][0])
704         self.assertIn(b"<GUID=", resEX0[0]["member"][0])
705         self.assertIn(b">;<SID=010500000000000515", resEX0[0]["member"][0])
706
707     def test_dirsync_deleted_items_OBJECT_SECURITY(self):
708         """Check that dirsync returned deleted objects too"""
709         # Let's create an OU
710         self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass)
711         ouname = "OU=testou3,%s" % self.base_dn
712         self.ouname = ouname
713         self.ldb_admin.create_ou(ouname)
714
715         # Specify LDAP_DIRSYNC_OBJECT_SECURITY
716         res = self.ldb_simple.search(self.base_dn,
717                                      expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
718                                      controls=["dirsync:1:1:1"])
719
720         guid = None
721         for e in res:
722             if str(e["name"]) == "testou3":
723                 guid = str(ndr_unpack(misc.GUID, e.get("objectGUID")[0]))
724
725         self.assertTrue(guid is not None)
726         ctl = str(res.controls[0]).split(":")
727         ctl[1] = "1"
728         ctl[2] = "1"
729         ctl[3] = "10000"
730         control1 = str(":".join(ctl))
731
732         # So now delete the object and check that
733         # we can see the object but deleted when admin
734         # we just see the objectGUID when simple user
735         delete_force(self.ldb_admin, ouname)
736
737         res = self.ldb_simple.search(self.base_dn,
738                                      expression="(objectClass=organizationalUnit)",
739                                      controls=[control1])
740         self.assertEqual(len(res), 1)
741         guid2 = str(ndr_unpack(misc.GUID, res[0].get("objectGUID")[0]))
742         self.assertEqual(guid2, guid)
743         self.assertEqual(str(res[0].dn), "")
744
745     def test_dirsync_unicodePwd(self):
746         res = self.ldb_admin.search(self.base_dn,
747                                     attrs=["unicodePwd", "supplementalCredentials", "samAccountName"],
748                                     expression="(samAccountName=krbtgt)",
749                                     controls=["dirsync:1:0:0"])
750
751         self.assertTrue(len(res) == 1)
752         # This form ensures this is a case insensitive comparison
753         self.assertTrue("samAccountName" in res[0])
754         self.assertTrue(res[0].get("samAccountName"))
755         self.assertTrue(res[0].get("unicodePwd") is None)
756         self.assertTrue(res[0].get("supplementalCredentials") is None)
757
# Unless the `listtests` option is set (it is treated as False when absent),
# load the Samba configuration and publish the command-line credentials on
# samba.tests so the test cases can pick them up.
if not getattr(opts, "listtests", False):
    lp = sambaopts.get_loadparm()
    samba.tests.cmdline_credentials = credopts.get_credentials(lp)


# Run the tests defined in this module with the parsed subunit options.
TestProgram(module=__name__, opts=subunitopts)