c9f2bdfbd47e5da7f2f84c3a565cd93f0ba684c8
[samba.git] / python / samba / tests / dns.py
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Kai Blin  <kai@samba.org> 2011
3 #
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License
15 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17
18 import os
19 import struct
20 import random
21 from samba import socket
22 import samba.ndr as ndr
23 from samba import credentials, param
24 from samba.tests import TestCase
25 from samba.dcerpc import dns, dnsp, dnsserver
26
# Translation table used by DNSTest.hexdump: entry x is chr(x) when repr()
# renders that character as itself (quote, character, quote), and '.' for
# everything that repr() has to escape.
FILTER = ''.join(
    chr(x) if len(repr(chr(x))) == 3 else '.'
    for x in range(256)
)
28
def make_txt_record(records):
    """Wrap the strings in records into a dns.txt_record.

    records is stored as the `str` member of a dnsp.string_list whose
    `count` is len(records); that list becomes the `txt` member of the
    returned record.
    """
    strings = dnsp.string_list()
    strings.count = len(records)
    strings.str = records

    txt = dns.txt_record()
    txt.txt = strings
    return txt
36
class DNSTest(TestCase):
    """Shared helpers for the DNS tests in this file.

    Offers builders for dns.name_packet / dns.name_question objects, UDP
    and TCP transports that pack a request, send it to port 53 and unpack
    the reply, and assertions on the RCODE / OPCODE bits of a reply.
    """

    def errstr(self, errcode):
        "Return a readable error code"
        string_codes = (
            "OK",
            "FORMERR",
            "SERVFAIL",
            "NXDOMAIN",
            "NOTIMP",
            "REFUSED",
            "YXDOMAIN",
            "YXRRSET",
            "NXRRSET",
            "NOTAUTH",
            "NOTZONE",
        )

        if 0 <= errcode < len(string_codes):
            return string_codes[errcode]
        # The RCODE mask (0x000F) allows values up to 15, i.e. codes without
        # a mnemonic above.  Failing here with an IndexError would hide the
        # assertion message this string is built for.
        return "UNKNOWN(%s)" % errcode

    def assert_dns_rcode_equals(self, packet, rcode):
        "Helper function to check return code"
        p_errcode = packet.operation & 0x000F
        self.assertEqual(p_errcode, rcode, "Expected RCODE %s, got %s" %
                         (self.errstr(rcode), self.errstr(p_errcode)))

    def assert_dns_opcode_equals(self, packet, opcode):
        "Helper function to check opcode"
        p_opcode = packet.operation & 0x7800
        self.assertEqual(p_opcode, opcode, "Expected OPCODE %s, got %s" %
                         (opcode, p_opcode))

    def make_name_packet(self, opcode, qid=None):
        """Helper creating a dns.name_packet

        opcode becomes the packet's operation field.  qid is used as the
        packet id when given; otherwise a random 16 bit id is chosen.
        """
        p = dns.name_packet()
        if qid is None:
            p.id = random.randint(0x0, 0xffff)
        else:
            # An explicit id used to be dropped silently.
            p.id = qid
        p.operation = opcode
        p.questions = []
        return p

    def finish_name_packet(self, packet, questions):
        "Helper to finalize a dns.name_packet"
        packet.qdcount = len(questions)
        packet.questions = questions

    def make_name_question(self, name, qtype, qclass):
        "Helper creating a dns.name_question"
        q = dns.name_question()
        q.name = name
        q.question_type = qtype
        q.question_class = qclass
        return q

    def get_dns_domain(self):
        "Helper to get dns domain"
        return os.getenv('REALM', 'example.com').lower()

    def dns_transaction_udp(self, packet, host=os.getenv('SERVER_IP'), dump=False):
        """send a DNS query and read the reply

        packet is packed with ndr_pack and sent to host:53 in one datagram;
        a single reply of up to 2048 bytes is unpacked into a
        dns.name_packet and returned.  With dump=True both directions are
        hexdumped to stdout.  The socket is always closed.
        """
        s = None
        try:
            send_packet = ndr.ndr_pack(packet)
            if dump:
                print(self.hexdump(send_packet))
            s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
            s.connect((host, 53))
            s.send(send_packet, 0)
            recv_packet = s.recv(2048, 0)
            if dump:
                print(self.hexdump(recv_packet))
            return ndr.ndr_unpack(dns.name_packet, recv_packet)
        finally:
            if s is not None:
                s.close()

    def _recv_exact(self, sock, count):
        "Helper reading count bytes from sock, or fewer if the peer closes"
        chunks = []
        remaining = count
        while remaining > 0:
            chunk = sock.recv(remaining, 0)
            if not chunk:
                # empty read: the peer closed the connection
                break
            chunks.append(chunk)
            remaining -= len(chunk)
        return b''.join(chunks)

    def dns_transaction_tcp(self, packet, host=os.getenv('SERVER_IP'), dump=False):
        """send a DNS query and read the reply

        Same contract as dns_transaction_udp, but over a stream socket:
        the request is prefixed with its length as a 16 bit big-endian
        integer, and the reply is read as a 2 byte length followed by that
        many bytes.
        """
        s = None
        try:
            send_packet = ndr.ndr_pack(packet)
            if dump:
                print(self.hexdump(send_packet))
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
            s.connect((host, 53))
            tcp_packet = struct.pack('!H', len(send_packet))
            tcp_packet += send_packet
            s.send(tcp_packet, 0)
            # A stream socket may hand back the reply in several pieces, so
            # read the length prefix first and then exactly that many bytes
            # instead of trusting a single recv().
            header = self._recv_exact(s, 2)
            payload = b''
            if len(header) == 2:
                (reply_len,) = struct.unpack('!H', header)
                payload = self._recv_exact(s, reply_len)
            if dump:
                print(self.hexdump(header + payload))
            return ndr.ndr_unpack(dns.name_packet, payload)
        finally:
            if s is not None:
                s.close()

    def hexdump(self, src, length=8):
        """Return a hex dump of the byte string src

        Each output line covers `length` bytes and holds the offset, the
        bytes in hex, and the bytes mapped through FILTER (unprintable
        ones shown as '.').
        """
        if not src:
            return ''
        lines = []
        for offset in range(0, len(src), length):
            # bytearray yields integers for both Python 2 str and Python 3
            # bytes, so the same code formats either.
            codes = bytearray(src[offset:offset + length])
            hexa = ' '.join("%02X" % code for code in codes)
            text = ''.join(FILTER[code] for code in codes)
            lines.append("%04X   %-*s   %s\n" % (offset, length * 3, hexa, text))
        return ''.join(lines)
