2 # -*- coding: utf-8 -*-
6 sys.path.insert(0, "bin/python")
8 from samba.tests.subunitrun import TestProgram, SubunitOptions
10 from ldb import (LdbError, ERR_NO_SUCH_OBJECT, Message,
11 MessageElement, Dn, FLAG_MOD_REPLACE)
13 import samba.dsdb as dsdb
14 import samba.getopt as options
# Command-line handling: one positional <host> plus the standard Samba,
# version, credentials and subunit option groups.
parser = optparse.OptionParser("urgent_replication.py [options] <host>")

# add_option_group() hands back the group it was given, so each group can be
# built, registered and (where it is needed later) bound in one statement.
# The construction/registration order is the same as before.
sambaopts = parser.add_option_group(options.SambaOptions(parser))
parser.add_option_group(options.VersionOptions(parser))

# use command line creds if available
credopts = parser.add_option_group(options.CredentialsOptions(parser))
subunitopts = parser.add_option_group(SubunitOptions(parser))

opts, args = parser.parse_args()
36 class UrgentReplicationTests(samba.tests.TestCase):
def delete_force(self, ldb, dn):
    """Delete ``dn`` through ``ldb``, tolerating an entry that does not exist.

    The delete is issued with the ``relax:0`` control.  If it raises
    ``LdbError``, the error number must be ``ERR_NO_SUCH_OBJECT``; any other
    error number fails the calling test through the assertion.

    :param ldb: connection object providing ``delete(dn, controls)``
    :param dn: distinguished name of the entry to remove
    """
    try:
        ldb.delete(dn, ["relax:0"])
    except LdbError as err:
        # The tuple-unpacking form ``except LdbError, (num, _)`` only parses
        # on Python 2.  Unpacking ``err.args`` gives the same two values and
        # is valid on Python 2.6+ as well as Python 3.
        (num, _) = err.args
        # assertEqual replaces the deprecated assertEquals alias.
        self.assertEqual(num, ERR_NO_SUCH_OBJECT)
