s4 dns: Support TXT updates, add tests
authorKai Blin <kai@samba.org>
Sat, 10 Mar 2012 22:48:44 +0000 (23:48 +0100)
committerKai Blin <kai@samba.org>
Sat, 10 Mar 2012 23:31:37 +0000 (00:31 +0100)
source4/dns_server/dns_update.c
source4/scripting/python/samba/tests/dns.py

index 62cf9e555a348bd8630bd8f1495be8c1b5c7376f..f103d06b0a4d672ffbc9e2cd44efff65608f6df2 100644 (file)
@@ -285,6 +285,10 @@ static WERROR dns_rr_to_dnsp(TALLOC_CTX *mem_ctx,
                             const struct dns_res_rec *rrec,
                             struct dnsp_DnssrvRpcRecord *r)
 {
+       char *tmp;
+       char *txt_record_txt;
+       char *saveptr = NULL;
+
        if (rrec->rr_type == DNS_QTYPE_ALL) {
                return DNS_ERR(FORMAT_ERROR);
        }
@@ -334,15 +338,30 @@ static WERROR dns_rr_to_dnsp(TALLOC_CTX *mem_ctx,
                W_ERROR_HAVE_NO_MEMORY(r->data.mx.nameTarget);
                break;
        case DNS_QTYPE_TXT:
-               /* FIXME: This converts the TXT rr data into a single string.
-                *        Since dns server does not reply to qtype TXT,
-                *        this is not yet relevant.
-                */
-               r->data.txt.count = 1;
-               r->data.txt.str = talloc_array(mem_ctx, const char *, 1);
+               r->data.txt.count = 0;
+               r->data.txt.str = talloc_array(mem_ctx, const char *,
+                                              r->data.txt.count);
                W_ERROR_HAVE_NO_MEMORY(r->data.txt.str);
-               r->data.txt.str[0] = talloc_strdup(mem_ctx, rrec->rdata.txt_record.txt);
-               W_ERROR_HAVE_NO_MEMORY(r->data.txt.str[0]);
+
+               txt_record_txt = talloc_strdup(r->data.txt.str,
+                                              rrec->rdata.txt_record.txt);
+               W_ERROR_HAVE_NO_MEMORY(txt_record_txt);
+
+               tmp = strtok_r(txt_record_txt, "\"", &saveptr);
+               while (tmp) {
+                       if (strcmp(tmp, " ") == 0) {
+                               tmp = strtok_r(NULL, "\"", &saveptr);
+                               continue;
+                       }
+                       r->data.txt.str = talloc_realloc(mem_ctx, r->data.txt.str, const char *,
+                                                       r->data.txt.count+1);
+                       r->data.txt.str[r->data.txt.count] = talloc_strdup(r->data.txt.str, tmp);
+                       W_ERROR_HAVE_NO_MEMORY(r->data.txt.str[r->data.txt.count]);
+
+                       r->data.txt.count++;
+                       tmp = strtok_r(NULL, "\"", &saveptr);
+               }
+
                break;
        default:
                DEBUG(0, ("Got a qytpe of %d\n", rrec->rr_type));
@@ -390,6 +409,8 @@ static WERROR handle_one_update(struct dns_server *dns,
        case DNS_QTYPE_TXT:
                break;
        default:
+               DEBUG(0, ("Can't handle updates of type %u yet\n",
+                         update->rr_type));
                return DNS_ERR(NOT_IMPLEMENTED);
        }
 
index 26f80898225d80d683c931c489750a98534b7797..621bd16cd5c28f76129a14af11d7ce93c1aabe6b 100644 (file)
@@ -349,6 +349,87 @@ class DNSTest(TestCase):
         response = self.dns_transaction_udp(p)
         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXRRSET)
 
+    def test_update_add_txt_record(self):
+        "test adding records works"
+        p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
+        updates = []
+
+        name = self.get_dns_domain()
+
+        u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
+        updates.append(u)
+        self.finish_name_packet(p, updates)
+
+        updates = []
+        r = dns.res_rec()
+        r.name = "textrec.%s" % self.get_dns_domain()
+        r.rr_type = dns.DNS_QTYPE_TXT
+        r.rr_class = dns.DNS_QCLASS_IN
+        r.ttl = 900
+        r.length = 0xffff
+        r.rdata = dns.txt_record()
+        r.rdata.txt = '"This is a test"'
+        updates.append(r)
+        p.nscount = len(updates)
+        p.nsrecs = updates
+
+        response = self.dns_transaction_udp(p)
+        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
+
+        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
+        questions = []
+
+        name = "textrec.%s" % self.get_dns_domain()
+        q = self.make_name_question(name, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
+        questions.append(q)
+
+        self.finish_name_packet(p, questions)
+        response = self.dns_transaction_udp(p)
+        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
+        self.assertEquals(response.ancount, 1)
+        self.assertEquals(response.answers[0].rdata.txt, '"This is a test"')
+
+    def test_update_add_two_txt_records(self):
+        "test adding two txt records works"
+        p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
+        updates = []
+
+        name = self.get_dns_domain()
+
+        u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
+        updates.append(u)
+        self.finish_name_packet(p, updates)
+
+        updates = []
+        r = dns.res_rec()
+        r.name = "textrec2.%s" % self.get_dns_domain()
+        r.rr_type = dns.DNS_QTYPE_TXT
+        r.rr_class = dns.DNS_QCLASS_IN
+        r.ttl = 900
+        r.length = 0xffff
+        r.rdata = dns.txt_record()
+        r.rdata.txt = '"This is a test" "and this is a test, too"'
+        updates.append(r)
+        p.nscount = len(updates)
+        p.nsrecs = updates
+
+        response = self.dns_transaction_udp(p)
+        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
+
+        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
+        questions = []
+
+        name = "textrec2.%s" % self.get_dns_domain()
+        q = self.make_name_question(name, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
+        questions.append(q)
+
+        self.finish_name_packet(p, questions)
+        response = self.dns_transaction_udp(p)
+        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
+        self.assertEquals(response.ancount, 1)
+        self.assertEquals(response.answers[0].rdata.txt, '"This is a test" "and this is a test, too"')
+
+
 if __name__ == "__main__":
     import unittest
     unittest.main()