d01c8ffe52e9a7784481946b7b3500313ee40311
[kai/samba.git] / source4 / scripting / python / samba / tests / dns.py
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Kai Blin  <kai@samba.org> 2011
3 #
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License
15 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17
18 import os
19 import struct
20 import random
21 from samba import socket
22 import samba.ndr as ndr
23 import samba.dcerpc.dns as dns
24 from samba.tests import TestCase
25
26 class DNSTest(TestCase):
27
28     def errstr(self, errcode):
29         "Return a readable error code"
30         string_codes = [
31             "OK",
32             "FORMERR",
33             "SERVFAIL",
34             "NXDOMAIN",
35             "NOTIMP",
36             "REFUSED",
37             "YXDOMAIN",
38             "YXRRSET",
39             "NXRRSET",
40             "NOTAUTH",
41             "NOTZONE",
42         ]
43
44         return string_codes[errcode]
45
46
47     def assert_dns_rcode_equals(self, packet, rcode):
48         "Helper function to check return code"
49         p_errcode = packet.operation & 0x000F
50         self.assertEquals(p_errcode, rcode, "Expected RCODE %s, got %s" %
51                             (self.errstr(rcode), self.errstr(p_errcode)))
52
53     def assert_dns_opcode_equals(self, packet, opcode):
54         "Helper function to check opcode"
55         p_opcode = packet.operation & 0x7800
56         self.assertEquals(p_opcode, opcode, "Expected OPCODE %s, got %s" %
57                             (opcode, p_opcode))
58
59     def make_name_packet(self, opcode, qid=None):
60         "Helper creating a dns.name_packet"
61         p = dns.name_packet()
62         if qid is None:
63             p.id = random.randint(0x0, 0xffff)
64         p.operation = opcode
65         p.questions = []
66         return p
67
68     def finish_name_packet(self, packet, questions):
69         "Helper to finalize a dns.name_packet"
70         packet.qdcount = len(questions)
71         packet.questions = questions
72
73     def make_name_question(self, name, qtype, qclass):
74         "Helper creating a dns.name_question"
75         q = dns.name_question()
76         q.name = name
77         q.question_type = qtype
78         q.question_class = qclass
79         return q
80
81     def get_dns_domain(self):
82         "Helper to get dns domain"
83         return os.getenv('REALM', 'example.com').lower()
84
85     def dns_transaction_udp(self, packet, host=os.getenv('SERVER_IP')):
86         "send a DNS query and read the reply"
87         s = None
88         try:
89             send_packet = ndr.ndr_pack(packet)
90             s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
91             s.connect((host, 53))
92             s.send(send_packet, 0)
93             recv_packet = s.recv(2048, 0)
94             return ndr.ndr_unpack(dns.name_packet, recv_packet)
95         finally:
96             if s is not None:
97                 s.close()
98
99     def dns_transaction_tcp(self, packet, host=os.getenv('SERVER_IP')):
100         "send a DNS query and read the reply"
101         s = None
102         try:
103             send_packet = ndr.ndr_pack(packet)
104             s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
105             s.connect((host, 53))
106             tcp_packet = struct.pack('!H', len(send_packet))
107             tcp_packet += send_packet
108             s.send(tcp_packet, 0)
109             recv_packet = s.recv(0xffff + 2, 0)
110             return ndr.ndr_unpack(dns.name_packet, recv_packet[2:])
111         finally:
112                 if s is not None:
113                     s.close()
114
115
116 class TestSimpleQueries(DNSTest):
117
118     def test_one_a_query(self):
119         "create a query packet containing one query record"
120         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
121         questions = []
122
123         name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
124         q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
125         print "asking for ", q.name
126         questions.append(q)
127
128         self.finish_name_packet(p, questions)
129         response = self.dns_transaction_udp(p)
130         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
131         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
132         self.assertEquals(response.ancount, 1)
133         self.assertEquals(response.answers[0].rdata,
134                           os.getenv('SERVER_IP'))
135
136     def test_one_a_query_tcp(self):
137         "create a query packet containing one query record via TCP"
138         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
139         questions = []
140
141         name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
142         q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
143         print "asking for ", q.name
144         questions.append(q)
145
146         self.finish_name_packet(p, questions)
147         response = self.dns_transaction_tcp(p)
148         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
149         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
150         self.assertEquals(response.ancount, 1)
151         self.assertEquals(response.answers[0].rdata,
152                           os.getenv('SERVER_IP'))
153
154     def test_two_queries(self):
155         "create a query packet containing two query records"
156         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
157         questions = []
158
159         name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
160         q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
161         questions.append(q)
162
163         name = "%s.%s" % ('bogusname', self.get_dns_domain())
164         q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
165         questions.append(q)
166
167         self.finish_name_packet(p, questions)
168         response = self.dns_transaction_udp(p)
169         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
170
171     def test_qtype_all_query(self):
172         "create a QTYPE_ALL query"
173         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
174         questions = []
175
176         name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
177         q = self.make_name_question(name, dns.DNS_QTYPE_ALL, dns.DNS_QCLASS_IN)
178         print "asking for ", q.name
179         questions.append(q)
180
181         self.finish_name_packet(p, questions)
182         response = self.dns_transaction_udp(p)
183
184         num_answers = 1
185         dc_ipv6 = os.getenv('SERVER_IPV6')
186         if dc_ipv6 is not None:
187             num_answers += 1
188
189         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
190         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
191         self.assertEquals(response.ancount, num_answers)
192         self.assertEquals(response.answers[0].rdata,
193                           os.getenv('SERVER_IP'))
194         if dc_ipv6 is not None:
195             self.assertEquals(response.answers[1].rdata, dc_ipv6)
196
197     def test_qclass_none_query(self):
198         "create a QCLASS_NONE query"
199         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
200         questions = []
201
202         name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
203         q = self.make_name_question(name, dns.DNS_QTYPE_ALL, dns.DNS_QCLASS_NONE)
204         questions.append(q)
205
206         self.finish_name_packet(p, questions)
207         response = self.dns_transaction_udp(p)
208         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTIMP)
209
210 # Only returns an authority section entry in BIND and Win DNS
211 # FIXME: Enable one Samba implements this feature
212 #    def test_soa_hostname_query(self):
213 #        "create a SOA query for a hostname"
214 #        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
215 #        questions = []
216 #
217 #        name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
218 #        q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
219 #        questions.append(q)
220 #
221 #        self.finish_name_packet(p, questions)
222 #        response = self.dns_transaction_udp(p)
223 #        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
224 #        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
225 #        # We don't get SOA records for single hosts
226 #        self.assertEquals(response.ancount, 0)
227
228     def test_soa_domain_query(self):
229         "create a SOA query for a domain"
230         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
231         questions = []
232
233         name = self.get_dns_domain()
234         q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
235         questions.append(q)
236
237         self.finish_name_packet(p, questions)
238         response = self.dns_transaction_udp(p)
239         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
240         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
241         self.assertEquals(response.ancount, 1)
242
243
244 class TestDNSUpdates(DNSTest):
245
246     def test_two_updates(self):
247         "create two update requests"
248         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
249         updates = []
250
251         name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
252         u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
253         updates.append(u)
254
255         name = self.get_dns_domain()
256         u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
257         updates.append(u)
258
259         self.finish_name_packet(p, updates)
260         response = self.dns_transaction_udp(p)
261         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
262
263     def test_update_wrong_qclass(self):
264         "create update with DNS_QCLASS_NONE"
265         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
266         updates = []
267
268         name = self.get_dns_domain()
269         u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_NONE)
270         updates.append(u)
271
272         self.finish_name_packet(p, updates)
273         response = self.dns_transaction_udp(p)
274         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTIMP)
275
276     def test_update_prereq_with_non_null_ttl(self):
277         "test update with a non-null TTL"
278         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
279         updates = []
280
281         name = self.get_dns_domain()
282
283         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
284         updates.append(u)
285         self.finish_name_packet(p, updates)
286
287         prereqs = []
288         r = dns.res_rec()
289         r.name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
290         r.rr_type = dns.DNS_QTYPE_TXT
291         r.rr_class = dns.DNS_QCLASS_NONE
292         r.ttl = 1
293         r.length = 0
294         prereqs.append(r)
295
296         p.ancount = len(prereqs)
297         p.answers = prereqs
298
299         response = self.dns_transaction_udp(p)
300         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
301
302 # I'd love to test this one, but it segfaults. :)
303 #    def test_update_prereq_with_non_null_length(self):
304 #        "test update with a non-null length"
305 #        p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
306 #        updates = []
307 #
308 #        name = self.get_dns_domain()
309 #
310 #        u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
311 #        updates.append(u)
312 #        self.finish_name_packet(p, updates)
313 #
314 #        prereqs = []
315 #        r = dns.res_rec()
316 #        r.name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
317 #        r.rr_type = dns.DNS_QTYPE_TXT
318 #        r.rr_class = dns.DNS_QCLASS_ANY
319 #        r.ttl = 0
320 #        r.length = 1
321 #        prereqs.append(r)
322 #
323 #        p.ancount = len(prereqs)
324 #        p.answers = prereqs
325 #
326 #        response = self.dns_transaction_udp(p)
327 #        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
328
329     def test_update_prereq_nonexisting_name(self):
330         "test update with a nonexisting name"
331         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
332         updates = []
333
334         name = self.get_dns_domain()
335
336         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
337         updates.append(u)
338         self.finish_name_packet(p, updates)
339
340         prereqs = []
341         r = dns.res_rec()
342         r.name = "idontexist.%s" % self.get_dns_domain()
343         r.rr_type = dns.DNS_QTYPE_TXT
344         r.rr_class = dns.DNS_QCLASS_ANY
345         r.ttl = 0
346         r.length = 0
347         prereqs.append(r)
348
349         p.ancount = len(prereqs)
350         p.answers = prereqs
351
352         response = self.dns_transaction_udp(p)
353         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXRRSET)
354
355     def test_update_add_txt_record(self):
356         "test adding records works"
357         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
358         updates = []
359
360         name = self.get_dns_domain()
361
362         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
363         updates.append(u)
364         self.finish_name_packet(p, updates)
365
366         updates = []
367         r = dns.res_rec()
368         r.name = "textrec.%s" % self.get_dns_domain()
369         r.rr_type = dns.DNS_QTYPE_TXT
370         r.rr_class = dns.DNS_QCLASS_IN
371         r.ttl = 900
372         r.length = 0xffff
373         r.rdata = dns.txt_record()
374         r.rdata.txt = '"This is a test"'
375         updates.append(r)
376         p.nscount = len(updates)
377         p.nsrecs = updates
378
379         response = self.dns_transaction_udp(p)
380         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
381
382         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
383         questions = []
384
385         name = "textrec.%s" % self.get_dns_domain()
386         q = self.make_name_question(name, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
387         questions.append(q)
388
389         self.finish_name_packet(p, questions)
390         response = self.dns_transaction_udp(p)
391         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
392         self.assertEquals(response.ancount, 1)
393         self.assertEquals(response.answers[0].rdata.txt, '"This is a test"')
394
395     def test_update_add_two_txt_records(self):
396         "test adding two txt records works"
397         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
398         updates = []
399
400         name = self.get_dns_domain()
401
402         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
403         updates.append(u)
404         self.finish_name_packet(p, updates)
405
406         updates = []
407         r = dns.res_rec()
408         r.name = "textrec2.%s" % self.get_dns_domain()
409         r.rr_type = dns.DNS_QTYPE_TXT
410         r.rr_class = dns.DNS_QCLASS_IN
411         r.ttl = 900
412         r.length = 0xffff
413         r.rdata = dns.txt_record()
414         r.rdata.txt = '"This is a test" "and this is a test, too"'
415         updates.append(r)
416         p.nscount = len(updates)
417         p.nsrecs = updates
418
419         response = self.dns_transaction_udp(p)
420         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
421
422         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
423         questions = []
424
425         name = "textrec2.%s" % self.get_dns_domain()
426         q = self.make_name_question(name, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
427         questions.append(q)
428
429         self.finish_name_packet(p, questions)
430         response = self.dns_transaction_udp(p)
431         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
432         self.assertEquals(response.ancount, 1)
433         self.assertEquals(response.answers[0].rdata.txt, '"This is a test" "and this is a test, too"')
434
435     def test_delete_record(self):
436         "Test if deleting records works"
437
438         NAME = "deleterec.%s" % self.get_dns_domain()
439
440         # First, create a record to make sure we have a record to delete.
441         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
442         updates = []
443
444         name = self.get_dns_domain()
445
446         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
447         updates.append(u)
448         self.finish_name_packet(p, updates)
449
450         updates = []
451         r = dns.res_rec()
452         r.name = NAME
453         r.rr_type = dns.DNS_QTYPE_TXT
454         r.rr_class = dns.DNS_QCLASS_IN
455         r.ttl = 900
456         r.length = 0xffff
457         r.rdata = dns.txt_record()
458         r.rdata.txt = '"This is a test"'
459         updates.append(r)
460         p.nscount = len(updates)
461         p.nsrecs = updates
462
463         response = self.dns_transaction_udp(p)
464         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
465
466         # Now check the record is around
467         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
468         questions = []
469         q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
470         questions.append(q)
471
472         self.finish_name_packet(p, questions)
473         response = self.dns_transaction_udp(p)
474         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
475
476         # Now delete the record
477         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
478         updates = []
479
480         name = self.get_dns_domain()
481
482         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
483         updates.append(u)
484         self.finish_name_packet(p, updates)
485
486         updates = []
487         r = dns.res_rec()
488         r.name = NAME
489         r.rr_type = dns.DNS_QTYPE_TXT
490         r.rr_class = dns.DNS_QCLASS_NONE
491         r.ttl = 0
492         r.length = 0xffff
493         r.rdata = dns.txt_record()
494         r.rdata.txt = '"This is a test"'
495         updates.append(r)
496         p.nscount = len(updates)
497         p.nsrecs = updates
498
499         response = self.dns_transaction_udp(p)
500         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
501
502         # And finally check it's gone
503         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
504         questions = []
505
506         q = self.make_name_question(NAME, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
507         questions.append(q)
508
509         self.finish_name_packet(p, questions)
510         response = self.dns_transaction_udp(p)
511         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
512
513     def test_update_add_mx_record(self):
514         "test adding MX records works"
515         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
516         updates = []
517
518         name = self.get_dns_domain()
519
520         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
521         updates.append(u)
522         self.finish_name_packet(p, updates)
523
524         updates = []
525         r = dns.res_rec()
526         r.name = "%s" % self.get_dns_domain()
527         r.rr_type = dns.DNS_QTYPE_MX
528         r.rr_class = dns.DNS_QCLASS_IN
529         r.ttl = 900
530         r.length = 0xffff
531         r.rdata = dns.mx_record()
532         r.rdata.preference = 10
533         r.rdata.exchange = 'mail.%s' % self.get_dns_domain()
534         updates.append(r)
535         p.nscount = len(updates)
536         p.nsrecs = updates
537
538         response = self.dns_transaction_udp(p)
539         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
540
541         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
542         questions = []
543
544         name = "%s" % self.get_dns_domain()
545         q = self.make_name_question(name, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN)
546         questions.append(q)
547
548         self.finish_name_packet(p, questions)
549         response = self.dns_transaction_udp(p)
550         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
551         self.assertEqual(response.ancount, 1)
552         self.assertEqual(response.answers[0].rdata.preference, 10)
553         self.assertEqual(response.answers[0].rdata.exchange, 'mail.%s' % self.get_dns_domain())
554
555
556 class TestComplexQueries(DNSTest):
557
558     def setUp(self):
559         super(TestComplexQueries, self).setUp()
560         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
561         updates = []
562
563         name = self.get_dns_domain()
564
565         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
566         updates.append(u)
567         self.finish_name_packet(p, updates)
568
569         updates = []
570         r = dns.res_rec()
571         r.name = "cname_test.%s" % self.get_dns_domain()
572         r.rr_type = dns.DNS_QTYPE_CNAME
573         r.rr_class = dns.DNS_QCLASS_IN
574         r.ttl = 900
575         r.length = 0xffff
576         r.rdata = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
577         updates.append(r)
578         p.nscount = len(updates)
579         p.nsrecs = updates
580
581         response = self.dns_transaction_udp(p)
582         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
583
584     def tearDown(self):
585         super(TestComplexQueries, self).tearDown()
586         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
587         updates = []
588
589         name = self.get_dns_domain()
590
591         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
592         updates.append(u)
593         self.finish_name_packet(p, updates)
594
595         updates = []
596         r = dns.res_rec()
597         r.name = "cname_test.%s" % self.get_dns_domain()
598         r.rr_type = dns.DNS_QTYPE_CNAME
599         r.rr_class = dns.DNS_QCLASS_NONE
600         r.ttl = 0
601         r.length = 0xffff
602         r.rdata = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
603         updates.append(r)
604         p.nscount = len(updates)
605         p.nsrecs = updates
606
607         response = self.dns_transaction_udp(p)
608         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
609
610     def test_one_a_query(self):
611         "create a query packet containing one query record"
612         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
613         questions = []
614
615         name = "cname_test.%s" % self.get_dns_domain()
616         q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
617         print "asking for ", q.name
618         questions.append(q)
619
620         self.finish_name_packet(p, questions)
621         response = self.dns_transaction_udp(p)
622         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
623         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
624         self.assertEquals(response.ancount, 2)
625         self.assertEquals(response.answers[0].rr_type, dns.DNS_QTYPE_CNAME)
626         self.assertEquals(response.answers[0].rdata, "%s.%s" %
627                           (os.getenv('SERVER'), self.get_dns_domain()))
628         self.assertEquals(response.answers[1].rr_type, dns.DNS_QTYPE_A)
629         self.assertEquals(response.answers[1].rdata,
630                           os.getenv('SERVER_IP'))
631
632 class TestInvalidQueries(DNSTest):
633
634     def test_one_a_query(self):
635         "send 0 bytes follows by create a query packet containing one query record"
636
637         s = None
638         try:
639             s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
640             s.connect((os.getenv('SERVER_IP'), 53))
641             s.send("", 0)
642         finally:
643             if s is not None:
644                 s.close()
645
646         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
647         questions = []
648
649         name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
650         q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
651         print "asking for ", q.name
652         questions.append(q)
653
654         self.finish_name_packet(p, questions)
655         response = self.dns_transaction_udp(p)
656         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
657         self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
658         self.assertEquals(response.ancount, 1)
659         self.assertEquals(response.answers[0].rdata,
660                           os.getenv('SERVER_IP'))
661
662 if __name__ == "__main__":
663     import unittest
664     unittest.main()