CVE-2016-0771: tests/dns: RPC => DNS roundtrip test
authorGarming Sam <garming@catalyst.net.nz>
Wed, 27 Jan 2016 23:54:58 +0000 (12:54 +1300)
committerKarolin Seeger <kseeger@samba.org>
Wed, 24 Feb 2016 10:43:59 +0000 (11:43 +0100)
Make sure that TXT entries stored via RPC come out the same in DNS.

This has one caveat in that adding over RPC in Windows eats slashes,
and so fails there.

BUG: https://bugzilla.samba.org/show_bug.cgi?id=11128
BUG: https://bugzilla.samba.org/show_bug.cgi?id=11686

Signed-off-by: Garming Sam <garming@catalyst.net.nz>
Reviewed-by: Stefan Metzmacher <metze@samba.org>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
python/samba/tests/dns.py
source4/selftest/tests.py

index 784485e9b3867e550cb9c7cd0f1f28d4230148c0..b16ea1d46253c3b5b57cb8ec5bbfaceab69cec78 100644 (file)
@@ -23,6 +23,7 @@ import samba.ndr as ndr
 from samba import credentials, param
 from samba.tests import TestCase
 from samba.dcerpc import dns, dnsp, dnsserver
+from samba.netcmd.dns import TXTRecord, dns_record_match, data_to_dns_record
 
 FILTER=''.join([(len(repr(chr(x)))==3) and chr(x) or '.' for x in range(256)])
 
@@ -849,12 +850,31 @@ class TestRPCRoundtrip(DNSTest):
         self.lp = self.get_loadparm()
         self.creds = self.get_credentials(self.lp)
         self.server = os.getenv("SERVER_IP")
-        self.rpc_conn = dnsserver.dnsserver("ncacn_ip_tcp:%s[sign]" % (self.server),
+        self.rpc_conn = dnsserver.dnsserver("ncacn_ip_tcp:%s[sign]" % (self.server_ip),
                                             self.lp, self.creds)
 
     def tearDown(self):
         super(TestRPCRoundtrip, self).tearDown()
 
+    def test_update_add_txt_rpc_to_dns(self):
+        prefix, txt = 'rpctextrec', ['"This is a test"']
+
+        name = "%s.%s" % (prefix, self.get_dns_domain())
+
+        rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"\\"This is a test\\""')
+        add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
+        add_rec_buf.rec = rec
+        try:
+            self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+                                     0, self.server_ip, self.get_dns_domain(),
+                                     name, add_rec_buf, None)
+
+            self.check_query_txt(prefix, txt)
+        finally:
+            self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+                                              0, self.server_ip, self.get_dns_domain(),
+                                              name, None, add_rec_buf)
+
     def test_update_add_null_padded_txt_record(self):
         "test adding records works"
         prefix, txt = 'pad1textrec', ['"This is a test"', '', '']
@@ -862,7 +882,7 @@ class TestRPCRoundtrip(DNSTest):
         response = self.dns_transaction_udp(p)
         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
         self.check_query_txt(prefix, txt)
-        self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server,
+        self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
                              self.get_dns_domain(),
                              "%s.%s" % (prefix, self.get_dns_domain()),
                              dnsp.DNS_TYPE_TXT, '"\\"This is a test\\"" "" ""'))
@@ -872,7 +892,7 @@ class TestRPCRoundtrip(DNSTest):
         response = self.dns_transaction_udp(p)
         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
         self.check_query_txt(prefix, txt)
-        self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server,
+        self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
                              self.get_dns_domain(),
                              "%s.%s" % (prefix, self.get_dns_domain()),
                              dnsp.DNS_TYPE_TXT, '"\\"This is a test\\"" "" "" "more text"'))
@@ -882,11 +902,66 @@ class TestRPCRoundtrip(DNSTest):
         response = self.dns_transaction_udp(p)
         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
         self.check_query_txt(prefix, txt)
-        self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server,
+        self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
                              self.get_dns_domain(),
                              "%s.%s" % (prefix, self.get_dns_domain()),
                              dnsp.DNS_TYPE_TXT, '"" "" "\\"This is a test\\""'))
 
