2 # -*- coding: utf-8 -*-
3 # Originally based on ./sam.py
10 sys.path.insert(0, "bin/python")
12 from samba.tests.subunitrun import SubunitOptions, TestProgram
14 import samba.getopt as options
16 from samba.auth import system_session
18 from samba.samdb import SamDB
19 from samba.dcerpc import misc
21 from samba.dcerpc import drsuapi, misc, drsblobs
22 from samba.drs_utils import drs_DsBind
23 from samba.ndr import ndr_unpack, ndr_pack
30 class LATestException(Exception):
34 def _exop_req8(self, dest_dsa, invocation_id, nc_dn_str, exop,
35 replica_flags=0, max_objects=0):
36 req8 = drsuapi.DsGetNCChangesRequest8()
38 req8.destination_dsa_guid = misc.GUID(dest_dsa) if dest_dsa else misc.GUID()
39 req8.source_dsa_invocation_id = misc.GUID(invocation_id)
40 req8.naming_context = drsuapi.DsReplicaObjectIdentifier()
41 req8.naming_context.dn = unicode(nc_dn_str)
42 req8.highwatermark = drsuapi.DsReplicaHighWaterMark()
43 req8.highwatermark.tmp_highest_usn = 0
44 req8.highwatermark.reserved_usn = 0
45 req8.highwatermark.highest_usn = 0
46 req8.uptodateness_vector = None
47 req8.replica_flags = replica_flags
48 req8.max_object_count = max_objects
49 req8.max_ndr_size = 402116
50 req8.extended_op = exop
52 req8.partial_attribute_set = None
53 req8.partial_attribute_set_ex = None
54 req8.mapping_ctr.num_mappings = 0
55 req8.mapping_ctr.mappings = None
59 def _ds_bind(self, server_name):
60 binding_str = "ncacn_ip_tcp:%s[seal]" % server_name
62 drs = drsuapi.drsuapi(binding_str, self.get_loadparm(), self.get_credentials())
63 (drs_handle, supported_extensions) = drs_DsBind(drs)
64 return (drs, drs_handle)
67 class LATests(drs_base.DrsBaseTestCase, ExopBaseTest):
70 super(LATests, self).setUp()
71 # DrsBaseTestCase sets up self.ldb_dc1, self.ldb_dc2
72 # we're only using one
73 self.samdb = self.ldb_dc1
75 self.base_dn = self.samdb.domain_dn()
76 self.ou = "OU=la,%s" % self.base_dn
79 self.samdb.delete(self.ou, ['tree_delete:1'])
80 except ldb.LdbError, e:
82 self.samdb.add({'objectclass': 'organizationalUnit',
85 self.dc_guid = self.samdb.get_invocation_id()
86 self.drs, self.drs_handle = self._ds_bind(self.dnsname_dc1)
89 super(LATests, self).tearDown()
91 self.samdb.delete(self.ou, ['tree_delete:1'])
92 except ldb.LdbError, e:
95 def delete_user(self, user):
96 self.samdb.delete(user['dn'])
97 del self.users[self.users.index(user)]
99 def add_object(self, cn, objectclass):
100 dn = "CN=%s,%s" % (cn, self.ou)
101 self.samdb.add({'cn': cn,
102 'objectclass': objectclass,
107 def add_objects(self, n, objectclass, prefix=None):
112 dns.append(self.add_object("%s%d" % (prefix, i + 1),
116 def add_linked_attribute(self, src, dest, attr='member'):
118 m.dn = ldb.Dn(self.samdb, src)
119 m[attr] = ldb.MessageElement(dest, ldb.FLAG_MOD_ADD, attr)
122 def remove_linked_attribute(self, src, dest, attr='member'):
124 m.dn = ldb.Dn(self.samdb, src)
125 m[attr] = ldb.MessageElement(dest, ldb.FLAG_MOD_DELETE, attr)
128 def attr_search(self, obj, expected, attr, scope=ldb.SCOPE_BASE):
130 req8 = self._exop_req8(dest_dsa=None,
131 invocation_id=self.dc_guid,
133 exop=drsuapi.DRSUAPI_EXOP_REPL_OBJ)
135 level, ctr = self.drs.DsGetNCChanges(self.drs_handle, 8, req8)
136 expected_attid = getattr(drsuapi, 'DRSUAPI_ATTID_' + attr)
139 for link in ctr.linked_attributes:
140 if link.attid == expected_attid:
141 unpacked = ndr_unpack(drsuapi.DsReplicaObjectIdentifier3,
143 active = link.flags & drsuapi.DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE
144 links.append((str(unpacked.dn), bool(active)))
149 def assert_forward_links(self, obj, expected, attr='member'):
150 results = self.attr_search(obj, expected, attr)
151 self.assertEqual(len(results), len(expected))
154 self.assertTrue(k in expected)
155 self.assertEqual(expected[k], v, "%s active flag should be %d, not %d" %
158 def get_object_guid(self, dn):
159 res = self.samdb.search(dn,
160 scope=ldb.SCOPE_BASE,
161 attrs=['objectGUID'])
162 return str(misc.GUID(res[0]['objectGUID'][0]))
164 def test_links_all_delete_group(self):
165 u1, u2 = self.add_objects(2, 'user', 'u_all_del_group')
166 g1, g2 = self.add_objects(2, 'group', 'g_all_del_group')
167 g2guid = self.get_object_guid(g2)
169 self.add_linked_attribute(g1, u1)
170 self.add_linked_attribute(g2, u1)
171 self.add_linked_attribute(g2, u2)
173 self.samdb.delete(g2)
174 self.assert_forward_links(g1, {u1: True})
175 res = self.samdb.search('<GUID=%s>' % g2guid,
176 scope=ldb.SCOPE_BASE,
177 controls=['show_deleted:1'])
179 self.assert_forward_links(new_dn, {})
182 def test_la_links_delete_link(self):
183 u1, u2 = self.add_objects(2, 'user', 'u_del_link')
184 g1, g2 = self.add_objects(2, 'group', 'g_del_link')
186 self.add_linked_attribute(g1, u1)
187 self.add_linked_attribute(g2, u1)
188 self.add_linked_attribute(g2, u2)
190 self.remove_linked_attribute(g2, u1)
192 self.assert_forward_links(g1, {u1: True})
193 self.assert_forward_links(g2, {u1: False, u2: True})
195 self.add_linked_attribute(g2, u1)
196 self.remove_linked_attribute(g2, u2)
197 self.assert_forward_links(g2, {u1: True, u2: False})
198 self.remove_linked_attribute(g2, u1)
199 self.assert_forward_links(g2, {u1: False, u2: False})
201 def test_la_links_delete_user(self):
202 u1, u2 = self.add_objects(2, 'user', 'u_del_user')
203 g1, g2 = self.add_objects(2, 'group', 'g_del_user')
205 self.add_linked_attribute(g1, u1)
206 self.add_linked_attribute(g2, u1)
207 self.add_linked_attribute(g2, u2)
209 self.samdb.delete(u1)
211 self.assert_forward_links(g1, {})
212 self.assert_forward_links(g2, {u2: True})