15672a0b008b91862218fd67cfd0366bac6a0a6e
[obnox/samba/samba-obnox.git] / python / samba / tests / dns.py
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Kai Blin  <kai@samba.org> 2011
3 #
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License
15 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17
18 import os
19 import struct
20 import random
21 from samba import socket
22 import samba.ndr as ndr
23 import samba.dcerpc.dns as dns
24 from samba.tests import TestCase
25
# Translation table used by DNSTest.hexdump(): a character maps to itself
# when its repr() is exactly three characters long (quote, char, quote),
# and to '.' otherwise.
FILTER = ''.join(
    chr(code) if len(repr(chr(code))) == 3 else '.'
    for code in range(256)
)
27
28
class DNSTest(TestCase):
    """Base class with helpers to build DNS packets and exchange them
    with the server under test over UDP or TCP (port 53)."""

    def errstr(self, errcode):
        """Return a readable error code.

        errcode is the numeric RCODE.  Values without a known mnemonic
        are rendered as "UNKNOWN(<code>)" instead of raising IndexError,
        so that a failing rcode assertion can still report what it got.
        """
        string_codes = (
            "OK",
            "FORMERR",
            "SERVFAIL",
            "NXDOMAIN",
            "NOTIMP",
            "REFUSED",
            "YXDOMAIN",
            "YXRRSET",
            "NXRRSET",
            "NOTAUTH",
            "NOTZONE",
        )

        if 0 <= errcode < len(string_codes):
            return string_codes[errcode]
        return "UNKNOWN(%d)" % errcode

    def assert_dns_rcode_equals(self, packet, rcode):
        "Helper function to check return code"
        # The RCODE is the low four bits of the operation field.
        p_errcode = packet.operation & 0x000F
        self.assertEqual(p_errcode, rcode, "Expected RCODE %s, got %s" %
                         (self.errstr(rcode), self.errstr(p_errcode)))

    def assert_dns_opcode_equals(self, packet, opcode):
        "Helper function to check opcode"
        # The opcode is compared unshifted, i.e. still in bit position
        # 0x7800 of the operation field.
        p_opcode = packet.operation & 0x7800
        self.assertEqual(p_opcode, opcode, "Expected OPCODE %s, got %s" %
                         (opcode, p_opcode))

    def make_name_packet(self, opcode, qid=None):
        """Helper creating a dns.name_packet.

        opcode is stored in the operation field.  qid is used as the
        packet id; when it is None a random 16 bit id is chosen.
        """
        p = dns.name_packet()
        if qid is None:
            qid = random.randint(0x0, 0xffff)
        # Always assign the id, also when the caller supplied one.
        p.id = qid
        p.operation = opcode
        p.questions = []
        return p

    def finish_name_packet(self, packet, questions):
        "Helper to finalize a dns.name_packet"
        packet.qdcount = len(questions)
        packet.questions = questions

    def make_name_question(self, name, qtype, qclass):
        "Helper creating a dns.name_question"
        q = dns.name_question()
        q.name = name
        q.question_type = qtype
        q.question_class = qclass
        return q

    def get_dns_domain(self):
        "Helper to get dns domain"
        # Lower-cased $REALM, falling back to 'example.com'.
        return os.getenv('REALM', 'example.com').lower()

    # NOTE: the default for 'host' is evaluated once, when this class body
    # is executed, so SERVER_IP has to be set before the module is imported.
    def dns_transaction_udp(self, packet, host=os.getenv('SERVER_IP'), dump=False):
        """Send a DNS query over UDP and read the reply.

        packet is packed with ndr_pack(), sent to host:53 and the reply
        (at most 2048 bytes) is unpacked into a dns.name_packet.  With
        dump=True both packets are printed as a hexdump.
        """
        s = None
        try:
            send_packet = ndr.ndr_pack(packet)
            if dump:
                print(self.hexdump(send_packet))
            s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
            s.connect((host, 53))
            s.send(send_packet, 0)
            recv_packet = s.recv(2048, 0)
            if dump:
                print(self.hexdump(recv_packet))
            return ndr.ndr_unpack(dns.name_packet, recv_packet)
        finally:
            if s is not None:
                s.close()

    def dns_transaction_tcp(self, packet, host=os.getenv('SERVER_IP'), dump=False):
        """Send a DNS query over TCP and read the reply.

        The packed packet is prefixed with its length as a 16 bit
        big-endian integer; the first two bytes of the reply are skipped
        before unpacking.  With dump=True both packets are printed as a
        hexdump (the reply including its two leading bytes).
        """
        s = None
        try:
            send_packet = ndr.ndr_pack(packet)
            if dump:
                print(self.hexdump(send_packet))
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
            s.connect((host, 53))
            tcp_packet = struct.pack('!H', len(send_packet))
            tcp_packet += send_packet
            s.send(tcp_packet, 0)
            # NOTE(review): this assumes the whole reply arrives in a single
            # recv() call; the length prefix is not used to read the rest.
            recv_packet = s.recv(0xffff + 2, 0)
            if dump:
                print(self.hexdump(recv_packet))
            return ndr.ndr_unpack(dns.name_packet, recv_packet[2:])
        finally:
            if s is not None:
                s.close()

    def hexdump(self, src, length=8):
        """Return src formatted as a hexdump, 'length' bytes per line.

        Each line holds the offset, the bytes in hex and the bytes run
        through the module-level FILTER translation table.  Raises
        ValueError for length < 1, which would otherwise never consume
        any input.
        """
        if length < 1:
            raise ValueError("length must be at least 1")

        lines = []
        offset = 0
        while src:
            chunk, src = src[:length], src[length:]
            hexa = ' '.join(["%02X" % ord(x) for x in chunk])
            text = chunk.translate(FILTER)
            lines.append("%04X   %-*s   %s\n" % (offset, length * 3, hexa, text))
            offset += length
        return ''.join(lines)
