From 68f8a1c2747fd51a633b34dc4301b1f6acae5de6 Mon Sep 17 00:00:00 2001 From: Tim Beale Date: Fri, 27 Jul 2018 14:34:16 +1200 Subject: [PATCH] Fix PEP8 warning E501 line too long Mostly involves splitting up long strings or comments so that they span multiple lines. Some place-holder variables have been added in a few places to avoid exceeding 80 chars. Signed-off-by: Tim Beale Reviewed-by: Andrew Bartlett Reviewed-by: Douglas Bagnall --- python/samba/netcmd/pso.py | 109 ++++++++------ python/samba/tests/pso.py | 20 +-- .../tests/samba_tool/passwordsettings.py | 58 ++++---- .../dsdb/tests/python/password_settings.py | 53 ++++--- source4/torture/drs/python/getncchanges.py | 137 +++++++++++------- source4/torture/drs/python/link_conflicts.py | 133 +++++++++++------ 6 files changed, 311 insertions(+), 199 deletions(-) diff --git a/python/samba/netcmd/pso.py b/python/samba/netcmd/pso.py index e030832af7e..96f0b4f259e 100644 --- a/python/samba/netcmd/pso.py +++ b/python/samba/netcmd/pso.py @@ -19,7 +19,8 @@ import samba.getopt as options import ldb from samba.samdb import SamDB from samba.netcmd import (Command, CommandError, Option, SuperCommand) -from samba.dcerpc.samr import DOMAIN_PASSWORD_COMPLEX, DOMAIN_PASSWORD_STORE_CLEARTEXT +from samba.dcerpc.samr import (DOMAIN_PASSWORD_COMPLEX, + DOMAIN_PASSWORD_STORE_CLEARTEXT) from samba.auth import system_session NEVER_TIMESTAMP = int(-0x8000000000000000) @@ -136,8 +137,8 @@ def show_pso_for_user(outf, samdb, username): if len(res) == 0: outf.write("User '%s' not found.\n" % username) elif 'msDS-ResultantPSO' not in res[0]: - outf.write("No PSO applies to user '%s'. The default domain settings apply.\n" - % username) + outf.write("No PSO applies to user '%s'. " + "The default domain settings apply.\n" % username) outf.write("Refer to 'samba-tool domain passwordsettings show'.\n") else: # sanity-check user has permissions to view PSO details (non-admin @@ -234,15 +235,18 @@ def check_pso_constraints(min_pwd_length=None, history_length=None, # check values as per section 3.1.1.5.2.2 Constraints in MS-ADTS spec if history_length is not None and history_length > 1024: - raise CommandError("Bad password history length: valid range is 0 to 1024") + raise CommandError("Bad password history length: " + "valid range is 0 to 1024") if min_pwd_length is not None and min_pwd_length > 255: - raise CommandError("Bad minimum password length: valid range is 0 to 255") + raise CommandError("Bad minimum password length: " + "valid range is 0 to 255") if min_pwd_age is not None and max_pwd_age is not None: # note max-age=zero is a special case meaning 'never expire' if min_pwd_age >= max_pwd_age and max_pwd_age != 0: - raise CommandError("Minimum password age must be less than the maximum age") + raise CommandError("Minimum password age must be less than " + "maximum age") # the same args are used for both create and set commands @@ -250,21 +254,29 @@ pwd_settings_options = [ Option("--complexity", type="choice", choices=["on", "off"], help="The password complexity (on | off)."), Option("--store-plaintext", type="choice", choices=["on", "off"], - help="Store plaintext passwords where account have 'store passwords with reversible encryption' set (on | off)."), + help="Store plaintext passwords where account have " + "'store passwords with reversible encryption' set (on | off)."), Option("--history-length", help="The password history length ().", type=int), Option("--min-pwd-length", help="The minimum password length ().", type=int), Option("--min-pwd-age", - help="The minimum password age (). Default is domain setting.", type=int), + help=("The minimum password age (). " + "Default is domain setting."), type=int), Option("--max-pwd-age", - help="The maximum password age (). Default is domain setting.", type=int), - Option("--account-lockout-duration", - help="The the length of time an account is locked out after exeeding the limit on bad password attempts (). Default is domain setting", type=int), - Option("--account-lockout-threshold", - help="The number of bad password attempts allowed before locking out the account (). Default is domain setting.", type=int), + help=("The maximum password age (). " + "Default is domain setting."), type=int), + Option("--account-lockout-duration", type=int, + help=("The length of time an account is locked out after exceeding " + "the limit on bad password attempts (). " + "Default is domain setting")), + Option("--account-lockout-threshold", type=int, + help=("The number of bad password attempts allowed before locking " + "out the account (). Default is domain setting.")), Option("--reset-account-lockout-after", - help="After this time is elapsed, the recorded number of attempts restarts from zero (). Default is domain setting.", type=int)] + help=("After this time is elapsed, the recorded number of attempts " + "restarts from zero (). " + "Default is domain setting."), type=int)] def num_options_in_args(options, args): @@ -309,8 +321,8 @@ class cmd_domain_pwdsettings_pso_create(Command): } takes_options = pwd_settings_options + [ - Option("-H", "--URL", help="LDB URL for database or target server", type=str, - metavar="URL", dest="H") + Option("-H", "--URL", help="LDB URL for database or target server", + metavar="URL", dest="H", type=str) ] takes_args = ["psoname", "precedence"] @@ -329,7 +341,8 @@ class cmd_domain_pwdsettings_pso_create(Command): try: precedence = int(precedence) except ValueError: - raise CommandError("The PSO's precedence should be a numerical value. Try --help") + raise CommandError("The PSO's precedence should be " + "a numerical value. Try --help") # sanity-check that the PSO doesn't already exist pso_dn = "CN=%s,%s" % (psoname, pso_container(samdb)) @@ -347,14 +360,17 @@ class cmd_domain_pwdsettings_pso_create(Command): # otherwise there's no point in creating a PSO num_pwd_args = num_options_in_args(pwd_settings_options, self.raw_argv) if num_pwd_args == 0: - raise CommandError("Please specify at least one password policy setting. Try --help") + raise CommandError("Please specify at least one password policy " + "setting. Try --help") # it's unlikely that the user will specify all 9 password policy # settings on the CLI - current domain password-settings as the default # values for unspecified arguments if num_pwd_args < len(pwd_settings_options): - self.message("Not all password policy options have been specified.") - self.message("For unspecified options, the current domain password settings will be used as the default values.") + self.message("Not all password policy options " + "have been specified.") + self.message("For unspecified options, the current domain password" + " settings will be used as the default values.") # lookup the current domain password-settings res = samdb.search(samdb.domain_dn(), scope=ldb.SCOPE_BASE, @@ -420,7 +436,8 @@ class cmd_domain_pwdsettings_pso_create(Command): except ldb.LdbError as e: (num, msg) = e.args if num == ldb.ERR_INSUFFICIENT_ACCESS_RIGHTS: - raise CommandError("Administrator permissions are needed to create a PSO.") + raise CommandError("Administrator permissions are needed " + "to create a PSO.") else: raise CommandError("Failed to create PSO '%s': %s" % (pso_dn, msg)) @@ -439,7 +456,8 @@ class cmd_domain_pwdsettings_pso_set(Command): takes_options = pwd_settings_options + [ Option("--precedence", type=int, - help="This PSO's precedence relative to other PSOs. Lower precedence is better ()."), + help=("This PSO's precedence relative to other PSOs. " + "Lower precedence is better ().")), Option("-H", "--URL", help="LDB URL for database or target server", type=str, metavar="URL", dest="H"), ] @@ -464,19 +482,23 @@ class cmd_domain_pwdsettings_pso_set(Command): # we expect the user to specify at least one password-policy setting num_pwd_args = num_options_in_args(pwd_settings_options, self.raw_argv) if num_pwd_args == 0 and precedence is None: - raise CommandError("Please specify at least one password policy setting. Try --help") + raise CommandError("Please specify at least one password policy " + "setting. Try --help") if min_pwd_age is not None or max_pwd_age is not None: - # if we're modifying either the max or min pwd-age, check the max is - # always larger. We may have to fetch the PSO's setting to verify this + # if we're modifying either the max or min pwd-age, check the max + # is always larger. We may have to fetch the PSO's setting to + # verify this res = samdb.search(pso_dn, scope=ldb.SCOPE_BASE, attrs=['msDS-MinimumPasswordAge', 'msDS-MaximumPasswordAge']) if min_pwd_age is None: - min_pwd_age = timestamp_to_days(res[0]['msDS-MinimumPasswordAge'][0]) + min_pwd_ticks = res[0]['msDS-MinimumPasswordAge'][0] + min_pwd_age = timestamp_to_days(min_pwd_ticks) if max_pwd_age is None: - max_pwd_age = timestamp_to_days(res[0]['msDS-MaximumPasswordAge'][0]) + max_pwd_ticks = res[0]['msDS-MaximumPasswordAge'][0] + max_pwd_age = timestamp_to_days(max_pwd_ticks) check_pso_constraints(max_pwd_age=max_pwd_age, min_pwd_age=min_pwd_age, history_length=history_length, @@ -516,8 +538,8 @@ class cmd_domain_pwdsettings_pso_delete(Command): } takes_options = [ - Option("-H", "--URL", help="LDB URL for database or target server", type=str, - metavar="URL", dest="H") + Option("-H", "--URL", help="LDB URL for database or target server", + metavar="URL", dest="H", type=str) ] takes_args = ["psoname"] @@ -556,8 +578,8 @@ class cmd_domain_pwdsettings_pso_list(Command): } takes_options = [ - Option("-H", "--URL", help="LDB URL for database or target server", type=str, - metavar="URL", dest="H") + Option("-H", "--URL", help="LDB URL for database or target server", + metavar="URL", dest="H", type=str) ] def run(self, H=None, credopts=None, sambaopts=None, versionopts=None): @@ -574,7 +596,8 @@ class cmd_domain_pwdsettings_pso_list(Command): # an unprivileged search against Windows returns nothing here. On Samba # we get the PSO names, but not their attributes if len(res) == 0 or 'msDS-PasswordSettingsPrecedence' not in res[0]: - self.outf.write("No PSOs are present, or you don't have permission to view them.\n") + self.outf.write("No PSOs are present, or you don't have permission" + " to view them.\n") return # sort the PSOs so they're displayed in order of precedence @@ -600,8 +623,8 @@ class cmd_domain_pwdsettings_pso_show(Command): } takes_options = [ - Option("-H", "--URL", help="LDB URL for database or target server", type=str, - metavar="URL", dest="H") + Option("-H", "--URL", help="LDB URL for database or target server", + metavar="URL", dest="H", type=str) ] takes_args = ["psoname"] @@ -630,8 +653,8 @@ class cmd_domain_pwdsettings_pso_show_user(Command): } takes_options = [ - Option("-H", "--URL", help="LDB URL for database or target server", type=str, - metavar="URL", dest="H") + Option("-H", "--URL", help="LDB URL for database or target server", + metavar="URL", dest="H", type=str) ] takes_args = ["username"] @@ -666,8 +689,8 @@ class cmd_domain_pwdsettings_pso_apply(Command): } takes_options = [ - Option("-H", "--URL", help="LDB URL for database or target server", type=str, - metavar="URL", dest="H") + Option("-H", "--URL", help="LDB URL for database or target server", + metavar="URL", dest="H", type=str) ] takes_args = ["psoname", "user_or_group"] @@ -696,7 +719,8 @@ class cmd_domain_pwdsettings_pso_apply(Command): target_dn = str(res[0].dn) m = ldb.Message() m.dn = ldb.Dn(samdb, pso_dn) - m["msDS-PSOAppliesTo"] = ldb.MessageElement(target_dn, ldb.FLAG_MOD_ADD, + m["msDS-PSOAppliesTo"] = ldb.MessageElement(target_dn, + ldb.FLAG_MOD_ADD, "msDS-PSOAppliesTo") try: samdb.modify(m) @@ -725,8 +749,8 @@ class cmd_domain_pwdsettings_pso_unapply(Command): } takes_options = [ - Option("-H", "--URL", help="LDB URL for database or target server", type=str, - metavar="URL", dest="H"), + Option("-H", "--URL", help="LDB URL for database or target server", + metavar="URL", dest="H", type=str), ] takes_args = ["psoname", "user_or_group"] @@ -755,7 +779,8 @@ class cmd_domain_pwdsettings_pso_unapply(Command): target_dn = str(res[0].dn) m = ldb.Message() m.dn = ldb.Dn(samdb, pso_dn) - m["msDS-PSOAppliesTo"] = ldb.MessageElement(target_dn, ldb.FLAG_MOD_DELETE, + m["msDS-PSOAppliesTo"] = ldb.MessageElement(target_dn, + ldb.FLAG_MOD_DELETE, "msDS-PSOAppliesTo") try: samdb.modify(m) diff --git a/python/samba/tests/pso.py b/python/samba/tests/pso.py index 0d37108e060..57ec03e62b3 100644 --- a/python/samba/tests/pso.py +++ b/python/samba/tests/pso.py @@ -56,8 +56,9 @@ class TestUser: if hist_len == 0: return self.all_old_passwords[:] - # just exclude our pwd_history if there's not much in it. This can happen - # if we've been using a lower PasswordHistoryLength setting previously + # just exclude our pwd_history if there's not much in it. This can + # happen if we've been using a lower PasswordHistoryLength setting + # previously hist_len = min(len(self.pwd_history), hist_len) # return any passwords up to the nth-from-last item @@ -67,8 +68,9 @@ class TestUser: """Updates the user's password history to reflect a password change""" # we maintain 2 lists: all passwords the user has ever had, and an # effective password-history that should roughly mirror the DC. - # pwd_history_change() handles the corner-case where we need to truncate - # password-history due to PasswordHistoryLength settings changes + # pwd_history_change() handles the corner-case where we need to + # truncate password-history due to PasswordHistoryLength settings + # changes if new_password in self.all_old_passwords: self.all_old_passwords.remove(new_password) self.all_old_passwords.append(new_password) @@ -102,15 +104,16 @@ add: userPassword userPassword: %s """ % (self.dn, self.get_password(), new_password) # this modify will throw an exception if new_password doesn't meet the - # PSO constraints (which the test code catches if it's expected to fail) + # PSO constraints (which the test code catches if it's expected to + # fail) self.ldb.modify_ldif(ldif) self.update_pwd_history(new_password) def pwd_history_change(self, old_hist_len, new_hist_len): """ - Updates what in the password history will take effect, to reflect changes - on the DC. When the PasswordHistoryLength applied to a user changes from - a low setting (e.g. 2) to a higher setting (e.g. 4), passwords #3 and #4 + Updates the effective password history, to reflect changes on the DC. + When the PasswordHistoryLength applied to a user changes from a low + setting (e.g. 2) to a higher setting (e.g. 4), passwords #3 and #4 won't actually have been stored on the DC, so we need to make sure they are removed them from our mirror pwd_history list. """ @@ -267,4 +270,3 @@ msDS-PasswordSettingsPrecedence: %u """ % (self.dn, new_precedence) samdb.modify_ldif(ldif) self.precedence = new_precedence - diff --git a/python/samba/tests/samba_tool/passwordsettings.py b/python/samba/tests/samba_tool/passwordsettings.py index 9c934b41dc9..e29c76c730d 100644 --- a/python/samba/tests/samba_tool/passwordsettings.py +++ b/python/samba/tests/samba_tool/passwordsettings.py @@ -31,8 +31,8 @@ class PwdSettingsCmdTestCase(SambaToolCmdTest): self.user_auth = "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]) self.ldb = self.getSamDB("-H", self.server, self.user_auth) - self.pso_container = \ - "CN=Password Settings Container,CN=System,%s" % self.ldb.domain_dn() + system_dn = "CN=System,%s" % self.ldb.domain_dn() + self.pso_container = "CN=Password Settings Container,%s" % system_dn self.obj_cleanup = [] def tearDown(self): @@ -48,9 +48,12 @@ class PwdSettingsCmdTestCase(SambaToolCmdTest): dn = "CN=%s,%s" % (pso_name, self.pso_container) pso_attrs = ['name', 'msDS-PasswordSettingsPrecedence', 'msDS-PasswordReversibleEncryptionEnabled', - 'msDS-PasswordHistoryLength', 'msDS-MinimumPasswordLength', - 'msDS-PasswordComplexityEnabled', 'msDS-MinimumPasswordAge', - 'msDS-MaximumPasswordAge', 'msDS-LockoutObservationWindow', + 'msDS-PasswordHistoryLength', + 'msDS-MinimumPasswordLength', + 'msDS-PasswordComplexityEnabled', + 'msDS-MinimumPasswordAge', + 'msDS-MaximumPasswordAge', + 'msDS-LockoutObservationWindow', 'msDS-LockoutThreshold', 'msDS-LockoutDuration'] res = self.ldb.search(dn, scope=ldb.SCOPE_BASE, attrs=pso_attrs) self.assertEquals(len(res), 1, "PSO lookup failed") @@ -67,8 +70,8 @@ class PwdSettingsCmdTestCase(SambaToolCmdTest): # check the PSO's settings match the search results self.assertEquals(str(res[0]['msDS-PasswordComplexityEnabled'][0]), complexity_str) - self.assertEquals(str(res[0]['msDS-PasswordReversibleEncryptionEnabled'][0]), - plaintext_str) + plaintext_res = res[0]['msDS-PasswordReversibleEncryptionEnabled'][0] + self.assertEquals(str(plaintext_res), plaintext_str) self.assertEquals(int(res[0]['msDS-PasswordHistoryLength'][0]), pso.history_len) self.assertEquals(int(res[0]['msDS-MinimumPasswordLength'][0]), @@ -89,13 +92,14 @@ class PwdSettingsCmdTestCase(SambaToolCmdTest): "pso", "show"), pso_name, "-H", self.server, self.user_auth) - self.assertTrue(len(out.split(":")) >= 10, "Expect 10 fields displayed") + self.assertTrue(len(out.split(":")) >= 10, + "Expect 10 fields displayed") # for a few settings, sanity-check the display is what we expect self.assertIn("Minimum password length: %u" % pso.password_len, out) self.assertIn("Password history length: %u" % pso.history_len, out) - self.assertIn("lockout threshold (attempts): %u" % pso.lockout_attempts, - out) + lockout_str = "lockout threshold (attempts): %u" % pso.lockout_attempts + self.assertIn(lockout_str, out) def test_pso_create(self): """Tests basic PSO creation using the samba-tool""" @@ -207,14 +211,14 @@ class PwdSettingsCmdTestCase(SambaToolCmdTest): pso_settings.precedence = 99 pso_settings.lockout_attempts = 10 pso_settings.lockout_duration = 60 * 17 - (result, out, err) = self.runsublevelcmd("domain", ("passwordsettings", - "pso", "set"), pso_name, - "--precedence=99", - "--account-lockout-threshold=10", - "--account-lockout-duration=17", - "-H", self.server, - self.user_auth) - self.assertCmdSuccess(result, out, err) + (res, out, err) = self.runsublevelcmd("domain", ("passwordsettings", + "pso", "set"), pso_name, + "--precedence=99", + "--account-lockout-threshold=10", + "--account-lockout-duration=17", + "-H", self.server, + self.user_auth) + self.assertCmdSuccess(res, out, err) self.assertEquals(err, "", "Shouldn't be any error messages") self.assertIn("Successfully updated", out) @@ -259,8 +263,8 @@ class PwdSettingsCmdTestCase(SambaToolCmdTest): # first check the samba-tool output tells us the correct PSO is applied (result, out, err) = self.runsublevelcmd("domain", ("passwordsettings", - "pso", "show-user"), user.name, - "-H", self.server, + "pso", "show-user"), + user.name, "-H", self.server, self.user_auth) self.assertCmdSuccess(result, out, err) self.assertEquals(err, "", "Shouldn't be any error messages") @@ -363,19 +367,22 @@ class PwdSettingsCmdTestCase(SambaToolCmdTest): (result, out, err) = self.runsublevelcmd("domain", ("passwordsettings", "pso", "create"), "bad-perm", "250", "--complexity=off", - "-H", self.server, unpriv_auth) + "-H", self.server, + unpriv_auth) self.assertCmdFail(result, "Need admin privileges to modify PSO") self.assertIn("Administrator permissions are needed", err) (result, out, err) = self.runsublevelcmd("domain", ("passwordsettings", "pso", "delete"), pso_name, - "-H", self.server, unpriv_auth) + "-H", self.server, + unpriv_auth) self.assertCmdFail(result, "Need admin privileges to delete PSO") self.assertIn("You may not have permission", err) (result, out, err) = self.runsublevelcmd("domain", ("passwordsettings", "pso", "show"), pso_name, - "-H", self.server, unpriv_auth) + "-H", self.server, + unpriv_auth) self.assertCmdFail(result, "Need admin privileges to view PSO") self.assertIn("You may not have permission", err) @@ -420,9 +427,9 @@ class PwdSettingsCmdTestCase(SambaToolCmdTest): # check we can change the domain setting self.addCleanup(self.ldb.set_minPwdLength, min_pwd_len) new_len = int(min_pwd_len) + 3 + min_pwd_args = "--min-pwd-length=%u" % new_len (result, out, err) = self.runsublevelcmd("domain", ("passwordsettings", - "set"), - "--min-pwd-length=%u" % new_len, + "set"), min_pwd_args, "-H", self.server, self.user_auth) self.assertCmdSuccess(result, out, err) @@ -437,4 +444,3 @@ class PwdSettingsCmdTestCase(SambaToolCmdTest): self.assertCmdSuccess(result, out, err) self.assertEquals(err, "", "Shouldn't be any error messages") self.assertIn("Minimum password length: %u" % new_len, out) - diff --git a/source4/dsdb/tests/python/password_settings.py b/source4/dsdb/tests/python/password_settings.py index f0dde562ea2..88d7c3d38c1 100644 --- a/source4/dsdb/tests/python/password_settings.py +++ b/source4/dsdb/tests/python/password_settings.py @@ -26,7 +26,8 @@ # Usage: # export SERVER_IP=target_dc # export SUBUNITRUN=$samba4srcdir/scripting/bin/subunitrun -# PYTHONPATH="$PYTHONPATH:$samba4srcdir/dsdb/tests/python" $SUBUNITRUN password_settings -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD" +# PYTHONPATH="$PYTHONPATH:$samba4srcdir/dsdb/tests/python" $SUBUNITRUN \ +# password_settings -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD" # import samba.tests @@ -37,6 +38,7 @@ import time from samba.tests.password_test import PasswordTestCase from samba.tests.pso import TestUser from samba.tests.pso import PasswordSettings +from samba.tests import env_get_var_value from samba.credentials import Credentials from samba import gensec import base64 @@ -46,7 +48,7 @@ class PasswordSettingsTestCase(PasswordTestCase): def setUp(self): super(PasswordSettingsTestCase, self).setUp() - self.host_url = "ldap://%s" % samba.tests.env_get_var_value("SERVER_IP") + self.host_url = "ldap://%s" % env_get_var_value("SERVER_IP") self.ldb = samba.tests.connect_samdb(self.host_url) # create a temp OU to put this test's users into @@ -65,7 +67,7 @@ class PasswordSettingsTestCase(PasswordTestCase): # remove all objects under the top-level OU self.ldb.delete(self.ou, ["tree_delete:1"]) - # PSOs can't reside within an OU so they need to be cleaned up separately + # PSOs can't reside within an OU so they get cleaned up separately for obj in self.test_objs: self.ldb.delete(obj) @@ -79,7 +81,8 @@ class PasswordSettingsTestCase(PasswordTestCase): self.ldb.add({"dn": dn, "objectclass": "group"}) return dn - def set_attribute(self, dn, attr, value, operation=FLAG_MOD_ADD, samdb=None): + def set_attribute(self, dn, attr, value, operation=FLAG_MOD_ADD, + samdb=None): """Modifies an attribute for an object""" if samdb is None: samdb = self.ldb @@ -114,7 +117,8 @@ class PasswordSettingsTestCase(PasswordTestCase): except ldb.LdbError as e: (num, msg) = e.args # fail the test (rather than throw an error) - self.fail("Password '%s' unexpectedly rejected: %s" % (password, msg)) + self.fail("Password '%s' unexpectedly rejected: %s" % (password, + msg)) def assert_PSO_applied(self, user, pso): """ @@ -191,7 +195,7 @@ class PasswordSettingsTestCase(PasswordTestCase): self.assert_password_valid(user, password) def test_pso_basics(self): - """Simple tests that a PSO takes effect when applied to a group or user""" + """Simple tests that a PSO takes effect when applied to a group/user""" # create some PSOs that vary in priority and basic password-len best_pso = PasswordSettings("highest-priority-PSO", self.ldb, @@ -237,7 +241,8 @@ class PasswordSettingsTestCase(PasswordTestCase): self.assert_PSO_applied(user, best_pso) # delete a group membership and check the PSO changes - self.set_attribute(group3, "member", user.dn, operation=FLAG_MOD_DELETE) + self.set_attribute(group3, "member", user.dn, + operation=FLAG_MOD_DELETE) self.assert_PSO_applied(user, medium_pso) # apply the low-precedence PSO directly to the user @@ -311,7 +316,8 @@ class PasswordSettingsTestCase(PasswordTestCase): self.assert_PSO_applied(user, group2_pso) def get_guid(self, dn): - res = self.ldb.search(base=dn, attrs=["objectGUID"], scope=ldb.SCOPE_BASE) + res = self.ldb.search(base=dn, attrs=["objectGUID"], + scope=ldb.SCOPE_BASE) return res[0]['objectGUID'][0] def guid_string(self, guid): @@ -333,7 +339,8 @@ class PasswordSettingsTestCase(PasswordTestCase): best_guid = guid_list[0] # sanity-check the mapping between GUID and DN is correct - self.assertEqual(self.guid_string(self.get_guid(mapping[best_guid].dn)), + best_pso_dn = mapping[best_guid].dn + self.assertEqual(self.guid_string(self.get_guid(best_pso_dn)), self.guid_string(best_guid)) # return the PSO that this GUID corresponds to @@ -457,7 +464,8 @@ class PasswordSettingsTestCase(PasswordTestCase): pso.apply_to(user.dn) self.assertTrue(user.get_resultant_PSO() == pso.dn) - # changing the password immediately should fail, even if password is valid + # changing the password immediately should fail, even if the password + # is valid valid_password = "min-age-passwd" self.assert_password_invalid(user, valid_password) # then trying the same password later should succeed @@ -481,8 +489,9 @@ class PasswordSettingsTestCase(PasswordTestCase): user = self.add_user("testuser") - # we can't wait around long enough for the max-age to expire, so instead - # just check the msDS-UserPasswordExpiryTimeComputed for the user + # we can't wait around long enough for the max-age to expire, so + # instead just check the msDS-UserPasswordExpiryTimeComputed for + # the user attrs = ['msDS-UserPasswordExpiryTimeComputed'] res = self.ldb.search(user.dn, attrs=attrs) domain_expiry = int(res[0]['msDS-UserPasswordExpiryTimeComputed'][0]) @@ -519,9 +528,10 @@ class PasswordSettingsTestCase(PasswordTestCase): precedence=2, password_len=10) self.add_obj_cleanup([default_pso.dn, guest_pso.dn, admin_pso.dn, builtin_pso.dn]) - domain_users = "CN=Domain Users,CN=Users,%s" % self.ldb.domain_dn() - domain_guests = "CN=Domain Guests,CN=Users,%s" % self.ldb.domain_dn() - admin_users = "CN=Domain Admins,CN=Users,%s" % self.ldb.domain_dn() + base_dn = self.ldb.domain_dn() + domain_users = "CN=Domain Users,CN=Users,%s" % base_dn + domain_guests = "CN=Domain Guests,CN=Users,%s" % base_dn + admin_users = "CN=Domain Admins,CN=Users,%s" % base_dn # if we apply a PSO to Domain Users (which all users are a member of) # then that PSO should take effect on a new user @@ -532,8 +542,8 @@ class PasswordSettingsTestCase(PasswordTestCase): # Apply a PSO to a builtin group. 'Domain Users' should be a member of # Builtin/Users, but builtin groups should be excluded from the PSO # calculation, so this should have no effect - builtin_pso.apply_to("CN=Users,CN=Builtin,%s" % self.ldb.domain_dn()) - builtin_pso.apply_to("CN=Administrators,CN=Builtin,%s" % self.ldb.domain_dn()) + builtin_pso.apply_to("CN=Users,CN=Builtin,%s" % base_dn) + builtin_pso.apply_to("CN=Administrators,CN=Builtin,%s" % base_dn) self.assert_PSO_applied(user, default_pso) # change the user's primary group to another group (the primaryGroupID @@ -798,7 +808,9 @@ unicodePwd:: %s def set_domain_pwdHistoryLength(self, value): m = ldb.Message() m.dn = ldb.Dn(self.ldb, self.ldb.domain_dn()) - m["pwdHistoryLength"] = ldb.MessageElement(value, ldb.FLAG_MOD_REPLACE, "pwdHistoryLength") + m["pwdHistoryLength"] = ldb.MessageElement(value, + ldb.FLAG_MOD_REPLACE, + "pwdHistoryLength") self.ldb.modify(m) def test_domain_pwd_history(self): @@ -852,8 +864,3 @@ unicodePwd:: %s # *next* time the user's password changes. self.set_domain_pwdHistoryLength("1") self.assert_password_invalid(user, "NewPwd12#") - - - - - diff --git a/source4/torture/drs/python/getncchanges.py b/source4/torture/drs/python/getncchanges.py index 086dcd19d4d..097866d81fc 100644 --- a/source4/torture/drs/python/getncchanges.py +++ b/source4/torture/drs/python/getncchanges.py @@ -24,7 +24,8 @@ # export DC1=dc1_dns_name # export DC2=dc2_dns_name # export SUBUNITRUN=$samba4srcdir/scripting/bin/subunitrun -# PYTHONPATH="$PYTHONPATH:$samba4srcdir/torture/drs/python" $SUBUNITRUN getncchanges -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD" +# PYTHONPATH="$PYTHONPATH:$samba4srcdir/torture/drs/python" $SUBUNITRUN \ +# getncchanges -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD" # from __future__ import print_function @@ -101,7 +102,8 @@ class DrsReplicaSyncIntegrityTestCase(drs_base.DrsBaseTestCase): def start_new_repl_cycle(self): """Resets enough state info to start a new replication cycle""" # reset rxd_links, but leave rxd_guids and rxd_dn_list alone so we know - # whether a parent/target is unknown and needs GET_ANC/GET_TGT to resolve + # whether a parent/target is unknown and needs GET_ANC/GET_TGT to + # resolve self.rxd_links = [] self.used_get_tgt = False @@ -159,13 +161,16 @@ class DrsReplicaSyncIntegrityTestCase(drs_base.DrsBaseTestCase): # Note that with GET_ANC Windows can end up sending the same parent # object multiple times, so this might be noteworthy but doesn't # warrant failing the test - if (len(received_list) != len(expected_list)): - print("Note: received %d objects but expected %d" % (len(received_list), - len(expected_list))) + num_received = len(received_list) + num_expected = len(expected_list) + if num_received != num_expected: + print("Note: received %d objects but expected %d" % (num_received, + num_expected)) # Check that we received every object that we were expecting for dn in expected_list: - self.assertTrue(dn in received_list, "DN '%s' missing from replication." % dn) + self.assertTrue(dn in received_list, + "DN '%s' missing from replication." % dn) def test_repl_integrity(self): """ @@ -176,8 +181,8 @@ class DrsReplicaSyncIntegrityTestCase(drs_base.DrsBaseTestCase): # The server behaviour differs between samba and Windows. Samba returns # the objects in the original order (up to the pre-modify HWM). Windows # incorporates the modified objects and returns them in the new order - # (i.e. modified objects last), up to the post-modify HWM. The Microsoft - # docs state the Windows behaviour is optional. + # (i.e. modified objects last), up to the post-modify HWM. The + # Microsoft docs state the Windows behaviour is optional. # Create a range of objects to replicate. expected_dn_list = self.create_object_range(0, 400) @@ -188,11 +193,12 @@ class DrsReplicaSyncIntegrityTestCase(drs_base.DrsBaseTestCase): # so long as by the end we've received everything self.repl_get_next() - # Modify some of the second page of objects. This should bump the highwatermark + # Modify some of the second page of objects. This should bump the + # highwatermark for x in range(100, 200): self.modify_object(expected_dn_list[x], "displayName", "OU%d" % x) - (post_modify_hwm, unused) = self._get_highest_hwm_utdv(self.test_ldb_dc) + (post_modify_hwm, _) = self._get_highest_hwm_utdv(self.test_ldb_dc) self.assertTrue(post_modify_hwm.highest_usn > orig_hwm.highest_usn) # Get the remaining blocks of data @@ -223,7 +229,9 @@ class DrsReplicaSyncIntegrityTestCase(drs_base.DrsBaseTestCase): return parent_dn == self.ou or parent_dn in known_dn_list def _repl_send_request(self, get_anc=False, get_tgt=False): - """Sends a GetNCChanges request for the next block of replication data.""" + """ + Sends a GetNCChanges request for the next block of replication data. + """ # we're just trying to mimic regular client behaviour here, so just # use the highwatermark in the last response we received @@ -240,7 +248,7 @@ class DrsReplicaSyncIntegrityTestCase(drs_base.DrsBaseTestCase): more_flags = 0 if get_anc: - replica_flags = drsuapi.DRSUAPI_DRS_WRIT_REP | drsuapi.DRSUAPI_DRS_GET_ANC + replica_flags |= drsuapi.DRSUAPI_DRS_GET_ANC self.used_get_anc = True if get_tgt: @@ -252,6 +260,7 @@ class DrsReplicaSyncIntegrityTestCase(drs_base.DrsBaseTestCase): max_objects=self.max_objects, highwatermark=highwatermark, uptodateness_vector=uptodateness_vector, + more_flags=more_flags) def repl_get_next(self, get_anc=False, get_tgt=False, assert_links=False): @@ -298,11 +307,12 @@ class DrsReplicaSyncIntegrityTestCase(drs_base.DrsBaseTestCase): return self.repl_get_next(get_anc=True, get_tgt=get_tgt, assert_links=assert_links) - # check we know about references to any objects in the linked attritbutes + # check we know about references to any objects in the linked attrs received_links = self._get_ctr6_links(ctr6) - # This is so that older versions of Samba fail - we want the links to be - # sent roughly with the objects, rather than getting all links at the end + # This is so that older versions of Samba fail - we want the links to + # be sent roughly with the objects, rather than getting all links at + # the end if assert_links: self.assertTrue(len(received_links) > 0, "Links were expected in the GetNCChanges response") @@ -363,8 +373,8 @@ class DrsReplicaSyncIntegrityTestCase(drs_base.DrsBaseTestCase): def test_repl_integrity_get_anc(self): """ - Modify the parent objects being replicated while the replication is still - in progress (using GET_ANC) and check that no object loss occurs. + Modify the parent objects being replicated while the replication is + still in progress (using GET_ANC) and check that no object loss occurs. """ # Note that GET_ANC behaviour varies between Windows and Samba. @@ -453,13 +463,15 @@ class DrsReplicaSyncIntegrityTestCase(drs_base.DrsBaseTestCase): # Look up the link attribute in the DB # The extended_dn option will dump the GUID info for the link # attribute (as a hex blob) - res = self.test_ldb_dc.search(ldb.Dn(self.test_ldb_dc, dn), attrs=[link_attr], - controls=['extended_dn:1:0'], scope=ldb.SCOPE_BASE) + res = self.test_ldb_dc.search(ldb.Dn(self.test_ldb_dc, dn), + attrs=[link_attr], + controls=['extended_dn:1:0'], + scope=ldb.SCOPE_BASE) # We didn't find the expected link attribute in the DB for the object. # Something has gone wrong somewhere... - self.assertTrue(link_attr in res[0], "%s in DB doesn't have attribute %s" - % (dn, link_attr)) + self.assertTrue(link_attr in res[0], + "%s in DB doesn't have attribute %s" % (dn, link_attr)) # find the received link in the list and assert that the target and # source GUIDs match what's in the DB @@ -481,7 +493,8 @@ class DrsReplicaSyncIntegrityTestCase(drs_base.DrsBaseTestCase): print("Link %s --> %s" % (dn[:25], link.targetDN[:25])) break - self.assertTrue(found, "Did not receive expected link for DN %s" % dn) + self.assertTrue(found, + "Did not receive expected link for DN %s" % dn) def test_repl_get_tgt(self): """ @@ -512,7 +525,8 @@ class DrsReplicaSyncIntegrityTestCase(drs_base.DrsBaseTestCase): # with GET_TGT while not self.replication_complete(): - # get the next block of replication data (this sets GET_TGT if needed) + # get the next block of replication data (this sets GET_TGT + # if needed) self.repl_get_next(assert_links=links_expected) links_expected = len(self.rxd_links) < len(expected_links) @@ -550,10 +564,12 @@ class DrsReplicaSyncIntegrityTestCase(drs_base.DrsBaseTestCase): self.modify_object(objectsA[i], "managedBy", objectsB[i]) for i in range(0, 100): - self.modify_object(objectsB[i], "managedBy", objectsC[(i + 1) % 100]) + self.modify_object(objectsB[i], "managedBy", + objectsC[(i + 1) % 100]) for i in range(0, 100): - self.modify_object(objectsC[i], "managedBy", objectsB[(i + 1) % 100]) + self.modify_object(objectsC[i], "managedBy", + objectsB[(i + 1) % 100]) all_objects = objectsA + objectsB + objectsC expected_links = all_objects @@ -567,7 +583,8 @@ class DrsReplicaSyncIntegrityTestCase(drs_base.DrsBaseTestCase): # with GET_TGT while not self.replication_complete(): - # get the next block of replication data (this sets GET_TGT if needed) + # get the next block of replication data (this sets GET_TGT + # if needed) self.repl_get_next(assert_links=links_expected) links_expected = len(self.rxd_links) < len(expected_links) @@ -602,8 +619,8 @@ class DrsReplicaSyncIntegrityTestCase(drs_base.DrsBaseTestCase): self.repl_get_next(get_tgt=True) # create the target objects and add the links. These objects should be - # outside the scope of the Samba replication cycle, but the links should - # still get sent with the source object + # outside the scope of the Samba replication cycle, but the links + # should still get sent with the source object managers = self.create_object_range(0, 100, prefix="manager") for i in range(0, 100): @@ -643,7 +660,8 @@ class DrsReplicaSyncIntegrityTestCase(drs_base.DrsBaseTestCase): # Add links from the parents to the children for x in range(0, 100): - self.modify_object(parent_dn_list[x], "managedBy", expected_dn_list[x + 100]) + self.modify_object(parent_dn_list[x], "managedBy", + expected_dn_list[x + 100]) # add some filler objects at the end. This allows us to easily see # which chunk the links get sent in @@ -706,7 +724,8 @@ class DrsReplicaSyncIntegrityTestCase(drs_base.DrsBaseTestCase): # with GET_TGT and GET_ANC while not self.replication_complete(): - # get the next block of replication data (this sets GET_TGT/GET_ANC) + # get the next block of replication data (this sets + # GET_TGT/GET_ANC) self.repl_get_next(assert_links=links_expected) links_expected = len(self.rxd_links) < len(expected_links) @@ -842,23 +861,31 @@ class DrsReplicaSyncIntegrityTestCase(drs_base.DrsBaseTestCase): def restore_deleted_object(self, guid, new_dn): """Re-animates a deleted object""" - res = self.test_ldb_dc.search(base="" % self._GUID_string(guid), attrs=["isDeleted"], - controls=['show_deleted:1'], scope=ldb.SCOPE_BASE) + guid_str = self._GUID_string(guid) + res = self.test_ldb_dc.search(base="" % guid_str, + attrs=["isDeleted"], + controls=['show_deleted:1'], + scope=ldb.SCOPE_BASE) if len(res) != 1: return msg = ldb.Message() msg.dn = res[0].dn - msg["isDeleted"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE, "isDeleted") - msg["distinguishedName"] = ldb.MessageElement([new_dn], ldb.FLAG_MOD_REPLACE, "distinguishedName") + msg["isDeleted"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE, + "isDeleted") + msg["distinguishedName"] = ldb.MessageElement([new_dn], + ldb.FLAG_MOD_REPLACE, + "distinguishedName") self.test_ldb_dc.modify(msg, ["show_deleted:1"]) def sync_DCs(self, nc_dn=None): # make sure DC1 has all the changes we've made to DC2 - self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, nc_dn=nc_dn) + self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, + nc_dn=nc_dn) def get_object_guid(self, dn): - res = self.test_ldb_dc.search(base=dn, attrs=["objectGUID"], scope=ldb.SCOPE_BASE) + res = self.test_ldb_dc.search(base=dn, attrs=["objectGUID"], + scope=ldb.SCOPE_BASE) return res[0]['objectGUID'][0] def set_dc_connection(self, conn): @@ -921,7 +948,8 @@ class DrsReplicaSyncIntegrityTestCase(drs_base.DrsBaseTestCase): la_sources = self.create_object_range(0, 100, prefix="la_src") la_targets = self.create_object_range(0, 100, prefix="la_tgt") - # store the target object's GUIDs (we need to know these to reanimate them) + # store the target object's GUIDs (we need to know these to + # reanimate them) target_guids = [] for dn in la_targets: @@ -984,15 +1012,16 @@ class DrsReplicaSyncIntegrityTestCase(drs_base.DrsBaseTestCase): self.add_object(la_source) # create the link target (a server object) in the config NC + sites_dn = "CN=Sites,%s" % self.config_dn + servers_dn = "CN=Servers,CN=Default-First-Site-Name,%s" % sites_dn rand = random.randint(1, 10000000) - la_target = "CN=getncchanges-%d,CN=Servers,CN=Default-First-Site-Name," \ - "CN=Sites,%s" % (rand, self.config_dn) + la_target = "CN=getncchanges-%d,%s" % (rand, servers_dn) self.add_object(la_target, objectclass="server") # add a cross-partition link between the two self.modify_object(la_source, "managedBy", la_target) - # First, sync across to the peer the NC containing the link source object + # First, sync to the peer the NC containing the link source object self.sync_DCs() # Now, before the peer has received the partition containing the target @@ -1002,8 +1031,9 @@ class DrsReplicaSyncIntegrityTestCase(drs_base.DrsBaseTestCase): while not self.replication_complete(): # pretend we've received other link targets out of order and that's - # forced us to use GET_TGT. This checks the peer doesn't fail trying - # to fetch a cross-partition target object that doesn't exist + # forced us to use GET_TGT. This checks the peer doesn't fail + # trying to fetch a cross-partition target object that doesn't + # exist self.repl_get_next(get_tgt=True) self.set_dc_connection(self.default_conn) @@ -1029,14 +1059,14 @@ class DrsReplicaSyncIntegrityTestCase(drs_base.DrsBaseTestCase): attrs=["managedBy"], controls=['extended_dn:1:0'], scope=ldb.SCOPE_BASE) - self.assertFalse("managedBy" in res[0], "%s in DB still has managedBy attribute" - % la_source) + self.assertFalse("managedBy" in res[0], + "%s in DB still has managedBy attribute" % la_source) res = self.test_ldb_dc.search(ldb.Dn(self.ldb_dc2, la_source), attrs=["managedBy"], controls=['extended_dn:1:0'], scope=ldb.SCOPE_BASE) - self.assertFalse("managedBy" in res[0], "%s in DB still has managedBy attribute" - % la_source) + self.assertFalse("managedBy" in res[0], + "%s in DB still has managedBy attribute" % la_source) # Check receiving a cross-partition link to a deleted target. # Delete the target and make sure the deletion is sync'd between DCs @@ -1056,8 +1086,8 @@ class DrsReplicaSyncIntegrityTestCase(drs_base.DrsBaseTestCase): attrs=["managedBy"], controls=['extended_dn:1:0'], scope=ldb.SCOPE_BASE) - self.assertTrue("managedBy" in res[0], "%s in DB missing managedBy attribute" - % la_source) + self.assertTrue("managedBy" in res[0], + "%s in DB missing managedBy attribute" % la_source) # cleanup the server object we created in the Configuration partition self.test_ldb_dc.delete(la_target) @@ -1094,7 +1124,8 @@ class DrsReplicaSyncIntegrityTestCase(drs_base.DrsBaseTestCase): # check we received all the expected objects/links self.assert_expected_data(expected_objects) - self.assert_expected_links([la_source], link_attr="addressBookRoots2", num_expected=500) + self.assert_expected_links([la_source], link_attr="addressBookRoots2", + num_expected=500) # Do the replication again, forcing the use of GET_TGT this time self.init_test_state() @@ -1113,7 +1144,8 @@ class DrsReplicaSyncIntegrityTestCase(drs_base.DrsBaseTestCase): # check we received all the expected objects/links self.assert_expected_data(expected_objects) - self.assert_expected_links([la_source], link_attr="addressBookRoots2", num_expected=500) + self.assert_expected_links([la_source], link_attr="addressBookRoots2", + num_expected=500) class DcConnection: @@ -1122,6 +1154,5 @@ class DcConnection: def __init__(self, drs_base, ldb_dc, dnsname_dc): self.ldb_dc = ldb_dc (self.drs, self.drs_handle) = drs_base._ds_bind(dnsname_dc) - (self.default_hwm, self.default_utdv) = drs_base._get_highest_hwm_utdv(ldb_dc) - - + (self.default_hwm, utdv) = drs_base._get_highest_hwm_utdv(ldb_dc) + self.default_utdv = utdv diff --git a/source4/torture/drs/python/link_conflicts.py b/source4/torture/drs/python/link_conflicts.py index 6e74a56316d..4d6f05f41c1 100644 --- a/source4/torture/drs/python/link_conflicts.py +++ b/source4/torture/drs/python/link_conflicts.py @@ -25,7 +25,8 @@ # export DC1=dc1_dns_name # export DC2=dc2_dns_name # export SUBUNITRUN=$samba4srcdir/scripting/bin/subunitrun -# PYTHONPATH="$PYTHONPATH:$samba4srcdir/torture/drs/python" $SUBUNITRUN link_conflicts -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD" +# PYTHONPATH="$PYTHONPATH:$samba4srcdir/torture/drs/python" $SUBUNITRUN \ +# link_conflicts -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD" # import drs_base @@ -37,6 +38,7 @@ import time from drs_base import AbstractLink from samba.dcerpc import drsuapi, misc +from samba.dcerpc.drsuapi import DRSUAPI_EXOP_ERR_SUCCESS # specifies the order to sync DCs in DC1_TO_DC2 = 1 @@ -47,7 +49,8 @@ class DrsReplicaLinkConflictTestCase(drs_base.DrsBaseTestCase): def setUp(self): super(DrsReplicaLinkConflictTestCase, self).setUp() - self.ou = samba.tests.create_test_ou(self.ldb_dc1, "test_link_conflict") + self.ou = samba.tests.create_test_ou(self.ldb_dc1, + "test_link_conflict") self.base_dn = self.ldb_dc1.get_default_basedn() (self.drs, self.drs_handle) = self._ds_bind(self.dnsname_dc1) @@ -97,12 +100,16 @@ class DrsReplicaLinkConflictTestCase(drs_base.DrsBaseTestCase): """Manually syncs the 2 DCs to ensure they're in sync""" if sync_order == DC1_TO_DC2: # sync DC1-->DC2, then DC2-->DC1 - self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1) - self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2) + self._net_drs_replicate(DC=self.dnsname_dc2, + fromDC=self.dnsname_dc1) + self._net_drs_replicate(DC=self.dnsname_dc1, + fromDC=self.dnsname_dc2) else: # sync DC2-->DC1, then DC1-->DC2 - self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2) - self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1) + self._net_drs_replicate(DC=self.dnsname_dc1, + fromDC=self.dnsname_dc2) + self._net_drs_replicate(DC=self.dnsname_dc2, + fromDC=self.dnsname_dc1) def ensure_unique_timestamp(self): """Waits a second to ensure a unique timestamp between 2 objects""" @@ -123,12 +130,14 @@ class DrsReplicaLinkConflictTestCase(drs_base.DrsBaseTestCase): """ actual_len = len(res1[0][attr]) self.assertTrue(actual_len == expected_count, - "Expected %u %s attributes, but got %u" % (expected_count, - attr, actual_len)) + "Expected %u %s attributes, got %u" % (expected_count, + attr, + actual_len)) actual_len = len(res2[0][attr]) self.assertTrue(actual_len == expected_count, - "Expected %u %s attributes, but got %u" % (expected_count, - attr, actual_len)) + "Expected %u %s attributes, got %u" % (expected_count, + attr, + actual_len)) # check DCs both agree on the same linked attributes for val in res1[0][attr]: @@ -214,7 +223,8 @@ class DrsReplicaLinkConflictTestCase(drs_base.DrsBaseTestCase): self._check_replicated_links(src_ou, [link1, link2]) def test_conflict_single_valued_link(self): - # repeat the test twice, to give each DC a chance to resolve the conflict + # repeat the test twice, to give each DC a chance to resolve + # the conflict self._test_conflict_single_valued_link(sync_order=DC1_TO_DC2) self._test_conflict_single_valued_link(sync_order=DC2_TO_DC1) @@ -248,7 +258,8 @@ class DrsReplicaLinkConflictTestCase(drs_base.DrsBaseTestCase): self.assert_attrs_match(res1, res2, "managedBy", 1) def test_duplicate_single_valued_link(self): - # repeat the test twice, to give each DC a chance to resolve the conflict + # repeat the test twice, to give each DC a chance to resolve + # the conflict self._test_duplicate_single_valued_link(sync_order=DC1_TO_DC2) self._test_duplicate_single_valued_link(sync_order=DC2_TO_DC1) @@ -267,9 +278,11 @@ class DrsReplicaLinkConflictTestCase(drs_base.DrsBaseTestCase): # create the same user (link target) on each DC. # Note that the GUIDs will differ between the DCs target_dn = self.unique_dn("CN=target") - target1_guid = self.add_object(self.ldb_dc1, target_dn, objectclass="user") + target1_guid = self.add_object(self.ldb_dc1, target_dn, + objectclass="user") self.ensure_unique_timestamp() - target2_guid = self.add_object(self.ldb_dc2, target_dn, objectclass="user") + target2_guid = self.add_object(self.ldb_dc2, target_dn, + objectclass="user") # link the src group to the respective target created self.add_link_attr(self.ldb_dc1, src_dn, "member", target_dn) @@ -298,7 +311,8 @@ class DrsReplicaLinkConflictTestCase(drs_base.DrsBaseTestCase): "Expected link to conflicting target object not found") def test_conflict_multi_valued_link(self): - # repeat the test twice, to give each DC a chance to resolve the conflict + # repeat the test twice, to give each DC a chance to resolve + # the conflict self._test_conflict_multi_valued_link(sync_order=DC1_TO_DC2) self._test_conflict_multi_valued_link(sync_order=DC2_TO_DC1) @@ -331,7 +345,8 @@ class DrsReplicaLinkConflictTestCase(drs_base.DrsBaseTestCase): self.assert_attrs_match(res1, res2, "member", 1) def test_duplicate_multi_valued_link(self): - # repeat the test twice, to give each DC a chance to resolve the conflict + # repeat the test twice, to give each DC a chance to resolve + # the conflict self._test_duplicate_multi_valued_link(sync_order=DC1_TO_DC2) self._test_duplicate_multi_valued_link(sync_order=DC2_TO_DC1) @@ -343,7 +358,8 @@ class DrsReplicaLinkConflictTestCase(drs_base.DrsBaseTestCase): # create a common link target target_dn = self.unique_dn("CN=target") - target_guid = self.add_object(self.ldb_dc1, target_dn, objectclass="user") + target_guid = self.add_object(self.ldb_dc1, target_dn, + objectclass="user") self.sync_DCs() # create the same group (link source) on each DC. @@ -367,7 +383,8 @@ class DrsReplicaLinkConflictTestCase(drs_base.DrsBaseTestCase): scope=SCOPE_BASE, attrs=["memberOf"]) src1_backlink = False - # our test user should still be a member of 2 groups (check both DCs agree) + # our test user should still be a member of 2 groups (check both + # DCs agree) self.assert_attrs_match(res1, res2, "memberOf", 2) for val in res1[0]["memberOf"]: @@ -377,10 +394,11 @@ class DrsReplicaLinkConflictTestCase(drs_base.DrsBaseTestCase): src1_backlink = True self.assertTrue(src1_backlink, - "Expected backlink to conflicting source object not found") + "Backlink to conflicting source object not found") def test_conflict_backlinks(self): - # repeat the test twice, to give each DC a chance to resolve the conflict + # repeat the test twice, to give each DC a chance to resolve + # the conflict self._test_conflict_backlinks(sync_order=DC1_TO_DC2) self._test_conflict_backlinks(sync_order=DC2_TO_DC1) @@ -424,12 +442,15 @@ class DrsReplicaLinkConflictTestCase(drs_base.DrsBaseTestCase): res2 = self.ldb_dc2.search(base="" % src_guid, scope=SCOPE_BASE, attrs=["member"]) - # our test user should still be a member of the group (check both DCs agree) - self.assertTrue("member" in res1[0], "Expected member attribute missing") + # our test user should still be a member of the group (check both + # DCs agree) + self.assertTrue("member" in res1[0], + "Expected member attribute missing") self.assert_attrs_match(res1, res2, "member", 1) def test_link_deletion_conflict(self): - # repeat the test twice, to give each DC a chance to resolve the conflict + # repeat the test twice, to give each DC a chance to resolve + # the conflict self._test_link_deletion_conflict(sync_order=DC1_TO_DC2) self._test_link_deletion_conflict(sync_order=DC2_TO_DC1) @@ -440,7 +461,8 @@ class DrsReplicaLinkConflictTestCase(drs_base.DrsBaseTestCase): """ target_dn = self.unique_dn("CN=target") - target_guid = self.add_object(self.ldb_dc1, target_dn, objectclass="user") + target_guid = self.add_object(self.ldb_dc1, target_dn, + objectclass="user") src_dn = self.unique_dn("CN=src") src_guid = self.add_object(self.ldb_dc1, src_dn, objectclass="group") @@ -463,9 +485,11 @@ class DrsReplicaLinkConflictTestCase(drs_base.DrsBaseTestCase): # the object deletion should trump the link addition. # Check the link no longer exists on the remaining object res1 = self.ldb_dc1.search(base="" % search_guid, - scope=SCOPE_BASE, attrs=["member", "memberOf"]) + scope=SCOPE_BASE, + attrs=["member", "memberOf"]) res2 = self.ldb_dc2.search(base="" % search_guid, - scope=SCOPE_BASE, attrs=["member", "memberOf"]) + scope=SCOPE_BASE, + attrs=["member", "memberOf"]) self.assertFalse("member" in res1[0], "member attr shouldn't exist") self.assertFalse("member" in res2[0], "member attr shouldn't exist") @@ -473,13 +497,18 @@ class DrsReplicaLinkConflictTestCase(drs_base.DrsBaseTestCase): self.assertFalse("memberOf" in res2[0], "member attr shouldn't exist") def test_obj_deletion_conflict(self): - # repeat the test twice, to give each DC a chance to resolve the conflict - self._test_obj_deletion_conflict(sync_order=DC1_TO_DC2, del_target=True) - self._test_obj_deletion_conflict(sync_order=DC2_TO_DC1, del_target=True) + # repeat the test twice, to give each DC a chance to resolve + # the conflict + self._test_obj_deletion_conflict(sync_order=DC1_TO_DC2, + del_target=True) + self._test_obj_deletion_conflict(sync_order=DC2_TO_DC1, + del_target=True) # and also try deleting the source object instead of the link target - self._test_obj_deletion_conflict(sync_order=DC1_TO_DC2, del_target=False) - self._test_obj_deletion_conflict(sync_order=DC2_TO_DC1, del_target=False) + self._test_obj_deletion_conflict(sync_order=DC1_TO_DC2, + del_target=False) + self._test_obj_deletion_conflict(sync_order=DC2_TO_DC1, + del_target=False) def _test_full_sync_link_conflict(self, sync_order): """ @@ -501,11 +530,19 @@ class DrsReplicaLinkConflictTestCase(drs_base.DrsBaseTestCase): # Do a couple of full syncs which should resolve the conflict # (but only for one DC) if sync_order == DC1_TO_DC2: - self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, full_sync=True) - self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, full_sync=True) + self._net_drs_replicate(DC=self.dnsname_dc2, + fromDC=self.dnsname_dc1, + full_sync=True) + self._net_drs_replicate(DC=self.dnsname_dc2, + fromDC=self.dnsname_dc1, + full_sync=True) else: - self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, full_sync=True) - self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, full_sync=True) + self._net_drs_replicate(DC=self.dnsname_dc1, + fromDC=self.dnsname_dc2, + full_sync=True) + self._net_drs_replicate(DC=self.dnsname_dc1, + fromDC=self.dnsname_dc2, + full_sync=True) # delete and re-add the link on one DC self.del_link_attr(self.ldb_dc1, src_dn, "member", target_dn) @@ -525,11 +562,13 @@ class DrsReplicaLinkConflictTestCase(drs_base.DrsBaseTestCase): scope=SCOPE_BASE, attrs=["member"]) # check the membership still exits (and both DCs agree) - self.assertTrue("member" in res1[0], "Expected member attribute missing") + self.assertTrue("member" in res1[0], + "Expected member attribute missing") self.assert_attrs_match(res1, res2, "member", 1) def test_full_sync_link_conflict(self): - # repeat the test twice, to give each DC a chance to resolve the conflict + # repeat the test twice, to give each DC a chance to resolve + # the conflict self._test_full_sync_link_conflict(sync_order=DC1_TO_DC2) self._test_full_sync_link_conflict(sync_order=DC2_TO_DC1) @@ -586,7 +625,8 @@ class DrsReplicaLinkConflictTestCase(drs_base.DrsBaseTestCase): def _test_conflict_single_valued_link_deleted_loser(self, sync_order): """ - Tests a single-valued link conflict, where the losing link value is deleted. + Tests a single-valued link conflict, where the losing link value is + deleted. """ src_ou = self.unique_dn("OU=src") src_guid = self.add_object(self.ldb_dc1, src_ou) @@ -599,9 +639,9 @@ class DrsReplicaLinkConflictTestCase(drs_base.DrsBaseTestCase): target1_guid = self.add_object(self.ldb_dc1, target1_ou) target2_guid = self.add_object(self.ldb_dc2, target2_ou) - # add the links - we want the link to end up deleted on DC2, but active on - # DC1. DC1 has the better version and DC2 has the better timestamp - the - # better version should win + # add the links - we want the link to end up deleted on DC2, but active + # on DC1. DC1 has the better version and DC2 has the better timestamp - + # the better version should win self.add_link_attr(self.ldb_dc1, src_ou, "managedBy", target1_ou) self.del_link_attr(self.ldb_dc1, src_ou, "managedBy", target1_ou) self.add_link_attr(self.ldb_dc1, src_ou, "managedBy", target1_ou) @@ -689,7 +729,8 @@ class DrsReplicaLinkConflictTestCase(drs_base.DrsBaseTestCase): self._check_replicated_links(src_ou, [link1, link2]) def test_conflict_existing_single_valued_link(self): - # repeat the test twice, to give each DC a chance to resolve the conflict + # repeat the test twice, to give each DC a chance to resolve + # the conflict self._test_conflict_existing_single_valued_link(sync_order=DC1_TO_DC2) self._test_conflict_existing_single_valued_link(sync_order=DC2_TO_DC1) @@ -707,7 +748,7 @@ class DrsReplicaLinkConflictTestCase(drs_base.DrsBaseTestCase): # get the link info via replication ctr6 = self._get_replication(drsuapi.DRSUAPI_DRS_WRIT_REP, dest_dsa=None, - drs_error=drsuapi.DRSUAPI_EXOP_ERR_SUCCESS, + drs_error=DRSUAPI_EXOP_ERR_SUCCESS, exop=drsuapi.DRSUAPI_EXOP_REPL_OBJ, highwatermark=self.zero_highwatermark(), nc_dn_str=src_ou) @@ -715,6 +756,6 @@ class DrsReplicaLinkConflictTestCase(drs_base.DrsBaseTestCase): self.assertTrue(ctr6.linked_attributes_count == 1, "DRS didn't return a link") link = ctr6.linked_attributes[0] - self.assertTrue(link.meta_data.version == 1, - "Link version started from %u, not 1" % link.meta_data.version) - + rcvd_version = link.meta_data.version + self.assertTrue(rcvd_version == 1, + "Link version started from %u, not 1" % rcvd_version) -- 2.34.1