tools/glusterfind: add query command to list files
authorMilind Changire <mchangir@redhat.com>
Thu, 15 Oct 2015 09:31:23 +0000 (15:01 +0530)
committerVenky Shankar <vshankar@redhat.com>
Wed, 25 Nov 2015 06:29:07 +0000 (22:29 -0800)
When session information is maintained outside Gluster, there needs to
be some mechanism to list files starting from a time-stamp. This patch
implements the feature via the "query" command-line option.

The only caveat is that the first time the query command is run for the
volume, it will likely report that "historical changelogs are not
available". This is due to the fact that changelogs had not been turned
on for the volume so far. So the volume options need to be turned on
outside glusterfind or the since_time need to be greater than the
current time when the query command is run for the very first time for
the volume. The query command turns on the required volume options for
collecting changelogs.

Change-Id: I6cb7a57a5ecd166210e2eb4deede06d40ccfa996
BUG: 1272006
Signed-off-by: Milind Changire <mchangir@redhat.com>
Reviewed-on: http://review.gluster.org/12362
Tested-by: NetBSD Build System <jenkins@build.gluster.org>
Tested-by: Gluster Build System <jenkins@build.gluster.com>
Reviewed-by: Aravinda VK <avishwan@redhat.com>
tools/glusterfind/src/changelog.py
tools/glusterfind/src/main.py

index 4d0a190286e690b490943f5c0ca12336fc789ac5..d6f3dc188ac802816748198284b056f3294f7ba0 100644 (file)
@@ -351,6 +351,8 @@ def _get_args():
     parser.add_argument("brick", help="Brick Name")
     parser.add_argument("outfile", help="Output File")
     parser.add_argument("start", help="Start Time", type=int)
+    parser.add_argument("--only-query", help="Query mode only (no session)",
+                        action="store_true")
     parser.add_argument("--debug", help="Debug", action="store_true")
     parser.add_argument("--output-prefix", help="File prefix in output",
                         default=".")
@@ -378,19 +380,23 @@ if __name__ == "__main__":
     mkdirp(os.path.join(session_dir, args.volume), exit_on_err=True,
            logger=logger)
 
-    try:
-        with open(status_file) as f:
-            start = int(f.read().strip())
-    except (ValueError, OSError, IOError):
+    if args.only_query:
         start = args.start
+    else:
+        try:
+            with open(status_file) as f:
+                start = int(f.read().strip())
+        except (ValueError, OSError, IOError):
+            start = args.start
 
     end = int(time.time()) - get_changelog_rollover_time(args.volume)
     logger.info("%s Started Changelog Crawl - Start: %s End: %s" % (args.brick,
                                                                     start,
                                                                     end))
     actual_end = changelog_crawl(args.brick, start, end, args)
-    with open(status_file_pre, "w", buffering=0) as f:
-        f.write(str(actual_end))
+    if not args.only_query:
+        with open(status_file_pre, "w", buffering=0) as f:
+            f.write(str(actual_end))
 
     logger.info("%s Finished Changelog Crawl - End: %s" % (args.brick,
                                                            actual_end))
index 9bc4872ad9bcc244a330c1309a552c89f7edb8fd..6d03cbed5f27bdf2ba522c571e084c42e06de110 100644 (file)
@@ -124,6 +124,25 @@ def run_cmd_nodes(task, args, **kwargs):
                 (["--only-namespace-changes"] if args.only_namespace_changes
                  else [])
 
+            opts["node_outfile"] = node_outfile
+            opts["copy_outfile"] = True
+        elif task == "query":
+            # If Full backup is requested or start time is zero, use brickfind
+            change_detector = conf.get_change_detector("changelog")
+            node_outfiles.append(node_outfile)
+
+            cmd = [change_detector,
+                   args.session,
+                   args.volume,
+                   brick,
+                   node_outfile,
+                   str(kwargs.get("start"))] + \
+                ["--only-query"] + \
+                ["--output-prefix", args.output_prefix] + \
+                (["--debug"] if args.debug else []) + \
+                (["--only-namespace-changes"]
+                    if args.only_namespace_changes else [])
+
             opts["node_outfile"] = node_outfile
             opts["copy_outfile"] = True
         elif task == "cleanup":
