s4-drs: Test situations for Urgent Replication
authorFernando J V da Silva <fernandojvsilva@yahoo.com.br>
Thu, 4 Feb 2010 19:03:41 +0000 (17:03 -0200)
committerAndrew Tridgell <tridge@samba.org>
Mon, 15 Feb 2010 10:57:08 +0000 (21:57 +1100)
Checks if the partition's uSNUrgent is updated or not, depending
on the class of the object which is created, modified or deleted.

Signed-off-by: Andrew Tridgell <tridge@samba.org>
source4/lib/ldb/tests/python/urgent_replication.py [new file with mode: 0644]
source4/selftest/tests.sh

diff --git a/source4/lib/ldb/tests/python/urgent_replication.py b/source4/lib/ldb/tests/python/urgent_replication.py
new file mode 100644 (file)
index 0000000..d0a63a1
--- /dev/null
@@ -0,0 +1,414 @@
+#!/usr/bin/python
+# -*- coding: utf-8 -*-
+# This is a port of the original in testprogs/ejs/ldap.js
+
+import getopt
+import optparse
+import sys
+import time
+import random
+import base64
+import os
+
+sys.path.append("bin/python")
+sys.path.append("../lib/subunit/python")
+
+import samba.getopt as options
+
+from samba.auth import system_session
+from ldb import SCOPE_BASE, LdbError
+from ldb import ERR_NO_SUCH_OBJECT
+from ldb import Message, MessageElement, Dn
+from ldb import FLAG_MOD_REPLACE
+from samba import Ldb
+from samba import glue
+
+from subunit.run import SubunitTestRunner
+import unittest
+
+from samba.ndr import ndr_pack, ndr_unpack
+from samba.dcerpc import security
+
+parser = optparse.OptionParser("ldap [options] <host>")
+sambaopts = options.SambaOptions(parser)
+parser.add_option_group(sambaopts)
+parser.add_option_group(options.VersionOptions(parser))
+# use command line creds if available
+credopts = options.CredentialsOptions(parser)
+parser.add_option_group(credopts)
+opts, args = parser.parse_args()
+
+if len(args) < 1:
+    parser.print_usage()
+    sys.exit(1)
+
+host = args[0]
+
+lp = sambaopts.get_loadparm()
+creds = credopts.get_credentials(lp)
+
+class UrgentReplicationTests(unittest.TestCase):
+
+    def delete_force(self, ldb, dn):
+        try:
+            ldb.delete(dn)
+        except LdbError, (num, _):
+            self.assertEquals(num, ERR_NO_SUCH_OBJECT)
+
+    def find_basedn(self, ldb):
+        res = ldb.search(base="", expression="", scope=SCOPE_BASE,
+                         attrs=["defaultNamingContext"])
+        self.assertEquals(len(res), 1)
+        return res[0]["defaultNamingContext"][0]
+
+    def find_configurationdn(self, ldb):
+        res = ldb.search(base="", expression="", scope=SCOPE_BASE, attrs=["configurationNamingContext"])
+        self.assertEquals(len(res), 1)
+        return res[0]["configurationNamingContext"][0]
+
+    def find_schemadn(self, ldb):
+        res = ldb.search(base="", expression="", scope=SCOPE_BASE, attrs=["schemaNamingContext"])
+        self.assertEquals(len(res), 1)
+        return res[0]["schemaNamingContext"][0]
+
+    def find_domain_sid(self):
+        res = self.ldb.search(base=self.base_dn, expression="(objectClass=*)", scope=SCOPE_BASE)
+        return ndr_unpack( security.dom_sid,res[0]["objectSid"][0])
+
+    def setUp(self):
+        self.ldb = ldb
+        self.gc_ldb = gc_ldb
+        self.base_dn = self.find_basedn(ldb)
+        self.configuration_dn = self.find_configurationdn(ldb)
+        self.schema_dn = self.find_schemadn(ldb)
+        self.domain_sid = self.find_domain_sid()
+
+        print "baseDN: %s\n" % self.base_dn
+
+    def test_nonurgent_object(self):
+        '''Test if the urgent replication is not activated
+           when handling a non urgent object'''
+        self.ldb.add({
+            "dn": "cn=nonurgenttest,cn=users," + self.base_dn,
+            "objectclass":"user",
+            "samaccountname":"nonurgenttest",
+            "description":"nonurgenttest description"});
+
+        ''' urgent replication shouldn't be enabled when creating '''
+        res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
+        self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
+
+        ''' urgent replication shouldn't be enabled when modifying '''
+        m = Message()
+        m.dn = Dn(ldb, "cn=nonurgenttest,cn=users," + self.base_dn)
+        m["description"] = MessageElement("new description", FLAG_MOD_REPLACE,
+          "description")
+        ldb.modify(m)
+        res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
+        self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
+
+        ''' urgent replication shouldn't be enabled when deleting '''
+        self.delete_force(self.ldb, "cn=nonurgenttest,cn=users," + self.base_dn)
+        res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
+        self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
+
+
+    def test_nTDSDSA_object(self):
+        '''Test if the urgent replication is activated
+           when handling a nTDSDSA object'''
+        self.ldb.add({
+            "dn": "cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self.base_dn,
+            "objectclass":"server",
+            "cn":"test server",
+            "name":"test server",
+            "systemFlags":"50000000"});
+
+        self.ldb.add_ldif(
+            """dn: cn=NTDS Settings test,cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration,%s""" % (self.base_dn) + """
+objectclass: nTDSDSA
+cn: NTDS Settings test
+options: 1
+instanceType: 4
+systemFlags: 33554432""", ["relax:0"]);
+
+        ''' urgent replication should be enabled when creation '''
+        res = glue.dsdb_load_partition_usn(self.ldb, "cn=Configuration," + self.base_dn)
+        self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
+
+        ''' urgent replication should NOT be enabled when modifying '''
+        m = Message()
+        m.dn = Dn(ldb, "cn=NTDS Settings test,cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self.base_dn)
+        m["options"] = MessageElement("0", FLAG_MOD_REPLACE,
+          "options")
+        ldb.modify(m)
+        res = glue.dsdb_load_partition_usn(self.ldb, "cn=Configuration," + self.base_dn)
+        self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
+
+        ''' urgent replication should be enabled when deleting '''
+        self.delete_force(self.ldb, "cn=NTDS Settings test,cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self.base_dn)
+        res = glue.dsdb_load_partition_usn(self.ldb, "cn=Configuration," + self.base_dn)
+        self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
+
+        self.delete_force(self.ldb, "cn=test server,cn=Servers,cn=Default-First-Site-Name,cn=Sites,cn=Configuration," + self.base_dn)
+
+
+    def test_crossRef_object(self):
+        '''Test if the urgent replication is activated
+           when handling a crossRef object'''
+        self.ldb.add({
+                      "dn": "CN=test crossRef,CN=Partitions,CN=Configuration,"+ self.base_dn,
+                      "objectClass": "crossRef",
+                      "cn": "test crossRef",
+                      "instanceType": "4",
+                      "nCName": self.base_dn,
+                      "showInAdvancedViewOnly": "TRUE",
+                      "name": "test crossRef",
+                      "systemFlags": "1"});
+
+        ''' urgent replication should be enabled when creating '''
+        res = glue.dsdb_load_partition_usn(self.ldb, "cn=Configuration," + self.base_dn)
+        self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
+
+        ''' urgent replication should NOT be enabled when modifying '''
+        m = Message()
+        m.dn = Dn(ldb, "cn=test crossRef,CN=Partitions,CN=Configuration," + self.base_dn)
+        m["systemFlags"] = MessageElement("0", FLAG_MOD_REPLACE,
+          "systemFlags")
+        ldb.modify(m)
+        res = glue.dsdb_load_partition_usn(self.ldb, "cn=Configuration," + self.base_dn)
+        self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
+
+
+        ''' urgent replication should be enabled when deleting '''
+        self.delete_force(self.ldb, "cn=test crossRef,CN=Partitions,CN=Configuration," + self.base_dn)
+        res = glue.dsdb_load_partition_usn(self.ldb, "cn=Configuration," + self.base_dn)
+        self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
+
+
+
+    def test_attributeSchema_object(self):
+        '''Test if the urgent replication is activated
+           when handling an attributeSchema object'''
+
+        try:
+            self.ldb.add_ldif(
+                              """dn: CN=test attributeSchema,cn=Schema,CN=Configuration,%s""" % self.base_dn + """
+objectClass: attributeSchema
+cn: test attributeSchema
+instanceType: 4
+isSingleValued: FALSE
+showInAdvancedViewOnly: FALSE
+attributeID: 0.9.2342.19200300.100.1.1
+attributeSyntax: 2.5.5.12
+adminDisplayName: test attributeSchema
+adminDescription: test attributeSchema
+oMSyntax: 64
+systemOnly: FALSE
+searchFlags: 8
+lDAPDisplayName: test attributeSchema
+name: test attributeSchema
+systemFlags: 0""", ["relax:0"]);
+
+            ''' urgent replication should be enabled when creating '''
+            res = glue.dsdb_load_partition_usn(self.ldb, "cn=Schema,cn=Configuration," + self.base_dn)
+            self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
+
+        except LdbError:
+            print "Not testing urgent replication when creating attributeSchema object ...\n"
+
+        ''' urgent replication should be enabled when modifying '''
+        m = Message()
+        m.dn = Dn(ldb, "CN=test attributeSchema,CN=Schema,CN=Configuration," + self.base_dn)
+        m["lDAPDisplayName"] = MessageElement("updated test attributeSchema", FLAG_MOD_REPLACE,
+          "lDAPDisplayName")
+        ldb.modify(m)
+        res = glue.dsdb_load_partition_usn(self.ldb, "cn=Schema,cn=Configuration," + self.base_dn)
+        self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
+
+
+    def test_classSchema_object(self):
+        '''Test if the urgent replication is activated
+           when handling a classSchema object'''
+        try:
+            self.ldb.add_ldif(
+                            """dn: CN=test classSchema,CN=Schema,CN=Configuration,%s""" % self.base_dn + """
+objectClass: classSchema
+cn: test classSchema
+instanceType: 4
+subClassOf: top
+governsID: 1.2.840.113556.1.5.999
+rDNAttID: cn
+showInAdvancedViewOnly: TRUE
+adminDisplayName: test classSchema
+adminDescription: test classSchema
+objectClassCategory: 1
+lDAPDisplayName: test classSchema
+name: test classSchema
+systemOnly: FALSE
+systemPossSuperiors: dfsConfiguration
+systemMustContain: msDFS-SchemaMajorVersion
+defaultSecurityDescriptor: D:(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)(A;;RPWPCRCCD
+ CLCLORCWOWDSDDTSW;;;SY)(A;;RPLCLORC;;;AU)(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;CO)
+systemFlags: 16
+defaultHidingValue: TRUE""", ["relax:0"]);
+
+            ''' urgent replication should be enabled when creating '''
+            res = glue.dsdb_load_partition_usn(self.ldb, "cn=Schema,cn=Configuration," + self.base_dn)
+            self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
+
+        except LdbError:
+            print "Not testing urgent replication when creating classSchema object ...\n"
+
+        ''' urgent replication should be enabled when modifying '''
+        m = Message()
+        m.dn = Dn(ldb, "CN=test classSchema,CN=Schema,CN=Configuration," + self.base_dn)
+        m["lDAPDisplayName"] = MessageElement("updated test classSchema", FLAG_MOD_REPLACE,
+          "lDAPDisplayName")
+        ldb.modify(m)
+        res = glue.dsdb_load_partition_usn(self.ldb, "cn=Schema,cn=Configuration," + self.base_dn)
+        self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
+
+
+    def test_secret_object(self):
+
+        '''Test if the urgent replication is activated
+           when handling a secret object'''
+
+        self.ldb.add({
+            "dn": "cn=test secret,cn=System," + self.base_dn,
+            "objectClass":"secret",
+            "cn":"test secret",
+            "name":"test secret",
+            "currentValue":"xxxxxxx"});
+
+
+        ''' urgent replication should be enabled when creationg '''
+        res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
+        self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
+
+        ''' urgent replication should be enabled when modifying '''
+        m = Message()
+        m.dn = Dn(ldb, "cn=test secret,cn=System," + self.base_dn)
+        m["currentValue"] = MessageElement("yyyyyyyy", FLAG_MOD_REPLACE,
+          "currentValue")
+        ldb.modify(m)
+        res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
+        self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
+
+        ''' urgent replication should NOT be enabled when deleting '''
+        self.delete_force(self.ldb, "cn=test secret,cn=System," + self.base_dn)
+        res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
+        self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
+
+
+    def test_rIDManager_object(self):
+        '''Test if the urgent replication is activated
+            when handling a rIDManager object'''
+        self.ldb.add_ldif(
+            """dn: CN=RID Manager test,CN=System,%s""" % self.base_dn + """
+objectClass: rIDManager
+cn: RID Manager test
+instanceType: 4
+showInAdvancedViewOnly: TRUE
+name: RID Manager test
+systemFlags: -1946157056
+isCriticalSystemObject: TRUE
+rIDAvailablePool: 133001-1073741823""", ["relax:0"])
+
+        ''' urgent replication should be enabled when creating '''
+        res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
+        self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
+
+        ''' urgent replication should be enabled when modifying '''
+        m = Message()
+        m.dn = Dn(ldb, "CN=RID Manager test,CN=System," + self.base_dn)
+        m["systemFlags"] = MessageElement("0", FLAG_MOD_REPLACE,
+          "systemFlags")
+        ldb.modify(m)
+        res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
+        self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
+
+        ''' urgent replication should NOT be enabled when deleting '''
+        self.delete_force(self.ldb, "CN=RID Manager test,CN=System," + self.base_dn)
+        res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
+        self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
+
+
+    def test_urgent_attributes(self):
+        '''Test if the urgent replication is activated
+            when handling urgent attributes of an object'''
+
+        self.ldb.add({
+            "dn": "cn=user UrgAttr test,cn=users," + self.base_dn,
+            "objectclass":"user",
+            "samaccountname":"user UrgAttr test",
+            "userAccountControl":"1",
+            "lockoutTime":"0",
+            "pwdLastSet":"0",
+            "description":"urgent attributes test description"});
+
+        ''' urgent replication should NOT be enabled when creating '''
+        res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
+        self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
+
+        ''' urgent replication should be enabled when modifying userAccountControl '''
+        m = Message()
+        m.dn = Dn(ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
+        m["userAccountControl"] = MessageElement("0", FLAG_MOD_REPLACE,
+          "userAccountControl")
+        ldb.modify(m)
+        res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
+        self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
+
+        ''' urgent replication should be enabled when modifying lockoutTime '''
+        m = Message()
+        m.dn = Dn(ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
+        m["lockoutTime"] = MessageElement("1", FLAG_MOD_REPLACE,
+          "lockoutTime")
+        ldb.modify(m)
+        res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
+        self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
+
+        ''' urgent replication should be enabled when modifying pwdLastSet '''
+        m = Message()
+        m.dn = Dn(ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
+        m["pwdLastSet"] = MessageElement("1", FLAG_MOD_REPLACE,
+          "pwdLastSet")
+        ldb.modify(m)
+        res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
+        self.assertEquals(res["uSNHighest"], res["uSNUrgent"]);
+
+        ''' urgent replication should NOT be enabled when modifying a not-urgent attribute '''
+        m = Message()
+        m.dn = Dn(ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
+        m["description"] = MessageElement("updated urgent attributes test description",
+                                          FLAG_MOD_REPLACE, "description")
+        ldb.modify(m)
+        res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
+        self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
+
+        ''' urgent replication should NOT be enabled when deleting '''
+        self.delete_force(self.ldb, "cn=user UrgAttr test,cn=users," + self.base_dn)
+        res = glue.dsdb_load_partition_usn(self.ldb, self.base_dn)
+        self.assertNotEquals(res["uSNHighest"], res["uSNUrgent"]);
+
+
+if not "://" in host:
+    if os.path.isfile(host):
+        host = "tdb://%s" % host
+    else:
+        host = "ldap://%s" % host
+
+
+ldb = Ldb(host, credentials=creds, session_info=system_session(), lp=lp)
+if not "tdb://" in host:
+    gc_ldb = Ldb("%s:3268" % host, credentials=creds,
+                 session_info=system_session(), lp=lp)
+else:
+    gc_ldb = None
+
+runner = SubunitTestRunner()
+rc = 0
+if not runner.run(unittest.makeSuite(UrgentReplicationTests)).wasSuccessful():
+    rc = 1
+sys.exit(rc)
index 904f1484a80d9e400c7e7d9ab365589b729293f8..d2813b58b3503d093af11d575febf66c4ac43c56 100755 (executable)
@@ -461,6 +461,7 @@ plantest "subunit.python" none $SUBUNITRUN subunit
 plantest "rpcecho.python" dc:local $SUBUNITRUN samba.tests.dcerpc.rpcecho
 plantest "winreg.python" dc:local $SUBUNITRUN -U\$USERNAME%\$PASSWORD samba.tests.dcerpc.registry
 plantest "ldap.python" dc PYTHONPATH="$PYTHONPATH:../lib/subunit/python" $PYTHON $samba4srcdir/lib/ldb/tests/python/ldap.py $CONFIGURATION \$SERVER -U\$USERNAME%\$PASSWORD -W \$DOMAIN
+plantest "urgent_replication.python" dc PYTHONPATH="$PYTHONPATH:../lib/subunit/python" $PYTHON $samba4srcdir/lib/ldb/tests/python/urgent_replication.py $CONFIGURATION \$SERVER -U\$USERNAME%\$PASSWORD -W \$DOMAIN
 plantest "ldap_schema.python" dc PYTHONPATH="$PYTHONPATH:../lib/subunit/python" $PYTHON $samba4srcdir/lib/ldb/tests/python/ldap_schema.py $CONFIGURATION \$SERVER -U\$USERNAME%\$PASSWORD -W \$DOMAIN
 plantest "ldap.possibleInferiors.python" dc $PYTHON $samba4srcdir/dsdb/samdb/ldb_modules/tests/possibleinferiors.py $CONFIGURATION ldap://\$SERVER -U\$USERNAME%\$PASSWORD -W \$DOMAIN
 plantest "ldap.secdesc.python" dc PYTHONPATH="$PYTHONPATH:../lib/subunit/python" $PYTHON $samba4srcdir/lib/ldb/tests/python/sec_descriptor.py $CONFIGURATION \$SERVER -U\$USERNAME%\$PASSWORD -W \$DOMAIN