getncchanges.py: Add a test for dropped cross-partition links
[metze/samba/wip.git] / source4 / torture / drs / python / getncchanges.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3 #
4 # Tests various schema replication scenarios
5 #
6 # Copyright (C) Catalyst.Net Ltd. 2017
7 #
8 # This program is free software; you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10 # the Free Software Foundation; either version 3 of the License, or
11 # (at your option) any later version.
12 #
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 # GNU General Public License for more details.
17 #
18 # You should have received a copy of the GNU General Public License
19 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
20 #
21
22 #
23 # Usage:
24 #  export DC1=dc1_dns_name
25 #  export DC2=dc2_dns_name
26 #  export SUBUNITRUN=$samba4srcdir/scripting/bin/subunitrun
27 #  PYTHONPATH="$PYTHONPATH:$samba4srcdir/torture/drs/python" $SUBUNITRUN getncchanges -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD"
28 #
29
30 import drs_base
31 import samba.tests
32 import ldb
33 from ldb import SCOPE_BASE
34 import random
35
36 from samba.dcerpc import drsuapi
37
38 class DrsReplicaSyncIntegrityTestCase(drs_base.DrsBaseTestCase):
39     def setUp(self):
40         super(DrsReplicaSyncIntegrityTestCase, self).setUp()
41
42         self.init_test_state()
43
44         # Note that DC2 is the DC with the testenv-specific quirks (e.g. it's
45         # the vampire_dc), so we point this test directly at that DC
46         self.set_test_ldb_dc(self.ldb_dc2)
47
48         # add some randomness to the test OU. (Deletion of the last test's
49         # objects can be slow to replicate out. So the OU created by a previous
50         # testenv may still exist at this point).
51         rand = random.randint(1, 10000000)
52         self.base_dn = self.test_ldb_dc.get_default_basedn()
53         self.ou = "OU=getncchanges%d_test,%s" %(rand, self.base_dn)
54         self.test_ldb_dc.add({
55             "dn": self.ou,
56             "objectclass": "organizationalUnit"})
57
58         self.default_conn = DcConnection(self, self.ldb_dc2, self.dnsname_dc2)
59         self.set_dc_connection(self.default_conn)
60
61     def tearDown(self):
62         super(DrsReplicaSyncIntegrityTestCase, self).tearDown()
63         # tidyup groups and users
64         try:
65             self.ldb_dc2.delete(self.ou, ["tree_delete:1"])
66         except ldb.LdbError as (enum, string):
67             if enum == ldb.ERR_NO_SUCH_OBJECT:
68                 pass
69
70     def init_test_state(self):
71         self.rxd_dn_list = []
72         self.rxd_links = []
73         self.rxd_guids = []
74         self.last_ctr = None
75
76         # 100 is the minimum max_objects that Microsoft seems to honour
77         # (the max honoured is 400ish), so we use that in these tests
78         self.max_objects = 100
79
80         # store whether we used GET_TGT/GET_ANC flags in the requests
81         self.used_get_tgt = False
82         self.used_get_anc = False
83
84     def add_object(self, dn, objectclass="organizationalunit"):
85         """Adds an OU object"""
86         self.test_ldb_dc.add({"dn": dn, "objectclass": objectclass})
87         res = self.test_ldb_dc.search(base=dn, scope=SCOPE_BASE)
88         self.assertEquals(len(res), 1)
89
90     def modify_object(self, dn, attr, value):
91         """Modifies an object's USN by adding an attribute value to it"""
92         m = ldb.Message()
93         m.dn = ldb.Dn(self.test_ldb_dc, dn)
94         m[attr] = ldb.MessageElement(value, ldb.FLAG_MOD_ADD, attr)
95         self.test_ldb_dc.modify(m)
96
97     def delete_attribute(self, dn, attr, value):
98         """Deletes an attribute from an object"""
99         m = ldb.Message()
100         m.dn = ldb.Dn(self.test_ldb_dc, dn)
101         m[attr] = ldb.MessageElement(value, ldb.FLAG_MOD_DELETE, attr)
102         self.test_ldb_dc.modify(m)
103
104     def start_new_repl_cycle(self):
105         """Resets enough state info to start a new replication cycle"""
106         # reset rxd_links, but leave rxd_guids and rxd_dn_list alone so we know
107         # whether a parent/target is unknown and needs GET_ANC/GET_TGT to resolve
108         self.rxd_links = []
109
110         self.used_get_tgt = False
111         self.used_get_anc = False
112         # mostly preserve self.last_ctr, so that we use the last HWM
113         if self.last_ctr is not None:
114             self.last_ctr.more_data = True
115
116     def create_object_range(self, start, end, prefix="",
117                             children=None, parent_list=None):
118         """
119         Creates a block of objects. Object names are numbered sequentially,
120         using the optional prefix supplied. If the children parameter is
121         supplied it will create a parent-child hierarchy and return the
122         top-level parents separately.
123         """
124         dn_list = []
125
126         # Use dummy/empty lists if we're not creating a parent/child hierarchy
127         if children is None:
128             children = []
129
130         if parent_list is None:
131             parent_list = []
132
133         # Create the parents first, then the children.
134         # This makes it easier to see in debug when GET_ANC takes effect
135         # because the parent/children become interleaved (by default,
136         # this approach means the objects are organized into blocks of
137         # parents and blocks of children together)
138         for x in range(start, end):
139             ou = "OU=test_ou_%s%d,%s" % (prefix, x, self.ou)
140             self.add_object(ou)
141             dn_list.append(ou)
142
143             # keep track of the top-level parents (if needed)
144             parent_list.append(ou)
145
146         # create the block of children (if needed)
147         for x in range(start, end):
148             for child in children:
149                 ou = "OU=test_ou_child%s%d,%s" % (child, x, parent_list[x])
150                 self.add_object(ou)
151                 dn_list.append(ou)
152
153         return dn_list
154
155     def assert_expected_data(self, expected_list):
156         """
157         Asserts that we received all the DNs that we expected and
158         none are missing.
159         """
160         received_list = self.rxd_dn_list
161
162         # Note that with GET_ANC Windows can end up sending the same parent
163         # object multiple times, so this might be noteworthy but doesn't
164         # warrant failing the test
165         if (len(received_list) != len(expected_list)):
166             print("Note: received %d objects but expected %d" %(len(received_list),
167                                                                 len(expected_list)))
168
169         # Check that we received every object that we were expecting
170         for dn in expected_list:
171             self.assertTrue(dn in received_list, "DN '%s' missing from replication." % dn)
172
173     def test_repl_integrity(self):
174         """
175         Modify the objects being replicated while the replication is still
176         in progress and check that no object loss occurs.
177         """
178
179         # The server behaviour differs between samba and Windows. Samba returns
180         # the objects in the original order (up to the pre-modify HWM). Windows
181         # incorporates the modified objects and returns them in the new order
182         # (i.e. modified objects last), up to the post-modify HWM. The Microsoft
183         # docs state the Windows behaviour is optional.
184
185         # Create a range of objects to replicate.
186         expected_dn_list = self.create_object_range(0, 400)
187         (orig_hwm, unused) = self._get_highest_hwm_utdv(self.test_ldb_dc)
188
189         # We ask for the first page of 100 objects.
190         # For this test, we don't care what order we receive the objects in,
191         # so long as by the end we've received everything
192         self.repl_get_next()
193
194         # Modify some of the second page of objects. This should bump the highwatermark
195         for x in range(100, 200):
196             self.modify_object(expected_dn_list[x], "displayName", "OU%d" % x)
197
198         (post_modify_hwm, unused) = self._get_highest_hwm_utdv(self.test_ldb_dc)
199         self.assertTrue(post_modify_hwm.highest_usn > orig_hwm.highest_usn)
200
201         # Get the remaining blocks of data
202         while not self.replication_complete():
203             self.repl_get_next()
204
205         # Check we still receive all the objects we're expecting
206         self.assert_expected_data(expected_dn_list)
207
208     def is_parent_known(self, dn, known_dn_list):
209         """
210         Returns True if the parent of the dn specified is in known_dn_list
211         """
212
213         # we can sometimes get system objects like the RID Manager returned.
214         # Ignore anything that is not under the test OU we created
215         if self.ou not in dn:
216             return True
217
218         # Remove the child portion from the name to get the parent's DN
219         name_substrings = dn.split(",")
220         del name_substrings[0]
221
222         parent_dn = ",".join(name_substrings)
223
224         # check either this object is a parent (it's parent is the top-level
225         # test object), or its parent has been seen previously
226         return parent_dn == self.ou or parent_dn in known_dn_list
227
228     def _repl_send_request(self, get_anc=False, get_tgt=False):
229         """Sends a GetNCChanges request for the next block of replication data."""
230
231         # we're just trying to mimic regular client behaviour here, so just
232         # use the highwatermark in the last response we received
233         if self.last_ctr:
234             highwatermark = self.last_ctr.new_highwatermark
235             uptodateness_vector = self.last_ctr.uptodateness_vector
236         else:
237             # this is the first replication chunk
238             highwatermark = None
239             uptodateness_vector = None
240
241         # Ask for the next block of replication data
242         replica_flags = drsuapi.DRSUAPI_DRS_WRIT_REP
243         more_flags = 0
244
245         if get_anc:
246             replica_flags = drsuapi.DRSUAPI_DRS_WRIT_REP | drsuapi.DRSUAPI_DRS_GET_ANC
247             self.used_get_anc = True
248
249         if get_tgt:
250             more_flags = drsuapi.DRSUAPI_DRS_GET_TGT
251             self.used_get_tgt = True
252
253         # return the response from the DC
254         return self._get_replication(replica_flags,
255                                      max_objects=self.max_objects,
256                                      highwatermark=highwatermark,
257                                      uptodateness_vector=uptodateness_vector,
258                                      more_flags=more_flags)
259
260     def repl_get_next(self, get_anc=False, get_tgt=False, assert_links=False):
261         """
262         Requests the next block of replication data. This tries to simulate
263         client behaviour - if we receive a replicated object that we don't know
264         the parent of, then re-request the block with the GET_ANC flag set.
265         If we don't know the target object for a linked attribute, then
266         re-request with GET_TGT.
267         """
268
269         # send a request to the DC and get the response
270         ctr6 = self._repl_send_request(get_anc=get_anc, get_tgt=get_tgt)
271
272         # extract the object DNs and their GUIDs from the response
273         rxd_dn_list = self._get_ctr6_dn_list(ctr6)
274         rxd_guid_list = self._get_ctr6_object_guids(ctr6)
275
276         # we'll add new objects as we discover them, so take a copy of the
277         # ones we already know about, so we can modify these lists safely
278         known_objects = self.rxd_dn_list[:]
279         known_guids = self.rxd_guids[:]
280
281         # check that we know the parent for every object received
282         for i in range(0, len(rxd_dn_list)):
283
284             dn = rxd_dn_list[i]
285             guid = rxd_guid_list[i]
286
287             if self.is_parent_known(dn, known_objects):
288
289                 # the new DN is now known so add it to the list.
290                 # It may be the parent of another child in this block
291                 known_objects.append(dn)
292                 known_guids.append(guid)
293             else:
294                 # If we've already set the GET_ANC flag then it should mean
295                 # we receive the parents before the child
296                 self.assertFalse(get_anc, "Unknown parent for object %s" % dn)
297
298                 print("Unknown parent for %s - try GET_ANC" % dn)
299
300                 # try the same thing again with the GET_ANC flag set this time
301                 return self.repl_get_next(get_anc=True, get_tgt=get_tgt,
302                                           assert_links=assert_links)
303
304         # check we know about references to any objects in the linked attritbutes
305         received_links = self._get_ctr6_links(ctr6)
306
307         # This is so that older versions of Samba fail - we want the links to be
308         # sent roughly with the objects, rather than getting all links at the end
309         if assert_links:
310             self.assertTrue(len(received_links) > 0,
311                             "Links were expected in the GetNCChanges response")
312
313         for link in received_links:
314
315             # skip any links that aren't part of the test
316             if self.ou not in link.targetDN:
317                 continue
318
319             # check the source object is known (Windows can actually send links
320             # where we don't know the source object yet). Samba shouldn't ever
321             # hit this case because it gets the links based on the source
322             if link.identifier not in known_guids:
323
324                 # If we've already set the GET_ANC flag then it should mean
325                 # this case doesn't happen
326                 self.assertFalse(get_anc, "Unknown source object for GUID %s"
327                                  % link.identifier)
328
329                 print("Unknown source GUID %s - try GET_ANC" % link.identifier)
330
331                 # try the same thing again with the GET_ANC flag set this time
332                 return self.repl_get_next(get_anc=True, get_tgt=get_tgt,
333                                           assert_links=assert_links)
334
335             # check we know the target object
336             if link.targetGUID not in known_guids:
337
338                 # If we've already set the GET_TGT flag then we should have
339                 # already received any objects we need to know about
340                 self.assertFalse(get_tgt, "Unknown linked target for object %s"
341                                  % link.targetDN)
342
343                 print("Unknown target for %s - try GET_TGT" % link.targetDN)
344
345                 # try the same thing again with the GET_TGT flag set this time
346                 return self.repl_get_next(get_anc=get_anc, get_tgt=True,
347                                           assert_links=assert_links)
348
349         # store the last successful result so we know what HWM to request next
350         self.last_ctr = ctr6
351
352         # store the objects, GUIDs, and links we received
353         self.rxd_dn_list += self._get_ctr6_dn_list(ctr6)
354         self.rxd_links += self._get_ctr6_links(ctr6)
355         self.rxd_guids += self._get_ctr6_object_guids(ctr6)
356
357         return ctr6
358
359     def replication_complete(self):
360         """Returns True if the current/last replication cycle is complete"""
361
362         if self.last_ctr is None or self.last_ctr.more_data:
363             return False
364         else:
365             return True
366
367     def test_repl_integrity_get_anc(self):
368         """
369         Modify the parent objects being replicated while the replication is still
370         in progress (using GET_ANC) and check that no object loss occurs.
371         """
372
373         # Note that GET_ANC behaviour varies between Windows and Samba.
374         # On Samba GET_ANC results in the replication restarting from the very
375         # beginning. After that, Samba remembers GET_ANC and also sends the
376         # parents in subsequent requests (regardless of whether GET_ANC is
377         # specified in the later request).
378         # Windows only sends the parents if GET_ANC was specified in the last
379         # request. It will also resend a parent, even if it's already sent the
380         # parent in a previous response (whereas Samba doesn't).
381
382         # Create a small block of 50 parents, each with 2 children (A and B)
383         # This is so that we receive some children in the first block, so we
384         # can resend with GET_ANC before we learn too many parents
385         parent_dn_list = []
386         expected_dn_list = self.create_object_range(0, 50, prefix="parent",
387                                                     children=("A", "B"),
388                                                     parent_list=parent_dn_list)
389
390         # create the remaining parents and children
391         expected_dn_list += self.create_object_range(50, 150, prefix="parent",
392                                                      children=("A", "B"),
393                                                      parent_list=parent_dn_list)
394
395         # We've now got objects in the following order:
396         # [50 parents][100 children][100 parents][200 children]
397
398         # Modify the first parent so that it's now ordered last by USN
399         # This means we set the GET_ANC flag pretty much straight away
400         # because we receive the first child before the first parent
401         self.modify_object(parent_dn_list[0], "displayName", "OU0")
402
403         # modify a later block of parents so they also get reordered
404         for x in range(50, 100):
405             self.modify_object(parent_dn_list[x], "displayName", "OU%d" % x)
406
407         # Get the first block of objects - this should resend the request with
408         # GET_ANC set because we won't know about the first child's parent.
409         # On samba GET_ANC essentially starts the sync from scratch again, so
410         # we get this over with early before we learn too many parents
411         self.repl_get_next()
412
413         # modify the last chunk of parents. They should now have a USN higher
414         # than the highwater-mark for the replication cycle
415         for x in range(100, 150):
416             self.modify_object(parent_dn_list[x], "displayName", "OU%d" % x)
417
418         # Get the remaining blocks of data - this will resend the request with
419         # GET_ANC if it encounters an object it doesn't have the parent for.
420         while not self.replication_complete():
421             self.repl_get_next()
422
423         # The way the test objects have been created should force
424         # self.repl_get_next() to use the GET_ANC flag. If this doesn't
425         # actually happen, then the test isn't doing its job properly
426         self.assertTrue(self.used_get_anc,
427                         "Test didn't use the GET_ANC flag as expected")
428
429         # Check we get all the objects we're expecting
430         self.assert_expected_data(expected_dn_list)
431
432     def assert_expected_links(self, objects_with_links, link_attr="managedBy",
433                               num_expected=None):
434         """
435         Asserts that a GetNCChanges response contains any expected links
436         for the objects it contains.
437         """
438         received_links = self.rxd_links
439
440         if num_expected is None:
441             num_expected = len(objects_with_links)
442
443         self.assertTrue(len(received_links) == num_expected,
444                         "Received %d links but expected %d"
445                         %(len(received_links), num_expected))
446
447         for dn in objects_with_links:
448             self.assert_object_has_link(dn, link_attr, received_links)
449
450     def assert_object_has_link(self, dn, link_attr, received_links):
451         """
452         Queries the object in the DB and asserts there is a link in the
453         GetNCChanges response that matches.
454         """
455
456         # Look up the link attribute in the DB
457         # The extended_dn option will dump the GUID info for the link
458         # attribute (as a hex blob)
459         res = self.test_ldb_dc.search(ldb.Dn(self.test_ldb_dc, dn), attrs=[link_attr],
460                                       controls=['extended_dn:1:0'], scope=ldb.SCOPE_BASE)
461
462         # We didn't find the expected link attribute in the DB for the object.
463         # Something has gone wrong somewhere...
464         self.assertTrue(link_attr in res[0], "%s in DB doesn't have attribute %s"
465                         %(dn, link_attr))
466
467         # find the received link in the list and assert that the target and
468         # source GUIDs match what's in the DB
469         for val in res[0][link_attr]:
470             # Work out the expected source and target GUIDs for the DB link
471             target_dn = ldb.Dn(self.test_ldb_dc, val)
472             targetGUID_blob = target_dn.get_extended_component("GUID")
473             sourceGUID_blob = res[0].dn.get_extended_component("GUID")
474
475             found = False
476
477             for link in received_links:
478                 if link.selfGUID_blob == sourceGUID_blob and \
479                    link.targetGUID_blob == targetGUID_blob:
480
481                     found = True
482
483                     if self._debug:
484                         print("Link %s --> %s" %(dn[:25], link.targetDN[:25]))
485                     break
486
487             self.assertTrue(found, "Did not receive expected link for DN %s" % dn)
488
489     def test_repl_get_tgt(self):
490         """
491         Creates a scenario where we should receive the linked attribute before
492         we know about the target object, and therefore need to use GET_TGT.
493         Note: Samba currently avoids this problem by sending all its links last
494         """
495
496         # create the test objects
497         reportees = self.create_object_range(0, 100, prefix="reportee")
498         managers = self.create_object_range(0, 100, prefix="manager")
499         all_objects = managers + reportees
500         expected_links = reportees
501
502         # add a link attribute to each reportee object that points to the
503         # corresponding manager object as the target
504         for i in range(0, 100):
505             self.modify_object(reportees[i], "managedBy", managers[i])
506
507         # touch the managers (the link-target objects) again to make sure the
508         # reportees (link source objects) get returned first by the replication
509         for i in range(0, 100):
510             self.modify_object(managers[i], "displayName", "OU%d" % i)
511
512         links_expected = True
513
514         # Get all the replication data - this code should resend the requests
515         # with GET_TGT
516         while not self.replication_complete():
517
518             # get the next block of replication data (this sets GET_TGT if needed)
519             self.repl_get_next(assert_links=links_expected)
520             links_expected = len(self.rxd_links) < len(expected_links)
521
522         # The way the test objects have been created should force
523         # self.repl_get_next() to use the GET_TGT flag. If this doesn't
524         # actually happen, then the test isn't doing its job properly
525         self.assertTrue(self.used_get_tgt,
526                         "Test didn't use the GET_TGT flag as expected")
527
528         # Check we get all the objects we're expecting
529         self.assert_expected_data(all_objects)
530
531         # Check we received links for all the reportees
532         self.assert_expected_links(expected_links)
533
534     def test_repl_get_tgt_chain(self):
535         """
536         Tests the behaviour of GET_TGT with a more complicated scenario.
537         Here we create a chain of objects linked together, so if we follow
538         the link target, then we'd traverse ~200 objects each time.
539         """
540
541         # create the test objects
542         objectsA = self.create_object_range(0, 100, prefix="AAA")
543         objectsB = self.create_object_range(0, 100, prefix="BBB")
544         objectsC = self.create_object_range(0, 100, prefix="CCC")
545
546         # create a complex set of object links:
547         #   A0-->B0-->C1-->B2-->C3-->B4-->and so on...
548         # Basically each object-A should link to a circular chain of 200 B/C
549         # objects. We create the links in separate chunks here, as it makes it
550         # clearer what happens with the USN (links on Windows have their own
551         # USN, so this approach means the A->B/B->C links aren't interleaved)
552         for i in range(0, 100):
553             self.modify_object(objectsA[i], "managedBy", objectsB[i])
554
555         for i in range(0, 100):
556             self.modify_object(objectsB[i], "managedBy", objectsC[(i + 1) % 100])
557
558         for i in range(0, 100):
559             self.modify_object(objectsC[i], "managedBy", objectsB[(i + 1) % 100])
560
561         all_objects = objectsA + objectsB + objectsC
562         expected_links = all_objects
563
564         # the default order the objects now get returned in should be:
565         # [A0-A99][B0-B99][C0-C99]
566
567         links_expected = True
568
569         # Get all the replication data - this code should resend the requests
570         # with GET_TGT
571         while not self.replication_complete():
572
573             # get the next block of replication data (this sets GET_TGT if needed)
574             self.repl_get_next(assert_links=links_expected)
575             links_expected = len(self.rxd_links) < len(expected_links)
576
577         # The way the test objects have been created should force
578         # self.repl_get_next() to use the GET_TGT flag. If this doesn't
579         # actually happen, then the test isn't doing its job properly
580         self.assertTrue(self.used_get_tgt,
581                         "Test didn't use the GET_TGT flag as expected")
582
583         # Check we get all the objects we're expecting
584         self.assert_expected_data(all_objects)
585
586         # Check we received links for all the reportees
587         self.assert_expected_links(expected_links)
588
589     def test_repl_integrity_link_attr(self):
590         """
591         Tests adding links to new objects while a replication is in progress.
592         """
593
594         # create some source objects for the linked attributes, sandwiched
595         # between 2 blocks of filler objects
596         filler = self.create_object_range(0, 100, prefix="filler")
597         reportees = self.create_object_range(0, 100, prefix="reportee")
598         filler += self.create_object_range(100, 200, prefix="filler")
599
600         # Start the replication and get the first block of filler objects
601         # (We're being mean here and setting the GET_TGT flag right from the
602         # start. On earlier Samba versions, if the client encountered an
603         # unknown target object and retried with GET_TGT, it would restart the
604         # replication cycle from scratch, which avoids the problem).
605         self.repl_get_next(get_tgt=True)
606
607         # create the target objects and add the links. These objects should be
608         # outside the scope of the Samba replication cycle, but the links should
609         # still get sent with the source object
610         managers = self.create_object_range(0, 100, prefix="manager")
611
612         for i in range(0, 100):
613             self.modify_object(reportees[i], "managedBy", managers[i])
614
615         expected_objects = managers + reportees + filler
616         expected_links = reportees
617
618         # complete the replication
619         while not self.replication_complete():
620             self.repl_get_next(get_tgt=True)
621
622         # If we didn't receive the most recently created objects in the last
623         # replication cycle, then kick off another replication to get them
624         if len(self.rxd_dn_list) < len(expected_objects):
625             self.repl_get_next()
626
627             while not self.replication_complete():
628                 self.repl_get_next()
629
630         # Check we get all the objects we're expecting
631         self.assert_expected_data(expected_objects)
632
633         # Check we received links for all the parents
634         self.assert_expected_links(expected_links)
635
636     def test_repl_get_anc_link_attr(self):
637         """
638         A basic GET_ANC test where the parents have linked attributes
639         """
640
641         # Create a block of 100 parents and 100 children
642         parent_dn_list = []
643         expected_dn_list = self.create_object_range(0, 100, prefix="parent",
644                                                     children=("A"),
645                                                     parent_list=parent_dn_list)
646
647         # Add links from the parents to the children
648         for x in range(0, 100):
649             self.modify_object(parent_dn_list[x], "managedBy", expected_dn_list[x + 100])
650
651         # add some filler objects at the end. This allows us to easily see
652         # which chunk the links get sent in
653         expected_dn_list += self.create_object_range(0, 100, prefix="filler")
654
655         # We've now got objects in the following order:
656         # [100 x children][100 x parents][100 x filler]
657
658         # Get the replication data - because the block of children come first,
659         # this should retry the request with GET_ANC
660         while not self.replication_complete():
661             self.repl_get_next()
662
663         self.assertTrue(self.used_get_anc,
664                         "Test didn't use the GET_ANC flag as expected")
665
666         # Check we get all the objects we're expecting
667         self.assert_expected_data(expected_dn_list)
668
669         # Check we received links for all the parents
670         self.assert_expected_links(parent_dn_list)
671
672     def test_repl_get_tgt_and_anc(self):
673         """
674         Check we can resolve an unknown ancestor when fetching the link target,
675         i.e. tests using GET_TGT and GET_ANC in combination
676         """
677
678         # Create some parent/child objects (the child will be the link target)
679         parents = []
680         all_objects = self.create_object_range(0, 100, prefix="parent",
681                                                children=["la_tgt"],
682                                                parent_list=parents)
683
684         children = [item for item in all_objects if item not in parents]
685
686         # create the link source objects and link them to the child/target
687         la_sources = self.create_object_range(0, 100, prefix="la_src")
688         all_objects += la_sources
689
690         for i in range(0, 100):
691             self.modify_object(la_sources[i], "managedBy", children[i])
692
693         expected_links = la_sources
694
695         # modify the children/targets so they come after the link source
696         for x in range(0, 100):
697             self.modify_object(children[x], "displayName", "OU%d" % x)
698
699         # modify the parents, so they now come last in the replication
700         for x in range(0, 100):
701             self.modify_object(parents[x], "displayName", "OU%d" % x)
702
703         # We've now got objects in the following order:
704         # [100 la_source][100 la_target][100 parents (of la_target)]
705
706         links_expected = True
707
708         # Get all the replication data - this code should resend the requests
709         # with GET_TGT and GET_ANC
710         while not self.replication_complete():
711
712             # get the next block of replication data (this sets GET_TGT/GET_ANC)
713             self.repl_get_next(assert_links=links_expected)
714             links_expected = len(self.rxd_links) < len(expected_links)
715
716         # The way the test objects have been created should force
717         # self.repl_get_next() to use the GET_TGT/GET_ANC flags. If this
718         # doesn't actually happen, then the test isn't doing its job properly
719         self.assertTrue(self.used_get_tgt,
720                         "Test didn't use the GET_TGT flag as expected")
721         self.assertTrue(self.used_get_anc,
722                         "Test didn't use the GET_ANC flag as expected")
723
724         # Check we get all the objects we're expecting
725         self.assert_expected_data(all_objects)
726
727         # Check we received links for all the link sources
728         self.assert_expected_links(expected_links)
729
730         # Second part of test. Add some extra objects and kick off another
731         # replication. The test code will use the HWM from the last replication
732         # so we'll only receive the objects we modify below
733         self.start_new_repl_cycle()
734
735         # add an extra level of grandchildren that hang off a child
736         # that got created last time
737         new_parent = "OU=test_new_parent,%s" % children[0]
738         self.add_object(new_parent)
739         new_children = []
740
741         for x in range(0, 50):
742             dn = "OU=test_new_la_tgt%d,%s" % (x, new_parent)
743             self.add_object(dn)
744             new_children.append(dn)
745
746         # replace half of the links to point to the new children
747         for x in range(0, 50):
748             self.delete_attribute(la_sources[x], "managedBy", children[x])
749             self.modify_object(la_sources[x], "managedBy", new_children[x])
750
751         # add some filler objects to fill up the 1st chunk
752         filler = self.create_object_range(0, 100, prefix="filler")
753
754         # modify the new children/targets so they come after the link source
755         for x in range(0, 50):
756             self.modify_object(new_children[x], "displayName", "OU-%d" % x)
757
758         # modify the parent, so it now comes last in the replication
759         self.modify_object(new_parent, "displayName", "OU%d" % x)
760
761         # We should now get the modified objects in the following order:
762         # [50 links (x 2)][100 filler][50 new children][new parent]
763         # Note that the link sources aren't actually sent (their new linked
764         # attributes are sent, but apart from that, nothing has changed)
765         all_objects = filler + new_children + [new_parent]
766         expected_links = la_sources[:50]
767
768         links_expected = True
769
770         while not self.replication_complete():
771             self.repl_get_next(assert_links=links_expected)
772             links_expected = len(self.rxd_links) < len(expected_links)
773
774         self.assertTrue(self.used_get_tgt,
775                         "Test didn't use the GET_TGT flag as expected")
776         self.assertTrue(self.used_get_anc,
777                         "Test didn't use the GET_ANC flag as expected")
778
779         # Check we get all the objects we're expecting
780         self.assert_expected_data(all_objects)
781
782         # Check we received links (50 deleted links and 50 new)
783         self.assert_expected_links(expected_links, num_expected=100)
784
785     def _repl_integrity_obj_deletion(self, delete_link_source=True):
786         """
787         Tests deleting link objects while a replication is in progress.
788         """
789
790         # create some objects and link them together, with some filler
791         # object in between the link sources
792         la_sources = self.create_object_range(0, 100, prefix="la_source")
793         la_targets = self.create_object_range(0, 100, prefix="la_targets")
794
795         for i in range(0, 50):
796             self.modify_object(la_sources[i], "managedBy", la_targets[i])
797
798         filler = self.create_object_range(0, 100, prefix="filler")
799
800         for i in range(50, 100):
801             self.modify_object(la_sources[i], "managedBy", la_targets[i])
802
803         # touch the targets so that the sources get replicated first
804         for i in range(0, 100):
805             self.modify_object(la_targets[i], "displayName", "OU%d" % i)
806
807         # objects should now be in the following USN order:
808         # [50 la_source][100 filler][50 la_source][100 la_target]
809
810         # Get the first block containing 50 link sources
811         self.repl_get_next()
812
813         # delete either the link targets or link source objects
814         if delete_link_source:
815             objects_to_delete = la_sources
816             # in GET_TGT testenvs we only receive the first 50 source objects
817             expected_objects = la_sources[:50] + la_targets + filler
818         else:
819             objects_to_delete = la_targets
820             expected_objects = la_sources + filler
821
822         for obj in objects_to_delete:
823             self.ldb_dc2.delete(obj)
824
825         # complete the replication
826         while not self.replication_complete():
827             self.repl_get_next()
828
829         # Check we get all the objects we're expecting
830         self.assert_expected_data(expected_objects)
831
832         # we can't use assert_expected_links() here because it tries to check
833         # against the deleted objects on the DC. (Although we receive some
834         # links from the first block processed, the Samba client should end up
835         # deleting these, as the source/target object involved is deleted)
836         self.assertTrue(len(self.rxd_links) == 50,
837                         "Expected 50 links, not %d" % len(self.rxd_links))
838
839     def test_repl_integrity_src_obj_deletion(self):
840         self._repl_integrity_obj_deletion(delete_link_source=True)
841
842     def test_repl_integrity_tgt_obj_deletion(self):
843         self._repl_integrity_obj_deletion(delete_link_source=False)
844
845     def restore_deleted_object(self, guid, new_dn):
846         """Re-animates a deleted object"""
847
848         res = self.test_ldb_dc.search(base="<GUID=%s>" % self._GUID_string(guid), attrs=["isDeleted"],
849                                   controls=['show_deleted:1'], scope=ldb.SCOPE_BASE)
850         if len(res) != 1:
851             return
852
853         msg = ldb.Message()
854         msg.dn = res[0].dn
855         msg["isDeleted"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE, "isDeleted")
856         msg["distinguishedName"] = ldb.MessageElement([new_dn], ldb.FLAG_MOD_REPLACE, "distinguishedName")
857         self.test_ldb_dc.modify(msg, ["show_deleted:1"])
858
859     def sync_DCs(self, nc_dn=None):
860         # make sure DC1 has all the changes we've made to DC2
861         self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, nc_dn=nc_dn)
862
863     def get_object_guid(self, dn):
864         res = self.test_ldb_dc.search(base=dn, attrs=["objectGUID"], scope=ldb.SCOPE_BASE)
865         return res[0]['objectGUID'][0]
866
867
868     def set_dc_connection(self, conn):
869         """
870         Switches over the connection state info that the underlying drs_base
871         class uses so that we replicate with a different DC.
872         """
873         self.default_hwm = conn.default_hwm
874         self.default_utdv = conn.default_utdv
875         self.drs = conn.drs
876         self.drs_handle = conn.drs_handle
877         self.set_test_ldb_dc(conn.ldb_dc)
878
879     def assert_DCs_replication_is_consistent(self, peer_conn, all_objects,
880                                              expected_links):
881         """
882         Replicates against both the primary and secondary DCs in the testenv
883         and checks that both return the expected results.
884         """
885         print("Checking replication against primary test DC...")
886
887         # get the replication data from the test DC first
888         while not self.replication_complete():
889             self.repl_get_next()
890
891         # Check we get all the objects and links we're expecting
892         self.assert_expected_data(all_objects)
893         self.assert_expected_links(expected_links)
894
895         # switch over the DC state info so we now talk to the peer DC
896         self.set_dc_connection(peer_conn)
897         self.init_test_state()
898
899         print("Checking replication against secondary test DC...")
900
901         # check that we get the same information from the 2nd DC
902         while not self.replication_complete():
903             self.repl_get_next()
904
905         self.assert_expected_data(all_objects)
906         self.assert_expected_links(expected_links)
907
908         # switch back to using the default connection
909         self.set_dc_connection(self.default_conn)
910
911     def test_repl_integrity_obj_reanimation(self):
912         """
913         Checks receiving links for a re-animated object doesn't lose links.
914         We test this against the peer DC to make sure it doesn't drop links.
915         """
916
917         # This test is a little different in that we're particularly interested
918         # in exercising the replmd client code on the second DC.
919         # First, make sure the peer DC has the base OU, then connect to it (so
920         # we store its inital HWM)
921         self.sync_DCs()
922         peer_conn = DcConnection(self, self.ldb_dc1, self.dnsname_dc1)
923
924         # create the link source/target objects
925         la_sources = self.create_object_range(0, 100, prefix="la_src")
926         la_targets = self.create_object_range(0, 100, prefix="la_tgt")
927
928         # store the target object's GUIDs (we need to know these to reanimate them)
929         target_guids = []
930
931         for dn in la_targets:
932             target_guids.append(self.get_object_guid(dn))
933
934         # delete the link target
935         for x in range(0, 100):
936             self.ldb_dc2.delete(la_targets[x])
937
938         # sync the DCs, then disable replication. We want the peer DC to get
939         # all the following changes in a single replication cycle
940         self.sync_DCs()
941         self._disable_all_repl(self.dnsname_dc2)
942
943         # restore the target objects for the linked attributes again
944         for x in range(0, 100):
945             self.restore_deleted_object(target_guids[x], la_targets[x])
946
947         # add the links
948         for x in range(0, 100):
949             self.modify_object(la_sources[x], "managedBy", la_targets[x])
950
951         # create some additional filler objects
952         filler = self.create_object_range(0, 100, prefix="filler")
953
954         # modify the targets so they now come last
955         for x in range(0, 100):
956             self.modify_object(la_targets[x], "displayName", "OU-%d" % x)
957
958         # the objects should now be sent in the following order:
959         # [la sources + links][filler][la targets]
960         all_objects = la_sources + la_targets + filler
961         expected_links = la_sources
962
963         # Enable replication again make sure the 2 DCs are back in sync
964         self._enable_all_repl(self.dnsname_dc2)
965         self.sync_DCs()
966
967         # Get the replication data from each DC in turn.
968         # Check that both give us all the objects and links we're expecting,
969         # i.e. no links were lost
970         self.assert_DCs_replication_is_consistent(peer_conn, all_objects,
971                                                   expected_links)
972
973     def test_repl_integrity_cross_partition_links(self):
974         """
975         Checks that a cross-partition link to an unknown target object does
976         not result in missing links.
977         """
978
979         # check the peer DC is up-to-date, then connect (storing its HWM)
980         self.sync_DCs()
981         peer_conn = DcConnection(self, self.ldb_dc1, self.dnsname_dc1)
982
983         # stop replication so the peer gets the following objects in one go
984         self._disable_all_repl(self.dnsname_dc2)
985
986         # create a link source object in the main NC
987         la_source = "OU=cross_nc_src,%s" % self.ou
988         self.add_object(la_source)
989
990         # create the link target (a server object) in the config NC
991         rand = random.randint(1, 10000000)
992         la_target = "CN=getncchanges-%d,CN=Servers,CN=Default-First-Site-Name," \
993                     "CN=Sites,%s" %(rand, self.config_dn)
994         self.add_object(la_target, objectclass="server")
995
996         # add a cross-partition link between the two
997         self.modify_object(la_source, "managedBy", la_target)
998
999         # First, sync across to the peer the NC containing the link source object
1000         self.sync_DCs()
1001
1002         # Now, before the peer has received the partition containing the target
1003         # object, try replicating from the peer. It will only know about half
1004         # of the link at this point, but it should be a valid scenario
1005         self.set_dc_connection(peer_conn)
1006
1007         while not self.replication_complete():
1008             # pretend we've received other link targets out of order and that's
1009             # forced us to use GET_TGT. This checks the peer doesn't fail trying
1010             # to fetch a cross-partition target object that doesn't exist
1011             self.repl_get_next(get_tgt=True)
1012
1013         self.set_dc_connection(self.default_conn)
1014         self.init_test_state()
1015
1016         # Now sync across the partition containing the link target object
1017         self.sync_DCs(nc_dn=self.config_dn)
1018         self._enable_all_repl(self.dnsname_dc2)
1019
1020         # Get the replication data from each DC in turn.
1021         # Check that both return the cross-partition link (note we're not
1022         # checking the config domain NC here for simplicity)
1023         self.assert_DCs_replication_is_consistent(peer_conn,
1024                                                   all_objects=[la_source],
1025                                                   expected_links=[la_source])
1026
1027         # the cross-partition linked attribute has a missing backlink. Check
1028         # that we can still delete it successfully
1029         self.delete_attribute(la_source, "managedBy", la_target)
1030         self.sync_DCs()
1031
1032         res = self.test_ldb_dc.search(ldb.Dn(self.ldb_dc1, la_source),
1033                                       attrs=["managedBy"],
1034                                       controls=['extended_dn:1:0'],
1035                                       scope=ldb.SCOPE_BASE)
1036         self.assertFalse("managedBy" in res[0], "%s in DB still has managedBy attribute"
1037                          % la_source)
1038         res = self.test_ldb_dc.search(ldb.Dn(self.ldb_dc2, la_source),
1039                                       attrs=["managedBy"],
1040                                       controls=['extended_dn:1:0'],
1041                                       scope=ldb.SCOPE_BASE)
1042         self.assertFalse("managedBy" in res[0], "%s in DB still has managedBy attribute"
1043                          % la_source)
1044
1045         # cleanup the server object we created in the Configuration partition
1046         self.test_ldb_dc.delete(la_source)
1047
1048 class DcConnection:
1049     """Helper class to track a connection to another DC"""
1050
1051     def __init__(self, drs_base, ldb_dc, dnsname_dc):
1052         self.ldb_dc = ldb_dc
1053         (self.drs, self.drs_handle) = drs_base._ds_bind(dnsname_dc)
1054         (self.default_hwm, self.default_utdv) = drs_base._get_highest_hwm_utdv(ldb_dc)
1055
1056