import optparse
import sys
-import os
-
sys.path.insert(0, "bin/python")
import samba
samba.ensure_external_module("testtools", "testtools")
samba.ensure_external_module("subunit", "subunit/python")
-import samba.getopt as options
-
-from samba.auth import system_session
from ldb import (LdbError, ERR_NO_SUCH_OBJECT, Message,
MessageElement, Dn, FLAG_MOD_REPLACE)
-from samba.samdb import SamDB
import samba.tests
import samba.dsdb as dsdb
+import samba.getopt as options
from subunit.run import SubunitTestRunner
+
import unittest
parser = optparse.OptionParser("urgent_replication.py [options] <host>")
sambaopts = options.SambaOptions(parser)
parser.add_option_group(sambaopts)
parser.add_option_group(options.VersionOptions(parser))
+
# use command line creds if available
credopts = options.CredentialsOptions(parser)
parser.add_option_group(credopts)
host = args[0]
-lp = sambaopts.get_loadparm()
-creds = credopts.get_credentials(lp)
class UrgentReplicationTests(samba.tests.TestCase):
def setUp(self):
super(UrgentReplicationTests, self).setUp()
- self.ldb = ldb
- self.base_dn = ldb.domain_dn()
+ self.ldb = samba.tests.connect_samdb(host, global_schema=False)
+ self.base_dn = self.ldb.domain_dn()
print "baseDN: %s\n" % self.base_dn
def test_nonurgent_object(self):
- """Test if the urgent replication is not activated
- when handling a non urgent object"""
+ """Test if the urgent replication is not activated when handling a non urgent object."""
self.ldb.add({
"dn": "cn=nonurgenttest,cn=users," + self.base_dn,
"objectclass":"user",
"samaccountname":"nonurgenttest",
"description":"nonurgenttest description"})
- # urgent replication should not be enabled when creating
+ # urgent replication should not be enabled when creating
res = self.ldb.load_partition_usn(self.base_dn)
self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"])
# urgent replication should not be enabled when modifying
m = Message()
- m.dn = Dn(ldb, "cn=nonurgenttest,cn=users," + self.base_dn)
+ m.dn = Dn(self.ldb, "cn=nonurgenttest,cn=users," + self.base_dn)
m["description"] = MessageElement("new description", FLAG_MOD_REPLACE,
"description")
- ldb.modify(m)
+ self.ldb.modify(m)
res = self.ldb.load_partition_usn(self.base_dn)
self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"])
res = self.ldb.load_partition_usn(self.base_dn)
self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"])
-
def test_nTDSDSA_object(self):
- '''Test if the urgent replication is activated
- when handling a nTDSDSA object'''
+ """Test if the urgent replication is activated when handling a nTDSDSA object."""
self.ldb.add({
- "dn": "cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,%s" % self.ldb.get_config_basedn(),
+ "dn": "cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,%s" %
+ self.ldb.get_config_basedn(),
"objectclass":"server",
"cn":"test server",
"name":"test server",
# urgent replication should NOT be enabled when modifying
m = Message()
- m.dn = Dn(ldb, "cn=NTDS Settings test,cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self.base_dn)
+ m.dn = Dn(self.ldb, "cn=NTDS Settings test,cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self.base_dn)
m["options"] = MessageElement("0", FLAG_MOD_REPLACE,
"options")
- ldb.modify(m)
+ self.ldb.modify(m)
res = self.ldb.load_partition_usn("cn=Configuration," + self.base_dn)
self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"])
self.delete_force(self.ldb, "cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self.base_dn)
-
def test_crossRef_object(self):
- '''Test if the urgent replication is activated
- when handling a crossRef object'''
+ """Test if the urgent replication is activated when handling a crossRef object."""
self.ldb.add({
"dn": "CN=test crossRef,CN=Partitions,CN=Configuration,"+ self.base_dn,
"objectClass": "crossRef",
"cn": "test crossRef",
- "dnsRoot": lp.get("realm").lower(),
+ "dnsRoot": self.get_loadparm().get("realm").lower(),
"instanceType": "4",
"nCName": self.base_dn,
"showInAdvancedViewOnly": "TRUE",
# urgent replication should NOT be enabled when modifying
m = Message()
- m.dn = Dn(ldb, "cn=test crossRef,CN=Partitions,CN=Configuration," + self.base_dn)
+ m.dn = Dn(self.ldb, "cn=test crossRef,CN=Partitions,CN=Configuration," + self.base_dn)
m["systemFlags"] = MessageElement("0", FLAG_MOD_REPLACE,
"systemFlags")
- ldb.modify(m)
+ self.ldb.modify(m)
res = self.ldb.load_partition_usn("cn=Configuration," + self.base_dn)
self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"])
res = self.ldb.load_partition_usn("cn=Configuration," + self.base_dn)
self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
-
-
def test_attributeSchema_object(self):
- '''Test if the urgent replication is activated
- when handling an attributeSchema object'''
+ """Test if the urgent replication is activated when handling an attributeSchema object"""
try:
self.ldb.add_ldif(
except LdbError:
print "Not testing urgent replication when creating attributeSchema object ...\n"
- # urgent replication should be enabled when modifying
+ # urgent replication should be enabled when modifying
m = Message()
- m.dn = Dn(ldb, "CN=test attributeSchema,CN=Schema,CN=Configuration," + self.base_dn)
+ m.dn = Dn(self.ldb, "CN=test attributeSchema,CN=Schema,CN=Configuration," + self.base_dn)
m["lDAPDisplayName"] = MessageElement("updated test attributeSchema", FLAG_MOD_REPLACE,
"lDAPDisplayName")
- ldb.modify(m)
+ self.ldb.modify(m)
res = self.ldb.load_partition_usn("cn=Schema,cn=Configuration," + self.base_dn)
self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
-
def test_classSchema_object(self):
- '''Test if the urgent replication is activated
- when handling a classSchema object'''
+ """Test if the urgent replication is activated when handling a classSchema object."""
try:
self.ldb.add_ldif(
"""dn: CN=test classSchema,CN=Schema,CN=Configuration,%s""" % self.base_dn + """
# urgent replication should be enabled when modifying
m = Message()
- m.dn = Dn(ldb, "CN=test classSchema,CN=Schema,CN=Configuration," + self.base_dn)
+ m.dn = Dn(self.ldb, "CN=test classSchema,CN=Schema,CN=Configuration," + self.base_dn)
m["lDAPDisplayName"] = MessageElement("updated test classSchema", FLAG_MOD_REPLACE,
"lDAPDisplayName")
- ldb.modify(m)
+ self.ldb.modify(m)
res = self.ldb.load_partition_usn("cn=Schema,cn=Configuration," + self.base_dn)
self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
-
def test_secret_object(self):
- '''Test if the urgent replication is activated
- when handling a secret object'''
+ """Test if the urgent replication is activated when handling a secret object."""
self.ldb.add({
"dn": "cn=test secret,cn=System," + self.base_dn,
# urgent replication should be enabled when modifying
m = Message()
- m.dn = Dn(ldb, "cn=test secret,cn=System," + self.base_dn)
+ m.dn = Dn(self.ldb, "cn=test secret,cn=System," + self.base_dn)
m["currentValue"] = MessageElement("yyyyyyyy", FLAG_MOD_REPLACE,
"currentValue")
- ldb.modify(m)
+ self.ldb.modify(m)
res = self.ldb.load_partition_usn(self.base_dn)
self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
- # urgent replication should NOT be enabled when deleting
+ # urgent replication should NOT be enabled when deleting
self.delete_force(self.ldb, "cn=test secret,cn=System," + self.base_dn)
res = self.ldb.load_partition_usn(self.base_dn)
self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"])
-
def test_rIDManager_object(self):
- '''Test if the urgent replication is activated
- when handling a rIDManager object'''
+ """Test if the urgent replication is activated when handling a rIDManager object."""
self.ldb.add_ldif(
"""dn: CN=RID Manager test,CN=System,%s""" % self.base_dn + """
objectClass: rIDManager
# urgent replication should be enabled when modifying
m = Message()
- m.dn = Dn(ldb, "CN=RID Manager test,CN=System," + self.base_dn)
+ m.dn = Dn(self.ldb, "CN=RID Manager test,CN=System," + self.base_dn)
m["systemFlags"] = MessageElement("0", FLAG_MOD_REPLACE,
"systemFlags")
- ldb.modify(m)
+ self.ldb.modify(m)
res = self.ldb.load_partition_usn(self.base_dn)
self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
res = self.ldb.load_partition_usn(self.base_dn)
self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"])
-
def test_urgent_attributes(self):
- '''Test if the urgent replication is activated
- when handling urgent attributes of an object'''
+ """Test if the urgent replication is activated when handling urgent attributes of an object."""
self.ldb.add({
"dn": "cn=user UrgAttr test,cn=users," + self.base_dn,
res = self.ldb.load_partition_usn(self.base_dn)
self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"])
- # urgent replication should be enabled when modifying userAccountControl
+ # urgent replication should be enabled when modifying userAccountControl
m = Message()
- m.dn = Dn(ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
+ m.dn = Dn(self.ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
m["userAccountControl"] = MessageElement(str(dsdb.UF_NORMAL_ACCOUNT+dsdb.UF_SMARTCARD_REQUIRED), FLAG_MOD_REPLACE,
"userAccountControl")
- ldb.modify(m)
+ self.ldb.modify(m)
res = self.ldb.load_partition_usn(self.base_dn)
self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
# urgent replication should be enabled when modifying lockoutTime
m = Message()
- m.dn = Dn(ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
+ m.dn = Dn(self.ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
m["lockoutTime"] = MessageElement("1", FLAG_MOD_REPLACE,
"lockoutTime")
- ldb.modify(m)
+ self.ldb.modify(m)
res = self.ldb.load_partition_usn(self.base_dn)
self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
# urgent replication should be enabled when modifying pwdLastSet
m = Message()
- m.dn = Dn(ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
+ m.dn = Dn(self.ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
m["pwdLastSet"] = MessageElement("1", FLAG_MOD_REPLACE,
"pwdLastSet")
- ldb.modify(m)
+ self.ldb.modify(m)
res = self.ldb.load_partition_usn(self.base_dn)
self.assertEquals(res["uSNHighest"], res["uSNUrgent"])
# urgent replication should NOT be enabled when modifying a not-urgent
# attribute
m = Message()
- m.dn = Dn(ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
+ m.dn = Dn(self.ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
m["description"] = MessageElement("updated urgent attributes test description",
FLAG_MOD_REPLACE, "description")
- ldb.modify(m)
+ self.ldb.modify(m)
res = self.ldb.load_partition_usn(self.base_dn)
self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"])
self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"])
-if not "://" in host:
- if os.path.isfile(host):
- host = "tdb://%s" % host
- else:
- host = "ldap://%s" % host
-
-
-ldb = SamDB(host, credentials=creds, session_info=system_session(lp), lp=lp,
- global_schema=False)
-
runner = SubunitTestRunner()
rc = 0
+#
if not runner.run(unittest.makeSuite(UrgentReplicationTests)).wasSuccessful():
rc = 1
+
sys.exit(rc)