3c35af63bbe77d15f90494e1550b15cd9d3e14bc
[samba.git] / source4 / dsdb / tests / python / urgent_replication.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 import optparse
5 import sys
6 import os
7
8 sys.path.append("bin/python")
9 import samba
10 samba.ensure_external_module("subunit", "subunit/python")
11 samba.ensure_external_module("testtools", "testtools")
12
13 import samba.getopt as options
14
15 from samba.auth import system_session
16 from ldb import (SCOPE_BASE, LdbError, ERR_NO_SUCH_OBJECT, Message,
17     MessageElement, Dn, FLAG_MOD_REPLACE)
18 from samba.samdb import SamDB
19 import samba.tests
20
21 from subunit.run import SubunitTestRunner
22 import unittest
23
# Command-line handling: Samba's standard option groups plus one positional
# <host> argument naming the server (or local database file) to test against.
parser = optparse.OptionParser("urgent_replication [options] <host>")
sambaopts = options.SambaOptions(parser)
parser.add_option_group(sambaopts)
parser.add_option_group(options.VersionOptions(parser))
# use command line creds if available
credopts = options.CredentialsOptions(parser)
parser.add_option_group(credopts)
opts, args = parser.parse_args()

# The host argument is mandatory: without it, print the usage line and fail.
if not args:
    parser.print_usage()
    sys.exit(1)

host = args[0]

