CVE-2022-2031 tests/krb5: Add tests for kpasswd service
[samba.git] / python / samba / tests / krb5 / raw_testcase.py
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Isaac Boukris 2020
3 # Copyright (C) Stefan Metzmacher 2020
4 #
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18
19 import sys
20 import socket
21 import struct
22 import time
23 import datetime
24 import random
25 import binascii
26 import itertools
27 import collections
28
29 from enum import Enum
30
31 from pyasn1.codec.der.decoder import decode as pyasn1_der_decode
32 from pyasn1.codec.der.encoder import encode as pyasn1_der_encode
33 from pyasn1.codec.native.decoder import decode as pyasn1_native_decode
34 from pyasn1.codec.native.encoder import encode as pyasn1_native_encode
35
36 from pyasn1.codec.ber.encoder import BitStringEncoder
37
38 from pyasn1.error import PyAsn1Error
39
40 from samba.credentials import Credentials
41 from samba.dcerpc import krb5pac, security
42 from samba.gensec import FEATURE_SEAL
43 from samba.ndr import ndr_pack, ndr_unpack
44
45 import samba.tests
46 from samba.tests import TestCaseInTempDir
47
48 import samba.tests.krb5.rfc4120_pyasn1 as krb5_asn1
49 from samba.tests.krb5.rfc4120_constants import (
50     AD_IF_RELEVANT,
51     AD_WIN2K_PAC,
52     FX_FAST_ARMOR_AP_REQUEST,
53     KDC_ERR_GENERIC,
54     KDC_ERR_PREAUTH_FAILED,
55     KDC_ERR_UNKNOWN_CRITICAL_FAST_OPTIONS,
56     KERB_ERR_TYPE_EXTENDED,
57     KRB_AP_REP,
58     KRB_AP_REQ,
59     KRB_AS_REP,
60     KRB_AS_REQ,
61     KRB_ERROR,
62     KRB_PRIV,
63     KRB_TGS_REP,
64     KRB_TGS_REQ,
65     KU_AP_REQ_AUTH,
66     KU_AS_REP_ENC_PART,
67     KU_AP_REQ_ENC_PART,
68     KU_ENC_CHALLENGE_KDC,
69     KU_FAST_ENC,
70     KU_FAST_FINISHED,
71     KU_FAST_REP,
72     KU_FAST_REQ_CHKSUM,
73     KU_KRB_PRIV,
74     KU_NON_KERB_CKSUM_SALT,
75     KU_TGS_REP_ENC_PART_SESSION,
76     KU_TGS_REP_ENC_PART_SUB_KEY,
77     KU_TGS_REQ_AUTH,
78     KU_TGS_REQ_AUTH_CKSUM,
79     KU_TGS_REQ_AUTH_DAT_SESSION,
80     KU_TGS_REQ_AUTH_DAT_SUBKEY,
81     KU_TICKET,
82     NT_PRINCIPAL,
83     NT_SRV_INST,
84     NT_WELLKNOWN,
85     PADATA_ENCRYPTED_CHALLENGE,
86     PADATA_ENC_TIMESTAMP,
87     PADATA_ETYPE_INFO,
88     PADATA_ETYPE_INFO2,
89     PADATA_FOR_USER,
90     PADATA_FX_COOKIE,
91     PADATA_FX_ERROR,
92     PADATA_FX_FAST,
93     PADATA_KDC_REQ,
94     PADATA_PAC_OPTIONS,
95     PADATA_PAC_REQUEST,
96     PADATA_PK_AS_REQ,
97     PADATA_PK_AS_REP_19,
98     PADATA_SUPPORTED_ETYPES
99 )
100 import samba.tests.krb5.kcrypto as kcrypto
101
102
def BitStringEncoder_encodeValue32(
        self, value, asn1Spec, encodeFun, **options):
    """Encode a BIT STRING value, zero-padded to at least 32 bits.

    BitStrings like KDCOptions or TicketFlags should be at least
    32-Bit on the wire, so the octets produced for the value are
    padded with trailing zero bytes until there are four of them.

    Returns a tuple of (substrate, False, True), where the substrate is
    a single zero byte followed by the (padded) value octets.
    """
    if asn1Spec is not None:
        # TODO: try to avoid ASN.1 schema instantiation
        value = asn1Spec.clone(value)

    # Shift the value left so that its length is a whole number of
    # octets.
    leftover_bits = len(value) % 8
    aligned = value << (8 - leftover_bits) if leftover_bits else value

    octets = aligned.asOctets()

    # We need at least 32-Bit / 4-Bytes
    missing = max(0, 4 - len(octets))

    return b'\x00' + octets + (b'\x00' * missing), False, True
128
129
# Replace the encodeValue method of pyasn1's BitStringEncoder with
# BitStringEncoder_encodeValue32, so that this replacement is what gets
# used wherever that encoder class is used.
BitStringEncoder.encodeValue = BitStringEncoder_encodeValue32
131
132
def BitString_NamedValues_prettyPrint(self, scope=0):
    """Pretty-print a bit string together with the names of its bits.

    The output starts with the binary representation of the value and
    is followed by one "name:value" entry for each of the first 32 bit
    positions that either has a name in self.prettyPrintNamedValues or
    is set. Set bits without a name are reported as "unknown-bit-<n>".

    scope is the number of spaces used to indent the entries.
    """
    width = 32
    text = "%s" % self.asBinary()

    # Expand the octets into single bits, most significant bit first.
    bits = [(octet >> shift) & 1
            for octet in self.asNumbers()
            for shift in range(7, -1, -1)]
    # Values shorter than 32 bits are treated as zero-extended.
    bits.extend([0] * (width - len(bits)))

    indent = " " * scope
    entries = []
    for position in range(width):
        if position in self.prettyPrintNamedValues:
            label = self.prettyPrintNamedValues[position]
        elif bits[position] != 0:
            label = "unknown-bit-%u" % position
        else:
            # Unnamed and unset: nothing to report.
            continue
        entries.append("%s:%u" % (label, bits[position]))

    if entries:
        text += ": (\n%s " % indent
        text += (",\n%s " % indent).join(entries)
    text += "\n%s)" % indent
    return text
161
162
# Give each of the bit string types below the named values of its
# "...Values" counterpart and make it pretty-print itself with
# BitString_NamedValues_prettyPrint.

# TicketFlags
krb5_asn1.TicketFlags.prettyPrintNamedValues = (
    krb5_asn1.TicketFlagsValues.namedValues)
krb5_asn1.TicketFlags.namedValues = (
    krb5_asn1.TicketFlagsValues.namedValues)
krb5_asn1.TicketFlags.prettyPrint = BitString_NamedValues_prettyPrint

# KDCOptions
krb5_asn1.KDCOptions.prettyPrintNamedValues = (
    krb5_asn1.KDCOptionsValues.namedValues)
krb5_asn1.KDCOptions.namedValues = (
    krb5_asn1.KDCOptionsValues.namedValues)
krb5_asn1.KDCOptions.prettyPrint = BitString_NamedValues_prettyPrint

# APOptions
krb5_asn1.APOptions.prettyPrintNamedValues = (
    krb5_asn1.APOptionsValues.namedValues)
krb5_asn1.APOptions.namedValues = (
    krb5_asn1.APOptionsValues.namedValues)
krb5_asn1.APOptions.prettyPrint = BitString_NamedValues_prettyPrint

# PACOptionFlags
krb5_asn1.PACOptionFlags.prettyPrintNamedValues = (
    krb5_asn1.PACOptionFlagsValues.namedValues)
krb5_asn1.PACOptionFlags.namedValues = (
    krb5_asn1.PACOptionFlagsValues.namedValues)
krb5_asn1.PACOptionFlags.prettyPrint = BitString_NamedValues_prettyPrint
187
188
def Integer_NamedValues_prettyPrint(self, scope=0):
    """Pretty-print an integer as "<decimal> (0x<hex>) <name>".

    The name is looked up in self.prettyPrintNamedValues; values that
    are not found there are shown as "<__unknown__>". scope is accepted
    but not used.
    """
    number = int(self)
    names = self.prettyPrintNamedValues
    label = names[number] if number in names else "<__unknown__>"
    return "%d (0x%x) %s" % (number, number, label)
197
198
# Give each of the integer types below the named values of its
# "...Values" counterpart and make it pretty-print itself with
# Integer_NamedValues_prettyPrint.

# NameType
krb5_asn1.NameType.prettyPrintNamedValues = (
    krb5_asn1.NameTypeValues.namedValues)
krb5_asn1.NameType.prettyPrint = Integer_NamedValues_prettyPrint

# AuthDataType
krb5_asn1.AuthDataType.prettyPrintNamedValues = (
    krb5_asn1.AuthDataTypeValues.namedValues)
krb5_asn1.AuthDataType.prettyPrint = Integer_NamedValues_prettyPrint

# PADataType
krb5_asn1.PADataType.prettyPrintNamedValues = (
    krb5_asn1.PADataTypeValues.namedValues)
krb5_asn1.PADataType.prettyPrint = Integer_NamedValues_prettyPrint

# EncryptionType
krb5_asn1.EncryptionType.prettyPrintNamedValues = (
    krb5_asn1.EncryptionTypeValues.namedValues)
krb5_asn1.EncryptionType.prettyPrint = Integer_NamedValues_prettyPrint

# ChecksumType
krb5_asn1.ChecksumType.prettyPrintNamedValues = (
    krb5_asn1.ChecksumTypeValues.namedValues)
krb5_asn1.ChecksumType.prettyPrint = Integer_NamedValues_prettyPrint

# KerbErrorDataType
krb5_asn1.KerbErrorDataType.prettyPrintNamedValues = (
    krb5_asn1.KerbErrorDataTypeValues.namedValues)
krb5_asn1.KerbErrorDataType.prettyPrint = Integer_NamedValues_prettyPrint
223
224
class Krb5EncryptionKey:
    """A kcrypto key bundled with its enctype, checksum type and kvno."""

    def __init__(self, key, kvno):
        """Wrap key, remembering kvno.

        The checksum type is chosen from the key's encryption type; a
        KeyError is raised for encryption types other than AES256,
        AES128 and RC4.
        """
        checksum_for_enctype = {
            kcrypto.Enctype.AES256: kcrypto.Cksumtype.SHA1_AES256,
            kcrypto.Enctype.AES128: kcrypto.Cksumtype.SHA1_AES128,
            kcrypto.Enctype.RC4: kcrypto.Cksumtype.HMAC_MD5,
        }
        self.key = key
        self.kvno = kvno
        self.etype = key.enctype
        self.ctype = checksum_for_enctype[self.etype]

    def encrypt(self, usage, plaintext):
        """Return plaintext encrypted with this key for the key usage."""
        return kcrypto.encrypt(self.key, usage, plaintext)

    def decrypt(self, usage, ciphertext):
        """Return ciphertext decrypted with this key for the key usage."""
        return kcrypto.decrypt(self.key, usage, ciphertext)

    def make_zeroed_checksum(self, ctype=None):
        """Return zero bytes, as many as a checksum of type ctype has.

        ctype defaults to the checksum type of this key.
        """
        if ctype is None:
            ctype = self.ctype

        return bytes(kcrypto.checksum_len(ctype))

    def make_checksum(self, usage, plaintext, ctype=None):
        """Return the checksum of plaintext made with this key.

        ctype defaults to the checksum type of this key.
        """
        if ctype is None:
            ctype = self.ctype

        return kcrypto.make_checksum(ctype, self.key, usage, plaintext)

    def verify_checksum(self, usage, plaintext, ctype, cksum):
        """Verify cksum over plaintext using this key.

        Raises AssertionError if ctype is not the checksum type of this
        key; otherwise the check is delegated to kcrypto.verify_checksum.
        """
        if ctype != self.ctype:
            raise AssertionError(f'key checksum type ({self.ctype}) != '
                                 f'checksum type ({ctype})')

        kcrypto.verify_checksum(ctype, self.key, usage, plaintext, cksum)

    def export_obj(self):
        """Return the key as a dict with 'keytype' and 'keyvalue'."""
        return {
            'keytype': self.etype,
            'keyvalue': self.key.contents,
        }
275
276
class RodcPacEncryptionKey(Krb5EncryptionKey):
    """An encryption key that can append an RODC identifier to checksums."""

    def __init__(self, key, kvno, rodc_id=None):
        """Wrap key, storing the RODC identifier as two bytes.

        If rodc_id is not given, it is taken from the upper 16 bits of
        the 32-bit kvno. A missing kvno, or upper bits that are all
        zero, result in an empty identifier.
        """
        super().__init__(key, kvno)

        if rodc_id is None and self.kvno is not None:
            upper_bits = (self.kvno >> 16) & ((1 << 16) - 1)
            rodc_id = upper_bits or None

        if rodc_id is None:
            self.rodc_id = b''
        else:
            self.rodc_id = rodc_id.to_bytes(2, byteorder='little')

    def make_rodc_zeroed_checksum(self, ctype=None):
        """Return a zeroed checksum, extended by a zeroed identifier."""
        zeroed = super().make_zeroed_checksum(ctype)
        return zeroed + bytes(len(self.rodc_id))

    def make_rodc_checksum(self, usage, plaintext, ctype=None):
        """Return the checksum of plaintext followed by the identifier."""
        return super().make_checksum(usage, plaintext, ctype) + self.rodc_id

    def verify_rodc_checksum(self, usage, plaintext, ctype, cksum):
        """Verify a checksum that may end with an RODC identifier.

        When this key has an identifier, the last two bytes of cksum
        are split off and must match it, otherwise AssertionError is
        raised. The remaining bytes are verified as a regular checksum.
        """
        if self.rodc_id:
            cksum_rodc_id = cksum[-2:]
            cksum = cksum[:-2]

            if cksum_rodc_id != self.rodc_id:
                raise AssertionError(f'{self.rodc_id.hex()} != '
                                     f'{cksum_rodc_id.hex()}')

        super().verify_checksum(usage, plaintext, ctype, cksum)
314
315
class ZeroedChecksumKey(RodcPacEncryptionKey):
    """A key whose checksum methods return zeroed checksums."""

    def make_checksum(self, usage, plaintext, ctype=None):
        """Ignore usage and plaintext and return a zeroed checksum."""
        return self.make_zeroed_checksum(ctype)

    def make_rodc_checksum(self, usage, plaintext, ctype=None):
        """Ignore usage and plaintext and return a zeroed RODC checksum."""
        return self.make_rodc_zeroed_checksum(ctype)
322
323
class WrongLengthChecksumKey(RodcPacEncryptionKey):
    """A key whose checksums are forced to a caller-chosen length."""

    def __init__(self, key, kvno, length):
        """Wrap key; length is the size of every checksum produced."""
        super().__init__(key, kvno)

        self._length = length

    @classmethod
    def _adjust_to_length(cls, checksum, length):
        """Return checksum zero-padded or truncated to length bytes."""
        shortfall = length - len(checksum)
        if shortfall < 0:
            # Too long: cut off the tail.
            return checksum[:length]
        if shortfall > 0:
            # Too short: fill up with zero bytes.
            return checksum + bytes(shortfall)

        return checksum

    def make_zeroed_checksum(self, ctype=None):
        """Return zero bytes of the forced length; ctype is ignored."""
        return bytes(self._length)

    def make_checksum(self, usage, plaintext, ctype=None):
        """Return the real checksum, adjusted to the forced length."""
        real = super().make_checksum(usage, plaintext, ctype)
        return self._adjust_to_length(real, self._length)

    def make_rodc_zeroed_checksum(self, ctype=None):
        """Return zero bytes of the forced length; ctype is ignored."""
        return bytes(self._length)

    def make_rodc_checksum(self, usage, plaintext, ctype=None):
        """Return the real RODC checksum, adjusted to the forced length."""
        real = super().make_rodc_checksum(usage, plaintext, ctype)
        return self._adjust_to_length(real, self._length)
353
354
class KerberosCredentials(Credentials):
    """Credentials carrying additional Kerberos-related test state.

    On top of the base class this stores the supported encryption type
    bits for AS, TGS and AP exchanges, a key version number, forced
    keys and salt, and the DN, UPN and SPN of the account.
    """

    # Bits that bits_to_etypes() accepts in a supported-enctypes value
    # without mapping them to an encryption type.
    fast_supported_bits = (security.KERB_ENCTYPE_FAST_SUPPORTED |
                           security.KERB_ENCTYPE_COMPOUND_IDENTITY_SUPPORTED |
                           security.KERB_ENCTYPE_CLAIMS_SUPPORTED)

    # Maps kcrypto encryption types to supported-enctypes bits. The
    # order of the entries is the order in which bits_to_etypes()
    # returns encryption types.
    etype_map = collections.OrderedDict([
        (kcrypto.Enctype.AES256,
            security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96),
        (kcrypto.Enctype.AES128,
            security.KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96),
        (kcrypto.Enctype.RC4,
            security.KERB_ENCTYPE_RC4_HMAC_MD5),
        (kcrypto.Enctype.DES_MD5,
            security.KERB_ENCTYPE_DES_CBC_MD5),
        (kcrypto.Enctype.DES_CRC,
            security.KERB_ENCTYPE_DES_CBC_CRC)
    ])

    def __init__(self):
        """Start with RC4, AES128 and AES256 supported everywhere."""
        super().__init__()

        default_enctypes = (security.KERB_ENCTYPE_RC4_HMAC_MD5 |
                            security.KERB_ENCTYPE_AES128_CTS_HMAC_SHA1_96 |
                            security.KERB_ENCTYPE_AES256_CTS_HMAC_SHA1_96)

        self.as_supported_enctypes = default_enctypes
        self.tgs_supported_enctypes = default_enctypes
        self.ap_supported_enctypes = default_enctypes

        self.kvno = None
        self.forced_keys = {}

        self.forced_salt = None

        self.dn = None
        self.upn = None
        self.spn = None

    # Supported encryption types, stored as integer bit fields.

    def set_as_supported_enctypes(self, value):
        """Store the AS supported-enctypes bits, converted with int()."""
        self.as_supported_enctypes = int(value)

    def set_tgs_supported_enctypes(self, value):
        """Store the TGS supported-enctypes bits, converted with int()."""
        self.tgs_supported_enctypes = int(value)

    def set_ap_supported_enctypes(self, value):
        """Store the AP supported-enctypes bits, converted with int()."""
        self.ap_supported_enctypes = int(value)

    @classmethod
    def etypes_to_bits(cls, etypes):
        """Return the combined supported-enctypes bits for etypes.

        Raises ValueError if an encryption type contributes a bit that
        is already set, and KeyError for types missing from etype_map.
        """
        combined = 0
        for etype in etypes:
            flag = cls.etype_map[etype]
            if combined & flag:
                raise ValueError(f'Got duplicate etype: {etype}')
            combined |= flag

        return combined

    @classmethod
    def bits_to_etypes(cls, bits):
        """Return the tuple of encryption types selected by bits.

        The types are returned in the order of etype_map. Bits in
        fast_supported_bits are ignored; any other bit that is left
        over raises ValueError.
        """
        remaining = bits
        selected = []
        for etype, flag in cls.etype_map.items():
            if flag & remaining:
                remaining &= ~flag
                selected.append(etype)

        remaining &= ~cls.fast_supported_bits
        if remaining != 0:
            raise ValueError(f'Unsupported etype bits: {remaining}')

        return tuple(selected)

    def get_as_krb5_etypes(self):
        """Return the AS supported encryption types as a tuple."""
        return self.bits_to_etypes(self.as_supported_enctypes)

    def get_tgs_krb5_etypes(self):
        """Return the TGS supported encryption types as a tuple."""
        return self.bits_to_etypes(self.tgs_supported_enctypes)

    def get_ap_krb5_etypes(self):
        """Return the AP supported encryption types as a tuple."""
        return self.bits_to_etypes(self.ap_supported_enctypes)

    # Key version number.

    def set_kvno(self, kvno):
        """Store kvno, sign-extended from 32 bits."""
        sign_bit = 1 << 31
        if kvno & sign_bit:
            # Bit 31 is set: set it and every higher bit, which makes
            # the stored value negative.
            kvno |= -sign_bit
        self.kvno = kvno

    def get_kvno(self):
        """Return the stored kvno, or None if none was set."""
        return self.kvno

    # Forced keys and salt.

    def set_forced_key(self, etype, hexkey):
        """Force the key for etype to the one given as a hex string.

        The key is stored together with the kvno that is current at the
        time of the call.
        """
        etype = int(etype)
        key = kcrypto.Key(etype, binascii.a2b_hex(hexkey))
        self.forced_keys[etype] = RodcPacEncryptionKey(key, self.kvno)

    def get_forced_key(self, etype):
        """Return the forced key for etype, or None if there is none."""
        return self.forced_keys.get(int(etype))

    def set_forced_salt(self, salt):
        """Force the salt, stored as bytes."""
        self.forced_salt = bytes(salt)

    def get_forced_salt(self):
        """Return the forced salt, or None if there is none."""
        return self.forced_salt

    def get_salt(self):
        """Return the salt for these credentials as UTF-8 bytes.

        A forced salt is returned unchanged. Otherwise, when a
        workstation is set, the salt is the upper-case realm, "host",
        the lower-case username without a trailing "$", a dot and the
        lower-case realm. Without a workstation it is the upper-case
        realm followed by the UPN with its last "@..." part and all "/"
        removed, or by the username when no UPN is set.
        """
        if self.forced_salt is not None:
            return self.forced_salt

        upn = self.get_upn()
        if upn is None:
            salt_name = self.get_username()
        else:
            salt_name = upn.rsplit('@', 1)[0].replace('/', '')

        if not self.get_workstation():
            salt_string = self.get_realm().upper() + salt_name
            return salt_string.encode('utf-8')

        # With a workstation set, the name is taken from the username
        # instead, replacing what was determined above.
        salt_name = self.get_username().lower()
        if salt_name[-1] == '$':
            salt_name = salt_name[:-1]
        salt_string = '%shost%s.%s' % (
            self.get_realm().upper(),
            salt_name,
            self.get_realm().lower())

        return salt_string.encode('utf-8')

    # Account names.

    def set_dn(self, dn):
        """Store the DN."""
        self.dn = dn

    def get_dn(self):
        """Return the stored DN, or None."""
        return self.dn

    def set_spn(self, spn):
        """Store the SPN."""
        self.spn = spn

    def get_spn(self):
        """Return the stored SPN, or None."""
        return self.spn

    def set_upn(self, upn):
        """Store the UPN."""
        self.upn = upn

    def get_upn(self):
        """Return the stored UPN, or None."""
        return self.upn

    def update_password(self, password):
        """Set a new password and increment the kvno by one."""
        self.set_password(password)
        next_kvno = self.get_kvno() + 1
        self.set_kvno(next_kvno)
