1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Isaac Boukris 2020
3 # Copyright (C) Stefan Metzmacher 2020
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
29 from pyasn1.codec.der.decoder import decode as pyasn1_der_decode
30 from pyasn1.codec.der.encoder import encode as pyasn1_der_encode
31 from pyasn1.codec.native.decoder import decode as pyasn1_native_decode
32 from pyasn1.codec.native.encoder import encode as pyasn1_native_encode
34 from pyasn1.codec.ber.encoder import BitStringEncoder
36 from samba.credentials import Credentials
37 from samba.dcerpc import krb5pac, security
38 from samba.gensec import FEATURE_SEAL
39 from samba.ndr import ndr_pack, ndr_unpack
42 from samba.tests import TestCaseInTempDir
44 import samba.tests.krb5.rfc4120_pyasn1 as krb5_asn1
45 from samba.tests.krb5.rfc4120_constants import (
48 FX_FAST_ARMOR_AP_REQUEST,
50 KDC_ERR_PREAUTH_FAILED,
51 KDC_ERR_UNKNOWN_CRITICAL_FAST_OPTIONS,
52 KERB_ERR_TYPE_EXTENDED,
66 KU_NON_KERB_CKSUM_SALT,
67 KU_TGS_REP_ENC_PART_SESSION,
68 KU_TGS_REP_ENC_PART_SUB_KEY,
70 KU_TGS_REQ_AUTH_CKSUM,
71 KU_TGS_REQ_AUTH_DAT_SESSION,
72 KU_TGS_REQ_AUTH_DAT_SUBKEY,
76 PADATA_ENCRYPTED_CHALLENGE,
89 PADATA_SUPPORTED_ETYPES
91 import samba.tests.krb5.kcrypto as kcrypto
94 def BitStringEncoder_encodeValue32(
95 self, value, asn1Spec, encodeFun, **options):
97 # BitStrings like KDCOptions or TicketFlags should at least
98 # be 32-Bit on the wire
100 if asn1Spec is not None:
101 # TODO: try to avoid ASN.1 schema instantiation
102 value = asn1Spec.clone(value)
104 valueLength = len(value)
106 alignedValue = value << (8 - valueLength % 8)
110 substrate = alignedValue.asOctets()
111 length = len(substrate)
112 # We need at least 32-Bit / 4-Bytes
117 ret = b'\x00' + substrate + (b'\x00' * padding)
118 return ret, False, True
121 BitStringEncoder.encodeValue = BitStringEncoder_encodeValue32
124 def BitString_NamedValues_prettyPrint(self, scope=0):
125 ret = "%s" % self.asBinary()
128 for byte in self.asNumbers():
129 for bit in [7, 6, 5, 4, 3, 2, 1, 0]:
136 if len(bits) < highest_bit:
137 for bitPosition in range(len(bits), highest_bit):
140 delim = ": (\n%s " % indent
141 for bitPosition in range(highest_bit):
142 if bitPosition in self.prettyPrintNamedValues:
143 name = self.prettyPrintNamedValues[bitPosition]
144 elif bits[bitPosition] != 0:
145 name = "unknown-bit-%u" % bitPosition
148 ret += "%s%s:%u" % (delim, name, bits[bitPosition])
149 delim = ",\n%s " % indent
150 ret += "\n%s)" % indent
154 krb5_asn1.TicketFlags.prettyPrintNamedValues =\
155 krb5_asn1.TicketFlagsValues.namedValues
156 krb5_asn1.TicketFlags.namedValues =\
157 krb5_asn1.TicketFlagsValues.namedValues
158 krb5_asn1.TicketFlags.prettyPrint =\
159 BitString_NamedValues_prettyPrint
160 krb5_asn1.KDCOptions.prettyPrintNamedValues =\
161 krb5_asn1.KDCOptionsValues.namedValues
162 krb5_asn1.KDCOptions.namedValues =\
163 krb5_asn1.KDCOptionsValues.namedValues
164 krb5_asn1.KDCOptions.prettyPrint =\
165 BitString_NamedValues_prettyPrint
166 krb5_asn1.APOptions.prettyPrintNamedValues =\
167 krb5_asn1.APOptionsValues.namedValues
168 krb5_asn1.APOptions.namedValues =\
169 krb5_asn1.APOptionsValues.namedValues
170 krb5_asn1.APOptions.prettyPrint =\
171 BitString_NamedValues_prettyPrint
172 krb5_asn1.PACOptionFlags.prettyPrintNamedValues =\
173 krb5_asn1.PACOptionFlagsValues.namedValues
174 krb5_asn1.PACOptionFlags.namedValues =\
175 krb5_asn1.PACOptionFlagsValues.namedValues
176 krb5_asn1.PACOptionFlags.prettyPrint =\
177 BitString_NamedValues_prettyPrint
180 def Integer_NamedValues_prettyPrint(self, scope=0):
182 if intval in self.prettyPrintNamedValues:
183 name = self.prettyPrintNamedValues[intval]
185 name = "<__unknown__>"
186 ret = "%d (0x%x) %s" % (intval, intval, name)
190 krb5_asn1.NameType.prettyPrintNamedValues =\
191 krb5_asn1.NameTypeValues.namedValues
192 krb5_asn1.NameType.prettyPrint =\
193 Integer_NamedValues_prettyPrint
194 krb5_asn1.AuthDataType.prettyPrintNamedValues =\
195 krb5_asn1.AuthDataTypeValues.namedValues
196 krb5_asn1.AuthDataType.prettyPrint =\
197 Integer_NamedValues_prettyPrint
198 krb5_asn1.PADataType.prettyPrintNamedValues =\
199 krb5_asn1.PADataTypeValues.namedValues
200 krb5_asn1.PADataType.prettyPrint =\
201 Integer_NamedValues_prettyPrint
202 krb5_asn1.EncryptionType.prettyPrintNamedValues =\
203 krb5_asn1.EncryptionTypeValues.namedValues
204 krb5_asn1.EncryptionType.prettyPrint =\
205 Integer_NamedValues_prettyPrint
206 krb5_asn1.ChecksumType.prettyPrintNamedValues =\
207 krb5_asn1.ChecksumTypeValues.namedValues
208 krb5_asn1.ChecksumType.prettyPrint =\
209 Integer_NamedValues_prettyPrint
210 krb5_asn1.KerbErrorDataType.prettyPrintNamedValues =\
211 krb5_asn1.KerbErrorDataTypeValues.namedValues
212 krb5_asn1.KerbErrorDataType.prettyPrint =\
213 Integer_NamedValues_prettyPrint
216 class Krb5EncryptionKey:
217 def __init__(self, key, kvno):
219 kcrypto.Enctype.AES256: kcrypto.Cksumtype.SHA1_AES256,
220 kcrypto.Enctype.AES128: kcrypto.Cksumtype.SHA1_AES128,
221 kcrypto.Enctype.RC4: kcrypto.Cksumtype.HMAC_MD5,
224 self.etype = key.enctype
225 self.ctype = EncTypeChecksum[self.etype]
228 def encrypt(self, usage, plaintext):
229 ciphertext = kcrypto.encrypt(self.key, usage, plaintext)
232 def decrypt(self, usage, ciphertext):
233 plaintext = kcrypto.decrypt(self.key, usage, ciphertext)
236 def make_zeroed_checksum(self, ctype=None):
240 checksum_len = kcrypto.checksum_len(ctype)
241 return bytes(checksum_len)
243 def make_checksum(self, usage, plaintext, ctype=None):
246 cksum = kcrypto.make_checksum(ctype, self.key, usage, plaintext)
249 def verify_checksum(self, usage, plaintext, ctype, cksum):
250 if self.ctype != ctype:
251 raise AssertionError(f'key checksum type ({self.ctype}) != '
252 f'checksum type ({ctype})')
254 kcrypto.verify_checksum(ctype,
260 def export_obj(self):
261 EncryptionKey_obj = {
262 'keytype': self.etype,
263 'keyvalue': self.key.contents,
265 return EncryptionKey_obj
268 class RodcPacEncryptionKey(Krb5EncryptionKey):
269 def __init__(self, key, kvno, rodc_id=None):
270 super().__init__(key, kvno)
276 kvno &= (1 << 16) - 1
278 rodc_id = kvno or None
280 if rodc_id is not None:
281 self.rodc_id = rodc_id.to_bytes(2, byteorder='little')
def make_rodc_zeroed_checksum(self, ctype=None):
    """Return a zeroed checksum followed by zero bytes for the RODC id.

    The leading part is whatever the parent class's
    make_zeroed_checksum() returns for *ctype*; one zero byte is then
    appended for every byte held in self.rodc_id.
    """
    zeroed = super().make_zeroed_checksum(ctype)
    rodc_padding = bytes(len(self.rodc_id))
    return zeroed + rodc_padding
def make_rodc_checksum(self, usage, plaintext, ctype=None):
    """Return the parent class's checksum with self.rodc_id appended.

    *usage*, *plaintext* and *ctype* are forwarded unchanged to the
    parent's make_checksum().
    """
    return super().make_checksum(usage, plaintext, ctype) + self.rodc_id
293 def verify_rodc_checksum(self, usage, plaintext, ctype, cksum):
295 cksum, cksum_rodc_id = cksum[:-2], cksum[-2:]
297 if self.rodc_id != cksum_rodc_id:
298 raise AssertionError(f'{self.rodc_id.hex()} != '
299 f'{cksum_rodc_id.hex()}')
301 super().verify_checksum(usage,
class ZeroedChecksumKey(RodcPacEncryptionKey):
    """Key whose checksum makers hand back zeroed checksums.

    Both overrides ignore *usage* and *plaintext* and delegate to the
    corresponding "zeroed" method of the instance.
    """

    def make_checksum(self, usage, plaintext, ctype=None):
        """Return self.make_zeroed_checksum(ctype) instead of a real checksum."""
        zeroed = self.make_zeroed_checksum(ctype)
        return zeroed

    def make_rodc_checksum(self, usage, plaintext, ctype=None):
        """Return self.make_rodc_zeroed_checksum(ctype) instead of a real checksum."""
        zeroed = self.make_rodc_zeroed_checksum(ctype)
        return zeroed
315 class WrongLengthChecksumKey(RodcPacEncryptionKey):
316 def __init__(self, key, kvno, length):
317 super().__init__(key, kvno)
319 self._length = length
322 def _adjust_to_length(cls, checksum, length):
323 diff = length - len(checksum)
325 checksum += bytes(diff)
327 checksum = checksum[:length]
def make_zeroed_checksum(self, ctype=None):
    """Return self._length zero bytes; *ctype* is not consulted."""
    forced_length = self._length
    return bytes(forced_length)
def make_checksum(self, usage, plaintext, ctype=None):
    """Return the parent's checksum after adjusting it to self._length.

    The checksum produced by the parent class's make_checksum() is passed
    through self._adjust_to_length() together with self._length, and the
    result of that call is returned.
    """
    real_checksum = super().make_checksum(usage, plaintext, ctype)
    return self._adjust_to_length(real_checksum, self._length)
def make_rodc_zeroed_checksum(self, ctype=None):
    """Return self._length zero bytes; *ctype* is not consulted."""
    forced_length = self._length
    return bytes(forced_length)
def make_rodc_checksum(self, usage, plaintext, ctype=None):
    """Return the parent's RODC checksum after adjusting it to self._length.

    The value produced by the parent class's make_rodc_checksum() is
    passed through self._adjust_to_length() together with self._length,
    and the result of that call is returned.
    """
    real_checksum = super().make_rodc_checksum(usage, plaintext, ctype)
    return self._adjust_to_length(real_checksum, self._length)
346 class KerberosCredentials(Credentials):
348 fast_supported_bits = (security.KERB_ENCTYPE_FAST_SUPPORTED |
349 security.KERB_ENCTYPE_COMPOUND_IDENTITY_SUPPORTED |
350 security.KERB_ENCTYPE_CLAIMS_SUPPORTED)
353 super(KerberosCredentials, self).__init__()
355 all_enc_types |= security.KERB_ENCTYPE_RC4_HMAC_MD5
356 all_enc_types |= security.KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96
357 all_enc_types |= security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96
359 self.as_supported_enctypes = all_enc_types
360 self.tgs_supported_enctypes = all_enc_types
361 self.ap_supported_enctypes = all_enc_types
364 self.forced_keys = {}
366 self.forced_salt = None
def set_as_supported_enctypes(self, value):
    """Store *value*, converted with int(), in self.as_supported_enctypes."""
    enctype_bits = int(value)
    self.as_supported_enctypes = enctype_bits
def set_tgs_supported_enctypes(self, value):
    """Store *value*, converted with int(), in self.tgs_supported_enctypes."""
    enctype_bits = int(value)
    self.tgs_supported_enctypes = enctype_bits
def set_ap_supported_enctypes(self, value):
    """Store *value*, converted with int(), in self.ap_supported_enctypes."""
    enctype_bits = int(value)
    self.ap_supported_enctypes = enctype_bits
381 etype_map = collections.OrderedDict([
382 (kcrypto.Enctype.AES256,
383 security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96),
384 (kcrypto.Enctype.AES128,
385 security.KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96),
386 (kcrypto.Enctype.RC4,
387 security.KERB_ENCTYPE_RC4_HMAC_MD5),
388 (kcrypto.Enctype.DES_MD5,
389 security.KERB_ENCTYPE_DES_CBC_MD5),
390 (kcrypto.Enctype.DES_CRC,
391 security.KERB_ENCTYPE_DES_CBC_CRC)
395 def etypes_to_bits(cls, etypes):
398 bit = cls.etype_map[etype]
400 raise ValueError(f'Got duplicate etype: {etype}')
406 def bits_to_etypes(cls, bits):
408 for etype, bit in cls.etype_map.items():
413 bits &= ~cls.fast_supported_bits
415 raise ValueError(f'Unsupported etype bits: {bits}')
def get_as_krb5_etypes(self):
    """Return self.as_supported_enctypes translated by self.bits_to_etypes()."""
    supported_bits = self.as_supported_enctypes
    return self.bits_to_etypes(supported_bits)
def get_tgs_krb5_etypes(self):
    """Return self.tgs_supported_enctypes translated by self.bits_to_etypes()."""
    supported_bits = self.tgs_supported_enctypes
    return self.bits_to_etypes(supported_bits)
def get_ap_krb5_etypes(self):
    """Return self.ap_supported_enctypes translated by self.bits_to_etypes()."""
    supported_bits = self.ap_supported_enctypes
    return self.bits_to_etypes(supported_bits)
428 def set_kvno(self, kvno):
429 # Sign-extend from 32 bits.
437 def set_forced_key(self, etype, hexkey):
439 contents = binascii.a2b_hex(hexkey)
440 key = kcrypto.Key(etype, contents)
441 self.forced_keys[etype] = RodcPacEncryptionKey(key, self.kvno)
443 def get_forced_key(self, etype):
445 return self.forced_keys.get(etype)
def set_forced_salt(self, salt):
    """Store a bytes() copy of *salt* in self.forced_salt."""
    salt_bytes = bytes(salt)
    self.forced_salt = salt_bytes
def get_forced_salt(self):
    """Return the value currently held in self.forced_salt."""
    forced = self.forced_salt
    return forced
454 if self.forced_salt is not None:
455 return self.forced_salt
459 salt_name = upn.rsplit('@', 1)[0].replace('/', '')
461 salt_name = self.get_username()
463 if self.get_workstation():
464 salt_name = self.get_username().lower()
465 if salt_name[-1] == '$':
466 salt_name = salt_name[:-1]
467 salt_string = '%shost%s.%s' % (
468 self.get_realm().upper(),
470 self.get_realm().lower())
472 salt_string = self.get_realm().upper() + salt_name
474 return salt_string.encode('utf-8')
476 def set_dn(self, dn):
482 def set_spn(self, spn):
488 def set_upn(self, upn):
495 class KerberosTicketCreds:
496 def __init__(self, ticket, session_key,
497 crealm=None, cname=None,
498 srealm=None, sname=None,
501 encpart_private=None):
503 self.session_key = session_key
508 self.decryption_key = decryption_key
509 self.ticket_private = ticket_private
510 self.encpart_private = encpart_private
513 class RawKerberosTest(TestCaseInTempDir):
514 """A raw Kerberos Test case."""
516 pac_checksum_types = {krb5pac.PAC_TYPE_SRV_CHECKSUM,
517 krb5pac.PAC_TYPE_KDC_CHECKSUM,
518 krb5pac.PAC_TYPE_TICKET_CHECKSUM}
521 {"value": -1111, "name": "dummy", },
522 {"value": kcrypto.Enctype.AES256, "name": "aes128", },
523 {"value": kcrypto.Enctype.AES128, "name": "aes256", },
524 {"value": kcrypto.Enctype.RC4, "name": "rc4", },
527 setup_etype_test_permutations_done = False
530 def setup_etype_test_permutations(cls):
531 if cls.setup_etype_test_permutations_done:
536 num_idxs = len(cls.etypes_to_test)
538 for num in range(1, num_idxs + 1):
539 chunk = list(itertools.permutations(range(num_idxs), num))
542 permutations.append(el)
544 for p in permutations:
548 n = cls.etypes_to_test[idx]["name"]
553 etypes += (cls.etypes_to_test[idx]["value"],)
555 r = {"name": name, "etypes": etypes, }
558 cls.etype_test_permutations = res
559 cls.setup_etype_test_permutations_done = True
562 def etype_test_permutation_name_idx(cls):
563 cls.setup_etype_test_permutations()
566 for e in cls.etype_test_permutations:
def etype_test_permutation_by_idx(self, idx):
    """Return the (name, etypes) pair of entry *idx* in self.etype_test_permutations."""
    entry = self.etype_test_permutations[idx]
    name, etypes = entry['name'], entry['etypes']
    return (name, etypes)
580 cls.host = samba.tests.env_get_var_value('SERVER')
581 cls.dc_host = samba.tests.env_get_var_value('DC_SERVER')
583 # A dictionary containing credentials that have already been
587 kdc_fast_support = samba.tests.env_get_var_value('FAST_SUPPORT',
589 if kdc_fast_support is None:
590 kdc_fast_support = '0'
591 cls.kdc_fast_support = bool(int(kdc_fast_support))
593 tkt_sig_support = samba.tests.env_get_var_value('TKT_SIG_SUPPORT',
595 if tkt_sig_support is None:
596 tkt_sig_support = '0'
597 cls.tkt_sig_support = bool(int(tkt_sig_support))
599 expect_pac = samba.tests.env_get_var_value('EXPECT_PAC',
601 if expect_pac is None:
603 cls.expect_pac = bool(int(expect_pac))
605 expect_extra_pac_buffers = samba.tests.env_get_var_value(
606 'EXPECT_EXTRA_PAC_BUFFERS',
608 if expect_extra_pac_buffers is None:
609 expect_extra_pac_buffers = '1'
610 cls.expect_extra_pac_buffers = bool(int(expect_extra_pac_buffers))
614 self.do_asn1_print = False
615 self.do_hexdump = False
617 strict_checking = samba.tests.env_get_var_value('STRICT_CHECKING',
619 if strict_checking is None:
620 strict_checking = '1'
621 self.strict_checking = bool(int(strict_checking))
625 self.unspecified_kvno = object()
628 self._disconnect("tearDown")
631 def _disconnect(self, reason):
637 sys.stderr.write("disconnect[%s]\n" % reason)
639 def _connect_tcp(self, host):
642 self.a = socket.getaddrinfo(host, tcp_port, socket.AF_UNSPEC,
643 socket.SOCK_STREAM, socket.SOL_TCP,
645 self.s = socket.socket(self.a[0][0], self.a[0][1], self.a[0][2])
646 self.s.settimeout(10)
647 self.s.connect(self.a[0][4])
655 def connect(self, host):
656 self.assertNotConnected()
657 self._connect_tcp(host)
659 sys.stderr.write("connected[%s]\n" % host)
661 def env_get_var(self, varname, prefix,
662 fallback_default=True,
663 allow_missing=False):
665 if prefix is not None:
666 allow_missing_prefix = allow_missing or fallback_default
667 val = samba.tests.env_get_var_value(
668 '%s_%s' % (prefix, varname),
669 allow_missing=allow_missing_prefix)
671 fallback_default = True
672 if val is None and fallback_default:
673 val = samba.tests.env_get_var_value(varname,
674 allow_missing=allow_missing)
677 def _get_krb5_creds_from_env(self, prefix,
678 default_username=None,
679 allow_missing_password=False,
680 allow_missing_keys=True,
681 require_strongest_key=False):
682 c = KerberosCredentials()
685 domain = self.env_get_var('DOMAIN', prefix)
686 realm = self.env_get_var('REALM', prefix)
687 allow_missing_username = default_username is not None
688 username = self.env_get_var('USERNAME', prefix,
689 fallback_default=False,
690 allow_missing=allow_missing_username)
692 username = default_username
693 password = self.env_get_var('PASSWORD', prefix,
694 fallback_default=False,
695 allow_missing=allow_missing_password)
698 c.set_username(username)
699 if password is not None:
700 c.set_password(password)
701 as_supported_enctypes = self.env_get_var('AS_SUPPORTED_ENCTYPES',
702 prefix, allow_missing=True)
703 if as_supported_enctypes is not None:
704 c.set_as_supported_enctypes(as_supported_enctypes)
705 tgs_supported_enctypes = self.env_get_var('TGS_SUPPORTED_ENCTYPES',
706 prefix, allow_missing=True)
707 if tgs_supported_enctypes is not None:
708 c.set_tgs_supported_enctypes(tgs_supported_enctypes)
709 ap_supported_enctypes = self.env_get_var('AP_SUPPORTED_ENCTYPES',
710 prefix, allow_missing=True)
711 if ap_supported_enctypes is not None:
712 c.set_ap_supported_enctypes(ap_supported_enctypes)
714 if require_strongest_key:
715 kvno_allow_missing = False
717 aes256_allow_missing = False
719 aes256_allow_missing = True
721 kvno_allow_missing = allow_missing_keys
722 aes256_allow_missing = allow_missing_keys
723 kvno = self.env_get_var('KVNO', prefix,
724 fallback_default=False,
725 allow_missing=kvno_allow_missing)
727 c.set_kvno(int(kvno))
728 aes256_key = self.env_get_var('AES256_KEY_HEX', prefix,
729 fallback_default=False,
730 allow_missing=aes256_allow_missing)
731 if aes256_key is not None:
732 c.set_forced_key(kcrypto.Enctype.AES256, aes256_key)
733 aes128_key = self.env_get_var('AES128_KEY_HEX', prefix,
734 fallback_default=False,
736 if aes128_key is not None:
737 c.set_forced_key(kcrypto.Enctype.AES128, aes128_key)
738 rc4_key = self.env_get_var('RC4_KEY_HEX', prefix,
739 fallback_default=False, allow_missing=True)
740 if rc4_key is not None:
741 c.set_forced_key(kcrypto.Enctype.RC4, rc4_key)
743 if not allow_missing_keys:
744 self.assertTrue(c.forced_keys,
745 'Please supply %s encryption keys '
746 'in environment' % prefix)
750 def _get_krb5_creds(self,
752 default_username=None,
753 allow_missing_password=False,
754 allow_missing_keys=True,
755 require_strongest_key=False,
756 fallback_creds_fn=None):
757 if prefix in self.creds_dict:
758 return self.creds_dict[prefix]
760 # We don't have the credentials already
764 # Try to obtain them from the environment
765 creds = self._get_krb5_creds_from_env(
767 default_username=default_username,
768 allow_missing_password=allow_missing_password,
769 allow_missing_keys=allow_missing_keys,
770 require_strongest_key=require_strongest_key)
771 except Exception as err:
772 # An error occurred, so save it for later
775 self.assertIsNotNone(creds)
776 # Save the obtained credentials
777 self.creds_dict[prefix] = creds
780 if fallback_creds_fn is not None:
782 # Try to use the fallback method
783 creds = fallback_creds_fn()
784 except Exception as err:
785 print("ERROR FROM ENV: %r" % (env_err))
786 print("FALLBACK-FN: %s" % (fallback_creds_fn))
787 print("FALLBACK-ERROR: %r" % (err))
789 self.assertIsNotNone(creds)
790 # Save the obtained credentials
791 self.creds_dict[prefix] = creds
794 # Both methods failed, so raise the exception from the
798 def get_user_creds(self,
799 allow_missing_password=False,
800 allow_missing_keys=True):
801 c = self._get_krb5_creds(prefix=None,
802 allow_missing_password=allow_missing_password,
803 allow_missing_keys=allow_missing_keys)
806 def get_service_creds(self,
807 allow_missing_password=False,
808 allow_missing_keys=True):
809 c = self._get_krb5_creds(prefix='SERVICE',
810 allow_missing_password=allow_missing_password,
811 allow_missing_keys=allow_missing_keys)
814 def get_client_creds(self,
815 allow_missing_password=False,
816 allow_missing_keys=True):
817 c = self._get_krb5_creds(prefix='CLIENT',
818 allow_missing_password=allow_missing_password,
819 allow_missing_keys=allow_missing_keys)
822 def get_server_creds(self,
823 allow_missing_password=False,
824 allow_missing_keys=True):
825 c = self._get_krb5_creds(prefix='SERVER',
826 allow_missing_password=allow_missing_password,
827 allow_missing_keys=allow_missing_keys)
830 def get_admin_creds(self,
831 allow_missing_password=False,
832 allow_missing_keys=True):
833 c = self._get_krb5_creds(prefix='ADMIN',
834 allow_missing_password=allow_missing_password,
835 allow_missing_keys=allow_missing_keys)
836 c.set_gensec_features(c.get_gensec_features() | FEATURE_SEAL)
837 c.set_workstation('')
840 def get_rodc_krbtgt_creds(self,
842 require_strongest_key=False):
843 if require_strongest_key:
844 self.assertTrue(require_keys)
845 c = self._get_krb5_creds(prefix='RODC_KRBTGT',
846 allow_missing_password=True,
847 allow_missing_keys=not require_keys,
848 require_strongest_key=require_strongest_key)
851 def get_krbtgt_creds(self,
853 require_strongest_key=False):
854 if require_strongest_key:
855 self.assertTrue(require_keys)
856 c = self._get_krb5_creds(prefix='KRBTGT',
857 default_username='krbtgt',
858 allow_missing_password=True,
859 allow_missing_keys=not require_keys,
860 require_strongest_key=require_strongest_key)
863 def get_anon_creds(self):
868 def asn1_dump(self, name, obj, asn1_print=None):
869 if asn1_print is None:
870 asn1_print = self.do_asn1_print
873 sys.stderr.write("%s:\n%s" % (name, obj))
875 sys.stderr.write("%s" % (obj))
877 def hex_dump(self, name, blob, hexdump=None):
879 hexdump = self.do_hexdump
882 "%s: %d\n%s" % (name, len(blob), self.hexdump(blob)))
891 if asn1Spec is not None:
892 class_name = type(asn1Spec).__name__.split(':')[0]
894 class_name = "<None-asn1Spec>"
895 self.hex_dump(class_name, blob, hexdump=hexdump)
896 obj, _ = pyasn1_der_decode(blob, asn1Spec=asn1Spec)
897 self.asn1_dump(None, obj, asn1_print=asn1_print)
899 obj = pyasn1_native_encode(obj)
910 obj = pyasn1_native_decode(obj, asn1Spec=asn1Spec)
911 class_name = type(obj).__name__.split(':')[0]
912 if class_name is not None:
913 self.asn1_dump(None, obj, asn1_print=asn1_print)
914 blob = pyasn1_der_encode(obj)
915 if class_name is not None:
916 self.hex_dump(class_name, blob, hexdump=hexdump)
919 def send_pdu(self, req, asn1_print=None, hexdump=None):
921 k5_pdu = self.der_encode(
922 req, native_decode=False, asn1_print=asn1_print, hexdump=False)
923 header = struct.pack('>I', len(k5_pdu))
926 self.hex_dump("send_pdu", header, hexdump=hexdump)
927 self.hex_dump("send_pdu", k5_pdu, hexdump=hexdump)
929 sent = self.s.send(req_pdu, 0)
930 if sent == len(req_pdu):
932 req_pdu = req_pdu[sent:]
933 except socket.error as e:
934 self._disconnect("send_pdu: %s" % e)
937 self._disconnect("send_pdu: %s" % e)
940 def recv_raw(self, num_recv=0xffff, hexdump=None, timeout=None):
943 if timeout is not None:
944 self.s.settimeout(timeout)
945 rep_pdu = self.s.recv(num_recv, 0)
946 self.s.settimeout(10)
947 if len(rep_pdu) == 0:
948 self._disconnect("recv_raw: EOF")
950 self.hex_dump("recv_raw", rep_pdu, hexdump=hexdump)
951 except socket.timeout:
952 self.s.settimeout(10)
953 sys.stderr.write("recv_raw: TIMEOUT\n")
954 except socket.error as e:
955 self._disconnect("recv_raw: %s" % e)
958 self._disconnect("recv_raw: %s" % e)
962 def recv_pdu_raw(self, asn1_print=None, hexdump=None, timeout=None):
965 raw_pdu = self.recv_raw(
966 num_recv=4, hexdump=hexdump, timeout=timeout)
969 header = struct.unpack(">I", raw_pdu[0:4])
976 raw_pdu = self.recv_raw(
977 num_recv=missing, hexdump=hexdump, timeout=timeout)
978 self.assertGreaterEqual(len(raw_pdu), 1)
980 missing = k5_len - len(rep_pdu)
981 k5_raw = self.der_decode(
987 pvno = k5_raw['field-0']
988 self.assertEqual(pvno, 5)
989 msg_type = k5_raw['field-1']
990 self.assertIn(msg_type, [KRB_AS_REP, KRB_TGS_REP, KRB_ERROR])
991 if msg_type == KRB_AS_REP:
992 asn1Spec = krb5_asn1.AS_REP()
993 elif msg_type == KRB_TGS_REP:
994 asn1Spec = krb5_asn1.TGS_REP()
995 elif msg_type == KRB_ERROR:
996 asn1Spec = krb5_asn1.KRB_ERROR()
997 rep = self.der_decode(rep_pdu, asn1Spec=asn1Spec,
998 asn1_print=asn1_print, hexdump=False)
999 return (rep, rep_pdu)
1001 def recv_pdu(self, asn1_print=None, hexdump=None, timeout=None):
1002 (rep, rep_pdu) = self.recv_pdu_raw(asn1_print=asn1_print,
def assertIsConnected(self):
    """Fail with "Not connected" when self.s is None."""
    sock = self.s
    self.assertIsNotNone(sock, msg="Not connected")
def assertNotConnected(self):
    """Fail with "Is connected" when self.s is not None."""
    sock = self.s
    self.assertIsNone(sock, msg="Is connected")
1013 def send_recv_transaction(
1020 host = self.host if to_rodc else self.dc_host
1023 self.send_pdu(req, asn1_print=asn1_print, hexdump=hexdump)
1024 rep = self.recv_pdu(
1025 asn1_print=asn1_print, hexdump=hexdump, timeout=timeout)
1027 self._disconnect("transaction failed")
1029 self._disconnect("transaction done")
def assertNoValue(self, value):
    """Assert that the isNoValue attribute of *value* is true."""
    has_no_value = value.isNoValue
    self.assertTrue(has_no_value)
def assertHasValue(self, value):
    """Assert that *value* is not None."""
    candidate = value
    self.assertIsNotNone(candidate)
def getElementValue(self, obj, elem):
    """Look up *elem* in *obj* with obj.get() and return what get() yields."""
    found = obj.get(elem)
    return found
def assertElementMissing(self, obj, elem):
    """Assert that self.getElementValue(obj, elem) yields None."""
    found = self.getElementValue(obj, elem)
    self.assertIsNone(found)
1045 def assertElementPresent(self, obj, elem, expect_empty=False):
1046 v = self.getElementValue(obj, elem)
1047 self.assertIsNotNone(v)
1048 if self.strict_checking:
1049 if isinstance(v, collections.abc.Container):
1051 self.assertEqual(0, len(v))
1053 self.assertNotEqual(0, len(v))
def assertElementEqual(self, obj, elem, value):
    """Assert that element *elem* of *obj* is present and equals *value*.

    The element is fetched with self.getElementValue(); it must not be
    None and must compare equal to *value*.
    """
    actual = self.getElementValue(obj, elem)
    self.assertIsNotNone(actual)
    self.assertEqual(actual, value)
def assertElementEqualUTF8(self, obj, elem, value):
    """Assert that element *elem* of *obj* equals *value* encoded as UTF-8.

    The element is fetched with self.getElementValue(); it must not be
    None and must compare equal to bytes(value, 'utf8').
    """
    actual = self.getElementValue(obj, elem)
    self.assertIsNotNone(actual)
    expected = bytes(value, 'utf8')
    self.assertEqual(actual, expected)
1065 def assertPrincipalEqual(self, princ1, princ2):
1066 self.assertEqual(princ1['name-type'], princ2['name-type'])
1068 len(princ1['name-string']),
1069 len(princ2['name-string']),
1070 msg="princ1=%s != princ2=%s" % (princ1, princ2))
1071 for idx in range(len(princ1['name-string'])):
1073 princ1['name-string'][idx],
1074 princ2['name-string'][idx],
1075 msg="princ1=%s != princ2=%s" % (princ1, princ2))
def assertElementEqualPrincipal(self, obj, elem, value):
    """Assert that element *elem* of *obj* is a principal matching *value*.

    The element is fetched with self.getElementValue() and must not be
    None.  It is then decoded against krb5_asn1.PrincipalName() with the
    pyasn1 native decoder and compared to *value* through
    self.assertPrincipalEqual().
    """
    raw_principal = self.getElementValue(obj, elem)
    self.assertIsNotNone(raw_principal)
    principal_spec = krb5_asn1.PrincipalName()
    decoded = pyasn1_native_decode(raw_principal, asn1Spec=principal_spec)
    self.assertPrincipalEqual(decoded, value)
1083 def assertElementKVNO(self, obj, elem, value):
1084 v = self.getElementValue(obj, elem)
1085 if value == "autodetect":
1087 if value is not None:
1088 self.assertIsNotNone(v)
1089 # The value on the wire should never be 0
1090 self.assertNotEqual(v, 0)
1091 # unspecified_kvno means we don't know the kvno,
1092 # but want to enforce its presence
1093 if value is not self.unspecified_kvno:
1095 self.assertNotEqual(value, 0)
1096 self.assertEqual(v, value)
1098 self.assertIsNone(v)
1100 def assertElementFlags(self, obj, elem, expected, unexpected):
1101 v = self.getElementValue(obj, elem)
1102 self.assertIsNotNone(v)
1103 if expected is not None:
1104 self.assertIsInstance(expected, krb5_asn1.TicketFlags)
1105 for i, flag in enumerate(expected):
1107 self.assertEqual('1', v[i],
1108 f"'{expected.namedValues[i]}' "
1110 if unexpected is not None:
1111 self.assertIsInstance(unexpected, krb5_asn1.TicketFlags)
1112 for i, flag in enumerate(unexpected):
1114 self.assertEqual('0', v[i],
1115 f"'{unexpected.namedValues[i]}' "
1116 f"unexpected in {v}")
1118 def assertSequenceElementsEqual(self, expected, got, *,
1119 require_strict=None,
1120 require_ordered=True):
1121 if self.strict_checking and require_ordered:
1122 self.assertEqual(expected, got)
1124 fail_msg = f'expected: {expected} got: {got}'
1126 if not self.strict_checking and require_strict is not None:
1127 fail_msg += f' (ignoring: {require_strict})'
1128 expected = (x for x in expected if x not in require_strict)
1129 got = (x for x in got if x not in require_strict)
1131 self.assertCountEqual(expected, got, fail_msg)
1133 def get_KerberosTimeWithUsec(self, epoch=None, offset=None):
1136 if offset is not None:
1137 epoch = epoch + int(offset)
1138 dt = datetime.datetime.fromtimestamp(epoch, tz=datetime.timezone.utc)
1139 return (dt.strftime("%Y%m%d%H%M%SZ"), dt.microsecond)
1141 def get_KerberosTime(self, epoch=None, offset=None):
1142 (s, _) = self.get_KerberosTimeWithUsec(epoch=epoch, offset=offset)
1145 def get_EpochFromKerberosTime(self, kerberos_time):
1146 if isinstance(kerberos_time, bytes):
1147 kerberos_time = kerberos_time.decode()
1149 epoch = datetime.datetime.strptime(kerberos_time,
1151 epoch = epoch.replace(tzinfo=datetime.timezone.utc)
1152 epoch = int(epoch.timestamp())
1156 def get_Nonce(self):
1157 nonce_min = 0x7f000000
1158 nonce_max = 0x7fffffff
1159 v = random.randint(nonce_min, nonce_max)
1162 def get_pa_dict(self, pa_data):
1165 if pa_data is not None:
1167 pa_type = pa['padata-type']
1168 if pa_type in pa_dict:
1169 raise RuntimeError(f'Duplicate type {pa_type}')
1170 pa_dict[pa_type] = pa['padata-value']
def SessionKey_create(self, etype, contents, kvno=None):
    """Wrap raw key material in a RodcPacEncryptionKey.

    A kcrypto.Key is built from *etype* and *contents* and handed,
    together with *kvno*, to RodcPacEncryptionKey.
    """
    return RodcPacEncryptionKey(kcrypto.Key(etype, contents), kvno)
1178 def PasswordKey_create(self, etype=None, pwd=None, salt=None, kvno=None,
1180 self.assertIsNotNone(pwd)
1181 self.assertIsNotNone(salt)
1182 key = kcrypto.string_to_key(etype, pwd, salt, params=params)
1183 return RodcPacEncryptionKey(key, kvno)
def PasswordKey_from_etype_info2(self, creds, etype_info2, kvno=None):
    """Build a key for *creds* as described by an etype-info2 entry.

    *etype_info2* must contain 'etype'; 'salt' and 's2kparams' are read
    with get() and may be absent.  For kcrypto.Enctype.RC4 the key is
    created from creds.get_nt_hash() via self.SessionKey_create();
    for every other etype it is created from creds.get_password() via
    self.PasswordKey_create().
    """
    enctype = etype_info2['etype']
    key_salt = etype_info2.get('salt')

    # RC4 keys come straight from the NT hash; salt and s2kparams are not
    # passed on in that case.
    if e_is_rc4 := (enctype == kcrypto.Enctype.RC4):
        nt_hash = creds.get_nt_hash()
        return self.SessionKey_create(etype=enctype,
                                      contents=nt_hash,
                                      kvno=kvno)

    s2k_params = etype_info2.get('s2kparams')
    pwd = creds.get_password()
    return self.PasswordKey_create(etype=enctype,
                                   pwd=pwd,
                                   salt=key_salt,
                                   kvno=kvno,
                                   params=s2k_params)
1200 def TicketDecryptionKey_from_creds(self, creds, etype=None):
1203 etypes = creds.get_tgs_krb5_etypes()
1207 etype = kcrypto.Enctype.RC4
1209 forced_key = creds.get_forced_key(etype)
1210 if forced_key is not None:
1213 kvno = creds.get_kvno()
1215 fail_msg = ("%s has no fixed key for etype[%s] kvno[%s] "
1216 "nor a password specified, " % (
1217 creds.get_username(), etype, kvno))
1219 if etype == kcrypto.Enctype.RC4:
1220 nthash = creds.get_nt_hash()
1221 self.assertIsNotNone(nthash, msg=fail_msg)
1222 return self.SessionKey_create(etype=etype,
1226 password = creds.get_password()
1227 self.assertIsNotNone(password, msg=fail_msg)
1228 salt = creds.get_salt()
1229 return self.PasswordKey_create(etype=etype,
def RandomKey(self, etype):
    """Return a key of type *etype* filled with random bytes.

    The number of bytes is the keysize attribute of the profile returned
    by kcrypto._get_enctype_profile(etype); the bytes come from
    samba.generate_random_bytes() and are wrapped by
    self.SessionKey_create().
    """
    profile = kcrypto._get_enctype_profile(etype)
    random_contents = samba.generate_random_bytes(profile.keysize)
    return self.SessionKey_create(etype=etype, contents=random_contents)
def EncryptionKey_import(self, EncryptionKey_obj):
    """Create a key from a mapping holding 'keytype' and 'keyvalue'.

    Both entries are passed positionally to self.SessionKey_create(),
    whose result is returned.
    """
    make_key = self.SessionKey_create
    keytype = EncryptionKey_obj['keytype']
    keyvalue = EncryptionKey_obj['keyvalue']
    return make_key(keytype, keyvalue)
1243 def EncryptedData_create(self, key, usage, plaintext):
1244 # EncryptedData ::= SEQUENCE {
1245 # etype [0] Int32 -- EncryptionType --,
1246 # kvno [1] Int32 OPTIONAL,
1247 # cipher [2] OCTET STRING -- ciphertext
1249 ciphertext = key.encrypt(usage, plaintext)
1250 EncryptedData_obj = {
1252 'cipher': ciphertext
1254 if key.kvno is not None:
1255 EncryptedData_obj['kvno'] = key.kvno
1256 return EncryptedData_obj
1258 def Checksum_create(self, key, usage, plaintext, ctype=None):
1259 # Checksum ::= SEQUENCE {
1260 # cksumtype [0] Int32,
1261 # checksum [1] OCTET STRING
1265 checksum = key.make_checksum(usage, plaintext, ctype=ctype)
1268 'checksum': checksum,
1273 def PrincipalName_create(cls, name_type, names):
1274 # PrincipalName ::= SEQUENCE {
1275 # name-type [0] Int32,
1276 # name-string [1] SEQUENCE OF KerberosString
1278 PrincipalName_obj = {
1279 'name-type': name_type,
1280 'name-string': names,
1282 return PrincipalName_obj
1284 def AuthorizationData_create(self, ad_type, ad_data):
1285 # AuthorizationData ::= SEQUENCE {
1286 # ad-type [0] Int32,
1287 # ad-data [1] OCTET STRING
1293 return AUTH_DATA_obj
1295 def PA_DATA_create(self, padata_type, padata_value):
1296 # PA-DATA ::= SEQUENCE {
1297 # -- NOTE: first tag is [1], not [0]
1298 # padata-type [1] Int32,
1299 # padata-value [2] OCTET STRING -- might be encoded AP-REQ
1302 'padata-type': padata_type,
1303 'padata-value': padata_value,
1307 def PA_ENC_TS_ENC_create(self, ts, usec):
1308 # PA-ENC-TS-ENC ::= SEQUENCE {
1309 # patimestamp[0] KerberosTime, -- client's time
1310 # pausec[1] krb5int32 OPTIONAL
1312 PA_ENC_TS_ENC_obj = {
1316 return PA_ENC_TS_ENC_obj
def PA_PAC_OPTIONS_create(self, options):
    """Return a PA-PAC-OPTIONS dict wrapping the given option flags.

    NOTE(review): the single dictionary entry was missing from the listing
    and has been rebuilt from the ASN.1 comment and the parameter name.
    """
    # PA-PAC-OPTIONS  ::= SEQUENCE {
    #         options [0] PACOptionFlags
    # }
    PA_PAC_OPTIONS_obj = {
        'options': options
    }
    return PA_PAC_OPTIONS_obj
def KRB_FAST_ARMOR_create(self, armor_type, armor_value):
    """Return a KrbFastArmor dict from the armor type and armor value."""
    # KrbFastArmor ::= SEQUENCE {
    #     armor-type   [0] Int32,
    #     armor-value  [1] OCTET STRING,
    #     ...
    # }
    KRB_FAST_ARMOR_obj = {
        'armor-type': armor_type,
        'armor-value': armor_value
    }
    return KRB_FAST_ARMOR_obj
def KRB_FAST_REQ_create(self, fast_options, padata, req_body):
    """Return a KrbFastReq dict from FAST options, padata and request body.

    NOTE(review): the 'padata' entry was missing from the listing (one
    line is skipped between 'fast-options' and 'req-body'); it has been
    restored from the ASN.1 comment and the otherwise unused parameter.
    """
    # KrbFastReq ::= SEQUENCE {
    #     fast-options [0] FastOptions,
    #     padata       [1] SEQUENCE OF PA-DATA,
    #     req-body     [2] KDC-REQ-BODY,
    #     ...
    # }
    KRB_FAST_REQ_obj = {
        'fast-options': fast_options,
        'padata': padata,
        'req-body': req_body
    }
    return KRB_FAST_REQ_obj
def KRB_FAST_ARMORED_REQ_create(self, armor, req_checksum, enc_fast_req):
    """Return a KrbFastArmoredReq dict.

    'armor' is OPTIONAL and is only included when armor is not None.
    """
    # KrbFastArmoredReq ::= SEQUENCE {
    #     armor        [0] KrbFastArmor OPTIONAL,
    #     req-checksum [1] Checksum,
    #     enc-fast-req [2] EncryptedData -- KrbFastReq --
    # }
    KRB_FAST_ARMORED_REQ_obj = {
        'req-checksum': req_checksum,
        'enc-fast-req': enc_fast_req
    }
    if armor is not None:
        KRB_FAST_ARMORED_REQ_obj['armor'] = armor
    return KRB_FAST_ARMORED_REQ_obj
def PA_FX_FAST_REQUEST_create(self, armored_data):
    """Return a PA-FX-FAST-REQUEST dict selecting the armored-data choice."""
    # PA-FX-FAST-REQUEST ::= CHOICE {
    #     armored-data [0] KrbFastArmoredReq,
    #     ...
    # }
    PA_FX_FAST_REQUEST_obj = {
        'armored-data': armored_data
    }
    return PA_FX_FAST_REQUEST_obj
def KERB_PA_PAC_REQUEST_create(self, include_pac, pa_data_create=True):
    """Build a KERB-PA-PAC-REQUEST.

    With pa_data_create false the plain dict is returned. Otherwise the
    dict is DER-encoded with the KERB_PA_PAC_REQUEST spec and wrapped by
    PA_DATA_create() with type PADATA_PAC_REQUEST.

    NOTE(review): the final return statement was missing from the
    listing; it has been restored because the result is built and the
    caller appends this method's return value to its padata list.
    """
    # KERB-PA-PAC-REQUEST ::= SEQUENCE {
    #         include-pac[0] BOOLEAN --If TRUE, and no pac present,
    #                                --    include PAC.
    #                                --If FALSE, and PAC present,
    #                                --    remove PAC.
    # }
    KERB_PA_PAC_REQUEST_obj = {
        'include-pac': include_pac,
    }
    if not pa_data_create:
        return KERB_PA_PAC_REQUEST_obj
    pa_pac = self.der_encode(KERB_PA_PAC_REQUEST_obj,
                             asn1Spec=krb5_asn1.KERB_PA_PAC_REQUEST())
    pa_data = self.PA_DATA_create(PADATA_PAC_REQUEST, pa_pac)
    return pa_data
def get_pa_pac_options(self, options):
    """Return a PA-DATA element of type PADATA_PAC_OPTIONS for `options`.

    The options are wrapped by PA_PAC_OPTIONS_create(), DER-encoded with
    the PA_PAC_OPTIONS spec and finally wrapped by PA_DATA_create().

    NOTE(review): the return statement was missing from the listing; it
    has been restored because the caller appends this method's return
    value to its padata list.
    """
    pac_options = self.PA_PAC_OPTIONS_create(options)
    pac_options = self.der_encode(pac_options,
                                  asn1Spec=krb5_asn1.PA_PAC_OPTIONS())
    pac_options = self.PA_DATA_create(PADATA_PAC_OPTIONS, pac_options)

    return pac_options
# Assemble the KDC-REQ-BODY dictionary described by the ASN.1 comment below.
#
# If EncAuthorizationData is not None it is DER-encoded with the
# AuthorizationData spec and handed to EncryptedData_create() together with
# EncAuthorizationData_key and EncAuthorizationData_usage. The members
# cname, sname, from, rtime, addresses, enc-authorization-data and
# additional-tickets are added only when their value is not None.
#
# NOTE(review): the embedded line numbers show gaps - most of the parameter
# list, the tail of the EncryptedData_create() call and the remaining
# dictionary members are not part of this listing.
1402 def KDC_REQ_BODY_create(self,
1414 EncAuthorizationData,
1415 EncAuthorizationData_key,
1416 EncAuthorizationData_usage,
1419 # KDC-REQ-BODY ::= SEQUENCE {
1420 # kdc-options [0] KDCOptions,
1421 # cname [1] PrincipalName OPTIONAL
1422 # -- Used only in AS-REQ --,
1425 # -- Also client's in AS-REQ --,
1426 # sname [3] PrincipalName OPTIONAL,
1427 # from [4] KerberosTime OPTIONAL,
1428 # till [5] KerberosTime,
1429 # rtime [6] KerberosTime OPTIONAL,
1431 # etype [8] SEQUENCE OF Int32
1433 # -- in preference order --,
1434 # addresses [9] HostAddresses OPTIONAL,
1435 # enc-authorization-data [10] EncryptedData OPTIONAL
1436 # -- AuthorizationData --,
1437 # additional-tickets [11] SEQUENCE OF Ticket OPTIONAL
1438 # -- NOTE: not empty
# Encrypt the authorization data only when the caller supplied some.
1440 if EncAuthorizationData is not None:
1441 enc_ad_plain = self.der_encode(
1442 EncAuthorizationData,
1443 asn1Spec=krb5_asn1.AuthorizationData(),
1444 asn1_print=asn1_print,
1446 enc_ad = self.EncryptedData_create(EncAuthorizationData_key,
1447 EncAuthorizationData_usage,
1451 KDC_REQ_BODY_obj = {
1452 'kdc-options': kdc_options,
# OPTIONAL members: each is emitted only when a value is available.
1458 if cname is not None:
1459 KDC_REQ_BODY_obj['cname'] = cname
1460 if sname is not None:
1461 KDC_REQ_BODY_obj['sname'] = sname
1462 if from_time is not None:
1463 KDC_REQ_BODY_obj['from'] = from_time
1464 if renew_time is not None:
1465 KDC_REQ_BODY_obj['rtime'] = renew_time
1466 if addresses is not None:
1467 KDC_REQ_BODY_obj['addresses'] = addresses
1468 if enc_ad is not None:
1469 KDC_REQ_BODY_obj['enc-authorization-data'] = enc_ad
1470 if additional_tickets is not None:
1471 KDC_REQ_BODY_obj['additional-tickets'] = additional_tickets
1472 return KDC_REQ_BODY_obj
# Build the outer KDC-REQ dictionary from msg_type and req_body, adding
# 'padata' only when padata is not None. When asn1Spec is supplied the
# dictionary is additionally converted with pyasn1_native_decode(). Returns
# the 2-tuple (KDC_REQ_obj, KDC_REQ_decoded).
#
# NOTE(review): parts of the signature and of the dictionary literal are
# missing from this listing; the `KDC_REQ_decoded = None` assignment
# presumably belongs to an `else:` branch that was dropped as well.
1474 def KDC_REQ_create(self,
1481 # KDC-REQ ::= SEQUENCE {
1482 # -- NOTE: first tag is [1], not [0]
1483 # pvno [1] INTEGER (5) ,
1484 # msg-type [2] INTEGER (10 -- AS -- | 12 -- TGS --),
1485 # padata [3] SEQUENCE OF PA-DATA OPTIONAL
1486 # -- NOTE: not empty --,
1487 # req-body [4] KDC-REQ-BODY
1492 'msg-type': msg_type,
1493 'req-body': req_body,
1495 if padata is not None:
1496 KDC_REQ_obj['padata'] = padata
1497 if asn1Spec is not None:
1498 KDC_REQ_decoded = pyasn1_native_decode(
1499 KDC_REQ_obj, asn1Spec=asn1Spec)
1501 KDC_REQ_decoded = None
1502 return KDC_REQ_obj, KDC_REQ_decoded
# Build an AS-REQ: the request body comes from KDC_REQ_BODY_create() (called
# without any encrypted authorization data) and is wrapped by
# KDC_REQ_create() with msg_type KRB_AS_REQ and the AS_REQ ASN.1 spec.
#
# NOTE(review): this listing skips many source lines (several parameters,
# most keyword arguments of the two calls and the return statements that
# follow `if native_decoded_only:`), so the exact return value cannot be
# read off here - consult the complete file.
1504 def AS_REQ_create(self,
1506 kdc_options, # required
1510 from_time, # optional
1511 till_time, # required
1512 renew_time, # optional
1515 addresses, # optional
1517 native_decoded_only=True,
1520 # KDC-REQ ::= SEQUENCE {
1521 # -- NOTE: first tag is [1], not [0]
1522 # pvno [1] INTEGER (5) ,
1523 # msg-type [2] INTEGER (10 -- AS -- | 12 -- TGS --),
1524 # padata [3] SEQUENCE OF PA-DATA OPTIONAL
1525 # -- NOTE: not empty --,
1526 # req-body [4] KDC-REQ-BODY
1529 # KDC-REQ-BODY ::= SEQUENCE {
1530 # kdc-options [0] KDCOptions,
1531 # cname [1] PrincipalName OPTIONAL
1532 # -- Used only in AS-REQ --,
1535 # -- Also client's in AS-REQ --,
1536 # sname [3] PrincipalName OPTIONAL,
1537 # from [4] KerberosTime OPTIONAL,
1538 # till [5] KerberosTime,
1539 # rtime [6] KerberosTime OPTIONAL,
1541 # etype [8] SEQUENCE OF Int32
1543 # -- in preference order --,
1544 # addresses [9] HostAddresses OPTIONAL,
1545 # enc-authorization-data [10] EncryptedData OPTIONAL
1546 # -- AuthorizationData --,
1547 # additional-tickets [11] SEQUENCE OF Ticket OPTIONAL
1548 # -- NOTE: not empty
1550 KDC_REQ_BODY_obj = self.KDC_REQ_BODY_create(
# An AS-REQ built here never carries encrypted authorization data.
1562 EncAuthorizationData=None,
1563 EncAuthorizationData_key=None,
1564 EncAuthorizationData_usage=None,
1565 asn1_print=asn1_print,
1567 obj, decoded = self.KDC_REQ_create(
1568 msg_type=KRB_AS_REQ,
1570 req_body=KDC_REQ_BODY_obj,
1571 asn1Spec=krb5_asn1.AS_REQ(),
1572 asn1_print=asn1_print,
1574 if native_decoded_only:
# Build an AP-REQ dictionary; the visible members are 'msg-type' (always
# KRB_AP_REQ), 'ap-options' and 'authenticator'.
# NOTE(review): the listing skips the start of the dictionary literal, some
# of its members (the ASN.1 comment also lists pvno and ticket) and the
# return statement - consult the complete file.
1578 def AP_REQ_create(self, ap_options, ticket, authenticator):
1579 # AP-REQ ::= [APPLICATION 14] SEQUENCE {
1580 # pvno [0] INTEGER (5),
1581 # msg-type [1] INTEGER (14),
1582 # ap-options [2] APOptions,
1583 # ticket [3] Ticket,
1584 # authenticator [4] EncryptedData -- Authenticator
1588 'msg-type': KRB_AP_REQ,
1589 'ap-options': ap_options,
1591 'authenticator': authenticator,
# Build the (unencrypted) Authenticator dictionary. 'authenticator-vno' is
# fixed to 5; cksum, subkey, seq-number and authorization-data are OPTIONAL
# and only added when the corresponding argument is not None.
# NOTE(review): the listing skips the dictionary members between
# 'authenticator-vno' and the closing brace (the ASN.1 comment lists crealm,
# cname, cusec and ctime there) - consult the complete file.
1595 def Authenticator_create(
1596 self, crealm, cname, cksum, cusec, ctime, subkey, seq_number,
1597 authorization_data):
1598 # -- Unencrypted authenticator
1599 # Authenticator ::= [APPLICATION 2] SEQUENCE {
1600 # authenticator-vno [0] INTEGER (5),
1602 # cname [2] PrincipalName,
1603 # cksum [3] Checksum OPTIONAL,
1604 # cusec [4] Microseconds,
1605 # ctime [5] KerberosTime,
1606 # subkey [6] EncryptionKey OPTIONAL,
1607 # seq-number [7] UInt32 OPTIONAL,
1608 # authorization-data [8] AuthorizationData OPTIONAL
1610 Authenticator_obj = {
1611 'authenticator-vno': 5,
# OPTIONAL members follow.
1617 if cksum is not None:
1618 Authenticator_obj['cksum'] = cksum
1619 if subkey is not None:
1620 Authenticator_obj['subkey'] = subkey
1621 if seq_number is not None:
1622 Authenticator_obj['seq-number'] = seq_number
1623 if authorization_data is not None:
1624 Authenticator_obj['authorization-data'] = authorization_data
1625 return Authenticator_obj
# Build a TGS-REQ. Visible steps:
#  1. pick the key usage for the encrypted authorization data depending on
#     whether an authenticator subkey was supplied,
#  2. build and DER-encode the request body,
#  3. checksum the body with ticket_session_key (KU_TGS_REQ_AUTH_CKSUM),
#  4. build an Authenticator carrying that checksum and a random sequence
#     number, DER-encode it and encrypt it with ticket_session_key
#     (KU_TGS_REQ_AUTH),
#  5. wrap it in an AP-REQ and then in a PA-DATA of type PADATA_KDC_REQ,
#  6. pass everything to KDC_REQ_create() with msg_type KRB_TGS_REQ.
#
# If the caller passes a padata list, the AP-REQ PA-DATA is appended to that
# same list object (the caller's list is modified).
#
# NOTE(review): this listing skips many source lines (parameters, `else:`
# lines, call arguments and the return statements after
# `if native_decoded_only:`) - consult the complete file.
1627 def TGS_REQ_create(self,
1632 kdc_options, # required
1636 from_time, # optional
1637 till_time, # required
1638 renew_time, # optional
1641 addresses, # optional
1642 EncAuthorizationData,
1643 EncAuthorizationData_key,
1646 authenticator_subkey=None,
1647 body_checksum_type=None,
1648 native_decoded_only=True,
1651 # KDC-REQ ::= SEQUENCE {
1652 # -- NOTE: first tag is [1], not [0]
1653 # pvno [1] INTEGER (5) ,
1654 # msg-type [2] INTEGER (10 -- AS -- | 12 -- TGS --),
1655 # padata [3] SEQUENCE OF PA-DATA OPTIONAL
1656 # -- NOTE: not empty --,
1657 # req-body [4] KDC-REQ-BODY
1660 # KDC-REQ-BODY ::= SEQUENCE {
1661 # kdc-options [0] KDCOptions,
1662 # cname [1] PrincipalName OPTIONAL
1663 # -- Used only in AS-REQ --,
1666 # -- Also client's in AS-REQ --,
1667 # sname [3] PrincipalName OPTIONAL,
1668 # from [4] KerberosTime OPTIONAL,
1669 # till [5] KerberosTime,
1670 # rtime [6] KerberosTime OPTIONAL,
1672 # etype [8] SEQUENCE OF Int32
1674 # -- in preference order --,
1675 # addresses [9] HostAddresses OPTIONAL,
1676 # enc-authorization-data [10] EncryptedData OPTIONAL
1677 # -- AuthorizationData --,
1678 # additional-tickets [11] SEQUENCE OF Ticket OPTIONAL
1679 # -- NOTE: not empty
# The key usage differs between subkey and session-key encryption; the
# second assignment presumably sits in an `else:` dropped from the listing.
1682 if authenticator_subkey is not None:
1683 EncAuthorizationData_usage = KU_TGS_REQ_AUTH_DAT_SUBKEY
1685 EncAuthorizationData_usage = KU_TGS_REQ_AUTH_DAT_SESSION
1687 req_body = self.KDC_REQ_BODY_create(
1688 kdc_options=kdc_options,
1692 from_time=from_time,
1693 till_time=till_time,
1694 renew_time=renew_time,
1697 addresses=addresses,
1698 additional_tickets=additional_tickets,
1699 EncAuthorizationData=EncAuthorizationData,
1700 EncAuthorizationData_key=EncAuthorizationData_key,
1701 EncAuthorizationData_usage=EncAuthorizationData_usage)
1702 req_body_blob = self.der_encode(req_body,
1703 asn1Spec=krb5_asn1.KDC_REQ_BODY(),
1704 asn1_print=asn1_print, hexdump=hexdump)
1706 req_body_checksum = self.Checksum_create(ticket_session_key,
1707 KU_TGS_REQ_AUTH_CKSUM,
1709 ctype=body_checksum_type)
1712 if authenticator_subkey is not None:
1713 subkey_obj = authenticator_subkey.export_obj()
1714 seq_number = random.randint(0, 0xfffffffe)
1715 authenticator = self.Authenticator_create(
1718 cksum=req_body_checksum,
1722 seq_number=seq_number,
1723 authorization_data=None)
1724 authenticator = self.der_encode(
1726 asn1Spec=krb5_asn1.Authenticator(),
1727 asn1_print=asn1_print,
1730 authenticator = self.EncryptedData_create(
1731 ticket_session_key, KU_TGS_REQ_AUTH, authenticator)
1733 ap_options = krb5_asn1.APOptions('0')
1734 ap_req = self.AP_REQ_create(ap_options=str(ap_options),
1736 authenticator=authenticator)
1737 ap_req = self.der_encode(ap_req, asn1Spec=krb5_asn1.AP_REQ(),
1738 asn1_print=asn1_print, hexdump=hexdump)
1739 pa_tgs_req = self.PA_DATA_create(PADATA_KDC_REQ, ap_req)
# Appending modifies the list object passed in by the caller.
1740 if padata is not None:
1741 padata.append(pa_tgs_req)
1743 padata = [pa_tgs_req]
1745 obj, decoded = self.KDC_REQ_create(
1746 msg_type=KRB_TGS_REQ,
1749 asn1Spec=krb5_asn1.TGS_REQ(),
1750 asn1_print=asn1_print,
1752 if native_decoded_only:
# Build a PA-DATA element of type PADATA_FOR_USER (PA-S4U2Self).
#
# The checksum input is the name-type as a 4-byte little-endian integer,
# followed by every name-string component, the realm and the literal
# "Kerberos", each encoded with str.encode(). It is checksummed with
# tgt_session_key under key usage KU_NON_KERB_CKSUM_SALT. The resulting
# structure is DER-encoded with the PA_S4U2Self spec.
#
# NOTE(review): the listing skips the remaining Checksum_create() arguments
# and the construction of PA_S4U2Self_obj - consult the complete file.
1756 def PA_S4U2Self_create(self, name, realm, tgt_session_key, ctype=None):
1757 # PA-S4U2Self ::= SEQUENCE {
1758 # name [0] PrincipalName,
1760 # cksum [2] Checksum,
1761 # auth [3] GeneralString
1763 cksum_data = name['name-type'].to_bytes(4, byteorder='little')
1764 for n in name['name-string']:
1765 cksum_data += n.encode()
1766 cksum_data += realm.encode()
1767 cksum_data += "Kerberos".encode()
1768 cksum = self.Checksum_create(tgt_session_key,
1769 KU_NON_KERB_CKSUM_SALT,
1779 pa_s4u2self = self.der_encode(
1780 PA_S4U2Self_obj, asn1Spec=krb5_asn1.PA_S4U2Self())
1781 return self.PA_DATA_create(PADATA_FOR_USER, pa_s4u2self)
# Drive one complete KDC exchange described by kdc_exchange_dict: build the
# request body, apply the inner/outer body overrides, collect the padata
# (PAC request/options, the TGS AP-REQ, FAST and caller-generated padata),
# send the request and hand the reply to check_error_fn or check_rep_fn.
#
# Exactly one of check_error_fn / check_rep_fn is expected to be set in
# kdc_exchange_dict; the assertions after the reply is received enforce
# this. The method stores 'nonce' (when it was absent), 'req_padata',
# 'fast_padata' and 'req_body' back into kdc_exchange_dict.
#
# NOTE(review): this listing skips many source lines (`else:` lines, call
# arguments, initialisation of several locals) - consult the complete file
# before changing control flow here.
1783 def _generic_kdc_exchange(self,
1784 kdc_exchange_dict, # required
1785 cname=None, # optional
1786 realm=None, # required
1787 sname=None, # optional
1788 from_time=None, # optional
1789 till_time=None, # required
1790 renew_time=None, # optional
1791 etypes=None, # required
1792 addresses=None, # optional
1793 additional_tickets=None, # optional
1794 EncAuthorizationData=None, # optional
1795 EncAuthorizationData_key=None, # optional
1796 EncAuthorizationData_usage=None): # optional
1798 check_error_fn = kdc_exchange_dict['check_error_fn']
1799 check_rep_fn = kdc_exchange_dict['check_rep_fn']
1800 generate_fast_fn = kdc_exchange_dict['generate_fast_fn']
1801 generate_fast_armor_fn = kdc_exchange_dict['generate_fast_armor_fn']
1802 generate_fast_padata_fn = kdc_exchange_dict['generate_fast_padata_fn']
1803 generate_padata_fn = kdc_exchange_dict['generate_padata_fn']
1804 callback_dict = kdc_exchange_dict['callback_dict']
1805 req_msg_type = kdc_exchange_dict['req_msg_type']
1806 req_asn1Spec = kdc_exchange_dict['req_asn1Spec']
1807 rep_msg_type = kdc_exchange_dict['rep_msg_type']
1809 expected_error_mode = kdc_exchange_dict['expected_error_mode']
1810 kdc_options = kdc_exchange_dict['kdc_options']
1812 pac_request = kdc_exchange_dict['pac_request']
1813 pac_options = kdc_exchange_dict['pac_options']
1815 # Parameters specific to the inner request body
1816 inner_req = kdc_exchange_dict['inner_req']
1818 # Parameters specific to the outer request body
1819 outer_req = kdc_exchange_dict['outer_req']
# Fall back to a generated end time when the caller did not supply one.
1821 if till_time is None:
1822 till_time = self.get_KerberosTime(offset=36000)
# Reuse a nonce already present in the exchange dict; the generation below
# presumably sits in an `else:` dropped from the listing.
1824 if 'nonce' in kdc_exchange_dict:
1825 nonce = kdc_exchange_dict['nonce']
1827 nonce = self.get_Nonce()
1828 kdc_exchange_dict['nonce'] = nonce
1830 req_body = self.KDC_REQ_BODY_create(
1831 kdc_options=kdc_options,
1835 from_time=from_time,
1836 till_time=till_time,
1837 renew_time=renew_time,
1840 addresses=addresses,
1841 additional_tickets=additional_tickets,
1842 EncAuthorizationData=EncAuthorizationData,
1843 EncAuthorizationData_key=EncAuthorizationData_key,
1844 EncAuthorizationData_usage=EncAuthorizationData_usage)
# The inner body starts as a shallow copy of req_body so that overrides
# applied to one do not affect the other.
1846 inner_req_body = dict(req_body)
1847 if inner_req is not None:
1848 for key, value in inner_req.items():
1849 if value is not None:
1850 inner_req_body[key] = value
1852 del inner_req_body[key]
1853 if outer_req is not None:
1854 for key, value in outer_req.items():
1855 if value is not None:
1856 req_body[key] = value
1860 additional_padata = []
1861 if pac_request is not None:
1862 pa_pac_request = self.KERB_PA_PAC_REQUEST_create(pac_request)
1863 additional_padata.append(pa_pac_request)
1864 if pac_options is not None:
1865 pa_pac_options = self.get_pa_pac_options(pac_options)
1866 additional_padata.append(pa_pac_options)
1868 if req_msg_type == KRB_AS_REQ:
1870 tgs_req_padata = None
1872 self.assertEqual(KRB_TGS_REQ, req_msg_type)
1874 tgs_req = self.generate_ap_req(kdc_exchange_dict,
1878 tgs_req_padata = self.PA_DATA_create(PADATA_KDC_REQ, tgs_req)
1880 if generate_fast_padata_fn is not None:
1881 self.assertIsNotNone(generate_fast_fn)
1882 # This can alter req_body...
1883 fast_padata, req_body = generate_fast_padata_fn(kdc_exchange_dict,
1889 if generate_fast_armor_fn is not None:
1890 self.assertIsNotNone(generate_fast_fn)
1891 fast_ap_req = generate_fast_armor_fn(kdc_exchange_dict,
1896 fast_armor_type = kdc_exchange_dict['fast_armor_type']
1897 fast_armor = self.KRB_FAST_ARMOR_create(fast_armor_type,
1902 if generate_padata_fn is not None:
1903 # This can alter req_body...
1904 outer_padata, req_body = generate_padata_fn(kdc_exchange_dict,
1907 self.assertIsNotNone(outer_padata)
# The PADATA_KDC_REQ element is generated by this method itself, so the
# caller-supplied padata must not contain one.
1908 self.assertNotIn(PADATA_KDC_REQ,
1909 [pa['padata-type'] for pa in outer_padata],
1910 'Don\'t create TGS-REQ manually')
1914 if generate_fast_fn is not None:
1915 armor_key = kdc_exchange_dict['armor_key']
1916 self.assertIsNotNone(armor_key)
# The data covered by the FAST request checksum depends on the message
# type: a DER-encoded KDC-REQ-BODY for AS-REQ, the AP-REQ for TGS-REQ.
1918 if req_msg_type == KRB_AS_REQ:
1919 checksum_blob = self.der_encode(
1921 asn1Spec=krb5_asn1.KDC_REQ_BODY())
1923 self.assertEqual(KRB_TGS_REQ, req_msg_type)
1924 checksum_blob = tgs_req
1926 checksum = self.Checksum_create(armor_key,
1930 fast_padata += additional_padata
1931 fast = generate_fast_fn(kdc_exchange_dict,
1942 if tgs_req_padata is not None:
1943 padata.append(tgs_req_padata)
1945 if fast is not None:
1948 if outer_padata is not None:
1949 padata += outer_padata
1952 padata += additional_padata
# Record what was actually sent so the check callbacks can refer to it.
1957 kdc_exchange_dict['req_padata'] = padata
1958 kdc_exchange_dict['fast_padata'] = fast_padata
1959 kdc_exchange_dict['req_body'] = inner_req_body
1961 req_obj, req_decoded = self.KDC_REQ_create(msg_type=req_msg_type,
1964 asn1Spec=req_asn1Spec())
1966 to_rodc = kdc_exchange_dict['to_rodc']
1968 rep = self.send_recv_transaction(req_decoded, to_rodc=to_rodc)
1969 self.assertIsNotNone(rep)
1971 msg_type = self.getElementValue(rep, 'msg-type')
1972 self.assertIsNotNone(msg_type)
# Work out which reply type is expected: an error checker implies KRB_ERROR
# and a non-empty expected_error_mode without 0 in it; a reply checker
# implies rep_msg_type and an empty expected_error_mode.
1974 expected_msg_type = None
1975 if check_error_fn is not None:
1976 expected_msg_type = KRB_ERROR
1977 self.assertIsNone(check_rep_fn)
1978 self.assertNotEqual(0, len(expected_error_mode))
1979 self.assertNotIn(0, expected_error_mode)
1980 if check_rep_fn is not None:
1981 expected_msg_type = rep_msg_type
1982 self.assertIsNone(check_error_fn)
1983 self.assertEqual(0, len(expected_error_mode))
1984 self.assertIsNotNone(expected_msg_type)
1985 if msg_type == KRB_ERROR:
1986 error_code = self.getElementValue(rep, 'error-code')
1987 fail_msg = f'Got unexpected error: {error_code}'
1989 fail_msg = f'Expected to fail with error: {expected_error_mode}'
1990 self.assertEqual(msg_type, expected_msg_type, fail_msg)
1992 if msg_type == KRB_ERROR:
1993 return check_error_fn(kdc_exchange_dict,
1997 return check_rep_fn(kdc_exchange_dict, callback_dict, rep)
# Collect the parameters and expectations for an AS exchange into the
# dictionary whose keys _generic_kdc_exchange() and the check callbacks read.
# The message types and ASN.1 specs are fixed to AS-REQ / AS-REP /
# EncASRepPart; the other entries are copied from same-named local values.
#
# expected_error_mode is normalised to a container: 0 becomes an empty
# tuple, and any other value that is not a collections.abc.Container is
# wrapped in a 1-tuple.
#
# NOTE(review): the listing skips several parameters (for example those
# feeding 'expected_sid', 'check_rep_fn', 'callback_dict', 'armor_key') and
# the body of the final `if callback_dict is None:` - consult the complete
# file.
1999 def as_exchange_dict(self,
2000 expected_crealm=None,
2001 expected_cname=None,
2002 expected_anon=False,
2003 expected_srealm=None,
2004 expected_sname=None,
2005 expected_account_name=None,
2006 expected_upn_name=None,
2008 expected_supported_etypes=None,
2009 expected_flags=None,
2010 unexpected_flags=None,
2011 ticket_decryption_key=None,
2012 expect_ticket_checksum=None,
2013 generate_fast_fn=None,
2014 generate_fast_armor_fn=None,
2015 generate_fast_padata_fn=None,
2016 fast_armor_type=FX_FAST_ARMOR_AP_REQUEST,
2017 generate_padata_fn=None,
2018 check_error_fn=None,
2020 check_kdc_private_fn=None,
2022 expected_error_mode=0,
2023 expected_status=None,
2024 client_as_etypes=None,
2026 authenticator_subkey=None,
2040 expect_upn_dns_info_ex=None,
2041 expect_pac_attrs=None,
2042 expect_pac_attrs_pac_request=None,
2043 expect_requester_sid=None,
# 0 means "no error expected"; a single error code becomes a 1-tuple.
2045 if expected_error_mode == 0:
2046 expected_error_mode = ()
2047 elif not isinstance(expected_error_mode, collections.abc.Container):
2048 expected_error_mode = (expected_error_mode,)
2050 kdc_exchange_dict = {
2051 'req_msg_type': KRB_AS_REQ,
2052 'req_asn1Spec': krb5_asn1.AS_REQ,
2053 'rep_msg_type': KRB_AS_REP,
2054 'rep_asn1Spec': krb5_asn1.AS_REP,
2055 'rep_encpart_asn1Spec': krb5_asn1.EncASRepPart,
2056 'expected_crealm': expected_crealm,
2057 'expected_cname': expected_cname,
2058 'expected_anon': expected_anon,
2059 'expected_srealm': expected_srealm,
2060 'expected_sname': expected_sname,
2061 'expected_account_name': expected_account_name,
2062 'expected_upn_name': expected_upn_name,
2063 'expected_sid': expected_sid,
2064 'expected_supported_etypes': expected_supported_etypes,
2065 'expected_flags': expected_flags,
2066 'unexpected_flags': unexpected_flags,
2067 'ticket_decryption_key': ticket_decryption_key,
2068 'expect_ticket_checksum': expect_ticket_checksum,
2069 'generate_fast_fn': generate_fast_fn,
2070 'generate_fast_armor_fn': generate_fast_armor_fn,
2071 'generate_fast_padata_fn': generate_fast_padata_fn,
2072 'fast_armor_type': fast_armor_type,
2073 'generate_padata_fn': generate_padata_fn,
2074 'check_error_fn': check_error_fn,
2075 'check_rep_fn': check_rep_fn,
2076 'check_kdc_private_fn': check_kdc_private_fn,
2077 'callback_dict': callback_dict,
2078 'expected_error_mode': expected_error_mode,
2079 'expected_status': expected_status,
2080 'client_as_etypes': client_as_etypes,
2081 'expected_salt': expected_salt,
2082 'authenticator_subkey': authenticator_subkey,
2083 'preauth_key': preauth_key,
2084 'armor_key': armor_key,
2085 'armor_tgt': armor_tgt,
2086 'armor_subkey': armor_subkey,
2087 'auth_data': auth_data,
2088 'kdc_options': kdc_options,
2089 'inner_req': inner_req,
2090 'outer_req': outer_req,
2091 'pac_request': pac_request,
2092 'pac_options': pac_options,
2093 'expect_edata': expect_edata,
2094 'expect_pac': expect_pac,
2095 'expect_claims': expect_claims,
2096 'expect_upn_dns_info_ex': expect_upn_dns_info_ex,
2097 'expect_pac_attrs': expect_pac_attrs,
2098 'expect_pac_attrs_pac_request': expect_pac_attrs_pac_request,
2099 'expect_requester_sid': expect_requester_sid,
2102 if callback_dict is None:
2105 return kdc_exchange_dict
# Collect the parameters and expectations for a TGS exchange into the
# dictionary whose keys _generic_kdc_exchange() and the check callbacks read.
# The message types and ASN.1 specs are fixed to TGS-REQ / TGS-REP /
# EncTGSRepPart; the other entries are copied from same-named local values.
# Compared with the AS variant this dictionary additionally carries
# 'body_checksum_type', 'expected_proxy_target' and
# 'expected_transited_services'.
#
# expected_error_mode is normalised to a container: 0 becomes an empty
# tuple, and any other value that is not a collections.abc.Container is
# wrapped in a 1-tuple.
#
# NOTE(review): the listing skips several parameters, at least one
# dictionary entry and the body of the final `if callback_dict is None:` -
# consult the complete file.
2107 def tgs_exchange_dict(self,
2108 expected_crealm=None,
2109 expected_cname=None,
2110 expected_anon=False,
2111 expected_srealm=None,
2112 expected_sname=None,
2113 expected_account_name=None,
2114 expected_upn_name=None,
2116 expected_supported_etypes=None,
2117 expected_flags=None,
2118 unexpected_flags=None,
2119 ticket_decryption_key=None,
2120 expect_ticket_checksum=None,
2121 generate_fast_fn=None,
2122 generate_fast_armor_fn=None,
2123 generate_fast_padata_fn=None,
2124 fast_armor_type=FX_FAST_ARMOR_AP_REQUEST,
2125 generate_padata_fn=None,
2126 check_error_fn=None,
2128 check_kdc_private_fn=None,
2129 expected_error_mode=0,
2130 expected_status=None,
2136 authenticator_subkey=None,
2138 body_checksum_type=None,
2147 expect_upn_dns_info_ex=None,
2148 expect_pac_attrs=None,
2149 expect_pac_attrs_pac_request=None,
2150 expect_requester_sid=None,
2151 expected_proxy_target=None,
2152 expected_transited_services=None,
# 0 means "no error expected"; a single error code becomes a 1-tuple.
2154 if expected_error_mode == 0:
2155 expected_error_mode = ()
2156 elif not isinstance(expected_error_mode, collections.abc.Container):
2157 expected_error_mode = (expected_error_mode,)
2159 kdc_exchange_dict = {
2160 'req_msg_type': KRB_TGS_REQ,
2161 'req_asn1Spec': krb5_asn1.TGS_REQ,
2162 'rep_msg_type': KRB_TGS_REP,
2163 'rep_asn1Spec': krb5_asn1.TGS_REP,
2164 'rep_encpart_asn1Spec': krb5_asn1.EncTGSRepPart,
2165 'expected_crealm': expected_crealm,
2166 'expected_cname': expected_cname,
2167 'expected_anon': expected_anon,
2168 'expected_srealm': expected_srealm,
2169 'expected_sname': expected_sname,
2170 'expected_account_name': expected_account_name,
2171 'expected_upn_name': expected_upn_name,
2172 'expected_sid': expected_sid,
2173 'expected_supported_etypes': expected_supported_etypes,
2174 'expected_flags': expected_flags,
2175 'unexpected_flags': unexpected_flags,
2176 'ticket_decryption_key': ticket_decryption_key,
2177 'expect_ticket_checksum': expect_ticket_checksum,
2178 'generate_fast_fn': generate_fast_fn,
2179 'generate_fast_armor_fn': generate_fast_armor_fn,
2180 'generate_fast_padata_fn': generate_fast_padata_fn,
2181 'fast_armor_type': fast_armor_type,
2182 'generate_padata_fn': generate_padata_fn,
2183 'check_error_fn': check_error_fn,
2184 'check_rep_fn': check_rep_fn,
2185 'check_kdc_private_fn': check_kdc_private_fn,
2186 'callback_dict': callback_dict,
2187 'expected_error_mode': expected_error_mode,
2188 'expected_status': expected_status,
2190 'body_checksum_type': body_checksum_type,
2191 'armor_key': armor_key,
2192 'armor_tgt': armor_tgt,
2193 'armor_subkey': armor_subkey,
2194 'auth_data': auth_data,
2195 'authenticator_subkey': authenticator_subkey,
2196 'kdc_options': kdc_options,
2197 'inner_req': inner_req,
2198 'outer_req': outer_req,
2199 'pac_request': pac_request,
2200 'pac_options': pac_options,
2201 'expect_edata': expect_edata,
2202 'expect_pac': expect_pac,
2203 'expect_claims': expect_claims,
2204 'expect_upn_dns_info_ex': expect_upn_dns_info_ex,
2205 'expect_pac_attrs': expect_pac_attrs,
2206 'expect_pac_attrs_pac_request': expect_pac_attrs_pac_request,
2207 'expect_requester_sid': expect_requester_sid,
2208 'expected_proxy_target': expected_proxy_target,
2209 'expected_transited_services': expected_transited_services,
2212 if callback_dict is None:
2215 return kdc_exchange_dict
# Validate a successful KDC reply (AS-REP or TGS-REP) against the
# expectations stored in kdc_exchange_dict.
#
# Visible steps: check msg-type, crealm/cname and the ticket's outer fields;
# pick the reply decryption key via get_preauth_key() and, when an armor key
# is configured and the reply carries PADATA_FX_FAST, process the FAST
# response (strengthen-key, finished/ticket-checksum, inner padata); decrypt
# the ticket when ticket_decryption_key is available; decrypt and decode the
# reply's enc-part; finally hand everything to check_kdc_private_fn.
#
# NOTE(review): this listing skips many source lines (parameters, `else:`
# lines, call arguments, the return statement) - consult the complete file.
2217 def generic_check_kdc_rep(self,
2222 expected_crealm = kdc_exchange_dict['expected_crealm']
2223 expected_anon = kdc_exchange_dict['expected_anon']
2224 expected_srealm = kdc_exchange_dict['expected_srealm']
2225 expected_sname = kdc_exchange_dict['expected_sname']
2226 ticket_decryption_key = kdc_exchange_dict['ticket_decryption_key']
2227 check_kdc_private_fn = kdc_exchange_dict['check_kdc_private_fn']
2228 rep_encpart_asn1Spec = kdc_exchange_dict['rep_encpart_asn1Spec']
2229 msg_type = kdc_exchange_dict['rep_msg_type']
2230 armor_key = kdc_exchange_dict['armor_key']
2232 self.assertElementEqual(rep, 'msg-type', msg_type) # AS-REP | TGS-REP
2233 padata = self.getElementValue(rep, 'padata')
2234 if self.strict_checking:
2235 self.assertElementEqualUTF8(rep, 'crealm', expected_crealm)
# Two alternative sources for expected_cname follow: the well-known
# anonymous principal or the value from the exchange dict. The selecting
# condition is not part of the listing (presumably expected_anon).
2237 expected_cname = self.PrincipalName_create(
2238 name_type=NT_WELLKNOWN,
2239 names=['WELLKNOWN', 'ANONYMOUS'])
2241 expected_cname = kdc_exchange_dict['expected_cname']
2242 self.assertElementEqualPrincipal(rep, 'cname', expected_cname)
2243 self.assertElementPresent(rep, 'ticket')
2244 ticket = self.getElementValue(rep, 'ticket')
2245 ticket_encpart = None
2246 ticket_cipher = None
2247 self.assertIsNotNone(ticket)
2248 if ticket is not None: # Never None, but gives indentation
2249 self.assertElementEqual(ticket, 'tkt-vno', 5)
2250 self.assertElementEqualUTF8(ticket, 'realm', expected_srealm)
2251 self.assertElementEqualPrincipal(ticket, 'sname', expected_sname)
2252 self.assertElementPresent(ticket, 'enc-part')
2253 ticket_encpart = self.getElementValue(ticket, 'enc-part')
2254 self.assertIsNotNone(ticket_encpart)
2255 if ticket_encpart is not None: # Never None, but gives indentation
2256 self.assertElementPresent(ticket_encpart, 'etype')
# A kvno is expected unless the enc-tkt-in-skey bit is set in the KDC
# options string (bit position derived from the KDCOptions named bit).
2258 kdc_options = kdc_exchange_dict['kdc_options']
2259 pos = len(tuple(krb5_asn1.KDCOptions('enc-tkt-in-skey'))) - 1
2260 expect_kvno = (pos >= len(kdc_options)
2261 or kdc_options[pos] != '1')
2263 # 'unspecified' means present, with any value != 0
2264 self.assertElementKVNO(ticket_encpart, 'kvno',
2265 self.unspecified_kvno)
2267 # For user-to-user, don't expect a kvno.
2268 self.assertElementMissing(ticket_encpart, 'kvno')
2270 self.assertElementPresent(ticket_encpart, 'cipher')
2271 ticket_cipher = self.getElementValue(ticket_encpart, 'cipher')
2272 self.assertElementPresent(rep, 'enc-part')
2273 encpart = self.getElementValue(rep, 'enc-part')
2274 encpart_cipher = None
2275 self.assertIsNotNone(encpart)
2276 if encpart is not None: # Never None, but gives indentation
2277 self.assertElementPresent(encpart, 'etype')
2278 self.assertElementKVNO(ticket_encpart, 'kvno', 'autodetect')
2279 self.assertElementPresent(encpart, 'cipher')
2280 encpart_cipher = self.getElementValue(encpart, 'cipher')
2282 ticket_checksum = None
2284 # Get the decryption key for the encrypted part
2285 encpart_decryption_key, encpart_decryption_usage = (
2286 self.get_preauth_key(kdc_exchange_dict))
2288 if armor_key is not None:
2289 pa_dict = self.get_pa_dict(padata)
2291 if PADATA_FX_FAST in pa_dict:
2292 fx_fast_data = pa_dict[PADATA_FX_FAST]
2293 fast_response = self.check_fx_fast_data(kdc_exchange_dict,
# When the KDC supplied a strengthen-key, the reply decryption key is
# replaced by the result of generate_strengthen_reply_key().
2298 if 'strengthen-key' in fast_response:
2299 strengthen_key = self.EncryptionKey_import(
2300 fast_response['strengthen-key'])
2301 encpart_decryption_key = (
2302 self.generate_strengthen_reply_key(
2304 encpart_decryption_key))
2306 fast_finished = fast_response.get('finished')
2307 if fast_finished is not None:
2308 ticket_checksum = fast_finished['ticket-checksum']
2310 self.check_rep_padata(kdc_exchange_dict,
2312 fast_response['padata'],
2315 ticket_private = None
2316 if ticket_decryption_key is not None:
2317 self.assertElementEqual(ticket_encpart, 'etype',
2318 ticket_decryption_key.etype)
2319 self.assertElementKVNO(ticket_encpart, 'kvno',
2320 ticket_decryption_key.kvno)
2321 ticket_decpart = ticket_decryption_key.decrypt(KU_TICKET,
2323 ticket_private = self.der_decode(
2325 asn1Spec=krb5_asn1.EncTicketPart())
2327 encpart_private = None
2328 self.assertIsNotNone(encpart_decryption_key)
2329 if encpart_decryption_key is not None:
2330 self.assertElementEqual(encpart, 'etype',
2331 encpart_decryption_key.etype)
2332 if self.strict_checking:
2333 self.assertElementKVNO(encpart, 'kvno',
2334 encpart_decryption_key.kvno)
2335 rep_decpart = encpart_decryption_key.decrypt(
2336 encpart_decryption_usage,
2338 # MIT KDC encodes both EncASRepPart and EncTGSRepPart with
2339 # application tag 26
# Two decode attempts follow (expected spec, then EncTGSRepPart); the
# control flow joining them is not part of the listing.
2341 encpart_private = self.der_decode(
2343 asn1Spec=rep_encpart_asn1Spec())
2345 encpart_private = self.der_decode(
2347 asn1Spec=krb5_asn1.EncTGSRepPart())
2349 self.assertIsNotNone(check_kdc_private_fn)
2350 if check_kdc_private_fn is not None:
2351 check_kdc_private_fn(kdc_exchange_dict, callback_dict,
2352 rep, ticket_private, encpart_private,
# Decode and validate the PA-FX-FAST reply padata: DER-decode it as
# PA_FX_FAST_REPLY, check that the encrypted part uses the armor key's
# etype, decrypt it with armor_key (key usage KU_FAST_REP) and decode the
# result as KrbFastResponse. The response nonce must equal
# kdc_exchange_dict['nonce']. Returns the decoded KrbFastResponse.
#
# NOTE(review): the listing skips part of the signature and the condition
# guarding the 'finished' assertion - consult the complete file.
2357 def check_fx_fast_data(self,
2362 expect_strengthen_key=True):
2363 fx_fast_data = self.der_decode(fx_fast_data,
2364 asn1Spec=krb5_asn1.PA_FX_FAST_REPLY())
2366 enc_fast_rep = fx_fast_data['armored-data']['enc-fast-rep']
2367 self.assertEqual(enc_fast_rep['etype'], armor_key.etype)
2369 fast_rep = armor_key.decrypt(KU_FAST_REP, enc_fast_rep['cipher'])
2371 fast_response = self.der_decode(fast_rep,
2372 asn1Spec=krb5_asn1.KrbFastResponse())
# The strengthen-key is only insisted upon under strict checking.
2374 if expect_strengthen_key and self.strict_checking:
2375 self.assertIn('strengthen-key', fast_response)
2378 self.assertIn('finished', fast_response)
2380 # Ensure that the nonce matches the nonce in the body of the request
2382 nonce = kdc_exchange_dict['nonce']
2383 self.assertEqual(nonce, fast_response['nonce'])
2385 return fast_response
2387 def generic_check_kdc_private(self,
2394 kdc_options = kdc_exchange_dict['kdc_options']
2395 canon_pos = len(tuple(krb5_asn1.KDCOptions('canonicalize'))) - 1
2396 canonicalize = (canon_pos < len(kdc_options)
2397 and kdc_options[canon_pos] == '1')
2398 renewable_pos = len(tuple(krb5_asn1.KDCOptions('renewable'))) - 1
2399 renewable = (renewable_pos < len(kdc_options)
2400 and kdc_options[renewable_pos] == '1')
2401 renew_pos = len(tuple(krb5_asn1.KDCOptions('renew'))) - 1
2402 renew = (renew_pos < len(kdc_options)
2403 and kdc_options[renew_pos] == '1')
2404 expect_renew_till = renewable or renew
2406 expected_crealm = kdc_exchange_dict['expected_crealm']
2407 expected_cname = kdc_exchange_dict['expected_cname']
2408 expected_srealm = kdc_exchange_dict['expected_srealm']
2409 expected_sname = kdc_exchange_dict['expected_sname']
2410 ticket_decryption_key = kdc_exchange_dict['ticket_decryption_key']
2412 rep_msg_type = kdc_exchange_dict['rep_msg_type']
2414 expected_flags = kdc_exchange_dict.get('expected_flags')
2415 unexpected_flags = kdc_exchange_dict.get('unexpected_flags')
2417 ticket = self.getElementValue(rep, 'ticket')
2419 if ticket_checksum is not None:
2420 armor_key = kdc_exchange_dict['armor_key']
2421 self.verify_ticket_checksum(ticket, ticket_checksum, armor_key)
2423 to_rodc = kdc_exchange_dict['to_rodc']
2425 krbtgt_creds = self.get_rodc_krbtgt_creds()
2427 krbtgt_creds = self.get_krbtgt_creds()
2428 krbtgt_key = self.TicketDecryptionKey_from_creds(krbtgt_creds)
2430 krbtgt_keys = [krbtgt_key]
2431 if not self.strict_checking:
2432 krbtgt_key_rc4 = self.TicketDecryptionKey_from_creds(
2434 etype=kcrypto.Enctype.RC4)
2435 krbtgt_keys.append(krbtgt_key_rc4)
2437 if self.expect_pac and self.is_tgs(expected_sname):
2440 expect_pac = kdc_exchange_dict['expect_pac']
2442 ticket_session_key = None
2443 if ticket_private is not None:
2444 self.assertElementFlags(ticket_private, 'flags',
2447 self.assertElementPresent(ticket_private, 'key')
2448 ticket_key = self.getElementValue(ticket_private, 'key')
2449 self.assertIsNotNone(ticket_key)
2450 if ticket_key is not None: # Never None, but gives indentation
2451 self.assertElementPresent(ticket_key, 'keytype')
2452 self.assertElementPresent(ticket_key, 'keyvalue')
2453 ticket_session_key = self.EncryptionKey_import(ticket_key)
2454 self.assertElementEqualUTF8(ticket_private, 'crealm',
2456 if self.strict_checking:
2457 self.assertElementEqualPrincipal(ticket_private, 'cname',
2459 self.assertElementPresent(ticket_private, 'transited')
2460 self.assertElementPresent(ticket_private, 'authtime')
2461 if self.strict_checking:
2462 self.assertElementPresent(ticket_private, 'starttime')
2463 self.assertElementPresent(ticket_private, 'endtime')
2464 if expect_renew_till:
2465 if self.strict_checking:
2466 self.assertElementPresent(ticket_private, 'renew-till')
2468 self.assertElementMissing(ticket_private, 'renew-till')
2469 if self.strict_checking:
2470 self.assertElementEqual(ticket_private, 'caddr', [])
2471 if expect_pac is not None:
2472 self.assertElementPresent(ticket_private, 'authorization-data',
2473 expect_empty=not expect_pac)
2475 encpart_session_key = None
2476 if encpart_private is not None:
2477 self.assertElementPresent(encpart_private, 'key')
2478 encpart_key = self.getElementValue(encpart_private, 'key')
2479 self.assertIsNotNone(encpart_key)
2480 if encpart_key is not None: # Never None, but gives indentation
2481 self.assertElementPresent(encpart_key, 'keytype')
2482 self.assertElementPresent(encpart_key, 'keyvalue')
2483 encpart_session_key = self.EncryptionKey_import(encpart_key)
2484 self.assertElementPresent(encpart_private, 'last-req')
2485 self.assertElementEqual(encpart_private, 'nonce',
2486 kdc_exchange_dict['nonce'])
2487 if rep_msg_type == KRB_AS_REP:
2488 if self.strict_checking:
2489 self.assertElementPresent(encpart_private,
2492 self.assertElementMissing(encpart_private,
2494 self.assertElementFlags(encpart_private, 'flags',
2497 self.assertElementPresent(encpart_private, 'authtime')
2498 if self.strict_checking:
2499 self.assertElementPresent(encpart_private, 'starttime')
2500 self.assertElementPresent(encpart_private, 'endtime')
2501 if expect_renew_till:
2502 if self.strict_checking:
2503 self.assertElementPresent(encpart_private, 'renew-till')
2505 self.assertElementMissing(encpart_private, 'renew-till')
2506 self.assertElementEqualUTF8(encpart_private, 'srealm',
2508 self.assertElementEqualPrincipal(encpart_private, 'sname',
2510 if self.strict_checking:
2511 self.assertElementEqual(encpart_private, 'caddr', [])
2513 sent_pac_options = self.get_sent_pac_options(kdc_exchange_dict)
2515 if self.strict_checking:
2516 if canonicalize or '1' in sent_pac_options:
2517 self.assertElementPresent(encpart_private,
2518 'encrypted-pa-data')
2519 enc_pa_dict = self.get_pa_dict(
2520 encpart_private['encrypted-pa-data'])
2522 self.assertIn(PADATA_SUPPORTED_ETYPES, enc_pa_dict)
2524 expected_supported_etypes = kdc_exchange_dict[
2525 'expected_supported_etypes']
2526 expected_supported_etypes |= (
2527 security.KERB_ENCTYPE_DES_CBC_CRC |
2528 security.KERB_ENCTYPE_DES_CBC_MD5 |
2529 security.KERB_ENCTYPE_RC4_HMAC_MD5)
2531 (supported_etypes,) = struct.unpack(
2533 enc_pa_dict[PADATA_SUPPORTED_ETYPES])
2535 self.assertEqual(supported_etypes,
2536 expected_supported_etypes)
2538 self.assertNotIn(PADATA_SUPPORTED_ETYPES, enc_pa_dict)
2540 if '1' in sent_pac_options:
2541 self.assertIn(PADATA_PAC_OPTIONS, enc_pa_dict)
2543 pac_options = self.der_decode(
2544 enc_pa_dict[PADATA_PAC_OPTIONS],
2545 asn1Spec=krb5_asn1.PA_PAC_OPTIONS())
2547 self.assertElementEqual(pac_options, 'options',
2550 self.assertNotIn(PADATA_PAC_OPTIONS, enc_pa_dict)
2552 self.assertElementEqual(encpart_private,
2553 'encrypted-pa-data',
2556 if ticket_session_key is not None and encpart_session_key is not None:
2557 self.assertEqual(ticket_session_key.etype,
2558 encpart_session_key.etype)
2559 self.assertEqual(ticket_session_key.key.contents,
2560 encpart_session_key.key.contents)
2561 if encpart_session_key is not None:
2562 session_key = encpart_session_key
2564 session_key = ticket_session_key
2565 ticket_creds = KerberosTicketCreds(
2568 crealm=expected_crealm,
2569 cname=expected_cname,
2570 srealm=expected_srealm,
2571 sname=expected_sname,
2572 decryption_key=ticket_decryption_key,
2573 ticket_private=ticket_private,
2574 encpart_private=encpart_private)
2576 if ticket_private is not None:
2577 pac_data = self.get_ticket_pac(ticket_creds, expect_pac=expect_pac)
2578 if expect_pac is True:
2579 self.assertIsNotNone(pac_data)
2580 elif expect_pac is False:
2581 self.assertIsNone(pac_data)
2583 if pac_data is not None:
2584 self.check_pac_buffers(pac_data, kdc_exchange_dict)
2586 expect_ticket_checksum = kdc_exchange_dict['expect_ticket_checksum']
2587 if expect_ticket_checksum:
2588 self.assertIsNotNone(ticket_decryption_key)
2590 if ticket_decryption_key is not None:
2591 service_ticket = (not self.is_tgs(expected_sname)
2592 and rep_msg_type == KRB_TGS_REP)
2593 self.verify_ticket(ticket_creds, krbtgt_keys,
2594 service_ticket=service_ticket,
2595 expect_pac=expect_pac,
2596 expect_ticket_checksum=expect_ticket_checksum
2597 or self.tkt_sig_support)
2599 kdc_exchange_dict['rep_ticket_creds'] = ticket_creds
2601 def check_pac_buffers(self, pac_data, kdc_exchange_dict):
2602 pac = ndr_unpack(krb5pac.PAC_DATA, pac_data)
2604 rep_msg_type = kdc_exchange_dict['rep_msg_type']
2605 armor_tgt = kdc_exchange_dict['armor_tgt']
2607 expected_sname = kdc_exchange_dict['expected_sname']
2608 expect_claims = kdc_exchange_dict['expect_claims']
2610 expected_types = [krb5pac.PAC_TYPE_LOGON_INFO,
2611 krb5pac.PAC_TYPE_SRV_CHECKSUM,
2612 krb5pac.PAC_TYPE_KDC_CHECKSUM,
2613 krb5pac.PAC_TYPE_LOGON_NAME,
2614 krb5pac.PAC_TYPE_UPN_DNS_INFO]
2616 kdc_options = kdc_exchange_dict['kdc_options']
2617 pos = len(tuple(krb5_asn1.KDCOptions('cname-in-addl-tkt'))) - 1
2618 constrained_delegation = (pos < len(kdc_options)
2619 and kdc_options[pos] == '1')
2620 if constrained_delegation:
2621 expected_types.append(krb5pac.PAC_TYPE_CONSTRAINED_DELEGATION)
2623 if self.kdc_fast_support:
2625 expected_types.append(krb5pac.PAC_TYPE_CLIENT_CLAIMS_INFO)
2627 if (rep_msg_type == KRB_TGS_REP
2628 and armor_tgt is not None):
2629 expected_types.append(krb5pac.PAC_TYPE_DEVICE_INFO)
2630 expected_types.append(krb5pac.PAC_TYPE_DEVICE_CLAIMS_INFO)
2632 if not self.is_tgs(expected_sname) and rep_msg_type == KRB_TGS_REP:
2633 expected_types.append(krb5pac.PAC_TYPE_TICKET_CHECKSUM)
2635 require_strict = {krb5pac.PAC_TYPE_CLIENT_CLAIMS_INFO}
2636 if not self.tkt_sig_support:
2637 require_strict.add(krb5pac.PAC_TYPE_TICKET_CHECKSUM)
2639 expect_extra_pac_buffers = self.is_tgs(expected_sname)
2641 expect_pac_attrs = kdc_exchange_dict['expect_pac_attrs']
2643 if expect_pac_attrs:
2644 expect_pac_attrs_pac_request = kdc_exchange_dict[
2645 'expect_pac_attrs_pac_request']
2647 expect_pac_attrs_pac_request = kdc_exchange_dict[
2650 if expect_pac_attrs is None:
2651 if self.expect_extra_pac_buffers:
2652 expect_pac_attrs = expect_extra_pac_buffers
2654 require_strict.add(krb5pac.PAC_TYPE_ATTRIBUTES_INFO)
2655 if expect_pac_attrs:
2656 expected_types.append(krb5pac.PAC_TYPE_ATTRIBUTES_INFO)
2658 expect_requester_sid = kdc_exchange_dict['expect_requester_sid']
2660 if expect_requester_sid is None:
2661 if self.expect_extra_pac_buffers:
2662 expect_requester_sid = expect_extra_pac_buffers
2664 require_strict.add(krb5pac.PAC_TYPE_REQUESTER_SID)
2665 if expect_requester_sid:
2666 expected_types.append(krb5pac.PAC_TYPE_REQUESTER_SID)
2668 buffer_types = [pac_buffer.type
2669 for pac_buffer in pac.buffers]
2670 self.assertSequenceElementsEqual(
2671 expected_types, buffer_types,
2672 require_ordered=False,
2673 require_strict=require_strict)
2675 expected_account_name = kdc_exchange_dict['expected_account_name']
2676 expected_sid = kdc_exchange_dict['expected_sid']
2678 expect_upn_dns_info_ex = kdc_exchange_dict['expect_upn_dns_info_ex']
2679 if expect_upn_dns_info_ex is None and (
2680 expected_account_name is not None
2681 or expected_sid is not None):
2682 expect_upn_dns_info_ex = True
2684 for pac_buffer in pac.buffers:
2685 if pac_buffer.type == krb5pac.PAC_TYPE_CONSTRAINED_DELEGATION:
2686 expected_proxy_target = kdc_exchange_dict[
2687 'expected_proxy_target']
2688 expected_transited_services = kdc_exchange_dict[
2689 'expected_transited_services']
2691 delegation_info = pac_buffer.info.info
2693 self.assertEqual(expected_proxy_target,
2694 str(delegation_info.proxy_target))
2696 transited_services = list(map(
2697 str, delegation_info.transited_services))
2698 self.assertEqual(expected_transited_services,
2701 elif pac_buffer.type == krb5pac.PAC_TYPE_LOGON_NAME:
2702 expected_cname = kdc_exchange_dict['expected_cname']
2703 account_name = expected_cname['name-string'][0]
2705 self.assertEqual(account_name, pac_buffer.info.account_name)
2707 elif pac_buffer.type == krb5pac.PAC_TYPE_LOGON_INFO:
2708 logon_info = pac_buffer.info.info.info3.base
2710 if expected_account_name is not None:
2711 self.assertEqual(expected_account_name,
2712 str(logon_info.account_name))
2714 if expected_sid is not None:
2715 expected_rid = int(expected_sid.rsplit('-', 1)[1])
2716 self.assertEqual(expected_rid, logon_info.rid)
2718 elif pac_buffer.type == krb5pac.PAC_TYPE_UPN_DNS_INFO:
2719 upn_dns_info = pac_buffer.info
2720 upn_dns_info_ex = upn_dns_info.ex
2722 expected_realm = kdc_exchange_dict['expected_crealm']
2723 self.assertEqual(expected_realm,
2724 upn_dns_info.dns_domain_name)
2726 expected_upn_name = kdc_exchange_dict['expected_upn_name']
2727 if expected_upn_name is not None:
2728 self.assertEqual(expected_upn_name,
2729 upn_dns_info.upn_name)
2731 if expect_upn_dns_info_ex:
2732 self.assertIsNotNone(upn_dns_info_ex)
2734 if upn_dns_info_ex is not None:
2735 if expected_account_name is not None:
2736 self.assertEqual(expected_account_name,
2737 upn_dns_info_ex.samaccountname)
2739 if expected_sid is not None:
2740 self.assertEqual(expected_sid,
2741 str(upn_dns_info_ex.objectsid))
2743 elif (pac_buffer.type == krb5pac.PAC_TYPE_ATTRIBUTES_INFO
2744 and expect_pac_attrs):
2745 attr_info = pac_buffer.info
2747 self.assertEqual(2, attr_info.flags_length)
2749 flags = attr_info.flags
2751 requested_pac = bool(flags & 1)
2752 given_pac = bool(flags & 2)
2754 self.assertEqual(expect_pac_attrs_pac_request is True,
2756 self.assertEqual(expect_pac_attrs_pac_request is None,
2759 elif (pac_buffer.type == krb5pac.PAC_TYPE_REQUESTER_SID
2760 and expect_requester_sid):
2761 requester_sid = pac_buffer.info.sid
2763 if expected_sid is not None:
2764 self.assertEqual(expected_sid, str(requester_sid))
2766 def generic_check_kdc_error(self,
2772 rep_msg_type = kdc_exchange_dict['rep_msg_type']
2774 expected_anon = kdc_exchange_dict['expected_anon']
2775 expected_srealm = kdc_exchange_dict['expected_srealm']
2776 expected_sname = kdc_exchange_dict['expected_sname']
2777 expected_error_mode = kdc_exchange_dict['expected_error_mode']
2779 sent_fast = self.sent_fast(kdc_exchange_dict)
2781 fast_armor_type = kdc_exchange_dict['fast_armor_type']
2783 self.assertElementEqual(rep, 'pvno', 5)
2784 self.assertElementEqual(rep, 'msg-type', KRB_ERROR)
2785 error_code = self.getElementValue(rep, 'error-code')
2786 self.assertIn(error_code, expected_error_mode)
2787 if self.strict_checking:
2788 self.assertElementMissing(rep, 'ctime')
2789 self.assertElementMissing(rep, 'cusec')
2790 self.assertElementPresent(rep, 'stime')
2791 self.assertElementPresent(rep, 'susec')
2792 # error-code checked above
2793 if self.strict_checking:
2794 self.assertElementMissing(rep, 'crealm')
2795 if expected_anon and not inner:
2796 expected_cname = self.PrincipalName_create(
2797 name_type=NT_WELLKNOWN,
2798 names=['WELLKNOWN', 'ANONYMOUS'])
2799 self.assertElementEqualPrincipal(rep, 'cname', expected_cname)
2801 self.assertElementMissing(rep, 'cname')
2802 self.assertElementEqualUTF8(rep, 'realm', expected_srealm)
2803 self.assertElementEqualPrincipal(rep, 'sname', expected_sname)
2804 self.assertElementMissing(rep, 'e-text')
2805 expected_status = kdc_exchange_dict['expected_status']
2806 expect_edata = kdc_exchange_dict['expect_edata']
2807 if expect_edata is None:
2808 expect_edata = (error_code != KDC_ERR_UNKNOWN_CRITICAL_FAST_OPTIONS
2809 and (not sent_fast or fast_armor_type is None
2810 or fast_armor_type == FX_FAST_ARMOR_AP_REQUEST)
2812 if not expect_edata:
2813 self.assertIsNone(expected_status)
2814 self.assertElementMissing(rep, 'e-data')
2816 edata = self.getElementValue(rep, 'e-data')
2817 if self.strict_checking:
2818 self.assertIsNotNone(edata)
2819 if edata is not None:
2820 if rep_msg_type == KRB_TGS_REP and not sent_fast:
2821 error_data = self.der_decode(
2823 asn1Spec=krb5_asn1.KERB_ERROR_DATA())
2824 self.assertEqual(KERB_ERR_TYPE_EXTENDED,
2825 error_data['data-type'])
2827 extended_error = error_data['data-value']
2829 self.assertEqual(12, len(extended_error))
2831 status = int.from_bytes(extended_error[:4], 'little')
2832 flags = int.from_bytes(extended_error[8:], 'little')
2834 self.assertEqual(expected_status, status)
2836 self.assertEqual(3, flags)
2838 self.assertIsNone(expected_status)
2840 rep_padata = self.der_decode(edata,
2841 asn1Spec=krb5_asn1.METHOD_DATA())
2842 self.assertGreater(len(rep_padata), 0)
2845 self.assertEqual(1, len(rep_padata))
2846 rep_pa_dict = self.get_pa_dict(rep_padata)
2847 self.assertIn(PADATA_FX_FAST, rep_pa_dict)
2849 armor_key = kdc_exchange_dict['armor_key']
2850 self.assertIsNotNone(armor_key)
2851 fast_response = self.check_fx_fast_data(
2853 rep_pa_dict[PADATA_FX_FAST],
2855 expect_strengthen_key=False)
2857 rep_padata = fast_response['padata']
2859 etype_info2 = self.check_rep_padata(kdc_exchange_dict,
2864 kdc_exchange_dict['preauth_etype_info2'] = etype_info2
2868 def check_rep_padata(self,
2873 rep_msg_type = kdc_exchange_dict['rep_msg_type']
2875 req_body = kdc_exchange_dict['req_body']
2876 proposed_etypes = req_body['etype']
2877 client_as_etypes = kdc_exchange_dict.get('client_as_etypes', [])
2879 sent_fast = self.sent_fast(kdc_exchange_dict)
2880 sent_enc_challenge = self.sent_enc_challenge(kdc_exchange_dict)
2882 if rep_msg_type == KRB_TGS_REP:
2883 self.assertTrue(sent_fast)
2885 expect_etype_info2 = ()
2886 expect_etype_info = False
2887 expected_aes_type = 0
2888 expected_rc4_type = 0
2889 if kcrypto.Enctype.RC4 in proposed_etypes:
2890 expect_etype_info = True
2891 for etype in proposed_etypes:
2892 if etype not in client_as_etypes:
2894 if etype in (kcrypto.Enctype.AES256, kcrypto.Enctype.AES128):
2895 expect_etype_info = False
2896 if etype > expected_aes_type:
2897 expected_aes_type = etype
2898 if etype in (kcrypto.Enctype.RC4,) and error_code != 0:
2899 if etype > expected_rc4_type:
2900 expected_rc4_type = etype
2902 if expected_aes_type != 0:
2903 expect_etype_info2 += (expected_aes_type,)
2904 if expected_rc4_type != 0:
2905 expect_etype_info2 += (expected_rc4_type,)
2907 expected_patypes = ()
2908 if sent_fast and error_code != 0:
2909 expected_patypes += (PADATA_FX_ERROR,)
2910 expected_patypes += (PADATA_FX_COOKIE,)
2912 if rep_msg_type == KRB_TGS_REP:
2913 sent_pac_options = self.get_sent_pac_options(kdc_exchange_dict)
2914 if ('1' in sent_pac_options
2915 and error_code not in (0, KDC_ERR_GENERIC)):
2916 expected_patypes += (PADATA_PAC_OPTIONS,)
2917 elif error_code != KDC_ERR_GENERIC:
2918 if expect_etype_info:
2919 self.assertGreater(len(expect_etype_info2), 0)
2920 expected_patypes += (PADATA_ETYPE_INFO,)
2921 if len(expect_etype_info2) != 0:
2922 expected_patypes += (PADATA_ETYPE_INFO2,)
2924 if error_code != KDC_ERR_PREAUTH_FAILED:
2926 expected_patypes += (PADATA_ENCRYPTED_CHALLENGE,)
2928 expected_patypes += (PADATA_ENC_TIMESTAMP,)
2930 if not sent_enc_challenge:
2931 expected_patypes += (PADATA_PK_AS_REQ,)
2932 expected_patypes += (PADATA_PK_AS_REP_19,)
2934 if (self.kdc_fast_support
2936 and not sent_enc_challenge):
2937 expected_patypes += (PADATA_FX_FAST,)
2938 expected_patypes += (PADATA_FX_COOKIE,)
2940 got_patypes = tuple(pa['padata-type'] for pa in rep_padata)
2941 self.assertSequenceElementsEqual(expected_patypes, got_patypes,
2942 require_strict={PADATA_FX_COOKIE,
2945 PADATA_PK_AS_REP_19,
2948 if not expected_patypes:
2951 pa_dict = self.get_pa_dict(rep_padata)
2953 enc_timestamp = pa_dict.get(PADATA_ENC_TIMESTAMP)
2954 if enc_timestamp is not None:
2955 self.assertEqual(len(enc_timestamp), 0)
2957 pk_as_req = pa_dict.get(PADATA_PK_AS_REQ)
2958 if pk_as_req is not None:
2959 self.assertEqual(len(pk_as_req), 0)
2961 pk_as_rep19 = pa_dict.get(PADATA_PK_AS_REP_19)
2962 if pk_as_rep19 is not None:
2963 self.assertEqual(len(pk_as_rep19), 0)
2965 fx_fast = pa_dict.get(PADATA_FX_FAST)
2966 if fx_fast is not None:
2967 self.assertEqual(len(fx_fast), 0)
2969 fast_cookie = pa_dict.get(PADATA_FX_COOKIE)
2970 if fast_cookie is not None:
2971 kdc_exchange_dict['fast_cookie'] = fast_cookie
2973 fast_error = pa_dict.get(PADATA_FX_ERROR)
2974 if fast_error is not None:
2975 fast_error = self.der_decode(fast_error,
2976 asn1Spec=krb5_asn1.KRB_ERROR())
2977 self.generic_check_kdc_error(kdc_exchange_dict,
2982 pac_options = pa_dict.get(PADATA_PAC_OPTIONS)
2983 if pac_options is not None:
2984 pac_options = self.der_decode(
2986 asn1Spec=krb5_asn1.PA_PAC_OPTIONS())
2987 self.assertElementEqual(pac_options, 'options', sent_pac_options)
2989 enc_challenge = pa_dict.get(PADATA_ENCRYPTED_CHALLENGE)
2990 if enc_challenge is not None:
2991 if not sent_enc_challenge:
2992 self.assertEqual(len(enc_challenge), 0)
2994 armor_key = kdc_exchange_dict['armor_key']
2995 self.assertIsNotNone(armor_key)
2997 preauth_key, _ = self.get_preauth_key(kdc_exchange_dict)
2999 kdc_challenge_key = self.generate_kdc_challenge_key(
3000 armor_key, preauth_key)
3002 # Ensure that the encrypted challenge FAST factor is supported
3004 if self.strict_checking:
3005 self.assertNotEqual(len(enc_challenge), 0)
3006 if len(enc_challenge) != 0:
3007 encrypted_challenge = self.der_decode(
3009 asn1Spec=krb5_asn1.EncryptedData())
3010 self.assertEqual(encrypted_challenge['etype'],
3011 kdc_challenge_key.etype)
3013 challenge = kdc_challenge_key.decrypt(
3014 KU_ENC_CHALLENGE_KDC,
3015 encrypted_challenge['cipher'])
3016 challenge = self.der_decode(
3018 asn1Spec=krb5_asn1.PA_ENC_TS_ENC())
3020 # Retrieve the returned timestamp.
3021 rep_patime = challenge['patimestamp']
3022 self.assertIn('pausec', challenge)
3024 # Ensure the returned time is within five minutes of the
3026 rep_time = self.get_EpochFromKerberosTime(rep_patime)
3027 current_time = time.time()
3029 self.assertLess(current_time - 300, rep_time)
3030 self.assertLess(rep_time, current_time + 300)
3032 etype_info2 = pa_dict.get(PADATA_ETYPE_INFO2)
3033 if etype_info2 is not None:
3034 etype_info2 = self.der_decode(etype_info2,
3035 asn1Spec=krb5_asn1.ETYPE_INFO2())
3036 self.assertGreaterEqual(len(etype_info2), 1)
3037 if self.strict_checking:
3038 self.assertEqual(len(etype_info2), len(expect_etype_info2))
3039 for i in range(0, len(etype_info2)):
3040 e = self.getElementValue(etype_info2[i], 'etype')
3041 if self.strict_checking:
3042 self.assertEqual(e, expect_etype_info2[i])
3043 salt = self.getElementValue(etype_info2[i], 'salt')
3044 if e == kcrypto.Enctype.RC4:
3045 if self.strict_checking:
3046 self.assertIsNone(salt)
3048 self.assertIsNotNone(salt)
3049 expected_salt = kdc_exchange_dict['expected_salt']
3050 if expected_salt is not None:
3051 self.assertEqual(salt, expected_salt)
3052 s2kparams = self.getElementValue(etype_info2[i], 's2kparams')
3053 if self.strict_checking:
3054 self.assertIsNone(s2kparams)
3056 etype_info = pa_dict.get(PADATA_ETYPE_INFO)
3057 if etype_info is not None:
3058 etype_info = self.der_decode(etype_info,
3059 asn1Spec=krb5_asn1.ETYPE_INFO())
3060 self.assertEqual(len(etype_info), 1)
3061 e = self.getElementValue(etype_info[0], 'etype')
3062 self.assertEqual(e, kcrypto.Enctype.RC4)
3063 self.assertEqual(e, expect_etype_info2[0])
3064 salt = self.getElementValue(etype_info[0], 'salt')
3065 if self.strict_checking:
3066 self.assertIsNotNone(salt)
3067 self.assertEqual(len(salt), 0)
3071 def generate_simple_fast(self,
3079 armor_key = kdc_exchange_dict['armor_key']
3081 fast_req = self.KRB_FAST_REQ_create(fast_options,
3084 fast_req = self.der_encode(fast_req,
3085 asn1Spec=krb5_asn1.KrbFastReq())
3086 fast_req = self.EncryptedData_create(armor_key,
3090 fast_armored_req = self.KRB_FAST_ARMORED_REQ_create(fast_armor,
3094 fx_fast_request = self.PA_FX_FAST_REQUEST_create(fast_armored_req)
3095 fx_fast_request = self.der_encode(
3097 asn1Spec=krb5_asn1.PA_FX_FAST_REQUEST())
3099 fast_padata = self.PA_DATA_create(PADATA_FX_FAST,
3104 def generate_ap_req(self,
3110 tgt = kdc_exchange_dict['armor_tgt']
3111 authenticator_subkey = kdc_exchange_dict['armor_subkey']
3113 req_body_checksum = None
3115 tgt = kdc_exchange_dict['tgt']
3116 authenticator_subkey = kdc_exchange_dict['authenticator_subkey']
3117 body_checksum_type = kdc_exchange_dict['body_checksum_type']
3119 req_body_blob = self.der_encode(req_body,
3120 asn1Spec=krb5_asn1.KDC_REQ_BODY())
3122 req_body_checksum = self.Checksum_create(tgt.session_key,
3123 KU_TGS_REQ_AUTH_CKSUM,
3125 ctype=body_checksum_type)
3127 auth_data = kdc_exchange_dict['auth_data']
3130 if authenticator_subkey is not None:
3131 subkey_obj = authenticator_subkey.export_obj()
3132 seq_number = random.randint(0, 0xfffffffe)
3133 (ctime, cusec) = self.get_KerberosTimeWithUsec()
3134 authenticator_obj = self.Authenticator_create(
3137 cksum=req_body_checksum,
3141 seq_number=seq_number,
3142 authorization_data=auth_data)
3143 authenticator_blob = self.der_encode(
3145 asn1Spec=krb5_asn1.Authenticator())
3147 usage = KU_AP_REQ_AUTH if armor else KU_TGS_REQ_AUTH
3148 authenticator = self.EncryptedData_create(tgt.session_key,
3152 ap_options = krb5_asn1.APOptions('0')
3153 ap_req_obj = self.AP_REQ_create(ap_options=str(ap_options),
3155 authenticator=authenticator)
3156 ap_req = self.der_encode(ap_req_obj, asn1Spec=krb5_asn1.AP_REQ())
3160 def generate_simple_tgs_padata(self,
3164 ap_req = self.generate_ap_req(kdc_exchange_dict,
3168 pa_tgs_req = self.PA_DATA_create(PADATA_KDC_REQ, ap_req)
3169 padata = [pa_tgs_req]
3171 return padata, req_body
def get_preauth_key(self, kdc_exchange_dict):
    """Pick the key, and its key-usage number, for the current exchange.

    The choice depends on ``kdc_exchange_dict['rep_msg_type']``:

    * ``KRB_AS_REP``: the ``'preauth_key'`` entry, paired with
      ``KU_AS_REP_ENC_PART``.
    * anything else: the ``'authenticator_subkey'`` entry paired with
      ``KU_TGS_REP_ENC_PART_SUB_KEY`` when that entry is not None;
      otherwise the ``session_key`` of the ``'tgt'`` entry paired with
      ``KU_TGS_REP_ENC_PART_SESSION``.

    The selected key is asserted to be non-None.

    Returns a ``(key, usage)`` tuple.
    """
    if kdc_exchange_dict['rep_msg_type'] == KRB_AS_REP:
        key = kdc_exchange_dict['preauth_key']
        usage = KU_AS_REP_ENC_PART
    else:  # KRB_TGS_REP
        # Prefer the authenticator subkey; the TGT is only consulted when
        # no subkey was supplied, so 'tgt' need not be present otherwise.
        key = kdc_exchange_dict['authenticator_subkey']
        usage = KU_TGS_REP_ENC_PART_SUB_KEY
        if key is None:
            key = kdc_exchange_dict['tgt'].session_key
            usage = KU_TGS_REP_ENC_PART_SESSION

    self.assertIsNotNone(key)

    return key, usage
3193 def generate_armor_key(self, subkey, session_key):
3194 armor_key = kcrypto.cf2(subkey.key,
3198 armor_key = Krb5EncryptionKey(armor_key, None)
3202 def generate_strengthen_reply_key(self, strengthen_key, reply_key):
3203 strengthen_reply_key = kcrypto.cf2(strengthen_key.key,
3207 strengthen_reply_key = Krb5EncryptionKey(strengthen_reply_key,
3210 return strengthen_reply_key
def generate_client_challenge_key(self, armor_key, longterm_key):
    """Derive the client challenge key from an armor key and a long-term key.

    ``armor_key.key`` and ``longterm_key.key`` are combined with
    ``kcrypto.cf2`` using the labels ``b'clientchallengearmor'`` and
    ``b'challengelongterm'``.

    Returns the combined key wrapped in a ``Krb5EncryptionKey`` that is
    constructed with None as its second argument.
    """
    combined_key = kcrypto.cf2(armor_key.key,
                               longterm_key.key,
                               b'clientchallengearmor',
                               b'challengelongterm')

    return Krb5EncryptionKey(combined_key, None)
def generate_kdc_challenge_key(self, armor_key, longterm_key):
    """Derive the KDC challenge key from an armor key and a long-term key.

    ``armor_key.key`` and ``longterm_key.key`` are combined with
    ``kcrypto.cf2`` using the labels ``b'kdcchallengearmor'`` and
    ``b'challengelongterm'``.

    Returns the combined key wrapped in a ``Krb5EncryptionKey`` that is
    constructed with None as its second argument.
    """
    combined_key = kcrypto.cf2(armor_key.key,
                               longterm_key.key,
                               b'kdcchallengearmor',
                               b'challengelongterm')

    return Krb5EncryptionKey(combined_key, None)
3230 def verify_ticket_checksum(self, ticket, expected_checksum, armor_key):
3231 expected_type = expected_checksum['cksumtype']
3232 self.assertEqual(armor_key.ctype, expected_type)
3234 ticket_blob = self.der_encode(ticket,
3235 asn1Spec=krb5_asn1.Ticket())
3236 checksum = self.Checksum_create(armor_key,
3239 self.assertEqual(expected_checksum, checksum)
3241 def verify_ticket(self, ticket, krbtgt_keys, service_ticket,
3243 expect_ticket_checksum=True):
3244 # Decrypt the ticket.
3246 key = ticket.decryption_key
3247 enc_part = ticket.ticket['enc-part']
3249 self.assertElementEqual(enc_part, 'etype', key.etype)
3250 self.assertElementKVNO(enc_part, 'kvno', key.kvno)
3252 enc_part = key.decrypt(KU_TICKET, enc_part['cipher'])
3253 enc_part = self.der_decode(
3254 enc_part, asn1Spec=krb5_asn1.EncTicketPart())
3256 # Fetch the authorization data from the ticket.
3257 auth_data = enc_part.get('authorization-data')
3259 self.assertIsNotNone(auth_data)
3260 elif auth_data is None:
3263 # Get a copy of the authdata with an empty PAC, and the existing PAC
3265 empty_pac = self.get_empty_pac()
3266 auth_data, pac_data = self.replace_pac(auth_data,
3268 expect_pac=expect_pac)
3272 # Unpack the PAC as both PAC_DATA and PAC_DATA_RAW types. We use the
3273 # raw type to create a new PAC with zeroed signatures for
3274 # verification. This is because on Windows, the resource_groups field
3275 # is added to PAC_LOGON_INFO after the info3 field has been created,
3276 # which results in a different ordering of pointer values than Samba
3277 # (see commit 0e201ecdc53). Using the raw type avoids changing
3278 # PAC_LOGON_INFO, so verification against Windows can work. We still
3279 # need the PAC_DATA type to retrieve the actual checksums, because the
3280 # signatures in the raw type may contain padding bytes.
3281 pac = ndr_unpack(krb5pac.PAC_DATA,
3283 raw_pac = ndr_unpack(krb5pac.PAC_DATA_RAW,
3288 for pac_buffer, raw_pac_buffer in zip(pac.buffers, raw_pac.buffers):
3289 buffer_type = pac_buffer.type
3290 if buffer_type in self.pac_checksum_types:
3291 self.assertNotIn(buffer_type, checksums,
3292 f'Duplicate checksum type {buffer_type}')
3294 # Fetch the checksum and the checksum type from the PAC buffer.
3295 checksum = pac_buffer.info.signature
3296 ctype = pac_buffer.info.type
3300 checksums[buffer_type] = checksum, ctype
3302 if buffer_type != krb5pac.PAC_TYPE_TICKET_CHECKSUM:
3303 # Zero the checksum field so that we can later verify the
3304 # checksums. The ticket checksum field is not zeroed.
3306 signature = ndr_unpack(
3307 krb5pac.PAC_SIGNATURE_DATA,
3308 raw_pac_buffer.info.remaining)
3309 signature.signature = bytes(len(checksum))
3310 raw_pac_buffer.info.remaining = ndr_pack(
3313 # Re-encode the PAC.
3314 pac_data = ndr_pack(raw_pac)
3316 # Verify the signatures.
3318 server_checksum, server_ctype = checksums[
3319 krb5pac.PAC_TYPE_SRV_CHECKSUM]
3320 key.verify_checksum(KU_NON_KERB_CKSUM_SALT,
3325 kdc_checksum, kdc_ctype = checksums[
3326 krb5pac.PAC_TYPE_KDC_CHECKSUM]
3328 if isinstance(krbtgt_keys, collections.abc.Container):
3329 if self.strict_checking:
3330 krbtgt_key = krbtgt_keys[0]
3332 krbtgt_key = next(key for key in krbtgt_keys
3333 if key.ctype == kdc_ctype)
3335 krbtgt_key = krbtgt_keys
3337 krbtgt_key.verify_rodc_checksum(KU_NON_KERB_CKSUM_SALT,
3342 if not service_ticket:
3343 self.assertNotIn(krb5pac.PAC_TYPE_TICKET_CHECKSUM, checksums)
3345 ticket_checksum, ticket_ctype = checksums.get(
3346 krb5pac.PAC_TYPE_TICKET_CHECKSUM,
3348 if expect_ticket_checksum:
3349 self.assertIsNotNone(ticket_checksum)
3350 elif expect_ticket_checksum is False:
3351 self.assertIsNone(ticket_checksum)
3352 if ticket_checksum is not None:
3353 enc_part['authorization-data'] = auth_data
3354 enc_part = self.der_encode(enc_part,
3355 asn1Spec=krb5_asn1.EncTicketPart())
3357 krbtgt_key.verify_rodc_checksum(KU_NON_KERB_CKSUM_SALT,
3362 def modified_ticket(self,
3364 new_ticket_key=None,
3368 allow_empty_authdata=False,
3369 update_pac_checksums=True,
3371 include_checksums=None):
3372 if checksum_keys is None:
3373 # A dict containing a key for each checksum type to be created in
3377 if include_checksums is None:
3378 # A dict containing a value for each checksum type; True if the
3379 # checksum type is to be included in the PAC, False if it is to be
3380 # excluded, or None/not present if the checksum is to be included
3381 # based on its presence in the original PAC.
3382 include_checksums = {}
3384 # Check that the values passed in by the caller make sense.
3386 self.assertLessEqual(checksum_keys.keys(), self.pac_checksum_types)
3387 self.assertLessEqual(include_checksums.keys(), self.pac_checksum_types)
3390 self.assertIsNone(modify_pac_fn)
3392 update_pac_checksums = False
3394 if not update_pac_checksums:
3395 self.assertFalse(checksum_keys)
3396 self.assertFalse(include_checksums)
3398 expect_pac = modify_pac_fn is not None
3400 key = ticket.decryption_key
3402 if new_ticket_key is None:
3403 # Use the same key to re-encrypt the ticket.
3404 new_ticket_key = key
3406 if krb5pac.PAC_TYPE_SRV_CHECKSUM not in checksum_keys:
3407 # If the server signature key is not present, fall back to the key
3408 # used to encrypt the ticket.
3409 checksum_keys[krb5pac.PAC_TYPE_SRV_CHECKSUM] = new_ticket_key
3411 if krb5pac.PAC_TYPE_TICKET_CHECKSUM not in checksum_keys:
3412 # If the ticket signature key is not present, fall back to the key
3413 # used for the KDC signature.
3414 kdc_checksum_key = checksum_keys.get(krb5pac.PAC_TYPE_KDC_CHECKSUM)
3415 if kdc_checksum_key is not None:
3416 checksum_keys[krb5pac.PAC_TYPE_TICKET_CHECKSUM] = (
3419 # Decrypt the ticket.
3421 enc_part = ticket.ticket['enc-part']
3423 self.assertElementEqual(enc_part, 'etype', key.etype)
3424 self.assertElementKVNO(enc_part, 'kvno', key.kvno)
3426 enc_part = key.decrypt(KU_TICKET, enc_part['cipher'])
3427 enc_part = self.der_decode(
3428 enc_part, asn1Spec=krb5_asn1.EncTicketPart())
3430 # Modify the ticket here.
3431 if modify_fn is not None:
3432 enc_part = modify_fn(enc_part)
3434 auth_data = enc_part.get('authorization-data')
3436 self.assertIsNotNone(auth_data)
3437 if auth_data is not None:
3440 # Get a copy of the authdata with an empty PAC, and the
3441 # existing PAC (if present).
3442 empty_pac = self.get_empty_pac()
3443 empty_pac_auth_data, pac_data = self.replace_pac(
3446 expect_pac=expect_pac)
3448 if pac_data is not None:
3449 pac = ndr_unpack(krb5pac.PAC_DATA, pac_data)
3451 # Modify the PAC here.
3452 if modify_pac_fn is not None:
3453 pac = modify_pac_fn(pac)
3455 if update_pac_checksums:
3456 # Get the enc-part with an empty PAC, which is needed
3457 # to create a ticket signature.
3458 enc_part_to_sign = enc_part.copy()
3459 enc_part_to_sign['authorization-data'] = (
3460 empty_pac_auth_data)
3461 enc_part_to_sign = self.der_encode(
3463 asn1Spec=krb5_asn1.EncTicketPart())
3465 self.update_pac_checksums(pac,
3470 # Re-encode the PAC.
3471 pac_data = ndr_pack(pac)
3472 new_pac = self.AuthorizationData_create(AD_WIN2K_PAC,
3475 # Replace the PAC in the authorization data and re-add it to the
3477 auth_data, _ = self.replace_pac(
3479 expect_pac=expect_pac,
3480 allow_empty_authdata=allow_empty_authdata)
3481 enc_part['authorization-data'] = auth_data
3483 # Re-encrypt the ticket enc-part with the new key.
3484 enc_part_new = self.der_encode(enc_part,
3485 asn1Spec=krb5_asn1.EncTicketPart())
3486 enc_part_new = self.EncryptedData_create(new_ticket_key,
3490 # Create a copy of the ticket with the new enc-part.
3491 new_ticket = ticket.ticket.copy()
3492 new_ticket['enc-part'] = enc_part_new
3494 new_ticket_creds = KerberosTicketCreds(
3496 session_key=ticket.session_key,
3497 crealm=ticket.crealm,
3499 srealm=ticket.srealm,
3501 decryption_key=new_ticket_key,
3502 ticket_private=enc_part,
3503 encpart_private=ticket.encpart_private)
3505 return new_ticket_creds
def update_pac_checksums(self,
                         pac,
                         checksum_keys,
                         include_checksums,
                         enc_part=None):
    """Recompute the checksum buffers of *pac* in place.

    :param pac: PAC object exposing ``buffers`` and ``num_buffers``.
    :param checksum_keys: Mapping from PAC checksum buffer type to the key
        used to sign that buffer.
    :param include_checksums: Mapping from checksum buffer type to ``True``
        (add the buffer if absent), ``False`` (remove it if present); any
        other value leaves presence unchanged.
    :param enc_part: Data the ticket checksum is computed over; it must not
        be None when a ticket checksum buffer ends up in the PAC.
    """
    buffers = pac.buffers
    by_type = {}

    # Index the checksum buffers that are already present, insisting that
    # each checksum type occurs at most once.
    for existing in buffers:
        buffer_type = existing.type
        if buffer_type not in self.pac_checksum_types:
            continue

        self.assertNotIn(buffer_type, by_type,
                         f'Duplicate checksum type {buffer_type}')
        by_type[buffer_type] = existing

    # Reconcile the buffers with the request: drop those explicitly
    # excluded, and create those explicitly included but missing.
    for buffer_type in self.pac_checksum_types:
        requested = include_checksums.get(buffer_type)

        if buffer_type in by_type:
            if requested is False:
                unwanted = by_type.pop(buffer_type)

                pac.num_buffers -= 1
                buffers.remove(unwanted)
            continue

        if requested is True:
            info = krb5pac.PAC_SIGNATURE_DATA()

            fresh = krb5pac.PAC_BUFFER()
            fresh.type = buffer_type
            fresh.info = info

            buffers.append(fresh)
            pac.num_buffers += 1

            by_type[buffer_type] = fresh

    # Give every remaining checksum buffer its checksum type and an initial
    # signature. The ticket checksum is computed over enc_part; the other
    # buffers get a zeroed signature for now.
    for buffer_type, target in by_type.items():
        key = checksum_keys[buffer_type]
        ctype = key.ctype & 0xFFFFFFFF

        if buffer_type == krb5pac.PAC_TYPE_TICKET_CHECKSUM:
            self.assertIsNotNone(enc_part)

            initial = key.make_rodc_checksum(KU_NON_KERB_CKSUM_SALT,
                                             enc_part)
        elif buffer_type == krb5pac.PAC_TYPE_SRV_CHECKSUM:
            initial = key.make_zeroed_checksum()
        else:
            initial = key.make_rodc_zeroed_checksum()

        target.info.signature = initial
        target.info.type = ctype

    # Store the (possibly modified) buffer list back into the PAC.
    pac.buffers = buffers

    srv_buffer = by_type.get(krb5pac.PAC_TYPE_SRV_CHECKSUM)
    kdc_buffer = by_type.get(krb5pac.PAC_TYPE_KDC_CHECKSUM)

    if srv_buffer is None:
        # Without a server signature, the KDC checksum below is made over
        # an empty bytes object.
        server_checksum = b''
    else:
        # The server checksum is made over the packed PAC as it stands now.
        srv_key = checksum_keys[krb5pac.PAC_TYPE_SRV_CHECKSUM]
        server_checksum = srv_key.make_checksum(KU_NON_KERB_CKSUM_SALT,
                                                ndr_pack(pac))
        srv_buffer.info.signature = server_checksum

    if kdc_buffer is not None:
        # The KDC checksum is made over the server checksum bytes.
        kdc_key = checksum_keys[krb5pac.PAC_TYPE_KDC_CHECKSUM]
        kdc_buffer.info.signature = kdc_key.make_rodc_checksum(
            KU_NON_KERB_CKSUM_SALT,
            server_checksum)
def replace_pac(self, auth_data, new_pac, expect_pac=True,
                allow_empty_authdata=False):
    """Rebuild *auth_data* with its PAC replaced by, or stripped in favour
    of, *new_pac*.

    :param auth_data: Iterable of authorization-data elements.
    :param new_pac: AD-WIN2K-PAC element to substitute for the existing
        PAC, or None to drop the existing PAC.
    :param expect_pac: When true, assert that an AD-IF-RELEVANT element and
        a PAC inside it were found.
    :param allow_empty_authdata: When true, keep an AD-IF-RELEVANT element
        even if it ends up with no entries.
    :return: Tuple ``(new_auth_data, old_pac)`` where *old_pac* is the
        ``ad-data`` of the PAC that was found, or None.
    """
    if new_pac is not None:
        self.assertElementEqual(new_pac, 'ad-type', AD_WIN2K_PAC)
        self.assertElementPresent(new_pac, 'ad-data')

    rebuilt = []
    ad_relevant = None
    old_pac = None

    for elem in auth_data:
        if elem['ad-type'] != AD_IF_RELEVANT:
            # Elements other than AD-IF-RELEVANT are carried over as-is.
            rebuilt.append(elem)
            continue

        ad_relevant = self.der_decode(
            elem['ad-data'],
            asn1Spec=krb5_asn1.AD_IF_RELEVANT())

        kept = []
        for inner in ad_relevant:
            if inner['ad-type'] != AD_WIN2K_PAC:
                kept.append(inner)
                continue

            self.assertIsNone(old_pac, 'Multiple PACs detected')
            old_pac = inner['ad-data']

            if new_pac is not None:
                kept.append(new_pac)

        if expect_pac:
            self.assertIsNotNone(old_pac, 'Expected PAC')

        if kept or allow_empty_authdata:
            ad_relevant = self.der_encode(
                kept,
                asn1Spec=krb5_asn1.AD_IF_RELEVANT())
            wrapper = self.AuthorizationData_create(AD_IF_RELEVANT,
                                                    ad_relevant)
        else:
            wrapper = None

        if wrapper is not None or allow_empty_authdata:
            rebuilt.append(wrapper)

    if expect_pac:
        self.assertIsNotNone(ad_relevant, 'Expected AD-RELEVANT')

    return rebuilt, old_pac
def get_pac(self, auth_data, expect_pac=True):
    """Return the PAC found in *auth_data*.

    Delegates to replace_pac() with no replacement PAC and returns only the
    second element of its result (the old PAC).
    """
    _rebuilt_auth_data, found_pac = self.replace_pac(auth_data, None,
                                                     expect_pac)
    return found_pac
def get_ticket_pac(self, ticket, expect_pac=True):
    """Return the PAC carried in the authorization data of *ticket*.

    The authorization data is read from ``ticket.ticket_private``. When no
    PAC is expected and the ticket has no authorization data, None is
    returned; when a PAC is expected, the authorization data is asserted
    to be present.
    """
    auth_data = ticket.ticket_private.get('authorization-data')

    if auth_data is None and not expect_pac:
        return None

    if expect_pac:
        self.assertIsNotNone(auth_data)

    return self.get_pac(auth_data, expect_pac=expect_pac)
def get_krbtgt_checksum_key(self):
    """Return a checksum-key mapping holding the krbtgt key.

    The result maps the KDC checksum buffer type to the ticket decryption
    key derived from the krbtgt credentials.
    """
    kdc_key = self.TicketDecryptionKey_from_creds(self.get_krbtgt_creds())

    return {krb5pac.PAC_TYPE_KDC_CHECKSUM: kdc_key}
def is_tgs(self, principal):
    """Return whether *principal* names the krbtgt service.

    :param principal: Mapping with a ``name-string`` sequence of name
        components.
    :return: True if the first component is ``krbtgt`` (as str or bytes);
        False otherwise, including when the principal has no components.
    """
    components = principal['name-string']
    if not components:
        # A principal with no name components cannot be krbtgt; report
        # that rather than failing with an IndexError.
        return False

    return components[0] in ('krbtgt', b'krbtgt')
def is_tgt(self, ticket):
    """Return whether the service name of *ticket* is a TGS principal.

    The service name is read from ``ticket.ticket['sname']`` and checked
    with is_tgs().
    """
    return self.is_tgs(ticket.ticket['sname'])
def get_empty_pac(self):
    """Return an AD-WIN2K-PAC authorization-data element whose data is a
    single zero byte."""
    placeholder = b'\x00'
    return self.AuthorizationData_create(AD_WIN2K_PAC, placeholder)
def get_outer_pa_dict(self, kdc_exchange_dict):
    """Return the padata dict built from the exchange's ``req_padata``."""
    outer_padata = kdc_exchange_dict['req_padata']
    return self.get_pa_dict(outer_padata)
def get_fast_pa_dict(self, kdc_exchange_dict):
    """Return the padata dict built from the exchange's ``fast_padata``.

    Falls back to the outer padata dict when the FAST padata dict is
    empty.
    """
    fast_pa_dict = self.get_pa_dict(kdc_exchange_dict['fast_padata'])

    return (fast_pa_dict if fast_pa_dict
            else self.get_outer_pa_dict(kdc_exchange_dict))
def sent_fast(self, kdc_exchange_dict):
    """Return whether the outer padata of the request contains
    PADATA_FX_FAST."""
    return PADATA_FX_FAST in self.get_outer_pa_dict(kdc_exchange_dict)
def sent_enc_challenge(self, kdc_exchange_dict):
    """Return whether the padata dict given by get_fast_pa_dict() contains
    PADATA_ENCRYPTED_CHALLENGE."""
    return (PADATA_ENCRYPTED_CHALLENGE
            in self.get_fast_pa_dict(kdc_exchange_dict))
def get_sent_pac_options(self, kdc_exchange_dict):
    """Return the PAC options bit string sent in the request.

    Returns an empty string when no PADATA_PAC_OPTIONS element was sent.
    Otherwise the decoded ``options`` value is returned with everything
    after its first four positions replaced by '0'.
    """
    pa_dict = self.get_fast_pa_dict(kdc_exchange_dict)

    if PADATA_PAC_OPTIONS not in pa_dict:
        return ''

    decoded = self.der_decode(pa_dict[PADATA_PAC_OPTIONS],
                              asn1Spec=krb5_asn1.PA_PAC_OPTIONS())
    sent_bits = decoded['options']

    # Mask out unsupported bits: keep the first four positions and zero
    # the rest, preserving the overall length.
    supported = sent_bits[:4]
    unsupported = sent_bits[4:]

    return supported + '0' * len(unsupported)
def get_krbtgt_sname(self):
    """Return an NT_SRV_INST principal name made of the krbtgt account's
    username and realm."""
    creds = self.get_krbtgt_creds()
    name_components = [creds.get_username(), creds.get_realm()]

    return self.PrincipalName_create(name_type=NT_SRV_INST,
                                     names=name_components)
3728 def _test_as_exchange(self,
3734 expected_error_mode,
3743 expected_account_name=None,
3744 expected_upn_name=None,
3746 expected_flags=None,
3747 unexpected_flags=None,
3748 expected_supported_etypes=None,
3750 ticket_decryption_key=None,
3754 expect_pac_attrs=None,
3755 expect_pac_attrs_pac_request=None,
3756 expect_requester_sid=None,
3759 def _generate_padata_copy(_kdc_exchange_dict,
3762 return padata, req_body
3764 if not expected_error_mode:
3765 check_error_fn = None
3766 check_rep_fn = self.generic_check_kdc_rep
3768 check_error_fn = self.generic_check_kdc_error
3771 if padata is not None:
3772 generate_padata_fn = _generate_padata_copy
3774 generate_padata_fn = None
3776 kdc_exchange_dict = self.as_exchange_dict(
3777 expected_crealm=expected_crealm,
3778 expected_cname=expected_cname,
3779 expected_srealm=expected_srealm,
3780 expected_sname=expected_sname,
3781 expected_account_name=expected_account_name,
3782 expected_upn_name=expected_upn_name,
3783 expected_sid=expected_sid,
3784 expected_supported_etypes=expected_supported_etypes,
3785 ticket_decryption_key=ticket_decryption_key,
3786 generate_padata_fn=generate_padata_fn,
3787 check_error_fn=check_error_fn,
3788 check_rep_fn=check_rep_fn,
3789 check_kdc_private_fn=self.generic_check_kdc_private,
3790 expected_error_mode=expected_error_mode,
3791 client_as_etypes=client_as_etypes,
3792 expected_salt=expected_salt,
3793 expected_flags=expected_flags,
3794 unexpected_flags=unexpected_flags,
3795 preauth_key=preauth_key,
3796 kdc_options=str(kdc_options),
3797 pac_request=pac_request,
3798 pac_options=pac_options,
3799 expect_pac=expect_pac,
3800 expect_pac_attrs=expect_pac_attrs,
3801 expect_pac_attrs_pac_request=expect_pac_attrs_pac_request,
3802 expect_requester_sid=expect_requester_sid,
3805 rep = self._generic_kdc_exchange(kdc_exchange_dict,
3812 return rep, kdc_exchange_dict