CVE-2023-4154 python:sd_utils: add dacl_{prepend,append,delete}_aces() helpers
[samba.git] / python / samba / sd_utils.py
1 # Utility methods for security descriptor manipulation
2 #
3 # Copyright Nadezhda Ivanova 2010 <nivanova@samba.org>
4 #
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18
19 """Utility methods for security descriptor manipulation."""
20
21 import samba
22 from ldb import Message, MessageElement, Dn
23 from ldb import FLAG_MOD_REPLACE, SCOPE_BASE
24 from samba.ndr import ndr_pack, ndr_unpack, ndr_deepcopy
25 from samba.dcerpc import security
26 from samba.ntstatus import (
27     NT_STATUS_OBJECT_NAME_NOT_FOUND,
28 )
29
30
31 class SDUtils(object):
32     """Some utilities for manipulation of security descriptors on objects."""
33
34     def __init__(self, samdb):
35         self.ldb = samdb
36         self.domain_sid = security.dom_sid(self.ldb.get_domain_sid())
37
38     def modify_sd_on_dn(self, object_dn, sd, controls=None):
39         """Modify security descriptor using either SDDL string
40             or security.descriptor object
41         """
42         m = Message()
43         if isinstance(object_dn, Dn):
44             m.dn = object_dn
45         else:
46             m.dn = Dn(self.ldb, object_dn)
47
48         assert(isinstance(sd, str) or isinstance(sd, security.descriptor))
49         if isinstance(sd, str):
50             tmp_desc = security.descriptor.from_sddl(sd, self.domain_sid)
51         elif isinstance(sd, security.descriptor):
52             tmp_desc = sd
53
54         m["nTSecurityDescriptor"] = MessageElement(ndr_pack(tmp_desc),
55                                                    FLAG_MOD_REPLACE,
56                                                    "nTSecurityDescriptor")
57         self.ldb.modify(m, controls)
58
59     def read_sd_on_dn(self, object_dn, controls=None):
60         res = self.ldb.search(object_dn, SCOPE_BASE, None,
61                               ["nTSecurityDescriptor"], controls=controls)
62         desc = res[0]["nTSecurityDescriptor"][0]
63         return ndr_unpack(security.descriptor, desc)
64
65     def get_object_sid(self, object_dn):
66         res = self.ldb.search(object_dn)
67         return ndr_unpack(security.dom_sid, res[0]["objectSid"][0])
68
69     def update_aces_in_dacl(self, dn, del_aces=None, add_aces=None,
70                             sddl_attr=None, controls=None):
71         if del_aces is None:
72             del_aces=[]
73         if add_aces is None:
74             add_aces=[]
75
76         def ace_from_sddl(ace_sddl):
77             ace_sd = security.descriptor.from_sddl("D:" + ace_sddl, self.domain_sid)
78             assert(len(ace_sd.dacl.aces)==1)
79             return ace_sd.dacl.aces[0]
80
81         if sddl_attr is None:
82             if controls is None:
83                 controls=["sd_flags:1:%d" % security.SECINFO_DACL]
84             sd = self.read_sd_on_dn(dn, controls=controls)
85             if not sd.type & security.SEC_DESC_DACL_PROTECTED:
86                 # if the DACL is not protected remove all
87                 # inherited aces, as they will be re-inherited
88                 # on the server, we need a ndr_deepcopy in order
89                 # to avoid reference problems while deleting
90                 # the aces while looping over them
91                 dacl_copy = ndr_deepcopy(sd.dacl)
92                 for ace in dacl_copy.aces:
93                     if ace.flags & security.SEC_ACE_FLAG_INHERITED_ACE:
94                         try:
95                             sd.dacl_del_ace(ace)
96                         except samba.NTSTATUSError as err:
97                             if err.args[0] != NT_STATUS_OBJECT_NAME_NOT_FOUND:
98                                 raise err
99                             # dacl_del_ace may remove more than
100                             # one ace, so we may not find it anymore
101                             pass
102         else:
103             if controls is None:
104                 controls=[]
105             res = self.ldb.search(dn, SCOPE_BASE, None,
106                                   [sddl_attr], controls=controls)
107             old_sddl = str(res[0][sddl_attr][0])
108             sd = security.descriptor.from_sddl(old_sddl, self.domain_sid)
109
110         num_changes = 0
111         del_ignored = []
112         add_ignored = []
113         inherited_ignored = []
114
115         for ace in del_aces:
116             if isinstance(ace, str):
117                 ace = ace_from_sddl(ace)
118             assert(isinstance(ace, security.ace))
119
120             if ace.flags & security.SEC_ACE_FLAG_INHERITED_ACE:
121                 inherited_ignored.append(ace)
122                 continue
123
124             if ace not in sd.dacl.aces:
125                 del_ignored.append(ace)
126                 continue
127
128             sd.dacl_del_ace(ace)
129             num_changes += 1
130
131         for ace in add_aces:
132             add_idx = -1
133             if isinstance(ace, dict):
134                 if "idx" in ace:
135                     add_idx = ace["idx"]
136                 ace = ace["ace"]
137             if isinstance(ace, str):
138                 ace = ace_from_sddl(ace)
139             assert(isinstance(ace, security.ace))
140
141             if ace.flags & security.SEC_ACE_FLAG_INHERITED_ACE:
142                 inherited_ignored.append(ace)
143                 continue
144
145             if ace in sd.dacl.aces:
146                 add_ignored.append(ace)
147                 continue
148
149             sd.dacl_add(ace, add_idx)
150             num_changes += 1
151
152         if num_changes == 0:
153             return del_ignored, add_ignored, inherited_ignored
154
155         if sddl_attr is None:
156             self.modify_sd_on_dn(dn, sd, controls=controls)
157         else:
158             new_sddl = sd.as_sddl(self.domain_sid)
159             m = Message()
160             m.dn = dn
161             m[sddl_attr] = MessageElement(new_sddl.encode('ascii'),
162                                           FLAG_MOD_REPLACE,
163                                           sddl_attr)
164             self.ldb.modify(m, controls=controls)
165
166         return del_ignored, add_ignored, inherited_ignored
167
168     def dacl_prepend_aces(self, object_dn, aces, controls=None):
169         """Prepend an ACE (or more) to an objects security descriptor
170         """
171         ace_sd = security.descriptor.from_sddl("D:" + aces, self.domain_sid)
172         add_aces = []
173         add_idx = 0
174         for ace in ace_sd.dacl.aces:
175             add_aces.append({"idx": add_idx, "ace": ace})
176             add_idx += 1
177         _,ai,ii = self.update_aces_in_dacl(object_dn, add_aces=add_aces,
178                                            controls=controls)
179         return ai, ii
180
181     def dacl_add_ace(self, object_dn, ace):
182         """Add an ACE (or more) to an objects security descriptor
183         """
184         _,_ = self.dacl_prepend_aces(object_dn, ace,
185                                      controls=["show_deleted:1"])
186
187     def dacl_append_aces(self, object_dn, aces, controls=None):
188         """Append an ACE (or more) to an objects security descriptor
189         """
190         ace_sd = security.descriptor.from_sddl("D:" + aces, self.domain_sid)
191         add_aces = []
192         for ace in ace_sd.dacl.aces:
193             add_aces.append(ace)
194         _,ai,ii = self.update_aces_in_dacl(object_dn, add_aces=add_aces,
195                                            controls=controls)
196         return ai, ii
197
198     def dacl_delete_aces(self, object_dn, aces, controls=None):
199         """Delete an ACE (or more) to an objects security descriptor
200         """
201         del_sd = security.descriptor.from_sddl("D:" + aces, self.domain_sid)
202         del_aces = []
203         for ace in del_sd.dacl.aces:
204             del_aces.append(ace)
205         di,_,ii = self.update_aces_in_dacl(object_dn, del_aces=del_aces,
206                                            controls=controls)
207         return di, ii
208
209     def get_sd_as_sddl(self, object_dn, controls=[]):
210         """Return object nTSecutiryDescriptor in SDDL format
211         """
212         desc = self.read_sd_on_dn(object_dn, controls + ["show_deleted:1"])
213         return desc.as_sddl(self.domain_sid)