506
507
class KerberosTicketCreds:
    """A ticket together with its session key and related details."""

    def __init__(self, ticket, session_key,
                 crealm=None, cname=None,
                 srealm=None, sname=None,
                 decryption_key=None,
                 ticket_private=None,
                 encpart_private=None):
        """Store all arguments as attributes of the same names."""
        # The ticket itself and the key that goes with it.
        self.ticket = ticket
        self.session_key = session_key
        self.decryption_key = decryption_key

        # Client and server realm and name.
        self.crealm, self.cname = crealm, cname
        self.srealm, self.sname = srealm, sname

        # Private parts of the ticket and of the encrypted part.
        self.ticket_private = ticket_private
        self.encpart_private = encpart_private

    def set_sname(self, sname):
        """Set the server name on both the ticket and this object."""
        self.sname = sname
        self.ticket['sname'] = sname
528
529
530 class RawKerberosTest(TestCaseInTempDir):
531     """A raw Kerberos Test case."""
532
    class KpasswdMode(Enum):
        # The two members: SET and CHANGE. Each value is a unique
        # object(), so the values carry no meaning beyond telling the
        # members apart.
        SET = object()
        CHANGE = object()
536
    # The PAC buffer types that, going by their names, hold checksums.
    pac_checksum_types = {krb5pac.PAC_TYPE_SRV_CHECKSUM,
                          krb5pac.PAC_TYPE_KDC_CHECKSUM,
                          krb5pac.PAC_TYPE_TICKET_CHECKSUM}

    # The encryption types from which setup_etype_test_permutations()
    # builds its permutations. "value" is the encryption type and "name"
    # the label that ends up in the name of a permutation.
    #
    # NOTE(review): the "aes128" and "aes256" labels look swapped
    # relative to their values (AES256 is labelled "aes128" and AES128
    # is labelled "aes256"). The permutation names are derived from
    # these labels, so check whether anything refers to the current
    # names before correcting them.
    etypes_to_test = (
        {"value": -1111, "name": "dummy", },
        {"value": kcrypto.Enctype.AES256, "name": "aes128", },
        {"value": kcrypto.Enctype.AES128, "name": "aes256", },
        {"value": kcrypto.Enctype.RC4, "name": "rc4", },
    )

    # Set to True once setup_etype_test_permutations() has run.
    setup_etype_test_permutations_done = False
549
550     @classmethod
551     def setup_etype_test_permutations(cls):
552         if cls.setup_etype_test_permutations_done:
553             return
554
555         res = []
556
557         num_idxs = len(cls.etypes_to_test)
558         permutations = []
559         for num in range(1, num_idxs + 1):
560             chunk = list(itertools.permutations(range(num_idxs), num))
561             for e in chunk:
562                 el = list(e)
563                 permutations.append(el)
564
565         for p in permutations:
566             name = None
567             etypes = ()
568             for idx in p:
569                 n = cls.etypes_to_test[idx]["name"]
570                 if name is None:
571                     name = n
572                 else:
573                     name += "_%s" % n
574                 etypes += (cls.etypes_to_test[idx]["value"],)
575
576             r = {"name": name, "etypes": etypes, }
577             res.append(r)
578
579         cls.etype_test_permutations = res
580         cls.setup_etype_test_permutations_done = True
581
582     @classmethod
583     def etype_test_permutation_name_idx(cls):
584         cls.setup_etype_test_permutations()
585         res = []
586         idx = 0
587         for e in cls.etype_test_permutations:
588             r = (e['name'], idx)
589             idx += 1
590             res.append(r)
591         return res
592
593     def etype_test_permutation_by_idx(self, idx):
594         e = self.etype_test_permutations[idx]
595         return (e['name'], e['etypes'])
596
597     @classmethod
598     def setUpClass(cls):
599         super().setUpClass()
600
601         cls.host = samba.tests.env_get_var_value('SERVER')
602         cls.dc_host = samba.tests.env_get_var_value('DC_SERVER')
603
604         # A dictionary containing credentials that have already been
605         # obtained.
606         cls.creds_dict = {}
607
608         kdc_fast_support = samba.tests.env_get_var_value('FAST_SUPPORT',
609                                                          allow_missing=True)
610         if kdc_fast_support is None:
611             kdc_fast_support = '0'
612         cls.kdc_fast_support = bool(int(kdc_fast_support))
613
614         tkt_sig_support = samba.tests.env_get_var_value('TKT_SIG_SUPPORT',
615                                                         allow_missing=True)
616         if tkt_sig_support is None:
617             tkt_sig_support = '0'
618         cls.tkt_sig_support = bool(int(tkt_sig_support))
619
620         expect_pac = samba.tests.env_get_var_value('EXPECT_PAC',
621                                                    allow_missing=True)
622         if expect_pac is None:
623             expect_pac = '1'
624         cls.expect_pac = bool(int(expect_pac))
625
626         expect_extra_pac_buffers = samba.tests.env_get_var_value(
627             'EXPECT_EXTRA_PAC_BUFFERS',
628             allow_missing=True)
629         if expect_extra_pac_buffers is None:
630             expect_extra_pac_buffers = '1'
631         cls.expect_extra_pac_buffers = bool(int(expect_extra_pac_buffers))
632
633     def setUp(self):
634         super().setUp()
635         self.do_asn1_print = False
636         self.do_hexdump = False
637
638         strict_checking = samba.tests.env_get_var_value('STRICT_CHECKING',
639                                                         allow_missing=True)
640         if strict_checking is None:
641             strict_checking = '1'
642         self.strict_checking = bool(int(strict_checking))
643
644         self.s = None
645
646         self.unspecified_kvno = object()
647
    def tearDown(self):
        """Close the connection, if any, then run the parent tearDown."""
        self._disconnect("tearDown")
        super().tearDown()
