8 import samba.getopt as options
10 from samba.tests.subunitrun import SubunitOptions, TestProgram
12 from samba.tests import delete_force
13 from samba.dcerpc import security, misc
14 from samba.samdb import SamDB
15 from samba.auth import system_session
16 from samba.ndr import ndr_unpack
17 from ldb import Message, MessageElement, Dn
18 from ldb import FLAG_MOD_ADD, FLAG_MOD_REPLACE, FLAG_MOD_DELETE
19 from ldb import SCOPE_BASE, SCOPE_SUBTREE
21 class MatchRulesTests(samba.tests.TestCase):
23 super(MatchRulesTests, self).setUp()
25 self.ldb = SamDB(host, credentials=creds, session_info=system_session(lp), lp=lp)
26 self.base_dn = self.ldb.domain_dn()
27 self.ou = "ou=matchrulestest,%s" % self.base_dn
28 self.ou_users = "ou=users,%s" % self.ou
29 self.ou_groups = "ou=groups,%s" % self.ou
30 self.ou_computers = "ou=computers,%s" % self.ou
32 # Add an organizational unit to create objects
35 "objectclass": "organizationalUnit"})
37 # Add the following OU hierarchy and set otherWellKnownObjects,
38 # which has BinaryDN syntax:
46 "dn": "OU=o1,%s" % self.ou,
47 "objectclass": "organizationalUnit"})
49 "dn": "OU=o2,OU=o1,%s" % self.ou,
50 "objectclass": "organizationalUnit"})
52 "dn": "OU=o3,OU=o2,OU=o1,%s" % self.ou,
53 "objectclass": "organizationalUnit"})
55 "dn": "OU=o4,OU=o3,OU=o2,OU=o1,%s" % self.ou,
56 "objectclass": "organizationalUnit"})
59 m.dn = Dn(self.ldb, self.ou)
60 m["otherWellKnownObjects"] = MessageElement("B:32:00000000000000000000000000000001:OU=o1,%s" % self.ou,
61 FLAG_MOD_ADD, "otherWellKnownObjects")
65 m.dn = Dn(self.ldb, "OU=o1,%s" % self.ou)
66 m["otherWellKnownObjects"] = MessageElement("B:32:00000000000000000000000000000002:OU=o2,OU=o1,%s" % self.ou,
67 FLAG_MOD_ADD, "otherWellKnownObjects")
71 m.dn = Dn(self.ldb, "OU=o2,OU=o1,%s" % self.ou)
72 m["otherWellKnownObjects"] = MessageElement("B:32:00000000000000000000000000000003:OU=o3,OU=o2,OU=o1,%s" % self.ou,
73 FLAG_MOD_ADD, "otherWellKnownObjects")
77 m.dn = Dn(self.ldb, "OU=o3,OU=o2,OU=o1,%s" % self.ou)
78 m["otherWellKnownObjects"] = MessageElement("B:32:00000000000000000000000000000004:OU=o4,OU=o3,OU=o2,OU=o1,%s" % self.ou,
79 FLAG_MOD_ADD, "otherWellKnownObjects")
82 # Create OU for users and groups
85 "objectclass": "organizationalUnit"})
88 "objectclass": "organizationalUnit"})
90 "dn": self.ou_computers,
91 "objectclass": "organizationalUnit"})
95 "dn": "cn=g1,%s" % self.ou_groups,
96 "objectclass": "group" })
98 "dn": "cn=g2,%s" % self.ou_groups,
99 "objectclass": "group" })
101 "dn": "cn=g3,%s" % self.ou_groups,
102 "objectclass": "group" })
104 "dn": "cn=g4,%s" % self.ou_groups,
105 "objectclass": "group" })
109 "dn": "cn=u1,%s" % self.ou_users,
110 "objectclass": "user"})
112 "dn": "cn=u2,%s" % self.ou_users,
113 "objectclass": "user"})
115 "dn": "cn=u3,%s" % self.ou_users,
116 "objectclass": "user"})
118 "dn": "cn=u4,%s" % self.ou_users,
119 "objectclass": "user"})
121 # Add computers to test Object(DN-Binary) syntax
123 "dn": "cn=c1,%s" % self.ou_computers,
124 "objectclass": "computer",
125 "dNSHostName": "c1.%s" % self.lp.get("realm").lower(),
126 "servicePrincipalName": ["HOST/c1"],
127 "sAMAccountName": "c1$",
128 "userAccountControl": "83890178"})
131 "dn": "cn=c2,%s" % self.ou_computers,
132 "objectclass": "computer",
133 "dNSHostName": "c2.%s" % self.lp.get("realm").lower(),
134 "servicePrincipalName": ["HOST/c2"],
135 "sAMAccountName": "c2$",
136 "userAccountControl": "83890178"})
139 "dn": "cn=c3,%s" % self.ou_computers,
140 "objectclass": "computer",
141 "dNSHostName": "c3.%s" % self.lp.get("realm").lower(),
142 "servicePrincipalName": ["HOST/c3"],
143 "sAMAccountName": "c3$",
144 "userAccountControl": "83890178"})
146 # Create the following hierarchy:
158 m.dn = Dn(self.ldb, "cn=g1,%s" % self.ou_groups)
159 m["member"] = MessageElement("cn=u1,%s" % self.ou_users,
160 FLAG_MOD_ADD, "member")
165 m.dn = Dn(self.ldb, "cn=g2,%s" % self.ou_groups)
166 m["member"] = MessageElement("cn=u2,%s" % self.ou_users,
167 FLAG_MOD_ADD, "member")
172 m.dn = Dn(self.ldb, "cn=g3,%s" % self.ou_groups)
173 m["member"] = MessageElement("cn=u3,%s" % self.ou_users,
174 FLAG_MOD_ADD, "member")
179 m.dn = Dn(self.ldb, "cn=g4,%s" % self.ou_groups)
180 m["member"] = MessageElement("cn=u4,%s" % self.ou_users,
181 FLAG_MOD_ADD, "member")
186 m.dn = Dn(self.ldb, "cn=g4,%s" % self.ou_groups)
187 m["member"] = MessageElement("cn=g3,%s" % self.ou_groups,
188 FLAG_MOD_ADD, "member")
193 m.dn = Dn(self.ldb, "cn=g3,%s" % self.ou_groups)
194 m["member"] = MessageElement("cn=g2,%s" % self.ou_groups,
195 FLAG_MOD_ADD, "member")
200 m.dn = Dn(self.ldb, "cn=g2,%s" % self.ou_groups)
201 m["member"] = MessageElement("cn=g1,%s" % self.ou_groups,
202 FLAG_MOD_ADD, "member")
205 # The msDS-RevealedUsers is owned by system and cannot be modified
206 # directly. Set the schemaUpgradeInProgress flag as workaround
207 # and create this hierarchy:
213 m.dn = Dn(self.ldb, "")
214 m["e1"] = MessageElement("1", FLAG_MOD_REPLACE, "schemaUpgradeInProgress")
218 m.dn = Dn(self.ldb, "cn=c2,%s" % self.ou_computers)
219 m["e1"] = MessageElement("B:8:01010101:cn=c3,%s" % self.ou_computers,
220 FLAG_MOD_ADD, "msDS-RevealedUsers")
224 m.dn = Dn(self.ldb, "cn=c1,%s" % self.ou_computers)
225 m["e1"] = MessageElement("B:8:01010101:cn=c2,%s" % self.ou_computers,
226 FLAG_MOD_ADD, "msDS-RevealedUsers")
230 m.dn = Dn(self.ldb, "")
231 m["e1"] = MessageElement("0", FLAG_MOD_REPLACE, "schemaUpgradeInProgress")
234 # Add a couple of ms-Exch-Configuration-Container to test forward-link
235 # attributes without backward link (addressBookRoots2)
240 "dn": "cn=e1,%s" % self.ou,
241 "objectclass": "msExchConfigurationContainer"})
243 "dn": "cn=e2,%s" % self.ou,
244 "objectclass": "msExchConfigurationContainer"})
247 m.dn = Dn(self.ldb, "cn=e2,%s" % self.ou)
248 m["e1"] = MessageElement("cn=c1,%s" % self.ou_computers,
249 FLAG_MOD_ADD, "addressBookRoots2")
253 m.dn = Dn(self.ldb, "cn=e1,%s" % self.ou)
254 m["e1"] = MessageElement("cn=e2,%s" % self.ou,
255 FLAG_MOD_ADD, "addressBookRoots2")
259 super(MatchRulesTests, self).tearDown()
260 delete_force(self.ldb, "cn=u4,%s" % self.ou_users)
261 delete_force(self.ldb, "cn=u3,%s" % self.ou_users)
262 delete_force(self.ldb, "cn=u2,%s" % self.ou_users)
263 delete_force(self.ldb, "cn=u1,%s" % self.ou_users)
264 delete_force(self.ldb, "cn=g4,%s" % self.ou_groups)
265 delete_force(self.ldb, "cn=g3,%s" % self.ou_groups)
266 delete_force(self.ldb, "cn=g2,%s" % self.ou_groups)
267 delete_force(self.ldb, "cn=g1,%s" % self.ou_groups)
268 delete_force(self.ldb, "cn=c1,%s" % self.ou_computers)
269 delete_force(self.ldb, "cn=c2,%s" % self.ou_computers)
270 delete_force(self.ldb, "cn=c3,%s" % self.ou_computers)
271 delete_force(self.ldb, self.ou_users)
272 delete_force(self.ldb, self.ou_groups)
273 delete_force(self.ldb, self.ou_computers)
274 delete_force(self.ldb, "OU=o4,OU=o3,OU=o2,OU=o1,%s" % self.ou)
275 delete_force(self.ldb, "OU=o3,OU=o2,OU=o1,%s" % self.ou)
276 delete_force(self.ldb, "OU=o2,OU=o1,%s" % self.ou)
277 delete_force(self.ldb, "OU=o1,%s" % self.ou)
278 delete_force(self.ldb, "CN=e2,%s" % self.ou)
279 delete_force(self.ldb, "CN=e1,%s" % self.ou)
280 delete_force(self.ldb, self.ou)
def test_u1_member_of_g4(self):
    """Check u1's membership of g4 with and without the transitive match.

    Plain ``member``/``memberOf`` equality filters are expected to find
    nothing, while the same filters using the 1.2.840.113556.1.4.1941
    matching rule are expected to find exactly one entry each.
    """
    g4_dn = "cn=g4,%s" % self.ou_groups
    u1_dn = "cn=u1,%s" % self.ou_users

    # Search without transitive match must return 0 results
    res1 = self.ldb.search(g4_dn,
                           expression="member=cn=u1,%s" % self.ou_users)
    # assertEqual reports both counts on failure, unlike assertTrue(a == b)
    self.assertEqual(len(res1), 0)

    res1 = self.ldb.search(u1_dn,
                           expression="memberOf=cn=g4,%s" % self.ou_groups)
    self.assertEqual(len(res1), 0)

    # Search with transitive match must return 1 result
    res1 = self.ldb.search(g4_dn,
                           expression="member:1.2.840.113556.1.4.1941:=cn=u1,%s" % self.ou_users)
    self.assertEqual(len(res1), 1)

    res1 = self.ldb.search(u1_dn,
                           expression="memberOf:1.2.840.113556.1.4.1941:=cn=g4,%s" % self.ou_groups)
    self.assertEqual(len(res1), 1)
