2 # Unix SMB/CIFS implementation.
3 # Copyright (C) Stefan Metzmacher 2020
4 # Copyright (C) Catalyst.Net Ltd 2023
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 3 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
23 sys.path.insert(0, 'bin/python')
24 os.environ['PYTHONUNBUFFERED'] = '1'
26 from collections import OrderedDict
27 from functools import partial
29 from string import Formatter
33 from samba import dsdb, ntstatus
34 from samba.dcerpc import claims, krb5pac, security
35 from samba.ndr import ndr_pack, ndr_unpack
36 from samba.sd_utils import escaped_claim_id
38 from samba.tests import DynamicTestCase, env_get_var_value
39 from samba.tests.krb5.authn_policy_tests import (
44 from samba.tests.krb5.raw_testcase import RawKerberosTest
45 from samba.tests.krb5.rfc4120_constants import (
51 import samba.tests.krb5.rfc4120_pyasn1 as krb5_asn1
53 SidType = RawKerberosTest.SidType
55 global_asn1_print = False
56 global_hexdump = False
59 # When used as a test outcome, indicates that the test can cause a Windows
60 # server to crash, and is to be run with caution.
61 CRASHES_WINDOWS = object()
64 class ConditionalAceBaseTests(AuthnPolicyBaseTests):
65 # Constants for group SID attributes.
66 default_attrs = security.SE_GROUP_DEFAULT_FLAGS
67 resource_attrs = default_attrs | security.SE_GROUP_RESOURCE
69 aa_asserted_identity = (
70 security.SID_AUTHENTICATION_AUTHORITY_ASSERTED_IDENTITY)
71 service_asserted_identity = security.SID_SERVICE_ASSERTED_IDENTITY
81 self.do_asn1_print = global_asn1_print
82 self.do_hexdump = global_hexdump
85 samdb = self.get_samdb()
88 # Create a machine account with which to perform FAST.
89 cls._mach_creds = self.get_cached_creds(
90 account_type=self.AccountType.COMPUTER)
92 # Create some new groups.
94 group0_name = self.get_new_username()
95 group0_dn = self.create_group(samdb, group0_name)
96 cls._group0_sid = self.get_objectSid(samdb, group0_dn)
98 group1_name = self.get_new_username()
99 group1_dn = self.create_group(samdb, group1_name)
100 cls._group1_sid = self.get_objectSid(samdb, group1_dn)
102 # Create machine accounts with which to perform FAST that belong to
103 # various arrangements of the groups.
105 cls._member_of_both_creds = self.get_cached_creds(
106 account_type=self.AccountType.COMPUTER,
107 opts={'member_of': (group0_dn, group1_dn)})
109 cls._member_of_one_creds = self.get_cached_creds(
110 account_type=self.AccountType.COMPUTER,
111 opts={'member_of': (group1_dn,)})
113 # Create some authentication silos.
114 cls._unenforced_silo = self.create_authn_silo(enforced=False)
115 cls._enforced_silo = self.create_authn_silo(enforced=True)
117 # Create machine accounts with which to perform FAST that belong to
118 # the respective silos.
120 cls._member_of_unenforced_silo = self._get_creds(
121 account_type=self.AccountType.COMPUTER,
122 assigned_silo=self._unenforced_silo,
124 self.add_to_group(str(self._member_of_unenforced_silo.get_dn()),
125 self._unenforced_silo.dn,
126 'msDS-AuthNPolicySiloMembers',
129 cls._member_of_enforced_silo = self._get_creds(
130 account_type=self.AccountType.COMPUTER,
131 assigned_silo=self._enforced_silo,
133 self.add_to_group(str(self._member_of_enforced_silo.get_dn()),
134 self._enforced_silo.dn,
135 'msDS-AuthNPolicySiloMembers',
138 # Create a couple of multi‐valued string claims for testing claim comparisons.
141 cls.claim0_attr = 'carLicense'
142 cls.claim0_id = self.get_new_username()
143 self.create_claim(cls.claim0_id,
145 attribute=cls.claim0_attr,
148 for_classes=['computer', 'user'],
149 value_type=claims.CLAIM_TYPE_STRING)
151 cls.claim1_attr = 'departmentNumber'
152 cls.claim1_id = self.get_new_username()
153 self.create_claim(cls.claim1_id,
155 attribute=cls.claim1_attr,
158 for_classes=['computer', 'user'],
159 value_type=claims.CLAIM_TYPE_STRING)
163 # For debugging purposes. Prints out the SDDL representation of
164 # authentication policy conditions set by the Windows GUI.
165 def _print_authn_policy_sddl(self, policy_id):
166 policy_dn = self.get_authn_policies_dn()
167 policy_dn.add_child(f'CN={policy_id}')
170 'msDS-ComputerAllowedToAuthenticateTo',
171 'msDS-ServiceAllowedToAuthenticateFrom',
172 'msDS-ServiceAllowedToAuthenticateTo',
173 'msDS-UserAllowedToAuthenticateFrom',
174 'msDS-UserAllowedToAuthenticateTo',
177 samdb = self.get_samdb()
178 res = samdb.search(policy_dn, scope=ldb.SCOPE_BASE, attrs=attrs)
179 self.assertEqual(1, len(res),
180 f'Authentication policy {policy_id} not found')
183 def print_sddl(attr):
184 sd = result.get(attr, idx=0)
188 sec_desc = ndr_unpack(security.descriptor, sd)
189 print(f'{attr}: {sec_desc.as_sddl()}')
194 def sddl_array_from_sids(self, sids):
195 def sddl_from_sid_entry(sid_entry):
196 sid, _, _ = sid_entry
199 return f"{{{', '.join(map(sddl_from_sid_entry, sids))}}}"
def allow_if(self, condition):
    """Wrap a conditional ACE expression in a complete SDDL descriptor.

    :param condition: the conditional expression text, without its
        enclosing parentheses.
    :return: an SDDL string whose DACL holds a single conditional allow
        (XA) ACE guarded by ``condition``.
    """
    guarded_ace = f'(XA;;CR;;;WD;({condition}))'
    return f'O:SYD:{guarded_ace}'
206 class ConditionalAceTests(ConditionalAceBaseTests):
208 def setUpDynamicTestCases(cls):
209 FILTER = env_get_var_value('FILTER', allow_missing=True)
211 # These operators are arranged so that each operator precedes any operator that is a substring of it, so that the longer name is substituted first.
213 op_names = OrderedDict([
214 ('!=', 'does not equal'),
217 ('<=', 'is less than or equals'),
218 ('<', 'is less than'),
220 ('>=', 'exceeds or equals'),
222 ('Not_Any_of', 'matches none of'),
223 ('Any_of', 'matches any of'),
224 ('Not_Contains', 'does not contain'),
225 ('Contains', 'contains'),
226 ('Not_Member_of_Any', 'the user belongs to none of'),
227 ('Not_Device_Member_of_Any', 'the device belongs to none of'), # TODO: no test for this yet
228 ('Device_Member_of_Any', 'the device belongs to any of'), # TODO: no test for this yet
229 ('Not_Device_Member_of', 'the device does not belong to'), # TODO: no test for this yet
230 ('Device_Member_of', 'the device belongs to'),
231 ('Not_Exists', 'there does not exist'),
232 ('Exists', 'there exists'),
233 ('Member_of_Any', 'the user belongs to any of'),
234 ('Not_Member_of', 'the user does not belong to'),
235 ('Member_of', 'the user belongs to'),
239 # This is a safety measure to ensure correct ordering of op_names
240 keys = list(op_names.keys())
241 for i in range(len(keys)):
242 for j in range(i + 1, len(keys)):
243 if keys[i] in keys[j]:
244 raise AssertionError((keys[i], keys[j]))
246 for case in cls.pac_claim_cases:
248 pac_claims, expression, outcome = case
251 pac_claims, expression, claim_map, outcome = case
253 raise AssertionError(
254 f'found {len(case)} items in case, expected 3–4')
256 expression_name = expression
257 for op, op_name in op_names.items():
258 expression_name = expression_name.replace(op, op_name)
260 name = f'{pac_claims}_{expression_name}'
262 if claim_map is not None:
263 name += f'_{claim_map}'
265 name = re.sub(r'\W+', '_', name)
267 name = f'{name[:125]}+{len(name) - 125}‐more'
269 if FILTER and not re.search(FILTER, name):
272 cls.generate_dynamic_test('test_pac_claim_cmp', name,
273 pac_claims, expression, claim_map,
276 for case in cls.claim_against_claim_cases:
277 lhs, op, rhs, outcome = case
278 op_name = op_names[op]
280 name = f'{lhs}_{op_name}_{rhs}'
282 name = re.sub(r'\W+', '_', name)
283 if FILTER and not re.search(FILTER, name):
286 cls.generate_dynamic_test('test_cmp', name,
287 lhs, op, rhs, outcome)
289 for case in cls.claim_against_literal_cases:
290 lhs, op, rhs, outcome = case
291 op_name = op_names[op]
293 name = f'{lhs}_{op_name}_literal_{rhs}'
295 name = re.sub(r'\W+', '_', name)
296 if FILTER and not re.search(FILTER, name):
299 cls.generate_dynamic_test('test_cmp', name,
300 lhs, op, rhs, outcome, True)
302 def test_allowed_from_member_of_each(self):
303 # Create an authentication policy that allows accounts belonging to both groups.
305 policy = self.create_authn_policy(
308 f'O:SYD:(XA;;CR;;;WD;(Member_of '
309 f'{{SID({self._group0_sid}), SID({self._group1_sid})}}))'),
312 # Create a user account with the assigned policy.
313 client_creds = self._get_creds(account_type=self.AccountType.USER,
314 assigned_policy=policy)
316 # Show that we get a policy error if the machine account does not
317 # belong to both groups.
318 armor_tgt = self.get_tgt(self._member_of_one_creds)
319 self._get_tgt(client_creds, armor_tgt=armor_tgt,
320 expected_error=KDC_ERR_POLICY)
322 # Otherwise, authentication should succeed.
323 armor_tgt = self.get_tgt(self._member_of_both_creds)
324 self._get_tgt(client_creds, armor_tgt=armor_tgt,
327 def test_allowed_from_member_of_any(self):
328 # Create an authentication policy that allows accounts belonging to either group.
330 policy = self.create_authn_policy(
333 f'O:SYD:(XA;;CR;;;WD;(Member_of_Any '
334 f'{{SID({self._group0_sid}), SID({self._group1_sid})}}))'),
337 # Create a user account with the assigned policy.
338 client_creds = self._get_creds(account_type=self.AccountType.USER,
339 assigned_policy=policy)
341 # Show that we get a policy error if the machine account belongs to neither group.
343 armor_tgt = self.get_tgt(self._mach_creds)
344 self._get_tgt(client_creds, armor_tgt=armor_tgt,
345 expected_error=KDC_ERR_POLICY)
347 # Otherwise, authentication should succeed.
348 armor_tgt = self.get_tgt(self._member_of_one_creds)
349 self._get_tgt(client_creds, armor_tgt=armor_tgt,
352 def test_allowed_from_not_member_of_each(self):
353 # Create an authentication policy that allows accounts not belonging to both groups.
355 policy = self.create_authn_policy(
358 f'O:SYD:(XA;;CR;;;WD;(Not_Member_of '
359 f'{{SID({self._group0_sid}), SID({self._group1_sid})}}))'),
362 # Create a user account with the assigned policy.
363 client_creds = self._get_creds(account_type=self.AccountType.USER,
364 assigned_policy=policy)
366 # Show that we get a policy error if the machine account belongs to both groups.
368 armor_tgt = self.get_tgt(self._member_of_both_creds)
369 self._get_tgt(client_creds, armor_tgt=armor_tgt,
370 expected_error=KDC_ERR_POLICY)
372 # Otherwise, authentication should succeed.
373 armor_tgt = self.get_tgt(self._member_of_one_creds)
374 self._get_tgt(client_creds, armor_tgt=armor_tgt,
377 def test_allowed_from_not_member_of_any(self):
378 # Create an authentication policy that allows accounts belonging to neither group.
380 policy = self.create_authn_policy(
383 f'O:SYD:(XA;;CR;;;WD;(Not_Member_of_Any '
384 f'{{SID({self._group0_sid}), SID({self._group1_sid})}}))'),
387 # Create a user account with the assigned policy.
388 client_creds = self._get_creds(account_type=self.AccountType.USER,
389 assigned_policy=policy)
391 # Show that we get a policy error if the machine account belongs to one or both groups.
393 armor_tgt = self.get_tgt(self._member_of_one_creds)
394 self._get_tgt(client_creds, armor_tgt=armor_tgt,
395 expected_error=KDC_ERR_POLICY)
397 # Otherwise, authentication should succeed.
398 armor_tgt = self.get_tgt(self._mach_creds)
399 self._get_tgt(client_creds, armor_tgt=armor_tgt,
402 def test_allowed_from_member_of_each_deny(self):
403 # Create an authentication policy that denies accounts belonging to
404 # both groups, and allows other accounts.
405 policy = self.create_authn_policy(
408 f'O:SYD:(XD;;CR;;;WD;(Member_of '
409 f'{{SID({self._group0_sid}), SID({self._group1_sid})}}))'
413 # Create a user account with the assigned policy.
414 client_creds = self._get_creds(account_type=self.AccountType.USER,
415 assigned_policy=policy)
417 # Show that we get a policy error if the machine account belongs to both groups.
419 armor_tgt = self.get_tgt(self._member_of_both_creds)
420 self._get_tgt(client_creds, armor_tgt=armor_tgt,
421 expected_error=KDC_ERR_POLICY)
423 # Otherwise, authentication should succeed.
424 armor_tgt = self.get_tgt(self._member_of_one_creds)
425 self._get_tgt(client_creds, armor_tgt=armor_tgt,
428 def test_allowed_from_member_of_any_deny(self):
429 # Create an authentication policy that denies accounts belonging to
430 # either group, and allows other accounts.
431 policy = self.create_authn_policy(
434 f'O:SYD:(XD;;CR;;;WD;(Member_of_Any '
435 f'{{SID({self._group0_sid}), SID({self._group1_sid})}}))'
439 # Create a user account with the assigned policy.
440 client_creds = self._get_creds(account_type=self.AccountType.USER,
441 assigned_policy=policy)
443 # Show that we get a policy error if the machine account belongs to one or both groups.
445 armor_tgt = self.get_tgt(self._member_of_one_creds)
446 self._get_tgt(client_creds, armor_tgt=armor_tgt,
447 expected_error=KDC_ERR_POLICY)
449 # Otherwise, authentication should succeed.
450 armor_tgt = self.get_tgt(self._mach_creds)
451 self._get_tgt(client_creds, armor_tgt=armor_tgt,
454 def test_allowed_from_not_member_of_each_deny(self):
455 # Create an authentication policy that denies accounts not belonging to
456 # both groups, and allows other accounts.
457 policy = self.create_authn_policy(
460 f'O:SYD:(XD;;CR;;;WD;(Not_Member_of '
461 f'{{SID({self._group0_sid}), SID({self._group1_sid})}}))'
465 # Create a user account with the assigned policy.
466 client_creds = self._get_creds(account_type=self.AccountType.USER,
467 assigned_policy=policy)
469 # Show that we get a policy error if the machine account doesn’t belong to both groups.
471 armor_tgt = self.get_tgt(self._member_of_one_creds)
472 self._get_tgt(client_creds, armor_tgt=armor_tgt,
473 expected_error=KDC_ERR_POLICY)
475 # Otherwise, authentication should succeed.
476 armor_tgt = self.get_tgt(self._member_of_both_creds)
477 self._get_tgt(client_creds, armor_tgt=armor_tgt,
480 def test_allowed_from_not_member_of_any_deny(self):
481 # Create an authentication policy that denies accounts belonging to
482 # neither group, and allows other accounts.
483 policy = self.create_authn_policy(
486 f'O:SYD:(XD;;CR;;;WD;(Not_Member_of_Any '
487 f'{{SID({self._group0_sid}), SID({self._group1_sid})}}))'
491 # Create a user account with the assigned policy.
492 client_creds = self._get_creds(account_type=self.AccountType.USER,
493 assigned_policy=policy)
495 # Show that we get a policy error if the machine account belongs to neither group.
497 armor_tgt = self.get_tgt(self._mach_creds)
498 self._get_tgt(client_creds, armor_tgt=armor_tgt,
499 expected_error=KDC_ERR_POLICY)
501 # Otherwise, authentication should succeed.
502 armor_tgt = self.get_tgt(self._member_of_one_creds)
503 self._get_tgt(client_creds, armor_tgt=armor_tgt,
506 def test_allowed_from_unenforced_silo_equals(self):
507 # Create an authentication policy that allows accounts belonging to the unenforced silo.
509 policy = self.create_authn_policy(
512 f'O:SYD:(XA;;CR;;;WD;'
513 f'(@User.ad://ext/AuthenticationSilo == '
514 f'"{self._unenforced_silo}"))'),
517 # Create a user account with the assigned policy.
518 client_creds = self._get_creds(account_type=self.AccountType.USER,
519 assigned_policy=policy)
521 # As the silo is unenforced, the ‘ad://ext/AuthenticationSilo’ claim
522 # will not be present in the TGT, and the ACE will never allow access.
524 armor_tgt = self.get_tgt(self._mach_creds)
525 self._get_tgt(client_creds, armor_tgt=armor_tgt,
526 expected_error=KDC_ERR_POLICY)
528 armor_tgt = self.get_tgt(self._member_of_unenforced_silo)
529 self._get_tgt(client_creds, armor_tgt=armor_tgt,
530 expected_error=KDC_ERR_POLICY)
532 armor_tgt = self.get_tgt(self._member_of_enforced_silo)
533 self._get_tgt(client_creds, armor_tgt=armor_tgt,
534 expected_error=KDC_ERR_POLICY)
536 def test_allowed_from_enforced_silo_equals(self):
537 # Create an authentication policy that allows accounts belonging to the enforced silo.
539 policy = self.create_authn_policy(
542 f'O:SYD:(XA;;CR;;;WD;'
543 f'(@User.ad://ext/AuthenticationSilo == '
544 f'"{self._enforced_silo}"))'),
547 # Create a user account with the assigned policy.
548 client_creds = self._get_creds(account_type=self.AccountType.USER,
549 assigned_policy=policy)
551 # Show that we get a policy error if the machine account does not
552 # belong to the silo.
553 armor_tgt = self.get_tgt(self._mach_creds)
554 self._get_tgt(client_creds, armor_tgt=armor_tgt,
555 expected_error=KDC_ERR_POLICY)
557 armor_tgt = self.get_tgt(self._member_of_unenforced_silo)
558 self._get_tgt(client_creds, armor_tgt=armor_tgt,
559 expected_error=KDC_ERR_POLICY)
561 # Otherwise, authentication should succeed.
562 armor_tgt = self.get_tgt(self._member_of_enforced_silo)
563 self._get_tgt(client_creds, armor_tgt=armor_tgt,
566 def test_allowed_from_unenforced_silo_not_equals(self):
567 # Create an authentication policy that allows accounts not belonging to
568 # the unenforced silo.
569 policy = self.create_authn_policy(
572 f'O:SYD:(XA;;CR;;;WD;'
573 f'(@User.ad://ext/AuthenticationSilo != '
574 f'"{self._unenforced_silo}"))'),
577 # Create a user account with the assigned policy.
578 client_creds = self._get_creds(account_type=self.AccountType.USER,
579 assigned_policy=policy)
581 # Show that authentication fails unless the account belongs to a silo
582 # other than the unenforced silo.
584 armor_tgt = self.get_tgt(self._mach_creds)
585 self._get_tgt(client_creds, armor_tgt=armor_tgt,
586 expected_error=KDC_ERR_POLICY)
588 armor_tgt = self.get_tgt(self._member_of_unenforced_silo)
589 self._get_tgt(client_creds, armor_tgt=armor_tgt,
590 expected_error=KDC_ERR_POLICY)
592 armor_tgt = self.get_tgt(self._member_of_enforced_silo)
593 self._get_tgt(client_creds, armor_tgt=armor_tgt,
596 def test_allowed_from_enforced_silo_not_equals(self):
597 # Create an authentication policy that allows accounts not belonging to the enforced silo.
599 policy = self.create_authn_policy(
602 f'O:SYD:(XA;;CR;;;WD;'
603 f'(@User.ad://ext/AuthenticationSilo != '
604 f'"{self._enforced_silo}"))'),
607 # Create a user account with the assigned policy.
608 client_creds = self._get_creds(account_type=self.AccountType.USER,
609 assigned_policy=policy)
611 # Show that authentication always fails, as none of the machine
612 # accounts belong to a silo that is not the enforced one. (The
613 # unenforced silo doesn’t count, as it will never appear in a claim.)
615 armor_tgt = self.get_tgt(self._mach_creds)
616 self._get_tgt(client_creds, armor_tgt=armor_tgt,
617 expected_error=KDC_ERR_POLICY)
619 armor_tgt = self.get_tgt(self._member_of_unenforced_silo)
620 self._get_tgt(client_creds, armor_tgt=armor_tgt,
621 expected_error=KDC_ERR_POLICY)
623 armor_tgt = self.get_tgt(self._member_of_enforced_silo)
624 self._get_tgt(client_creds, armor_tgt=armor_tgt,
625 expected_error=KDC_ERR_POLICY)
627 def test_allowed_from_unenforced_silo_equals_deny(self):
628 # Create an authentication policy that denies accounts belonging to the
629 # unenforced silo, and allows other accounts.
630 policy = self.create_authn_policy(
633 f'O:SYD:(XD;;CR;;;WD;'
634 f'(@User.ad://ext/AuthenticationSilo == '
635 f'"{self._unenforced_silo}"))'
639 # Create a user account with the assigned policy.
640 client_creds = self._get_creds(account_type=self.AccountType.USER,
641 assigned_policy=policy)
643 # Show that authentication fails unless the account belongs to a silo
644 # other than the unenforced silo.
646 armor_tgt = self.get_tgt(self._mach_creds)
647 self._get_tgt(client_creds, armor_tgt=armor_tgt,
648 expected_error=KDC_ERR_POLICY)
650 armor_tgt = self.get_tgt(self._member_of_unenforced_silo)
651 self._get_tgt(client_creds, armor_tgt=armor_tgt,
652 expected_error=KDC_ERR_POLICY)
654 armor_tgt = self.get_tgt(self._member_of_enforced_silo)
655 self._get_tgt(client_creds, armor_tgt=armor_tgt,
658 def test_allowed_from_enforced_silo_equals_deny(self):
659 # Create an authentication policy that denies accounts belonging to the
660 # enforced silo, and allows other accounts.
661 policy = self.create_authn_policy(
664 f'O:SYD:(XD;;CR;;;WD;'
665 f'(@User.ad://ext/AuthenticationSilo == '
666 f'"{self._enforced_silo}"))'
670 # Create a user account with the assigned policy.
671 client_creds = self._get_creds(account_type=self.AccountType.USER,
672 assigned_policy=policy)
674 # Show that authentication always fails, as none of the machine
675 # accounts belong to a silo that is not the enforced one. (The
676 # unenforced silo doesn’t count, as it will never appear in a claim.)
678 armor_tgt = self.get_tgt(self._mach_creds)
679 self._get_tgt(client_creds, armor_tgt=armor_tgt,
680 expected_error=KDC_ERR_POLICY)
682 armor_tgt = self.get_tgt(self._member_of_unenforced_silo)
683 self._get_tgt(client_creds, armor_tgt=armor_tgt,
684 expected_error=KDC_ERR_POLICY)
686 armor_tgt = self.get_tgt(self._member_of_enforced_silo)
687 self._get_tgt(client_creds, armor_tgt=armor_tgt,
688 expected_error=KDC_ERR_POLICY)
690 def test_allowed_from_unenforced_silo_not_equals_deny(self):
691 # Create an authentication policy that denies accounts not belonging to
692 # the unenforced silo, and allows other accounts.
693 policy = self.create_authn_policy(
696 f'O:SYD:(XD;;CR;;;WD;'
697 f'(@User.ad://ext/AuthenticationSilo != '
698 f'"{self._unenforced_silo}"))'
702 # Create a user account with the assigned policy.
703 client_creds = self._get_creds(account_type=self.AccountType.USER,
704 assigned_policy=policy)
706 # Show that authentication always fails, as the unenforced silo will
707 # never appear in a claim.
709 armor_tgt = self.get_tgt(self._mach_creds)
710 self._get_tgt(client_creds, armor_tgt=armor_tgt,
711 expected_error=KDC_ERR_POLICY)
713 armor_tgt = self.get_tgt(self._member_of_unenforced_silo)
714 self._get_tgt(client_creds, armor_tgt=armor_tgt,
715 expected_error=KDC_ERR_POLICY)
717 armor_tgt = self.get_tgt(self._member_of_enforced_silo)
718 self._get_tgt(client_creds, armor_tgt=armor_tgt,
719 expected_error=KDC_ERR_POLICY)
721 def test_allowed_from_enforced_silo_not_equals_deny(self):
722 # Create an authentication policy that denies accounts not belonging to
723 # the enforced silo, and allows other accounts.
724 policy = self.create_authn_policy(
727 f'O:SYD:(XD;;CR;;;WD;'
728 f'(@User.ad://ext/AuthenticationSilo != '
729 f'"{self._enforced_silo}"))'
733 # Create a user account with the assigned policy.
734 client_creds = self._get_creds(account_type=self.AccountType.USER,
735 assigned_policy=policy)
737 # Show that authentication fails unless the account belongs to the enforced silo.
740 armor_tgt = self.get_tgt(self._mach_creds)
741 self._get_tgt(client_creds, armor_tgt=armor_tgt,
742 expected_error=KDC_ERR_POLICY)
744 armor_tgt = self.get_tgt(self._member_of_unenforced_silo)
745 self._get_tgt(client_creds, armor_tgt=armor_tgt,
746 expected_error=KDC_ERR_POLICY)
748 armor_tgt = self.get_tgt(self._member_of_enforced_silo)
749 self._get_tgt(client_creds, armor_tgt=armor_tgt,
752 def test_allowed_from_claim_equals_claim(self):
753 # Create a couple of claim types.
755 claim0_id = self.get_new_username()
756 self.create_claim(claim0_id,
758 attribute='carLicense',
761 for_classes=['computer'],
762 value_type=claims.CLAIM_TYPE_STRING)
764 claim1_id = self.get_new_username()
765 self.create_claim(claim1_id,
770 for_classes=['computer'],
771 value_type=claims.CLAIM_TYPE_STRING)
773 # Create an authentication policy that allows accounts having the two claims equal to each other.
775 policy = self.create_authn_policy(
778 f'O:SYD:(XA;;CR;;;WD;'
779 f'(@User.{claim0_id} == @User.{claim1_id}))'),
782 # Create a user account with the assigned policy.
783 client_creds = self._get_creds(account_type=self.AccountType.USER,
784 assigned_policy=policy)
786 armor_tgt = self.get_tgt(self._mach_creds)
787 self._get_tgt(client_creds, armor_tgt=armor_tgt,
788 expected_error=KDC_ERR_POLICY)
790 mach_creds = self.get_cached_creds(
791 account_type=self.AccountType.COMPUTER,
793 'additional_details': (
794 ('carLicense', 'foo'),
798 armor_tgt = self.get_tgt(
800 expect_client_claims=True,
801 expected_client_claims={
803 'source_type': claims.CLAIMS_SOURCE_TYPE_AD,
804 'type': claims.CLAIM_TYPE_STRING,
808 'source_type': claims.CLAIMS_SOURCE_TYPE_AD,
809 'type': claims.CLAIM_TYPE_STRING,
813 self._get_tgt(client_creds, armor_tgt=armor_tgt,
816 def test_allowed_to_client_equals(self):
817 client_claim_attr = 'carLicense'
818 client_claim_value = 'foo bar'
819 client_claim_values = client_claim_value,
821 client_claim_id = self.get_new_username()
822 self.create_claim(client_claim_id,
824 attribute=client_claim_attr,
827 for_classes=['user'],
828 value_type=claims.CLAIM_TYPE_STRING)
830 # Create an authentication policy that allows authorization if the
831 # client has a particular claim value.
832 policy = self.create_authn_policy(
834 computer_allowed_to=(
835 f'O:SYD:(XA;;CR;;;WD;'
836 f'((@User.{client_claim_id} == "{client_claim_value}")))'),
839 # Create a computer account with the assigned policy.
840 target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
841 assigned_policy=policy)
843 armor_tgt = self.get_tgt(self._mach_creds)
845 # Create a user account without the claim value.
846 client_creds = self.get_cached_creds(
847 account_type=self.AccountType.USER)
848 tgt = self.get_tgt(client_creds)
849 # Show that obtaining a service ticket is denied.
851 tgt, KDC_ERR_POLICY, client_creds, target_creds,
853 expect_edata=self.expect_padata_outer,
854 # We aren’t particular about whether or not we get an NTSTATUS.
856 expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
859 # Create a user account with the claim value.
860 client_creds = self.get_cached_creds(
861 account_type=self.AccountType.USER,
863 'additional_details': (
864 (client_claim_attr, client_claim_values),
869 expect_client_claims=True,
870 expected_client_claims={
872 'source_type': claims.CLAIMS_SOURCE_TYPE_AD,
873 'type': claims.CLAIM_TYPE_STRING,
874 'values': client_claim_values,
877 # Show that obtaining a service ticket is allowed.
878 self._tgs_req(tgt, 0, client_creds, target_creds,
881 def test_allowed_to_device_equals(self):
882 device_claim_attr = 'carLicense'
883 device_claim_value = 'bar'
884 device_claim_values = device_claim_value,
886 device_claim_id = self.get_new_username()
887 self.create_claim(device_claim_id,
889 attribute=device_claim_attr,
892 for_classes=['computer'],
893 value_type=claims.CLAIM_TYPE_STRING)
895 # Create a user account.
896 client_creds = self.get_cached_creds(
897 account_type=self.AccountType.USER)
898 tgt = self.get_tgt(client_creds)
900 # Create an authentication policy that allows authorization if the
901 # device has a particular claim value.
902 policy = self.create_authn_policy(
904 computer_allowed_to=(
905 f'O:SYD:(XA;;CR;;;WD;'
906 f'(@Device.{device_claim_id} == "{device_claim_value}"))'),
909 # Create a computer account with the assigned policy.
910 target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
911 assigned_policy=policy)
913 armor_tgt = self.get_tgt(self._mach_creds)
914 # Show that obtaining a service ticket is denied when the claim value is not present.
917 tgt, KDC_ERR_POLICY, client_creds, target_creds,
919 expect_edata=self.expect_padata_outer,
920 # We aren’t particular about whether or not we get an NTSTATUS.
922 expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
925 mach_creds = self.get_cached_creds(
926 account_type=self.AccountType.COMPUTER,
928 'additional_details': (
929 (device_claim_attr, device_claim_values),
932 armor_tgt = self.get_tgt(
934 expect_client_claims=True,
935 expected_client_claims={
937 'source_type': claims.CLAIMS_SOURCE_TYPE_AD,
938 'type': claims.CLAIM_TYPE_STRING,
939 'values': device_claim_values,
942 # Show that obtaining a service ticket is allowed when the claim value is present.
944 self._tgs_req(tgt, 0, client_creds, target_creds,
947 claim_against_claim_cases = [
948 # If either side is missing, the result is unknown.
949 ((), '==', (), None),
950 ((), '!=', (), None),
951 ('a', '==', (), None),
952 ((), '==', 'b', None),
953 # Straightforward equality and inequality checks work.
954 ('foo', '==', 'foo', True),
955 ('foo', '==', 'bar', False),
956 ('foo', '!=', 'foo', False),
957 ('foo', '!=', 'bar', True),
958 # We can perform less‐than and greater‐than operations.
959 ('cat', '<', 'dog', True),
960 ('cat', '<=', 'dog', True),
961 ('cat', '>', 'dog', False),
962 ('cat', '>=', 'dog', False),
963 ('foo', '<=', 'foo', True),
964 ('foo', '>=', 'foo', True),
965 ('foo', '<', 'foo bar', True),
966 ('foo bar', '>', 'foo', True),
967 # String comparison is case‐insensitive.
968 ('foo bar', '==', 'Foo BAR', True),
969 ('foo bar', '==', 'FOO BAR', True),
970 ('ćàț', '==', 'ĆÀȚ', True),
971 ('ḽ', '==', 'Ḽ', True),
972 ('ⅸ', '==', 'Ⅸ', True),
973 ('ꙭ', '==', 'Ꙭ', True),
974 ('ⱦ', '==', 'Ⱦ', True), # Lowercased variant added in Unicode 5.0.
975 ('ԛԣ', '==', 'ԚԢ', True), # All added in Unicode 5.1.
976 ('foo', '<', 'foo', True),
977 ('ćàș', '<', 'ĆÀȚ', True),
978 ('cat', '<', 'ćàț', True),
979 # This is done by converting to UPPER CASE. Hence, both ‘A’ (U+41) and
980 # ‘a’ (U+61) compare less than ‘_’ (U+5F).
981 ('A', '<', '_', True),
982 ('a', '<', '_', True),
983 # But not all uppercased/lowercased pairs are considered to be equal in this way.
985 ('ß', '<', 'ẞ', True),
986 ('ß', '>', 'SS', True),
987 ('ⳬ', '>', 'Ⳬ', True), # Added in Unicode 5.2.
988 ('ʞ', '<', 'Ʞ', True), # Uppercased variant added in Unicode 6.0.
989 ('ʞ', '<', 'ʟ', True), # U+029E < U+029F < U+A7B0 (upper variant, Ʞ)
990 ('ꞧ', '>', 'Ꞧ', True), # Added in Unicode 6.0.
991 ('ɜ', '<', 'Ɜ', True), # Uppercased variant added in Unicode 7.0.
993 # Strings are compared as UTF‐16 code units, rather than as Unicode
994 # codepoints. So while you might expect ‘𐀀’ (U+10000) to compare
995 # greater than ‘豈’ (U+F900), it is actually considered to be the
996 # *smaller* of the pair. That is because it is encoded as a sequence of
997 # two code units, 0xd800 and 0xdc00, which combination compares less
998 # than the single code unit 0xf900.
999 ('ퟻ', '<', '𐀀', True),
1000 ('𐀀', '<', '豈', True),
1001 ('ퟻ', '<', '豈', True),
1002 # Composites can be compared.
1003 (('foo', 'bar'), '==', ('foo', 'bar'), True),
1004 (('foo', 'bar'), '==', ('foo', 'baz'), False),
1005 # The individual components don’t have to match in case.
1006 (('foo', 'bar'), '==', ('FOO', 'BAR'), True),
1007 # Nor must they match in order.
1008 (('foo', 'bar'), '==', ('bar', 'foo'), True),
1009 # Composites of different lengths compare unequal.
1010 (('foo', 'bar'), '!=', 'foo', True),
1011 (('foo', 'bar'), '!=', ('foo', 'bar', 'baz'), True),
1012 # But composites don’t have a defined ordering, and aren’t considered
1013 # greater or lesser than one another.
1014 (('foo', 'bar'), '<', ('foo', 'bar'), None),
1015 (('foo', 'bar'), '<=', ('foo', 'bar'), None),
1016 (('foo', 'bar'), '>', ('foo', 'bar', 'baz'), None),
1017 (('foo', 'bar'), '>=', ('foo', 'bar', 'baz'), None),
1018 # We can test for containment.
1019 (('foo', 'bar'), 'Contains', ('FOO'), True),
1020 (('foo', 'bar'), 'Contains', ('foo', 'bar'), True),
1021 (('foo', 'bar'), 'Not_Contains', ('foo', 'bar'), False),
1022 (('foo', 'bar'), 'Contains', ('foo', 'bar', 'baz'), False),
1023 (('foo', 'bar'), 'Not_Contains', ('foo', 'bar', 'baz'), True),
1024 # We can test whether the operands have any elements in common.
1025 ('foo', 'Any_of', 'foo', True),
1026 (('foo', 'bar'), 'Any_of', 'BAR', True),
1027 (('foo', 'bar'), 'Any_of', 'baz', False),
1028 (('foo', 'bar'), 'Not_Any_of', 'baz', True),
1029 (('foo', 'bar'), 'Any_of', ('bar', 'baz'), True),
1030 (('foo', 'bar'), 'Not_Any_of', ('bar', 'baz'), False),
1033 claim_against_literal_cases = [
# Cases comparing a claim against literal expression text. Each entry is
# (claim value(s), operator, literal text, expected outcome), where the
# outcome is True, False or None.
# NOTE(review): presumably consumed by _test_cmp_with_args() with
# rhs_is_literal=True; in that case the doubled braces are str.format()
# escapes for a literal '{' / '}' and None stands for an Unknown result.
# Confirm against the code that generates the test cases.
1034 # String comparisons also work against literals.
1035 ('foo bar', '==', '"foo bar"', True),
1036 # Composites can be compared with literals.
1037 ((), '==', '{{}}', None),
1038 ('foo', '!=', '{{}}', True),
1039 ('bar', '==', '{{"bar"}}', True),
1040 (('apple', 'banana'), '==', '{{"APPLE", "BANANA"}}', True),
1041 (('apple', 'banana'), '==', '{{"BANANA", "APPLE"}}', True),
1042 (('apple', 'banana'), '==', '{{"apple", "banana", "apple"}}', False),
1043 # We can test for containment.
1044 ((), 'Contains', '{{}}', False),
1045 ((), 'Not_Contains', '{{}}', True),
1046 ((), 'Contains', '{{"foo"}}', None),
1047 ((), 'Not_Contains', '{{"foo", "bar"}}', None),
1048 ('foo', 'Contains', '{{}}', False),
1049 ('bar', 'Contains', '{{"bar"}}', True),
1050 (('foo', 'bar'), 'Contains', '{{"foo", "bar"}}', True),
1051 (('foo', 'bar'), 'Contains', '{{"foo", "bar", "baz"}}', False),
1052 # The right‐hand side of Contains or Not_Contains does not have to be a
# composite: the next two cases pass a bare string literal.
1054 ('foo', 'Contains', '"foo"', True),
1055 (('foo', 'bar'), 'Not_Contains', '"foo"', False),
1056 # It’s fine if the right‐hand side contains duplicate elements.
1057 (('foo', 'bar'), 'Contains', '{{"foo", "bar", "bar"}}', True),
1058 # We can test whether the operands have any elements in common.
1059 ((), 'Any_of', '{{}}', None),
1060 ((), 'Not_Any_of', '{{}}', None),
1061 ('foo', 'Any_of', '{{}}', False),
1062 ('foo', 'Not_Any_of', '{{}}', True),
1063 ('bar', 'Any_of', '{{"bar"}}', True),
1064 (('foo', 'bar'), 'Any_of', '{{"bar", "baz"}}', True),
1065 (('foo', 'bar'), 'Any_of', '{{"baz"}}', False),
1066 # The right‐hand side of Any_of or Not_Any_of must be a composite.
1067 ('foo', 'Any_of', '"foo"', None),
1068 (('foo', 'bar'), 'Not_Any_of', '"baz"', None),
1069 # A string won’t compare equal to a numeric literal.
1070 ('42', '==', '"42"', True),
1071 ('42', '==', '42', None),
1072 # Nor can composites that mismatch in type be compared.
1073 (('123', '456'), '==', '{{"123", "456"}}', True),
1074 (('654', '321'), '==', '{{654, 321}}', None),
1075 (('foo', 'bar'), 'Contains', '{{1, 2, 3}}', None),
1078 def _test_cmp_with_args(self, lhs, op, rhs, outcome, rhs_is_literal=False):
# Check how a single comparison involving claims is evaluated, by
# authenticating against two authentication policies built from it.
#
# lhs: value(s) placed in additional_details under self.claim0_attr for the
#      computer account, and expected back as the claim self.claim0_id.
# op: operator text inserted between the two operands of the expression.
# rhs: either literal expression text (asserted to be a str and passed
#      through str.format(self=self)), or value(s) stored under
#      self.claim1_attr and referenced as @User.<claim1_id>.
# outcome: True, False or None (Unknown); decides which of the two armored
#          requests at the end is expected to succeed.
# rhs_is_literal: NOTE(review): presumably selects the first of the two
#                 'expression' forms below; the branch lines themselves are
#                 not visible in this listing, so confirm.
1079 # Construct a conditional ACE expression that evaluates to True if the
1080 # two claim values are equal.
# NOTE(review): the operator actually comes from 'op', so the expression is
# not limited to equality tests.
1082 self.assertIsInstance(rhs, str)
1083 rhs = rhs.format(self=self)
1084 expression = f'(@User.{self.claim0_id} {op} {rhs})'
1086 expression = f'(@User.{self.claim0_id} {op} @User.{self.claim1_id})'
1088 # Create an authentication policy that will allow authentication when
1089 # the expression is true, and a second that will deny authentication in
1090 # the same circumstance. By observing the results of authenticating
1091 # against each of these policies in turn, we can determine whether the
1092 # expression evaluates to a True, False, or Unknown value.
# The 'denied' descriptor follows its conditional deny ACE (XD) with an
# unconditional allow ACE (A), while the 'allowed' descriptor contains only
# the conditional allow ACE (XA).
1094 allowed_sddl = f'O:SYD:(XA;;CR;;;WD;{expression})'
1095 denied_sddl = f'O:SYD:(XD;;CR;;;WD;{expression})(A;;CR;;;WD)'
1097 allowed_policy = self.create_authn_policy(
1099 user_allowed_from=allowed_sddl)
1100 denied_policy = self.create_authn_policy(
1102 user_allowed_from=denied_sddl)
1104 # Create a user account assigned to each policy.
1105 allowed_creds = self._get_creds(account_type=self.AccountType.USER,
1106 assigned_policy=allowed_policy)
1107 denied_creds = self._get_creds(account_type=self.AccountType.USER,
1108 assigned_policy=denied_policy)
# Attribute values to set on the computer account; the right-hand claim is
# only stored when it is non-empty and not a literal.
1110 additional_details = ()
1112 additional_details += ((self.claim0_attr, lhs),)
1113 if rhs and not rhs_is_literal:
1114 additional_details += ((self.claim1_attr, rhs),)
1116 # Create a computer account with the provided attribute values.
1117 mach_creds = self.get_cached_creds(
1118 account_type=self.AccountType.COMPUTER,
1119 opts={'additional_details': additional_details})
# Helper producing the 'values' entry of an expected claim; it special-cases
# str and bytes inputs. NOTE(review): the rest of its body is not visible.
1121 def expected_values(val):
1122 if isinstance(val, (str, bytes)):
# Claims expected in the computer account's TGT, keyed by claim ID. Both
# are expected to be string claims sourced from AD.
1127 expected_client_claims = {}
1129 expected_client_claims[self.claim0_id] = {
1130 'source_type': claims.CLAIMS_SOURCE_TYPE_AD,
1131 'type': claims.CLAIM_TYPE_STRING,
1132 'values': expected_values(lhs),
1134 if rhs and not rhs_is_literal:
1135 expected_client_claims[self.claim1_id] = {
1136 'source_type': claims.CLAIMS_SOURCE_TYPE_AD,
1137 'type': claims.CLAIM_TYPE_STRING,
1138 'values': expected_values(rhs),
1141 # Fetch the computer account’s TGT, and ensure it contains the claims.
# expect_client_claims is True when any claims are expected and None (rather
# than False) when the dict is empty.
1142 armor_tgt = self.get_tgt(
1144 expect_client_claims=bool(expected_client_claims) or None,
1145 expected_client_claims=expected_client_claims)
1147 # The first or the second authentication request is expected to succeed
1148 # if the outcome is True or False, respectively. An Unknown outcome,
1149 # represented by None, will result in a policy error in either case.
1150 allowed_error = 0 if outcome is True else KDC_ERR_POLICY
1151 denied_error = 0 if outcome is False else KDC_ERR_POLICY
1153 # Attempt to authenticate and ensure that we observe the expected
# error codes computed above; both requests use the computer's TGT as armor.
1155 self._get_tgt(allowed_creds, armor_tgt=armor_tgt,
1156 expected_error=allowed_error)
1157 self._get_tgt(denied_creds, armor_tgt=armor_tgt,
1158 expected_error=denied_error)
1161 # Test a very simple expression with various claims.
# NOTE(review): the assignment that opens this list of cases is not visible
# in this listing; the entries look like input for
# _test_pac_claim_cmp_with_args() — confirm. Each visible case gives, per
# claim source type, a list of (placeholder name, claim type, values)
# tuples, then an expression written in terms of the '{placeholder}' names,
# optionally a dict mapping placeholders to explicit claim names, and
# finally the expected outcome: True, False, None, an error code such as
# KDC_ERR_GENERIC, or a tuple (outcome, CRASHES_WINDOWS).
1163 (claims.CLAIMS_SOURCE_TYPE_AD, [
1164 ('{non_empty_string}', claims.CLAIM_TYPE_STRING, ['foo bar']),
1166 ], '{non_empty_string}', True),
1168 (claims.CLAIMS_SOURCE_TYPE_AD, [
1169 ('{zero_uint}', claims.CLAIM_TYPE_UINT64, [0]),
1171 ], '{zero_uint}', False),
1173 (claims.CLAIMS_SOURCE_TYPE_AD, [
1174 ('{nonzero_uint}', claims.CLAIM_TYPE_UINT64, [1]),
1176 ], '{nonzero_uint}', True),
1178 (claims.CLAIMS_SOURCE_TYPE_AD, [
1179 ('{zero_uints}', claims.CLAIM_TYPE_UINT64, [0, 0]),
1181 ], '{zero_uints}', KDC_ERR_GENERIC),
1183 (claims.CLAIMS_SOURCE_TYPE_AD, [
1184 ('{zero_and_one_uint}', claims.CLAIM_TYPE_UINT64, [0, 1]),
1186 ], '{zero_and_one_uint}', True),
1188 (claims.CLAIMS_SOURCE_TYPE_AD, [
1189 ('{one_and_zero_uint}', claims.CLAIM_TYPE_UINT64, [1, 0]),
1191 ], '{one_and_zero_uint}', True),
1193 (claims.CLAIMS_SOURCE_TYPE_AD, [
1194 ('{zero_int}', claims.CLAIM_TYPE_INT64, [0]),
1196 ], '{zero_int}', False),
1198 (claims.CLAIMS_SOURCE_TYPE_AD, [
1199 ('{nonzero_int}', claims.CLAIM_TYPE_INT64, [1]),
1201 ], '{nonzero_int}', True),
1203 (claims.CLAIMS_SOURCE_TYPE_AD, [
1204 ('{zero_ints}', claims.CLAIM_TYPE_INT64, [0, 0]),
1206 ], '{zero_ints}', KDC_ERR_GENERIC),
1208 (claims.CLAIMS_SOURCE_TYPE_AD, [
1209 ('{zero_and_one_int}', claims.CLAIM_TYPE_INT64, [0, 1]),
1211 ], '{zero_and_one_int}', True),
1213 (claims.CLAIMS_SOURCE_TYPE_AD, [
1214 ('{one_and_zero_int}', claims.CLAIM_TYPE_INT64, [1, 0]),
1216 ], '{one_and_zero_int}', True),
1218 (claims.CLAIMS_SOURCE_TYPE_AD, [
1219 ('{false_boolean}', claims.CLAIM_TYPE_BOOLEAN, [0]),
1221 ], '{false_boolean}', False),
1223 (claims.CLAIMS_SOURCE_TYPE_AD, [
1224 ('{true_boolean}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1226 ], '{true_boolean}', True),
1228 (claims.CLAIMS_SOURCE_TYPE_AD, [
1229 ('{false_booleans}', claims.CLAIM_TYPE_BOOLEAN, [0, 0]),
1231 ], '{false_booleans}', KDC_ERR_GENERIC),
1233 (claims.CLAIMS_SOURCE_TYPE_AD, [
1234 ('{false_and_true_boolean}', claims.CLAIM_TYPE_BOOLEAN, [0, 1]),
1236 ], '{false_and_true_boolean}', True),
1238 (claims.CLAIMS_SOURCE_TYPE_AD, [
1239 ('{true_and_false_boolean}', claims.CLAIM_TYPE_BOOLEAN, [1, 0]),
1241 ], '{true_and_false_boolean}', True),
1242 # Test a basic comparison against a literal.
1244 (claims.CLAIMS_SOURCE_TYPE_AD, [
1245 ('{a}', claims.CLAIM_TYPE_STRING, ['foo bar']),
1247 ], '{a} == "foo bar"', True),
1248 # Claims can be compared against one another.
1250 (claims.CLAIMS_SOURCE_TYPE_AD, [
1251 ('{a}', claims.CLAIM_TYPE_STRING, ['foo bar']),
1252 ('{b}', claims.CLAIM_TYPE_STRING, ['FOO BAR']),
1254 ], '{a} == {b}', True),
1256 (claims.CLAIMS_SOURCE_TYPE_AD, [
1257 ('{b}', claims.CLAIM_TYPE_STRING, ['FOO', 'BAR', 'BAZ']),
1258 ('{a}', claims.CLAIM_TYPE_STRING, ['foo', 'bar', 'baz']),
1260 ], '{a} != {b}', False),
1261 # Certificate claims are also valid.
1263 (claims.CLAIMS_SOURCE_TYPE_CERTIFICATE, [
1264 ('{a}', claims.CLAIM_TYPE_STRING, ['foo']),
1266 ], '{a} == "foo"', True),
1267 # Other claim source types are ignored.
# NOTE(review): the source-type values used by the next two cases are not
# visible in this listing.
1270 ('{a}', claims.CLAIM_TYPE_STRING, ['foo']),
1272 ], '{a} == "foo"', None),
1275 ('{a}', claims.CLAIM_TYPE_STRING, ['foo']),
1277 ], '{a} == "foo"', None),
1278 # If multiple claims have the same ID, the *last* one takes precedence.
1280 (claims.CLAIMS_SOURCE_TYPE_AD, [
1281 ('{a}', claims.CLAIM_TYPE_STRING, ['this is not the value…']),
1282 ('{a}', claims.CLAIM_TYPE_STRING, ['…nor is this…']),
1284 (claims.CLAIMS_SOURCE_TYPE_CERTIFICATE, [
1285 ('{a}', claims.CLAIM_TYPE_STRING, ['…and this isn’t either.']),
1287 (claims.CLAIMS_SOURCE_TYPE_CERTIFICATE, [
1288 ('{a}', claims.CLAIM_TYPE_STRING, ['here’s the actual value!']),
1291 ('{a}', claims.CLAIM_TYPE_STRING, ['this is a red herring.']),
1293 ], '{a} == "here’s the actual value!"', True),
1294 # Claim values can be empty.
1296 (claims.CLAIMS_SOURCE_TYPE_AD, [
1297 ('{empty_claim_string}', claims.CLAIM_TYPE_STRING, []),
1299 ], '{empty_claim_string} != "foo bar"', None),
1301 (claims.CLAIMS_SOURCE_TYPE_AD, [
1302 ('{empty_claim_boolean}', claims.CLAIM_TYPE_BOOLEAN, []),
1304 ], 'Exists {empty_claim_boolean}', None),
1305 # Test unsigned integer equality.
1307 (claims.CLAIMS_SOURCE_TYPE_AD, [
1308 ('{a}', claims.CLAIM_TYPE_UINT64, [42]),
1310 ], '{a} == 42', True),
1312 (claims.CLAIMS_SOURCE_TYPE_AD, [
1313 ('{a}', claims.CLAIM_TYPE_UINT64, [0]),
1315 ], '{a} == 3', False),
1317 (claims.CLAIMS_SOURCE_TYPE_AD, [
1318 ('{a}', claims.CLAIM_TYPE_UINT64, [1, 2, 3]),
1320 ], '{a} == {{1, 2, 3}}', True),
1322 (claims.CLAIMS_SOURCE_TYPE_AD, [
1323 ('{a}', claims.CLAIM_TYPE_UINT64, [4, 5, 6]),
1325 ], '{a} != {{1, 2, 3}}', True),
1326 # Test unsigned integer comparison. Ensure we don’t run into any
1327 # integer overflow issues.
1329 (claims.CLAIMS_SOURCE_TYPE_AD, [
1330 ('{a}', claims.CLAIM_TYPE_UINT64, [1 << 32]),
1332 ], '{a} > 0', True),
1333 # Test signed integer comparisons.
1335 (claims.CLAIMS_SOURCE_TYPE_AD, [
1336 ('{a}', claims.CLAIM_TYPE_INT64, [42]),
1338 ], '{a} == 42', True),
1340 (claims.CLAIMS_SOURCE_TYPE_AD, [
1341 ('{a}', claims.CLAIM_TYPE_INT64, [42 << 32]),
# This expression is an f-string, so the placeholder braces are doubled to
# survive as '{a}' while the integer is interpolated immediately.
1343 ], f'{{a}} == {42 << 32}', True),
1344 # Test boolean claims. Be careful! Windows will *crash* if you send it
1345 # claims that aren’t real booleans (not 0 or 1). I doubt Microsoft will
1346 # consider this a security issue though.
1348 (claims.CLAIMS_SOURCE_TYPE_AD, [
1349 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [2]),
1350 ('{b}', claims.CLAIM_TYPE_BOOLEAN, [3]),
1352 ], '{a} == {b}', (None, CRASHES_WINDOWS)),
1354 (claims.CLAIMS_SOURCE_TYPE_AD, [
1355 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1356 ('{b}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1358 ], '{a} == {b}', True),
1360 (claims.CLAIMS_SOURCE_TYPE_AD, [
1361 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1363 ], '{a} == 42', None),
1365 (claims.CLAIMS_SOURCE_TYPE_AD, [
1366 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1367 ('{b}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1369 ], '{a} && {b}', True),
1371 (claims.CLAIMS_SOURCE_TYPE_AD, [
1372 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [0]),
1373 ('{b}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1375 ], '{a} && {b}', False),
1377 (claims.CLAIMS_SOURCE_TYPE_AD, [
1378 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [0]),
1379 ('{b}', claims.CLAIM_TYPE_BOOLEAN, [0]),
1381 ], '{a} && {b}', False),
1383 (claims.CLAIMS_SOURCE_TYPE_AD, [
1384 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1385 ('{b}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1387 ], '{a} || {b}', True),
1389 (claims.CLAIMS_SOURCE_TYPE_AD, [
1390 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1391 ('{b}', claims.CLAIM_TYPE_BOOLEAN, [0]),
1393 ], '{a} || {b}', True),
1395 (claims.CLAIMS_SOURCE_TYPE_AD, [
1396 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [0]),
1397 ('{b}', claims.CLAIM_TYPE_BOOLEAN, [0]),
1399 ], '{a} || {b}', False),
1401 (claims.CLAIMS_SOURCE_TYPE_AD, [
1402 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [0]),
1406 (claims.CLAIMS_SOURCE_TYPE_AD, [
1407 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [0]),
1409 ], '!(!(!(!({a}))))', False),
1411 (claims.CLAIMS_SOURCE_TYPE_AD, [
1412 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [0]),
1414 ], '!({a} && {a})', True),
1416 (claims.CLAIMS_SOURCE_TYPE_AD, [
1417 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1418 ('{b}', claims.CLAIM_TYPE_BOOLEAN, [0]),
1420 ], '{a} && !({b} || {b})', True),
1422 (claims.CLAIMS_SOURCE_TYPE_AD, [
1423 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [0]),
1425 ], '!({a}) || !({a})', True),
1427 (claims.CLAIMS_SOURCE_TYPE_AD, [
1428 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1429 ('{b}', claims.CLAIM_TYPE_BOOLEAN, [0]),
1431 ], '{a} && !({b})', None),
1432 # Expressions containing the ‘not’ operator are occasionally evaluated
1433 # inconsistently, as evidenced here. ‘a || !a’ evaluates to ‘unknown’…
1435 (claims.CLAIMS_SOURCE_TYPE_AD, [
1436 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1438 ], '{a} || !({a})', None),
1439 # …but ‘!a || a’ — the same expression, just with the operands switched
1440 # round — evaluates to ‘true’.
1442 (claims.CLAIMS_SOURCE_TYPE_AD, [
1443 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1445 ], '!({a}) || {a}', True),
1446 # This inconsistency is not observed with other boolean expressions,
# such as the nested ‘||’ expressions in the next two cases.
1449 (claims.CLAIMS_SOURCE_TYPE_AD, [
1450 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1452 ], '{a} || ({a} || {a})', True),
1454 (claims.CLAIMS_SOURCE_TYPE_AD, [
1455 ('{b}', claims.CLAIM_TYPE_BOOLEAN, [1]),
1457 ], '({b} || {b}) || {b}', True),
1458 # Test a very large claim. Much larger than this, and
1459 # conditional_ace_encode_binary() will refuse to encode the conditions.
1461 (claims.CLAIMS_SOURCE_TYPE_AD, [
1462 ('{large_claim}', claims.CLAIM_TYPE_STRING, ['z' * 4900]),
1464 ], f'{{large_claim}} == "{"z" * 4900}"', True),
1465 # Test an even larger claim. Windows does not appear to like receiving
1466 # a claim this large.
1468 (claims.CLAIMS_SOURCE_TYPE_AD, [
1469 ('{larger_claim}', claims.CLAIM_TYPE_STRING, ['z' * 100000]),
1471 ], '{larger_claim} > "z"', (True, CRASHES_WINDOWS)),
1472 # Test a great number of claims. Windows does not appear to like
1473 # receiving this many claims.
1475 (claims.CLAIMS_SOURCE_TYPE_AD, [
1476 ('{many_claims}', claims.CLAIM_TYPE_UINT64,
1477 list(range(0, 100000))),
1479 ], '{many_claims} Any_of "99999"', (True, CRASHES_WINDOWS)),
1480 # Test a claim with a very long name. Much larger than this, and
1481 # conditional_ace_encode_binary() will refuse to encode the conditions.
# In this and the following cases the dict supplies the actual claim name
# to use for the placeholder.
1483 (claims.CLAIMS_SOURCE_TYPE_AD, [
1484 ('{long_name}', claims.CLAIM_TYPE_STRING, ['a']),
1486 ], '{long_name} == "a"', {'long_name': 'z' * 4900}, True),
1487 # Test attribute name escaping.
1489 (claims.CLAIMS_SOURCE_TYPE_AD, [
1490 ('{escaped_claim}', claims.CLAIM_TYPE_STRING, ['claim value']),
1492 ], '{escaped_claim} == "claim value"',
1493 {'escaped_claim': '(:foo:! /&/ :bar:!)'}, True),
1494 # Test a claim whose name consists entirely of dots.
1496 (claims.CLAIMS_SOURCE_TYPE_AD, [
1497 ('{dotty_claim}', claims.CLAIM_TYPE_STRING, ['a']),
1499 ], '{dotty_claim} == "a"', {'dotty_claim': '...'}, True),
1500 # Test a claim whose name consists of the first thousand non‐zero
1501 # Unicode codepoints.
1503 (claims.CLAIMS_SOURCE_TYPE_AD, [
1504 ('{1000_unicode}', claims.CLAIM_TYPE_STRING, ['a']),
1506 ], '{1000_unicode} == "a"',
1507 {'1000_unicode': ''.join(map(chr, range(1, 1001)))}, True),
1508 # Test a claim whose name consists of some higher Unicode codepoints,
1509 # including non‐BMP ones.
1511 (claims.CLAIMS_SOURCE_TYPE_AD, [
1512 ('{higher_unicode}', claims.CLAIM_TYPE_STRING, ['a']),
1514 ], '{higher_unicode} == "a"',
1515 {'higher_unicode': ''.join(map(chr, range(0xfe00, 0x10800)))}, True),
1516 # Duplicate claim values are not allowed…
1518 (claims.CLAIMS_SOURCE_TYPE_AD, [
1519 ('{a}', claims.CLAIM_TYPE_INT64, [42, 42, 42]),
1521 ], '{a} == {a}', KDC_ERR_GENERIC),
1523 (claims.CLAIMS_SOURCE_TYPE_AD, [
1524 ('{a}', claims.CLAIM_TYPE_UINT64, [42, 42]),
1526 ], '{a} == {a}', KDC_ERR_GENERIC),
1528 (claims.CLAIMS_SOURCE_TYPE_AD, [
1529 ('{a}', claims.CLAIM_TYPE_STRING, ['foo', 'foo']),
1531 ], '{a} == {a}', KDC_ERR_GENERIC),
1533 (claims.CLAIMS_SOURCE_TYPE_AD, [
1534 ('{a}', claims.CLAIM_TYPE_STRING, ['FOO', 'foo']),
1536 ], '{a} == {a}', KDC_ERR_GENERIC),
1538 (claims.CLAIMS_SOURCE_TYPE_AD, [
1539 ('{a}', claims.CLAIM_TYPE_BOOLEAN, [0, 0]),
1541 ], '{a} == {a}', KDC_ERR_GENERIC),
1542 # …but it’s OK if duplicate values are spread across multiple claim
# entries, as in the case below.
1545 (claims.CLAIMS_SOURCE_TYPE_AD, [
1546 ('{dup}', claims.CLAIM_TYPE_STRING, ['foo']),
1547 ('{dup}', claims.CLAIM_TYPE_STRING, ['foo']),
1549 (claims.CLAIMS_SOURCE_TYPE_CERTIFICATE, [
1550 ('{dup}', claims.CLAIM_TYPE_UINT64, [42]),
1551 ('{dup}', claims.CLAIM_TYPE_UINT64, [42]),
1553 (claims.CLAIMS_SOURCE_TYPE_CERTIFICATE, [
1554 ('{dup}', claims.CLAIM_TYPE_STRING, ['foo']),
1555 ('{dup}', claims.CLAIM_TYPE_STRING, ['foo']),
1556 ('{dup}', claims.CLAIM_TYPE_STRING, ['foo', 'bar']),
1557 ('{dup}', claims.CLAIM_TYPE_STRING, ['foo', 'bar']),
1559 ], '{dup} == {dup}', True),
1560 # Test invalid claim types. Be careful! Windows will *crash* if you
1561 # send it invalid claim types. I doubt Microsoft will consider this a
1562 # security issue though.
1564 (claims.CLAIMS_SOURCE_TYPE_AD, [
1565 ('{invalid_sid}', 5, []),
1567 ], '{invalid_sid} == {invalid_sid}', (None, CRASHES_WINDOWS)),
1569 (claims.CLAIMS_SOURCE_TYPE_AD, [
1570 ('{invalid_octet_string}', 16, []),
1572 ], '{invalid_octet_string} == {invalid_octet_string}', (None, CRASHES_WINDOWS)),
1573 # Sending an empty string will crash Windows.
1575 (claims.CLAIMS_SOURCE_TYPE_AD, [
1576 ('{empty_string}', claims.CLAIM_TYPE_STRING, ['']),
1578 ], '{empty_string}', (None, CRASHES_WINDOWS)),
1579 # But sending empty arrays is OK.
1581 (claims.CLAIMS_SOURCE_TYPE_AD, [
1582 ('{empty_array}', claims.CLAIM_TYPE_INT64, []),
1583 ('{empty_array}', claims.CLAIM_TYPE_UINT64, []),
1584 ('{empty_array}', claims.CLAIM_TYPE_BOOLEAN, []),
1585 ('{empty_array}', claims.CLAIM_TYPE_STRING, []),
1587 ], '{empty_array}', None),
1590 def _test_pac_claim_cmp_with_args(self,
# Check how an expression over claims is evaluated when the claims are
# written directly into the PAC of the armor ticket, rather than being
# derived from account attributes.
# NOTE(review): the remaining parameters are not visible in this listing;
# the body reads 'pac_claims', 'expression', 'claim_map' and 'outcome'.
1595 self.assertIsInstance(expression, str)
# An outcome supplied as a pair (outcome, CRASHES_WINDOWS) marks a case that
# is skipped unless self.crash_windows is set.
1598 outcome, crashes_windows = outcome
1599 self.assertIs(crashes_windows, CRASHES_WINDOWS)
1600 if not self.crash_windows:
1601 self.skipTest('test crashes Windows servers')
1603 self.assertIsNot(outcome, CRASHES_WINDOWS)
1605 if claim_map is None:
# Helper resolving a placeholder name to a claim ID. Results are remembered
# in claim_ids; the sources visible here are an entry popped from claim_map
# and a newly generated username.
1610 def get_claim_id(claim_name):
1611 claim = claim_ids.get(claim_name)
1613 claim = claim_map.pop(claim_name, None)
1615 claim = self.get_new_username()
1617 claim_ids[claim_name] = claim
# Helper that rewrites '{placeholder}' fields in the expression into
# '@User.<escaped claim ID>' references, using string.Formatter only to
# parse the template.
1621 def formatted_claim_expression(expr):
1622 formatter = Formatter()
1625 for literal_text, field_name, format_spec, conversion in (
1626 formatter.parse(expr)):
# Placeholders must be plain names: format specs and conversions are
# rejected.
1627 self.assertFalse(format_spec,
1628 f'format specifier ({format_spec}) should '
1629 f'not be specified')
1630 self.assertFalse(conversion,
1631 f'conversion ({conversion}) should not be '
1634 result.append(literal_text)
# field_name is None when this chunk of literal text is not followed by a
# replacement field.
1636 if field_name is not None:
1637 self.assertTrue(field_name,
1638 'a field name should be specified')
1640 claim_id = get_claim_id(field_name)
1641 claim_id = escaped_claim_id(claim_id)
1642 result.append(f'@User.{claim_id}')
1644 return ''.join(result)
1646 # Construct the conditional ACE expression.
1647 expression = formatted_claim_expression(expression)
# get_claim_id() pops the mappings it uses, so anything left over was never
# referenced by the expression.
1649 self.assertFalse(claim_map, 'unused claim mapping(s) remain')
1651 # Create an authentication policy that will allow authentication when
1652 # the expression is true, and a second that will deny authentication in
1653 # the same circumstance. By observing the results of authenticating
1654 # against each of these policies in turn, we can determine whether the
1655 # expression evaluates to a True, False, or Unknown value.
# The SDDL templates supply the parentheses around the expression here.
1657 allowed_sddl = f'O:SYD:(XA;;CR;;;WD;({expression}))'
1658 denied_sddl = f'O:SYD:(XD;;CR;;;WD;({expression}))(A;;CR;;;WD)'
1660 allowed_policy = self.create_authn_policy(
1662 user_allowed_from=allowed_sddl)
1663 denied_policy = self.create_authn_policy(
1665 user_allowed_from=denied_sddl)
1667 # Create a user account assigned to each policy.
1668 allowed_creds = self._get_creds(account_type=self.AccountType.USER,
1669 assigned_policy=allowed_policy)
1670 denied_creds = self._get_creds(account_type=self.AccountType.USER,
1671 assigned_policy=denied_policy)
1673 # Create a computer account.
1674 mach_creds = self.get_cached_creds(
1675 account_type=self.AccountType.COMPUTER)
# NOTE(review): only the first lines of this helper are visible, and no use
# of it appears in the visible part of this method.
1677 def expected_values(val):
1678 if isinstance(val, (str, bytes)):
1683 # Fetch the computer account’s TGT.
1684 armor_tgt = self.get_tgt(mach_creds)
1687 # Replace the claims in the PAC with our own.
# set_pac_claims receives the case's claims and the placeholder-to-ID map
# built above; checksum keys come from get_krbtgt_checksum_key().
1688 armor_tgt = self.modified_ticket(
1690 modify_pac_fn=partial(self.set_pac_claims,
1691 client_claims=pac_claims,
1692 claim_ids=claim_ids),
1693 checksum_keys=self.get_krbtgt_checksum_key())
1695 # The first or the second authentication request is expected to succeed
1696 # if the outcome is True or False, respectively. An Unknown outcome,
1697 # represented by None, will result in a policy error in either case.
# NOTE(review): the final assignment below uses 'outcome' itself as the
# expected error for both requests, presumably for outcomes given as an
# error code such as KDC_ERR_GENERIC; the branch line is not visible.
1699 allowed_error, denied_error = 0, KDC_ERR_POLICY
1700 elif outcome is False:
1701 allowed_error, denied_error = KDC_ERR_POLICY, 0
1702 elif outcome is None:
1703 allowed_error, denied_error = KDC_ERR_POLICY, KDC_ERR_POLICY
1705 allowed_error, denied_error = outcome, outcome
1707 # Attempt to authenticate and ensure that we observe the expected
# error codes chosen above; both requests are armored with the modified TGT.
1709 self._get_tgt(allowed_creds, armor_tgt=armor_tgt,
1710 expected_error=allowed_error)
1711 self._get_tgt(denied_creds, armor_tgt=armor_tgt,
1712 expected_error=denied_error)
1714 def test_rbcd_without_aa_asserted_identity(self):
# The SID entries listed here contain only Domain Users, so a 'Member_of'
# condition on the Authentication Authority Asserted Identity SID is
# expected to fail: with KDC_ERR_BADOPTION when passed as the first
# positional argument of _rbcd(), and with KDC_ERR_POLICY plus an
# access-denied audit reason when passed as target_policy.
1716 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
1717 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
1720 self._rbcd(f'Member_of SID({self.aa_asserted_identity})',
1721 service_sids=service_sids,
1722 code=KDC_ERR_BADOPTION,
1723 status=ntstatus.NT_STATUS_UNSUCCESSFUL,
1724 edata=self.expect_padata_outer)
1726 self._rbcd(target_policy=f'Member_of SID({self.aa_asserted_identity})',
1727 service_sids=service_sids,
1728 code=KDC_ERR_POLICY,
1729 status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
1730 event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
1731 reason=AuditReason.ACCESS_DENIED,
1732 edata=self.expect_padata_outer)
1734 def test_rbcd_with_aa_asserted_identity(self):
# The SID entries include the Authentication Authority Asserted Identity
# SID as an extra SID. No error code is passed to either _rbcd() call, and
# the expected groups are service_sids plus the Claims Valid SID.
1736 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
1737 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
1738 (self.aa_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
1741 expected_groups = service_sids | {
1742 (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
1745 self._rbcd(f'Member_of SID({self.aa_asserted_identity})',
1746 service_sids=service_sids,
1747 expected_groups=expected_groups)
1749 self._rbcd(target_policy=f'Member_of SID({self.aa_asserted_identity})',
1750 service_sids=service_sids,
1751 expected_groups=expected_groups)
1753 def test_rbcd_without_service_asserted_identity(self):
# The SID entries contain only Domain Users, so a 'Member_of' condition on
# the Service Asserted Identity SID is expected to fail: KDC_ERR_BADOPTION
# for the positional form, KDC_ERR_POLICY with an access-denied audit
# reason for the target_policy form.
1755 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
1756 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
1759 self._rbcd(f'Member_of SID({self.service_asserted_identity})',
1760 service_sids=service_sids,
1761 code=KDC_ERR_BADOPTION,
1762 status=ntstatus.NT_STATUS_UNSUCCESSFUL,
1763 edata=self.expect_padata_outer)
1765 self._rbcd(target_policy=f'Member_of SID({self.service_asserted_identity})',
1766 service_sids=service_sids,
1767 code=KDC_ERR_POLICY,
1768 status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
1769 event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
1770 reason=AuditReason.ACCESS_DENIED,
1771 edata=self.expect_padata_outer)
1773 def test_rbcd_with_service_asserted_identity(self):
# The first group of SID entries includes the Service Asserted Identity
# SID. No error code is passed to either _rbcd() call. The second group of
# entries (the expected groups) lists the Authentication Authority Asserted
# Identity SID in its place and adds the Claims Valid SID.
1775 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
1776 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
1777 (self.service_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
1781 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
1782 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
1783 # The Application Authority Asserted Identity SID has replaced the
1784 # Service Asserted Identity SID.
1785 (self.aa_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
1786 (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
1789 self._rbcd(f'Member_of SID({self.service_asserted_identity})',
1790 service_sids=service_sids,
1791 expected_groups=expected_groups)
1793 self._rbcd(target_policy=f'Member_of SID({self.service_asserted_identity})',
1794 service_sids=service_sids,
1795 expected_groups=expected_groups)
1797 def test_rbcd_without_claims_valid(self):
# The SID entries contain only Domain Users, so a 'Member_of' condition on
# the Claims Valid SID is expected to fail: KDC_ERR_BADOPTION for the
# positional form, KDC_ERR_POLICY with an access-denied audit reason for
# the target_policy form.
1799 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
1800 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
1803 self._rbcd(f'Member_of SID({security.SID_CLAIMS_VALID})',
1804 service_sids=service_sids,
1805 code=KDC_ERR_BADOPTION,
1806 status=ntstatus.NT_STATUS_UNSUCCESSFUL,
1807 edata=self.expect_padata_outer)
1809 self._rbcd(target_policy=f'Member_of SID({security.SID_CLAIMS_VALID})',
1810 service_sids=service_sids,
1811 code=KDC_ERR_POLICY,
1812 status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
1813 event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
1814 reason=AuditReason.ACCESS_DENIED,
1815 edata=self.expect_padata_outer)
1817 def test_rbcd_with_claims_valid(self):
# The SID entries include the Claims Valid SID as an extra SID. No error
# code is passed to either _rbcd() call, and the expected groups are
# service_sids plus the Authentication Authority Asserted Identity SID.
1819 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
1820 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
1821 (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
1824 expected_groups = service_sids | {
1825 (self.aa_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
1828 self._rbcd(f'Member_of SID({security.SID_CLAIMS_VALID})',
1829 service_sids=service_sids,
1830 expected_groups=expected_groups)
1832 self._rbcd(target_policy=f'Member_of SID({security.SID_CLAIMS_VALID})',
1833 service_sids=service_sids,
1834 expected_groups=expected_groups)
1836 def test_rbcd_without_compounded_authentication(self):
# The SID entries contain only Domain Users, so a 'Member_of' condition on
# the Compounded Authentication SID is expected to fail: KDC_ERR_BADOPTION
# for the positional form, KDC_ERR_POLICY with an access-denied audit
# reason for the target_policy form.
1838 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
1839 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
1842 self._rbcd(f'Member_of SID({security.SID_COMPOUNDED_AUTHENTICATION})',
1843 service_sids=service_sids,
1844 code=KDC_ERR_BADOPTION,
1845 status=ntstatus.NT_STATUS_UNSUCCESSFUL,
1846 edata=self.expect_padata_outer)
1848 self._rbcd(target_policy=f'Member_of SID({security.SID_COMPOUNDED_AUTHENTICATION})',
1849 service_sids=service_sids,
1850 code=KDC_ERR_POLICY,
1851 status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
1852 event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
1853 reason=AuditReason.ACCESS_DENIED,
1854 edata=self.expect_padata_outer)
1856 def test_rbcd_with_compounded_authentication(self):
# The first group of SID entries includes the Compounded Authentication
# SID. No error code is passed to either _rbcd() call. The second group
# (the expected groups) does not list that SID; it lists the
# Authentication Authority Asserted Identity and Claims Valid SIDs instead.
1858 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
1859 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
1860 (security.SID_COMPOUNDED_AUTHENTICATION, SidType.EXTRA_SID, self.default_attrs),
1864 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
1865 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
1866 (self.aa_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
1867 (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
1870 self._rbcd(f'Member_of SID({security.SID_COMPOUNDED_AUTHENTICATION})',
1871 service_sids=service_sids,
1872 expected_groups=expected_groups)
1874 self._rbcd(target_policy=f'Member_of SID({security.SID_COMPOUNDED_AUTHENTICATION})',
1875 service_sids=service_sids,
1876 expected_groups=expected_groups)
1878 def test_rbcd_client_without_aa_asserted_identity(self):
# The client SID entries contain only Domain Users, yet no error code is
# passed to either _rbcd() call for a 'Member_of' condition on the
# Authentication Authority Asserted Identity SID.
# NOTE(review): presumably _rbcd() then expects the request to succeed —
# confirm against its defaults.
1880 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
1881 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
1884 self._rbcd(f'Member_of SID({self.aa_asserted_identity})',
1885 client_sids=client_sids)
1887 self._rbcd(target_policy=f'Member_of SID({self.aa_asserted_identity})',
1888 client_sids=client_sids)
1890 def test_rbcd_client_with_aa_asserted_identity(self):
# The client SID entries include the Authentication Authority Asserted
# Identity SID. No error code is passed to either _rbcd() call, and the
# expected groups are the client SIDs unchanged.
1892 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
1893 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
1894 (self.aa_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
1897 self._rbcd(f'Member_of SID({self.aa_asserted_identity})',
1898 client_sids=client_sids,
1899 expected_groups=client_sids)
1901 self._rbcd(target_policy=f'Member_of SID({self.aa_asserted_identity})',
1902 client_sids=client_sids,
1903 expected_groups=client_sids)
1905 def test_rbcd_client_without_service_asserted_identity(self):
# The client SID entries contain only Domain Users, so a 'Member_of'
# condition on the Service Asserted Identity SID is expected to fail:
# KDC_ERR_BADOPTION for the positional form, KDC_ERR_POLICY with an
# access-denied audit reason for the target_policy form.
1907 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
1908 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
1911 self._rbcd(f'Member_of SID({self.service_asserted_identity})',
1912 client_sids=client_sids,
1913 code=KDC_ERR_BADOPTION,
1914 status=ntstatus.NT_STATUS_UNSUCCESSFUL,
1915 edata=self.expect_padata_outer)
1917 self._rbcd(target_policy=f'Member_of SID({self.service_asserted_identity})',
1918 client_sids=client_sids,
1919 code=KDC_ERR_POLICY,
1920 status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
1921 event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
1922 reason=AuditReason.ACCESS_DENIED,
1923 edata=self.expect_padata_outer)
1925 def test_rbcd_client_with_service_asserted_identity(self):
# The client SID entries include the Service Asserted Identity SID, but
# the condition used is the negated 'Not_Member_of'. No error code is
# passed to either _rbcd() call, and the expected groups are the client
# SIDs unchanged.
1927 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
1928 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
1929 (self.service_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
1932 self._rbcd(f'Not_Member_of SID({self.service_asserted_identity})',
1933 client_sids=client_sids,
1934 expected_groups=client_sids)
1936 self._rbcd(target_policy=f'Not_Member_of SID({self.service_asserted_identity})',
1937 client_sids=client_sids,
1938 expected_groups=client_sids)
1940 def test_rbcd_client_without_claims_valid(self):
# The client SID entries contain only Domain Users, yet no error code is
# passed to either _rbcd() call for a 'Member_of' condition on the Claims
# Valid SID.
# NOTE(review): presumably _rbcd() then expects the request to succeed —
# confirm against its defaults.
1942 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
1943 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
1946 self._rbcd(f'Member_of SID({security.SID_CLAIMS_VALID})',
1947 client_sids=client_sids)
1949 self._rbcd(target_policy=f'Member_of SID({security.SID_CLAIMS_VALID})',
1950 client_sids=client_sids)
1952 def test_rbcd_client_with_claims_valid(self):
# The client SID entries include the Claims Valid SID. No error code is
# passed to either _rbcd() call, and the expected groups are the client
# SIDs unchanged.
1954 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
1955 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
1956 (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
1959 self._rbcd(f'Member_of SID({security.SID_CLAIMS_VALID})',
1960 client_sids=client_sids,
1961 expected_groups=client_sids)
1963 self._rbcd(target_policy=f'Member_of SID({security.SID_CLAIMS_VALID})',
1964 client_sids=client_sids,
1965 expected_groups=client_sids)
1967 def test_rbcd_client_without_compounded_authentication(self):
# The client SID entries contain only Domain Users, so a 'Member_of'
# condition on the Compounded Authentication SID is expected to fail:
# KDC_ERR_BADOPTION for the positional form, KDC_ERR_POLICY with an
# access-denied audit reason for the target_policy form.
1969 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
1970 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
1973 self._rbcd(f'Member_of SID({security.SID_COMPOUNDED_AUTHENTICATION})',
1974 client_sids=client_sids,
1975 code=KDC_ERR_BADOPTION,
1976 status=ntstatus.NT_STATUS_UNSUCCESSFUL,
1977 edata=self.expect_padata_outer)
1979 self._rbcd(target_policy=f'Member_of SID({security.SID_COMPOUNDED_AUTHENTICATION})',
1980 client_sids=client_sids,
1981 code=KDC_ERR_POLICY,
1982 status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
1983 event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
1984 reason=AuditReason.ACCESS_DENIED,
1985 edata=self.expect_padata_outer)
1987 def test_rbcd_client_with_compounded_authentication(self):
# The client SID entries include the Compounded Authentication SID, but
# the condition used is the negated 'Not_Member_of'. No error code is
# passed to either _rbcd() call, and the expected groups are the client
# SIDs unchanged.
1989 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
1990 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
1991 (security.SID_COMPOUNDED_AUTHENTICATION, SidType.EXTRA_SID, self.default_attrs),
1994 self._rbcd(f'Not_Member_of SID({security.SID_COMPOUNDED_AUTHENTICATION})',
1995 client_sids=client_sids,
1996 expected_groups=client_sids)
1998 self._rbcd(target_policy=f'Not_Member_of SID({security.SID_COMPOUNDED_AUTHENTICATION})',
1999 client_sids=client_sids,
2000 expected_groups=client_sids)
2002 def test_rbcd_device_without_aa_asserted_identity(self):
# The device SID entries contain only Domain Users, so a
# 'Device_Member_of' condition on the Authentication Authority Asserted
# Identity SID is expected to fail: KDC_ERR_BADOPTION for the positional
# form, KDC_ERR_POLICY with an access-denied audit reason for the
# target_policy form.
2004 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2005 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2008 self._rbcd(f'Device_Member_of SID({self.aa_asserted_identity})',
2009 device_sids=device_sids,
2010 code=KDC_ERR_BADOPTION,
2011 status=ntstatus.NT_STATUS_UNSUCCESSFUL,
2012 edata=self.expect_padata_outer)
2014 self._rbcd(target_policy=f'Device_Member_of SID({self.aa_asserted_identity})',
2015 device_sids=device_sids,
2016 code=KDC_ERR_POLICY,
2017 status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
2018 event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
2019 reason=AuditReason.ACCESS_DENIED,
2020 edata=self.expect_padata_outer)
# Counterpart of the Device_Member_of failure case: the device SIDs lack
# self.aa_asserted_identity and the condition is Not_Device_Member_of that SID.
# No error expectations are passed to _rbcd().
2022 def test_rbcd_device_without_aa_asserted_identity_not_memberof(self):
2024 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2025 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
# Condition as the RBCD expression.
2028 self._rbcd(f'Not_Device_Member_of SID({self.aa_asserted_identity})',
2029 device_sids=device_sids)
# Condition as target_policy.
2031 self._rbcd(target_policy=f'Not_Device_Member_of SID({self.aa_asserted_identity})',
2032 device_sids=device_sids)
# RBCD where the device SIDs include self.aa_asserted_identity and the
# condition is Device_Member_of that SID. No error expectations are passed.
2034 def test_rbcd_device_with_aa_asserted_identity(self):
2036 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2037 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2038 (self.aa_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
# Condition as the RBCD expression.
2041 self._rbcd(f'Device_Member_of SID({self.aa_asserted_identity})',
2042 device_sids=device_sids)
# Condition as target_policy.
2044 self._rbcd(target_policy=f'Device_Member_of SID({self.aa_asserted_identity})',
2045 device_sids=device_sids)
# RBCD where the device SIDs lack self.service_asserted_identity while the
# condition is Device_Member_of that SID: both variants are expected to fail.
2047 def test_rbcd_device_without_service_asserted_identity(self):
2049 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2050 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
# Condition as the RBCD expression: expect KDC_ERR_BADOPTION.
2053 self._rbcd(f'Device_Member_of SID({self.service_asserted_identity})',
2054 device_sids=device_sids,
2055 code=KDC_ERR_BADOPTION,
2056 status=ntstatus.NT_STATUS_UNSUCCESSFUL,
2057 edata=self.expect_padata_outer)
# Condition as target_policy: expect KDC_ERR_POLICY and an access-denied audit
# record.
2059 self._rbcd(target_policy=f'Device_Member_of SID({self.service_asserted_identity})',
2060 device_sids=device_sids,
2061 code=KDC_ERR_POLICY,
2062 status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
2063 event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
2064 reason=AuditReason.ACCESS_DENIED,
2065 edata=self.expect_padata_outer)
# RBCD where the device SIDs include self.service_asserted_identity and the
# condition is Device_Member_of that SID. No error expectations are passed.
2067 def test_rbcd_device_with_service_asserted_identity(self):
2069 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2070 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2071 (self.service_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
# Condition as the RBCD expression.
2074 self._rbcd(f'Device_Member_of SID({self.service_asserted_identity})',
2075 device_sids=device_sids)
# Condition as target_policy.
2077 self._rbcd(target_policy=f'Device_Member_of SID({self.service_asserted_identity})',
2078 device_sids=device_sids)
# RBCD where the device SIDs lack the Claims Valid SID while the condition is
# Device_Member_of that SID: both variants are expected to fail.
2080 def test_rbcd_device_without_claims_valid(self):
2082 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2083 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
# Condition as the RBCD expression: expect KDC_ERR_BADOPTION.
2086 self._rbcd(f'Device_Member_of SID({security.SID_CLAIMS_VALID})',
2087 device_sids=device_sids,
2088 code=KDC_ERR_BADOPTION,
2089 status=ntstatus.NT_STATUS_UNSUCCESSFUL,
2090 edata=self.expect_padata_outer)
# Condition as target_policy: expect KDC_ERR_POLICY and an access-denied audit
# record.
2092 self._rbcd(target_policy=f'Device_Member_of SID({security.SID_CLAIMS_VALID})',
2093 device_sids=device_sids,
2094 code=KDC_ERR_POLICY,
2095 status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
2096 event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
2097 reason=AuditReason.ACCESS_DENIED,
2098 edata=self.expect_padata_outer)
# RBCD where the device SIDs include the Claims Valid SID and the condition is
# Device_Member_of that SID. No error expectations are passed.
2100 def test_rbcd_device_with_claims_valid(self):
2102 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2103 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2104 (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
# Condition as the RBCD expression.
2107 self._rbcd(f'Device_Member_of SID({security.SID_CLAIMS_VALID})',
2108 device_sids=device_sids)
# Condition as target_policy.
2110 self._rbcd(target_policy=f'Device_Member_of SID({security.SID_CLAIMS_VALID})',
2111 device_sids=device_sids)
# RBCD where the device SIDs lack the Compounded Authentication SID while the
# condition is Device_Member_of that SID: both variants are expected to fail.
2113 def test_rbcd_device_without_compounded_authentication(self):
2115 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2116 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
# Condition as the RBCD expression: expect KDC_ERR_BADOPTION.
2119 self._rbcd(f'Device_Member_of SID({security.SID_COMPOUNDED_AUTHENTICATION})',
2120 device_sids=device_sids,
2121 code=KDC_ERR_BADOPTION,
2122 status=ntstatus.NT_STATUS_UNSUCCESSFUL,
2123 edata=self.expect_padata_outer)
# Condition as target_policy: expect KDC_ERR_POLICY and an access-denied audit
# record.
2125 self._rbcd(target_policy=f'Device_Member_of SID({security.SID_COMPOUNDED_AUTHENTICATION})',
2126 device_sids=device_sids,
2127 code=KDC_ERR_POLICY,
2128 status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
2129 event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
2130 reason=AuditReason.ACCESS_DENIED,
2131 edata=self.expect_padata_outer)
# RBCD where the device SIDs include the Compounded Authentication SID and the
# condition is Device_Member_of that SID. No error expectations are passed.
2133 def test_rbcd_device_with_compounded_authentication(self):
2135 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2136 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2137 (security.SID_COMPOUNDED_AUTHENTICATION, SidType.EXTRA_SID, self.default_attrs),
# Condition as the RBCD expression.
2140 self._rbcd(f'Device_Member_of SID({security.SID_COMPOUNDED_AUTHENTICATION})',
2141 device_sids=device_sids)
# Condition as target_policy.
2143 self._rbcd(target_policy=f'Device_Member_of SID({security.SID_COMPOUNDED_AUTHENTICATION})',
2144 device_sids=device_sids)
def test_rbcd(self):
    """Run _rbcd() with an RBCD condition requiring membership of the
    service account's own SID; '{service_sid}' is filled in by _rbcd().
    """
    service_member_condition = 'Member_of SID({service_sid})'
    self._rbcd(service_member_condition)
def test_rbcd_device_from_rodc(self):
    """Run the basic RBCD check with the device ticket coming from the
    mock RODC.

    The expected code is paired with CRASHES_WINDOWS, marking the test as
    one that can crash a Windows server.
    """
    expected_outcome = (0, CRASHES_WINDOWS)
    self._rbcd('Member_of SID({service_sid})',
               code=expected_outcome,
               device_from_rodc=True)
def test_rbcd_service_from_rodc(self):
    """Run the basic RBCD check with the service ticket coming from the
    mock RODC.
    """
    service_member_condition = 'Member_of SID({service_sid})'
    self._rbcd(service_member_condition, service_from_rodc=True)
def test_rbcd_device_and_service_from_rodc(self):
    """Run the basic RBCD check with both the device and the service
    tickets coming from the mock RODC.

    The expected code is paired with CRASHES_WINDOWS, marking the test as
    one that can crash a Windows server.
    """
    expected_outcome = (0, CRASHES_WINDOWS)
    self._rbcd('Member_of SID({service_sid})',
               code=expected_outcome,
               device_from_rodc=True,
               service_from_rodc=True)
def test_rbcd_client_from_rodc(self):
    """Run the basic RBCD check with the client ticket coming from the
    mock RODC.
    """
    service_member_condition = 'Member_of SID({service_sid})'
    self._rbcd(service_member_condition, client_from_rodc=True)
def test_rbcd_client_and_device_from_rodc(self):
    """Run the basic RBCD check with both the client and the device
    tickets coming from the mock RODC.

    The expected code is paired with CRASHES_WINDOWS, marking the test as
    one that can crash a Windows server.
    """
    expected_outcome = (0, CRASHES_WINDOWS)
    self._rbcd('Member_of SID({service_sid})',
               code=expected_outcome,
               device_from_rodc=True,
               client_from_rodc=True)
def test_rbcd_client_and_service_from_rodc(self):
    """Run the basic RBCD check with both the client and the service
    tickets coming from the mock RODC.
    """
    service_member_condition = 'Member_of SID({service_sid})'
    self._rbcd(service_member_condition,
               service_from_rodc=True,
               client_from_rodc=True)
def test_rbcd_all_from_rodc(self):
    """Run the basic RBCD check with the client, service and device
    tickets all coming from the mock RODC.

    The expected code is paired with CRASHES_WINDOWS, marking the test as
    one that can crash a Windows server.
    """
    expected_outcome = (0, CRASHES_WINDOWS)
    self._rbcd('Member_of SID({service_sid})',
               code=expected_outcome,
               device_from_rodc=True,
               service_from_rodc=True,
               client_from_rodc=True)
def test_delegating_proxy_in_world_group_rbcd(self):
    """Expect the delegating proxy to be a member of security.SID_WORLD."""
    group_sid = security.SID_WORLD
    self._check_delegating_proxy_in_group_rbcd(group_sid)
def test_delegating_proxy_in_network_group_rbcd(self):
    """Expect the delegating proxy *not* to be a member of
    security.SID_NT_NETWORK.
    """
    group_sid = security.SID_NT_NETWORK
    self._check_delegating_proxy_not_in_group_rbcd(group_sid)
def test_delegating_proxy_in_authenticated_users_rbcd(self):
    """Expect the delegating proxy to be a member of
    security.SID_NT_AUTHENTICATED_USERS.
    """
    group_sid = security.SID_NT_AUTHENTICATED_USERS
    self._check_delegating_proxy_in_group_rbcd(group_sid)
def test_delegating_proxy_in_aa_asserted_identity_rbcd(self):
    """Expect the delegating proxy to be a member of
    security.SID_AUTHENTICATION_AUTHORITY_ASSERTED_IDENTITY.
    """
    group_sid = security.SID_AUTHENTICATION_AUTHORITY_ASSERTED_IDENTITY
    self._check_delegating_proxy_in_group_rbcd(group_sid)
def test_delegating_proxy_in_service_asserted_identity_rbcd(self):
    """Expect the delegating proxy *not* to be a member of
    security.SID_SERVICE_ASSERTED_IDENTITY.
    """
    group_sid = security.SID_SERVICE_ASSERTED_IDENTITY
    self._check_delegating_proxy_not_in_group_rbcd(group_sid)
def test_delegating_proxy_in_compounded_authentication_rbcd(self):
    """Expect the delegating proxy *not* to be a member of
    security.SID_COMPOUNDED_AUTHENTICATION.
    """
    group_sid = security.SID_COMPOUNDED_AUTHENTICATION
    self._check_delegating_proxy_not_in_group_rbcd(group_sid)
def test_delegating_proxy_in_claims_valid_rbcd(self):
    """Expect the delegating proxy to be a member of
    security.SID_CLAIMS_VALID.
    """
    group_sid = security.SID_CLAIMS_VALID
    self._check_delegating_proxy_in_group_rbcd(group_sid)
def test_device_in_world_group_rbcd(self):
    """Expect the device to be a member of security.SID_WORLD."""
    group_sid = security.SID_WORLD
    self._check_device_in_group_rbcd(group_sid)
def test_device_in_network_group_rbcd(self):
    """Expect the device *not* to be a member of security.SID_NT_NETWORK."""
    group_sid = security.SID_NT_NETWORK
    self._check_device_not_in_group_rbcd(group_sid)
def test_device_in_authenticated_users_rbcd(self):
    """Expect the device to be a member of
    security.SID_NT_AUTHENTICATED_USERS.
    """
    group_sid = security.SID_NT_AUTHENTICATED_USERS
    self._check_device_in_group_rbcd(group_sid)
def test_device_in_aa_asserted_identity_rbcd(self):
    """Expect the device to be a member of
    security.SID_AUTHENTICATION_AUTHORITY_ASSERTED_IDENTITY.
    """
    group_sid = security.SID_AUTHENTICATION_AUTHORITY_ASSERTED_IDENTITY
    self._check_device_in_group_rbcd(group_sid)
def test_device_in_service_asserted_identity_rbcd(self):
    """Expect the device *not* to be a member of
    security.SID_SERVICE_ASSERTED_IDENTITY.
    """
    group_sid = security.SID_SERVICE_ASSERTED_IDENTITY
    self._check_device_not_in_group_rbcd(group_sid)
def test_device_in_compounded_authentication_rbcd(self):
    """Expect the device *not* to be a member of
    security.SID_COMPOUNDED_AUTHENTICATION.
    """
    group_sid = security.SID_COMPOUNDED_AUTHENTICATION
    self._check_device_not_in_group_rbcd(group_sid)
def test_device_in_claims_valid_rbcd(self):
    """Expect the device to be a member of security.SID_CLAIMS_VALID."""
    group_sid = security.SID_CLAIMS_VALID
    self._check_device_in_group_rbcd(group_sid)
def _check_delegating_proxy_in_group_rbcd(self, group):
    """Run the RBCD membership check expecting the delegating proxy to
    belong to *group*.
    """
    expectation = {'expect_in_group': True}
    self._check_membership_rbcd(group, **expectation)
def _check_delegating_proxy_not_in_group_rbcd(self, group):
    """Run the RBCD membership check expecting the delegating proxy *not*
    to belong to *group*.
    """
    expectation = {'expect_in_group': False}
    self._check_membership_rbcd(group, **expectation)
def _check_device_in_group_rbcd(self, group):
    """Run the RBCD membership check in device mode, expecting membership
    of *group*.
    """
    expectation = {'device': True, 'expect_in_group': True}
    self._check_membership_rbcd(group, **expectation)
def _check_device_not_in_group_rbcd(self, group):
    """Run the RBCD membership check in device mode, expecting
    non-membership of *group*.
    """
    expectation = {'device': True, 'expect_in_group': False}
    self._check_membership_rbcd(group, **expectation)
# Shared driver for the *_in_group_rbcd tests.
#
# Two RBCD target accounts are created: one whose
# msDS-AllowedToActOnBehalfOfOtherIdentity descriptor requires membership of
# `group`, and one that requires non-membership. The operator is
# 'Device_Member_of' when `device` is true, else 'Member_of'. Exactly one of
# the two requests is expected to succeed, chosen by `expect_in_group`; the
# other is expected to fail with the failure_result triple.
#
# NOTE(review): several short lines of this method (remaining parameters,
# some call arguments, closing brackets) are not visible in this listing.
2247 def _check_membership_rbcd(self,
2252 """Test that authentication succeeds or fails when the delegating proxy
2253 is required to belong to a certain group.
2256 sddl_op = 'Device_Member_of' if device else 'Member_of'
# The test is skipped below the 2008 domain functional level.
2258 samdb = self.get_samdb()
2259 functional_level = self.get_domain_functional_level(samdb)
2261 if functional_level < dsdb.DS_DOMAIN_FUNCTION_2008:
2262 self.skipTest('RBCD requires FL2008')
2264 # Create a machine account with which to perform FAST.
2265 mach_creds = self.get_cached_creds(
2266 account_type=self.AccountType.COMPUTER,
2267 opts={'id': 'device'})
2268 mach_tgt = self.get_tgt(mach_creds)
2270 # Create a user account.
2271 client_creds = self._get_creds(account_type=self.AccountType.USER)
# The client's tickets are requested with the 'forwardable' option and are
# expected to carry the matching ticket flag.
2273 client_tkt_options = 'forwardable'
2274 expected_flags = krb5_asn1.TicketFlags(client_tkt_options)
2275 client_tgt = self.get_tgt(client_creds,
2276 kdc_options=client_tkt_options,
2277 expected_flags=expected_flags)
2279 client_sid = client_creds.get_sid()
2281 client_username = client_creds.get_username()
2282 client_cname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
2283 names=[client_username])
# The service account's TGT is the one used for the RBCD requests below.
2285 service_creds = self.get_cached_creds(
2286 account_type=self.AccountType.COMPUTER,
2287 opts={'id': 'service'})
2288 service_tgt = self.get_tgt(service_creds)
# The client's service ticket is later supplied as the additional ticket.
2290 client_service_tkt = self.get_service_ticket(
2293 kdc_options=client_tkt_options,
2294 expected_flags=expected_flags)
2296 domain_sid_str = samdb.get_domain_sid()
2297 domain_sid = security.dom_sid(domain_sid_str)
2299 # Require the principal to belong to a certain group.
# Doubled braces are literal in the f-string: '<op> {SID(<group>)}'.
2300 in_group_sddl = self.allow_if(f'{sddl_op} {{SID({group})}}')
2301 in_group_descriptor = security.descriptor.from_sddl(in_group_sddl,
2304 # Create a target account that allows RBCD if the principal belongs to
2306 in_group_target_creds = self.get_cached_creds(
2307 account_type=self.AccountType.COMPUTER,
2309 'additional_details': (
2310 ('msDS-AllowedToActOnBehalfOfOtherIdentity',
2311 ndr_pack(in_group_descriptor)),
2315 kdc_options = str(krb5_asn1.KDCOptions('cname-in-addl-tkt'))
2317 in_group_target_key = self.TicketDecryptionKey_from_creds(
2318 in_group_target_creds)
2319 in_group_target_etypes = in_group_target_creds.tgs_supported_enctypes
# Strip a trailing '$' from the account name before building the host/
# principal expected in the transited services.
2321 service_name = service_creds.get_username()
2322 if service_name[-1] == '$':
2323 service_name = service_name[:-1]
2324 expected_transited_services = [
2325 f'host/{service_name}@{service_creds.get_realm()}'
2328 pac_options = '1001' # supports claims, RBCD
# (code, status, expect_edata) triples for the two possible outcomes.
2330 success_result = 0, None, None
2333 ntstatus.NT_STATUS_UNSUCCESSFUL,
2334 self.expect_padata_outer,
2337 code, status, expect_edata = (success_result if expect_in_group
2338 else failure_result)
2340 # Test whether obtaining a service ticket with RBCD is allowed.
2341 self._tgs_req(service_tgt,
2344 in_group_target_creds,
2346 kdc_options=kdc_options,
2347 pac_options=pac_options,
2348 expected_cname=client_cname,
2349 expected_account_name=client_username,
2350 additional_ticket=client_service_tkt,
2351 decryption_key=in_group_target_key,
2352 expected_sid=client_sid,
2353 expected_supported_etypes=in_group_target_etypes,
2354 expected_proxy_target=in_group_target_creds.get_spn(),
2355 expected_transited_services=expected_transited_services,
2356 expected_status=status,
2357 expect_edata=expect_edata)
# With a non-zero code the service account is passed to check_tgs_log() as the
# client; otherwise the user account is.
2359 effective_client_creds = service_creds if code else client_creds
2360 self.check_tgs_log(effective_client_creds, in_group_target_creds,
2361 checked_creds=service_creds,
2364 # Require the principal not to belong to a certain group.
2365 not_in_group_sddl = self.allow_if(f'Not_{sddl_op} {{SID({group})}}')
2366 not_in_group_descriptor = security.descriptor.from_sddl(
2367 not_in_group_sddl, domain_sid)
2369 # Create a target account that allows RBCD if the principal does not
2370 # belong to the group.
2371 not_in_group_target_creds = self.get_cached_creds(
2372 account_type=self.AccountType.COMPUTER,
2374 'additional_details': (
2375 ('msDS-AllowedToActOnBehalfOfOtherIdentity',
2376 ndr_pack(not_in_group_descriptor)),
2380 not_in_group_target_key = self.TicketDecryptionKey_from_creds(
2381 not_in_group_target_creds)
2382 not_in_group_target_etypes = (
2383 not_in_group_target_creds.tgs_supported_enctypes)
# Inverse expectation of the first request: this one succeeds exactly when the
# principal is expected NOT to be in the group.
2385 code, status, expect_edata = (failure_result if expect_in_group
2386 else success_result)
2388 # Test whether obtaining a service ticket with RBCD is allowed.
2389 self._tgs_req(service_tgt,
2392 not_in_group_target_creds,
2394 kdc_options=kdc_options,
2395 pac_options=pac_options,
2396 expected_cname=client_cname,
2397 expected_account_name=client_username,
2398 additional_ticket=client_service_tkt,
2399 decryption_key=not_in_group_target_key,
2400 expected_sid=client_sid,
2401 expected_supported_etypes=not_in_group_target_etypes,
2402 expected_proxy_target=not_in_group_target_creds.get_spn(),
2403 expected_transited_services=expected_transited_services,
2404 expected_status=status,
2405 expect_edata=expect_edata)
2407 effective_client_creds = service_creds if code else client_creds
2408 self.check_tgs_log(effective_client_creds, not_in_group_target_creds,
2409 checked_creds=service_creds,
# Parameter list and body of the _rbcd() helper used by the test_rbcd_* tests.
# NOTE(review): the `def` line and a number of short lines (some parameters,
# `else:` branches, call arguments, closing brackets) are not visible in this
# listing.
#
# Visible behaviour: builds client, device and service tickets (optionally
# re-issued by the mock RODC and with modified PAC SIDs/claims), creates a
# target account carrying an RBCD descriptor and optionally an authentication
# policy, performs the RBCD request via _tgs_req() and then checks the TGS
# log.
2413 rbcd_expression=None,
2417 event=AuditEvent.OK,
2418 reason=AuditReason.NONE,
2421 client_from_rodc=False,
2422 service_from_rodc=False,
2423 device_from_rodc=False,
2427 service_claims=None,
2430 expected_groups=None,
2431 expected_claims=None):
# `code` may arrive as a (code, CRASHES_WINDOWS) pair: it is unpacked here and
# the test is skipped unless self.crash_windows is set. (The statement guarding
# this unpacking is not visible in this listing.)
2433 code, crashes_windows = code
2434 self.assertIs(crashes_windows, CRASHES_WINDOWS)
2435 if not self.crash_windows:
2436 self.skipTest('test crashes Windows servers')
2438 self.assertIsNot(code, CRASHES_WINDOWS)
# The test is skipped below the 2008 domain functional level.
2440 samdb = self.get_samdb()
2441 functional_level = self.get_domain_functional_level(samdb)
2443 if functional_level < dsdb.DS_DOMAIN_FUNCTION_2008:
2444 self.skipTest('RBCD requires FL2008')
2446 domain_sid_str = samdb.get_domain_sid()
2447 domain_sid = security.dom_sid(domain_sid_str)
2449 client_creds = self.get_cached_creds(
2450 account_type=self.AccountType.USER,
2452 'allowed_replication_mock': client_from_rodc,
2453 'revealed_to_mock_rodc': client_from_rodc,
2455 client_sid = client_creds.get_sid()
2457 client_username = client_creds.get_username()
2458 client_cname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
2459 names=[client_username])
2461 client_tkt_options = 'forwardable'
2462 expected_flags = krb5_asn1.TicketFlags(client_tkt_options)
2464 checksum_key = self.get_krbtgt_checksum_key()
# The mock RODC krbtgt key is only fetched when some ticket is to come from
# the RODC; it is used below as ticket key and as KDC checksum key.
2466 if client_from_rodc or service_from_rodc or device_from_rodc:
2467 rodc_krbtgt_creds = self.get_mock_rodc_krbtgt_creds()
2468 rodc_krbtgt_key = self.TicketDecryptionKey_from_creds(rodc_krbtgt_creds)
2469 rodc_checksum_key = {
2470 krb5pac.PAC_TYPE_KDC_CHECKSUM: rodc_krbtgt_key,
2473 client_tgt = self.get_tgt(client_creds,
2474 kdc_options=client_tkt_options,
2475 expected_flags=expected_flags)
2477 # Create a machine account with which to perform FAST.
2478 mach_creds = self.get_cached_creds(
2479 account_type=self.AccountType.COMPUTER,
2481 'allowed_replication_mock': device_from_rodc,
2482 'revealed_to_mock_rodc': device_from_rodc,
2484 mach_tgt = self.get_tgt(mach_creds)
# PAC modifications for the device TGT. set_pac_claims() receives the claims
# through its `client_claims` keyword here as well.
2485 device_modify_pac_fn = []
2486 if device_sids is not None:
2487 device_modify_pac_fn.append(partial(self.set_pac_sids,
2488 new_sids=device_sids))
2489 if device_claims is not None:
2490 device_modify_pac_fn.append(partial(self.set_pac_claims,
2491 client_claims=device_claims))
2492 mach_tgt = self.modified_ticket(
2494 modify_pac_fn=device_modify_pac_fn,
2495 new_ticket_key=rodc_krbtgt_key if device_from_rodc else None,
2496 checksum_keys=rodc_checksum_key if device_from_rodc else checksum_key)
2498 service_creds = self.get_cached_creds(
2499 account_type=self.AccountType.COMPUTER,
2502 'allowed_replication_mock': service_from_rodc,
2503 'revealed_to_mock_rodc': service_from_rodc,
2505 service_tgt = self.get_tgt(service_creds)
# PAC modifications for the service TGT, applied the same way as for the
# device TGT.
2507 service_modify_pac_fn = []
2508 if service_sids is not None:
2509 service_modify_pac_fn.append(partial(self.set_pac_sids,
2510 new_sids=service_sids))
2511 if service_claims is not None:
2512 service_modify_pac_fn.append(partial(self.set_pac_claims,
2513 client_claims=service_claims))
2514 service_tgt = self.modified_ticket(
2516 modify_pac_fn=service_modify_pac_fn,
2517 new_ticket_key=rodc_krbtgt_key if service_from_rodc else None,
2518 checksum_keys=rodc_checksum_key if service_from_rodc else checksum_key)
# Optional authentication policy for the target: when target_policy is given,
# an enforced policy is created whose computer_allowed_to is a conditional ACE
# built from it, with '{service_sid}' replaced by the service account's SID.
2520 if target_policy is None:
2522 assigned_policy = None
2524 sddl = f'O:SYD:(XA;;CR;;;WD;({target_policy.format(service_sid=service_creds.get_sid())}))'
2525 policy = self.create_authn_policy(enforced=True,
2526 computer_allowed_to=sddl)
2527 assigned_policy = str(policy.dn)
# RBCD descriptor: a conditional ACE built from rbcd_expression when one is
# given, otherwise a plain allow ACE with the same rights and trustee.
2529 if rbcd_expression is not None:
2530 sddl = f'O:SYD:(XA;;CR;;;WD;({rbcd_expression.format(service_sid=service_creds.get_sid())}))'
2532 sddl = 'O:SYD:(A;;CR;;;WD)'
2533 descriptor = security.descriptor.from_sddl(sddl, domain_sid)
2534 descriptor = ndr_pack(descriptor)
2536 # Create a target account with the assigned policy.
2537 target_creds = self.get_cached_creds(
2538 account_type=self.AccountType.COMPUTER,
2540 'assigned_policy': assigned_policy,
2541 'additional_details': (
2542 ('msDS-AllowedToActOnBehalfOfOtherIdentity', descriptor),
2546 client_service_tkt = self.get_service_ticket(
2549 kdc_options=client_tkt_options,
2550 expected_flags=expected_flags)
# PAC modifications for the client's service ticket. Unlike the TGTs above, no
# new_ticket_key is passed; only the checksum keys depend on client_from_rodc.
2551 client_modify_pac_fn = []
2552 if client_sids is not None:
2553 client_modify_pac_fn.append(partial(self.set_pac_sids,
2554 new_sids=client_sids))
2555 if client_claims is not None:
2556 client_modify_pac_fn.append(partial(self.set_pac_claims,
2557 client_claims=client_claims))
2558 client_service_tkt = self.modified_ticket(client_service_tkt,
2559 modify_pac_fn=client_modify_pac_fn,
2560 checksum_keys=rodc_checksum_key if client_from_rodc else checksum_key)
2562 kdc_options = str(krb5_asn1.KDCOptions('cname-in-addl-tkt'))
2564 target_decryption_key = self.TicketDecryptionKey_from_creds(
2566 target_etypes = target_creds.tgs_supported_enctypes
# Strip a trailing '$' from the account name before building the host/
# principal expected in the transited services.
2568 service_name = service_creds.get_username()
2569 if service_name[-1] == '$':
2570 service_name = service_name[:-1]
2571 expected_transited_services = [
2572 f'host/{service_name}@{service_creds.get_realm()}'
2575 expected_groups = self.map_sids(expected_groups, None, domain_sid_str)
# NOTE(review): despite the wording of the next comment, `code` may be an
# error code, in which case the request is expected to fail.
2577 # Show that obtaining a service ticket with RBCD is allowed.
2578 self._tgs_req(service_tgt, code, service_creds, target_creds,
2580 kdc_options=kdc_options,
2581 pac_options='1001', # supports claims, RBCD
2582 expected_cname=client_cname,
2583 expected_account_name=client_username,
2584 additional_ticket=client_service_tkt,
2585 decryption_key=target_decryption_key,
2586 expected_sid=client_sid,
2587 expected_groups=expected_groups,
2588 expect_client_claims=bool(expected_claims) or None,
2589 expected_client_claims=expected_claims,
2590 expected_supported_etypes=target_etypes,
2591 expected_proxy_target=target_creds.get_spn(),
2592 expected_transited_services=expected_transited_services,
2593 expected_status=status,
# The account passed to check_tgs_log() as the client is either the service
# or the user account. (The condition choosing between them is not visible in
# this listing; presumably it depends on `code` — confirm.)
2597 effective_client_creds = service_creds
2599 effective_client_creds = client_creds
2601 self.check_tgs_log(effective_client_creds, target_creds,
2603 checked_creds=service_creds,
# The SIDs supplied contain no Claims Valid SID and are expected back
# unchanged (expected_groups is the same set as client_sids).
# self._tgs() is defined outside this view.
2608 def test_tgs_claims_valid_missing(self):
2609 """Test that the Claims Valid SID is not added to the PAC when
2610 performing a TGS‐REQ."""
2612 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2613 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2614 (self.aa_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
2617 self._tgs(use_fast=False,
2618 client_sids=client_sids,
2619 expected_groups=client_sids)
# RODC variant (client_from_rodc=True): the expected groups are the supplied
# SIDs plus the Claims Valid SID, built with a set union.
# self._tgs() is defined outside this view.
2621 def test_tgs_claims_valid_missing_from_rodc(self):
2622 """Test that the Claims Valid SID *is* added to the PAC when
2623 performing a TGS‐REQ with an RODC‐issued TGT."""
2625 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2626 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2627 (self.aa_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
2630 expected_groups = client_sids | {
2631 (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
2634 self._tgs(use_fast=False,
2635 client_from_rodc=True,
2636 client_sids=client_sids,
2637 expected_groups=expected_groups)
# The SIDs supplied are expected back unchanged.
# NOTE(review): the SID used is self.aa_asserted_identity, i.e.
# security.SID_AUTHENTICATION_AUTHORITY_ASSERTED_IDENTITY; the docstring's
# "Authentication Identity" is presumably meant as "Authentication Authority".
2639 def test_tgs_aa_asserted_identity(self):
2640 """Test performing a TGS‐REQ with the Authentication Identity Asserted
2641 Identity SID present."""
2643 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2644 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2645 (self.aa_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
2646 (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
2649 self._tgs(use_fast=False,
2650 client_sids=client_sids,
2651 expected_groups=client_sids)
# The asserted identity SID is supplied with attributes 0 and is expected back
# with attributes 0 too (expected_groups is the same set as client_sids).
# NOTE(review): the SID used is self.aa_asserted_identity, i.e.
# security.SID_AUTHENTICATION_AUTHORITY_ASSERTED_IDENTITY; the docstring's
# "Authentication Identity" is presumably meant as "Authentication Authority".
2653 def test_tgs_aa_asserted_identity_no_attrs(self):
2654 """Test performing a TGS‐REQ with the Authentication Identity Asserted
2655 Identity SID present, albeit without any attributes."""
2657 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2658 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2659 # Put the Asserted Identity SID in the PAC without any flags set.
2660 (self.aa_asserted_identity, SidType.EXTRA_SID, 0),
2661 (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
2664 self._tgs(use_fast=False,
2665 client_sids=client_sids,
2666 expected_groups=client_sids)
# RODC variant (client_from_rodc=True): the SIDs supplied are expected back
# unchanged.
# NOTE(review): the SID used is self.aa_asserted_identity, i.e.
# security.SID_AUTHENTICATION_AUTHORITY_ASSERTED_IDENTITY; the docstring's
# "Authentication Identity" is presumably meant as "Authentication Authority".
2668 def test_tgs_aa_asserted_identity_from_rodc(self):
2669 """Test that the Authentication Identity Asserted Identity SID in an
2670 RODC‐issued PAC is preserved when performing a TGS‐REQ."""
2672 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2673 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2674 (self.aa_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
2675 (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
2678 self._tgs(use_fast=False,
2679 client_from_rodc=True,
2680 client_sids=client_sids,
2681 expected_groups=client_sids)
# RODC variant (client_from_rodc=True) of the no-attributes case. Input: the
# asserted identity SID with attributes 0. Expected: the same SID carrying
# self.default_attrs, so the SID is kept but its attributes are not.
# NOTE(review): the SID used is self.aa_asserted_identity, i.e.
# security.SID_AUTHENTICATION_AUTHORITY_ASSERTED_IDENTITY; the docstring's
# "Authentication Identity" is presumably meant as "Authentication Authority".
2683 def test_tgs_aa_asserted_identity_from_rodc_no_attrs_from_rodc(self):
2684 """Test that the Authentication Identity Asserted Identity SID without
2685 attributes in an RODC‐issued PAC is preserved when performing a
2688 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2689 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2690 # Put the Asserted Identity SID in the PAC without any flags set.
2691 (self.aa_asserted_identity, SidType.EXTRA_SID, 0),
2692 (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
2696 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2697 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2698 # The SID in the resulting PAC has the default attributes.
2699 (self.aa_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
2700 (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
2703 self._tgs(use_fast=False,
2704 client_from_rodc=True,
2705 client_sids=client_sids,
2706 expected_groups=expected_groups)
# The Claims Valid and Compounded Authentication SIDs are supplied and the
# whole set is expected back unchanged.
# self._tgs() is defined outside this view.
2708 def test_tgs_compound_authentication(self):
2709 """Test performing a TGS‐REQ with the Compounded Authentication SID
2712 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2713 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2714 (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
2715 (security.SID_COMPOUNDED_AUTHENTICATION, SidType.EXTRA_SID, self.default_attrs),
2718 self._tgs(use_fast=False,
2719 client_sids=client_sids,
2720 expected_groups=client_sids)
# RODC variant (client_from_rodc=True): the expected groups omit the
# Compounded Authentication SID that the supplied SIDs contain.
# self._tgs() is defined outside this view.
2722 def test_tgs_compound_authentication_from_rodc(self):
2723 """Test that the Compounded Authentication SID in an
2724 RODC‐issued PAC is not preserved when performing a TGS‐REQ."""
2726 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2727 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2728 (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
2729 (security.SID_COMPOUNDED_AUTHENTICATION, SidType.EXTRA_SID, self.default_attrs),
2733 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2734 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2735 (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
2738 self._tgs(use_fast=False,
2739 client_from_rodc=True,
2740 client_sids=client_sids,
2741 expected_groups=expected_groups)
# The SIDs supplied contain Claims Valid but no asserted identity SID, and are
# expected back unchanged.
# NOTE(review): "Authentication Identity" in the docstring is presumably meant
# as "Authentication Authority" (cf. SID_AUTHENTICATION_AUTHORITY_ASSERTED_IDENTITY).
2743 def test_tgs_asserted_identity_missing(self):
2744 """Test that the Authentication Identity Asserted Identity SID is not
2745 added to the PAC when performing a TGS‐REQ."""
2747 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2748 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2749 (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
2752 self._tgs(use_fast=False,
2753 client_sids=client_sids,
2754 expected_groups=client_sids)
# RODC variant (client_from_rodc=True): the SIDs supplied contain no asserted
# identity SID and are expected back unchanged.
# NOTE(review): "Authentication Identity" in the docstring is presumably meant
# as "Authentication Authority" (cf. SID_AUTHENTICATION_AUTHORITY_ASSERTED_IDENTITY).
2756 def test_tgs_asserted_identity_missing_from_rodc(self):
2757 """Test that the Authentication Identity Asserted Identity SID is not
2758 added to an RODC‐issued PAC when performing a TGS‐REQ."""
2760 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2761 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2762 (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
2765 self._tgs(use_fast=False,
2766 client_from_rodc=True,
2767 client_sids=client_sids,
2768 expected_groups=client_sids)
# The Service Asserted Identity and Claims Valid SIDs are supplied and the
# whole set is expected back unchanged.
# self._tgs() is defined outside this view.
2770 def test_tgs_service_asserted_identity(self):
2771 """Test performing a TGS‐REQ with the Service Asserted Identity SID
2774 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2775 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2776 (self.service_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
2777 (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
2780 self._tgs(use_fast=False,
2781 client_sids=client_sids,
2782 expected_groups=client_sids)
# RODC variant (client_from_rodc=True): the expected groups omit the Service
# Asserted Identity SID that the supplied SIDs contain.
# self._tgs() is defined outside this view.
2784 def test_tgs_service_asserted_identity_from_rodc(self):
2785 """Test that the Service Asserted Identity SID in an
2786 RODC‐issued PAC is not preserved when performing a TGS‐REQ."""
2788 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2789 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2790 (self.service_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
2791 (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
2795 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2796 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2797 # Don’t expect the Service Asserted Identity SID.
2798 (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
2801 self._tgs(use_fast=False,
2802 client_from_rodc=True,
2803 client_sids=client_sids,
2804 expected_groups=expected_groups)
# TGS-REQ with a Member_of condition on self.aa_asserted_identity, which the
# client SIDs below lack. Expected: KDC_ERR_POLICY with a
# KERBEROS_SERVER_RESTRICTION / ACCESS_DENIED audit record.
# NOTE(review): self._tgs() is defined outside this view; its first positional
# argument is presumably the target policy condition — confirm.
2806 def test_tgs_without_aa_asserted_identity(self):
2808 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2809 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2812 self._tgs(f'Member_of SID({self.aa_asserted_identity})',
2813 client_sids=client_sids,
2814 code=KDC_ERR_POLICY,
2815 status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
2816 event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
2817 reason=AuditReason.ACCESS_DENIED,
2818 edata=self.expect_padata_outer)
# As test_tgs_without_aa_asserted_identity, but with client_from_rodc=True.
# The same KDC_ERR_POLICY failure and audit record are expected.
# NOTE(review): self._tgs() is defined outside this view.
2820 def test_tgs_without_aa_asserted_identity_client_from_rodc(self):
2822 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2823 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2826 self._tgs(f'Member_of SID({self.aa_asserted_identity})',
2827 client_from_rodc=True,
2828 client_sids=client_sids,
2829 code=KDC_ERR_POLICY,
2830 status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
2831 event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
2832 reason=AuditReason.ACCESS_DENIED,
2833 edata=self.expect_padata_outer)
# As test_tgs_without_aa_asserted_identity, but with device_from_rodc=True.
# The expected code is paired with CRASHES_WINDOWS, marking the test as one
# that can crash a Windows server.
# NOTE(review): self._tgs() is defined outside this view.
2835 def test_tgs_without_aa_asserted_identity_device_from_rodc(self):
2837 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2838 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2841 self._tgs(f'Member_of SID({self.aa_asserted_identity})',
2842 device_from_rodc=True,
2843 client_sids=client_sids,
2844 code=(KDC_ERR_POLICY, CRASHES_WINDOWS),
2845 status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
2846 event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
2847 reason=AuditReason.ACCESS_DENIED,
2848 edata=self.expect_padata_outer)
# As test_tgs_without_aa_asserted_identity, but with both client_from_rodc and
# device_from_rodc set. The expected code is paired with CRASHES_WINDOWS,
# marking the test as one that can crash a Windows server.
# NOTE(review): self._tgs() is defined outside this view.
2850 def test_tgs_without_aa_asserted_identity_both_from_rodc(self):
2852 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2853 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2856 self._tgs(f'Member_of SID({self.aa_asserted_identity})',
2857 client_from_rodc=True,
2858 device_from_rodc=True,
2859 client_sids=client_sids,
2860 code=(KDC_ERR_POLICY, CRASHES_WINDOWS),
2861 status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
2862 event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
2863 reason=AuditReason.ACCESS_DENIED,
2864 edata=self.expect_padata_outer)
# TGS-REQ with a Member_of condition on self.aa_asserted_identity, which the
# client SIDs below include. No error expectations are passed, and the groups
# are expected to equal the SIDs supplied.
# NOTE(review): self._tgs() is defined outside this view.
2866 def test_tgs_with_aa_asserted_identity(self):
2868 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2869 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2870 (self.aa_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
2873 self._tgs(f'Member_of SID({self.aa_asserted_identity})',
2874 client_sids=client_sids,
2875 expected_groups=client_sids)
# As test_tgs_with_aa_asserted_identity, but with client_from_rodc=True. The
# expected groups are the supplied SIDs plus the Claims Valid SID.
# NOTE(review): self._tgs() is defined outside this view.
2877 def test_tgs_with_aa_asserted_identity_client_from_rodc(self):
2879 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2880 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2881 (self.aa_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
2884 expected_groups = client_sids | {
2885 (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
2888 self._tgs(f'Member_of SID({self.aa_asserted_identity})',
2889 client_from_rodc=True,
2890 client_sids=client_sids,
2891 expected_groups=expected_groups)
# As test_tgs_with_aa_asserted_identity, but with device_from_rodc=True. The
# groups are expected unchanged; code 0 is paired with CRASHES_WINDOWS,
# marking the test as one that can crash a Windows server.
# NOTE(review): self._tgs() is defined outside this view.
2893 def test_tgs_with_aa_asserted_identity_device_from_rodc(self):
2895 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2896 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2897 (self.aa_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
2900 self._tgs(f'Member_of SID({self.aa_asserted_identity})',
2901 device_from_rodc=True,
2902 client_sids=client_sids,
2903 expected_groups=client_sids,
2904 code=(0, CRASHES_WINDOWS))
# As test_tgs_with_aa_asserted_identity, but with both client_from_rodc and
# device_from_rodc set. The expected groups are the supplied SIDs plus Claims
# Valid; code 0 is paired with CRASHES_WINDOWS.
# NOTE(review): self._tgs() is defined outside this view.
2906 def test_tgs_with_aa_asserted_identity_both_from_rodc(self):
2908 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2909 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2910 (self.aa_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
2913 expected_groups = client_sids | {
2914 (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
2917 self._tgs(f'Member_of SID({self.aa_asserted_identity})',
2918 client_from_rodc=True,
2919 device_from_rodc=True,
2920 client_sids=client_sids,
2921 expected_groups=expected_groups,
2922 code=(0, CRASHES_WINDOWS))
# TGS-REQ with a Member_of condition on self.service_asserted_identity, which
# the client SIDs below lack. Expected: KDC_ERR_POLICY with a
# KERBEROS_SERVER_RESTRICTION / ACCESS_DENIED audit record.
# NOTE(review): self._tgs() is defined outside this view; its first positional
# argument is presumably the target policy condition — confirm.
2924 def test_tgs_without_service_asserted_identity(self):
2926 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
2927 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
2930 self._tgs(f'Member_of SID({self.service_asserted_identity})',
2931 client_sids=client_sids,
2932 code=KDC_ERR_POLICY,
2933 status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
2934 event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
2935 reason=AuditReason.ACCESS_DENIED,
2936 edata=self.expect_padata_outer)
def test_tgs_without_service_asserted_identity_client_from_rodc(self):
    """Test a TGS-REQ against a policy requiring the Service Asserted
    Identity SID when the client PAC does not contain it and the client TGT
    comes from the mock RODC.

    The request is expected to be refused with KDC_ERR_POLICY.
    """
    client_sids = {
        (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
        (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
    }

    self._tgs(f'Member_of SID({self.service_asserted_identity})',
              client_from_rodc=True,
              client_sids=client_sids,
              code=KDC_ERR_POLICY,
              status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
              event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
              reason=AuditReason.ACCESS_DENIED,
              edata=self.expect_padata_outer)
def test_tgs_without_service_asserted_identity_device_from_rodc(self):
    """Test a TGS-REQ against a policy requiring the Service Asserted
    Identity SID when the client PAC does not contain it and the device TGT
    comes from the mock RODC.

    The request is expected to be refused with KDC_ERR_POLICY. The outcome
    is tagged CRASHES_WINDOWS, i.e. the test may crash a Windows server.
    """
    client_sids = {
        (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
        (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
    }

    self._tgs(f'Member_of SID({self.service_asserted_identity})',
              device_from_rodc=True,
              client_sids=client_sids,
              code=(KDC_ERR_POLICY, CRASHES_WINDOWS),
              status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
              event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
              reason=AuditReason.ACCESS_DENIED,
              edata=self.expect_padata_outer)
def test_tgs_without_service_asserted_identity_both_from_rodc(self):
    """Test a TGS-REQ against a policy requiring the Service Asserted
    Identity SID when the client PAC does not contain it and both the
    client and device TGTs come from the mock RODC.

    The request is expected to be refused with KDC_ERR_POLICY. The outcome
    is tagged CRASHES_WINDOWS, i.e. the test may crash a Windows server.
    """
    client_sids = {
        (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
        (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
    }

    self._tgs(f'Member_of SID({self.service_asserted_identity})',
              client_from_rodc=True,
              device_from_rodc=True,
              client_sids=client_sids,
              code=(KDC_ERR_POLICY, CRASHES_WINDOWS),
              status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
              event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
              reason=AuditReason.ACCESS_DENIED,
              edata=self.expect_padata_outer)
def test_tgs_with_service_asserted_identity(self):
    """Test a TGS-REQ against a policy requiring the Service Asserted
    Identity SID when the client PAC contains it.

    The request is expected to succeed with the client's groups unchanged.
    """
    client_sids = {
        (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
        (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
        (self.service_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
    }

    self._tgs(f'Member_of SID({self.service_asserted_identity})',
              client_sids=client_sids,
              expected_groups=client_sids)
def test_tgs_with_service_asserted_identity_client_from_rodc(self):
    """Test a TGS-REQ against a policy requiring the Service Asserted
    Identity SID when the client PAC contains it but the client TGT comes
    from the mock RODC.

    The request is expected to be refused with KDC_ERR_POLICY.
    """
    client_sids = {
        (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
        (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
        (self.service_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
    }

    self._tgs(f'Member_of SID({self.service_asserted_identity})',
              client_from_rodc=True,
              client_sids=client_sids,
              code=KDC_ERR_POLICY,
              status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
              event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
              reason=AuditReason.ACCESS_DENIED,
              edata=self.expect_padata_outer)
def test_tgs_with_service_asserted_identity_device_from_rodc(self):
    """Test a TGS-REQ against a policy requiring the Service Asserted
    Identity SID when the client PAC contains it and the device TGT comes
    from the mock RODC.

    The request is expected to succeed with the client's groups unchanged.
    The outcome is tagged CRASHES_WINDOWS, i.e. the test may crash a
    Windows server.
    """
    client_sids = {
        (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
        (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
        (self.service_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
    }

    self._tgs(f'Member_of SID({self.service_asserted_identity})',
              device_from_rodc=True,
              client_sids=client_sids,
              expected_groups=client_sids,
              code=(0, CRASHES_WINDOWS))
def test_tgs_with_service_asserted_identity_both_from_rodc(self):
    """Test a TGS-REQ against a policy requiring the Service Asserted
    Identity SID when the client PAC contains it but both the client and
    device TGTs come from the mock RODC.

    The request is expected to be refused with KDC_ERR_POLICY. The outcome
    is tagged CRASHES_WINDOWS, i.e. the test may crash a Windows server.
    """
    client_sids = {
        (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
        (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
        (self.service_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
    }

    self._tgs(f'Member_of SID({self.service_asserted_identity})',
              client_from_rodc=True,
              device_from_rodc=True,
              client_sids=client_sids,
              code=(KDC_ERR_POLICY, CRASHES_WINDOWS),
              status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
              event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
              reason=AuditReason.ACCESS_DENIED,
              edata=self.expect_padata_outer)
def test_tgs_without_claims_valid(self):
    """Test a TGS-REQ against a policy requiring the Claims Valid SID when
    the client PAC does not contain it.

    The request is expected to be refused with KDC_ERR_POLICY.
    """
    client_sids = {
        (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
        (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
    }

    self._tgs(f'Member_of SID({security.SID_CLAIMS_VALID})',
              client_sids=client_sids,
              code=KDC_ERR_POLICY,
              status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
              event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
              reason=AuditReason.ACCESS_DENIED,
              edata=self.expect_padata_outer)
def test_tgs_without_claims_valid_client_from_rodc(self):
    """Test a TGS-REQ against a policy requiring the Claims Valid SID when
    the client PAC does not contain it and the client TGT comes from the
    mock RODC.

    The request is expected to succeed, and the resulting groups are
    expected to additionally contain the Claims Valid SID.
    """
    client_sids = {
        (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
        (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
    }

    expected_groups = client_sids | {
        (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
    }

    self._tgs(f'Member_of SID({security.SID_CLAIMS_VALID})',
              client_from_rodc=True,
              client_sids=client_sids,
              expected_groups=expected_groups)
def test_tgs_without_claims_valid_device_from_rodc(self):
    """Test a TGS-REQ against a policy requiring the Claims Valid SID when
    the client PAC does not contain it and the device TGT comes from the
    mock RODC.

    The request is expected to be refused with KDC_ERR_POLICY. The outcome
    is tagged CRASHES_WINDOWS, i.e. the test may crash a Windows server.
    """
    client_sids = {
        (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
        (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
    }

    self._tgs(f'Member_of SID({security.SID_CLAIMS_VALID})',
              device_from_rodc=True,
              client_sids=client_sids,
              code=(KDC_ERR_POLICY, CRASHES_WINDOWS),
              status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
              event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
              reason=AuditReason.ACCESS_DENIED,
              edata=self.expect_padata_outer)
def test_tgs_without_claims_valid_both_from_rodc(self):
    """Test a TGS-REQ against a policy requiring the Claims Valid SID when
    the client PAC does not contain it and both the client and device TGTs
    come from the mock RODC.

    The request is expected to succeed, and the resulting groups are
    expected to additionally contain the Claims Valid SID. The outcome is
    tagged CRASHES_WINDOWS, i.e. the test may crash a Windows server.
    """
    client_sids = {
        (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
        (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
    }

    expected_groups = client_sids | {
        (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
    }

    self._tgs(f'Member_of SID({security.SID_CLAIMS_VALID})',
              client_from_rodc=True,
              device_from_rodc=True,
              client_sids=client_sids,
              expected_groups=expected_groups,
              code=(0, CRASHES_WINDOWS))
def test_tgs_with_claims_valid(self):
    """Test a TGS-REQ against a policy requiring the Claims Valid SID when
    the client PAC contains it.

    The request is expected to succeed with the client's groups unchanged.
    """
    client_sids = {
        (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
        (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
        (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
    }

    self._tgs(f'Member_of SID({security.SID_CLAIMS_VALID})',
              client_sids=client_sids,
              expected_groups=client_sids)
def test_tgs_with_claims_valid_client_from_rodc(self):
    """Test a TGS-REQ against a policy requiring the Claims Valid SID when
    the client PAC contains it and the client TGT comes from the mock RODC.

    The request is expected to succeed with the client's groups unchanged.
    """
    client_sids = {
        (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
        (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
        (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
    }

    self._tgs(f'Member_of SID({security.SID_CLAIMS_VALID})',
              client_from_rodc=True,
              client_sids=client_sids,
              expected_groups=client_sids)
def test_tgs_with_claims_valid_device_from_rodc(self):
    """Test a TGS-REQ against a policy requiring the Claims Valid SID when
    the client PAC contains it and the device TGT comes from the mock RODC.

    The request is expected to succeed with the client's groups unchanged.
    The outcome is tagged CRASHES_WINDOWS, i.e. the test may crash a
    Windows server.
    """
    client_sids = {
        (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
        (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
        (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
    }

    self._tgs(f'Member_of SID({security.SID_CLAIMS_VALID})',
              device_from_rodc=True,
              client_sids=client_sids,
              expected_groups=client_sids,
              code=(0, CRASHES_WINDOWS))
def test_tgs_with_claims_valid_both_from_rodc(self):
    """Test a TGS-REQ against a policy requiring the Claims Valid SID when
    the client PAC contains it and both the client and device TGTs come
    from the mock RODC.

    The request is expected to succeed with the client's groups unchanged.
    The outcome is tagged CRASHES_WINDOWS, i.e. the test may crash a
    Windows server.
    """
    client_sids = {
        (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
        (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
        (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
    }

    self._tgs(f'Member_of SID({security.SID_CLAIMS_VALID})',
              client_from_rodc=True,
              device_from_rodc=True,
              client_sids=client_sids,
              expected_groups=client_sids,
              code=(0, CRASHES_WINDOWS))
3156 event=AuditEvent.OK,
3157 reason=AuditReason.NONE,
3161 client_from_rodc=None,
3162 device_from_rodc=None,
3167 expected_groups=None,
3168 expected_claims=None):
3170 code, crashes_windows = code
3171 self.assertIs(crashes_windows, CRASHES_WINDOWS)
3172 if not self.crash_windows:
3173 self.skipTest('test crashes Windows servers')
3175 self.assertIsNot(code, CRASHES_WINDOWS)
3178 self.assertIsNone(device_from_rodc)
3179 self.assertIsNone(device_sids)
3180 self.assertIsNone(device_claims)
3182 if client_from_rodc is None:
3183 client_from_rodc = False
3185 if device_from_rodc is None:
3186 device_from_rodc = False
3188 client_creds = self.get_cached_creds(
3189 account_type=self.AccountType.USER,
3191 'allowed_replication_mock': client_from_rodc,
3192 'revealed_to_mock_rodc': client_from_rodc,
3194 client_sid = client_creds.get_sid()
3196 client_username = client_creds.get_username()
3197 client_cname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
3198 names=[client_username])
3200 client_tkt_options = 'forwardable'
3201 expected_flags = krb5_asn1.TicketFlags(client_tkt_options)
3203 checksum_key = self.get_krbtgt_checksum_key()
3205 if client_from_rodc or device_from_rodc:
3206 rodc_krbtgt_creds = self.get_mock_rodc_krbtgt_creds()
3207 rodc_krbtgt_key = self.TicketDecryptionKey_from_creds(rodc_krbtgt_creds)
3208 rodc_checksum_key = {
3209 krb5pac.PAC_TYPE_KDC_CHECKSUM: rodc_krbtgt_key,
3212 client_tgt = self.get_tgt(client_creds,
3213 kdc_options=client_tkt_options,
3214 expected_flags=expected_flags)
3216 client_modify_pac_fn = []
3217 if client_sids is not None:
3218 client_modify_pac_fn.append(partial(self.set_pac_sids,
3219 new_sids=client_sids))
3220 if client_claims is not None:
3221 client_modify_pac_fn.append(partial(self.set_pac_claims,
3222 client_claims=client_claims))
3223 client_tgt = self.modified_ticket(
3225 modify_pac_fn=client_modify_pac_fn,
3226 new_ticket_key=rodc_krbtgt_key if client_from_rodc else None,
3227 checksum_keys=rodc_checksum_key if client_from_rodc else checksum_key)
3230 # Create a machine account with which to perform FAST.
3231 mach_creds = self.get_cached_creds(
3232 account_type=self.AccountType.COMPUTER,
3234 'allowed_replication_mock': device_from_rodc,
3235 'revealed_to_mock_rodc': device_from_rodc,
3237 mach_tgt = self.get_tgt(mach_creds)
3238 device_modify_pac_fn = []
3239 if device_sids is not None:
3240 device_modify_pac_fn.append(partial(self.set_pac_sids,
3241 new_sids=device_sids))
3242 if device_claims is not None:
3243 device_modify_pac_fn.append(partial(self.set_pac_claims,
3244 client_claims=device_claims))
3245 mach_tgt = self.modified_ticket(
3247 modify_pac_fn=device_modify_pac_fn,
3248 new_ticket_key=rodc_krbtgt_key if device_from_rodc else None,
3249 checksum_keys=rodc_checksum_key if device_from_rodc else checksum_key)
3253 if target_policy is None:
3255 assigned_policy = None
3257 sddl = f'O:SYD:(XA;;CR;;;WD;({target_policy.format(client_sid=client_creds.get_sid())}))'
3258 policy = self.create_authn_policy(enforced=True,
3259 computer_allowed_to=sddl)
3260 assigned_policy = str(policy.dn)
3262 # Create a target account with the assigned policy.
3263 target_creds = self.get_cached_creds(
3264 account_type=self.AccountType.COMPUTER,
3265 opts={'assigned_policy': assigned_policy})
3267 target_decryption_key = self.TicketDecryptionKey_from_creds(
3269 target_etypes = target_creds.tgs_supported_enctypes
3271 samdb = self.get_samdb()
3272 domain_sid_str = samdb.get_domain_sid()
3274 expected_groups = self.map_sids(expected_groups, None, domain_sid_str)
3276 # Show that obtaining a service ticket is allowed.
3277 self._tgs_req(client_tgt, code, client_creds, target_creds,
3279 expected_cname=client_cname,
3280 expected_account_name=client_username,
3281 decryption_key=target_decryption_key,
3282 expected_sid=client_sid,
3283 expected_groups=expected_groups,
3284 expect_client_claims=bool(expected_claims) or None,
3285 expected_client_claims=expected_claims,
3286 expected_supported_etypes=target_etypes,
3287 expected_status=status,
3290 self.check_tgs_log(client_creds, target_creds,
3292 checked_creds=client_creds,
def test_conditional_ace_allowed_from_user_allow(self):
    """Test that a FAST-armored AS-REQ succeeds when the user's policy has
    a conditional allow ACE naming the armoring machine account.
    """
    # Create a machine account with which to perform FAST.
    mach_creds = self.get_cached_creds(
        account_type=self.AccountType.COMPUTER)
    mach_tgt = self.get_tgt(mach_creds)

    # Create an authentication policy that explicitly allows the machine
    # account for a user.
    allowed = (f'O:SYD:(XA;;CR;;;{mach_creds.get_sid()};'
               f'(Member_of SID({mach_creds.get_sid()})))')
    denied = 'O:SYD:(D;;CR;;;WD)'
    policy = self.create_authn_policy(enforced=True,
                                      user_allowed_from=allowed,
                                      service_allowed_from=denied)

    # Create a user account with the assigned policy.
    client_creds = self._get_creds(account_type=self.AccountType.USER,
                                   assigned_policy=policy)

    # Show that authentication succeeds.
    # NOTE(review): the trailing argument of this call was absent from the
    # text; expected_error=0 (no error) is assumed -- confirm.
    self._get_tgt(client_creds, armor_tgt=mach_tgt,
                  expected_error=0)

    self.check_as_log(
        client_creds,
        armor_creds=mach_creds,
        client_policy=policy)
def test_conditional_ace_allowed_from_user_deny(self):
    """Test that a FAST-armored AS-REQ is refused with KDC_ERR_POLICY when
    the user's policy has a conditional deny ACE naming the armoring
    machine account.
    """
    # Create a machine account with which to perform FAST.
    mach_creds = self.get_cached_creds(
        account_type=self.AccountType.COMPUTER)
    mach_tgt = self.get_tgt(mach_creds)

    # Create an authentication policy that explicitly denies the machine
    # account for a user.
    allowed = 'O:SYD:(A;;CR;;;WD)'
    denied = (f'O:SYD:(XD;;CR;;;{mach_creds.get_sid()};'
              f'(Member_of SID({mach_creds.get_sid()})))')
    policy = self.create_authn_policy(enforced=True,
                                      user_allowed_from=denied,
                                      service_allowed_from=allowed)

    # Create a user account with the assigned policy.
    client_creds = self._get_creds(account_type=self.AccountType.USER,
                                   assigned_policy=policy)

    # Show that we get a policy error when trying to authenticate.
    self._get_tgt(client_creds, armor_tgt=mach_tgt,
                  expected_error=KDC_ERR_POLICY)

    self.check_as_log(
        client_creds,
        armor_creds=mach_creds,
        client_policy=policy,
        client_policy_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
        event=AuditEvent.KERBEROS_DEVICE_RESTRICTION,
        reason=AuditReason.ACCESS_DENIED,
        status=ntstatus.NT_STATUS_INVALID_WORKSTATION)
3359 class DeviceRestrictionTests(ConditionalAceBaseTests):
def test_pac_groups_not_present(self):
    """Test that authentication fails if the device does not belong to some
    required groups.
    """

    required_sids = {
        ('S-1-2-3-4', SidType.EXTRA_SID, self.default_attrs),
        ('S-1-9-8-7', SidType.EXTRA_SID, self.default_attrs),
    }

    # Create a machine account with which to perform FAST.
    mach_creds = self.get_cached_creds(
        account_type=self.AccountType.COMPUTER,
        opts={'id': 'device'})
    mach_tgt = self.get_tgt(mach_creds)

    # Create an authentication policy that requires the device to belong to
    # the required groups.
    client_policy_sddl = self.allow_if(
        f'Member_of {self.sddl_array_from_sids(required_sids)}')
    client_policy = self.create_authn_policy(
        enforced=True, user_allowed_from=client_policy_sddl)

    # Create a user account with the assigned policy.
    client_creds = self._get_creds(account_type=self.AccountType.USER,
                                   assigned_policy=client_policy)

    # Show that authentication fails.
    # NOTE(review): the armor TGT is passed as the third positional
    # argument -- confirm against _armored_as_req().
    self._armored_as_req(client_creds,
                         self.get_krbtgt_creds(),
                         mach_tgt,
                         expected_error=KDC_ERR_POLICY)

    self.check_as_log(
        client_creds,
        armor_creds=mach_creds,
        client_policy=client_policy,
        client_policy_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
        event=AuditEvent.KERBEROS_DEVICE_RESTRICTION,
        reason=AuditReason.ACCESS_DENIED,
        status=ntstatus.NT_STATUS_INVALID_WORKSTATION)
def test_pac_groups_present(self):
    """Test that authentication succeeds if the device belongs to some
    required groups.
    """

    required_sids = {
        ('S-1-2-3-4', SidType.EXTRA_SID, self.default_attrs),
        ('S-1-9-8-7', SidType.EXTRA_SID, self.default_attrs),
    }

    device_sids = required_sids | {
        (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
        (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
    }

    # Create a machine account with which to perform FAST.
    mach_creds = self.get_cached_creds(
        account_type=self.AccountType.COMPUTER,
        opts={'id': 'device'})
    mach_tgt = self.get_tgt(mach_creds)

    # Add the required groups to the machine account’s TGT.
    mach_tgt = self.modified_ticket(
        mach_tgt,
        modify_pac_fn=partial(self.set_pac_sids,
                              new_sids=device_sids),
        checksum_keys=self.get_krbtgt_checksum_key())

    # Create an authentication policy that requires the device to belong to
    # the required groups.
    client_policy_sddl = self.allow_if(
        f'Member_of {self.sddl_array_from_sids(required_sids)}')
    client_policy = self.create_authn_policy(
        enforced=True, user_allowed_from=client_policy_sddl)

    # Create a user account with the assigned policy.
    client_creds = self._get_creds(account_type=self.AccountType.USER,
                                   assigned_policy=client_policy)

    # Show that authentication succeeds.
    # NOTE(review): the armor TGT is passed as the third positional
    # argument -- confirm against _armored_as_req().
    self._armored_as_req(client_creds,
                         self.get_krbtgt_creds(),
                         mach_tgt)

    self.check_as_log(client_creds,
                      armor_creds=mach_creds,
                      client_policy=client_policy)
def test_pac_resource_groups_present(self):
    """Test that authentication fails even if the device’s TGT carries the
    required groups, when they are present only as resource groups.
    """

    required_sids = {
        ('S-1-2-3-4', SidType.RESOURCE_SID, self.resource_attrs),
        ('S-1-2-3-5', SidType.RESOURCE_SID, self.resource_attrs),
        ('S-1-2-3-6', SidType.RESOURCE_SID, self.resource_attrs),
    }

    device_sids = required_sids | {
        (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
        (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
    }

    # Create a machine account with which to perform FAST.
    mach_creds = self.get_cached_creds(
        account_type=self.AccountType.COMPUTER,
        opts={'id': 'device'})
    mach_tgt = self.get_tgt(mach_creds)

    # Add the required groups to the machine account’s TGT.
    mach_tgt = self.modified_ticket(
        mach_tgt,
        modify_pac_fn=partial(self.set_pac_sids,
                              new_sids=device_sids),
        checksum_keys=self.get_krbtgt_checksum_key())

    # Create an authentication policy that requires the device to belong to
    # the required groups.
    client_policy_sddl = self.allow_if(
        f'Member_of {self.sddl_array_from_sids(required_sids)}')
    client_policy = self.create_authn_policy(
        enforced=True, user_allowed_from=client_policy_sddl)

    # Create a user account with the assigned policy.
    client_creds = self._get_creds(account_type=self.AccountType.USER,
                                   assigned_policy=client_policy)

    # Show that authentication fails.
    # NOTE(review): the armor TGT is passed as the third positional
    # argument -- confirm against _armored_as_req().
    self._armored_as_req(client_creds,
                         self.get_krbtgt_creds(),
                         mach_tgt,
                         expected_error=KDC_ERR_POLICY)

    self.check_as_log(
        client_creds,
        armor_creds=mach_creds,
        client_policy=client_policy,
        client_policy_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
        event=AuditEvent.KERBEROS_DEVICE_RESTRICTION,
        reason=AuditReason.ACCESS_DENIED,
        status=ntstatus.NT_STATUS_INVALID_WORKSTATION)
def test_pac_resource_groups_present_to_service_sid_compression(self):
    """Test that authentication fails even if the device’s TGT carries the
    required groups as resource groups, when the request is to a service
    account created with default options.
    """

    required_sids = {
        ('S-1-2-3-4', SidType.RESOURCE_SID, self.resource_attrs),
        ('S-1-2-3-5', SidType.RESOURCE_SID, self.resource_attrs),
        ('S-1-2-3-6', SidType.RESOURCE_SID, self.resource_attrs),
    }

    device_sids = required_sids | {
        (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
        (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
    }

    # Create a machine account with which to perform FAST.
    mach_creds = self.get_cached_creds(
        account_type=self.AccountType.COMPUTER,
        opts={'id': 'device'})
    mach_tgt = self.get_tgt(mach_creds)

    # Add the required groups to the machine account’s TGT.
    mach_tgt = self.modified_ticket(
        mach_tgt,
        modify_pac_fn=partial(self.set_pac_sids,
                              new_sids=device_sids),
        checksum_keys=self.get_krbtgt_checksum_key())

    # Create an authentication policy that requires the device to belong to
    # the required groups.
    client_policy_sddl = self.allow_if(
        f'Member_of {self.sddl_array_from_sids(required_sids)}')
    client_policy = self.create_authn_policy(
        enforced=True, user_allowed_from=client_policy_sddl)

    # Create a user account with the assigned policy.
    client_creds = self._get_creds(account_type=self.AccountType.USER,
                                   assigned_policy=client_policy)

    target_creds = self.get_cached_creds(
        account_type=self.AccountType.COMPUTER,
        opts={'id': 'target'})

    # Show that authentication fails.
    # NOTE(review): target credentials and armor TGT are passed as the
    # second and third positional arguments -- confirm against
    # _armored_as_req().
    self._armored_as_req(client_creds,
                         target_creds,
                         mach_tgt,
                         expected_error=KDC_ERR_POLICY)

    self.check_as_log(
        client_creds,
        armor_creds=mach_creds,
        client_policy=client_policy,
        client_policy_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
        event=AuditEvent.KERBEROS_DEVICE_RESTRICTION,
        reason=AuditReason.ACCESS_DENIED,
        status=ntstatus.NT_STATUS_INVALID_WORKSTATION)
def test_pac_resource_groups_present_to_service_no_sid_compression(self):
    """Test that authentication fails even if the device’s TGT carries the
    required groups as resource groups, when the request is to a service
    that does not support SID compression.
    """

    required_sids = {
        ('S-1-2-3-4', SidType.RESOURCE_SID, self.resource_attrs),
        ('S-1-2-3-5', SidType.RESOURCE_SID, self.resource_attrs),
        ('S-1-2-3-6', SidType.RESOURCE_SID, self.resource_attrs),
    }

    device_sids = required_sids | {
        (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
        (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
    }

    # Create a machine account with which to perform FAST.
    mach_creds = self.get_cached_creds(
        account_type=self.AccountType.COMPUTER,
        opts={'id': 'device'})
    mach_tgt = self.get_tgt(mach_creds)

    # Add the required groups to the machine account’s TGT.
    mach_tgt = self.modified_ticket(
        mach_tgt,
        modify_pac_fn=partial(self.set_pac_sids,
                              new_sids=device_sids),
        checksum_keys=self.get_krbtgt_checksum_key())

    # Create an authentication policy that requires the device to belong to
    # the required groups.
    client_policy_sddl = self.allow_if(
        f'Member_of {self.sddl_array_from_sids(required_sids)}')
    client_policy = self.create_authn_policy(
        enforced=True, user_allowed_from=client_policy_sddl)

    # Create a user account with the assigned policy.
    client_creds = self._get_creds(account_type=self.AccountType.USER,
                                   assigned_policy=client_policy)

    # Create a target service account without SID compression support.
    # NOTE(review): the 'id' entry was absent from the text and is assumed
    # to match the other tests' target account -- confirm.
    target_creds = self.get_cached_creds(
        account_type=self.AccountType.COMPUTER,
        opts={
            'id': 'target',
            'supported_enctypes': (
                security.KERB_ENCTYPE_RC4_HMAC_MD5) | (
                    security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96_SK),
            'sid_compression_support': False,
        })

    # Show that authentication fails.
    # NOTE(review): target credentials and armor TGT are passed as the
    # second and third positional arguments -- confirm against
    # _armored_as_req().
    self._armored_as_req(client_creds,
                         target_creds,
                         mach_tgt,
                         expected_error=KDC_ERR_POLICY)

    self.check_as_log(
        client_creds,
        armor_creds=mach_creds,
        client_policy=client_policy,
        client_policy_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
        event=AuditEvent.KERBEROS_DEVICE_RESTRICTION,
        reason=AuditReason.ACCESS_DENIED,
        status=ntstatus.NT_STATUS_INVALID_WORKSTATION)
def test_pac_well_known_groups_not_present(self):
    """Test that authentication fails if the device does not belong to one
    or more required well‐known groups.
    """

    required_sids = {
        (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
        (security.SID_COMPOUNDED_AUTHENTICATION, SidType.EXTRA_SID, self.default_attrs),
        (self.aa_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
        (self.service_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
    }

    device_sids = {
        (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
        (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
    }

    # Create a machine account with which to perform FAST.
    mach_creds = self.get_cached_creds(
        account_type=self.AccountType.COMPUTER,
        opts={'id': 'device'})
    mach_tgt = self.get_tgt(mach_creds)

    # Modify the machine account’s TGT to contain only the SID of the
    # machine account’s primary group.
    mach_tgt = self.modified_ticket(
        mach_tgt,
        modify_pac_fn=partial(self.set_pac_sids,
                              new_sids=device_sids),
        checksum_keys=self.get_krbtgt_checksum_key())

    # Create an authentication policy that requires the device to belong to
    # any of the well‐known groups.
    client_policy_sddl = self.allow_if(
        f'Member_of_any {self.sddl_array_from_sids(required_sids)}')
    client_policy = self.create_authn_policy(
        enforced=True, user_allowed_from=client_policy_sddl)

    # Create a user account with the assigned policy.
    client_creds = self._get_creds(account_type=self.AccountType.USER,
                                   assigned_policy=client_policy)

    # Show that authentication fails.
    # NOTE(review): the armor TGT is passed as the third positional
    # argument -- confirm against _armored_as_req().
    self._armored_as_req(client_creds,
                         self.get_krbtgt_creds(),
                         mach_tgt,
                         expected_error=KDC_ERR_POLICY)

    self.check_as_log(
        client_creds,
        armor_creds=mach_creds,
        client_policy=client_policy,
        client_policy_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
        event=AuditEvent.KERBEROS_DEVICE_RESTRICTION,
        reason=AuditReason.ACCESS_DENIED,
        status=ntstatus.NT_STATUS_INVALID_WORKSTATION)
def test_pac_device_info(self):
    """Test the groups of the client and the device after performing a
    FAST‐armored AS‐REQ.
    """

    device_sids = {
        (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
        (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
    }

    # Create a machine account with which to perform FAST.
    mach_creds = self.get_cached_creds(
        account_type=self.AccountType.COMPUTER,
        opts={'id': 'device'})
    mach_tgt = self.get_tgt(mach_creds)

    # Set the groups in the machine account’s TGT.
    mach_tgt = self.modified_ticket(
        mach_tgt,
        modify_pac_fn=partial(self.set_pac_sids,
                              new_sids=device_sids),
        checksum_keys=self.get_krbtgt_checksum_key())

    # Create a user account.
    client_creds = self._get_creds(account_type=self.AccountType.USER)

    target_creds = self.get_cached_creds(
        account_type=self.AccountType.COMPUTER,
        opts={'id': 'target'})

    expected_sids = {
        (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
        (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
        # The client’s groups are to include the Asserted Identity and
        # Claims Valid SIDs.
        (self.aa_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
        (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
    }

    samdb = self.get_samdb()
    domain_sid_str = samdb.get_domain_sid()

    expected_sids = self.map_sids(expected_sids, None, domain_sid_str)

    # Show that authentication succeeds. Check that the groups in the PAC
    # are as expected, and that no device info is present.
    # NOTE(review): target credentials and armor TGT are passed as the
    # second and third positional arguments -- confirm against
    # _armored_as_req().
    self._armored_as_req(client_creds,
                         target_creds,
                         mach_tgt,
                         expected_groups=expected_sids,
                         expect_device_info=False,
                         expected_device_groups=None)

    self.check_as_log(
        client_creds,
        armor_creds=mach_creds)
def test_pac_claims_not_present(self):
    """Test that authentication fails if the device does not have a
    required claim.
    """

    claim_id = 'the name of the claim'
    claim_value = 'the value of the claim'

    # Create a machine account with which to perform FAST.
    mach_creds = self.get_cached_creds(
        account_type=self.AccountType.COMPUTER,
        opts={'id': 'device'})
    mach_tgt = self.get_tgt(mach_creds)

    # Create an authentication policy that requires the device to have a
    # certain claim.
    client_policy_sddl = self.allow_if(
        f'@User.{escaped_claim_id(claim_id)} == "{claim_value}"')
    client_policy = self.create_authn_policy(
        enforced=True, user_allowed_from=client_policy_sddl)

    # Create a user account with the assigned policy.
    client_creds = self._get_creds(account_type=self.AccountType.USER,
                                   assigned_policy=client_policy)

    # Show that authentication fails.
    # NOTE(review): the armor TGT is passed as the third positional
    # argument -- confirm against _armored_as_req().
    self._armored_as_req(client_creds,
                         self.get_krbtgt_creds(),
                         mach_tgt,
                         expected_error=KDC_ERR_POLICY)

    self.check_as_log(
        client_creds,
        armor_creds=mach_creds,
        client_policy=client_policy,
        client_policy_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
        event=AuditEvent.KERBEROS_DEVICE_RESTRICTION,
        reason=AuditReason.ACCESS_DENIED,
        status=ntstatus.NT_STATUS_INVALID_WORKSTATION)
def test_pac_claims_present(self):
    """Test that authentication succeeds if the device has a required
    claim.
    """

    claim_id = 'the name of the claim'
    claim_value = 'the value of the claim'

    pac_claims = [
        (claims.CLAIMS_SOURCE_TYPE_AD, [
            (claim_id, claims.CLAIM_TYPE_STRING, [claim_value]),
        ]),
    ]

    # Create a machine account with which to perform FAST.
    mach_creds = self.get_cached_creds(
        account_type=self.AccountType.COMPUTER,
        opts={'id': 'device'})
    mach_tgt = self.get_tgt(mach_creds)

    # Add the required claim to the machine account’s TGT.
    mach_tgt = self.modified_ticket(
        mach_tgt,
        modify_pac_fn=partial(self.set_pac_claims,
                              client_claims=pac_claims),
        checksum_keys=self.get_krbtgt_checksum_key())

    # Create an authentication policy that requires the device to have a
    # certain claim.
    client_policy_sddl = self.allow_if(
        f'@User.{escaped_claim_id(claim_id)} == "{claim_value}"')
    client_policy = self.create_authn_policy(
        enforced=True, user_allowed_from=client_policy_sddl)

    # Create a user account with the assigned policy.
    client_creds = self._get_creds(account_type=self.AccountType.USER,
                                   assigned_policy=client_policy)

    # Show that authentication succeeds.
    # NOTE(review): the armor TGT is passed as the third positional
    # argument -- confirm against _armored_as_req().
    self._armored_as_req(client_creds,
                         self.get_krbtgt_creds(),
                         mach_tgt)

    self.check_as_log(client_creds,
                      armor_creds=mach_creds,
                      client_policy=client_policy)
def test_pac_claims_invalid(self):
    """Test that authentication fails if the device’s required claim is not
    valid, i.e. the device’s SIDs lack the Claims Valid SID.
    """

    claim_id = 'the name of the claim'
    claim_value = 'the value of the claim'

    pac_claims = [
        (claims.CLAIMS_SOURCE_TYPE_AD, [
            (claim_id, claims.CLAIM_TYPE_STRING, [claim_value]),
        ]),
    ]

    # The device’s SIDs do not include the Claims Valid SID.
    device_sids = {
        (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
        (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
    }

    # Create a machine account with which to perform FAST.
    mach_creds = self.get_cached_creds(
        account_type=self.AccountType.COMPUTER,
        opts={'id': 'device'})
    mach_tgt = self.get_tgt(mach_creds)

    # Add the SIDs and the required claim to the machine account’s TGT.
    mach_tgt = self.modified_ticket(
        mach_tgt,
        modify_pac_fn=[
            partial(self.set_pac_claims, client_claims=pac_claims),
            partial(self.set_pac_sids, new_sids=device_sids)],
        checksum_keys=self.get_krbtgt_checksum_key())

    # Create an authentication policy that requires the device to have a
    # certain claim.
    client_policy_sddl = self.allow_if(
        f'@User.{escaped_claim_id(claim_id)} == "{claim_value}"')
    client_policy = self.create_authn_policy(
        enforced=True, user_allowed_from=client_policy_sddl)

    # Create a user account with the assigned policy.
    client_creds = self._get_creds(account_type=self.AccountType.USER,
                                   assigned_policy=client_policy)

    # Show that authentication fails.
    # NOTE(review): the armor TGT is passed as the third positional
    # argument -- confirm against _armored_as_req().
    self._armored_as_req(client_creds,
                         self.get_krbtgt_creds(),
                         mach_tgt,
                         expected_error=KDC_ERR_POLICY)

    self.check_as_log(
        client_creds,
        armor_creds=mach_creds,
        client_policy=client_policy,
        client_policy_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
        event=AuditEvent.KERBEROS_DEVICE_RESTRICTION,
        reason=AuditReason.ACCESS_DENIED,
        status=ntstatus.NT_STATUS_INVALID_WORKSTATION)
def test_device_in_world_group(self):
    """Check that the device is treated as a member of the World group."""
    self._check_device_in_group(security.SID_WORLD)
def test_device_in_network_group(self):
    """Check that the device is not treated as a member of the Network
    group.
    """
    self._check_device_not_in_group(security.SID_NT_NETWORK)
def test_device_in_authenticated_users(self):
    """Check that the device is treated as a member of Authenticated
    Users.
    """
    self._check_device_in_group(security.SID_NT_AUTHENTICATED_USERS)
def test_device_in_aa_asserted_identity(self):
    """Check that the device is treated as a member of the Authentication
    Authority Asserted Identity group.
    """
    self._check_device_in_group(
        security.SID_AUTHENTICATION_AUTHORITY_ASSERTED_IDENTITY)
def test_device_in_service_asserted_identity(self):
    """Check that the device is not treated as a member of the Service
    Asserted Identity group.
    """
    self._check_device_not_in_group(security.SID_SERVICE_ASSERTED_IDENTITY)
def test_device_in_compounded_authentication(self):
    """Check that the device is not treated as a member of the Compounded
    Authentication group.
    """
    self._check_device_not_in_group(security.SID_COMPOUNDED_AUTHENTICATION)
def test_device_in_claims_valid(self):
    """Check that the device is treated as a member of the Claims Valid
    group.
    """
    self._check_device_in_group(security.SID_CLAIMS_VALID)
def _check_device_in_group(self, group):
    """Run the device membership check expecting the device to be in
    *group*.

    :param group: SID string of the group to test.
    """
    self._check_device_membership(group, expect_in_group=True)
def _check_device_not_in_group(self, group):
    """Run the device membership check expecting the device not to be in
    *group*.

    :param group: SID string of the group to test.
    """
    self._check_device_membership(group, expect_in_group=False)
# Shared driver for the test_device_in_* cases.  `group` is the SID that is
# substituted into the Member_of / Not_Member_of conditions below, and the
# keyword-only `expect_in_group` selects which of the two policies is expected
# to let the armored AS-REQ through (error 0) and which is expected to be
# refused with KDC_ERR_POLICY.
# NOTE(review): the embedded line numbers in this listing have gaps, so some
# short lines of this method (for example the leading arguments of the two
# _armored_as_req() calls) are not visible here.
3920 def _check_device_membership(self, group, *, expect_in_group):
3921 """Test that authentication succeeds or fails when the device is
3922 required to belong to a certain group.
3925 # Create a machine account with which to perform FAST.
3926 mach_creds = self.get_cached_creds(
3927 account_type=self.AccountType.COMPUTER,
3928 opts={'id': 'device'})
3929 mach_tgt = self.get_tgt(mach_creds)
3931 # Create an authentication policy that requires the device to belong to
3933 in_group_sddl = self.allow_if(f'Member_of {{SID({group})}}')
3934 in_group_policy = self.create_authn_policy(
3935 enforced=True, user_allowed_from=in_group_sddl)
3937 # Create a user account with the assigned policy.
3938 client_creds = self._get_creds(account_type=self.AccountType.USER,
3939 assigned_policy=in_group_policy)
3941 krbtgt_creds = self.get_krbtgt_creds()
3943 # Test whether authentication succeeds or fails.
3944 self._armored_as_req(
3948 expected_error=0 if expect_in_group else KDC_ERR_POLICY)
# Extra keyword arguments for check_as_log(): none for the allowed outcome, and
# the status/event/reason values below for the denied outcome.
3950 policy_success_args = {}
3951 policy_failure_args = {
3952 'client_policy_status': ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
3953 'event': AuditEvent.KERBEROS_DEVICE_RESTRICTION,
3954 'reason': AuditReason.ACCESS_DENIED,
3955 'status': ntstatus.NT_STATUS_INVALID_WORKSTATION,
3958 self.check_as_log(client_creds,
3959 armor_creds=mach_creds,
3960 client_policy=in_group_policy,
3961 **(policy_success_args if expect_in_group
3962 else policy_failure_args))
# Second half: the same check with the negated condition, so the expected
# outcomes are swapped relative to the first half.
3964 # Create an authentication policy that requires the device not to belong
3966 not_in_group_sddl = self.allow_if(f'Not_Member_of {{SID({group})}}')
3967 not_in_group_policy = self.create_authn_policy(
3968 enforced=True, user_allowed_from=not_in_group_sddl)
# client_creds is rebound to a second user account carrying the negated policy.
3970 # Create a user account with the assigned policy.
3971 client_creds = self._get_creds(account_type=self.AccountType.USER,
3972 assigned_policy=not_in_group_policy)
3974 # Test whether authentication succeeds or fails.
3975 self._armored_as_req(
3979 expected_error=KDC_ERR_POLICY if expect_in_group else 0)
3981 self.check_as_log(client_creds,
3982 armor_creds=mach_creds,
3983 client_policy=not_in_group_policy,
3984 **(policy_failure_args if expect_in_group
3985 else policy_success_args))
3988 class TgsReqServicePolicyTests(ConditionalAceBaseTests):
# The target account's policy requires Member_of the SIDs collected in
# required_sids (the two EXTRA_SID entries listed first).  The client's TGT is
# obtained with get_tgt() and the request is expected to be refused with
# KDC_ERR_POLICY; the trailing keyword arguments give the status, event and
# reason that accompany that refusal.
# NOTE(review): the embedded line numbers in this listing have gaps, so some
# short lines of this method are not visible here.
3989 def test_pac_groups_not_present(self):
3990 """Test that authorization fails if the client does not belong to
3991 some required groups.
3995 ('S-1-2-3-4', SidType.EXTRA_SID, self.default_attrs),
3996 ('S-1-9-8-7', SidType.EXTRA_SID, self.default_attrs),
3999 # Create a machine account with which to perform FAST.
4000 mach_creds = self.get_cached_creds(
4001 account_type=self.AccountType.COMPUTER,
4002 opts={'id': 'device'})
4003 mach_tgt = self.get_tgt(mach_creds)
4005 # Create a user account.
4006 client_creds = self._get_creds(account_type=self.AccountType.USER)
4007 client_tgt = self.get_tgt(client_creds)
4009 # Create an authentication policy that requires the client to belong to
4011 target_policy_sddl = self.allow_if(
4012 f'Member_of {self.sddl_array_from_sids(required_sids)}')
4013 target_policy = self.create_authn_policy(
4014 enforced=True, computer_allowed_to=target_policy_sddl)
4016 # Create a target account with the assigned policy.
4017 target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
4018 assigned_policy=target_policy)
4020 # Show that authorization fails.
4022 client_tgt, KDC_ERR_POLICY, client_creds, target_creds,
4024 expect_edata=self.expect_padata_outer,
4025 # We aren’t particular about whether or not we get an NTSTATUS.
4027 expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED)
4030 client_creds, target_creds,
4031 policy=target_policy,
4032 status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
4033 event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
4034 reason=AuditReason.ACCESS_DENIED)
# The client's TGT is rewritten with set_pac_sids() so that it carries the
# required SIDs plus DOMAIN_RID_USERS (as base SID and primary group); the
# target's policy requires Member_of the required SIDs, and the TGS-REQ is
# expected to succeed (error code 0).
# NOTE(review): the embedded line numbers in this listing have gaps, so some
# short lines of this method are not visible here.
4036 def test_pac_groups_present(self):
4037 """Test that authorization succeeds if the client belongs to some
4042 ('S-1-2-3-4', SidType.EXTRA_SID, self.default_attrs),
4043 ('S-1-9-8-7', SidType.EXTRA_SID, self.default_attrs),
4046 client_sids = required_sids | {
4047 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
4048 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
4051 # Create a machine account with which to perform FAST.
4052 mach_creds = self.get_cached_creds(
4053 account_type=self.AccountType.COMPUTER,
4054 opts={'id': 'device'})
4055 mach_tgt = self.get_tgt(mach_creds)
4057 # Create a user account.
4058 client_creds = self._get_creds(account_type=self.AccountType.USER)
4059 client_tgt = self.get_tgt(client_creds)
4061 # Add the required groups to the client’s TGT.
4062 client_tgt = self.modified_ticket(
4064 modify_pac_fn=partial(self.set_pac_sids,
4065 new_sids=client_sids),
4066 checksum_keys=self.get_krbtgt_checksum_key())
4068 # Create an authentication policy that requires the client to belong to
4070 target_policy_sddl = self.allow_if(
4071 f'Member_of {self.sddl_array_from_sids(required_sids)}')
4072 target_policy = self.create_authn_policy(
4073 enforced=True, computer_allowed_to=target_policy_sddl)
4075 # Create a target account with the assigned policy.
4076 target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
4077 assigned_policy=target_policy)
4079 # Show that authorization succeeds.
4080 self._tgs_req(client_tgt, 0, client_creds, target_creds, armor_tgt=mach_tgt)
4082 self.check_tgs_log(client_creds, target_creds,
4083 policy=target_policy)
# Same shape as test_pac_groups_present, except that the required SIDs are
# written into the client's TGT as RESOURCE_SID entries with resource_attrs.
# The request is expected to be refused with KDC_ERR_POLICY.
# NOTE(review): the embedded line numbers in this listing have gaps, so some
# short lines of this method are not visible here.
4085 def test_pac_resource_groups_present_to_service_sid_compression(self):
4086 """Test that authorization fails even if the client belongs to some
4087 required resource groups, and the request is to a service that supports
4092 ('S-1-2-3-4', SidType.RESOURCE_SID, self.resource_attrs),
4093 ('S-1-2-3-5', SidType.RESOURCE_SID, self.resource_attrs),
4094 ('S-1-2-3-6', SidType.RESOURCE_SID, self.resource_attrs),
4097 client_sids = required_sids | {
4098 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
4099 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
4102 # Create a machine account with which to perform FAST.
4103 mach_creds = self.get_cached_creds(
4104 account_type=self.AccountType.COMPUTER,
4105 opts={'id': 'device'})
4106 mach_tgt = self.get_tgt(mach_creds)
4108 # Create a user account.
4109 client_creds = self._get_creds(account_type=self.AccountType.USER)
4110 client_tgt = self.get_tgt(client_creds)
4112 # Add the required groups to the client’s TGT.
4113 client_tgt = self.modified_ticket(
4115 modify_pac_fn=partial(self.set_pac_sids,
4116 new_sids=client_sids),
4117 checksum_keys=self.get_krbtgt_checksum_key())
4119 # Create an authentication policy that requires the client to belong to
4121 target_policy_sddl = self.allow_if(
4122 f'Member_of {self.sddl_array_from_sids(required_sids)}')
4123 target_policy = self.create_authn_policy(
4124 enforced=True, computer_allowed_to=target_policy_sddl)
4126 # Create a target account with the assigned policy.
4127 target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
4128 assigned_policy=target_policy)
4130 # Show that authorization fails.
4132 client_tgt, KDC_ERR_POLICY, client_creds, target_creds,
4134 expect_edata=self.expect_padata_outer,
4135 # We aren’t particular about whether or not we get an NTSTATUS.
4137 expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED)
4140 client_creds, target_creds,
4141 policy=target_policy,
4142 status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
4143 event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
4144 reason=AuditReason.ACCESS_DENIED)
# Variant of the SID-compression test above in which the target account is
# created with an explicit msDS-SupportedEncryptionTypes value that includes
# KERB_ENCTYPE_RESOURCE_SID_COMPRESSION_DISABLED.  The request is again
# expected to be refused with KDC_ERR_POLICY.
# NOTE(review): the embedded line numbers in this listing have gaps, so some
# short lines of this method are not visible here.
4146 def test_pac_resource_groups_present_to_service_no_sid_compression(self):
4147 """Test that authorization fails even if the client belongs to some
4148 required resource groups, and the request is to a service that does not
4149 support SID compression.
4153 ('S-1-2-3-4', SidType.RESOURCE_SID, self.resource_attrs),
4154 ('S-1-2-3-5', SidType.RESOURCE_SID, self.resource_attrs),
4155 ('S-1-2-3-6', SidType.RESOURCE_SID, self.resource_attrs),
4158 client_sids = required_sids | {
4159 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
4160 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
4163 # Create a machine account with which to perform FAST.
4164 mach_creds = self.get_cached_creds(
4165 account_type=self.AccountType.COMPUTER,
4166 opts={'id': 'device'})
4167 mach_tgt = self.get_tgt(mach_creds)
4169 # Create a user account.
4170 client_creds = self._get_creds(account_type=self.AccountType.USER)
4171 client_tgt = self.get_tgt(client_creds)
4173 # Add the required groups to the client’s TGT.
4174 client_tgt = self.modified_ticket(
4176 modify_pac_fn=partial(self.set_pac_sids,
4177 new_sids=client_sids),
4178 checksum_keys=self.get_krbtgt_checksum_key())
4180 # Create an authentication policy that requires the client to belong to
4182 target_policy_sddl = self.allow_if(
4183 f'Member_of {self.sddl_array_from_sids(required_sids)}')
4184 target_policy = self.create_authn_policy(
4185 enforced=True, computer_allowed_to=target_policy_sddl)
# The attribute value is the decimal string of the OR-ed enctype bits.
4187 # Create a target account with the assigned policy.
4188 target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
4189 assigned_policy=target_policy,
4190 additional_details={
4191 'msDS-SupportedEncryptionTypes': str((
4192 security.KERB_ENCTYPE_RC4_HMAC_MD5) | (
4193 security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96_SK) | (
4194 security.KERB_ENCTYPE_RESOURCE_SID_COMPRESSION_DISABLED))})
4196 # Show that authorization fails.
4198 client_tgt, KDC_ERR_POLICY, client_creds, target_creds,
4200 expect_edata=self.expect_padata_outer,
4201 # We aren’t particular about whether or not we get an NTSTATUS.
4203 expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED)
4206 client_creds, target_creds,
4207 policy=target_policy,
4208 status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
4209 event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
4210 reason=AuditReason.ACCESS_DENIED)
# The policy here uses Member_of_any over four well-known SIDs (Claims Valid,
# Compounded Authentication and the two Asserted Identity SIDs), while the
# client's TGT is rewritten to carry only DOMAIN_RID_USERS as base SID and
# primary group.  The request is expected to be refused with KDC_ERR_POLICY.
# NOTE(review): the embedded line numbers in this listing have gaps, so some
# short lines of this method are not visible here.
4212 def test_pac_well_known_groups_not_present(self):
4213 """Test that authorization fails if the client does not belong to one
4214 or more required well‐known groups.
4218 (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
4219 (security.SID_COMPOUNDED_AUTHENTICATION, SidType.EXTRA_SID, self.default_attrs),
4220 (self.aa_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
4221 (self.service_asserted_identity, SidType.EXTRA_SID, self.default_attrs),
4225 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
4226 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
4229 # Create a machine account with which to perform FAST.
4230 mach_creds = self.get_cached_creds(
4231 account_type=self.AccountType.COMPUTER,
4232 opts={'id': 'device'})
4233 mach_tgt = self.get_tgt(mach_creds)
4235 # Create a user account.
4236 client_creds = self._get_creds(account_type=self.AccountType.USER)
4237 client_tgt = self.get_tgt(client_creds)
4239 # Modify the client’s TGT to contain only the SID of the client’s
4241 client_tgt = self.modified_ticket(
4243 modify_pac_fn=partial(self.set_pac_sids,
4244 new_sids=client_sids),
4245 checksum_keys=self.get_krbtgt_checksum_key())
4247 # Create an authentication policy that requires the client to belong to
4249 target_policy_sddl = self.allow_if(
4250 f'Member_of_any {self.sddl_array_from_sids(required_sids)}')
4251 target_policy = self.create_authn_policy(
4252 enforced=True, computer_allowed_to=target_policy_sddl)
4254 # Create a target account with the assigned policy.
4255 target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
4256 assigned_policy=target_policy)
4258 # Show that authorization fails.
4260 client_tgt, KDC_ERR_POLICY, client_creds, target_creds,
4262 expect_edata=self.expect_padata_outer,
4263 # We aren’t particular about whether or not we get an NTSTATUS.
4265 expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED)
4268 client_creds, target_creds,
4269 policy=target_policy,
4270 status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
4271 event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
4272 reason=AuditReason.ACCESS_DENIED)
def test_pac_device_info(self):
    """Run the PAC device info test with every option at its default."""
    run_test = self._run_pac_device_info_test
    run_test()
def test_pac_device_info_target_policy(self):
    """Run the PAC device info test with a Device_Member_of target policy."""
    device_policy = self.allow_if('Device_Member_of {{SID({device_0})}}')
    options = {'target_policy': device_policy}
    self._run_pac_device_info_test(**options)
def test_pac_device_info_rodc_issued(self):
    """Run the PAC device info test with rodc_issued enabled."""
    options = {'rodc_issued': True}
    self._run_pac_device_info_test(**options)
def test_pac_device_info_existing_device_info(self):
    """Run the PAC device info test with existing_device_info enabled."""
    options = {'existing_device_info': True}
    self._run_pac_device_info_test(**options)
def test_pac_device_info_existing_device_info_target_policy(self):
    """Run the PAC device info test with a Device_Member_of target policy
    and existing_device_info enabled.
    """
    device_policy = self.allow_if('Device_Member_of {{SID({device_0})}}')
    options = {
        'target_policy': device_policy,
        'existing_device_info': True,
    }
    self._run_pac_device_info_test(**options)
def test_pac_device_info_existing_device_info_rodc_issued(self):
    """Run the PAC device info test with rodc_issued and
    existing_device_info enabled.
    """
    options = {
        'rodc_issued': True,
        'existing_device_info': True,
    }
    self._run_pac_device_info_test(**options)
def test_pac_device_info_existing_device_claims(self):
    """Run the PAC device info test with existing_device_claims enabled."""
    options = {'existing_device_claims': True}
    self._run_pac_device_info_test(**options)
def test_pac_device_info_existing_device_claims_target_policy(self):
    """Run the PAC device info test with a Device_Member_of target policy
    and existing_device_claims enabled.
    """
    device_policy = self.allow_if('Device_Member_of {{SID({device_0})}}')
    options = {
        'target_policy': device_policy,
        'existing_device_claims': True,
    }
    self._run_pac_device_info_test(**options)
def test_pac_device_info_existing_device_claims_rodc_issued(self):
    """Run the PAC device info test with rodc_issued and
    existing_device_claims enabled.
    """
    options = {
        'rodc_issued': True,
        'existing_device_claims': True,
    }
    self._run_pac_device_info_test(**options)
def test_pac_device_info_existing_device_info_and_claims(self):
    """Run the PAC device info test with existing_device_claims and
    existing_device_info enabled.
    """
    options = {
        'existing_device_claims': True,
        'existing_device_info': True,
    }
    self._run_pac_device_info_test(**options)
def test_pac_device_info_existing_device_info_and_claims_target_policy(self):
    """Run the PAC device info test with a Device_Member_of target policy,
    existing_device_claims and existing_device_info enabled.
    """
    device_policy = self.allow_if('Device_Member_of {{SID({device_0})}}')
    options = {
        'target_policy': device_policy,
        'existing_device_claims': True,
        'existing_device_info': True,
    }
    self._run_pac_device_info_test(**options)
def test_pac_device_info_existing_device_info_and_claims_rodc_issued(self):
    """Run the PAC device info test with rodc_issued,
    existing_device_claims and existing_device_info enabled.
    """
    options = {
        'rodc_issued': True,
        'existing_device_claims': True,
        'existing_device_info': True,
    }
    self._run_pac_device_info_test(**options)
def test_pac_device_info_no_compound_id_support(self):
    """Run the PAC device info test with compound_id_support disabled."""
    options = {'compound_id_support': False}
    self._run_pac_device_info_test(**options)
def test_pac_device_info_no_compound_id_support_target_policy(self):
    """Run the PAC device info test with a Device_Member_of target policy
    and compound_id_support disabled.
    """
    device_policy = self.allow_if('Device_Member_of {{SID({device_0})}}')
    options = {
        'target_policy': device_policy,
        'compound_id_support': False,
    }
    self._run_pac_device_info_test(**options)
def test_pac_device_info_no_compound_id_support_rodc_issued(self):
    """Run the PAC device info test with rodc_issued enabled and
    compound_id_support disabled.
    """
    options = {
        'rodc_issued': True,
        'compound_id_support': False,
    }
    self._run_pac_device_info_test(**options)
def test_pac_device_info_no_compound_id_support_existing_device_info(self):
    """Run the PAC device info test with compound_id_support disabled and
    existing_device_info enabled.
    """
    options = {
        'compound_id_support': False,
        'existing_device_info': True,
    }
    self._run_pac_device_info_test(**options)
def test_pac_device_info_no_compound_id_support_existing_device_info_target_policy(self):
    """Run the PAC device info test with a Device_Member_of target policy,
    compound_id_support disabled and existing_device_info enabled.
    """
    device_policy = self.allow_if('Device_Member_of {{SID({device_0})}}')
    options = {
        'target_policy': device_policy,
        'compound_id_support': False,
        'existing_device_info': True,
    }
    self._run_pac_device_info_test(**options)
def test_pac_device_info_no_compound_id_support_existing_device_info_rodc_issued(self):
    """Run the PAC device info test with rodc_issued enabled,
    compound_id_support disabled and existing_device_info enabled.
    """
    options = {
        'rodc_issued': True,
        'compound_id_support': False,
        'existing_device_info': True,
    }
    self._run_pac_device_info_test(**options)
def test_pac_device_info_no_compound_id_support_existing_device_claims(self):
    """Run the PAC device info test with compound_id_support disabled and
    existing_device_claims enabled.
    """
    options = {
        'compound_id_support': False,
        'existing_device_claims': True,
    }
    self._run_pac_device_info_test(**options)
def test_pac_device_info_no_compound_id_support_existing_device_claims_target_policy(self):
    """Run the PAC device info test with a Device_Member_of target policy,
    compound_id_support disabled and existing_device_claims enabled.
    """
    device_policy = self.allow_if('Device_Member_of {{SID({device_0})}}')
    options = {
        'target_policy': device_policy,
        'compound_id_support': False,
        'existing_device_claims': True,
    }
    self._run_pac_device_info_test(**options)
def test_pac_device_info_no_compound_id_support_existing_device_claims_rodc_issued(self):
    """Run the PAC device info test with rodc_issued enabled,
    compound_id_support disabled and existing_device_claims enabled.
    """
    options = {
        'rodc_issued': True,
        'compound_id_support': False,
        'existing_device_claims': True,
    }
    self._run_pac_device_info_test(**options)
def test_pac_device_info_no_compound_id_support_existing_device_info_and_claims(self):
    """Run the PAC device info test with compound_id_support disabled and
    both existing_device_claims and existing_device_info enabled.
    """
    options = {
        'compound_id_support': False,
        'existing_device_claims': True,
        'existing_device_info': True,
    }
    self._run_pac_device_info_test(**options)
def test_pac_device_info_no_compound_id_support_existing_device_info_and_claims_target_policy(self):
    """Run the PAC device info test with a Device_Member_of target policy,
    compound_id_support disabled, and both existing_device_claims and
    existing_device_info enabled.
    """
    device_policy = self.allow_if('Device_Member_of {{SID({device_0})}}')
    options = {
        'target_policy': device_policy,
        'compound_id_support': False,
        'existing_device_claims': True,
        'existing_device_info': True,
    }
    self._run_pac_device_info_test(**options)
def test_pac_device_info_no_compound_id_support_existing_device_info_and_claims_rodc_issued(self):
    """Run the PAC device info test with rodc_issued enabled,
    compound_id_support disabled, and both existing_device_claims and
    existing_device_info enabled.
    """
    options = {
        'rodc_issued': True,
        'compound_id_support': False,
        'existing_device_claims': True,
        'existing_device_info': True,
    }
    self._run_pac_device_info_test(**options)
def test_pac_device_info_no_compound_id_support_no_claims_valid_existing_device_info(self):
    """Run the PAC device info test with device_claims_valid and
    compound_id_support disabled and existing_device_info enabled.
    """
    options = {
        'device_claims_valid': False,
        'compound_id_support': False,
        'existing_device_info': True,
    }
    self._run_pac_device_info_test(**options)
def test_pac_device_info_no_compound_id_support_no_claims_valid_existing_device_info_target_policy(self):
    """Run the PAC device info test with a Device_Member_of target policy,
    device_claims_valid and compound_id_support disabled, and
    existing_device_info enabled.
    """
    device_policy = self.allow_if('Device_Member_of {{SID({device_0})}}')
    options = {
        'target_policy': device_policy,
        'device_claims_valid': False,
        'compound_id_support': False,
        'existing_device_info': True,
    }
    self._run_pac_device_info_test(**options)
def test_pac_device_info_no_compound_id_support_no_claims_valid_existing_device_info_rodc_issued(self):
    """Run the PAC device info test with rodc_issued enabled,
    device_claims_valid and compound_id_support disabled, and
    existing_device_info enabled.
    """
    options = {
        'rodc_issued': True,
        'device_claims_valid': False,
        'compound_id_support': False,
        'existing_device_info': True,
    }
    self._run_pac_device_info_test(**options)
def test_pac_device_info_no_compound_id_support_no_claims_valid_existing_device_claims(self):
    """Run the PAC device info test with device_claims_valid and
    compound_id_support disabled and existing_device_claims enabled.
    """
    options = {
        'device_claims_valid': False,
        'compound_id_support': False,
        'existing_device_claims': True,
    }
    self._run_pac_device_info_test(**options)
def test_pac_device_info_no_compound_id_support_no_claims_valid_existing_device_claims_target_policy(self):
    """Run the PAC device info test with a Device_Member_of target policy,
    device_claims_valid and compound_id_support disabled, and
    existing_device_claims enabled.
    """
    device_policy = self.allow_if('Device_Member_of {{SID({device_0})}}')
    options = {
        'target_policy': device_policy,
        'device_claims_valid': False,
        'compound_id_support': False,
        'existing_device_claims': True,
    }
    self._run_pac_device_info_test(**options)
def test_pac_device_info_no_compound_id_support_no_claims_valid_existing_device_claims_rodc_issued(self):
    """Run the PAC device info test with rodc_issued enabled,
    device_claims_valid and compound_id_support disabled, and
    existing_device_claims enabled.
    """
    options = {
        'rodc_issued': True,
        'device_claims_valid': False,
        'compound_id_support': False,
        'existing_device_claims': True,
    }
    self._run_pac_device_info_test(**options)
def test_pac_device_info_no_compound_id_support_no_claims_valid_existing_device_info_and_claims(self):
    """Run the PAC device info test with device_claims_valid and
    compound_id_support disabled, and both existing_device_claims and
    existing_device_info enabled.
    """
    options = {
        'device_claims_valid': False,
        'compound_id_support': False,
        'existing_device_claims': True,
        'existing_device_info': True,
    }
    self._run_pac_device_info_test(**options)
def test_pac_device_info_no_compound_id_support_no_claims_valid_existing_device_info_and_claims_target_policy(self):
    """Run the PAC device info test with a Device_Member_of target policy,
    device_claims_valid and compound_id_support disabled, and both
    existing_device_claims and existing_device_info enabled.
    """
    device_policy = self.allow_if('Device_Member_of {{SID({device_0})}}')
    options = {
        'target_policy': device_policy,
        'device_claims_valid': False,
        'compound_id_support': False,
        'existing_device_claims': True,
        'existing_device_info': True,
    }
    self._run_pac_device_info_test(**options)
def test_pac_device_info_no_compound_id_support_no_claims_valid_existing_device_info_and_claims_rodc_issued(self):
    """Run the PAC device info test with rodc_issued enabled,
    device_claims_valid and compound_id_support disabled, and both
    existing_device_claims and existing_device_info enabled.
    """
    options = {
        'rodc_issued': True,
        'device_claims_valid': False,
        'compound_id_support': False,
        'existing_device_claims': True,
        'existing_device_info': True,
    }
    self._run_pac_device_info_test(**options)
def test_pac_device_info_no_claims_valid(self):
    """Run the PAC device info test with device_claims_valid disabled."""
    options = {'device_claims_valid': False}
    self._run_pac_device_info_test(**options)
def test_pac_device_info_no_claims_valid_target_policy(self):
    """Run the PAC device info test with a Device_Member_of target policy
    and device_claims_valid disabled.
    """
    device_policy = self.allow_if('Device_Member_of {{SID({device_0})}}')
    options = {
        'target_policy': device_policy,
        'device_claims_valid': False,
    }
    self._run_pac_device_info_test(**options)
def test_pac_device_info_no_claims_valid_rodc_issued(self):
    """Run the PAC device info test with rodc_issued enabled and
    device_claims_valid disabled.
    """
    options = {
        'rodc_issued': True,
        'device_claims_valid': False,
    }
    self._run_pac_device_info_test(**options)
def test_pac_device_info_no_claims_valid_existing_device_info(self):
    """Run the PAC device info test with device_claims_valid disabled and
    existing_device_info enabled.
    """
    options = {
        'device_claims_valid': False,
        'existing_device_info': True,
    }
    self._run_pac_device_info_test(**options)
def test_pac_device_info_no_claims_valid_existing_device_info_target_policy(self):
    """Run the PAC device info test with a Device_Member_of target policy,
    device_claims_valid disabled and existing_device_info enabled.
    """
    device_policy = self.allow_if('Device_Member_of {{SID({device_0})}}')
    options = {
        'target_policy': device_policy,
        'device_claims_valid': False,
        'existing_device_info': True,
    }
    self._run_pac_device_info_test(**options)
def test_pac_device_info_no_claims_valid_existing_device_info_rodc_issued(self):
    """Run the PAC device info test with rodc_issued enabled,
    device_claims_valid disabled and existing_device_info enabled.
    """
    options = {
        'rodc_issued': True,
        'device_claims_valid': False,
        'existing_device_info': True,
    }
    self._run_pac_device_info_test(**options)
def test_pac_device_info_no_claims_valid_existing_device_claims(self):
    """Run the PAC device info test with device_claims_valid disabled and
    existing_device_claims enabled.
    """
    options = {
        'device_claims_valid': False,
        'existing_device_claims': True,
    }
    self._run_pac_device_info_test(**options)
def test_pac_device_info_no_claims_valid_existing_device_claims_target_policy(self):
    """Run the PAC device info test with a Device_Member_of target policy,
    device_claims_valid disabled and existing_device_claims enabled.
    """
    device_policy = self.allow_if('Device_Member_of {{SID({device_0})}}')
    options = {
        'target_policy': device_policy,
        'device_claims_valid': False,
        'existing_device_claims': True,
    }
    self._run_pac_device_info_test(**options)
def test_pac_device_info_no_claims_valid_existing_device_claims_rodc_issued(self):
    """Run the PAC device info test with rodc_issued enabled,
    device_claims_valid disabled and existing_device_claims enabled.
    """
    options = {
        'rodc_issued': True,
        'device_claims_valid': False,
        'existing_device_claims': True,
    }
    self._run_pac_device_info_test(**options)
def test_pac_device_info_no_claims_valid_existing_device_info_and_claims(self):
    """Run the PAC device info test with device_claims_valid disabled and
    both existing_device_claims and existing_device_info enabled.
    """
    options = {
        'device_claims_valid': False,
        'existing_device_claims': True,
        'existing_device_info': True,
    }
    self._run_pac_device_info_test(**options)
def test_pac_device_info_no_claims_valid_existing_device_info_and_claims_target_policy(self):
    """Run the PAC device info test with a Device_Member_of target policy,
    device_claims_valid disabled, and both existing_device_claims and
    existing_device_info enabled.
    """
    device_policy = self.allow_if('Device_Member_of {{SID({device_0})}}')
    options = {
        'target_policy': device_policy,
        'device_claims_valid': False,
        'existing_device_claims': True,
        'existing_device_info': True,
    }
    self._run_pac_device_info_test(**options)
def test_pac_device_info_no_claims_valid_existing_device_info_and_claims_rodc_issued(self):
    """Run the PAC device info test with rodc_issued enabled,
    device_claims_valid disabled, and both existing_device_claims and
    existing_device_info enabled.
    """
    options = {
        'rodc_issued': True,
        'device_claims_valid': False,
        'existing_device_claims': True,
        'existing_device_info': True,
    }
    self._run_pac_device_info_test(**options)
# Shared driver for the test_pac_device_info_* cases.  The keyword-only
# switches visible in this listing are used as follows:
#   compound_id_support    - passed through as the target account's
#                            'compound_id_support' option.
#   device_claims_valid    - when true, SID_CLAIMS_VALID is added to the SIDs
#                            written into the armor (machine) TGT.
#   existing_device_claims - when true, set_pac_claims(device_claims=...) is
#                            queued as a modification of the client's TGT.
#   existing_device_info   - when true, set_pac_device_sids(...) is queued as a
#                            modification of the client's TGT.
# target_policy and rodc_issued are also read below; their declarations are
# presumably on signature lines that this listing omits (the embedded line
# numbers jump from 4500 to 4503) - confirm against the full file.
# NOTE(review): other short lines (closing brackets, some if/else lines) are
# likewise missing from this listing, so branch conditions are not all visible.
4500 def _run_pac_device_info_test(self, *,
4503 compound_id_support=True,
4504 device_claims_valid=True,
4505 existing_device_claims=False,
4506 existing_device_info=False):
4507 """Test the groups of the client and the device after performing a
4508 FAST‐armored TGS‐REQ.
4511 client_claim_id = 'the name of the client’s client claim'
4512 client_claim_value = 'the value of the client’s client claim'
4515 (claims.CLAIMS_SOURCE_TYPE_AD, [
4516 (client_claim_id, claims.CLAIM_TYPE_STRING, [client_claim_value]),
4521 expected_client_claims = {
4523 'source_type': claims.CLAIMS_SOURCE_TYPE_AD,
4524 'type': claims.CLAIM_TYPE_STRING,
4525 'values': (client_claim_value,),
4529 expected_client_claims = None
4531 device_claim_id = 'the name of the device’s client claim'
4532 device_claim_value = 'the value of the device’s client claim'
4535 (claims.CLAIMS_SOURCE_TYPE_AD, [
4536 (device_claim_id, claims.CLAIM_TYPE_STRING, [device_claim_value]),
4540 existing_claim_id = 'the name of an existing device claim'
4541 existing_claim_value = 'the value of an existing device claim'
4544 (claims.CLAIMS_SOURCE_TYPE_CERTIFICATE, [
4545 (existing_claim_id, claims.CLAIM_TYPE_STRING, [existing_claim_value]),
4550 expected_device_claims = None
4551 elif existing_device_info and existing_device_claims:
4552 expected_device_claims = {
4553 existing_claim_id: {
4554 'source_type': claims.CLAIMS_SOURCE_TYPE_CERTIFICATE,
4555 'type': claims.CLAIM_TYPE_STRING,
4556 'values': (existing_claim_value,),
4559 elif compound_id_support and not existing_device_info and not existing_device_claims:
4560 expected_device_claims = {
4562 'source_type': claims.CLAIMS_SOURCE_TYPE_AD,
4563 'type': claims.CLAIM_TYPE_STRING,
4564 'values': (device_claim_value,),
4568 expected_device_claims = None
4570 # Create a machine account with which to perform FAST.
4571 mach_creds = self.get_cached_creds(
4572 account_type=self.AccountType.COMPUTER,
4573 opts={'id': 'device'})
4574 mach_tgt = self.get_tgt(mach_creds)
4577 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
4578 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
4579 # This to ensure we have EXTRA_SIDS set already, as
4580 # windows won't set that flag otherwise when adding one
4582 ('S-1-2-3-4', SidType.EXTRA_SID, self.default_attrs),
4585 device_sid_0 = 'S-1-3-4-5'
4586 device_sid_1 = 'S-1-4-5-6'
# These names are the placeholders that target_policy.format_map(policy_sids)
# fills in further down (e.g. {device_0} in the callers' SDDL templates).
4589 'device_0': device_sid_0,
4590 'device_1': device_sid_1,
4594 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
4595 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
4596 (device_sid_0, SidType.EXTRA_SID, self.resource_attrs),
4597 (device_sid_1, SidType.EXTRA_SID, self.resource_attrs),
4600 if device_claims_valid:
4601 device_sids.add((security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs))
4603 checksum_key = self.get_krbtgt_checksum_key()
4605 # Modify the machine account’s TGT to contain only the SID of the
4606 # machine account’s primary group.
# NOTE(review): the SIDs written here are device_sids, which also include the
# two device_sid_* extra SIDs (and optionally Claims Valid), and the device
# claims are set as well - the comment above looks narrower than the code.
4607 mach_tgt = self.modified_ticket(
4610 partial(self.set_pac_sids,
4611 new_sids=device_sids),
4612 partial(self.set_pac_claims, client_claims=device_claims),
4614 checksum_keys=checksum_key)
4616 # Create a user account.
4617 client_creds = self.get_cached_creds(
4618 account_type=self.AccountType.USER,
4620 'allowed_replication_mock': rodc_issued,
4621 'revealed_to_mock_rodc': rodc_issued,
4623 client_tgt = self.get_tgt(client_creds)
# PAC modifications applied to the client's TGT; the optional entries below
# are appended depending on existing_device_claims / existing_device_info.
4625 client_modify_pac_fns = [
4626 partial(self.set_pac_sids,
4627 new_sids=client_sids),
4628 partial(self.set_pac_claims, client_claims=client_claims),
4631 if existing_device_claims:
4632 client_modify_pac_fns.append(
4633 partial(self.set_pac_claims, device_claims=existing_claims))
4634 if existing_device_info:
4635 # These are different from the SIDs in the device’s TGT.
4636 existing_sid_0 = 'S-1-7-8-9'
4637 existing_sid_1 = 'S-1-9-8-7'
4639 policy_sids.update({
4640 'existing_0': existing_sid_0,
4641 'existing_1': existing_sid_1,
4645 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
4646 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
4647 (existing_sid_0, SidType.EXTRA_SID, self.resource_attrs),
4648 (existing_sid_1, SidType.EXTRA_SID, self.resource_attrs),
4651 client_modify_pac_fns.append(partial(
4652 self.set_pac_device_sids, new_sids=existing_sids, user_rid=mach_creds.get_rid()))
4655 rodc_krbtgt_creds = self.get_mock_rodc_krbtgt_creds()
4656 rodc_krbtgt_key = self.TicketDecryptionKey_from_creds(rodc_krbtgt_creds)
4657 rodc_checksum_key = {
4658 krb5pac.PAC_TYPE_KDC_CHECKSUM: rodc_krbtgt_key,
4661 # Modify the client’s TGT to contain only the SID of the client’s
# When rodc_issued is set, the ticket is re-keyed with the mock RODC krbtgt key
# and its KDC checksum is made with that key; otherwise the krbtgt checksum key
# obtained above is used.
4663 client_tgt = self.modified_ticket(
4665 modify_pac_fn=client_modify_pac_fns,
4666 new_ticket_key=rodc_krbtgt_key if rodc_issued else None,
4667 checksum_keys=rodc_checksum_key if rodc_issued else checksum_key)
4669 if target_policy is None:
4671 assigned_policy = None
4673 policy = self.create_authn_policy(
4675 computer_allowed_to=target_policy.format_map(policy_sids))
4676 assigned_policy = str(policy.dn)
4678 target_creds = self.get_cached_creds(
4679 account_type=self.AccountType.COMPUTER,
4681 'supported_enctypes':
4682 security.KERB_ENCTYPE_RC4_HMAC_MD5
4683 | security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96,
4684 # Indicate that Compound Identity is supported.
4685 'compound_id_support': compound_id_support,
4686 'assigned_policy': assigned_policy,
4690 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
4691 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
4692 # The client’s groups are not to include the Asserted Identity and
4693 # Claims Valid SIDs.
4696 expected_sids.add((security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs))
4698 expected_sids.add(('S-1-2-3-4', SidType.EXTRA_SID, self.default_attrs))
4701 expected_device_sids = None
4702 elif existing_device_info:
# Same literal values as existing_sid_0 / existing_sid_1 above.
4703 expected_device_sids = {
4704 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
4705 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
4706 ('S-1-7-8-9', SidType.EXTRA_SID, self.resource_attrs),
4707 ('S-1-9-8-7', SidType.EXTRA_SID, self.resource_attrs),
4709 elif compound_id_support and not existing_device_claims:
4710 expected_sids.add((security.SID_COMPOUNDED_AUTHENTICATION, SidType.EXTRA_SID, self.default_attrs))
# Same literal values as device_sid_0 / device_sid_1 above.
4712 expected_device_sids = {
4713 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
4714 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
4715 ('S-1-3-4-5', SidType.EXTRA_SID, self.resource_attrs),
4716 ('S-1-4-5-6', SidType.EXTRA_SID, self.resource_attrs),
# Unlike the plain tuples above, Claims Valid is added wrapped in a frozenset
# and typed as RESOURCE_SID.
4719 if device_claims_valid:
4720 expected_device_sids.add(frozenset([(security.SID_CLAIMS_VALID, SidType.RESOURCE_SID, self.default_attrs)]))
4722 expected_device_sids = None
4724 samdb = self.get_samdb()
4725 domain_sid_str = samdb.get_domain_sid()
4727 expected_sids = self.map_sids(expected_sids, None, domain_sid_str)
4728 # The device SIDs will be put into the PAC unmodified.
4729 expected_device_sids = self.map_sids(expected_device_sids, None, domain_sid_str)
4731 # Show that authorization succeeds.
4732 self._tgs_req(client_tgt, 0, client_creds, target_creds, armor_tgt=mach_tgt,
4733 expected_groups=expected_sids,
4734 expect_device_info=bool(expected_device_sids),
4735 expected_device_domain_sid=domain_sid_str,
4736 expected_device_groups=expected_device_sids,
# NOTE(review): hard-coded True, whereas expect_device_info and
# expect_device_claims are derived with bool(...); expected_client_claims is
# assigned None on one path above - confirm this is intended.
4737 expect_client_claims=True,
4738 expected_client_claims=expected_client_claims,
4739 expect_device_claims=bool(expected_device_claims),
4740 expected_device_claims=expected_device_claims)
4742 self.check_tgs_log(client_creds, target_creds, policy=policy)
# The client's TGT is rewritten with set_pac_sids() using client_sids (the
# DOMAIN_RID_USERS base SID and primary group listed before the account is
# created).  The target is obtained with compound_id=True, and the armored
# TGS-REQ is expected to succeed with groups that additionally contain
# SID_COMPOUNDED_AUTHENTICATION as an extra SID.
# NOTE(review): the embedded line numbers in this listing have gaps, so some
# short lines of this method are not visible here.
4744 def test_pac_extra_sids_behaviour(self):
4745 """Test the groups of the client and the device after performing a
4746 FAST‐armored TGS‐REQ.
4749 # Create a machine account with which to perform FAST.
4750 mach_creds = self.get_cached_creds(
4751 account_type=self.AccountType.COMPUTER,
4752 opts={'id': 'device'})
4753 mach_tgt = self.get_tgt(mach_creds)
4756 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
4757 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
4760 # Create a user account.
4761 client_creds = self._get_creds(account_type=self.AccountType.USER)
4762 client_tgt = self.get_tgt(client_creds)
4764 # Modify the client’s TGT to contain only the SID of the client’s
4766 client_tgt = self.modified_ticket(
4768 modify_pac_fn=partial(self.set_pac_sids,
4769 new_sids=client_sids),
4770 checksum_keys=self.get_krbtgt_checksum_key())
4772 # Indicate that Compound Identity is supported.
4773 target_creds, _ = self.get_target(to_krbtgt=False, compound_id=True)
4776 (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
4777 (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
4778 (security.SID_COMPOUNDED_AUTHENTICATION, SidType.EXTRA_SID, self.default_attrs)
4779 # The client’s groups are not to include the Asserted Identity and
4780 # Claims Valid SIDs.
4783 samdb = self.get_samdb()
4784 domain_sid_str = samdb.get_domain_sid()
4786 expected_sids = self.map_sids(expected_sids, None, domain_sid_str)
4788 # Show that authorization succeeds.
4789 self._tgs_req(client_tgt, 0, client_creds, target_creds, armor_tgt=mach_tgt,
4790 expected_groups=expected_sids)
4792 self.check_tgs_log(client_creds, target_creds)
# The target account's policy tests an @User claim for equality with
# claim_value.  No claims are added to the client's TGT in the lines visible
# here, and the request is expected to be refused with KDC_ERR_POLICY.
# NOTE(review): the embedded line numbers in this listing have gaps, so some
# short lines of this method are not visible here.
4794 def test_pac_claims_not_present(self):
4795 """Test that authorization fails if the user does not have a
4799 claim_id = 'the name of the claim'
4800 claim_value = 'the value of the claim'
4802 # Create a machine account with which to perform FAST.
4803 mach_creds = self.get_cached_creds(
4804 account_type=self.AccountType.COMPUTER,
4805 opts={'id': 'device'})
4806 mach_tgt = self.get_tgt(mach_creds)
4808 # Create an authentication policy that requires the user to have a
4810 target_policy_sddl = self.allow_if(
4811 f'@User.{escaped_claim_id(claim_id)} == "{claim_value}"')
4812 target_policy = self.create_authn_policy(
4813 enforced=True, computer_allowed_to=target_policy_sddl)
4815 # Create a user account.
4816 client_creds = self._get_creds(account_type=self.AccountType.USER)
4817 client_tgt = self.get_tgt(client_creds)
4819 # Create a target account with the assigned policy.
4820 target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
4821 assigned_policy=target_policy)
4823 # Show that authorization fails.
4825 client_tgt, KDC_ERR_POLICY, client_creds, target_creds,
4827 expect_edata=self.expect_padata_outer,
4828 # We aren’t particular about whether or not we get an NTSTATUS.
4830 expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED)
4835 policy=target_policy,
4836 status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
4837 event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
4838 reason=AuditReason.ACCESS_DENIED)
def test_pac_claims_present(self):
    """Test that a TGS-REQ succeeds when the client's TGT carries the @User
    claim required by the target's policy.

    The claim is written into the PAC of the client's TGT with
    set_pac_claims() before the armored request is made. The request is
    expected to succeed (error code 0).
    """

    claim_id = 'the name of the claim'
    claim_value = 'the value of the claim'

    # A single string claim from the AD claims source.
    pac_claims = [
        (claims.CLAIMS_SOURCE_TYPE_AD, [
            (claim_id, claims.CLAIM_TYPE_STRING, [claim_value]),
        ]),
    ]

    # Create a machine account with which to perform FAST.
    mach_creds = self.get_cached_creds(
        account_type=self.AccountType.COMPUTER,
        opts={'id': 'device'})
    mach_tgt = self.get_tgt(mach_creds)

    # Create an authentication policy that requires the user to have a
    # certain claim.
    target_policy_sddl = self.allow_if(
        f'@User.{escaped_claim_id(claim_id)} == "{claim_value}"')
    target_policy = self.create_authn_policy(
        enforced=True, computer_allowed_to=target_policy_sddl)

    # Create a user account.
    client_creds = self._get_creds(account_type=self.AccountType.USER)
    client_tgt = self.get_tgt(client_creds)

    # Add the required claim to the client’s TGT.
    client_tgt = self.modified_ticket(
        client_tgt,
        modify_pac_fn=partial(self.set_pac_claims,
                              client_claims=pac_claims),
        checksum_keys=self.get_krbtgt_checksum_key())

    # Create a target account with the assigned policy.
    target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
                                   assigned_policy=target_policy)

    # Show that authorization succeeds.
    self._tgs_req(client_tgt, 0, client_creds, target_creds,
                  armor_tgt=mach_tgt)

    self.check_tgs_log(client_creds, target_creds,
                       policy=target_policy)
def test_pac_claims_invalid(self):
    """Test that a TGS-REQ is denied when the client's TGT carries the
    required @User claim but its SIDs omit the Claims Valid SID.

    Both the claim and a reduced SID set are written into the PAC of the
    client's TGT. The armored request is expected to fail with
    KDC_ERR_POLICY.
    """

    claim_id = 'the name of the claim'
    claim_value = 'the value of the claim'

    pac_claims = [
        (claims.CLAIMS_SOURCE_TYPE_AD, [
            (claim_id, claims.CLAIM_TYPE_STRING, [claim_value]),
        ]),
    ]

    # These SIDs do not include the Claims Valid SID. Despite the name,
    # they are applied to the client’s TGT below, not the machine’s.
    device_sids = {
        (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
        (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
    }

    # Create a machine account with which to perform FAST.
    mach_creds = self.get_cached_creds(
        account_type=self.AccountType.COMPUTER,
        opts={'id': 'device'})
    mach_tgt = self.get_tgt(mach_creds)

    # Create an authentication policy whose condition requires a certain
    # @User claim.
    target_policy_sddl = self.allow_if(
        f'@User.{escaped_claim_id(claim_id)} == "{claim_value}"')
    target_policy = self.create_authn_policy(
        enforced=True, computer_allowed_to=target_policy_sddl)

    # Create a user account.
    client_creds = self._get_creds(account_type=self.AccountType.USER)
    client_tgt = self.get_tgt(client_creds)

    # Add the SIDs and the required claim to the client’s TGT.
    client_tgt = self.modified_ticket(
        client_tgt,
        modify_pac_fn=[
            partial(self.set_pac_claims, client_claims=pac_claims),
            partial(self.set_pac_sids, new_sids=device_sids)],
        checksum_keys=self.get_krbtgt_checksum_key())

    # Create a target account with the assigned policy.
    target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
                                   assigned_policy=target_policy)

    # Show that authorization fails.
    self._tgs_req(
        client_tgt, KDC_ERR_POLICY, client_creds, target_creds,
        armor_tgt=mach_tgt,
        expect_edata=self.expect_padata_outer,
        # We aren’t particular about whether or not we get an NTSTATUS.
        expect_status=None,
        expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED)

    self.check_tgs_log(
        client_creds,
        target_creds,
        policy=target_policy,
        status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
        event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
        reason=AuditReason.ACCESS_DENIED)
def test_pac_device_claims_not_present(self):
    """Test that authorization fails if the device does not have a
    required claim.

    The target's policy tests a @Device claim. The machine TGT used for
    FAST armor is left exactly as issued; no claim is inserted into its
    PAC. The request is expected to fail with KDC_ERR_POLICY.
    """

    claim_id = 'the name of the claim'
    claim_value = 'the value of the claim'

    # Create a machine account with which to perform FAST.
    mach_creds = self.get_cached_creds(
        account_type=self.AccountType.COMPUTER,
        opts={'id': 'device'})
    mach_tgt = self.get_tgt(mach_creds)

    # Create an authentication policy that requires the device to have a
    # certain device claim.
    target_policy_sddl = self.allow_if(
        f'@Device.{escaped_claim_id(claim_id)} == "{claim_value}"')
    target_policy = self.create_authn_policy(
        enforced=True, computer_allowed_to=target_policy_sddl)

    # Create a user account.
    client_creds = self._get_creds(account_type=self.AccountType.USER)
    client_tgt = self.get_tgt(client_creds)

    # Create a target account with the assigned policy.
    target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
                                   assigned_policy=target_policy)

    # Show that authorization fails.
    self._tgs_req(
        client_tgt, KDC_ERR_POLICY, client_creds, target_creds,
        armor_tgt=mach_tgt,
        expect_edata=self.expect_padata_outer,
        # We aren’t particular about whether or not we get an NTSTATUS.
        expect_status=None,
        expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED)

    self.check_tgs_log(
        client_creds,
        target_creds,
        policy=target_policy,
        status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
        event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
        reason=AuditReason.ACCESS_DENIED)
def test_pac_device_claims_present(self):
    """Test that authorization succeeds if the device has a required claim.

    The claim is written into the PAC of the machine TGT that armors the
    request, and the target's policy tests it as a @Device claim. The
    request is expected to succeed (error code 0).
    """

    claim_id = 'the name of the claim'
    claim_value = 'the value of the claim'

    # A single string claim from the AD claims source.
    pac_claims = [
        (claims.CLAIMS_SOURCE_TYPE_AD, [
            (claim_id, claims.CLAIM_TYPE_STRING, [claim_value]),
        ]),
    ]

    # Create a machine account with which to perform FAST.
    mach_creds = self.get_cached_creds(
        account_type=self.AccountType.COMPUTER,
        opts={'id': 'device'})
    mach_tgt = self.get_tgt(mach_creds)

    # Add the required claim to the machine account’s TGT.
    mach_tgt = self.modified_ticket(
        mach_tgt,
        modify_pac_fn=partial(self.set_pac_claims,
                              client_claims=pac_claims),
        checksum_keys=self.get_krbtgt_checksum_key())

    # Create an authentication policy that requires the device to have a
    # certain device claim.
    target_policy_sddl = self.allow_if(
        f'@Device.{escaped_claim_id(claim_id)} == "{claim_value}"')
    target_policy = self.create_authn_policy(
        enforced=True, computer_allowed_to=target_policy_sddl)

    # Create a user account.
    client_creds = self._get_creds(account_type=self.AccountType.USER)
    client_tgt = self.get_tgt(client_creds)

    # Create a target account with the assigned policy.
    target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
                                   assigned_policy=target_policy)

    # Show that authorization succeeds.
    self._tgs_req(client_tgt, 0, client_creds, target_creds,
                  armor_tgt=mach_tgt)

    self.check_tgs_log(client_creds, target_creds,
                       policy=target_policy)
def test_pac_device_claims_invalid(self):
    """Test that authorization fails if the device’s required claim is not
    valid.

    The machine TGT that armors the request carries the required claim,
    but its SIDs are replaced with a set that omits the Claims Valid SID.
    The request is expected to fail with KDC_ERR_POLICY.
    """

    claim_id = 'the name of the claim'
    claim_value = 'the value of the claim'

    pac_claims = [
        (claims.CLAIMS_SOURCE_TYPE_AD, [
            (claim_id, claims.CLAIM_TYPE_STRING, [claim_value]),
        ]),
    ]

    # The device’s SIDs do not include the Claims Valid SID.
    device_sids = {
        (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
        (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
    }

    # Create a machine account with which to perform FAST.
    mach_creds = self.get_cached_creds(
        account_type=self.AccountType.COMPUTER,
        opts={'id': 'device'})
    mach_tgt = self.get_tgt(mach_creds)

    # Add the SIDs and the required claim to the machine account’s TGT.
    mach_tgt = self.modified_ticket(
        mach_tgt,
        modify_pac_fn=[
            partial(self.set_pac_claims, client_claims=pac_claims),
            partial(self.set_pac_sids, new_sids=device_sids)],
        checksum_keys=self.get_krbtgt_checksum_key())

    # Create an authentication policy that requires the device to have a
    # certain device claim.
    target_policy_sddl = self.allow_if(
        f'@Device.{escaped_claim_id(claim_id)} == "{claim_value}"')
    target_policy = self.create_authn_policy(
        enforced=True, computer_allowed_to=target_policy_sddl)

    # Create a user account.
    client_creds = self._get_creds(account_type=self.AccountType.USER)
    client_tgt = self.get_tgt(client_creds)

    # Create a target account with the assigned policy.
    target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
                                   assigned_policy=target_policy)

    # Show that authorization fails.
    self._tgs_req(
        client_tgt, KDC_ERR_POLICY, client_creds, target_creds,
        armor_tgt=mach_tgt,
        expect_edata=self.expect_padata_outer,
        # We aren’t particular about whether or not we get an NTSTATUS.
        expect_status=None,
        expected_status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED)

    self.check_tgs_log(
        client_creds,
        target_creds,
        policy=target_policy,
        status=ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
        event=AuditEvent.KERBEROS_SERVER_RESTRICTION,
        reason=AuditReason.ACCESS_DENIED)
def test_pac_device_claims_invalid_no_attrs(self):
    """Test authorization when the device’s Claims Valid SID carries no
    attributes.

    The machine TGT that armors the request carries the required claim,
    and its SIDs include the Claims Valid SID with an attribute value of
    zero. As written, this test expects the request to succeed.

    NOTE(review): the previous docstring stated that authorization
    *fails* here, while the code asserts success (error code 0 and no
    failure arguments to check_tgs_log()). Confirm which is intended.
    """

    claim_id = 'the name of the claim'
    claim_value = 'the value of the claim'

    pac_claims = [
        (claims.CLAIMS_SOURCE_TYPE_AD, [
            (claim_id, claims.CLAIM_TYPE_STRING, [claim_value]),
        ]),
    ]

    device_sids = {
        (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
        (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
        # The device’s SIDs include the Claims Valid SID, but it has no
        # attributes.
        (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, 0),
    }

    # Create a machine account with which to perform FAST.
    mach_creds = self.get_cached_creds(
        account_type=self.AccountType.COMPUTER,
        opts={'id': 'device'})
    mach_tgt = self.get_tgt(mach_creds)

    # Add the SIDs and the required claim to the machine account’s TGT.
    mach_tgt = self.modified_ticket(
        mach_tgt,
        modify_pac_fn=[
            partial(self.set_pac_claims, client_claims=pac_claims),
            partial(self.set_pac_sids, new_sids=device_sids)],
        checksum_keys=self.get_krbtgt_checksum_key())

    # Create an authentication policy that requires the device to have a
    # certain device claim.
    target_policy_sddl = self.allow_if(
        f'@Device.{escaped_claim_id(claim_id)} == "{claim_value}"')
    target_policy = self.create_authn_policy(
        enforced=True, computer_allowed_to=target_policy_sddl)

    # Create a user account.
    client_creds = self._get_creds(account_type=self.AccountType.USER)
    client_tgt = self.get_tgt(client_creds)

    # Create a target account with the assigned policy.
    target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
                                   assigned_policy=target_policy)

    # Show that authorization succeeds.
    self._tgs_req(client_tgt, 0, client_creds, target_creds,
                  armor_tgt=mach_tgt)

    self.check_tgs_log(client_creds, target_creds,
                       policy=target_policy)
def test_simple_as_req_client_and_target_policy(self):
    """Test an armored AS-REQ for a service ticket when both the client
    and the target computer have an authentication policy assigned.

    The client's policy names the armoring machine account and the
    target's policy names the client, so the request is expected to be
    allowed and to yield the listed groups.
    """

    # Create a machine account with which to perform FAST.
    mach_creds = self.get_cached_creds(
        account_type=self.AccountType.COMPUTER)
    mach_tgt = self.get_tgt(mach_creds)

    # Create an authentication policy that explicitly allows the machine
    # account for a user.
    client_policy_sddl = f'O:SYD:(XA;;CR;;;{mach_creds.get_sid()};(Member_of {{SID({mach_creds.get_sid()}), SID({mach_creds.get_sid()})}}))'
    client_policy = self.create_authn_policy(
        enforced=True,
        user_allowed_from=client_policy_sddl)

    # Create a user account with the assigned policy.
    client_creds = self._get_creds(account_type=self.AccountType.USER,
                                   assigned_policy=client_policy)

    # Create an authentication policy that applies to a computer and
    # explicitly allows the user account to obtain a service ticket.
    target_policy_sddl = f'O:SYD:(XA;;CR;;;{client_creds.get_sid()};(Member_of SID({client_creds.get_sid()})))'
    target_policy = self.create_authn_policy(
        enforced=True,
        computer_allowed_to=target_policy_sddl)

    # Create a computer account with the assigned policy.
    target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
                                   assigned_policy=target_policy)

    expected_groups = {
        (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
        (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
        (security.SID_AUTHENTICATION_AUTHORITY_ASSERTED_IDENTITY, SidType.EXTRA_SID, self.default_attrs),
        (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
    }

    # Show that obtaining a service ticket with an AS‐REQ is allowed.
    self._armored_as_req(client_creds,
                         target_creds,
                         mach_tgt,
                         expected_groups=expected_groups)

    self.check_as_log(client_creds,
                      armor_creds=mach_creds,
                      client_policy=client_policy,
                      server_policy=target_policy)
def test_device_in_world_group(self):
    """Expect the device to be treated as a member of the World group."""
    world_sid = security.SID_WORLD
    self._check_device_in_group(world_sid)
def test_device_in_network_group(self):
    """Expect the device not to be treated as a member of the Network
    group.
    """
    network_sid = security.SID_NT_NETWORK
    self._check_device_not_in_group(network_sid)
def test_device_in_authenticated_users(self):
    """Expect the device to be treated as a member of Authenticated
    Users.
    """
    authenticated_users_sid = security.SID_NT_AUTHENTICATED_USERS
    self._check_device_in_group(authenticated_users_sid)
def test_device_in_aa_asserted_identity(self):
    """Expect the device to carry the Authentication Authority Asserted
    Identity SID.
    """
    asserted_identity_sid = (
        security.SID_AUTHENTICATION_AUTHORITY_ASSERTED_IDENTITY)
    self._check_device_in_group(asserted_identity_sid)
def test_device_in_service_asserted_identity(self):
    """Expect the device not to carry the Service Asserted Identity SID."""
    service_identity_sid = security.SID_SERVICE_ASSERTED_IDENTITY
    self._check_device_not_in_group(service_identity_sid)
def test_device_in_compounded_authentication(self):
    """Expect the device not to carry the Compounded Authentication SID."""
    compounded_sid = security.SID_COMPOUNDED_AUTHENTICATION
    self._check_device_not_in_group(compounded_sid)
def test_device_in_claims_valid(self):
    """Expect the device to carry the Claims Valid SID."""
    claims_valid_sid = security.SID_CLAIMS_VALID
    self._check_device_in_group(claims_valid_sid)
def _check_device_in_group(self, group):
    """Run the device membership check expecting the device to belong to
    *group*.
    """
    expectation = True
    self._check_device_membership(group, expect_in_group=expectation)
def _check_device_not_in_group(self, group):
    """Run the device membership check expecting the device not to belong
    to *group*.
    """
    expectation = False
    self._check_device_membership(group, expect_in_group=expectation)
def _check_device_membership(self, group, *, expect_in_group):
    """Test that authentication succeeds or fails when the device is
    required to belong to a certain group.

    Two policies are exercised against the same client and armor TGT:
    one that allows access if ``Device_Member_of`` the group, and one that
    allows access if ``Not_Device_Member_of`` the group. Each is assigned
    to its own freshly created target account.

    :param group: SID string interpolated into the SDDL conditions.
    :param expect_in_group: If true, the first request must succeed and
        the second must fail with KDC_ERR_POLICY; if false, the reverse.
    """

    # Create a machine account with which to perform FAST.
    mach_creds = self.get_cached_creds(
        account_type=self.AccountType.COMPUTER,
        opts={'id': 'device'})
    mach_tgt = self.get_tgt(mach_creds)

    # Create an authentication policy that requires the device to belong to
    # a certain group.
    in_group_sddl = self.allow_if(f'Device_Member_of {{SID({group})}}')
    in_group_policy = self.create_authn_policy(
        enforced=True, computer_allowed_to=in_group_sddl)

    # Create a user account.
    client_creds = self._get_creds(account_type=self.AccountType.USER)
    client_tgt = self.get_tgt(client_creds)

    # Create a target account with the assigned policy.
    target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
                                   assigned_policy=in_group_policy)

    # Extra keyword arguments for _tgs_req(), chosen by expected outcome.
    tgs_success_args = {}
    tgs_failure_args = {
        'expect_edata': self.expect_padata_outer,
        # We aren’t particular about whether or not we get an NTSTATUS.
        'expect_status': None,
        'expected_status': ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
    }

    # Test whether authorization succeeds or fails.
    self._tgs_req(client_tgt,
                  0 if expect_in_group else KDC_ERR_POLICY,
                  client_creds,
                  target_creds,
                  armor_tgt=mach_tgt,
                  **(tgs_success_args if expect_in_group
                     else tgs_failure_args))

    # Extra keyword arguments for check_tgs_log(), chosen the same way.
    policy_success_args = {}
    policy_failure_args = {
        'status': ntstatus.NT_STATUS_AUTHENTICATION_FIREWALL_FAILED,
        'event': AuditEvent.KERBEROS_SERVER_RESTRICTION,
        'reason': AuditReason.ACCESS_DENIED,
    }

    self.check_tgs_log(client_creds, target_creds,
                       policy=in_group_policy,
                       **(policy_success_args if expect_in_group
                          else policy_failure_args))

    # Create an authentication policy that requires the device not to belong
    # to the group.
    not_in_group_sddl = self.allow_if(
        f'Not_Device_Member_of {{SID({group})}}')
    not_in_group_policy = self.create_authn_policy(
        enforced=True, computer_allowed_to=not_in_group_sddl)

    # Create a target account with the assigned policy.
    target_creds = self._get_creds(account_type=self.AccountType.COMPUTER,
                                   assigned_policy=not_in_group_policy)

    # Test whether authorization succeeds or fails. The expectations are
    # the inverse of those for the first policy.
    self._tgs_req(client_tgt,
                  KDC_ERR_POLICY if expect_in_group else 0,
                  client_creds,
                  target_creds,
                  armor_tgt=mach_tgt,
                  **(tgs_failure_args if expect_in_group
                     else tgs_success_args))

    self.check_tgs_log(client_creds, target_creds,
                       policy=not_in_group_policy,
                       **(policy_failure_args if expect_in_group
                          else policy_success_args))
def test_simple_as_req_client_policy_only(self):
    """Test an armored AS-REQ to the krbtgt when only the client has an
    authentication policy assigned.

    The client's policy names the armoring machine account, so the request
    is expected to be allowed and to yield the listed groups.
    """

    # Create a machine account with which to perform FAST.
    mach_creds = self.get_cached_creds(
        account_type=self.AccountType.COMPUTER)
    mach_tgt = self.get_tgt(mach_creds)

    # Create an authentication policy that explicitly allows the machine
    # account for a user.
    client_policy_sddl = f'O:SYD:(XA;;CR;;;{mach_creds.get_sid()};(Member_of SID({mach_creds.get_sid()})))'
    client_policy = self.create_authn_policy(
        enforced=True,
        user_allowed_from=client_policy_sddl)

    # Create a user account with the assigned policy.
    client_creds = self._get_creds(account_type=self.AccountType.USER,
                                   assigned_policy=client_policy)

    expected_groups = {
        (security.DOMAIN_RID_USERS, SidType.BASE_SID, self.default_attrs),
        (security.DOMAIN_RID_USERS, SidType.PRIMARY_GID, None),
        (security.SID_AUTHENTICATION_AUTHORITY_ASSERTED_IDENTITY, SidType.EXTRA_SID, self.default_attrs),
        (security.SID_CLAIMS_VALID, SidType.EXTRA_SID, self.default_attrs),
    }

    # Show that obtaining a service ticket with an AS‐REQ is allowed.
    self._armored_as_req(client_creds,
                         self.get_krbtgt_creds(),
                         mach_tgt,
                         expected_groups=expected_groups)

    self.check_as_log(client_creds,
                      armor_creds=mach_creds,
                      client_policy=client_policy)
5355 if __name__ == '__main__':
5356 global_asn1_print = False
5357 global_hexdump = False