samba-tool: add dns cleanup cmd
[metze/samba/wip.git] / python / samba / tests / samba_tool / dnscmd.py
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Andrew Bartlett <abartlet@catalyst.net.nz>
3 #
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License
15 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17
18 import os
19 import ldb
20
21 from samba.auth import system_session
22 from samba.samdb import SamDB
23 from samba.ndr import ndr_unpack, ndr_pack
24 from samba.dcerpc import dnsp
25 from samba.tests.samba_tool.base import SambaToolCmdTest
26
27 class DnsCmdTestCase(SambaToolCmdTest):
28     def setUp(self):
29         super(DnsCmdTestCase, self).setUp()
30
31         self.dburl = "ldap://%s" % os.environ["SERVER"]
32         self.creds_string = "-U%s%%%s" % (os.environ["DC_USERNAME"],
33                                           os.environ["DC_PASSWORD"])
34
35         self.samdb = self.getSamDB("-H", self.dburl, self.creds_string)
36         self.config_dn = str(self.samdb.get_config_basedn())
37
38         self.testip = "192.168.0.193"
39         self.testip2 = "192.168.0.194"
40
41         self.addZone()
42
43         # Note: SOA types don't work (and shouldn't), as we only have one zone per DNS record.
44
45         good_dns = ["SAMDOM.EXAMPLE.COM",
46                     "1.EXAMPLE.COM",
47                     "%sEXAMPLE.COM" % ("1."*100),
48                     "EXAMPLE",
49                     "\n.COM",
50                     "!@#$%^&*()_",
51                     "HIGH\xFFBYTE",
52                     "@.EXAMPLE.COM",
53                     "."]
54         bad_dns = ["...",
55                    ".EXAMPLE.COM",
56                    ".EXAMPLE.",
57                    "",
58                    "SAMDOM..EXAMPLE.COM"]
59
60         good_mx = ["SAMDOM.EXAMPLE.COM 65530"]
61         bad_mx = ["SAMDOM.EXAMPLE.COM -1",
62                   "SAMDOM.EXAMPLE.COM",
63                   " ",
64                   "SAMDOM.EXAMPLE.COM 1 1",
65                   "SAMDOM.EXAMPLE.COM SAMDOM.EXAMPLE.COM"]
66
67         good_srv = ["SAMDOM.EXAMPLE.COM 65530 65530 65530"]
68         bad_srv = ["SAMDOM.EXAMPLE.COM 0 65536 0",
69                    "SAMDOM.EXAMPLE.COM 0 0 65536",
70                    "SAMDOM.EXAMPLE.COM 65536 0 0" ]
71
72         for bad_dn in bad_dns:
73             bad_mx.append("%s 1" % bad_dn)
74             bad_srv.append("%s 0 0 0" % bad_dn)
75         for good_dn in good_dns:
76             good_mx.append("%s 1" % good_dn)
77             good_srv.append("%s 0 0 0" % good_dn)
78
79         self.good_records = {
80                 "A":["192.168.0.1", "255.255.255.255"],
81                 "AAAA":["1234:5678:9ABC:DEF0:0000:0000:0000:0000",
82                         "0000:0000:0000:0000:0000:0000:0000:0000",
83                         "1234:5678:9ABC:DEF0:1234:5678:9ABC:DEF0",
84                         "1234:1234:1234::",
85                         "1234:5678:9ABC:DEF0::",
86                         "0000:0000::0000",
87                         "1234::5678:9ABC:0000:0000:0000:0000",
88                         "::1",
89                         "::",
90                         "1:1:1:1:1:1:1:1"],
91                 "PTR":good_dns,
92                 "CNAME":good_dns,
93                 "NS":good_dns,
94                 "MX":good_mx,
95                 "SRV":good_srv,
96                 "TXT":["text", "", "@#!", "\n"]
97         }
98
99         self.bad_records = {
100                 "A":["192.168.0.500",
101                      "255.255.255.255/32"],
102                 "AAAA":["GGGG:1234:5678:9ABC:0000:0000:0000:0000",
103                         "0000:0000:0000:0000:0000:0000:0000:0000/1",
104                         "AAAA:AAAA:AAAA:AAAA:G000:0000:0000:1234",
105                         "1234:5678:9ABC:DEF0:1234:5678:9ABC:DEF0:1234",
106                         "1234:5678:9ABC:DEF0:1234:5678:9ABC",
107                         "1111::1111::1111"],
108                 "PTR":bad_dns,
109                 "CNAME":bad_dns,
110                 "NS":bad_dns,
111                 "MX":bad_mx,
112                 "SRV":bad_srv
113         }
114
115     def tearDown(self):
116         self.deleteZone()
117         super(DnsCmdTestCase, self).tearDown()
118
119     def resetZone(self):
120         self.deleteZone()
121         self.addZone()
122
123     def addZone(self):
124         self.zone = "zone"
125         result, out, err = self.runsubcmd("dns",
126                                           "zonecreate",
127                                           os.environ["SERVER"],
128                                           self.zone,
129                                           self.creds_string)
130         self.assertCmdSuccess(result, out, err)
131
132     def deleteZone(self):
133         result, out, err = self.runsubcmd("dns",
134                                           "zonedelete",
135                                           os.environ["SERVER"],
136                                           self.zone,
137                                           self.creds_string)
138         self.assertCmdSuccess(result, out, err)
139
140     def get_record_from_db(self, zone_name, record_name):
141         zones = self.samdb.search(base="DC=DomainDnsZones,%s"
142                                   % self.samdb.get_default_basedn(),
143                                   scope=ldb.SCOPE_SUBTREE,
144                                   expression="(objectClass=dnsZone)",
145                                   attrs=["cn"])
146
147         for zone in zones:
148             if zone_name in str(zone.dn):
149                 zone_dn = zone.dn
150                 break
151
152         records = self.samdb.search(base=zone_dn, scope=ldb.SCOPE_SUBTREE,
153                                     expression="(objectClass=dnsNode)",
154                                     attrs=["dnsRecord"])
155
156         for old_packed_record in records:
157             if record_name in str(old_packed_record.dn):
158                 return (old_packed_record.dn,
159                         ndr_unpack(dnsp.DnssrvRpcRecord,
160                                    old_packed_record["dnsRecord"][0]))
161
162     def test_rank_none(self):
163         record_str = "192.168.50.50"
164         record_type_str = "A"
165
166         result, out, err = self.runsubcmd("dns", "add", os.environ["SERVER"],
167                                           self.zone, "testrecord", record_type_str,
168                                           record_str, self.creds_string)
169         self.assertCmdSuccess(result, out, err,
170                               "Failed to add record '%s' with type %s."
171                               % (record_str, record_type_str))
172
173         dn, record = self.get_record_from_db(self.zone, "testrecord")
174         record.rank = 0 # DNS_RANK_NONE
175         res = self.samdb.dns_replace_by_dn(dn, [record])
176         if res is not None:
177             self.fail("Unable to update dns record to have DNS_RANK_NONE.")
178
179         errors = []
180
181         # The record should still exist
182         result, out, err = self.runsubcmd("dns", "query", os.environ["SERVER"],
183                                           self.zone, "testrecord", record_type_str,
184                                           self.creds_string)
185         try:
186             self.assertCmdSuccess(result, out, err,
187                                   "Failed to query for a record" \
188                                   "which had DNS_RANK_NONE.")
189             self.assertTrue("testrecord" in out and record_str in out,
190                             "Query for a record which had DNS_RANK_NONE" \
191                             "succeeded but produced no resulting records.")
192         except AssertionError, e:
193             # Windows produces no resulting records
194             pass
195
196         # We should not be able to add a duplicate
197         result, out, err = self.runsubcmd("dns", "add", os.environ["SERVER"],
198                                           self.zone, "testrecord", record_type_str,
199                                           record_str, self.creds_string)
200         try:
201             self.assertCmdFail(result, "Successfully added duplicate record" \
202                                "of one which had DNS_RANK_NONE.")
203         except AssertionError, e:
204             errors.append(e)
205
206         # We should be able to delete it
207         result, out, err = self.runsubcmd("dns", "delete", os.environ["SERVER"],
208                                           self.zone, "testrecord", record_type_str,
209                                           record_str, self.creds_string)
210         try:
211             self.assertCmdSuccess(result, out, err, "Failed to delete record" \
212                                   "which had DNS_RANK_NONE.")
213         except AssertionError, e:
214             errors.append(e)
215
216         # Now the record should not exist
217         result, out, err = self.runsubcmd("dns", "query", os.environ["SERVER"],
218                                           self.zone, "testrecord",
219                                           record_type_str, self.creds_string)
220         try:
221             self.assertCmdFail(result, "Successfully queried for deleted record" \
222                                "which had DNS_RANK_NONE.")
223         except AssertionError, e:
224             errors.append(e)
225
226         if len(errors) > 0:
227             err_str = "Failed appropriate behaviour with DNS_RANK_NONE:"
228             for error in errors:
229                 err_str = err_str + "\n" + str(error)
230             raise AssertionError(err_str)
231
232     def test_accept_valid_commands(self):
233         """
234         For all good records, attempt to add, query and delete them.
235         """
236         num_failures = 0
237         failure_msgs = []
238         for dnstype in self.good_records:
239             for record in self.good_records[dnstype]:
240                 try:
241                     result, out, err = self.runsubcmd("dns", "add",
242                                                       os.environ["SERVER"],
243                                                       self.zone, "testrecord",
244                                                       dnstype, record,
245                                                       self.creds_string)
246                     self.assertCmdSuccess(result, out, err, "Failed to add" \
247                                           "record %s with type %s."
248                                           % (record, dnstype))
249
250                     result, out, err = self.runsubcmd("dns", "query",
251                                                       os.environ["SERVER"],
252                                                       self.zone, "testrecord",
253                                                       dnstype,
254                                                       self.creds_string)
255                     self.assertCmdSuccess(result, out, err, "Failed to query" \
256                                           "record %s with qualifier %s."
257                                           % (record, dnstype))
258
259                     result, out, err = self.runsubcmd("dns", "delete",
260                                                       os.environ["SERVER"],
261                                                       self.zone, "testrecord",
262                                                       dnstype, record,
263                                                       self.creds_string)
264                     self.assertCmdSuccess(result, out, err, "Failed to remove" \
265                                           "record %s with type %s."
266                                           % (record, dnstype))
267                 except AssertionError as e:
268                     num_failures = num_failures + 1
269                     failure_msgs.append(e)
270
271         if num_failures > 0:
272             for msg in failure_msgs:
273                 print(msg)
274             self.fail("Failed to accept valid commands. %d total failures." \
275                       "Errors above." % num_failures)
276
277     def test_reject_invalid_commands(self):
278         """
279         For all bad records, attempt to add them and update to them,
280         making sure that both operations fail.
281         """
282         num_failures = 0
283         failure_msgs = []
284
285         # Add invalid records and make sure they fail to be added
286         for dnstype in self.bad_records:
287             for record in self.bad_records[dnstype]:
288                 try:
289                     result, out, err = self.runsubcmd("dns", "add",
290                                                       os.environ["SERVER"],
291                                                       self.zone, "testrecord",
292                                                       dnstype, record,
293                                                       self.creds_string)
294                     self.assertCmdFail(result, "Successfully added invalid" \
295                                        "record '%s' of type '%s'."
296                                        % (record, dnstype))
297                 except AssertionError as e:
298                     num_failures = num_failures + 1
299                     failure_msgs.append(e)
300                     self.resetZone()
301                 try:
302                     result, out, err = self.runsubcmd("dns", "delete",
303                                                       os.environ["SERVER"],
304                                                       self.zone, "testrecord",
305                                                       dnstype, record,
306                                                       self.creds_string)
307                     self.assertCmdFail(result, "Successfully deleted invalid" \
308                                        "record '%s' of type '%s' which" \
309                                        "shouldn't exist." % (record, dnstype))
310                 except AssertionError as e:
311                     num_failures = num_failures + 1
312                     failure_msgs.append(e)
313                     self.resetZone()
314
315         # Update valid records to invalid ones and make sure they
316         # fail to be updated
317         for dnstype in self.bad_records:
318             for bad_record in self.bad_records[dnstype]:
319                 good_record = self.good_records[dnstype][0]
320
321                 try:
322                     result, out, err = self.runsubcmd("dns", "add",
323                                                       os.environ["SERVER"],
324                                                       self.zone, "testrecord",
325                                                       dnstype, good_record,
326                                                       self.creds_string)
327                     self.assertCmdSuccess(result, out, err, "Failed to add " \
328                                           "record '%s' with type %s."
329                                           % (record, dnstype))
330
331                     result, out, err = self.runsubcmd("dns", "update",
332                                                       os.environ["SERVER"],
333                                                       self.zone, "testrecord",
334                                                       dnstype, good_record,
335                                                       bad_record,
336                                                       self.creds_string)
337                     self.assertCmdFail(result, "Successfully updated valid " \
338                                        "record '%s' of type '%s' to invalid " \
339                                        "record '%s' of the same type."
340                                        % (good_record, dnstype, bad_record))
341
342                     result, out, err = self.runsubcmd("dns", "delete",
343                                                       os.environ["SERVER"],
344                                                       self.zone, "testrecord",
345                                                       dnstype, good_record,
346                                                       self.creds_string)
347                     self.assertCmdSuccess(result, out, err, "Could not delete " \
348                                           "valid record '%s' of type '%s'."
349                                           % (good_record, dnstype))
350                 except AssertionError as e:
351                     num_failures = num_failures + 1
352                     failure_msgs.append(e)
353                     self.resetZone()
354
355         if num_failures > 0:
356             for msg in failure_msgs:
357                 print(msg)
358             self.fail("Failed to reject invalid commands. %d total failures. " \
359                       "Errors above." % num_failures)
360
361     def test_update_invalid_type(self):
362         """
363         Make sure that a record can't be updated to one of a different type.
364         """
365         for dnstype1 in self.good_records:
366             record1 = self.good_records[dnstype1][0]
367             result, out, err = self.runsubcmd("dns", "add",
368                                               os.environ["SERVER"],
369                                               self.zone, "testrecord",
370                                               dnstype1, record1,
371                                               self.creds_string)
372             self.assertCmdSuccess(result, out, err, "Failed to add " \
373                                   "record %s with type %s."
374                                   % (record1, dnstype1))
375
376             for dnstype2 in self.good_records:
377                 record2 = self.good_records[dnstype2][0]
378
379                 # Make sure that record2 isn't a valid entry of dnstype1.
380                 # For example, any A-type will also be a valid TXT-type.
381                 result, out, err = self.runsubcmd("dns", "add",
382                                                   os.environ["SERVER"],
383                                                   self.zone, "testrecord",
384                                                   dnstype1, record2,
385                                                   self.creds_string)
386                 try:
387                     self.assertCmdFail(result)
388                 except AssertionError:
389                     continue # Don't check this one, because record2 _is_ a valid entry of dnstype1.
390
391                 # Check both ways: Give the current type and try to update,
392                 # and give the new type and try to update.
393                 result, out, err = self.runsubcmd("dns", "update",
394                                                   os.environ["SERVER"],
395                                                   self.zone, "testrecord",
396                                                   dnstype1, record1,
397                                                   record2, self.creds_string)
398                 self.assertCmdFail(result, "Successfully updated record '%s' " \
399                                    "to '%s', even though the latter is of " \
400                                    "type '%s' where '%s' was expected."
401                                    % (record1, record2, dnstype2, dnstype1))
402
403                 result, out, err = self.runsubcmd("dns", "update",
404                                                   os.environ["SERVER"],
405                                                   self.zone, "testrecord",
406                                                   dnstype2, record1, record2,
407                                                   self.creds_string)
408                 self.assertCmdFail(result, "Successfully updated record " \
409                                    "'%s' to '%s', even though the former " \
410                                    "is of type '%s' where '%s' was expected."
411                                    % (record1, record2, dnstype1, dnstype2))
412
413
414     def test_update_valid_type(self):
415         for dnstype in self.good_records:
416             for record in self.good_records[dnstype]:
417                 result, out, err = self.runsubcmd("dns", "add",
418                                                   os.environ["SERVER"],
419                                                   self.zone, "testrecord",
420                                                   dnstype, record,
421                                                   self.creds_string)
422                 self.assertCmdSuccess(result, out, err, "Failed to add " \
423                                       "record %s with type %s."
424                                       % (record, dnstype))
425
426                 # Update the record to be the same.
427                 result, out, err = self.runsubcmd("dns", "update",
428                                                   os.environ["SERVER"],
429                                                   self.zone, "testrecord",
430                                                   dnstype, record, record,
431                                                   self.creds_string)
432                 self.assertCmdFail(result, "Successfully updated record " \
433                                    "'%s' to be exactly the same." % record)
434
435                 result, out, err = self.runsubcmd("dns", "delete",
436                                                   os.environ["SERVER"],
437                                                   self.zone, "testrecord",
438                                                   dnstype, record,
439                                                   self.creds_string)
440                 self.assertCmdSuccess(result, out, err, "Could not delete " \
441                                       "valid record '%s' of type '%s'."
442                                       % (record, dnstype))
443
444         for record in self.good_records["SRV"]:
445             result, out, err = self.runsubcmd("dns", "add",
446                                               os.environ["SERVER"],
447                                               self.zone, "testrecord",
448                                               "SRV", record,
449                                               self.creds_string)
450             self.assertCmdSuccess(result, out, err, "Failed to add " \
451                                   "record %s with type 'SRV'." % record)
452
453             split = record.split(' ')
454             new_bit = str(int(split[3]) + 1)
455             new_record = '%s %s %s %s' % (split[0], split[1], split[2], new_bit)
456
457             result, out, err = self.runsubcmd("dns", "update",
458                                               os.environ["SERVER"],
459                                               self.zone, "testrecord",
460                                               "SRV", record,
461                                               new_record, self.creds_string)
462             self.assertCmdSuccess(result, out, err, "Failed to update record " \
463                                   "'%s' of type '%s' to '%s'."
464                                   % (record, "SRV", new_record))
465
466             result, out, err = self.runsubcmd("dns", "query",
467                                               os.environ["SERVER"],
468                                               self.zone, "testrecord",
469                                               "SRV", self.creds_string)
470             self.assertCmdSuccess(result, out, err, "Failed to query for " \
471                                   "record '%s' of type '%s'."
472                                   % (new_record, "SRV"))
473
474             result, out, err = self.runsubcmd("dns", "delete",
475                                               os.environ["SERVER"],
476                                               self.zone, "testrecord",
477                                               "SRV", new_record,
478                                               self.creds_string)
479             self.assertCmdSuccess(result, out, err, "Could not delete " \
480                                   "valid record '%s' of type '%s'."
481                                   % (new_record, "SRV"))
482
483         # Since 'dns update' takes the current value as a parameter, make sure
484         # we can't enter the wrong current value for a given record.
485         for dnstype in self.good_records:
486             if len(self.good_records[dnstype]) < 3:
487                 continue # Not enough records of this type to do this test
488
489             used_record = self.good_records[dnstype][0]
490             unused_record = self.good_records[dnstype][1]
491             new_record = self.good_records[dnstype][2]
492
493             result, out, err = self.runsubcmd("dns", "add",
494                                               os.environ["SERVER"],
495                                               self.zone, "testrecord",
496                                               dnstype, used_record,
497                                               self.creds_string)
498             self.assertCmdSuccess(result, out, err, "Failed to add record %s " \
499                                   "with type %s." % (used_record, dnstype))
500
501             result, out, err = self.runsubcmd("dns", "update",
502                                               os.environ["SERVER"],
503                                               self.zone, "testrecord",
504                                               dnstype, unused_record,
505                                               new_record,
506                                               self.creds_string)
507             self.assertCmdFail(result, "Successfully updated record '%s' " \
508                                "from '%s' to '%s', even though the given " \
509                                "source record is incorrect."
510                                % (used_record, unused_record, new_record))
511
512     def test_invalid_types(self):
513         result, out, err = self.runsubcmd("dns", "add",
514                                           os.environ["SERVER"],
515                                           self.zone, "testrecord",
516                                           "SOA", "test",
517                                           self.creds_string)
518         self.assertCmdFail(result, "Successfully added record of type SOA, " \
519                            "when this type should not be available.")
520         self.assertTrue("type SOA is not supported" in err,
521                         "Invalid error message '%s' when attempting to " \
522                         "add record of type SOA." % err)
523
524     def test_add_overlapping_different_type(self):
525         """
526         Make sure that we can add an entry with the same name as an existing one but a different type.
527         """
528
529         i = 0
530         for dnstype1 in self.good_records:
531             record1 = self.good_records[dnstype1][0]
532             for dnstype2 in self.good_records:
533                 # Only do some subset of dns types, otherwise it takes a long time.
534                 i += 1
535                 if i % 4 != 0:
536                     continue
537
538                 if dnstype1 == dnstype2:
539                     continue
540
541                 record2 = self.good_records[dnstype2][0]
542
543                 result, out, err = self.runsubcmd("dns", "add",
544                                                   os.environ["SERVER"],
545                                                   self.zone, "testrecord",
546                                                   dnstype1, record1,
547                                                   self.creds_string)
548                 self.assertCmdSuccess(result, out, err, "Failed to add record " \
549                                       "'%s' of type '%s'." % (record1, dnstype1))
550
551                 result, out, err = self.runsubcmd("dns", "add",
552                                                   os.environ["SERVER"],
553                                                   self.zone, "testrecord",
554                                                   dnstype2, record2,
555                                                   self.creds_string)
556                 self.assertCmdSuccess(result, out, err, "Failed to add record " \
557                                       "'%s' of type '%s' when a record '%s' " \
558                                       "of type '%s' with the same name exists."
559                                       % (record1, dnstype1, record2, dnstype2))
560
561                 result, out, err = self.runsubcmd("dns", "query",
562                                                   os.environ["SERVER"],
563                                                   self.zone, "testrecord",
564                                                   dnstype1, self.creds_string)
565                 self.assertCmdSuccess(result, out, err, "Failed to query for " \
566                                       "record '%s' of type '%s' when a new " \
567                                       "record '%s' of type '%s' with the same " \
568                                       "name was added."
569                                       % (record1, dnstype1, record2, dnstype2))
570
571                 result, out, err = self.runsubcmd("dns", "query",
572                                                   os.environ["SERVER"],
573                                                   self.zone, "testrecord",
574                                                   dnstype2, self.creds_string)
575                 self.assertCmdSuccess(result, out, err, "Failed to query " \
576                                       "record '%s' of type '%s' which should " \
577                                       "have been added with the same name as " \
578                                       "record '%s' of type '%s'."
579                                       % (record2, dnstype2, record1, dnstype1))
580
581                 result, out, err = self.runsubcmd("dns", "delete",
582                                                   os.environ["SERVER"],
583                                                   self.zone, "testrecord",
584                                                   dnstype1, record1,
585                                                   self.creds_string)
586                 self.assertCmdSuccess(result, out, err, "Failed to delete " \
587                                       "record '%s' of type '%s'."
588                                       % (record1, dnstype1))
589
590                 result, out, err = self.runsubcmd("dns", "delete",
591                                                   os.environ["SERVER"],
592                                                   self.zone, "testrecord",
593                                                   dnstype2, record2,
594                                                   self.creds_string)
595                 self.assertCmdSuccess(result, out, err, "Failed to delete " \
596                                       "record '%s' of type '%s'."
597                                       % (record2, dnstype2))
598
599     def test_query_deleted_record(self):
600         self.runsubcmd("dns", "add", os.environ["SERVER"], self.zone,
601                        "testrecord", "A", self.testip, self.creds_string)
602         self.runsubcmd("dns", "delete", os.environ["SERVER"], self.zone,
603                        "testrecord", "A", self.testip, self.creds_string)
604
605         result, out, err = self.runsubcmd("dns", "query",
606                                           os.environ["SERVER"],
607                                           self.zone, "testrecord",
608                                           "A", self.creds_string)
609         self.assertCmdFail(result)
610
611     def test_add_duplicate_record(self):
612         for record_type in self.good_records:
613             result, out, err = self.runsubcmd("dns", "add",
614                                               os.environ["SERVER"],
615                                               self.zone, "testrecord",
616                                               record_type,
617                                               self.good_records[record_type][0],
618                                               self.creds_string)
619             self.assertCmdSuccess(result, out, err)
620             result, out, err = self.runsubcmd("dns", "add",
621                                               os.environ["SERVER"],
622                                               self.zone, "testrecord",
623                                               record_type,
624                                               self.good_records[record_type][0],
625                                               self.creds_string)
626             self.assertCmdFail(result)
627             result, out, err = self.runsubcmd("dns", "query",
628                                               os.environ["SERVER"],
629                                               self.zone, "testrecord",
630                                               record_type, self.creds_string)
631             self.assertCmdSuccess(result, out, err)
632             result, out, err = self.runsubcmd("dns", "delete",
633                                               os.environ["SERVER"],
634                                               self.zone, "testrecord",
635                                               record_type,
636                                               self.good_records[record_type][0],
637                                               self.creds_string)
638             self.assertCmdSuccess(result, out, err)
639
640     def test_remove_deleted_record(self):
641         self.runsubcmd("dns", "add", os.environ["SERVER"], self.zone,
642                        "testrecord", "A", self.testip, self.creds_string)
643         self.runsubcmd("dns", "delete", os.environ["SERVER"], self.zone,
644                        "testrecord", "A", self.testip, self.creds_string)
645
646         # Attempting to delete a record that has already been deleted or has never existed should fail
647         result, out, err = self.runsubcmd("dns", "delete",
648                                           os.environ["SERVER"],
649                                           self.zone, "testrecord",
650                                           "A", self.testip, self.creds_string)
651         self.assertCmdFail(result)
652         result, out, err = self.runsubcmd("dns", "query",
653                                           os.environ["SERVER"],
654                                           self.zone, "testrecord",
655                                           "A", self.creds_string)
656         self.assertCmdFail(result)
657         result, out, err = self.runsubcmd("dns", "delete",
658                                           os.environ["SERVER"],
659                                           self.zone, "testrecord2",
660                                           "A", self.testip, self.creds_string)
661         self.assertCmdFail(result)
662
663     def test_cleanup_record(self):
664         """
665         Test dns cleanup command is working fine.
666         """
667
668         # add a A record
669         self.runsubcmd("dns", "add", os.environ["SERVER"], self.zone,
670                        'testa', "A", self.testip, self.creds_string)
671
672         # the above A record points to this host
673         dnshostname = '{}.{}'.format('testa', self.zone.lower())
674
675         # add a CNAME record points to above host
676         self.runsubcmd("dns", "add", os.environ["SERVER"], self.zone,
677                        'testcname', "CNAME", dnshostname, self.creds_string)
678
679         # add a NS record
680         self.runsubcmd("dns", "add", os.environ["SERVER"], self.zone,
681                        'testns', "NS", dnshostname, self.creds_string)
682
683         # add a PTR record points to above host
684         self.runsubcmd("dns", "add", os.environ["SERVER"], self.zone,
685                        'testptr', "PTR", dnshostname, self.creds_string)
686
687         # add a SRV record points to above host
688         srv_record = "{} 65530 65530 65530".format(dnshostname)
689         self.runsubcmd("dns", "add", os.environ["SERVER"], self.zone,
690                        'testsrv', "SRV", srv_record, self.creds_string)
691
692         # cleanup record for this dns host
693         self.runsubcmd("dns", "cleanup", os.environ["SERVER"],
694                        dnshostname, self.creds_string)
695
696         # all records should be marked as dNSTombstoned
697         for record_name in ['testa', 'testcname', 'testns', 'testptr', 'testsrv']:
698
699             records = self.samdb.search(
700                 base="DC=DomainDnsZones,{}".format(self.samdb.get_default_basedn()),
701                 scope=ldb.SCOPE_SUBTREE,
702                 expression="(&(objectClass=dnsNode)(name={}))".format(record_name),
703                 attrs=["dNSTombstoned"])
704
705             self.assertEqual(len(records), 1)
706             for record in records:
707                 self.assertEqual(str(record['dNSTombstoned']), 'TRUE')
708
709     def test_cleanup_multi_srv_record(self):
710         """
711         Test dns cleanup command for multi-valued SRV record.
712
713         Steps:
714         - Add 2 A records host1 and host2
715         - Add a SRV record srv1 and points to both host1 and host2
716         - Run cleanup command for host1
717         - Check records for srv1, data for host1 should be gone and host2 is kept.
718         """
719
720         hosts = ['host1', 'host2']  # A record names
721         srv_name = 'srv1'
722
723         # add A records
724         for host in hosts:
725             self.runsubcmd("dns", "add", os.environ["SERVER"], self.zone,
726                            host, "A", self.testip, self.creds_string)
727
728             # the above A record points to this host
729             dnshostname = '{}.{}'.format(host, self.zone.lower())
730
731             # add a SRV record points to above host
732             srv_record = "{} 65530 65530 65530".format(dnshostname)
733             self.runsubcmd("dns", "add", os.environ["SERVER"], self.zone,
734                            srv_name, "SRV", srv_record, self.creds_string)
735
736         records = self.samdb.search(
737             base="DC=DomainDnsZones,{}".format(self.samdb.get_default_basedn()),
738             scope=ldb.SCOPE_SUBTREE,
739             expression="(&(objectClass=dnsNode)(name={}))".format(srv_name),
740             attrs=['dnsRecord'])
741         # should have 2 records here
742         self.assertEqual(len(records[0]['dnsRecord']), 2)
743
744         # cleanup record for dns host1
745         dnshostname1 = 'host1.{}'.format(self.zone.lower())
746         self.runsubcmd("dns", "cleanup", os.environ["SERVER"],
747                        dnshostname1, self.creds_string)
748
749         records = self.samdb.search(
750             base="DC=DomainDnsZones,{}".format(self.samdb.get_default_basedn()),
751             scope=ldb.SCOPE_SUBTREE,
752             expression="(&(objectClass=dnsNode)(name={}))".format(srv_name),
753             attrs=['dnsRecord'])
754
755         # dnsRecord for host1 should be deleted
756         self.assertEqual(len(records[0]['dnsRecord']), 1)
757
758         # unpack data
759         dns_record_bin = records[0]['dnsRecord'][0]
760         dns_record_obj = ndr_unpack(dnsp.DnssrvRpcRecord, dns_record_bin)
761
762         # dnsRecord for host2 is still there and is the only one
763         dnshostname2 = 'host2.{}'.format(self.zone.lower())
764         self.assertEqual(dns_record_obj.data.nameTarget, dnshostname2)
765
766     def test_dns_wildcards(self):
767         """
768         Ensure that DNS wild card entries can be added deleted and queried
769         """
770         num_failures = 0
771         failure_msgs = []
772         records = [("*.",       "MISS",         "A", "1.1.1.1"),
773                    ("*.SAMDOM", "MISS.SAMDOM",  "A", "1.1.1.2")]
774         for (name, miss, dnstype, record) in records:
775             try:
776                 result, out, err = self.runsubcmd("dns", "add",
777                                                   os.environ["SERVER"],
778                                                   self.zone, name,
779                                                   dnstype, record,
780                                                   self.creds_string)
781                 self.assertCmdSuccess(
782                     result,
783                     out,
784                     err,
785                     ("Failed to add record %s (%s) with type %s."
786                      % (name, record, dnstype)))
787
788                 result, out, err = self.runsubcmd("dns", "query",
789                                                   os.environ["SERVER"],
790                                                   self.zone, name,
791                                                   dnstype,
792                                                   self.creds_string)
793                 self.assertCmdSuccess(
794                     result,
795                     out,
796                     err,
797                     ("Failed to query record %s with qualifier %s."
798                      % (record, dnstype)))
799
800                 # dns tool does not perform dns wildcard search if the name
801                 # does not match
802                 result, out, err = self.runsubcmd("dns", "query",
803                                                   os.environ["SERVER"],
804                                                   self.zone, miss,
805                                                   dnstype,
806                                                   self.creds_string)
807                 self.assertCmdFail(
808                     result,
809                     ("Failed to query record %s with qualifier %s."
810                      % (record, dnstype)))
811
812                 result, out, err = self.runsubcmd("dns", "delete",
813                                                   os.environ["SERVER"],
814                                                   self.zone, name,
815                                                   dnstype, record,
816                                                   self.creds_string)
817                 self.assertCmdSuccess(
818                     result,
819                     out,
820                     err,
821                     ("Failed to remove record %s with type %s."
822                      % (record, dnstype)))
823             except AssertionError as e:
824                 num_failures = num_failures + 1
825                 failure_msgs.append(e)
826
827         if num_failures > 0:
828             for msg in failure_msgs:
829                 print(msg)
830             self.fail("Failed to accept valid commands. %d total failures."
831                       "Errors above." % num_failures)