651
652     def _disconnect(self, reason):
653         if self.s is None:
654             return
655         self.s.close()
656         self.s = None
657         if self.do_hexdump:
658             sys.stderr.write("disconnect[%s]\n" % reason)
659
660     def _connect_tcp(self, host, port=None):
661         if port is None:
662             port = 88
663         try:
664             self.a = socket.getaddrinfo(host, port, socket.AF_UNSPEC,
665                                         socket.SOCK_STREAM, socket.SOL_TCP,
666                                         0)
667             self.s = socket.socket(self.a[0][0], self.a[0][1], self.a[0][2])
668             self.s.settimeout(10)
669             self.s.connect(self.a[0][4])
670         except socket.error:
671             self.s.close()
672             raise
673         except IOError:
674             self.s.close()
675             raise
676
677     def connect(self, host, port=None):
678         self.assertNotConnected()
679         self._connect_tcp(host, port)
680         if self.do_hexdump:
681             sys.stderr.write("connected[%s]\n" % host)
682
683     def env_get_var(self, varname, prefix,
684                     fallback_default=True,
685                     allow_missing=False):
686         val = None
687         if prefix is not None:
688             allow_missing_prefix = allow_missing or fallback_default
689             val = samba.tests.env_get_var_value(
690                 '%s_%s' % (prefix, varname),
691                 allow_missing=allow_missing_prefix)
692         else:
693             fallback_default = True
694         if val is None and fallback_default:
695             val = samba.tests.env_get_var_value(varname,
696                                                 allow_missing=allow_missing)
697         return val
698
699     def _get_krb5_creds_from_env(self, prefix,
700                                  default_username=None,
701                                  allow_missing_password=False,
702                                  allow_missing_keys=True,
703                                  require_strongest_key=False):
704         c = KerberosCredentials()
705         c.guess()
706
707         domain = self.env_get_var('DOMAIN', prefix)
708         realm = self.env_get_var('REALM', prefix)
709         allow_missing_username = default_username is not None
710         username = self.env_get_var('USERNAME', prefix,
711                                     fallback_default=False,
712                                     allow_missing=allow_missing_username)
713         if username is None:
714             username = default_username
715         password = self.env_get_var('PASSWORD', prefix,
716                                     fallback_default=False,
717                                     allow_missing=allow_missing_password)
718         c.set_domain(domain)
719         c.set_realm(realm)
720         c.set_username(username)
721         if password is not None:
722             c.set_password(password)
723         as_supported_enctypes = self.env_get_var('AS_SUPPORTED_ENCTYPES',
724                                                  prefix, allow_missing=True)
725         if as_supported_enctypes is not None:
726             c.set_as_supported_enctypes(as_supported_enctypes)
727         tgs_supported_enctypes = self.env_get_var('TGS_SUPPORTED_ENCTYPES',
728                                                   prefix, allow_missing=True)
729         if tgs_supported_enctypes is not None:
730             c.set_tgs_supported_enctypes(tgs_supported_enctypes)
731         ap_supported_enctypes = self.env_get_var('AP_SUPPORTED_ENCTYPES',
732                                                  prefix, allow_missing=True)
733         if ap_supported_enctypes is not None:
734             c.set_ap_supported_enctypes(ap_supported_enctypes)
735
736         if require_strongest_key:
737             kvno_allow_missing = False
738             if password is None:
739                 aes256_allow_missing = False
740             else:
741                 aes256_allow_missing = True
742         else:
743             kvno_allow_missing = allow_missing_keys
744             aes256_allow_missing = allow_missing_keys
745         kvno = self.env_get_var('KVNO', prefix,
746                                 fallback_default=False,
747                                 allow_missing=kvno_allow_missing)
748         if kvno is not None:
749             c.set_kvno(int(kvno))
750         aes256_key = self.env_get_var('AES256_KEY_HEX', prefix,
751                                       fallback_default=False,
752                                       allow_missing=aes256_allow_missing)
753         if aes256_key is not None:
754             c.set_forced_key(kcrypto.Enctype.AES256, aes256_key)
755         aes128_key = self.env_get_var('AES128_KEY_HEX', prefix,
756                                       fallback_default=False,
757                                       allow_missing=True)
758         if aes128_key is not None:
759             c.set_forced_key(kcrypto.Enctype.AES128, aes128_key)
760         rc4_key = self.env_get_var('RC4_KEY_HEX', prefix,
761                                    fallback_default=False, allow_missing=True)
762         if rc4_key is not None:
763             c.set_forced_key(kcrypto.Enctype.RC4, rc4_key)
764
765         if not allow_missing_keys:
766             self.assertTrue(c.forced_keys,
767                             'Please supply %s encryption keys '
768                             'in environment' % prefix)
769
770         return c
771
772     def _get_krb5_creds(self,
773                         prefix,
774                         default_username=None,
775                         allow_missing_password=False,
776                         allow_missing_keys=True,
777                         require_strongest_key=False,
778                         fallback_creds_fn=None):
779         if prefix in self.creds_dict:
780             return self.creds_dict[prefix]
781
782         # We don't have the credentials already
783         creds = None
784         env_err = None
785         try:
786             # Try to obtain them from the environment
787             creds = self._get_krb5_creds_from_env(
788                 prefix,
789                 default_username=default_username,
790                 allow_missing_password=allow_missing_password,
791                 allow_missing_keys=allow_missing_keys,
792                 require_strongest_key=require_strongest_key)
793         except Exception as err:
794             # An error occurred, so save it for later
795             env_err = err
796         else:
797             self.assertIsNotNone(creds)
798             # Save the obtained credentials
799             self.creds_dict[prefix] = creds
800             return creds
801
802         if fallback_creds_fn is not None:
803             try:
804                 # Try to use the fallback method
805                 creds = fallback_creds_fn()
806             except Exception as err:
807                 print("ERROR FROM ENV: %r" % (env_err))
808                 print("FALLBACK-FN: %s" % (fallback_creds_fn))
809                 print("FALLBACK-ERROR: %r" % (err))
810             else:
811                 self.assertIsNotNone(creds)
812                 # Save the obtained credentials
813                 self.creds_dict[prefix] = creds
814                 return creds
815
816         # Both methods failed, so raise the exception from the
817         # environment method
818         raise env_err
819
820     def get_user_creds(self,
821                        allow_missing_password=False,
822                        allow_missing_keys=True):
823         c = self._get_krb5_creds(prefix=None,
824                                  allow_missing_password=allow_missing_password,
825                                  allow_missing_keys=allow_missing_keys)
826         return c
827
828     def get_service_creds(self,
829                           allow_missing_password=False,
830                           allow_missing_keys=True):
831         c = self._get_krb5_creds(prefix='SERVICE',
832                                  allow_missing_password=allow_missing_password,
833                                  allow_missing_keys=allow_missing_keys)
834         return c
835
836     def get_client_creds(self,
837                          allow_missing_password=False,
838                          allow_missing_keys=True):
839         c = self._get_krb5_creds(prefix='CLIENT',
840                                  allow_missing_password=allow_missing_password,
841                                  allow_missing_keys=allow_missing_keys)
842         return c
843
844     def get_server_creds(self,
845                          allow_missing_password=False,
846                          allow_missing_keys=True):
847         c = self._get_krb5_creds(prefix='SERVER',
848                                  allow_missing_password=allow_missing_password,
849                                  allow_missing_keys=allow_missing_keys)
850         return c
851
852     def get_admin_creds(self,
853                         allow_missing_password=False,
854                         allow_missing_keys=True):
855         c = self._get_krb5_creds(prefix='ADMIN',
856                                  allow_missing_password=allow_missing_password,
857                                  allow_missing_keys=allow_missing_keys)
858         c.set_gensec_features(c.get_gensec_features() | FEATURE_SEAL)
859         c.set_workstation('')
860         return c
861
862     def get_rodc_krbtgt_creds(self,
863                               require_keys=True,
864                               require_strongest_key=False):
865         if require_strongest_key:
866             self.assertTrue(require_keys)
867         c = self._get_krb5_creds(prefix='RODC_KRBTGT',
868                                  allow_missing_password=True,
869                                  allow_missing_keys=not require_keys,
870                                  require_strongest_key=require_strongest_key)
871         return c
872
873     def get_krbtgt_creds(self,
874                          require_keys=True,
875                          require_strongest_key=False):
876         if require_strongest_key:
877             self.assertTrue(require_keys)
878         c = self._get_krb5_creds(prefix='KRBTGT',
879                                  default_username='krbtgt',
880                                  allow_missing_password=True,
881                                  allow_missing_keys=not require_keys,
882                                  require_strongest_key=require_strongest_key)
883         return c
884
885     def get_anon_creds(self):
886         c = Credentials()
887         c.set_anonymous()
888         return c
889
890     def asn1_dump(self, name, obj, asn1_print=None):
891         if asn1_print is None:
892             asn1_print = self.do_asn1_print
893         if asn1_print:
894             if name is not None:
895                 sys.stderr.write("%s:\n%s" % (name, obj))
896             else:
897                 sys.stderr.write("%s" % (obj))
898
899     def hex_dump(self, name, blob, hexdump=None):
900         if hexdump is None:
901             hexdump = self.do_hexdump
902         if hexdump:
903             sys.stderr.write(
904                 "%s: %d\n%s" % (name, len(blob), self.hexdump(blob)))
905
906     def der_decode(
907             self,
908             blob,
909             asn1Spec=None,
910             native_encode=True,
911             asn1_print=None,
912             hexdump=None):
913         if asn1Spec is not None:
914             class_name = type(asn1Spec).__name__.split(':')[0]
915         else:
916             class_name = "<None-asn1Spec>"
917         self.hex_dump(class_name, blob, hexdump=hexdump)
918         obj, _ = pyasn1_der_decode(blob, asn1Spec=asn1Spec)
919         self.asn1_dump(None, obj, asn1_print=asn1_print)
920         if native_encode:
921             obj = pyasn1_native_encode(obj)
922         return obj
923
924     def der_encode(
925             self,
926             obj,
927             asn1Spec=None,
928             native_decode=True,
929             asn1_print=None,
930             hexdump=None):
931         if native_decode:
932             obj = pyasn1_native_decode(obj, asn1Spec=asn1Spec)
933         class_name = type(obj).__name__.split(':')[0]
934         if class_name is not None:
935             self.asn1_dump(None, obj, asn1_print=asn1_print)
936         blob = pyasn1_der_encode(obj)
937         if class_name is not None:
938             self.hex_dump(class_name, blob, hexdump=hexdump)
939         return blob
940
941     def send_pdu(self, req, asn1_print=None, hexdump=None):
942         k5_pdu = self.der_encode(
943             req, native_decode=False, asn1_print=asn1_print, hexdump=False)
944         self.send_msg(k5_pdu, hexdump=hexdump)
945
946     def send_msg(self, msg, hexdump=None):
947         header = struct.pack('>I', len(msg))
948         req_pdu = header
949         req_pdu += msg
950         self.hex_dump("send_msg", header, hexdump=hexdump)
951         self.hex_dump("send_msg", msg, hexdump=hexdump)
952
953         try:
954             while True:
955                 sent = self.s.send(req_pdu, 0)
956                 if sent == len(req_pdu):
957                     return
958                 req_pdu = req_pdu[sent:]
959         except socket.error as e:
960             self._disconnect("send_msg: %s" % e)
961             raise
962         except IOError as e:
963             self._disconnect("send_msg: %s" % e)
964             raise
965
966     def recv_raw(self, num_recv=0xffff, hexdump=None, timeout=None):
967         rep_pdu = None
968         try:
969             if timeout is not None:
970                 self.s.settimeout(timeout)
971             rep_pdu = self.s.recv(num_recv, 0)
972             self.s.settimeout(10)
973             if len(rep_pdu) == 0:
974                 self._disconnect("recv_raw: EOF")
975                 return None
976             self.hex_dump("recv_raw", rep_pdu, hexdump=hexdump)
977         except socket.timeout:
978             self.s.settimeout(10)
979             sys.stderr.write("recv_raw: TIMEOUT\n")
980         except socket.error as e:
981             self._disconnect("recv_raw: %s" % e)
982             raise
983         except IOError as e:
984             self._disconnect("recv_raw: %s" % e)
985             raise
986         return rep_pdu
987
988     def recv_pdu_raw(self, asn1_print=None, hexdump=None, timeout=None):
989         raw_pdu = self.recv_raw(
990             num_recv=4, hexdump=hexdump, timeout=timeout)
991         if raw_pdu is None:
992             return None
993         header = struct.unpack(">I", raw_pdu[0:4])
994         k5_len = header[0]
995         if k5_len == 0:
996             return ""
997         missing = k5_len
998         rep_pdu = b''
999         while missing > 0:
1000             raw_pdu = self.recv_raw(
1001                 num_recv=missing, hexdump=hexdump, timeout=timeout)
1002             self.assertGreaterEqual(len(raw_pdu), 1)
1003             rep_pdu += raw_pdu
1004             missing = k5_len - len(rep_pdu)
1005         return rep_pdu
1006
1007     def recv_reply(self, asn1_print=None, hexdump=None, timeout=None):
1008         rep_pdu = self.recv_pdu_raw(asn1_print=asn1_print,
1009                                     hexdump=hexdump,
1010                                     timeout=timeout)
1011         if not rep_pdu:
1012             return None, rep_pdu
1013         k5_raw = self.der_decode(
1014             rep_pdu,
1015             asn1Spec=None,
1016             native_encode=False,
1017             asn1_print=False,
1018             hexdump=False)
1019         pvno = k5_raw['field-0']
1020         self.assertEqual(pvno, 5)
1021         msg_type = k5_raw['field-1']
1022         self.assertIn(msg_type, [KRB_AS_REP, KRB_TGS_REP, KRB_ERROR])
1023         if msg_type == KRB_AS_REP:
1024             asn1Spec = krb5_asn1.AS_REP()
1025         elif msg_type == KRB_TGS_REP:
1026             asn1Spec = krb5_asn1.TGS_REP()
1027         elif msg_type == KRB_ERROR:
1028             asn1Spec = krb5_asn1.KRB_ERROR()
1029         rep = self.der_decode(rep_pdu, asn1Spec=asn1Spec,
1030                               asn1_print=asn1_print, hexdump=False)
1031         return (rep, rep_pdu)
1032
1033     def recv_pdu(self, asn1_print=None, hexdump=None, timeout=None):
1034         (rep, rep_pdu) = self.recv_reply(asn1_print=asn1_print,
1035                                          hexdump=hexdump,
1036                                          timeout=timeout)
1037         return rep
1038
1039     def assertIsConnected(self):
1040         self.assertIsNotNone(self.s, msg="Not connected")
1041
1042     def assertNotConnected(self):
1043         self.assertIsNone(self.s, msg="Is connected")
1044
1045     def send_recv_transaction(
1046             self,
1047             req,
1048             asn1_print=None,
1049             hexdump=None,
1050             timeout=None,
1051             to_rodc=False):
1052         host = self.host if to_rodc else self.dc_host
1053         self.connect(host)
1054         try:
1055             self.send_pdu(req, asn1_print=asn1_print, hexdump=hexdump)
1056             rep = self.recv_pdu(
1057                 asn1_print=asn1_print, hexdump=hexdump, timeout=timeout)
1058         except Exception:
1059             self._disconnect("transaction failed")
1060             raise
1061         self._disconnect("transaction done")
1062         return rep
1063
1064     def assertNoValue(self, value):
1065         self.assertTrue(value.isNoValue)
1066
1067     def assertHasValue(self, value):
1068         self.assertIsNotNone(value)
1069
1070     def getElementValue(self, obj, elem):
1071         return obj.get(elem)
1072
1073     def assertElementMissing(self, obj, elem):
1074         v = self.getElementValue(obj, elem)
1075         self.assertIsNone(v)
1076
1077     def assertElementPresent(self, obj, elem, expect_empty=False):
1078         v = self.getElementValue(obj, elem)
1079         self.assertIsNotNone(v)
1080         if self.strict_checking:
1081             if isinstance(v, collections.abc.Container):
1082                 if expect_empty:
1083                     self.assertEqual(0, len(v))
1084                 else:
1085                     self.assertNotEqual(0, len(v))
1086
1087     def assertElementEqual(self, obj, elem, value):
1088         v = self.getElementValue(obj, elem)
1089         self.assertIsNotNone(v)
1090         self.assertEqual(v, value)
1091
1092     def assertElementEqualUTF8(self, obj, elem, value):
1093         v = self.getElementValue(obj, elem)
1094         self.assertIsNotNone(v)
1095         self.assertEqual(v, bytes(value, 'utf8'))
1096
1097     def assertPrincipalEqual(self, princ1, princ2):
1098         self.assertEqual(princ1['name-type'], princ2['name-type'])
1099         self.assertEqual(
1100             len(princ1['name-string']),
1101             len(princ2['name-string']),
1102             msg="princ1=%s != princ2=%s" % (princ1, princ2))
1103         for idx in range(len(princ1['name-string'])):
1104             self.assertEqual(
1105                 princ1['name-string'][idx],
1106                 princ2['name-string'][idx],
1107                 msg="princ1=%s != princ2=%s" % (princ1, princ2))
1108
1109     def assertElementEqualPrincipal(self, obj, elem, value):
1110         v = self.getElementValue(obj, elem)
1111         self.assertIsNotNone(v)
1112         v = pyasn1_native_decode(v, asn1Spec=krb5_asn1.PrincipalName())
1113         self.assertPrincipalEqual(v, value)
1114
1115     def assertElementKVNO(self, obj, elem, value):
1116         v = self.getElementValue(obj, elem)
1117         if value == "autodetect":
1118             value = v
1119         if value is not None:
1120             self.assertIsNotNone(v)
1121             # The value on the wire should never be 0
1122             self.assertNotEqual(v, 0)
1123             # unspecified_kvno means we don't know the kvno,
1124             # but want to enforce its presence
1125             if value is not self.unspecified_kvno:
1126                 value = int(value)
1127                 self.assertNotEqual(value, 0)
1128                 self.assertEqual(v, value)
1129         else:
1130             self.assertIsNone(v)
1131
1132     def assertElementFlags(self, obj, elem, expected, unexpected):
1133         v = self.getElementValue(obj, elem)
1134         self.assertIsNotNone(v)
1135         if expected is not None:
1136             self.assertIsInstance(expected, krb5_asn1.TicketFlags)
1137             for i, flag in enumerate(expected):
1138                 if flag == 1:
1139                     self.assertEqual('1', v[i],
1140                                      f"'{expected.namedValues[i]}' "
1141                                      f"expected in {v}")
1142         if unexpected is not None:
1143             self.assertIsInstance(unexpected, krb5_asn1.TicketFlags)
1144             for i, flag in enumerate(unexpected):
1145                 if flag == 1:
1146                     self.assertEqual('0', v[i],
1147                                      f"'{unexpected.namedValues[i]}' "
1148                                      f"unexpected in {v}")
1149
1150     def assertSequenceElementsEqual(self, expected, got, *,
1151                                     require_strict=None,
1152                                     require_ordered=True):
1153         if self.strict_checking and require_ordered:
1154             self.assertEqual(expected, got)
1155         else:
1156             fail_msg = f'expected: {expected} got: {got}'
1157
1158             if not self.strict_checking and require_strict is not None:
1159                 fail_msg += f' (ignoring: {require_strict})'
1160                 expected = (x for x in expected if x not in require_strict)
1161                 got = (x for x in got if x not in require_strict)
1162
1163             self.assertCountEqual(expected, got, fail_msg)
1164
1165     def get_KerberosTimeWithUsec(self, epoch=None, offset=None):
1166         if epoch is None:
1167             epoch = time.time()
1168         if offset is not None:
1169             epoch = epoch + int(offset)
1170         dt = datetime.datetime.fromtimestamp(epoch, tz=datetime.timezone.utc)
1171         return (dt.strftime("%Y%m%d%H%M%SZ"), dt.microsecond)
1172
1173     def get_KerberosTime(self, epoch=None, offset=None):
1174         (s, _) = self.get_KerberosTimeWithUsec(epoch=epoch, offset=offset)
1175         return s
1176
1177     def get_EpochFromKerberosTime(self, kerberos_time):
1178         if isinstance(kerberos_time, bytes):
1179             kerberos_time = kerberos_time.decode()
1180
1181         epoch = datetime.datetime.strptime(kerberos_time,
1182                                            '%Y%m%d%H%M%SZ')
1183         epoch = epoch.replace(tzinfo=datetime.timezone.utc)
1184         epoch = int(epoch.timestamp())
1185
1186         return epoch
1187
1188     def get_Nonce(self):
1189         nonce_min = 0x7f000000
1190         nonce_max = 0x7fffffff
1191         v = random.randint(nonce_min, nonce_max)
1192         return v
1193
1194     def get_pa_dict(self, pa_data):
1195         pa_dict = {}
1196
1197         if pa_data is not None:
1198             for pa in pa_data:
1199                 pa_type = pa['padata-type']
1200                 if pa_type in pa_dict:
1201                     raise RuntimeError(f'Duplicate type {pa_type}')
1202                 pa_dict[pa_type] = pa['padata-value']
1203
1204         return pa_dict
1205
1206     def SessionKey_create(self, etype, contents, kvno=None):
1207         key = kcrypto.Key(etype, contents)
1208         return RodcPacEncryptionKey(key, kvno)
1209
1210     def PasswordKey_create(self, etype=None, pwd=None, salt=None, kvno=None,
1211                            params=None):
1212         self.assertIsNotNone(pwd)
1213         self.assertIsNotNone(salt)
1214         key = kcrypto.string_to_key(etype, pwd, salt, params=params)
1215         return RodcPacEncryptionKey(key, kvno)
1216
1217     def PasswordKey_from_etype_info2(self, creds, etype_info2, kvno=None):
1218         e = etype_info2['etype']
1219
1220         salt = etype_info2.get('salt')
1221
1222         if e == kcrypto.Enctype.RC4:
1223             nthash = creds.get_nt_hash()
1224             return self.SessionKey_create(etype=e, contents=nthash, kvno=kvno)
1225
1226         params = etype_info2.get('s2kparams')
1227
1228         password = creds.get_password()
1229         return self.PasswordKey_create(
1230             etype=e, pwd=password, salt=salt, kvno=kvno, params=params)
1231
1232     def TicketDecryptionKey_from_creds(self, creds, etype=None):
1233
1234         if etype is None:
1235             etypes = creds.get_tgs_krb5_etypes()
1236             if etypes:
1237                 etype = etypes[0]
1238             else:
1239                 etype = kcrypto.Enctype.RC4
1240
1241         forced_key = creds.get_forced_key(etype)
1242         if forced_key is not None:
1243             return forced_key
1244
1245         kvno = creds.get_kvno()
1246
1247         fail_msg = ("%s has no fixed key for etype[%s] kvno[%s] "
1248                     "nor a password specified, " % (
1249                         creds.get_username(), etype, kvno))
1250
1251         if etype == kcrypto.Enctype.RC4:
1252             nthash = creds.get_nt_hash()
1253             self.assertIsNotNone(nthash, msg=fail_msg)
1254             return self.SessionKey_create(etype=etype,
1255                                           contents=nthash,
1256                                           kvno=kvno)
1257
1258         password = creds.get_password()
1259         self.assertIsNotNone(password, msg=fail_msg)
1260         salt = creds.get_salt()
1261         return self.PasswordKey_create(etype=etype,
1262                                        pwd=password,
1263                                        salt=salt,
1264                                        kvno=kvno)
1265
1266     def RandomKey(self, etype):
1267         e = kcrypto._get_enctype_profile(etype)
1268         contents = samba.generate_random_bytes(e.keysize)
1269         return self.SessionKey_create(etype=etype, contents=contents)
1270
1271     def EncryptionKey_import(self, EncryptionKey_obj):
1272         return self.SessionKey_create(EncryptionKey_obj['keytype'],
1273                                       EncryptionKey_obj['keyvalue'])
1274
1275     def EncryptedData_create(self, key, usage, plaintext):
1276         # EncryptedData   ::= SEQUENCE {
1277         #         etype   [0] Int32 -- EncryptionType --,
1278         #         kvno    [1] Int32 OPTIONAL,
1279         #         cipher  [2] OCTET STRING -- ciphertext
1280         # }
1281         ciphertext = key.encrypt(usage, plaintext)
1282         EncryptedData_obj = {
1283             'etype': key.etype,
1284             'cipher': ciphertext
1285         }
1286         if key.kvno is not None:
1287             EncryptedData_obj['kvno'] = key.kvno
1288         return EncryptedData_obj
1289
1290     def Checksum_create(self, key, usage, plaintext, ctype=None):
1291         # Checksum        ::= SEQUENCE {
1292         #        cksumtype       [0] Int32,
1293         #        checksum        [1] OCTET STRING
1294         # }
1295         if ctype is None:
1296             ctype = key.ctype
1297         checksum = key.make_checksum(usage, plaintext, ctype=ctype)
1298         Checksum_obj = {
1299             'cksumtype': ctype,
1300             'checksum': checksum,
1301         }
1302         return Checksum_obj
1303
1304     @classmethod
1305     def PrincipalName_create(cls, name_type, names):
1306         # PrincipalName   ::= SEQUENCE {
1307         #         name-type       [0] Int32,
1308         #         name-string     [1] SEQUENCE OF KerberosString
1309         # }
1310         PrincipalName_obj = {
1311             'name-type': name_type,
1312             'name-string': names,
1313         }
1314         return PrincipalName_obj
1315
1316     def AuthorizationData_create(self, ad_type, ad_data):
1317         # AuthorizationData ::= SEQUENCE {
1318         #         ad-type         [0] Int32,
1319         #         ad-data         [1] OCTET STRING
1320         # }
1321         AUTH_DATA_obj = {
1322             'ad-type': ad_type,
1323             'ad-data': ad_data
1324         }
1325         return AUTH_DATA_obj
1326
1327     def PA_DATA_create(self, padata_type, padata_value):
1328         # PA-DATA         ::= SEQUENCE {
1329         #         -- NOTE: first tag is [1], not [0]
1330         #         padata-type     [1] Int32,
1331         #         padata-value    [2] OCTET STRING -- might be encoded AP-REQ
1332         # }
1333         PA_DATA_obj = {
1334             'padata-type': padata_type,
1335             'padata-value': padata_value,
1336         }
1337         return PA_DATA_obj
1338
1339     def PA_ENC_TS_ENC_create(self, ts, usec):
1340         # PA-ENC-TS-ENC ::= SEQUENCE {
1341         #        patimestamp[0]          KerberosTime, -- client's time
1342         #        pausec[1]               krb5int32 OPTIONAL
1343         # }
1344         PA_ENC_TS_ENC_obj = {
1345             'patimestamp': ts,
1346             'pausec': usec,
1347         }
1348         return PA_ENC_TS_ENC_obj
1349
1350     def PA_PAC_OPTIONS_create(self, options):
1351         # PA-PAC-OPTIONS  ::= SEQUENCE {
1352         #         options         [0] PACOptionFlags
1353         # }
1354         PA_PAC_OPTIONS_obj = {
1355             'options': options
1356         }
1357         return PA_PAC_OPTIONS_obj
1358
1359     def KRB_FAST_ARMOR_create(self, armor_type, armor_value):
1360         # KrbFastArmor    ::= SEQUENCE {
1361         #         armor-type      [0] Int32,
1362         #         armor-value     [1] OCTET STRING,
1363         #         ...
1364         # }
1365         KRB_FAST_ARMOR_obj = {
1366             'armor-type': armor_type,
1367             'armor-value': armor_value
1368         }
1369         return KRB_FAST_ARMOR_obj
1370
1371     def KRB_FAST_REQ_create(self, fast_options, padata, req_body):
1372         # KrbFastReq      ::= SEQUENCE {
1373         #         fast-options    [0] FastOptions,
1374         #         padata          [1] SEQUENCE OF PA-DATA,
1375         #         req-body        [2] KDC-REQ-BODY,
1376         #         ...
1377         # }
1378         KRB_FAST_REQ_obj = {
1379             'fast-options': fast_options,
1380             'padata': padata,
1381             'req-body': req_body
1382         }
1383         return KRB_FAST_REQ_obj
1384
1385     def KRB_FAST_ARMORED_REQ_create(self, armor, req_checksum, enc_fast_req):
1386         # KrbFastArmoredReq ::= SEQUENCE {
1387         #         armor           [0] KrbFastArmor OPTIONAL,
1388         #         req-checksum    [1] Checksum,
1389         #         enc-fast-req    [2] EncryptedData -- KrbFastReq --
1390         # }
1391         KRB_FAST_ARMORED_REQ_obj = {
1392             'req-checksum': req_checksum,
1393             'enc-fast-req': enc_fast_req
1394         }
1395         if armor is not None:
1396             KRB_FAST_ARMORED_REQ_obj['armor'] = armor
1397         return KRB_FAST_ARMORED_REQ_obj
1398
1399     def PA_FX_FAST_REQUEST_create(self, armored_data):
1400         # PA-FX-FAST-REQUEST ::= CHOICE {
1401         #         armored-data    [0] KrbFastArmoredReq,
1402         #         ...
1403         # }
1404         PA_FX_FAST_REQUEST_obj = {
1405             'armored-data': armored_data
1406         }
1407         return PA_FX_FAST_REQUEST_obj
1408
1409     def KERB_PA_PAC_REQUEST_create(self, include_pac, pa_data_create=True):
1410         # KERB-PA-PAC-REQUEST ::= SEQUENCE {
1411         #         include-pac[0] BOOLEAN --If TRUE, and no pac present,
1412         #                                --    include PAC.
1413         #                                --If FALSE, and PAC present,
1414         #                                --    remove PAC.
1415         # }
1416         KERB_PA_PAC_REQUEST_obj = {
1417             'include-pac': include_pac,
1418         }
1419         if not pa_data_create:
1420             return KERB_PA_PAC_REQUEST_obj
1421         pa_pac = self.der_encode(KERB_PA_PAC_REQUEST_obj,
1422                                  asn1Spec=krb5_asn1.KERB_PA_PAC_REQUEST())
1423         pa_data = self.PA_DATA_create(PADATA_PAC_REQUEST, pa_pac)
1424         return pa_data
1425
1426     def get_pa_pac_options(self, options):
1427         pac_options = self.PA_PAC_OPTIONS_create(options)
1428         pac_options = self.der_encode(pac_options,
1429                                       asn1Spec=krb5_asn1.PA_PAC_OPTIONS())
1430         pac_options = self.PA_DATA_create(PADATA_PAC_OPTIONS, pac_options)
1431
1432         return pac_options
1433
1434     def KDC_REQ_BODY_create(self,
1435                             kdc_options,
1436                             cname,
1437                             realm,
1438                             sname,
1439                             from_time,
1440                             till_time,
1441                             renew_time,
1442                             nonce,
1443                             etypes,
1444                             addresses,
1445                             additional_tickets,
1446                             EncAuthorizationData,
1447                             EncAuthorizationData_key,
1448                             EncAuthorizationData_usage,
1449                             asn1_print=None,
1450                             hexdump=None):
1451         # KDC-REQ-BODY    ::= SEQUENCE {
1452         #        kdc-options             [0] KDCOptions,
1453         #        cname                   [1] PrincipalName OPTIONAL
1454         #                                    -- Used only in AS-REQ --,
1455         #        realm                   [2] Realm
1456         #                                    -- Server's realm
1457         #                                    -- Also client's in AS-REQ --,
1458         #        sname                   [3] PrincipalName OPTIONAL,
1459         #        from                    [4] KerberosTime OPTIONAL,
1460         #        till                    [5] KerberosTime,
1461         #        rtime                   [6] KerberosTime OPTIONAL,
1462         #        nonce                   [7] UInt32,
1463         #        etype                   [8] SEQUENCE OF Int32
1464         #                                    -- EncryptionType
1465         #                                    -- in preference order --,
1466         #        addresses               [9] HostAddresses OPTIONAL,
1467         #        enc-authorization-data  [10] EncryptedData OPTIONAL
1468         #                                    -- AuthorizationData --,
1469         #        additional-tickets      [11] SEQUENCE OF Ticket OPTIONAL
1470         #                                        -- NOTE: not empty
1471         # }
1472         if EncAuthorizationData is not None:
1473             enc_ad_plain = self.der_encode(
1474                 EncAuthorizationData,
1475                 asn1Spec=krb5_asn1.AuthorizationData(),
1476                 asn1_print=asn1_print,
1477                 hexdump=hexdump)
1478             enc_ad = self.EncryptedData_create(EncAuthorizationData_key,
1479                                                EncAuthorizationData_usage,
1480                                                enc_ad_plain)
1481         else:
1482             enc_ad = None
1483         KDC_REQ_BODY_obj = {
1484             'kdc-options': kdc_options,
1485             'realm': realm,
1486             'till': till_time,
1487             'nonce': nonce,
1488             'etype': etypes,
1489         }
1490         if cname is not None:
1491             KDC_REQ_BODY_obj['cname'] = cname
1492         if sname is not None:
1493             KDC_REQ_BODY_obj['sname'] = sname
1494         if from_time is not None:
1495             KDC_REQ_BODY_obj['from'] = from_time
1496         if renew_time is not None:
1497             KDC_REQ_BODY_obj['rtime'] = renew_time
1498         if addresses is not None:
1499             KDC_REQ_BODY_obj['addresses'] = addresses
1500         if enc_ad is not None:
1501             KDC_REQ_BODY_obj['enc-authorization-data'] = enc_ad
1502         if additional_tickets is not None:
1503             KDC_REQ_BODY_obj['additional-tickets'] = additional_tickets
1504         return KDC_REQ_BODY_obj
1505
1506     def KDC_REQ_create(self,
1507                        msg_type,
1508                        padata,
1509                        req_body,
1510                        asn1Spec=None,
1511                        asn1_print=None,
1512                        hexdump=None):
1513         # KDC-REQ         ::= SEQUENCE {
1514         #        -- NOTE: first tag is [1], not [0]
1515         #        pvno            [1] INTEGER (5) ,
1516         #        msg-type        [2] INTEGER (10 -- AS -- | 12 -- TGS --),
1517         #        padata          [3] SEQUENCE OF PA-DATA OPTIONAL
1518         #                            -- NOTE: not empty --,
1519         #        req-body        [4] KDC-REQ-BODY
1520         # }
1521         #
1522         KDC_REQ_obj = {
1523             'pvno': 5,
1524             'msg-type': msg_type,
1525             'req-body': req_body,
1526         }
1527         if padata is not None:
1528             KDC_REQ_obj['padata'] = padata
1529         if asn1Spec is not None:
1530             KDC_REQ_decoded = pyasn1_native_decode(
1531                 KDC_REQ_obj, asn1Spec=asn1Spec)
1532         else:
1533             KDC_REQ_decoded = None
1534         return KDC_REQ_obj, KDC_REQ_decoded
1535
1536     def AS_REQ_create(self,
1537                       padata,       # optional
1538                       kdc_options,  # required
1539                       cname,        # optional
1540                       realm,        # required
1541                       sname,        # optional
1542                       from_time,    # optional
1543                       till_time,    # required
1544                       renew_time,   # optional
1545                       nonce,        # required
1546                       etypes,       # required
1547                       addresses,    # optional
1548                       additional_tickets,
1549                       native_decoded_only=True,
1550                       asn1_print=None,
1551                       hexdump=None):
1552         # KDC-REQ         ::= SEQUENCE {
1553         #        -- NOTE: first tag is [1], not [0]
1554         #        pvno            [1] INTEGER (5) ,
1555         #        msg-type        [2] INTEGER (10 -- AS -- | 12 -- TGS --),
1556         #        padata          [3] SEQUENCE OF PA-DATA OPTIONAL
1557         #                            -- NOTE: not empty --,
1558         #        req-body        [4] KDC-REQ-BODY
1559         # }
1560         #
1561         # KDC-REQ-BODY    ::= SEQUENCE {
1562         #        kdc-options             [0] KDCOptions,
1563         #        cname                   [1] PrincipalName OPTIONAL
1564         #                                    -- Used only in AS-REQ --,
1565         #        realm                   [2] Realm
1566         #                                    -- Server's realm
1567         #                                    -- Also client's in AS-REQ --,
1568         #        sname                   [3] PrincipalName OPTIONAL,
1569         #        from                    [4] KerberosTime OPTIONAL,
1570         #        till                    [5] KerberosTime,
1571         #        rtime                   [6] KerberosTime OPTIONAL,
1572         #        nonce                   [7] UInt32,
1573         #        etype                   [8] SEQUENCE OF Int32
1574         #                                    -- EncryptionType
1575         #                                    -- in preference order --,
1576         #        addresses               [9] HostAddresses OPTIONAL,
1577         #        enc-authorization-data  [10] EncryptedData OPTIONAL
1578         #                                    -- AuthorizationData --,
1579         #        additional-tickets      [11] SEQUENCE OF Ticket OPTIONAL
1580         #                                        -- NOTE: not empty
1581         # }
1582         KDC_REQ_BODY_obj = self.KDC_REQ_BODY_create(
1583             kdc_options,
1584             cname,
1585             realm,
1586             sname,
1587             from_time,
1588             till_time,
1589             renew_time,
1590             nonce,
1591             etypes,
1592             addresses,
1593             additional_tickets,
1594             EncAuthorizationData=None,
1595             EncAuthorizationData_key=None,
1596             EncAuthorizationData_usage=None,
1597             asn1_print=asn1_print,
1598             hexdump=hexdump)
1599         obj, decoded = self.KDC_REQ_create(
1600             msg_type=KRB_AS_REQ,
1601             padata=padata,
1602             req_body=KDC_REQ_BODY_obj,
1603             asn1Spec=krb5_asn1.AS_REQ(),
1604             asn1_print=asn1_print,
1605             hexdump=hexdump)
1606         if native_decoded_only:
1607             return decoded
1608         return decoded, obj
1609
1610     def AP_REQ_create(self, ap_options, ticket, authenticator):
1611         # AP-REQ          ::= [APPLICATION 14] SEQUENCE {
1612         #        pvno            [0] INTEGER (5),
1613         #        msg-type        [1] INTEGER (14),
1614         #        ap-options      [2] APOptions,
1615         #        ticket          [3] Ticket,
1616         #        authenticator   [4] EncryptedData -- Authenticator
1617         # }
1618         AP_REQ_obj = {
1619             'pvno': 5,
1620             'msg-type': KRB_AP_REQ,
1621             'ap-options': ap_options,
1622             'ticket': ticket,
1623             'authenticator': authenticator,
1624         }
1625         return AP_REQ_obj
1626
1627     def Authenticator_create(
1628             self, crealm, cname, cksum, cusec, ctime, subkey, seq_number,
1629             authorization_data):
1630         # -- Unencrypted authenticator
1631         # Authenticator   ::= [APPLICATION 2] SEQUENCE  {
1632         #        authenticator-vno       [0] INTEGER (5),
1633         #        crealm                  [1] Realm,
1634         #        cname                   [2] PrincipalName,
1635         #        cksum                   [3] Checksum OPTIONAL,
1636         #        cusec                   [4] Microseconds,
1637         #        ctime                   [5] KerberosTime,
1638         #        subkey                  [6] EncryptionKey OPTIONAL,
1639         #        seq-number              [7] UInt32 OPTIONAL,
1640         #        authorization-data      [8] AuthorizationData OPTIONAL
1641         # }
1642         Authenticator_obj = {
1643             'authenticator-vno': 5,
1644             'crealm': crealm,
1645             'cname': cname,
1646             'cusec': cusec,
1647             'ctime': ctime,
1648         }
1649         if cksum is not None:
1650             Authenticator_obj['cksum'] = cksum
1651         if subkey is not None:
1652             Authenticator_obj['subkey'] = subkey
1653         if seq_number is not None:
1654             Authenticator_obj['seq-number'] = seq_number
1655         if authorization_data is not None:
1656             Authenticator_obj['authorization-data'] = authorization_data
1657         return Authenticator_obj
1658
1659     def TGS_REQ_create(self,
1660                        padata,       # optional
1661                        cusec,
1662                        ctime,
1663                        ticket,
1664                        kdc_options,  # required
1665                        cname,        # optional
1666                        realm,        # required
1667                        sname,        # optional
1668                        from_time,    # optional
1669                        till_time,    # required
1670                        renew_time,   # optional
1671                        nonce,        # required
1672                        etypes,       # required
1673                        addresses,    # optional
1674                        EncAuthorizationData,
1675                        EncAuthorizationData_key,
1676                        additional_tickets,
1677                        ticket_session_key,
1678                        authenticator_subkey=None,
1679                        body_checksum_type=None,
1680                        native_decoded_only=True,
1681                        asn1_print=None,
1682                        hexdump=None):
1683         # KDC-REQ         ::= SEQUENCE {
1684         #        -- NOTE: first tag is [1], not [0]
1685         #        pvno            [1] INTEGER (5) ,
1686         #        msg-type        [2] INTEGER (10 -- AS -- | 12 -- TGS --),
1687         #        padata          [3] SEQUENCE OF PA-DATA OPTIONAL
1688         #                            -- NOTE: not empty --,
1689         #        req-body        [4] KDC-REQ-BODY
1690         # }
1691         #
1692         # KDC-REQ-BODY    ::= SEQUENCE {
1693         #        kdc-options             [0] KDCOptions,
1694         #        cname                   [1] PrincipalName OPTIONAL
1695         #                                    -- Used only in AS-REQ --,
1696         #        realm                   [2] Realm
1697         #                                    -- Server's realm
1698         #                                    -- Also client's in AS-REQ --,
1699         #        sname                   [3] PrincipalName OPTIONAL,
1700         #        from                    [4] KerberosTime OPTIONAL,
1701         #        till                    [5] KerberosTime,
1702         #        rtime                   [6] KerberosTime OPTIONAL,
1703         #        nonce                   [7] UInt32,
1704         #        etype                   [8] SEQUENCE OF Int32
1705         #                                    -- EncryptionType
1706         #                                    -- in preference order --,
1707         #        addresses               [9] HostAddresses OPTIONAL,
1708         #        enc-authorization-data  [10] EncryptedData OPTIONAL
1709         #                                    -- AuthorizationData --,
1710         #        additional-tickets      [11] SEQUENCE OF Ticket OPTIONAL
1711         #                                        -- NOTE: not empty
1712         # }
1713
1714         if authenticator_subkey is not None:
1715             EncAuthorizationData_usage = KU_TGS_REQ_AUTH_DAT_SUBKEY
1716         else:
1717             EncAuthorizationData_usage = KU_TGS_REQ_AUTH_DAT_SESSION
1718
1719         req_body = self.KDC_REQ_BODY_create(
1720             kdc_options=kdc_options,
1721             cname=None,
1722             realm=realm,
1723             sname=sname,
1724             from_time=from_time,
1725             till_time=till_time,
1726             renew_time=renew_time,
1727             nonce=nonce,
1728             etypes=etypes,
1729             addresses=addresses,
1730             additional_tickets=additional_tickets,
1731             EncAuthorizationData=EncAuthorizationData,
1732             EncAuthorizationData_key=EncAuthorizationData_key,
1733             EncAuthorizationData_usage=EncAuthorizationData_usage)
1734         req_body_blob = self.der_encode(req_body,
1735                                         asn1Spec=krb5_asn1.KDC_REQ_BODY(),
1736                                         asn1_print=asn1_print, hexdump=hexdump)
1737
1738         req_body_checksum = self.Checksum_create(ticket_session_key,
1739                                                  KU_TGS_REQ_AUTH_CKSUM,
1740                                                  req_body_blob,
1741                                                  ctype=body_checksum_type)
1742
1743         subkey_obj = None
1744         if authenticator_subkey is not None:
1745             subkey_obj = authenticator_subkey.export_obj()
1746         seq_number = random.randint(0, 0xfffffffe)
1747         authenticator = self.Authenticator_create(
1748             crealm=realm,
1749             cname=cname,
1750             cksum=req_body_checksum,
1751             cusec=cusec,
1752             ctime=ctime,
1753             subkey=subkey_obj,
1754             seq_number=seq_number,
1755             authorization_data=None)
1756         authenticator = self.der_encode(
1757             authenticator,
1758             asn1Spec=krb5_asn1.Authenticator(),
1759             asn1_print=asn1_print,
1760             hexdump=hexdump)
1761
1762         authenticator = self.EncryptedData_create(
1763             ticket_session_key, KU_TGS_REQ_AUTH, authenticator)
1764
1765         ap_options = krb5_asn1.APOptions('0')
1766         ap_req = self.AP_REQ_create(ap_options=str(ap_options),
1767                                     ticket=ticket,
1768                                     authenticator=authenticator)
1769         ap_req = self.der_encode(ap_req, asn1Spec=krb5_asn1.AP_REQ(),
1770                                  asn1_print=asn1_print, hexdump=hexdump)
1771         pa_tgs_req = self.PA_DATA_create(PADATA_KDC_REQ, ap_req)
1772         if padata is not None:
1773             padata.append(pa_tgs_req)
1774         else:
1775             padata = [pa_tgs_req]
1776
1777         obj, decoded = self.KDC_REQ_create(
1778             msg_type=KRB_TGS_REQ,
1779             padata=padata,
1780             req_body=req_body,
1781             asn1Spec=krb5_asn1.TGS_REQ(),
1782             asn1_print=asn1_print,
1783             hexdump=hexdump)
1784         if native_decoded_only:
1785             return decoded
1786         return decoded, obj
1787
1788     def PA_S4U2Self_create(self, name, realm, tgt_session_key, ctype=None):
1789         # PA-S4U2Self     ::= SEQUENCE {
1790         #        name            [0] PrincipalName,
1791         #        realm           [1] Realm,
1792         #        cksum           [2] Checksum,
1793         #        auth            [3] GeneralString
1794         # }
1795         cksum_data = name['name-type'].to_bytes(4, byteorder='little')
1796         for n in name['name-string']:
1797             cksum_data += n.encode()
1798         cksum_data += realm.encode()
1799         cksum_data += "Kerberos".encode()
1800         cksum = self.Checksum_create(tgt_session_key,
1801                                      KU_NON_KERB_CKSUM_SALT,
1802                                      cksum_data,
1803                                      ctype)
1804
1805         PA_S4U2Self_obj = {
1806             'name': name,
1807             'realm': realm,
1808             'cksum': cksum,
1809             'auth': "Kerberos",
1810         }
1811         pa_s4u2self = self.der_encode(
1812             PA_S4U2Self_obj, asn1Spec=krb5_asn1.PA_S4U2Self())
1813         return self.PA_DATA_create(PADATA_FOR_USER, pa_s4u2self)
1814
1815     def ChangePasswdDataMS_create(self,
1816                                   new_password,
1817                                   target_princ=None,
1818                                   target_realm=None):
1819         ChangePasswdDataMS_obj = {
1820             'newpasswd': new_password,
1821         }
1822         if target_princ is not None:
1823             ChangePasswdDataMS_obj['targname'] = target_princ
1824         if target_realm is not None:
1825             ChangePasswdDataMS_obj['targrealm'] = target_realm
1826
1827         change_password_data = self.der_encode(
1828             ChangePasswdDataMS_obj, asn1Spec=krb5_asn1.ChangePasswdDataMS())
1829
1830         return change_password_data
1831
1832     def KRB_PRIV_create(self,
1833                         subkey,
1834                         user_data,
1835                         s_address,
1836                         timestamp=None,
1837                         usec=None,
1838                         seq_number=None,
1839                         r_address=None):
1840         EncKrbPrivPart_obj = {
1841             'user-data': user_data,
1842             's-address': s_address,
1843         }
1844         if timestamp is not None:
1845             EncKrbPrivPart_obj['timestamp'] = timestamp
1846         if usec is not None:
1847             EncKrbPrivPart_obj['usec'] = usec
1848         if seq_number is not None:
1849             EncKrbPrivPart_obj['seq-number'] = seq_number
1850         if r_address is not None:
1851             EncKrbPrivPart_obj['r-address'] = r_address
1852
1853         enc_krb_priv_part = self.der_encode(
1854             EncKrbPrivPart_obj, asn1Spec=krb5_asn1.EncKrbPrivPart())
1855
1856         enc_data = self.EncryptedData_create(subkey,
1857                                              KU_KRB_PRIV,
1858                                              enc_krb_priv_part)
1859
1860         KRB_PRIV_obj = {
1861             'pvno': 5,
1862             'msg-type': KRB_PRIV,
1863             'enc-part': enc_data,
1864         }
1865
1866         krb_priv = self.der_encode(
1867             KRB_PRIV_obj, asn1Spec=krb5_asn1.KRB_PRIV())
1868
1869         return krb_priv
1870
1871     def kpasswd_create(self,
1872                        subkey,
1873                        user_data,
1874                        version,
1875                        seq_number,
1876                        ap_req,
1877                        local_address,
1878                        remote_address):
1879         self.assertIsNotNone(self.s, 'call self.connect() first')
1880
1881         timestamp, usec = self.get_KerberosTimeWithUsec()
1882
1883         krb_priv = self.KRB_PRIV_create(subkey,
1884                                         user_data,
1885                                         s_address=local_address,
1886                                         timestamp=timestamp,
1887                                         usec=usec,
1888                                         seq_number=seq_number,
1889                                         r_address=remote_address)
1890
1891         size = 6 + len(ap_req) + len(krb_priv)
1892         self.assertLess(size, 0x10000)
1893
1894         msg = bytearray()
1895         msg.append(size >> 8)
1896         msg.append(size & 0xff)
1897         msg.append(version >> 8)
1898         msg.append(version & 0xff)
1899         msg.append(len(ap_req) >> 8)
1900         msg.append(len(ap_req) & 0xff)
1901         # Note: for sets, there could be a little-endian four-byte length here.
1902
1903         msg.extend(ap_req)
1904         msg.extend(krb_priv)
1905
1906         return msg
1907
1908     def get_enc_part(self, obj, key, usage):
1909         self.assertElementEqual(obj, 'pvno', 5)
1910
1911         enc_part = obj['enc-part']
1912         self.assertElementEqual(enc_part, 'etype', key.etype)
1913         self.assertElementKVNO(enc_part, 'kvno', key.kvno)
1914
1915         enc_part = key.decrypt(usage, enc_part['cipher'])
1916
1917         return enc_part
1918
    def kpasswd_exchange(self,
                         ticket,
                         new_password,
                         expected_code,
                         expected_msg,
                         mode,
                         target_princ=None,
                         target_realm=None,
                         ap_options=None,
                         send_seq_number=True):
        """Perform a kpasswd request against port 464 and check the reply.

        ticket: ticket used to build the AP-REQ; its session_key is also used
            to decrypt the AP-REP in the reply.
        new_password: the new password. In SET mode it is wrapped in a
            ChangePasswdDataMS structure; in CHANGE mode it is sent UTF-8
            encoded.
        expected_code: the expected result code, either a single int or a
            container of acceptable ints. It is compared with the status
            code at the start of the result data and, if the reply is a
            KRB-ERROR, with its error-code as well.
        expected_msg: the expected result message, either a bytes object or
            a container of acceptable values. Only checked when the message
            is non-empty and does not start with a zero byte.
        mode: self.KpasswdMode.SET or self.KpasswdMode.CHANGE; anything else
            fails the test.
        target_princ, target_realm: optional target for SET mode; both must
            be None in CHANGE mode.
        ap_options: optional APOptions value, defaulting to '0'.
        send_seq_number: whether to generate a random sequence number; if
            false, None is passed on as the sequence number.

        Returns None; all checks are made through test assertions.
        """
        # Select the protocol version and request payload for this mode.
        if mode is self.KpasswdMode.SET:
            version = 0xff80
            user_data = self.ChangePasswdDataMS_create(new_password,
                                                       target_princ,
                                                       target_realm)
        elif mode is self.KpasswdMode.CHANGE:
            self.assertIsNone(target_princ,
                              'target_princ only valid for pw set')
            self.assertIsNone(target_realm,
                              'target_realm only valid for pw set')

            version = 1
            user_data = new_password.encode('utf-8')
        else:
            self.fail(f'invalid mode {mode}')

        # A fresh random subkey is sent in the authenticator; it is used for
        # the request's KRB-PRIV and to decrypt the KRB-PRIV in the reply.
        subkey = self.RandomKey(kcrypto.Enctype.AES256)

        if ap_options is None:
            ap_options = '0'
        ap_options = str(krb5_asn1.APOptions(ap_options))

        kdc_exchange_dict = {
            'tgt': ticket,
            'authenticator_subkey': subkey,
            'auth_data': None,
            'ap_options': ap_options,
        }

        # The same sequence number (or None) is used for both the AP-REQ and
        # the KRB-PRIV.
        if send_seq_number:
            seq_number = random.randint(0, 0xfffffffe)
        else:
            seq_number = None

        ap_req = self.generate_ap_req(kdc_exchange_dict,
                                      None,
                                      req_body=None,
                                      armor=False,
                                      usage=KU_AP_REQ_AUTH,
                                      seq_number=seq_number)

        self.connect(self.host, port=464)
        self.assertIsNotNone(self.s)

        family = self.s.family

        # Address type to put in the HostAddress, chosen from the socket's
        # address family.
        if family == socket.AF_INET:
            addr_type = 2  # IPv4
        elif family == socket.AF_INET6:
            addr_type = 24  # IPv6
        else:
            self.fail(f'unknown family {family}')

        def create_address(ip):
            # Build a HostAddress dict from a textual IP address.
            return {
                'addr-type': addr_type,
                'address': socket.inet_pton(family, ip),
            }

        local_ip = self.s.getsockname()[0]
        local_address = create_address(local_ip)

        # remote_ip = self.s.getpeername()[0]
        # remote_address = create_address(remote_ip)

        # TODO: due to a bug (?), MIT Kerberos will not accept the request
        # unless r-address is set to our _local_ address. Heimdal, on the other
        # hand, requires the r-address is set to the remote address (as
        # expected). To avoid problems, avoid sending r-address for now.
        remote_address = None

        msg = self.kpasswd_create(subkey,
                                  user_data,
                                  version,
                                  seq_number,
                                  ap_req,
                                  local_address,
                                  remote_address)

        self.send_msg(msg)
        rep_pdu = self.recv_pdu_raw()

        self._disconnect('transaction done')

        self.assertIsNotNone(rep_pdu)

        # The reply starts with a six-byte header of three big-endian 16-bit
        # values: total length, protocol version and AP-REP length.
        header = rep_pdu[:6]
        reply = rep_pdu[6:]

        reply_len = (header[0] << 8) | header[1]
        reply_version = (header[2] << 8) | header[3]
        ap_rep_len = (header[4] << 8) | header[5]

        self.assertEqual(reply_len, len(rep_pdu))
        self.assertEqual(1, reply_version)  # KRB5_KPASSWD_VERS_CHANGEPW
        self.assertLess(ap_rep_len, reply_len)

        self.assertNotEqual(0x7e, rep_pdu[1])
        self.assertNotEqual(0x5e, rep_pdu[1])

        if ap_rep_len:
            # We received an AP-REP and KRB-PRIV as a response. This may or may
            # not indicate an error, depending on the status code.
            ap_rep = reply[:ap_rep_len]
            krb_priv = reply[ap_rep_len:]

            # The AP-REP is decrypted with the ticket's session key.
            key = ticket.session_key

            ap_rep = self.der_decode(ap_rep, asn1Spec=krb5_asn1.AP_REP())
            self.assertElementEqual(ap_rep, 'msg-type', KRB_AP_REP)
            enc_part = self.get_enc_part(ap_rep, key, KU_AP_REQ_ENC_PART)
            enc_part = self.der_decode(
                enc_part, asn1Spec=krb5_asn1.EncAPRepPart())

            self.assertElementPresent(enc_part, 'ctime')
            self.assertElementPresent(enc_part, 'cusec')
            # self.assertElementMissing(enc_part, 'subkey') # TODO
            # self.assertElementPresent(enc_part, 'seq-number') # TODO

            # A reply that cannot be decoded as a KRB-PRIV fails the test.
            try:
                krb_priv = self.der_decode(krb_priv, asn1Spec=krb5_asn1.KRB_PRIV())
            except PyAsn1Error:
                self.fail()

            # The KRB-PRIV is decrypted with the subkey we sent.
            self.assertElementEqual(krb_priv, 'msg-type', KRB_PRIV)
            priv_enc_part = self.get_enc_part(krb_priv, subkey, KU_KRB_PRIV)
            priv_enc_part = self.der_decode(
                priv_enc_part, asn1Spec=krb5_asn1.EncKrbPrivPart())

            self.assertElementMissing(priv_enc_part, 'timestamp')
            self.assertElementMissing(priv_enc_part, 'usec')
            # self.assertElementPresent(priv_enc_part, 'seq-number') # TODO
            # self.assertElementEqual(priv_enc_part, 's-address', remote_address) # TODO
            # self.assertElementMissing(priv_enc_part, 'r-address') # TODO

            result_data = priv_enc_part['user-data']
        else:
            # We received a KRB-ERROR as a response, indicating an error.
            krb_error = self.der_decode(reply, asn1Spec=krb5_asn1.KRB_ERROR())

            # The error is expected to name kadmin/changepw in the krbtgt
            # credentials' realm (upper-cased).
            sname = self.PrincipalName_create(
                name_type=NT_PRINCIPAL,
                names=['kadmin', 'changepw'])
            realm = self.get_krbtgt_creds().get_realm().upper()

            self.assertElementEqual(krb_error, 'pvno', 5)
            self.assertElementEqual(krb_error, 'msg-type', KRB_ERROR)
            self.assertElementMissing(krb_error, 'ctime')
            self.assertElementMissing(krb_error, 'usec')
            self.assertElementPresent(krb_error, 'stime')
            self.assertElementPresent(krb_error, 'susec')

            # expected_code may be a single int or a container of ints.
            error_code = krb_error['error-code']
            if isinstance(expected_code, int):
                self.assertEqual(error_code, expected_code)
            else:
                self.assertIn(error_code, expected_code)

            self.assertElementMissing(krb_error, 'crealm')
            self.assertElementMissing(krb_error, 'cname')
            self.assertElementEqual(krb_error, 'realm', realm.encode('utf-8'))
            self.assertElementEqualPrincipal(krb_error, 'sname', sname)
            self.assertElementMissing(krb_error, 'e-text')

            result_data = krb_error['e-data']

        # In both cases the result data is a two-byte big-endian status code
        # followed by an optional message.
        status = result_data[:2]
        message = result_data[2:]

        status_code = (status[0] << 8) | status[1]
        if isinstance(expected_code, int):
            self.assertEqual(status_code, expected_code)
        else:
            self.assertIn(status_code, expected_code)

        # An empty message is only acceptable for a zero status code.
        if not message:
            self.assertEqual(0, status_code,
                             'got an error result, but no message')
            return

        # Check the first byte of the message: non-zero means a plain
        # message to compare against expected_msg.
        if message[0]:
            if isinstance(expected_msg, bytes):
                self.assertEqual(message, expected_msg)
            else:
                self.assertIn(message, expected_msg)
        else:
            # We got AD password policy information.
            self.assertEqual(30, len(message))

            # The unpacked values are not checked further; unpacking only
            # verifies that the message fits the expected layout.
            (empty_bytes,
             min_length,
             history_length,
             properties,
             expire_time,
             min_age) = struct.unpack('>HIIIQQ', message)
