2 # -*- coding: utf-8 -*-
4 # Tests various schema replication scenarios
6 # Copyright (C) Kamen Mazdrashki <kamenim@samba.org> 2011
8 # This program is free software; you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10 # the Free Software Foundation; either version 3 of the License, or
11 # (at your option) any later version.
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU General Public License for more details.
18 # You should have received a copy of the GNU General Public License
19 # along with this program. If not, see <http://www.gnu.org/licenses/>.
24 # export DC1=dc1_dns_name
25 # export DC2=dc2_dns_name
26 # export SUBUNITRUN=$samba4srcdir/scripting/bin/subunitrun
27 # PYTHONPATH="$PYTHONPATH:$samba4srcdir/torture/drs/python" $SUBUNITRUN replica_sync -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD"
30 from __future__ import print_function
37 SCOPE_BASE, LdbError, ERR_NO_SUCH_OBJECT)
class DrsReplicaSyncTestCase(drs_base.DrsBaseTestCase):
    """Intended as a black box test case for the DsReplicaSync
    implementation. It tests replication between two DCs, in
    particular when inbound replication is disabled, and checks how
    conflicting creates, renames and deletes are resolved."""

    def setUp(self):
        """Syncs both DCs with each other and resets the per-test OU GUIDs."""
        super(DrsReplicaSyncTestCase, self).setUp()
        self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True)
        self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True)
        # tearDown() always cleans up self.ou1/self.ou2, but not every
        # test creates them, so make sure the attributes exist.
        self.ou1 = None
        self.ou2 = None

    def tearDown(self):
        """Removes the test OUs and re-enables inbound replication on both DCs."""
        self._cleanup_object(self.ou1)
        self._cleanup_object(self.ou2)

        # re-enable replication
        self._enable_inbound_repl(self.dnsname_dc1)
        self._enable_inbound_repl(self.dnsname_dc2)

        super(DrsReplicaSyncTestCase, self).tearDown()

    def _cleanup_object(self, guid):
        """Cleans up a test object, if it still exists.

        Deletes the object identified by `guid` (with the tree_delete
        control) on DC2 and then on DC1. An object that is already gone
        is fine; any other LDB error fails the test. Nothing is done
        when `guid` is None.
        """
        if guid is None:
            return
        for samdb in (self.ldb_dc2, self.ldb_dc1):
            try:
                samdb.delete('<GUID=%s>' % guid, ["tree_delete:1"])
            except LdbError as e:
                (num, _) = e.args
                self.assertEqual(num, ERR_NO_SUCH_OBJECT)

    def _sleep_for_distinct_timestamps(self):
        """Sleeps for a second so that the next change gets a later timestamp."""
        # Imported locally so that this helper does not depend on the
        # module-level import block.
        import time
        time.sleep(1)

    def test_ReplEnabled(self):
        """Tests we can replicate when replication is enabled"""
        self._enable_inbound_repl(self.dnsname_dc1)
        self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=False)

    def test_ReplDisabled(self):
        """Tests we can't replicate when replication is disabled"""
        self._disable_inbound_repl(self.dnsname_dc1)

        ccache_name = self.get_creds_ccache_name()

        # Tunnel the command line credentials down to the
        # subcommand to avoid a new kinit
        cmdline_auth = "--krb5-ccache=%s" % ccache_name

        # bin/samba-tool drs <drs_command> <cmdline_auth>
        cmd_list = ["drs", "replicate", cmdline_auth]

        nc_dn = self.domain_dn
        # bin/samba-tool drs replicate <Dest_DC_NAME> <Src_DC_NAME> <Naming Context>
        cmd_list += [self.dnsname_dc1, self.dnsname_dc2, nc_dn]

        (result, out, err) = self.runsubcmd(*cmd_list)
        self.assertCmdFail(result)
        self.assertTrue('WERR_DS_DRA_SINK_DISABLED' in err)

    def test_ReplDisabledForced(self):
        """Tests we can force replicate when replication is disabled"""
        self._disable_inbound_repl(self.dnsname_dc1)
        self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True)

    def test_ReplLocal(self):
        """Tests we can replicate direct to the local db"""
        self._enable_inbound_repl(self.dnsname_dc1)
        self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=False, local=True, full_sync=True)

    def _create_ou(self, samdb, name):
        """Creates an organizationalUnit on `samdb` and returns its GUID.

        `name` is the DN of the new OU relative to the domain DN
        (e.g. "OU=Child,OU=Parent"). The objectGUID of the new object
        is returned as converted by _GUID_string().
        """
        ldif = """
dn: %s,%s
objectClass: organizationalUnit
""" % (name, self.domain_dn)
        samdb.add_ldif(ldif)
        res = samdb.search(base="%s,%s" % (name, self.domain_dn),
                           scope=SCOPE_BASE, attrs=["objectGUID"])
        return self._GUID_string(res[0]["objectGUID"][0])

    def _check_deleted(self, sam_ldb, guid):
        """Asserts that the OU with the given GUID is a deleted object on `sam_ldb`.

        The object must be found exactly once (with show_deleted), have
        isDeleted set to TRUE, have no objectCategory and be located
        under the Deleted Objects container.
        """
        # search the OU by guid as it may be deleted
        res = sam_ldb.search(base='<GUID=%s>' % guid,
                             controls=["show_deleted:1"],
                             attrs=["isDeleted", "objectCategory", "ou"])
        self.assertEqual(len(res), 1)
        ou_cur = res[0]
        # Deleted Object base DN
        dodn = self._deleted_objects_dn(sam_ldb)
        # now check properties of the OU
        name_cur = ou_cur["ou"][0]
        self.assertEqual(str(ou_cur["isDeleted"][0]), "TRUE")
        self.assertTrue("objectCategory" not in ou_cur)
        self.assertTrue(dodn in str(ou_cur["dn"]),
                        "OU %s is deleted but it is not located under %s!" % (name_cur, dodn))

    def test_ReplConflictsFullSync(self):
        """Tests that objects created in conflict become conflict DNs (honour full sync override)"""

        # First confirm local replication (so when we test against windows, this fails fast without creating objects)
        self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, local=True, forced=True, full_sync=True)

        self._disable_inbound_repl(self.dnsname_dc1)
        self._disable_inbound_repl(self.dnsname_dc2)

        # Create conflicting objects on DC1 and DC2, with DC1 object created first
        self.ou1 = self._create_ou(self.ldb_dc1, "OU=Test Full Sync")
        # We have to sleep to ensure that the two objects have different timestamps
        self._sleep_for_distinct_timestamps()
        self.ou2 = self._create_ou(self.ldb_dc2, "OU=Test Full Sync")

        self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, local=True, forced=True, full_sync=True)

        # Check that DC2 got the DC1 object, and OU1 was made into a conflict
        res1 = self.ldb_dc2.search(base="<GUID=%s>" % self.ou1,
                                   scope=SCOPE_BASE, attrs=["name"])
        res2 = self.ldb_dc2.search(base="<GUID=%s>" % self.ou2,
                                   scope=SCOPE_BASE, attrs=["name"])
        print(res1[0]["name"][0])
        print(res2[0]["name"][0])
        self.assertFalse('CNF:%s' % self.ou2 in str(res2[0]["name"][0]))
        self.assertTrue('CNF:%s' % self.ou1 in str(res1[0]["name"][0]))
        self.assertTrue(self._lost_and_found_dn(self.ldb_dc2, self.domain_dn) not in str(res1[0].dn))
        self.assertTrue(self._lost_and_found_dn(self.ldb_dc2, self.domain_dn) not in str(res2[0].dn))
        self.assertEqual(str(res1[0]["name"][0]), res1[0].dn.get_rdn_value())
        self.assertEqual(str(res2[0]["name"][0]), res2[0].dn.get_rdn_value())

        # Delete both objects by GUID on DC2
        self.ldb_dc2.delete('<GUID=%s>' % self.ou1)
        self.ldb_dc2.delete('<GUID=%s>' % self.ou2)

        self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True, full_sync=True)

        # Check deleted on DC1
        self._check_deleted(self.ldb_dc1, self.ou1)
        self._check_deleted(self.ldb_dc1, self.ou2)
        # Check deleted on DC2
        self._check_deleted(self.ldb_dc2, self.ou1)
        self._check_deleted(self.ldb_dc2, self.ou2)

    def test_ReplConflictsRemoteWin(self):
        """Tests that objects created in conflict become conflict DNs"""
        self._disable_inbound_repl(self.dnsname_dc1)
        self._disable_inbound_repl(self.dnsname_dc2)

        # Create conflicting objects on DC1 and DC2, with DC1 object created first
        self.ou1 = self._create_ou(self.ldb_dc1, "OU=Test Remote Conflict")
        # We have to sleep to ensure that the two objects have different timestamps
        self._sleep_for_distinct_timestamps()
        self.ou2 = self._create_ou(self.ldb_dc2, "OU=Test Remote Conflict")

        self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True, full_sync=False)

        # Check that DC1 got the DC2 object, and OU1 was made into a conflict
        res1 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou1,
                                   scope=SCOPE_BASE, attrs=["name"])
        res2 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou2,
                                   scope=SCOPE_BASE, attrs=["name"])
        print(res1[0]["name"][0])
        print(res2[0]["name"][0])
        self.assertTrue('CNF:%s' % self.ou1 in str(res1[0]["name"][0]))
        self.assertTrue(self._lost_and_found_dn(self.ldb_dc1, self.domain_dn) not in str(res1[0].dn))
        self.assertTrue(self._lost_and_found_dn(self.ldb_dc1, self.domain_dn) not in str(res2[0].dn))
        self.assertEqual(str(res1[0]["name"][0]), res1[0].dn.get_rdn_value())
        self.assertEqual(str(res2[0]["name"][0]), res2[0].dn.get_rdn_value())

        # Delete both objects by GUID on DC1
        self.ldb_dc1.delete('<GUID=%s>' % self.ou1)
        self.ldb_dc1.delete('<GUID=%s>' % self.ou2)

        self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True, full_sync=False)

        # Check deleted on DC1
        self._check_deleted(self.ldb_dc1, self.ou1)
        self._check_deleted(self.ldb_dc1, self.ou2)
        # Check deleted on DC2
        self._check_deleted(self.ldb_dc2, self.ou1)
        self._check_deleted(self.ldb_dc2, self.ou2)

    def test_ReplConflictsLocalWin(self):
        """Tests that objects created in conflict become conflict DNs"""
        self._disable_inbound_repl(self.dnsname_dc1)
        self._disable_inbound_repl(self.dnsname_dc2)

        # Create conflicting objects on DC1 and DC2, with DC2 object created first
        self.ou2 = self._create_ou(self.ldb_dc2, "OU=Test Local Conflict")
        # We have to sleep to ensure that the two objects have different timestamps
        self._sleep_for_distinct_timestamps()
        self.ou1 = self._create_ou(self.ldb_dc1, "OU=Test Local Conflict")

        self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True, full_sync=False)

        # Check that DC1 got the DC2 object, and OU2 was made into a conflict
        res1 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou1,
                                   scope=SCOPE_BASE, attrs=["name"])
        res2 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou2,
                                   scope=SCOPE_BASE, attrs=["name"])
        print(res1[0]["name"][0])
        print(res2[0]["name"][0])
        self.assertTrue('CNF:%s' % self.ou2 in str(res2[0]["name"][0]), "Got %s for %s" % (str(res2[0]["name"][0]), self.ou2))
        self.assertTrue(self._lost_and_found_dn(self.ldb_dc1, self.domain_dn) not in str(res1[0].dn))
        self.assertTrue(self._lost_and_found_dn(self.ldb_dc1, self.domain_dn) not in str(res2[0].dn))
        self.assertEqual(str(res1[0]["name"][0]), res1[0].dn.get_rdn_value())
        self.assertEqual(str(res2[0]["name"][0]), res2[0].dn.get_rdn_value())

        # Delete both objects by GUID on DC1
        self.ldb_dc1.delete('<GUID=%s>' % self.ou1)
        self.ldb_dc1.delete('<GUID=%s>' % self.ou2)

        self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True, full_sync=False)

        # Check deleted on DC1
        self._check_deleted(self.ldb_dc1, self.ou1)
        self._check_deleted(self.ldb_dc1, self.ou2)
        # Check deleted on DC2
        self._check_deleted(self.ldb_dc2, self.ou1)
        self._check_deleted(self.ldb_dc2, self.ou2)

    def test_ReplConflictsRemoteWin_with_child(self):
        """Tests that objects created in conflict become conflict DNs, when they have children"""
        self._disable_inbound_repl(self.dnsname_dc1)
        self._disable_inbound_repl(self.dnsname_dc2)

        # Create conflicting objects on DC1 and DC2, with DC1 object created first
        self.ou1 = self._create_ou(self.ldb_dc1, "OU=Test Parent Remote Conflict")
        # We have to sleep to ensure that the two objects have different timestamps
        self._sleep_for_distinct_timestamps()
        self.ou2 = self._create_ou(self.ldb_dc2, "OU=Test Parent Remote Conflict")
        # Create a child under each parent, on DC1 and DC2 respectively
        ou1_child = self._create_ou(self.ldb_dc1, "OU=Test Child,OU=Test Parent Remote Conflict")
        ou2_child = self._create_ou(self.ldb_dc2, "OU=Test Child,OU=Test Parent Remote Conflict")

        self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True, full_sync=False)

        # Check that DC1 got the DC2 object, and self.ou1 was made into a conflict
        res1 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou1,
                                   scope=SCOPE_BASE, attrs=["name"])
        res2 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou2,
                                   scope=SCOPE_BASE, attrs=["name"])
        print(res1[0]["name"][0])
        print(res2[0]["name"][0])
        self.assertTrue('CNF:%s' % self.ou1 in str(res1[0]["name"][0]))
        self.assertTrue(self._lost_and_found_dn(self.ldb_dc1, self.domain_dn) not in str(res1[0].dn))
        self.assertTrue(self._lost_and_found_dn(self.ldb_dc1, self.domain_dn) not in str(res2[0].dn))
        self.assertEqual(str(res1[0]["name"][0]), res1[0].dn.get_rdn_value())
        self.assertEqual(str(res2[0]["name"][0]), res2[0].dn.get_rdn_value())

        # Delete both objects (and their children) by GUID on DC1
        self.ldb_dc1.delete('<GUID=%s>' % self.ou1, ["tree_delete:1"])
        self.ldb_dc1.delete('<GUID=%s>' % self.ou2, ["tree_delete:1"])

        self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True, full_sync=False)

        # Check deleted on DC1
        self._check_deleted(self.ldb_dc1, self.ou1)
        self._check_deleted(self.ldb_dc1, self.ou2)
        # Check deleted on DC2
        self._check_deleted(self.ldb_dc2, self.ou1)
        self._check_deleted(self.ldb_dc2, self.ou2)

        # Check the children are deleted on DC1
        self._check_deleted(self.ldb_dc1, ou1_child)
        self._check_deleted(self.ldb_dc1, ou2_child)
        # Check deleted on DC2
        self._check_deleted(self.ldb_dc2, ou1_child)
        self._check_deleted(self.ldb_dc2, ou2_child)

    def test_ReplConflictsRenamedVsNewRemoteWin(self):
        """Tests resolving a DN conflict between a renamed object and a new object"""
        self._disable_inbound_repl(self.dnsname_dc1)
        self._disable_inbound_repl(self.dnsname_dc2)

        # Create an OU and rename it on DC1
        self.ou1 = self._create_ou(self.ldb_dc1, "OU=Test Remote Rename Conflict orig")
        self.ldb_dc1.rename("<GUID=%s>" % self.ou1, "OU=Test Remote Rename Conflict,%s" % self.domain_dn)

        # We have to sleep to ensure that the two objects have different timestamps
        self._sleep_for_distinct_timestamps()

        # create a conflicting object with the same DN on DC2
        self.ou2 = self._create_ou(self.ldb_dc2, "OU=Test Remote Rename Conflict")

        self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True, full_sync=False)

        # Check that DC1 got the DC2 object, and self.ou1 was made into a conflict
        res1 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou1,
                                   scope=SCOPE_BASE, attrs=["name"])
        res2 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou2,
                                   scope=SCOPE_BASE, attrs=["name"])
        print(res1[0]["name"][0])
        print(res2[0]["name"][0])
        self.assertTrue('CNF:%s' % self.ou1 in str(res1[0]["name"][0]))
        self.assertTrue(self._lost_and_found_dn(self.ldb_dc1, self.domain_dn) not in str(res1[0].dn))
        self.assertTrue(self._lost_and_found_dn(self.ldb_dc1, self.domain_dn) not in str(res2[0].dn))
        self.assertEqual(str(res1[0]["name"][0]), res1[0].dn.get_rdn_value())
        self.assertEqual(str(res2[0]["name"][0]), res2[0].dn.get_rdn_value())

        # Delete both objects by GUID on DC1
        self.ldb_dc1.delete('<GUID=%s>' % self.ou1)
        self.ldb_dc1.delete('<GUID=%s>' % self.ou2)

        self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True, full_sync=False)

        # Check deleted on DC1
        self._check_deleted(self.ldb_dc1, self.ou1)
        self._check_deleted(self.ldb_dc1, self.ou2)
        # Check deleted on DC2
        self._check_deleted(self.ldb_dc2, self.ou1)
        self._check_deleted(self.ldb_dc2, self.ou2)

    def test_ReplConflictsRenamedVsNewLocalWin(self):
        """Tests resolving a DN conflict between a renamed object and a new object"""
        self._disable_inbound_repl(self.dnsname_dc1)
        self._disable_inbound_repl(self.dnsname_dc2)

        # Create conflicting objects on DC1 and DC2, where the DC2 object has been renamed
        self.ou2 = self._create_ou(self.ldb_dc2, "OU=Test Rename Local Conflict orig")
        self.ldb_dc2.rename("<GUID=%s>" % self.ou2, "OU=Test Rename Local Conflict,%s" % self.domain_dn)
        # We have to sleep to ensure that the two objects have different timestamps
        self._sleep_for_distinct_timestamps()
        self.ou1 = self._create_ou(self.ldb_dc1, "OU=Test Rename Local Conflict")

        self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True, full_sync=False)

        # Check that DC1 got the DC2 object, and OU2 was made into a conflict
        res1 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou1,
                                   scope=SCOPE_BASE, attrs=["name"])
        res2 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou2,
                                   scope=SCOPE_BASE, attrs=["name"])
        print(res1[0]["name"][0])
        print(res2[0]["name"][0])
        self.assertTrue('CNF:%s' % self.ou2 in str(res2[0]["name"][0]))
        self.assertTrue(self._lost_and_found_dn(self.ldb_dc1, self.domain_dn) not in str(res1[0].dn))
        self.assertTrue(self._lost_and_found_dn(self.ldb_dc1, self.domain_dn) not in str(res2[0].dn))
        self.assertEqual(str(res1[0]["name"][0]), res1[0].dn.get_rdn_value())
        self.assertEqual(str(res2[0]["name"][0]), res2[0].dn.get_rdn_value())

        # Delete both objects by GUID on DC1
        self.ldb_dc1.delete('<GUID=%s>' % self.ou1)
        self.ldb_dc1.delete('<GUID=%s>' % self.ou2)

        self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True, full_sync=False)

        # Check deleted on DC1
        self._check_deleted(self.ldb_dc1, self.ou1)
        self._check_deleted(self.ldb_dc1, self.ou2)
        # Check deleted on DC2
        self._check_deleted(self.ldb_dc2, self.ou1)
        self._check_deleted(self.ldb_dc2, self.ou2)

    def test_ReplConflictsRenameRemoteWin(self):
        """Tests that objects renamed into conflict become conflict DNs"""
        self._disable_inbound_repl(self.dnsname_dc1)
        self._disable_inbound_repl(self.dnsname_dc2)

        # Create two OUs with different names, one on DC1 and one on DC2
        self.ou1 = self._create_ou(self.ldb_dc1, "OU=Test Remote Rename Conflict")
        self.ou2 = self._create_ou(self.ldb_dc2, "OU=Test Remote Rename Conflict 2")

        self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True, full_sync=False)

        # Rename both to the same name, with the DC1 rename done first
        self.ldb_dc1.rename("<GUID=%s>" % self.ou1, "OU=Test Remote Rename Conflict 3,%s" % self.domain_dn)
        # We have to sleep to ensure that the two objects have different timestamps
        self._sleep_for_distinct_timestamps()
        self.ldb_dc2.rename("<GUID=%s>" % self.ou2, "OU=Test Remote Rename Conflict 3,%s" % self.domain_dn)

        self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True, full_sync=False)

        # Check that DC1 got the DC2 rename, and self.ou1 was made into a conflict
        res1 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou1,
                                   scope=SCOPE_BASE, attrs=["name"])
        res2 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou2,
                                   scope=SCOPE_BASE, attrs=["name"])
        print(res1[0]["name"][0])
        print(res2[0]["name"][0])
        self.assertTrue('CNF:%s' % self.ou1 in str(res1[0]["name"][0]))
        self.assertTrue(self._lost_and_found_dn(self.ldb_dc1, self.domain_dn) not in str(res1[0].dn))
        self.assertTrue(self._lost_and_found_dn(self.ldb_dc1, self.domain_dn) not in str(res2[0].dn))
        self.assertEqual(str(res1[0]["name"][0]), res1[0].dn.get_rdn_value())
        self.assertEqual(str(res2[0]["name"][0]), res2[0].dn.get_rdn_value())

        # Delete both objects by GUID on DC1
        self.ldb_dc1.delete('<GUID=%s>' % self.ou1)
        self.ldb_dc1.delete('<GUID=%s>' % self.ou2)

        self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True, full_sync=False)

        # Check deleted on DC1
        self._check_deleted(self.ldb_dc1, self.ou1)
        self._check_deleted(self.ldb_dc1, self.ou2)
        # Check deleted on DC2
        self._check_deleted(self.ldb_dc2, self.ou1)
        self._check_deleted(self.ldb_dc2, self.ou2)

    def test_ReplConflictsRenameRemoteWin_with_child(self):
        """Tests that objects renamed into conflict become conflict DNs, when they have children"""
        self._disable_inbound_repl(self.dnsname_dc1)
        self._disable_inbound_repl(self.dnsname_dc2)

        # Create two OUs with different names, one on DC1 and one on DC2
        self.ou1 = self._create_ou(self.ldb_dc1, "OU=Test Parent Remote Rename Conflict")
        self.ou2 = self._create_ou(self.ldb_dc2, "OU=Test Parent Remote Rename Conflict 2")
        # Create a child under each parent, on DC1 and DC2 respectively
        ou1_child = self._create_ou(self.ldb_dc1, "OU=Test Child,OU=Test Parent Remote Rename Conflict")
        ou2_child = self._create_ou(self.ldb_dc2, "OU=Test Child,OU=Test Parent Remote Rename Conflict 2")

        self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True, full_sync=False)

        # Rename both parents to the same name, with the DC1 rename done first
        self.ldb_dc1.rename("<GUID=%s>" % self.ou1, "OU=Test Parent Remote Rename Conflict 3,%s" % self.domain_dn)
        # We have to sleep to ensure that the two objects have different timestamps
        self._sleep_for_distinct_timestamps()
        self.ldb_dc2.rename("<GUID=%s>" % self.ou2, "OU=Test Parent Remote Rename Conflict 3,%s" % self.domain_dn)

        self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True, full_sync=False)

        # Check that DC1 got the DC2 rename, and self.ou1 was made into a conflict
        res1 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou1,
                                   scope=SCOPE_BASE, attrs=["name"])
        res2 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou2,
                                   scope=SCOPE_BASE, attrs=["name"])
        print(res1[0]["name"][0])
        print(res2[0]["name"][0])
        self.assertTrue('CNF:%s' % self.ou1 in str(res1[0]["name"][0]))
        self.assertTrue(self._lost_and_found_dn(self.ldb_dc1, self.domain_dn) not in str(res1[0].dn))
        self.assertTrue(self._lost_and_found_dn(self.ldb_dc1, self.domain_dn) not in str(res2[0].dn))
        self.assertEqual(str(res1[0]["name"][0]), res1[0].dn.get_rdn_value())
        self.assertEqual(str(res2[0]["name"][0]), res2[0].dn.get_rdn_value())

        # Delete both objects (and their children) by GUID on DC1
        self.ldb_dc1.delete('<GUID=%s>' % self.ou1, ["tree_delete:1"])
        self.ldb_dc1.delete('<GUID=%s>' % self.ou2, ["tree_delete:1"])

        self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True, full_sync=False)

        # Check deleted on DC1
        self._check_deleted(self.ldb_dc1, self.ou1)
        self._check_deleted(self.ldb_dc1, self.ou2)
        # Check deleted on DC2
        self._check_deleted(self.ldb_dc2, self.ou1)
        self._check_deleted(self.ldb_dc2, self.ou2)

        # Check the children are deleted on DC1
        self._check_deleted(self.ldb_dc1, ou1_child)
        self._check_deleted(self.ldb_dc1, ou2_child)
        # Check deleted on DC2
        self._check_deleted(self.ldb_dc2, ou1_child)
        self._check_deleted(self.ldb_dc2, ou2_child)

    def test_ReplConflictsRenameLocalWin(self):
        """Tests that objects renamed into conflict become conflict DNs"""
        self._disable_inbound_repl(self.dnsname_dc1)
        self._disable_inbound_repl(self.dnsname_dc2)

        # Create two OUs with different names, one on DC1 and one on DC2
        self.ou1 = self._create_ou(self.ldb_dc1, "OU=Test Rename Local Conflict")
        self.ou2 = self._create_ou(self.ldb_dc2, "OU=Test Rename Local Conflict 2")

        self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True, full_sync=False)

        # Rename both to the same name, with the DC2 rename done first
        self.ldb_dc2.rename("<GUID=%s>" % self.ou2, "OU=Test Rename Local Conflict 3,%s" % self.domain_dn)
        # We have to sleep to ensure that the two objects have different timestamps
        self._sleep_for_distinct_timestamps()
        self.ldb_dc1.rename("<GUID=%s>" % self.ou1, "OU=Test Rename Local Conflict 3,%s" % self.domain_dn)

        self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True, full_sync=False)

        # Check that DC1 got the DC2 rename, and OU2 was made into a conflict
        res1 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou1,
                                   scope=SCOPE_BASE, attrs=["name"])
        res2 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou2,
                                   scope=SCOPE_BASE, attrs=["name"])
        print(res1[0]["name"][0])
        print(res2[0]["name"][0])
        self.assertTrue('CNF:%s' % self.ou2 in str(res2[0]["name"][0]))
        self.assertTrue(self._lost_and_found_dn(self.ldb_dc1, self.domain_dn) not in str(res1[0].dn))
        self.assertTrue(self._lost_and_found_dn(self.ldb_dc1, self.domain_dn) not in str(res2[0].dn))
        self.assertEqual(str(res1[0]["name"][0]), res1[0].dn.get_rdn_value())
        self.assertEqual(str(res2[0]["name"][0]), res2[0].dn.get_rdn_value())

        # Delete both objects by GUID on DC1
        self.ldb_dc1.delete('<GUID=%s>' % self.ou1)
        self.ldb_dc1.delete('<GUID=%s>' % self.ou2)

        self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True, full_sync=False)

        # Check deleted on DC1
        self._check_deleted(self.ldb_dc1, self.ou1)
        self._check_deleted(self.ldb_dc1, self.ou2)
        # Check deleted on DC2
        self._check_deleted(self.ldb_dc2, self.ou1)
        self._check_deleted(self.ldb_dc2, self.ou2)

    def test_ReplLostAndFound(self):
        """Tests that objects created under an OU deleted elsewhere end up in lostAndFound"""
        self._disable_inbound_repl(self.dnsname_dc1)
        self._disable_inbound_repl(self.dnsname_dc2)

        # Create two OUs on DC2
        self.ou1 = self._create_ou(self.ldb_dc2, "OU=Deleted parent")
        self.ou2 = self._create_ou(self.ldb_dc2, "OU=Deleted parent 2")

        # replicate them from DC2 to DC1
        self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True, full_sync=False)

        # Delete both objects by GUID on DC1
        self.ldb_dc1.delete('<GUID=%s>' % self.ou1)
        self.ldb_dc1.delete('<GUID=%s>' % self.ou2)

        # Create children on DC2
        ou1_child = self._create_ou(self.ldb_dc2, "OU=Test Child,OU=Deleted parent")
        ou2_child = self._create_ou(self.ldb_dc2, "OU=Test Child,OU=Deleted parent 2")

        # replicate the children from DC2 to DC1
        self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True, full_sync=False)

        # Check the sub-OUs are now in lostAndFound on DC1, and that one
        # or other of them was made into a conflict DN
        res1 = self.ldb_dc1.search(base="<GUID=%s>" % ou1_child,
                                   scope=SCOPE_BASE, attrs=["name"])
        res2 = self.ldb_dc1.search(base="<GUID=%s>" % ou2_child,
                                   scope=SCOPE_BASE, attrs=["name"])
        print(res1[0]["name"][0])
        print(res2[0]["name"][0])
        self.assertTrue('CNF:%s' % ou1_child in str(res1[0]["name"][0]) or 'CNF:%s' % ou2_child in str(res2[0]["name"][0]))
        self.assertTrue(self._lost_and_found_dn(self.ldb_dc1, self.domain_dn) in str(res1[0].dn))
        self.assertTrue(self._lost_and_found_dn(self.ldb_dc1, self.domain_dn) in str(res2[0].dn))
        self.assertEqual(str(res1[0]["name"][0]), res1[0].dn.get_rdn_value())
        self.assertEqual(str(res2[0]["name"][0]), res2[0].dn.get_rdn_value())

        # Delete the children by GUID on DC1
        self.ldb_dc1.delete('<GUID=%s>' % ou1_child)
        self.ldb_dc1.delete('<GUID=%s>' % ou2_child)

        self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True, full_sync=False)

        # Check all deleted on DC1
        self._check_deleted(self.ldb_dc1, self.ou1)
        self._check_deleted(self.ldb_dc1, self.ou2)
        self._check_deleted(self.ldb_dc1, ou1_child)
        self._check_deleted(self.ldb_dc1, ou2_child)
        # Check all deleted on DC2
        self._check_deleted(self.ldb_dc2, self.ou1)
        self._check_deleted(self.ldb_dc2, self.ou2)
        self._check_deleted(self.ldb_dc2, ou1_child)
        self._check_deleted(self.ldb_dc2, ou2_child)

    def test_ReplRenames(self):
        """Tests that renames of child OUs on one DC and of their parents on the other replicate correctly"""
        self._disable_inbound_repl(self.dnsname_dc1)
        self._disable_inbound_repl(self.dnsname_dc2)

        # Create two OUs on DC2
        self.ou1 = self._create_ou(self.ldb_dc2, "OU=Original parent")
        self.ou2 = self._create_ou(self.ldb_dc2, "OU=Original parent 2")

        # replicate them from DC2 to DC1
        self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True, full_sync=False)

        # Create children on DC1
        ou1_child = self._create_ou(self.ldb_dc1, "OU=Test Child,OU=Original parent")
        ou2_child = self._create_ou(self.ldb_dc1, "OU=Test Child 2,OU=Original parent")
        ou3_child = self._create_ou(self.ldb_dc1, "OU=Test Case Child,OU=Original parent")

        # replicate them from DC1 to DC2
        self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True, full_sync=False)

        # Rename the children on DC1 (the last one only changes case)...
        self.ldb_dc1.rename("<GUID=%s>" % ou2_child, "OU=Test Child 3,OU=Original parent 2,%s" % self.domain_dn)
        self.ldb_dc1.rename("<GUID=%s>" % ou1_child, "OU=Test Child 2,OU=Original parent 2,%s" % self.domain_dn)
        self.ldb_dc1.rename("<GUID=%s>" % ou2_child, "OU=Test Child,OU=Original parent 2,%s" % self.domain_dn)
        self.ldb_dc1.rename("<GUID=%s>" % ou3_child, "OU=Test CASE Child,OU=Original parent,%s" % self.domain_dn)
        # ...and the parents on DC2
        self.ldb_dc2.rename("<GUID=%s>" % self.ou2, "OU=Original parent 3,%s" % self.domain_dn)
        self.ldb_dc2.rename("<GUID=%s>" % self.ou1, "OU=Original parent 2,%s" % self.domain_dn)

        # replicate them from DC1 to DC2
        self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True, full_sync=False)

        # Check the sub-OUs are now under Original Parent 3 (original
        # parent 2 for Test CASE Child), and both have the right names

        # Check that DC2 got the DC1 object, and the renames are all correct
        res1 = self.ldb_dc2.search(base="<GUID=%s>" % ou1_child,
                                   scope=SCOPE_BASE, attrs=["name"])
        res2 = self.ldb_dc2.search(base="<GUID=%s>" % ou2_child,
                                   scope=SCOPE_BASE, attrs=["name"])
        res3 = self.ldb_dc2.search(base="<GUID=%s>" % ou3_child,
                                   scope=SCOPE_BASE, attrs=["name"])
        self.assertEqual('Test Child 2', str(res1[0]["name"][0]))
        self.assertEqual('Test Child', str(res2[0]["name"][0]))
        self.assertEqual('Test CASE Child', str(res3[0]["name"][0]))
        self.assertEqual(str(res1[0].dn), "OU=Test Child 2,OU=Original parent 3,%s" % self.domain_dn)
        self.assertEqual(str(res2[0].dn), "OU=Test Child,OU=Original parent 3,%s" % self.domain_dn)
        self.assertEqual(str(res3[0].dn), "OU=Test CASE Child,OU=Original parent 2,%s" % self.domain_dn)

        # replicate them from DC2 to DC1
        self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True, full_sync=False)

        # Check that DC1 got the DC2 object, and the renames are all correct
        res1 = self.ldb_dc1.search(base="<GUID=%s>" % ou1_child,
                                   scope=SCOPE_BASE, attrs=["name"])
        res2 = self.ldb_dc1.search(base="<GUID=%s>" % ou2_child,
                                   scope=SCOPE_BASE, attrs=["name"])
        res3 = self.ldb_dc1.search(base="<GUID=%s>" % ou3_child,
                                   scope=SCOPE_BASE, attrs=["name"])
        self.assertEqual('Test Child 2', str(res1[0]["name"][0]))
        self.assertEqual('Test Child', str(res2[0]["name"][0]))
        self.assertEqual('Test CASE Child', str(res3[0]["name"][0]))
        self.assertEqual(str(res1[0].dn), "OU=Test Child 2,OU=Original parent 3,%s" % self.domain_dn)
        self.assertEqual(str(res2[0].dn), "OU=Test Child,OU=Original parent 3,%s" % self.domain_dn)
        self.assertEqual(str(res3[0].dn), "OU=Test CASE Child,OU=Original parent 2,%s" % self.domain_dn)

        # Delete all objects by GUID on DC1
        self.ldb_dc1.delete('<GUID=%s>' % ou1_child)
        self.ldb_dc1.delete('<GUID=%s>' % ou2_child)
        self.ldb_dc1.delete('<GUID=%s>' % ou3_child)
        self.ldb_dc1.delete('<GUID=%s>' % self.ou1)
        self.ldb_dc1.delete('<GUID=%s>' % self.ou2)

        self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True, full_sync=False)

        # Check all deleted on DC1
        self._check_deleted(self.ldb_dc1, self.ou1)
        self._check_deleted(self.ldb_dc1, self.ou2)
        self._check_deleted(self.ldb_dc1, ou1_child)
        self._check_deleted(self.ldb_dc1, ou2_child)
        self._check_deleted(self.ldb_dc1, ou3_child)
        # Check all deleted on DC2
        self._check_deleted(self.ldb_dc2, self.ou1)
        self._check_deleted(self.ldb_dc2, self.ou2)
        self._check_deleted(self.ldb_dc2, ou1_child)
        self._check_deleted(self.ldb_dc2, ou2_child)
        self._check_deleted(self.ldb_dc2, ou3_child)

    def reanimate_object(self, samdb, guid, new_dn):
        """Re-animates a deleted object.

        Looks up the object by `guid` on `samdb` (including deleted
        objects) and, in a single modify, removes its isDeleted
        attribute and replaces its distinguishedName with `new_dn`.
        """
        res = samdb.search(base="<GUID=%s>" % guid, attrs=["isDeleted"],
                           controls=['show_deleted:1'], scope=SCOPE_BASE)
        self.assertEqual(len(res), 1, "Expected exactly one object with GUID %s" % guid)

        msg = ldb.Message()
        msg.dn = res[0].dn
        msg["isDeleted"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE, "isDeleted")
        msg["distinguishedName"] = ldb.MessageElement([new_dn], ldb.FLAG_MOD_REPLACE, "distinguishedName")
        samdb.modify(msg, ["show_deleted:1"])

    def test_ReplReanimationConflict(self):
        """
        Checks that if a reanimated object conflicts with a new object, then
        the conflict is resolved correctly.
        """

        self._disable_inbound_repl(self.dnsname_dc1)
        self._disable_inbound_repl(self.dnsname_dc2)

        # create an object, "accidentally" delete it, and replicate the changes to both DCs
        self.ou1 = self._create_ou(self.ldb_dc2, "OU=Conflict object")
        self.ldb_dc2.delete('<GUID=%s>' % self.ou1)
        self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True, full_sync=False)

        # Now pretend that the admin for one DC resolves the problem by
        # re-animating the object...
        self.reanimate_object(self.ldb_dc1, self.ou1, "OU=Conflict object,%s" % self.domain_dn)

        # ...whereas another admin just creates an object with the same name
        # again on a different DC. Sleep first, as in the other conflict
        # tests, so that the new object is the later change.
        self._sleep_for_distinct_timestamps()
        self.ou2 = self._create_ou(self.ldb_dc2, "OU=Conflict object")

        # Now sync the DCs to resolve the conflict
        self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, forced=True, full_sync=False)

        # Check the latest change won and self.ou1 was made into a conflict
        res1 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou1,
                                   scope=SCOPE_BASE, attrs=["name"])
        res2 = self.ldb_dc1.search(base="<GUID=%s>" % self.ou2,
                                   scope=SCOPE_BASE, attrs=["name"])
        print(res1[0]["name"][0])
        print(res2[0]["name"][0])
        self.assertTrue('CNF:%s' % self.ou1 in str(res1[0]["name"][0]))
        self.assertFalse('CNF:%s' % self.ou2 in str(res2[0]["name"][0]))

        # Delete both objects by GUID on DC1
        self.ldb_dc1.delete('<GUID=%s>' % self.ou1)
        self.ldb_dc1.delete('<GUID=%s>' % self.ou2)

        self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, forced=True, full_sync=False)

        # Check deleted on DC1
        self._check_deleted(self.ldb_dc1, self.ou1)
        self._check_deleted(self.ldb_dc1, self.ou2)
        # Check deleted on DC2
        self._check_deleted(self.ldb_dc2, self.ou1)
        self._check_deleted(self.ldb_dc2, self.ou2)