135
class TestSimpleQueries(DNSTest):
    "Plain queries for records the test environment provides"

    def _server_fqdn(self):
        "Helper returning $SERVER qualified with the dns domain"
        return "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())

    def _make_query(self, questions):
        "Helper creating a finished QUERY packet from a list of questions"
        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
        self.finish_name_packet(p, questions)
        return p

    def _assert_single_server_ip_answer(self, response):
        "Helper checking for an OK reply with exactly one answer, $SERVER_IP"
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        self.assertEqual(response.ancount, 1)
        self.assertEqual(response.answers[0].rdata,
                         os.getenv('SERVER_IP'))

    def test_one_a_query(self):
        "create a query packet containing one query record"
        q = self.make_name_question(self._server_fqdn(), dns.DNS_QTYPE_A,
                                    dns.DNS_QCLASS_IN)
        print("asking for  %s" % q.name)

        response = self.dns_transaction_udp(self._make_query([q]))
        self._assert_single_server_ip_answer(response)

    def test_one_a_query_tcp(self):
        "create a query packet containing one query record via TCP"
        q = self.make_name_question(self._server_fqdn(), dns.DNS_QTYPE_A,
                                    dns.DNS_QCLASS_IN)
        print("asking for  %s" % q.name)

        response = self.dns_transaction_tcp(self._make_query([q]))
        self._assert_single_server_ip_answer(response)

    def test_two_queries(self):
        "create a query packet containing two query records"
        bogus = "%s.%s" % ('bogusname', self.get_dns_domain())
        questions = [
            self.make_name_question(self._server_fqdn(), dns.DNS_QTYPE_A,
                                    dns.DNS_QCLASS_IN),
            self.make_name_question(bogus, dns.DNS_QTYPE_A,
                                    dns.DNS_QCLASS_IN),
        ]

        # More than one question in a packet is expected to be rejected.
        response = self.dns_transaction_udp(self._make_query(questions))
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)

    def test_qtype_all_query(self):
        "create a QTYPE_ALL query"
        q = self.make_name_question(self._server_fqdn(), dns.DNS_QTYPE_ALL,
                                    dns.DNS_QCLASS_IN)
        print("asking for  %s" % q.name)

        response = self.dns_transaction_udp(self._make_query([q]))

        # One answer for $SERVER_IP, plus one for $SERVER_IPV6 if that
        # is set in the environment.
        num_answers = 1
        dc_ipv6 = os.getenv('SERVER_IPV6')
        if dc_ipv6 is not None:
            num_answers += 1

        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        self.assertEqual(response.ancount, num_answers)
        self.assertEqual(response.answers[0].rdata,
                         os.getenv('SERVER_IP'))
        if dc_ipv6 is not None:
            self.assertEqual(response.answers[1].rdata, dc_ipv6)

    def test_qclass_none_query(self):
        "create a QCLASS_NONE query"
        q = self.make_name_question(self._server_fqdn(), dns.DNS_QTYPE_ALL,
                                    dns.DNS_QCLASS_NONE)

        response = self.dns_transaction_udp(self._make_query([q]))
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTIMP)

