paged results: testing suite for new paged results module
[samba.git] / source4 / dsdb / tests / python / vlv.py
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3 # Originally based on ./sam.py
4 from __future__ import print_function
5 import optparse
6 import sys
7 import os
8 import base64
9 import random
10 import re
11
12 sys.path.insert(0, "bin/python")
13 import samba
14 from samba.tests.subunitrun import SubunitOptions, TestProgram
15
16 import samba.getopt as options
17
18 from samba.auth import system_session
19 import ldb
20 from samba.samdb import SamDB
21 from samba.compat import get_bytes
22 from samba.compat import get_string
23
24 import time
25
# Build the command line parser: the shared Samba option groups first,
# then the switches that only this script understands.
parser = optparse.OptionParser("vlv.py [options] <host>")
sambaopts = options.SambaOptions(parser)
parser.add_option_group(sambaopts)
parser.add_option_group(options.VersionOptions(parser))
# Credentials supplied on the command line are used when available.
credopts = options.CredentialsOptions(parser)
parser.add_option_group(credopts)
subunitopts = SubunitOptions(parser)
parser.add_option_group(subunitopts)

parser.add_option(
    '--elements',
    type='int',
    default=20,
    help="use this many elements in the tests")

parser.add_option(
    '--delete-in-setup',
    action='store_true',
    help="cleanup in next setup rather than teardown")

parser.add_option(
    '--skip-attr-regex',
    help="ignore attributes matching this regex")

opts, args = parser.parse_args()

# The first positional argument is the host to test against; without
# it there is nothing to do.
if not args:
    parser.print_usage()
    sys.exit(1)

host = args[0]

lp = sambaopts.get_loadparm()
creds = credopts.get_credentials(lp)

# Number of users created in the test OU by each setUp().
N_ELEMENTS = opts.elements
57
58
class VlvTestException(Exception):
    """Exception class defined for use by this VLV test module."""
61
62
def encode_vlv_control(critical=1,
                       before=0, after=0,
                       offset=None,
                       gte=None,
                       n=0, cookie=None):
    """Build the string form of a VLV request control.

    The result always starts with "vlv:<critical>:<before>:<after>:".
    What follows depends on how the target is selected:

    * offset given: "<offset>:<n>"
    * otherwise gte (bytes) is used: ">=<value>", or
      "base64>=<base64 of value>" when the value contains a colon or a
      NUL byte, either of which would break the colon-separated format.

    If cookie is not None it is appended after a further colon.

    :param critical: integer criticality flag
    :param before: number of entries wanted before the target
    :param after: number of entries wanted after the target
    :param offset: integer target offset, or None to select by gte
    :param gte: bytes value for a greater-than-or-equal target; only
                consulted when offset is None
    :param n: content count sent alongside offset
    :param cookie: cookie string from a previous response, or None
    :return: the control string
    :raises ValueError: if both offset and gte are None
    """
    prefix = "vlv:%d:%d:%d:" % (critical, before, after)

    if offset is not None:
        target = "%d:%d" % (offset, n)
    elif gte is None:
        # Without this check the membership tests below would fail
        # with an unhelpful "argument of type 'NoneType'" TypeError.
        raise ValueError("encode_vlv_control needs either an offset "
                         "or a gte value")
    elif b':' in gte or b'\x00' in gte:
        target = "base64>=%s" % get_string(base64.b64encode(gte))
    else:
        target = ">=%s" % get_string(gte)

    if cookie is None:
        return prefix + target

    return prefix + target + ':' + cookie
83
84
def get_cookie(controls, expected_n=None):
    """Extract the (still base64 encoded) cookie from a VLV response.

    The first control whose string form starts with 'vlv_resp' is
    used; its last colon-separated field is returned as the cookie.
    When expected_n is given, the third-from-last field (the item
    count reported by the server) must equal it.

    :raises ValueError: if the count differs from expected_n, or if
                        no control starts with 'vlv_resp'
    """
    for text in map(str, controls):
        if not text.startswith('vlv_resp'):
            continue
        _head, count, _result, cookie = text.rsplit(':', 3)
        if expected_n is not None and int(count) != expected_n:
            raise ValueError("Expected %s items, server said %s" %
                             (expected_n, count))
        return cookie
    raise ValueError("there is no VLV response")
96
97
class TestsWithUserOU(samba.tests.TestCase):
    """Base test case that fills an "ou=vlv" OU with generated users.

    setUp() connects to the host given on the command line, creates
    the OU below the domain DN and adds N_ELEMENTS users to it. Every
    added user dict is remembered in self.users. The OU is removed
    again in tearDown(), or at the start of the next setUp() when
    --delete-in-setup was given.
    """

    def create_user(self, i, n, prefix='vlvtest', suffix='', attrs=None):
        """Add one generated user under self.ou and record it.

        :param i: index of the user; most attribute values derive from it
        :param n: total used when deriving values that depend on n - i
        :param prefix: text placed before the index in the cn
        :param suffix: text placed after the index in the cn
        :param attrs: optional dict of attributes overriding or
                      extending the generated ones
        :return: the dict that was passed to self.ldb.add()
        """
        name = "%s%d%s" % (prefix, i, suffix)
        user = {
            'cn': name,
            "objectclass": "user",
            'givenName': "abcdefghijklmnopqrstuvwxyz"[i % 26],
            "roomNumber": "%sbc" % (n - i),
            "carLicense": "后来经",
            "employeeNumber": "%s%sx" % (abs(i * (99 - i)), '\n' * (i & 255)),
            "accountExpires": "%s" % (10 ** 9 + 1000000 * i),
            "msTSExpireDate4": "19%02d0101010000.0Z" % (i % 100),
            "flags": str(i * (n - i)),
            "serialNumber": "abc %s%s%s" % ('AaBb |-/'[i & 7],
                                            ' 3z}'[i & 3],
                                            '"@'[i & 1],),
        }

        # _user_broken_attrs tests are broken due to problems outside
        # of VLV. The dict is deliberately not merged into `user`; it
        # is kept as a record of attributes that could be tested.
        _user_broken_attrs = {
            # Sort doesn't look past a NUL byte.
            "photo": "\x00%d" % (n - i),
            "audio": "%sn octet string %s%s ♫♬\x00lalala" % ('Aa'[i & 1],
                                                             chr(i & 255), i),
            "displayNamePrintable": "%d\x00%c" % (i, i & 255),
            "adminDisplayName": "%d\x00b" % (n - i),
            "title": "%d%sb" % (n - i, '\x00' * i),
            "comment": "Favourite colour is %d" % (n % (i + 1)),

            # Names that vary only in case. Windows returns
            # equivalent addresses in the order they were put
            # in ('a st', 'A st',...).
            "street": "%s st" % (chr(65 | (i & 14) | ((i & 1) * 32))),
        }

        if attrs is not None:
            user.update(attrs)

        user['dn'] = "cn=%s,%s" % (user['cn'], self.ou)

        if opts.skip_attr_regex:
            match = re.compile(opts.skip_attr_regex).search
            # Iterate over a snapshot of the keys: deleting entries
            # while iterating the dict itself raises RuntimeError on
            # Python 3.
            for k in list(user):
                if match(k):
                    del user[k]

        self.users.append(user)
        self.ldb.add(user)
        return user

    def setUp(self):
        """Connect, (re)create the test OU and populate it with users."""
        super(TestsWithUserOU, self).setUp()
        self.ldb = SamDB(host, credentials=creds,
                         session_info=system_session(lp), lp=lp)

        self.base_dn = self.ldb.domain_dn()
        self.ou = "ou=vlv,%s" % self.base_dn
        if opts.delete_in_setup:
            # Best effort removal of whatever a previous run left
            # behind; a failure is reported but does not stop the test.
            try:
                self.ldb.delete(self.ou, ['tree_delete:1'])
            except ldb.LdbError as e:
                print("tried deleting %s, got error %s" % (self.ou, e))
        self.ldb.add({
            "dn": self.ou,
            "objectclass": "organizationalUnit"})

        self.users = []
        for i in range(N_ELEMENTS):
            self.create_user(i, N_ELEMENTS)

        attrs = self.users[0].keys()
        self.binary_sorted_keys = ['audio',
                                   'photo',
                                   "msTSExpireDate4",
                                   'serialNumber',
                                   "displayNamePrintable"]

        self.numeric_sorted_keys = ['flags',
                                    'accountExpires']

        self.timestamp_keys = ['msTSExpireDate4']

        self.int64_keys = set(['accountExpires'])

        # Everything that is neither binary nor numeric sorted.
        self.locale_sorted_keys = [x for x in attrs if
                                   x not in (self.binary_sorted_keys +
                                             self.numeric_sorted_keys)]

        # don't try spaces, etc in cn
        self.delicate_keys = ['cn']

    def tearDown(self):
        """Remove the test OU unless cleanup is deferred to setUp()."""
        super(TestsWithUserOU, self).tearDown()
        if not opts.delete_in_setup:
            self.ldb.delete(self.ou, ['tree_delete:1'])
