CVE-2023-4154 dsdb/tests: Extend attribute read DirSync tests
authorAndrew Bartlett <abartlet@samba.org>
Tue, 22 Aug 2023 03:08:17 +0000 (15:08 +1200)
committerJule Anger <janger@samba.org>
Sun, 8 Oct 2023 20:06:12 +0000 (22:06 +0200)
The aim here is to document the expected (even if not implemented)
SEARCH_FLAG_RODC_ATTRIBUTE vs SEARCH_FLAG_CONFIDENTIAL, behaviour, so
that any change once CVE-2023-4154 is fixed can be noted.

BUG: https://bugzilla.samba.org/show_bug.cgi?id=15424

Signed-off-by: Andrew Bartlett <abartlet@samba.org>
selftest/knownfail.d/dirsync
source4/dsdb/tests/python/dirsync.py

index 9367f92e109c503a13f46aa426c8cd893de040e9..db098549a08dfd9c22c7ab72bba2cf5bc42f457c 100644 (file)
@@ -1 +1,14 @@
-^samba4.ldap.dirsync.python\(.*\).__main__.SimpleDirsyncTests.test_dirsync_unicodePwd
\ No newline at end of file
+^samba4.ldap.dirsync.python\(.*\).__main__.ConfidentialDirsyncTests.test_dirsync_OBJECT_SECURITY_insist_on_empty_element\(.*\)
+^samba4.ldap.dirsync.python\(.*\).__main__.ConfidentialDirsyncTests.test_dirsync_unicodePwd_OBJ_SEC_insist_on_empty_element\(.*\)
+^samba4.ldap.dirsync.python\(.*\).__main__.ConfidentialDirsyncTests.test_dirsync_unicodePwd_with_GET_CHANGES\(.*\)
+^samba4.ldap.dirsync.python\(.*\).__main__.ConfidentialDirsyncTests.test_dirsync_unicodePwd_with_GET_CHANGES_OBJ_SEC_insist_on_empty_element\(.*\)
+^samba4.ldap.dirsync.python\(.*\).__main__.ConfidentialDirsyncTests.test_dirsync_unicodePwd_with_GET_CHANGES_insist_on_empty_element\(.*\)
+^samba4.ldap.dirsync.python\(.*\).__main__.ConfidentialDirsyncTests.test_dirsync_with_GET_CHANGES_OBJECT_SECURITY_insist_on_empty_element\(.*\)
+^samba4.ldap.dirsync.python\(.*\).__main__.ConfidentialFilteredDirsyncTests.test_dirsync_OBJECT_SECURITY_insist_on_empty_element\(.*\)
+^samba4.ldap.dirsync.python\(.*\).__main__.ConfidentialFilteredDirsyncTests.test_dirsync_OBJECT_SECURITY_with_GET_CHANGES_insist_on_empty_element\(.*\)
+^samba4.ldap.dirsync.python\(.*\).__main__.ConfidentialFilteredDirsyncTests.test_dirsync_with_GET_CHANGES\(.*\)
+^samba4.ldap.dirsync.python\(.*\).__main__.ConfidentialFilteredDirsyncTests.test_dirsync_with_GET_CHANGES_attr\(.*\)
+^samba4.ldap.dirsync.python\(.*\).__main__.ConfidentialFilteredDirsyncTests.test_dirsync_with_GET_CHANGES_insist_on_empty_element\(.*\)
+^samba4.ldap.dirsync.python\(.*\).__main__.FilteredDirsyncTests.test_dirsync_with_GET_CHANGES\(.*\)
+^samba4.ldap.dirsync.python\(.*\).__main__.FilteredDirsyncTests.test_dirsync_with_GET_CHANGES_attr\(.*\)
+^samba4.ldap.dirsync.python\(.*\).__main__.FilteredDirsyncTests.test_dirsync_with_GET_CHANGES_insist_on_empty_element\(.*\)
index 2cacaf01251a7de7a9fa2611f05a0c842e35f780..a0691f0afe062dc271c2f4697931f5400de50f2e 100755 (executable)
@@ -3,6 +3,7 @@
 # Unit tests for dirsync control
 # Copyright (C) Matthieu Patou <mat@matws.net> 2011
 # Copyright (C) Jelmer Vernooij <jelmer@samba.org> 2014
+# Copyright (C) Catalyst.Net Ltd
 #
 # This program is free software; you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -30,7 +31,8 @@ import base64
 import ldb
 from ldb import LdbError, SCOPE_BASE
 from ldb import Message, MessageElement, Dn
