afc6343c30934ae6f869c9537e632f91f5474d1a
[metze/samba/wip.git] / source4 / dsdb / tests / python / notification.py
1 #!/usr/bin/env python
2 #
3 # Unit tests for the notification control
4 # Copyright (C) Stefan Metzmacher 2016
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 3 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 # GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
18
19 import optparse
20 import sys
21 import os
22
23 sys.path.insert(0, "bin/python")
24 import samba
25
26 from samba.tests.subunitrun import SubunitOptions, TestProgram
27
28 import samba.getopt as options
29
30 from samba.auth import system_session
31 from samba import ldb
32 from samba.samdb import SamDB
33 from samba.ndr import ndr_unpack
34 from samba import gensec
35 from samba.credentials import Credentials
36 import samba.tests
37
38 from samba.auth import AUTH_SESSION_INFO_DEFAULT_GROUPS, AUTH_SESSION_INFO_AUTHENTICATED, AUTH_SESSION_INFO_SIMPLE_PRIVILEGES
39
40 from ldb import SCOPE_SUBTREE, SCOPE_ONELEVEL, SCOPE_BASE, LdbError
41 from ldb import ERR_TIME_LIMIT_EXCEEDED, ERR_ADMIN_LIMIT_EXCEEDED, ERR_UNWILLING_TO_PERFORM
42 from ldb import Message
43
# Command-line interface: the option groups are registered in the order
# Samba options, version, credentials, subunit.
parser = optparse.OptionParser("notification.py [options] <host>")

sambaopts = options.SambaOptions(parser)
parser.add_option_group(sambaopts)

parser.add_option_group(options.VersionOptions(parser))

# use command line creds if available
credopts = options.CredentialsOptions(parser)
parser.add_option_group(credopts)

subunitopts = SubunitOptions(parser)
parser.add_option_group(subunitopts)

(opts, args) = parser.parse_args()

# The single mandatory positional argument is the host / database URL.
if not args:
    parser.print_usage()
    sys.exit(1)

url = args[0]

# Load the configuration, derive the credentials from it and additionally
# request the SEAL gensec feature on top of whatever is already enabled.
lp = sambaopts.get_loadparm()
creds = credopts.get_credentials(lp)
creds.set_gensec_features(
    creds.get_gensec_features() | gensec.FEATURE_SEAL)
