import random
from samba import socket
import samba.ndr as ndr
-import samba.dcerpc.dns as dns
+from samba import credentials, param
from samba.tests import TestCase
+from samba.dcerpc import dns, dnsp, dnsserver
FILTER=''.join([(len(repr(chr(x)))==3) and chr(x) or '.' for x in range(256)])
+def make_txt_record(records):
+ rdata_txt = dns.txt_record()
+ s_list = dnsp.string_list()
+ s_list.count = len(records)
+ s_list.str = records
+ rdata_txt.txt = s_list
+ return rdata_txt
class DNSTest(TestCase):
+ def get_loadparm(self):
+ lp = param.LoadParm()
+ lp.load(os.getenv("SMB_CONF_PATH"))
+ return lp
+
def errstr(self, errcode):
"Return a readable error code"
string_codes = [
self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
self.assertEquals(response.ancount, 0)
+ p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
+ questions = []
+
+ name = "invalid-%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
+ q = self.make_name_question(name, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN)
+ print "asking for ", q.name
+ questions.append(q)
+
+ self.finish_name_packet(p, questions)
+ response = self.dns_transaction_udp(p)
+ self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
+ self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
+ self.assertEquals(response.ancount, 0)
+
def test_two_queries(self):
"create a query packet containing two query records"
p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
self.assertEquals(response.ancount, 1)
+ self.assertEquals(response.answers[0].rdata.minimum, 3600)
class TestDNSUpdates(DNSTest):
r.rr_class = dns.DNS_QCLASS_IN
r.ttl = 900
r.length = 0xffff
- rdata = dns.txt_record()
- rdata.txt = '"This is a test"'
+ rdata = make_txt_record(['"This is a test"'])
r.rdata = rdata
updates.append(r)
p.nscount = len(updates)
response = self.dns_transaction_udp(p)
self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
self.assertEquals(response.ancount, 1)
- self.assertEquals(response.answers[0].rdata.txt, '"This is a test"')
+ self.assertEquals(response.answers[0].rdata.txt.str[0], '"This is a test"')
def test_update_add_two_txt_records(self):
"test adding two txt records works"
r.rr_class = dns.DNS_QCLASS_IN
r.ttl = 900
r.length = 0xffff
- rdata = dns.txt_record()
- rdata.txt = '"This is a test" "and this is a test, too"'
+ rdata = make_txt_record(['"This is a test"',
+ '"and this is a test, too"'])
r.rdata = rdata
updates.append(r)
p.nscount = len(updates)
response = self.dns_transaction_udp(p)
self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
self.assertEquals(response.ancount, 1)
- self.assertEquals(response.answers[0].rdata.txt, '"This is a test" "and this is a test, too"')
+ self.assertEquals(response.answers[0].rdata.txt.str[0], '"This is a test"')
+ self.assertEquals(response.answers[0].rdata.txt.str[1], '"and this is a test, too"')
def test_delete_record(self):
"Test if deleting records works"
r.rr_class = dns.DNS_QCLASS_IN
r.ttl = 900
r.length = 0xffff
- rdata = dns.txt_record()
- rdata.txt = '"This is a test"'
+ rdata = make_txt_record(['"This is a test"'])
r.rdata = rdata
updates.append(r)
p.nscount = len(updates)
r.rr_class = dns.DNS_QCLASS_NONE
r.ttl = 0
r.length = 0xffff
- rdata = dns.txt_record()
- rdata.txt = '"This is a test"'
+ rdata = make_txt_record(['"This is a test"'])
r.rdata = rdata
updates.append(r)
p.nscount = len(updates)
r.rr_class = dns.DNS_QCLASS_IN
r.ttl = 900
r.length = 0xffff
- rdata = dns.txt_record()
- rdata.txt = '"This is a test"'
+ rdata = make_txt_record(['"This is a test"'])
r.rdata = rdata
updates.append(r)
p.nscount = len(updates)
r.rr_class = dns.DNS_QCLASS_NONE
r.ttl = 0
r.length = 0xffff
- rdata = dns.txt_record()
- rdata.txt = '"This is a test"'
+ rdata = make_txt_record(['"This is a test"'])
r.rdata = rdata
updates.append(r)
p.nscount = len(updates)
r.rr_class = dns.DNS_QCLASS_IN
r.ttl = 900
r.length = 0xffff
- rdata = dns.txt_record()
- rdata.txt = '"This is a test"'
+ rdata = make_txt_record(['"This is a test"'])
r.rdata = rdata
updates.append(r)
p.nscount = len(updates)
self.assertEquals(response.answers[0].rdata,
os.getenv('SERVER_IP'))
+ def test_one_a_reply(self):
+ "send a reply instead of a query"
+
+ p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
+ questions = []
+
+ name = "%s.%s" % ('fakefakefake', self.get_dns_domain())
+ q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
+ print "asking for ", q.name
+ questions.append(q)
+
+ self.finish_name_packet(p, questions)
+ p.operation |= dns.DNS_FLAG_REPLY
+ s = None
+ try:
+ send_packet = ndr.ndr_pack(p)
+ s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
+ host=os.getenv('SERVER_IP')
+ s.connect((host, 53))
+ tcp_packet = struct.pack('!H', len(send_packet))
+ tcp_packet += send_packet
+ s.send(tcp_packet, 0)
+ recv_packet = s.recv(0xffff + 2, 0)
+ self.assertEquals(0, len(recv_packet))
+ finally:
+ if s is not None:
+ s.close()
+
+
if __name__ == "__main__":
import unittest
unittest.main()