tests/krb5: Improve edata checking
authorJoseph Sutton <josephsutton@catalyst.net.nz>
Thu, 4 May 2023 02:55:36 +0000 (14:55 +1200)
committerAndrew Bartlett <abartlet@samba.org>
Thu, 18 May 2023 01:03:37 +0000 (01:03 +0000)
Instead of guessing based on a heuristic whether we have KERB_ERROR_DATA
or METHOD_DATA in the ‘e-data’ field, decode it first as KERB_ERROR_DATA
and fall back to METHOD_DATA if that fails.

The environment variable EXPECT_NT_STATUS indicates that the KDC
supports returning a status code in the e-data field.

Signed-off-by: Joseph Sutton <josephsutton@catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
python/samba/tests/krb5/kdc_tgs_tests.py
python/samba/tests/krb5/raw_testcase.py
python/samba/tests/krb5/s4u_tests.py
source4/selftest/tests.py

index 91890d34ee5cf4ede75bb42fda766f40484f3eb8..24392ceba300053ef5f4a8e37ecd925b9ea97fc3 100755 (executable)
@@ -191,6 +191,7 @@ class KdcTgsBaseTests(KDCBaseTest):
                  expect_requester_sid=None,
                  expect_edata=False,
                  expected_sid=None,
+                 expect_status=None,
                  expected_status=None,
                  expected_proxy_target=None,
                  expected_transited_services=None):
@@ -282,6 +283,7 @@ class KdcTgsBaseTests(KDCBaseTest):
             check_rep_fn=check_rep_fn,
             check_kdc_private_fn=self.generic_check_kdc_private,
             expected_error_mode=expected_error,
+            expect_status=expect_status,
             expected_status=expected_status,
             tgt=tgt,
             armor_key=armor_key,
@@ -1619,6 +1621,7 @@ class KdcTgsTests(KdcTgsBaseTests):
         self._run_tgs(tgt, creds, expected_error=(KDC_ERR_GENERIC,
                                            KDC_ERR_BADKEYVER),
                       expect_edata=True,
+                      expect_status=self.expect_nt_status,
                       expected_status=ntstatus.NT_STATUS_INSUFFICIENT_RESOURCES)
 
     def test_renew_rc4(self):
@@ -1645,6 +1648,7 @@ class KdcTgsTests(KdcTgsBaseTests):
         self._s4u2self(tgt, creds, expected_error=(KDC_ERR_GENERIC,
                                                    KDC_ERR_BADKEYVER),
                        expect_edata=True,
+                       expect_status=self.expect_nt_status,
                        expected_status=ntstatus.NT_STATUS_INSUFFICIENT_RESOURCES)
 
     def test_user2user_rc4(self):
@@ -2930,7 +2934,7 @@ class KdcTgsTests(KdcTgsBaseTests):
     def _run_tgs(self, tgt, creds, expected_error, *, expect_pac=True,
                  expect_pac_attrs=None, expect_pac_attrs_pac_request=None,
                  expect_requester_sid=None, expected_sid=None,
-                 expect_edata=False, expected_status=None):
+                 expect_edata=False, expect_status=None, expected_status=None):
         target_creds = self.get_service_creds()
         return self._tgs_req(
             tgt, expected_error, creds, target_creds,
@@ -2940,6 +2944,7 @@ class KdcTgsTests(KdcTgsBaseTests):
             expect_requester_sid=expect_requester_sid,
             expected_sid=expected_sid,
             expect_edata=expect_edata,
+            expect_status=expect_status,
             expected_status=expected_status)
 
     # These tests fail against Windows, which does not implement ticket
@@ -2977,7 +2982,8 @@ class KdcTgsTests(KdcTgsBaseTests):
             expected_sid=expected_sid)
 
     def _s4u2self(self, tgt, tgt_creds, expected_error, *, expect_pac=True,
-                  expect_edata=False, expected_status=None):
+                  expect_edata=False, expect_status=None,
+                  expected_status=None):
         user_creds = self._get_mach_creds()
 
         user_name = user_creds.get_username()
@@ -3000,6 +3006,7 @@ class KdcTgsTests(KdcTgsBaseTests):
                              expected_cname=user_cname,
                              generate_padata_fn=generate_s4u2self_padata,
                              expect_edata=expect_edata,
+                             expect_status=expect_status,
                              expected_status=expected_status,
                              expect_pac=expect_pac)
 
index a21a49636799439951416addf7c27f8da9d1ea37..22062c5ec8c516f94892e2518e08692c542c8703 100644 (file)
@@ -732,6 +732,12 @@ class RawKerberosTest(TestCaseInTempDir):
             expect_nt_hash = '1'
         cls.expect_nt_hash = bool(int(expect_nt_hash))
 