# Only returns an authority section entry in BIND and Win DNS
# FIXME: Enable once Samba implements this feature
#    def test_soa_hostname_query(self):
#        "create a SOA query for a hostname"
#        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
#        questions = []
#
#        name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
#        q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
#        questions.append(q)
#
#        self.finish_name_packet(p, questions)
#        response = self.dns_transaction_udp(p)
#        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
#        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
#        # We don't get SOA records for single hosts
#        self.assertEquals(response.ancount, 0)

    def test_soa_domain_query(self):
        "create a SOA query for a domain"
        q = self.make_name_question(self.get_dns_domain(), dns.DNS_QTYPE_SOA,
                                    dns.DNS_QCLASS_IN)

        response = self.dns_transaction_udp(self._make_query([q]))
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        self.assertEqual(response.ancount, 1)
262
263
class TestDNSUpdates(DNSTest):
    "Tests sending DNS UPDATE packets and querying for the results"

    def _make_update_packet(self):
        "Helper creating an UPDATE packet whose zone entry is our domain's SOA"
        p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
        zone = self.make_name_question(self.get_dns_domain(),
                                       dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
        self.finish_name_packet(p, [zone])
        return p

    def _make_res_rec(self, name, rr_type, rr_class, ttl, length, rdata=None):
        "Helper creating a dns.res_rec; rdata is only assigned when given"
        r = dns.res_rec()
        r.name = name
        r.rr_type = rr_type
        r.rr_class = rr_class
        r.ttl = ttl
        r.length = length
        if rdata is not None:
            r.rdata = rdata
        return r

    def _make_txt_rec(self, name, txt, rr_class, ttl):
        "Helper creating a TXT dns.res_rec carrying the given text"
        rdata = dns.txt_record()
        rdata.txt = txt
        # length is 0xffff as in all records carrying rdata in this file;
        # presumably a placeholder recomputed on packing -- TODO confirm.
        return self._make_res_rec(name, dns.DNS_QTYPE_TXT, rr_class, ttl,
                                  0xffff, rdata)

    def _set_prereqs(self, packet, prereqs):
        "Helper storing prerequisite records in the packet's answer slots"
        packet.ancount = len(prereqs)
        packet.answers = prereqs

    def _set_updates(self, packet, updates):
        "Helper storing update records in the packet's nsrecs slots"
        packet.nscount = len(updates)
        packet.nsrecs = updates

    def _query(self, name, qtype):
        "Helper sending a single class IN question via UDP, returns the reply"
        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
        q = self.make_name_question(name, qtype, dns.DNS_QCLASS_IN)
        self.finish_name_packet(p, [q])
        return self.dns_transaction_udp(p)

    def test_two_updates(self):
        "create two update requests"
        p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)

        server = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
        updates = [
            self.make_name_question(server, dns.DNS_QTYPE_A,
                                    dns.DNS_QCLASS_IN),
            self.make_name_question(self.get_dns_domain(), dns.DNS_QTYPE_A,
                                    dns.DNS_QCLASS_IN),
        ]

        self.finish_name_packet(p, updates)
        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)

    def test_update_wrong_qclass(self):
        "create update with DNS_QCLASS_NONE"
        p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
        u = self.make_name_question(self.get_dns_domain(), dns.DNS_QTYPE_A,
                                    dns.DNS_QCLASS_NONE)

        self.finish_name_packet(p, [u])
        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTIMP)

    def test_update_prereq_with_non_null_ttl(self):
        "test update with a non-null TTL"
        p = self._make_update_packet()

        name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
        prereq = self._make_res_rec(name, dns.DNS_QTYPE_TXT,
                                    dns.DNS_QCLASS_NONE, ttl=1, length=0)
        self._set_prereqs(p, [prereq])

        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)