64
class LDAPNotificationTest(samba.tests.TestCase):
    """Tests for searches that carry the "notification:1" control.

    The connection target, credentials and loadparm context are taken from
    the module-level ``url``, ``creds`` and ``lp`` values.
    """

    def setUp(self):
        """Open the SamDB connection and build a <SID=...> DN to modify.

        The SID is unpacked from the first ``tokenGroups`` value returned by
        a base-scope search on the empty DN.
        """
        # Name this class (not its base class) so that
        # samba.tests.TestCase.setUp() itself is executed; passing the base
        # class to super() starts the lookup *after* it and skips it.
        super(LDAPNotificationTest, self).setUp()
        self.ldb = SamDB(url, credentials=creds, session_info=system_session(lp), lp=lp)
        self.base_dn = self.ldb.domain_dn()

        res = self.ldb.search("", scope=ldb.SCOPE_BASE, attrs=["tokenGroups"])
        self.assertEqual(len(res), 1)

        # Import the submodule explicitly: a plain "import samba" does not
        # by itself guarantee that samba.dcerpc.security is loaded.
        from samba.dcerpc import security
        user_sid = ndr_unpack(security.dom_sid, res[0]["tokenGroups"][0])
        self.user_sid_dn = "<SID=%s>" % str(user_sid)

    def _require_ldap(self):
        """Fail the current test unless the target URL starts with "ldap"."""
        if not url.startswith("ldap"):
            self.fail(msg="This test is only valid on ldap")

    def _assert_notify_error(self, expression, timeout, expected_err,
                             report_attr=None):
        """Run a notification search that must yield nothing and then fail.

        A subtree search below ``self.base_dn`` is started with the
        "notification:1" control.  The test fails if any reply is received
        or if the search completes without raising LdbError.

        :param expression: LDAP filter to use for the search
        :param timeout: timeout passed to search_iterator()
        :param expected_err: ldb error number the search has to end with
        :param report_attr: optional attribute name that is printed when the
            error number differs from expected_err, to help locate the
            offending attribute
        """
        try:
            hnd = self.ldb.search_iterator(base=self.base_dn,
                                           expression=expression,
                                           scope=ldb.SCOPE_SUBTREE,
                                           attrs=["name"],
                                           controls=["notification:1"],
                                           timeout=timeout)
            for reply in hnd:
                self.fail()
            hnd.result()
            self.fail()
        except LdbError as e:
            (num, _) = e.args
            if report_attr is not None and num != expected_err:
                print("va[%s]" % report_attr)
            self.assertEqual(num, expected_err)

    def test_simple_search(self):
        """Testing a notification with an modify and a timeout"""
        self._require_ldap()

        # Look the object up directly via its <SID=...> DN; exactly one
        # entry is accepted.
        msg1 = None
        search1 = self.ldb.search_iterator(base=self.user_sid_dn,
                                           expression="(objectClass=*)",
                                           scope=ldb.SCOPE_SUBTREE,
                                           attrs=["name", "objectGUID", "displayName"])
        for reply in search1:
            self.assertIsInstance(reply, ldb.Message)
            self.assertIsNone(msg1)
            msg1 = reply
        search1.result()
        # Everything below compares against msg1, so fail clearly here
        # instead of with a TypeError when the search returned nothing.
        self.assertIsNotNone(msg1)

        # The same object has to show up exactly once in a plain subtree
        # search of the whole domain.
        search2 = self.ldb.search_iterator(base=self.base_dn,
                                           expression="(objectClass=*)",
                                           scope=ldb.SCOPE_SUBTREE,
                                           attrs=["name", "objectGUID", "displayName"])
        msg2 = None
        for reply in search2:
            if isinstance(reply, str):
                # string replies (not ldb.Message objects) are skipped
                continue
            self.assertIsInstance(reply, ldb.Message)
            if reply["objectGUID"][0] == msg1["objectGUID"][0]:
                self.assertIsNone(msg2)
                msg2 = reply
                self.assertEqual(msg1.dn, msg2.dn)
                self.assertEqual(len(msg1), len(msg2))
                self.assertEqual(msg1["name"], msg2["name"])
                #self.assertEqual(msg1["displayName"], msg2["displayName"])
        search2.result()

        # Set a known value before the notification search is started ...
        self.ldb.modify_ldif("""
dn: """ + self.user_sid_dn + """
changetype: modify
replace: otherLoginWorkstations
otherLoginWorkstations: BEFORE"
""")
        notify1 = self.ldb.search_iterator(base=self.base_dn,
                                           expression="(objectClass=*)",
                                           scope=ldb.SCOPE_SUBTREE,
                                           attrs=["name", "objectGUID", "displayName"],
                                           controls=["notification:1"],
                                           timeout=1)

        # ... and change it afterwards, which is the modification the
        # notification search is expected to report.
        self.ldb.modify_ldif("""
dn: """ + self.user_sid_dn + """
changetype: modify
replace: otherLoginWorkstations
otherLoginWorkstations: AFTER"
""")

        msg3 = None
        for reply in notify1:
            self.assertIsInstance(reply, ldb.Message)
            if reply["objectGUID"][0] == msg1["objectGUID"][0]:
                self.assertIsNone(msg3)
                msg3 = reply
                self.assertEqual(msg1.dn, msg3.dn)
                self.assertEqual(len(msg1), len(msg3))
                self.assertEqual(msg1["name"], msg3["name"])
                #self.assertEqual(msg1["displayName"], msg3["displayName"])
        # The notification search must end with a time limit error rather
        # than completing successfully.
        try:
            notify1.result()
            self.fail()
        except LdbError as e:
            (num, _) = e.args
            self.assertEqual(num, ERR_TIME_LIMIT_EXCEEDED)
        self.assertIsNotNone(msg3)

        # NOTE(review): this LDIF combines "changetype: delete" with a
        # "delete: <attribute>" line; presumably it is meant to remove the
        # otherLoginWorkstations value again - confirm how modify_ldif()
        # handles it.
        self.ldb.modify_ldif("""
dn: """ + self.user_sid_dn + """
changetype: delete
delete: otherLoginWorkstations
""")

    def test_max_search(self):
        """Testing the max allowed notifications"""
        self._require_ldap()

        max_notifications = 5

        # Start one search more than the expected limit.
        notifies = [self.ldb.search_iterator(base=self.base_dn,
                                             expression="(objectClass=*)",
                                             scope=ldb.SCOPE_SUBTREE,
                                             attrs=["name"],
                                             controls=["notification:1"],
                                             timeout=1)
                    for _ in range(max_notifications + 1)]

        num_admin_limit = 0
        num_time_limit = 0
        for notify in notifies:
            try:
                # drain any replies; only the final result matters here
                for msg in notify:
                    continue
                notify.result()
                self.fail()
            except LdbError as e:
                (num, _) = e.args
                if num == ERR_ADMIN_LIMIT_EXCEEDED:
                    num_admin_limit += 1
                    continue
                if num == ERR_TIME_LIMIT_EXCEEDED:
                    num_time_limit += 1
                    continue
                raise
        # Exactly one search is rejected with an admin limit error, all the
        # others run into their timeout.
        self.assertEqual(num_admin_limit, 1)
        self.assertEqual(num_time_limit, max_notifications)

    def test_invalid_filter(self):
        """Testing invalid filters for notifications"""
        self._require_ldap()

        valid_attrs = ["objectClass", "objectGUID", "distinguishedName", "name"]

        for va in valid_attrs:
            # Presence filters (alone or as the first branch of an OR) are
            # accepted, so these searches simply run into the timeout.
            self._assert_notify_error("(%s=*)" % va,
                                      1, ERR_TIME_LIMIT_EXCEEDED)
            self._assert_notify_error("(|(%s=*)(%s=value))" % (va, va),
                                      1, ERR_TIME_LIMIT_EXCEEDED)

            # Every other filter shape has to be refused.
            self._assert_notify_error("(&(%s=*)(%s=value))" % (va, va),
                                      0, ERR_UNWILLING_TO_PERFORM)
            self._assert_notify_error("(%s=value)" % va,
                                      0, ERR_UNWILLING_TO_PERFORM)
            self._assert_notify_error("(%s>=value)" % va,
                                      0, ERR_UNWILLING_TO_PERFORM)
            self._assert_notify_error("(%s<=value)" % va,
                                      0, ERR_UNWILLING_TO_PERFORM)
            self._assert_notify_error("(%s=*value*)" % va,
                                      0, ERR_UNWILLING_TO_PERFORM)
            self._assert_notify_error("(!(%s=*))" % va,
                                      0, ERR_UNWILLING_TO_PERFORM)

        # A presence filter on any schema attribute outside of valid_attrs
        # has to be refused as well.
        res = self.ldb.search(base=self.ldb.get_schema_basedn(),
                              expression="(objectClass=attributeSchema)",
                              scope=ldb.SCOPE_ONELEVEL,
                              attrs=["lDAPDisplayName"],
                              controls=["paged_results:1:2500"])
        for msg in res:
            va = msg["lDAPDisplayName"][0]
            if va in valid_attrs:
                continue

            self._assert_notify_error("(%s=*)" % va,
                                      0, ERR_UNWILLING_TO_PERFORM,
                                      report_attr=va)

        # The same applies to an attribute name that is not taken from the
        # schema at all.
        va = "noneAttributeName"
        self._assert_notify_error("(%s=*)" % va,
                                  0, ERR_UNWILLING_TO_PERFORM,
                                  report_attr=va)
352
# An argument without an explicit scheme is treated as a local tdb file when
# such a file exists, and as an LDAP host name otherwise.
if "://" not in url:
    url = ("tdb://%s" if os.path.isfile(url) else "ldap://%s") % url

# Hand over to the subunit-aware test runner for this module.
TestProgram(module=__name__, opts=subunitopts)