81f8b1cbb57b105b52471ed03bfe420f3d8e8d15
[metze/samba/wip.git] / source4 / dsdb / tests / python / urgent_replication.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 import optparse
5 import sys
6 sys.path.insert(0, "bin/python")
7 import samba
8 from samba.tests.subunitrun import TestProgram, SubunitOptions
9
10 from ldb import (LdbError, ERR_NO_SUCH_OBJECT, Message,
11     MessageElement, Dn, FLAG_MOD_REPLACE)
12 import samba.tests
13 import samba.dsdb as dsdb
14 import samba.getopt as options
15 import random
16
17 parser = optparse.OptionParser("urgent_replication.py [options] <host>")
18 sambaopts = options.SambaOptions(parser)
19 parser.add_option_group(sambaopts)
20 parser.add_option_group(options.VersionOptions(parser))
21
22 # use command line creds if available
23 credopts = options.CredentialsOptions(parser)
24 parser.add_option_group(credopts)
25 subunitopts = SubunitOptions(parser)
26 parser.add_option_group(subunitopts)
27 opts, args = parser.parse_args()
28
29 if len(args) < 1:
30     parser.print_usage()
31     sys.exit(1)
32
33 host = args[0]
34
35
36 class UrgentReplicationTests(samba.tests.TestCase):
37
38     def delete_force(self, ldb, dn):
39         try:
40             ldb.delete(dn, ["relax:0"])
41         except LdbError, (num, _):
42             self.assertEquals(num, ERR_NO_SUCH_OBJECT)
43
44     def setUp(self):
45         super(UrgentReplicationTests, self).setUp()
46         self.ldb = samba.tests.connect_samdb(host, global_schema=False)
47         self.base_dn = self.ldb.domain_dn()
48
49         print "baseDN: %s\n" % self.base_dn
50
51     def test_nonurgent_object(self):
52         """Test if the urgent replication is not activated when handling a non urgent object."""
53         self.ldb.add({
54             "dn": "cn=nonurgenttest,cn=users," + self.base_dn,
55             "objectclass":"user",
56             "samaccountname":"nonurgenttest",
57             "description":"nonurgenttest description"})
58
59         # urgent replication should not be enabled when creating
60         res = self.ldb.load_partition_usn(self.base_dn)
61         self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"])
62
63         # urgent replication should not be enabled when modifying
64         m = Message()
65         m.dn = Dn(self.ldb, "cn=nonurgenttest,cn=users," + self.base_dn)
66         m["description"] = MessageElement("new description", FLAG_MOD_REPLACE,
67           "description")
68         self.ldb.modify(m)
69         res = self.ldb.load_partition_usn(self.base_dn)
70         self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"])
71
72         # urgent replication should not be enabled when deleting
73         self.delete_force(self.ldb, "cn=nonurgenttest,cn=users," + self.base_dn)
74         res = self.ldb.load_partition_usn(self.base_dn)
75         self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"])
76
77     def test_nTDSDSA_object(self):
78         """Test if the urgent replication is activated when handling a nTDSDSA object."""
79         self.ldb.add({
80             "dn": "cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,%s" %
81                 self.ldb.get_config_basedn(),
82             "objectclass":"server",
83             "cn":"test server",
84             "name":"test server",
85             "systemFlags":"50000000"}, ["relax:0"])
86
87         self.ldb.add_ldif(
88             """dn: cn=NTDS Settings test,cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration,%s""" % (self.base_dn) + """
89 objectclass: nTDSDSA
90 cn: NTDS Settings test
91 options: 1
92 instanceType: 4
93 systemFlags: 33554432""", ["relax:0"])
94
95         # urgent replication should be enabled when creation
96         res = self.ldb.load_partition_usn("cn=Configuration," + self.base_dn)
97         self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
98
99         # urgent replication should NOT be enabled when modifying
100         m = Message()
101         m.dn = Dn(self.ldb, "cn=NTDS Settings test,cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self.base_dn)
102         m["options"] = MessageElement("0", FLAG_MOD_REPLACE,
103           "options")
104         self.ldb.modify(m)
105         res = self.ldb.load_partition_usn("cn=Configuration," + self.base_dn)
106         self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"])
107
108         # urgent replication should be enabled when deleting
109         self.delete_force(self.ldb, "cn=NTDS Settings test,cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self.base_dn)
110         res = self.ldb.load_partition_usn("cn=Configuration," + self.base_dn)
111         self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
112
113         self.delete_force(self.ldb, "cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self.base_dn)
114
115     def test_crossRef_object(self):
116         """Test if the urgent replication is activated when handling a crossRef object."""
117         self.ldb.add({
118                       "dn": "CN=test crossRef,CN=Partitions,CN=Configuration,"+ self.base_dn,
119                       "objectClass": "crossRef",
120                       "cn": "test crossRef",
121                       "dnsRoot": self.get_loadparm().get("realm").lower(),
122                       "instanceType": "4",
123                       "nCName": self.base_dn,
124                       "showInAdvancedViewOnly": "TRUE",
125                       "name": "test crossRef",
126                       "systemFlags": "1"}, ["relax:0"])
127
128         # urgent replication should be enabled when creating
129         res = self.ldb.load_partition_usn("cn=Configuration," + self.base_dn)
130         self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
131
132         # urgent replication should NOT be enabled when modifying
133         m = Message()
134         m.dn = Dn(self.ldb, "cn=test crossRef,CN=Partitions,CN=Configuration," + self.base_dn)
135         m["systemFlags"] = MessageElement("0", FLAG_MOD_REPLACE,
136           "systemFlags")
137         self.ldb.modify(m)
138         res = self.ldb.load_partition_usn("cn=Configuration," + self.base_dn)
139         self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"])
140
141
142         # urgent replication should be enabled when deleting
143         self.delete_force(self.ldb, "cn=test crossRef,CN=Partitions,CN=Configuration," + self.base_dn)
144         res = self.ldb.load_partition_usn("cn=Configuration," + self.base_dn)
145         self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
146
147     def test_attributeSchema_object(self):
148         """Test if the urgent replication is activated when handling an attributeSchema object"""
149
150         self.ldb.add_ldif(
151             """dn: CN=test attributeSchema,cn=Schema,CN=Configuration,%s""" % self.base_dn + """
152 objectClass: attributeSchema
153 cn: test attributeSchema
154 instanceType: 4
155 isSingleValued: FALSE
156 showInAdvancedViewOnly: FALSE
157 attributeID: 1.3.6.1.4.1.7165.4.6.1.4.""" + str(random.randint(1,100000)) + """
158 attributeSyntax: 2.5.5.12
159 adminDisplayName: test attributeSchema
160 adminDescription: test attributeSchema
161 oMSyntax: 64
162 systemOnly: FALSE
163 searchFlags: 8
164 lDAPDisplayName: testAttributeSchema
165 name: test attributeSchema""")
166
167         # urgent replication should be enabled when creating
168         res = self.ldb.load_partition_usn("cn=Schema,cn=Configuration," + self.base_dn)
169         self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
170
171         # urgent replication should be enabled when modifying
172         m = Message()
173         m.dn = Dn(self.ldb, "CN=test attributeSchema,CN=Schema,CN=Configuration," + self.base_dn)
174         m["lDAPDisplayName"] = MessageElement("updatedTestAttributeSchema", FLAG_MOD_REPLACE,
175           "lDAPDisplayName")
176         self.ldb.modify(m)
177         res = self.ldb.load_partition_usn("cn=Schema,cn=Configuration," + self.base_dn)
178         self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
179
180     def test_classSchema_object(self):
181         """Test if the urgent replication is activated when handling a classSchema object."""
182         try:
183             self.ldb.add_ldif(
184                             """dn: CN=test classSchema,CN=Schema,CN=Configuration,%s""" % self.base_dn + """
185 objectClass: classSchema
186 cn: test classSchema
187 instanceType: 4
188 subClassOf: top
189 governsId: 1.3.6.1.4.1.7165.4.6.2.4.""" + str(random.randint(1,100000)) + """
190 rDNAttID: cn
191 showInAdvancedViewOnly: TRUE
192 adminDisplayName: test classSchema
193 adminDescription: test classSchema
194 objectClassCategory: 1
195 lDAPDisplayName: testClassSchema
196 name: test classSchema
197 systemOnly: FALSE
198 systemPossSuperiors: dfsConfiguration
199 systemMustContain: msDFS-SchemaMajorVersion
200 defaultSecurityDescriptor: D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)(A;;RPWPCRCCD
201  CLCLORCWOWDSDDTSW;;;SY)(A;;RPLCLORC;;;AU)(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;CO)
202 systemFlags: 16
203 defaultHidingValue: TRUE""")
204
205             # urgent replication should be enabled when creating
206             res = self.ldb.load_partition_usn("cn=Schema,cn=Configuration," + self.base_dn)
207             self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
208
209         except LdbError:
210             print "Not testing urgent replication when creating classSchema object ...\n"
211
212         # urgent replication should be enabled when modifying 
213         m = Message()
214         m.dn = Dn(self.ldb, "CN=test classSchema,CN=Schema,CN=Configuration," + self.base_dn)
215         m["lDAPDisplayName"] = MessageElement("updatedTestClassSchema", FLAG_MOD_REPLACE,
216           "lDAPDisplayName")
217         self.ldb.modify(m)
218         res = self.ldb.load_partition_usn("cn=Schema,cn=Configuration," + self.base_dn)
219         self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
220
221     def test_secret_object(self):
222         """Test if the urgent replication is activated when handling a secret object."""
223
224         self.ldb.add({
225             "dn": "cn=test secret,cn=System," + self.base_dn,
226             "objectClass":"secret",
227             "cn":"test secret",
228             "name":"test secret",
229             "currentValue":"xxxxxxx"})
230
231         # urgent replication should be enabled when creating
232         res = self.ldb.load_partition_usn(self.base_dn)
233         self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
234
235         # urgent replication should be enabled when modifying
236         m = Message()
237         m.dn = Dn(self.ldb, "cn=test secret,cn=System," + self.base_dn)
238         m["currentValue"] = MessageElement("yyyyyyyy", FLAG_MOD_REPLACE,
239           "currentValue")
240         self.ldb.modify(m)
241         res = self.ldb.load_partition_usn(self.base_dn)
242         self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
243
244         # urgent replication should NOT be enabled when deleting
245         self.delete_force(self.ldb, "cn=test secret,cn=System," + self.base_dn)
246         res = self.ldb.load_partition_usn(self.base_dn)
247         self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"])
248
249     def test_rIDManager_object(self):
250         """Test if the urgent replication is activated when handling a rIDManager object."""
251         self.ldb.add_ldif(
252             """dn: CN=RID Manager test,CN=System,%s""" % self.base_dn + """
253 objectClass: rIDManager
254 cn: RID Manager test
255 instanceType: 4
256 showInAdvancedViewOnly: TRUE
257 name: RID Manager test
258 systemFlags: -1946157056
259 isCriticalSystemObject: TRUE
260 rIDAvailablePool: 133001-1073741823""", ["relax:0"])
261
262         # urgent replication should be enabled when creating
263         res = self.ldb.load_partition_usn(self.base_dn)
264         self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
265
266         # urgent replication should be enabled when modifying
267         m = Message()
268         m.dn = Dn(self.ldb, "CN=RID Manager test,CN=System," + self.base_dn)
269         m["systemFlags"] = MessageElement("0", FLAG_MOD_REPLACE,
270           "systemFlags")
271         self.ldb.modify(m)
272         res = self.ldb.load_partition_usn(self.base_dn)
273         self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
274
275         # urgent replication should NOT be enabled when deleting 
276         self.delete_force(self.ldb, "CN=RID Manager test,CN=System," + self.base_dn)
277         res = self.ldb.load_partition_usn(self.base_dn)
278         self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"])
279
280     def test_urgent_attributes(self):
281         """Test if the urgent replication is activated when handling urgent attributes of an object."""
282
283         self.ldb.add({
284             "dn": "cn=user UrgAttr test,cn=users," + self.base_dn,
285             "objectclass":"user",
286             "samaccountname":"user UrgAttr test",
287             "userAccountControl":str(dsdb.UF_NORMAL_ACCOUNT),
288             "lockoutTime":"0",
289             "pwdLastSet":"0",
290             "description":"urgent attributes test description"})
291
292         # urgent replication should NOT be enabled when creating
293         res = self.ldb.load_partition_usn(self.base_dn)
294         self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"])
295
296         # urgent replication should be enabled when modifying userAccountControl
297         m = Message()
298         m.dn = Dn(self.ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
299         m["userAccountControl"] = MessageElement(str(dsdb.UF_NORMAL_ACCOUNT+dsdb.UF_DONT_EXPIRE_PASSWD), FLAG_MOD_REPLACE,
300           "userAccountControl")
301         self.ldb.modify(m)
302         res = self.ldb.load_partition_usn(self.base_dn)
303         self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
304
305         # urgent replication should be enabled when modifying lockoutTime
306         m = Message()
307         m.dn = Dn(self.ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
308         m["lockoutTime"] = MessageElement("1", FLAG_MOD_REPLACE,
309           "lockoutTime")
310         self.ldb.modify(m)
311         res = self.ldb.load_partition_usn(self.base_dn)
312         self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
313
314         # urgent replication should be enabled when modifying pwdLastSet
315         m = Message()
316         m.dn = Dn(self.ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
317         m["pwdLastSet"] = MessageElement("-1", FLAG_MOD_REPLACE,
318           "pwdLastSet")
319         self.ldb.modify(m)
320         res = self.ldb.load_partition_usn(self.base_dn)
321         self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
322
323         # urgent replication should NOT be enabled when modifying a not-urgent
324         # attribute
325         m = Message()
326         m.dn = Dn(self.ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
327         m["description"] = MessageElement("updated urgent attributes test description",
328                                           FLAG_MOD_REPLACE, "description")
329         self.ldb.modify(m)
330         res = self.ldb.load_partition_usn(self.base_dn)
331         self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"])
332
333         # urgent replication should NOT be enabled when deleting
334         self.delete_force(self.ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
335         res = self.ldb.load_partition_usn(self.base_dn)
336         self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"])
337
338
339 TestProgram(module=__name__, opts=subunitopts)