From dcd68bce1826641d4d881ad5c94c251a6ab175f0 Mon Sep 17 00:00:00 2001 From: Aaron Haslett Date: Fri, 21 Sep 2018 17:55:42 +1200 Subject: [PATCH] ldb: complex expression testing Tests that prepare complex ldap expressions and equivalent python expressions, then compare the results of the two. Signed-off-by: Aaron Haslett Reviewed-by: Gary Lockyer Reviewed-by: Andrew Bartlett Reviewed-by: Noel Power Autobuild-User(master): Gary Lockyer Autobuild-Date(master): Fri Dec 7 07:07:08 CET 2018 on sn-devel-144 --- python/samba/tests/complex_expressions.py | 473 ++++++++++++++++++++++ selftest/knownfail.d/complex_expressions | 4 + source4/selftest/tests.py | 6 + 3 files changed, 483 insertions(+) create mode 100644 python/samba/tests/complex_expressions.py create mode 100644 selftest/knownfail.d/complex_expressions diff --git a/python/samba/tests/complex_expressions.py b/python/samba/tests/complex_expressions.py new file mode 100644 index 00000000000..75ea61f12d5 --- /dev/null +++ b/python/samba/tests/complex_expressions.py @@ -0,0 +1,473 @@ +# -*- coding: utf-8 -*- + +# Copyright Andrew Bartlett 2018 +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# + +from __future__ import print_function +import optparse +import samba +import samba.getopt as options +import sys +import os +import time +from samba.auth import system_session +from samba.tests import TestCase +import ldb + +ERRCODE_ENTRY_EXISTS = 68 +ERRCODE_OPERATIONS_ERROR = 1 +ERRCODE_INVALID_VALUE = 21 +ERRCODE_CLASS_VIOLATION = 65 + +parser = optparse.OptionParser("{0} ".format(sys.argv[0])) +sambaopts = options.SambaOptions(parser) + +# use command line creds if available +credopts = options.CredentialsOptions(parser) +parser.add_option_group(credopts) +parser.add_option("-v", action="store_true", dest="verbose", + help="print successful expression outputs") +opts, args = parser.parse_args() + +if len(args) < 1: + parser.print_usage() + sys.exit(1) + +lp = sambaopts.get_loadparm() +creds = credopts.get_credentials(lp) + +# Set properly at end of file. +host = None + +global ou_count +ou_count = 0 + + +class ComplexExpressionTests(TestCase): + # Using setUpClass instead of setup because we're not modifying any + # records in the tests + @classmethod + def setUpClass(cls): + super(ComplexExpressionTests, cls).setUpClass() + cls.samdb = samba.samdb.SamDB(host, lp=lp, + session_info=system_session(), + credentials=creds) + + ou_name = "ComplexExprTest" + cls.base_dn = "OU={0},{1}".format(ou_name, cls.samdb.domain_dn()) + + try: + cls.samdb.delete(cls.base_dn, ["tree_delete:1"]) + except: + pass + + try: + cls.samdb.create_ou(cls.base_dn) + except ldb.LdbError as e: + if e.args[0] == ERRCODE_ENTRY_EXISTS: + print(('test ou {ou} already exists. Delete with ' + '"samba-tool group delete OU={ou} ' + '--force-subtree-delete"').format(ou=ou_name)) + raise e + + cls.name_template = "testuser{0}" + cls.default_n = 10 + + # These fields are carefully hand-picked from the schema. They have + # syntax and handling appropriate for our test structure. + cls.largeint_f = "accountExpires" + cls.str_f = "accountNameHistory" + cls.int_f = "flags" + cls.enum_f = "preferredDeliveryMethod" + cls.time_f = "msTSExpireDate" + cls.ranged_int_f = "countryCode" + + @classmethod + def tearDownClass(cls): + cls.samdb.delete(cls.base_dn, ["tree_delete:1"]) + + # Make test OU containing users with field=val for each val + def make_test_objects(self, field, vals): + global ou_count + ou_count += 1 + ou_dn = "OU=testou{0},{1}".format(ou_count, self.base_dn) + self.samdb.create_ou(ou_dn) + + ldap_objects = [{"dn": "CN=testuser{0},{1}".format(n, ou_dn), + "name": self.name_template.format(n), + "objectClass": "user", + field: n} + for n in vals] + + for ldap_object in ldap_objects: + # It's useful to keep appropriate python types in the ldap_object + # dict but smdb's 'add' function expects strings. + stringed_ldap_object = {k: str(v) + for (k, v) in ldap_object.items()} + try: + self.samdb.add(stringed_ldap_object) + except ldb.LdbError as e: + print("failed to add %s" % (stringed_ldap_object)) + raise e + + return ou_dn, ldap_objects + + # Run search expr and print out time. This function should be used for + # almost all searching. + def time_ldap_search(self, expr, dn): + time_taken = 0 + try: + start_time = time.time() + res = self.samdb.search(base=dn, + scope=ldb.SCOPE_SUBTREE, + expression=expr) + time_taken = time.time() - start_time + except Exception as e: + print("failed expr " + expr) + raise e + print("{0} took {1}s".format(expr, time_taken)) + return res, time_taken + + # Take an ldap expression and an equivalent python expression. + # Run and time the ldap expression and compare the result to the python + # expression run over the a list of ldap_object dicts. + def assertLDAPQuery(self, ldap_expr, ou_dn, py_expr, ldap_objects): + + # run (and time) the LDAP search expression over the DB + res, time_taken = self.time_ldap_search(ldap_expr, ou_dn) + results = {str(row.get('name')[0]) for row in res} + + # build the set of expected results by evaluating the python-equivalent + # of the search expression over the same set of objects + expected_results = set() + for ldap_object in ldap_objects: + try: + final_expr = py_expr.format(**ldap_object) + except KeyError: + # If the format on the py_expr hits a key error, then + # ldap_object doesn't have the field, so it shouldn't match. + continue + + if eval(final_expr): + expected_results.add(str(ldap_object['name'])) + + self.assertEqual(results, expected_results) + + if opts.verbose: + ldap_object_names = {l['name'] for l in ldap_objects} + excluded = ldap_object_names - results + excluded = "\n ".join(excluded) or "[NOTHING]" + returned = "\n ".join(expected_results) or "[NOTHING]" + + print("PASS: Expression {0} took {1}s and returned:" + "\n {2}\n" + "Excluded:\n {3}\n".format(ldap_expr, + time_taken, + returned, + excluded)) + + # Basic integer range test + def test_int_range(self, field=None): + n = self.default_n + field = field or self.int_f + ou_dn, ldap_objects = self.make_test_objects(field, range(n)) + + expr = "(&(%s>=%s)(%s<=%s))" % (field, n-1, field, n+1) + py_expr = "%d <= {%s} <= %d" % (n-1, field, n+1) + self.assertLDAPQuery(expr, ou_dn, py_expr, ldap_objects) + + # Same test again for largeint and enum + def test_largeint_range(self): + self.test_int_range(self.largeint_f) + + def test_enum_range(self): + self.test_int_range(self.enum_f) + + # Special range test for integer field with upper and lower bounds defined. + # The bounds are checked on insertion, not search, so we should be able + # to compare to a constant that is outside bounds. + def test_ranged_int_range(self): + field = self.ranged_int_f + ubound = 2**16 + width = 8 + + vals = list(range(ubound-width, ubound)) + ou_dn, ldap_objects = self.make_test_objects(field, vals) + + # Check <= value above overflow returns all vals + expr = "(%s<=%d)" % (field, ubound+5) + py_expr = "{%s} <= %d" % (field, ubound+5) + self.assertLDAPQuery(expr, ou_dn, py_expr, ldap_objects) + + # Test range also works for time fields + def test_time_range(self): + n = self.default_n + field = self.time_f + n = self.default_n + width = int(n/2) + + base_time = 20050116175514 + time_range = [base_time + t for t in range(-width, width)] + time_range = [str(t) + ".0Z" for t in time_range] + ou_dn, ldap_objects = self.make_test_objects(field, time_range) + + expr = "(%s<=%s)" % (field, str(base_time) + ".0Z") + py_expr = 'int("{%s}"[:-3]) <= %d' % (field, base_time) + self.assertLDAPQuery(expr, ou_dn, py_expr, ldap_objects) + + expr = "(&(%s>=%s)(%s<=%s))" % (field, str(base_time-1) + ".0Z", + field, str(base_time+1) + ".0Z") + py_expr = '%d <= int("{%s}"[:-3]) <= %d' % (base_time-1, + field, + base_time+1) + self.assertLDAPQuery(expr, ou_dn, py_expr, ldap_objects) + + # Run each comparison op on a simple test set. Time taken will be printed. + def test_int_single_cmp_op_speeds(self, field=None): + n = self.default_n + field = field or self.int_f + ou_dn, ldap_objects = self.make_test_objects(field, range(n)) + + comp_ops = ['=', '<=', '>='] + py_comp_ops = ['==', '<=', '>='] + exprs = ["(%s%s%d)" % (field, c, n) for c in comp_ops] + py_exprs = ["{%s}%s%d" % (field, c, n) for c in py_comp_ops] + + for expr, py_expr in zip(exprs, py_exprs): + self.assertLDAPQuery(expr, ou_dn, py_expr, ldap_objects) + + def test_largeint_single_cmp_op_speeds(self): + self.test_int_single_cmp_op_speeds(self.largeint_f) + + def test_enum_single_cmp_op_speeds(self): + self.test_int_single_cmp_op_speeds(self.enum_f) + + # Check strings are ordered using a naive ordering. + def test_str_ordering(self): + field = self.str_f + a_ord = ord('A') + n = 10 + str_range = ['abc{0}d'.format(chr(c)) for c in range(a_ord, a_ord+n)] + ou_dn, ldap_objects = self.make_test_objects(field, str_range) + half_n = int(a_ord + n/2) + + # Basic <= and >= statements + expr = "(%s>=abc%s)" % (field, chr(half_n)) + py_expr = "'{%s}' >= 'abc%s'" % (field, chr(half_n)) + self.assertLDAPQuery(expr, ou_dn, py_expr, ldap_objects) + + expr = "(%s<=abc%s)" % (field, chr(half_n)) + py_expr = "'{%s}' <= 'abc%s'" % (field, chr(half_n)) + self.assertLDAPQuery(expr, ou_dn, py_expr, ldap_objects) + + # String range + expr = "(&(%s>=abc%s)(%s<=abc%s))" % (field, chr(half_n-2), + field, chr(half_n+2)) + py_expr = "'abc%s' <= '{%s}' <= 'abc%s'" % (chr(half_n-2), + field, + chr(half_n+2)) + self.assertLDAPQuery(expr, ou_dn, py_expr, ldap_objects) + + # Integers treated as string + expr = "(%s>=1)" % (field) + py_expr = "'{%s}' >= '1'" % (field) + self.assertLDAPQuery(expr, ou_dn, py_expr, ldap_objects) + + # Windows returns nothing for invalid expressions. Expected fail on samba. + def test_invalid_expressions(self, field=None): + field = field or self.int_f + n = self.default_n + ou_dn, ldap_objects = self.make_test_objects(field, list(range(n))) + int_expressions = ["(%s>=abc)", + "(%s<=abc)", + "(%s=abc)"] + + for expr in int_expressions: + expr = expr % (field) + self.assertLDAPQuery(expr, ou_dn, "False", ldap_objects) + + def test_largeint_invalid_expressions(self): + self.test_invalid_expressions(self.largeint_f) + + def test_enum_invalid_expressions(self): + self.test_invalid_expressions(self.enum_f) + + def test_case_insensitive(self): + str_range = ["äbc"+str(n) for n in range(10)] + ou_dn, ldap_objects = self.make_test_objects(self.str_f, str_range) + + expr = "(%s=äbc1)" % (self.str_f) + pyexpr = '"{%s}"=="äbc1"' % (self.str_f) + self.assertLDAPQuery(expr, ou_dn, pyexpr, ldap_objects) + + expr = "(%s=ÄbC1)" % (self.str_f) + self.assertLDAPQuery(expr, ou_dn, pyexpr, ldap_objects) + + # Check negative numbers can be entered and compared + def test_negative_cmp(self, field=None): + field = field or self.int_f + width = 6 + around_zero = list(range(-width, width)) + ou_dn, ldap_objects = self.make_test_objects(field, around_zero) + + expr = "(%s>=-3)" % (field) + py_expr = "{%s} >= -3" % (field) + self.assertLDAPQuery(expr, ou_dn, py_expr, ldap_objects) + + def test_negative_cmp_largeint(self): + self.test_negative_cmp(self.largeint_f) + + def test_negative_cmp_enum(self): + self.test_negative_cmp(self.enum_f) + + # Check behaviour on insertion and comparison of zero-prefixed numbers. + # Samba errors on insertion, Windows strips the leading zeroes. + def test_zero_prefix(self, field=None): + field = field or self.int_f + + # Test comparison with 0-prefixed constants. + n = self.default_n + ou_dn, ldap_objects = self.make_test_objects(field, list(range(n))) + + expr = "(%s>=00%d)" % (field, n/2) + py_expr = "{%s} >= %d" % (field, n/2) + self.assertLDAPQuery(expr, ou_dn, py_expr, ldap_objects) + + # Delete the test OU so we don't mix it up with the next one. + self.samdb.delete(ou_dn, ["tree_delete:1"]) + + # Try inserting 0-prefixed numbers, check it fails. + zero_pref_nums = ['00'+str(num) for num in range(n)] + try: + ou_dn, ldap_objects = self.make_test_objects(field, zero_pref_nums) + except ldb.LdbError as e: + if e.args[0] != ERRCODE_INVALID_VALUE: + raise e + return + + # Samba doesn't get this far - the exception is raised. Windows allows + # the insertion and removes the leading 0s as tested below. + # Either behaviour is fine. + print("LDAP allowed insertion of 0-prefixed nums for field " + field) + + res = self.samdb.search(base=ou_dn, + scope=ldb.SCOPE_SUBTREE, + expression="(objectClass=user)") + returned_nums = [str(r.get(field)[0]) for r in res] + expect = [str(n) for n in range(n)] + self.assertEqual(set(returned_nums), set(expect)) + + expr = "(%s>=%d)" % (field, n/2) + py_expr = "{%s} >= %d" % (field, n/2) + for ldap_object in ldap_objects: + ldap_object[field] = int(ldap_object[field]) + + self.assertLDAPQuery(expr, ou_dn, py_expr, ldap_objects) + + def test_zero_prefix_largeint(self): + self.test_zero_prefix(self.largeint_f) + + def test_zero_prefix_enum(self): + self.test_zero_prefix(self.enum_f) + + # Check integer overflow is handled as best it can be. + def test_int_overflow(self, field=None, of=None): + field = field or self.int_f + of = of or 2**31-1 + width = 8 + + vals = list(range(of-width, of+width)) + ou_dn, ldap_objects = self.make_test_objects(field, vals) + + # Check ">=overflow" doesn't return vals past overflow + expr = "(%s>=%d)" % (field, of-3) + py_expr = "%d <= {%s} <= %d" % (of-3, field, of) + self.assertLDAPQuery(expr, ou_dn, py_expr, ldap_objects) + + # "<=overflow" returns everything + expr = "(%s<=%d)" % (field, of) + py_expr = "True" + self.assertLDAPQuery(expr, ou_dn, py_expr, ldap_objects) + + # Values past overflow should be negative + expr = "(&(%s<=%d)(%s>=0))" % (field, of, field) + py_expr = "{%s} <= %d" % (field, of) + self.assertLDAPQuery(expr, ou_dn, py_expr, ldap_objects) + expr = "(%s<=0)" % (field) + py_expr = "{%s} >= %d" % (field, of+1) + self.assertLDAPQuery(expr, ou_dn, py_expr, ldap_objects) + + # Get the values back out and check vals past overflow are negative. + res = self.samdb.search(base=ou_dn, + scope=ldb.SCOPE_SUBTREE, + expression="(objectClass=user)") + returned_nums = [str(r.get(field)[0]) for r in res] + + # Note: range(a,b) == [a..b-1] (confusing) + up_to_overflow = list(range(of-width, of+1)) + negatives = list(range(-of-1, -of+width-2)) + + expect = [str(n) for n in up_to_overflow + negatives] + self.assertEqual(set(returned_nums), set(expect)) + + def test_enum_overflow(self): + self.test_int_overflow(self.enum_f, 2**31-1) + + # Check cmp works on uSNChanged. We can't insert uSNChanged vals, they get + # added automatically so we'll just insert some objects and go with what + # we get. + def test_usnchanged(self): + field = "uSNChanged" + n = 10 + # Note we can't actually set uSNChanged via LDAP (LDB ignores it), + # so the input val range doesn't matter here + ou_dn, _ = self.make_test_objects(field, list(range(n))) + + # Get the assigned uSNChanged values + res = self.samdb.search(base=ou_dn, + scope=ldb.SCOPE_SUBTREE, + expression="(objectClass=user)") + + # Our vals got ignored so make ldap_objects from search result + ldap_objects = [{'name': str(r['name'][0]), + field: int(r[field][0])} + for r in res] + + # Get the median val and use as the number in the test search expr. + nums = [l[field] for l in ldap_objects] + nums = list(sorted(nums)) + search_num = nums[int(len(nums)/2)] + + expr = "(&(%s<=%d)(objectClass=user))" % (field, search_num) + py_expr = "{%s} <= %d" % (field, search_num) + self.assertLDAPQuery(expr, ou_dn, py_expr, ldap_objects) + +# If we're called independently then import subunit, get host from first +# arg and run. Otherwise, subunit ran us so just set host from env. +# We always try to run over LDAP rather than direct file, so that +# search timings are not impacted by opening and closing the tdb file. +if __name__ == "__main__": + from samba.tests.subunitrun import TestProgram + host = args[0] + + if "://" not in host: + if os.path.isfile(host): + host = "tdb://%s" % host + else: + host = "ldap://%s" % host + TestProgram(module=__name__) +else: + host = "ldap://" + os.getenv("SERVER") diff --git a/selftest/knownfail.d/complex_expressions b/selftest/knownfail.d/complex_expressions new file mode 100644 index 00000000000..8ec468ac79b --- /dev/null +++ b/selftest/knownfail.d/complex_expressions @@ -0,0 +1,4 @@ +# Ldb accepts invalid search expressions and returns weird results. +.*samba.tests.complex_expressions.ComplexExpressionTests.test_largeint_invalid_expressions.* +.*samba.tests.complex_expressions.ComplexExpressionTests.test_enum_invalid_expressions.* +.*samba.tests.complex_expressions.ComplexExpressionTests.test_invalid_expressions.* diff --git a/source4/selftest/tests.py b/source4/selftest/tests.py index 575c14b8be2..c676b878b83 100755 --- a/source4/selftest/tests.py +++ b/source4/selftest/tests.py @@ -627,6 +627,12 @@ def planoldpythontestsuite(env, module, name=None, extra_path=[], environ={}, ex args[args.index(subunitrun)] = subunitrun3 plantestsuite_loadlist(name + ".python3", env, args) +# Run complex search expressions test once for each database backend. +# Right now ad_dc has mdb and ad_dc_ntvfs has tdb +mdb_testenv = "ad_dc" +tdb_testenv = "ad_dc_ntvfs" +for testenv in [mdb_testenv, tdb_testenv]: + planoldpythontestsuite(testenv, "samba.tests.complex_expressions", extra_args=['-U"$USERNAME%$PASSWORD"'], py3_compatible=True) planoldpythontestsuite("ad_dc_ntvfs:local", "samba.tests.gensec", extra_args=['-U"$USERNAME%$PASSWORD"'], py3_compatible=True) planoldpythontestsuite("none", "simple", extra_path=["%s/lib/tdb/python/tests" % srcdir()], name="tdb.python", py3_compatible=True) -- 2.34.1