def test_g1_member_of_g4(self):
    """Check g1's membership of g4 with and without the transitive match.

    Plain ``member``/``memberOf`` equality filters are expected to find
    nothing, while the same filters using the 1.2.840.113556.1.4.1941
    matching rule are expected to find exactly one entry each.
    """
    g4_dn = "cn=g4,%s" % self.ou_groups
    g1_dn = "cn=g1,%s" % self.ou_groups

    # Search without transitive match must return 0 results
    res1 = self.ldb.search(g4_dn,
                           expression="member=cn=g1,%s" % self.ou_groups)
    # assertEqual reports both counts on failure, unlike assertTrue(a == b)
    self.assertEqual(len(res1), 0)

    res1 = self.ldb.search(g1_dn,
                           expression="memberOf=cn=g4,%s" % self.ou_groups)
    self.assertEqual(len(res1), 0)

    # Search with transitive match must return 1 result
    res1 = self.ldb.search(g4_dn,
                           expression="member:1.2.840.113556.1.4.1941:=cn=g1,%s" % self.ou_groups)
    self.assertEqual(len(res1), 1)

    res1 = self.ldb.search(g1_dn,
                           expression="memberOf:1.2.840.113556.1.4.1941:=cn=g4,%s" % self.ou_groups)
    self.assertEqual(len(res1), 1)
