CVE-2016-0771: tests/dns: prepare script for further testing
[samba.git] / python / samba / tests / dns.py
index bac8deabddcb44dc90884b86d5be5251d316c4a9..fe0ac3b03ec071690f15da592c8de9c177b991c6 100644 (file)
@@ -20,14 +20,27 @@ import struct
 import random
 from samba import socket
 import samba.ndr as ndr
-import samba.dcerpc.dns as dns
+from samba import credentials, param
 from samba.tests import TestCase
+from samba.dcerpc import dns, dnsp, dnsserver
 
 FILTER=''.join([(len(repr(chr(x)))==3) and chr(x) or '.' for x in range(256)])
 
+def make_txt_record(records):
+    rdata_txt = dns.txt_record()
+    s_list = dnsp.string_list()
+    s_list.count = len(records)
+    s_list.str = records
+    rdata_txt.txt = s_list
+    return rdata_txt
 
 class DNSTest(TestCase):
 
+    def get_loadparm(self):
+        lp = param.LoadParm()
+        lp.load(os.getenv("SMB_CONF_PATH"))
+        return lp
+
     def errstr(self, errcode):
         "Return a readable error code"
         string_codes = [
@@ -187,6 +200,20 @@ class TestSimpleQueries(DNSTest):
         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
         self.assertEquals(response.ancount, 0)
 
+        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
+        questions = []
+
+        name = "invalid-%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
+        q = self.make_name_question(name, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN)
+        print "asking for ", q.name
+        questions.append(q)
+
+        self.finish_name_packet(p, questions)
+        response = self.dns_transaction_udp(p)
+        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
+        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
+        self.assertEquals(response.ancount, 0)
+
     def test_two_queries(self):
         "create a query packet containing two query records"
         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
@@ -275,6 +302,7 @@ class TestSimpleQueries(DNSTest):
         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
         self.assertEquals(response.ancount, 1)
+        self.assertEquals(response.answers[0].rdata.minimum, 3600)
 
 
 class TestDNSUpdates(DNSTest):
@@ -406,8 +434,7 @@ class TestDNSUpdates(DNSTest):
         r.rr_class = dns.DNS_QCLASS_IN
         r.ttl = 900
         r.length = 0xffff
-        rdata = dns.txt_record()
-        rdata.txt = '"This is a test"'
+        rdata = make_txt_record(['"This is a test"'])
         r.rdata = rdata
         updates.append(r)
         p.nscount = len(updates)
@@ -427,7 +454,7 @@ class TestDNSUpdates(DNSTest):
         response = self.dns_transaction_udp(p)
         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
         self.assertEquals(response.ancount, 1)
-        self.assertEquals(response.answers[0].rdata.txt, '"This is a test"')
+        self.assertEquals(response.answers[0].rdata.txt.str[0], '"This is a test"')
 
     def test_update_add_two_txt_records(self):
         "test adding two txt records works"
@@ -447,8 +474,8 @@ class TestDNSUpdates(DNSTest):
         r.rr_class = dns.DNS_QCLASS_IN
         r.ttl = 900
         r.length = 0xffff
-        rdata = dns.txt_record()
-        rdata.txt = '"This is a test" "and this is a test, too"'
+        rdata = make_txt_record(['"This is a test"',
+                                 '"and this is a test, too"'])
         r.rdata = rdata
         updates.append(r)
         p.nscount = len(updates)
@@ -468,7 +495,8 @@ class TestDNSUpdates(DNSTest):
         response = self.dns_transaction_udp(p)
         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
         self.assertEquals(response.ancount, 1)
-        self.assertEquals(response.answers[0].rdata.txt, '"This is a test" "and this is a test, too"')
+        self.assertEquals(response.answers[0].rdata.txt.str[0], '"This is a test"')
+        self.assertEquals(response.answers[0].rdata.txt.str[1], '"and this is a test, too"')
 
     def test_delete_record(self):
         "Test if deleting records works"
@@ -492,8 +520,7 @@ class TestDNSUpdates(DNSTest):
         r.rr_class = dns.DNS_QCLASS_IN
         r.ttl = 900
         r.length = 0xffff
-        rdata = dns.txt_record()
-        rdata.txt = '"This is a test"'
+        rdata = make_txt_record(['"This is a test"'])
         r.rdata = rdata
         updates.append(r)
         p.nscount = len(updates)
@@ -529,8 +556,7 @@ class TestDNSUpdates(DNSTest):
         r.rr_class = dns.DNS_QCLASS_NONE
         r.ttl = 0
         r.length = 0xffff
-        rdata = dns.txt_record()
-        rdata.txt = '"This is a test"'
+        rdata = make_txt_record(['"This is a test"'])
         r.rdata = rdata
         updates.append(r)
         p.nscount = len(updates)
@@ -572,8 +598,7 @@ class TestDNSUpdates(DNSTest):
         r.rr_class = dns.DNS_QCLASS_IN
         r.ttl = 900
         r.length = 0xffff
-        rdata = dns.txt_record()
-        rdata.txt = '"This is a test"'
+        rdata = make_txt_record(['"This is a test"'])
         r.rdata = rdata
         updates.append(r)
         p.nscount = len(updates)
@@ -609,8 +634,7 @@ class TestDNSUpdates(DNSTest):
         r.rr_class = dns.DNS_QCLASS_NONE
         r.ttl = 0
         r.length = 0xffff
-        rdata = dns.txt_record()
-        rdata.txt = '"This is a test"'
+        rdata = make_txt_record(['"This is a test"'])
         r.rdata = rdata
         updates.append(r)
         p.nscount = len(updates)
@@ -647,8 +671,7 @@ class TestDNSUpdates(DNSTest):
         r.rr_class = dns.DNS_QCLASS_IN
         r.ttl = 900
         r.length = 0xffff
-        rdata = dns.txt_record()
-        rdata.txt = '"This is a test"'
+        rdata = make_txt_record(['"This is a test"'])
         r.rdata = rdata
         updates.append(r)
         p.nscount = len(updates)
@@ -819,6 +842,35 @@ class TestInvalidQueries(DNSTest):
         self.assertEquals(response.answers[0].rdata,
                           os.getenv('SERVER_IP'))
 
+    def test_one_a_reply(self):
+        "send a reply instead of a query"
+
+        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
+        questions = []
+
+        name = "%s.%s" % ('fakefakefake', self.get_dns_domain())
+        q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
+        print "asking for ", q.name
+        questions.append(q)
+
+        self.finish_name_packet(p, questions)
+        p.operation |= dns.DNS_FLAG_REPLY
+        s = None
+        try:
+            send_packet = ndr.ndr_pack(p)
+            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
+            host=os.getenv('SERVER_IP')
+            s.connect((host, 53))
+            tcp_packet = struct.pack('!H', len(send_packet))
+            tcp_packet += send_packet
+            s.send(tcp_packet, 0)
+            recv_packet = s.recv(0xffff + 2, 0)
+            self.assertEquals(0, len(recv_packet))
+        finally:
+            if s is not None:
+                s.close()
+
+
 if __name__ == "__main__":
     import unittest
     unittest.main()