CVE-2016-0771: tests/dns: modify tests to check via RPC
authorGarming Sam <garming@catalyst.net.nz>
Wed, 27 Jan 2016 04:41:44 +0000 (17:41 +1300)
committerKarolin Seeger <kseeger@samba.org>
Wed, 24 Feb 2016 10:43:59 +0000 (11:43 +0100)
This checks that TXT records added over DNS, look the same over RPC.

BUG: https://bugzilla.samba.org/show_bug.cgi?id=11128
BUG: https://bugzilla.samba.org/show_bug.cgi?id=11686

Signed-off-by: Garming Sam <garming@catalyst.net.nz>
Reviewed-by: Stefan Metzmacher <metze@samba.org>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
python/samba/tests/dns.py

index e153a2b1b94b8f3eb4d57514f816c0841c01a8a7..784485e9b3867e550cb9c7cd0f1f28d4230148c0 100644 (file)
@@ -146,6 +146,47 @@ class DNSTest(TestCase):
            N+=length
         return result
 
+    def make_txt_update(self, prefix, txt_array):
+        p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
+        updates = []
+
+        name = self.get_dns_domain()
+        u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
+        updates.append(u)
+        self.finish_name_packet(p, updates)
+
+        updates = []
+        r = dns.res_rec()
+        r.name = "%s.%s" % (prefix, self.get_dns_domain())
+        r.rr_type = dns.DNS_QTYPE_TXT
+        r.rr_class = dns.DNS_QCLASS_IN
+        r.ttl = 900
+        r.length = 0xffff
+        rdata = make_txt_record(txt_array)
+        r.rdata = rdata
+        updates.append(r)
+        p.nscount = len(updates)
+        p.nsrecs = updates
+
+        return p
+
+    def check_query_txt(self, prefix, txt_array):
+        name = "%s.%s" % (prefix, self.get_dns_domain())
+        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
+        questions = []
+
+        q = self.make_name_question(name, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
+        questions.append(q)
+
+        self.finish_name_packet(p, questions)
+        response = self.dns_transaction_udp(p)
+        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
+        self.assertEquals(response.ancount, 1)
+        self.assertEquals(response.answers[0].rdata.txt.str, txt_array)
+
+    def assertIsNotNone(self, item):
+        self.assertTrue(item is not None)
+
 class TestSimpleQueries(DNSTest):
 
     def test_one_a_query(self):
@@ -415,44 +456,6 @@ class TestDNSUpdates(DNSTest):
         response = self.dns_transaction_udp(p)
         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXRRSET)
 