-from ldb import FLAG_MOD_ADD, FLAG_MOD_DELETE
+from ldb import FLAG_MOD_ADD, FLAG_MOD_DELETE, FLAG_MOD_REPLACE
+from samba.dsdb import SEARCH_FLAG_CONFIDENTIAL, SEARCH_FLAG_RODC_ATTRIBUTE
 from samba.dcerpc import security, misc, drsblobs
 from samba.ndr import ndr_unpack, ndr_pack
 
@@ -60,7 +62,6 @@ if len(args) < 1:
 host = args.pop()
 if "://" not in host:
     ldaphost = "ldap://%s" % host
-    ldapshost = "ldaps://%s" % host
 else:
     ldaphost = host
     start = host.rindex("://")
@@ -77,8 +78,8 @@ creds = credopts.get_credentials(lp)
 class DirsyncBaseTests(samba.tests.TestCase):
 
     def setUp(self):
-        super(DirsyncBaseTests, self).setUp()
-        self.ldb_admin = SamDB(ldapshost, credentials=creds, session_info=system_session(lp), lp=lp)
+        super().setUp()
+        self.ldb_admin = SamDB(ldaphost, credentials=creds, session_info=system_session(lp), lp=lp)
         self.base_dn = self.ldb_admin.domain_dn()
         self.domain_sid = security.dom_sid(self.ldb_admin.get_domain_sid())
         self.user_pass = samba.generate_random_password(12, 16)
@@ -87,63 +88,60 @@ class DirsyncBaseTests(samba.tests.TestCase):
         # used for anonymous login
         print("baseDN: %s" % self.base_dn)
 
-    def get_user_dn(self, name):
-        return "CN=%s,CN=Users,%s" % (name, self.base_dn)
+        userou = "OU=dirsync-test"
+        self.ou = f"{userou},{self.base_dn}"
+        samba.tests.delete_force(self.ldb_admin, self.ou, controls=['tree_delete:1'])
+        self.ldb_admin.create_ou(self.ou)
+        self.addCleanup(samba.tests.delete_force, self.ldb_admin, self.ou, controls=['tree_delete:1'])
 
-    def get_ldb_connection(self, target_username, target_password):
-        creds_tmp = Credentials()
-        creds_tmp.set_username(target_username)
-        creds_tmp.set_password(target_password)
-        creds_tmp.set_domain(creds.get_domain())
-        creds_tmp.set_realm(creds.get_realm())
-        creds_tmp.set_workstation(creds.get_workstation())
-        creds_tmp.set_gensec_features(creds_tmp.get_gensec_features()
-                                      | gensec.FEATURE_SEAL)
-        creds_tmp.set_kerberos_state(DONT_USE_KERBEROS)  # kinit is too expensive to use in a tight loop
-        ldb_target = SamDB(url=ldaphost, credentials=creds_tmp, lp=lp)
-        return ldb_target
-
-
-# tests on ldap add operations
-class SimpleDirsyncTests(DirsyncBaseTests):
-
-    def setUp(self):
-        super(SimpleDirsyncTests, self).setUp()
         # Regular user
         self.dirsync_user = "test_dirsync_user"
         self.simple_user = "test_simple_user"
         self.admin_user = "test_admin_user"
-        self.ouname = None
+        self.dirsync_pass = self.user_pass
+        self.simple_pass = self.user_pass
+        self.admin_pass = self.user_pass
 
-        self.ldb_admin.newuser(self.dirsync_user, self.user_pass)
-        self.ldb_admin.newuser(self.simple_user, self.user_pass)
-        self.ldb_admin.newuser(self.admin_user, self.user_pass)
+        self.ldb_admin.newuser(self.dirsync_user, self.dirsync_pass, userou=userou)
+        self.ldb_admin.newuser(self.simple_user, self.simple_pass, userou=userou)
+        self.ldb_admin.newuser(self.admin_user, self.admin_pass, userou=userou)
         self.desc_sddl = self.sd_utils.get_sd_as_sddl(self.base_dn)
 
         user_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.dirsync_user))
         mod = "(OA;;CR;%s;;%s)" % (security.GUID_DRS_GET_CHANGES,
                                    str(user_sid))
         self.sd_utils.dacl_add_ace(self.base_dn, mod)
+        self.addCleanup(self.sd_utils.dacl_delete_aces, self.base_dn, mod)
 
         # add admins to the Domain Admins group
         self.ldb_admin.add_remove_group_members("Domain Admins", [self.admin_user],
                                                 add_members_operation=True)
 
