04d31c2c63d557e7d667ca431389d0ed0470b966
[metze/samba/wip.git] / source4 / torture / drs / python / linked_attributes_drs.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3 # Originally based on ./sam.py
4 import sys
5 import os
6 import base64
7 import random
8 import re
9
10 sys.path.insert(0, "bin/python")
11 import samba
12 from samba.tests.subunitrun import SubunitOptions, TestProgram
13
14 import samba.getopt as options
15
16 from samba.auth import system_session
17 import ldb
18 from samba.samdb import SamDB
19 from samba.dcerpc import misc
20
21 from samba.dcerpc import drsuapi, misc, drsblobs
22 from samba.drs_utils import drs_DsBind
23 from samba.ndr import ndr_unpack, ndr_pack
24
25 import drs_base
26
27 import time
28
29
class LATestException(Exception):
    """Exception type declared for the linked attribute (LA) tests."""
32
class ExopBaseTest:
    """Mixin with helpers for issuing DsGetNCChanges extended operations.

    The class it is mixed into must supply get_loadparm() and
    get_credentials(), which _ds_bind() calls.
    """

    def _exop_req8(self, dest_dsa, invocation_id, nc_dn_str, exop,
                   replica_flags=0, max_objects=0):
        """Build a level 8 DsGetNCChanges request for an extended operation.

        dest_dsa -- GUID string of the destination DSA; a default-constructed
                    GUID is used when this is empty or None
        invocation_id -- GUID string stored as the source DSA invocation id
        nc_dn_str -- DN placed in the request's naming context
        exop -- value stored in the request's extended_op field
        replica_flags -- value stored in replica_flags
        max_objects -- value stored in max_object_count

        Returns the populated drsuapi.DsGetNCChangesRequest8.
        """
        request = drsuapi.DsGetNCChangesRequest8()

        if dest_dsa:
            request.destination_dsa_guid = misc.GUID(dest_dsa)
        else:
            request.destination_dsa_guid = misc.GUID()
        request.source_dsa_invocation_id = misc.GUID(invocation_id)

        request.naming_context = drsuapi.DsReplicaObjectIdentifier()
        request.naming_context.dn = unicode(nc_dn_str)

        # Every USN in the high water mark starts at zero.
        request.highwatermark = drsuapi.DsReplicaHighWaterMark()
        for usn_field in ('tmp_highest_usn', 'reserved_usn', 'highest_usn'):
            setattr(request.highwatermark, usn_field, 0)

        request.uptodateness_vector = None
        request.replica_flags = replica_flags
        request.max_object_count = max_objects
        request.max_ndr_size = 402116
        request.extended_op = exop
        request.fsmo_info = 0

        # No partial attribute sets and no prefix mappings are sent.
        request.partial_attribute_set = None
        request.partial_attribute_set_ex = None
        request.mapping_ctr.num_mappings = 0
        request.mapping_ctr.mappings = None

        return request

    def _ds_bind(self, server_name):
        """Open a sealed ncacn_ip_tcp DRSUAPI connection and bind to it.

        server_name -- host substituted into the binding string

        Returns a (connection, bind handle) tuple.
        """
        connection = drsuapi.drsuapi("ncacn_ip_tcp:%s[seal]" % server_name,
                                     self.get_loadparm(),
                                     self.get_credentials())
        # The second value returned by drs_DsBind is not needed here.
        bind_handle, _unused = drs_DsBind(connection)
        return (connection, bind_handle)
