CVE-2016-0771: tests/dns: Remove dependencies on env variables
authorGarming Sam <garming@catalyst.net.nz>
Fri, 29 Jan 2016 04:28:54 +0000 (17:28 +1300)
committerKarolin Seeger <kseeger@samba.org>
Wed, 24 Feb 2016 10:43:59 +0000 (11:43 +0100)
Now that it is invoked as a normal script, there should be less of them.

BUG: https://bugzilla.samba.org/show_bug.cgi?id=11128
BUG: https://bugzilla.samba.org/show_bug.cgi?id=11686

Signed-off-by: Garming Sam <garming@catalyst.net.nz>
Reviewed-by: Stefan Metzmacher <metze@samba.org>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
python/samba/tests/dns.py

index 630a25538e80f146464c9e686c35dfd138e1cdbf..75b5b7f085a2e77d69e10874783640369b2ac003 100644 (file)
@@ -132,9 +132,10 @@ class DNSTest(TestCase):
 
     def get_dns_domain(self):
         "Helper to get dns domain"
-        return os.getenv('REALM', 'example.com').lower()
+        return self.creds.get_realm().lower()
 
-    def dns_transaction_udp(self, packet, host=os.getenv('SERVER_IP'), dump=False):
+    def dns_transaction_udp(self, packet, host=server_ip,
+                            dump=False):
         "send a DNS query and read the reply"
         s = None
         try:
@@ -152,7 +153,8 @@ class DNSTest(TestCase):
             if s is not None:
                 s.close()
 
-    def dns_transaction_tcp(self, packet, host=os.getenv('SERVER_IP'), dump=False):
+    def dns_transaction_tcp(self, packet, host=server_ip,
+                            dump=False):
         "send a DNS query and read the reply"
         s = None
         try:
@@ -230,7 +232,7 @@ class TestSimpleQueries(DNSTest):
         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
         questions = []
 
-        name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
+        name = "%s.%s" % (self.server, self.get_dns_domain())
         q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
         print "asking for ", q.name
         questions.append(q)
@@ -241,14 +243,14 @@ class TestSimpleQueries(DNSTest):
         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
         self.assertEquals(response.ancount, 1)
         self.assertEquals(response.answers[0].rdata,
-                          os.getenv('SERVER_IP'))
+                          self.server_ip)
 
     def test_one_a_query_tcp(self):
         "create a query packet containing one query record via TCP"
         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
         questions = []
 
-        name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
+        name = "%s.%s" % (self.server, self.get_dns_domain())
         q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
         print "asking for ", q.name
         questions.append(q)
@@ -259,14 +261,14 @@ class TestSimpleQueries(DNSTest):
         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
         self.assertEquals(response.ancount, 1)
         self.assertEquals(response.answers[0].rdata,
-                          os.getenv('SERVER_IP'))
+                          self.server_ip)
 
     def test_one_mx_query(self):
         "create a query packet causing an empty RCODE_OK answer"
         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
         questions = []
 
-        name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
+        name = "%s.%s" % (self.server, self.get_dns_domain())
         q = self.make_name_question(name, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN)
         print "asking for ", q.name
         questions.append(q)
@@ -280,7 +282,7 @@ class TestSimpleQueries(DNSTest):
         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
         questions = []
 
-        name = "invalid-%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
+        name = "invalid-%s.%s" % (self.server, self.get_dns_domain())
         q = self.make_name_question(name, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN)
         print "asking for ", q.name
         questions.append(q)
@@ -296,7 +298,7 @@ class TestSimpleQueries(DNSTest):
         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
         questions = []
 
-        name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
+        name = "%s.%s" % (self.server, self.get_dns_domain())
         q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
         questions.append(q)
 
@@ -313,7 +315,7 @@ class TestSimpleQueries(DNSTest):
         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
         questions = []
 
-        name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
+        name = "%s.%s" % (self.server, self.get_dns_domain())
         q = self.make_name_question(name, dns.DNS_QTYPE_ALL, dns.DNS_QCLASS_IN)
         print "asking for ", q.name
         questions.append(q)
@@ -330,7 +332,7 @@ class TestSimpleQueries(DNSTest):
         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
         self.assertEquals(response.ancount, num_answers)
         self.assertEquals(response.answers[0].rdata,
-                          os.getenv('SERVER_IP'))
+                          self.server_ip)
         if dc_ipv6 is not None:
             self.assertEquals(response.answers[1].rdata, dc_ipv6)
 
@@ -339,7 +341,7 @@ class TestSimpleQueries(DNSTest):
         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
         questions = []
 
