tests/krb5: Add method for modifying a ticket and creating PAC checksums
authorJoseph Sutton <josephsutton@catalyst.net.nz>
Fri, 17 Sep 2021 03:26:12 +0000 (15:26 +1200)
committerAndrew Bartlett <abartlet@samba.org>
Thu, 23 Sep 2021 18:32:29 +0000 (18:32 +0000)
BUG: https://bugzilla.samba.org/show_bug.cgi?id=14642

Signed-off-by: Joseph Sutton <josephsutton@catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
python/samba/tests/krb5/raw_testcase.py

index 265789cdb262174b25611942293ec1eb94a02fd0..4ac7698ffab2e6e760e9c0a576f763a9365676ea 100644 (file)
@@ -3013,6 +3013,240 @@ class RawKerberosTest(TestCaseInTempDir):
                                        ticket_ctype,
                                        ticket_checksum)
 
+    def modified_ticket(self,
+                        ticket, *,
+                        new_ticket_key=None,
+                        modify_fn=None,
+                        modify_pac_fn=None,
+                        exclude_pac=False,
+                        update_pac_checksums=True,
+                        checksum_keys=None,
+                        include_checksums=None):
+        if checksum_keys is None:
+            # A dict containing a key for each checksum type to be created in
+            # the PAC.
+            checksum_keys = {}
+
+        if include_checksums is None:
+            # A dict containing a value for each checksum type; True if the
+            # checksum type is to be included in the PAC, False if it is to be
+            # excluded, or None/not present if the checksum is to be included
+            # based on its presence in the original PAC.
+            include_checksums = {}
+
+        # Check that the values passed in by the caller make sense.
+
+        self.assertLessEqual(checksum_keys.keys(), self.pac_checksum_types)
+        self.assertLessEqual(include_checksums.keys(), self.pac_checksum_types)
+
+        if exclude_pac:
+            self.assertIsNone(modify_pac_fn)
+
+            update_pac_checksums = False
+
+        if not update_pac_checksums:
+            self.assertFalse(checksum_keys)
+            self.assertFalse(include_checksums)
+
+        expect_pac = update_pac_checksums or modify_pac_fn is not None
+
+        key = ticket.decryption_key
+
+        if new_ticket_key is None:
+            # Use the same key to re-encrypt the ticket.
+            new_ticket_key = key
+
+        if krb5pac.PAC_TYPE_SRV_CHECKSUM not in checksum_keys:
+            # If the server signature key is not present, fall back to the key
+            # used to encrypt the ticket.
+            checksum_keys[krb5pac.PAC_TYPE_SRV_CHECKSUM] = new_ticket_key
+
+        if krb5pac.PAC_TYPE_TICKET_CHECKSUM not in checksum_keys:
+            # If the ticket signature key is not present, fall back to the key
+            # used for the KDC signature.
+            kdc_checksum_key = checksum_keys.get(krb5pac.PAC_TYPE_KDC_CHECKSUM)
+            if kdc_checksum_key is not None:
+                checksum_keys[krb5pac.PAC_TYPE_TICKET_CHECKSUM] = (
+                    kdc_checksum_key)
+
+        # Decrypt the ticket.
+
+        enc_part = ticket.ticket['enc-part']
+
+        self.assertElementEqual(enc_part, 'etype', key.etype)
+        self.assertElementKVNO(enc_part, 'kvno', key.kvno)
+
+        enc_part = key.decrypt(KU_TICKET, enc_part['cipher'])
+        enc_part = self.der_decode(
+            enc_part, asn1Spec=krb5_asn1.EncTicketPart())
+
+        # Modify the ticket here.
+        if modify_fn is not None:
+            enc_part = modify_fn(enc_part)
+
+        auth_data = enc_part.get('authorization-data')
+        if expect_pac:
+            self.assertIsNotNone(auth_data)
+        if auth_data is not None:
+            new_pac = None
+            if not exclude_pac:
+                # Get a copy of the authdata with an empty PAC, and the
+                # existing PAC (if present).
+                empty_pac = self.get_empty_pac()
+                empty_pac_auth_data, pac_data = self.replace_pac(auth_data,
+                                                                 empty_pac)
+
+                if expect_pac:
+                    self.assertIsNotNone(pac_data)
+                if pac_data is not None:
+                    pac = ndr_unpack(krb5pac.PAC_DATA, pac_data)
+
+                    # Modify the PAC here.
+                    if modify_pac_fn is not None:
+                        pac = modify_pac_fn(pac)
+
+                    if update_pac_checksums:
+                        # Get the enc-part with an empty PAC, which is needed
+                        # to create a ticket signature.
+                        enc_part_to_sign = enc_part.copy()
+                        enc_part_to_sign['authorization-data'] = (
+                            empty_pac_auth_data)
+                        enc_part_to_sign = self.der_encode(
+                            enc_part_to_sign,
+                            asn1Spec=krb5_asn1.EncTicketPart())
+
+                        self.update_pac_checksums(pac,
+                                                  checksum_keys,
+                                                  include_checksums,
+                                                  enc_part_to_sign)
+
+                    # Re-encode the PAC.
+                    pac_data = ndr_pack(pac)
+                    new_pac = self.AuthorizationData_create(AD_WIN2K_PAC,
+                                                            pac_data)
+
+            # Replace the PAC in the authorization data and re-add it to the
+            # ticket enc-part.
+            auth_data, _ = self.replace_pac(auth_data, new_pac)
+            enc_part['authorization-data'] = auth_data
+
+        # Re-encrypt the ticket enc-part with the new key.
+        enc_part_new = self.der_encode(enc_part,
+                                       asn1Spec=krb5_asn1.EncTicketPart())
+        enc_part_new = self.EncryptedData_create(new_ticket_key,
+                                                 KU_TICKET,
+                                                 enc_part_new)
+
+        # Create a copy of the ticket with the new enc-part.
+        new_ticket = ticket.ticket.copy()
+        new_ticket['enc-part'] = enc_part_new
+
+        new_ticket_creds = KerberosTicketCreds(
+            new_ticket,
+            session_key=ticket.session_key,
+            crealm=ticket.crealm,
+            cname=ticket.cname,
+            srealm=ticket.srealm,
+            sname=ticket.sname,
+            decryption_key=new_ticket_key,
+            ticket_private=enc_part,
+            encpart_private=ticket.encpart_private)
+
+        return new_ticket_creds
+
+    def update_pac_checksums(self,
+                             pac,
+                             checksum_keys,
+                             include_checksums,
+                             enc_part=None):
+        pac_buffers = pac.buffers
+        checksum_buffers = {}
+
+        # Find the relevant PAC checksum buffers.
+        for pac_buffer in pac_buffers:
+            buffer_type = pac_buffer.type
+            if buffer_type in self.pac_checksum_types:
+                self.assertNotIn(buffer_type, checksum_buffers,
+                                 f'Duplicate checksum type {buffer_type}')
+
+                checksum_buffers[buffer_type] = pac_buffer
+
+        # Create any additional buffers that were requested but not
+        # present. Conversely, remove any buffers that were requested to be
+        # removed.
+        for buffer_type in self.pac_checksum_types:
+            if buffer_type in checksum_buffers:
+                if include_checksums.get(buffer_type) is False:
+                    checksum_buffer = checksum_buffers.pop(buffer_type)
+
+                    pac.num_buffers -= 1
+                    pac_buffers.remove(checksum_buffer)
+
+            elif include_checksums.get(buffer_type) is True:
+                info = krb5pac.PAC_SIGNATURE_DATA()
+
+                checksum_buffer = krb5pac.PAC_BUFFER()
+                checksum_buffer.type = buffer_type
+                checksum_buffer.info = info
+
+                pac_buffers.append(checksum_buffer)
+                pac.num_buffers += 1
+
+                checksum_buffers[buffer_type] = checksum_buffer
+
+        # Fill the relevant checksum buffers.
+        for buffer_type, checksum_buffer in checksum_buffers.items():
+            checksum_key = checksum_keys[buffer_type]
+            ctype = checksum_key.ctype & ((1 << 32) - 1)
+
+            if buffer_type == krb5pac.PAC_TYPE_TICKET_CHECKSUM:
+                self.assertIsNotNone(enc_part)
+
+                signature = checksum_key.make_checksum(
+                    KU_NON_KERB_CKSUM_SALT,
+                    enc_part)
+
+            elif buffer_type == krb5pac.PAC_TYPE_SRV_CHECKSUM:
+                signature = Krb5EncryptionKey.make_zeroed_checksum(
+                    checksum_key)
+
+            else:
+                signature = checksum_key.make_zeroed_checksum()
+
+            checksum_buffer.info.signature = signature
+            checksum_buffer.info.type = ctype
+
+        # Add the new checksum buffers to the PAC.
+        pac.buffers = pac_buffers
+
+        # Calculate the server and KDC checksums and insert them into the PAC.
+
+        server_checksum_buffer = checksum_buffers.get(
+            krb5pac.PAC_TYPE_SRV_CHECKSUM)
+        if server_checksum_buffer is not None:
+            server_checksum_key = checksum_keys[krb5pac.PAC_TYPE_SRV_CHECKSUM]
+
+            pac_data = ndr_pack(pac)
+            server_checksum = Krb5EncryptionKey.make_checksum(
+                server_checksum_key,
+                KU_NON_KERB_CKSUM_SALT,
+                pac_data)
+
+            server_checksum_buffer.info.signature = server_checksum
+
+        kdc_checksum_buffer = checksum_buffers.get(
+            krb5pac.PAC_TYPE_KDC_CHECKSUM)
+        if kdc_checksum_buffer is not None:
+            self.assertIsNotNone(server_checksum_buffer)
+
+            kdc_checksum_key = checksum_keys[krb5pac.PAC_TYPE_KDC_CHECKSUM]
+
+            kdc_checksum = kdc_checksum_key.make_checksum(
+                KU_NON_KERB_CKSUM_SALT,
+                server_checksum)
+
+            kdc_checksum_buffer.info.signature = kdc_checksum
+
     def replace_pac(self, auth_data, new_pac, expect_pac=True):
         if new_pac is not None:
             self.assertElementEqual(new_pac, 'ad-type', AD_WIN2K_PAC)