CVE-2019-12436 dsdb/paged_results: ignore successful results without messages
[samba.git] / source4 / dsdb / tests / python / vlv.py
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3 # Originally based on ./sam.py
4 from __future__ import print_function
5 import optparse
6 import sys
7 import os
8 import base64
9 import random
10 import re
11
12 sys.path.insert(0, "bin/python")
13 import samba
14 from samba.tests.subunitrun import SubunitOptions, TestProgram
15
16 import samba.getopt as options
17
18 from samba.auth import system_session
19 import ldb
20 from samba.samdb import SamDB
21 from samba.compat import get_bytes
22 from samba.compat import get_string
23
24 import time
25
# Build the command line parser. The one required positional argument
# is the host to connect to.
parser = optparse.OptionParser("vlv.py [options] <host>")
sambaopts = options.SambaOptions(parser)
parser.add_option_group(sambaopts)
parser.add_option_group(options.VersionOptions(parser))
# use command line creds if available
credopts = options.CredentialsOptions(parser)
parser.add_option_group(credopts)
subunitopts = SubunitOptions(parser)
parser.add_option_group(subunitopts)

# Stored below as N_ELEMENTS, which is the number of users that
# TestsWithUserOU.setUp() creates.
parser.add_option('--elements', type='int', default=20,
                  help="use this many elements in the tests")

# When set, setUp() deletes any left-over test tree before recreating it
# and tearDown() leaves the tree in place.
parser.add_option('--delete-in-setup', action='store_true',
                  help="cleanup in next setup rather than teardown")

# Attributes whose name matches this regex are removed from each user
# dictionary in create_user() before the user is added.
parser.add_option('--skip-attr-regex',
                  help="ignore attributes matching this regex")

opts, args = parser.parse_args()

# The host argument is mandatory; print usage and exit with status 1
# when it is missing.
if len(args) < 1:
    parser.print_usage()
    sys.exit(1)

host = args[0]

# Module-level globals used by the test classes below.
lp = sambaopts.get_loadparm()
creds = credopts.get_credentials(lp)

N_ELEMENTS = opts.elements
57
58
class VlvTestException(Exception):
    """Exception class defined for this VLV test script."""
61
62
def encode_vlv_control(critical=1,
                       before=0, after=0,
                       offset=None,
                       gte=None,
                       n=0, cookie=None):
    """Build the string form of a VLV request control.

    The result starts with "vlv:<critical>:<before>:<after>:" and is
    followed by the target, which is one of:

      * "<offset>:<n>" when offset is given (offset takes precedence
        over gte);
      * ">=<value>" when gte is given;
      * "base64>=<encoded value>" when gte contains a ':' or a NUL
        byte, either of which would confuse the colon separated format.

    If cookie is not None it is appended after a further ':'.

    gte is expected to be bytes (it is tested against bytes literals).

    Raises ValueError if neither offset nor gte is supplied; previously
    this surfaced as an obscure TypeError from "b':' in None".
    """
    if offset is None and gte is None:
        raise ValueError("encode_vlv_control needs either offset or gte")

    s = "vlv:%d:%d:%d:" % (critical, before, after)

    if offset is not None:
        m = "%d:%d" % (offset, n)
    elif b':' in gte or b'\x00' in gte:
        gte = get_string(base64.b64encode(gte))
        m = "base64>=%s" % gte
    else:
        m = ">=%s" % get_string(gte)

    if cookie is None:
        return s + m

    return s + m + ':' + cookie
83
84
def get_cookie(controls, expected_n=None):
    """Return the cookie from the first VLV response control.

    Each control is converted with str(); the first one whose text
    starts with 'vlv_resp' is split on its last three colons and the
    final field is returned as the cookie, STILL base64 encoded.

    If expected_n is not None, the field third from the end (the item
    count reported by the server) must equal it as an integer.

    Raises ValueError when the count does not match or when no VLV
    response control is present.
    """
    for control in controls:
        text = str(control)
        if not text.startswith('vlv_resp'):
            continue
        _, count, _, cookie = text.rsplit(':', 3)
        if expected_n is None or int(count) == expected_n:
            return cookie
        raise ValueError("Expected %s items, server said %s" %
                         (expected_n, count))
    raise ValueError("there is no VLV response")
96
97
class TestsWithUserOU(samba.tests.TestCase):
    """Base class that fills a scratch OU with users for each test.

    setUp() connects to the host, creates ou=vlvtesttree and
    ou=vlvou beneath the domain DN, and adds N_ELEMENTS users.
    tearDown() removes the tree again unless --delete-in-setup was
    given, in which case the next setUp() removes it.
    """

    def create_user(self, i, n, prefix='vlvtest', suffix='', attrs=None):
        """Create a user in self.ou, record it in self.users, return it.

        i is the index of the user and n the total; both feed the
        generated attribute values so that different attributes sort
        in different orders. The cn is "<prefix><i><suffix>". attrs,
        if given, overrides or extends the generated attributes.

        Attributes whose name matches --skip-attr-regex are removed
        before the user is added.
        """
        name = "%s%d%s" % (prefix, i, suffix)
        user = {
            'cn': name,
            "objectclass": "user",
            'givenName': "abcdefghijklmnopqrstuvwxyz"[i % 26],
            "roomNumber": "%sbc" % (n - i),
            "carLicense": "后来经",
            "facsimileTelephoneNumber": name,
            "employeeNumber": "%s%sx" % (abs(i * (99 - i)), '\n' * (i & 255)),
            "accountExpires": "%s" % (10 ** 9 + 1000000 * i),
            "msTSExpireDate4": "19%02d0101010000.0Z" % (i % 100),
            "flags": str(i * (n - i)),
            "serialNumber": "abc %s%s%s" % ('AaBb |-/'[i & 7],
                                            ' 3z}'[i & 3],
                                            '"@'[i & 1],),
        }

        # _user_broken_attrs tests are broken due to problems outside
        # of VLV. The dictionary is deliberately not merged into
        # `user`; it is kept as a record of the attributes to
        # re-enable.
        _user_broken_attrs = {
            # Sort doesn't look past a NUL byte.
            "photo": "\x00%d" % (n - i),
            "audio": "%sn octet string %s%s ♫♬\x00lalala" % ('Aa'[i & 1],
                                                             chr(i & 255), i),
            "displayNamePrintable": "%d\x00%c" % (i, i & 255),
            "adminDisplayName": "%d\x00b" % (n - i),
            "title": "%d%sb" % (n - i, '\x00' * i),
            "comment": "Favourite colour is %d" % (n % (i + 1)),

            # Names that vary only in case. Windows returns
            # equivalent addresses in the order they were put
            # in ('a st', 'A st',...).
            "street": "%s st" % (chr(65 | (i & 14) | ((i & 1) * 32))),
        }

        if attrs is not None:
            user.update(attrs)

        user['dn'] = "cn=%s,%s" % (user['cn'], self.ou)

        if opts.skip_attr_regex:
            match = re.compile(opts.skip_attr_regex).search
            # Iterate over a snapshot of the keys: deleting from a dict
            # while iterating over it raises RuntimeError on Python 3.
            for k in list(user):
                if match(k):
                    del user[k]

        self.users.append(user)
        self.ldb.add(user)
        return user

    def setUp(self):
        """Connect to the server and create the test OU and its users."""
        super(TestsWithUserOU, self).setUp()
        self.ldb = SamDB(host, credentials=creds,
                         session_info=system_session(lp), lp=lp)

        self.base_dn = self.ldb.domain_dn()
        self.tree_dn = "ou=vlvtesttree,%s" % self.base_dn
        self.ou = "ou=vlvou,%s" % self.tree_dn
        if opts.delete_in_setup:
            # Best effort: the tree may not exist yet, so a failure
            # here is only reported, not fatal.
            try:
                self.ldb.delete(self.tree_dn, ['tree_delete:1'])
            except ldb.LdbError as e:
                print("tried deleting %s, got error %s" % (self.tree_dn, e))
        self.ldb.add({
            "dn": self.tree_dn,
            "objectclass": "organizationalUnit"})
        self.ldb.add({
            "dn": self.ou,
            "objectclass": "organizationalUnit"})

        self.users = []
        for i in range(N_ELEMENTS):
            self.create_user(i, N_ELEMENTS)

        # Classify the attribute names of the first user by the kind
        # of ordering the tests expect for them.
        attrs = self.users[0].keys()
        self.binary_sorted_keys = ['audio',
                                   'photo',
                                   "msTSExpireDate4",
                                   'serialNumber',
                                   "displayNamePrintable"]

        self.numeric_sorted_keys = ['flags',
                                    'accountExpires']

        self.timestamp_keys = ['msTSExpireDate4']

        self.int64_keys = set(['accountExpires'])

        # Everything that is neither binary nor numeric.
        self.locale_sorted_keys = [x for x in attrs if
                                   x not in (self.binary_sorted_keys +
                                             self.numeric_sorted_keys)]

        # don't try spaces, etc in cn
        self.delicate_keys = ['cn']

    def tearDown(self):
        """Remove the test tree unless cleanup is deferred to setUp."""
        super(TestsWithUserOU, self).tearDown()
        if not opts.delete_in_setup:
            self.ldb.delete(self.tree_dn, ['tree_delete:1'])
