1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Isaac Boukris 2020
3 # Copyright (C) Stefan Metzmacher 2020
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
31 from pyasn1.codec.der.decoder import decode as pyasn1_der_decode
32 from pyasn1.codec.der.encoder import encode as pyasn1_der_encode
33 from pyasn1.codec.native.decoder import decode as pyasn1_native_decode
34 from pyasn1.codec.native.encoder import encode as pyasn1_native_encode
36 from pyasn1.codec.ber.encoder import BitStringEncoder
38 from pyasn1.error import PyAsn1Error
40 from samba.credentials import Credentials
41 from samba.dcerpc import krb5pac, security
42 from samba.gensec import FEATURE_SEAL
43 from samba.ndr import ndr_pack, ndr_unpack
46 from samba.tests import TestCaseInTempDir
48 import samba.tests.krb5.rfc4120_pyasn1 as krb5_asn1
49 from samba.tests.krb5.rfc4120_constants import (
52 FX_FAST_ARMOR_AP_REQUEST,
54 KDC_ERR_PREAUTH_FAILED,
55 KDC_ERR_UNKNOWN_CRITICAL_FAST_OPTIONS,
56 KERB_ERR_TYPE_EXTENDED,
74 KU_NON_KERB_CKSUM_SALT,
75 KU_TGS_REP_ENC_PART_SESSION,
76 KU_TGS_REP_ENC_PART_SUB_KEY,
78 KU_TGS_REQ_AUTH_CKSUM,
79 KU_TGS_REQ_AUTH_DAT_SESSION,
80 KU_TGS_REQ_AUTH_DAT_SUBKEY,
85 PADATA_ENCRYPTED_CHALLENGE,
98 PADATA_SUPPORTED_ETYPES
100 import samba.tests.krb5.kcrypto as kcrypto
103 def BitStringEncoder_encodeValue32(
104 self, value, asn1Spec, encodeFun, **options):
106 # BitStrings like KDCOptions or TicketFlags should at least
107 # be 32-Bit on the wire
109 if asn1Spec is not None:
110 # TODO: try to avoid ASN.1 schema instantiation
111 value = asn1Spec.clone(value)
113 valueLength = len(value)
115 alignedValue = value << (8 - valueLength % 8)
119 substrate = alignedValue.asOctets()
120 length = len(substrate)
121 # We need at least 32-Bit / 4-Bytes
126 ret = b'\x00' + substrate + (b'\x00' * padding)
127 return ret, False, True
130 BitStringEncoder.encodeValue = BitStringEncoder_encodeValue32
133 def BitString_NamedValues_prettyPrint(self, scope=0):
134 ret = "%s" % self.asBinary()
137 for byte in self.asNumbers():
138 for bit in [7, 6, 5, 4, 3, 2, 1, 0]:
145 if len(bits) < highest_bit:
146 for bitPosition in range(len(bits), highest_bit):
149 delim = ": (\n%s " % indent
150 for bitPosition in range(highest_bit):
151 if bitPosition in self.prettyPrintNamedValues:
152 name = self.prettyPrintNamedValues[bitPosition]
153 elif bits[bitPosition] != 0:
154 name = "unknown-bit-%u" % bitPosition
157 ret += "%s%s:%u" % (delim, name, bits[bitPosition])
158 delim = ",\n%s " % indent
159 ret += "\n%s)" % indent
# Install the named-bit tables and the shared pretty printer on every
# flag-style type.  Each type gets three attributes: prettyPrintNamedValues,
# namedValues (both taken from the matching *Values class) and prettyPrint.
for _bits_type, _bits_values in (
        (krb5_asn1.TicketFlags, krb5_asn1.TicketFlagsValues),
        (krb5_asn1.KDCOptions, krb5_asn1.KDCOptionsValues),
        (krb5_asn1.APOptions, krb5_asn1.APOptionsValues),
        (krb5_asn1.PACOptionFlags, krb5_asn1.PACOptionFlagsValues)):
    _bits_type.prettyPrintNamedValues = _bits_values.namedValues
    _bits_type.namedValues = _bits_values.namedValues
    _bits_type.prettyPrint = BitString_NamedValues_prettyPrint
# Do not leave the loop variables behind in the module namespace.
del _bits_type, _bits_values
189 def Integer_NamedValues_prettyPrint(self, scope=0):
191 if intval in self.prettyPrintNamedValues:
192 name = self.prettyPrintNamedValues[intval]
194 name = "<__unknown__>"
195 ret = "%d (0x%x) %s" % (intval, intval, name)
# Install the value-name tables and the shared pretty printer on every
# enumerated integer type.  Each type gets prettyPrintNamedValues (taken
# from the matching *Values class) and prettyPrint.
for _int_type, _int_values in (
        (krb5_asn1.NameType, krb5_asn1.NameTypeValues),
        (krb5_asn1.AuthDataType, krb5_asn1.AuthDataTypeValues),
        (krb5_asn1.PADataType, krb5_asn1.PADataTypeValues),
        (krb5_asn1.EncryptionType, krb5_asn1.EncryptionTypeValues),
        (krb5_asn1.ChecksumType, krb5_asn1.ChecksumTypeValues),
        (krb5_asn1.KerbErrorDataType, krb5_asn1.KerbErrorDataTypeValues)):
    _int_type.prettyPrintNamedValues = _int_values.namedValues
    _int_type.prettyPrint = Integer_NamedValues_prettyPrint
# Do not leave the loop variables behind in the module namespace.
del _int_type, _int_values
225 class Krb5EncryptionKey:
226 def __init__(self, key, kvno):
228 kcrypto.Enctype.AES256: kcrypto.Cksumtype.SHA1_AES256,
229 kcrypto.Enctype.AES128: kcrypto.Cksumtype.SHA1_AES128,
230 kcrypto.Enctype.RC4: kcrypto.Cksumtype.HMAC_MD5,
233 self.etype = key.enctype
234 self.ctype = EncTypeChecksum[self.etype]
237 def encrypt(self, usage, plaintext):
238 ciphertext = kcrypto.encrypt(self.key, usage, plaintext)
241 def decrypt(self, usage, ciphertext):
242 plaintext = kcrypto.decrypt(self.key, usage, ciphertext)
245 def make_zeroed_checksum(self, ctype=None):
249 checksum_len = kcrypto.checksum_len(ctype)
250 return bytes(checksum_len)
252 def make_checksum(self, usage, plaintext, ctype=None):
255 cksum = kcrypto.make_checksum(ctype, self.key, usage, plaintext)
258 def verify_checksum(self, usage, plaintext, ctype, cksum):
259 if self.ctype != ctype:
260 raise AssertionError(f'key checksum type ({self.ctype}) != '
261 f'checksum type ({ctype})')
263 kcrypto.verify_checksum(ctype,
269 def export_obj(self):
270 EncryptionKey_obj = {
271 'keytype': self.etype,
272 'keyvalue': self.key.contents,
274 return EncryptionKey_obj
277 class RodcPacEncryptionKey(Krb5EncryptionKey):
278 def __init__(self, key, kvno, rodc_id=None):
279 super().__init__(key, kvno)
285 kvno &= (1 << 16) - 1
287 rodc_id = kvno or None
289 if rodc_id is not None:
290 self.rodc_id = rodc_id.to_bytes(2, byteorder='little')
def make_rodc_zeroed_checksum(self, ctype=None):
    """Return an all-zero checksum followed by a zeroed RODC-id suffix.

    The suffix has the same length as self.rodc_id.
    """
    zeroed = super().make_zeroed_checksum(ctype)
    suffix = bytes(len(self.rodc_id))
    return zeroed + suffix
def make_rodc_checksum(self, usage, plaintext, ctype=None):
    """Return the checksum of plaintext with self.rodc_id appended."""
    base_checksum = super().make_checksum(usage, plaintext, ctype)
    base_checksum += self.rodc_id
    return base_checksum
302 def verify_rodc_checksum(self, usage, plaintext, ctype, cksum):
304 cksum, cksum_rodc_id = cksum[:-2], cksum[-2:]
306 if self.rodc_id != cksum_rodc_id:
307 raise AssertionError(f'{self.rodc_id.hex()} != '
308 f'{cksum_rodc_id.hex()}')
310 super().verify_checksum(usage,
class ZeroedChecksumKey(RodcPacEncryptionKey):
    """A key whose checksums are always the zeroed placeholder.

    The usage and plaintext arguments are accepted but ignored; only the
    checksum type influences the result.
    """

    def make_checksum(self, usage, plaintext, ctype=None):
        """Return the zeroed checksum for ctype instead of a real one."""
        zeroed = self.make_zeroed_checksum(ctype)
        return zeroed

    def make_rodc_checksum(self, usage, plaintext, ctype=None):
        """Return the zeroed RODC checksum for ctype instead of a real one."""
        zeroed = self.make_rodc_zeroed_checksum(ctype)
        return zeroed
324 class WrongLengthChecksumKey(RodcPacEncryptionKey):
def __init__(self, key, kvno, length):
    """Initialise the key and remember the checksum length to produce.

    key and kvno are forwarded unchanged to the parent constructor.
    """
    super().__init__(key, kvno)

    # Length (in bytes) that every checksum made by this key is forced to.
    self._length = length
331 def _adjust_to_length(cls, checksum, length):
332 diff = length - len(checksum)
334 checksum += bytes(diff)
336 checksum = checksum[:length]
def make_zeroed_checksum(self, ctype=None):
    """Return self._length zero bytes; ctype is ignored."""
    forced_length = self._length
    return bytes(forced_length)
def make_checksum(self, usage, plaintext, ctype=None):
    """Compute the parent's checksum, then force it to self._length."""
    genuine = super().make_checksum(usage, plaintext, ctype)
    wanted_length = self._length
    return self._adjust_to_length(genuine, wanted_length)
def make_rodc_zeroed_checksum(self, ctype=None):
    """Return self._length zero bytes; ctype is ignored."""
    forced_length = self._length
    return bytes(forced_length)
def make_rodc_checksum(self, usage, plaintext, ctype=None):
    """Compute the parent's RODC checksum, then force it to self._length."""
    genuine = super().make_rodc_checksum(usage, plaintext, ctype)
    wanted_length = self._length
    return self._adjust_to_length(genuine, wanted_length)
355 class KerberosCredentials(Credentials):
357 fast_supported_bits = (security.KERB_ENCTYPE_FAST_SUPPORTED |
358 security.KERB_ENCTYPE_COMPOUND_IDENTITY_SUPPORTED |
359 security.KERB_ENCTYPE_CLAIMS_SUPPORTED)
362 super(KerberosCredentials, self).__init__()
364 all_enc_types |= security.KERB_ENCTYPE_RC4_HMAC_MD5
365 all_enc_types |= security.KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96
366 all_enc_types |= security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96
368 self.as_supported_enctypes = all_enc_types
369 self.tgs_supported_enctypes = all_enc_types
370 self.ap_supported_enctypes = all_enc_types
373 self.forced_keys = {}
375 self.forced_salt = None
def set_as_supported_enctypes(self, value):
    """Store value, converted with int(), as as_supported_enctypes."""
    enctype_bits = int(value)
    self.as_supported_enctypes = enctype_bits
def set_tgs_supported_enctypes(self, value):
    """Store value, converted with int(), as tgs_supported_enctypes."""
    enctype_bits = int(value)
    self.tgs_supported_enctypes = enctype_bits
def set_ap_supported_enctypes(self, value):
    """Store value, converted with int(), as ap_supported_enctypes."""
    enctype_bits = int(value)
    self.ap_supported_enctypes = enctype_bits
390 etype_map = collections.OrderedDict([
391 (kcrypto.Enctype.AES256,
392 security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96),
393 (kcrypto.Enctype.AES128,
394 security.KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96),
395 (kcrypto.Enctype.RC4,
396 security.KERB_ENCTYPE_RC4_HMAC_MD5),
397 (kcrypto.Enctype.DES_MD5,
398 security.KERB_ENCTYPE_DES_CBC_MD5),
399 (kcrypto.Enctype.DES_CRC,
400 security.KERB_ENCTYPE_DES_CBC_CRC)
404 def etypes_to_bits(cls, etypes):
407 bit = cls.etype_map[etype]
409 raise ValueError(f'Got duplicate etype: {etype}')
415 def bits_to_etypes(cls, bits):
417 for etype, bit in cls.etype_map.items():
422 bits &= ~cls.fast_supported_bits
424 raise ValueError(f'Unsupported etype bits: {bits}')
def get_as_krb5_etypes(self):
    """Return bits_to_etypes() applied to as_supported_enctypes."""
    supported_bits = self.as_supported_enctypes
    return self.bits_to_etypes(supported_bits)
def get_tgs_krb5_etypes(self):
    """Return bits_to_etypes() applied to tgs_supported_enctypes."""
    supported_bits = self.tgs_supported_enctypes
    return self.bits_to_etypes(supported_bits)
def get_ap_krb5_etypes(self):
    """Return bits_to_etypes() applied to ap_supported_enctypes."""
    supported_bits = self.ap_supported_enctypes
    return self.bits_to_etypes(supported_bits)
437 def set_kvno(self, kvno):
438 # Sign-extend from 32 bits.
446 def set_forced_key(self, etype, hexkey):
448 contents = binascii.a2b_hex(hexkey)
449 key = kcrypto.Key(etype, contents)
450 self.forced_keys[etype] = RodcPacEncryptionKey(key, self.kvno)
452 def get_forced_key(self, etype):
454 return self.forced_keys.get(etype)
def set_forced_salt(self, salt):
    """Store a bytes() copy of salt as the forced salt."""
    salt_bytes = bytes(salt)
    self.forced_salt = salt_bytes
def get_forced_salt(self):
    """Return the stored forced_salt attribute unchanged."""
    forced = self.forced_salt
    return forced
463 if self.forced_salt is not None:
464 return self.forced_salt
468 salt_name = upn.rsplit('@', 1)[0].replace('/', '')
470 salt_name = self.get_username()
472 if self.get_workstation():
473 salt_name = self.get_username().lower()
474 if salt_name[-1] == '$':
475 salt_name = salt_name[:-1]
476 salt_string = '%shost%s.%s' % (
477 self.get_realm().upper(),
479 self.get_realm().lower())
481 salt_string = self.get_realm().upper() + salt_name
483 return salt_string.encode('utf-8')
485 def set_dn(self, dn):
491 def set_spn(self, spn):
497 def set_upn(self, upn):
def update_password(self, password):
    """Set a new password and advance the key version number by one."""
    self.set_password(password)
    # The kvno is read only after the password has been set.
    next_kvno = self.get_kvno() + 1
    self.set_kvno(next_kvno)
508 class KerberosTicketCreds:
509 def __init__(self, ticket, session_key,
510 crealm=None, cname=None,
511 srealm=None, sname=None,
514 encpart_private=None):
516 self.session_key = session_key
521 self.decryption_key = decryption_key
522 self.ticket_private = ticket_private
523 self.encpart_private = encpart_private
525 def set_sname(self, sname):
526 self.ticket['sname'] = sname
530 class RawKerberosTest(TestCaseInTempDir):
531 """A raw Kerberos Test case."""
533 class KpasswdMode(Enum):
537 pac_checksum_types = {krb5pac.PAC_TYPE_SRV_CHECKSUM,
538 krb5pac.PAC_TYPE_KDC_CHECKSUM,
539 krb5pac.PAC_TYPE_TICKET_CHECKSUM}
{"value": -1111, "name": "dummy", },
# Each name labels the enctype stored next to it; the names are used to
# build the permutation names, so they must describe the actual value.
# (Previously the AES256 entry was named "aes128" and vice versa.)
{"value": kcrypto.Enctype.AES256, "name": "aes256", },
{"value": kcrypto.Enctype.AES128, "name": "aes128", },
{"value": kcrypto.Enctype.RC4, "name": "rc4", },
548 setup_etype_test_permutations_done = False
551 def setup_etype_test_permutations(cls):
552 if cls.setup_etype_test_permutations_done:
557 num_idxs = len(cls.etypes_to_test)
559 for num in range(1, num_idxs + 1):
560 chunk = list(itertools.permutations(range(num_idxs), num))
563 permutations.append(el)
565 for p in permutations:
569 n = cls.etypes_to_test[idx]["name"]
574 etypes += (cls.etypes_to_test[idx]["value"],)
576 r = {"name": name, "etypes": etypes, }
579 cls.etype_test_permutations = res
580 cls.setup_etype_test_permutations_done = True
583 def etype_test_permutation_name_idx(cls):
584 cls.setup_etype_test_permutations()
587 for e in cls.etype_test_permutations:
def etype_test_permutation_by_idx(self, idx):
    """Return the (name, etypes) pair of the permutation at index idx.

    The permutation table is built on demand, mirroring
    etype_test_permutation_name_idx(), so this lookup also works when it
    is the first permutation helper to be called.  The setup call returns
    early once the table exists.

    Raises IndexError if idx is outside the permutation table.
    """
    self.setup_etype_test_permutations()
    entry = self.etype_test_permutations[idx]
    return (entry['name'], entry['etypes'])
601 cls.host = samba.tests.env_get_var_value('SERVER')
602 cls.dc_host = samba.tests.env_get_var_value('DC_SERVER')
604 # A dictionary containing credentials that have already been
608 kdc_fast_support = samba.tests.env_get_var_value('FAST_SUPPORT',
610 if kdc_fast_support is None:
611 kdc_fast_support = '0'
612 cls.kdc_fast_support = bool(int(kdc_fast_support))
614 tkt_sig_support = samba.tests.env_get_var_value('TKT_SIG_SUPPORT',
616 if tkt_sig_support is None:
617 tkt_sig_support = '0'
618 cls.tkt_sig_support = bool(int(tkt_sig_support))
620 expect_pac = samba.tests.env_get_var_value('EXPECT_PAC',
622 if expect_pac is None:
624 cls.expect_pac = bool(int(expect_pac))
626 expect_extra_pac_buffers = samba.tests.env_get_var_value(
627 'EXPECT_EXTRA_PAC_BUFFERS',
629 if expect_extra_pac_buffers is None:
630 expect_extra_pac_buffers = '1'
631 cls.expect_extra_pac_buffers = bool(int(expect_extra_pac_buffers))
635 self.do_asn1_print = False
636 self.do_hexdump = False
638 strict_checking = samba.tests.env_get_var_value('STRICT_CHECKING',
640 if strict_checking is None:
641 strict_checking = '1'
642 self.strict_checking = bool(int(strict_checking))
646 self.unspecified_kvno = object()
649 self._disconnect("tearDown")
652 def _disconnect(self, reason):
658 sys.stderr.write("disconnect[%s]\n" % reason)
660 def _connect_tcp(self, host, port=None):
664 self.a = socket.getaddrinfo(host, port, socket.AF_UNSPEC,
665 socket.SOCK_STREAM, socket.SOL_TCP,
667 self.s = socket.socket(self.a[0][0], self.a[0][1], self.a[0][2])
668 self.s.settimeout(10)
669 self.s.connect(self.a[0][4])
677 def connect(self, host, port=None):
678 self.assertNotConnected()
679 self._connect_tcp(host, port)
681 sys.stderr.write("connected[%s]\n" % host)
683 def env_get_var(self, varname, prefix,
684 fallback_default=True,
685 allow_missing=False):
687 if prefix is not None:
688 allow_missing_prefix = allow_missing or fallback_default
689 val = samba.tests.env_get_var_value(
690 '%s_%s' % (prefix, varname),
691 allow_missing=allow_missing_prefix)
693 fallback_default = True
694 if val is None and fallback_default:
695 val = samba.tests.env_get_var_value(varname,
696 allow_missing=allow_missing)
699 def _get_krb5_creds_from_env(self, prefix,
700 default_username=None,
701 allow_missing_password=False,
702 allow_missing_keys=True,
703 require_strongest_key=False):
704 c = KerberosCredentials()
707 domain = self.env_get_var('DOMAIN', prefix)
708 realm = self.env_get_var('REALM', prefix)
709 allow_missing_username = default_username is not None
710 username = self.env_get_var('USERNAME', prefix,
711 fallback_default=False,
712 allow_missing=allow_missing_username)
714 username = default_username
715 password = self.env_get_var('PASSWORD', prefix,
716 fallback_default=False,
717 allow_missing=allow_missing_password)
720 c.set_username(username)
721 if password is not None:
722 c.set_password(password)
723 as_supported_enctypes = self.env_get_var('AS_SUPPORTED_ENCTYPES',
724 prefix, allow_missing=True)
725 if as_supported_enctypes is not None:
726 c.set_as_supported_enctypes(as_supported_enctypes)
727 tgs_supported_enctypes = self.env_get_var('TGS_SUPPORTED_ENCTYPES',
728 prefix, allow_missing=True)
729 if tgs_supported_enctypes is not None:
730 c.set_tgs_supported_enctypes(tgs_supported_enctypes)
731 ap_supported_enctypes = self.env_get_var('AP_SUPPORTED_ENCTYPES',
732 prefix, allow_missing=True)
733 if ap_supported_enctypes is not None:
734 c.set_ap_supported_enctypes(ap_supported_enctypes)
736 if require_strongest_key:
737 kvno_allow_missing = False
739 aes256_allow_missing = False
741 aes256_allow_missing = True
743 kvno_allow_missing = allow_missing_keys
744 aes256_allow_missing = allow_missing_keys
745 kvno = self.env_get_var('KVNO', prefix,
746 fallback_default=False,
747 allow_missing=kvno_allow_missing)
749 c.set_kvno(int(kvno))
750 aes256_key = self.env_get_var('AES256_KEY_HEX', prefix,
751 fallback_default=False,
752 allow_missing=aes256_allow_missing)
753 if aes256_key is not None:
754 c.set_forced_key(kcrypto.Enctype.AES256, aes256_key)
755 aes128_key = self.env_get_var('AES128_KEY_HEX', prefix,
756 fallback_default=False,
758 if aes128_key is not None:
759 c.set_forced_key(kcrypto.Enctype.AES128, aes128_key)
760 rc4_key = self.env_get_var('RC4_KEY_HEX', prefix,
761 fallback_default=False, allow_missing=True)
762 if rc4_key is not None:
763 c.set_forced_key(kcrypto.Enctype.RC4, rc4_key)
765 if not allow_missing_keys:
766 self.assertTrue(c.forced_keys,
767 'Please supply %s encryption keys '
768 'in environment' % prefix)
772 def _get_krb5_creds(self,
774 default_username=None,
775 allow_missing_password=False,
776 allow_missing_keys=True,
777 require_strongest_key=False,
778 fallback_creds_fn=None):
779 if prefix in self.creds_dict:
780 return self.creds_dict[prefix]
782 # We don't have the credentials already
786 # Try to obtain them from the environment
787 creds = self._get_krb5_creds_from_env(
789 default_username=default_username,
790 allow_missing_password=allow_missing_password,
791 allow_missing_keys=allow_missing_keys,
792 require_strongest_key=require_strongest_key)
793 except Exception as err:
794 # An error occurred, so save it for later
797 self.assertIsNotNone(creds)
798 # Save the obtained credentials
799 self.creds_dict[prefix] = creds
802 if fallback_creds_fn is not None:
804 # Try to use the fallback method
805 creds = fallback_creds_fn()
806 except Exception as err:
807 print("ERROR FROM ENV: %r" % (env_err))
808 print("FALLBACK-FN: %s" % (fallback_creds_fn))
809 print("FALLBACK-ERROR: %r" % (err))
811 self.assertIsNotNone(creds)
812 # Save the obtained credentials
813 self.creds_dict[prefix] = creds
816 # Both methods failed, so raise the exception from the
820 def get_user_creds(self,
821 allow_missing_password=False,
822 allow_missing_keys=True):
823 c = self._get_krb5_creds(prefix=None,
824 allow_missing_password=allow_missing_password,
825 allow_missing_keys=allow_missing_keys)
828 def get_service_creds(self,
829 allow_missing_password=False,
830 allow_missing_keys=True):
831 c = self._get_krb5_creds(prefix='SERVICE',
832 allow_missing_password=allow_missing_password,
833 allow_missing_keys=allow_missing_keys)
836 def get_client_creds(self,
837 allow_missing_password=False,
838 allow_missing_keys=True):
839 c = self._get_krb5_creds(prefix='CLIENT',
840 allow_missing_password=allow_missing_password,
841 allow_missing_keys=allow_missing_keys)
844 def get_server_creds(self,
845 allow_missing_password=False,
846 allow_missing_keys=True):
847 c = self._get_krb5_creds(prefix='SERVER',
848 allow_missing_password=allow_missing_password,
849 allow_missing_keys=allow_missing_keys)
852 def get_admin_creds(self,
853 allow_missing_password=False,
854 allow_missing_keys=True):
855 c = self._get_krb5_creds(prefix='ADMIN',
856 allow_missing_password=allow_missing_password,
857 allow_missing_keys=allow_missing_keys)
858 c.set_gensec_features(c.get_gensec_features() | FEATURE_SEAL)
859 c.set_workstation('')
862 def get_rodc_krbtgt_creds(self,
864 require_strongest_key=False):
865 if require_strongest_key:
866 self.assertTrue(require_keys)
867 c = self._get_krb5_creds(prefix='RODC_KRBTGT',
868 allow_missing_password=True,
869 allow_missing_keys=not require_keys,
870 require_strongest_key=require_strongest_key)
873 def get_krbtgt_creds(self,
875 require_strongest_key=False):
876 if require_strongest_key:
877 self.assertTrue(require_keys)
878 c = self._get_krb5_creds(prefix='KRBTGT',
879 default_username='krbtgt',
880 allow_missing_password=True,
881 allow_missing_keys=not require_keys,
882 require_strongest_key=require_strongest_key)
885 def get_anon_creds(self):
890 def asn1_dump(self, name, obj, asn1_print=None):
891 if asn1_print is None:
892 asn1_print = self.do_asn1_print
895 sys.stderr.write("%s:\n%s" % (name, obj))
897 sys.stderr.write("%s" % (obj))
899 def hex_dump(self, name, blob, hexdump=None):
901 hexdump = self.do_hexdump
904 "%s: %d\n%s" % (name, len(blob), self.hexdump(blob)))
913 if asn1Spec is not None:
914 class_name = type(asn1Spec).__name__.split(':')[0]
916 class_name = "<None-asn1Spec>"
917 self.hex_dump(class_name, blob, hexdump=hexdump)
918 obj, _ = pyasn1_der_decode(blob, asn1Spec=asn1Spec)
919 self.asn1_dump(None, obj, asn1_print=asn1_print)
921 obj = pyasn1_native_encode(obj)
932 obj = pyasn1_native_decode(obj, asn1Spec=asn1Spec)
933 class_name = type(obj).__name__.split(':')[0]
934 if class_name is not None:
935 self.asn1_dump(None, obj, asn1_print=asn1_print)
936 blob = pyasn1_der_encode(obj)
937 if class_name is not None:
938 self.hex_dump(class_name, blob, hexdump=hexdump)
def send_pdu(self, req, asn1_print=None, hexdump=None):
    """DER-encode req and hand the encoded bytes to send_msg().

    asn1_print is forwarded to the encoder; hexdump is forwarded to
    send_msg() only.
    """
    # Hex dumping is switched off for the encoding step itself.
    encode_options = {
        'native_decode': False,
        'asn1_print': asn1_print,
        'hexdump': False,
    }
    encoded = self.der_encode(req, **encode_options)
    self.send_msg(encoded, hexdump=hexdump)
946 def send_msg(self, msg, hexdump=None):
947 header = struct.pack('>I', len(msg))
950 self.hex_dump("send_msg", header, hexdump=hexdump)
951 self.hex_dump("send_msg", msg, hexdump=hexdump)
955 sent = self.s.send(req_pdu, 0)
956 if sent == len(req_pdu):
958 req_pdu = req_pdu[sent:]
959 except socket.error as e:
960 self._disconnect("send_msg: %s" % e)
963 self._disconnect("send_msg: %s" % e)
966 def recv_raw(self, num_recv=0xffff, hexdump=None, timeout=None):
969 if timeout is not None:
970 self.s.settimeout(timeout)
971 rep_pdu = self.s.recv(num_recv, 0)
972 self.s.settimeout(10)
973 if len(rep_pdu) == 0:
974 self._disconnect("recv_raw: EOF")
976 self.hex_dump("recv_raw", rep_pdu, hexdump=hexdump)
977 except socket.timeout:
978 self.s.settimeout(10)
979 sys.stderr.write("recv_raw: TIMEOUT\n")
980 except socket.error as e:
981 self._disconnect("recv_raw: %s" % e)
984 self._disconnect("recv_raw: %s" % e)
988 def recv_pdu_raw(self, asn1_print=None, hexdump=None, timeout=None):
989 raw_pdu = self.recv_raw(
990 num_recv=4, hexdump=hexdump, timeout=timeout)
993 header = struct.unpack(">I", raw_pdu[0:4])
1000 raw_pdu = self.recv_raw(
1001 num_recv=missing, hexdump=hexdump, timeout=timeout)
1002 self.assertGreaterEqual(len(raw_pdu), 1)
1004 missing = k5_len - len(rep_pdu)
1007 def recv_reply(self, asn1_print=None, hexdump=None, timeout=None):
1008 rep_pdu = self.recv_pdu_raw(asn1_print=asn1_print,
1012 return None, rep_pdu
1013 k5_raw = self.der_decode(
1016 native_encode=False,
1019 pvno = k5_raw['field-0']
1020 self.assertEqual(pvno, 5)
1021 msg_type = k5_raw['field-1']
1022 self.assertIn(msg_type, [KRB_AS_REP, KRB_TGS_REP, KRB_ERROR])
1023 if msg_type == KRB_AS_REP:
1024 asn1Spec = krb5_asn1.AS_REP()
1025 elif msg_type == KRB_TGS_REP:
1026 asn1Spec = krb5_asn1.TGS_REP()
1027 elif msg_type == KRB_ERROR:
1028 asn1Spec = krb5_asn1.KRB_ERROR()
1029 rep = self.der_decode(rep_pdu, asn1Spec=asn1Spec,
1030 asn1_print=asn1_print, hexdump=False)
1031 return (rep, rep_pdu)
1033 def recv_pdu(self, asn1_print=None, hexdump=None, timeout=None):
1034 (rep, rep_pdu) = self.recv_reply(asn1_print=asn1_print,
def assertIsConnected(self):
    """Fail with "Not connected" if self.s is None."""
    current_socket = self.s
    self.assertIsNotNone(current_socket, msg="Not connected")
def assertNotConnected(self):
    """Fail with "Is connected" if self.s is not None."""
    current_socket = self.s
    self.assertIsNone(current_socket, msg="Is connected")
1045 def send_recv_transaction(
1052 host = self.host if to_rodc else self.dc_host
1055 self.send_pdu(req, asn1_print=asn1_print, hexdump=hexdump)
1056 rep = self.recv_pdu(
1057 asn1_print=asn1_print, hexdump=hexdump, timeout=timeout)
1059 self._disconnect("transaction failed")
1061 self._disconnect("transaction done")
def assertNoValue(self, value):
    """Assert that value.isNoValue is true."""
    is_unset = value.isNoValue
    self.assertTrue(is_unset)
def assertHasValue(self, value):
    """Assert that value is not None."""
    candidate = value
    self.assertIsNotNone(candidate)
def getElementValue(self, obj, elem):
    """Return obj.get(elem): the element's value, or None if it is absent."""
    found = obj.get(elem)
    return found
def assertElementMissing(self, obj, elem):
    """Assert that looking up elem in obj yields None."""
    found = self.getElementValue(obj, elem)
    self.assertIsNone(found)
1077 def assertElementPresent(self, obj, elem, expect_empty=False):
1078 v = self.getElementValue(obj, elem)
1079 self.assertIsNotNone(v)
1080 if self.strict_checking:
1081 if isinstance(v, collections.abc.Container):
1083 self.assertEqual(0, len(v))
1085 self.assertNotEqual(0, len(v))
def assertElementEqual(self, obj, elem, value):
    """Assert that elem is present in obj and equal to value."""
    found = self.getElementValue(obj, elem)
    # Presence is checked first, then equality.
    self.assertIsNotNone(found)
    self.assertEqual(found, value)
def assertElementEqualUTF8(self, obj, elem, value):
    """Assert that elem is present and equals value encoded as UTF-8."""
    found = self.getElementValue(obj, elem)
    self.assertIsNotNone(found)
    encoded_expectation = bytes(value, 'utf8')
    self.assertEqual(found, encoded_expectation)
1097 def assertPrincipalEqual(self, princ1, princ2):
1098 self.assertEqual(princ1['name-type'], princ2['name-type'])
1100 len(princ1['name-string']),
1101 len(princ2['name-string']),
1102 msg="princ1=%s != princ2=%s" % (princ1, princ2))
1103 for idx in range(len(princ1['name-string'])):
1105 princ1['name-string'][idx],
1106 princ2['name-string'][idx],
1107 msg="princ1=%s != princ2=%s" % (princ1, princ2))
def assertElementEqualPrincipal(self, obj, elem, value):
    """Assert that elem is present in obj and names the principal value.

    The element is converted with the native decoder, using a
    PrincipalName spec, before being compared via assertPrincipalEqual().
    """
    found = self.getElementValue(obj, elem)
    self.assertIsNotNone(found)
    principal_spec = krb5_asn1.PrincipalName()
    decoded = pyasn1_native_decode(found, asn1Spec=principal_spec)
    self.assertPrincipalEqual(decoded, value)
1115 def assertElementKVNO(self, obj, elem, value):
1116 v = self.getElementValue(obj, elem)
1117 if value == "autodetect":
1119 if value is not None:
1120 self.assertIsNotNone(v)
1121 # The value on the wire should never be 0
1122 self.assertNotEqual(v, 0)
1123 # unspecified_kvno means we don't know the kvno,
1124 # but want to enforce its presence
1125 if value is not self.unspecified_kvno:
1127 self.assertNotEqual(value, 0)
1128 self.assertEqual(v, value)
1130 self.assertIsNone(v)
1132 def assertElementFlags(self, obj, elem, expected, unexpected):
1133 v = self.getElementValue(obj, elem)
1134 self.assertIsNotNone(v)
1135 if expected is not None:
1136 self.assertIsInstance(expected, krb5_asn1.TicketFlags)
1137 for i, flag in enumerate(expected):
1139 self.assertEqual('1', v[i],
1140 f"'{expected.namedValues[i]}' "
1142 if unexpected is not None:
1143 self.assertIsInstance(unexpected, krb5_asn1.TicketFlags)
1144 for i, flag in enumerate(unexpected):
1146 self.assertEqual('0', v[i],
1147 f"'{unexpected.namedValues[i]}' "
1148 f"unexpected in {v}")
1150 def assertSequenceElementsEqual(self, expected, got, *,
1151 require_strict=None,
1152 require_ordered=True):
1153 if self.strict_checking and require_ordered:
1154 self.assertEqual(expected, got)
1156 fail_msg = f'expected: {expected} got: {got}'
1158 if not self.strict_checking and require_strict is not None:
1159 fail_msg += f' (ignoring: {require_strict})'
1160 expected = (x for x in expected if x not in require_strict)
1161 got = (x for x in got if x not in require_strict)
1163 self.assertCountEqual(expected, got, fail_msg)
1165 def get_KerberosTimeWithUsec(self, epoch=None, offset=None):
1168 if offset is not None:
1169 epoch = epoch + int(offset)
1170 dt = datetime.datetime.fromtimestamp(epoch, tz=datetime.timezone.utc)
1171 return (dt.strftime("%Y%m%d%H%M%SZ"), dt.microsecond)
1173 def get_KerberosTime(self, epoch=None, offset=None):
1174 (s, _) = self.get_KerberosTimeWithUsec(epoch=epoch, offset=offset)
1177 def get_EpochFromKerberosTime(self, kerberos_time):
1178 if isinstance(kerberos_time, bytes):
1179 kerberos_time = kerberos_time.decode()
1181 epoch = datetime.datetime.strptime(kerberos_time,
1183 epoch = epoch.replace(tzinfo=datetime.timezone.utc)
1184 epoch = int(epoch.timestamp())
1188 def get_Nonce(self):
1189 nonce_min = 0x7f000000
1190 nonce_max = 0x7fffffff
1191 v = random.randint(nonce_min, nonce_max)
1194 def get_pa_dict(self, pa_data):
1197 if pa_data is not None:
1199 pa_type = pa['padata-type']
1200 if pa_type in pa_dict:
1201 raise RuntimeError(f'Duplicate type {pa_type}')
1202 pa_dict[pa_type] = pa['padata-value']
def SessionKey_create(self, etype, contents, kvno=None):
    """Wrap raw key contents of the given etype in a RodcPacEncryptionKey."""
    raw_key = kcrypto.Key(etype, contents)
    wrapped_key = RodcPacEncryptionKey(raw_key, kvno)
    return wrapped_key
1210 def PasswordKey_create(self, etype=None, pwd=None, salt=None, kvno=None,
1212 self.assertIsNotNone(pwd)
1213 self.assertIsNotNone(salt)
1214 key = kcrypto.string_to_key(etype, pwd, salt, params=params)
1215 return RodcPacEncryptionKey(key, kvno)
def PasswordKey_from_etype_info2(self, creds, etype_info2, kvno=None):
    """Build the key described by an etype-info2 entry from creds.

    For RC4 the key contents are the credentials' NT hash.  For any other
    etype the key is derived from the password, using the entry's 'salt'
    and 's2kparams' values when present.
    """
    enctype = etype_info2['etype']
    entry_salt = etype_info2.get('salt')

    if e_is_rc4 := (enctype == kcrypto.Enctype.RC4):
        # RC4 needs neither salt nor string-to-key parameters.
        return self.SessionKey_create(etype=enctype,
                                      contents=creds.get_nt_hash(),
                                      kvno=kvno)

    s2k_params = etype_info2.get('s2kparams')
    return self.PasswordKey_create(etype=enctype,
                                   pwd=creds.get_password(),
                                   salt=entry_salt,
                                   kvno=kvno,
                                   params=s2k_params)
1232 def TicketDecryptionKey_from_creds(self, creds, etype=None):
1235 etypes = creds.get_tgs_krb5_etypes()
1239 etype = kcrypto.Enctype.RC4
1241 forced_key = creds.get_forced_key(etype)
1242 if forced_key is not None:
1245 kvno = creds.get_kvno()
1247 fail_msg = ("%s has no fixed key for etype[%s] kvno[%s] "
1248 "nor a password specified, " % (
1249 creds.get_username(), etype, kvno))
1251 if etype == kcrypto.Enctype.RC4:
1252 nthash = creds.get_nt_hash()
1253 self.assertIsNotNone(nthash, msg=fail_msg)
1254 return self.SessionKey_create(etype=etype,
1258 password = creds.get_password()
1259 self.assertIsNotNone(password, msg=fail_msg)
1260 salt = creds.get_salt()
1261 return self.PasswordKey_create(etype=etype,
def RandomKey(self, etype):
    """Create a key of the given etype from freshly generated random bytes.

    The number of bytes is the keysize of the etype's kcrypto profile.
    """
    profile = kcrypto._get_enctype_profile(etype)
    random_contents = samba.generate_random_bytes(profile.keysize)
    return self.SessionKey_create(etype=etype, contents=random_contents)
def EncryptionKey_import(self, EncryptionKey_obj):
    """Create a key from a dict with 'keytype' and 'keyvalue' entries."""
    key_type = EncryptionKey_obj['keytype']
    key_value = EncryptionKey_obj['keyvalue']
    return self.SessionKey_create(key_type, key_value)
1275 def EncryptedData_create(self, key, usage, plaintext):
1276 # EncryptedData ::= SEQUENCE {
1277 # etype [0] Int32 -- EncryptionType --,
1278 # kvno [1] Int32 OPTIONAL,
1279 # cipher [2] OCTET STRING -- ciphertext
1281 ciphertext = key.encrypt(usage, plaintext)
1282 EncryptedData_obj = {
1284 'cipher': ciphertext
1286 if key.kvno is not None:
1287 EncryptedData_obj['kvno'] = key.kvno
1288 return EncryptedData_obj
1290 def Checksum_create(self, key, usage, plaintext, ctype=None):
1291 # Checksum ::= SEQUENCE {
1292 # cksumtype [0] Int32,
1293 # checksum [1] OCTET STRING
1297 checksum = key.make_checksum(usage, plaintext, ctype=ctype)
1300 'checksum': checksum,
1305 def PrincipalName_create(cls, name_type, names):
1306 # PrincipalName ::= SEQUENCE {
1307 # name-type [0] Int32,
1308 # name-string [1] SEQUENCE OF KerberosString
1310 PrincipalName_obj = {
1311 'name-type': name_type,
1312 'name-string': names,
1314 return PrincipalName_obj
1316 def AuthorizationData_create(self, ad_type, ad_data):
1317 # AuthorizationData ::= SEQUENCE {
1318 # ad-type [0] Int32,
1319 # ad-data [1] OCTET STRING
1325 return AUTH_DATA_obj
def PA_DATA_create(self, padata_type, padata_value):
    """Return a PA-DATA dict holding ``padata_type`` and ``padata_value``."""
    # PA-DATA         ::= SEQUENCE {
    #         -- NOTE: first tag is [1], not [0]
    #         padata-type     [1] Int32,
    #         padata-value    [2] OCTET STRING -- might be encoded AP-REQ
    # }
    # NOTE(review): the assignment and the return statement were missing
    # from the damaged copy of the file; restored here.
    PA_DATA_obj = {
        'padata-type': padata_type,
        'padata-value': padata_value,
    }
    return PA_DATA_obj
def PA_ENC_TS_ENC_create(self, ts, usec):
    """Return a PA-ENC-TS-ENC dict for timestamp ``ts`` and ``usec``."""
    # PA-ENC-TS-ENC ::= SEQUENCE {
    #        patimestamp[0]          KerberosTime, -- client's time
    #        pausec[1]               krb5int32 OPTIONAL
    # }
    # NOTE(review): both entries were missing from the damaged copy of the
    # file; restored from the ASN.1 definition above. ``pausec`` is stored
    # unconditionally, as the surviving line count indicates.
    PA_ENC_TS_ENC_obj = {
        'patimestamp': ts,
        'pausec': usec,
    }
    return PA_ENC_TS_ENC_obj
def PA_PAC_OPTIONS_create(self, options):
    """Return a PA-PAC-OPTIONS dict wrapping ``options``."""
    # PA-PAC-OPTIONS  ::= SEQUENCE {
    #         options         [0] PACOptionFlags
    # }
    # NOTE(review): the single entry was missing from the damaged copy of
    # the file; restored from the ASN.1 definition above.
    PA_PAC_OPTIONS_obj = {
        'options': options,
    }
    return PA_PAC_OPTIONS_obj
def KRB_FAST_ARMOR_create(self, armor_type, armor_value):
    """Return a KrbFastArmor dict for ``armor_type`` and ``armor_value``."""
    # KrbFastArmor    ::= SEQUENCE {
    #         armor-type      [0] Int32,
    #         armor-value     [1] OCTET STRING,
    #         ...
    # }
    return {
        'armor-type': armor_type,
        'armor-value': armor_value,
    }
def KRB_FAST_REQ_create(self, fast_options, padata, req_body):
    """Return a KrbFastReq dict from its three components."""
    # KrbFastReq      ::= SEQUENCE {
    #         fast-options    [0] FastOptions,
    #         padata          [1] SEQUENCE OF PA-DATA,
    #         req-body        [2] KDC-REQ-BODY,
    #         ...
    # }
    KRB_FAST_REQ_obj = {
        'fast-options': fast_options,
        # NOTE(review): this entry was missing from the damaged copy of the
        # file, leaving the ``padata`` parameter unused; restored from the
        # ASN.1 definition above.
        'padata': padata,
        'req-body': req_body,
    }
    return KRB_FAST_REQ_obj
def KRB_FAST_ARMORED_REQ_create(self, armor, req_checksum, enc_fast_req):
    """Return a KrbFastArmoredReq dict.

    ``'armor'`` is added only when ``armor`` is not None.
    """
    # KrbFastArmoredReq ::= SEQUENCE {
    #         armor           [0] KrbFastArmor OPTIONAL,
    #         req-checksum    [1] Checksum,
    #         enc-fast-req    [2] EncryptedData -- KrbFastReq --
    # }
    armored_req = {
        'req-checksum': req_checksum,
        'enc-fast-req': enc_fast_req,
    }
    if armor is None:
        return armored_req
    armored_req['armor'] = armor
    return armored_req
def PA_FX_FAST_REQUEST_create(self, armored_data):
    """Return a PA-FX-FAST-REQUEST dict selecting ``armored-data``."""
    # PA-FX-FAST-REQUEST ::= CHOICE {
    #         armored-data    [0] KrbFastArmoredReq,
    #         ...
    # }
    return {'armored-data': armored_data}
def KERB_PA_PAC_REQUEST_create(self, include_pac, pa_data_create=True):
    """Build a KERB-PA-PAC-REQUEST, optionally wrapped as PA-DATA.

    :param include_pac: value stored under ``'include-pac'``.
    :param pa_data_create: when false, return the bare request dict; when
        true (the default), DER-encode it and wrap it in a PA-DATA element
        of type ``PADATA_PAC_REQUEST``.
    :return: the request dict or the PA-DATA dict, as selected above.
    """
    # KERB-PA-PAC-REQUEST ::= SEQUENCE {
    #         include-pac[0] BOOLEAN
    # }
    KERB_PA_PAC_REQUEST_obj = {
        'include-pac': include_pac,
    }
    if not pa_data_create:
        return KERB_PA_PAC_REQUEST_obj
    pa_pac = self.der_encode(KERB_PA_PAC_REQUEST_obj,
                             asn1Spec=krb5_asn1.KERB_PA_PAC_REQUEST())
    pa_data = self.PA_DATA_create(PADATA_PAC_REQUEST, pa_pac)
    # NOTE(review): this return was missing from the damaged copy of the
    # file, which made the PA-DATA path yield None; restored here.
    return pa_data
def get_pa_pac_options(self, options):
    """Return a PA-DATA element of type ``PADATA_PAC_OPTIONS``.

    ``options`` is wrapped with ``PA_PAC_OPTIONS_create``, DER-encoded
    against ``krb5_asn1.PA_PAC_OPTIONS`` and then placed in a PA-DATA dict.
    """
    pac_options = self.PA_PAC_OPTIONS_create(options)
    encoded_options = self.der_encode(pac_options,
                                      asn1Spec=krb5_asn1.PA_PAC_OPTIONS())
    # NOTE(review): the return statement was missing from the damaged copy
    # of the file; restored here.
    return self.PA_DATA_create(PADATA_PAC_OPTIONS, encoded_options)
1434 def KDC_REQ_BODY_create(self,
1446 EncAuthorizationData,
1447 EncAuthorizationData_key,
1448 EncAuthorizationData_usage,
1451 # KDC-REQ-BODY ::= SEQUENCE {
1452 # kdc-options [0] KDCOptions,
1453 # cname [1] PrincipalName OPTIONAL
1454 # -- Used only in AS-REQ --,
1457 # -- Also client's in AS-REQ --,
1458 # sname [3] PrincipalName OPTIONAL,
1459 # from [4] KerberosTime OPTIONAL,
1460 # till [5] KerberosTime,
1461 # rtime [6] KerberosTime OPTIONAL,
1463 # etype [8] SEQUENCE OF Int32
1465 # -- in preference order --,
1466 # addresses [9] HostAddresses OPTIONAL,
1467 # enc-authorization-data [10] EncryptedData OPTIONAL
1468 # -- AuthorizationData --,
1469 # additional-tickets [11] SEQUENCE OF Ticket OPTIONAL
1470 # -- NOTE: not empty
1472 if EncAuthorizationData is not None:
1473 enc_ad_plain = self.der_encode(
1474 EncAuthorizationData,
1475 asn1Spec=krb5_asn1.AuthorizationData(),
1476 asn1_print=asn1_print,
1478 enc_ad = self.EncryptedData_create(EncAuthorizationData_key,
1479 EncAuthorizationData_usage,
1483 KDC_REQ_BODY_obj = {
1484 'kdc-options': kdc_options,
1490 if cname is not None:
1491 KDC_REQ_BODY_obj['cname'] = cname
1492 if sname is not None:
1493 KDC_REQ_BODY_obj['sname'] = sname
1494 if from_time is not None:
1495 KDC_REQ_BODY_obj['from'] = from_time
1496 if renew_time is not None:
1497 KDC_REQ_BODY_obj['rtime'] = renew_time
1498 if addresses is not None:
1499 KDC_REQ_BODY_obj['addresses'] = addresses
1500 if enc_ad is not None:
1501 KDC_REQ_BODY_obj['enc-authorization-data'] = enc_ad
1502 if additional_tickets is not None:
1503 KDC_REQ_BODY_obj['additional-tickets'] = additional_tickets
1504 return KDC_REQ_BODY_obj
1506 def KDC_REQ_create(self,
1513 # KDC-REQ ::= SEQUENCE {
1514 # -- NOTE: first tag is [1], not [0]
1515 # pvno [1] INTEGER (5) ,
1516 # msg-type [2] INTEGER (10 -- AS -- | 12 -- TGS --),
1517 # padata [3] SEQUENCE OF PA-DATA OPTIONAL
1518 # -- NOTE: not empty --,
1519 # req-body [4] KDC-REQ-BODY
1524 'msg-type': msg_type,
1525 'req-body': req_body,
1527 if padata is not None:
1528 KDC_REQ_obj['padata'] = padata
1529 if asn1Spec is not None:
1530 KDC_REQ_decoded = pyasn1_native_decode(
1531 KDC_REQ_obj, asn1Spec=asn1Spec)
1533 KDC_REQ_decoded = None
1534 return KDC_REQ_obj, KDC_REQ_decoded
1536 def AS_REQ_create(self,
1538 kdc_options, # required
1542 from_time, # optional
1543 till_time, # required
1544 renew_time, # optional
1547 addresses, # optional
1549 native_decoded_only=True,
1552 # KDC-REQ ::= SEQUENCE {
1553 # -- NOTE: first tag is [1], not [0]
1554 # pvno [1] INTEGER (5) ,
1555 # msg-type [2] INTEGER (10 -- AS -- | 12 -- TGS --),
1556 # padata [3] SEQUENCE OF PA-DATA OPTIONAL
1557 # -- NOTE: not empty --,
1558 # req-body [4] KDC-REQ-BODY
1561 # KDC-REQ-BODY ::= SEQUENCE {
1562 # kdc-options [0] KDCOptions,
1563 # cname [1] PrincipalName OPTIONAL
1564 # -- Used only in AS-REQ --,
1567 # -- Also client's in AS-REQ --,
1568 # sname [3] PrincipalName OPTIONAL,
1569 # from [4] KerberosTime OPTIONAL,
1570 # till [5] KerberosTime,
1571 # rtime [6] KerberosTime OPTIONAL,
1573 # etype [8] SEQUENCE OF Int32
1575 # -- in preference order --,
1576 # addresses [9] HostAddresses OPTIONAL,
1577 # enc-authorization-data [10] EncryptedData OPTIONAL
1578 # -- AuthorizationData --,
1579 # additional-tickets [11] SEQUENCE OF Ticket OPTIONAL
1580 # -- NOTE: not empty
1582 KDC_REQ_BODY_obj = self.KDC_REQ_BODY_create(
1594 EncAuthorizationData=None,
1595 EncAuthorizationData_key=None,
1596 EncAuthorizationData_usage=None,
1597 asn1_print=asn1_print,
1599 obj, decoded = self.KDC_REQ_create(
1600 msg_type=KRB_AS_REQ,
1602 req_body=KDC_REQ_BODY_obj,
1603 asn1Spec=krb5_asn1.AS_REQ(),
1604 asn1_print=asn1_print,
1606 if native_decoded_only:
def AP_REQ_create(self, ap_options, ticket, authenticator):
    """Return an AP-REQ dict for the given options, ticket and authenticator.

    ``pvno`` is fixed at 5 and ``msg-type`` at ``KRB_AP_REQ``.
    """
    # AP-REQ          ::= [APPLICATION 14] SEQUENCE {
    #         pvno            [0] INTEGER (5),
    #         msg-type        [1] INTEGER (14),
    #         ap-options      [2] APOptions,
    #         ticket          [3] Ticket,
    #         authenticator   [4] EncryptedData -- Authenticator
    # }
    # NOTE(review): the 'pvno' and 'ticket' entries and the return statement
    # were missing from the damaged copy of the file; restored from the
    # ASN.1 definition above.
    AP_REQ_obj = {
        'pvno': 5,
        'msg-type': KRB_AP_REQ,
        'ap-options': ap_options,
        'ticket': ticket,
        'authenticator': authenticator,
    }
    return AP_REQ_obj
def Authenticator_create(
        self, crealm, cname, cksum, cusec, ctime, subkey, seq_number,
        authorization_data):
    """Return an unencrypted Authenticator dict.

    ``crealm``, ``cname``, ``cusec`` and ``ctime`` are always stored;
    ``cksum``, ``subkey``, ``seq_number`` and ``authorization_data`` are
    stored only when they are not None.
    """
    # -- Unencrypted authenticator
    # Authenticator   ::= [APPLICATION 2] SEQUENCE  {
    #         authenticator-vno       [0] INTEGER (5),
    #         crealm                  [1] Realm,
    #         cname                   [2] PrincipalName,
    #         cksum                   [3] Checksum OPTIONAL,
    #         cusec                   [4] Microseconds,
    #         ctime                   [5] KerberosTime,
    #         subkey                  [6] EncryptionKey OPTIONAL,
    #         seq-number              [7] UInt32 OPTIONAL,
    #         authorization-data      [8] AuthorizationData OPTIONAL
    # }
    # NOTE(review): the four mandatory entries after 'authenticator-vno'
    # were missing from the damaged copy of the file; restored from the
    # ASN.1 definition above.
    Authenticator_obj = {
        'authenticator-vno': 5,
        'crealm': crealm,
        'cname': cname,
        'cusec': cusec,
        'ctime': ctime,
    }
    # OPTIONAL fields are omitted, not encoded as None. The test is
    # "is not None" so that a seq_number of 0 is still sent.
    optional_fields = (
        ('cksum', cksum),
        ('subkey', subkey),
        ('seq-number', seq_number),
        ('authorization-data', authorization_data),
    )
    for field, value in optional_fields:
        if value is not None:
            Authenticator_obj[field] = value
    return Authenticator_obj
1659 def TGS_REQ_create(self,
1664 kdc_options, # required
1668 from_time, # optional
1669 till_time, # required
1670 renew_time, # optional
1673 addresses, # optional
1674 EncAuthorizationData,
1675 EncAuthorizationData_key,
1678 authenticator_subkey=None,
1679 body_checksum_type=None,
1680 native_decoded_only=True,
1683 # KDC-REQ ::= SEQUENCE {
1684 # -- NOTE: first tag is [1], not [0]
1685 # pvno [1] INTEGER (5) ,
1686 # msg-type [2] INTEGER (10 -- AS -- | 12 -- TGS --),
1687 # padata [3] SEQUENCE OF PA-DATA OPTIONAL
1688 # -- NOTE: not empty --,
1689 # req-body [4] KDC-REQ-BODY
1692 # KDC-REQ-BODY ::= SEQUENCE {
1693 # kdc-options [0] KDCOptions,
1694 # cname [1] PrincipalName OPTIONAL
1695 # -- Used only in AS-REQ --,
1698 # -- Also client's in AS-REQ --,
1699 # sname [3] PrincipalName OPTIONAL,
1700 # from [4] KerberosTime OPTIONAL,
1701 # till [5] KerberosTime,
1702 # rtime [6] KerberosTime OPTIONAL,
1704 # etype [8] SEQUENCE OF Int32
1706 # -- in preference order --,
1707 # addresses [9] HostAddresses OPTIONAL,
1708 # enc-authorization-data [10] EncryptedData OPTIONAL
1709 # -- AuthorizationData --,
1710 # additional-tickets [11] SEQUENCE OF Ticket OPTIONAL
1711 # -- NOTE: not empty
1714 if authenticator_subkey is not None:
1715 EncAuthorizationData_usage = KU_TGS_REQ_AUTH_DAT_SUBKEY
1717 EncAuthorizationData_usage = KU_TGS_REQ_AUTH_DAT_SESSION
1719 req_body = self.KDC_REQ_BODY_create(
1720 kdc_options=kdc_options,
1724 from_time=from_time,
1725 till_time=till_time,
1726 renew_time=renew_time,
1729 addresses=addresses,
1730 additional_tickets=additional_tickets,
1731 EncAuthorizationData=EncAuthorizationData,
1732 EncAuthorizationData_key=EncAuthorizationData_key,
1733 EncAuthorizationData_usage=EncAuthorizationData_usage)
1734 req_body_blob = self.der_encode(req_body,
1735 asn1Spec=krb5_asn1.KDC_REQ_BODY(),
1736 asn1_print=asn1_print, hexdump=hexdump)
1738 req_body_checksum = self.Checksum_create(ticket_session_key,
1739 KU_TGS_REQ_AUTH_CKSUM,
1741 ctype=body_checksum_type)
1744 if authenticator_subkey is not None:
1745 subkey_obj = authenticator_subkey.export_obj()
1746 seq_number = random.randint(0, 0xfffffffe)
1747 authenticator = self.Authenticator_create(
1750 cksum=req_body_checksum,
1754 seq_number=seq_number,
1755 authorization_data=None)
1756 authenticator = self.der_encode(
1758 asn1Spec=krb5_asn1.Authenticator(),
1759 asn1_print=asn1_print,
1762 authenticator = self.EncryptedData_create(
1763 ticket_session_key, KU_TGS_REQ_AUTH, authenticator)
1765 ap_options = krb5_asn1.APOptions('0')
1766 ap_req = self.AP_REQ_create(ap_options=str(ap_options),
1768 authenticator=authenticator)
1769 ap_req = self.der_encode(ap_req, asn1Spec=krb5_asn1.AP_REQ(),
1770 asn1_print=asn1_print, hexdump=hexdump)
1771 pa_tgs_req = self.PA_DATA_create(PADATA_KDC_REQ, ap_req)
1772 if padata is not None:
1773 padata.append(pa_tgs_req)
1775 padata = [pa_tgs_req]
1777 obj, decoded = self.KDC_REQ_create(
1778 msg_type=KRB_TGS_REQ,
1781 asn1Spec=krb5_asn1.TGS_REQ(),
1782 asn1_print=asn1_print,
1784 if native_decoded_only:
def PA_S4U2Self_create(self, name, realm, tgt_session_key, ctype=None):
    """Return a PA-DATA element of type ``PADATA_FOR_USER``.

    :param name: PrincipalName dict with ``'name-type'`` (an int) and
        ``'name-string'`` (an iterable of str).
    :param realm: realm as a str.
    :param tgt_session_key: key passed to ``Checksum_create``.
    :param ctype: checksum type forwarded to ``Checksum_create``.
    """
    # PA-S4U2Self     ::= SEQUENCE {
    #        name  [0] PrincipalName,
    #        realm [1] Realm,
    #        cksum [2] Checksum,
    #        auth  [3] GeneralString
    # }
    # Checksum input: the name type as four little-endian bytes, then each
    # name component, the realm and the literal "Kerberos", all encoded
    # with str.encode() and concatenated without separators.
    cksum_data = b''.join([
        name['name-type'].to_bytes(4, byteorder='little'),
        *(component.encode() for component in name['name-string']),
        realm.encode(),
        "Kerberos".encode(),
    ])
    # NOTE(review): the trailing arguments of this call and the whole
    # PA_S4U2Self_obj dict were missing from the damaged copy of the file.
    # They are restored from the ASN.1 definition and the checksum input
    # above; in particular the 'auth' value is presumed to be "Kerberos" --
    # confirm against upstream.
    cksum = self.Checksum_create(tgt_session_key,
                                 KU_NON_KERB_CKSUM_SALT,
                                 cksum_data,
                                 ctype)

    PA_S4U2Self_obj = {
        'name': name,
        'realm': realm,
        'cksum': cksum,
        'auth': "Kerberos",
    }
    pa_s4u2self = self.der_encode(
        PA_S4U2Self_obj, asn1Spec=krb5_asn1.PA_S4U2Self())
    return self.PA_DATA_create(PADATA_FOR_USER, pa_s4u2self)
1815 def ChangePasswdDataMS_create(self,
1819 ChangePasswdDataMS_obj = {
1820 'newpasswd': new_password,
1822 if target_princ is not None:
1823 ChangePasswdDataMS_obj['targname'] = target_princ
1824 if target_realm is not None:
1825 ChangePasswdDataMS_obj['targrealm'] = target_realm
1827 change_password_data = self.der_encode(
1828 ChangePasswdDataMS_obj, asn1Spec=krb5_asn1.ChangePasswdDataMS())
1830 return change_password_data
1832 def KRB_PRIV_create(self,
1840 EncKrbPrivPart_obj = {
1841 'user-data': user_data,
1842 's-address': s_address,
1844 if timestamp is not None:
1845 EncKrbPrivPart_obj['timestamp'] = timestamp
1846 if usec is not None:
1847 EncKrbPrivPart_obj['usec'] = usec
1848 if seq_number is not None:
1849 EncKrbPrivPart_obj['seq-number'] = seq_number
1850 if r_address is not None:
1851 EncKrbPrivPart_obj['r-address'] = r_address
1853 enc_krb_priv_part = self.der_encode(
1854 EncKrbPrivPart_obj, asn1Spec=krb5_asn1.EncKrbPrivPart())
1856 enc_data = self.EncryptedData_create(subkey,
1862 'msg-type': KRB_PRIV,
1863 'enc-part': enc_data,
1866 krb_priv = self.der_encode(
1867 KRB_PRIV_obj, asn1Spec=krb5_asn1.KRB_PRIV())
1871 def kpasswd_create(self,
1879 self.assertIsNotNone(self.s, 'call self.connect() first')
1881 timestamp, usec = self.get_KerberosTimeWithUsec()
1883 krb_priv = self.KRB_PRIV_create(subkey,
1885 s_address=local_address,
1886 timestamp=timestamp,
1888 seq_number=seq_number,
1889 r_address=remote_address)
1891 size = 6 + len(ap_req) + len(krb_priv)
1892 self.assertLess(size, 0x10000)
1895 msg.append(size >> 8)
1896 msg.append(size & 0xff)
1897 msg.append(version >> 8)
1898 msg.append(version & 0xff)
1899 msg.append(len(ap_req) >> 8)
1900 msg.append(len(ap_req) & 0xff)
1901 # Note: for sets, there could be a little-endian four-byte length here.
1904 msg.extend(krb_priv)
def get_enc_part(self, obj, key, usage):
    """Check and decrypt the ``enc-part`` of a decoded message.

    Asserts that ``obj`` has ``pvno`` 5 and that the encrypted part's
    ``etype`` and ``kvno`` match ``key``, then decrypts its ``cipher``
    with ``key`` under key usage ``usage``.

    :return: the value returned by ``key.decrypt``.
    """
    self.assertElementEqual(obj, 'pvno', 5)

    enc_part = obj['enc-part']
    self.assertElementEqual(enc_part, 'etype', key.etype)
    self.assertElementKVNO(enc_part, 'kvno', key.kvno)

    decrypted = key.decrypt(usage, enc_part['cipher'])

    # NOTE(review): the return statement was missing from the damaged copy
    # of the file; callers use the result, so it is restored here.
    return decrypted
1919 def kpasswd_exchange(self,
1928 send_seq_number=True):
1929 if mode is self.KpasswdMode.SET:
1931 user_data = self.ChangePasswdDataMS_create(new_password,
1934 elif mode is self.KpasswdMode.CHANGE:
1935 self.assertIsNone(target_princ,
1936 'target_princ only valid for pw set')
1937 self.assertIsNone(target_realm,
1938 'target_realm only valid for pw set')
1941 user_data = new_password.encode('utf-8')
1943 self.fail(f'invalid mode {mode}')
1945 subkey = self.RandomKey(kcrypto.Enctype.AES256)
1947 if ap_options is None:
1949 ap_options = str(krb5_asn1.APOptions(ap_options))
1951 kdc_exchange_dict = {
1953 'authenticator_subkey': subkey,
1955 'ap_options': ap_options,
1959 seq_number = random.randint(0, 0xfffffffe)
1963 ap_req = self.generate_ap_req(kdc_exchange_dict,
1967 usage=KU_AP_REQ_AUTH,
1968 seq_number=seq_number)
1970 self.connect(self.host, port=464)
1971 self.assertIsNotNone(self.s)
1973 family = self.s.family
1975 if family == socket.AF_INET:
1976 addr_type = 2 # IPv4
1977 elif family == socket.AF_INET6:
1978 addr_type = 24 # IPv6
1980 self.fail(f'unknown family {family}')
1982 def create_address(ip):
1984 'addr-type': addr_type,
1985 'address': socket.inet_pton(family, ip),
1988 local_ip = self.s.getsockname()[0]
1989 local_address = create_address(local_ip)
1991 # remote_ip = self.s.getpeername()[0]
1992 # remote_address = create_address(remote_ip)
1994 # TODO: due to a bug (?), MIT Kerberos will not accept the request
1995 # unless r-address is set to our _local_ address. Heimdal, on the other
1996 # hand, requires the r-address is set to the remote address (as
1997 # expected). To avoid problems, avoid sending r-address for now.
1998 remote_address = None
2000 msg = self.kpasswd_create(subkey,
2009 rep_pdu = self.recv_pdu_raw()
2011 self._disconnect('transaction done')
2013 self.assertIsNotNone(rep_pdu)
2015 header = rep_pdu[:6]
2018 reply_len = (header[0] << 8) | header[1]
2019 reply_version = (header[2] << 8) | header[3]
2020 ap_rep_len = (header[4] << 8) | header[5]
2022 self.assertEqual(reply_len, len(rep_pdu))
2023 self.assertEqual(1, reply_version) # KRB5_KPASSWD_VERS_CHANGEPW
2024 self.assertLess(ap_rep_len, reply_len)
2026 self.assertNotEqual(0x7e, rep_pdu[1])
2027 self.assertNotEqual(0x5e, rep_pdu[1])
2030 # We received an AP-REQ and KRB-PRIV as a response. This may or may
2031 # not indicate an error, depending on the status code.
2032 ap_rep = reply[:ap_rep_len]
2033 krb_priv = reply[ap_rep_len:]
2035 key = ticket.session_key
2037 ap_rep = self.der_decode(ap_rep, asn1Spec=krb5_asn1.AP_REP())
2038 self.assertElementEqual(ap_rep, 'msg-type', KRB_AP_REP)
2039 enc_part = self.get_enc_part(ap_rep, key, KU_AP_REQ_ENC_PART)
2040 enc_part = self.der_decode(
2041 enc_part, asn1Spec=krb5_asn1.EncAPRepPart())
2043 self.assertElementPresent(enc_part, 'ctime')
2044 self.assertElementPresent(enc_part, 'cusec')
2045 # self.assertElementMissing(enc_part, 'subkey') # TODO
2046 # self.assertElementPresent(enc_part, 'seq-number') # TODO
2049 krb_priv = self.der_decode(krb_priv, asn1Spec=krb5_asn1.KRB_PRIV())
2053 self.assertElementEqual(krb_priv, 'msg-type', KRB_PRIV)
2054 priv_enc_part = self.get_enc_part(krb_priv, subkey, KU_KRB_PRIV)
2055 priv_enc_part = self.der_decode(
2056 priv_enc_part, asn1Spec=krb5_asn1.EncKrbPrivPart())
2058 self.assertElementMissing(priv_enc_part, 'timestamp')
2059 self.assertElementMissing(priv_enc_part, 'usec')
2060 # self.assertElementPresent(priv_enc_part, 'seq-number') # TODO
2061 # self.assertElementEqual(priv_enc_part, 's-address', remote_address) # TODO
2062 # self.assertElementMissing(priv_enc_part, 'r-address') # TODO
2064 result_data = priv_enc_part['user-data']
2066 # We received a KRB-ERROR as a response, indicating an error.
2067 krb_error = self.der_decode(reply, asn1Spec=krb5_asn1.KRB_ERROR())
2069 sname = self.PrincipalName_create(
2070 name_type=NT_PRINCIPAL,
2071 names=['kadmin', 'changepw'])
2072 realm = self.get_krbtgt_creds().get_realm().upper()
2074 self.assertElementEqual(krb_error, 'pvno', 5)
2075 self.assertElementEqual(krb_error, 'msg-type', KRB_ERROR)
2076 self.assertElementMissing(krb_error, 'ctime')
2077 self.assertElementMissing(krb_error, 'usec')
2078 self.assertElementPresent(krb_error, 'stime')
2079 self.assertElementPresent(krb_error, 'susec')
2081 error_code = krb_error['error-code']
2082 if isinstance(expected_code, int):
2083 self.assertEqual(error_code, expected_code)
2085 self.assertIn(error_code, expected_code)
2087 self.assertElementMissing(krb_error, 'crealm')
2088 self.assertElementMissing(krb_error, 'cname')
2089 self.assertElementEqual(krb_error, 'realm', realm.encode('utf-8'))
2090 self.assertElementEqualPrincipal(krb_error, 'sname', sname)
2091 self.assertElementMissing(krb_error, 'e-text')
2093 result_data = krb_error['e-data']
2095 status = result_data[:2]
2096 message = result_data[2:]
2098 status_code = (status[0] << 8) | status[1]
2099 if isinstance(expected_code, int):
2100 self.assertEqual(status_code, expected_code)
2102 self.assertIn(status_code, expected_code)
2105 self.assertEqual(0, status_code,
2106 'got an error result, but no message')
2109 # Check the first character of the message.
2111 if isinstance(expected_msg, bytes):
2112 self.assertEqual(message, expected_msg)
2114 self.assertIn(message, expected_msg)
2116 # We got AD password policy information.
2117 self.assertEqual(30, len(message))
2124 min_age) = struct.unpack('>HIIIQQ', message)
2126 def _generic_kdc_exchange(self,
2127 kdc_exchange_dict, # required
2128 cname=None, # optional
2129 realm=None, # required
2130 sname=None, # optional
2131 from_time=None, # optional
2132 till_time=None, # required
2133 renew_time=None, # optional
2134 etypes=None, # required
2135 addresses=None, # optional
2136 additional_tickets=None, # optional
2137 EncAuthorizationData=None, # optional
2138 EncAuthorizationData_key=None, # optional
2139 EncAuthorizationData_usage=None): # optional
2141 check_error_fn = kdc_exchange_dict['check_error_fn']
2142 check_rep_fn = kdc_exchange_dict['check_rep_fn']
2143 generate_fast_fn = kdc_exchange_dict['generate_fast_fn']
2144 generate_fast_armor_fn = kdc_exchange_dict['generate_fast_armor_fn']
2145 generate_fast_padata_fn = kdc_exchange_dict['generate_fast_padata_fn']
2146 generate_padata_fn = kdc_exchange_dict['generate_padata_fn']
2147 callback_dict = kdc_exchange_dict['callback_dict']
2148 req_msg_type = kdc_exchange_dict['req_msg_type']
2149 req_asn1Spec = kdc_exchange_dict['req_asn1Spec']
2150 rep_msg_type = kdc_exchange_dict['rep_msg_type']
2152 expected_error_mode = kdc_exchange_dict['expected_error_mode']
2153 kdc_options = kdc_exchange_dict['kdc_options']
2155 pac_request = kdc_exchange_dict['pac_request']
2156 pac_options = kdc_exchange_dict['pac_options']
2158 # Parameters specific to the inner request body
2159 inner_req = kdc_exchange_dict['inner_req']
2161 # Parameters specific to the outer request body
2162 outer_req = kdc_exchange_dict['outer_req']
2164 if till_time is None:
2165 till_time = self.get_KerberosTime(offset=36000)
2167 if 'nonce' in kdc_exchange_dict:
2168 nonce = kdc_exchange_dict['nonce']
2170 nonce = self.get_Nonce()
2171 kdc_exchange_dict['nonce'] = nonce
2173 req_body = self.KDC_REQ_BODY_create(
2174 kdc_options=kdc_options,
2178 from_time=from_time,
2179 till_time=till_time,
2180 renew_time=renew_time,
2183 addresses=addresses,
2184 additional_tickets=additional_tickets,
2185 EncAuthorizationData=EncAuthorizationData,
2186 EncAuthorizationData_key=EncAuthorizationData_key,
2187 EncAuthorizationData_usage=EncAuthorizationData_usage)
2189 inner_req_body = dict(req_body)
2190 if inner_req is not None:
2191 for key, value in inner_req.items():
2192 if value is not None:
2193 inner_req_body[key] = value
2195 del inner_req_body[key]
2196 if outer_req is not None:
2197 for key, value in outer_req.items():
2198 if value is not None:
2199 req_body[key] = value
2203 additional_padata = []
2204 if pac_request is not None:
2205 pa_pac_request = self.KERB_PA_PAC_REQUEST_create(pac_request)
2206 additional_padata.append(pa_pac_request)
2207 if pac_options is not None:
2208 pa_pac_options = self.get_pa_pac_options(pac_options)
2209 additional_padata.append(pa_pac_options)
2211 if req_msg_type == KRB_AS_REQ:
2213 tgs_req_padata = None
2215 self.assertEqual(KRB_TGS_REQ, req_msg_type)
2217 tgs_req = self.generate_ap_req(kdc_exchange_dict,
2221 tgs_req_padata = self.PA_DATA_create(PADATA_KDC_REQ, tgs_req)
2223 if generate_fast_padata_fn is not None:
2224 self.assertIsNotNone(generate_fast_fn)
2225 # This can alter req_body...
2226 fast_padata, req_body = generate_fast_padata_fn(kdc_exchange_dict,
2232 if generate_fast_armor_fn is not None:
2233 self.assertIsNotNone(generate_fast_fn)
2234 fast_ap_req = generate_fast_armor_fn(kdc_exchange_dict,
2239 fast_armor_type = kdc_exchange_dict['fast_armor_type']
2240 fast_armor = self.KRB_FAST_ARMOR_create(fast_armor_type,
2245 if generate_padata_fn is not None:
2246 # This can alter req_body...
2247 outer_padata, req_body = generate_padata_fn(kdc_exchange_dict,
2250 self.assertIsNotNone(outer_padata)
2251 self.assertNotIn(PADATA_KDC_REQ,
2252 [pa['padata-type'] for pa in outer_padata],
2253 'Don\'t create TGS-REQ manually')
2257 if generate_fast_fn is not None:
2258 armor_key = kdc_exchange_dict['armor_key']
2259 self.assertIsNotNone(armor_key)
2261 if req_msg_type == KRB_AS_REQ:
2262 checksum_blob = self.der_encode(
2264 asn1Spec=krb5_asn1.KDC_REQ_BODY())
2266 self.assertEqual(KRB_TGS_REQ, req_msg_type)
2267 checksum_blob = tgs_req
2269 checksum = self.Checksum_create(armor_key,
2273 fast_padata += additional_padata
2274 fast = generate_fast_fn(kdc_exchange_dict,
2285 if tgs_req_padata is not None:
2286 padata.append(tgs_req_padata)
2288 if fast is not None:
2291 if outer_padata is not None:
2292 padata += outer_padata
2295 padata += additional_padata
2300 kdc_exchange_dict['req_padata'] = padata
2301 kdc_exchange_dict['fast_padata'] = fast_padata
2302 kdc_exchange_dict['req_body'] = inner_req_body
2304 req_obj, req_decoded = self.KDC_REQ_create(msg_type=req_msg_type,
2307 asn1Spec=req_asn1Spec())
2309 to_rodc = kdc_exchange_dict['to_rodc']
2311 rep = self.send_recv_transaction(req_decoded, to_rodc=to_rodc)
2312 self.assertIsNotNone(rep)
2314 msg_type = self.getElementValue(rep, 'msg-type')
2315 self.assertIsNotNone(msg_type)
2317 expected_msg_type = None
2318 if check_error_fn is not None:
2319 expected_msg_type = KRB_ERROR
2320 self.assertIsNone(check_rep_fn)
2321 self.assertNotEqual(0, len(expected_error_mode))
2322 self.assertNotIn(0, expected_error_mode)
2323 if check_rep_fn is not None:
2324 expected_msg_type = rep_msg_type
2325 self.assertIsNone(check_error_fn)
2326 self.assertEqual(0, len(expected_error_mode))
2327 self.assertIsNotNone(expected_msg_type)
2328 if msg_type == KRB_ERROR:
2329 error_code = self.getElementValue(rep, 'error-code')
2330 fail_msg = f'Got unexpected error: {error_code}'
2332 fail_msg = f'Expected to fail with error: {expected_error_mode}'
2333 self.assertEqual(msg_type, expected_msg_type, fail_msg)
2335 if msg_type == KRB_ERROR:
2336 return check_error_fn(kdc_exchange_dict,
2340 return check_rep_fn(kdc_exchange_dict, callback_dict, rep)
2342 def as_exchange_dict(self,
2343 expected_crealm=None,
2344 expected_cname=None,
2345 expected_anon=False,
2346 expected_srealm=None,
2347 expected_sname=None,
2348 expected_account_name=None,
2349 expected_upn_name=None,
2351 expected_supported_etypes=None,
2352 expected_flags=None,
2353 unexpected_flags=None,
2354 ticket_decryption_key=None,
2355 expect_ticket_checksum=None,
2356 generate_fast_fn=None,
2357 generate_fast_armor_fn=None,
2358 generate_fast_padata_fn=None,
2359 fast_armor_type=FX_FAST_ARMOR_AP_REQUEST,
2360 generate_padata_fn=None,
2361 check_error_fn=None,
2363 check_kdc_private_fn=None,
2365 expected_error_mode=0,
2366 expected_status=None,
2367 client_as_etypes=None,
2369 authenticator_subkey=None,
2383 expect_upn_dns_info_ex=None,
2384 expect_pac_attrs=None,
2385 expect_pac_attrs_pac_request=None,
2386 expect_requester_sid=None,
2388 if expected_error_mode == 0:
2389 expected_error_mode = ()
2390 elif not isinstance(expected_error_mode, collections.abc.Container):
2391 expected_error_mode = (expected_error_mode,)
2393 kdc_exchange_dict = {
2394 'req_msg_type': KRB_AS_REQ,
2395 'req_asn1Spec': krb5_asn1.AS_REQ,
2396 'rep_msg_type': KRB_AS_REP,
2397 'rep_asn1Spec': krb5_asn1.AS_REP,
2398 'rep_encpart_asn1Spec': krb5_asn1.EncASRepPart,
2399 'expected_crealm': expected_crealm,
2400 'expected_cname': expected_cname,
2401 'expected_anon': expected_anon,
2402 'expected_srealm': expected_srealm,
2403 'expected_sname': expected_sname,
2404 'expected_account_name': expected_account_name,
2405 'expected_upn_name': expected_upn_name,
2406 'expected_sid': expected_sid,
2407 'expected_supported_etypes': expected_supported_etypes,
2408 'expected_flags': expected_flags,
2409 'unexpected_flags': unexpected_flags,
2410 'ticket_decryption_key': ticket_decryption_key,
2411 'expect_ticket_checksum': expect_ticket_checksum,
2412 'generate_fast_fn': generate_fast_fn,
2413 'generate_fast_armor_fn': generate_fast_armor_fn,
2414 'generate_fast_padata_fn': generate_fast_padata_fn,
2415 'fast_armor_type': fast_armor_type,
2416 'generate_padata_fn': generate_padata_fn,
2417 'check_error_fn': check_error_fn,
2418 'check_rep_fn': check_rep_fn,
2419 'check_kdc_private_fn': check_kdc_private_fn,
2420 'callback_dict': callback_dict,
2421 'expected_error_mode': expected_error_mode,
2422 'expected_status': expected_status,
2423 'client_as_etypes': client_as_etypes,
2424 'expected_salt': expected_salt,
2425 'authenticator_subkey': authenticator_subkey,
2426 'preauth_key': preauth_key,
2427 'armor_key': armor_key,
2428 'armor_tgt': armor_tgt,
2429 'armor_subkey': armor_subkey,
2430 'auth_data': auth_data,
2431 'kdc_options': kdc_options,
2432 'inner_req': inner_req,
2433 'outer_req': outer_req,
2434 'pac_request': pac_request,
2435 'pac_options': pac_options,
2436 'expect_edata': expect_edata,
2437 'expect_pac': expect_pac,
2438 'expect_claims': expect_claims,
2439 'expect_upn_dns_info_ex': expect_upn_dns_info_ex,
2440 'expect_pac_attrs': expect_pac_attrs,
2441 'expect_pac_attrs_pac_request': expect_pac_attrs_pac_request,
2442 'expect_requester_sid': expect_requester_sid,
2445 if callback_dict is None:
2448 return kdc_exchange_dict
2450 def tgs_exchange_dict(self,
2451 expected_crealm=None,
2452 expected_cname=None,
2453 expected_anon=False,
2454 expected_srealm=None,
2455 expected_sname=None,
2456 expected_account_name=None,
2457 expected_upn_name=None,
2459 expected_supported_etypes=None,
2460 expected_flags=None,
2461 unexpected_flags=None,
2462 ticket_decryption_key=None,
2463 expect_ticket_checksum=None,
2464 generate_fast_fn=None,
2465 generate_fast_armor_fn=None,
2466 generate_fast_padata_fn=None,
2467 fast_armor_type=FX_FAST_ARMOR_AP_REQUEST,
2468 generate_padata_fn=None,
2469 check_error_fn=None,
2471 check_kdc_private_fn=None,
2472 expected_error_mode=0,
2473 expected_status=None,
2479 authenticator_subkey=None,
2481 body_checksum_type=None,
2490 expect_upn_dns_info_ex=None,
2491 expect_pac_attrs=None,
2492 expect_pac_attrs_pac_request=None,
2493 expect_requester_sid=None,
2494 expected_proxy_target=None,
2495 expected_transited_services=None,
2497 if expected_error_mode == 0:
2498 expected_error_mode = ()
2499 elif not isinstance(expected_error_mode, collections.abc.Container):
2500 expected_error_mode = (expected_error_mode,)
2502 kdc_exchange_dict = {
2503 'req_msg_type': KRB_TGS_REQ,
2504 'req_asn1Spec': krb5_asn1.TGS_REQ,
2505 'rep_msg_type': KRB_TGS_REP,
2506 'rep_asn1Spec': krb5_asn1.TGS_REP,
2507 'rep_encpart_asn1Spec': krb5_asn1.EncTGSRepPart,
2508 'expected_crealm': expected_crealm,
2509 'expected_cname': expected_cname,
2510 'expected_anon': expected_anon,
2511 'expected_srealm': expected_srealm,
2512 'expected_sname': expected_sname,
2513 'expected_account_name': expected_account_name,
2514 'expected_upn_name': expected_upn_name,
2515 'expected_sid': expected_sid,
2516 'expected_supported_etypes': expected_supported_etypes,
2517 'expected_flags': expected_flags,
2518 'unexpected_flags': unexpected_flags,
2519 'ticket_decryption_key': ticket_decryption_key,
2520 'expect_ticket_checksum': expect_ticket_checksum,
2521 'generate_fast_fn': generate_fast_fn,
2522 'generate_fast_armor_fn': generate_fast_armor_fn,
2523 'generate_fast_padata_fn': generate_fast_padata_fn,
2524 'fast_armor_type': fast_armor_type,
2525 'generate_padata_fn': generate_padata_fn,
2526 'check_error_fn': check_error_fn,
2527 'check_rep_fn': check_rep_fn,
2528 'check_kdc_private_fn': check_kdc_private_fn,
2529 'callback_dict': callback_dict,
2530 'expected_error_mode': expected_error_mode,
2531 'expected_status': expected_status,
2533 'body_checksum_type': body_checksum_type,
2534 'armor_key': armor_key,
2535 'armor_tgt': armor_tgt,
2536 'armor_subkey': armor_subkey,
2537 'auth_data': auth_data,
2538 'authenticator_subkey': authenticator_subkey,
2539 'kdc_options': kdc_options,
2540 'inner_req': inner_req,
2541 'outer_req': outer_req,
2542 'pac_request': pac_request,
2543 'pac_options': pac_options,
2544 'expect_edata': expect_edata,
2545 'expect_pac': expect_pac,
2546 'expect_claims': expect_claims,
2547 'expect_upn_dns_info_ex': expect_upn_dns_info_ex,
2548 'expect_pac_attrs': expect_pac_attrs,
2549 'expect_pac_attrs_pac_request': expect_pac_attrs_pac_request,
2550 'expect_requester_sid': expect_requester_sid,
2551 'expected_proxy_target': expected_proxy_target,
2552 'expected_transited_services': expected_transited_services,
2555 if callback_dict is None:
2558 return kdc_exchange_dict
# generic_check_kdc_rep: check a successful KDC reply (AS-REP or TGS-REP)
# against the expectations stored in kdc_exchange_dict.
#
# Steps visible here: compare msg-type, crealm and cname; check the
# ticket's tkt-vno, realm, sname and enc-part; check the reply's enc-part;
# fetch the reply decryption key from get_preauth_key() and, when an armor
# key is set and PADATA_FX_FAST is present, process the FAST response
# (strengthen-key, finished/ticket-checksum, inner padata); decrypt and
# DER-decode the ticket and reply enc-parts; finally call
# check_kdc_private_fn with the decoded parts.
#
# NOTE(review): parts of this method (the rest of the parameter list, some
# branch headers and call arguments) are not visible in this view, so the
# comments describe only the statements shown.
2560 def generic_check_kdc_rep(self,
2565 expected_crealm = kdc_exchange_dict['expected_crealm']
2566 expected_anon = kdc_exchange_dict['expected_anon']
2567 expected_srealm = kdc_exchange_dict['expected_srealm']
2568 expected_sname = kdc_exchange_dict['expected_sname']
2569 ticket_decryption_key = kdc_exchange_dict['ticket_decryption_key']
2570 check_kdc_private_fn = kdc_exchange_dict['check_kdc_private_fn']
2571 rep_encpart_asn1Spec = kdc_exchange_dict['rep_encpart_asn1Spec']
2572 msg_type = kdc_exchange_dict['rep_msg_type']
2573 armor_key = kdc_exchange_dict['armor_key']
2575 self.assertElementEqual(rep, 'msg-type', msg_type) # AS-REP | TGS-REP
2576 padata = self.getElementValue(rep, 'padata')
2577 if self.strict_checking:
2578 self.assertElementEqualUTF8(rep, 'crealm', expected_crealm)
2580 expected_cname = self.PrincipalName_create(
2581 name_type=NT_WELLKNOWN,
2582 names=['WELLKNOWN', 'ANONYMOUS'])
2584 expected_cname = kdc_exchange_dict['expected_cname']
2585 self.assertElementEqualPrincipal(rep, 'cname', expected_cname)
2586 self.assertElementPresent(rep, 'ticket')
2587 ticket = self.getElementValue(rep, 'ticket')
2588 ticket_encpart = None
2589 ticket_cipher = None
2590 self.assertIsNotNone(ticket)
2591 if ticket is not None: # Never None, but gives indentation
2592 self.assertElementEqual(ticket, 'tkt-vno', 5)
2593 self.assertElementEqualUTF8(ticket, 'realm', expected_srealm)
2594 self.assertElementEqualPrincipal(ticket, 'sname', expected_sname)
2595 self.assertElementPresent(ticket, 'enc-part')
2596 ticket_encpart = self.getElementValue(ticket, 'enc-part')
2597 self.assertIsNotNone(ticket_encpart)
2598 if ticket_encpart is not None: # Never None, but gives indentation
2599 self.assertElementPresent(ticket_encpart, 'etype')
2601 kdc_options = kdc_exchange_dict['kdc_options']
# Locate the 'enc-tkt-in-skey' bit in the KDCOptions bit string; a kvno is
# expected unless that position exists in kdc_options and is set to '1'.
2602 pos = len(tuple(krb5_asn1.KDCOptions('enc-tkt-in-skey'))) - 1
2603 expect_kvno = (pos >= len(kdc_options)
2604 or kdc_options[pos] != '1')
2606 # 'unspecified' means present, with any value != 0
2607 self.assertElementKVNO(ticket_encpart, 'kvno',
2608 self.unspecified_kvno)
2610 # For user-to-user, don't expect a kvno.
2611 self.assertElementMissing(ticket_encpart, 'kvno')
2613 self.assertElementPresent(ticket_encpart, 'cipher')
2614 ticket_cipher = self.getElementValue(ticket_encpart, 'cipher')
2615 self.assertElementPresent(rep, 'enc-part')
2616 encpart = self.getElementValue(rep, 'enc-part')
2617 encpart_cipher = None
2618 self.assertIsNotNone(encpart)
2619 if encpart is not None: # Never None, but gives indentation
2620 self.assertElementPresent(encpart, 'etype')
# NOTE(review): this check is given ticket_encpart although the neighbouring
# checks operate on the reply's encpart -- confirm whether encpart was
# intended here.
2621 self.assertElementKVNO(ticket_encpart, 'kvno', 'autodetect')
2622 self.assertElementPresent(encpart, 'cipher')
2623 encpart_cipher = self.getElementValue(encpart, 'cipher')
2625 ticket_checksum = None
2627 # Get the decryption key for the encrypted part
2628 encpart_decryption_key, encpart_decryption_usage = (
2629 self.get_preauth_key(kdc_exchange_dict))
2631 if armor_key is not None:
2632 pa_dict = self.get_pa_dict(padata)
2634 if PADATA_FX_FAST in pa_dict:
2635 fx_fast_data = pa_dict[PADATA_FX_FAST]
2636 fast_response = self.check_fx_fast_data(kdc_exchange_dict,
2641 if 'strengthen-key' in fast_response:
# When the FAST response carries a strengthen-key, the reply decryption key
# is replaced by the result of generate_strengthen_reply_key().
2642 strengthen_key = self.EncryptionKey_import(
2643 fast_response['strengthen-key'])
2644 encpart_decryption_key = (
2645 self.generate_strengthen_reply_key(
2647 encpart_decryption_key))
2649 fast_finished = fast_response.get('finished')
2650 if fast_finished is not None:
# Keep the ticket checksum from the FAST 'finished' element.
2651 ticket_checksum = fast_finished['ticket-checksum']
2653 self.check_rep_padata(kdc_exchange_dict,
2655 fast_response['padata'],
2658 ticket_private = None
2659 if ticket_decryption_key is not None:
2660 self.assertElementEqual(ticket_encpart, 'etype',
2661 ticket_decryption_key.etype)
2662 self.assertElementKVNO(ticket_encpart, 'kvno',
2663 ticket_decryption_key.kvno)
2664 ticket_decpart = ticket_decryption_key.decrypt(KU_TICKET,
2666 ticket_private = self.der_decode(
2668 asn1Spec=krb5_asn1.EncTicketPart())
2670 encpart_private = None
2671 self.assertIsNotNone(encpart_decryption_key)
2672 if encpart_decryption_key is not None:
2673 self.assertElementEqual(encpart, 'etype',
2674 encpart_decryption_key.etype)
2675 if self.strict_checking:
2676 self.assertElementKVNO(encpart, 'kvno',
2677 encpart_decryption_key.kvno)
2678 rep_decpart = encpart_decryption_key.decrypt(
2679 encpart_decryption_usage,
2681 # MIT KDC encodes both EncASRepPart and EncTGSRepPart with
2682 # application tag 26
# NOTE(review): two alternative decodes follow (the expected spec, then
# EncTGSRepPart); the construct that selects between them is not visible in
# this view.
2684 encpart_private = self.der_decode(
2686 asn1Spec=rep_encpart_asn1Spec())
2688 encpart_private = self.der_decode(
2690 asn1Spec=krb5_asn1.EncTGSRepPart())
2692 self.assertIsNotNone(check_kdc_private_fn)
2693 if check_kdc_private_fn is not None:
2694 check_kdc_private_fn(kdc_exchange_dict, callback_dict,
2695 rep, ticket_private, encpart_private,
# check_fx_fast_data: decode and check a PA-FX-FAST reply.
#
# DER-decodes fx_fast_data as PA_FX_FAST_REPLY, asserts that the
# enc-fast-rep etype equals armor_key.etype, decrypts its cipher with
# armor_key under KU_FAST_REP and decodes the plaintext as KrbFastResponse.
# The response nonce must equal kdc_exchange_dict['nonce'].
#
# expect_strengthen_key (default True): when set and strict_checking is on,
# 'strengthen-key' must be present in the response.
#
# Returns the decoded KrbFastResponse.
#
# NOTE(review): part of the parameter list and the condition guarding the
# 'finished' assertion are not visible in this view.
2700 def check_fx_fast_data(self,
2705 expect_strengthen_key=True):
2706 fx_fast_data = self.der_decode(fx_fast_data,
2707 asn1Spec=krb5_asn1.PA_FX_FAST_REPLY())
2709 enc_fast_rep = fx_fast_data['armored-data']['enc-fast-rep']
2710 self.assertEqual(enc_fast_rep['etype'], armor_key.etype)
2712 fast_rep = armor_key.decrypt(KU_FAST_REP, enc_fast_rep['cipher'])
2714 fast_response = self.der_decode(fast_rep,
2715 asn1Spec=krb5_asn1.KrbFastResponse())
2717 if expect_strengthen_key and self.strict_checking:
2718 self.assertIn('strengthen-key', fast_response)
2721 self.assertIn('finished', fast_response)
2723 # Ensure that the nonce matches the nonce in the body of the request
2725 nonce = kdc_exchange_dict['nonce']
2726 self.assertEqual(nonce, fast_response['nonce'])
2728 return fast_response
# generic_check_kdc_private: check the decrypted parts of a KDC reply
# (ticket_private and encpart_private) against kdc_exchange_dict.
#
# Steps visible here: derive canonicalize/renewable/renew from kdc_options;
# verify the FAST ticket checksum when one was supplied; collect krbtgt
# keys; check the fields of ticket_private and encpart_private (flags, key,
# realms, names, times, caddr, authorization-data, encrypted-pa-data);
# require both session keys to agree; build a KerberosTicketCreds; check
# the PAC and verify the ticket; store the creds in
# kdc_exchange_dict['rep_ticket_creds'].
#
# NOTE(review): the parameter list and several branch headers/arguments are
# not visible in this view; comments cover only the statements shown.
2730 def generic_check_kdc_private(self,
2737 kdc_options = kdc_exchange_dict['kdc_options']
# Each option is read from its bit position in the KDCOptions bit string; a
# position past the end of kdc_options counts as not set.
2738 canon_pos = len(tuple(krb5_asn1.KDCOptions('canonicalize'))) - 1
2739 canonicalize = (canon_pos < len(kdc_options)
2740 and kdc_options[canon_pos] == '1')
2741 renewable_pos = len(tuple(krb5_asn1.KDCOptions('renewable'))) - 1
2742 renewable = (renewable_pos < len(kdc_options)
2743 and kdc_options[renewable_pos] == '1')
2744 renew_pos = len(tuple(krb5_asn1.KDCOptions('renew'))) - 1
2745 renew = (renew_pos < len(kdc_options)
2746 and kdc_options[renew_pos] == '1')
2747 expect_renew_till = renewable or renew
2749 expected_crealm = kdc_exchange_dict['expected_crealm']
2750 expected_cname = kdc_exchange_dict['expected_cname']
2751 expected_srealm = kdc_exchange_dict['expected_srealm']
2752 expected_sname = kdc_exchange_dict['expected_sname']
2753 ticket_decryption_key = kdc_exchange_dict['ticket_decryption_key']
2755 rep_msg_type = kdc_exchange_dict['rep_msg_type']
2757 expected_flags = kdc_exchange_dict.get('expected_flags')
2758 unexpected_flags = kdc_exchange_dict.get('unexpected_flags')
2760 ticket = self.getElementValue(rep, 'ticket')
2762 if ticket_checksum is not None:
2763 armor_key = kdc_exchange_dict['armor_key']
2764 self.verify_ticket_checksum(ticket, ticket_checksum, armor_key)
2766 to_rodc = kdc_exchange_dict['to_rodc']
# NOTE(review): presumably to_rodc selects between the RODC and the regular
# krbtgt credentials; the branch headers are not visible here.
2768 krbtgt_creds = self.get_rodc_krbtgt_creds()
2770 krbtgt_creds = self.get_krbtgt_creds()
2771 krbtgt_key = self.TicketDecryptionKey_from_creds(krbtgt_creds)
2773 krbtgt_keys = [krbtgt_key]
# Outside strict checking, an RC4 krbtgt key is added to the candidate list.
2774 if not self.strict_checking:
2775 krbtgt_key_rc4 = self.TicketDecryptionKey_from_creds(
2777 etype=kcrypto.Enctype.RC4)
2778 krbtgt_keys.append(krbtgt_key_rc4)
2780 if self.expect_pac and self.is_tgs(expected_sname):
2783 expect_pac = kdc_exchange_dict['expect_pac']
2785 ticket_session_key = None
2786 if ticket_private is not None:
2787 self.assertElementFlags(ticket_private, 'flags',
2790 self.assertElementPresent(ticket_private, 'key')
2791 ticket_key = self.getElementValue(ticket_private, 'key')
2792 self.assertIsNotNone(ticket_key)
2793 if ticket_key is not None: # Never None, but gives indentation
2794 self.assertElementPresent(ticket_key, 'keytype')
2795 self.assertElementPresent(ticket_key, 'keyvalue')
2796 ticket_session_key = self.EncryptionKey_import(ticket_key)
2797 self.assertElementEqualUTF8(ticket_private, 'crealm',
2799 if self.strict_checking:
2800 self.assertElementEqualPrincipal(ticket_private, 'cname',
2802 self.assertElementPresent(ticket_private, 'transited')
2803 self.assertElementPresent(ticket_private, 'authtime')
2804 if self.strict_checking:
2805 self.assertElementPresent(ticket_private, 'starttime')
2806 self.assertElementPresent(ticket_private, 'endtime')
2807 if expect_renew_till:
2808 if self.strict_checking:
2809 self.assertElementPresent(ticket_private, 'renew-till')
2811 self.assertElementMissing(ticket_private, 'renew-till')
2812 if self.strict_checking:
2813 self.assertElementEqual(ticket_private, 'caddr', [])
# expect_pac is tri-state: None skips this check, otherwise the
# authorization-data must be present and is expected empty when no PAC is
# expected.
2814 if expect_pac is not None:
2815 self.assertElementPresent(ticket_private, 'authorization-data',
2816 expect_empty=not expect_pac)
2818 encpart_session_key = None
2819 if encpart_private is not None:
2820 self.assertElementPresent(encpart_private, 'key')
2821 encpart_key = self.getElementValue(encpart_private, 'key')
2822 self.assertIsNotNone(encpart_key)
2823 if encpart_key is not None: # Never None, but gives indentation
2824 self.assertElementPresent(encpart_key, 'keytype')
2825 self.assertElementPresent(encpart_key, 'keyvalue')
2826 encpart_session_key = self.EncryptionKey_import(encpart_key)
2827 self.assertElementPresent(encpart_private, 'last-req')
2828 self.assertElementEqual(encpart_private, 'nonce',
2829 kdc_exchange_dict['nonce'])
2830 if rep_msg_type == KRB_AS_REP:
2831 if self.strict_checking:
2832 self.assertElementPresent(encpart_private,
2835 self.assertElementMissing(encpart_private,
2837 self.assertElementFlags(encpart_private, 'flags',
2840 self.assertElementPresent(encpart_private, 'authtime')
2841 if self.strict_checking:
2842 self.assertElementPresent(encpart_private, 'starttime')
2843 self.assertElementPresent(encpart_private, 'endtime')
2844 if expect_renew_till:
2845 if self.strict_checking:
2846 self.assertElementPresent(encpart_private, 'renew-till')
2848 self.assertElementMissing(encpart_private, 'renew-till')
2849 self.assertElementEqualUTF8(encpart_private, 'srealm',
2851 self.assertElementEqualPrincipal(encpart_private, 'sname',
2853 if self.strict_checking:
2854 self.assertElementEqual(encpart_private, 'caddr', [])
2856 sent_pac_options = self.get_sent_pac_options(kdc_exchange_dict)
2858 if self.strict_checking:
# encrypted-pa-data is required when canonicalize was requested or any PAC
# option bit was sent.
2859 if canonicalize or '1' in sent_pac_options:
2860 self.assertElementPresent(encpart_private,
2861 'encrypted-pa-data')
2862 enc_pa_dict = self.get_pa_dict(
2863 encpart_private['encrypted-pa-data'])
2865 self.assertIn(PADATA_SUPPORTED_ETYPES, enc_pa_dict)
2867 expected_supported_etypes = kdc_exchange_dict[
2868 'expected_supported_etypes']
# The DES and RC4 bits are always OR-ed into the expected value before the
# comparison.
2869 expected_supported_etypes |= (
2870 security.KERB_ENCTYPE_DES_CBC_CRC |
2871 security.KERB_ENCTYPE_DES_CBC_MD5 |
2872 security.KERB_ENCTYPE_RC4_HMAC_MD5)
2874 (supported_etypes,) = struct.unpack(
2876 enc_pa_dict[PADATA_SUPPORTED_ETYPES])
2878 self.assertEqual(supported_etypes,
2879 expected_supported_etypes)
2881 self.assertNotIn(PADATA_SUPPORTED_ETYPES, enc_pa_dict)
2883 if '1' in sent_pac_options:
2884 self.assertIn(PADATA_PAC_OPTIONS, enc_pa_dict)
2886 pac_options = self.der_decode(
2887 enc_pa_dict[PADATA_PAC_OPTIONS],
2888 asn1Spec=krb5_asn1.PA_PAC_OPTIONS())
2890 self.assertElementEqual(pac_options, 'options',
2893 self.assertNotIn(PADATA_PAC_OPTIONS, enc_pa_dict)
2895 self.assertElementEqual(encpart_private,
2896 'encrypted-pa-data',
# When both session keys are known they must have the same etype and the
# same key contents.
2899 if ticket_session_key is not None and encpart_session_key is not None:
2900 self.assertEqual(ticket_session_key.etype,
2901 encpart_session_key.etype)
2902 self.assertEqual(ticket_session_key.key.contents,
2903 encpart_session_key.key.contents)
# The reply enc-part's session key is used when available; the ticket's
# session key is the alternative (the else header is not visible here).
2904 if encpart_session_key is not None:
2905 session_key = encpart_session_key
2907 session_key = ticket_session_key
2908 ticket_creds = KerberosTicketCreds(
2911 crealm=expected_crealm,
2912 cname=expected_cname,
2913 srealm=expected_srealm,
2914 sname=expected_sname,
2915 decryption_key=ticket_decryption_key,
2916 ticket_private=ticket_private,
2917 encpart_private=encpart_private)
2919 if ticket_private is not None:
2920 pac_data = self.get_ticket_pac(ticket_creds, expect_pac=expect_pac)
2921 if expect_pac is True:
2922 self.assertIsNotNone(pac_data)
2923 elif expect_pac is False:
2924 self.assertIsNone(pac_data)
2926 if pac_data is not None:
2927 self.check_pac_buffers(pac_data, kdc_exchange_dict)
2929 expect_ticket_checksum = kdc_exchange_dict['expect_ticket_checksum']
2930 if expect_ticket_checksum:
2931 self.assertIsNotNone(ticket_decryption_key)
2933 if ticket_decryption_key is not None:
# A "service ticket" here is a TGS-REP ticket whose sname is not the TGS.
2934 service_ticket = (not self.is_tgs(expected_sname)
2935 and rep_msg_type == KRB_TGS_REP)
2936 self.verify_ticket(ticket_creds, krbtgt_keys,
2937 service_ticket=service_ticket,
2938 expect_pac=expect_pac,
2939 expect_ticket_checksum=expect_ticket_checksum
2940 or self.tkt_sig_support)
# Publish the resulting ticket credentials through the exchange dict.
2942 kdc_exchange_dict['rep_ticket_creds'] = ticket_creds
# check_pac_buffers: check the buffers of a PAC against kdc_exchange_dict.
#
# pac_data is unpacked with ndr_unpack as krb5pac.PAC_DATA. A list of
# expected buffer types is assembled from the request/reply properties and
# compared, unordered, with the types actually present. Each buffer of a
# known type is then checked against the corresponding expected_* values.
#
# NOTE(review): a few branch headers of this method are not visible in this
# view; comments cover only the statements shown.
2944 def check_pac_buffers(self, pac_data, kdc_exchange_dict):
2945 pac = ndr_unpack(krb5pac.PAC_DATA, pac_data)
2947 rep_msg_type = kdc_exchange_dict['rep_msg_type']
2948 armor_tgt = kdc_exchange_dict['armor_tgt']
2950 expected_sname = kdc_exchange_dict['expected_sname']
2951 expect_claims = kdc_exchange_dict['expect_claims']
# Buffer types that are expected unconditionally.
2953 expected_types = [krb5pac.PAC_TYPE_LOGON_INFO,
2954 krb5pac.PAC_TYPE_SRV_CHECKSUM,
2955 krb5pac.PAC_TYPE_KDC_CHECKSUM,
2956 krb5pac.PAC_TYPE_LOGON_NAME,
2957 krb5pac.PAC_TYPE_UPN_DNS_INFO]
2959 kdc_options = kdc_exchange_dict['kdc_options']
# The 'cname-in-addl-tkt' KDC option marks a constrained-delegation request,
# which adds the constrained-delegation buffer to the expected set.
2960 pos = len(tuple(krb5_asn1.KDCOptions('cname-in-addl-tkt'))) - 1
2961 constrained_delegation = (pos < len(kdc_options)
2962 and kdc_options[pos] == '1')
2963 if constrained_delegation:
2964 expected_types.append(krb5pac.PAC_TYPE_CONSTRAINED_DELEGATION)
2966 if self.kdc_fast_support:
2968 expected_types.append(krb5pac.PAC_TYPE_CLIENT_CLAIMS_INFO)
2970 if (rep_msg_type == KRB_TGS_REP
2971 and armor_tgt is not None):
2972 expected_types.append(krb5pac.PAC_TYPE_DEVICE_INFO)
2973 expected_types.append(krb5pac.PAC_TYPE_DEVICE_CLAIMS_INFO)
# A TGS-REP ticket for a non-TGS service is expected to carry a ticket
# checksum buffer.
2975 if not self.is_tgs(expected_sname) and rep_msg_type == KRB_TGS_REP:
2976 expected_types.append(krb5pac.PAC_TYPE_TICKET_CHECKSUM)
# require_strict is handed to assertSequenceElementsEqual below; presumably
# the listed types are only enforced under strict checking -- confirm
# against that helper.
2978 require_strict = {krb5pac.PAC_TYPE_CLIENT_CLAIMS_INFO}
2979 if not self.tkt_sig_support:
2980 require_strict.add(krb5pac.PAC_TYPE_TICKET_CHECKSUM)
2982 expect_extra_pac_buffers = self.is_tgs(expected_sname)
2984 expect_pac_attrs = kdc_exchange_dict['expect_pac_attrs']
2986 if expect_pac_attrs:
2987 expect_pac_attrs_pac_request = kdc_exchange_dict[
2988 'expect_pac_attrs_pac_request']
2990 expect_pac_attrs_pac_request = kdc_exchange_dict[
# With no explicit expectation (None), the statements below derive one from
# whether the ticket is for the TGS.
2993 if expect_pac_attrs is None:
2994 if self.expect_extra_pac_buffers:
2995 expect_pac_attrs = expect_extra_pac_buffers
2997 require_strict.add(krb5pac.PAC_TYPE_ATTRIBUTES_INFO)
2998 if expect_pac_attrs:
2999 expected_types.append(krb5pac.PAC_TYPE_ATTRIBUTES_INFO)
3001 expect_requester_sid = kdc_exchange_dict['expect_requester_sid']
3003 if expect_requester_sid is None:
3004 if self.expect_extra_pac_buffers:
3005 expect_requester_sid = expect_extra_pac_buffers
3007 require_strict.add(krb5pac.PAC_TYPE_REQUESTER_SID)
3008 if expect_requester_sid:
3009 expected_types.append(krb5pac.PAC_TYPE_REQUESTER_SID)
# Compare the expected buffer types with those present, ignoring order.
3011 buffer_types = [pac_buffer.type
3012 for pac_buffer in pac.buffers]
3013 self.assertSequenceElementsEqual(
3014 expected_types, buffer_types,
3015 require_ordered=False,
3016 require_strict=require_strict)
3018 expected_account_name = kdc_exchange_dict['expected_account_name']
3019 expected_sid = kdc_exchange_dict['expected_sid']
3021 expect_upn_dns_info_ex = kdc_exchange_dict['expect_upn_dns_info_ex']
# If not specified, the extended UPN/DNS info is expected whenever an
# account name or SID expectation was given.
3022 if expect_upn_dns_info_ex is None and (
3023 expected_account_name is not None
3024 or expected_sid is not None):
3025 expect_upn_dns_info_ex = True
# Per-buffer content checks, dispatched on the buffer type.
3027 for pac_buffer in pac.buffers:
3028 if pac_buffer.type == krb5pac.PAC_TYPE_CONSTRAINED_DELEGATION:
3029 expected_proxy_target = kdc_exchange_dict[
3030 'expected_proxy_target']
3031 expected_transited_services = kdc_exchange_dict[
3032 'expected_transited_services']
3034 delegation_info = pac_buffer.info.info
3036 self.assertEqual(expected_proxy_target,
3037 str(delegation_info.proxy_target))
3039 transited_services = list(map(
3040 str, delegation_info.transited_services))
3041 self.assertEqual(expected_transited_services,
3044 elif pac_buffer.type == krb5pac.PAC_TYPE_LOGON_NAME:
3045 expected_cname = kdc_exchange_dict['expected_cname']
# The logon name is compared with the first component of the expected cname.
3046 account_name = expected_cname['name-string'][0]
3048 self.assertEqual(account_name, pac_buffer.info.account_name)
3050 elif pac_buffer.type == krb5pac.PAC_TYPE_LOGON_INFO:
3051 logon_info = pac_buffer.info.info.info3.base
3053 if expected_account_name is not None:
3054 self.assertEqual(expected_account_name,
3055 str(logon_info.account_name))
3057 if expected_sid is not None:
# The expected RID is the part of the SID string after its last '-'.
3058 expected_rid = int(expected_sid.rsplit('-', 1)[1])
3059 self.assertEqual(expected_rid, logon_info.rid)
3061 elif pac_buffer.type == krb5pac.PAC_TYPE_UPN_DNS_INFO:
3062 upn_dns_info = pac_buffer.info
3063 upn_dns_info_ex = upn_dns_info.ex
3065 expected_realm = kdc_exchange_dict['expected_crealm']
3066 self.assertEqual(expected_realm,
3067 upn_dns_info.dns_domain_name)
3069 expected_upn_name = kdc_exchange_dict['expected_upn_name']
3070 if expected_upn_name is not None:
3071 self.assertEqual(expected_upn_name,
3072 upn_dns_info.upn_name)
3074 if expect_upn_dns_info_ex:
3075 self.assertIsNotNone(upn_dns_info_ex)
3077 if upn_dns_info_ex is not None:
3078 if expected_account_name is not None:
3079 self.assertEqual(expected_account_name,
3080 upn_dns_info_ex.samaccountname)
3082 if expected_sid is not None:
3083 self.assertEqual(expected_sid,
3084 str(upn_dns_info_ex.objectsid))
3086 elif (pac_buffer.type == krb5pac.PAC_TYPE_ATTRIBUTES_INFO
3087 and expect_pac_attrs):
3088 attr_info = pac_buffer.info
3090 self.assertEqual(2, attr_info.flags_length)
3092 flags = attr_info.flags
# Bit 0x1 of the flags is read as requested_pac, bit 0x2 as given_pac.
3094 requested_pac = bool(flags & 1)
3095 given_pac = bool(flags & 2)
3097 self.assertEqual(expect_pac_attrs_pac_request is True,
3099 self.assertEqual(expect_pac_attrs_pac_request is None,
3102 elif (pac_buffer.type == krb5pac.PAC_TYPE_REQUESTER_SID
3103 and expect_requester_sid):
3104 requester_sid = pac_buffer.info.sid
3106 if expected_sid is not None:
3107 self.assertEqual(expected_sid, str(requester_sid))
# generic_check_kdc_error: check a KRB-ERROR reply against
# kdc_exchange_dict.
#
# Steps visible here: pvno must be 5, msg-type KRB_ERROR and the error code
# one of expected_error_mode; ctime/cusec/crealm are expected missing under
# strict checking and stime/susec present; cname is the well-known
# anonymous principal when expected_anon is set and this is not an inner
# error; realm and sname must match; e-text must be missing. The e-data is
# then either required to be absent or decoded and checked (extended error
# for non-FAST TGS exchanges, otherwise METHOD-DATA passed through
# check_rep_padata()), and the resulting etype-info2 is stored in
# kdc_exchange_dict['preauth_etype_info2'].
#
# NOTE(review): the parameter list and several branch headers/arguments are
# not visible in this view; comments cover only the statements shown.
3109 def generic_check_kdc_error(self,
3115 rep_msg_type = kdc_exchange_dict['rep_msg_type']
3117 expected_anon = kdc_exchange_dict['expected_anon']
3118 expected_srealm = kdc_exchange_dict['expected_srealm']
3119 expected_sname = kdc_exchange_dict['expected_sname']
3120 expected_error_mode = kdc_exchange_dict['expected_error_mode']
3122 sent_fast = self.sent_fast(kdc_exchange_dict)
3124 fast_armor_type = kdc_exchange_dict['fast_armor_type']
3126 self.assertElementEqual(rep, 'pvno', 5)
3127 self.assertElementEqual(rep, 'msg-type', KRB_ERROR)
3128 error_code = self.getElementValue(rep, 'error-code')
# expected_error_mode is used as a container of acceptable error codes.
3129 self.assertIn(error_code, expected_error_mode)
3130 if self.strict_checking:
3131 self.assertElementMissing(rep, 'ctime')
3132 self.assertElementMissing(rep, 'cusec')
3133 self.assertElementPresent(rep, 'stime')
3134 self.assertElementPresent(rep, 'susec')
3135 # error-code checked above
3136 if self.strict_checking:
3137 self.assertElementMissing(rep, 'crealm')
3138 if expected_anon and not inner:
3139 expected_cname = self.PrincipalName_create(
3140 name_type=NT_WELLKNOWN,
3141 names=['WELLKNOWN', 'ANONYMOUS'])
3142 self.assertElementEqualPrincipal(rep, 'cname', expected_cname)
3144 self.assertElementMissing(rep, 'cname')
3145 self.assertElementEqualUTF8(rep, 'realm', expected_srealm)
3146 self.assertElementEqualPrincipal(rep, 'sname', expected_sname)
3147 self.assertElementMissing(rep, 'e-text')
3148 expected_status = kdc_exchange_dict['expected_status']
3149 expect_edata = kdc_exchange_dict['expect_edata']
# With no explicit expectation, e-data is expected based on the error code
# and on whether and how FAST was used (the expression's last line is not
# visible in this view).
3150 if expect_edata is None:
3151 expect_edata = (error_code != KDC_ERR_UNKNOWN_CRITICAL_FAST_OPTIONS
3152 and (not sent_fast or fast_armor_type is None
3153 or fast_armor_type == FX_FAST_ARMOR_AP_REQUEST)
3155 if not expect_edata:
3156 self.assertIsNone(expected_status)
3157 self.assertElementMissing(rep, 'e-data')
3159 edata = self.getElementValue(rep, 'e-data')
3160 if self.strict_checking:
3161 self.assertIsNotNone(edata)
3162 if edata is not None:
# For a TGS exchange without FAST the e-data is decoded as KERB-ERROR-DATA
# carrying an extended error.
3163 if rep_msg_type == KRB_TGS_REP and not sent_fast:
3164 error_data = self.der_decode(
3166 asn1Spec=krb5_asn1.KERB_ERROR_DATA())
3167 self.assertEqual(KERB_ERR_TYPE_EXTENDED,
3168 error_data['data-type'])
3170 extended_error = error_data['data-value']
3172 self.assertEqual(12, len(extended_error))
# The 12-byte value is read as little-endian integers: status from the
# first four bytes, flags from offset 8 onwards.
3174 status = int.from_bytes(extended_error[:4], 'little')
3175 flags = int.from_bytes(extended_error[8:], 'little')
3177 self.assertEqual(expected_status, status)
3179 self.assertEqual(3, flags)
3181 self.assertIsNone(expected_status)
3183 rep_padata = self.der_decode(edata,
3184 asn1Spec=krb5_asn1.METHOD_DATA())
3185 self.assertGreater(len(rep_padata), 0)
3188 self.assertEqual(1, len(rep_padata))
3189 rep_pa_dict = self.get_pa_dict(rep_padata)
3190 self.assertIn(PADATA_FX_FAST, rep_pa_dict)
3192 armor_key = kdc_exchange_dict['armor_key']
3193 self.assertIsNotNone(armor_key)
# No strengthen-key is required in the FAST response of an error reply.
3194 fast_response = self.check_fx_fast_data(
3196 rep_pa_dict[PADATA_FX_FAST],
3198 expect_strengthen_key=False)
# Continue with the padata carried inside the FAST response.
3200 rep_padata = fast_response['padata']
3202 etype_info2 = self.check_rep_padata(kdc_exchange_dict,
3207 kdc_exchange_dict['preauth_etype_info2'] = etype_info2
# check_rep_padata: check the padata returned by the KDC.
#
# Steps visible here: work out the expected etype-info/etype-info2 entries
# from the proposed etypes and client_as_etypes; build the tuple of
# expected padata types from the reply type, the error code and what the
# request sent (FAST, encrypted challenge, PAC options); compare it with
# the types present in rep_padata; then validate each known padata entry
# (empty hints, FAST cookie/error, PAC options, encrypted challenge,
# etype-info2 and etype-info).
#
# NOTE(review): the parameter list, some branch headers and the final
# return are not visible in this view; a caller in this file assigns the
# result to a variable named etype_info2.
3211 def check_rep_padata(self,
3216 rep_msg_type = kdc_exchange_dict['rep_msg_type']
3218 req_body = kdc_exchange_dict['req_body']
3219 proposed_etypes = req_body['etype']
3220 client_as_etypes = kdc_exchange_dict.get('client_as_etypes', [])
3222 sent_fast = self.sent_fast(kdc_exchange_dict)
3223 sent_enc_challenge = self.sent_enc_challenge(kdc_exchange_dict)
3225 if rep_msg_type == KRB_TGS_REP:
3226 self.assertTrue(sent_fast)
# Determine the expected etype-info2 contents: the highest-numbered AES
# etype seen, followed by RC4 when it qualifies. The legacy etype-info is
# only expected when RC4 was proposed and no AES etype is selected.
3228 expect_etype_info2 = ()
3229 expect_etype_info = False
3230 expected_aes_type = 0
3231 expected_rc4_type = 0
3232 if kcrypto.Enctype.RC4 in proposed_etypes:
3233 expect_etype_info = True
3234 for etype in proposed_etypes:
3235 if etype not in client_as_etypes:
3237 if etype in (kcrypto.Enctype.AES256, kcrypto.Enctype.AES128):
3238 expect_etype_info = False
3239 if etype > expected_aes_type:
3240 expected_aes_type = etype
3241 if etype in (kcrypto.Enctype.RC4,) and error_code != 0:
3242 if etype > expected_rc4_type:
3243 expected_rc4_type = etype
3245 if expected_aes_type != 0:
3246 expect_etype_info2 += (expected_aes_type,)
3247 if expected_rc4_type != 0:
3248 expect_etype_info2 += (expected_rc4_type,)
# Assemble the padata types expected in this reply.
3250 expected_patypes = ()
3251 if sent_fast and error_code != 0:
3252 expected_patypes += (PADATA_FX_ERROR,)
3253 expected_patypes += (PADATA_FX_COOKIE,)
3255 if rep_msg_type == KRB_TGS_REP:
3256 sent_pac_options = self.get_sent_pac_options(kdc_exchange_dict)
3257 if ('1' in sent_pac_options
3258 and error_code not in (0, KDC_ERR_GENERIC)):
3259 expected_patypes += (PADATA_PAC_OPTIONS,)
3260 elif error_code != KDC_ERR_GENERIC:
3261 if expect_etype_info:
3262 self.assertGreater(len(expect_etype_info2), 0)
3263 expected_patypes += (PADATA_ETYPE_INFO,)
3264 if len(expect_etype_info2) != 0:
3265 expected_patypes += (PADATA_ETYPE_INFO2,)
3267 if error_code != KDC_ERR_PREAUTH_FAILED:
3269 expected_patypes += (PADATA_ENCRYPTED_CHALLENGE,)
3271 expected_patypes += (PADATA_ENC_TIMESTAMP,)
3273 if not sent_enc_challenge:
3274 expected_patypes += (PADATA_PK_AS_REQ,)
3275 expected_patypes += (PADATA_PK_AS_REP_19,)
3277 if (self.kdc_fast_support
3279 and not sent_enc_challenge):
3280 expected_patypes += (PADATA_FX_FAST,)
3281 expected_patypes += (PADATA_FX_COOKIE,)
# Compare the expected types with those actually returned.
3283 got_patypes = tuple(pa['padata-type'] for pa in rep_padata)
3284 self.assertSequenceElementsEqual(expected_patypes, got_patypes,
3285 require_strict={PADATA_FX_COOKIE,
3288 PADATA_PK_AS_REP_19,
3291 if not expected_patypes:
3294 pa_dict = self.get_pa_dict(rep_padata)
# The following padata entries, when present, must have empty values.
3296 enc_timestamp = pa_dict.get(PADATA_ENC_TIMESTAMP)
3297 if enc_timestamp is not None:
3298 self.assertEqual(len(enc_timestamp), 0)
3300 pk_as_req = pa_dict.get(PADATA_PK_AS_REQ)
3301 if pk_as_req is not None:
3302 self.assertEqual(len(pk_as_req), 0)
3304 pk_as_rep19 = pa_dict.get(PADATA_PK_AS_REP_19)
3305 if pk_as_rep19 is not None:
3306 self.assertEqual(len(pk_as_rep19), 0)
3308 fx_fast = pa_dict.get(PADATA_FX_FAST)
3309 if fx_fast is not None:
3310 self.assertEqual(len(fx_fast), 0)
3312 fast_cookie = pa_dict.get(PADATA_FX_COOKIE)
3313 if fast_cookie is not None:
# Remember the FAST cookie in the exchange dict.
3314 kdc_exchange_dict['fast_cookie'] = fast_cookie
3316 fast_error = pa_dict.get(PADATA_FX_ERROR)
3317 if fast_error is not None:
# A PA-FX-ERROR value is itself a KRB-ERROR and is checked with
# generic_check_kdc_error().
3318 fast_error = self.der_decode(fast_error,
3319 asn1Spec=krb5_asn1.KRB_ERROR())
3320 self.generic_check_kdc_error(kdc_exchange_dict,
3325 pac_options = pa_dict.get(PADATA_PAC_OPTIONS)
3326 if pac_options is not None:
3327 pac_options = self.der_decode(
3329 asn1Spec=krb5_asn1.PA_PAC_OPTIONS())
3330 self.assertElementEqual(pac_options, 'options', sent_pac_options)
3332 enc_challenge = pa_dict.get(PADATA_ENCRYPTED_CHALLENGE)
3333 if enc_challenge is not None:
3334 if not sent_enc_challenge:
3335 self.assertEqual(len(enc_challenge), 0)
3337 armor_key = kdc_exchange_dict['armor_key']
3338 self.assertIsNotNone(armor_key)
3340 preauth_key, _ = self.get_preauth_key(kdc_exchange_dict)
# The KDC's challenge is decrypted with a key derived from the armor key
# and the pre-authentication key.
3342 kdc_challenge_key = self.generate_kdc_challenge_key(
3343 armor_key, preauth_key)
3345 # Ensure that the encrypted challenge FAST factor is supported
3347 if self.strict_checking:
3348 self.assertNotEqual(len(enc_challenge), 0)
3349 if len(enc_challenge) != 0:
3350 encrypted_challenge = self.der_decode(
3352 asn1Spec=krb5_asn1.EncryptedData())
3353 self.assertEqual(encrypted_challenge['etype'],
3354 kdc_challenge_key.etype)
3356 challenge = kdc_challenge_key.decrypt(
3357 KU_ENC_CHALLENGE_KDC,
3358 encrypted_challenge['cipher'])
3359 challenge = self.der_decode(
3361 asn1Spec=krb5_asn1.PA_ENC_TS_ENC())
3363 # Retrieve the returned timestamp.
3364 rep_patime = challenge['patimestamp']
3365 self.assertIn('pausec', challenge)
3367 # Ensure the returned time is within five minutes of the
3369 rep_time = self.get_EpochFromKerberosTime(rep_patime)
3370 current_time = time.time()
3372 self.assertLess(current_time - 300, rep_time)
3373 self.assertLess(rep_time, current_time + 300)
3375 etype_info2 = pa_dict.get(PADATA_ETYPE_INFO2)
3376 if etype_info2 is not None:
3377 etype_info2 = self.der_decode(etype_info2,
3378 asn1Spec=krb5_asn1.ETYPE_INFO2())
3379 self.assertGreaterEqual(len(etype_info2), 1)
3380 if self.strict_checking:
3381 self.assertEqual(len(etype_info2), len(expect_etype_info2))
# Check each returned entry's etype, salt and s2kparams.
3382 for i in range(0, len(etype_info2)):
3383 e = self.getElementValue(etype_info2[i], 'etype')
3384 if self.strict_checking:
3385 self.assertEqual(e, expect_etype_info2[i])
3386 salt = self.getElementValue(etype_info2[i], 'salt')
3387 if e == kcrypto.Enctype.RC4:
3388 if self.strict_checking:
3389 self.assertIsNone(salt)
3391 self.assertIsNotNone(salt)
3392 expected_salt = kdc_exchange_dict['expected_salt']
3393 if expected_salt is not None:
3394 self.assertEqual(salt, expected_salt)
3395 s2kparams = self.getElementValue(etype_info2[i], 's2kparams')
3396 if self.strict_checking:
3397 self.assertIsNone(s2kparams)
3399 etype_info = pa_dict.get(PADATA_ETYPE_INFO)
3400 if etype_info is not None:
3401 etype_info = self.der_decode(etype_info,
3402 asn1Spec=krb5_asn1.ETYPE_INFO())
# The legacy etype-info must hold exactly one RC4 entry whose salt is
# empty.
3403 self.assertEqual(len(etype_info), 1)
3404 e = self.getElementValue(etype_info[0], 'etype')
3405 self.assertEqual(e, kcrypto.Enctype.RC4)
3406 self.assertEqual(e, expect_etype_info2[0])
3407 salt = self.getElementValue(etype_info[0], 'salt')
3408 if self.strict_checking:
3409 self.assertIsNotNone(salt)
3410 self.assertEqual(len(salt), 0)
# generate_simple_fast: build a PA-FX-FAST padata element for a request.
#
# Visible steps: create a KrbFastReq and DER-encode it, wrap it in
# EncryptedData using kdc_exchange_dict['armor_key'], place that in a
# KrbFastArmoredReq / PA-FX-FAST-REQUEST, DER-encode the request and create
# a PA-DATA of type PADATA_FX_FAST from it.
#
# NOTE(review): the parameter list, most call arguments and the return
# statement are not visible in this view.
3414 def generate_simple_fast(self,
3422 armor_key = kdc_exchange_dict['armor_key']
3424 fast_req = self.KRB_FAST_REQ_create(fast_options,
3427 fast_req = self.der_encode(fast_req,
3428 asn1Spec=krb5_asn1.KrbFastReq())
3429 fast_req = self.EncryptedData_create(armor_key,
3433 fast_armored_req = self.KRB_FAST_ARMORED_REQ_create(fast_armor,
3437 fx_fast_request = self.PA_FX_FAST_REQUEST_create(fast_armored_req)
3438 fx_fast_request = self.der_encode(
3440 asn1Spec=krb5_asn1.PA_FX_FAST_REQUEST())
3442 fast_padata = self.PA_DATA_create(PADATA_FX_FAST,
# generate_ap_req: build a DER-encoded AP-REQ for the exchange.
#
# Visible steps: pick the TGT and authenticator subkey (the armor_* entries
# or the plain 'tgt'/'authenticator_subkey' entries of kdc_exchange_dict);
# when req_body is given, DER-encode it and checksum it under
# KU_TGS_REQ_AUTH_CKSUM; create and DER-encode an Authenticator; encrypt it
# with the TGT session key; create and DER-encode the AP-REQ.
#
# NOTE(review): the parameter list, the branch headers choosing between
# armor and non-armor values, several call arguments and the return
# statement are not visible in this view.
3447 def generate_ap_req(self,
3454 req_body_checksum = None
3457 self.assertIsNone(req_body)
3459 tgt = kdc_exchange_dict['armor_tgt']
3460 authenticator_subkey = kdc_exchange_dict['armor_subkey']
3462 tgt = kdc_exchange_dict['tgt']
3463 authenticator_subkey = kdc_exchange_dict['authenticator_subkey']
3465 if req_body is not None:
3466 body_checksum_type = kdc_exchange_dict['body_checksum_type']
3468 req_body_blob = self.der_encode(
3469 req_body, asn1Spec=krb5_asn1.KDC_REQ_BODY())
3471 req_body_checksum = self.Checksum_create(
3473 KU_TGS_REQ_AUTH_CKSUM,
3475 ctype=body_checksum_type)
3477 auth_data = kdc_exchange_dict['auth_data']
3480 if authenticator_subkey is not None:
3481 subkey_obj = authenticator_subkey.export_obj()
# A random sequence number is chosen when the caller did not supply one.
3482 if seq_number is None:
3483 seq_number = random.randint(0, 0xfffffffe)
3484 (ctime, cusec) = self.get_KerberosTimeWithUsec()
3485 authenticator_obj = self.Authenticator_create(
3488 cksum=req_body_checksum,
3492 seq_number=seq_number,
3493 authorization_data=auth_data)
3494 authenticator_blob = self.der_encode(
3496 asn1Spec=krb5_asn1.Authenticator())
# The key usage depends on whether this AP-REQ is used as FAST armor.
3499 usage = KU_AP_REQ_AUTH if armor else KU_TGS_REQ_AUTH
3500 authenticator = self.EncryptedData_create(tgt.session_key,
3504 ap_options = krb5_asn1.APOptions('0')
3505 ap_req_obj = self.AP_REQ_create(ap_options=str(ap_options),
3507 authenticator=authenticator)
3508 ap_req = self.der_encode(ap_req_obj, asn1Spec=krb5_asn1.AP_REQ())
# generate_simple_tgs_padata: build the padata list for a TGS request.
#
# Calls generate_ap_req(), wraps the result in a PA-DATA of type
# PADATA_KDC_REQ and returns a tuple of the one-element padata list and
# req_body.
#
# NOTE(review): the parameter list and the remaining generate_ap_req()
# arguments are not visible in this view.
3512 def generate_simple_tgs_padata(self,
3516 ap_req = self.generate_ap_req(kdc_exchange_dict,
3520 pa_tgs_req = self.PA_DATA_create(PADATA_KDC_REQ, ap_req)
3521 padata = [pa_tgs_req]
3523 return padata, req_body
# get_preauth_key: select the key and key usage for decrypting the reply's
# encrypted part.
#
# For an AS-REP: kdc_exchange_dict['preauth_key'] with KU_AS_REP_ENC_PART.
# Otherwise: the authenticator subkey with KU_TGS_REP_ENC_PART_SUB_KEY when
# one is set, or the TGT session key with KU_TGS_REP_ENC_PART_SESSION.
# The chosen key is asserted to be not None.
#
# NOTE(review): the else headers and the return statement are not visible
# in this view; callers in this file unpack the result as (key, usage).
3525 def get_preauth_key(self, kdc_exchange_dict):
3526 msg_type = kdc_exchange_dict['rep_msg_type']
3528 if msg_type == KRB_AS_REP:
3529 key = kdc_exchange_dict['preauth_key']
3530 usage = KU_AS_REP_ENC_PART
3532 authenticator_subkey = kdc_exchange_dict['authenticator_subkey']
3533 if authenticator_subkey is not None:
3534 key = authenticator_subkey
3535 usage = KU_TGS_REP_ENC_PART_SUB_KEY
3537 tgt = kdc_exchange_dict['tgt']
3538 key = tgt.session_key
3539 usage = KU_TGS_REP_ENC_PART_SESSION
3541 self.assertIsNotNone(key)
# generate_armor_key: derive a FAST armor key with kcrypto.cf2(), starting
# from subkey.key, and wrap the result in a Krb5EncryptionKey with no kvno.
#
# NOTE(review): the remaining cf2() arguments (presumably session_key.key
# and the two pepper strings) and the return statement are not visible in
# this view.
3545 def generate_armor_key(self, subkey, session_key):
3546 armor_key = kcrypto.cf2(subkey.key,
3550 armor_key = Krb5EncryptionKey(armor_key, None)
# generate_strengthen_reply_key: derive the strengthened reply key with
# kcrypto.cf2(), starting from strengthen_key.key, wrap it in a
# Krb5EncryptionKey and return it.
#
# NOTE(review): the remaining cf2() arguments (presumably reply_key.key and
# the two pepper strings) and the second Krb5EncryptionKey argument are not
# visible in this view.
3554 def generate_strengthen_reply_key(self, strengthen_key, reply_key):
3555 strengthen_reply_key = kcrypto.cf2(strengthen_key.key,
3559 strengthen_reply_key = Krb5EncryptionKey(strengthen_reply_key,
3562 return strengthen_reply_key
# generate_client_challenge_key: derive the client-side encrypted-challenge
# key with kcrypto.cf2() from armor_key.key, using the pepper strings
# b'clientchallengearmor' and b'challengelongterm', and return it wrapped
# in a Krb5EncryptionKey with no kvno.
#
# NOTE(review): the second cf2() argument (presumably longterm_key.key) is
# not visible in this view.
3564 def generate_client_challenge_key(self, armor_key, longterm_key):
3565 client_challenge_key = kcrypto.cf2(armor_key.key,
3567 b'clientchallengearmor',
3568 b'challengelongterm')
3569 client_challenge_key = Krb5EncryptionKey(client_challenge_key, None)
3571 return client_challenge_key
# generate_kdc_challenge_key: derive the KDC-side encrypted-challenge key
# with kcrypto.cf2() from armor_key.key, using the pepper strings
# b'kdcchallengearmor' and b'challengelongterm', and return it wrapped in a
# Krb5EncryptionKey with no kvno.
#
# NOTE(review): the second cf2() argument (presumably longterm_key.key) is
# not visible in this view.
3573 def generate_kdc_challenge_key(self, armor_key, longterm_key):
3574 kdc_challenge_key = kcrypto.cf2(armor_key.key,
3576 b'kdcchallengearmor',
3577 b'challengelongterm')
3578 kdc_challenge_key = Krb5EncryptionKey(kdc_challenge_key, None)
3580 return kdc_challenge_key
# verify_ticket_checksum: check the ticket checksum from a FAST response.
#
# Asserts that the checksum type equals armor_key.ctype, DER-encodes the
# ticket, creates a checksum with armor_key and asserts that it equals
# expected_checksum.
#
# NOTE(review): the remaining Checksum_create() arguments (key usage and
# input data) are not visible in this view.
3582 def verify_ticket_checksum(self, ticket, expected_checksum, armor_key):
3583 expected_type = expected_checksum['cksumtype']
3584 self.assertEqual(armor_key.ctype, expected_type)
3586 ticket_blob = self.der_encode(ticket,
3587 asn1Spec=krb5_asn1.Ticket())
3588 checksum = self.Checksum_create(armor_key,
3591 self.assertEqual(expected_checksum, checksum)
def verify_ticket(self, ticket, krbtgt_keys, service_ticket,
                  expect_pac=True,
                  expect_ticket_checksum=True):
    """Decrypt a ticket and verify the signatures carried in its PAC.

    ticket: credentials object exposing `ticket` and `decryption_key`.
    krbtgt_keys: a single krbtgt key, or a container of candidate keys.
    service_ticket: if false, the PAC must not hold a ticket checksum.
    expect_pac: if true, authorization data and a PAC must be present.
        If false, the method returns before any signature is checked.
    expect_ticket_checksum: True requires a ticket checksum on a service
        ticket and False forbids one; any other value accepts either.
    """
    # Decrypt the ticket.

    key = ticket.decryption_key
    enc_part = ticket.ticket['enc-part']

    self.assertElementEqual(enc_part, 'etype', key.etype)
    self.assertElementKVNO(enc_part, 'kvno', key.kvno)

    enc_part = key.decrypt(KU_TICKET, enc_part['cipher'])
    enc_part = self.der_decode(
        enc_part, asn1Spec=krb5_asn1.EncTicketPart())

    # Fetch the authorization data from the ticket.
    auth_data = enc_part.get('authorization-data')
    if expect_pac:
        self.assertIsNotNone(auth_data)
    elif auth_data is None:
        return

    # Get a copy of the authdata with an empty PAC, and the existing PAC
    # (if present).
    empty_pac = self.get_empty_pac()
    auth_data, pac_data = self.replace_pac(auth_data,
                                           empty_pac,
                                           expect_pac=expect_pac)
    if not expect_pac:
        return

    # Unpack the PAC as both PAC_DATA and PAC_DATA_RAW types. We use the
    # raw type to create a new PAC with zeroed signatures for
    # verification. This is because on Windows, the resource_groups field
    # is added to PAC_LOGON_INFO after the info3 field has been created,
    # which results in a different ordering of pointer values than Samba
    # (see commit 0e201ecdc53). Using the raw type avoids changing
    # PAC_LOGON_INFO, so verification against Windows can work. We still
    # need the PAC_DATA type to retrieve the actual checksums, because the
    # signatures in the raw type may contain padding bytes.
    pac = ndr_unpack(krb5pac.PAC_DATA,
                     pac_data)
    raw_pac = ndr_unpack(krb5pac.PAC_DATA_RAW,
                         pac_data)

    checksums = {}

    for pac_buffer, raw_pac_buffer in zip(pac.buffers, raw_pac.buffers):
        buffer_type = pac_buffer.type
        if buffer_type in self.pac_checksum_types:
            self.assertNotIn(buffer_type, checksums,
                             f'Duplicate checksum type {buffer_type}')

            # Fetch the checksum and the checksum type from the PAC buffer.
            checksum = pac_buffer.info.signature
            ctype = pac_buffer.info.type
            # Sign-extend the type if bit 31 is set, giving a negative
            # Python integer.
            if ctype & (1 << 31):
                ctype |= (-1 << 31)

            checksums[buffer_type] = checksum, ctype

            if buffer_type != krb5pac.PAC_TYPE_TICKET_CHECKSUM:
                # Zero the checksum field so that we can later verify the
                # checksums. The ticket checksum field is not zeroed.

                signature = ndr_unpack(
                    krb5pac.PAC_SIGNATURE_DATA,
                    raw_pac_buffer.info.remaining)
                signature.signature = bytes(len(checksum))
                raw_pac_buffer.info.remaining = ndr_pack(
                    signature)

    # Re-encode the PAC.
    pac_data = ndr_pack(raw_pac)

    # Verify the signatures.

    server_checksum, server_ctype = checksums[
        krb5pac.PAC_TYPE_SRV_CHECKSUM]
    key.verify_checksum(KU_NON_KERB_CKSUM_SALT,
                        pac_data,
                        server_ctype,
                        server_checksum)

    kdc_checksum, kdc_ctype = checksums[
        krb5pac.PAC_TYPE_KDC_CHECKSUM]

    if isinstance(krbtgt_keys, collections.abc.Container):
        if self.strict_checking:
            krbtgt_key = krbtgt_keys[0]
        else:
            # Pick the first candidate whose checksum type matches the KDC
            # signature, and fail with a clear message if there is none.
            krbtgt_key = next((candidate for candidate in krbtgt_keys
                               if candidate.ctype == kdc_ctype), None)
            self.assertIsNotNone(
                krbtgt_key,
                f'No krbtgt key with checksum type {kdc_ctype}')
    else:
        krbtgt_key = krbtgt_keys

    # The KDC signature is made over the server signature.
    krbtgt_key.verify_rodc_checksum(KU_NON_KERB_CKSUM_SALT,
                                    server_checksum,
                                    kdc_ctype,
                                    kdc_checksum)

    if not service_ticket:
        self.assertNotIn(krb5pac.PAC_TYPE_TICKET_CHECKSUM, checksums)
    else:
        ticket_checksum, ticket_ctype = checksums.get(
            krb5pac.PAC_TYPE_TICKET_CHECKSUM,
            (None, None))
        if expect_ticket_checksum:
            self.assertIsNotNone(ticket_checksum)
        elif expect_ticket_checksum is False:
            self.assertIsNone(ticket_checksum)
        if ticket_checksum is not None:
            # The ticket signature is made over the enc-part with the PAC
            # replaced by the empty PAC.
            enc_part['authorization-data'] = auth_data
            enc_part = self.der_encode(enc_part,
                                       asn1Spec=krb5_asn1.EncTicketPart())

            krbtgt_key.verify_rodc_checksum(KU_NON_KERB_CKSUM_SALT,
                                            enc_part,
                                            ticket_ctype,
                                            ticket_checksum)
def modified_ticket(self,
                    ticket, *,
                    new_ticket_key=None,
                    modify_fn=None,
                    modify_pac_fn=None,
                    exclude_pac=False,
                    allow_empty_authdata=False,
                    update_pac_checksums=True,
                    checksum_keys=None,
                    include_checksums=None):
    """Return a re-encrypted copy of a ticket, optionally altered.

    ticket: credentials object for the ticket to be modified.
    new_ticket_key: key used to re-encrypt the ticket; defaults to the
        ticket's current decryption key.
    modify_fn: optional callable taking and returning the decoded
        enc-part.
    modify_pac_fn: optional callable taking and returning the unpacked
        PAC; when given, a PAC is required to be present.
    exclude_pac: if true, the PAC is left out of the new ticket. This
        cannot be combined with modify_pac_fn and turns off checksum
        updating.
    allow_empty_authdata: passed to replace_pac() when the authorization
        data is rebuilt.
    update_pac_checksums: if true, the PAC signatures are recalculated.
    checksum_keys: dict mapping PAC checksum types to signing keys.
    include_checksums: dict mapping PAC checksum types to True (include),
        False (exclude) or None (keep as in the original PAC).

    Returns a new KerberosTicketCreds for the modified ticket.
    """
    # A dict containing a key for each checksum type to be created in the
    # PAC. Work on a copy so that the fallback entries added below do not
    # leak into the caller's dict.
    checksum_keys = {} if checksum_keys is None else dict(checksum_keys)

    if include_checksums is None:
        # A dict containing a value for each checksum type; True if the
        # checksum type is to be included in the PAC, False if it is to be
        # excluded, or None/not present if the checksum is to be included
        # based on its presence in the original PAC.
        include_checksums = {}

    # Check that the values passed in by the caller make sense.

    self.assertLessEqual(checksum_keys.keys(), self.pac_checksum_types)
    self.assertLessEqual(include_checksums.keys(), self.pac_checksum_types)

    if exclude_pac:
        self.assertIsNone(modify_pac_fn)

        update_pac_checksums = False

    if not update_pac_checksums:
        self.assertFalse(checksum_keys)
        self.assertFalse(include_checksums)

    expect_pac = modify_pac_fn is not None

    key = ticket.decryption_key

    if new_ticket_key is None:
        # Use the same key to re-encrypt the ticket.
        new_ticket_key = key

    if krb5pac.PAC_TYPE_SRV_CHECKSUM not in checksum_keys:
        # If the server signature key is not present, fall back to the key
        # used to encrypt the ticket.
        checksum_keys[krb5pac.PAC_TYPE_SRV_CHECKSUM] = new_ticket_key

    if krb5pac.PAC_TYPE_TICKET_CHECKSUM not in checksum_keys:
        # If the ticket signature key is not present, fall back to the key
        # used for the KDC signature.
        kdc_checksum_key = checksum_keys.get(krb5pac.PAC_TYPE_KDC_CHECKSUM)
        if kdc_checksum_key is not None:
            checksum_keys[krb5pac.PAC_TYPE_TICKET_CHECKSUM] = (
                kdc_checksum_key)

    # Decrypt the ticket.

    enc_part = ticket.ticket['enc-part']

    self.assertElementEqual(enc_part, 'etype', key.etype)
    self.assertElementKVNO(enc_part, 'kvno', key.kvno)

    enc_part = key.decrypt(KU_TICKET, enc_part['cipher'])
    enc_part = self.der_decode(
        enc_part, asn1Spec=krb5_asn1.EncTicketPart())

    # Modify the ticket here.
    if modify_fn is not None:
        enc_part = modify_fn(enc_part)

    auth_data = enc_part.get('authorization-data')
    if expect_pac:
        self.assertIsNotNone(auth_data)
    if auth_data is not None:
        new_pac = None
        if not exclude_pac:
            # Get a copy of the authdata with an empty PAC, and the
            # existing PAC (if present).
            empty_pac = self.get_empty_pac()
            empty_pac_auth_data, pac_data = self.replace_pac(
                auth_data,
                empty_pac,
                expect_pac=expect_pac)

            if pac_data is not None:
                pac = ndr_unpack(krb5pac.PAC_DATA, pac_data)

                # Modify the PAC here.
                if modify_pac_fn is not None:
                    pac = modify_pac_fn(pac)

                if update_pac_checksums:
                    # Get the enc-part with an empty PAC, which is needed
                    # to create a ticket signature.
                    enc_part_to_sign = enc_part.copy()
                    enc_part_to_sign['authorization-data'] = (
                        empty_pac_auth_data)
                    enc_part_to_sign = self.der_encode(
                        enc_part_to_sign,
                        asn1Spec=krb5_asn1.EncTicketPart())

                    self.update_pac_checksums(pac,
                                              checksum_keys,
                                              include_checksums,
                                              enc_part_to_sign)

                # Re-encode the PAC.
                pac_data = ndr_pack(pac)
                new_pac = self.AuthorizationData_create(AD_WIN2K_PAC,
                                                        pac_data)

        # Replace the PAC in the authorization data and re-add it to the
        # ticket enc-part.
        auth_data, _ = self.replace_pac(
            auth_data, new_pac,
            expect_pac=expect_pac,
            allow_empty_authdata=allow_empty_authdata)
        enc_part['authorization-data'] = auth_data

    # Re-encrypt the ticket enc-part with the new key.
    enc_part_new = self.der_encode(enc_part,
                                   asn1Spec=krb5_asn1.EncTicketPart())
    enc_part_new = self.EncryptedData_create(new_ticket_key,
                                             KU_TICKET,
                                             enc_part_new)

    # Create a copy of the ticket with the new enc-part.
    new_ticket = ticket.ticket.copy()
    new_ticket['enc-part'] = enc_part_new

    new_ticket_creds = KerberosTicketCreds(
        new_ticket,
        session_key=ticket.session_key,
        crealm=ticket.crealm,
        cname=ticket.cname,
        srealm=ticket.srealm,
        sname=ticket.sname,
        decryption_key=new_ticket_key,
        ticket_private=enc_part,
        encpart_private=ticket.encpart_private)

    return new_ticket_creds
def update_pac_checksums(self,
                         pac,
                         checksum_keys,
                         include_checksums,
                         enc_part=None):
    """Recalculate the signature buffers of a PAC in place.

    pac: unpacked PAC whose `buffers` and `num_buffers` are updated.
    checksum_keys: dict mapping each checksum type to its signing key; a
        key is required for every checksum buffer that ends up in the PAC.
    include_checksums: dict mapping checksum types to True (add the
        buffer if absent) or False (remove it if present).
    enc_part: encoded ticket enc-part; required when a ticket checksum is
        to be generated.
    """
    pac_buffers = pac.buffers
    checksum_buffers = {}

    # Find the relevant PAC checksum buffers.
    for pac_buffer in pac_buffers:
        buffer_type = pac_buffer.type
        if buffer_type in self.pac_checksum_types:
            self.assertNotIn(buffer_type, checksum_buffers,
                             f'Duplicate checksum type {buffer_type}')

            checksum_buffers[buffer_type] = pac_buffer

    # Create any additional buffers that were requested but not
    # present. Conversely, remove any buffers that were requested to be
    # removed.
    for buffer_type in self.pac_checksum_types:
        if buffer_type in checksum_buffers:
            if include_checksums.get(buffer_type) is False:
                checksum_buffer = checksum_buffers.pop(buffer_type)

                pac.num_buffers -= 1
                pac_buffers.remove(checksum_buffer)

        elif include_checksums.get(buffer_type) is True:
            info = krb5pac.PAC_SIGNATURE_DATA()

            checksum_buffer = krb5pac.PAC_BUFFER()
            checksum_buffer.type = buffer_type
            checksum_buffer.info = info

            pac_buffers.append(checksum_buffer)
            pac.num_buffers += 1

            checksum_buffers[buffer_type] = checksum_buffer

    # Fill the relevant checksum buffers.
    for buffer_type, checksum_buffer in checksum_buffers.items():
        checksum_key = checksum_keys[buffer_type]
        # Reduce the checksum type to an unsigned 32-bit value.
        ctype = checksum_key.ctype & ((1 << 32) - 1)

        if buffer_type == krb5pac.PAC_TYPE_TICKET_CHECKSUM:
            self.assertIsNotNone(enc_part)

            signature = checksum_key.make_rodc_checksum(
                KU_NON_KERB_CKSUM_SALT,
                enc_part)

        elif buffer_type == krb5pac.PAC_TYPE_SRV_CHECKSUM:
            signature = checksum_key.make_zeroed_checksum()

        else:
            signature = checksum_key.make_rodc_zeroed_checksum()

        checksum_buffer.info.signature = signature
        checksum_buffer.info.type = ctype

    # Add the new checksum buffers to the PAC.
    pac.buffers = pac_buffers

    # Calculate the server and KDC checksums and insert them into the PAC.

    server_checksum_buffer = checksum_buffers.get(
        krb5pac.PAC_TYPE_SRV_CHECKSUM)
    if server_checksum_buffer is not None:
        server_checksum_key = checksum_keys[krb5pac.PAC_TYPE_SRV_CHECKSUM]

        # The server signature is made over the packed PAC, which at this
        # point holds placeholder server and KDC signatures.
        pac_data = ndr_pack(pac)
        server_checksum = server_checksum_key.make_checksum(
            KU_NON_KERB_CKSUM_SALT,
            pac_data)

        server_checksum_buffer.info.signature = server_checksum

    kdc_checksum_buffer = checksum_buffers.get(
        krb5pac.PAC_TYPE_KDC_CHECKSUM)
    if kdc_checksum_buffer is not None:
        if server_checksum_buffer is None:
            # There's no server signature to make the checksum over, so
            # just make the checksum over an empty bytes object.
            server_checksum = bytes()

        kdc_checksum_key = checksum_keys[krb5pac.PAC_TYPE_KDC_CHECKSUM]

        kdc_checksum = kdc_checksum_key.make_rodc_checksum(
            KU_NON_KERB_CKSUM_SALT,
            server_checksum)

        kdc_checksum_buffer.info.signature = kdc_checksum
def replace_pac(self, auth_data, new_pac, expect_pac=True,
                allow_empty_authdata=False):
    """Return a copy of the authorization data with the PAC swapped out.

    auth_data: iterable of authorization-data elements.
    new_pac: AD_WIN2K_PAC element to substitute for the existing PAC, or
        None to drop the PAC.
    expect_pac: if true, an AD-IF-RELEVANT element containing a PAC must
        be present.
    allow_empty_authdata: if true, an AD-IF-RELEVANT element left with no
        entries is re-encoded and kept rather than omitted.

    Returns (new_auth_data, old_pac), where old_pac is the 'ad-data' of
    the PAC that was found, or None if there was none.
    """
    if new_pac is not None:
        self.assertElementEqual(new_pac, 'ad-type', AD_WIN2K_PAC)
        self.assertElementPresent(new_pac, 'ad-data')

    new_auth_data = []

    ad_relevant = None
    old_pac = None

    for authdata_elem in auth_data:
        if authdata_elem['ad-type'] == AD_IF_RELEVANT:
            ad_relevant = self.der_decode(
                authdata_elem['ad-data'],
                asn1Spec=krb5_asn1.AD_IF_RELEVANT())

            relevant_elems = []
            for relevant_elem in ad_relevant:
                if relevant_elem['ad-type'] == AD_WIN2K_PAC:
                    self.assertIsNone(old_pac, 'Multiple PACs detected')
                    old_pac = relevant_elem['ad-data']

                    if new_pac is not None:
                        relevant_elems.append(new_pac)
                else:
                    relevant_elems.append(relevant_elem)
            if expect_pac:
                self.assertIsNotNone(old_pac, 'Expected PAC')

            if relevant_elems or allow_empty_authdata:
                ad_relevant = self.der_encode(
                    relevant_elems,
                    asn1Spec=krb5_asn1.AD_IF_RELEVANT())

                authdata_elem = self.AuthorizationData_create(
                    AD_IF_RELEVANT,
                    ad_relevant)
            else:
                # Nothing is left inside the container, so drop it.
                authdata_elem = None

        if authdata_elem is not None or allow_empty_authdata:
            new_auth_data.append(authdata_elem)

    if expect_pac:
        self.assertIsNotNone(ad_relevant, 'Expected AD-RELEVANT')

    return new_auth_data, old_pac
def get_pac(self, auth_data, expect_pac=True):
    """Return the PAC found in the authorization data.

    Delegates to replace_pac() with no replacement PAC and returns the
    second element of its result; the rebuilt authorization data is
    discarded.
    """
    _, pac = self.replace_pac(auth_data, None, expect_pac)

    return pac
def get_ticket_pac(self, ticket, expect_pac=True):
    """Return the PAC from a ticket's decrypted enc-part.

    The authorization data is read from ticket.ticket_private. If it is
    absent, the test fails when expect_pac is true and None is returned
    otherwise.
    """
    auth_data = ticket.ticket_private.get('authorization-data')
    if expect_pac:
        self.assertIsNotNone(auth_data)
    elif auth_data is None:
        return None

    return self.get_pac(auth_data, expect_pac=expect_pac)
def get_krbtgt_checksum_key(self):
    """Return a checksum-key dict holding the krbtgt key.

    The krbtgt account's ticket decryption key is stored under the KDC
    checksum type.
    """
    krbtgt_creds = self.get_krbtgt_creds()
    krbtgt_key = self.TicketDecryptionKey_from_creds(krbtgt_creds)

    return {
        krb5pac.PAC_TYPE_KDC_CHECKSUM: krbtgt_key
    }
def is_tgs(self, principal):
    """Tell whether a principal's first name component is 'krbtgt'.

    The component may be given as either str or bytes.
    """
    first_component = principal['name-string'][0]
    return first_component == 'krbtgt' or first_component == b'krbtgt'
def is_tgt(self, ticket):
    """Tell whether a ticket's service name passes the is_tgs() check."""
    return self.is_tgs(ticket.ticket['sname'])
def get_empty_pac(self):
    """Build an AD_WIN2K_PAC element holding a single zero byte."""
    placeholder = bytes(1)
    return self.AuthorizationData_create(AD_WIN2K_PAC, placeholder)
def get_outer_pa_dict(self, kdc_exchange_dict):
    """Return get_pa_dict() applied to the exchange's 'req_padata'."""
    outer_padata = kdc_exchange_dict['req_padata']
    return self.get_pa_dict(outer_padata)
def get_fast_pa_dict(self, kdc_exchange_dict):
    """Return the padata dict for the exchange, preferring FAST padata.

    If get_pa_dict() yields a non-empty result for 'fast_padata', that
    result is returned; otherwise the outer padata dict is returned.
    """
    req_pa_dict = self.get_pa_dict(kdc_exchange_dict['fast_padata'])

    if req_pa_dict:
        return req_pa_dict

    return self.get_outer_pa_dict(kdc_exchange_dict)
def sent_fast(self, kdc_exchange_dict):
    """Tell whether PADATA_FX_FAST is among the outer request padata."""
    return PADATA_FX_FAST in self.get_outer_pa_dict(kdc_exchange_dict)
def sent_enc_challenge(self, kdc_exchange_dict):
    """Tell whether PADATA_ENCRYPTED_CHALLENGE is in the FAST padata dict.

    The dict consulted is whatever get_fast_pa_dict() returns for the
    exchange.
    """
    return PADATA_ENCRYPTED_CHALLENGE in self.get_fast_pa_dict(
        kdc_exchange_dict)
def get_sent_pac_options(self, kdc_exchange_dict):
    """Return the PAC options sent in the request, as a string.

    An empty string is returned if no PADATA_PAC_OPTIONS element was
    sent. Otherwise the first four characters of the decoded 'options'
    value are kept and every later character is replaced with '0'.
    """
    fast_pa_dict = self.get_fast_pa_dict(kdc_exchange_dict)

    if PADATA_PAC_OPTIONS not in fast_pa_dict:
        return ''

    pac_options = self.der_decode(fast_pa_dict[PADATA_PAC_OPTIONS],
                                  asn1Spec=krb5_asn1.PA_PAC_OPTIONS())
    pac_options = pac_options['options']

    # Mask out unsupported bits.
    pac_options, remaining = pac_options[:4], pac_options[4:]
    pac_options += '0' * len(remaining)

    return pac_options
def get_krbtgt_sname(self):
    """Return the krbtgt service principal name.

    The name is of type NT_SRV_INST and is made up of the krbtgt
    account's username followed by its realm.
    """
    krbtgt_creds = self.get_krbtgt_creds()
    krbtgt_username = krbtgt_creds.get_username()
    krbtgt_realm = krbtgt_creds.get_realm()
    krbtgt_sname = self.PrincipalName_create(
        name_type=NT_SRV_INST, names=[krbtgt_username, krbtgt_realm])

    return krbtgt_sname
4080 def _test_as_exchange(self,
4086 expected_error_mode,
4095 expected_account_name=None,
4096 expected_upn_name=None,
4098 expected_flags=None,
4099 unexpected_flags=None,
4100 expected_supported_etypes=None,
4102 ticket_decryption_key=None,
4106 expect_pac_attrs=None,
4107 expect_pac_attrs_pac_request=None,
4108 expect_requester_sid=None,
4111 def _generate_padata_copy(_kdc_exchange_dict,
4114 return padata, req_body
4116 if not expected_error_mode:
4117 check_error_fn = None
4118 check_rep_fn = self.generic_check_kdc_rep
4120 check_error_fn = self.generic_check_kdc_error
4123 if padata is not None:
4124 generate_padata_fn = _generate_padata_copy
4126 generate_padata_fn = None
4128 kdc_exchange_dict = self.as_exchange_dict(
4129 expected_crealm=expected_crealm,
4130 expected_cname=expected_cname,
4131 expected_srealm=expected_srealm,
4132 expected_sname=expected_sname,
4133 expected_account_name=expected_account_name,
4134 expected_upn_name=expected_upn_name,
4135 expected_sid=expected_sid,
4136 expected_supported_etypes=expected_supported_etypes,
4137 ticket_decryption_key=ticket_decryption_key,
4138 generate_padata_fn=generate_padata_fn,
4139 check_error_fn=check_error_fn,
4140 check_rep_fn=check_rep_fn,
4141 check_kdc_private_fn=self.generic_check_kdc_private,
4142 expected_error_mode=expected_error_mode,
4143 client_as_etypes=client_as_etypes,
4144 expected_salt=expected_salt,
4145 expected_flags=expected_flags,
4146 unexpected_flags=unexpected_flags,
4147 preauth_key=preauth_key,
4148 kdc_options=str(kdc_options),
4149 pac_request=pac_request,
4150 pac_options=pac_options,
4151 expect_pac=expect_pac,
4152 expect_pac_attrs=expect_pac_attrs,
4153 expect_pac_attrs_pac_request=expect_pac_attrs_pac_request,
4154 expect_requester_sid=expect_requester_sid,
4157 rep = self._generic_kdc_exchange(kdc_exchange_dict,
4164 return rep, kdc_exchange_dict