2 # -*- coding: utf-8 -*-
3 # Originally based on ./sam.py
11 sys.path.insert(0, "bin/python")
13 from samba.tests.subunitrun import SubunitOptions, TestProgram
15 import samba.getopt as options
17 from samba.auth import system_session
19 from samba.samdb import SamDB
23 parser = optparse.OptionParser("vlv.py [options] <host>")
24 sambaopts = options.SambaOptions(parser)
25 parser.add_option_group(sambaopts)
26 parser.add_option_group(options.VersionOptions(parser))
27 # use command line creds if available
28 credopts = options.CredentialsOptions(parser)
29 parser.add_option_group(credopts)
30 subunitopts = SubunitOptions(parser)
31 parser.add_option_group(subunitopts)
33 parser.add_option('--elements', type='int', default=20,
34 help="use this many elements in the tests")
36 parser.add_option('--delete-in-setup', action='store_true',
37 help="cleanup in next setup rather than teardown")
39 parser.add_option('--skip-attr-regex',
40 help="ignore attributes matching this regex")
42 opts, args = parser.parse_args()
50 lp = sambaopts.get_loadparm()
51 creds = credopts.get_credentials(lp)
53 N_ELEMENTS = opts.elements
56 class VlvTestException(Exception):
60 def encode_vlv_control(critical=1,
66 s = "vlv:%d:%d:%d:" % (critical, before, after)
68 if offset is not None:
69 m = "%d:%d" % (offset, n)
70 elif ':' in gte or '\x00' in gte:
71 gte = base64.b64encode(gte)
72 m = "base64>=%s" % gte
79 return s + m + ':' + cookie
82 def get_cookie(controls, expected_n=None):
83 """Get the cookie, STILL base64 encoded, or raise ValueError."""
84 for c in list(controls):
86 if cstr.startswith('vlv_resp'):
87 head, n, _, cookie = cstr.rsplit(':', 3)
88 if expected_n is not None and int(n) != expected_n:
89 raise ValueError("Expected %s items, server said %s" %
92 raise ValueError("there is no VLV response")
95 class VLVTests(samba.tests.TestCase):
97 def create_user(self, i, n, prefix='vlvtest', suffix='', attrs=None):
98 name = "%s%d%s" % (prefix, i, suffix)
101 "objectclass": "user",
102 'givenName': "abcdefghijklmnopqrstuvwxyz"[i % 26],
103 "roomNumber": "%sbc" % (n - i),
105 "employeeNumber": "%s%sx" % (abs(i * (99 - i)), '\n' * (i & 255)),
106 "accountExpires": "%s" % (10 ** 9 + 1000000 * i),
107 "msTSExpireDate4": "19%02d0101010000.0Z" % (i % 100),
108 "flags": str(i * (n - i)),
109 "serialNumber": "abc %s%s%s" % ('AaBb |-/'[i & 7],
114 # _user_broken_attrs tests are broken due to problems outside
116 _user_broken_attrs = {
117 # Sort doesn't look past a NUL byte.
118 "photo": "\x00%d" % (n - i),
119 "audio": "%sn octet string %s%s ♫♬\x00lalala" % ('Aa'[i & 1],
121 "displayNamePrintable": "%d\x00%c" % (i, i & 255),
122 "adminDisplayName": "%d\x00b" % (n-i),
123 "title": "%d%sb" % (n - i, '\x00' * i),
124 "comment": "Favourite colour is %d" % (n % (i + 1)),
126 # Names that vary only in case. Windows returns
127 # equivalent addresses in the order they were put
128 # in ('a st', 'A st',...).
129 "street": "%s st" % (chr(65 | (i & 14) | ((i & 1) * 32))),
132 if attrs is not None:
135 user['dn'] = "cn=%s,%s" % (user['cn'], self.ou)
137 if opts.skip_attr_regex:
138 match = re.compile(opts.skip_attr_regex).search
139 for k in user.keys():
143 self.users.append(user)
148 super(VLVTests, self).setUp()
149 self.ldb = SamDB(host, credentials=creds,
150 session_info=system_session(lp), lp=lp)
152 self.base_dn = self.ldb.domain_dn()
153 self.ou = "ou=vlv,%s" % self.base_dn
154 if opts.delete_in_setup:
156 self.ldb.delete(self.ou, ['tree_delete:1'])
157 except ldb.LdbError, e:
158 print "tried deleting %s, got error %s" % (self.ou, e)
161 "objectclass": "organizationalUnit"})
164 for i in range(N_ELEMENTS):
165 self.create_user(i, N_ELEMENTS)
167 attrs = self.users[0].keys()
168 self.binary_sorted_keys = ['audio',
172 "displayNamePrintable"]
174 self.numeric_sorted_keys = ['flags',
177 self.timestamp_keys = ['msTSExpireDate4']
179 self.int64_keys = set(['accountExpires'])
181 self.locale_sorted_keys = [x for x in attrs if
182 x not in (self.binary_sorted_keys +
183 self.numeric_sorted_keys)]
185 # don't try spaces, etc in cn
186 self.delicate_keys = ['cn']
189 super(VLVTests, self).tearDown()
190 if not opts.delete_in_setup:
191 self.ldb.delete(self.ou, ['tree_delete:1'])
193 def get_full_list(self, attr, include_cn=False):
194 """Fetch the whole list sorted on the attribute, using the VLV.
195 This way you get a VLV cookie."""
196 n_users = len(self.users)
197 sort_control = "server_sort:1:0:%s" % attr
198 half_n = n_users // 2
199 vlv_search = "vlv:1:%d:%d:%d:0" % (half_n, half_n, half_n + 1)
203 res = self.ldb.search(self.ou,
204 scope=ldb.SCOPE_ONELEVEL,
206 controls=[sort_control,
209 full_results = [(x[attr][0], x['cn'][0]) for x in res]
211 full_results = [x[attr][0].lower() for x in res]
212 controls = res.controls
213 return full_results, controls, sort_control
215 def get_expected_order(self, attr, expression=None):
216 """Fetch the whole list sorted on the attribute, using sort only."""
217 sort_control = "server_sort:1:0:%s" % attr
218 res = self.ldb.search(self.ou,
219 scope=ldb.SCOPE_ONELEVEL,
220 expression=expression,
222 controls=[sort_control])
223 results = [x[attr][0] for x in res]
226 def delete_user(self, user):
227 self.ldb.delete(user['dn'])
228 del self.users[self.users.index(user)]
230 def get_gte_tests_and_order(self, attr, expression=None):
231 expected_order = self.get_expected_order(attr, expression=expression)
233 if attr in self.delicate_keys:
241 elif attr in self.timestamp_keys:
252 elif attr not in self.numeric_sorted_keys:
266 gte_keys.append(expected_order[len(expected_order) // 2] + ' tail')
269 # "numeric" means positive integers
270 # doesn't work with -1, 3.14, ' 3', '9' * 20
277 if attr in self.int64_keys:
278 gte_keys += ['3' * 12, '71' * 8]
280 for i, x in enumerate(gte_keys):
281 user = self.create_user(i, N_ELEMENTS,
284 gte_users.append(user)
286 gte_order = self.get_expected_order(attr)
287 for user in gte_users:
288 self.delete_user(user)
291 expected_order_2 = self.get_expected_order(attr, expression=expression)
292 self.assertEqual(expected_order, expected_order_2)
294 # Map gte tests to indexes in expected order. This will break
295 # if gte_order and expected_order are differently ordered (as
299 # index to the first one with each value
301 for i, k in enumerate(expected_order):
302 if k not in index_map:
317 gte_map[k] = len(expected_order)
322 print " %10s => %10s" % (k, gte_map[k])
324 return gte_order, expected_order, gte_map
326 def assertCorrectResults(self, results, expected_order,
327 offset, before, after):
328 """A helper to calculate offsets correctly and say as much as possible
329 when something goes wrong."""
331 start = max(offset - before - 1, 0)
333 expected_results = expected_order[start: end]
335 # if it is a tuple with the cn, drop the cn
336 if expected_results and isinstance(expected_results[0], tuple):
337 expected_results = [x[0] for x in expected_results]
339 if expected_results == results:
342 if expected_order is not None:
343 print "expected order: %s" % expected_order[:20]
344 if len(expected_order) > 20:
345 print "... and %d more not shown" % (len(expected_order) - 20)
347 print "offset %d before %d after %d" % (offset, before, after)
348 print "start %d end %d" % (start, end)
349 print "expected: %s" % expected_results
350 print "got : %s" % results
351 self.assertEquals(expected_results, results)
353 def test_server_vlv_with_cookie(self):
354 attrs = [x for x in self.users[0].keys() if x not in
355 ('dn', 'objectclass')]
357 expected_order = self.get_expected_order(attr)
358 sort_control = "server_sort:1:0:%s" % attr
361 for before in [10, 0, 3, 1, 4, 5, 2]:
362 for after in [0, 3, 1, 4, 5, 2, 7]:
363 for offset in range(max(1, before - 2),
364 min(n - after + 2, n)):
366 vlv_search = "vlv:1:%d:%d:%d:0" % (before, after,
369 cookie = get_cookie(res.controls, n)
370 vlv_search = ("vlv:1:%d:%d:%d:%s:%s" %
371 (before, after, offset, n,
374 res = self.ldb.search(self.ou,
375 scope=ldb.SCOPE_ONELEVEL,
377 controls=[sort_control,
380 results = [x[attr][0] for x in res]
382 self.assertCorrectResults(results, expected_order,
383 offset, before, after)
385 def run_index_tests_with_expressions(self, expressions):
386 # Here we don't test every before/after combination.
387 attrs = [x for x in self.users[0].keys() if x not in
388 ('dn', 'objectclass')]
390 for expression in expressions:
391 expected_order = self.get_expected_order(attr, expression)
392 sort_control = "server_sort:1:0:%s" % attr
394 n = len(expected_order)
395 for before in range(0, 11):
397 for offset in range(max(1, before - 2),
398 min(n - after + 2, n)):
400 vlv_search = "vlv:1:%d:%d:%d:0" % (before, after,
403 cookie = get_cookie(res.controls)
404 vlv_search = ("vlv:1:%d:%d:%d:%s:%s" %
405 (before, after, offset, n,
408 res = self.ldb.search(self.ou,
409 expression=expression,
410 scope=ldb.SCOPE_ONELEVEL,
412 controls=[sort_control,
415 results = [x[attr][0] for x in res]
417 self.assertCorrectResults(results, expected_order,
418 offset, before, after)
420 def test_server_vlv_with_expression(self):
421 """What happens when we run the VLV with an expression?"""
422 expressions = ["(objectClass=*)",
423 "(cn=%s)" % self.users[-1]['cn'],
424 "(roomNumber=%s)" % self.users[0]['roomNumber'],
426 self.run_index_tests_with_expressions(expressions)
428 def test_server_vlv_with_failing_expression(self):
429 """What happens when we run the VLV on an expression that matches
431 expressions = ["(samaccountname=testferf)",
434 self.run_index_tests_with_expressions(expressions)
436 def run_gte_tests_with_expressions(self, expressions):
437 # Here we don't test every before/after combination.
438 attrs = [x for x in self.users[0].keys() if x not in
439 ('dn', 'objectclass')]
440 for expression in expressions:
442 gte_order, expected_order, gte_map = \
443 self.get_gte_tests_and_order(attr, expression)
444 # In case there is some order dependency, disorder tests
445 gte_tests = gte_order[:]
447 random.shuffle(gte_tests)
449 sort_control = "server_sort:1:0:%s" % attr
451 expected_order = self.get_expected_order(attr, expression)
452 sort_control = "server_sort:1:0:%s" % attr
454 for before in range(0, 11):
456 for gte in gte_tests:
458 cookie = get_cookie(res.controls)
461 vlv_search = encode_vlv_control(before=before,
466 res = self.ldb.search(self.ou,
467 scope=ldb.SCOPE_ONELEVEL,
468 expression=expression,
470 controls=[sort_control,
473 results = [x[attr][0] for x in res]
474 offset = gte_map.get(gte, len(expected_order))
476 # here offset is 0-based
477 start = max(offset - before, 0)
478 end = offset + 1 + after
480 expected_results = expected_order[start: end]
482 self.assertEquals(expected_results, results)
484 def test_vlv_gte_with_expression(self):
485 """What happens when we run the VLV with an expression?"""
486 expressions = ["(objectClass=*)",
487 "(cn=%s)" % self.users[-1]['cn'],
488 "(roomNumber=%s)" % self.users[0]['roomNumber'],
490 self.run_gte_tests_with_expressions(expressions)
492 def test_vlv_gte_with_failing_expression(self):
493 """What happens when we run the VLV on an expression that matches
495 expressions = ["(samaccountname=testferf)",
498 self.run_gte_tests_with_expressions(expressions)
500 def test_server_vlv_with_cookie_while_adding_and_deleting(self):
501 """What happens if we add or remove items in the middle of the VLV?
503 Nothing. The search and the sort is not repeated, and we only
504 deal with the objects originally found.
506 attrs = ['cn'] + [x for x in self.users[0].keys() if x not in
507 ('dn', 'objectclass')]
511 full_results, controls, sort_control = \
512 self.get_full_list(attr, True)
513 original_n = len(self.users)
515 expected_order = full_results
518 for before in range(0, 3) + [6, 11, 19]:
519 for after in range(0, 3) + [6, 11, 19]:
520 start = max(before - 1, 1)
521 end = max(start + 4, original_n - after + 2)
522 for offset in range(start, end):
523 #if iteration > 2076:
525 cookie = get_cookie(controls, original_n)
526 vlv_search = encode_vlv_control(before=before,
533 res = self.ldb.search(self.ou,
534 scope=ldb.SCOPE_ONELEVEL,
536 controls=[sort_control,
539 controls = res.controls
540 results = [x[attr][0] for x in res]
541 real_offset = max(1, min(offset, len(expected_order)))
543 expected_results = []
545 begin_offset = max(real_offset - before - 1, 0)
546 real_before = min(before, real_offset - 1)
547 real_after = min(after,
548 len(expected_order) - real_offset)
550 for x in expected_order[begin_offset:]:
552 expected_results.append(x[0])
553 if (len(expected_results) ==
554 real_before + real_after + 1):
559 if expected_results != results:
560 print ("attr %s before %d after %d offset %d" %
561 (attr, before, after, offset))
562 self.assertEquals(expected_results, results)
565 if random.random() < 0.1 + (n < 5) * 0.05:
569 i = random.randrange(n)
570 user = self.create_user(i, n, suffix='-%s' %
573 if random.random() < 0.1 + (n > 50) * 0.02 and n:
574 index = random.randrange(n)
575 user = self.users.pop(index)
577 self.ldb.delete(user['dn'])
579 replaced = (user[attr], user['cn'])
580 if replaced in expected_order:
581 i = expected_order.index(replaced)
582 expected_order[i] = None
584 def test_server_vlv_with_cookie_while_changing(self):
585 """What happens if we modify items in the middle of the VLV?
587 The expected behaviour (as found on Windows) is the sort is
588 not repeated, but the changes in attributes are reflected.
590 attrs = [x for x in self.users[0].keys() if x not in
591 ('dn', 'objectclass', 'cn')]
593 n_users = len(self.users)
594 expected_order = [x.upper() for x in self.get_expected_order(attr)]
595 sort_control = "server_sort:1:0:%s" % attr
599 # First we'll fetch the whole list so we know the original
600 # sort order. This is necessary because we don't know how
601 # the server will order equivalent items. We are using the
603 half_n = n_users // 2
604 vlv_search = "vlv:1:%d:%d:%d:0" % (half_n, half_n, half_n + 1)
605 res = self.ldb.search(self.ou,
606 scope=ldb.SCOPE_ONELEVEL,
608 controls=[sort_control, vlv_search])
610 results = [x[attr][0].upper() for x in res]
611 #self.assertEquals(expected_order, results)
613 dn_order = [str(x['dn']) for x in res]
616 for before in range(0, 3):
617 for after in range(0, 3):
618 for offset in range(1 + before, n_users - after):
619 cookie = get_cookie(res.controls, len(self.users))
620 vlv_search = ("vlv:1:%d:%d:%d:%s:%s" %
621 (before, after, offset, len(self.users),
624 res = self.ldb.search(self.ou,
625 scope=ldb.SCOPE_ONELEVEL,
627 controls=[sort_control,
630 dn_results = [str(x['dn']) for x in res]
631 dn_expected = dn_order[offset - before - 1:
634 self.assertEquals(dn_expected, dn_results)
636 results = [x[attr][0].upper() for x in res]
638 self.assertCorrectResults(results, values,
639 offset, before, after)
643 if (attr in self.locale_sorted_keys or
644 attr in self.binary_sorted_keys):
646 i2 = (i ^ 255) % n_users
651 if v2 in self.locale_sorted_keys:
653 cn1 = dn1.split(',', 1)[0][3:]
654 cn2 = dn2.split(',', 1)[0][3:]
659 m.dn = ldb.Dn(self.ldb, dn1)
660 m[attr] = ldb.MessageElement(v2,
661 ldb.FLAG_MOD_REPLACE,
666 def test_server_vlv_fractions_with_cookie(self):
667 """What happens when the count is set to an arbitrary number?
669 In that case the offset and the count form a fraction, and the
670 VLV should be centred at a point offset/count of the way
671 through. For example, if offset is 3 and count is 6, the VLV
672 should be looking around halfway. The actual algorithm is a
673 bit fiddlier than that, because of the one-basedness of VLV.
675 attrs = [x for x in self.users[0].keys() if x not in
676 ('dn', 'objectclass')]
678 n_users = len(self.users)
683 full_results, controls, sort_control = self.get_full_list(attr)
684 self.assertEqual(len(full_results), n_users)
685 for before in range(0, 2):
686 for after in range(0, 2):
687 for denominator in range(1, 20):
688 for offset in range(1, denominator + 3):
689 cookie = get_cookie(controls, len(self.users))
690 vlv_search = ("vlv:1:%d:%d:%d:%s:%s" %
691 (before, after, offset,
695 res = self.ldb.search(self.ou,
696 scope=ldb.SCOPE_ONELEVEL,
698 controls=[sort_control,
700 except ldb.LdbError, e:
703 print ("offset %d denominator %d raised error "
704 "expected error %s\n"
705 "(offset zero is illegal unless "
706 "content count is zero)" %
707 (offset, denominator, e))
710 results = [x[attr][0].lower() for x in res]
713 denominator = n_users
716 elif denominator == 1:
717 # the offset can only be 1, but the 1/1 case
718 # means something special
720 real_offset = n_users
724 if offset > denominator:
727 int(round((n_users - 1) *
729 (denominator - 1.0)))
732 self.assertCorrectResults(results, full_results,
736 controls = res.controls
738 for c in list(controls):
740 if cstr.startswith('vlv_resp'):
741 bits = cstr.rsplit(':')
742 print ("the answer is %s; we said %d" %
743 (bits[2], real_offset))
746 def test_server_vlv_no_cookie(self):
747 attrs = [x for x in self.users[0].keys() if x not in
748 ('dn', 'objectclass')]
751 expected_order = self.get_expected_order(attr)
752 sort_control = "server_sort:1:0:%s" % attr
753 for before in range(0, 5):
754 for after in range(0, 7):
755 for offset in range(1 + before, len(self.users) - after):
756 res = self.ldb.search(self.ou,
757 scope=ldb.SCOPE_ONELEVEL,
759 controls=[sort_control,
763 results = [x[attr][0] for x in res]
764 self.assertCorrectResults(results, expected_order,
765 offset, before, after)
767 def get_expected_order_showing_deleted(self, attr,
768 expression="(|(cn=vlvtest*)(cn=vlv-deleted*))",
770 scope=ldb.SCOPE_SUBTREE
772 """Fetch the whole list sorted on the attribute, using sort only,
773 searching in the entire tree, not just our OU. This is the
774 way to find deleted objects.
778 sort_control = "server_sort:1:0:%s" % attr
779 controls = [sort_control, "show_deleted:1"]
781 res = self.ldb.search(base,
783 expression=expression,
786 results = [x[attr][0] for x in res]
789 def add_deleted_users(self, n):
790 deleted_users = [self.create_user(i, n, prefix='vlv-deleted')
793 for user in deleted_users:
794 self.delete_user(user)
796 def test_server_vlv_no_cookie_show_deleted(self):
797 """What do we see with the show_deleted control?"""
798 attrs = ['objectGUID',
807 # add some deleted users first, just in case there are none
808 self.add_deleted_users(6)
810 expression = "(|(cn=vlvtest*)(cn=vlv-deleted*))"
813 show_deleted_control = "show_deleted:1"
814 expected_order = self.get_expected_order_showing_deleted(attr,
816 n = len(expected_order)
817 sort_control = "server_sort:1:0:%s" % attr
818 for before in [3, 1, 0]:
820 # don't test every position, because there could be hundreds.
821 # jump back and forth instead
823 offset = random.randrange(max(1, before - 2),
824 min(n - after + 2, n))
825 res = self.ldb.search(self.base_dn,
826 expression=expression,
827 scope=ldb.SCOPE_SUBTREE,
829 controls=[sort_control,
830 show_deleted_control,
836 results = [x[attr][0] for x in res]
837 self.assertCorrectResults(results, expected_order,
838 offset, before, after)
840 def test_server_vlv_no_cookie_show_deleted_only(self):
841 """What do we see with the show_deleted control when we're not looking
842 at any non-deleted things"""
843 attrs = ['objectGUID',
850 # add some deleted users first, just in case there are none
851 self.add_deleted_users(4)
852 base = 'CN=Deleted Objects,%s' % self.base_dn
853 expression = "(cn=vlv-deleted*)"
855 show_deleted_control = "show_deleted:1"
856 expected_order = self.get_expected_order_showing_deleted(attr,
857 expression=expression,
859 scope=ldb.SCOPE_ONELEVEL)
860 print ("searching for attr %s amongst %d deleted objects" %
861 (attr, len(expected_order)))
862 sort_control = "server_sort:1:0:%s" % attr
863 step = max(len(expected_order) // 10, 1)
864 for before in [3, 0]:
866 for offset in range(1 + before,
867 len(expected_order) - after,
869 res = self.ldb.search(base,
870 expression=expression,
871 scope=ldb.SCOPE_ONELEVEL,
873 controls=[sort_control,
874 show_deleted_control,
878 results = [x[attr][0] for x in res]
879 self.assertCorrectResults(results, expected_order,
880 offset, before, after)
884 def test_server_vlv_with_cookie_show_deleted(self):
885 """What do we see with the show_deleted control?"""
886 attrs = ['objectGUID',
894 self.add_deleted_users(6)
897 expected_order = self.get_expected_order(attr)
898 sort_control = "server_sort:1:0:%s" % attr
900 show_deleted_control = "show_deleted:1"
901 expected_order = self.get_expected_order_showing_deleted(attr)
902 n = len(expected_order)
903 expression = "(|(cn=vlvtest*)(cn=vlv-deleted*))"
904 for before in [3, 2, 1, 0]:
907 offset = random.randrange(max(1, before - 2),
908 min(n - after + 2, n))
910 vlv_search = "vlv:1:%d:%d:%d:0" % (before, after,
913 cookie = get_cookie(res.controls, n)
914 vlv_search = ("vlv:1:%d:%d:%d:%s:%s" %
915 (before, after, offset, n,
918 res = self.ldb.search(self.base_dn,
919 expression=expression,
920 scope=ldb.SCOPE_SUBTREE,
922 controls=[sort_control,
924 show_deleted_control])
926 results = [x[attr][0] for x in res]
928 self.assertCorrectResults(results, expected_order,
929 offset, before, after)
932 def test_server_vlv_gte_with_cookie(self):
933 attrs = [x for x in self.users[0].keys() if x not in
934 ('dn', 'objectclass')]
936 gte_order, expected_order, gte_map = \
937 self.get_gte_tests_and_order(attr)
938 # In case there is some order dependency, disorder tests
939 gte_tests = gte_order[:]
941 random.shuffle(gte_tests)
943 sort_control = "server_sort:1:0:%s" % attr
944 for before in [0, 1, 2, 4]:
945 for after in [0, 1, 3, 6]:
946 for gte in gte_tests:
948 cookie = get_cookie(res.controls, len(self.users))
951 vlv_search = encode_vlv_control(before=before,
956 res = self.ldb.search(self.ou,
957 scope=ldb.SCOPE_ONELEVEL,
959 controls=[sort_control,
962 results = [x[attr][0] for x in res]
963 offset = gte_map.get(gte, len(expected_order))
965 # here offset is 0-based
966 start = max(offset - before, 0)
967 end = offset + 1 + after
969 expected_results = expected_order[start: end]
971 self.assertEquals(expected_results, results)
973 def test_server_vlv_gte_no_cookie(self):
974 attrs = [x for x in self.users[0].keys() if x not in
975 ('dn', 'objectclass')]
978 gte_order, expected_order, gte_map = \
979 self.get_gte_tests_and_order(attr)
980 # In case there is some order dependency, disorder tests
981 gte_tests = gte_order[:]
983 random.shuffle(gte_tests)
985 sort_control = "server_sort:1:0:%s" % attr
986 for before in [0, 1, 3]:
988 for gte in gte_tests:
989 vlv_search = encode_vlv_control(before=before,
993 res = self.ldb.search(self.ou,
994 scope=ldb.SCOPE_ONELEVEL,
996 controls=[sort_control,
998 results = [x[attr][0] for x in res]
1000 # here offset is 0-based
1001 offset = gte_map.get(gte, len(expected_order))
1002 start = max(offset - before, 0)
1003 end = offset + after + 1
1004 expected_results = expected_order[start: end]
1006 if expected_results != results:
1007 middle = expected_order[len(expected_order) // 2]
1008 print expected_results, results
1010 print expected_order
1012 print ("\nattr %s offset %d before %d "
1014 (attr, offset, before, after, gte))
1015 self.assertEquals(expected_results, results)
1017 def test_multiple_searches(self):
1018 """The maximum number of concurrent vlv searches per connection is
1019 currently set at 3. That means if you open 4 VLV searches the
1020 cookie on the first one should fail.
1022 # Windows has a limit of 10 VLVs where there are low numbers
1023 # of objects in each search.
1024 attrs = ([x for x in self.users[0].keys() if x not in
1025 ('dn', 'objectclass')] * 2)[:12]
1029 sort_control = "server_sort:1:0:%s" % attr
1031 res = self.ldb.search(self.ou,
1032 scope=ldb.SCOPE_ONELEVEL,
1034 controls=[sort_control,
1037 cookie = get_cookie(res.controls, len(self.users))
1038 vlv_cookies.append(cookie)
1041 # now this one should fail
1042 self.assertRaises(ldb.LdbError,
1045 scope=ldb.SCOPE_ONELEVEL,
1047 controls=[sort_control,
1048 "vlv:1:1:1:1:0:%s" % vlv_cookies[0]])
1050 # and this one should succeed
1051 res = self.ldb.search(self.ou,
1052 scope=ldb.SCOPE_ONELEVEL,
1054 controls=[sort_control,
1055 "vlv:1:1:1:1:0:%s" % vlv_cookies[-1]])
1057 # this one should fail because it is a new connection and
1058 # doesn't share cookies
1059 new_ldb = SamDB(host, credentials=creds,
1060 session_info=system_session(lp), lp=lp)
1062 self.assertRaises(ldb.LdbError,
1063 new_ldb.search, self.ou,
1064 scope=ldb.SCOPE_ONELEVEL,
1066 controls=[sort_control,
1067 "vlv:1:1:1:1:0:%s" % vlv_cookies[-1]])
1069 # but now without the critical flag it just does no VLV.
1070 new_ldb.search(self.ou,
1071 scope=ldb.SCOPE_ONELEVEL,
1073 controls=[sort_control,
1074 "vlv:0:1:1:1:0:%s" % vlv_cookies[-1]])
1077 if "://" not in host:
1078 if os.path.isfile(host):
1079 host = "tdb://%s" % host
1081 host = "ldap://%s" % host
1084 TestProgram(module=__name__, opts=subunitopts)