-    def make_txt_update(self, prefix, txt_array):
-        p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
-        updates = []
-
-        name = self.get_dns_domain()
-        u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
-        updates.append(u)
-        self.finish_name_packet(p, updates)
-
-        updates = []
-        r = dns.res_rec()
-        r.name = "%s.%s" % (prefix, self.get_dns_domain())
-        r.rr_type = dns.DNS_QTYPE_TXT
-        r.rr_class = dns.DNS_QCLASS_IN
-        r.ttl = 900
-        r.length = 0xffff
-        rdata = make_txt_record(txt_array)
-        r.rdata = rdata
-        updates.append(r)
-        p.nscount = len(updates)
-        p.nsrecs = updates
-
-        return p
-
-    def check_query_txt(self, prefix, txt_array):
-        name = "%s.%s" % (prefix, self.get_dns_domain())
-        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
-        questions = []
-
-        q = self.make_name_question(name, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
-        questions.append(q)
-
-        self.finish_name_packet(p, questions)
-        response = self.dns_transaction_udp(p)
-        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
-        self.assertEquals(response.ancount, 1)
-        self.assertEquals(response.answers[0].rdata.txt.str, txt_array)
-
     def test_update_add_txt_record(self):
         "test adding records works"
         prefix, txt = 'textrec', ['"This is a test"']
@@ -461,74 +464,6 @@ class TestDNSUpdates(DNSTest):
         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
         self.check_query_txt(prefix, txt)
 
-    def test_update_add_null_padded_txt_record(self):
-        "test adding records works"
-        prefix, txt = 'pad1textrec', ['"This is a test"', '', '']
-        p = self.make_txt_update(prefix, txt)
-        response = self.dns_transaction_udp(p)
-        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
-        self.check_query_txt(prefix, txt)
-
-        prefix, txt = 'pad2textrec', ['"This is a test"', '', '', 'more text']
-        p = self.make_txt_update(prefix, txt)
-        response = self.dns_transaction_udp(p)
-        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
-        self.check_query_txt(prefix, txt)
-
-        prefix, txt = 'pad3textrec', ['', '', '"This is a test"']
-        p = self.make_txt_update(prefix, txt)
-        response = self.dns_transaction_udp(p)
-        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
-        self.check_query_txt(prefix, txt)
-
-    # Test is incomplete due to strlen against txt records
-    def test_update_add_null_char_txt_record(self):
-        "test adding records works"
-        prefix, txt = 'nulltextrec', ['NULL\x00BYTE']
-        p = self.make_txt_update(prefix, txt)
-        response = self.dns_transaction_udp(p)
-        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
-        self.check_query_txt(prefix, ['NULL'])
-
-        prefix, txt = 'nulltextrec2', ['NULL\x00BYTE', 'NULL\x00BYTE']
-        p = self.make_txt_update(prefix, txt)
-        response = self.dns_transaction_udp(p)
-        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
-        self.check_query_txt(prefix, ['NULL', 'NULL'])
-
-    def test_update_add_hex_char_txt_record(self):
-        "test adding records works"
-        prefix, txt = 'hextextrec', ['HIGH\xFFBYTE']
-        p = self.make_txt_update(prefix, txt)
-        response = self.dns_transaction_udp(p)
-        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
-        self.check_query_txt(prefix, txt)
-
-    def test_update_add_slash_txt_record(self):
-        "test adding records works"
-        prefix, txt = 'slashtextrec', ['Th\\=is=is a test']
-        p = self.make_txt_update(prefix, txt)
-        response = self.dns_transaction_udp(p)
-        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
-        self.check_query_txt(prefix, txt)
-
-    def test_update_add_two_txt_records(self):
-        "test adding two txt records works"
-        prefix, txt = 'textrec2', ['"This is a test"',
-                                   '"and this is a test, too"']
-        p = self.make_txt_update(prefix, txt)
-        response = self.dns_transaction_udp(p)
-        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
-        self.check_query_txt(prefix, txt)
-
-    def test_update_add_empty_txt_records(self):
-        "test adding two txt records works"
-        prefix, txt = 'emptytextrec', []
-        p = self.make_txt_update(prefix, txt)
-        response = self.dns_transaction_udp(p)
-        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
-        self.check_query_txt(prefix, txt)
-
     def test_delete_record(self):
         "Test if deleting records works"
 
@@ -901,6 +836,130 @@ class TestInvalidQueries(DNSTest):
             if s is not None:
                 s.close()
 
+class TestRPCRoundtrip(DNSTest):
+    def get_credentials(self, lp):
+        creds = credentials.Credentials()
+        creds.guess(lp)
+        creds.set_machine_account(lp)
+        creds.set_krb_forwardable(credentials.NO_KRB_FORWARDABLE)
+        return creds
+
+    def setUp(self):
+        super(TestRPCRoundtrip, self).setUp()
+        self.lp = self.get_loadparm()
+        self.creds = self.get_credentials(self.lp)
+        self.server = os.getenv("SERVER_IP")
+        self.rpc_conn = dnsserver.dnsserver("ncacn_ip_tcp:%s[sign]" % (self.server),
+                                            self.lp, self.creds)
+
+    def tearDown(self):
+        super(TestRPCRoundtrip, self).tearDown()
+
+    def test_update_add_null_padded_txt_record(self):
+        "test adding records works"
+        prefix, txt = 'pad1textrec', ['"This is a test"', '', '']
+        p = self.make_txt_update(prefix, txt)
+        response = self.dns_transaction_udp(p)
+        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
+        self.check_query_txt(prefix, txt)
+        self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server,
+                             self.get_dns_domain(),
+                             "%s.%s" % (prefix, self.get_dns_domain()),
+                             dnsp.DNS_TYPE_TXT, '"\\"This is a test\\"" "" ""'))
+
+        prefix, txt = 'pad2textrec', ['"This is a test"', '', '', 'more text']
+        p = self.make_txt_update(prefix, txt)
+        response = self.dns_transaction_udp(p)
+        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
+        self.check_query_txt(prefix, txt)
+        self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server,
+                             self.get_dns_domain(),
+                             "%s.%s" % (prefix, self.get_dns_domain()),
+                             dnsp.DNS_TYPE_TXT, '"\\"This is a test\\"" "" "" "more text"'))
+
+        prefix, txt = 'pad3textrec', ['', '', '"This is a test"']
+        p = self.make_txt_update(prefix, txt)
+        response = self.dns_transaction_udp(p)
+        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
+        self.check_query_txt(prefix, txt)
+        self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server,
+                             self.get_dns_domain(),
+                             "%s.%s" % (prefix, self.get_dns_domain()),
+                             dnsp.DNS_TYPE_TXT, '"" "" "\\"This is a test\\""'))
+
+    # Test is incomplete due to strlen against txt records
+    def test_update_add_null_char_txt_record(self):
+        "test adding records works"
+        prefix, txt = 'nulltextrec', ['NULL\x00BYTE']
+        p = self.make_txt_update(prefix, txt)
+        response = self.dns_transaction_udp(p)
+        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
+        self.check_query_txt(prefix, ['NULL'])
+        self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server,
+                             self.get_dns_domain(),
+                             "%s.%s" % (prefix, self.get_dns_domain()),
+                             dnsp.DNS_TYPE_TXT, '"NULL"'))
+
+        prefix, txt = 'nulltextrec2', ['NULL\x00BYTE', 'NULL\x00BYTE']
+        p = self.make_txt_update(prefix, txt)
+        response = self.dns_transaction_udp(p)
+        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
+        self.check_query_txt(prefix, ['NULL', 'NULL'])
+        self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server,
+                             self.get_dns_domain(),
+                             "%s.%s" % (prefix, self.get_dns_domain()),
+                             dnsp.DNS_TYPE_TXT, '"NULL" "NULL"'))
+
+    def test_update_add_hex_char_txt_record(self):
+        "test adding records works"
+        prefix, txt = 'hextextrec', ['HIGH\xFFBYTE']
+        p = self.make_txt_update(prefix, txt)
+        response = self.dns_transaction_udp(p)
+        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
+        self.check_query_txt(prefix, txt)
+        self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server,
+                             self.get_dns_domain(),
+                             "%s.%s" % (prefix, self.get_dns_domain()),
+                             dnsp.DNS_TYPE_TXT, '"HIGH\xFFBYTE"'))
+
+    def test_update_add_slash_txt_record(self):
+        "test adding records works"
+        prefix, txt = 'slashtextrec', ['Th\\=is=is a test']
+        p = self.make_txt_update(prefix, txt)
+        response = self.dns_transaction_udp(p)
+        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
+        self.check_query_txt(prefix, txt)
+        self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server,
+                             self.get_dns_domain(),
+                             "%s.%s" % (prefix, self.get_dns_domain()),
+                             dnsp.DNS_TYPE_TXT, '"Th\\\\=is=is a test"'))
+
+    def test_update_add_two_txt_records(self):
+        "test adding two txt records works"
+        prefix, txt = 'textrec2', ['"This is a test"',
+                                   '"and this is a test, too"']
+        p = self.make_txt_update(prefix, txt)
+        response = self.dns_transaction_udp(p)
+        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
+        self.check_query_txt(prefix, txt)
+        self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server,
+                             self.get_dns_domain(),
+                             "%s.%s" % (prefix, self.get_dns_domain()),
+                             dnsp.DNS_TYPE_TXT, '"\\"This is a test\\""' +
+                             ' "\\"and this is a test, too\\""'))
+
+    def test_update_add_empty_txt_records(self):
+        "test adding two txt records works"
+        prefix, txt = 'emptytextrec', []
+        p = self.make_txt_update(prefix, txt)
+        response = self.dns_transaction_udp(p)
+        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
+        self.check_query_txt(prefix, txt)
+        self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server,
+                             self.get_dns_domain(),
+                             "%s.%s" % (prefix, self.get_dns_domain()),
+                             dnsp.DNS_TYPE_TXT, ''))
+
 if __name__ == "__main__":
     import unittest
     unittest.main()