""" % (sid, unixid, sid, type_string, sid)
self.add(self.parse_ldif(mod).next()[1])
-
-
class dc_join(object):
- '''perform a DC join'''
+ """Perform a DC join."""
def __init__(ctx, server=None, creds=None, lp=None, site=None,
netbios_name=None, targetdir=None, domain=None,
pass
def cleanup_old_join(ctx):
- '''remove any DNs from a previous join'''
+ """Remove any DNs from a previous join."""
try:
# find the krbtgt link
print("checking sAMAccountName")
pass
def promote_possible(ctx):
- '''confirm that the account is just a bare NT4 BDC or a member server, so can be safely promoted'''
+ """confirm that the account is just a bare NT4 BDC or a member server, so can be safely promoted"""
if ctx.subdomain:
# This shouldn't happen
raise Exception("Can not promote into a subdomain")
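Review bot: The docstring of `promote_possible` changes its quotes but not its form: it still starts in lower case, has no final period, and is far longer than the lines this commit reflows elsewhere, while sibling docstrings in the same class were rewritten in the style of `"""Perform a DC join."""`. I would split it into a short summary, `"""Confirm that the account can be safely promoted.`, followed by a blank line and a sentence stating that the account must be a bare NT4 BDC or a member server. `find_dc` and `join_add_objects2` in the following hunks were left in lower case without a period in the same way. The body of `promote_possible` continues outside this hunk.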
raise Exception("Account '%s' appears to be an active DC, use 'samba-tool domain join' if you must re-create this account" % ctx.samname)
if (int(res[0]["userAccountControl"][0]) & (samba.dsdb.UF_WORKSTATION_TRUST_ACCOUNT|samba.dsdb.UF_SERVER_TRUST_ACCOUNT) == 0):
raise Exception("Account %s is not a domain member or a bare NT4 BDC, use 'samba-tool domain join' instead'" % ctx.samname)
-
+
ctx.promote_from_dn = res[0].dn
def find_dc(ctx, domain):
- '''find a writeable DC for the given domain'''
+ """find a writeable DC for the given domain"""
try:
ctx.cldap_ret = ctx.net.finddc(domain=domain, flags=nbt.NBT_SERVER_LDAP | nbt.NBT_SERVER_DS | nbt.NBT_SERVER_WRITABLE)
except Exception:
rec["msDS-NeverRevealGroup"] = ctx.never_reveal_sid
elif ctx.promote_existing:
rec["msDS-NeverRevealGroup"] = []
-
+
if ctx.reveal_sid:
rec["msDS-RevealOnDemandGroup"] = ctx.reveal_sid
elif ctx.promote_existing:
ctx.samdb.modify(m)
def join_add_objects2(ctx):
- '''add the various objects needed for the join, for subdomains post replication'''
+ """add the various objects needed for the join, for subdomains post replication"""
print "Adding %s" % ctx.partition_dn
# NOTE: windows sends a ntSecurityDescriptor here, we
replica_flags=drsuapi.DRSUAPI_DRS_WRIT_REP)
def join_provision(ctx):
- '''provision the local SAM'''
+ """Provision the local SAM."""
print "Calling bare provision"
ctx.names = presult.names
def join_provision_own_domain(ctx):
- '''provision the local SAM'''
+ """Provision the local SAM."""
# we now operate exclusively on the local database, which
# we need to reopen in order to get the newly created schema
print("Provision OK for domain %s" % ctx.names.dnsdomain)
def join_replicate(ctx):
- '''replicate the SAM'''
+ """Replicate the SAM."""
print "Starting replication"
ctx.local_samdb.transaction_start()
ctx.drsuapi.DsReplicaUpdateRefs(ctx.drsuapi_handle, 1, r)
def join_finalise(ctx):
- '''finalise the join, mark us synchronised and setup secrets db'''
+ """Finalise the join, mark us synchronised and setup secrets db."""
logger = logging.getLogger("provision")
logger.addHandler(logging.StreamHandler(sys.stdout))
targetdir=ctx.targetdir)
def join_setup_trusts(ctx):
- '''provision the local SAM'''
+ """provision the local SAM."""
def arcfour_encrypt(key, data):
from Crypto.Cipher import ARC4
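Review bot: The docstring on `join_setup_trusts` is edited here but still reads "provision the local SAM.", the same text as `join_provision` and `join_provision_own_domain`, so it describes a different function. Judging by the name and the nested `arcfour_encrypt` helper, the body presumably creates trust objects and encrypts trust key material, which is the kind of routine where a reader relies on the docstring to know what is written and where. Something like `"""Set up the trust objects for the newly joined domain."""` would be closer, to be confirmed against the full body, which continues outside this hunk.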
ctx.promote_possible()
else:
ctx.cleanup_old_join()
-
+
try:
ctx.join_add_objects()
ctx.join_provision()
targetdir=None, domain=None, domain_critical_only=False,
machinepass=None, use_ntvfs=False, dns_backend=None,
promote_existing=False):
- """join as a RODC"""
+ """Join as a RODC."""
ctx = dc_join(server, creds, lp, site, netbios_name, targetdir, domain,
machinepass, use_ntvfs, dns_backend, promote_existing)
ctx.krbtgt_dn = "CN=krbtgt_%s,CN=Users,%s" % (ctx.myname, ctx.base_dn)
# setup some defaults for accounts that should be replicated to this RODC
- ctx.never_reveal_sid = [ "<SID=%s-%s>" % (ctx.domsid, security.DOMAIN_RID_RODC_DENY),
- "<SID=%s>" % security.SID_BUILTIN_ADMINISTRATORS,
- "<SID=%s>" % security.SID_BUILTIN_SERVER_OPERATORS,
- "<SID=%s>" % security.SID_BUILTIN_BACKUP_OPERATORS,
- "<SID=%s>" % security.SID_BUILTIN_ACCOUNT_OPERATORS ]
+ ctx.never_reveal_sid = [
+ "<SID=%s-%s>" % (ctx.domsid, security.DOMAIN_RID_RODC_DENY),
+ "<SID=%s>" % security.SID_BUILTIN_ADMINISTRATORS,
+ "<SID=%s>" % security.SID_BUILTIN_SERVER_OPERATORS,
+ "<SID=%s>" % security.SID_BUILTIN_BACKUP_OPERATORS,
+ "<SID=%s>" % security.SID_BUILTIN_ACCOUNT_OPERATORS]
ctx.reveal_sid = "<SID=%s-%s>" % (ctx.domsid, security.DOMAIN_RID_RODC_ALLOW)
mysid = ctx.get_mysid()
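Review bot: The comment kept above the reflowed `ctx.never_reveal_sid` list says these are defaults for accounts "that should be replicated to this RODC", but the list that follows is the deny side of the policy. Elsewhere on this page it is written to `msDS-NeverRevealGroup`, while `ctx.reveal_sid` goes to `msDS-RevealOnDemandGroup`. Since the commit already touches these lines, I would reword the comment to cover both lists, e.g. `# default password replication policy: never_reveal_sid is denied, reveal_sid is allowed`. A maintainer who trusts the current wording could add a privileged group to the wrong list. The function's signature starts and its body ends outside this hunk.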
ctx.do_join()
-
print "Joined domain %s (SID %s) as an RODC" % (ctx.domain_name, ctx.domsid)
targetdir=None, domain=None, domain_critical_only=False,
machinepass=None, use_ntvfs=False, dns_backend=None,
promote_existing=False):
- """join as a DC"""
+ """Join as a DC."""
ctx = dc_join(server, creds, lp, site, netbios_name, targetdir, domain,
machinepass, use_ntvfs, dns_backend, promote_existing)
ctx.do_join()
print "Joined domain %s (SID %s) as a DC" % (ctx.domain_name, ctx.domsid)
-def join_subdomain(server=None, creds=None, lp=None, site=None, netbios_name=None,
- targetdir=None, parent_domain=None, dnsdomain=None, netbios_domain=None,
- machinepass=None, use_ntvfs=False, dns_backend=None):
- """join as a DC"""
+def join_subdomain(server=None, creds=None, lp=None, site=None,
+ netbios_name=None, targetdir=None, parent_domain=None, dnsdomain=None,
+ netbios_domain=None, machinepass=None, use_ntvfs=False,
+ dns_backend=None):
+ """Join as a DC."""
ctx = dc_join(server, creds, lp, site, netbios_name, targetdir, parent_domain,
machinepass, use_ntvfs, dns_backend)
ctx.subdomain = True
class IdmapDatabase(TdbDatabase):
"""Samba 3 ID map database reader."""
+
def _check_version(self):
assert fetch_int32(self.tdb, "IDMAP_VERSION\0") == IDMAP_VERSION_V2
class SecretsDatabase(TdbDatabase):
"""Samba 3 Secrets database reader."""
+
def get_auth_password(self):
return self.tdb.get("SECRETS/AUTH_PASSWORD")
SHARE_DATABASE_VERSION_V1 = 1
SHARE_DATABASE_VERSION_V2 = 2
+
class ShareInfoDatabase(TdbDatabase):
"""Samba 3 Share Info database reader."""
+
def _check_version(self):
assert fetch_int32(self.tdb, "INFO/version\0") in (SHARE_DATABASE_VERSION_V1, SHARE_DATABASE_VERSION_V2)
class Samba3(object):
"""Samba 3 configuration and state data reader."""
+
def __init__(self, smbconfpath, s3_lp_ctx=None):
"""Open the configuration and data for a Samba 3 installation.
else:
self.transaction_commit()
-
def setpassword(self, search_filter, password,
force_change_at_next_login=False, username=None):
"""Sets the password for a user
class SDUtils(object):
- """Some utilities for manipulation of security descriptors
- on objects"""
+ """Some utilities for manipulation of security descriptors on objects."""
def __init__(self, samdb):
self.ldb = samdb
self.domain_sid = security.dom_sid(self.ldb.get_domain_sid())
def modify_sd_on_dn(self, object_dn, sd, controls=None):
- """ Modify security descriptor using either SDDL string
+ """Modify security descriptor using either SDDL string
or security.descriptor object
"""
m = Message()
return ndr_unpack(security.dom_sid, res[0]["objectSid"][0])
def dacl_add_ace(self, object_dn, ace):
- """ Adds an ACE to an objects security descriptor
+ """Add an ACE to an objects security descriptor
"""
desc = self.read_sd_on_dn(object_dn)
desc_sddl = desc.as_sddl(self.domain_sid)
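Review bot: The rewritten docstring in `dacl_add_ace` still reads "an objects security descriptor" and keeps the closing quotes on a line of their own for what is a one-line summary; `"""Add an ACE to an object's security descriptor."""` fits on one line. The `get_sd_as_sddl` docstring in the next hunk has the same shape and keeps the misspelling "nTSecutiryDescriptor", so a search for the real attribute name will not find it; `"""Return the object's nTSecurityDescriptor in SDDL format."""` would fix both. The summary line in `modify_sd_on_dn` could be finished the same way.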
self.modify_sd_on_dn(object_dn, desc_sddl)
def get_sd_as_sddl(self, object_dn, controls=None):
- """ Return object nTSecutiryDescriptor in SDDL format
+ """Return object nTSecutiryDescriptor in SDDL format
"""
desc = self.read_sd_on_dn(object_dn, controls=controls)
return desc.as_sddl(self.domain_sid)
if s is not None:
s.close()
+
class TestSimpleQueries(DNSTest):
+
def test_one_a_query(self):
"create a query packet containing one query record"
p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
class TestDNSUpdates(DNSTest):
+
def test_two_updates(self):
"create two update requests"
p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
self.assertEquals(response.ancount, 1)
self.assertEquals(response.answers[0].rdata.txt, '"This is a test" "and this is a test, too"')
-
def test_delete_record(self):
"Test if deleting records works"
p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
class TestComplexQueries(DNSTest):
+
def setUp(self):
super(TestComplexQueries, self).setUp()
p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
client_finished = False
server_finished = False
server_to_client = ""
-
+
"""Run the actual call loop"""
while client_finished == False and server_finished == False:
if not client_finished:
def test_iter(self):
self.assertEquals([], list(self._get_shares({})))
self.assertEquals([], list(self._get_shares({"global":{}})))
- self.assertEquals(["bla"], list(self._get_shares({"global":{}, "bla":{}})))
+ self.assertEquals(
+ ["bla"],
+ list(self._get_shares({"global":{}, "bla":{}})))
def test_len(self):
shares = self._get_shares({"global": {}})
def get_context(self, *args, **kwargs):
return Messaging(*args, **kwargs)
-
+
def test_register(self):
x = self.get_context()
def callback():
"""Tests for the Samba3 NT -> posix ACL layer"""
-from samba.ntacls import setntacl, getntacl, XattrBackendError
+from samba.ntacls import setntacl, getntacl
from samba.dcerpc import xattr, security, smb_acl, idmap
from samba.param import LoadParm
-from samba.tests import TestCase, TestSkipped
+from samba.tests import TestCase
from samba import provision
import random
import os
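Review bot: Removing `XattrBackendError` and `TestSkipped` from these imports is only safe if nothing further down the module still names them, and the rest of the file is not visible in this hunk. A leftover `except XattrBackendError:` or `raise TestSkipped(...)` would surface as a NameError only when that path actually runs, which in a test module can mean a skipped or error-path case fails silently for a long time. The same applies to the `from samba.auth import system_session` line dropped in a later hunk of this commit, next to `from samba import dsdb`. Running pyflakes over the touched files before merging would confirm that none of the three names is still referenced.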
# print "a_perm: %o" % entry.a_perm
# print "uid: %d" % entry.uid
# print "gid: %d" % entry.gid
-
+
class PosixAclMappingTests(TestCase):
def test_setntacl(self):
self.assertRaises(ValueError, sanitize_server_role, "foo")
def test_valid(self):
- self.assertEquals("standalone server", sanitize_server_role("ROLE_STANDALONE"))
- self.assertEquals("standalone server", sanitize_server_role("standalone"))
- self.assertEquals("active directory domain controller", sanitize_server_role("domain controller"))
+ self.assertEquals(
+ "standalone server",
+ sanitize_server_role("ROLE_STANDALONE"))
+ self.assertEquals(
+ "standalone server",
+ sanitize_server_role("standalone"))
+ self.assertEquals(
+ "active directory domain controller",
+ sanitize_server_role("domain controller"))
class DummyLogger(object):
class HelperTests(samba.tests.TestCase):
def test_predef_to_name(self):
- self.assertEquals("HKEY_LOCAL_MACHINE",
+ self.assertEquals("HKEY_LOCAL_MACHINE",
registry.get_predef_name(0x80000002))
def test_str_regtype(self):
from samba.dcerpc.security import dom_sid
import os
+
for p in [ "../../../../../testdata/samba3", "../../../../testdata/samba3" ]:
DATADIR = os.path.join(os.path.dirname(__file__), p)
if os.path.exists(DATADIR):
from samba.dcerpc import lsa, samr, security
from samba.dcerpc.security import dom_sid
from samba.credentials import Credentials
-from samba.auth import system_session
from samba import dsdb
from samba.ndr import ndr_pack
from samba import unix2nttime
except ldb.LdbError, e:
logger.warn("Could not set account policy, (%s)", str(e))
-def add_posix_attrs(logger, samdb, sid, name, nisdomain, xid_type, home=None, shell=None, pgid=None):
+
+def add_posix_attrs(logger, samdb, sid, name, nisdomain, xid_type, home=None,
+ shell=None, pgid=None):
"""Add posix attributes for the user/group
:param samdb: Samba4 sam.ldb database
'Could not modify AD idmap entry for sid=%s, id=%s, type=%s (%s)',
str(sid), str(xid), xid_type, str(e))
+
def add_idmap_entry(idmapdb, sid, xid, xid_type, logger):
"""Create idmap entry
logger.warning("LDAP entry for user %s contains more than one %s", user, attr)
return None
-def upgrade_from_samba3(samba3, logger, targetdir, session_info=None, useeadb=False, dns_backend=None,
- use_ntvfs=False):
+
+def upgrade_from_samba3(samba3, logger, targetdir, session_info=None,
+ useeadb=False, dns_backend=None, use_ntvfs=False):
"""Upgrade from samba3 database to samba4 AD database
:param samba3: samba3 object
logger.info("Administrator password has been set to password of user '%s'", admin_user)
if result.server_role == "active directory domain controller":
- setsysvolacl(result.samdb, result.paths.netlogon, result.paths.sysvol, result.paths.root_uid, result.paths.wheel_gid,
- security.dom_sid(result.domainsid), result.names.dnsdomain, result.names.domaindn, result.lp, use_ntvfs)
+ setsysvolacl(result.samdb, result.paths.netlogon, result.paths.sysvol,
+ result.paths.root_uid, result.paths.wheel_gid,
+ security.dom_sid(result.domainsid), result.names.dnsdomain,
+ result.names.domaindn, result.lp, use_ntvfs)
# FIXME: import_registry(registry.Registry(), samba3.get_registry())
# FIXME: shares