# Unit tests for the notification control
# Copyright (C) Stefan Metzmacher 2016
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import optparse
import os
import sys

# Must run before the samba imports so the in-tree bindings are found.
sys.path.insert(0, "bin/python")
import samba
import samba.tests

from samba.tests.subunitrun import SubunitOptions, TestProgram

import samba.getopt as options

from samba.auth import system_session
import ldb
from samba.samdb import SamDB
from samba.ndr import ndr_unpack
from samba import gensec
from samba.credentials import Credentials

from samba.auth import AUTH_SESSION_INFO_DEFAULT_GROUPS, AUTH_SESSION_INFO_AUTHENTICATED, AUTH_SESSION_INFO_SIMPLE_PRIVILEGES

from ldb import SCOPE_SUBTREE, SCOPE_ONELEVEL, SCOPE_BASE, LdbError
from ldb import ERR_TIME_LIMIT_EXCEEDED, ERR_ADMIN_LIMIT_EXCEEDED, ERR_UNWILLING_TO_PERFORM
from ldb import Message
# Command-line handling: samba, version, credential and subunit option
# groups, followed by one positional <host> argument.
parser = optparse.OptionParser("notification.py [options] <host>")
sambaopts = options.SambaOptions(parser)
parser.add_option_group(sambaopts)
parser.add_option_group(options.VersionOptions(parser))
# use command line creds if available
credopts = options.CredentialsOptions(parser)
parser.add_option_group(credopts)
subunitopts = SubunitOptions(parser)
parser.add_option_group(subunitopts)
opts, args = parser.parse_args()

# The test class and the URL normalisation at the end of the file both read
# the module-level `url`, so the positional argument is mandatory.
if not args:
    parser.print_usage()
    sys.exit(1)

url = args[0]

