CVE-2022-32744 tests/krb5: Correctly calculate salt for pre-existing accounts
[samba.git] / python / samba / tests / krb5 / raw_testcase.py
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Isaac Boukris 2020
3 # Copyright (C) Stefan Metzmacher 2020
4 #
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18
19 import sys
20 import socket
21 import struct
22 import time
23 import datetime
24 import random
25 import binascii
26 import itertools
27 import collections
28
29 from pyasn1.codec.der.decoder import decode as pyasn1_der_decode
30 from pyasn1.codec.der.encoder import encode as pyasn1_der_encode
31 from pyasn1.codec.native.decoder import decode as pyasn1_native_decode
32 from pyasn1.codec.native.encoder import encode as pyasn1_native_encode
33
34 from pyasn1.codec.ber.encoder import BitStringEncoder
35
36 from samba.credentials import Credentials
37 from samba.dcerpc import krb5pac, security
38 from samba.gensec import FEATURE_SEAL
39 from samba.ndr import ndr_pack, ndr_unpack
40
41 import samba.tests
42 from samba.tests import TestCaseInTempDir
43
44 import samba.tests.krb5.rfc4120_pyasn1 as krb5_asn1
45 from samba.tests.krb5.rfc4120_constants import (
46     AD_IF_RELEVANT,
47     AD_WIN2K_PAC,
48     FX_FAST_ARMOR_AP_REQUEST,
49     KDC_ERR_GENERIC,
50     KDC_ERR_PREAUTH_FAILED,
51     KDC_ERR_UNKNOWN_CRITICAL_FAST_OPTIONS,
52     KERB_ERR_TYPE_EXTENDED,
53     KRB_AP_REQ,
54     KRB_AS_REP,
55     KRB_AS_REQ,
56     KRB_ERROR,
57     KRB_TGS_REP,
58     KRB_TGS_REQ,
59     KU_AP_REQ_AUTH,
60     KU_AS_REP_ENC_PART,
61     KU_ENC_CHALLENGE_KDC,
62     KU_FAST_ENC,
63     KU_FAST_FINISHED,
64     KU_FAST_REP,
65     KU_FAST_REQ_CHKSUM,
66     KU_NON_KERB_CKSUM_SALT,
67     KU_TGS_REP_ENC_PART_SESSION,
68     KU_TGS_REP_ENC_PART_SUB_KEY,
69     KU_TGS_REQ_AUTH,
70     KU_TGS_REQ_AUTH_CKSUM,
71     KU_TGS_REQ_AUTH_DAT_SESSION,
72     KU_TGS_REQ_AUTH_DAT_SUBKEY,
73     KU_TICKET,
74     NT_SRV_INST,
75     NT_WELLKNOWN,
76     PADATA_ENCRYPTED_CHALLENGE,
77     PADATA_ENC_TIMESTAMP,
78     PADATA_ETYPE_INFO,
79     PADATA_ETYPE_INFO2,
80     PADATA_FOR_USER,
81     PADATA_FX_COOKIE,
82     PADATA_FX_ERROR,
83     PADATA_FX_FAST,
84     PADATA_KDC_REQ,
85     PADATA_PAC_OPTIONS,
86     PADATA_PAC_REQUEST,
87     PADATA_PK_AS_REQ,
88     PADATA_PK_AS_REP_19,
89     PADATA_SUPPORTED_ETYPES
90 )
91 import samba.tests.krb5.kcrypto as kcrypto
92
93
94 def BitStringEncoder_encodeValue32(
95         self, value, asn1Spec, encodeFun, **options):
96     #
97     # BitStrings like KDCOptions or TicketFlags should at least
98     # be 32-Bit on the wire
99     #
100     if asn1Spec is not None:
101         # TODO: try to avoid ASN.1 schema instantiation
102         value = asn1Spec.clone(value)
103
104     valueLength = len(value)
105     if valueLength % 8:
106         alignedValue = value << (8 - valueLength % 8)
107     else:
108         alignedValue = value
109
110     substrate = alignedValue.asOctets()
111     length = len(substrate)
112     # We need at least 32-Bit / 4-Bytes
113     if length < 4:
114         padding = 4 - length
115     else:
116         padding = 0
117     ret = b'\x00' + substrate + (b'\x00' * padding)
118     return ret, False, True
119
120
121 BitStringEncoder.encodeValue = BitStringEncoder_encodeValue32
122
123
124 def BitString_NamedValues_prettyPrint(self, scope=0):
125     ret = "%s" % self.asBinary()
126     bits = []
127     highest_bit = 32
128     for byte in self.asNumbers():
129         for bit in [7, 6, 5, 4, 3, 2, 1, 0]:
130             mask = 1 << bit
131             if byte & mask:
132                 val = 1
133             else:
134                 val = 0
135             bits.append(val)
136     if len(bits) < highest_bit:
137         for bitPosition in range(len(bits), highest_bit):
138             bits.append(0)
139     indent = " " * scope
140     delim = ": (\n%s " % indent
141     for bitPosition in range(highest_bit):
142         if bitPosition in self.prettyPrintNamedValues:
143             name = self.prettyPrintNamedValues[bitPosition]
144         elif bits[bitPosition] != 0:
145             name = "unknown-bit-%u" % bitPosition
146         else:
147             continue
148         ret += "%s%s:%u" % (delim, name, bits[bitPosition])
149         delim = ",\n%s " % indent
150     ret += "\n%s)" % indent
151     return ret
152
153
154 krb5_asn1.TicketFlags.prettyPrintNamedValues =\
155     krb5_asn1.TicketFlagsValues.namedValues
156 krb5_asn1.TicketFlags.namedValues =\
157     krb5_asn1.TicketFlagsValues.namedValues
158 krb5_asn1.TicketFlags.prettyPrint =\
159     BitString_NamedValues_prettyPrint
160 krb5_asn1.KDCOptions.prettyPrintNamedValues =\
161     krb5_asn1.KDCOptionsValues.namedValues
162 krb5_asn1.KDCOptions.namedValues =\
163     krb5_asn1.KDCOptionsValues.namedValues
164 krb5_asn1.KDCOptions.prettyPrint =\
165     BitString_NamedValues_prettyPrint
166 krb5_asn1.APOptions.prettyPrintNamedValues =\
167     krb5_asn1.APOptionsValues.namedValues
168 krb5_asn1.APOptions.namedValues =\
169     krb5_asn1.APOptionsValues.namedValues
170 krb5_asn1.APOptions.prettyPrint =\
171     BitString_NamedValues_prettyPrint
172 krb5_asn1.PACOptionFlags.prettyPrintNamedValues =\
173     krb5_asn1.PACOptionFlagsValues.namedValues
174 krb5_asn1.PACOptionFlags.namedValues =\
175     krb5_asn1.PACOptionFlagsValues.namedValues
176 krb5_asn1.PACOptionFlags.prettyPrint =\
177     BitString_NamedValues_prettyPrint
178
179
180 def Integer_NamedValues_prettyPrint(self, scope=0):
181     intval = int(self)
182     if intval in self.prettyPrintNamedValues:
183         name = self.prettyPrintNamedValues[intval]
184     else:
185         name = "<__unknown__>"
186     ret = "%d (0x%x) %s" % (intval, intval, name)
187     return ret
188
189
190 krb5_asn1.NameType.prettyPrintNamedValues =\
191     krb5_asn1.NameTypeValues.namedValues
192 krb5_asn1.NameType.prettyPrint =\
193     Integer_NamedValues_prettyPrint
194 krb5_asn1.AuthDataType.prettyPrintNamedValues =\
195     krb5_asn1.AuthDataTypeValues.namedValues
196 krb5_asn1.AuthDataType.prettyPrint =\
197     Integer_NamedValues_prettyPrint
198 krb5_asn1.PADataType.prettyPrintNamedValues =\
199     krb5_asn1.PADataTypeValues.namedValues
200 krb5_asn1.PADataType.prettyPrint =\
201     Integer_NamedValues_prettyPrint
202 krb5_asn1.EncryptionType.prettyPrintNamedValues =\
203     krb5_asn1.EncryptionTypeValues.namedValues
204 krb5_asn1.EncryptionType.prettyPrint =\
205     Integer_NamedValues_prettyPrint
206 krb5_asn1.ChecksumType.prettyPrintNamedValues =\
207     krb5_asn1.ChecksumTypeValues.namedValues
208 krb5_asn1.ChecksumType.prettyPrint =\
209     Integer_NamedValues_prettyPrint
210 krb5_asn1.KerbErrorDataType.prettyPrintNamedValues =\
211     krb5_asn1.KerbErrorDataTypeValues.namedValues
212 krb5_asn1.KerbErrorDataType.prettyPrint =\
213     Integer_NamedValues_prettyPrint
214
215
216 class Krb5EncryptionKey:
217     def __init__(self, key, kvno):
218         EncTypeChecksum = {
219             kcrypto.Enctype.AES256: kcrypto.Cksumtype.SHA1_AES256,
220             kcrypto.Enctype.AES128: kcrypto.Cksumtype.SHA1_AES128,
221             kcrypto.Enctype.RC4: kcrypto.Cksumtype.HMAC_MD5,
222         }
223         self.key = key
224         self.etype = key.enctype
225         self.ctype = EncTypeChecksum[self.etype]
226         self.kvno = kvno
227
228     def encrypt(self, usage, plaintext):
229         ciphertext = kcrypto.encrypt(self.key, usage, plaintext)
230         return ciphertext
231
232     def decrypt(self, usage, ciphertext):
233         plaintext = kcrypto.decrypt(self.key, usage, ciphertext)
234         return plaintext
235
236     def make_zeroed_checksum(self, ctype=None):
237         if ctype is None:
238             ctype = self.ctype
239
240         checksum_len = kcrypto.checksum_len(ctype)
241         return bytes(checksum_len)
242
243     def make_checksum(self, usage, plaintext, ctype=None):
244         if ctype is None:
245             ctype = self.ctype
246         cksum = kcrypto.make_checksum(ctype, self.key, usage, plaintext)
247         return cksum
248
249     def verify_checksum(self, usage, plaintext, ctype, cksum):
250         if self.ctype != ctype:
251             raise AssertionError(f'key checksum type ({self.ctype}) != '
252                                  f'checksum type ({ctype})')
253
254         kcrypto.verify_checksum(ctype,
255                                 self.key,
256                                 usage,
257                                 plaintext,
258                                 cksum)
259
260     def export_obj(self):
261         EncryptionKey_obj = {
262             'keytype': self.etype,
263             'keyvalue': self.key.contents,
264         }
265         return EncryptionKey_obj
266
267
268 class RodcPacEncryptionKey(Krb5EncryptionKey):
269     def __init__(self, key, kvno, rodc_id=None):
270         super().__init__(key, kvno)
271
272         if rodc_id is None:
273             kvno = self.kvno
274             if kvno is not None:
275                 kvno >>= 16
276                 kvno &= (1 << 16) - 1
277
278             rodc_id = kvno or None
279
280         if rodc_id is not None:
281             self.rodc_id = rodc_id.to_bytes(2, byteorder='little')
282         else:
283             self.rodc_id = b''
284
285     def make_rodc_zeroed_checksum(self, ctype=None):
286         checksum = super().make_zeroed_checksum(ctype)
287         return checksum + bytes(len(self.rodc_id))
288
289     def make_rodc_checksum(self, usage, plaintext, ctype=None):
290         checksum = super().make_checksum(usage, plaintext, ctype)
291         return checksum + self.rodc_id
292
293     def verify_rodc_checksum(self, usage, plaintext, ctype, cksum):
294         if self.rodc_id:
295             cksum, cksum_rodc_id = cksum[:-2], cksum[-2:]
296
297             if self.rodc_id != cksum_rodc_id:
298                 raise AssertionError(f'{self.rodc_id.hex()} != '
299                                      f'{cksum_rodc_id.hex()}')
300
301         super().verify_checksum(usage,
302                                 plaintext,
303                                 ctype,
304                                 cksum)
305
306
307 class ZeroedChecksumKey(RodcPacEncryptionKey):
308     def make_checksum(self, usage, plaintext, ctype=None):
309         return self.make_zeroed_checksum(ctype)
310
311     def make_rodc_checksum(self, usage, plaintext, ctype=None):
312         return self.make_rodc_zeroed_checksum(ctype)
313
314
315 class WrongLengthChecksumKey(RodcPacEncryptionKey):
316     def __init__(self, key, kvno, length):
317         super().__init__(key, kvno)
318
319         self._length = length
320
321     @classmethod
322     def _adjust_to_length(cls, checksum, length):
323         diff = length - len(checksum)
324         if diff > 0:
325             checksum += bytes(diff)
326         elif diff < 0:
327             checksum = checksum[:length]
328
329         return checksum
330
331     def make_zeroed_checksum(self, ctype=None):
332         return bytes(self._length)
333
334     def make_checksum(self, usage, plaintext, ctype=None):
335         checksum = super().make_checksum(usage, plaintext, ctype)
336         return self._adjust_to_length(checksum, self._length)
337
338     def make_rodc_zeroed_checksum(self, ctype=None):
339         return bytes(self._length)
340
341     def make_rodc_checksum(self, usage, plaintext, ctype=None):
342         checksum = super().make_rodc_checksum(usage, plaintext, ctype)
343         return self._adjust_to_length(checksum, self._length)
344
345
346 class KerberosCredentials(Credentials):
347
348     fast_supported_bits = (security.KERB_ENCTYPE_FAST_SUPPORTED |
349                            security.KERB_ENCTYPE_COMPOUND_IDENTITY_SUPPORTED |
350                            security.KERB_ENCTYPE_CLAIMS_SUPPORTED)
351
352     def __init__(self):
353         super(KerberosCredentials, self).__init__()
354         all_enc_types = 0
355         all_enc_types |= security.KERB_ENCTYPE_RC4_HMAC_MD5
356         all_enc_types |= security.KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96
357         all_enc_types |= security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96
358
359         self.as_supported_enctypes = all_enc_types
360         self.tgs_supported_enctypes = all_enc_types
361         self.ap_supported_enctypes = all_enc_types
362
363         self.kvno = None
364         self.forced_keys = {}
365
366         self.forced_salt = None
367
368         self.dn = None
369         self.upn = None
370         self.spn = None
371
372     def set_as_supported_enctypes(self, value):
373         self.as_supported_enctypes = int(value)
374
375     def set_tgs_supported_enctypes(self, value):
376         self.tgs_supported_enctypes = int(value)
377
378     def set_ap_supported_enctypes(self, value):
379         self.ap_supported_enctypes = int(value)
380
381     etype_map = collections.OrderedDict([
382         (kcrypto.Enctype.AES256,
383             security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96),
384         (kcrypto.Enctype.AES128,
385             security.KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96),
386         (kcrypto.Enctype.RC4,
387             security.KERB_ENCTYPE_RC4_HMAC_MD5),
388         (kcrypto.Enctype.DES_MD5,
389             security.KERB_ENCTYPE_DES_CBC_MD5),
390         (kcrypto.Enctype.DES_CRC,
391             security.KERB_ENCTYPE_DES_CBC_CRC)
392     ])
393
394     @classmethod
395     def etypes_to_bits(cls, etypes):
396         bits = 0
397         for etype in etypes:
398             bit = cls.etype_map[etype]
399             if bits & bit:
400                 raise ValueError(f'Got duplicate etype: {etype}')
401             bits |= bit
402
403         return bits
404
405     @classmethod
406     def bits_to_etypes(cls, bits):
407         etypes = ()
408         for etype, bit in cls.etype_map.items():
409             if bit & bits:
410                 bits &= ~bit
411                 etypes += (etype,)
412
413         bits &= ~cls.fast_supported_bits
414         if bits != 0:
415             raise ValueError(f'Unsupported etype bits: {bits}')
416
417         return etypes
418
419     def get_as_krb5_etypes(self):
420         return self.bits_to_etypes(self.as_supported_enctypes)
421
422     def get_tgs_krb5_etypes(self):
423         return self.bits_to_etypes(self.tgs_supported_enctypes)
424
425     def get_ap_krb5_etypes(self):
426         return self.bits_to_etypes(self.ap_supported_enctypes)
427
428     def set_kvno(self, kvno):
429         # Sign-extend from 32 bits.
430         if kvno & 1 << 31:
431             kvno |= -1 << 31
432         self.kvno = kvno
433
434     def get_kvno(self):
435         return self.kvno
436
437     def set_forced_key(self, etype, hexkey):
438         etype = int(etype)
439         contents = binascii.a2b_hex(hexkey)
440         key = kcrypto.Key(etype, contents)
441         self.forced_keys[etype] = RodcPacEncryptionKey(key, self.kvno)
442
443     def get_forced_key(self, etype):
444         etype = int(etype)
445         return self.forced_keys.get(etype)
446
447     def set_forced_salt(self, salt):
448         self.forced_salt = bytes(salt)
449
450     def get_forced_salt(self):
451         return self.forced_salt
452
453     def get_salt(self):
454         if self.forced_salt is not None:
455             return self.forced_salt
456
457         upn = self.get_upn()
458         if upn is not None:
459             salt_name = upn.rsplit('@', 1)[0].replace('/', '')
460         else:
461             salt_name = self.get_username()
462
463         if self.get_workstation():
464             salt_name = self.get_username().lower()
465             if salt_name[-1] == '$':
466                 salt_name = salt_name[:-1]
467             salt_string = '%shost%s.%s' % (
468                 self.get_realm().upper(),
469                 salt_name,
470                 self.get_realm().lower())
471         else:
472             salt_string = self.get_realm().upper() + salt_name
473
474         return salt_string.encode('utf-8')
475
476     def set_dn(self, dn):
477         self.dn = dn
478
479     def get_dn(self):
480         return self.dn
481
482     def set_spn(self, spn):
483         self.spn = spn
484
485     def get_spn(self):
486         return self.spn
487
488     def set_upn(self, upn):
489         self.upn = upn
490
491     def get_upn(self):
492         return self.upn
493
494
495 class KerberosTicketCreds:
496     def __init__(self, ticket, session_key,
497                  crealm=None, cname=None,
498                  srealm=None, sname=None,
499                  decryption_key=None,
500                  ticket_private=None,
501                  encpart_private=None):
502         self.ticket = ticket
503         self.session_key = session_key
504         self.crealm = crealm
505         self.cname = cname
506         self.srealm = srealm
507         self.sname = sname
508         self.decryption_key = decryption_key
509         self.ticket_private = ticket_private
510         self.encpart_private = encpart_private
511
512
513 class RawKerberosTest(TestCaseInTempDir):
514     """A raw Kerberos Test case."""
515
516     pac_checksum_types = {krb5pac.PAC_TYPE_SRV_CHECKSUM,
517                           krb5pac.PAC_TYPE_KDC_CHECKSUM,
518                           krb5pac.PAC_TYPE_TICKET_CHECKSUM}
519
520     etypes_to_test = (
521         {"value": -1111, "name": "dummy", },
522         {"value": kcrypto.Enctype.AES256, "name": "aes128", },
523         {"value": kcrypto.Enctype.AES128, "name": "aes256", },
524         {"value": kcrypto.Enctype.RC4, "name": "rc4", },
525     )
526
527     setup_etype_test_permutations_done = False
528
529     @classmethod
530     def setup_etype_test_permutations(cls):
531         if cls.setup_etype_test_permutations_done:
532             return
533
534         res = []
535
536         num_idxs = len(cls.etypes_to_test)
537         permutations = []
538         for num in range(1, num_idxs + 1):
539             chunk = list(itertools.permutations(range(num_idxs), num))
540             for e in chunk:
541                 el = list(e)
542                 permutations.append(el)
543
544         for p in permutations:
545             name = None
546             etypes = ()
547             for idx in p:
548                 n = cls.etypes_to_test[idx]["name"]
549                 if name is None:
550                     name = n
551                 else:
552                     name += "_%s" % n
553                 etypes += (cls.etypes_to_test[idx]["value"],)
554
555             r = {"name": name, "etypes": etypes, }
556             res.append(r)
557
558         cls.etype_test_permutations = res
559         cls.setup_etype_test_permutations_done = True
560
561     @classmethod
562     def etype_test_permutation_name_idx(cls):
563         cls.setup_etype_test_permutations()
564         res = []
565         idx = 0
566         for e in cls.etype_test_permutations:
567             r = (e['name'], idx)
568             idx += 1
569             res.append(r)
570         return res
571
572     def etype_test_permutation_by_idx(self, idx):
573         e = self.etype_test_permutations[idx]
574         return (e['name'], e['etypes'])
575
576     @classmethod
577     def setUpClass(cls):
578         super().setUpClass()
579
580         cls.host = samba.tests.env_get_var_value('SERVER')
581         cls.dc_host = samba.tests.env_get_var_value('DC_SERVER')
582
583         # A dictionary containing credentials that have already been
584         # obtained.
585         cls.creds_dict = {}
586
587         kdc_fast_support = samba.tests.env_get_var_value('FAST_SUPPORT',
588                                                          allow_missing=True)
589         if kdc_fast_support is None:
590             kdc_fast_support = '0'
591         cls.kdc_fast_support = bool(int(kdc_fast_support))
592
593         tkt_sig_support = samba.tests.env_get_var_value('TKT_SIG_SUPPORT',
594                                                         allow_missing=True)
595         if tkt_sig_support is None:
596             tkt_sig_support = '0'
597         cls.tkt_sig_support = bool(int(tkt_sig_support))
598
599         expect_pac = samba.tests.env_get_var_value('EXPECT_PAC',
600                                                    allow_missing=True)
601         if expect_pac is None:
602             expect_pac = '1'
603         cls.expect_pac = bool(int(expect_pac))
604
605         expect_extra_pac_buffers = samba.tests.env_get_var_value(
606             'EXPECT_EXTRA_PAC_BUFFERS',
607             allow_missing=True)
608         if expect_extra_pac_buffers is None:
609             expect_extra_pac_buffers = '1'
610         cls.expect_extra_pac_buffers = bool(int(expect_extra_pac_buffers))
611
612     def setUp(self):
613         super().setUp()
614         self.do_asn1_print = False
615         self.do_hexdump = False
616
617         strict_checking = samba.tests.env_get_var_value('STRICT_CHECKING',
618                                                         allow_missing=True)
619         if strict_checking is None:
620             strict_checking = '1'
621         self.strict_checking = bool(int(strict_checking))
622
623         self.s = None
624
625         self.unspecified_kvno = object()
626
627     def tearDown(self):
628         self._disconnect("tearDown")
629         super().tearDown()
630
631     def _disconnect(self, reason):
632         if self.s is None:
633             return
634         self.s.close()
635         self.s = None
636         if self.do_hexdump:
637             sys.stderr.write("disconnect[%s]\n" % reason)
638
639     def _connect_tcp(self, host):
640         tcp_port = 88
641         try:
642             self.a = socket.getaddrinfo(host, tcp_port, socket.AF_UNSPEC,
643                                         socket.SOCK_STREAM, socket.SOL_TCP,
644                                         0)
645             self.s = socket.socket(self.a[0][0], self.a[0][1], self.a[0][2])
646             self.s.settimeout(10)
647             self.s.connect(self.a[0][4])
648         except socket.error:
649             self.s.close()
650             raise
651         except IOError:
652             self.s.close()
653             raise
654
655     def connect(self, host):
656         self.assertNotConnected()
657         self._connect_tcp(host)
658         if self.do_hexdump:
659             sys.stderr.write("connected[%s]\n" % host)
660
661     def env_get_var(self, varname, prefix,
662                     fallback_default=True,
663                     allow_missing=False):
664         val = None
665         if prefix is not None:
666             allow_missing_prefix = allow_missing or fallback_default
667             val = samba.tests.env_get_var_value(
668                 '%s_%s' % (prefix, varname),
669                 allow_missing=allow_missing_prefix)
670         else:
671             fallback_default = True
672         if val is None and fallback_default:
673             val = samba.tests.env_get_var_value(varname,
674                                                 allow_missing=allow_missing)
675         return val
676
677     def _get_krb5_creds_from_env(self, prefix,
678                                  default_username=None,
679                                  allow_missing_password=False,
680                                  allow_missing_keys=True,
681                                  require_strongest_key=False):
682         c = KerberosCredentials()
683         c.guess()
684
685         domain = self.env_get_var('DOMAIN', prefix)
686         realm = self.env_get_var('REALM', prefix)
687         allow_missing_username = default_username is not None
688         username = self.env_get_var('USERNAME', prefix,
689                                     fallback_default=False,
690                                     allow_missing=allow_missing_username)
691         if username is None:
692             username = default_username
693         password = self.env_get_var('PASSWORD', prefix,
694                                     fallback_default=False,
695                                     allow_missing=allow_missing_password)
696         c.set_domain(domain)
697         c.set_realm(realm)
698         c.set_username(username)
699         if password is not None:
700             c.set_password(password)
701         as_supported_enctypes = self.env_get_var('AS_SUPPORTED_ENCTYPES',
702                                                  prefix, allow_missing=True)
703         if as_supported_enctypes is not None:
704             c.set_as_supported_enctypes(as_supported_enctypes)
705         tgs_supported_enctypes = self.env_get_var('TGS_SUPPORTED_ENCTYPES',
706                                                   prefix, allow_missing=True)
707         if tgs_supported_enctypes is not None:
708             c.set_tgs_supported_enctypes(tgs_supported_enctypes)
709         ap_supported_enctypes = self.env_get_var('AP_SUPPORTED_ENCTYPES',
710                                                  prefix, allow_missing=True)
711         if ap_supported_enctypes is not None:
712             c.set_ap_supported_enctypes(ap_supported_enctypes)
713
714         if require_strongest_key:
715             kvno_allow_missing = False
716             if password is None:
717                 aes256_allow_missing = False
718             else:
719                 aes256_allow_missing = True
720         else:
721             kvno_allow_missing = allow_missing_keys
722             aes256_allow_missing = allow_missing_keys
723         kvno = self.env_get_var('KVNO', prefix,
724                                 fallback_default=False,
725                                 allow_missing=kvno_allow_missing)
726         if kvno is not None:
727             c.set_kvno(int(kvno))
728         aes256_key = self.env_get_var('AES256_KEY_HEX', prefix,
729                                       fallback_default=False,
730                                       allow_missing=aes256_allow_missing)
731         if aes256_key is not None:
732             c.set_forced_key(kcrypto.Enctype.AES256, aes256_key)
733         aes128_key = self.env_get_var('AES128_KEY_HEX', prefix,
734                                       fallback_default=False,
735                                       allow_missing=True)
736         if aes128_key is not None:
737             c.set_forced_key(kcrypto.Enctype.AES128, aes128_key)
738         rc4_key = self.env_get_var('RC4_KEY_HEX', prefix,
739                                    fallback_default=False, allow_missing=True)
740         if rc4_key is not None:
741             c.set_forced_key(kcrypto.Enctype.RC4, rc4_key)
742
743         if not allow_missing_keys:
744             self.assertTrue(c.forced_keys,
745                             'Please supply %s encryption keys '
746                             'in environment' % prefix)
747
748         return c
749
750     def _get_krb5_creds(self,
751                         prefix,
752                         default_username=None,
753                         allow_missing_password=False,
754                         allow_missing_keys=True,
755                         require_strongest_key=False,
756                         fallback_creds_fn=None):
757         if prefix in self.creds_dict:
758             return self.creds_dict[prefix]
759
760         # We don't have the credentials already
761         creds = None
762         env_err = None
763         try:
764             # Try to obtain them from the environment
765             creds = self._get_krb5_creds_from_env(
766                 prefix,
767                 default_username=default_username,
768                 allow_missing_password=allow_missing_password,
769                 allow_missing_keys=allow_missing_keys,
770                 require_strongest_key=require_strongest_key)
771         except Exception as err:
772             # An error occurred, so save it for later
773             env_err = err
774         else:
775             self.assertIsNotNone(creds)
776             # Save the obtained credentials
777             self.creds_dict[prefix] = creds
778             return creds
779
780         if fallback_creds_fn is not None:
781             try:
782                 # Try to use the fallback method
783                 creds = fallback_creds_fn()
784             except Exception as err:
785                 print("ERROR FROM ENV: %r" % (env_err))
786                 print("FALLBACK-FN: %s" % (fallback_creds_fn))
787                 print("FALLBACK-ERROR: %r" % (err))
788             else:
789                 self.assertIsNotNone(creds)
790                 # Save the obtained credentials
791                 self.creds_dict[prefix] = creds
792                 return creds
793
794         # Both methods failed, so raise the exception from the
795         # environment method
796         raise env_err
797
798     def get_user_creds(self,
799                        allow_missing_password=False,
800                        allow_missing_keys=True):
801         c = self._get_krb5_creds(prefix=None,
802                                  allow_missing_password=allow_missing_password,
803                                  allow_missing_keys=allow_missing_keys)
804         return c
805
806     def get_service_creds(self,
807                           allow_missing_password=False,
808                           allow_missing_keys=True):
809         c = self._get_krb5_creds(prefix='SERVICE',
810                                  allow_missing_password=allow_missing_password,
811                                  allow_missing_keys=allow_missing_keys)
812         return c
813
814     def get_client_creds(self,
815                          allow_missing_password=False,
816                          allow_missing_keys=True):
817         c = self._get_krb5_creds(prefix='CLIENT',
818                                  allow_missing_password=allow_missing_password,
819                                  allow_missing_keys=allow_missing_keys)
820         return c
821
822     def get_server_creds(self,
823                          allow_missing_password=False,
824                          allow_missing_keys=True):
825         c = self._get_krb5_creds(prefix='SERVER',
826                                  allow_missing_password=allow_missing_password,
827                                  allow_missing_keys=allow_missing_keys)
828         return c
829
830     def get_admin_creds(self,
831                         allow_missing_password=False,
832                         allow_missing_keys=True):
833         c = self._get_krb5_creds(prefix='ADMIN',
834                                  allow_missing_password=allow_missing_password,
835                                  allow_missing_keys=allow_missing_keys)
836         c.set_gensec_features(c.get_gensec_features() | FEATURE_SEAL)
837         c.set_workstation('')
838         return c
839
840     def get_rodc_krbtgt_creds(self,
841                               require_keys=True,
842                               require_strongest_key=False):
843         if require_strongest_key:
844             self.assertTrue(require_keys)
845         c = self._get_krb5_creds(prefix='RODC_KRBTGT',
846                                  allow_missing_password=True,
847                                  allow_missing_keys=not require_keys,
848                                  require_strongest_key=require_strongest_key)
849         return c
850
851     def get_krbtgt_creds(self,
852                          require_keys=True,
853                          require_strongest_key=False):
854         if require_strongest_key:
855             self.assertTrue(require_keys)
856         c = self._get_krb5_creds(prefix='KRBTGT',
857                                  default_username='krbtgt',
858                                  allow_missing_password=True,
859                                  allow_missing_keys=not require_keys,
860                                  require_strongest_key=require_strongest_key)
861         return c
862
863     def get_anon_creds(self):
864         c = Credentials()
865         c.set_anonymous()
866         return c
867
868     def asn1_dump(self, name, obj, asn1_print=None):
869         if asn1_print is None:
870             asn1_print = self.do_asn1_print
871         if asn1_print:
872             if name is not None:
873                 sys.stderr.write("%s:\n%s" % (name, obj))
874             else:
875                 sys.stderr.write("%s" % (obj))
876
877     def hex_dump(self, name, blob, hexdump=None):
878         if hexdump is None:
879             hexdump = self.do_hexdump
880         if hexdump:
881             sys.stderr.write(
882                 "%s: %d\n%s" % (name, len(blob), self.hexdump(blob)))
883
884     def der_decode(
885             self,
886             blob,
887             asn1Spec=None,
888             native_encode=True,
889             asn1_print=None,
890             hexdump=None):
891         if asn1Spec is not None:
892             class_name = type(asn1Spec).__name__.split(':')[0]
893         else:
894             class_name = "<None-asn1Spec>"
895         self.hex_dump(class_name, blob, hexdump=hexdump)
896         obj, _ = pyasn1_der_decode(blob, asn1Spec=asn1Spec)
897         self.asn1_dump(None, obj, asn1_print=asn1_print)
898         if native_encode:
899             obj = pyasn1_native_encode(obj)
900         return obj
901
902     def der_encode(
903             self,
904             obj,
905             asn1Spec=None,
906             native_decode=True,
907             asn1_print=None,
908             hexdump=None):
909         if native_decode:
910             obj = pyasn1_native_decode(obj, asn1Spec=asn1Spec)
911         class_name = type(obj).__name__.split(':')[0]
912         if class_name is not None:
913             self.asn1_dump(None, obj, asn1_print=asn1_print)
914         blob = pyasn1_der_encode(obj)
915         if class_name is not None:
916             self.hex_dump(class_name, blob, hexdump=hexdump)
917         return blob
918
919     def send_pdu(self, req, asn1_print=None, hexdump=None):
920         try:
921             k5_pdu = self.der_encode(
922                 req, native_decode=False, asn1_print=asn1_print, hexdump=False)
923             header = struct.pack('>I', len(k5_pdu))
924             req_pdu = header
925             req_pdu += k5_pdu
926             self.hex_dump("send_pdu", header, hexdump=hexdump)
927             self.hex_dump("send_pdu", k5_pdu, hexdump=hexdump)
928             while True:
929                 sent = self.s.send(req_pdu, 0)
930                 if sent == len(req_pdu):
931                     break
932                 req_pdu = req_pdu[sent:]
933         except socket.error as e:
934             self._disconnect("send_pdu: %s" % e)
935             raise
936         except IOError as e:
937             self._disconnect("send_pdu: %s" % e)
938             raise
939
940     def recv_raw(self, num_recv=0xffff, hexdump=None, timeout=None):
941         rep_pdu = None
942         try:
943             if timeout is not None:
944                 self.s.settimeout(timeout)
945             rep_pdu = self.s.recv(num_recv, 0)
946             self.s.settimeout(10)
947             if len(rep_pdu) == 0:
948                 self._disconnect("recv_raw: EOF")
949                 return None
950             self.hex_dump("recv_raw", rep_pdu, hexdump=hexdump)
951         except socket.timeout:
952             self.s.settimeout(10)
953             sys.stderr.write("recv_raw: TIMEOUT\n")
954         except socket.error as e:
955             self._disconnect("recv_raw: %s" % e)
956             raise
957         except IOError as e:
958             self._disconnect("recv_raw: %s" % e)
959             raise
960         return rep_pdu
961
962     def recv_pdu_raw(self, asn1_print=None, hexdump=None, timeout=None):
963         rep_pdu = None
964         rep = None
965         raw_pdu = self.recv_raw(
966             num_recv=4, hexdump=hexdump, timeout=timeout)
967         if raw_pdu is None:
968             return (None, None)
969         header = struct.unpack(">I", raw_pdu[0:4])
970         k5_len = header[0]
971         if k5_len == 0:
972             return (None, "")
973         missing = k5_len
974         rep_pdu = b''
975         while missing > 0:
976             raw_pdu = self.recv_raw(
977                 num_recv=missing, hexdump=hexdump, timeout=timeout)
978             self.assertGreaterEqual(len(raw_pdu), 1)
979             rep_pdu += raw_pdu
980             missing = k5_len - len(rep_pdu)
981         k5_raw = self.der_decode(
982             rep_pdu,
983             asn1Spec=None,
984             native_encode=False,
985             asn1_print=False,
986             hexdump=False)
987         pvno = k5_raw['field-0']
988         self.assertEqual(pvno, 5)
989         msg_type = k5_raw['field-1']
990         self.assertIn(msg_type, [KRB_AS_REP, KRB_TGS_REP, KRB_ERROR])
991         if msg_type == KRB_AS_REP:
992             asn1Spec = krb5_asn1.AS_REP()
993         elif msg_type == KRB_TGS_REP:
994             asn1Spec = krb5_asn1.TGS_REP()
995         elif msg_type == KRB_ERROR:
996             asn1Spec = krb5_asn1.KRB_ERROR()
997         rep = self.der_decode(rep_pdu, asn1Spec=asn1Spec,
998                               asn1_print=asn1_print, hexdump=False)
999         return (rep, rep_pdu)
1000
1001     def recv_pdu(self, asn1_print=None, hexdump=None, timeout=None):
1002         (rep, rep_pdu) = self.recv_pdu_raw(asn1_print=asn1_print,
1003                                            hexdump=hexdump,
1004                                            timeout=timeout)
1005         return rep
1006
1007     def assertIsConnected(self):
1008         self.assertIsNotNone(self.s, msg="Not connected")
1009
1010     def assertNotConnected(self):
1011         self.assertIsNone(self.s, msg="Is connected")
1012
1013     def send_recv_transaction(
1014             self,
1015             req,
1016             asn1_print=None,
1017             hexdump=None,
1018             timeout=None,
1019             to_rodc=False):
1020         host = self.host if to_rodc else self.dc_host
1021         self.connect(host)
1022         try:
1023             self.send_pdu(req, asn1_print=asn1_print, hexdump=hexdump)
1024             rep = self.recv_pdu(
1025                 asn1_print=asn1_print, hexdump=hexdump, timeout=timeout)
1026         except Exception:
1027             self._disconnect("transaction failed")
1028             raise
1029         self._disconnect("transaction done")
1030         return rep
1031
1032     def assertNoValue(self, value):
1033         self.assertTrue(value.isNoValue)
1034
1035     def assertHasValue(self, value):
1036         self.assertIsNotNone(value)
1037
1038     def getElementValue(self, obj, elem):
1039         return obj.get(elem)
1040
1041     def assertElementMissing(self, obj, elem):
1042         v = self.getElementValue(obj, elem)
1043         self.assertIsNone(v)
1044
1045     def assertElementPresent(self, obj, elem, expect_empty=False):
1046         v = self.getElementValue(obj, elem)
1047         self.assertIsNotNone(v)
1048         if self.strict_checking:
1049             if isinstance(v, collections.abc.Container):
1050                 if expect_empty:
1051                     self.assertEqual(0, len(v))
1052                 else:
1053                     self.assertNotEqual(0, len(v))
1054
1055     def assertElementEqual(self, obj, elem, value):
1056         v = self.getElementValue(obj, elem)
1057         self.assertIsNotNone(v)
1058         self.assertEqual(v, value)
1059
1060     def assertElementEqualUTF8(self, obj, elem, value):
1061         v = self.getElementValue(obj, elem)
1062         self.assertIsNotNone(v)
1063         self.assertEqual(v, bytes(value, 'utf8'))
1064
1065     def assertPrincipalEqual(self, princ1, princ2):
1066         self.assertEqual(princ1['name-type'], princ2['name-type'])
1067         self.assertEqual(
1068             len(princ1['name-string']),
1069             len(princ2['name-string']),
1070             msg="princ1=%s != princ2=%s" % (princ1, princ2))
1071         for idx in range(len(princ1['name-string'])):
1072             self.assertEqual(
1073                 princ1['name-string'][idx],
1074                 princ2['name-string'][idx],
1075                 msg="princ1=%s != princ2=%s" % (princ1, princ2))
1076
1077     def assertElementEqualPrincipal(self, obj, elem, value):
1078         v = self.getElementValue(obj, elem)
1079         self.assertIsNotNone(v)
1080         v = pyasn1_native_decode(v, asn1Spec=krb5_asn1.PrincipalName())
1081         self.assertPrincipalEqual(v, value)
1082
1083     def assertElementKVNO(self, obj, elem, value):
1084         v = self.getElementValue(obj, elem)
1085         if value == "autodetect":
1086             value = v
1087         if value is not None:
1088             self.assertIsNotNone(v)
1089             # The value on the wire should never be 0
1090             self.assertNotEqual(v, 0)
1091             # unspecified_kvno means we don't know the kvno,
1092             # but want to enforce its presence
1093             if value is not self.unspecified_kvno:
1094                 value = int(value)
1095                 self.assertNotEqual(value, 0)
1096                 self.assertEqual(v, value)
1097         else:
1098             self.assertIsNone(v)
1099
1100     def assertElementFlags(self, obj, elem, expected, unexpected):
1101         v = self.getElementValue(obj, elem)
1102         self.assertIsNotNone(v)
1103         if expected is not None:
1104             self.assertIsInstance(expected, krb5_asn1.TicketFlags)
1105             for i, flag in enumerate(expected):
1106                 if flag == 1:
1107                     self.assertEqual('1', v[i],
1108                                      f"'{expected.namedValues[i]}' "
1109                                      f"expected in {v}")
1110         if unexpected is not None:
1111             self.assertIsInstance(unexpected, krb5_asn1.TicketFlags)
1112             for i, flag in enumerate(unexpected):
1113                 if flag == 1:
1114                     self.assertEqual('0', v[i],
1115                                      f"'{unexpected.namedValues[i]}' "
1116                                      f"unexpected in {v}")
1117
1118     def assertSequenceElementsEqual(self, expected, got, *,
1119                                     require_strict=None,
1120                                     require_ordered=True):
1121         if self.strict_checking and require_ordered:
1122             self.assertEqual(expected, got)
1123         else:
1124             fail_msg = f'expected: {expected} got: {got}'
1125
1126             if not self.strict_checking and require_strict is not None:
1127                 fail_msg += f' (ignoring: {require_strict})'
1128                 expected = (x for x in expected if x not in require_strict)
1129                 got = (x for x in got if x not in require_strict)
1130
1131             self.assertCountEqual(expected, got, fail_msg)
1132
1133     def get_KerberosTimeWithUsec(self, epoch=None, offset=None):
1134         if epoch is None:
1135             epoch = time.time()
1136         if offset is not None:
1137             epoch = epoch + int(offset)
1138         dt = datetime.datetime.fromtimestamp(epoch, tz=datetime.timezone.utc)
1139         return (dt.strftime("%Y%m%d%H%M%SZ"), dt.microsecond)
1140
1141     def get_KerberosTime(self, epoch=None, offset=None):
1142         (s, _) = self.get_KerberosTimeWithUsec(epoch=epoch, offset=offset)
1143         return s
1144
1145     def get_EpochFromKerberosTime(self, kerberos_time):
1146         if isinstance(kerberos_time, bytes):
1147             kerberos_time = kerberos_time.decode()
1148
1149         epoch = datetime.datetime.strptime(kerberos_time,
1150                                            '%Y%m%d%H%M%SZ')
1151         epoch = epoch.replace(tzinfo=datetime.timezone.utc)
1152         epoch = int(epoch.timestamp())
1153
1154         return epoch
1155
1156     def get_Nonce(self):
1157         nonce_min = 0x7f000000
1158         nonce_max = 0x7fffffff
1159         v = random.randint(nonce_min, nonce_max)
1160         return v
1161
1162     def get_pa_dict(self, pa_data):
1163         pa_dict = {}
1164
1165         if pa_data is not None:
1166             for pa in pa_data:
1167                 pa_type = pa['padata-type']
1168                 if pa_type in pa_dict:
1169                     raise RuntimeError(f'Duplicate type {pa_type}')
1170                 pa_dict[pa_type] = pa['padata-value']
1171
1172         return pa_dict
1173
1174     def SessionKey_create(self, etype, contents, kvno=None):
1175         key = kcrypto.Key(etype, contents)
1176         return RodcPacEncryptionKey(key, kvno)
1177
1178     def PasswordKey_create(self, etype=None, pwd=None, salt=None, kvno=None,
1179                            params=None):
1180         self.assertIsNotNone(pwd)
1181         self.assertIsNotNone(salt)
1182         key = kcrypto.string_to_key(etype, pwd, salt, params=params)
1183         return RodcPacEncryptionKey(key, kvno)
1184
1185     def PasswordKey_from_etype_info2(self, creds, etype_info2, kvno=None):
1186         e = etype_info2['etype']
1187
1188         salt = etype_info2.get('salt')
1189
1190         if e == kcrypto.Enctype.RC4:
1191             nthash = creds.get_nt_hash()
1192             return self.SessionKey_create(etype=e, contents=nthash, kvno=kvno)
1193
1194         params = etype_info2.get('s2kparams')
1195
1196         password = creds.get_password()
1197         return self.PasswordKey_create(
1198             etype=e, pwd=password, salt=salt, kvno=kvno, params=params)
1199
1200     def TicketDecryptionKey_from_creds(self, creds, etype=None):
1201
1202         if etype is None:
1203             etypes = creds.get_tgs_krb5_etypes()
1204             if etypes:
1205                 etype = etypes[0]
1206             else:
1207                 etype = kcrypto.Enctype.RC4
1208
1209         forced_key = creds.get_forced_key(etype)
1210         if forced_key is not None:
1211             return forced_key
1212
1213         kvno = creds.get_kvno()
1214
1215         fail_msg = ("%s has no fixed key for etype[%s] kvno[%s] "
1216                     "nor a password specified, " % (
1217                         creds.get_username(), etype, kvno))
1218
1219         if etype == kcrypto.Enctype.RC4:
1220             nthash = creds.get_nt_hash()
1221             self.assertIsNotNone(nthash, msg=fail_msg)
1222             return self.SessionKey_create(etype=etype,
1223                                           contents=nthash,
1224                                           kvno=kvno)
1225
1226         password = creds.get_password()
1227         self.assertIsNotNone(password, msg=fail_msg)
1228         salt = creds.get_salt()
1229         return self.PasswordKey_create(etype=etype,
1230                                        pwd=password,
1231                                        salt=salt,
1232                                        kvno=kvno)
1233
1234     def RandomKey(self, etype):
1235         e = kcrypto._get_enctype_profile(etype)
1236         contents = samba.generate_random_bytes(e.keysize)
1237         return self.SessionKey_create(etype=etype, contents=contents)
1238
1239     def EncryptionKey_import(self, EncryptionKey_obj):
1240         return self.SessionKey_create(EncryptionKey_obj['keytype'],
1241                                       EncryptionKey_obj['keyvalue'])
1242
1243     def EncryptedData_create(self, key, usage, plaintext):
1244         # EncryptedData   ::= SEQUENCE {
1245         #         etype   [0] Int32 -- EncryptionType --,
1246         #         kvno    [1] Int32 OPTIONAL,
1247         #         cipher  [2] OCTET STRING -- ciphertext
1248         # }
1249         ciphertext = key.encrypt(usage, plaintext)
1250         EncryptedData_obj = {
1251             'etype': key.etype,
1252             'cipher': ciphertext
1253         }
1254         if key.kvno is not None:
1255             EncryptedData_obj['kvno'] = key.kvno
1256         return EncryptedData_obj
1257
1258     def Checksum_create(self, key, usage, plaintext, ctype=None):
1259         # Checksum        ::= SEQUENCE {
1260         #        cksumtype       [0] Int32,
1261         #        checksum        [1] OCTET STRING
1262         # }
1263         if ctype is None:
1264             ctype = key.ctype
1265         checksum = key.make_checksum(usage, plaintext, ctype=ctype)
1266         Checksum_obj = {
1267             'cksumtype': ctype,
1268             'checksum': checksum,
1269         }
1270         return Checksum_obj
1271
1272     @classmethod
1273     def PrincipalName_create(cls, name_type, names):
1274         # PrincipalName   ::= SEQUENCE {
1275         #         name-type       [0] Int32,
1276         #         name-string     [1] SEQUENCE OF KerberosString
1277         # }
1278         PrincipalName_obj = {
1279             'name-type': name_type,
1280             'name-string': names,
1281         }
1282         return PrincipalName_obj
1283
1284     def AuthorizationData_create(self, ad_type, ad_data):
1285         # AuthorizationData ::= SEQUENCE {
1286         #         ad-type         [0] Int32,
1287         #         ad-data         [1] OCTET STRING
1288         # }
1289         AUTH_DATA_obj = {
1290             'ad-type': ad_type,
1291             'ad-data': ad_data
1292         }
1293         return AUTH_DATA_obj
1294
1295     def PA_DATA_create(self, padata_type, padata_value):
1296         # PA-DATA         ::= SEQUENCE {
1297         #         -- NOTE: first tag is [1], not [0]
1298         #         padata-type     [1] Int32,
1299         #         padata-value    [2] OCTET STRING -- might be encoded AP-REQ
1300         # }
1301         PA_DATA_obj = {
1302             'padata-type': padata_type,
1303             'padata-value': padata_value,
1304         }
1305         return PA_DATA_obj
1306
1307     def PA_ENC_TS_ENC_create(self, ts, usec):
1308         # PA-ENC-TS-ENC ::= SEQUENCE {
1309         #        patimestamp[0]          KerberosTime, -- client's time
1310         #        pausec[1]               krb5int32 OPTIONAL
1311         # }
1312         PA_ENC_TS_ENC_obj = {
1313             'patimestamp': ts,
1314             'pausec': usec,
1315         }
1316         return PA_ENC_TS_ENC_obj
1317
1318     def PA_PAC_OPTIONS_create(self, options):
1319         # PA-PAC-OPTIONS  ::= SEQUENCE {
1320         #         options         [0] PACOptionFlags
1321         # }
1322         PA_PAC_OPTIONS_obj = {
1323             'options': options
1324         }
1325         return PA_PAC_OPTIONS_obj
1326
1327     def KRB_FAST_ARMOR_create(self, armor_type, armor_value):
1328         # KrbFastArmor    ::= SEQUENCE {
1329         #         armor-type      [0] Int32,
1330         #         armor-value     [1] OCTET STRING,
1331         #         ...
1332         # }
1333         KRB_FAST_ARMOR_obj = {
1334             'armor-type': armor_type,
1335             'armor-value': armor_value
1336         }
1337         return KRB_FAST_ARMOR_obj
1338
1339     def KRB_FAST_REQ_create(self, fast_options, padata, req_body):
1340         # KrbFastReq      ::= SEQUENCE {
1341         #         fast-options    [0] FastOptions,
1342         #         padata          [1] SEQUENCE OF PA-DATA,
1343         #         req-body        [2] KDC-REQ-BODY,
1344         #         ...
1345         # }
1346         KRB_FAST_REQ_obj = {
1347             'fast-options': fast_options,
1348             'padata': padata,
1349             'req-body': req_body
1350         }
1351         return KRB_FAST_REQ_obj
1352
1353     def KRB_FAST_ARMORED_REQ_create(self, armor, req_checksum, enc_fast_req):
1354         # KrbFastArmoredReq ::= SEQUENCE {
1355         #         armor           [0] KrbFastArmor OPTIONAL,
1356         #         req-checksum    [1] Checksum,
1357         #         enc-fast-req    [2] EncryptedData -- KrbFastReq --
1358         # }
1359         KRB_FAST_ARMORED_REQ_obj = {
1360             'req-checksum': req_checksum,
1361             'enc-fast-req': enc_fast_req
1362         }
1363         if armor is not None:
1364             KRB_FAST_ARMORED_REQ_obj['armor'] = armor
1365         return KRB_FAST_ARMORED_REQ_obj
1366
1367     def PA_FX_FAST_REQUEST_create(self, armored_data):
1368         # PA-FX-FAST-REQUEST ::= CHOICE {
1369         #         armored-data    [0] KrbFastArmoredReq,
1370         #         ...
1371         # }
1372         PA_FX_FAST_REQUEST_obj = {
1373             'armored-data': armored_data
1374         }
1375         return PA_FX_FAST_REQUEST_obj
1376
1377     def KERB_PA_PAC_REQUEST_create(self, include_pac, pa_data_create=True):
1378         # KERB-PA-PAC-REQUEST ::= SEQUENCE {
1379         #         include-pac[0] BOOLEAN --If TRUE, and no pac present,
1380         #                                --    include PAC.
1381         #                                --If FALSE, and PAC present,
1382         #                                --    remove PAC.
1383         # }
1384         KERB_PA_PAC_REQUEST_obj = {
1385             'include-pac': include_pac,
1386         }
1387         if not pa_data_create:
1388             return KERB_PA_PAC_REQUEST_obj
1389         pa_pac = self.der_encode(KERB_PA_PAC_REQUEST_obj,
1390                                  asn1Spec=krb5_asn1.KERB_PA_PAC_REQUEST())
1391         pa_data = self.PA_DATA_create(PADATA_PAC_REQUEST, pa_pac)
1392         return pa_data
1393
1394     def get_pa_pac_options(self, options):
1395         pac_options = self.PA_PAC_OPTIONS_create(options)
1396         pac_options = self.der_encode(pac_options,
1397                                       asn1Spec=krb5_asn1.PA_PAC_OPTIONS())
1398         pac_options = self.PA_DATA_create(PADATA_PAC_OPTIONS, pac_options)
1399
1400         return pac_options
1401
1402     def KDC_REQ_BODY_create(self,
1403                             kdc_options,
1404                             cname,
1405                             realm,
1406                             sname,
1407                             from_time,
1408                             till_time,
1409                             renew_time,
1410                             nonce,
1411                             etypes,
1412                             addresses,
1413                             additional_tickets,
1414                             EncAuthorizationData,
1415                             EncAuthorizationData_key,
1416                             EncAuthorizationData_usage,
1417                             asn1_print=None,
1418                             hexdump=None):
1419         # KDC-REQ-BODY    ::= SEQUENCE {
1420         #        kdc-options             [0] KDCOptions,
1421         #        cname                   [1] PrincipalName OPTIONAL
1422         #                                    -- Used only in AS-REQ --,
1423         #        realm                   [2] Realm
1424         #                                    -- Server's realm
1425         #                                    -- Also client's in AS-REQ --,
1426         #        sname                   [3] PrincipalName OPTIONAL,
1427         #        from                    [4] KerberosTime OPTIONAL,
1428         #        till                    [5] KerberosTime,
1429         #        rtime                   [6] KerberosTime OPTIONAL,
1430         #        nonce                   [7] UInt32,
1431         #        etype                   [8] SEQUENCE OF Int32
1432         #                                    -- EncryptionType
1433         #                                    -- in preference order --,
1434         #        addresses               [9] HostAddresses OPTIONAL,
1435         #        enc-authorization-data  [10] EncryptedData OPTIONAL
1436         #                                    -- AuthorizationData --,
1437         #        additional-tickets      [11] SEQUENCE OF Ticket OPTIONAL
1438         #                                        -- NOTE: not empty
1439         # }
1440         if EncAuthorizationData is not None:
1441             enc_ad_plain = self.der_encode(
1442                 EncAuthorizationData,
1443                 asn1Spec=krb5_asn1.AuthorizationData(),
1444                 asn1_print=asn1_print,
1445                 hexdump=hexdump)
1446             enc_ad = self.EncryptedData_create(EncAuthorizationData_key,
1447                                                EncAuthorizationData_usage,
1448                                                enc_ad_plain)
1449         else:
1450             enc_ad = None
1451         KDC_REQ_BODY_obj = {
1452             'kdc-options': kdc_options,
1453             'realm': realm,
1454             'till': till_time,
1455             'nonce': nonce,
1456             'etype': etypes,
1457         }
1458         if cname is not None:
1459             KDC_REQ_BODY_obj['cname'] = cname
1460         if sname is not None:
1461             KDC_REQ_BODY_obj['sname'] = sname
1462         if from_time is not None:
1463             KDC_REQ_BODY_obj['from'] = from_time
1464         if renew_time is not None:
1465             KDC_REQ_BODY_obj['rtime'] = renew_time
1466         if addresses is not None:
1467             KDC_REQ_BODY_obj['addresses'] = addresses
1468         if enc_ad is not None:
1469             KDC_REQ_BODY_obj['enc-authorization-data'] = enc_ad
1470         if additional_tickets is not None:
1471             KDC_REQ_BODY_obj['additional-tickets'] = additional_tickets
1472         return KDC_REQ_BODY_obj
1473
1474     def KDC_REQ_create(self,
1475                        msg_type,
1476                        padata,
1477                        req_body,
1478                        asn1Spec=None,
1479                        asn1_print=None,
1480                        hexdump=None):
1481         # KDC-REQ         ::= SEQUENCE {
1482         #        -- NOTE: first tag is [1], not [0]
1483         #        pvno            [1] INTEGER (5) ,
1484         #        msg-type        [2] INTEGER (10 -- AS -- | 12 -- TGS --),
1485         #        padata          [3] SEQUENCE OF PA-DATA OPTIONAL
1486         #                            -- NOTE: not empty --,
1487         #        req-body        [4] KDC-REQ-BODY
1488         # }
1489         #
1490         KDC_REQ_obj = {
1491             'pvno': 5,
1492             'msg-type': msg_type,
1493             'req-body': req_body,
1494         }
1495         if padata is not None:
1496             KDC_REQ_obj['padata'] = padata
1497         if asn1Spec is not None:
1498             KDC_REQ_decoded = pyasn1_native_decode(
1499                 KDC_REQ_obj, asn1Spec=asn1Spec)
1500         else:
1501             KDC_REQ_decoded = None
1502         return KDC_REQ_obj, KDC_REQ_decoded
1503
1504     def AS_REQ_create(self,
1505                       padata,       # optional
1506                       kdc_options,  # required
1507                       cname,        # optional
1508                       realm,        # required
1509                       sname,        # optional
1510                       from_time,    # optional
1511                       till_time,    # required
1512                       renew_time,   # optional
1513                       nonce,        # required
1514                       etypes,       # required
1515                       addresses,    # optional
1516                       additional_tickets,
1517                       native_decoded_only=True,
1518                       asn1_print=None,
1519                       hexdump=None):
1520         # KDC-REQ         ::= SEQUENCE {
1521         #        -- NOTE: first tag is [1], not [0]
1522         #        pvno            [1] INTEGER (5) ,
1523         #        msg-type        [2] INTEGER (10 -- AS -- | 12 -- TGS --),
1524         #        padata          [3] SEQUENCE OF PA-DATA OPTIONAL
1525         #                            -- NOTE: not empty --,
1526         #        req-body        [4] KDC-REQ-BODY
1527         # }
1528         #
1529         # KDC-REQ-BODY    ::= SEQUENCE {
1530         #        kdc-options             [0] KDCOptions,
1531         #        cname                   [1] PrincipalName OPTIONAL
1532         #                                    -- Used only in AS-REQ --,
1533         #        realm                   [2] Realm
1534         #                                    -- Server's realm
1535         #                                    -- Also client's in AS-REQ --,
1536         #        sname                   [3] PrincipalName OPTIONAL,
1537         #        from                    [4] KerberosTime OPTIONAL,
1538         #        till                    [5] KerberosTime,
1539         #        rtime                   [6] KerberosTime OPTIONAL,
1540         #        nonce                   [7] UInt32,
1541         #        etype                   [8] SEQUENCE OF Int32
1542         #                                    -- EncryptionType
1543         #                                    -- in preference order --,
1544         #        addresses               [9] HostAddresses OPTIONAL,
1545         #        enc-authorization-data  [10] EncryptedData OPTIONAL
1546         #                                    -- AuthorizationData --,
1547         #        additional-tickets      [11] SEQUENCE OF Ticket OPTIONAL
1548         #                                        -- NOTE: not empty
1549         # }
1550         KDC_REQ_BODY_obj = self.KDC_REQ_BODY_create(
1551             kdc_options,
1552             cname,
1553             realm,
1554             sname,
1555             from_time,
1556             till_time,
1557             renew_time,
1558             nonce,
1559             etypes,
1560             addresses,
1561             additional_tickets,
1562             EncAuthorizationData=None,
1563             EncAuthorizationData_key=None,
1564             EncAuthorizationData_usage=None,
1565             asn1_print=asn1_print,
1566             hexdump=hexdump)
1567         obj, decoded = self.KDC_REQ_create(
1568             msg_type=KRB_AS_REQ,
1569             padata=padata,
1570             req_body=KDC_REQ_BODY_obj,
1571             asn1Spec=krb5_asn1.AS_REQ(),
1572             asn1_print=asn1_print,
1573             hexdump=hexdump)
1574         if native_decoded_only:
1575             return decoded
1576         return decoded, obj
1577
1578     def AP_REQ_create(self, ap_options, ticket, authenticator):
1579         # AP-REQ          ::= [APPLICATION 14] SEQUENCE {
1580         #        pvno            [0] INTEGER (5),
1581         #        msg-type        [1] INTEGER (14),
1582         #        ap-options      [2] APOptions,
1583         #        ticket          [3] Ticket,
1584         #        authenticator   [4] EncryptedData -- Authenticator
1585         # }
1586         AP_REQ_obj = {
1587             'pvno': 5,
1588             'msg-type': KRB_AP_REQ,
1589             'ap-options': ap_options,
1590             'ticket': ticket,
1591             'authenticator': authenticator,
1592         }
1593         return AP_REQ_obj
1594
1595     def Authenticator_create(
1596             self, crealm, cname, cksum, cusec, ctime, subkey, seq_number,
1597             authorization_data):
1598         # -- Unencrypted authenticator
1599         # Authenticator   ::= [APPLICATION 2] SEQUENCE  {
1600         #        authenticator-vno       [0] INTEGER (5),
1601         #        crealm                  [1] Realm,
1602         #        cname                   [2] PrincipalName,
1603         #        cksum                   [3] Checksum OPTIONAL,
1604         #        cusec                   [4] Microseconds,
1605         #        ctime                   [5] KerberosTime,
1606         #        subkey                  [6] EncryptionKey OPTIONAL,
1607         #        seq-number              [7] UInt32 OPTIONAL,
1608         #        authorization-data      [8] AuthorizationData OPTIONAL
1609         # }
1610         Authenticator_obj = {
1611             'authenticator-vno': 5,
1612             'crealm': crealm,
1613             'cname': cname,
1614             'cusec': cusec,
1615             'ctime': ctime,
1616         }
1617         if cksum is not None:
1618             Authenticator_obj['cksum'] = cksum
1619         if subkey is not None:
1620             Authenticator_obj['subkey'] = subkey
1621         if seq_number is not None:
1622             Authenticator_obj['seq-number'] = seq_number
1623         if authorization_data is not None:
1624             Authenticator_obj['authorization-data'] = authorization_data
1625         return Authenticator_obj
1626
1627     def TGS_REQ_create(self,
1628                        padata,       # optional
1629                        cusec,
1630                        ctime,
1631                        ticket,
1632                        kdc_options,  # required
1633                        cname,        # optional
1634                        realm,        # required
1635                        sname,        # optional
1636                        from_time,    # optional
1637                        till_time,    # required
1638                        renew_time,   # optional
1639                        nonce,        # required
1640                        etypes,       # required
1641                        addresses,    # optional
1642                        EncAuthorizationData,
1643                        EncAuthorizationData_key,
1644                        additional_tickets,
1645                        ticket_session_key,
1646                        authenticator_subkey=None,
1647                        body_checksum_type=None,
1648                        native_decoded_only=True,
1649                        asn1_print=None,
1650                        hexdump=None):
1651         # KDC-REQ         ::= SEQUENCE {
1652         #        -- NOTE: first tag is [1], not [0]
1653         #        pvno            [1] INTEGER (5) ,
1654         #        msg-type        [2] INTEGER (10 -- AS -- | 12 -- TGS --),
1655         #        padata          [3] SEQUENCE OF PA-DATA OPTIONAL
1656         #                            -- NOTE: not empty --,
1657         #        req-body        [4] KDC-REQ-BODY
1658         # }
1659         #
1660         # KDC-REQ-BODY    ::= SEQUENCE {
1661         #        kdc-options             [0] KDCOptions,
1662         #        cname                   [1] PrincipalName OPTIONAL
1663         #                                    -- Used only in AS-REQ --,
1664         #        realm                   [2] Realm
1665         #                                    -- Server's realm
1666         #                                    -- Also client's in AS-REQ --,
1667         #        sname                   [3] PrincipalName OPTIONAL,
1668         #        from                    [4] KerberosTime OPTIONAL,
1669         #        till                    [5] KerberosTime,
1670         #        rtime                   [6] KerberosTime OPTIONAL,
1671         #        nonce                   [7] UInt32,
1672         #        etype                   [8] SEQUENCE OF Int32
1673         #                                    -- EncryptionType
1674         #                                    -- in preference order --,
1675         #        addresses               [9] HostAddresses OPTIONAL,
1676         #        enc-authorization-data  [10] EncryptedData OPTIONAL
1677         #                                    -- AuthorizationData --,
1678         #        additional-tickets      [11] SEQUENCE OF Ticket OPTIONAL
1679         #                                        -- NOTE: not empty
1680         # }
1681
1682         if authenticator_subkey is not None:
1683             EncAuthorizationData_usage = KU_TGS_REQ_AUTH_DAT_SUBKEY
1684         else:
1685             EncAuthorizationData_usage = KU_TGS_REQ_AUTH_DAT_SESSION
1686
1687         req_body = self.KDC_REQ_BODY_create(
1688             kdc_options=kdc_options,
1689             cname=None,
1690             realm=realm,
1691             sname=sname,
1692             from_time=from_time,
1693             till_time=till_time,
1694             renew_time=renew_time,
1695             nonce=nonce,
1696             etypes=etypes,
1697             addresses=addresses,
1698             additional_tickets=additional_tickets,
1699             EncAuthorizationData=EncAuthorizationData,
1700             EncAuthorizationData_key=EncAuthorizationData_key,
1701             EncAuthorizationData_usage=EncAuthorizationData_usage)
1702         req_body_blob = self.der_encode(req_body,
1703                                         asn1Spec=krb5_asn1.KDC_REQ_BODY(),
1704                                         asn1_print=asn1_print, hexdump=hexdump)
1705
1706         req_body_checksum = self.Checksum_create(ticket_session_key,
1707                                                  KU_TGS_REQ_AUTH_CKSUM,
1708                                                  req_body_blob,
1709                                                  ctype=body_checksum_type)
1710
1711         subkey_obj = None
1712         if authenticator_subkey is not None:
1713             subkey_obj = authenticator_subkey.export_obj()
1714         seq_number = random.randint(0, 0xfffffffe)
1715         authenticator = self.Authenticator_create(
1716             crealm=realm,
1717             cname=cname,
1718             cksum=req_body_checksum,
1719             cusec=cusec,
1720             ctime=ctime,
1721             subkey=subkey_obj,
1722             seq_number=seq_number,
1723             authorization_data=None)
1724         authenticator = self.der_encode(
1725             authenticator,
1726             asn1Spec=krb5_asn1.Authenticator(),
1727             asn1_print=asn1_print,
1728             hexdump=hexdump)
1729
1730         authenticator = self.EncryptedData_create(
1731             ticket_session_key, KU_TGS_REQ_AUTH, authenticator)
1732
1733         ap_options = krb5_asn1.APOptions('0')
1734         ap_req = self.AP_REQ_create(ap_options=str(ap_options),
1735                                     ticket=ticket,
1736                                     authenticator=authenticator)
1737         ap_req = self.der_encode(ap_req, asn1Spec=krb5_asn1.AP_REQ(),
1738                                  asn1_print=asn1_print, hexdump=hexdump)
1739         pa_tgs_req = self.PA_DATA_create(PADATA_KDC_REQ, ap_req)
1740         if padata is not None:
1741             padata.append(pa_tgs_req)
1742         else:
1743             padata = [pa_tgs_req]
1744
1745         obj, decoded = self.KDC_REQ_create(
1746             msg_type=KRB_TGS_REQ,
1747             padata=padata,
1748             req_body=req_body,
1749             asn1Spec=krb5_asn1.TGS_REQ(),
1750             asn1_print=asn1_print,
1751             hexdump=hexdump)
1752         if native_decoded_only:
1753             return decoded
1754         return decoded, obj
1755
1756     def PA_S4U2Self_create(self, name, realm, tgt_session_key, ctype=None):
1757         # PA-S4U2Self     ::= SEQUENCE {
1758         #        name            [0] PrincipalName,
1759         #        realm           [1] Realm,
1760         #        cksum           [2] Checksum,
1761         #        auth            [3] GeneralString
1762         # }
1763         cksum_data = name['name-type'].to_bytes(4, byteorder='little')
1764         for n in name['name-string']:
1765             cksum_data += n.encode()
1766         cksum_data += realm.encode()
1767         cksum_data += "Kerberos".encode()
1768         cksum = self.Checksum_create(tgt_session_key,
1769                                      KU_NON_KERB_CKSUM_SALT,
1770                                      cksum_data,
1771                                      ctype)
1772
1773         PA_S4U2Self_obj = {
1774             'name': name,
1775             'realm': realm,
1776             'cksum': cksum,
1777             'auth': "Kerberos",
1778         }
1779         pa_s4u2self = self.der_encode(
1780             PA_S4U2Self_obj, asn1Spec=krb5_asn1.PA_S4U2Self())
1781         return self.PA_DATA_create(PADATA_FOR_USER, pa_s4u2self)
1782
1783     def _generic_kdc_exchange(self,
1784                               kdc_exchange_dict,  # required
1785                               cname=None,  # optional
1786                               realm=None,  # required
1787                               sname=None,  # optional
1788                               from_time=None,  # optional
1789                               till_time=None,  # required
1790                               renew_time=None,  # optional
1791                               etypes=None,  # required
1792                               addresses=None,  # optional
1793                               additional_tickets=None,  # optional
1794                               EncAuthorizationData=None,  # optional
1795                               EncAuthorizationData_key=None,  # optional
1796                               EncAuthorizationData_usage=None):  # optional
1797
1798         check_error_fn = kdc_exchange_dict['check_error_fn']
1799         check_rep_fn = kdc_exchange_dict['check_rep_fn']
1800         generate_fast_fn = kdc_exchange_dict['generate_fast_fn']
1801         generate_fast_armor_fn = kdc_exchange_dict['generate_fast_armor_fn']
1802         generate_fast_padata_fn = kdc_exchange_dict['generate_fast_padata_fn']
1803         generate_padata_fn = kdc_exchange_dict['generate_padata_fn']
1804         callback_dict = kdc_exchange_dict['callback_dict']
1805         req_msg_type = kdc_exchange_dict['req_msg_type']
1806         req_asn1Spec = kdc_exchange_dict['req_asn1Spec']
1807         rep_msg_type = kdc_exchange_dict['rep_msg_type']
1808
1809         expected_error_mode = kdc_exchange_dict['expected_error_mode']
1810         kdc_options = kdc_exchange_dict['kdc_options']
1811
1812         pac_request = kdc_exchange_dict['pac_request']
1813         pac_options = kdc_exchange_dict['pac_options']
1814
1815         # Parameters specific to the inner request body
1816         inner_req = kdc_exchange_dict['inner_req']
1817
1818         # Parameters specific to the outer request body
1819         outer_req = kdc_exchange_dict['outer_req']
1820
1821         if till_time is None:
1822             till_time = self.get_KerberosTime(offset=36000)
1823
1824         if 'nonce' in kdc_exchange_dict:
1825             nonce = kdc_exchange_dict['nonce']
1826         else:
1827             nonce = self.get_Nonce()
1828             kdc_exchange_dict['nonce'] = nonce
1829
1830         req_body = self.KDC_REQ_BODY_create(
1831             kdc_options=kdc_options,
1832             cname=cname,
1833             realm=realm,
1834             sname=sname,
1835             from_time=from_time,
1836             till_time=till_time,
1837             renew_time=renew_time,
1838             nonce=nonce,
1839             etypes=etypes,
1840             addresses=addresses,
1841             additional_tickets=additional_tickets,
1842             EncAuthorizationData=EncAuthorizationData,
1843             EncAuthorizationData_key=EncAuthorizationData_key,
1844             EncAuthorizationData_usage=EncAuthorizationData_usage)
1845
1846         inner_req_body = dict(req_body)
1847         if inner_req is not None:
1848             for key, value in inner_req.items():
1849                 if value is not None:
1850                     inner_req_body[key] = value
1851                 else:
1852                     del inner_req_body[key]
1853         if outer_req is not None:
1854             for key, value in outer_req.items():
1855                 if value is not None:
1856                     req_body[key] = value
1857                 else:
1858                     del req_body[key]
1859
1860         additional_padata = []
1861         if pac_request is not None:
1862             pa_pac_request = self.KERB_PA_PAC_REQUEST_create(pac_request)
1863             additional_padata.append(pa_pac_request)
1864         if pac_options is not None:
1865             pa_pac_options = self.get_pa_pac_options(pac_options)
1866             additional_padata.append(pa_pac_options)
1867
1868         if req_msg_type == KRB_AS_REQ:
1869             tgs_req = None
1870             tgs_req_padata = None
1871         else:
1872             self.assertEqual(KRB_TGS_REQ, req_msg_type)
1873
1874             tgs_req = self.generate_ap_req(kdc_exchange_dict,
1875                                            callback_dict,
1876                                            req_body,
1877                                            armor=False)
1878             tgs_req_padata = self.PA_DATA_create(PADATA_KDC_REQ, tgs_req)
1879
1880         if generate_fast_padata_fn is not None:
1881             self.assertIsNotNone(generate_fast_fn)
1882             # This can alter req_body...
1883             fast_padata, req_body = generate_fast_padata_fn(kdc_exchange_dict,
1884                                                             callback_dict,
1885                                                             req_body)
1886         else:
1887             fast_padata = []
1888
1889         if generate_fast_armor_fn is not None:
1890             self.assertIsNotNone(generate_fast_fn)
1891             fast_ap_req = generate_fast_armor_fn(kdc_exchange_dict,
1892                                                  callback_dict,
1893                                                  req_body,
1894                                                  armor=True)
1895
1896             fast_armor_type = kdc_exchange_dict['fast_armor_type']
1897             fast_armor = self.KRB_FAST_ARMOR_create(fast_armor_type,
1898                                                     fast_ap_req)
1899         else:
1900             fast_armor = None
1901
1902         if generate_padata_fn is not None:
1903             # This can alter req_body...
1904             outer_padata, req_body = generate_padata_fn(kdc_exchange_dict,
1905                                                         callback_dict,
1906                                                         req_body)
1907             self.assertIsNotNone(outer_padata)
1908             self.assertNotIn(PADATA_KDC_REQ,
1909                              [pa['padata-type'] for pa in outer_padata],
1910                              'Don\'t create TGS-REQ manually')
1911         else:
1912             outer_padata = None
1913
1914         if generate_fast_fn is not None:
1915             armor_key = kdc_exchange_dict['armor_key']
1916             self.assertIsNotNone(armor_key)
1917
1918             if req_msg_type == KRB_AS_REQ:
1919                 checksum_blob = self.der_encode(
1920                     req_body,
1921                     asn1Spec=krb5_asn1.KDC_REQ_BODY())
1922             else:
1923                 self.assertEqual(KRB_TGS_REQ, req_msg_type)
1924                 checksum_blob = tgs_req
1925
1926             checksum = self.Checksum_create(armor_key,
1927                                             KU_FAST_REQ_CHKSUM,
1928                                             checksum_blob)
1929
1930             fast_padata += additional_padata
1931             fast = generate_fast_fn(kdc_exchange_dict,
1932                                     callback_dict,
1933                                     inner_req_body,
1934                                     fast_padata,
1935                                     fast_armor,
1936                                     checksum)
1937         else:
1938             fast = None
1939
1940         padata = []
1941
1942         if tgs_req_padata is not None:
1943             padata.append(tgs_req_padata)
1944
1945         if fast is not None:
1946             padata.append(fast)
1947
1948         if outer_padata is not None:
1949             padata += outer_padata
1950
1951         if fast is None:
1952             padata += additional_padata
1953
1954         if not padata:
1955             padata = None
1956
1957         kdc_exchange_dict['req_padata'] = padata
1958         kdc_exchange_dict['fast_padata'] = fast_padata
1959         kdc_exchange_dict['req_body'] = inner_req_body
1960
1961         req_obj, req_decoded = self.KDC_REQ_create(msg_type=req_msg_type,
1962                                                    padata=padata,
1963                                                    req_body=req_body,
1964                                                    asn1Spec=req_asn1Spec())
1965
1966         to_rodc = kdc_exchange_dict['to_rodc']
1967
1968         rep = self.send_recv_transaction(req_decoded, to_rodc=to_rodc)
1969         self.assertIsNotNone(rep)
1970
1971         msg_type = self.getElementValue(rep, 'msg-type')
1972         self.assertIsNotNone(msg_type)
1973
1974         expected_msg_type = None
1975         if check_error_fn is not None:
1976             expected_msg_type = KRB_ERROR
1977             self.assertIsNone(check_rep_fn)
1978             self.assertNotEqual(0, len(expected_error_mode))
1979             self.assertNotIn(0, expected_error_mode)
1980         if check_rep_fn is not None:
1981             expected_msg_type = rep_msg_type
1982             self.assertIsNone(check_error_fn)
1983             self.assertEqual(0, len(expected_error_mode))
1984         self.assertIsNotNone(expected_msg_type)
1985         if msg_type == KRB_ERROR:
1986             error_code = self.getElementValue(rep, 'error-code')
1987             fail_msg = f'Got unexpected error: {error_code}'
1988         else:
1989             fail_msg = f'Expected to fail with error: {expected_error_mode}'
1990         self.assertEqual(msg_type, expected_msg_type, fail_msg)
1991
1992         if msg_type == KRB_ERROR:
1993             return check_error_fn(kdc_exchange_dict,
1994                                   callback_dict,
1995                                   rep)
1996
1997         return check_rep_fn(kdc_exchange_dict, callback_dict, rep)
1998
1999     def as_exchange_dict(self,
2000                          expected_crealm=None,
2001                          expected_cname=None,
2002                          expected_anon=False,
2003                          expected_srealm=None,
2004                          expected_sname=None,
2005                          expected_account_name=None,
2006                          expected_upn_name=None,
2007                          expected_sid=None,
2008                          expected_supported_etypes=None,
2009                          expected_flags=None,
2010                          unexpected_flags=None,
2011                          ticket_decryption_key=None,
2012                          expect_ticket_checksum=None,
2013                          generate_fast_fn=None,
2014                          generate_fast_armor_fn=None,
2015                          generate_fast_padata_fn=None,
2016                          fast_armor_type=FX_FAST_ARMOR_AP_REQUEST,
2017                          generate_padata_fn=None,
2018                          check_error_fn=None,
2019                          check_rep_fn=None,
2020                          check_kdc_private_fn=None,
2021                          callback_dict=None,
2022                          expected_error_mode=0,
2023                          expected_status=None,
2024                          client_as_etypes=None,
2025                          expected_salt=None,
2026                          authenticator_subkey=None,
2027                          preauth_key=None,
2028                          armor_key=None,
2029                          armor_tgt=None,
2030                          armor_subkey=None,
2031                          auth_data=None,
2032                          kdc_options='',
2033                          inner_req=None,
2034                          outer_req=None,
2035                          pac_request=None,
2036                          pac_options=None,
2037                          expect_edata=None,
2038                          expect_pac=True,
2039                          expect_claims=True,
2040                          expect_upn_dns_info_ex=None,
2041                          expect_pac_attrs=None,
2042                          expect_pac_attrs_pac_request=None,
2043                          expect_requester_sid=None,
2044                          to_rodc=False):
2045         if expected_error_mode == 0:
2046             expected_error_mode = ()
2047         elif not isinstance(expected_error_mode, collections.abc.Container):
2048             expected_error_mode = (expected_error_mode,)
2049
2050         kdc_exchange_dict = {
2051             'req_msg_type': KRB_AS_REQ,
2052             'req_asn1Spec': krb5_asn1.AS_REQ,
2053             'rep_msg_type': KRB_AS_REP,
2054             'rep_asn1Spec': krb5_asn1.AS_REP,
2055             'rep_encpart_asn1Spec': krb5_asn1.EncASRepPart,
2056             'expected_crealm': expected_crealm,
2057             'expected_cname': expected_cname,
2058             'expected_anon': expected_anon,
2059             'expected_srealm': expected_srealm,
2060             'expected_sname': expected_sname,
2061             'expected_account_name': expected_account_name,
2062             'expected_upn_name': expected_upn_name,
2063             'expected_sid': expected_sid,
2064             'expected_supported_etypes': expected_supported_etypes,
2065             'expected_flags': expected_flags,
2066             'unexpected_flags': unexpected_flags,
2067             'ticket_decryption_key': ticket_decryption_key,
2068             'expect_ticket_checksum': expect_ticket_checksum,
2069             'generate_fast_fn': generate_fast_fn,
2070             'generate_fast_armor_fn': generate_fast_armor_fn,
2071             'generate_fast_padata_fn': generate_fast_padata_fn,
2072             'fast_armor_type': fast_armor_type,
2073             'generate_padata_fn': generate_padata_fn,
2074             'check_error_fn': check_error_fn,
2075             'check_rep_fn': check_rep_fn,
2076             'check_kdc_private_fn': check_kdc_private_fn,
2077             'callback_dict': callback_dict,
2078             'expected_error_mode': expected_error_mode,
2079             'expected_status': expected_status,
2080             'client_as_etypes': client_as_etypes,
2081             'expected_salt': expected_salt,
2082             'authenticator_subkey': authenticator_subkey,
2083             'preauth_key': preauth_key,
2084             'armor_key': armor_key,
2085             'armor_tgt': armor_tgt,
2086             'armor_subkey': armor_subkey,
2087             'auth_data': auth_data,
2088             'kdc_options': kdc_options,
2089             'inner_req': inner_req,
2090             'outer_req': outer_req,
2091             'pac_request': pac_request,
2092             'pac_options': pac_options,
2093             'expect_edata': expect_edata,
2094             'expect_pac': expect_pac,
2095             'expect_claims': expect_claims,
2096             'expect_upn_dns_info_ex': expect_upn_dns_info_ex,
2097             'expect_pac_attrs': expect_pac_attrs,
2098             'expect_pac_attrs_pac_request': expect_pac_attrs_pac_request,
2099             'expect_requester_sid': expect_requester_sid,
2100             'to_rodc': to_rodc
2101         }
2102         if callback_dict is None:
2103             callback_dict = {}
2104
2105         return kdc_exchange_dict
2106
2107     def tgs_exchange_dict(self,
2108                           expected_crealm=None,
2109                           expected_cname=None,
2110                           expected_anon=False,
2111                           expected_srealm=None,
2112                           expected_sname=None,
2113                           expected_account_name=None,
2114                           expected_upn_name=None,
2115                           expected_sid=None,
2116                           expected_supported_etypes=None,
2117                           expected_flags=None,
2118                           unexpected_flags=None,
2119                           ticket_decryption_key=None,
2120                           expect_ticket_checksum=None,
2121                           generate_fast_fn=None,
2122                           generate_fast_armor_fn=None,
2123                           generate_fast_padata_fn=None,
2124                           fast_armor_type=FX_FAST_ARMOR_AP_REQUEST,
2125                           generate_padata_fn=None,
2126                           check_error_fn=None,
2127                           check_rep_fn=None,
2128                           check_kdc_private_fn=None,
2129                           expected_error_mode=0,
2130                           expected_status=None,
2131                           callback_dict=None,
2132                           tgt=None,
2133                           armor_key=None,
2134                           armor_tgt=None,
2135                           armor_subkey=None,
2136                           authenticator_subkey=None,
2137                           auth_data=None,
2138                           body_checksum_type=None,
2139                           kdc_options='',
2140                           inner_req=None,
2141                           outer_req=None,
2142                           pac_request=None,
2143                           pac_options=None,
2144                           expect_edata=None,
2145                           expect_pac=True,
2146                           expect_claims=True,
2147                           expect_upn_dns_info_ex=None,
2148                           expect_pac_attrs=None,
2149                           expect_pac_attrs_pac_request=None,
2150                           expect_requester_sid=None,
2151                           expected_proxy_target=None,
2152                           expected_transited_services=None,
2153                           to_rodc=False):
2154         if expected_error_mode == 0:
2155             expected_error_mode = ()
2156         elif not isinstance(expected_error_mode, collections.abc.Container):
2157             expected_error_mode = (expected_error_mode,)
2158
2159         kdc_exchange_dict = {
2160             'req_msg_type': KRB_TGS_REQ,
2161             'req_asn1Spec': krb5_asn1.TGS_REQ,
2162             'rep_msg_type': KRB_TGS_REP,
2163             'rep_asn1Spec': krb5_asn1.TGS_REP,
2164             'rep_encpart_asn1Spec': krb5_asn1.EncTGSRepPart,
2165             'expected_crealm': expected_crealm,
2166             'expected_cname': expected_cname,
2167             'expected_anon': expected_anon,
2168             'expected_srealm': expected_srealm,
2169             'expected_sname': expected_sname,
2170             'expected_account_name': expected_account_name,
2171             'expected_upn_name': expected_upn_name,
2172             'expected_sid': expected_sid,
2173             'expected_supported_etypes': expected_supported_etypes,
2174             'expected_flags': expected_flags,
2175             'unexpected_flags': unexpected_flags,
2176             'ticket_decryption_key': ticket_decryption_key,
2177             'expect_ticket_checksum': expect_ticket_checksum,
2178             'generate_fast_fn': generate_fast_fn,
2179             'generate_fast_armor_fn': generate_fast_armor_fn,
2180             'generate_fast_padata_fn': generate_fast_padata_fn,
2181             'fast_armor_type': fast_armor_type,
2182             'generate_padata_fn': generate_padata_fn,
2183             'check_error_fn': check_error_fn,
2184             'check_rep_fn': check_rep_fn,
2185             'check_kdc_private_fn': check_kdc_private_fn,
2186             'callback_dict': callback_dict,
2187             'expected_error_mode': expected_error_mode,
2188             'expected_status': expected_status,
2189             'tgt': tgt,
2190             'body_checksum_type': body_checksum_type,
2191             'armor_key': armor_key,
2192             'armor_tgt': armor_tgt,
2193             'armor_subkey': armor_subkey,
2194             'auth_data': auth_data,
2195             'authenticator_subkey': authenticator_subkey,
2196             'kdc_options': kdc_options,
2197             'inner_req': inner_req,
2198             'outer_req': outer_req,
2199             'pac_request': pac_request,
2200             'pac_options': pac_options,
2201             'expect_edata': expect_edata,
2202             'expect_pac': expect_pac,
2203             'expect_claims': expect_claims,
2204             'expect_upn_dns_info_ex': expect_upn_dns_info_ex,
2205             'expect_pac_attrs': expect_pac_attrs,
2206             'expect_pac_attrs_pac_request': expect_pac_attrs_pac_request,
2207             'expect_requester_sid': expect_requester_sid,
2208             'expected_proxy_target': expected_proxy_target,
2209             'expected_transited_services': expected_transited_services,
2210             'to_rodc': to_rodc
2211         }
2212         if callback_dict is None:
2213             callback_dict = {}
2214
2215         return kdc_exchange_dict
2216
2217     def generic_check_kdc_rep(self,
2218                               kdc_exchange_dict,
2219                               callback_dict,
2220                               rep):
2221
2222         expected_crealm = kdc_exchange_dict['expected_crealm']
2223         expected_anon = kdc_exchange_dict['expected_anon']
2224         expected_srealm = kdc_exchange_dict['expected_srealm']
2225         expected_sname = kdc_exchange_dict['expected_sname']
2226         ticket_decryption_key = kdc_exchange_dict['ticket_decryption_key']
2227         check_kdc_private_fn = kdc_exchange_dict['check_kdc_private_fn']
2228         rep_encpart_asn1Spec = kdc_exchange_dict['rep_encpart_asn1Spec']
2229         msg_type = kdc_exchange_dict['rep_msg_type']
2230         armor_key = kdc_exchange_dict['armor_key']
2231
2232         self.assertElementEqual(rep, 'msg-type', msg_type)  # AS-REP | TGS-REP
2233         padata = self.getElementValue(rep, 'padata')
2234         if self.strict_checking:
2235             self.assertElementEqualUTF8(rep, 'crealm', expected_crealm)
2236             if expected_anon:
2237                 expected_cname = self.PrincipalName_create(
2238                     name_type=NT_WELLKNOWN,
2239                     names=['WELLKNOWN', 'ANONYMOUS'])
2240             else:
2241                 expected_cname = kdc_exchange_dict['expected_cname']
2242             self.assertElementEqualPrincipal(rep, 'cname', expected_cname)
2243         self.assertElementPresent(rep, 'ticket')
2244         ticket = self.getElementValue(rep, 'ticket')
2245         ticket_encpart = None
2246         ticket_cipher = None
2247         self.assertIsNotNone(ticket)
2248         if ticket is not None:  # Never None, but gives indentation
2249             self.assertElementEqual(ticket, 'tkt-vno', 5)
2250             self.assertElementEqualUTF8(ticket, 'realm', expected_srealm)
2251             self.assertElementEqualPrincipal(ticket, 'sname', expected_sname)
2252             self.assertElementPresent(ticket, 'enc-part')
2253             ticket_encpart = self.getElementValue(ticket, 'enc-part')
2254             self.assertIsNotNone(ticket_encpart)
2255             if ticket_encpart is not None:  # Never None, but gives indentation
2256                 self.assertElementPresent(ticket_encpart, 'etype')
2257
2258                 kdc_options = kdc_exchange_dict['kdc_options']
2259                 pos = len(tuple(krb5_asn1.KDCOptions('enc-tkt-in-skey'))) - 1
2260                 expect_kvno = (pos >= len(kdc_options)
2261                                or kdc_options[pos] != '1')
2262                 if expect_kvno:
2263                     # 'unspecified' means present, with any value != 0
2264                     self.assertElementKVNO(ticket_encpart, 'kvno',
2265                                            self.unspecified_kvno)
2266                 else:
2267                     # For user-to-user, don't expect a kvno.
2268                     self.assertElementMissing(ticket_encpart, 'kvno')
2269
2270                 self.assertElementPresent(ticket_encpart, 'cipher')
2271                 ticket_cipher = self.getElementValue(ticket_encpart, 'cipher')
2272         self.assertElementPresent(rep, 'enc-part')
2273         encpart = self.getElementValue(rep, 'enc-part')
2274         encpart_cipher = None
2275         self.assertIsNotNone(encpart)
2276         if encpart is not None:  # Never None, but gives indentation
2277             self.assertElementPresent(encpart, 'etype')
2278             self.assertElementKVNO(ticket_encpart, 'kvno', 'autodetect')
2279             self.assertElementPresent(encpart, 'cipher')
2280             encpart_cipher = self.getElementValue(encpart, 'cipher')
2281
2282         ticket_checksum = None
2283
2284         # Get the decryption key for the encrypted part
2285         encpart_decryption_key, encpart_decryption_usage = (
2286             self.get_preauth_key(kdc_exchange_dict))
2287
2288         if armor_key is not None:
2289             pa_dict = self.get_pa_dict(padata)
2290
2291             if PADATA_FX_FAST in pa_dict:
2292                 fx_fast_data = pa_dict[PADATA_FX_FAST]
2293                 fast_response = self.check_fx_fast_data(kdc_exchange_dict,
2294                                                         fx_fast_data,
2295                                                         armor_key,
2296                                                         finished=True)
2297
2298                 if 'strengthen-key' in fast_response:
2299                     strengthen_key = self.EncryptionKey_import(
2300                         fast_response['strengthen-key'])
2301                     encpart_decryption_key = (
2302                         self.generate_strengthen_reply_key(
2303                             strengthen_key,
2304                             encpart_decryption_key))
2305
2306                 fast_finished = fast_response.get('finished')
2307                 if fast_finished is not None:
2308                     ticket_checksum = fast_finished['ticket-checksum']
2309
2310                 self.check_rep_padata(kdc_exchange_dict,
2311                                       callback_dict,
2312                                       fast_response['padata'],
2313                                       error_code=0)
2314
2315         ticket_private = None
2316         if ticket_decryption_key is not None:
2317             self.assertElementEqual(ticket_encpart, 'etype',
2318                                     ticket_decryption_key.etype)
2319             self.assertElementKVNO(ticket_encpart, 'kvno',
2320                                    ticket_decryption_key.kvno)
2321             ticket_decpart = ticket_decryption_key.decrypt(KU_TICKET,
2322                                                            ticket_cipher)
2323             ticket_private = self.der_decode(
2324                 ticket_decpart,
2325                 asn1Spec=krb5_asn1.EncTicketPart())
2326
2327         encpart_private = None
2328         self.assertIsNotNone(encpart_decryption_key)
2329         if encpart_decryption_key is not None:
2330             self.assertElementEqual(encpart, 'etype',
2331                                     encpart_decryption_key.etype)
2332             if self.strict_checking:
2333                 self.assertElementKVNO(encpart, 'kvno',
2334                                        encpart_decryption_key.kvno)
2335             rep_decpart = encpart_decryption_key.decrypt(
2336                 encpart_decryption_usage,
2337                 encpart_cipher)
2338             # MIT KDC encodes both EncASRepPart and EncTGSRepPart with
2339             # application tag 26
2340             try:
2341                 encpart_private = self.der_decode(
2342                     rep_decpart,
2343                     asn1Spec=rep_encpart_asn1Spec())
2344             except Exception:
2345                 encpart_private = self.der_decode(
2346                     rep_decpart,
2347                     asn1Spec=krb5_asn1.EncTGSRepPart())
2348
2349         self.assertIsNotNone(check_kdc_private_fn)
2350         if check_kdc_private_fn is not None:
2351             check_kdc_private_fn(kdc_exchange_dict, callback_dict,
2352                                  rep, ticket_private, encpart_private,
2353                                  ticket_checksum)
2354
2355         return rep
2356
2357     def check_fx_fast_data(self,
2358                            kdc_exchange_dict,
2359                            fx_fast_data,
2360                            armor_key,
2361                            finished=False,
2362                            expect_strengthen_key=True):
2363         fx_fast_data = self.der_decode(fx_fast_data,
2364                                        asn1Spec=krb5_asn1.PA_FX_FAST_REPLY())
2365
2366         enc_fast_rep = fx_fast_data['armored-data']['enc-fast-rep']
2367         self.assertEqual(enc_fast_rep['etype'], armor_key.etype)
2368
2369         fast_rep = armor_key.decrypt(KU_FAST_REP, enc_fast_rep['cipher'])
2370
2371         fast_response = self.der_decode(fast_rep,
2372                                         asn1Spec=krb5_asn1.KrbFastResponse())
2373
2374         if expect_strengthen_key and self.strict_checking:
2375             self.assertIn('strengthen-key', fast_response)
2376
2377         if finished:
2378             self.assertIn('finished', fast_response)
2379
2380         # Ensure that the nonce matches the nonce in the body of the request
2381         # (RFC6113 5.4.3).
2382         nonce = kdc_exchange_dict['nonce']
2383         self.assertEqual(nonce, fast_response['nonce'])
2384
2385         return fast_response
2386
2387     def generic_check_kdc_private(self,
2388                                   kdc_exchange_dict,
2389                                   callback_dict,
2390                                   rep,
2391                                   ticket_private,
2392                                   encpart_private,
2393                                   ticket_checksum):
2394         kdc_options = kdc_exchange_dict['kdc_options']
2395         canon_pos = len(tuple(krb5_asn1.KDCOptions('canonicalize'))) - 1
2396         canonicalize = (canon_pos < len(kdc_options)
2397                         and kdc_options[canon_pos] == '1')
2398         renewable_pos = len(tuple(krb5_asn1.KDCOptions('renewable'))) - 1
2399         renewable = (renewable_pos < len(kdc_options)
2400                      and kdc_options[renewable_pos] == '1')
2401         renew_pos = len(tuple(krb5_asn1.KDCOptions('renew'))) - 1
2402         renew = (renew_pos < len(kdc_options)
2403                  and kdc_options[renew_pos] == '1')
2404         expect_renew_till = renewable or renew
2405
2406         expected_crealm = kdc_exchange_dict['expected_crealm']
2407         expected_cname = kdc_exchange_dict['expected_cname']
2408         expected_srealm = kdc_exchange_dict['expected_srealm']
2409         expected_sname = kdc_exchange_dict['expected_sname']
2410         ticket_decryption_key = kdc_exchange_dict['ticket_decryption_key']
2411
2412         rep_msg_type = kdc_exchange_dict['rep_msg_type']
2413
2414         expected_flags = kdc_exchange_dict.get('expected_flags')
2415         unexpected_flags = kdc_exchange_dict.get('unexpected_flags')
2416
2417         ticket = self.getElementValue(rep, 'ticket')
2418
2419         if ticket_checksum is not None:
2420             armor_key = kdc_exchange_dict['armor_key']
2421             self.verify_ticket_checksum(ticket, ticket_checksum, armor_key)
2422
2423         to_rodc = kdc_exchange_dict['to_rodc']
2424         if to_rodc:
2425             krbtgt_creds = self.get_rodc_krbtgt_creds()
2426         else:
2427             krbtgt_creds = self.get_krbtgt_creds()
2428         krbtgt_key = self.TicketDecryptionKey_from_creds(krbtgt_creds)
2429
2430         krbtgt_keys = [krbtgt_key]
2431         if not self.strict_checking:
2432             krbtgt_key_rc4 = self.TicketDecryptionKey_from_creds(
2433                 krbtgt_creds,
2434                 etype=kcrypto.Enctype.RC4)
2435             krbtgt_keys.append(krbtgt_key_rc4)
2436
2437         if self.expect_pac and self.is_tgs(expected_sname):
2438             expect_pac = True
2439         else:
2440             expect_pac = kdc_exchange_dict['expect_pac']
2441
2442         ticket_session_key = None
2443         if ticket_private is not None:
2444             self.assertElementFlags(ticket_private, 'flags',
2445                                     expected_flags,
2446                                     unexpected_flags)
2447             self.assertElementPresent(ticket_private, 'key')
2448             ticket_key = self.getElementValue(ticket_private, 'key')
2449             self.assertIsNotNone(ticket_key)
2450             if ticket_key is not None:  # Never None, but gives indentation
2451                 self.assertElementPresent(ticket_key, 'keytype')
2452                 self.assertElementPresent(ticket_key, 'keyvalue')
2453                 ticket_session_key = self.EncryptionKey_import(ticket_key)
2454             self.assertElementEqualUTF8(ticket_private, 'crealm',
2455                                         expected_crealm)
2456             if self.strict_checking:
2457                 self.assertElementEqualPrincipal(ticket_private, 'cname',
2458                                                  expected_cname)
2459             self.assertElementPresent(ticket_private, 'transited')
2460             self.assertElementPresent(ticket_private, 'authtime')
2461             if self.strict_checking:
2462                 self.assertElementPresent(ticket_private, 'starttime')
2463             self.assertElementPresent(ticket_private, 'endtime')
2464             if expect_renew_till:
2465                 if self.strict_checking:
2466                     self.assertElementPresent(ticket_private, 'renew-till')
2467             else:
2468                 self.assertElementMissing(ticket_private, 'renew-till')
2469             if self.strict_checking:
2470                 self.assertElementEqual(ticket_private, 'caddr', [])
2471             if expect_pac is not None:
2472                 self.assertElementPresent(ticket_private, 'authorization-data',
2473                                           expect_empty=not expect_pac)
2474
2475         encpart_session_key = None
2476         if encpart_private is not None:
2477             self.assertElementPresent(encpart_private, 'key')
2478             encpart_key = self.getElementValue(encpart_private, 'key')
2479             self.assertIsNotNone(encpart_key)
2480             if encpart_key is not None:  # Never None, but gives indentation
2481                 self.assertElementPresent(encpart_key, 'keytype')
2482                 self.assertElementPresent(encpart_key, 'keyvalue')
2483                 encpart_session_key = self.EncryptionKey_import(encpart_key)
2484             self.assertElementPresent(encpart_private, 'last-req')
2485             self.assertElementEqual(encpart_private, 'nonce',
2486                                     kdc_exchange_dict['nonce'])
2487             if rep_msg_type == KRB_AS_REP:
2488                 if self.strict_checking:
2489                     self.assertElementPresent(encpart_private,
2490                                               'key-expiration')
2491             else:
2492                 self.assertElementMissing(encpart_private,
2493                                           'key-expiration')
2494             self.assertElementFlags(encpart_private, 'flags',
2495                                     expected_flags,
2496                                     unexpected_flags)
2497             self.assertElementPresent(encpart_private, 'authtime')
2498             if self.strict_checking:
2499                 self.assertElementPresent(encpart_private, 'starttime')
2500             self.assertElementPresent(encpart_private, 'endtime')
2501             if expect_renew_till:
2502                 if self.strict_checking:
2503                     self.assertElementPresent(encpart_private, 'renew-till')
2504             else:
2505                 self.assertElementMissing(encpart_private, 'renew-till')
2506             self.assertElementEqualUTF8(encpart_private, 'srealm',
2507                                         expected_srealm)
2508             self.assertElementEqualPrincipal(encpart_private, 'sname',
2509                                              expected_sname)
2510             if self.strict_checking:
2511                 self.assertElementEqual(encpart_private, 'caddr', [])
2512
2513             sent_pac_options = self.get_sent_pac_options(kdc_exchange_dict)
2514
2515             if self.strict_checking:
2516                 if canonicalize or '1' in sent_pac_options:
2517                     self.assertElementPresent(encpart_private,
2518                                               'encrypted-pa-data')
2519                     enc_pa_dict = self.get_pa_dict(
2520                         encpart_private['encrypted-pa-data'])
2521                     if canonicalize:
2522                         self.assertIn(PADATA_SUPPORTED_ETYPES, enc_pa_dict)
2523
2524                         expected_supported_etypes = kdc_exchange_dict[
2525                             'expected_supported_etypes']
2526                         expected_supported_etypes |= (
2527                             security.KERB_ENCTYPE_DES_CBC_CRC |
2528                             security.KERB_ENCTYPE_DES_CBC_MD5 |
2529                             security.KERB_ENCTYPE_RC4_HMAC_MD5)
2530
2531                         (supported_etypes,) = struct.unpack(
2532                             '<L',
2533                             enc_pa_dict[PADATA_SUPPORTED_ETYPES])
2534
2535                         self.assertEqual(supported_etypes,
2536                                          expected_supported_etypes)
2537                     else:
2538                         self.assertNotIn(PADATA_SUPPORTED_ETYPES, enc_pa_dict)
2539
2540                     if '1' in sent_pac_options:
2541                         self.assertIn(PADATA_PAC_OPTIONS, enc_pa_dict)
2542
2543                         pac_options = self.der_decode(
2544                             enc_pa_dict[PADATA_PAC_OPTIONS],
2545                             asn1Spec=krb5_asn1.PA_PAC_OPTIONS())
2546
2547                         self.assertElementEqual(pac_options, 'options',
2548                                                 sent_pac_options)
2549                     else:
2550                         self.assertNotIn(PADATA_PAC_OPTIONS, enc_pa_dict)
2551                 else:
2552                     self.assertElementEqual(encpart_private,
2553                                             'encrypted-pa-data',
2554                                             [])
2555
2556         if ticket_session_key is not None and encpart_session_key is not None:
2557             self.assertEqual(ticket_session_key.etype,
2558                              encpart_session_key.etype)
2559             self.assertEqual(ticket_session_key.key.contents,
2560                              encpart_session_key.key.contents)
2561         if encpart_session_key is not None:
2562             session_key = encpart_session_key
2563         else:
2564             session_key = ticket_session_key
2565         ticket_creds = KerberosTicketCreds(
2566             ticket,
2567             session_key,
2568             crealm=expected_crealm,
2569             cname=expected_cname,
2570             srealm=expected_srealm,
2571             sname=expected_sname,
2572             decryption_key=ticket_decryption_key,
2573             ticket_private=ticket_private,
2574             encpart_private=encpart_private)
2575
2576         if ticket_private is not None:
2577             pac_data = self.get_ticket_pac(ticket_creds, expect_pac=expect_pac)
2578             if expect_pac is True:
2579                 self.assertIsNotNone(pac_data)
2580             elif expect_pac is False:
2581                 self.assertIsNone(pac_data)
2582
2583             if pac_data is not None:
2584                 self.check_pac_buffers(pac_data, kdc_exchange_dict)
2585
2586         expect_ticket_checksum = kdc_exchange_dict['expect_ticket_checksum']
2587         if expect_ticket_checksum:
2588             self.assertIsNotNone(ticket_decryption_key)
2589
2590         if ticket_decryption_key is not None:
2591             service_ticket = (not self.is_tgs(expected_sname)
2592                               and rep_msg_type == KRB_TGS_REP)
2593             self.verify_ticket(ticket_creds, krbtgt_keys,
2594                                service_ticket=service_ticket,
2595                                expect_pac=expect_pac,
2596                                expect_ticket_checksum=expect_ticket_checksum
2597                                or self.tkt_sig_support)
2598
2599         kdc_exchange_dict['rep_ticket_creds'] = ticket_creds
2600
2601     def check_pac_buffers(self, pac_data, kdc_exchange_dict):
2602         pac = ndr_unpack(krb5pac.PAC_DATA, pac_data)
2603
2604         rep_msg_type = kdc_exchange_dict['rep_msg_type']
2605         armor_tgt = kdc_exchange_dict['armor_tgt']
2606
2607         expected_sname = kdc_exchange_dict['expected_sname']
2608         expect_claims = kdc_exchange_dict['expect_claims']
2609
2610         expected_types = [krb5pac.PAC_TYPE_LOGON_INFO,
2611                           krb5pac.PAC_TYPE_SRV_CHECKSUM,
2612                           krb5pac.PAC_TYPE_KDC_CHECKSUM,
2613                           krb5pac.PAC_TYPE_LOGON_NAME,
2614                           krb5pac.PAC_TYPE_UPN_DNS_INFO]
2615
2616         kdc_options = kdc_exchange_dict['kdc_options']
2617         pos = len(tuple(krb5_asn1.KDCOptions('cname-in-addl-tkt'))) - 1
2618         constrained_delegation = (pos < len(kdc_options)
2619                                   and kdc_options[pos] == '1')
2620         if constrained_delegation:
2621             expected_types.append(krb5pac.PAC_TYPE_CONSTRAINED_DELEGATION)
2622
2623         if self.kdc_fast_support:
2624             if expect_claims:
2625                 expected_types.append(krb5pac.PAC_TYPE_CLIENT_CLAIMS_INFO)
2626
2627             if (rep_msg_type == KRB_TGS_REP
2628                     and armor_tgt is not None):
2629                 expected_types.append(krb5pac.PAC_TYPE_DEVICE_INFO)
2630                 expected_types.append(krb5pac.PAC_TYPE_DEVICE_CLAIMS_INFO)
2631
2632         if not self.is_tgs(expected_sname) and rep_msg_type == KRB_TGS_REP:
2633             expected_types.append(krb5pac.PAC_TYPE_TICKET_CHECKSUM)
2634
2635         require_strict = {krb5pac.PAC_TYPE_CLIENT_CLAIMS_INFO}
2636         if not self.tkt_sig_support:
2637             require_strict.add(krb5pac.PAC_TYPE_TICKET_CHECKSUM)
2638
2639         expect_extra_pac_buffers = self.is_tgs(expected_sname)
2640
2641         expect_pac_attrs = kdc_exchange_dict['expect_pac_attrs']
2642
2643         if expect_pac_attrs:
2644             expect_pac_attrs_pac_request = kdc_exchange_dict[
2645                 'expect_pac_attrs_pac_request']
2646         else:
2647             expect_pac_attrs_pac_request = kdc_exchange_dict[
2648                 'pac_request']
2649
2650         if expect_pac_attrs is None:
2651             if self.expect_extra_pac_buffers:
2652                 expect_pac_attrs = expect_extra_pac_buffers
2653             else:
2654                 require_strict.add(krb5pac.PAC_TYPE_ATTRIBUTES_INFO)
2655         if expect_pac_attrs:
2656             expected_types.append(krb5pac.PAC_TYPE_ATTRIBUTES_INFO)
2657
2658         expect_requester_sid = kdc_exchange_dict['expect_requester_sid']
2659
2660         if expect_requester_sid is None:
2661             if self.expect_extra_pac_buffers:
2662                 expect_requester_sid = expect_extra_pac_buffers
2663             else:
2664                 require_strict.add(krb5pac.PAC_TYPE_REQUESTER_SID)
2665         if expect_requester_sid:
2666             expected_types.append(krb5pac.PAC_TYPE_REQUESTER_SID)
2667
2668         buffer_types = [pac_buffer.type
2669                         for pac_buffer in pac.buffers]
2670         self.assertSequenceElementsEqual(
2671             expected_types, buffer_types,
2672             require_ordered=False,
2673             require_strict=require_strict)
2674
2675         expected_account_name = kdc_exchange_dict['expected_account_name']
2676         expected_sid = kdc_exchange_dict['expected_sid']
2677
2678         expect_upn_dns_info_ex = kdc_exchange_dict['expect_upn_dns_info_ex']
2679         if expect_upn_dns_info_ex is None and (
2680                 expected_account_name is not None
2681                 or expected_sid is not None):
2682             expect_upn_dns_info_ex = True
2683
2684         for pac_buffer in pac.buffers:
2685             if pac_buffer.type == krb5pac.PAC_TYPE_CONSTRAINED_DELEGATION:
2686                 expected_proxy_target = kdc_exchange_dict[
2687                     'expected_proxy_target']
2688                 expected_transited_services = kdc_exchange_dict[
2689                     'expected_transited_services']
2690
2691                 delegation_info = pac_buffer.info.info
2692
2693                 self.assertEqual(expected_proxy_target,
2694                                  str(delegation_info.proxy_target))
2695
2696                 transited_services = list(map(
2697                     str, delegation_info.transited_services))
2698                 self.assertEqual(expected_transited_services,
2699                                  transited_services)
2700
2701             elif pac_buffer.type == krb5pac.PAC_TYPE_LOGON_NAME:
2702                 expected_cname = kdc_exchange_dict['expected_cname']
2703                 account_name = expected_cname['name-string'][0]
2704
2705                 self.assertEqual(account_name, pac_buffer.info.account_name)
2706
2707             elif pac_buffer.type == krb5pac.PAC_TYPE_LOGON_INFO:
2708                 logon_info = pac_buffer.info.info.info3.base
2709
2710                 if expected_account_name is not None:
2711                     self.assertEqual(expected_account_name,
2712                                      str(logon_info.account_name))
2713
2714                 if expected_sid is not None:
2715                     expected_rid = int(expected_sid.rsplit('-', 1)[1])
2716                     self.assertEqual(expected_rid, logon_info.rid)
2717
2718             elif pac_buffer.type == krb5pac.PAC_TYPE_UPN_DNS_INFO:
2719                 upn_dns_info = pac_buffer.info
2720                 upn_dns_info_ex = upn_dns_info.ex
2721
2722                 expected_realm = kdc_exchange_dict['expected_crealm']
2723                 self.assertEqual(expected_realm,
2724                                  upn_dns_info.dns_domain_name)
2725
2726                 expected_upn_name = kdc_exchange_dict['expected_upn_name']
2727                 if expected_upn_name is not None:
2728                     self.assertEqual(expected_upn_name,
2729                                      upn_dns_info.upn_name)
2730
2731                 if expect_upn_dns_info_ex:
2732                     self.assertIsNotNone(upn_dns_info_ex)
2733
2734                 if upn_dns_info_ex is not None:
2735                     if expected_account_name is not None:
2736                         self.assertEqual(expected_account_name,
2737                                          upn_dns_info_ex.samaccountname)
2738
2739                     if expected_sid is not None:
2740                         self.assertEqual(expected_sid,
2741                                          str(upn_dns_info_ex.objectsid))
2742
2743             elif (pac_buffer.type == krb5pac.PAC_TYPE_ATTRIBUTES_INFO
2744                       and expect_pac_attrs):
2745                 attr_info = pac_buffer.info
2746
2747                 self.assertEqual(2, attr_info.flags_length)
2748
2749                 flags = attr_info.flags
2750
2751                 requested_pac = bool(flags & 1)
2752                 given_pac = bool(flags & 2)
2753
2754                 self.assertEqual(expect_pac_attrs_pac_request is True,
2755                                  requested_pac)
2756                 self.assertEqual(expect_pac_attrs_pac_request is None,
2757                                  given_pac)
2758
2759             elif (pac_buffer.type == krb5pac.PAC_TYPE_REQUESTER_SID
2760                       and expect_requester_sid):
2761                 requester_sid = pac_buffer.info.sid
2762
2763                 if expected_sid is not None:
2764                     self.assertEqual(expected_sid, str(requester_sid))
2765
2766     def generic_check_kdc_error(self,
2767                                 kdc_exchange_dict,
2768                                 callback_dict,
2769                                 rep,
2770                                 inner=False):
2771
2772         rep_msg_type = kdc_exchange_dict['rep_msg_type']
2773
2774         expected_anon = kdc_exchange_dict['expected_anon']
2775         expected_srealm = kdc_exchange_dict['expected_srealm']
2776         expected_sname = kdc_exchange_dict['expected_sname']
2777         expected_error_mode = kdc_exchange_dict['expected_error_mode']
2778
2779         sent_fast = self.sent_fast(kdc_exchange_dict)
2780
2781         fast_armor_type = kdc_exchange_dict['fast_armor_type']
2782
2783         self.assertElementEqual(rep, 'pvno', 5)
2784         self.assertElementEqual(rep, 'msg-type', KRB_ERROR)
2785         error_code = self.getElementValue(rep, 'error-code')
2786         self.assertIn(error_code, expected_error_mode)
2787         if self.strict_checking:
2788             self.assertElementMissing(rep, 'ctime')
2789             self.assertElementMissing(rep, 'cusec')
2790         self.assertElementPresent(rep, 'stime')
2791         self.assertElementPresent(rep, 'susec')
2792         # error-code checked above
2793         if self.strict_checking:
2794             self.assertElementMissing(rep, 'crealm')
2795             if expected_anon and not inner:
2796                 expected_cname = self.PrincipalName_create(
2797                     name_type=NT_WELLKNOWN,
2798                     names=['WELLKNOWN', 'ANONYMOUS'])
2799                 self.assertElementEqualPrincipal(rep, 'cname', expected_cname)
2800             else:
2801                 self.assertElementMissing(rep, 'cname')
2802             self.assertElementEqualUTF8(rep, 'realm', expected_srealm)
2803             self.assertElementEqualPrincipal(rep, 'sname', expected_sname)
2804             self.assertElementMissing(rep, 'e-text')
2805         expected_status = kdc_exchange_dict['expected_status']
2806         expect_edata = kdc_exchange_dict['expect_edata']
2807         if expect_edata is None:
2808             expect_edata = (error_code != KDC_ERR_UNKNOWN_CRITICAL_FAST_OPTIONS
2809                             and (not sent_fast or fast_armor_type is None
2810                                  or fast_armor_type == FX_FAST_ARMOR_AP_REQUEST)
2811                             and not inner)
2812         if not expect_edata:
2813             self.assertIsNone(expected_status)
2814             self.assertElementMissing(rep, 'e-data')
2815             return rep
2816         edata = self.getElementValue(rep, 'e-data')
2817         if self.strict_checking:
2818             self.assertIsNotNone(edata)
2819         if edata is not None:
2820             if rep_msg_type == KRB_TGS_REP and not sent_fast:
2821                 error_data = self.der_decode(
2822                     edata,
2823                     asn1Spec=krb5_asn1.KERB_ERROR_DATA())
2824                 self.assertEqual(KERB_ERR_TYPE_EXTENDED,
2825                                  error_data['data-type'])
2826
2827                 extended_error = error_data['data-value']
2828
2829                 self.assertEqual(12, len(extended_error))
2830
2831                 status = int.from_bytes(extended_error[:4], 'little')
2832                 flags = int.from_bytes(extended_error[8:], 'little')
2833
2834                 self.assertEqual(expected_status, status)
2835
2836                 self.assertEqual(3, flags)
2837             else:
2838                 self.assertIsNone(expected_status)
2839
2840                 rep_padata = self.der_decode(edata,
2841                                              asn1Spec=krb5_asn1.METHOD_DATA())
2842                 self.assertGreater(len(rep_padata), 0)
2843
2844                 if sent_fast:
2845                     self.assertEqual(1, len(rep_padata))
2846                     rep_pa_dict = self.get_pa_dict(rep_padata)
2847                     self.assertIn(PADATA_FX_FAST, rep_pa_dict)
2848
2849                     armor_key = kdc_exchange_dict['armor_key']
2850                     self.assertIsNotNone(armor_key)
2851                     fast_response = self.check_fx_fast_data(
2852                         kdc_exchange_dict,
2853                         rep_pa_dict[PADATA_FX_FAST],
2854                         armor_key,
2855                         expect_strengthen_key=False)
2856
2857                     rep_padata = fast_response['padata']
2858
2859                 etype_info2 = self.check_rep_padata(kdc_exchange_dict,
2860                                                     callback_dict,
2861                                                     rep_padata,
2862                                                     error_code)
2863
2864                 kdc_exchange_dict['preauth_etype_info2'] = etype_info2
2865
2866         return rep
2867
2868     def check_rep_padata(self,
2869                          kdc_exchange_dict,
2870                          callback_dict,
2871                          rep_padata,
2872                          error_code):
2873         rep_msg_type = kdc_exchange_dict['rep_msg_type']
2874
2875         req_body = kdc_exchange_dict['req_body']
2876         proposed_etypes = req_body['etype']
2877         client_as_etypes = kdc_exchange_dict.get('client_as_etypes', [])
2878
2879         sent_fast = self.sent_fast(kdc_exchange_dict)
2880         sent_enc_challenge = self.sent_enc_challenge(kdc_exchange_dict)
2881
2882         if rep_msg_type == KRB_TGS_REP:
2883             self.assertTrue(sent_fast)
2884
2885         expect_etype_info2 = ()
2886         expect_etype_info = False
2887         expected_aes_type = 0
2888         expected_rc4_type = 0
2889         if kcrypto.Enctype.RC4 in proposed_etypes:
2890             expect_etype_info = True
2891         for etype in proposed_etypes:
2892             if etype not in client_as_etypes:
2893                 continue
2894             if etype in (kcrypto.Enctype.AES256, kcrypto.Enctype.AES128):
2895                 expect_etype_info = False
2896                 if etype > expected_aes_type:
2897                     expected_aes_type = etype
2898             if etype in (kcrypto.Enctype.RC4,) and error_code != 0:
2899                 if etype > expected_rc4_type:
2900                     expected_rc4_type = etype
2901
2902         if expected_aes_type != 0:
2903             expect_etype_info2 += (expected_aes_type,)
2904         if expected_rc4_type != 0:
2905             expect_etype_info2 += (expected_rc4_type,)
2906
2907         expected_patypes = ()
2908         if sent_fast and error_code != 0:
2909             expected_patypes += (PADATA_FX_ERROR,)
2910             expected_patypes += (PADATA_FX_COOKIE,)
2911
2912         if rep_msg_type == KRB_TGS_REP:
2913             sent_pac_options = self.get_sent_pac_options(kdc_exchange_dict)
2914             if ('1' in sent_pac_options
2915                     and error_code not in (0, KDC_ERR_GENERIC)):
2916                 expected_patypes += (PADATA_PAC_OPTIONS,)
2917         elif error_code != KDC_ERR_GENERIC:
2918             if expect_etype_info:
2919                 self.assertGreater(len(expect_etype_info2), 0)
2920                 expected_patypes += (PADATA_ETYPE_INFO,)
2921             if len(expect_etype_info2) != 0:
2922                 expected_patypes += (PADATA_ETYPE_INFO2,)
2923
2924             if error_code != KDC_ERR_PREAUTH_FAILED:
2925                 if sent_fast:
2926                     expected_patypes += (PADATA_ENCRYPTED_CHALLENGE,)
2927                 else:
2928                     expected_patypes += (PADATA_ENC_TIMESTAMP,)
2929
2930                 if not sent_enc_challenge:
2931                     expected_patypes += (PADATA_PK_AS_REQ,)
2932                     expected_patypes += (PADATA_PK_AS_REP_19,)
2933
2934             if (self.kdc_fast_support
2935                     and not sent_fast
2936                     and not sent_enc_challenge):
2937                 expected_patypes += (PADATA_FX_FAST,)
2938                 expected_patypes += (PADATA_FX_COOKIE,)
2939
2940         got_patypes = tuple(pa['padata-type'] for pa in rep_padata)
2941         self.assertSequenceElementsEqual(expected_patypes, got_patypes,
2942                                          require_strict={PADATA_FX_COOKIE,
2943                                                          PADATA_FX_FAST,
2944                                                          PADATA_PAC_OPTIONS,
2945                                                          PADATA_PK_AS_REP_19,
2946                                                          PADATA_PK_AS_REQ})
2947
2948         if not expected_patypes:
2949             return None
2950
2951         pa_dict = self.get_pa_dict(rep_padata)
2952
2953         enc_timestamp = pa_dict.get(PADATA_ENC_TIMESTAMP)
2954         if enc_timestamp is not None:
2955             self.assertEqual(len(enc_timestamp), 0)
2956
2957         pk_as_req = pa_dict.get(PADATA_PK_AS_REQ)
2958         if pk_as_req is not None:
2959             self.assertEqual(len(pk_as_req), 0)
2960
2961         pk_as_rep19 = pa_dict.get(PADATA_PK_AS_REP_19)
2962         if pk_as_rep19 is not None:
2963             self.assertEqual(len(pk_as_rep19), 0)
2964
2965         fx_fast = pa_dict.get(PADATA_FX_FAST)
2966         if fx_fast is not None:
2967             self.assertEqual(len(fx_fast), 0)
2968
2969         fast_cookie = pa_dict.get(PADATA_FX_COOKIE)
2970         if fast_cookie is not None:
2971             kdc_exchange_dict['fast_cookie'] = fast_cookie
2972
2973         fast_error = pa_dict.get(PADATA_FX_ERROR)
2974         if fast_error is not None:
2975             fast_error = self.der_decode(fast_error,
2976                                          asn1Spec=krb5_asn1.KRB_ERROR())
2977             self.generic_check_kdc_error(kdc_exchange_dict,
2978                                          callback_dict,
2979                                          fast_error,
2980                                          inner=True)
2981
2982         pac_options = pa_dict.get(PADATA_PAC_OPTIONS)
2983         if pac_options is not None:
2984             pac_options = self.der_decode(
2985                 pac_options,
2986                 asn1Spec=krb5_asn1.PA_PAC_OPTIONS())
2987             self.assertElementEqual(pac_options, 'options', sent_pac_options)
2988
2989         enc_challenge = pa_dict.get(PADATA_ENCRYPTED_CHALLENGE)
2990         if enc_challenge is not None:
2991             if not sent_enc_challenge:
2992                 self.assertEqual(len(enc_challenge), 0)
2993             else:
2994                 armor_key = kdc_exchange_dict['armor_key']
2995                 self.assertIsNotNone(armor_key)
2996
2997                 preauth_key, _ = self.get_preauth_key(kdc_exchange_dict)
2998
2999                 kdc_challenge_key = self.generate_kdc_challenge_key(
3000                     armor_key, preauth_key)
3001
3002                 # Ensure that the encrypted challenge FAST factor is supported
3003                 # (RFC6113 5.4.6).
3004                 if self.strict_checking:
3005                     self.assertNotEqual(len(enc_challenge), 0)
3006                 if len(enc_challenge) != 0:
3007                     encrypted_challenge = self.der_decode(
3008                         enc_challenge,
3009                         asn1Spec=krb5_asn1.EncryptedData())
3010                     self.assertEqual(encrypted_challenge['etype'],
3011                                      kdc_challenge_key.etype)
3012
3013                     challenge = kdc_challenge_key.decrypt(
3014                         KU_ENC_CHALLENGE_KDC,
3015                         encrypted_challenge['cipher'])
3016                     challenge = self.der_decode(
3017                         challenge,
3018                         asn1Spec=krb5_asn1.PA_ENC_TS_ENC())
3019
3020                     # Retrieve the returned timestamp.
3021                     rep_patime = challenge['patimestamp']
3022                     self.assertIn('pausec', challenge)
3023
3024                     # Ensure the returned time is within five minutes of the
3025                     # current time.
3026                     rep_time = self.get_EpochFromKerberosTime(rep_patime)
3027                     current_time = time.time()
3028
3029                     self.assertLess(current_time - 300, rep_time)
3030                     self.assertLess(rep_time, current_time + 300)
3031
3032         etype_info2 = pa_dict.get(PADATA_ETYPE_INFO2)
3033         if etype_info2 is not None:
3034             etype_info2 = self.der_decode(etype_info2,
3035                                           asn1Spec=krb5_asn1.ETYPE_INFO2())
3036             self.assertGreaterEqual(len(etype_info2), 1)
3037             if self.strict_checking:
3038                 self.assertEqual(len(etype_info2), len(expect_etype_info2))
3039             for i in range(0, len(etype_info2)):
3040                 e = self.getElementValue(etype_info2[i], 'etype')
3041                 if self.strict_checking:
3042                     self.assertEqual(e, expect_etype_info2[i])
3043                 salt = self.getElementValue(etype_info2[i], 'salt')
3044                 if e == kcrypto.Enctype.RC4:
3045                     if self.strict_checking:
3046                         self.assertIsNone(salt)
3047                 else:
3048                     self.assertIsNotNone(salt)
3049                     expected_salt = kdc_exchange_dict['expected_salt']
3050                     if expected_salt is not None:
3051                         self.assertEqual(salt, expected_salt)
3052                 s2kparams = self.getElementValue(etype_info2[i], 's2kparams')
3053                 if self.strict_checking:
3054                     self.assertIsNone(s2kparams)
3055
3056         etype_info = pa_dict.get(PADATA_ETYPE_INFO)
3057         if etype_info is not None:
3058             etype_info = self.der_decode(etype_info,
3059                                          asn1Spec=krb5_asn1.ETYPE_INFO())
3060             self.assertEqual(len(etype_info), 1)
3061             e = self.getElementValue(etype_info[0], 'etype')
3062             self.assertEqual(e, kcrypto.Enctype.RC4)
3063             self.assertEqual(e, expect_etype_info2[0])
3064             salt = self.getElementValue(etype_info[0], 'salt')
3065             if self.strict_checking:
3066                 self.assertIsNotNone(salt)
3067                 self.assertEqual(len(salt), 0)
3068
3069         return etype_info2
3070
3071     def generate_simple_fast(self,
3072                              kdc_exchange_dict,
3073                              _callback_dict,
3074                              req_body,
3075                              fast_padata,
3076                              fast_armor,
3077                              checksum,
3078                              fast_options=''):
3079         armor_key = kdc_exchange_dict['armor_key']
3080
3081         fast_req = self.KRB_FAST_REQ_create(fast_options,
3082                                             fast_padata,
3083                                             req_body)
3084         fast_req = self.der_encode(fast_req,
3085                                    asn1Spec=krb5_asn1.KrbFastReq())
3086         fast_req = self.EncryptedData_create(armor_key,
3087                                              KU_FAST_ENC,
3088                                              fast_req)
3089
3090         fast_armored_req = self.KRB_FAST_ARMORED_REQ_create(fast_armor,
3091                                                             checksum,
3092                                                             fast_req)
3093
3094         fx_fast_request = self.PA_FX_FAST_REQUEST_create(fast_armored_req)
3095         fx_fast_request = self.der_encode(
3096             fx_fast_request,
3097             asn1Spec=krb5_asn1.PA_FX_FAST_REQUEST())
3098
3099         fast_padata = self.PA_DATA_create(PADATA_FX_FAST,
3100                                           fx_fast_request)
3101
3102         return fast_padata
3103
3104     def generate_ap_req(self,
3105                         kdc_exchange_dict,
3106                         _callback_dict,
3107                         req_body,
3108                         armor):
3109         if armor:
3110             tgt = kdc_exchange_dict['armor_tgt']
3111             authenticator_subkey = kdc_exchange_dict['armor_subkey']
3112
3113             req_body_checksum = None
3114         else:
3115             tgt = kdc_exchange_dict['tgt']
3116             authenticator_subkey = kdc_exchange_dict['authenticator_subkey']
3117             body_checksum_type = kdc_exchange_dict['body_checksum_type']
3118
3119             req_body_blob = self.der_encode(req_body,
3120                                             asn1Spec=krb5_asn1.KDC_REQ_BODY())
3121
3122             req_body_checksum = self.Checksum_create(tgt.session_key,
3123                                                      KU_TGS_REQ_AUTH_CKSUM,
3124                                                      req_body_blob,
3125                                                      ctype=body_checksum_type)
3126
3127         auth_data = kdc_exchange_dict['auth_data']
3128
3129         subkey_obj = None
3130         if authenticator_subkey is not None:
3131             subkey_obj = authenticator_subkey.export_obj()
3132         seq_number = random.randint(0, 0xfffffffe)
3133         (ctime, cusec) = self.get_KerberosTimeWithUsec()
3134         authenticator_obj = self.Authenticator_create(
3135             crealm=tgt.crealm,
3136             cname=tgt.cname,
3137             cksum=req_body_checksum,
3138             cusec=cusec,
3139             ctime=ctime,
3140             subkey=subkey_obj,
3141             seq_number=seq_number,
3142             authorization_data=auth_data)
3143         authenticator_blob = self.der_encode(
3144             authenticator_obj,
3145             asn1Spec=krb5_asn1.Authenticator())
3146
3147         usage = KU_AP_REQ_AUTH if armor else KU_TGS_REQ_AUTH
3148         authenticator = self.EncryptedData_create(tgt.session_key,
3149                                                   usage,
3150                                                   authenticator_blob)
3151
3152         ap_options = krb5_asn1.APOptions('0')
3153         ap_req_obj = self.AP_REQ_create(ap_options=str(ap_options),
3154                                         ticket=tgt.ticket,
3155                                         authenticator=authenticator)
3156         ap_req = self.der_encode(ap_req_obj, asn1Spec=krb5_asn1.AP_REQ())
3157
3158         return ap_req
3159
3160     def generate_simple_tgs_padata(self,
3161                                    kdc_exchange_dict,
3162                                    callback_dict,
3163                                    req_body):
3164         ap_req = self.generate_ap_req(kdc_exchange_dict,
3165                                       callback_dict,
3166                                       req_body,
3167                                       armor=False)
3168         pa_tgs_req = self.PA_DATA_create(PADATA_KDC_REQ, ap_req)
3169         padata = [pa_tgs_req]
3170
3171         return padata, req_body
3172
3173     def get_preauth_key(self, kdc_exchange_dict):
3174         msg_type = kdc_exchange_dict['rep_msg_type']
3175
3176         if msg_type == KRB_AS_REP:
3177             key = kdc_exchange_dict['preauth_key']
3178             usage = KU_AS_REP_ENC_PART
3179         else:  # KRB_TGS_REP
3180             authenticator_subkey = kdc_exchange_dict['authenticator_subkey']
3181             if authenticator_subkey is not None:
3182                 key = authenticator_subkey
3183                 usage = KU_TGS_REP_ENC_PART_SUB_KEY
3184             else:
3185                 tgt = kdc_exchange_dict['tgt']
3186                 key = tgt.session_key
3187                 usage = KU_TGS_REP_ENC_PART_SESSION
3188
3189         self.assertIsNotNone(key)
3190
3191         return key, usage
3192
3193     def generate_armor_key(self, subkey, session_key):
3194         armor_key = kcrypto.cf2(subkey.key,
3195                                 session_key.key,
3196                                 b'subkeyarmor',
3197                                 b'ticketarmor')
3198         armor_key = Krb5EncryptionKey(armor_key, None)
3199
3200         return armor_key
3201
3202     def generate_strengthen_reply_key(self, strengthen_key, reply_key):
3203         strengthen_reply_key = kcrypto.cf2(strengthen_key.key,
3204                                            reply_key.key,
3205                                            b'strengthenkey',
3206                                            b'replykey')
3207         strengthen_reply_key = Krb5EncryptionKey(strengthen_reply_key,
3208                                                  reply_key.kvno)
3209
3210         return strengthen_reply_key
3211
3212     def generate_client_challenge_key(self, armor_key, longterm_key):
3213         client_challenge_key = kcrypto.cf2(armor_key.key,
3214                                            longterm_key.key,
3215                                            b'clientchallengearmor',
3216                                            b'challengelongterm')
3217         client_challenge_key = Krb5EncryptionKey(client_challenge_key, None)
3218
3219         return client_challenge_key
3220
3221     def generate_kdc_challenge_key(self, armor_key, longterm_key):
3222         kdc_challenge_key = kcrypto.cf2(armor_key.key,
3223                                         longterm_key.key,
3224                                         b'kdcchallengearmor',
3225                                         b'challengelongterm')
3226         kdc_challenge_key = Krb5EncryptionKey(kdc_challenge_key, None)
3227
3228         return kdc_challenge_key
3229
3230     def verify_ticket_checksum(self, ticket, expected_checksum, armor_key):
3231         expected_type = expected_checksum['cksumtype']
3232         self.assertEqual(armor_key.ctype, expected_type)
3233
3234         ticket_blob = self.der_encode(ticket,
3235                                       asn1Spec=krb5_asn1.Ticket())
3236         checksum = self.Checksum_create(armor_key,
3237                                         KU_FAST_FINISHED,
3238                                         ticket_blob)
3239         self.assertEqual(expected_checksum, checksum)
3240
3241     def verify_ticket(self, ticket, krbtgt_keys, service_ticket,
3242                       expect_pac=True,
3243                       expect_ticket_checksum=True):
3244         # Decrypt the ticket.
3245
3246         key = ticket.decryption_key
3247         enc_part = ticket.ticket['enc-part']
3248
3249         self.assertElementEqual(enc_part, 'etype', key.etype)
3250         self.assertElementKVNO(enc_part, 'kvno', key.kvno)
3251
3252         enc_part = key.decrypt(KU_TICKET, enc_part['cipher'])
3253         enc_part = self.der_decode(
3254             enc_part, asn1Spec=krb5_asn1.EncTicketPart())
3255
3256         # Fetch the authorization data from the ticket.
3257         auth_data = enc_part.get('authorization-data')
3258         if expect_pac:
3259             self.assertIsNotNone(auth_data)
3260         elif auth_data is None:
3261             return
3262
3263         # Get a copy of the authdata with an empty PAC, and the existing PAC
3264         # (if present).
3265         empty_pac = self.get_empty_pac()
3266         auth_data, pac_data = self.replace_pac(auth_data,
3267                                                empty_pac,
3268                                                expect_pac=expect_pac)
3269         if not expect_pac:
3270             return
3271
3272         # Unpack the PAC as both PAC_DATA and PAC_DATA_RAW types. We use the
3273         # raw type to create a new PAC with zeroed signatures for
3274         # verification. This is because on Windows, the resource_groups field
3275         # is added to PAC_LOGON_INFO after the info3 field has been created,
3276         # which results in a different ordering of pointer values than Samba
3277         # (see commit 0e201ecdc53). Using the raw type avoids changing
3278         # PAC_LOGON_INFO, so verification against Windows can work. We still
3279         # need the PAC_DATA type to retrieve the actual checksums, because the
3280         # signatures in the raw type may contain padding bytes.
3281         pac = ndr_unpack(krb5pac.PAC_DATA,
3282                          pac_data)
3283         raw_pac = ndr_unpack(krb5pac.PAC_DATA_RAW,
3284                              pac_data)
3285
3286         checksums = {}
3287
3288         for pac_buffer, raw_pac_buffer in zip(pac.buffers, raw_pac.buffers):
3289             buffer_type = pac_buffer.type
3290             if buffer_type in self.pac_checksum_types:
3291                 self.assertNotIn(buffer_type, checksums,
3292                                  f'Duplicate checksum type {buffer_type}')
3293
3294                 # Fetch the checksum and the checksum type from the PAC buffer.
3295                 checksum = pac_buffer.info.signature
3296                 ctype = pac_buffer.info.type
3297                 if ctype & 1 << 31:
3298                     ctype |= -1 << 31
3299
3300                 checksums[buffer_type] = checksum, ctype
3301
3302                 if buffer_type != krb5pac.PAC_TYPE_TICKET_CHECKSUM:
3303                     # Zero the checksum field so that we can later verify the
3304                     # checksums. The ticket checksum field is not zeroed.
3305
3306                     signature = ndr_unpack(
3307                         krb5pac.PAC_SIGNATURE_DATA,
3308                         raw_pac_buffer.info.remaining)
3309                     signature.signature = bytes(len(checksum))
3310                     raw_pac_buffer.info.remaining = ndr_pack(
3311                         signature)
3312
3313         # Re-encode the PAC.
3314         pac_data = ndr_pack(raw_pac)
3315
3316         # Verify the signatures.
3317
3318         server_checksum, server_ctype = checksums[
3319             krb5pac.PAC_TYPE_SRV_CHECKSUM]
3320         key.verify_checksum(KU_NON_KERB_CKSUM_SALT,
3321                             pac_data,
3322                             server_ctype,
3323                             server_checksum)
3324
3325         kdc_checksum, kdc_ctype = checksums[
3326             krb5pac.PAC_TYPE_KDC_CHECKSUM]
3327
3328         if isinstance(krbtgt_keys, collections.abc.Container):
3329             if self.strict_checking:
3330                 krbtgt_key = krbtgt_keys[0]
3331             else:
3332                 krbtgt_key = next(key for key in krbtgt_keys
3333                                   if key.ctype == kdc_ctype)
3334         else:
3335             krbtgt_key = krbtgt_keys
3336
3337         krbtgt_key.verify_rodc_checksum(KU_NON_KERB_CKSUM_SALT,
3338                                         server_checksum,
3339                                         kdc_ctype,
3340                                         kdc_checksum)
3341
3342         if not service_ticket:
3343             self.assertNotIn(krb5pac.PAC_TYPE_TICKET_CHECKSUM, checksums)
3344         else:
3345             ticket_checksum, ticket_ctype = checksums.get(
3346                 krb5pac.PAC_TYPE_TICKET_CHECKSUM,
3347                 (None, None))
3348             if expect_ticket_checksum:
3349                 self.assertIsNotNone(ticket_checksum)
3350             elif expect_ticket_checksum is False:
3351                 self.assertIsNone(ticket_checksum)
3352             if ticket_checksum is not None:
3353                 enc_part['authorization-data'] = auth_data
3354                 enc_part = self.der_encode(enc_part,
3355                                            asn1Spec=krb5_asn1.EncTicketPart())
3356
3357                 krbtgt_key.verify_rodc_checksum(KU_NON_KERB_CKSUM_SALT,
3358                                                 enc_part,
3359                                                 ticket_ctype,
3360                                                 ticket_checksum)
3361
3362     def modified_ticket(self,
3363                         ticket, *,
3364                         new_ticket_key=None,
3365                         modify_fn=None,
3366                         modify_pac_fn=None,
3367                         exclude_pac=False,
3368                         allow_empty_authdata=False,
3369                         update_pac_checksums=True,
3370                         checksum_keys=None,
3371                         include_checksums=None):
3372         if checksum_keys is None:
3373             # A dict containing a key for each checksum type to be created in
3374             # the PAC.
3375             checksum_keys = {}
3376
3377         if include_checksums is None:
3378             # A dict containing a value for each checksum type; True if the
3379             # checksum type is to be included in the PAC, False if it is to be
3380             # excluded, or None/not present if the checksum is to be included
3381             # based on its presence in the original PAC.
3382             include_checksums = {}
3383
3384         # Check that the values passed in by the caller make sense.
3385
3386         self.assertLessEqual(checksum_keys.keys(), self.pac_checksum_types)
3387         self.assertLessEqual(include_checksums.keys(), self.pac_checksum_types)
3388
3389         if exclude_pac:
3390             self.assertIsNone(modify_pac_fn)
3391
3392             update_pac_checksums = False
3393
3394         if not update_pac_checksums:
3395             self.assertFalse(checksum_keys)
3396             self.assertFalse(include_checksums)
3397
3398         expect_pac = modify_pac_fn is not None
3399
3400         key = ticket.decryption_key
3401
3402         if new_ticket_key is None:
3403             # Use the same key to re-encrypt the ticket.
3404             new_ticket_key = key
3405
3406         if krb5pac.PAC_TYPE_SRV_CHECKSUM not in checksum_keys:
3407             # If the server signature key is not present, fall back to the key
3408             # used to encrypt the ticket.
3409             checksum_keys[krb5pac.PAC_TYPE_SRV_CHECKSUM] = new_ticket_key
3410
3411         if krb5pac.PAC_TYPE_TICKET_CHECKSUM not in checksum_keys:
3412             # If the ticket signature key is not present, fall back to the key
3413             # used for the KDC signature.
3414             kdc_checksum_key = checksum_keys.get(krb5pac.PAC_TYPE_KDC_CHECKSUM)
3415             if kdc_checksum_key is not None:
3416                 checksum_keys[krb5pac.PAC_TYPE_TICKET_CHECKSUM] = (
3417                     kdc_checksum_key)
3418
3419         # Decrypt the ticket.
3420
3421         enc_part = ticket.ticket['enc-part']
3422
3423         self.assertElementEqual(enc_part, 'etype', key.etype)
3424         self.assertElementKVNO(enc_part, 'kvno', key.kvno)
3425
3426         enc_part = key.decrypt(KU_TICKET, enc_part['cipher'])
3427         enc_part = self.der_decode(
3428             enc_part, asn1Spec=krb5_asn1.EncTicketPart())
3429
3430         # Modify the ticket here.
3431         if modify_fn is not None:
3432             enc_part = modify_fn(enc_part)
3433
3434         auth_data = enc_part.get('authorization-data')
3435         if expect_pac:
3436             self.assertIsNotNone(auth_data)
3437         if auth_data is not None:
3438             new_pac = None
3439             if not exclude_pac:
3440                 # Get a copy of the authdata with an empty PAC, and the
3441                 # existing PAC (if present).
3442                 empty_pac = self.get_empty_pac()
3443                 empty_pac_auth_data, pac_data = self.replace_pac(
3444                     auth_data,
3445                     empty_pac,
3446                     expect_pac=expect_pac)
3447
3448                 if pac_data is not None:
3449                     pac = ndr_unpack(krb5pac.PAC_DATA, pac_data)
3450
3451                     # Modify the PAC here.
3452                     if modify_pac_fn is not None:
3453                         pac = modify_pac_fn(pac)
3454
3455                     if update_pac_checksums:
3456                         # Get the enc-part with an empty PAC, which is needed
3457                         # to create a ticket signature.
3458                         enc_part_to_sign = enc_part.copy()
3459                         enc_part_to_sign['authorization-data'] = (
3460                             empty_pac_auth_data)
3461                         enc_part_to_sign = self.der_encode(
3462                             enc_part_to_sign,
3463                             asn1Spec=krb5_asn1.EncTicketPart())
3464
3465                         self.update_pac_checksums(pac,
3466                                                   checksum_keys,
3467                                                   include_checksums,
3468                                                   enc_part_to_sign)
3469
3470                     # Re-encode the PAC.
3471                     pac_data = ndr_pack(pac)
3472                     new_pac = self.AuthorizationData_create(AD_WIN2K_PAC,
3473                                                             pac_data)
3474
3475             # Replace the PAC in the authorization data and re-add it to the
3476             # ticket enc-part.
3477             auth_data, _ = self.replace_pac(
3478                 auth_data, new_pac,
3479                 expect_pac=expect_pac,
3480                 allow_empty_authdata=allow_empty_authdata)
3481             enc_part['authorization-data'] = auth_data
3482
3483         # Re-encrypt the ticket enc-part with the new key.
3484         enc_part_new = self.der_encode(enc_part,
3485                                        asn1Spec=krb5_asn1.EncTicketPart())
3486         enc_part_new = self.EncryptedData_create(new_ticket_key,
3487                                                  KU_TICKET,
3488                                                  enc_part_new)
3489
3490         # Create a copy of the ticket with the new enc-part.
3491         new_ticket = ticket.ticket.copy()
3492         new_ticket['enc-part'] = enc_part_new
3493
3494         new_ticket_creds = KerberosTicketCreds(
3495             new_ticket,
3496             session_key=ticket.session_key,
3497             crealm=ticket.crealm,
3498             cname=ticket.cname,
3499             srealm=ticket.srealm,
3500             sname=ticket.sname,
3501             decryption_key=new_ticket_key,
3502             ticket_private=enc_part,
3503             encpart_private=ticket.encpart_private)
3504
3505         return new_ticket_creds
3506
3507     def update_pac_checksums(self,
3508                              pac,
3509                              checksum_keys,
3510                              include_checksums,
3511                              enc_part=None):
3512         pac_buffers = pac.buffers
3513         checksum_buffers = {}
3514
3515         # Find the relevant PAC checksum buffers.
3516         for pac_buffer in pac_buffers:
3517             buffer_type = pac_buffer.type
3518             if buffer_type in self.pac_checksum_types:
3519                 self.assertNotIn(buffer_type, checksum_buffers,
3520                                  f'Duplicate checksum type {buffer_type}')
3521
3522                 checksum_buffers[buffer_type] = pac_buffer
3523
3524         # Create any additional buffers that were requested but not
3525         # present. Conversely, remove any buffers that were requested to be
3526         # removed.
3527         for buffer_type in self.pac_checksum_types:
3528             if buffer_type in checksum_buffers:
3529                 if include_checksums.get(buffer_type) is False:
3530                     checksum_buffer = checksum_buffers.pop(buffer_type)
3531
3532                     pac.num_buffers -= 1
3533                     pac_buffers.remove(checksum_buffer)
3534
3535             elif include_checksums.get(buffer_type) is True:
3536                 info = krb5pac.PAC_SIGNATURE_DATA()
3537
3538                 checksum_buffer = krb5pac.PAC_BUFFER()
3539                 checksum_buffer.type = buffer_type
3540                 checksum_buffer.info = info
3541
3542                 pac_buffers.append(checksum_buffer)
3543                 pac.num_buffers += 1
3544
3545                 checksum_buffers[buffer_type] = checksum_buffer
3546
3547         # Fill the relevant checksum buffers.
3548         for buffer_type, checksum_buffer in checksum_buffers.items():
3549             checksum_key = checksum_keys[buffer_type]
3550             ctype = checksum_key.ctype & ((1 << 32) - 1)
3551
3552             if buffer_type == krb5pac.PAC_TYPE_TICKET_CHECKSUM:
3553                 self.assertIsNotNone(enc_part)
3554
3555                 signature = checksum_key.make_rodc_checksum(
3556                     KU_NON_KERB_CKSUM_SALT,
3557                     enc_part)
3558
3559             elif buffer_type == krb5pac.PAC_TYPE_SRV_CHECKSUM:
3560                 signature = checksum_key.make_zeroed_checksum()
3561
3562             else:
3563                 signature = checksum_key.make_rodc_zeroed_checksum()
3564
3565             checksum_buffer.info.signature = signature
3566             checksum_buffer.info.type = ctype
3567
3568         # Add the new checksum buffers to the PAC.
3569         pac.buffers = pac_buffers
3570
3571         # Calculate the server and KDC checksums and insert them into the PAC.
3572
3573         server_checksum_buffer = checksum_buffers.get(
3574             krb5pac.PAC_TYPE_SRV_CHECKSUM)
3575         if server_checksum_buffer is not None:
3576             server_checksum_key = checksum_keys[krb5pac.PAC_TYPE_SRV_CHECKSUM]
3577
3578             pac_data = ndr_pack(pac)
3579             server_checksum = server_checksum_key.make_checksum(
3580                 KU_NON_KERB_CKSUM_SALT,
3581                 pac_data)
3582
3583             server_checksum_buffer.info.signature = server_checksum
3584
3585         kdc_checksum_buffer = checksum_buffers.get(
3586             krb5pac.PAC_TYPE_KDC_CHECKSUM)
3587         if kdc_checksum_buffer is not None:
3588             if server_checksum_buffer is None:
3589                 # There's no server signature to make the checksum over, so
3590                 # just make the checksum over an empty bytes object.
3591                 server_checksum = bytes()
3592
3593             kdc_checksum_key = checksum_keys[krb5pac.PAC_TYPE_KDC_CHECKSUM]
3594
3595             kdc_checksum = kdc_checksum_key.make_rodc_checksum(
3596                 KU_NON_KERB_CKSUM_SALT,
3597                 server_checksum)
3598
3599             kdc_checksum_buffer.info.signature = kdc_checksum
3600
3601     def replace_pac(self, auth_data, new_pac, expect_pac=True,
3602                     allow_empty_authdata=False):
3603         if new_pac is not None:
3604             self.assertElementEqual(new_pac, 'ad-type', AD_WIN2K_PAC)
3605             self.assertElementPresent(new_pac, 'ad-data')
3606
3607         new_auth_data = []
3608
3609         ad_relevant = None
3610         old_pac = None
3611
3612         for authdata_elem in auth_data:
3613             if authdata_elem['ad-type'] == AD_IF_RELEVANT:
3614                 ad_relevant = self.der_decode(
3615                     authdata_elem['ad-data'],
3616                     asn1Spec=krb5_asn1.AD_IF_RELEVANT())
3617
3618                 relevant_elems = []
3619                 for relevant_elem in ad_relevant:
3620                     if relevant_elem['ad-type'] == AD_WIN2K_PAC:
3621                         self.assertIsNone(old_pac, 'Multiple PACs detected')
3622                         old_pac = relevant_elem['ad-data']
3623
3624                         if new_pac is not None:
3625                             relevant_elems.append(new_pac)
3626                     else:
3627                         relevant_elems.append(relevant_elem)
3628                 if expect_pac:
3629                     self.assertIsNotNone(old_pac, 'Expected PAC')
3630
3631                 if relevant_elems or allow_empty_authdata:
3632                     ad_relevant = self.der_encode(
3633                         relevant_elems,
3634                         asn1Spec=krb5_asn1.AD_IF_RELEVANT())
3635
3636                     authdata_elem = self.AuthorizationData_create(
3637                         AD_IF_RELEVANT,
3638                         ad_relevant)
3639                 else:
3640                     authdata_elem = None
3641
3642             if authdata_elem is not None or allow_empty_authdata:
3643                 new_auth_data.append(authdata_elem)
3644
3645         if expect_pac:
3646             self.assertIsNotNone(ad_relevant, 'Expected AD-RELEVANT')
3647
3648         return new_auth_data, old_pac
3649
3650     def get_pac(self, auth_data, expect_pac=True):
3651         _, pac = self.replace_pac(auth_data, None, expect_pac)
3652         return pac
3653
3654     def get_ticket_pac(self, ticket, expect_pac=True):
3655         auth_data = ticket.ticket_private.get('authorization-data')
3656         if expect_pac:
3657             self.assertIsNotNone(auth_data)
3658         elif auth_data is None:
3659             return None
3660
3661         return self.get_pac(auth_data, expect_pac=expect_pac)
3662
3663     def get_krbtgt_checksum_key(self):
3664         krbtgt_creds = self.get_krbtgt_creds()
3665         krbtgt_key = self.TicketDecryptionKey_from_creds(krbtgt_creds)
3666
3667         return {
3668             krb5pac.PAC_TYPE_KDC_CHECKSUM: krbtgt_key
3669         }
3670
3671     def is_tgs(self, principal):
3672         name = principal['name-string'][0]
3673         return name in ('krbtgt', b'krbtgt')
3674
3675     def is_tgt(self, ticket):
3676         sname = ticket.ticket['sname']
3677         return self.is_tgs(sname)
3678
3679     def get_empty_pac(self):
3680         return self.AuthorizationData_create(AD_WIN2K_PAC, bytes(1))
3681
3682     def get_outer_pa_dict(self, kdc_exchange_dict):
3683         return self.get_pa_dict(kdc_exchange_dict['req_padata'])
3684
3685     def get_fast_pa_dict(self, kdc_exchange_dict):
3686         req_pa_dict = self.get_pa_dict(kdc_exchange_dict['fast_padata'])
3687
3688         if req_pa_dict:
3689             return req_pa_dict
3690
3691         return self.get_outer_pa_dict(kdc_exchange_dict)
3692
3693     def sent_fast(self, kdc_exchange_dict):
3694         outer_pa_dict = self.get_outer_pa_dict(kdc_exchange_dict)
3695
3696         return PADATA_FX_FAST in outer_pa_dict
3697
3698     def sent_enc_challenge(self, kdc_exchange_dict):
3699         fast_pa_dict = self.get_fast_pa_dict(kdc_exchange_dict)
3700
3701         return PADATA_ENCRYPTED_CHALLENGE in fast_pa_dict
3702
3703     def get_sent_pac_options(self, kdc_exchange_dict):
3704         fast_pa_dict = self.get_fast_pa_dict(kdc_exchange_dict)
3705
3706         if PADATA_PAC_OPTIONS not in fast_pa_dict:
3707             return ''
3708
3709         pac_options = self.der_decode(fast_pa_dict[PADATA_PAC_OPTIONS],
3710                                       asn1Spec=krb5_asn1.PA_PAC_OPTIONS())
3711         pac_options = pac_options['options']
3712
3713         # Mask out unsupported bits.
3714         pac_options, remaining = pac_options[:4], pac_options[4:]
3715         pac_options += '0' * len(remaining)
3716
3717         return pac_options
3718
3719     def get_krbtgt_sname(self):
3720         krbtgt_creds = self.get_krbtgt_creds()
3721         krbtgt_username = krbtgt_creds.get_username()
3722         krbtgt_realm = krbtgt_creds.get_realm()
3723         krbtgt_sname = self.PrincipalName_create(
3724             name_type=NT_SRV_INST, names=[krbtgt_username, krbtgt_realm])
3725
3726         return krbtgt_sname
3727
3728     def _test_as_exchange(self,
3729                           cname,
3730                           realm,
3731                           sname,
3732                           till,
3733                           client_as_etypes,
3734                           expected_error_mode,
3735                           expected_crealm,
3736                           expected_cname,
3737                           expected_srealm,
3738                           expected_sname,
3739                           expected_salt,
3740                           etypes,
3741                           padata,
3742                           kdc_options,
3743                           expected_account_name=None,
3744                           expected_upn_name=None,
3745                           expected_sid=None,
3746                           expected_flags=None,
3747                           unexpected_flags=None,
3748                           expected_supported_etypes=None,
3749                           preauth_key=None,
3750                           ticket_decryption_key=None,
3751                           pac_request=None,
3752                           pac_options=None,
3753                           expect_pac=True,
3754                           expect_pac_attrs=None,
3755                           expect_pac_attrs_pac_request=None,
3756                           expect_requester_sid=None,
3757                           to_rodc=False):
3758
3759         def _generate_padata_copy(_kdc_exchange_dict,
3760                                   _callback_dict,
3761                                   req_body):
3762             return padata, req_body
3763
3764         if not expected_error_mode:
3765             check_error_fn = None
3766             check_rep_fn = self.generic_check_kdc_rep
3767         else:
3768             check_error_fn = self.generic_check_kdc_error
3769             check_rep_fn = None
3770
3771         if padata is not None:
3772             generate_padata_fn = _generate_padata_copy
3773         else:
3774             generate_padata_fn = None
3775
3776         kdc_exchange_dict = self.as_exchange_dict(
3777             expected_crealm=expected_crealm,
3778             expected_cname=expected_cname,
3779             expected_srealm=expected_srealm,
3780             expected_sname=expected_sname,
3781             expected_account_name=expected_account_name,
3782             expected_upn_name=expected_upn_name,
3783             expected_sid=expected_sid,
3784             expected_supported_etypes=expected_supported_etypes,
3785             ticket_decryption_key=ticket_decryption_key,
3786             generate_padata_fn=generate_padata_fn,
3787             check_error_fn=check_error_fn,
3788             check_rep_fn=check_rep_fn,
3789             check_kdc_private_fn=self.generic_check_kdc_private,
3790             expected_error_mode=expected_error_mode,
3791             client_as_etypes=client_as_etypes,
3792             expected_salt=expected_salt,
3793             expected_flags=expected_flags,
3794             unexpected_flags=unexpected_flags,
3795             preauth_key=preauth_key,
3796             kdc_options=str(kdc_options),
3797             pac_request=pac_request,
3798             pac_options=pac_options,
3799             expect_pac=expect_pac,
3800             expect_pac_attrs=expect_pac_attrs,
3801             expect_pac_attrs_pac_request=expect_pac_attrs_pac_request,
3802             expect_requester_sid=expect_requester_sid,
3803             to_rodc=to_rodc)
3804
3805         rep = self._generic_kdc_exchange(kdc_exchange_dict,
3806                                          cname=cname,
3807                                          realm=realm,
3808                                          sname=sname,
3809                                          till_time=till,
3810                                          etypes=etypes)
3811
3812         return rep, kdc_exchange_dict