s4:urgent_replication.py - fix up the system flags handling
[samba.git] / source4 / dsdb / tests / python / urgent_replication.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 import optparse
5 import sys
6 import os
7
8 sys.path.append("bin/python")
9 import samba
10 samba.ensure_external_module("subunit", "subunit/python")
11 samba.ensure_external_module("testtools", "testtools")
12
13 import samba.getopt as options
14
15 from samba.auth import system_session
16 from ldb import (SCOPE_BASE, LdbError, ERR_NO_SUCH_OBJECT, Message,
17     MessageElement, Dn, FLAG_MOD_REPLACE)
18 from samba.samdb import SamDB
19 import samba.tests
20
21 from subunit.run import SubunitTestRunner
22 import unittest
23
24 parser = optparse.OptionParser("urgent_replication [options] <host>")
25 sambaopts = options.SambaOptions(parser)
26 parser.add_option_group(sambaopts)
27 parser.add_option_group(options.VersionOptions(parser))
28 # use command line creds if available
29 credopts = options.CredentialsOptions(parser)
30 parser.add_option_group(credopts)
31 opts, args = parser.parse_args()
32
33 if len(args) < 1:
34     parser.print_usage()
35     sys.exit(1)
36
37 host = args[0]
38
39 lp = sambaopts.get_loadparm()
40 creds = credopts.get_credentials(lp)
41
42 class UrgentReplicationTests(samba.tests.TestCase):
43
44     def delete_force(self, ldb, dn):
45         try:
46             ldb.delete(dn, ["relax:0"])
47         except LdbError, (num, _):
48             self.assertEquals(num, ERR_NO_SUCH_OBJECT)
49
50     def find_basedn(self, ldb):
51         res = ldb.search(base="", expression="", scope=SCOPE_BASE,
52                          attrs=["defaultNamingContext"])
53         self.assertEquals(len(res), 1)
54         return res[0]["defaultNamingContext"][0]
55
56     def setUp(self):
57         super(UrgentReplicationTests, self).setUp()
58         self.ldb = ldb
59         self.base_dn = self.find_basedn(ldb)
60
61         print "baseDN: %s\n" % self.base_dn
62
63     def test_nonurgent_object(self):
64         """Test if the urgent replication is not activated
65            when handling a non urgent object"""
66         self.ldb.add({
67             "dn": "cn=nonurgenttest,cn=users," + self.base_dn,
68             "objectclass":"user",
69             "samaccountname":"nonurgenttest",
70             "description":"nonurgenttest description"});
71
72         # urgent replication should not be enabled when creating 
73         res = self.ldb.load_partition_usn(self.base_dn)
74         self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
75
76         # urgent replication should not be enabled when modifying
77         m = Message()
78         m.dn = Dn(ldb, "cn=nonurgenttest,cn=users," + self.base_dn)
79         m["description"] = MessageElement("new description", FLAG_MOD_REPLACE,
80           "description")
81         ldb.modify(m)
82         res = self.ldb.load_partition_usn(self.base_dn)
83         self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
84
85         # urgent replication should not be enabled when deleting
86         self.delete_force(self.ldb, "cn=nonurgenttest,cn=users," + self.base_dn)
87         res = self.ldb.load_partition_usn(self.base_dn)
88         self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
89
90
91     def test_nTDSDSA_object(self):
92         '''Test if the urgent replication is activated
93            when handling a nTDSDSA object'''
94         self.ldb.add({
95             "dn": "cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self.base_dn,
96             "objectclass":"server",
97             "cn":"test server",
98             "name":"test server",
99             "systemFlags":"50000000", ["relax:0"]});
100
101         self.ldb.add_ldif(
102             """dn: cn=NTDS Settings test,cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration,%s""" % (self.base_dn) + """
103 objectclass: nTDSDSA
104 cn: NTDS Settings test
105 options: 1
106 instanceType: 4
107 systemFlags: 33554432""", ["relax:0"]);
108
109         # urgent replication should be enabled when creation
110         res = self.ldb.load_partition_usn("cn=Configuration," + self.base_dn)
111         self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
112
113         # urgent replication should NOT be enabled when modifying
114         m = Message()
115         m.dn = Dn(ldb, "cn=NTDS Settings test,cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self.base_dn)
116         m["options"] = MessageElement("0", FLAG_MOD_REPLACE,
117           "options")
118         ldb.modify(m)
119         res = self.ldb.load_partition_usn("cn=Configuration," + self.base_dn)
120         self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
121
122         # urgent replication should be enabled when deleting
123         self.delete_force(self.ldb, "cn=NTDS Settings test,cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self.base_dn)
124         res = self.ldb.load_partition_usn("cn=Configuration," + self.base_dn)
125         self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
126
127         self.delete_force(self.ldb, "cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self.base_dn)
128
129
130     def test_crossRef_object(self):
131         '''Test if the urgent replication is activated
132            when handling a crossRef object'''
133         self.ldb.add({
134                       "dn": "CN=test crossRef,CN=Partitions,CN=Configuration,"+ self.base_dn,
135                       "objectClass": "crossRef",
136                       "cn": "test crossRef",
137                       "dnsRoot": lp.get("realm").lower(),
138                       "instanceType": "4",
139                       "nCName": self.base_dn,
140                       "showInAdvancedViewOnly": "TRUE",
141                       "name": "test crossRef",
142                       "systemFlags": "1", ["relax:0"]});
143
144         # urgent replication should be enabled when creating
145         res = self.ldb.load_partition_usn("cn=Configuration," + self.base_dn)
146         self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
147
148         # urgent replication should NOT be enabled when modifying
149         m = Message()
150         m.dn = Dn(ldb, "cn=test crossRef,CN=Partitions,CN=Configuration," + self.base_dn)
151         m["systemFlags"] = MessageElement("0", FLAG_MOD_REPLACE,
152           "systemFlags")
153         ldb.modify(m)
154         res = self.ldb.load_partition_usn("cn=Configuration," + self.base_dn)
155         self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
156
157
158         # urgent replication should be enabled when deleting
159         self.delete_force(self.ldb, "cn=test crossRef,CN=Partitions,CN=Configuration," + self.base_dn)
160         res = self.ldb.load_partition_usn("cn=Configuration," + self.base_dn)
161         self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
162
163
164
165     def test_attributeSchema_object(self):
166         '''Test if the urgent replication is activated
167            when handling an attributeSchema object'''
168
169         try:
170             self.ldb.add_ldif(
171                               """dn: CN=test attributeSchema,cn=Schema,CN=Configuration,%s""" % self.base_dn + """
172 objectClass: attributeSchema
173 cn: test attributeSchema
174 instanceType: 4
175 isSingleValued: FALSE
176 showInAdvancedViewOnly: FALSE
177 attributeID: 0.9.2342.19200300.100.1.1
178 attributeSyntax: 2.5.5.12
179 adminDisplayName: test attributeSchema
180 adminDescription: test attributeSchema
181 oMSyntax: 64
182 systemOnly: FALSE
183 searchFlags: 8
184 lDAPDisplayName: test attributeSchema
185 name: test attributeSchema""");
186
187             # urgent replication should be enabled when creating
188             res = self.ldb.load_partition_usn("cn=Schema,cn=Configuration," + self.base_dn)
189             self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
190
191         except LdbError:
192             print "Not testing urgent replication when creating attributeSchema object ...\n"
193
194         # urgent replication should be enabled when modifying 
195         m = Message()
196         m.dn = Dn(ldb, "CN=test attributeSchema,CN=Schema,CN=Configuration," + self.base_dn)
197         m["lDAPDisplayName"] = MessageElement("updated test attributeSchema", FLAG_MOD_REPLACE,
198           "lDAPDisplayName")
199         ldb.modify(m)
200         res = self.ldb.load_partition_usn("cn=Schema,cn=Configuration," + self.base_dn)
201         self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
202
203
204     def test_classSchema_object(self):
205         '''Test if the urgent replication is activated
206            when handling a classSchema object'''
207         try:
208             self.ldb.add_ldif(
209                             """dn: CN=test classSchema,CN=Schema,CN=Configuration,%s""" % self.base_dn + """
210 objectClass: classSchema
211 cn: test classSchema
212 instanceType: 4
213 subClassOf: top
214 governsID: 1.2.840.113556.1.5.999
215 rDNAttID: cn
216 showInAdvancedViewOnly: TRUE
217 adminDisplayName: test classSchema
218 adminDescription: test classSchema
219 objectClassCategory: 1
220 lDAPDisplayName: test classSchema
221 name: test classSchema
222 systemOnly: FALSE
223 systemPossSuperiors: dfsConfiguration
224 systemMustContain: msDFS-SchemaMajorVersion
225 defaultSecurityDescriptor: D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)(A;;RPWPCRCCD
226  CLCLORCWOWDSDDTSW;;;SY)(A;;RPLCLORC;;;AU)(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;CO)
227 systemFlags: 16
228 defaultHidingValue: TRUE""");
229
230             # urgent replication should be enabled when creating
231             res = self.ldb.load_partition_usn("cn=Schema,cn=Configuration," + self.base_dn)
232             self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
233
234         except LdbError:
235             print "Not testing urgent replication when creating classSchema object ...\n"
236
237         # urgent replication should be enabled when modifying 
238         m = Message()
239         m.dn = Dn(ldb, "CN=test classSchema,CN=Schema,CN=Configuration," + self.base_dn)
240         m["lDAPDisplayName"] = MessageElement("updated test classSchema", FLAG_MOD_REPLACE,
241           "lDAPDisplayName")
242         ldb.modify(m)
243         res = self.ldb.load_partition_usn("cn=Schema,cn=Configuration," + self.base_dn)
244         self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
245
246
247     def test_secret_object(self):
248         '''Test if the urgent replication is activated
249            when handling a secret object'''
250
251         self.ldb.add({
252             "dn": "cn=test secret,cn=System," + self.base_dn,
253             "objectClass":"secret",
254             "cn":"test secret",
255             "name":"test secret",
256             "currentValue":"xxxxxxx"}, ["relax:0"]);
257
258         # urgent replication should be enabled when creating
259         res = self.ldb.load_partition_usn(self.base_dn)
260         self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
261
262         # urgent replication should be enabled when modifying
263         m = Message()
264         m.dn = Dn(ldb, "cn=test secret,cn=System," + self.base_dn)
265         m["currentValue"] = MessageElement("yyyyyyyy", FLAG_MOD_REPLACE,
266           "currentValue")
267         ldb.modify(m)
268         res = self.ldb.load_partition_usn(self.base_dn)
269         self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
270
271         # urgent replication should NOT be enabled when deleting 
272         self.delete_force(self.ldb, "cn=test secret,cn=System," + self.base_dn)
273         res = self.ldb.load_partition_usn(self.base_dn)
274         self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
275
276
277     def test_rIDManager_object(self):
278         '''Test if the urgent replication is activated
279             when handling a rIDManager object'''
280         self.ldb.add_ldif(
281             """dn: CN=RID Manager test,CN=System,%s""" % self.base_dn + """
282 objectClass: rIDManager
283 cn: RID Manager test
284 instanceType: 4
285 showInAdvancedViewOnly: TRUE
286 name: RID Manager test
287 systemFlags: -1946157056
288 isCriticalSystemObject: TRUE
289 rIDAvailablePool: 133001-1073741823""", ["relax:0"])
290
291         # urgent replication should be enabled when creating
292         res = self.ldb.load_partition_usn(self.base_dn)
293         self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
294
295         # urgent replication should be enabled when modifying
296         m = Message()
297         m.dn = Dn(ldb, "CN=RID Manager test,CN=System," + self.base_dn)
298         m["systemFlags"] = MessageElement("0", FLAG_MOD_REPLACE,
299           "systemFlags")
300         ldb.modify(m)
301         res = self.ldb.load_partition_usn(self.base_dn)
302         self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
303
304         # urgent replication should NOT be enabled when deleting 
305         self.delete_force(self.ldb, "CN=RID Manager test,CN=System," + self.base_dn)
306         res = self.ldb.load_partition_usn(self.base_dn)
307         self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
308
309
310     def test_urgent_attributes(self):
311         '''Test if the urgent replication is activated
312             when handling urgent attributes of an object'''
313
314         self.ldb.add({
315             "dn": "cn=user UrgAttr test,cn=users," + self.base_dn,
316             "objectclass":"user",
317             "samaccountname":"user UrgAttr test",
318             "userAccountControl":"1",
319             "lockoutTime":"0",
320             "pwdLastSet":"0",
321             "description":"urgent attributes test description"});
322
323         # urgent replication should NOT be enabled when creating
324         res = self.ldb.load_partition_usn(self.base_dn)
325         self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
326
327         # urgent replication should be enabled when modifying userAccountControl 
328         m = Message()
329         m.dn = Dn(ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
330         m["userAccountControl"] = MessageElement("0", FLAG_MOD_REPLACE,
331           "userAccountControl")
332         ldb.modify(m)
333         res = self.ldb.load_partition_usn(self.base_dn)
334         self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
335
336         # urgent replication should be enabled when modifying lockoutTime
337         m = Message()
338         m.dn = Dn(ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
339         m["lockoutTime"] = MessageElement("1", FLAG_MOD_REPLACE,
340           "lockoutTime")
341         ldb.modify(m)
342         res = self.ldb.load_partition_usn(self.base_dn)
343         self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
344
345         # urgent replication should be enabled when modifying pwdLastSet
346         m = Message()
347         m.dn = Dn(ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
348         m["pwdLastSet"] = MessageElement("1", FLAG_MOD_REPLACE,
349           "pwdLastSet")
350         ldb.modify(m)
351         res = self.ldb.load_partition_usn(self.base_dn)
352         self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
353
354         # urgent replication should NOT be enabled when modifying a not-urgent
355         # attribute
356         m = Message()
357         m.dn = Dn(ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
358         m["description"] = MessageElement("updated urgent attributes test description",
359                                           FLAG_MOD_REPLACE, "description")
360         ldb.modify(m)
361         res = self.ldb.load_partition_usn(self.base_dn)
362         self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
363
364         # urgent replication should NOT be enabled when deleting
365         self.delete_force(self.ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
366         res = self.ldb.load_partition_usn(self.base_dn)
367         self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
368
369
370 if not "://" in host:
371     if os.path.isfile(host):
372         host = "tdb://%s" % host
373     else:
374         host = "ldap://%s" % host
375
376
377 ldb = SamDB(host, credentials=creds, session_info=system_session(), lp=lp,
378             global_schema=False)
379
380 runner = SubunitTestRunner()
381 rc = 0
382 if not runner.run(unittest.makeSuite(UrgentReplicationTests)).wasSuccessful():
383     rc = 1
384 sys.exit(rc)