CVE-2022-2031 tests/krb5: Add tests for kpasswd service
authorJoseph Sutton <josephsutton@catalyst.net.nz>
Tue, 24 May 2022 07:59:16 +0000 (19:59 +1200)
committerJule Anger <janger@samba.org>
Sun, 24 Jul 2022 09:42:02 +0000 (11:42 +0200)
BUG: https://bugzilla.samba.org/show_bug.cgi?id=15047
BUG: https://bugzilla.samba.org/show_bug.cgi?id=15049
BUG: https://bugzilla.samba.org/show_bug.cgi?id=15074

Signed-off-by: Joseph Sutton <josephsutton@catalyst.net.nz>
Reviewed-by: Andreas Schneider <asn@samba.org>
[jsutton@samba.org Fixed conflicts in usage.py and knownfails; removed
 MIT KDC 1.20-specific knownfails as it's not supported]

[jsutton@samba.org Fixed conflicts in usage.py, knownfails, and
 tests.py]

python/samba/tests/krb5/kdc_base_test.py
python/samba/tests/krb5/kpasswd_tests.py [new file with mode: 0755]
python/samba/tests/krb5/raw_testcase.py
python/samba/tests/usage.py
selftest/knownfail_heimdal_kdc
selftest/knownfail_mit_kdc
source4/selftest/tests.py

index c0ca881985a82249f07a7962d6969a5122bdec8f..f0306dde11029ce2d40d06e36b106c2bd1b57736 100644 (file)
@@ -1586,7 +1586,9 @@ class KDCBaseTest(RawKerberosTest):
         authenticator_subkey = self.RandomKey(kcrypto.Enctype.AES256)
 
         if expect_error:
-            expected_error_mode = KDC_ERR_TGT_REVOKED
+            expected_error_mode = expect_error
+            if expected_error_mode is True:
+                expected_error_mode = KDC_ERR_TGT_REVOKED
             check_error_fn = self.generic_check_kdc_error
             check_rep_fn = None
         else:
diff --git a/python/samba/tests/krb5/kpasswd_tests.py b/python/samba/tests/krb5/kpasswd_tests.py
new file mode 100755 (executable)
index 0000000..3a6c7d8
--- /dev/null
@@ -0,0 +1,1021 @@
+#!/usr/bin/env python3
+# Unix SMB/CIFS implementation.
+# Copyright (C) Stefan Metzmacher 2020
+# Copyright (C) Catalyst.Net Ltd
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+
+import os
+import sys
+
+from functools import partial
+
+from samba import generate_random_password, unix2nttime
+from samba.dcerpc import krb5pac, security
+from samba.sd_utils import SDUtils
+
+from samba.tests.krb5.kdc_base_test import KDCBaseTest
+from samba.tests.krb5.rfc4120_constants import (
+    KDC_ERR_TGT_REVOKED,
+    KDC_ERR_TKT_EXPIRED,
+    KPASSWD_ACCESSDENIED,
+    KPASSWD_HARDERROR,
+    KPASSWD_INITIAL_FLAG_NEEDED,
+    KPASSWD_MALFORMED,
+    KPASSWD_SOFTERROR,
+    KPASSWD_SUCCESS,
+    NT_PRINCIPAL,
+    NT_SRV_INST,
+)
+
+sys.path.insert(0, 'bin/python')
+os.environ['PYTHONUNBUFFERED'] = '1'
+
+global_asn1_print = False
+global_hexdump = False
+
+
+# Note: these tests do not pass on Windows, which returns different error codes
+# to the ones we have chosen, and does not always return additional error data.
+class KpasswdTests(KDCBaseTest):
+
+    def setUp(self):
+        super().setUp()
+        self.do_asn1_print = global_asn1_print
+        self.do_hexdump = global_hexdump
+
+        samdb = self.get_samdb()
+
+        # Get the old 'dSHeuristics' if it was set
+        dsheuristics = samdb.get_dsheuristics()
+
+        # Reset the 'dSHeuristics' as they were before
+        self.addCleanup(samdb.set_dsheuristics, dsheuristics)
+
+        # Set the 'dSHeuristics' to activate the correct 'userPassword'
+        # behaviour
+        samdb.set_dsheuristics('000000001')
+
+        # Get the old 'minPwdAge'
+        minPwdAge = samdb.get_minPwdAge()
+
+        # Reset the 'minPwdAge' as it was before
+        self.addCleanup(samdb.set_minPwdAge, minPwdAge)
+
+        # Set it temporarily to '0'
+        samdb.set_minPwdAge('0')
+
+    def _get_creds(self, expired=False):
+        opts = {
+            'expired_password': expired
+        }
+
+        # Create the account.
+        creds = self.get_cached_creds(account_type=self.AccountType.USER,
+                                      opts=opts,
+                                      use_cache=False)
+
+        return creds
+
+    def issued_by_rodc(self, ticket):
+        krbtgt_creds = self.get_mock_rodc_krbtgt_creds()
+
+        krbtgt_key = self.TicketDecryptionKey_from_creds(krbtgt_creds)
+        checksum_keys = {
+            krb5pac.PAC_TYPE_KDC_CHECKSUM: krbtgt_key,
+        }
+
+        return self.modified_ticket(
+            ticket,
+            new_ticket_key=krbtgt_key,
+            checksum_keys=checksum_keys)
+
+    def get_kpasswd_sname(self):
+        return self.PrincipalName_create(name_type=NT_PRINCIPAL,
+                                         names=['kadmin', 'changepw'])
+
+    def get_ticket_lifetime(self, ticket):
+        enc_part = ticket.ticket_private
+
+        authtime = enc_part['authtime']
+        starttime = enc_part.get('starttime', authtime)
+        endtime = enc_part['endtime']
+
+        starttime = self.get_EpochFromKerberosTime(starttime)
+        endtime = self.get_EpochFromKerberosTime(endtime)
+
+        return endtime - starttime
+
+    def add_requester_sid(self, pac, sid):
+        pac_buffers = pac.buffers
+
+        buffer_types = [pac_buffer.type for pac_buffer in pac_buffers]
+        self.assertNotIn(krb5pac.PAC_TYPE_REQUESTER_SID, buffer_types)
+
+        requester_sid = krb5pac.PAC_REQUESTER_SID()
+        requester_sid.sid = security.dom_sid(sid)
+
+        requester_sid_buffer = krb5pac.PAC_BUFFER()
+        requester_sid_buffer.type = krb5pac.PAC_TYPE_REQUESTER_SID
+        requester_sid_buffer.info = requester_sid
+
+        pac_buffers.append(requester_sid_buffer)
+
+        pac.buffers = pac_buffers
+        pac.num_buffers += 1
+
+        return pac
+
+    # Test setting a password with kpasswd.
+    def test_kpasswd_set(self):
+        # Create an account for testing.
+        creds = self._get_creds()
+
+        # Get an initial ticket to kpasswd.
+        ticket = self.get_tgt(creds, sname=self.get_kpasswd_sname(),
+                              kdc_options='0')
+
+        expected_code = KPASSWD_SUCCESS
+        expected_msg = b'Password changed'
+
+        # Set the password.
+        new_password = generate_random_password(32, 32)
+        self.kpasswd_exchange(ticket,
+                              new_password,
+                              expected_code,
+                              expected_msg,
+                              mode=self.KpasswdMode.SET)
+
+        # Test the newly set password.
+        creds.update_password(new_password)
+        self.get_tgt(creds, fresh=True)
+
+    # Test changing a password with kpasswd.
+    def test_kpasswd_change(self):
+        # Create an account for testing.
+        creds = self._get_creds()
+
+        # Get an initial ticket to kpasswd.
+        ticket = self.get_tgt(creds, sname=self.get_kpasswd_sname(),
+                              kdc_options='0')
+
+        expected_code = KPASSWD_SUCCESS
+        expected_msg = b'Password changed'
+
+        # Change the password.
+        new_password = generate_random_password(32, 32)
+        self.kpasswd_exchange(ticket,
+                              new_password,
+                              expected_code,
+                              expected_msg,
+                              mode=self.KpasswdMode.CHANGE)
+
+        # Test the newly set password.
+        creds.update_password(new_password)
+        self.get_tgt(creds, fresh=True)
+
+    # Test kpasswd without setting the canonicalize option.
+    def test_kpasswd_no_canonicalize(self):
+        # Create an account for testing.
+        creds = self._get_creds()
+
+        sname = self.get_kpasswd_sname()
+
+        # Get an initial ticket to kpasswd.
+        ticket = self.get_tgt(creds, sname=sname,
+                              kdc_options='0')
+
+        expected_code = KPASSWD_SUCCESS
+        expected_msg = b'Password changed'
+
+        # Set the password.
+        new_password = generate_random_password(32, 32)
+        self.kpasswd_exchange(ticket,
+                              new_password,
+                              expected_code,
+                              expected_msg,
+                              mode=self.KpasswdMode.SET)
+
+        creds.update_password(new_password)
+
+        # Get an initial ticket to kpasswd.
+        ticket = self.get_tgt(creds, sname=sname,
+                              kdc_options='0')
+
+        # Change the password.
+        new_password = generate_random_password(32, 32)
+        self.kpasswd_exchange(ticket,
+                              new_password,
+                              expected_code,
+                              expected_msg,
+                              mode=self.KpasswdMode.CHANGE)
+
+    # Test kpasswd with the canonicalize option reset and a non-canonical
+    # (by conversion to title case) realm.
+    def test_kpasswd_no_canonicalize_realm_case(self):
+        # Create an account for testing.
+        creds = self._get_creds()
+
+        sname = self.get_kpasswd_sname()
+        realm = creds.get_realm().capitalize()  # We use a title-cased realm.
+
+        # Get an initial ticket to kpasswd.
+        ticket = self.get_tgt(creds, sname=sname,
+                              realm=realm,
+                              kdc_options='0')
+
+        expected_code = KPASSWD_SUCCESS
+        expected_msg = b'Password changed'
+
+        # Set the password.
+        new_password = generate_random_password(32, 32)
+        self.kpasswd_exchange(ticket,
+                              new_password,
+                              expected_code,
+                              expected_msg,
+                              mode=self.KpasswdMode.SET)
+
+        creds.update_password(new_password)
+
+        # Get an initial ticket to kpasswd.
+        ticket = self.get_tgt(creds, sname=sname,
+                              realm=realm,
+                              kdc_options='0')
+
+        # Change the password.
+        new_password = generate_random_password(32, 32)
+        self.kpasswd_exchange(ticket,
+                              new_password,
+                              expected_code,
+                              expected_msg,
+                              mode=self.KpasswdMode.CHANGE)
+
+    # Test kpasswd with the canonicalize option set.
+    def test_kpasswd_canonicalize(self):
+        # Create an account for testing.
+        creds = self._get_creds()
+
+        # Get an initial ticket to kpasswd. We set the canonicalize flag here.
+        ticket = self.get_tgt(creds, sname=self.get_kpasswd_sname(),
+                              kdc_options='canonicalize')
+
+        expected_code = KPASSWD_SUCCESS
+        expected_msg = b'Password changed'
+
+        # Set the password.
+        new_password = generate_random_password(32, 32)
+        self.kpasswd_exchange(ticket,
+                              new_password,
+                              expected_code,
+                              expected_msg,
+                              mode=self.KpasswdMode.SET)
+
+        creds.update_password(new_password)
+
+        # Get an initial ticket to kpasswd. We set the canonicalize flag here.
+        ticket = self.get_tgt(creds, sname=self.get_kpasswd_sname(),
+                              kdc_options='canonicalize')
+
+        # Change the password.
+        new_password = generate_random_password(32, 32)
+        self.kpasswd_exchange(ticket,
+                              new_password,
+                              expected_code,
+                              expected_msg,
+                              mode=self.KpasswdMode.CHANGE)
+
+    # Test kpasswd with the canonicalize option set and a non-canonical (by
+    # conversion to title case) realm.
+    def test_kpasswd_canonicalize_realm_case(self):
+        # Create an account for testing.
+        creds = self._get_creds()
+
+        sname = self.get_kpasswd_sname()
+        realm = creds.get_realm().capitalize()  # We use a title-cased realm.
+
+        # Get an initial ticket to kpasswd. We set the canonicalize flag here.
+        ticket = self.get_tgt(creds, sname=sname,
+                              realm=realm,
+                              kdc_options='canonicalize')
+
+        expected_code = KPASSWD_SUCCESS
+        expected_msg = b'Password changed'
+
+        # Set the password.
+        new_password = generate_random_password(32, 32)
+        self.kpasswd_exchange(ticket,
+                              new_password,
+                              expected_code,
+                              expected_msg,
+                              mode=self.KpasswdMode.SET)
+
+        creds.update_password(new_password)
+
+        # Get an initial ticket to kpasswd. We set the canonicalize flag here.
+        ticket = self.get_tgt(creds, sname=sname,
+                              realm=realm,
+                              kdc_options='canonicalize')
+
+        # Change the password.
+        new_password = generate_random_password(32, 32)
+        self.kpasswd_exchange(ticket,
+                              new_password,
+                              expected_code,
+                              expected_msg,
+                              mode=self.KpasswdMode.CHANGE)
+
+    # Test kpasswd rejects a password that does not meet complexity
+    # requirements.
+    def test_kpasswd_too_weak(self):
+        # Create an account for testing.
+        creds = self._get_creds()
+
+        # Get an initial ticket to kpasswd.
+        ticket = self.get_tgt(creds, sname=self.get_kpasswd_sname(),
+                              kdc_options='0')
+
+        expected_code = KPASSWD_SOFTERROR
+        expected_msg = b'Password does not meet complexity requirements'
+
+        # Set the password.
+        new_password = 'password'
+        self.kpasswd_exchange(ticket,
+                              new_password,
+                              expected_code,
+                              expected_msg,
+                              mode=self.KpasswdMode.SET)
+
+        # Change the password.
+        self.kpasswd_exchange(ticket,
+                              new_password,
+                              expected_code,
+                              expected_msg,
+                              mode=self.KpasswdMode.CHANGE)
+
+    # Test kpasswd rejects an empty new password.
+    def test_kpasswd_empty(self):
+        # Create an account for testing.
+        creds = self._get_creds()
+
+        # Get an initial ticket to kpasswd.
+        ticket = self.get_tgt(creds, sname=self.get_kpasswd_sname(),
+                              kdc_options='0')
+
+        expected_code = KPASSWD_SOFTERROR, KPASSWD_HARDERROR
+        expected_msg = (b'Password too short, password must be at least 7 '
+                        b'characters long.',
+                        b'String conversion failed!')
+
+        # Set the password.
+        new_password = ''
+        self.kpasswd_exchange(ticket,
+                              new_password,
+                              expected_code,
+                              expected_msg,
+                              mode=self.KpasswdMode.SET)
+
+        expected_code = KPASSWD_HARDERROR
+        expected_msg = b'String conversion failed!'
+
+        # Change the password.
+        self.kpasswd_exchange(ticket,
+                              new_password,
+                              expected_code,
+                              expected_msg,
+                              mode=self.KpasswdMode.CHANGE)
+
+    # Test kpasswd rejects a request that does not include a random sequence
+    # number.
+    def test_kpasswd_no_seq_number(self):
+        # Create an account for testing.
+        creds = self._get_creds()
+
+        # Get an initial ticket to kpasswd.
+        ticket = self.get_tgt(creds, sname=self.get_kpasswd_sname(),
+                              kdc_options='0')
+
+        expected_code = KPASSWD_HARDERROR
+        expected_msg = b'gensec_unwrap failed - NT_STATUS_ACCESS_DENIED\n'
+
+        # Set the password.
+        new_password = generate_random_password(32, 32)
+        self.kpasswd_exchange(ticket,
+                              new_password,
+                              expected_code,
+                              expected_msg,
+                              mode=self.KpasswdMode.SET,
+                              send_seq_number=False)
+
+        # Change the password.
+        self.kpasswd_exchange(ticket,
+                              new_password,
+                              expected_code,
+                              expected_msg,
+                              mode=self.KpasswdMode.CHANGE,
+                              send_seq_number=False)
+
+    # Test kpasswd rejects a ticket issued by an RODC.
+    def test_kpasswd_from_rodc(self):
+        # Create an account for testing.
+        creds = self._get_creds()
+
+        # Get an initial ticket to kpasswd.
+        ticket = self.get_tgt(creds, sname=self.get_kpasswd_sname(),
+                              kdc_options='0')
+
+        # Have the ticket be issued by the RODC.
+        ticket = self.issued_by_rodc(ticket)
+
+        expected_code = KPASSWD_HARDERROR
+        expected_msg = b'gensec_update failed - NT_STATUS_LOGON_FAILURE\n'
+
+        # Set the password.
+        new_password = generate_random_password(32, 32)
+        self.kpasswd_exchange(ticket,
+                              new_password,
+                              expected_code,
+                              expected_msg,
+                              mode=self.KpasswdMode.SET)
+
+        # Change the password.
+        self.kpasswd_exchange(ticket,
+                              new_password,
+                              expected_code,
+                              expected_msg,
+                              mode=self.KpasswdMode.CHANGE)
+
+    # Test setting a password, specifying the principal of the target user.
+    def test_kpasswd_set_target_princ_only(self):
+        # Create an account for testing.
+        creds = self._get_creds()
+        username = creds.get_username()
+
+        cname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
+                                          names=username.split('/'))
+
+        # Get an initial ticket to kpasswd.
+        ticket = self.get_tgt(creds, sname=self.get_kpasswd_sname(),
+                              kdc_options='0')
+
+        expected_code = KPASSWD_MALFORMED
+        expected_msg = (b'Realm and principal must be both present, or '
+                        b'neither present',
+                        b'Failed to decode packet')
+
+        # Change the password.
+        new_password = generate_random_password(32, 32)
+        self.kpasswd_exchange(ticket,
+                              new_password,
+                              expected_code,
+                              expected_msg,
+                              mode=self.KpasswdMode.SET,
+                              target_princ=cname)
+
+    # Test that kpasswd rejects a password set specifying only the realm of the
+    # target user.
+    def test_kpasswd_set_target_realm_only(self):
+        # Create an account for testing.
+        creds = self._get_creds()
+
+        # Get an initial ticket to kpasswd.
+        ticket = self.get_tgt(creds, sname=self.get_kpasswd_sname(),
+                              kdc_options='0')
+
+        expected_code = KPASSWD_MALFORMED, KPASSWD_ACCESSDENIED
+        expected_msg = (b'Realm and principal must be both present, or '
+                        b'neither present',
+                        b'Failed to decode packet',
+                        b'No such user when changing password')
+
+        # Change the password.
+        new_password = generate_random_password(32, 32)
+        self.kpasswd_exchange(ticket,
+                              new_password,
+                              expected_code,
+                              expected_msg,
+                              mode=self.KpasswdMode.SET,
+                              target_realm=creds.get_realm())
+
+    # Show that a user cannot set a password, specifying both principal and
+    # realm of the target user, without having control access.
+    def test_kpasswd_set_target_princ_and_realm_no_access(self):
+        # Create an account for testing.
+        creds = self._get_creds()
+        username = creds.get_username()
+
+        cname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
+                                          names=username.split('/'))
+
+        # Get an initial ticket to kpasswd.
+        ticket = self.get_tgt(creds, sname=self.get_kpasswd_sname(),
+                              kdc_options='0')
+
+        expected_code = KPASSWD_ACCESSDENIED
+        expected_msg = b'Not permitted to change password'
+
+        # Change the password.
+        new_password = generate_random_password(32, 32)
+        self.kpasswd_exchange(ticket,
+                              new_password,
+                              expected_code,
+                              expected_msg,
+                              mode=self.KpasswdMode.SET,
+                              target_princ=cname,
+                              target_realm=creds.get_realm())
+
+    # Test setting a password, specifying both principal and realm of the
+    # target user, whem the user has control access on their account.
+    def test_kpasswd_set_target_princ_and_realm_access(self):
+        # Create an account for testing.
+        creds = self._get_creds()
+        username = creds.get_username()
+        tgt = self.get_tgt(creds)
+
+        cname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
+                                          names=username.split('/'))
+
+        samdb = self.get_samdb()
+        sd_utils = SDUtils(samdb)
+
+        user_dn = creds.get_dn()
+        user_sid = self.get_objectSid(samdb, user_dn)
+
+        # Give the user control access on their account.
+        ace = f'(A;;CR;;;{user_sid})'
+        sd_utils.dacl_add_ace(user_dn, ace)
+
+        # Get a non-initial ticket to kpasswd. Since we have the right to
+        # change the account's password, we don't need an initial ticket.
+        krbtgt_creds = self.get_krbtgt_creds()
+        ticket = self.get_service_ticket(tgt,
+                                         krbtgt_creds,
+                                         service='kadmin',
+                                         target_name='changepw',
+                                         kdc_options='0')
+
+        expected_code = KPASSWD_SUCCESS
+        expected_msg = b'Password changed'
+
+        # Change the password.
+        new_password = generate_random_password(32, 32)
+        self.kpasswd_exchange(ticket,
+                              new_password,
+                              expected_code,
+                              expected_msg,
+                              mode=self.KpasswdMode.SET,
+                              target_princ=cname,
+                              target_realm=creds.get_realm())
+
+    # Test setting a password when the existing password has expired.
+    def test_kpasswd_set_expired_password(self):
+        # Create an account for testing, with an expired password.
+        creds = self._get_creds(expired=True)
+
+        # Get an initial ticket to kpasswd.
+        ticket = self.get_tgt(creds, sname=self.get_kpasswd_sname(),
+                              kdc_options='0')
+
+        expected_code = KPASSWD_SUCCESS
+        expected_msg = b'Password changed'
+
+        # Set the password.
+        new_password = generate_random_password(32, 32)
+        self.kpasswd_exchange(ticket,
+                              new_password,
+                              expected_code,
+                              expected_msg,
+                              mode=self.KpasswdMode.SET)
+
+    # Test changing a password when the existing password has expired.
+    def test_kpasswd_change_expired_password(self):
+        # Create an account for testing, with an expired password.
+        creds = self._get_creds(expired=True)
+
+        # Get an initial ticket to kpasswd.
+        ticket = self.get_tgt(creds, sname=self.get_kpasswd_sname(),
+                              kdc_options='0')
+
+        expected_code = KPASSWD_SUCCESS
+        expected_msg = b'Password changed'
+
+        # Change the password.
+        new_password = generate_random_password(32, 32)
+        self.kpasswd_exchange(ticket,
+                              new_password,
+                              expected_code,
+                              expected_msg,
+                              mode=self.KpasswdMode.CHANGE)
+
+    # Check the lifetime of a kpasswd ticket is not more than two minutes.
+    def test_kpasswd_ticket_lifetime(self):
+        # Create an account for testing.
+        creds = self._get_creds()
+
+        # Get an initial ticket to kpasswd.
+        ticket = self.get_tgt(creds, sname=self.get_kpasswd_sname(),
+                              kdc_options='0')
+
+        # Check the lifetime of the ticket is equal to two minutes.
+        lifetime = self.get_ticket_lifetime(ticket)
+        self.assertEqual(2 * 60, lifetime)
+
+    # Ensure we cannot perform a TGS-REQ with a kpasswd ticket.
+    def test_kpasswd_ticket_tgs(self):
+        creds = self.get_client_creds()
+
+        # Get an initial ticket to kpasswd.
+        ticket = self.get_tgt(creds, sname=self.get_kpasswd_sname(),
+                              kdc_options='0')
+
+        # Change the sname of the ticket to match that of a TGT.
+        realm = creds.get_realm()
+        krbtgt_sname = self.PrincipalName_create(name_type=NT_SRV_INST,
+                                                 names=['krbtgt', realm])
+        ticket.set_sname(krbtgt_sname)
+
+        # Try to use that ticket to get a service ticket.
+        service_creds = self.get_service_creds()
+
+        # This fails due to missing REQUESTER_SID buffer.
+        self._make_tgs_request(creds, service_creds, ticket,
+                               expect_error=(KDC_ERR_TGT_REVOKED,
+                                             KDC_ERR_TKT_EXPIRED))
+
+    def modify_requester_sid_time(self, ticket, sid, lifetime):
+        # Get the krbtgt key.
+        krbtgt_creds = self.get_krbtgt_creds()
+
+        krbtgt_key = self.TicketDecryptionKey_from_creds(krbtgt_creds)
+        checksum_keys = {
+            krb5pac.PAC_TYPE_KDC_CHECKSUM: krbtgt_key,
+        }
+
+        # Set authtime and starttime to an hour in the past, to show that they
+        # do not affect ticket rejection.
+        start_time = self.get_KerberosTime(offset=-60 * 60)
+
+        # Set the endtime of the ticket relative to our current time, so that
+        # the ticket has 'lifetime' seconds remaining to live.
+        end_time = self.get_KerberosTime(offset=lifetime)
+
+        # Modify the times in the ticket.
+        def modify_ticket_times(enc_part):
+            enc_part['authtime'] = start_time
+            if 'starttime' in enc_part:
+                enc_part['starttime'] = start_time
+
+            enc_part['endtime'] = end_time
+
+            return enc_part
+
+        # We have to set the times in both the ticket and the PAC, otherwise
+        # Heimdal will complain.
+        def modify_pac_time(pac):
+            pac_buffers = pac.buffers
+
+            for pac_buffer in pac_buffers:
+                if pac_buffer.type == krb5pac.PAC_TYPE_LOGON_NAME:
+                    logon_time = self.get_EpochFromKerberosTime(start_time)
+                    pac_buffer.info.logon_time = unix2nttime(logon_time)
+                    break
+            else:
+                self.fail('failed to find LOGON_NAME PAC buffer')
+
+            pac.buffers = pac_buffers
+
+            return pac
+
+        # Add a requester SID to show that the KDC will then accept this
+        # kpasswd ticket as if it were a TGT.
+        def modify_pac_fn(pac):
+            pac = self.add_requester_sid(pac, sid=sid)
+            pac = modify_pac_time(pac)
+            return pac
+
+        # Do the actual modification.
+        return self.modified_ticket(ticket,
+                                    new_ticket_key=krbtgt_key,
+                                    modify_fn=modify_ticket_times,
+                                    modify_pac_fn=modify_pac_fn,
+                                    checksum_keys=checksum_keys)
+
+    # Ensure we cannot perform a TGS-REQ with a kpasswd ticket containing a
+    # requester SID and having a remaining lifetime of two minutes.
+    def test_kpasswd_ticket_requester_sid_tgs(self):
+        creds = self.get_client_creds()
+
+        # Get an initial ticket to kpasswd.
+        ticket = self.get_tgt(creds, sname=self.get_kpasswd_sname(),
+                              kdc_options='0')
+
+        # Change the sname of the ticket to match that of a TGT.
+        realm = creds.get_realm()
+        krbtgt_sname = self.PrincipalName_create(name_type=NT_SRV_INST,
+                                                 names=['krbtgt', realm])
+        ticket.set_sname(krbtgt_sname)
+
+        # Get the user's SID.
+        samdb = self.get_samdb()
+
+        user_dn = creds.get_dn()
+        user_sid = self.get_objectSid(samdb, user_dn)
+
+        # Modify the ticket to add a requester SID and give it two minutes to
+        # live.
+        ticket = self.modify_requester_sid_time(ticket,
+                                                sid=user_sid,
+                                                lifetime=2 * 60)
+
+        # Try to use that ticket to get a service ticket.
+        service_creds = self.get_service_creds()
+
+        # This fails due to the lifetime being too short.
+        self._make_tgs_request(creds, service_creds, ticket,
+                               expect_error=KDC_ERR_TKT_EXPIRED)
+
+    # Show we can perform a TGS-REQ with a kpasswd ticket containing a
+    # requester SID if the remaining lifetime exceeds two minutes.
+    def test_kpasswd_ticket_requester_sid_lifetime_tgs(self):
+        creds = self.get_client_creds()
+
+        # Get an initial ticket to kpasswd.
+        ticket = self.get_tgt(creds, sname=self.get_kpasswd_sname(),
+                              kdc_options='0')
+
+        # Change the sname of the ticket to match that of a TGT.
+        realm = creds.get_realm()
+        krbtgt_sname = self.PrincipalName_create(name_type=NT_SRV_INST,
+                                                 names=['krbtgt', realm])
+        ticket.set_sname(krbtgt_sname)
+
+        # Get the user's SID.
+        samdb = self.get_samdb()
+
+        user_dn = creds.get_dn()
+        user_sid = self.get_objectSid(samdb, user_dn)
+
+        # Modify the ticket to add a requester SID and give it two minutes and
+        # ten seconds to live.
+        ticket = self.modify_requester_sid_time(ticket,
+                                                sid=user_sid,
+                                                lifetime=2 * 60 + 10)
+
+        # Try to use that ticket to get a service ticket.
+        service_creds = self.get_service_creds()
+
+        # This succeeds.
+        self._make_tgs_request(creds, service_creds, ticket,
+                               expect_error=False)
+
+    # Test that kpasswd rejects requests with a service ticket.
+    def test_kpasswd_non_initial(self):
+        # Create an account for testing, and get a TGT.
+        creds = self._get_creds()
+        tgt = self.get_tgt(creds)
+
+        # Get a non-initial ticket to kpasswd.
+        krbtgt_creds = self.get_krbtgt_creds()
+        ticket = self.get_service_ticket(tgt,
+                                         krbtgt_creds,
+                                         service='kadmin',
+                                         target_name='changepw',
+                                         kdc_options='0')
+
+        expected_code = KPASSWD_INITIAL_FLAG_NEEDED
+        expected_msg = b'Expected an initial ticket'
+
+        # Set the password.
+        new_password = generate_random_password(32, 32)
+        self.kpasswd_exchange(ticket,
+                              new_password,
+                              expected_code,
+                              expected_msg,
+                              mode=self.KpasswdMode.SET)
+
+        # Change the password.
+        self.kpasswd_exchange(ticket,
+                              new_password,
+                              expected_code,
+                              expected_msg,
+                              mode=self.KpasswdMode.CHANGE)
+
+    # Show that kpasswd accepts requests with a service ticket modified to set
+    # the 'initial' flag.
+    def test_kpasswd_initial(self):
+        # Create an account for testing, and get a TGT.
+        creds = self._get_creds()
+
+        krbtgt_creds = self.get_krbtgt_creds()
+
+        # Get a service ticket, and modify it to set the 'initial' flag.
+        def get_ticket():
+            tgt = self.get_tgt(creds, fresh=True)
+
+            # Get a non-initial ticket to kpasswd.
+            ticket = self.get_service_ticket(tgt,
+                                             krbtgt_creds,
+                                             service='kadmin',
+                                             target_name='changepw',
+                                             kdc_options='0',
+                                             fresh=True)
+
+            set_initial_flag = partial(self.modify_ticket_flag, flag='initial',
+                                       value=True)
+
+            checksum_keys = self.get_krbtgt_checksum_key()
+            return self.modified_ticket(ticket,
+                                        modify_fn=set_initial_flag,
+                                        checksum_keys=checksum_keys)
+
+        expected_code = KPASSWD_SUCCESS
+        expected_msg = b'Password changed'
+
+        ticket = get_ticket()
+
+        # Set the password.
+        new_password = generate_random_password(32, 32)
+        self.kpasswd_exchange(ticket,
+                              new_password,
+                              expected_code,
+                              expected_msg,
+                              mode=self.KpasswdMode.SET)
+
+        creds.update_password(new_password)
+        ticket = get_ticket()
+
+        # Change the password.
+        new_password = generate_random_password(32, 32)
+        self.kpasswd_exchange(ticket,
+                              new_password,
+                              expected_code,
+                              expected_msg,
+                              mode=self.KpasswdMode.CHANGE)
+
+    # Test that kpasswd rejects requests where the ticket is encrypted with a
+    # key other than the krbtgt's.
+    def test_kpasswd_wrong_key(self):
+        # Create an account for testing.
+        creds = self._get_creds()
+
+        sname = self.get_kpasswd_sname()
+
+        # Get an initial ticket to kpasswd.
+        ticket = self.get_tgt(creds, sname=sname,
+                              kdc_options='0')
+
+        # Get a key belonging to the Administrator account.
+        admin_creds = self.get_admin_creds()
+        admin_key = self.TicketDecryptionKey_from_creds(admin_creds)
+        self.assertIsNotNone(admin_key.kvno,
+                             'a kvno is required to tell the DB '
+                             'which key to look up.')
+        checksum_keys = {
+            krb5pac.PAC_TYPE_KDC_CHECKSUM: admin_key,
+        }
+
+        # Re-encrypt the ticket using the Administrator's key.
+        ticket = self.modified_ticket(ticket,
+                                      new_ticket_key=admin_key,
+                                      checksum_keys=checksum_keys)
+
+        # Set the sname of the ticket to that of the Administrator account.
+        admin_sname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
+                                                names=['Administrator'])
+        ticket.set_sname(admin_sname)
+
+        expected_code = KPASSWD_HARDERROR
+        expected_msg = b'gensec_update failed - NT_STATUS_LOGON_FAILURE\n'
+
+        # Set the password.
+        new_password = generate_random_password(32, 32)
+        self.kpasswd_exchange(ticket,
+                              new_password,
+                              expected_code,
+                              expected_msg,
+                              mode=self.KpasswdMode.SET)
+
+        # Change the password.
+        self.kpasswd_exchange(ticket,
+                              new_password,
+                              expected_code,
+                              expected_msg,
+                              mode=self.KpasswdMode.CHANGE)
+
+    def test_kpasswd_wrong_key_service(self):
+        # Create an account for testing.
+        creds = self.get_cached_creds(account_type=self.AccountType.COMPUTER,
+                                      use_cache=False)
+
+        sname = self.get_kpasswd_sname()
+
+        # Get an initial ticket to kpasswd.
+        ticket = self.get_tgt(creds, sname=sname,
+                              kdc_options='0')
+
+        # Get a key belonging to our account.
+        our_key = self.TicketDecryptionKey_from_creds(creds)
+        self.assertIsNotNone(our_key.kvno,
+                             'a kvno is required to tell the DB '
+                             'which key to look up.')
+        checksum_keys = {
+            krb5pac.PAC_TYPE_KDC_CHECKSUM: our_key,
+        }
+
+        # Re-encrypt the ticket using our key.
+        ticket = self.modified_ticket(ticket,
+                                      new_ticket_key=our_key,
+                                      checksum_keys=checksum_keys)
+
+        # Set the sname of the ticket to that of our account.
+        username = creds.get_username()
+        sname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
+                                          names=username.split('/'))
+        ticket.set_sname(sname)
+
+        expected_code = KPASSWD_HARDERROR
+        expected_msg = b'gensec_update failed - NT_STATUS_LOGON_FAILURE\n'
+
+        # Set the password.
+        new_password = generate_random_password(32, 32)
+        self.kpasswd_exchange(ticket,
+                              new_password,
+                              expected_code,
+                              expected_msg,
+                              mode=self.KpasswdMode.SET)
+
+        # Change the password.
+        self.kpasswd_exchange(ticket,
+                              new_password,
+                              expected_code,
+                              expected_msg,
+                              mode=self.KpasswdMode.CHANGE)
+
+    # Test that kpasswd rejects requests where the ticket is encrypted with a
+    # key belonging to a server account other than the krbtgt.
+    def test_kpasswd_wrong_key_server(self):
+        # Create an account for testing.
+        creds = self._get_creds()
+
+        sname = self.get_kpasswd_sname()
+
+        # Get an initial ticket to kpasswd.
+        ticket = self.get_tgt(creds, sname=sname,
+                              kdc_options='0')
+
+        # Get a key belonging to the DC's account.
+        dc_creds = self.get_dc_creds()
+        dc_key = self.TicketDecryptionKey_from_creds(dc_creds)
+        self.assertIsNotNone(dc_key.kvno,
+                             'a kvno is required to tell the DB '
+                             'which key to look up.')
+        checksum_keys = {
+            krb5pac.PAC_TYPE_KDC_CHECKSUM: dc_key,
+        }
+
+        # Re-encrypt the ticket using the DC's key.
+        ticket = self.modified_ticket(ticket,
+                                      new_ticket_key=dc_key,
+                                      checksum_keys=checksum_keys)
+
+        # Set the sname of the ticket to that of the DC's account.
+        dc_username = dc_creds.get_username()
+        dc_sname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
+                                             names=dc_username.split('/'))
+        ticket.set_sname(dc_sname)
+
+        expected_code = KPASSWD_HARDERROR
+        expected_msg = b'gensec_update failed - NT_STATUS_LOGON_FAILURE\n'
+
+        # Set the password.
+        new_password = generate_random_password(32, 32)
+        self.kpasswd_exchange(ticket,
+                              new_password,
+                              expected_code,
+                              expected_msg,
+                              mode=self.KpasswdMode.SET)
+
+        # Change the password.
+        self.kpasswd_exchange(ticket,
+                              new_password,
+                              expected_code,
+                              expected_msg,
+                              mode=self.KpasswdMode.CHANGE)
+
+
+if __name__ == '__main__':
+    global_asn1_print = False
+    global_hexdump = False
+    import unittest
+    unittest.main()
index 57010ae73bdf7e7853609e2fb6bf2dd36f7d6d67..4a78a8eadf3614a518718d04a991d8285696376e 100644 (file)
@@ -500,6 +500,10 @@ class KerberosCredentials(Credentials):
     def get_upn(self):
         return self.upn
 
