from samba.dcerpc import drsuapi, misc, drsblobs
from samba.drs_utils import drs_DsBind
+from samba.ndr import ndr_unpack
+def _linked_attribute_compare(la1, la2):
+ """See CompareLinks() in MS-DRSR section 4.1.10.5.17"""
+ la1, la1_target = la1
+ la2, la2_target = la2
+
+ # Ascending host object GUID
+ c = cmp(la1.identifier.guid, la2.identifier.guid)
+ if c != 0:
+ return c
+
+ # Ascending attribute ID
+ if la1.attid != la2.attid:
+ return -1 if la1.attid < la2.attid else 1
+
+ la1_active = la1.flags & drsuapi.DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE
+ la2_active = la2.flags & drsuapi.DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE
+
+ # Ascending 'is present'
+ if la1_active != la2_active:
+ return 1 if la1_active else -1
+
+ # Ascending target object GUID
+ return cmp(la1_target, la2_target)
+
+class AbstractLink:
+ def __init__(self, attid, flags, identifier, targetGUID):
+ self.attid = attid
+ self.flags = flags
+ self.identifier = identifier
+ self.targetGUID = targetGUID
+
+ def __eq__(self, other):
+ return isinstance(other, AbstractLink) and \
+ ((self.attid, self.flags, self.identifier, self.targetGUID) ==
+ (other.attid, other.flags, other.identifier, other.targetGUID))
+
+ def __hash__(self):
+ return hash((self.attid, self.flags, self.identifier, self.targetGUID))
class DrsReplicaSyncTestCase(drs_base.DrsBaseTestCase):
"""Intended as a semi-black box test case for DsGetNCChanges
def _exop_req8(self, dest_dsa, invocation_id, nc_dn_str, exop,
replica_flags=0):
req8 = drsuapi.DsGetNCChangesRequest8()
-
- req8.destination_dsa_guid = misc.GUID(dest_dsa)
+
+ req8.destination_dsa_guid = misc.GUID(dest_dsa) if dest_dsa else misc.GUID()
req8.source_dsa_invocation_id = misc.GUID(invocation_id)
req8.naming_context = drsuapi.DsReplicaObjectIdentifier()
req8.naming_context.dn = unicode(nc_dn_str)
self.assertEqual(ctr6.drs_error[0], 0)
# We don't check the linked_attributes_count as if the domain
# has an RODC, it can gain links on the server account object
+
+ def test_sort_behaviour_single_object(self):
+ """Testing sorting behaviour on single objects"""
+ self.base_dn = self.ldb_dc1.get_default_basedn()
+ self.ou = "ou=sort_exop,%s" % self.base_dn
+ self.ldb_dc1.add({
+ "dn": self.ou,
+ "objectclass": "organizationalUnit"})
+
+ user1_dn = "cn=test_user1,%s" % self.ou
+ user2_dn = "cn=test_user2,%s" % self.ou
+ user3_dn = "cn=test_user3,%s" % self.ou
+ group_dn = "cn=test_group,%s" % self.ou
+
+ self.ldb_dc1.add({"dn": user1_dn, "objectclass": "user"})
+ self.ldb_dc1.add({"dn": user2_dn, "objectclass": "user"})
+ self.ldb_dc1.add({"dn": user3_dn, "objectclass": "user"})
+ self.ldb_dc1.add({"dn": group_dn, "objectclass": "group"})
+
+ u1_guid = str(misc.GUID(self.ldb_dc1.search(base=user1_dn,
+ attrs=["objectGUID"])[0]['objectGUID'][0]))
+ u2_guid = str(misc.GUID(self.ldb_dc1.search(base=user2_dn,
+ attrs=["objectGUID"])[0]['objectGUID'][0]))
+ u3_guid = str(misc.GUID(self.ldb_dc1.search(base=user3_dn,
+ attrs=["objectGUID"])[0]['objectGUID'][0]))
+ g_guid = str(misc.GUID(self.ldb_dc1.search(base=group_dn,
+ attrs=["objectGUID"])[0]['objectGUID'][0]))
+
+ self.add_linked_attribute(group_dn, user1_dn,
+ attr='member')
+ self.add_linked_attribute(group_dn, user2_dn,
+ attr='member')
+ self.add_linked_attribute(group_dn, user3_dn,
+ attr='member')
+ self.add_linked_attribute(group_dn, user1_dn,
+ attr='managedby')
+ self.add_linked_attribute(group_dn, user2_dn,
+ attr='nonSecurityMember')
+ self.add_linked_attribute(group_dn, user3_dn,
+ attr='nonSecurityMember')
+
+ set_inactive = AbstractLink(drsuapi.DRSUAPI_ATTID_nonSecurityMember,
+ drsuapi.DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE,
+ g_guid, u3_guid)
+
+ expected_links = set([set_inactive,
+ AbstractLink(drsuapi.DRSUAPI_ATTID_member,
+ drsuapi.DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE,
+ g_guid,
+ u1_guid),
+ AbstractLink(drsuapi.DRSUAPI_ATTID_member,
+ drsuapi.DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE,
+ g_guid,
+ u2_guid),
+ AbstractLink(drsuapi.DRSUAPI_ATTID_member,
+ drsuapi.DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE,
+ g_guid,
+ u3_guid),
+ AbstractLink(drsuapi.DRSUAPI_ATTID_managedBy,
+ drsuapi.DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE,
+ g_guid,
+ u1_guid),
+ AbstractLink(drsuapi.DRSUAPI_ATTID_nonSecurityMember,
+ drsuapi.DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE,
+ g_guid,
+ u2_guid),
+ ])
+
+ dc_guid_1 = self.ldb_dc1.get_invocation_id()
+
+ drs, drs_handle = self._ds_bind(self.dnsname_dc1)
+
+ req8 = self._exop_req8(dest_dsa=None,
+ invocation_id=dc_guid_1,
+ nc_dn_str=group_dn,
+ exop=drsuapi.DRSUAPI_EXOP_REPL_OBJ)
+
+ (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
+
+ no_inactive = []
+ for link in ctr.linked_attributes:
+ target_guid = ndr_unpack(drsuapi.DsReplicaObjectIdentifier3,
+ link.value.blob).guid
+ no_inactive.append((link, target_guid))
+ self.assertTrue(AbstractLink(link.attid, link.flags,
+ str(link.identifier.guid),
+ str(target_guid)) in expected_links)
+
+ no_inactive.sort(cmp=_linked_attribute_compare)
+
+ # assert the two arrays are the same
+ self.assertEqual([x[0] for x in no_inactive], ctr.linked_attributes)
+ self.assertEqual(len(expected_links), ctr.linked_attributes_count)
+
+ self.remove_linked_attribute(group_dn, user3_dn,
+ attr='nonSecurityMember')
+
+ # Set the link inactive
+ expected_links.remove(set_inactive)
+ set_inactive.flags = 0
+ expected_links.add(set_inactive)
+
+ has_inactive = []
+ (level, ctr) = drs.DsGetNCChanges(drs_handle, 8, req8)
+ for link in ctr.linked_attributes:
+ target_guid = ndr_unpack(drsuapi.DsReplicaObjectIdentifier3,
+ link.value.blob).guid
+ has_inactive.append((link, target_guid))
+ self.assertTrue(AbstractLink(link.attid, link.flags,
+ str(link.identifier.guid),
+ str(target_guid)) in expected_links)
+
+ has_inactive.sort(cmp=_linked_attribute_compare)
+
+ # assert the two arrays are the same
+ self.assertEqual([x[0] for x in has_inactive], ctr.linked_attributes)
+ self.assertEqual(len(expected_links), ctr.linked_attributes_count)
+
+ # tidyup groups and users
+ try:
+ self.ldb_dc1.delete(self.ou, ["tree_delete:1"])
+ except ldb.LdbError as (enum, string):
+ if enum == ldb.ERR_NO_SUCH_OBJECT:
+ pass
+
+ def add_linked_attribute(self, src, dest, attr='member'):
+ m = ldb.Message()
+ m.dn = ldb.Dn(self.ldb_dc1, src)
+ m[attr] = ldb.MessageElement(dest, ldb.FLAG_MOD_ADD, attr)
+ self.ldb_dc1.modify(m)
+
+ def remove_linked_attribute(self, src, dest, attr='member'):
+ m = ldb.Message()
+ m.dn = ldb.Dn(self.ldb_dc1, src)
+ m[attr] = ldb.MessageElement(dest, ldb.FLAG_MOD_DELETE, attr)
+ self.ldb_dc1.modify(m)