def test_u1_groups(self):
    """Count the groups under the groups OU that list u1 as a member.

    The plain ``member`` filter is expected to match one group; the
    transitive match (1.2.840.113556.1.4.1941) is expected to match four.
    """
    res1 = self.ldb.search(self.ou_groups,
                           expression="member=cn=u1,%s" % self.ou_users)
    # assertEqual reports both counts on failure, unlike assertTrue(a == b)
    self.assertEqual(len(res1), 1)

    res1 = self.ldb.search(self.ou_groups,
                           expression="member:1.2.840.113556.1.4.1941:=cn=u1,%s" % self.ou_users)
    self.assertEqual(len(res1), 4)
def test_u2_groups(self):
    """Count the groups under the groups OU that list u2 as a member.

    The plain ``member`` filter is expected to match one group; the
    transitive match (1.2.840.113556.1.4.1941) is expected to match three.
    """
    res1 = self.ldb.search(self.ou_groups,
                           expression="member=cn=u2,%s" % self.ou_users)
    # assertEqual reports both counts on failure, unlike assertTrue(a == b)
    self.assertEqual(len(res1), 1)

    res1 = self.ldb.search(self.ou_groups,
                           expression="member:1.2.840.113556.1.4.1941:=cn=u2,%s" % self.ou_users)
    self.assertEqual(len(res1), 3)
