d5d069e3e6b6089418330f55b7717664c398f2b9
[metze/samba/wip.git] / source4 / dsdb / tests / python / acl.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3 # This is unit with tests for LDAP access checks
4
5 import optparse
6 import sys
7 import base64
8 import re
9 sys.path.insert(0, "bin/python")
10 import samba
11
12 from samba.tests.subunitrun import SubunitOptions, TestProgram
13
14 import samba.getopt as options
15 from samba.join import dc_join
16
17 from ldb import (
18     SCOPE_BASE, SCOPE_SUBTREE, LdbError, ERR_NO_SUCH_OBJECT,
19     ERR_UNWILLING_TO_PERFORM, ERR_INSUFFICIENT_ACCESS_RIGHTS)
20 from ldb import ERR_CONSTRAINT_VIOLATION
21 from ldb import ERR_OPERATIONS_ERROR
22 from ldb import Message, MessageElement, Dn
23 from ldb import FLAG_MOD_REPLACE, FLAG_MOD_ADD, FLAG_MOD_DELETE
24 from samba.dcerpc import security, drsuapi, misc
25
26 from samba.auth import system_session
27 from samba import gensec, sd_utils
28 from samba.samdb import SamDB
29 from samba.credentials import Credentials, DONT_USE_KERBEROS
30 import samba.tests
31 from samba.tests import delete_force
32 import samba.dsdb
33
34 parser = optparse.OptionParser("acl.py [options] <host>")
35 sambaopts = options.SambaOptions(parser)
36 parser.add_option_group(sambaopts)
37 parser.add_option_group(options.VersionOptions(parser))
38
39 # use command line creds if available
40 credopts = options.CredentialsOptions(parser)
41 parser.add_option_group(credopts)
42 subunitopts = SubunitOptions(parser)
43 parser.add_option_group(subunitopts)
44
45 opts, args = parser.parse_args()
46
47 if len(args) < 1:
48     parser.print_usage()
49     sys.exit(1)
50
51 host = args[0]
52 if not "://" in host:
53     ldaphost = "ldap://%s" % host
54 else:
55     ldaphost = host
56     start = host.rindex("://")
57     host = host.lstrip(start+3)
58
59 lp = sambaopts.get_loadparm()
60 creds = credopts.get_credentials(lp)
61 creds.set_gensec_features(creds.get_gensec_features() | gensec.FEATURE_SEAL)
62
63 #
64 # Tests start here
65 #
66
67 class AclTests(samba.tests.TestCase):
68
69     def setUp(self):
70         super(AclTests, self).setUp()
71         self.ldb_admin = SamDB(ldaphost, credentials=creds, session_info=system_session(lp), lp=lp)
72         self.base_dn = self.ldb_admin.domain_dn()
73         self.domain_sid = security.dom_sid(self.ldb_admin.get_domain_sid())
74         self.user_pass = "samba123@"
75         self.configuration_dn = self.ldb_admin.get_config_basedn().get_linearized()
76         self.sd_utils = sd_utils.SDUtils(self.ldb_admin)
77         self.addCleanup(self.delete_admin_connection)
78         #used for anonymous login
79         self.creds_tmp = Credentials()
80         self.creds_tmp.set_username("")
81         self.creds_tmp.set_password("")
82         self.creds_tmp.set_domain(creds.get_domain())
83         self.creds_tmp.set_realm(creds.get_realm())
84         self.creds_tmp.set_workstation(creds.get_workstation())
85         print "baseDN: %s" % self.base_dn
86
87     def get_user_dn(self, name):
88         return "CN=%s,CN=Users,%s" % (name, self.base_dn)
89
90     def get_ldb_connection(self, target_username, target_password):
91         creds_tmp = Credentials()
92         creds_tmp.set_username(target_username)
93         creds_tmp.set_password(target_password)
94         creds_tmp.set_domain(creds.get_domain())
95         creds_tmp.set_realm(creds.get_realm())
96         creds_tmp.set_workstation(creds.get_workstation())
97         creds_tmp.set_gensec_features(creds_tmp.get_gensec_features()
98                                       | gensec.FEATURE_SEAL)
99         creds_tmp.set_kerberos_state(DONT_USE_KERBEROS) # kinit is too expensive to use in a tight loop
100         ldb_target = SamDB(url=ldaphost, credentials=creds_tmp, lp=lp)
101         return ldb_target
102
103     # Test if we have any additional groups for users than default ones
104     def assert_user_no_group_member(self, username):
105         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s)" % self.get_user_dn(username))
106         try:
107             self.assertEqual(res[0]["memberOf"][0], "")
108         except KeyError:
109             pass
110         else:
111             self.fail()
112
113     def delete_admin_connection(self):
114         del self.sd_utils
115         del self.ldb_admin
116
117 #tests on ldap add operations
118 class AclAddTests(AclTests):
119
120     def setUp(self):
121         super(AclAddTests, self).setUp()
122         # Domain admin that will be creator of OU parent-child structure
123         self.usr_admin_owner = "acl_add_user1"
124         # Second domain admin that will not be creator of OU parent-child structure
125         self.usr_admin_not_owner = "acl_add_user2"
126         # Regular user
127         self.regular_user = "acl_add_user3"
128         self.test_user1 = "test_add_user1"
129         self.test_group1 = "test_add_group1"
130         self.ou1 = "OU=test_add_ou1"
131         self.ou2 = "OU=test_add_ou2,%s" % self.ou1
132         self.ldb_admin.newuser(self.usr_admin_owner, self.user_pass)
133         self.ldb_admin.newuser(self.usr_admin_not_owner, self.user_pass)
134         self.ldb_admin.newuser(self.regular_user, self.user_pass)
135
136         # add admins to the Domain Admins group
137         self.ldb_admin.add_remove_group_members("Domain Admins", [self.usr_admin_owner],
138                        add_members_operation=True)
139         self.ldb_admin.add_remove_group_members("Domain Admins", [self.usr_admin_not_owner],
140                        add_members_operation=True)
141
142         self.ldb_owner = self.get_ldb_connection(self.usr_admin_owner, self.user_pass)
143         self.ldb_notowner = self.get_ldb_connection(self.usr_admin_not_owner, self.user_pass)
144         self.ldb_user = self.get_ldb_connection(self.regular_user, self.user_pass)
145
146     def tearDown(self):
147         super(AclAddTests, self).tearDown()
148         delete_force(self.ldb_admin, "CN=%s,%s,%s" %
149                           (self.test_user1, self.ou2, self.base_dn))
150         delete_force(self.ldb_admin, "CN=%s,%s,%s" %
151                           (self.test_group1, self.ou2, self.base_dn))
152         delete_force(self.ldb_admin, "%s,%s" % (self.ou2, self.base_dn))
153         delete_force(self.ldb_admin, "%s,%s" % (self.ou1, self.base_dn))
154         delete_force(self.ldb_admin, self.get_user_dn(self.usr_admin_owner))
155         delete_force(self.ldb_admin, self.get_user_dn(self.usr_admin_not_owner))
156         delete_force(self.ldb_admin, self.get_user_dn(self.regular_user))
157         delete_force(self.ldb_admin, self.get_user_dn("test_add_anonymous"))
158
159         del self.ldb_notowner
160         del self.ldb_owner
161         del self.ldb_user
162
163     # Make sure top OU is deleted (and so everything under it)
164     def assert_top_ou_deleted(self):
165         res = self.ldb_admin.search(self.base_dn,
166             expression="(distinguishedName=%s,%s)" % (
167                 "OU=test_add_ou1", self.base_dn))
168         self.assertEqual(len(res), 0)
169
170     def test_add_u1(self):
171         """Testing OU with the rights of Doman Admin not creator of the OU """
172         self.assert_top_ou_deleted()
173         # Change descriptor for top level OU
174         self.ldb_owner.create_ou("OU=test_add_ou1," + self.base_dn)
175         self.ldb_owner.create_ou("OU=test_add_ou2,OU=test_add_ou1," + self.base_dn)
176         user_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.usr_admin_not_owner))
177         mod = "(D;CI;WPCC;;;%s)" % str(user_sid)
178         self.sd_utils.dacl_add_ace("OU=test_add_ou1," + self.base_dn, mod)
179         # Test user and group creation with another domain admin's credentials
180         self.ldb_notowner.newuser(self.test_user1, self.user_pass, userou=self.ou2)
181         self.ldb_notowner.newgroup("test_add_group1", groupou="OU=test_add_ou2,OU=test_add_ou1",
182                                    grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP)
183         # Make sure we HAVE created the two objects -- user and group
184         # !!! We should not be able to do that, but however beacuse of ACE ordering our inherited Deny ACE
185         # !!! comes after explicit (A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA) that comes from somewhere
186         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s,%s)" % ("CN=test_add_user1,OU=test_add_ou2,OU=test_add_ou1", self.base_dn))
187         self.assertTrue(len(res) > 0)
188         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s,%s)" % ("CN=test_add_group1,OU=test_add_ou2,OU=test_add_ou1", self.base_dn))
189         self.assertTrue(len(res) > 0)
190
191     def test_add_u2(self):
192         """Testing OU with the regular user that has no rights granted over the OU """
193         self.assert_top_ou_deleted()
194         # Create a parent-child OU structure with domain admin credentials
195         self.ldb_owner.create_ou("OU=test_add_ou1," + self.base_dn)
196         self.ldb_owner.create_ou("OU=test_add_ou2,OU=test_add_ou1," + self.base_dn)
197         # Test user and group creation with regular user credentials
198         try:
199             self.ldb_user.newuser(self.test_user1, self.user_pass, userou=self.ou2)
200             self.ldb_user.newgroup("test_add_group1", groupou="OU=test_add_ou2,OU=test_add_ou1",
201                                    grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP)
202         except LdbError, (num, _):
203             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
204         else:
205             self.fail()
206         # Make sure we HAVEN'T created any of two objects -- user or group
207         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s,%s)" % ("CN=test_add_user1,OU=test_add_ou2,OU=test_add_ou1", self.base_dn))
208         self.assertEqual(len(res), 0)
209         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s,%s)" % ("CN=test_add_group1,OU=test_add_ou2,OU=test_add_ou1", self.base_dn))
210         self.assertEqual(len(res), 0)
211
212     def test_add_u3(self):
213         """Testing OU with the rights of regular user granted the right 'Create User child objects' """
214         self.assert_top_ou_deleted()
215         # Change descriptor for top level OU
216         self.ldb_owner.create_ou("OU=test_add_ou1," + self.base_dn)
217         user_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.regular_user))
218         mod = "(OA;CI;CC;bf967aba-0de6-11d0-a285-00aa003049e2;;%s)" % str(user_sid)
219         self.sd_utils.dacl_add_ace("OU=test_add_ou1," + self.base_dn, mod)
220         self.ldb_owner.create_ou("OU=test_add_ou2,OU=test_add_ou1," + self.base_dn)
221         # Test user and group creation with granted user only to one of the objects
222         self.ldb_user.newuser(self.test_user1, self.user_pass, userou=self.ou2, setpassword=False)
223         try:
224             self.ldb_user.newgroup("test_add_group1", groupou="OU=test_add_ou2,OU=test_add_ou1",
225                                    grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP)
226         except LdbError, (num, _):
227             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
228         else:
229             self.fail()
230         # Make sure we HAVE created the one of two objects -- user
231         res = self.ldb_admin.search(self.base_dn,
232                 expression="(distinguishedName=%s,%s)" %
233                 ("CN=test_add_user1,OU=test_add_ou2,OU=test_add_ou1",
234                     self.base_dn))
235         self.assertNotEqual(len(res), 0)
236         res = self.ldb_admin.search(self.base_dn,
237                 expression="(distinguishedName=%s,%s)" %
238                 ("CN=test_add_group1,OU=test_add_ou2,OU=test_add_ou1",
239                     self.base_dn) )
240         self.assertEqual(len(res), 0)
241
242     def test_add_u4(self):
243         """ 4 Testing OU with the rights of Doman Admin creator of the OU"""
244         self.assert_top_ou_deleted()
245         self.ldb_owner.create_ou("OU=test_add_ou1," + self.base_dn)
246         self.ldb_owner.create_ou("OU=test_add_ou2,OU=test_add_ou1," + self.base_dn)
247         self.ldb_owner.newuser(self.test_user1, self.user_pass, userou=self.ou2)
248         self.ldb_owner.newgroup("test_add_group1", groupou="OU=test_add_ou2,OU=test_add_ou1",
249                                  grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP)
250         # Make sure we have successfully created the two objects -- user and group
251         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s,%s)" % ("CN=test_add_user1,OU=test_add_ou2,OU=test_add_ou1", self.base_dn))
252         self.assertTrue(len(res) > 0)
253         res = self.ldb_admin.search(self.base_dn,
254                 expression="(distinguishedName=%s,%s)" % ("CN=test_add_group1,OU=test_add_ou2,OU=test_add_ou1", self.base_dn))
255         self.assertTrue(len(res) > 0)
256
257     def test_add_anonymous(self):
258         """Test add operation with anonymous user"""
259         anonymous = SamDB(url=ldaphost, credentials=self.creds_tmp, lp=lp)
260         try:
261             anonymous.newuser("test_add_anonymous", self.user_pass)
262         except LdbError, (num, _):
263             self.assertEquals(num, ERR_OPERATIONS_ERROR)
264         else:
265             self.fail()
266
267 #tests on ldap modify operations
268 class AclModifyTests(AclTests):
269
270     def setUp(self):
271         super(AclModifyTests, self).setUp()
272         self.user_with_wp = "acl_mod_user1"
273         self.user_with_sm = "acl_mod_user2"
274         self.user_with_group_sm = "acl_mod_user3"
275         self.ldb_admin.newuser(self.user_with_wp, self.user_pass)
276         self.ldb_admin.newuser(self.user_with_sm, self.user_pass)
277         self.ldb_admin.newuser(self.user_with_group_sm, self.user_pass)
278         self.ldb_user = self.get_ldb_connection(self.user_with_wp, self.user_pass)
279         self.ldb_user2 = self.get_ldb_connection(self.user_with_sm, self.user_pass)
280         self.ldb_user3 = self.get_ldb_connection(self.user_with_group_sm, self.user_pass)
281         self.user_sid = self.sd_utils.get_object_sid( self.get_user_dn(self.user_with_wp))
282         self.ldb_admin.newgroup("test_modify_group2", grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP)
283         self.ldb_admin.newgroup("test_modify_group3", grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP)
284         self.ldb_admin.newuser("test_modify_user2", self.user_pass)
285
286     def tearDown(self):
287         super(AclModifyTests, self).tearDown()
288         delete_force(self.ldb_admin, self.get_user_dn("test_modify_user1"))
289         delete_force(self.ldb_admin, "CN=test_modify_group1,CN=Users," + self.base_dn)
290         delete_force(self.ldb_admin, "CN=test_modify_group2,CN=Users," + self.base_dn)
291         delete_force(self.ldb_admin, "CN=test_modify_group3,CN=Users," + self.base_dn)
292         delete_force(self.ldb_admin, "OU=test_modify_ou1," + self.base_dn)
293         delete_force(self.ldb_admin, self.get_user_dn(self.user_with_wp))
294         delete_force(self.ldb_admin, self.get_user_dn(self.user_with_sm))
295         delete_force(self.ldb_admin, self.get_user_dn(self.user_with_group_sm))
296         delete_force(self.ldb_admin, self.get_user_dn("test_modify_user2"))
297         delete_force(self.ldb_admin, self.get_user_dn("test_anonymous"))
298
299         del self.ldb_user
300         del self.ldb_user2
301         del self.ldb_user3
302
303     def test_modify_u1(self):
304         """5 Modify one attribute if you have DS_WRITE_PROPERTY for it"""
305         mod = "(OA;;WP;bf967953-0de6-11d0-a285-00aa003049e2;;%s)" % str(self.user_sid)
306         # First test object -- User
307         print "Testing modify on User object"
308         self.ldb_admin.newuser("test_modify_user1", self.user_pass)
309         self.sd_utils.dacl_add_ace(self.get_user_dn("test_modify_user1"), mod)
310         ldif = """
311 dn: """ + self.get_user_dn("test_modify_user1") + """
312 changetype: modify
313 replace: displayName
314 displayName: test_changed"""
315         self.ldb_user.modify_ldif(ldif)
316         res = self.ldb_admin.search(self.base_dn,
317                 expression="(distinguishedName=%s)" % self.get_user_dn("test_modify_user1"))
318         self.assertEqual(res[0]["displayName"][0], "test_changed")
319         # Second test object -- Group
320         print "Testing modify on Group object"
321         self.ldb_admin.newgroup("test_modify_group1",
322                                 grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP)
323         self.sd_utils.dacl_add_ace("CN=test_modify_group1,CN=Users," + self.base_dn, mod)
324         ldif = """
325 dn: CN=test_modify_group1,CN=Users,""" + self.base_dn + """
326 changetype: modify
327 replace: displayName
328 displayName: test_changed"""
329         self.ldb_user.modify_ldif(ldif)
330         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s)" % str("CN=test_modify_group1,CN=Users," + self.base_dn))
331         self.assertEqual(res[0]["displayName"][0], "test_changed")
332         # Third test object -- Organizational Unit
333         print "Testing modify on OU object"
334         #delete_force(self.ldb_admin, "OU=test_modify_ou1," + self.base_dn)
335         self.ldb_admin.create_ou("OU=test_modify_ou1," + self.base_dn)
336         self.sd_utils.dacl_add_ace("OU=test_modify_ou1," + self.base_dn, mod)
337         ldif = """
338 dn: OU=test_modify_ou1,""" + self.base_dn + """
339 changetype: modify
340 replace: displayName
341 displayName: test_changed"""
342         self.ldb_user.modify_ldif(ldif)
343         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s)" % str("OU=test_modify_ou1," + self.base_dn))
344         self.assertEqual(res[0]["displayName"][0], "test_changed")
345
346     def test_modify_u2(self):
347         """6 Modify two attributes as you have DS_WRITE_PROPERTY granted only for one of them"""
348         mod = "(OA;;WP;bf967953-0de6-11d0-a285-00aa003049e2;;%s)" % str(self.user_sid)
349         # First test object -- User
350         print "Testing modify on User object"
351         #delete_force(self.ldb_admin, self.get_user_dn("test_modify_user1"))
352         self.ldb_admin.newuser("test_modify_user1", self.user_pass)
353         self.sd_utils.dacl_add_ace(self.get_user_dn("test_modify_user1"), mod)
354         # Modify on attribute you have rights for
355         ldif = """
356 dn: """ + self.get_user_dn("test_modify_user1") + """
357 changetype: modify
358 replace: displayName
359 displayName: test_changed"""
360         self.ldb_user.modify_ldif(ldif)
361         res = self.ldb_admin.search(self.base_dn,
362                 expression="(distinguishedName=%s)" %
363                 self.get_user_dn("test_modify_user1"))
364         self.assertEqual(res[0]["displayName"][0], "test_changed")
365         # Modify on attribute you do not have rights for granted
366         ldif = """
367 dn: """ + self.get_user_dn("test_modify_user1") + """
368 changetype: modify
369 replace: url
370 url: www.samba.org"""
371         try:
372             self.ldb_user.modify_ldif(ldif)
373         except LdbError, (num, _):
374             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
375         else:
376             # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
377             self.fail()
378         # Second test object -- Group
379         print "Testing modify on Group object"
380         self.ldb_admin.newgroup("test_modify_group1",
381                                 grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP)
382         self.sd_utils.dacl_add_ace("CN=test_modify_group1,CN=Users," + self.base_dn, mod)
383         ldif = """
384 dn: CN=test_modify_group1,CN=Users,""" + self.base_dn + """
385 changetype: modify
386 replace: displayName
387 displayName: test_changed"""
388         self.ldb_user.modify_ldif(ldif)
389         res = self.ldb_admin.search(self.base_dn,
390                 expression="(distinguishedName=%s)" %
391                 str("CN=test_modify_group1,CN=Users," + self.base_dn))
392         self.assertEqual(res[0]["displayName"][0], "test_changed")
393         # Modify on attribute you do not have rights for granted
394         ldif = """
395 dn: CN=test_modify_group1,CN=Users,""" + self.base_dn + """
396 changetype: modify
397 replace: url
398 url: www.samba.org"""
399         try:
400             self.ldb_user.modify_ldif(ldif)
401         except LdbError, (num, _):
402             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
403         else:
404             # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
405             self.fail()
406         # Modify on attribute you do not have rights for granted while also modifying something you do have rights for
407         ldif = """
408 dn: CN=test_modify_group1,CN=Users,""" + self.base_dn + """
409 changetype: modify
410 replace: url
411 url: www.samba.org
412 replace: displayName
413 displayName: test_changed"""
414         try:
415             self.ldb_user.modify_ldif(ldif)
416         except LdbError, (num, _):
417             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
418         else:
419             # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
420             self.fail()
421         # Second test object -- Organizational Unit
422         print "Testing modify on OU object"
423         self.ldb_admin.create_ou("OU=test_modify_ou1," + self.base_dn)
424         self.sd_utils.dacl_add_ace("OU=test_modify_ou1," + self.base_dn, mod)
425         ldif = """
426 dn: OU=test_modify_ou1,""" + self.base_dn + """
427 changetype: modify
428 replace: displayName
429 displayName: test_changed"""
430         self.ldb_user.modify_ldif(ldif)
431         res = self.ldb_admin.search(self.base_dn,
432                 expression="(distinguishedName=%s)" % str("OU=test_modify_ou1,"
433                     + self.base_dn))
434         self.assertEqual(res[0]["displayName"][0], "test_changed")
435         # Modify on attribute you do not have rights for granted
436         ldif = """
437 dn: OU=test_modify_ou1,""" + self.base_dn + """
438 changetype: modify
439 replace: url
440 url: www.samba.org"""
441         try:
442             self.ldb_user.modify_ldif(ldif)
443         except LdbError, (num, _):
444             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
445         else:
446             # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
447             self.fail()
448
449     def test_modify_u3(self):
450         """7 Modify one attribute as you have no what so ever rights granted"""
451         # First test object -- User
452         print "Testing modify on User object"
453         self.ldb_admin.newuser("test_modify_user1", self.user_pass)
454         # Modify on attribute you do not have rights for granted
455         ldif = """
456 dn: """ + self.get_user_dn("test_modify_user1") + """
457 changetype: modify
458 replace: url
459 url: www.samba.org"""
460         try:
461             self.ldb_user.modify_ldif(ldif)
462         except LdbError, (num, _):
463             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
464         else:
465             # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
466             self.fail()
467
468         # Second test object -- Group
469         print "Testing modify on Group object"
470         self.ldb_admin.newgroup("test_modify_group1",
471                                 grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP)
472         # Modify on attribute you do not have rights for granted
473         ldif = """
474 dn: CN=test_modify_group1,CN=Users,""" + self.base_dn + """
475 changetype: modify
476 replace: url
477 url: www.samba.org"""
478         try:
479             self.ldb_user.modify_ldif(ldif)
480         except LdbError, (num, _):
481             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
482         else:
483             # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
484             self.fail()
485
486         # Second test object -- Organizational Unit
487         print "Testing modify on OU object"
488         #delete_force(self.ldb_admin, "OU=test_modify_ou1," + self.base_dn)
489         self.ldb_admin.create_ou("OU=test_modify_ou1," + self.base_dn)
490         # Modify on attribute you do not have rights for granted
491         ldif = """
492 dn: OU=test_modify_ou1,""" + self.base_dn + """
493 changetype: modify
494 replace: url
495 url: www.samba.org"""
496         try:
497             self.ldb_user.modify_ldif(ldif)
498         except LdbError, (num, _):
499             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
500         else:
501             # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
502             self.fail()
503
504
505     def test_modify_u4(self):
506         """11 Grant WP to PRINCIPAL_SELF and test modify"""
507         ldif = """
508 dn: """ + self.get_user_dn(self.user_with_wp) + """
509 changetype: modify
510 add: adminDescription
511 adminDescription: blah blah blah"""
512         try:
513             self.ldb_user.modify_ldif(ldif)
514         except LdbError, (num, _):
515             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
516         else:
517             # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
518             self.fail()
519
520         mod = "(OA;;WP;bf967919-0de6-11d0-a285-00aa003049e2;;PS)"
521         self.sd_utils.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
522         # Modify on attribute you have rights for
523         self.ldb_user.modify_ldif(ldif)
524         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s)" \
525                                     % self.get_user_dn(self.user_with_wp), attrs=["adminDescription"] )
526         self.assertEqual(res[0]["adminDescription"][0], "blah blah blah")
527
528     def test_modify_u5(self):
529         """12 test self membership"""
530         ldif = """
531 dn: CN=test_modify_group2,CN=Users,""" + self.base_dn + """
532 changetype: modify
533 add: Member
534 Member: """ +  self.get_user_dn(self.user_with_sm)
535 #the user has no rights granted, this should fail
536         try:
537             self.ldb_user2.modify_ldif(ldif)
538         except LdbError, (num, _):
539             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
540         else:
541             # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
542             self.fail()
543
544 #grant self-membership, should be able to add himself
545         user_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.user_with_sm))
546         mod = "(OA;;SW;bf9679c0-0de6-11d0-a285-00aa003049e2;;%s)" % str(user_sid)
547         self.sd_utils.dacl_add_ace("CN=test_modify_group2,CN=Users," + self.base_dn, mod)
548         self.ldb_user2.modify_ldif(ldif)
549         res = self.ldb_admin.search( self.base_dn, expression="(distinguishedName=%s)" \
550                                     % ("CN=test_modify_group2,CN=Users," + self.base_dn), attrs=["Member"])
551         self.assertEqual(res[0]["Member"][0], self.get_user_dn(self.user_with_sm))
552 #but not other users
553         ldif = """
554 dn: CN=test_modify_group2,CN=Users,""" + self.base_dn + """
555 changetype: modify
556 add: Member
557 Member: CN=test_modify_user2,CN=Users,""" + self.base_dn
558         try:
559             self.ldb_user2.modify_ldif(ldif)
560         except LdbError, (num, _):
561             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
562         else:
563             self.fail()
564
565     def test_modify_u6(self):
566         """13 test self membership"""
567         ldif = """
568 dn: CN=test_modify_group2,CN=Users,""" + self.base_dn + """
569 changetype: modify
570 add: Member
571 Member: """ +  self.get_user_dn(self.user_with_sm) + """
572 Member: CN=test_modify_user2,CN=Users,""" + self.base_dn
573
574 #grant self-membership, should be able to add himself  but not others at the same time
575         user_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.user_with_sm))
576         mod = "(OA;;SW;bf9679c0-0de6-11d0-a285-00aa003049e2;;%s)" % str(user_sid)
577         self.sd_utils.dacl_add_ace("CN=test_modify_group2,CN=Users," + self.base_dn, mod)
578         try:
579             self.ldb_user2.modify_ldif(ldif)
580         except LdbError, (num, _):
581             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
582         else:
583             self.fail()
584
585     def test_modify_u7(self):
586         """13 User with WP modifying Member"""
587 #a second user is given write property permission
588         user_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.user_with_wp))
589         mod = "(A;;WP;;;%s)" % str(user_sid)
590         self.sd_utils.dacl_add_ace("CN=test_modify_group2,CN=Users," + self.base_dn, mod)
591         ldif = """
592 dn: CN=test_modify_group2,CN=Users,""" + self.base_dn + """
593 changetype: modify
594 add: Member
595 Member: """ +  self.get_user_dn(self.user_with_wp)
596         self.ldb_user.modify_ldif(ldif)
597         res = self.ldb_admin.search( self.base_dn, expression="(distinguishedName=%s)" \
598                                     % ("CN=test_modify_group2,CN=Users," + self.base_dn), attrs=["Member"])
599         self.assertEqual(res[0]["Member"][0], self.get_user_dn(self.user_with_wp))
600         ldif = """
601 dn: CN=test_modify_group2,CN=Users,""" + self.base_dn + """
602 changetype: modify
603 delete: Member"""
604         self.ldb_user.modify_ldif(ldif)
605         ldif = """
606 dn: CN=test_modify_group2,CN=Users,""" + self.base_dn + """
607 changetype: modify
608 add: Member
609 Member: CN=test_modify_user2,CN=Users,""" + self.base_dn
610         self.ldb_user.modify_ldif(ldif)
611         res = self.ldb_admin.search( self.base_dn, expression="(distinguishedName=%s)" \
612                                     % ("CN=test_modify_group2,CN=Users," + self.base_dn), attrs=["Member"])
613         self.assertEqual(res[0]["Member"][0], "CN=test_modify_user2,CN=Users," + self.base_dn)
614
615     def test_modify_anonymous(self):
616         """Test add operation with anonymous user"""
617         anonymous = SamDB(url=ldaphost, credentials=self.creds_tmp, lp=lp)
618         self.ldb_admin.newuser("test_anonymous", "samba123@")
619         m = Message()
620         m.dn = Dn(anonymous, self.get_user_dn("test_anonymous"))
621
622         m["description"] = MessageElement("sambauser2",
623                                           FLAG_MOD_ADD,
624                                           "description")
625         try:
626             anonymous.modify(m)
627         except LdbError, (num, _):
628             self.assertEquals(num, ERR_OPERATIONS_ERROR)
629         else:
630             self.fail()
631
632 #enable these when we have search implemented
633 class AclSearchTests(AclTests):
634
635     def setUp(self):
636         super(AclSearchTests, self).setUp()
637         # Get the old "dSHeuristics" if it was set
638         dsheuristics = self.ldb_admin.get_dsheuristics()
639         # Reset the "dSHeuristics" as they were before
640         self.addCleanup(self.ldb_admin.set_dsheuristics, dsheuristics)
641         # Set the "dSHeuristics" to activate the correct "userPassword" behaviour
642         self.ldb_admin.set_dsheuristics("000000001")
643         # Get the old "minPwdAge"
644         minPwdAge = self.ldb_admin.get_minPwdAge()
645         # Reset the "minPwdAge" as it was before
646         self.addCleanup(self.ldb_admin.set_minPwdAge, minPwdAge)
647         # Set it temporarely to "0"
648         self.ldb_admin.set_minPwdAge("0")
649
650         self.u1 = "search_u1"
651         self.u2 = "search_u2"
652         self.u3 = "search_u3"
653         self.group1 = "group1"
654         self.ldb_admin.newuser(self.u1, self.user_pass)
655         self.ldb_admin.newuser(self.u2, self.user_pass)
656         self.ldb_admin.newuser(self.u3, self.user_pass)
657         self.ldb_admin.newgroup(self.group1, grouptype=samba.dsdb.GTYPE_SECURITY_GLOBAL_GROUP)
658         self.ldb_admin.add_remove_group_members(self.group1, [self.u2],
659                                                 add_members_operation=True)
660         self.ldb_user = self.get_ldb_connection(self.u1, self.user_pass)
661         self.ldb_user2 = self.get_ldb_connection(self.u2, self.user_pass)
662         self.ldb_user3 = self.get_ldb_connection(self.u3, self.user_pass)
663         self.full_list = [Dn(self.ldb_admin,  "OU=ou2,OU=ou1," + self.base_dn),
664                           Dn(self.ldb_admin,  "OU=ou1," + self.base_dn),
665                           Dn(self.ldb_admin,  "OU=ou3,OU=ou2,OU=ou1," + self.base_dn),
666                           Dn(self.ldb_admin,  "OU=ou4,OU=ou2,OU=ou1," + self.base_dn),
667                           Dn(self.ldb_admin,  "OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn),
668                           Dn(self.ldb_admin,  "OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn)]
669         self.user_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.u1))
670         self.group_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.group1))
671
672     def create_clean_ou(self, object_dn):
673         """ Base repeating setup for unittests to follow """
674         res = self.ldb_admin.search(base=self.base_dn, scope=SCOPE_SUBTREE, \
675                 expression="distinguishedName=%s" % object_dn)
676         # Make sure top testing OU has been deleted before starting the test
677         self.assertEqual(len(res), 0)
678         self.ldb_admin.create_ou(object_dn)
679         desc_sddl = self.sd_utils.get_sd_as_sddl(object_dn)
680         # Make sure there are inheritable ACEs initially
681         self.assertTrue("CI" in desc_sddl or "OI" in desc_sddl)
682         # Find and remove all inherit ACEs
683         res = re.findall("\(.*?\)", desc_sddl)
684         res = [x for x in res if ("CI" in x) or ("OI" in x)]
685         for x in res:
686             desc_sddl = desc_sddl.replace(x, "")
687         # Add flag 'protected' in both DACL and SACL so no inherit ACEs
688         # can propagate from above
689         # remove SACL, we are not interested
690         desc_sddl = desc_sddl.replace(":AI", ":AIP")
691         self.sd_utils.modify_sd_on_dn(object_dn, desc_sddl)
692         # Verify all inheritable ACEs are gone
693         desc_sddl = self.sd_utils.get_sd_as_sddl(object_dn)
694         self.assertFalse("CI" in desc_sddl)
695         self.assertFalse("OI" in desc_sddl)
696
697     def tearDown(self):
698         super(AclSearchTests, self).tearDown()
699         delete_force(self.ldb_admin, "OU=test_search_ou2,OU=test_search_ou1," + self.base_dn)
700         delete_force(self.ldb_admin, "OU=test_search_ou1," + self.base_dn)
701         delete_force(self.ldb_admin, "OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn)
702         delete_force(self.ldb_admin, "OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn)
703         delete_force(self.ldb_admin, "OU=ou4,OU=ou2,OU=ou1," + self.base_dn)
704         delete_force(self.ldb_admin, "OU=ou3,OU=ou2,OU=ou1," + self.base_dn)
705         delete_force(self.ldb_admin, "OU=ou2,OU=ou1," + self.base_dn)
706         delete_force(self.ldb_admin, "OU=ou1," + self.base_dn)
707         delete_force(self.ldb_admin, self.get_user_dn("search_u1"))
708         delete_force(self.ldb_admin, self.get_user_dn("search_u2"))
709         delete_force(self.ldb_admin, self.get_user_dn("search_u3"))
710         delete_force(self.ldb_admin, self.get_user_dn("group1"))
711
712         del self.ldb_user
713         del self.ldb_user2
714         del self.ldb_user3
715
716     def test_search_anonymous1(self):
717         """Verify access of rootDSE with the correct request"""
718         anonymous = SamDB(url=ldaphost, credentials=self.creds_tmp, lp=lp)
719         res = anonymous.search("", expression="(objectClass=*)", scope=SCOPE_BASE)
720         self.assertEquals(len(res), 1)
721         #verify some of the attributes
722         #don't care about values
723         self.assertTrue("ldapServiceName" in res[0])
724         self.assertTrue("namingContexts" in res[0])
725         self.assertTrue("isSynchronized" in res[0])
726         self.assertTrue("dsServiceName" in res[0])
727         self.assertTrue("supportedSASLMechanisms" in res[0])
728         self.assertTrue("isGlobalCatalogReady" in res[0])
729         self.assertTrue("domainControllerFunctionality" in res[0])
730         self.assertTrue("serverName" in res[0])
731
732     def test_search_anonymous2(self):
733         """Make sure we cannot access anything else"""
734         anonymous = SamDB(url=ldaphost, credentials=self.creds_tmp, lp=lp)
735         try:
736             res = anonymous.search("", expression="(objectClass=*)", scope=SCOPE_SUBTREE)
737         except LdbError, (num, _):
738             self.assertEquals(num, ERR_OPERATIONS_ERROR)
739         else:
740             self.fail()
741         try:
742             res = anonymous.search(self.base_dn, expression="(objectClass=*)", scope=SCOPE_SUBTREE)
743         except LdbError, (num, _):
744             self.assertEquals(num, ERR_OPERATIONS_ERROR)
745         else:
746             self.fail()
747         try:
748             res = anonymous.search(anonymous.get_config_basedn(), expression="(objectClass=*)",
749                                         scope=SCOPE_SUBTREE)
750         except LdbError, (num, _):
751             self.assertEquals(num, ERR_OPERATIONS_ERROR)
752         else:
753             self.fail()
754
755     def test_search_anonymous3(self):
756         """Set dsHeuristics and repeat"""
757         self.ldb_admin.set_dsheuristics("0000002")
758         self.ldb_admin.create_ou("OU=test_search_ou1," + self.base_dn)
759         mod = "(A;CI;LC;;;AN)"
760         self.sd_utils.dacl_add_ace("OU=test_search_ou1," + self.base_dn, mod)
761         self.ldb_admin.create_ou("OU=test_search_ou2,OU=test_search_ou1," + self.base_dn)
762         anonymous = SamDB(url=ldaphost, credentials=self.creds_tmp, lp=lp)
763         res = anonymous.search("OU=test_search_ou2,OU=test_search_ou1," + self.base_dn,
764                                expression="(objectClass=*)", scope=SCOPE_SUBTREE)
765         self.assertEquals(len(res), 1)
766         self.assertTrue("dn" in res[0])
767         self.assertTrue(res[0]["dn"] == Dn(self.ldb_admin,
768                                            "OU=test_search_ou2,OU=test_search_ou1," + self.base_dn))
769         res = anonymous.search(anonymous.get_config_basedn(), expression="(objectClass=*)",
770                                scope=SCOPE_SUBTREE)
771         self.assertEquals(len(res), 1)
772         self.assertTrue("dn" in res[0])
773         self.assertTrue(res[0]["dn"] == Dn(self.ldb_admin, self.configuration_dn))
774
775     def test_search1(self):
776         """Make sure users can see us if given LC to user and group"""
777         self.create_clean_ou("OU=ou1," + self.base_dn)
778         mod = "(A;;LC;;;%s)(A;;LC;;;%s)" % (str(self.user_sid), str(self.group_sid))
779         self.sd_utils.dacl_add_ace("OU=ou1," + self.base_dn, mod)
780         tmp_desc = security.descriptor.from_sddl("D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" + mod,
781                                                  self.domain_sid)
782         self.ldb_admin.create_ou("OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
783         self.ldb_admin.create_ou("OU=ou3,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
784         self.ldb_admin.create_ou("OU=ou4,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
785         self.ldb_admin.create_ou("OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
786         self.ldb_admin.create_ou("OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
787
788         #regular users must see only ou1 and ou2
789         res = self.ldb_user3.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
790                                     scope=SCOPE_SUBTREE)
791         self.assertEquals(len(res), 2)
792         ok_list = [Dn(self.ldb_admin,  "OU=ou2,OU=ou1," + self.base_dn),
793                    Dn(self.ldb_admin,  "OU=ou1," + self.base_dn)]
794
795         res_list = [ x["dn"] for x in res if x["dn"] in ok_list ]
796         self.assertEquals(sorted(res_list), sorted(ok_list))
797
798         #these users should see all ous
799         res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
800                                     scope=SCOPE_SUBTREE)
801         self.assertEquals(len(res), 6)
802         res_list = [ x["dn"] for x in res if x["dn"] in self.full_list ]
803         self.assertEquals(sorted(res_list), sorted(self.full_list))
804
805         res = self.ldb_user2.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
806                                     scope=SCOPE_SUBTREE)
807         self.assertEquals(len(res), 6)
808         res_list = [ x["dn"] for x in res if x["dn"] in self.full_list ]
809         self.assertEquals(sorted(res_list), sorted(self.full_list))
810
811     def test_search2(self):
812         """Make sure users can't see us if access is explicitly denied"""
813         self.create_clean_ou("OU=ou1," + self.base_dn)
814         self.ldb_admin.create_ou("OU=ou2,OU=ou1," + self.base_dn)
815         self.ldb_admin.create_ou("OU=ou3,OU=ou2,OU=ou1," + self.base_dn)
816         self.ldb_admin.create_ou("OU=ou4,OU=ou2,OU=ou1," + self.base_dn)
817         self.ldb_admin.create_ou("OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn)
818         self.ldb_admin.create_ou("OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn)
819         mod = "(D;;LC;;;%s)(D;;LC;;;%s)" % (str(self.user_sid), str(self.group_sid)) 
820         self.sd_utils.dacl_add_ace("OU=ou2,OU=ou1," + self.base_dn, mod)
821         res = self.ldb_user3.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
822                                     scope=SCOPE_SUBTREE)
823         #this user should see all ous
824         res_list = [ x["dn"] for x in res if x["dn"] in self.full_list ]
825         self.assertEquals(sorted(res_list), sorted(self.full_list))
826
827         #these users should see ou1, 2, 5 and 6 but not 3 and 4
828         res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
829                                     scope=SCOPE_SUBTREE)
830         ok_list = [Dn(self.ldb_admin,  "OU=ou2,OU=ou1," + self.base_dn),
831                    Dn(self.ldb_admin,  "OU=ou1," + self.base_dn),
832                    Dn(self.ldb_admin,  "OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn),
833                    Dn(self.ldb_admin,  "OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn)]
834         res_list = [ x["dn"] for x in res if x["dn"] in ok_list ]
835         self.assertEquals(sorted(res_list), sorted(ok_list))
836
837         res = self.ldb_user2.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
838                                     scope=SCOPE_SUBTREE)
839         self.assertEquals(len(res), 4)
840         res_list = [ x["dn"] for x in res if x["dn"] in ok_list ]
841         self.assertEquals(sorted(res_list), sorted(ok_list))
842
843     def test_search3(self):
844         """Make sure users can't see ous if access is explicitly denied - 2"""
845         self.create_clean_ou("OU=ou1," + self.base_dn)
846         mod = "(A;CI;LC;;;%s)(A;CI;LC;;;%s)" % (str(self.user_sid), str(self.group_sid))
847         self.sd_utils.dacl_add_ace("OU=ou1," + self.base_dn, mod)
848         tmp_desc = security.descriptor.from_sddl("D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" + mod,
849                                                  self.domain_sid)
850         self.ldb_admin.create_ou("OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
851         self.ldb_admin.create_ou("OU=ou3,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
852         self.ldb_admin.create_ou("OU=ou4,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
853         self.ldb_admin.create_ou("OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
854         self.ldb_admin.create_ou("OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
855
856         print "Testing correct behavior on nonaccessible search base"
857         try:
858              self.ldb_user3.search("OU=ou3,OU=ou2,OU=ou1," + self.base_dn, expression="(objectClass=*)",
859                                    scope=SCOPE_BASE)
860         except LdbError, (num, _):
861             self.assertEquals(num, ERR_NO_SUCH_OBJECT)
862         else:
863             self.fail()
864
865         mod = "(D;;LC;;;%s)(D;;LC;;;%s)" % (str(self.user_sid), str(self.group_sid))
866         self.sd_utils.dacl_add_ace("OU=ou2,OU=ou1," + self.base_dn, mod)
867
868         ok_list = [Dn(self.ldb_admin,  "OU=ou2,OU=ou1," + self.base_dn),
869                    Dn(self.ldb_admin,  "OU=ou1," + self.base_dn)]
870
871         res = self.ldb_user3.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
872                                     scope=SCOPE_SUBTREE)
873         res_list = [ x["dn"] for x in res if x["dn"] in ok_list ]
874         self.assertEquals(sorted(res_list), sorted(ok_list))
875
876         ok_list = [Dn(self.ldb_admin,  "OU=ou2,OU=ou1," + self.base_dn),
877                    Dn(self.ldb_admin,  "OU=ou1," + self.base_dn),
878                    Dn(self.ldb_admin,  "OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn),
879                    Dn(self.ldb_admin,  "OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn)]
880
881         #should not see ou3 and ou4, but should see ou5 and ou6
882         res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
883                                     scope=SCOPE_SUBTREE)
884         self.assertEquals(len(res), 4)
885         res_list = [ x["dn"] for x in res if x["dn"] in ok_list ]
886         self.assertEquals(sorted(res_list), sorted(ok_list))
887
888         res = self.ldb_user2.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
889                                     scope=SCOPE_SUBTREE)
890         self.assertEquals(len(res), 4)
891         res_list = [ x["dn"] for x in res if x["dn"] in ok_list ]
892         self.assertEquals(sorted(res_list), sorted(ok_list))
893
894     def test_search4(self):
895         """There is no difference in visibility if the user is also creator"""
896         self.create_clean_ou("OU=ou1," + self.base_dn)
897         mod = "(A;CI;CC;;;%s)" % (str(self.user_sid))
898         self.sd_utils.dacl_add_ace("OU=ou1," + self.base_dn, mod)
899         tmp_desc = security.descriptor.from_sddl("D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" + mod,
900                                                  self.domain_sid)
901         self.ldb_user.create_ou("OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
902         self.ldb_user.create_ou("OU=ou3,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
903         self.ldb_user.create_ou("OU=ou4,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
904         self.ldb_user.create_ou("OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
905         self.ldb_user.create_ou("OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
906
907         ok_list = [Dn(self.ldb_admin,  "OU=ou2,OU=ou1," + self.base_dn),
908                    Dn(self.ldb_admin,  "OU=ou1," + self.base_dn)]
909         res = self.ldb_user3.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
910                                     scope=SCOPE_SUBTREE)
911         self.assertEquals(len(res), 2)
912         res_list = [ x["dn"] for x in res if x["dn"] in ok_list ]
913         self.assertEquals(sorted(res_list), sorted(ok_list))
914
915         res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
916                                     scope=SCOPE_SUBTREE)
917         self.assertEquals(len(res), 2)
918         res_list = [ x["dn"] for x in res if x["dn"] in ok_list ]
919         self.assertEquals(sorted(res_list), sorted(ok_list))
920
921     def test_search5(self):
922         """Make sure users can see only attributes they are allowed to see"""
923         self.create_clean_ou("OU=ou1," + self.base_dn)
924         mod = "(A;CI;LC;;;%s)" % (str(self.user_sid))
925         self.sd_utils.dacl_add_ace("OU=ou1," + self.base_dn, mod)
926         tmp_desc = security.descriptor.from_sddl("D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" + mod,
927                                                  self.domain_sid)
928         self.ldb_admin.create_ou("OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
929         # assert user can only see dn
930         res = self.ldb_user.search("OU=ou2,OU=ou1," + self.base_dn, expression="(objectClass=*)",
931                                     scope=SCOPE_SUBTREE)
932         ok_list = ['dn']
933         self.assertEquals(len(res), 1)
934         res_list = res[0].keys()
935         self.assertEquals(res_list, ok_list)
936
937         res = self.ldb_user.search("OU=ou2,OU=ou1," + self.base_dn, expression="(objectClass=*)",
938                                     scope=SCOPE_BASE, attrs=["ou"])
939
940         self.assertEquals(len(res), 1)
941         res_list = res[0].keys()
942         self.assertEquals(res_list, ok_list)
943
944         #give read property on ou and assert user can only see dn and ou
945         mod = "(OA;;RP;bf9679f0-0de6-11d0-a285-00aa003049e2;;%s)" % (str(self.user_sid))
946         self.sd_utils.dacl_add_ace("OU=ou1," + self.base_dn, mod)
947         self.sd_utils.dacl_add_ace("OU=ou2,OU=ou1," + self.base_dn, mod)
948         res = self.ldb_user.search("OU=ou2,OU=ou1," + self.base_dn, expression="(objectClass=*)",
949                                     scope=SCOPE_SUBTREE)
950         ok_list = ['dn', 'ou']
951         self.assertEquals(len(res), 1)
952         res_list = res[0].keys()
953         self.assertEquals(sorted(res_list), sorted(ok_list))
954
955         #give read property on Public Information and assert user can see ou and other members
956         mod = "(OA;;RP;e48d0154-bcf8-11d1-8702-00c04fb96050;;%s)" % (str(self.user_sid))
957         self.sd_utils.dacl_add_ace("OU=ou1," + self.base_dn, mod)
958         self.sd_utils.dacl_add_ace("OU=ou2,OU=ou1," + self.base_dn, mod)
959         res = self.ldb_user.search("OU=ou2,OU=ou1," + self.base_dn, expression="(objectClass=*)",
960                                     scope=SCOPE_SUBTREE)
961
962         ok_list = ['dn', 'objectClass', 'ou', 'distinguishedName', 'name', 'objectGUID', 'objectCategory']
963         res_list = res[0].keys()
964         self.assertEquals(sorted(res_list), sorted(ok_list))
965
966     def test_search6(self):
967         """If an attribute that cannot be read is used in a filter, it is as if the attribute does not exist"""
968         self.create_clean_ou("OU=ou1," + self.base_dn)
969         mod = "(A;CI;LCCC;;;%s)" % (str(self.user_sid))
970         self.sd_utils.dacl_add_ace("OU=ou1," + self.base_dn, mod)
971         tmp_desc = security.descriptor.from_sddl("D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" + mod,
972                                                  self.domain_sid)
973         self.ldb_admin.create_ou("OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
974         self.ldb_user.create_ou("OU=ou3,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
975
976         res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(ou=ou3)",
977                                     scope=SCOPE_SUBTREE)
978         #nothing should be returned as ou is not accessible
979         self.assertEquals(len(res), 0)
980
981         #give read property on ou and assert user can only see dn and ou
982         mod = "(OA;;RP;bf9679f0-0de6-11d0-a285-00aa003049e2;;%s)" % (str(self.user_sid))
983         self.sd_utils.dacl_add_ace("OU=ou3,OU=ou2,OU=ou1," + self.base_dn, mod)
984         res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(ou=ou3)",
985                                     scope=SCOPE_SUBTREE)
986         self.assertEquals(len(res), 1)
987         ok_list = ['dn', 'ou']
988         res_list = res[0].keys()
989         self.assertEquals(sorted(res_list), sorted(ok_list))
990
991         #give read property on Public Information and assert user can see ou and other members
992         mod = "(OA;;RP;e48d0154-bcf8-11d1-8702-00c04fb96050;;%s)" % (str(self.user_sid))
993         self.sd_utils.dacl_add_ace("OU=ou2,OU=ou1," + self.base_dn, mod)
994         res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(ou=ou2)",
995                                    scope=SCOPE_SUBTREE)
996         self.assertEquals(len(res), 1)
997         ok_list = ['dn', 'objectClass', 'ou', 'distinguishedName', 'name', 'objectGUID', 'objectCategory']
998         res_list = res[0].keys()
999         self.assertEquals(sorted(res_list), sorted(ok_list))
1000
1001 #tests on ldap delete operations
1002 class AclDeleteTests(AclTests):
1003
1004     def setUp(self):
1005         super(AclDeleteTests, self).setUp()
1006         self.regular_user = "acl_delete_user1"
1007         # Create regular user
1008         self.ldb_admin.newuser(self.regular_user, self.user_pass)
1009         self.ldb_user = self.get_ldb_connection(self.regular_user, self.user_pass)
1010
1011     def tearDown(self):
1012         super(AclDeleteTests, self).tearDown()
1013         delete_force(self.ldb_admin, self.get_user_dn("test_delete_user1"))
1014         delete_force(self.ldb_admin, self.get_user_dn(self.regular_user))
1015         delete_force(self.ldb_admin, self.get_user_dn("test_anonymous"))
1016
1017         del self.ldb_user
1018
1019     def test_delete_u1(self):
1020         """User is prohibited by default to delete another User object"""
1021         # Create user that we try to delete
1022         self.ldb_admin.newuser("test_delete_user1", self.user_pass)
1023         # Here delete User object should ALWAYS through exception
1024         try:
1025             self.ldb_user.delete(self.get_user_dn("test_delete_user1"))
1026         except LdbError, (num, _):
1027             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1028         else:
1029             self.fail()
1030
1031     def test_delete_u2(self):
1032         """User's group has RIGHT_DELETE to another User object"""
1033         user_dn = self.get_user_dn("test_delete_user1")
1034         # Create user that we try to delete
1035         self.ldb_admin.newuser("test_delete_user1", self.user_pass)
1036         mod = "(A;;SD;;;AU)"
1037         self.sd_utils.dacl_add_ace(user_dn, mod)
1038         # Try to delete User object
1039         self.ldb_user.delete(user_dn)
1040         res = self.ldb_admin.search(self.base_dn,
1041                 expression="(distinguishedName=%s)" % user_dn)
1042         self.assertEqual(len(res), 0)
1043
1044     def test_delete_u3(self):
1045         """User indentified by SID has RIGHT_DELETE to another User object"""
1046         user_dn = self.get_user_dn("test_delete_user1")
1047         # Create user that we try to delete
1048         self.ldb_admin.newuser("test_delete_user1", self.user_pass)
1049         mod = "(A;;SD;;;%s)" % self.sd_utils.get_object_sid(self.get_user_dn(self.regular_user))
1050         self.sd_utils.dacl_add_ace(user_dn, mod)
1051         # Try to delete User object
1052         self.ldb_user.delete(user_dn)
1053         res = self.ldb_admin.search(self.base_dn,
1054                 expression="(distinguishedName=%s)" % user_dn)
1055         self.assertEqual(len(res), 0)
1056
1057     def test_delete_anonymous(self):
1058         """Test add operation with anonymous user"""
1059         anonymous = SamDB(url=ldaphost, credentials=self.creds_tmp, lp=lp)
1060         self.ldb_admin.newuser("test_anonymous", "samba123@")
1061
1062         try:
1063             anonymous.delete(self.get_user_dn("test_anonymous"))
1064         except LdbError, (num, _):
1065             self.assertEquals(num, ERR_OPERATIONS_ERROR)
1066         else:
1067             self.fail()
1068
1069 #tests on ldap rename operations
1070 class AclRenameTests(AclTests):
1071
1072     def setUp(self):
1073         super(AclRenameTests, self).setUp()
1074         self.regular_user = "acl_rename_user1"
1075         self.ou1 = "OU=test_rename_ou1"
1076         self.ou2 = "OU=test_rename_ou2"
1077         self.ou3 = "OU=test_rename_ou3,%s" % self.ou2
1078         self.testuser1 = "test_rename_user1"
1079         self.testuser2 = "test_rename_user2"
1080         self.testuser3 = "test_rename_user3"
1081         self.testuser4 = "test_rename_user4"
1082         self.testuser5 = "test_rename_user5"
1083         # Create regular user
1084         self.ldb_admin.newuser(self.regular_user, self.user_pass)
1085         self.ldb_user = self.get_ldb_connection(self.regular_user, self.user_pass)
1086
1087     def tearDown(self):
1088         super(AclRenameTests, self).tearDown()
1089         # Rename OU3
1090         delete_force(self.ldb_admin, "CN=%s,%s,%s" % (self.testuser1, self.ou3, self.base_dn))
1091         delete_force(self.ldb_admin, "CN=%s,%s,%s" % (self.testuser2, self.ou3, self.base_dn))
1092         delete_force(self.ldb_admin, "CN=%s,%s,%s" % (self.testuser5, self.ou3, self.base_dn))
1093         delete_force(self.ldb_admin, "%s,%s" % (self.ou3, self.base_dn))
1094         # Rename OU2
1095         delete_force(self.ldb_admin, "CN=%s,%s,%s" % (self.testuser1, self.ou2, self.base_dn))
1096         delete_force(self.ldb_admin, "CN=%s,%s,%s" % (self.testuser2, self.ou2, self.base_dn))
1097         delete_force(self.ldb_admin, "CN=%s,%s,%s" % (self.testuser5, self.ou2, self.base_dn))
1098         delete_force(self.ldb_admin, "%s,%s" % (self.ou2, self.base_dn))
1099         # Rename OU1
1100         delete_force(self.ldb_admin, "CN=%s,%s,%s" % (self.testuser1, self.ou1, self.base_dn))
1101         delete_force(self.ldb_admin, "CN=%s,%s,%s" % (self.testuser2, self.ou1, self.base_dn))
1102         delete_force(self.ldb_admin, "CN=%s,%s,%s" % (self.testuser5, self.ou1, self.base_dn))
1103         delete_force(self.ldb_admin, "OU=test_rename_ou3,%s,%s" % (self.ou1, self.base_dn))
1104         delete_force(self.ldb_admin, "%s,%s" % (self.ou1, self.base_dn))
1105         delete_force(self.ldb_admin, self.get_user_dn(self.regular_user))
1106
1107         del self.ldb_user
1108
1109     def test_rename_u1(self):
1110         """Regular user fails to rename 'User object' within single OU"""
1111         # Create OU structure
1112         self.ldb_admin.create_ou("OU=test_rename_ou1," + self.base_dn)
1113         self.ldb_admin.newuser(self.testuser1, self.user_pass, userou=self.ou1)
1114         try:
1115             self.ldb_user.rename("CN=%s,%s,%s" % (self.testuser1, self.ou1, self.base_dn), \
1116                                      "CN=%s,%s,%s" % (self.testuser5, self.ou1, self.base_dn))
1117         except LdbError, (num, _):
1118             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1119         else:
1120             self.fail()
1121
1122     def test_rename_u2(self):
1123         """Grant WRITE_PROPERTY to AU so regular user can rename 'User object' within single OU"""
1124         ou_dn = "OU=test_rename_ou1," + self.base_dn
1125         user_dn = "CN=test_rename_user1," + ou_dn
1126         rename_user_dn = "CN=test_rename_user5," + ou_dn
1127         # Create OU structure
1128         self.ldb_admin.create_ou(ou_dn)
1129         self.ldb_admin.newuser(self.testuser1, self.user_pass, userou=self.ou1)
1130         mod = "(A;;WP;;;AU)"
1131         self.sd_utils.dacl_add_ace(user_dn, mod)
1132         # Rename 'User object' having WP to AU
1133         self.ldb_user.rename(user_dn, rename_user_dn)
1134         res = self.ldb_admin.search(self.base_dn,
1135                 expression="(distinguishedName=%s)" % user_dn)
1136         self.assertEqual(len(res), 0)
1137         res = self.ldb_admin.search(self.base_dn,
1138                 expression="(distinguishedName=%s)" % rename_user_dn)
1139         self.assertNotEqual(len(res), 0)
1140
1141     def test_rename_u3(self):
1142         """Test rename with rights granted to 'User object' SID"""
1143         ou_dn = "OU=test_rename_ou1," + self.base_dn
1144         user_dn = "CN=test_rename_user1," + ou_dn
1145         rename_user_dn = "CN=test_rename_user5," + ou_dn
1146         # Create OU structure
1147         self.ldb_admin.create_ou(ou_dn)
1148         self.ldb_admin.newuser(self.testuser1, self.user_pass, userou=self.ou1)
1149         sid = self.sd_utils.get_object_sid(self.get_user_dn(self.regular_user))
1150         mod = "(A;;WP;;;%s)" % str(sid)
1151         self.sd_utils.dacl_add_ace(user_dn, mod)
1152         # Rename 'User object' having WP to AU
1153         self.ldb_user.rename(user_dn, rename_user_dn)
1154         res = self.ldb_admin.search(self.base_dn,
1155                 expression="(distinguishedName=%s)" % user_dn)
1156         self.assertEqual(len(res), 0)
1157         res = self.ldb_admin.search(self.base_dn,
1158                 expression="(distinguishedName=%s)" % rename_user_dn)
1159         self.assertNotEqual(len(res), 0)
1160
1161     def test_rename_u4(self):
1162         """Rename 'User object' cross OU with WP, SD and CC right granted on reg. user to AU"""
1163         ou1_dn = "OU=test_rename_ou1," + self.base_dn
1164         ou2_dn = "OU=test_rename_ou2," + self.base_dn
1165         user_dn = "CN=test_rename_user2," + ou1_dn
1166         rename_user_dn = "CN=test_rename_user5," + ou2_dn
1167         # Create OU structure
1168         self.ldb_admin.create_ou(ou1_dn)
1169         self.ldb_admin.create_ou(ou2_dn)
1170         self.ldb_admin.newuser(self.testuser2, self.user_pass, userou=self.ou1)
1171         mod = "(A;;WPSD;;;AU)"
1172         self.sd_utils.dacl_add_ace(user_dn, mod)
1173         mod = "(A;;CC;;;AU)"
1174         self.sd_utils.dacl_add_ace(ou2_dn, mod)
1175         # Rename 'User object' having SD and CC to AU
1176         self.ldb_user.rename(user_dn, rename_user_dn)
1177         res = self.ldb_admin.search(self.base_dn,
1178                 expression="(distinguishedName=%s)" % user_dn)
1179         self.assertEqual(len(res), 0)
1180         res = self.ldb_admin.search(self.base_dn,
1181                 expression="(distinguishedName=%s)" % rename_user_dn)
1182         self.assertNotEqual(len(res), 0)
1183
1184     def test_rename_u5(self):
1185         """Test rename with rights granted to 'User object' SID"""
1186         ou1_dn = "OU=test_rename_ou1," + self.base_dn
1187         ou2_dn = "OU=test_rename_ou2," + self.base_dn
1188         user_dn = "CN=test_rename_user2," + ou1_dn
1189         rename_user_dn = "CN=test_rename_user5," + ou2_dn
1190         # Create OU structure
1191         self.ldb_admin.create_ou(ou1_dn)
1192         self.ldb_admin.create_ou(ou2_dn)
1193         self.ldb_admin.newuser(self.testuser2, self.user_pass, userou=self.ou1)
1194         sid = self.sd_utils.get_object_sid(self.get_user_dn(self.regular_user))
1195         mod = "(A;;WPSD;;;%s)" % str(sid)
1196         self.sd_utils.dacl_add_ace(user_dn, mod)
1197         mod = "(A;;CC;;;%s)" % str(sid)
1198         self.sd_utils.dacl_add_ace(ou2_dn, mod)
1199         # Rename 'User object' having SD and CC to AU
1200         self.ldb_user.rename(user_dn, rename_user_dn)
1201         res = self.ldb_admin.search(self.base_dn,
1202                 expression="(distinguishedName=%s)" % user_dn)
1203         self.assertEqual(len(res), 0)
1204         res = self.ldb_admin.search(self.base_dn,
1205                 expression="(distinguishedName=%s)" % rename_user_dn)
1206         self.assertNotEqual(len(res), 0)
1207
1208     def test_rename_u6(self):
1209         """Rename 'User object' cross OU with WP, DC and CC right granted on OU & user to AU"""
1210         ou1_dn = "OU=test_rename_ou1," + self.base_dn
1211         ou2_dn = "OU=test_rename_ou2," + self.base_dn
1212         user_dn = "CN=test_rename_user2," + ou1_dn
1213         rename_user_dn = "CN=test_rename_user2," + ou2_dn
1214         # Create OU structure
1215         self.ldb_admin.create_ou(ou1_dn)
1216         self.ldb_admin.create_ou(ou2_dn)
1217         #mod = "(A;CI;DCWP;;;AU)"
1218         mod = "(A;;DC;;;AU)"
1219         self.sd_utils.dacl_add_ace(ou1_dn, mod)
1220         mod = "(A;;CC;;;AU)"
1221         self.sd_utils.dacl_add_ace(ou2_dn, mod)
1222         self.ldb_admin.newuser(self.testuser2, self.user_pass, userou=self.ou1)
1223         mod = "(A;;WP;;;AU)"
1224         self.sd_utils.dacl_add_ace(user_dn, mod)
1225         # Rename 'User object' having SD and CC to AU
1226         self.ldb_user.rename(user_dn, rename_user_dn)
1227         res = self.ldb_admin.search(self.base_dn,
1228                 expression="(distinguishedName=%s)" % user_dn)
1229         self.assertEqual(len(res), 0)
1230         res = self.ldb_admin.search(self.base_dn,
1231                 expression="(distinguishedName=%s)" % rename_user_dn)
1232         self.assertNotEqual(len(res), 0)
1233
1234     def test_rename_u7(self):
1235         """Rename 'User object' cross OU (second level) with WP, DC and CC right granted on OU to AU"""
1236         ou1_dn = "OU=test_rename_ou1," + self.base_dn
1237         ou2_dn = "OU=test_rename_ou2," + self.base_dn
1238         ou3_dn = "OU=test_rename_ou3," + ou2_dn
1239         user_dn = "CN=test_rename_user2," + ou1_dn
1240         rename_user_dn = "CN=test_rename_user5," + ou3_dn
1241         # Create OU structure
1242         self.ldb_admin.create_ou(ou1_dn)
1243         self.ldb_admin.create_ou(ou2_dn)
1244         self.ldb_admin.create_ou(ou3_dn)
1245         mod = "(A;CI;WPDC;;;AU)"
1246         self.sd_utils.dacl_add_ace(ou1_dn, mod)
1247         mod = "(A;;CC;;;AU)"
1248         self.sd_utils.dacl_add_ace(ou3_dn, mod)
1249         self.ldb_admin.newuser(self.testuser2, self.user_pass, userou=self.ou1)
1250         # Rename 'User object' having SD and CC to AU
1251         self.ldb_user.rename(user_dn, rename_user_dn)
1252         res = self.ldb_admin.search(self.base_dn,
1253                 expression="(distinguishedName=%s)" % user_dn)
1254         self.assertEqual(len(res), 0)
1255         res = self.ldb_admin.search(self.base_dn,
1256                 expression="(distinguishedName=%s)" % rename_user_dn)
1257         self.assertNotEqual(len(res), 0)
1258
1259     def test_rename_u8(self):
1260         """Test rename on an object with and without modify access on the RDN attribute"""
1261         ou1_dn = "OU=test_rename_ou1," + self.base_dn
1262         ou2_dn = "OU=test_rename_ou2," + ou1_dn
1263         ou3_dn = "OU=test_rename_ou3," + ou1_dn
1264         # Create OU structure
1265         self.ldb_admin.create_ou(ou1_dn)
1266         self.ldb_admin.create_ou(ou2_dn)
1267         sid = self.sd_utils.get_object_sid(self.get_user_dn(self.regular_user))
1268         mod = "(OA;;WP;bf967a0e-0de6-11d0-a285-00aa003049e2;;%s)" % str(sid)
1269         self.sd_utils.dacl_add_ace(ou2_dn, mod)
1270         mod = "(OD;;WP;bf9679f0-0de6-11d0-a285-00aa003049e2;;%s)" % str(sid)
1271         self.sd_utils.dacl_add_ace(ou2_dn, mod)
1272         try:
1273             self.ldb_user.rename(ou2_dn, ou3_dn)
1274         except LdbError, (num, _):
1275             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1276         else:
1277             # This rename operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
1278             self.fail()
1279         sid = self.sd_utils.get_object_sid(self.get_user_dn(self.regular_user))
1280         mod = "(A;;WP;bf9679f0-0de6-11d0-a285-00aa003049e2;;%s)" % str(sid)
1281         self.sd_utils.dacl_add_ace(ou2_dn, mod)
1282         self.ldb_user.rename(ou2_dn, ou3_dn)
1283         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s)" % ou2_dn)
1284         self.assertEqual(len(res), 0)
1285         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s)" % ou3_dn)
1286         self.assertNotEqual(len(res), 0)
1287
1288     def test_rename_u9(self):
1289         """Rename 'User object' cross OU, with explicit deny on sd and dc"""
1290         ou1_dn = "OU=test_rename_ou1," + self.base_dn
1291         ou2_dn = "OU=test_rename_ou2," + self.base_dn
1292         user_dn = "CN=test_rename_user2," + ou1_dn
1293         rename_user_dn = "CN=test_rename_user5," + ou2_dn
1294         # Create OU structure
1295         self.ldb_admin.create_ou(ou1_dn)
1296         self.ldb_admin.create_ou(ou2_dn)
1297         self.ldb_admin.newuser(self.testuser2, self.user_pass, userou=self.ou1)
1298         mod = "(D;;SD;;;DA)"
1299         self.sd_utils.dacl_add_ace(user_dn, mod)
1300         mod = "(D;;DC;;;DA)"
1301         self.sd_utils.dacl_add_ace(ou1_dn, mod)
1302         # Rename 'User object' having SD and CC to AU
1303         try:
1304             self.ldb_admin.rename(user_dn, rename_user_dn)
1305         except LdbError, (num, _):
1306             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1307         else:
1308             self.fail()
1309         #add an allow ace so we can delete this ou
1310         mod = "(A;;DC;;;DA)"
1311         self.sd_utils.dacl_add_ace(ou1_dn, mod)
1312
1313
1314 #tests on Control Access Rights
1315 class AclCARTests(AclTests):
1316
1317     def setUp(self):
1318         super(AclCARTests, self).setUp()
1319
1320         # Get the old "dSHeuristics" if it was set
1321         dsheuristics = self.ldb_admin.get_dsheuristics()
1322         # Reset the "dSHeuristics" as they were before
1323         self.addCleanup(self.ldb_admin.set_dsheuristics, dsheuristics)
1324         # Set the "dSHeuristics" to activate the correct "userPassword" behaviour
1325         self.ldb_admin.set_dsheuristics("000000001")
1326         # Get the old "minPwdAge"
1327         minPwdAge = self.ldb_admin.get_minPwdAge()
1328         # Reset the "minPwdAge" as it was before
1329         self.addCleanup(self.ldb_admin.set_minPwdAge, minPwdAge)
1330         # Set it temporarely to "0"
1331         self.ldb_admin.set_minPwdAge("0")
1332
1333         self.user_with_wp = "acl_car_user1"
1334         self.user_with_pc = "acl_car_user2"
1335         self.ldb_admin.newuser(self.user_with_wp, self.user_pass)
1336         self.ldb_admin.newuser(self.user_with_pc, self.user_pass)
1337         self.ldb_user = self.get_ldb_connection(self.user_with_wp, self.user_pass)
1338         self.ldb_user2 = self.get_ldb_connection(self.user_with_pc, self.user_pass)
1339
1340     def tearDown(self):
1341         super(AclCARTests, self).tearDown()
1342         delete_force(self.ldb_admin, self.get_user_dn(self.user_with_wp))
1343         delete_force(self.ldb_admin, self.get_user_dn(self.user_with_pc))
1344
1345         del self.ldb_user
1346         del self.ldb_user2
1347
1348     def test_change_password1(self):
1349         """Try a password change operation without any CARs given"""
1350         #users have change password by default - remove for negative testing
1351         desc = self.sd_utils.read_sd_on_dn(self.get_user_dn(self.user_with_wp))
1352         sddl = desc.as_sddl(self.domain_sid)
1353         sddl = sddl.replace("(OA;;CR;ab721a53-1e2f-11d0-9819-00aa0040529b;;WD)", "")
1354         sddl = sddl.replace("(OA;;CR;ab721a53-1e2f-11d0-9819-00aa0040529b;;PS)", "")
1355         self.sd_utils.modify_sd_on_dn(self.get_user_dn(self.user_with_wp), sddl)
1356         try:
1357             self.ldb_user.modify_ldif("""
1358 dn: """ + self.get_user_dn(self.user_with_wp) + """
1359 changetype: modify
1360 delete: unicodePwd
1361 unicodePwd:: """ + base64.b64encode("\"samba123@\"".encode('utf-16-le')) + """
1362 add: unicodePwd
1363 unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS2\"".encode('utf-16-le')) + """
1364 """)
1365         except LdbError, (num, _):
1366             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
1367         else:
1368             # for some reason we get constraint violation instead of insufficient access error
1369             self.fail()
1370
1371     def test_change_password2(self):
1372         """Make sure WP has no influence"""
1373         desc = self.sd_utils.read_sd_on_dn(self.get_user_dn(self.user_with_wp))
1374         sddl = desc.as_sddl(self.domain_sid)
1375         sddl = sddl.replace("(OA;;CR;ab721a53-1e2f-11d0-9819-00aa0040529b;;WD)", "")
1376         sddl = sddl.replace("(OA;;CR;ab721a53-1e2f-11d0-9819-00aa0040529b;;PS)", "")
1377         self.sd_utils.modify_sd_on_dn(self.get_user_dn(self.user_with_wp), sddl)
1378         mod = "(A;;WP;;;PS)"
1379         self.sd_utils.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
1380         desc = self.sd_utils.read_sd_on_dn(self.get_user_dn(self.user_with_wp))
1381         sddl = desc.as_sddl(self.domain_sid)
1382         try:
1383             self.ldb_user.modify_ldif("""
1384 dn: """ + self.get_user_dn(self.user_with_wp) + """
1385 changetype: modify
1386 delete: unicodePwd
1387 unicodePwd:: """ + base64.b64encode("\"samba123@\"".encode('utf-16-le')) + """
1388 add: unicodePwd
1389 unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS2\"".encode('utf-16-le')) + """
1390 """)
1391         except LdbError, (num, _):
1392             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
1393         else:
1394             # for some reason we get constraint violation instead of insufficient access error
1395             self.fail()
1396
1397     def test_change_password3(self):
1398         """Make sure WP has no influence"""
1399         mod = "(D;;WP;;;PS)"
1400         self.sd_utils.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
1401         desc = self.sd_utils.read_sd_on_dn(self.get_user_dn(self.user_with_wp))
1402         sddl = desc.as_sddl(self.domain_sid)
1403         self.ldb_user.modify_ldif("""
1404 dn: """ + self.get_user_dn(self.user_with_wp) + """
1405 changetype: modify
1406 delete: unicodePwd
1407 unicodePwd:: """ + base64.b64encode("\"samba123@\"".encode('utf-16-le')) + """
1408 add: unicodePwd
1409 unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS2\"".encode('utf-16-le')) + """
1410 """)
1411
1412     def test_change_password5(self):
1413         """Make sure rights have no influence on dBCSPwd"""
1414         desc = self.sd_utils.read_sd_on_dn(self.get_user_dn(self.user_with_wp))
1415         sddl = desc.as_sddl(self.domain_sid)
1416         sddl = sddl.replace("(OA;;CR;ab721a53-1e2f-11d0-9819-00aa0040529b;;WD)", "")
1417         sddl = sddl.replace("(OA;;CR;ab721a53-1e2f-11d0-9819-00aa0040529b;;PS)", "")
1418         self.sd_utils.modify_sd_on_dn(self.get_user_dn(self.user_with_wp), sddl)
1419         mod = "(D;;WP;;;PS)"
1420         self.sd_utils.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
1421         try:
1422             self.ldb_user.modify_ldif("""
1423 dn: """ + self.get_user_dn(self.user_with_wp) + """
1424 changetype: modify
1425 delete: dBCSPwd
1426 dBCSPwd: XXXXXXXXXXXXXXXX
1427 add: dBCSPwd
1428 dBCSPwd: YYYYYYYYYYYYYYYY
1429 """)
1430         except LdbError, (num, _):
1431             self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
1432         else:
1433             self.fail()
1434
1435     def test_change_password6(self):
1436         """Test uneven delete/adds"""
1437         try:
1438             self.ldb_user.modify_ldif("""
1439 dn: """ + self.get_user_dn(self.user_with_wp) + """
1440 changetype: modify
1441 delete: userPassword
1442 userPassword: thatsAcomplPASS1
1443 delete: userPassword
1444 userPassword: thatsAcomplPASS1
1445 add: userPassword
1446 userPassword: thatsAcomplPASS2
1447 """)
1448         except LdbError, (num, _):
1449             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1450         else:
1451             self.fail()
1452         mod = "(OA;;CR;00299570-246d-11d0-a768-00aa006e0529;;PS)"
1453         self.sd_utils.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
1454         try:
1455             self.ldb_user.modify_ldif("""
1456 dn: """ + self.get_user_dn(self.user_with_wp) + """
1457 changetype: modify
1458 delete: userPassword
1459 userPassword: thatsAcomplPASS1
1460 delete: userPassword
1461 userPassword: thatsAcomplPASS1
1462 add: userPassword
1463 userPassword: thatsAcomplPASS2
1464 """)
1465             # This fails on Windows 2000 domain level with constraint violation
1466         except LdbError, (num, _):
1467             self.assertTrue(num == ERR_CONSTRAINT_VIOLATION or
1468                             num == ERR_UNWILLING_TO_PERFORM)
1469         else:
1470             self.fail()
1471
1472
1473     def test_change_password7(self):
1474         """Try a password change operation without any CARs given"""
1475         #users have change password by default - remove for negative testing
1476         desc = self.sd_utils.read_sd_on_dn(self.get_user_dn(self.user_with_wp))
1477         sddl = desc.as_sddl(self.domain_sid)
1478         self.sd_utils.modify_sd_on_dn(self.get_user_dn(self.user_with_wp), sddl)
1479         #first change our own password
1480         self.ldb_user2.modify_ldif("""
1481 dn: """ + self.get_user_dn(self.user_with_pc) + """
1482 changetype: modify
1483 delete: unicodePwd
1484 unicodePwd:: """ + base64.b64encode("\"samba123@\"".encode('utf-16-le')) + """
1485 add: unicodePwd
1486 unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS1\"".encode('utf-16-le')) + """
1487 """)
1488         #then someone else's
1489         self.ldb_user2.modify_ldif("""
1490 dn: """ + self.get_user_dn(self.user_with_wp) + """
1491 changetype: modify
1492 delete: unicodePwd
1493 unicodePwd:: """ + base64.b64encode("\"samba123@\"".encode('utf-16-le')) + """
1494 add: unicodePwd
1495 unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS2\"".encode('utf-16-le')) + """
1496 """)
1497
1498     def test_reset_password1(self):
1499         """Try a user password reset operation (unicodePwd) before and after granting CAR"""
1500         try:
1501             self.ldb_user.modify_ldif("""
1502 dn: """ + self.get_user_dn(self.user_with_wp) + """
1503 changetype: modify
1504 replace: unicodePwd
1505 unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS1\"".encode('utf-16-le')) + """
1506 """)
1507         except LdbError, (num, _):
1508             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1509         else:
1510             self.fail()
1511         mod = "(OA;;CR;00299570-246d-11d0-a768-00aa006e0529;;PS)"
1512         self.sd_utils.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
1513         self.ldb_user.modify_ldif("""
1514 dn: """ + self.get_user_dn(self.user_with_wp) + """
1515 changetype: modify
1516 replace: unicodePwd
1517 unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS1\"".encode('utf-16-le')) + """
1518 """)
1519
1520     def test_reset_password2(self):
1521         """Try a user password reset operation (userPassword) before and after granting CAR"""
1522         try:
1523             self.ldb_user.modify_ldif("""
1524 dn: """ + self.get_user_dn(self.user_with_wp) + """
1525 changetype: modify
1526 replace: userPassword
1527 userPassword: thatsAcomplPASS1
1528 """)
1529         except LdbError, (num, _):
1530             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1531         else:
1532             self.fail()
1533         mod = "(OA;;CR;00299570-246d-11d0-a768-00aa006e0529;;PS)"
1534         self.sd_utils.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
1535         try:
1536             self.ldb_user.modify_ldif("""
1537 dn: """ + self.get_user_dn(self.user_with_wp) + """
1538 changetype: modify
1539 replace: userPassword
1540 userPassword: thatsAcomplPASS1
1541 """)
1542             # This fails on Windows 2000 domain level with constraint violation
1543         except LdbError, (num, _):
1544             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
1545
1546     def test_reset_password3(self):
1547         """Grant WP and see what happens (unicodePwd)"""
1548         mod = "(A;;WP;;;PS)"
1549         self.sd_utils.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
1550         try:
1551             self.ldb_user.modify_ldif("""
1552 dn: """ + self.get_user_dn(self.user_with_wp) + """
1553 changetype: modify
1554 replace: unicodePwd
1555 unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS1\"".encode('utf-16-le')) + """
1556 """)
1557         except LdbError, (num, _):
1558             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1559         else:
1560             self.fail()
1561
1562     def test_reset_password4(self):
1563         """Grant WP and see what happens (userPassword)"""
1564         mod = "(A;;WP;;;PS)"
1565         self.sd_utils.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
1566         try:
1567             self.ldb_user.modify_ldif("""
1568 dn: """ + self.get_user_dn(self.user_with_wp) + """
1569 changetype: modify
1570 replace: userPassword
1571 userPassword: thatsAcomplPASS1
1572 """)
1573         except LdbError, (num, _):
1574             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1575         else:
1576             self.fail()
1577
1578     def test_reset_password5(self):
1579         """Explicitly deny WP but grant CAR (unicodePwd)"""
1580         mod = "(D;;WP;;;PS)(OA;;CR;00299570-246d-11d0-a768-00aa006e0529;;PS)"
1581         self.sd_utils.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
1582         self.ldb_user.modify_ldif("""
1583 dn: """ + self.get_user_dn(self.user_with_wp) + """
1584 changetype: modify
1585 replace: unicodePwd
1586 unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS1\"".encode('utf-16-le')) + """
1587 """)
1588
1589     def test_reset_password6(self):
1590         """Explicitly deny WP but grant CAR (userPassword)"""
1591         mod = "(D;;WP;;;PS)(OA;;CR;00299570-246d-11d0-a768-00aa006e0529;;PS)"
1592         self.sd_utils.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
1593         try:
1594             self.ldb_user.modify_ldif("""
1595 dn: """ + self.get_user_dn(self.user_with_wp) + """
1596 changetype: modify
1597 replace: userPassword
1598 userPassword: thatsAcomplPASS1
1599 """)
1600             # This fails on Windows 2000 domain level with constraint violation
1601         except LdbError, (num, _):
1602             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
1603
1604 class AclExtendedTests(AclTests):
1605
1606     def setUp(self):
1607         super(AclExtendedTests, self).setUp()
1608         #regular user, will be the creator
1609         self.u1 = "ext_u1"
1610         #regular user
1611         self.u2 = "ext_u2"
1612         #admin user
1613         self.u3 = "ext_u3"
1614         self.ldb_admin.newuser(self.u1, self.user_pass)
1615         self.ldb_admin.newuser(self.u2, self.user_pass)
1616         self.ldb_admin.newuser(self.u3, self.user_pass)
1617         self.ldb_admin.add_remove_group_members("Domain Admins", [self.u3],
1618                                                 add_members_operation=True)
1619         self.ldb_user1 = self.get_ldb_connection(self.u1, self.user_pass)
1620         self.ldb_user2 = self.get_ldb_connection(self.u2, self.user_pass)
1621         self.ldb_user3 = self.get_ldb_connection(self.u3, self.user_pass)
1622         self.user_sid1 = self.sd_utils.get_object_sid(self.get_user_dn(self.u1))
1623         self.user_sid2 = self.sd_utils.get_object_sid(self.get_user_dn(self.u2))
1624
1625     def tearDown(self):
1626         super(AclExtendedTests, self).tearDown()
1627         delete_force(self.ldb_admin, self.get_user_dn(self.u1))
1628         delete_force(self.ldb_admin, self.get_user_dn(self.u2))
1629         delete_force(self.ldb_admin, self.get_user_dn(self.u3))
1630         delete_force(self.ldb_admin, "CN=ext_group1,OU=ext_ou1," + self.base_dn)
1631         delete_force(self.ldb_admin, "ou=ext_ou1," + self.base_dn)
1632
1633         del self.ldb_user1
1634         del self.ldb_user2
1635         del self.ldb_user3
1636
1637     def test_ntSecurityDescriptor(self):
1638         #create empty ou
1639         self.ldb_admin.create_ou("ou=ext_ou1," + self.base_dn)
1640         #give u1 Create children access
1641         mod = "(A;;CC;;;%s)" % str(self.user_sid1)
1642         self.sd_utils.dacl_add_ace("OU=ext_ou1," + self.base_dn, mod)
1643         mod = "(A;;LC;;;%s)" % str(self.user_sid2)
1644         self.sd_utils.dacl_add_ace("OU=ext_ou1," + self.base_dn, mod)
1645         #create a group under that, grant RP to u2
1646         self.ldb_user1.newgroup("ext_group1", groupou="OU=ext_ou1",
1647                                 grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP)
1648         mod = "(A;;RP;;;%s)" % str(self.user_sid2)
1649         self.sd_utils.dacl_add_ace("CN=ext_group1,OU=ext_ou1," + self.base_dn, mod)
1650         #u2 must not read the descriptor
1651         res = self.ldb_user2.search("CN=ext_group1,OU=ext_ou1," + self.base_dn,
1652                                     SCOPE_BASE, None, ["nTSecurityDescriptor"])
1653         self.assertNotEqual(len(res), 0)
1654         self.assertFalse("nTSecurityDescriptor" in res[0].keys())
1655         #grant RC to u2 - still no access
1656         mod = "(A;;RC;;;%s)" % str(self.user_sid2)
1657         self.sd_utils.dacl_add_ace("CN=ext_group1,OU=ext_ou1," + self.base_dn, mod)
1658         res = self.ldb_user2.search("CN=ext_group1,OU=ext_ou1," + self.base_dn,
1659                                     SCOPE_BASE, None, ["nTSecurityDescriptor"])
1660         self.assertNotEqual(len(res), 0)
1661         self.assertFalse("nTSecurityDescriptor" in res[0].keys())
1662         #u3 is member of administrators group, should be able to read sd
1663         res = self.ldb_user3.search("CN=ext_group1,OU=ext_ou1," + self.base_dn,
1664                                     SCOPE_BASE, None, ["nTSecurityDescriptor"])
1665         self.assertEqual(len(res),1)
1666         self.assertTrue("nTSecurityDescriptor" in res[0].keys())
1667
1668 class AclUndeleteTests(AclTests):
1669
1670     def setUp(self):
1671         super(AclUndeleteTests, self).setUp()
1672         self.regular_user = "undeleter1"
1673         self.ou1 = "OU=undeleted_ou,"
1674         self.testuser1 = "to_be_undeleted1"
1675         self.testuser2 = "to_be_undeleted2"
1676         self.testuser3 = "to_be_undeleted3"
1677         self.testuser4 = "to_be_undeleted4"
1678         self.testuser5 = "to_be_undeleted5"
1679         self.testuser6 = "to_be_undeleted6"
1680
1681         self.new_dn_ou = "CN="+ self.testuser4 + "," + self.ou1 + self.base_dn
1682
1683         # Create regular user
1684         self.testuser1_dn = self.get_user_dn(self.testuser1)
1685         self.testuser2_dn = self.get_user_dn(self.testuser2)
1686         self.testuser3_dn = self.get_user_dn(self.testuser3)
1687         self.testuser4_dn = self.get_user_dn(self.testuser4)
1688         self.testuser5_dn = self.get_user_dn(self.testuser5)
1689         self.deleted_dn1 = self.create_delete_user(self.testuser1)
1690         self.deleted_dn2 = self.create_delete_user(self.testuser2)
1691         self.deleted_dn3 = self.create_delete_user(self.testuser3)
1692         self.deleted_dn4 = self.create_delete_user(self.testuser4)
1693         self.deleted_dn5 = self.create_delete_user(self.testuser5)
1694
1695         self.ldb_admin.create_ou(self.ou1 + self.base_dn)
1696
1697         self.ldb_admin.newuser(self.regular_user, self.user_pass)
1698         self.ldb_admin.add_remove_group_members("Domain Admins", [self.regular_user],
1699                        add_members_operation=True)
1700         self.ldb_user = self.get_ldb_connection(self.regular_user, self.user_pass)
1701         self.sid = self.sd_utils.get_object_sid(self.get_user_dn(self.regular_user))
1702
1703     def tearDown(self):
1704         super(AclUndeleteTests, self).tearDown()
1705         delete_force(self.ldb_admin, self.get_user_dn(self.regular_user))
1706         delete_force(self.ldb_admin, self.get_user_dn(self.testuser1))
1707         delete_force(self.ldb_admin, self.get_user_dn(self.testuser2))
1708         delete_force(self.ldb_admin, self.get_user_dn(self.testuser3))
1709         delete_force(self.ldb_admin, self.get_user_dn(self.testuser4))
1710         delete_force(self.ldb_admin, self.get_user_dn(self.testuser5))
1711         delete_force(self.ldb_admin, self.new_dn_ou)
1712         delete_force(self.ldb_admin, self.ou1 + self.base_dn)
1713
1714         del self.ldb_user
1715
1716     def GUID_string(self, guid):
1717         return ldb.schema_format_value("objectGUID", guid)
1718
1719     def create_delete_user(self, new_user):
1720         self.ldb_admin.newuser(new_user, self.user_pass)
1721
1722         res = self.ldb_admin.search(expression="(objectClass=*)",
1723                                     base=self.get_user_dn(new_user),
1724                                     scope=SCOPE_BASE,
1725                                     controls=["show_deleted:1"])
1726         guid = res[0]["objectGUID"][0]
1727         self.ldb_admin.delete(self.get_user_dn(new_user))
1728         res = self.ldb_admin.search(base="<GUID=%s>" % self.GUID_string(guid),
1729                          scope=SCOPE_BASE, controls=["show_deleted:1"])
1730         self.assertEquals(len(res), 1)
1731         return str(res[0].dn)
1732
1733     def undelete_deleted(self, olddn, newdn):
1734         msg = Message()
1735         msg.dn = Dn(self.ldb_user, olddn)
1736         msg["isDeleted"] = MessageElement([], FLAG_MOD_DELETE, "isDeleted")
1737         msg["distinguishedName"] = MessageElement([newdn], FLAG_MOD_REPLACE, "distinguishedName")
1738         res = self.ldb_user.modify(msg, ["show_recycled:1"])
1739
1740     def undelete_deleted_with_mod(self, olddn, newdn):
1741         msg = Message()
1742         msg.dn = Dn(ldb, olddn)
1743         msg["isDeleted"] = MessageElement([], FLAG_MOD_DELETE, "isDeleted")
1744         msg["distinguishedName"] = MessageElement([newdn], FLAG_MOD_REPLACE, "distinguishedName")
1745         msg["url"] = MessageElement(["www.samba.org"], FLAG_MOD_REPLACE, "url")
1746         res = self.ldb_user.modify(msg, ["show_deleted:1"])
1747
1748     def test_undelete(self):
1749         # it appears the user has to have LC on the old parent to be able to move the object
1750         # otherwise we get no such object. Since only System can modify the SD on deleted object
1751         # we cannot grant this permission via LDAP, and this leaves us with "negative" tests at the moment
1752
1753         # deny write property on rdn, should fail
1754         mod = "(OD;;WP;bf967a0e-0de6-11d0-a285-00aa003049e2;;%s)" % str(self.sid)
1755         self.sd_utils.dacl_add_ace(self.deleted_dn1, mod)
1756         try:
1757             self.undelete_deleted(self.deleted_dn1, self.testuser1_dn)
1758             self.fail()
1759         except LdbError, (num, _):
1760             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1761
1762         # seems that permissions on isDeleted and distinguishedName are irrelevant
1763         mod = "(OD;;WP;bf96798f-0de6-11d0-a285-00aa003049e2;;%s)" % str(self.sid)
1764         self.sd_utils.dacl_add_ace(self.deleted_dn2, mod)
1765         mod = "(OD;;WP;bf9679e4-0de6-11d0-a285-00aa003049e2;;%s)" % str(self.sid)
1766         self.sd_utils.dacl_add_ace(self.deleted_dn2, mod)
1767         self.undelete_deleted(self.deleted_dn2, self.testuser2_dn)
1768
1769         # attempt undelete with simultanious addition of url, WP to which is denied
1770         mod = "(OD;;WP;9a9a0221-4a5b-11d1-a9c3-0000f80367c1;;%s)" % str(self.sid)
1771         self.sd_utils.dacl_add_ace(self.deleted_dn3, mod)
1772         try:
1773             self.undelete_deleted_with_mod(self.deleted_dn3, self.testuser3_dn)
1774             self.fail()
1775         except LdbError, (num, _):
1776             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1777
1778         # undelete in an ou, in which we have no right to create children
1779         mod = "(D;;CC;;;%s)" % str(self.sid)
1780         self.sd_utils.dacl_add_ace(self.ou1 + self.base_dn, mod)
1781         try:
1782             self.undelete_deleted(self.deleted_dn4, self.new_dn_ou)
1783             self.fail()
1784         except LdbError, (num, _):
1785             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1786
1787         # delete is not required
1788         mod = "(D;;SD;;;%s)" % str(self.sid)
1789         self.sd_utils.dacl_add_ace(self.deleted_dn5, mod)
1790         self.undelete_deleted(self.deleted_dn5, self.testuser5_dn)
1791
1792         # deny Reanimate-Tombstone, should fail
1793         mod = "(OD;;CR;45ec5156-db7e-47bb-b53f-dbeb2d03c40f;;%s)" % str(self.sid)
1794         self.sd_utils.dacl_add_ace(self.base_dn, mod)
1795         try:
1796             self.undelete_deleted(self.deleted_dn4, self.testuser4_dn)
1797             self.fail()
1798         except LdbError, (num, _):
1799             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1800
1801 class AclSPNTests(AclTests):
1802
1803     def setUp(self):
1804         super(AclSPNTests, self).setUp()
1805         self.dcname = "TESTSRV8"
1806         self.rodcname = "TESTRODC8"
1807         self.computername = "testcomp8"
1808         self.test_user = "spn_test_user8"
1809         self.computerdn = "CN=%s,CN=computers,%s" % (self.computername, self.base_dn)
1810         self.dc_dn = "CN=%s,OU=Domain Controllers,%s" % (self.dcname, self.base_dn)
1811         self.site = "Default-First-Site-Name"
1812         self.rodcctx = dc_join(server=host, creds=creds, lp=lp,
1813             site=self.site, netbios_name=self.rodcname, targetdir=None,
1814             domain=None)
1815         self.dcctx = dc_join(server=host, creds=creds, lp=lp, site=self.site,
1816                 netbios_name=self.dcname, targetdir=None, domain=None)
1817         self.ldb_admin.newuser(self.test_user, self.user_pass)
1818         self.ldb_user1 = self.get_ldb_connection(self.test_user, self.user_pass)
1819         self.user_sid1 = self.sd_utils.get_object_sid(self.get_user_dn(self.test_user))
1820         self.create_computer(self.computername, self.dcctx.dnsdomain)
1821         self.create_rodc(self.rodcctx)
1822         self.create_dc(self.dcctx)
1823
1824     def tearDown(self):
1825         super(AclSPNTests, self).tearDown()
1826         self.rodcctx.cleanup_old_join()
1827         self.dcctx.cleanup_old_join()
1828         delete_force(self.ldb_admin, "cn=%s,cn=computers,%s" % (self.computername, self.base_dn))
1829         delete_force(self.ldb_admin, self.get_user_dn(self.test_user))
1830
1831         del self.ldb_user1
1832
1833     def replace_spn(self, _ldb, dn, spn):
1834         print "Setting spn %s on %s" % (spn, dn)
1835         res = self.ldb_admin.search(dn, expression="(objectClass=*)",
1836                                     scope=SCOPE_BASE, attrs=["servicePrincipalName"])
1837         if "servicePrincipalName" in res[0].keys():
1838             flag = FLAG_MOD_REPLACE
1839         else:
1840             flag = FLAG_MOD_ADD
1841
1842         msg = Message()
1843         msg.dn = Dn(self.ldb_admin, dn)
1844         msg["servicePrincipalName"] = MessageElement(spn, flag,
1845                                                          "servicePrincipalName")
1846         _ldb.modify(msg)
1847
1848     def create_computer(self, computername, domainname):
1849         dn = "CN=%s,CN=computers,%s" % (computername, self.base_dn)
1850         samaccountname = computername + "$"
1851         dnshostname = "%s.%s" % (computername, domainname)
1852         self.ldb_admin.add({
1853             "dn": dn,
1854             "objectclass": "computer",
1855             "sAMAccountName": samaccountname,
1856             "userAccountControl": str(samba.dsdb.UF_WORKSTATION_TRUST_ACCOUNT),
1857             "dNSHostName": dnshostname})
1858
1859     # same as for join_RODC, but do not set any SPNs
1860     def create_rodc(self, ctx):
1861          ctx.nc_list = [ ctx.base_dn, ctx.config_dn, ctx.schema_dn ]
1862          ctx.full_nc_list = [ ctx.base_dn, ctx.config_dn, ctx.schema_dn ]
1863          ctx.krbtgt_dn = "CN=krbtgt_%s,CN=Users,%s" % (ctx.myname, ctx.base_dn)
1864
1865          ctx.never_reveal_sid = [ "<SID=%s-%s>" % (ctx.domsid, security.DOMAIN_RID_RODC_DENY),
1866                                   "<SID=%s>" % security.SID_BUILTIN_ADMINISTRATORS,
1867                                   "<SID=%s>" % security.SID_BUILTIN_SERVER_OPERATORS,
1868                                   "<SID=%s>" % security.SID_BUILTIN_BACKUP_OPERATORS,
1869                                   "<SID=%s>" % security.SID_BUILTIN_ACCOUNT_OPERATORS ]
1870          ctx.reveal_sid = "<SID=%s-%s>" % (ctx.domsid, security.DOMAIN_RID_RODC_ALLOW)
1871
1872          mysid = ctx.get_mysid()
1873          admin_dn = "<SID=%s>" % mysid
1874          ctx.managedby = admin_dn
1875
1876          ctx.userAccountControl = (samba.dsdb.UF_WORKSTATION_TRUST_ACCOUNT |
1877                               samba.dsdb.UF_TRUSTED_TO_AUTHENTICATE_FOR_DELEGATION |
1878                               samba.dsdb.UF_PARTIAL_SECRETS_ACCOUNT)
1879
1880          ctx.connection_dn = "CN=RODC Connection (FRS),%s" % ctx.ntds_dn
1881          ctx.secure_channel_type = misc.SEC_CHAN_RODC
1882          ctx.RODC = True
1883          ctx.replica_flags  =  (drsuapi.DRSUAPI_DRS_INIT_SYNC |
1884                                 drsuapi.DRSUAPI_DRS_PER_SYNC |
1885                                 drsuapi.DRSUAPI_DRS_GET_ANC |
1886                                 drsuapi.DRSUAPI_DRS_NEVER_SYNCED |
1887                                 drsuapi.DRSUAPI_DRS_SPECIAL_SECRET_PROCESSING)
1888
1889          ctx.join_add_objects()
1890
1891     def create_dc(self, ctx):
1892         ctx.nc_list = [ ctx.base_dn, ctx.config_dn, ctx.schema_dn ]
1893         ctx.full_nc_list = [ ctx.base_dn, ctx.config_dn, ctx.schema_dn ]
1894         ctx.userAccountControl = samba.dsdb.UF_SERVER_TRUST_ACCOUNT | samba.dsdb.UF_TRUSTED_FOR_DELEGATION
1895         ctx.secure_channel_type = misc.SEC_CHAN_BDC
1896         ctx.replica_flags = (drsuapi.DRSUAPI_DRS_WRIT_REP |
1897                              drsuapi.DRSUAPI_DRS_INIT_SYNC |
1898                              drsuapi.DRSUAPI_DRS_PER_SYNC |
1899                              drsuapi.DRSUAPI_DRS_FULL_SYNC_IN_PROGRESS |
1900                              drsuapi.DRSUAPI_DRS_NEVER_SYNCED)
1901
1902         ctx.join_add_objects()
1903
1904     def dc_spn_test(self, ctx):
1905         netbiosdomain = self.dcctx.get_domain_name()
1906         try:
1907             self.replace_spn(self.ldb_user1, ctx.acct_dn, "HOST/%s/%s" % (ctx.myname, netbiosdomain))
1908         except LdbError, (num, _):
1909             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1910
1911         mod = "(OA;;SW;f3a64788-5306-11d1-a9c5-0000f80367c1;;%s)" % str(self.user_sid1)
1912         self.sd_utils.dacl_add_ace(ctx.acct_dn, mod)
1913         self.replace_spn(self.ldb_user1, ctx.acct_dn, "HOST/%s/%s" % (ctx.myname, netbiosdomain))
1914         self.replace_spn(self.ldb_user1, ctx.acct_dn, "HOST/%s" % (ctx.myname))
1915         self.replace_spn(self.ldb_user1, ctx.acct_dn, "HOST/%s.%s/%s" %
1916                          (ctx.myname, ctx.dnsdomain, netbiosdomain))
1917         self.replace_spn(self.ldb_user1, ctx.acct_dn, "HOST/%s/%s" % (ctx.myname, ctx.dnsdomain))
1918         self.replace_spn(self.ldb_user1, ctx.acct_dn, "HOST/%s.%s/%s" %
1919                          (ctx.myname, ctx.dnsdomain, ctx.dnsdomain))
1920         self.replace_spn(self.ldb_user1, ctx.acct_dn, "GC/%s.%s/%s" %
1921                          (ctx.myname, ctx.dnsdomain, ctx.dnsforest))
1922         self.replace_spn(self.ldb_user1, ctx.acct_dn, "ldap/%s/%s" % (ctx.myname, netbiosdomain))
1923         self.replace_spn(self.ldb_user1, ctx.acct_dn, "ldap/%s.%s/%s" %
1924                          (ctx.myname, ctx.dnsdomain, netbiosdomain))
1925         self.replace_spn(self.ldb_user1, ctx.acct_dn, "ldap/%s" % (ctx.myname))
1926         self.replace_spn(self.ldb_user1, ctx.acct_dn, "ldap/%s/%s" % (ctx.myname, ctx.dnsdomain))
1927         self.replace_spn(self.ldb_user1, ctx.acct_dn, "ldap/%s.%s/%s" %
1928                          (ctx.myname, ctx.dnsdomain, ctx.dnsdomain))
1929         self.replace_spn(self.ldb_user1, ctx.acct_dn, "DNS/%s/%s" % (ctx.myname, ctx.dnsdomain))
1930         self.replace_spn(self.ldb_user1, ctx.acct_dn, "RestrictedKrbHost/%s/%s" %
1931                          (ctx.myname, ctx.dnsdomain))
1932         self.replace_spn(self.ldb_user1, ctx.acct_dn, "RestrictedKrbHost/%s" %
1933                          (ctx.myname))
1934         self.replace_spn(self.ldb_user1, ctx.acct_dn, "Dfsr-12F9A27C-BF97-4787-9364-D31B6C55EB04/%s/%s" %
1935                          (ctx.myname, ctx.dnsdomain))
1936         self.replace_spn(self.ldb_user1, ctx.acct_dn, "NtFrs-88f5d2bd-b646-11d2-a6d3-00c04fc9b232/%s/%s" %
1937                          (ctx.myname, ctx.dnsdomain))
1938         self.replace_spn(self.ldb_user1, ctx.acct_dn, "ldap/%s._msdcs.%s" %
1939                          (ctx.ntds_guid, ctx.dnsdomain))
1940
1941         #the following spns do not match the restrictions and should fail
1942         try:
1943             self.replace_spn(self.ldb_user1, ctx.acct_dn, "ldap/%s.%s/ForestDnsZones.%s" %
1944                              (ctx.myname, ctx.dnsdomain, ctx.dnsdomain))
1945         except LdbError, (num, _):
1946             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
1947         try:
1948             self.replace_spn(self.ldb_user1, ctx.acct_dn, "ldap/%s.%s/DomainDnsZones.%s" %
1949                              (ctx.myname, ctx.dnsdomain, ctx.dnsdomain))
1950         except LdbError, (num, _):
1951             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
1952         try:
1953             self.replace_spn(self.ldb_user1, ctx.acct_dn, "nosuchservice/%s/%s" % ("abcd", "abcd"))
1954         except LdbError, (num, _):
1955             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
1956         try:
1957             self.replace_spn(self.ldb_user1, ctx.acct_dn, "GC/%s.%s/%s" %
1958                              (ctx.myname, ctx.dnsdomain, netbiosdomain))
1959         except LdbError, (num, _):
1960             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
1961         try:
1962             self.replace_spn(self.ldb_user1, ctx.acct_dn, "E3514235-4B06-11D1-AB04-00C04FC2DCD2/%s/%s" %
1963                              (ctx.ntds_guid, ctx.dnsdomain))
1964         except LdbError, (num, _):
1965             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
1966
1967     def test_computer_spn(self):
1968         # with WP, any value can be set
1969         netbiosdomain = self.dcctx.get_domain_name()
1970         self.replace_spn(self.ldb_admin, self.computerdn, "HOST/%s/%s" %
1971                          (self.computername, netbiosdomain))
1972         self.replace_spn(self.ldb_admin, self.computerdn, "HOST/%s" % (self.computername))
1973         self.replace_spn(self.ldb_admin, self.computerdn, "HOST/%s.%s/%s" %
1974                          (self.computername, self.dcctx.dnsdomain, netbiosdomain))
1975         self.replace_spn(self.ldb_admin, self.computerdn, "HOST/%s/%s" %
1976                          (self.computername, self.dcctx.dnsdomain))
1977         self.replace_spn(self.ldb_admin, self.computerdn, "HOST/%s.%s/%s" %
1978                          (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsdomain))
1979         self.replace_spn(self.ldb_admin, self.computerdn, "GC/%s.%s/%s" %
1980                          (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsforest))
1981         self.replace_spn(self.ldb_admin, self.computerdn, "ldap/%s/%s" % (self.computername, netbiosdomain))
1982         self.replace_spn(self.ldb_admin, self.computerdn, "ldap/%s.%s/ForestDnsZones.%s" %
1983                          (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsdomain))
1984         self.replace_spn(self.ldb_admin, self.computerdn, "ldap/%s.%s/DomainDnsZones.%s" %
1985                          (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsdomain))
1986         self.replace_spn(self.ldb_admin, self.computerdn, "ldap/%s.%s/%s" %
1987                          (self.computername, self.dcctx.dnsdomain, netbiosdomain))
1988         self.replace_spn(self.ldb_admin, self.computerdn, "ldap/%s" % (self.computername))
1989         self.replace_spn(self.ldb_admin, self.computerdn, "ldap/%s/%s" %
1990                          (self.computername, self.dcctx.dnsdomain))
1991         self.replace_spn(self.ldb_admin, self.computerdn, "ldap/%s.%s/%s" %
1992                          (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsdomain))
1993         self.replace_spn(self.ldb_admin, self.computerdn, "DNS/%s/%s" %
1994                          (self.computername, self.dcctx.dnsdomain))
1995         self.replace_spn(self.ldb_admin, self.computerdn, "RestrictedKrbHost/%s/%s" %
1996                          (self.computername, self.dcctx.dnsdomain))
1997         self.replace_spn(self.ldb_admin, self.computerdn, "RestrictedKrbHost/%s" %
1998                          (self.computername))
1999         self.replace_spn(self.ldb_admin, self.computerdn, "Dfsr-12F9A27C-BF97-4787-9364-D31B6C55EB04/%s/%s" %
2000                          (self.computername, self.dcctx.dnsdomain))
2001         self.replace_spn(self.ldb_admin, self.computerdn, "NtFrs-88f5d2bd-b646-11d2-a6d3-00c04fc9b232/%s/%s" %
2002                          (self.computername, self.dcctx.dnsdomain))
2003         self.replace_spn(self.ldb_admin, self.computerdn, "nosuchservice/%s/%s" % ("abcd", "abcd"))
2004
2005         #user has neither WP nor Validated-SPN, access denied expected
2006         try:
2007             self.replace_spn(self.ldb_user1, self.computerdn, "HOST/%s/%s" % (self.computername, netbiosdomain))
2008         except LdbError, (num, _):
2009             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
2010
2011         mod = "(OA;;SW;f3a64788-5306-11d1-a9c5-0000f80367c1;;%s)" % str(self.user_sid1)
2012         self.sd_utils.dacl_add_ace(self.computerdn, mod)
2013         #grant Validated-SPN and check which values are accepted
2014         #see 3.1.1.5.3.1.1.4 servicePrincipalName for reference
2015
2016         # for regular computer objects we shouldalways get constraint violation
2017
2018         # This does not pass against Windows, although it should according to docs
2019         self.replace_spn(self.ldb_user1, self.computerdn, "HOST/%s" % (self.computername))
2020         self.replace_spn(self.ldb_user1, self.computerdn, "HOST/%s.%s" %
2021                              (self.computername, self.dcctx.dnsdomain))
2022
2023         try:
2024             self.replace_spn(self.ldb_user1, self.computerdn, "HOST/%s/%s" % (self.computername, netbiosdomain))
2025         except LdbError, (num, _):
2026             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
2027         try:
2028             self.replace_spn(self.ldb_user1, self.computerdn, "HOST/%s.%s/%s" %
2029                              (self.computername, self.dcctx.dnsdomain, netbiosdomain))
2030         except LdbError, (num, _):
2031             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
2032         try:
2033             self.replace_spn(self.ldb_user1, self.computerdn, "HOST/%s/%s" %
2034                              (self.computername, self.dcctx.dnsdomain))
2035         except LdbError, (num, _):
2036             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
2037         try:
2038             self.replace_spn(self.ldb_user1, self.computerdn, "HOST/%s.%s/%s" %
2039                              (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsdomain))
2040         except LdbError, (num, _):
2041             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
2042         try:
2043             self.replace_spn(self.ldb_user1, self.computerdn, "GC/%s.%s/%s" %
2044                              (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsforest))
2045         except LdbError, (num, _):
2046             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
2047         try:
2048             self.replace_spn(self.ldb_user1, self.computerdn, "ldap/%s/%s" % (self.computername, netbiosdomain))
2049         except LdbError, (num, _):
2050             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
2051         try:
2052             self.replace_spn(self.ldb_user1, self.computerdn, "ldap/%s.%s/ForestDnsZones.%s" %
2053                              (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsdomain))
2054         except LdbError, (num, _):
2055             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
2056
2057     def test_spn_rwdc(self):
2058         self.dc_spn_test(self.dcctx)
2059
2060     def test_spn_rodc(self):
2061         self.dc_spn_test(self.rodcctx)
2062
2063
2064 # Important unit running information
2065
2066 ldb = SamDB(ldaphost, credentials=creds, session_info=system_session(lp), lp=lp)
2067
2068 TestProgram(module=__name__, opts=subunitopts)