2 # -*- coding: utf-8 -*-
8 sys.path.append("bin/python")
10 samba.ensure_external_module("subunit", "subunit/python")
11 samba.ensure_external_module("testtools", "testtools")
13 import samba.getopt as options
15 from samba.auth import system_session
16 from ldb import (SCOPE_BASE, LdbError, ERR_NO_SUCH_OBJECT, Message,
17 MessageElement, Dn, FLAG_MOD_REPLACE)
18 from samba.samdb import SamDB
21 from subunit.run import SubunitTestRunner
24 parser = optparse.OptionParser("urgent_replication [options] <host>")
25 sambaopts = options.SambaOptions(parser)
26 parser.add_option_group(sambaopts)
27 parser.add_option_group(options.VersionOptions(parser))
28 # use command line creds if available
29 credopts = options.CredentialsOptions(parser)
30 parser.add_option_group(credopts)
31 opts, args = parser.parse_args()
39 lp = sambaopts.get_loadparm()
40 creds = credopts.get_credentials(lp)
42 class UrgentReplicationTests(samba.tests.TestCase):
44 def delete_force(self, ldb, dn):
46 ldb.delete(dn, ["relax:0"])
47 except LdbError, (num, _):
48 self.assertEquals(num, ERR_NO_SUCH_OBJECT)
50 def find_basedn(self, ldb):
51 res = ldb.search(base="", expression="", scope=SCOPE_BASE,
52 attrs=["defaultNamingContext"])
53 self.assertEquals(len(res), 1)
54 return res[0]["defaultNamingContext"][0]
57 super(UrgentReplicationTests, self).setUp()
59 self.base_dn = self.find_basedn(ldb)
61 print "baseDN: %s\n" % self.base_dn
63 def test_nonurgent_object(self):
64 """Test if the urgent replication is not activated
65 when handling a non urgent object"""
67 "dn": "cn=nonurgenttest,cn=users," + self.base_dn,
69 "samaccountname":"nonurgenttest",
70 "description":"nonurgenttest description"});
72 # urgent replication should not be enabled when creating
73 res = self.ldb.load_partition_usn(self.base_dn)
74 self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
76 # urgent replication should not be enabled when modifying
78 m.dn = Dn(ldb, "cn=nonurgenttest,cn=users," + self.base_dn)
79 m["description"] = MessageElement("new description", FLAG_MOD_REPLACE,
82 res = self.ldb.load_partition_usn(self.base_dn)
83 self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
85 # urgent replication should not be enabled when deleting
86 self.delete_force(self.ldb, "cn=nonurgenttest,cn=users," + self.base_dn)
87 res = self.ldb.load_partition_usn(self.base_dn)
88 self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
91 def test_nTDSDSA_object(self):
92 '''Test if the urgent replication is activated
93 when handling a nTDSDSA object'''
95 "dn": "cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self.base_dn,
96 "objectclass":"server",
99 "systemFlags":"50000000"});
102 """dn: cn=NTDS Settings test,cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration,%s""" % (self.base_dn) + """
104 cn: NTDS Settings test
107 systemFlags: 33554432""", ["relax:0"]);
109 # urgent replication should be enabled when creation
110 res = self.ldb.load_partition_usn("cn=Configuration," + self.base_dn)
111 self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
113 # urgent replication should NOT be enabled when modifying
115 m.dn = Dn(ldb, "cn=NTDS Settings test,cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self.base_dn)
116 m["options"] = MessageElement("0", FLAG_MOD_REPLACE,
119 res = self.ldb.load_partition_usn("cn=Configuration," + self.base_dn)
120 self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
122 # urgent replication should be enabled when deleting
123 self.delete_force(self.ldb, "cn=NTDS Settings test,cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self.base_dn)
124 res = self.ldb.load_partition_usn("cn=Configuration," + self.base_dn)
125 self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
127 self.delete_force(self.ldb, "cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self.base_dn)
130 def test_crossRef_object(self):
131 '''Test if the urgent replication is activated
132 when handling a crossRef object'''
134 "dn": "CN=test crossRef,CN=Partitions,CN=Configuration,"+ self.base_dn,
135 "objectClass": "crossRef",
136 "cn": "test crossRef",
137 "dnsRoot": lp.get("realm").lower(),
139 "nCName": self.base_dn,
140 "showInAdvancedViewOnly": "TRUE",
141 "name": "test crossRef",
142 "systemFlags": "1"});
144 # urgent replication should be enabled when creating
145 res = self.ldb.load_partition_usn("cn=Configuration," + self.base_dn)
146 self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
148 # urgent replication should NOT be enabled when modifying
150 m.dn = Dn(ldb, "cn=test crossRef,CN=Partitions,CN=Configuration," + self.base_dn)
151 m["systemFlags"] = MessageElement("0", FLAG_MOD_REPLACE,
154 res = self.ldb.load_partition_usn("cn=Configuration," + self.base_dn)
155 self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
158 # urgent replication should be enabled when deleting
159 self.delete_force(self.ldb, "cn=test crossRef,CN=Partitions,CN=Configuration," + self.base_dn)
160 res = self.ldb.load_partition_usn("cn=Configuration," + self.base_dn)
161 self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
165 def test_attributeSchema_object(self):
166 '''Test if the urgent replication is activated
167 when handling an attributeSchema object'''
171 """dn: CN=test attributeSchema,cn=Schema,CN=Configuration,%s""" % self.base_dn + """
172 objectClass: attributeSchema
173 cn: test attributeSchema
175 isSingleValued: FALSE
176 showInAdvancedViewOnly: FALSE
177 attributeID: 0.9.2342.19200300.100.1.1
178 attributeSyntax: 2.5.5.12
179 adminDisplayName: test attributeSchema
180 adminDescription: test attributeSchema
184 lDAPDisplayName: test attributeSchema
185 name: test attributeSchema
188 # urgent replication should be enabled when creating
189 res = self.ldb.load_partition_usn("cn=Schema,cn=Configuration," + self.base_dn)
190 self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
193 print "Not testing urgent replication when creating attributeSchema object ...\n"
195 # urgent replication should be enabled when modifying
197 m.dn = Dn(ldb, "CN=test attributeSchema,CN=Schema,CN=Configuration," + self.base_dn)
198 m["lDAPDisplayName"] = MessageElement("updated test attributeSchema", FLAG_MOD_REPLACE,
201 res = self.ldb.load_partition_usn("cn=Schema,cn=Configuration," + self.base_dn)
202 self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
205 def test_classSchema_object(self):
206 '''Test if the urgent replication is activated
207 when handling a classSchema object'''
210 """dn: CN=test classSchema,CN=Schema,CN=Configuration,%s""" % self.base_dn + """
211 objectClass: classSchema
215 governsID: 1.2.840.113556.1.5.999
217 showInAdvancedViewOnly: TRUE
218 adminDisplayName: test classSchema
219 adminDescription: test classSchema
220 objectClassCategory: 1
221 lDAPDisplayName: test classSchema
222 name: test classSchema
224 systemPossSuperiors: dfsConfiguration
225 systemMustContain: msDFS-SchemaMajorVersion
226 defaultSecurityDescriptor: D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)(A;;RPWPCRCCD
227 CLCLORCWOWDSDDTSW;;;SY)(A;;RPLCLORC;;;AU)(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;CO)
229 defaultHidingValue: TRUE""");
231 # urgent replication should be enabled when creating
232 res = self.ldb.load_partition_usn("cn=Schema,cn=Configuration," + self.base_dn)
233 self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
236 print "Not testing urgent replication when creating classSchema object ...\n"
238 # urgent replication should be enabled when modifying
240 m.dn = Dn(ldb, "CN=test classSchema,CN=Schema,CN=Configuration," + self.base_dn)
241 m["lDAPDisplayName"] = MessageElement("updated test classSchema", FLAG_MOD_REPLACE,
244 res = self.ldb.load_partition_usn("cn=Schema,cn=Configuration," + self.base_dn)
245 self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
248 def test_secret_object(self):
249 '''Test if the urgent replication is activated
250 when handling a secret object'''
253 "dn": "cn=test secret,cn=System," + self.base_dn,
254 "objectClass":"secret",
256 "name":"test secret",
257 "currentValue":"xxxxxxx"}, ["relax:0"]);
259 # urgent replication should be enabled when creating
260 res = self.ldb.load_partition_usn(self.base_dn)
261 self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
263 # urgent replication should be enabled when modifying
265 m.dn = Dn(ldb, "cn=test secret,cn=System," + self.base_dn)
266 m["currentValue"] = MessageElement("yyyyyyyy", FLAG_MOD_REPLACE,
269 res = self.ldb.load_partition_usn(self.base_dn)
270 self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
272 # urgent replication should NOT be enabled when deleting
273 self.delete_force(self.ldb, "cn=test secret,cn=System," + self.base_dn)
274 res = self.ldb.load_partition_usn(self.base_dn)
275 self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
278 def test_rIDManager_object(self):
279 '''Test if the urgent replication is activated
280 when handling a rIDManager object'''
282 """dn: CN=RID Manager test,CN=System,%s""" % self.base_dn + """
283 objectClass: rIDManager
286 showInAdvancedViewOnly: TRUE
287 name: RID Manager test
288 systemFlags: -1946157056
289 isCriticalSystemObject: TRUE
290 rIDAvailablePool: 133001-1073741823""", ["relax:0"])
292 # urgent replication should be enabled when creating
293 res = self.ldb.load_partition_usn(self.base_dn)
294 self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
296 # urgent replication should be enabled when modifying
298 m.dn = Dn(ldb, "CN=RID Manager test,CN=System," + self.base_dn)
299 m["systemFlags"] = MessageElement("0", FLAG_MOD_REPLACE,
302 res = self.ldb.load_partition_usn(self.base_dn)
303 self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
305 # urgent replication should NOT be enabled when deleting
306 self.delete_force(self.ldb, "CN=RID Manager test,CN=System," + self.base_dn)
307 res = self.ldb.load_partition_usn(self.base_dn)
308 self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
311 def test_urgent_attributes(self):
312 '''Test if the urgent replication is activated
313 when handling urgent attributes of an object'''
316 "dn": "cn=user UrgAttr test,cn=users," + self.base_dn,
317 "objectclass":"user",
318 "samaccountname":"user UrgAttr test",
319 "userAccountControl":"1",
322 "description":"urgent attributes test description"});
324 # urgent replication should NOT be enabled when creating
325 res = self.ldb.load_partition_usn(self.base_dn)
326 self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
328 # urgent replication should be enabled when modifying userAccountControl
330 m.dn = Dn(ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
331 m["userAccountControl"] = MessageElement("0", FLAG_MOD_REPLACE,
332 "userAccountControl")
334 res = self.ldb.load_partition_usn(self.base_dn)
335 self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
337 # urgent replication should be enabled when modifying lockoutTime
339 m.dn = Dn(ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
340 m["lockoutTime"] = MessageElement("1", FLAG_MOD_REPLACE,
343 res = self.ldb.load_partition_usn(self.base_dn)
344 self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
346 # urgent replication should be enabled when modifying pwdLastSet
348 m.dn = Dn(ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
349 m["pwdLastSet"] = MessageElement("1", FLAG_MOD_REPLACE,
352 res = self.ldb.load_partition_usn(self.base_dn)
353 self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
355 # urgent replication should NOT be enabled when modifying a not-urgent
358 m.dn = Dn(ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
359 m["description"] = MessageElement("updated urgent attributes test description",
360 FLAG_MOD_REPLACE, "description")
362 res = self.ldb.load_partition_usn(self.base_dn)
363 self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
365 # urgent replication should NOT be enabled when deleting
366 self.delete_force(self.ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
367 res = self.ldb.load_partition_usn(self.base_dn)
368 self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
371 if not "://" in host:
372 if os.path.isfile(host):
373 host = "tdb://%s" % host
375 host = "ldap://%s" % host
378 ldb = SamDB(host, credentials=creds, session_info=system_session(), lp=lp,
381 runner = SubunitTestRunner()
383 if not runner.run(unittest.makeSuite(UrgentReplicationTests)).wasSuccessful():