# NOTE(review): the `def` line that owns these statements is not part of this
# extract; the super() call suggests they are the body of setUp() — confirm.
45 super(UrgentReplicationTests, self).setUp()
# Open the SamDB connection used by every test and keep it on the instance.
# `host` is defined outside the visible lines.
46 self.ldb = samba.tests.connect_samdb(host, global_schema=False)
# Cache the domain DN; the tests build their object DNs from it.
47 self.base_dn = self.ldb.domain_dn()
# NOTE(review): Python 2 print statement. print("baseDN: %s\n" % self.base_dn)
# prints the same text on Python 2 and also parses on Python 3.
49 print "baseDN: %s\n" % self.base_dn
51 def test_nonurgent_object(self):
52 """Test if the urgent replication is not activated when handling a non urgent object."""
# NOTE(review): the statement that opens the dict below is missing from this
# extract (presumably a self.ldb.add({ call), and at least one entry of the
# dict is missing too — confirm against the full file.
54 "dn": "cn=nonurgenttest,cn=users," + self.base_dn,
56 "samaccountname":"nonurgenttest",
57 "description":"nonurgenttest description"})
# Each check below reloads the USN counters of the domain partition and
# compares uSNHighest with uSNUrgent; this test expects them to differ after
# every step (create, modify, delete).
59 # urgent replication should not be enabled when creating
60 res = self.ldb.load_partition_usn(self.base_dn)
61 self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"])
63 # urgent replication should not be enabled when modifying
# NOTE(review): `m` is created on a line not shown here, the MessageElement
# call below is cut off before its last argument, and the call that submits
# `m` is not visible either.
65 m.dn = Dn(self.ldb, "cn=nonurgenttest,cn=users," + self.base_dn)
66 m["description"] = MessageElement("new description", FLAG_MOD_REPLACE,
69 res = self.ldb.load_partition_usn(self.base_dn)
70 self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"])
72 # urgent replication should not be enabled when deleting
73 self.delete_force(self.ldb, "cn=nonurgenttest,cn=users," + self.base_dn)
74 res = self.ldb.load_partition_usn(self.base_dn)
75 self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"])
77 def test_nTDSDSA_object(self):
78 """Test if the urgent replication is activated when handling a nTDSDSA object."""
# The dict below describes a "test server" entry under the Sites container;
# the LDIF text after it describes an "NTDS Settings test" entry beneath that
# server. Both are passed together with the "relax:0" control.
# NOTE(review): the calls that submit the two literals are not visible in
# this extract, and some lines of each literal are missing — confirm against
# the full file before editing them.
80 "dn": "cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,%s" %
81 self.ldb.get_config_basedn(),
82 "objectclass":"server",
85 "systemFlags":"50000000"}, ["relax:0"])
88 """dn: cn=NTDS Settings test,cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration,%s""" % (self.base_dn) + """
90 cn: NTDS Settings test
93 systemFlags: 33554432""", ["relax:0"])
# The checks read the USN counters of the configuration partition; equal
# uSNHighest and uSNUrgent values are what this test treats as "the last
# change was flagged urgent".
95 # urgent replication should be enabled when creating
96 res = self.ldb.load_partition_usn("cn=Configuration," + self.base_dn)
97 self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
99 # urgent replication should NOT be enabled when modifying
# NOTE(review): `m` is created on a line not shown here, the MessageElement
# call below is cut off before its last argument, and the call that submits
# `m` is not visible either.
101 m.dn = Dn(self.ldb, "cn=NTDS Settings test,cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self.base_dn)
102 m["options"] = MessageElement("0", FLAG_MOD_REPLACE,
105 res = self.ldb.load_partition_usn("cn=Configuration," + self.base_dn)
106 self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"])
108 # urgent replication should be enabled when deleting
109 self.delete_force(self.ldb, "cn=NTDS Settings test,cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self.base_dn)
110 res = self.ldb.load_partition_usn("cn=Configuration," + self.base_dn)
111 self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
# Also remove the parent "test server" entry; no USN check follows this delete.
113 self.delete_force(self.ldb, "cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self.base_dn)
115 def test_crossRef_object(self):
116 """Test if the urgent replication is activated when handling a crossRef object."""
# The dict below describes a crossRef entry under CN=Partitions in the
# configuration partition; it is passed together with the "relax:0" control.
# NOTE(review): the statement that opens the dict is missing from this
# extract (presumably a self.ldb.add({ call), and at least one entry of the
# dict is missing too — confirm against the full file.
118 "dn": "CN=test crossRef,CN=Partitions,CN=Configuration,"+ self.base_dn,
119 "objectClass": "crossRef",
120 "cn": "test crossRef",
121 "dnsRoot": self.get_loadparm().get("realm").lower(),
123 "nCName": self.base_dn,
124 "showInAdvancedViewOnly": "TRUE",
125 "name": "test crossRef",
126 "systemFlags": "1"}, ["relax:0"])
# The checks read the USN counters of the configuration partition; equal
# uSNHighest and uSNUrgent values are what this test treats as "the last
# change was flagged urgent".
128 # urgent replication should be enabled when creating
129 res = self.ldb.load_partition_usn("cn=Configuration," + self.base_dn)
130 self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
132 # urgent replication should NOT be enabled when modifying
# NOTE(review): `m` is created on a line not shown here, the MessageElement
# call below is cut off before its last argument, and the call that submits
# `m` is not visible either.
134 m.dn = Dn(self.ldb, "cn=test crossRef,CN=Partitions,CN=Configuration," + self.base_dn)
135 m["systemFlags"] = MessageElement("0", FLAG_MOD_REPLACE,
138 res = self.ldb.load_partition_usn("cn=Configuration," + self.base_dn)
139 self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"])
142 # urgent replication should be enabled when deleting
143 self.delete_force(self.ldb, "cn=test crossRef,CN=Partitions,CN=Configuration," + self.base_dn)
144 res = self.ldb.load_partition_usn("cn=Configuration," + self.base_dn)
145 self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
147 def test_attributeSchema_object(self):
148 """Test if the urgent replication is activated when handling an attributeSchema object"""
# The LDIF text below describes an attributeSchema entry under the schema
# container. Its attributeID ends in a random number between 1 and 100000.
# NOTE(review): the call that receives this LDIF is not visible in this
# extract, and several LDIF lines are missing — confirm against the full file
# before editing the literal.
151 """dn: CN=test attributeSchema,cn=Schema,CN=Configuration,%s""" % self.base_dn + """
152 objectClass: attributeSchema
153 cn: test attributeSchema
155 isSingleValued: FALSE
156 showInAdvancedViewOnly: FALSE
157 attributeID: 1.3.6.1.4.1.7165.4.6.1.4.""" + str(random.randint(1,100000)) + """
158 attributeSyntax: 2.5.5.12
159 adminDisplayName: test attributeSchema
160 adminDescription: test attributeSchema
164 lDAPDisplayName: testAttributeSchema
165 name: test attributeSchema""")
# The checks read the USN counters of the schema partition; equal uSNHighest
# and uSNUrgent values are what this test treats as "the last change was
# flagged urgent". There is no delete step in the visible lines.
167 # urgent replication should be enabled when creating
168 res = self.ldb.load_partition_usn("cn=Schema,cn=Configuration," + self.base_dn)
169 self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
171 # urgent replication should be enabled when modifying
# NOTE(review): `m` is created on a line not shown here, the MessageElement
# call below is cut off before its last argument, and the call that submits
# `m` is not visible either.
173 m.dn = Dn(self.ldb, "CN=test attributeSchema,CN=Schema,CN=Configuration," + self.base_dn)
174 m["lDAPDisplayName"] = MessageElement("updatedTestAttributeSchema", FLAG_MOD_REPLACE,
177 res = self.ldb.load_partition_usn("cn=Schema,cn=Configuration," + self.base_dn)
178 self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
180 def test_classSchema_object(self):
181 """Test if the urgent replication is activated when handling a classSchema object."""
# The LDIF text below describes a classSchema entry under the schema
# container. Its governsId ends in a random number between 1 and 100000.
# NOTE(review): the call that receives this LDIF is not visible in this
# extract, and several LDIF lines are missing — confirm against the full file
# before editing the literal.
184 """dn: CN=test classSchema,CN=Schema,CN=Configuration,%s""" % self.base_dn + """
185 objectClass: classSchema
189 governsId: 1.3.6.1.4.1.7165.4.6.2.4.""" + str(random.randint(1,100000)) + """
191 showInAdvancedViewOnly: TRUE
192 adminDisplayName: test classSchema
193 adminDescription: test classSchema
194 objectClassCategory: 1
195 lDAPDisplayName: testClassSchema
196 name: test classSchema
198 systemPossSuperiors: dfsConfiguration
199 systemMustContain: msDFS-SchemaMajorVersion
200 defaultSecurityDescriptor: D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)(A;;RPWPCRCCD
201 CLCLORCWOWDSDDTSW;;;SY)(A;;RPLCLORC;;;AU)(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;CO)
203 defaultHidingValue: TRUE""")
# The checks read the USN counters of the schema partition; equal uSNHighest
# and uSNUrgent values are what this test treats as "the last change was
# flagged urgent".
205 # urgent replication should be enabled when creating
206 res = self.ldb.load_partition_usn("cn=Schema,cn=Configuration," + self.base_dn)
207 self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
# NOTE(review): lines are missing between the assertion above and the print
# below; the message suggests the print belongs to a fallback branch (e.g. an
# except clause) whose header is not shown — confirm. The print itself is a
# Python 2 print statement.
210 print "Not testing urgent replication when creating classSchema object ...\n"
212 # urgent replication should be enabled when modifying
# NOTE(review): `m` is created on a line not shown here, the MessageElement
# call below is cut off before its last argument, and the call that submits
# `m` is not visible either.
214 m.dn = Dn(self.ldb, "CN=test classSchema,CN=Schema,CN=Configuration," + self.base_dn)
215 m["lDAPDisplayName"] = MessageElement("updatedTestClassSchema", FLAG_MOD_REPLACE,
218 res = self.ldb.load_partition_usn("cn=Schema,cn=Configuration," + self.base_dn)
219 self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
221 def test_secret_object(self):
222 """Test if the urgent replication is activated when handling a secret object."""
# The dict below describes a "test secret" entry under cn=System in the
# domain partition.
# NOTE(review): the statement that opens the dict is missing from this
# extract (presumably a self.ldb.add({ call), and at least one entry of the
# dict is missing too — confirm against the full file.
225 "dn": "cn=test secret,cn=System," + self.base_dn,
226 "objectClass":"secret",
228 "name":"test secret",
229 "currentValue":"xxxxxxx"})
# The checks read the USN counters of the domain partition; equal uSNHighest
# and uSNUrgent values are what this test treats as "the last change was
# flagged urgent".
231 # urgent replication should be enabled when creating
232 res = self.ldb.load_partition_usn(self.base_dn)
233 self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
235 # urgent replication should be enabled when modifying
# NOTE(review): `m` is created on a line not shown here, the MessageElement
# call below is cut off before its last argument, and the call that submits
# `m` is not visible either.
237 m.dn = Dn(self.ldb, "cn=test secret,cn=System," + self.base_dn)
238 m["currentValue"] = MessageElement("yyyyyyyy", FLAG_MOD_REPLACE,
241 res = self.ldb.load_partition_usn(self.base_dn)
242 self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
244 # urgent replication should NOT be enabled when deleting
245 self.delete_force(self.ldb, "cn=test secret,cn=System," + self.base_dn)
246 res = self.ldb.load_partition_usn(self.base_dn)
247 self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"])
249 def test_rIDManager_object(self):
250 """Test if the urgent replication is activated when handling a rIDManager object."""
# The LDIF text below describes a "RID Manager test" entry under CN=System in
# the domain partition; it is passed together with the "relax:0" control.
# NOTE(review): the call that receives this LDIF is not visible in this
# extract, and some LDIF lines are missing — confirm against the full file
# before editing the literal.
252 """dn: CN=RID Manager test,CN=System,%s""" % self.base_dn + """
253 objectClass: rIDManager
256 showInAdvancedViewOnly: TRUE
257 name: RID Manager test
258 systemFlags: -1946157056
259 isCriticalSystemObject: TRUE
260 rIDAvailablePool: 133001-1073741823""", ["relax:0"])
# The checks read the USN counters of the domain partition; equal uSNHighest
# and uSNUrgent values are what this test treats as "the last change was
# flagged urgent".
262 # urgent replication should be enabled when creating
263 res = self.ldb.load_partition_usn(self.base_dn)
264 self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
266 # urgent replication should be enabled when modifying
# NOTE(review): `m` is created on a line not shown here, the MessageElement
# call below is cut off before its last argument, and the call that submits
# `m` is not visible either.
268 m.dn = Dn(self.ldb, "CN=RID Manager test,CN=System," + self.base_dn)
269 m["systemFlags"] = MessageElement("0", FLAG_MOD_REPLACE,
272 res = self.ldb.load_partition_usn(self.base_dn)
273 self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
275 # urgent replication should NOT be enabled when deleting
276 self.delete_force(self.ldb, "CN=RID Manager test,CN=System," + self.base_dn)
277 res = self.ldb.load_partition_usn(self.base_dn)
278 self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"])
280 def test_urgent_attributes(self):
281 """Test if the urgent replication is activated when handling urgent attributes of an object."""
# The dict below describes a normal user account under cn=users in the domain
# partition.
# NOTE(review): the statement that opens the dict is missing from this
# extract (presumably a self.ldb.add({ call), and some entries of the dict
# are missing too — confirm against the full file.
284 "dn": "cn=user UrgAttr test,cn=users," + self.base_dn,
285 "objectclass":"user",
286 "samaccountname":"user UrgAttr test",
287 "userAccountControl":str(dsdb.UF_NORMAL_ACCOUNT),
290 "description":"urgent attributes test description"})
# The checks read the USN counters of the domain partition; equal uSNHighest
# and uSNUrgent values are what this test treats as "the last change was
# flagged urgent". userAccountControl, lockoutTime and pwdLastSet are
# expected to trigger it; description is not.
292 # urgent replication should NOT be enabled when creating
293 res = self.ldb.load_partition_usn(self.base_dn)
294 self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"])
296 # urgent replication should be enabled when modifying userAccountControl
# NOTE(review): for every modification in this test, `m` is created on a line
# not shown here and the call that submits `m` is not visible either.
298 m.dn = Dn(self.ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
# Replace the flags with UF_NORMAL_ACCOUNT plus UF_DONT_EXPIRE_PASSWD.
299 m["userAccountControl"] = MessageElement(str(dsdb.UF_NORMAL_ACCOUNT+dsdb.UF_DONT_EXPIRE_PASSWD), FLAG_MOD_REPLACE,
300 "userAccountControl")
302 res = self.ldb.load_partition_usn(self.base_dn)
303 self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
305 # urgent replication should be enabled when modifying lockoutTime
307 m.dn = Dn(self.ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
# NOTE(review): this MessageElement call is cut off before its last argument.
308 m["lockoutTime"] = MessageElement("1", FLAG_MOD_REPLACE,
311 res = self.ldb.load_partition_usn(self.base_dn)
312 self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
314 # urgent replication should be enabled when modifying pwdLastSet
316 m.dn = Dn(self.ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
# NOTE(review): this MessageElement call is cut off before its last argument.
317 m["pwdLastSet"] = MessageElement("-1", FLAG_MOD_REPLACE,
320 res = self.ldb.load_partition_usn(self.base_dn)
321 self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
323 # urgent replication should NOT be enabled when modifying a not-urgent
326 m.dn = Dn(self.ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
327 m["description"] = MessageElement("updated urgent attributes test description",
328 FLAG_MOD_REPLACE, "description")
330 res = self.ldb.load_partition_usn(self.base_dn)
331 self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"])
333 # urgent replication should NOT be enabled when deleting
334 self.delete_force(self.ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
335 res = self.ldb.load_partition_usn(self.base_dn)
336 self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"])
# Run this module's tests through samba.tests.subunitrun's TestProgram,
# passing on the subunit options parsed from the command line.
TestProgram(opts=subunitopts, module=__name__)