2 # -*- coding: utf-8 -*-
3 # Originally based on ./sam.py
4 from __future__ import print_function
12 sys.path.insert(0, "bin/python")
14 from samba.tests.subunitrun import SubunitOptions, TestProgram
16 import samba.getopt as options
18 from samba.auth import system_session
20 from samba.samdb import SamDB
21 from samba.compat import get_bytes
22 from samba.compat import get_string
# Command-line handling for this test script. The usage string documents a
# single positional <host> argument following the options.
parser = optparse.OptionParser("vlv.py [options] <host>")
sambaopts = options.SambaOptions(parser)
parser.add_option_group(sambaopts)
parser.add_option_group(options.VersionOptions(parser))
# use command line creds if available
credopts = options.CredentialsOptions(parser)
parser.add_option_group(credopts)
subunitopts = SubunitOptions(parser)
parser.add_option_group(subunitopts)

# Integer option, default 20; read back later as opts.elements.
parser.add_option('--elements', type='int', default=20,
                  help="use this many elements in the tests")

# Flag read as opts.delete_in_setup: when set, the test tree is deleted during
# setup, and the tree delete in tearDown is skipped.
parser.add_option('--delete-in-setup', action='store_true',
                  help="cleanup in next setup rather than teardown")

# Read as opts.skip_attr_regex; compiled with re in create_user, presumably to
# filter the generated user's attribute names -- confirm against create_user.
parser.add_option('--skip-attr-regex',
                  help="ignore attributes matching this regex")