+    def test_update_add_padding_rpc_to_dns(self):
+        prefix, txt = 'pad1textrec', ['"This is a test"', '', '']
+        prefix = 'rpc' + prefix
+        name = "%s.%s" % (prefix, self.get_dns_domain())
+
+        rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"\\"This is a test\\"" "" ""')
+        add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
+        add_rec_buf.rec = rec
+        try:
+            self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+                                     0, self.server_ip, self.get_dns_domain(),
+                                     name, add_rec_buf, None)
+
+            self.check_query_txt(prefix, txt)
+        finally:
+            self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+                                              0, self.server_ip, self.get_dns_domain(),
+                                              name, None, add_rec_buf)
+
+        prefix, txt = 'pad2textrec', ['"This is a test"', '', '', 'more text']
+        prefix = 'rpc' + prefix
+        name = "%s.%s" % (prefix, self.get_dns_domain())
+
+        rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"\\"This is a test\\"" "" "" "more text"')
+        add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
+        add_rec_buf.rec = rec
+        try:
+            self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+                                     0, self.server_ip, self.get_dns_domain(),
+                                     name, add_rec_buf, None)
+
+            self.check_query_txt(prefix, txt)
+        finally:
+            self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+                                              0, self.server_ip, self.get_dns_domain(),
+                                              name, None, add_rec_buf)
+
+        prefix, txt = 'pad3textrec', ['', '', '"This is a test"']
+        prefix = 'rpc' + prefix
+        name = "%s.%s" % (prefix, self.get_dns_domain())
+
+        rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"" "" "\\"This is a test\\""')
+        add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
+        add_rec_buf.rec = rec
+        try:
+            self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+                                     0, self.server_ip, self.get_dns_domain(),
+                                     name, add_rec_buf, None)
+
+            self.check_query_txt(prefix, txt)
+        finally:
+            self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+                                              0, self.server_ip, self.get_dns_domain(),
+                                              name, None, add_rec_buf)
+
     # Test is incomplete due to strlen against txt records
     def test_update_add_null_char_txt_record(self):
         "test adding records works"
@@ -895,7 +970,7 @@ class TestRPCRoundtrip(DNSTest):
         response = self.dns_transaction_udp(p)
         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
         self.check_query_txt(prefix, ['NULL'])
-        self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server,
+        self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
                              self.get_dns_domain(),
                              "%s.%s" % (prefix, self.get_dns_domain()),
                              dnsp.DNS_TYPE_TXT, '"NULL"'))
@@ -905,11 +980,30 @@ class TestRPCRoundtrip(DNSTest):
         response = self.dns_transaction_udp(p)
         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
         self.check_query_txt(prefix, ['NULL', 'NULL'])
-        self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server,
+        self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
                              self.get_dns_domain(),
                              "%s.%s" % (prefix, self.get_dns_domain()),
                              dnsp.DNS_TYPE_TXT, '"NULL" "NULL"'))
 
+    def test_update_add_null_char_rpc_to_dns(self):
+        prefix, txt = 'nulltextrec', ['NULL\x00BYTE']
+        prefix = 'rpc' + prefix
+        name = "%s.%s" % (prefix, self.get_dns_domain())
+
+        rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"NULL"')
+        add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
+        add_rec_buf.rec = rec
+        try:
+            self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+                                     0, self.server_ip, self.get_dns_domain(),
+                                     name, add_rec_buf, None)
+
+            self.check_query_txt(prefix, ['NULL'])
+        finally:
+            self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+                                              0, self.server_ip, self.get_dns_domain(),
+                                              name, None, add_rec_buf)
+
     def test_update_add_hex_char_txt_record(self):
         "test adding records works"
         prefix, txt = 'hextextrec', ['HIGH\xFFBYTE']
@@ -917,11 +1011,30 @@ class TestRPCRoundtrip(DNSTest):
         response = self.dns_transaction_udp(p)
         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
         self.check_query_txt(prefix, txt)
-        self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server,
+        self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
                              self.get_dns_domain(),
                              "%s.%s" % (prefix, self.get_dns_domain()),
                              dnsp.DNS_TYPE_TXT, '"HIGH\xFFBYTE"'))
 
+    def test_update_add_hex_rpc_to_dns(self):
+        prefix, txt = 'hextextrec', ['HIGH\xFFBYTE']
+        prefix = 'rpc' + prefix
+        name = "%s.%s" % (prefix, self.get_dns_domain())
+
+        rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"HIGH\xFFBYTE"')
+        add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
+        add_rec_buf.rec = rec
+        try:
+            self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+                                     0, self.server_ip, self.get_dns_domain(),
+                                     name, add_rec_buf, None)
+
+            self.check_query_txt(prefix, txt)
+        finally:
+            self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+                                              0, self.server_ip, self.get_dns_domain(),
+                                              name, None, add_rec_buf)
+
     def test_update_add_slash_txt_record(self):
         "test adding records works"
         prefix, txt = 'slashtextrec', ['Th\\=is=is a test']
