CVE-2022-2031 tests/krb5: Add methods to send and receive generic messages
authorJoseph Sutton <josephsutton@catalyst.net.nz>
Tue, 24 May 2022 07:20:28 +0000 (19:20 +1200)
committerJule Anger <janger@samba.org>
Wed, 27 Jul 2022 10:52:36 +0000 (10:52 +0000)
This allows us to send and receive kpasswd messages, while avoiding the
existing logic for encoding and decoding other Kerberos message types.

BUG: https://bugzilla.samba.org/show_bug.cgi?id=15047
BUG: https://bugzilla.samba.org/show_bug.cgi?id=15049
BUG: https://bugzilla.samba.org/show_bug.cgi?id=15074

Signed-off-by: Joseph Sutton <josephsutton@catalyst.net.nz>
Reviewed-by: Andreas Schneider <asn@samba.org>
python/samba/tests/krb5/raw_testcase.py

index 3374beecd1ed1b20768e3bc0b2f6c565c30f9781..b16b78ebf562fed090c5d8d70b7e8c78cced9cfe 100644 (file)
@@ -955,24 +955,28 @@ class RawKerberosTest(TestCaseInTempDir):
         return blob
 
     def send_pdu(self, req, asn1_print=None, hexdump=None):
+        k5_pdu = self.der_encode(
+            req, native_decode=False, asn1_print=asn1_print, hexdump=False)
+        self.send_msg(k5_pdu, hexdump=hexdump)
+
+    def send_msg(self, msg, hexdump=None):
+        header = struct.pack('>I', len(msg))
+        req_pdu = header
+        req_pdu += msg
+        self.hex_dump("send_msg", header, hexdump=hexdump)
+        self.hex_dump("send_msg", msg, hexdump=hexdump)
+
         try:
-            k5_pdu = self.der_encode(
-                req, native_decode=False, asn1_print=asn1_print, hexdump=False)
-            header = struct.pack('>I', len(k5_pdu))
-            req_pdu = header
-            req_pdu += k5_pdu
-            self.hex_dump("send_pdu", header, hexdump=hexdump)
-            self.hex_dump("send_pdu", k5_pdu, hexdump=hexdump)
             while True:
                 sent = self.s.send(req_pdu, 0)
                 if sent == len(req_pdu):
-                    break
+                    return
                 req_pdu = req_pdu[sent:]
         except socket.error as e:
-            self._disconnect("send_pdu: %s" % e)
+            self._disconnect("send_msg: %s" % e)
             raise
         except IOError as e:
-            self._disconnect("send_pdu: %s" % e)
+            self._disconnect("send_msg: %s" % e)
             raise
 
     def recv_raw(self, num_recv=0xffff, hexdump=None, timeout=None):
@@ -998,16 +1002,14 @@ class RawKerberosTest(TestCaseInTempDir):
         return rep_pdu
 
     def recv_pdu_raw(self, asn1_print=None, hexdump=None, timeout=None):
-        rep_pdu = None
-        rep = None
         raw_pdu = self.recv_raw(
             num_recv=4, hexdump=hexdump, timeout=timeout)
         if raw_pdu is None:
-            return (None, None)
+            return None
         header = struct.unpack(">I", raw_pdu[0:4])
         k5_len = header[0]
         if k5_len == 0:
-            return (None, "")
+            return ""
         missing = k5_len
         rep_pdu = b''
         while missing > 0:
@@ -1016,6 +1018,14 @@ class RawKerberosTest(TestCaseInTempDir):
             self.assertGreaterEqual(len(raw_pdu), 1)
             rep_pdu += raw_pdu
             missing = k5_len - len(rep_pdu)
+        return rep_pdu
+
+    def recv_reply(self, asn1_print=None, hexdump=None, timeout=None):
+        rep_pdu = self.recv_pdu_raw(asn1_print=asn1_print,
+                                    hexdump=hexdump,
+                                    timeout=timeout)
+        if not rep_pdu:
+            return None, rep_pdu
         k5_raw = self.der_decode(
             rep_pdu,
             asn1Spec=None,
@@ -1037,9 +1047,9 @@ class RawKerberosTest(TestCaseInTempDir):
         return (rep, rep_pdu)
 
     def recv_pdu(self, asn1_print=None, hexdump=None, timeout=None):
-        (rep, rep_pdu) = self.recv_pdu_raw(asn1_print=asn1_print,
-                                           hexdump=hexdump,
-                                           timeout=timeout)
+        (rep, rep_pdu) = self.recv_reply(asn1_print=asn1_print,
+                                         hexdump=hexdump,
+                                         timeout=timeout)
         return rep
 
     def assertIsConnected(self):