2125
    def _generic_kdc_exchange(self,
                              kdc_exchange_dict,  # required
                              cname=None,  # optional
                              realm=None,  # required
                              sname=None,  # optional
                              from_time=None,  # optional
                              till_time=None,  # required
                              renew_time=None,  # optional
                              etypes=None,  # required
                              addresses=None,  # optional
                              additional_tickets=None,  # optional
                              EncAuthorizationData=None,  # optional
                              EncAuthorizationData_key=None,  # optional
                              EncAuthorizationData_usage=None):  # optional
        """Build and send a KDC request and hand the reply to a checker.

        Most of the configuration is read from kdc_exchange_dict (callbacks,
        message types, KDC options, PAC settings, inner/outer request body
        overrides); the keyword arguments supply the request body fields.
        If till_time is None it defaults to get_KerberosTime(offset=36000).

        The following entries are written to kdc_exchange_dict: 'nonce'
        (only if not already present), 'req_padata', 'fast_padata' and
        'req_body'.

        Exactly one of the 'check_error_fn' and 'check_rep_fn' entries is
        expected to be set. The reply's msg-type must be KRB_ERROR in the
        former case and the 'rep_msg_type' entry in the latter.

        Returns whatever the selected check function returns.
        """

        check_error_fn = kdc_exchange_dict['check_error_fn']
        check_rep_fn = kdc_exchange_dict['check_rep_fn']
        generate_fast_fn = kdc_exchange_dict['generate_fast_fn']
        generate_fast_armor_fn = kdc_exchange_dict['generate_fast_armor_fn']
        generate_fast_padata_fn = kdc_exchange_dict['generate_fast_padata_fn']
        generate_padata_fn = kdc_exchange_dict['generate_padata_fn']
        callback_dict = kdc_exchange_dict['callback_dict']
        req_msg_type = kdc_exchange_dict['req_msg_type']
        req_asn1Spec = kdc_exchange_dict['req_asn1Spec']
        rep_msg_type = kdc_exchange_dict['rep_msg_type']

        expected_error_mode = kdc_exchange_dict['expected_error_mode']
        kdc_options = kdc_exchange_dict['kdc_options']

        pac_request = kdc_exchange_dict['pac_request']
        pac_options = kdc_exchange_dict['pac_options']

        # Parameters specific to the inner request body
        inner_req = kdc_exchange_dict['inner_req']

        # Parameters specific to the outer request body
        outer_req = kdc_exchange_dict['outer_req']

        if till_time is None:
            till_time = self.get_KerberosTime(offset=36000)

        # Reuse a nonce already stored in the dict; otherwise generate one
        # and store it there.
        if 'nonce' in kdc_exchange_dict:
            nonce = kdc_exchange_dict['nonce']
        else:
            nonce = self.get_Nonce()
            kdc_exchange_dict['nonce'] = nonce

        req_body = self.KDC_REQ_BODY_create(
            kdc_options=kdc_options,
            cname=cname,
            realm=realm,
            sname=sname,
            from_time=from_time,
            till_time=till_time,
            renew_time=renew_time,
            nonce=nonce,
            etypes=etypes,
            addresses=addresses,
            additional_tickets=additional_tickets,
            EncAuthorizationData=EncAuthorizationData,
            EncAuthorizationData_key=EncAuthorizationData_key,
            EncAuthorizationData_usage=EncAuthorizationData_usage)

        # The inner body starts as a shallow copy of the request body, taken
        # before the outer overrides are applied. In both override dicts a
        # value of None removes the field; any other value replaces it.
        inner_req_body = dict(req_body)
        if inner_req is not None:
            for key, value in inner_req.items():
                if value is not None:
                    inner_req_body[key] = value
                else:
                    del inner_req_body[key]
        if outer_req is not None:
            for key, value in outer_req.items():
                if value is not None:
                    req_body[key] = value
                else:
                    del req_body[key]

        # PAC-related padata; later placed either inside the FAST request
        # or, without FAST, directly in the outer padata.
        additional_padata = []
        if pac_request is not None:
            pa_pac_request = self.KERB_PA_PAC_REQUEST_create(pac_request)
            additional_padata.append(pa_pac_request)
        if pac_options is not None:
            pa_pac_options = self.get_pa_pac_options(pac_options)
            additional_padata.append(pa_pac_options)

        # Only a TGS-REQ carries a PA-TGS-REQ; it is generated from the
        # request body as it stands at this point.
        if req_msg_type == KRB_AS_REQ:
            tgs_req = None
            tgs_req_padata = None
        else:
            self.assertEqual(KRB_TGS_REQ, req_msg_type)

            tgs_req = self.generate_ap_req(kdc_exchange_dict,
                                           callback_dict,
                                           req_body,
                                           armor=False)
            tgs_req_padata = self.PA_DATA_create(PADATA_KDC_REQ, tgs_req)

        if generate_fast_padata_fn is not None:
            self.assertIsNotNone(generate_fast_fn)
            # This can alter req_body...
            fast_padata, req_body = generate_fast_padata_fn(kdc_exchange_dict,
                                                            callback_dict,
                                                            req_body)
        else:
            fast_padata = []

        if generate_fast_armor_fn is not None:
            self.assertIsNotNone(generate_fast_fn)
            fast_ap_req = generate_fast_armor_fn(kdc_exchange_dict,
                                                 callback_dict,
                                                 None,
                                                 armor=True)

            fast_armor_type = kdc_exchange_dict['fast_armor_type']
            fast_armor = self.KRB_FAST_ARMOR_create(fast_armor_type,
                                                    fast_ap_req)
        else:
            fast_armor = None

        if generate_padata_fn is not None:
            # This can alter req_body...
            outer_padata, req_body = generate_padata_fn(kdc_exchange_dict,
                                                        callback_dict,
                                                        req_body)
            self.assertIsNotNone(outer_padata)
            self.assertNotIn(PADATA_KDC_REQ,
                             [pa['padata-type'] for pa in outer_padata],
                             'Don\'t create TGS-REQ manually')
        else:
            outer_padata = None

        if generate_fast_fn is not None:
            armor_key = kdc_exchange_dict['armor_key']
            self.assertIsNotNone(armor_key)

            # The FAST request checksum covers the encoded request body for
            # an AS-REQ, and the PA-TGS-REQ AP-REQ for a TGS-REQ.
            if req_msg_type == KRB_AS_REQ:
                checksum_blob = self.der_encode(
                    req_body,
                    asn1Spec=krb5_asn1.KDC_REQ_BODY())
            else:
                self.assertEqual(KRB_TGS_REQ, req_msg_type)
                checksum_blob = tgs_req

            checksum = self.Checksum_create(armor_key,
                                            KU_FAST_REQ_CHKSUM,
                                            checksum_blob)

            # With FAST, the PAC-related padata goes into the FAST request.
            fast_padata += additional_padata
            fast = generate_fast_fn(kdc_exchange_dict,
                                    callback_dict,
                                    inner_req_body,
                                    fast_padata,
                                    fast_armor,
                                    checksum)
        else:
            fast = None

        # Assemble the outer padata in the order: PA-TGS-REQ, FAST, padata
        # from generate_padata_fn, then (only without FAST) the PAC-related
        # padata.
        padata = []

        if tgs_req_padata is not None:
            padata.append(tgs_req_padata)

        if fast is not None:
            padata.append(fast)

        if outer_padata is not None:
            padata += outer_padata

        if fast is None:
            padata += additional_padata

        # An empty padata list is replaced by None.
        if not padata:
            padata = None

        # Record what is being sent in the exchange dict.
        kdc_exchange_dict['req_padata'] = padata
        kdc_exchange_dict['fast_padata'] = fast_padata
        kdc_exchange_dict['req_body'] = inner_req_body

        req_obj, req_decoded = self.KDC_REQ_create(msg_type=req_msg_type,
                                                   padata=padata,
                                                   req_body=req_body,
                                                   asn1Spec=req_asn1Spec())

        to_rodc = kdc_exchange_dict['to_rodc']

        rep = self.send_recv_transaction(req_decoded, to_rodc=to_rodc)
        self.assertIsNotNone(rep)

        msg_type = self.getElementValue(rep, 'msg-type')
        self.assertIsNotNone(msg_type)

        # Work out which reply type is expected, checking that the exchange
        # dict is configured consistently: an error checker requires a
        # non-empty expected_error_mode that does not contain 0, while a
        # reply checker requires an empty expected_error_mode.
        expected_msg_type = None
        if check_error_fn is not None:
            expected_msg_type = KRB_ERROR
            self.assertIsNone(check_rep_fn)
            self.assertNotEqual(0, len(expected_error_mode))
            self.assertNotIn(0, expected_error_mode)
        if check_rep_fn is not None:
            expected_msg_type = rep_msg_type
            self.assertIsNone(check_error_fn)
            self.assertEqual(0, len(expected_error_mode))
        self.assertIsNotNone(expected_msg_type)
        # Prepare a failure message describing the mismatch, in case the
        # reply type is not the expected one.
        if msg_type == KRB_ERROR:
            error_code = self.getElementValue(rep, 'error-code')
            fail_msg = f'Got unexpected error: {error_code}'
        else:
            fail_msg = f'Expected to fail with error: {expected_error_mode}'
        self.assertEqual(msg_type, expected_msg_type, fail_msg)

        if msg_type == KRB_ERROR:
            return check_error_fn(kdc_exchange_dict,
                                  callback_dict,
                                  rep)

        return check_rep_fn(kdc_exchange_dict, callback_dict, rep)
