s4-torture/drs: Add test showing that if present in the set the NC root leads and...
[samba.git] / source4 / torture / drs / python / getncchanges.py
index 5b402e81c882e73cbc72a52f630d1eb590480771..580d8cc66f33ace390e8dbe2f9ea03baa981dafc 100644 (file)
@@ -1268,6 +1268,153 @@ class DrsReplicaSyncIntegrityTestCase(drs_base.DrsBaseTestCase):
         """
         self._test_do_full_repl_no_overlap(mix=True)
 
+    def nc_change(self):
+        old_base_msg = self.default_conn.ldb_dc.search(base=self.base_dn,
+                                                       scope=SCOPE_BASE,
+                                                       attrs=["oEMInformation"])
+        rec_cleanup = {"dn": self.base_dn,
+                       "oEMInformation": old_base_msg[0]["oEMInformation"][0]}
+        m_cleanup = ldb.Message.from_dict(self.default_conn.ldb_dc,
+                                          rec_cleanup,
+                                          ldb.FLAG_MOD_REPLACE)
+
+        self.addCleanup(self.default_conn.ldb_dc.modify, m_cleanup)
+
+        rec = {"dn": self.base_dn,
+               "oEMInformation": f"Tortured by Samba's getncchanges.py {self.id()} against {self.default_conn.dnsname_dc}"}
+        m = ldb.Message.from_dict(self.default_conn.ldb_dc, rec, ldb.FLAG_MOD_REPLACE)
+        self.default_conn.ldb_dc.modify(m)
+
+    def _test_repl_nc_is_first(self, start_at_zero=True, nc_change=True, ou_change=True, mid_change=False):
+        """Tests that the NC is always replicated first, but does not move the
+           tmp_highest_usn at that point, just like 'early' GET_ANC objects.
+        """
+
+        # create objects, twice more than the page size of 133
+        objs = self.create_object_range(0, 300, prefix="obj")
+
+        if nc_change:
+            self.nc_change()
+
+        if mid_change:
+            # create even moire objects
+            objs = self.create_object_range(301, 450, prefix="obj2")
+
+        base_msg = self.default_conn.ldb_dc.search(base=self.base_dn,
+                                                   scope=SCOPE_BASE,
+                                                   attrs=["uSNChanged",
+                                                          "objectGUID"])
+
+        base_guid = misc.GUID(base_msg[0]["objectGUID"][0])
+        base_usn = int(base_msg[0]["uSNChanged"][0])
+
+        if ou_change:
+            # Make one more modification.  We want to assert we have
+            # caught up to the base DN, but Windows both promotes the NC
+            # to the front and skips including it in the tmp_highest_usn,
+            # so we make a later modification that will be to show we get
+            # this change.
+            rec = {"dn": self.ou,
+                   "postalCode": "0"}
+            m = ldb.Message.from_dict(self.default_conn.ldb_dc, rec, ldb.FLAG_MOD_REPLACE)
+            self.default_conn.ldb_dc.modify(m)
+
+        ou_msg = self.default_conn.ldb_dc.search(base=self.ou,
+                                                   scope=SCOPE_BASE,
+                                                   attrs=["uSNChanged",
+                                                          "objectGUID"])
+
+        ou_guid = misc.GUID(ou_msg[0]["objectGUID"][0])
+        ou_usn = int(ou_msg[0]["uSNChanged"][0])
+
+        # Check some predicates about USN ordering that the below tests will rely on
+        if ou_change and nc_change:
+            self.assertGreater(ou_usn, base_usn);
+        elif not ou_change and nc_change:
+            self.assertGreater(base_usn, ou_usn);
+
+        ctr6 = self.repl_get_next()
+
+        guid_list_1 = self._get_ctr6_object_guids(ctr6)
+        if nc_change or start_at_zero:
+            self.assertEqual(base_guid, misc.GUID(guid_list_1[0]))
+            self.assertIn(str(base_guid), guid_list_1)
+            self.assertNotIn(str(base_guid), guid_list_1[1:])
+        else:
+            self.assertNotEqual(base_guid, misc.GUID(guid_list_1[0]))
+            self.assertNotIn(str(base_guid), guid_list_1)
+
+        self.assertTrue(ctr6.more_data)
+
+        if not ou_change and nc_change:
+            self.assertLess(ctr6.new_highwatermark.tmp_highest_usn, base_usn)
+
+        i = 0
+        while not self.replication_complete():
+            i = i + 1
+            last_tmp_highest_usn = ctr6.new_highwatermark.tmp_highest_usn
+            ctr6 = self.repl_get_next()
+            guid_list_2 = self._get_ctr6_object_guids(ctr6)
+            if len(guid_list_2) > 0:
+                self.assertNotEqual(last_tmp_highest_usn, ctr6.new_highwatermark.tmp_highest_usn)
+
+            if (nc_change or start_at_zero) and base_usn > last_tmp_highest_usn:
+                self.assertEqual(base_guid, misc.GUID(guid_list_2[0]),
+                                 f"pass={i} more_data={ctr6.more_data} base_usn={base_usn} tmp_highest_usn={ctr6.new_highwatermark.tmp_highest_usn} last_tmp_highest_usn={last_tmp_highest_usn}")
+                self.assertIn(str(base_guid), guid_list_2,
+                                 f"pass {i}·more_data={ctr6.more_data} base_usn={base_usn} tmp_highest_usn={ctr6.new_highwatermark.tmp_highest_usn} last_tmp_highest_usn={last_tmp_highest_usn}")
+            else:
+                self.assertNotIn(str(base_guid), guid_list_2,
+                                 f"pass {i}·more_data={ctr6.more_data} base_usn={base_usn} tmp_highest_usn={ctr6.new_highwatermark.tmp_highest_usn} last_tmp_highest_usn={last_tmp_highest_usn}")
+
+        if ou_change:
+            # The modification to the base OU should be in the final chunk
+            self.assertIn(str(ou_guid), guid_list_2)
+            self.assertGreaterEqual(ctr6.new_highwatermark.highest_usn,
+                                    ou_usn)
+        else:
+            # Show that the NC root change does not show up in the
+            # highest_usn.  We either get the change before or after
+            # it.
+            self.assertNotEqual(ctr6.new_highwatermark.highest_usn,
+                                base_usn)
+        self.assertEqual(ctr6.new_highwatermark.highest_usn,
+                          ctr6.new_highwatermark.tmp_highest_usn)
+
+        self.assertFalse(ctr6.more_data)
+
+    def test_repl_nc_is_first_start_zero_nc_change(self):
+        self.default_hwm = drsuapi.DsReplicaHighWaterMark()
+        self._test_repl_nc_is_first(start_at_zero=True, nc_change=True, ou_change=True)
+
+    def test_repl_nc_is_first_start_zero(self):
+        # Get the NC change in the middle of the replication stream, certainly not at the start or end
+        self.nc_change()
+        self.default_hwm = drsuapi.DsReplicaHighWaterMark()
+        self._test_repl_nc_is_first(start_at_zero=True, nc_change=False, ou_change=False)
+
+    def test_repl_nc_is_first_mid(self):
+        # This is a modification of the next test, that Samba
+        # will pass as it will always include the NC in the
+        # tmp_highest_usn at the point where it belongs
+        self._test_repl_nc_is_first(start_at_zero=False,
+                                    nc_change=True,
+                                    ou_change=True,
+                                    mid_change=True)
+
+    def test_repl_nc_is_first(self):
+        # This is a modification of the next test, that Samba
+        # will pass as it will always include the NC in the
+        # tmp_highest_usn at the point where it belongs
+        self._test_repl_nc_is_first(start_at_zero=False, nc_change=True, ou_change=True)
+
+    def test_repl_nc_is_first_nc_change_only(self):
+        # This shows that the NC change is not reflected in the tmp_highest_usn
+        self._test_repl_nc_is_first(start_at_zero=False, nc_change=True, ou_change=False)
+
+    def test_repl_nc_is_first_no_change(self):
+        # The NC should not be present in this replication
+        self._test_repl_nc_is_first(start_at_zero=False, nc_change=False, ou_change=False)
 
 class DcConnection:
     """Helper class to track a connection to another DC"""