CVE-2016-0771: tests/dns: Remove dependencies on env variables
[samba.git] / python / samba / tests / dns.py
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Kai Blin  <kai@samba.org> 2011
3 #
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License
15 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17
18 import os
19 import sys
20 import struct
21 import random
22
23 sys.path.insert(0, "bin/python")
24 import samba
25 samba.ensure_external_module("testtools", "testtools")
26 samba.ensure_external_module("subunit", "subunit/python")
27 from subunit.run import SubunitTestRunner
28 import unittest
29
30 from samba import socket
31 import samba.ndr as ndr
32 from samba import credentials, param
33 from samba.tests import TestCase
34 from samba.dcerpc import dns, dnsp, dnsserver
35 from samba.netcmd.dns import TXTRecord, dns_record_match, data_to_dns_record
36 import samba.getopt as options
37 import optparse
38
# Translation table used by DNSTest.hexdump(): a byte is kept when its repr()
# is exactly three characters long (quote, character, quote); every other
# byte is shown as '.'.
FILTER = ''.join([chr(x) if len(repr(chr(x))) == 3 else '.'
                  for x in range(256)])

# Command line: two positional arguments plus the Samba and credentials
# option groups.
parser = optparse.OptionParser("dns.py <server name> <server ip> [options]")
sambaopts = options.SambaOptions(parser)
parser.add_option_group(sambaopts)

# use command line creds if available
credopts = options.CredentialsOptions(parser)
parser.add_option_group(credopts)

opts, args = parser.parse_args()

lp = sambaopts.get_loadparm()
creds = credopts.get_credentials(lp)

# Both positional arguments are mandatory.
if len(args) < 2:
    parser.print_usage()
    sys.exit(1)

server_name, server_ip = args[0], args[1]
creds.set_krb_forwardable(credentials.NO_KRB_FORWARDABLE)
61
def make_txt_record(records):
    "Wrap a list of strings in a dns.txt_record backed by a dnsp.string_list"
    txt_rec = dns.txt_record()
    strings = dnsp.string_list()
    strings.count = len(records)
    strings.str = records
    txt_rec.txt = strings
    return txt_rec
