samba-tool: Add new samba-tool gpo aclcheck and test
authorAndrew Bartlett <abartlet@samba.org>
Mon, 5 Nov 2012 08:36:28 +0000 (19:36 +1100)
committerAndrew Bartlett <abartlet@samba.org>
Thu, 15 Nov 2012 21:59:00 +0000 (08:59 +1100)
Reviewed-by: Jelmer Vernooij <jelmer@samba.org>
source4/scripting/python/samba/netcmd/gpo.py
source4/scripting/python/samba/tests/samba_tool/gpo.py

index 347231b523bdcfabc4dd767d01e11fc72e6e17d8..f70317ad82ad5889e1528407f5ee41f3146bd6f9 100644 (file)
@@ -1072,6 +1072,68 @@ class cmd_del(Command):
         self.outf.write("GPO %s deleted.\n" % gpo)
 
 
+class cmd_aclcheck(Command):
+    """Check all GPOs have matching LDAP and DS ACLs."""
+
+    synopsis = "%prog [options]"
+
+    takes_optiongroups = {
+        "sambaopts": options.SambaOptions,
+        "versionopts": options.VersionOptions,
+        "credopts": options.CredentialsOptions,
+    }
+
+    takes_options = [
+        Option("-H", "--URL", help="LDB URL for database or target server", type=str,
+               metavar="URL", dest="H")
+        ]
+
+    def run(self, H=None, sambaopts=None, credopts=None, versionopts=None):
+
+        self.lp = sambaopts.get_loadparm()
+        self.creds = credopts.get_credentials(self.lp, fallback_machine=True)
+
+        self.url = dc_url(self.lp, self.creds, H)
+
+        # We need to know writable DC to setup SMB connection
+        if H and H.startswith('ldap://'):
+            dc_hostname = H[7:]
+            self.url = H
+        else:
+            dc_hostname = netcmd_finddc(self.lp, self.creds)
+            self.url = dc_url(self.lp, self.creds, dc=dc_hostname)
+
+        samdb_connect(self)
+
+        msg = get_gpo_info(self.samdb, None)
+
+        for m in msg:
+            # verify UNC path
+            unc = m['gPCFileSysPath'][0]
+            try:
+                [dom_name, service, sharepath] = parse_unc(unc)
+            except ValueError:
+                raise CommandError("Invalid GPO path (%s)" % unc)
+
+            # SMB connect to DC
+            try:
+                conn = smb.SMB(dc_hostname, service, lp=self.lp, creds=self.creds)
+            except Exception:
+                raise CommandError("Error connecting to '%s' using SMB" % dc_hostname)
+
+            fs_sd = conn.get_acl(sharepath, security.SECINFO_OWNER | security.SECINFO_GROUP | security.SECINFO_DACL, security.SEC_FLAG_MAXIMUM_ALLOWED)
+
+            ds_sd_ndr = m['ntSecurityDescriptor'][0]
+            ds_sd = ndr_unpack(security.descriptor, ds_sd_ndr).as_sddl()
+
+            # Create a file system security descriptor
+            domain_sid = security.dom_sid(self.samdb.get_domain_sid())
+            expected_fs_sddl = dsacl2fsacl(ds_sd, domain_sid)
+
+            if (fs_sd.as_sddl(domain_sid) != expected_fs_sddl):
+                raise CommandError("Invalid GPO ACL %s on path (%s), should be %s" % (fs_sd.as_sddl(domain_sid), sharepath, expected_fs_sddl))
+
+
 class cmd_gpo(SuperCommand):
     """Group Policy Object (GPO) management."""
 
@@ -1088,3 +1150,4 @@ class cmd_gpo(SuperCommand):
     subcommands["fetch"] = cmd_fetch()
     subcommands["create"] = cmd_create()
     subcommands["del"] = cmd_del()
+    subcommands["aclcheck"] = cmd_aclcheck()
index 7ada91f80bf676fd169b97573e52ff3a406e8973..82e7268e1b9695d30313f1ec119b5dd23b2f4623 100644 (file)
@@ -44,6 +44,16 @@ os.environ["SERVER"])
         self.assertCmdSuccess(result, "Ensuring gpo fetched successfully")
         shutil.rmtree(os.path.join(self.tempdir, "policy"))
 
+    def test_show(self):
+        """Show a real GPO, and make sure it passes"""
+        (result, out, err) = self.runsubcmd("gpo", "show", self.gpo_guid, "-H", "ldap://%s" % os.environ["SERVER"])
+        self.assertCmdSuccess(result, "Ensuring gpo fetched successfully")
+
+    def test_aclcheck(self):
+        """Check all the GPOs on the remote server have correct ACLs"""
+        (result, out, err) = self.runsubcmd("gpo", "aclcheck", "-H", "ldap://%s" % os.environ["SERVER"], "-U%s%%%s" % (os.environ["USERNAME"], os.environ["PASSWORD"]))
+        self.assertCmdSuccess(result, "Ensuring gpo checked successfully")
+
     def setUp(self):
         """set up a temporary GPO to work with"""
         super(GpoCmdTestCase, self).setUp()