2 # -*- coding: utf-8 -*-
# Append the relative directory "bin/python" to the module search path.
8 sys.path.append("bin/python")
# Ask samba.ensure_external_module for "subunit" and "testtools", giving a
# relative location for each.  Presumably this makes a bundled copy importable
# when no system copy exists -- TODO confirm against that helper.
10 samba.ensure_external_module("subunit", "subunit/python")
11 samba.ensure_external_module("testtools", "testtools")
13 import samba.getopt as options
15 from samba.auth import system_session
16 from ldb import (SCOPE_BASE, LdbError, ERR_NO_SUCH_OBJECT, Message,
17 MessageElement, Dn, FLAG_MOD_REPLACE)
18 from samba.samdb import SamDB
21 from subunit.run import SubunitTestRunner
# Command-line handling: build the option parser, register the Samba, version
# and credentials option groups, then parse the command line.
24 parser = optparse.OptionParser("urgent_replication [options] <host>")
25 sambaopts = options.SambaOptions(parser)
26 parser.add_option_group(sambaopts)
27 parser.add_option_group(options.VersionOptions(parser))
28 # use command line creds if available
29 credopts = options.CredentialsOptions(parser)
30 parser.add_option_group(credopts)
31 opts, args = parser.parse_args()
# NOTE(review): the leading numbers jump from 31 to 39 here.  Whatever derives
# `host` (used near the end of the file) from `args` is not visible in this
# copy.
# Loadparm context and credentials, both consumed later when the SamDB
# connection is opened.
39 lp = sambaopts.get_loadparm()
40 creds = credopts.get_credentials(lp)
# Test case grouping the urgent-replication checks.  Each test performs
# directory operations and then compares the "uSNHighest" and "uSNUrgent"
# values returned by load_partition_usn() for the affected partition.
42 class UrgentReplicationTests(samba.tests.TestCase):
# Delete `dn` through `ldb`, passing ["relax:0"] as the second argument
# (presumably an LDB control list -- confirm against the ldb bindings).
# An LdbError is tolerated only when its error number is ERR_NO_SUCH_OBJECT,
# i.e. the entry was already absent; any other number fails the assertion.
# NOTE(review): the leading numbers skip 45; the `try:` that pairs with the
# `except` below is presumably on that missing line.
44 def delete_force(self, ldb, dn):
46 ldb.delete(dn, ["relax:0"])
# Python 2 tuple-unpacking form: binds the first element of the error to `num`.
47 except LdbError, (num, _):
48 self.assertEquals(num, ERR_NO_SUCH_OBJECT)
# Return the first "defaultNamingContext" value found by a SCOPE_BASE search
# on the empty base DN of `ldb`.  Asserts that exactly one entry came back.
50 def find_basedn(self, ldb):
51 res = ldb.search(base="", expression="", scope=SCOPE_BASE,
52 attrs=["defaultNamingContext"])
53 self.assertEquals(len(res), 1)
54 return res[0]["defaultNamingContext"][0]
# NOTE(review): the leading numbers skip 56, 58 and 60.  The `def setUp(self):`
# header and the assignment that provides self.ldb (used by every test) are
# presumably on those missing lines.
57 super(UrgentReplicationTests, self).setUp()
# Cache the base DN; every test builds its DNs from it.  The bare name `ldb`
# presumably resolves to the module-level connection created near the end of
# the file -- verify once the missing lines are restored.
59 self.base_dn = self.find_basedn(ldb)
61 print "baseDN: %s\n" % self.base_dn
# Works on cn=nonurgenttest under cn=users.  After each step (create, modify
# description, delete -- per the inline comments) it expects uSNHighest and
# uSNUrgent of the base-DN partition to differ.
# NOTE(review): the leading numbers skip 66, 68, 77 and 80-81.  The call that
# opens the dict literal, the creation of `m`, the rest of the MessageElement
# call and the call that applies the modification are not visible in this copy.
63 def test_nonurgent_object(self):
64 """Test if the urgent replication is not activated
65 when handling a non urgent object"""
67 "dn": "cn=nonurgenttest,cn=users," + self.base_dn,
69 "samaccountname":"nonurgenttest",
70 "description":"nonurgenttest description"});
72 # urgent replication should not be enabled when creating
73 res = self.ldb.load_partition_usn(self.base_dn)
74 self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
76 # urgent replication should not be enabled when modifying
78 m.dn = Dn(ldb, "cn=nonurgenttest,cn=users," + self.base_dn)
79 m["description"] = MessageElement("new description", FLAG_MOD_REPLACE,
82 res = self.ldb.load_partition_usn(self.base_dn)
83 self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
85 # urgent replication should not be enabled when deleting
86 self.delete_force(self.ldb, "cn=nonurgenttest,cn=users," + self.base_dn)
87 res = self.ldb.load_partition_usn(self.base_dn)
88 self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
# Works on a "test server" entry and an "NTDS Settings test" child below it in
# the Configuration partition.  Expects uSNHighest == uSNUrgent after creating
# and after deleting the child, and uSNHighest != uSNUrgent after replacing its
# "options" attribute.  The server entry is removed at the end.
# NOTE(review): the line ending in `"systemFlags":"50000000", ["relax:0"]});`
# has the ["relax:0"] list inside the dict braces, which is not a valid dict
# display.  test_secret_object closes the dict first (`}, ["relax:0"]);`), so
# this likely should read `"systemFlags":"50000000"}, ["relax:0"]);`.
# NOTE(review): the leading numbers have gaps (94, 97-98, 100-101, 103,
# 105-106, 114, 117-118).  The calls that open the dict / LDIF string, part of
# the LDIF text, the creation of `m` and the call that applies the modification
# are not visible in this copy.
91 def test_nTDSDSA_object(self):
92 '''Test if the urgent replication is activated
93 when handling a nTDSDSA object'''
95 "dn": "cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self.base_dn,
96 "objectclass":"server",
99 "systemFlags":"50000000", ["relax:0"]});
102 """dn: cn=NTDS Settings test,cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration,%s""" % (self.base_dn) + """
104 cn: NTDS Settings test
107 systemFlags: 33554432""", ["relax:0"]);
109 # urgent replication should be enabled when creating
110 res = self.ldb.load_partition_usn("cn=Configuration," + self.base_dn)
111 self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
113 # urgent replication should NOT be enabled when modifying
115 m.dn = Dn(ldb, "cn=NTDS Settings test,cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self.base_dn)
116 m["options"] = MessageElement("0", FLAG_MOD_REPLACE,
119 res = self.ldb.load_partition_usn("cn=Configuration," + self.base_dn)
120 self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
122 # urgent replication should be enabled when deleting
123 self.delete_force(self.ldb, "cn=NTDS Settings test,cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self.base_dn)
124 res = self.ldb.load_partition_usn("cn=Configuration," + self.base_dn)
125 self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
# Clean up the parent server entry.
127 self.delete_force(self.ldb, "cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self.base_dn)
# Works on "CN=test crossRef" under CN=Partitions in the Configuration
# partition.  Expects uSNHighest == uSNUrgent after creating and after
# deleting it, and uSNHighest != uSNUrgent after replacing "systemFlags".
# NOTE(review): the line ending in `"systemFlags": "1", ["relax:0"]});` has the
# ["relax:0"] list inside the dict braces, which is not a valid dict display.
# test_secret_object closes the dict first (`}, ["relax:0"]);`), so this
# likely should read `"systemFlags": "1"}, ["relax:0"]);`.
# NOTE(review): the leading numbers skip 133, 138, 149 and 152-153.  The call
# that opens the dict literal, one dict entry, the creation of `m` and the call
# that applies the modification are not visible in this copy.
130 def test_crossRef_object(self):
131 '''Test if the urgent replication is activated
132 when handling a crossRef object'''
134 "dn": "CN=test crossRef,CN=Partitions,CN=Configuration,"+ self.base_dn,
135 "objectClass": "crossRef",
136 "cn": "test crossRef",
137 "dnsRoot": lp.get("realm").lower(),
139 "nCName": self.base_dn,
140 "showInAdvancedViewOnly": "TRUE",
141 "name": "test crossRef",
142 "systemFlags": "1", ["relax:0"]});
144 # urgent replication should be enabled when creating
145 res = self.ldb.load_partition_usn("cn=Configuration," + self.base_dn)
146 self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
148 # urgent replication should NOT be enabled when modifying
150 m.dn = Dn(ldb, "cn=test crossRef,CN=Partitions,CN=Configuration," + self.base_dn)
151 m["systemFlags"] = MessageElement("0", FLAG_MOD_REPLACE,
154 res = self.ldb.load_partition_usn("cn=Configuration," + self.base_dn)
155 self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
158 # urgent replication should be enabled when deleting
159 self.delete_force(self.ldb, "cn=test crossRef,CN=Partitions,CN=Configuration," + self.base_dn)
160 res = self.ldb.load_partition_usn("cn=Configuration," + self.base_dn)
161 self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
# Works on "CN=test attributeSchema" in the Schema partition, described by an
# LDIF string.  Expects uSNHighest == uSNUrgent after creating it and after
# replacing its lDAPDisplayName.
# NOTE(review): the leading numbers have gaps (168-170, 174, 181-183, 186,
# 190-191, 195, 198-199).  The "Not testing ..." print reads like the fallback
# branch of an exception handler, so a `try:` before the LDIF call and an
# `except` before the print are presumably among the missing lines, along with
# the call that opens the LDIF string, the creation of `m` and the call that
# applies the modification.
165 def test_attributeSchema_object(self):
166 '''Test if the urgent replication is activated
167 when handling an attributeSchema object'''
171 """dn: CN=test attributeSchema,cn=Schema,CN=Configuration,%s""" % self.base_dn + """
172 objectClass: attributeSchema
173 cn: test attributeSchema
175 isSingleValued: FALSE
176 showInAdvancedViewOnly: FALSE
177 attributeID: 0.9.2342.19200300.100.1.1
178 attributeSyntax: 2.5.5.12
179 adminDisplayName: test attributeSchema
180 adminDescription: test attributeSchema
184 lDAPDisplayName: test attributeSchema
185 name: test attributeSchema""");
187 # urgent replication should be enabled when creating
188 res = self.ldb.load_partition_usn("cn=Schema,cn=Configuration," + self.base_dn)
189 self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
192 print "Not testing urgent replication when creating attributeSchema object ...\n"
194 # urgent replication should be enabled when modifying
196 m.dn = Dn(ldb, "CN=test attributeSchema,CN=Schema,CN=Configuration," + self.base_dn)
197 m["lDAPDisplayName"] = MessageElement("updated test attributeSchema", FLAG_MOD_REPLACE,
200 res = self.ldb.load_partition_usn("cn=Schema,cn=Configuration," + self.base_dn)
201 self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
# Works on "CN=test classSchema" in the Schema partition, described by an LDIF
# string.  Expects uSNHighest == uSNUrgent after creating it and after
# replacing its lDAPDisplayName.
# NOTE(review): the leading numbers have gaps (207-208, 211-213, 215, 222,
# 227, 233-234, 238, 241-242).  The "Not testing ..." print reads like the
# fallback branch of an exception handler, so a `try:` before the LDIF call and
# an `except` before the print are presumably among the missing lines, along
# with part of the LDIF text, the creation of `m` and the call that applies the
# modification.
204 def test_classSchema_object(self):
205 '''Test if the urgent replication is activated
206 when handling a classSchema object'''
209 """dn: CN=test classSchema,CN=Schema,CN=Configuration,%s""" % self.base_dn + """
210 objectClass: classSchema
214 governsID: 1.2.840.113556.1.5.999
216 showInAdvancedViewOnly: TRUE
217 adminDisplayName: test classSchema
218 adminDescription: test classSchema
219 objectClassCategory: 1
220 lDAPDisplayName: test classSchema
221 name: test classSchema
223 systemPossSuperiors: dfsConfiguration
224 systemMustContain: msDFS-SchemaMajorVersion
225 defaultSecurityDescriptor: D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)(A;;RPWPCRCCD
226 CLCLORCWOWDSDDTSW;;;SY)(A;;RPLCLORC;;;AU)(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;CO)
228 defaultHidingValue: TRUE""");
230 # urgent replication should be enabled when creating
231 res = self.ldb.load_partition_usn("cn=Schema,cn=Configuration," + self.base_dn)
232 self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
235 print "Not testing urgent replication when creating classSchema object ...\n"
237 # urgent replication should be enabled when modifying
239 m.dn = Dn(ldb, "CN=test classSchema,CN=Schema,CN=Configuration," + self.base_dn)
240 m["lDAPDisplayName"] = MessageElement("updated test classSchema", FLAG_MOD_REPLACE,
243 res = self.ldb.load_partition_usn("cn=Schema,cn=Configuration," + self.base_dn)
244 self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
# Works on "cn=test secret" under cn=System.  Expects uSNHighest == uSNUrgent
# after creating it and after replacing "currentValue", and
# uSNHighest != uSNUrgent after deleting it.
# NOTE(review): the leading numbers skip 250-251, 254, 263 and 266-267.  The
# call that opens the dict literal, one dict entry, the creation of `m` and the
# call that applies the modification are not visible in this copy.
247 def test_secret_object(self):
248 '''Test if the urgent replication is activated
249 when handling a secret object'''
252 "dn": "cn=test secret,cn=System," + self.base_dn,
253 "objectClass":"secret",
255 "name":"test secret",
256 "currentValue":"xxxxxxx"}, ["relax:0"]);
258 # urgent replication should be enabled when creating
259 res = self.ldb.load_partition_usn(self.base_dn)
260 self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
262 # urgent replication should be enabled when modifying
264 m.dn = Dn(ldb, "cn=test secret,cn=System," + self.base_dn)
265 m["currentValue"] = MessageElement("yyyyyyyy", FLAG_MOD_REPLACE,
268 res = self.ldb.load_partition_usn(self.base_dn)
269 self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
271 # urgent replication should NOT be enabled when deleting
272 self.delete_force(self.ldb, "cn=test secret,cn=System," + self.base_dn)
273 res = self.ldb.load_partition_usn(self.base_dn)
274 self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
# Works on "CN=RID Manager test" under CN=System, described by an LDIF string
# passed together with ["relax:0"].  Expects uSNHighest == uSNUrgent after
# creating it and after replacing "systemFlags", and uSNHighest != uSNUrgent
# after deleting it.
# NOTE(review): the leading numbers skip 280, 283-284, 296 and 299-300.  The
# call that opens the LDIF string, part of the LDIF text, the creation of `m`
# and the call that applies the modification are not visible in this copy.
277 def test_rIDManager_object(self):
278 '''Test if the urgent replication is activated
279 when handling a rIDManager object'''
281 """dn: CN=RID Manager test,CN=System,%s""" % self.base_dn + """
282 objectClass: rIDManager
285 showInAdvancedViewOnly: TRUE
286 name: RID Manager test
287 systemFlags: -1946157056
288 isCriticalSystemObject: TRUE
289 rIDAvailablePool: 133001-1073741823""", ["relax:0"])
291 # urgent replication should be enabled when creating
292 res = self.ldb.load_partition_usn(self.base_dn)
293 self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
295 # urgent replication should be enabled when modifying
297 m.dn = Dn(ldb, "CN=RID Manager test,CN=System," + self.base_dn)
298 m["systemFlags"] = MessageElement("0", FLAG_MOD_REPLACE,
301 res = self.ldb.load_partition_usn(self.base_dn)
302 self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
304 # urgent replication should NOT be enabled when deleting
305 self.delete_force(self.ldb, "CN=RID Manager test,CN=System," + self.base_dn)
306 res = self.ldb.load_partition_usn(self.base_dn)
307 self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
# Works on "cn=user UrgAttr test" under cn=users.  Expects
# uSNHighest != uSNUrgent after creating it, after replacing "description" and
# after deleting it, and uSNHighest == uSNUrgent after replacing each of
# userAccountControl, lockoutTime and pwdLastSet.
# NOTE(review): the leading numbers have gaps (313-314, 319-320, 328, 332, 337,
# 340-341, 346, 349-350, 355-356, 360).  The call that opens the dict literal,
# some dict entries, each creation of `m`, some MessageElement continuation
# lines and the calls that apply each modification are not visible in this
# copy.
310 def test_urgent_attributes(self):
311 '''Test if the urgent replication is activated
312 when handling urgent attributes of an object'''
315 "dn": "cn=user UrgAttr test,cn=users," + self.base_dn,
316 "objectclass":"user",
317 "samaccountname":"user UrgAttr test",
318 "userAccountControl":"1",
321 "description":"urgent attributes test description"});
323 # urgent replication should NOT be enabled when creating
324 res = self.ldb.load_partition_usn(self.base_dn)
325 self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
327 # urgent replication should be enabled when modifying userAccountControl
329 m.dn = Dn(ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
330 m["userAccountControl"] = MessageElement("0", FLAG_MOD_REPLACE,
331 "userAccountControl")
333 res = self.ldb.load_partition_usn(self.base_dn)
334 self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
336 # urgent replication should be enabled when modifying lockoutTime
338 m.dn = Dn(ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
339 m["lockoutTime"] = MessageElement("1", FLAG_MOD_REPLACE,
342 res = self.ldb.load_partition_usn(self.base_dn)
343 self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
345 # urgent replication should be enabled when modifying pwdLastSet
347 m.dn = Dn(ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
348 m["pwdLastSet"] = MessageElement("1", FLAG_MOD_REPLACE,
351 res = self.ldb.load_partition_usn(self.base_dn)
352 self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
354 # urgent replication should NOT be enabled when modifying a non-urgent attribute (description)
357 m.dn = Dn(ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
358 m["description"] = MessageElement("updated urgent attributes test description",
359 FLAG_MOD_REPLACE, "description")
361 res = self.ldb.load_partition_usn(self.base_dn)
362 self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
364 # urgent replication should NOT be enabled when deleting
365 self.delete_force(self.ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
366 res = self.ldb.load_partition_usn(self.base_dn)
367 self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
# Turn `host` into a URL when it carries no "://" scheme: a path naming an
# existing file gets the "tdb://" prefix, and "ldap://" is the other prefix
# applied.
# NOTE(review): the leading numbers skip 373, presumably the `else:` separating
# the two assignments.  As visible here the second assignment would follow the
# first unconditionally.
370 if not "://" in host:
371 if os.path.isfile(host):
372 host = "tdb://%s" % host
374 host = "ldap://%s" % host
# Module-level SamDB connection; the test methods pass the bare name `ldb` to
# Dn().  The remaining argument line(s) of this call (378-379) are not visible
# in this copy.
377 ldb = SamDB(host, credentials=creds, session_info=system_session(), lp=lp,
# Run the whole UrgentReplicationTests suite through the subunit runner.
380 runner = SubunitTestRunner()
# NOTE(review): the body of this `if` lies beyond the visible lines.  Presumably
# it records a failing exit status -- confirm against the full file.
382 if not runner.run(unittest.makeSuite(UrgentReplicationTests)).wasSuccessful():