8 import samba.getopt as options
10 from samba.tests.subunitrun import SubunitOptions, TestProgram
12 from samba.tests import delete_force
13 from samba.dcerpc import security, misc
14 from samba.samdb import SamDB
15 from samba.auth import system_session
16 from samba.ndr import ndr_unpack
17 from ldb import Message, MessageElement, Dn, LdbError
18 from ldb import FLAG_MOD_ADD, FLAG_MOD_REPLACE, FLAG_MOD_DELETE
19 from ldb import SCOPE_BASE, SCOPE_SUBTREE
21 class MatchRulesTests(samba.tests.TestCase):
23 super(MatchRulesTests, self).setUp()
25 self.ldb = SamDB(host, credentials=creds, session_info=system_session(lp), lp=lp)
26 self.base_dn = self.ldb.domain_dn()
27 self.ou = "ou=matchrulestest,%s" % self.base_dn
28 self.ou_users = "ou=users,%s" % self.ou
29 self.ou_groups = "ou=groups,%s" % self.ou
30 self.ou_computers = "ou=computers,%s" % self.ou
32 # Add a organizational unit to create objects
35 "objectclass": "organizationalUnit"})
37 # Add the following OU hierarchy and set otherWellKnownObjects,
38 # which has BinaryDN syntax:
46 "dn": "OU=o1,%s" % self.ou,
47 "objectclass": "organizationalUnit"})
49 "dn": "OU=o2,OU=o1,%s" % self.ou,
50 "objectclass": "organizationalUnit"})
52 "dn": "OU=o3,OU=o2,OU=o1,%s" % self.ou,
53 "objectclass": "organizationalUnit"})
55 "dn": "OU=o4,OU=o3,OU=o2,OU=o1,%s" % self.ou,
56 "objectclass": "organizationalUnit"})
59 m.dn = Dn(self.ldb, self.ou)
60 m["otherWellKnownObjects"] = MessageElement("B:32:00000000000000000000000000000001:OU=o1,%s" % self.ou,
61 FLAG_MOD_ADD, "otherWellKnownObjects")
65 m.dn = Dn(self.ldb, "OU=o1,%s" % self.ou)
66 m["otherWellKnownObjects"] = MessageElement("B:32:00000000000000000000000000000002:OU=o2,OU=o1,%s" % self.ou,
67 FLAG_MOD_ADD, "otherWellKnownObjects")
71 m.dn = Dn(self.ldb, "OU=o2,OU=o1,%s" % self.ou)
72 m["otherWellKnownObjects"] = MessageElement("B:32:00000000000000000000000000000003:OU=o3,OU=o2,OU=o1,%s" % self.ou,
73 FLAG_MOD_ADD, "otherWellKnownObjects")
77 m.dn = Dn(self.ldb, "OU=o3,OU=o2,OU=o1,%s" % self.ou)
78 m["otherWellKnownObjects"] = MessageElement("B:32:00000000000000000000000000000004:OU=o4,OU=o3,OU=o2,OU=o1,%s" % self.ou,
79 FLAG_MOD_ADD, "otherWellKnownObjects")
82 # Create OU for users and groups
85 "objectclass": "organizationalUnit"})
88 "objectclass": "organizationalUnit"})
90 "dn": self.ou_computers,
91 "objectclass": "organizationalUnit"})
95 "dn": "cn=g1,%s" % self.ou_groups,
96 "objectclass": "group" })
98 "dn": "cn=g2,%s" % self.ou_groups,
99 "objectclass": "group" })
101 "dn": "cn=g3,%s" % self.ou_groups,
102 "objectclass": "group" })
104 "dn": "cn=g4,%s" % self.ou_groups,
105 "objectclass": "group" })
109 "dn": "cn=u1,%s" % self.ou_users,
110 "objectclass": "user"})
112 "dn": "cn=u2,%s" % self.ou_users,
113 "objectclass": "user"})
115 "dn": "cn=u3,%s" % self.ou_users,
116 "objectclass": "user"})
118 "dn": "cn=u4,%s" % self.ou_users,
119 "objectclass": "user"})
121 # Add computers to test Object(DN-Binary) syntax
123 "dn": "cn=c1,%s" % self.ou_computers,
124 "objectclass": "computer",
125 "dNSHostName": "c1.%s" % self.lp.get("realm").lower(),
126 "servicePrincipalName": ["HOST/c1"],
127 "sAMAccountName": "c1$",
128 "userAccountControl": "83890178"})
131 "dn": "cn=c2,%s" % self.ou_computers,
132 "objectclass": "computer",
133 "dNSHostName": "c2.%s" % self.lp.get("realm").lower(),
134 "servicePrincipalName": ["HOST/c2"],
135 "sAMAccountName": "c2$",
136 "userAccountControl": "83890178"})
139 "dn": "cn=c3,%s" % self.ou_computers,
140 "objectclass": "computer",
141 "dNSHostName": "c3.%s" % self.lp.get("realm").lower(),
142 "servicePrincipalName": ["HOST/c3"],
143 "sAMAccountName": "c3$",
144 "userAccountControl": "83890178"})
146 # Create the following hierarchy:
158 m.dn = Dn(self.ldb, "cn=g1,%s" % self.ou_groups)
159 m["member"] = MessageElement("cn=u1,%s" % self.ou_users,
160 FLAG_MOD_ADD, "member")
165 m.dn = Dn(self.ldb, "cn=g2,%s" % self.ou_groups)
166 m["member"] = MessageElement("cn=u2,%s" % self.ou_users,
167 FLAG_MOD_ADD, "member")
172 m.dn = Dn(self.ldb, "cn=g3,%s" % self.ou_groups)
173 m["member"] = MessageElement("cn=u3,%s" % self.ou_users,
174 FLAG_MOD_ADD, "member")
179 m.dn = Dn(self.ldb, "cn=g4,%s" % self.ou_groups)
180 m["member"] = MessageElement("cn=u4,%s" % self.ou_users,
181 FLAG_MOD_ADD, "member")
186 m.dn = Dn(self.ldb, "cn=g4,%s" % self.ou_groups)
187 m["member"] = MessageElement("cn=g3,%s" % self.ou_groups,
188 FLAG_MOD_ADD, "member")
193 m.dn = Dn(self.ldb, "cn=g3,%s" % self.ou_groups)
194 m["member"] = MessageElement("cn=g2,%s" % self.ou_groups,
195 FLAG_MOD_ADD, "member")
200 m.dn = Dn(self.ldb, "cn=g2,%s" % self.ou_groups)
201 m["member"] = MessageElement("cn=g1,%s" % self.ou_groups,
202 FLAG_MOD_ADD, "member")
205 # The msDS-RevealedUsers is owned by system and cannot be modified
206 # directly. Set the schemaUpgradeInProgress flag as workaround
207 # and create this hierarchy:
214 # While appropriate for this test, this is NOT a good practice
215 # in general. This is only done here because the alternative
216 # is to make a schema modification.
218 # IF/WHEN Samba protects this attribute better, this
219 # particular part of the test can be removed, as the same code
220 # is covered by the addressBookRoots2 case well enough.
223 m.dn = Dn(self.ldb, "")
224 m["e1"] = MessageElement("1", FLAG_MOD_REPLACE, "schemaUpgradeInProgress")
228 m.dn = Dn(self.ldb, "cn=c2,%s" % self.ou_computers)
229 m["e1"] = MessageElement("B:8:01010101:cn=c3,%s" % self.ou_computers,
230 FLAG_MOD_ADD, "msDS-RevealedUsers")
234 m.dn = Dn(self.ldb, "cn=c1,%s" % self.ou_computers)
235 m["e1"] = MessageElement("B:8:01010101:cn=c2,%s" % self.ou_computers,
236 FLAG_MOD_ADD, "msDS-RevealedUsers")
240 m.dn = Dn(self.ldb, "")
241 m["e1"] = MessageElement("0", FLAG_MOD_REPLACE, "schemaUpgradeInProgress")
244 # Add a couple of ms-Exch-Configuration-Container to test forward-link
245 # attributes without backward link (addressBookRoots2)
250 "dn": "cn=e1,%s" % self.ou,
251 "objectclass": "msExchConfigurationContainer"})
253 "dn": "cn=e2,%s" % self.ou,
254 "objectclass": "msExchConfigurationContainer"})
257 m.dn = Dn(self.ldb, "cn=e2,%s" % self.ou)
258 m["e1"] = MessageElement("cn=c1,%s" % self.ou_computers,
259 FLAG_MOD_ADD, "addressBookRoots2")
263 m.dn = Dn(self.ldb, "cn=e1,%s" % self.ou)
264 m["e1"] = MessageElement("cn=e2,%s" % self.ou,
265 FLAG_MOD_ADD, "addressBookRoots2")
269 super(MatchRulesTests, self).tearDown()
270 delete_force(self.ldb, "cn=u4,%s" % self.ou_users)
271 delete_force(self.ldb, "cn=u3,%s" % self.ou_users)
272 delete_force(self.ldb, "cn=u2,%s" % self.ou_users)
273 delete_force(self.ldb, "cn=u1,%s" % self.ou_users)
274 delete_force(self.ldb, "cn=g4,%s" % self.ou_groups)
275 delete_force(self.ldb, "cn=g3,%s" % self.ou_groups)
276 delete_force(self.ldb, "cn=g2,%s" % self.ou_groups)
277 delete_force(self.ldb, "cn=g1,%s" % self.ou_groups)
278 delete_force(self.ldb, "cn=c1,%s" % self.ou_computers)
279 delete_force(self.ldb, "cn=c2,%s" % self.ou_computers)
280 delete_force(self.ldb, "cn=c3,%s" % self.ou_computers)
281 delete_force(self.ldb, self.ou_users)
282 delete_force(self.ldb, self.ou_groups)
283 delete_force(self.ldb, self.ou_computers)
284 delete_force(self.ldb, "OU=o4,OU=o3,OU=o2,OU=o1,%s" % self.ou)
285 delete_force(self.ldb, "OU=o3,OU=o2,OU=o1,%s" % self.ou)
286 delete_force(self.ldb, "OU=o2,OU=o1,%s" % self.ou)
287 delete_force(self.ldb, "OU=o1,%s" % self.ou)
288 delete_force(self.ldb, "CN=e2,%s" % self.ou)
289 delete_force(self.ldb, "CN=e1,%s" % self.ou)
290 delete_force(self.ldb, self.ou)
292 def test_u1_member_of_g4(self):
293 # Search without transitive match must return 0 results
294 res1 = self.ldb.search("cn=g4,%s" % self.ou_groups,
296 expression="member=cn=u1,%s" % self.ou_users)
297 self.assertTrue(len(res1) == 0)
299 res1 = self.ldb.search("cn=u1,%s" % self.ou_users,
301 expression="memberOf=cn=g4,%s" % self.ou_groups)
302 self.assertTrue(len(res1) == 0)
305 # Search with transitive match must return 1 results
306 res1 = self.ldb.search("cn=g4,%s" % self.ou_groups,
308 expression="member:1.2.840.113556.1.4.1941:=cn=u1,%s" % self.ou_users)
309 self.assertTrue(len(res1) == 1)
310 except LdbError, err:
313 res1 = self.ldb.search("cn=u1,%s" % self.ou_users,
315 expression="memberOf:1.2.840.113556.1.4.1941:=cn=g4,%s" % self.ou_groups)
316 self.assertTrue(len(res1) == 1)
318 def test_g1_member_of_g4(self):
319 # Search without transitive match must return 0 results
320 res1 = self.ldb.search("cn=g4,%s" % self.ou_groups,
322 expression="member=cn=g1,%s" % self.ou_groups)
323 self.assertTrue(len(res1) == 0)
325 res1 = self.ldb.search("cn=g1,%s" % self.ou_groups,
327 expression="memberOf=cn=g4,%s" % self.ou_groups)
328 self.assertTrue(len(res1) == 0)
331 # Search with transitive match must return 1 results
332 res1 = self.ldb.search("cn=g4,%s" % self.ou_groups,
334 expression="member:1.2.840.113556.1.4.1941:=cn=g1,%s" % self.ou_groups)
335 self.assertTrue(len(res1) == 1)
336 except LdbError, err:
339 res1 = self.ldb.search("cn=g1,%s" % self.ou_groups,
341 expression="memberOf:1.2.840.113556.1.4.1941:=cn=g4,%s" % self.ou_groups)
342 self.assertTrue(len(res1) == 1)
344 def test_u1_groups(self):
345 res1 = self.ldb.search(self.ou_groups,
347 expression="member=cn=u1,%s" % self.ou_users)
348 self.assertTrue(len(res1) == 1)
351 res1 = self.ldb.search(self.ou_groups,
353 expression="member:1.2.840.113556.1.4.1941:=cn=u1,%s" % self.ou_users)
354 self.assertTrue(len(res1) == 4)
355 except LdbError, err:
358 def test_u2_groups(self):
359 res1 = self.ldb.search(self.ou_groups,
361 expression="member=cn=u2,%s" % self.ou_users)
362 self.assertTrue(len(res1) == 1)
365 res1 = self.ldb.search(self.ou_groups,
367 expression="member:1.2.840.113556.1.4.1941:=cn=u2,%s" % self.ou_users)
368 self.assertTrue(len(res1) == 3)
369 except LdbError, err:
372 def test_u3_groups(self):
373 res1 = self.ldb.search(self.ou_groups,
375 expression="member=cn=u3,%s" % self.ou_users)
376 self.assertTrue(len(res1) == 1)
379 res1 = self.ldb.search(self.ou_groups,
381 expression="member:1.2.840.113556.1.4.1941:=cn=u3,%s" % self.ou_users)
382 self.assertTrue(len(res1) == 2)
383 except LdbError, err:
386 def test_u4_groups(self):
387 res1 = self.ldb.search(self.ou_groups,
389 expression="member=cn=u4,%s" % self.ou_users)
390 self.assertTrue(len(res1) == 1)
393 res1 = self.ldb.search(self.ou_groups,
395 expression="member:1.2.840.113556.1.4.1941:=cn=u4,%s" % self.ou_users)
396 self.assertTrue(len(res1) == 1)
397 except LdbError, err:
400 def test_extended_dn(self):
401 res1 = self.ldb.search("cn=u1,%s" % self.ou_users,
403 expression="objectClass=*",
404 attrs=['objectSid', 'objectGUID'])
405 self.assertTrue(len(res1) == 1)
407 sid = self.ldb.schema_format_value("objectSid", res1[0]["objectSid"][0])
408 guid = self.ldb.schema_format_value("objectGUID", res1[0]['objectGUID'][0])
410 res1 = self.ldb.search(self.ou_groups,
412 expression="member=<SID=%s>" % sid)
413 self.assertTrue(len(res1) == 1)
415 res1 = self.ldb.search(self.ou_groups,
417 expression="member=<GUID=%s>" % guid)
418 self.assertTrue(len(res1) == 1)
421 res1 = self.ldb.search(self.ou_groups,
423 expression="member:1.2.840.113556.1.4.1941:=<SID=%s>" % sid)
424 self.assertTrue(len(res1) == 4)
425 except LdbError, err:
428 res1 = self.ldb.search(self.ou_groups,
430 expression="member:1.2.840.113556.1.4.1941:=<GUID=%s>" % guid)
431 self.assertTrue(len(res1) == 4)
433 def test_object_dn_binary(self):
434 res1 = self.ldb.search(self.ou_computers,
436 expression="msDS-RevealedUsers=B:8:01010101:cn=c3,%s" % self.ou_computers)
437 self.assertTrue(len(res1) == 1)
440 res1 = self.ldb.search(self.ou_computers,
442 expression="msDS-RevealedUsers:1.2.840.113556.1.4.1941:=B:8:01010101:cn=c3,%s" % self.ou_computers)
443 self.assertTrue(len(res1) == 2)
444 except LdbError, err:
447 def test_one_way_links(self):
448 res1 = self.ldb.search(self.ou,
450 expression="addressBookRoots2=cn=c1,%s" % self.ou_computers)
451 self.assertTrue(len(res1) == 1)
454 res1 = self.ldb.search(self.ou,
456 expression="addressBookRoots2:1.2.840.113556.1.4.1941:=cn=c1,%s" % self.ou_computers)
457 self.assertTrue(len(res1) == 2)
458 except LdbError, err:
461 def test_not_linked_attrs(self):
462 res1 = self.ldb.search(self.base_dn,
464 expression="wellKnownObjects=B:32:aa312825768811d1aded00c04fd8d5cd:CN=computers,%s" % self.base_dn)
465 self.assertTrue(len(res1) == 1)
468 res1 = self.ldb.search(self.base_dn,
470 expression="wellKnownObjects:1.2.840.113556.1.4.1941:=B:32:aa312825768811d1aded00c04fd8d5cd:CN=computers,%s" % self.base_dn)
471 self.assertTrue(len(res1) == 0)
472 except LdbError, err:
476 res1 = self.ldb.search(self.ou,
478 expression="otherWellKnownObjects=B:32:00000000000000000000000000000004:OU=o4,OU=o3,OU=o2,OU=o1,%s" % self.ou)
479 self.assertTrue(len(res1) == 1)
481 res1 = self.ldb.search(self.ou,
483 expression="otherWellKnownObjects:1.2.840.113556.1.4.1941:=B:32:00000000000000000000000000000004:OU=o4,OU=o3,OU=o2,OU=o1,%s" % self.ou)
484 self.assertTrue(len(res1) == 0)
486 parser = optparse.OptionParser("match_rules.py [options] <host>")
487 sambaopts = options.SambaOptions(parser)
488 parser.add_option_group(sambaopts)
489 parser.add_option_group(options.VersionOptions(parser))
491 # use command line creds if available
492 credopts = options.CredentialsOptions(parser)
493 parser.add_option_group(credopts)
494 opts, args = parser.parse_args()
495 subunitopts = SubunitOptions(parser)
496 parser.add_option_group(subunitopts)
504 lp = sambaopts.get_loadparm()
505 creds = credopts.get_credentials(lp)
507 if not "://" in host:
508 if os.path.isfile(host):
509 host = "tdb://%s" % host
511 host = "ldap://%s" % host
513 TestProgram(module=__name__, opts=subunitopts)