tests/ridalloc_exop: Add a new suite of tests for RID allocation
authorGarming Sam <garming@catalyst.net.nz>
Mon, 31 Oct 2016 02:24:49 +0000 (15:24 +1300)
committerGarming Sam <garming@samba.org>
Fri, 4 Nov 2016 03:41:18 +0000 (04:41 +0100)
This moves some tests from getnc_exop.py regarding RID sets as well as
adding new tests for actions on join.

BUG: https://bugzilla.samba.org/show_bug.cgi?id=9954

Pair-programmed-with: Clive Ferreira <cliveferreira@catalyst.net.nz>

Signed-off-by: Andrew Bartlett <abartlet@samaba.org>
Signed-off-by: Garming Sam <garming@catalyst.net.nz>
Signed-off-by: Clive Ferreira <cliveferreira@catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
selftest/knownfail
source4/selftest/tests.py
source4/torture/drs/python/getnc_exop.py
source4/torture/drs/python/ridalloc_exop.py [new file with mode: 0644]

index 38b5f51bb369bdf8545c462b83333a2cb3dd01db..309e57010759040dbc19fab9ec411f0283f14382 100644 (file)
 ^samba4.rpc.echo.*on.*with.object.echo.sinkdata.*nt4_dc
 ^samba4.rpc.echo.*on.*with.object.echo.addone.*nt4_dc
 ^samba4.rpc.echo.*on.*ncacn_ip_tcp.*with.object.*nt4_dc
+^samba4.drs.ridalloc_exop.python.*ridalloc_exop.DrsReplicaSyncTestCase.test_offline_manual_seized_ridalloc_with_dbcheck
+^samba4.drs.ridalloc_exop.python.*ridalloc_exop.DrsReplicaSyncTestCase.test_offline_ridalloc
+^samba4.drs.ridalloc_exop.python.*ridalloc_exop.DrsReplicaSyncTestCase.test_offline_samba_tool_seized_ridalloc
+^samba4.drs.ridalloc_exop.python.*ridalloc_exop.DrsReplicaSyncTestCase.test_join_time_ridalloc
+^samba4.drs.ridalloc_exop.python.*ridalloc_exop.DrsReplicaSyncTestCase.test_rid_set_dbcheck_after_seize
+^samba4.drs.ridalloc_exop.python.*ridalloc_exop.DrsReplicaSyncTestCase.test_rid_set_dbcheck
\ No newline at end of file
index d30ffff4197d3aa133f5e2b21dfea70b5d40cdca..87acaf56d830f6921eaf8929db0bdc8c883698d7 100755 (executable)
@@ -655,9 +655,16 @@ plantestsuite("samba4.blackbox.provision-backend", "none", ["PYTHON=%s" % python
 # Test renaming the DC
 plantestsuite("samba4.blackbox.renamedc.sh", "none", ["PYTHON=%s" % python, os.path.join(bbdir, "renamedc.sh"), '$PREFIX/provision'])
 
-for env in ['vampire_dc', 'promoted_dc']:
+# DRS python tests
+
+env = 'vampire_dc'
+planoldpythontestsuite(env, "ridalloc_exop",
+                       extra_path=[os.path.join(samba4srcdir, 'torture/drs/python')],
+                       name="samba4.drs.ridalloc_exop.python(%s)" % env,
+                       environ={'DC1': "$DC_SERVER", 'DC2': '$%s_SERVER' % env.upper()},
+                       extra_args=['-U$DOMAIN/$DC_USERNAME%$DC_PASSWORD'])
 
-    # DRS python tests
+for env in ['vampire_dc', 'promoted_dc']:
     planoldpythontestsuite("%s:local" % env, "samba.tests.blackbox.samba_tool_drs",
                            environ={'DC1': '$DC_SERVER', 'DC2': '$%s_SERVER' % env.upper()},
                            extra_args=['-U$DOMAIN/$DC_USERNAME%$DC_PASSWORD'])
index 941d323382003fa0df4ff656d15feeeb40ed2aa2..246d8593358bca5df26b80c2d1e422cfa5805ea0 100644 (file)
@@ -204,167 +204,6 @@ class DrsReplicaSyncTestCase(drs_base.DrsBaseTestCase, ExopBaseTest):
         self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_owner["ntds_guid"]))
         self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_owner["invocation_id"]))
 
