s4-join: Import DNS zones in AD DC join
[ddiss/samba.git] / source4 / dsdb / tests / python / acl.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3 # This is unit with tests for LDAP access checks
4
5 import optparse
6 import sys
7 import base64
8 import re
9 sys.path.insert(0, "bin/python")
10 import samba
11 samba.ensure_external_module("testtools", "testtools")
12 samba.ensure_external_module("subunit", "subunit/python")
13
14 import samba.getopt as options
15 from samba.join import dc_join
16
17 from ldb import (
18     SCOPE_BASE, SCOPE_SUBTREE, LdbError, ERR_NO_SUCH_OBJECT,
19     ERR_UNWILLING_TO_PERFORM, ERR_INSUFFICIENT_ACCESS_RIGHTS)
20 from ldb import ERR_CONSTRAINT_VIOLATION
21 from ldb import ERR_OPERATIONS_ERROR
22 from ldb import Message, MessageElement, Dn
23 from ldb import FLAG_MOD_REPLACE, FLAG_MOD_ADD
24 from samba.dcerpc import security, drsuapi, misc
25
26 from samba.auth import system_session
27 from samba import gensec, sd_utils
28 from samba.samdb import SamDB
29 from samba.credentials import Credentials, DONT_USE_KERBEROS
30 import samba.tests
31 from samba.tests import delete_force
32 from subunit.run import SubunitTestRunner
33 import unittest
34 import samba.dsdb
35
36 parser = optparse.OptionParser("acl.py [options] <host>")
37 sambaopts = options.SambaOptions(parser)
38 parser.add_option_group(sambaopts)
39 parser.add_option_group(options.VersionOptions(parser))
40
41 # use command line creds if available
42 credopts = options.CredentialsOptions(parser)
43 parser.add_option_group(credopts)
44 opts, args = parser.parse_args()
45
46 if len(args) < 1:
47     parser.print_usage()
48     sys.exit(1)
49
50 host = args[0]
51 if not "://" in host:
52     ldaphost = "ldap://%s" % host
53 else:
54     ldaphost = host
55     start = host.rindex("://")
56     host = host.lstrip(start+3)
57
58 lp = sambaopts.get_loadparm()
59 creds = credopts.get_credentials(lp)
60 creds.set_gensec_features(creds.get_gensec_features() | gensec.FEATURE_SEAL)
61
62 #
63 # Tests start here
64 #
65
66 class AclTests(samba.tests.TestCase):
67
68     def setUp(self):
69         super(AclTests, self).setUp()
70         self.ldb_admin = ldb
71         self.base_dn = ldb.domain_dn()
72         self.domain_sid = security.dom_sid(ldb.get_domain_sid())
73         self.user_pass = "samba123@"
74         self.configuration_dn = self.ldb_admin.get_config_basedn().get_linearized()
75         self.sd_utils = sd_utils.SDUtils(ldb)
76         #used for anonymous login
77         self.creds_tmp = Credentials()
78         self.creds_tmp.set_username("")
79         self.creds_tmp.set_password("")
80         self.creds_tmp.set_domain(creds.get_domain())
81         self.creds_tmp.set_realm(creds.get_realm())
82         self.creds_tmp.set_workstation(creds.get_workstation())
83         print "baseDN: %s" % self.base_dn
84
85     def get_user_dn(self, name):
86         return "CN=%s,CN=Users,%s" % (name, self.base_dn)
87
88     def get_ldb_connection(self, target_username, target_password):
89         creds_tmp = Credentials()
90         creds_tmp.set_username(target_username)
91         creds_tmp.set_password(target_password)
92         creds_tmp.set_domain(creds.get_domain())
93         creds_tmp.set_realm(creds.get_realm())
94         creds_tmp.set_workstation(creds.get_workstation())
95         creds_tmp.set_gensec_features(creds_tmp.get_gensec_features()
96                                       | gensec.FEATURE_SEAL)
97         creds_tmp.set_kerberos_state(DONT_USE_KERBEROS) # kinit is too expensive to use in a tight loop
98         ldb_target = SamDB(url=ldaphost, credentials=creds_tmp, lp=lp)
99         return ldb_target
100
101     # Test if we have any additional groups for users than default ones
102     def assert_user_no_group_member(self, username):
103         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s)" % self.get_user_dn(username))
104         try:
105             self.assertEqual(res[0]["memberOf"][0], "")
106         except KeyError:
107             pass
108         else:
109             self.fail()
110
111 #tests on ldap add operations
112 class AclAddTests(AclTests):
113
114     def setUp(self):
115         super(AclAddTests, self).setUp()
116         # Domain admin that will be creator of OU parent-child structure
117         self.usr_admin_owner = "acl_add_user1"
118         # Second domain admin that will not be creator of OU parent-child structure
119         self.usr_admin_not_owner = "acl_add_user2"
120         # Regular user
121         self.regular_user = "acl_add_user3"
122         self.test_user1 = "test_add_user1"
123         self.test_group1 = "test_add_group1"
124         self.ou1 = "OU=test_add_ou1"
125         self.ou2 = "OU=test_add_ou2,%s" % self.ou1
126         self.ldb_admin.newuser(self.usr_admin_owner, self.user_pass)
127         self.ldb_admin.newuser(self.usr_admin_not_owner, self.user_pass)
128         self.ldb_admin.newuser(self.regular_user, self.user_pass)
129
130         # add admins to the Domain Admins group
131         self.ldb_admin.add_remove_group_members("Domain Admins", [self.usr_admin_owner],
132                        add_members_operation=True)
133         self.ldb_admin.add_remove_group_members("Domain Admins", [self.usr_admin_not_owner],
134                        add_members_operation=True)
135
136         self.ldb_owner = self.get_ldb_connection(self.usr_admin_owner, self.user_pass)
137         self.ldb_notowner = self.get_ldb_connection(self.usr_admin_not_owner, self.user_pass)
138         self.ldb_user = self.get_ldb_connection(self.regular_user, self.user_pass)
139
140     def tearDown(self):
141         super(AclAddTests, self).tearDown()
142         delete_force(self.ldb_admin, "CN=%s,%s,%s" %
143                           (self.test_user1, self.ou2, self.base_dn))
144         delete_force(self.ldb_admin, "CN=%s,%s,%s" %
145                           (self.test_group1, self.ou2, self.base_dn))
146         delete_force(self.ldb_admin, "%s,%s" % (self.ou2, self.base_dn))
147         delete_force(self.ldb_admin, "%s,%s" % (self.ou1, self.base_dn))
148         delete_force(self.ldb_admin, self.get_user_dn(self.usr_admin_owner))
149         delete_force(self.ldb_admin, self.get_user_dn(self.usr_admin_not_owner))
150         delete_force(self.ldb_admin, self.get_user_dn(self.regular_user))
151         delete_force(self.ldb_admin, self.get_user_dn("test_add_anonymous"))
152
153     # Make sure top OU is deleted (and so everything under it)
154     def assert_top_ou_deleted(self):
155         res = self.ldb_admin.search(self.base_dn,
156             expression="(distinguishedName=%s,%s)" % (
157                 "OU=test_add_ou1", self.base_dn))
158         self.assertEqual(len(res), 0)
159
160     def test_add_u1(self):
161         """Testing OU with the rights of Doman Admin not creator of the OU """
162         self.assert_top_ou_deleted()
163         # Change descriptor for top level OU
164         self.ldb_owner.create_ou("OU=test_add_ou1," + self.base_dn)
165         self.ldb_owner.create_ou("OU=test_add_ou2,OU=test_add_ou1," + self.base_dn)
166         user_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.usr_admin_not_owner))
167         mod = "(D;CI;WPCC;;;%s)" % str(user_sid)
168         self.sd_utils.dacl_add_ace("OU=test_add_ou1," + self.base_dn, mod)
169         # Test user and group creation with another domain admin's credentials
170         self.ldb_notowner.newuser(self.test_user1, self.user_pass, userou=self.ou2)
171         self.ldb_notowner.newgroup("test_add_group1", groupou="OU=test_add_ou2,OU=test_add_ou1",
172                                    grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP)
173         # Make sure we HAVE created the two objects -- user and group
174         # !!! We should not be able to do that, but however beacuse of ACE ordering our inherited Deny ACE
175         # !!! comes after explicit (A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA) that comes from somewhere
176         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s,%s)" % ("CN=test_add_user1,OU=test_add_ou2,OU=test_add_ou1", self.base_dn))
177         self.assertTrue(len(res) > 0)
178         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s,%s)" % ("CN=test_add_group1,OU=test_add_ou2,OU=test_add_ou1", self.base_dn))
179         self.assertTrue(len(res) > 0)
180
181     def test_add_u2(self):
182         """Testing OU with the regular user that has no rights granted over the OU """
183         self.assert_top_ou_deleted()
184         # Create a parent-child OU structure with domain admin credentials
185         self.ldb_owner.create_ou("OU=test_add_ou1," + self.base_dn)
186         self.ldb_owner.create_ou("OU=test_add_ou2,OU=test_add_ou1," + self.base_dn)
187         # Test user and group creation with regular user credentials
188         try:
189             self.ldb_user.newuser(self.test_user1, self.user_pass, userou=self.ou2)
190             self.ldb_user.newgroup("test_add_group1", groupou="OU=test_add_ou2,OU=test_add_ou1",
191                                    grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP)
192         except LdbError, (num, _):
193             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
194         else:
195             self.fail()
196         # Make sure we HAVEN'T created any of two objects -- user or group
197         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s,%s)" % ("CN=test_add_user1,OU=test_add_ou2,OU=test_add_ou1", self.base_dn))
198         self.assertEqual(len(res), 0)
199         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s,%s)" % ("CN=test_add_group1,OU=test_add_ou2,OU=test_add_ou1", self.base_dn))
200         self.assertEqual(len(res), 0)
201
202     def test_add_u3(self):
203         """Testing OU with the rights of regular user granted the right 'Create User child objects' """
204         self.assert_top_ou_deleted()
205         # Change descriptor for top level OU
206         self.ldb_owner.create_ou("OU=test_add_ou1," + self.base_dn)
207         user_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.regular_user))
208         mod = "(OA;CI;CC;bf967aba-0de6-11d0-a285-00aa003049e2;;%s)" % str(user_sid)
209         self.sd_utils.dacl_add_ace("OU=test_add_ou1," + self.base_dn, mod)
210         self.ldb_owner.create_ou("OU=test_add_ou2,OU=test_add_ou1," + self.base_dn)
211         # Test user and group creation with granted user only to one of the objects
212         self.ldb_user.newuser(self.test_user1, self.user_pass, userou=self.ou2, setpassword=False)
213         try:
214             self.ldb_user.newgroup("test_add_group1", groupou="OU=test_add_ou2,OU=test_add_ou1",
215                                    grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP)
216         except LdbError, (num, _):
217             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
218         else:
219             self.fail()
220         # Make sure we HAVE created the one of two objects -- user
221         res = self.ldb_admin.search(self.base_dn,
222                 expression="(distinguishedName=%s,%s)" %
223                 ("CN=test_add_user1,OU=test_add_ou2,OU=test_add_ou1",
224                     self.base_dn))
225         self.assertNotEqual(len(res), 0)
226         res = self.ldb_admin.search(self.base_dn,
227                 expression="(distinguishedName=%s,%s)" %
228                 ("CN=test_add_group1,OU=test_add_ou2,OU=test_add_ou1",
229                     self.base_dn) )
230         self.assertEqual(len(res), 0)
231
232     def test_add_u4(self):
233         """ 4 Testing OU with the rights of Doman Admin creator of the OU"""
234         self.assert_top_ou_deleted()
235         self.ldb_owner.create_ou("OU=test_add_ou1," + self.base_dn)
236         self.ldb_owner.create_ou("OU=test_add_ou2,OU=test_add_ou1," + self.base_dn)
237         self.ldb_owner.newuser(self.test_user1, self.user_pass, userou=self.ou2)
238         self.ldb_owner.newgroup("test_add_group1", groupou="OU=test_add_ou2,OU=test_add_ou1",
239                                  grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP)
240         # Make sure we have successfully created the two objects -- user and group
241         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s,%s)" % ("CN=test_add_user1,OU=test_add_ou2,OU=test_add_ou1", self.base_dn))
242         self.assertTrue(len(res) > 0)
243         res = self.ldb_admin.search(self.base_dn,
244                 expression="(distinguishedName=%s,%s)" % ("CN=test_add_group1,OU=test_add_ou2,OU=test_add_ou1", self.base_dn))
245         self.assertTrue(len(res) > 0)
246
247     def test_add_anonymous(self):
248         """Test add operation with anonymous user"""
249         anonymous = SamDB(url=ldaphost, credentials=self.creds_tmp, lp=lp)
250         try:
251             anonymous.newuser("test_add_anonymous", self.user_pass)
252         except LdbError, (num, _):
253             self.assertEquals(num, ERR_OPERATIONS_ERROR)
254         else:
255             self.fail()
256
257 #tests on ldap modify operations
258 class AclModifyTests(AclTests):
259
260     def setUp(self):
261         super(AclModifyTests, self).setUp()
262         self.user_with_wp = "acl_mod_user1"
263         self.user_with_sm = "acl_mod_user2"
264         self.user_with_group_sm = "acl_mod_user3"
265         self.ldb_admin.newuser(self.user_with_wp, self.user_pass)
266         self.ldb_admin.newuser(self.user_with_sm, self.user_pass)
267         self.ldb_admin.newuser(self.user_with_group_sm, self.user_pass)
268         self.ldb_user = self.get_ldb_connection(self.user_with_wp, self.user_pass)
269         self.ldb_user2 = self.get_ldb_connection(self.user_with_sm, self.user_pass)
270         self.ldb_user3 = self.get_ldb_connection(self.user_with_group_sm, self.user_pass)
271         self.user_sid = self.sd_utils.get_object_sid( self.get_user_dn(self.user_with_wp))
272         self.ldb_admin.newgroup("test_modify_group2", grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP)
273         self.ldb_admin.newgroup("test_modify_group3", grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP)
274         self.ldb_admin.newuser("test_modify_user2", self.user_pass)
275
276     def tearDown(self):
277         super(AclModifyTests, self).tearDown()
278         delete_force(self.ldb_admin, self.get_user_dn("test_modify_user1"))
279         delete_force(self.ldb_admin, "CN=test_modify_group1,CN=Users," + self.base_dn)
280         delete_force(self.ldb_admin, "CN=test_modify_group2,CN=Users," + self.base_dn)
281         delete_force(self.ldb_admin, "CN=test_modify_group3,CN=Users," + self.base_dn)
282         delete_force(self.ldb_admin, "OU=test_modify_ou1," + self.base_dn)
283         delete_force(self.ldb_admin, self.get_user_dn(self.user_with_wp))
284         delete_force(self.ldb_admin, self.get_user_dn(self.user_with_sm))
285         delete_force(self.ldb_admin, self.get_user_dn(self.user_with_group_sm))
286         delete_force(self.ldb_admin, self.get_user_dn("test_modify_user2"))
287         delete_force(self.ldb_admin, self.get_user_dn("test_anonymous"))
288
289     def test_modify_u1(self):
290         """5 Modify one attribute if you have DS_WRITE_PROPERTY for it"""
291         mod = "(OA;;WP;bf967953-0de6-11d0-a285-00aa003049e2;;%s)" % str(self.user_sid)
292         # First test object -- User
293         print "Testing modify on User object"
294         self.ldb_admin.newuser("test_modify_user1", self.user_pass)
295         self.sd_utils.dacl_add_ace(self.get_user_dn("test_modify_user1"), mod)
296         ldif = """
297 dn: """ + self.get_user_dn("test_modify_user1") + """
298 changetype: modify
299 replace: displayName
300 displayName: test_changed"""
301         self.ldb_user.modify_ldif(ldif)
302         res = self.ldb_admin.search(self.base_dn,
303                 expression="(distinguishedName=%s)" % self.get_user_dn("test_modify_user1"))
304         self.assertEqual(res[0]["displayName"][0], "test_changed")
305         # Second test object -- Group
306         print "Testing modify on Group object"
307         self.ldb_admin.newgroup("test_modify_group1",
308                                 grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP)
309         self.sd_utils.dacl_add_ace("CN=test_modify_group1,CN=Users," + self.base_dn, mod)
310         ldif = """
311 dn: CN=test_modify_group1,CN=Users,""" + self.base_dn + """
312 changetype: modify
313 replace: displayName
314 displayName: test_changed"""
315         self.ldb_user.modify_ldif(ldif)
316         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s)" % str("CN=test_modify_group1,CN=Users," + self.base_dn))
317         self.assertEqual(res[0]["displayName"][0], "test_changed")
318         # Third test object -- Organizational Unit
319         print "Testing modify on OU object"
320         #delete_force(self.ldb_admin, "OU=test_modify_ou1," + self.base_dn)
321         self.ldb_admin.create_ou("OU=test_modify_ou1," + self.base_dn)
322         self.sd_utils.dacl_add_ace("OU=test_modify_ou1," + self.base_dn, mod)
323         ldif = """
324 dn: OU=test_modify_ou1,""" + self.base_dn + """
325 changetype: modify
326 replace: displayName
327 displayName: test_changed"""
328         self.ldb_user.modify_ldif(ldif)
329         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s)" % str("OU=test_modify_ou1," + self.base_dn))
330         self.assertEqual(res[0]["displayName"][0], "test_changed")
331
332     def test_modify_u2(self):
333         """6 Modify two attributes as you have DS_WRITE_PROPERTY granted only for one of them"""
334         mod = "(OA;;WP;bf967953-0de6-11d0-a285-00aa003049e2;;%s)" % str(self.user_sid)
335         # First test object -- User
336         print "Testing modify on User object"
337         #delete_force(self.ldb_admin, self.get_user_dn("test_modify_user1"))
338         self.ldb_admin.newuser("test_modify_user1", self.user_pass)
339         self.sd_utils.dacl_add_ace(self.get_user_dn("test_modify_user1"), mod)
340         # Modify on attribute you have rights for
341         ldif = """
342 dn: """ + self.get_user_dn("test_modify_user1") + """
343 changetype: modify
344 replace: displayName
345 displayName: test_changed"""
346         self.ldb_user.modify_ldif(ldif)
347         res = self.ldb_admin.search(self.base_dn,
348                 expression="(distinguishedName=%s)" %
349                 self.get_user_dn("test_modify_user1"))
350         self.assertEqual(res[0]["displayName"][0], "test_changed")
351         # Modify on attribute you do not have rights for granted
352         ldif = """
353 dn: """ + self.get_user_dn("test_modify_user1") + """
354 changetype: modify
355 replace: url
356 url: www.samba.org"""
357         try:
358             self.ldb_user.modify_ldif(ldif)
359         except LdbError, (num, _):
360             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
361         else:
362             # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
363             self.fail()
364         # Second test object -- Group
365         print "Testing modify on Group object"
366         self.ldb_admin.newgroup("test_modify_group1",
367                                 grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP)
368         self.sd_utils.dacl_add_ace("CN=test_modify_group1,CN=Users," + self.base_dn, mod)
369         ldif = """
370 dn: CN=test_modify_group1,CN=Users,""" + self.base_dn + """
371 changetype: modify
372 replace: displayName
373 displayName: test_changed"""
374         self.ldb_user.modify_ldif(ldif)
375         res = self.ldb_admin.search(self.base_dn,
376                 expression="(distinguishedName=%s)" %
377                 str("CN=test_modify_group1,CN=Users," + self.base_dn))
378         self.assertEqual(res[0]["displayName"][0], "test_changed")
379         # Modify on attribute you do not have rights for granted
380         ldif = """
381 dn: CN=test_modify_group1,CN=Users,""" + self.base_dn + """
382 changetype: modify
383 replace: url
384 url: www.samba.org"""
385         try:
386             self.ldb_user.modify_ldif(ldif)
387         except LdbError, (num, _):
388             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
389         else:
390             # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
391             self.fail()
392         # Second test object -- Organizational Unit
393         print "Testing modify on OU object"
394         self.ldb_admin.create_ou("OU=test_modify_ou1," + self.base_dn)
395         self.sd_utils.dacl_add_ace("OU=test_modify_ou1," + self.base_dn, mod)
396         ldif = """
397 dn: OU=test_modify_ou1,""" + self.base_dn + """
398 changetype: modify
399 replace: displayName
400 displayName: test_changed"""
401         self.ldb_user.modify_ldif(ldif)
402         res = self.ldb_admin.search(self.base_dn,
403                 expression="(distinguishedName=%s)" % str("OU=test_modify_ou1,"
404                     + self.base_dn))
405         self.assertEqual(res[0]["displayName"][0], "test_changed")
406         # Modify on attribute you do not have rights for granted
407         ldif = """
408 dn: OU=test_modify_ou1,""" + self.base_dn + """
409 changetype: modify
410 replace: url
411 url: www.samba.org"""
412         try:
413             self.ldb_user.modify_ldif(ldif)
414         except LdbError, (num, _):
415             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
416         else:
417             # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
418             self.fail()
419
420     def test_modify_u3(self):
421         """7 Modify one attribute as you have no what so ever rights granted"""
422         # First test object -- User
423         print "Testing modify on User object"
424         self.ldb_admin.newuser("test_modify_user1", self.user_pass)
425         # Modify on attribute you do not have rights for granted
426         ldif = """
427 dn: """ + self.get_user_dn("test_modify_user1") + """
428 changetype: modify
429 replace: url
430 url: www.samba.org"""
431         try:
432             self.ldb_user.modify_ldif(ldif)
433         except LdbError, (num, _):
434             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
435         else:
436             # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
437             self.fail()
438
439         # Second test object -- Group
440         print "Testing modify on Group object"
441         self.ldb_admin.newgroup("test_modify_group1",
442                                 grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP)
443         # Modify on attribute you do not have rights for granted
444         ldif = """
445 dn: CN=test_modify_group1,CN=Users,""" + self.base_dn + """
446 changetype: modify
447 replace: url
448 url: www.samba.org"""
449         try:
450             self.ldb_user.modify_ldif(ldif)
451         except LdbError, (num, _):
452             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
453         else:
454             # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
455             self.fail()
456
457         # Second test object -- Organizational Unit
458         print "Testing modify on OU object"
459         #delete_force(self.ldb_admin, "OU=test_modify_ou1," + self.base_dn)
460         self.ldb_admin.create_ou("OU=test_modify_ou1," + self.base_dn)
461         # Modify on attribute you do not have rights for granted
462         ldif = """
463 dn: OU=test_modify_ou1,""" + self.base_dn + """
464 changetype: modify
465 replace: url
466 url: www.samba.org"""
467         try:
468             self.ldb_user.modify_ldif(ldif)
469         except LdbError, (num, _):
470             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
471         else:
472             # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
473             self.fail()
474
475
476     def test_modify_u4(self):
477         """11 Grant WP to PRINCIPAL_SELF and test modify"""
478         ldif = """
479 dn: """ + self.get_user_dn(self.user_with_wp) + """
480 changetype: modify
481 add: adminDescription
482 adminDescription: blah blah blah"""
483         try:
484             self.ldb_user.modify_ldif(ldif)
485         except LdbError, (num, _):
486             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
487         else:
488             # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
489             self.fail()
490
491         mod = "(OA;;WP;bf967919-0de6-11d0-a285-00aa003049e2;;PS)"
492         self.sd_utils.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
493         # Modify on attribute you have rights for
494         self.ldb_user.modify_ldif(ldif)
495         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s)" \
496                                     % self.get_user_dn(self.user_with_wp), attrs=["adminDescription"] )
497         self.assertEqual(res[0]["adminDescription"][0], "blah blah blah")
498
499     def test_modify_u5(self):
500         """12 test self membership"""
501         ldif = """
502 dn: CN=test_modify_group2,CN=Users,""" + self.base_dn + """
503 changetype: modify
504 add: Member
505 Member: """ +  self.get_user_dn(self.user_with_sm)
506 #the user has no rights granted, this should fail
507         try:
508             self.ldb_user2.modify_ldif(ldif)
509         except LdbError, (num, _):
510             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
511         else:
512             # This 'modify' operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
513             self.fail()
514
515 #grant self-membership, should be able to add himself
516         user_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.user_with_sm))
517         mod = "(OA;;SW;bf9679c0-0de6-11d0-a285-00aa003049e2;;%s)" % str(user_sid)
518         self.sd_utils.dacl_add_ace("CN=test_modify_group2,CN=Users," + self.base_dn, mod)
519         self.ldb_user2.modify_ldif(ldif)
520         res = self.ldb_admin.search( self.base_dn, expression="(distinguishedName=%s)" \
521                                     % ("CN=test_modify_group2,CN=Users," + self.base_dn), attrs=["Member"])
522         self.assertEqual(res[0]["Member"][0], self.get_user_dn(self.user_with_sm))
523 #but not other users
524         ldif = """
525 dn: CN=test_modify_group2,CN=Users,""" + self.base_dn + """
526 changetype: modify
527 add: Member
528 Member: CN=test_modify_user2,CN=Users,""" + self.base_dn
529         try:
530             self.ldb_user2.modify_ldif(ldif)
531         except LdbError, (num, _):
532             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
533         else:
534             self.fail()
535
536     def test_modify_u6(self):
537         """13 test self membership"""
538         ldif = """
539 dn: CN=test_modify_group2,CN=Users,""" + self.base_dn + """
540 changetype: modify
541 add: Member
542 Member: """ +  self.get_user_dn(self.user_with_sm) + """
543 Member: CN=test_modify_user2,CN=Users,""" + self.base_dn
544
545 #grant self-membership, should be able to add himself  but not others at the same time
546         user_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.user_with_sm))
547         mod = "(OA;;SW;bf9679c0-0de6-11d0-a285-00aa003049e2;;%s)" % str(user_sid)
548         self.sd_utils.dacl_add_ace("CN=test_modify_group2,CN=Users," + self.base_dn, mod)
549         try:
550             self.ldb_user2.modify_ldif(ldif)
551         except LdbError, (num, _):
552             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
553         else:
554             self.fail()
555
556     def test_modify_u7(self):
557         """13 User with WP modifying Member"""
558 #a second user is given write property permission
559         user_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.user_with_wp))
560         mod = "(A;;WP;;;%s)" % str(user_sid)
561         self.sd_utils.dacl_add_ace("CN=test_modify_group2,CN=Users," + self.base_dn, mod)
562         ldif = """
563 dn: CN=test_modify_group2,CN=Users,""" + self.base_dn + """
564 changetype: modify
565 add: Member
566 Member: """ +  self.get_user_dn(self.user_with_wp)
567         self.ldb_user.modify_ldif(ldif)
568         res = self.ldb_admin.search( self.base_dn, expression="(distinguishedName=%s)" \
569                                     % ("CN=test_modify_group2,CN=Users," + self.base_dn), attrs=["Member"])
570         self.assertEqual(res[0]["Member"][0], self.get_user_dn(self.user_with_wp))
571         ldif = """
572 dn: CN=test_modify_group2,CN=Users,""" + self.base_dn + """
573 changetype: modify
574 delete: Member"""
575         self.ldb_user.modify_ldif(ldif)
576         ldif = """
577 dn: CN=test_modify_group2,CN=Users,""" + self.base_dn + """
578 changetype: modify
579 add: Member
580 Member: CN=test_modify_user2,CN=Users,""" + self.base_dn
581         self.ldb_user.modify_ldif(ldif)
582         res = self.ldb_admin.search( self.base_dn, expression="(distinguishedName=%s)" \
583                                     % ("CN=test_modify_group2,CN=Users," + self.base_dn), attrs=["Member"])
584         self.assertEqual(res[0]["Member"][0], "CN=test_modify_user2,CN=Users," + self.base_dn)
585
586     def test_modify_anonymous(self):
587         """Test add operation with anonymous user"""
588         anonymous = SamDB(url=ldaphost, credentials=self.creds_tmp, lp=lp)
589         self.ldb_admin.newuser("test_anonymous", "samba123@")
590         m = Message()
591         m.dn = Dn(anonymous, self.get_user_dn("test_anonymous"))
592
593         m["description"] = MessageElement("sambauser2",
594                                           FLAG_MOD_ADD,
595                                           "description")
596         try:
597             anonymous.modify(m)
598         except LdbError, (num, _):
599             self.assertEquals(num, ERR_OPERATIONS_ERROR)
600         else:
601             self.fail()
602
603 #enable these when we have search implemented
604 class AclSearchTests(AclTests):
605
606     def setUp(self):
607         super(AclSearchTests, self).setUp()
608         self.u1 = "search_u1"
609         self.u2 = "search_u2"
610         self.u3 = "search_u3"
611         self.group1 = "group1"
612         self.ldb_admin.newuser(self.u1, self.user_pass)
613         self.ldb_admin.newuser(self.u2, self.user_pass)
614         self.ldb_admin.newuser(self.u3, self.user_pass)
615         self.ldb_admin.newgroup(self.group1, grouptype=samba.dsdb.GTYPE_SECURITY_GLOBAL_GROUP)
616         self.ldb_admin.add_remove_group_members(self.group1, [self.u2],
617                                                 add_members_operation=True)
618         self.ldb_user = self.get_ldb_connection(self.u1, self.user_pass)
619         self.ldb_user2 = self.get_ldb_connection(self.u2, self.user_pass)
620         self.ldb_user3 = self.get_ldb_connection(self.u3, self.user_pass)
621         self.full_list = [Dn(self.ldb_admin,  "OU=ou2,OU=ou1," + self.base_dn),
622                           Dn(self.ldb_admin,  "OU=ou1," + self.base_dn),
623                           Dn(self.ldb_admin,  "OU=ou3,OU=ou2,OU=ou1," + self.base_dn),
624                           Dn(self.ldb_admin,  "OU=ou4,OU=ou2,OU=ou1," + self.base_dn),
625                           Dn(self.ldb_admin,  "OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn),
626                           Dn(self.ldb_admin,  "OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn)]
627         self.user_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.u1))
628         self.group_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.group1))
629
630     def create_clean_ou(self, object_dn):
631         """ Base repeating setup for unittests to follow """
632         res = self.ldb_admin.search(base=self.base_dn, scope=SCOPE_SUBTREE, \
633                 expression="distinguishedName=%s" % object_dn)
634         # Make sure top testing OU has been deleted before starting the test
635         self.assertEqual(len(res), 0)
636         self.ldb_admin.create_ou(object_dn)
637         desc_sddl = self.sd_utils.get_sd_as_sddl(object_dn)
638         # Make sure there are inheritable ACEs initially
639         self.assertTrue("CI" in desc_sddl or "OI" in desc_sddl)
640         # Find and remove all inherit ACEs
641         res = re.findall("\(.*?\)", desc_sddl)
642         res = [x for x in res if ("CI" in x) or ("OI" in x)]
643         for x in res:
644             desc_sddl = desc_sddl.replace(x, "")
645         # Add flag 'protected' in both DACL and SACL so no inherit ACEs
646         # can propagate from above
647         # remove SACL, we are not interested
648         desc_sddl = desc_sddl.replace(":AI", ":AIP")
649         self.sd_utils.modify_sd_on_dn(object_dn, desc_sddl)
650         # Verify all inheritable ACEs are gone
651         desc_sddl = self.sd_utils.get_sd_as_sddl(object_dn)
652         self.assertFalse("CI" in desc_sddl)
653         self.assertFalse("OI" in desc_sddl)
654
655     def tearDown(self):
656         super(AclSearchTests, self).tearDown()
657         delete_force(self.ldb_admin, "OU=test_search_ou2,OU=test_search_ou1," + self.base_dn)
658         delete_force(self.ldb_admin, "OU=test_search_ou1," + self.base_dn)
659         delete_force(self.ldb_admin, "OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn)
660         delete_force(self.ldb_admin, "OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn)
661         delete_force(self.ldb_admin, "OU=ou4,OU=ou2,OU=ou1," + self.base_dn)
662         delete_force(self.ldb_admin, "OU=ou3,OU=ou2,OU=ou1," + self.base_dn)
663         delete_force(self.ldb_admin, "OU=ou2,OU=ou1," + self.base_dn)
664         delete_force(self.ldb_admin, "OU=ou1," + self.base_dn)
665         delete_force(self.ldb_admin, self.get_user_dn("search_u1"))
666         delete_force(self.ldb_admin, self.get_user_dn("search_u2"))
667         delete_force(self.ldb_admin, self.get_user_dn("search_u3"))
668         delete_force(self.ldb_admin, self.get_user_dn("group1"))
669
670     def test_search_anonymous1(self):
671         """Verify access of rootDSE with the correct request"""
672         anonymous = SamDB(url=ldaphost, credentials=self.creds_tmp, lp=lp)
673         res = anonymous.search("", expression="(objectClass=*)", scope=SCOPE_BASE)
674         self.assertEquals(len(res), 1)
675         #verify some of the attributes
676         #dont care about values
677         self.assertTrue("ldapServiceName" in res[0])
678         self.assertTrue("namingContexts" in res[0])
679         self.assertTrue("isSynchronized" in res[0])
680         self.assertTrue("dsServiceName" in res[0])
681         self.assertTrue("supportedSASLMechanisms" in res[0])
682         self.assertTrue("isGlobalCatalogReady" in res[0])
683         self.assertTrue("domainControllerFunctionality" in res[0])
684         self.assertTrue("serverName" in res[0])
685
686     def test_search_anonymous2(self):
687         """Make sure we cannot access anything else"""
688         anonymous = SamDB(url=ldaphost, credentials=self.creds_tmp, lp=lp)
689         try:
690             res = anonymous.search("", expression="(objectClass=*)", scope=SCOPE_SUBTREE)
691         except LdbError, (num, _):
692             self.assertEquals(num, ERR_OPERATIONS_ERROR)
693         else:
694             self.fail()
695         try:
696             res = anonymous.search(self.base_dn, expression="(objectClass=*)", scope=SCOPE_SUBTREE)
697         except LdbError, (num, _):
698             self.assertEquals(num, ERR_OPERATIONS_ERROR)
699         else:
700             self.fail()
701         try:
702             res = anonymous.search(anonymous.get_config_basedn(), expression="(objectClass=*)",
703                                         scope=SCOPE_SUBTREE)
704         except LdbError, (num, _):
705             self.assertEquals(num, ERR_OPERATIONS_ERROR)
706         else:
707             self.fail()
708
709     def test_search_anonymous3(self):
710         """Set dsHeuristics and repeat"""
711         self.ldb_admin.set_dsheuristics("0000002")
712         self.ldb_admin.create_ou("OU=test_search_ou1," + self.base_dn)
713         mod = "(A;CI;LC;;;AN)"
714         self.sd_utils.dacl_add_ace("OU=test_search_ou1," + self.base_dn, mod)
715         self.ldb_admin.create_ou("OU=test_search_ou2,OU=test_search_ou1," + self.base_dn)
716         anonymous = SamDB(url=ldaphost, credentials=self.creds_tmp, lp=lp)
717         res = anonymous.search("OU=test_search_ou2,OU=test_search_ou1," + self.base_dn,
718                                expression="(objectClass=*)", scope=SCOPE_SUBTREE)
719         self.assertEquals(len(res), 1)
720         self.assertTrue("dn" in res[0])
721         self.assertTrue(res[0]["dn"] == Dn(self.ldb_admin,
722                                            "OU=test_search_ou2,OU=test_search_ou1," + self.base_dn))
723         res = anonymous.search(anonymous.get_config_basedn(), expression="(objectClass=*)",
724                                scope=SCOPE_SUBTREE)
725         self.assertEquals(len(res), 1)
726         self.assertTrue("dn" in res[0])
727         self.assertTrue(res[0]["dn"] == Dn(self.ldb_admin, self.configuration_dn))
728
729     def test_search1(self):
730         """Make sure users can see us if given LC to user and group"""
731         self.create_clean_ou("OU=ou1," + self.base_dn)
732         mod = "(A;;LC;;;%s)(A;;LC;;;%s)" % (str(self.user_sid), str(self.group_sid))
733         self.sd_utils.dacl_add_ace("OU=ou1," + self.base_dn, mod)
734         tmp_desc = security.descriptor.from_sddl("D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" + mod,
735                                                  self.domain_sid)
736         self.ldb_admin.create_ou("OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
737         self.ldb_admin.create_ou("OU=ou3,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
738         self.ldb_admin.create_ou("OU=ou4,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
739         self.ldb_admin.create_ou("OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
740         self.ldb_admin.create_ou("OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
741
742         #regular users must see only ou1 and ou2
743         res = self.ldb_user3.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
744                                     scope=SCOPE_SUBTREE)
745         self.assertEquals(len(res), 2)
746         ok_list = [Dn(self.ldb_admin,  "OU=ou2,OU=ou1," + self.base_dn),
747                    Dn(self.ldb_admin,  "OU=ou1," + self.base_dn)]
748
749         res_list = [ x["dn"] for x in res if x["dn"] in ok_list ]
750         self.assertEquals(sorted(res_list), sorted(ok_list))
751
752         #these users should see all ous
753         res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
754                                     scope=SCOPE_SUBTREE)
755         self.assertEquals(len(res), 6)
756         res_list = [ x["dn"] for x in res if x["dn"] in self.full_list ]
757         self.assertEquals(sorted(res_list), sorted(self.full_list))
758
759         res = self.ldb_user2.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
760                                     scope=SCOPE_SUBTREE)
761         self.assertEquals(len(res), 6)
762         res_list = [ x["dn"] for x in res if x["dn"] in self.full_list ]
763         self.assertEquals(sorted(res_list), sorted(self.full_list))
764
765     def test_search2(self):
766         """Make sure users can't see us if access is explicitly denied"""
767         self.create_clean_ou("OU=ou1," + self.base_dn)
768         self.ldb_admin.create_ou("OU=ou2,OU=ou1," + self.base_dn)
769         self.ldb_admin.create_ou("OU=ou3,OU=ou2,OU=ou1," + self.base_dn)
770         self.ldb_admin.create_ou("OU=ou4,OU=ou2,OU=ou1," + self.base_dn)
771         self.ldb_admin.create_ou("OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn)
772         self.ldb_admin.create_ou("OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn)
773         mod = "(D;;LC;;;%s)(D;;LC;;;%s)" % (str(self.user_sid), str(self.group_sid)) 
774         self.sd_utils.dacl_add_ace("OU=ou2,OU=ou1," + self.base_dn, mod)
775         res = self.ldb_user3.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
776                                     scope=SCOPE_SUBTREE)
777         #this user should see all ous
778         res_list = [ x["dn"] for x in res if x["dn"] in self.full_list ]
779         self.assertEquals(sorted(res_list), sorted(self.full_list))
780
781         #these users should see ou1, 2, 5 and 6 but not 3 and 4
782         res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
783                                     scope=SCOPE_SUBTREE)
784         ok_list = [Dn(self.ldb_admin,  "OU=ou2,OU=ou1," + self.base_dn),
785                    Dn(self.ldb_admin,  "OU=ou1," + self.base_dn),
786                    Dn(self.ldb_admin,  "OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn),
787                    Dn(self.ldb_admin,  "OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn)]
788         res_list = [ x["dn"] for x in res if x["dn"] in ok_list ]
789         self.assertEquals(sorted(res_list), sorted(ok_list))
790
791         res = self.ldb_user2.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
792                                     scope=SCOPE_SUBTREE)
793         self.assertEquals(len(res), 4)
794         res_list = [ x["dn"] for x in res if x["dn"] in ok_list ]
795         self.assertEquals(sorted(res_list), sorted(ok_list))
796
797     def test_search3(self):
798         """Make sure users can't see ous if access is explicitly denied - 2"""
799         self.create_clean_ou("OU=ou1," + self.base_dn)
800         mod = "(A;CI;LC;;;%s)(A;CI;LC;;;%s)" % (str(self.user_sid), str(self.group_sid))
801         self.sd_utils.dacl_add_ace("OU=ou1," + self.base_dn, mod)
802         tmp_desc = security.descriptor.from_sddl("D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" + mod,
803                                                  self.domain_sid)
804         self.ldb_admin.create_ou("OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
805         self.ldb_admin.create_ou("OU=ou3,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
806         self.ldb_admin.create_ou("OU=ou4,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
807         self.ldb_admin.create_ou("OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
808         self.ldb_admin.create_ou("OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
809
810         print "Testing correct behavior on nonaccessible search base"
811         try:
812              self.ldb_user3.search("OU=ou3,OU=ou2,OU=ou1," + self.base_dn, expression="(objectClass=*)",
813                                    scope=SCOPE_BASE)
814         except LdbError, (num, _):
815             self.assertEquals(num, ERR_NO_SUCH_OBJECT)
816         else:
817             self.fail()
818
819         mod = "(D;;LC;;;%s)(D;;LC;;;%s)" % (str(self.user_sid), str(self.group_sid))
820         self.sd_utils.dacl_add_ace("OU=ou2,OU=ou1," + self.base_dn, mod)
821
822         ok_list = [Dn(self.ldb_admin,  "OU=ou2,OU=ou1," + self.base_dn),
823                    Dn(self.ldb_admin,  "OU=ou1," + self.base_dn)]
824
825         res = self.ldb_user3.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
826                                     scope=SCOPE_SUBTREE)
827         res_list = [ x["dn"] for x in res if x["dn"] in ok_list ]
828         self.assertEquals(sorted(res_list), sorted(ok_list))
829
830         ok_list = [Dn(self.ldb_admin,  "OU=ou2,OU=ou1," + self.base_dn),
831                    Dn(self.ldb_admin,  "OU=ou1," + self.base_dn),
832                    Dn(self.ldb_admin,  "OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn),
833                    Dn(self.ldb_admin,  "OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn)]
834
835         #should not see ou3 and ou4, but should see ou5 and ou6
836         res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
837                                     scope=SCOPE_SUBTREE)
838         self.assertEquals(len(res), 4)
839         res_list = [ x["dn"] for x in res if x["dn"] in ok_list ]
840         self.assertEquals(sorted(res_list), sorted(ok_list))
841
842         res = self.ldb_user2.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
843                                     scope=SCOPE_SUBTREE)
844         self.assertEquals(len(res), 4)
845         res_list = [ x["dn"] for x in res if x["dn"] in ok_list ]
846         self.assertEquals(sorted(res_list), sorted(ok_list))
847
848     def test_search4(self):
849         """There is no difference in visibility if the user is also creator"""
850         self.create_clean_ou("OU=ou1," + self.base_dn)
851         mod = "(A;CI;CC;;;%s)" % (str(self.user_sid))
852         self.sd_utils.dacl_add_ace("OU=ou1," + self.base_dn, mod)
853         tmp_desc = security.descriptor.from_sddl("D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" + mod,
854                                                  self.domain_sid)
855         self.ldb_user.create_ou("OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
856         self.ldb_user.create_ou("OU=ou3,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
857         self.ldb_user.create_ou("OU=ou4,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
858         self.ldb_user.create_ou("OU=ou5,OU=ou3,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
859         self.ldb_user.create_ou("OU=ou6,OU=ou4,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
860
861         ok_list = [Dn(self.ldb_admin,  "OU=ou2,OU=ou1," + self.base_dn),
862                    Dn(self.ldb_admin,  "OU=ou1," + self.base_dn)]
863         res = self.ldb_user3.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
864                                     scope=SCOPE_SUBTREE)
865         self.assertEquals(len(res), 2)
866         res_list = [ x["dn"] for x in res if x["dn"] in ok_list ]
867         self.assertEquals(sorted(res_list), sorted(ok_list))
868
869         res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(objectClass=*)",
870                                     scope=SCOPE_SUBTREE)
871         self.assertEquals(len(res), 2)
872         res_list = [ x["dn"] for x in res if x["dn"] in ok_list ]
873         self.assertEquals(sorted(res_list), sorted(ok_list))
874
875     def test_search5(self):
876         """Make sure users can see only attributes they are allowed to see"""
877         self.create_clean_ou("OU=ou1," + self.base_dn)
878         mod = "(A;CI;LC;;;%s)" % (str(self.user_sid))
879         self.sd_utils.dacl_add_ace("OU=ou1," + self.base_dn, mod)
880         tmp_desc = security.descriptor.from_sddl("D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" + mod,
881                                                  self.domain_sid)
882         self.ldb_admin.create_ou("OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
883         # assert user can only see dn
884         res = self.ldb_user.search("OU=ou2,OU=ou1," + self.base_dn, expression="(objectClass=*)",
885                                     scope=SCOPE_SUBTREE)
886         ok_list = ['dn']
887         self.assertEquals(len(res), 1)
888         res_list = res[0].keys()
889         self.assertEquals(res_list, ok_list)
890
891         res = self.ldb_user.search("OU=ou2,OU=ou1," + self.base_dn, expression="(objectClass=*)",
892                                     scope=SCOPE_BASE, attrs=["ou"])
893
894         self.assertEquals(len(res), 1)
895         res_list = res[0].keys()
896         self.assertEquals(res_list, ok_list)
897
898         #give read property on ou and assert user can only see dn and ou
899         mod = "(OA;;RP;bf9679f0-0de6-11d0-a285-00aa003049e2;;%s)" % (str(self.user_sid))
900         self.sd_utils.dacl_add_ace("OU=ou1," + self.base_dn, mod)
901         self.sd_utils.dacl_add_ace("OU=ou2,OU=ou1," + self.base_dn, mod)
902         res = self.ldb_user.search("OU=ou2,OU=ou1," + self.base_dn, expression="(objectClass=*)",
903                                     scope=SCOPE_SUBTREE)
904         ok_list = ['dn', 'ou']
905         self.assertEquals(len(res), 1)
906         res_list = res[0].keys()
907         self.assertEquals(sorted(res_list), sorted(ok_list))
908
909         #give read property on Public Information and assert user can see ou and other members
910         mod = "(OA;;RP;e48d0154-bcf8-11d1-8702-00c04fb96050;;%s)" % (str(self.user_sid))
911         self.sd_utils.dacl_add_ace("OU=ou1," + self.base_dn, mod)
912         self.sd_utils.dacl_add_ace("OU=ou2,OU=ou1," + self.base_dn, mod)
913         res = self.ldb_user.search("OU=ou2,OU=ou1," + self.base_dn, expression="(objectClass=*)",
914                                     scope=SCOPE_SUBTREE)
915
916         ok_list = ['dn', 'objectClass', 'ou', 'distinguishedName', 'name', 'objectGUID', 'objectCategory']
917         res_list = res[0].keys()
918         self.assertEquals(sorted(res_list), sorted(ok_list))
919
920     def test_search6(self):
921         """If an attribute that cannot be read is used in a filter, it is as if the attribute does not exist"""
922         self.create_clean_ou("OU=ou1," + self.base_dn)
923         mod = "(A;CI;LCCC;;;%s)" % (str(self.user_sid))
924         self.sd_utils.dacl_add_ace("OU=ou1," + self.base_dn, mod)
925         tmp_desc = security.descriptor.from_sddl("D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)" + mod,
926                                                  self.domain_sid)
927         self.ldb_admin.create_ou("OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
928         self.ldb_user.create_ou("OU=ou3,OU=ou2,OU=ou1," + self.base_dn, sd=tmp_desc)
929
930         res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(ou=ou3)",
931                                     scope=SCOPE_SUBTREE)
932         #nothing should be returned as ou is not accessible
933         self.assertEquals(len(res), 0)
934
935         #give read property on ou and assert user can only see dn and ou
936         mod = "(OA;;RP;bf9679f0-0de6-11d0-a285-00aa003049e2;;%s)" % (str(self.user_sid))
937         self.sd_utils.dacl_add_ace("OU=ou3,OU=ou2,OU=ou1," + self.base_dn, mod)
938         res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(ou=ou3)",
939                                     scope=SCOPE_SUBTREE)
940         self.assertEquals(len(res), 1)
941         ok_list = ['dn', 'ou']
942         res_list = res[0].keys()
943         self.assertEquals(sorted(res_list), sorted(ok_list))
944
945         #give read property on Public Information and assert user can see ou and other members
946         mod = "(OA;;RP;e48d0154-bcf8-11d1-8702-00c04fb96050;;%s)" % (str(self.user_sid))
947         self.sd_utils.dacl_add_ace("OU=ou2,OU=ou1," + self.base_dn, mod)
948         res = self.ldb_user.search("OU=ou1," + self.base_dn, expression="(ou=ou2)",
949                                    scope=SCOPE_SUBTREE)
950         self.assertEquals(len(res), 1)
951         ok_list = ['dn', 'objectClass', 'ou', 'distinguishedName', 'name', 'objectGUID', 'objectCategory']
952         res_list = res[0].keys()
953         self.assertEquals(sorted(res_list), sorted(ok_list))
954
955 #tests on ldap delete operations
956 class AclDeleteTests(AclTests):
957
958     def setUp(self):
959         super(AclDeleteTests, self).setUp()
960         self.regular_user = "acl_delete_user1"
961         # Create regular user
962         self.ldb_admin.newuser(self.regular_user, self.user_pass)
963         self.ldb_user = self.get_ldb_connection(self.regular_user, self.user_pass)
964
965     def tearDown(self):
966         super(AclDeleteTests, self).tearDown()
967         delete_force(self.ldb_admin, self.get_user_dn("test_delete_user1"))
968         delete_force(self.ldb_admin, self.get_user_dn(self.regular_user))
969         delete_force(self.ldb_admin, self.get_user_dn("test_anonymous"))
970
971     def test_delete_u1(self):
972         """User is prohibited by default to delete another User object"""
973         # Create user that we try to delete
974         self.ldb_admin.newuser("test_delete_user1", self.user_pass)
975         # Here delete User object should ALWAYS through exception
976         try:
977             self.ldb_user.delete(self.get_user_dn("test_delete_user1"))
978         except LdbError, (num, _):
979             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
980         else:
981             self.fail()
982
983     def test_delete_u2(self):
984         """User's group has RIGHT_DELETE to another User object"""
985         user_dn = self.get_user_dn("test_delete_user1")
986         # Create user that we try to delete
987         self.ldb_admin.newuser("test_delete_user1", self.user_pass)
988         mod = "(A;;SD;;;AU)"
989         self.sd_utils.dacl_add_ace(user_dn, mod)
990         # Try to delete User object
991         self.ldb_user.delete(user_dn)
992         res = self.ldb_admin.search(self.base_dn,
993                 expression="(distinguishedName=%s)" % user_dn)
994         self.assertEqual(len(res), 0)
995
996     def test_delete_u3(self):
997         """User indentified by SID has RIGHT_DELETE to another User object"""
998         user_dn = self.get_user_dn("test_delete_user1")
999         # Create user that we try to delete
1000         self.ldb_admin.newuser("test_delete_user1", self.user_pass)
1001         mod = "(A;;SD;;;%s)" % self.sd_utils.get_object_sid(self.get_user_dn(self.regular_user))
1002         self.sd_utils.dacl_add_ace(user_dn, mod)
1003         # Try to delete User object
1004         self.ldb_user.delete(user_dn)
1005         res = self.ldb_admin.search(self.base_dn,
1006                 expression="(distinguishedName=%s)" % user_dn)
1007         self.assertEqual(len(res), 0)
1008
1009     def test_delete_anonymous(self):
1010         """Test add operation with anonymous user"""
1011         anonymous = SamDB(url=ldaphost, credentials=self.creds_tmp, lp=lp)
1012         self.ldb_admin.newuser("test_anonymous", "samba123@")
1013
1014         try:
1015             anonymous.delete(self.get_user_dn("test_anonymous"))
1016         except LdbError, (num, _):
1017             self.assertEquals(num, ERR_OPERATIONS_ERROR)
1018         else:
1019             self.fail()
1020
1021 #tests on ldap rename operations
1022 class AclRenameTests(AclTests):
1023
1024     def setUp(self):
1025         super(AclRenameTests, self).setUp()
1026         self.regular_user = "acl_rename_user1"
1027         self.ou1 = "OU=test_rename_ou1"
1028         self.ou2 = "OU=test_rename_ou2"
1029         self.ou3 = "OU=test_rename_ou3,%s" % self.ou2
1030         self.testuser1 = "test_rename_user1"
1031         self.testuser2 = "test_rename_user2"
1032         self.testuser3 = "test_rename_user3"
1033         self.testuser4 = "test_rename_user4"
1034         self.testuser5 = "test_rename_user5"
1035         # Create regular user
1036         self.ldb_admin.newuser(self.regular_user, self.user_pass)
1037         self.ldb_user = self.get_ldb_connection(self.regular_user, self.user_pass)
1038
1039     def tearDown(self):
1040         super(AclRenameTests, self).tearDown()
1041         # Rename OU3
1042         delete_force(self.ldb_admin, "CN=%s,%s,%s" % (self.testuser1, self.ou3, self.base_dn))
1043         delete_force(self.ldb_admin, "CN=%s,%s,%s" % (self.testuser2, self.ou3, self.base_dn))
1044         delete_force(self.ldb_admin, "CN=%s,%s,%s" % (self.testuser5, self.ou3, self.base_dn))
1045         delete_force(self.ldb_admin, "%s,%s" % (self.ou3, self.base_dn))
1046         # Rename OU2
1047         delete_force(self.ldb_admin, "CN=%s,%s,%s" % (self.testuser1, self.ou2, self.base_dn))
1048         delete_force(self.ldb_admin, "CN=%s,%s,%s" % (self.testuser2, self.ou2, self.base_dn))
1049         delete_force(self.ldb_admin, "CN=%s,%s,%s" % (self.testuser5, self.ou2, self.base_dn))
1050         delete_force(self.ldb_admin, "%s,%s" % (self.ou2, self.base_dn))
1051         # Rename OU1
1052         delete_force(self.ldb_admin, "CN=%s,%s,%s" % (self.testuser1, self.ou1, self.base_dn))
1053         delete_force(self.ldb_admin, "CN=%s,%s,%s" % (self.testuser2, self.ou1, self.base_dn))
1054         delete_force(self.ldb_admin, "CN=%s,%s,%s" % (self.testuser5, self.ou1, self.base_dn))
1055         delete_force(self.ldb_admin, "OU=test_rename_ou3,%s,%s" % (self.ou1, self.base_dn))
1056         delete_force(self.ldb_admin, "%s,%s" % (self.ou1, self.base_dn))
1057         delete_force(self.ldb_admin, self.get_user_dn(self.regular_user))
1058
1059     def test_rename_u1(self):
1060         """Regular user fails to rename 'User object' within single OU"""
1061         # Create OU structure
1062         self.ldb_admin.create_ou("OU=test_rename_ou1," + self.base_dn)
1063         self.ldb_admin.newuser(self.testuser1, self.user_pass, userou=self.ou1)
1064         try:
1065             self.ldb_user.rename("CN=%s,%s,%s" % (self.testuser1, self.ou1, self.base_dn), \
1066                                      "CN=%s,%s,%s" % (self.testuser5, self.ou1, self.base_dn))
1067         except LdbError, (num, _):
1068             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1069         else:
1070             self.fail()
1071
1072     def test_rename_u2(self):
1073         """Grant WRITE_PROPERTY to AU so regular user can rename 'User object' within single OU"""
1074         ou_dn = "OU=test_rename_ou1," + self.base_dn
1075         user_dn = "CN=test_rename_user1," + ou_dn
1076         rename_user_dn = "CN=test_rename_user5," + ou_dn
1077         # Create OU structure
1078         self.ldb_admin.create_ou(ou_dn)
1079         self.ldb_admin.newuser(self.testuser1, self.user_pass, userou=self.ou1)
1080         mod = "(A;;WP;;;AU)"
1081         self.sd_utils.dacl_add_ace(user_dn, mod)
1082         # Rename 'User object' having WP to AU
1083         self.ldb_user.rename(user_dn, rename_user_dn)
1084         res = self.ldb_admin.search(self.base_dn,
1085                 expression="(distinguishedName=%s)" % user_dn)
1086         self.assertEqual(len(res), 0)
1087         res = self.ldb_admin.search(self.base_dn,
1088                 expression="(distinguishedName=%s)" % rename_user_dn)
1089         self.assertNotEqual(len(res), 0)
1090
1091     def test_rename_u3(self):
1092         """Test rename with rights granted to 'User object' SID"""
1093         ou_dn = "OU=test_rename_ou1," + self.base_dn
1094         user_dn = "CN=test_rename_user1," + ou_dn
1095         rename_user_dn = "CN=test_rename_user5," + ou_dn
1096         # Create OU structure
1097         self.ldb_admin.create_ou(ou_dn)
1098         self.ldb_admin.newuser(self.testuser1, self.user_pass, userou=self.ou1)
1099         sid = self.sd_utils.get_object_sid(self.get_user_dn(self.regular_user))
1100         mod = "(A;;WP;;;%s)" % str(sid)
1101         self.sd_utils.dacl_add_ace(user_dn, mod)
1102         # Rename 'User object' having WP to AU
1103         self.ldb_user.rename(user_dn, rename_user_dn)
1104         res = self.ldb_admin.search(self.base_dn,
1105                 expression="(distinguishedName=%s)" % user_dn)
1106         self.assertEqual(len(res), 0)
1107         res = self.ldb_admin.search(self.base_dn,
1108                 expression="(distinguishedName=%s)" % rename_user_dn)
1109         self.assertNotEqual(len(res), 0)
1110
1111     def test_rename_u4(self):
1112         """Rename 'User object' cross OU with WP, SD and CC right granted on reg. user to AU"""
1113         ou1_dn = "OU=test_rename_ou1," + self.base_dn
1114         ou2_dn = "OU=test_rename_ou2," + self.base_dn
1115         user_dn = "CN=test_rename_user2," + ou1_dn
1116         rename_user_dn = "CN=test_rename_user5," + ou2_dn
1117         # Create OU structure
1118         self.ldb_admin.create_ou(ou1_dn)
1119         self.ldb_admin.create_ou(ou2_dn)
1120         self.ldb_admin.newuser(self.testuser2, self.user_pass, userou=self.ou1)
1121         mod = "(A;;WPSD;;;AU)"
1122         self.sd_utils.dacl_add_ace(user_dn, mod)
1123         mod = "(A;;CC;;;AU)"
1124         self.sd_utils.dacl_add_ace(ou2_dn, mod)
1125         # Rename 'User object' having SD and CC to AU
1126         self.ldb_user.rename(user_dn, rename_user_dn)
1127         res = self.ldb_admin.search(self.base_dn,
1128                 expression="(distinguishedName=%s)" % user_dn)
1129         self.assertEqual(len(res), 0)
1130         res = self.ldb_admin.search(self.base_dn,
1131                 expression="(distinguishedName=%s)" % rename_user_dn)
1132         self.assertNotEqual(len(res), 0)
1133
1134     def test_rename_u5(self):
1135         """Test rename with rights granted to 'User object' SID"""
1136         ou1_dn = "OU=test_rename_ou1," + self.base_dn
1137         ou2_dn = "OU=test_rename_ou2," + self.base_dn
1138         user_dn = "CN=test_rename_user2," + ou1_dn
1139         rename_user_dn = "CN=test_rename_user5," + ou2_dn
1140         # Create OU structure
1141         self.ldb_admin.create_ou(ou1_dn)
1142         self.ldb_admin.create_ou(ou2_dn)
1143         self.ldb_admin.newuser(self.testuser2, self.user_pass, userou=self.ou1)
1144         sid = self.sd_utils.get_object_sid(self.get_user_dn(self.regular_user))
1145         mod = "(A;;WPSD;;;%s)" % str(sid)
1146         self.sd_utils.dacl_add_ace(user_dn, mod)
1147         mod = "(A;;CC;;;%s)" % str(sid)
1148         self.sd_utils.dacl_add_ace(ou2_dn, mod)
1149         # Rename 'User object' having SD and CC to AU
1150         self.ldb_user.rename(user_dn, rename_user_dn)
1151         res = self.ldb_admin.search(self.base_dn,
1152                 expression="(distinguishedName=%s)" % user_dn)
1153         self.assertEqual(len(res), 0)
1154         res = self.ldb_admin.search(self.base_dn,
1155                 expression="(distinguishedName=%s)" % rename_user_dn)
1156         self.assertNotEqual(len(res), 0)
1157
1158     def test_rename_u6(self):
1159         """Rename 'User object' cross OU with WP, DC and CC right granted on OU & user to AU"""
1160         ou1_dn = "OU=test_rename_ou1," + self.base_dn
1161         ou2_dn = "OU=test_rename_ou2," + self.base_dn
1162         user_dn = "CN=test_rename_user2," + ou1_dn
1163         rename_user_dn = "CN=test_rename_user2," + ou2_dn
1164         # Create OU structure
1165         self.ldb_admin.create_ou(ou1_dn)
1166         self.ldb_admin.create_ou(ou2_dn)
1167         #mod = "(A;CI;DCWP;;;AU)"
1168         mod = "(A;;DC;;;AU)"
1169         self.sd_utils.dacl_add_ace(ou1_dn, mod)
1170         mod = "(A;;CC;;;AU)"
1171         self.sd_utils.dacl_add_ace(ou2_dn, mod)
1172         self.ldb_admin.newuser(self.testuser2, self.user_pass, userou=self.ou1)
1173         mod = "(A;;WP;;;AU)"
1174         self.sd_utils.dacl_add_ace(user_dn, mod)
1175         # Rename 'User object' having SD and CC to AU
1176         self.ldb_user.rename(user_dn, rename_user_dn)
1177         res = self.ldb_admin.search(self.base_dn,
1178                 expression="(distinguishedName=%s)" % user_dn)
1179         self.assertEqual(len(res), 0)
1180         res = self.ldb_admin.search(self.base_dn,
1181                 expression="(distinguishedName=%s)" % rename_user_dn)
1182         self.assertNotEqual(len(res), 0)
1183
1184     def test_rename_u7(self):
1185         """Rename 'User object' cross OU (second level) with WP, DC and CC right granted on OU to AU"""
1186         ou1_dn = "OU=test_rename_ou1," + self.base_dn
1187         ou2_dn = "OU=test_rename_ou2," + self.base_dn
1188         ou3_dn = "OU=test_rename_ou3," + ou2_dn
1189         user_dn = "CN=test_rename_user2," + ou1_dn
1190         rename_user_dn = "CN=test_rename_user5," + ou3_dn
1191         # Create OU structure
1192         self.ldb_admin.create_ou(ou1_dn)
1193         self.ldb_admin.create_ou(ou2_dn)
1194         self.ldb_admin.create_ou(ou3_dn)
1195         mod = "(A;CI;WPDC;;;AU)"
1196         self.sd_utils.dacl_add_ace(ou1_dn, mod)
1197         mod = "(A;;CC;;;AU)"
1198         self.sd_utils.dacl_add_ace(ou3_dn, mod)
1199         self.ldb_admin.newuser(self.testuser2, self.user_pass, userou=self.ou1)
1200         # Rename 'User object' having SD and CC to AU
1201         self.ldb_user.rename(user_dn, rename_user_dn)
1202         res = self.ldb_admin.search(self.base_dn,
1203                 expression="(distinguishedName=%s)" % user_dn)
1204         self.assertEqual(len(res), 0)
1205         res = self.ldb_admin.search(self.base_dn,
1206                 expression="(distinguishedName=%s)" % rename_user_dn)
1207         self.assertNotEqual(len(res), 0)
1208
1209     def test_rename_u8(self):
1210         """Test rename on an object with and without modify access on the RDN attribute"""
1211         ou1_dn = "OU=test_rename_ou1," + self.base_dn
1212         ou2_dn = "OU=test_rename_ou2," + ou1_dn
1213         ou3_dn = "OU=test_rename_ou3," + ou1_dn
1214         # Create OU structure
1215         self.ldb_admin.create_ou(ou1_dn)
1216         self.ldb_admin.create_ou(ou2_dn)
1217         sid = self.sd_utils.get_object_sid(self.get_user_dn(self.regular_user))
1218         mod = "(OA;;WP;bf967a0e-0de6-11d0-a285-00aa003049e2;;%s)" % str(sid)
1219         self.sd_utils.dacl_add_ace(ou2_dn, mod)
1220         mod = "(OD;;WP;bf9679f0-0de6-11d0-a285-00aa003049e2;;%s)" % str(sid)
1221         self.sd_utils.dacl_add_ace(ou2_dn, mod)
1222         try:
1223             self.ldb_user.rename(ou2_dn, ou3_dn)
1224         except LdbError, (num, _):
1225             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1226         else:
1227             # This rename operation should always throw ERR_INSUFFICIENT_ACCESS_RIGHTS
1228             self.fail()
1229         sid = self.sd_utils.get_object_sid(self.get_user_dn(self.regular_user))
1230         mod = "(A;;WP;bf9679f0-0de6-11d0-a285-00aa003049e2;;%s)" % str(sid)
1231         self.sd_utils.dacl_add_ace(ou2_dn, mod)
1232         self.ldb_user.rename(ou2_dn, ou3_dn)
1233         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s)" % ou2_dn)
1234         self.assertEqual(len(res), 0)
1235         res = self.ldb_admin.search(self.base_dn, expression="(distinguishedName=%s)" % ou3_dn)
1236         self.assertNotEqual(len(res), 0)
1237
1238 #tests on Control Access Rights
1239 class AclCARTests(AclTests):
1240
1241     def setUp(self):
1242         super(AclCARTests, self).setUp()
1243         self.user_with_wp = "acl_car_user1"
1244         self.user_with_pc = "acl_car_user2"
1245         self.ldb_admin.newuser(self.user_with_wp, self.user_pass)
1246         self.ldb_admin.newuser(self.user_with_pc, self.user_pass)
1247         self.ldb_user = self.get_ldb_connection(self.user_with_wp, self.user_pass)
1248         self.ldb_user2 = self.get_ldb_connection(self.user_with_pc, self.user_pass)
1249
1250     def tearDown(self):
1251         super(AclCARTests, self).tearDown()
1252         delete_force(self.ldb_admin, self.get_user_dn(self.user_with_wp))
1253         delete_force(self.ldb_admin, self.get_user_dn(self.user_with_pc))
1254
1255     def test_change_password1(self):
1256         """Try a password change operation without any CARs given"""
1257         #users have change password by default - remove for negative testing
1258         desc = self.sd_utils.read_sd_on_dn(self.get_user_dn(self.user_with_wp))
1259         sddl = desc.as_sddl(self.domain_sid)
1260         sddl = sddl.replace("(OA;;CR;ab721a53-1e2f-11d0-9819-00aa0040529b;;WD)", "")
1261         sddl = sddl.replace("(OA;;CR;ab721a53-1e2f-11d0-9819-00aa0040529b;;PS)", "")
1262         self.sd_utils.modify_sd_on_dn(self.get_user_dn(self.user_with_wp), sddl)
1263         try:
1264             self.ldb_user.modify_ldif("""
1265 dn: """ + self.get_user_dn(self.user_with_wp) + """
1266 changetype: modify
1267 delete: unicodePwd
1268 unicodePwd:: """ + base64.b64encode("\"samba123@\"".encode('utf-16-le')) + """
1269 add: unicodePwd
1270 unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS2\"".encode('utf-16-le')) + """
1271 """)
1272         except LdbError, (num, _):
1273             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
1274         else:
1275             # for some reason we get constraint violation instead of insufficient access error
1276             self.fail()
1277
1278     def test_change_password2(self):
1279         """Make sure WP has no influence"""
1280         desc = self.sd_utils.read_sd_on_dn(self.get_user_dn(self.user_with_wp))
1281         sddl = desc.as_sddl(self.domain_sid)
1282         sddl = sddl.replace("(OA;;CR;ab721a53-1e2f-11d0-9819-00aa0040529b;;WD)", "")
1283         sddl = sddl.replace("(OA;;CR;ab721a53-1e2f-11d0-9819-00aa0040529b;;PS)", "")
1284         self.sd_utils.modify_sd_on_dn(self.get_user_dn(self.user_with_wp), sddl)
1285         mod = "(A;;WP;;;PS)"
1286         self.sd_utils.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
1287         desc = self.sd_utils.read_sd_on_dn(self.get_user_dn(self.user_with_wp))
1288         sddl = desc.as_sddl(self.domain_sid)
1289         try:
1290             self.ldb_user.modify_ldif("""
1291 dn: """ + self.get_user_dn(self.user_with_wp) + """
1292 changetype: modify
1293 delete: unicodePwd
1294 unicodePwd:: """ + base64.b64encode("\"samba123@\"".encode('utf-16-le')) + """
1295 add: unicodePwd
1296 unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS2\"".encode('utf-16-le')) + """
1297 """)
1298         except LdbError, (num, _):
1299             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
1300         else:
1301             # for some reason we get constraint violation instead of insufficient access error
1302             self.fail()
1303
1304     def test_change_password3(self):
1305         """Make sure WP has no influence"""
1306         mod = "(D;;WP;;;PS)"
1307         self.sd_utils.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
1308         desc = self.sd_utils.read_sd_on_dn(self.get_user_dn(self.user_with_wp))
1309         sddl = desc.as_sddl(self.domain_sid)
1310         self.ldb_user.modify_ldif("""
1311 dn: """ + self.get_user_dn(self.user_with_wp) + """
1312 changetype: modify
1313 delete: unicodePwd
1314 unicodePwd:: """ + base64.b64encode("\"samba123@\"".encode('utf-16-le')) + """
1315 add: unicodePwd
1316 unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS2\"".encode('utf-16-le')) + """
1317 """)
1318
1319     def test_change_password5(self):
1320         """Make sure rights have no influence on dBCSPwd"""
1321         desc = self.sd_utils.read_sd_on_dn(self.get_user_dn(self.user_with_wp))
1322         sddl = desc.as_sddl(self.domain_sid)
1323         sddl = sddl.replace("(OA;;CR;ab721a53-1e2f-11d0-9819-00aa0040529b;;WD)", "")
1324         sddl = sddl.replace("(OA;;CR;ab721a53-1e2f-11d0-9819-00aa0040529b;;PS)", "")
1325         self.sd_utils.modify_sd_on_dn(self.get_user_dn(self.user_with_wp), sddl)
1326         mod = "(D;;WP;;;PS)"
1327         self.sd_utils.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
1328         try:
1329             self.ldb_user.modify_ldif("""
1330 dn: """ + self.get_user_dn(self.user_with_wp) + """
1331 changetype: modify
1332 delete: dBCSPwd
1333 dBCSPwd: XXXXXXXXXXXXXXXX
1334 add: dBCSPwd
1335 dBCSPwd: YYYYYYYYYYYYYYYY
1336 """)
1337         except LdbError, (num, _):
1338             self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
1339         else:
1340             self.fail()
1341
1342     def test_change_password6(self):
1343         """Test uneven delete/adds"""
1344         try:
1345             self.ldb_user.modify_ldif("""
1346 dn: """ + self.get_user_dn(self.user_with_wp) + """
1347 changetype: modify
1348 delete: userPassword
1349 userPassword: thatsAcomplPASS1
1350 delete: userPassword
1351 userPassword: thatsAcomplPASS1
1352 add: userPassword
1353 userPassword: thatsAcomplPASS2
1354 """)
1355         except LdbError, (num, _):
1356             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1357         else:
1358             self.fail()
1359         mod = "(OA;;CR;00299570-246d-11d0-a768-00aa006e0529;;PS)"
1360         self.sd_utils.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
1361         try:
1362             self.ldb_user.modify_ldif("""
1363 dn: """ + self.get_user_dn(self.user_with_wp) + """
1364 changetype: modify
1365 delete: userPassword
1366 userPassword: thatsAcomplPASS1
1367 delete: userPassword
1368 userPassword: thatsAcomplPASS1
1369 add: userPassword
1370 userPassword: thatsAcomplPASS2
1371 """)
1372             # This fails on Windows 2000 domain level with constraint violation
1373         except LdbError, (num, _):
1374             self.assertTrue(num == ERR_CONSTRAINT_VIOLATION or
1375                             num == ERR_UNWILLING_TO_PERFORM)
1376         else:
1377             self.fail()
1378
1379
1380     def test_change_password7(self):
1381         """Try a password change operation without any CARs given"""
1382         #users have change password by default - remove for negative testing
1383         desc = self.sd_utils.read_sd_on_dn(self.get_user_dn(self.user_with_wp))
1384         sddl = desc.as_sddl(self.domain_sid)
1385         self.sd_utils.modify_sd_on_dn(self.get_user_dn(self.user_with_wp), sddl)
1386         #first change our own password
1387         self.ldb_user2.modify_ldif("""
1388 dn: """ + self.get_user_dn(self.user_with_pc) + """
1389 changetype: modify
1390 delete: unicodePwd
1391 unicodePwd:: """ + base64.b64encode("\"samba123@\"".encode('utf-16-le')) + """
1392 add: unicodePwd
1393 unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS1\"".encode('utf-16-le')) + """
1394 """)
1395         #then someone else's
1396         self.ldb_user2.modify_ldif("""
1397 dn: """ + self.get_user_dn(self.user_with_wp) + """
1398 changetype: modify
1399 delete: unicodePwd
1400 unicodePwd:: """ + base64.b64encode("\"samba123@\"".encode('utf-16-le')) + """
1401 add: unicodePwd
1402 unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS2\"".encode('utf-16-le')) + """
1403 """)
1404
1405     def test_reset_password1(self):
1406         """Try a user password reset operation (unicodePwd) before and after granting CAR"""
1407         try:
1408             self.ldb_user.modify_ldif("""
1409 dn: """ + self.get_user_dn(self.user_with_wp) + """
1410 changetype: modify
1411 replace: unicodePwd
1412 unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS1\"".encode('utf-16-le')) + """
1413 """)
1414         except LdbError, (num, _):
1415             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1416         else:
1417             self.fail()
1418         mod = "(OA;;CR;00299570-246d-11d0-a768-00aa006e0529;;PS)"
1419         self.sd_utils.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
1420         self.ldb_user.modify_ldif("""
1421 dn: """ + self.get_user_dn(self.user_with_wp) + """
1422 changetype: modify
1423 replace: unicodePwd
1424 unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS1\"".encode('utf-16-le')) + """
1425 """)
1426
1427     def test_reset_password2(self):
1428         """Try a user password reset operation (userPassword) before and after granting CAR"""
1429         try:
1430             self.ldb_user.modify_ldif("""
1431 dn: """ + self.get_user_dn(self.user_with_wp) + """
1432 changetype: modify
1433 replace: userPassword
1434 userPassword: thatsAcomplPASS1
1435 """)
1436         except LdbError, (num, _):
1437             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1438         else:
1439             self.fail()
1440         mod = "(OA;;CR;00299570-246d-11d0-a768-00aa006e0529;;PS)"
1441         self.sd_utils.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
1442         try:
1443             self.ldb_user.modify_ldif("""
1444 dn: """ + self.get_user_dn(self.user_with_wp) + """
1445 changetype: modify
1446 replace: userPassword
1447 userPassword: thatsAcomplPASS1
1448 """)
1449             # This fails on Windows 2000 domain level with constraint violation
1450         except LdbError, (num, _):
1451             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
1452
1453     def test_reset_password3(self):
1454         """Grant WP and see what happens (unicodePwd)"""
1455         mod = "(A;;WP;;;PS)"
1456         self.sd_utils.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
1457         try:
1458             self.ldb_user.modify_ldif("""
1459 dn: """ + self.get_user_dn(self.user_with_wp) + """
1460 changetype: modify
1461 replace: unicodePwd
1462 unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS1\"".encode('utf-16-le')) + """
1463 """)
1464         except LdbError, (num, _):
1465             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1466         else:
1467             self.fail()
1468
1469     def test_reset_password4(self):
1470         """Grant WP and see what happens (userPassword)"""
1471         mod = "(A;;WP;;;PS)"
1472         self.sd_utils.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
1473         try:
1474             self.ldb_user.modify_ldif("""
1475 dn: """ + self.get_user_dn(self.user_with_wp) + """
1476 changetype: modify
1477 replace: userPassword
1478 userPassword: thatsAcomplPASS1
1479 """)
1480         except LdbError, (num, _):
1481             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1482         else:
1483             self.fail()
1484
1485     def test_reset_password5(self):
1486         """Explicitly deny WP but grant CAR (unicodePwd)"""
1487         mod = "(D;;WP;;;PS)(OA;;CR;00299570-246d-11d0-a768-00aa006e0529;;PS)"
1488         self.sd_utils.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
1489         self.ldb_user.modify_ldif("""
1490 dn: """ + self.get_user_dn(self.user_with_wp) + """
1491 changetype: modify
1492 replace: unicodePwd
1493 unicodePwd:: """ + base64.b64encode("\"thatsAcomplPASS1\"".encode('utf-16-le')) + """
1494 """)
1495
1496     def test_reset_password6(self):
1497         """Explicitly deny WP but grant CAR (userPassword)"""
1498         mod = "(D;;WP;;;PS)(OA;;CR;00299570-246d-11d0-a768-00aa006e0529;;PS)"
1499         self.sd_utils.dacl_add_ace(self.get_user_dn(self.user_with_wp), mod)
1500         try:
1501             self.ldb_user.modify_ldif("""
1502 dn: """ + self.get_user_dn(self.user_with_wp) + """
1503 changetype: modify
1504 replace: userPassword
1505 userPassword: thatsAcomplPASS1
1506 """)
1507             # This fails on Windows 2000 domain level with constraint violation
1508         except LdbError, (num, _):
1509             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
1510
1511 class AclExtendedTests(AclTests):
1512
1513     def setUp(self):
1514         super(AclExtendedTests, self).setUp()
1515         #regular user, will be the creator
1516         self.u1 = "ext_u1"
1517         #regular user
1518         self.u2 = "ext_u2"
1519         #admin user
1520         self.u3 = "ext_u3"
1521         self.ldb_admin.newuser(self.u1, self.user_pass)
1522         self.ldb_admin.newuser(self.u2, self.user_pass)
1523         self.ldb_admin.newuser(self.u3, self.user_pass)
1524         self.ldb_admin.add_remove_group_members("Domain Admins", [self.u3],
1525                                                 add_members_operation=True)
1526         self.ldb_user1 = self.get_ldb_connection(self.u1, self.user_pass)
1527         self.ldb_user2 = self.get_ldb_connection(self.u2, self.user_pass)
1528         self.ldb_user3 = self.get_ldb_connection(self.u3, self.user_pass)
1529         self.user_sid1 = self.sd_utils.get_object_sid(self.get_user_dn(self.u1))
1530         self.user_sid2 = self.sd_utils.get_object_sid(self.get_user_dn(self.u2))
1531
1532     def tearDown(self):
1533         super(AclExtendedTests, self).tearDown()
1534         delete_force(self.ldb_admin, self.get_user_dn(self.u1))
1535         delete_force(self.ldb_admin, self.get_user_dn(self.u2))
1536         delete_force(self.ldb_admin, self.get_user_dn(self.u3))
1537         delete_force(self.ldb_admin, "CN=ext_group1,OU=ext_ou1," + self.base_dn)
1538         delete_force(self.ldb_admin, "ou=ext_ou1," + self.base_dn)
1539
1540     def test_ntSecurityDescriptor(self):
1541         #create empty ou
1542         self.ldb_admin.create_ou("ou=ext_ou1," + self.base_dn)
1543         #give u1 Create children access
1544         mod = "(A;;CC;;;%s)" % str(self.user_sid1)
1545         self.sd_utils.dacl_add_ace("OU=ext_ou1," + self.base_dn, mod)
1546         mod = "(A;;LC;;;%s)" % str(self.user_sid2)
1547         self.sd_utils.dacl_add_ace("OU=ext_ou1," + self.base_dn, mod)
1548         #create a group under that, grant RP to u2
1549         self.ldb_user1.newgroup("ext_group1", groupou="OU=ext_ou1",
1550                                 grouptype=samba.dsdb.GTYPE_DISTRIBUTION_DOMAIN_LOCAL_GROUP)
1551         mod = "(A;;RP;;;%s)" % str(self.user_sid2)
1552         self.sd_utils.dacl_add_ace("CN=ext_group1,OU=ext_ou1," + self.base_dn, mod)
1553         #u2 must not read the descriptor
1554         res = self.ldb_user2.search("CN=ext_group1,OU=ext_ou1," + self.base_dn,
1555                                     SCOPE_BASE, None, ["nTSecurityDescriptor"])
1556         self.assertNotEqual(len(res), 0)
1557         self.assertFalse("nTSecurityDescriptor" in res[0].keys())
1558         #grant RC to u2 - still no access
1559         mod = "(A;;RC;;;%s)" % str(self.user_sid2)
1560         self.sd_utils.dacl_add_ace("CN=ext_group1,OU=ext_ou1," + self.base_dn, mod)
1561         res = self.ldb_user2.search("CN=ext_group1,OU=ext_ou1," + self.base_dn,
1562                                     SCOPE_BASE, None, ["nTSecurityDescriptor"])
1563         self.assertNotEqual(len(res), 0)
1564         self.assertFalse("nTSecurityDescriptor" in res[0].keys())
1565         #u3 is member of administrators group, should be able to read sd
1566         res = self.ldb_user3.search("CN=ext_group1,OU=ext_ou1," + self.base_dn,
1567                                     SCOPE_BASE, None, ["nTSecurityDescriptor"])
1568         self.assertEqual(len(res),1)
1569         self.assertTrue("nTSecurityDescriptor" in res[0].keys())
1570
1571
1572 class AclSPNTests(AclTests):
1573
1574     def setUp(self):
1575         super(AclSPNTests, self).setUp()
1576         self.dcname = "TESTSRV8"
1577         self.rodcname = "TESTRODC8"
1578         self.computername = "testcomp8"
1579         self.test_user = "spn_test_user8"
1580         self.computerdn = "CN=%s,CN=computers,%s" % (self.computername, self.base_dn)
1581         self.dc_dn = "CN=%s,OU=Domain Controllers,%s" % (self.dcname, self.base_dn)
1582         self.site = "Default-First-Site-Name"
1583         self.rodcctx = dc_join(server=host, creds=creds, lp=lp,
1584             site=self.site, netbios_name=self.rodcname, targetdir=None,
1585             domain=None)
1586         self.dcctx = dc_join(server=host, creds=creds, lp=lp, site=self.site,
1587                 netbios_name=self.dcname, targetdir=None, domain=None)
1588         self.ldb_admin.newuser(self.test_user, self.user_pass)
1589         self.ldb_user1 = self.get_ldb_connection(self.test_user, self.user_pass)
1590         self.user_sid1 = self.sd_utils.get_object_sid(self.get_user_dn(self.test_user))
1591         self.create_computer(self.computername, self.dcctx.dnsdomain)
1592         self.create_rodc(self.rodcctx)
1593         self.create_dc(self.dcctx)
1594
1595     def tearDown(self):
1596         super(AclSPNTests, self).tearDown()
1597         self.rodcctx.cleanup_old_join()
1598         self.dcctx.cleanup_old_join()
1599         delete_force(self.ldb_admin, "cn=%s,cn=computers,%s" % (self.computername, self.base_dn))
1600         delete_force(self.ldb_admin, self.get_user_dn(self.test_user))
1601
1602     def replace_spn(self, _ldb, dn, spn):
1603         print "Setting spn %s on %s" % (spn, dn)
1604         res = self.ldb_admin.search(dn, expression="(objectClass=*)",
1605                                     scope=SCOPE_BASE, attrs=["servicePrincipalName"])
1606         if "servicePrincipalName" in res[0].keys():
1607             flag = FLAG_MOD_REPLACE
1608         else:
1609             flag = FLAG_MOD_ADD
1610
1611         msg = Message()
1612         msg.dn = Dn(self.ldb_admin, dn)
1613         msg["servicePrincipalName"] = MessageElement(spn, flag,
1614                                                          "servicePrincipalName")
1615         _ldb.modify(msg)
1616
1617     def create_computer(self, computername, domainname):
1618         dn = "CN=%s,CN=computers,%s" % (computername, self.base_dn)
1619         samaccountname = computername + "$"
1620         dnshostname = "%s.%s" % (computername, domainname)
1621         self.ldb_admin.add({
1622             "dn": dn,
1623             "objectclass": "computer",
1624             "sAMAccountName": samaccountname,
1625             "userAccountControl": str(samba.dsdb.UF_WORKSTATION_TRUST_ACCOUNT),
1626             "dNSHostName": dnshostname})
1627
1628     # same as for join_RODC, but do not set any SPNs
1629     def create_rodc(self, ctx):
1630          ctx.nc_list = [ ctx.base_dn, ctx.config_dn, ctx.schema_dn ]
1631          ctx.krbtgt_dn = "CN=krbtgt_%s,CN=Users,%s" % (ctx.myname, ctx.base_dn)
1632
1633          ctx.never_reveal_sid = [ "<SID=%s-%s>" % (ctx.domsid, security.DOMAIN_RID_RODC_DENY),
1634                                   "<SID=%s>" % security.SID_BUILTIN_ADMINISTRATORS,
1635                                   "<SID=%s>" % security.SID_BUILTIN_SERVER_OPERATORS,
1636                                   "<SID=%s>" % security.SID_BUILTIN_BACKUP_OPERATORS,
1637                                   "<SID=%s>" % security.SID_BUILTIN_ACCOUNT_OPERATORS ]
1638          ctx.reveal_sid = "<SID=%s-%s>" % (ctx.domsid, security.DOMAIN_RID_RODC_ALLOW)
1639
1640          mysid = ctx.get_mysid()
1641          admin_dn = "<SID=%s>" % mysid
1642          ctx.managedby = admin_dn
1643
1644          ctx.userAccountControl = (samba.dsdb.UF_WORKSTATION_TRUST_ACCOUNT |
1645                               samba.dsdb.UF_TRUSTED_TO_AUTHENTICATE_FOR_DELEGATION |
1646                               samba.dsdb.UF_PARTIAL_SECRETS_ACCOUNT)
1647
1648          ctx.connection_dn = "CN=RODC Connection (FRS),%s" % ctx.ntds_dn
1649          ctx.secure_channel_type = misc.SEC_CHAN_RODC
1650          ctx.RODC = True
1651          ctx.replica_flags  =  (drsuapi.DRSUAPI_DRS_INIT_SYNC |
1652                                 drsuapi.DRSUAPI_DRS_PER_SYNC |
1653                                 drsuapi.DRSUAPI_DRS_GET_ANC |
1654                                 drsuapi.DRSUAPI_DRS_NEVER_SYNCED |
1655                                 drsuapi.DRSUAPI_DRS_SPECIAL_SECRET_PROCESSING)
1656
1657          ctx.join_add_objects()
1658
1659     def create_dc(self, ctx):
1660         ctx.nc_list = [ ctx.base_dn, ctx.config_dn, ctx.schema_dn ]
1661         ctx.userAccountControl = samba.dsdb.UF_SERVER_TRUST_ACCOUNT | samba.dsdb.UF_TRUSTED_FOR_DELEGATION
1662         ctx.secure_channel_type = misc.SEC_CHAN_BDC
1663         ctx.replica_flags = (drsuapi.DRSUAPI_DRS_WRIT_REP |
1664                              drsuapi.DRSUAPI_DRS_INIT_SYNC |
1665                              drsuapi.DRSUAPI_DRS_PER_SYNC |
1666                              drsuapi.DRSUAPI_DRS_FULL_SYNC_IN_PROGRESS |
1667                              drsuapi.DRSUAPI_DRS_NEVER_SYNCED)
1668
1669         ctx.join_add_objects()
1670
1671     def dc_spn_test(self, ctx):
1672         netbiosdomain = self.dcctx.get_domain_name()
1673         try:
1674             self.replace_spn(self.ldb_user1, ctx.acct_dn, "HOST/%s/%s" % (ctx.myname, netbiosdomain))
1675         except LdbError, (num, _):
1676             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1677
1678         mod = "(OA;;SW;f3a64788-5306-11d1-a9c5-0000f80367c1;;%s)" % str(self.user_sid1)
1679         self.sd_utils.dacl_add_ace(ctx.acct_dn, mod)
1680         self.replace_spn(self.ldb_user1, ctx.acct_dn, "HOST/%s/%s" % (ctx.myname, netbiosdomain))
1681         self.replace_spn(self.ldb_user1, ctx.acct_dn, "HOST/%s" % (ctx.myname))
1682         self.replace_spn(self.ldb_user1, ctx.acct_dn, "HOST/%s.%s/%s" %
1683                          (ctx.myname, ctx.dnsdomain, netbiosdomain))
1684         self.replace_spn(self.ldb_user1, ctx.acct_dn, "HOST/%s/%s" % (ctx.myname, ctx.dnsdomain))
1685         self.replace_spn(self.ldb_user1, ctx.acct_dn, "HOST/%s.%s/%s" %
1686                          (ctx.myname, ctx.dnsdomain, ctx.dnsdomain))
1687         self.replace_spn(self.ldb_user1, ctx.acct_dn, "GC/%s.%s/%s" %
1688                          (ctx.myname, ctx.dnsdomain, ctx.dnsforest))
1689         self.replace_spn(self.ldb_user1, ctx.acct_dn, "ldap/%s/%s" % (ctx.myname, netbiosdomain))
1690         self.replace_spn(self.ldb_user1, ctx.acct_dn, "ldap/%s.%s/%s" %
1691                          (ctx.myname, ctx.dnsdomain, netbiosdomain))
1692         self.replace_spn(self.ldb_user1, ctx.acct_dn, "ldap/%s" % (ctx.myname))
1693         self.replace_spn(self.ldb_user1, ctx.acct_dn, "ldap/%s/%s" % (ctx.myname, ctx.dnsdomain))
1694         self.replace_spn(self.ldb_user1, ctx.acct_dn, "ldap/%s.%s/%s" %
1695                          (ctx.myname, ctx.dnsdomain, ctx.dnsdomain))
1696         self.replace_spn(self.ldb_user1, ctx.acct_dn, "DNS/%s/%s" % (ctx.myname, ctx.dnsdomain))
1697         self.replace_spn(self.ldb_user1, ctx.acct_dn, "RestrictedKrbHost/%s/%s" %
1698                          (ctx.myname, ctx.dnsdomain))
1699         self.replace_spn(self.ldb_user1, ctx.acct_dn, "RestrictedKrbHost/%s" %
1700                          (ctx.myname))
1701         self.replace_spn(self.ldb_user1, ctx.acct_dn, "Dfsr-12F9A27C-BF97-4787-9364-D31B6C55EB04/%s/%s" %
1702                          (ctx.myname, ctx.dnsdomain))
1703         self.replace_spn(self.ldb_user1, ctx.acct_dn, "NtFrs-88f5d2bd-b646-11d2-a6d3-00c04fc9b232/%s/%s" %
1704                          (ctx.myname, ctx.dnsdomain))
1705         self.replace_spn(self.ldb_user1, ctx.acct_dn, "ldap/%s._msdcs.%s" %
1706                          (ctx.ntds_guid, ctx.dnsdomain))
1707
1708         #the following spns do not match the restrictions and should fail
1709         try:
1710             self.replace_spn(self.ldb_user1, ctx.acct_dn, "ldap/%s.%s/ForestDnsZones.%s" %
1711                              (ctx.myname, ctx.dnsdomain, ctx.dnsdomain))
1712         except LdbError, (num, _):
1713             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
1714         try:
1715             self.replace_spn(self.ldb_user1, ctx.acct_dn, "ldap/%s.%s/DomainDnsZones.%s" %
1716                              (ctx.myname, ctx.dnsdomain, ctx.dnsdomain))
1717         except LdbError, (num, _):
1718             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
1719         try:
1720             self.replace_spn(self.ldb_user1, ctx.acct_dn, "nosuchservice/%s/%s" % ("abcd", "abcd"))
1721         except LdbError, (num, _):
1722             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
1723         try:
1724             self.replace_spn(self.ldb_user1, ctx.acct_dn, "GC/%s.%s/%s" %
1725                              (ctx.myname, ctx.dnsdomain, netbiosdomain))
1726         except LdbError, (num, _):
1727             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
1728         try:
1729             self.replace_spn(self.ldb_user1, ctx.acct_dn, "E3514235-4B06-11D1-AB04-00C04FC2DCD2/%s/%s" %
1730                              (ctx.ntds_guid, ctx.dnsdomain))
1731         except LdbError, (num, _):
1732             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
1733
1734     def test_computer_spn(self):
1735         # with WP, any value can be set
1736         netbiosdomain = self.dcctx.get_domain_name()
1737         self.replace_spn(self.ldb_admin, self.computerdn, "HOST/%s/%s" %
1738                          (self.computername, netbiosdomain))
1739         self.replace_spn(self.ldb_admin, self.computerdn, "HOST/%s" % (self.computername))
1740         self.replace_spn(self.ldb_admin, self.computerdn, "HOST/%s.%s/%s" %
1741                          (self.computername, self.dcctx.dnsdomain, netbiosdomain))
1742         self.replace_spn(self.ldb_admin, self.computerdn, "HOST/%s/%s" %
1743                          (self.computername, self.dcctx.dnsdomain))
1744         self.replace_spn(self.ldb_admin, self.computerdn, "HOST/%s.%s/%s" %
1745                          (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsdomain))
1746         self.replace_spn(self.ldb_admin, self.computerdn, "GC/%s.%s/%s" %
1747                          (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsforest))
1748         self.replace_spn(self.ldb_admin, self.computerdn, "ldap/%s/%s" % (self.computername, netbiosdomain))
1749         self.replace_spn(self.ldb_admin, self.computerdn, "ldap/%s.%s/ForestDnsZones.%s" %
1750                          (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsdomain))
1751         self.replace_spn(self.ldb_admin, self.computerdn, "ldap/%s.%s/DomainDnsZones.%s" %
1752                          (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsdomain))
1753         self.replace_spn(self.ldb_admin, self.computerdn, "ldap/%s.%s/%s" %
1754                          (self.computername, self.dcctx.dnsdomain, netbiosdomain))
1755         self.replace_spn(self.ldb_admin, self.computerdn, "ldap/%s" % (self.computername))
1756         self.replace_spn(self.ldb_admin, self.computerdn, "ldap/%s/%s" %
1757                          (self.computername, self.dcctx.dnsdomain))
1758         self.replace_spn(self.ldb_admin, self.computerdn, "ldap/%s.%s/%s" %
1759                          (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsdomain))
1760         self.replace_spn(self.ldb_admin, self.computerdn, "DNS/%s/%s" %
1761                          (self.computername, self.dcctx.dnsdomain))
1762         self.replace_spn(self.ldb_admin, self.computerdn, "RestrictedKrbHost/%s/%s" %
1763                          (self.computername, self.dcctx.dnsdomain))
1764         self.replace_spn(self.ldb_admin, self.computerdn, "RestrictedKrbHost/%s" %
1765                          (self.computername))
1766         self.replace_spn(self.ldb_admin, self.computerdn, "Dfsr-12F9A27C-BF97-4787-9364-D31B6C55EB04/%s/%s" %
1767                          (self.computername, self.dcctx.dnsdomain))
1768         self.replace_spn(self.ldb_admin, self.computerdn, "NtFrs-88f5d2bd-b646-11d2-a6d3-00c04fc9b232/%s/%s" %
1769                          (self.computername, self.dcctx.dnsdomain))
1770         self.replace_spn(self.ldb_admin, self.computerdn, "nosuchservice/%s/%s" % ("abcd", "abcd"))
1771
1772         #user has neither WP nor Validated-SPN, access denied expected
1773         try:
1774             self.replace_spn(self.ldb_user1, self.computerdn, "HOST/%s/%s" % (self.computername, netbiosdomain))
1775         except LdbError, (num, _):
1776             self.assertEquals(num, ERR_INSUFFICIENT_ACCESS_RIGHTS)
1777
1778         mod = "(OA;;SW;f3a64788-5306-11d1-a9c5-0000f80367c1;;%s)" % str(self.user_sid1)
1779         self.sd_utils.dacl_add_ace(self.computerdn, mod)
1780         #grant Validated-SPN and check which values are accepted
1781         #see 3.1.1.5.3.1.1.4 servicePrincipalName for reference
1782
1783         # for regular computer objects we shouldalways get constraint violation
1784
1785         # This does not pass against Windows, although it should according to docs
1786         self.replace_spn(self.ldb_user1, self.computerdn, "HOST/%s" % (self.computername))
1787         self.replace_spn(self.ldb_user1, self.computerdn, "HOST/%s.%s" %
1788                              (self.computername, self.dcctx.dnsdomain))
1789
1790         try:
1791             self.replace_spn(self.ldb_user1, self.computerdn, "HOST/%s/%s" % (self.computername, netbiosdomain))
1792         except LdbError, (num, _):
1793             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
1794         try:
1795             self.replace_spn(self.ldb_user1, self.computerdn, "HOST/%s.%s/%s" %
1796                              (self.computername, self.dcctx.dnsdomain, netbiosdomain))
1797         except LdbError, (num, _):
1798             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
1799         try:
1800             self.replace_spn(self.ldb_user1, self.computerdn, "HOST/%s/%s" %
1801                              (self.computername, self.dcctx.dnsdomain))
1802         except LdbError, (num, _):
1803             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
1804         try:
1805             self.replace_spn(self.ldb_user1, self.computerdn, "HOST/%s.%s/%s" %
1806                              (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsdomain))
1807         except LdbError, (num, _):
1808             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
1809         try:
1810             self.replace_spn(self.ldb_user1, self.computerdn, "GC/%s.%s/%s" %
1811                              (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsforest))
1812         except LdbError, (num, _):
1813             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
1814         try:
1815             self.replace_spn(self.ldb_user1, self.computerdn, "ldap/%s/%s" % (self.computername, netbiosdomain))
1816         except LdbError, (num, _):
1817             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
1818         try:
1819             self.replace_spn(self.ldb_user1, self.computerdn, "ldap/%s.%s/ForestDnsZones.%s" %
1820                              (self.computername, self.dcctx.dnsdomain, self.dcctx.dnsdomain))
1821         except LdbError, (num, _):
1822             self.assertEquals(num, ERR_CONSTRAINT_VIOLATION)
1823
1824     def test_spn_rwdc(self):
1825         self.dc_spn_test(self.dcctx)
1826
1827     def test_spn_rodc(self):
1828         self.dc_spn_test(self.rodcctx)
1829
1830
1831 # Important unit running information
1832
1833 ldb = SamDB(ldaphost, credentials=creds, session_info=system_session(lp), lp=lp)
1834
1835 runner = SubunitTestRunner()
1836 rc = 0
1837 if not runner.run(unittest.makeSuite(AclAddTests)).wasSuccessful():
1838     rc = 1
1839 if not runner.run(unittest.makeSuite(AclModifyTests)).wasSuccessful():
1840     rc = 1
1841 if not runner.run(unittest.makeSuite(AclDeleteTests)).wasSuccessful():
1842     rc = 1
1843 if not runner.run(unittest.makeSuite(AclRenameTests)).wasSuccessful():
1844     rc = 1
1845
1846 # Get the old "dSHeuristics" if it was set
1847 dsheuristics = ldb.get_dsheuristics()
1848 # Set the "dSHeuristics" to activate the correct "userPassword" behaviour
1849 ldb.set_dsheuristics("000000001")
1850 # Get the old "minPwdAge"
1851 minPwdAge = ldb.get_minPwdAge()
1852 # Set it temporarely to "0"
1853 ldb.set_minPwdAge("0")
1854 if not runner.run(unittest.makeSuite(AclCARTests)).wasSuccessful():
1855     rc = 1
1856 if not runner.run(unittest.makeSuite(AclSearchTests)).wasSuccessful():
1857     rc = 1
1858 # Reset the "dSHeuristics" as they were before
1859 ldb.set_dsheuristics(dsheuristics)
1860 # Reset the "minPwdAge" as it was before
1861 ldb.set_minPwdAge(minPwdAge)
1862
1863 if not runner.run(unittest.makeSuite(AclExtendedTests)).wasSuccessful():
1864     rc = 1
1865 if not runner.run(unittest.makeSuite(AclSPNTests)).wasSuccessful():
1866     rc = 1
1867 sys.exit(rc)