tests/krb5: Add test for an authentication policy that allows a specific account
[anoopcs/samba.git] / python / samba / tests / krb5 / conditional_ace_tests.py
1 #!/usr/bin/env python3
2 # Unix SMB/CIFS implementation.
3 # Copyright (C) Stefan Metzmacher 2020
4 # Copyright (C) Catalyst.Net Ltd 2023
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 3 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 # GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 #
19
20 import sys
21 import os
22
23 sys.path.insert(0, 'bin/python')
24 os.environ['PYTHONUNBUFFERED'] = '1'
25
26 from collections import OrderedDict
27 from functools import partial
28 import re
29 from string import Formatter
30
31 import ldb
32
33 from samba import dsdb, ntstatus
34 from samba.dcerpc import claims, krb5pac, security
35 from samba.ndr import ndr_pack, ndr_unpack
36 from samba.sd_utils import escaped_claim_id
37
38 from samba.tests import DynamicTestCase, env_get_var_value
39 from samba.tests.krb5.authn_policy_tests import (
40     AuditEvent,
41     AuditReason,
42     AuthnPolicyBaseTests,
43 )
44 from samba.tests.krb5.raw_testcase import RawKerberosTest
45 from samba.tests.krb5.rfc4120_constants import (
46     KDC_ERR_BADOPTION,
47     KDC_ERR_GENERIC,
48     KDC_ERR_POLICY,
49     NT_PRINCIPAL,
50 )
51 import samba.tests.krb5.rfc4120_pyasn1 as krb5_asn1
52
53 SidType = RawKerberosTest.SidType
54
55 global_asn1_print = False
56 global_hexdump = False
57
58
59 # When used as a test outcome, indicates that the test can cause a Windows
60 # server to crash, and is to be run with caution.
61 CRASHES_WINDOWS = object()
62
63
64 class ConditionalAceBaseTests(AuthnPolicyBaseTests):
65     # Constants for group SID attributes.
66     default_attrs = security.SE_GROUP_DEFAULT_FLAGS
67     resource_attrs = default_attrs | security.SE_GROUP_RESOURCE
68
69     aa_asserted_identity = (
70         security.SID_AUTHENTICATION_AUTHORITY_ASSERTED_IDENTITY)
71     service_asserted_identity = security.SID_SERVICE_ASSERTED_IDENTITY
72
73     @classmethod
74     def setUpClass(cls):
75         super().setUpClass()
76
77         cls._setup = False
78
79     def setUp(self):
80         super().setUp()
81         self.do_asn1_print = global_asn1_print
82         self.do_hexdump = global_hexdump
83
84         if not self._setup:
85             samdb = self.get_samdb()
86             cls = type(self)
87
88             # Create a machine account with which to perform FAST.
89             cls._mach_creds = self.get_cached_creds(
90                 account_type=self.AccountType.COMPUTER)
91
92             # Create some new groups.
93
94             group0_name = self.get_new_username()
95             group0_dn = self.create_group(samdb, group0_name)
96             cls._group0_sid = self.get_objectSid(samdb, group0_dn)
97
98             group1_name = self.get_new_username()
99             group1_dn = self.create_group(samdb, group1_name)
100             cls._group1_sid = self.get_objectSid(samdb, group1_dn)
101
102             # Create machine accounts with which to perform FAST that belong to
103             # various arrangements of the groups.
104
105             cls._member_of_both_creds = self.get_cached_creds(
106                 account_type=self.AccountType.COMPUTER,
107                 opts={'member_of': (group0_dn, group1_dn)})
108
109             cls._member_of_one_creds = self.get_cached_creds(
110                 account_type=self.AccountType.COMPUTER,
111                 opts={'member_of': (group1_dn,)})
112
113             # Create some authentication silos.
114             cls._unenforced_silo = self.create_authn_silo(enforced=False)
115             cls._enforced_silo = self.create_authn_silo(enforced=True)
116
117             # Create machine accounts with which to perform FAST that belong to
118             # the respective silos.
119
120             cls._member_of_unenforced_silo = self._get_creds(
121                 account_type=self.AccountType.COMPUTER,
122                 assigned_silo=self._unenforced_silo,
123                 cached=True)
124             self.add_to_group(str(self._member_of_unenforced_silo.get_dn()),
125                               self._unenforced_silo.dn,
126                               'msDS-AuthNPolicySiloMembers',
127                               expect_attr=False)
128
129             cls._member_of_enforced_silo = self._get_creds(
130                 account_type=self.AccountType.COMPUTER,
131                 assigned_silo=self._enforced_silo,
132                 cached=True)
133             self.add_to_group(str(self._member_of_enforced_silo.get_dn()),
134                               self._enforced_silo.dn,
135                               'msDS-AuthNPolicySiloMembers',
136                               expect_attr=False)
137
138             # Create a couple of multi‐valued string claims for testing claim
139             # value comparisons.
140
141             cls.claim0_attr = 'carLicense'
142             cls.claim0_id = self.get_new_username()
143             self.create_claim(cls.claim0_id,
144                               enabled=True,
145                               attribute=cls.claim0_attr,
146                               single_valued=False,
147                               source_type='AD',
148                               for_classes=['computer', 'user'],
149                               value_type=claims.CLAIM_TYPE_STRING)
150
151             cls.claim1_attr = 'departmentNumber'
152             cls.claim1_id = self.get_new_username()
153             self.create_claim(cls.claim1_id,
154                               enabled=True,
155                               attribute=cls.claim1_attr,
156                               single_valued=False,
157                               source_type='AD',
158                               for_classes=['computer', 'user'],
159                               value_type=claims.CLAIM_TYPE_STRING)
160
161             cls._setup = True
162
163     # For debugging purposes. Prints out the SDDL representation of
164     # authentication policy conditions set by the Windows GUI.
165     def _print_authn_policy_sddl(self, policy_id):
166         policy_dn = self.get_authn_policies_dn()
167         policy_dn.add_child(f'CN={policy_id}')
168
169         attrs = [
170             'msDS-ComputerAllowedToAuthenticateTo',
171             'msDS-ServiceAllowedToAuthenticateFrom',
172             'msDS-ServiceAllowedToAuthenticateTo',
173             'msDS-UserAllowedToAuthenticateFrom',
174             'msDS-UserAllowedToAuthenticateTo',
175         ]
176
177         samdb = self.get_samdb()
178         res = samdb.search(policy_dn, scope=ldb.SCOPE_BASE, attrs=attrs)
179         self.assertEqual(1, len(res),
180                          f'Authentication policy {policy_id} not found')
181         result = res[0]
182
183         def print_sddl(attr):
184             sd = result.get(attr, idx=0)
185             if sd is None:
186                 return
187
188             sec_desc = ndr_unpack(security.descriptor, sd)
189             print(f'{attr}: {sec_desc.as_sddl()}')
190
191         for attr in attrs:
192             print_sddl(attr)
193
194     def sddl_array_from_sids(self, sids):
195         def sddl_from_sid_entry(sid_entry):
196             sid, _, _ = sid_entry
197             return f'SID({sid})'
198
199         return f"{{{', '.join(map(sddl_from_sid_entry, sids))}}}"
200
201     def allow_if(self, condition):
202         return f'O:SYD:(XA;;CR;;;WD;({condition}))'
203
204
205 @DynamicTestCase
206 class ConditionalAceTests(ConditionalAceBaseTests):
207     @classmethod
208     def setUpDynamicTestCases(cls):
209         FILTER = env_get_var_value('FILTER', allow_missing=True)
210
211         # These operators are arranged so that each operator precedes its own
212         # affixes.
213         op_names = OrderedDict([
214             ('!=', 'does not equal'),
215             ('!', 'not'),
216             ('&&', 'and'),
217             ('<=', 'is less than or equals'),
218             ('<', 'is less than'),
219             ('==', 'equals'),
220             ('>=', 'exceeds or equals'),
221             ('>', 'exceeds'),
222             ('Not_Any_of', 'matches none of'),
223             ('Any_of', 'matches any of'),
224             ('Not_Contains', 'does not contain'),
225             ('Contains', 'contains'),
226             ('Not_Member_of_Any', 'the user belongs to none of'),
227             ('Not_Device_Member_of_Any', 'the device belongs to none of'),  # TODO: no test for this yet
228             ('Device_Member_of_Any', 'the device belongs to any of'),  # TODO: no test for this yet
229             ('Not_Device_Member_of', 'the device does not belong to'),  # TODO: no test for this yet
230             ('Device_Member_of', 'the device belongs to'),
231             ('Not_Exists', 'there does not exist'),
232             ('Exists', 'there exists'),
233             ('Member_of_Any', 'the user belongs to any of'),
234             ('Not_Member_of', 'the user does not belong to'),
235             ('Member_of', 'the user belongs to'),
236             ('||', 'or'),
237         ])
238
239         # This is a safety measure to ensure correct ordering of op_names
240         keys = list(op_names.keys())
241         for i in range(len(keys)):
242             for j in range(i + 1, len(keys)):
243                 if keys[i] in keys[j]:
244                     raise AssertionError((keys[i], keys[j]))
245
246         for case in cls.pac_claim_cases:
247             if len(case) == 3:
248                 pac_claims, expression, outcome = case
249                 claim_map = None
250             elif len(case) == 4:
251                 pac_claims, expression, claim_map, outcome = case
252             else:
253                 raise AssertionError(
254                     f'found {len(case)} items in case, expected 3–4')
255
256             expression_name = expression
257             for op, op_name in op_names.items():
258                 expression_name = expression_name.replace(op, op_name)
259
260             name = f'{pac_claims}_{expression_name}'
261
262             if claim_map is not None:
263                 name += f'_{claim_map}'
264
265             name = re.sub(r'\W+', '_', name)
266             if len(name) > 150:
267                 name = f'{name[:125]}+{len(name) - 125}‐more'
268
269             if FILTER and not re.search(FILTER, name):
270                 continue
271
272             cls.generate_dynamic_test('test_pac_claim_cmp', name,
273                                       pac_claims, expression, claim_map,
274                                       outcome)
275
276         for case in cls.claim_against_claim_cases:
277             lhs, op, rhs, outcome = case
278             op_name = op_names[op]
279
280             name = f'{lhs}_{op_name}_{rhs}'
281
282             name = re.sub(r'\W+', '_', name)
283             if FILTER and not re.search(FILTER, name):
284                 continue
285
286             cls.generate_dynamic_test('test_cmp', name,
287                                       lhs, op, rhs, outcome)
288
289         for case in cls.claim_against_literal_cases:
290             lhs, op, rhs, outcome = case
291             op_name = op_names[op]
292
293             name = f'{lhs}_{op_name}_literal_{rhs}'
294
295             name = re.sub(r'\W+', '_', name)
296             if FILTER and not re.search(FILTER, name):
297                 continue
298
299             cls.generate_dynamic_test('test_cmp', name,
300                                       lhs, op, rhs, outcome, True)
301
302     def test_allowed_from_member_of_each(self):
303         # Create an authentication policy that allows accounts belonging to
304         # both groups.
305         policy = self.create_authn_policy(
306             enforced=True,
307             user_allowed_from=(
308                 f'O:SYD:(XA;;CR;;;WD;(Member_of '
309                 f'{{SID({self._group0_sid}), SID({self._group1_sid})}}))'),
310         )
311
312         # Create a user account with the assigned policy.
313         client_creds = self._get_creds(account_type=self.AccountType.USER,
314                                        assigned_policy=policy)
315
316         # Show that we get a policy error if the machine account does not
317         # belong to both groups.
318         armor_tgt = self.get_tgt(self._member_of_one_creds)
319         self._get_tgt(client_creds, armor_tgt=armor_tgt,
320                       expected_error=KDC_ERR_POLICY)
321
322         # Otherwise, authentication should succeed.
323         armor_tgt = self.get_tgt(self._member_of_both_creds)
324         self._get_tgt(client_creds, armor_tgt=armor_tgt,
325                       expected_error=0)
326
327     def test_allowed_from_member_of_any(self):
328         # Create an authentication policy that allows accounts belonging to
329         # either group.
330         policy = self.create_authn_policy(
331             enforced=True,
332             user_allowed_from=(
333                 f'O:SYD:(XA;;CR;;;WD;(Member_of_Any '
334                 f'{{SID({self._group0_sid}), SID({self._group1_sid})}}))'),
335         )
336
337         # Create a user account with the assigned policy.
338         client_creds = self._get_creds(account_type=self.AccountType.USER,
339                                        assigned_policy=policy)
340
341         # Show that we get a policy error if the machine account belongs to
342         # neither group.
343         armor_tgt = self.get_tgt(self._mach_creds)
344         self._get_tgt(client_creds, armor_tgt=armor_tgt,
345                       expected_error=KDC_ERR_POLICY)
346
347         # Otherwise, authentication should succeed.
348         armor_tgt = self.get_tgt(self._member_of_one_creds)
349         self._get_tgt(client_creds, armor_tgt=armor_tgt,
350                       expected_error=0)
351
352     def test_allowed_from_not_member_of_each(self):
353         # Create an authentication policy that allows accounts not belonging to
354         # both groups.
355         policy = self.create_authn_policy(
356             enforced=True,
357             user_allowed_from=(
358                 f'O:SYD:(XA;;CR;;;WD;(Not_Member_of '
359                 f'{{SID({self._group0_sid}), SID({self._group1_sid})}}))'),
360         )
361
362         # Create a user account with the assigned policy.
363         client_creds = self._get_creds(account_type=self.AccountType.USER,
364                                        assigned_policy=policy)
365
366         # Show that we get a policy error if the machine account belongs to
367         # both groups.
368         armor_tgt = self.get_tgt(self._member_of_both_creds)
369         self._get_tgt(client_creds, armor_tgt=armor_tgt,
370                       expected_error=KDC_ERR_POLICY)
371
372         # Otherwise, authentication should succeed.
373         armor_tgt = self.get_tgt(self._member_of_one_creds)
374         self._get_tgt(client_creds, armor_tgt=armor_tgt,
375                       expected_error=0)
376
377     def test_allowed_from_not_member_of_any(self):
378         # Create an authentication policy that allows accounts belonging to
379         # neither group.
380         policy = self.create_authn_policy(
381             enforced=True,
382             user_allowed_from=(
383                 f'O:SYD:(XA;;CR;;;WD;(Not_Member_of_Any '
384                 f'{{SID({self._group0_sid}), SID({self._group1_sid})}}))'),
385         )
386
387         # Create a user account with the assigned policy.
388         client_creds = self._get_creds(account_type=self.AccountType.USER,
389                                        assigned_policy=policy)
390
391         # Show that we get a policy error if the machine account belongs to one
392         # of the groups.
393         armor_tgt = self.get_tgt(self._member_of_one_creds)
394         self._get_tgt(client_creds, armor_tgt=armor_tgt,
395                       expected_error=KDC_ERR_POLICY)
396
397         # Otherwise, authentication should succeed.
398         armor_tgt = self.get_tgt(self._mach_creds)
399         self._get_tgt(client_creds, armor_tgt=armor_tgt,
400                       expected_error=0)
401
402     def test_allowed_from_member_of_each_deny(self):
403         # Create an authentication policy that denies accounts belonging to
404         # both groups, and allows other accounts.
405         policy = self.create_authn_policy(
406             enforced=True,
407             user_allowed_from=(
408                 f'O:SYD:(XD;;CR;;;WD;(Member_of '
409                 f'{{SID({self._group0_sid}), SID({self._group1_sid})}}))'
410                 f'(A;;CR;;;WD)'),
411         )
412
413         # Create a user account with the assigned policy.
414         client_creds = self._get_creds(account_type=self.AccountType.USER,
415                                        assigned_policy=policy)
416
417         # Show that we get a policy error if the machine account belongs to
418         # both groups.
419         armor_tgt = self.get_tgt(self._member_of_both_creds)
420         self._get_tgt(client_creds, armor_tgt=armor_tgt,
421                       expected_error=KDC_ERR_POLICY)
422
423         # Otherwise, authentication should succeed.
424         armor_tgt = self.get_tgt(self._member_of_one_creds)
425         self._get_tgt(client_creds, armor_tgt=armor_tgt,
426                       expected_error=0)
427
428     def test_allowed_from_member_of_any_deny(self):
429         # Create an authentication policy that denies accounts belonging to
430         # either group, and allows other accounts.
431         policy = self.create_authn_policy(
432             enforced=True,
433             user_allowed_from=(
434                 f'O:SYD:(XD;;CR;;;WD;(Member_of_Any '
435                 f'{{SID({self._group0_sid}), SID({self._group1_sid})}}))'
436                 f'(A;;CR;;;WD)'),
437         )
438
439         # Create a user account with the assigned policy.
440         client_creds = self._get_creds(account_type=self.AccountType.USER,
441                                        assigned_policy=policy)
442
443         # Show that we get a policy error if the machine account belongs to
444         # either group.
445         armor_tgt = self.get_tgt(self._member_of_one_creds)
446         self._get_tgt(client_creds, armor_tgt=armor_tgt,
447                       expected_error=KDC_ERR_POLICY)
448
449         # Otherwise, authentication should succeed.
450         armor_tgt = self.get_tgt(self._mach_creds)
451         self._get_tgt(client_creds, armor_tgt=armor_tgt,
452                       expected_error=0)
453
454     def test_allowed_from_not_member_of_each_deny(self):
455         # Create an authentication policy that denies accounts not belonging to
456         # both groups, and allows other accounts.
457         policy = self.create_authn_policy(
458             enforced=True,
459             user_allowed_from=(
460                 f'O:SYD:(XD;;CR;;;WD;(Not_Member_of '
461                 f'{{SID({self._group0_sid}), SID({self._group1_sid})}}))'
462                 f'(A;;CR;;;WD)'),
463         )
464
465         # Create a user account with the assigned policy.
466         client_creds = self._get_creds(account_type=self.AccountType.USER,
467                                        assigned_policy=policy)
468
469         # Show that we get a policy error if the machine account doesn’t belong
470         # to both groups.
471         armor_tgt = self.get_tgt(self._member_of_one_creds)
472         self._get_tgt(client_creds, armor_tgt=armor_tgt,
473                       expected_error=KDC_ERR_POLICY)
474
475         # Otherwise, authentication should succeed.
476         armor_tgt = self.get_tgt(self._member_of_both_creds)
477         self._get_tgt(client_creds, armor_tgt=armor_tgt,
478                       expected_error=0)
479
480     def test_allowed_from_not_member_of_any_deny(self):
481         # Create an authentication policy that denies accounts belonging to
482         # neither group, and allows other accounts.
483         policy = self.create_authn_policy(
484             enforced=True,
485             user_allowed_from=(
486                 f'O:SYD:(XD;;CR;;;WD;(Not_Member_of_Any '
487                 f'{{SID({self._group0_sid}), SID({self._group1_sid})}}))'
488                 f'(A;;CR;;;WD)'),
489         )
490
491         # Create a user account with the assigned policy.
492         client_creds = self._get_creds(account_type=self.AccountType.USER,
493                                        assigned_policy=policy)
494
495         # Show that we get a policy error if the machine account belongs to
496         # neither group.
497         armor_tgt = self.get_tgt(self._mach_creds)
498         self._get_tgt(client_creds, armor_tgt=armor_tgt,
499                       expected_error=KDC_ERR_POLICY)
500
501         # Otherwise, authentication should succeed.
502         armor_tgt = self.get_tgt(self._member_of_one_creds)
503         self._get_tgt(client_creds, armor_tgt=armor_tgt,
504                       expected_error=0)
505
506     def test_allowed_from_unenforced_silo_equals(self):
507         # Create an authentication policy that allows accounts belonging to the
508         # unenforced silo.
509         policy = self.create_authn_policy(
510             enforced=True,
511             user_allowed_from=(
512                 f'O:SYD:(XA;;CR;;;WD;'
513                 f'(@User.ad://ext/AuthenticationSilo == '
514                 f'"{self._unenforced_silo}"))'),
515         )
516
517         # Create a user account with the assigned policy.
518         client_creds = self._get_creds(account_type=self.AccountType.USER,
519                                        assigned_policy=policy)
520
521         # As the policy is unenforced, the ‘ad://ext/AuthenticationSilo’ claim
522         # will not be present in the TGT, and the ACE will never allow access.
523
524         armor_tgt = self.get_tgt(self._mach_creds)
525         self._get_tgt(client_creds, armor_tgt=armor_tgt,
526                       expected_error=KDC_ERR_POLICY)
527
528         armor_tgt = self.get_tgt(self._member_of_unenforced_silo)
529         self._get_tgt(client_creds, armor_tgt=armor_tgt,
530                       expected_error=KDC_ERR_POLICY)
531
532         armor_tgt = self.get_tgt(self._member_of_enforced_silo)
533         self._get_tgt(client_creds, armor_tgt=armor_tgt,
534                       expected_error=KDC_ERR_POLICY)
535
536     def test_allowed_from_enforced_silo_equals(self):
537         # Create an authentication policy that allows accounts belonging to the
538         # enforced silo.
539         policy = self.create_authn_policy(
540             enforced=True,
541             user_allowed_from=(
542                 f'O:SYD:(XA;;CR;;;WD;'
543                 f'(@User.ad://ext/AuthenticationSilo == '
544                 f'"{self._enforced_silo}"))'),
545         )
546
547         # Create a user account with the assigned policy.
548         client_creds = self._get_creds(account_type=self.AccountType.USER,
549                                        assigned_policy=policy)
550
551         # Show that we get a policy error if the machine account does not
552         # belong to the silo.
553         armor_tgt = self.get_tgt(self._mach_creds)
554         self._get_tgt(client_creds, armor_tgt=armor_tgt,
555                       expected_error=KDC_ERR_POLICY)
556
557         armor_tgt = self.get_tgt(self._member_of_unenforced_silo)
558         self._get_tgt(client_creds, armor_tgt=armor_tgt,
559                       expected_error=KDC_ERR_POLICY)
560
561         # Otherwise, authentication should succeed.
562         armor_tgt = self.get_tgt(self._member_of_enforced_silo)
563         self._get_tgt(client_creds, armor_tgt=armor_tgt,
564                       expected_error=0)
565
566     def test_allowed_from_unenforced_silo_not_equals(self):
567         # Create an authentication policy that allows accounts not belonging to
568         # the unenforced silo.
569         policy = self.create_authn_policy(
570             enforced=True,
571             user_allowed_from=(
572                 f'O:SYD:(XA;;CR;;;WD;'
573                 f'(@User.ad://ext/AuthenticationSilo != '
574                 f'"{self._unenforced_silo}"))'),
575         )
576
577         # Create a user account with the assigned policy.
578         client_creds = self._get_creds(account_type=self.AccountType.USER,
579                                        assigned_policy=policy)
580
581         # Show that authentication fails unless the account belongs to a silo
582         # other than the unenforced silo.
583
584         armor_tgt = self.get_tgt(self._mach_creds)
585         self._get_tgt(client_creds, armor_tgt=armor_tgt,
586                       expected_error=KDC_ERR_POLICY)
587
588         armor_tgt = self.get_tgt(self._member_of_unenforced_silo)
589         self._get_tgt(client_creds, armor_tgt=armor_tgt,
590                       expected_error=KDC_ERR_POLICY)
591
592         armor_tgt = self.get_tgt(self._member_of_enforced_silo)
593         self._get_tgt(client_creds, armor_tgt=armor_tgt,
594                       expected_error=0)
595
596     def test_allowed_from_enforced_silo_not_equals(self):
597         # Create an authentication policy that allows accounts not belonging to
598         # the enforced silo.
599         policy = self.create_authn_policy(
600             enforced=True,
601             user_allowed_from=(
602                 f'O:SYD:(XA;;CR;;;WD;'
603                 f'(@User.ad://ext/AuthenticationSilo != '
604                 f'"{self._enforced_silo}"))'),
605         )
606
607         # Create a user account with the assigned policy.
608         client_creds = self._get_creds(account_type=self.AccountType.USER,
609                                        assigned_policy=policy)
610
611         # Show that authentication always fails, as none of the machine
612         # accounts belong to a silo that is not the enforced one. (The
613         # unenforced silo doesn’t count, as it will never appear in a claim.)
614
615         armor_tgt = self.get_tgt(self._mach_creds)
616         self._get_tgt(client_creds, armor_tgt=armor_tgt,
617                       expected_error=KDC_ERR_POLICY)
618
619         armor_tgt = self.get_tgt(self._member_of_unenforced_silo)
620         self._get_tgt(client_creds, armor_tgt=armor_tgt,
621                       expected_error=KDC_ERR_POLICY)
622
623         armor_tgt = self.get_tgt(self._member_of_enforced_silo)
624         self._get_tgt(client_creds, armor_tgt=armor_tgt,
625                       expected_error=KDC_ERR_POLICY)
626
627     def test_allowed_from_unenforced_silo_equals_deny(self):
628         # Create an authentication policy that denies accounts belonging to the
629         # unenforced silo, and allows other accounts.
630         policy = self.create_authn_policy(
631             enforced=True,
632             user_allowed_from=(
633                 f'O:SYD:(XD;;CR;;;WD;'
634                 f'(@User.ad://ext/AuthenticationSilo == '
635                 f'"{self._unenforced_silo}"))'
636                 f'(A;;CR;;;WD)'),
637         )
638
639         # Create a user account with the assigned policy.
640         client_creds = self._get_creds(account_type=self.AccountType.USER,
641                                        assigned_policy=policy)
642
643         # Show that authentication fails unless the account belongs to a silo
644         # other than the unenforced silo.
645
646         armor_tgt = self.get_tgt(self._mach_creds)
647         self._get_tgt(client_creds, armor_tgt=armor_tgt,
648                       expected_error=KDC_ERR_POLICY)
649
650         armor_tgt = self.get_tgt(self._member_of_unenforced_silo)
651         self._get_tgt(client_creds, armor_tgt=armor_tgt,
652                       expected_error=KDC_ERR_POLICY)
653
654         armor_tgt = self.get_tgt(self._member_of_enforced_silo)
655         self._get_tgt(client_creds, armor_tgt=armor_tgt,
656                       expected_error=0)
657
658     def test_allowed_from_enforced_silo_equals_deny(self):
659         # Create an authentication policy that denies accounts belonging to the
660         # enforced silo, and allows other accounts.
661         policy = self.create_authn_policy(
662             enforced=True,
663             user_allowed_from=(
664                 f'O:SYD:(XD;;CR;;;WD;'
665                 f'(@User.ad://ext/AuthenticationSilo == '
666                 f'"{self._enforced_silo}"))'
667                 f'(A;;CR;;;WD)'),
668         )
669
670         # Create a user account with the assigned policy.
671         client_creds = self._get_creds(account_type=self.AccountType.USER,
672                                        assigned_policy=policy)
673
674         # Show that authentication always fails, as none of the machine
675         # accounts belong to a silo that is not the enforced one. (The
676         # unenforced silo doesn’t count, as it will never appear in a claim.)
677
678         armor_tgt = self.get_tgt(self._mach_creds)
679         self._get_tgt(client_creds, armor_tgt=armor_tgt,
680                       expected_error=KDC_ERR_POLICY)
681
682         armor_tgt = self.get_tgt(self._member_of_unenforced_silo)
683         self._get_tgt(client_creds, armor_tgt=armor_tgt,
684                       expected_error=KDC_ERR_POLICY)
685
686         armor_tgt = self.get_tgt(self._member_of_enforced_silo)
687         self._get_tgt(client_creds, armor_tgt=armor_tgt,
688                       expected_error=KDC_ERR_POLICY)
689
690     def test_allowed_from_unenforced_silo_not_equals_deny(self):
691         # Create an authentication policy that denies accounts not belonging to
692         # the unenforced silo, and allows other accounts.
693         policy = self.create_authn_policy(
694             enforced=True,
695             user_allowed_from=(
696                 f'O:SYD:(XD;;CR;;;WD;'
697                 f'(@User.ad://ext/AuthenticationSilo != '
698                 f'"{self._unenforced_silo}"))'
699                 f'(A;;CR;;;WD)'),
700         )
701
702         # Create a user account with the assigned policy.
703         client_creds = self._get_creds(account_type=self.AccountType.USER,
704                                        assigned_policy=policy)
705
706         # Show that authentication always fails, as the unenforced silo will
707         # never appear in a claim.
708
709         armor_tgt = self.get_tgt(self._mach_creds)
710         self._get_tgt(client_creds, armor_tgt=armor_tgt,
711                       expected_error=KDC_ERR_POLICY)
712
713         armor_tgt = self.get_tgt(self._member_of_unenforced_silo)
714         self._get_tgt(client_creds, armor_tgt=armor_tgt,
715                       expected_error=KDC_ERR_POLICY)
716
717         armor_tgt = self.get_tgt(self._member_of_enforced_silo)
718         self._get_tgt(client_creds, armor_tgt=armor_tgt,
719                       expected_error=KDC_ERR_POLICY)
720
721     def test_allowed_from_enforced_silo_not_equals_deny(self):
722         # Create an authentication policy that denies accounts not belonging to
723         # the enforced silo, and allows other accounts.
724         policy = self.create_authn_policy(
725             enforced=True,
726             user_allowed_from=(
727                 f'O:SYD:(XD;;CR;;;WD;'
728                 f'(@User.ad://ext/AuthenticationSilo != '
729                 f'"{self._enforced_silo}"))'
730                 f'(A;;CR;;;WD)'),
731         )
732
733         # Create a user account with the assigned policy.
734         client_creds = self._get_creds(account_type=self.AccountType.USER,
735                                        assigned_policy=policy)
736
737         # Show that authentication fails unless the account belongs to the
738         # enforced silo.
739
740         armor_tgt = self.get_tgt(self._mach_creds)
741         self._get_tgt(client_creds, armor_tgt=armor_tgt,
742                       expected_error=KDC_ERR_POLICY)
743
744         armor_tgt = self.get_tgt(self._member_of_unenforced_silo)
745         self._get_tgt(client_creds, armor_tgt=armor_tgt,
746                       expected_error=KDC_ERR_POLICY)
747
748         armor_tgt = self.get_tgt(self._member_of_enforced_silo)
749         self._get_tgt(client_creds, armor_tgt=armor_tgt,
750                       expected_error=0)
751
752     def test_allowed_from_claim_equals_claim(self):
753         # Create a couple of claim types.
754
755         claim0_id = self.get_new_username()
756         self.create_claim(claim0_id,
757                           enabled=True,
758                           attribute='carLicense',
759                           single_valued=True,
760                           source_type='AD',
761                           for_classes=['computer'],
762                           value_type=claims.CLAIM_TYPE_STRING)
763
764         claim1_id = self.get_new_username()
765         self.create_claim(claim1_id,
766                           enabled=True,
767                           attribute='comment',
768                           single_valued=True,
769                           source_type='AD',
770                           for_classes=['computer'],
771                           value_type=claims.CLAIM_TYPE_STRING)
772
773         # Create an authentication policy that allows accounts having the two
774         # claims be equal.
775         policy = self.create_authn_policy(
776             enforced=True,
777             user_allowed_from=(
778                 f'O:SYD:(XA;;CR;;;WD;'
779                 f'(@User.{claim0_id} == @User.{claim1_id}))'),
780         )
781
782         # Create a user account with the assigned policy.
783         client_creds = self._get_creds(account_type=self.AccountType.USER,
784                                        assigned_policy=policy)
785
786         armor_tgt = self.get_tgt(self._mach_creds)
787         self._get_tgt(client_creds, armor_tgt=armor_tgt,
788                       expected_error=KDC_ERR_POLICY)
789
790         mach_creds = self.get_cached_creds(
791             account_type=self.AccountType.COMPUTER,
792             opts={
793                 'additional_details': (
794                     ('carLicense', 'foo'),
795                     ('comment', 'foo'),
796                 ),
797             })
798         armor_tgt = self.get_tgt(
799             mach_creds,
800             expect_client_claims=True,
801             expected_client_claims={
802                 claim0_id: {
803                     'source_type': claims.CLAIMS_SOURCE_TYPE_AD,
804                     'type': claims.CLAIM_TYPE_STRING,
805                     'values': ('foo',),
806                 },
807                 claim1_id: {
808                     'source_type': claims.CLAIMS_SOURCE_TYPE_AD,
809                     'type': claims.CLAIM_TYPE_STRING,
810                     'values': ('foo',),
811                 },
812             })
813         self._get_tgt(client_creds, armor_tgt=armor_tgt,
814                       expected_error=0)
815
816     def test_allowed_to_client_equals(self):
817         client_claim_attr = 'carLicense'
818         client_claim_value = 'foo bar'
819         client_claim_values = client_claim_value,
820
821         client_claim_id = self.get_new_username()
822         self.create_claim(client_claim_id,
823                           enabled=True,
824                           attribute=client_claim_attr,
825                           single_valued=True,
826                           source_type='AD',
827                           for_classes=['user'],
828                           value_type=claims.CLAIM_TYPE_STRING)
829
830         # Create an authentication policy that allows authorization if the
831         # client has a particular claim value.
832         policy = self.create_authn_policy(
833             enforced=True,
834             computer_allowed_to=(
835                 f'O:SYD:(XA;;CR;;;WD;'
836                 f'((@User.{client_claim_id} == "{client_claim_value}")))'),
837         )
838
839         # Create a computer account with the assigned policy.
840         target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
841                                        assigned_policy=policy)
842
843         armor_tgt = self.get_tgt(self._mach_creds)
844
845         # Create a user account without the claim value.
846         client_creds = self.get_cached_creds(
847             account_type=self.AccountType.USER)
848         tgt = self.get_tgt(client_creds)
849         # Show that obtaining a service ticket is denied.
850         self._tgs_req(
851             tgt, KDC_ERR_POLICY, client_creds, target_creds,
852             armor_tgt=armor_tgt,
853             expect_edata=self.expect_padata_outer,
854             # We aren’t particular about whether or not we get an NTSTATUS.
855             expect_status=None,
856             expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
857             check_patypes=False)
858
859         # Create a user account with the claim value.
860         client_creds = self.get_cached_creds(
861             account_type=self.AccountType.USER,
862             opts={
863                 'additional_details': (
864                     (client_claim_attr, client_claim_values),
865                 ),
866             })
867         tgt = self.get_tgt(
868             client_creds,
869             expect_client_claims=True,
870             expected_client_claims={
871                 client_claim_id: {
872                     'source_type': claims.CLAIMS_SOURCE_TYPE_AD,
873                     'type': claims.CLAIM_TYPE_STRING,
874                     'values': client_claim_values,
875                 },
876             })
877         # Show that obtaining a service ticket is allowed.
878         self._tgs_req(tgt, 0, client_creds, target_creds,
879                       armor_tgt=armor_tgt)
880
881     def test_allowed_to_device_equals(self):
882         device_claim_attr = 'carLicense'
883         device_claim_value = 'bar'
884         device_claim_values = device_claim_value,
885
886         device_claim_id = self.get_new_username()
887         self.create_claim(device_claim_id,
888                           enabled=True,
889                           attribute=device_claim_attr,
890                           single_valued=True,
891                           source_type='AD',
892                           for_classes=['computer'],
893                           value_type=claims.CLAIM_TYPE_STRING)
894
895         # Create a user account.
896         client_creds = self.get_cached_creds(
897             account_type=self.AccountType.USER)
898         tgt = self.get_tgt(client_creds)
899
900         # Create an authentication policy that allows authorization if the
901         # device has a particular claim value.
902         policy = self.create_authn_policy(
903             enforced=True,
904             computer_allowed_to=(
905                 f'O:SYD:(XA;;CR;;;WD;'
906                 f'(@Device.{device_claim_id} == "{device_claim_value}"))'),
907         )
908
909         # Create a computer account with the assigned policy.
910         target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
911                                        assigned_policy=policy)
912
913         armor_tgt = self.get_tgt(self._mach_creds)
914         # Show that obtaining a service ticket is denied when the claim value
915         # is not present.
916         self._tgs_req(
917             tgt, KDC_ERR_POLICY, client_creds, target_creds,
918             armor_tgt=armor_tgt,
919             expect_edata=self.expect_padata_outer,
920             # We aren’t particular about whether or not we get an NTSTATUS.
921             expect_status=None,
922             expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
923             check_patypes=False)
924
925         mach_creds = self.get_cached_creds(
926             account_type=self.AccountType.COMPUTER,
927             opts={
928                 'additional_details': (
929                     (device_claim_attr, device_claim_values),
930                 ),
931             })
932         armor_tgt = self.get_tgt(
933             mach_creds,
934             expect_client_claims=True,
935             expected_client_claims={
936                 device_claim_id: {
937                     'source_type': claims.CLAIMS_SOURCE_TYPE_AD,
938                     'type': claims.CLAIM_TYPE_STRING,
939                     'values': device_claim_values,
940                 },
941             })
942         # Show that obtaining a service ticket is allowed when the claim value
943         # is present.
944         self._tgs_req(tgt, 0, client_creds, target_creds,
945                       armor_tgt=armor_tgt)
946
947     claim_against_claim_cases = [
948         # If either side is missing, the result is unknown.
949         ((), '==', (), None),
950         ((), '!=', (), None),
951         ('a', '==', (), None),
952         ((), '==', 'b', None),
953         # Straightforward equality and inequality checks work.
954         ('foo', '==', 'foo', True),
955         ('foo', '==', 'bar', False),
956         ('foo', '!=', 'foo', False),
957         ('foo', '!=', 'bar', True),
958         # We can perform less‐than and greater‐than operations.
959         ('cat', '<', 'dog', True),
960         ('cat', '<=', 'dog', True),
961         ('cat', '>', 'dog', False),
962         ('cat', '>=', 'dog', False),
963         ('foo', '<=', 'foo', True),
964         ('foo', '>=', 'foo', True),
965         ('foo', '<', 'foo bar', True),
966         ('foo bar', '>', 'foo', True),
967         # String comparison is case‐sensitive.
968         ('foo bar', '==', 'Foo BAR', True),
969         ('foo bar', '==', 'FOO BAR', True),
970         ('ćàț', '==', 'ĆÀȚ', True),
971         ('ḽ', '==', 'Ḽ', True),
972         ('ⅸ', '==', 'Ⅸ', True),
973         ('ꙭ', '==', 'Ꙭ', True),
974         ('ⱦ', '==', 'Ⱦ', True),  # Lowercased variant added in Unicode 5.0.
975         ('ԛԣ', '==', 'ԚԢ', True),  # All added in Unicode 5.1.
976         ('foo', '<', 'foo', True),
977         ('ćàș', '<', 'ĆÀȚ', True),
978         ('cat', '<', 'ćàț', True),
979         # This is done by converting to UPPER CASE. Hence, both ‘A’ (U+41) and
980         # ‘a’ (U+61) compare less than ‘_’ (U+5F).
981         ('A', '<', '_', True),
982         ('a', '<', '_', True),
983         # But not all uppercased/lowercased pairs are considered to be equal in
984         # this way.
985         ('ß', '<', 'ẞ', True),
986         ('ß', '>', 'SS', True),
987         ('ⳬ', '>', 'Ⳬ', True),  # Added in Unicode 5.2.
988         ('ʞ', '<', 'Ʞ', True),  # Uppercased variant added in Unicode 6.0.
989         ('ʞ', '<', 'ʟ', True),  # U+029E < U+029F < U+A7B0 (upper variant, Ʞ)
990         ('ꞧ', '>', 'Ꞧ', True),  # Added in Unicode 6.0.
991         ('ɜ', '<', 'Ɜ', True),  # Uppercased variant added in Unicode 7.0.
992         #
993         # Strings are compared as UTF‐16 code units, rather than as Unicode
994         # codepoints. So while you might expect ‘𐀀’ (U+10000) to compare
995         # greater than ‘豈’ (U+F900), it is actually considered to be the
996         # *smaller* of the pair. That is because it is encoded as a sequence of
997         # two code units, 0xd800 and 0xdc00, which combination compares less
998         # than the single code unit 0xf900.
999         ('ퟻ', '<', '𐀀', True),
1000         ('𐀀', '<', '豈', True),
1001         ('ퟻ', '<', '豈', True),
1002         # Composites can be compared.
1003         (('foo', 'bar'), '==', ('foo', 'bar'), True),
1004         (('foo', 'bar'), '==', ('foo', 'baz'), False),
1005         # The individual components don’t have to match in case.
1006         (('foo', 'bar'), '==', ('FOO', 'BAR'), True),
1007         # Nor must they match in order.
1008         (('foo', 'bar'), '==', ('bar', 'foo'), True),
1009         # Composites of different lengths compare unequal.
1010         (('foo', 'bar'), '!=', 'foo', True),
1011         (('foo', 'bar'), '!=', ('foo', 'bar', 'baz'), True),
1012         # But composites don’t have a defined ordering, and aren’t considered
1013         # greater or lesser than one another.
1014         (('foo', 'bar'), '<', ('foo', 'bar'), None),
1015         (('foo', 'bar'), '<=', ('foo', 'bar'), None),
1016         (('foo', 'bar'), '>', ('foo', 'bar', 'baz'), None),
1017         (('foo', 'bar'), '>=', ('foo', 'bar', 'baz'), None),
1018         # We can test for containment.
1019         (('foo', 'bar'), 'Contains', ('FOO'), True),
1020         (('foo', 'bar'), 'Contains', ('foo', 'bar'), True),
1021         (('foo', 'bar'), 'Not_Contains', ('foo', 'bar'), False),
1022         (('foo', 'bar'), 'Contains', ('foo', 'bar', 'baz'), False),
1023         (('foo', 'bar'), 'Not_Contains', ('foo', 'bar', 'baz'), True),
1024         # We can test whether the operands have any elements in common.
1025         ('foo', 'Any_of', 'foo', True),
1026         (('foo', 'bar'), 'Any_of', 'BAR', True),
1027         (('foo', 'bar'), 'Any_of', 'baz', False),
1028         (('foo', 'bar'), 'Not_Any_of', 'baz', True),
1029         (('foo', 'bar'), 'Any_of', ('bar', 'baz'), True),
1030         (('foo', 'bar'), 'Not_Any_of', ('bar', 'baz'), False),
1031     ]
1032
1033     claim_against_literal_cases = [
1034         # String comparisons also work against literals.
1035         ('foo bar', '==', '"foo bar"', True),
1036         # Composites can be compared with literals.
1037         ((), '==', '{{}}', None),
1038         ('foo', '!=', '{{}}', True),
1039         ('bar', '==', '{{"bar"}}', True),
1040         (('apple', 'banana'), '==', '{{"APPLE", "BANANA"}}', True),
1041         (('apple', 'banana'), '==', '{{"BANANA", "APPLE"}}', True),
1042         (('apple', 'banana'), '==', '{{"apple", "banana", "apple"}}', False),
1043         # We can test for containment.
1044         ((), 'Contains', '{{}}', False),
1045         ((), 'Not_Contains', '{{}}', True),
1046         ((), 'Contains', '{{"foo"}}', None),
1047         ((), 'Not_Contains', '{{"foo", "bar"}}', None),
1048         ('foo', 'Contains', '{{}}', False),
1049         ('bar', 'Contains', '{{"bar"}}', True),
1050         (('foo', 'bar'), 'Contains', '{{"foo", "bar"}}', True),
1051         (('foo', 'bar'), 'Contains', '{{"foo", "bar", "baz"}}', False),
1052         # The right‐hand side of Contains or Not_Contains does not have to be a
1053         # composite.
1054         ('foo', 'Contains', '"foo"', True),
1055         (('foo', 'bar'), 'Not_Contains', '"foo"', False),
1056         # It’s fine if the right‐hand side contains duplicate elements.
1057         (('foo', 'bar'), 'Contains', '{{"foo", "bar", "bar"}}', True),
1058         # We can test whether the operands have any elements in common.
1059         ((), 'Any_of', '{{}}', None),
1060         ((), 'Not_Any_of', '{{}}', None),
1061         ('foo', 'Any_of', '{{}}', False),
1062         ('foo', 'Not_Any_of', '{{}}', True),
1063         ('bar', 'Any_of', '{{"bar"}}', True),
1064         (('foo', 'bar'), 'Any_of', '{{"bar", "baz"}}', True),
1065         (('foo', 'bar'), 'Any_of', '{{"baz"}}', False),
1066         # The right‐hand side of Any_of or Not_Any_of must be a composite.
1067         ('foo', 'Any_of', '"foo"', None),
1068         (('foo', 'bar'), 'Not_Any_of', '"baz"', None),
1069         # A string won’t compare equal to a numeric literal.
1070         ('42', '==', '"42"', True),
1071         ('42', '==', '42', None),
1072         # Nor can composites that mismatch in type be compared.
1073         (('123', '456'), '==', '{{"123", "456"}}', True),
1074         (('654', '321'), '==', '{{654, 321}}', None),
1075         (('foo', 'bar'), 'Contains', '{{1, 2, 3}}', None),
1076     ]
1077
1078     def _test_cmp_with_args(self, lhs, op, rhs, outcome, rhs_is_literal=False):
1079         # Construct a conditional ACE expression that evaluates to True if the
1080         # two claim values are equal.
1081         if rhs_is_literal:
1082             self.assertIsInstance(rhs, str)
1083             rhs = rhs.format(self=self)
1084             expression = f'(@User.{self.claim0_id} {op} {rhs})'
1085         else:
1086             expression = f'(@User.{self.claim0_id} {op} @User.{self.claim1_id})'
1087
1088         # Create an authentication policy that will allow authentication when
1089         # the expression is true, and a second that will deny authentication in
1090         # the same circumstance. By observing the results of authenticating
1091         # against each of these policies in turn, we can determine whether the
1092         # expression evaluates to a True, False, or Unknown value.
1093
1094         allowed_sddl = f'O:SYD:(XA;;CR;;;WD;{expression})'
1095         denied_sddl = f'O:SYD:(XD;;CR;;;WD;{expression})(A;;CR;;;WD)'
1096
1097         allowed_policy = self.create_authn_policy(
1098             enforced=True,
1099             user_allowed_from=allowed_sddl)
1100         denied_policy = self.create_authn_policy(
1101             enforced=True,
1102             user_allowed_from=denied_sddl)
1103
1104         # Create a user account assigned to each policy.
1105         allowed_creds = self._get_creds(account_type=self.AccountType.USER,
1106                                         assigned_policy=allowed_policy)
1107         denied_creds = self._get_creds(account_type=self.AccountType.USER,
1108                                        assigned_policy=denied_policy)
1109
1110         additional_details = ()
1111         if lhs:
1112             additional_details += ((self.claim0_attr, lhs),)
1113         if rhs and not rhs_is_literal:
1114             additional_details += ((self.claim1_attr, rhs),)
1115
1116         # Create a computer account with the provided attribute values.
1117         mach_creds = self.get_cached_creds(
1118             account_type=self.AccountType.COMPUTER,
1119             opts={'additional_details': additional_details})
1120
1121         def expected_values(val):
1122             if isinstance(val, (str, bytes)):
1123                 return val,
1124
1125             return val
1126
1127         expected_client_claims = {}
1128         if lhs:
1129             expected_client_claims[self.claim0_id] = {
1130                 'source_type': claims.CLAIMS_SOURCE_TYPE_AD,
1131                 'type': claims.CLAIM_TYPE_STRING,
1132                 'values': expected_values(lhs),
1133             }
1134         if rhs and not rhs_is_literal:
1135             expected_client_claims[self.claim1_id] = {
1136                 'source_type': claims.CLAIMS_SOURCE_TYPE_AD,
1137                 'type': claims.CLAIM_TYPE_STRING,
1138                 'values': expected_values(rhs),
1139             }
1140
1141         # Fetch the computer account’s TGT, and ensure it contains the claims.
1142         armor_tgt = self.get_tgt(
1143             mach_creds,
1144             expect_client_claims=bool(expected_client_claims) or None,
1145             expected_client_claims=expected_client_claims)
1146
1147         # The first or the second authentication request is expected to succeed
1148         # if the outcome is True or False, respectively. An Unknown outcome,
1149         # represented by None, will result in a policy error in either case.
1150         allowed_error = 0 if outcome is True else KDC_ERR_POLICY
1151         denied_error = 0 if outcome is False else KDC_ERR_POLICY
1152
1153         # Attempt to authenticate and ensure that we observe the expected
1154         # results.
1155         self._get_tgt(allowed_creds, armor_tgt=armor_tgt,
1156                       expected_error=allowed_error)
1157         self._get_tgt(denied_creds, armor_tgt=armor_tgt,
1158                       expected_error=denied_error)
1159
1160     pac_claim_cases = [
1161         # Test a very simple expression with various claims.
1162         ([
1163             (claims.CLAIMS_SOURCE_TYPE_AD, [
1164                 ('{non_empty_string}', claims.CLAIM_TYPE_STRING, ['foo bar']),
1165             ]),
1166         ], '{non_empty_string}', True),
1167         ([
1168             (claims.CLAIMS_SOURCE_TYPE_AD, [
1169                 ('{zero_uint}', claims.CLAIM_TYPE_UINT64, [0]),
1170             ]),
1171         ], '{zero_uint}', False),
1172         ([
1173             (claims.CLAIMS_SOURCE_TYPE_AD, [
1174                 ('{nonzero_uint}', claims.CLAIM_TYPE_UINT64, [1]),
1175             ]),
1176         ], '{nonzero_uint}', True),
1177         ([
1178             (claims.CLAIMS_SOURCE_TYPE_AD, [
1179                 ('{zero_uints}', claims.CLAIM_TYPE_UINT64, [0, 0]),
1180             ]),
1181         ], '{zero_uints}', KDC_ERR_GENERIC),
1182         ([
1183             (claims.CLAIMS_SOURCE_TYPE_AD, [
1184                 ('{zero_and_one_uint}', claims.CLAIM_TYPE_UINT64, [0, 1]),
1185             ]),
1186         ], '{zero_and_one_uint}', True),
1187         ([
1188             (claims.CLAIMS_SOURCE_TYPE_AD, [
1189                 ('{one_and_zero_uint}', claims.CLAIM_TYPE_UINT64, [1, 0]),
1190             ]),
1191         ], '{one_and_zero_uint}', True),
1192         ([
1193             (claims.CLAIMS_SOURCE_TYPE_AD, [
1194                 ('{zero_int}', claims.CLAIM_TYPE_INT64, [0]),
1195             ]),
1196         ], '{zero_int}', False),
1197         ([
1198             (claims.CLAIMS_SOURCE_TYPE_AD, [
1199                 ('{nonzero_int}', claims.CLAIM_TYPE_INT64, [1]),
1200             ]),
1201         ], '{nonzero_int}', True),
1202         ([
1203             (claims.CLAIMS_SOURCE_TYPE_AD, [
1204                 ('{zero_ints}', claims.CLAIM_TYPE_INT64, [0, 0]),
1205             ]),
1206         ], '{zero_ints}', KDC_ERR_GENERIC),
1207         ([
1208             (claims.CLAIMS_SOURCE_TYPE_AD, [
1209                 ('{zero_and_one_int}', claims.CLAIM_TYPE_INT64, [0, 1]),
1210             ]),
1211         ], '{zero_and_one_int}', True),
1212         ([
1213             (claims.CLAIMS_SOURCE_TYPE_AD, [
1214                 ('{one_and_zero_int}', claims.CLAIM_TYPE_INT64, [1, 0]),
1215             ]),
1216         ], '{one_and_zero_int}', True),
1217         ([
1218             (claims.CLAIMS_SOURCE_TYPE_AD, [
1219                 ('{false_boolean}', claims.CLAIM_TYPE_BOOLEAN, [0]),
1220             ]),
1221         ], '{false_boolean}', False),
1222         ([
1223             (claims.CLAIMS_SOURCE_TYPE_AD, [
1224                 ('{true_boolean}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1225             ]),
1226         ], '{true_boolean}', True),
1227         ([
1228             (claims.CLAIMS_SOURCE_TYPE_AD, [
1229                 ('{false_booleans}', claims.CLAIM_TYPE_BOOLEAN, [0, 0]),
1230             ]),
1231         ], '{false_booleans}', KDC_ERR_GENERIC),
1232         ([
1233             (claims.CLAIMS_SOURCE_TYPE_AD, [
1234                 ('{false_and_true_boolean}', claims.CLAIM_TYPE_BOOLEAN, [0, 1]),
1235             ]),
1236         ], '{false_and_true_boolean}', True),
1237         ([
1238             (claims.CLAIMS_SOURCE_TYPE_AD, [
1239                 ('{true_and_false_boolean}', claims.CLAIM_TYPE_BOOLEAN, [1, 0]),
1240             ]),
1241         ], '{true_and_false_boolean}', True),
1242         # Test a basic comparison against a literal.
1243         ([
1244             (claims.CLAIMS_SOURCE_TYPE_AD, [
1245                 ('{a}', claims.CLAIM_TYPE_STRING, ['foo bar']),
1246             ]),
1247         ], '{a} == "foo bar"', True),
1248         # Claims can be compared against one another.
1249         ([
1250             (claims.CLAIMS_SOURCE_TYPE_AD, [
1251                 ('{a}', claims.CLAIM_TYPE_STRING, ['foo bar']),
1252                 ('{b}', claims.CLAIM_TYPE_STRING, ['FOO BAR']),
1253             ]),
1254         ], '{a} == {b}', True),
1255         ([
1256             (claims.CLAIMS_SOURCE_TYPE_AD, [
1257                 ('{b}', claims.CLAIM_TYPE_STRING, ['FOO', 'BAR', 'BAZ']),
1258                 ('{a}', claims.CLAIM_TYPE_STRING, ['foo', 'bar', 'baz']),
1259             ]),
1260         ], '{a} != {b}', False),
1261         # Certificate claims are also valid.
1262         ([
1263             (claims.CLAIMS_SOURCE_TYPE_CERTIFICATE, [
1264                 ('{a}', claims.CLAIM_TYPE_STRING, ['foo']),
1265             ]),
1266         ], '{a} == "foo"', True),
1267         # Other claim source types are ignored.
1268         ([
1269             (0, [
1270                 ('{a}', claims.CLAIM_TYPE_STRING, ['foo']),
1271             ]),
1272         ], '{a} == "foo"', None),
1273         ([
1274             (3, [
1275                 ('{a}', claims.CLAIM_TYPE_STRING, ['foo']),
1276             ]),
1277         ], '{a} == "foo"', None),
1278         # If multiple claims have the same ID, the *last* one takes precedence.
1279         ([
1280             (claims.CLAIMS_SOURCE_TYPE_AD, [
1281                 ('{a}', claims.CLAIM_TYPE_STRING, ['this is not the value…']),
1282                 ('{a}', claims.CLAIM_TYPE_STRING, ['…nor is this…']),
1283             ]),
1284             (claims.CLAIMS_SOURCE_TYPE_CERTIFICATE, [
1285                 ('{a}', claims.CLAIM_TYPE_STRING, ['…and this isn’t either.']),
1286             ]),
1287             (claims.CLAIMS_SOURCE_TYPE_CERTIFICATE, [
1288                 ('{a}', claims.CLAIM_TYPE_STRING, ['here’s the actual value!']),
1289             ]),
1290             (3, [
1291                 ('{a}', claims.CLAIM_TYPE_STRING, ['this is a red herring.']),
1292             ]),
1293         ], '{a} == "here’s the actual value!"', True),
1294         # Claim values can be empty.
1295         ([
1296             (claims.CLAIMS_SOURCE_TYPE_AD, [
1297                 ('{empty_claim_string}', claims.CLAIM_TYPE_STRING, []),
1298             ]),
1299         ], '{empty_claim_string} != "foo bar"', None),
1300         ([
1301             (claims.CLAIMS_SOURCE_TYPE_AD, [
1302                 ('{empty_claim_boolean}', claims.CLAIM_TYPE_BOOLEAN, []),
1303             ]),
1304         ], 'Exists {empty_claim_boolean}', None),
1305         # Test unsigned integer equality.
1306         ([
1307             (claims.CLAIMS_SOURCE_TYPE_AD, [
1308                 ('{a}', claims.CLAIM_TYPE_UINT64, [42]),
1309             ]),
1310         ], '{a} == 42', True),
1311         ([
1312             (claims.CLAIMS_SOURCE_TYPE_AD, [
1313                 ('{a}', claims.CLAIM_TYPE_UINT64, [0]),
1314             ]),
1315         ], '{a} == 3', False),
1316         ([
1317             (claims.CLAIMS_SOURCE_TYPE_AD, [
1318                 ('{a}', claims.CLAIM_TYPE_UINT64, [1, 2, 3]),
1319             ]),
1320         ], '{a} == {{1, 2, 3}}', True),
1321         ([
1322             (claims.CLAIMS_SOURCE_TYPE_AD, [
1323                 ('{a}', claims.CLAIM_TYPE_UINT64, [4, 5, 6]),
1324             ]),
1325         ], '{a} != {{1, 2, 3}}', True),
1326         # Test unsigned integer comparison. Ensure we don’t run into any
1327         # integer overflow issues.
1328         ([
1329             (claims.CLAIMS_SOURCE_TYPE_AD, [
1330                 ('{a}', claims.CLAIM_TYPE_UINT64, [1 << 32]),
1331             ]),
1332         ], '{a} > 0', True),
1333         # Test signed integer comparisons.
1334         ([
1335             (claims.CLAIMS_SOURCE_TYPE_AD, [
1336                 ('{a}', claims.CLAIM_TYPE_INT64, [42]),
1337             ]),
1338         ], '{a} == 42', True),
1339         ([
1340             (claims.CLAIMS_SOURCE_TYPE_AD, [
1341                 ('{a}', claims.CLAIM_TYPE_INT64, [42 << 32]),
1342             ]),
1343         ], f'{{a}} == {42 << 32}', True),
1344         # Test boolean claims. Be careful! Windows will *crash* if you send it
1345         # claims that aren’t real booleans (not 0 or 1). I doubt Microsoft will
1346         # consider this a security issue though.
1347         ([
1348             (claims.CLAIMS_SOURCE_TYPE_AD, [
1349                 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [2]),
1350                 ('{b}', claims.CLAIM_TYPE_BOOLEAN, [3]),
1351             ]),
1352         ], '{a} == {b}', (None, CRASHES_WINDOWS)),
1353         ([
1354             (claims.CLAIMS_SOURCE_TYPE_AD, [
1355                 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1356                 ('{b}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1357             ]),
1358         ], '{a} == {b}', True),
1359         ([
1360             (claims.CLAIMS_SOURCE_TYPE_AD, [
1361                 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1362             ]),
1363         ], '{a} == 42', None),
1364         ([
1365             (claims.CLAIMS_SOURCE_TYPE_AD, [
1366                 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1367                 ('{b}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1368             ]),
1369         ], '{a} && {b}', True),
1370         ([
1371             (claims.CLAIMS_SOURCE_TYPE_AD, [
1372                 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [0]),
1373                 ('{b}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1374             ]),
1375         ], '{a} && {b}', False),
1376         ([
1377             (claims.CLAIMS_SOURCE_TYPE_AD, [
1378                 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [0]),
1379                 ('{b}', claims.CLAIM_TYPE_BOOLEAN, [0]),
1380             ]),
1381         ], '{a} && {b}', False),
1382         ([
1383             (claims.CLAIMS_SOURCE_TYPE_AD, [
1384                 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1385                 ('{b}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1386             ]),
1387         ], '{a} || {b}', True),
1388         ([
1389             (claims.CLAIMS_SOURCE_TYPE_AD, [
1390                 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1391                 ('{b}', claims.CLAIM_TYPE_BOOLEAN, [0]),
1392             ]),
1393         ], '{a} || {b}', True),
1394         ([
1395             (claims.CLAIMS_SOURCE_TYPE_AD, [
1396                 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [0]),
1397                 ('{b}', claims.CLAIM_TYPE_BOOLEAN, [0]),
1398             ]),
1399         ], '{a} || {b}', False),
1400         ([
1401             (claims.CLAIMS_SOURCE_TYPE_AD, [
1402                 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [0]),
1403             ]),
1404         ], '!({a})', True),
1405         ([
1406             (claims.CLAIMS_SOURCE_TYPE_AD, [
1407                 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [0]),
1408             ]),
1409         ], '!(!(!(!({a}))))', False),
1410         ([
1411             (claims.CLAIMS_SOURCE_TYPE_AD, [
1412                 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [0]),
1413             ]),
1414         ], '!({a} && {a})', True),
1415         ([
1416             (claims.CLAIMS_SOURCE_TYPE_AD, [
1417                 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1418                 ('{b}', claims.CLAIM_TYPE_BOOLEAN, [0]),
1419             ]),
1420         ], '{a} && !({b} || {b})', True),
1421         ([
1422             (claims.CLAIMS_SOURCE_TYPE_AD, [
1423                 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [0]),
1424             ]),
1425         ], '!({a}) || !({a})', True),
1426         ([
1427             (claims.CLAIMS_SOURCE_TYPE_AD, [
1428                 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1429                 ('{b}', claims.CLAIM_TYPE_BOOLEAN, [0]),
1430             ]),
1431         ], '{a} && !({b})', None),
1432         # Expressions containing the ‘not’ operator are occasionally evaluated
1433         # inconsistently, as evidenced here. ‘a || !a’ evaluates to ‘unknown’…
1434         ([
1435             (claims.CLAIMS_SOURCE_TYPE_AD, [
1436                 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1437             ]),
1438         ], '{a} || !({a})', None),
1439         # …but ‘!a || a’ — the same expression, just with the operands switched
1440         # round — evaluates to ‘true’.
1441         ([
1442             (claims.CLAIMS_SOURCE_TYPE_AD, [
1443                 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1444             ]),
1445         ], '!({a}) || {a}', True),
1446         # This inconsistency is not observed with other boolean expressions,
1447         # such as ‘a || a’.
1448         ([
1449             (claims.CLAIMS_SOURCE_TYPE_AD, [
1450                 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1451             ]),
1452         ], '{a} || ({a} || {a})', True),
1453         ([
1454             (claims.CLAIMS_SOURCE_TYPE_AD, [
1455                 ('{b}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1456             ]),
1457         ], '({b} || {b}) || {b}', True),
1458         # Test a very large claim. Much larger than this, and
1459         # conditional_ace_encode_binary() will refuse to encode the conditions.
1460         ([
1461             (claims.CLAIMS_SOURCE_TYPE_AD, [
1462                 ('{large_claim}', claims.CLAIM_TYPE_STRING, ['z' * 4900]),
1463             ]),
1464         ], f'{{large_claim}} == "{"z" * 4900}"', True),
1465         # Test an even larger claim. Windows does not appear to like receiving
1466         # a claim this large.
1467         ([
1468             (claims.CLAIMS_SOURCE_TYPE_AD, [
1469                 ('{larger_claim}', claims.CLAIM_TYPE_STRING, ['z' * 100000]),
1470             ]),
1471         ], '{larger_claim} > "z"', (True, CRASHES_WINDOWS)),
1472         # Test a great number of claims. Windows does not appear to like
1473         # receiving this many claims.
1474         ([
1475             (claims.CLAIMS_SOURCE_TYPE_AD, [
1476                 ('{many_claims}', claims.CLAIM_TYPE_UINT64,
1477                  list(range(0, 100000))),
1478             ]),
1479         ], '{many_claims} Any_of "99999"', (True, CRASHES_WINDOWS)),
1480         # Test a claim with a very long name. Much larger than this, and
1481         # conditional_ace_encode_binary() will refuse to encode the conditions.
1482         ([
1483             (claims.CLAIMS_SOURCE_TYPE_AD, [
1484                 ('{long_name}', claims.CLAIM_TYPE_STRING, ['a']),
1485             ]),
1486         ], '{long_name} == "a"', {'long_name': 'z' * 4900}, True),
1487         # Test attribute name escaping.
1488         ([
1489             (claims.CLAIMS_SOURCE_TYPE_AD, [
1490                 ('{escaped_claim}', claims.CLAIM_TYPE_STRING, ['claim value']),
1491             ]),
1492         ], '{escaped_claim} == "claim value"',
1493            {'escaped_claim': '(:foo:! /&/ :bar:!)'}, True),
1494         # Test a claim whose name consists entirely of dots.
1495         ([
1496             (claims.CLAIMS_SOURCE_TYPE_AD, [
1497                 ('{dotty_claim}', claims.CLAIM_TYPE_STRING, ['a']),
1498             ]),
1499         ], '{dotty_claim} == "a"', {'dotty_claim': '...'}, True),
1500         # Test a claim whose name consists of the first thousand non‐zero
1501         # Unicode codepoints.
1502         ([
1503             (claims.CLAIMS_SOURCE_TYPE_AD, [
1504                 ('{1000_unicode}', claims.CLAIM_TYPE_STRING, ['a']),
1505             ]),
1506         ], '{1000_unicode} == "a"',
1507            {'1000_unicode': ''.join(map(chr, range(1, 1001)))}, True),
1508         # Test a claim whose name consists of some higher Unicode codepoints,
1509         # including non‐BMP ones.
1510         ([
1511             (claims.CLAIMS_SOURCE_TYPE_AD, [
1512                 ('{higher_unicode}', claims.CLAIM_TYPE_STRING, ['a']),
1513             ]),
1514         ], '{higher_unicode} == "a"',
1515            {'higher_unicode': ''.join(map(chr, range(0xfe00, 0x10800)))}, True),
1516         # Duplicate claim values are not allowed…
1517         ([
1518             (claims.CLAIMS_SOURCE_TYPE_AD, [
1519                 ('{a}', claims.CLAIM_TYPE_INT64, [42, 42, 42]),
1520             ]),
1521         ], '{a} == {a}', KDC_ERR_GENERIC),
1522         ([
1523             (claims.CLAIMS_SOURCE_TYPE_AD, [
1524                 ('{a}', claims.CLAIM_TYPE_UINT64, [42, 42]),
1525             ]),
1526         ], '{a} == {a}', KDC_ERR_GENERIC),
1527         ([
1528             (claims.CLAIMS_SOURCE_TYPE_AD, [
1529                 ('{a}', claims.CLAIM_TYPE_STRING, ['foo', 'foo']),
1530             ]),
1531         ], '{a} == {a}', KDC_ERR_GENERIC),
1532         ([
1533             (claims.CLAIMS_SOURCE_TYPE_AD, [
1534                 ('{a}', claims.CLAIM_TYPE_STRING, ['FOO', 'foo']),
1535             ]),
1536         ], '{a} == {a}', KDC_ERR_GENERIC),
1537         ([
1538             (claims.CLAIMS_SOURCE_TYPE_AD, [
1539                 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [0, 0]),
1540             ]),
1541         ], '{a} == {a}', KDC_ERR_GENERIC),
1542         # …but it’s OK if duplicate values are spread across multiple claim
1543         # entries.
1544         ([
1545             (claims.CLAIMS_SOURCE_TYPE_AD, [
1546                 ('{dup}', claims.CLAIM_TYPE_STRING, ['foo']),
1547                 ('{dup}', claims.CLAIM_TYPE_STRING, ['foo']),
1548             ]),
1549             (claims.CLAIMS_SOURCE_TYPE_CERTIFICATE, [
1550                 ('{dup}', claims.CLAIM_TYPE_UINT64, [42]),
1551                 ('{dup}', claims.CLAIM_TYPE_UINT64, [42]),
1552             ]),
1553             (claims.CLAIMS_SOURCE_TYPE_CERTIFICATE, [
1554                 ('{dup}', claims.CLAIM_TYPE_STRING, ['foo']),
1555                 ('{dup}', claims.CLAIM_TYPE_STRING, ['foo']),
1556                 ('{dup}', claims.CLAIM_TYPE_STRING, ['foo', 'bar']),
1557                 ('{dup}', claims.CLAIM_TYPE_STRING, ['foo', 'bar']),
1558             ]),
1559         ], '{dup} == {dup}', True),
1560         # Test invalid claim types. Be careful! Windows will *crash* if you
1561         # send it invalid claim types. I doubt Microsoft will consider this a
1562         # security issue though.
1563         ([
1564             (claims.CLAIMS_SOURCE_TYPE_AD, [
1565                 ('{invalid_sid}', 5, []),
1566             ]),
1567         ], '{invalid_sid} == {invalid_sid}', (None, CRASHES_WINDOWS)),
1568         ([
1569             (claims.CLAIMS_SOURCE_TYPE_AD, [
1570                 ('{invalid_octet_string}', 16, []),
1571             ]),
1572         ], '{invalid_octet_string} == {invalid_octet_string}', (None, CRASHES_WINDOWS)),
1573         # Sending an empty string will crash Windows.
1574         ([
1575             (claims.CLAIMS_SOURCE_TYPE_AD, [
1576                 ('{empty_string}', claims.CLAIM_TYPE_STRING, ['']),
1577             ]),
1578         ], '{empty_string}', (None, CRASHES_WINDOWS)),
1579         # But sending empty arrays is OK.
1580         ([
1581             (claims.CLAIMS_SOURCE_TYPE_AD, [
1582                 ('{empty_array}', claims.CLAIM_TYPE_INT64, []),
1583                 ('{empty_array}', claims.CLAIM_TYPE_UINT64, []),
1584                 ('{empty_array}', claims.CLAIM_TYPE_BOOLEAN, []),
1585                 ('{empty_array}', claims.CLAIM_TYPE_STRING, []),
1586             ]),
1587         ], '{empty_array}', None),
1588     ]
1589
1590     def _test_pac_claim_cmp_with_args(self,
1591                                       pac_claims,
1592                                       expression,
1593                                       claim_map,
1594                                       outcome):
1595         self.assertIsInstance(expression, str)
1596
1597         try:
1598             outcome, crashes_windows = outcome
1599             self.assertIs(crashes_windows, CRASHES_WINDOWS)
1600             if not self.crash_windows:
1601                 self.skipTest('test crashes Windows servers')
1602         except TypeError:
1603             self.assertIsNot(outcome, CRASHES_WINDOWS)
1604
1605         if claim_map is None:
1606             claim_map = {}
1607
1608         claim_ids = {}
1609
1610         def get_claim_id(claim_name):
1611             claim = claim_ids.get(claim_name)
1612             if claim is None:
1613                 claim = claim_map.pop(claim_name, None)
1614                 if claim is None:
1615                     claim = self.get_new_username()
1616
1617                 claim_ids[claim_name] = claim
1618
1619             return claim
1620
1621         def formatted_claim_expression(expr):
1622             formatter = Formatter()
1623             result = []
1624
1625             for literal_text, field_name, format_spec, conversion in (
1626                     formatter.parse(expr)):
1627                 self.assertFalse(format_spec,
1628                                  f'format specifier ({format_spec}) should '
1629                                  f'not be specified')
1630                 self.assertFalse(conversion,
1631                                  f'conversion ({conversion}) should not be '
1632                                  'specified')
1633
1634                 result.append(literal_text)
1635
1636                 if field_name is not None:
1637                     self.assertTrue(field_name,
1638                                     'a field name should be specified')
1639
1640                     claim_id = get_claim_id(field_name)
1641                     claim_id = escaped_claim_id(claim_id)
1642                     result.append(f'@User.{claim_id}')
1643
1644             return ''.join(result)
1645
1646         # Construct the conditional ACE expression.
1647         expression = formatted_claim_expression(expression)
1648
1649         self.assertFalse(claim_map, 'unused claim mapping(s) remain')
1650
1651         # Create an authentication policy that will allow authentication when
1652         # the expression is true, and a second that will deny authentication in
1653         # the same circumstance. By observing the results of authenticating
1654         # against each of these policies in turn, we can determine whether the
1655         # expression evaluates to a True, False, or Unknown value.
1656
1657         allowed_sddl = f'O:SYD:(XA;;CR;;;WD;({expression}))'
1658         denied_sddl = f'O:SYD:(XD;;CR;;;WD;({expression}))(A;;CR;;;WD)'
1659
1660         allowed_policy = self.create_authn_policy(
1661             enforced=True,
1662             user_allowed_from=allowed_sddl)
1663         denied_policy = self.create_authn_policy(
1664             enforced=True,
1665             user_allowed_from=denied_sddl)
1666
1667         # Create a user account assigned to each policy.
1668         allowed_creds = self._get_creds(account_type=self.AccountType.USER,
1669                                         assigned_policy=allowed_policy)
1670         denied_creds = self._get_creds(account_type=self.AccountType.USER,
1671                                        assigned_policy=denied_policy)
1672
1673         # Create a computer account.
1674         mach_creds = self.get_cached_creds(
1675             account_type=self.AccountType.COMPUTER)
1676
1677         def expected_values(val):
1678             if isinstance(val, (str, bytes)):
1679                 return val,
1680
1681             return val
1682
1683         # Fetch the computer account’s TGT.
1684         armor_tgt = self.get_tgt(mach_creds)
1685
1686         if pac_claims:
1687             # Replace the claims in the PAC with our own.
1688             armor_tgt = self.modified_ticket(
1689                 armor_tgt,
1690                 modify_pac_fn=partial(self.set_pac_claims,
1691                                       client_claims=pac_claims,
1692                                       claim_ids=claim_ids),
1693                 checksum_keys=self.get_krbtgt_checksum_key())
1694
1695         # The first or the second authentication request is expected to succeed
1696         # if the outcome is True or False, respectively. An Unknown outcome,
1697         # represented by None, will result in a policy error in either case.
1698         if outcome is True:
1699             allowed_error, denied_error = 0, KDC_ERR_POLICY
1700         elif outcome is False:
1701             allowed_error, denied_error = KDC_ERR_POLICY, 0
1702         elif outcome is None:
1703             allowed_error, denied_error = KDC_ERR_POLICY, KDC_ERR_POLICY
1704         else:
1705             allowed_error, denied_error = outcome, outcome
1706
1707         # Attempt to authenticate and ensure that we observe the expected
1708         # results.
1709         self._get_tgt(allowed_creds, armor_tgt=armor_tgt,
1710                       expected_error=allowed_error)
1711         self._get_tgt(denied_creds, armor_tgt=armor_tgt,
1712                       expected_error=denied_error)
1713
1714     def test_rbcd_without_aa_asserted_identity(self):
1715         service_sids = {
1716             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
1717             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
1718         }
1719
1720         self._rbcd(f'Member_of SID({self.aa_asserted_identity})',
1721                    service_sids=service_sids,
1722                    code=KDC_ERR_BADOPTION,
1723                    status=ntstatus.NT_STATUS_UNSUCCESSFUL,
1724                    edata=self.expect_padata_outer)
1725
1726         self._rbcd(target_policy=f'Member_of SID({self.aa_asserted_identity})',
1727                    service_sids=service_sids,
1728                    code=KDC_ERR_POLICY,
1729                    status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
1730                    event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
1731                    reason=AuditReason.ACCESS_DENIED,
1732                    edata=self.expect_padata_outer)
1733
1734     def test_rbcd_with_aa_asserted_identity(self):
1735         service_sids = {
1736             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
1737             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
1738             (self.aa_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
1739         }
1740
1741         expected_groups = service_sids | {
1742             (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
1743         }
1744
1745         self._rbcd(f'Member_of SID({self.aa_asserted_identity})',
1746                    service_sids=service_sids,
1747                    expected_groups=expected_groups)
1748
1749         self._rbcd(target_policy=f'Member_of SID({self.aa_asserted_identity})',
1750                    service_sids=service_sids,
1751                    expected_groups=expected_groups)
1752
1753     def test_rbcd_without_service_asserted_identity(self):
1754         service_sids = {
1755             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
1756             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
1757         }
1758
1759         self._rbcd(f'Member_of SID({self.service_asserted_identity})',
1760                    service_sids=service_sids,
1761                    code=KDC_ERR_BADOPTION,
1762                    status=ntstatus.NT_STATUS_UNSUCCESSFUL,
1763                    edata=self.expect_padata_outer)
1764
1765         self._rbcd(target_policy=f'Member_of SID({self.service_asserted_identity})',
1766                    service_sids=service_sids,
1767                    code=KDC_ERR_POLICY,
1768                    status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
1769                    event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
1770                    reason=AuditReason.ACCESS_DENIED,
1771                    edata=self.expect_padata_outer)
1772
1773     def test_rbcd_with_service_asserted_identity(self):
1774         service_sids = {
1775             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
1776             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
1777             (self.service_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
1778         }
1779
1780         expected_groups = {
1781             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
1782             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
1783             # The Application Authority Asserted Identity SID has replaced the
1784             # Service Asserted Identity SID.
1785             (self.aa_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
1786             (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
1787         }
1788
1789         self._rbcd(f'Member_of SID({self.service_asserted_identity})',
1790                    service_sids=service_sids,
1791                    expected_groups=expected_groups)
1792
1793         self._rbcd(target_policy=f'Member_of SID({self.service_asserted_identity})',
1794                    service_sids=service_sids,
1795                    expected_groups=expected_groups)
1796
1797     def test_rbcd_without_claims_valid(self):
1798         service_sids = {
1799             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
1800             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
1801         }
1802
1803         self._rbcd(f'Member_of SID({security.SID_CLAIMS_VALID})',
1804                    service_sids=service_sids,
1805                    code=KDC_ERR_BADOPTION,
1806                    status=ntstatus.NT_STATUS_UNSUCCESSFUL,
1807                    edata=self.expect_padata_outer)
1808
1809         self._rbcd(target_policy=f'Member_of SID({security.SID_CLAIMS_VALID})',
1810                    service_sids=service_sids,
1811                    code=KDC_ERR_POLICY,
1812                    status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
1813                    event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
1814                    reason=AuditReason.ACCESS_DENIED,
1815                    edata=self.expect_padata_outer)
1816
1817     def test_rbcd_with_claims_valid(self):
1818         service_sids = {
1819             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
1820             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
1821             (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
1822         }
1823
1824         expected_groups = service_sids | {
1825             (self.aa_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
1826         }
1827
1828         self._rbcd(f'Member_of SID({security.SID_CLAIMS_VALID})',
1829                    service_sids=service_sids,
1830                    expected_groups=expected_groups)
1831
1832         self._rbcd(target_policy=f'Member_of SID({security.SID_CLAIMS_VALID})',
1833                    service_sids=service_sids,
1834                    expected_groups=expected_groups)
1835
1836     def test_rbcd_without_compounded_authentication(self):
1837         service_sids = {
1838             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
1839             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
1840         }
1841
1842         self._rbcd(f'Member_of SID({security.SID_COMPOUNDED_AUTHENTICATION})',
1843                    service_sids=service_sids,
1844                    code=KDC_ERR_BADOPTION,
1845                    status=ntstatus.NT_STATUS_UNSUCCESSFUL,
1846                    edata=self.expect_padata_outer)
1847
1848         self._rbcd(target_policy=f'Member_of SID({security.SID_COMPOUNDED_AUTHENTICATION})',
1849                    service_sids=service_sids,
1850                    code=KDC_ERR_POLICY,
1851                    status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
1852                    event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
1853                    reason=AuditReason.ACCESS_DENIED,
1854                    edata=self.expect_padata_outer)
1855
1856     def test_rbcd_with_compounded_authentication(self):
1857         service_sids = {
1858             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
1859             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
1860             (security.SID_COMPOUNDED_AUTHENTICATION, SidType.EXTRA_SID, self.default_attrs),
1861         }
1862
1863         expected_groups = {
1864             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
1865             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
1866             (self.aa_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
1867             (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
1868         }
1869
1870         self._rbcd(f'Member_of SID({security.SID_COMPOUNDED_AUTHENTICATION})',
1871                    service_sids=service_sids,
1872                    expected_groups=expected_groups)
1873
1874         self._rbcd(target_policy=f'Member_of SID({security.SID_COMPOUNDED_AUTHENTICATION})',
1875                    service_sids=service_sids,
1876                    expected_groups=expected_groups)
1877
1878     def test_rbcd_client_without_aa_asserted_identity(self):
1879         client_sids = {
1880             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
1881             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
1882         }
1883
1884         self._rbcd(f'Member_of SID({self.aa_asserted_identity})',
1885                    client_sids=client_sids)
1886
1887         self._rbcd(target_policy=f'Member_of SID({self.aa_asserted_identity})',
1888                    client_sids=client_sids)
1889
1890     def test_rbcd_client_with_aa_asserted_identity(self):
1891         client_sids = {
1892             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
1893             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
1894             (self.aa_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
1895         }
1896
1897         self._rbcd(f'Member_of SID({self.aa_asserted_identity})',
1898                    client_sids=client_sids,
1899                    expected_groups=client_sids)
1900
1901         self._rbcd(target_policy=f'Member_of SID({self.aa_asserted_identity})',
1902                    client_sids=client_sids,
1903                    expected_groups=client_sids)
1904
1905     def test_rbcd_client_without_service_asserted_identity(self):
1906         client_sids = {
1907             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
1908             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
1909         }
1910
1911         self._rbcd(f'Member_of SID({self.service_asserted_identity})',
1912                    client_sids=client_sids,
1913                    code=KDC_ERR_BADOPTION,
1914                    status=ntstatus.NT_STATUS_UNSUCCESSFUL,
1915                    edata=self.expect_padata_outer)
1916
1917         self._rbcd(target_policy=f'Member_of SID({self.service_asserted_identity})',
1918                    client_sids=client_sids,
1919                    code=KDC_ERR_POLICY,
1920                    status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
1921                    event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
1922                    reason=AuditReason.ACCESS_DENIED,
1923                    edata=self.expect_padata_outer)
1924
1925     def test_rbcd_client_with_service_asserted_identity(self):
1926         client_sids = {
1927             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
1928             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
1929             (self.service_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
1930         }
1931
1932         self._rbcd(f'Not_Member_of SID({self.service_asserted_identity})',
1933                    client_sids=client_sids,
1934                    expected_groups=client_sids)
1935
1936         self._rbcd(target_policy=f'Not_Member_of SID({self.service_asserted_identity})',
1937                    client_sids=client_sids,
1938                    expected_groups=client_sids)
1939
1940     def test_rbcd_client_without_claims_valid(self):
1941         client_sids = {
1942             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
1943             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
1944         }
1945
1946         self._rbcd(f'Member_of SID({security.SID_CLAIMS_VALID})',
1947                    client_sids=client_sids)
1948
1949         self._rbcd(target_policy=f'Member_of SID({security.SID_CLAIMS_VALID})',
1950                    client_sids=client_sids)
1951
1952     def test_rbcd_client_with_claims_valid(self):
1953         client_sids = {
1954             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
1955             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
1956             (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
1957         }
1958
1959         self._rbcd(f'Member_of SID({security.SID_CLAIMS_VALID})',
1960                    client_sids=client_sids,
1961                    expected_groups=client_sids)
1962
1963         self._rbcd(target_policy=f'Member_of SID({security.SID_CLAIMS_VALID})',
1964                    client_sids=client_sids,
1965                    expected_groups=client_sids)
1966
1967     def test_rbcd_client_without_compounded_authentication(self):
1968         client_sids = {
1969             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
1970             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
1971         }
1972
1973         self._rbcd(f'Member_of SID({security.SID_COMPOUNDED_AUTHENTICATION})',
1974                    client_sids=client_sids,
1975                    code=KDC_ERR_BADOPTION,
1976                    status=ntstatus.NT_STATUS_UNSUCCESSFUL,
1977                    edata=self.expect_padata_outer)
1978
1979         self._rbcd(target_policy=f'Member_of SID({security.SID_COMPOUNDED_AUTHENTICATION})',
1980                    client_sids=client_sids,
1981                    code=KDC_ERR_POLICY,
1982                    status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
1983                    event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
1984                    reason=AuditReason.ACCESS_DENIED,
1985                    edata=self.expect_padata_outer)
1986
1987     def test_rbcd_client_with_compounded_authentication(self):
1988         client_sids = {
1989             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
1990             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
1991             (security.SID_COMPOUNDED_AUTHENTICATION, SidType.EXTRA_SID, self.default_attrs),
1992         }
1993
1994         self._rbcd(f'Not_Member_of SID({security.SID_COMPOUNDED_AUTHENTICATION})',
1995                    client_sids=client_sids,
1996                    expected_groups=client_sids)
1997
1998         self._rbcd(target_policy=f'Not_Member_of SID({security.SID_COMPOUNDED_AUTHENTICATION})',
1999                    client_sids=client_sids,
2000                    expected_groups=client_sids)
2001
2002     def test_rbcd_device_without_aa_asserted_identity(self):
2003         device_sids = {
2004             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2005             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2006         }
2007
2008         self._rbcd(f'Device_Member_of SID({self.aa_asserted_identity})',
2009                    device_sids=device_sids,
2010                    code=KDC_ERR_BADOPTION,
2011                    status=ntstatus.NT_STATUS_UNSUCCESSFUL,
2012                    edata=self.expect_padata_outer)
2013
2014         self._rbcd(target_policy=f'Device_Member_of SID({self.aa_asserted_identity})',
2015                    device_sids=device_sids,
2016                    code=KDC_ERR_POLICY,
2017                    status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
2018                    event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
2019                    reason=AuditReason.ACCESS_DENIED,
2020                    edata=self.expect_padata_outer)
2021
2022     def test_rbcd_device_without_aa_asserted_identity_not_memberof(self):
2023         device_sids = {
2024             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2025             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2026         }
2027
2028         self._rbcd(f'Not_Device_Member_of SID({self.aa_asserted_identity})',
2029                    device_sids=device_sids)
2030
2031         self._rbcd(target_policy=f'Not_Device_Member_of SID({self.aa_asserted_identity})',
2032                    device_sids=device_sids)
2033
2034     def test_rbcd_device_with_aa_asserted_identity(self):
2035         device_sids = {
2036             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2037             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2038             (self.aa_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
2039         }
2040
2041         self._rbcd(f'Device_Member_of SID({self.aa_asserted_identity})',
2042                    device_sids=device_sids)
2043
2044         self._rbcd(target_policy=f'Device_Member_of SID({self.aa_asserted_identity})',
2045                    device_sids=device_sids)
2046
2047     def test_rbcd_device_without_service_asserted_identity(self):
2048         device_sids = {
2049             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2050             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2051         }
2052
2053         self._rbcd(f'Device_Member_of SID({self.service_asserted_identity})',
2054                    device_sids=device_sids,
2055                    code=KDC_ERR_BADOPTION,
2056                    status=ntstatus.NT_STATUS_UNSUCCESSFUL,
2057                    edata=self.expect_padata_outer)
2058
2059         self._rbcd(target_policy=f'Device_Member_of SID({self.service_asserted_identity})',
2060                    device_sids=device_sids,
2061                    code=KDC_ERR_POLICY,
2062                    status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
2063                    event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
2064                    reason=AuditReason.ACCESS_DENIED,
2065                    edata=self.expect_padata_outer)
2066
2067     def test_rbcd_device_with_service_asserted_identity(self):
2068         device_sids = {
2069             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2070             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2071             (self.service_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
2072         }
2073
2074         self._rbcd(f'Device_Member_of SID({self.service_asserted_identity})',
2075                    device_sids=device_sids)
2076
2077         self._rbcd(target_policy=f'Device_Member_of SID({self.service_asserted_identity})',
2078                    device_sids=device_sids)
2079
2080     def test_rbcd_device_without_claims_valid(self):
2081         device_sids = {
2082             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2083             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2084         }
2085
2086         self._rbcd(f'Device_Member_of SID({security.SID_CLAIMS_VALID})',
2087                    device_sids=device_sids,
2088                    code=KDC_ERR_BADOPTION,
2089                    status=ntstatus.NT_STATUS_UNSUCCESSFUL,
2090                    edata=self.expect_padata_outer)
2091
2092         self._rbcd(target_policy=f'Device_Member_of SID({security.SID_CLAIMS_VALID})',
2093                    device_sids=device_sids,
2094                    code=KDC_ERR_POLICY,
2095                    status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
2096                    event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
2097                    reason=AuditReason.ACCESS_DENIED,
2098                    edata=self.expect_padata_outer)
2099
2100     def test_rbcd_device_with_claims_valid(self):
2101         device_sids = {
2102             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2103             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2104             (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
2105         }
2106
2107         self._rbcd(f'Device_Member_of SID({security.SID_CLAIMS_VALID})',
2108                    device_sids=device_sids)
2109
2110         self._rbcd(target_policy=f'Device_Member_of SID({security.SID_CLAIMS_VALID})',
2111                    device_sids=device_sids)
2112
2113     def test_rbcd_device_without_compounded_authentication(self):
2114         device_sids = {
2115             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2116             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2117         }
2118
2119         self._rbcd(f'Device_Member_of SID({security.SID_COMPOUNDED_AUTHENTICATION})',
2120                    device_sids=device_sids,
2121                    code=KDC_ERR_BADOPTION,
2122                    status=ntstatus.NT_STATUS_UNSUCCESSFUL,
2123                    edata=self.expect_padata_outer)
2124
2125         self._rbcd(target_policy=f'Device_Member_of SID({security.SID_COMPOUNDED_AUTHENTICATION})',
2126                    device_sids=device_sids,
2127                    code=KDC_ERR_POLICY,
2128                    status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
2129                    event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
2130                    reason=AuditReason.ACCESS_DENIED,
2131                    edata=self.expect_padata_outer)
2132
2133     def test_rbcd_device_with_compounded_authentication(self):
2134         device_sids = {
2135             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2136             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2137             (security.SID_COMPOUNDED_AUTHENTICATION, SidType.EXTRA_SID, self.default_attrs),
2138         }
2139
2140         self._rbcd(f'Device_Member_of SID({security.SID_COMPOUNDED_AUTHENTICATION})',
2141                    device_sids=device_sids)
2142
2143         self._rbcd(target_policy=f'Device_Member_of SID({security.SID_COMPOUNDED_AUTHENTICATION})',
2144                    device_sids=device_sids)
2145
2146     def test_rbcd(self):
2147         self._rbcd('Member_of SID({service_sid})')
2148
2149     def test_rbcd_device_from_rodc(self):
2150         self._rbcd('Member_of SID({service_sid})',
2151                    device_from_rodc=True,
2152                    code=(0, CRASHES_WINDOWS))
2153
2154     def test_rbcd_service_from_rodc(self):
2155         self._rbcd('Member_of SID({service_sid})',
2156                    service_from_rodc=True)
2157
2158     def test_rbcd_device_and_service_from_rodc(self):
2159         self._rbcd('Member_of SID({service_sid})',
2160                    service_from_rodc=True,
2161                    device_from_rodc=True,
2162                    code=(0, CRASHES_WINDOWS))
2163
2164     def test_rbcd_client_from_rodc(self):
2165         self._rbcd('Member_of SID({service_sid})',
2166                    client_from_rodc=True)
2167
2168     def test_rbcd_client_and_device_from_rodc(self):
2169         self._rbcd('Member_of SID({service_sid})',
2170                    client_from_rodc=True,
2171                    device_from_rodc=True,
2172                    code=(0, CRASHES_WINDOWS))
2173
2174     def test_rbcd_client_and_service_from_rodc(self):
2175         self._rbcd('Member_of SID({service_sid})',
2176                    client_from_rodc=True,
2177                    service_from_rodc=True)
2178
2179     def test_rbcd_all_from_rodc(self):
2180         self._rbcd('Member_of SID({service_sid})',
2181                    client_from_rodc=True,
2182                    service_from_rodc=True,
2183                    device_from_rodc=True,
2184                    code=(0, CRASHES_WINDOWS))
2185
2186     def test_delegating_proxy_in_world_group_rbcd(self):
2187         self._check_delegating_proxy_in_group_rbcd(security.SID_WORLD)
2188
2189     def test_delegating_proxy_in_network_group_rbcd(self):
2190         self._check_delegating_proxy_not_in_group_rbcd(security.SID_NT_NETWORK)
2191
2192     def test_delegating_proxy_in_authenticated_users_rbcd(self):
2193         self._check_delegating_proxy_in_group_rbcd(
2194             security.SID_NT_AUTHENTICATED_USERS)
2195
2196     def test_delegating_proxy_in_aa_asserted_identity_rbcd(self):
2197         self._check_delegating_proxy_in_group_rbcd(
2198             security.SID_AUTHENTICATION_AUTHORITY_ASSERTED_IDENTITY)
2199
2200     def test_delegating_proxy_in_service_asserted_identity_rbcd(self):
2201         self._check_delegating_proxy_not_in_group_rbcd(
2202             security.SID_SERVICE_ASSERTED_IDENTITY)
2203
2204     def test_delegating_proxy_in_compounded_authentication_rbcd(self):
2205         self._check_delegating_proxy_not_in_group_rbcd(
2206             security.SID_COMPOUNDED_AUTHENTICATION)
2207
2208     def test_delegating_proxy_in_claims_valid_rbcd(self):
2209         self._check_delegating_proxy_in_group_rbcd(security.SID_CLAIMS_VALID)
2210
2211     def test_device_in_world_group_rbcd(self):
2212         self._check_device_in_group_rbcd(security.SID_WORLD)
2213
2214     def test_device_in_network_group_rbcd(self):
2215         self._check_device_not_in_group_rbcd(security.SID_NT_NETWORK)
2216
2217     def test_device_in_authenticated_users_rbcd(self):
2218         self._check_device_in_group_rbcd(security.SID_NT_AUTHENTICATED_USERS)
2219
2220     def test_device_in_aa_asserted_identity_rbcd(self):
2221         self._check_device_in_group_rbcd(
2222             security.SID_AUTHENTICATION_AUTHORITY_ASSERTED_IDENTITY)
2223
2224     def test_device_in_service_asserted_identity_rbcd(self):
2225         self._check_device_not_in_group_rbcd(
2226             security.SID_SERVICE_ASSERTED_IDENTITY)
2227
2228     def test_device_in_compounded_authentication_rbcd(self):
2229         self._check_device_not_in_group_rbcd(
2230             security.SID_COMPOUNDED_AUTHENTICATION)
2231
2232     def test_device_in_claims_valid_rbcd(self):
2233         self._check_device_in_group_rbcd(security.SID_CLAIMS_VALID)
2234
2235     def _check_delegating_proxy_in_group_rbcd(self, group):
2236         self._check_membership_rbcd(group, expect_in_group=True)
2237
2238     def _check_delegating_proxy_not_in_group_rbcd(self, group):
2239         self._check_membership_rbcd(group, expect_in_group=False)
2240
2241     def _check_device_in_group_rbcd(self, group):
2242         self._check_membership_rbcd(group, expect_in_group=True, device=True)
2243
2244     def _check_device_not_in_group_rbcd(self, group):
2245         self._check_membership_rbcd(group, expect_in_group=False, device=True)
2246
2247     def _check_membership_rbcd(self,
2248                                group,
2249                                *,
2250                                expect_in_group,
2251                                device=False):
2252         """Test that authentication succeeds or fails when the delegating proxy
2253         is required to belong to a certain group.
2254         """
2255
2256         sddl_op = 'Device_Member_of' if device else 'Member_of'
2257
2258         samdb = self.get_samdb()
2259         functional_level = self.get_domain_functional_level(samdb)
2260
2261         if functional_level < dsdb.DS_DOMAIN_FUNCTION_2008:
2262             self.skipTest('RBCD requires FL2008')
2263
2264         # Create a machine account with which to perform FAST.
2265         mach_creds = self.get_cached_creds(
2266             account_type=self.AccountType.COMPUTER,
2267             opts={'id': 'device'})
2268         mach_tgt = self.get_tgt(mach_creds)
2269
2270         # Create a user account.
2271         client_creds = self._get_creds(account_type=self.AccountType.USER)
2272
2273         client_tkt_options = 'forwardable'
2274         expected_flags = krb5_asn1.TicketFlags(client_tkt_options)
2275         client_tgt = self.get_tgt(client_creds,
2276                                   kdc_options=client_tkt_options,
2277                                   expected_flags=expected_flags)
2278
2279         client_sid = client_creds.get_sid()
2280
2281         client_username = client_creds.get_username()
2282         client_cname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
2283                                                  names=[client_username])
2284
2285         service_creds = self.get_cached_creds(
2286             account_type=self.AccountType.COMPUTER,
2287             opts={'id': 'service'})
2288         service_tgt = self.get_tgt(service_creds)
2289
2290         client_service_tkt = self.get_service_ticket(
2291             client_tgt,
2292             service_creds,
2293             kdc_options=client_tkt_options,
2294             expected_flags=expected_flags)
2295
2296         domain_sid_str = samdb.get_domain_sid()
2297         domain_sid = security.dom_sid(domain_sid_str)
2298
2299         # Require the principal to belong to a certain group.
2300         in_group_sddl = self.allow_if(f'{sddl_op} {{SID({group})}}')
2301         in_group_descriptor = security.descriptor.from_sddl(in_group_sddl,
2302                                                             domain_sid)
2303
2304         # Create a target account that allows RBCD if the principal belongs to
2305         # the group.
2306         in_group_target_creds = self.get_cached_creds(
2307             account_type=self.AccountType.COMPUTER,
2308             opts={
2309                 'additional_details': (
2310                     ('msDS-AllowedToActOnBehalfOfOtherIdentity',
2311                      ndr_pack(in_group_descriptor)),
2312                 ),
2313             })
2314
2315         kdc_options = str(krb5_asn1.KDCOptions('cname-in-addl-tkt'))
2316
2317         in_group_target_key = self.TicketDecryptionKey_from_creds(
2318             in_group_target_creds)
2319         in_group_target_etypes = in_group_target_creds.tgs_supported_enctypes
2320
2321         service_name = service_creds.get_username()
2322         if service_name[-1] == '$':
2323             service_name = service_name[:-1]
2324         expected_transited_services = [
2325             f'host/{service_name}@{service_creds.get_realm()}'
2326         ]
2327
2328         pac_options = '1001'  # supports claims, RBCD
2329
2330         success_result = 0, None, None
2331         failure_result = (
2332             KDC_ERR_BADOPTION,
2333             ntstatus.NT_STATUS_UNSUCCESSFUL,
2334             self.expect_padata_outer,
2335         )
2336
2337         code, status, expect_edata = (success_result if expect_in_group
2338                                       else failure_result)
2339
2340         # Test whether obtaining a service ticket with RBCD is allowed.
2341         self._tgs_req(service_tgt,
2342                       code,
2343                       service_creds,
2344                       in_group_target_creds,
2345                       armor_tgt=mach_tgt,
2346                       kdc_options=kdc_options,
2347                       pac_options=pac_options,
2348                       expected_cname=client_cname,
2349                       expected_account_name=client_username,
2350                       additional_ticket=client_service_tkt,
2351                       decryption_key=in_group_target_key,
2352                       expected_sid=client_sid,
2353                       expected_supported_etypes=in_group_target_etypes,
2354                       expected_proxy_target=in_group_target_creds.get_spn(),
2355                       expected_transited_services=expected_transited_services,
2356                       expected_status=status,
2357                       expect_edata=expect_edata)
2358
2359         effective_client_creds = service_creds if code else client_creds
2360         self.check_tgs_log(effective_client_creds, in_group_target_creds,
2361                            checked_creds=service_creds,
2362                            status=status)
2363
2364         # Require the principal not to belong to a certain group.
2365         not_in_group_sddl = self.allow_if(f'Not_{sddl_op} {{SID({group})}}')
2366         not_in_group_descriptor = security.descriptor.from_sddl(
2367             not_in_group_sddl, domain_sid)
2368
2369         # Create a target account that allows RBCD if the principal does not
2370         # belong to the group.
2371         not_in_group_target_creds = self.get_cached_creds(
2372             account_type=self.AccountType.COMPUTER,
2373             opts={
2374                 'additional_details': (
2375                     ('msDS-AllowedToActOnBehalfOfOtherIdentity',
2376                      ndr_pack(not_in_group_descriptor)),
2377                 ),
2378             })
2379
2380         not_in_group_target_key = self.TicketDecryptionKey_from_creds(
2381             not_in_group_target_creds)
2382         not_in_group_target_etypes = (
2383             not_in_group_target_creds.tgs_supported_enctypes)
2384
2385         code, status, expect_edata = (failure_result if expect_in_group
2386                                       else success_result)
2387
2388         # Test whether obtaining a service ticket with RBCD is allowed.
2389         self._tgs_req(service_tgt,
2390                       code,
2391                       service_creds,
2392                       not_in_group_target_creds,
2393                       armor_tgt=mach_tgt,
2394                       kdc_options=kdc_options,
2395                       pac_options=pac_options,
2396                       expected_cname=client_cname,
2397                       expected_account_name=client_username,
2398                       additional_ticket=client_service_tkt,
2399                       decryption_key=not_in_group_target_key,
2400                       expected_sid=client_sid,
2401                       expected_supported_etypes=not_in_group_target_etypes,
2402                       expected_proxy_target=not_in_group_target_creds.get_spn(),
2403                       expected_transited_services=expected_transited_services,
2404                       expected_status=status,
2405                       expect_edata=expect_edata)
2406
2407         effective_client_creds = service_creds if code else client_creds
2408         self.check_tgs_log(effective_client_creds, not_in_group_target_creds,
2409                            checked_creds=service_creds,
2410                            status=status)
2411
2412     def _rbcd(self,
2413               rbcd_expression=None,
2414               *,
2415               code=0,
2416               status=None,
2417               event=AuditEvent.OK,
2418               reason=AuditReason.NONE,
2419               edata=False,
2420               target_policy=None,
2421               client_from_rodc=False,
2422               service_from_rodc=False,
2423               device_from_rodc=False,
2424               client_sids=None,
2425               client_claims=None,
2426               service_sids=None,
2427               service_claims=None,
2428               device_sids=None,
2429               device_claims=None,
2430               expected_groups=None,
2431               expected_claims=None):
2432         try:
2433             code, crashes_windows = code
2434             self.assertIs(crashes_windows, CRASHES_WINDOWS)
2435             if not self.crash_windows:
2436                 self.skipTest('test crashes Windows servers')
2437         except TypeError:
2438             self.assertIsNot(code, CRASHES_WINDOWS)
2439
2440         samdb = self.get_samdb()
2441         functional_level = self.get_domain_functional_level(samdb)
2442
2443         if functional_level < dsdb.DS_DOMAIN_FUNCTION_2008:
2444             self.skipTest('RBCD requires FL2008')
2445
2446         domain_sid_str = samdb.get_domain_sid()
2447         domain_sid = security.dom_sid(domain_sid_str)
2448
2449         client_creds = self.get_cached_creds(
2450             account_type=self.AccountType.USER,
2451             opts={
2452                 'allowed_replication_mock': client_from_rodc,
2453                 'revealed_to_mock_rodc': client_from_rodc,
2454             })
2455         client_sid = client_creds.get_sid()
2456
2457         client_username = client_creds.get_username()
2458         client_cname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
2459                                                  names=[client_username])
2460
2461         client_tkt_options = 'forwardable'
2462         expected_flags = krb5_asn1.TicketFlags(client_tkt_options)
2463
2464         checksum_key = self.get_krbtgt_checksum_key()
2465
2466         if client_from_rodc or service_from_rodc or device_from_rodc:
2467             rodc_krbtgt_creds = self.get_mock_rodc_krbtgt_creds()
2468             rodc_krbtgt_key = self.TicketDecryptionKey_from_creds(rodc_krbtgt_creds)
2469             rodc_checksum_key = {
2470                 krb5pac.PAC_TYPE_KDC_CHECKSUM: rodc_krbtgt_key,
2471             }
2472
2473         client_tgt = self.get_tgt(client_creds,
2474                                   kdc_options=client_tkt_options,
2475                                   expected_flags=expected_flags)
2476
2477         # Create a machine account with which to perform FAST.
2478         mach_creds = self.get_cached_creds(
2479             account_type=self.AccountType.COMPUTER,
2480             opts={
2481                 'allowed_replication_mock': device_from_rodc,
2482                 'revealed_to_mock_rodc': device_from_rodc,
2483             })
2484         mach_tgt = self.get_tgt(mach_creds)
2485         device_modify_pac_fn = []
2486         if device_sids is not None:
2487             device_modify_pac_fn.append(partial(self.set_pac_sids,
2488                                                 new_sids=device_sids))
2489         if device_claims is not None:
2490             device_modify_pac_fn.append(partial(self.set_pac_claims,
2491                                                 client_claims=device_claims))
2492         mach_tgt = self.modified_ticket(
2493             mach_tgt,
2494             modify_pac_fn=device_modify_pac_fn,
2495             new_ticket_key=rodc_krbtgt_key if device_from_rodc else None,
2496             checksum_keys=rodc_checksum_key if device_from_rodc else checksum_key)
2497
2498         service_creds = self.get_cached_creds(
2499             account_type=self.AccountType.COMPUTER,
2500             opts={
2501                 'id': 1,
2502                 'allowed_replication_mock': service_from_rodc,
2503                 'revealed_to_mock_rodc': service_from_rodc,
2504             })
2505         service_tgt = self.get_tgt(service_creds)
2506
2507         service_modify_pac_fn = []
2508         if service_sids is not None:
2509             service_modify_pac_fn.append(partial(self.set_pac_sids,
2510                                                  new_sids=service_sids))
2511         if service_claims is not None:
2512             service_modify_pac_fn.append(partial(self.set_pac_claims,
2513                                                  client_claims=service_claims))
2514         service_tgt = self.modified_ticket(
2515             service_tgt,
2516             modify_pac_fn=service_modify_pac_fn,
2517             new_ticket_key=rodc_krbtgt_key if service_from_rodc else None,
2518             checksum_keys=rodc_checksum_key if service_from_rodc else checksum_key)
2519
2520         if target_policy is None:
2521             policy = None
2522             assigned_policy = None
2523         else:
2524             sddl = f'O:SYD:(XA;;CR;;;WD;({target_policy.format(service_sid=service_creds.get_sid())}))'
2525             policy = self.create_authn_policy(enforced=True,
2526                                               computer_allowed_to=sddl)
2527             assigned_policy = str(policy.dn)
2528
2529         if rbcd_expression is not None:
2530             sddl = f'O:SYD:(XA;;CR;;;WD;({rbcd_expression.format(service_sid=service_creds.get_sid())}))'
2531         else:
2532             sddl = 'O:SYD:(A;;CR;;;WD)'
2533         descriptor = security.descriptor.from_sddl(sddl, domain_sid)
2534         descriptor = ndr_pack(descriptor)
2535
2536         # Create a target account with the assigned policy.
2537         target_creds = self.get_cached_creds(
2538             account_type=self.AccountType.COMPUTER,
2539             opts={
2540                 'assigned_policy': assigned_policy,
2541                 'additional_details': (
2542                     ('msDS-AllowedToActOnBehalfOfOtherIdentity', descriptor),
2543                 ),
2544             })
2545
2546         client_service_tkt = self.get_service_ticket(
2547             client_tgt,
2548             service_creds,
2549             kdc_options=client_tkt_options,
2550             expected_flags=expected_flags)
2551         client_modify_pac_fn = []
2552         if client_sids is not None:
2553             client_modify_pac_fn.append(partial(self.set_pac_sids,
2554                                                 new_sids=client_sids))
2555         if client_claims is not None:
2556             client_modify_pac_fn.append(partial(self.set_pac_claims,
2557                                                 client_claims=client_claims))
2558         client_service_tkt = self.modified_ticket(client_service_tkt,
2559                                                   modify_pac_fn=client_modify_pac_fn,
2560                                                   checksum_keys=rodc_checksum_key if client_from_rodc else checksum_key)
2561
2562         kdc_options = str(krb5_asn1.KDCOptions('cname-in-addl-tkt'))
2563
2564         target_decryption_key = self.TicketDecryptionKey_from_creds(
2565             target_creds)
2566         target_etypes = target_creds.tgs_supported_enctypes
2567
2568         service_name = service_creds.get_username()
2569         if service_name[-1] == '$':
2570             service_name = service_name[:-1]
2571         expected_transited_services = [
2572             f'host/{service_name}@{service_creds.get_realm()}'
2573         ]
2574
2575         expected_groups = self.map_sids(expected_groups, None, domain_sid_str)
2576
2577         # Show that obtaining a service ticket with RBCD is allowed.
2578         self._tgs_req(service_tgt, code, service_creds, target_creds,
2579                       armor_tgt=mach_tgt,
2580                       kdc_options=kdc_options,
2581                       pac_options='1001',  # supports claims, RBCD
2582                       expected_cname=client_cname,
2583                       expected_account_name=client_username,
2584                       additional_ticket=client_service_tkt,
2585                       decryption_key=target_decryption_key,
2586                       expected_sid=client_sid,
2587                       expected_groups=expected_groups,
2588                       expect_client_claims=bool(expected_claims) or None,
2589                       expected_client_claims=expected_claims,
2590                       expected_supported_etypes=target_etypes,
2591                       expected_proxy_target=target_creds.get_spn(),
2592                       expected_transited_services=expected_transited_services,
2593                       expected_status=status,
2594                       expect_edata=edata)
2595
2596         if code:
2597             effective_client_creds = service_creds
2598         else:
2599             effective_client_creds = client_creds
2600
2601         self.check_tgs_log(effective_client_creds, target_creds,
2602                            policy=policy,
2603                            checked_creds=service_creds,
2604                            status=status,
2605                            event=event,
2606                            reason=reason)
2607
2608     def test_tgs_claims_valid_missing(self):
2609         """Test that the Claims Valid SID is not added to the PAC when
2610         performing a TGS‐REQ."""
2611         client_sids = {
2612             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2613             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2614             (self.aa_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
2615         }
2616
2617         self._tgs(use_fast=False,
2618                   client_sids=client_sids,
2619                   expected_groups=client_sids)
2620
2621     def test_tgs_claims_valid_missing_from_rodc(self):
2622         """Test that the Claims Valid SID *is* added to the PAC when
2623         performing a TGS‐REQ with an RODC‐issued TGT."""
2624         client_sids = {
2625             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2626             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2627             (self.aa_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
2628         }
2629
2630         expected_groups = client_sids | {
2631             (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
2632         }
2633
2634         self._tgs(use_fast=False,
2635                   client_from_rodc=True,
2636                   client_sids=client_sids,
2637                   expected_groups=expected_groups)
2638
2639     def test_tgs_aa_asserted_identity(self):
2640         """Test performing a TGS‐REQ with the Authentication Identity Asserted
2641         Identity SID present."""
2642         client_sids = {
2643             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2644             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2645             (self.aa_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
2646             (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
2647         }
2648
2649         self._tgs(use_fast=False,
2650                   client_sids=client_sids,
2651                   expected_groups=client_sids)
2652
2653     def test_tgs_aa_asserted_identity_no_attrs(self):
2654         """Test performing a TGS‐REQ with the Authentication Identity Asserted
2655         Identity SID present, albeit without any attributes."""
2656         client_sids = {
2657             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2658             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2659             # Put the Asserted Identity SID in the PAC without any flags set.
2660             (self.aa_asserted_identity, SidType.EXTRA_SID, 0),
2661             (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
2662         }
2663
2664         self._tgs(use_fast=False,
2665                   client_sids=client_sids,
2666                   expected_groups=client_sids)
2667
2668     def test_tgs_aa_asserted_identity_from_rodc(self):
2669         """Test that the Authentication Identity Asserted Identity SID in an
2670         RODC‐issued PAC is preserved when performing a TGS‐REQ."""
2671         client_sids = {
2672             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2673             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2674             (self.aa_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
2675             (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
2676         }
2677
2678         self._tgs(use_fast=False,
2679                   client_from_rodc=True,
2680                   client_sids=client_sids,
2681                   expected_groups=client_sids)
2682
2683     def test_tgs_aa_asserted_identity_from_rodc_no_attrs_from_rodc(self):
2684         """Test that the Authentication Identity Asserted Identity SID without
2685         attributes in an RODC‐issued PAC is preserved when performing a
2686         TGS‐REQ."""
2687         client_sids = {
2688             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2689             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2690             # Put the Asserted Identity SID in the PAC without any flags set.
2691             (self.aa_asserted_identity, SidType.EXTRA_SID, 0),
2692             (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
2693         }
2694
2695         expected_groups = {
2696             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2697             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2698             # The SID in the resulting PAC has the default attributes.
2699             (self.aa_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
2700             (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
2701         }
2702
2703         self._tgs(use_fast=False,
2704                   client_from_rodc=True,
2705                   client_sids=client_sids,
2706                   expected_groups=expected_groups)
2707
2708     def test_tgs_compound_authentication(self):
2709         """Test performing a TGS‐REQ with the Compounded Authentication SID
2710         present."""
2711         client_sids = {
2712             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2713             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2714             (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
2715             (security.SID_COMPOUNDED_AUTHENTICATION, SidType.EXTRA_SID, self.default_attrs),
2716         }
2717
2718         self._tgs(use_fast=False,
2719                   client_sids=client_sids,
2720                   expected_groups=client_sids)
2721
2722     def test_tgs_compound_authentication_from_rodc(self):
2723         """Test that the Compounded Authentication SID in an
2724         RODC‐issued PAC is not preserved when performing a TGS‐REQ."""
2725         client_sids = {
2726             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2727             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2728             (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
2729             (security.SID_COMPOUNDED_AUTHENTICATION, SidType.EXTRA_SID, self.default_attrs),
2730         }
2731
2732         expected_groups = {
2733             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2734             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2735             (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
2736         }
2737
2738         self._tgs(use_fast=False,
2739                   client_from_rodc=True,
2740                   client_sids=client_sids,
2741                   expected_groups=expected_groups)
2742
2743     def test_tgs_asserted_identity_missing(self):
2744         """Test that the Authentication Identity Asserted Identity SID is not
2745         added to the PAC when performing a TGS‐REQ."""
2746         client_sids = {
2747             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2748             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2749             (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
2750         }
2751
2752         self._tgs(use_fast=False,
2753                   client_sids=client_sids,
2754                   expected_groups=client_sids)
2755
2756     def test_tgs_asserted_identity_missing_from_rodc(self):
2757         """Test that the Authentication Identity Asserted Identity SID is not
2758         added to an RODC‐issued PAC when performing a TGS‐REQ."""
2759         client_sids = {
2760             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2761             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2762             (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
2763         }
2764
2765         self._tgs(use_fast=False,
2766                   client_from_rodc=True,
2767                   client_sids=client_sids,
2768                   expected_groups=client_sids)
2769
2770     def test_tgs_service_asserted_identity(self):
2771         """Test performing a TGS‐REQ with the Service Asserted Identity SID
2772         present."""
2773         client_sids = {
2774             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2775             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2776             (self.service_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
2777             (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
2778         }
2779
2780         self._tgs(use_fast=False,
2781                   client_sids=client_sids,
2782                   expected_groups=client_sids)
2783
2784     def test_tgs_service_asserted_identity_from_rodc(self):
2785         """Test that the Service Asserted Identity SID in an
2786         RODC‐issued PAC is not preserved when performing a TGS‐REQ."""
2787         client_sids = {
2788             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2789             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2790             (self.service_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
2791             (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
2792         }
2793
2794         expected_groups = {
2795             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2796             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2797             # Don’t expect the Service Asserted Identity SID.
2798             (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
2799         }
2800
2801         self._tgs(use_fast=False,
2802                   client_from_rodc=True,
2803                   client_sids=client_sids,
2804                   expected_groups=expected_groups)
2805
2806     def test_tgs_without_aa_asserted_identity(self):
2807         client_sids = {
2808             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2809             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2810         }
2811
2812         self._tgs(f'Member_of SID({self.aa_asserted_identity})',
2813                   client_sids=client_sids,
2814                   code=KDC_ERR_POLICY,
2815                   status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
2816                   event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
2817                   reason=AuditReason.ACCESS_DENIED,
2818                   edata=self.expect_padata_outer)
2819
2820     def test_tgs_without_aa_asserted_identity_client_from_rodc(self):
2821         client_sids = {
2822             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2823             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2824         }
2825
2826         self._tgs(f'Member_of SID({self.aa_asserted_identity})',
2827                   client_from_rodc=True,
2828                   client_sids=client_sids,
2829                   code=KDC_ERR_POLICY,
2830                   status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
2831                   event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
2832                   reason=AuditReason.ACCESS_DENIED,
2833                   edata=self.expect_padata_outer)
2834
2835     def test_tgs_without_aa_asserted_identity_device_from_rodc(self):
2836         client_sids = {
2837             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2838             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2839         }
2840
2841         self._tgs(f'Member_of SID({self.aa_asserted_identity})',
2842                   device_from_rodc=True,
2843                   client_sids=client_sids,
2844                   code=(KDC_ERR_POLICY, CRASHES_WINDOWS),
2845                   status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
2846                   event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
2847                   reason=AuditReason.ACCESS_DENIED,
2848                   edata=self.expect_padata_outer)
2849
2850     def test_tgs_without_aa_asserted_identity_both_from_rodc(self):
2851         client_sids = {
2852             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2853             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2854         }
2855
2856         self._tgs(f'Member_of SID({self.aa_asserted_identity})',
2857                   client_from_rodc=True,
2858                   device_from_rodc=True,
2859                   client_sids=client_sids,
2860                   code=(KDC_ERR_POLICY, CRASHES_WINDOWS),
2861                   status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
2862                   event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
2863                   reason=AuditReason.ACCESS_DENIED,
2864                   edata=self.expect_padata_outer)
2865
2866     def test_tgs_with_aa_asserted_identity(self):
2867         client_sids = {
2868             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2869             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2870             (self.aa_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
2871         }
2872
2873         self._tgs(f'Member_of SID({self.aa_asserted_identity})',
2874                   client_sids=client_sids,
2875                   expected_groups=client_sids)
2876
2877     def test_tgs_with_aa_asserted_identity_client_from_rodc(self):
2878         client_sids = {
2879             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2880             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2881             (self.aa_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
2882         }
2883
2884         expected_groups = client_sids | {
2885             (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
2886         }
2887
2888         self._tgs(f'Member_of SID({self.aa_asserted_identity})',
2889                   client_from_rodc=True,
2890                   client_sids=client_sids,
2891                   expected_groups=expected_groups)
2892
2893     def test_tgs_with_aa_asserted_identity_device_from_rodc(self):
2894         client_sids = {
2895             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2896             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2897             (self.aa_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
2898         }
2899
2900         self._tgs(f'Member_of SID({self.aa_asserted_identity})',
2901                   device_from_rodc=True,
2902                   client_sids=client_sids,
2903                   expected_groups=client_sids,
2904                   code=(0, CRASHES_WINDOWS))
2905
2906     def test_tgs_with_aa_asserted_identity_both_from_rodc(self):
2907         client_sids = {
2908             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2909             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2910             (self.aa_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
2911         }
2912
2913         expected_groups = client_sids | {
2914             (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
2915         }
2916
2917         self._tgs(f'Member_of SID({self.aa_asserted_identity})',
2918                   client_from_rodc=True,
2919                   device_from_rodc=True,
2920                   client_sids=client_sids,
2921                   expected_groups=expected_groups,
2922                   code=(0, CRASHES_WINDOWS))
2923
2924     def test_tgs_without_service_asserted_identity(self):
2925         client_sids = {
2926             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2927             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2928         }
2929
2930         self._tgs(f'Member_of SID({self.service_asserted_identity})',
2931                   client_sids=client_sids,
2932                   code=KDC_ERR_POLICY,
2933                   status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
2934                   event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
2935                   reason=AuditReason.ACCESS_DENIED,
2936                   edata=self.expect_padata_outer)
2937
2938     def test_tgs_without_service_asserted_identity_client_from_rodc(self):
2939         client_sids = {
2940             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2941             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2942         }
2943
2944         self._tgs(f'Member_of SID({self.service_asserted_identity})',
2945                   client_from_rodc=True,
2946                   client_sids=client_sids,
2947                   code=KDC_ERR_POLICY,
2948                   status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
2949                   event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
2950                   reason=AuditReason.ACCESS_DENIED,
2951                   edata=self.expect_padata_outer)
2952
2953     def test_tgs_without_service_asserted_identity_device_from_rodc(self):
2954         client_sids = {
2955             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2956             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2957         }
2958
2959         self._tgs(f'Member_of SID({self.service_asserted_identity})',
2960                   device_from_rodc=True,
2961                   client_sids=client_sids,
2962                   code=(KDC_ERR_POLICY, CRASHES_WINDOWS),
2963                   status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
2964                   event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
2965                   reason=AuditReason.ACCESS_DENIED,
2966                   edata=self.expect_padata_outer)
2967
2968     def test_tgs_without_service_asserted_identity_both_from_rodc(self):
2969         client_sids = {
2970             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2971             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2972         }
2973
2974         self._tgs(f'Member_of SID({self.service_asserted_identity})',
2975                   client_from_rodc=True,
2976                   device_from_rodc=True,
2977                   client_sids=client_sids,
2978                   code=(KDC_ERR_POLICY, CRASHES_WINDOWS),
2979                   status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
2980                   event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
2981                   reason=AuditReason.ACCESS_DENIED,
2982                   edata=self.expect_padata_outer)
2983
2984     def test_tgs_with_service_asserted_identity(self):
2985         client_sids = {
2986             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2987             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2988             (self.service_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
2989         }
2990
2991         self._tgs(f'Member_of SID({self.service_asserted_identity})',
2992                   client_sids=client_sids,
2993                   expected_groups=client_sids)
2994
2995     def test_tgs_with_service_asserted_identity_client_from_rodc(self):
2996         client_sids = {
2997             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2998             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2999             (self.service_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
3000         }
3001
3002         self._tgs(f'Member_of SID({self.service_asserted_identity})',
3003                   client_from_rodc=True,
3004                   client_sids=client_sids,
3005                   code=KDC_ERR_POLICY,
3006                   status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
3007                   event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
3008                   reason=AuditReason.ACCESS_DENIED,
3009                   edata=self.expect_padata_outer)
3010
3011     def test_tgs_with_service_asserted_identity_device_from_rodc(self):
3012         client_sids = {
3013             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
3014             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
3015             (self.service_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
3016         }
3017
3018         self._tgs(f'Member_of SID({self.service_asserted_identity})',
3019                   device_from_rodc=True,
3020                   client_sids=client_sids,
3021                   expected_groups=client_sids,
3022                   code=(0, CRASHES_WINDOWS))
3023
3024     def test_tgs_with_service_asserted_identity_both_from_rodc(self):
3025         client_sids = {
3026             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
3027             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
3028             (self.service_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
3029         }
3030
3031         self._tgs(f'Member_of SID({self.service_asserted_identity})',
3032                   client_from_rodc=True,
3033                   device_from_rodc=True,
3034                   client_sids=client_sids,
3035                   code=(KDC_ERR_POLICY, CRASHES_WINDOWS),
3036                   status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
3037                   event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
3038                   reason=AuditReason.ACCESS_DENIED,
3039                   edata=self.expect_padata_outer)
3040
3041     def test_tgs_without_claims_valid(self):
3042         client_sids = {
3043             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
3044             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
3045         }
3046
3047         self._tgs(f'Member_of SID({security.SID_CLAIMS_VALID})',
3048                   client_sids=client_sids,
3049                   code=KDC_ERR_POLICY,
3050                   status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
3051                   event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
3052                   reason=AuditReason.ACCESS_DENIED,
3053                   edata=self.expect_padata_outer)
3054
3055     def test_tgs_without_claims_valid_client_from_rodc(self):
3056         client_sids = {
3057             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
3058             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
3059         }
3060
3061         expected_groups = client_sids | {
3062             (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
3063         }
3064
3065         self._tgs(f'Member_of SID({security.SID_CLAIMS_VALID})',
3066                   client_from_rodc=True,
3067                   client_sids=client_sids,
3068                   expected_groups=expected_groups)
3069
3070     def test_tgs_without_claims_valid_device_from_rodc(self):
3071         client_sids = {
3072             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
3073             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
3074         }
3075
3076         self._tgs(f'Member_of SID({security.SID_CLAIMS_VALID})',
3077                   device_from_rodc=True,
3078                   client_sids=client_sids,
3079                   code=(KDC_ERR_POLICY, CRASHES_WINDOWS),
3080                   status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
3081                   event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
3082                   reason=AuditReason.ACCESS_DENIED,
3083                   edata=self.expect_padata_outer)
3084
3085     def test_tgs_without_claims_valid_both_from_rodc(self):
3086         client_sids = {
3087             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
3088             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
3089         }
3090
3091         expected_groups = client_sids | {
3092             (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
3093         }
3094
3095         self._tgs(f'Member_of SID({security.SID_CLAIMS_VALID})',
3096                   client_from_rodc=True,
3097                   device_from_rodc=True,
3098                   client_sids=client_sids,
3099                   expected_groups=expected_groups,
3100                   code=(0, CRASHES_WINDOWS))
3101
3102     def test_tgs_with_claims_valid(self):
3103         client_sids = {
3104             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
3105             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
3106             (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
3107         }
3108
3109         self._tgs(f'Member_of SID({security.SID_CLAIMS_VALID})',
3110                   client_sids=client_sids,
3111                   expected_groups=client_sids)
3112
3113     def test_tgs_with_claims_valid_client_from_rodc(self):
3114         client_sids = {
3115             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
3116             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
3117             (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
3118         }
3119
3120         self._tgs(f'Member_of SID({security.SID_CLAIMS_VALID})',
3121                   client_from_rodc=True,
3122                   client_sids=client_sids,
3123                   expected_groups=client_sids)
3124
3125     def test_tgs_with_claims_valid_device_from_rodc(self):
3126         client_sids = {
3127             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
3128             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
3129             (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
3130         }
3131
3132         self._tgs(f'Member_of SID({security.SID_CLAIMS_VALID})',
3133                   device_from_rodc=True,
3134                   client_sids=client_sids,
3135                   expected_groups=client_sids,
3136                   code=(0, CRASHES_WINDOWS))
3137
3138     def test_tgs_with_claims_valid_both_from_rodc(self):
3139         client_sids = {
3140             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
3141             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
3142             (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
3143         }
3144
3145         self._tgs(f'Member_of SID({security.SID_CLAIMS_VALID})',
3146                   client_from_rodc=True,
3147                   device_from_rodc=True,
3148                   client_sids=client_sids,
3149                   expected_groups=client_sids,
3150                   code=(0, CRASHES_WINDOWS))
3151
3152     def _tgs(self,
3153              target_policy=None,
3154              *,
3155              code=0,
3156              event=AuditEvent.OK,
3157              reason=AuditReason.NONE,
3158              status=None,
3159              edata=False,
3160              use_fast=True,
3161              client_from_rodc=None,
3162              device_from_rodc=None,
3163              client_sids=None,
3164              client_claims=None,
3165              device_sids=None,
3166              device_claims=None,
3167              expected_groups=None,
3168              expected_claims=None):
3169         try:
3170             code, crashes_windows = code
3171             self.assertIs(crashes_windows, CRASHES_WINDOWS)
3172             if not self.crash_windows:
3173                 self.skipTest('test crashes Windows servers')
3174         except TypeError:
3175             self.assertIsNot(code, CRASHES_WINDOWS)
3176
3177         if not use_fast:
3178             self.assertIsNone(device_from_rodc)
3179             self.assertIsNone(device_sids)
3180             self.assertIsNone(device_claims)
3181
3182         if client_from_rodc is None:
3183             client_from_rodc = False
3184
3185         if device_from_rodc is None:
3186             device_from_rodc = False
3187
3188         client_creds = self.get_cached_creds(
3189             account_type=self.AccountType.USER,
3190             opts={
3191                 'allowed_replication_mock': client_from_rodc,
3192                 'revealed_to_mock_rodc': client_from_rodc,
3193             })
3194         client_sid = client_creds.get_sid()
3195
3196         client_username = client_creds.get_username()
3197         client_cname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
3198                                                  names=[client_username])
3199
3200         client_tkt_options = 'forwardable'
3201         expected_flags = krb5_asn1.TicketFlags(client_tkt_options)
3202
3203         checksum_key = self.get_krbtgt_checksum_key()
3204
3205         if client_from_rodc or device_from_rodc:
3206             rodc_krbtgt_creds = self.get_mock_rodc_krbtgt_creds()
3207             rodc_krbtgt_key = self.TicketDecryptionKey_from_creds(rodc_krbtgt_creds)
3208             rodc_checksum_key = {
3209                 krb5pac.PAC_TYPE_KDC_CHECKSUM: rodc_krbtgt_key,
3210             }
3211
3212         client_tgt = self.get_tgt(client_creds,
3213                                   kdc_options=client_tkt_options,
3214                                   expected_flags=expected_flags)
3215
3216         client_modify_pac_fn = []
3217         if client_sids is not None:
3218             client_modify_pac_fn.append(partial(self.set_pac_sids,
3219                                                 new_sids=client_sids))
3220         if client_claims is not None:
3221             client_modify_pac_fn.append(partial(self.set_pac_claims,
3222                                                 client_claims=client_claims))
3223         client_tgt = self.modified_ticket(
3224             client_tgt,
3225             modify_pac_fn=client_modify_pac_fn,
3226             new_ticket_key=rodc_krbtgt_key if client_from_rodc else None,
3227             checksum_keys=rodc_checksum_key if client_from_rodc else checksum_key)
3228
3229         if use_fast:
3230             # Create a machine account with which to perform FAST.
3231             mach_creds = self.get_cached_creds(
3232                 account_type=self.AccountType.COMPUTER,
3233                 opts={
3234                     'allowed_replication_mock': device_from_rodc,
3235                     'revealed_to_mock_rodc': device_from_rodc,
3236                 })
3237             mach_tgt = self.get_tgt(mach_creds)
3238             device_modify_pac_fn = []
3239             if device_sids is not None:
3240                 device_modify_pac_fn.append(partial(self.set_pac_sids,
3241                                                     new_sids=device_sids))
3242             if device_claims is not None:
3243                 device_modify_pac_fn.append(partial(self.set_pac_claims,
3244                                                     client_claims=device_claims))
3245             mach_tgt = self.modified_ticket(
3246                 mach_tgt,
3247                 modify_pac_fn=device_modify_pac_fn,
3248                 new_ticket_key=rodc_krbtgt_key if device_from_rodc else None,
3249                 checksum_keys=rodc_checksum_key if device_from_rodc else checksum_key)
3250         else:
3251             mach_tgt = None
3252
3253         if target_policy is None:
3254             policy = None
3255             assigned_policy = None
3256         else:
3257             sddl = f'O:SYD:(XA;;CR;;;WD;({target_policy.format(client_sid=client_creds.get_sid())}))'
3258             policy = self.create_authn_policy(enforced=True,
3259                                               computer_allowed_to=sddl)
3260             assigned_policy = str(policy.dn)
3261
3262         # Create a target account with the assigned policy.
3263         target_creds = self.get_cached_creds(
3264             account_type=self.AccountType.COMPUTER,
3265             opts={'assigned_policy': assigned_policy})
3266
3267         target_decryption_key = self.TicketDecryptionKey_from_creds(
3268             target_creds)
3269         target_etypes = target_creds.tgs_supported_enctypes
3270
3271         samdb = self.get_samdb()
3272         domain_sid_str = samdb.get_domain_sid()
3273
3274         expected_groups = self.map_sids(expected_groups, None, domain_sid_str)
3275
3276         # Show that obtaining a service ticket is allowed.
3277         self._tgs_req(client_tgt, code, client_creds, target_creds,
3278                       armor_tgt=mach_tgt,
3279                       expected_cname=client_cname,
3280                       expected_account_name=client_username,
3281                       decryption_key=target_decryption_key,
3282                       expected_sid=client_sid,
3283                       expected_groups=expected_groups,
3284                       expect_client_claims=bool(expected_claims) or None,
3285                       expected_client_claims=expected_claims,
3286                       expected_supported_etypes=target_etypes,
3287                       expected_status=status,
3288                       expect_edata=edata)
3289
3290         self.check_tgs_log(client_creds, target_creds,
3291                            policy=policy,
3292                            checked_creds=client_creds,
3293                            status=status,
3294                            event=event,
3295                            reason=reason)
3296
3297     def test_conditional_ace_allowed_from_user_allow(self):
3298         # Create a machine account with which to perform FAST.
3299         mach_creds = self.get_cached_creds(
3300             account_type=self.AccountType.COMPUTER)
3301         mach_tgt = self.get_tgt(mach_creds)
3302
3303         # Create an authentication policy that explicitly allows the machine
3304         # account for a user.
3305         allowed = (f'O:SYD:(XA;;CR;;;{mach_creds.get_sid()};'
3306                    f'(Member_of SID({mach_creds.get_sid()})))')
3307         denied = 'O:SYD:(D;;CR;;;WD)'
3308         policy = self.create_authn_policy(enforced=True,
3309                                           user_allowed_from=allowed,
3310                                           service_allowed_from=denied)
3311
3312         # Create a user account with the assigned policy.
3313         client_creds = self._get_creds(account_type=self.AccountType.USER,
3314                                        assigned_policy=policy)
3315
3316         # Show that authentication succeeds.
3317         self._get_tgt(client_creds, armor_tgt=mach_tgt,
3318                       expected_error=0)
3319
3320         self.check_as_log(
3321             client_creds,
3322             armor_creds=mach_creds,
3323             client_policy=policy)
3324
3325     def test_conditional_ace_allowed_from_user_deny(self):
3326         # Create a machine account with which to perform FAST.
3327         mach_creds = self.get_cached_creds(
3328             account_type=self.AccountType.COMPUTER)
3329         mach_tgt = self.get_tgt(mach_creds)
3330
3331         # Create an authentication policy that explicitly denies the machine
3332         # account for a user.
3333         allowed = 'O:SYD:(A;;CR;;;WD)'
3334         denied = (f'O:SYD:(XD;;CR;;;{mach_creds.get_sid()};'
3335                   f'(Member_of SID({mach_creds.get_sid()})))'
3336                   f'(A;;CR;;;WD)')
3337         policy = self.create_authn_policy(enforced=True,
3338                                           user_allowed_from=denied,
3339                                           service_allowed_from=allowed)
3340
3341         # Create a user account with the assigned policy.
3342         client_creds = self._get_creds(account_type=self.AccountType.USER,
3343                                        assigned_policy=policy)
3344
3345         # Show that we get a policy error when trying to authenticate.
3346         self._get_tgt(client_creds, armor_tgt=mach_tgt,
3347                       expected_error=KDC_ERR_POLICY)
3348
3349         self.check_as_log(
3350             client_creds,
3351             armor_creds=mach_creds,
3352             client_policy=policy,
3353             client_policy_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
3354             event=AuditEvent.KERBEROS_DEVICE_RESTRICTION,
3355             reason=AuditReason.ACCESS_DENIED,
3356             status=ntstatus.NT_STATUS_INVALID_WORKSTATION)
3357
3358
3359 class DeviceRestrictionTests(ConditionalAceBaseTests):
3360     def test_pac_groups_not_present(self):
3361         """Test that authentication fails if the device does not belong to some
3362         required groups.
3363         """
3364
3365         required_sids = {
3366             ('S-1-2-3-4', SidType.EXTRA_SID, self.default_attrs),
3367             ('S-1-9-8-7', SidType.EXTRA_SID, self.default_attrs),
3368         }
3369
3370         # Create a machine account with which to perform FAST.
3371         mach_creds = self.get_cached_creds(
3372             account_type=self.AccountType.COMPUTER,
3373             opts={'id': 'device'})
3374         mach_tgt = self.get_tgt(mach_creds)
3375
3376         # Create an authentication policy that requires the device to belong to
3377         # certain groups.
3378         client_policy_sddl = self.allow_if(
3379             f'Member_of {self.sddl_array_from_sids(required_sids)}')
3380         client_policy = self.create_authn_policy(
3381             enforced=True, user_allowed_from=client_policy_sddl)
3382
3383         # Create a user account with the assigned policy.
3384         client_creds = self._get_creds(account_type=self.AccountType.USER,
3385                                        assigned_policy=client_policy)
3386
3387         # Show that authentication fails.
3388         self._armored_as_req(client_creds,
3389                              self.get_krbtgt_creds(),
3390                              mach_tgt,
3391                              expected_error=KDC_ERR_POLICY)
3392
3393         self.check_as_log(
3394             client_creds,
3395             armor_creds=mach_creds,
3396             client_policy=client_policy,
3397             client_policy_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
3398             event=AuditEvent.KERBEROS_DEVICE_RESTRICTION,
3399             reason=AuditReason.ACCESS_DENIED,
3400             status=ntstatus.NT_STATUS_INVALID_WORKSTATION)
3401
3402     def test_pac_groups_present(self):
3403         """Test that authentication succeeds if the device belongs to some
3404         required groups.
3405         """
3406
3407         required_sids = {
3408             ('S-1-2-3-4', SidType.EXTRA_SID, self.default_attrs),
3409             ('S-1-9-8-7', SidType.EXTRA_SID, self.default_attrs),
3410         }
3411
3412         device_sids = required_sids | {
3413             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
3414             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
3415         }
3416
3417         # Create a machine account with which to perform FAST.
3418         mach_creds = self.get_cached_creds(
3419             account_type=self.AccountType.COMPUTER,
3420             opts={'id': 'device'})
3421         mach_tgt = self.get_tgt(mach_creds)
3422
3423         # Add the required groups to the machine account’s TGT.
3424         mach_tgt = self.modified_ticket(
3425             mach_tgt,
3426             modify_pac_fn=partial(self.set_pac_sids,
3427                                   new_sids=device_sids),
3428             checksum_keys=self.get_krbtgt_checksum_key())
3429
3430         # Create an authentication policy that requires the device to belong to
3431         # certain groups.
3432         client_policy_sddl = self.allow_if(
3433             f'Member_of {self.sddl_array_from_sids(required_sids)}')
3434         client_policy = self.create_authn_policy(
3435             enforced=True, user_allowed_from=client_policy_sddl)
3436
3437         # Create a user account with the assigned policy.
3438         client_creds = self._get_creds(account_type=self.AccountType.USER,
3439                                        assigned_policy=client_policy)
3440
3441         # Show that authentication succeeds.
3442         self._armored_as_req(client_creds,
3443                              self.get_krbtgt_creds(),
3444                              mach_tgt)
3445
3446         self.check_as_log(client_creds,
3447                           armor_creds=mach_creds,
3448                           client_policy=client_policy)
3449
3450     def test_pac_resource_groups_present(self):
3451         """Test that authentication succeeds if the device belongs to some
3452         required resource groups.
3453         """
3454
3455         required_sids = {
3456             ('S-1-2-3-4', SidType.RESOURCE_SID, self.resource_attrs),
3457             ('S-1-2-3-5', SidType.RESOURCE_SID, self.resource_attrs),
3458             ('S-1-2-3-6', SidType.RESOURCE_SID, self.resource_attrs),
3459         }
3460
3461         device_sids = required_sids | {
3462             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
3463             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
3464         }
3465
3466         # Create a machine account with which to perform FAST.
3467         mach_creds = self.get_cached_creds(
3468             account_type=self.AccountType.COMPUTER,
3469             opts={'id': 'device'})
3470         mach_tgt = self.get_tgt(mach_creds)
3471
3472         # Add the required groups to the machine account’s TGT.
3473         mach_tgt = self.modified_ticket(
3474             mach_tgt,
3475             modify_pac_fn=partial(self.set_pac_sids,
3476                                   new_sids=device_sids),
3477             checksum_keys=self.get_krbtgt_checksum_key())
3478
3479         # Create an authentication policy that requires the device to belong to
3480         # certain groups.
3481         client_policy_sddl = self.allow_if(
3482             f'Member_of {self.sddl_array_from_sids(required_sids)}')
3483         client_policy = self.create_authn_policy(
3484             enforced=True, user_allowed_from=client_policy_sddl)
3485
3486         # Create a user account with the assigned policy.
3487         client_creds = self._get_creds(account_type=self.AccountType.USER,
3488                                        assigned_policy=client_policy)
3489
3490         # Show that authentication fails.
3491         self._armored_as_req(client_creds,
3492                              self.get_krbtgt_creds(),
3493                              mach_tgt,
3494                              expected_error=KDC_ERR_POLICY)
3495
3496         self.check_as_log(
3497             client_creds,
3498             armor_creds=mach_creds,
3499             client_policy=client_policy,
3500             client_policy_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
3501             event=AuditEvent.KERBEROS_DEVICE_RESTRICTION,
3502             reason=AuditReason.ACCESS_DENIED,
3503             status=ntstatus.NT_STATUS_INVALID_WORKSTATION)
3504
3505     def test_pac_resource_groups_present_to_service_sid_compression(self):
3506         """Test that authentication succeeds if the device belongs to some
3507         required resource groups, and the request is to a service that supports
3508         SID compression.
3509         """
3510
3511         required_sids = {
3512             ('S-1-2-3-4', SidType.RESOURCE_SID, self.resource_attrs),
3513             ('S-1-2-3-5', SidType.RESOURCE_SID, self.resource_attrs),
3514             ('S-1-2-3-6', SidType.RESOURCE_SID, self.resource_attrs),
3515         }
3516
3517         device_sids = required_sids | {
3518             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
3519             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
3520         }
3521
3522         # Create a machine account with which to perform FAST.
3523         mach_creds = self.get_cached_creds(
3524             account_type=self.AccountType.COMPUTER,
3525             opts={'id': 'device'})
3526         mach_tgt = self.get_tgt(mach_creds)
3527
3528         # Add the required groups to the machine account’s TGT.
3529         mach_tgt = self.modified_ticket(
3530             mach_tgt,
3531             modify_pac_fn=partial(self.set_pac_sids,
3532                                   new_sids=device_sids),
3533             checksum_keys=self.get_krbtgt_checksum_key())
3534
3535         # Create an authentication policy that requires the device to belong to
3536         # certain groups.
3537         client_policy_sddl = self.allow_if(
3538             f'Member_of {self.sddl_array_from_sids(required_sids)}')
3539         client_policy = self.create_authn_policy(
3540             enforced=True, user_allowed_from=client_policy_sddl)
3541
3542         # Create a user account with the assigned policy.
3543         client_creds = self._get_creds(account_type=self.AccountType.USER,
3544                                        assigned_policy=client_policy)
3545
3546         target_creds = self.get_cached_creds(
3547             account_type=self.AccountType.COMPUTER,
3548             opts={'id': 'target'})
3549
3550         # Show that authentication fails.
3551         self._armored_as_req(client_creds,
3552                              target_creds,
3553                              mach_tgt,
3554                              expected_error=KDC_ERR_POLICY)
3555
3556         self.check_as_log(
3557             client_creds,
3558             armor_creds=mach_creds,
3559             client_policy=client_policy,
3560             client_policy_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
3561             event=AuditEvent.KERBEROS_DEVICE_RESTRICTION,
3562             reason=AuditReason.ACCESS_DENIED,
3563             status=ntstatus.NT_STATUS_INVALID_WORKSTATION)
3564
3565     def test_pac_resource_groups_present_to_service_no_sid_compression(self):
3566         """Test that authentication succeeds if the device belongs to some
3567         required resource groups, and the request is to a service that does not
3568         support SID compression.
3569         """
3570
3571         required_sids = {
3572             ('S-1-2-3-4', SidType.RESOURCE_SID, self.resource_attrs),
3573             ('S-1-2-3-5', SidType.RESOURCE_SID, self.resource_attrs),
3574             ('S-1-2-3-6', SidType.RESOURCE_SID, self.resource_attrs),
3575         }
3576
3577         device_sids = required_sids | {
3578             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
3579             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
3580         }
3581
3582         # Create a machine account with which to perform FAST.
3583         mach_creds = self.get_cached_creds(
3584             account_type=self.AccountType.COMPUTER,
3585             opts={'id': 'device'})
3586         mach_tgt = self.get_tgt(mach_creds)
3587
3588         # Add the required groups to the machine account’s TGT.
3589         mach_tgt = self.modified_ticket(
3590             mach_tgt,
3591             modify_pac_fn=partial(self.set_pac_sids,
3592                                   new_sids=device_sids),
3593             checksum_keys=self.get_krbtgt_checksum_key())
3594
3595         # Create an authentication policy that requires the device to belong to
3596         # certain groups.
3597         client_policy_sddl = self.allow_if(
3598             f'Member_of {self.sddl_array_from_sids(required_sids)}')
3599         client_policy = self.create_authn_policy(
3600             enforced=True, user_allowed_from=client_policy_sddl)
3601
3602         # Create a user account with the assigned policy.
3603         client_creds = self._get_creds(account_type=self.AccountType.USER,
3604                                        assigned_policy=client_policy)
3605
3606         target_creds = self.get_cached_creds(
3607             account_type=self.AccountType.COMPUTER,
3608             opts={
3609                 'id': 'target',
3610                 'supported_enctypes': (
3611                     security.KERB_ENCTYPE_RC4_HMAC_MD5) | (
3612                         security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96_SK),
3613                 'sid_compression_support': False,
3614             })
3615
3616         # Show that authentication fails.
3617         self._armored_as_req(client_creds,
3618                              target_creds,
3619                              mach_tgt,
3620                              expected_error=KDC_ERR_POLICY)
3621
3622         self.check_as_log(
3623             client_creds,
3624             armor_creds=mach_creds,
3625             client_policy=client_policy,
3626             client_policy_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
3627             event=AuditEvent.KERBEROS_DEVICE_RESTRICTION,
3628             reason=AuditReason.ACCESS_DENIED,
3629             status=ntstatus.NT_STATUS_INVALID_WORKSTATION)
3630
3631     def test_pac_well_known_groups_not_present(self):
3632         """Test that authentication fails if the device does not belong to one
3633         or more required well‐known groups.
3634         """
3635
3636         required_sids = {
3637             (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
3638             (security.SID_COMPOUNDED_AUTHENTICATION, SidType.EXTRA_SID, self.default_attrs),
3639             (self.aa_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
3640             (self.service_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
3641         }
3642
3643         device_sids = {
3644             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
3645             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
3646         }
3647
3648         # Create a machine account with which to perform FAST.
3649         mach_creds = self.get_cached_creds(
3650             account_type=self.AccountType.COMPUTER,
3651             opts={'id': 'device'})
3652         mach_tgt = self.get_tgt(mach_creds)
3653
3654         # Modify the machine account’s TGT to contain only the SID of the
3655         # machine account’s primary group.
3656         mach_tgt = self.modified_ticket(
3657             mach_tgt,
3658             modify_pac_fn=partial(self.set_pac_sids,
3659                                   new_sids=device_sids),
3660             checksum_keys=self.get_krbtgt_checksum_key())
3661
3662         # Create an authentication policy that requires the device to belong to
3663         # certain groups.
3664         client_policy_sddl = self.allow_if(
3665             f'Member_of_any {self.sddl_array_from_sids(required_sids)}')
3666         client_policy = self.create_authn_policy(
3667             enforced=True, user_allowed_from=client_policy_sddl)
3668
3669         # Create a user account with the assigned policy.
3670         client_creds = self._get_creds(account_type=self.AccountType.USER,
3671                                        assigned_policy=client_policy)
3672
3673         # Show that authentication fails.
3674         self._armored_as_req(client_creds,
3675                              self.get_krbtgt_creds(),
3676                              mach_tgt,
3677                              expected_error=KDC_ERR_POLICY)
3678
3679         self.check_as_log(
3680             client_creds,
3681             armor_creds=mach_creds,
3682             client_policy=client_policy,
3683             client_policy_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
3684             event=AuditEvent.KERBEROS_DEVICE_RESTRICTION,
3685             reason=AuditReason.ACCESS_DENIED,
3686             status=ntstatus.NT_STATUS_INVALID_WORKSTATION)
3687
3688     def test_pac_device_info(self):
3689         """Test the groups of the client and the device after performing a
3690         FAST‐armored AS‐REQ.
3691         """
3692
3693         device_sids = {
3694             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
3695             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
3696         }
3697
3698         # Create a machine account with which to perform FAST.
3699         mach_creds = self.get_cached_creds(
3700             account_type=self.AccountType.COMPUTER,
3701             opts={'id': 'device'})
3702         mach_tgt = self.get_tgt(mach_creds)
3703
3704         # Add the required groups to the machine account’s TGT.
3705         mach_tgt = self.modified_ticket(
3706             mach_tgt,
3707             modify_pac_fn=partial(self.set_pac_sids,
3708                                   new_sids=device_sids),
3709             checksum_keys=self.get_krbtgt_checksum_key())
3710
3711         # Create a user account.
3712         client_creds = self._get_creds(account_type=self.AccountType.USER)
3713
3714         target_creds = self.get_cached_creds(
3715             account_type=self.AccountType.COMPUTER,
3716             opts={'id': 'target'})
3717
3718         expected_sids = {
3719             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
3720             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
3721             # The client’s groups are to include the Asserted Identity and
3722             # Claims Valid SIDs.
3723             (self.aa_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
3724             (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
3725         }
3726
3727         samdb = self.get_samdb()
3728         domain_sid_str = samdb.get_domain_sid()
3729
3730         expected_sids = self.map_sids(expected_sids, None, domain_sid_str)
3731
3732         # Show that authentication succeeds. Check that the groups in the PAC
3733         # are as expected.
3734         self._armored_as_req(client_creds,
3735                              target_creds,
3736                              mach_tgt,
3737                              expected_groups=expected_sids,
3738                              expect_device_info=False,
3739                              expected_device_groups=None)
3740
3741         self.check_as_log(
3742             client_creds,
3743             armor_creds=mach_creds)
3744
3745     def test_pac_claims_not_present(self):
3746         """Test that authentication fails if the device does not have a
3747         required claim.
3748         """
3749
3750         claim_id = 'the name of the claim'
3751         claim_value = 'the value of the claim'
3752
3753         # Create a machine account with which to perform FAST.
3754         mach_creds = self.get_cached_creds(
3755             account_type=self.AccountType.COMPUTER,
3756             opts={'id': 'device'})
3757         mach_tgt = self.get_tgt(mach_creds)
3758
3759         # Create an authentication policy that requires the device to have a
3760         # certain claim.
3761         client_policy_sddl = self.allow_if(
3762             f'@User.{escaped_claim_id(claim_id)} == "{claim_value}"')
3763         client_policy = self.create_authn_policy(
3764             enforced=True, user_allowed_from=client_policy_sddl)
3765
3766         # Create a user account with the assigned policy.
3767         client_creds = self._get_creds(account_type=self.AccountType.USER,
3768                                        assigned_policy=client_policy)
3769
3770         # Show that authentication fails.
3771         self._armored_as_req(client_creds,
3772                              self.get_krbtgt_creds(),
3773                              mach_tgt,
3774                              expected_error=KDC_ERR_POLICY)
3775
3776         self.check_as_log(
3777             client_creds,
3778             armor_creds=mach_creds,
3779             client_policy=client_policy,
3780             client_policy_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
3781             event=AuditEvent.KERBEROS_DEVICE_RESTRICTION,
3782             reason=AuditReason.ACCESS_DENIED,
3783             status=ntstatus.NT_STATUS_INVALID_WORKSTATION)
3784
3785     def test_pac_claims_present(self):
3786         """Test that authentication succeeds if the device has a required
3787         claim.
3788         """
3789
3790         claim_id = 'the name of the claim'
3791         claim_value = 'the value of the claim'
3792
3793         pac_claims = [
3794             (claims.CLAIMS_SOURCE_TYPE_AD, [
3795                 (claim_id, claims.CLAIM_TYPE_STRING, [claim_value]),
3796             ]),
3797         ]
3798
3799         # Create a machine account with which to perform FAST.
3800         mach_creds = self.get_cached_creds(
3801             account_type=self.AccountType.COMPUTER,
3802             opts={'id': 'device'})
3803         mach_tgt = self.get_tgt(mach_creds)
3804
3805         # Add the required claim to the machine account’s TGT.
3806         mach_tgt = self.modified_ticket(
3807             mach_tgt,
3808             modify_pac_fn=partial(self.set_pac_claims,
3809                                   client_claims=pac_claims),
3810             checksum_keys=self.get_krbtgt_checksum_key())
3811
3812         # Create an authentication policy that requires the device to have a
3813         # certain claim.
3814         client_policy_sddl = self.allow_if(
3815             f'@User.{escaped_claim_id(claim_id)} == "{claim_value}"')
3816         client_policy = self.create_authn_policy(
3817             enforced=True, user_allowed_from=client_policy_sddl)
3818
3819         # Create a user account with the assigned policy.
3820         client_creds = self._get_creds(account_type=self.AccountType.USER,
3821                                        assigned_policy=client_policy)
3822
3823         # Show that authentication succeeds.
3824         self._armored_as_req(client_creds,
3825                              self.get_krbtgt_creds(),
3826                              mach_tgt)
3827
3828         self.check_as_log(client_creds,
3829                           armor_creds=mach_creds,
3830                           client_policy=client_policy)
3831
3832     def test_pac_claims_invalid(self):
3833         """Test that authentication fails if the device’s required claim is not
3834         valid.
3835         """
3836
3837         claim_id = 'the name of the claim'
3838         claim_value = 'the value of the claim'
3839
3840         pac_claims = [
3841             (claims.CLAIMS_SOURCE_TYPE_AD, [
3842                 (claim_id, claims.CLAIM_TYPE_STRING, [claim_value]),
3843             ]),
3844         ]
3845
3846         # The device’s SIDs do not include the Claims Valid SID.
3847         device_sids = {
3848             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
3849             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
3850         }
3851
3852         # Create a machine account with which to perform FAST.
3853         mach_creds = self.get_cached_creds(
3854             account_type=self.AccountType.COMPUTER,
3855             opts={'id': 'device'})
3856         mach_tgt = self.get_tgt(mach_creds)
3857
3858         # Add the SIDs and the required claim to the machine account’s TGT.
3859         mach_tgt = self.modified_ticket(
3860             mach_tgt,
3861             modify_pac_fn=[
3862                 partial(self.set_pac_claims, client_claims=pac_claims),
3863                 partial(self.set_pac_sids, new_sids=device_sids)],
3864             checksum_keys=self.get_krbtgt_checksum_key())
3865
3866         # Create an authentication policy that requires the device to have a
3867         # certain claim.
3868         client_policy_sddl = self.allow_if(
3869             f'@User.{escaped_claim_id(claim_id)} == "{claim_value}"')
3870         client_policy = self.create_authn_policy(
3871             enforced=True, user_allowed_from=client_policy_sddl)
3872
3873         # Create a user account with the assigned policy.
3874         client_creds = self._get_creds(account_type=self.AccountType.USER,
3875                                        assigned_policy=client_policy)
3876
3877         # Show that authentication fails.
3878         self._armored_as_req(client_creds,
3879                              self.get_krbtgt_creds(),
3880                              mach_tgt,
3881                              expected_error=KDC_ERR_POLICY)
3882
3883         self.check_as_log(
3884             client_creds,
3885             armor_creds=mach_creds,
3886             client_policy=client_policy,
3887             client_policy_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
3888             event=AuditEvent.KERBEROS_DEVICE_RESTRICTION,
3889             reason=AuditReason.ACCESS_DENIED,
3890             status=ntstatus.NT_STATUS_INVALID_WORKSTATION)
3891
3892     def test_device_in_world_group(self):
3893         self._check_device_in_group(security.SID_WORLD)
3894
3895     def test_device_in_network_group(self):
3896         self._check_device_not_in_group(security.SID_NT_NETWORK)
3897
3898     def test_device_in_authenticated_users(self):
3899         self._check_device_in_group(security.SID_NT_AUTHENTICATED_USERS)
3900
3901     def test_device_in_aa_asserted_identity(self):
3902         self._check_device_in_group(
3903             security.SID_AUTHENTICATION_AUTHORITY_ASSERTED_IDENTITY)
3904
3905     def test_device_in_service_asserted_identity(self):
3906         self._check_device_not_in_group(security.SID_SERVICE_ASSERTED_IDENTITY)
3907
3908     def test_device_in_compounded_authentication(self):
3909         self._check_device_not_in_group(security.SID_COMPOUNDED_AUTHENTICATION)
3910
3911     def test_device_in_claims_valid(self):
3912         self._check_device_in_group(security.SID_CLAIMS_VALID)
3913
3914     def _check_device_in_group(self, group):
3915         self._check_device_membership(group, expect_in_group=True)
3916
3917     def _check_device_not_in_group(self, group):
3918         self._check_device_membership(group, expect_in_group=False)
3919
3920     def _check_device_membership(self, group, *, expect_in_group):
3921         """Test that authentication succeeds or fails when the device is
3922         required to belong to a certain group.
3923         """
3924
3925         # Create a machine account with which to perform FAST.
3926         mach_creds = self.get_cached_creds(
3927             account_type=self.AccountType.COMPUTER,
3928             opts={'id': 'device'})
3929         mach_tgt = self.get_tgt(mach_creds)
3930
3931         # Create an authentication policy that requires the device to belong to
3932         # a certain group.
3933         in_group_sddl = self.allow_if(f'Member_of {{SID({group})}}')
3934         in_group_policy = self.create_authn_policy(
3935             enforced=True, user_allowed_from=in_group_sddl)
3936
3937         # Create a user account with the assigned policy.
3938         client_creds = self._get_creds(account_type=self.AccountType.USER,
3939                                        assigned_policy=in_group_policy)
3940
3941         krbtgt_creds = self.get_krbtgt_creds()
3942
3943         # Test whether authentication succeeds or fails.
3944         self._armored_as_req(
3945             client_creds,
3946             krbtgt_creds,
3947             mach_tgt,
3948             expected_error=0 if expect_in_group else KDC_ERR_POLICY)
3949
3950         policy_success_args = {}
3951         policy_failure_args = {
3952             'client_policy_status': ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
3953             'event': AuditEvent.KERBEROS_DEVICE_RESTRICTION,
3954             'reason': AuditReason.ACCESS_DENIED,
3955             'status': ntstatus.NT_STATUS_INVALID_WORKSTATION,
3956         }
3957
3958         self.check_as_log(client_creds,
3959                           armor_creds=mach_creds,
3960                           client_policy=in_group_policy,
3961                           **(policy_success_args if expect_in_group
3962                              else policy_failure_args))
3963
3964         # Create an authentication policy that requires the device not to belong
3965         # to the group.
3966         not_in_group_sddl = self.allow_if(f'Not_Member_of {{SID({group})}}')
3967         not_in_group_policy = self.create_authn_policy(
3968             enforced=True, user_allowed_from=not_in_group_sddl)
3969
3970         # Create a user account with the assigned policy.
3971         client_creds = self._get_creds(account_type=self.AccountType.USER,
3972                                        assigned_policy=not_in_group_policy)
3973
3974         # Test whether authentication succeeds or fails.
3975         self._armored_as_req(
3976             client_creds,
3977             krbtgt_creds,
3978             mach_tgt,
3979             expected_error=KDC_ERR_POLICY if expect_in_group else 0)
3980
3981         self.check_as_log(client_creds,
3982                           armor_creds=mach_creds,
3983                           client_policy=not_in_group_policy,
3984                           **(policy_failure_args if expect_in_group
3985                              else policy_success_args))
3986
3987
3988 class TgsReqServicePolicyTests(ConditionalAceBaseTests):
3989     def test_pac_groups_not_present(self):
3990         """Test that authorization succeeds if the client does not belong to
3991         some required groups.
3992         """
3993
3994         required_sids = {
3995             ('S-1-2-3-4', SidType.EXTRA_SID, self.default_attrs),
3996             ('S-1-9-8-7', SidType.EXTRA_SID, self.default_attrs),
3997         }
3998
3999         # Create a machine account with which to perform FAST.
4000         mach_creds = self.get_cached_creds(
4001             account_type=self.AccountType.COMPUTER,
4002             opts={'id': 'device'})
4003         mach_tgt = self.get_tgt(mach_creds)
4004
4005         # Create a user account.
4006         client_creds = self._get_creds(account_type=self.AccountType.USER)
4007         client_tgt = self.get_tgt(client_creds)
4008
4009         # Create an authentication policy that requires the client to belong to
4010         # certain groups.
4011         target_policy_sddl = self.allow_if(
4012             f'Member_of {self.sddl_array_from_sids(required_sids)}')
4013         target_policy = self.create_authn_policy(
4014             enforced=True, computer_allowed_to=target_policy_sddl)
4015
4016         # Create a target account with the assigned policy.
4017         target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
4018                                        assigned_policy=target_policy)
4019
4020         # Show that authorization fails.
4021         self._tgs_req(
4022             client_tgt, KDC_ERR_POLICY, client_creds, target_creds,
4023             armor_tgt=mach_tgt,
4024             expect_edata=self.expect_padata_outer,
4025             # We aren’t particular about whether or not we get an NTSTATUS.
4026             expect_status=None,
4027             expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED)
4028
4029         self.check_tgs_log(
4030             client_creds, target_creds,
4031             policy=target_policy,
4032             status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
4033             event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
4034             reason=AuditReason.ACCESS_DENIED)
4035
4036     def test_pac_groups_present(self):
4037         """Test that authorization succeeds if the client belongs to some
4038         required groups.
4039         """
4040
4041         required_sids = {
4042             ('S-1-2-3-4', SidType.EXTRA_SID, self.default_attrs),
4043             ('S-1-9-8-7', SidType.EXTRA_SID, self.default_attrs),
4044         }
4045
4046         client_sids = required_sids | {
4047             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
4048             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
4049         }
4050
4051         # Create a machine account with which to perform FAST.
4052         mach_creds = self.get_cached_creds(
4053             account_type=self.AccountType.COMPUTER,
4054             opts={'id': 'device'})
4055         mach_tgt = self.get_tgt(mach_creds)
4056
4057         # Create a user account.
4058         client_creds = self._get_creds(account_type=self.AccountType.USER)
4059         client_tgt = self.get_tgt(client_creds)
4060
4061         # Add the required groups to the client’s TGT.
4062         client_tgt = self.modified_ticket(
4063             client_tgt,
4064             modify_pac_fn=partial(self.set_pac_sids,
4065                                   new_sids=client_sids),
4066             checksum_keys=self.get_krbtgt_checksum_key())
4067
4068         # Create an authentication policy that requires the client to belong to
4069         # certain groups.
4070         target_policy_sddl = self.allow_if(
4071             f'Member_of {self.sddl_array_from_sids(required_sids)}')
4072         target_policy = self.create_authn_policy(
4073             enforced=True, computer_allowed_to=target_policy_sddl)
4074
4075         # Create a target account with the assigned policy.
4076         target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
4077                                        assigned_policy=target_policy)
4078
4079         # Show that authorization succeeds.
4080         self._tgs_req(client_tgt, 0, client_creds, target_creds, armor_tgt=mach_tgt)
4081
4082         self.check_tgs_log(client_creds, target_creds,
4083                            policy=target_policy)
4084
4085     def test_pac_resource_groups_present_to_service_sid_compression(self):
4086         """Test that authorization succeeds if the client belongs to some
4087         required resource groups, and the request is to a service that supports
4088         SID compression.
4089         """
4090
4091         required_sids = {
4092             ('S-1-2-3-4', SidType.RESOURCE_SID, self.resource_attrs),
4093             ('S-1-2-3-5', SidType.RESOURCE_SID, self.resource_attrs),
4094             ('S-1-2-3-6', SidType.RESOURCE_SID, self.resource_attrs),
4095         }
4096
4097         client_sids = required_sids | {
4098             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
4099             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
4100         }
4101
4102         # Create a machine account with which to perform FAST.
4103         mach_creds = self.get_cached_creds(
4104             account_type=self.AccountType.COMPUTER,
4105             opts={'id': 'device'})
4106         mach_tgt = self.get_tgt(mach_creds)
4107
4108         # Create a user account.
4109         client_creds = self._get_creds(account_type=self.AccountType.USER)
4110         client_tgt = self.get_tgt(client_creds)
4111
4112         # Add the required groups to the client’s TGT.
4113         client_tgt = self.modified_ticket(
4114             client_tgt,
4115             modify_pac_fn=partial(self.set_pac_sids,
4116                                   new_sids=client_sids),
4117             checksum_keys=self.get_krbtgt_checksum_key())
4118
4119         # Create an authentication policy that requires the client to belong to
4120         # certain groups.
4121         target_policy_sddl = self.allow_if(
4122             f'Member_of {self.sddl_array_from_sids(required_sids)}')
4123         target_policy = self.create_authn_policy(
4124             enforced=True, computer_allowed_to=target_policy_sddl)
4125
4126         # Create a target account with the assigned policy.
4127         target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
4128                                        assigned_policy=target_policy)
4129
4130         # Show that authorization fails.
4131         self._tgs_req(
4132             client_tgt, KDC_ERR_POLICY, client_creds, target_creds,
4133             armor_tgt=mach_tgt,
4134             expect_edata=self.expect_padata_outer,
4135             # We aren’t particular about whether or not we get an NTSTATUS.
4136             expect_status=None,
4137             expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED)
4138
4139         self.check_tgs_log(
4140             client_creds, target_creds,
4141             policy=target_policy,
4142             status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
4143             event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
4144             reason=AuditReason.ACCESS_DENIED)
4145
4146     def test_pac_resource_groups_present_to_service_no_sid_compression(self):
4147         """Test that authorization succeeds if the client belongs to some
4148         required resource groups, and the request is to a service that does not
4149         support SID compression.
4150         """
4151
4152         required_sids = {
4153             ('S-1-2-3-4', SidType.RESOURCE_SID, self.resource_attrs),
4154             ('S-1-2-3-5', SidType.RESOURCE_SID, self.resource_attrs),
4155             ('S-1-2-3-6', SidType.RESOURCE_SID, self.resource_attrs),
4156         }
4157
4158         client_sids = required_sids | {
4159             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
4160             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
4161         }
4162
4163         # Create a machine account with which to perform FAST.
4164         mach_creds = self.get_cached_creds(
4165             account_type=self.AccountType.COMPUTER,
4166             opts={'id': 'device'})
4167         mach_tgt = self.get_tgt(mach_creds)
4168
4169         # Create a user account.
4170         client_creds = self._get_creds(account_type=self.AccountType.USER)
4171         client_tgt = self.get_tgt(client_creds)
4172
4173         # Add the required groups to the client’s TGT.
4174         client_tgt = self.modified_ticket(
4175             client_tgt,
4176             modify_pac_fn=partial(self.set_pac_sids,
4177                                   new_sids=client_sids),
4178             checksum_keys=self.get_krbtgt_checksum_key())
4179
4180         # Create an authentication policy that requires the client to belong to
4181         # certain groups.
4182         target_policy_sddl = self.allow_if(
4183             f'Member_of {self.sddl_array_from_sids(required_sids)}')
4184         target_policy = self.create_authn_policy(
4185             enforced=True, computer_allowed_to=target_policy_sddl)
4186
4187         # Create a target account with the assigned policy.
4188         target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
4189                                        assigned_policy=target_policy,
4190                                        additional_details={
4191                                            'msDS-SupportedEncryptionTypes': str((
4192                                                security.KERB_ENCTYPE_RC4_HMAC_MD5) | (
4193                                                    security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96_SK) | (
4194                                                        security.KERB_ENCTYPE_RESOURCE_SID_COMPRESSION_DISABLED))})
4195
4196         # Show that authorization fails.
4197         self._tgs_req(
4198             client_tgt, KDC_ERR_POLICY, client_creds, target_creds,
4199             armor_tgt=mach_tgt,
4200             expect_edata=self.expect_padata_outer,
4201             # We aren’t particular about whether or not we get an NTSTATUS.
4202             expect_status=None,
4203             expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED)
4204
4205         self.check_tgs_log(
4206             client_creds, target_creds,
4207             policy=target_policy,
4208             status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
4209             event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
4210             reason=AuditReason.ACCESS_DENIED)
4211
4212     def test_pac_well_known_groups_not_present(self):
4213         """Test that authorization fails if the client does not belong to one
4214         or more required well‐known groups.
4215         """
4216
4217         required_sids = {
4218             (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
4219             (security.SID_COMPOUNDED_AUTHENTICATION, SidType.EXTRA_SID, self.default_attrs),
4220             (self.aa_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
4221             (self.service_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
4222         }
4223
4224         client_sids = {
4225             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
4226             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
4227         }
4228
4229         # Create a machine account with which to perform FAST.
4230         mach_creds = self.get_cached_creds(
4231             account_type=self.AccountType.COMPUTER,
4232             opts={'id': 'device'})
4233         mach_tgt = self.get_tgt(mach_creds)
4234
4235         # Create a user account.
4236         client_creds = self._get_creds(account_type=self.AccountType.USER)
4237         client_tgt = self.get_tgt(client_creds)
4238
4239         # Modify the client’s TGT to contain only the SID of the client’s
4240         # primary group.
4241         client_tgt = self.modified_ticket(
4242             client_tgt,
4243             modify_pac_fn=partial(self.set_pac_sids,
4244                                   new_sids=client_sids),
4245             checksum_keys=self.get_krbtgt_checksum_key())
4246
4247         # Create an authentication policy that requires the client to belong to
4248         # certain groups.
4249         target_policy_sddl = self.allow_if(
4250             f'Member_of_any {self.sddl_array_from_sids(required_sids)}')
4251         target_policy = self.create_authn_policy(
4252             enforced=True, computer_allowed_to=target_policy_sddl)
4253
4254         # Create a target account with the assigned policy.
4255         target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
4256                                        assigned_policy=target_policy)
4257
4258         # Show that authorization fails.
4259         self._tgs_req(
4260             client_tgt, KDC_ERR_POLICY, client_creds, target_creds,
4261             armor_tgt=mach_tgt,
4262             expect_edata=self.expect_padata_outer,
4263             # We aren’t particular about whether or not we get an NTSTATUS.
4264             expect_status=None,
4265             expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED)
4266
4267         self.check_tgs_log(
4268             client_creds, target_creds,
4269             policy=target_policy,
4270             status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
4271             event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
4272             reason=AuditReason.ACCESS_DENIED)
4273
4274     def test_pac_device_info(self):
4275         self._run_pac_device_info_test()
4276
4277     def test_pac_device_info_target_policy(self):
4278         target_policy = self.allow_if('Device_Member_of {{SID({device_0})}}')
4279         self._run_pac_device_info_test(target_policy=target_policy)
4280
4281     def test_pac_device_info_rodc_issued(self):
4282         self._run_pac_device_info_test(rodc_issued=True)
4283
4284     def test_pac_device_info_existing_device_info(self):
4285         self._run_pac_device_info_test(existing_device_info=True)
4286
4287     def test_pac_device_info_existing_device_info_target_policy(self):
4288         target_policy = self.allow_if('Device_Member_of {{SID({device_0})}}')
4289         self._run_pac_device_info_test(target_policy=target_policy,
4290                                        existing_device_info=True)
4291
4292     def test_pac_device_info_existing_device_info_rodc_issued(self):
4293         self._run_pac_device_info_test(rodc_issued=True,
4294                                        existing_device_info=True)
4295
4296     def test_pac_device_info_existing_device_claims(self):
4297         self._run_pac_device_info_test(existing_device_claims=True)
4298
4299     def test_pac_device_info_existing_device_claims_target_policy(self):
4300         target_policy = self.allow_if('Device_Member_of {{SID({device_0})}}')
4301         self._run_pac_device_info_test(target_policy=target_policy,
4302                                        existing_device_claims=True)
4303
4304     def test_pac_device_info_existing_device_claims_rodc_issued(self):
4305         self._run_pac_device_info_test(rodc_issued=True,
4306                                        existing_device_claims=True)
4307
4308     def test_pac_device_info_existing_device_info_and_claims(self):
4309         self._run_pac_device_info_test(existing_device_claims=True,
4310                                        existing_device_info=True)
4311
4312     def test_pac_device_info_existing_device_info_and_claims_target_policy(self):
4313         target_policy = self.allow_if('Device_Member_of {{SID({device_0})}}')
4314         self._run_pac_device_info_test(target_policy=target_policy,
4315                                        existing_device_claims=True,
4316                                        existing_device_info=True)
4317
4318     def test_pac_device_info_existing_device_info_and_claims_rodc_issued(self):
4319         self._run_pac_device_info_test(rodc_issued=True,
4320                                        existing_device_claims=True,
4321                                        existing_device_info=True)
4322
4323     def test_pac_device_info_no_compound_id_support(self):
4324         self._run_pac_device_info_test(compound_id_support=False)
4325
4326     def test_pac_device_info_no_compound_id_support_target_policy(self):
4327         target_policy = self.allow_if('Device_Member_of {{SID({device_0})}}')
4328         self._run_pac_device_info_test(target_policy=target_policy,
4329                                        compound_id_support=False)
4330
4331     def test_pac_device_info_no_compound_id_support_rodc_issued(self):
4332         self._run_pac_device_info_test(rodc_issued=True,
4333                                        compound_id_support=False)
4334
4335     def test_pac_device_info_no_compound_id_support_existing_device_info(self):
4336         self._run_pac_device_info_test(compound_id_support=False,
4337                                        existing_device_info=True)
4338
4339     def test_pac_device_info_no_compound_id_support_existing_device_info_target_policy(self):
4340         target_policy = self.allow_if('Device_Member_of {{SID({device_0})}}')
4341         self._run_pac_device_info_test(target_policy=target_policy,
4342                                        compound_id_support=False,
4343                                        existing_device_info=True)
4344
4345     def test_pac_device_info_no_compound_id_support_existing_device_info_rodc_issued(self):
4346         self._run_pac_device_info_test(rodc_issued=True,
4347                                        compound_id_support=False,
4348                                        existing_device_info=True)
4349
4350     def test_pac_device_info_no_compound_id_support_existing_device_claims(self):
4351         self._run_pac_device_info_test(compound_id_support=False,
4352                                        existing_device_claims=True)
4353
4354     def test_pac_device_info_no_compound_id_support_existing_device_claims_target_policy(self):
4355         target_policy = self.allow_if('Device_Member_of {{SID({device_0})}}')
4356         self._run_pac_device_info_test(target_policy=target_policy,
4357                                        compound_id_support=False,
4358                                        existing_device_claims=True)
4359
4360     def test_pac_device_info_no_compound_id_support_existing_device_claims_rodc_issued(self):
4361         self._run_pac_device_info_test(rodc_issued=True,
4362                                        compound_id_support=False,
4363                                        existing_device_claims=True)
4364
4365     def test_pac_device_info_no_compound_id_support_existing_device_info_and_claims(self):
4366         self._run_pac_device_info_test(compound_id_support=False,
4367                                        existing_device_claims=True,
4368                                        existing_device_info=True)
4369
4370     def test_pac_device_info_no_compound_id_support_existing_device_info_and_claims_target_policy(self):
4371         target_policy = self.allow_if('Device_Member_of {{SID({device_0})}}')
4372         self._run_pac_device_info_test(target_policy=target_policy,
4373                                        compound_id_support=False,
4374                                        existing_device_claims=True,
4375                                        existing_device_info=True)
4376
4377     def test_pac_device_info_no_compound_id_support_existing_device_info_and_claims_rodc_issued(self):
4378         self._run_pac_device_info_test(rodc_issued=True,
4379                                        compound_id_support=False,
4380                                        existing_device_claims=True,
4381                                        existing_device_info=True)
4382
4383     def test_pac_device_info_no_compound_id_support_no_claims_valid_existing_device_info(self):
4384         self._run_pac_device_info_test(device_claims_valid=False,
4385                                        compound_id_support=False,
4386                                        existing_device_info=True)
4387
4388     def test_pac_device_info_no_compound_id_support_no_claims_valid_existing_device_info_target_policy(self):
4389         target_policy = self.allow_if('Device_Member_of {{SID({device_0})}}')
4390         self._run_pac_device_info_test(target_policy=target_policy,
4391                                        device_claims_valid=False,
4392                                        compound_id_support=False,
4393                                        existing_device_info=True)
4394
4395     def test_pac_device_info_no_compound_id_support_no_claims_valid_existing_device_info_rodc_issued(self):
4396         self._run_pac_device_info_test(rodc_issued=True,
4397                                        device_claims_valid=False,
4398                                        compound_id_support=False,
4399                                        existing_device_info=True)
4400
4401     def test_pac_device_info_no_compound_id_support_no_claims_valid_existing_device_claims(self):
4402         self._run_pac_device_info_test(device_claims_valid=False,
4403                                        compound_id_support=False,
4404                                        existing_device_claims=True)
4405
4406     def test_pac_device_info_no_compound_id_support_no_claims_valid_existing_device_claims_target_policy(self):
4407         target_policy = self.allow_if('Device_Member_of {{SID({device_0})}}')
4408         self._run_pac_device_info_test(target_policy=target_policy,
4409                                        device_claims_valid=False,
4410                                        compound_id_support=False,
4411                                        existing_device_claims=True)
4412
4413     def test_pac_device_info_no_compound_id_support_no_claims_valid_existing_device_claims_rodc_issued(self):
4414         self._run_pac_device_info_test(rodc_issued=True,
4415                                        device_claims_valid=False,
4416                                        compound_id_support=False,
4417                                        existing_device_claims=True)
4418
4419     def test_pac_device_info_no_compound_id_support_no_claims_valid_existing_device_info_and_claims(self):
4420         self._run_pac_device_info_test(device_claims_valid=False,
4421                                        compound_id_support=False,
4422                                        existing_device_claims=True,
4423                                        existing_device_info=True)
4424
4425     def test_pac_device_info_no_compound_id_support_no_claims_valid_existing_device_info_and_claims_target_policy(self):
4426         target_policy = self.allow_if('Device_Member_of {{SID({device_0})}}')
4427         self._run_pac_device_info_test(target_policy=target_policy,
4428                                        device_claims_valid=False,
4429                                        compound_id_support=False,
4430                                        existing_device_claims=True,
4431                                        existing_device_info=True)
4432
4433     def test_pac_device_info_no_compound_id_support_no_claims_valid_existing_device_info_and_claims_rodc_issued(self):
4434         self._run_pac_device_info_test(rodc_issued=True,
4435                                        device_claims_valid=False,
4436                                        compound_id_support=False,
4437                                        existing_device_claims=True,
4438                                        existing_device_info=True)
4439
4440     def test_pac_device_info_no_claims_valid(self):
4441         self._run_pac_device_info_test(device_claims_valid=False)
4442
4443     def test_pac_device_info_no_claims_valid_target_policy(self):
4444         target_policy = self.allow_if('Device_Member_of {{SID({device_0})}}')
4445         self._run_pac_device_info_test(target_policy=target_policy,
4446                                        device_claims_valid=False)
4447
4448     def test_pac_device_info_no_claims_valid_rodc_issued(self):
4449         self._run_pac_device_info_test(rodc_issued=True,
4450                                        device_claims_valid=False)
4451
4452     def test_pac_device_info_no_claims_valid_existing_device_info(self):
4453         self._run_pac_device_info_test(device_claims_valid=False,
4454                                        existing_device_info=True)
4455
4456     def test_pac_device_info_no_claims_valid_existing_device_info_target_policy(self):
4457         target_policy = self.allow_if('Device_Member_of {{SID({device_0})}}')
4458         self._run_pac_device_info_test(target_policy=target_policy,
4459                                        device_claims_valid=False,
4460                                        existing_device_info=True)
4461
4462     def test_pac_device_info_no_claims_valid_existing_device_info_rodc_issued(self):
4463         self._run_pac_device_info_test(rodc_issued=True,
4464                                        device_claims_valid=False,
4465                                        existing_device_info=True)
4466
4467     def test_pac_device_info_no_claims_valid_existing_device_claims(self):
4468         self._run_pac_device_info_test(device_claims_valid=False,
4469                                        existing_device_claims=True)
4470
4471     def test_pac_device_info_no_claims_valid_existing_device_claims_target_policy(self):
4472         target_policy = self.allow_if('Device_Member_of {{SID({device_0})}}')
4473         self._run_pac_device_info_test(target_policy=target_policy,
4474                                        device_claims_valid=False,
4475                                        existing_device_claims=True)
4476
4477     def test_pac_device_info_no_claims_valid_existing_device_claims_rodc_issued(self):
4478         self._run_pac_device_info_test(rodc_issued=True,
4479                                        device_claims_valid=False,
4480                                        existing_device_claims=True)
4481
4482     def test_pac_device_info_no_claims_valid_existing_device_info_and_claims(self):
4483         self._run_pac_device_info_test(device_claims_valid=False,
4484                                        existing_device_claims=True,
4485                                        existing_device_info=True)
4486
4487     def test_pac_device_info_no_claims_valid_existing_device_info_and_claims_target_policy(self):
4488         target_policy = self.allow_if('Device_Member_of {{SID({device_0})}}')
4489         self._run_pac_device_info_test(target_policy=target_policy,
4490                                        device_claims_valid=False,
4491                                        existing_device_claims=True,
4492                                        existing_device_info=True)
4493
4494     def test_pac_device_info_no_claims_valid_existing_device_info_and_claims_rodc_issued(self):
4495         self._run_pac_device_info_test(rodc_issued=True,
4496                                        device_claims_valid=False,
4497                                        existing_device_claims=True,
4498                                        existing_device_info=True)
4499
4500     def _run_pac_device_info_test(self, *,
4501                                   target_policy=None,
4502                                   rodc_issued=False,
4503                                   compound_id_support=True,
4504                                   device_claims_valid=True,
4505                                   existing_device_claims=False,
4506                                   existing_device_info=False):
4507         """Test the groups of the client and the device after performing a
4508         FAST‐armored TGS‐REQ.
4509         """
4510
4511         client_claim_id = 'the name of the client’s client claim'
4512         client_claim_value = 'the value of the client’s client claim'
4513
4514         client_claims = [
4515             (claims.CLAIMS_SOURCE_TYPE_AD, [
4516                 (client_claim_id, claims.CLAIM_TYPE_STRING, [client_claim_value]),
4517             ]),
4518         ]
4519
4520         if not rodc_issued:
4521             expected_client_claims = {
4522                 client_claim_id: {
4523                     'source_type': claims.CLAIMS_SOURCE_TYPE_AD,
4524                     'type': claims.CLAIM_TYPE_STRING,
4525                     'values': (client_claim_value,),
4526                 },
4527             }
4528         else:
4529             expected_client_claims = None
4530
4531         device_claim_id = 'the name of the device’s client claim'
4532         device_claim_value = 'the value of the device’s client claim'
4533
4534         device_claims = [
4535             (claims.CLAIMS_SOURCE_TYPE_AD, [
4536                 (device_claim_id, claims.CLAIM_TYPE_STRING, [device_claim_value]),
4537             ]),
4538         ]
4539
4540         existing_claim_id = 'the name of an existing device claim'
4541         existing_claim_value = 'the value of an existing device claim'
4542
4543         existing_claims = [
4544             (claims.CLAIMS_SOURCE_TYPE_CERTIFICATE, [
4545                 (existing_claim_id, claims.CLAIM_TYPE_STRING, [existing_claim_value]),
4546             ]),
4547         ]
4548
4549         if rodc_issued:
4550             expected_device_claims = None
4551         elif existing_device_info and existing_device_claims:
4552             expected_device_claims = {
4553                 existing_claim_id: {
4554                     'source_type': claims.CLAIMS_SOURCE_TYPE_CERTIFICATE,
4555                     'type': claims.CLAIM_TYPE_STRING,
4556                     'values': (existing_claim_value,),
4557                 },
4558             }
4559         elif compound_id_support and not existing_device_info and not existing_device_claims:
4560             expected_device_claims = {
4561                 device_claim_id: {
4562                     'source_type': claims.CLAIMS_SOURCE_TYPE_AD,
4563                     'type': claims.CLAIM_TYPE_STRING,
4564                     'values': (device_claim_value,),
4565                 },
4566             }
4567         else:
4568             expected_device_claims = None
4569
4570         # Create a machine account with which to perform FAST.
4571         mach_creds = self.get_cached_creds(
4572             account_type=self.AccountType.COMPUTER,
4573             opts={'id': 'device'})
4574         mach_tgt = self.get_tgt(mach_creds)
4575
4576         client_sids = {
4577             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
4578             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
4579             # This to ensure we have EXTRA_SIDS set already, as
4580             # windows won't set that flag otherwise when adding one
4581             # more
4582             ('S-1-2-3-4', SidType.EXTRA_SID, self.default_attrs),
4583         }
4584
4585         device_sid_0 = 'S-1-3-4-5'
4586         device_sid_1 = 'S-1-4-5-6'
4587
4588         policy_sids = {
4589             'device_0': device_sid_0,
4590             'device_1': device_sid_1,
4591         }
4592
4593         device_sids = {
4594             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
4595             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
4596             (device_sid_0, SidType.EXTRA_SID, self.resource_attrs),
4597             (device_sid_1, SidType.EXTRA_SID, self.resource_attrs),
4598         }
4599
4600         if device_claims_valid:
4601             device_sids.add((security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs))
4602
4603         checksum_key = self.get_krbtgt_checksum_key()
4604
4605         # Modify the machine account’s TGT to contain only the SID of the
4606         # machine account’s primary group.
4607         mach_tgt = self.modified_ticket(
4608             mach_tgt,
4609             modify_pac_fn=[
4610                 partial(self.set_pac_sids,
4611                         new_sids=device_sids),
4612                 partial(self.set_pac_claims, client_claims=device_claims),
4613             ],
4614             checksum_keys=checksum_key)
4615
4616         # Create a user account.
4617         client_creds = self.get_cached_creds(
4618             account_type=self.AccountType.USER,
4619             opts={
4620                 'allowed_replication_mock': rodc_issued,
4621                 'revealed_to_mock_rodc': rodc_issued,
4622             })
4623         client_tgt = self.get_tgt(client_creds)
4624
4625         client_modify_pac_fns = [
4626             partial(self.set_pac_sids,
4627                     new_sids=client_sids),
4628             partial(self.set_pac_claims, client_claims=client_claims),
4629         ]
4630
4631         if existing_device_claims:
4632             client_modify_pac_fns.append(
4633                 partial(self.set_pac_claims, device_claims=existing_claims))
4634         if existing_device_info:
4635             # These are different from the SIDs in the device’s TGT.
4636             existing_sid_0 = 'S-1-7-8-9'
4637             existing_sid_1 = 'S-1-9-8-7'
4638
4639             policy_sids.update({
4640                 'existing_0': existing_sid_0,
4641                 'existing_1': existing_sid_1,
4642             })
4643
4644             existing_sids = {
4645                 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
4646                 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
4647                 (existing_sid_0, SidType.EXTRA_SID, self.resource_attrs),
4648                 (existing_sid_1, SidType.EXTRA_SID, self.resource_attrs),
4649             }
4650
4651             client_modify_pac_fns.append(partial(
4652                 self.set_pac_device_sids, new_sids=existing_sids, user_rid=mach_creds.get_rid()))
4653
4654         if rodc_issued:
4655             rodc_krbtgt_creds = self.get_mock_rodc_krbtgt_creds()
4656             rodc_krbtgt_key = self.TicketDecryptionKey_from_creds(rodc_krbtgt_creds)
4657             rodc_checksum_key = {
4658                 krb5pac.PAC_TYPE_KDC_CHECKSUM: rodc_krbtgt_key,
4659             }
4660
4661         # Modify the client’s TGT to contain only the SID of the client’s
4662         # primary group.
4663         client_tgt = self.modified_ticket(
4664             client_tgt,
4665             modify_pac_fn=client_modify_pac_fns,
4666             new_ticket_key=rodc_krbtgt_key if rodc_issued else None,
4667             checksum_keys=rodc_checksum_key if rodc_issued else checksum_key)
4668
4669         if target_policy is None:
4670             policy = None
4671             assigned_policy = None
4672         else:
4673             policy = self.create_authn_policy(
4674                 enforced=True,
4675                 computer_allowed_to=target_policy.format_map(policy_sids))
4676             assigned_policy = str(policy.dn)
4677
4678         target_creds = self.get_cached_creds(
4679             account_type=self.AccountType.COMPUTER,
4680             opts={
4681                 'supported_enctypes':
4682                     security.KERB_ENCTYPE_RC4_HMAC_MD5
4683                     | security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96,
4684                 # Indicate that Compound Identity is supported.
4685                 'compound_id_support': compound_id_support,
4686                 'assigned_policy': assigned_policy,
4687             })
4688
4689         expected_sids = {
4690             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
4691             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
4692             # The client’s groups are not to include the Asserted Identity and
4693             # Claims Valid SIDs.
4694         }
4695         if rodc_issued:
4696             expected_sids.add((security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs))
4697         else:
4698             expected_sids.add(('S-1-2-3-4', SidType.EXTRA_SID, self.default_attrs))
4699
4700         if rodc_issued:
4701             expected_device_sids = None
4702         elif existing_device_info:
4703             expected_device_sids = {
4704                 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
4705                 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
4706                 ('S-1-7-8-9', SidType.EXTRA_SID, self.resource_attrs),
4707                 ('S-1-9-8-7', SidType.EXTRA_SID, self.resource_attrs),
4708             }
4709         elif compound_id_support and not existing_device_claims:
4710             expected_sids.add((security.SID_COMPOUNDED_AUTHENTICATION, SidType.EXTRA_SID, self.default_attrs))
4711
4712             expected_device_sids = {
4713                 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
4714                 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
4715                 ('S-1-3-4-5', SidType.EXTRA_SID, self.resource_attrs),
4716                 ('S-1-4-5-6', SidType.EXTRA_SID, self.resource_attrs),
4717             }
4718
4719             if device_claims_valid:
4720                 expected_device_sids.add(frozenset([(security.SID_CLAIMS_VALID, SidType.RESOURCE_SID, self.default_attrs)]))
4721         else:
4722             expected_device_sids = None
4723
4724         samdb = self.get_samdb()
4725         domain_sid_str = samdb.get_domain_sid()
4726
4727         expected_sids = self.map_sids(expected_sids, None, domain_sid_str)
4728         # The device SIDs will be put into the PAC unmodified.
4729         expected_device_sids = self.map_sids(expected_device_sids, None, domain_sid_str)
4730
4731         # Show that authorization succeeds.
4732         self._tgs_req(client_tgt, 0, client_creds, target_creds, armor_tgt=mach_tgt,
4733                       expected_groups=expected_sids,
4734                       expect_device_info=bool(expected_device_sids),
4735                       expected_device_domain_sid=domain_sid_str,
4736                       expected_device_groups=expected_device_sids,
4737                       expect_client_claims=True,
4738                       expected_client_claims=expected_client_claims,
4739                       expect_device_claims=bool(expected_device_claims),
4740                       expected_device_claims=expected_device_claims)
4741
4742         self.check_tgs_log(client_creds, target_creds, policy=policy)
4743
4744     def test_pac_extra_sids_behaviour(self):
4745         """Test the groups of the client and the device after performing a
4746         FAST‐armored TGS‐REQ.
4747         """
4748
4749         # Create a machine account with which to perform FAST.
4750         mach_creds = self.get_cached_creds(
4751             account_type=self.AccountType.COMPUTER,
4752             opts={'id': 'device'})
4753         mach_tgt = self.get_tgt(mach_creds)
4754
4755         client_sids = {
4756             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
4757             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
4758         }
4759
4760         # Create a user account.
4761         client_creds = self._get_creds(account_type=self.AccountType.USER)
4762         client_tgt = self.get_tgt(client_creds)
4763
4764         # Modify the client’s TGT to contain only the SID of the client’s
4765         # primary group.
4766         client_tgt = self.modified_ticket(
4767             client_tgt,
4768             modify_pac_fn=partial(self.set_pac_sids,
4769                                   new_sids=client_sids),
4770             checksum_keys=self.get_krbtgt_checksum_key())
4771
4772         # Indicate that Compound Identity is supported.
4773         target_creds, _ = self.get_target(to_krbtgt=False, compound_id=True)
4774
4775         expected_sids = {
4776             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
4777             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
4778             (security.SID_COMPOUNDED_AUTHENTICATION, SidType.EXTRA_SID, self.default_attrs)
4779             # The client’s groups are not to include the Asserted Identity and
4780             # Claims Valid SIDs.
4781         }
4782
4783         samdb = self.get_samdb()
4784         domain_sid_str = samdb.get_domain_sid()
4785
4786         expected_sids = self.map_sids(expected_sids, None, domain_sid_str)
4787
4788         # Show that authorization succeeds.
4789         self._tgs_req(client_tgt, 0, client_creds, target_creds, armor_tgt=mach_tgt,
4790                       expected_groups=expected_sids)
4791
4792         self.check_tgs_log(client_creds, target_creds)
4793
4794     def test_pac_claims_not_present(self):
4795         """Test that authentication fails if the device does not have a
4796         required claim.
4797         """
4798
4799         claim_id = 'the name of the claim'
4800         claim_value = 'the value of the claim'
4801
4802         # Create a machine account with which to perform FAST.
4803         mach_creds = self.get_cached_creds(
4804             account_type=self.AccountType.COMPUTER,
4805             opts={'id': 'device'})
4806         mach_tgt = self.get_tgt(mach_creds)
4807
4808         # Create an authentication policy that requires the device to have a
4809         # certain claim.
4810         target_policy_sddl = self.allow_if(
4811             f'@User.{escaped_claim_id(claim_id)} == "{claim_value}"')
4812         target_policy = self.create_authn_policy(
4813             enforced=True, computer_allowed_to=target_policy_sddl)
4814
4815         # Create a user account.
4816         client_creds = self._get_creds(account_type=self.AccountType.USER)
4817         client_tgt = self.get_tgt(client_creds)
4818
4819         # Create a target account with the assigned policy.
4820         target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
4821                                        assigned_policy=target_policy)
4822
4823         # Show that authorization fails.
4824         self._tgs_req(
4825             client_tgt, KDC_ERR_POLICY, client_creds, target_creds,
4826             armor_tgt=mach_tgt,
4827             expect_edata=self.expect_padata_outer,
4828             # We aren’t particular about whether or not we get an NTSTATUS.
4829             expect_status=None,
4830             expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED)
4831
4832         self.check_tgs_log(
4833             client_creds,
4834             target_creds,
4835             policy=target_policy,
4836             status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
4837             event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
4838             reason=AuditReason.ACCESS_DENIED)
4839
4840     def test_pac_claims_present(self):
4841         """Test that authentication succeeds if the user has a required
4842         claim.
4843         """
4844
4845         claim_id = 'the name of the claim'
4846         claim_value = 'the value of the claim'
4847
4848         pac_claims = [
4849             (claims.CLAIMS_SOURCE_TYPE_AD, [
4850                 (claim_id, claims.CLAIM_TYPE_STRING, [claim_value]),
4851             ]),
4852         ]
4853
4854         # Create a machine account with which to perform FAST.
4855         mach_creds = self.get_cached_creds(
4856             account_type=self.AccountType.COMPUTER,
4857             opts={'id': 'device'})
4858         mach_tgt = self.get_tgt(mach_creds)
4859
4860         # Create an authentication policy that requires the user to have a
4861         # certain claim.
4862         target_policy_sddl = self.allow_if(
4863             f'@User.{escaped_claim_id(claim_id)} == "{claim_value}"')
4864         target_policy = self.create_authn_policy(
4865             enforced=True, computer_allowed_to=target_policy_sddl)
4866
4867         # Create a user account.
4868         client_creds = self._get_creds(account_type=self.AccountType.USER)
4869         client_tgt = self.get_tgt(client_creds)
4870
4871         # Add the required claim to the client’s TGT.
4872         client_tgt = self.modified_ticket(
4873             client_tgt,
4874             modify_pac_fn=partial(self.set_pac_claims,
4875                                   client_claims=pac_claims),
4876             checksum_keys=self.get_krbtgt_checksum_key())
4877
4878         # Create a target account with the assigned policy.
4879         target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
4880                                        assigned_policy=target_policy)
4881
4882         # Show that authorization succeeds.
4883         self._tgs_req(client_tgt, 0, client_creds, target_creds, armor_tgt=mach_tgt)
4884
4885         self.check_tgs_log(client_creds, target_creds,
4886                            policy=target_policy)
4887
4888     def test_pac_claims_invalid(self):
4889         """Test that authentication fails if the device’s required claim is not
4890         valid.
4891         """
4892
4893         claim_id = 'the name of the claim'
4894         claim_value = 'the value of the claim'
4895
4896         pac_claims = [
4897             (claims.CLAIMS_SOURCE_TYPE_AD, [
4898                 (claim_id, claims.CLAIM_TYPE_STRING, [claim_value]),
4899             ]),
4900         ]
4901
4902         # The device’s SIDs do not include the Claims Valid SID.
4903         device_sids = {
4904             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
4905             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
4906         }
4907
4908         # Create a machine account with which to perform FAST.
4909         mach_creds = self.get_cached_creds(
4910             account_type=self.AccountType.COMPUTER,
4911             opts={'id': 'device'})
4912         mach_tgt = self.get_tgt(mach_creds)
4913
4914         # Create an authentication policy that requires the device to have a
4915         # certain claim.
4916         target_policy_sddl = self.allow_if(
4917             f'@User.{escaped_claim_id(claim_id)} == "{claim_value}"')
4918         target_policy = self.create_authn_policy(
4919             enforced=True, computer_allowed_to=target_policy_sddl)
4920
4921         # Create a user account.
4922         client_creds = self._get_creds(account_type=self.AccountType.USER)
4923         client_tgt = self.get_tgt(client_creds)
4924
4925         # Add the SIDs and the required claim to the client’s TGT.
4926         client_tgt = self.modified_ticket(
4927             client_tgt,
4928             modify_pac_fn=[
4929                 partial(self.set_pac_claims, client_claims=pac_claims),
4930                 partial(self.set_pac_sids, new_sids=device_sids)],
4931             checksum_keys=self.get_krbtgt_checksum_key())
4932
4933         # Create a target account with the assigned policy.
4934         target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
4935                                        assigned_policy=target_policy)
4936
4937         # Show that authorization fails.
4938         self._tgs_req(
4939             client_tgt, KDC_ERR_POLICY, client_creds, target_creds,
4940             armor_tgt=mach_tgt,
4941             expect_edata=self.expect_padata_outer,
4942             # We aren’t particular about whether or not we get an NTSTATUS.
4943             expect_status=None,
4944             expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED)
4945
4946         self.check_tgs_log(
4947             client_creds,
4948             target_creds,
4949             policy=target_policy,
4950             status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
4951             event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
4952             reason=AuditReason.ACCESS_DENIED)
4953
4954     def test_pac_device_claims_not_present(self):
4955         """Test that authorization fails if the device does not have a
4956         required claim.
4957         """
4958
4959         claim_id = 'the name of the claim'
4960         claim_value = 'the value of the claim'
4961
4962         # Create a machine account with which to perform FAST.
4963         mach_creds = self.get_cached_creds(
4964             account_type=self.AccountType.COMPUTER,
4965             opts={'id': 'device'})
4966         mach_tgt = self.get_tgt(mach_creds)
4967
4968         # Create an authentication policy that requires the device to have a
4969         # certain device claim.
4970         target_policy_sddl = self.allow_if(
4971             f'@Device.{escaped_claim_id(claim_id)} == "{claim_value}"')
4972         target_policy = self.create_authn_policy(
4973             enforced=True, computer_allowed_to=target_policy_sddl)
4974
4975         # Create a user account.
4976         client_creds = self._get_creds(account_type=self.AccountType.USER)
4977         client_tgt = self.get_tgt(client_creds)
4978
4979         # Create a target account with the assigned policy.
4980         target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
4981                                        assigned_policy=target_policy)
4982
4983         # Show that authorization fails.
4984         self._tgs_req(
4985             client_tgt, KDC_ERR_POLICY, client_creds, target_creds,
4986             armor_tgt=mach_tgt,
4987             expect_edata=self.expect_padata_outer,
4988             # We aren’t particular about whether or not we get an NTSTATUS.
4989             expect_status=None,
4990             expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED)
4991
4992         self.check_tgs_log(
4993             client_creds,
4994             target_creds,
4995             policy=target_policy,
4996             status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
4997             event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
4998             reason=AuditReason.ACCESS_DENIED)
4999
5000     def test_pac_device_claims_present(self):
5001         """Test that authorization succeeds if the device has a required claim.
5002         """
5003
5004         claim_id = 'the name of the claim'
5005         claim_value = 'the value of the claim'
5006
5007         pac_claims = [
5008             (claims.CLAIMS_SOURCE_TYPE_AD, [
5009                 (claim_id, claims.CLAIM_TYPE_STRING, [claim_value]),
5010             ]),
5011         ]
5012
5013         # Create a machine account with which to perform FAST.
5014         mach_creds = self.get_cached_creds(
5015             account_type=self.AccountType.COMPUTER,
5016             opts={'id': 'device'})
5017         mach_tgt = self.get_tgt(mach_creds)
5018
5019         # Add the required claim to the machine account’s TGT.
5020         mach_tgt = self.modified_ticket(
5021             mach_tgt,
5022             modify_pac_fn=partial(self.set_pac_claims,
5023                                   client_claims=pac_claims),
5024             checksum_keys=self.get_krbtgt_checksum_key())
5025
5026         # Create an authentication policy that requires the device to have a
5027         # certain device claim.
5028         target_policy_sddl = self.allow_if(
5029             f'@Device.{escaped_claim_id(claim_id)} == "{claim_value}"')
5030         target_policy = self.create_authn_policy(
5031             enforced=True, computer_allowed_to=target_policy_sddl)
5032
5033         # Create a user account.
5034         client_creds = self._get_creds(account_type=self.AccountType.USER)
5035         client_tgt = self.get_tgt(client_creds)
5036
5037         # Create a target account with the assigned policy.
5038         target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
5039                                        assigned_policy=target_policy)
5040
5041         # Show that authorization succeeds.
5042         self._tgs_req(client_tgt, 0, client_creds, target_creds, armor_tgt=mach_tgt)
5043
5044         self.check_tgs_log(client_creds, target_creds,
5045                            policy=target_policy)
5046
5047     def test_pac_device_claims_invalid(self):
5048         """Test that authorization fails if the device’s required claim is not
5049         valid.
5050         """
5051
5052         claim_id = 'the name of the claim'
5053         claim_value = 'the value of the claim'
5054
5055         pac_claims = [
5056             (claims.CLAIMS_SOURCE_TYPE_AD, [
5057                 (claim_id, claims.CLAIM_TYPE_STRING, [claim_value]),
5058             ]),
5059         ]
5060
5061         # The device’s SIDs do not include the Claims Valid SID.
5062         device_sids = {
5063             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
5064             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
5065         }
5066
5067         # Create a machine account with which to perform FAST.
5068         mach_creds = self.get_cached_creds(
5069             account_type=self.AccountType.COMPUTER,
5070             opts={'id': 'device'})
5071         mach_tgt = self.get_tgt(mach_creds)
5072
5073         # Add the SIDs and the required claim to the machine account’s TGT.
5074         mach_tgt = self.modified_ticket(
5075             mach_tgt,
5076             modify_pac_fn=[
5077                 partial(self.set_pac_claims, client_claims=pac_claims),
5078                 partial(self.set_pac_sids, new_sids=device_sids)],
5079             checksum_keys=self.get_krbtgt_checksum_key())
5080
5081         # Create an authentication policy that requires the device to have a
5082         # certain claim.
5083         target_policy_sddl = self.allow_if(
5084             f'@Device.{escaped_claim_id(claim_id)} == "{claim_value}"')
5085         target_policy = self.create_authn_policy(
5086             enforced=True, computer_allowed_to=target_policy_sddl)
5087
5088         # Create a user account.
5089         client_creds = self._get_creds(account_type=self.AccountType.USER)
5090         client_tgt = self.get_tgt(client_creds)
5091
5092         # Create a target account with the assigned policy.
5093         target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
5094                                        assigned_policy=target_policy)
5095
5096         # Show that authorization fails.
5097         self._tgs_req(
5098             client_tgt, KDC_ERR_POLICY, client_creds, target_creds,
5099             armor_tgt=mach_tgt,
5100             expect_edata=self.expect_padata_outer,
5101             # We aren’t particular about whether or not we get an NTSTATUS.
5102             expect_status=None,
5103             expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED)
5104
5105         self.check_tgs_log(
5106             client_creds,
5107             target_creds,
5108             policy=target_policy,
5109             status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
5110             event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
5111             reason=AuditReason.ACCESS_DENIED)
5112
5113     def test_pac_device_claims_invalid_no_attrs(self):
5114         """Test that authorization fails if the device’s required claim is not
5115         valid.
5116         """
5117
5118         claim_id = 'the name of the claim'
5119         claim_value = 'the value of the claim'
5120
5121         pac_claims = [
5122             (claims.CLAIMS_SOURCE_TYPE_AD, [
5123                 (claim_id, claims.CLAIM_TYPE_STRING, [claim_value]),
5124             ]),
5125         ]
5126
5127         device_sids = {
5128             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
5129             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
5130             # The device’s SIDs include the Claims Valid SID, but it has no
5131             # attributes.
5132             (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, 0),
5133         }
5134
5135         # Create a machine account with which to perform FAST.
5136         mach_creds = self.get_cached_creds(
5137             account_type=self.AccountType.COMPUTER,
5138             opts={'id': 'device'})
5139         mach_tgt = self.get_tgt(mach_creds)
5140
5141         # Add the SIDs and the required claim to the machine account’s TGT.
5142         mach_tgt = self.modified_ticket(
5143             mach_tgt,
5144             modify_pac_fn=[
5145                 partial(self.set_pac_claims, client_claims=pac_claims),
5146                 partial(self.set_pac_sids, new_sids=device_sids)],
5147             checksum_keys=self.get_krbtgt_checksum_key())
5148
5149         # Create an authentication policy that requires the device to have a
5150         # certain claim.
5151         target_policy_sddl = self.allow_if(
5152             f'@Device.{escaped_claim_id(claim_id)} == "{claim_value}"')
5153         target_policy = self.create_authn_policy(
5154             enforced=True, computer_allowed_to=target_policy_sddl)
5155
5156         # Create a user account.
5157         client_creds = self._get_creds(account_type=self.AccountType.USER)
5158         client_tgt = self.get_tgt(client_creds)
5159
5160         # Create a target account with the assigned policy.
5161         target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
5162                                        assigned_policy=target_policy)
5163
5164         # Show that authorization succeeds.
5165         self._tgs_req(client_tgt, 0, client_creds, target_creds, armor_tgt=mach_tgt)
5166
5167         self.check_tgs_log(client_creds, target_creds,
5168                            policy=target_policy)
5169
5170     def test_simple_as_req_client_and_target_policy(self):
5171         # Create a machine account with which to perform FAST.
5172         mach_creds = self.get_cached_creds(
5173             account_type=self.AccountType.COMPUTER)
5174         mach_tgt = self.get_tgt(mach_creds)
5175
5176         # Create an authentication policy that explicitly allows the machine
5177         # account for a user.
5178         client_policy_sddl = f'O:SYD:(XA;;CR;;;{mach_creds.get_sid()};(Member_of {{SID({mach_creds.get_sid()}), SID({mach_creds.get_sid()})}}))'
5179         client_policy = self.create_authn_policy(enforced=True,
5180                                                  user_allowed_from=client_policy_sddl)
5181
5182         # Create a user account with the assigned policy.
5183         client_creds = self._get_creds(account_type=self.AccountType.USER,
5184                                        assigned_policy=client_policy)
5185
5186         # Create an authentication policy that applies to a computer and
5187         # explicitly allows the user account to obtain a service ticket.
5188         target_policy_sddl = f'O:SYD:(XA;;CR;;;{client_creds.get_sid()};(Member_of SID({client_creds.get_sid()})))'
5189         target_policy = self.create_authn_policy(enforced=True,
5190                                                  computer_allowed_to=target_policy_sddl)
5191
5192         # Create a computer account with the assigned policy.
5193         target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
5194                                        assigned_policy=target_policy)
5195
5196         expected_groups = {
5197             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
5198             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
5199             (security.SID_AUTHENTICATION_AUTHORITY_ASSERTED_IDENTITY, SidType.EXTRA_SID, self.default_attrs),
5200             (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
5201         }
5202
5203         # Show that obtaining a service ticket with an AS‐REQ is allowed.
5204         self._armored_as_req(client_creds,
5205                           target_creds,
5206                           mach_tgt,
5207                           expected_groups=expected_groups)
5208
5209         self.check_as_log(client_creds,
5210                           armor_creds=mach_creds,
5211                           client_policy=client_policy,
5212                           server_policy=target_policy)
5213
5214     def test_device_in_world_group(self):
5215         self._check_device_in_group(security.SID_WORLD)
5216
5217     def test_device_in_network_group(self):
5218         self._check_device_not_in_group(security.SID_NT_NETWORK)
5219
5220     def test_device_in_authenticated_users(self):
5221         self._check_device_in_group(security.SID_NT_AUTHENTICATED_USERS)
5222
5223     def test_device_in_aa_asserted_identity(self):
5224         self._check_device_in_group(
5225             security.SID_AUTHENTICATION_AUTHORITY_ASSERTED_IDENTITY)
5226
5227     def test_device_in_service_asserted_identity(self):
5228         self._check_device_not_in_group(security.SID_SERVICE_ASSERTED_IDENTITY)
5229
5230     def test_device_in_compounded_authentication(self):
5231         self._check_device_not_in_group(security.SID_COMPOUNDED_AUTHENTICATION)
5232
5233     def test_device_in_claims_valid(self):
5234         self._check_device_in_group(security.SID_CLAIMS_VALID)
5235
5236     def _check_device_in_group(self, group):
5237         self._check_device_membership(group, expect_in_group=True)
5238
5239     def _check_device_not_in_group(self, group):
5240         self._check_device_membership(group, expect_in_group=False)
5241
5242     def _check_device_membership(self, group, *, expect_in_group):
5243         """Test that authentication succeeds or fails when the device is
5244         required to belong to a certain group.
5245         """
5246
5247         # Create a machine account with which to perform FAST.
5248         mach_creds = self.get_cached_creds(
5249             account_type=self.AccountType.COMPUTER,
5250             opts={'id': 'device'})
5251         mach_tgt = self.get_tgt(mach_creds)
5252
5253         # Create an authentication policy that requires the device to belong to
5254         # a certain group.
5255         in_group_sddl = self.allow_if(f'Device_Member_of {{SID({group})}}')
5256         in_group_policy = self.create_authn_policy(
5257             enforced=True, computer_allowed_to=in_group_sddl)
5258
5259         # Create a user account.
5260         client_creds = self._get_creds(account_type=self.AccountType.USER)
5261         client_tgt = self.get_tgt(client_creds)
5262
5263         # Create a target account with the assigned policy.
5264         target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
5265                                        assigned_policy=in_group_policy)
5266
5267         tgs_success_args = {}
5268         tgs_failure_args = {
5269             'expect_edata': self.expect_padata_outer,
5270             # We aren’t particular about whether or not we get an NTSTATUS.
5271             'expect_status': None,
5272             'expected_status': ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
5273         }
5274
5275         # Test whether authorization succeeds or fails.
5276         self._tgs_req(client_tgt,
5277                       0 if expect_in_group else KDC_ERR_POLICY,
5278                       client_creds,
5279                       target_creds,
5280                       armor_tgt=mach_tgt,
5281                       **(tgs_success_args if expect_in_group
5282                       else tgs_failure_args))
5283
5284         policy_success_args = {}
5285         policy_failure_args = {
5286             'status': ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
5287             'event': AuditEvent.KERBEROS_SERVER_RESTRICTION,
5288             'reason': AuditReason.ACCESS_DENIED,
5289         }
5290
5291         self.check_tgs_log(client_creds, target_creds,
5292                            policy=in_group_policy,
5293                            **(policy_success_args if expect_in_group
5294                            else policy_failure_args))
5295
5296         # Create an authentication policy that requires the device not to belong
5297         # to the group.
5298         not_in_group_sddl = self.allow_if(
5299             f'Not_Device_Member_of {{SID({group})}}')
5300         not_in_group_policy = self.create_authn_policy(
5301             enforced=True, computer_allowed_to=not_in_group_sddl)
5302
5303         # Create a target account with the assigned policy.
5304         target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
5305                                        assigned_policy=not_in_group_policy)
5306
5307         # Test whether authorization succeeds or fails.
5308         self._tgs_req(client_tgt,
5309                       KDC_ERR_POLICY if expect_in_group else 0,
5310                       client_creds,
5311                       target_creds,
5312                       armor_tgt=mach_tgt,
5313                       **(tgs_failure_args if expect_in_group
5314                       else tgs_success_args))
5315
5316         self.check_tgs_log(client_creds, target_creds,
5317                            policy=not_in_group_policy,
5318                            **(policy_failure_args if expect_in_group
5319                               else policy_success_args))
5320
5321     def test_simple_as_req_client_policy_only(self):
5322         # Create a machine account with which to perform FAST.
5323         mach_creds = self.get_cached_creds(
5324             account_type=self.AccountType.COMPUTER)
5325         mach_tgt = self.get_tgt(mach_creds)
5326
5327         # Create an authentication policy that explicitly allows the machine
5328         # account for a user.
5329         client_policy_sddl = f'O:SYD:(XA;;CR;;;{mach_creds.get_sid()};(Member_of SID({mach_creds.get_sid()})))'
5330         client_policy = self.create_authn_policy(enforced=True,
5331                                                  user_allowed_from=client_policy_sddl)
5332
5333         # Create a user account with the assigned policy.
5334         client_creds = self._get_creds(account_type=self.AccountType.USER,
5335                                        assigned_policy=client_policy)
5336
5337         expected_groups = {
5338             (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
5339             (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
5340             (security.SID_AUTHENTICATION_AUTHORITY_ASSERTED_IDENTITY, SidType.EXTRA_SID, self.default_attrs),
5341             (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
5342         }
5343
5344         # Show that obtaining a service ticket with an AS‐REQ is allowed.
5345         self._armored_as_req(client_creds,
5346                           self.get_krbtgt_creds(),
5347                           mach_tgt,
5348                           expected_groups=expected_groups)
5349
5350         self.check_as_log(client_creds,
5351                           armor_creds=mach_creds,
5352                           client_policy=client_policy)
5353
5354
5355 if __name__ == '__main__':
5356     global_asn1_print = False
5357     global_hexdump = False
5358     import unittest
5359     unittest.main()