-    def test_InvalidDestDSA_ridalloc(self):
-        """Test RID allocation with invalid destination DSA guid"""
-        fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
-        (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
-
-        req8 = self._exop_req8(dest_dsa="9c637462-5b8c-4467-aef2-bdb1f57bc4ef",
-                               invocation_id=fsmo_owner["invocation_id"],
-                               nc_dn_str=fsmo_dn,
-                               exop=drsuapi.DRSUAPI_EXOP_FSMO_RID_ALLOC)
-
-        (drs, drs_handle) = self._ds_bind(fsmo_owner["dns_name"])
-        (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
-        self.assertEqual(level, 6, "Expected level 6 response!")
-        self._check_exop_failed(ctr, drsuapi.DRSUAPI_EXOP_ERR_UNKNOWN_CALLER)
-        self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_owner["ntds_guid"]))
-        self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_owner["invocation_id"]))
-
-    def test_do_ridalloc(self):
-        """Test doing a RID allocation with a valid destination DSA guid"""
-        fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
-        (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
-
-        req8 = self._exop_req8(dest_dsa=fsmo_not_owner["ntds_guid"],
-                               invocation_id=fsmo_owner["invocation_id"],
-                               nc_dn_str=fsmo_dn,
-                               exop=drsuapi.DRSUAPI_EXOP_FSMO_RID_ALLOC)
-
-        (drs, drs_handle) = self._ds_bind(fsmo_owner["dns_name"])
-        (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
-        self.assertEqual(level, 6, "Expected level 6 response!")
-        self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_owner["ntds_guid"]))
-        self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_owner["invocation_id"]))
-        ctr6 = ctr
-        self.assertEqual(ctr6.extended_ret, drsuapi.DRSUAPI_EXOP_ERR_SUCCESS)
-        self.assertEqual(ctr6.object_count, 3)
-        self.assertNotEqual(ctr6.first_object, None)
-        self.assertEqual(ldb.Dn(self.ldb_dc1, ctr6.first_object.object.identifier.dn), fsmo_dn)
-        self.assertNotEqual(ctr6.first_object.next_object, None)
-        self.assertNotEqual(ctr6.first_object.next_object.next_object, None)
-        second_object = ctr6.first_object.next_object.object
-        self.assertEqual(ldb.Dn(self.ldb_dc1, second_object.identifier.dn), fsmo_not_owner["rid_set_dn"])
-        third_object = ctr6.first_object.next_object.next_object.object
-        self.assertEqual(ldb.Dn(self.ldb_dc1, third_object.identifier.dn), fsmo_not_owner["server_acct_dn"])
-
-        self.assertEqual(ctr6.more_data, False)
-        self.assertEqual(ctr6.nc_object_count, 0)
-        self.assertEqual(ctr6.nc_linked_attributes_count, 0)
-        self.assertEqual(ctr6.drs_error[0], 0)
-        # We don't check the linked_attributes_count as if the domain
-        # has an RODC, it can gain links on the server account object
-
-    def test_do_ridalloc_get_anc(self):
-        """Test doing a RID allocation with a valid destination DSA guid and GET_ANC flag"""
-        fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
-        (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
-
-        req8 = self._exop_req8(dest_dsa=fsmo_not_owner["ntds_guid"],
-                               invocation_id=fsmo_owner["invocation_id"],
-                               nc_dn_str=fsmo_dn,
-                               exop=drsuapi.DRSUAPI_EXOP_FSMO_RID_ALLOC,
-                               replica_flags=drsuapi.DRSUAPI_DRS_GET_ANC)
-
-        (drs, drs_handle) = self._ds_bind(fsmo_owner["dns_name"])
-        (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
-        self.assertEqual(level, 6, "Expected level 6 response!")
-        self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_owner["ntds_guid"]))
-        self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_owner["invocation_id"]))
-        ctr6 = ctr
-        self.assertEqual(ctr6.extended_ret, drsuapi.DRSUAPI_EXOP_ERR_SUCCESS)
-        self.assertEqual(ctr6.object_count, 3)
-        self.assertNotEqual(ctr6.first_object, None)
-        self.assertEqual(ldb.Dn(self.ldb_dc1, ctr6.first_object.object.identifier.dn), fsmo_dn)
-        self.assertNotEqual(ctr6.first_object.next_object, None)
-        self.assertNotEqual(ctr6.first_object.next_object.next_object, None)
-        second_object = ctr6.first_object.next_object.object
-        self.assertEqual(ldb.Dn(self.ldb_dc1, second_object.identifier.dn), fsmo_not_owner["rid_set_dn"])
-        third_object = ctr6.first_object.next_object.next_object.object
-        self.assertEqual(ldb.Dn(self.ldb_dc1, third_object.identifier.dn), fsmo_not_owner["server_acct_dn"])
-        self.assertEqual(ctr6.more_data, False)
-        self.assertEqual(ctr6.nc_object_count, 0)
-        self.assertEqual(ctr6.nc_linked_attributes_count, 0)
-        self.assertEqual(ctr6.drs_error[0], 0)
-        # We don't check the linked_attributes_count as if the domain
-        # has an RODC, it can gain links on the server account object
-
-    def test_edit_rid_master(self):
-        """Test doing a RID allocation after changing the RID master from the original one.
-           This should set rIDNextRID to 0 on the new RID master."""
-        # 1. a. Transfer role to non-RID master
-        #    b. Check that it succeeds correctly
-        #
-        # 2. a. Call the RID alloc against the former master.
-        #    b. Check that it succeeds.
-        fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
-        (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
-
-        # 1. Swap RID master role
-        m = ldb.Message()
-        m.dn = ldb.Dn(self.ldb_dc1, "")
-        m["becomeRidMaster"] = ldb.MessageElement("1", ldb.FLAG_MOD_REPLACE,
-                                                  "becomeRidMaster")
-
-        # Make sure that ldb_dc1 == RID Master
-
-        server_dn = str(ldb.Dn(self.ldb_dc1, self.ldb_dc1.get_dsServiceName()).parent())
-
-        # self.ldb_dc1 == LOCALDC
-        if server_dn == fsmo_owner['server_dn']:
-            # ldb_dc1 == VAMPIREDC
-            ldb_dc1, ldb_dc2 = self.ldb_dc2, self.ldb_dc1
-        else:
-            # Otherwise switch the two
-            ldb_dc1, ldb_dc2 = self.ldb_dc1, self.ldb_dc2
-
-        try:
-            # ldb_dc1 is now RID MASTER (as VAMPIREDC)
-            ldb_dc1.modify(m)
-        except ldb.LdbError, (num, msg):
-            self.fail("Failed to reassign RID Master " +  msg)
-
-        try:
-            # 2. Perform a RID alloc
-            req8 = self._exop_req8(dest_dsa=fsmo_owner["ntds_guid"],
-                    invocation_id=fsmo_not_owner["invocation_id"],
-                    nc_dn_str=fsmo_dn,
-                    exop=drsuapi.DRSUAPI_EXOP_FSMO_RID_ALLOC)
-
-            (drs, drs_handle) = self._ds_bind(fsmo_not_owner["dns_name"])
-            # 3. Make sure the allocation succeeds
-            try:
-                (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
-            except RuntimeError, e:
-                self.fail("RID allocation failed: " + str(e))
-
-            fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
-
-            self.assertEqual(level, 6, "Expected level 6 response!")
-            self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_not_owner["ntds_guid"]))
-            self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_not_owner["invocation_id"]))
-            ctr6 = ctr
-            self.assertEqual(ctr6.extended_ret, drsuapi.DRSUAPI_EXOP_ERR_SUCCESS)
-            self.assertEqual(ctr6.object_count, 3)
-            self.assertNotEqual(ctr6.first_object, None)
-            self.assertEqual(ldb.Dn(ldb_dc2, ctr6.first_object.object.identifier.dn), fsmo_dn)
-            self.assertNotEqual(ctr6.first_object.next_object, None)
-            self.assertNotEqual(ctr6.first_object.next_object.next_object, None)
-            second_object = ctr6.first_object.next_object.object
-            self.assertEqual(ldb.Dn(self.ldb_dc1, second_object.identifier.dn), fsmo_owner["rid_set_dn"])
-            third_object = ctr6.first_object.next_object.next_object.object
-            self.assertEqual(ldb.Dn(self.ldb_dc1, third_object.identifier.dn), fsmo_owner["server_acct_dn"])
-        finally:
-            # Swap the RID master back for other tests
-            m = ldb.Message()
-            m.dn = ldb.Dn(ldb_dc2, "")
-            m["becomeRidMaster"] = ldb.MessageElement("1", ldb.FLAG_MOD_REPLACE, "becomeRidMaster")
-            try:
-                ldb_dc2.modify(m)
-            except ldb.LdbError, (num, msg):
-                self.fail("Failed to restore RID Master " +  msg)
-
-
 class DrsReplicaPrefixMapTestCase(drs_base.DrsBaseTestCase, ExopBaseTest):
     def setUp(self):
         super(DrsReplicaPrefixMapTestCase, self).setUp()