# Loadparm context and credentials are derived from the parsed options and
# are consumed further down when the SamDB connection is opened.
lp = sambaopts.get_loadparm()
creds = credopts.get_credentials(lp)
41
class UrgentReplicationTests(samba.tests.TestCase):
    """Check which directory changes are flagged for urgent replication.

    Every test adds, modifies and/or deletes an object and then reads the
    USN pair of the affected partition through ``load_partition_usn()``.
    A change counts as urgent when ``uSNUrgent`` equals ``uSNHighest``
    afterwards, and as non-urgent when the two values differ.

    The connection is the module-level ``ldb`` object (picked up in
    ``setUp``); ``test_crossRef_object`` additionally reads the module-level
    ``lp`` to obtain the realm.
    """

    def delete_force(self, ldb, dn):
        """Delete ``dn`` using the ``relax`` control, tolerating its absence.

        An LdbError carrying ERR_NO_SUCH_OBJECT is accepted silently; any
        other LdbError code fails the test through the assertion.
        """
        try:
            ldb.delete(dn, ["relax:0"])
        except LdbError as e:
            (num, _) = e.args
            self.assertEqual(num, ERR_NO_SUCH_OBJECT)

    def find_basedn(self, ldb):
        """Return the ``defaultNamingContext`` value found by a base-scope
        search on the empty DN."""
        res = ldb.search(base="", expression="", scope=SCOPE_BASE,
                         attrs=["defaultNamingContext"])
        self.assertEqual(len(res), 1)
        return res[0]["defaultNamingContext"][0]

    def setUp(self):
        """Attach the shared connection and look up the base DN."""
        super(UrgentReplicationTests, self).setUp()
        # The connection is opened once at module level and shared by all
        # test methods.
        self.ldb = ldb
        self.base_dn = self.find_basedn(self.ldb)

        print("baseDN: %s\n" % self.base_dn)

    def _replace_attribute(self, dn, attr, value):
        """Replace attribute ``attr`` of the object ``dn`` with ``value``."""
        m = Message()
        m.dn = Dn(self.ldb, dn)
        m[attr] = MessageElement(value, FLAG_MOD_REPLACE, attr)
        self.ldb.modify(m)

    def _assert_urgent(self, partition_dn):
        """Assert that ``uSNUrgent`` equals ``uSNHighest`` for the partition,
        i.e. the latest change was recorded as urgent."""
        res = self.ldb.load_partition_usn(partition_dn)
        self.assertEqual(res["uSNHighest"], res["uSNUrgent"])

    def _assert_not_urgent(self, partition_dn):
        """Assert that ``uSNUrgent`` differs from ``uSNHighest`` for the
        partition, i.e. the latest change was not recorded as urgent."""
        res = self.ldb.load_partition_usn(partition_dn)
        self.assertNotEqual(res["uSNHighest"], res["uSNUrgent"])

    def test_nonurgent_object(self):
        """Test if the urgent replication is not activated
           when handling a non urgent object"""
        user_dn = "cn=nonurgenttest,cn=users," + self.base_dn
        self.ldb.add({
            "dn": user_dn,
            "objectclass": "user",
            "samaccountname": "nonurgenttest",
            "description": "nonurgenttest description"})

        # urgent replication should not be enabled when creating
        self._assert_not_urgent(self.base_dn)

        # urgent replication should not be enabled when modifying
        self._replace_attribute(user_dn, "description", "new description")
        self._assert_not_urgent(self.base_dn)

        # urgent replication should not be enabled when deleting
        self.delete_force(self.ldb, user_dn)
        self._assert_not_urgent(self.base_dn)

    def test_nTDSDSA_object(self):
        """Test if the urgent replication is activated
           when handling a nTDSDSA object"""
        config_dn = "cn=Configuration," + self.base_dn
        server_dn = ("cn=test server,cn=Servers,cn=Default-First-Site-Name,"
                     "cn=Sites,cn=Configuration," + self.base_dn)
        ntds_dn = ("cn=NTDS Settings test,cn=test server,cn=Servers,"
                   "cn=Default-First-Site-Name,cn=Sites,cn=Configuration," +
                   self.base_dn)

        # The nTDSDSA object needs a parent server object to live under.
        self.ldb.add({
            "dn": server_dn,
            "objectclass": "server",
            "cn": "test server",
            "name": "test server",
            "systemFlags": "50000000"})

        self.ldb.add_ldif(
            """dn: %s""" % ntds_dn + """
objectclass: nTDSDSA
cn: NTDS Settings test
options: 1
instanceType: 4
systemFlags: 33554432""", ["relax:0"])

        # urgent replication should be enabled when creating
        self._assert_urgent(config_dn)

        # urgent replication should NOT be enabled when modifying
        self._replace_attribute(ntds_dn, "options", "0")
        self._assert_not_urgent(config_dn)

        # urgent replication should be enabled when deleting
        self.delete_force(self.ldb, ntds_dn)
        self._assert_urgent(config_dn)

        # Remove the helper server object again.
        self.delete_force(self.ldb, server_dn)

    def test_crossRef_object(self):
        """Test if the urgent replication is activated
           when handling a crossRef object"""
        config_dn = "cn=Configuration," + self.base_dn
        # The add below spells the RDN with "CN=", while modify/delete use
        # "cn="; both spellings are kept exactly as the test always sent them.
        crossref_dn = ("cn=test crossRef,CN=Partitions,CN=Configuration," +
                       self.base_dn)

        self.ldb.add({
            "dn": "CN=test crossRef,CN=Partitions,CN=Configuration," + self.base_dn,
            "objectClass": "crossRef",
            "cn": "test crossRef",
            "dnsRoot": lp.get("realm").lower(),
            "instanceType": "4",
            "nCName": self.base_dn,
            "showInAdvancedViewOnly": "TRUE",
            "name": "test crossRef",
            "systemFlags": "1"})

        # urgent replication should be enabled when creating
        self._assert_urgent(config_dn)

        # urgent replication should NOT be enabled when modifying
        self._replace_attribute(crossref_dn, "systemFlags", "0")
        self._assert_not_urgent(config_dn)

        # urgent replication should be enabled when deleting
        self.delete_force(self.ldb, crossref_dn)
        self._assert_urgent(config_dn)

    def test_attributeSchema_object(self):
        """Test if the urgent replication is activated
           when handling an attributeSchema object"""
        schema_dn = "cn=Schema,cn=Configuration," + self.base_dn

        # The object is never deleted by this test, so the add may fail with
        # an LdbError (presumably because a previous run already created it
        # -- TODO confirm); in that case only the modify step is checked.
        try:
            self.ldb.add_ldif(
                """dn: CN=test attributeSchema,cn=Schema,CN=Configuration,%s""" % self.base_dn + """
objectClass: attributeSchema
cn: test attributeSchema
instanceType: 4
isSingleValued: FALSE
showInAdvancedViewOnly: FALSE
attributeID: 0.9.2342.19200300.100.1.1
attributeSyntax: 2.5.5.12
adminDisplayName: test attributeSchema
adminDescription: test attributeSchema
oMSyntax: 64
systemOnly: FALSE
searchFlags: 8
lDAPDisplayName: test attributeSchema
name: test attributeSchema
systemFlags: 0""")

            # urgent replication should be enabled when creating
            self._assert_urgent(schema_dn)

        except LdbError:
            print("Not testing urgent replication when creating attributeSchema object ...\n")

        # urgent replication should be enabled when modifying
        self._replace_attribute(
            "CN=test attributeSchema,CN=Schema,CN=Configuration," + self.base_dn,
            "lDAPDisplayName", "updated test attributeSchema")
        self._assert_urgent(schema_dn)

    def test_classSchema_object(self):
        """Test if the urgent replication is activated
           when handling a classSchema object"""
        schema_dn = "cn=Schema,cn=Configuration," + self.base_dn
        class_dn = "CN=test classSchema,CN=Schema,CN=Configuration," + self.base_dn

        # As with the attributeSchema test, a failing add is tolerated and
        # only skips the creation check.
        try:
            self.ldb.add_ldif(
                """dn: %s""" % class_dn + """
objectClass: classSchema
cn: test classSchema
instanceType: 4
subClassOf: top
governsID: 1.2.840.113556.1.5.999
rDNAttID: cn
showInAdvancedViewOnly: TRUE
adminDisplayName: test classSchema
adminDescription: test classSchema
objectClassCategory: 1
lDAPDisplayName: test classSchema
name: test classSchema
systemOnly: FALSE
systemPossSuperiors: dfsConfiguration
systemMustContain: msDFS-SchemaMajorVersion
defaultSecurityDescriptor: D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)(A;;RPWPCRCCD
 CLCLORCWOWDSDDTSW;;;SY)(A;;RPLCLORC;;;AU)(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;CO)
systemFlags: 16
defaultHidingValue: TRUE""")

            # urgent replication should be enabled when creating
            self._assert_urgent(schema_dn)

        except LdbError:
            print("Not testing urgent replication when creating classSchema object ...\n")

        # urgent replication should be enabled when modifying
        self._replace_attribute(class_dn, "lDAPDisplayName",
                                "updated test classSchema")
        self._assert_urgent(schema_dn)

    def test_secret_object(self):
        """Test if the urgent replication is activated
           when handling a secret object"""
        secret_dn = "cn=test secret,cn=System," + self.base_dn

        self.ldb.add({
            "dn": secret_dn,
            "objectClass": "secret",
            "cn": "test secret",
            "name": "test secret",
            "currentValue": "xxxxxxx"}, ["relax:0"])

        # urgent replication should be enabled when creating
        self._assert_urgent(self.base_dn)

        # urgent replication should be enabled when modifying
        self._replace_attribute(secret_dn, "currentValue", "yyyyyyyy")
        self._assert_urgent(self.base_dn)

        # urgent replication should NOT be enabled when deleting
        self.delete_force(self.ldb, secret_dn)
        self._assert_not_urgent(self.base_dn)

    def test_rIDManager_object(self):
        """Test if the urgent replication is activated
           when handling a rIDManager object"""
        rid_manager_dn = "CN=RID Manager test,CN=System," + self.base_dn

        self.ldb.add_ldif(
            """dn: %s""" % rid_manager_dn + """
objectClass: rIDManager
cn: RID Manager test
instanceType: 4
showInAdvancedViewOnly: TRUE
name: RID Manager test
systemFlags: -1946157056
isCriticalSystemObject: TRUE
rIDAvailablePool: 133001-1073741823""", ["relax:0"])

        # urgent replication should be enabled when creating
        self._assert_urgent(self.base_dn)

        # urgent replication should be enabled when modifying
        self._replace_attribute(rid_manager_dn, "systemFlags", "0")
        self._assert_urgent(self.base_dn)

        # urgent replication should NOT be enabled when deleting
        self.delete_force(self.ldb, rid_manager_dn)
        self._assert_not_urgent(self.base_dn)

    def test_urgent_attributes(self):
        """Test if the urgent replication is activated
           when handling urgent attributes of an object"""
        user_dn = "cn=user UrgAttr test,cn=users," + self.base_dn

        self.ldb.add({
            "dn": user_dn,
            "objectclass": "user",
            "samaccountname": "user UrgAttr test",
            "userAccountControl": "1",
            "lockoutTime": "0",
            "pwdLastSet": "0",
            "description": "urgent attributes test description"})

        # urgent replication should NOT be enabled when creating
        self._assert_not_urgent(self.base_dn)

        # urgent replication should be enabled when modifying userAccountControl
        self._replace_attribute(user_dn, "userAccountControl", "0")
        self._assert_urgent(self.base_dn)

        # urgent replication should be enabled when modifying lockoutTime
        self._replace_attribute(user_dn, "lockoutTime", "1")
        self._assert_urgent(self.base_dn)

        # urgent replication should be enabled when modifying pwdLastSet
        self._replace_attribute(user_dn, "pwdLastSet", "1")
        self._assert_urgent(self.base_dn)

        # urgent replication should NOT be enabled when modifying a not-urgent
        # attribute
        self._replace_attribute(user_dn, "description",
                                "updated urgent attributes test description")
        self._assert_not_urgent(self.base_dn)

        # urgent replication should NOT be enabled when deleting
        self.delete_force(self.ldb, user_dn)
        self._assert_not_urgent(self.base_dn)

369
370
# A bare argument is turned into a URL: an existing file is opened as a
# local tdb database, anything else is treated as an LDAP server name.
if "://" not in host:
    if os.path.isfile(host):
        host = "tdb://%s" % host
    else:
        host = "ldap://%s" % host


# Shared connection picked up by UrgentReplicationTests.setUp().
ldb = SamDB(host, credentials=creds, session_info=system_session(), lp=lp,
            global_schema=False)

# Run the suite through the subunit runner and report failure through the
# process exit status (0 on success, 1 otherwise).
runner = SubunitTestRunner()
rc = 0
if not runner.run(unittest.makeSuite(UrgentReplicationTests)).wasSuccessful():
    rc = 1
sys.exit(rc)