s4-selftest/drs: Confirm GetNCChanges REPL_OBJ works with a DummyDN and real GUID
authorAndrew Bartlett <abartlet@samba.org>
Fri, 2 Dec 2022 02:30:05 +0000 (15:30 +1300)
committerJule Anger <janger@samba.org>
Wed, 1 Feb 2023 16:30:11 +0000 (16:30 +0000)
BUG: https://bugzilla.samba.org/show_bug.cgi?id=10635

Signed-off-by: Andrew Bartlett <abartlet@samba.org>
Reviewed-by: Stefan Metzmacher <metze@samba.org>
(cherry picked from commit 70faccae6d595056174af8d63b3437c9fe3805aa)

selftest/knownfail.d/getncchanges
source4/torture/drs/python/getnc_exop.py

index b716ff83797883915305716d8e1015868a1139c1..cb6c8c630e3b0ddfd7ad1a7cc4d7403798d23286 100644 (file)
@@ -9,3 +9,6 @@ samba4.drs.getncchanges.python\(promoted_dc\).getncchanges.DrsReplicaSyncIntegri
 ^samba4.drs.getnc_exop.python\(.*\).getnc_exop.DrsReplicaSyncTestCase.test_InvalidNC_DummyDN_InvalidGUID
 ^samba4.drs.getnc_exop.python\(.*\).getnc_exop.DrsReplicaSyncTestCase.test_InvalidNC_DummyDN_InvalidGUID_REPL_OBJ
 ^samba4.drs.getnc_exop.python\(.*\).getnc_exop.DrsReplicaSyncTestCase.test_InvalidNC_DummyDN_InvalidGUID_RID_ALLOC
+^samba4.drs.getnc_exop.python\(.*\).getnc_exop.DrsReplicaSyncTestCase.test_InvalidDestDSA_and_GUID_RID_ALLOC
+^samba4.drs.getnc_exop.python\(.*\).getnc_exop.DrsReplicaSyncTestCase.test_DummyDN_valid_GUID_REPL_OBJ
+^samba4.drs.getnc_exop.python\(.*\).getnc_exop.DrsReplicaSyncTestCase.test_DummyDN_valid_GUID_REPL_SECRET
index 885156cc6e20db6d1c48ca78aeeb7bd1ef478a85..9c16ecca323e7913520b5d653b4c47c1fd6e9889 100644 (file)
@@ -241,8 +241,8 @@ class DrsReplicaSyncTestCase(drs_base.DrsBaseTestCase):
             (enum, estr) = e1.args
             self.assertEqual(enum, werror.WERR_DS_CANT_FIND_EXPECTED_NC)
 
