2 # -*- coding: utf-8 -*-
8 sys.path.insert(0, "bin/python")
11 from samba.tests.subunitrun import SubunitOptions, TestProgram
13 import samba.getopt as options
15 from samba.auth import system_session
16 from ldb import SCOPE_BASE, LdbError, Message, MessageElement, Dn, FLAG_MOD_ADD, FLAG_MOD_DELETE, FLAG_MOD_REPLACE
17 from ldb import ERR_NO_SUCH_OBJECT, ERR_NOT_ALLOWED_ON_NON_LEAF, ERR_ENTRY_ALREADY_EXISTS, ERR_ATTRIBUTE_OR_VALUE_EXISTS
18 from ldb import ERR_UNWILLING_TO_PERFORM, ERR_OPERATIONS_ERROR
19 from samba.samdb import SamDB
20 from samba.tests import delete_force
22 parser = optparse.OptionParser("deletetest.py [options] <host|file>")
23 sambaopts = options.SambaOptions(parser)
24 parser.add_option_group(sambaopts)
25 parser.add_option_group(options.VersionOptions(parser))
26 # use command line creds if available
27 credopts = options.CredentialsOptions(parser)
28 parser.add_option_group(credopts)
29 subunitopts = SubunitOptions(parser)
30 parser.add_option_group(subunitopts)
31 opts, args = parser.parse_args()
39 lp = sambaopts.get_loadparm()
40 creds = credopts.get_credentials(lp)
42 class BaseDeleteTests(samba.tests.TestCase):
44 def GUID_string(self, guid):
45 return self.ldb.schema_format_value("objectGUID", guid)
48 super(BaseDeleteTests, self).setUp()
49 self.ldb = SamDB(host, credentials=creds, session_info=system_session(lp), lp=lp)
51 self.base_dn = self.ldb.domain_dn()
52 self.configuration_dn = self.ldb.get_config_basedn().get_linearized()
54 def search_guid(self, guid):
55 print "SEARCH by GUID %s" % self.GUID_string(guid)
57 res = self.ldb.search(base="<GUID=%s>" % self.GUID_string(guid),
58 scope=SCOPE_BASE, controls=["show_deleted:1"])
59 self.assertEquals(len(res), 1)
62 def search_dn(self,dn):
63 print "SEARCH by DN %s" % dn
65 res = self.ldb.search(expression="(objectClass=*)",
68 controls=["show_deleted:1"])
69 self.assertEquals(len(res), 1)
73 class BasicDeleteTests(BaseDeleteTests):
76 super(BasicDeleteTests, self).setUp()
78 def del_attr_values(self, delObj):
79 print "Checking attributes for %s" % delObj["dn"]
81 self.assertEquals(delObj["isDeleted"][0],"TRUE")
82 self.assertTrue(not("objectCategory" in delObj))
83 self.assertTrue(not("sAMAccountType" in delObj))
85 def preserved_attributes_list(self, liveObj, delObj):
86 print "Checking for preserved attributes list"
88 preserved_list = ["nTSecurityDescriptor", "attributeID", "attributeSyntax", "dNReferenceUpdate", "dNSHostName",
89 "flatName", "governsID", "groupType", "instanceType", "lDAPDisplayName", "legacyExchangeDN",
90 "isDeleted", "isRecycled", "lastKnownParent", "msDS-LastKnownRDN", "mS-DS-CreatorSID",
91 "mSMQOwnerID", "nCName", "objectClass", "distinguishedName", "objectGUID", "objectSid",
92 "oMSyntax", "proxiedObjectName", "name", "replPropertyMetaData", "sAMAccountName",
93 "securityIdentifier", "sIDHistory", "subClassOf", "systemFlags", "trustPartner", "trustDirection",
94 "trustType", "trustAttributes", "userAccountControl", "uSNChanged", "uSNCreated", "whenCreated"]
97 if a in preserved_list:
98 self.assertTrue(a in delObj)
100 def check_rdn(self, liveObj, delObj, rdnName):
101 print "Checking for correct rDN"
102 rdn=liveObj[rdnName][0]
103 rdn2=delObj[rdnName][0]
104 name2=delObj["name"][0]
105 dn_rdn=delObj.dn.get_rdn_value()
106 guid=liveObj["objectGUID"][0]
107 self.assertEquals(rdn2, rdn + "\nDEL:" + self.GUID_string(guid))
108 self.assertEquals(name2, rdn + "\nDEL:" + self.GUID_string(guid))
109 self.assertEquals(name2, dn_rdn)
111 def delete_deleted(self, ldb, dn):
112 print "Testing the deletion of the already deleted dn %s" % dn
117 except LdbError as e:
119 self.assertEquals(num, ERR_NO_SUCH_OBJECT)
121 def test_delete_protection(self):
122 """Delete protection tests"""
126 delete_force(self.ldb, "cn=entry1,cn=ldaptestcontainer," + self.base_dn)
127 delete_force(self.ldb, "cn=entry2,cn=ldaptestcontainer," + self.base_dn)
128 delete_force(self.ldb, "cn=ldaptestcontainer," + self.base_dn)
131 "dn": "cn=ldaptestcontainer," + self.base_dn,
132 "objectclass": "container"})
134 "dn": "cn=entry1,cn=ldaptestcontainer," + self.base_dn,
135 "objectclass": "container"})
137 "dn": "cn=entry2,cn=ldaptestcontainer," + self.base_dn,
138 "objectclass": "container"})
141 self.ldb.delete("cn=ldaptestcontainer," + self.base_dn)
143 except LdbError as e1:
145 self.assertEquals(num, ERR_NOT_ALLOWED_ON_NON_LEAF)
147 self.ldb.delete("cn=ldaptestcontainer," + self.base_dn, ["tree_delete:1"])
150 res = self.ldb.search("cn=ldaptestcontainer," + self.base_dn,
151 scope=SCOPE_BASE, attrs=[])
153 except LdbError as e2:
155 self.assertEquals(num, ERR_NO_SUCH_OBJECT)
157 res = self.ldb.search("cn=entry1,cn=ldaptestcontainer," + self.base_dn,
158 scope=SCOPE_BASE, attrs=[])
160 except LdbError as e3:
162 self.assertEquals(num, ERR_NO_SUCH_OBJECT)
164 res = self.ldb.search("cn=entry2,cn=ldaptestcontainer," + self.base_dn,
165 scope=SCOPE_BASE, attrs=[])
167 except LdbError as e4:
169 self.assertEquals(num, ERR_NO_SUCH_OBJECT)
171 delete_force(self.ldb, "cn=entry1,cn=ldaptestcontainer," + self.base_dn)
172 delete_force(self.ldb, "cn=entry2,cn=ldaptestcontainer," + self.base_dn)
173 delete_force(self.ldb, "cn=ldaptestcontainer," + self.base_dn)
175 # Performs some protected object delete testing
177 res = self.ldb.search(base="", expression="", scope=SCOPE_BASE,
178 attrs=["dsServiceName", "dNSHostName"])
179 self.assertEquals(len(res), 1)
181 # Delete failing since DC's nTDSDSA object is protected
183 self.ldb.delete(res[0]["dsServiceName"][0])
185 except LdbError as e5:
187 self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
189 res = self.ldb.search(self.base_dn, attrs=["rIDSetReferences"],
190 expression="(&(objectClass=computer)(dNSHostName=" + res[0]["dNSHostName"][0] + "))")
191 self.assertEquals(len(res), 1)
193 # Deletes failing since DC's rIDSet object is protected
195 self.ldb.delete(res[0]["rIDSetReferences"][0])
197 except LdbError as e6:
199 self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
201 self.ldb.delete(res[0]["rIDSetReferences"][0], ["tree_delete:1"])
203 except LdbError as e7:
205 self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
207 # Deletes failing since three main crossRef objects are protected
210 self.ldb.delete("cn=Enterprise Schema,cn=Partitions," + self.configuration_dn)
212 except LdbError as e8:
214 self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
216 self.ldb.delete("cn=Enterprise Schema,cn=Partitions," + self.configuration_dn, ["tree_delete:1"])
218 except LdbError as e9:
220 self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
223 self.ldb.delete("cn=Enterprise Configuration,cn=Partitions," + self.configuration_dn)
225 except LdbError as e10:
227 self.assertEquals(num, ERR_NOT_ALLOWED_ON_NON_LEAF)
229 self.ldb.delete("cn=Enterprise Configuration,cn=Partitions," + self.configuration_dn, ["tree_delete:1"])
231 except LdbError as e11:
233 self.assertEquals(num, ERR_NOT_ALLOWED_ON_NON_LEAF)
235 res = self.ldb.search("cn=Partitions," + self.configuration_dn, attrs=[],
236 expression="(nCName=%s)" % self.base_dn)
237 self.assertEquals(len(res), 1)
240 self.ldb.delete(res[0].dn)
242 except LdbError as e12:
244 self.assertEquals(num, ERR_NOT_ALLOWED_ON_NON_LEAF)
246 self.ldb.delete(res[0].dn, ["tree_delete:1"])
248 except LdbError as e13:
250 self.assertEquals(num, ERR_NOT_ALLOWED_ON_NON_LEAF)
252 # Delete failing since "SYSTEM_FLAG_DISALLOW_DELETE"
254 self.ldb.delete("CN=Users," + self.base_dn)
256 except LdbError as e14:
258 self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
260 # Tree-delete failing since "isCriticalSystemObject"
262 self.ldb.delete("CN=Computers," + self.base_dn, ["tree_delete:1"])
264 except LdbError as e15:
266 self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
269 """Basic delete tests"""
273 # user current time in ms to make unique objects
275 marker = str(int(round(time.time()*1000)))
276 usr1_name = "u_" + marker
277 usr2_name = "u2_" + marker
278 grp_name = "g1_" + marker
279 site_name = "s1_" + marker
281 usr1 = "cn=%s,cn=users,%s" % (usr1_name, self.base_dn)
282 usr2 = "cn=%s,cn=users,%s" % (usr2_name, self.base_dn)
283 grp1 = "cn=%s,cn=users,%s" % (grp_name, self.base_dn)
284 sit1 = "cn=%s,cn=sites,%s" % (site_name, self.configuration_dn)
285 ss1 = "cn=NTDS Site Settings,cn=%s,cn=sites,%s" % (site_name, self.configuration_dn)
286 srv1 = "cn=Servers,cn=%s,cn=sites,%s" % (site_name, self.configuration_dn)
287 srv2 = "cn=TESTSRV,cn=Servers,cn=%s,cn=sites,%s" % (site_name, self.configuration_dn)
289 delete_force(self.ldb, usr1)
290 delete_force(self.ldb, usr2)
291 delete_force(self.ldb, grp1)
292 delete_force(self.ldb, ss1)
293 delete_force(self.ldb, srv2)
294 delete_force(self.ldb, srv1)
295 delete_force(self.ldb, sit1)
299 "objectclass": "user",
300 "description": "test user description",
301 "samaccountname": usr1_name})
305 "objectclass": "user",
306 "description": "test user 2 description",
307 "samaccountname": usr2_name})
311 "objectclass": "group",
312 "description": "test group",
313 "samaccountname": grp_name,
314 "member": [ usr1, usr2 ],
315 "isDeleted": "FALSE" })
319 "objectclass": "site" })
323 "objectclass": ["applicationSiteSettings", "nTDSSiteSettings"] })
327 "objectclass": "serversContainer" })
331 "objectClass": "server" })
333 objLive1 = self.search_dn(usr1)
334 guid1=objLive1["objectGUID"][0]
336 objLive2 = self.search_dn(usr2)
337 guid2=objLive2["objectGUID"][0]
339 objLive3 = self.search_dn(grp1)
340 guid3=objLive3["objectGUID"][0]
342 objLive4 = self.search_dn(sit1)
343 guid4=objLive4["objectGUID"][0]
345 objLive5 = self.search_dn(ss1)
346 guid5=objLive5["objectGUID"][0]
348 objLive6 = self.search_dn(srv1)
349 guid6=objLive6["objectGUID"][0]
351 objLive7 = self.search_dn(srv2)
352 guid7=objLive7["objectGUID"][0]
354 self.ldb.delete(usr1)
355 self.ldb.delete(usr2)
356 self.ldb.delete(grp1)
357 self.ldb.delete(srv1, ["tree_delete:1"])
358 self.ldb.delete(sit1, ["tree_delete:1"])
360 objDeleted1 = self.search_guid(guid1)
361 objDeleted2 = self.search_guid(guid2)
362 objDeleted3 = self.search_guid(guid3)
363 objDeleted4 = self.search_guid(guid4)
364 objDeleted5 = self.search_guid(guid5)
365 objDeleted6 = self.search_guid(guid6)
366 objDeleted7 = self.search_guid(guid7)
368 self.del_attr_values(objDeleted1)
369 self.del_attr_values(objDeleted2)
370 self.del_attr_values(objDeleted3)
371 self.del_attr_values(objDeleted4)
372 self.del_attr_values(objDeleted5)
373 self.del_attr_values(objDeleted6)
374 self.del_attr_values(objDeleted7)
376 self.preserved_attributes_list(objLive1, objDeleted1)
377 self.preserved_attributes_list(objLive2, objDeleted2)
378 self.preserved_attributes_list(objLive3, objDeleted3)
379 self.preserved_attributes_list(objLive4, objDeleted4)
380 self.preserved_attributes_list(objLive5, objDeleted5)
381 self.preserved_attributes_list(objLive6, objDeleted6)
382 self.preserved_attributes_list(objLive7, objDeleted7)
384 self.check_rdn(objLive1, objDeleted1, "cn")
385 self.check_rdn(objLive2, objDeleted2, "cn")
386 self.check_rdn(objLive3, objDeleted3, "cn")
387 self.check_rdn(objLive4, objDeleted4, "cn")
388 self.check_rdn(objLive5, objDeleted5, "cn")
389 self.check_rdn(objLive6, objDeleted6, "cn")
390 self.check_rdn(objLive7, objDeleted7, "cn")
392 self.delete_deleted(self.ldb, usr1)
393 self.delete_deleted(self.ldb, usr2)
394 self.delete_deleted(self.ldb, grp1)
395 self.delete_deleted(self.ldb, sit1)
396 self.delete_deleted(self.ldb, ss1)
397 self.delete_deleted(self.ldb, srv1)
398 self.delete_deleted(self.ldb, srv2)
400 self.assertTrue("CN=Deleted Objects" in str(objDeleted1.dn))
401 self.assertTrue("CN=Deleted Objects" in str(objDeleted2.dn))
402 self.assertTrue("CN=Deleted Objects" in str(objDeleted3.dn))
403 self.assertFalse("CN=Deleted Objects" in str(objDeleted4.dn))
404 self.assertTrue("CN=Deleted Objects" in str(objDeleted5.dn))
405 self.assertFalse("CN=Deleted Objects" in str(objDeleted6.dn))
406 self.assertFalse("CN=Deleted Objects" in str(objDeleted7.dn))
409 if not "://" in host:
410 if os.path.isfile(host):
411 host = "tdb://%s" % host
413 host = "ldap://%s" % host
415 TestProgram(module=__name__, opts=subunitopts)