def test_u3_groups(self):
    """Count the groups under the groups OU that list u3 as a member.

    The plain ``member`` filter is expected to match one group; the
    transitive match (1.2.840.113556.1.4.1941) is expected to match two.
    """
    res1 = self.ldb.search(self.ou_groups,
                           expression="member=cn=u3,%s" % self.ou_users)
    # assertEqual reports both counts on failure, unlike assertTrue(a == b)
    self.assertEqual(len(res1), 1)

    res1 = self.ldb.search(self.ou_groups,
                           expression="member:1.2.840.113556.1.4.1941:=cn=u3,%s" % self.ou_users)
    self.assertEqual(len(res1), 2)
def test_u4_groups(self):
    """Count the groups under the groups OU that list u4 as a member.

    Both the plain ``member`` filter and the transitive match
    (1.2.840.113556.1.4.1941) are expected to match exactly one group.
    """
    res1 = self.ldb.search(self.ou_groups,
                           expression="member=cn=u4,%s" % self.ou_users)
    # assertEqual reports both counts on failure, unlike assertTrue(a == b)
    self.assertEqual(len(res1), 1)

    res1 = self.ldb.search(self.ou_groups,
                           expression="member:1.2.840.113556.1.4.1941:=cn=u4,%s" % self.ou_users)
    self.assertEqual(len(res1), 1)
def test_extended_dn(self):
    """Check member searches that identify u1 by <SID=...> and <GUID=...>.

    u1's objectSid and objectGUID are read back and formatted with
    ``schema_format_value``; each is then used as the filter value in a
    plain ``member`` search (one group expected) and in a transitive
    match search (four groups expected) under the groups OU.
    """
    res1 = self.ldb.search("cn=u1,%s" % self.ou_users,
                           expression="objectClass=*",
                           attrs=['objectSid', 'objectGUID'])
    # assertEqual reports both counts on failure, unlike assertTrue(a == b)
    self.assertEqual(len(res1), 1)

    sid = self.ldb.schema_format_value("objectSid", res1[0]["objectSid"][0])
    guid = self.ldb.schema_format_value("objectGUID", res1[0]['objectGUID'][0])

    # (filter, expected number of matching groups), in the original order
    cases = [
        ("member=<SID=%s>" % sid, 1),
        ("member=<GUID=%s>" % guid, 1),
        ("member:1.2.840.113556.1.4.1941:=<SID=%s>" % sid, 4),
        ("member:1.2.840.113556.1.4.1941:=<GUID=%s>" % guid, 4),
    ]
    for expression, expected in cases:
        res1 = self.ldb.search(self.ou_groups, expression=expression)
        # Name the filter in the message so a failure identifies the case
        self.assertEqual(len(res1), expected,
                         "unexpected result count for %r" % expression)