+    def update_password(self, password):
+        self.set_password(password)
+        self.set_kvno(self.get_kvno() + 1)
+
 
 class KerberosTicketCreds:
     def __init__(self, ticket, session_key,
@@ -518,6 +522,10 @@ class KerberosTicketCreds:
         self.ticket_private = ticket_private
         self.encpart_private = encpart_private
 
+    def set_sname(self, sname):
+        self.ticket['sname'] = sname
+        self.sname = sname
+
 
 class RawKerberosTest(TestCaseInTempDir):
     """A raw Kerberos Test case."""
index 6bbd96e7a0814faa2d1d5f6ba87cd767458a8367..a1210ada5797fdfe66b81594c8800f8f33017e77 100644 (file)
@@ -109,6 +109,7 @@ EXCLUDE_USAGE = {
     'python/samba/tests/krb5/alias_tests.py',
     'python/samba/tests/krb5/test_min_domain_uid.py',
     'python/samba/tests/krb5/test_idmap_nss.py',
+    'python/samba/tests/krb5/kpasswd_tests.py',
 }
 
 EXCLUDE_HELP = {
index 424a8b81c38b00f11668a431624ce785328c8980..54e69a48bc1bbacf10305eccaeb12cf2219a21e3 100644 (file)
 ^samba.tests.krb5.kdc_tgs_tests.samba.tests.krb5.kdc_tgs_tests.KdcTgsTests.test_fast_service_ticket
 ^samba.tests.krb5.kdc_tgs_tests.samba.tests.krb5.kdc_tgs_tests.KdcTgsTests.test_fast_sid_mismatch_existing
 ^samba.tests.krb5.kdc_tgs_tests.samba.tests.krb5.kdc_tgs_tests.KdcTgsTests.test_fast_sid_mismatch_nonexisting
+#
+# Kpasswd tests
+#
+^samba.tests.krb5.kpasswd_tests.samba.tests.krb5.kpasswd_tests.KpasswdTests.test_kpasswd_canonicalize.ad_dc
+^samba.tests.krb5.kpasswd_tests.samba.tests.krb5.kpasswd_tests.KpasswdTests.test_kpasswd_canonicalize_realm_case.ad_dc
+^samba.tests.krb5.kpasswd_tests.samba.tests.krb5.kpasswd_tests.KpasswdTests.test_kpasswd_change.ad_dc
+^samba.tests.krb5.kpasswd_tests.samba.tests.krb5.kpasswd_tests.KpasswdTests.test_kpasswd_change_expired_password.ad_dc
+^samba.tests.krb5.kpasswd_tests.samba.tests.krb5.kpasswd_tests.KpasswdTests.test_kpasswd_empty.ad_dc
+^samba.tests.krb5.kpasswd_tests.samba.tests.krb5.kpasswd_tests.KpasswdTests.test_kpasswd_from_rodc.ad_dc
+^samba.tests.krb5.kpasswd_tests.samba.tests.krb5.kpasswd_tests.KpasswdTests.test_kpasswd_initial.ad_dc
+^samba.tests.krb5.kpasswd_tests.samba.tests.krb5.kpasswd_tests.KpasswdTests.test_kpasswd_no_canonicalize.ad_dc
+^samba.tests.krb5.kpasswd_tests.samba.tests.krb5.kpasswd_tests.KpasswdTests.test_kpasswd_no_canonicalize_realm_case.ad_dc
+^samba.tests.krb5.kpasswd_tests.samba.tests.krb5.kpasswd_tests.KpasswdTests.test_kpasswd_no_seq_number.ad_dc
+^samba.tests.krb5.kpasswd_tests.samba.tests.krb5.kpasswd_tests.KpasswdTests.test_kpasswd_non_initial.ad_dc
+^samba.tests.krb5.kpasswd_tests.samba.tests.krb5.kpasswd_tests.KpasswdTests.test_kpasswd_set.ad_dc
+^samba.tests.krb5.kpasswd_tests.samba.tests.krb5.kpasswd_tests.KpasswdTests.test_kpasswd_set_expired_password.ad_dc
+^samba.tests.krb5.kpasswd_tests.samba.tests.krb5.kpasswd_tests.KpasswdTests.test_kpasswd_set_target_princ_and_realm_access.ad_dc
+^samba.tests.krb5.kpasswd_tests.samba.tests.krb5.kpasswd_tests.KpasswdTests.test_kpasswd_set_target_princ_and_realm_no_access.ad_dc
+^samba.tests.krb5.kpasswd_tests.samba.tests.krb5.kpasswd_tests.KpasswdTests.test_kpasswd_set_target_princ_only.ad_dc
+^samba.tests.krb5.kpasswd_tests.samba.tests.krb5.kpasswd_tests.KpasswdTests.test_kpasswd_set_target_realm_only.ad_dc
+^samba.tests.krb5.kpasswd_tests.samba.tests.krb5.kpasswd_tests.KpasswdTests.test_kpasswd_ticket_lifetime.ad_dc
+^samba.tests.krb5.kpasswd_tests.samba.tests.krb5.kpasswd_tests.KpasswdTests.test_kpasswd_ticket_requester_sid_tgs.ad_dc
+^samba.tests.krb5.kpasswd_tests.samba.tests.krb5.kpasswd_tests.KpasswdTests.test_kpasswd_too_weak.ad_dc
+^samba.tests.krb5.kpasswd_tests.samba.tests.krb5.kpasswd_tests.KpasswdTests.test_kpasswd_wrong_key.ad_dc
+^samba.tests.krb5.kpasswd_tests.samba.tests.krb5.kpasswd_tests.KpasswdTests.test_kpasswd_wrong_key_server.ad_dc
+^samba.tests.krb5.kpasswd_tests.samba.tests.krb5.kpasswd_tests.KpasswdTests.test_kpasswd_wrong_key_service.ad_dc
index 108c6055d0cf073955ba7687cbc4e9d125d55502..53638afc17a496b174caa6e6e709622600411a77 100644 (file)
@@ -575,3 +575,29 @@ samba.tests.krb5.as_canonicalization_tests.samba.tests.krb5.as_canonicalization_
 ^samba.tests.krb5.kdc_tgs_tests.samba.tests.krb5.kdc_tgs_tests.KdcTgsTests.test_tgs_rodc_logon_info_sid_mismatch_nonexisting
 ^samba.tests.krb5.kdc_tgs_tests.samba.tests.krb5.kdc_tgs_tests.KdcTgsTests.test_tgs_rodc_requester_sid_mismatch_existing
 ^samba.tests.krb5.kdc_tgs_tests.samba.tests.krb5.kdc_tgs_tests.KdcTgsTests.test_tgs_rodc_requester_sid_mismatch_nonexisting
+#
+# Kpasswd tests
+#
+^samba.tests.krb5.kpasswd_tests.samba.tests.krb5.kpasswd_tests.KpasswdTests.test_kpasswd_canonicalize.ad_dc
+^samba.tests.krb5.kpasswd_tests.samba.tests.krb5.kpasswd_tests.KpasswdTests.test_kpasswd_canonicalize_realm_case.ad_dc
+^samba.tests.krb5.kpasswd_tests.samba.tests.krb5.kpasswd_tests.KpasswdTests.test_kpasswd_change.ad_dc
+^samba.tests.krb5.kpasswd_tests.samba.tests.krb5.kpasswd_tests.KpasswdTests.test_kpasswd_change_expired_password.ad_dc
+^samba.tests.krb5.kpasswd_tests.samba.tests.krb5.kpasswd_tests.KpasswdTests.test_kpasswd_empty.ad_dc
+^samba.tests.krb5.kpasswd_tests.samba.tests.krb5.kpasswd_tests.KpasswdTests.test_kpasswd_from_rodc.ad_dc
+^samba.tests.krb5.kpasswd_tests.samba.tests.krb5.kpasswd_tests.KpasswdTests.test_kpasswd_initial.ad_dc
+^samba.tests.krb5.kpasswd_tests.samba.tests.krb5.kpasswd_tests.KpasswdTests.test_kpasswd_no_canonicalize.ad_dc
+^samba.tests.krb5.kpasswd_tests.samba.tests.krb5.kpasswd_tests.KpasswdTests.test_kpasswd_no_canonicalize_realm_case.ad_dc
+^samba.tests.krb5.kpasswd_tests.samba.tests.krb5.kpasswd_tests.KpasswdTests.test_kpasswd_no_seq_number.ad_dc
+^samba.tests.krb5.kpasswd_tests.samba.tests.krb5.kpasswd_tests.KpasswdTests.test_kpasswd_non_initial.ad_dc
+^samba.tests.krb5.kpasswd_tests.samba.tests.krb5.kpasswd_tests.KpasswdTests.test_kpasswd_set.ad_dc
+^samba.tests.krb5.kpasswd_tests.samba.tests.krb5.kpasswd_tests.KpasswdTests.test_kpasswd_set_expired_password.ad_dc
+^samba.tests.krb5.kpasswd_tests.samba.tests.krb5.kpasswd_tests.KpasswdTests.test_kpasswd_set_target_princ_and_realm_access.ad_dc
+^samba.tests.krb5.kpasswd_tests.samba.tests.krb5.kpasswd_tests.KpasswdTests.test_kpasswd_set_target_princ_and_realm_no_access.ad_dc
+^samba.tests.krb5.kpasswd_tests.samba.tests.krb5.kpasswd_tests.KpasswdTests.test_kpasswd_set_target_princ_only.ad_dc
+^samba.tests.krb5.kpasswd_tests.samba.tests.krb5.kpasswd_tests.KpasswdTests.test_kpasswd_set_target_realm_only.ad_dc
+^samba.tests.krb5.kpasswd_tests.samba.tests.krb5.kpasswd_tests.KpasswdTests.test_kpasswd_ticket_lifetime.ad_dc
+^samba.tests.krb5.kpasswd_tests.samba.tests.krb5.kpasswd_tests.KpasswdTests.test_kpasswd_ticket_requester_sid_tgs.ad_dc
+^samba.tests.krb5.kpasswd_tests.samba.tests.krb5.kpasswd_tests.KpasswdTests.test_kpasswd_too_weak.ad_dc
+^samba.tests.krb5.kpasswd_tests.samba.tests.krb5.kpasswd_tests.KpasswdTests.test_kpasswd_wrong_key.ad_dc
+^samba.tests.krb5.kpasswd_tests.samba.tests.krb5.kpasswd_tests.KpasswdTests.test_kpasswd_wrong_key_server.ad_dc
+^samba.tests.krb5.kpasswd_tests.samba.tests.krb5.kpasswd_tests.KpasswdTests.test_kpasswd_wrong_key_service.ad_dc
index c7e1667e025a88211e9f7aaaebdcc1ba06f175e5..e29ece5b849ca25811a743bf16984d4e4cada888 100755 (executable)
@@ -1499,6 +1499,10 @@ planpythontestsuite(
     "ad_dc",
     "samba.tests.krb5.alias_tests",
     environ=krb5_environ)
+planoldpythontestsuite(
+    'ad_dc',
+    'samba.tests.krb5.kpasswd_tests',
+    environ=krb5_environ)
 
 for env in [
         'vampire_dc',