2 # -*- coding: utf-8 -*-
3 # Originally based on ./sam.py
4 from __future__ import print_function
12 sys.path.insert(0, "bin/python")
14 from samba.tests.subunitrun import SubunitOptions, TestProgram
16 import samba.getopt as options
18 from samba.auth import system_session
20 from samba.samdb import SamDB
21 from samba.compat import get_bytes
22 from samba.compat import get_string
26 parser = optparse.OptionParser("vlv.py [options] <host>")
27 sambaopts = options.SambaOptions(parser)
28 parser.add_option_group(sambaopts)
29 parser.add_option_group(options.VersionOptions(parser))
30 # use command line creds if available
31 credopts = options.CredentialsOptions(parser)
32 parser.add_option_group(credopts)
33 subunitopts = SubunitOptions(parser)
34 parser.add_option_group(subunitopts)
36 parser.add_option('--elements', type='int', default=20,
37 help="use this many elements in the tests")
39 parser.add_option('--delete-in-setup', action='store_true',
40 help="cleanup in next setup rather than teardown")
42 parser.add_option('--skip-attr-regex',
43 help="ignore attributes matching this regex")
45 opts, args = parser.parse_args()
53 lp = sambaopts.get_loadparm()
54 creds = credopts.get_credentials(lp)
56 N_ELEMENTS = opts.elements
59 class VlvTestException(Exception):
63 def encode_vlv_control(critical=1,
69 s = "vlv:%d:%d:%d:" % (critical, before, after)
71 if offset is not None:
72 m = "%d:%d" % (offset, n)
73 elif b':' in gte or b'\x00' in gte:
74 gte = get_string(base64.b64encode(gte))
75 m = "base64>=%s" % gte
77 m = ">=%s" % get_string(gte)
82 return s + m + ':' + cookie
85 def get_cookie(controls, expected_n=None):
86 """Get the cookie, STILL base64 encoded, or raise ValueError."""
87 for c in list(controls):
89 if cstr.startswith('vlv_resp'):
90 head, n, _, cookie = cstr.rsplit(':', 3)
91 if expected_n is not None and int(n) != expected_n:
92 raise ValueError("Expected %s items, server said %s" %
95 raise ValueError("there is no VLV response")
98 class TestsWithUserOU(samba.tests.TestCase):
100 def create_user(self, i, n, prefix='vlvtest', suffix='', attrs=None):
101 name = "%s%d%s" % (prefix, i, suffix)
104 "objectclass": "user",
105 'givenName': "abcdefghijklmnopqrstuvwxyz"[i % 26],
106 "roomNumber": "%sbc" % (n - i),
108 "employeeNumber": "%s%sx" % (abs(i * (99 - i)), '\n' * (i & 255)),
109 "accountExpires": "%s" % (10 ** 9 + 1000000 * i),
110 "msTSExpireDate4": "19%02d0101010000.0Z" % (i % 100),
111 "flags": str(i * (n - i)),
112 "serialNumber": "abc %s%s%s" % ('AaBb |-/'[i & 7],
117 # _user_broken_attrs tests are broken due to problems outside
119 _user_broken_attrs = {
120 # Sort doesn't look past a NUL byte.
121 "photo": "\x00%d" % (n - i),
122 "audio": "%sn octet string %s%s ♫♬\x00lalala" % ('Aa'[i & 1],
124 "displayNamePrintable": "%d\x00%c" % (i, i & 255),
125 "adminDisplayName": "%d\x00b" % (n - i),
126 "title": "%d%sb" % (n - i, '\x00' * i),
127 "comment": "Favourite colour is %d" % (n % (i + 1)),
129 # Names that vary only in case. Windows returns
130 # equivalent addresses in the order they were put
131 # in ('a st', 'A st',...).
132 "street": "%s st" % (chr(65 | (i & 14) | ((i & 1) * 32))),
135 if attrs is not None:
138 user['dn'] = "cn=%s,%s" % (user['cn'], self.ou)
140 if opts.skip_attr_regex:
141 match = re.compile(opts.skip_attr_regex).search
142 for k in user.keys():
146 self.users.append(user)
151 super(TestsWithUserOU, self).setUp()
152 self.ldb = SamDB(host, credentials=creds,
153 session_info=system_session(lp), lp=lp)
155 self.base_dn = self.ldb.domain_dn()
156 self.ou = "ou=vlv,%s" % self.base_dn
157 if opts.delete_in_setup:
159 self.ldb.delete(self.ou, ['tree_delete:1'])
160 except ldb.LdbError as e:
161 print("tried deleting %s, got error %s" % (self.ou, e))
164 "objectclass": "organizationalUnit"})
167 for i in range(N_ELEMENTS):
168 self.create_user(i, N_ELEMENTS)
170 attrs = self.users[0].keys()
171 self.binary_sorted_keys = ['audio',
175 "displayNamePrintable"]
177 self.numeric_sorted_keys = ['flags',
180 self.timestamp_keys = ['msTSExpireDate4']
182 self.int64_keys = set(['accountExpires'])
184 self.locale_sorted_keys = [x for x in attrs if
185 x not in (self.binary_sorted_keys +
186 self.numeric_sorted_keys)]
188 # don't try spaces, etc in cn
189 self.delicate_keys = ['cn']
192 super(TestsWithUserOU, self).tearDown()
193 if not opts.delete_in_setup:
194 self.ldb.delete(self.ou, ['tree_delete:1'])
197 class VLVTests(TestsWithUserOU):
199 def get_full_list(self, attr, include_cn=False):
200 """Fetch the whole list sorted on the attribute, using the VLV.
201 This way you get a VLV cookie."""
202 n_users = len(self.users)
203 sort_control = "server_sort:1:0:%s" % attr
204 half_n = n_users // 2
205 vlv_search = "vlv:1:%d:%d:%d:0" % (half_n, half_n, half_n + 1)
209 res = self.ldb.search(self.ou,
210 scope=ldb.SCOPE_ONELEVEL,
212 controls=[sort_control,
215 full_results = [(str(x[attr][0]), str(x['cn'][0])) for x in res]
217 full_results = [str(x[attr][0]).lower() for x in res]
218 controls = res.controls
219 return full_results, controls, sort_control
221 def get_expected_order(self, attr, expression=None):
222 """Fetch the whole list sorted on the attribute, using sort only."""
223 sort_control = "server_sort:1:0:%s" % attr
224 res = self.ldb.search(self.ou,
225 scope=ldb.SCOPE_ONELEVEL,
226 expression=expression,
228 controls=[sort_control])
229 results = [x[attr][0] for x in res]
232 def delete_user(self, user):
233 self.ldb.delete(user['dn'])
234 del self.users[self.users.index(user)]
236 def get_gte_tests_and_order(self, attr, expression=None):
237 expected_order = self.get_expected_order(attr, expression=expression)
239 if attr in self.delicate_keys:
247 elif attr in self.timestamp_keys:
258 elif attr not in self.numeric_sorted_keys:
272 gte_keys.append(expected_order[len(expected_order) // 2] + b' tail')
275 # "numeric" means positive integers
276 # doesn't work with -1, 3.14, ' 3', '9' * 20
283 if attr in self.int64_keys:
284 gte_keys += ['3' * 12, '71' * 8]
286 for i, x in enumerate(gte_keys):
287 user = self.create_user(i, N_ELEMENTS,
290 gte_users.append(user)
292 gte_order = self.get_expected_order(attr)
293 for user in gte_users:
294 self.delete_user(user)
297 expected_order_2 = self.get_expected_order(attr, expression=expression)
298 self.assertEqual(expected_order, expected_order_2)
300 # Map gte tests to indexes in expected order. This will break
301 # if gte_order and expected_order are differently ordered (as
305 # index to the first one with each value
307 for i, k in enumerate(expected_order):
308 if k not in index_map:
323 gte_map[k] = len(expected_order)
328 print(" %10s => %10s" % (k, gte_map[k]))
330 return gte_order, expected_order, gte_map
332 def assertCorrectResults(self, results, expected_order,
333 offset, before, after):
334 """A helper to calculate offsets correctly and say as much as possible
335 when something goes wrong."""
337 start = max(offset - before - 1, 0)
339 expected_results = expected_order[start: end]
341 # if it is a tuple with the cn, drop the cn
342 if expected_results and isinstance(expected_results[0], tuple):
343 expected_results = [x[0] for x in expected_results]
345 if expected_results == results:
348 if expected_order is not None:
349 print("expected order: %s" % expected_order[:20])
350 if len(expected_order) > 20:
351 print("... and %d more not shown" % (len(expected_order) - 20))
353 print("offset %d before %d after %d" % (offset, before, after))
354 print("start %d end %d" % (start, end))
355 print("expected: %s" % expected_results)
356 print("got : %s" % results)
357 self.assertEquals(expected_results, results)
359 def test_server_vlv_with_cookie(self):
360 attrs = [x for x in self.users[0].keys() if x not in
361 ('dn', 'objectclass')]
363 expected_order = self.get_expected_order(attr)
364 sort_control = "server_sort:1:0:%s" % attr
367 for before in [10, 0, 3, 1, 4, 5, 2]:
368 for after in [0, 3, 1, 4, 5, 2, 7]:
369 for offset in range(max(1, before - 2),
370 min(n - after + 2, n)):
372 vlv_search = "vlv:1:%d:%d:%d:0" % (before, after,
375 cookie = get_cookie(res.controls, n)
376 vlv_search = ("vlv:1:%d:%d:%d:%s:%s" %
377 (before, after, offset, n,
380 res = self.ldb.search(self.ou,
381 scope=ldb.SCOPE_ONELEVEL,
383 controls=[sort_control,
386 results = [x[attr][0] for x in res]
388 self.assertCorrectResults(results, expected_order,
389 offset, before, after)
391 def run_index_tests_with_expressions(self, expressions):
392 # Here we don't test every before/after combination.
393 attrs = [x for x in self.users[0].keys() if x not in
394 ('dn', 'objectclass')]
396 for expression in expressions:
397 expected_order = self.get_expected_order(attr, expression)
398 sort_control = "server_sort:1:0:%s" % attr
400 n = len(expected_order)
401 for before in range(0, 11):
403 for offset in range(max(1, before - 2),
404 min(n - after + 2, n)):
406 vlv_search = "vlv:1:%d:%d:%d:0" % (before, after,
409 cookie = get_cookie(res.controls)
410 vlv_search = ("vlv:1:%d:%d:%d:%s:%s" %
411 (before, after, offset, n,
414 res = self.ldb.search(self.ou,
415 expression=expression,
416 scope=ldb.SCOPE_ONELEVEL,
418 controls=[sort_control,
421 results = [x[attr][0] for x in res]
423 self.assertCorrectResults(results, expected_order,
424 offset, before, after)
426 def test_server_vlv_with_expression(self):
427 """What happens when we run the VLV with an expression?"""
428 expressions = ["(objectClass=*)",
429 "(cn=%s)" % self.users[-1]['cn'],
430 "(roomNumber=%s)" % self.users[0]['roomNumber'],
432 self.run_index_tests_with_expressions(expressions)
434 def test_server_vlv_with_failing_expression(self):
435 """What happens when we run the VLV on an expression that matches
437 expressions = ["(samaccountname=testferf)",
440 self.run_index_tests_with_expressions(expressions)
442 def run_gte_tests_with_expressions(self, expressions):
443 # Here we don't test every before/after combination.
444 attrs = [x for x in self.users[0].keys() if x not in
445 ('dn', 'objectclass')]
446 for expression in expressions:
448 gte_order, expected_order, gte_map = \
449 self.get_gte_tests_and_order(attr, expression)
450 # In case there is some order dependency, disorder tests
451 gte_tests = gte_order[:]
453 random.shuffle(gte_tests)
455 sort_control = "server_sort:1:0:%s" % attr
457 expected_order = self.get_expected_order(attr, expression)
458 sort_control = "server_sort:1:0:%s" % attr
460 for before in range(0, 11):
462 for gte in gte_tests:
464 cookie = get_cookie(res.controls)
467 vlv_search = encode_vlv_control(before=before,
472 res = self.ldb.search(self.ou,
473 scope=ldb.SCOPE_ONELEVEL,
474 expression=expression,
476 controls=[sort_control,
479 results = [x[attr][0] for x in res]
480 offset = gte_map.get(gte, len(expected_order))
482 # here offset is 0-based
483 start = max(offset - before, 0)
484 end = offset + 1 + after
486 expected_results = expected_order[start: end]
488 self.assertEquals(expected_results, results)
490 def test_vlv_gte_with_expression(self):
491 """What happens when we run the VLV with an expression?"""
492 expressions = ["(objectClass=*)",
493 "(cn=%s)" % self.users[-1]['cn'],
494 "(roomNumber=%s)" % self.users[0]['roomNumber'],
496 self.run_gte_tests_with_expressions(expressions)
498 def test_vlv_gte_with_failing_expression(self):
499 """What happens when we run the VLV on an expression that matches
501 expressions = ["(samaccountname=testferf)",
504 self.run_gte_tests_with_expressions(expressions)
506 def test_server_vlv_with_cookie_while_adding_and_deleting(self):
507 """What happens if we add or remove items in the middle of the VLV?
509 Nothing. The search and the sort is not repeated, and we only
510 deal with the objects originally found.
512 attrs = ['cn'] + [x for x in self.users[0].keys() if x not in
513 ('dn', 'objectclass')]
517 full_results, controls, sort_control = \
518 self.get_full_list(attr, True)
519 original_n = len(self.users)
521 expected_order = full_results
524 for before in list(range(0, 3)) + [6, 11, 19]:
525 for after in list(range(0, 3)) + [6, 11, 19]:
526 start = max(before - 1, 1)
527 end = max(start + 4, original_n - after + 2)
528 for offset in range(start, end):
529 # if iteration > 2076:
531 cookie = get_cookie(controls, original_n)
532 vlv_search = encode_vlv_control(before=before,
539 res = self.ldb.search(self.ou,
540 scope=ldb.SCOPE_ONELEVEL,
542 controls=[sort_control,
545 controls = res.controls
546 results = [x[attr][0] for x in res]
547 real_offset = max(1, min(offset, len(expected_order)))
549 expected_results = []
551 begin_offset = max(real_offset - before - 1, 0)
552 real_before = min(before, real_offset - 1)
553 real_after = min(after,
554 len(expected_order) - real_offset)
556 for x in expected_order[begin_offset:]:
558 expected_results.append(get_bytes(x[0]))
559 if (len(expected_results) ==
560 real_before + real_after + 1):
565 if expected_results != results:
566 print("attr %s before %d after %d offset %d" %
567 (attr, before, after, offset))
568 self.assertEquals(expected_results, results)
571 if random.random() < 0.1 + (n < 5) * 0.05:
575 i = random.randrange(n)
576 user = self.create_user(i, n, suffix='-%s' %
579 if random.random() < 0.1 + (n > 50) * 0.02 and n:
580 index = random.randrange(n)
581 user = self.users.pop(index)
583 self.ldb.delete(user['dn'])
585 replaced = (user[attr], user['cn'])
586 if replaced in expected_order:
587 i = expected_order.index(replaced)
588 expected_order[i] = None
590 def test_server_vlv_with_cookie_while_changing(self):
591 """What happens if we modify items in the middle of the VLV?
593 The expected behaviour (as found on Windows) is the sort is
594 not repeated, but the changes in attributes are reflected.
596 attrs = [x for x in self.users[0].keys() if x not in
597 ('dn', 'objectclass', 'cn')]
599 n_users = len(self.users)
600 expected_order = [x.upper() for x in self.get_expected_order(attr)]
601 sort_control = "server_sort:1:0:%s" % attr
605 # First we'll fetch the whole list so we know the original
606 # sort order. This is necessary because we don't know how
607 # the server will order equivalent items. We are using the
609 half_n = n_users // 2
610 vlv_search = "vlv:1:%d:%d:%d:0" % (half_n, half_n, half_n + 1)
611 res = self.ldb.search(self.ou,
612 scope=ldb.SCOPE_ONELEVEL,
614 controls=[sort_control, vlv_search])
616 results = [x[attr][0].upper() for x in res]
617 #self.assertEquals(expected_order, results)
619 dn_order = [str(x['dn']) for x in res]
622 for before in range(0, 3):
623 for after in range(0, 3):
624 for offset in range(1 + before, n_users - after):
625 cookie = get_cookie(res.controls, len(self.users))
626 vlv_search = ("vlv:1:%d:%d:%d:%s:%s" %
627 (before, after, offset, len(self.users),
630 res = self.ldb.search(self.ou,
631 scope=ldb.SCOPE_ONELEVEL,
633 controls=[sort_control,
636 dn_results = [str(x['dn']) for x in res]
637 dn_expected = dn_order[offset - before - 1:
640 self.assertEquals(dn_expected, dn_results)
642 results = [x[attr][0].upper() for x in res]
644 self.assertCorrectResults(results, values,
645 offset, before, after)
649 if (attr in self.locale_sorted_keys or
650 attr in self.binary_sorted_keys):
652 i2 = (i ^ 255) % n_users
657 if v2 in self.locale_sorted_keys:
659 cn1 = dn1.split(',', 1)[0][3:]
660 cn2 = dn2.split(',', 1)[0][3:]
665 m.dn = ldb.Dn(self.ldb, dn1)
666 m[attr] = ldb.MessageElement(v2,
667 ldb.FLAG_MOD_REPLACE,
672 def test_server_vlv_fractions_with_cookie(self):
673 """What happens when the count is set to an arbitrary number?
675 In that case the offset and the count form a fraction, and the
676 VLV should be centred at a point offset/count of the way
677 through. For example, if offset is 3 and count is 6, the VLV
678 should be looking around halfway. The actual algorithm is a
679 bit fiddlier than that, because of the one-basedness of VLV.
681 attrs = [x for x in self.users[0].keys() if x not in
682 ('dn', 'objectclass')]
684 n_users = len(self.users)
689 full_results, controls, sort_control = self.get_full_list(attr)
690 self.assertEqual(len(full_results), n_users)
691 for before in range(0, 2):
692 for after in range(0, 2):
693 for denominator in range(1, 20):
694 for offset in range(1, denominator + 3):
695 cookie = get_cookie(controls, len(self.users))
696 vlv_search = ("vlv:1:%d:%d:%d:%s:%s" %
697 (before, after, offset,
701 res = self.ldb.search(self.ou,
702 scope=ldb.SCOPE_ONELEVEL,
704 controls=[sort_control,
706 except ldb.LdbError as e:
709 print("offset %d denominator %d raised error "
710 "expected error %s\n"
711 "(offset zero is illegal unless "
712 "content count is zero)" %
713 (offset, denominator, e))
716 results = [str(x[attr][0]).lower() for x in res]
719 denominator = n_users
722 elif denominator == 1:
723 # the offset can only be 1, but the 1/1 case
724 # means something special
726 real_offset = n_users
730 if offset > denominator:
733 int(round((n_users - 1) *
735 (denominator - 1.0)))
738 self.assertCorrectResults(results, full_results,
742 controls = res.controls
744 for c in list(controls):
746 if cstr.startswith('vlv_resp'):
747 bits = cstr.rsplit(':')
748 print("the answer is %s; we said %d" %
749 (bits[2], real_offset))
752 def test_server_vlv_no_cookie(self):
753 attrs = [x for x in self.users[0].keys() if x not in
754 ('dn', 'objectclass')]
757 expected_order = self.get_expected_order(attr)
758 sort_control = "server_sort:1:0:%s" % attr
759 for before in range(0, 5):
760 for after in range(0, 7):
761 for offset in range(1 + before, len(self.users) - after):
762 res = self.ldb.search(self.ou,
763 scope=ldb.SCOPE_ONELEVEL,
765 controls=[sort_control,
769 results = [x[attr][0] for x in res]
770 self.assertCorrectResults(results, expected_order,
771 offset, before, after)
773 def get_expected_order_showing_deleted(self, attr,
774 expression="(|(cn=vlvtest*)(cn=vlv-deleted*))",
776 scope=ldb.SCOPE_SUBTREE
778 """Fetch the whole list sorted on the attribute, using sort only,
779 searching in the entire tree, not just our OU. This is the
780 way to find deleted objects.
784 sort_control = "server_sort:1:0:%s" % attr
785 controls = [sort_control, "show_deleted:1"]
787 res = self.ldb.search(base,
789 expression=expression,
792 results = [x[attr][0] for x in res]
795 def add_deleted_users(self, n):
796 deleted_users = [self.create_user(i, n, prefix='vlv-deleted')
799 for user in deleted_users:
800 self.delete_user(user)
802 def test_server_vlv_no_cookie_show_deleted(self):
803 """What do we see with the show_deleted control?"""
804 attrs = ['objectGUID',
813 # add some deleted users first, just in case there are none
814 self.add_deleted_users(6)
816 expression = "(|(cn=vlvtest*)(cn=vlv-deleted*))"
819 show_deleted_control = "show_deleted:1"
820 expected_order = self.get_expected_order_showing_deleted(attr,
822 n = len(expected_order)
823 sort_control = "server_sort:1:0:%s" % attr
824 for before in [3, 1, 0]:
826 # don't test every position, because there could be hundreds.
827 # jump back and forth instead
829 offset = random.randrange(max(1, before - 2),
830 min(n - after + 2, n))
831 res = self.ldb.search(self.base_dn,
832 expression=expression,
833 scope=ldb.SCOPE_SUBTREE,
835 controls=[sort_control,
836 show_deleted_control,
842 results = [x[attr][0] for x in res]
843 self.assertCorrectResults(results, expected_order,
844 offset, before, after)
846 def test_server_vlv_no_cookie_show_deleted_only(self):
847 """What do we see with the show_deleted control when we're not looking
848 at any non-deleted things"""
849 attrs = ['objectGUID',
856 # add some deleted users first, just in case there are none
857 self.add_deleted_users(4)
858 base = 'CN=Deleted Objects,%s' % self.base_dn
859 expression = "(cn=vlv-deleted*)"
861 show_deleted_control = "show_deleted:1"
862 expected_order = self.get_expected_order_showing_deleted(attr,
863 expression=expression,
865 scope=ldb.SCOPE_ONELEVEL)
866 print("searching for attr %s amongst %d deleted objects" %
867 (attr, len(expected_order)))
868 sort_control = "server_sort:1:0:%s" % attr
869 step = max(len(expected_order) // 10, 1)
870 for before in [3, 0]:
872 for offset in range(1 + before,
873 len(expected_order) - after,
875 res = self.ldb.search(base,
876 expression=expression,
877 scope=ldb.SCOPE_ONELEVEL,
879 controls=[sort_control,
880 show_deleted_control,
884 results = [x[attr][0] for x in res]
885 self.assertCorrectResults(results, expected_order,
886 offset, before, after)
888 def test_server_vlv_with_cookie_show_deleted(self):
889 """What do we see with the show_deleted control?"""
890 attrs = ['objectGUID',
898 self.add_deleted_users(6)
901 expected_order = self.get_expected_order(attr)
902 sort_control = "server_sort:1:0:%s" % attr
904 show_deleted_control = "show_deleted:1"
905 expected_order = self.get_expected_order_showing_deleted(attr)
906 n = len(expected_order)
907 expression = "(|(cn=vlvtest*)(cn=vlv-deleted*))"
908 for before in [3, 2, 1, 0]:
911 offset = random.randrange(max(1, before - 2),
912 min(n - after + 2, n))
914 vlv_search = "vlv:1:%d:%d:%d:0" % (before, after,
917 cookie = get_cookie(res.controls, n)
918 vlv_search = ("vlv:1:%d:%d:%d:%s:%s" %
919 (before, after, offset, n,
922 res = self.ldb.search(self.base_dn,
923 expression=expression,
924 scope=ldb.SCOPE_SUBTREE,
926 controls=[sort_control,
928 show_deleted_control])
930 results = [x[attr][0] for x in res]
932 self.assertCorrectResults(results, expected_order,
933 offset, before, after)
935 def test_server_vlv_gte_with_cookie(self):
936 attrs = [x for x in self.users[0].keys() if x not in
937 ('dn', 'objectclass')]
939 gte_order, expected_order, gte_map = \
940 self.get_gte_tests_and_order(attr)
941 # In case there is some order dependency, disorder tests
942 gte_tests = gte_order[:]
944 random.shuffle(gte_tests)
946 sort_control = "server_sort:1:0:%s" % attr
947 for before in [0, 1, 2, 4]:
948 for after in [0, 1, 3, 6]:
949 for gte in gte_tests:
951 cookie = get_cookie(res.controls, len(self.users))
954 vlv_search = encode_vlv_control(before=before,
959 res = self.ldb.search(self.ou,
960 scope=ldb.SCOPE_ONELEVEL,
962 controls=[sort_control,
965 results = [x[attr][0] for x in res]
966 offset = gte_map.get(gte, len(expected_order))
968 # here offset is 0-based
969 start = max(offset - before, 0)
970 end = offset + 1 + after
972 expected_results = expected_order[start: end]
974 self.assertEquals(expected_results, results)
976 def test_server_vlv_gte_no_cookie(self):
977 attrs = [x for x in self.users[0].keys() if x not in
978 ('dn', 'objectclass')]
981 gte_order, expected_order, gte_map = \
982 self.get_gte_tests_and_order(attr)
983 # In case there is some order dependency, disorder tests
984 gte_tests = gte_order[:]
986 random.shuffle(gte_tests)
988 sort_control = "server_sort:1:0:%s" % attr
989 for before in [0, 1, 3]:
991 for gte in gte_tests:
992 vlv_search = encode_vlv_control(before=before,
996 res = self.ldb.search(self.ou,
997 scope=ldb.SCOPE_ONELEVEL,
999 controls=[sort_control,
1001 results = [x[attr][0] for x in res]
1003 # here offset is 0-based
1004 offset = gte_map.get(gte, len(expected_order))
1005 start = max(offset - before, 0)
1006 end = offset + after + 1
1007 expected_results = expected_order[start: end]
1009 if expected_results != results:
1010 middle = expected_order[len(expected_order) // 2]
1011 print(expected_results, results)
1013 print(expected_order)
1015 print("\nattr %s offset %d before %d "
1017 (attr, offset, before, after, gte))
1018 self.assertEquals(expected_results, results)
1020 def test_multiple_searches(self):
1021 """The maximum number of concurrent vlv searches per connection is
1022 currently set at 3. That means if you open 4 VLV searches the
1023 cookie on the first one should fail.
1025 # Windows has a limit of 10 VLVs where there are low numbers
1026 # of objects in each search.
1027 attrs = ([x for x in self.users[0].keys() if x not in
1028 ('dn', 'objectclass')] * 2)[:12]
1032 sort_control = "server_sort:1:0:%s" % attr
1034 res = self.ldb.search(self.ou,
1035 scope=ldb.SCOPE_ONELEVEL,
1037 controls=[sort_control,
1040 cookie = get_cookie(res.controls, len(self.users))
1041 vlv_cookies.append(cookie)
1044 # now this one should fail
1045 self.assertRaises(ldb.LdbError,
1048 scope=ldb.SCOPE_ONELEVEL,
1050 controls=[sort_control,
1051 "vlv:1:1:1:1:0:%s" % vlv_cookies[0]])
1053 # and this one should succeed
1054 res = self.ldb.search(self.ou,
1055 scope=ldb.SCOPE_ONELEVEL,
1057 controls=[sort_control,
1058 "vlv:1:1:1:1:0:%s" % vlv_cookies[-1]])
1060 # this one should fail because it is a new connection and
1061 # doesn't share cookies
1062 new_ldb = SamDB(host, credentials=creds,
1063 session_info=system_session(lp), lp=lp)
1065 self.assertRaises(ldb.LdbError,
1066 new_ldb.search, self.ou,
1067 scope=ldb.SCOPE_ONELEVEL,
1069 controls=[sort_control,
1070 "vlv:1:1:1:1:0:%s" % vlv_cookies[-1]])
1072 # but now without the critical flag it just does no VLV.
1073 new_ldb.search(self.ou,
1074 scope=ldb.SCOPE_ONELEVEL,
1076 controls=[sort_control,
1077 "vlv:0:1:1:1:0:%s" % vlv_cookies[-1]])
1079 # Run a vlv search and return important fields of the response control
1080 def vlv_search(self, attr, expr, cookie="", after_count=0, offset=1):
1081 sort_ctrl = "server_sort:1:0:%s" % attr
1082 ctrl = "vlv:1:0:%d:%d:0" % (after_count, offset)
1084 ctrl += ":" + cookie
1086 res = self.ldb.search(self.ou,
1088 scope=ldb.SCOPE_ONELEVEL,
1090 controls=[ctrl, sort_ctrl])
1091 results = [str(x[attr][0]) for x in res]
1093 ctrls = [str(c) for c in res.controls if
1094 str(c).startswith('vlv')]
1095 self.assertEqual(len(ctrls), 1)
1097 spl = ctrls[0].rsplit(':')
1102 return results, cookie
1104 def test_vlv_modify_during_view(self):
1106 expr = "(objectclass=user)"
1109 full_results, cookie = self.vlv_search(attr, expr,
1110 after_count=len(self.users))
1113 edit_index = len(self.users)//2
1114 edit_attr = full_results[edit_index]
1115 users_with_attr = [u for u in self.users if u[attr] == edit_attr]
1116 self.assertEqual(len(users_with_attr), 1)
1117 edit_user = users_with_attr[0]
1119 # Put z at the front of the val so it comes last in ordering
1120 edit_val = "z_" + edit_user[attr]
1123 m.dn = ldb.Dn(self.ldb, edit_user['dn'])
1124 m[attr] = ldb.MessageElement(edit_val, ldb.FLAG_MOD_REPLACE, attr)
1127 results, cookie = self.vlv_search(attr, expr, cookie=cookie,
1128 after_count=len(self.users))
1130 # Make expected_results by copying and editing full_results
1131 expected_results = full_results[:]
1132 expected_results[edit_index] = edit_val
1133 self.assertEqual(results, expected_results)
1135 # Test changing the search expression in a request on an initialised view
1136 # Expected failure on samba, passes on windows
1137 def test_vlv_change_search_expr(self):
1139 expr = "(objectclass=user)"
1142 full_results, cookie = self.vlv_search(attr, expr,
1143 after_count=len(self.users))
1145 middle_index = len(full_results)//2
1146 # Search that excludes the old value but includes the new one
1147 expr = "%s>=%s" % (attr, full_results[middle_index])
1148 results, cookie = self.vlv_search(attr, expr, cookie=cookie,
1149 after_count=len(self.users))
1150 self.assertEqual(results, full_results[middle_index:])
1152 # Check you can't add a value to a vlv view
1153 def test_vlv_add_during_view(self):
1155 expr = "(objectclass=user)"
1158 full_results, cookie = self.vlv_search(attr, expr,
1159 after_count=len(self.users))
1161 # Add a user at the end of the sort order
1162 add_val = "z_addedval"
1163 user = {'cn': add_val, "objectclass": "user", attr: add_val}
1164 user['dn'] = "cn=%s,%s" % (user['cn'], self.ou)
1167 results, cookie = self.vlv_search(attr, expr, cookie=cookie,
1168 after_count=len(self.users)+1)
1169 self.assertEqual(results, full_results)
1171 def test_vlv_delete_during_view(self):
1173 expr = "(objectclass=user)"
1176 full_results, cookie = self.vlv_search(attr, expr,
1177 after_count=len(self.users))
1179 # Delete one of the users
1180 del_index = len(self.users)//2
1181 del_user = self.users[del_index]
1182 self.ldb.delete(del_user['dn'])
1184 results, cookie = self.vlv_search(attr, expr, cookie=cookie,
1185 after_count=len(self.users))
1186 expected_results = [r for r in full_results if r != del_user[attr]]
1187 self.assertEqual(results, expected_results)
1190 class PagedResultsTests(TestsWithUserOU):
1192 def paged_search(self, expr, cookie="", page_size=0, extra_ctrls=None,
1193 attrs=None, ou=None, subtree=False):
1196 cookie = ":" + cookie
1197 ctrl = "paged_results:1:" + str(page_size) + cookie
1200 # If extra controls are provided then add them, else default to
1201 # sort control on 'cn' attribute
1202 if extra_ctrls is not None:
1203 controls += extra_ctrls
1205 sort_ctrl = "server_sort:1:0:cn"
1206 controls.append(sort_ctrl)
1209 if attrs is not None:
1210 kwargs = {"attrs": attrs}
1212 scope = ldb.SCOPE_ONELEVEL
1214 scope = ldb.SCOPE_SUBTREE
1216 res = self.ldb.search(ou,
1221 results = [str(r['cn'][0]) for r in res]
1223 ctrls = [str(c) for c in res.controls if
1224 str(c).startswith("paged_results")]
1225 assert len(ctrls) == 1, "no paged_results response"
1227 spl = ctrls[0].rsplit(':', 3)
1231 return results, cookie
1233 def test_paged_delete_during_search(self):
1234 expr = "(objectClass=*)"
1238 results, cookie = self.paged_search(expr, page_size=first_page_size)
1240 # Run normal search to get expected results
1241 unedited_results, _ = self.paged_search(expr,
1242 page_size=len(self.users))
1244 # Get remaining users not returned by the search above
1245 unreturned_users = [u for u in self.users if u['cn'] not in results]
1247 # Delete one of the users
1248 del_index = len(self.users)//2
1249 del_user = unreturned_users[del_index]
1250 self.ldb.delete(del_user['dn'])
1253 results, _ = self.paged_search(expr, cookie=cookie,
1254 page_size=len(self.users))
1255 expected_results = [r for r in unedited_results[first_page_size:]
1256 if r != del_user['cn']]
1257 self.assertEqual(results, expected_results)
1259 def test_paged_show_deleted(self):
1260 unique = time.strftime("%s", time.gmtime())[-5:]
1261 prefix = "show_deleted_test_%s_" % (unique)
1262 expr = "(&(objectClass=user)(cn=%s*))" % (prefix)
1263 del_ctrl = "show_deleted:1"
1267 for i in range(num_users):
1268 user = self.create_user(i, num_users, prefix=prefix)
1271 first_user = users[0]
1272 self.ldb.delete(first_user['dn'])
1276 results, cookie = self.paged_search(expr, page_size=first_page_size,
1277 extra_ctrls=[del_ctrl],
1281 # Get remaining users not returned by the search above
1282 unreturned_users = [u for u in users if u['cn'] not in results]
1284 # Delete one of the users
1285 del_index = len(users)//2
1286 del_user = unreturned_users[del_index]
1287 self.ldb.delete(del_user['dn'])
1289 results2, _ = self.paged_search(expr, cookie=cookie,
1290 page_size=len(users)*2,
1291 extra_ctrls=[del_ctrl],
1295 user_cns = {str(u['cn']) for u in users}
1296 deleted_cns = {first_user['cn'], del_user['cn']}
1298 all_results = results + results2
1299 normal_results = {r for r in all_results if "DEL:" not in r}
1300 self.assertEqual(normal_results, user_cns - deleted_cns)
1302 # Deleted results get "\nDEL:<GUID>" added to the CN, so cut it out.
1303 deleted_results = {r[:r.index('\n')] for r in all_results
1305 self.assertEqual(deleted_results, deleted_cns)
1307 def test_paged_add_during_search(self):
1308 expr = "(objectClass=*)"
1312 results, cookie = self.paged_search(expr, page_size=first_page_size)
1314 unedited_results, _ = self.paged_search(expr,
1315 page_size=len(self.users)+1)
1317 # Get remaining users not returned by the search above
1318 unwalked_users = [cn for cn in unedited_results if cn not in results]
1320 # Add a user in the middle of the sort order
1321 middle_index = len(unwalked_users)//2
1322 middle_user = unwalked_users[middle_index]
1324 user = {'cn': middle_user + '_2', "objectclass": "user"}
1325 user['dn'] = "cn=%s,%s" % (user['cn'], self.ou)
1328 results, _ = self.paged_search(expr, cookie=cookie,
1329 page_size=len(self.users)+1)
1330 expected_results = unwalked_users[:]
1332 # Uncomment this line to assert that adding worked.
1333 # expected_results.insert(middle_index+1, user['cn'])
1335 self.assertEqual(results, expected_results)
1337 def test_paged_modify_during_search(self):
1338 expr = "(objectClass=*)"
1342 results, cookie = self.paged_search(expr, page_size=first_page_size)
1344 unedited_results, _ = self.paged_search(expr,
1345 page_size=len(self.users)+1)
1347 # Modify user in the middle of the remaining sort order
1348 unwalked_users = [cn for cn in unedited_results if cn not in results]
1349 middle_index = len(unwalked_users)//2
1350 middle_cn = unwalked_users[middle_index]
1353 users_with_middle_cn = [u for u in self.users if u['cn'] == middle_cn]
1354 self.assertEqual(len(users_with_middle_cn), 1)
1355 middle_user = users_with_middle_cn[0]
1358 edit_cn = "z_" + middle_cn
1359 new_dn = middle_user['dn'].replace(middle_cn, edit_cn)
1360 self.ldb.rename(middle_user['dn'], new_dn)
1362 results, _ = self.paged_search(expr, cookie=cookie,
1363 page_size=len(self.users)+1)
1364 expected_results = unwalked_users[:]
1365 expected_results[middle_index] = edit_cn
1366 self.assertEqual(results, expected_results)
1368 def test_paged_modify_object_scope(self):
1369 expr = "(objectClass=*)"
1371 ou2 = "OU=vlvtestou2,%s" % (self.base_dn)
1373 self.ldb.delete(ou2, ['tree_delete:1'])
1374 except ldb.LdbError:
1376 self.ldb.add({"dn": ou2, "objectclass": "organizationalUnit"})
1378 # Do a separate, full search to get all results
1379 unedited_results, _ = self.paged_search(expr,
1380 page_size=len(self.users)+1)
1382 # Rename before starting a search
1383 first_cn = self.users[0]['cn']
1384 new_dn = "CN=%s,%s" % (first_cn, ou2)
1385 self.ldb.rename(self.users[0]['dn'], new_dn)
1387 # Start new search under the original OU
1389 results, cookie = self.paged_search(expr, page_size=first_page_size)
1390 self.assertEqual(results, unedited_results[1:1+first_page_size])
1392 # Get one of the users that is yet to be returned
1393 unwalked_users = [cn for cn in unedited_results if cn not in results]
1394 middle_index = len(unwalked_users)//2
1395 middle_cn = unwalked_users[middle_index]
1398 users_with_middle_cn = [u for u in self.users if u['cn'] == middle_cn]
1399 self.assertEqual(len(users_with_middle_cn), 1)
1400 middle_user = users_with_middle_cn[0]
1403 new_dn = "CN=%s,%s" % (middle_cn, ou2)
1404 self.ldb.rename(middle_user['dn'], new_dn)
1406 results, _ = self.paged_search(expr, cookie=cookie,
1407 page_size=len(self.users)+1)
1409 expected_results = unwalked_users[:]
1411 # We should really expect that the object renamed into a different
1412 # OU should vanish from the results, but turns out Windows does return
1413 # the object in this case. Our module matches the Windows behaviour.
1415 # If behaviour changes, this line inverts the test's expectations to
1416 # what you might expect.
1417 # del expected_results[middle_index]
1419 # But still expect the user we removed before the search to be gone
1420 del expected_results[0]
1422 self.assertEqual(results, expected_results)
1424 def assertPagedSearchRaises(self, err_num, expr, cookie, attrs=None,
1427 results, _ = self.paged_search(expr, cookie=cookie,
1429 extra_ctrls=extra_ctrls,
1431 except ldb.LdbError as e:
1432 self.assertEqual(e.args[0], err_num)
1435 self.fail("No error raised by invalid search")
1437 def test_paged_changed_expr(self):
1438 # Initiate search then use a different expr in subsequent req
1439 expr = "(objectClass=*)"
1440 results, cookie = self.paged_search(expr, page_size=3)
1442 expected_error_num = 12
1443 self.assertPagedSearchRaises(expected_error_num, expr, cookie)
1445 def test_paged_changed_controls(self):
1446 expr = "(objectClass=*)"
1447 sort_ctrl = "server_sort:1:0:cn"
1448 del_ctrl = "show_deleted:1"
1449 expected_error_num = 12
1452 # Initiate search with a sort control then remove in subsequent req
1453 results, cookie = self.paged_search(expr, page_size=ps,
1454 extra_ctrls=[sort_ctrl])
1455 self.assertPagedSearchRaises(expected_error_num, expr,
1456 cookie, extra_ctrls=[])
1458 # Initiate search with no sort control then add one in subsequent req
1459 results, cookie = self.paged_search(expr, page_size=ps,
1461 self.assertPagedSearchRaises(expected_error_num, expr,
1462 cookie, extra_ctrls=[sort_ctrl])
1464 # Initiate search with show-deleted control then
1465 # remove it in subsequent req
1466 results, cookie = self.paged_search(expr, page_size=ps,
1467 extra_ctrls=[del_ctrl])
1468 self.assertPagedSearchRaises(expected_error_num, expr,
1469 cookie, extra_ctrls=[])
1471 # Initiate normal search then add show-deleted control
1473 results, cookie = self.paged_search(expr, page_size=ps,
1475 self.assertPagedSearchRaises(expected_error_num, expr,
1476 cookie, extra_ctrls=[del_ctrl])
1478 # Changing order of controls shouldn't break the search
1479 results, cookie = self.paged_search(expr, page_size=ps,
1480 extra_ctrls=[del_ctrl, sort_ctrl])
1482 results, cookie = self.paged_search(expr, page_size=ps,
1483 extra_ctrls=[sort_ctrl,
1485 except ldb.LdbError as e:
1488 def test_paged_cant_change_controls_data(self):
1489 # Some defaults for the rest of the tests
1490 expr = "(objectClass=*)"
1491 sort_ctrl = "server_sort:1:0:cn"
1492 expected_error_num = 12
1494 # Initiate search with sort control then change it in subsequent req
1495 results, cookie = self.paged_search(expr, page_size=3,
1496 extra_ctrls=[sort_ctrl])
1497 changed_sort_ctrl = "server_sort:1:0:roomNumber"
1498 self.assertPagedSearchRaises(expected_error_num, expr,
1499 cookie, extra_ctrls=[changed_sort_ctrl])
1501 # Initiate search with a control with crit=1, then use crit=0
1502 results, cookie = self.paged_search(expr, page_size=3,
1503 extra_ctrls=[sort_ctrl])
1504 changed_sort_ctrl = "server_sort:0:0:cn"
1505 self.assertPagedSearchRaises(expected_error_num, expr,
1506 cookie, extra_ctrls=[changed_sort_ctrl])
1508 def test_paged_search_referrals(self):
1509 expr = "(objectClass=*)"
1510 paged_ctrl = "paged_results:1:5"
1511 res = self.ldb.search(self.base_dn,
1514 scope=ldb.SCOPE_SUBTREE,
1515 controls=[paged_ctrl])
1517 # Do a paged search walk over the whole database and save a list
1518 # of all the referrals returned by each search.
1522 referral_lists.append(res.referals)
1524 ctrls = [str(c) for c in res.controls if
1525 str(c).startswith("paged_results")]
1526 self.assertEqual(len(ctrls), 1)
1527 spl = ctrls[0].rsplit(':')
1532 res = self.ldb.search(self.base_dn,
1535 scope=ldb.SCOPE_SUBTREE,
1536 controls=[paged_ctrl + ":" + cookie])
1538 ref_list = referral_lists[0]
1540 # Sanity check to make sure the search actually did something
1541 self.assertGreater(len(referral_lists), 2)
1543 # Check the first referral set contains stuff
1544 self.assertGreater(len(ref_list), 0)
1546 # Check the others don't
1547 self.assertTrue(all([len(l) == 0 for l in referral_lists[1:]]))
1549 # Check the entries in the first referral list look like referrals
1550 self.assertTrue(all([s.startswith('ldap://') for s in ref_list]))
1552 def test_paged_change_attrs(self):
1553 expr = "(objectClass=*)"
1555 expected_error_num = 12
1557 results, cookie = self.paged_search(expr, page_size=3, attrs=attrs)
1558 results, cookie = self.paged_search(expr, cookie=cookie, page_size=3,
1561 changed_attrs = attrs + ['roomNumber']
1562 self.assertPagedSearchRaises(expected_error_num, expr,
1563 cookie, attrs=changed_attrs,
1566 def test_paged_search_lockstep(self):
1567 expr = "(objectClass=*)"
1570 all_results, _ = self.paged_search(expr, page_size=len(self.users)+1)
1572 # Run two different but overlapping paged searches simultaneously.
1573 set_1_index = int((len(all_results))//3)
1574 set_2_index = int((2*len(all_results))//3)
1575 set_1 = all_results[set_1_index:]
1576 set_2 = all_results[:set_2_index+1]
1577 set_1_expr = "(cn>=%s)" % (all_results[set_1_index])
1578 set_2_expr = "(cn<=%s)" % (all_results[set_2_index])
1580 results, cookie1 = self.paged_search(set_1_expr, page_size=ps)
1581 self.assertEqual(results, set_1[:ps])
1582 results, cookie2 = self.paged_search(set_2_expr, page_size=ps)
1583 self.assertEqual(results, set_2[:ps])
1585 results, cookie1 = self.paged_search(set_1_expr, cookie=cookie1,
1587 self.assertEqual(results, set_1[ps:ps*2])
1588 results, cookie2 = self.paged_search(set_2_expr, cookie=cookie2,
1590 self.assertEqual(results, set_2[ps:ps*2])
1592 results, _ = self.paged_search(set_1_expr, cookie=cookie1,
1593 page_size=len(self.users))
1594 self.assertEqual(results, set_1[ps*2:])
1595 results, _ = self.paged_search(set_2_expr, cookie=cookie2,
1596 page_size=len(self.users))
1597 self.assertEqual(results, set_2[ps*2:])
1600 if "://" not in host:
1601 if os.path.isfile(host):
1602 host = "tdb://%s" % host
1604 host = "ldap://%s" % host
1607 TestProgram(module=__name__, opts=subunitopts)