65
66     
67 class LATests(drs_base.DrsBaseTestCase, ExopBaseTest):
68
69     def setUp(self):
70         super(LATests, self).setUp()
71         # DrsBaseTestCase sets up self.ldb_dc1, self.ldb_dc2
72         # we're only using one
73         self.samdb = self.ldb_dc1
74         
75         self.base_dn = self.samdb.domain_dn()
76         self.ou = "OU=la,%s" % self.base_dn
77         if True:
78             try:
79                 self.samdb.delete(self.ou, ['tree_delete:1'])
80             except ldb.LdbError, e:
81                 pass
82         self.samdb.add({'objectclass': 'organizationalUnit',
83                         'dn': self.ou})
84
85         self.dc_guid = self.samdb.get_invocation_id()
86         self.drs, self.drs_handle = self._ds_bind(self.dnsname_dc1)
87
88     def tearDown(self):
89         super(LATests, self).tearDown()
90         try:
91             self.samdb.delete(self.ou, ['tree_delete:1'])
92         except ldb.LdbError, e:
93             pass
94
95     def delete_user(self, user):
96         self.samdb.delete(user['dn'])
97         del self.users[self.users.index(user)]
98
99     def add_object(self, cn, objectclass):
100         dn = "CN=%s,%s" % (cn, self.ou)
101         self.samdb.add({'cn': cn,
102                       'objectclass': objectclass,
103                       'dn': dn})
104
105         return dn
106
107     def add_objects(self, n, objectclass, prefix=None):
108         if prefix is None:
109             prefix = objectclass
110         dns = []
111         for i in range(n):
112             dns.append(self.add_object("%s%d" % (prefix, i + 1),
113                                        objectclass))
114         return dns
115
116     def add_linked_attribute(self, src, dest, attr='member'):
117         m = ldb.Message()
118         m.dn = ldb.Dn(self.samdb, src)
119         m[attr] = ldb.MessageElement(dest, ldb.FLAG_MOD_ADD, attr)
120         self.samdb.modify(m)
121
122     def remove_linked_attribute(self, src, dest, attr='member'):
123         m = ldb.Message()
124         m.dn = ldb.Dn(self.samdb, src)
125         m[attr] = ldb.MessageElement(dest, ldb.FLAG_MOD_DELETE, attr)
126         self.samdb.modify(m)
127
128     def attr_search(self, obj, expected, attr, scope=ldb.SCOPE_BASE):
129
130         req8 = self._exop_req8(dest_dsa=None,
131                                invocation_id=self.dc_guid,
132                                nc_dn_str=obj,
133                                exop=drsuapi.DRSUAPI_EXOP_REPL_OBJ)
134
135         level, ctr = self.drs.DsGetNCChanges(self.drs_handle, 8, req8)
136         expected_attid = getattr(drsuapi, 'DRSUAPI_ATTID_' + attr)
137
138         links = []
139         for link in ctr.linked_attributes:
140             if link.attid == expected_attid:
141                 unpacked = ndr_unpack(drsuapi.DsReplicaObjectIdentifier3,
142                                       link.value.blob)
143                 active = link.flags &  drsuapi.DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE
144                 links.append((str(unpacked.dn), bool(active)))
145
146         return links
147
148
149     def assert_forward_links(self, obj, expected, attr='member'):
150         results = self.attr_search(obj, expected, attr)
151         self.assertEqual(len(results), len(expected))
152
153         for k, v in results:
154             self.assertTrue(k in expected) 
155             self.assertEqual(expected[k], v, "%s active flag should be %d, not %d" %
156                              (k, expected[k], v))
157
158     def get_object_guid(self, dn):
159         res = self.samdb.search(dn,
160                                 scope=ldb.SCOPE_BASE,
161                                 attrs=['objectGUID'])
162         return str(misc.GUID(res[0]['objectGUID'][0]))
163
164     def test_links_all_delete_group(self):
165         u1, u2 = self.add_objects(2, 'user', 'u_all_del_group')
166         g1, g2 = self.add_objects(2, 'group', 'g_all_del_group')
167         g2guid = self.get_object_guid(g2)
168
169         self.add_linked_attribute(g1, u1)
170         self.add_linked_attribute(g2, u1)
171         self.add_linked_attribute(g2, u2)
172
173         self.samdb.delete(g2)
174         self.assert_forward_links(g1, {u1: True})
175         res = self.samdb.search('<GUID=%s>' % g2guid,
176                                 scope=ldb.SCOPE_BASE,
177                                 controls=['show_deleted:1'])
178         new_dn = res[0].dn
179         self.assert_forward_links(new_dn, {})
180
181
182     def test_la_links_delete_link(self):
183         u1, u2 = self.add_objects(2, 'user', 'u_del_link')
184         g1, g2 = self.add_objects(2, 'group', 'g_del_link')
185
186         self.add_linked_attribute(g1, u1)
187         self.add_linked_attribute(g2, u1)
188         self.add_linked_attribute(g2, u2)
189
190         self.remove_linked_attribute(g2, u1)
191
192         self.assert_forward_links(g1, {u1: True})
193         self.assert_forward_links(g2, {u1: False, u2: True})
194
195         self.add_linked_attribute(g2, u1)
196         self.remove_linked_attribute(g2, u2)
197         self.assert_forward_links(g2, {u1: True, u2: False})
198         self.remove_linked_attribute(g2, u1)
199         self.assert_forward_links(g2, {u1: False, u2: False})
200
201     def test_la_links_delete_user(self):
202         u1, u2 = self.add_objects(2, 'user', 'u_del_user')
203         g1, g2 = self.add_objects(2, 'group', 'g_del_user')
204
205         self.add_linked_attribute(g1, u1)
206         self.add_linked_attribute(g2, u1)
207         self.add_linked_attribute(g2, u2)
208
209         self.samdb.delete(u1)
210
211         self.assert_forward_links(g1, {})
212         self.assert_forward_links(g2, {u2: True})