lp = sambaopts.get_loadparm()
creds = credopts.get_credentials(lp)
# Add the SEAL gensec feature on top of whatever the credentials already request.
creds.set_gensec_features(creds.get_gensec_features() | gensec.FEATURE_SEAL)
class LDAPNotificationTest(samba.tests.TestCase):
    """Tests for searches carrying the ``notification:1`` control.

    The tests read the module-level ``url``, ``creds`` and ``lp`` that the
    command-line handling of this script sets up.
    """

    # Timeout (passed as ``timeout=`` to search_iterator) after which an idle
    # notification search is expected to end with ERR_TIME_LIMIT_EXCEEDED.
    # NOTE(review): confirm this value against the intended test timing.
    NOTIFY_TIMEOUT = 1

    def setUp(self):
        """Connect to ``url`` and build a ``<SID=...>`` DN for later searches."""
        # Name this class so samba.tests.TestCase.setUp() itself runs;
        # super(samba.tests.TestCase, self) starts the lookup *after* it.
        super(LDAPNotificationTest, self).setUp()
        self.ldb = SamDB(url, credentials=creds, session_info=system_session(lp), lp=lp)
        self.base_dn = self.ldb.domain_dn()

        res = self.ldb.search("", scope=ldb.SCOPE_BASE, attrs=["tokenGroups"])
        self.assertEqual(len(res), 1)

        # First tokenGroups value of the rootDSE, rendered in <SID=...> form.
        user_sid = ndr_unpack(samba.dcerpc.security.dom_sid, res[0]["tokenGroups"][0])
        self.user_sid_dn = "<SID=%s>" % str(user_sid)

    def _require_ldap(self):
        """Fail the running test unless the target URL is an LDAP URL."""
        if not url.startswith("ldap"):
            self.fail(msg="This test is only valid on ldap")

    def _notify_search(self, expression, attrs):
        """Start a subtree notification search below the domain base DN.

        Returns the search iterator; nothing is read from it here.
        """
        # NOTE(review): attrs/timeout arguments should be confirmed against
        # the server-side expectations of the notification control.
        return self.ldb.search_iterator(base=self.base_dn,
                                        expression=expression,
                                        scope=ldb.SCOPE_SUBTREE,
                                        attrs=attrs,
                                        controls=["notification:1"],
                                        timeout=self.NOTIFY_TIMEOUT)

    def _notification_error(self, expression):
        """Return the LDB error code a notification search on ``expression`` ends with.

        The test fails if the search yields any reply or completes without
        raising LdbError.
        """
        try:
            hnd = self._notify_search(expression, ["name"])
            for reply in hnd:
                self.fail()
            hnd.result()
        except LdbError as e:
            (num, _) = e.args
            return num
        self.fail()

    def test_simple_search(self):
        """Testing a notification with a modify and a timeout"""
        self._require_ldap()

        # Reference copy of the object behind user_sid_dn; exactly one
        # entry is expected.
        msg1 = None
        search1 = self.ldb.search_iterator(base=self.user_sid_dn,
                                           expression="(objectClass=*)",
                                           scope=ldb.SCOPE_SUBTREE,
                                           attrs=["name", "objectGUID", "displayName"])
        for reply in search1:
            self.assertIsInstance(reply, ldb.Message)
            self.assertIsNone(msg1)
            msg1 = reply
        search1.result()
        # Fail clearly here rather than with a TypeError on msg1[...] below.
        self.assertIsNotNone(msg1)

        # The same object must appear exactly once in a domain-wide search.
        msg2 = None
        search2 = self.ldb.search_iterator(base=self.base_dn,
                                           expression="(objectClass=*)",
                                           scope=ldb.SCOPE_SUBTREE,
                                           attrs=["name", "objectGUID", "displayName"])
        for reply in search2:
            if isinstance(reply, str):
                # String replies are not entries; skip them.
                continue
            self.assertIsInstance(reply, ldb.Message)
            if reply["objectGUID"][0] == msg1["objectGUID"][0]:
                self.assertIsNone(msg2)
                msg2 = reply
                self.assertEqual(msg1.dn, msg2.dn)
                self.assertEqual(len(msg1), len(msg2))
                self.assertEqual(msg1["name"], msg2["name"])
                #self.assertEqual(msg1["displayName"], msg2["displayName"])
        search2.result()

        self.ldb.modify_ldif("""
dn: """ + self.user_sid_dn + """
changetype: modify
replace: otherLoginWorkstations
otherLoginWorkstations: BEFORE"
""")

        # Start listening, then change the object so a notification is due.
        notify1 = self._notify_search("(objectClass=*)",
                                      ["name", "objectGUID", "displayName"])

        self.ldb.modify_ldif("""
dn: """ + self.user_sid_dn + """
changetype: modify
replace: otherLoginWorkstations
otherLoginWorkstations: AFTER"
""")

        msg3 = None
        for reply in notify1:
            self.assertIsInstance(reply, ldb.Message)
            if reply["objectGUID"][0] == msg1["objectGUID"][0]:
                self.assertIsNone(msg3)
                msg3 = reply
                self.assertEqual(msg1.dn, msg3.dn)
                self.assertEqual(len(msg1), len(msg3))
                self.assertEqual(msg1["name"], msg3["name"])
                #self.assertEqual(msg1["displayName"], msg3["displayName"])
        # The notification search is expected to end by timing out.
        try:
            notify1.result()
            self.fail()
        except LdbError as e:
            (num, _) = e.args
            self.assertEqual(num, ERR_TIME_LIMIT_EXCEEDED)
        self.assertIsNotNone(msg3)

        # Remove the attribute written above.
        self.ldb.modify_ldif("""
dn: """ + self.user_sid_dn + """
changetype: modify
delete: otherLoginWorkstations
""")

    def test_max_search(self):
        """Testing the max allowed notifications"""
        self._require_ldap()

        max_notifications = 5

        # Open one search more than the expected limit.
        notifies = [self._notify_search("(objectClass=*)", ["name"])
                    for _ in range(max_notifications + 1)]

        num_admin_limit = 0
        num_time_limit = 0
        for notify in notifies:
            try:
                for msg in notify:
                    continue
                notify.result()
                self.fail()
            except LdbError as e:
                (num, _) = e.args
                if num == ERR_ADMIN_LIMIT_EXCEEDED:
                    num_admin_limit += 1
                    continue
                if num == ERR_TIME_LIMIT_EXCEEDED:
                    num_time_limit += 1
                    continue
                raise
        # Exactly one search is refused; the others run into the timeout.
        self.assertEqual(num_admin_limit, 1)
        self.assertEqual(num_time_limit, max_notifications)

    def test_invalid_filter(self):
        """Testing invalid filters for notifications"""
        self._require_ldap()

        valid_attrs = ["objectClass", "objectGUID", "distinguishedName", "name"]

        for va in valid_attrs:
            # Presence filters, alone or OR-ed with another term, are
            # expected to be accepted and simply run into the timeout.
            for expression in ("(%s=*)" % va,
                               "(|(%s=*)(%s=value))" % (va, va)):
                self.assertEqual(self._notification_error(expression),
                                 ERR_TIME_LIMIT_EXCEEDED, expression)

            # Every other filter shape is expected to be refused.
            for expression in ("(&(%s=*)(%s=value))" % (va, va),
                               "(%s=value)" % va,
                               "(%s>=value)" % va,
                               "(%s<=value)" % va,
                               "(%s=*value*)" % va,
                               "(!(%s=*))" % va):
                self.assertEqual(self._notification_error(expression),
                                 ERR_UNWILLING_TO_PERFORM, expression)

        # Presence filters on every other schema attribute are refused too.
        res = self.ldb.search(base=self.ldb.get_schema_basedn(),
                              expression="(objectClass=attributeSchema)",
                              scope=ldb.SCOPE_ONELEVEL,
                              attrs=["lDAPDisplayName"],
                              controls=["paged_results:1:2500"])
        for msg in res:
            va = msg["lDAPDisplayName"][0]
            if va in valid_attrs:
                continue
            # The message names the offending attribute on failure.
            self.assertEqual(self._notification_error("(%s=*)" % va),
                             ERR_UNWILLING_TO_PERFORM, "va[%s]" % va)

        # An attribute name that is not one of the valid ones is refused as well.
        va = "noneAttributeName"
        self.assertEqual(self._notification_error("(%s=*)" % va),
                         ERR_UNWILLING_TO_PERFORM, "va[%s]" % va)
# Turn a bare argument into a URL: an existing file becomes a tdb:// URL,
# anything else is treated as an LDAP host. Arguments that already carry a
# scheme are left untouched so they are not prefixed a second time.
if "://" not in url:
    if os.path.isfile(url):
        url = "tdb://%s" % url
    else:
        url = "ldap://%s" % url

TestProgram(module=__name__, opts=subunitopts)