# I'd love to test this one, but it segfaults. :)
#    def test_update_prereq_with_non_null_length(self):
#        "test update with a non-null length"
#        p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
#        updates = []
#
#        name = self.get_dns_domain()
#
#        u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
#        updates.append(u)
#        self.finish_name_packet(p, updates)
#
#        prereqs = []
#        r = dns.res_rec()
#        r.name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
#        r.rr_type = dns.DNS_QTYPE_TXT
#        r.rr_class = dns.DNS_QCLASS_ANY
#        r.ttl = 0
#        r.length = 1
#        prereqs.append(r)
#
#        p.ancount = len(prereqs)
#        p.answers = prereqs
#
#        response = self.dns_transaction_udp(p)
#        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)

    def test_update_prereq_nonexisting_name(self):
        "test update with a nonexisting name"
        p = self._make_update_packet()

        prereq = self._make_res_rec("idontexist.%s" % self.get_dns_domain(),
                                    dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_ANY,
                                    ttl=0, length=0)
        self._set_prereqs(p, [prereq])

        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXRRSET)

    def test_update_add_txt_record(self):
        "test adding records works"
        name = "textrec.%s" % self.get_dns_domain()
        txt = '"This is a test"'

        p = self._make_update_packet()
        self._set_updates(p, [self._make_txt_rec(name, txt,
                                                 dns.DNS_QCLASS_IN, 900)])

        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)

        # Read the record back to make sure it was really stored.
        response = self._query(name, dns.DNS_QTYPE_TXT)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assertEqual(response.ancount, 1)
        self.assertEqual(response.answers[0].rdata.txt, txt)

    def test_update_add_two_txt_records(self):
        "test adding two txt records works"
        name = "textrec2.%s" % self.get_dns_domain()
        txt = '"This is a test" "and this is a test, too"'

        p = self._make_update_packet()
        self._set_updates(p, [self._make_txt_rec(name, txt,
                                                 dns.DNS_QCLASS_IN, 900)])

        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)

        # Read the record back to make sure it was really stored.
        response = self._query(name, dns.DNS_QTYPE_TXT)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assertEqual(response.ancount, 1)
        self.assertEqual(response.answers[0].rdata.txt, txt)

    def test_delete_record(self):
        "Test if deleting records works"

        NAME = "deleterec.%s" % self.get_dns_domain()
        txt = '"This is a test"'

        # First, create a record to make sure we have a record to delete.
        p = self._make_update_packet()
        self._set_updates(p, [self._make_txt_rec(NAME, txt,
                                                 dns.DNS_QCLASS_IN, 900)])

        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)

        # Now check the record is around
        response = self._query(NAME, dns.DNS_QTYPE_TXT)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)

        # Now delete the record: same record data, but class NONE and ttl 0
        p = self._make_update_packet()
        self._set_updates(p, [self._make_txt_rec(NAME, txt,
                                                 dns.DNS_QCLASS_NONE, 0)])

        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)

        # And finally check it's gone
        response = self._query(NAME, dns.DNS_QTYPE_TXT)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)

    def test_update_add_mx_record(self):
        "test adding MX records works"
        domain = self.get_dns_domain()
        exchange = 'mail.%s' % domain

        rdata = dns.mx_record()
        rdata.preference = 10
        rdata.exchange = exchange

        p = self._make_update_packet()
        mx = self._make_res_rec(domain, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN,
                                ttl=900, length=0xffff, rdata=rdata)
        self._set_updates(p, [mx])

        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)

        # Read the record back and compare the MX fields.
        response = self._query(domain, dns.DNS_QTYPE_MX)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assertEqual(response.ancount, 1)
        ans = response.answers[0]
        self.assertEqual(ans.rr_type, dns.DNS_QTYPE_MX)
        self.assertEqual(ans.rdata.preference, 10)
        self.assertEqual(ans.rdata.exchange, exchange)