2341
2342     def as_exchange_dict(self,
2343                          expected_crealm=None,
2344                          expected_cname=None,
2345                          expected_anon=False,
2346                          expected_srealm=None,
2347                          expected_sname=None,
2348                          expected_account_name=None,
2349                          expected_upn_name=None,
2350                          expected_sid=None,
2351                          expected_supported_etypes=None,
2352                          expected_flags=None,
2353                          unexpected_flags=None,
2354                          ticket_decryption_key=None,
2355                          expect_ticket_checksum=None,
2356                          generate_fast_fn=None,
2357                          generate_fast_armor_fn=None,
2358                          generate_fast_padata_fn=None,
2359                          fast_armor_type=FX_FAST_ARMOR_AP_REQUEST,
2360                          generate_padata_fn=None,
2361                          check_error_fn=None,
2362                          check_rep_fn=None,
2363                          check_kdc_private_fn=None,
2364                          callback_dict=None,
2365                          expected_error_mode=0,
2366                          expected_status=None,
2367                          client_as_etypes=None,
2368                          expected_salt=None,
2369                          authenticator_subkey=None,
2370                          preauth_key=None,
2371                          armor_key=None,
2372                          armor_tgt=None,
2373                          armor_subkey=None,
2374                          auth_data=None,
2375                          kdc_options='',
2376                          inner_req=None,
2377                          outer_req=None,
2378                          pac_request=None,
2379                          pac_options=None,
2380                          expect_edata=None,
2381                          expect_pac=True,
2382                          expect_claims=True,
2383                          expect_upn_dns_info_ex=None,
2384                          expect_pac_attrs=None,
2385                          expect_pac_attrs_pac_request=None,
2386                          expect_requester_sid=None,
2387                          to_rodc=False):
2388         if expected_error_mode == 0:
2389             expected_error_mode = ()
2390         elif not isinstance(expected_error_mode, collections.abc.Container):
2391             expected_error_mode = (expected_error_mode,)
2392
2393         kdc_exchange_dict = {
2394             'req_msg_type': KRB_AS_REQ,
2395             'req_asn1Spec': krb5_asn1.AS_REQ,
2396             'rep_msg_type': KRB_AS_REP,
2397             'rep_asn1Spec': krb5_asn1.AS_REP,
2398             'rep_encpart_asn1Spec': krb5_asn1.EncASRepPart,
2399             'expected_crealm': expected_crealm,
2400             'expected_cname': expected_cname,
2401             'expected_anon': expected_anon,
2402             'expected_srealm': expected_srealm,
2403             'expected_sname': expected_sname,
2404             'expected_account_name': expected_account_name,
2405             'expected_upn_name': expected_upn_name,
2406             'expected_sid': expected_sid,
2407             'expected_supported_etypes': expected_supported_etypes,
2408             'expected_flags': expected_flags,
2409             'unexpected_flags': unexpected_flags,
2410             'ticket_decryption_key': ticket_decryption_key,
2411             'expect_ticket_checksum': expect_ticket_checksum,
2412             'generate_fast_fn': generate_fast_fn,
2413             'generate_fast_armor_fn': generate_fast_armor_fn,
2414             'generate_fast_padata_fn': generate_fast_padata_fn,
2415             'fast_armor_type': fast_armor_type,
2416             'generate_padata_fn': generate_padata_fn,
2417             'check_error_fn': check_error_fn,
2418             'check_rep_fn': check_rep_fn,
2419             'check_kdc_private_fn': check_kdc_private_fn,
2420             'callback_dict': callback_dict,
2421             'expected_error_mode': expected_error_mode,
2422             'expected_status': expected_status,
2423             'client_as_etypes': client_as_etypes,
2424             'expected_salt': expected_salt,
2425             'authenticator_subkey': authenticator_subkey,
2426             'preauth_key': preauth_key,
2427             'armor_key': armor_key,
2428             'armor_tgt': armor_tgt,
2429             'armor_subkey': armor_subkey,
2430             'auth_data': auth_data,
2431             'kdc_options': kdc_options,
2432             'inner_req': inner_req,
2433             'outer_req': outer_req,
2434             'pac_request': pac_request,
2435             'pac_options': pac_options,
2436             'expect_edata': expect_edata,
2437             'expect_pac': expect_pac,
2438             'expect_claims': expect_claims,
2439             'expect_upn_dns_info_ex': expect_upn_dns_info_ex,
2440             'expect_pac_attrs': expect_pac_attrs,
2441             'expect_pac_attrs_pac_request': expect_pac_attrs_pac_request,
2442             'expect_requester_sid': expect_requester_sid,
2443             'to_rodc': to_rodc
2444         }
2445         if callback_dict is None:
2446             callback_dict = {}
2447
2448         return kdc_exchange_dict
2449
2450     def tgs_exchange_dict(self,
2451                           expected_crealm=None,
2452                           expected_cname=None,
2453                           expected_anon=False,
2454                           expected_srealm=None,
2455                           expected_sname=None,
2456                           expected_account_name=None,
2457                           expected_upn_name=None,
2458                           expected_sid=None,
2459                           expected_supported_etypes=None,
2460                           expected_flags=None,
2461                           unexpected_flags=None,
2462                           ticket_decryption_key=None,
2463                           expect_ticket_checksum=None,
2464                           generate_fast_fn=None,
2465                           generate_fast_armor_fn=None,
2466                           generate_fast_padata_fn=None,
2467                           fast_armor_type=FX_FAST_ARMOR_AP_REQUEST,
2468                           generate_padata_fn=None,
2469                           check_error_fn=None,
2470                           check_rep_fn=None,
2471                           check_kdc_private_fn=None,
2472                           expected_error_mode=0,
2473                           expected_status=None,
2474                           callback_dict=None,
2475                           tgt=None,
2476                           armor_key=None,
2477                           armor_tgt=None,
2478                           armor_subkey=None,
2479                           authenticator_subkey=None,
2480                           auth_data=None,
2481                           body_checksum_type=None,
2482                           kdc_options='',
2483                           inner_req=None,
2484                           outer_req=None,
2485                           pac_request=None,
2486                           pac_options=None,
2487                           expect_edata=None,
2488                           expect_pac=True,
2489                           expect_claims=True,
2490                           expect_upn_dns_info_ex=None,
2491                           expect_pac_attrs=None,
2492                           expect_pac_attrs_pac_request=None,
2493                           expect_requester_sid=None,
2494                           expected_proxy_target=None,
2495                           expected_transited_services=None,
2496                           to_rodc=False):
2497         if expected_error_mode == 0:
2498             expected_error_mode = ()
2499         elif not isinstance(expected_error_mode, collections.abc.Container):
2500             expected_error_mode = (expected_error_mode,)
2501
2502         kdc_exchange_dict = {
2503             'req_msg_type': KRB_TGS_REQ,
2504             'req_asn1Spec': krb5_asn1.TGS_REQ,
2505             'rep_msg_type': KRB_TGS_REP,
2506             'rep_asn1Spec': krb5_asn1.TGS_REP,
2507             'rep_encpart_asn1Spec': krb5_asn1.EncTGSRepPart,
2508             'expected_crealm': expected_crealm,
2509             'expected_cname': expected_cname,
2510             'expected_anon': expected_anon,
2511             'expected_srealm': expected_srealm,
2512             'expected_sname': expected_sname,
2513             'expected_account_name': expected_account_name,
2514             'expected_upn_name': expected_upn_name,
2515             'expected_sid': expected_sid,
2516             'expected_supported_etypes': expected_supported_etypes,
2517             'expected_flags': expected_flags,
2518             'unexpected_flags': unexpected_flags,
2519             'ticket_decryption_key': ticket_decryption_key,
2520             'expect_ticket_checksum': expect_ticket_checksum,
2521             'generate_fast_fn': generate_fast_fn,
2522             'generate_fast_armor_fn': generate_fast_armor_fn,
2523             'generate_fast_padata_fn': generate_fast_padata_fn,
2524             'fast_armor_type': fast_armor_type,
2525             'generate_padata_fn': generate_padata_fn,
2526             'check_error_fn': check_error_fn,
2527             'check_rep_fn': check_rep_fn,
2528             'check_kdc_private_fn': check_kdc_private_fn,
2529             'callback_dict': callback_dict,
2530             'expected_error_mode': expected_error_mode,
2531             'expected_status': expected_status,
2532             'tgt': tgt,
2533             'body_checksum_type': body_checksum_type,
2534             'armor_key': armor_key,
2535             'armor_tgt': armor_tgt,
2536             'armor_subkey': armor_subkey,
2537             'auth_data': auth_data,
2538             'authenticator_subkey': authenticator_subkey,
2539             'kdc_options': kdc_options,
2540             'inner_req': inner_req,
2541             'outer_req': outer_req,
2542             'pac_request': pac_request,
2543             'pac_options': pac_options,
2544             'expect_edata': expect_edata,
2545             'expect_pac': expect_pac,
2546             'expect_claims': expect_claims,
2547             'expect_upn_dns_info_ex': expect_upn_dns_info_ex,
2548             'expect_pac_attrs': expect_pac_attrs,
2549             'expect_pac_attrs_pac_request': expect_pac_attrs_pac_request,
2550             'expect_requester_sid': expect_requester_sid,
2551             'expected_proxy_target': expected_proxy_target,
2552             'expected_transited_services': expected_transited_services,
2553             'to_rodc': to_rodc
2554         }
2555         if callback_dict is None:
2556             callback_dict = {}
2557
2558         return kdc_exchange_dict
2559
    def generic_check_kdc_rep(self,
                              kdc_exchange_dict,
                              callback_dict,
                              rep):
        """Check a non-error KDC reply (AS-REP or TGS-REP).

        Checks the unencrypted fields of the reply and of the ticket it
        carries, processes a FAST response if the exchange was armored,
        decrypts the ticket (only when a ticket decryption key was
        supplied) and the reply's encrypted part, and finally hands the
        decoded parts to the 'check_kdc_private_fn' callback taken from
        kdc_exchange_dict.

        kdc_exchange_dict: the exchange dictionary holding the expected
            values, keys and callbacks for this exchange.
        callback_dict: passed through to check_rep_padata() and to the
            'check_kdc_private_fn' callback.
        rep: the decoded reply to check.

        Returns rep.
        """

        expected_crealm = kdc_exchange_dict['expected_crealm']
        expected_anon = kdc_exchange_dict['expected_anon']
        expected_srealm = kdc_exchange_dict['expected_srealm']
        expected_sname = kdc_exchange_dict['expected_sname']
        ticket_decryption_key = kdc_exchange_dict['ticket_decryption_key']
        check_kdc_private_fn = kdc_exchange_dict['check_kdc_private_fn']
        rep_encpart_asn1Spec = kdc_exchange_dict['rep_encpart_asn1Spec']
        msg_type = kdc_exchange_dict['rep_msg_type']
        armor_key = kdc_exchange_dict['armor_key']

        self.assertElementEqual(rep, 'msg-type', msg_type)  # AS-REP | TGS-REP
        padata = self.getElementValue(rep, 'padata')
        # The client realm and name are only compared under strict checking.
        if self.strict_checking:
            self.assertElementEqualUTF8(rep, 'crealm', expected_crealm)
            if expected_anon:
                # For an anonymous exchange the expected client name is
                # the well-known anonymous principal rather than the one
                # stored in the exchange dictionary.
                expected_cname = self.PrincipalName_create(
                    name_type=NT_WELLKNOWN,
                    names=['WELLKNOWN', 'ANONYMOUS'])
            else:
                expected_cname = kdc_exchange_dict['expected_cname']
            self.assertElementEqualPrincipal(rep, 'cname', expected_cname)

        # Check the unencrypted parts of the ticket.
        self.assertElementPresent(rep, 'ticket')
        ticket = self.getElementValue(rep, 'ticket')
        ticket_encpart = None
        ticket_cipher = None
        self.assertIsNotNone(ticket)
        if ticket is not None:  # Never None, but gives indentation
            self.assertElementEqual(ticket, 'tkt-vno', 5)
            self.assertElementEqualUTF8(ticket, 'realm', expected_srealm)
            self.assertElementEqualPrincipal(ticket, 'sname', expected_sname)
            self.assertElementPresent(ticket, 'enc-part')
            ticket_encpart = self.getElementValue(ticket, 'enc-part')
            self.assertIsNotNone(ticket_encpart)
            if ticket_encpart is not None:  # Never None, but gives indentation
                self.assertElementPresent(ticket_encpart, 'etype')

                # kdc_options is indexed like a string of '0'/'1'
                # characters; 'pos' is the index of the 'enc-tkt-in-skey'
                # option.  The option counts as set only if the string is
                # long enough and the character at 'pos' is '1'.
                kdc_options = kdc_exchange_dict['kdc_options']
                pos = len(tuple(krb5_asn1.KDCOptions('enc-tkt-in-skey'))) - 1
                expect_kvno = (pos >= len(kdc_options)
                               or kdc_options[pos] != '1')
                if expect_kvno:
                    # 'unspecified' means present, with any value != 0
                    self.assertElementKVNO(ticket_encpart, 'kvno',
                                           self.unspecified_kvno)
                else:
                    # For user-to-user, don't expect a kvno.
                    self.assertElementMissing(ticket_encpart, 'kvno')

                self.assertElementPresent(ticket_encpart, 'cipher')
                ticket_cipher = self.getElementValue(ticket_encpart, 'cipher')

        # Check the unencrypted parts of the reply's enc-part.
        self.assertElementPresent(rep, 'enc-part')
        encpart = self.getElementValue(rep, 'enc-part')
        encpart_cipher = None
        self.assertIsNotNone(encpart)
        if encpart is not None:  # Never None, but gives indentation
            self.assertElementPresent(encpart, 'etype')
            # NOTE(review): this checks the kvno of the *ticket's*
            # enc-part, although the surrounding checks are all on the
            # reply's enc-part ('encpart').  Possibly a copy/paste slip;
            # confirm against assertElementKVNO() before changing it.
            self.assertElementKVNO(ticket_encpart, 'kvno', 'autodetect')
            self.assertElementPresent(encpart, 'cipher')
            encpart_cipher = self.getElementValue(encpart, 'cipher')

        ticket_checksum = None

        # Get the decryption key (and key usage) for the encrypted part
        encpart_decryption_key, encpart_decryption_usage = (
            self.get_preauth_key(kdc_exchange_dict))

        # With an armor key, look for a FAST response in the reply padata.
        if armor_key is not None:
            pa_dict = self.get_pa_dict(padata)

            if PADATA_FX_FAST in pa_dict:
                fx_fast_data = pa_dict[PADATA_FX_FAST]
                fast_response = self.check_fx_fast_data(kdc_exchange_dict,
                                                        fx_fast_data,
                                                        armor_key,
                                                        finished=True)

                # If the KDC supplied a strengthen key, the reply key is
                # replaced by one derived from it and the previous key.
                if 'strengthen-key' in fast_response:
                    strengthen_key = self.EncryptionKey_import(
                        fast_response['strengthen-key'])
                    encpart_decryption_key = (
                        self.generate_strengthen_reply_key(
                            strengthen_key,
                            encpart_decryption_key))

                # Remember the ticket checksum so that it can be handed
                # to the check_kdc_private_fn callback below.
                fast_finished = fast_response.get('finished')
                if fast_finished is not None:
                    ticket_checksum = fast_finished['ticket-checksum']

                # The padata to check is the one inside the FAST response.
                self.check_rep_padata(kdc_exchange_dict,
                                      callback_dict,
                                      fast_response['padata'],
                                      error_code=0)

        # The ticket can only be decrypted if the caller supplied its key;
        # otherwise ticket_private stays None.
        ticket_private = None
        if ticket_decryption_key is not None:
            self.assertElementEqual(ticket_encpart, 'etype',
                                    ticket_decryption_key.etype)
            self.assertElementKVNO(ticket_encpart, 'kvno',
                                   ticket_decryption_key.kvno)
            ticket_decpart = ticket_decryption_key.decrypt(KU_TICKET,
                                                           ticket_cipher)
            ticket_private = self.der_decode(
                ticket_decpart,
                asn1Spec=krb5_asn1.EncTicketPart())

        # Decrypt and decode the reply's encrypted part.
        encpart_private = None
        self.assertIsNotNone(encpart_decryption_key)
        if encpart_decryption_key is not None:
            self.assertElementEqual(encpart, 'etype',
                                    encpart_decryption_key.etype)
            if self.strict_checking:
                self.assertElementKVNO(encpart, 'kvno',
                                       encpart_decryption_key.kvno)
            rep_decpart = encpart_decryption_key.decrypt(
                encpart_decryption_usage,
                encpart_cipher)
            # MIT KDC encodes both EncASRepPart and EncTGSRepPart with
            # application tag 26, so if decoding with the expected spec
            # fails, retry with the EncTGSRepPart spec.
            try:
                encpart_private = self.der_decode(
                    rep_decpart,
                    asn1Spec=rep_encpart_asn1Spec())
            except Exception:
                encpart_private = self.der_decode(
                    rep_decpart,
                    asn1Spec=krb5_asn1.EncTGSRepPart())

        # A callback for checking the decrypted parts is mandatory.
        self.assertIsNotNone(check_kdc_private_fn)
        if check_kdc_private_fn is not None:
            check_kdc_private_fn(kdc_exchange_dict, callback_dict,
                                 rep, ticket_private, encpart_private,
                                 ticket_checksum)

        return rep