-    def tearDown(self):
-        super(SimpleDirsyncTests, self).tearDown()
-        delete_force(self.ldb_admin, self.get_user_dn(self.dirsync_user))
-        delete_force(self.ldb_admin, self.get_user_dn(self.simple_user))
-        delete_force(self.ldb_admin, self.get_user_dn(self.admin_user))
-        if self.ouname:
-            delete_force(self.ldb_admin, self.ouname)
-        self.sd_utils.modify_sd_on_dn(self.base_dn, self.desc_sddl)
+    def get_user_dn(self, name):
+        return ldb.Dn(self.ldb_admin, "CN={0},{1}".format(name, self.ou))
+
+    def get_ldb_connection(self, target_username, target_password):
+        creds_tmp = Credentials()
+        creds_tmp.set_username(target_username)
+        creds_tmp.set_password(target_password)
+        creds_tmp.set_domain(creds.get_domain())
+        creds_tmp.set_realm(creds.get_realm())
+        creds_tmp.set_workstation(creds.get_workstation())
+        creds_tmp.set_gensec_features(creds_tmp.get_gensec_features()
+                                      | gensec.FEATURE_SEAL)
+        creds_tmp.set_kerberos_state(DONT_USE_KERBEROS)  # kinit is too expensive to use in a tight loop
+        ldb_target = SamDB(url=ldaphost, credentials=creds_tmp, lp=lp)
+        return ldb_target
+
+# tests on ldap add operations
+class SimpleDirsyncTests(DirsyncBaseTests):
 
     # def test_dirsync_errors(self):
 
     def test_dirsync_supported(self):
         """Test the basic of the dirsync is supported"""
         self.ldb_dirsync = self.get_ldb_connection(self.dirsync_user, self.user_pass)
-        self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass)
+        self.ldb_simple = self.get_ldb_connection(self.simple_user, self.simple_pass)
         res = self.ldb_admin.search(self.base_dn, expression="samaccountname=*", controls=["dirsync:1:0:1"])
         res = self.ldb_dirsync.search(self.base_dn, expression="samaccountname=*", controls=["dirsync:1:0:1"])
         try:
@@ -169,8 +167,8 @@ class SimpleDirsyncTests(DirsyncBaseTests):
 
     def test_dirsync_errors(self):
         """Test if dirsync returns the correct LDAP errors in case of pb"""
-        self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass)
-        self.ldb_dirsync = self.get_ldb_connection(self.dirsync_user, self.user_pass)
+        self.ldb_simple = self.get_ldb_connection(self.simple_user, self.simple_pass)
+        self.ldb_dirsync = self.get_ldb_connection(self.dirsync_user, self.dirsync_pass)
         try:
             self.ldb_simple.search(self.base_dn,
                                    expression="samaccountname=*",
@@ -292,11 +290,11 @@ class SimpleDirsyncTests(DirsyncBaseTests):
                                     attrs=["parentGUID"],
                                     controls=["dirsync:1:0:1"])
         self.assertEqual(len(res.msgs), 0)
-        ouname = "OU=testou,%s" % self.base_dn
+        ouname = "OU=testou,%s" % self.ou
         self.ouname = ouname
         self.ldb_admin.create_ou(ouname)
         delta = Message()
-        delta.dn = Dn(self.ldb_admin, str(ouname))
+        delta.dn = Dn(self.ldb_admin, ouname)
         delta["cn"] = MessageElement("test ou",
                                      FLAG_MOD_ADD,
                                      "cn")
@@ -457,7 +455,7 @@ class SimpleDirsyncTests(DirsyncBaseTests):
     def test_dirsync_linkedattributes_OBJECT_SECURITY(self):
         """Check that dirsync returned deleted objects too"""
         # Let's search for members
-        self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass)
+        self.ldb_simple = self.get_ldb_connection(self.simple_user, self.simple_pass)
         res = self.ldb_simple.search(self.base_dn,
                                      expression="(name=Administrators)",
                                      controls=["dirsync:1:1:1"])
@@ -582,7 +580,7 @@ class SimpleDirsyncTests(DirsyncBaseTests):
                                     controls=controls)
 
     def test_dirsync_linkedattributes_range(self):