-    def test_InvalidNC_DummyDN_InvalidGUID(self):
-        """Test full replication on a totally invalid GUID fails with the right error code"""
+    def test_InvalidNC_DummyDN_InvalidGUID_REPL_OBJ(self):
+        """Test single object replication on a totally invalid GUID fails with the right error code"""
         fsmo_dn = self.ldb_dc1.get_schema_basedn()
         (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
 
@@ -250,16 +250,16 @@ class DrsReplicaSyncTestCase(drs_base.DrsBaseTestCase):
                                invocation_id=fsmo_owner["invocation_id"],
                                nc_dn_str="DummyDN",
                                nc_guid=misc.GUID("c2d2f745-1610-4e93-964b-d4ba73eb32f8"),
-                               exop=drsuapi.DRSUAPI_EXOP_NONE)
+                               exop=drsuapi.DRSUAPI_EXOP_REPL_OBJ)
 
         (drs, drs_handle) = self._ds_bind(self.dnsname_dc1, ip=self.url_dc1)
         try:
             (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
         except WERRORError as e1:
             (enum, estr) = e1.args
-            self.assertEqual(enum, werror.WERR_DS_DRA_BAD_NC)
+            self.assertEqual(enum, werror.WERR_DS_DRA_BAD_DN)
 
-    def test_InvalidNC_DummyDN_InvalidGUID_REPL_OBJ(self):
+    def test_InvalidNC_DummyDN_InvalidGUID_REPL_SECRET(self):
         """Test single object replication on a totally invalid GUID fails with the right error code"""
         fsmo_dn = self.ldb_dc1.get_schema_basedn()
         (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
@@ -295,6 +295,52 @@ class DrsReplicaSyncTestCase(drs_base.DrsBaseTestCase):
             (enum, estr) = e1.args
             self.assertEqual(enum, werror.WERR_DS_DRA_BAD_NC)
 
+    def test_DummyDN_valid_GUID_REPL_OBJ(self):
+        dc_guid_1 = self.ldb_dc1.get_invocation_id()
+        drs, drs_handle = self._ds_bind(self.dnsname_dc1, ip=self.url_dc1)
+
+        res = self.ldb_dc1.search(base=self.ou, scope=SCOPE_BASE,
+                                  attrs=["objectGUID"])
+
+        guid = misc.GUID(res[0]["objectGUID"][0])
+
+        req8 = self._exop_req8(dest_dsa=None,
+                               invocation_id=dc_guid_1,
+                               nc_dn_str="DummyDN",
+                               nc_guid=guid,
+                               exop=drsuapi.DRSUAPI_EXOP_REPL_OBJ)
+
+        try:
+            (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
+        except WERRORError as e1:
+            (enum, estr) = e1.args
+            self.fail(f"Failed to call GetNCChanges with EXOP_REPL_OBJ and a GUID: {estr}")
+
+        self.assertEqual(ctr.first_object.object.identifier.guid, guid)
+
+    def test_DummyDN_valid_GUID_REPL_SECRET(self):
+        dc_guid_1 = self.ldb_dc1.get_invocation_id()
+        drs, drs_handle = self._ds_bind(self.dnsname_dc1, ip=self.url_dc1)
+
+        res = self.ldb_dc1.search(base=self.ou, scope=SCOPE_BASE,
+                                  attrs=["objectGUID"])
+
+        guid = misc.GUID(res[0]["objectGUID"][0])
+
+        req8 = self._exop_req8(dest_dsa=None,
+                               invocation_id=dc_guid_1,
+                               nc_dn_str="DummyDN",
+                               nc_guid=guid,
+                               exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET)
+
+        try:
+            (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
+        except WERRORError as e1:
+            (enum, estr) = e1.args
+
+            # We expect to get as far as failing on the missing dest_dsa
+            self.assertEqual(enum, werror.WERR_DS_DRA_DB_ERROR)
+
     def test_link_utdv_hwm(self):
         """Test verify the DRS_GET_ANC behavior."""
 
@@ -668,7 +714,29 @@ class DrsReplicaSyncTestCase(drs_base.DrsBaseTestCase):
             (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
         except WERRORError as e1:
             (enum, estr) = e1.args
-            self.fail("DsGetNCChanges failed with {estr}")
+            self.fail(f"DsGetNCChanges failed with {estr}")
+        self.assertEqual(level, 6, "Expected level 6 response!")
+        self._check_exop_failed(ctr, drsuapi.DRSUAPI_EXOP_ERR_UNKNOWN_CALLER)
+        self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_owner["ntds_guid"]))
+        self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_owner["invocation_id"]))
+
+    def test_InvalidDestDSA_and_GUID_RID_ALLOC(self):
+        """Test role transfer with invalid destination DSA guid"""
+        fsmo_dn = self.ldb_dc1.get_schema_basedn()
+        (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
+
+        req8 = self._exop_req8(dest_dsa="9c637462-5b8c-4467-aef2-bdb1f57bc4ef",
+                               invocation_id=fsmo_owner["invocation_id"],
+                               nc_dn_str="DummyDN",
+                               nc_guid=misc.GUID("c2d2f745-1610-4e93-964b-d4ba73eb32f8"),
+                               exop=drsuapi.DRSUAPI_EXOP_FSMO_RID_ALLOC)
+
+        (drs, drs_handle) = self._ds_bind(fsmo_owner["dns_name"])
+        try:
+            (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
+        except WERRORError as e1:
+            (enum, estr) = e1.args
+            self.fail(f"DsGetNCChanges failed with {estr}")
         self.assertEqual(level, 6, "Expected level 6 response!")
         self._check_exop_failed(ctr, drsuapi.DRSUAPI_EXOP_ERR_UNKNOWN_CALLER)
         self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_owner["ntds_guid"]))