200
201
202 class VLVTests(TestsWithUserOU):
203
204     def get_full_list(self, attr, include_cn=False):
205         """Fetch the whole list sorted on the attribute, using the VLV.
206         This way you get a VLV cookie."""
207         n_users = len(self.users)
208         sort_control = "server_sort:1:0:%s" % attr
209         half_n = n_users // 2
210         vlv_search = "vlv:1:%d:%d:%d:0" % (half_n, half_n, half_n + 1)
211         attrs = [attr]
212         if include_cn:
213             attrs.append('cn')
214         res = self.ldb.search(self.ou,
215                               scope=ldb.SCOPE_ONELEVEL,
216                               attrs=attrs,
217                               controls=[sort_control,
218                                         vlv_search])
219         if include_cn:
220             full_results = [(str(x[attr][0]), str(x['cn'][0])) for x in res]
221         else:
222             full_results = [str(x[attr][0]).lower() for x in res]
223         controls = res.controls
224         return full_results, controls, sort_control
225
226     def get_expected_order(self, attr, expression=None):
227         """Fetch the whole list sorted on the attribute, using sort only."""
228         sort_control = "server_sort:1:0:%s" % attr
229         res = self.ldb.search(self.ou,
230                               scope=ldb.SCOPE_ONELEVEL,
231                               expression=expression,
232                               attrs=[attr],
233                               controls=[sort_control])
234         results = [x[attr][0] for x in res]
235         return results
236
237     def delete_user(self, user):
238         self.ldb.delete(user['dn'])
239         del self.users[self.users.index(user)]
240
    def get_gte_tests_and_order(self, attr, expression=None):
        """Work out values for ">=" VLV searches on attr and where each lands.

        Temporary users carrying probe values for attr are added, the
        sorted order including them is fetched, and the temporary
        users are deleted again.

        Returns (gte_order, expected_order, gte_map):

          * gte_order is the sorted list of attr values fetched while
            the temporary users existed (fetched without expression);
          * expected_order is the sorted list of attr values for the
            entries matching expression, without the temporary users;
          * gte_map maps each value in gte_order to the index in
            expected_order of the first value at or after it in the
            sort order, or to len(expected_order) if there is none.
        """
        expected_order = self.get_expected_order(attr, expression=expression)
        gte_users = []
        # Choose probe values suited to the kind of attribute.
        if attr in self.delicate_keys:
            gte_keys = [
                '3',
                'abc',
                '¹',
                'ŋđ¼³ŧ“«đð',
                '桑巴',
            ]
        elif attr in self.timestamp_keys:
            gte_keys = [
                '18560101010000.0Z',
                '19140103010000.0Z',
                '19560101010010.0Z',
                '19700101000000.0Z',
                '19991231211234.3Z',
                '20061111211234.0Z',
                '20390901041234.0Z',
                '25560101010000.0Z',
            ]
        elif attr not in self.numeric_sorted_keys:
            gte_keys = [
                '3',
                'abc',
                ' ',
                '!@#!@#!',
                'kōkako',
                '¹',
                'ŋđ¼³ŧ“«đð',
                '\n\t\t',
                '桑巴',
                'zzzz',
            ]
            if expected_order:
                # Also probe with an existing middle value plus a suffix.
                gte_keys.append(expected_order[len(expected_order) // 2] + b' tail')

        else:
            # "numeric" means positive integers
            # doesn't work with -1, 3.14, ' 3', '9' * 20
            gte_keys = ['3',
                        '1' * 10,
                        '1',
                        '9' * 7,
                        '0']

            if attr in self.int64_keys:
                gte_keys += ['3' * 12, '71' * 8]

        # Add one temporary user per probe value.
        for i, x in enumerate(gte_keys):
            user = self.create_user(i, N_ELEMENTS,
                                    prefix='gte',
                                    attrs={attr: x})
            gte_users.append(user)

        # Sorted order with the probes present; note that no expression
        # is applied to this search.
        gte_order = self.get_expected_order(attr)
        for user in gte_users:
            self.delete_user(user)

        # for sanity's sake
        expected_order_2 = self.get_expected_order(attr, expression=expression)
        self.assertEqual(expected_order, expected_order_2)

        # Map gte tests to indexes in expected order. This will break
        # if gte_order and expected_order are differently ordered (as
        # it should).
        gte_map = {}

        # index to the first one with each value
        index_map = {}
        for i, k in enumerate(expected_order):
            if k not in index_map:
                index_map[k] = i

        # Walk gte_order; values not in expected_order are held in
        # `keys` until the next value that is, and then share its index.
        keys = []
        for k in gte_order:
            if k in index_map:
                i = index_map[k]
                gte_map[k] = i
                # NOTE: this inner loop rebinds k; the outer loop
                # reassigns it on its next iteration.
                for k in keys:
                    gte_map[k] = i
                keys = []
            else:
                keys.append(k)

        # Anything left sorts after every expected entry.
        for k in keys:
            gte_map[k] = len(expected_order)

        # Debugging aid, disabled.
        if False:
            print("gte_map:")
            for k in gte_order:
                print("   %10s => %10s" % (k, gte_map[k]))

        return gte_order, expected_order, gte_map
336
337     def assertCorrectResults(self, results, expected_order,
338                              offset, before, after):
339         """A helper to calculate offsets correctly and say as much as possible
340         when something goes wrong."""
341
342         start = max(offset - before - 1, 0)
343         end = offset + after
344         expected_results = expected_order[start: end]
345
346         # if it is a tuple with the cn, drop the cn
347         if expected_results and isinstance(expected_results[0], tuple):
348             expected_results = [x[0] for x in expected_results]
349
350         if expected_results == results:
351             return
352
353         if expected_order is not None:
354             print("expected order: %s" % expected_order[:20])
355             if len(expected_order) > 20:
356                 print("... and %d more not shown" % (len(expected_order) - 20))
357
358         print("offset %d before %d after %d" % (offset, before, after))
359         print("start %d end %d" % (start, end))
360         print("expected: %s" % expected_results)
361         print("got     : %s" % results)
362         self.assertEquals(expected_results, results)
363
364     def test_server_vlv_with_cookie(self):
365         attrs = [x for x in self.users[0].keys() if x not in
366                  ('dn', 'objectclass')]
367         for attr in attrs:
368             expected_order = self.get_expected_order(attr)
369             sort_control = "server_sort:1:0:%s" % attr
370             res = None
371             n = len(self.users)
372             for before in [10, 0, 3, 1, 4, 5, 2]:
373                 for after in [0, 3, 1, 4, 5, 2, 7]:
374                     for offset in range(max(1, before - 2),
375                                         min(n - after + 2, n)):
376                         if res is None:
377                             vlv_search = "vlv:1:%d:%d:%d:0" % (before, after,
378                                                                offset)
379                         else:
380                             cookie = get_cookie(res.controls, n)
381                             vlv_search = ("vlv:1:%d:%d:%d:%s:%s" %
382                                           (before, after, offset, n,
383                                            cookie))
384
385                         res = self.ldb.search(self.ou,
386                                               scope=ldb.SCOPE_ONELEVEL,
387                                               attrs=[attr],
388                                               controls=[sort_control,
389                                                         vlv_search])
390
391                         results = [x[attr][0] for x in res]
392
393                         self.assertCorrectResults(results, expected_order,
394                                                   offset, before, after)
395
396     def run_index_tests_with_expressions(self, expressions):
397         # Here we don't test every before/after combination.
398         attrs = [x for x in self.users[0].keys() if x not in
399                  ('dn', 'objectclass')]
400         for attr in attrs:
401             for expression in expressions:
402                 expected_order = self.get_expected_order(attr, expression)
403                 sort_control = "server_sort:1:0:%s" % attr
404                 res = None
405                 n = len(expected_order)
406                 for before in range(0, 11):
407                     after = before
408                     for offset in range(max(1, before - 2),
409                                         min(n - after + 2, n)):
410                         if res is None:
411                             vlv_search = "vlv:1:%d:%d:%d:0" % (before, after,
412                                                                offset)
413                         else:
414                             cookie = get_cookie(res.controls)
415                             vlv_search = ("vlv:1:%d:%d:%d:%s:%s" %
416                                           (before, after, offset, n,
417                                            cookie))
418
419                         res = self.ldb.search(self.ou,
420                                               expression=expression,
421                                               scope=ldb.SCOPE_ONELEVEL,
422                                               attrs=[attr],
423                                               controls=[sort_control,
424                                                         vlv_search])
425
426                         results = [x[attr][0] for x in res]
427
428                         self.assertCorrectResults(results, expected_order,
429                                                   offset, before, after)
430
431     def test_server_vlv_with_expression(self):
432         """What happens when we run the VLV with an expression?"""
433         expressions = ["(objectClass=*)",
434                        "(cn=%s)" % self.users[-1]['cn'],
435                        "(roomNumber=%s)" % self.users[0]['roomNumber'],
436                        ]
437         self.run_index_tests_with_expressions(expressions)
438
439     def test_server_vlv_with_failing_expression(self):
440         """What happens when we run the VLV on an expression that matches
441         nothing?"""
442         expressions = ["(samaccountname=testferf)",
443                        "(cn=hefalump)",
444                        ]
445         self.run_index_tests_with_expressions(expressions)
446
447     def run_gte_tests_with_expressions(self, expressions):
448         # Here we don't test every before/after combination.
449         attrs = [x for x in self.users[0].keys() if x not in
450                  ('dn', 'objectclass')]
451         for expression in expressions:
452             for attr in attrs:
453                 gte_order, expected_order, gte_map = \
454                     self.get_gte_tests_and_order(attr, expression)
455                 # In case there is some order dependency, disorder tests
456                 gte_tests = gte_order[:]
457                 random.seed(2)
458                 random.shuffle(gte_tests)
459                 res = None
460                 sort_control = "server_sort:1:0:%s" % attr
461
462                 expected_order = self.get_expected_order(attr, expression)
463                 sort_control = "server_sort:1:0:%s" % attr
464                 res = None
465                 for before in range(0, 11):
466                     after = before
467                     for gte in gte_tests:
468                         if res is not None:
469                             cookie = get_cookie(res.controls)
470                         else:
471                             cookie = None
472                         vlv_search = encode_vlv_control(before=before,
473                                                         after=after,
474                                                         gte=get_bytes(gte),
475                                                         cookie=cookie)
476
477                         res = self.ldb.search(self.ou,
478                                               scope=ldb.SCOPE_ONELEVEL,
479                                               expression=expression,
480                                               attrs=[attr],
481                                               controls=[sort_control,
482                                                         vlv_search])
483
484                         results = [x[attr][0] for x in res]
485                         offset = gte_map.get(gte, len(expected_order))
486
487                         # here offset is 0-based
488                         start = max(offset - before, 0)
489                         end = offset + 1 + after
490
491                         expected_results = expected_order[start: end]
492
493                         self.assertEquals(expected_results, results)
494
495     def test_vlv_gte_with_expression(self):
496         """What happens when we run the VLV with an expression?"""
497         expressions = ["(objectClass=*)",
498                        "(cn=%s)" % self.users[-1]['cn'],
499                        "(roomNumber=%s)" % self.users[0]['roomNumber'],
500                        ]
501         self.run_gte_tests_with_expressions(expressions)
502
503     def test_vlv_gte_with_failing_expression(self):
504         """What happens when we run the VLV on an expression that matches
505         nothing?"""
506         expressions = ["(samaccountname=testferf)",
507                        "(cn=hefalump)",
508                        ]
509         self.run_gte_tests_with_expressions(expressions)
510
511     def test_server_vlv_with_cookie_while_adding_and_deleting(self):
512         """What happens if we add or remove items in the middle of the VLV?
513
514         Nothing. The search and the sort is not repeated, and we only
515         deal with the objects originally found.
516         """
517         attrs = ['cn'] + [x for x in self.users[0].keys() if x not in
518                           ('dn', 'objectclass')]
519         user_number = 0
520         iteration = 0
521         for attr in attrs:
522             full_results, controls, sort_control = \
523                             self.get_full_list(attr, True)
524             original_n = len(self.users)
525
526             expected_order = full_results
527             random.seed(1)
528
529             for before in list(range(0, 3)) + [6, 11, 19]:
530                 for after in list(range(0, 3)) + [6, 11, 19]:
531                     start = max(before - 1, 1)
532                     end = max(start + 4, original_n - after + 2)
533                     for offset in range(start, end):
534                         # if iteration > 2076:
535                         #    return
536                         cookie = get_cookie(controls, original_n)
537                         vlv_search = encode_vlv_control(before=before,
538                                                         after=after,
539                                                         offset=offset,
540                                                         n=original_n,
541                                                         cookie=cookie)
542
543                         iteration += 1
544                         res = self.ldb.search(self.ou,
545                                               scope=ldb.SCOPE_ONELEVEL,
546                                               attrs=[attr],
547                                               controls=[sort_control,
548                                                         vlv_search])
549
550                         controls = res.controls
551                         results = [x[attr][0] for x in res]
552                         real_offset = max(1, min(offset, len(expected_order)))
553
554                         expected_results = []
555                         skipped = 0
556                         begin_offset = max(real_offset - before - 1, 0)
557                         real_before = min(before, real_offset - 1)
558                         real_after = min(after,
559                                          len(expected_order) - real_offset)
560
561                         for x in expected_order[begin_offset:]:
562                             if x is not None:
563                                 expected_results.append(get_bytes(x[0]))
564                                 if (len(expected_results) ==
565                                     real_before + real_after + 1):
566                                     break
567                             else:
568                                 skipped += 1
569
570                         if expected_results != results:
571                             print("attr %s before %d after %d offset %d" %
572                                   (attr, before, after, offset))
573                         self.assertEquals(expected_results, results)
574
575                         n = len(self.users)
576                         if random.random() < 0.1 + (n < 5) * 0.05:
577                             if n == 0:
578                                 i = 0
579                             else:
580                                 i = random.randrange(n)
581                             user = self.create_user(i, n, suffix='-%s' %
582                                                     user_number)
583                             user_number += 1
584                         if random.random() < 0.1  + (n > 50) * 0.02 and n:
585                             index = random.randrange(n)
586                             user = self.users.pop(index)
587
588                             self.ldb.delete(user['dn'])
589
590                             replaced = (user[attr], user['cn'])
591                             if replaced in expected_order:
592                                 i = expected_order.index(replaced)
593                                 expected_order[i] = None
594
595     def test_server_vlv_with_cookie_while_changing(self):
596         """What happens if we modify items in the middle of the VLV?
597
598         The expected behaviour (as found on Windows) is the sort is
599         not repeated, but the changes in attributes are reflected.
600         """
601         attrs = [x for x in self.users[0].keys() if x not in
602                  ('dn', 'objectclass', 'cn')]
603         for attr in attrs:
604             n_users = len(self.users)
605             expected_order = [x.upper() for x in self.get_expected_order(attr)]
606             sort_control = "server_sort:1:0:%s" % attr
607             res = None
608             i = 0
609
610             # First we'll fetch the whole list so we know the original
611             # sort order. This is necessary because we don't know how
612             # the server will order equivalent items. We are using the
613             # dn as a key.
614             half_n = n_users // 2
615             vlv_search = "vlv:1:%d:%d:%d:0" % (half_n, half_n, half_n + 1)
616             res = self.ldb.search(self.ou,
617                                   scope=ldb.SCOPE_ONELEVEL,
618                                   attrs=['dn', attr],
619                                   controls=[sort_control, vlv_search])
620
621             results = [x[attr][0].upper() for x in res]
622             #self.assertEquals(expected_order, results)
623
624             dn_order = [str(x['dn']) for x in res]
625             values = results[:]
626
627             for before in range(0, 3):
628                 for after in range(0, 3):
629                     for offset in range(1 + before, n_users - after):
630                         cookie = get_cookie(res.controls, len(self.users))
631                         vlv_search = ("vlv:1:%d:%d:%d:%s:%s" %
632                                       (before, after, offset, len(self.users),
633                                        cookie))
634
635                         res = self.ldb.search(self.ou,
636                                               scope=ldb.SCOPE_ONELEVEL,
637                                               attrs=['dn', attr],
638                                               controls=[sort_control,
639                                                         vlv_search])
640
641                         dn_results = [str(x['dn']) for x in res]
642                         dn_expected = dn_order[offset - before - 1:
643                                                offset + after]
644
645                         self.assertEquals(dn_expected, dn_results)
646
647                         results = [x[attr][0].upper() for x in res]
648
649                         self.assertCorrectResults(results, values,
650                                                   offset, before, after)
651
652                         i += 1
653                         if i % 3 == 2:
654                             if (attr in self.locale_sorted_keys or
655                                 attr in self.binary_sorted_keys):
656                                 i1 = i % n_users
657                                 i2 = (i ^ 255) % n_users
658                                 dn1 = dn_order[i1]
659                                 dn2 = dn_order[i2]
660                                 v2 = values[i2]
661
662                                 if v2 in self.locale_sorted_keys:
663                                     v2 += '-%d' % i
664                                 cn1 = dn1.split(',', 1)[0][3:]
665                                 cn2 = dn2.split(',', 1)[0][3:]
666
667                                 values[i1] = v2
668
669                                 m = ldb.Message()
670                                 m.dn = ldb.Dn(self.ldb, dn1)
671                                 m[attr] = ldb.MessageElement(v2,
672                                                              ldb.FLAG_MOD_REPLACE,
673                                                              attr)
674
675                                 self.ldb.modify(m)
676
    def test_server_vlv_fractions_with_cookie(self):
        """What happens when the count is set to an arbitrary number?

        In that case the offset and the count form a fraction, and the
        VLV should be centred at a point offset/count of the way
        through. For example, if offset is 3 and count is 6, the VLV
        should be looking around halfway. The actual algorithm is a
        bit fiddlier than that, because of the one-basedness of VLV.
        """
        attrs = [x for x in self.users[0].keys() if x not in
                 ('dn', 'objectclass')]

        n_users = len(self.users)

        random.seed(4)

        for attr in attrs:
            # Fetch the full sorted list once; it provides both the
            # expected values and the first cookie.
            full_results, controls, sort_control = self.get_full_list(attr)
            self.assertEqual(len(full_results), n_users)
            for before in range(0, 2):
                for after in range(0, 2):
                    for denominator in range(1, 20):
                        # Offsets run slightly past the denominator to
                        # exercise the offset > count case.
                        for offset in range(1, denominator + 3):
                            cookie = get_cookie(controls, len(self.users))
                            vlv_search = ("vlv:1:%d:%d:%d:%s:%s" %
                                          (before, after, offset,
                                           denominator,
                                           cookie))
                            try:
                                res = self.ldb.search(self.ou,
                                                      scope=ldb.SCOPE_ONELEVEL,
                                                      attrs=[attr],
                                                      controls=[sort_control,
                                                                vlv_search])
                            except ldb.LdbError as e:
                                # With the ranges above offset is never
                                # 0, so any error is re-raised here.
                                if offset != 0:
                                    raise
                                print("offset %d denominator %d raised error "
                                      "expected error %s\n"
                                      "(offset zero is illegal unless "
                                      "content count is zero)" %
                                      (offset, denominator, e))
                                continue

                            results = [str(x[attr][0]).lower() for x in res]

                            # Work out which 1-based position the
                            # fraction offset/denominator should map to.
                            if denominator == 0:
                                # Not reachable with range(1, 20) above;
                                # note that real_offset is not set on
                                # this path.
                                denominator = n_users
                                if offset == 0:
                                    offset = denominator
                            elif denominator == 1:
                                # the offset can only be 1, but the 1/1 case
                                # means something special
                                if offset == 1:
                                    real_offset = n_users
                                else:
                                    real_offset = 1
                            else:
                                # Offsets beyond the count are clamped
                                # to the count, i.e. the end of the list.
                                if offset > denominator:
                                    offset = denominator
                                # Scale (offset - 1) / (denominator - 1)
                                # onto the 0..n_users-1 range, then go
                                # back to 1-based.
                                real_offset = (1 +
                                               int(round((n_users - 1) *
                                                         (offset - 1) /
                                                         (denominator - 1.0)))
                                               )

                            self.assertCorrectResults(results, full_results,
                                                      real_offset, before,
                                                      after)

                            # Carry the cookie over to the next search.
                            controls = res.controls
                            # Debugging aid, disabled.
                            if False:
                                for c in list(controls):
                                    cstr = str(c)
                                    if cstr.startswith('vlv_resp'):
                                        bits = cstr.rsplit(':')
                                        print("the answer is %s; we said %d" %
                                              (bits[2], real_offset))
                                        break
756
757     def test_server_vlv_no_cookie(self):
758         attrs = [x for x in self.users[0].keys() if x not in
759                  ('dn', 'objectclass')]
760
761         for attr in attrs:
762             expected_order = self.get_expected_order(attr)
763             sort_control = "server_sort:1:0:%s" % attr
764             for before in range(0, 5):
765                 for after in range(0, 7):
766                     for offset in range(1 + before, len(self.users) - after):
767                         res = self.ldb.search(self.ou,
768                                               scope=ldb.SCOPE_ONELEVEL,
769                                               attrs=[attr],
770                                               controls=[sort_control,
771                                                         "vlv:1:%d:%d:%d:0" %
772                                                         (before, after,
773                                                          offset)])
774                         results = [x[attr][0] for x in res]
775                         self.assertCorrectResults(results, expected_order,
776                                                   offset, before, after)
777
778     def get_expected_order_showing_deleted(self, attr,
779                                            expression="(|(cn=vlvtest*)(cn=vlv-deleted*))",
780                                            base=None,
781                                            scope=ldb.SCOPE_SUBTREE
782                                            ):
783         """Fetch the whole list sorted on the attribute, using sort only,
784         searching in the entire tree, not just our OU. This is the
785         way to find deleted objects.
786         """
787         if base is None:
788             base = self.base_dn
789         sort_control = "server_sort:1:0:%s" % attr
790         controls = [sort_control, "show_deleted:1"]
791
792         res = self.ldb.search(base,
793                               scope=scope,
794                               expression=expression,
795                               attrs=[attr],
796                               controls=controls)
797         results = [x[attr][0] for x in res]
798         return results
799
800     def add_deleted_users(self, n):
801         deleted_users = [self.create_user(i, n, prefix='vlv-deleted')
802                          for i in range(n)]
803
804         for user in deleted_users:
805             self.delete_user(user)
806
807     def test_server_vlv_no_cookie_show_deleted(self):
808         """What do we see with the show_deleted control?"""
809         attrs = ['objectGUID',
810                  'cn',
811                  'sAMAccountName',
812                  'objectSid',
813                  'name',
814                  'whenChanged',
815                  'usnChanged'
816                  ]
817
818         # add some deleted users first, just in case there are none
819         self.add_deleted_users(6)
820         random.seed(22)
821         expression = "(|(cn=vlvtest*)(cn=vlv-deleted*))"
822
823         for attr in attrs:
824             show_deleted_control = "show_deleted:1"
825             expected_order = self.get_expected_order_showing_deleted(attr,
826                                                                      expression)
827             n = len(expected_order)
828             sort_control = "server_sort:1:0:%s" % attr
829             for before in [3, 1, 0]:
830                 for after in [0, 2]:
831                     # don't test every position, because there could be hundreds.
832                     # jump back and forth instead
833                     for i in range(20):
834                         offset = random.randrange(max(1, before - 2),
835                                                   min(n - after + 2, n))
836                         res = self.ldb.search(self.base_dn,
837                                               expression=expression,
838                                               scope=ldb.SCOPE_SUBTREE,
839                                               attrs=[attr],
840                                               controls=[sort_control,
841                                                         show_deleted_control,
842                                                         "vlv:1:%d:%d:%d:0" %
843                                                         (before, after,
844                                                          offset)
845                                                         ]
846                                               )
847                         results = [x[attr][0] for x in res]
848                         self.assertCorrectResults(results, expected_order,
849                                                   offset, before, after)
850
851     def test_server_vlv_no_cookie_show_deleted_only(self):
852         """What do we see with the show_deleted control when we're not looking
853         at any non-deleted things"""
854         attrs = ['objectGUID',
855                  'cn',
856                  'sAMAccountName',
857                  'objectSid',
858                  'whenChanged',
859                  ]
860
861         # add some deleted users first, just in case there are none
862         self.add_deleted_users(4)
863         base = 'CN=Deleted Objects,%s' % self.base_dn
864         expression = "(cn=vlv-deleted*)"
865         for attr in attrs:
866             show_deleted_control = "show_deleted:1"
867             expected_order = self.get_expected_order_showing_deleted(attr,
868                                                                      expression=expression,
869                                                                      base=base,
870                                                                      scope=ldb.SCOPE_ONELEVEL)
871             print("searching for attr %s amongst %d deleted objects" %
872                   (attr, len(expected_order)))
873             sort_control = "server_sort:1:0:%s" % attr
874             step = max(len(expected_order) // 10, 1)
875             for before in [3, 0]:
876                 for after in [0, 2]:
877                     for offset in range(1 + before,
878                                         len(expected_order) - after,
879                                         step):
880                         res = self.ldb.search(base,
881                                               expression=expression,
882                                               scope=ldb.SCOPE_ONELEVEL,
883                                               attrs=[attr],
884                                               controls=[sort_control,
885                                                         show_deleted_control,
886                                                         "vlv:1:%d:%d:%d:0" %
887                                                         (before, after,
888                                                          offset)])
889                         results = [x[attr][0] for x in res]
890                         self.assertCorrectResults(results, expected_order,
891                                                   offset, before, after)
892
893     def test_server_vlv_with_cookie_show_deleted(self):
894         """What do we see with the show_deleted control?"""
895         attrs = ['objectGUID',
896                  'cn',
897                  'sAMAccountName',
898                  'objectSid',
899                  'name',
900                  'whenChanged',
901                  'usnChanged'
902                  ]
903         self.add_deleted_users(6)
904         random.seed(23)
905         for attr in attrs:
906             expected_order = self.get_expected_order(attr)
907             sort_control = "server_sort:1:0:%s" % attr
908             res = None
909             show_deleted_control = "show_deleted:1"
910             expected_order = self.get_expected_order_showing_deleted(attr)
911             n = len(expected_order)
912             expression = "(|(cn=vlvtest*)(cn=vlv-deleted*))"
913             for before in [3, 2, 1, 0]:
914                 after = before
915                 for i in range(20):
916                     offset = random.randrange(max(1, before - 2),
917                                               min(n - after + 2, n))
918                     if res is None:
919                         vlv_search = "vlv:1:%d:%d:%d:0" % (before, after,
920                                                            offset)
921                     else:
922                         cookie = get_cookie(res.controls, n)
923                         vlv_search = ("vlv:1:%d:%d:%d:%s:%s" %
924                                       (before, after, offset, n,
925                                        cookie))
926
927                     res = self.ldb.search(self.base_dn,
928                                           expression=expression,
929                                           scope=ldb.SCOPE_SUBTREE,
930                                           attrs=[attr],
931                                           controls=[sort_control,
932                                                     vlv_search,
933                                                     show_deleted_control])
934
935                     results = [x[attr][0] for x in res]
936
937                     self.assertCorrectResults(results, expected_order,
938                                               offset, before, after)
939
940     def test_server_vlv_gte_with_cookie(self):
941         attrs = [x for x in self.users[0].keys() if x not in
942                  ('dn', 'objectclass')]
943         for attr in attrs:
944             gte_order, expected_order, gte_map = \
945                                         self.get_gte_tests_and_order(attr)
946             # In case there is some order dependency, disorder tests
947             gte_tests = gte_order[:]
948             random.seed(1)
949             random.shuffle(gte_tests)
950             res = None
951             sort_control = "server_sort:1:0:%s" % attr
952             for before in [0, 1, 2, 4]:
953                 for after in [0, 1, 3, 6]:
954                     for gte in gte_tests:
955                         if res is not None:
956                             cookie = get_cookie(res.controls, len(self.users))
957                         else:
958                             cookie = None
959                         vlv_search = encode_vlv_control(before=before,
960                                                         after=after,
961                                                         gte=get_bytes(gte),
962                                                         cookie=cookie)
963
964                         res = self.ldb.search(self.ou,
965                                               scope=ldb.SCOPE_ONELEVEL,
966                                               attrs=[attr],
967                                               controls=[sort_control,
968                                                         vlv_search])
969
970                         results = [x[attr][0] for x in res]
971                         offset = gte_map.get(gte, len(expected_order))
972
973                         # here offset is 0-based
974                         start = max(offset - before, 0)
975                         end = offset + 1 + after
976
977                         expected_results = expected_order[start: end]
978
979                         self.assertEquals(expected_results, results)
980
981     def test_server_vlv_gte_no_cookie(self):
982         attrs = [x for x in self.users[0].keys() if x not in
983                  ('dn', 'objectclass')]
984         iteration = 0
985         for attr in attrs:
986             gte_order, expected_order, gte_map = \
987                                         self.get_gte_tests_and_order(attr)
988             # In case there is some order dependency, disorder tests
989             gte_tests = gte_order[:]
990             random.seed(1)
991             random.shuffle(gte_tests)
992
993             sort_control = "server_sort:1:0:%s" % attr
994             for before in [0, 1, 3]:
995                 for after in [0, 4]:
996                     for gte in gte_tests:
997                         vlv_search = encode_vlv_control(before=before,
998                                                         after=after,
999                                                         gte=get_bytes(gte))
1000
1001                         res = self.ldb.search(self.ou,
1002                                               scope=ldb.SCOPE_ONELEVEL,
1003                                               attrs=[attr],
1004                                               controls=[sort_control,
1005                                                         vlv_search])
1006                         results = [x[attr][0] for x in res]
1007
1008                         # here offset is 0-based
1009                         offset = gte_map.get(gte, len(expected_order))
1010                         start = max(offset - before, 0)
1011                         end = offset + after + 1
1012                         expected_results = expected_order[start: end]
1013                         iteration += 1
1014                         if expected_results != results:
1015                             middle = expected_order[len(expected_order) // 2]
1016                             print(expected_results, results)
1017                             print(middle)
1018                             print(expected_order)
1019                             print()
1020                             print("\nattr %s offset %d before %d "
1021                                   "after %d gte %s" %
1022                                   (attr, offset, before, after, gte))
1023                         self.assertEquals(expected_results, results)
1024
1025     def test_multiple_searches(self):
1026         """The maximum number of concurrent vlv searches per connection is
1027         currently set at 3. That means if you open 4 VLV searches the
1028         cookie on the first one should fail.
1029         """
1030         # Windows has a limit of 10 VLVs where there are low numbers
1031         # of objects in each search.
1032         attrs = ([x for x in self.users[0].keys() if x not in
1033                   ('dn', 'objectclass')] * 2)[:12]
1034
1035         vlv_cookies = []
1036         for attr in attrs:
1037             sort_control = "server_sort:1:0:%s" % attr
1038
1039             res = self.ldb.search(self.ou,
1040                                   scope=ldb.SCOPE_ONELEVEL,
1041                                   attrs=[attr],
1042                                   controls=[sort_control,
1043                                             "vlv:1:1:1:1:0"])
1044
1045             cookie = get_cookie(res.controls, len(self.users))
1046             vlv_cookies.append(cookie)
1047             time.sleep(0.2)
1048
1049         # now this one should fail
1050         self.assertRaises(ldb.LdbError,
1051                           self.ldb.search,
1052                           self.ou,
1053                           scope=ldb.SCOPE_ONELEVEL,
1054                           attrs=[attr],
1055                           controls=[sort_control,
1056                                     "vlv:1:1:1:1:0:%s" % vlv_cookies[0]])
1057
1058         # and this one should succeed
1059         res = self.ldb.search(self.ou,
1060                               scope=ldb.SCOPE_ONELEVEL,
1061                               attrs=[attr],
1062                               controls=[sort_control,
1063                                         "vlv:1:1:1:1:0:%s" % vlv_cookies[-1]])
1064
1065         # this one should fail because it is a new connection and
1066         # doesn't share cookies
1067         new_ldb = SamDB(host, credentials=creds,
1068                         session_info=system_session(lp), lp=lp)
1069
1070         self.assertRaises(ldb.LdbError,
1071                           new_ldb.search, self.ou,
1072                           scope=ldb.SCOPE_ONELEVEL,
1073                           attrs=[attr],
1074                           controls=[sort_control,
1075                                     "vlv:1:1:1:1:0:%s" % vlv_cookies[-1]])
1076
1077         # but now without the critical flag it just does no VLV.
1078         new_ldb.search(self.ou,
1079                        scope=ldb.SCOPE_ONELEVEL,
1080                        attrs=[attr],
1081                        controls=[sort_control,
1082                                  "vlv:0:1:1:1:0:%s" % vlv_cookies[-1]])
1083
1084     # Run a vlv search and return important fields of the response control
1085     def vlv_search(self, attr, expr, cookie="", after_count=0, offset=1):
1086         sort_ctrl = "server_sort:1:0:%s" % attr
1087         ctrl = "vlv:1:0:%d:%d:0" % (after_count, offset)
1088         if cookie:
1089             ctrl += ":" + cookie
1090
1091         res = self.ldb.search(self.ou,
1092                               expression=expr,
1093                               scope=ldb.SCOPE_ONELEVEL,
1094                               attrs=[attr],
1095                               controls=[ctrl, sort_ctrl])
1096         results = [str(x[attr][0]) for x in res]
1097
1098         ctrls = [str(c) for c in res.controls if
1099                  str(c).startswith('vlv')]
1100         self.assertEqual(len(ctrls), 1)
1101
1102         spl = ctrls[0].rsplit(':')
1103         cookie = ""
1104         if len(spl) == 6:
1105             cookie = spl[-1]
1106
1107         return results, cookie
1108
1109     def test_vlv_modify_during_view(self):
1110         attr = 'roomNumber'
1111         expr = "(objectclass=user)"
1112
1113         # Start new search
1114         full_results, cookie = self.vlv_search(attr, expr,
1115                                                after_count=len(self.users))
1116
1117         # Edit a user
1118         edit_index = len(self.users)//2
1119         edit_attr = full_results[edit_index]
1120         users_with_attr = [u for u in self.users if u[attr] == edit_attr]
1121         self.assertEqual(len(users_with_attr), 1)
1122         edit_user = users_with_attr[0]
1123
1124         # Put z at the front of the val so it comes last in ordering
1125         edit_val = "z_" + edit_user[attr]
1126
1127         m = ldb.Message()
1128         m.dn = ldb.Dn(self.ldb, edit_user['dn'])
1129         m[attr] = ldb.MessageElement(edit_val, ldb.FLAG_MOD_REPLACE, attr)
1130         self.ldb.modify(m)
1131
1132         results, cookie = self.vlv_search(attr, expr, cookie=cookie,
1133                                           after_count=len(self.users))
1134
1135         # Make expected_results by copying and editing full_results
1136         expected_results = full_results[:]
1137         expected_results[edit_index] = edit_val
1138         self.assertEqual(results, expected_results)
1139
1140     # Test changing the search expression in a request on an initialised view
1141     # Expected failure on samba, passes on windows
1142     def test_vlv_change_search_expr(self):
1143         attr = 'roomNumber'
1144         expr = "(objectclass=user)"
1145
1146         # Start new search
1147         full_results, cookie = self.vlv_search(attr, expr,
1148                                                after_count=len(self.users))
1149
1150         middle_index = len(full_results)//2
1151         # Search that excludes the old value but includes the new one
1152         expr = "%s>=%s" % (attr, full_results[middle_index])
1153         results, cookie = self.vlv_search(attr, expr, cookie=cookie,
1154                                           after_count=len(self.users))
1155         self.assertEqual(results, full_results[middle_index:])
1156
1157     # Check you can't add a value to a vlv view
1158     def test_vlv_add_during_view(self):
1159         attr = 'roomNumber'
1160         expr = "(objectclass=user)"
1161
1162         # Start new search
1163         full_results, cookie = self.vlv_search(attr, expr,
1164                                                after_count=len(self.users))
1165
1166         # Add a user at the end of the sort order
1167         add_val = "z_addedval"
1168         user = {'cn': add_val, "objectclass": "user", attr: add_val}
1169         user['dn'] = "cn=%s,%s" % (user['cn'], self.ou)
1170         self.ldb.add(user)
1171
1172         results, cookie = self.vlv_search(attr, expr, cookie=cookie,
1173                                           after_count=len(self.users)+1)
1174         self.assertEqual(results, full_results)
1175
1176     def test_vlv_delete_during_view(self):
1177         attr = 'roomNumber'
1178         expr = "(objectclass=user)"
1179
1180         # Start new search
1181         full_results, cookie = self.vlv_search(attr, expr,
1182                                                after_count=len(self.users))
1183
1184         # Delete one of the users
1185         del_index = len(self.users)//2
1186         del_user = self.users[del_index]
1187         self.ldb.delete(del_user['dn'])
1188
1189         results, cookie = self.vlv_search(attr, expr, cookie=cookie,
1190                                           after_count=len(self.users))
1191         expected_results = [r for r in full_results if r != del_user[attr]]
1192         self.assertEqual(results, expected_results)
1193
1194
1195 class PagedResultsTests(TestsWithUserOU):
1196
1197     def paged_search(self, expr, cookie="", page_size=0, extra_ctrls=None,
1198                      attrs=None, ou=None, subtree=False):
1199         ou = ou or self.ou
1200         if cookie:
1201             cookie = ":" + cookie
1202         ctrl = "paged_results:1:" + str(page_size) + cookie
1203         controls = [ctrl]
1204
1205         # If extra controls are provided then add them, else default to
1206         # sort control on 'cn' attribute
1207         if extra_ctrls is not None:
1208             controls += extra_ctrls
1209         else:
1210             sort_ctrl = "server_sort:1:0:cn"
1211             controls.append(sort_ctrl)
1212
1213         kwargs = {}
1214         if attrs is not None:
1215             kwargs = {"attrs": attrs}
1216
1217         scope = ldb.SCOPE_ONELEVEL
1218         if subtree:
1219             scope = ldb.SCOPE_SUBTREE
1220
1221         res = self.ldb.search(ou,
1222                               expression=expr,
1223                               scope=scope,
1224                               controls=controls,
1225                               **kwargs)
1226         results = [str(r['cn'][0]) for r in res]
1227
1228         ctrls = [str(c) for c in res.controls if
1229                  str(c).startswith("paged_results")]
1230         assert len(ctrls) == 1, "no paged_results response"
1231
1232         spl = ctrls[0].rsplit(':', 3)
1233         cookie = ""
1234         if len(spl) == 3:
1235             cookie = spl[-1]
1236         return results, cookie
1237
1238     def test_paged_delete_during_search(self):
1239         expr = "(objectClass=*)"
1240
1241         # Start new search
1242         first_page_size = 3
1243         results, cookie = self.paged_search(expr, page_size=first_page_size)
1244
1245         # Run normal search to get expected results
1246         unedited_results, _ = self.paged_search(expr,
1247                                                 page_size=len(self.users))
1248
1249         # Get remaining users not returned by the search above
1250         unreturned_users = [u for u in self.users if u['cn'] not in results]
1251
1252         # Delete one of the users
1253         del_index = len(self.users)//2
1254         del_user = unreturned_users[del_index]
1255         self.ldb.delete(del_user['dn'])
1256
1257         # Run test
1258         results, _ = self.paged_search(expr, cookie=cookie,
1259                                        page_size=len(self.users))
1260         expected_results = [r for r in unedited_results[first_page_size:]
1261                             if r != del_user['cn']]
1262         self.assertEqual(results, expected_results)
1263
1264     def test_paged_show_deleted(self):
1265         unique = time.strftime("%s", time.gmtime())[-5:]
1266         prefix = "show_deleted_test_%s_" % (unique)
1267         expr = "(&(objectClass=user)(cn=%s*))" % (prefix)
1268         del_ctrl = "show_deleted:1"
1269
1270         num_users = 10
1271         users = []
1272         for i in range(num_users):
1273             user = self.create_user(i, num_users, prefix=prefix)
1274             users.append(user)
1275
1276         first_user = users[0]
1277         self.ldb.delete(first_user['dn'])
1278
1279         # Start new search
1280         first_page_size = 3
1281         results, cookie = self.paged_search(expr, page_size=first_page_size,
1282                                             extra_ctrls=[del_ctrl],
1283                                             ou=self.base_dn,
1284                                             subtree=True)
1285
1286         # Get remaining users not returned by the search above
1287         unreturned_users = [u for u in users if u['cn'] not in results]
1288
1289         # Delete one of the users
1290         del_index = len(users)//2
1291         del_user = unreturned_users[del_index]
1292         self.ldb.delete(del_user['dn'])
1293
1294         results2, _ = self.paged_search(expr, cookie=cookie,
1295                                         page_size=len(users)*2,
1296                                         extra_ctrls=[del_ctrl],
1297                                         ou=self.base_dn,
1298                                         subtree=True)
1299
1300         user_cns = {str(u['cn']) for u in users}
1301         deleted_cns = {first_user['cn'], del_user['cn']}
1302
1303         all_results = results + results2
1304         normal_results = {r for r in all_results if "DEL:" not in r}
1305         self.assertEqual(normal_results, user_cns - deleted_cns)
1306
1307         # Deleted results get "\nDEL:<GUID>" added to the CN, so cut it out.
1308         deleted_results = {r[:r.index('\n')] for r in all_results
1309                            if "DEL:" in r}
1310         self.assertEqual(deleted_results, deleted_cns)
1311
1312     def test_paged_add_during_search(self):
1313         expr = "(objectClass=*)"
1314
1315         # Start new search
1316         first_page_size = 3
1317         results, cookie = self.paged_search(expr, page_size=first_page_size)
1318
1319         unedited_results, _ = self.paged_search(expr,
1320                                                 page_size=len(self.users)+1)
1321
1322         # Get remaining users not returned by the search above
1323         unwalked_users = [cn for cn in unedited_results if cn not in results]
1324
1325         # Add a user in the middle of the sort order
1326         middle_index = len(unwalked_users)//2
1327         middle_user = unwalked_users[middle_index]
1328
1329         user = {'cn': middle_user + '_2', "objectclass": "user"}
1330         user['dn'] = "cn=%s,%s" % (user['cn'], self.ou)
1331         self.ldb.add(user)
1332
1333         results, _ = self.paged_search(expr, cookie=cookie,
1334                                        page_size=len(self.users)+1)
1335         expected_results = unwalked_users[:]
1336
1337         # Uncomment this line to assert that adding worked.
1338         # expected_results.insert(middle_index+1, user['cn'])
1339
1340         self.assertEqual(results, expected_results)
1341
1342     def test_paged_rename_during_search(self):
1343         expr = "(objectClass=*)"
1344
1345         # Start new search
1346         first_page_size = 3
1347         results, cookie = self.paged_search(expr, page_size=first_page_size)
1348
1349         unedited_results, _ = self.paged_search(expr,
1350                                                 page_size=len(self.users)+1)
1351
1352         # Modify user in the middle of the remaining sort order
1353         unwalked_users = [cn for cn in unedited_results if cn not in results]
1354         middle_index = len(unwalked_users)//2
1355         middle_cn = unwalked_users[middle_index]
1356
1357         # Find user object
1358         users_with_middle_cn = [u for u in self.users if u['cn'] == middle_cn]
1359         self.assertEqual(len(users_with_middle_cn), 1)
1360         middle_user = users_with_middle_cn[0]
1361
1362         # Rename object
1363         edit_cn = "z_" + middle_cn
1364         new_dn = middle_user['dn'].replace(middle_cn, edit_cn)
1365         self.ldb.rename(middle_user['dn'], new_dn)
1366
1367         results, _ = self.paged_search(expr, cookie=cookie,
1368                                        page_size=len(self.users)+1)
1369         expected_results = unwalked_users[:]
1370         expected_results[middle_index] = edit_cn
1371         self.assertEqual(results, expected_results)
1372
1373     def test_paged_modify_object_scope(self):
1374         expr = "(objectClass=*)"
1375
1376         ou2 = "OU=vlvtestou2,%s" % (self.tree_dn)
1377         self.ldb.add({"dn": ou2, "objectclass": "organizationalUnit"})
1378
1379         # Do a separate, full search to get all results
1380         unedited_results, _ = self.paged_search(expr,
1381                                                 page_size=len(self.users)+1)
1382
1383         # Rename before starting a search
1384         first_cn = self.users[0]['cn']
1385         new_dn = "CN=%s,%s" % (first_cn, ou2)
1386         self.ldb.rename(self.users[0]['dn'], new_dn)
1387
1388         # Start new search under the original OU
1389         first_page_size = 3
1390         results, cookie = self.paged_search(expr, page_size=first_page_size)
1391         self.assertEqual(results, unedited_results[1:1+first_page_size])
1392
1393         # Get one of the users that is yet to be returned
1394         unwalked_users = [cn for cn in unedited_results if cn not in results]
1395         middle_index = len(unwalked_users)//2
1396         middle_cn = unwalked_users[middle_index]
1397
1398         # Find user object
1399         users_with_middle_cn = [u for u in self.users if u['cn'] == middle_cn]
1400         self.assertEqual(len(users_with_middle_cn), 1)
1401         middle_user = users_with_middle_cn[0]
1402
1403         # Rename
1404         new_dn = "CN=%s,%s" % (middle_cn, ou2)
1405         self.ldb.rename(middle_user['dn'], new_dn)
1406
1407         results, _ = self.paged_search(expr, cookie=cookie,
1408                                        page_size=len(self.users)+1)
1409
1410         expected_results = unwalked_users[:]
1411
1412         # We should really expect that the object renamed into a different
1413         # OU should vanish from the results, but turns out Windows does return
1414         # the object in this case.  Our module matches the Windows behaviour.
1415
1416         # If behaviour changes, this line inverts the test's expectations to
1417         # what you might expect.
1418         # del expected_results[middle_index]
1419
1420         # But still expect the user we removed before the search to be gone
1421         del expected_results[0]
1422
1423         self.assertEqual(results, expected_results)
1424
1425     def test_paged_modify_one_during_search(self):
1426         prefix = "change_during_search_"
1427         num_users = 5
1428         users = [self.create_user(i, num_users, prefix=prefix)
1429                  for i in range(num_users)]
1430         expr = "(&(objectClass=user)(facsimileTelephoneNumber=%s*))" % (prefix)
1431
1432         # Get the first page, then change the searched attribute and
1433         # try for the second page.
1434         results, cookie = self.paged_search(expr, page_size=1)
1435         self.assertEqual(len(results), 1)
1436         unwalked_users = [u for u in users if u['cn'] != results[0]]
1437         self.assertEqual(len(unwalked_users), num_users-1)
1438
1439         mod_dn = unwalked_users[0]['dn']
1440         self.ldb.modify_ldif("dn: %s\n"
1441                              "changetype: modify\n"
1442                              "replace: facsimileTelephoneNumber\n"
1443                              "facsimileTelephoneNumber: 123" % mod_dn)
1444
1445         results, _ = self.paged_search(expr, cookie=cookie,
1446                                        page_size=len(self.users))
1447         expected_cns = {u['cn'] for u in unwalked_users if u['dn'] != mod_dn}
1448         self.assertEqual(set(results), expected_cns)
1449
1450     def test_paged_modify_all_during_search(self):
1451         prefix = "change_during_search_"
1452         num_users = 5
1453         users = [self.create_user(i, num_users, prefix=prefix)
1454                  for i in range(num_users)]
1455         expr = "(&(objectClass=user)(facsimileTelephoneNumber=%s*))" % (prefix)
1456
1457         # Get the first page, then change the searched attribute and
1458         # try for the second page.
1459         results, cookie = self.paged_search(expr, page_size=1)
1460         unwalked_users = [u for u in users if u['cn'] != results[0]]
1461
1462         for u in users:
1463             self.ldb.modify_ldif("dn: %s\n"
1464                                  "changetype: modify\n"
1465                                  "replace: facsimileTelephoneNumber\n"
1466                                  "facsimileTelephoneNumber: 123" % u['dn'])
1467
1468         results, _ = self.paged_search(expr, cookie=cookie,
1469                                        page_size=len(self.users))
1470         self.assertEqual(results, [])
1471
1472     def assertPagedSearchRaises(self, err_num, expr, cookie, attrs=None,
1473                                 extra_ctrls=None):
1474         try:
1475             results, _ = self.paged_search(expr, cookie=cookie,
1476                                            page_size=2,
1477                                            extra_ctrls=extra_ctrls,
1478                                            attrs=attrs)
1479         except ldb.LdbError as e:
1480             self.assertEqual(e.args[0], err_num)
1481             return
1482
1483         self.fail("No error raised by invalid search")
1484
1485     def test_paged_changed_expr(self):
1486         # Initiate search then use a different expr in subsequent req
1487         expr = "(objectClass=*)"
1488         results, cookie = self.paged_search(expr, page_size=3)
1489         expr = "cn>=a"
1490         expected_error_num = 12
1491         self.assertPagedSearchRaises(expected_error_num, expr, cookie)
1492
1493     def test_paged_changed_controls(self):
1494         expr = "(objectClass=*)"
1495         sort_ctrl = "server_sort:1:0:cn"
1496         del_ctrl = "show_deleted:1"
1497         expected_error_num = 12
1498         ps = 3
1499
1500         # Initiate search with a sort control then remove in subsequent req
1501         results, cookie = self.paged_search(expr, page_size=ps,
1502                                             extra_ctrls=[sort_ctrl])
1503         self.assertPagedSearchRaises(expected_error_num, expr,
1504                                      cookie, extra_ctrls=[])
1505
1506         # Initiate search with no sort control then add one in subsequent req
1507         results, cookie = self.paged_search(expr, page_size=ps,
1508                                             extra_ctrls=[])
1509         self.assertPagedSearchRaises(expected_error_num, expr,
1510                                      cookie, extra_ctrls=[sort_ctrl])
1511
1512         # Initiate search with show-deleted control then
1513         # remove it in subsequent req
1514         results, cookie = self.paged_search(expr, page_size=ps,
1515                                             extra_ctrls=[del_ctrl])
1516         self.assertPagedSearchRaises(expected_error_num, expr,
1517                                      cookie, extra_ctrls=[])
1518
1519         # Initiate normal search then add show-deleted control
1520         # in subsequent req
1521         results, cookie = self.paged_search(expr, page_size=ps,
1522                                             extra_ctrls=[])
1523         self.assertPagedSearchRaises(expected_error_num, expr,
1524                                      cookie, extra_ctrls=[del_ctrl])
1525
1526         # Changing order of controls shouldn't break the search
1527         results, cookie = self.paged_search(expr, page_size=ps,
1528                                             extra_ctrls=[del_ctrl, sort_ctrl])
1529         try:
1530             results, cookie = self.paged_search(expr, page_size=ps,
1531                                                 extra_ctrls=[sort_ctrl,
1532                                                              del_ctrl])
1533         except ldb.LdbError as e:
1534             self.fail(e)
1535
1536     def test_paged_cant_change_controls_data(self):
1537         # Some defaults for the rest of the tests
1538         expr = "(objectClass=*)"
1539         sort_ctrl = "server_sort:1:0:cn"
1540         expected_error_num = 12
1541
1542         # Initiate search with sort control then change it in subsequent req
1543         results, cookie = self.paged_search(expr, page_size=3,
1544                                             extra_ctrls=[sort_ctrl])
1545         changed_sort_ctrl = "server_sort:1:0:roomNumber"
1546         self.assertPagedSearchRaises(expected_error_num, expr,
1547                                      cookie, extra_ctrls=[changed_sort_ctrl])
1548
1549         # Initiate search with a control with crit=1, then use crit=0
1550         results, cookie = self.paged_search(expr, page_size=3,
1551                                             extra_ctrls=[sort_ctrl])
1552         changed_sort_ctrl = "server_sort:0:0:cn"
1553         self.assertPagedSearchRaises(expected_error_num, expr,
1554                                      cookie, extra_ctrls=[changed_sort_ctrl])
1555
1556     def test_paged_search_referrals(self):
1557         expr = "(objectClass=*)"
1558         paged_ctrl = "paged_results:1:5"
1559         res = self.ldb.search(self.base_dn,
1560                               expression=expr,
1561                               attrs=['cn'],
1562                               scope=ldb.SCOPE_SUBTREE,
1563                               controls=[paged_ctrl])
1564
1565         # Do a paged search walk over the whole database and save a list
1566         # of all the referrals returned by each search.
1567         referral_lists = []
1568
1569         while True:
1570             referral_lists.append(res.referals)
1571
1572             ctrls = [str(c) for c in res.controls if
1573                      str(c).startswith("paged_results")]
1574             self.assertEqual(len(ctrls), 1)
1575             spl = ctrls[0].rsplit(':')
1576             if len(spl) != 3:
1577                 break
1578
1579             cookie = spl[-1]
1580             res = self.ldb.search(self.base_dn,
1581                                   expression=expr,
1582                                   attrs=['cn'],
1583                                   scope=ldb.SCOPE_SUBTREE,
1584                                   controls=[paged_ctrl + ":" + cookie])
1585
1586         ref_list = referral_lists[0]
1587
1588         # Sanity check to make sure the search actually did something
1589         self.assertGreater(len(referral_lists), 2)
1590
1591         # Check the first referral set contains stuff
1592         self.assertGreater(len(ref_list), 0)
1593
1594         # Check the others don't
1595         self.assertTrue(all([len(l) == 0 for l in referral_lists[1:]]))
1596
1597         # Check the entries in the first referral list look like referrals
1598         self.assertTrue(all([s.startswith('ldap://') for s in ref_list]))
1599
1600     def test_paged_change_attrs(self):
1601         expr = "(objectClass=*)"
1602         attrs = ['cn']
1603         expected_error_num = 12
1604
1605         results, cookie = self.paged_search(expr, page_size=3, attrs=attrs)
1606         results, cookie = self.paged_search(expr, cookie=cookie, page_size=3,
1607                                             attrs=attrs)
1608
1609         changed_attrs = attrs + ['roomNumber']
1610         self.assertPagedSearchRaises(expected_error_num, expr,
1611                                      cookie, attrs=changed_attrs,
1612                                      extra_ctrls=[])
1613
1614     def test_paged_search_lockstep(self):
1615         expr = "(objectClass=*)"
1616         ps = 3
1617
1618         all_results, _ = self.paged_search(expr, page_size=len(self.users)+1)
1619
1620         # Run two different but overlapping paged searches simultaneously.
1621         set_1_index = int((len(all_results))//3)
1622         set_2_index = int((2*len(all_results))//3)
1623         set_1 = all_results[set_1_index:]
1624         set_2 = all_results[:set_2_index+1]
1625         set_1_expr = "(cn>=%s)" % (all_results[set_1_index])
1626         set_2_expr = "(cn<=%s)" % (all_results[set_2_index])
1627
1628         results, cookie1 = self.paged_search(set_1_expr, page_size=ps)
1629         self.assertEqual(results, set_1[:ps])
1630         results, cookie2 = self.paged_search(set_2_expr, page_size=ps)
1631         self.assertEqual(results, set_2[:ps])
1632
1633         results, cookie1 = self.paged_search(set_1_expr, cookie=cookie1,
1634                                              page_size=ps)
1635         self.assertEqual(results, set_1[ps:ps*2])
1636         results, cookie2 = self.paged_search(set_2_expr, cookie=cookie2,
1637                                              page_size=ps)
1638         self.assertEqual(results, set_2[ps:ps*2])
1639
1640         results, _ = self.paged_search(set_1_expr, cookie=cookie1,
1641                                        page_size=len(self.users))
1642         self.assertEqual(results, set_1[ps*2:])
1643         results, _ = self.paged_search(set_2_expr, cookie=cookie2,
1644                                        page_size=len(self.users))
1645         self.assertEqual(results, set_2[ps*2:])
1646
1647
# A host argument without a URL scheme gets one added: "tdb://" when it
# names an existing file, "ldap://" otherwise.
if "://" not in host:
    scheme = "tdb" if os.path.isfile(host) else "ldap"
    host = "%s://%s" % (scheme, host)


# Run the tests defined in this module with the parsed subunit options.
TestProgram(module=__name__, opts=subunitopts)