8 import samba.getopt as options
10 from samba.tests.subunitrun import SubunitOptions, TestProgram
12 from samba.tests import delete_force
13 from samba.dcerpc import security, misc
14 from samba.samdb import SamDB
15 from samba.auth import system_session
16 from samba.ndr import ndr_unpack
17 from ldb import Message, MessageElement, Dn
18 from ldb import FLAG_MOD_ADD, FLAG_MOD_REPLACE, FLAG_MOD_DELETE
19 from ldb import SCOPE_BASE, SCOPE_SUBTREE
21 class MatchRulesTests(samba.tests.TestCase):
23 super(MatchRulesTests, self).setUp()
25 self.ldb = SamDB(host, credentials=creds, session_info=system_session(lp), lp=lp)
26 self.base_dn = self.ldb.domain_dn()
27 self.ou = "ou=matchrulestest,%s" % self.base_dn
28 self.ou_users = "ou=users,%s" % self.ou
29 self.ou_groups = "ou=groups,%s" % self.ou
30 self.ou_computers = "ou=computers,%s" % self.ou
32 # Add a organizational unit to create objects
35 "objectclass": "organizationalUnit"})
37 # Add the following OU hierarchy and set otherWellKnownObjects,
38 # which has BinaryDN syntax:
46 "dn": "OU=o1,%s" % self.ou,
47 "objectclass": "organizationalUnit"})
49 "dn": "OU=o2,OU=o1,%s" % self.ou,
50 "objectclass": "organizationalUnit"})
52 "dn": "OU=o3,OU=o2,OU=o1,%s" % self.ou,
53 "objectclass": "organizationalUnit"})
55 "dn": "OU=o4,OU=o3,OU=o2,OU=o1,%s" % self.ou,
56 "objectclass": "organizationalUnit"})
59 m.dn = Dn(self.ldb, self.ou)
60 m["otherWellKnownObjects"] = MessageElement("B:32:00000000000000000000000000000001:OU=o1,%s" % self.ou,
61 FLAG_MOD_ADD, "otherWellKnownObjects")
65 m.dn = Dn(self.ldb, "OU=o1,%s" % self.ou)
66 m["otherWellKnownObjects"] = MessageElement("B:32:00000000000000000000000000000002:OU=o2,OU=o1,%s" % self.ou,
67 FLAG_MOD_ADD, "otherWellKnownObjects")
71 m.dn = Dn(self.ldb, "OU=o2,OU=o1,%s" % self.ou)
72 m["otherWellKnownObjects"] = MessageElement("B:32:00000000000000000000000000000003:OU=o3,OU=o2,OU=o1,%s" % self.ou,
73 FLAG_MOD_ADD, "otherWellKnownObjects")
77 m.dn = Dn(self.ldb, "OU=o3,OU=o2,OU=o1,%s" % self.ou)
78 m["otherWellKnownObjects"] = MessageElement("B:32:00000000000000000000000000000004:OU=o4,OU=o3,OU=o2,OU=o1,%s" % self.ou,
79 FLAG_MOD_ADD, "otherWellKnownObjects")
82 # Create OU for users and groups
85 "objectclass": "organizationalUnit"})
88 "objectclass": "organizationalUnit"})
90 "dn": self.ou_computers,
91 "objectclass": "organizationalUnit"})
95 "dn": "cn=g1,%s" % self.ou_groups,
96 "objectclass": "group" })
98 "dn": "cn=g2,%s" % self.ou_groups,
99 "objectclass": "group" })
101 "dn": "cn=g3,%s" % self.ou_groups,
102 "objectclass": "group" })
104 "dn": "cn=g4,%s" % self.ou_groups,
105 "objectclass": "group" })
109 "dn": "cn=u1,%s" % self.ou_users,
110 "objectclass": "user"})
112 "dn": "cn=u2,%s" % self.ou_users,
113 "objectclass": "user"})
115 "dn": "cn=u3,%s" % self.ou_users,
116 "objectclass": "user"})
118 "dn": "cn=u4,%s" % self.ou_users,
119 "objectclass": "user"})
121 # Add computers to test Object(DN-Binary) syntax
123 "dn": "cn=c1,%s" % self.ou_computers,
124 "objectclass": "computer",
125 "dNSHostName": "c1.%s" % self.lp.get("realm").lower(),
126 "servicePrincipalName": ["HOST/c1"],
127 "sAMAccountName": "c1$",
128 "userAccountControl": "83890178"})
131 "dn": "cn=c2,%s" % self.ou_computers,
132 "objectclass": "computer",
133 "dNSHostName": "c2.%s" % self.lp.get("realm").lower(),
134 "servicePrincipalName": ["HOST/c2"],
135 "sAMAccountName": "c2$",
136 "userAccountControl": "83890178"})
139 "dn": "cn=c3,%s" % self.ou_computers,
140 "objectclass": "computer",
141 "dNSHostName": "c3.%s" % self.lp.get("realm").lower(),
142 "servicePrincipalName": ["HOST/c3"],
143 "sAMAccountName": "c3$",
144 "userAccountControl": "83890178"})
146 # Create the following hierarchy:
158 m.dn = Dn(self.ldb, "cn=g1,%s" % self.ou_groups)
159 m["member"] = MessageElement("cn=u1,%s" % self.ou_users,
160 FLAG_MOD_ADD, "member")
165 m.dn = Dn(self.ldb, "cn=g2,%s" % self.ou_groups)
166 m["member"] = MessageElement("cn=u2,%s" % self.ou_users,
167 FLAG_MOD_ADD, "member")
172 m.dn = Dn(self.ldb, "cn=g3,%s" % self.ou_groups)
173 m["member"] = MessageElement("cn=u3,%s" % self.ou_users,
174 FLAG_MOD_ADD, "member")
179 m.dn = Dn(self.ldb, "cn=g4,%s" % self.ou_groups)
180 m["member"] = MessageElement("cn=u4,%s" % self.ou_users,
181 FLAG_MOD_ADD, "member")
186 m.dn = Dn(self.ldb, "cn=g4,%s" % self.ou_groups)
187 m["member"] = MessageElement("cn=g3,%s" % self.ou_groups,
188 FLAG_MOD_ADD, "member")
193 m.dn = Dn(self.ldb, "cn=g3,%s" % self.ou_groups)
194 m["member"] = MessageElement("cn=g2,%s" % self.ou_groups,
195 FLAG_MOD_ADD, "member")
200 m.dn = Dn(self.ldb, "cn=g2,%s" % self.ou_groups)
201 m["member"] = MessageElement("cn=g1,%s" % self.ou_groups,
202 FLAG_MOD_ADD, "member")
205 # The msDS-RevealedUsers is owned by system and cannot be modified
206 # directly. Set the schemaUpgradeInProgress flag as workaround
207 # and create this hierarchy:
214 # While appropriate for this test, this is NOT a good practice
215 # in general. This is only done here because the alternative
216 # is to make a schema modification.
218 # IF/WHEN Samba protects this attribute better, this
219 # particular part of the test can be removed, as the same code
220 # is covered by the addressBookRoots2 case well enough.
223 m.dn = Dn(self.ldb, "")
224 m["e1"] = MessageElement("1", FLAG_MOD_REPLACE, "schemaUpgradeInProgress")
228 m.dn = Dn(self.ldb, "cn=c2,%s" % self.ou_computers)
229 m["e1"] = MessageElement("B:8:01010101:cn=c3,%s" % self.ou_computers,
230 FLAG_MOD_ADD, "msDS-RevealedUsers")
234 m.dn = Dn(self.ldb, "cn=c1,%s" % self.ou_computers)
235 m["e1"] = MessageElement("B:8:01010101:cn=c2,%s" % self.ou_computers,
236 FLAG_MOD_ADD, "msDS-RevealedUsers")
240 m.dn = Dn(self.ldb, "")
241 m["e1"] = MessageElement("0", FLAG_MOD_REPLACE, "schemaUpgradeInProgress")
244 # Add a couple of ms-Exch-Configuration-Container to test forward-link
245 # attributes without backward link (addressBookRoots2)
250 "dn": "cn=e1,%s" % self.ou,
251 "objectclass": "msExchConfigurationContainer"})
253 "dn": "cn=e2,%s" % self.ou,
254 "objectclass": "msExchConfigurationContainer"})
257 m.dn = Dn(self.ldb, "cn=e2,%s" % self.ou)
258 m["e1"] = MessageElement("cn=c1,%s" % self.ou_computers,
259 FLAG_MOD_ADD, "addressBookRoots2")
263 m.dn = Dn(self.ldb, "cn=e1,%s" % self.ou)
264 m["e1"] = MessageElement("cn=e2,%s" % self.ou,
265 FLAG_MOD_ADD, "addressBookRoots2")
269 super(MatchRulesTests, self).tearDown()
270 delete_force(self.ldb, "cn=u4,%s" % self.ou_users)
271 delete_force(self.ldb, "cn=u3,%s" % self.ou_users)
272 delete_force(self.ldb, "cn=u2,%s" % self.ou_users)
273 delete_force(self.ldb, "cn=u1,%s" % self.ou_users)
274 delete_force(self.ldb, "cn=g4,%s" % self.ou_groups)
275 delete_force(self.ldb, "cn=g3,%s" % self.ou_groups)
276 delete_force(self.ldb, "cn=g2,%s" % self.ou_groups)
277 delete_force(self.ldb, "cn=g1,%s" % self.ou_groups)
278 delete_force(self.ldb, "cn=c1,%s" % self.ou_computers)
279 delete_force(self.ldb, "cn=c2,%s" % self.ou_computers)
280 delete_force(self.ldb, "cn=c3,%s" % self.ou_computers)
281 delete_force(self.ldb, self.ou_users)
282 delete_force(self.ldb, self.ou_groups)
283 delete_force(self.ldb, self.ou_computers)
284 delete_force(self.ldb, "OU=o4,OU=o3,OU=o2,OU=o1,%s" % self.ou)
285 delete_force(self.ldb, "OU=o3,OU=o2,OU=o1,%s" % self.ou)
286 delete_force(self.ldb, "OU=o2,OU=o1,%s" % self.ou)
287 delete_force(self.ldb, "OU=o1,%s" % self.ou)
288 delete_force(self.ldb, "CN=e2,%s" % self.ou)
289 delete_force(self.ldb, "CN=e1,%s" % self.ou)
290 delete_force(self.ldb, self.ou)
292 def test_u1_member_of_g4(self):
293 # Search without transitive match must return 0 results
294 res1 = self.ldb.search("cn=g4,%s" % self.ou_groups,
296 expression="member=cn=u1,%s" % self.ou_users)
297 self.assertTrue(len(res1) == 0)
299 res1 = self.ldb.search("cn=u1,%s" % self.ou_users,
301 expression="memberOf=cn=g4,%s" % self.ou_groups)
302 self.assertTrue(len(res1) == 0)
304 # Search with transitive match must return 1 results
305 res1 = self.ldb.search("cn=g4,%s" % self.ou_groups,
307 expression="member:1.2.840.113556.1.4.1941:=cn=u1,%s" % self.ou_users)
308 self.assertTrue(len(res1) == 1)
310 res1 = self.ldb.search("cn=u1,%s" % self.ou_users,
312 expression="memberOf:1.2.840.113556.1.4.1941:=cn=g4,%s" % self.ou_groups)
313 self.assertTrue(len(res1) == 1)
315 def test_g1_member_of_g4(self):
316 # Search without transitive match must return 0 results
317 res1 = self.ldb.search("cn=g4,%s" % self.ou_groups,
319 expression="member=cn=g1,%s" % self.ou_groups)
320 self.assertTrue(len(res1) == 0)
322 res1 = self.ldb.search("cn=g1,%s" % self.ou_groups,
324 expression="memberOf=cn=g4,%s" % self.ou_groups)
325 self.assertTrue(len(res1) == 0)
327 # Search with transitive match must return 1 results
328 res1 = self.ldb.search("cn=g4,%s" % self.ou_groups,
330 expression="member:1.2.840.113556.1.4.1941:=cn=g1,%s" % self.ou_groups)
331 self.assertTrue(len(res1) == 1)
333 res1 = self.ldb.search("cn=g1,%s" % self.ou_groups,
335 expression="memberOf:1.2.840.113556.1.4.1941:=cn=g4,%s" % self.ou_groups)
336 self.assertTrue(len(res1) == 1)
338 def test_u1_groups(self):
339 res1 = self.ldb.search(self.ou_groups,
341 expression="member=cn=u1,%s" % self.ou_users)
342 self.assertTrue(len(res1) == 1)
344 res1 = self.ldb.search(self.ou_groups,
346 expression="member:1.2.840.113556.1.4.1941:=cn=u1,%s" % self.ou_users)
347 self.assertTrue(len(res1) == 4)
349 def test_u2_groups(self):
350 res1 = self.ldb.search(self.ou_groups,
352 expression="member=cn=u2,%s" % self.ou_users)
353 self.assertTrue(len(res1) == 1)
355 res1 = self.ldb.search(self.ou_groups,
357 expression="member:1.2.840.113556.1.4.1941:=cn=u2,%s" % self.ou_users)
358 self.assertTrue(len(res1) == 3)
360 def test_u3_groups(self):
361 res1 = self.ldb.search(self.ou_groups,
363 expression="member=cn=u3,%s" % self.ou_users)
364 self.assertTrue(len(res1) == 1)
366 res1 = self.ldb.search(self.ou_groups,
368 expression="member:1.2.840.113556.1.4.1941:=cn=u3,%s" % self.ou_users)
369 self.assertTrue(len(res1) == 2)
371 def test_u4_groups(self):
372 res1 = self.ldb.search(self.ou_groups,
374 expression="member=cn=u4,%s" % self.ou_users)
375 self.assertTrue(len(res1) == 1)
377 res1 = self.ldb.search(self.ou_groups,
379 expression="member:1.2.840.113556.1.4.1941:=cn=u4,%s" % self.ou_users)
380 self.assertTrue(len(res1) == 1)
382 def test_extended_dn(self):
383 res1 = self.ldb.search("cn=u1,%s" % self.ou_users,
385 expression="objectClass=*",
386 attrs=['objectSid', 'objectGUID'])
387 self.assertTrue(len(res1) == 1)
389 sid = self.ldb.schema_format_value("objectSid", res1[0]["objectSid"][0])
390 guid = self.ldb.schema_format_value("objectGUID", res1[0]['objectGUID'][0])
392 res1 = self.ldb.search(self.ou_groups,
394 expression="member=<SID=%s>" % sid)
395 self.assertTrue(len(res1) == 1)
397 res1 = self.ldb.search(self.ou_groups,
399 expression="member=<GUID=%s>" % guid)
400 self.assertTrue(len(res1) == 1)
402 res1 = self.ldb.search(self.ou_groups,
404 expression="member:1.2.840.113556.1.4.1941:=<SID=%s>" % sid)
405 self.assertTrue(len(res1) == 4)
407 res1 = self.ldb.search(self.ou_groups,
409 expression="member:1.2.840.113556.1.4.1941:=<GUID=%s>" % guid)
410 self.assertTrue(len(res1) == 4)
412 def test_object_dn_binary(self):
413 res1 = self.ldb.search(self.ou_computers,
415 expression="msDS-RevealedUsers=B:8:01010101:cn=c3,%s" % self.ou_computers)
416 self.assertTrue(len(res1) == 1)
418 res1 = self.ldb.search(self.ou_computers,
420 expression="msDS-RevealedUsers:1.2.840.113556.1.4.1941:=B:8:01010101:cn=c3,%s" % self.ou_computers)
421 self.assertTrue(len(res1) == 2)
423 def test_one_way_links(self):
424 res1 = self.ldb.search(self.ou,
426 expression="addressBookRoots2=cn=c1,%s" % self.ou_computers)
427 self.assertTrue(len(res1) == 1)
429 res1 = self.ldb.search(self.ou,
431 expression="addressBookRoots2:1.2.840.113556.1.4.1941:=cn=c1,%s" % self.ou_computers)
432 self.assertTrue(len(res1) == 2)
434 def test_not_linked_attrs(self):
435 res1 = self.ldb.search(self.base_dn,
437 expression="wellKnownObjects=B:32:aa312825768811d1aded00c04fd8d5cd:CN=computers,%s" % self.base_dn)
438 self.assertTrue(len(res1) == 1)
440 res1 = self.ldb.search(self.base_dn,
442 expression="wellKnownObjects:1.2.840.113556.1.4.1941:=B:32:aa312825768811d1aded00c04fd8d5cd:CN=computers,%s" % self.base_dn)
443 self.assertTrue(len(res1) == 0)
446 res1 = self.ldb.search(self.ou,
448 expression="otherWellKnownObjects=B:32:00000000000000000000000000000004:OU=o4,OU=o3,OU=o2,OU=o1,%s" % self.ou)
449 self.assertTrue(len(res1) == 1)
451 res1 = self.ldb.search(self.ou,
453 expression="otherWellKnownObjects:1.2.840.113556.1.4.1941:=B:32:00000000000000000000000000000004:OU=o4,OU=o3,OU=o2,OU=o1,%s" % self.ou)
454 self.assertTrue(len(res1) == 0)
456 parser = optparse.OptionParser("match_rules.py [options] <host>")
457 sambaopts = options.SambaOptions(parser)
458 parser.add_option_group(sambaopts)
459 parser.add_option_group(options.VersionOptions(parser))
461 # use command line creds if available
462 credopts = options.CredentialsOptions(parser)
463 parser.add_option_group(credopts)
464 opts, args = parser.parse_args()
465 subunitopts = SubunitOptions(parser)
466 parser.add_option_group(subunitopts)
474 lp = sambaopts.get_loadparm()
475 creds = credopts.get_credentials(lp)
477 if not "://" in host:
478 if os.path.isfile(host):
479 host = "tdb://%s" % host
481 host = "ldap://%s" % host
483 TestProgram(module=__name__, opts=subunitopts)