143
class TestSimpleQueries(DNSTest):
    """Plain queries, mostly for the host named by $SERVER in the DNS domain."""

    def _server_name(self):
        "Helper returning $SERVER qualified with the DNS domain"
        return "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())

    def _make_query(self, questions):
        "Helper turning (name, qtype, qclass) tuples into one QUERY packet"
        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
        self.finish_name_packet(
            p, [self.make_name_question(*question) for question in questions])
        return p

    def _check_server_a_answer(self, response):
        "Helper checking for an OK reply with one answer holding $SERVER_IP"
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        self.assertEqual(response.ancount, 1)
        self.assertEqual(response.answers[0].rdata,
                         os.getenv('SERVER_IP'))

    def test_one_a_query(self):
        "create a query packet containing one query record"
        name = self._server_name()
        print("asking for  %s" % name)
        p = self._make_query([(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)])

        response = self.dns_transaction_udp(p)
        self._check_server_a_answer(response)

    def test_one_a_query_tcp(self):
        "create a query packet containing one query record via TCP"
        name = self._server_name()
        print("asking for  %s" % name)
        p = self._make_query([(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)])

        response = self.dns_transaction_tcp(p)
        self._check_server_a_answer(response)

    def test_one_mx_query(self):
        "create a query packet causing an empty RCODE_OK answer"
        # The server's own name is queried for MX: OK, but no answers.
        name = self._server_name()
        print("asking for  %s" % name)
        p = self._make_query([(name, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN)])

        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        self.assertEqual(response.ancount, 0)

        # The same query for the "invalid-" prefixed name: NXDOMAIN.
        name = "invalid-%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
        print("asking for  %s" % name)
        p = self._make_query([(name, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN)])

        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        self.assertEqual(response.ancount, 0)

    def test_two_queries(self):
        "create a query packet containing two query records"
        bogus = "%s.%s" % ('bogusname', self.get_dns_domain())
        p = self._make_query([
            (self._server_name(), dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN),
            (bogus, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN),
        ])

        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)

    def test_qtype_all_query(self):
        "create a QTYPE_ALL query"
        name = self._server_name()
        print("asking for  %s" % name)
        p = self._make_query([(name, dns.DNS_QTYPE_ALL, dns.DNS_QCLASS_IN)])

        response = self.dns_transaction_udp(p)

        # One answer for $SERVER_IP, plus one more when $SERVER_IPV6 is set.
        num_answers = 1
        dc_ipv6 = os.getenv('SERVER_IPV6')
        if dc_ipv6 is not None:
            num_answers += 1

        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        self.assertEqual(response.ancount, num_answers)
        self.assertEqual(response.answers[0].rdata,
                         os.getenv('SERVER_IP'))
        if dc_ipv6 is not None:
            self.assertEqual(response.answers[1].rdata, dc_ipv6)

    def test_qclass_none_query(self):
        "create a QCLASS_NONE query"
        p = self._make_query([
            (self._server_name(), dns.DNS_QTYPE_ALL, dns.DNS_QCLASS_NONE),
        ])

        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTIMP)

