ed78d56dd3034e3b1e73628e408de1d6429cbfe5
[mat/samba.git] / source4 / scripting / python / samba / tests / dns.py
1 #!/usr/bin/env python
2
3 # Unix SMB/CIFS implementation.
4 # Copyright (C) Kai Blin  <kai@samba.org> 2011
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 3 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 # GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 #
19
20 import os
21 import sys
22 import struct
23 import random
24 from samba import socket
25 import samba.ndr as ndr
26 import samba.dcerpc.dns as dns
27 from samba.tests import TestCase
28
29 class DNSTest(TestCase):
30
31     def errstr(self, errcode):
32         "Return a readable error code"
33         string_codes = [
34             "OK",
35             "FORMERR",
36             "SERVFAIL",
37             "NXDOMAIN",
38             "NOTIMP",
39             "REFUSED",
40             "YXDOMAIN",
41             "YXRRSET",
42             "NXRRSET",
43             "NOTAUTH",
44             "NOTZONE",
45         ]
46
47         return string_codes[errcode]
48
49
50     def assert_dns_rcode_equals(self, packet, rcode):
51         "Helper function to check return code"
52         p_errcode = packet.operation & 0x000F
53         self.assertEquals(p_errcode, rcode, "Expected RCODE %s, got %s" % \
54                             (self.errstr(rcode), self.errstr(p_errcode)))
55
56     def assert_dns_opcode_equals(self, packet, opcode):
57         "Helper function to check opcode"
58         p_opcode = packet.operation & 0x7800
59         self.assertEquals(p_opcode, opcode, "Expected OPCODE %s, got %s" % \
60                             (opcode, p_opcode))
61
62     def make_name_packet(self, opcode, qid=None):
63         "Helper creating a dns.name_packet"
64         p = dns.name_packet()
65         if qid is None:
66             p.id = random.randint(0x0, 0xffff)
67         p.operation = opcode
68         p.questions = []
69         return p
70
71     def finish_name_packet(self, packet, questions):
72         "Helper to finalize a dns.name_packet"
73         packet.qdcount = len(questions)
74         packet.questions = questions
75
76     def make_name_question(self, name, qtype, qclass):
77         "Helper creating a dns.name_question"
78         q = dns.name_question()
79         q.name = name
80         q.question_type = qtype
81         q.question_class = qclass
82         return q
83
84     def get_dns_domain(self):
85         "Helper to get dns domain"
86         return os.getenv('REALM', 'example.com').lower()
87
88     def dns_transaction_udp(self, packet, host=os.getenv('DC_SERVER_IP')):
89         "send a DNS query and read the reply"
90         s = None
91         try:
92             send_packet = ndr.ndr_pack(packet)
93             s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
94             s.connect((host, 53))
95             s.send(send_packet, 0)
96             recv_packet = s.recv(2048, 0)
97             return ndr.ndr_unpack(dns.name_packet, recv_packet)
98         finally:
99             if s is not None:
100                 s.close()
101
102     def test_one_a_query(self):
103         "create a query packet containing one query record"
104         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
105         questions = []
106
107         name = "%s.%s" % (os.getenv('DC_SERVER'), self.get_dns_domain())
108         q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
109         print "asking for ", q.name
110         questions.append(q)
111
112         self.finish_name_packet(p, questions)
113         response = self.dns_transaction_udp(p)
114         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
115         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
116         self.assertEquals(response.ancount, 1)
117         self.assertEquals(response.answers[0].rdata,
118                           os.getenv('DC_SERVER_IP'))
119
120     def test_two_queries(self):
121         "create a query packet containing two query records"
122         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
123         questions = []
124
125         name = "%s.%s" % (os.getenv('DC_SERVER'), self.get_dns_domain())
126         q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
127         questions.append(q)
128
129         name = "%s.%s" % ('bogusname', self.get_dns_domain())
130         q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
131         questions.append(q)
132
133         self.finish_name_packet(p, questions)
134         response = self.dns_transaction_udp(p)
135         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
136
137     def test_qtype_all_query(self):
138         "create a QTYPE_ALL query"
139         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
140         questions = []
141
142         name = "%s.%s" % (os.getenv('DC_SERVER'), self.get_dns_domain())
143         q = self.make_name_question(name, dns.DNS_QTYPE_ALL, dns.DNS_QCLASS_IN)
144         print "asking for ", q.name
145         questions.append(q)
146
147         self.finish_name_packet(p, questions)
148         response = self.dns_transaction_udp(p)
149
150         num_answers = 1
151         dc_ipv6 = os.getenv('DC_SERVER_IPV6')
152         if dc_ipv6 is not None:
153             num_answers += 1
154
155         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
156         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
157         self.assertEquals(response.ancount, num_answers)
158         self.assertEquals(response.answers[0].rdata,
159                           os.getenv('DC_SERVER_IP'))
160         if dc_ipv6 is not None:
161             self.assertEquals(response.answers[1].rdata, dc_ipv6)
162
163     def test_qclass_none_query(self):
164         "create a QCLASS_NONE query"
165         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
166         questions = []
167
168         name = "%s.%s" % (os.getenv('DC_SERVER'), self.get_dns_domain())
169         q = self.make_name_question(name, dns.DNS_QTYPE_ALL, dns.DNS_QCLASS_NONE)
170         questions.append(q)
171
172         self.finish_name_packet(p, questions)
173         response = self.dns_transaction_udp(p)
174         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTIMP)
175
176 # Only returns an authority section entry in BIND and Win DNS
177 # FIXME: Enable one Samba implements this feature
178 #    def test_soa_hostname_query(self):
179 #        "create a SOA query for a hostname"
180 #        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
181 #        questions = []
182 #
183 #        name = "%s.%s" % (os.getenv('DC_SERVER'), self.get_dns_domain())
184 #        q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
185 #        questions.append(q)
186 #
187 #        self.finish_name_packet(p, questions)
188 #        response = self.dns_transaction_udp(p)
189 #        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
190 #        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
191 #        # We don't get SOA records for single hosts
192 #        self.assertEquals(response.ancount, 0)
193
194     def test_soa_domain_query(self):
195         "create a SOA query for a domain"
196         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
197         questions = []
198
199         name = self.get_dns_domain()
200         q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
201         questions.append(q)
202
203         self.finish_name_packet(p, questions)
204         response = self.dns_transaction_udp(p)
205         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
206         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
207         self.assertEquals(response.ancount, 1)
208
209     def test_two_updates(self):
210         "create two update requests"
211         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
212         updates = []
213
214         name = "%s.%s" % (os.getenv('DC_SERVER'), self.get_dns_domain())
215         u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
216         updates.append(u)
217
218         name = self.get_dns_domain()
219         u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
220         updates.append(u)
221
222         self.finish_name_packet(p, updates)
223         response = self.dns_transaction_udp(p)
224         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
225
226     def test_update_wrong_qclass(self):
227         "create update with DNS_QCLASS_NONE"
228         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
229         updates = []
230
231         name = self.get_dns_domain()
232         u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_NONE)
233         updates.append(u)
234
235         self.finish_name_packet(p, updates)
236         response = self.dns_transaction_udp(p)
237         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTIMP)
238
239 if __name__ == "__main__":
240     import unittest
241     unittest.main()