selftest: aces: fix mutable default args in assemble_ace
[janger/samba-autobuild/.git] / python / samba / tests / conditional_ace_assembler.py
1 # Unix SMB/CIFS implementation.
2 # Copyright © Catalyst IT 2023
3 #
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License
15 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17 """Fine-grained control over conditional ACE contents.
18
19 This deliberately allows you to do broken things that SDDL doesn't.
20
21 - token sequences that make no real sense
22 - sequences that make sense which SDDL can't encode
23 - strings that aren't proper utf-16
24 - etc.
25 """
26
27 import struct
28 from samba.dcerpc import security, conditional_ace as ca
29 from samba.ndr import ndr_pack
30
31
32 class Composite:
33     token = ca.CONDITIONAL_ACE_TOKEN_COMPOSITE
34
35     def __init__(self, *tokens):
36         self.members = []
37         for t in tokens:
38             self.members.append(dwim_one_token(t))
39
40     def __bytes__(self):
41         v = []
42         for x in self.members:
43             v.extend(bytes(x))
44
45         return (bytes([self.token]) +
46                 struct.pack("<I", len(v)) +
47                 bytes(v))
48
49
50 class Int:
51     def __init__(self, value,
52                  bits=ca.CONDITIONAL_ACE_TOKEN_INT64,
53                  base=ca.CONDITIONAL_ACE_INT_BASE_10,
54                  sign=ca.CONDITIONAL_ACE_INT_SIGN_NONE):
55         self.value = value
56         self.bits = int(bits)
57         self.base = int(base)
58         self.sign = int(sign)
59
60     def __bytes__(self):
61         n = struct.pack('<q', self.value)
62         return bytes([self.bits]) + n + bytes([self.sign, self.base])
63
64
65 class String:
66     """A string is decoded as UTF-16.
67     Other iterables allows the insertion of arbitrary raw bytes."""
68     token = ca.CONDITIONAL_ACE_TOKEN_UNICODE
69
70     def __init__(self, value):
71         if isinstance(value, str):
72             value = value.encode('utf-16-le')
73         self.value = list(value)
74
75     def __bytes__(self):
76         header = struct.pack('<BI', self.token, len(self.value))
77         return header + bytes(self.value)
78
79
80 class LocalAttr(String):
81     token = ca.CONDITIONAL_ACE_LOCAL_ATTRIBUTE
82
83
84 class UserAttr(String):
85     token = ca.CONDITIONAL_ACE_USER_ATTRIBUTE
86
87
88 class DeviceAttr(String):
89     token = ca.CONDITIONAL_ACE_DEVICE_ATTRIBUTE
90
91
92 class ResourceAttr(String):
93     token = ca.CONDITIONAL_ACE_RESOURCE_ATTRIBUTE
94
95
96 class ByteString:
97     """takes an iterable of 8-bit numbers, or a string."""
98     token = ca.CONDITIONAL_ACE_TOKEN_OCTET_STRING
99
100     def __init__(self, value):
101         if isinstance(value, str):
102             value = value.encode()
103         self.value = bytes(value)
104         if max(self.value) > 255 or min(self.value) < 0:
105             raise ValueError("bytes do need to be bytes (0-255)")
106
107     def __bytes__(self):
108         header = struct.pack('<BI', self.token, len(self.value))
109         return header + self.value
110
111
112 class SID:
113     token = ca.CONDITIONAL_ACE_TOKEN_SID
114
115     def __init__(self, sidstring):
116         self.sid = security.domsid(sidstring)
117
118     def __bytes__(self):
119         value = ndr_pack(self.sid)
120         header = struct.pack('B<I', self.token, len(value))
121         return header + value
122
123
124 class Token:
125     """To add a raw byte, like
126     Token(ca.CONDITIONAL_ACE_TOKEN_COMPOSITE)
127     """
128     def __init__(self, v):
129         self.token = v
130
131     def __bytes__(self):
132         return bytes([self.token])
133
134
135 def _add_tokens():
136     for tok in dir(ca):
137         if not tok[:22] == 'CONDITIONAL_ACE_TOKEN_':
138             continue
139         k = tok[22:]
140         globals()[k] = Token(getattr(ca, tok))
141
142 _add_tokens()
143
144
145 def dwim_one_token(t):
146     if isinstance(t, int):
147         return Int(t)
148     if isinstance(t, str):
149         return String(t)
150     if isinstance(t, tuple):
151         return Composite(*t)
152     if isinstance(t, bytes):
153         return ByteString(t)
154
155     return t
156
157
158 def assemble(*tokens):
159     program = b'artx'
160     if len(tokens) == 1 and isinstance(tokens, (list, tuple, set)):
161         print("WARNING: single argument container will become a composite. "
162               "you might have meant 'assemble(*args)', not 'assemble(args)'")
163
164     for t in tokens:
165         t = dwim_one_token(t)
166         program += bytes(t)
167
168     program += b'\x00\x00\x00'
169     program = program[:-(len(program) & 3)]
170
171     return program
172
173
174 def assemble_ace(tokens=None,
175                  type=security.SEC_ACE_TYPE_ACCESS_ALLOWED_CALLBACK,
176                  trustee=None,
177                  flags=None,
178                  object=None,
179                  access_mask=None):
180     if tokens is None:
181         tokens = []
182
183     type_strings = {
184         'XA': security.SEC_ACE_TYPE_ACCESS_ALLOWED_CALLBACK,
185         'XD': security.SEC_ACE_TYPE_ACCESS_DENIED_CALLBACK,
186         'ZA': security.SEC_ACE_TYPE_ACCESS_ALLOWED_CALLBACK_OBJECT,
187         # this can also make plain ACEs
188         'A': security.SEC_ACE_TYPE_ACCESS_ALLOWED,
189         'D': security.SEC_ACE_TYPE_ACCESS_DENIED,
190     }
191
192     a = security.ace()
193     a.type = type_strings.get(type, type)
194     if trustee is not None:
195         a.trustee = trustee
196     if flags is not None:
197         a.flags = flags
198     if object is not None:
199         a.object = object
200     if tokens:
201         a.coda = assemble(*tokens)
202     return a
203
204
205 def assemble_sd(base_sddl='D:',
206                 add_trailing_allow_ace=False,
207                 domain_sid=None,
208                 **ace_args):
209     """Make a security descriptor using the base_sddl, then add the
210     assembled conditional ACE on the end of its DACL. If
211     add_trailing_allow_ace is true, an allow ace matching
212     '(A;;0x1ff;;;WD)' is added to the end, allowing successful deny
213     ACEs to be detected.
214     """
215     if domain_sid is None:
216         domain_sid = security.dom_sid('S-1-2-3-4')
217
218     sd = security.descriptor.from_sddl(base_sddl, domain_sid)
219     ace = assemble_ace(**ace_args)
220     sd.dacl_add(ace)
221     if add_trailing_allow_ace:
222         # If the compiled ACE is a deny ACE, we won't know if it
223         # worked unless there is a wide ranging allow ACE following
224         # it.
225         allow_ace = assemble_ace(type=security.SEC_ACE_TYPE_ACCESS_ALLOWED,
226                                  trustee=security.dom_sid(security.SID_WORLD),
227                                  access_mask=security.SEC_FILE_ALL)
228         sd.dacl_add(allow_ace)
229
230     return sd