# Only returns an authority section entry in BIND and Win DNS
# FIXME: Enable once Samba implements this feature
#    def test_soa_hostname_query(self):
#        "create a SOA query for a hostname"
#        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
#        questions = []
#
#        name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
#        q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
#        questions.append(q)
#
#        self.finish_name_packet(p, questions)
#        response = self.dns_transaction_udp(p)
#        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
#        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
#        # We don't get SOA records for single hosts
#        self.assertEquals(response.ancount, 0)

    def test_soa_domain_query(self):
        "create a SOA query for a domain"
        p = self._make_query([
            (self.get_dns_domain(), dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN),
        ])

        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        self.assertEqual(response.ancount, 1)
        self.assertEqual(response.answers[0].rdata.minimum, 3600)
301
302
class TestDNSUpdates(DNSTest):
    """Dynamic update requests (DNS_OPCODE_UPDATE) and their effects.

    In the update packets built here the question section names the DNS
    domain (type SOA), prerequisites go into the answer section and the
    records to add or delete go into the nsrecs section.
    """

    # The TXT payload shared by the add / delete / re-add tests.
    TEST_TXT = '"This is a test"'

    def _make_update_packet(self):
        "Helper creating an UPDATE packet asking for the SOA of the DNS domain"
        p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
        u = self.make_name_question(self.get_dns_domain(),
                                    dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
        self.finish_name_packet(p, [u])
        return p

    def _make_rr(self, name, rr_type, rr_class, ttl, length, rdata=None):
        "Helper creating a dns.res_rec; rdata is only assigned when given"
        r = dns.res_rec()
        r.name = name
        # rr_type is assigned before rdata, as in the original tests.
        r.rr_type = rr_type
        r.rr_class = rr_class
        r.ttl = ttl
        r.length = length
        if rdata is not None:
            r.rdata = rdata
        return r

    def _send_prereqs(self, prereqs):
        "Helper sending an update with prereqs as answer section"
        p = self._make_update_packet()
        p.ancount = len(prereqs)
        p.answers = prereqs
        return self.dns_transaction_udp(p)

    def _send_updates(self, updates):
        "Helper sending an update with updates as nsrecs section"
        p = self._make_update_packet()
        p.nscount = len(updates)
        p.nsrecs = updates
        return self.dns_transaction_udp(p)

    def _add_txt(self, name, strings):
        "Helper adding a TXT record (class IN, ttl 900) and expecting success"
        r = self._make_rr(name, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN,
                          900, 0xffff, make_txt_record(strings))
        response = self._send_updates([r])
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)

    def _delete_txt(self, name, strings):
        "Helper deleting a TXT record (class NONE, ttl 0) and expecting success"
        r = self._make_rr(name, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_NONE,
                          0, 0xffff, make_txt_record(strings))
        response = self._send_updates([r])
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)

    def _query(self, name, qtype):
        "Helper sending a class IN query for name and returning the reply"
        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
        q = self.make_name_question(name, qtype, dns.DNS_QCLASS_IN)
        self.finish_name_packet(p, [q])
        return self.dns_transaction_udp(p)

    def test_two_updates(self):
        "create two update requests"
        p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
        updates = []

        name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
        u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
        updates.append(u)

        name = self.get_dns_domain()
        u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
        updates.append(u)

        self.finish_name_packet(p, updates)
        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)

    def test_update_wrong_qclass(self):
        "create update with DNS_QCLASS_NONE"
        p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
        u = self.make_name_question(self.get_dns_domain(),
                                    dns.DNS_QTYPE_A, dns.DNS_QCLASS_NONE)
        self.finish_name_packet(p, [u])

        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTIMP)

    def test_update_prereq_with_non_null_ttl(self):
        "test update with a non-null TTL"
        name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
        r = self._make_rr(name, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_NONE,
                          1, 0)

        response = self._send_prereqs([r])
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)