@@ -929,11 +1042,33 @@ class TestRPCRoundtrip(DNSTest):
         response = self.dns_transaction_udp(p)
         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
         self.check_query_txt(prefix, txt)
-        self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server,
+        self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
                              self.get_dns_domain(),
                              "%s.%s" % (prefix, self.get_dns_domain()),
                              dnsp.DNS_TYPE_TXT, '"Th\\\\=is=is a test"'))
 
+    # This test fails against Windows as it eliminates slashes in RPC
+    # One typical use for a slash is in records like 'var=value' to
+    # escape '=' characters.
+    def test_update_add_slash_rpc_to_dns(self):
+        prefix, txt = 'slashtextrec', ['Th\\=is=is a test']
+        prefix = 'rpc' + prefix
+        name = "%s.%s" % (prefix, self.get_dns_domain())
+
+        rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"Th\\\\=is=is a test"')
+        add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
+        add_rec_buf.rec = rec
+        try:
+            self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+                                     0, self.server_ip, self.get_dns_domain(),
+                                     name, add_rec_buf, None)
+
+            self.check_query_txt(prefix, txt)
+        finally:
+            self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+                                              0, self.server_ip, self.get_dns_domain(),
+                                              name, None, add_rec_buf)
+
     def test_update_add_two_txt_records(self):
         "test adding two txt records works"
         prefix, txt = 'textrec2', ['"This is a test"',
@@ -942,12 +1077,34 @@ class TestRPCRoundtrip(DNSTest):
         response = self.dns_transaction_udp(p)
         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
         self.check_query_txt(prefix, txt)
-        self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server,
+        self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
                              self.get_dns_domain(),
                              "%s.%s" % (prefix, self.get_dns_domain()),
                              dnsp.DNS_TYPE_TXT, '"\\"This is a test\\""' +
                              ' "\\"and this is a test, too\\""'))
 
+    def test_update_add_two_rpc_to_dns(self):
+        prefix, txt = 'textrec2', ['"This is a test"',
+                                   '"and this is a test, too"']
+        prefix = 'rpc' + prefix
+        name = "%s.%s" % (prefix, self.get_dns_domain())
+
+        rec = data_to_dns_record(dnsp.DNS_TYPE_TXT,
+                                '"\\"This is a test\\""' +
+                                ' "\\"and this is a test, too\\""')
+        add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
+        add_rec_buf.rec = rec
+        try:
+            self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+                                     0, self.server_ip, self.get_dns_domain(),
+                                     name, add_rec_buf, None)
+
+            self.check_query_txt(prefix, txt)
+        finally:
+            self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+                                              0, self.server_ip, self.get_dns_domain(),
+                                              name, None, add_rec_buf)
+
     def test_update_add_empty_txt_records(self):
         "test adding two txt records works"
         prefix, txt = 'emptytextrec', []
@@ -955,11 +1112,30 @@ class TestRPCRoundtrip(DNSTest):
         response = self.dns_transaction_udp(p)
         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
         self.check_query_txt(prefix, txt)
-        self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server,
+        self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
                              self.get_dns_domain(),
                              "%s.%s" % (prefix, self.get_dns_domain()),
                              dnsp.DNS_TYPE_TXT, ''))
 
+    def test_update_add_empty_rpc_to_dns(self):
+        prefix, txt = 'rpcemptytextrec', []
+
+        name = "%s.%s" % (prefix, self.get_dns_domain())
+
+        rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '')
+        add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
+        add_rec_buf.rec = rec
+        try:
+            self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+                                     0, self.server_ip, self.get_dns_domain(),
+                                     name, add_rec_buf, None)
+
+            self.check_query_txt(prefix, txt)
+        finally:
+            self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+                                              0, self.server_ip, self.get_dns_domain(),
+                                              name, None, add_rec_buf)
+
 if __name__ == "__main__":
     import unittest
     unittest.main()
index 08b59a17de545665ab23d77c2abb6c7467372ba0..933f30b0966580b0a10361aa1c7a937719607aeb 100755 (executable)
@@ -284,6 +284,7 @@ for f in sorted(os.listdir(os.path.join(samba4srcdir, "../pidl/tests"))):
 
 # DNS tests
 planpythontestsuite("fl2003dc", "samba.tests.dns")
+
 for t in smbtorture4_testsuites("dns_internal."):
     plansmbtorture4testsuite(t, "dc:local", '//$SERVER/whavever')