@@ -271,6 +290,23 @@ def _get_args():
                             help="List only namespace changes",
                             action="store_true")
 
+    # query <VOLUME> <OUTFILE> --since-time <SINCE_TIME>
+    #       [--output-prefix <OUTPUT_PREFIX>] [--full]
+    parser_pre = subparsers.add_parser('query')
+    parser_pre.add_argument("volume", help="Volume Name")
+    parser_pre.add_argument("outfile", help="Output File",
+                            action=StoreAbsPath)
+    parser_pre.add_argument("--since-time", help="UNIX epoch time since which "
+                            "listing is required", type=int)
+    parser_pre.add_argument("--debug", help="Debug", action="store_true")
+    parser_pre.add_argument("--disable-partial", help="Disable Partial find, "
+                            "Fail when one node fails", action="store_true")
+    parser_pre.add_argument("--output-prefix", help="File prefix in output",
+                            default=".")
+    parser_pre.add_argument("-N", "--only-namespace-changes",
+                            help="List only namespace changes",
+                            action="store_true")
+
     # post <SESSION> <VOLUME>
     parser_post = subparsers.add_parser('post')
     parser_post.add_argument("session", help="Session Name")
@@ -333,6 +369,45 @@ def ssh_setup(args):
     logger.info("Ssh key added to authorized_keys of Volume nodes")
 
 
+def enable_volume_options(args):
+    execute(["gluster", "volume", "set",
+             args.volume, "build-pgfid", "on"],
+            exit_msg="Failed to set volume option build-pgfid on",
+            logger=logger)
+    logger.info("Volume option set %s, build-pgfid on" % args.volume)
+
+    execute(["gluster", "volume", "set",
+             args.volume, "changelog.changelog", "on"],
+            exit_msg="Failed to set volume option "
+            "changelog.changelog on", logger=logger)
+    logger.info("Volume option set %s, changelog.changelog on"
+                % args.volume)
+
+    execute(["gluster", "volume", "set",
+             args.volume, "changelog.capture-del-path", "on"],
+            exit_msg="Failed to set volume option "
+            "changelog.capture-del-path on", logger=logger)
+    logger.info("Volume option set %s, changelog.capture-del-path on"
+                % args.volume)
+
+
+def write_output(args, outfilemerger):
+    with open(args.outfile, "a") as f:
+        for row in outfilemerger.get():
+            # Multiple paths in case of Hardlinks
+            paths = row[1].split(",")
+            row_2_rep = None
+            for p in paths:
+                if p == "":
+                    continue
+                p_rep = p.replace("%2F%2F", "%2F")
+                if not row_2_rep:
+                    row_2_rep = row[2].replace("%2F%2F", "%2F")
+                if p_rep == row_2_rep:
+                    continue
+                f.write("%s %s %s\n" % (row[0], p_rep, row_2_rep))
+
+
 def mode_create(session_dir, args):
     logger.debug("Init is called - Session: %s, Volume: %s"
                  % (args.session, args.volume))
@@ -360,26 +435,7 @@ def mode_create(session_dir, args):
 
     if not os.path.exists(status_file) or args.force:
         ssh_setup(args)
-
-        execute(["gluster", "volume", "set",
-                 args.volume, "build-pgfid", "on"],
-                exit_msg="Failed to set volume option build-pgfid on",
-                logger=logger)
-        logger.info("Volume option set %s, build-pgfid on" % args.volume)
-
-        execute(["gluster", "volume", "set",
-                 args.volume, "changelog.changelog", "on"],
-                exit_msg="Failed to set volume option "
-                "changelog.changelog on", logger=logger)
-        logger.info("Volume option set %s, changelog.changelog on"
-                    % args.volume)
-
-        execute(["gluster", "volume", "set",
-                 args.volume, "changelog.capture-del-path", "on"],
-                exit_msg="Failed to set volume option "
-                "changelog.capture-del-path on", logger=logger)
-        logger.info("Volume option set %s, changelog.capture-del-path on"
-                    % args.volume)
+        enable_volume_options(args)
 
     # Add Rollover time to current time to make sure changelogs
     # will be available if we use this time as start time
