1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Kai Blin <kai@samba.org> 2011
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
21 from samba import socket
22 import samba.ndr as ndr
23 from samba import credentials, param
24 from samba.tests import TestCase
25 from samba.dcerpc import dns, dnsp, dnsserver
27 FILTER=''.join([(len(repr(chr(x)))==3) and chr(x) or '.' for x in range(256)])
29 def make_txt_record(records):
30 rdata_txt = dns.txt_record()
31 s_list = dnsp.string_list()
32 s_list.count = len(records)
34 rdata_txt.txt = s_list
37 class DNSTest(TestCase):
39 def errstr(self, errcode):
40 "Return a readable error code"
55 return string_codes[errcode]
58 def assert_dns_rcode_equals(self, packet, rcode):
59 "Helper function to check return code"
60 p_errcode = packet.operation & 0x000F
61 self.assertEquals(p_errcode, rcode, "Expected RCODE %s, got %s" %
62 (self.errstr(rcode), self.errstr(p_errcode)))
64 def assert_dns_opcode_equals(self, packet, opcode):
65 "Helper function to check opcode"
66 p_opcode = packet.operation & 0x7800
67 self.assertEquals(p_opcode, opcode, "Expected OPCODE %s, got %s" %
70 def make_name_packet(self, opcode, qid=None):
71 "Helper creating a dns.name_packet"
74 p.id = random.randint(0x0, 0xffff)
79 def finish_name_packet(self, packet, questions):
80 "Helper to finalize a dns.name_packet"
81 packet.qdcount = len(questions)
82 packet.questions = questions
84 def make_name_question(self, name, qtype, qclass):
85 "Helper creating a dns.name_question"
86 q = dns.name_question()
88 q.question_type = qtype
89 q.question_class = qclass
92 def get_dns_domain(self):
93 "Helper to get dns domain"
94 return os.getenv('REALM', 'example.com').lower()
96 def dns_transaction_udp(self, packet, host=os.getenv('SERVER_IP'), dump=False):
97 "send a DNS query and read the reply"
100 send_packet = ndr.ndr_pack(packet)
102 print self.hexdump(send_packet)
103 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
104 s.connect((host, 53))
105 s.send(send_packet, 0)
106 recv_packet = s.recv(2048, 0)
108 print self.hexdump(recv_packet)
109 return ndr.ndr_unpack(dns.name_packet, recv_packet)
114 def dns_transaction_tcp(self, packet, host=os.getenv('SERVER_IP'), dump=False):
115 "send a DNS query and read the reply"
118 send_packet = ndr.ndr_pack(packet)
120 print self.hexdump(send_packet)
121 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
122 s.connect((host, 53))
123 tcp_packet = struct.pack('!H', len(send_packet))
124 tcp_packet += send_packet
125 s.send(tcp_packet, 0)
126 recv_packet = s.recv(0xffff + 2, 0)
128 print self.hexdump(recv_packet)
129 return ndr.ndr_unpack(dns.name_packet, recv_packet[2:])
134 def hexdump(self, src, length=8):
137 s,src = src[:length],src[length:]
138 hexa = ' '.join(["%02X"%ord(x) for x in s])
139 s = s.translate(FILTER)
140 result += "%04X %-*s %s\n" % (N, length*3, hexa, s)
144 class TestSimpleQueries(DNSTest):
146 def test_one_a_query(self):
147 "create a query packet containing one query record"
148 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
151 name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
152 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
153 print "asking for ", q.name
156 self.finish_name_packet(p, questions)
157 response = self.dns_transaction_udp(p)
158 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
159 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
160 self.assertEquals(response.ancount, 1)
161 self.assertEquals(response.answers[0].rdata,
162 os.getenv('SERVER_IP'))
164 def test_one_a_query_tcp(self):
165 "create a query packet containing one query record via TCP"
166 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
169 name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
170 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
171 print "asking for ", q.name
174 self.finish_name_packet(p, questions)
175 response = self.dns_transaction_tcp(p)
176 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
177 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
178 self.assertEquals(response.ancount, 1)
179 self.assertEquals(response.answers[0].rdata,
180 os.getenv('SERVER_IP'))
182 def test_one_mx_query(self):
183 "create a query packet causing an empty RCODE_OK answer"
184 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
187 name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
188 q = self.make_name_question(name, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN)
189 print "asking for ", q.name
192 self.finish_name_packet(p, questions)
193 response = self.dns_transaction_udp(p)
194 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
195 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
196 self.assertEquals(response.ancount, 0)
198 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
201 name = "invalid-%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
202 q = self.make_name_question(name, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN)
203 print "asking for ", q.name
206 self.finish_name_packet(p, questions)
207 response = self.dns_transaction_udp(p)
208 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
209 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
210 self.assertEquals(response.ancount, 0)
212 def test_two_queries(self):
213 "create a query packet containing two query records"
214 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
217 name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
218 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
221 name = "%s.%s" % ('bogusname', self.get_dns_domain())
222 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
225 self.finish_name_packet(p, questions)
226 response = self.dns_transaction_udp(p)
227 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
229 def test_qtype_all_query(self):
230 "create a QTYPE_ALL query"
231 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
234 name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
235 q = self.make_name_question(name, dns.DNS_QTYPE_ALL, dns.DNS_QCLASS_IN)
236 print "asking for ", q.name
239 self.finish_name_packet(p, questions)
240 response = self.dns_transaction_udp(p)
243 dc_ipv6 = os.getenv('SERVER_IPV6')
244 if dc_ipv6 is not None:
247 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
248 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
249 self.assertEquals(response.ancount, num_answers)
250 self.assertEquals(response.answers[0].rdata,
251 os.getenv('SERVER_IP'))
252 if dc_ipv6 is not None:
253 self.assertEquals(response.answers[1].rdata, dc_ipv6)
255 def test_qclass_none_query(self):
256 "create a QCLASS_NONE query"
257 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
260 name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
261 q = self.make_name_question(name, dns.DNS_QTYPE_ALL, dns.DNS_QCLASS_NONE)
264 self.finish_name_packet(p, questions)
265 response = self.dns_transaction_udp(p)
266 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTIMP)
268 # Only returns an authority section entry in BIND and Win DNS
269 # FIXME: Enable one Samba implements this feature
270 # def test_soa_hostname_query(self):
271 # "create a SOA query for a hostname"
272 # p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
275 # name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
276 # q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
277 # questions.append(q)
279 # self.finish_name_packet(p, questions)
280 # response = self.dns_transaction_udp(p)
281 # self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
282 # self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
283 # # We don't get SOA records for single hosts
284 # self.assertEquals(response.ancount, 0)
286 def test_soa_domain_query(self):
287 "create a SOA query for a domain"
288 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
291 name = self.get_dns_domain()
292 q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
295 self.finish_name_packet(p, questions)
296 response = self.dns_transaction_udp(p)
297 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
298 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
299 self.assertEquals(response.ancount, 1)
300 self.assertEquals(response.answers[0].rdata.minimum, 3600)
303 class TestDNSUpdates(DNSTest):
305 def test_two_updates(self):
306 "create two update requests"
307 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
310 name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
311 u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
314 name = self.get_dns_domain()
315 u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
318 self.finish_name_packet(p, updates)
319 response = self.dns_transaction_udp(p)
320 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
322 def test_update_wrong_qclass(self):
323 "create update with DNS_QCLASS_NONE"
324 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
327 name = self.get_dns_domain()
328 u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_NONE)
331 self.finish_name_packet(p, updates)
332 response = self.dns_transaction_udp(p)
333 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTIMP)
335 def test_update_prereq_with_non_null_ttl(self):
336 "test update with a non-null TTL"
337 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
340 name = self.get_dns_domain()
342 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
344 self.finish_name_packet(p, updates)
348 r.name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
349 r.rr_type = dns.DNS_QTYPE_TXT
350 r.rr_class = dns.DNS_QCLASS_NONE
355 p.ancount = len(prereqs)
358 response = self.dns_transaction_udp(p)
359 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
361 # I'd love to test this one, but it segfaults. :)
362 # def test_update_prereq_with_non_null_length(self):
363 # "test update with a non-null length"
364 # p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
367 # name = self.get_dns_domain()
369 # u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
371 # self.finish_name_packet(p, updates)
375 # r.name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
376 # r.rr_type = dns.DNS_QTYPE_TXT
377 # r.rr_class = dns.DNS_QCLASS_ANY
382 # p.ancount = len(prereqs)
383 # p.answers = prereqs
385 # response = self.dns_transaction_udp(p)
386 # self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
388 def test_update_prereq_nonexisting_name(self):
389 "test update with a nonexisting name"
390 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
393 name = self.get_dns_domain()
395 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
397 self.finish_name_packet(p, updates)
401 r.name = "idontexist.%s" % self.get_dns_domain()
402 r.rr_type = dns.DNS_QTYPE_TXT
403 r.rr_class = dns.DNS_QCLASS_ANY
408 p.ancount = len(prereqs)
411 response = self.dns_transaction_udp(p)
412 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXRRSET)
414 def test_update_add_txt_record(self):
415 "test adding records works"
416 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
419 name = self.get_dns_domain()
421 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
423 self.finish_name_packet(p, updates)
427 r.name = "textrec.%s" % self.get_dns_domain()
428 r.rr_type = dns.DNS_QTYPE_TXT
429 r.rr_class = dns.DNS_QCLASS_IN
432 rdata = make_txt_record(['"This is a test"'])
435 p.nscount = len(updates)
438 response = self.dns_transaction_udp(p)
439 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
441 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
444 name = "textrec.%s" % self.get_dns_domain()
445 q = self.make_name_question(name, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
448 self.finish_name_packet(p, questions)
449 response = self.dns_transaction_udp(p)
450 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
451 self.assertEquals(response.ancount, 1)
452 self.assertEquals(response.answers[0].rdata.txt.str[0], '"This is a test"')
454 def test_update_add_two_txt_records(self):
455 "test adding two txt records works"
456 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
459 name = self.get_dns_domain()
461 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
463 self.finish_name_packet(p, updates)
467 r.name = "textrec2.%s" % self.get_dns_domain()
468 r.rr_type = dns.DNS_QTYPE_TXT
469 r.rr_class = dns.DNS_QCLASS_IN
472 rdata = make_txt_record(['"This is a test"',
473 '"and this is a test, too"'])
476 p.nscount = len(updates)
479 response = self.dns_transaction_udp(p)
480 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
482 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
485 name = "textrec2.%s" % self.get_dns_domain()
486 q = self.make_name_question(name, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
489 self.finish_name_packet(p, questions)
490 response = self.dns_transaction_udp(p)
491 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
492 self.assertEquals(response.ancount, 1)
493 self.assertEquals(response.answers[0].rdata.txt.str[0], '"This is a test"')
494 self.assertEquals(response.answers[0].rdata.txt.str[1], '"and this is a test, too"')
496 def test_delete_record(self):
497 "Test if deleting records works"
499 NAME = "deleterec.%s" % self.get_dns_domain()
501 # First, create a record to make sure we have a record to delete.
502 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
505 name = self.get_dns_domain()
507 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
509 self.finish_name_packet(p, updates)
514 r.rr_type = dns.DNS_QTYPE_TXT
515 r.rr_class = dns.DNS_QCLASS_IN
518 rdata = make_txt_record(['"This is a test"'])
521 p.nscount = len(updates)
524 response = self.dns_transaction_udp(p)
525 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
527 # Now check the record is around
528 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
530 q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
533 self.finish_name_packet(p, questions)
534 response = self.dns_transaction_udp(p)
535 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
537 # Now delete the record
538 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
541 name = self.get_dns_domain()
543 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
545 self.finish_name_packet(p, updates)
550 r.rr_type = dns.DNS_QTYPE_TXT
551 r.rr_class = dns.DNS_QCLASS_NONE
554 rdata = make_txt_record(['"This is a test"'])
557 p.nscount = len(updates)
560 response = self.dns_transaction_udp(p)
561 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
563 # And finally check it's gone
564 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
567 q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
570 self.finish_name_packet(p, questions)
571 response = self.dns_transaction_udp(p)
572 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
574 def test_readd_record(self):
575 "Test if adding, deleting and then readding a records works"
577 NAME = "readdrec.%s" % self.get_dns_domain()
580 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
583 name = self.get_dns_domain()
585 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
587 self.finish_name_packet(p, updates)
592 r.rr_type = dns.DNS_QTYPE_TXT
593 r.rr_class = dns.DNS_QCLASS_IN
596 rdata = make_txt_record(['"This is a test"'])
599 p.nscount = len(updates)
602 response = self.dns_transaction_udp(p)
603 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
605 # Now check the record is around
606 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
608 q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
611 self.finish_name_packet(p, questions)
612 response = self.dns_transaction_udp(p)
613 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
615 # Now delete the record
616 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
619 name = self.get_dns_domain()
621 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
623 self.finish_name_packet(p, updates)
628 r.rr_type = dns.DNS_QTYPE_TXT
629 r.rr_class = dns.DNS_QCLASS_NONE
632 rdata = make_txt_record(['"This is a test"'])
635 p.nscount = len(updates)
638 response = self.dns_transaction_udp(p)
639 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
642 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
645 q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
648 self.finish_name_packet(p, questions)
649 response = self.dns_transaction_udp(p)
650 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
652 # recreate the record
653 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
656 name = self.get_dns_domain()
658 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
660 self.finish_name_packet(p, updates)
665 r.rr_type = dns.DNS_QTYPE_TXT
666 r.rr_class = dns.DNS_QCLASS_IN
669 rdata = make_txt_record(['"This is a test"'])
672 p.nscount = len(updates)
675 response = self.dns_transaction_udp(p)
676 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
678 # Now check the record is around
679 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
681 q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
684 self.finish_name_packet(p, questions)
685 response = self.dns_transaction_udp(p)
686 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
688 def test_update_add_mx_record(self):
689 "test adding MX records works"
690 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
693 name = self.get_dns_domain()
695 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
697 self.finish_name_packet(p, updates)
701 r.name = "%s" % self.get_dns_domain()
702 r.rr_type = dns.DNS_QTYPE_MX
703 r.rr_class = dns.DNS_QCLASS_IN
706 rdata = dns.mx_record()
707 rdata.preference = 10
708 rdata.exchange = 'mail.%s' % self.get_dns_domain()
711 p.nscount = len(updates)
714 response = self.dns_transaction_udp(p)
715 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
717 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
720 name = "%s" % self.get_dns_domain()
721 q = self.make_name_question(name, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN)
724 self.finish_name_packet(p, questions)
725 response = self.dns_transaction_udp(p)
726 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
727 self.assertEqual(response.ancount, 1)
728 ans = response.answers[0]
729 self.assertEqual(ans.rr_type, dns.DNS_QTYPE_MX)
730 self.assertEqual(ans.rdata.preference, 10)
731 self.assertEqual(ans.rdata.exchange, 'mail.%s' % self.get_dns_domain())
734 class TestComplexQueries(DNSTest):
737 super(TestComplexQueries, self).setUp()
738 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
741 name = self.get_dns_domain()
743 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
745 self.finish_name_packet(p, updates)
749 r.name = "cname_test.%s" % self.get_dns_domain()
750 r.rr_type = dns.DNS_QTYPE_CNAME
751 r.rr_class = dns.DNS_QCLASS_IN
754 r.rdata = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
756 p.nscount = len(updates)
759 response = self.dns_transaction_udp(p)
760 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
763 super(TestComplexQueries, self).tearDown()
764 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
767 name = self.get_dns_domain()
769 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
771 self.finish_name_packet(p, updates)
775 r.name = "cname_test.%s" % self.get_dns_domain()
776 r.rr_type = dns.DNS_QTYPE_CNAME
777 r.rr_class = dns.DNS_QCLASS_NONE
780 r.rdata = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
782 p.nscount = len(updates)
785 response = self.dns_transaction_udp(p)
786 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
788 def test_one_a_query(self):
789 "create a query packet containing one query record"
790 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
793 name = "cname_test.%s" % self.get_dns_domain()
794 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
795 print "asking for ", q.name
798 self.finish_name_packet(p, questions)
799 response = self.dns_transaction_udp(p)
800 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
801 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
802 self.assertEquals(response.ancount, 2)
803 self.assertEquals(response.answers[0].rr_type, dns.DNS_QTYPE_CNAME)
804 self.assertEquals(response.answers[0].rdata, "%s.%s" %
805 (os.getenv('SERVER'), self.get_dns_domain()))
806 self.assertEquals(response.answers[1].rr_type, dns.DNS_QTYPE_A)
807 self.assertEquals(response.answers[1].rdata,
808 os.getenv('SERVER_IP'))
810 class TestInvalidQueries(DNSTest):
812 def test_one_a_query(self):
813 "send 0 bytes follows by create a query packet containing one query record"
817 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
818 s.connect((os.getenv('SERVER_IP'), 53))
824 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
827 name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
828 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
829 print "asking for ", q.name
832 self.finish_name_packet(p, questions)
833 response = self.dns_transaction_udp(p)
834 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
835 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
836 self.assertEquals(response.ancount, 1)
837 self.assertEquals(response.answers[0].rdata,
838 os.getenv('SERVER_IP'))
840 def test_one_a_reply(self):
841 "send a reply instead of a query"
843 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
846 name = "%s.%s" % ('fakefakefake', self.get_dns_domain())
847 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
848 print "asking for ", q.name
851 self.finish_name_packet(p, questions)
852 p.operation |= dns.DNS_FLAG_REPLY
855 send_packet = ndr.ndr_pack(p)
856 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
857 host=os.getenv('SERVER_IP')
858 s.connect((host, 53))
859 tcp_packet = struct.pack('!H', len(send_packet))
860 tcp_packet += send_packet
861 s.send(tcp_packet, 0)
862 recv_packet = s.recv(0xffff + 2, 0)
863 self.assertEquals(0, len(recv_packet))
869 if __name__ == "__main__":