581
582
class TestComplexQueries(DNSTest):
    """Queries that need the server to follow a CNAME.

    setUp() registers cname_test.<domain> as a CNAME for the server's
    own name and tearDown() sends the matching removal update.
    """

    def _cname_name(self):
        "Helper returning the name of the CNAME record used by these tests"
        return "cname_test.%s" % self.get_dns_domain()

    def _cname_target(self):
        "Helper returning $SERVER qualified with the dns domain"
        return "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())

    def _update_cname(self, rr_class, ttl):
        """Helper sending an UPDATE for the test CNAME record.

        The record always points at the server's name; rr_class and ttl
        are the only fields that differ between setUp() and tearDown().
        """
        p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
        zone = self.make_name_question(self.get_dns_domain(),
                                       dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
        self.finish_name_packet(p, [zone])

        r = dns.res_rec()
        r.name = self._cname_name()
        r.rr_type = dns.DNS_QTYPE_CNAME
        r.rr_class = rr_class
        r.ttl = ttl
        r.length = 0xffff
        r.rdata = self._cname_target()

        updates = [r]
        p.nscount = len(updates)
        p.nsrecs = updates

        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)

    def setUp(self):
        super(TestComplexQueries, self).setUp()
        # Add the CNAME: class IN with a ttl of 900.
        self._update_cname(dns.DNS_QCLASS_IN, 900)

    def tearDown(self):
        super(TestComplexQueries, self).tearDown()
        # Same record as in setUp(), but class NONE and ttl 0.
        self._update_cname(dns.DNS_QCLASS_NONE, 0)

    def test_one_a_query(self):
        "create a query packet containing one query record"
        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)

        q = self.make_name_question(self._cname_name(), dns.DNS_QTYPE_A,
                                    dns.DNS_QCLASS_IN)
        print("asking for  %s" % q.name)

        self.finish_name_packet(p, [q])
        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        # Expect the CNAME itself followed by the A record it points to.
        self.assertEqual(response.ancount, 2)
        self.assertEqual(response.answers[0].rr_type, dns.DNS_QTYPE_CNAME)
        self.assertEqual(response.answers[0].rdata, self._cname_target())
        self.assertEqual(response.answers[1].rr_type, dns.DNS_QTYPE_A)
        self.assertEqual(response.answers[1].rdata,
                         os.getenv('SERVER_IP'))
658
class TestInvalidQueries(DNSTest):
    "Tests checking the server still answers after receiving invalid input"

    def test_one_a_query(self):
        "send 0 bytes followed by a query packet containing one query record"

        # Send an empty datagram first; no reply is read for it.
        s = None
        try:
            s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
            s.connect((os.getenv('SERVER_IP'), 53))
            s.send("", 0)
        finally:
            if s is not None:
                s.close()

        # A regular A query afterwards must still be answered.
        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)

        name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
        q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
        print("asking for  %s" % q.name)

        self.finish_name_packet(p, [q])
        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        self.assertEqual(response.ancount, 1)
        self.assertEqual(response.answers[0].rdata,
                         os.getenv('SERVER_IP'))
688
# Run the tests in this module when it is executed as a script.
if __name__ == "__main__":
    from unittest import main as run_tests
    run_tests()