def test_object_dn_binary(self):
    """Check searches on msDS-RevealedUsers using a B:8:...:<dn> value.

    Under the computers OU, the plain filter for the value pointing at c3
    is expected to match one entry; the transitive match
    (1.2.840.113556.1.4.1941) is expected to match two.
    """
    res1 = self.ldb.search(self.ou_computers,
                           expression="msDS-RevealedUsers=B:8:01010101:cn=c3,%s" % self.ou_computers)
    # assertEqual reports both counts on failure, unlike assertTrue(a == b)
    self.assertEqual(len(res1), 1)

    res1 = self.ldb.search(self.ou_computers,
                           expression="msDS-RevealedUsers:1.2.840.113556.1.4.1941:=B:8:01010101:cn=c3,%s" % self.ou_computers)
    self.assertEqual(len(res1), 2)
def test_one_way_links(self):
    """Check searches on addressBookRoots2 for the value pointing at c1.

    Under the test OU, the plain filter is expected to match one entry;
    the transitive match (1.2.840.113556.1.4.1941) is expected to match
    two.
    """
    res1 = self.ldb.search(self.ou,
                           expression="addressBookRoots2=cn=c1,%s" % self.ou_computers)
    # assertEqual reports both counts on failure, unlike assertTrue(a == b)
    self.assertEqual(len(res1), 1)

    res1 = self.ldb.search(self.ou,
                           expression="addressBookRoots2:1.2.840.113556.1.4.1941:=cn=c1,%s" % self.ou_computers)
    self.assertEqual(len(res1), 2)
def test_not_linked_attrs(self):
    """Check that the transitive match finds nothing on (other)WellKnownObjects.

    For ``wellKnownObjects`` (searched from the domain DN) and
    ``otherWellKnownObjects`` (searched from the test OU), the plain
    equality filter is expected to match one entry, while the same value
    with the 1.2.840.113556.1.4.1941 matching rule is expected to match
    none.
    """
    res1 = self.ldb.search(self.base_dn,
                           expression="wellKnownObjects=B:32:aa312825768811d1aded00c04fd8d5cd:CN=computers,%s" % self.base_dn)
    # assertEqual reports both counts on failure, unlike assertTrue(a == b)
    self.assertEqual(len(res1), 1)

    res1 = self.ldb.search(self.base_dn,
                           expression="wellKnownObjects:1.2.840.113556.1.4.1941:=B:32:aa312825768811d1aded00c04fd8d5cd:CN=computers,%s" % self.base_dn)
    self.assertEqual(len(res1), 0)

    res1 = self.ldb.search(self.ou,
                           expression="otherWellKnownObjects=B:32:00000000000000000000000000000004:OU=o4,OU=o3,OU=o2,OU=o1,%s" % self.ou)
    self.assertEqual(len(res1), 1)

    res1 = self.ldb.search(self.ou,
                           expression="otherWellKnownObjects:1.2.840.113556.1.4.1941:=B:32:00000000000000000000000000000004:OU=o4,OU=o3,OU=o2,OU=o1,%s" % self.ou)
    self.assertEqual(len(res1), 0)
# Build the command-line parser from the Samba, version, credentials and
# subunit option groups.
parser = optparse.OptionParser("match_rules.py [options] <host>")
sambaopts = options.SambaOptions(parser)
parser.add_option_group(sambaopts)
parser.add_option_group(options.VersionOptions(parser))

# use command line creds if available
credopts = options.CredentialsOptions(parser)
parser.add_option_group(credopts)
subunitopts = SubunitOptions(parser)
parser.add_option_group(subunitopts)

# Parse only after every option group has been created and registered:
# parsing earlier means the subunit options are not yet known to the parser
# when the command line is read.
opts, args = parser.parse_args()

# Load the Samba configuration and derive the credentials from it.
lp = sambaopts.get_loadparm()
creds = credopts.get_credentials(lp)
467 if not "://" in host:
468 if os.path.isfile(host):
469 host = "tdb://%s" % host
471 host = "ldap://%s" % host
473 TestProgram(module=__name__, opts=subunitopts)