1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Kai Blin <kai@samba.org> 2011
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
23 sys.path.insert(0, "bin/python")
25 samba.ensure_external_module("testtools", "testtools")
26 samba.ensure_external_module("subunit", "subunit/python")
27 from subunit.run import SubunitTestRunner
30 from samba import socket
31 import samba.ndr as ndr
32 from samba import credentials, param
33 from samba.tests import TestCase
34 from samba.dcerpc import dns, dnsp, dnsserver
35 from samba.netcmd.dns import TXTRecord, dns_record_match, data_to_dns_record
36 import samba.getopt as options
39 parser = optparse.OptionParser("dns.py <server name> <server ip> [options]")
40 sambaopts = options.SambaOptions(parser)
41 parser.add_option_group(sambaopts)
43 FILTER=''.join([(len(repr(chr(x)))==3) and chr(x) or '.' for x in range(256)])
45 # use command line creds if available
46 credopts = options.CredentialsOptions(parser)
47 parser.add_option_group(credopts)
49 opts, args = parser.parse_args()
51 lp = sambaopts.get_loadparm()
52 creds = credopts.get_credentials(lp)
60 creds.set_krb_forwardable(credentials.NO_KRB_FORWARDABLE)
62 def make_txt_record(records):
63 rdata_txt = dns.txt_record()
64 s_list = dnsp.string_list()
65 s_list.count = len(records)
67 rdata_txt.txt = s_list
70 class DNSTest(TestCase):
73 global server, server_ip, lp, creds
74 super(DNSTest, self).setUp()
75 self.server = server_name
76 self.server_ip = server_ip
80 def errstr(self, errcode):
81 "Return a readable error code"
96 return string_codes[errcode]
99 def assert_dns_rcode_equals(self, packet, rcode):
100 "Helper function to check return code"
101 p_errcode = packet.operation & 0x000F
102 self.assertEquals(p_errcode, rcode, "Expected RCODE %s, got %s" %
103 (self.errstr(rcode), self.errstr(p_errcode)))
105 def assert_dns_opcode_equals(self, packet, opcode):
106 "Helper function to check opcode"
107 p_opcode = packet.operation & 0x7800
108 self.assertEquals(p_opcode, opcode, "Expected OPCODE %s, got %s" %
111 def make_name_packet(self, opcode, qid=None):
112 "Helper creating a dns.name_packet"
113 p = dns.name_packet()
115 p.id = random.randint(0x0, 0xffff)
120 def finish_name_packet(self, packet, questions):
121 "Helper to finalize a dns.name_packet"
122 packet.qdcount = len(questions)
123 packet.questions = questions
125 def make_name_question(self, name, qtype, qclass):
126 "Helper creating a dns.name_question"
127 q = dns.name_question()
129 q.question_type = qtype
130 q.question_class = qclass
133 def get_dns_domain(self):
134 "Helper to get dns domain"
135 return self.creds.get_realm().lower()
137 def dns_transaction_udp(self, packet, host=server_ip,
139 "send a DNS query and read the reply"
142 send_packet = ndr.ndr_pack(packet)
144 print self.hexdump(send_packet)
145 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
146 s.connect((host, 53))
147 s.send(send_packet, 0)
148 recv_packet = s.recv(2048, 0)
150 print self.hexdump(recv_packet)
151 return ndr.ndr_unpack(dns.name_packet, recv_packet)
156 def dns_transaction_tcp(self, packet, host=server_ip,
158 "send a DNS query and read the reply"
161 send_packet = ndr.ndr_pack(packet)
163 print self.hexdump(send_packet)
164 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
165 s.connect((host, 53))
166 tcp_packet = struct.pack('!H', len(send_packet))
167 tcp_packet += send_packet
168 s.send(tcp_packet, 0)
169 recv_packet = s.recv(0xffff + 2, 0)
171 print self.hexdump(recv_packet)
172 return ndr.ndr_unpack(dns.name_packet, recv_packet[2:])
177 def hexdump(self, src, length=8):
180 s,src = src[:length],src[length:]
181 hexa = ' '.join(["%02X"%ord(x) for x in s])
182 s = s.translate(FILTER)
183 result += "%04X %-*s %s\n" % (N, length*3, hexa, s)
187 def make_txt_update(self, prefix, txt_array):
188 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
191 name = self.get_dns_domain()
192 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
194 self.finish_name_packet(p, updates)
198 r.name = "%s.%s" % (prefix, self.get_dns_domain())
199 r.rr_type = dns.DNS_QTYPE_TXT
200 r.rr_class = dns.DNS_QCLASS_IN
203 rdata = make_txt_record(txt_array)
206 p.nscount = len(updates)
211 def check_query_txt(self, prefix, txt_array):
212 name = "%s.%s" % (prefix, self.get_dns_domain())
213 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
216 q = self.make_name_question(name, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
219 self.finish_name_packet(p, questions)
220 response = self.dns_transaction_udp(p)
221 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
222 self.assertEquals(response.ancount, 1)
223 self.assertEquals(response.answers[0].rdata.txt.str, txt_array)
225 def assertIsNotNone(self, item):
226 self.assertTrue(item is not None)
228 class TestSimpleQueries(DNSTest):
230 def test_one_a_query(self):
231 "create a query packet containing one query record"
232 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
235 name = "%s.%s" % (self.server, self.get_dns_domain())
236 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
237 print "asking for ", q.name
240 self.finish_name_packet(p, questions)
241 response = self.dns_transaction_udp(p)
242 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
243 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
244 self.assertEquals(response.ancount, 1)
245 self.assertEquals(response.answers[0].rdata,
248 def test_one_a_query_tcp(self):
249 "create a query packet containing one query record via TCP"
250 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
253 name = "%s.%s" % (self.server, self.get_dns_domain())
254 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
255 print "asking for ", q.name
258 self.finish_name_packet(p, questions)
259 response = self.dns_transaction_tcp(p)
260 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
261 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
262 self.assertEquals(response.ancount, 1)
263 self.assertEquals(response.answers[0].rdata,
266 def test_one_mx_query(self):
267 "create a query packet causing an empty RCODE_OK answer"
268 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
271 name = "%s.%s" % (self.server, self.get_dns_domain())
272 q = self.make_name_question(name, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN)
273 print "asking for ", q.name
276 self.finish_name_packet(p, questions)
277 response = self.dns_transaction_udp(p)
278 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
279 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
280 self.assertEquals(response.ancount, 0)
282 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
285 name = "invalid-%s.%s" % (self.server, self.get_dns_domain())
286 q = self.make_name_question(name, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN)
287 print "asking for ", q.name
290 self.finish_name_packet(p, questions)
291 response = self.dns_transaction_udp(p)
292 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
293 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
294 self.assertEquals(response.ancount, 0)
296 def test_two_queries(self):
297 "create a query packet containing two query records"
298 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
301 name = "%s.%s" % (self.server, self.get_dns_domain())
302 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
305 name = "%s.%s" % ('bogusname', self.get_dns_domain())
306 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
309 self.finish_name_packet(p, questions)
310 response = self.dns_transaction_udp(p)
311 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
313 def test_qtype_all_query(self):
314 "create a QTYPE_ALL query"
315 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
318 name = "%s.%s" % (self.server, self.get_dns_domain())
319 q = self.make_name_question(name, dns.DNS_QTYPE_ALL, dns.DNS_QCLASS_IN)
320 print "asking for ", q.name
323 self.finish_name_packet(p, questions)
324 response = self.dns_transaction_udp(p)
327 dc_ipv6 = os.getenv('SERVER_IPV6')
328 if dc_ipv6 is not None:
331 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
332 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
333 self.assertEquals(response.ancount, num_answers)
334 self.assertEquals(response.answers[0].rdata,
336 if dc_ipv6 is not None:
337 self.assertEquals(response.answers[1].rdata, dc_ipv6)
339 def test_qclass_none_query(self):
340 "create a QCLASS_NONE query"
341 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
344 name = "%s.%s" % (self.server, self.get_dns_domain())
345 q = self.make_name_question(name, dns.DNS_QTYPE_ALL, dns.DNS_QCLASS_NONE)
348 self.finish_name_packet(p, questions)
349 response = self.dns_transaction_udp(p)
350 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTIMP)
352 # Only returns an authority section entry in BIND and Win DNS
353 # FIXME: Enable one Samba implements this feature
354 # def test_soa_hostname_query(self):
355 # "create a SOA query for a hostname"
356 # p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
359 # name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
360 # q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
361 # questions.append(q)
363 # self.finish_name_packet(p, questions)
364 # response = self.dns_transaction_udp(p)
365 # self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
366 # self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
367 # # We don't get SOA records for single hosts
368 # self.assertEquals(response.ancount, 0)
370 def test_soa_domain_query(self):
371 "create a SOA query for a domain"
372 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
375 name = self.get_dns_domain()
376 q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
379 self.finish_name_packet(p, questions)
380 response = self.dns_transaction_udp(p)
381 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
382 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
383 self.assertEquals(response.ancount, 1)
384 self.assertEquals(response.answers[0].rdata.minimum, 3600)
387 class TestDNSUpdates(DNSTest):
389 def test_two_updates(self):
390 "create two update requests"
391 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
394 name = "%s.%s" % (self.server, self.get_dns_domain())
395 u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
398 name = self.get_dns_domain()
399 u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
402 self.finish_name_packet(p, updates)
403 response = self.dns_transaction_udp(p)
404 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
406 def test_update_wrong_qclass(self):
407 "create update with DNS_QCLASS_NONE"
408 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
411 name = self.get_dns_domain()
412 u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_NONE)
415 self.finish_name_packet(p, updates)
416 response = self.dns_transaction_udp(p)
417 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTIMP)
419 def test_update_prereq_with_non_null_ttl(self):
420 "test update with a non-null TTL"
421 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
424 name = self.get_dns_domain()
426 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
428 self.finish_name_packet(p, updates)
432 r.name = "%s.%s" % (self.server, self.get_dns_domain())
433 r.rr_type = dns.DNS_QTYPE_TXT
434 r.rr_class = dns.DNS_QCLASS_NONE
439 p.ancount = len(prereqs)
442 response = self.dns_transaction_udp(p)
443 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
445 def test_update_prereq_with_non_null_length(self):
446 "test update with a non-null length"
447 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
450 name = self.get_dns_domain()
452 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
454 self.finish_name_packet(p, updates)
458 r.name = "%s.%s" % (self.server, self.get_dns_domain())
459 r.rr_type = dns.DNS_QTYPE_TXT
460 r.rr_class = dns.DNS_QCLASS_ANY
465 p.ancount = len(prereqs)
468 response = self.dns_transaction_udp(p)
469 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXRRSET)
471 def test_update_prereq_nonexisting_name(self):
472 "test update with a nonexisting name"
473 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
476 name = self.get_dns_domain()
478 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
480 self.finish_name_packet(p, updates)
484 r.name = "idontexist.%s" % self.get_dns_domain()
485 r.rr_type = dns.DNS_QTYPE_TXT
486 r.rr_class = dns.DNS_QCLASS_ANY
491 p.ancount = len(prereqs)
494 response = self.dns_transaction_udp(p)
495 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXRRSET)
497 def test_update_add_txt_record(self):
498 "test adding records works"
499 prefix, txt = 'textrec', ['"This is a test"']
500 p = self.make_txt_update(prefix, txt)
501 response = self.dns_transaction_udp(p)
502 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
503 self.check_query_txt(prefix, txt)
505 def test_delete_record(self):
506 "Test if deleting records works"
508 NAME = "deleterec.%s" % self.get_dns_domain()
510 # First, create a record to make sure we have a record to delete.
511 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
514 name = self.get_dns_domain()
516 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
518 self.finish_name_packet(p, updates)
523 r.rr_type = dns.DNS_QTYPE_TXT
524 r.rr_class = dns.DNS_QCLASS_IN
527 rdata = make_txt_record(['"This is a test"'])
530 p.nscount = len(updates)
533 response = self.dns_transaction_udp(p)
534 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
536 # Now check the record is around
537 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
539 q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
542 self.finish_name_packet(p, questions)
543 response = self.dns_transaction_udp(p)
544 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
546 # Now delete the record
547 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
550 name = self.get_dns_domain()
552 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
554 self.finish_name_packet(p, updates)
559 r.rr_type = dns.DNS_QTYPE_TXT
560 r.rr_class = dns.DNS_QCLASS_NONE
563 rdata = make_txt_record(['"This is a test"'])
566 p.nscount = len(updates)
569 response = self.dns_transaction_udp(p)
570 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
572 # And finally check it's gone
573 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
576 q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
579 self.finish_name_packet(p, questions)
580 response = self.dns_transaction_udp(p)
581 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
583 def test_readd_record(self):
584 "Test if adding, deleting and then readding a records works"
586 NAME = "readdrec.%s" % self.get_dns_domain()
589 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
592 name = self.get_dns_domain()
594 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
596 self.finish_name_packet(p, updates)
601 r.rr_type = dns.DNS_QTYPE_TXT
602 r.rr_class = dns.DNS_QCLASS_IN
605 rdata = make_txt_record(['"This is a test"'])
608 p.nscount = len(updates)
611 response = self.dns_transaction_udp(p)
612 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
614 # Now check the record is around
615 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
617 q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
620 self.finish_name_packet(p, questions)
621 response = self.dns_transaction_udp(p)
622 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
624 # Now delete the record
625 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
628 name = self.get_dns_domain()
630 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
632 self.finish_name_packet(p, updates)
637 r.rr_type = dns.DNS_QTYPE_TXT
638 r.rr_class = dns.DNS_QCLASS_NONE
641 rdata = make_txt_record(['"This is a test"'])
644 p.nscount = len(updates)
647 response = self.dns_transaction_udp(p)
648 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
651 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
654 q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
657 self.finish_name_packet(p, questions)
658 response = self.dns_transaction_udp(p)
659 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
661 # recreate the record
662 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
665 name = self.get_dns_domain()
667 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
669 self.finish_name_packet(p, updates)
674 r.rr_type = dns.DNS_QTYPE_TXT
675 r.rr_class = dns.DNS_QCLASS_IN
678 rdata = make_txt_record(['"This is a test"'])
681 p.nscount = len(updates)
684 response = self.dns_transaction_udp(p)
685 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
687 # Now check the record is around
688 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
690 q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
693 self.finish_name_packet(p, questions)
694 response = self.dns_transaction_udp(p)
695 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
697 def test_update_add_mx_record(self):
698 "test adding MX records works"
699 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
702 name = self.get_dns_domain()
704 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
706 self.finish_name_packet(p, updates)
710 r.name = "%s" % self.get_dns_domain()
711 r.rr_type = dns.DNS_QTYPE_MX
712 r.rr_class = dns.DNS_QCLASS_IN
715 rdata = dns.mx_record()
716 rdata.preference = 10
717 rdata.exchange = 'mail.%s' % self.get_dns_domain()
720 p.nscount = len(updates)
723 response = self.dns_transaction_udp(p)
724 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
726 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
729 name = "%s" % self.get_dns_domain()
730 q = self.make_name_question(name, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN)
733 self.finish_name_packet(p, questions)
734 response = self.dns_transaction_udp(p)
735 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
736 self.assertEqual(response.ancount, 1)
737 ans = response.answers[0]
738 self.assertEqual(ans.rr_type, dns.DNS_QTYPE_MX)
739 self.assertEqual(ans.rdata.preference, 10)
740 self.assertEqual(ans.rdata.exchange, 'mail.%s' % self.get_dns_domain())
743 class TestComplexQueries(DNSTest):
746 super(TestComplexQueries, self).setUp()
747 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
750 name = self.get_dns_domain()
752 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
754 self.finish_name_packet(p, updates)
758 r.name = "cname_test.%s" % self.get_dns_domain()
759 r.rr_type = dns.DNS_QTYPE_CNAME
760 r.rr_class = dns.DNS_QCLASS_IN
763 r.rdata = "%s.%s" % (self.server, self.get_dns_domain())
765 p.nscount = len(updates)
768 response = self.dns_transaction_udp(p)
769 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
772 super(TestComplexQueries, self).tearDown()
773 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
776 name = self.get_dns_domain()
778 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
780 self.finish_name_packet(p, updates)
784 r.name = "cname_test.%s" % self.get_dns_domain()
785 r.rr_type = dns.DNS_QTYPE_CNAME
786 r.rr_class = dns.DNS_QCLASS_NONE
789 r.rdata = "%s.%s" % (self.server, self.get_dns_domain())
791 p.nscount = len(updates)
794 response = self.dns_transaction_udp(p)
795 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
797 def test_one_a_query(self):
798 "create a query packet containing one query record"
799 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
802 name = "cname_test.%s" % self.get_dns_domain()
803 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
804 print "asking for ", q.name
807 self.finish_name_packet(p, questions)
808 response = self.dns_transaction_udp(p)
809 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
810 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
811 self.assertEquals(response.ancount, 2)
812 self.assertEquals(response.answers[0].rr_type, dns.DNS_QTYPE_CNAME)
813 self.assertEquals(response.answers[0].rdata, "%s.%s" %
814 (self.server, self.get_dns_domain()))
815 self.assertEquals(response.answers[1].rr_type, dns.DNS_QTYPE_A)
816 self.assertEquals(response.answers[1].rdata,
819 class TestInvalidQueries(DNSTest):
821 def test_one_a_query(self):
822 "send 0 bytes follows by create a query packet containing one query record"
826 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
827 s.connect((self.server_ip, 53))
833 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
836 name = "%s.%s" % (self.server, self.get_dns_domain())
837 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
838 print "asking for ", q.name
841 self.finish_name_packet(p, questions)
842 response = self.dns_transaction_udp(p)
843 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
844 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
845 self.assertEquals(response.ancount, 1)
846 self.assertEquals(response.answers[0].rdata,
849 def test_one_a_reply(self):
850 "send a reply instead of a query"
852 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
855 name = "%s.%s" % ('fakefakefake', self.get_dns_domain())
856 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
857 print "asking for ", q.name
860 self.finish_name_packet(p, questions)
861 p.operation |= dns.DNS_FLAG_REPLY
864 send_packet = ndr.ndr_pack(p)
865 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
867 s.connect((host, 53))
868 tcp_packet = struct.pack('!H', len(send_packet))
869 tcp_packet += send_packet
870 s.send(tcp_packet, 0)
871 recv_packet = s.recv(0xffff + 2, 0)
872 self.assertEquals(0, len(recv_packet))
877 class TestRPCRoundtrip(DNSTest):
879 super(TestRPCRoundtrip, self).setUp()
880 self.rpc_conn = dnsserver.dnsserver("ncacn_ip_tcp:%s[sign]" % (self.server_ip),
884 super(TestRPCRoundtrip, self).tearDown()
886 def test_update_add_txt_rpc_to_dns(self):
887 prefix, txt = 'rpctextrec', ['"This is a test"']
889 name = "%s.%s" % (prefix, self.get_dns_domain())
891 rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"\\"This is a test\\""')
892 add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
893 add_rec_buf.rec = rec
895 self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
896 0, self.server_ip, self.get_dns_domain(),
897 name, add_rec_buf, None)
899 self.check_query_txt(prefix, txt)
901 self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
902 0, self.server_ip, self.get_dns_domain(),
903 name, None, add_rec_buf)
905 def test_update_add_null_padded_txt_record(self):
906 "test adding records works"
907 prefix, txt = 'pad1textrec', ['"This is a test"', '', '']
908 p = self.make_txt_update(prefix, txt)
909 response = self.dns_transaction_udp(p)
910 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
911 self.check_query_txt(prefix, txt)
912 self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
913 self.get_dns_domain(),
914 "%s.%s" % (prefix, self.get_dns_domain()),
915 dnsp.DNS_TYPE_TXT, '"\\"This is a test\\"" "" ""'))
917 prefix, txt = 'pad2textrec', ['"This is a test"', '', '', 'more text']
918 p = self.make_txt_update(prefix, txt)
919 response = self.dns_transaction_udp(p)
920 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
921 self.check_query_txt(prefix, txt)
922 self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
923 self.get_dns_domain(),
924 "%s.%s" % (prefix, self.get_dns_domain()),
925 dnsp.DNS_TYPE_TXT, '"\\"This is a test\\"" "" "" "more text"'))
927 prefix, txt = 'pad3textrec', ['', '', '"This is a test"']
928 p = self.make_txt_update(prefix, txt)
929 response = self.dns_transaction_udp(p)
930 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
931 self.check_query_txt(prefix, txt)
932 self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
933 self.get_dns_domain(),
934 "%s.%s" % (prefix, self.get_dns_domain()),
935 dnsp.DNS_TYPE_TXT, '"" "" "\\"This is a test\\""'))
937 def test_update_add_padding_rpc_to_dns(self):
938 prefix, txt = 'pad1textrec', ['"This is a test"', '', '']
939 prefix = 'rpc' + prefix
940 name = "%s.%s" % (prefix, self.get_dns_domain())
942 rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"\\"This is a test\\"" "" ""')
943 add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
944 add_rec_buf.rec = rec
946 self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
947 0, self.server_ip, self.get_dns_domain(),
948 name, add_rec_buf, None)
950 self.check_query_txt(prefix, txt)
952 self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
953 0, self.server_ip, self.get_dns_domain(),
954 name, None, add_rec_buf)
956 prefix, txt = 'pad2textrec', ['"This is a test"', '', '', 'more text']
957 prefix = 'rpc' + prefix
958 name = "%s.%s" % (prefix, self.get_dns_domain())
960 rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"\\"This is a test\\"" "" "" "more text"')
961 add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
962 add_rec_buf.rec = rec
964 self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
965 0, self.server_ip, self.get_dns_domain(),
966 name, add_rec_buf, None)
968 self.check_query_txt(prefix, txt)
970 self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
971 0, self.server_ip, self.get_dns_domain(),
972 name, None, add_rec_buf)
974 prefix, txt = 'pad3textrec', ['', '', '"This is a test"']
975 prefix = 'rpc' + prefix
976 name = "%s.%s" % (prefix, self.get_dns_domain())
978 rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"" "" "\\"This is a test\\""')
979 add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
980 add_rec_buf.rec = rec
982 self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
983 0, self.server_ip, self.get_dns_domain(),
984 name, add_rec_buf, None)
986 self.check_query_txt(prefix, txt)
988 self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
989 0, self.server_ip, self.get_dns_domain(),
990 name, None, add_rec_buf)
992 # Test is incomplete due to strlen against txt records
993 def test_update_add_null_char_txt_record(self):
994 "test adding records works"
995 prefix, txt = 'nulltextrec', ['NULL\x00BYTE']
996 p = self.make_txt_update(prefix, txt)
997 response = self.dns_transaction_udp(p)
998 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
999 self.check_query_txt(prefix, ['NULL'])
1000 self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
1001 self.get_dns_domain(),
1002 "%s.%s" % (prefix, self.get_dns_domain()),
1003 dnsp.DNS_TYPE_TXT, '"NULL"'))
1005 prefix, txt = 'nulltextrec2', ['NULL\x00BYTE', 'NULL\x00BYTE']
1006 p = self.make_txt_update(prefix, txt)
1007 response = self.dns_transaction_udp(p)
1008 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1009 self.check_query_txt(prefix, ['NULL', 'NULL'])
1010 self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
1011 self.get_dns_domain(),
1012 "%s.%s" % (prefix, self.get_dns_domain()),
1013 dnsp.DNS_TYPE_TXT, '"NULL" "NULL"'))
1015 def test_update_add_null_char_rpc_to_dns(self):
1016 prefix, txt = 'nulltextrec', ['NULL\x00BYTE']
1017 prefix = 'rpc' + prefix
1018 name = "%s.%s" % (prefix, self.get_dns_domain())
1020 rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"NULL"')
1021 add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1022 add_rec_buf.rec = rec
1024 self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1025 0, self.server_ip, self.get_dns_domain(),
1026 name, add_rec_buf, None)
1028 self.check_query_txt(prefix, ['NULL'])
1030 self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1031 0, self.server_ip, self.get_dns_domain(),
1032 name, None, add_rec_buf)
1034 def test_update_add_hex_char_txt_record(self):
1035 "test adding records works"
1036 prefix, txt = 'hextextrec', ['HIGH\xFFBYTE']
1037 p = self.make_txt_update(prefix, txt)
1038 response = self.dns_transaction_udp(p)
1039 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1040 self.check_query_txt(prefix, txt)
1041 self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
1042 self.get_dns_domain(),
1043 "%s.%s" % (prefix, self.get_dns_domain()),
1044 dnsp.DNS_TYPE_TXT, '"HIGH\xFFBYTE"'))
1046 def test_update_add_hex_rpc_to_dns(self):
1047 prefix, txt = 'hextextrec', ['HIGH\xFFBYTE']
1048 prefix = 'rpc' + prefix
1049 name = "%s.%s" % (prefix, self.get_dns_domain())
1051 rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"HIGH\xFFBYTE"')
1052 add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1053 add_rec_buf.rec = rec
1055 self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1056 0, self.server_ip, self.get_dns_domain(),
1057 name, add_rec_buf, None)
1059 self.check_query_txt(prefix, txt)
1061 self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1062 0, self.server_ip, self.get_dns_domain(),
1063 name, None, add_rec_buf)
1065 def test_update_add_slash_txt_record(self):
1066 "test adding records works"
1067 prefix, txt = 'slashtextrec', ['Th\\=is=is a test']
1068 p = self.make_txt_update(prefix, txt)
1069 response = self.dns_transaction_udp(p)
1070 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1071 self.check_query_txt(prefix, txt)
1072 self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
1073 self.get_dns_domain(),
1074 "%s.%s" % (prefix, self.get_dns_domain()),
1075 dnsp.DNS_TYPE_TXT, '"Th\\\\=is=is a test"'))
1077 # This test fails against Windows as it eliminates slashes in RPC
1078 # One typical use for a slash is in records like 'var=value' to
1079 # escape '=' characters.
1080 def test_update_add_slash_rpc_to_dns(self):
1081 prefix, txt = 'slashtextrec', ['Th\\=is=is a test']
1082 prefix = 'rpc' + prefix
1083 name = "%s.%s" % (prefix, self.get_dns_domain())
1085 rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"Th\\\\=is=is a test"')
1086 add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1087 add_rec_buf.rec = rec
1089 self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1090 0, self.server_ip, self.get_dns_domain(),
1091 name, add_rec_buf, None)
1093 self.check_query_txt(prefix, txt)
1095 self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1096 0, self.server_ip, self.get_dns_domain(),
1097 name, None, add_rec_buf)
1099 def test_update_add_two_txt_records(self):
1100 "test adding two txt records works"
1101 prefix, txt = 'textrec2', ['"This is a test"',
1102 '"and this is a test, too"']
1103 p = self.make_txt_update(prefix, txt)
1104 response = self.dns_transaction_udp(p)
1105 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1106 self.check_query_txt(prefix, txt)
1107 self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
1108 self.get_dns_domain(),
1109 "%s.%s" % (prefix, self.get_dns_domain()),
1110 dnsp.DNS_TYPE_TXT, '"\\"This is a test\\""' +
1111 ' "\\"and this is a test, too\\""'))
1113 def test_update_add_two_rpc_to_dns(self):
1114 prefix, txt = 'textrec2', ['"This is a test"',
1115 '"and this is a test, too"']
1116 prefix = 'rpc' + prefix
1117 name = "%s.%s" % (prefix, self.get_dns_domain())
1119 rec = data_to_dns_record(dnsp.DNS_TYPE_TXT,
1120 '"\\"This is a test\\""' +
1121 ' "\\"and this is a test, too\\""')
1122 add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1123 add_rec_buf.rec = rec
1125 self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1126 0, self.server_ip, self.get_dns_domain(),
1127 name, add_rec_buf, None)
1129 self.check_query_txt(prefix, txt)
1131 self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1132 0, self.server_ip, self.get_dns_domain(),
1133 name, None, add_rec_buf)
1135 def test_update_add_empty_txt_records(self):
1136 "test adding two txt records works"
1137 prefix, txt = 'emptytextrec', []
1138 p = self.make_txt_update(prefix, txt)
1139 response = self.dns_transaction_udp(p)
1140 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1141 self.check_query_txt(prefix, txt)
1142 self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
1143 self.get_dns_domain(),
1144 "%s.%s" % (prefix, self.get_dns_domain()),
1145 dnsp.DNS_TYPE_TXT, ''))
1147 def test_update_add_empty_rpc_to_dns(self):
1148 prefix, txt = 'rpcemptytextrec', []
1150 name = "%s.%s" % (prefix, self.get_dns_domain())
1152 rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '')
1153 add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1154 add_rec_buf.rec = rec
1156 self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1157 0, self.server_ip, self.get_dns_domain(),
1158 name, add_rec_buf, None)
1160 self.check_query_txt(prefix, txt)
1162 self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1163 0, self.server_ip, self.get_dns_domain(),
1164 name, None, add_rec_buf)
1166 runner = SubunitTestRunner()
1168 if not runner.run(unittest.makeSuite(DNSTest)).wasSuccessful():
1170 if not runner.run(unittest.makeSuite(TestSimpleQueries)).wasSuccessful():
1172 if not runner.run(unittest.makeSuite(TestDNSUpdates)).wasSuccessful():
1174 if not runner.run(unittest.makeSuite(TestComplexQueries)).wasSuccessful():
1176 if not runner.run(unittest.makeSuite(TestInvalidQueries)).wasSuccessful():
1178 if not runner.run(unittest.makeSuite(TestRPCRoundtrip)).wasSuccessful():