def err_missing_dn_GUID_component(self, dn, attrname, val, dsdb_dn, errstr):
"""handle a missing GUID extended DN component"""
self.report("ERROR: %s component for %s in object %s - %s" % (errstr, attrname, dn, val))
- controls=["extended_dn:1:1", "show_recycled:1"]
+ controls = ["extended_dn:1:1", "show_recycled:1"]
try:
res = self.samdb.search(base=str(dsdb_dn.dn), scope=ldb.SCOPE_BASE,
attrs=[], controls=controls)
def err_incorrect_binary_dn(self, dn, attrname, val, dsdb_dn, errstr):
"""handle an incorrect binary DN component"""
self.report("ERROR: %s binary component for %s in object %s - %s" % (errstr, attrname, dn, val))
- controls=["extended_dn:1:1", "show_recycled:1"]
+ controls = ["extended_dn:1:1", "show_recycled:1"]
if not self.confirm_all('Change DN to %s?' % str(dsdb_dn), 'fix_all_binary_dn'):
self.report("Not fixing %s" % errstr)
dsdb_dn.binary = "%08X" % int(res[0]['instanceType'][0])
if str(dsdb_dn) != val:
- error_count +=1
+ error_count += 1
self.err_incorrect_binary_dn(obj.dn, attrname, val, dsdb_dn, "incorrect instanceType part of Binary DN")
continue
if len(set_attrs_from_md) < len(list_attid_from_md) \
or len(wrong_attids) > 0 \
or sorted(list_attid_from_md) != list_attid_from_md:
- error_count +=1
+ error_count += 1
self.err_replmetadata_incorrect_attid(dn, attrname, obj[attrname], wrong_attids)
else:
return ndr_pack(sec)
def get_empty_descriptor(domain_sid, name_map={}):
- sddl= ""
+ sddl = ""
return sddl2binary(sddl, domain_sid, name_map)
# "get_schema_descriptor" is located in "schema.py"
return sddl2binary(sddl, domain_sid, name_map)
def get_domain_descriptor(domain_sid, name_map={}):
- sddl= "O:BAG:BAD:AI(OA;CIIO;RP;4c164200-20c0-11d0-a768-00aa006e0529;4828cc14-1437-45bc-9b07-ad6f015e5f28;RU)" \
+ sddl = "O:BAG:BAD:AI(OA;CIIO;RP;4c164200-20c0-11d0-a768-00aa006e0529;4828cc14-1437-45bc-9b07-ad6f015e5f28;RU)" \
"(OA;CIIO;RP;4c164200-20c0-11d0-a768-00aa006e0529;bf967aba-0de6-11d0-a285-00aa003049e2;RU)" \
"(OA;CIIO;RP;5f202010-79a5-11d0-9020-00c04fc2d4cf;4828cc14-1437-45bc-9b07-ad6f015e5f28;RU)" \
"(OA;CIIO;RP;5f202010-79a5-11d0-9020-00c04fc2d4cf;bf967aba-0de6-11d0-a285-00aa003049e2;RU)" \
# then we return that boolean at the end.
inf_conf = ConfigParser()
- inf_conf.optionxform=str
+ inf_conf.optionxform = str
try:
inf_conf.readfp(StringIO(policy))
except:
def _get_nameserver_ip(self):
"""Grab the nameserver IP address from /etc/resolv.conf."""
from os import path
- RESOLV_CONF="/etc/resolv.conf"
+ RESOLV_CONF = "/etc/resolv.conf"
if not path.isfile(RESOLV_CONF):
self.logger.warning("Failed to locate %s" % RESOLV_CONF)
# Directly on the base DN
m = ldb.Message()
m.dn = ldb.Dn(samdb, domain_dn)
- m["msDS-Behavior-Version"]= ldb.MessageElement(
+ m["msDS-Behavior-Version"] = ldb.MessageElement(
str(new_level_domain), ldb.FLAG_MOD_REPLACE,
"msDS-Behavior-Version")
samdb.modify(m)
m = ldb.Message()
m.dn = ldb.Dn(samdb, "CN=" + lp.get("workgroup")
+ ",CN=Partitions,%s" % samdb.get_config_basedn())
- m["msDS-Behavior-Version"]= ldb.MessageElement(
+ m["msDS-Behavior-Version"] = ldb.MessageElement(
str(new_level_domain), ldb.FLAG_MOD_REPLACE,
"msDS-Behavior-Version")
try:
m = ldb.Message()
m.dn = ldb.Dn(samdb, "CN=Partitions,%s" % samdb.get_config_basedn())
- m["msDS-Behavior-Version"]= ldb.MessageElement(
+ m["msDS-Behavior-Version"] = ldb.MessageElement(
str(new_level_forest), ldb.FLAG_MOD_REPLACE,
"msDS-Behavior-Version")
samdb.modify(m)
server_type_string))
self.remote_server = remote_info.pdc_dns_name
- self.remote_binding_string="ncacn_np:%s[%s]" % (self.remote_server, remote_binding_options)
+ self.remote_binding_string = "ncacn_np:%s[%s]" % (self.remote_server, remote_binding_options)
self.remote_creds = remote_creds
return self.remote_server
print("Defunct object %s doesn't exist, skipping" % self.dn)
return True
elif self.unknown_oid is not None:
- print("Skipping unknown OID %s for object %s" %(self.unknown_oid, self.dn))
+ print("Skipping unknown OID %s for object %s" % (self.unknown_oid, self.dn))
return True
return False
#save new options
m = ldb.Message()
m.dn = ldb.Dn(self.samdb, ntds_dn)
- m["options"]= ldb.MessageElement(str(dsa_opts), ldb.FLAG_MOD_REPLACE, "options")
+ m["options"] = ldb.MessageElement(str(dsa_opts), ldb.FLAG_MOD_REPLACE, "options")
self.samdb.modify(m)
# print out new DSA options
cur_opts = [x for x in self.option_map if self.option_map[x] & dsa_opts]
m = ldb.Message()
m.dn = ldb.Dn(samdb, role_object)
- m["fSMORoleOwner"]= ldb.MessageElement(new_owner,
+ m["fSMORoleOwner"] = ldb.MessageElement(new_owner,
ldb.FLAG_MOD_ADD,
"fSMORoleOwner")
try:
m.dn = ldb.Dn(samdb, "")
if role == "rid":
master_owner = get_fsmo_roleowner(samdb, rid_dn, role)
- m["becomeRidMaster"]= ldb.MessageElement(
+ m["becomeRidMaster"] = ldb.MessageElement(
"1", ldb.FLAG_MOD_REPLACE,
"becomeRidMaster")
elif role == "pdc":
scope=ldb.SCOPE_BASE, attrs=["objectSid"])
assert len(res) == 1
sid = res[0]["objectSid"][0]
- m["becomePdc"]= ldb.MessageElement(
+ m["becomePdc"] = ldb.MessageElement(
sid, ldb.FLAG_MOD_REPLACE,
"becomePdc")
elif role == "naming":
master_owner = get_fsmo_roleowner(samdb, naming_dn, role)
- m["becomeDomainMaster"]= ldb.MessageElement(
+ m["becomeDomainMaster"] = ldb.MessageElement(
"1", ldb.FLAG_MOD_REPLACE,
"becomeDomainMaster")
elif role == "infrastructure":
master_owner = get_fsmo_roleowner(samdb, infrastructure_dn, role)
- m["becomeInfrastructureMaster"]= ldb.MessageElement(
+ m["becomeInfrastructureMaster"] = ldb.MessageElement(
"1", ldb.FLAG_MOD_REPLACE,
"becomeInfrastructureMaster")
elif role == "schema":
master_owner = get_fsmo_roleowner(samdb, schema_dn, role)
- m["becomeSchemaMaster"]= ldb.MessageElement(
+ m["becomeSchemaMaster"] = ldb.MessageElement(
"1", ldb.FLAG_MOD_REPLACE,
"becomeSchemaMaster")
else:
if force is not None or seize == True:
self.message("Seizing %s FSMO role..." % role)
- m["fSMORoleOwner"]= ldb.MessageElement(
+ m["fSMORoleOwner"] = ldb.MessageElement(
serviceName, ldb.FLAG_MOD_REPLACE,
"fSMORoleOwner")
if force is not None or seize == True:
self.message("Seizing %s FSMO role..." % role)
- m["fSMORoleOwner"]= ldb.MessageElement(
+ m["fSMORoleOwner"] = ldb.MessageElement(
serviceName, ldb.FLAG_MOD_REPLACE,
"fSMORoleOwner")
try:
continue
try:
- sd_flags=security.SECINFO_OWNER|security.SECINFO_GROUP|security.SECINFO_DACL
+ sd_flags = security.SECINFO_OWNER|security.SECINFO_GROUP|security.SECINFO_DACL
gmsg = self.samdb.search(base=g['dn'], scope=ldb.SCOPE_BASE,
attrs=['name', 'displayName', 'flags',
'nTSecurityDescriptor'],
m['a05'] = ldb.MessageElement("0", ldb.FLAG_MOD_REPLACE, "versionNumber")
m['a07'] = ldb.MessageElement("2", ldb.FLAG_MOD_REPLACE, "gpcFunctionalityVersion")
m['a04'] = ldb.MessageElement("0", ldb.FLAG_MOD_REPLACE, "flags")
- controls=["permissive_modify:0"]
+ controls = ["permissive_modify:0"]
self.samdb.modify(m, controls=controls)
except Exception:
self.samdb.transaction_cancel()
# It does not matter if they are in the same DC, in two DC in one domain or in two
# different domains.
if self.search_scope != SCOPE_BASE:
- title= "\n* DNs found only in %s:" % self.con.host
+ title = "\n* DNs found only in %s:" % self.con.host
for x in self.dn_list:
if not x.upper() in [q.upper() for q in other.dn_list]:
if title and not self.skip_missing_dn:
self.dn_list[self.dn_list.index(x)] = ""
self.dn_list = [x for x in self.dn_list if x]
#
- title= "\n* DNs found only in %s:" % other.con.host
+ title = "\n* DNs found only in %s:" % other.con.host
for x in other.dn_list:
if not x.upper() in [q.upper() for q in self.dn_list]:
if title and not self.skip_missing_dn:
s3conf.set("passdb backend", "samba_dsdb:%s" % samdb.url)
LA_sid = security.dom_sid(str(domain_sid)
- +"-"+str(security.DOMAIN_RID_ADMINISTRATOR))
+ + "-"+str(security.DOMAIN_RID_ADMINISTRATOR))
BA_sid = security.dom_sid(security.SID_BUILTIN_ADMINISTRATORS)
s4_passdb = passdb.PDB(s3conf.get("passdb backend"))
res = sam.search(
expression="samaccountname=%s" % ldb.binary_encode(cleaneduser),
scope=ldb.SCOPE_SUBTREE, attrs=["servicePrincipalName"])
- if len(res) >0:
+ if len(res) > 0:
spns = res[0].get("servicePrincipalName")
found = False
flag = ldb.FLAG_MOD_ADD
res = sam.search(
expression="samaccountname=%s" % ldb.binary_encode(cleaneduser),
scope=ldb.SCOPE_SUBTREE, attrs=["servicePrincipalName"])
- if len(res) >0:
+ if len(res) > 0:
res[0].dn
msg = ldb.Message()
spns = res[0].get("servicePrincipalName")
expression="servicePrincipalName=%s" % ldb.binary_encode(name),
scope=ldb.SCOPE_SUBTREE,
attrs=["servicePrincipalName", "samAccountName"])
- if len(res) >0:
+ if len(res) > 0:
result = None
if user is not None:
(cleaneduser, realm, domain) = _get_user_realm_domain(user)
"and no specific user was specified, list of users"
" with this spn:%s" % (name, listUser))
else:
- result=res[0]
+ result = res[0]
msg = ldb.Message()
domain, dns_domain):
if i == 1:
user = account_name
- realm= domain
+ realm = domain
elif i == 2:
user = account_name.lower()
realm = domain.lower()
del dirsync_obj[a]
dirsync_obj["# %s::" % a] = ["REDACTED SECRET ATTRIBUTE"]
dirsync_ldif = self.samdb.write_ldif(dirsync_obj, ldb.CHANGETYPE_NONE)
- log_msg("# Dirsync[%d] %s %s\n%s" %(idx, guid, sid, dirsync_ldif))
+ log_msg("# Dirsync[%d] %s %s\n%s" % (idx, guid, sid, dirsync_ldif))
obj = self.get_account_attributes(self.samdb,
username="%s" % sid,
basedn="<GUID=%s>" % guid,
line = line[2:]
plus_lines.append(line)
- user_ldif="dn: %s\n" % user_dn
+ user_ldif = "dn: %s\n" % user_dn
user_ldif += "changetype: modify\n"
for line in minus_lines:
attr, val = line.split(':', 1)
- search_attr="%s:" % attr
+ search_attr = "%s:" % attr
if not re.search(r'^' + search_attr, str(plus_lines)):
user_ldif += "delete: %s\n" % attr
user_ldif += "%s: %s\n" % (attr, val)
for line in plus_lines:
attr, val = line.split(':', 1)
- search_attr="%s:" % attr
+ search_attr = "%s:" % attr
if re.search(r'^' + search_attr, str(minus_lines)):
user_ldif += "replace: %s\n" % attr
user_ldif += "%s: %s\n" % (attr, val)
str(current[0]["defaultNamingContext"][0].decode('utf8')),
paths.smbconf, basedn)))
- names.domaindn=current[0]["defaultNamingContext"][0]
- names.rootdn=current[0]["rootDomainNamingContext"][0]
- names.ncs=current[0]["namingContexts"]
+ names.domaindn = current[0]["defaultNamingContext"][0]
+ names.rootdn = current[0]["rootDomainNamingContext"][0]
+ names.ncs = current[0]["namingContexts"]
names.dnsforestdn = None
names.dnsdomaindn = None
SYSVOL_ACL = "O:LAG:BAD:P(A;OICI;0x001f01ff;;;BA)(A;OICI;0x001200a9;;;SO)(A;OICI;0x001f01ff;;;SY)(A;OICI;0x001200a9;;;AU)"
POLICIES_ACL = "O:LAG:BAD:P(A;OICI;0x001f01ff;;;BA)(A;OICI;0x001200a9;;;SO)(A;OICI;0x001f01ff;;;SY)(A;OICI;0x001200a9;;;AU)(A;OICI;0x001301bf;;;PA)"
-SYSVOL_SERVICE="sysvol"
+SYSVOL_SERVICE = "sysvol"
def set_dir_acl(path, acl, lp, domsid, use_ntvfs, passdb, service=SYSVOL_SERVICE):
setntacl(lp, path, acl, domsid, use_ntvfs=use_ntvfs, skip_invalid_chown=True, passdb=passdb, service=service)
ips = []
while "." in entries[i]:
ips.append(entries[i])
- i+=1
+ i += 1
nb_flags = int(entries[i][:-1], 16)
assert not name in self.entries, "Name %s exists twice" % name
self.entries[name] = (ttl, ips, nb_flags)
o.originating_usn = seq
o.local_usn = seq
- if not found and addifnotexist and len(ctr.array) >0:
+ if not found and addifnotexist and len(ctr.array) > 0:
o2 = drsblobs.replPropertyMetaData1()
o2.attid = 589914
att_oid = self.get_oid_from_attid(o2.attid)
attribute="lDAPDisplayName",
scope=SCOPE_SUBTREE)
if target is not None:
- attributes[str(res[i]["lDAPDisplayName"])]=str(target)
+ attributes[str(res[i]["lDAPDisplayName"])] = str(target)
return attributes
class SkipTest(Exception):
"""Test skipped."""
-HEXDUMP_FILTER=bytearray([x if ((len(repr(chr(x)))==3) and (x < 127)) else ord('.') for x in range(256)])
+HEXDUMP_FILTER = bytearray([x if ((len(repr(chr(x))) == 3) and (x < 127)) else ord('.') for x in range(256)])
class TestCase(unittest.TestCase):
"""A Samba test case."""
# objects can be slow to replicate out. So the OU created by a previous
# testenv may still exist at the point that tests start on another testenv.
rand = randint(1, 10000000)
- dn = ldb.Dn(samdb, "OU=%s%d,%s" %(name, rand, samdb.get_default_basedn()))
+ dn = ldb.Dn(samdb, "OU=%s%d,%s" % (name, rand, samdb.get_default_basedn()))
samdb.add({"dn": dn, "objectclass": "organizationalUnit"})
return dn
self.creds.get_secure_channel_type())
def test_get_nt_hash(self):
- password="geheim"
- hex_nthash="c2ae1fe6e648846352453e816f2aeb93"
+ password = "geheim"
+ hex_nthash = "c2ae1fe6e648846352453e816f2aeb93"
self.creds.set_password(password)
self.assertEqual(password, self.creds.get_password())
self.assertEqual(binascii.a2b_hex(hex_nthash),
self.creds.get_nt_hash())
def test_get_ntlm_response(self):
- password="SecREt01"
- hex_challenge="0123456789abcdef"
- hex_nthash="cd06ca7c7e10c99b1d33b7485a2ed808"
- hex_session_key="3f373ea8e4af954f14faa506f8eebdc4"
- hex_ntlm_response="25a98c1c31e81847466b29b2df4680f39958fb8c213a9cc6"
+ password = "SecREt01"
+ hex_challenge = "0123456789abcdef"
+ hex_nthash = "cd06ca7c7e10c99b1d33b7485a2ed808"
+ hex_session_key = "3f373ea8e4af954f14faa506f8eebdc4"
+ hex_ntlm_response = "25a98c1c31e81847466b29b2df4680f39958fb8c213a9cc6"
self.creds.set_username("fred")
self.creds.set_domain("nurk")
self.creds.set_password(password)
def test_get_nt_hash_string(self):
self.creds.set_password_will_be_nt_hash(True)
- hex_nthash="c2ae1fe6e648846352453e816f2aeb93"
+ hex_nthash = "c2ae1fe6e648846352453e816f2aeb93"
self.creds.set_password(hex_nthash)
self.assertEqual(None, self.creds.get_password())
self.assertEqual(binascii.a2b_hex(hex_nthash),
self.assertEqual(creds.authentication_requested(), False)
def test_parse_file_1(self):
- realm="realm.example.com"
- domain="dom"
- password="pass"
- username="user"
+ realm = "realm.example.com"
+ domain = "dom"
+ password = "pass"
+ username = "user"
passwd_file_name = os.path.join(self.tempdir, "parse_file")
passwd_file_fd = open(passwd_file_name, self.open_mode)
os.unlink(passwd_file_name)
def test_parse_file_2(self):
- realm="realm.example.com"
- domain="dom"
- password="pass"
- username="user"
+ realm = "realm.example.com"
+ domain = "dom"
+ password = "pass"
+ username = "user"
passwd_file_name = os.path.join(self.tempdir, "parse_file")
passwd_file_fd = open(passwd_file_name, self.open_mode)
os.unlink(passwd_file_name)
def test_parse_file_3(self):
- realm="realm.example.com"
- domain="domain"
- password="password"
- username="username"
+ realm = "realm.example.com"
+ domain = "domain"
+ password = "password"
+ username = "username"
- userdom="userdom"
+ userdom = "userdom"
passwd_file_name = os.path.join(self.tempdir, "parse_file")
passwd_file_fd = open(passwd_file_name, self.open_mode)
os.unlink(passwd_file_name)
def test_parse_file_4(self):
- realm="realm.example.com"
- domain="domain"
- password="password"
- username="username"
+ realm = "realm.example.com"
+ domain = "domain"
+ password = "password"
+ username = "username"
- userdom="userdom"
+ userdom = "userdom"
passwd_file_name = os.path.join(self.tempdir, "parse_file")
passwd_file_fd = open(passwd_file_name, self.open_mode)
os.unlink(passwd_file_name)
def test_parse_file_5(self):
- realm="realm.example.com"
- domain="domain"
- password="password"
- username="username"
+ realm = "realm.example.com"
+ domain = "domain"
+ password = "password"
+ username = "username"
- userdom="userdom"
+ userdom = "userdom"
passwd_file_name = os.path.join(self.tempdir, "parse_file")
passwd_file_fd = open(passwd_file_name, self.open_mode)
auth_context_id=auth_context["auth_context_id"],
auth_blob=zero_sig)
else:
- auth_info=""
+ auth_info = ""
pfc_flags = samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST
pfc_flags |= samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST
a.auth_type = auth_type
a.auth_level = auth_level
a.auth_pad_length = auth_pad_length
- a.auth_context_id= auth_context_id
+ a.auth_context_id = auth_context_id
a.credentials = auth_blob
ai = ndr_pack(a)
super(NtaclsBackupRestoreTests, self).setUp()
self.server = os.environ["SERVER"] # addc
- samdb_url='ldap://' + self.server
+ samdb_url = 'ldap://' + self.server
self.service = 'test1' # service/share to test
# root path for service
# Add a set of split records
self.ldb.add_ldif("""
-dn: """+ self.samba4.dn("cn=Domain Users") + """
+dn: """ + self.samba4.dn("cn=Domain Users") + """
objectClass: group
cn: Domain Users
objectSid: S-1-5-21-4231626423-2410014848-2360679739-513
# Add a set of split records
self.ldb.add_ldif("""
-dn: """+ self.samba4.dn("cn=X") + """
+dn: """ + self.samba4.dn("cn=X") + """
objectClass: user
cn: X
codePage: x
def randomName(self, count=8):
"""Create a random name, cap letters and numbers, and always starting with a letter"""
name = random.choice(string.ascii_uppercase)
- name += ''.join(random.choice(string.ascii_uppercase + string.ascii_lowercase+ string.digits) for x in range(count - 1))
+ name += ''.join(random.choice(string.ascii_uppercase + string.ascii_lowercase + string.digits) for x in range(count - 1))
return name
def randomPass(self, count=16):
name = random.choice(string.ascii_uppercase)
name += random.choice(string.digits)
name += random.choice(string.ascii_lowercase)
- name += ''.join(random.choice(string.ascii_uppercase + string.ascii_lowercase+ string.digits) for x in range(count - 3))
+ name += ''.join(random.choice(string.ascii_uppercase + string.ascii_lowercase + string.digits) for x in range(count - 3))
return name
def randomXid(self):
"virtualCryptSHA512;rounds=5129",
True)
self.assertFalse(sha256 == _get_attribute(out, "virtualCryptSHA256"))
- self.assertFalse(sha512 ==_get_attribute(out, "virtualCryptSHA512"))
+ self.assertFalse(sha512 == _get_attribute(out, "virtualCryptSHA512"))
# The returned hashes should specify the correct number of rounds
self.assertTrue(sha256.startswith("{CRYPT}$5$rounds=2561"))
def test_shebang_lines(self):
"""Check that files with shebang lines and only those are executable."""
files_with_shebang = {}
- files_without_shebang= {}
+ files_without_shebang = {}
for fname, line_no, line in self._iter_source_files_lines():
if line_no >= 1:
continue
cont = False
continue
if usn < int(range[idx]):
- if idx %2 == 1:
+ if idx % 2 == 1:
ok = True
cont = False
if usn == int(range[idx]):
return ret
else:
if i == minimum-1:
- assert len1!=len2,"PB PB PB" + " ".join(tab1)+" / " + " ".join(tab2)
+ assert len1 != len2,"PB PB PB" + " ".join(tab1)+" / " + " ".join(tab2)
if len1 > len2:
return 1
else:
for o in obj.array:
# like a timestamp but with the resolution of 1 minute
- minutestamp =_glue.nttime2unix(o.originating_change_time) // 60
+ minutestamp = _glue.nttime2unix(o.originating_change_time) // 60
hash_ts = hash_id.get(str(o.originating_invocation_id))
if hash_ts is None:
def find_git_root():
'''get to the top of the git repo'''
- p=os.getcwd()
+ p = os.getcwd()
while p != '/':
if os.path.isdir(os.path.join(p, ".git")):
return p
def find_git_root():
'''get to the top of the git repo'''
- p=os.getcwd()
+ p = os.getcwd()
while p != '/':
if os.path.isdir(os.path.join(p, ".git")):
return p
if temp is None:
raise Exception(parameter['name'] + " has an invalid param type " + parameter['type'])
output_string += temp
- f.write(output_string + "(" + parameter['function'] +", " + parameter['function'] + ')\n')
+ f.write(output_string + "(" + parameter['function'] + ", " + parameter['function'] + ')\n')
finally:
f.close()
remote_error = subunit.RemoteError(reason.decode("utf-8"))
if not terminated:
- statistics['TESTS_ERROR']+=1
+ statistics['TESTS_ERROR'] += 1
msg_ops.addError(subunit.RemotedTestCase(testname), subunit.RemoteError(u"reason (%s) interrupted" % result))
return 1
else:
try:
test = open_tests.pop(testname)
except KeyError:
- statistics['TESTS_ERROR']+=1
+ statistics['TESTS_ERROR'] += 1
exitcode = 1
msg_ops.addError(subunit.RemotedTestCase(testname), subunit.RemoteError(u"Test was never started"))
else:
- statistics['TESTS_EXPECTED_OK']+=1
+ statistics['TESTS_EXPECTED_OK'] += 1
msg_ops.addSuccess(test)
elif result in ("xfail", "knownfail"):
try:
test = open_tests.pop(testname)
except KeyError:
- statistics['TESTS_ERROR']+=1
+ statistics['TESTS_ERROR'] += 1
exitcode = 1
msg_ops.addError(subunit.RemotedTestCase(testname), subunit.RemoteError(u"Test was never started"))
else:
- statistics['TESTS_EXPECTED_FAIL']+=1
+ statistics['TESTS_EXPECTED_FAIL'] += 1
msg_ops.addExpectedFailure(test, remote_error)
elif result in ("uxsuccess", ):
try:
test = open_tests.pop(testname)
except KeyError:
- statistics['TESTS_ERROR']+=1
+ statistics['TESTS_ERROR'] += 1
exitcode = 1
msg_ops.addError(subunit.RemotedTestCase(testname), subunit.RemoteError(u"Test was never started"))
else:
- statistics['TESTS_UNEXPECTED_OK']+=1
+ statistics['TESTS_UNEXPECTED_OK'] += 1
msg_ops.addUnexpectedSuccess(test)
exitcode = 1
elif result in ("failure", "fail"):
try:
test = open_tests.pop(testname)
except KeyError:
- statistics['TESTS_ERROR']+=1
+ statistics['TESTS_ERROR'] += 1
exitcode = 1
msg_ops.addError(subunit.RemotedTestCase(testname), subunit.RemoteError(u"Test was never started"))
else:
- statistics['TESTS_UNEXPECTED_FAIL']+=1
+ statistics['TESTS_UNEXPECTED_FAIL'] += 1
exitcode = 1
msg_ops.addFailure(test, remote_error)
elif result == "skip":
- statistics['TESTS_SKIP']+=1
+ statistics['TESTS_SKIP'] += 1
# Allow tests to be skipped without prior announcement of test
try:
test = open_tests.pop(testname)
test = subunit.RemotedTestCase(testname)
msg_ops.addSkip(test, reason)
elif result == "error":
- statistics['TESTS_ERROR']+=1
+ statistics['TESTS_ERROR'] += 1
exitcode = 1
try:
test = open_tests.pop(testname)
while open_tests:
test = subunit.RemotedTestCase(open_tests.popitem()[1])
msg_ops.addError(test, subunit.RemoteError(u"was started but never finished!"))
- statistics['TESTS_ERROR']+=1
+ statistics['TESTS_ERROR'] += 1
exitcode = 1
return exitcode
if self.output is None:
sys.stdout.write(msg)
else:
- self.output+=msg
+ self.output += msg
def startTest(self, test):
self.seen_output = True
def addError(self, test, err=None):
test = self._add_prefix(test)
- self.error_added+=1
- self.total_error+=1
+ self.error_added += 1
+ self.total_error += 1
self._ops.addError(test, err)
self.output = None
if self.fail_immediately:
def addUnexpectedSuccess(self, test):
test = self._add_prefix(test)
- self.uxsuccess_added+=1
- self.total_uxsuccess+=1
+ self.uxsuccess_added += 1
+ self.total_uxsuccess += 1
self._ops.addUnexpectedSuccess(test)
if self.output:
self._ops.output_msg(self.output)
if xfail_reason is None:
xfail_reason = find_in_list(self.flapping, test.id())
if xfail_reason is not None:
- self.xfail_added+=1
- self.total_xfail+=1
+ self.xfail_added += 1
+ self.total_xfail += 1
self._ops.addExpectedFailure(test, err)
else:
- self.fail_added+=1
- self.total_fail+=1
+ self.fail_added += 1
+ self.total_fail += 1
self._ops.addFailure(test, err)
if self.output:
self._ops.output_msg(self.output)
print("no output for name[%s]" % name)
if result in ("success", "xfail"):
- self.suites_ok+=1
+ self.suites_ok += 1
else:
self.output_msg("ERROR: Testsuite[%s]\n" % name)
if reason is not None:
def skip_testsuite(self, name, reason="UNKNOWN"):
self.skips.setdefault(reason, []).append(name)
if self.totalsuites:
- self.totalsuites-=1
+ self.totalsuites -= 1
have_man_pages_support = ("XSLTPROC_MANPAGES" in config_hash)
with_pam = ("WITH_PAM" in config_hash)
-pam_wrapper_so_path=config_hash["LIBPAM_WRAPPER_SO_PATH"]
+pam_wrapper_so_path = config_hash["LIBPAM_WRAPPER_SO_PATH"]
planpythontestsuite("none", "samba.tests.source", py3_compatible=True)
if have_man_pages_support:
'''Check for default charsets for Samba3
'''
if conf.CHECK_ICONV(define='HAVE_NATIVE_ICONV'):
- default_dos_charset=False
- default_unix_charset=False
+ default_dos_charset = False
+ default_unix_charset = False
# check for default dos charset name
for charset in ['CP850', 'IBM850']:
if conf.CHECK_CHARSET_EXISTS(charset, headers='iconv.h'):
- default_dos_charset=charset
+ default_dos_charset = charset
break
# check for default unix charset name
for charset in ['UTF-8', 'UTF8']:
if conf.CHECK_CHARSET_EXISTS(charset, headers='iconv.h'):
- default_unix_charset=charset
+ default_unix_charset = charset
break
# At this point, we have a libiconv candidate. We know that
# deal, since we can't guarantee that the results we get now will
# match the results we get at runtime anyway.
if crossbuild:
- default_dos_charset="CP850"
- default_unix_charset="UTF-8"
+ default_dos_charset = "CP850"
+ default_unix_charset = "UTF-8"
# TODO: this used to warn about the set charset on cross builds
if default_dos_charset is False or default_unix_charset is False:
#print domain
#print domsid
-sids=[domsid + '-512', 'S-1-5-32-545', domsid + '-513', 'S-1-1-0', 'S-1-3-1', 'S-1-5-1']
+sids = [domsid + '-512', 'S-1-5-32-545', domsid + '-513', 'S-1-1-0', 'S-1-3-1', 'S-1-5-1']
flush_cache(sids=sids)
sids2xids = subprocess.Popen([wbinfo, '--sids-to-unix-ids=' + ','.join(sids)],
stdout=subprocess.PIPE).communicate()[0].strip()
-gids=[]
-uids=[]
+gids = []
+uids = []
idtypes = []
for line in sids2xids.split('\n'):
plantestsuite("samba3.blackbox.smbclient_auth.plain (%s) %s" % (env, options), env, [os.path.join(samba3srcdir, "script/tests/test_smbclient_auth.sh"), '$SERVER', '$SERVER_IP', '$DC_USERNAME', '$DC_PASSWORD', smbclient3, configuration, options])
plantestsuite("samba3.blackbox.smbclient_auth.plain (%s) %s member creds" % (env, options), env, [os.path.join(samba3srcdir, "script/tests/test_smbclient_auth.sh"), '$SERVER', '$SERVER_IP', '$SERVER/$USERNAME', '$PASSWORD', smbclient3, configuration, options])
-env="ad_dc"
+env = "ad_dc"
plantestsuite("samba3.blackbox.smbspool", env, [os.path.join(samba3srcdir, "script/tests/test_smbspool.sh"), '$SERVER', '$SERVER_IP', '$DC_USERNAME', '$DC_PASSWORD', env])
for env in ["ad_member:local", "nt4_dc:local"]:
plantestsuite("samba3.blackbox.smbpasswd", env, [os.path.join(samba3srcdir, "script/tests/test_smbpasswd.sh"), '$SERVER', '$SERVER_IP', '$DC_USERNAME', '$DC_PASSWORD'])
-env="nt4_dc"
+env = "nt4_dc"
plantestsuite("samba3.blackbox.smbclient_auth.plain (%s) ipv6" % env, env, [os.path.join(samba3srcdir, "script/tests/test_smbclient_auth.sh"), '$SERVER', '$SERVER_IPV6', '$SERVER/$USERNAME', '$PASSWORD', smbclient3, configuration])
for env in ["nt4_member", "ad_member"]:
vfs = ["vfs.fruit", "vfs.acl_xattr", "vfs.fruit_netatalk", "vfs.fruit_file_id", "vfs.fruit_timemachine"]
-tests= base + raw + smb2 + rpc + unix + local + rap + nbt + libsmbclient + idmap + vfs
+tests = base + raw + smb2 + rpc + unix + local + rap + nbt + libsmbclient + idmap + vfs
for t in tests:
if t == "base.delaywrite" or t == "base.deny1" or t == "base.deny2":
expression=("ldapDisplayName=%s" % oc),
attrs=["possibleInferiors"])
- poss=[]
+ poss = []
if len(res) == 0 or res[0].get("possibleInferiors") is None:
return poss
for item in res[0]["possibleInferiors"]:
def get_object_classes(db):
"""return a list of all object classes"""
- list=[]
+ list = []
for item in classinfo:
list.append(item)
return list
self.testuser5 = "to_be_undeleted5"
self.testuser6 = "to_be_undeleted6"
- self.new_dn_ou = "CN="+ self.testuser4 + "," + self.ou1 + self.base_dn
+ self.new_dn_ou = "CN=" + self.testuser4 + "," + self.ou1 + self.base_dn
# Create regular user
self.testuser1_dn = self.get_user_dn(self.testuser1)
def check_rdn(self, liveObj, delObj, rdnName):
print("Checking for correct rDN")
- rdn=liveObj[rdnName][0]
- rdn2=delObj[rdnName][0]
- name2=delObj["name"][0]
- dn_rdn=delObj.dn.get_rdn_value()
- guid=liveObj["objectGUID"][0]
+ rdn = liveObj[rdnName][0]
+ rdn2 = delObj[rdnName][0]
+ name2 = delObj["name"][0]
+ dn_rdn = delObj.dn.get_rdn_value()
+ guid = liveObj["objectGUID"][0]
self.assertEquals(rdn2, rdn + "\nDEL:" + self.GUID_string(guid))
self.assertEquals(name2, rdn + "\nDEL:" + self.GUID_string(guid))
self.assertEquals(name2, dn_rdn)
"objectClass": "server"})
self.objLive1 = self.search_dn(self.usr1)
- self.guid1=self.objLive1["objectGUID"][0]
+ self.guid1 = self.objLive1["objectGUID"][0]
self.objLive2 = self.search_dn(self.usr2)
- self.guid2=self.objLive2["objectGUID"][0]
+ self.guid2 = self.objLive2["objectGUID"][0]
self.objLive3 = self.search_dn(self.grp1)
- self.guid3=self.objLive3["objectGUID"][0]
+ self.guid3 = self.objLive3["objectGUID"][0]
self.objLive4 = self.search_dn(self.sit1)
- self.guid4=self.objLive4["objectGUID"][0]
+ self.guid4 = self.objLive4["objectGUID"][0]
self.objLive5 = self.search_dn(self.ss1)
- self.guid5=self.objLive5["objectGUID"][0]
+ self.guid5 = self.objLive5["objectGUID"][0]
self.objLive6 = self.search_dn(self.srv1)
- self.guid6=self.objLive6["objectGUID"][0]
+ self.guid6 = self.objLive6["objectGUID"][0]
self.objLive7 = self.search_dn(self.srv2)
- self.guid7=self.objLive7["objectGUID"][0]
+ self.guid7 = self.objLive7["objectGUID"][0]
self.deleted_objects_config_dn \
= self.ldb.get_wellknown_dn(self.ldb.get_config_basedn(),
attrs=["parentGUID"],
controls=["dirsync:1:0:1"])
self.assertEqual(len(res.msgs), 0)
- ouname="OU=testou,%s" % self.base_dn
+ ouname = "OU=testou,%s" % self.base_dn
self.ouname = ouname
self.ldb_admin.create_ou(ouname)
delta = Message()
control2 = str(":".join(ctl))
# Let's create an OU
- ouname="OU=testou2,%s" % self.base_dn
+ ouname = "OU=testou2,%s" % self.base_dn
self.ouname = ouname
self.ldb_admin.create_ou(ouname)
res = self.ldb_admin.search(self.base_dn,
def test_dirsync_deleted_items(self):
"""Check that dirsync returnd deleted objects too"""
# Let's create an OU
- ouname="OU=testou3,%s" % self.base_dn
+ ouname = "OU=testou3,%s" % self.base_dn
self.ouname = ouname
self.ldb_admin.create_ou(ouname)
res = self.ldb_admin.search(self.base_dn,
ctl = str(res.controls[0]).split(":")
cookie = ndr_unpack(drsblobs.ldapControlDirSyncCookie, base64.b64decode(str(ctl[4])))
cookie.blob.guid1 = misc.GUID("128a99bf-abcd-1234-abcd-1fb625e530db")
- controls=["dirsync:1:0:0:%s" % base64.b64encode(ndr_pack(cookie)).decode('utf8')]
+ controls = ["dirsync:1:0:0:%s" % base64.b64encode(ndr_pack(cookie)).decode('utf8')]
res = self.ldb_admin.search(self.base_dn,
expression="(&(objectClass=organizationalUnit)(!(isDeleted=*)))",
controls=controls)
"""Check that dirsync returnd deleted objects too"""
# Let's create an OU
self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass)
- ouname="OU=testou3,%s" % self.base_dn
+ ouname = "OU=testou3,%s" % self.base_dn
self.ouname = ouname
self.ldb_admin.create_ou(ouname)
homeDirectory: /home/posixuser
loginShell: /bin/bash
gecos: Posix User;;;
-description: A POSIX user"""% (self.base_dn))
+description: A POSIX user""" % (self.base_dn))
# Testing removing the posixAccount objectClass from an existing user"
self.ldb.modify_ldif("""dn: cn=posixuser,CN=Users,%s
changetype: modify
delete: objectClass
-objectClass: posixAccount"""% (self.base_dn))
+objectClass: posixAccount""" % (self.base_dn))
# Testing adding the posixAccount objectClass to an existing user"
self.ldb.modify_ldif("""dn: cn=posixuser,CN=Users,%s
changetype: modify
add: objectClass
-objectClass: posixAccount"""% (self.base_dn))
+objectClass: posixAccount""" % (self.base_dn))
delete_force(self.ldb, "cn=posixuser,cn=users," + self.base_dn)
delete_force(self.ldb, "cn=ldaptestuser,cn=users," + self.base_dn)
object_name = "obj" + time.strftime("%s", time.gmtime())
ldif = """
-dn: CN=%s,CN=Users,%s"""% (object_name, self.base_dn) + """
+dn: CN=%s,CN=Users,%s""" % (object_name, self.base_dn) + """
objectClass: organizationalPerson
objectClass: person
objectClass: """ + class_ldap_display_name + """
objectClass: top
cn: """ + object_name + """
instanceType: 4
-objectCategory: CN=%s,%s"""% (class_name, self.schema_dn) + """
-distinguishedName: CN=%s,CN=Users,%s"""% (object_name, self.base_dn) + """
+objectCategory: CN=%s,%s""" % (class_name, self.schema_dn) + """
+distinguishedName: CN=%s,CN=Users,%s""" % (object_name, self.base_dn) + """
name: """ + object_name + """
""" + attr_ldap_display_name + """: test
"""
def _get_object_ldif(self, object_name, class_name, class_ldap_display_name, attr_name, attr_value):
# add object with correct syntax
ldif = """
-dn: CN=%s,CN=Users,%s"""% (object_name, self.base_dn) + """
+dn: CN=%s,CN=Users,%s""" % (object_name, self.base_dn) + """
objectClass: organizationalPerson
objectClass: person
objectClass: """ + class_ldap_display_name + """
objectClass: top
cn: """ + object_name + """
instanceType: 4
-objectCategory: CN=%s,%s"""% (class_name, self.schema_dn) + """
-distinguishedName: CN=%s,CN=Users,%s"""% (object_name, self.base_dn) + """
+objectCategory: CN=%s,%s""" % (class_name, self.schema_dn) + """
+distinguishedName: CN=%s,CN=Users,%s""" % (object_name, self.base_dn) + """
name: """ + object_name + """
""" + attr_name + attr_value + """
"""
badPasswordTime = 0
logonCount = 0
lastLogon = 0
- lastLogonTimestamp=0
+ lastLogonTimestamp = 0
logoncount_relation = ''
lastlogon_relation = ''
badPasswordTime = 0
logonCount = 0
lastLogon = 0
- lastLogonTimestamp=0
+ lastLogonTimestamp = 0
logoncount_relation = ''
lastlogon_relation = ''
desc_sddl = self.sd_utils.get_sd_as_sddl(group_dn)
mod = mod.replace(";OI;", ";OIIOID;") # change it how it's gonna look like
self.assertTrue(mod in desc_sddl)
- self.sd_utils.modify_sd_on_dn(group_dn, "D:" +moded)
+ self.sd_utils.modify_sd_on_dn(group_dn, "D:" + moded)
desc_sddl = self.sd_utils.get_sd_as_sddl(group_dn)
self.assertTrue(moded in desc_sddl)
self.assertTrue(mod in desc_sddl)
See that only the owner has been changed.
"""
attrs = ["nTSecurityDescriptor", "replPropertyMetaData", "uSNChanged"]
- controls=["sd_flags:1:%d" % (SECINFO_DACL)]
+ controls = ["sd_flags:1:%d" % (SECINFO_DACL)]
ace = "(A;CI;CC;;;NU)"
sub_ace = "(A;CIID;CC;;;NU)"
sd_sddl = "O:BAG:BAD:P(A;CI;0x000f01ff;;;AU)"
rids = samr_conn.GetGroupsForUser(user_handle)
samr_dns = set()
for rid in rids.rids:
- self.assertEqual(rid.attributes, samr.SE_GROUP_MANDATORY | samr.SE_GROUP_ENABLED_BY_DEFAULT| samr.SE_GROUP_ENABLED)
+ self.assertEqual(rid.attributes, samr.SE_GROUP_MANDATORY | samr.SE_GROUP_ENABLED_BY_DEFAULT | samr.SE_GROUP_ENABLED)
sid = "%s-%d" % (domain_sid, rid.rid)
res = self.admin_ldb.search(base="<SID=%s>" % sid, scope=ldb.SCOPE_BASE,
attrs=[])
def test_crossRef_object(self):
"""Test if the urgent replication is activated when handling a crossRef object."""
self.ldb.add({
- "dn": "CN=test crossRef,CN=Partitions,CN=Configuration,"+ self.base_dn,
+ "dn": "CN=test crossRef,CN=Partitions,CN=Configuration," + self.base_dn,
"objectClass": "crossRef",
"cn": "test crossRef",
"dnsRoot": self.get_loadparm().get("realm").lower(),
self.sd_utils.dacl_add_ace("OU=test_computer_ou1," + self.base_dn, mod)
- computername=self.computernames[0]
+ computername = self.computernames[0]
sd = ldb.MessageElement((ndr_pack(self.sd_reference_modify)),
ldb.FLAG_MOD_ADD,
"nTSecurityDescriptor")
m = ldb.Message()
m.dn = res[0].dn
- m["description"]= ldb.MessageElement(
+ m["description"] = ldb.MessageElement(
("A description"), ldb.FLAG_MOD_REPLACE,
"description")
self.samdb.modify(m)
self.sd_utils.dacl_add_ace("OU=test_computer_ou1," + self.base_dn, mod)
- computername=self.computernames[0]
+ computername = self.computernames[0]
self.add_computer_ldap(computername)
res = self.admin_samdb.search("%s" % self.base_dn,
m = ldb.Message()
m.dn = res[0].dn
- m["description"]= ldb.MessageElement(
+ m["description"] = ldb.MessageElement(
("A description"), ldb.FLAG_MOD_REPLACE,
"description")
self.samdb.modify(m)
def test_admin_mod_uac(self):
- computername=self.computernames[0]
+ computername = self.computernames[0]
self.add_computer_ldap(computername, samdb=self.admin_samdb)
res = self.admin_samdb.search("%s" % self.base_dn,
scope=SCOPE_SUBTREE,
attrs=["userAccountControl"])
- self.assertEqual(int(res[0]["userAccountControl"][0]), UF_NORMAL_ACCOUNT| UF_ACCOUNTDISABLE)
+ self.assertEqual(int(res[0]["userAccountControl"][0]), UF_NORMAL_ACCOUNT | UF_ACCOUNTDISABLE)
def test_uac_bits_set(self):
self.sd_utils.dacl_add_ace("OU=test_computer_ou1," + self.base_dn, mod)
- computername=self.computernames[0]
+ computername = self.computernames[0]
self.add_computer_ldap(computername)
res = self.admin_samdb.search("%s" % self.base_dn,
m = ldb.Message()
m.dn = res[0].dn
- m["description"]= ldb.MessageElement(
+ m["description"] = ldb.MessageElement(
("A description"), ldb.FLAG_MOD_REPLACE,
"description")
self.samdb.modify(m)
self.sd_utils.dacl_add_ace("OU=test_computer_ou1," + self.base_dn, mod)
- computername=self.computernames[0]
+ computername = self.computernames[0]
self.add_computer_ldap(computername, others={"userAccountControl": [str(account_type)]})
res = self.admin_samdb.search("%s" % self.base_dn,
m = ldb.Message()
m.dn = res[0].dn
- m["description"]= ldb.MessageElement(
+ m["description"] = ldb.MessageElement(
("A description"), ldb.FLAG_MOD_REPLACE,
"description")
self.samdb.modify(m)
self.uac_bits_unrelated_modify_helper(UF_WORKSTATION_TRUST_ACCOUNT)
def test_uac_bits_add(self):
- computername=self.computernames[0]
+ computername = self.computernames[0]
user_sid = self.sd_utils.get_object_sid(self.unpriv_user_dn)
mod = "(OA;;CC;bf967a86-0de6-11d0-a285-00aa003049e2;;%s)" % str(user_sid)
self.fail("Unable to set userAccountControl bit 0x%08X on %s: %s" % (bit, computername, estr))
def test_primarygroupID_cc_add(self):
- computername=self.computernames[0]
+ computername = self.computernames[0]
user_sid = self.sd_utils.get_object_sid(self.unpriv_user_dn)
mod = "(OA;;CC;bf967a86-0de6-11d0-a285-00aa003049e2;;%s)" % str(user_sid)
def test_primarygroupID_priv_DC_modify(self):
- computername=self.computernames[0]
+ computername = self.computernames[0]
self.add_computer_ldap(computername,
others={"userAccountControl": [str(UF_SERVER_TRUST_ACCOUNT)]},
m = ldb.Message()
m.dn = ldb.Dn(self.admin_samdb, "<SID=%s-%d>" % (str(self.domain_sid),
security.DOMAIN_RID_USERS))
- m["member"]= ldb.MessageElement(
+ m["member"] = ldb.MessageElement(
[str(res[0].dn)], ldb.FLAG_MOD_ADD,
"member")
self.admin_samdb.modify(m)
m = ldb.Message()
m.dn = res[0].dn
- m["primaryGroupID"]= ldb.MessageElement(
+ m["primaryGroupID"] = ldb.MessageElement(
[str(security.DOMAIN_RID_USERS)], ldb.FLAG_MOD_REPLACE,
"primaryGroupID")
try:
self.assertEqual(enum, ldb.ERR_UNWILLING_TO_PERFORM)
def test_primarygroupID_priv_member_modify(self):
- computername=self.computernames[0]
+ computername = self.computernames[0]
self.add_computer_ldap(computername,
others={"userAccountControl": [str(UF_WORKSTATION_TRUST_ACCOUNT|UF_PARTIAL_SECRETS_ACCOUNT)]},
m = ldb.Message()
m.dn = ldb.Dn(self.admin_samdb, "<SID=%s-%d>" % (str(self.domain_sid),
security.DOMAIN_RID_USERS))
- m["member"]= ldb.MessageElement(
+ m["member"] = ldb.MessageElement(
[str(res[0].dn)], ldb.FLAG_MOD_ADD,
"member")
self.admin_samdb.modify(m)
m = ldb.Message()
m.dn = res[0].dn
- m["primaryGroupID"]= ldb.MessageElement(
+ m["primaryGroupID"] = ldb.MessageElement(
[str(security.DOMAIN_RID_USERS)], ldb.FLAG_MOD_REPLACE,
"primaryGroupID")
try:
def test_primarygroupID_priv_user_modify(self):
- computername=self.computernames[0]
+ computername = self.computernames[0]
self.add_computer_ldap(computername,
others={"userAccountControl": [str(UF_WORKSTATION_TRUST_ACCOUNT)]},
m = ldb.Message()
m.dn = ldb.Dn(self.admin_samdb, "<SID=%s-%d>" % (str(self.domain_sid),
security.DOMAIN_RID_ADMINS))
- m["member"]= ldb.MessageElement(
+ m["member"] = ldb.MessageElement(
[str(res[0].dn)], ldb.FLAG_MOD_ADD,
"member")
self.admin_samdb.modify(m)
m = ldb.Message()
m.dn = res[0].dn
- m["primaryGroupID"]= ldb.MessageElement(
+ m["primaryGroupID"] = ldb.MessageElement(
[str(security.DOMAIN_RID_ADMINS)], ldb.FLAG_MOD_REPLACE,
"primaryGroupID")
self.admin_samdb.modify(m)
print("highest usn %s" % cookie.blob.highwatermark.highest_usn)
print("tmp higest usn %s" % cookie.blob.highwatermark.tmp_highest_usn)
print("reserved usn %s" % cookie.blob.highwatermark.reserved_usn)
- if cookie.blob.extra_length >0:
+ if cookie.blob.extra_length > 0:
print("highest usn in extra %s" % cookie.blob.extra.ctr.cursors[0].highest_usn)
return cookie
-remote_ldb= Ldb("ldap://" + opts.host + ":389", credentials=creds, lp=lp)
+remote_ldb = Ldb("ldap://" + opts.host + ":389", credentials=creds, lp=lp)
tab = []
if opts.b:
base = opts.b
print("")
print("Getting allusers with cookie")
-controls=["dirsync:1:1:50:%s" % base64.b64encode(ndr_pack(cookie)).decode('utf8')]
+controls = ["dirsync:1:1:50:%s" % base64.b64encode(ndr_pack(cookie)).decode('utf8')]
(msgs, ctrls) = remote_ldb.searchex(expression="(samaccountname=*)", base=base, attrs=["objectClass"], controls=controls)
if (len(ctrls)):
for ctl in ctrls:
print("")
print("Getting all the entries")
-controls=["dirsync:1:1:50:%s" % base64.b64encode(ndr_pack(cookie)).decode('utf8')]
+controls = ["dirsync:1:1:50:%s" % base64.b64encode(ndr_pack(cookie)).decode('utf8')]
(msgs, ctrls) = remote_ldb.searchex(expression="(objectclass=*)", base=base, controls=controls)
cont = 0
if (len(ctrls)):
bigusn = usn + 1000
while (cont == "1"):
print("")
- controls=["dirsync:1:1:50:%s" % base64.b64encode(ndr_pack(cookie)).decode('utf8')]
+ controls = ["dirsync:1:1:50:%s" % base64.b64encode(ndr_pack(cookie)).decode('utf8')]
(msgs, ctrls) = remote_ldb.searchex(expression="(objectclass=*)", base=base, controls=controls)
if (len(ctrls)):
for ctl in ctrls:
if cookie.blob.extra_length > 0:
print("here")
cookie.blob.extra.ctr.cursors[0].highest_usn = bigusn - 1
-controls=["dirsync:1:1:50:%s" % base64.b64encode(ndr_pack(cookie)).decode('utf8')]
+controls = ["dirsync:1:1:50:%s" % base64.b64encode(ndr_pack(cookie)).decode('utf8')]
(msgs, ctrls) = remote_ldb.searchex(expression="(objectclass=*)", base=base, controls=controls)
if (len(ctrls)):
for ctl in ctrls:
if cookie.blob.extra_length > 0:
cookie.blob.extra.ctr.cursors[0].source_dsa_invocation_id = misc.GUID("128a99bf-e2df-4832-ac0a-1fb625e530db")
cookie.blob.extra.ctr.cursors[0].highest_usn = bigusn - 1
-controls=["dirsync:1:1:50:%s" % base64.b64encode(ndr_pack(cookie)).decode('utf8'))
+controls = ["dirsync:1:1:50:%s" % base64.b64encode(ndr_pack(cookie)).decode('utf8'))
(msgs, ctrls) = remote_ldb.searchex(expression="(objectclass=*)", base=base, controls=controls)
if (len(ctrls)):
for ctl in ctrls:
cookie.blob.highwatermark.tmp_highest_usn = (usn - 2)
if cookie.blob.extra_length > 0:
cookie.blob.extra.ctr.cursors[0].highest_usn = (usn - 2)
-controls=["dirsync:1:1:50:%s" % base64.b64encode(ndr_pack(cookie)).decode('utf8')]
+controls = ["dirsync:1:1:50:%s" % base64.b64encode(ndr_pack(cookie)).decode('utf8')]
(msgs, ctrls) = remote_ldb.searchex(expression="(objectclass=*)", base=base, controls=controls)
if (len(ctrls)):
for ctl in ctrls:
elif transport == "ncacn_ip_tcp":
tests = ncacn_ip_tcp_tests
else:
- raise AssertionError("invalid transport %r"% transport)
+ raise AssertionError("invalid transport %r" % transport)
for t in tests:
plansmbtorture4testsuite(t, env, ["%s:$SERVER[%s]" % (transport, bindoptions), '-U$USERNAME%$PASSWORD', '--workgroup=$DOMAIN'], "samba4.%s on %s with %s" % (t, transport, bindoptions))
plansmbtorture4testsuite('rpc.samba3-sharesec', env, ["%s:$SERVER[%s]" % (transport, bindoptions), '-U$USERNAME%$PASSWORD', '--workgroup=$DOMAIN', '--option=torture:share=tmp'], "samba4.rpc.samba3.sharesec on %s with %s" % (transport, bindoptions))
"samba4.krb5.kdc with account ALLOWED permission to replicate to an RODC")
# This ensures we have correct behaviour on a server that is not not the PDC emulator
-env="promoted_dc"
+env = "promoted_dc"
plansmbtorture4testsuite('krb5.kdc', env, ['ncacn_np:$SERVER_IP', "-k", "yes", '-U$USERNAME%$PASSWORD', '--workgroup=$DOMAIN', '--realm=$REALM'],
"samba4.krb5.kdc with specified account")
next_object = ctr6.first_object
for i in range(0, ctr6.object_count):
- print("Obj %d: %s %s" %(i, next_object.object.identifier.dn[:25],
+ print("Obj %d: %s %s" % (i, next_object.object.identifier.dn[:25],
next_object.object.identifier.guid))
next_object = next_object.next_object
l.value.blob)
print("Link Tgt %s... <-- Src %s"
- %(target.dn[:25], l.identifier.guid))
+ % (target.dn[:25], l.identifier.guid))
state = "Del"
if l.flags & drsuapi.DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE:
state = "Act"
- print(" v%u %s changed %u" %(l.meta_data.version, state,
+ print(" v%u %s changed %u" % (l.meta_data.version, state,
l.meta_data.originating_change_time))
- print("HWM: %d" %(ctr6.new_highwatermark.highest_usn))
- print("Tmp HWM: %d" %(ctr6.new_highwatermark.tmp_highest_usn))
- print("More data: %d" %(ctr6.more_data))
+ print("HWM: %d" % (ctr6.new_highwatermark.highest_usn))
+ print("Tmp HWM: %d" % (ctr6.new_highwatermark.tmp_highest_usn))
+ print("More data: %d" % (ctr6.more_data))
def _get_replication(self, replica_flags,
drs_error=drsuapi.DRSUAPI_EXOP_ERR_NONE, drs=None, drs_handle=None,
drsuapi.DRSUAPI_DRS_WRIT_REP)
self._check_replication([ou1,ou2,dc3],
- drsuapi.DRSUAPI_DRS_WRIT_REP|
+ drsuapi.DRSUAPI_DRS_WRIT_REP |
drsuapi.DRSUAPI_DRS_GET_ANC)
self._check_replication([dc3],
drsuapi.DRSUAPI_DRS_CRITICAL_ONLY)
self._check_replication([ou1,ou2,dc3],
- drsuapi.DRSUAPI_DRS_CRITICAL_ONLY|
+ drsuapi.DRSUAPI_DRS_CRITICAL_ONLY |
drsuapi.DRSUAPI_DRS_GET_ANC)
m = ldb.Message()
drsuapi.DRSUAPI_DRS_WRIT_REP)
self._check_replication([ou1,ou2,dc3],
- drsuapi.DRSUAPI_DRS_WRIT_REP|
+ drsuapi.DRSUAPI_DRS_WRIT_REP |
drsuapi.DRSUAPI_DRS_GET_ANC)
self._check_replication([dc3],
drsuapi.DRSUAPI_DRS_CRITICAL_ONLY)
self._check_replication([ou1,ou2,dc3],
- drsuapi.DRSUAPI_DRS_CRITICAL_ONLY|
+ drsuapi.DRSUAPI_DRS_CRITICAL_ONLY |
drsuapi.DRSUAPI_DRS_GET_ANC)
self._check_replication([ou1],
highwatermark=hwm1)
self._check_replication([ou1],
- drsuapi.DRSUAPI_DRS_WRIT_REP|
+ drsuapi.DRSUAPI_DRS_WRIT_REP |
drsuapi.DRSUAPI_DRS_GET_ANC,
highwatermark=hwm1)
self._check_replication([ou1],
- drsuapi.DRSUAPI_DRS_WRIT_REP|
+ drsuapi.DRSUAPI_DRS_WRIT_REP |
drsuapi.DRSUAPI_DRS_GET_ANC,
uptodateness_vector=utdv1)
drsuapi.DRSUAPI_DRS_WRIT_REP)
self._check_replication([ou1,ou2,dc3],
- drsuapi.DRSUAPI_DRS_WRIT_REP|
+ drsuapi.DRSUAPI_DRS_WRIT_REP |
drsuapi.DRSUAPI_DRS_GET_ANC)
self._check_replication([dc3],
drsuapi.DRSUAPI_DRS_CRITICAL_ONLY)
self._check_replication([ou1,ou2,dc3],
- drsuapi.DRSUAPI_DRS_CRITICAL_ONLY|
+ drsuapi.DRSUAPI_DRS_CRITICAL_ONLY |
drsuapi.DRSUAPI_DRS_GET_ANC)
self._check_replication([ou1,ou2],
highwatermark=hwm1)
self._check_replication([ou1,ou2],
- drsuapi.DRSUAPI_DRS_WRIT_REP|
+ drsuapi.DRSUAPI_DRS_WRIT_REP |
drsuapi.DRSUAPI_DRS_GET_ANC,
highwatermark=hwm1)
self._check_replication([ou1,ou2],
- drsuapi.DRSUAPI_DRS_WRIT_REP|
+ drsuapi.DRSUAPI_DRS_WRIT_REP |
drsuapi.DRSUAPI_DRS_GET_ANC,
uptodateness_vector=utdv1)
drsuapi.DRSUAPI_DRS_WRIT_REP)
self._check_replication([self.ou,ou1,ou2,dc3],
- drsuapi.DRSUAPI_DRS_WRIT_REP|
+ drsuapi.DRSUAPI_DRS_WRIT_REP |
drsuapi.DRSUAPI_DRS_GET_ANC)
self._check_replication([dc3],
drsuapi.DRSUAPI_DRS_CRITICAL_ONLY)
self._check_replication([self.ou,ou1,ou2,dc3],
- drsuapi.DRSUAPI_DRS_CRITICAL_ONLY|
+ drsuapi.DRSUAPI_DRS_CRITICAL_ONLY |
drsuapi.DRSUAPI_DRS_GET_ANC)
self._check_replication([self.ou,ou2],
- drsuapi.DRSUAPI_DRS_WRIT_REP|
+ drsuapi.DRSUAPI_DRS_WRIT_REP |
drsuapi.DRSUAPI_DRS_GET_ANC,
uptodateness_vector=utdv2)
drsuapi.DRSUAPI_DRS_WRIT_REP)
self._check_replication([self.ou,ou1,ou2,dc3,cn3],
- drsuapi.DRSUAPI_DRS_WRIT_REP|
+ drsuapi.DRSUAPI_DRS_WRIT_REP |
drsuapi.DRSUAPI_DRS_GET_ANC)
self._check_replication([dc3],
drsuapi.DRSUAPI_DRS_CRITICAL_ONLY)
self._check_replication([self.ou,ou1,ou2,dc3],
- drsuapi.DRSUAPI_DRS_CRITICAL_ONLY|
+ drsuapi.DRSUAPI_DRS_CRITICAL_ONLY |
drsuapi.DRSUAPI_DRS_GET_ANC)
m = ldb.Message()
# Can fail against Windows due to equal precedence of dc3, cn3
self._check_replication([self.ou,ou1,ou2,dc3,cn3],
- drsuapi.DRSUAPI_DRS_WRIT_REP|
+ drsuapi.DRSUAPI_DRS_WRIT_REP |
drsuapi.DRSUAPI_DRS_GET_ANC,
expected_links=[ou2_managedBy_dc3])
drsuapi.DRSUAPI_DRS_CRITICAL_ONLY)
self._check_replication([self.ou,ou1,ou2,dc3],
- drsuapi.DRSUAPI_DRS_CRITICAL_ONLY|
+ drsuapi.DRSUAPI_DRS_CRITICAL_ONLY |
drsuapi.DRSUAPI_DRS_GET_ANC)
self._check_replication([],
expected_links=[dc3_managedBy_ou1])
self._check_replication([self.ou,ou1,ou2,dc3],
- drsuapi.DRSUAPI_DRS_CRITICAL_ONLY|
+ drsuapi.DRSUAPI_DRS_CRITICAL_ONLY |
drsuapi.DRSUAPI_DRS_GET_ANC,
expected_links=[dc3_managedBy_ou1])
expected_links=[dc3_managedBy_ou1,dc3_managedBy_ou2])
self._check_replication([self.ou,ou1,ou2,dc3],
- drsuapi.DRSUAPI_DRS_CRITICAL_ONLY|
+ drsuapi.DRSUAPI_DRS_CRITICAL_ONLY |
drsuapi.DRSUAPI_DRS_GET_ANC,
expected_links=[dc3_managedBy_ou1,dc3_managedBy_ou2])
highwatermark=hwm7)
self._check_replication([],
- drsuapi.DRSUAPI_DRS_WRIT_REP|
+ drsuapi.DRSUAPI_DRS_WRIT_REP |
drsuapi.DRSUAPI_DRS_GET_ANC,
expected_links=[dc3_managedBy_ou1,dc3_managedBy_ou2],
highwatermark=hwm7)
highwatermark=hwm7)
self._check_replication([],
- drsuapi.DRSUAPI_DRS_CRITICAL_ONLY|
+ drsuapi.DRSUAPI_DRS_CRITICAL_ONLY |
drsuapi.DRSUAPI_DRS_GET_ANC,
expected_links=[dc3_managedBy_ou1,dc3_managedBy_ou2],
highwatermark=hwm7)
uptodateness_vector=utdv7)
self._check_replication([],
- drsuapi.DRSUAPI_DRS_WRIT_REP|
+ drsuapi.DRSUAPI_DRS_WRIT_REP |
drsuapi.DRSUAPI_DRS_GET_ANC,
expected_links=[dc3_managedBy_ou1,dc3_managedBy_ou2],
uptodateness_vector=utdv7)
uptodateness_vector=utdv7)
self._check_replication([],
- drsuapi.DRSUAPI_DRS_CRITICAL_ONLY|
+ drsuapi.DRSUAPI_DRS_CRITICAL_ONLY |
drsuapi.DRSUAPI_DRS_GET_ANC,
expected_links=[dc3_managedBy_ou1,dc3_managedBy_ou2],
uptodateness_vector=utdv7)
# objects can be slow to replicate out. So the OU created by a previous
# testenv may still exist at this point).
rand = random.randint(1, 10000000)
- test_ou = "OU=test_getnc_unpriv%d" %rand
- self.ou = "%s,%s" %(test_ou, self.base_dn)
+ test_ou = "OU=test_getnc_unpriv%d" % rand
+ self.ou = "%s,%s" % (test_ou, self.base_dn)
self.ldb_dc1.add({
"dn": self.ou,
"objectclass": "organizationalUnit"})
t.chdir('${PREFIX}')
t.del_files(["var", "private"])
t.run_cmd("rm -f etc/smb.conf")
- provision=['bin/samba-tool',
+ provision = ['bin/samba-tool',
'domain',
'provision',
'--realm=${LCREALM}',
time.sleep(2)
child.sendline("net use t: \\\\${HOSTNAME}.${LCREALM}\\test")
i = child.expect(["The command completed successfully", "The network path was not found"])
- retries -=1
+ retries -= 1
t.run_net_time(child)
child.expect("C:")
if i == 1:
time.sleep(2)
- retries -=1
+ retries -= 1
t.info("Checking if showrepl is happy")
child.sendline("repadmin /showrepl")
child.expect("C:")
if i == 1:
time.sleep(2)
- retries -=1
+ retries -= 1
t.info("Checking if showrepl is happy")
child.sendline("repadmin /showrepl")
if output:
return subprocess.Popen([cmd], shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, cwd=dir).communicate()[0]
if isinstance(cmd, list):
- shell=False
+ shell = False
else:
- shell=True
+ shell = True
if checkfail:
return subprocess.check_call(cmd, shell=shell, cwd=dir)
else:
else:
self.info('$ ' + cmd)
if isinstance(cmd, list):
- shell=False
+ shell = False
else:
- shell=True
+ shell = True
os.chdir(dir)
ret = subprocess.Popen(cmd, shell=shell, stderr=subprocess.STDOUT)
os.chdir(cwd)
def ping_wait(self, hostname):
'''wait for a hostname to come up on the network'''
hostname = self.substitute(hostname)
- loops=10
+ loops = 10
while loops > 0:
try:
self.run_cmd("ping -c 1 -w 10 %s" % hostname)