from samba.dcerpc import dns, dnsp, dnsserver
from samba.netcmd.dns import TXTRecord, dns_record_match, data_to_dns_record
from samba.tests.subunitrun import SubunitOptions, TestProgram
-from samba import werror
+from samba import werror, WERRORError
import samba.getopt as options
import optparse
def setUp(self):
super(TestComplexQueries, self).setUp()
- name = "cname_test.%s" % self.get_dns_domain()
- rdata = "%s.%s" % (self.server, self.get_dns_domain())
- self.make_dns_update(name, rdata, dns.DNS_QTYPE_CNAME)
def tearDown(self):
super(TestComplexQueries, self).tearDown()
- p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
- updates = []
- name = self.get_dns_domain()
+ def test_one_a_query(self):
+ "create a query packet containing one query record"
- u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
- updates.append(u)
- self.finish_name_packet(p, updates)
+ name = "cname_test.%s" % self.get_dns_domain()
+ rdata = "%s.%s" % (self.server, self.get_dns_domain())
+ self.make_dns_update(name, rdata, dns.DNS_QTYPE_CNAME)
- updates = []
- r = dns.res_rec()
- r.name = "cname_test.%s" % self.get_dns_domain()
- r.rr_type = dns.DNS_QTYPE_CNAME
- r.rr_class = dns.DNS_QCLASS_NONE
- r.ttl = 0
- r.length = 0xffff
- r.rdata = "%s.%s" % (self.server, self.get_dns_domain())
- updates.append(r)
- p.nscount = len(updates)
- p.nsrecs = updates
+ try:
- response = self.dns_transaction_udp(p)
- self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
+ # Create the record
+ name = "cname_test.%s" % self.get_dns_domain()
+ rdata = "%s.%s" % (self.server, self.get_dns_domain())
+ self.make_dns_update(name, rdata, dns.DNS_QTYPE_CNAME)
- def test_one_a_query(self):
- "create a query packet containing one query record"
- p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
- questions = []
+ p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
+ questions = []
- name = "cname_test.%s" % self.get_dns_domain()
- q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
- print "asking for ", q.name
- questions.append(q)
+ # Check the record
+ name = "cname_test.%s" % self.get_dns_domain()
+ q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
+ print "asking for ", q.name
+ questions.append(q)
- self.finish_name_packet(p, questions)
- response = self.dns_transaction_udp(p)
- self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
- self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
- self.assertEquals(response.ancount, 2)
- self.assertEquals(response.answers[0].rr_type, dns.DNS_QTYPE_CNAME)
- self.assertEquals(response.answers[0].rdata, "%s.%s" %
- (self.server, self.get_dns_domain()))
- self.assertEquals(response.answers[1].rr_type, dns.DNS_QTYPE_A)
- self.assertEquals(response.answers[1].rdata,
- self.server_ip)
+ self.finish_name_packet(p, questions)
+ response = self.dns_transaction_udp(p)
+ self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
+ self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
+ self.assertEquals(response.ancount, 2)
+ self.assertEquals(response.answers[0].rr_type, dns.DNS_QTYPE_CNAME)
+ self.assertEquals(response.answers[0].rdata, "%s.%s" %
+ (self.server, self.get_dns_domain()))
+ self.assertEquals(response.answers[1].rr_type, dns.DNS_QTYPE_A)
+ self.assertEquals(response.answers[1].rdata,
+ self.server_ip)
+
+ finally:
+ # Delete the record
+ p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
+ updates = []
+
+ name = self.get_dns_domain()
+
+ u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
+ updates.append(u)
+ self.finish_name_packet(p, updates)
+
+ updates = []
+ r = dns.res_rec()
+ r.name = "cname_test.%s" % self.get_dns_domain()
+ r.rr_type = dns.DNS_QTYPE_CNAME
+ r.rr_class = dns.DNS_QCLASS_NONE
+ r.ttl = 0
+ r.length = 0xffff
+ r.rdata = "%s.%s" % (self.server, self.get_dns_domain())
+ updates.append(r)
+ p.nscount = len(updates)
+ p.nsrecs = updates
+
+ response = self.dns_transaction_udp(p)
+ self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
def test_cname_two_chain(self):
name0 = "cnamechain0.%s" % self.get_dns_domain()
zone_create.fAllowUpdate = dnsp.DNS_ZONE_UPDATE_SECURE
zone_create.fAging = 0
zone_create.dwDpFlags = dnsserver.DNS_DP_DOMAIN_DEFAULT
- self.rpc_conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
- 0,
- self.server_ip,
- None,
- 0,
- 'ZoneCreate',
- dnsserver.DNSSRV_TYPEID_ZONE_CREATE,
- zone_create)
+ try:
+ self.rpc_conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+ 0,
+ self.server_ip,
+ None,
+ 0,
+ 'ZoneCreate',
+ dnsserver.DNSSRV_TYPEID_ZONE_CREATE,
+ zone_create)
+ except WERRORError as e:
+ self.fail(str(e))
def delete_zone(self, zone):
self.rpc_conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
0, self.server_ip, self.get_dns_domain(),
name, add_rec_buf, None)
+ except WERRORError as e:
+ self.fail(str(e))
+ try:
self.check_query_txt(prefix, txt)
finally:
self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
0, self.server_ip, self.get_dns_domain(),
name, add_rec_buf, None)
+ except WERRORError as e:
+ self.fail(str(e))
+
+ try:
self.check_query_txt(prefix, txt)
finally:
self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
0, self.server_ip, self.get_dns_domain(),
name, add_rec_buf, None)
+ except WERRORError as e:
+ self.fail(str(e))
+
+ try:
self.check_query_txt(prefix, txt)
finally:
self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
0, self.server_ip, self.get_dns_domain(),
name, add_rec_buf, None)
+ except WERRORError as e:
+ self.fail(str(e))
+ try:
self.check_query_txt(prefix, txt)
finally:
self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
0, self.server_ip, self.get_dns_domain(),
name, add_rec_buf, None)
- self.check_query_txt(prefix, ['NULL'])
+ except WERRORError as e:
+ self.fail(str(e))
+
+ try:
+ self.check_query_txt(prefix, ['NULL'])
finally:
self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
0, self.server_ip, self.get_dns_domain(),
0, self.server_ip, self.get_dns_domain(),
name, add_rec_buf, None)
- self.check_query_txt(prefix, txt)
+ except WERRORError as e:
+ self.fail(str(e))
+
+ try:
+ self.check_query_txt(prefix, txt)
finally:
self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
0, self.server_ip, self.get_dns_domain(),
0, self.server_ip, self.get_dns_domain(),
name, add_rec_buf, None)
+ except WERRORError as e:
+ self.fail(str(e))
+
+ try:
self.check_query_txt(prefix, txt)
+
finally:
self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
0, self.server_ip, self.get_dns_domain(),
0, self.server_ip, self.get_dns_domain(),
name, add_rec_buf, None)
+ except WERRORError as e:
+ self.fail(str(e))
+
+ try:
self.check_query_txt(prefix, txt)
finally:
self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
0, self.server_ip, self.get_dns_domain(),
name, add_rec_buf, None)
+ except WERRORError as e:
+ self.fail(str(e))
+ try:
self.check_query_txt(prefix, txt)
finally:
self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
--- /dev/null
+# These tests are expected to fail because we want to ensure that
+# unauthenicated updates are not permitted against the default
+# configuration, nor against an RODC
+
+samba.tests.dns.__main__.TestDNSUpdates.test_delete_record\(rodc:local\)
+samba.tests.dns.__main__.TestDNSUpdates.test_readd_record\(rodc:local\)
+samba.tests.dns.__main__.TestDNSUpdates.test_update_add_mx_record\(rodc:local\)
+samba.tests.dns.__main__.TestDNSUpdates.test_update_add_txt_record\(rodc:local\)
+samba.tests.dns.__main__.TestInvalidQueries.test_one_a_query\(rodc:local\)
+samba.tests.dns.__main__.TestRPCRoundtrip.test_update_add_empty_txt_records\(rodc:local\)
+samba.tests.dns.__main__.TestRPCRoundtrip.test_update_add_hex_char_txt_record\(rodc:local\)
+samba.tests.dns.__main__.TestRPCRoundtrip.test_update_add_null_char_txt_record\(rodc:local\)
+samba.tests.dns.__main__.TestRPCRoundtrip.test_update_add_null_padded_txt_record\(rodc:local\)
+samba.tests.dns.__main__.TestRPCRoundtrip.test_update_add_slash_txt_record\(rodc:local\)
+samba.tests.dns.__main__.TestRPCRoundtrip.test_update_add_two_txt_records\(rodc:local\)
+samba.tests.dns.__main__.TestDNSUpdates.test_delete_record\(vampire_dc:local\)
+samba.tests.dns.__main__.TestDNSUpdates.test_readd_record\(vampire_dc:local\)
+samba.tests.dns.__main__.TestDNSUpdates.test_update_add_mx_record\(vampire_dc:local\)
+samba.tests.dns.__main__.TestDNSUpdates.test_update_add_txt_record\(vampire_dc:local\)
+samba.tests.dns.__main__.TestInvalidQueries.test_one_a_query\(vampire_dc:local\)
+samba.tests.dns.__main__.TestRPCRoundtrip.test_update_add_empty_txt_records\(vampire_dc:local\)
+samba.tests.dns.__main__.TestRPCRoundtrip.test_update_add_hex_char_txt_record\(vampire_dc:local\)
+samba.tests.dns.__main__.TestRPCRoundtrip.test_update_add_null_char_txt_record\(vampire_dc:local\)
+samba.tests.dns.__main__.TestRPCRoundtrip.test_update_add_null_padded_txt_record\(vampire_dc:local\)
+samba.tests.dns.__main__.TestRPCRoundtrip.test_update_add_slash_txt_record\(vampire_dc:local\)
+samba.tests.dns.__main__.TestRPCRoundtrip.test_update_add_two_txt_records\(vampire_dc:local\)
+samba.tests.dns.__main__.TestComplexQueries.test_cname_two_chain\(rodc:local\)
+samba.tests.dns.__main__.TestComplexQueries.test_one_a_query\(rodc:local\)
+samba.tests.dns.__main__.TestRPCRoundtrip.test_update_add_empty_rpc_to_dns\(rodc:local\)
+samba.tests.dns.__main__.TestRPCRoundtrip.test_update_add_hex_rpc_to_dns\(rodc:local\)
+samba.tests.dns.__main__.TestRPCRoundtrip.test_update_add_null_char_rpc_to_dns\(rodc:local\)
+samba.tests.dns.__main__.TestRPCRoundtrip.test_update_add_padding_rpc_to_dns\(rodc:local\)
+samba.tests.dns.__main__.TestRPCRoundtrip.test_update_add_slash_rpc_to_dns\(rodc:local\)
+samba.tests.dns.__main__.TestRPCRoundtrip.test_update_add_two_rpc_to_dns\(rodc:local\)
+samba.tests.dns.__main__.TestRPCRoundtrip.test_update_add_txt_rpc_to_dns\(rodc:local\)
+samba.tests.dns.__main__.TestZones.test_soa_query\(rodc:local\)
+samba.tests.dns.__main__.TestComplexQueries.test_cname_two_chain\(vampire_dc:local\)
+samba.tests.dns.__main__.TestComplexQueries.test_one_a_query\(vampire_dc:local\)
+
+# The SOA override should not pass against the RODC, it must not overstamp
+samba.tests.dns.__main__.TestSimpleQueries.test_one_SOA_query\(rodc:local\)
+
+# The very first DC will have DNS records, but subsequent DCs only get entries into
+# the dns_hosts_file in our selftest env
+samba.tests.dns.__main__.TestSimpleQueries.test_one_SOA_query\(vampire_dc:local\)
+samba.tests.dns.__main__.TestSimpleQueries.test_one_a_query\(vampire_dc:local\)
+samba.tests.dns.__main__.TestSimpleQueries.test_one_a_query_tcp\(vampire_dc:local\)
+samba.tests.dns.__main__.TestSimpleQueries.test_one_mx_query\(vampire_dc:local\)
+samba.tests.dns.__main__.TestSimpleQueries.test_qtype_all_query\(vampire_dc:local\)
+samba.tests.dns.__main__.TestSimpleQueries.test_soa_hostname_query\(vampire_dc:local\)
+samba.tests.dns.__main__.TestSimpleQueries.test_one_a_query\(rodc:local\)
+samba.tests.dns.__main__.TestSimpleQueries.test_one_a_query_tcp\(rodc:local\)
+samba.tests.dns.__main__.TestSimpleQueries.test_one_mx_query\(rodc:local\)
+samba.tests.dns.__main__.TestSimpleQueries.test_qtype_all_query\(rodc:local\)
+samba.tests.dns.__main__.TestSimpleQueries.test_soa_hostname_query\(rodc:local\)