2699
2700     def check_fx_fast_data(self,
2701                            kdc_exchange_dict,
2702                            fx_fast_data,
2703                            armor_key,
2704                            finished=False,
2705                            expect_strengthen_key=True):
2706         fx_fast_data = self.der_decode(fx_fast_data,
2707                                        asn1Spec=krb5_asn1.PA_FX_FAST_REPLY())
2708
2709         enc_fast_rep = fx_fast_data['armored-data']['enc-fast-rep']
2710         self.assertEqual(enc_fast_rep['etype'], armor_key.etype)
2711
2712         fast_rep = armor_key.decrypt(KU_FAST_REP, enc_fast_rep['cipher'])
2713
2714         fast_response = self.der_decode(fast_rep,
2715                                         asn1Spec=krb5_asn1.KrbFastResponse())
2716
2717         if expect_strengthen_key and self.strict_checking:
2718             self.assertIn('strengthen-key', fast_response)
2719
2720         if finished:
2721             self.assertIn('finished', fast_response)
2722
2723         # Ensure that the nonce matches the nonce in the body of the request
2724         # (RFC6113 5.4.3).
2725         nonce = kdc_exchange_dict['nonce']
2726         self.assertEqual(nonce, fast_response['nonce'])
2727
2728         return fast_response
2729
    def generic_check_kdc_private(self,
                                  kdc_exchange_dict,
                                  callback_dict,
                                  rep,
                                  ticket_private,
                                  encpart_private,
                                  ticket_checksum):
        """Check the decrypted parts of a KDC reply and record the ticket.

        Checks the decrypted ticket and the decrypted reply enc-part
        (each only when it is not None) against the expectations in
        kdc_exchange_dict, checks the PAC if one is found, verifies the
        ticket when a ticket decryption key is available, and stores the
        resulting KerberosTicketCreds under
        kdc_exchange_dict['rep_ticket_creds'].

        kdc_exchange_dict: the exchange dictionary holding the expected
            values and keys; updated with 'rep_ticket_creds'.
        callback_dict: not used by this implementation.
        rep: the decoded reply; the ticket is taken from it.
        ticket_private: the decoded EncTicketPart, or None.
        encpart_private: the decoded enc-part of the reply, or None.
        ticket_checksum: the FAST ticket checksum to verify, or None.
        """
        # kdc_options is indexed like a string of '0'/'1' characters.  An
        # option counts as set only if the string is long enough to
        # contain its position and the character there is '1'.
        kdc_options = kdc_exchange_dict['kdc_options']
        canon_pos = len(tuple(krb5_asn1.KDCOptions('canonicalize'))) - 1
        canonicalize = (canon_pos < len(kdc_options)
                        and kdc_options[canon_pos] == '1')
        renewable_pos = len(tuple(krb5_asn1.KDCOptions('renewable'))) - 1
        renewable = (renewable_pos < len(kdc_options)
                     and kdc_options[renewable_pos] == '1')
        renew_pos = len(tuple(krb5_asn1.KDCOptions('renew'))) - 1
        renew = (renew_pos < len(kdc_options)
                 and kdc_options[renew_pos] == '1')
        # 'renew-till' is expected if either option was requested.
        expect_renew_till = renewable or renew

        expected_crealm = kdc_exchange_dict['expected_crealm']
        expected_cname = kdc_exchange_dict['expected_cname']
        expected_srealm = kdc_exchange_dict['expected_srealm']
        expected_sname = kdc_exchange_dict['expected_sname']
        ticket_decryption_key = kdc_exchange_dict['ticket_decryption_key']

        rep_msg_type = kdc_exchange_dict['rep_msg_type']

        expected_flags = kdc_exchange_dict.get('expected_flags')
        unexpected_flags = kdc_exchange_dict.get('unexpected_flags')

        ticket = self.getElementValue(rep, 'ticket')

        # A ticket checksum is verified against the armor key.
        if ticket_checksum is not None:
            armor_key = kdc_exchange_dict['armor_key']
            self.verify_ticket_checksum(ticket, ticket_checksum, armor_key)

        # Use the RODC's krbtgt credentials if the request went to an RODC.
        to_rodc = kdc_exchange_dict['to_rodc']
        if to_rodc:
            krbtgt_creds = self.get_rodc_krbtgt_creds()
        else:
            krbtgt_creds = self.get_krbtgt_creds()
        krbtgt_key = self.TicketDecryptionKey_from_creds(krbtgt_creds)

        # Keys handed to verify_ticket() below; without strict checking an
        # RC4 krbtgt key is offered as well.
        krbtgt_keys = [krbtgt_key]
        if not self.strict_checking:
            krbtgt_key_rc4 = self.TicketDecryptionKey_from_creds(
                krbtgt_creds,
                etype=kcrypto.Enctype.RC4)
            krbtgt_keys.append(krbtgt_key_rc4)

        # When self.expect_pac is set and is_tgs() is true for the
        # expected service name, a PAC is always expected; otherwise the
        # exchange dictionary decides (True, False, or None for "don't
        # check").
        if self.expect_pac and self.is_tgs(expected_sname):
            expect_pac = True
        else:
            expect_pac = kdc_exchange_dict['expect_pac']

        # Check the decrypted ticket, if we were able to decrypt it.
        ticket_session_key = None
        if ticket_private is not None:
            self.assertElementFlags(ticket_private, 'flags',
                                    expected_flags,
                                    unexpected_flags)
            self.assertElementPresent(ticket_private, 'key')
            ticket_key = self.getElementValue(ticket_private, 'key')
            self.assertIsNotNone(ticket_key)
            if ticket_key is not None:  # Never None, but gives indentation
                self.assertElementPresent(ticket_key, 'keytype')
                self.assertElementPresent(ticket_key, 'keyvalue')
                ticket_session_key = self.EncryptionKey_import(ticket_key)
            self.assertElementEqualUTF8(ticket_private, 'crealm',
                                        expected_crealm)
            if self.strict_checking:
                self.assertElementEqualPrincipal(ticket_private, 'cname',
                                                 expected_cname)
            self.assertElementPresent(ticket_private, 'transited')
            self.assertElementPresent(ticket_private, 'authtime')
            if self.strict_checking:
                self.assertElementPresent(ticket_private, 'starttime')
            self.assertElementPresent(ticket_private, 'endtime')
            # The presence of 'renew-till' is only enforced under strict
            # checking; its absence is always enforced when not expected.
            if expect_renew_till:
                if self.strict_checking:
                    self.assertElementPresent(ticket_private, 'renew-till')
            else:
                self.assertElementMissing(ticket_private, 'renew-till')
            if self.strict_checking:
                self.assertElementEqual(ticket_private, 'caddr', [])
            # expect_pac of None skips the authorization-data check.
            if expect_pac is not None:
                self.assertElementPresent(ticket_private, 'authorization-data',
                                          expect_empty=not expect_pac)

        # Check the decrypted enc-part of the reply.
        encpart_session_key = None
        if encpart_private is not None:
            self.assertElementPresent(encpart_private, 'key')
            encpart_key = self.getElementValue(encpart_private, 'key')
            self.assertIsNotNone(encpart_key)
            if encpart_key is not None:  # Never None, but gives indentation
                self.assertElementPresent(encpart_key, 'keytype')
                self.assertElementPresent(encpart_key, 'keyvalue')
                encpart_session_key = self.EncryptionKey_import(encpart_key)
            self.assertElementPresent(encpart_private, 'last-req')
            self.assertElementEqual(encpart_private, 'nonce',
                                    kdc_exchange_dict['nonce'])
            # 'key-expiration' is expected (under strict checking) for an
            # AS-REP and must be absent for any other reply type.
            if rep_msg_type == KRB_AS_REP:
                if self.strict_checking:
                    self.assertElementPresent(encpart_private,
                                              'key-expiration')
            else:
                self.assertElementMissing(encpart_private,
                                          'key-expiration')
            self.assertElementFlags(encpart_private, 'flags',
                                    expected_flags,
                                    unexpected_flags)
            self.assertElementPresent(encpart_private, 'authtime')
            if self.strict_checking:
                self.assertElementPresent(encpart_private, 'starttime')
            self.assertElementPresent(encpart_private, 'endtime')
            if expect_renew_till:
                if self.strict_checking:
                    self.assertElementPresent(encpart_private, 'renew-till')
            else:
                self.assertElementMissing(encpart_private, 'renew-till')
            self.assertElementEqualUTF8(encpart_private, 'srealm',
                                        expected_srealm)
            self.assertElementEqualPrincipal(encpart_private, 'sname',
                                             expected_sname)
            if self.strict_checking:
                self.assertElementEqual(encpart_private, 'caddr', [])

            sent_pac_options = self.get_sent_pac_options(kdc_exchange_dict)

            # The encrypted padata is only examined under strict checking.
            # It is expected if canonicalization was requested or any PAC
            # option ('1' character) was sent; otherwise it must be empty.
            if self.strict_checking:
                if canonicalize or '1' in sent_pac_options:
                    self.assertElementPresent(encpart_private,
                                              'encrypted-pa-data')
                    enc_pa_dict = self.get_pa_dict(
                        encpart_private['encrypted-pa-data'])
                    if canonicalize:
                        self.assertIn(PADATA_SUPPORTED_ETYPES, enc_pa_dict)

                        # These three enctype bits are always added to the
                        # caller's expectation.
                        # NOTE(review): this raises TypeError if
                        # 'expected_supported_etypes' is None; presumably
                        # callers always set it when requesting
                        # canonicalization - confirm.
                        expected_supported_etypes = kdc_exchange_dict[
                            'expected_supported_etypes']
                        expected_supported_etypes |= (
                            security.KERB_ENCTYPE_DES_CBC_CRC |
                            security.KERB_ENCTYPE_DES_CBC_MD5 |
                            security.KERB_ENCTYPE_RC4_HMAC_MD5)

                        # The padata value is unpacked as one
                        # little-endian unsigned 32-bit integer.
                        (supported_etypes,) = struct.unpack(
                            '<L',
                            enc_pa_dict[PADATA_SUPPORTED_ETYPES])

                        self.assertEqual(supported_etypes,
                                         expected_supported_etypes)
                    else:
                        self.assertNotIn(PADATA_SUPPORTED_ETYPES, enc_pa_dict)

                    if '1' in sent_pac_options:
                        self.assertIn(PADATA_PAC_OPTIONS, enc_pa_dict)

                        pac_options = self.der_decode(
                            enc_pa_dict[PADATA_PAC_OPTIONS],
                            asn1Spec=krb5_asn1.PA_PAC_OPTIONS())

                        # The KDC must echo back the options we sent.
                        self.assertElementEqual(pac_options, 'options',
                                                sent_pac_options)
                    else:
                        self.assertNotIn(PADATA_PAC_OPTIONS, enc_pa_dict)
                else:
                    self.assertElementEqual(encpart_private,
                                            'encrypted-pa-data',
                                            [])

        # If both copies of the session key are available they must agree.
        if ticket_session_key is not None and encpart_session_key is not None:
            self.assertEqual(ticket_session_key.etype,
                             encpart_session_key.etype)
            self.assertEqual(ticket_session_key.key.contents,
                             encpart_session_key.key.contents)
        # Prefer the session key from the reply's enc-part; fall back to
        # the one from the ticket (which may also be None).
        if encpart_session_key is not None:
            session_key = encpart_session_key
        else:
            session_key = ticket_session_key
        ticket_creds = KerberosTicketCreds(
            ticket,
            session_key,
            crealm=expected_crealm,
            cname=expected_cname,
            srealm=expected_srealm,
            sname=expected_sname,
            decryption_key=ticket_decryption_key,
            ticket_private=ticket_private,
            encpart_private=encpart_private)

        # The PAC can only be examined if the ticket was decrypted.
        if ticket_private is not None:
            pac_data = self.get_ticket_pac(ticket_creds, expect_pac=expect_pac)
            # expect_pac of None makes no assertion either way.
            if expect_pac is True:
                self.assertIsNotNone(pac_data)
            elif expect_pac is False:
                self.assertIsNone(pac_data)

            if pac_data is not None:
                self.check_pac_buffers(pac_data, kdc_exchange_dict)

        # Expecting a ticket checksum requires the ticket decryption key.
        expect_ticket_checksum = kdc_exchange_dict['expect_ticket_checksum']
        if expect_ticket_checksum:
            self.assertIsNotNone(ticket_decryption_key)

        if ticket_decryption_key is not None:
            # Treated as a service ticket when is_tgs() is false for the
            # expected service name and the reply is a TGS-REP.
            service_ticket = (not self.is_tgs(expected_sname)
                              and rep_msg_type == KRB_TGS_REP)
            self.verify_ticket(ticket_creds, krbtgt_keys,
                               service_ticket=service_ticket,
                               expect_pac=expect_pac,
                               expect_ticket_checksum=expect_ticket_checksum
                               or self.tkt_sig_support)

        # Make the resulting ticket available to the caller.
        kdc_exchange_dict['rep_ticket_creds'] = ticket_creds
2943
    def check_pac_buffers(self, pac_data, kdc_exchange_dict):
        """Check the buffers of a PAC against the exchange's expectations.

        The PAC is unpacked, the list of buffer types it contains is compared
        with the list expected for this exchange, and the contents of
        selected buffers are checked against values taken from
        kdc_exchange_dict.

        :param pac_data: the packed PAC; unpacked here as krb5pac.PAC_DATA.
        :param kdc_exchange_dict: dict describing the exchange, from which
            all expectations are read.
        """
        pac = ndr_unpack(krb5pac.PAC_DATA, pac_data)

        rep_msg_type = kdc_exchange_dict['rep_msg_type']
        armor_tgt = kdc_exchange_dict['armor_tgt']

        expected_sname = kdc_exchange_dict['expected_sname']
        expect_claims = kdc_exchange_dict['expect_claims']

        # Buffer types expected regardless of the kind of exchange.
        expected_types = [krb5pac.PAC_TYPE_LOGON_INFO,
                          krb5pac.PAC_TYPE_SRV_CHECKSUM,
                          krb5pac.PAC_TYPE_KDC_CHECKSUM,
                          krb5pac.PAC_TYPE_LOGON_NAME,
                          krb5pac.PAC_TYPE_UPN_DNS_INFO]

        # Determine whether the 'cname-in-addl-tkt' option was set in the
        # request: pos is the index of the last element of a KDCOptions value
        # built from that name, and the sent options are tested at that index.
        kdc_options = kdc_exchange_dict['kdc_options']
        pos = len(tuple(krb5_asn1.KDCOptions('cname-in-addl-tkt'))) - 1
        constrained_delegation = (pos < len(kdc_options)
                                  and kdc_options[pos] == '1')
        if constrained_delegation:
            expected_types.append(krb5pac.PAC_TYPE_CONSTRAINED_DELEGATION)

        if self.kdc_fast_support:
            if expect_claims:
                expected_types.append(krb5pac.PAC_TYPE_CLIENT_CLAIMS_INFO)

            # Device buffers are expected only for a TGS-REP obtained with an
            # armor TGT.
            if (rep_msg_type == KRB_TGS_REP
                    and armor_tgt is not None):
                expected_types.append(krb5pac.PAC_TYPE_DEVICE_INFO)
                expected_types.append(krb5pac.PAC_TYPE_DEVICE_CLAIMS_INFO)

        # A ticket checksum is expected for a TGS-REP whose service name is
        # not one accepted by is_tgs().
        if not self.is_tgs(expected_sname) and rep_msg_type == KRB_TGS_REP:
            expected_types.append(krb5pac.PAC_TYPE_TICKET_CHECKSUM)

        # Buffer types handed to assertSequenceElementsEqual() as
        # require_strict.
        # NOTE(review): presumably these are only enforced under strict
        # checking -- confirm against assertSequenceElementsEqual().
        require_strict = {krb5pac.PAC_TYPE_CLIENT_CLAIMS_INFO}
        if not self.tkt_sig_support:
            require_strict.add(krb5pac.PAC_TYPE_TICKET_CHECKSUM)

        # Default expectation for the attributes-info and requester-SID
        # buffers when the exchange does not say explicitly (see below).
        expect_extra_pac_buffers = self.is_tgs(expected_sname)

        expect_pac_attrs = kdc_exchange_dict['expect_pac_attrs']

        # The expected state of the PAC-request flags comes from an explicit
        # entry when attributes are expected, and from the 'pac_request'
        # entry of the exchange otherwise.
        if expect_pac_attrs:
            expect_pac_attrs_pac_request = kdc_exchange_dict[
                'expect_pac_attrs_pac_request']
        else:
            expect_pac_attrs_pac_request = kdc_exchange_dict[
                'pac_request']

        # None means "no explicit expectation": either fall back to the
        # default above, or add the buffer type to require_strict.
        if expect_pac_attrs is None:
            if self.expect_extra_pac_buffers:
                expect_pac_attrs = expect_extra_pac_buffers
            else:
                require_strict.add(krb5pac.PAC_TYPE_ATTRIBUTES_INFO)
        if expect_pac_attrs:
            expected_types.append(krb5pac.PAC_TYPE_ATTRIBUTES_INFO)

        expect_requester_sid = kdc_exchange_dict['expect_requester_sid']

        # Same treatment as for the attributes-info buffer.
        if expect_requester_sid is None:
            if self.expect_extra_pac_buffers:
                expect_requester_sid = expect_extra_pac_buffers
            else:
                require_strict.add(krb5pac.PAC_TYPE_REQUESTER_SID)
        if expect_requester_sid:
            expected_types.append(krb5pac.PAC_TYPE_REQUESTER_SID)

        # Compare the buffer types present with those expected; the order of
        # the buffers is not checked.
        buffer_types = [pac_buffer.type
                        for pac_buffer in pac.buffers]
        self.assertSequenceElementsEqual(
            expected_types, buffer_types,
            require_ordered=False,
            require_strict=require_strict)

        expected_account_name = kdc_exchange_dict['expected_account_name']
        expected_sid = kdc_exchange_dict['expected_sid']

        # Unless stated otherwise, the extended UPN_DNS_INFO data is expected
        # whenever there is an account name or SID to compare it with.
        expect_upn_dns_info_ex = kdc_exchange_dict['expect_upn_dns_info_ex']
        if expect_upn_dns_info_ex is None and (
                expected_account_name is not None
                or expected_sid is not None):
            expect_upn_dns_info_ex = True

        # Check the contents of each buffer whose type is handled below;
        # buffers of any other type are not inspected further.
        for pac_buffer in pac.buffers:
            if pac_buffer.type == krb5pac.PAC_TYPE_CONSTRAINED_DELEGATION:
                # The proxy target and the list of transited services must
                # match the expected values.
                expected_proxy_target = kdc_exchange_dict[
                    'expected_proxy_target']
                expected_transited_services = kdc_exchange_dict[
                    'expected_transited_services']

                delegation_info = pac_buffer.info.info

                self.assertEqual(expected_proxy_target,
                                 str(delegation_info.proxy_target))

                transited_services = list(map(
                    str, delegation_info.transited_services))
                self.assertEqual(expected_transited_services,
                                 transited_services)

            elif pac_buffer.type == krb5pac.PAC_TYPE_LOGON_NAME:
                # The account name must equal the first component of the
                # expected client principal name.
                expected_cname = kdc_exchange_dict['expected_cname']
                account_name = expected_cname['name-string'][0]

                self.assertEqual(account_name, pac_buffer.info.account_name)

            elif pac_buffer.type == krb5pac.PAC_TYPE_LOGON_INFO:
                logon_info = pac_buffer.info.info.info3.base

                if expected_account_name is not None:
                    self.assertEqual(expected_account_name,
                                     str(logon_info.account_name))

                if expected_sid is not None:
                    # The RID is the number after the last '-' of the
                    # expected SID string.
                    expected_rid = int(expected_sid.rsplit('-', 1)[1])
                    self.assertEqual(expected_rid, logon_info.rid)

            elif pac_buffer.type == krb5pac.PAC_TYPE_UPN_DNS_INFO:
                upn_dns_info = pac_buffer.info
                upn_dns_info_ex = upn_dns_info.ex

                expected_realm = kdc_exchange_dict['expected_crealm']
                self.assertEqual(expected_realm,
                                 upn_dns_info.dns_domain_name)

                expected_upn_name = kdc_exchange_dict['expected_upn_name']
                if expected_upn_name is not None:
                    self.assertEqual(expected_upn_name,
                                     upn_dns_info.upn_name)

                if expect_upn_dns_info_ex:
                    self.assertIsNotNone(upn_dns_info_ex)

                # The extended data is checked whenever it is present, even
                # if it was not required.
                if upn_dns_info_ex is not None:
                    if expected_account_name is not None:
                        self.assertEqual(expected_account_name,
                                         upn_dns_info_ex.samaccountname)

                    if expected_sid is not None:
                        self.assertEqual(expected_sid,
                                         str(upn_dns_info_ex.objectsid))

            elif (pac_buffer.type == krb5pac.PAC_TYPE_ATTRIBUTES_INFO
                      and expect_pac_attrs):
                attr_info = pac_buffer.info

                self.assertEqual(2, attr_info.flags_length)

                flags = attr_info.flags

                # Bit 0 and bit 1 of the flags are tested separately.
                requested_pac = bool(flags & 1)
                given_pac = bool(flags & 2)

                # Bit 0 must be set exactly when the expected PAC request is
                # True, and bit 1 exactly when it is None.
                self.assertEqual(expect_pac_attrs_pac_request is True,
                                 requested_pac)
                self.assertEqual(expect_pac_attrs_pac_request is None,
                                 given_pac)

            elif (pac_buffer.type == krb5pac.PAC_TYPE_REQUESTER_SID
                      and expect_requester_sid):
                requester_sid = pac_buffer.info.sid

                if expected_sid is not None:
                    self.assertEqual(expected_sid, str(requester_sid))