diff --git a/source4/torture/drs/python/ridalloc_exop.py b/source4/torture/drs/python/ridalloc_exop.py
new file mode 100644 (file)
index 0000000..5a273fc
--- /dev/null
@@ -0,0 +1,714 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+#
+# Tests various RID allocation scenarios
+#
+# Copyright (C) Kamen Mazdrashki <kamenim@samba.org> 2011
+# Copyright (C) Andrew Bartlett <abartlet@samba.org> 2016
+# Copyright (C) Catalyst IT Ltd. 2016
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+
+#
+# Usage:
+#  export DC1=dc1_dns_name
+#  export DC2=dc2_dns_name
+#  export SUBUNITRUN=$samba4srcdir/scripting/bin/subunitrun
+#  PYTHONPATH="$PYTHONPATH:$samba4srcdir/torture/drs/python" $SUBUNITRUN ridalloc_exop -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD"
+#
+
+import drs_base
+import samba.tests
+
+import ldb
+from ldb import SCOPE_BASE
+
+from samba.dcerpc import drsuapi, misc
+from samba.drs_utils import drs_DsBind
+from samba.samdb import SamDB
+
+import shutil, tempfile, os
+from samba.netcmd.main import cmd_sambatool
+from samba.auth import system_session, admin_session
+from samba.dbchecker import dbcheck
+from samba.ndr import ndr_pack
+from samba.dcerpc import security
+
+class ExopBaseTest:
+    def _exop_req8(self, dest_dsa, invocation_id, nc_dn_str, exop,
+                   replica_flags=0, max_objects=0, partial_attribute_set=None,
+                   partial_attribute_set_ex=None, mapping_ctr=None):
+        req8 = drsuapi.DsGetNCChangesRequest8()
+
+        req8.destination_dsa_guid = misc.GUID(dest_dsa) if dest_dsa else misc.GUID()
+        req8.source_dsa_invocation_id = misc.GUID(invocation_id)
+        req8.naming_context = drsuapi.DsReplicaObjectIdentifier()
+        req8.naming_context.dn = unicode(nc_dn_str)
+        req8.highwatermark = drsuapi.DsReplicaHighWaterMark()
+        req8.highwatermark.tmp_highest_usn = 0
+        req8.highwatermark.reserved_usn = 0
+        req8.highwatermark.highest_usn = 0
+        req8.uptodateness_vector = None
+        req8.replica_flags = replica_flags
+        req8.max_object_count = max_objects
+        req8.max_ndr_size = 402116
+        req8.extended_op = exop
+        req8.fsmo_info = 0
+        req8.partial_attribute_set = partial_attribute_set
+        req8.partial_attribute_set_ex = partial_attribute_set_ex
+        if mapping_ctr:
+            req8.mapping_ctr = mapping_ctr
+        else:
+            req8.mapping_ctr.num_mappings = 0
+            req8.mapping_ctr.mappings = None
+
+        return req8
+
+    def _ds_bind(self, server_name):
+        binding_str = "ncacn_ip_tcp:%s[seal]" % server_name
+
+        drs = drsuapi.drsuapi(binding_str, self.get_loadparm(), self.get_credentials())
+        (drs_handle, supported_extensions) = drs_DsBind(drs)
+        return (drs, drs_handle)
+
+
+class DrsReplicaSyncTestCase(drs_base.DrsBaseTestCase, ExopBaseTest):
+    """Intended as a semi-black box test case for DsGetNCChanges
+       implementation for extended operations. It should be testing
+       how DsGetNCChanges handles different input params (mostly invalid).
+       Final goal is to make DsGetNCChanges as binary compatible to
+       Windows implementation as possible"""
+
+    def setUp(self):
+        super(DrsReplicaSyncTestCase, self).setUp()
+
+    def tearDown(self):
+        super(DrsReplicaSyncTestCase, self).tearDown()
+
+    def _determine_fSMORoleOwner(self, fsmo_obj_dn):
+        """Returns (owner, not_owner) pair where:
+             owner: dns name for FSMO owner
+             not_owner: dns name for DC not owning the FSMO"""
+        # collect info to return later
+        fsmo_info_1 = {"dns_name": self.dnsname_dc1,
+                       "invocation_id": self.ldb_dc1.get_invocation_id(),
+                       "ntds_guid": self.ldb_dc1.get_ntds_GUID(),
+                       "server_dn": self.ldb_dc1.get_serverName()}
+        fsmo_info_2 = {"dns_name": self.dnsname_dc2,
+                       "invocation_id": self.ldb_dc2.get_invocation_id(),
+                       "ntds_guid": self.ldb_dc2.get_ntds_GUID(),
+                       "server_dn": self.ldb_dc2.get_serverName()}
+
+        msgs = self.ldb_dc1.search(scope=ldb.SCOPE_BASE, base=fsmo_info_1["server_dn"], attrs=["serverReference"])
+        fsmo_info_1["server_acct_dn"] = ldb.Dn(self.ldb_dc1, msgs[0]["serverReference"][0])
+        fsmo_info_1["rid_set_dn"] = ldb.Dn(self.ldb_dc1, "CN=RID Set") + fsmo_info_1["server_acct_dn"]
+
+        msgs = self.ldb_dc2.search(scope=ldb.SCOPE_BASE, base=fsmo_info_2["server_dn"], attrs=["serverReference"])
+        fsmo_info_2["server_acct_dn"] = ldb.Dn(self.ldb_dc2, msgs[0]["serverReference"][0])
+        fsmo_info_2["rid_set_dn"] = ldb.Dn(self.ldb_dc2, "CN=RID Set") + fsmo_info_2["server_acct_dn"]
+
+        # determine the owner dc
+        res = self.ldb_dc1.search(fsmo_obj_dn,
+                                  scope=SCOPE_BASE, attrs=["fSMORoleOwner"])
+        assert len(res) == 1, "Only one fSMORoleOwner value expected for %s!"%fsmo_obj_dn
+        fsmo_owner = res[0]["fSMORoleOwner"][0]
+        if fsmo_owner == self.info_dc1["dsServiceName"][0]:
+            return (fsmo_info_1, fsmo_info_2)
+        return (fsmo_info_2, fsmo_info_1)
+
+    def _check_exop_failed(self, ctr6, expected_failure):
+        self.assertEqual(ctr6.extended_ret, expected_failure)
+        #self.assertEqual(ctr6.object_count, 0)
+        #self.assertEqual(ctr6.first_object, None)
+        self.assertEqual(ctr6.more_data, False)
+        self.assertEqual(ctr6.nc_object_count, 0)
+        self.assertEqual(ctr6.nc_linked_attributes_count, 0)
+        self.assertEqual(ctr6.linked_attributes_count, 0)
+        self.assertEqual(ctr6.linked_attributes, [])
+        self.assertEqual(ctr6.drs_error[0], 0)
+
+    def test_InvalidDestDSA_ridalloc(self):
+        """Test RID allocation with invalid destination DSA guid"""
+        fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
+        (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
+
+        req8 = self._exop_req8(dest_dsa="9c637462-5b8c-4467-aef2-bdb1f57bc4ef",
+                               invocation_id=fsmo_owner["invocation_id"],
+                               nc_dn_str=fsmo_dn,
+                               exop=drsuapi.DRSUAPI_EXOP_FSMO_RID_ALLOC)
+
+        (drs, drs_handle) = self._ds_bind(fsmo_owner["dns_name"])
+        (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
+        self.assertEqual(level, 6, "Expected level 6 response!")
+        self._check_exop_failed(ctr, drsuapi.DRSUAPI_EXOP_ERR_UNKNOWN_CALLER)
+        self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_owner["ntds_guid"]))
+        self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_owner["invocation_id"]))
+
+    def test_do_ridalloc(self):
+        """Test doing a RID allocation with a valid destination DSA guid"""
+        fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
+        (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
+
+        req8 = self._exop_req8(dest_dsa=fsmo_not_owner["ntds_guid"],
+                               invocation_id=fsmo_owner["invocation_id"],
+                               nc_dn_str=fsmo_dn,
+                               exop=drsuapi.DRSUAPI_EXOP_FSMO_RID_ALLOC)
+
+        (drs, drs_handle) = self._ds_bind(fsmo_owner["dns_name"])
+        (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
+        self.assertEqual(level, 6, "Expected level 6 response!")
+        self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_owner["ntds_guid"]))
+        self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_owner["invocation_id"]))
+        ctr6 = ctr
+        self.assertEqual(ctr6.extended_ret, drsuapi.DRSUAPI_EXOP_ERR_SUCCESS)
+        self.assertEqual(ctr6.object_count, 3)
+        self.assertNotEqual(ctr6.first_object, None)
+        self.assertEqual(ldb.Dn(self.ldb_dc1, ctr6.first_object.object.identifier.dn), fsmo_dn)
+        self.assertNotEqual(ctr6.first_object.next_object, None)
+        self.assertNotEqual(ctr6.first_object.next_object.next_object, None)
+        second_object = ctr6.first_object.next_object.object
+        self.assertEqual(ldb.Dn(self.ldb_dc1, second_object.identifier.dn), fsmo_not_owner["rid_set_dn"])
+        third_object = ctr6.first_object.next_object.next_object.object
+        self.assertEqual(ldb.Dn(self.ldb_dc1, third_object.identifier.dn), fsmo_not_owner["server_acct_dn"])
+
+        self.assertEqual(ctr6.more_data, False)
+        self.assertEqual(ctr6.nc_object_count, 0)
+        self.assertEqual(ctr6.nc_linked_attributes_count, 0)
+        self.assertEqual(ctr6.drs_error[0], 0)
+        # We don't check the linked_attributes_count as if the domain
+        # has an RODC, it can gain links on the server account object
+
+    def test_do_ridalloc_get_anc(self):
+        """Test doing a RID allocation with a valid destination DSA guid and GET_ANC flag"""
+        fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
+        (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
+
+        req8 = self._exop_req8(dest_dsa=fsmo_not_owner["ntds_guid"],
+                               invocation_id=fsmo_owner["invocation_id"],
+                               nc_dn_str=fsmo_dn,
+                               exop=drsuapi.DRSUAPI_EXOP_FSMO_RID_ALLOC,
+                               replica_flags=drsuapi.DRSUAPI_DRS_GET_ANC)
+
+        (drs, drs_handle) = self._ds_bind(fsmo_owner["dns_name"])
+        (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
+        self.assertEqual(level, 6, "Expected level 6 response!")
+        self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_owner["ntds_guid"]))
+        self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_owner["invocation_id"]))
+        ctr6 = ctr
+        self.assertEqual(ctr6.extended_ret, drsuapi.DRSUAPI_EXOP_ERR_SUCCESS)
+        self.assertEqual(ctr6.object_count, 3)
+        self.assertNotEqual(ctr6.first_object, None)
+        self.assertEqual(ldb.Dn(self.ldb_dc1, ctr6.first_object.object.identifier.dn), fsmo_dn)
+        self.assertNotEqual(ctr6.first_object.next_object, None)
+        self.assertNotEqual(ctr6.first_object.next_object.next_object, None)
+        second_object = ctr6.first_object.next_object.object
+        self.assertEqual(ldb.Dn(self.ldb_dc1, second_object.identifier.dn), fsmo_not_owner["rid_set_dn"])
+        third_object = ctr6.first_object.next_object.next_object.object
+        self.assertEqual(ldb.Dn(self.ldb_dc1, third_object.identifier.dn), fsmo_not_owner["server_acct_dn"])
+        self.assertEqual(ctr6.more_data, False)
+        self.assertEqual(ctr6.nc_object_count, 0)
+        self.assertEqual(ctr6.nc_linked_attributes_count, 0)
+        self.assertEqual(ctr6.drs_error[0], 0)
+        # We don't check the linked_attributes_count as if the domain
+        # has an RODC, it can gain links on the server account object
+
+    def test_edit_rid_master(self):
+        """Test doing a RID allocation after changing the RID master from the original one.
+           This should set rIDNextRID to 0 on the new RID master."""
+        # 1. a. Transfer role to non-RID master
+        #    b. Check that it succeeds correctly
+        #
+        # 2. a. Call the RID alloc against the former master.
+        #    b. Check that it succeeds.
+        fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
+        (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
+
+        # 1. Swap RID master role
+        m = ldb.Message()
+        m.dn = ldb.Dn(self.ldb_dc1, "")
+        m["becomeRidMaster"] = ldb.MessageElement("1", ldb.FLAG_MOD_REPLACE,
+                                                  "becomeRidMaster")
+
+        # Make sure that ldb_dc1 == RID Master
+
+        server_dn = str(ldb.Dn(self.ldb_dc1, self.ldb_dc1.get_dsServiceName()).parent())
+
+        # self.ldb_dc1 == LOCALDC
+        if server_dn == fsmo_owner['server_dn']:
+            # ldb_dc1 == VAMPIREDC
+            ldb_dc1, ldb_dc2 = self.ldb_dc2, self.ldb_dc1
+        else:
+            # Otherwise switch the two
+            ldb_dc1, ldb_dc2 = self.ldb_dc1, self.ldb_dc2
+
+        try:
+            # ldb_dc1 is now RID MASTER (as VAMPIREDC)
+            ldb_dc1.modify(m)
+        except ldb.LdbError, (num, msg):
+            self.fail("Failed to reassign RID Master " +  msg)
+
+        try:
+            # 2. Perform a RID alloc
+            req8 = self._exop_req8(dest_dsa=fsmo_owner["ntds_guid"],
+                    invocation_id=fsmo_not_owner["invocation_id"],
+                    nc_dn_str=fsmo_dn,
+                    exop=drsuapi.DRSUAPI_EXOP_FSMO_RID_ALLOC)
+
+            (drs, drs_handle) = self._ds_bind(fsmo_not_owner["dns_name"])
+            # 3. Make sure the allocation succeeds
+            try:
+                (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
+            except RuntimeError, e:
+                self.fail("RID allocation failed: " + str(e))
+
+            fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
+
+            self.assertEqual(level, 6, "Expected level 6 response!")
+            self.assertEqual(ctr.source_dsa_guid, misc.GUID(fsmo_not_owner["ntds_guid"]))
+            self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(fsmo_not_owner["invocation_id"]))
+            ctr6 = ctr
+            self.assertEqual(ctr6.extended_ret, drsuapi.DRSUAPI_EXOP_ERR_SUCCESS)
+            self.assertEqual(ctr6.object_count, 3)
+            self.assertNotEqual(ctr6.first_object, None)
+            self.assertEqual(ldb.Dn(ldb_dc2, ctr6.first_object.object.identifier.dn), fsmo_dn)
+            self.assertNotEqual(ctr6.first_object.next_object, None)
+            self.assertNotEqual(ctr6.first_object.next_object.next_object, None)
+            second_object = ctr6.first_object.next_object.object
+            self.assertEqual(ldb.Dn(self.ldb_dc1, second_object.identifier.dn), fsmo_owner["rid_set_dn"])
+            third_object = ctr6.first_object.next_object.next_object.object
+            self.assertEqual(ldb.Dn(self.ldb_dc1, third_object.identifier.dn), fsmo_owner["server_acct_dn"])
+        finally:
+            # Swap the RID master back for other tests
+            m = ldb.Message()
+            m.dn = ldb.Dn(ldb_dc2, "")
+            m["becomeRidMaster"] = ldb.MessageElement("1", ldb.FLAG_MOD_REPLACE, "becomeRidMaster")
+            try:
+                ldb_dc2.modify(m)
+            except ldb.LdbError, (num, msg):
+                self.fail("Failed to restore RID Master " +  msg)
+
+    def test_offline_samba_tool_seized_ridalloc(self):
+        """Perform a join against the non-RID manager and then seize the RID Manager role"""
+
+        fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
+        (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
+
+        targetdir = self._test_join(fsmo_not_owner['dns_name'], "RIDALLOCTEST1")
+        try:
+            # Connect to the database
+            ldb_url = "tdb://%s" % os.path.join(targetdir, "private/sam.ldb")
+            smbconf = os.path.join(targetdir, "etc/smb.conf")
+
+            lp = self.get_loadparm()
+            new_ldb = SamDB(ldb_url, credentials=self.get_credentials(),
+                            session_info=system_session(lp), lp=lp)
+
+            # 1. Get server name
+            res = new_ldb.search(base=ldb.Dn(new_ldb, new_ldb.get_serverName()),
+                                 scope=ldb.SCOPE_BASE, attrs=["serverReference"])
+            # 2. Get server reference
+            server_ref_dn = ldb.Dn(new_ldb, res[0]['serverReference'][0])
+
+            # Assert that no RID Set has been set
+            res = new_ldb.search(base=server_ref_dn,
+                                 scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences'])
+
+            self.assertFalse("rIDSetReferences" in res[0])
+
+            (result, out, err) = self.runsubcmd("fsmo", "seize", "--role", "rid", "-H", ldb_url, "-s", smbconf, "--force")
+            self.assertCmdSuccess(result, out, err)
+            self.assertEquals(err,"","Shouldn't be any error messages")
+
+            # 3. Assert we get the RID Set
+            res = new_ldb.search(base=server_ref_dn,
+                                 scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences'])
+
+            self.assertTrue("rIDSetReferences" in res[0])
+        finally:
+            shutil.rmtree(targetdir, ignore_errors=True)
+            self._test_force_demote(fsmo_not_owner['dns_name'], "RIDALLOCTEST1")
+
+    def _test_join(self, server, netbios_name):
+        tmpdir = os.path.join(self.tempdir, "targetdir")
+        creds = self.get_credentials()
+        cmd = cmd_sambatool.subcommands['domain'].subcommands['join']
+        result = cmd._run("samba-tool domain join",
+                          creds.get_realm(),
+                          "dc", "-U%s%%%s" % (creds.get_username(),
+                                              creds.get_password()),
+                          '--targetdir=%s' % tmpdir,
+                          '--server=%s' % server,
+                          "--option=netbios name = %s" % netbios_name)
+        return tmpdir
+
+    def _test_force_demote(self, server, netbios_name):
+        creds = self.get_credentials()
+        cmd = cmd_sambatool.subcommands['domain'].subcommands['demote']
+        result = cmd._run("samba-tool domain demote",
+                          "-U%s%%%s" % (creds.get_username(),
+                                              creds.get_password()),
+                          '--server=%s' % server,
+                          "--remove-other-dead-server=%s" % netbios_name)
+
+    def test_offline_manual_seized_ridalloc_with_dbcheck(self):
+        """Peform the same actions as test_offline_samba_tool_seized_ridalloc,
+        but do not create the RID set. Confirm that dbcheck correctly creates
+        the RID Set.
+
+        Also check
+        """
+        fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
+        (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
+
+        targetdir = self._test_join(fsmo_not_owner['dns_name'], "RIDALLOCTEST2")
+        try:
+            # Connect to the database
+            ldb_url = "tdb://%s" % os.path.join(targetdir, "private/sam.ldb")
+            lp = self.get_loadparm()
+
+            new_ldb = SamDB(ldb_url, credentials=self.get_credentials(),
+                            session_info=system_session(lp), lp=lp)
+
+            serviceName = new_ldb.get_dsServiceName()
+            m = ldb.Message()
+            m.dn = fsmo_dn
+            m["fSMORoleOwner"] = ldb.MessageElement(serviceName,
+                                                   ldb.FLAG_MOD_REPLACE,
+                                                   "fSMORoleOwner")
+            new_ldb.modify(m)
+
+            # 1. Get server name
+            res = new_ldb.search(base=ldb.Dn(new_ldb, new_ldb.get_serverName()),
+                                 scope=ldb.SCOPE_BASE, attrs=["serverReference"])
+            # 2. Get server reference
+            server_ref_dn = ldb.Dn(new_ldb, res[0]['serverReference'][0])
+
+            # Assert that no RID Set has been set
+            res = new_ldb.search(base=server_ref_dn,
+                                 scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences'])
+
+            self.assertFalse("rIDSetReferences" in res[0])
+
+            smbconf = os.path.join(targetdir, "etc/smb.conf")
+
+            chk = dbcheck(new_ldb, verbose=False, fix=True, yes=True, quiet=True)
+
+            self.assertEqual(chk.check_database(DN=server_ref_dn, scope=ldb.SCOPE_BASE), 1, "Should have fixed one error (missing RID Set)")
+
+            # 3. Assert we get the RID Set
+            res = new_ldb.search(base=server_ref_dn,
+                                 scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences'])
+
+            self.assertTrue("rIDSetReferences" in res[0])
+        finally:
+            self._test_force_demote(fsmo_not_owner['dns_name'], "RIDALLOCTEST2")
+            shutil.rmtree(targetdir, ignore_errors=True)
+
+    def test_offline_manual_seized_ridalloc_add_user(self):
+        """Peform the same actions as test_offline_samba_tool_seized_ridalloc,
+        but do not create the RID set. Confirm that user-add correctly creates
+        the RID Set."""
+        fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
+        (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
+
+        targetdir = self._test_join(fsmo_not_owner['dns_name'], "RIDALLOCTEST3")
+        try:
+            # Connect to the database
+            ldb_url = "tdb://%s" % os.path.join(targetdir, "private/sam.ldb")
+            lp = self.get_loadparm()
+
+            new_ldb = SamDB(ldb_url, credentials=self.get_credentials(),
+                            session_info=system_session(lp), lp=lp)
+
+            serviceName = new_ldb.get_dsServiceName()
+            m = ldb.Message()
+            m.dn = fsmo_dn
+            m["fSMORoleOwner"] = ldb.MessageElement(serviceName,
+                                                   ldb.FLAG_MOD_REPLACE,
+                                                   "fSMORoleOwner")
+            new_ldb.modify(m)
+
+            # 1. Get server name
+            res = new_ldb.search(base=ldb.Dn(new_ldb, new_ldb.get_serverName()),
+                                 scope=ldb.SCOPE_BASE, attrs=["serverReference"])
+            # 2. Get server reference
+            server_ref_dn = ldb.Dn(new_ldb, res[0]['serverReference'][0])
+
+            # Assert that no RID Set has been set
+            res = new_ldb.search(base=server_ref_dn,
+                                 scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences'])
+
+            self.assertFalse("rIDSetReferences" in res[0])
+
+            smbconf = os.path.join(targetdir, "etc/smb.conf")
+
+            new_ldb.newuser("ridalloctestuser", "P@ssword!")
+
+            # 3. Assert we get the RID Set
+            res = new_ldb.search(base=server_ref_dn,
+                                 scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences'])
+
+            self.assertTrue("rIDSetReferences" in res[0])
+
+        finally:
+            self._test_force_demote(fsmo_not_owner['dns_name'], "RIDALLOCTEST3")
+            shutil.rmtree(targetdir, ignore_errors=True)
+
+    def test_offline_manual_seized_ridalloc_add_user_as_admin(self):
+        """Peform the same actions as test_offline_samba_tool_seized_ridalloc,
+        but do not create the RID set. Confirm that user-add correctly creates
+        the RID Set."""
+        fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
+        (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
+
+        targetdir = self._test_join(fsmo_not_owner['dns_name'], "RIDALLOCTEST4")
+        try:
+            # Connect to the database
+            ldb_url = "tdb://%s" % os.path.join(targetdir, "private/sam.ldb")
+            lp = self.get_loadparm()
+
+            new_ldb = SamDB(ldb_url, credentials=self.get_credentials(),
+                            session_info=admin_session(lp, self.ldb_dc1.get_domain_sid()), lp=lp)
+
+            serviceName = new_ldb.get_dsServiceName()
+            m = ldb.Message()
+            m.dn = fsmo_dn
+            m["fSMORoleOwner"] = ldb.MessageElement(serviceName,
+                                                   ldb.FLAG_MOD_REPLACE,
+                                                   "fSMORoleOwner")
+            new_ldb.modify(m)
+
+            # 1. Get server name
+            res = new_ldb.search(base=ldb.Dn(new_ldb, new_ldb.get_serverName()),
+                                 scope=ldb.SCOPE_BASE, attrs=["serverReference"])
+            # 2. Get server reference
+            server_ref_dn = ldb.Dn(new_ldb, res[0]['serverReference'][0])
+
+            # Assert that no RID Set has been set
+            res = new_ldb.search(base=server_ref_dn,
+                                 scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences'])
+
+            self.assertFalse("rIDSetReferences" in res[0])
+
+            smbconf = os.path.join(targetdir, "etc/smb.conf")
+
+            # Create a user to allocate a RID Set for itself (the RID master)
+            new_ldb.newuser("ridalloctestuser", "P@ssword!")
+
+            # 3. Assert we get the RID Set
+            res = new_ldb.search(base=server_ref_dn,
+                                 scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences'])
+
+            self.assertTrue("rIDSetReferences" in res[0])
+
+        finally:
+            self._test_force_demote(fsmo_not_owner['dns_name'], "RIDALLOCTEST4")
+            shutil.rmtree(targetdir, ignore_errors=True)
+
+    def test_join_time_ridalloc(self):
+        """Perform a join against the RID manager and assert we have a RID Set"""
+
+        fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
+        (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
+
+        targetdir = self._test_join(fsmo_owner['dns_name'], "RIDALLOCTEST5")
+        try:
+            # Connect to the database
+            ldb_url = "tdb://%s" % os.path.join(targetdir, "private/sam.ldb")
+            smbconf = os.path.join(targetdir, "etc/smb.conf")
+
+            lp = self.get_loadparm()
+            new_ldb = SamDB(ldb_url, credentials=self.get_credentials(),
+                            session_info=system_session(lp), lp=lp)
+
+            # 1. Get server name
+            res = new_ldb.search(base=ldb.Dn(new_ldb, new_ldb.get_serverName()),
+                                 scope=ldb.SCOPE_BASE, attrs=["serverReference"])
+            # 2. Get server reference
+            server_ref_dn = ldb.Dn(new_ldb, res[0]['serverReference'][0])
+
+            # 3. Assert we get the RID Set
+            res = new_ldb.search(base=server_ref_dn,
+                                 scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences'])
+
+            self.assertTrue("rIDSetReferences" in res[0])
+        finally:
+            self._test_force_demote(fsmo_owner['dns_name'], "RIDALLOCTEST5")
+            shutil.rmtree(targetdir, ignore_errors=True)
+
+    def test_rid_set_dbcheck(self):
+        """Perform a join against the RID manager and assert we have a RID Set.
+        Using dbcheck, we assert that we can detect out of range users."""
+
+        fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
+        (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
+
+        targetdir = self._test_join(fsmo_owner['dns_name'], "RIDALLOCTEST6")
+        try:
+            # Connect to the database
+            ldb_url = "tdb://%s" % os.path.join(targetdir, "private/sam.ldb")
+            smbconf = os.path.join(targetdir, "etc/smb.conf")
+
+            lp = self.get_loadparm()
+            new_ldb = SamDB(ldb_url, credentials=self.get_credentials(),
+                            session_info=system_session(lp), lp=lp)
+
+            # 1. Get server name
+            res = new_ldb.search(base=ldb.Dn(new_ldb, new_ldb.get_serverName()),
+                                 scope=ldb.SCOPE_BASE, attrs=["serverReference"])
+            # 2. Get server reference
+            server_ref_dn = ldb.Dn(new_ldb, res[0]['serverReference'][0])
+
+            # 3. Assert we get the RID Set
+            res = new_ldb.search(base=server_ref_dn,
+                                 scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences'])
+
+            self.assertTrue("rIDSetReferences" in res[0])
+            rid_set_dn = ldb.Dn(new_ldb, res[0]["rIDSetReferences"][0])
+
+            # 4. Add a new user (triggers RID set work)
+            new_ldb.newuser("ridalloctestuser", "P@ssword!")
+
+            # 5. Now fetch the RID SET
+            rid_set_res = new_ldb.search(base=rid_set_dn,
+                                         scope=ldb.SCOPE_BASE, attrs=['rIDNextRid',
+                                                                      'rIDAllocationPool'])
+            next_pool = int(rid_set_res[0]["rIDAllocationPool"][0])
+            last_rid = (0xFFFFFFFF00000000 & next_pool) >> 32
+
+            # 6. Add user above the ridNextRid and at mid-range.
+            #
+            # We can do this with safety because this is an offline DB that will be
+            # destroyed.
+            m = ldb.Message()
+            m.dn = ldb.Dn(new_ldb, "CN=ridsettestuser1,CN=Users")
+            m.dn.add_base(new_ldb.get_default_basedn())
+            m['objectClass'] = ldb.MessageElement('user', ldb.FLAG_MOD_ADD, 'objectClass')
+            m['objectSid'] = ldb.MessageElement(ndr_pack(security.dom_sid(str(new_ldb.get_domain_sid()) + "-%d" % (last_rid - 10))),
+                                                ldb.FLAG_MOD_ADD,
+                                                'objectSid')
+            new_ldb.add(m, controls=["relax:0"])
+
+            # 7. Check the RID Set
+            chk = dbcheck(new_ldb, verbose=False, fix=True, yes=True, quiet=True)
+
+            # Should have one error (wrong rIDNextRID)
+            self.assertEqual(chk.check_database(DN=rid_set_dn, scope=ldb.SCOPE_BASE), 1)
+
+            # 8. Assert we get didn't show any other errors
+            chk = dbcheck(new_ldb, verbose=False, fix=False, quiet=True)
+
+            rid_set_res = new_ldb.search(base=rid_set_dn,
+                                         scope=ldb.SCOPE_BASE, attrs=['rIDNextRid',
+                                                                      'rIDAllocationPool'])
+            last_allocated_rid = int(rid_set_res[0]["rIDNextRid"][0])
+            self.assertEquals(last_allocated_rid, last_rid - 10)
+
+            # 9. Assert that the range wasn't thrown away
+
+            next_pool = int(rid_set_res[0]["rIDAllocationPool"][0])
+            self.assertEqual(last_rid, (0xFFFFFFFF00000000 & next_pool) >> 32, "rid pool should have changed")
+        finally:
+            self._test_force_demote(fsmo_owner['dns_name'], "RIDALLOCTEST6")
+            shutil.rmtree(targetdir, ignore_errors=True)
+
+
+    def test_rid_set_dbcheck_after_seize(self):
+        """Perform a join against the RID manager and assert we have a RID Set.
+        We seize the RID master role, then using dbcheck, we assert that we can
+        detect out of range users (and then bump the RID set as required)."""
+
+        fsmo_dn = ldb.Dn(self.ldb_dc1, "CN=RID Manager$,CN=System," + self.ldb_dc1.domain_dn())
+        (fsmo_owner, fsmo_not_owner) = self._determine_fSMORoleOwner(fsmo_dn)
+
+        targetdir = self._test_join(fsmo_owner['dns_name'], "RIDALLOCTEST7")
+        try:
+            # Connect to the database
+            ldb_url = "tdb://%s" % os.path.join(targetdir, "private/sam.ldb")
+            smbconf = os.path.join(targetdir, "etc/smb.conf")
+
+            lp = self.get_loadparm()
+            new_ldb = SamDB(ldb_url, credentials=self.get_credentials(),
+                            session_info=system_session(lp), lp=lp)
+
+            # 1. Get server name
+            res = new_ldb.search(base=ldb.Dn(new_ldb, new_ldb.get_serverName()),
+                                 scope=ldb.SCOPE_BASE, attrs=["serverReference"])
+            # 2. Get server reference
+            server_ref_dn = ldb.Dn(new_ldb, res[0]['serverReference'][0])
+
+            # 3. Assert we get the RID Set
+            res = new_ldb.search(base=server_ref_dn,
+                                 scope=ldb.SCOPE_BASE, attrs=['rIDSetReferences'])
+
+            self.assertTrue("rIDSetReferences" in res[0])
+            rid_set_dn = ldb.Dn(new_ldb, res[0]["rIDSetReferences"][0])
+
+            # 4. Seize the RID Manager role
+            (result, out, err) = self.runsubcmd("fsmo", "seize", "--role", "rid", "-H", ldb_url, "-s", smbconf, "--force")
+            self.assertCmdSuccess(result, out, err)
+            self.assertEquals(err,"","Shouldn't be any error messages")
+
+            # 5. Add a new user (triggers RID set work)
+            new_ldb.newuser("ridalloctestuser", "P@ssword!")
+
+            # 6. Now fetch the RID SET
+            rid_set_res = new_ldb.search(base=rid_set_dn,
+                                         scope=ldb.SCOPE_BASE, attrs=['rIDNextRid',
+                                                                      'rIDAllocationPool'])
+            next_pool = int(rid_set_res[0]["rIDAllocationPool"][0])
+            last_rid = (0xFFFFFFFF00000000 & next_pool) >> 32
+
+            # 7. Add user above the ridNextRid and at almost the end of the range.
+            #
+            m = ldb.Message()
+            m.dn = ldb.Dn(new_ldb, "CN=ridsettestuser2,CN=Users")
+            m.dn.add_base(new_ldb.get_default_basedn())
+            m['objectClass'] = ldb.MessageElement('user', ldb.FLAG_MOD_ADD, 'objectClass')
+            m['objectSid'] = ldb.MessageElement(ndr_pack(security.dom_sid(str(new_ldb.get_domain_sid()) + "-%d" % (last_rid - 3))),
+                                                ldb.FLAG_MOD_ADD,
+                                                'objectSid')
+            new_ldb.add(m, controls=["relax:0"])
+
+            # 8. Add user above the ridNextRid and at the end of the range
+            m = ldb.Message()
+            m.dn = ldb.Dn(new_ldb, "CN=ridsettestuser3,CN=Users")
+            m.dn.add_base(new_ldb.get_default_basedn())
+            m['objectClass'] = ldb.MessageElement('user', ldb.FLAG_MOD_ADD, 'objectClass')
+            m['objectSid'] = ldb.MessageElement(ndr_pack(security.dom_sid(str(new_ldb.get_domain_sid()) + "-%d" % last_rid)),
+                                                ldb.FLAG_MOD_ADD,
+                                                'objectSid')
+            new_ldb.add(m, controls=["relax:0"])
+
+            chk = dbcheck(new_ldb, verbose=False, fix=True, yes=True, quiet=True)
+
+            # Should have fixed two errors (wrong ridNextRid)
+            self.assertEqual(chk.check_database(DN=rid_set_dn, scope=ldb.SCOPE_BASE), 2)
+
+            # 9. Assert we get didn't show any other errors
+            chk = dbcheck(new_ldb, verbose=False, fix=False, quiet=True)
+
+            # 10. Add another user (checks RID rollover)
+            # We have seized the role, so we can do that.
+            new_ldb.newuser("ridalloctestuser3", "P@ssword!")
+
+            rid_set_res = new_ldb.search(base=rid_set_dn,
+                                         scope=ldb.SCOPE_BASE, attrs=['rIDNextRid',
+                                                                      'rIDAllocationPool'])
+            next_pool = int(rid_set_res[0]["rIDAllocationPool"][0])
+            self.assertNotEqual(last_rid, (0xFFFFFFFF00000000 & next_pool) >> 32, "rid pool should have changed")
+        finally:
+            self._test_force_demote(fsmo_owner['dns_name'], "RIDALLOCTEST7")
+            shutil.rmtree(targetdir, ignore_errors=True)