CVE-2023-4154 python:sd_utils: introduce update_aces_in_dacl() helper
authorStefan Metzmacher <metze@samba.org>
Fri, 10 Mar 2023 17:25:18 +0000 (18:25 +0100)
committerJule Anger <janger@samba.org>
Sun, 8 Oct 2023 20:06:18 +0000 (22:06 +0200)
This is a more generic api that can be re-used in other places
as well in future. It operates on a security descriptor object instead of
SDDL.

BUG: https://bugzilla.samba.org/show_bug.cgi?id=15424

Signed-off-by: Stefan Metzmacher <metze@samba.org>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
Reviewed-by: Douglas Bagnall <douglas.bagnall@catalyst.net.nz>
(cherry picked from commit 8411e6d302e25d10f1035ebbdcbde7308566e930)

python/samba/sd_utils.py

index 26e80ee2f4ac6d4bc434b364055e1da96b249a3a..52a78de5d09a8f663703fe769885e756d97a2f1c 100644 (file)
 import samba
 from ldb import Message, MessageElement, Dn
 from ldb import FLAG_MOD_REPLACE, SCOPE_BASE
-from samba.ndr import ndr_pack, ndr_unpack
+from samba.ndr import ndr_pack, ndr_unpack, ndr_deepcopy
 from samba.dcerpc import security
+from samba.ntstatus import (
+    NT_STATUS_OBJECT_NAME_NOT_FOUND,
+)
 
 
 class SDUtils(object):
@@ -63,19 +66,116 @@ class SDUtils(object):
         res = self.ldb.search(object_dn)
         return ndr_unpack(security.dom_sid, res[0]["objectSid"][0])
 
+    def update_aces_in_dacl(self, dn, del_aces=None, add_aces=None,
+                            sddl_attr=None, controls=None):
+        if del_aces is None:
+            del_aces=[]
+        if add_aces is None:
+            add_aces=[]
+
+        def ace_from_sddl(ace_sddl):
+            ace_sd = security.descriptor.from_sddl("D:" + ace_sddl, self.domain_sid)
+            assert(len(ace_sd.dacl.aces)==1)
+            return ace_sd.dacl.aces[0]
+
+        if sddl_attr is None:
+            if controls is None:
+                controls=["sd_flags:1:%d" % security.SECINFO_DACL]
+            sd = self.read_sd_on_dn(dn, controls=controls)
+            if not sd.type & security.SEC_DESC_DACL_PROTECTED:
+                # if the DACL is not protected remove all
+                # inherited aces, as they will be re-inherited
+                # on the server, we need a ndr_deepcopy in order
+                # to avoid reference problems while deleting
+                # the aces while looping over them
+                dacl_copy = ndr_deepcopy(sd.dacl)
+                for ace in dacl_copy.aces:
+                    if ace.flags & security.SEC_ACE_FLAG_INHERITED_ACE:
+                        try:
+                            sd.dacl_del_ace(ace)
+                        except samba.NTSTATUSError as err:
+                            if err.args[0] != NT_STATUS_OBJECT_NAME_NOT_FOUND:
+                                raise err
+                            # dacl_del_ace may remove more than
+                            # one ace, so we may not find it anymore
+                            pass
+        else:
+            if controls is None:
+                controls=[]
+            res = self.ldb.search(dn, SCOPE_BASE, None,
+                                  [sddl_attr], controls=controls)
+            old_sddl = str(res[0][sddl_attr][0])
+            sd = security.descriptor.from_sddl(old_sddl, self.domain_sid)
+
+        num_changes = 0
+        del_ignored = []
+        add_ignored = []
+        inherited_ignored = []
+
+        for ace in del_aces:
+            if isinstance(ace, str):
+                ace = ace_from_sddl(ace)
+            assert(isinstance(ace, security.ace))
+
+            if ace.flags & security.SEC_ACE_FLAG_INHERITED_ACE:
+                inherited_ignored.append(ace)
+                continue
+
+            if ace not in sd.dacl.aces:
+                del_ignored.append(ace)
+                continue
+
+            sd.dacl_del_ace(ace)
+            num_changes += 1
+
+        for ace in add_aces:
+            add_idx = -1
+            if isinstance(ace, dict):
+                if "idx" in ace:
+                    add_idx = ace["idx"]
+                ace = ace["ace"]
+            if isinstance(ace, str):
+                ace = ace_from_sddl(ace)
+            assert(isinstance(ace, security.ace))
+
+            if ace.flags & security.SEC_ACE_FLAG_INHERITED_ACE:
+                inherited_ignored.append(ace)
+                continue
+
+            if ace in sd.dacl.aces:
+                add_ignored.append(ace)
+                continue
+
+            sd.dacl_add(ace, add_idx)
+            num_changes += 1
+
+        if num_changes == 0:
+            return del_ignored, add_ignored, inherited_ignored
+
+        if sddl_attr is None:
+            self.modify_sd_on_dn(dn, sd, controls=controls)
+        else:
+            new_sddl = sd.as_sddl(self.domain_sid)
+            m = Message()
+            m.dn = dn
+            m[sddl_attr] = MessageElement(new_sddl.encode('ascii'),
+                                          FLAG_MOD_REPLACE,
+                                          sddl_attr)
+            self.ldb.modify(m, controls=controls)
+
+        return del_ignored, add_ignored, inherited_ignored
+
     def dacl_add_ace(self, object_dn, ace):
-        """Add an ACE to an objects security descriptor
+        """Add an ACE (or more) to an objects security descriptor
         """
-        desc = self.read_sd_on_dn(object_dn, ["show_deleted:1"])
-        desc_sddl = desc.as_sddl(self.domain_sid)
-        if ace in desc_sddl:
-            return
-        if desc_sddl.find("(") >= 0:
-            desc_sddl = (desc_sddl[:desc_sddl.index("(")] + ace +
-                         desc_sddl[desc_sddl.index("("):])
-        else:
-            desc_sddl = desc_sddl + ace
-        self.modify_sd_on_dn(object_dn, desc_sddl, ["show_deleted:1"])
+        ace_sd = security.descriptor.from_sddl("D:" + ace, self.domain_sid)
+        add_aces = []
+        add_idx = 0
+        for ace in ace_sd.dacl.aces:
+            add_aces.append({"idx": add_idx, "ace": ace})
+            add_idx += 1
+        _,_,_ = self.update_aces_in_dacl(object_dn, add_aces=add_aces,
+                                         controls=["show_deleted:1"])
 
     def get_sd_as_sddl(self, object_dn, controls=[]):
         """Return object nTSecutiryDescriptor in SDDL format