3108
    def generic_check_kdc_error(self,
                                kdc_exchange_dict,
                                callback_dict,
                                rep,
                                inner=False):
        """Check a KRB-ERROR reply against the exchange's expectations.

        The fixed fields of the error are checked first, then the e-data (if
        it is expected) is decoded and checked. When the e-data carries
        padata, the value returned by check_rep_padata() is stored in
        kdc_exchange_dict['preauth_etype_info2'].

        :param kdc_exchange_dict: dict describing the exchange, from which
            all expectations are read.
        :param callback_dict: passed through to check_rep_padata().
        :param rep: the decoded KRB-ERROR message.
        :param inner: True when called for an error carried inside
            PADATA_FX_ERROR (see check_rep_padata()); such an error is
            expected to carry no cname and, by default, no e-data.
        :return: rep.
        """

        rep_msg_type = kdc_exchange_dict['rep_msg_type']

        expected_anon = kdc_exchange_dict['expected_anon']
        expected_srealm = kdc_exchange_dict['expected_srealm']
        expected_sname = kdc_exchange_dict['expected_sname']
        expected_error_mode = kdc_exchange_dict['expected_error_mode']

        sent_fast = self.sent_fast(kdc_exchange_dict)

        fast_armor_type = kdc_exchange_dict['fast_armor_type']

        self.assertElementEqual(rep, 'pvno', 5)
        self.assertElementEqual(rep, 'msg-type', KRB_ERROR)
        # expected_error_mode is a collection of acceptable error codes.
        error_code = self.getElementValue(rep, 'error-code')
        self.assertIn(error_code, expected_error_mode)
        if self.strict_checking:
            self.assertElementMissing(rep, 'ctime')
            self.assertElementMissing(rep, 'cusec')
        self.assertElementPresent(rep, 'stime')
        self.assertElementPresent(rep, 'susec')
        # error-code checked above
        if self.strict_checking:
            self.assertElementMissing(rep, 'crealm')
            # Only an outer error for an anonymous request is expected to
            # carry a cname, and then it must be the well-known anonymous
            # principal.
            if expected_anon and not inner:
                expected_cname = self.PrincipalName_create(
                    name_type=NT_WELLKNOWN,
                    names=['WELLKNOWN', 'ANONYMOUS'])
                self.assertElementEqualPrincipal(rep, 'cname', expected_cname)
            else:
                self.assertElementMissing(rep, 'cname')
            self.assertElementEqualUTF8(rep, 'realm', expected_srealm)
            self.assertElementEqualPrincipal(rep, 'sname', expected_sname)
            self.assertElementMissing(rep, 'e-text')
        expected_status = kdc_exchange_dict['expected_status']
        expect_edata = kdc_exchange_dict['expect_edata']
        # With no explicit expectation, e-data is expected unless the error
        # is KDC_ERR_UNKNOWN_CRITICAL_FAST_OPTIONS, the request used FAST
        # with an armor type other than FX_FAST_ARMOR_AP_REQUEST, or this is
        # an inner error.
        if expect_edata is None:
            expect_edata = (error_code != KDC_ERR_UNKNOWN_CRITICAL_FAST_OPTIONS
                            and (not sent_fast or fast_armor_type is None
                                 or fast_armor_type == FX_FAST_ARMOR_AP_REQUEST)
                            and not inner)
        if not expect_edata:
            # Without e-data there is nowhere for a status code to appear.
            self.assertIsNone(expected_status)
            self.assertElementMissing(rep, 'e-data')
            return rep
        edata = self.getElementValue(rep, 'e-data')
        if self.strict_checking:
            self.assertIsNotNone(edata)
        if edata is not None:
            if rep_msg_type == KRB_TGS_REP and not sent_fast:
                # For a TGS exchange without FAST the e-data is decoded as
                # KERB-ERROR-DATA holding a 12-byte extended error.
                error_data = self.der_decode(
                    edata,
                    asn1Spec=krb5_asn1.KERB_ERROR_DATA())
                self.assertEqual(KERB_ERR_TYPE_EXTENDED,
                                 error_data['data-type'])

                extended_error = error_data['data-value']

                self.assertEqual(12, len(extended_error))

                # Little-endian integers: the status is taken from the first
                # four bytes and the flags from the last four; the middle
                # four bytes are not examined.
                status = int.from_bytes(extended_error[:4], 'little')
                flags = int.from_bytes(extended_error[8:], 'little')

                self.assertEqual(expected_status, status)

                self.assertEqual(3, flags)
            else:
                self.assertIsNone(expected_status)

                # Otherwise the e-data is decoded as METHOD-DATA (a sequence
                # of padata), which must not be empty.
                rep_padata = self.der_decode(edata,
                                             asn1Spec=krb5_asn1.METHOD_DATA())
                self.assertGreater(len(rep_padata), 0)

                if sent_fast:
                    # With FAST, the only outer padata must be PA-FX-FAST;
                    # the padata to check is taken from the FAST response.
                    self.assertEqual(1, len(rep_padata))
                    rep_pa_dict = self.get_pa_dict(rep_padata)
                    self.assertIn(PADATA_FX_FAST, rep_pa_dict)

                    armor_key = kdc_exchange_dict['armor_key']
                    self.assertIsNotNone(armor_key)
                    fast_response = self.check_fx_fast_data(
                        kdc_exchange_dict,
                        rep_pa_dict[PADATA_FX_FAST],
                        armor_key,
                        expect_strengthen_key=False)

                    rep_padata = fast_response['padata']

                etype_info2 = self.check_rep_padata(kdc_exchange_dict,
                                                    callback_dict,
                                                    rep_padata,
                                                    error_code)

                # Record the result of check_rep_padata() in the exchange
                # dict.
                kdc_exchange_dict['preauth_etype_info2'] = etype_info2

        return rep
3210
3211     def check_rep_padata(self,
3212                          kdc_exchange_dict,
3213                          callback_dict,
3214                          rep_padata,
3215                          error_code):
3216         rep_msg_type = kdc_exchange_dict['rep_msg_type']
3217
3218         req_body = kdc_exchange_dict['req_body']
3219         proposed_etypes = req_body['etype']
3220         client_as_etypes = kdc_exchange_dict.get('client_as_etypes', [])
3221
3222         sent_fast = self.sent_fast(kdc_exchange_dict)
3223         sent_enc_challenge = self.sent_enc_challenge(kdc_exchange_dict)
3224
3225         if rep_msg_type == KRB_TGS_REP:
3226             self.assertTrue(sent_fast)
3227
3228         expect_etype_info2 = ()
3229         expect_etype_info = False
3230         expected_aes_type = 0
3231         expected_rc4_type = 0
3232         if kcrypto.Enctype.RC4 in proposed_etypes:
3233             expect_etype_info = True
3234         for etype in proposed_etypes:
3235             if etype not in client_as_etypes:
3236                 continue
3237             if etype in (kcrypto.Enctype.AES256, kcrypto.Enctype.AES128):
3238                 expect_etype_info = False
3239                 if etype > expected_aes_type:
3240                     expected_aes_type = etype
3241             if etype in (kcrypto.Enctype.RC4,) and error_code != 0:
3242                 if etype > expected_rc4_type:
3243                     expected_rc4_type = etype
3244
3245         if expected_aes_type != 0:
3246             expect_etype_info2 += (expected_aes_type,)
3247         if expected_rc4_type != 0:
3248             expect_etype_info2 += (expected_rc4_type,)
3249
3250         expected_patypes = ()
3251         if sent_fast and error_code != 0:
3252             expected_patypes += (PADATA_FX_ERROR,)
3253             expected_patypes += (PADATA_FX_COOKIE,)
3254
3255         if rep_msg_type == KRB_TGS_REP:
3256             sent_pac_options = self.get_sent_pac_options(kdc_exchange_dict)
3257             if ('1' in sent_pac_options
3258                     and error_code not in (0, KDC_ERR_GENERIC)):
3259                 expected_patypes += (PADATA_PAC_OPTIONS,)
3260         elif error_code != KDC_ERR_GENERIC:
3261             if expect_etype_info:
3262                 self.assertGreater(len(expect_etype_info2), 0)
3263                 expected_patypes += (PADATA_ETYPE_INFO,)
3264             if len(expect_etype_info2) != 0:
3265                 expected_patypes += (PADATA_ETYPE_INFO2,)
3266
3267             if error_code != KDC_ERR_PREAUTH_FAILED:
3268                 if sent_fast:
3269                     expected_patypes += (PADATA_ENCRYPTED_CHALLENGE,)
3270                 else:
3271                     expected_patypes += (PADATA_ENC_TIMESTAMP,)
3272
3273                 if not sent_enc_challenge:
3274                     expected_patypes += (PADATA_PK_AS_REQ,)
3275                     expected_patypes += (PADATA_PK_AS_REP_19,)
3276
3277             if (self.kdc_fast_support
3278                     and not sent_fast
3279                     and not sent_enc_challenge):
3280                 expected_patypes += (PADATA_FX_FAST,)
3281                 expected_patypes += (PADATA_FX_COOKIE,)
3282
3283         got_patypes = tuple(pa['padata-type'] for pa in rep_padata)
3284         self.assertSequenceElementsEqual(expected_patypes, got_patypes,
3285                                          require_strict={PADATA_FX_COOKIE,
3286                                                          PADATA_FX_FAST,
3287                                                          PADATA_PAC_OPTIONS,
3288                                                          PADATA_PK_AS_REP_19,
3289                                                          PADATA_PK_AS_REQ})
3290
3291         if not expected_patypes:
3292             return None
3293
3294         pa_dict = self.get_pa_dict(rep_padata)
3295
3296         enc_timestamp = pa_dict.get(PADATA_ENC_TIMESTAMP)
3297         if enc_timestamp is not None:
3298             self.assertEqual(len(enc_timestamp), 0)
3299
3300         pk_as_req = pa_dict.get(PADATA_PK_AS_REQ)
3301         if pk_as_req is not None:
3302             self.assertEqual(len(pk_as_req), 0)
3303
3304         pk_as_rep19 = pa_dict.get(PADATA_PK_AS_REP_19)
3305         if pk_as_rep19 is not None:
3306             self.assertEqual(len(pk_as_rep19), 0)
3307
3308         fx_fast = pa_dict.get(PADATA_FX_FAST)
3309         if fx_fast is not None:
3310             self.assertEqual(len(fx_fast), 0)
3311
3312         fast_cookie = pa_dict.get(PADATA_FX_COOKIE)
3313         if fast_cookie is not None:
3314             kdc_exchange_dict['fast_cookie'] = fast_cookie
3315
3316         fast_error = pa_dict.get(PADATA_FX_ERROR)
3317         if fast_error is not None:
3318             fast_error = self.der_decode(fast_error,
3319                                          asn1Spec=krb5_asn1.KRB_ERROR())
3320             self.generic_check_kdc_error(kdc_exchange_dict,
3321                                          callback_dict,
3322                                          fast_error,
3323                                          inner=True)
3324
3325         pac_options = pa_dict.get(PADATA_PAC_OPTIONS)
3326         if pac_options is not None:
3327             pac_options = self.der_decode(
3328                 pac_options,
3329                 asn1Spec=krb5_asn1.PA_PAC_OPTIONS())
3330             self.assertElementEqual(pac_options, 'options', sent_pac_options)
3331
3332         enc_challenge = pa_dict.get(PADATA_ENCRYPTED_CHALLENGE)
3333         if enc_challenge is not None:
3334             if not sent_enc_challenge:
3335                 self.assertEqual(len(enc_challenge), 0)
3336             else:
3337                 armor_key = kdc_exchange_dict['armor_key']
3338                 self.assertIsNotNone(armor_key)
3339
3340                 preauth_key, _ = self.get_preauth_key(kdc_exchange_dict)
3341
3342                 kdc_challenge_key = self.generate_kdc_challenge_key(
3343                     armor_key, preauth_key)
3344
3345                 # Ensure that the encrypted challenge FAST factor is supported
3346                 # (RFC6113 5.4.6).
3347                 if self.strict_checking:
3348                     self.assertNotEqual(len(enc_challenge), 0)
3349                 if len(enc_challenge) != 0:
3350                     encrypted_challenge = self.der_decode(
3351                         enc_challenge,
3352                         asn1Spec=krb5_asn1.EncryptedData())
3353                     self.assertEqual(encrypted_challenge['etype'],
3354                                      kdc_challenge_key.etype)
3355
3356                     challenge = kdc_challenge_key.decrypt(
3357                         KU_ENC_CHALLENGE_KDC,
3358                         encrypted_challenge['cipher'])
3359                     challenge = self.der_decode(
3360                         challenge,
3361                         asn1Spec=krb5_asn1.PA_ENC_TS_ENC())
3362
3363                     # Retrieve the returned timestamp.
3364                     rep_patime = challenge['patimestamp']
3365                     self.assertIn('pausec', challenge)
3366
3367                     # Ensure the returned time is within five minutes of the
3368                     # current time.
3369                     rep_time = self.get_EpochFromKerberosTime(rep_patime)
3370                     current_time = time.time()
3371
3372                     self.assertLess(current_time - 300, rep_time)
3373                     self.assertLess(rep_time, current_time + 300)
3374
3375         etype_info2 = pa_dict.get(PADATA_ETYPE_INFO2)
3376         if etype_info2 is not None:
3377             etype_info2 = self.der_decode(etype_info2,
3378                                           asn1Spec=krb5_asn1.ETYPE_INFO2())
3379             self.assertGreaterEqual(len(etype_info2), 1)
3380             if self.strict_checking:
3381                 self.assertEqual(len(etype_info2), len(expect_etype_info2))
3382             for i in range(0, len(etype_info2)):
3383                 e = self.getElementValue(etype_info2[i], 'etype')
3384                 if self.strict_checking:
3385                     self.assertEqual(e, expect_etype_info2[i])
3386                 salt = self.getElementValue(etype_info2[i], 'salt')
3387                 if e == kcrypto.Enctype.RC4:
3388                     if self.strict_checking:
3389                         self.assertIsNone(salt)
3390                 else:
3391                     self.assertIsNotNone(salt)
3392                     expected_salt = kdc_exchange_dict['expected_salt']
3393                     if expected_salt is not None:
3394                         self.assertEqual(salt, expected_salt)
3395                 s2kparams = self.getElementValue(etype_info2[i], 's2kparams')
3396                 if self.strict_checking:
3397                     self.assertIsNone(s2kparams)
3398
3399         etype_info = pa_dict.get(PADATA_ETYPE_INFO)
3400         if etype_info is not None:
3401             etype_info = self.der_decode(etype_info,
3402                                          asn1Spec=krb5_asn1.ETYPE_INFO())
3403             self.assertEqual(len(etype_info), 1)
3404             e = self.getElementValue(etype_info[0], 'etype')
3405             self.assertEqual(e, kcrypto.Enctype.RC4)
3406             self.assertEqual(e, expect_etype_info2[0])
3407             salt = self.getElementValue(etype_info[0], 'salt')
3408             if self.strict_checking:
3409                 self.assertIsNotNone(salt)
3410                 self.assertEqual(len(salt), 0)
3411
3412         return etype_info2
3413
3414     def generate_simple_fast(self,
3415                              kdc_exchange_dict,
3416                              _callback_dict,
3417                              req_body,
3418                              fast_padata,
3419                              fast_armor,
3420                              checksum,
3421                              fast_options=''):
3422         armor_key = kdc_exchange_dict['armor_key']
3423
3424         fast_req = self.KRB_FAST_REQ_create(fast_options,
3425                                             fast_padata,
3426                                             req_body)
3427         fast_req = self.der_encode(fast_req,
3428                                    asn1Spec=krb5_asn1.KrbFastReq())
3429         fast_req = self.EncryptedData_create(armor_key,
3430                                              KU_FAST_ENC,
3431                                              fast_req)
3432
3433         fast_armored_req = self.KRB_FAST_ARMORED_REQ_create(fast_armor,
3434                                                             checksum,
3435                                                             fast_req)
3436
3437         fx_fast_request = self.PA_FX_FAST_REQUEST_create(fast_armored_req)
3438         fx_fast_request = self.der_encode(
3439             fx_fast_request,
3440             asn1Spec=krb5_asn1.PA_FX_FAST_REQUEST())
3441
3442         fast_padata = self.PA_DATA_create(PADATA_FX_FAST,
3443                                           fx_fast_request)
3444
3445         return fast_padata
3446
3447     def generate_ap_req(self,
3448                         kdc_exchange_dict,
3449                         _callback_dict,
3450                         req_body,
3451                         armor,
3452                         usage=None,
3453                         seq_number=None):
3454         req_body_checksum = None
3455
3456         if armor:
3457             self.assertIsNone(req_body)
3458
3459             tgt = kdc_exchange_dict['armor_tgt']
3460             authenticator_subkey = kdc_exchange_dict['armor_subkey']
3461         else:
3462             tgt = kdc_exchange_dict['tgt']
3463             authenticator_subkey = kdc_exchange_dict['authenticator_subkey']
3464
3465             if req_body is not None:
3466                 body_checksum_type = kdc_exchange_dict['body_checksum_type']
3467
3468                 req_body_blob = self.der_encode(
3469                     req_body, asn1Spec=krb5_asn1.KDC_REQ_BODY())
3470
3471                 req_body_checksum = self.Checksum_create(
3472                     tgt.session_key,
3473                     KU_TGS_REQ_AUTH_CKSUM,
3474                     req_body_blob,
3475                     ctype=body_checksum_type)
3476
3477         auth_data = kdc_exchange_dict['auth_data']
3478
3479         subkey_obj = None
3480         if authenticator_subkey is not None:
3481             subkey_obj = authenticator_subkey.export_obj()
3482         if seq_number is None:
3483             seq_number = random.randint(0, 0xfffffffe)
3484         (ctime, cusec) = self.get_KerberosTimeWithUsec()
3485         authenticator_obj = self.Authenticator_create(
3486             crealm=tgt.crealm,
3487             cname=tgt.cname,
3488             cksum=req_body_checksum,
3489             cusec=cusec,
3490             ctime=ctime,
3491             subkey=subkey_obj,
3492             seq_number=seq_number,
3493             authorization_data=auth_data)
3494         authenticator_blob = self.der_encode(
3495             authenticator_obj,
3496             asn1Spec=krb5_asn1.Authenticator())
3497
3498         if usage is None:
3499             usage = KU_AP_REQ_AUTH if armor else KU_TGS_REQ_AUTH
3500         authenticator = self.EncryptedData_create(tgt.session_key,
3501                                                   usage,
3502                                                   authenticator_blob)
3503
3504         ap_options = krb5_asn1.APOptions('0')
3505         ap_req_obj = self.AP_REQ_create(ap_options=str(ap_options),
3506                                         ticket=tgt.ticket,
3507                                         authenticator=authenticator)
3508         ap_req = self.der_encode(ap_req_obj, asn1Spec=krb5_asn1.AP_REQ())
3509
3510         return ap_req
3511
3512     def generate_simple_tgs_padata(self,
3513                                    kdc_exchange_dict,
3514                                    callback_dict,
3515                                    req_body):
3516         ap_req = self.generate_ap_req(kdc_exchange_dict,
3517                                       callback_dict,
3518                                       req_body,
3519                                       armor=False)
3520         pa_tgs_req = self.PA_DATA_create(PADATA_KDC_REQ, ap_req)
3521         padata = [pa_tgs_req]
3522
3523         return padata, req_body
3524
3525     def get_preauth_key(self, kdc_exchange_dict):
3526         msg_type = kdc_exchange_dict['rep_msg_type']
3527
3528         if msg_type == KRB_AS_REP:
3529             key = kdc_exchange_dict['preauth_key']
3530             usage = KU_AS_REP_ENC_PART
3531         else:  # KRB_TGS_REP
3532             authenticator_subkey = kdc_exchange_dict['authenticator_subkey']
3533             if authenticator_subkey is not None:
3534                 key = authenticator_subkey
3535                 usage = KU_TGS_REP_ENC_PART_SUB_KEY
3536             else:
3537                 tgt = kdc_exchange_dict['tgt']
3538                 key = tgt.session_key
3539                 usage = KU_TGS_REP_ENC_PART_SESSION
3540
3541         self.assertIsNotNone(key)
3542
3543         return key, usage
3544
3545     def generate_armor_key(self, subkey, session_key):
3546         armor_key = kcrypto.cf2(subkey.key,
3547                                 session_key.key,
3548                                 b'subkeyarmor',
3549                                 b'ticketarmor')
3550         armor_key = Krb5EncryptionKey(armor_key, None)
3551
3552         return armor_key
3553
3554     def generate_strengthen_reply_key(self, strengthen_key, reply_key):
3555         strengthen_reply_key = kcrypto.cf2(strengthen_key.key,
3556                                            reply_key.key,
3557                                            b'strengthenkey',
3558                                            b'replykey')
3559         strengthen_reply_key = Krb5EncryptionKey(strengthen_reply_key,
3560                                                  reply_key.kvno)
3561
3562         return strengthen_reply_key
3563
3564     def generate_client_challenge_key(self, armor_key, longterm_key):
3565         client_challenge_key = kcrypto.cf2(armor_key.key,
3566                                            longterm_key.key,
3567                                            b'clientchallengearmor',
3568                                            b'challengelongterm')
3569         client_challenge_key = Krb5EncryptionKey(client_challenge_key, None)
3570
3571         return client_challenge_key
3572
3573     def generate_kdc_challenge_key(self, armor_key, longterm_key):
3574         kdc_challenge_key = kcrypto.cf2(armor_key.key,
3575                                         longterm_key.key,
3576                                         b'kdcchallengearmor',
3577                                         b'challengelongterm')
3578         kdc_challenge_key = Krb5EncryptionKey(kdc_challenge_key, None)
3579
3580         return kdc_challenge_key
3581
3582     def verify_ticket_checksum(self, ticket, expected_checksum, armor_key):
3583         expected_type = expected_checksum['cksumtype']
3584         self.assertEqual(armor_key.ctype, expected_type)
3585
3586         ticket_blob = self.der_encode(ticket,
3587                                       asn1Spec=krb5_asn1.Ticket())
3588         checksum = self.Checksum_create(armor_key,
3589                                         KU_FAST_FINISHED,
3590                                         ticket_blob)
3591         self.assertEqual(expected_checksum, checksum)
3592
3593     def verify_ticket(self, ticket, krbtgt_keys, service_ticket,
3594                       expect_pac=True,
3595                       expect_ticket_checksum=True):
3596         # Decrypt the ticket.
3597
3598         key = ticket.decryption_key
3599         enc_part = ticket.ticket['enc-part']
3600
3601         self.assertElementEqual(enc_part, 'etype', key.etype)
3602         self.assertElementKVNO(enc_part, 'kvno', key.kvno)
3603
3604         enc_part = key.decrypt(KU_TICKET, enc_part['cipher'])
3605         enc_part = self.der_decode(
3606             enc_part, asn1Spec=krb5_asn1.EncTicketPart())
3607
3608         # Fetch the authorization data from the ticket.
3609         auth_data = enc_part.get('authorization-data')
3610         if expect_pac:
3611             self.assertIsNotNone(auth_data)
3612         elif auth_data is None:
3613             return
3614
3615         # Get a copy of the authdata with an empty PAC, and the existing PAC
3616         # (if present).
3617         empty_pac = self.get_empty_pac()
3618         auth_data, pac_data = self.replace_pac(auth_data,
3619                                                empty_pac,
3620                                                expect_pac=expect_pac)
3621         if not expect_pac:
3622             return
3623
3624         # Unpack the PAC as both PAC_DATA and PAC_DATA_RAW types. We use the
3625         # raw type to create a new PAC with zeroed signatures for
3626         # verification. This is because on Windows, the resource_groups field
3627         # is added to PAC_LOGON_INFO after the info3 field has been created,
3628         # which results in a different ordering of pointer values than Samba
3629         # (see commit 0e201ecdc53). Using the raw type avoids changing
3630         # PAC_LOGON_INFO, so verification against Windows can work. We still
3631         # need the PAC_DATA type to retrieve the actual checksums, because the
3632         # signatures in the raw type may contain padding bytes.
3633         pac = ndr_unpack(krb5pac.PAC_DATA,
3634                          pac_data)
3635         raw_pac = ndr_unpack(krb5pac.PAC_DATA_RAW,
3636                              pac_data)
3637
3638         checksums = {}
3639
3640         for pac_buffer, raw_pac_buffer in zip(pac.buffers, raw_pac.buffers):
3641             buffer_type = pac_buffer.type
3642             if buffer_type in self.pac_checksum_types:
3643                 self.assertNotIn(buffer_type, checksums,
3644                                  f'Duplicate checksum type {buffer_type}')
3645
3646                 # Fetch the checksum and the checksum type from the PAC buffer.
3647                 checksum = pac_buffer.info.signature
3648                 ctype = pac_buffer.info.type
3649                 if ctype & 1 << 31:
3650                     ctype |= -1 << 31
3651
3652                 checksums[buffer_type] = checksum, ctype
3653
3654                 if buffer_type != krb5pac.PAC_TYPE_TICKET_CHECKSUM:
3655                     # Zero the checksum field so that we can later verify the
3656                     # checksums. The ticket checksum field is not zeroed.
3657
3658                     signature = ndr_unpack(
3659                         krb5pac.PAC_SIGNATURE_DATA,
3660                         raw_pac_buffer.info.remaining)
3661                     signature.signature = bytes(len(checksum))
3662                     raw_pac_buffer.info.remaining = ndr_pack(
3663                         signature)
3664
3665         # Re-encode the PAC.
3666         pac_data = ndr_pack(raw_pac)
3667
3668         # Verify the signatures.
3669
3670         server_checksum, server_ctype = checksums[
3671             krb5pac.PAC_TYPE_SRV_CHECKSUM]
3672         key.verify_checksum(KU_NON_KERB_CKSUM_SALT,
3673                             pac_data,
3674                             server_ctype,
3675                             server_checksum)
3676
3677         kdc_checksum, kdc_ctype = checksums[
3678             krb5pac.PAC_TYPE_KDC_CHECKSUM]
3679
3680         if isinstance(krbtgt_keys, collections.abc.Container):
3681             if self.strict_checking:
3682                 krbtgt_key = krbtgt_keys[0]
3683             else:
3684                 krbtgt_key = next(key for key in krbtgt_keys
3685                                   if key.ctype == kdc_ctype)
3686         else:
3687             krbtgt_key = krbtgt_keys
3688
3689         krbtgt_key.verify_rodc_checksum(KU_NON_KERB_CKSUM_SALT,
3690                                         server_checksum,
3691                                         kdc_ctype,
3692                                         kdc_checksum)
3693
3694         if not service_ticket:
3695             self.assertNotIn(krb5pac.PAC_TYPE_TICKET_CHECKSUM, checksums)
3696         else:
3697             ticket_checksum, ticket_ctype = checksums.get(
3698                 krb5pac.PAC_TYPE_TICKET_CHECKSUM,
3699                 (None, None))
3700             if expect_ticket_checksum:
3701                 self.assertIsNotNone(ticket_checksum)
3702             elif expect_ticket_checksum is False:
3703                 self.assertIsNone(ticket_checksum)
3704             if ticket_checksum is not None:
3705                 enc_part['authorization-data'] = auth_data
3706                 enc_part = self.der_encode(enc_part,
3707                                            asn1Spec=krb5_asn1.EncTicketPart())
3708
3709                 krbtgt_key.verify_rodc_checksum(KU_NON_KERB_CKSUM_SALT,
3710                                                 enc_part,
3711                                                 ticket_ctype,
3712                                                 ticket_checksum)
3713
3714     def modified_ticket(self,
3715                         ticket, *,
3716                         new_ticket_key=None,
3717                         modify_fn=None,
3718                         modify_pac_fn=None,
3719                         exclude_pac=False,
3720                         allow_empty_authdata=False,
3721                         update_pac_checksums=True,
3722                         checksum_keys=None,
3723                         include_checksums=None):
3724         if checksum_keys is None:
3725             # A dict containing a key for each checksum type to be created in
3726             # the PAC.
3727             checksum_keys = {}
3728
3729         if include_checksums is None:
3730             # A dict containing a value for each checksum type; True if the
3731             # checksum type is to be included in the PAC, False if it is to be
3732             # excluded, or None/not present if the checksum is to be included
3733             # based on its presence in the original PAC.
3734             include_checksums = {}
3735
3736         # Check that the values passed in by the caller make sense.
3737
3738         self.assertLessEqual(checksum_keys.keys(), self.pac_checksum_types)
3739         self.assertLessEqual(include_checksums.keys(), self.pac_checksum_types)
3740
3741         if exclude_pac:
3742             self.assertIsNone(modify_pac_fn)
3743
3744             update_pac_checksums = False
3745
3746         if not update_pac_checksums:
3747             self.assertFalse(checksum_keys)
3748             self.assertFalse(include_checksums)
3749
3750         expect_pac = modify_pac_fn is not None
3751
3752         key = ticket.decryption_key
3753
3754         if new_ticket_key is None:
3755             # Use the same key to re-encrypt the ticket.
3756             new_ticket_key = key
3757
3758         if krb5pac.PAC_TYPE_SRV_CHECKSUM not in checksum_keys:
3759             # If the server signature key is not present, fall back to the key
3760             # used to encrypt the ticket.
3761             checksum_keys[krb5pac.PAC_TYPE_SRV_CHECKSUM] = new_ticket_key
3762
3763         if krb5pac.PAC_TYPE_TICKET_CHECKSUM not in checksum_keys:
3764             # If the ticket signature key is not present, fall back to the key
3765             # used for the KDC signature.
3766             kdc_checksum_key = checksum_keys.get(krb5pac.PAC_TYPE_KDC_CHECKSUM)
3767             if kdc_checksum_key is not None:
3768                 checksum_keys[krb5pac.PAC_TYPE_TICKET_CHECKSUM] = (
3769                     kdc_checksum_key)
3770
3771         # Decrypt the ticket.
3772
3773         enc_part = ticket.ticket['enc-part']
3774
3775         self.assertElementEqual(enc_part, 'etype', key.etype)
3776         self.assertElementKVNO(enc_part, 'kvno', key.kvno)
3777
3778         enc_part = key.decrypt(KU_TICKET, enc_part['cipher'])
3779         enc_part = self.der_decode(
3780             enc_part, asn1Spec=krb5_asn1.EncTicketPart())
3781
3782         # Modify the ticket here.
3783         if modify_fn is not None:
3784             enc_part = modify_fn(enc_part)
3785
3786         auth_data = enc_part.get('authorization-data')
3787         if expect_pac:
3788             self.assertIsNotNone(auth_data)
3789         if auth_data is not None:
3790             new_pac = None
3791             if not exclude_pac:
3792                 # Get a copy of the authdata with an empty PAC, and the
3793                 # existing PAC (if present).
3794                 empty_pac = self.get_empty_pac()
3795                 empty_pac_auth_data, pac_data = self.replace_pac(
3796                     auth_data,
3797                     empty_pac,
3798                     expect_pac=expect_pac)
3799
3800                 if pac_data is not None:
3801                     pac = ndr_unpack(krb5pac.PAC_DATA, pac_data)
3802
3803                     # Modify the PAC here.
3804                     if modify_pac_fn is not None:
3805                         pac = modify_pac_fn(pac)
3806
3807                     if update_pac_checksums:
3808                         # Get the enc-part with an empty PAC, which is needed
3809                         # to create a ticket signature.
3810                         enc_part_to_sign = enc_part.copy()
3811                         enc_part_to_sign['authorization-data'] = (
3812                             empty_pac_auth_data)
3813                         enc_part_to_sign = self.der_encode(
3814                             enc_part_to_sign,
3815                             asn1Spec=krb5_asn1.EncTicketPart())
3816
3817                         self.update_pac_checksums(pac,
3818                                                   checksum_keys,
3819                                                   include_checksums,
3820                                                   enc_part_to_sign)
3821
3822                     # Re-encode the PAC.
3823                     pac_data = ndr_pack(pac)
3824                     new_pac = self.AuthorizationData_create(AD_WIN2K_PAC,
3825                                                             pac_data)
3826
3827             # Replace the PAC in the authorization data and re-add it to the
3828             # ticket enc-part.
3829             auth_data, _ = self.replace_pac(
3830                 auth_data, new_pac,
3831                 expect_pac=expect_pac,
3832                 allow_empty_authdata=allow_empty_authdata)
3833             enc_part['authorization-data'] = auth_data
3834
3835         # Re-encrypt the ticket enc-part with the new key.
3836         enc_part_new = self.der_encode(enc_part,
3837                                        asn1Spec=krb5_asn1.EncTicketPart())
3838         enc_part_new = self.EncryptedData_create(new_ticket_key,
3839                                                  KU_TICKET,
3840                                                  enc_part_new)
3841
3842         # Create a copy of the ticket with the new enc-part.
3843         new_ticket = ticket.ticket.copy()
3844         new_ticket['enc-part'] = enc_part_new
3845
3846         new_ticket_creds = KerberosTicketCreds(
3847             new_ticket,
3848             session_key=ticket.session_key,
3849             crealm=ticket.crealm,
3850             cname=ticket.cname,
3851             srealm=ticket.srealm,
3852             sname=ticket.sname,
3853             decryption_key=new_ticket_key,
3854             ticket_private=enc_part,
3855             encpart_private=ticket.encpart_private)
3856
3857         return new_ticket_creds
3858
    def update_pac_checksums(self,
                             pac,
                             checksum_keys,
                             include_checksums,
                             enc_part=None):
        """Add, remove and recalculate the checksum buffers of a PAC.

        The PAC object is modified in place; nothing is returned.

        pac: unpacked PAC whose buffers and num_buffers are updated.
        checksum_keys: dict indexed by PAC checksum type, giving the key
            used to create each checksum that ends up in the PAC. A
            missing entry for such a checksum raises KeyError.
        include_checksums: dict indexed by PAC checksum type. A value of
            False removes an existing checksum buffer, True creates a
            missing one; any other value (or no entry) leaves the
            presence of the buffer as it was.
        enc_part: data the ticket checksum is made over. Must not be None
            if the PAC ends up with a ticket checksum buffer.
        """
        pac_buffers = pac.buffers
        checksum_buffers = {}

        # Find the relevant PAC checksum buffers.
        for pac_buffer in pac_buffers:
            buffer_type = pac_buffer.type
            if buffer_type in self.pac_checksum_types:
                self.assertNotIn(buffer_type, checksum_buffers,
                                 f'Duplicate checksum type {buffer_type}')

                checksum_buffers[buffer_type] = pac_buffer

        # Create any additional buffers that were requested but not
        # present. Conversely, remove any buffers that were requested to be
        # removed.
        for buffer_type in self.pac_checksum_types:
            if buffer_type in checksum_buffers:
                if include_checksums.get(buffer_type) is False:
                    checksum_buffer = checksum_buffers.pop(buffer_type)

                    # Keep the buffer count in step with the list.
                    pac.num_buffers -= 1
                    pac_buffers.remove(checksum_buffer)

            elif include_checksums.get(buffer_type) is True:
                info = krb5pac.PAC_SIGNATURE_DATA()

                # NOTE(review): the type is assigned before info,
                # presumably because the bindings select the info variant
                # by type -- confirm before reordering.
                checksum_buffer = krb5pac.PAC_BUFFER()
                checksum_buffer.type = buffer_type
                checksum_buffer.info = info

                pac_buffers.append(checksum_buffer)
                pac.num_buffers += 1

                checksum_buffers[buffer_type] = checksum_buffer

        # Fill the relevant checksum buffers.
        for buffer_type, checksum_buffer in checksum_buffers.items():
            checksum_key = checksum_keys[buffer_type]
            # Reduce the checksum type to its low 32 bits, so that a
            # negative type is stored as the matching unsigned value.
            ctype = checksum_key.ctype & ((1 << 32) - 1)

            if buffer_type == krb5pac.PAC_TYPE_TICKET_CHECKSUM:
                self.assertIsNotNone(enc_part)

                # The ticket checksum is final at this point; it is made
                # over the data passed in by the caller, not over the PAC.
                signature = checksum_key.make_rodc_checksum(
                    KU_NON_KERB_CKSUM_SALT,
                    enc_part)

            elif buffer_type == krb5pac.PAC_TYPE_SRV_CHECKSUM:
                # Placeholder; the real value is calculated further down.
                signature = checksum_key.make_zeroed_checksum()

            else:
                # Placeholder for every remaining checksum type. Of these,
                # only the KDC checksum is replaced further down.
                signature = checksum_key.make_rodc_zeroed_checksum()

            checksum_buffer.info.signature = signature
            checksum_buffer.info.type = ctype

        # Add the new checksum buffers to the PAC.
        # NOTE(review): presumably pac.buffers hands out a copy of the
        # list, which is why it is assigned back -- confirm.
        pac.buffers = pac_buffers

        # Calculate the server and KDC checksums and insert them into the PAC.

        server_checksum_buffer = checksum_buffers.get(
            krb5pac.PAC_TYPE_SRV_CHECKSUM)
        if server_checksum_buffer is not None:
            server_checksum_key = checksum_keys[krb5pac.PAC_TYPE_SRV_CHECKSUM]

            # The server checksum covers the packed PAC, which at this
            # point holds the placeholder server and KDC signatures.
            pac_data = ndr_pack(pac)
            server_checksum = server_checksum_key.make_checksum(
                KU_NON_KERB_CKSUM_SALT,
                pac_data)

            server_checksum_buffer.info.signature = server_checksum

        kdc_checksum_buffer = checksum_buffers.get(
            krb5pac.PAC_TYPE_KDC_CHECKSUM)
        if kdc_checksum_buffer is not None:
            if server_checksum_buffer is None:
                # There's no server signature to make the checksum over, so
                # just make the checksum over an empty bytes object.
                server_checksum = bytes()

            kdc_checksum_key = checksum_keys[krb5pac.PAC_TYPE_KDC_CHECKSUM]

            # The KDC checksum covers the server checksum, not the PAC.
            kdc_checksum = kdc_checksum_key.make_rodc_checksum(
                KU_NON_KERB_CKSUM_SALT,
                server_checksum)

            kdc_checksum_buffer.info.signature = kdc_checksum
