ticket_ctype,
ticket_checksum)
+ def modified_ticket(self,
+ ticket, *,
+ new_ticket_key=None,
+ modify_fn=None,
+ modify_pac_fn=None,
+ exclude_pac=False,
+ update_pac_checksums=True,
+ checksum_keys=None,
+ include_checksums=None):
+ if checksum_keys is None:
+ # A dict containing a key for each checksum type to be created in
+ # the PAC.
+ checksum_keys = {}
+
+ if include_checksums is None:
+ # A dict containing a value for each checksum type; True if the
+ # checksum type is to be included in the PAC, False if it is to be
+ # excluded, or None/not present if the checksum is to be included
+ # based on its presence in the original PAC.
+ include_checksums = {}
+
+ # Check that the values passed in by the caller make sense.
+
+ self.assertLessEqual(checksum_keys.keys(), self.pac_checksum_types)
+ self.assertLessEqual(include_checksums.keys(), self.pac_checksum_types)
+
+ if exclude_pac:
+ self.assertIsNone(modify_pac_fn)
+
+ update_pac_checksums = False
+
+ if not update_pac_checksums:
+ self.assertFalse(checksum_keys)
+ self.assertFalse(include_checksums)
+
+ expect_pac = update_pac_checksums or modify_pac_fn is not None
+
+ key = ticket.decryption_key
+
+ if new_ticket_key is None:
+ # Use the same key to re-encrypt the ticket.
+ new_ticket_key = key
+
+ if krb5pac.PAC_TYPE_SRV_CHECKSUM not in checksum_keys:
+ # If the server signature key is not present, fall back to the key
+ # used to encrypt the ticket.
+ checksum_keys[krb5pac.PAC_TYPE_SRV_CHECKSUM] = new_ticket_key
+
+ if krb5pac.PAC_TYPE_TICKET_CHECKSUM not in checksum_keys:
+ # If the ticket signature key is not present, fall back to the key
+ # used for the KDC signature.
+ kdc_checksum_key = checksum_keys.get(krb5pac.PAC_TYPE_KDC_CHECKSUM)
+ if kdc_checksum_key is not None:
+ checksum_keys[krb5pac.PAC_TYPE_TICKET_CHECKSUM] = (
+ kdc_checksum_key)
+
+ # Decrypt the ticket.
+
+ enc_part = ticket.ticket['enc-part']
+
+ self.assertElementEqual(enc_part, 'etype', key.etype)
+ self.assertElementKVNO(enc_part, 'kvno', key.kvno)
+
+ enc_part = key.decrypt(KU_TICKET, enc_part['cipher'])
+ enc_part = self.der_decode(
+ enc_part, asn1Spec=krb5_asn1.EncTicketPart())
+
+ # Modify the ticket here.
+ if modify_fn is not None:
+ enc_part = modify_fn(enc_part)
+
+ auth_data = enc_part.get('authorization-data')
+ if expect_pac:
+ self.assertIsNotNone(auth_data)
+ if auth_data is not None:
+ new_pac = None
+ if not exclude_pac:
+ # Get a copy of the authdata with an empty PAC, and the
+ # existing PAC (if present).
+ empty_pac = self.get_empty_pac()
+ empty_pac_auth_data, pac_data = self.replace_pac(auth_data,
+ empty_pac)
+
+ if expect_pac:
+ self.assertIsNotNone(pac_data)
+ if pac_data is not None:
+ pac = ndr_unpack(krb5pac.PAC_DATA, pac_data)
+
+ # Modify the PAC here.
+ if modify_pac_fn is not None:
+ pac = modify_pac_fn(pac)
+
+ if update_pac_checksums:
+ # Get the enc-part with an empty PAC, which is needed
+ # to create a ticket signature.
+ enc_part_to_sign = enc_part.copy()
+ enc_part_to_sign['authorization-data'] = (
+ empty_pac_auth_data)
+ enc_part_to_sign = self.der_encode(
+ enc_part_to_sign,
+ asn1Spec=krb5_asn1.EncTicketPart())
+
+ self.update_pac_checksums(pac,
+ checksum_keys,
+ include_checksums,
+ enc_part_to_sign)
+
+ # Re-encode the PAC.
+ pac_data = ndr_pack(pac)
+ new_pac = self.AuthorizationData_create(AD_WIN2K_PAC,
+ pac_data)
+
+ # Replace the PAC in the authorization data and re-add it to the
+ # ticket enc-part.
+ auth_data, _ = self.replace_pac(auth_data, new_pac)
+ enc_part['authorization-data'] = auth_data
+
+ # Re-encrypt the ticket enc-part with the new key.
+ enc_part_new = self.der_encode(enc_part,
+ asn1Spec=krb5_asn1.EncTicketPart())
+ enc_part_new = self.EncryptedData_create(new_ticket_key,
+ KU_TICKET,
+ enc_part_new)
+
+ # Create a copy of the ticket with the new enc-part.
+ new_ticket = ticket.ticket.copy()
+ new_ticket['enc-part'] = enc_part_new
+
+ new_ticket_creds = KerberosTicketCreds(
+ new_ticket,
+ session_key=ticket.session_key,
+ crealm=ticket.crealm,
+ cname=ticket.cname,
+ srealm=ticket.srealm,
+ sname=ticket.sname,
+ decryption_key=new_ticket_key,
+ ticket_private=enc_part,
+ encpart_private=ticket.encpart_private)
+
+ return new_ticket_creds
+
+ def update_pac_checksums(self,
+ pac,
+ checksum_keys,
+ include_checksums,
+ enc_part=None):
+ pac_buffers = pac.buffers
+ checksum_buffers = {}
+
+ # Find the relevant PAC checksum buffers.
+ for pac_buffer in pac_buffers:
+ buffer_type = pac_buffer.type
+ if buffer_type in self.pac_checksum_types:
+ self.assertNotIn(buffer_type, checksum_buffers,
+ f'Duplicate checksum type {buffer_type}')
+
+ checksum_buffers[buffer_type] = pac_buffer
+
+ # Create any additional buffers that were requested but not
+ # present. Conversely, remove any buffers that were requested to be
+ # removed.
+ for buffer_type in self.pac_checksum_types:
+ if buffer_type in checksum_buffers:
+ if include_checksums.get(buffer_type) is False:
+ checksum_buffer = checksum_buffers.pop(buffer_type)
+
+ pac.num_buffers -= 1
+ pac_buffers.remove(checksum_buffer)
+
+ elif include_checksums.get(buffer_type) is True:
+ info = krb5pac.PAC_SIGNATURE_DATA()
+
+ checksum_buffer = krb5pac.PAC_BUFFER()
+ checksum_buffer.type = buffer_type
+ checksum_buffer.info = info
+
+ pac_buffers.append(checksum_buffer)
+ pac.num_buffers += 1
+
+ checksum_buffers[buffer_type] = checksum_buffer
+
+ # Fill the relevant checksum buffers.
+ for buffer_type, checksum_buffer in checksum_buffers.items():
+ checksum_key = checksum_keys[buffer_type]
+ ctype = checksum_key.ctype & ((1 << 32) - 1)
+
+ if buffer_type == krb5pac.PAC_TYPE_TICKET_CHECKSUM:
+ self.assertIsNotNone(enc_part)
+
+ signature = checksum_key.make_checksum(
+ KU_NON_KERB_CKSUM_SALT,
+ enc_part)
+
+ elif buffer_type == krb5pac.PAC_TYPE_SRV_CHECKSUM:
+ signature = Krb5EncryptionKey.make_zeroed_checksum(
+ checksum_key)
+
+ else:
+ signature = checksum_key.make_zeroed_checksum()
+
+ checksum_buffer.info.signature = signature
+ checksum_buffer.info.type = ctype
+
+ # Add the new checksum buffers to the PAC.
+ pac.buffers = pac_buffers
+
+ # Calculate the server and KDC checksums and insert them into the PAC.
+
+ server_checksum_buffer = checksum_buffers.get(
+ krb5pac.PAC_TYPE_SRV_CHECKSUM)
+ if server_checksum_buffer is not None:
+ server_checksum_key = checksum_keys[krb5pac.PAC_TYPE_SRV_CHECKSUM]
+
+ pac_data = ndr_pack(pac)
+ server_checksum = Krb5EncryptionKey.make_checksum(
+ server_checksum_key,
+ KU_NON_KERB_CKSUM_SALT,
+ pac_data)
+
+ server_checksum_buffer.info.signature = server_checksum
+
+ kdc_checksum_buffer = checksum_buffers.get(
+ krb5pac.PAC_TYPE_KDC_CHECKSUM)
+ if kdc_checksum_buffer is not None:
+ self.assertIsNotNone(server_checksum_buffer)
+
+ kdc_checksum_key = checksum_keys[krb5pac.PAC_TYPE_KDC_CHECKSUM]
+
+ kdc_checksum = kdc_checksum_key.make_checksum(
+ KU_NON_KERB_CKSUM_SALT,
+ server_checksum)
+
+ kdc_checksum_buffer.info.signature = kdc_checksum
+
def replace_pac(self, auth_data, new_pac, expect_pac=True):
if new_pac is not None:
self.assertElementEqual(new_pac, 'ad-type', AD_WIN2K_PAC)