@@ -398,6 +454,59 @@ def mode_create(session_dir, args):
     sys.exit(0)
 
 
+def mode_query(session_dir, args):
+    # Verify volume status
+    cmd = ["gluster", 'volume', 'info', args.volume, "--xml"]
+    _, data, _ = execute(cmd,
+                         exit_msg="Failed to Run Gluster Volume Info",
+                         logger=logger)
+    try:
+        tree = etree.fromstring(data)
+        statusStr = tree.find('volInfo/volumes/volume/statusStr').text
+    except (ParseError, AttributeError) as e:
+        fail("Invalid Volume: %s" % e, logger=logger)
+
+    if statusStr != "Started":
+        fail("Volume %s is not online" % args.volume, logger=logger)
+
+    mkdirp(session_dir, exit_on_err=True, logger=logger)
+    mkdirp(os.path.join(session_dir, args.volume), exit_on_err=True,
+           logger=logger)
+    mkdirp(os.path.dirname(args.outfile), exit_on_err=True, logger=logger)
+
+    # Configure cluster for pasword-less SSH
+    ssh_setup(args)
+
+    # Enable volume options for changelog capture
+    enable_volume_options(args)
+
+    # Start query command processing
+    if args.since_time:
+        start = args.since_time
+        logger.debug("Query is called - Session: %s, Volume: %s, "
+                     "Start time: %s"
+                     % ("default", args.volume, start))
+
+        run_cmd_nodes("query", args, start=start)
+
+        # Merger
+        # Read each Changelogs db and generate finaldb
+        create_file(args.outfile, exit_on_err=True, logger=logger)
+        outfilemerger = OutputMerger(args.outfile + ".db", node_outfiles)
+        write_output(args, outfilemerger)
+
+        try:
+            os.remove(args.outfile + ".db")
+        except (IOError, OSError):
+            pass
+
+        run_cmd_nodes("cleanup", args)
+
+        sys.stdout.write("Generated output file %s\n" % args.outfile)
+    else:
+        fail("Please specify --since-time option")
+
+
 def mode_pre(session_dir, args):
     """
     Read from Session file and write to session.pre file
@@ -441,15 +550,7 @@ def mode_pre(session_dir, args):
         create_file(args.outfile, exit_on_err=True, logger=logger)
         outfilemerger = OutputMerger(args.outfile + ".db", node_outfiles)
 
-        with open(args.outfile, "a") as f:
-            for row in outfilemerger.get():
-                # Multiple paths in case of Hardlinks
-                paths = row[1].split(",")
-                for p in paths:
-                    if p == "" or p.replace("%2F%2F","%2F") == \
-                       row[2].replace("%2F%2F","%2F"):
-                        continue
-                    f.write("%s %s %s\n" % (row[0], p, row[2]))
+        write_output(args, outfilemerger)
 
     try:
         os.remove(args.outfile + ".db")
@@ -566,18 +667,28 @@ def main():
         args = _get_args()
         mkdirp(conf.get_opt("session_dir"), exit_on_err=True)
 
+        # force the default session name if mode is "query"
+        if args.mode == "query":
+            args.session = "default"
+
         if args.mode == "list":
             session_dir = conf.get_opt("session_dir")
         else:
             session_dir = os.path.join(conf.get_opt("session_dir"),
                                        args.session)
 
-        if not os.path.exists(session_dir) and args.mode not in ["create",
-                                                                 "list"]:
+        if not os.path.exists(session_dir) and \
+                args.mode not in ["create", "list", "query"]:
+            fail("Invalid session %s" % args.session)
+
+        # "default" is a system defined session name
+        if args.mode in ["create", "post", "pre", "delete"] and \
+                args.session == "default":
             fail("Invalid session %s" % args.session)
 
         vol_dir = os.path.join(session_dir, args.volume)
-        if not os.path.exists(vol_dir) and args.mode not in ["create", "list"]:
+        if not os.path.exists(vol_dir) and args.mode not in \
+                ["create", "list", "query"]:
             fail("Session %s not created with volume %s" %
                  (args.session, args.volume))