1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Kai Blin <kai@samba.org> 2011
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
23 import samba.ndr as ndr
24 from samba import credentials, param
25 from samba.dcerpc import dns, dnsp, dnsserver
26 from samba.netcmd.dns import TXTRecord, dns_record_match, data_to_dns_record
27 from samba.tests.subunitrun import SubunitOptions, TestProgram
28 from samba import werror, WERRORError
29 from samba.tests.dns_base import DNSTest
30 import samba.getopt as options
33 parser = optparse.OptionParser("dns.py <server name> <server ip> [options]")
34 sambaopts = options.SambaOptions(parser)
35 parser.add_option_group(sambaopts)
37 # This timeout only has relevance when testing against Windows.
38 # Format errors tend to return patchy responses, so a timeout is needed.
39 parser.add_option("--timeout", type="int", dest="timeout",
40 help="Specify timeout for DNS requests")
42 # use command line creds if available
43 credopts = options.CredentialsOptions(parser)
44 parser.add_option_group(credopts)
45 subunitopts = SubunitOptions(parser)
46 parser.add_option_group(subunitopts)
48 opts, args = parser.parse_args()
50 lp = sambaopts.get_loadparm()
51 creds = credopts.get_credentials(lp)
53 timeout = opts.timeout
61 creds.set_krb_forwardable(credentials.NO_KRB_FORWARDABLE)
63 class TestSimpleQueries(DNSTest):
65 super(TestSimpleQueries, self).setUp()
66 global server, server_ip, lp, creds, timeout
67 self.server = server_name
68 self.server_ip = server_ip
71 self.timeout = timeout
73 def test_one_a_query(self):
74 "create a query packet containing one query record"
75 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
78 name = "%s.%s" % (self.server, self.get_dns_domain())
79 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
80 print "asking for ", q.name
83 self.finish_name_packet(p, questions)
84 (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
85 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
86 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
87 self.assertEquals(response.ancount, 1)
88 self.assertEquals(response.answers[0].rdata,
91 def test_one_SOA_query(self):
92 "create a query packet containing one query record for the SOA"
93 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
96 name = "%s" % (self.get_dns_domain())
97 q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
98 print "asking for ", q.name
101 self.finish_name_packet(p, questions)
102 (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
103 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
104 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
105 self.assertEquals(response.ancount, 1)
106 self.assertEquals(response.answers[0].rdata.mname.upper(),
107 ("%s.%s" % (self.server, self.get_dns_domain())).upper())
109 def test_one_a_query_tcp(self):
110 "create a query packet containing one query record via TCP"
111 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
114 name = "%s.%s" % (self.server, self.get_dns_domain())
115 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
116 print "asking for ", q.name
119 self.finish_name_packet(p, questions)
120 (response, response_packet) = self.dns_transaction_tcp(p, host=server_ip)
121 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
122 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
123 self.assertEquals(response.ancount, 1)
124 self.assertEquals(response.answers[0].rdata,
127 def test_one_mx_query(self):
128 "create a query packet causing an empty RCODE_OK answer"
129 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
132 name = "%s.%s" % (self.server, self.get_dns_domain())
133 q = self.make_name_question(name, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN)
134 print "asking for ", q.name
137 self.finish_name_packet(p, questions)
138 (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
139 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
140 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
141 self.assertEquals(response.ancount, 0)
143 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
146 name = "invalid-%s.%s" % (self.server, self.get_dns_domain())
147 q = self.make_name_question(name, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN)
148 print "asking for ", q.name
151 self.finish_name_packet(p, questions)
152 (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
153 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
154 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
155 self.assertEquals(response.ancount, 0)
157 def test_two_queries(self):
158 "create a query packet containing two query records"
159 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
162 name = "%s.%s" % (self.server, self.get_dns_domain())
163 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
166 name = "%s.%s" % ('bogusname', self.get_dns_domain())
167 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
170 self.finish_name_packet(p, questions)
172 (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
173 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
174 except socket.timeout:
175 # Windows chooses not to respond to incorrectly formatted queries.
176 # Although this appears to be non-deterministic even for the same
177 # request twice, it also appears to be based on how poorly the
178 # request is formatted.
181 def test_qtype_all_query(self):
182 "create a QTYPE_ALL query"
183 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
186 name = "%s.%s" % (self.server, self.get_dns_domain())
187 q = self.make_name_question(name, dns.DNS_QTYPE_ALL, dns.DNS_QCLASS_IN)
188 print "asking for ", q.name
191 self.finish_name_packet(p, questions)
192 (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
195 dc_ipv6 = os.getenv('SERVER_IPV6')
196 if dc_ipv6 is not None:
199 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
200 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
201 self.assertEquals(response.ancount, num_answers)
202 self.assertEquals(response.answers[0].rdata,
204 if dc_ipv6 is not None:
205 self.assertEquals(response.answers[1].rdata, dc_ipv6)
207 def test_qclass_none_query(self):
208 "create a QCLASS_NONE query"
209 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
212 name = "%s.%s" % (self.server, self.get_dns_domain())
213 q = self.make_name_question(name, dns.DNS_QTYPE_ALL, dns.DNS_QCLASS_NONE)
216 self.finish_name_packet(p, questions)
218 (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
219 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTIMP)
220 except socket.timeout:
221 # Windows chooses not to respond to incorrectly formatted queries.
222 # Although this appears to be non-deterministic even for the same
223 # request twice, it also appears to be based on how poorly the
224 # request is formatted.
227 def test_soa_hostname_query(self):
228 "create a SOA query for a hostname"
229 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
232 name = "%s.%s" % (self.server, self.get_dns_domain())
233 q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
236 self.finish_name_packet(p, questions)
237 (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
238 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
239 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
240 # We don't get SOA records for single hosts
241 self.assertEquals(response.ancount, 0)
242 # But we do respond with an authority section
243 self.assertEqual(response.nscount, 1)
245 def test_soa_domain_query(self):
246 "create a SOA query for a domain"
247 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
250 name = self.get_dns_domain()
251 q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
254 self.finish_name_packet(p, questions)
255 (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
256 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
257 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
258 self.assertEquals(response.ancount, 1)
259 self.assertEquals(response.answers[0].rdata.minimum, 3600)
262 class TestDNSUpdates(DNSTest):
264 super(TestDNSUpdates, self).setUp()
265 global server, server_ip, lp, creds, timeout
266 self.server = server_name
267 self.server_ip = server_ip
270 self.timeout = timeout
272 def test_two_updates(self):
273 "create two update requests"
274 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
277 name = "%s.%s" % (self.server, self.get_dns_domain())
278 u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
281 name = self.get_dns_domain()
282 u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
285 self.finish_name_packet(p, updates)
287 (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
288 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
289 except socket.timeout:
290 # Windows chooses not to respond to incorrectly formatted queries.
291 # Although this appears to be non-deterministic even for the same
292 # request twice, it also appears to be based on how poorly the
293 # request is formatted.
296 def test_update_wrong_qclass(self):
297 "create update with DNS_QCLASS_NONE"
298 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
301 name = self.get_dns_domain()
302 u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_NONE)
305 self.finish_name_packet(p, updates)
306 (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
307 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTIMP)
309 def test_update_prereq_with_non_null_ttl(self):
310 "test update with a non-null TTL"
311 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
314 name = self.get_dns_domain()
316 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
318 self.finish_name_packet(p, updates)
322 r.name = "%s.%s" % (self.server, self.get_dns_domain())
323 r.rr_type = dns.DNS_QTYPE_TXT
324 r.rr_class = dns.DNS_QCLASS_NONE
329 p.ancount = len(prereqs)
333 (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
334 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
335 except socket.timeout:
336 # Windows chooses not to respond to incorrectly formatted queries.
337 # Although this appears to be non-deterministic even for the same
338 # request twice, it also appears to be based on how poorly the
339 # request is formatted.
342 def test_update_prereq_with_non_null_length(self):
343 "test update with a non-null length"
344 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
347 name = self.get_dns_domain()
349 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
351 self.finish_name_packet(p, updates)
355 r.name = "%s.%s" % (self.server, self.get_dns_domain())
356 r.rr_type = dns.DNS_QTYPE_TXT
357 r.rr_class = dns.DNS_QCLASS_ANY
362 p.ancount = len(prereqs)
365 (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
366 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXRRSET)
368 def test_update_prereq_nonexisting_name(self):
369 "test update with a nonexisting name"
370 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
373 name = self.get_dns_domain()
375 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
377 self.finish_name_packet(p, updates)
381 r.name = "idontexist.%s" % self.get_dns_domain()
382 r.rr_type = dns.DNS_QTYPE_TXT
383 r.rr_class = dns.DNS_QCLASS_ANY
388 p.ancount = len(prereqs)
391 (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
392 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXRRSET)
def test_update_add_txt_record(self):
    """Send a TXT record update over UDP and verify it was accepted.

    The update packet is built by make_txt_update(); after the server
    answers with DNS_RCODE_OK the record is re-checked through
    check_query_txt() using the same prefix and strings.
    """
    record_prefix = 'textrec'
    record_strings = ['"This is a test"']

    update_packet = self.make_txt_update(record_prefix, record_strings)
    # Only the decoded reply is inspected; the raw packet is not needed.
    reply, _raw_reply = self.dns_transaction_udp(update_packet,
                                                 host=server_ip)
    self.assert_dns_rcode_equals(reply, dns.DNS_RCODE_OK)

    # Presumably this queries the record back and compares the strings;
    # the helper is defined outside this file.
    self.check_query_txt(record_prefix, record_strings)
402 def test_delete_record(self):
403 "Test if deleting records works"
405 NAME = "deleterec.%s" % self.get_dns_domain()
407 # First, create a record to make sure we have a record to delete.
408 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
411 name = self.get_dns_domain()
413 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
415 self.finish_name_packet(p, updates)
420 r.rr_type = dns.DNS_QTYPE_TXT
421 r.rr_class = dns.DNS_QCLASS_IN
424 rdata = self.make_txt_record(['"This is a test"'])
427 p.nscount = len(updates)
430 (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
431 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
433 # Now check the record is around
434 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
436 q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
439 self.finish_name_packet(p, questions)
440 (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
441 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
443 # Now delete the record
444 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
447 name = self.get_dns_domain()
449 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
451 self.finish_name_packet(p, updates)
456 r.rr_type = dns.DNS_QTYPE_TXT
457 r.rr_class = dns.DNS_QCLASS_NONE
460 rdata = self.make_txt_record(['"This is a test"'])
463 p.nscount = len(updates)
466 (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
467 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
469 # And finally check it's gone
470 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
473 q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
476 self.finish_name_packet(p, questions)
477 (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
478 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
480 def test_readd_record(self):
481 "Test if adding, deleting and then readding a records works"
483 NAME = "readdrec.%s" % self.get_dns_domain()
486 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
489 name = self.get_dns_domain()
491 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
493 self.finish_name_packet(p, updates)
498 r.rr_type = dns.DNS_QTYPE_TXT
499 r.rr_class = dns.DNS_QCLASS_IN
502 rdata = self.make_txt_record(['"This is a test"'])
505 p.nscount = len(updates)
508 (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
509 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
511 # Now check the record is around
512 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
514 q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
517 self.finish_name_packet(p, questions)
518 (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
519 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
521 # Now delete the record
522 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
525 name = self.get_dns_domain()
527 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
529 self.finish_name_packet(p, updates)
534 r.rr_type = dns.DNS_QTYPE_TXT
535 r.rr_class = dns.DNS_QCLASS_NONE
538 rdata = self.make_txt_record(['"This is a test"'])
541 p.nscount = len(updates)
544 (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
545 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
548 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
551 q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
554 self.finish_name_packet(p, questions)
555 (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
556 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
558 # recreate the record
559 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
562 name = self.get_dns_domain()
564 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
566 self.finish_name_packet(p, updates)
571 r.rr_type = dns.DNS_QTYPE_TXT
572 r.rr_class = dns.DNS_QCLASS_IN
575 rdata = self.make_txt_record(['"This is a test"'])
578 p.nscount = len(updates)
581 (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
582 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
584 # Now check the record is around
585 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
587 q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
590 self.finish_name_packet(p, questions)
591 (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
592 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
594 def test_update_add_mx_record(self):
595 "test adding MX records works"
596 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
599 name = self.get_dns_domain()
601 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
603 self.finish_name_packet(p, updates)
607 r.name = "%s" % self.get_dns_domain()
608 r.rr_type = dns.DNS_QTYPE_MX
609 r.rr_class = dns.DNS_QCLASS_IN
612 rdata = dns.mx_record()
613 rdata.preference = 10
614 rdata.exchange = 'mail.%s' % self.get_dns_domain()
617 p.nscount = len(updates)
620 (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
621 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
623 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
626 name = "%s" % self.get_dns_domain()
627 q = self.make_name_question(name, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN)
630 self.finish_name_packet(p, questions)
631 (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
632 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
633 self.assertEqual(response.ancount, 1)
634 ans = response.answers[0]
635 self.assertEqual(ans.rr_type, dns.DNS_QTYPE_MX)
636 self.assertEqual(ans.rdata.preference, 10)
637 self.assertEqual(ans.rdata.exchange, 'mail.%s' % self.get_dns_domain())
640 class TestComplexQueries(DNSTest):
641 def make_dns_update(self, key, value, qtype):
642 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
644 name = self.get_dns_domain()
645 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
646 self.finish_name_packet(p, [u])
651 r.rr_class = dns.DNS_QCLASS_IN
659 (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
660 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
663 super(TestComplexQueries, self).setUp()
665 global server, server_ip, lp, creds, timeout
666 self.server = server_name
667 self.server_ip = server_ip
670 self.timeout = timeout
672 def test_one_a_query(self):
673 "create a query packet containing one query record"
678 name = "cname_test.%s" % self.get_dns_domain()
679 rdata = "%s.%s" % (self.server, self.get_dns_domain())
680 self.make_dns_update(name, rdata, dns.DNS_QTYPE_CNAME)
682 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
686 name = "cname_test.%s" % self.get_dns_domain()
687 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
688 print "asking for ", q.name
691 self.finish_name_packet(p, questions)
692 (response, response_packet) = self.dns_transaction_udp(p, host=self.server_ip)
693 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
694 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
695 self.assertEquals(response.ancount, 2)
696 self.assertEquals(response.answers[0].rr_type, dns.DNS_QTYPE_CNAME)
697 self.assertEquals(response.answers[0].rdata, "%s.%s" %
698 (self.server, self.get_dns_domain()))
699 self.assertEquals(response.answers[1].rr_type, dns.DNS_QTYPE_A)
700 self.assertEquals(response.answers[1].rdata,
705 p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
708 name = self.get_dns_domain()
710 u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
712 self.finish_name_packet(p, updates)
716 r.name = "cname_test.%s" % self.get_dns_domain()
717 r.rr_type = dns.DNS_QTYPE_CNAME
718 r.rr_class = dns.DNS_QCLASS_NONE
721 r.rdata = "%s.%s" % (self.server, self.get_dns_domain())
723 p.nscount = len(updates)
726 (response, response_packet) = self.dns_transaction_udp(p, host=self.server_ip)
727 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
729 def test_cname_two_chain(self):
730 name0 = "cnamechain0.%s" % self.get_dns_domain()
731 name1 = "cnamechain1.%s" % self.get_dns_domain()
732 name2 = "cnamechain2.%s" % self.get_dns_domain()
733 self.make_dns_update(name1, name2, dns.DNS_QTYPE_CNAME)
734 self.make_dns_update(name2, name0, dns.DNS_QTYPE_CNAME)
735 self.make_dns_update(name0, server_ip, dns.DNS_QTYPE_A)
737 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
739 q = self.make_name_question(name1, dns.DNS_QTYPE_A,
743 self.finish_name_packet(p, questions)
744 (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
745 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
746 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
747 self.assertEquals(response.ancount, 3)
749 self.assertEquals(response.answers[0].rr_type, dns.DNS_QTYPE_CNAME)
750 self.assertEquals(response.answers[0].name, name1)
751 self.assertEquals(response.answers[0].rdata, name2)
753 self.assertEquals(response.answers[1].rr_type, dns.DNS_QTYPE_CNAME)
754 self.assertEquals(response.answers[1].name, name2)
755 self.assertEquals(response.answers[1].rdata, name0)
757 self.assertEquals(response.answers[2].rr_type, dns.DNS_QTYPE_A)
758 self.assertEquals(response.answers[2].rdata,
761 def test_invalid_empty_cname(self):
762 name0 = "cnamedotprefix0.%s" % self.get_dns_domain()
764 self.make_dns_update(name0, "", dns.DNS_QTYPE_CNAME)
765 except AssertionError as e:
768 self.fail("Successfully added empty CNAME, which is invalid.")
770 def test_cname_two_chain_not_matching_qtype(self):
771 name0 = "cnamechain0.%s" % self.get_dns_domain()
772 name1 = "cnamechain1.%s" % self.get_dns_domain()
773 name2 = "cnamechain2.%s" % self.get_dns_domain()
774 self.make_dns_update(name1, name2, dns.DNS_QTYPE_CNAME)
775 self.make_dns_update(name2, name0, dns.DNS_QTYPE_CNAME)
776 self.make_dns_update(name0, server_ip, dns.DNS_QTYPE_A)
778 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
780 q = self.make_name_question(name1, dns.DNS_QTYPE_TXT,
784 self.finish_name_packet(p, questions)
785 (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
786 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
787 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
789 # CNAME should return all intermediate results!
790 # Only the A record exists, not the TXT.
791 self.assertEquals(response.ancount, 2)
793 self.assertEquals(response.answers[0].rr_type, dns.DNS_QTYPE_CNAME)
794 self.assertEquals(response.answers[0].name, name1)
795 self.assertEquals(response.answers[0].rdata, name2)
797 self.assertEquals(response.answers[1].rr_type, dns.DNS_QTYPE_CNAME)
798 self.assertEquals(response.answers[1].name, name2)
799 self.assertEquals(response.answers[1].rdata, name0)
801 class TestInvalidQueries(DNSTest):
803 super(TestInvalidQueries, self).setUp()
804 global server, server_ip, lp, creds, timeout
805 self.server = server_name
806 self.server_ip = server_ip
809 self.timeout = timeout
811 def test_one_a_query(self):
812 "send 0 bytes follows by create a query packet containing one query record"
816 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
817 s.connect((self.server_ip, 53))
823 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
826 name = "%s.%s" % (self.server, self.get_dns_domain())
827 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
828 print "asking for ", q.name
831 self.finish_name_packet(p, questions)
832 (response, response_packet) = self.dns_transaction_udp(p, host=self.server_ip)
833 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
834 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
835 self.assertEquals(response.ancount, 1)
836 self.assertEquals(response.answers[0].rdata,
839 def test_one_a_reply(self):
840 "send a reply instead of a query"
843 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
846 name = "%s.%s" % ('fakefakefake', self.get_dns_domain())
847 q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
848 print "asking for ", q.name
851 self.finish_name_packet(p, questions)
852 p.operation |= dns.DNS_FLAG_REPLY
855 send_packet = ndr.ndr_pack(p)
856 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
857 s.settimeout(timeout)
859 s.connect((host, 53))
860 tcp_packet = struct.pack('!H', len(send_packet))
861 tcp_packet += send_packet
862 s.send(tcp_packet, 0)
863 recv_packet = s.recv(0xffff + 2, 0)
864 self.assertEquals(0, len(recv_packet))
865 except socket.timeout:
866 # Windows chooses not to respond to incorrectly formatted queries.
867 # Although this appears to be non-deterministic even for the same
868 # request twice, it also appears to be based on how poorly the
869 # request is formatted.
875 class TestZones(DNSTest):
877 super(TestZones, self).setUp()
878 global server, server_ip, lp, creds, timeout
879 self.server = server_name
880 self.server_ip = server_ip
883 self.timeout = timeout
885 self.zone = "test.lan"
886 self.rpc_conn = dnsserver.dnsserver("ncacn_ip_tcp:%s[sign]" % (self.server_ip),
890 super(TestZones, self).tearDown()
892 self.delete_zone(self.zone)
893 except RuntimeError as e:
894 (num, string) = e.args
895 if num != werror.WERR_DNS_ERROR_ZONE_DOES_NOT_EXIST:
898 def create_zone(self, zone):
899 zone_create = dnsserver.DNS_RPC_ZONE_CREATE_INFO_LONGHORN()
900 zone_create.pszZoneName = zone
901 zone_create.dwZoneType = dnsp.DNS_ZONE_TYPE_PRIMARY
902 zone_create.fAllowUpdate = dnsp.DNS_ZONE_UPDATE_SECURE
903 zone_create.fAging = 0
904 zone_create.dwDpFlags = dnsserver.DNS_DP_DOMAIN_DEFAULT
906 self.rpc_conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
912 dnsserver.DNSSRV_TYPEID_ZONE_CREATE,
914 except WERRORError as e:
917 def delete_zone(self, zone):
918 self.rpc_conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
924 dnsserver.DNSSRV_TYPEID_NULL,
927 def test_soa_query(self):
929 p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
932 q = self.make_name_question(zone, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
934 self.finish_name_packet(p, questions)
936 (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
937 # Windows returns OK while BIND logically seems to return NXDOMAIN
938 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
939 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
940 self.assertEquals(response.ancount, 0)
942 self.create_zone(zone)
943 (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
944 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
945 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
946 self.assertEquals(response.ancount, 1)
947 self.assertEquals(response.answers[0].rr_type, dns.DNS_QTYPE_SOA)
949 self.delete_zone(zone)
950 (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
951 self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
952 self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
953 self.assertEquals(response.ancount, 0)
955 class TestRPCRoundtrip(DNSTest):
957 super(TestRPCRoundtrip, self).setUp()
958 global server, server_ip, lp, creds
959 self.server = server_name
960 self.server_ip = server_ip
963 self.rpc_conn = dnsserver.dnsserver("ncacn_ip_tcp:%s[sign]" % (self.server_ip),
967 super(TestRPCRoundtrip, self).tearDown()
969 def test_update_add_txt_rpc_to_dns(self):
970 prefix, txt = 'rpctextrec', ['"This is a test"']
972 name = "%s.%s" % (prefix, self.get_dns_domain())
974 rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"\\"This is a test\\""')
975 add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
976 add_rec_buf.rec = rec
978 self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
979 0, self.server_ip, self.get_dns_domain(),
980 name, add_rec_buf, None)
981 except WERRORError as e:
985 self.check_query_txt(prefix, txt)
987 self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
988 0, self.server_ip, self.get_dns_domain(),
989 name, None, add_rec_buf)
def test_update_add_null_padded_txt_record(self):
    """Add TXT records that contain empty strings and check both views.

    For every case the record is added with a DNS update over UDP,
    checked with check_query_txt(), and then looked up through the RPC
    connection with dns_record_match(), which must not return None.
    """
    # Each entry: (record prefix, TXT strings sent in the update,
    #              textual form passed to dns_record_match)
    padded_cases = (
        ('pad1textrec',
         ['"This is a test"', '', ''],
         '"\\"This is a test\\"" "" ""'),
        ('pad2textrec',
         ['"This is a test"', '', '', 'more text'],
         '"\\"This is a test\\"" "" "" "more text"'),
        ('pad3textrec',
         ['', '', '"This is a test"'],
         '"" "" "\\"This is a test\\""'),
    )

    for prefix, txt, rpc_text in padded_cases:
        update_packet = self.make_txt_update(prefix, txt)
        reply, _raw_reply = self.dns_transaction_udp(update_packet,
                                                     host=server_ip)
        self.assert_dns_rcode_equals(reply, dns.DNS_RCODE_OK)
        self.check_query_txt(prefix, txt)

        # Cross-check over RPC that the record just added is present.
        matched = dns_record_match(self.rpc_conn, self.server_ip,
                                   self.get_dns_domain(),
                                   "%s.%s" % (prefix, self.get_dns_domain()),
                                   dnsp.DNS_TYPE_TXT, rpc_text)
        self.assertIsNotNone(matched)
def test_update_add_padding_rpc_to_dns(self):
    """Add TXT records padded with empty strings over RPC and check each one.

    Three records are tried in turn, with the empty strings placed after
    the text, between two texts, and before the text.  Each record is
    added with DnssrvUpdateRecord2, verified with check_query_txt and
    then deleted again, even when the verification fails.
    """
    # (name prefix, strings handed to check_query_txt, text form for RPC)
    cases = (
        ('pad1textrec',
         ['"This is a test"', '', ''],
         '"\\"This is a test\\"" "" ""'),
        ('pad2textrec',
         ['"This is a test"', '', '', 'more text'],
         '"\\"This is a test\\"" "" "" "more text"'),
        ('pad3textrec',
         ['', '', '"This is a test"'],
         '"" "" "\\"This is a test\\""'),
    )

    for base, txt, rpc_text in cases:
        prefix = 'rpc' + base
        name = "%s.%s" % (prefix, self.get_dns_domain())

        rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, rpc_text)
        add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
        add_rec_buf.rec = rec

        try:
            # Buffer in the add slot, nothing in the delete slot.
            self.rpc_conn.DnssrvUpdateRecord2(
                dnsserver.DNS_CLIENT_VERSION_LONGHORN,
                0, self.server_ip, self.get_dns_domain(),
                name, add_rec_buf, None)
        except WERRORError as e:
            # Report an RPC-level error as a test failure.
            self.fail(str(e))

        try:
            self.check_query_txt(prefix, txt)
        finally:
            # Same call with the buffer moved to the delete slot, so the
            # record is removed whether or not the check passed.
            self.rpc_conn.DnssrvUpdateRecord2(
                dnsserver.DNS_CLIENT_VERSION_LONGHORN,
                0, self.server_ip, self.get_dns_domain(),
                name, None, add_rec_buf)
# This test is incomplete: TXT strings are handled with strlen, so each
# string is cut off at its first NUL byte.
def test_update_add_null_char_txt_record(self):
    """Send TXT updates whose strings contain a NUL byte and check the result.

    Two updates are sent, one with a single string and one with two.
    For each, the reply must carry DNS_RCODE_OK, check_query_txt is
    given only the part of every string before the NUL byte, and
    dns_record_match (called with the RPC connection) must return a
    result for the correspondingly shortened quoted form.
    """
    # (name prefix, strings sent, strings expected back, quoted RPC form)
    cases = (
        ('nulltextrec',
         ['NULL\x00BYTE'],
         ['NULL'],
         '"NULL"'),
        ('nulltextrec2',
         ['NULL\x00BYTE', 'NULL\x00BYTE'],
         ['NULL', 'NULL'],
         '"NULL" "NULL"'),
    )

    for prefix, sent, expected, quoted in cases:
        update = self.make_txt_update(prefix, sent)
        reply, _ = self.dns_transaction_udp(update, host=server_ip)
        self.assert_dns_rcode_equals(reply, dns.DNS_RCODE_OK)
        self.check_query_txt(prefix, expected)

        stored = dns_record_match(
            self.rpc_conn, self.server_ip, self.get_dns_domain(),
            "%s.%s" % (prefix, self.get_dns_domain()),
            dnsp.DNS_TYPE_TXT, quoted)
        self.assertIsNotNone(stored)
def test_update_add_null_char_rpc_to_dns(self):
    """Add the TXT record "NULL" over RPC and check it with check_query_txt.

    The text handed to the RPC call is the plain string "NULL"; no NUL
    byte is sent through RPC by this test.  The record is deleted again
    afterwards, even when the check fails.
    """
    prefix = 'rpc' + 'nulltextrec'
    name = "%s.%s" % (prefix, self.get_dns_domain())

    rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"NULL"')
    add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
    add_rec_buf.rec = rec

    try:
        # Buffer in the add slot, nothing in the delete slot.
        self.rpc_conn.DnssrvUpdateRecord2(
            dnsserver.DNS_CLIENT_VERSION_LONGHORN,
            0, self.server_ip, self.get_dns_domain(),
            name, add_rec_buf, None)
    except WERRORError as e:
        # Report an RPC-level error as a test failure.
        self.fail(str(e))

    try:
        self.check_query_txt(prefix, ['NULL'])
    finally:
        # Same call with the buffer moved to the delete slot, so the
        # record is removed whether or not the check passed.
        self.rpc_conn.DnssrvUpdateRecord2(
            dnsserver.DNS_CLIENT_VERSION_LONGHORN,
            0, self.server_ip, self.get_dns_domain(),
            name, None, add_rec_buf)
def test_update_add_hex_char_txt_record(self):
    """Add a TXT record containing the character 0xFF via a DNS update.

    The reply must carry DNS_RCODE_OK, check_query_txt is given the same
    string that was sent, and dns_record_match (called with the RPC
    connection) must return a result for the quoted form of that string.
    """
    label = 'hextextrec'
    strings = ['HIGH\xFFBYTE']

    update = self.make_txt_update(label, strings)
    reply, _ = self.dns_transaction_udp(update, host=server_ip)
    self.assert_dns_rcode_equals(reply, dns.DNS_RCODE_OK)
    self.check_query_txt(label, strings)

    stored = dns_record_match(
        self.rpc_conn, self.server_ip, self.get_dns_domain(),
        "%s.%s" % (label, self.get_dns_domain()),
        dnsp.DNS_TYPE_TXT, '"HIGH\xFFBYTE"')
    self.assertIsNotNone(stored)
def test_update_add_hex_rpc_to_dns(self):
    """Add a TXT record containing the character 0xFF over RPC and check it.

    The record is added with DnssrvUpdateRecord2, verified with
    check_query_txt and then deleted again, even when the check fails.
    """
    txt = ['HIGH\xFFBYTE']
    prefix = 'rpc' + 'hextextrec'
    name = "%s.%s" % (prefix, self.get_dns_domain())

    rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"HIGH\xFFBYTE"')
    add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
    add_rec_buf.rec = rec

    try:
        # Buffer in the add slot, nothing in the delete slot.
        self.rpc_conn.DnssrvUpdateRecord2(
            dnsserver.DNS_CLIENT_VERSION_LONGHORN,
            0, self.server_ip, self.get_dns_domain(),
            name, add_rec_buf, None)
    except WERRORError as e:
        # Report an RPC-level error as a test failure.
        self.fail(str(e))

    try:
        self.check_query_txt(prefix, txt)
    finally:
        # Same call with the buffer moved to the delete slot, so the
        # record is removed whether or not the check passed.
        self.rpc_conn.DnssrvUpdateRecord2(
            dnsserver.DNS_CLIENT_VERSION_LONGHORN,
            0, self.server_ip, self.get_dns_domain(),
            name, None, add_rec_buf)
def test_update_add_slash_txt_record(self):
    """Add a TXT record whose text contains a backslash via a DNS update.

    The reply must carry DNS_RCODE_OK, check_query_txt is given the same
    string that was sent, and dns_record_match (called with the RPC
    connection) must return a result for the quoted form, in which the
    backslash is doubled.
    """
    label = 'slashtextrec'
    strings = ['Th\\=is=is a test']

    update = self.make_txt_update(label, strings)
    reply, _ = self.dns_transaction_udp(update, host=server_ip)
    self.assert_dns_rcode_equals(reply, dns.DNS_RCODE_OK)
    self.check_query_txt(label, strings)

    stored = dns_record_match(
        self.rpc_conn, self.server_ip, self.get_dns_domain(),
        "%s.%s" % (label, self.get_dns_domain()),
        dnsp.DNS_TYPE_TXT, '"Th\\\\=is=is a test"')
    self.assertIsNotNone(stored)
# This test fails against Windows, which strips backslashes from TXT data
# sent over RPC. A typical use of a backslash is to escape '=' characters
# in records of the form 'var=value'.
def test_update_add_slash_rpc_to_dns(self):
    """Add a TXT record whose text contains a backslash over RPC and check it.

    The text handed to the RPC call has the backslash doubled, while
    check_query_txt is given the text with a single backslash.  The
    record is deleted again afterwards, even when the check fails.
    """
    txt = ['Th\\=is=is a test']
    prefix = 'rpc' + 'slashtextrec'
    name = "%s.%s" % (prefix, self.get_dns_domain())

    rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"Th\\\\=is=is a test"')
    add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
    add_rec_buf.rec = rec

    try:
        # Buffer in the add slot, nothing in the delete slot.
        self.rpc_conn.DnssrvUpdateRecord2(
            dnsserver.DNS_CLIENT_VERSION_LONGHORN,
            0, self.server_ip, self.get_dns_domain(),
            name, add_rec_buf, None)
    except WERRORError as e:
        # Report an RPC-level error as a test failure.
        self.fail(str(e))

    try:
        self.check_query_txt(prefix, txt)
    finally:
        # Same call with the buffer moved to the delete slot, so the
        # record is removed whether or not the check passed.
        self.rpc_conn.DnssrvUpdateRecord2(
            dnsserver.DNS_CLIENT_VERSION_LONGHORN,
            0, self.server_ip, self.get_dns_domain(),
            name, None, add_rec_buf)
def test_update_add_two_txt_records(self):
    """Send a TXT update carrying two quoted strings and check the result.

    The reply must carry DNS_RCODE_OK, check_query_txt is given the same
    two strings, and dns_record_match (called with the RPC connection)
    must return a result for the two strings in escaped, quoted form.
    """
    label = 'textrec2'
    strings = ['"This is a test"', '"and this is a test, too"']

    update = self.make_txt_update(label, strings)
    reply, _ = self.dns_transaction_udp(update, host=server_ip)
    self.assert_dns_rcode_equals(reply, dns.DNS_RCODE_OK)
    self.check_query_txt(label, strings)

    quoted = ('"\\"This is a test\\""'
              ' "\\"and this is a test, too\\""')
    stored = dns_record_match(
        self.rpc_conn, self.server_ip, self.get_dns_domain(),
        "%s.%s" % (label, self.get_dns_domain()),
        dnsp.DNS_TYPE_TXT, quoted)
    self.assertIsNotNone(stored)
def test_update_add_two_rpc_to_dns(self):
    """Add a TXT record with two quoted strings over RPC and check it.

    The record is added with DnssrvUpdateRecord2, verified with
    check_query_txt and then deleted again, even when the check fails.
    """
    txt = ['"This is a test"', '"and this is a test, too"']
    prefix = 'rpc' + 'textrec2'
    name = "%s.%s" % (prefix, self.get_dns_domain())

    rec = data_to_dns_record(dnsp.DNS_TYPE_TXT,
                             '"\\"This is a test\\""' +
                             ' "\\"and this is a test, too\\""')
    add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
    add_rec_buf.rec = rec

    try:
        # Buffer in the add slot, nothing in the delete slot.
        self.rpc_conn.DnssrvUpdateRecord2(
            dnsserver.DNS_CLIENT_VERSION_LONGHORN,
            0, self.server_ip, self.get_dns_domain(),
            name, add_rec_buf, None)
    except WERRORError as e:
        # Report an RPC-level error as a test failure.
        self.fail(str(e))

    try:
        self.check_query_txt(prefix, txt)
    finally:
        # Same call with the buffer moved to the delete slot, so the
        # record is removed whether or not the check passed.
        self.rpc_conn.DnssrvUpdateRecord2(
            dnsserver.DNS_CLIENT_VERSION_LONGHORN,
            0, self.server_ip, self.get_dns_domain(),
            name, None, add_rec_buf)
def test_update_add_empty_txt_records(self):
    """Send a TXT update built from an empty string list and check the result.

    The reply must carry DNS_RCODE_OK, check_query_txt is given the same
    empty list, and dns_record_match (called with the RPC connection)
    must return a result when asked for the empty string.
    """
    label = 'emptytextrec'
    strings = []

    update = self.make_txt_update(label, strings)
    reply, _ = self.dns_transaction_udp(update, host=server_ip)
    self.assert_dns_rcode_equals(reply, dns.DNS_RCODE_OK)
    self.check_query_txt(label, strings)

    stored = dns_record_match(
        self.rpc_conn, self.server_ip, self.get_dns_domain(),
        "%s.%s" % (label, self.get_dns_domain()),
        dnsp.DNS_TYPE_TXT, '')
    self.assertIsNotNone(stored)
def test_update_add_empty_rpc_to_dns(self):
    """Add a TXT record built from the empty string over RPC and check it.

    The record is added with DnssrvUpdateRecord2, verified with
    check_query_txt against an empty string list and then deleted
    again, even when the check fails.
    """
    prefix, txt = 'rpcemptytextrec', []
    name = "%s.%s" % (prefix, self.get_dns_domain())

    rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '')
    add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
    add_rec_buf.rec = rec

    try:
        # Buffer in the add slot, nothing in the delete slot.
        self.rpc_conn.DnssrvUpdateRecord2(
            dnsserver.DNS_CLIENT_VERSION_LONGHORN,
            0, self.server_ip, self.get_dns_domain(),
            name, add_rec_buf, None)
    except WERRORError as e:
        # Report an RPC-level error as a test failure.
        self.fail(str(e))

    try:
        self.check_query_txt(prefix, txt)
    finally:
        # Same call with the buffer moved to the delete slot, so the
        # record is removed whether or not the check passed.
        self.rpc_conn.DnssrvUpdateRecord2(
            dnsserver.DNS_CLIENT_VERSION_LONGHORN,
            0, self.server_ip, self.get_dns_domain(),
            name, None, add_rec_buf)
# Hand this module to TestProgram, together with the subunit options
# object created from the command-line parser at the top of the file.
TestProgram(
    module=__name__,
    opts=subunitopts,
)