-        name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
+        name = "%s.%s" % (self.server, self.get_dns_domain())
         q = self.make_name_question(name, dns.DNS_QTYPE_ALL, dns.DNS_QCLASS_NONE)
         questions.append(q)
 
@@ -389,7 +391,7 @@ class TestDNSUpdates(DNSTest):
         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
         updates = []
 
-        name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
+        name = "%s.%s" % (self.server, self.get_dns_domain())
         u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
         updates.append(u)
 
@@ -427,7 +429,7 @@ class TestDNSUpdates(DNSTest):
 
         prereqs = []
         r = dns.res_rec()
-        r.name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
+        r.name = "%s.%s" % (self.server, self.get_dns_domain())
         r.rr_type = dns.DNS_QTYPE_TXT
         r.rr_class = dns.DNS_QCLASS_NONE
         r.ttl = 1
@@ -453,7 +455,7 @@ class TestDNSUpdates(DNSTest):
 
         prereqs = []
         r = dns.res_rec()
-        r.name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
+        r.name = "%s.%s" % (self.server, self.get_dns_domain())
         r.rr_type = dns.DNS_QTYPE_TXT
         r.rr_class = dns.DNS_QCLASS_ANY
         r.ttl = 0
@@ -758,7 +760,7 @@ class TestComplexQueries(DNSTest):
         r.rr_class = dns.DNS_QCLASS_IN
         r.ttl = 900
         r.length = 0xffff
-        r.rdata = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
+        r.rdata = "%s.%s" % (self.server, self.get_dns_domain())
         updates.append(r)
         p.nscount = len(updates)
         p.nsrecs = updates
@@ -784,7 +786,7 @@ class TestComplexQueries(DNSTest):
         r.rr_class = dns.DNS_QCLASS_NONE
         r.ttl = 0
         r.length = 0xffff
-        r.rdata = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
+        r.rdata = "%s.%s" % (self.server, self.get_dns_domain())
         updates.append(r)
         p.nscount = len(updates)
         p.nsrecs = updates
@@ -809,10 +811,10 @@ class TestComplexQueries(DNSTest):
         self.assertEquals(response.ancount, 2)
         self.assertEquals(response.answers[0].rr_type, dns.DNS_QTYPE_CNAME)
         self.assertEquals(response.answers[0].rdata, "%s.%s" %
-                          (os.getenv('SERVER'), self.get_dns_domain()))
+                          (self.server, self.get_dns_domain()))
         self.assertEquals(response.answers[1].rr_type, dns.DNS_QTYPE_A)
         self.assertEquals(response.answers[1].rdata,
-                          os.getenv('SERVER_IP'))
+                          self.server_ip)
 
 class TestInvalidQueries(DNSTest):
 
@@ -822,7 +824,7 @@ class TestInvalidQueries(DNSTest):
         s = None
         try:
             s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
-            s.connect((os.getenv('SERVER_IP'), 53))
+            s.connect((self.server_ip, 53))
             s.send("", 0)
         finally:
             if s is not None:
@@ -831,7 +833,7 @@ class TestInvalidQueries(DNSTest):
         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
         questions = []
 
-        name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
+        name = "%s.%s" % (self.server, self.get_dns_domain())
         q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
         print "asking for ", q.name
         questions.append(q)
@@ -842,7 +844,7 @@ class TestInvalidQueries(DNSTest):
         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
         self.assertEquals(response.ancount, 1)
         self.assertEquals(response.answers[0].rdata,
-                          os.getenv('SERVER_IP'))
+                          self.server_ip)
 
     def test_one_a_reply(self):
         "send a reply instead of a query"
@@ -861,7 +863,7 @@ class TestInvalidQueries(DNSTest):
         try:
             send_packet = ndr.ndr_pack(p)
             s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
-            host=os.getenv('SERVER_IP')
+            host=self.server_ip
             s.connect((host, 53))
             tcp_packet = struct.pack('!H', len(send_packet))
             tcp_packet += send_packet
@@ -873,18 +875,8 @@ class TestInvalidQueries(DNSTest):
                 s.close()
 
 class TestRPCRoundtrip(DNSTest):
-    def get_credentials(self, lp):
-        creds = credentials.Credentials()
-        creds.guess(lp)
-        creds.set_machine_account(lp)
-        creds.set_krb_forwardable(credentials.NO_KRB_FORWARDABLE)
-        return creds
-
     def setUp(self):
         super(TestRPCRoundtrip, self).setUp()
-        self.lp = self.get_loadparm()
-        self.creds = self.get_credentials(self.lp)
-        self.server = os.getenv("SERVER_IP")
         self.rpc_conn = dnsserver.dnsserver("ncacn_ip_tcp:%s[sign]" % (self.server_ip),
                                             self.lp, self.creds)