import ldb
import drs_base
-
+from samba.dcerpc import drsuapi, misc
+from samba.drs_utils import drs_DsBind
class DrsReplSchemaTestCase(drs_base.DrsBaseTestCase):
# current Class or Attribute object id
obj_id = 0
+ def _ds_bind(self, server_name):
+ binding_str = "ncacn_ip_tcp:%s[seal]" % server_name
+
+ drs = drsuapi.drsuapi(binding_str, self.get_loadparm(), self.get_credentials())
+ (drs_handle, supported_extensions) = drs_DsBind(drs)
+ return (drs, drs_handle)
+
+ def _exop_req8(self, dest_dsa, invocation_id, nc_dn_str, exop,
+ replica_flags=0, max_objects=0):
+ req8 = drsuapi.DsGetNCChangesRequest8()
+
+ req8.destination_dsa_guid = misc.GUID(dest_dsa) if dest_dsa else misc.GUID()
+ req8.source_dsa_invocation_id = misc.GUID(invocation_id)
+ req8.naming_context = drsuapi.DsReplicaObjectIdentifier()
+ req8.naming_context.dn = unicode(nc_dn_str)
+ req8.highwatermark = drsuapi.DsReplicaHighWaterMark()
+ req8.highwatermark.tmp_highest_usn = 0
+ req8.highwatermark.reserved_usn = 0
+ req8.highwatermark.highest_usn = 0
+ req8.uptodateness_vector = None
+ req8.replica_flags = replica_flags
+ req8.max_object_count = max_objects
+ req8.max_ndr_size = 402116
+ req8.extended_op = exop
+ req8.fsmo_info = 0
+ req8.partial_attribute_set = None
+ req8.partial_attribute_set_ex = None
+ req8.mapping_ctr.num_mappings = 0
+ req8.mapping_ctr.mappings = None
+
+ return req8
+
def setUp(self):
super(DrsReplSchemaTestCase, self).setUp()
self._check_object(c_dn)
self._check_object(a_dn)
+ def test_classWithCustomLinkAttribute(self):
+ """Create new Attribute and a Class,
+ that has value for newly created attribute.
+ This should check code path that searches for
+ AttributeID_id in Schema cache"""
+ # add new attributeSchema object
+ (a_ldn, a_dn) = self._schema_new_attr(self.ldb_dc1, "attr-Link-X", 1,
+ attrs={'linkID':"99990",
+ "attributeSyntax": "2.5.5.1",
+ "omSyntax": "127"})
+ # add a base classSchema class so we can use our new
+ # attribute in class definition in a sibling class
+ (c_ldn, c_dn) = self._schema_new_class(self.ldb_dc1, "cls-Link-Y", 7,
+ 1,
+ {"systemMayContain": a_ldn,
+ "subClassOf": "classSchema"})
+ # add new classSchema object with value for a_ldb attribute
+ (c_ldn, c_dn) = self._schema_new_class(self.ldb_dc1, "cls-Link-Z", 8,
+ 1,
+ {"objectClass": ["top", "classSchema", c_ldn],
+ a_ldn: self.schema_dn})
+ # force replication from DC1 to DC2
+ self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, nc_dn=self.schema_dn, forced=True)
+ # check objects are replicated
+ self._check_object(c_dn)
+ self._check_object(a_dn)
+
+ res = self.ldb_dc1.search(base=a_dn,
+ scope=SCOPE_BASE,
+ attrs=["msDS-IntId"])
+ self.assertEqual(1, len(res))
+ self.assertTrue("msDS-IntId" in res[0])
+ int_id = int(res[0]["msDS-IntId"][0])
+ if int_id < 0:
+ int_id += (1 << 32)
+
+ dc_guid_1 = self.ldb_dc1.get_invocation_id()
+
+ drs, drs_handle = self._ds_bind(self.dnsname_dc1)
+
+ req8 = self._exop_req8(dest_dsa=None,
+ invocation_id=dc_guid_1,
+ nc_dn_str=c_dn,
+ exop=drsuapi.DRSUAPI_EXOP_REPL_OBJ,
+ replica_flags=drsuapi.DRSUAPI_DRS_SYNC_FORCED)
+
+ (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
+
+ for link in ctr.linked_attributes:
+ self.assertTrue(link.attid != int_id,
+ 'Got %d for both' % link.attid)
+
def test_attribute(self):
"""Simple test for attributeSchema replication"""
# add new attributeSchema object