from samba.netcmd.fsmo import get_fsmo_roleowner
import re
from samba import sites
+from samba.dsdb import _dsdb_load_udv_v2
def get_prim_dom(secrets_path, lp):
self.backend = backend
self.base_cmd += ["--backend-store=" + backend]
+ def get_expected_partitions(self, samdb):
+ basedn = str(samdb.get_default_basedn())
+ config_dn = "CN=Configuration,%s" % basedn
+ return [basedn, config_dn, "CN=Schema,%s" % config_dn,
+ "DC=DomainDnsZones,%s" % basedn,
+ "DC=ForestDnsZones,%s" % basedn]
+
def assert_partitions_present(self, samdb):
"""Asserts all expected partitions are present in the backup samdb"""
res = samdb.search(base="", scope=ldb.SCOPE_BASE,
attrs=['namingContexts'])
actual_ncs = [str(r) for r in res[0].get('namingContexts')]
- basedn = str(samdb.get_default_basedn())
- config_dn = "CN=Configuration,%s" % basedn
- expected_ncs = [basedn, config_dn, "CN=Schema,%s" % config_dn,
- "DC=DomainDnsZones,%s" % basedn,
- "DC=ForestDnsZones,%s" % basedn]
+ expected_ncs = self.get_expected_partitions(samdb)
for nc in expected_ncs:
self.assertTrue(nc in actual_ncs,
"%s not in %s" % (nc, str(actual_ncs)))
+ def assert_repl_uptodate_vector(self, samdb):
+ """Asserts an replUpToDateVector entry exists for the original DC"""
+ orig_invoc_id = self.ldb.get_invocation_id()
+ expected_ncs = self.get_expected_partitions(samdb)
+
+ # loop through the partitions and check the upToDateness vector
+ for nc in expected_ncs:
+ found = False
+ for cursor in _dsdb_load_udv_v2(samdb, nc):
+ if orig_invoc_id == str(cursor.source_dsa_invocation_id):
+ found = True
+ break
+ self.assertTrue(found, "Couldn't find UDTV for original DC")
+
def assert_dcs_present(self, samdb, expected_server, expected_count=None):
"""Checks that the expected server is present in the restored DB"""
search_expr = "(&(objectClass=Server)(serverReference=*))"
self.assert_dcs_present(samdb, self.new_server, expected_count=1)
self.assert_fsmo_roles(samdb, self.new_server, self.server)
self.assert_secrets(samdb, expect_secrets=expect_secrets)
+
+ # check we still have an uptodateness vector for the original DC
+ self.assert_repl_uptodate_vector(samdb)
return samdb
def assert_user_secrets(self, samdb, username, expect_secrets):