from ldb import SCOPE_BASE
import random
-from samba.dcerpc import drsuapi
-
+from samba.dcerpc import drsuapi, misc
+from samba import WERRORError
+from samba import werror
class DrsReplicaSyncIntegrityTestCase(drs_base.DrsBaseTestCase):
def setUp(self):
num_expected=500)
+ def test_InvalidNC_DummyDN_InvalidGUID_full_repl(self):
+ """Test full replication on a totally invalid GUID fails with the right error code"""
+ dc_guid_1 = self.ldb_dc1.get_invocation_id()
+ drs, drs_handle = self._ds_bind(self.dnsname_dc1, ip=self.url_dc1)
+
+ req8 = self._exop_req8(dest_dsa="9c637462-5b8c-4467-aef2-bdb1f57bc4ef",
+ invocation_id=dc_guid_1,
+ nc_dn_str="DummyDN",
+ nc_guid=misc.GUID("c2d2f745-1610-4e93-964b-d4ba73eb32f8"),
+ exop=drsuapi.DRSUAPI_EXOP_NONE,
+ max_objects=1)
+
+ (drs, drs_handle) = self._ds_bind(self.dnsname_dc1, ip=self.url_dc1)
+ try:
+ (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
+ except WERRORError as e1:
+ (enum, estr) = e1.args
+ self.assertEqual(enum, werror.WERR_DS_DRA_BAD_NC)
+
+ def test_DummyDN_valid_GUID_full_repl(self):
+ dc_guid_1 = self.ldb_dc1.get_invocation_id()
+ drs, drs_handle = self._ds_bind(self.dnsname_dc1, ip=self.url_dc1)
+
+ res = self.ldb_dc1.search(base=self.base_dn, scope=SCOPE_BASE,
+ attrs=["objectGUID"])
+
+ guid = misc.GUID(res[0]["objectGUID"][0])
+
+ req8 = self._exop_req8(dest_dsa=None,
+ invocation_id=dc_guid_1,
+ nc_dn_str="DummyDN",
+ nc_guid=guid,
+ replica_flags=drsuapi.DRSUAPI_DRS_WRIT_REP |
+ drsuapi.DRSUAPI_DRS_GET_ANC,
+ exop=drsuapi.DRSUAPI_EXOP_NONE,
+ max_objects=1)
+
+ try:
+ (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
+ except WERRORError as e1:
+ (enum, estr) = e1.args
+ self.fail(f"Failed to call GetNCChanges with DummyDN and a GUID: {estr}")
+
+ # The NC should be the first object returned due to GET_ANC
+ self.assertEqual(ctr.first_object.object.identifier.guid, guid)
+
+
class DcConnection:
"""Helper class to track a connection to another DC"""