s4 dns: Check more of the returned values for the A query
[metze/samba/wip.git] / source4 / scripting / python / samba / tests / dns.py
1 #!/usr/bin/env python
2
3 # Unix SMB/CIFS implementation.
4 # Copyright (C) Kai Blin  <kai@samba.org> 2011
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 3 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 # GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 #
19
20 import os
21 import sys
22 import struct
23 import random
24 from samba import socket
25 import samba.ndr as ndr
26 import samba.dcerpc.dns as dns
27 from samba.tests import TestCase
28
29 class DNSTest(TestCase):
30
31     def assert_dns_rcode_equals(self, packet, rcode):
32         "Helper function to check return code"
33         p_errcode = packet.operation & 0x000F
34         self.assertEquals(p_errcode, rcode, "Expected RCODE %s, got %s" % \
35                             (rcode, p_errcode))
36
37     def assert_dns_opcode_equals(self, packet, opcode):
38         "Helper function to check opcode"
39         p_opcode = packet.operation & 0x7800
40         self.assertEquals(p_opcode, opcode, "Expected OPCODE %s, got %s" % \
41                             (opcode, p_opcode))
42
43     def make_name_packet(self, opcode, qid=None):
44         "Helper creating a dns.name_packet"
45         p = dns.name_packet()
46         if qid is None:
47             p.id = random.randint(0x0, 0xffff)
48         p.operation = opcode
49         p.questions = []
50         return p
51
52     def finish_name_packet(self, packet, questions):
53         "Helper to finalize a dns.name_packet"
54         packet.qdcount = len(questions)
55         packet.questions = questions
56
57     def make_name_question(self, name, qtype, qclass):
58         "Helper creating a dns.name_question"
59         q = dns.name_question()
60         q.name = name
61         q.question_type = qtype
62         q.question_class = qclass
63         return q
64
65     def get_dns_domain(self):
66         "Helper to get dns domain"
67         return os.getenv('REALM', 'example.com').lower()
68
69     def dns_transaction_udp(self, packet, host=os.getenv('DC_SERVER_IP')):
70         "send a DNS query and read the reply"
71         s = None
72         try:
73             send_packet = ndr.ndr_pack(packet)
74             s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
75             s.connect((host, 53))
76             s.send(send_packet, 0)
77             recv_packet = s.recv(2048, 0)
78             return ndr.ndr_unpack(dns.name_packet, recv_packet)
79         finally:
80             if s is not None:
81                 s.close()
82
83     def test_one_a_query(self):
84         "create a query packet containing one query record"
85         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
86         questions = []
87
88         name = "%s.%s" % (os.getenv('DC_SERVER'), self.get_dns_domain())
89         q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
90         print "asking for ", q.name
91         questions.append(q)
92
93         self.finish_name_packet(p, questions)
94         response = self.dns_transaction_udp(p)
95         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
96         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
97         self.assertEquals(response.ancount, 1)
98         self.assertEquals(response.answers[0].rdata,
99                           os.getenv('DC_SERVER_IP'))
100
101     def test_two_queries(self):
102         "create a query packet containing two query records"
103         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
104         questions = []
105
106         name = "%s.%s" % (os.getenv('DC_SERVER'), self.get_dns_domain())
107         q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
108         questions.append(q)
109
110         name = "%s.%s" % ('bogusname', self.get_dns_domain())
111         q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
112         questions.append(q)
113
114         self.finish_name_packet(p, questions)
115         response = self.dns_transaction_udp(p)
116         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
117
118
119 if __name__ == "__main__":
120     import unittest
121     unittest.main()