# I'd love to test this one, but it segfaults. :)
#    def test_update_prereq_with_non_null_length(self):
#        "test update with a non-null length"
#        p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
#        updates = []
#
#        name = self.get_dns_domain()
#
#        u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
#        updates.append(u)
#        self.finish_name_packet(p, updates)
#
#        prereqs = []
#        r = dns.res_rec()
#        r.name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
#        r.rr_type = dns.DNS_QTYPE_TXT
#        r.rr_class = dns.DNS_QCLASS_ANY
#        r.ttl = 0
#        r.length = 1
#        prereqs.append(r)
#
#        p.ancount = len(prereqs)
#        p.answers = prereqs
#
#        response = self.dns_transaction_udp(p)
#        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)

    def test_update_prereq_nonexisting_name(self):
        "test update with a nonexisting name"
        r = self._make_rr("idontexist.%s" % self.get_dns_domain(),
                          dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_ANY, 0, 0)

        response = self._send_prereqs([r])
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXRRSET)

    def test_update_add_txt_record(self):
        "test adding records works"
        name = "textrec.%s" % self.get_dns_domain()
        self._add_txt(name, [self.TEST_TXT])

        response = self._query(name, dns.DNS_QTYPE_TXT)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assertEqual(response.ancount, 1)
        self.assertEqual(response.answers[0].rdata.txt.str[0], self.TEST_TXT)

    def test_update_add_two_txt_records(self):
        "test adding two txt records works"
        name = "textrec2.%s" % self.get_dns_domain()
        second = '"and this is a test, too"'
        self._add_txt(name, [self.TEST_TXT, second])

        response = self._query(name, dns.DNS_QTYPE_TXT)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assertEqual(response.ancount, 1)
        self.assertEqual(response.answers[0].rdata.txt.str[0], self.TEST_TXT)
        self.assertEqual(response.answers[0].rdata.txt.str[1], second)

    def test_delete_record(self):
        "Test if deleting records works"

        NAME = "deleterec.%s" % self.get_dns_domain()

        # First, create a record to make sure we have a record to delete.
        self._add_txt(NAME, [self.TEST_TXT])

        # Now check the record is around
        response = self._query(NAME, dns.DNS_QTYPE_TXT)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)

        # Now delete the record
        self._delete_txt(NAME, [self.TEST_TXT])

        # And finally check it's gone
        response = self._query(NAME, dns.DNS_QTYPE_TXT)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)

    def test_readd_record(self):
        "Test if adding, deleting and then readding a records works"

        NAME = "readdrec.%s" % self.get_dns_domain()

        # Create the record
        self._add_txt(NAME, [self.TEST_TXT])

        # Now check the record is around
        response = self._query(NAME, dns.DNS_QTYPE_TXT)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)

        # Now delete the record
        self._delete_txt(NAME, [self.TEST_TXT])

        # check it's gone
        response = self._query(NAME, dns.DNS_QTYPE_TXT)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)

        # recreate the record
        self._add_txt(NAME, [self.TEST_TXT])

        # Now check the record is around
        response = self._query(NAME, dns.DNS_QTYPE_TXT)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)

    def test_update_add_mx_record(self):
        "test adding MX records works"
        domain = self.get_dns_domain()

        rdata = dns.mx_record()
        rdata.preference = 10
        rdata.exchange = 'mail.%s' % domain
        r = self._make_rr(domain, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN,
                          900, 0xffff, rdata)

        response = self._send_updates([r])
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)

        response = self._query(domain, dns.DNS_QTYPE_MX)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assertEqual(response.ancount, 1)
        ans = response.answers[0]
        self.assertEqual(ans.rr_type, dns.DNS_QTYPE_MX)
        self.assertEqual(ans.rdata.preference, 10)
        self.assertEqual(ans.rdata.exchange, 'mail.%s' % domain)