# opts holds the parsed option values, args the remaining positional arguments.
opts, args = parser.parse_args()
53 lp = sambaopts.get_loadparm()
54 creds = credopts.get_credentials(lp)
56 N_ELEMENTS = opts.elements
59 class VlvTestException(Exception):
63 def encode_vlv_control(critical=1,
69 s = "vlv:%d:%d:%d:" % (critical, before, after)
71 if offset is not None:
72 m = "%d:%d" % (offset, n)
73 elif b':' in gte or b'\x00' in gte:
74 gte = get_string(base64.b64encode(gte))
75 m = "base64>=%s" % gte
77 m = ">=%s" % get_string(gte)
82 return s + m + ':' + cookie
85 def get_cookie(controls, expected_n=None):
86 """Get the cookie, STILL base64 encoded, or raise ValueError."""
87 for c in list(controls):
89 if cstr.startswith('vlv_resp'):
90 head, n, _, cookie = cstr.rsplit(':', 3)
91 if expected_n is not None and int(n) != expected_n:
92 raise ValueError("Expected %s items, server said %s" %
95 raise ValueError("there is no VLV response")
98 class TestsWithUserOU(samba.tests.TestCase):
100 def create_user(self, i, n, prefix='vlvtest', suffix='', attrs=None):
101 name = "%s%d%s" % (prefix, i, suffix)
104 "objectclass": "user",
105 'givenName': "abcdefghijklmnopqrstuvwxyz"[i % 26],
106 "roomNumber": "%sbc" % (n - i),
108 "facsimileTelephoneNumber": name,
109 "employeeNumber": "%s%sx" % (abs(i * (99 - i)), '\n' * (i & 255)),
110 "accountExpires": "%s" % (10 ** 9 + 1000000 * i),
111 "msTSExpireDate4": "19%02d0101010000.0Z" % (i % 100),
112 "flags": str(i * (n - i)),
113 "serialNumber": "abc %s%s%s" % ('AaBb |-/'[i & 7],
118 # _user_broken_attrs tests are broken due to problems outside
120 _user_broken_attrs = {
121 # Sort doesn't look past a NUL byte.
122 "photo": "\x00%d" % (n - i),
123 "audio": "%sn octet string %s%s ♫♬\x00lalala" % ('Aa'[i & 1],
125 "displayNamePrintable": "%d\x00%c" % (i, i & 255),
126 "adminDisplayName": "%d\x00b" % (n - i),
127 "title": "%d%sb" % (n - i, '\x00' * i),
128 "comment": "Favourite colour is %d" % (n % (i + 1)),
130 # Names that vary only in case. Windows returns
131 # equivalent addresses in the order they were put
132 # in ('a st', 'A st',...).
133 "street": "%s st" % (chr(65 | (i & 14) | ((i & 1) * 32))),
136 if attrs is not None:
139 user['dn'] = "cn=%s,%s" % (user['cn'], self.ou)
141 if opts.skip_attr_regex:
142 match = re.compile(opts.skip_attr_regex).search
143 for k in user.keys():
147 self.users.append(user)
152 super(TestsWithUserOU, self).setUp()
153 self.ldb = SamDB(host, credentials=creds,
154 session_info=system_session(lp), lp=lp)
156 self.base_dn = self.ldb.domain_dn()
157 self.tree_dn = "ou=vlvtesttree,%s" % self.base_dn
158 self.ou = "ou=vlvou,%s" % self.tree_dn
159 if opts.delete_in_setup:
161 self.ldb.delete(self.tree_dn, ['tree_delete:1'])
162 except ldb.LdbError as e:
163 print("tried deleting %s, got error %s" % (self.tree_dn, e))
166 "objectclass": "organizationalUnit"})
169 "objectclass": "organizationalUnit"})
172 for i in range(N_ELEMENTS):
173 self.create_user(i, N_ELEMENTS)
175 attrs = self.users[0].keys()
176 self.binary_sorted_keys = ['audio',
180 "displayNamePrintable"]
182 self.numeric_sorted_keys = ['flags',
185 self.timestamp_keys = ['msTSExpireDate4']
187 self.int64_keys = set(['accountExpires'])
189 self.locale_sorted_keys = [x for x in attrs if
190 x not in (self.binary_sorted_keys +
191 self.numeric_sorted_keys)]
193 # don't try spaces, etc in cn
194 self.delicate_keys = ['cn']
197 super(TestsWithUserOU, self).tearDown()
198 if not opts.delete_in_setup:
199 self.ldb.delete(self.tree_dn, ['tree_delete:1'])
202 class VLVTests(TestsWithUserOU):
204 def get_full_list(self, attr, include_cn=False):
205 """Fetch the whole list sorted on the attribute, using the VLV.
206 This way you get a VLV cookie."""
207 n_users = len(self.users)
208 sort_control = "server_sort:1:0:%s" % attr
209 half_n = n_users // 2
210 vlv_search = "vlv:1:%d:%d:%d:0" % (half_n, half_n, half_n + 1)
214 res = self.ldb.search(self.ou,
215 scope=ldb.SCOPE_ONELEVEL,
217 controls=[sort_control,
220 full_results = [(str(x[attr][0]), str(x['cn'][0])) for x in res]
222 full_results = [str(x[attr][0]).lower() for x in res]
223 controls = res.controls
224 return full_results, controls, sort_control
226 def get_expected_order(self, attr, expression=None):
227 """Fetch the whole list sorted on the attribute, using sort only."""
228 sort_control = "server_sort:1:0:%s" % attr
229 res = self.ldb.search(self.ou,
230 scope=ldb.SCOPE_ONELEVEL,
231 expression=expression,
233 controls=[sort_control])
234 results = [x[attr][0] for x in res]
def delete_user(self, user):
    """Delete a user from the directory and stop tracking it locally.

    user is one of the dicts held in self.users; its 'dn' entry names the
    object to delete through self.ldb.

    Raises ValueError if user is not in self.users. The server-side
    delete has already been issued by then. Whatever self.ldb.delete()
    raises propagates unchanged, and self.users is left untouched in
    that case.
    """
    # Delete on the server first so a failed delete does not desynchronise
    # the local list from the directory.
    self.ldb.delete(user['dn'])
    # list.remove() drops the first element equal to user, exactly like
    # del xs[xs.index(x)], without the explicit index lookup.
    self.users.remove(user)
241 def get_gte_tests_and_order(self, attr, expression=None):
242 expected_order = self.get_expected_order(attr, expression=expression)
244 if attr in self.delicate_keys:
252 elif attr in self.timestamp_keys:
263 elif attr not in self.numeric_sorted_keys:
277 gte_keys.append(expected_order[len(expected_order) // 2] + b' tail')
280 # "numeric" means positive integers
281 # doesn't work with -1, 3.14, ' 3', '9' * 20
288 if attr in self.int64_keys:
289 gte_keys += ['3' * 12, '71' * 8]
291 for i, x in enumerate(gte_keys):
292 user = self.create_user(i, N_ELEMENTS,
295 gte_users.append(user)
297 gte_order = self.get_expected_order(attr)
298 for user in gte_users:
299 self.delete_user(user)
302 expected_order_2 = self.get_expected_order(attr, expression=expression)
303 self.assertEqual(expected_order, expected_order_2)
305 # Map gte tests to indexes in expected order. This will break
306 # if gte_order and expected_order are differently ordered (as
310 # index to the first one with each value
312 for i, k in enumerate(expected_order):
313 if k not in index_map:
328 gte_map[k] = len(expected_order)
333 print(" %10s => %10s" % (k, gte_map[k]))
335 return gte_order, expected_order, gte_map
337 def assertCorrectResults(self, results, expected_order,
338 offset, before, after):
339 """A helper to calculate offsets correctly and say as much as possible
340 when something goes wrong."""
342 start = max(offset - before - 1, 0)
344 expected_results = expected_order[start: end]
346 # if it is a tuple with the cn, drop the cn
347 if expected_results and isinstance(expected_results[0], tuple):
348 expected_results = [x[0] for x in expected_results]
350 if expected_results == results:
353 if expected_order is not None:
354 print("expected order: %s" % expected_order[:20])
355 if len(expected_order) > 20:
356 print("... and %d more not shown" % (len(expected_order) - 20))
358 print("offset %d before %d after %d" % (offset, before, after))
359 print("start %d end %d" % (start, end))
360 print("expected: %s" % expected_results)
361 print("got : %s" % results)
362 self.assertEquals(expected_results, results)
364 def test_server_vlv_with_cookie(self):
365 attrs = [x for x in self.users[0].keys() if x not in
366 ('dn', 'objectclass')]
368 expected_order = self.get_expected_order(attr)
369 sort_control = "server_sort:1:0:%s" % attr
372 for before in [10, 0, 3, 1, 4, 5, 2]:
373 for after in [0, 3, 1, 4, 5, 2, 7]:
374 for offset in range(max(1, before - 2),
375 min(n - after + 2, n)):
377 vlv_search = "vlv:1:%d:%d:%d:0" % (before, after,
380 cookie = get_cookie(res.controls, n)
381 vlv_search = ("vlv:1:%d:%d:%d:%s:%s" %
382 (before, after, offset, n,
385 res = self.ldb.search(self.ou,
386 scope=ldb.SCOPE_ONELEVEL,
388 controls=[sort_control,
391 results = [x[attr][0] for x in res]
393 self.assertCorrectResults(results, expected_order,
394 offset, before, after)
396 def run_index_tests_with_expressions(self, expressions):
397 # Here we don't test every before/after combination.
398 attrs = [x for x in self.users[0].keys() if x not in
399 ('dn', 'objectclass')]
401 for expression in expressions:
402 expected_order = self.get_expected_order(attr, expression)
403 sort_control = "server_sort:1:0:%s" % attr
405 n = len(expected_order)
406 for before in range(0, 11):
408 for offset in range(max(1, before - 2),
409 min(n - after + 2, n)):
411 vlv_search = "vlv:1:%d:%d:%d:0" % (before, after,
414 cookie = get_cookie(res.controls)
415 vlv_search = ("vlv:1:%d:%d:%d:%s:%s" %
416 (before, after, offset, n,
419 res = self.ldb.search(self.ou,
420 expression=expression,
421 scope=ldb.SCOPE_ONELEVEL,
423 controls=[sort_control,
426 results = [x[attr][0] for x in res]
428 self.assertCorrectResults(results, expected_order,
429 offset, before, after)
431 def test_server_vlv_with_expression(self):
432 """What happens when we run the VLV with an expression?"""
433 expressions = ["(objectClass=*)",
434 "(cn=%s)" % self.users[-1]['cn'],
435 "(roomNumber=%s)" % self.users[0]['roomNumber'],
437 self.run_index_tests_with_expressions(expressions)
439 def test_server_vlv_with_failing_expression(self):
440 """What happens when we run the VLV on an expression that matches
442 expressions = ["(samaccountname=testferf)",
445 self.run_index_tests_with_expressions(expressions)
447 def run_gte_tests_with_expressions(self, expressions):
448 # Here we don't test every before/after combination.
449 attrs = [x for x in self.users[0].keys() if x not in
450 ('dn', 'objectclass')]
451 for expression in expressions:
453 gte_order, expected_order, gte_map = \
454 self.get_gte_tests_and_order(attr, expression)
455 # In case there is some order dependency, disorder tests
456 gte_tests = gte_order[:]
458 random.shuffle(gte_tests)
460 sort_control = "server_sort:1:0:%s" % attr
462 expected_order = self.get_expected_order(attr, expression)
463 sort_control = "server_sort:1:0:%s" % attr
465 for before in range(0, 11):
467 for gte in gte_tests:
469 cookie = get_cookie(res.controls)
472 vlv_search = encode_vlv_control(before=before,
477 res = self.ldb.search(self.ou,
478 scope=ldb.SCOPE_ONELEVEL,
479 expression=expression,
481 controls=[sort_control,
484 results = [x[attr][0] for x in res]
485 offset = gte_map.get(gte, len(expected_order))
487 # here offset is 0-based
488 start = max(offset - before, 0)
489 end = offset + 1 + after
491 expected_results = expected_order[start: end]
493 self.assertEquals(expected_results, results)
495 def test_vlv_gte_with_expression(self):
496 """What happens when we run the VLV with an expression?"""
497 expressions = ["(objectClass=*)",
498 "(cn=%s)" % self.users[-1]['cn'],
499 "(roomNumber=%s)" % self.users[0]['roomNumber'],
501 self.run_gte_tests_with_expressions(expressions)
503 def test_vlv_gte_with_failing_expression(self):
504 """What happens when we run the VLV on an expression that matches
506 expressions = ["(samaccountname=testferf)",
509 self.run_gte_tests_with_expressions(expressions)
511 def test_server_vlv_with_cookie_while_adding_and_deleting(self):
512 """What happens if we add or remove items in the middle of the VLV?
514 Nothing. The search and the sort is not repeated, and we only
515 deal with the objects originally found.
517 attrs = ['cn'] + [x for x in self.users[0].keys() if x not in
518 ('dn', 'objectclass')]
522 full_results, controls, sort_control = \
523 self.get_full_list(attr, True)
524 original_n = len(self.users)
526 expected_order = full_results
529 for before in list(range(0, 3)) + [6, 11, 19]:
530 for after in list(range(0, 3)) + [6, 11, 19]:
531 start = max(before - 1, 1)
532 end = max(start + 4, original_n - after + 2)
533 for offset in range(start, end):
534 # if iteration > 2076:
536 cookie = get_cookie(controls, original_n)
537 vlv_search = encode_vlv_control(before=before,
544 res = self.ldb.search(self.ou,
545 scope=ldb.SCOPE_ONELEVEL,
547 controls=[sort_control,
550 controls = res.controls
551 results = [x[attr][0] for x in res]
552 real_offset = max(1, min(offset, len(expected_order)))
554 expected_results = []
556 begin_offset = max(real_offset - before - 1, 0)
557 real_before = min(before, real_offset - 1)
558 real_after = min(after,
559 len(expected_order) - real_offset)
561 for x in expected_order[begin_offset:]:
563 expected_results.append(get_bytes(x[0]))
564 if (len(expected_results) ==
565 real_before + real_after + 1):
570 if expected_results != results:
571 print("attr %s before %d after %d offset %d" %
572 (attr, before, after, offset))
573 self.assertEquals(expected_results, results)
576 if random.random() < 0.1 + (n < 5) * 0.05:
580 i = random.randrange(n)
581 user = self.create_user(i, n, suffix='-%s' %
584 if random.random() < 0.1 + (n > 50) * 0.02 and n:
585 index = random.randrange(n)
586 user = self.users.pop(index)
588 self.ldb.delete(user['dn'])
590 replaced = (user[attr], user['cn'])
591 if replaced in expected_order:
592 i = expected_order.index(replaced)
593 expected_order[i] = None
595 def test_server_vlv_with_cookie_while_changing(self):
596 """What happens if we modify items in the middle of the VLV?
598 The expected behaviour (as found on Windows) is the sort is
599 not repeated, but the changes in attributes are reflected.
601 attrs = [x for x in self.users[0].keys() if x not in
602 ('dn', 'objectclass', 'cn')]
604 n_users = len(self.users)
605 expected_order = [x.upper() for x in self.get_expected_order(attr)]
606 sort_control = "server_sort:1:0:%s" % attr
610 # First we'll fetch the whole list so we know the original
611 # sort order. This is necessary because we don't know how
612 # the server will order equivalent items. We are using the
614 half_n = n_users // 2
615 vlv_search = "vlv:1:%d:%d:%d:0" % (half_n, half_n, half_n + 1)
616 res = self.ldb.search(self.ou,
617 scope=ldb.SCOPE_ONELEVEL,
619 controls=[sort_control, vlv_search])
621 results = [x[attr][0].upper() for x in res]
622 #self.assertEquals(expected_order, results)
624 dn_order = [str(x['dn']) for x in res]
627 for before in range(0, 3):
628 for after in range(0, 3):
629 for offset in range(1 + before, n_users - after):
630 cookie = get_cookie(res.controls, len(self.users))
631 vlv_search = ("vlv:1:%d:%d:%d:%s:%s" %
632 (before, after, offset, len(self.users),
635 res = self.ldb.search(self.ou,
636 scope=ldb.SCOPE_ONELEVEL,
638 controls=[sort_control,
641 dn_results = [str(x['dn']) for x in res]
642 dn_expected = dn_order[offset - before - 1:
645 self.assertEquals(dn_expected, dn_results)
647 results = [x[attr][0].upper() for x in res]
649 self.assertCorrectResults(results, values,
650 offset, before, after)
654 if (attr in self.locale_sorted_keys or
655 attr in self.binary_sorted_keys):
657 i2 = (i ^ 255) % n_users
662 if v2 in self.locale_sorted_keys:
664 cn1 = dn1.split(',', 1)[0][3:]
665 cn2 = dn2.split(',', 1)[0][3:]
670 m.dn = ldb.Dn(self.ldb, dn1)
671 m[attr] = ldb.MessageElement(v2,
672 ldb.FLAG_MOD_REPLACE,
677 def test_server_vlv_fractions_with_cookie(self):
678 """What happens when the count is set to an arbitrary number?
680 In that case the offset and the count form a fraction, and the
681 VLV should be centred at a point offset/count of the way
682 through. For example, if offset is 3 and count is 6, the VLV
683 should be looking around halfway. The actual algorithm is a
684 bit fiddlier than that, because of the one-basedness of VLV.
686 attrs = [x for x in self.users[0].keys() if x not in
687 ('dn', 'objectclass')]
689 n_users = len(self.users)
694 full_results, controls, sort_control = self.get_full_list(attr)
695 self.assertEqual(len(full_results), n_users)
696 for before in range(0, 2):
697 for after in range(0, 2):
698 for denominator in range(1, 20):
699 for offset in range(1, denominator + 3):
700 cookie = get_cookie(controls, len(self.users))
701 vlv_search = ("vlv:1:%d:%d:%d:%s:%s" %
702 (before, after, offset,
706 res = self.ldb.search(self.ou,
707 scope=ldb.SCOPE_ONELEVEL,
709 controls=[sort_control,
711 except ldb.LdbError as e:
714 print("offset %d denominator %d raised error "
715 "expected error %s\n"
716 "(offset zero is illegal unless "
717 "content count is zero)" %
718 (offset, denominator, e))
721 results = [str(x[attr][0]).lower() for x in res]
724 denominator = n_users
727 elif denominator == 1:
728 # the offset can only be 1, but the 1/1 case
729 # means something special
731 real_offset = n_users
735 if offset > denominator:
738 int(round((n_users - 1) *
740 (denominator - 1.0)))
743 self.assertCorrectResults(results, full_results,
747 controls = res.controls
749 for c in list(controls):
751 if cstr.startswith('vlv_resp'):
752 bits = cstr.rsplit(':')
753 print("the answer is %s; we said %d" %
754 (bits[2], real_offset))
757 def test_server_vlv_no_cookie(self):
758 attrs = [x for x in self.users[0].keys() if x not in
759 ('dn', 'objectclass')]
762 expected_order = self.get_expected_order(attr)
763 sort_control = "server_sort:1:0:%s" % attr
764 for before in range(0, 5):
765 for after in range(0, 7):
766 for offset in range(1 + before, len(self.users) - after):
767 res = self.ldb.search(self.ou,
768 scope=ldb.SCOPE_ONELEVEL,
770 controls=[sort_control,
774 results = [x[attr][0] for x in res]
775 self.assertCorrectResults(results, expected_order,
776 offset, before, after)
778 def get_expected_order_showing_deleted(self, attr,
779 expression="(|(cn=vlvtest*)(cn=vlv-deleted*))",
781 scope=ldb.SCOPE_SUBTREE
783 """Fetch the whole list sorted on the attribute, using sort only,
784 searching in the entire tree, not just our OU. This is the
785 way to find deleted objects.
789 sort_control = "server_sort:1:0:%s" % attr
790 controls = [sort_control, "show_deleted:1"]
792 res = self.ldb.search(base,
794 expression=expression,
797 results = [x[attr][0] for x in res]
800 def add_deleted_users(self, n):
801 deleted_users = [self.create_user(i, n, prefix='vlv-deleted')
804 for user in deleted_users:
805 self.delete_user(user)
807 def test_server_vlv_no_cookie_show_deleted(self):
808 """What do we see with the show_deleted control?"""
809 attrs = ['objectGUID',
818 # add some deleted users first, just in case there are none
819 self.add_deleted_users(6)
821 expression = "(|(cn=vlvtest*)(cn=vlv-deleted*))"
824 show_deleted_control = "show_deleted:1"
825 expected_order = self.get_expected_order_showing_deleted(attr,
827 n = len(expected_order)
828 sort_control = "server_sort:1:0:%s" % attr
829 for before in [3, 1, 0]:
831 # don't test every position, because there could be hundreds.
832 # jump back and forth instead
834 offset = random.randrange(max(1, before - 2),
835 min(n - after + 2, n))
836 res = self.ldb.search(self.base_dn,
837 expression=expression,
838 scope=ldb.SCOPE_SUBTREE,
840 controls=[sort_control,
841 show_deleted_control,
847 results = [x[attr][0] for x in res]
848 self.assertCorrectResults(results, expected_order,
849 offset, before, after)
851 def test_server_vlv_no_cookie_show_deleted_only(self):
852 """What do we see with the show_deleted control when we're not looking
853 at any non-deleted things"""
854 attrs = ['objectGUID',
861 # add some deleted users first, just in case there are none
862 self.add_deleted_users(4)
863 base = 'CN=Deleted Objects,%s' % self.base_dn
864 expression = "(cn=vlv-deleted*)"
866 show_deleted_control = "show_deleted:1"
867 expected_order = self.get_expected_order_showing_deleted(attr,
868 expression=expression,
870 scope=ldb.SCOPE_ONELEVEL)
871 print("searching for attr %s amongst %d deleted objects" %
872 (attr, len(expected_order)))
873 sort_control = "server_sort:1:0:%s" % attr
874 step = max(len(expected_order) // 10, 1)
875 for before in [3, 0]:
877 for offset in range(1 + before,
878 len(expected_order) - after,
880 res = self.ldb.search(base,
881 expression=expression,
882 scope=ldb.SCOPE_ONELEVEL,
884 controls=[sort_control,
885 show_deleted_control,
889 results = [x[attr][0] for x in res]
890 self.assertCorrectResults(results, expected_order,
891 offset, before, after)
893 def test_server_vlv_with_cookie_show_deleted(self):
894 """What do we see with the show_deleted control?"""
895 attrs = ['objectGUID',
903 self.add_deleted_users(6)
906 expected_order = self.get_expected_order(attr)
907 sort_control = "server_sort:1:0:%s" % attr
909 show_deleted_control = "show_deleted:1"
910 expected_order = self.get_expected_order_showing_deleted(attr)
911 n = len(expected_order)
912 expression = "(|(cn=vlvtest*)(cn=vlv-deleted*))"
913 for before in [3, 2, 1, 0]:
916 offset = random.randrange(max(1, before - 2),
917 min(n - after + 2, n))
919 vlv_search = "vlv:1:%d:%d:%d:0" % (before, after,
922 cookie = get_cookie(res.controls, n)
923 vlv_search = ("vlv:1:%d:%d:%d:%s:%s" %
924 (before, after, offset, n,
927 res = self.ldb.search(self.base_dn,
928 expression=expression,
929 scope=ldb.SCOPE_SUBTREE,
931 controls=[sort_control,
933 show_deleted_control])
935 results = [x[attr][0] for x in res]
937 self.assertCorrectResults(results, expected_order,
938 offset, before, after)
940 def test_server_vlv_gte_with_cookie(self):
941 attrs = [x for x in self.users[0].keys() if x not in
942 ('dn', 'objectclass')]
944 gte_order, expected_order, gte_map = \
945 self.get_gte_tests_and_order(attr)
946 # In case there is some order dependency, disorder tests
947 gte_tests = gte_order[:]
949 random.shuffle(gte_tests)
951 sort_control = "server_sort:1:0:%s" % attr
952 for before in [0, 1, 2, 4]:
953 for after in [0, 1, 3, 6]:
954 for gte in gte_tests:
956 cookie = get_cookie(res.controls, len(self.users))
959 vlv_search = encode_vlv_control(before=before,
964 res = self.ldb.search(self.ou,
965 scope=ldb.SCOPE_ONELEVEL,
967 controls=[sort_control,
970 results = [x[attr][0] for x in res]
971 offset = gte_map.get(gte, len(expected_order))
973 # here offset is 0-based
974 start = max(offset - before, 0)
975 end = offset + 1 + after
977 expected_results = expected_order[start: end]
979 self.assertEquals(expected_results, results)
981 def test_server_vlv_gte_no_cookie(self):
982 attrs = [x for x in self.users[0].keys() if x not in
983 ('dn', 'objectclass')]
986 gte_order, expected_order, gte_map = \
987 self.get_gte_tests_and_order(attr)
988 # In case there is some order dependency, disorder tests
989 gte_tests = gte_order[:]
991 random.shuffle(gte_tests)
993 sort_control = "server_sort:1:0:%s" % attr
994 for before in [0, 1, 3]:
996 for gte in gte_tests:
997 vlv_search = encode_vlv_control(before=before,
1001 res = self.ldb.search(self.ou,
1002 scope=ldb.SCOPE_ONELEVEL,
1004 controls=[sort_control,
1006 results = [x[attr][0] for x in res]
1008 # here offset is 0-based
1009 offset = gte_map.get(gte, len(expected_order))
1010 start = max(offset - before, 0)
1011 end = offset + after + 1
1012 expected_results = expected_order[start: end]
1014 if expected_results != results:
1015 middle = expected_order[len(expected_order) // 2]
1016 print(expected_results, results)
1018 print(expected_order)
1020 print("\nattr %s offset %d before %d "
1022 (attr, offset, before, after, gte))
1023 self.assertEquals(expected_results, results)
1025 def test_multiple_searches(self):
1026 """The maximum number of concurrent vlv searches per connection is
1027 currently set at 3. That means if you open 4 VLV searches the
1028 cookie on the first one should fail.
1030 # Windows has a limit of 10 VLVs where there are low numbers
1031 # of objects in each search.
1032 attrs = ([x for x in self.users[0].keys() if x not in
1033 ('dn', 'objectclass')] * 2)[:12]
1037 sort_control = "server_sort:1:0:%s" % attr
1039 res = self.ldb.search(self.ou,
1040 scope=ldb.SCOPE_ONELEVEL,
1042 controls=[sort_control,
1045 cookie = get_cookie(res.controls, len(self.users))
1046 vlv_cookies.append(cookie)
1049 # now this one should fail
1050 self.assertRaises(ldb.LdbError,
1053 scope=ldb.SCOPE_ONELEVEL,
1055 controls=[sort_control,
1056 "vlv:1:1:1:1:0:%s" % vlv_cookies[0]])
1058 # and this one should succeed
1059 res = self.ldb.search(self.ou,
1060 scope=ldb.SCOPE_ONELEVEL,
1062 controls=[sort_control,
1063 "vlv:1:1:1:1:0:%s" % vlv_cookies[-1]])
1065 # this one should fail because it is a new connection and
1066 # doesn't share cookies
1067 new_ldb = SamDB(host, credentials=creds,
1068 session_info=system_session(lp), lp=lp)
1070 self.assertRaises(ldb.LdbError,
1071 new_ldb.search, self.ou,
1072 scope=ldb.SCOPE_ONELEVEL,
1074 controls=[sort_control,
1075 "vlv:1:1:1:1:0:%s" % vlv_cookies[-1]])
1077 # but now without the critical flag it just does no VLV.
1078 new_ldb.search(self.ou,
1079 scope=ldb.SCOPE_ONELEVEL,
1081 controls=[sort_control,
1082 "vlv:0:1:1:1:0:%s" % vlv_cookies[-1]])
1084 # Run a vlv search and return important fields of the response control
1085 def vlv_search(self, attr, expr, cookie="", after_count=0, offset=1):
1086 sort_ctrl = "server_sort:1:0:%s" % attr
1087 ctrl = "vlv:1:0:%d:%d:0" % (after_count, offset)
1089 ctrl += ":" + cookie
1091 res = self.ldb.search(self.ou,
1093 scope=ldb.SCOPE_ONELEVEL,
1095 controls=[ctrl, sort_ctrl])
1096 results = [str(x[attr][0]) for x in res]
1098 ctrls = [str(c) for c in res.controls if
1099 str(c).startswith('vlv')]
1100 self.assertEqual(len(ctrls), 1)
1102 spl = ctrls[0].rsplit(':')
1107 return results, cookie
1109 def test_vlv_modify_during_view(self):
1111 expr = "(objectclass=user)"
1114 full_results, cookie = self.vlv_search(attr, expr,
1115 after_count=len(self.users))
1118 edit_index = len(self.users)//2
1119 edit_attr = full_results[edit_index]
1120 users_with_attr = [u for u in self.users if u[attr] == edit_attr]
1121 self.assertEqual(len(users_with_attr), 1)
1122 edit_user = users_with_attr[0]
1124 # Put z at the front of the val so it comes last in ordering
1125 edit_val = "z_" + edit_user[attr]
1128 m.dn = ldb.Dn(self.ldb, edit_user['dn'])
1129 m[attr] = ldb.MessageElement(edit_val, ldb.FLAG_MOD_REPLACE, attr)
1132 results, cookie = self.vlv_search(attr, expr, cookie=cookie,
1133 after_count=len(self.users))
1135 # Make expected_results by copying and editing full_results
1136 expected_results = full_results[:]
1137 expected_results[edit_index] = edit_val
1138 self.assertEqual(results, expected_results)
1140 # Test changing the search expression in a request on an initialised view
1141 # Expected failure on samba, passes on windows
1142 def test_vlv_change_search_expr(self):
1144 expr = "(objectclass=user)"
1147 full_results, cookie = self.vlv_search(attr, expr,
1148 after_count=len(self.users))
1150 middle_index = len(full_results)//2
1151 # Search that excludes the old value but includes the new one
1152 expr = "%s>=%s" % (attr, full_results[middle_index])
1153 results, cookie = self.vlv_search(attr, expr, cookie=cookie,
1154 after_count=len(self.users))
1155 self.assertEqual(results, full_results[middle_index:])
1157 # Check you can't add a value to a vlv view
1158 def test_vlv_add_during_view(self):
1160 expr = "(objectclass=user)"
1163 full_results, cookie = self.vlv_search(attr, expr,
1164 after_count=len(self.users))
1166 # Add a user at the end of the sort order
1167 add_val = "z_addedval"
1168 user = {'cn': add_val, "objectclass": "user", attr: add_val}
1169 user['dn'] = "cn=%s,%s" % (user['cn'], self.ou)
1172 results, cookie = self.vlv_search(attr, expr, cookie=cookie,
1173 after_count=len(self.users)+1)
1174 self.assertEqual(results, full_results)
1176 def test_vlv_delete_during_view(self):
1178 expr = "(objectclass=user)"
1181 full_results, cookie = self.vlv_search(attr, expr,
1182 after_count=len(self.users))
1184 # Delete one of the users
1185 del_index = len(self.users)//2
1186 del_user = self.users[del_index]
1187 self.ldb.delete(del_user['dn'])
1189 results, cookie = self.vlv_search(attr, expr, cookie=cookie,
1190 after_count=len(self.users))
1191 expected_results = [r for r in full_results if r != del_user[attr]]
1192 self.assertEqual(results, expected_results)
1195 class PagedResultsTests(TestsWithUserOU):
1197 def paged_search(self, expr, cookie="", page_size=0, extra_ctrls=None,
1198 attrs=None, ou=None, subtree=False):
1201 cookie = ":" + cookie
1202 ctrl = "paged_results:1:" + str(page_size) + cookie
1205 # If extra controls are provided then add them, else default to
1206 # sort control on 'cn' attribute
1207 if extra_ctrls is not None:
1208 controls += extra_ctrls
1210 sort_ctrl = "server_sort:1:0:cn"
1211 controls.append(sort_ctrl)
1214 if attrs is not None:
1215 kwargs = {"attrs": attrs}
1217 scope = ldb.SCOPE_ONELEVEL
1219 scope = ldb.SCOPE_SUBTREE
1221 res = self.ldb.search(ou,
1226 results = [str(r['cn'][0]) for r in res]
1228 ctrls = [str(c) for c in res.controls if
1229 str(c).startswith("paged_results")]
1230 assert len(ctrls) == 1, "no paged_results response"
1232 spl = ctrls[0].rsplit(':', 3)
1236 return results, cookie
1238 def test_paged_delete_during_search(self):
1239 expr = "(objectClass=*)"
1243 results, cookie = self.paged_search(expr, page_size=first_page_size)
1245 # Run normal search to get expected results
1246 unedited_results, _ = self.paged_search(expr,
1247 page_size=len(self.users))
1249 # Get remaining users not returned by the search above
1250 unreturned_users = [u for u in self.users if u['cn'] not in results]
1252 # Delete one of the users
1253 del_index = len(self.users)//2
1254 del_user = unreturned_users[del_index]
1255 self.ldb.delete(del_user['dn'])
1258 results, _ = self.paged_search(expr, cookie=cookie,
1259 page_size=len(self.users))
1260 expected_results = [r for r in unedited_results[first_page_size:]
1261 if r != del_user['cn']]
1262 self.assertEqual(results, expected_results)
1264 def test_paged_show_deleted(self):
1265 unique = time.strftime("%s", time.gmtime())[-5:]
1266 prefix = "show_deleted_test_%s_" % (unique)
1267 expr = "(&(objectClass=user)(cn=%s*))" % (prefix)
1268 del_ctrl = "show_deleted:1"
1272 for i in range(num_users):
1273 user = self.create_user(i, num_users, prefix=prefix)
1276 first_user = users[0]
1277 self.ldb.delete(first_user['dn'])
1281 results, cookie = self.paged_search(expr, page_size=first_page_size,
1282 extra_ctrls=[del_ctrl],
1286 # Get remaining users not returned by the search above
1287 unreturned_users = [u for u in users if u['cn'] not in results]
1289 # Delete one of the users
1290 del_index = len(users)//2
1291 del_user = unreturned_users[del_index]
1292 self.ldb.delete(del_user['dn'])
1294 results2, _ = self.paged_search(expr, cookie=cookie,
1295 page_size=len(users)*2,
1296 extra_ctrls=[del_ctrl],
1300 user_cns = {str(u['cn']) for u in users}
1301 deleted_cns = {first_user['cn'], del_user['cn']}
1303 all_results = results + results2
1304 normal_results = {r for r in all_results if "DEL:" not in r}
1305 self.assertEqual(normal_results, user_cns - deleted_cns)
1307 # Deleted results get "\nDEL:<GUID>" added to the CN, so cut it out.
1308 deleted_results = {r[:r.index('\n')] for r in all_results
1310 self.assertEqual(deleted_results, deleted_cns)
1312 def test_paged_add_during_search(self):
# Fetch a first page, prepare a new user entry named after a not-yet-returned
# CN plus "_2", then resume with the cookie and assert that the remaining
# results equal the CNs that were unwalked before the change (the new CN is
# not expected; see the commented-out insert below).
# NOTE(review): `first_page_size` is not assigned and no call that actually
# adds `user` is visible here. The embedded numbering has gaps, so both are
# presumably on elided lines -- confirm against the full file.
1313 expr = "(objectClass=*)"
1317 results, cookie = self.paged_search(expr, page_size=first_page_size)
# Independent full search, used as the reference ordering.
1319 unedited_results, _ = self.paged_search(expr,
1320 page_size=len(self.users)+1)
1322 # Get remaining users not returned by the search above
1323 unwalked_users = [cn for cn in unedited_results if cn not in results]
1325 # Add a user in the middle of the sort order
1326 middle_index = len(unwalked_users)//2
1327 middle_user = unwalked_users[middle_index]
# Entry for the new user, placed under self.ou.
1329 user = {'cn': middle_user + '_2', "objectclass": "user"}
1330 user['dn'] = "cn=%s,%s" % (user['cn'], self.ou)
# Continue the original search from its cookie.
1333 results, _ = self.paged_search(expr, cookie=cookie,
1334 page_size=len(self.users)+1)
1335 expected_results = unwalked_users[:]
1337 # Uncomment this line to assert that adding worked.
1338 # expected_results.insert(middle_index+1, user['cn'])
1340 self.assertEqual(results, expected_results)
1342 def test_paged_rename_during_search(self):
# Fetch a first page, rename a not-yet-returned user so its CN gains a "z_"
# prefix, then resume with the cookie and assert that the remaining results
# keep their earlier order with the renamed CN in the same position.
# NOTE(review): `first_page_size` is not assigned on any visible line; the
# embedded numbering has gaps, so it is presumably set on an elided line.
1343 expr = "(objectClass=*)"
1347 results, cookie = self.paged_search(expr, page_size=first_page_size)
# Independent full search, used as the reference ordering.
1349 unedited_results, _ = self.paged_search(expr,
1350 page_size=len(self.users)+1)
1352 # Modify user in the middle of the remaining sort order
1353 unwalked_users = [cn for cn in unedited_results if cn not in results]
1354 middle_index = len(unwalked_users)//2
1355 middle_cn = unwalked_users[middle_index]
# Map the CN back to its user record; exactly one match is required.
1358 users_with_middle_cn = [u for u in self.users if u['cn'] == middle_cn]
1359 self.assertEqual(len(users_with_middle_cn), 1)
1360 middle_user = users_with_middle_cn[0]
# The new DN is built by textual substitution of the CN inside the old DN
# (replace() substitutes every occurrence of middle_cn in the DN string).
1363 edit_cn = "z_" + middle_cn
1364 new_dn = middle_user['dn'].replace(middle_cn, edit_cn)
1365 self.ldb.rename(middle_user['dn'], new_dn)
# Continue the original search from its cookie.
1367 results, _ = self.paged_search(expr, cookie=cookie,
1368 page_size=len(self.users)+1)
# Same slot as before the rename, but carrying the new CN.
1369 expected_results = unwalked_users[:]
1370 expected_results[middle_index] = edit_cn
1371 self.assertEqual(results, expected_results)
1373 def test_paged_modify_object_scope(self):
# Move users into a second OU, one before a paged search starts and one
# between pages. The user moved beforehand must be missing from the results;
# the user moved mid-search is still expected (see the comment block below).
# NOTE(review): `first_page_size` is not assigned on any visible line; the
# embedded numbering has gaps, so it is presumably set on an elided line.
1374 expr = "(objectClass=*)"
# Destination OU for the moved users.
1376 ou2 = "OU=vlvtestou2,%s" % (self.tree_dn)
1377 self.ldb.add({"dn": ou2, "objectclass": "organizationalUnit"})
1379 # Do a separate, full search to get all results
1380 unedited_results, _ = self.paged_search(expr,
1381 page_size=len(self.users)+1)
1383 # Rename before starting a search
1384 first_cn = self.users[0]['cn']
1385 new_dn = "CN=%s,%s" % (first_cn, ou2)
1386 self.ldb.rename(self.users[0]['dn'], new_dn)
1388 # Start new search under the original OU
1390 results, cookie = self.paged_search(expr, page_size=first_page_size)
# The first page is compared with the reference list shifted by one entry.
# NOTE(review): this presumably relies on self.users[0] being the first
# entry of unedited_results -- confirm.
1391 self.assertEqual(results, unedited_results[1:1+first_page_size])
1393 # Get one of the users that is yet to be returned
1394 unwalked_users = [cn for cn in unedited_results if cn not in results]
1395 middle_index = len(unwalked_users)//2
1396 middle_cn = unwalked_users[middle_index]
# Map the CN back to its user record; exactly one match is required.
1399 users_with_middle_cn = [u for u in self.users if u['cn'] == middle_cn]
1400 self.assertEqual(len(users_with_middle_cn), 1)
1401 middle_user = users_with_middle_cn[0]
# Move that user into the second OU while the search is still open.
1404 new_dn = "CN=%s,%s" % (middle_cn, ou2)
1405 self.ldb.rename(middle_user['dn'], new_dn)
# Continue the original search from its cookie.
1407 results, _ = self.paged_search(expr, cookie=cookie,
1408 page_size=len(self.users)+1)
1410 expected_results = unwalked_users[:]
1412 # We should really expect that the object renamed into a different
1413 # OU should vanish from the results, but turns out Windows does return
1414 # the object in this case. Our module matches the Windows behaviour.
1416 # If behaviour changes, this line inverts the test's expectations to
1417 # what you might expect.
1418 # del expected_results[middle_index]
1420 # But still expect the user we removed before the search to be gone
# NOTE(review): index 0 of expected_results is presumably the CN of
# self.users[0], which was never in `results` and so stayed in
# unwalked_users -- confirm.
1421 del expected_results[0]
1423 self.assertEqual(results, expected_results)
1425 def test_paged_modify_one_during_search(self):
# Page through users matched on a facsimileTelephoneNumber prefix; after the
# first one-entry page, overwrite that attribute on one unwalked user so it
# no longer carries the prefix, and assert that the resumed search returns
# the other unwalked users but not the modified one.
# NOTE(review): `num_users` is not assigned on any visible line; the embedded
# numbering has gaps, so it is presumably set on an elided line.
1426 prefix = "change_during_search_"
1428 users = [self.create_user(i, num_users, prefix=prefix)
1429 for i in range(num_users)]
1430 expr = "(&(objectClass=user)(facsimileTelephoneNumber=%s*))" % (prefix)
1432 # Get the first page, then change the searched attribute and
1433 # try for the second page.
1434 results, cookie = self.paged_search(expr, page_size=1)
1435 self.assertEqual(len(results), 1)
1436 unwalked_users = [u for u in users if u['cn'] != results[0]]
1437 self.assertEqual(len(unwalked_users), num_users-1)
# Replace the searched attribute with a value lacking the prefix.
1439 mod_dn = unwalked_users[0]['dn']
1440 self.ldb.modify_ldif("dn: %s\n"
1441 "changetype: modify\n"
1442 "replace: facsimileTelephoneNumber\n"
1443 "facsimileTelephoneNumber: 123" % mod_dn)
# NOTE(review): the page size comes from self.users, not the local `users`
# list created above -- presumably just "large enough"; confirm.
1445 results, _ = self.paged_search(expr, cookie=cookie,
1446 page_size=len(self.users))
# Compared as sets, so the order of the remaining results is not checked.
1447 expected_cns = {u['cn'] for u in unwalked_users if u['dn'] != mod_dn}
1448 self.assertEqual(set(results), expected_cns)
1450 def test_paged_modify_all_during_search(self):
# Page through users matched on a facsimileTelephoneNumber prefix; after the
# first one-entry page, overwrite that attribute with a value lacking the
# prefix, then assert that the resumed search returns nothing.
# NOTE(review): `num_users` is not assigned, and `u` in the modify_ldif call
# is not bound on any visible line. The embedded numbering has gaps, so a
# loop header over unwalked_users is presumably elided -- confirm.
1451 prefix = "change_during_search_"
1453 users = [self.create_user(i, num_users, prefix=prefix)
1454 for i in range(num_users)]
1455 expr = "(&(objectClass=user)(facsimileTelephoneNumber=%s*))" % (prefix)
1457 # Get the first page, then change the searched attribute and
1458 # try for the second page.
1459 results, cookie = self.paged_search(expr, page_size=1)
1460 unwalked_users = [u for u in users if u['cn'] != results[0]]
# Replace the searched attribute so the entry stops matching `expr`.
1463 self.ldb.modify_ldif("dn: %s\n"
1464 "changetype: modify\n"
1465 "replace: facsimileTelephoneNumber\n"
1466 "facsimileTelephoneNumber: 123" % u['dn'])
# NOTE(review): the page size comes from self.users, not the local `users`
# list -- presumably just "large enough"; confirm.
1468 results, _ = self.paged_search(expr, cookie=cookie,
1469 page_size=len(self.users))
1470 self.assertEqual(results, [])
1472 def assertPagedSearchRaises(self, err_num, expr, cookie, attrs=None,
# Assertion helper: run paged_search with the given cookie and require that
# it raises ldb.LdbError whose first argument equals err_num; the test fails
# if the search finishes without raising.
# NOTE(review): the rest of the signature, the `try:` line and some
# paged_search arguments are not visible here (the embedded numbering has
# gaps) -- confirm the full parameter list against the complete file.
1475 results, _ = self.paged_search(expr, cookie=cookie,
1477 extra_ctrls=extra_ctrls,
1479 except ldb.LdbError as e:
# e.args[0] is compared with the expected numeric error code.
1480 self.assertEqual(e.args[0], err_num)
# Reached only if the search did not raise the expected error.
1483 self.fail("No error raised by invalid search")
1485 def test_paged_changed_expr(self):
# A cookie obtained for one search expression must be rejected when it is
# presented with a different expression.
1486 # Initiate search then use a different expr in subsequent req
1487 expr = "(objectClass=*)"
1488 results, cookie = self.paged_search(expr, page_size=3)
# NOTE(review): no reassignment of `expr` is visible before the check below,
# although the comment above promises a different expression. The embedded
# numbering skips 1489, so the changed `expr` is presumably on that elided
# line -- confirm against the full file.
# NOTE(review): 12 is presumably the LDAP result code
# unavailableCriticalExtension -- confirm against the ldb error constants.
1490 expected_error_num = 12
1491 self.assertPagedSearchRaises(expected_error_num, expr, cookie)
1493 def test_paged_changed_controls(self):
# A paged-search cookie must be rejected when the follow-up request carries
# a different set of extra controls (server_sort or show_deleted added or
# removed); a final case reorders the same controls, which the existing
# comment says should not break the search.
# NOTE(review): `ps` is not assigned on any visible line, several
# paged_search calls are not closed in view, and the `try:` plus the body of
# the final `except` are missing. The embedded numbering has gaps, so these
# are presumably on elided lines -- confirm against the full file.
1494 expr = "(objectClass=*)"
1495 sort_ctrl = "server_sort:1:0:cn"
1496 del_ctrl = "show_deleted:1"
# NOTE(review): 12 is presumably the LDAP result code
# unavailableCriticalExtension -- confirm against the ldb error constants.
1497 expected_error_num = 12
1500 # Initiate search with a sort control then remove in subsequent req
1501 results, cookie = self.paged_search(expr, page_size=ps,
1502 extra_ctrls=[sort_ctrl])
1503 self.assertPagedSearchRaises(expected_error_num, expr,
1504 cookie, extra_ctrls=[])
1506 # Initiate search with no sort control then add one in subsequent req
1507 results, cookie = self.paged_search(expr, page_size=ps,
1509 self.assertPagedSearchRaises(expected_error_num, expr,
1510 cookie, extra_ctrls=[sort_ctrl])
1512 # Initiate search with show-deleted control then
1513 # remove it in subsequent req
1514 results, cookie = self.paged_search(expr, page_size=ps,
1515 extra_ctrls=[del_ctrl])
1516 self.assertPagedSearchRaises(expected_error_num, expr,
1517 cookie, extra_ctrls=[])
1519 # Initiate normal search then add show-deleted control
1521 results, cookie = self.paged_search(expr, page_size=ps,
1523 self.assertPagedSearchRaises(expected_error_num, expr,
1524 cookie, extra_ctrls=[del_ctrl])
1526 # Changing order of controls shouldn't break the search
1527 results, cookie = self.paged_search(expr, page_size=ps,
1528 extra_ctrls=[del_ctrl, sort_ctrl])
# Follow-up request with the same two controls listed in the opposite order.
1530 results, cookie = self.paged_search(expr, page_size=ps,
1531 extra_ctrls=[sort_ctrl,
1533 except ldb.LdbError as e:
1536 def test_paged_cant_change_controls_data(self):
# A paged-search cookie must be rejected when the follow-up request keeps
# the same control type but changes its data: first the sort attribute
# (cn -> roomNumber), then the first numeric field of the control string
# (1 -> 0), which the existing comment calls the criticality flag.
# NOTE(review): the embedded numbering skips 1541 and 1548; they are
# presumably blank separator lines -- confirm against the full file.
1537 # Some defaults for the rest of the tests
1538 expr = "(objectClass=*)"
1539 sort_ctrl = "server_sort:1:0:cn"
# NOTE(review): 12 is presumably the LDAP result code
# unavailableCriticalExtension -- confirm against the ldb error constants.
1540 expected_error_num = 12
1542 # Initiate search with sort control then change it in subsequent req
1543 results, cookie = self.paged_search(expr, page_size=3,
1544 extra_ctrls=[sort_ctrl])
1545 changed_sort_ctrl = "server_sort:1:0:roomNumber"
1546 self.assertPagedSearchRaises(expected_error_num, expr,
1547 cookie, extra_ctrls=[changed_sort_ctrl])
1549 # Initiate search with a control with crit=1, then use crit=0
1550 results, cookie = self.paged_search(expr, page_size=3,
1551 extra_ctrls=[sort_ctrl])
1552 changed_sort_ctrl = "server_sort:0:0:cn"
1553 self.assertPagedSearchRaises(expected_error_num, expr,
1554 cookie, extra_ctrls=[changed_sort_ctrl])
1556 def test_paged_search_referrals(self):
# Walk a subtree search of self.base_dn using a paged_results control,
# collecting the referral list attached to each result set, and check that
# only the first set carries referrals and that they look like LDAP URLs.
# NOTE(review): the initialisation of `referral_lists`, the loop header, the
# extraction of `cookie` from `spl` and some search arguments are not
# visible here (the embedded numbering has gaps) -- confirm against the
# full file.
1557 expr = "(objectClass=*)"
1558 paged_ctrl = "paged_results:1:5"
1559 res = self.ldb.search(self.base_dn,
1562 scope=ldb.SCOPE_SUBTREE,
1563 controls=[paged_ctrl])
1565 # Do a paged search walk over the whole database and save a list
1566 # of all the referrals returned by each search.
# `referals` (single "r") is the attribute name as the code spells it.
1570 referral_lists.append(res.referals)
# Exactly one paged_results response control is required per reply.
1572 ctrls = [str(c) for c in res.controls if
1573 str(c).startswith("paged_results")]
1574 self.assertEqual(len(ctrls), 1)
# rsplit(':') without a maxsplit splits on every colon, like split(':').
1575 spl = ctrls[0].rsplit(':')
# Next page: the same control string with the cookie appended.
1580 res = self.ldb.search(self.base_dn,
1583 scope=ldb.SCOPE_SUBTREE,
1584 controls=[paged_ctrl + ":" + cookie])
1586 ref_list = referral_lists[0]
1588 # Sanity check to make sure the search actually did something
1589 self.assertGreater(len(referral_lists), 2)
1591 # Check the first referral set contains stuff
1592 self.assertGreater(len(ref_list), 0)
1594 # Check the others don't
1595 self.assertTrue(all([len(l) == 0 for l in referral_lists[1:]]))
1597 # Check the entries in the first referral list look like referrals
1598 self.assertTrue(all([s.startswith('ldap://') for s in ref_list]))
1600 def test_paged_change_attrs(self):
# Run two pages of a search with one attribute list, then present the cookie
# with an extended attribute list (roomNumber appended) and require that the
# request is rejected with the expected error.
# NOTE(review): `attrs` is not assigned on any visible line and two calls
# are not closed in view. The embedded numbering has gaps, so the missing
# pieces are presumably on elided lines -- confirm against the full file.
1601 expr = "(objectClass=*)"
# NOTE(review): 12 is presumably the LDAP result code
# unavailableCriticalExtension -- confirm against the ldb error constants.
1603 expected_error_num = 12
1605 results, cookie = self.paged_search(expr, page_size=3, attrs=attrs)
1606 results, cookie = self.paged_search(expr, cookie=cookie, page_size=3,
# A new list is built, so `attrs` itself is left unmodified.
1609 changed_attrs = attrs + ['roomNumber']
1610 self.assertPagedSearchRaises(expected_error_num, expr,
1611 cookie, attrs=changed_attrs,
1614 def test_paged_search_lockstep(self):
# Interleave two independent paged searches with overlapping result ranges
# and check that each cookie keeps tracking its own search: page 1 of each,
# then page 2 of each, then the remainder of each.
# NOTE(review): `ps` is not assigned on any visible line and two
# paged_search calls are not closed in view. The embedded numbering has
# gaps, so the missing pieces are presumably on elided lines -- confirm.
1615 expr = "(objectClass=*)"
# Reference ordering of every result, fetched in a single page.
1618 all_results, _ = self.paged_search(expr, page_size=len(self.users)+1)
1620 # Run two different but overlapping paged searches simultaneously.
# Search 1 covers everything from the one-third mark onward (cn >= pivot);
# search 2 covers everything up to and including the two-thirds mark
# (cn <= pivot).
1621 set_1_index = int((len(all_results))//3)
1622 set_2_index = int((2*len(all_results))//3)
1623 set_1 = all_results[set_1_index:]
1624 set_2 = all_results[:set_2_index+1]
1625 set_1_expr = "(cn>=%s)" % (all_results[set_1_index])
1626 set_2_expr = "(cn<=%s)" % (all_results[set_2_index])
# First page of each search.
1628 results, cookie1 = self.paged_search(set_1_expr, page_size=ps)
1629 self.assertEqual(results, set_1[:ps])
1630 results, cookie2 = self.paged_search(set_2_expr, page_size=ps)
1631 self.assertEqual(results, set_2[:ps])
# Second page of each search, alternating between the two cookies.
1633 results, cookie1 = self.paged_search(set_1_expr, cookie=cookie1,
1635 self.assertEqual(results, set_1[ps:ps*2])
1636 results, cookie2 = self.paged_search(set_2_expr, cookie=cookie2,
1638 self.assertEqual(results, set_2[ps:ps*2])
# Remainder of each search in one large page.
1640 results, _ = self.paged_search(set_1_expr, cookie=cookie1,
1641 page_size=len(self.users))
1642 self.assertEqual(results, set_1[ps*2:])
1643 results, _ = self.paged_search(set_2_expr, cookie=cookie2,
1644 page_size=len(self.users))
1645 self.assertEqual(results, set_2[ps*2:])
# Turn a bare `host` value into a URL: a value that already contains "://"
# is left untouched; a path to an existing file is prefixed with tdb://.
# NOTE(review): the assignment of `host` and the branch keyword before the
# ldap:// assignment are not visible (the embedded numbering skips 1651,
# presumably an `else:`) -- confirm against the full file.
1648 if "://" not in host:
1649 if os.path.isfile(host):
1650 host = "tdb://%s" % host
1652 host = "ldap://%s" % host
# Run this module's tests through samba.tests.subunitrun.TestProgram, passing
# the subunit options parsed from the command line.
1655 TestProgram(module=__name__, opts=subunitopts)