-        self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass)
+        self.ldb_simple = self.get_ldb_connection(self.simple_user, self.simple_pass)
         res = self.ldb_admin.search(self.base_dn,
                                     attrs=["member;range=1-1"],
                                     expression="(name=Administrators)",
@@ -594,7 +592,7 @@ class SimpleDirsyncTests(DirsyncBaseTests):
         self.assertTrue(len(res[0].get("member")) > 0)
 
     def test_dirsync_linkedattributes_range_user(self):
-        self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass)
+        self.ldb_simple = self.get_ldb_connection(self.simple_user, self.simple_pass)
         try:
             res = self.ldb_simple.search(self.base_dn,
                                          attrs=["member;range=1-1"],
@@ -608,7 +606,7 @@ class SimpleDirsyncTests(DirsyncBaseTests):
 
     def test_dirsync_linkedattributes(self):
         flag_incr_linked = 2147483648
-        self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass)
+        self.ldb_simple = self.get_ldb_connection(self.simple_user, self.simple_pass)
         res = self.ldb_admin.search(self.base_dn,
                                     attrs=["member"],
                                     expression="(name=Administrators)",
@@ -676,7 +674,7 @@ class SimpleDirsyncTests(DirsyncBaseTests):
     def test_dirsync_extended_dn(self):
         """Check that dirsync works together with the extended_dn control"""
         # Let's search for members
-        self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass)
+        self.ldb_simple = self.get_ldb_connection(self.simple_user, self.simple_pass)
         res = self.ldb_simple.search(self.base_dn,
                                      expression="(name=Administrators)",
                                      controls=["dirsync:1:1:1"])
@@ -707,7 +705,7 @@ class SimpleDirsyncTests(DirsyncBaseTests):
     def test_dirsync_deleted_items_OBJECT_SECURITY(self):
         """Check that dirsync returned deleted objects too"""
         # Let's create an OU
-        self.ldb_simple = self.get_ldb_connection(self.simple_user, self.user_pass)
+        self.ldb_simple = self.get_ldb_connection(self.simple_user, self.simple_pass)
         ouname = "OU=testou3,%s" % self.base_dn
         self.ouname = ouname
         self.ldb_admin.create_ou(ouname)
@@ -742,18 +740,364 @@ class SimpleDirsyncTests(DirsyncBaseTests):
         self.assertEqual(guid2, guid)
         self.assertEqual(str(res[0].dn), "")
 