195
196
197 class VLVTests(TestsWithUserOU):
198
199     def get_full_list(self, attr, include_cn=False):
200         """Fetch the whole list sorted on the attribute, using the VLV.
201         This way you get a VLV cookie."""
202         n_users = len(self.users)
203         sort_control = "server_sort:1:0:%s" % attr
204         half_n = n_users // 2
205         vlv_search = "vlv:1:%d:%d:%d:0" % (half_n, half_n, half_n + 1)
206         attrs = [attr]
207         if include_cn:
208             attrs.append('cn')
209         res = self.ldb.search(self.ou,
210                               scope=ldb.SCOPE_ONELEVEL,
211                               attrs=attrs,
212                               controls=[sort_control,
213                                         vlv_search])
214         if include_cn:
215             full_results = [(str(x[attr][0]), str(x['cn'][0])) for x in res]
216         else:
217             full_results = [str(x[attr][0]).lower() for x in res]
218         controls = res.controls
219         return full_results, controls, sort_control
220
221     def get_expected_order(self, attr, expression=None):
222         """Fetch the whole list sorted on the attribute, using sort only."""
223         sort_control = "server_sort:1:0:%s" % attr
224         res = self.ldb.search(self.ou,
225                               scope=ldb.SCOPE_ONELEVEL,
226                               expression=expression,
227                               attrs=[attr],
228                               controls=[sort_control])
229         results = [x[attr][0] for x in res]
230         return results
231
232     def delete_user(self, user):
233         self.ldb.delete(user['dn'])
234         del self.users[self.users.index(user)]
235
236     def get_gte_tests_and_order(self, attr, expression=None):
237         expected_order = self.get_expected_order(attr, expression=expression)
238         gte_users = []
239         if attr in self.delicate_keys:
240             gte_keys = [
241                 '3',
242                 'abc',
243                 '¹',
244                 'ŋđ¼³ŧ“«đð',
245                 '桑巴',
246             ]
247         elif attr in self.timestamp_keys:
248             gte_keys = [
249                 '18560101010000.0Z',
250                 '19140103010000.0Z',
251                 '19560101010010.0Z',
252                 '19700101000000.0Z',
253                 '19991231211234.3Z',
254                 '20061111211234.0Z',
255                 '20390901041234.0Z',
256                 '25560101010000.0Z',
257             ]
258         elif attr not in self.numeric_sorted_keys:
259             gte_keys = [
260                 '3',
261                 'abc',
262                 ' ',
263                 '!@#!@#!',
264                 'kōkako',
265                 '¹',
266                 'ŋđ¼³ŧ“«đð',
267                 '\n\t\t',
268                 '桑巴',
269                 'zzzz',
270             ]
271             if expected_order:
272                 gte_keys.append(expected_order[len(expected_order) // 2] + b' tail')
273
274         else:
275             # "numeric" means positive integers
276             # doesn't work with -1, 3.14, ' 3', '9' * 20
277             gte_keys = ['3',
278                         '1' * 10,
279                         '1',
280                         '9' * 7,
281                         '0']
282
283             if attr in self.int64_keys:
284                 gte_keys += ['3' * 12, '71' * 8]
285
286         for i, x in enumerate(gte_keys):
287             user = self.create_user(i, N_ELEMENTS,
288                                     prefix='gte',
289                                     attrs={attr: x})
290             gte_users.append(user)
291
292         gte_order = self.get_expected_order(attr)
293         for user in gte_users:
294             self.delete_user(user)
295
296         # for sanity's sake
297         expected_order_2 = self.get_expected_order(attr, expression=expression)
298         self.assertEqual(expected_order, expected_order_2)
299
300         # Map gte tests to indexes in expected order. This will break
301         # if gte_order and expected_order are differently ordered (as
302         # it should).
303         gte_map = {}
304
305         # index to the first one with each value
306         index_map = {}
307         for i, k in enumerate(expected_order):
308             if k not in index_map:
309                 index_map[k] = i
310
311         keys = []
312         for k in gte_order:
313             if k in index_map:
314                 i = index_map[k]
315                 gte_map[k] = i
316                 for k in keys:
317                     gte_map[k] = i
318                 keys = []
319             else:
320                 keys.append(k)
321
322         for k in keys:
323             gte_map[k] = len(expected_order)
324
325         if False:
326             print("gte_map:")
327             for k in gte_order:
328                 print("   %10s => %10s" % (k, gte_map[k]))
329
330         return gte_order, expected_order, gte_map
331
332     def assertCorrectResults(self, results, expected_order,
333                              offset, before, after):
334         """A helper to calculate offsets correctly and say as much as possible
335         when something goes wrong."""
336
337         start = max(offset - before - 1, 0)
338         end = offset + after
339         expected_results = expected_order[start: end]
340
341         # if it is a tuple with the cn, drop the cn
342         if expected_results and isinstance(expected_results[0], tuple):
343             expected_results = [x[0] for x in expected_results]
344
345         if expected_results == results:
346             return
347
348         if expected_order is not None:
349             print("expected order: %s" % expected_order[:20])
350             if len(expected_order) > 20:
351                 print("... and %d more not shown" % (len(expected_order) - 20))
352
353         print("offset %d before %d after %d" % (offset, before, after))
354         print("start %d end %d" % (start, end))
355         print("expected: %s" % expected_results)
356         print("got     : %s" % results)
357         self.assertEquals(expected_results, results)
358
359     def test_server_vlv_with_cookie(self):
360         attrs = [x for x in self.users[0].keys() if x not in
361                  ('dn', 'objectclass')]
362         for attr in attrs:
363             expected_order = self.get_expected_order(attr)
364             sort_control = "server_sort:1:0:%s" % attr
365             res = None
366             n = len(self.users)
367             for before in [10, 0, 3, 1, 4, 5, 2]:
368                 for after in [0, 3, 1, 4, 5, 2, 7]:
369                     for offset in range(max(1, before - 2),
370                                         min(n - after + 2, n)):
371                         if res is None:
372                             vlv_search = "vlv:1:%d:%d:%d:0" % (before, after,
373                                                                offset)
374                         else:
375                             cookie = get_cookie(res.controls, n)
376                             vlv_search = ("vlv:1:%d:%d:%d:%s:%s" %
377                                           (before, after, offset, n,
378                                            cookie))
379
380                         res = self.ldb.search(self.ou,
381                                               scope=ldb.SCOPE_ONELEVEL,
382                                               attrs=[attr],
383                                               controls=[sort_control,
384                                                         vlv_search])
385
386                         results = [x[attr][0] for x in res]
387
388                         self.assertCorrectResults(results, expected_order,
389                                                   offset, before, after)
390
391     def run_index_tests_with_expressions(self, expressions):
392         # Here we don't test every before/after combination.
393         attrs = [x for x in self.users[0].keys() if x not in
394                  ('dn', 'objectclass')]
395         for attr in attrs:
396             for expression in expressions:
397                 expected_order = self.get_expected_order(attr, expression)
398                 sort_control = "server_sort:1:0:%s" % attr
399                 res = None
400                 n = len(expected_order)
401                 for before in range(0, 11):
402                     after = before
403                     for offset in range(max(1, before - 2),
404                                         min(n - after + 2, n)):
405                         if res is None:
406                             vlv_search = "vlv:1:%d:%d:%d:0" % (before, after,
407                                                                offset)
408                         else:
409                             cookie = get_cookie(res.controls)
410                             vlv_search = ("vlv:1:%d:%d:%d:%s:%s" %
411                                           (before, after, offset, n,
412                                            cookie))
413
414                         res = self.ldb.search(self.ou,
415                                               expression=expression,
416                                               scope=ldb.SCOPE_ONELEVEL,
417                                               attrs=[attr],
418                                               controls=[sort_control,
419                                                         vlv_search])
420
421                         results = [x[attr][0] for x in res]
422
423                         self.assertCorrectResults(results, expected_order,
424                                                   offset, before, after)
425
426     def test_server_vlv_with_expression(self):
427         """What happens when we run the VLV with an expression?"""
428         expressions = ["(objectClass=*)",
429                        "(cn=%s)" % self.users[-1]['cn'],
430                        "(roomNumber=%s)" % self.users[0]['roomNumber'],
431                        ]
432         self.run_index_tests_with_expressions(expressions)
433
434     def test_server_vlv_with_failing_expression(self):
435         """What happens when we run the VLV on an expression that matches
436         nothing?"""
437         expressions = ["(samaccountname=testferf)",
438                        "(cn=hefalump)",
439                        ]
440         self.run_index_tests_with_expressions(expressions)
441
442     def run_gte_tests_with_expressions(self, expressions):
443         # Here we don't test every before/after combination.
444         attrs = [x for x in self.users[0].keys() if x not in
445                  ('dn', 'objectclass')]
446         for expression in expressions:
447             for attr in attrs:
448                 gte_order, expected_order, gte_map = \
449                     self.get_gte_tests_and_order(attr, expression)
450                 # In case there is some order dependency, disorder tests
451                 gte_tests = gte_order[:]
452                 random.seed(2)
453                 random.shuffle(gte_tests)
454                 res = None
455                 sort_control = "server_sort:1:0:%s" % attr
456
457                 expected_order = self.get_expected_order(attr, expression)
458                 sort_control = "server_sort:1:0:%s" % attr
459                 res = None
460                 for before in range(0, 11):
461                     after = before
462                     for gte in gte_tests:
463                         if res is not None:
464                             cookie = get_cookie(res.controls)
465                         else:
466                             cookie = None
467                         vlv_search = encode_vlv_control(before=before,
468                                                         after=after,
469                                                         gte=get_bytes(gte),
470                                                         cookie=cookie)
471
472                         res = self.ldb.search(self.ou,
473                                               scope=ldb.SCOPE_ONELEVEL,
474                                               expression=expression,
475                                               attrs=[attr],
476                                               controls=[sort_control,
477                                                         vlv_search])
478
479                         results = [x[attr][0] for x in res]
480                         offset = gte_map.get(gte, len(expected_order))
481
482                         # here offset is 0-based
483                         start = max(offset - before, 0)
484                         end = offset + 1 + after
485
486                         expected_results = expected_order[start: end]
487
488                         self.assertEquals(expected_results, results)
489
490     def test_vlv_gte_with_expression(self):
491         """What happens when we run the VLV with an expression?"""
492         expressions = ["(objectClass=*)",
493                        "(cn=%s)" % self.users[-1]['cn'],
494                        "(roomNumber=%s)" % self.users[0]['roomNumber'],
495                        ]
496         self.run_gte_tests_with_expressions(expressions)
497
498     def test_vlv_gte_with_failing_expression(self):
499         """What happens when we run the VLV on an expression that matches
500         nothing?"""
501         expressions = ["(samaccountname=testferf)",
502                        "(cn=hefalump)",
503                        ]
504         self.run_gte_tests_with_expressions(expressions)
505
506     def test_server_vlv_with_cookie_while_adding_and_deleting(self):
507         """What happens if we add or remove items in the middle of the VLV?
508
509         Nothing. The search and the sort is not repeated, and we only
510         deal with the objects originally found.
511         """
512         attrs = ['cn'] + [x for x in self.users[0].keys() if x not in
513                           ('dn', 'objectclass')]
514         user_number = 0
515         iteration = 0
516         for attr in attrs:
517             full_results, controls, sort_control = \
518                             self.get_full_list(attr, True)
519             original_n = len(self.users)
520
521             expected_order = full_results
522             random.seed(1)
523
524             for before in list(range(0, 3)) + [6, 11, 19]:
525                 for after in list(range(0, 3)) + [6, 11, 19]:
526                     start = max(before - 1, 1)
527                     end = max(start + 4, original_n - after + 2)
528                     for offset in range(start, end):
529                         # if iteration > 2076:
530                         #    return
531                         cookie = get_cookie(controls, original_n)
532                         vlv_search = encode_vlv_control(before=before,
533                                                         after=after,
534                                                         offset=offset,
535                                                         n=original_n,
536                                                         cookie=cookie)
537
538                         iteration += 1
539                         res = self.ldb.search(self.ou,
540                                               scope=ldb.SCOPE_ONELEVEL,
541                                               attrs=[attr],
542                                               controls=[sort_control,
543                                                         vlv_search])
544
545                         controls = res.controls
546                         results = [x[attr][0] for x in res]
547                         real_offset = max(1, min(offset, len(expected_order)))
548
549                         expected_results = []
550                         skipped = 0
551                         begin_offset = max(real_offset - before - 1, 0)
552                         real_before = min(before, real_offset - 1)
553                         real_after = min(after,
554                                          len(expected_order) - real_offset)
555
556                         for x in expected_order[begin_offset:]:
557                             if x is not None:
558                                 expected_results.append(get_bytes(x[0]))
559                                 if (len(expected_results) ==
560                                     real_before + real_after + 1):
561                                     break
562                             else:
563                                 skipped += 1
564
565                         if expected_results != results:
566                             print("attr %s before %d after %d offset %d" %
567                                   (attr, before, after, offset))
568                         self.assertEquals(expected_results, results)
569
570                         n = len(self.users)
571                         if random.random() < 0.1 + (n < 5) * 0.05:
572                             if n == 0:
573                                 i = 0
574                             else:
575                                 i = random.randrange(n)
576                             user = self.create_user(i, n, suffix='-%s' %
577                                                     user_number)
578                             user_number += 1
579                         if random.random() < 0.1  + (n > 50) * 0.02 and n:
580                             index = random.randrange(n)
581                             user = self.users.pop(index)
582
583                             self.ldb.delete(user['dn'])
584
585                             replaced = (user[attr], user['cn'])
586                             if replaced in expected_order:
587                                 i = expected_order.index(replaced)
588                                 expected_order[i] = None
589
590     def test_server_vlv_with_cookie_while_changing(self):
591         """What happens if we modify items in the middle of the VLV?
592
593         The expected behaviour (as found on Windows) is the sort is
594         not repeated, but the changes in attributes are reflected.
595         """
596         attrs = [x for x in self.users[0].keys() if x not in
597                  ('dn', 'objectclass', 'cn')]
598         for attr in attrs:
599             n_users = len(self.users)
600             expected_order = [x.upper() for x in self.get_expected_order(attr)]
601             sort_control = "server_sort:1:0:%s" % attr
602             res = None
603             i = 0
604
605             # First we'll fetch the whole list so we know the original
606             # sort order. This is necessary because we don't know how
607             # the server will order equivalent items. We are using the
608             # dn as a key.
609             half_n = n_users // 2
610             vlv_search = "vlv:1:%d:%d:%d:0" % (half_n, half_n, half_n + 1)
611             res = self.ldb.search(self.ou,
612                                   scope=ldb.SCOPE_ONELEVEL,
613                                   attrs=['dn', attr],
614                                   controls=[sort_control, vlv_search])
615
616             results = [x[attr][0].upper() for x in res]
617             #self.assertEquals(expected_order, results)
618
619             dn_order = [str(x['dn']) for x in res]
620             values = results[:]
621
622             for before in range(0, 3):
623                 for after in range(0, 3):
624                     for offset in range(1 + before, n_users - after):
625                         cookie = get_cookie(res.controls, len(self.users))
626                         vlv_search = ("vlv:1:%d:%d:%d:%s:%s" %
627                                       (before, after, offset, len(self.users),
628                                        cookie))
629
630                         res = self.ldb.search(self.ou,
631                                               scope=ldb.SCOPE_ONELEVEL,
632                                               attrs=['dn', attr],
633                                               controls=[sort_control,
634                                                         vlv_search])
635
636                         dn_results = [str(x['dn']) for x in res]
637                         dn_expected = dn_order[offset - before - 1:
638                                                offset + after]
639
640                         self.assertEquals(dn_expected, dn_results)
641
642                         results = [x[attr][0].upper() for x in res]
643
644                         self.assertCorrectResults(results, values,
645                                                   offset, before, after)
646
647                         i += 1
648                         if i % 3 == 2:
649                             if (attr in self.locale_sorted_keys or
650                                 attr in self.binary_sorted_keys):
651                                 i1 = i % n_users
652                                 i2 = (i ^ 255) % n_users
653                                 dn1 = dn_order[i1]
654                                 dn2 = dn_order[i2]
655                                 v2 = values[i2]
656
657                                 if v2 in self.locale_sorted_keys:
658                                     v2 += '-%d' % i
659                                 cn1 = dn1.split(',', 1)[0][3:]
660                                 cn2 = dn2.split(',', 1)[0][3:]
661
662                                 values[i1] = v2
663
664                                 m = ldb.Message()
665                                 m.dn = ldb.Dn(self.ldb, dn1)
666                                 m[attr] = ldb.MessageElement(v2,
667                                                              ldb.FLAG_MOD_REPLACE,
668                                                              attr)
669
670                                 self.ldb.modify(m)
671
672     def test_server_vlv_fractions_with_cookie(self):
673         """What happens when the count is set to an arbitrary number?
674
675         In that case the offset and the count form a fraction, and the
676         VLV should be centred at a point offset/count of the way
677         through. For example, if offset is 3 and count is 6, the VLV
678         should be looking around halfway. The actual algorithm is a
679         bit fiddlier than that, because of the one-basedness of VLV.
680         """
681         attrs = [x for x in self.users[0].keys() if x not in
682                  ('dn', 'objectclass')]
683
684         n_users = len(self.users)
685
686         random.seed(4)
687
688         for attr in attrs:
689             full_results, controls, sort_control = self.get_full_list(attr)
690             self.assertEqual(len(full_results), n_users)
691             for before in range(0, 2):
692                 for after in range(0, 2):
693                     for denominator in range(1, 20):
694                         for offset in range(1, denominator + 3):
695                             cookie = get_cookie(controls, len(self.users))
696                             vlv_search = ("vlv:1:%d:%d:%d:%s:%s" %
697                                           (before, after, offset,
698                                            denominator,
699                                            cookie))
700                             try:
701                                 res = self.ldb.search(self.ou,
702                                                       scope=ldb.SCOPE_ONELEVEL,
703                                                       attrs=[attr],
704                                                       controls=[sort_control,
705                                                                 vlv_search])
706                             except ldb.LdbError as e:
707                                 if offset != 0:
708                                     raise
709                                 print("offset %d denominator %d raised error "
710                                       "expected error %s\n"
711                                       "(offset zero is illegal unless "
712                                       "content count is zero)" %
713                                       (offset, denominator, e))
714                                 continue
715
716                             results = [str(x[attr][0]).lower() for x in res]
717
718                             if denominator == 0:
719                                 denominator = n_users
720                                 if offset == 0:
721                                     offset = denominator
722                             elif denominator == 1:
723                                 # the offset can only be 1, but the 1/1 case
724                                 # means something special
725                                 if offset == 1:
726                                     real_offset = n_users
727                                 else:
728                                     real_offset = 1
729                             else:
730                                 if offset > denominator:
731                                     offset = denominator
732                                 real_offset = (1 +
733                                                int(round((n_users - 1) *
734                                                          (offset - 1) /
735                                                          (denominator - 1.0)))
736                                                )
737
738                             self.assertCorrectResults(results, full_results,
739                                                       real_offset, before,
740                                                       after)
741
742                             controls = res.controls
743                             if False:
744                                 for c in list(controls):
745                                     cstr = str(c)
746                                     if cstr.startswith('vlv_resp'):
747                                         bits = cstr.rsplit(':')
748                                         print("the answer is %s; we said %d" %
749                                               (bits[2], real_offset))
750                                         break
751
752     def test_server_vlv_no_cookie(self):
753         attrs = [x for x in self.users[0].keys() if x not in
754                  ('dn', 'objectclass')]
755
756         for attr in attrs:
757             expected_order = self.get_expected_order(attr)
758             sort_control = "server_sort:1:0:%s" % attr
759             for before in range(0, 5):
760                 for after in range(0, 7):
761                     for offset in range(1 + before, len(self.users) - after):
762                         res = self.ldb.search(self.ou,
763                                               scope=ldb.SCOPE_ONELEVEL,
764                                               attrs=[attr],
765                                               controls=[sort_control,
766                                                         "vlv:1:%d:%d:%d:0" %
767                                                         (before, after,
768                                                          offset)])
769                         results = [x[attr][0] for x in res]
770                         self.assertCorrectResults(results, expected_order,
771                                                   offset, before, after)
772
773     def get_expected_order_showing_deleted(self, attr,
774                                            expression="(|(cn=vlvtest*)(cn=vlv-deleted*))",
775                                            base=None,
776                                            scope=ldb.SCOPE_SUBTREE
777                                            ):
778         """Fetch the whole list sorted on the attribute, using sort only,
779         searching in the entire tree, not just our OU. This is the
780         way to find deleted objects.
781         """
782         if base is None:
783             base = self.base_dn
784         sort_control = "server_sort:1:0:%s" % attr
785         controls = [sort_control, "show_deleted:1"]
786
787         res = self.ldb.search(base,
788                               scope=scope,
789                               expression=expression,
790                               attrs=[attr],
791                               controls=controls)
792         results = [x[attr][0] for x in res]
793         return results
794
795     def add_deleted_users(self, n):
796         deleted_users = [self.create_user(i, n, prefix='vlv-deleted')
797                          for i in range(n)]
798
799         for user in deleted_users:
800             self.delete_user(user)
801
802     def test_server_vlv_no_cookie_show_deleted(self):
803         """What do we see with the show_deleted control?"""
804         attrs = ['objectGUID',
805                  'cn',
806                  'sAMAccountName',
807                  'objectSid',
808                  'name',
809                  'whenChanged',
810                  'usnChanged'
811                  ]
812
813         # add some deleted users first, just in case there are none
814         self.add_deleted_users(6)
815         random.seed(22)
816         expression = "(|(cn=vlvtest*)(cn=vlv-deleted*))"
817
818         for attr in attrs:
819             show_deleted_control = "show_deleted:1"
820             expected_order = self.get_expected_order_showing_deleted(attr,
821                                                                      expression)
822             n = len(expected_order)
823             sort_control = "server_sort:1:0:%s" % attr
824             for before in [3, 1, 0]:
825                 for after in [0, 2]:
826                     # don't test every position, because there could be hundreds.
827                     # jump back and forth instead
828                     for i in range(20):
829                         offset = random.randrange(max(1, before - 2),
830                                                   min(n - after + 2, n))
831                         res = self.ldb.search(self.base_dn,
832                                               expression=expression,
833                                               scope=ldb.SCOPE_SUBTREE,
834                                               attrs=[attr],
835                                               controls=[sort_control,
836                                                         show_deleted_control,
837                                                         "vlv:1:%d:%d:%d:0" %
838                                                         (before, after,
839                                                          offset)
840                                                         ]
841                                               )
842                         results = [x[attr][0] for x in res]
843                         self.assertCorrectResults(results, expected_order,
844                                                   offset, before, after)
845
846     def test_server_vlv_no_cookie_show_deleted_only(self):
847         """What do we see with the show_deleted control when we're not looking
848         at any non-deleted things"""
849         attrs = ['objectGUID',
850                  'cn',
851                  'sAMAccountName',
852                  'objectSid',
853                  'whenChanged',
854                  ]
855
856         # add some deleted users first, just in case there are none
857         self.add_deleted_users(4)
858         base = 'CN=Deleted Objects,%s' % self.base_dn
859         expression = "(cn=vlv-deleted*)"
860         for attr in attrs:
861             show_deleted_control = "show_deleted:1"
862             expected_order = self.get_expected_order_showing_deleted(attr,
863                                                                      expression=expression,
864                                                                      base=base,
865                                                                      scope=ldb.SCOPE_ONELEVEL)
866             print("searching for attr %s amongst %d deleted objects" %
867                   (attr, len(expected_order)))
868             sort_control = "server_sort:1:0:%s" % attr
869             step = max(len(expected_order) // 10, 1)
870             for before in [3, 0]:
871                 for after in [0, 2]:
872                     for offset in range(1 + before,
873                                         len(expected_order) - after,
874                                         step):
875                         res = self.ldb.search(base,
876                                               expression=expression,
877                                               scope=ldb.SCOPE_ONELEVEL,
878                                               attrs=[attr],
879                                               controls=[sort_control,
880                                                         show_deleted_control,
881                                                         "vlv:1:%d:%d:%d:0" %
882                                                         (before, after,
883                                                          offset)])
884                         results = [x[attr][0] for x in res]
885                         self.assertCorrectResults(results, expected_order,
886                                                   offset, before, after)
887
888     def test_server_vlv_with_cookie_show_deleted(self):
889         """What do we see with the show_deleted control?"""
890         attrs = ['objectGUID',
891                  'cn',
892                  'sAMAccountName',
893                  'objectSid',
894                  'name',
895                  'whenChanged',
896                  'usnChanged'
897                  ]
898         self.add_deleted_users(6)
899         random.seed(23)
900         for attr in attrs:
901             expected_order = self.get_expected_order(attr)
902             sort_control = "server_sort:1:0:%s" % attr
903             res = None
904             show_deleted_control = "show_deleted:1"
905             expected_order = self.get_expected_order_showing_deleted(attr)
906             n = len(expected_order)
907             expression = "(|(cn=vlvtest*)(cn=vlv-deleted*))"
908             for before in [3, 2, 1, 0]:
909                 after = before
910                 for i in range(20):
911                     offset = random.randrange(max(1, before - 2),
912                                               min(n - after + 2, n))
913                     if res is None:
914                         vlv_search = "vlv:1:%d:%d:%d:0" % (before, after,
915                                                            offset)
916                     else:
917                         cookie = get_cookie(res.controls, n)
918                         vlv_search = ("vlv:1:%d:%d:%d:%s:%s" %
919                                       (before, after, offset, n,
920                                        cookie))
921
922                     res = self.ldb.search(self.base_dn,
923                                           expression=expression,
924                                           scope=ldb.SCOPE_SUBTREE,
925                                           attrs=[attr],
926                                           controls=[sort_control,
927                                                     vlv_search,
928                                                     show_deleted_control])
929
930                     results = [x[attr][0] for x in res]
931
932                     self.assertCorrectResults(results, expected_order,
933                                               offset, before, after)
934
935     def test_server_vlv_gte_with_cookie(self):
936         attrs = [x for x in self.users[0].keys() if x not in
937                  ('dn', 'objectclass')]
938         for attr in attrs:
939             gte_order, expected_order, gte_map = \
940                                         self.get_gte_tests_and_order(attr)
941             # In case there is some order dependency, disorder tests
942             gte_tests = gte_order[:]
943             random.seed(1)
944             random.shuffle(gte_tests)
945             res = None
946             sort_control = "server_sort:1:0:%s" % attr
947             for before in [0, 1, 2, 4]:
948                 for after in [0, 1, 3, 6]:
949                     for gte in gte_tests:
950                         if res is not None:
951                             cookie = get_cookie(res.controls, len(self.users))
952                         else:
953                             cookie = None
954                         vlv_search = encode_vlv_control(before=before,
955                                                         after=after,
956                                                         gte=get_bytes(gte),
957                                                         cookie=cookie)
958
959                         res = self.ldb.search(self.ou,
960                                               scope=ldb.SCOPE_ONELEVEL,
961                                               attrs=[attr],
962                                               controls=[sort_control,
963                                                         vlv_search])
964
965                         results = [x[attr][0] for x in res]
966                         offset = gte_map.get(gte, len(expected_order))
967
968                         # here offset is 0-based
969                         start = max(offset - before, 0)
970                         end = offset + 1 + after
971
972                         expected_results = expected_order[start: end]
973
974                         self.assertEquals(expected_results, results)
975
976     def test_server_vlv_gte_no_cookie(self):
977         attrs = [x for x in self.users[0].keys() if x not in
978                  ('dn', 'objectclass')]
979         iteration = 0
980         for attr in attrs:
981             gte_order, expected_order, gte_map = \
982                                         self.get_gte_tests_and_order(attr)
983             # In case there is some order dependency, disorder tests
984             gte_tests = gte_order[:]
985             random.seed(1)
986             random.shuffle(gte_tests)
987
988             sort_control = "server_sort:1:0:%s" % attr
989             for before in [0, 1, 3]:
990                 for after in [0, 4]:
991                     for gte in gte_tests:
992                         vlv_search = encode_vlv_control(before=before,
993                                                         after=after,
994                                                         gte=get_bytes(gte))
995
996                         res = self.ldb.search(self.ou,
997                                               scope=ldb.SCOPE_ONELEVEL,
998                                               attrs=[attr],
999                                               controls=[sort_control,
1000                                                         vlv_search])
1001                         results = [x[attr][0] for x in res]
1002
1003                         # here offset is 0-based
1004                         offset = gte_map.get(gte, len(expected_order))
1005                         start = max(offset - before, 0)
1006                         end = offset + after + 1
1007                         expected_results = expected_order[start: end]
1008                         iteration += 1
1009                         if expected_results != results:
1010                             middle = expected_order[len(expected_order) // 2]
1011                             print(expected_results, results)
1012                             print(middle)
1013                             print(expected_order)
1014                             print()
1015                             print("\nattr %s offset %d before %d "
1016                                   "after %d gte %s" %
1017                                   (attr, offset, before, after, gte))
1018                         self.assertEquals(expected_results, results)
1019
1020     def test_multiple_searches(self):
1021         """The maximum number of concurrent vlv searches per connection is
1022         currently set at 3. That means if you open 4 VLV searches the
1023         cookie on the first one should fail.
1024         """
1025         # Windows has a limit of 10 VLVs where there are low numbers
1026         # of objects in each search.
1027         attrs = ([x for x in self.users[0].keys() if x not in
1028                   ('dn', 'objectclass')] * 2)[:12]
1029
1030         vlv_cookies = []
1031         for attr in attrs:
1032             sort_control = "server_sort:1:0:%s" % attr
1033
1034             res = self.ldb.search(self.ou,
1035                                   scope=ldb.SCOPE_ONELEVEL,
1036                                   attrs=[attr],
1037                                   controls=[sort_control,
1038                                             "vlv:1:1:1:1:0"])
1039
1040             cookie = get_cookie(res.controls, len(self.users))
1041             vlv_cookies.append(cookie)
1042             time.sleep(0.2)
1043
1044         # now this one should fail
1045         self.assertRaises(ldb.LdbError,
1046                           self.ldb.search,
1047                           self.ou,
1048                           scope=ldb.SCOPE_ONELEVEL,
1049                           attrs=[attr],
1050                           controls=[sort_control,
1051                                     "vlv:1:1:1:1:0:%s" % vlv_cookies[0]])
1052
1053         # and this one should succeed
1054         res = self.ldb.search(self.ou,
1055                               scope=ldb.SCOPE_ONELEVEL,
1056                               attrs=[attr],
1057                               controls=[sort_control,
1058                                         "vlv:1:1:1:1:0:%s" % vlv_cookies[-1]])
1059
1060         # this one should fail because it is a new connection and
1061         # doesn't share cookies
1062         new_ldb = SamDB(host, credentials=creds,
1063                         session_info=system_session(lp), lp=lp)
1064
1065         self.assertRaises(ldb.LdbError,
1066                           new_ldb.search, self.ou,
1067                           scope=ldb.SCOPE_ONELEVEL,
1068                           attrs=[attr],
1069                           controls=[sort_control,
1070                                     "vlv:1:1:1:1:0:%s" % vlv_cookies[-1]])
1071
1072         # but now without the critical flag it just does no VLV.
1073         new_ldb.search(self.ou,
1074                        scope=ldb.SCOPE_ONELEVEL,
1075                        attrs=[attr],
1076                        controls=[sort_control,
1077                                  "vlv:0:1:1:1:0:%s" % vlv_cookies[-1]])
1078
1079     # Run a vlv search and return important fields of the response control
1080     def vlv_search(self, attr, expr, cookie="", after_count=0, offset=1):
1081         sort_ctrl = "server_sort:1:0:%s" % attr
1082         ctrl = "vlv:1:0:%d:%d:0" % (after_count, offset)
1083         if cookie:
1084             ctrl += ":" + cookie
1085
1086         res = self.ldb.search(self.ou,
1087                               expression=expr,
1088                               scope=ldb.SCOPE_ONELEVEL,
1089                               attrs=[attr],
1090                               controls=[ctrl, sort_ctrl])
1091         results = [str(x[attr][0]) for x in res]
1092
1093         ctrls = [str(c) for c in res.controls if
1094                  str(c).startswith('vlv')]
1095         self.assertEqual(len(ctrls), 1)
1096
1097         spl = ctrls[0].rsplit(':')
1098         cookie = ""
1099         if len(spl) == 6:
1100             cookie = spl[-1]
1101
1102         return results, cookie
1103
1104     def test_vlv_modify_during_view(self):
1105         attr = 'roomNumber'
1106         expr = "(objectclass=user)"
1107
1108         # Start new search
1109         full_results, cookie = self.vlv_search(attr, expr,
1110                                                after_count=len(self.users))
1111
1112         # Edit a user
1113         edit_index = len(self.users)//2
1114         edit_attr = full_results[edit_index]
1115         users_with_attr = [u for u in self.users if u[attr] == edit_attr]
1116         self.assertEqual(len(users_with_attr), 1)
1117         edit_user = users_with_attr[0]
1118
1119         # Put z at the front of the val so it comes last in ordering
1120         edit_val = "z_" + edit_user[attr]
1121
1122         m = ldb.Message()
1123         m.dn = ldb.Dn(self.ldb, edit_user['dn'])
1124         m[attr] = ldb.MessageElement(edit_val, ldb.FLAG_MOD_REPLACE, attr)
1125         self.ldb.modify(m)
1126
1127         results, cookie = self.vlv_search(attr, expr, cookie=cookie,
1128                                           after_count=len(self.users))
1129
1130         # Make expected_results by copying and editing full_results
1131         expected_results = full_results[:]
1132         expected_results[edit_index] = edit_val
1133         self.assertEqual(results, expected_results)
1134
1135     # Test changing the search expression in a request on an initialised view
1136     # Expected failure on samba, passes on windows
1137     def test_vlv_change_search_expr(self):
1138         attr = 'roomNumber'
1139         expr = "(objectclass=user)"
1140
1141         # Start new search
1142         full_results, cookie = self.vlv_search(attr, expr,
1143                                                after_count=len(self.users))
1144
1145         middle_index = len(full_results)//2
1146         # Search that excludes the old value but includes the new one
1147         expr = "%s>=%s" % (attr, full_results[middle_index])
1148         results, cookie = self.vlv_search(attr, expr, cookie=cookie,
1149                                           after_count=len(self.users))
1150         self.assertEqual(results, full_results[middle_index:])
1151
1152     # Check you can't add a value to a vlv view
1153     def test_vlv_add_during_view(self):
1154         attr = 'roomNumber'
1155         expr = "(objectclass=user)"
1156
1157         # Start new search
1158         full_results, cookie = self.vlv_search(attr, expr,
1159                                                after_count=len(self.users))
1160
1161         # Add a user at the end of the sort order
1162         add_val = "z_addedval"
1163         user = {'cn': add_val, "objectclass": "user", attr: add_val}
1164         user['dn'] = "cn=%s,%s" % (user['cn'], self.ou)
1165         self.ldb.add(user)
1166
1167         results, cookie = self.vlv_search(attr, expr, cookie=cookie,
1168                                           after_count=len(self.users)+1)
1169         self.assertEqual(results, full_results)
1170
1171     def test_vlv_delete_during_view(self):
1172         attr = 'roomNumber'
1173         expr = "(objectclass=user)"
1174
1175         # Start new search
1176         full_results, cookie = self.vlv_search(attr, expr,
1177                                                after_count=len(self.users))
1178
1179         # Delete one of the users
1180         del_index = len(self.users)//2
1181         del_user = self.users[del_index]
1182         self.ldb.delete(del_user['dn'])
1183
1184         results, cookie = self.vlv_search(attr, expr, cookie=cookie,
1185                                           after_count=len(self.users))
1186         expected_results = [r for r in full_results if r != del_user[attr]]
1187         self.assertEqual(results, expected_results)
1188
1189
1190 class PagedResultsTests(TestsWithUserOU):
1191
1192     def paged_search(self, expr, cookie="", page_size=0, extra_ctrls=None,
1193                      attrs=None, ou=None, subtree=False):
1194         ou = ou or self.ou
1195         if cookie:
1196             cookie = ":" + cookie
1197         ctrl = "paged_results:1:" + str(page_size) + cookie
1198         controls = [ctrl]
1199
1200         # If extra controls are provided then add them, else default to
1201         # sort control on 'cn' attribute
1202         if extra_ctrls is not None:
1203             controls += extra_ctrls
1204         else:
1205             sort_ctrl = "server_sort:1:0:cn"
1206             controls.append(sort_ctrl)
1207
1208         kwargs = {}
1209         if attrs is not None:
1210             kwargs = {"attrs": attrs}
1211
1212         scope = ldb.SCOPE_ONELEVEL
1213         if subtree:
1214             scope = ldb.SCOPE_SUBTREE
1215
1216         res = self.ldb.search(ou,
1217                               expression=expr,
1218                               scope=scope,
1219                               controls=controls,
1220                               **kwargs)
1221         results = [str(r['cn'][0]) for r in res]
1222
1223         ctrls = [str(c) for c in res.controls if
1224                  str(c).startswith("paged_results")]
1225         assert len(ctrls) == 1, "no paged_results response"
1226
1227         spl = ctrls[0].rsplit(':', 3)
1228         cookie = ""
1229         if len(spl) == 3:
1230             cookie = spl[-1]
1231         return results, cookie
1232
1233     def test_paged_delete_during_search(self):
1234         expr = "(objectClass=*)"
1235
1236         # Start new search
1237         first_page_size = 3
1238         results, cookie = self.paged_search(expr, page_size=first_page_size)
1239
1240         # Run normal search to get expected results
1241         unedited_results, _ = self.paged_search(expr,
1242                                                 page_size=len(self.users))
1243
1244         # Get remaining users not returned by the search above
1245         unreturned_users = [u for u in self.users if u['cn'] not in results]
1246
1247         # Delete one of the users
1248         del_index = len(self.users)//2
1249         del_user = unreturned_users[del_index]
1250         self.ldb.delete(del_user['dn'])
1251
1252         # Run test
1253         results, _ = self.paged_search(expr, cookie=cookie,
1254                                        page_size=len(self.users))
1255         expected_results = [r for r in unedited_results[first_page_size:]
1256                             if r != del_user['cn']]
1257         self.assertEqual(results, expected_results)
1258
1259     def test_paged_show_deleted(self):
1260         unique = time.strftime("%s", time.gmtime())[-5:]
1261         prefix = "show_deleted_test_%s_" % (unique)
1262         expr = "(&(objectClass=user)(cn=%s*))" % (prefix)
1263         del_ctrl = "show_deleted:1"
1264
1265         num_users = 10
1266         users = []
1267         for i in range(num_users):
1268             user = self.create_user(i, num_users, prefix=prefix)
1269             users.append(user)
1270
1271         first_user = users[0]
1272         self.ldb.delete(first_user['dn'])
1273
1274         # Start new search
1275         first_page_size = 3
1276         results, cookie = self.paged_search(expr, page_size=first_page_size,
1277                                             extra_ctrls=[del_ctrl],
1278                                             ou=self.base_dn,
1279                                             subtree=True)
1280
1281         # Get remaining users not returned by the search above
1282         unreturned_users = [u for u in users if u['cn'] not in results]
1283
1284         # Delete one of the users
1285         del_index = len(users)//2
1286         del_user = unreturned_users[del_index]
1287         self.ldb.delete(del_user['dn'])
1288
1289         results2, _ = self.paged_search(expr, cookie=cookie,
1290                                         page_size=len(users)*2,
1291                                         extra_ctrls=[del_ctrl],
1292                                         ou=self.base_dn,
1293                                         subtree=True)
1294
1295         user_cns = {str(u['cn']) for u in users}
1296         deleted_cns = {first_user['cn'], del_user['cn']}
1297
1298         all_results = results + results2
1299         normal_results = {r for r in all_results if "DEL:" not in r}
1300         self.assertEqual(normal_results, user_cns - deleted_cns)
1301
1302         # Deleted results get "\nDEL:<GUID>" added to the CN, so cut it out.
1303         deleted_results = {r[:r.index('\n')] for r in all_results
1304                            if "DEL:" in r}
1305         self.assertEqual(deleted_results, deleted_cns)
1306
1307     def test_paged_add_during_search(self):
1308         expr = "(objectClass=*)"
1309
1310         # Start new search
1311         first_page_size = 3
1312         results, cookie = self.paged_search(expr, page_size=first_page_size)
1313
1314         unedited_results, _ = self.paged_search(expr,
1315                                                 page_size=len(self.users)+1)
1316
1317         # Get remaining users not returned by the search above
1318         unwalked_users = [cn for cn in unedited_results if cn not in results]
1319
1320         # Add a user in the middle of the sort order
1321         middle_index = len(unwalked_users)//2
1322         middle_user = unwalked_users[middle_index]
1323
1324         user = {'cn': middle_user + '_2', "objectclass": "user"}
1325         user['dn'] = "cn=%s,%s" % (user['cn'], self.ou)
1326         self.ldb.add(user)
1327
1328         results, _ = self.paged_search(expr, cookie=cookie,
1329                                        page_size=len(self.users)+1)
1330         expected_results = unwalked_users[:]
1331
1332         # Uncomment this line to assert that adding worked.
1333         # expected_results.insert(middle_index+1, user['cn'])
1334
1335         self.assertEqual(results, expected_results)
1336
1337     def test_paged_modify_during_search(self):
1338         expr = "(objectClass=*)"
1339
1340         # Start new search
1341         first_page_size = 3
1342         results, cookie = self.paged_search(expr, page_size=first_page_size)
1343
1344         unedited_results, _ = self.paged_search(expr,
1345                                                 page_size=len(self.users)+1)
1346
1347         # Modify user in the middle of the remaining sort order
1348         unwalked_users = [cn for cn in unedited_results if cn not in results]
1349         middle_index = len(unwalked_users)//2
1350         middle_cn = unwalked_users[middle_index]
1351
1352         # Find user object
1353         users_with_middle_cn = [u for u in self.users if u['cn'] == middle_cn]
1354         self.assertEqual(len(users_with_middle_cn), 1)
1355         middle_user = users_with_middle_cn[0]
1356
1357         # Rename object
1358         edit_cn = "z_" + middle_cn
1359         new_dn = middle_user['dn'].replace(middle_cn, edit_cn)
1360         self.ldb.rename(middle_user['dn'], new_dn)
1361
1362         results, _ = self.paged_search(expr, cookie=cookie,
1363                                        page_size=len(self.users)+1)
1364         expected_results = unwalked_users[:]
1365         expected_results[middle_index] = edit_cn
1366         self.assertEqual(results, expected_results)
1367
1368     def test_paged_modify_object_scope(self):
1369         expr = "(objectClass=*)"
1370
1371         ou2 = "OU=vlvtestou2,%s" % (self.base_dn)
1372         try:
1373             self.ldb.delete(ou2, ['tree_delete:1'])
1374         except ldb.LdbError:
1375             pass
1376         self.ldb.add({"dn": ou2, "objectclass": "organizationalUnit"})
1377
1378         # Do a separate, full search to get all results
1379         unedited_results, _ = self.paged_search(expr,
1380                                                 page_size=len(self.users)+1)
1381
1382         # Rename before starting a search
1383         first_cn = self.users[0]['cn']
1384         new_dn = "CN=%s,%s" % (first_cn, ou2)
1385         self.ldb.rename(self.users[0]['dn'], new_dn)
1386
1387         # Start new search under the original OU
1388         first_page_size = 3
1389         results, cookie = self.paged_search(expr, page_size=first_page_size)
1390         self.assertEqual(results, unedited_results[1:1+first_page_size])
1391
1392         # Get one of the users that is yet to be returned
1393         unwalked_users = [cn for cn in unedited_results if cn not in results]
1394         middle_index = len(unwalked_users)//2
1395         middle_cn = unwalked_users[middle_index]
1396
1397         # Find user object
1398         users_with_middle_cn = [u for u in self.users if u['cn'] == middle_cn]
1399         self.assertEqual(len(users_with_middle_cn), 1)
1400         middle_user = users_with_middle_cn[0]
1401
1402         # Rename
1403         new_dn = "CN=%s,%s" % (middle_cn, ou2)
1404         self.ldb.rename(middle_user['dn'], new_dn)
1405
1406         results, _ = self.paged_search(expr, cookie=cookie,
1407                                        page_size=len(self.users)+1)
1408
1409         expected_results = unwalked_users[:]
1410
1411         # We should really expect that the object renamed into a different
1412         # OU should vanish from the results, but turns out Windows does return
1413         # the object in this case.  Our module matches the Windows behaviour.
1414
1415         # If behaviour changes, this line inverts the test's expectations to
1416         # what you might expect.
1417         # del expected_results[middle_index]
1418
1419         # But still expect the user we removed before the search to be gone
1420         del expected_results[0]
1421
1422         self.assertEqual(results, expected_results)
1423
1424     def assertPagedSearchRaises(self, err_num, expr, cookie, attrs=None,
1425                                 extra_ctrls=None):
1426         try:
1427             results, _ = self.paged_search(expr, cookie=cookie,
1428                                            page_size=2,
1429                                            extra_ctrls=extra_ctrls,
1430                                            attrs=attrs)
1431         except ldb.LdbError as e:
1432             self.assertEqual(e.args[0], err_num)
1433             return
1434
1435         self.fail("No error raised by invalid search")
1436
1437     def test_paged_changed_expr(self):
1438         # Initiate search then use a different expr in subsequent req
1439         expr = "(objectClass=*)"
1440         results, cookie = self.paged_search(expr, page_size=3)
1441         expr = "cn>=a"
1442         expected_error_num = 12
1443         self.assertPagedSearchRaises(expected_error_num, expr, cookie)
1444
1445     def test_paged_changed_controls(self):
1446         expr = "(objectClass=*)"
1447         sort_ctrl = "server_sort:1:0:cn"
1448         del_ctrl = "show_deleted:1"
1449         expected_error_num = 12
1450         ps = 3
1451
1452         # Initiate search with a sort control then remove in subsequent req
1453         results, cookie = self.paged_search(expr, page_size=ps,
1454                                             extra_ctrls=[sort_ctrl])
1455         self.assertPagedSearchRaises(expected_error_num, expr,
1456                                      cookie, extra_ctrls=[])
1457
1458         # Initiate search with no sort control then add one in subsequent req
1459         results, cookie = self.paged_search(expr, page_size=ps,
1460                                             extra_ctrls=[])
1461         self.assertPagedSearchRaises(expected_error_num, expr,
1462                                      cookie, extra_ctrls=[sort_ctrl])
1463
1464         # Initiate search with show-deleted control then
1465         # remove it in subsequent req
1466         results, cookie = self.paged_search(expr, page_size=ps,
1467                                             extra_ctrls=[del_ctrl])
1468         self.assertPagedSearchRaises(expected_error_num, expr,
1469                                      cookie, extra_ctrls=[])
1470
1471         # Initiate normal search then add show-deleted control
1472         # in subsequent req
1473         results, cookie = self.paged_search(expr, page_size=ps,
1474                                             extra_ctrls=[])
1475         self.assertPagedSearchRaises(expected_error_num, expr,
1476                                      cookie, extra_ctrls=[del_ctrl])
1477
1478         # Changing order of controls shouldn't break the search
1479         results, cookie = self.paged_search(expr, page_size=ps,
1480                                             extra_ctrls=[del_ctrl, sort_ctrl])
1481         try:
1482             results, cookie = self.paged_search(expr, page_size=ps,
1483                                                 extra_ctrls=[sort_ctrl,
1484                                                              del_ctrl])
1485         except ldb.LdbError as e:
1486             self.fail(e)
1487
1488     def test_paged_cant_change_controls_data(self):
1489         # Some defaults for the rest of the tests
1490         expr = "(objectClass=*)"
1491         sort_ctrl = "server_sort:1:0:cn"
1492         expected_error_num = 12
1493
1494         # Initiate search with sort control then change it in subsequent req
1495         results, cookie = self.paged_search(expr, page_size=3,
1496                                             extra_ctrls=[sort_ctrl])
1497         changed_sort_ctrl = "server_sort:1:0:roomNumber"
1498         self.assertPagedSearchRaises(expected_error_num, expr,
1499                                      cookie, extra_ctrls=[changed_sort_ctrl])
1500
1501         # Initiate search with a control with crit=1, then use crit=0
1502         results, cookie = self.paged_search(expr, page_size=3,
1503                                             extra_ctrls=[sort_ctrl])
1504         changed_sort_ctrl = "server_sort:0:0:cn"
1505         self.assertPagedSearchRaises(expected_error_num, expr,
1506                                      cookie, extra_ctrls=[changed_sort_ctrl])
1507
1508     def test_paged_search_referrals(self):
1509         expr = "(objectClass=*)"
1510         paged_ctrl = "paged_results:1:5"
1511         res = self.ldb.search(self.base_dn,
1512                               expression=expr,
1513                               attrs=['cn'],
1514                               scope=ldb.SCOPE_SUBTREE,
1515                               controls=[paged_ctrl])
1516
1517         # Do a paged search walk over the whole database and save a list
1518         # of all the referrals returned by each search.
1519         referral_lists = []
1520
1521         while True:
1522             referral_lists.append(res.referals)
1523
1524             ctrls = [str(c) for c in res.controls if
1525                      str(c).startswith("paged_results")]
1526             self.assertEqual(len(ctrls), 1)
1527             spl = ctrls[0].rsplit(':')
1528             if len(spl) != 3:
1529                 break
1530
1531             cookie = spl[-1]
1532             res = self.ldb.search(self.base_dn,
1533                                   expression=expr,
1534                                   attrs=['cn'],
1535                                   scope=ldb.SCOPE_SUBTREE,
1536                                   controls=[paged_ctrl + ":" + cookie])
1537
1538         ref_list = referral_lists[0]
1539
1540         # Sanity check to make sure the search actually did something
1541         self.assertGreater(len(referral_lists), 2)
1542
1543         # Check the first referral set contains stuff
1544         self.assertGreater(len(ref_list), 0)
1545
1546         # Check the others don't
1547         self.assertTrue(all([len(l) == 0 for l in referral_lists[1:]]))
1548
1549         # Check the entries in the first referral list look like referrals
1550         self.assertTrue(all([s.startswith('ldap://') for s in ref_list]))
1551
1552     def test_paged_change_attrs(self):
1553         expr = "(objectClass=*)"
1554         attrs = ['cn']
1555         expected_error_num = 12
1556
1557         results, cookie = self.paged_search(expr, page_size=3, attrs=attrs)
1558         results, cookie = self.paged_search(expr, cookie=cookie, page_size=3,
1559                                             attrs=attrs)
1560
1561         changed_attrs = attrs + ['roomNumber']
1562         self.assertPagedSearchRaises(expected_error_num, expr,
1563                                      cookie, attrs=changed_attrs,
1564                                      extra_ctrls=[])
1565
1566     def test_paged_search_lockstep(self):
1567         expr = "(objectClass=*)"
1568         ps = 3
1569
1570         all_results, _ = self.paged_search(expr, page_size=len(self.users)+1)
1571
1572         # Run two different but overlapping paged searches simultaneously.
1573         set_1_index = int((len(all_results))//3)
1574         set_2_index = int((2*len(all_results))//3)
1575         set_1 = all_results[set_1_index:]
1576         set_2 = all_results[:set_2_index+1]
1577         set_1_expr = "(cn>=%s)" % (all_results[set_1_index])
1578         set_2_expr = "(cn<=%s)" % (all_results[set_2_index])
1579
1580         results, cookie1 = self.paged_search(set_1_expr, page_size=ps)
1581         self.assertEqual(results, set_1[:ps])
1582         results, cookie2 = self.paged_search(set_2_expr, page_size=ps)
1583         self.assertEqual(results, set_2[:ps])
1584
1585         results, cookie1 = self.paged_search(set_1_expr, cookie=cookie1,
1586                                              page_size=ps)
1587         self.assertEqual(results, set_1[ps:ps*2])
1588         results, cookie2 = self.paged_search(set_2_expr, cookie=cookie2,
1589                                              page_size=ps)
1590         self.assertEqual(results, set_2[ps:ps*2])
1591
1592         results, _ = self.paged_search(set_1_expr, cookie=cookie1,
1593                                        page_size=len(self.users))
1594         self.assertEqual(results, set_1[ps*2:])
1595         results, _ = self.paged_search(set_2_expr, cookie=cookie2,
1596                                        page_size=len(self.users))
1597         self.assertEqual(results, set_2[ps*2:])
1598
1599
# A host given without a URL scheme gets one: tdb:// if it names an existing
# file, ldap:// otherwise.
if "://" not in host:
    host = ("tdb://%s" if os.path.isfile(host) else "ldap://%s") % host


TestProgram(module=__name__, opts=subunitopts)