69
70 class DNSTest(TestCase):
71
72     def setUp(self):
73         global server, server_ip, lp, creds
74         super(DNSTest, self).setUp()
75         self.server = server_name
76         self.server_ip = server_ip
77         self.lp = lp
78         self.creds = creds
79
80     def errstr(self, errcode):
81         "Return a readable error code"
82         string_codes = [
83             "OK",
84             "FORMERR",
85             "SERVFAIL",
86             "NXDOMAIN",
87             "NOTIMP",
88             "REFUSED",
89             "YXDOMAIN",
90             "YXRRSET",
91             "NXRRSET",
92             "NOTAUTH",
93             "NOTZONE",
94         ]
95
96         return string_codes[errcode]
97
98
99     def assert_dns_rcode_equals(self, packet, rcode):
100         "Helper function to check return code"
101         p_errcode = packet.operation & 0x000F
102         self.assertEquals(p_errcode, rcode, "Expected RCODE %s, got %s" %
103                             (self.errstr(rcode), self.errstr(p_errcode)))
104
105     def assert_dns_opcode_equals(self, packet, opcode):
106         "Helper function to check opcode"
107         p_opcode = packet.operation & 0x7800
108         self.assertEquals(p_opcode, opcode, "Expected OPCODE %s, got %s" %
109                             (opcode, p_opcode))
110
111     def make_name_packet(self, opcode, qid=None):
112         "Helper creating a dns.name_packet"
113         p = dns.name_packet()
114         if qid is None:
115             p.id = random.randint(0x0, 0xffff)
116         p.operation = opcode
117         p.questions = []
118         return p
119
120     def finish_name_packet(self, packet, questions):
121         "Helper to finalize a dns.name_packet"
122         packet.qdcount = len(questions)
123         packet.questions = questions
124
125     def make_name_question(self, name, qtype, qclass):
126         "Helper creating a dns.name_question"
127         q = dns.name_question()
128         q.name = name
129         q.question_type = qtype
130         q.question_class = qclass
131         return q
132
133     def get_dns_domain(self):
134         "Helper to get dns domain"
135         return self.creds.get_realm().lower()
136
137     def dns_transaction_udp(self, packet, host=server_ip,
138                             dump=False):
139         "send a DNS query and read the reply"
140         s = None
141         try:
142             send_packet = ndr.ndr_pack(packet)
143             if dump:
144                 print self.hexdump(send_packet)
145             s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
146             s.connect((host, 53))
147             s.send(send_packet, 0)
148             recv_packet = s.recv(2048, 0)
149             if dump:
150                 print self.hexdump(recv_packet)
151             return ndr.ndr_unpack(dns.name_packet, recv_packet)
152         finally:
153             if s is not None:
154                 s.close()
155
156     def dns_transaction_tcp(self, packet, host=server_ip,
157                             dump=False):
158         "send a DNS query and read the reply"
159         s = None
160         try:
161             send_packet = ndr.ndr_pack(packet)
162             if dump:
163                 print self.hexdump(send_packet)
164             s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
165             s.connect((host, 53))
166             tcp_packet = struct.pack('!H', len(send_packet))
167             tcp_packet += send_packet
168             s.send(tcp_packet, 0)
169             recv_packet = s.recv(0xffff + 2, 0)
170             if dump:
171                 print self.hexdump(recv_packet)
172             return ndr.ndr_unpack(dns.name_packet, recv_packet[2:])
173         finally:
174                 if s is not None:
175                     s.close()
176
177     def hexdump(self, src, length=8):
178         N=0; result=''
179         while src:
180            s,src = src[:length],src[length:]
181            hexa = ' '.join(["%02X"%ord(x) for x in s])
182            s = s.translate(FILTER)
183            result += "%04X   %-*s   %s\n" % (N, length*3, hexa, s)
184            N+=length
185         return result
186
187     def make_txt_update(self, prefix, txt_array):
188         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
189         updates = []
190
191         name = self.get_dns_domain()
192         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
193         updates.append(u)
194         self.finish_name_packet(p, updates)
195
196         updates = []
197         r = dns.res_rec()
198         r.name = "%s.%s" % (prefix, self.get_dns_domain())
199         r.rr_type = dns.DNS_QTYPE_TXT
200         r.rr_class = dns.DNS_QCLASS_IN
201         r.ttl = 900
202         r.length = 0xffff
203         rdata = make_txt_record(txt_array)
204         r.rdata = rdata
205         updates.append(r)
206         p.nscount = len(updates)
207         p.nsrecs = updates
208
209         return p
210
211     def check_query_txt(self, prefix, txt_array):
212         name = "%s.%s" % (prefix, self.get_dns_domain())
213         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
214         questions = []
215
216         q = self.make_name_question(name, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
217         questions.append(q)
218
219         self.finish_name_packet(p, questions)
220         response = self.dns_transaction_udp(p)
221         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
222         self.assertEquals(response.ancount, 1)
223         self.assertEquals(response.answers[0].rdata.txt.str, txt_array)
224
225     def assertIsNotNone(self, item):
226         self.assertTrue(item is not None)
227
228 class TestSimpleQueries(DNSTest):
229
230     def test_one_a_query(self):
231         "create a query packet containing one query record"
232         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
233         questions = []
234
235         name = "%s.%s" % (self.server, self.get_dns_domain())
236         q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
237         print "asking for ", q.name
238         questions.append(q)
239
240         self.finish_name_packet(p, questions)
241         response = self.dns_transaction_udp(p)
242         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
243         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
244         self.assertEquals(response.ancount, 1)
245         self.assertEquals(response.answers[0].rdata,
246                           self.server_ip)
247
248     def test_one_a_query_tcp(self):
249         "create a query packet containing one query record via TCP"
250         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
251         questions = []
252
253         name = "%s.%s" % (self.server, self.get_dns_domain())
254         q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
255         print "asking for ", q.name
256         questions.append(q)
257
258         self.finish_name_packet(p, questions)
259         response = self.dns_transaction_tcp(p)
260         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
261         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
262         self.assertEquals(response.ancount, 1)
263         self.assertEquals(response.answers[0].rdata,
264                           self.server_ip)
265
266     def test_one_mx_query(self):
267         "create a query packet causing an empty RCODE_OK answer"
268         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
269         questions = []
270
271         name = "%s.%s" % (self.server, self.get_dns_domain())
272         q = self.make_name_question(name, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN)
273         print "asking for ", q.name
274         questions.append(q)
275
276         self.finish_name_packet(p, questions)
277         response = self.dns_transaction_udp(p)
278         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
279         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
280         self.assertEquals(response.ancount, 0)
281
282         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
283         questions = []
284
285         name = "invalid-%s.%s" % (self.server, self.get_dns_domain())
286         q = self.make_name_question(name, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN)
287         print "asking for ", q.name
288         questions.append(q)
289
290         self.finish_name_packet(p, questions)
291         response = self.dns_transaction_udp(p)
292         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
293         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
294         self.assertEquals(response.ancount, 0)
295
296     def test_two_queries(self):
297         "create a query packet containing two query records"
298         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
299         questions = []
300
301         name = "%s.%s" % (self.server, self.get_dns_domain())
302         q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
303         questions.append(q)
304
305         name = "%s.%s" % ('bogusname', self.get_dns_domain())
306         q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
307         questions.append(q)
308
309         self.finish_name_packet(p, questions)
310         response = self.dns_transaction_udp(p)
311         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
312
313     def test_qtype_all_query(self):
314         "create a QTYPE_ALL query"
315         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
316         questions = []
317
318         name = "%s.%s" % (self.server, self.get_dns_domain())
319         q = self.make_name_question(name, dns.DNS_QTYPE_ALL, dns.DNS_QCLASS_IN)
320         print "asking for ", q.name
321         questions.append(q)
322
323         self.finish_name_packet(p, questions)
324         response = self.dns_transaction_udp(p)
325
326         num_answers = 1
327         dc_ipv6 = os.getenv('SERVER_IPV6')
328         if dc_ipv6 is not None:
329             num_answers += 1
330
331         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
332         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
333         self.assertEquals(response.ancount, num_answers)
334         self.assertEquals(response.answers[0].rdata,
335                           self.server_ip)
336         if dc_ipv6 is not None:
337             self.assertEquals(response.answers[1].rdata, dc_ipv6)
338
339     def test_qclass_none_query(self):
340         "create a QCLASS_NONE query"
341         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
342         questions = []
343
344         name = "%s.%s" % (self.server, self.get_dns_domain())
345         q = self.make_name_question(name, dns.DNS_QTYPE_ALL, dns.DNS_QCLASS_NONE)
346         questions.append(q)
347
348         self.finish_name_packet(p, questions)
349         response = self.dns_transaction_udp(p)
350         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTIMP)
351
352 # Only returns an authority section entry in BIND and Win DNS
# FIXME: Enable once Samba implements this feature
354 #    def test_soa_hostname_query(self):
355 #        "create a SOA query for a hostname"
356 #        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
357 #        questions = []
358 #
359 #        name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
360 #        q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
361 #        questions.append(q)
362 #
363 #        self.finish_name_packet(p, questions)
364 #        response = self.dns_transaction_udp(p)
365 #        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
366 #        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
367 #        # We don't get SOA records for single hosts
368 #        self.assertEquals(response.ancount, 0)
369
370     def test_soa_domain_query(self):
371         "create a SOA query for a domain"
372         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
373         questions = []
374
375         name = self.get_dns_domain()
376         q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
377         questions.append(q)
378
379         self.finish_name_packet(p, questions)
380         response = self.dns_transaction_udp(p)
381         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
382         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
383         self.assertEquals(response.ancount, 1)
384         self.assertEquals(response.answers[0].rdata.minimum, 3600)
385
386
387 class TestDNSUpdates(DNSTest):
388
389     def test_two_updates(self):
390         "create two update requests"
391         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
392         updates = []
393
394         name = "%s.%s" % (self.server, self.get_dns_domain())
395         u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
396         updates.append(u)
397
398         name = self.get_dns_domain()
399         u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
400         updates.append(u)
401
402         self.finish_name_packet(p, updates)
403         response = self.dns_transaction_udp(p)
404         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
405
406     def test_update_wrong_qclass(self):
407         "create update with DNS_QCLASS_NONE"
408         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
409         updates = []
410
411         name = self.get_dns_domain()
412         u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_NONE)
413         updates.append(u)
414
415         self.finish_name_packet(p, updates)
416         response = self.dns_transaction_udp(p)
417         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTIMP)
418
419     def test_update_prereq_with_non_null_ttl(self):
420         "test update with a non-null TTL"
421         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
422         updates = []
423
424         name = self.get_dns_domain()
425
426         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
427         updates.append(u)
428         self.finish_name_packet(p, updates)
429
430         prereqs = []
431         r = dns.res_rec()
432         r.name = "%s.%s" % (self.server, self.get_dns_domain())
433         r.rr_type = dns.DNS_QTYPE_TXT
434         r.rr_class = dns.DNS_QCLASS_NONE
435         r.ttl = 1
436         r.length = 0
437         prereqs.append(r)
438
439         p.ancount = len(prereqs)
440         p.answers = prereqs
441
442         response = self.dns_transaction_udp(p)
443         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
444
445     def test_update_prereq_with_non_null_length(self):
446         "test update with a non-null length"
447         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
448         updates = []
449
450         name = self.get_dns_domain()
451
452         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
453         updates.append(u)
454         self.finish_name_packet(p, updates)
455
456         prereqs = []
457         r = dns.res_rec()
458         r.name = "%s.%s" % (self.server, self.get_dns_domain())
459         r.rr_type = dns.DNS_QTYPE_TXT
460         r.rr_class = dns.DNS_QCLASS_ANY
461         r.ttl = 0
462         r.length = 1
463         prereqs.append(r)
464
465         p.ancount = len(prereqs)
466         p.answers = prereqs
467
468         response = self.dns_transaction_udp(p)
469         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXRRSET)
470
471     def test_update_prereq_nonexisting_name(self):
472         "test update with a nonexisting name"
473         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
474         updates = []
475
476         name = self.get_dns_domain()
477
478         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
479         updates.append(u)
480         self.finish_name_packet(p, updates)
481
482         prereqs = []
483         r = dns.res_rec()
484         r.name = "idontexist.%s" % self.get_dns_domain()
485         r.rr_type = dns.DNS_QTYPE_TXT
486         r.rr_class = dns.DNS_QCLASS_ANY
487         r.ttl = 0
488         r.length = 0
489         prereqs.append(r)
490
491         p.ancount = len(prereqs)
492         p.answers = prereqs
493
494         response = self.dns_transaction_udp(p)
495         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXRRSET)
496
497     def test_update_add_txt_record(self):
498         "test adding records works"
499         prefix, txt = 'textrec', ['"This is a test"']
500         p = self.make_txt_update(prefix, txt)
501         response = self.dns_transaction_udp(p)
502         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
503         self.check_query_txt(prefix, txt)
504
505     def test_delete_record(self):
506         "Test if deleting records works"
507
508         NAME = "deleterec.%s" % self.get_dns_domain()
509
510         # First, create a record to make sure we have a record to delete.
511         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
512         updates = []
513
514         name = self.get_dns_domain()
515
516         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
517         updates.append(u)
518         self.finish_name_packet(p, updates)
519
520         updates = []
521         r = dns.res_rec()
522         r.name = NAME
523         r.rr_type = dns.DNS_QTYPE_TXT
524         r.rr_class = dns.DNS_QCLASS_IN
525         r.ttl = 900
526         r.length = 0xffff
527         rdata = make_txt_record(['"This is a test"'])
528         r.rdata = rdata
529         updates.append(r)
530         p.nscount = len(updates)
531         p.nsrecs = updates
532
533         response = self.dns_transaction_udp(p)
534         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
535
536         # Now check the record is around
537         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
538         questions = []
539         q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
540         questions.append(q)
541
542         self.finish_name_packet(p, questions)
543         response = self.dns_transaction_udp(p)
544         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
545
546         # Now delete the record
547         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
548         updates = []
549
550         name = self.get_dns_domain()
551
552         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
553         updates.append(u)
554         self.finish_name_packet(p, updates)
555
556         updates = []
557         r = dns.res_rec()
558         r.name = NAME
559         r.rr_type = dns.DNS_QTYPE_TXT
560         r.rr_class = dns.DNS_QCLASS_NONE
561         r.ttl = 0
562         r.length = 0xffff
563         rdata = make_txt_record(['"This is a test"'])
564         r.rdata = rdata
565         updates.append(r)
566         p.nscount = len(updates)
567         p.nsrecs = updates
568
569         response = self.dns_transaction_udp(p)
570         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
571
572         # And finally check it's gone
573         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
574         questions = []
575
576         q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
577         questions.append(q)
578
579         self.finish_name_packet(p, questions)
580         response = self.dns_transaction_udp(p)
581         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
582
583     def test_readd_record(self):
584         "Test if adding, deleting and then readding a records works"
585
586         NAME = "readdrec.%s" % self.get_dns_domain()
587
588         # Create the record
589         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
590         updates = []
591
592         name = self.get_dns_domain()
593
594         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
595         updates.append(u)
596         self.finish_name_packet(p, updates)
597
598         updates = []
599         r = dns.res_rec()
600         r.name = NAME
601         r.rr_type = dns.DNS_QTYPE_TXT
602         r.rr_class = dns.DNS_QCLASS_IN
603         r.ttl = 900
604         r.length = 0xffff
605         rdata = make_txt_record(['"This is a test"'])
606         r.rdata = rdata
607         updates.append(r)
608         p.nscount = len(updates)
609         p.nsrecs = updates
610
611         response = self.dns_transaction_udp(p)
612         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
613
614         # Now check the record is around
615         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
616         questions = []
617         q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
618         questions.append(q)
619
620         self.finish_name_packet(p, questions)
621         response = self.dns_transaction_udp(p)
622         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
623
624         # Now delete the record
625         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
626         updates = []
627
628         name = self.get_dns_domain()
629
630         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
631         updates.append(u)
632         self.finish_name_packet(p, updates)
633
634         updates = []
635         r = dns.res_rec()
636         r.name = NAME
637         r.rr_type = dns.DNS_QTYPE_TXT
638         r.rr_class = dns.DNS_QCLASS_NONE
639         r.ttl = 0
640         r.length = 0xffff
641         rdata = make_txt_record(['"This is a test"'])
642         r.rdata = rdata
643         updates.append(r)
644         p.nscount = len(updates)
645         p.nsrecs = updates
646
647         response = self.dns_transaction_udp(p)
648         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
649
650         # check it's gone
651         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
652         questions = []
653
654         q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
655         questions.append(q)
656
657         self.finish_name_packet(p, questions)
658         response = self.dns_transaction_udp(p)
659         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
660
661         # recreate the record
662         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
663         updates = []
664
665         name = self.get_dns_domain()
666
667         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
668         updates.append(u)
669         self.finish_name_packet(p, updates)
670
671         updates = []
672         r = dns.res_rec()
673         r.name = NAME
674         r.rr_type = dns.DNS_QTYPE_TXT
675         r.rr_class = dns.DNS_QCLASS_IN
676         r.ttl = 900
677         r.length = 0xffff
678         rdata = make_txt_record(['"This is a test"'])
679         r.rdata = rdata
680         updates.append(r)
681         p.nscount = len(updates)
682         p.nsrecs = updates
683
684         response = self.dns_transaction_udp(p)
685         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
686
687         # Now check the record is around
688         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
689         questions = []
690         q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
691         questions.append(q)
692
693         self.finish_name_packet(p, questions)
694         response = self.dns_transaction_udp(p)
695         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
696
697     def test_update_add_mx_record(self):
698         "test adding MX records works"
699         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
700         updates = []
701
702         name = self.get_dns_domain()
703
704         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
705         updates.append(u)
706         self.finish_name_packet(p, updates)
707
708         updates = []
709         r = dns.res_rec()
710         r.name = "%s" % self.get_dns_domain()
711         r.rr_type = dns.DNS_QTYPE_MX
712         r.rr_class = dns.DNS_QCLASS_IN
713         r.ttl = 900
714         r.length = 0xffff
715         rdata = dns.mx_record()
716         rdata.preference = 10
717         rdata.exchange = 'mail.%s' % self.get_dns_domain()
718         r.rdata = rdata
719         updates.append(r)
720         p.nscount = len(updates)
721         p.nsrecs = updates
722
723         response = self.dns_transaction_udp(p)
724         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
725
726         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
727         questions = []
728
729         name = "%s" % self.get_dns_domain()
730         q = self.make_name_question(name, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN)
731         questions.append(q)
732
733         self.finish_name_packet(p, questions)
734         response = self.dns_transaction_udp(p)
735         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
736         self.assertEqual(response.ancount, 1)
737         ans = response.answers[0]
738         self.assertEqual(ans.rr_type, dns.DNS_QTYPE_MX)
739         self.assertEqual(ans.rdata.preference, 10)
740         self.assertEqual(ans.rdata.exchange, 'mail.%s' % self.get_dns_domain())
741
742
743 class TestComplexQueries(DNSTest):
744
745     def setUp(self):
746         super(TestComplexQueries, self).setUp()
747         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
748         updates = []
749
750         name = self.get_dns_domain()
751
752         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
753         updates.append(u)
754         self.finish_name_packet(p, updates)
755
756         updates = []
757         r = dns.res_rec()
758         r.name = "cname_test.%s" % self.get_dns_domain()
759         r.rr_type = dns.DNS_QTYPE_CNAME
760         r.rr_class = dns.DNS_QCLASS_IN
761         r.ttl = 900
762         r.length = 0xffff
763         r.rdata = "%s.%s" % (self.server, self.get_dns_domain())
764         updates.append(r)
765         p.nscount = len(updates)
766         p.nsrecs = updates
767
768         response = self.dns_transaction_udp(p)
769         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
770
771     def tearDown(self):
772         super(TestComplexQueries, self).tearDown()
773         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
774         updates = []
775
776         name = self.get_dns_domain()
777
778         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
779         updates.append(u)
780         self.finish_name_packet(p, updates)
781
782         updates = []
783         r = dns.res_rec()
784         r.name = "cname_test.%s" % self.get_dns_domain()
785         r.rr_type = dns.DNS_QTYPE_CNAME
786         r.rr_class = dns.DNS_QCLASS_NONE
787         r.ttl = 0
788         r.length = 0xffff
789         r.rdata = "%s.%s" % (self.server, self.get_dns_domain())
790         updates.append(r)
791         p.nscount = len(updates)
792         p.nsrecs = updates
793
794         response = self.dns_transaction_udp(p)
795         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
796
797     def test_one_a_query(self):
798         "create a query packet containing one query record"
799         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
800         questions = []
801
802         name = "cname_test.%s" % self.get_dns_domain()
803         q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
804         print "asking for ", q.name
805         questions.append(q)
806
807         self.finish_name_packet(p, questions)
808         response = self.dns_transaction_udp(p)
809         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
810         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
811         self.assertEquals(response.ancount, 2)
812         self.assertEquals(response.answers[0].rr_type, dns.DNS_QTYPE_CNAME)
813         self.assertEquals(response.answers[0].rdata, "%s.%s" %
814                           (self.server, self.get_dns_domain()))
815         self.assertEquals(response.answers[1].rr_type, dns.DNS_QTYPE_A)
816         self.assertEquals(response.answers[1].rdata,
817                           self.server_ip)
818
819 class TestInvalidQueries(DNSTest):
820
821     def test_one_a_query(self):
822         "send 0 bytes follows by create a query packet containing one query record"
823
824         s = None
825         try:
826             s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
827             s.connect((self.server_ip, 53))
828             s.send("", 0)
829         finally:
830             if s is not None:
831                 s.close()
832
833         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
834         questions = []
835
836         name = "%s.%s" % (self.server, self.get_dns_domain())
837         q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
838         print "asking for ", q.name
839         questions.append(q)
840
841         self.finish_name_packet(p, questions)
842         response = self.dns_transaction_udp(p)
843         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
844         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
845         self.assertEquals(response.ancount, 1)
846         self.assertEquals(response.answers[0].rdata,
847                           self.server_ip)
848
849     def test_one_a_reply(self):
850         "send a reply instead of a query"
851
852         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
853         questions = []
854
855         name = "%s.%s" % ('fakefakefake', self.get_dns_domain())
856         q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
857         print "asking for ", q.name
858         questions.append(q)
859
860         self.finish_name_packet(p, questions)
861         p.operation |= dns.DNS_FLAG_REPLY
862         s = None
863         try:
864             send_packet = ndr.ndr_pack(p)
865             s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
866             host=self.server_ip
867             s.connect((host, 53))
868             tcp_packet = struct.pack('!H', len(send_packet))
869             tcp_packet += send_packet
870             s.send(tcp_packet, 0)
871             recv_packet = s.recv(0xffff + 2, 0)
872             self.assertEquals(0, len(recv_packet))
873         finally:
874             if s is not None:
875                 s.close()
876
class TestRPCRoundtrip(DNSTest):
    """Round-trip TXT records between the DNS protocol and the dnsserver RPC.

    The ``*_txt_record(s)`` tests add a record with a DNS update, query it
    back over DNS and then look it up through the RPC connection.  The
    ``*_rpc_to_dns`` tests add the record through RPC, query it back over
    DNS and finally remove it through RPC again.
    """

    def setUp(self):
        super(TestRPCRoundtrip, self).setUp()
        self.rpc_conn = dnsserver.dnsserver("ncacn_ip_tcp:%s[sign]" % (self.server_ip),
                                            self.lp, self.creds)

    def tearDown(self):
        super(TestRPCRoundtrip, self).tearDown()

    def _check_rpc_to_dns(self, prefix, rpc_data, expected_txt):
        """Add a TXT record via RPC, verify it via DNS, then remove it.

        prefix       -- host label; the record name is prefix + DNS domain
        rpc_data     -- TXT data string handed to data_to_dns_record()
        expected_txt -- list of strings check_query_txt() must find

        The name does not start with "test", so the unittest loader does
        not collect this helper as a test of its own.
        """
        name = "%s.%s" % (prefix, self.get_dns_domain())

        rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, rpc_data)
        add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
        add_rec_buf.rec = rec
        try:
            # Record buffer in the first record slot: add it.
            self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
                                              0, self.server_ip, self.get_dns_domain(),
                                              name, add_rec_buf, None)

            self.check_query_txt(prefix, expected_txt)
        finally:
            # Same buffer in the second record slot: remove it again, even
            # if the add or the DNS check above failed.
            self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
                                              0, self.server_ip, self.get_dns_domain(),
                                              name, None, add_rec_buf)

    def _check_dns_to_rpc(self, prefix, txt, rpc_data, expected_txt=None):
        """Add a TXT record via DNS update, verify it via DNS and via RPC.

        prefix       -- host label; the record name is prefix + DNS domain
        txt          -- list of strings sent in the DNS update
        rpc_data     -- TXT data string dns_record_match() must find
        expected_txt -- list of strings check_query_txt() must find;
                        defaults to txt when not given
        """
        # Compare against None explicitly: an empty list is a valid value.
        if expected_txt is None:
            expected_txt = txt

        p = self.make_txt_update(prefix, txt)
        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.check_query_txt(prefix, expected_txt)

        name = "%s.%s" % (prefix, self.get_dns_domain())
        self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
                                              self.get_dns_domain(), name,
                                              dnsp.DNS_TYPE_TXT, rpc_data))

    def test_update_add_txt_rpc_to_dns(self):
        "test a quoted txt record added via RPC is visible via DNS"
        self._check_rpc_to_dns('rpctextrec',
                               '"\\"This is a test\\""',
                               ['"This is a test"'])

    def test_update_add_null_padded_txt_record(self):
        "test adding txt records padded with empty strings works"
        cases = [
            ('pad1textrec',
             ['"This is a test"', '', ''],
             '"\\"This is a test\\"" "" ""'),
            ('pad2textrec',
             ['"This is a test"', '', '', 'more text'],
             '"\\"This is a test\\"" "" "" "more text"'),
            ('pad3textrec',
             ['', '', '"This is a test"'],
             '"" "" "\\"This is a test\\""'),
        ]
        for prefix, txt, rpc_data in cases:
            self._check_dns_to_rpc(prefix, txt, rpc_data)

    def test_update_add_padding_rpc_to_dns(self):
        "test padded txt records added via RPC are visible via DNS"
        cases = [
            ('rpcpad1textrec',
             '"\\"This is a test\\"" "" ""',
             ['"This is a test"', '', '']),
            ('rpcpad2textrec',
             '"\\"This is a test\\"" "" "" "more text"',
             ['"This is a test"', '', '', 'more text']),
            ('rpcpad3textrec',
             '"" "" "\\"This is a test\\""',
             ['', '', '"This is a test"']),
        ]
        for prefix, rpc_data, txt in cases:
            self._check_rpc_to_dns(prefix, rpc_data, txt)

    # Test is incomplete due to strlen against txt records
    def test_update_add_null_char_txt_record(self):
        "test adding txt records containing a NUL byte"
        # Only the part in front of the NUL byte is expected to come back.
        self._check_dns_to_rpc('nulltextrec', ['NULL\x00BYTE'],
                               '"NULL"', expected_txt=['NULL'])
        self._check_dns_to_rpc('nulltextrec2',
                               ['NULL\x00BYTE', 'NULL\x00BYTE'],
                               '"NULL" "NULL"',
                               expected_txt=['NULL', 'NULL'])

    def test_update_add_null_char_rpc_to_dns(self):
        "test the truncated NUL-byte record added via RPC is visible via DNS"
        self._check_rpc_to_dns('rpcnulltextrec', '"NULL"', ['NULL'])

    def test_update_add_hex_char_txt_record(self):
        "test adding a txt record containing a 0xFF byte works"
        self._check_dns_to_rpc('hextextrec', ['HIGH\xFFBYTE'],
                               '"HIGH\xFFBYTE"')

    def test_update_add_hex_rpc_to_dns(self):
        "test a txt record with a 0xFF byte added via RPC is visible via DNS"
        self._check_rpc_to_dns('rpchextextrec', '"HIGH\xFFBYTE"',
                               ['HIGH\xFFBYTE'])

    def test_update_add_slash_txt_record(self):
        "test adding a txt record containing a backslash works"
        self._check_dns_to_rpc('slashtextrec', ['Th\\=is=is a test'],
                               '"Th\\\\=is=is a test"')

    # This test fails against Windows as it eliminates slashes in RPC
    # One typical use for a slash is in records like 'var=value' to
    # escape '=' characters.
    def test_update_add_slash_rpc_to_dns(self):
        "test a txt record with a backslash added via RPC is visible via DNS"
        self._check_rpc_to_dns('rpcslashtextrec', '"Th\\\\=is=is a test"',
                               ['Th\\=is=is a test'])

    def test_update_add_two_txt_records(self):
        "test adding two txt records works"
        self._check_dns_to_rpc('textrec2',
                               ['"This is a test"',
                                '"and this is a test, too"'],
                               '"\\"This is a test\\""' +
                               ' "\\"and this is a test, too\\""')

    def test_update_add_two_rpc_to_dns(self):
        "test two txt strings added via RPC are visible via DNS"
        self._check_rpc_to_dns('rpctextrec2',
                               '"\\"This is a test\\""' +
                               ' "\\"and this is a test, too\\""',
                               ['"This is a test"',
                                '"and this is a test, too"'])

    def test_update_add_empty_txt_records(self):
        "test adding a txt record without any strings works"
        self._check_dns_to_rpc('emptytextrec', [], '')

    def test_update_add_empty_rpc_to_dns(self):
        "test an empty txt record added via RPC is visible via DNS"
        self._check_rpc_to_dns('rpcemptytextrec', '', [])
1165
# Run each test class as its own suite through the subunit runner.  Every
# suite is always run; the exit status is 1 if any of them was unsuccessful.
runner = SubunitTestRunner()
rc = 0
for test_class in (DNSTest,
                   TestSimpleQueries,
                   TestDNSUpdates,
                   TestComplexQueries,
                   TestInvalidQueries,
                   TestRPCRoundtrip):
    suite = unittest.makeSuite(test_class)
    if not runner.run(suite).wasSuccessful():
        rc = 1

sys.exit(rc)