-    def test_dirsync_unicodePwd(self):
+class SpecialDirsyncTests(DirsyncBaseTests):
+
+    def setUp(self):
+        super().setUp()
+
+        self.schema_dn = self.ldb_admin.get_schema_basedn()
+
+        # the tests work by setting the 'Confidential' or 'RODC Filtered' bit in the searchFlags
+        # for an existing schema attribute. This only works against Windows if
+        # the systemFlags does not have FLAG_SCHEMA_BASE_OBJECT set for the
+        # schema attribute being modified. There are only a few attributes that
+        # meet this criteria (most of which only apply to 'user' objects)
+        self.conf_attr = "homePostalAddress"
+        attr_cn = "CN=Address-Home"
+        # schemaIdGuid for homePostalAddress (used for ACE tests)
+        self.attr_dn = f"{attr_cn},{self.schema_dn}"
+
+        userou = "OU=conf-attr-test"
+        self.ou = "{0},{1}".format(userou, self.base_dn)
+        samba.tests.delete_force(self.ldb_admin, self.ou, controls=['tree_delete:1'])
+        self.ldb_admin.create_ou(self.ou)
+        self.addCleanup(samba.tests.delete_force, self.ldb_admin, self.ou, controls=['tree_delete:1'])
+
+        # add a test object with this attribute set
+        self.conf_value = "abcdef"
+        self.conf_user = "conf-user"
+        self.ldb_admin.newuser(self.conf_user, self.user_pass, userou=userou)
+        self.conf_dn = self.get_user_dn(self.conf_user)
+        self.add_attr(self.conf_dn, self.conf_attr, self.conf_value)
+
+        # sanity-check the flag is not already set (this'll cause problems if
+        # previous test run didn't clean up properly)
+
+        search_flags = int(self.get_attr_search_flags(self.attr_dn))
+        if search_flags & SEARCH_FLAG_CONFIDENTIAL|SEARCH_FLAG_RODC_ATTRIBUTE:
+            self.set_attr_search_flags(self.attr_dn, str(search_flags &~ (SEARCH_FLAG_CONFIDENTIAL|SEARCH_FLAG_RODC_ATTRIBUTE)))
+        search_flags = int(self.get_attr_search_flags(self.attr_dn))
+        self.assertEqual(0, search_flags & (SEARCH_FLAG_CONFIDENTIAL|SEARCH_FLAG_RODC_ATTRIBUTE),
+                         f"{self.conf_attr} searchFlags did not reset to omit SEARCH_FLAG_CONFIDENTIAL and SEARCH_FLAG_RODC_ATTRIBUTE ({search_flags})")
+
+        # work out the original 'searchFlags' value before we overwrite it
+        old_value = self.get_attr_search_flags(self.attr_dn)
+
+        self.set_attr_search_flags(self.attr_dn, str(self.flag_under_test))
+
+        # reset the value after the test completes
+        self.addCleanup(self.set_attr_search_flags, self.attr_dn, old_value)
+
+    def add_attr(self, dn, attr, value):
+        m = Message()
+        m.dn = dn
+        m[attr] = MessageElement(value, FLAG_MOD_ADD, attr)
+        self.ldb_admin.modify(m)
+
+    def set_attr_search_flags(self, attr_dn, flags):
+        """Modifies the searchFlags for an object in the schema"""
+        m = Message()
+        m.dn = Dn(self.ldb_admin, attr_dn)
+        m['searchFlags'] = MessageElement(flags, FLAG_MOD_REPLACE,
+                                          'searchFlags')
+        self.ldb_admin.modify(m)
+
+        # note we have to update the schema for this change to take effect (on
+        # Windows, at least)
+        self.ldb_admin.set_schema_update_now()
+
+    def get_attr_search_flags(self, attr_dn):
+        res = self.ldb_admin.search(attr_dn, scope=SCOPE_BASE,
+                                    attrs=['searchFlags'])
+        return res[0]['searchFlags'][0]
+
+    def find_under_current_ou(self, res):
+        for msg in res:
+            if msg.dn == self.conf_dn:
+                return msg
+        self.fail(f"Failed to find object {self.conf_dn} in {len(res)} results")
+
+
+class ConfidentialDirsyncTests(SpecialDirsyncTests):
+
+    def setUp(self):
+        self.flag_under_test = SEARCH_FLAG_CONFIDENTIAL
+        super().setUp()
+
+    def test_unicodePwd_normal(self):
         res = self.ldb_admin.search(self.base_dn,
                                     attrs=["unicodePwd", "supplementalCredentials", "samAccountName"],
-                                    expression="(samAccountName=krbtgt)",
-                                    controls=["dirsync:1:0:0"])
+                                    expression=f"(samAccountName={self.conf_user})")
+
+        msg = res[0]
+
+        self.assertTrue("samAccountName" in msg)
+        # This form ensures this is a case insensitive comparison
+        self.assertTrue(msg.get("samAccountName"))
+        self.assertTrue(msg.get("unicodePwd") is None)
+        self.assertTrue(msg.get("supplementalCredentials") is None)
+
+    def _test_dirsync_unicodePwd(self, ldb_conn, control=None, insist_on_empty_element=False):
+        res = ldb_conn.search(self.base_dn,
+                         attrs=["unicodePwd", "supplementalCredentials", "samAccountName"],
+                         expression=f"(samAccountName={self.conf_user})",
+                         controls=[control])
+
+        msg = self.find_under_current_ou(res)
 
-        self.assertTrue(len(res) == 1)
+        self.assertTrue("samAccountName" in msg)
         # This form ensures this is a case insensitive comparison