+        expect_nt_status = samba.tests.env_get_var_value('EXPECT_NT_STATUS',
+                                                         allow_missing=True)
+        if expect_nt_status is None:
+            expect_nt_status = '1'
+        cls.expect_nt_status = bool(int(expect_nt_status))
+
     def setUp(self):
         super().setUp()
         self.do_asn1_print = False
@@ -2508,6 +2514,7 @@ class RawKerberosTest(TestCaseInTempDir):
                          check_kdc_private_fn=None,
                          callback_dict=None,
                          expected_error_mode=0,
+                         expect_status=None,
                          expected_status=None,
                          expected_salt=None,
                          authenticator_subkey=None,
@@ -2581,6 +2588,7 @@ class RawKerberosTest(TestCaseInTempDir):
             'check_kdc_private_fn': check_kdc_private_fn,
             'callback_dict': callback_dict,
             'expected_error_mode': expected_error_mode,
+            'expect_status': expect_status,
             'expected_status': expected_status,
             'expected_salt': expected_salt,
             'authenticator_subkey': authenticator_subkey,
@@ -2650,6 +2658,7 @@ class RawKerberosTest(TestCaseInTempDir):
                           check_rep_fn=None,
                           check_kdc_private_fn=None,
                           expected_error_mode=0,
+                          expect_status=None,
                           expected_status=None,
                           callback_dict=None,
                           tgt=None,
@@ -2727,6 +2736,7 @@ class RawKerberosTest(TestCaseInTempDir):
             'check_kdc_private_fn': check_kdc_private_fn,
             'callback_dict': callback_dict,
             'expected_error_mode': expected_error_mode,
+            'expect_status': expect_status,
             'expected_status': expected_status,
             'tgt': tgt,
             'body_checksum_type': body_checksum_type,
@@ -3830,6 +3840,7 @@ class RawKerberosTest(TestCaseInTempDir):
             self.assertElementEqualUTF8(rep, 'realm', expected_srealm)
             self.assertElementEqualPrincipal(rep, 'sname', expected_sname)
             self.assertElementMissing(rep, 'e-text')
+        expect_status = kdc_exchange_dict['expect_status']
         expected_status = kdc_exchange_dict['expected_status']
         expect_edata = kdc_exchange_dict['expect_edata']
         if expect_edata is None:
@@ -3840,36 +3851,31 @@ class RawKerberosTest(TestCaseInTempDir):
         if inner and expect_edata is self.expect_padata_outer:
             expect_edata = False
         if not expect_edata:
-            self.assertIsNone(expected_status)
-            if self.strict_checking:
+            self.assertFalse(expect_status)
+            if self.strict_checking or expect_status is False:
                 self.assertElementMissing(rep, 'e-data')
             return rep
         edata = self.getElementValue(rep, 'e-data')
-        if self.strict_checking:
+        if self.strict_checking or expect_status:
             self.assertIsNotNone(edata)
         if edata is not None:
-            if rep_msg_type == KRB_TGS_REP and not sent_fast:
+            try:
                 error_data = self.der_decode(
                     edata,
                     asn1Spec=krb5_asn1.KERB_ERROR_DATA())
-                self.assertEqual(KERB_ERR_TYPE_EXTENDED,
-                                 error_data['data-type'])
-
-                extended_error = error_data['data-value']
-
-                self.assertEqual(12, len(extended_error))
-
-                status = int.from_bytes(extended_error[:4], 'little')
-                flags = int.from_bytes(extended_error[8:], 'little')
-
-                self.assertEqual(expected_status, status)
-
-                self.assertEqual(3, flags)
-            else:
-                self.assertIsNone(expected_status)
-
-                rep_padata = self.der_decode(edata,
-                                             asn1Spec=krb5_asn1.METHOD_DATA())
+            except PyAsn1Error:
+                if expect_status:
+                    # The test requires that the KDC be declared to support
+                    # NTSTATUS values in e-data to proceed.
+                    self.assertTrue(
+                        self.expect_nt_status,
+                        'expected status code (which, according to '
+                        'EXPECT_NT_STATUS=0, the KDC does not support)')
+
+                    self.fail('expected to get status code')
+
+                rep_padata = self.der_decode(
+                    edata, asn1Spec=krb5_asn1.METHOD_DATA())
                 self.assertGreater(len(rep_padata), 0)
 
                 if sent_fast:
@@ -3893,6 +3899,30 @@ class RawKerberosTest(TestCaseInTempDir):
                                                     error_code)
 
                 kdc_exchange_dict['preauth_etype_info2'] = etype_info2
+            else:
+                self.assertTrue(self.expect_nt_status,
+                                'got status code, but EXPECT_NT_STATUS=0')
+
+                if expect_status is not None:
+                    self.assertTrue(expect_status,
+                                    'got unexpected status code')
+
+                self.assertEqual(KERB_ERR_TYPE_EXTENDED,
+                                 error_data['data-type'])
+
+                extended_error = error_data['data-value']
+
+                self.assertEqual(12, len(extended_error))
+
+                status = int.from_bytes(extended_error[:4], 'little')
+                flags = int.from_bytes(extended_error[8:], 'little')
+
+                self.assertEqual(expected_status, status)
+
+                if rep_msg_type == KRB_TGS_REP:
+                    self.assertEqual(3, flags)
+                else:
+                    self.assertEqual(1, flags)
 
         return rep
 
@@ -4955,6 +4985,8 @@ class RawKerberosTest(TestCaseInTempDir):
                           expected_device_claims=None,
                           unexpected_device_claims=None,
                           expect_edata=None,
+                          expect_status=None,
+                          expected_status=None,
                           rc4_support=True,
                           to_rodc=False):
 
@@ -5012,6 +5044,8 @@ class RawKerberosTest(TestCaseInTempDir):
             expected_device_claims=expected_device_claims,
             unexpected_device_claims=unexpected_device_claims,
             expect_edata=expect_edata,
+            expect_status=expect_status,
+            expected_status=expected_status,
             rc4_support=rc4_support,
             to_rodc=to_rodc)
 
index 2c55c876c81b2566804cae993d8c749e9d47e51a..fa1d2141fd428d3074905c8b1e1060a3b9ed318e 100755 (executable)
@@ -303,6 +303,8 @@ class S4UKerberosTests(KDCBaseTest):
 
             return [pa_s4u], req_body
 
+        expect_status = self.expect_nt_status and expected_status is not None
+
         kdc_exchange_dict = self.tgs_exchange_dict(
             expected_crealm=realm,
             expected_cname=client_cname,
@@ -321,6 +323,7 @@ class S4UKerberosTests(KDCBaseTest):
             check_rep_fn=check_rep_fn,
             check_kdc_private_fn=self.generic_check_kdc_private,
             expected_error_mode=expected_error_mode,
+            expect_status=expect_status,
             expected_status=expected_status,
             tgt=service_tgt,
             authenticator_subkey=authenticator_subkey,
@@ -733,6 +736,8 @@ class S4UKerberosTests(KDCBaseTest):
         transited_service = f'host/{service1_name}@{service1_realm}'
         expected_transited_services.append(transited_service)
 
+        expect_status = self.expect_nt_status and expected_status is not None
+
         kdc_exchange_dict = self.tgs_exchange_dict(
             expected_crealm=client_realm,
             expected_cname=client_cname,
@@ -748,6 +753,7 @@ class S4UKerberosTests(KDCBaseTest):
             check_rep_fn=check_rep_fn,
             check_kdc_private_fn=self.generic_check_kdc_private,
             expected_error_mode=expected_error_mode,
+            expect_status=expect_status,
             expected_status=expected_status,
             callback_dict={},
             tgt=service1_tgt,
index 41778ce89583ea6af7cf9911fb6460f3ccfaf521..8adff4b281ff79543a5672fdf0e8a5bb7b792d9e 100755 (executable)
@@ -1205,6 +1205,7 @@ expect_pac = int('SAMBA4_USES_HEIMDAL' in config_hash)
 extra_pac_buffers = int('SAMBA4_USES_HEIMDAL' in config_hash)
 check_cname = int('SAMBA4_USES_HEIMDAL' in config_hash)
 check_padata = int('SAMBA4_USES_HEIMDAL' in config_hash)
+expect_nt_status = 0
 krb5_environ = {
     'SERVICE_USERNAME': '$SERVER',
     'ADMIN_USERNAME': '$DC_USERNAME',
@@ -1223,6 +1224,7 @@ krb5_environ = {
     'CHECK_CNAME': check_cname,
     'CHECK_PADATA': check_padata,
     'KADMIN_IS_TGS': kadmin_is_tgs,
+    'EXPECT_NT_STATUS': expect_nt_status,
 }
 planoldpythontestsuite("none", "samba.tests.krb5.kcrypto")
 planoldpythontestsuite("none", "samba.tests.krb5.claims_in_pac")