s4/torture/drs: convert print func to be py2/py3 compatible
[metze/samba/wip.git] / source4 / torture / drs / python / getncchanges.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3 #
4 # Tests various schema replication scenarios
5 #
6 # Copyright (C) Catalyst.Net Ltd. 2017
7 #
8 # This program is free software; you can redistribute it and/or modify
9 # it under the terms of the GNU General Public License as published by
10 # the Free Software Foundation; either version 3 of the License, or
11 # (at your option) any later version.
12 #
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 # GNU General Public License for more details.
17 #
18 # You should have received a copy of the GNU General Public License
19 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
20 #
21
22 #
23 # Usage:
24 #  export DC1=dc1_dns_name
25 #  export DC2=dc2_dns_name
26 #  export SUBUNITRUN=$samba4srcdir/scripting/bin/subunitrun
27 #  PYTHONPATH="$PYTHONPATH:$samba4srcdir/torture/drs/python" $SUBUNITRUN getncchanges -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD"
28 #
29
30 from __future__ import print_function
31 import drs_base
32 import samba.tests
33 import ldb
34 from ldb import SCOPE_BASE
35 import random
36
37 from samba.dcerpc import drsuapi
38
39 class DrsReplicaSyncIntegrityTestCase(drs_base.DrsBaseTestCase):
40     def setUp(self):
41         super(DrsReplicaSyncIntegrityTestCase, self).setUp()
42
43         self.init_test_state()
44
45         # Note that DC2 is the DC with the testenv-specific quirks (e.g. it's
46         # the vampire_dc), so we point this test directly at that DC
47         self.set_test_ldb_dc(self.ldb_dc2)
48
49         # add some randomness to the test OU. (Deletion of the last test's
50         # objects can be slow to replicate out. So the OU created by a previous
51         # testenv may still exist at this point).
52         rand = random.randint(1, 10000000)
53         self.base_dn = self.test_ldb_dc.get_default_basedn()
54         self.ou = "OU=getncchanges%d_test,%s" %(rand, self.base_dn)
55         self.test_ldb_dc.add({
56             "dn": self.ou,
57             "objectclass": "organizationalUnit"})
58
59         self.default_conn = DcConnection(self, self.ldb_dc2, self.dnsname_dc2)
60         self.set_dc_connection(self.default_conn)
61
62     def tearDown(self):
63         super(DrsReplicaSyncIntegrityTestCase, self).tearDown()
64         # tidyup groups and users
65         try:
66             self.ldb_dc2.delete(self.ou, ["tree_delete:1"])
67         except ldb.LdbError as e:
68             (enum, string) = e.args
69             if enum == ldb.ERR_NO_SUCH_OBJECT:
70                 pass
71
72     def init_test_state(self):
73         self.rxd_dn_list = []
74         self.rxd_links = []
75         self.rxd_guids = []
76         self.last_ctr = None
77
78         # 100 is the minimum max_objects that Microsoft seems to honour
79         # (the max honoured is 400ish), so we use that in these tests
80         self.max_objects = 100
81
82         # store whether we used GET_TGT/GET_ANC flags in the requests
83         self.used_get_tgt = False
84         self.used_get_anc = False
85
86     def add_object(self, dn, objectclass="organizationalunit"):
87         """Adds an OU object"""
88         self.test_ldb_dc.add({"dn": dn, "objectclass": objectclass})
89         res = self.test_ldb_dc.search(base=dn, scope=SCOPE_BASE)
90         self.assertEquals(len(res), 1)
91
92     def modify_object(self, dn, attr, value):
93         """Modifies an object's USN by adding an attribute value to it"""
94         m = ldb.Message()
95         m.dn = ldb.Dn(self.test_ldb_dc, dn)
96         m[attr] = ldb.MessageElement(value, ldb.FLAG_MOD_ADD, attr)
97         self.test_ldb_dc.modify(m)
98
99     def delete_attribute(self, dn, attr, value):
100         """Deletes an attribute from an object"""
101         m = ldb.Message()
102         m.dn = ldb.Dn(self.test_ldb_dc, dn)
103         m[attr] = ldb.MessageElement(value, ldb.FLAG_MOD_DELETE, attr)
104         self.test_ldb_dc.modify(m)
105
106     def start_new_repl_cycle(self):
107         """Resets enough state info to start a new replication cycle"""
108         # reset rxd_links, but leave rxd_guids and rxd_dn_list alone so we know
109         # whether a parent/target is unknown and needs GET_ANC/GET_TGT to resolve
110         self.rxd_links = []
111
112         self.used_get_tgt = False
113         self.used_get_anc = False
114         # mostly preserve self.last_ctr, so that we use the last HWM
115         if self.last_ctr is not None:
116             self.last_ctr.more_data = True
117
118     def create_object_range(self, start, end, prefix="",
119                             children=None, parent_list=None):
120         """
121         Creates a block of objects. Object names are numbered sequentially,
122         using the optional prefix supplied. If the children parameter is
123         supplied it will create a parent-child hierarchy and return the
124         top-level parents separately.
125         """
126         dn_list = []
127
128         # Use dummy/empty lists if we're not creating a parent/child hierarchy
129         if children is None:
130             children = []
131
132         if parent_list is None:
133             parent_list = []
134
135         # Create the parents first, then the children.
136         # This makes it easier to see in debug when GET_ANC takes effect
137         # because the parent/children become interleaved (by default,
138         # this approach means the objects are organized into blocks of
139         # parents and blocks of children together)
140         for x in range(start, end):
141             ou = "OU=test_ou_%s%d,%s" % (prefix, x, self.ou)
142             self.add_object(ou)
143             dn_list.append(ou)
144
145             # keep track of the top-level parents (if needed)
146             parent_list.append(ou)
147
148         # create the block of children (if needed)
149         for x in range(start, end):
150             for child in children:
151                 ou = "OU=test_ou_child%s%d,%s" % (child, x, parent_list[x])
152                 self.add_object(ou)
153                 dn_list.append(ou)
154
155         return dn_list
156
157     def assert_expected_data(self, expected_list):
158         """
159         Asserts that we received all the DNs that we expected and
160         none are missing.
161         """
162         received_list = self.rxd_dn_list
163
164         # Note that with GET_ANC Windows can end up sending the same parent
165         # object multiple times, so this might be noteworthy but doesn't
166         # warrant failing the test
167         if (len(received_list) != len(expected_list)):
168             print("Note: received %d objects but expected %d" %(len(received_list),
169                                                                 len(expected_list)))
170
171         # Check that we received every object that we were expecting
172         for dn in expected_list:
173             self.assertTrue(dn in received_list, "DN '%s' missing from replication." % dn)
174
175     def test_repl_integrity(self):
176         """
177         Modify the objects being replicated while the replication is still
178         in progress and check that no object loss occurs.
179         """
180
181         # The server behaviour differs between samba and Windows. Samba returns
182         # the objects in the original order (up to the pre-modify HWM). Windows
183         # incorporates the modified objects and returns them in the new order
184         # (i.e. modified objects last), up to the post-modify HWM. The Microsoft
185         # docs state the Windows behaviour is optional.
186
187         # Create a range of objects to replicate.
188         expected_dn_list = self.create_object_range(0, 400)
189         (orig_hwm, unused) = self._get_highest_hwm_utdv(self.test_ldb_dc)
190
191         # We ask for the first page of 100 objects.
192         # For this test, we don't care what order we receive the objects in,
193         # so long as by the end we've received everything
194         self.repl_get_next()
195
196         # Modify some of the second page of objects. This should bump the highwatermark
197         for x in range(100, 200):
198             self.modify_object(expected_dn_list[x], "displayName", "OU%d" % x)
199
200         (post_modify_hwm, unused) = self._get_highest_hwm_utdv(self.test_ldb_dc)
201         self.assertTrue(post_modify_hwm.highest_usn > orig_hwm.highest_usn)
202
203         # Get the remaining blocks of data
204         while not self.replication_complete():
205             self.repl_get_next()
206
207         # Check we still receive all the objects we're expecting
208         self.assert_expected_data(expected_dn_list)
209
210     def is_parent_known(self, dn, known_dn_list):
211         """
212         Returns True if the parent of the dn specified is in known_dn_list
213         """
214
215         # we can sometimes get system objects like the RID Manager returned.
216         # Ignore anything that is not under the test OU we created
217         if self.ou not in dn:
218             return True
219
220         # Remove the child portion from the name to get the parent's DN
221         name_substrings = dn.split(",")
222         del name_substrings[0]
223
224         parent_dn = ",".join(name_substrings)
225
226         # check either this object is a parent (it's parent is the top-level
227         # test object), or its parent has been seen previously
228         return parent_dn == self.ou or parent_dn in known_dn_list
229
230     def _repl_send_request(self, get_anc=False, get_tgt=False):
231         """Sends a GetNCChanges request for the next block of replication data."""
232
233         # we're just trying to mimic regular client behaviour here, so just
234         # use the highwatermark in the last response we received
235         if self.last_ctr:
236             highwatermark = self.last_ctr.new_highwatermark
237             uptodateness_vector = self.last_ctr.uptodateness_vector
238         else:
239             # this is the first replication chunk
240             highwatermark = None
241             uptodateness_vector = None
242
243         # Ask for the next block of replication data
244         replica_flags = drsuapi.DRSUAPI_DRS_WRIT_REP
245         more_flags = 0
246
247         if get_anc:
248             replica_flags = drsuapi.DRSUAPI_DRS_WRIT_REP | drsuapi.DRSUAPI_DRS_GET_ANC
249             self.used_get_anc = True
250
251         if get_tgt:
252             more_flags = drsuapi.DRSUAPI_DRS_GET_TGT
253             self.used_get_tgt = True
254
255         # return the response from the DC
256         return self._get_replication(replica_flags,
257                                      max_objects=self.max_objects,
258                                      highwatermark=highwatermark,
259                                      uptodateness_vector=uptodateness_vector,
260                                      more_flags=more_flags)
261
262     def repl_get_next(self, get_anc=False, get_tgt=False, assert_links=False):
263         """
264         Requests the next block of replication data. This tries to simulate
265         client behaviour - if we receive a replicated object that we don't know
266         the parent of, then re-request the block with the GET_ANC flag set.
267         If we don't know the target object for a linked attribute, then
268         re-request with GET_TGT.
269         """
270
271         # send a request to the DC and get the response
272         ctr6 = self._repl_send_request(get_anc=get_anc, get_tgt=get_tgt)
273
274         # extract the object DNs and their GUIDs from the response
275         rxd_dn_list = self._get_ctr6_dn_list(ctr6)
276         rxd_guid_list = self._get_ctr6_object_guids(ctr6)
277
278         # we'll add new objects as we discover them, so take a copy of the
279         # ones we already know about, so we can modify these lists safely
280         known_objects = self.rxd_dn_list[:]
281         known_guids = self.rxd_guids[:]
282
283         # check that we know the parent for every object received
284         for i in range(0, len(rxd_dn_list)):
285
286             dn = rxd_dn_list[i]
287             guid = rxd_guid_list[i]
288
289             if self.is_parent_known(dn, known_objects):
290
291                 # the new DN is now known so add it to the list.
292                 # It may be the parent of another child in this block
293                 known_objects.append(dn)
294                 known_guids.append(guid)
295             else:
296                 # If we've already set the GET_ANC flag then it should mean
297                 # we receive the parents before the child
298                 self.assertFalse(get_anc, "Unknown parent for object %s" % dn)
299
300                 print("Unknown parent for %s - try GET_ANC" % dn)
301
302                 # try the same thing again with the GET_ANC flag set this time
303                 return self.repl_get_next(get_anc=True, get_tgt=get_tgt,
304                                           assert_links=assert_links)
305
306         # check we know about references to any objects in the linked attritbutes
307         received_links = self._get_ctr6_links(ctr6)
308
309         # This is so that older versions of Samba fail - we want the links to be
310         # sent roughly with the objects, rather than getting all links at the end
311         if assert_links:
312             self.assertTrue(len(received_links) > 0,
313                             "Links were expected in the GetNCChanges response")
314
315         for link in received_links:
316
317             # skip any links that aren't part of the test
318             if self.ou not in link.targetDN:
319                 continue
320
321             # check the source object is known (Windows can actually send links
322             # where we don't know the source object yet). Samba shouldn't ever
323             # hit this case because it gets the links based on the source
324             if link.identifier not in known_guids:
325
326                 # If we've already set the GET_ANC flag then it should mean
327                 # this case doesn't happen
328                 self.assertFalse(get_anc, "Unknown source object for GUID %s"
329                                  % link.identifier)
330
331                 print("Unknown source GUID %s - try GET_ANC" % link.identifier)
332
333                 # try the same thing again with the GET_ANC flag set this time
334                 return self.repl_get_next(get_anc=True, get_tgt=get_tgt,
335                                           assert_links=assert_links)
336
337             # check we know the target object
338             if link.targetGUID not in known_guids:
339
340                 # If we've already set the GET_TGT flag then we should have
341                 # already received any objects we need to know about
342                 self.assertFalse(get_tgt, "Unknown linked target for object %s"
343                                  % link.targetDN)
344
345                 print("Unknown target for %s - try GET_TGT" % link.targetDN)
346
347                 # try the same thing again with the GET_TGT flag set this time
348                 return self.repl_get_next(get_anc=get_anc, get_tgt=True,
349                                           assert_links=assert_links)
350
351         # store the last successful result so we know what HWM to request next
352         self.last_ctr = ctr6
353
354         # store the objects, GUIDs, and links we received
355         self.rxd_dn_list += self._get_ctr6_dn_list(ctr6)
356         self.rxd_links += self._get_ctr6_links(ctr6)
357         self.rxd_guids += self._get_ctr6_object_guids(ctr6)
358
359         return ctr6
360
361     def replication_complete(self):
362         """Returns True if the current/last replication cycle is complete"""
363
364         if self.last_ctr is None or self.last_ctr.more_data:
365             return False
366         else:
367             return True
368
369     def test_repl_integrity_get_anc(self):
370         """
371         Modify the parent objects being replicated while the replication is still
372         in progress (using GET_ANC) and check that no object loss occurs.
373         """
374
375         # Note that GET_ANC behaviour varies between Windows and Samba.
376         # On Samba GET_ANC results in the replication restarting from the very
377         # beginning. After that, Samba remembers GET_ANC and also sends the
378         # parents in subsequent requests (regardless of whether GET_ANC is
379         # specified in the later request).
380         # Windows only sends the parents if GET_ANC was specified in the last
381         # request. It will also resend a parent, even if it's already sent the
382         # parent in a previous response (whereas Samba doesn't).
383
384         # Create a small block of 50 parents, each with 2 children (A and B)
385         # This is so that we receive some children in the first block, so we
386         # can resend with GET_ANC before we learn too many parents
387         parent_dn_list = []
388         expected_dn_list = self.create_object_range(0, 50, prefix="parent",
389                                                     children=("A", "B"),
390                                                     parent_list=parent_dn_list)
391
392         # create the remaining parents and children
393         expected_dn_list += self.create_object_range(50, 150, prefix="parent",
394                                                      children=("A", "B"),
395                                                      parent_list=parent_dn_list)
396
397         # We've now got objects in the following order:
398         # [50 parents][100 children][100 parents][200 children]
399
400         # Modify the first parent so that it's now ordered last by USN
401         # This means we set the GET_ANC flag pretty much straight away
402         # because we receive the first child before the first parent
403         self.modify_object(parent_dn_list[0], "displayName", "OU0")
404
405         # modify a later block of parents so they also get reordered
406         for x in range(50, 100):
407             self.modify_object(parent_dn_list[x], "displayName", "OU%d" % x)
408
409         # Get the first block of objects - this should resend the request with
410         # GET_ANC set because we won't know about the first child's parent.
411         # On samba GET_ANC essentially starts the sync from scratch again, so
412         # we get this over with early before we learn too many parents
413         self.repl_get_next()
414
415         # modify the last chunk of parents. They should now have a USN higher
416         # than the highwater-mark for the replication cycle
417         for x in range(100, 150):
418             self.modify_object(parent_dn_list[x], "displayName", "OU%d" % x)
419
420         # Get the remaining blocks of data - this will resend the request with
421         # GET_ANC if it encounters an object it doesn't have the parent for.
422         while not self.replication_complete():
423             self.repl_get_next()
424
425         # The way the test objects have been created should force
426         # self.repl_get_next() to use the GET_ANC flag. If this doesn't
427         # actually happen, then the test isn't doing its job properly
428         self.assertTrue(self.used_get_anc,
429                         "Test didn't use the GET_ANC flag as expected")
430
431         # Check we get all the objects we're expecting
432         self.assert_expected_data(expected_dn_list)
433
434     def assert_expected_links(self, objects_with_links, link_attr="managedBy",
435                               num_expected=None):
436         """
437         Asserts that a GetNCChanges response contains any expected links
438         for the objects it contains.
439         """
440         received_links = self.rxd_links
441
442         if num_expected is None:
443             num_expected = len(objects_with_links)
444
445         self.assertTrue(len(received_links) == num_expected,
446                         "Received %d links but expected %d"
447                         %(len(received_links), num_expected))
448
449         for dn in objects_with_links:
450             self.assert_object_has_link(dn, link_attr, received_links)
451
452     def assert_object_has_link(self, dn, link_attr, received_links):
453         """
454         Queries the object in the DB and asserts there is a link in the
455         GetNCChanges response that matches.
456         """
457
458         # Look up the link attribute in the DB
459         # The extended_dn option will dump the GUID info for the link
460         # attribute (as a hex blob)
461         res = self.test_ldb_dc.search(ldb.Dn(self.test_ldb_dc, dn), attrs=[link_attr],
462                                       controls=['extended_dn:1:0'], scope=ldb.SCOPE_BASE)
463
464         # We didn't find the expected link attribute in the DB for the object.
465         # Something has gone wrong somewhere...
466         self.assertTrue(link_attr in res[0], "%s in DB doesn't have attribute %s"
467                         %(dn, link_attr))
468
469         # find the received link in the list and assert that the target and
470         # source GUIDs match what's in the DB
471         for val in res[0][link_attr]:
472             # Work out the expected source and target GUIDs for the DB link
473             target_dn = ldb.Dn(self.test_ldb_dc, val)
474             targetGUID_blob = target_dn.get_extended_component("GUID")
475             sourceGUID_blob = res[0].dn.get_extended_component("GUID")
476
477             found = False
478
479             for link in received_links:
480                 if link.selfGUID_blob == sourceGUID_blob and \
481                    link.targetGUID_blob == targetGUID_blob:
482
483                     found = True
484
485                     if self._debug:
486                         print("Link %s --> %s" %(dn[:25], link.targetDN[:25]))
487                     break
488
489             self.assertTrue(found, "Did not receive expected link for DN %s" % dn)
490
491     def test_repl_get_tgt(self):
492         """
493         Creates a scenario where we should receive the linked attribute before
494         we know about the target object, and therefore need to use GET_TGT.
495         Note: Samba currently avoids this problem by sending all its links last
496         """
497
498         # create the test objects
499         reportees = self.create_object_range(0, 100, prefix="reportee")
500         managers = self.create_object_range(0, 100, prefix="manager")
501         all_objects = managers + reportees
502         expected_links = reportees
503
504         # add a link attribute to each reportee object that points to the
505         # corresponding manager object as the target
506         for i in range(0, 100):
507             self.modify_object(reportees[i], "managedBy", managers[i])
508
509         # touch the managers (the link-target objects) again to make sure the
510         # reportees (link source objects) get returned first by the replication
511         for i in range(0, 100):
512             self.modify_object(managers[i], "displayName", "OU%d" % i)
513
514         links_expected = True
515
516         # Get all the replication data - this code should resend the requests
517         # with GET_TGT
518         while not self.replication_complete():
519
520             # get the next block of replication data (this sets GET_TGT if needed)
521             self.repl_get_next(assert_links=links_expected)
522             links_expected = len(self.rxd_links) < len(expected_links)
523
524         # The way the test objects have been created should force
525         # self.repl_get_next() to use the GET_TGT flag. If this doesn't
526         # actually happen, then the test isn't doing its job properly
527         self.assertTrue(self.used_get_tgt,
528                         "Test didn't use the GET_TGT flag as expected")
529
530         # Check we get all the objects we're expecting
531         self.assert_expected_data(all_objects)
532
533         # Check we received links for all the reportees
534         self.assert_expected_links(expected_links)
535
536     def test_repl_get_tgt_chain(self):
537         """
538         Tests the behaviour of GET_TGT with a more complicated scenario.
539         Here we create a chain of objects linked together, so if we follow
540         the link target, then we'd traverse ~200 objects each time.
541         """
542
543         # create the test objects
544         objectsA = self.create_object_range(0, 100, prefix="AAA")
545         objectsB = self.create_object_range(0, 100, prefix="BBB")
546         objectsC = self.create_object_range(0, 100, prefix="CCC")
547
548         # create a complex set of object links:
549         #   A0-->B0-->C1-->B2-->C3-->B4-->and so on...
550         # Basically each object-A should link to a circular chain of 200 B/C
551         # objects. We create the links in separate chunks here, as it makes it
552         # clearer what happens with the USN (links on Windows have their own
553         # USN, so this approach means the A->B/B->C links aren't interleaved)
554         for i in range(0, 100):
555             self.modify_object(objectsA[i], "managedBy", objectsB[i])
556
557         for i in range(0, 100):
558             self.modify_object(objectsB[i], "managedBy", objectsC[(i + 1) % 100])
559
560         for i in range(0, 100):
561             self.modify_object(objectsC[i], "managedBy", objectsB[(i + 1) % 100])
562
563         all_objects = objectsA + objectsB + objectsC
564         expected_links = all_objects
565
566         # the default order the objects now get returned in should be:
567         # [A0-A99][B0-B99][C0-C99]
568
569         links_expected = True
570
571         # Get all the replication data - this code should resend the requests
572         # with GET_TGT
573         while not self.replication_complete():
574
575             # get the next block of replication data (this sets GET_TGT if needed)
576             self.repl_get_next(assert_links=links_expected)
577             links_expected = len(self.rxd_links) < len(expected_links)
578
579         # The way the test objects have been created should force
580         # self.repl_get_next() to use the GET_TGT flag. If this doesn't
581         # actually happen, then the test isn't doing its job properly
582         self.assertTrue(self.used_get_tgt,
583                         "Test didn't use the GET_TGT flag as expected")
584
585         # Check we get all the objects we're expecting
586         self.assert_expected_data(all_objects)
587
588         # Check we received links for all the reportees
589         self.assert_expected_links(expected_links)
590
591     def test_repl_integrity_link_attr(self):
592         """
593         Tests adding links to new objects while a replication is in progress.
594         """
595
596         # create some source objects for the linked attributes, sandwiched
597         # between 2 blocks of filler objects
598         filler = self.create_object_range(0, 100, prefix="filler")
599         reportees = self.create_object_range(0, 100, prefix="reportee")
600         filler += self.create_object_range(100, 200, prefix="filler")
601
602         # Start the replication and get the first block of filler objects
603         # (We're being mean here and setting the GET_TGT flag right from the
604         # start. On earlier Samba versions, if the client encountered an
605         # unknown target object and retried with GET_TGT, it would restart the
606         # replication cycle from scratch, which avoids the problem).
607         self.repl_get_next(get_tgt=True)
608
609         # create the target objects and add the links. These objects should be
610         # outside the scope of the Samba replication cycle, but the links should
611         # still get sent with the source object
612         managers = self.create_object_range(0, 100, prefix="manager")
613
614         for i in range(0, 100):
615             self.modify_object(reportees[i], "managedBy", managers[i])
616
617         expected_objects = managers + reportees + filler
618         expected_links = reportees
619
620         # complete the replication
621         while not self.replication_complete():
622             self.repl_get_next(get_tgt=True)
623
624         # If we didn't receive the most recently created objects in the last
625         # replication cycle, then kick off another replication to get them
626         if len(self.rxd_dn_list) < len(expected_objects):
627             self.repl_get_next()
628
629             while not self.replication_complete():
630                 self.repl_get_next()
631
632         # Check we get all the objects we're expecting
633         self.assert_expected_data(expected_objects)
634
635         # Check we received links for all the parents
636         self.assert_expected_links(expected_links)
637
638     def test_repl_get_anc_link_attr(self):
639         """
640         A basic GET_ANC test where the parents have linked attributes
641         """
642
643         # Create a block of 100 parents and 100 children
644         parent_dn_list = []
645         expected_dn_list = self.create_object_range(0, 100, prefix="parent",
646                                                     children=("A"),
647                                                     parent_list=parent_dn_list)
648
649         # Add links from the parents to the children
650         for x in range(0, 100):
651             self.modify_object(parent_dn_list[x], "managedBy", expected_dn_list[x + 100])
652
653         # add some filler objects at the end. This allows us to easily see
654         # which chunk the links get sent in
655         expected_dn_list += self.create_object_range(0, 100, prefix="filler")
656
657         # We've now got objects in the following order:
658         # [100 x children][100 x parents][100 x filler]
659
660         # Get the replication data - because the block of children come first,
661         # this should retry the request with GET_ANC
662         while not self.replication_complete():
663             self.repl_get_next()
664
665         self.assertTrue(self.used_get_anc,
666                         "Test didn't use the GET_ANC flag as expected")
667
668         # Check we get all the objects we're expecting
669         self.assert_expected_data(expected_dn_list)
670
671         # Check we received links for all the parents
672         self.assert_expected_links(parent_dn_list)
673
674     def test_repl_get_tgt_and_anc(self):
675         """
676         Check we can resolve an unknown ancestor when fetching the link target,
677         i.e. tests using GET_TGT and GET_ANC in combination
678         """
679
680         # Create some parent/child objects (the child will be the link target)
681         parents = []
682         all_objects = self.create_object_range(0, 100, prefix="parent",
683                                                children=["la_tgt"],
684                                                parent_list=parents)
685
686         children = [item for item in all_objects if item not in parents]
687
688         # create the link source objects and link them to the child/target
689         la_sources = self.create_object_range(0, 100, prefix="la_src")
690         all_objects += la_sources
691
692         for i in range(0, 100):
693             self.modify_object(la_sources[i], "managedBy", children[i])
694
695         expected_links = la_sources
696
697         # modify the children/targets so they come after the link source
698         for x in range(0, 100):
699             self.modify_object(children[x], "displayName", "OU%d" % x)
700
701         # modify the parents, so they now come last in the replication
702         for x in range(0, 100):
703             self.modify_object(parents[x], "displayName", "OU%d" % x)
704
705         # We've now got objects in the following order:
706         # [100 la_source][100 la_target][100 parents (of la_target)]
707
708         links_expected = True
709
710         # Get all the replication data - this code should resend the requests
711         # with GET_TGT and GET_ANC
712         while not self.replication_complete():
713
714             # get the next block of replication data (this sets GET_TGT/GET_ANC)
715             self.repl_get_next(assert_links=links_expected)
716             links_expected = len(self.rxd_links) < len(expected_links)
717
718         # The way the test objects have been created should force
719         # self.repl_get_next() to use the GET_TGT/GET_ANC flags. If this
720         # doesn't actually happen, then the test isn't doing its job properly
721         self.assertTrue(self.used_get_tgt,
722                         "Test didn't use the GET_TGT flag as expected")
723         self.assertTrue(self.used_get_anc,
724                         "Test didn't use the GET_ANC flag as expected")
725
726         # Check we get all the objects we're expecting
727         self.assert_expected_data(all_objects)
728
729         # Check we received links for all the link sources
730         self.assert_expected_links(expected_links)
731
732         # Second part of test. Add some extra objects and kick off another
733         # replication. The test code will use the HWM from the last replication
734         # so we'll only receive the objects we modify below
735         self.start_new_repl_cycle()
736
737         # add an extra level of grandchildren that hang off a child
738         # that got created last time
739         new_parent = "OU=test_new_parent,%s" % children[0]
740         self.add_object(new_parent)
741         new_children = []
742
743         for x in range(0, 50):
744             dn = "OU=test_new_la_tgt%d,%s" % (x, new_parent)
745             self.add_object(dn)
746             new_children.append(dn)
747
748         # replace half of the links to point to the new children
749         for x in range(0, 50):
750             self.delete_attribute(la_sources[x], "managedBy", children[x])
751             self.modify_object(la_sources[x], "managedBy", new_children[x])
752
753         # add some filler objects to fill up the 1st chunk
754         filler = self.create_object_range(0, 100, prefix="filler")
755
756         # modify the new children/targets so they come after the link source
757         for x in range(0, 50):
758             self.modify_object(new_children[x], "displayName", "OU-%d" % x)
759
760         # modify the parent, so it now comes last in the replication
761         self.modify_object(new_parent, "displayName", "OU%d" % x)
762
763         # We should now get the modified objects in the following order:
764         # [50 links (x 2)][100 filler][50 new children][new parent]
765         # Note that the link sources aren't actually sent (their new linked
766         # attributes are sent, but apart from that, nothing has changed)
767         all_objects = filler + new_children + [new_parent]
768         expected_links = la_sources[:50]
769
770         links_expected = True
771
772         while not self.replication_complete():
773             self.repl_get_next(assert_links=links_expected)
774             links_expected = len(self.rxd_links) < len(expected_links)
775
776         self.assertTrue(self.used_get_tgt,
777                         "Test didn't use the GET_TGT flag as expected")
778         self.assertTrue(self.used_get_anc,
779                         "Test didn't use the GET_ANC flag as expected")
780
781         # Check we get all the objects we're expecting
782         self.assert_expected_data(all_objects)
783
784         # Check we received links (50 deleted links and 50 new)
785         self.assert_expected_links(expected_links, num_expected=100)
786
787     def _repl_integrity_obj_deletion(self, delete_link_source=True):
788         """
789         Tests deleting link objects while a replication is in progress.
790         """
791
792         # create some objects and link them together, with some filler
793         # object in between the link sources
794         la_sources = self.create_object_range(0, 100, prefix="la_source")
795         la_targets = self.create_object_range(0, 100, prefix="la_targets")
796
797         for i in range(0, 50):
798             self.modify_object(la_sources[i], "managedBy", la_targets[i])
799
800         filler = self.create_object_range(0, 100, prefix="filler")
801
802         for i in range(50, 100):
803             self.modify_object(la_sources[i], "managedBy", la_targets[i])
804
805         # touch the targets so that the sources get replicated first
806         for i in range(0, 100):
807             self.modify_object(la_targets[i], "displayName", "OU%d" % i)
808
809         # objects should now be in the following USN order:
810         # [50 la_source][100 filler][50 la_source][100 la_target]
811
812         # Get the first block containing 50 link sources
813         self.repl_get_next()
814
815         # delete either the link targets or link source objects
816         if delete_link_source:
817             objects_to_delete = la_sources
818             # in GET_TGT testenvs we only receive the first 50 source objects
819             expected_objects = la_sources[:50] + la_targets + filler
820         else:
821             objects_to_delete = la_targets
822             expected_objects = la_sources + filler
823
824         for obj in objects_to_delete:
825             self.ldb_dc2.delete(obj)
826
827         # complete the replication
828         while not self.replication_complete():
829             self.repl_get_next()
830
831         # Check we get all the objects we're expecting
832         self.assert_expected_data(expected_objects)
833
834         # we can't use assert_expected_links() here because it tries to check
835         # against the deleted objects on the DC. (Although we receive some
836         # links from the first block processed, the Samba client should end up
837         # deleting these, as the source/target object involved is deleted)
838         self.assertTrue(len(self.rxd_links) == 50,
839                         "Expected 50 links, not %d" % len(self.rxd_links))
840
841     def test_repl_integrity_src_obj_deletion(self):
842         self._repl_integrity_obj_deletion(delete_link_source=True)
843
844     def test_repl_integrity_tgt_obj_deletion(self):
845         self._repl_integrity_obj_deletion(delete_link_source=False)
846
847     def restore_deleted_object(self, guid, new_dn):
848         """Re-animates a deleted object"""
849
850         res = self.test_ldb_dc.search(base="<GUID=%s>" % self._GUID_string(guid), attrs=["isDeleted"],
851                                   controls=['show_deleted:1'], scope=ldb.SCOPE_BASE)
852         if len(res) != 1:
853             return
854
855         msg = ldb.Message()
856         msg.dn = res[0].dn
857         msg["isDeleted"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE, "isDeleted")
858         msg["distinguishedName"] = ldb.MessageElement([new_dn], ldb.FLAG_MOD_REPLACE, "distinguishedName")
859         self.test_ldb_dc.modify(msg, ["show_deleted:1"])
860
861     def sync_DCs(self, nc_dn=None):
862         # make sure DC1 has all the changes we've made to DC2
863         self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, nc_dn=nc_dn)
864
865     def get_object_guid(self, dn):
866         res = self.test_ldb_dc.search(base=dn, attrs=["objectGUID"], scope=ldb.SCOPE_BASE)
867         return res[0]['objectGUID'][0]
868
869
870     def set_dc_connection(self, conn):
871         """
872         Switches over the connection state info that the underlying drs_base
873         class uses so that we replicate with a different DC.
874         """
875         self.default_hwm = conn.default_hwm
876         self.default_utdv = conn.default_utdv
877         self.drs = conn.drs
878         self.drs_handle = conn.drs_handle
879         self.set_test_ldb_dc(conn.ldb_dc)
880
881     def assert_DCs_replication_is_consistent(self, peer_conn, all_objects,
882                                              expected_links):
883         """
884         Replicates against both the primary and secondary DCs in the testenv
885         and checks that both return the expected results.
886         """
887         print("Checking replication against primary test DC...")
888
889         # get the replication data from the test DC first
890         while not self.replication_complete():
891             self.repl_get_next()
892
893         # Check we get all the objects and links we're expecting
894         self.assert_expected_data(all_objects)
895         self.assert_expected_links(expected_links)
896
897         # switch over the DC state info so we now talk to the peer DC
898         self.set_dc_connection(peer_conn)
899         self.init_test_state()
900
901         print("Checking replication against secondary test DC...")
902
903         # check that we get the same information from the 2nd DC
904         while not self.replication_complete():
905             self.repl_get_next()
906
907         self.assert_expected_data(all_objects)
908         self.assert_expected_links(expected_links)
909
910         # switch back to using the default connection
911         self.set_dc_connection(self.default_conn)
912
913     def test_repl_integrity_obj_reanimation(self):
914         """
915         Checks receiving links for a re-animated object doesn't lose links.
916         We test this against the peer DC to make sure it doesn't drop links.
917         """
918
919         # This test is a little different in that we're particularly interested
920         # in exercising the replmd client code on the second DC.
921         # First, make sure the peer DC has the base OU, then connect to it (so
922         # we store its inital HWM)
923         self.sync_DCs()
924         peer_conn = DcConnection(self, self.ldb_dc1, self.dnsname_dc1)
925
926         # create the link source/target objects
927         la_sources = self.create_object_range(0, 100, prefix="la_src")
928         la_targets = self.create_object_range(0, 100, prefix="la_tgt")
929
930         # store the target object's GUIDs (we need to know these to reanimate them)
931         target_guids = []
932
933         for dn in la_targets:
934             target_guids.append(self.get_object_guid(dn))
935
936         # delete the link target
937         for x in range(0, 100):
938             self.ldb_dc2.delete(la_targets[x])
939
940         # sync the DCs, then disable replication. We want the peer DC to get
941         # all the following changes in a single replication cycle
942         self.sync_DCs()
943         self._disable_all_repl(self.dnsname_dc2)
944
945         # restore the target objects for the linked attributes again
946         for x in range(0, 100):
947             self.restore_deleted_object(target_guids[x], la_targets[x])
948
949         # add the links
950         for x in range(0, 100):
951             self.modify_object(la_sources[x], "managedBy", la_targets[x])
952
953         # create some additional filler objects
954         filler = self.create_object_range(0, 100, prefix="filler")
955
956         # modify the targets so they now come last
957         for x in range(0, 100):
958             self.modify_object(la_targets[x], "displayName", "OU-%d" % x)
959
960         # the objects should now be sent in the following order:
961         # [la sources + links][filler][la targets]
962         all_objects = la_sources + la_targets + filler
963         expected_links = la_sources
964
965         # Enable replication again make sure the 2 DCs are back in sync
966         self._enable_all_repl(self.dnsname_dc2)
967         self.sync_DCs()
968
969         # Get the replication data from each DC in turn.
970         # Check that both give us all the objects and links we're expecting,
971         # i.e. no links were lost
972         self.assert_DCs_replication_is_consistent(peer_conn, all_objects,
973                                                   expected_links)
974
975     def test_repl_integrity_cross_partition_links(self):
976         """
977         Checks that a cross-partition link to an unknown target object does
978         not result in missing links.
979         """
980
981         # check the peer DC is up-to-date, then connect (storing its HWM)
982         self.sync_DCs()
983         peer_conn = DcConnection(self, self.ldb_dc1, self.dnsname_dc1)
984
985         # stop replication so the peer gets the following objects in one go
986         self._disable_all_repl(self.dnsname_dc2)
987
988         # create a link source object in the main NC
989         la_source = "OU=cross_nc_src,%s" % self.ou
990         self.add_object(la_source)
991
992         # create the link target (a server object) in the config NC
993         rand = random.randint(1, 10000000)
994         la_target = "CN=getncchanges-%d,CN=Servers,CN=Default-First-Site-Name," \
995                     "CN=Sites,%s" %(rand, self.config_dn)
996         self.add_object(la_target, objectclass="server")
997
998         # add a cross-partition link between the two
999         self.modify_object(la_source, "managedBy", la_target)
1000
1001         # First, sync across to the peer the NC containing the link source object
1002         self.sync_DCs()
1003
1004         # Now, before the peer has received the partition containing the target
1005         # object, try replicating from the peer. It will only know about half
1006         # of the link at this point, but it should be a valid scenario
1007         self.set_dc_connection(peer_conn)
1008
1009         while not self.replication_complete():
1010             # pretend we've received other link targets out of order and that's
1011             # forced us to use GET_TGT. This checks the peer doesn't fail trying
1012             # to fetch a cross-partition target object that doesn't exist
1013             self.repl_get_next(get_tgt=True)
1014
1015         self.set_dc_connection(self.default_conn)
1016         self.init_test_state()
1017
1018         # Now sync across the partition containing the link target object
1019         self.sync_DCs(nc_dn=self.config_dn)
1020         self._enable_all_repl(self.dnsname_dc2)
1021
1022         # Get the replication data from each DC in turn.
1023         # Check that both return the cross-partition link (note we're not
1024         # checking the config domain NC here for simplicity)
1025         self.assert_DCs_replication_is_consistent(peer_conn,
1026                                                   all_objects=[la_source],
1027                                                   expected_links=[la_source])
1028
1029         # the cross-partition linked attribute has a missing backlink. Check
1030         # that we can still delete it successfully
1031         self.delete_attribute(la_source, "managedBy", la_target)
1032         self.sync_DCs()
1033
1034         res = self.test_ldb_dc.search(ldb.Dn(self.ldb_dc1, la_source),
1035                                       attrs=["managedBy"],
1036                                       controls=['extended_dn:1:0'],
1037                                       scope=ldb.SCOPE_BASE)
1038         self.assertFalse("managedBy" in res[0], "%s in DB still has managedBy attribute"
1039                          % la_source)
1040         res = self.test_ldb_dc.search(ldb.Dn(self.ldb_dc2, la_source),
1041                                       attrs=["managedBy"],
1042                                       controls=['extended_dn:1:0'],
1043                                       scope=ldb.SCOPE_BASE)
1044         self.assertFalse("managedBy" in res[0], "%s in DB still has managedBy attribute"
1045                          % la_source)
1046
1047         # Check receiving a cross-partition link to a deleted target.
1048         # Delete the target and make sure the deletion is sync'd between DCs
1049         target_guid = self.get_object_guid(la_target)
1050         self.test_ldb_dc.delete(la_target)
1051         self.sync_DCs(nc_dn=self.config_dn)        
1052         self._disable_all_repl(self.dnsname_dc2)
1053
1054         # re-animate the target
1055         self.restore_deleted_object(target_guid, la_target)
1056         self.modify_object(la_source, "managedBy", la_target)
1057
1058         # now sync the link - because the target is in another partition, the
1059         # peer DC receives a link for a deleted target, which it should accept
1060         self.sync_DCs()
1061         res = self.test_ldb_dc.search(ldb.Dn(self.ldb_dc1, la_source),
1062                                       attrs=["managedBy"],
1063                                       controls=['extended_dn:1:0'],
1064                                       scope=ldb.SCOPE_BASE)
1065         self.assertTrue("managedBy" in res[0], "%s in DB missing managedBy attribute"
1066                         % la_source)
1067
1068         # cleanup the server object we created in the Configuration partition
1069         self.test_ldb_dc.delete(la_target)
1070         self._enable_all_repl(self.dnsname_dc2)
1071
1072     def test_repl_get_tgt_multivalued_links(self):
1073         """Tests replication with multi-valued link attributes."""
1074
1075         # create the target/source objects and link them together
1076         la_targets = self.create_object_range(0, 500, prefix="la_tgt")
1077         la_source = "CN=la_src,%s" % self.ou
1078         self.add_object(la_source, objectclass="msExchConfigurationContainer")
1079
1080         for tgt in la_targets:
1081             self.modify_object(la_source, "addressBookRoots2", tgt)
1082
1083         filler = self.create_object_range(0, 100, prefix="filler")
1084
1085         # We should receive the objects/links in the following order:
1086         # [500 targets + 1 source][500 links][100 filler]
1087         expected_objects = la_targets + [la_source] + filler
1088         link_only_chunk = False
1089
1090         # First do the replication without needing GET_TGT
1091         while not self.replication_complete():
1092             ctr6 = self.repl_get_next()
1093
1094             if ctr6.object_count == 0 and ctr6.linked_attributes_count != 0:
1095                 link_only_chunk = True
1096
1097         # we should receive one chunk that contains only links
1098         self.assertTrue(link_only_chunk,
1099                         "Expected to receive a chunk containing only links")
1100
1101         # check we received all the expected objects/links
1102         self.assert_expected_data(expected_objects)
1103         self.assert_expected_links([la_source], link_attr="addressBookRoots2", num_expected=500)
1104
1105         # Do the replication again, forcing the use of GET_TGT this time
1106         self.init_test_state()
1107
1108         for x in range(0, 500):
1109             self.modify_object(la_targets[x], "displayName", "OU-%d" % x)
1110
1111         # The objects/links should get sent in the following order:
1112         # [1 source][500 targets][500 links][100 filler]
1113
1114         while not self.replication_complete():
1115             ctr6 = self.repl_get_next()
1116
1117         self.assertTrue(self.used_get_tgt,
1118                         "Test didn't use the GET_TGT flag as expected")
1119
1120         # check we received all the expected objects/links
1121         self.assert_expected_data(expected_objects)
1122         self.assert_expected_links([la_source], link_attr="addressBookRoots2", num_expected=500)
1123
1124
1125 class DcConnection:
1126     """Helper class to track a connection to another DC"""
1127
1128     def __init__(self, drs_base, ldb_dc, dnsname_dc):
1129         self.ldb_dc = ldb_dc
1130         (self.drs, self.drs_handle) = drs_base._ds_bind(dnsname_dc)
1131         (self.default_hwm, self.default_utdv) = drs_base._get_highest_hwm_utdv(ldb_dc)
1132
1133