3952
3953     def replace_pac(self, auth_data, new_pac, expect_pac=True,
3954                     allow_empty_authdata=False):
3955         if new_pac is not None:
3956             self.assertElementEqual(new_pac, 'ad-type', AD_WIN2K_PAC)
3957             self.assertElementPresent(new_pac, 'ad-data')
3958
3959         new_auth_data = []
3960
3961         ad_relevant = None
3962         old_pac = None
3963
3964         for authdata_elem in auth_data:
3965             if authdata_elem['ad-type'] == AD_IF_RELEVANT:
3966                 ad_relevant = self.der_decode(
3967                     authdata_elem['ad-data'],
3968                     asn1Spec=krb5_asn1.AD_IF_RELEVANT())
3969
3970                 relevant_elems = []
3971                 for relevant_elem in ad_relevant:
3972                     if relevant_elem['ad-type'] == AD_WIN2K_PAC:
3973                         self.assertIsNone(old_pac, 'Multiple PACs detected')
3974                         old_pac = relevant_elem['ad-data']
3975
3976                         if new_pac is not None:
3977                             relevant_elems.append(new_pac)
3978                     else:
3979                         relevant_elems.append(relevant_elem)
3980                 if expect_pac:
3981                     self.assertIsNotNone(old_pac, 'Expected PAC')
3982
3983                 if relevant_elems or allow_empty_authdata:
3984                     ad_relevant = self.der_encode(
3985                         relevant_elems,
3986                         asn1Spec=krb5_asn1.AD_IF_RELEVANT())
3987
3988                     authdata_elem = self.AuthorizationData_create(
3989                         AD_IF_RELEVANT,
3990                         ad_relevant)
3991                 else:
3992                     authdata_elem = None
3993
3994             if authdata_elem is not None or allow_empty_authdata:
3995                 new_auth_data.append(authdata_elem)
3996
3997         if expect_pac:
3998             self.assertIsNotNone(ad_relevant, 'Expected AD-RELEVANT')
3999
4000         return new_auth_data, old_pac
4001
4002     def get_pac(self, auth_data, expect_pac=True):
4003         _, pac = self.replace_pac(auth_data, None, expect_pac)
4004         return pac
4005
4006     def get_ticket_pac(self, ticket, expect_pac=True):
4007         auth_data = ticket.ticket_private.get('authorization-data')
4008         if expect_pac:
4009             self.assertIsNotNone(auth_data)
4010         elif auth_data is None:
4011             return None
4012
4013         return self.get_pac(auth_data, expect_pac=expect_pac)
4014
4015     def get_krbtgt_checksum_key(self):
4016         krbtgt_creds = self.get_krbtgt_creds()
4017         krbtgt_key = self.TicketDecryptionKey_from_creds(krbtgt_creds)
4018
4019         return {
4020             krb5pac.PAC_TYPE_KDC_CHECKSUM: krbtgt_key
4021         }
4022
4023     def is_tgs(self, principal):
4024         name = principal['name-string'][0]
4025         return name in ('krbtgt', b'krbtgt')
4026
4027     def is_tgt(self, ticket):
4028         sname = ticket.ticket['sname']
4029         return self.is_tgs(sname)
4030
4031     def get_empty_pac(self):
4032         return self.AuthorizationData_create(AD_WIN2K_PAC, bytes(1))
4033
4034     def get_outer_pa_dict(self, kdc_exchange_dict):
4035         return self.get_pa_dict(kdc_exchange_dict['req_padata'])
4036
4037     def get_fast_pa_dict(self, kdc_exchange_dict):
4038         req_pa_dict = self.get_pa_dict(kdc_exchange_dict['fast_padata'])
4039
4040         if req_pa_dict:
4041             return req_pa_dict
4042
4043         return self.get_outer_pa_dict(kdc_exchange_dict)
4044
4045     def sent_fast(self, kdc_exchange_dict):
4046         outer_pa_dict = self.get_outer_pa_dict(kdc_exchange_dict)
4047
4048         return PADATA_FX_FAST in outer_pa_dict
4049
4050     def sent_enc_challenge(self, kdc_exchange_dict):
4051         fast_pa_dict = self.get_fast_pa_dict(kdc_exchange_dict)
4052
4053         return PADATA_ENCRYPTED_CHALLENGE in fast_pa_dict
4054
4055     def get_sent_pac_options(self, kdc_exchange_dict):
4056         fast_pa_dict = self.get_fast_pa_dict(kdc_exchange_dict)
4057
4058         if PADATA_PAC_OPTIONS not in fast_pa_dict:
4059             return ''
4060
4061         pac_options = self.der_decode(fast_pa_dict[PADATA_PAC_OPTIONS],
4062                                       asn1Spec=krb5_asn1.PA_PAC_OPTIONS())
4063         pac_options = pac_options['options']
4064
4065         # Mask out unsupported bits.
4066         pac_options, remaining = pac_options[:4], pac_options[4:]
4067         pac_options += '0' * len(remaining)
4068
4069         return pac_options
4070
4071     def get_krbtgt_sname(self):
4072         krbtgt_creds = self.get_krbtgt_creds()
4073         krbtgt_username = krbtgt_creds.get_username()
4074         krbtgt_realm = krbtgt_creds.get_realm()
4075         krbtgt_sname = self.PrincipalName_create(
4076             name_type=NT_SRV_INST, names=[krbtgt_username, krbtgt_realm])
4077
4078         return krbtgt_sname
4079
4080     def _test_as_exchange(self,
4081                           cname,
4082                           realm,
4083                           sname,
4084                           till,
4085                           client_as_etypes,
4086                           expected_error_mode,
4087                           expected_crealm,
4088                           expected_cname,
4089                           expected_srealm,
4090                           expected_sname,
4091                           expected_salt,
4092                           etypes,
4093                           padata,
4094                           kdc_options,
4095                           expected_account_name=None,
4096                           expected_upn_name=None,
4097                           expected_sid=None,
4098                           expected_flags=None,
4099                           unexpected_flags=None,
4100                           expected_supported_etypes=None,
4101                           preauth_key=None,
4102                           ticket_decryption_key=None,
4103                           pac_request=None,
4104                           pac_options=None,
4105                           expect_pac=True,
4106                           expect_pac_attrs=None,
4107                           expect_pac_attrs_pac_request=None,
4108                           expect_requester_sid=None,
4109                           to_rodc=False):
4110
4111         def _generate_padata_copy(_kdc_exchange_dict,
4112                                   _callback_dict,
4113                                   req_body):
4114             return padata, req_body
4115
4116         if not expected_error_mode:
4117             check_error_fn = None
4118             check_rep_fn = self.generic_check_kdc_rep
4119         else:
4120             check_error_fn = self.generic_check_kdc_error
4121             check_rep_fn = None
4122
4123         if padata is not None:
4124             generate_padata_fn = _generate_padata_copy
4125         else:
4126             generate_padata_fn = None
4127
4128         kdc_exchange_dict = self.as_exchange_dict(
4129             expected_crealm=expected_crealm,
4130             expected_cname=expected_cname,
4131             expected_srealm=expected_srealm,
4132             expected_sname=expected_sname,
4133             expected_account_name=expected_account_name,
4134             expected_upn_name=expected_upn_name,
4135             expected_sid=expected_sid,
4136             expected_supported_etypes=expected_supported_etypes,
4137             ticket_decryption_key=ticket_decryption_key,
4138             generate_padata_fn=generate_padata_fn,
4139             check_error_fn=check_error_fn,
4140             check_rep_fn=check_rep_fn,
4141             check_kdc_private_fn=self.generic_check_kdc_private,
4142             expected_error_mode=expected_error_mode,
4143             client_as_etypes=client_as_etypes,
4144             expected_salt=expected_salt,
4145             expected_flags=expected_flags,
4146             unexpected_flags=unexpected_flags,
4147             preauth_key=preauth_key,
4148             kdc_options=str(kdc_options),
4149             pac_request=pac_request,
4150             pac_options=pac_options,
4151             expect_pac=expect_pac,
4152             expect_pac_attrs=expect_pac_attrs,
4153             expect_pac_attrs_pac_request=expect_pac_attrs_pac_request,
4154             expect_requester_sid=expect_requester_sid,
4155             to_rodc=to_rodc)
4156
4157         rep = self._generic_kdc_exchange(kdc_exchange_dict,
4158                                          cname=cname,
4159                                          realm=realm,
4160                                          sname=sname,
4161                                          till_time=till,
4162                                          etypes=etypes)
4163
4164         return rep, kdc_exchange_dict