1b30e9fbb79737db062f1a2b62d1fc00e9b81238
[obnox/samba/samba-obnox.git] / lib / ldb-samba / tests / match_rules.py
1 #!/usr/bin/env python
2
3 import optparse
4 import sys
5 import os
6 import unittest
7 import samba
8 import samba.getopt as options
9
10 from samba.tests.subunitrun import SubunitOptions, TestProgram
11
12 from samba.tests import delete_force
13 from samba.dcerpc import security, misc
14 from samba.samdb import SamDB
15 from samba.auth import system_session
16 from samba.ndr import ndr_unpack
17 from ldb import Message, MessageElement, Dn, LdbError
18 from ldb import FLAG_MOD_ADD, FLAG_MOD_REPLACE, FLAG_MOD_DELETE
19 from ldb import SCOPE_BASE, SCOPE_SUBTREE
20
class MatchRulesTests(samba.tests.TestCase):
    """Tests for searches using the transitive matching rule.

    setUp() builds a small tree under ``ou=matchrulestest,<domain dn>``
    holding nested OUs, nested groups, users, computers and two
    msExchConfigurationContainer objects.  Each test compares a plain
    equality search with the same search qualified by the matching rule
    OID in TRANSITIVE_OID.  tearDown() deletes everything again.

    The class reads the module-level names ``lp``, ``creds`` and ``host``.
    """

    # OID of the extensible matching rule used for the "transitive match"
    # searches in the tests below.
    TRANSITIVE_OID = "1.2.840.113556.1.4.1941"

    # ------------------------------------------------------------------
    # DN / filter builders
    # ------------------------------------------------------------------

    def _user_dn(self, name):
        """Return the DN of user *name* inside the users OU."""
        return "cn=%s,%s" % (name, self.ou_users)

    def _group_dn(self, name):
        """Return the DN of group *name* inside the groups OU."""
        return "cn=%s,%s" % (name, self.ou_groups)

    def _computer_dn(self, name):
        """Return the DN of computer *name* inside the computers OU."""
        return "cn=%s,%s" % (name, self.ou_computers)

    def _nested_ou_dns(self):
        """Return the DNs of o1..o4, outermost first, each nested in the
        previous one below the test OU."""
        dns = []
        parent = self.ou
        for name in ("o1", "o2", "o3", "o4"):
            parent = "OU=%s,%s" % (name, parent)
            dns.append(parent)
        return dns

    def _other_wko(self, index, dn):
        """Return the otherWellKnownObjects value pointing at *dn*.

        The binary part is *index* zero-padded to 32 characters, e.g.
        ``B:32:00000000000000000000000000000001:<dn>`` for index 1.
        """
        return "B:32:%032d:%s" % (index, dn)

    def _transitive(self, attr, value):
        """Return the filter ``attr:<TRANSITIVE_OID>:=value``."""
        return "%s:%s:=%s" % (attr, self.TRANSITIVE_OID, value)

    # ------------------------------------------------------------------
    # Directory helpers
    # ------------------------------------------------------------------

    def _add_ou(self, dn):
        """Create an organizationalUnit at *dn*."""
        self.ldb.add({
            "dn": dn,
            "objectclass": "organizationalUnit"})

    def _add_simple(self, dn, objectclass):
        """Create an object at *dn* with only its objectclass set."""
        self.ldb.add({
            "dn": dn,
            "objectclass": objectclass})

    def _add_computer(self, name):
        """Create computer *name* in the computers OU."""
        self.ldb.add({
            "dn": self._computer_dn(name),
            "objectclass": "computer",
            "dNSHostName": "%s.%s" % (name, self.lp.get("realm").lower()),
            "servicePrincipalName": ["HOST/%s" % name],
            "sAMAccountName": "%s$" % name,
            "userAccountControl": "83890178"})

    def _modify(self, dn, attr, value, flag=FLAG_MOD_ADD):
        """Apply a single-attribute modification to the object at *dn*."""
        m = Message()
        m.dn = Dn(self.ldb, dn)
        m[attr] = MessageElement(value, flag, attr)
        self.ldb.modify(m)

    def _count(self, base, scope, expression):
        """Return how many entries the search matches.

        An LdbError raised by the search propagates to the caller.
        """
        return len(self.ldb.search(base, scope=scope, expression=expression))

    def _count_or_fail(self, base, scope, expression):
        """Like _count(), but report an LdbError as a test failure."""
        try:
            return self._count(base, scope, expression)
        except LdbError as err:
            self.fail(str(err))

    # ------------------------------------------------------------------
    # Fixture construction
    # ------------------------------------------------------------------

    def _create_nested_ous(self):
        # Add the following OU hierarchy and set otherWellKnownObjects,
        # which has BinaryDN syntax:
        #
        # o1
        # |--> o2
        # |    |--> o3
        # |    |    |-->o4
        chain = [self.ou] + self._nested_ou_dns()
        for dn in chain[1:]:
            self._add_ou(dn)

        # Each container points at its direct child: ou -> o1 -> ... -> o4
        for index in range(1, len(chain)):
            self._modify(chain[index - 1], "otherWellKnownObjects",
                         self._other_wko(index, chain[index]))

    def _create_principals(self):
        # Add four groups
        for name in ("g1", "g2", "g3", "g4"):
            self._add_simple(self._group_dn(name), "group")

        # Add four users
        for name in ("u1", "u2", "u3", "u4"):
            self._add_simple(self._user_dn(name), "user")

        # Add computers to test Object(DN-Binary) syntax
        for name in ("c1", "c2", "c3"):
            self._add_computer(name)

    def _create_group_memberships(self):
        # Create the following hierarchy:
        # g4
        # |--> u4
        # |--> g3
        # |    |--> u3
        # |    |--> g2
        # |    |    |--> u2
        # |    |    |--> g1
        # |    |    |    |--> u1
        memberships = [
            ("g1", self._user_dn("u1")),
            ("g2", self._user_dn("u2")),
            ("g3", self._user_dn("u3")),
            ("g4", self._user_dn("u4")),
            ("g4", self._group_dn("g3")),
            ("g3", self._group_dn("g2")),
            ("g2", self._group_dn("g1")),
        ]
        for group, member in memberships:
            self._modify(self._group_dn(group), "member", member)

    def _create_revealed_users_links(self):
        # The msDS-RevealedUsers is owned by system and cannot be modified
        # directly. Set the schemaUpgradeInProgress flag as workaround
        # and create this hierarchy:
        # ou=computers
        # |-> c1
        # |   |->c2
        # |   |  |->c3
        #
        # While appropriate for this test, this is NOT a good practice
        # in general.  This is only done here because the alternative
        # is to make a schema modification.
        #
        # IF/WHEN Samba protects this attribute better, this
        # particular part of the test can be removed, as the same code
        # is covered by the addressBookRoots2 case well enough.
        #
        self._modify("", "schemaUpgradeInProgress", "1", FLAG_MOD_REPLACE)
        try:
            self._modify(self._computer_dn("c2"), "msDS-RevealedUsers",
                         "B:8:01010101:%s" % self._computer_dn("c3"))
            self._modify(self._computer_dn("c1"), "msDS-RevealedUsers",
                         "B:8:01010101:%s" % self._computer_dn("c2"))
        finally:
            # Always clear the flag again, even when a modify above fails,
            # so a broken run does not leave it set on the server.
            self._modify("", "schemaUpgradeInProgress", "0", FLAG_MOD_REPLACE)

    def _create_one_way_links(self):
        # Add a couple of ms-Exch-Configuration-Container to test forward-link
        # attributes without backward link (addressBookRoots2)
        # e1
        # |--> e2
        # |    |--> c1
        e1 = "cn=e1,%s" % self.ou
        e2 = "cn=e2,%s" % self.ou
        self._add_simple(e1, "msExchConfigurationContainer")
        self._add_simple(e2, "msExchConfigurationContainer")

        self._modify(e2, "addressBookRoots2", self._computer_dn("c1"))
        self._modify(e1, "addressBookRoots2", e2)

    def setUp(self):
        """Connect to the directory and create the whole test tree."""
        super(MatchRulesTests, self).setUp()
        self.lp = lp
        self.ldb = SamDB(host, credentials=creds,
                         session_info=system_session(lp), lp=lp)
        self.base_dn = self.ldb.domain_dn()
        self.ou = "ou=matchrulestest,%s" % self.base_dn
        self.ou_users = "ou=users,%s" % self.ou
        self.ou_groups = "ou=groups,%s" % self.ou
        self.ou_computers = "ou=computers,%s" % self.ou

        # Add a organizational unit to create objects
        self._add_ou(self.ou)

        self._create_nested_ous()

        # Create OU for users and groups
        for container in (self.ou_users, self.ou_groups, self.ou_computers):
            self._add_ou(container)

        self._create_principals()
        self._create_group_memberships()
        self._create_revealed_users_links()
        self._create_one_way_links()

    def tearDown(self):
        """Delete every object created by setUp(), leaves before parents."""
        super(MatchRulesTests, self).tearDown()
        for name in ("u4", "u3", "u2", "u1"):
            delete_force(self.ldb, self._user_dn(name))
        for name in ("g4", "g3", "g2", "g1"):
            delete_force(self.ldb, self._group_dn(name))
        for name in ("c1", "c2", "c3"):
            delete_force(self.ldb, self._computer_dn(name))
        for container in (self.ou_users, self.ou_groups, self.ou_computers):
            delete_force(self.ldb, container)
        # Innermost nested OU first (o4, o3, o2, o1)
        for dn in reversed(self._nested_ou_dns()):
            delete_force(self.ldb, dn)
        delete_force(self.ldb, "CN=e2,%s" % self.ou)
        delete_force(self.ldb, "CN=e1,%s" % self.ou)
        delete_force(self.ldb, self.ou)

    # ------------------------------------------------------------------
    # Tests
    # ------------------------------------------------------------------

    def _check_nested_membership(self, member_dn):
        """Check *member_dn* is in g4 only via the transitive rule.

        Both directions are checked: ``member`` on g4 and ``memberOf`` on
        the member itself.
        """
        g4 = self._group_dn("g4")

        # Search without transitive match must return 0 results
        self.assertEqual(
            self._count(g4, SCOPE_BASE, "member=%s" % member_dn), 0)
        self.assertEqual(
            self._count(member_dn, SCOPE_BASE, "memberOf=%s" % g4), 0)

        # Search with transitive match must return 1 results
        self.assertEqual(
            self._count_or_fail(g4, SCOPE_BASE,
                                self._transitive("member", member_dn)), 1)
        self.assertEqual(
            self._count(member_dn, SCOPE_BASE,
                        self._transitive("memberOf", g4)), 1)

    def _check_user_groups(self, user, transitive_count):
        """Check *user* has one direct group and *transitive_count* groups
        when the transitive rule is used."""
        user_dn = self._user_dn(user)
        self.assertEqual(
            self._count(self.ou_groups, SCOPE_SUBTREE,
                        "member=%s" % user_dn), 1)
        self.assertEqual(
            self._count_or_fail(self.ou_groups, SCOPE_SUBTREE,
                                self._transitive("member", user_dn)),
            transitive_count)

    def test_u1_member_of_g4(self):
        self._check_nested_membership(self._user_dn("u1"))

    def test_g1_member_of_g4(self):
        self._check_nested_membership(self._group_dn("g1"))

    def test_u1_groups(self):
        self._check_user_groups("u1", 4)

    def test_u2_groups(self):
        self._check_user_groups("u2", 3)

    def test_u3_groups(self):
        self._check_user_groups("u3", 2)

    def test_u4_groups(self):
        self._check_user_groups("u4", 1)

    def test_extended_dn(self):
        """The <SID=...> and <GUID=...> DN forms work with both searches."""
        found = self.ldb.search(self._user_dn("u1"),
                                scope=SCOPE_BASE,
                                expression="objectClass=*",
                                attrs=['objectSid', 'objectGUID'])
        self.assertEqual(len(found), 1)

        sid = self.ldb.schema_format_value("objectSid",
                                           found[0]["objectSid"][0])
        guid = self.ldb.schema_format_value("objectGUID",
                                            found[0]['objectGUID'][0])
        sid_dn = "<SID=%s>" % sid
        guid_dn = "<GUID=%s>" % guid

        self.assertEqual(
            self._count(self.ou_groups, SCOPE_SUBTREE,
                        "member=%s" % sid_dn), 1)
        self.assertEqual(
            self._count(self.ou_groups, SCOPE_SUBTREE,
                        "member=%s" % guid_dn), 1)

        self.assertEqual(
            self._count_or_fail(self.ou_groups, SCOPE_SUBTREE,
                                self._transitive("member", sid_dn)), 4)
        self.assertEqual(
            self._count(self.ou_groups, SCOPE_SUBTREE,
                        self._transitive("member", guid_dn)), 4)

    def test_object_dn_binary(self):
        """msDS-RevealedUsers (DN-Binary values) is followed transitively."""
        value = "B:8:01010101:%s" % self._computer_dn("c3")
        self.assertEqual(
            self._count(self.ou_computers, SCOPE_SUBTREE,
                        "msDS-RevealedUsers=%s" % value), 1)
        self.assertEqual(
            self._count_or_fail(self.ou_computers, SCOPE_SUBTREE,
                                self._transitive("msDS-RevealedUsers",
                                                 value)), 2)

    def test_one_way_links(self):
        """addressBookRoots2 (forward link only) is followed transitively."""
        c1 = self._computer_dn("c1")
        self.assertEqual(
            self._count(self.ou, SCOPE_SUBTREE,
                        "addressBookRoots2=%s" % c1), 1)
        self.assertEqual(
            self._count_or_fail(self.ou, SCOPE_SUBTREE,
                                self._transitive("addressBookRoots2", c1)), 2)

    def test_not_linked_attrs(self):
        """Transitive searches on wellKnownObjects and
        otherWellKnownObjects must return nothing, although the plain
        equality searches match."""
        wko = ("B:32:aa312825768811d1aded00c04fd8d5cd:CN=computers,%s"
               % self.base_dn)
        self.assertEqual(
            self._count(self.base_dn, SCOPE_BASE,
                        "wellKnownObjects=%s" % wko), 1)
        self.assertEqual(
            self._count_or_fail(self.base_dn, SCOPE_BASE,
                                self._transitive("wellKnownObjects", wko)), 0)

        # Value stored on o3 by setUp(), pointing at the innermost OU o4
        other_wko = self._other_wko(4, self._nested_ou_dns()[-1])
        self.assertEqual(
            self._count(self.ou, SCOPE_SUBTREE,
                        "otherWellKnownObjects=%s" % other_wko), 1)
        self.assertEqual(
            self._count(self.ou, SCOPE_SUBTREE,
                        self._transitive("otherWellKnownObjects",
                                         other_wko)), 0)
485
# Command line handling: one positional <host> argument plus the Samba,
# version, credentials and subunit option groups.
parser = optparse.OptionParser("match_rules.py [options] <host>")
sambaopts = options.SambaOptions(parser)
parser.add_option_group(sambaopts)
parser.add_option_group(options.VersionOptions(parser))

# use command line creds if available
credopts = options.CredentialsOptions(parser)
parser.add_option_group(credopts)

# Every option group has to be registered before parse_args() runs;
# options added afterwards are unknown to the parser while it processes
# the command line.
subunitopts = SubunitOptions(parser)
parser.add_option_group(subunitopts)
opts, args = parser.parse_args()

if len(args) < 1:
    parser.print_usage()
    sys.exit(1)

host = args[0]

lp = sambaopts.get_loadparm()
creds = credopts.get_credentials(lp)

# A bare argument is treated as a local database file when such a file
# exists, otherwise as the name of an LDAP server.
if "://" not in host:
    if os.path.isfile(host):
        host = "tdb://%s" % host
    else:
        host = "ldap://%s" % host

TestProgram(module=__name__, opts=subunitopts)