-        self.assertTrue("samAccountName" in res[0])
-        self.assertTrue(res[0].get("samAccountName"))
-        self.assertTrue(res[0].get("unicodePwd") is None)
-        self.assertTrue(res[0].get("supplementalCredentials") is None)
+        self.assertTrue(msg.get("samAccountName"))
+        if insist_on_empty_element:
+            self.assertTrue(msg.get("unicodePwd") is not None)
+            self.assertEqual(len(msg.get("unicodePwd")), 0)
+            self.assertTrue(msg.get("supplementalCredentials") is not None)
+            self.assertEqual(len(msg.get("supplementalCredentials")), 0)
+        else:
+            self.assertTrue(msg.get("unicodePwd") is None
+                            or len(msg.get("unicodePwd")) == 0)
+            self.assertTrue(msg.get("supplementalCredentials") is None
+                            or len(msg.get("supplementalCredentials")) == 0)
+
+    def test_dirsync_unicodePwd_OBJ_SEC(self):
+        ldb_conn = self.get_ldb_connection(self.simple_user, self.simple_pass)
+        self._test_dirsync_unicodePwd(ldb_conn, control="dirsync:1:1:0")
+
+    def test_dirsync_unicodePwd_OBJ_SEC_insist_on_empty_element(self):
+        ldb_conn = self.get_ldb_connection(self.simple_user, self.simple_pass)
+        self._test_dirsync_unicodePwd(ldb_conn, control="dirsync:1:1:0", insist_on_empty_element=True)
+
+    def test_dirsync_unicodePwd_with_GET_CHANGES_OBJ_SEC(self):
+        ldb_conn = self.get_ldb_connection(self.dirsync_user, self.dirsync_pass)
+        self._test_dirsync_unicodePwd(ldb_conn, control="dirsync:1:1:0")
+
+    def test_dirsync_unicodePwd_with_GET_CHANGES_OBJ_SEC_insist_on_empty_element(self):
+        ldb_conn = self.get_ldb_connection(self.dirsync_user, self.dirsync_pass)
+        self._test_dirsync_unicodePwd(ldb_conn, control="dirsync:1:1:0", insist_on_empty_element=True)
+
+    def test_dirsync_unicodePwd_with_GET_CHANGES(self):
+        ldb_conn = self.get_ldb_connection(self.dirsync_user, self.dirsync_pass)
+        self._test_dirsync_unicodePwd(ldb_conn, control="dirsync:1:0:0")
+
+    def test_dirsync_unicodePwd_with_GET_CHANGES_insist_on_empty_element(self):
+        ldb_conn = self.get_ldb_connection(self.dirsync_user, self.dirsync_pass)
+        self._test_dirsync_unicodePwd(ldb_conn, control="dirsync:1:0:0", insist_on_empty_element=True)
+
+    def test_normal(self):
+        ldb_conn = self.get_ldb_connection(self.simple_user, self.simple_pass)
+        res = ldb_conn.search(self.base_dn,
+                         attrs=[self.conf_attr, "samAccountName"],
+                         expression=f"(samAccountName={self.conf_user})")
+
+        msg = res[0]
+        self.assertTrue("samAccountName" in msg)
+        # This form ensures this is a case insensitive comparison
+        self.assertTrue(msg.get("samAccountName"))
+        self.assertTrue(msg.get(self.conf_attr) is None)
+
+    def _test_dirsync_OBJECT_SECURITY(self, ldb_conn, insist_on_empty_element=False):
+        res = ldb_conn.search(self.base_dn,
+                              attrs=[self.conf_attr, "samAccountName"],
+                              expression=f"(samAccountName={self.conf_user})",
+                              controls=["dirsync:1:1:0"])
+
+        msg = self.find_under_current_ou(res)
+        self.assertTrue("samAccountName" in msg)
+        # This form ensures this is a case insensitive comparison
+        self.assertTrue(msg.get("samAccountName"))
+        if insist_on_empty_element:
+            self.assertTrue(msg.get(self.conf_attr) is not None)
+            self.assertEqual(len(msg.get(self.conf_attr)), 0)
+        else:
+            self.assertTrue(msg.get(self.conf_attr) is None
+                            or len(msg.get(self.conf_attr)) == 0)
+
+    def test_dirsync_OBJECT_SECURITY(self):
+        ldb_conn = self.get_ldb_connection(self.simple_user, self.simple_pass)
+        self._test_dirsync_OBJECT_SECURITY(ldb_conn)
+
+    def test_dirsync_OBJECT_SECURITY_insist_on_empty_element(self):
+        ldb_conn = self.get_ldb_connection(self.simple_user, self.simple_pass)
+        self._test_dirsync_OBJECT_SECURITY(ldb_conn, insist_on_empty_element=True)
+
+    def test_dirsync_with_GET_CHANGES(self):
+        ldb_conn = self.get_ldb_connection(self.dirsync_user, self.dirsync_pass)
+        res = ldb_conn.search(self.base_dn,
+                         attrs=[self.conf_attr, "samAccountName"],
+                         expression=f"(samAccountName={self.conf_user})",
+                         controls=["dirsync:1:0:0"])
+
+        msg = self.find_under_current_ou(res)
+        # This form ensures this is a case insensitive comparison
+        self.assertTrue(msg.get("samAccountName"))
+        self.assertTrue(msg.get(self.conf_attr))
+        self.assertEqual(len(msg.get(self.conf_attr)), 1)
+
+    def test_dirsync_with_GET_CHANGES_OBJECT_SECURITY(self):
+        ldb_conn = self.get_ldb_connection(self.dirsync_user, self.dirsync_pass)
+        self._test_dirsync_OBJECT_SECURITY(ldb_conn)
+
+    def test_dirsync_with_GET_CHANGES_OBJECT_SECURITY_insist_on_empty_element(self):
+        ldb_conn = self.get_ldb_connection(self.dirsync_user, self.dirsync_pass)
+        self._test_dirsync_OBJECT_SECURITY(ldb_conn, insist_on_empty_element=True)
+
+class FilteredDirsyncTests(SpecialDirsyncTests):
+
+    def setUp(self):
+        self.flag_under_test = SEARCH_FLAG_RODC_ATTRIBUTE
+        super().setUp()
+
+    def test_attr(self):
+        ldb_conn = self.get_ldb_connection(self.simple_user, self.simple_pass)
+        res = ldb_conn.search(self.base_dn,
+                         attrs=[self.conf_attr, "samAccountName"],
+                         expression=f"(samAccountName={self.conf_user})")
+
+        msg = res[0]
+        self.assertTrue("samAccountName" in msg)
+        # This form ensures this is a case insensitive comparison
+        self.assertTrue(msg.get("samAccountName"))
+        self.assertTrue(msg.get(self.conf_attr))
+        self.assertEqual(len(msg.get(self.conf_attr)), 1)
+
+    def _test_dirsync_OBJECT_SECURITY(self, ldb_conn):
+        res = ldb_conn.search(self.base_dn,
+                         attrs=[self.conf_attr, "samAccountName"],
+                         expression=f"(samAccountName={self.conf_user})",
+                         controls=["dirsync:1:1:0"])
+
+        msg = self.find_under_current_ou(res)
+        self.assertTrue("samAccountName" in msg)
+        # This form ensures this is a case insensitive comparison
+        self.assertTrue(msg.get("samAccountName"))
+        self.assertTrue(msg.get(self.conf_attr))
+        self.assertEqual(len(msg.get(self.conf_attr)), 1)
+
+    def test_dirsync_OBJECT_SECURITY(self):
+        ldb_conn = self.get_ldb_connection(self.simple_user, self.simple_pass)
+        self._test_dirsync_OBJECT_SECURITY(ldb_conn)
+
+    def test_dirsync_OBJECT_SECURITY_with_GET_CHANGES(self):
+        ldb_conn = self.get_ldb_connection(self.dirsync_user, self.dirsync_pass)
+        self._test_dirsync_OBJECT_SECURITY(ldb_conn)
+
+    def _test_dirsync_with_GET_CHANGES(self, insist_on_empty_element=False):
+        ldb_conn = self.get_ldb_connection(self.dirsync_user, self.dirsync_pass)
+        res = ldb_conn.search(self.base_dn,
+                         expression=f"(samAccountName={self.conf_user})",
+                         controls=["dirsync:1:0:0"])
+
+        msg = self.find_under_current_ou(res)
+        # This form ensures this is a case insensitive comparison
+        self.assertTrue(msg.get("samAccountName"))
+        if insist_on_empty_element:
+            self.assertTrue(msg.get(self.conf_attr) is not None)
+            self.assertEqual(len(msg.get(self.conf_attr)), 0)
+        else:
+            self.assertTrue(msg.get(self.conf_attr) is None
+                            or len(msg.get(self.conf_attr)) == 0)
+
+    def test_dirsync_with_GET_CHANGES(self):
+        self._test_dirsync_with_GET_CHANGES()
+
+    def test_dirsync_with_GET_CHANGES_insist_on_empty_element(self):
+        self._test_dirsync_with_GET_CHANGES(insist_on_empty_element=True)
+
+    def test_dirsync_with_GET_CHANGES_attr(self):
+        ldb_conn = self.get_ldb_connection(self.dirsync_user, self.dirsync_pass)
+        try:
+            res = ldb_conn.search(self.base_dn,
+                                  attrs=[self.conf_attr, "samAccountName"],
+                                  expression=f"(samAccountName={self.conf_user})",
+                                  controls=["dirsync:1:0:0"])
+            self.fail("ldb.search() should have failed with LDAP_INSUFFICIENT_ACCESS_RIGHTS")
+        except ldb.LdbError as e:
+            (errno, errstr) = e.args
+            self.assertEqual(errno, ldb.ERR_INSUFFICIENT_ACCESS_RIGHTS)
+
+class ConfidentialFilteredDirsyncTests(SpecialDirsyncTests):
+
+    def setUp(self):
+        self.flag_under_test = SEARCH_FLAG_RODC_ATTRIBUTE|SEARCH_FLAG_CONFIDENTIAL
+        super().setUp()
+
+    def test_attr(self):
+        ldb_conn = self.get_ldb_connection(self.simple_user, self.simple_pass)
+        res = ldb_conn.search(self.base_dn,
+                         attrs=["unicodePwd", "supplementalCredentials", "samAccountName"],
+                         expression=f"(samAccountName={self.conf_user})")
+
+        msg = res[0]
+        self.assertTrue(msg.get("samAccountName"))
+        self.assertTrue(msg.get(self.conf_attr) is None)
+
+    def _test_dirsync_OBJECT_SECURITY(self, ldb_conn, insist_on_empty_element=False):
+        res = ldb_conn.search(self.base_dn,
+                              attrs=[self.conf_attr, "samAccountName"],
+                              expression=f"(samAccountName={self.conf_user})",
+                              controls=["dirsync:1:1:0"])
+
+        msg = self.find_under_current_ou(res)
+        self.assertTrue("samAccountName" in msg)
+        # This form ensures this is a case insensitive comparison
+        self.assertTrue(msg.get("samAccountName"))
+        if insist_on_empty_element:
+            self.assertTrue(msg.get(self.conf_attr) is not None)
+            self.assertEqual(len(msg.get(self.conf_attr)), 0)
+        else:
+            self.assertTrue(msg.get(self.conf_attr) is None
+                            or len(msg.get(self.conf_attr)) == 0)
+
+    def test_dirsync_OBJECT_SECURITY(self):
+        ldb_conn = self.get_ldb_connection(self.simple_user, self.simple_pass)
+        self._test_dirsync_OBJECT_SECURITY(ldb_conn)
+
+    def test_dirsync_OBJECT_SECURITY_insist_on_empty_element(self):
+        ldb_conn = self.get_ldb_connection(self.simple_user, self.simple_pass)
+        self._test_dirsync_OBJECT_SECURITY(ldb_conn, insist_on_empty_element=True)
+
+    def test_dirsync_OBJECT_SECURITY_with_GET_CHANGES(self):
+        ldb_conn = self.get_ldb_connection(self.dirsync_user, self.dirsync_pass)
+        self._test_dirsync_OBJECT_SECURITY(ldb_conn)
+
+    def test_dirsync_OBJECT_SECURITY_with_GET_CHANGES_insist_on_empty_element(self):
+        ldb_conn = self.get_ldb_connection(self.dirsync_user, self.dirsync_pass)
+        self._test_dirsync_OBJECT_SECURITY(ldb_conn, insist_on_empty_element=True)
+
+    def _test_dirsync_with_GET_CHANGES(self, insist_on_empty_element=False):
+        ldb_conn = self.get_ldb_connection(self.dirsync_user, self.dirsync_pass)
+        res = ldb_conn.search(self.base_dn,
+                         expression=f"(samAccountName={self.conf_user})",
+                         controls=["dirsync:1:0:0"])
+
+        msg = self.find_under_current_ou(res)
+        # This form ensures this is a case insensitive comparison
+        self.assertTrue(msg.get("samAccountName"))
+        if insist_on_empty_element:
+            self.assertTrue(msg.get(self.conf_attr) is not None)
+            self.assertEqual(len(msg.get(self.conf_attr)), 0)
+        else:
+            self.assertTrue(msg.get(self.conf_attr) is None
+                            or len(msg.get(self.conf_attr)) == 0)
+
+    def test_dirsync_with_GET_CHANGES(self):
+        self._test_dirsync_with_GET_CHANGES()
+
+    def test_dirsync_with_GET_CHANGES_insist_on_empty_element(self):
+        self._test_dirsync_with_GET_CHANGES(insist_on_empty_element=True)
+
+    def test_dirsync_with_GET_CHANGES_attr(self):
+        ldb_conn = self.get_ldb_connection(self.dirsync_user, self.dirsync_pass)
+        try:
+            res = ldb_conn.search(self.base_dn,
+                                  attrs=[self.conf_attr, "samAccountName"],
+                                  expression=f"(samAccountName={self.conf_user})",
+                                  controls=["dirsync:1:0:0"])
+            self.fail("ldb.search() should have failed with LDAP_INSUFFICIENT_ACCESS_RIGHTS")
+        except ldb.LdbError as e:
+            (errno, errstr) = e.args
+            self.assertEqual(errno, ldb.ERR_INSUFFICIENT_ACCESS_RIGHTS)
+
 
 if not getattr(opts, "listtests", False):
     lp = sambaopts.get_loadparm()