dsdb python tests: convert 'except X, (tuple)' to 'except X as e'
[metze/samba/wip.git] / source4 / dsdb / tests / python / deletetest.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 import optparse
5 import sys
6 import os
7
8 sys.path.insert(0, "bin/python")
9 import samba
10
11 from samba.tests.subunitrun import SubunitOptions, TestProgram
12
13 import samba.getopt as options
14
15 from samba.auth import system_session
16 from ldb import SCOPE_BASE, LdbError, Message, MessageElement, Dn, FLAG_MOD_ADD, FLAG_MOD_DELETE, FLAG_MOD_REPLACE
17 from ldb import ERR_NO_SUCH_OBJECT, ERR_NOT_ALLOWED_ON_NON_LEAF, ERR_ENTRY_ALREADY_EXISTS, ERR_ATTRIBUTE_OR_VALUE_EXISTS
18 from ldb import ERR_UNWILLING_TO_PERFORM, ERR_OPERATIONS_ERROR
19 from samba.samdb import SamDB
20 from samba.tests import delete_force
21
22 parser = optparse.OptionParser("deletetest.py [options] <host|file>")
23 sambaopts = options.SambaOptions(parser)
24 parser.add_option_group(sambaopts)
25 parser.add_option_group(options.VersionOptions(parser))
26 # use command line creds if available
27 credopts = options.CredentialsOptions(parser)
28 parser.add_option_group(credopts)
29 subunitopts = SubunitOptions(parser)
30 parser.add_option_group(subunitopts)
31 opts, args = parser.parse_args()
32
33 if len(args) < 1:
34     parser.print_usage()
35     sys.exit(1)
36
37 host = args[0]
38
39 lp = sambaopts.get_loadparm()
40 creds = credopts.get_credentials(lp)
41
42 class BaseDeleteTests(samba.tests.TestCase):
43
44     def GUID_string(self, guid):
45         return self.ldb.schema_format_value("objectGUID", guid)
46
47     def setUp(self):
48         super(BaseDeleteTests, self).setUp()
49         self.ldb = SamDB(host, credentials=creds, session_info=system_session(lp), lp=lp)
50
51         self.base_dn = self.ldb.domain_dn()
52         self.configuration_dn = self.ldb.get_config_basedn().get_linearized()
53
54     def search_guid(self, guid):
55         print "SEARCH by GUID %s" % self.GUID_string(guid)
56
57         res = self.ldb.search(base="<GUID=%s>" % self.GUID_string(guid),
58                          scope=SCOPE_BASE, controls=["show_deleted:1"])
59         self.assertEquals(len(res), 1)
60         return res[0]
61
62     def search_dn(self,dn):
63         print "SEARCH by DN %s" % dn
64
65         res = self.ldb.search(expression="(objectClass=*)",
66                          base=dn,
67                          scope=SCOPE_BASE,
68                          controls=["show_deleted:1"])
69         self.assertEquals(len(res), 1)
70         return res[0]
71
72
73 class BasicDeleteTests(BaseDeleteTests):
74
75     def setUp(self):
76         super(BasicDeleteTests, self).setUp()
77
78     def del_attr_values(self, delObj):
79         print "Checking attributes for %s" % delObj["dn"]
80
81         self.assertEquals(delObj["isDeleted"][0],"TRUE")
82         self.assertTrue(not("objectCategory" in delObj))
83         self.assertTrue(not("sAMAccountType" in delObj))
84
85     def preserved_attributes_list(self, liveObj, delObj):
86         print "Checking for preserved attributes list"
87
88         preserved_list = ["nTSecurityDescriptor", "attributeID", "attributeSyntax", "dNReferenceUpdate", "dNSHostName",
89         "flatName", "governsID", "groupType", "instanceType", "lDAPDisplayName", "legacyExchangeDN",
90         "isDeleted", "isRecycled", "lastKnownParent", "msDS-LastKnownRDN", "mS-DS-CreatorSID",
91         "mSMQOwnerID", "nCName", "objectClass", "distinguishedName", "objectGUID", "objectSid",
92         "oMSyntax", "proxiedObjectName", "name", "replPropertyMetaData", "sAMAccountName",
93         "securityIdentifier", "sIDHistory", "subClassOf", "systemFlags", "trustPartner", "trustDirection",
94         "trustType", "trustAttributes", "userAccountControl", "uSNChanged", "uSNCreated", "whenCreated"]
95
96         for a in liveObj:
97             if a in preserved_list:
98                 self.assertTrue(a in delObj)
99
100     def check_rdn(self, liveObj, delObj, rdnName):
101         print "Checking for correct rDN"
102         rdn=liveObj[rdnName][0]
103         rdn2=delObj[rdnName][0]
104         name2=delObj["name"][0]
105         dn_rdn=delObj.dn.get_rdn_value()
106         guid=liveObj["objectGUID"][0]
107         self.assertEquals(rdn2, rdn + "\nDEL:" + self.GUID_string(guid))
108         self.assertEquals(name2, rdn + "\nDEL:" + self.GUID_string(guid))
109         self.assertEquals(name2, dn_rdn)
110
111     def delete_deleted(self, ldb, dn):
112         print "Testing the deletion of the already deleted dn %s" % dn
113
114         try:
115             ldb.delete(dn)
116             self.fail()
117         except LdbError as e:
118             (num, _) = e.args
119             self.assertEquals(num, ERR_NO_SUCH_OBJECT)
120
121     def test_delete_protection(self):
122         """Delete protection tests"""
123
124         print self.base_dn
125
126         delete_force(self.ldb, "cn=entry1,cn=ldaptestcontainer," + self.base_dn)
127         delete_force(self.ldb, "cn=entry2,cn=ldaptestcontainer," + self.base_dn)
128         delete_force(self.ldb, "cn=ldaptestcontainer," + self.base_dn)
129
130         self.ldb.add({
131             "dn": "cn=ldaptestcontainer," + self.base_dn,
132             "objectclass": "container"})
133         self.ldb.add({
134             "dn": "cn=entry1,cn=ldaptestcontainer," + self.base_dn,
135             "objectclass": "container"})
136         self.ldb.add({
137             "dn": "cn=entry2,cn=ldaptestcontainer," + self.base_dn,
138             "objectclass": "container"})
139
140         try:
141             self.ldb.delete("cn=ldaptestcontainer," + self.base_dn)
142             self.fail()
143         except LdbError as e1:
144             (num, _) = e1.args
145             self.assertEquals(num, ERR_NOT_ALLOWED_ON_NON_LEAF)
146
147         self.ldb.delete("cn=ldaptestcontainer," + self.base_dn, ["tree_delete:1"])
148
149         try:
150             res = self.ldb.search("cn=ldaptestcontainer," + self.base_dn,
151                              scope=SCOPE_BASE, attrs=[])
152             self.fail()
153         except LdbError as e2:
154             (num, _) = e2.args
155             self.assertEquals(num, ERR_NO_SUCH_OBJECT)
156         try:
157             res = self.ldb.search("cn=entry1,cn=ldaptestcontainer," + self.base_dn,
158                              scope=SCOPE_BASE, attrs=[])
159             self.fail()
160         except LdbError as e3:
161             (num, _) = e3.args
162             self.assertEquals(num, ERR_NO_SUCH_OBJECT)
163         try:
164             res = self.ldb.search("cn=entry2,cn=ldaptestcontainer," + self.base_dn,
165                              scope=SCOPE_BASE, attrs=[])
166             self.fail()
167         except LdbError as e4:
168             (num, _) = e4.args
169             self.assertEquals(num, ERR_NO_SUCH_OBJECT)
170
171         delete_force(self.ldb, "cn=entry1,cn=ldaptestcontainer," + self.base_dn)
172         delete_force(self.ldb, "cn=entry2,cn=ldaptestcontainer," + self.base_dn)
173         delete_force(self.ldb, "cn=ldaptestcontainer," + self.base_dn)
174
175         # Performs some protected object delete testing
176
177         res = self.ldb.search(base="", expression="", scope=SCOPE_BASE,
178                          attrs=["dsServiceName", "dNSHostName"])
179         self.assertEquals(len(res), 1)
180
181         # Delete failing since DC's nTDSDSA object is protected
182         try:
183             self.ldb.delete(res[0]["dsServiceName"][0])
184             self.fail()
185         except LdbError as e5:
186             (num, _) = e5.args
187             self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
188
189         res = self.ldb.search(self.base_dn, attrs=["rIDSetReferences"],
190                          expression="(&(objectClass=computer)(dNSHostName=" + res[0]["dNSHostName"][0] + "))")
191         self.assertEquals(len(res), 1)
192
193         # Deletes failing since DC's rIDSet object is protected
194         try:
195             self.ldb.delete(res[0]["rIDSetReferences"][0])
196             self.fail()
197         except LdbError as e6:
198             (num, _) = e6.args
199             self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
200         try:
201             self.ldb.delete(res[0]["rIDSetReferences"][0], ["tree_delete:1"])
202             self.fail()
203         except LdbError as e7:
204             (num, _) = e7.args
205             self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
206
207         # Deletes failing since three main crossRef objects are protected
208
209         try:
210             self.ldb.delete("cn=Enterprise Schema,cn=Partitions," + self.configuration_dn)
211             self.fail()
212         except LdbError as e8:
213             (num, _) = e8.args
214             self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
215         try:
216             self.ldb.delete("cn=Enterprise Schema,cn=Partitions," + self.configuration_dn, ["tree_delete:1"])
217             self.fail()
218         except LdbError as e9:
219             (num, _) = e9.args
220             self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
221
222         try:
223             self.ldb.delete("cn=Enterprise Configuration,cn=Partitions," + self.configuration_dn)
224             self.fail()
225         except LdbError as e10:
226             (num, _) = e10.args
227             self.assertEquals(num, ERR_NOT_ALLOWED_ON_NON_LEAF)
228         try:
229             self.ldb.delete("cn=Enterprise Configuration,cn=Partitions," + self.configuration_dn, ["tree_delete:1"])
230             self.fail()
231         except LdbError as e11:
232             (num, _) = e11.args
233             self.assertEquals(num, ERR_NOT_ALLOWED_ON_NON_LEAF)
234
235         res = self.ldb.search("cn=Partitions," + self.configuration_dn, attrs=[],
236                          expression="(nCName=%s)" % self.base_dn)
237         self.assertEquals(len(res), 1)
238
239         try:
240             self.ldb.delete(res[0].dn)
241             self.fail()
242         except LdbError as e12:
243             (num, _) = e12.args
244             self.assertEquals(num, ERR_NOT_ALLOWED_ON_NON_LEAF)
245         try:
246             self.ldb.delete(res[0].dn, ["tree_delete:1"])
247             self.fail()
248         except LdbError as e13:
249             (num, _) = e13.args
250             self.assertEquals(num, ERR_NOT_ALLOWED_ON_NON_LEAF)
251
252         # Delete failing since "SYSTEM_FLAG_DISALLOW_DELETE"
253         try:
254             self.ldb.delete("CN=Users," + self.base_dn)
255             self.fail()
256         except LdbError as e14:
257             (num, _) = e14.args
258             self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
259
260         # Tree-delete failing since "isCriticalSystemObject"
261         try:
262             self.ldb.delete("CN=Computers," + self.base_dn, ["tree_delete:1"])
263             self.fail()
264         except LdbError as e15:
265             (num, _) = e15.args
266             self.assertEquals(num, ERR_UNWILLING_TO_PERFORM)
267
268     def test_all(self):
269         """Basic delete tests"""
270
271         print self.base_dn
272
273         # user current time in ms to make unique objects
274         import time
275         marker = str(int(round(time.time()*1000)))
276         usr1_name = "u_" + marker
277         usr2_name = "u2_" + marker
278         grp_name = "g1_" + marker
279         site_name = "s1_" + marker
280
281         usr1 = "cn=%s,cn=users,%s" % (usr1_name, self.base_dn)
282         usr2 = "cn=%s,cn=users,%s" % (usr2_name, self.base_dn)
283         grp1 = "cn=%s,cn=users,%s" % (grp_name, self.base_dn)
284         sit1 = "cn=%s,cn=sites,%s" % (site_name, self.configuration_dn)
285         ss1 = "cn=NTDS Site Settings,cn=%s,cn=sites,%s" % (site_name, self.configuration_dn)
286         srv1 = "cn=Servers,cn=%s,cn=sites,%s" % (site_name, self.configuration_dn)
287         srv2 = "cn=TESTSRV,cn=Servers,cn=%s,cn=sites,%s" % (site_name, self.configuration_dn)
288
289         delete_force(self.ldb, usr1)
290         delete_force(self.ldb, usr2)
291         delete_force(self.ldb, grp1)
292         delete_force(self.ldb, ss1)
293         delete_force(self.ldb, srv2)
294         delete_force(self.ldb, srv1)
295         delete_force(self.ldb, sit1)
296
297         self.ldb.add({
298             "dn": usr1,
299             "objectclass": "user",
300             "description": "test user description",
301             "samaccountname": usr1_name})
302
303         self.ldb.add({
304             "dn": usr2,
305             "objectclass": "user",
306             "description": "test user 2 description",
307             "samaccountname": usr2_name})
308
309         self.ldb.add({
310             "dn": grp1,
311             "objectclass": "group",
312             "description": "test group",
313             "samaccountname": grp_name,
314             "member": [ usr1, usr2 ],
315             "isDeleted": "FALSE" })
316
317         self.ldb.add({
318             "dn": sit1,
319             "objectclass": "site" })
320
321         self.ldb.add({
322             "dn": ss1,
323             "objectclass": ["applicationSiteSettings", "nTDSSiteSettings"] })
324
325         self.ldb.add({
326             "dn": srv1,
327             "objectclass": "serversContainer" })
328
329         self.ldb.add({
330             "dn": srv2,
331             "objectClass": "server" })
332
333         objLive1 = self.search_dn(usr1)
334         guid1=objLive1["objectGUID"][0]
335
336         objLive2 = self.search_dn(usr2)
337         guid2=objLive2["objectGUID"][0]
338
339         objLive3 = self.search_dn(grp1)
340         guid3=objLive3["objectGUID"][0]
341
342         objLive4 = self.search_dn(sit1)
343         guid4=objLive4["objectGUID"][0]
344
345         objLive5 = self.search_dn(ss1)
346         guid5=objLive5["objectGUID"][0]
347
348         objLive6 = self.search_dn(srv1)
349         guid6=objLive6["objectGUID"][0]
350
351         objLive7 = self.search_dn(srv2)
352         guid7=objLive7["objectGUID"][0]
353
354         self.ldb.delete(usr1)
355         self.ldb.delete(usr2)
356         self.ldb.delete(grp1)
357         self.ldb.delete(srv1, ["tree_delete:1"])
358         self.ldb.delete(sit1, ["tree_delete:1"])
359
360         objDeleted1 = self.search_guid(guid1)
361         objDeleted2 = self.search_guid(guid2)
362         objDeleted3 = self.search_guid(guid3)
363         objDeleted4 = self.search_guid(guid4)
364         objDeleted5 = self.search_guid(guid5)
365         objDeleted6 = self.search_guid(guid6)
366         objDeleted7 = self.search_guid(guid7)
367
368         self.del_attr_values(objDeleted1)
369         self.del_attr_values(objDeleted2)
370         self.del_attr_values(objDeleted3)
371         self.del_attr_values(objDeleted4)
372         self.del_attr_values(objDeleted5)
373         self.del_attr_values(objDeleted6)
374         self.del_attr_values(objDeleted7)
375
376         self.preserved_attributes_list(objLive1, objDeleted1)
377         self.preserved_attributes_list(objLive2, objDeleted2)
378         self.preserved_attributes_list(objLive3, objDeleted3)
379         self.preserved_attributes_list(objLive4, objDeleted4)
380         self.preserved_attributes_list(objLive5, objDeleted5)
381         self.preserved_attributes_list(objLive6, objDeleted6)
382         self.preserved_attributes_list(objLive7, objDeleted7)
383
384         self.check_rdn(objLive1, objDeleted1, "cn")
385         self.check_rdn(objLive2, objDeleted2, "cn")
386         self.check_rdn(objLive3, objDeleted3, "cn")
387         self.check_rdn(objLive4, objDeleted4, "cn")
388         self.check_rdn(objLive5, objDeleted5, "cn")
389         self.check_rdn(objLive6, objDeleted6, "cn")
390         self.check_rdn(objLive7, objDeleted7, "cn")
391
392         self.delete_deleted(self.ldb, usr1)
393         self.delete_deleted(self.ldb, usr2)
394         self.delete_deleted(self.ldb, grp1)
395         self.delete_deleted(self.ldb, sit1)
396         self.delete_deleted(self.ldb, ss1)
397         self.delete_deleted(self.ldb, srv1)
398         self.delete_deleted(self.ldb, srv2)
399
400         self.assertTrue("CN=Deleted Objects" in str(objDeleted1.dn))
401         self.assertTrue("CN=Deleted Objects" in str(objDeleted2.dn))
402         self.assertTrue("CN=Deleted Objects" in str(objDeleted3.dn))
403         self.assertFalse("CN=Deleted Objects" in str(objDeleted4.dn))
404         self.assertTrue("CN=Deleted Objects" in str(objDeleted5.dn))
405         self.assertFalse("CN=Deleted Objects" in str(objDeleted6.dn))
406         self.assertFalse("CN=Deleted Objects" in str(objDeleted7.dn))
407
408
409 if not "://" in host:
410     if os.path.isfile(host):
411         host = "tdb://%s" % host
412     else:
413         host = "ldap://%s" % host
414
415 TestProgram(module=__name__, opts=subunitopts)