logger.info("Joined domain %s (SID %s) as a DC" % (ctx.domain_name, ctx.domsid))
def join_clone(logger=None, server=None, creds=None, lp=None,
- targetdir=None, domain=None):
+ targetdir=None, domain=None, include_secrets=False):
"""Join as a DC."""
ctx = dc_join(logger, server, creds, lp, site=None, netbios_name=None, targetdir=targetdir, domain=domain,
machinepass=None, use_ntvfs=False, dns_backend="NONE", promote_existing=False, clone_only=True)
drsuapi.DRSUAPI_DRS_PER_SYNC |
drsuapi.DRSUAPI_DRS_FULL_SYNC_IN_PROGRESS |
drsuapi.DRSUAPI_DRS_NEVER_SYNCED)
+ if not include_secrets:
+ ctx.replica_flags |= drsuapi.DRSUAPI_DRS_SPECIAL_SECRET_PROCESSING
ctx.domain_replica_flags = ctx.replica_flags
ctx.do_join()
Option("--server", help="DC to join", type=str),
Option("--targetdir", help="where to store provision", type=str),
Option("--quiet", help="Be quiet", action="store_true"),
+ Option("--include-secrets", help="Also replicate secret values", action="store_true"),
Option("--verbose", help="Be verbose", action="store_true")
]
def run(self, domain, sambaopts=None, credopts=None,
versionopts=None, server=None, targetdir=None,
- quiet=False, verbose=False):
+ quiet=False, verbose=False, include_secrets=False):
lp = sambaopts.get_loadparm()
creds = credopts.get_credentials(lp)
logger.setLevel(logging.INFO)
join_clone(logger=logger, server=server, creds=creds, lp=lp, domain=domain,
- targetdir=targetdir)
+ targetdir=targetdir, include_secrets=include_secrets)
class cmd_drs(SuperCommand):
nc_name = ldb_rootdse["defaultNamingContext"]
ds_name = ldb_rootdse["dsServiceName"]
ldap_service_name = str(server_rootdse["ldapServiceName"][0])
+ self.assertEqual(nc_name, server_nc_name)
+ # The clone should pretend to be the source server
+ self.assertEqual(ds_name, server_ds_name)
+ self.assertEqual(ldap_service_name, server_ldap_service_name)
+
+ samdb = samba.tests.connect_samdb("tdb://" + os.path.join(self.tempdir, "private", "sam.ldb"),
+ ldap_only=False, lp=self.get_loadparm())
+ def get_krbtgt_pw():
+ krbtgt_pw = samdb.searchone("unicodePwd", "cn=krbtgt,CN=users,%s" % nc_name)
+ self.assertRaises(KeyError, get_krbtgt_pw)
+ shutil.rmtree(os.path.join(self.tempdir, "private"))
+ shutil.rmtree(os.path.join(self.tempdir, "etc"))
+ shutil.rmtree(os.path.join(self.tempdir, "msg.lock"))
+ os.remove(os.path.join(self.tempdir, "names.tdb"))
+ shutil.rmtree(os.path.join(self.tempdir, "state"))
+
+ def test_samba_tool_drs_clone_dc_secrets(self):
+ """Tests 'samba-tool drs clone-dc-database --include-secrets' command ."""
+ server_rootdse = self._get_rootDSE(self.dc1)
+ server_nc_name = server_rootdse["defaultNamingContext"]
+ server_ds_name = server_rootdse["dsServiceName"]
+ server_ldap_service_name = str(server_rootdse["ldapServiceName"][0])
+ server_realm = server_ldap_service_name.split(":")[0]
+ creds = self.get_credentials()
+ out = self.check_output("samba-tool drs clone-dc-database %s --server=%s %s --targetdir=%s --include-secrets"
+ % (server_realm,
+ self.dc1,
+ self.cmdline_creds,
+ self.tempdir))
+ ldb_rootdse = self._get_rootDSE("tdb://" + os.path.join(self.tempdir, "private", "sam.ldb"), ldap_only=False)
+ nc_name = ldb_rootdse["defaultNamingContext"]
+ ds_name = ldb_rootdse["dsServiceName"]
+ ldap_service_name = str(server_rootdse["ldapServiceName"][0])
+
+ samdb = samba.tests.connect_samdb("tdb://" + os.path.join(self.tempdir, "private", "sam.ldb"),
+ ldap_only=False, lp=self.get_loadparm())
+ krbtgt_pw = samdb.searchone("unicodePwd", "cn=krbtgt,CN=users,%s" % nc_name)
+ self.assertIsNotNone(krbtgt_pw)
+
self.assertEqual(nc_name, server_nc_name)
# The clone should pretend to be the source server
self.assertEqual(ds_name, server_ds_name)