8aecfe3be59ff36c4cb62ec2aba4337cff85c52f
[metze/samba/wip.git] / python / samba / tests / dns.py
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Kai Blin  <kai@samba.org> 2011
3 #
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License
15 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17
18 import os
19 import sys
20 import struct
21 import random
22 import socket
23 import samba.ndr as ndr
24 from samba import credentials, param
25 from samba.dcerpc import dns, dnsp, dnsserver
26 from samba.netcmd.dns import TXTRecord, dns_record_match, data_to_dns_record
27 from samba.tests.subunitrun import SubunitOptions, TestProgram
28 from samba import werror, WERRORError
29 from samba.tests.dns_base import DNSTest
30 import samba.getopt as options
31 import optparse
32
33 parser = optparse.OptionParser("dns.py <server name> <server ip> [options]")
34 sambaopts = options.SambaOptions(parser)
35 parser.add_option_group(sambaopts)
36
37 # This timeout only has relevance when testing against Windows
38 # Format errors tend to return patchy responses, so a timeout is needed.
39 parser.add_option("--timeout", type="int", dest="timeout",
40                   help="Specify timeout for DNS requests")
41
42 # use command line creds if available
43 credopts = options.CredentialsOptions(parser)
44 parser.add_option_group(credopts)
45 subunitopts = SubunitOptions(parser)
46 parser.add_option_group(subunitopts)
47
48 opts, args = parser.parse_args()
49
50 lp = sambaopts.get_loadparm()
51 creds = credopts.get_credentials(lp)
52
53 timeout = opts.timeout
54
55 if len(args) < 2:
56     parser.print_usage()
57     sys.exit(1)
58
59 server_name = args[0]
60 server_ip = args[1]
61 creds.set_krb_forwardable(credentials.NO_KRB_FORWARDABLE)
62
63 class TestSimpleQueries(DNSTest):
64     def setUp(self):
65         super(TestSimpleQueries, self).setUp()
66         global server, server_ip, lp, creds, timeout
67         self.server = server_name
68         self.server_ip = server_ip
69         self.lp = lp
70         self.creds = creds
71         self.timeout = timeout
72
73     def test_one_a_query(self):
74         "create a query packet containing one query record"
75         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
76         questions = []
77
78         name = "%s.%s" % (self.server, self.get_dns_domain())
79         q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
80         print "asking for ", q.name
81         questions.append(q)
82
83         self.finish_name_packet(p, questions)
84         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
85         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
86         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
87         self.assertEquals(response.ancount, 1)
88         self.assertEquals(response.answers[0].rdata,
89                           self.server_ip)
90
91     def test_one_SOA_query(self):
92         "create a query packet containing one query record for the SOA"
93         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
94         questions = []
95
96         name = "%s" % (self.get_dns_domain())
97         q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
98         print "asking for ", q.name
99         questions.append(q)
100
101         self.finish_name_packet(p, questions)
102         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
103         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
104         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
105         self.assertEquals(response.ancount, 1)
106         self.assertEquals(response.answers[0].rdata.mname.upper(),
107                           ("%s.%s" % (self.server, self.get_dns_domain())).upper())
108
109     def test_one_a_query_tcp(self):
110         "create a query packet containing one query record via TCP"
111         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
112         questions = []
113
114         name = "%s.%s" % (self.server, self.get_dns_domain())
115         q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
116         print "asking for ", q.name
117         questions.append(q)
118
119         self.finish_name_packet(p, questions)
120         (response, response_packet) = self.dns_transaction_tcp(p, host=server_ip)
121         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
122         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
123         self.assertEquals(response.ancount, 1)
124         self.assertEquals(response.answers[0].rdata,
125                           self.server_ip)
126
127     def test_one_mx_query(self):
128         "create a query packet causing an empty RCODE_OK answer"
129         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
130         questions = []
131
132         name = "%s.%s" % (self.server, self.get_dns_domain())
133         q = self.make_name_question(name, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN)
134         print "asking for ", q.name
135         questions.append(q)
136
137         self.finish_name_packet(p, questions)
138         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
139         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
140         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
141         self.assertEquals(response.ancount, 0)
142
143         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
144         questions = []
145
146         name = "invalid-%s.%s" % (self.server, self.get_dns_domain())
147         q = self.make_name_question(name, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN)
148         print "asking for ", q.name
149         questions.append(q)
150
151         self.finish_name_packet(p, questions)
152         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
153         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
154         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
155         self.assertEquals(response.ancount, 0)
156
157     def test_two_queries(self):
158         "create a query packet containing two query records"
159         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
160         questions = []
161
162         name = "%s.%s" % (self.server, self.get_dns_domain())
163         q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
164         questions.append(q)
165
166         name = "%s.%s" % ('bogusname', self.get_dns_domain())
167         q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
168         questions.append(q)
169
170         self.finish_name_packet(p, questions)
171         try:
172             (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
173             self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
174         except socket.timeout:
175             # Windows chooses not to respond to incorrectly formatted queries.
176             # Although this appears to be non-deterministic even for the same
177             # request twice, it also appears to be based on a how poorly the
178             # request is formatted.
179             pass
180
181     def test_qtype_all_query(self):
182         "create a QTYPE_ALL query"
183         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
184         questions = []
185
186         name = "%s.%s" % (self.server, self.get_dns_domain())
187         q = self.make_name_question(name, dns.DNS_QTYPE_ALL, dns.DNS_QCLASS_IN)
188         print "asking for ", q.name
189         questions.append(q)
190
191         self.finish_name_packet(p, questions)
192         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
193
194         num_answers = 1
195         dc_ipv6 = os.getenv('SERVER_IPV6')
196         if dc_ipv6 is not None:
197             num_answers += 1
198
199         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
200         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
201         self.assertEquals(response.ancount, num_answers)
202         self.assertEquals(response.answers[0].rdata,
203                           self.server_ip)
204         if dc_ipv6 is not None:
205             self.assertEquals(response.answers[1].rdata, dc_ipv6)
206
207     def test_qclass_none_query(self):
208         "create a QCLASS_NONE query"
209         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
210         questions = []
211
212         name = "%s.%s" % (self.server, self.get_dns_domain())
213         q = self.make_name_question(name, dns.DNS_QTYPE_ALL, dns.DNS_QCLASS_NONE)
214         questions.append(q)
215
216         self.finish_name_packet(p, questions)
217         try:
218             (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
219             self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTIMP)
220         except socket.timeout:
221             # Windows chooses not to respond to incorrectly formatted queries.
222             # Although this appears to be non-deterministic even for the same
223             # request twice, it also appears to be based on a how poorly the
224             # request is formatted.
225             pass
226
227     def test_soa_hostname_query(self):
228         "create a SOA query for a hostname"
229         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
230         questions = []
231
232         name = "%s.%s" % (self.server, self.get_dns_domain())
233         q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
234         questions.append(q)
235
236         self.finish_name_packet(p, questions)
237         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
238         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
239         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
240         # We don't get SOA records for single hosts
241         self.assertEquals(response.ancount, 0)
242         # But we do respond with an authority section
243         self.assertEqual(response.nscount, 1)
244
245     def test_soa_domain_query(self):
246         "create a SOA query for a domain"
247         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
248         questions = []
249
250         name = self.get_dns_domain()
251         q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
252         questions.append(q)
253
254         self.finish_name_packet(p, questions)
255         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
256         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
257         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
258         self.assertEquals(response.ancount, 1)
259         self.assertEquals(response.answers[0].rdata.minimum, 3600)
260
261
262 class TestDNSUpdates(DNSTest):
263     def setUp(self):
264         super(TestDNSUpdates, self).setUp()
265         global server, server_ip, lp, creds, timeout
266         self.server = server_name
267         self.server_ip = server_ip
268         self.lp = lp
269         self.creds = creds
270         self.timeout = timeout
271
272     def test_two_updates(self):
273         "create two update requests"
274         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
275         updates = []
276
277         name = "%s.%s" % (self.server, self.get_dns_domain())
278         u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
279         updates.append(u)
280
281         name = self.get_dns_domain()
282         u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
283         updates.append(u)
284
285         self.finish_name_packet(p, updates)
286         try:
287             (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
288             self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
289         except socket.timeout:
290             # Windows chooses not to respond to incorrectly formatted queries.
291             # Although this appears to be non-deterministic even for the same
292             # request twice, it also appears to be based on a how poorly the
293             # request is formatted.
294             pass
295
296     def test_update_wrong_qclass(self):
297         "create update with DNS_QCLASS_NONE"
298         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
299         updates = []
300
301         name = self.get_dns_domain()
302         u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_NONE)
303         updates.append(u)
304
305         self.finish_name_packet(p, updates)
306         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
307         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTIMP)
308
309     def test_update_prereq_with_non_null_ttl(self):
310         "test update with a non-null TTL"
311         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
312         updates = []
313
314         name = self.get_dns_domain()
315
316         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
317         updates.append(u)
318         self.finish_name_packet(p, updates)
319
320         prereqs = []
321         r = dns.res_rec()
322         r.name = "%s.%s" % (self.server, self.get_dns_domain())
323         r.rr_type = dns.DNS_QTYPE_TXT
324         r.rr_class = dns.DNS_QCLASS_NONE
325         r.ttl = 1
326         r.length = 0
327         prereqs.append(r)
328
329         p.ancount = len(prereqs)
330         p.answers = prereqs
331
332         try:
333             (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
334             self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
335         except socket.timeout:
336             # Windows chooses not to respond to incorrectly formatted queries.
337             # Although this appears to be non-deterministic even for the same
338             # request twice, it also appears to be based on a how poorly the
339             # request is formatted.
340             pass
341
342     def test_update_prereq_with_non_null_length(self):
343         "test update with a non-null length"
344         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
345         updates = []
346
347         name = self.get_dns_domain()
348
349         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
350         updates.append(u)
351         self.finish_name_packet(p, updates)
352
353         prereqs = []
354         r = dns.res_rec()
355         r.name = "%s.%s" % (self.server, self.get_dns_domain())
356         r.rr_type = dns.DNS_QTYPE_TXT
357         r.rr_class = dns.DNS_QCLASS_ANY
358         r.ttl = 0
359         r.length = 1
360         prereqs.append(r)
361
362         p.ancount = len(prereqs)
363         p.answers = prereqs
364
365         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
366         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXRRSET)
367
368     def test_update_prereq_nonexisting_name(self):
369         "test update with a nonexisting name"
370         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
371         updates = []
372
373         name = self.get_dns_domain()
374
375         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
376         updates.append(u)
377         self.finish_name_packet(p, updates)
378
379         prereqs = []
380         r = dns.res_rec()
381         r.name = "idontexist.%s" % self.get_dns_domain()
382         r.rr_type = dns.DNS_QTYPE_TXT
383         r.rr_class = dns.DNS_QCLASS_ANY
384         r.ttl = 0
385         r.length = 0
386         prereqs.append(r)
387
388         p.ancount = len(prereqs)
389         p.answers = prereqs
390
391         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
392         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXRRSET)
393
394     def test_update_add_txt_record(self):
395         "test adding records works"
396         prefix, txt = 'textrec', ['"This is a test"']
397         p = self.make_txt_update(prefix, txt)
398         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
399         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
400         self.check_query_txt(prefix, txt)
401
402     def test_delete_record(self):
403         "Test if deleting records works"
404
405         NAME = "deleterec.%s" % self.get_dns_domain()
406
407         # First, create a record to make sure we have a record to delete.
408         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
409         updates = []
410
411         name = self.get_dns_domain()
412
413         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
414         updates.append(u)
415         self.finish_name_packet(p, updates)
416
417         updates = []
418         r = dns.res_rec()
419         r.name = NAME
420         r.rr_type = dns.DNS_QTYPE_TXT
421         r.rr_class = dns.DNS_QCLASS_IN
422         r.ttl = 900
423         r.length = 0xffff
424         rdata = self.make_txt_record(['"This is a test"'])
425         r.rdata = rdata
426         updates.append(r)
427         p.nscount = len(updates)
428         p.nsrecs = updates
429
430         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
431         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
432
433         # Now check the record is around
434         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
435         questions = []
436         q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
437         questions.append(q)
438
439         self.finish_name_packet(p, questions)
440         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
441         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
442
443         # Now delete the record
444         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
445         updates = []
446
447         name = self.get_dns_domain()
448
449         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
450         updates.append(u)
451         self.finish_name_packet(p, updates)
452
453         updates = []
454         r = dns.res_rec()
455         r.name = NAME
456         r.rr_type = dns.DNS_QTYPE_TXT
457         r.rr_class = dns.DNS_QCLASS_NONE
458         r.ttl = 0
459         r.length = 0xffff
460         rdata = self.make_txt_record(['"This is a test"'])
461         r.rdata = rdata
462         updates.append(r)
463         p.nscount = len(updates)
464         p.nsrecs = updates
465
466         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
467         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
468
469         # And finally check it's gone
470         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
471         questions = []
472
473         q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
474         questions.append(q)
475
476         self.finish_name_packet(p, questions)
477         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
478         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
479
480     def test_readd_record(self):
481         "Test if adding, deleting and then readding a records works"
482
483         NAME = "readdrec.%s" % self.get_dns_domain()
484
485         # Create the record
486         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
487         updates = []
488
489         name = self.get_dns_domain()
490
491         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
492         updates.append(u)
493         self.finish_name_packet(p, updates)
494
495         updates = []
496         r = dns.res_rec()
497         r.name = NAME
498         r.rr_type = dns.DNS_QTYPE_TXT
499         r.rr_class = dns.DNS_QCLASS_IN
500         r.ttl = 900
501         r.length = 0xffff
502         rdata = self.make_txt_record(['"This is a test"'])
503         r.rdata = rdata
504         updates.append(r)
505         p.nscount = len(updates)
506         p.nsrecs = updates
507
508         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
509         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
510
511         # Now check the record is around
512         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
513         questions = []
514         q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
515         questions.append(q)
516
517         self.finish_name_packet(p, questions)
518         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
519         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
520
521         # Now delete the record
522         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
523         updates = []
524
525         name = self.get_dns_domain()
526
527         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
528         updates.append(u)
529         self.finish_name_packet(p, updates)
530
531         updates = []
532         r = dns.res_rec()
533         r.name = NAME
534         r.rr_type = dns.DNS_QTYPE_TXT
535         r.rr_class = dns.DNS_QCLASS_NONE
536         r.ttl = 0
537         r.length = 0xffff
538         rdata = self.make_txt_record(['"This is a test"'])
539         r.rdata = rdata
540         updates.append(r)
541         p.nscount = len(updates)
542         p.nsrecs = updates
543
544         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
545         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
546
547         # check it's gone
548         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
549         questions = []
550
551         q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
552         questions.append(q)
553
554         self.finish_name_packet(p, questions)
555         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
556         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
557
558         # recreate the record
559         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
560         updates = []
561
562         name = self.get_dns_domain()
563
564         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
565         updates.append(u)
566         self.finish_name_packet(p, updates)
567
568         updates = []
569         r = dns.res_rec()
570         r.name = NAME
571         r.rr_type = dns.DNS_QTYPE_TXT
572         r.rr_class = dns.DNS_QCLASS_IN
573         r.ttl = 900
574         r.length = 0xffff
575         rdata = self.make_txt_record(['"This is a test"'])
576         r.rdata = rdata
577         updates.append(r)
578         p.nscount = len(updates)
579         p.nsrecs = updates
580
581         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
582         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
583
584         # Now check the record is around
585         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
586         questions = []
587         q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
588         questions.append(q)
589
590         self.finish_name_packet(p, questions)
591         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
592         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
593
594     def test_update_add_mx_record(self):
595         "test adding MX records works"
596         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
597         updates = []
598
599         name = self.get_dns_domain()
600
601         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
602         updates.append(u)
603         self.finish_name_packet(p, updates)
604
605         updates = []
606         r = dns.res_rec()
607         r.name = "%s" % self.get_dns_domain()
608         r.rr_type = dns.DNS_QTYPE_MX
609         r.rr_class = dns.DNS_QCLASS_IN
610         r.ttl = 900
611         r.length = 0xffff
612         rdata = dns.mx_record()
613         rdata.preference = 10
614         rdata.exchange = 'mail.%s' % self.get_dns_domain()
615         r.rdata = rdata
616         updates.append(r)
617         p.nscount = len(updates)
618         p.nsrecs = updates
619
620         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
621         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
622
623         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
624         questions = []
625
626         name = "%s" % self.get_dns_domain()
627         q = self.make_name_question(name, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN)
628         questions.append(q)
629
630         self.finish_name_packet(p, questions)
631         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
632         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
633         self.assertEqual(response.ancount, 1)
634         ans = response.answers[0]
635         self.assertEqual(ans.rr_type, dns.DNS_QTYPE_MX)
636         self.assertEqual(ans.rdata.preference, 10)
637         self.assertEqual(ans.rdata.exchange, 'mail.%s' % self.get_dns_domain())
638
639
640 class TestComplexQueries(DNSTest):
641     def make_dns_update(self, key, value, qtype):
642         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
643
644         name = self.get_dns_domain()
645         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
646         self.finish_name_packet(p, [u])
647
648         r = dns.res_rec()
649         r.name = key
650         r.rr_type = qtype
651         r.rr_class = dns.DNS_QCLASS_IN
652         r.ttl = 900
653         r.length = 0xffff
654         rdata = value
655         r.rdata = rdata
656         updates = [r]
657         p.nscount = 1
658         p.nsrecs = updates
659         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
660         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
661
662     def setUp(self):
663         super(TestComplexQueries, self).setUp()
664
665         global server, server_ip, lp, creds, timeout
666         self.server = server_name
667         self.server_ip = server_ip
668         self.lp = lp
669         self.creds = creds
670         self.timeout = timeout
671
672     def test_one_a_query(self):
673         "create a query packet containing one query record"
674
675         try:
676
677             # Create the record
678             name = "cname_test.%s" % self.get_dns_domain()
679             rdata = "%s.%s" % (self.server, self.get_dns_domain())
680             self.make_dns_update(name, rdata, dns.DNS_QTYPE_CNAME)
681
682             p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
683             questions = []
684
685             # Check the record
686             name = "cname_test.%s" % self.get_dns_domain()
687             q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
688             print "asking for ", q.name
689             questions.append(q)
690
691             self.finish_name_packet(p, questions)
692             (response, response_packet) = self.dns_transaction_udp(p, host=self.server_ip)
693             self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
694             self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
695             self.assertEquals(response.ancount, 2)
696             self.assertEquals(response.answers[0].rr_type, dns.DNS_QTYPE_CNAME)
697             self.assertEquals(response.answers[0].rdata, "%s.%s" %
698                               (self.server, self.get_dns_domain()))
699             self.assertEquals(response.answers[1].rr_type, dns.DNS_QTYPE_A)
700             self.assertEquals(response.answers[1].rdata,
701                               self.server_ip)
702
703         finally:
704             # Delete the record
705             p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
706             updates = []
707
708             name = self.get_dns_domain()
709
710             u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
711             updates.append(u)
712             self.finish_name_packet(p, updates)
713
714             updates = []
715             r = dns.res_rec()
716             r.name = "cname_test.%s" % self.get_dns_domain()
717             r.rr_type = dns.DNS_QTYPE_CNAME
718             r.rr_class = dns.DNS_QCLASS_NONE
719             r.ttl = 0
720             r.length = 0xffff
721             r.rdata = "%s.%s" % (self.server, self.get_dns_domain())
722             updates.append(r)
723             p.nscount = len(updates)
724             p.nsrecs = updates
725
726             (response, response_packet) = self.dns_transaction_udp(p, host=self.server_ip)
727             self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
728
729     def test_cname_two_chain(self):
730         name0 = "cnamechain0.%s" % self.get_dns_domain()
731         name1 = "cnamechain1.%s" % self.get_dns_domain()
732         name2 = "cnamechain2.%s" % self.get_dns_domain()
733         self.make_dns_update(name1, name2, dns.DNS_QTYPE_CNAME)
734         self.make_dns_update(name2, name0, dns.DNS_QTYPE_CNAME)
735         self.make_dns_update(name0, server_ip, dns.DNS_QTYPE_A)
736
737         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
738         questions = []
739         q = self.make_name_question(name1, dns.DNS_QTYPE_A,
740                                     dns.DNS_QCLASS_IN)
741         questions.append(q)
742
743         self.finish_name_packet(p, questions)
744         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
745         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
746         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
747         self.assertEquals(response.ancount, 3)
748
749         self.assertEquals(response.answers[0].rr_type, dns.DNS_QTYPE_CNAME)
750         self.assertEquals(response.answers[0].name, name1)
751         self.assertEquals(response.answers[0].rdata, name2)
752
753         self.assertEquals(response.answers[1].rr_type, dns.DNS_QTYPE_CNAME)
754         self.assertEquals(response.answers[1].name, name2)
755         self.assertEquals(response.answers[1].rdata, name0)
756
757         self.assertEquals(response.answers[2].rr_type, dns.DNS_QTYPE_A)
758         self.assertEquals(response.answers[2].rdata,
759                           self.server_ip)
760
761     def test_invalid_empty_cname(self):
762         name0 = "cnamedotprefix0.%s" % self.get_dns_domain()
763         try:
764             self.make_dns_update(name0, "", dns.DNS_QTYPE_CNAME)
765         except AssertionError as e:
766             pass
767         else:
768             self.fail("Successfully added empty CNAME, which is invalid.")
769
770     def test_cname_two_chain_not_matching_qtype(self):
771         name0 = "cnamechain0.%s" % self.get_dns_domain()
772         name1 = "cnamechain1.%s" % self.get_dns_domain()
773         name2 = "cnamechain2.%s" % self.get_dns_domain()
774         self.make_dns_update(name1, name2, dns.DNS_QTYPE_CNAME)
775         self.make_dns_update(name2, name0, dns.DNS_QTYPE_CNAME)
776         self.make_dns_update(name0, server_ip, dns.DNS_QTYPE_A)
777
778         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
779         questions = []
780         q = self.make_name_question(name1, dns.DNS_QTYPE_TXT,
781                                     dns.DNS_QCLASS_IN)
782         questions.append(q)
783
784         self.finish_name_packet(p, questions)
785         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
786         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
787         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
788
789         # CNAME should return all intermediate results!
790         # Only the A records exists, not the TXT.
791         self.assertEquals(response.ancount, 2)
792
793         self.assertEquals(response.answers[0].rr_type, dns.DNS_QTYPE_CNAME)
794         self.assertEquals(response.answers[0].name, name1)
795         self.assertEquals(response.answers[0].rdata, name2)
796
797         self.assertEquals(response.answers[1].rr_type, dns.DNS_QTYPE_CNAME)
798         self.assertEquals(response.answers[1].name, name2)
799         self.assertEquals(response.answers[1].rdata, name0)
800
801 class TestInvalidQueries(DNSTest):
802     def setUp(self):
803         super(TestInvalidQueries, self).setUp()
804         global server, server_ip, lp, creds, timeout
805         self.server = server_name
806         self.server_ip = server_ip
807         self.lp = lp
808         self.creds = creds
809         self.timeout = timeout
810
811     def test_one_a_query(self):
812         "send 0 bytes follows by create a query packet containing one query record"
813
814         s = None
815         try:
816             s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
817             s.connect((self.server_ip, 53))
818             s.send("", 0)
819         finally:
820             if s is not None:
821                 s.close()
822
823         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
824         questions = []
825
826         name = "%s.%s" % (self.server, self.get_dns_domain())
827         q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
828         print "asking for ", q.name
829         questions.append(q)
830
831         self.finish_name_packet(p, questions)
832         (response, response_packet) = self.dns_transaction_udp(p, host=self.server_ip)
833         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
834         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
835         self.assertEquals(response.ancount, 1)
836         self.assertEquals(response.answers[0].rdata,
837                           self.server_ip)
838
839     def test_one_a_reply(self):
840         "send a reply instead of a query"
841         global timeout
842
843         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
844         questions = []
845
846         name = "%s.%s" % ('fakefakefake', self.get_dns_domain())
847         q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
848         print "asking for ", q.name
849         questions.append(q)
850
851         self.finish_name_packet(p, questions)
852         p.operation |= dns.DNS_FLAG_REPLY
853         s = None
854         try:
855             send_packet = ndr.ndr_pack(p)
856             s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
857             s.settimeout(timeout)
858             host=self.server_ip
859             s.connect((host, 53))
860             tcp_packet = struct.pack('!H', len(send_packet))
861             tcp_packet += send_packet
862             s.send(tcp_packet, 0)
863             recv_packet = s.recv(0xffff + 2, 0)
864             self.assertEquals(0, len(recv_packet))
865         except socket.timeout:
866             # Windows chooses not to respond to incorrectly formatted queries.
867             # Although this appears to be non-deterministic even for the same
868             # request twice, it also appears to be based on a how poorly the
869             # request is formatted.
870             pass
871         finally:
872             if s is not None:
873                 s.close()
874
875 class TestZones(DNSTest):
876     def setUp(self):
877         super(TestZones, self).setUp()
878         global server, server_ip, lp, creds, timeout
879         self.server = server_name
880         self.server_ip = server_ip
881         self.lp = lp
882         self.creds = creds
883         self.timeout = timeout
884
885         self.zone = "test.lan"
886         self.rpc_conn = dnsserver.dnsserver("ncacn_ip_tcp:%s[sign]" % (self.server_ip),
887                                             self.lp, self.creds)
888
889     def tearDown(self):
890         super(TestZones, self).tearDown()
891         try:
892             self.delete_zone(self.zone)
893         except RuntimeError as e:
894             (num, string) = e.args
895             if num != werror.WERR_DNS_ERROR_ZONE_DOES_NOT_EXIST:
896                 raise
897
898     def create_zone(self, zone):
899         zone_create = dnsserver.DNS_RPC_ZONE_CREATE_INFO_LONGHORN()
900         zone_create.pszZoneName = zone
901         zone_create.dwZoneType = dnsp.DNS_ZONE_TYPE_PRIMARY
902         zone_create.fAllowUpdate = dnsp.DNS_ZONE_UPDATE_SECURE
903         zone_create.fAging = 0
904         zone_create.dwDpFlags = dnsserver.DNS_DP_DOMAIN_DEFAULT
905         try:
906             self.rpc_conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
907                                            0,
908                                            self.server_ip,
909                                            None,
910                                            0,
911                                            'ZoneCreate',
912                                            dnsserver.DNSSRV_TYPEID_ZONE_CREATE,
913                                            zone_create)
914         except WERRORError as e:
915             self.fail(str(e))
916
917     def delete_zone(self, zone):
918         self.rpc_conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
919                                        0,
920                                        self.server_ip,
921                                        zone,
922                                        0,
923                                        'DeleteZoneFromDs',
924                                        dnsserver.DNSSRV_TYPEID_NULL,
925                                        None)
926
927     def test_soa_query(self):
928         zone = "test.lan"
929         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
930         questions = []
931
932         q = self.make_name_question(zone, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
933         questions.append(q)
934         self.finish_name_packet(p, questions)
935
936         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
937         # Windows returns OK while BIND logically seems to return NXDOMAIN
938         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
939         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
940         self.assertEquals(response.ancount, 0)
941
942         self.create_zone(zone)
943         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
944         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
945         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
946         self.assertEquals(response.ancount, 1)
947         self.assertEquals(response.answers[0].rr_type, dns.DNS_QTYPE_SOA)
948
949         self.delete_zone(zone)
950         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
951         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
952         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
953         self.assertEquals(response.ancount, 0)
954
955 class TestRPCRoundtrip(DNSTest):
956     def setUp(self):
957         super(TestRPCRoundtrip, self).setUp()
958         global server, server_ip, lp, creds
959         self.server = server_name
960         self.server_ip = server_ip
961         self.lp = lp
962         self.creds = creds
963         self.rpc_conn = dnsserver.dnsserver("ncacn_ip_tcp:%s[sign]" % (self.server_ip),
964                                             self.lp, self.creds)
965
966     def tearDown(self):
967         super(TestRPCRoundtrip, self).tearDown()
968
969     def test_update_add_txt_rpc_to_dns(self):
970         prefix, txt = 'rpctextrec', ['"This is a test"']
971
972         name = "%s.%s" % (prefix, self.get_dns_domain())
973
974         rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"\\"This is a test\\""')
975         add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
976         add_rec_buf.rec = rec
977         try:
978             self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
979                                      0, self.server_ip, self.get_dns_domain(),
980                                      name, add_rec_buf, None)
981         except WERRORError as e:
982             self.fail(str(e))
983
984         try:
985             self.check_query_txt(prefix, txt)
986         finally:
987             self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
988                                               0, self.server_ip, self.get_dns_domain(),
989                                               name, None, add_rec_buf)
990
991     def test_update_add_null_padded_txt_record(self):
992         "test adding records works"
993         prefix, txt = 'pad1textrec', ['"This is a test"', '', '']
994         p = self.make_txt_update(prefix, txt)
995         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
996         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
997         self.check_query_txt(prefix, txt)
998         self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
999                              self.get_dns_domain(),
1000                              "%s.%s" % (prefix, self.get_dns_domain()),
1001                              dnsp.DNS_TYPE_TXT, '"\\"This is a test\\"" "" ""'))
1002
1003         prefix, txt = 'pad2textrec', ['"This is a test"', '', '', 'more text']
1004         p = self.make_txt_update(prefix, txt)
1005         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
1006         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1007         self.check_query_txt(prefix, txt)
1008         self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
1009                              self.get_dns_domain(),
1010                              "%s.%s" % (prefix, self.get_dns_domain()),
1011                              dnsp.DNS_TYPE_TXT, '"\\"This is a test\\"" "" "" "more text"'))
1012
1013         prefix, txt = 'pad3textrec', ['', '', '"This is a test"']
1014         p = self.make_txt_update(prefix, txt)
1015         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
1016         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1017         self.check_query_txt(prefix, txt)
1018         self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
1019                              self.get_dns_domain(),
1020                              "%s.%s" % (prefix, self.get_dns_domain()),
1021                              dnsp.DNS_TYPE_TXT, '"" "" "\\"This is a test\\""'))
1022
1023     def test_update_add_padding_rpc_to_dns(self):
1024         prefix, txt = 'pad1textrec', ['"This is a test"', '', '']
1025         prefix = 'rpc' + prefix
1026         name = "%s.%s" % (prefix, self.get_dns_domain())
1027
1028         rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"\\"This is a test\\"" "" ""')
1029         add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1030         add_rec_buf.rec = rec
1031         try:
1032             self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1033                                      0, self.server_ip, self.get_dns_domain(),
1034                                      name, add_rec_buf, None)
1035
1036         except WERRORError as e:
1037             self.fail(str(e))
1038
1039         try:
1040             self.check_query_txt(prefix, txt)
1041         finally:
1042             self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1043                                               0, self.server_ip, self.get_dns_domain(),
1044                                               name, None, add_rec_buf)
1045
1046         prefix, txt = 'pad2textrec', ['"This is a test"', '', '', 'more text']
1047         prefix = 'rpc' + prefix
1048         name = "%s.%s" % (prefix, self.get_dns_domain())
1049
1050         rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"\\"This is a test\\"" "" "" "more text"')
1051         add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1052         add_rec_buf.rec = rec
1053         try:
1054             self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1055                                      0, self.server_ip, self.get_dns_domain(),
1056                                      name, add_rec_buf, None)
1057
1058         except WERRORError as e:
1059             self.fail(str(e))
1060
1061         try:
1062             self.check_query_txt(prefix, txt)
1063         finally:
1064             self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1065                                               0, self.server_ip, self.get_dns_domain(),
1066                                               name, None, add_rec_buf)
1067
1068         prefix, txt = 'pad3textrec', ['', '', '"This is a test"']
1069         prefix = 'rpc' + prefix
1070         name = "%s.%s" % (prefix, self.get_dns_domain())
1071
1072         rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"" "" "\\"This is a test\\""')
1073         add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1074         add_rec_buf.rec = rec
1075         try:
1076             self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1077                                      0, self.server_ip, self.get_dns_domain(),
1078                                      name, add_rec_buf, None)
1079         except WERRORError as e:
1080             self.fail(str(e))
1081
1082         try:
1083             self.check_query_txt(prefix, txt)
1084         finally:
1085             self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1086                                               0, self.server_ip, self.get_dns_domain(),
1087                                               name, None, add_rec_buf)
1088
1089     # Test is incomplete due to strlen against txt records
1090     def test_update_add_null_char_txt_record(self):
1091         "test adding records works"
1092         prefix, txt = 'nulltextrec', ['NULL\x00BYTE']
1093         p = self.make_txt_update(prefix, txt)
1094         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
1095         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1096         self.check_query_txt(prefix, ['NULL'])
1097         self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
1098                              self.get_dns_domain(),
1099                              "%s.%s" % (prefix, self.get_dns_domain()),
1100                              dnsp.DNS_TYPE_TXT, '"NULL"'))
1101
1102         prefix, txt = 'nulltextrec2', ['NULL\x00BYTE', 'NULL\x00BYTE']
1103         p = self.make_txt_update(prefix, txt)
1104         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
1105         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1106         self.check_query_txt(prefix, ['NULL', 'NULL'])
1107         self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
1108                              self.get_dns_domain(),
1109                              "%s.%s" % (prefix, self.get_dns_domain()),
1110                              dnsp.DNS_TYPE_TXT, '"NULL" "NULL"'))
1111
1112     def test_update_add_null_char_rpc_to_dns(self):
1113         prefix, txt = 'nulltextrec', ['NULL\x00BYTE']
1114         prefix = 'rpc' + prefix
1115         name = "%s.%s" % (prefix, self.get_dns_domain())
1116
1117         rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"NULL"')
1118         add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1119         add_rec_buf.rec = rec
1120         try:
1121             self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1122                                      0, self.server_ip, self.get_dns_domain(),
1123                                      name, add_rec_buf, None)
1124
1125         except WERRORError as e:
1126             self.fail(str(e))
1127
1128         try:
1129            self.check_query_txt(prefix, ['NULL'])
1130         finally:
1131             self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1132                                               0, self.server_ip, self.get_dns_domain(),
1133                                               name, None, add_rec_buf)
1134
1135     def test_update_add_hex_char_txt_record(self):
1136         "test adding records works"
1137         prefix, txt = 'hextextrec', ['HIGH\xFFBYTE']
1138         p = self.make_txt_update(prefix, txt)
1139         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
1140         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1141         self.check_query_txt(prefix, txt)
1142         self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
1143                              self.get_dns_domain(),
1144                              "%s.%s" % (prefix, self.get_dns_domain()),
1145                              dnsp.DNS_TYPE_TXT, '"HIGH\xFFBYTE"'))
1146
1147     def test_update_add_hex_rpc_to_dns(self):
1148         prefix, txt = 'hextextrec', ['HIGH\xFFBYTE']
1149         prefix = 'rpc' + prefix
1150         name = "%s.%s" % (prefix, self.get_dns_domain())
1151
1152         rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"HIGH\xFFBYTE"')
1153         add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1154         add_rec_buf.rec = rec
1155         try:
1156             self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1157                                      0, self.server_ip, self.get_dns_domain(),
1158                                      name, add_rec_buf, None)
1159
1160         except WERRORError as e:
1161             self.fail(str(e))
1162
1163         try:
1164            self.check_query_txt(prefix, txt)
1165         finally:
1166             self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1167                                               0, self.server_ip, self.get_dns_domain(),
1168                                               name, None, add_rec_buf)
1169
1170     def test_update_add_slash_txt_record(self):
1171         "test adding records works"
1172         prefix, txt = 'slashtextrec', ['Th\\=is=is a test']
1173         p = self.make_txt_update(prefix, txt)
1174         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
1175         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1176         self.check_query_txt(prefix, txt)
1177         self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
1178                              self.get_dns_domain(),
1179                              "%s.%s" % (prefix, self.get_dns_domain()),
1180                              dnsp.DNS_TYPE_TXT, '"Th\\\\=is=is a test"'))
1181
1182     # This test fails against Windows as it eliminates slashes in RPC
1183     # One typical use for a slash is in records like 'var=value' to
1184     # escape '=' characters.
1185     def test_update_add_slash_rpc_to_dns(self):
1186         prefix, txt = 'slashtextrec', ['Th\\=is=is a test']
1187         prefix = 'rpc' + prefix
1188         name = "%s.%s" % (prefix, self.get_dns_domain())
1189
1190         rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"Th\\\\=is=is a test"')
1191         add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1192         add_rec_buf.rec = rec
1193         try:
1194             self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1195                                      0, self.server_ip, self.get_dns_domain(),
1196                                      name, add_rec_buf, None)
1197
1198         except WERRORError as e:
1199             self.fail(str(e))
1200
1201         try:
1202             self.check_query_txt(prefix, txt)
1203
1204         finally:
1205             self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1206                                               0, self.server_ip, self.get_dns_domain(),
1207                                               name, None, add_rec_buf)
1208
1209     def test_update_add_two_txt_records(self):
1210         "test adding two txt records works"
1211         prefix, txt = 'textrec2', ['"This is a test"',
1212                                    '"and this is a test, too"']
1213         p = self.make_txt_update(prefix, txt)
1214         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
1215         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1216         self.check_query_txt(prefix, txt)
1217         self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
1218                              self.get_dns_domain(),
1219                              "%s.%s" % (prefix, self.get_dns_domain()),
1220                              dnsp.DNS_TYPE_TXT, '"\\"This is a test\\""' +
1221                              ' "\\"and this is a test, too\\""'))
1222
1223     def test_update_add_two_rpc_to_dns(self):
1224         prefix, txt = 'textrec2', ['"This is a test"',
1225                                    '"and this is a test, too"']
1226         prefix = 'rpc' + prefix
1227         name = "%s.%s" % (prefix, self.get_dns_domain())
1228
1229         rec = data_to_dns_record(dnsp.DNS_TYPE_TXT,
1230                                 '"\\"This is a test\\""' +
1231                                 ' "\\"and this is a test, too\\""')
1232         add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1233         add_rec_buf.rec = rec
1234         try:
1235             self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1236                                      0, self.server_ip, self.get_dns_domain(),
1237                                      name, add_rec_buf, None)
1238
1239         except WERRORError as e:
1240             self.fail(str(e))
1241
1242         try:
1243             self.check_query_txt(prefix, txt)
1244         finally:
1245             self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1246                                               0, self.server_ip, self.get_dns_domain(),
1247                                               name, None, add_rec_buf)
1248
1249     def test_update_add_empty_txt_records(self):
1250         "test adding two txt records works"
1251         prefix, txt = 'emptytextrec', []
1252         p = self.make_txt_update(prefix, txt)
1253         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
1254         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
1255         self.check_query_txt(prefix, txt)
1256         self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
1257                              self.get_dns_domain(),
1258                              "%s.%s" % (prefix, self.get_dns_domain()),
1259                              dnsp.DNS_TYPE_TXT, ''))
1260
1261     def test_update_add_empty_rpc_to_dns(self):
1262         prefix, txt = 'rpcemptytextrec', []
1263
1264         name = "%s.%s" % (prefix, self.get_dns_domain())
1265
1266         rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '')
1267         add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
1268         add_rec_buf.rec = rec
1269         try:
1270             self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1271                                      0, self.server_ip, self.get_dns_domain(),
1272                                      name, add_rec_buf, None)
1273         except WERRORError as e:
1274             self.fail(str(e))
1275
1276         try:
1277             self.check_query_txt(prefix, txt)
1278         finally:
1279             self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
1280                                               0, self.server_ip, self.get_dns_domain(),
1281                                               name, None, add_rec_buf)
1282
1283 TestProgram(module=__name__, opts=subunitopts)