9fe453d7b804373ff0428c859e1da204a38480f5
[samba.git] / source4 / dsdb / tests / python / vlv.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3 # Originally based on ./sam.py
4 import optparse
5 import sys
6 import os
7 import base64
8 import random
9 import re
10
11 sys.path.insert(0, "bin/python")
12 import samba
13 from samba.tests.subunitrun import SubunitOptions, TestProgram
14
15 import samba.getopt as options
16
17 from samba.auth import system_session
18 import ldb
19 from samba.samdb import SamDB
20
21 import time
22
23 parser = optparse.OptionParser("vlv.py [options] <host>")
24 sambaopts = options.SambaOptions(parser)
25 parser.add_option_group(sambaopts)
26 parser.add_option_group(options.VersionOptions(parser))
27 # use command line creds if available
28 credopts = options.CredentialsOptions(parser)
29 parser.add_option_group(credopts)
30 subunitopts = SubunitOptions(parser)
31 parser.add_option_group(subunitopts)
32
33 parser.add_option('--elements', type='int', default=20,
34                   help="use this many elements in the tests")
35
36 parser.add_option('--delete-in-setup', action='store_true',
37                   help="cleanup in next setup rather than teardown")
38
39 parser.add_option('--skip-attr-regex',
40                   help="ignore attributes matching this regex")
41
42 opts, args = parser.parse_args()
43
44 if len(args) < 1:
45     parser.print_usage()
46     sys.exit(1)
47
48 host = args[0]
49
50 lp = sambaopts.get_loadparm()
51 creds = credopts.get_credentials(lp)
52
53 N_ELEMENTS = opts.elements
54
55
56 class VlvTestException(Exception):
57     pass
58
59
60 def encode_vlv_control(critical=1,
61                        before=0, after=0,
62                        offset=None,
63                        gte=None,
64                        n=0, cookie=None):
65
66     s = "vlv:%d:%d:%d:" % (critical, before, after)
67
68     if offset is not None:
69         m = "%d:%d" % (offset, n)
70     elif ':' in gte or '\x00' in gte:
71         gte = base64.b64encode(gte)
72         m = "base64>=%s" % gte
73     else:
74         m = ">=%s" % gte
75
76     if cookie is None:
77         return s + m
78
79     return s + m + ':' + cookie
80
81
82 def get_cookie(controls, expected_n=None):
83     """Get the cookie, STILL base64 encoded, or raise ValueError."""
84     for c in list(controls):
85         cstr = str(c)
86         if cstr.startswith('vlv_resp'):
87             head, n, _, cookie = cstr.rsplit(':', 3)
88             if expected_n is not None and int(n) != expected_n:
89                 raise ValueError("Expected %s items, server said %s" %
90                                  (expected_n, n))
91             return cookie
92     raise ValueError("there is no VLV response")
93
94
95 class VLVTests(samba.tests.TestCase):
96
97     def create_user(self, i, n, prefix='vlvtest', suffix='', attrs=None):
98         name = "%s%d%s" % (prefix, i, suffix)
99         user = {
100             'cn': name,
101             "objectclass": "user",
102             'givenName': "abcdefghijklmnopqrstuvwxyz"[i % 26],
103             "roomNumber": "%sbc" % (n - i),
104             "carLicense": "后来经",
105             "employeeNumber": "%s%sx" % (abs(i * (99 - i)), '\n' * (i & 255)),
106             "accountExpires": "%s" % (10 ** 9 + 1000000 * i),
107             "msTSExpireDate4": "19%02d0101010000.0Z" % (i % 100),
108             "flags": str(i * (n - i)),
109             "serialNumber": "abc %s%s%s" % ('AaBb |-/'[i & 7],
110                                             ' 3z}'[i & 3],
111                                             '"@'[i & 1],),
112         }
113
114         # _user_broken_attrs tests are broken due to problems outside
115         # of VLV.
116         _user_broken_attrs = {
117             # Sort doesn't look past a NUL byte.
118             "photo": "\x00%d" % (n - i),
119             "audio": "%sn octet string %s%s ♫♬\x00lalala" % ('Aa'[i & 1],
120                                                              chr(i & 255), i),
121             "displayNamePrintable": "%d\x00%c" % (i, i & 255),
122             "adminDisplayName": "%d\x00b" % (n-i),
123             "title": "%d%sb" % (n - i, '\x00' * i),
124             "comment": "Favourite colour is %d" % (n % (i + 1)),
125
126             # Names that vary only in case. Windows returns
127             # equivalent addresses in the order they were put
128             # in ('a st', 'A st',...).
129             "street": "%s st" % (chr(65 | (i & 14) | ((i & 1) * 32))),
130         }
131
132         if attrs is not None:
133             user.update(attrs)
134
135         user['dn'] = "cn=%s,%s" % (user['cn'], self.ou)
136
137         if opts.skip_attr_regex:
138             match = re.compile(opts.skip_attr_regex).search
139             for k in user.keys():
140                 if match(k):
141                     del user[k]
142
143         self.users.append(user)
144         self.ldb.add(user)
145         return user
146
147     def setUp(self):
148         super(VLVTests, self).setUp()
149         self.ldb = SamDB(host, credentials=creds,
150                          session_info=system_session(lp), lp=lp)
151
152         self.base_dn = self.ldb.domain_dn()
153         self.ou = "ou=vlv,%s" % self.base_dn
154         if opts.delete_in_setup:
155             try:
156                 self.ldb.delete(self.ou, ['tree_delete:1'])
157             except ldb.LdbError, e:
158                 print "tried deleting %s, got error %s" % (self.ou, e)
159         self.ldb.add({
160             "dn": self.ou,
161             "objectclass": "organizationalUnit"})
162
163         self.users = []
164         for i in range(N_ELEMENTS):
165             self.create_user(i, N_ELEMENTS)
166
167         attrs = self.users[0].keys()
168         self.binary_sorted_keys = ['audio',
169                                    'photo',
170                                    "msTSExpireDate4",
171                                    'serialNumber',
172                                    "displayNamePrintable"]
173
174         self.numeric_sorted_keys = ['flags',
175                                     'accountExpires']
176
177         self.timestamp_keys = ['msTSExpireDate4']
178
179         self.int64_keys = set(['accountExpires'])
180
181         self.locale_sorted_keys = [x for x in attrs if
182                                    x not in (self.binary_sorted_keys +
183                                              self.numeric_sorted_keys)]
184
185         # don't try spaces, etc in cn
186         self.delicate_keys = ['cn']
187
188     def tearDown(self):
189         super(VLVTests, self).tearDown()
190         if not opts.delete_in_setup:
191             self.ldb.delete(self.ou, ['tree_delete:1'])
192
193     def get_full_list(self, attr, include_cn=False):
194         """Fetch the whole list sorted on the attribute, using the VLV.
195         This way you get a VLV cookie."""
196         n_users = len(self.users)
197         sort_control = "server_sort:1:0:%s" % attr
198         half_n = n_users // 2
199         vlv_search = "vlv:1:%d:%d:%d:0" % (half_n, half_n, half_n + 1)
200         attrs = [attr]
201         if include_cn:
202             attrs.append('cn')
203         res = self.ldb.search(self.ou,
204                               scope=ldb.SCOPE_ONELEVEL,
205                               attrs=attrs,
206                               controls=[sort_control,
207                                         vlv_search])
208         if include_cn:
209             full_results = [(x[attr][0], x['cn'][0]) for x in res]
210         else:
211             full_results = [x[attr][0].lower() for x in res]
212         controls = res.controls
213         return full_results, controls, sort_control
214
215     def get_expected_order(self, attr, expression=None):
216         """Fetch the whole list sorted on the attribute, using sort only."""
217         sort_control = "server_sort:1:0:%s" % attr
218         res = self.ldb.search(self.ou,
219                               scope=ldb.SCOPE_ONELEVEL,
220                               expression=expression,
221                               attrs=[attr],
222                               controls=[sort_control])
223         results = [x[attr][0] for x in res]
224         return results
225
226     def delete_user(self, user):
227         self.ldb.delete(user['dn'])
228         del self.users[self.users.index(user)]
229
230     def get_gte_tests_and_order(self, attr, expression=None):
231         expected_order = self.get_expected_order(attr, expression=expression)
232         gte_users = []
233         if attr in self.delicate_keys:
234             gte_keys = [
235                 '3',
236                 'abc',
237                 '¹',
238                 'ŋđ¼³ŧ“«đð',
239                 '桑巴',
240             ]
241         elif attr in self.timestamp_keys:
242             gte_keys = [
243                 '18560101010000.0Z',
244                 '19140103010000.0Z',
245                 '19560101010010.0Z',
246                 '19700101000000.0Z',
247                 '19991231211234.3Z',
248                 '20061111211234.0Z',
249                 '20390901041234.0Z',
250                 '25560101010000.0Z',
251             ]
252         elif attr not in self.numeric_sorted_keys:
253             gte_keys = [
254                 '3',
255                 'abc',
256                 ' ',
257                 '!@#!@#!',
258                 'kōkako',
259                 '¹',
260                 'ŋđ¼³ŧ“«đð',
261                 '\n\t\t',
262                 '桑巴',
263                 'zzzz',
264             ]
265             if expected_order:
266                 gte_keys.append(expected_order[len(expected_order) // 2] + ' tail')
267
268         else:
269             # "numeric" means positive integers
270             # doesn't work with -1, 3.14, ' 3', '9' * 20
271             gte_keys = ['3',
272                         '1' * 10,
273                         '1',
274                         '9' * 7,
275                         '0']
276
277             if attr in self.int64_keys:
278                 gte_keys += ['3' * 12, '71' * 8]
279
280         for i, x in enumerate(gte_keys):
281             user = self.create_user(i, N_ELEMENTS,
282                                     prefix='gte',
283                                     attrs={attr: x})
284             gte_users.append(user)
285
286         gte_order = self.get_expected_order(attr)
287         for user in gte_users:
288             self.delete_user(user)
289
290         # for sanity's sake
291         expected_order_2 = self.get_expected_order(attr, expression=expression)
292         self.assertEqual(expected_order, expected_order_2)
293
294         # Map gte tests to indexes in expected order. This will break
295         # if gte_order and expected_order are differently ordered (as
296         # it should).
297         gte_map = {}
298
299         # index to the first one with each value
300         index_map = {}
301         for i, k in enumerate(expected_order):
302             if k not in index_map:
303                 index_map[k] = i
304
305         keys = []
306         for k in gte_order:
307             if k in index_map:
308                 i = index_map[k]
309                 gte_map[k] = i
310                 for k in keys:
311                     gte_map[k] = i
312                 keys = []
313             else:
314                 keys.append(k)
315
316         for k in keys:
317             gte_map[k] = len(expected_order)
318
319         if False:
320             print "gte_map:"
321             for k in gte_order:
322                 print "   %10s => %10s" % (k, gte_map[k])
323
324         return gte_order, expected_order, gte_map
325
326     def assertCorrectResults(self, results, expected_order,
327                              offset, before, after):
328         """A helper to calculate offsets correctly and say as much as possible
329         when something goes wrong."""
330
331         start = max(offset - before - 1, 0)
332         end = offset + after
333         expected_results = expected_order[start: end]
334
335         # if it is a tuple with the cn, drop the cn
336         if expected_results and isinstance(expected_results[0], tuple):
337             expected_results = [x[0] for x in expected_results]
338
339         if expected_results == results:
340             return
341
342         if expected_order is not None:
343             print "expected order: %s" % expected_order[:20]
344             if len(expected_order) > 20:
345                 print "... and %d more not shown" % (len(expected_order) - 20)
346
347         print "offset %d before %d after %d" % (offset, before, after)
348         print "start %d end %d" % (start, end)
349         print "expected: %s" % expected_results
350         print "got     : %s" % results
351         self.assertEquals(expected_results, results)
352
353     def test_server_vlv_with_cookie(self):
354         attrs = [x for x in self.users[0].keys() if x not in
355                  ('dn', 'objectclass')]
356         for attr in attrs:
357             expected_order = self.get_expected_order(attr)
358             sort_control = "server_sort:1:0:%s" % attr
359             res = None
360             n = len(self.users)
361             for before in [10, 0, 3, 1, 4, 5, 2]:
362                 for after in [0, 3, 1, 4, 5, 2, 7]:
363                     for offset in range(max(1, before - 2),
364                                         min(n - after + 2, n)):
365                         if res is None:
366                             vlv_search = "vlv:1:%d:%d:%d:0" % (before, after,
367                                                                offset)
368                         else:
369                             cookie = get_cookie(res.controls, n)
370                             vlv_search = ("vlv:1:%d:%d:%d:%s:%s" %
371                                           (before, after, offset, n,
372                                            cookie))
373
374                         res = self.ldb.search(self.ou,
375                                               scope=ldb.SCOPE_ONELEVEL,
376                                               attrs=[attr],
377                                               controls=[sort_control,
378                                                         vlv_search])
379
380                         results = [x[attr][0] for x in res]
381
382                         self.assertCorrectResults(results, expected_order,
383                                                   offset, before, after)
384
385     def run_index_tests_with_expressions(self, expressions):
386         # Here we don't test every before/after combination.
387         attrs = [x for x in self.users[0].keys() if x not in
388                  ('dn', 'objectclass')]
389         for attr in attrs:
390             for expression in expressions:
391                 expected_order = self.get_expected_order(attr, expression)
392                 sort_control = "server_sort:1:0:%s" % attr
393                 res = None
394                 n = len(expected_order)
395                 for before in range(0, 11):
396                     after = before
397                     for offset in range(max(1, before - 2),
398                                         min(n - after + 2, n)):
399                         if res is None:
400                             vlv_search = "vlv:1:%d:%d:%d:0" % (before, after,
401                                                                offset)
402                         else:
403                             cookie = get_cookie(res.controls)
404                             vlv_search = ("vlv:1:%d:%d:%d:%s:%s" %
405                                           (before, after, offset, n,
406                                            cookie))
407
408                         res = self.ldb.search(self.ou,
409                                               expression=expression,
410                                               scope=ldb.SCOPE_ONELEVEL,
411                                               attrs=[attr],
412                                               controls=[sort_control,
413                                                         vlv_search])
414
415                         results = [x[attr][0] for x in res]
416
417                         self.assertCorrectResults(results, expected_order,
418                                                   offset, before, after)
419
420     def test_server_vlv_with_expression(self):
421         """What happens when we run the VLV with an expression?"""
422         expressions = ["(objectClass=*)",
423                        "(cn=%s)" % self.users[-1]['cn'],
424                        "(roomNumber=%s)" % self.users[0]['roomNumber'],
425                        ]
426         self.run_index_tests_with_expressions(expressions)
427
428     def test_server_vlv_with_failing_expression(self):
429         """What happens when we run the VLV on an expression that matches
430         nothing?"""
431         expressions = ["(samaccountname=testferf)",
432                        "(cn=hefalump)",
433                        ]
434         self.run_index_tests_with_expressions(expressions)
435
436     def run_gte_tests_with_expressions(self, expressions):
437         # Here we don't test every before/after combination.
438         attrs = [x for x in self.users[0].keys() if x not in
439                  ('dn', 'objectclass')]
440         for expression in expressions:
441             for attr in attrs:
442                 gte_order, expected_order, gte_map = \
443                     self.get_gte_tests_and_order(attr, expression)
444                 # In case there is some order dependency, disorder tests
445                 gte_tests = gte_order[:]
446                 random.seed(2)
447                 random.shuffle(gte_tests)
448                 res = None
449                 sort_control = "server_sort:1:0:%s" % attr
450
451                 expected_order = self.get_expected_order(attr, expression)
452                 sort_control = "server_sort:1:0:%s" % attr
453                 res = None
454                 for before in range(0, 11):
455                     after = before
456                     for gte in gte_tests:
457                         if res is not None:
458                             cookie = get_cookie(res.controls)
459                         else:
460                             cookie = None
461                         vlv_search = encode_vlv_control(before=before,
462                                                         after=after,
463                                                         gte=gte,
464                                                         cookie=cookie)
465
466                         res = self.ldb.search(self.ou,
467                                               scope=ldb.SCOPE_ONELEVEL,
468                                               expression=expression,
469                                               attrs=[attr],
470                                               controls=[sort_control,
471                                                         vlv_search])
472
473                         results = [x[attr][0] for x in res]
474                         offset = gte_map.get(gte, len(expected_order))
475
476                         # here offset is 0-based
477                         start = max(offset - before, 0)
478                         end = offset + 1 + after
479
480                         expected_results = expected_order[start: end]
481
482                         self.assertEquals(expected_results, results)
483
484     def test_vlv_gte_with_expression(self):
485         """What happens when we run the VLV with an expression?"""
486         expressions = ["(objectClass=*)",
487                        "(cn=%s)" % self.users[-1]['cn'],
488                        "(roomNumber=%s)" % self.users[0]['roomNumber'],
489                        ]
490         self.run_gte_tests_with_expressions(expressions)
491
492     def test_vlv_gte_with_failing_expression(self):
493         """What happens when we run the VLV on an expression that matches
494         nothing?"""
495         expressions = ["(samaccountname=testferf)",
496                        "(cn=hefalump)",
497                        ]
498         self.run_gte_tests_with_expressions(expressions)
499
500     def test_server_vlv_with_cookie_while_adding_and_deleting(self):
501         """What happens if we add or remove items in the middle of the VLV?
502
503         Nothing. The search and the sort is not repeated, and we only
504         deal with the objects originally found.
505         """
506         attrs = ['cn'] + [x for x in self.users[0].keys() if x not in
507                  ('dn', 'objectclass')]
508         user_number = 0
509         iteration = 0
510         for attr in attrs:
511             full_results, controls, sort_control = \
512                             self.get_full_list(attr, True)
513             original_n = len(self.users)
514
515             expected_order = full_results
516             random.seed(1)
517
518             for before in range(0, 3) + [6, 11, 19]:
519                 for after in range(0, 3) + [6, 11, 19]:
520                     start = max(before - 1, 1)
521                     end = max(start + 4, original_n - after + 2)
522                     for offset in range(start, end):
523                         #if iteration > 2076:
524                         #    return
525                         cookie = get_cookie(controls, original_n)
526                         vlv_search = encode_vlv_control(before=before,
527                                                         after=after,
528                                                         offset=offset,
529                                                         n=original_n,
530                                                         cookie=cookie)
531
532                         iteration += 1
533                         res = self.ldb.search(self.ou,
534                                               scope=ldb.SCOPE_ONELEVEL,
535                                               attrs=[attr],
536                                               controls=[sort_control,
537                                                         vlv_search])
538
539                         controls = res.controls
540                         results = [x[attr][0] for x in res]
541                         real_offset = max(1, min(offset, len(expected_order)))
542
543                         expected_results = []
544                         skipped = 0
545                         begin_offset = max(real_offset - before - 1, 0)
546                         real_before = min(before, real_offset - 1)
547                         real_after = min(after,
548                                          len(expected_order) - real_offset)
549
550                         for x in expected_order[begin_offset:]:
551                             if x is not None:
552                                 expected_results.append(x[0])
553                                 if (len(expected_results) ==
554                                     real_before + real_after + 1):
555                                     break
556                             else:
557                                 skipped += 1
558
559                         if expected_results != results:
560                             print ("attr %s before %d after %d offset %d" %
561                                    (attr, before, after, offset))
562                         self.assertEquals(expected_results, results)
563
564                         n = len(self.users)
565                         if random.random() < 0.1 + (n < 5) * 0.05:
566                             if n == 0:
567                                 i = 0
568                             else:
569                                 i = random.randrange(n)
570                             user = self.create_user(i, n, suffix='-%s' %
571                                                     user_number)
572                             user_number += 1
573                         if random.random() < 0.1  + (n > 50) * 0.02 and n:
574                             index = random.randrange(n)
575                             user = self.users.pop(index)
576
577                             self.ldb.delete(user['dn'])
578
579                             replaced = (user[attr], user['cn'])
580                             if replaced in expected_order:
581                                 i = expected_order.index(replaced)
582                                 expected_order[i] = None
583
584     def test_server_vlv_with_cookie_while_changing(self):
585         """What happens if we modify items in the middle of the VLV?
586
587         The expected behaviour (as found on Windows) is the sort is
588         not repeated, but the changes in attributes are reflected.
589         """
590         attrs = [x for x in self.users[0].keys() if x not in
591                  ('dn', 'objectclass', 'cn')]
592         for attr in attrs:
593             n_users = len(self.users)
594             expected_order = [x.upper() for x in self.get_expected_order(attr)]
595             sort_control = "server_sort:1:0:%s" % attr
596             res = None
597             i = 0
598
599             # First we'll fetch the whole list so we know the original
600             # sort order. This is necessary because we don't know how
601             # the server will order equivalent items. We are using the
602             # dn as a key.
603             half_n = n_users // 2
604             vlv_search = "vlv:1:%d:%d:%d:0" % (half_n, half_n, half_n + 1)
605             res = self.ldb.search(self.ou,
606                                   scope=ldb.SCOPE_ONELEVEL,
607                                   attrs=['dn', attr],
608                                   controls=[sort_control, vlv_search])
609
610             results = [x[attr][0].upper() for x in res]
611             #self.assertEquals(expected_order, results)
612
613             dn_order = [str(x['dn']) for x in res]
614             values = results[:]
615
616             for before in range(0, 3):
617                 for after in range(0, 3):
618                     for offset in range(1 + before, n_users - after):
619                         cookie = get_cookie(res.controls, len(self.users))
620                         vlv_search = ("vlv:1:%d:%d:%d:%s:%s" %
621                                       (before, after, offset, len(self.users),
622                                        cookie))
623
624                         res = self.ldb.search(self.ou,
625                                               scope=ldb.SCOPE_ONELEVEL,
626                                               attrs=['dn', attr],
627                                               controls=[sort_control,
628                                                         vlv_search])
629
630                         dn_results = [str(x['dn']) for x in res]
631                         dn_expected = dn_order[offset - before - 1:
632                                                offset + after]
633
634                         self.assertEquals(dn_expected, dn_results)
635
636                         results = [x[attr][0].upper() for x in res]
637
638                         self.assertCorrectResults(results, values,
639                                                   offset, before, after)
640
641                         i += 1
642                         if i % 3 == 2:
643                             if (attr in self.locale_sorted_keys or
644                                 attr in self.binary_sorted_keys):
645                                 i1 = i % n_users
646                                 i2 = (i ^ 255) % n_users
647                                 dn1 = dn_order[i1]
648                                 dn2 = dn_order[i2]
649                                 v2 = values[i2]
650
651                                 if v2 in self.locale_sorted_keys:
652                                     v2 += '-%d' % i
653                                 cn1 = dn1.split(',', 1)[0][3:]
654                                 cn2 = dn2.split(',', 1)[0][3:]
655
656                                 values[i1] = v2
657
658                                 m = ldb.Message()
659                                 m.dn = ldb.Dn(self.ldb, dn1)
660                                 m[attr] = ldb.MessageElement(v2,
661                                                              ldb.FLAG_MOD_REPLACE,
662                                                              attr)
663
664                                 self.ldb.modify(m)
665
666     def test_server_vlv_fractions_with_cookie(self):
667         """What happens when the count is set to an arbitrary number?
668
669         In that case the offset and the count form a fraction, and the
670         VLV should be centred at a point offset/count of the way
671         through. For example, if offset is 3 and count is 6, the VLV
672         should be looking around halfway. The actual algorithm is a
673         bit fiddlier than that, because of the one-basedness of VLV.
674         """
675         attrs = [x for x in self.users[0].keys() if x not in
676                  ('dn', 'objectclass')]
677
678         n_users = len(self.users)
679
680         random.seed(4)
681
682         for attr in attrs:
683             full_results, controls, sort_control = self.get_full_list(attr)
684             self.assertEqual(len(full_results), n_users)
685             for before in range(0, 2):
686                 for after in range(0, 2):
687                     for denominator in range(1, 20):
688                         for offset in range(1, denominator + 3):
689                             cookie = get_cookie(controls, len(self.users))
690                             vlv_search = ("vlv:1:%d:%d:%d:%s:%s" %
691                                           (before, after, offset,
692                                            denominator,
693                                            cookie))
694                             try:
695                                 res = self.ldb.search(self.ou,
696                                                       scope=ldb.SCOPE_ONELEVEL,
697                                                       attrs=[attr],
698                                                       controls=[sort_control,
699                                                                 vlv_search])
700                             except ldb.LdbError, e:
701                                 if offset != 0:
702                                     raise
703                                 print ("offset %d denominator %d raised error "
704                                        "expected error %s\n"
705                                        "(offset zero is illegal unless "
706                                        "content count is zero)" %
707                                        (offset, denominator, e))
708                                 continue
709
710                             results = [x[attr][0].lower() for x in res]
711
712                             if denominator == 0:
713                                 denominator = n_users
714                                 if offset == 0:
715                                     offset = denominator
716                             elif denominator == 1:
717                                 # the offset can only be 1, but the 1/1 case
718                                 # means something special
719                                 if offset == 1:
720                                     real_offset = n_users
721                                 else:
722                                     real_offset = 1
723                             else:
724                                 if offset > denominator:
725                                     offset = denominator
726                                 real_offset = (1 +
727                                                int(round((n_users - 1) *
728                                                          (offset - 1) /
729                                                          (denominator - 1.0)))
730                                 )
731
732                             self.assertCorrectResults(results, full_results,
733                                                       real_offset, before,
734                                                       after)
735
736                             controls = res.controls
737                             if False:
738                                 for c in list(controls):
739                                     cstr = str(c)
740                                     if cstr.startswith('vlv_resp'):
741                                         bits = cstr.rsplit(':')
742                                         print ("the answer is %s; we said %d" %
743                                                (bits[2], real_offset))
744                                         break
745
746     def test_server_vlv_no_cookie(self):
747         attrs = [x for x in self.users[0].keys() if x not in
748                  ('dn', 'objectclass')]
749
750         for attr in attrs:
751             expected_order = self.get_expected_order(attr)
752             sort_control = "server_sort:1:0:%s" % attr
753             for before in range(0, 5):
754                 for after in range(0, 7):
755                     for offset in range(1 + before, len(self.users) - after):
756                         res = self.ldb.search(self.ou,
757                                               scope=ldb.SCOPE_ONELEVEL,
758                                               attrs=[attr],
759                                               controls=[sort_control,
760                                                         "vlv:1:%d:%d:%d:0" %
761                                                         (before, after,
762                                                          offset)])
763                         results = [x[attr][0] for x in res]
764                         self.assertCorrectResults(results, expected_order,
765                                                   offset, before, after)
766
767     def get_expected_order_showing_deleted(self, attr,
768                                            expression="(|(cn=vlvtest*)(cn=vlv-deleted*))",
769                                            base=None,
770                                            scope=ldb.SCOPE_SUBTREE
771                                            ):
772         """Fetch the whole list sorted on the attribute, using sort only,
773         searching in the entire tree, not just our OU. This is the
774         way to find deleted objects.
775         """
776         if base is None:
777             base = self.base_dn
778         sort_control = "server_sort:1:0:%s" % attr
779         controls = [sort_control, "show_deleted:1"]
780
781         res = self.ldb.search(base,
782                               scope=scope,
783                               expression=expression,
784                               attrs=[attr],
785                               controls=controls)
786         results = [x[attr][0] for x in res]
787         return results
788
789     def add_deleted_users(self, n):
790         deleted_users = [self.create_user(i, n, prefix='vlv-deleted')
791                          for i in range(n)]
792
793         for user in deleted_users:
794             self.delete_user(user)
795
796     def test_server_vlv_no_cookie_show_deleted(self):
797         """What do we see with the show_deleted control?"""
798         attrs = ['objectGUID',
799                  'cn',
800                  'sAMAccountName',
801                  'objectSid',
802                  'name',
803                  'whenChanged',
804                  'usnChanged'
805         ]
806
807         # add some deleted users first, just in case there are none
808         self.add_deleted_users(6)
809         random.seed(22)
810         expression = "(|(cn=vlvtest*)(cn=vlv-deleted*))"
811
812         for attr in attrs:
813             show_deleted_control = "show_deleted:1"
814             expected_order = self.get_expected_order_showing_deleted(attr,
815                                                                      expression)
816             n = len(expected_order)
817             sort_control = "server_sort:1:0:%s" % attr
818             for before in [3, 1, 0]:
819                 for after in [0, 2]:
820                     # don't test every position, because there could be hundreds.
821                     # jump back and forth instead
822                     for i in range(20):
823                         offset = random.randrange(max(1, before - 2),
824                                                   min(n - after + 2, n))
825                         res = self.ldb.search(self.base_dn,
826                                               expression=expression,
827                                               scope=ldb.SCOPE_SUBTREE,
828                                               attrs=[attr],
829                                               controls=[sort_control,
830                                                         show_deleted_control,
831                                                         "vlv:1:%d:%d:%d:0" %
832                                                         (before, after,
833                                                          offset)
834                                               ]
835                         )
836                         results = [x[attr][0] for x in res]
837                         self.assertCorrectResults(results, expected_order,
838                                                   offset, before, after)
839
840     def test_server_vlv_no_cookie_show_deleted_only(self):
841         """What do we see with the show_deleted control when we're not looking
842         at any non-deleted things"""
843         attrs = ['objectGUID',
844                  'cn',
845                  'sAMAccountName',
846                  'objectSid',
847                  'whenChanged',
848         ]
849
850         # add some deleted users first, just in case there are none
851         self.add_deleted_users(4)
852         base = 'CN=Deleted Objects,%s' % self.base_dn
853         expression = "(cn=vlv-deleted*)"
854         for attr in attrs:
855             show_deleted_control = "show_deleted:1"
856             expected_order = self.get_expected_order_showing_deleted(attr,
857                                                     expression=expression,
858                                                     base=base,
859                                                     scope=ldb.SCOPE_ONELEVEL)
860             print ("searching for attr %s amongst %d deleted objects" %
861                    (attr, len(expected_order)))
862             sort_control = "server_sort:1:0:%s" % attr
863             step = max(len(expected_order) // 10, 1)
864             for before in [3, 0]:
865                 for after in [0, 2]:
866                     for offset in range(1 + before,
867                                         len(expected_order) - after,
868                                         step):
869                         res = self.ldb.search(base,
870                                               expression=expression,
871                                               scope=ldb.SCOPE_ONELEVEL,
872                                               attrs=[attr],
873                                               controls=[sort_control,
874                                                         show_deleted_control,
875                                                         "vlv:1:%d:%d:%d:0" %
876                                                         (before, after,
877                                                          offset)])
878                         results = [x[attr][0] for x in res]
879                         self.assertCorrectResults(results, expected_order,
880                                                   offset, before, after)
881
882
883
884     def test_server_vlv_with_cookie_show_deleted(self):
885         """What do we see with the show_deleted control?"""
886         attrs = ['objectGUID',
887                  'cn',
888                  'sAMAccountName',
889                  'objectSid',
890                  'name',
891                  'whenChanged',
892                  'usnChanged'
893         ]
894         self.add_deleted_users(6)
895         random.seed(23)
896         for attr in attrs:
897             expected_order = self.get_expected_order(attr)
898             sort_control = "server_sort:1:0:%s" % attr
899             res = None
900             show_deleted_control = "show_deleted:1"
901             expected_order = self.get_expected_order_showing_deleted(attr)
902             n = len(expected_order)
903             expression = "(|(cn=vlvtest*)(cn=vlv-deleted*))"
904             for before in [3, 2, 1, 0]:
905                 after = before
906                 for i in range(20):
907                     offset = random.randrange(max(1, before - 2),
908                                               min(n - after + 2, n))
909                     if res is None:
910                         vlv_search = "vlv:1:%d:%d:%d:0" % (before, after,
911                                                            offset)
912                     else:
913                         cookie = get_cookie(res.controls, n)
914                         vlv_search = ("vlv:1:%d:%d:%d:%s:%s" %
915                                       (before, after, offset, n,
916                                        cookie))
917
918                     res = self.ldb.search(self.base_dn,
919                                           expression=expression,
920                                           scope=ldb.SCOPE_SUBTREE,
921                                           attrs=[attr],
922                                           controls=[sort_control,
923                                                     vlv_search,
924                                                     show_deleted_control])
925
926                     results = [x[attr][0] for x in res]
927
928                     self.assertCorrectResults(results, expected_order,
929                                               offset, before, after)
930
931
932     def test_server_vlv_gte_with_cookie(self):
933         attrs = [x for x in self.users[0].keys() if x not in
934                  ('dn', 'objectclass')]
935         for attr in attrs:
936             gte_order, expected_order, gte_map = \
937                                         self.get_gte_tests_and_order(attr)
938             # In case there is some order dependency, disorder tests
939             gte_tests = gte_order[:]
940             random.seed(1)
941             random.shuffle(gte_tests)
942             res = None
943             sort_control = "server_sort:1:0:%s" % attr
944             for before in [0, 1, 2, 4]:
945                 for after in [0, 1, 3, 6]:
946                     for gte in gte_tests:
947                         if res is not None:
948                             cookie = get_cookie(res.controls, len(self.users))
949                         else:
950                             cookie = None
951                         vlv_search = encode_vlv_control(before=before,
952                                                         after=after,
953                                                         gte=gte,
954                                                         cookie=cookie)
955
956                         res = self.ldb.search(self.ou,
957                                               scope=ldb.SCOPE_ONELEVEL,
958                                               attrs=[attr],
959                                               controls=[sort_control,
960                                                         vlv_search])
961
962                         results = [x[attr][0] for x in res]
963                         offset = gte_map.get(gte, len(expected_order))
964
965                         # here offset is 0-based
966                         start = max(offset - before, 0)
967                         end = offset + 1 + after
968
969                         expected_results = expected_order[start: end]
970
971                         self.assertEquals(expected_results, results)
972
973     def test_server_vlv_gte_no_cookie(self):
974         attrs = [x for x in self.users[0].keys() if x not in
975                  ('dn', 'objectclass')]
976         iteration = 0
977         for attr in attrs:
978             gte_order, expected_order, gte_map = \
979                                         self.get_gte_tests_and_order(attr)
980             # In case there is some order dependency, disorder tests
981             gte_tests = gte_order[:]
982             random.seed(1)
983             random.shuffle(gte_tests)
984
985             sort_control = "server_sort:1:0:%s" % attr
986             for before in [0, 1, 3]:
987                 for after in [0, 4]:
988                     for gte in gte_tests:
989                         vlv_search = encode_vlv_control(before=before,
990                                                         after=after,
991                                                         gte=gte)
992
993                         res = self.ldb.search(self.ou,
994                                               scope=ldb.SCOPE_ONELEVEL,
995                                               attrs=[attr],
996                                               controls=[sort_control,
997                                                         vlv_search])
998                         results = [x[attr][0] for x in res]
999
1000                         # here offset is 0-based
1001                         offset = gte_map.get(gte, len(expected_order))
1002                         start = max(offset - before, 0)
1003                         end = offset + after + 1
1004                         expected_results = expected_order[start: end]
1005                         iteration += 1
1006                         if expected_results != results:
1007                             middle = expected_order[len(expected_order) // 2]
1008                             print expected_results, results
1009                             print middle
1010                             print expected_order
1011                             print
1012                             print ("\nattr %s offset %d before %d "
1013                                    "after %d gte %s" %
1014                                    (attr, offset, before, after, gte))
1015                         self.assertEquals(expected_results, results)
1016
1017     def test_multiple_searches(self):
1018         """The maximum number of concurrent vlv searches per connection is
1019         currently set at 3. That means if you open 4 VLV searches the
1020         cookie on the first one should fail.
1021         """
1022         # Windows has a limit of 10 VLVs where there are low numbers
1023         # of objects in each search.
1024         attrs = ([x for x in self.users[0].keys() if x not in
1025                   ('dn', 'objectclass')] * 2)[:12]
1026
1027         vlv_cookies = []
1028         for attr in attrs:
1029             sort_control = "server_sort:1:0:%s" % attr
1030
1031             res = self.ldb.search(self.ou,
1032                                   scope=ldb.SCOPE_ONELEVEL,
1033                                   attrs=[attr],
1034                                   controls=[sort_control,
1035                                             "vlv:1:1:1:1:0"])
1036
1037             cookie = get_cookie(res.controls, len(self.users))
1038             vlv_cookies.append(cookie)
1039             time.sleep(0.2)
1040
1041         # now this one should fail
1042         self.assertRaises(ldb.LdbError,
1043                           self.ldb.search,
1044                           self.ou,
1045                           scope=ldb.SCOPE_ONELEVEL,
1046                           attrs=[attr],
1047                           controls=[sort_control,
1048                                     "vlv:1:1:1:1:0:%s" % vlv_cookies[0]])
1049
1050         # and this one should succeed
1051         res = self.ldb.search(self.ou,
1052                               scope=ldb.SCOPE_ONELEVEL,
1053                               attrs=[attr],
1054                               controls=[sort_control,
1055                                         "vlv:1:1:1:1:0:%s" % vlv_cookies[-1]])
1056
1057         # this one should fail because it is a new connection and
1058         # doesn't share cookies
1059         new_ldb = SamDB(host, credentials=creds,
1060                         session_info=system_session(lp), lp=lp)
1061
1062         self.assertRaises(ldb.LdbError,
1063                           new_ldb.search, self.ou,
1064                           scope=ldb.SCOPE_ONELEVEL,
1065                           attrs=[attr],
1066                           controls=[sort_control,
1067                                     "vlv:1:1:1:1:0:%s" % vlv_cookies[-1]])
1068
1069         # but now without the critical flag it just does no VLV.
1070         new_ldb.search(self.ou,
1071                        scope=ldb.SCOPE_ONELEVEL,
1072                        attrs=[attr],
1073                        controls=[sort_control,
1074                                  "vlv:0:1:1:1:0:%s" % vlv_cookies[-1]])
1075
1076
1077 if "://" not in host:
1078     if os.path.isfile(host):
1079         host = "tdb://%s" % host
1080     else:
1081         host = "ldap://%s" % host
1082
1083
1084 TestProgram(module=__name__, opts=subunitopts)