732
733
class TestComplexQueries(DNSTest):
    """Queries that need a record prepared beforehand.

    setUp adds a CNAME cname_test.<domain> pointing at $SERVER.<domain>
    through a dynamic update and tearDown deletes it again.
    """

    def _update_cname(self, rr_class, ttl):
        "Helper sending an update for the cname_test record, expecting success"
        p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
        u = self.make_name_question(self.get_dns_domain(),
                                    dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
        self.finish_name_packet(p, [u])

        r = dns.res_rec()
        r.name = "cname_test.%s" % self.get_dns_domain()
        r.rr_type = dns.DNS_QTYPE_CNAME
        r.rr_class = rr_class
        r.ttl = ttl
        r.length = 0xffff
        r.rdata = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())

        updates = [r]
        p.nscount = len(updates)
        p.nsrecs = updates

        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)

    def setUp(self):
        "add the cname_test record (class IN, ttl 900)"
        super(TestComplexQueries, self).setUp()
        self._update_cname(dns.DNS_QCLASS_IN, 900)

    def tearDown(self):
        "delete the cname_test record (class NONE, ttl 0)"
        super(TestComplexQueries, self).tearDown()
        self._update_cname(dns.DNS_QCLASS_NONE, 0)

    def test_one_a_query(self):
        "create a query packet containing one query record"
        name = "cname_test.%s" % self.get_dns_domain()
        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
        q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
        print("asking for  %s" % name)
        self.finish_name_packet(p, [q])

        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        # The reply holds the CNAME first and the A record it leads to second.
        self.assertEqual(response.ancount, 2)
        self.assertEqual(response.answers[0].rr_type, dns.DNS_QTYPE_CNAME)
        self.assertEqual(response.answers[0].rdata, "%s.%s" %
                         (os.getenv('SERVER'), self.get_dns_domain()))
        self.assertEqual(response.answers[1].rr_type, dns.DNS_QTYPE_A)
        self.assertEqual(response.answers[1].rdata,
                         os.getenv('SERVER_IP'))
809
class TestInvalidQueries(DNSTest):
    """Malformed or unexpected traffic sent to the DNS server."""

    def test_one_a_query(self):
        "send 0 bytes followed by a query packet containing one query record"

        # An empty datagram first; no reply is read for it.
        s = None
        try:
            s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
            s.connect((os.getenv('SERVER_IP'), 53))
            s.send(b"", 0)
        finally:
            if s is not None:
                s.close()

        # A regular A query afterwards must still be answered.
        name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
        q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
        print("asking for  %s" % name)
        self.finish_name_packet(p, [q])

        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        self.assertEqual(response.ancount, 1)
        self.assertEqual(response.answers[0].rdata,
                         os.getenv('SERVER_IP'))

    def test_one_a_reply(self):
        "send a reply instead of a query"

        name = "%s.%s" % ('fakefakefake', self.get_dns_domain())
        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
        q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
        print("asking for  %s" % name)
        self.finish_name_packet(p, [q])

        # Setting the reply flag turns the query into a reply.
        p.operation |= dns.DNS_FLAG_REPLY
        s = None
        try:
            send_packet = ndr.ndr_pack(p)
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
            host = os.getenv('SERVER_IP')
            s.connect((host, 53))
            # TCP framing: 16 bit big-endian length, then the packet.
            tcp_packet = struct.pack('!H', len(send_packet))
            tcp_packet += send_packet
            s.send(tcp_packet, 0)
            # Nothing may come back: the read has to return zero bytes.
            recv_packet = s.recv(0xffff + 2, 0)
            self.assertEqual(0, len(recv_packet))
        finally:
            if s is not None:
                s.close()
867
868
if __name__ == "__main__":
    # Executed as a script: hand over to the unittest command line runner.
    from unittest import main
    main()