util: Add extra max_size argument to file_lines_load()
[ctdb.git] / tools / ctdb.c
index cefaff040c2e66a7c7b4961fb7df80ea7172ba8c..608d50a3000953be14f0846441aa605d224d9a9f 100644 (file)
@@ -30,6 +30,7 @@
 #include "../include/ctdb_private.h"
 #include "../common/rb_tree.h"
 #include "db_wrap.h"
+#include "lib/util/dlinklist.h"
 
 #define ERR_TIMEOUT    20      /* timed out trying to reach node */
 #define ERR_NONODE     21      /* node does not exist */
@@ -42,6 +43,7 @@ static struct {
        uint32_t pnn;
        uint32_t *nodes;
        int machinereadable;
+       const char *machineseparator;
        int verbose;
        int maxruntime;
        int printemptyrecords;
@@ -62,6 +64,29 @@ static int control_version(struct ctdb_context *ctdb, int argc, const char **arg
        return 0;
 }
 
+/* Like printf(3) but substitute for separator in format */
+static int printm(const char *format, ...) PRINTF_ATTRIBUTE(1,2);
+static int printm(const char *format, ...)
+{
+       va_list ap;
+       int ret;
+       size_t len = strlen(format);
+       char new_format[len+1];
+
+       strcpy(new_format, format);
+
+       if (options.machineseparator[0] != ':') {
+               all_string_sub(new_format,
+                              ":", options.machineseparator, len + 1);
+       }
+
+       va_start(ap, format);
+       ret = vprintf(new_format, ap);
+       va_end(ap);
+
+       return ret;
+}
+
 #define CTDB_NOMEM_ABORT(p) do { if (!(p)) {                           \
                DEBUG(DEBUG_ALERT,("ctdb fatal error: %s\n",            \
                                   "Out of memory in " __location__ )); \
@@ -445,64 +470,64 @@ static void show_statistics(struct ctdb_statistics *s, int show_header)
 
        if (options.machinereadable){
                if (show_header) {
-                       printf("CTDB version:");
-                       printf("Current time of statistics:");
-                       printf("Statistics collected since:");
+                       printm("CTDB version:");
+                       printm("Current time of statistics:");
+                       printm("Statistics collected since:");
                        for (i=0;i<ARRAY_SIZE(fields);i++) {
-                               printf("%s:", fields[i].name);
+                               printm("%s:", fields[i].name);
                        }
-                       printf("num_reclock_ctdbd_latency:");
-                       printf("min_reclock_ctdbd_latency:");
-                       printf("avg_reclock_ctdbd_latency:");
-                       printf("max_reclock_ctdbd_latency:");
-
-                       printf("num_reclock_recd_latency:");
-                       printf("min_reclock_recd_latency:");
-                       printf("avg_reclock_recd_latency:");
-                       printf("max_reclock_recd_latency:");
-
-                       printf("num_call_latency:");
-                       printf("min_call_latency:");
-                       printf("avg_call_latency:");
-                       printf("max_call_latency:");
-
-                       printf("num_lockwait_latency:");
-                       printf("min_lockwait_latency:");
-                       printf("avg_lockwait_latency:");
-                       printf("max_lockwait_latency:");
-
-                       printf("num_childwrite_latency:");
-                       printf("min_childwrite_latency:");
-                       printf("avg_childwrite_latency:");
-                       printf("max_childwrite_latency:");
-                       printf("\n");
-               }
-               printf("%d:", CTDB_VERSION);
-               printf("%d:", (int)s->statistics_current_time.tv_sec);
-               printf("%d:", (int)s->statistics_start_time.tv_sec);
+                       printm("num_reclock_ctdbd_latency:");
+                       printm("min_reclock_ctdbd_latency:");
+                       printm("avg_reclock_ctdbd_latency:");
+                       printm("max_reclock_ctdbd_latency:");
+
+                       printm("num_reclock_recd_latency:");
+                       printm("min_reclock_recd_latency:");
+                       printm("avg_reclock_recd_latency:");
+                       printm("max_reclock_recd_latency:");
+
+                       printm("num_call_latency:");
+                       printm("min_call_latency:");
+                       printm("avg_call_latency:");
+                       printm("max_call_latency:");
+
+                       printm("num_lockwait_latency:");
+                       printm("min_lockwait_latency:");
+                       printm("avg_lockwait_latency:");
+                       printm("max_lockwait_latency:");
+
+                       printm("num_childwrite_latency:");
+                       printm("min_childwrite_latency:");
+                       printm("avg_childwrite_latency:");
+                       printm("max_childwrite_latency:");
+                       printm("\n");
+               }
+               printm("%d:", CTDB_VERSION);
+               printm("%d:", (int)s->statistics_current_time.tv_sec);
+               printm("%d:", (int)s->statistics_start_time.tv_sec);
                for (i=0;i<ARRAY_SIZE(fields);i++) {
-                       printf("%d:", *(uint32_t *)(fields[i].offset+(uint8_t *)s));
-               }
-               printf("%d:", s->reclock.ctdbd.num);
-               printf("%.6f:", s->reclock.ctdbd.min);
-               printf("%.6f:", s->reclock.ctdbd.num?s->reclock.ctdbd.total/s->reclock.ctdbd.num:0.0);
-               printf("%.6f:", s->reclock.ctdbd.max);
-
-               printf("%d:", s->reclock.recd.num);
-               printf("%.6f:", s->reclock.recd.min);
-               printf("%.6f:", s->reclock.recd.num?s->reclock.recd.total/s->reclock.recd.num:0.0);
-               printf("%.6f:", s->reclock.recd.max);
-
-               printf("%d:", s->call_latency.num);
-               printf("%.6f:", s->call_latency.min);
-               printf("%.6f:", s->call_latency.num?s->call_latency.total/s->call_latency.num:0.0);
-               printf("%.6f:", s->call_latency.max);
-
-               printf("%d:", s->childwrite_latency.num);
-               printf("%.6f:", s->childwrite_latency.min);
-               printf("%.6f:", s->childwrite_latency.num?s->childwrite_latency.total/s->childwrite_latency.num:0.0);
-               printf("%.6f:", s->childwrite_latency.max);
-               printf("\n");
+                       printm("%d:", *(uint32_t *)(fields[i].offset+(uint8_t *)s));
+               }
+               printm("%d:", s->reclock.ctdbd.num);
+               printm("%.6f:", s->reclock.ctdbd.min);
+               printm("%.6f:", s->reclock.ctdbd.num?s->reclock.ctdbd.total/s->reclock.ctdbd.num:0.0);
+               printm("%.6f:", s->reclock.ctdbd.max);
+
+               printm("%d:", s->reclock.recd.num);
+               printm("%.6f:", s->reclock.recd.min);
+               printm("%.6f:", s->reclock.recd.num?s->reclock.recd.total/s->reclock.recd.num:0.0);
+               printm("%.6f:", s->reclock.recd.max);
+
+               printm("%d:", s->call_latency.num);
+               printm("%.6f:", s->call_latency.min);
+               printm("%.6f:", s->call_latency.num?s->call_latency.total/s->call_latency.num:0.0);
+               printm("%.6f:", s->call_latency.max);
+
+               printm("%d:", s->childwrite_latency.num);
+               printm("%.6f:", s->childwrite_latency.min);
+               printm("%.6f:", s->childwrite_latency.num?s->childwrite_latency.total/s->childwrite_latency.num:0.0);
+               printm("%.6f:", s->childwrite_latency.max);
+               printm("\n");
        } else {
                printf("CTDB version %u\n", CTDB_VERSION);
                printf("Current time of statistics  :                %s", ctime(&s->statistics_current_time.tv_sec));
@@ -718,6 +743,14 @@ static int control_dbstatistics(struct ctdb_context *ctdb, int argc, const char
                 0.0),
                dbstat->locks.latency.max,
                dbstat->locks.latency.num);
+       printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n",
+               "vacuum_latency     MIN/AVG/MAX",
+               dbstat->vacuum.latency.min,
+               (dbstat->vacuum.latency.num ?
+                dbstat->vacuum.latency.total /dbstat->vacuum.latency.num :
+                0.0),
+               dbstat->vacuum.latency.max,
+               dbstat->vacuum.latency.num);
        num_hot_keys = 0;
        for (i=0; i<dbstat->num_hot_keys; i++) {
                if (dbstat->hot_keys[i].count > 0) {
@@ -756,8 +789,8 @@ static int control_uptime(struct ctdb_context *ctdb, int argc, const char **argv
        }
 
        if (options.machinereadable){
-               printf(":Current Node Time:Ctdb Start Time:Last Recovery/Failover Time:Last Recovery/IPFailover Duration:\n");
-               printf(":%u:%u:%u:%lf\n",
+               printm(":Current Node Time:Ctdb Start Time:Last Recovery/Failover Time:Last Recovery/IPFailover Duration:\n");
+               printm(":%u:%u:%u:%lf\n",
                        (unsigned int)uptime->current_time.tv_sec,
                        (unsigned int)uptime->ctdbd_start_time.tv_sec,
                        (unsigned int)uptime->last_recovery_finished.tv_sec,
@@ -811,38 +844,24 @@ static int control_pnn(struct ctdb_context *ctdb, int argc, const char **argv)
 
 
 struct pnn_node {
-       struct pnn_node *next;
-       const char *addr;
+       struct pnn_node *next, *prev;
+       ctdb_sock_addr addr;
        int pnn;
 };
 
-static struct pnn_node *read_nodes_file(TALLOC_CTX *mem_ctx)
+static struct pnn_node *read_pnn_node_file(TALLOC_CTX *mem_ctx,
+                                          const char *file)
 {
-       const char *nodes_list;
        int nlines;
        char **lines;
        int i, pnn;
        struct pnn_node *pnn_nodes = NULL;
        struct pnn_node *pnn_node;
-       struct pnn_node *tmp_node;
 
-       /* read the nodes file */
-       nodes_list = getenv("CTDB_NODES");
-       if (nodes_list == NULL) {
-               nodes_list = talloc_asprintf(mem_ctx, "%s/nodes",
-                                            getenv("CTDB_BASE"));
-               if (nodes_list == NULL) {
-                       DEBUG(DEBUG_ALERT,(__location__ " Out of memory\n"));
-                       exit(1);
-               }
-       }
-       lines = file_lines_load(nodes_list, &nlines, mem_ctx);
+       lines = file_lines_load(file, &nlines, 0, mem_ctx);
        if (lines == NULL) {
                return NULL;
        }
-       while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) {
-               nlines--;
-       }
        for (i=0, pnn=0; i<nlines; i++) {
                char *node;
 
@@ -860,37 +879,50 @@ static struct pnn_node *read_nodes_file(TALLOC_CTX *mem_ctx)
                }
                pnn_node = talloc(mem_ctx, struct pnn_node);
                pnn_node->pnn = pnn++;
-               pnn_node->addr = talloc_strdup(pnn_node, node);
-               pnn_node->next = pnn_nodes;
-               pnn_nodes = pnn_node;
-       }
 
-       /* swap them around so we return them in incrementing order */
-       pnn_node = pnn_nodes;
-       pnn_nodes = NULL;
-       while (pnn_node) {
-               tmp_node = pnn_node;
-               pnn_node = pnn_node->next;
+               if (!parse_ip(node, NULL, 0, &pnn_node->addr)) {
+                       DEBUG(DEBUG_ERR,
+                             ("Invalid IP address '%s' in file %s\n",
+                              node, file));
+                       /* Caller will free mem_ctx */
+                       return NULL;
+               }
 
-               tmp_node->next = pnn_nodes;
-               pnn_nodes = tmp_node;
+               DLIST_ADD_END(pnn_nodes, pnn_node, NULL);
        }
 
        return pnn_nodes;
 }
 
+static struct pnn_node *read_nodes_file(TALLOC_CTX *mem_ctx)
+{
+       const char *nodes_list;
+
+       /* read the nodes file */
+       nodes_list = getenv("CTDB_NODES");
+       if (nodes_list == NULL) {
+               nodes_list = talloc_asprintf(mem_ctx, "%s/nodes",
+                                            getenv("CTDB_BASE"));
+               if (nodes_list == NULL) {
+                       DEBUG(DEBUG_ALERT,(__location__ " Out of memory\n"));
+                       exit(1);
+               }
+       }
+
+       return read_pnn_node_file(mem_ctx, nodes_list);
+}
+
 /*
   show the PNN of the current node
   discover the pnn by loading the nodes file and try to bind to all
   addresses one at a time until the ip address is found.
  */
-static int control_xpnn(struct ctdb_context *ctdb, int argc, const char **argv)
+static int find_node_xpnn(void)
 {
        TALLOC_CTX *mem_ctx = talloc_new(NULL);
        struct pnn_node *pnn_nodes;
        struct pnn_node *pnn_node;
-
-       assert_single_node_only();
+       int pnn;
 
        pnn_nodes = read_nodes_file(mem_ctx);
        if (pnn_nodes == NULL) {
@@ -900,18 +932,10 @@ static int control_xpnn(struct ctdb_context *ctdb, int argc, const char **argv)
        }
 
        for(pnn_node=pnn_nodes;pnn_node;pnn_node=pnn_node->next) {
-               ctdb_sock_addr addr;
-
-               if (parse_ip(pnn_node->addr, NULL, 63999, &addr) == 0) {
-                       DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s' in nodes file\n", pnn_node->addr));
+               if (ctdb_sys_have_ip(&pnn_node->addr)) {
+                       pnn = pnn_node->pnn;
                        talloc_free(mem_ctx);
-                       return -1;
-               }
-
-               if (ctdb_sys_have_ip(&addr)) {
-                       printf("PNN:%d\n", pnn_node->pnn);
-                       talloc_free(mem_ctx);
-                       return 0;
+                       return pnn;
                }
        }
 
@@ -920,6 +944,21 @@ static int control_xpnn(struct ctdb_context *ctdb, int argc, const char **argv)
        return -1;
 }
 
+static int control_xpnn(struct ctdb_context *ctdb, int argc, const char **argv)
+{
+       uint32_t pnn;
+
+       assert_single_node_only();
+
+       pnn = find_node_xpnn();
+       if (pnn == -1) {
+               return -1;
+       }
+
+       printf("PNN:%d\n", pnn);
+       return 0;
+}
+
 /* Helpers for ctdb status
  */
 static bool is_partially_online(struct ctdb_context *ctdb, struct ctdb_node_and_flags *node)
@@ -949,14 +988,14 @@ static bool is_partially_online(struct ctdb_context *ctdb, struct ctdb_node_and_
 
 static void control_status_header_machine(void)
 {
-       printf(":Node:IP:Disconnected:Banned:Disabled:Unhealthy:Stopped"
+       printm(":Node:IP:Disconnected:Banned:Disabled:Unhealthy:Stopped"
               ":Inactive:PartiallyOnline:ThisNode:\n");
 }
 
 static int control_status_1_machine(struct ctdb_context *ctdb, int mypnn,
                                    struct ctdb_node_and_flags *node)
 {
-       printf(":%d:%s:%d:%d:%d:%d:%d:%d:%d:%c:\n", node->pnn,
+       printm(":%d:%s:%d:%d:%d:%d:%d:%d:%d:%c:\n", node->pnn,
               ctdb_addr_to_str(&node->addr),
               !!(node->flags&NODE_FLAGS_DISCONNECTED),
               !!(node->flags&NODE_FLAGS_BANNED),
@@ -1118,10 +1157,30 @@ static int control_nodestatus(struct ctdb_context *ctdb, int argc, const char **
        return ret;
 }
 
-struct natgw_node {
-       struct natgw_node *next;
-       const char *addr;
-};
+static struct pnn_node *read_natgw_nodes_file(struct ctdb_context *ctdb,
+                                             TALLOC_CTX *mem_ctx)
+{
+       const char *natgw_list;
+       struct pnn_node *natgw_nodes = NULL;
+
+       natgw_list = getenv("CTDB_NATGW_NODES");
+       if (natgw_list == NULL) {
+               natgw_list = talloc_asprintf(mem_ctx, "%s/natgw_nodes",
+                                            getenv("CTDB_BASE"));
+               if (natgw_list == NULL) {
+                       DEBUG(DEBUG_ALERT,(__location__ " Out of memory\n"));
+                       exit(1);
+               }
+       }
+       /* The PNNs will be junk but they're not used */
+       natgw_nodes = read_pnn_node_file(mem_ctx, natgw_list);
+       if (natgw_nodes == NULL) {
+               DEBUG(DEBUG_ERR,
+                     ("Failed to load natgw node list '%s'\n", natgw_list));
+       }
+       return natgw_nodes;
+}
+
 
 /* talloc off the existing nodemap... */
 static struct ctdb_node_map *talloc_nodemap(struct ctdb_node_map *nodemap)
@@ -1131,10 +1190,43 @@ static struct ctdb_node_map *talloc_nodemap(struct ctdb_node_map *nodemap)
                                nodemap->num * sizeof(struct ctdb_node_and_flags));
 }
 
+static struct ctdb_node_map *
+filter_nodemap_by_addrs(struct ctdb_context *ctdb,
+                       struct ctdb_node_map *nodemap,
+                       struct pnn_node *nodes)
+{
+       int i;
+       struct pnn_node *n;
+       struct ctdb_node_map *ret;
+
+       ret = talloc_nodemap(nodemap);
+       CTDB_NO_MEMORY_NULL(ctdb, ret);
+
+       ret->num = 0;
+
+       for (i = 0; i < nodemap->num; i++) {
+               for(n = nodes; n != NULL ; n = n->next) {
+                       if (ctdb_same_ip(&n->addr,
+                                        &nodemap->nodes[i].addr)) {
+                               break;
+                       }
+               }
+               if (n == NULL) {
+                       continue;
+               }
+
+               ret->nodes[ret->num] = nodemap->nodes[i];
+               ret->num++;
+       }
+
+       return ret;
+}
+
 static struct ctdb_node_map *
 filter_nodemap_by_capabilities(struct ctdb_context *ctdb,
                               struct ctdb_node_map *nodemap,
-                              uint32_t required_capabilities)
+                              uint32_t required_capabilities,
+                              bool first_only)
 {
        int i;
        uint32_t capabilities;
@@ -1146,9 +1238,16 @@ filter_nodemap_by_capabilities(struct ctdb_context *ctdb,
        ret->num = 0;
 
        for (i = 0; i < nodemap->num; i++) {
-               int res = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(),
-                                                   nodemap->nodes[i].pnn,
-                                                   &capabilities);
+               int res;
+
+               /* Disconnected nodes have no capabilities! */
+               if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
+                       continue;
+               }
+
+               res = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(),
+                                               nodemap->nodes[i].pnn,
+                                               &capabilities);
                if (res != 0) {
                        DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n",
                                          nodemap->nodes[i].pnn));
@@ -1161,6 +1260,9 @@ filter_nodemap_by_capabilities(struct ctdb_context *ctdb,
 
                ret->nodes[ret->num] = nodemap->nodes[i];
                ret->num++;
+               if (first_only) {
+                       break;
+               }
        }
 
        return ret;
@@ -1198,13 +1300,9 @@ static int control_natgwlist(struct ctdb_context *ctdb, int argc, const char **a
 {
        TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
        int i, ret;
-       const char *natgw_list;
-       int nlines;
-       char **lines;
-       struct natgw_node *natgw_nodes = NULL;
-       struct natgw_node *natgw_node;
-       struct ctdb_node_map *nodemap=NULL;
-       struct ctdb_node_map *cnodemap;
+       struct pnn_node *natgw_nodes = NULL;
+       struct ctdb_node_map *orig_nodemap=NULL;
+       struct ctdb_node_map *nodemap;
        uint32_t mypnn, pnn;
        const char *ip;
 
@@ -1223,82 +1321,24 @@ static int control_natgwlist(struct ctdb_context *ctdb, int argc, const char **a
        };
 
        /* read the natgw nodes file into a linked list */
-       natgw_list = getenv("CTDB_NATGW_NODES");
-       if (natgw_list == NULL) {
-               natgw_list = talloc_asprintf(tmp_ctx, "%s/natgw_nodes",
-                                            getenv("CTDB_BASE"));
-               if (natgw_list == NULL) {
-                       DEBUG(DEBUG_ALERT,(__location__ " Out of memory\n"));
-                       exit(1);
-               }
-       }
-       lines = file_lines_load(natgw_list, &nlines, ctdb);
-       if (lines == NULL) {
-               ctdb_set_error(ctdb, "Failed to load natgw node list '%s'\n", natgw_list);
-               talloc_free(tmp_ctx);
-               return -1;
-       }
-       for (i=0;i<nlines;i++) {
-               char *node;
-
-               node = lines[i];
-               /* strip leading spaces */
-               while((*node == ' ') || (*node == '\t')) {
-                       node++;
-               }
-               if (*node == '#') {
-                       continue;
-               }
-               if (strcmp(node, "") == 0) {
-                       continue;
-               }
-               natgw_node = talloc(ctdb, struct natgw_node);
-               natgw_node->addr = talloc_strdup(natgw_node, node);
-               CTDB_NO_MEMORY(ctdb, natgw_node->addr);
-               natgw_node->next = natgw_nodes;
-               natgw_nodes = natgw_node;
+       natgw_nodes = read_natgw_nodes_file(ctdb, tmp_ctx);
+       if (natgw_nodes == NULL) {
+               ret = -1;
+               goto done;
        }
 
-       ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
+       ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE,
+                                  tmp_ctx, &orig_nodemap);
        if (ret != 0) {
                DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node.\n"));
                talloc_free(tmp_ctx);
                return -1;
        }
 
-       /* Trim the nodemap so it only includes connected nodes in the
-        * current natgw group.
-        */
-       i=0;
-       while(i<nodemap->num) {
-               for(natgw_node=natgw_nodes;natgw_node;natgw_node=natgw_node->next) {
-                       if (!strcmp(natgw_node->addr, ctdb_addr_to_str(&nodemap->nodes[i].addr))) {
-                               break;
-                       }
-               }
-
-               /* this node was not in the natgw so we just remove it from
-                * the list
-                */
-               if ((natgw_node == NULL) 
-               ||  (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) ) {
-                       int j;
-
-                       for (j=i+1; j<nodemap->num; j++) {
-                               nodemap->nodes[j-1] = nodemap->nodes[j];
-                       }
-                       nodemap->num--;
-                       continue;
-               }
-
-               i++;
-       }
-
-       /* Get a nodemap that includes only the nodes with the NATGW
-        * capability */
-       cnodemap = filter_nodemap_by_capabilities(ctdb, nodemap,
-                                                 CTDB_CAP_NATGW);
-       if (cnodemap == NULL) {
+       /* Get a nodemap that includes only the nodes in the NATGW
+        * group */
+       nodemap = filter_nodemap_by_addrs(ctdb, orig_nodemap, natgw_nodes);
+       if (nodemap == NULL) {
                ret = -1;
                goto done;
        }
@@ -1306,9 +1346,12 @@ static int control_natgwlist(struct ctdb_context *ctdb, int argc, const char **a
        ret = 2; /* matches ENOENT */
        pnn = -1;
        ip = "0.0.0.0";
+       /* For each flag mask... */
        for (i = 0; exclude_flags[i] != 0; i++) {
+               /* ... get a nodemap that excludes nodes with with
+                * masked flags... */
                struct ctdb_node_map *t =
-                       filter_nodemap_by_flags(ctdb, cnodemap,
+                       filter_nodemap_by_flags(ctdb, nodemap,
                                                exclude_flags[i]);
                if (t == NULL) {
                        /* No memory */
@@ -1316,17 +1359,30 @@ static int control_natgwlist(struct ctdb_context *ctdb, int argc, const char **a
                        goto done;
                }
                if (t->num > 0) {
-                       ret = 0;
-                       pnn = t->nodes[0].pnn;
-                       ip = ctdb_addr_to_str(&t->nodes[0].addr);
-                       break;
+                       /* ... and find the first node with the NATGW
+                        * capability */
+                       struct ctdb_node_map *n;
+                       n = filter_nodemap_by_capabilities(ctdb, t,
+                                                          CTDB_CAP_NATGW,
+                                                          true);
+                       if (n == NULL) {
+                               /* No memory */
+                               ret = -1;
+                               goto done;
+                       }
+                       if (n->num > 0) {
+                               ret = 0;
+                               pnn = n->nodes[0].pnn;
+                               ip = ctdb_addr_to_str(&n->nodes[0].addr);
+                               break;
+                       }
                }
                talloc_free(t);
        }
 
        if (options.machinereadable) {
-               printf(":Node:IP:\n");
-               printf(":%d:%s:\n", pnn, ip);
+               printm(":Node:IP:\n");
+               printm(":%d:%s:\n", pnn, ip);
        } else {
                printf("%d %s\n", pnn, ip);
        }
@@ -1378,8 +1434,14 @@ static int control_one_scriptstatus(struct ctdb_context *ctdb,
        }
 
        if (!options.machinereadable) {
+               int num_run = 0;
+               for (i=0; i<script_status->num_scripts; i++) {
+                       if (script_status->scripts[i].status != -ENOEXEC) {
+                               num_run++;
+                       }
+               }
                printf("%d scripts were executed last %s cycle\n",
-                      script_status->num_scripts,
+                      num_run,
                       ctdb_eventscript_call_names[type]);
        }
        for (i=0; i<script_status->num_scripts; i++) {
@@ -1401,7 +1463,7 @@ static int control_one_scriptstatus(struct ctdb_context *ctdb,
                        break;
                }
                if (options.machinereadable) {
-                       printf(":%s:%s:%i:%s:%lu.%06lu:%lu.%06lu:%s:\n",
+                       printm(":%s:%s:%i:%s:%lu.%06lu:%lu.%06lu:%s:\n",
                               ctdb_eventscript_call_names[type],
                               script_status->scripts[i].name,
                               script_status->scripts[i].status,
@@ -1477,7 +1539,7 @@ static int control_scriptstatus(struct ctdb_context *ctdb,
        }
 
        if (options.machinereadable) {
-               printf(":Type:Name:Code:Status:Start:End:Error Output...:\n");
+               printm(":Type:Name:Code:Status:Start:End:Error Output...:\n");
        }
 
        for (type = min; type < max; type++) {
@@ -1658,13 +1720,13 @@ static int control_get_tickles(struct ctdb_context *ctdb, int argc, const char *
        }
 
        if (options.machinereadable){
-               printf(":source ip:port:destination ip:port:\n");
+               printm(":source ip:port:destination ip:port:\n");
                for (i=0;i<list->tickles.num;i++) {
                        if (port && port != ntohs(list->tickles.connections[i].dst_addr.ip.sin_port)) {
                                continue;
                        }
-                       printf(":%s:%u", ctdb_addr_to_str(&list->tickles.connections[i].src_addr), ntohs(list->tickles.connections[i].src_addr.ip.sin_port));
-                       printf(":%s:%u:\n", ctdb_addr_to_str(&list->tickles.connections[i].dst_addr), ntohs(list->tickles.connections[i].dst_addr.ip.sin_port));
+                       printm(":%s:%u", ctdb_addr_to_str(&list->tickles.connections[i].src_addr), ntohs(list->tickles.connections[i].src_addr.ip.sin_port));
+                       printm(":%s:%u:\n", ctdb_addr_to_str(&list->tickles.connections[i].dst_addr), ntohs(list->tickles.connections[i].dst_addr.ip.sin_port));
                }
        } else {
                printf("Tickles for ip:%s\n", ctdb_addr_to_str(&list->addr));
@@ -1784,6 +1846,7 @@ find_other_host_for_public_ip(struct ctdb_context *ctdb, ctdb_sock_addr *addr)
        struct ctdb_all_public_ips *ips;
        struct ctdb_node_map *nodemap=NULL;
        int i, j, ret;
+       int pnn;
 
        ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
        if (ret != 0) {
@@ -1809,8 +1872,9 @@ find_other_host_for_public_ip(struct ctdb_context *ctdb, ctdb_sock_addr *addr)
 
                for (j=0;j<ips->num;j++) {
                        if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
+                               pnn = nodemap->nodes[i].pnn;
                                talloc_free(tmp_ctx);
-                               return nodemap->nodes[i].pnn;
+                               return pnn;
                        }
                }
                talloc_free(ips);
@@ -2909,11 +2973,11 @@ static int control_ip(struct ctdb_context *ctdb, int argc, const char **argv)
        }
 
        if (options.machinereadable){
-               printf(":Public IP:Node:");
+               printm(":Public IP:Node:");
                if (options.verbose){
-                       printf("ActiveInterface:AvailableInterfaces:ConfiguredInterfaces:");
+                       printm("ActiveInterface:AvailableInterfaces:ConfiguredInterfaces:");
                }
-               printf("\n");
+               printm("\n");
        } else {
                if (options.pnn == CTDB_BROADCAST_ALL) {
                        printf("Public IPs on ALL nodes\n");
@@ -2973,11 +3037,11 @@ static int control_ip(struct ctdb_context *ctdb, int argc, const char **argv)
                }
 
                if (options.machinereadable){
-                       printf(":%s:%d:",
+                       printm(":%s:%d:",
                                ctdb_addr_to_str(&ips->ips[ips->num-i].addr),
                                ips->ips[ips->num-i].pnn);
                        if (options.verbose){
-                               printf("%s:%s:%s:",
+                               printm("%s:%s:%s:",
                                        aciface?aciface:"",
                                        avifaces?avifaces:"",
                                        cifaces?cifaces:"");
@@ -3076,14 +3140,14 @@ static int control_ifaces(struct ctdb_context *ctdb, int argc, const char **argv
        }
 
        if (options.machinereadable){
-               printf(":Name:LinkStatus:References:\n");
+               printm(":Name:LinkStatus:References:\n");
        } else {
                printf("Interfaces on node %u\n", options.pnn);
        }
 
        for (i=0; i<ifaces->num; i++) {
                if (options.machinereadable){
-                       printf(":%s:%s:%u\n",
+                       printm(":%s:%s:%u:\n",
                               ifaces->ifaces[i].name,
                               ifaces->ifaces[i].link_state?"1":"0",
                               (unsigned int)ifaces->ifaces[i].references);
@@ -3447,23 +3511,10 @@ static int control_recover(struct ctdb_context *ctdb, int argc, const char **arg
 {
        int ret;
        uint32_t generation, next_generation;
-       bool force;
-
-       /* "force" option ignores freeze failure and forces recovery */
-       force = (argc == 1) && (strcasecmp(argv[0], "force") == 0);
 
        /* record the current generation number */
        generation = get_generation(ctdb);
 
-       ret = ctdb_ctrl_freeze_priority(ctdb, TIMELIMIT(), options.pnn, 1);
-       if (ret != 0) {
-               if (!force) {
-                       DEBUG(DEBUG_ERR, ("Unable to freeze node\n"));
-                       return ret;
-               }
-               DEBUG(DEBUG_WARNING, ("Unable to freeze node but proceeding because \"force\" option given\n"));
-       }
-
        ret = ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
        if (ret != 0) {
                DEBUG(DEBUG_ERR, ("Unable to set recovery mode\n"));
@@ -3499,8 +3550,8 @@ static int control_getmonmode(struct ctdb_context *ctdb, int argc, const char **
        if (!options.machinereadable){
                printf("Monitoring mode:%s (%d)\n",monmode==CTDB_MONITORING_ACTIVE?"ACTIVE":"DISABLED",monmode);
        } else {
-               printf(":mode:\n");
-               printf(":%d:\n",monmode);
+               printm(":mode:\n");
+               printm(":%d:\n",monmode);
        }
        return 0;
 }
@@ -3526,8 +3577,8 @@ static int control_getcapabilities(struct ctdb_context *ctdb, int argc, const ch
                printf("LVS: %s\n", (capabilities&CTDB_CAP_LVS)?"YES":"NO");
                printf("NATGW: %s\n", (capabilities&CTDB_CAP_NATGW)?"YES":"NO");
        } else {
-               printf(":RECMASTER:LMASTER:LVS:NATGW:\n");
-               printf(":%d:%d:%d:%d:\n",
+               printm(":RECMASTER:LMASTER:LVS:NATGW:\n");
+               printm(":%d:%d:%d:%d:\n",
                        !!(capabilities&CTDB_CAP_RECMASTER),
                        !!(capabilities&CTDB_CAP_LMASTER),
                        !!(capabilities&CTDB_CAP_LVS),
@@ -3564,7 +3615,7 @@ static int control_lvs(struct ctdb_context *ctdb, int argc, const char **argv)
        }
 
        nodemap = filter_nodemap_by_capabilities(ctdb, orig_nodemap,
-                                                CTDB_CAP_LVS);
+                                                CTDB_CAP_LVS, false);
        if (nodemap == NULL) {
                /* No memory */
                ret = -1;
@@ -3604,26 +3655,17 @@ done:
 static int control_lvsmaster(struct ctdb_context *ctdb, int argc, const char **argv)
 {
        TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
-       struct ctdb_node_map *orig_nodemap=NULL;
-       struct ctdb_node_map *nodemap;
+       struct ctdb_node_map *nodemap=NULL;
        int i, ret;
 
        ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn,
-                                  tmp_ctx, &orig_nodemap);
+                                  tmp_ctx, &nodemap);
        if (ret != 0) {
                DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
                talloc_free(tmp_ctx);
                return -1;
        }
 
-       nodemap = filter_nodemap_by_capabilities(ctdb, orig_nodemap,
-                                                CTDB_CAP_LVS);
-       if (nodemap == NULL) {
-               /* No memory */
-               ret = -1;
-               goto done;
-       }
-
        for (i = 0; lvs_exclude_flags[i] != 0; i++) {
                struct ctdb_node_map *t =
                        filter_nodemap_by_flags(ctdb, nodemap,
@@ -3634,11 +3676,25 @@ static int control_lvsmaster(struct ctdb_context *ctdb, int argc, const char **a
                        goto done;
                }
                if (t->num > 0) {
-                       ret = 0;
-                       printf(options.machinereadable ?
-                              "%d\n" : "Node %d is LVS master\n",
-                               t->nodes[0].pnn);
-                       goto done;
+                       struct ctdb_node_map *n;
+                       n = filter_nodemap_by_capabilities(ctdb,
+                                                          t,
+                                                          CTDB_CAP_LVS,
+                                                          true);
+                       if (n == NULL) {
+                               /* No memory */
+                               ret = -1;
+                               goto done;
+                       }
+                       if (n->num > 0) {
+                               ret = 0;
+                               if (options.machinereadable) {
+                                       printm("%d\n", n->nodes[0].pnn);
+                               } else {
+                                       printf("Node %d is LVS master\n", n->nodes[0].pnn);
+                               }
+                               goto done;
+                       }
                }
                talloc_free(t);
        }
@@ -3843,8 +3899,8 @@ static int control_readkey(struct ctdb_context *ctdb, int argc, const char **arg
 
        printf("Data: size:%d ptr:[%.*s]\n", (int)data.dsize, (int)data.dsize, data.dptr);
 
-       talloc_free(ctdb_db);
        talloc_free(tmp_ctx);
+       talloc_free(ctdb_db);
        return 0;
 }
 
@@ -3893,8 +3949,8 @@ static int control_writekey(struct ctdb_context *ctdb, int argc, const char **ar
        }
 
        talloc_free(h);
-       talloc_free(ctdb_db);
        talloc_free(tmp_ctx);
+       talloc_free(ctdb_db);
        return 0;
 }
 
@@ -3965,10 +4021,10 @@ static int control_pfetch(struct ctdb_context *ctdb, int argc, const char **argv
                        talloc_free(tmp_ctx);
                        return -1;
                }
-               write(fd, data.dptr, data.dsize);
+               sys_write(fd, data.dptr, data.dsize);
                close(fd);
        } else {
-               write(1, data.dptr, data.dsize);
+               sys_write(1, data.dptr, data.dsize);
        }
 
        /* abort the transaction */
@@ -4029,16 +4085,16 @@ static int control_tfetch(struct ctdb_context *ctdb, int argc, const char **argv
                        return -1;
                }
                if (options.verbose){
-                       write(fd, data.dptr, data.dsize);
+                       sys_write(fd, data.dptr, data.dsize);
                } else {
-                       write(fd, data.dptr+sizeof(struct ctdb_ltdb_header), data.dsize-sizeof(struct ctdb_ltdb_header));
+                       sys_write(fd, data.dptr+sizeof(struct ctdb_ltdb_header), data.dsize-sizeof(struct ctdb_ltdb_header));
                }
                close(fd);
        } else {
                if (options.verbose){
-                       write(1, data.dptr, data.dsize);
+                       sys_write(1, data.dptr, data.dsize);
                } else {
-                       write(1, data.dptr+sizeof(struct ctdb_ltdb_header), data.dsize-sizeof(struct ctdb_ltdb_header));
+                       sys_write(1, data.dptr+sizeof(struct ctdb_ltdb_header), data.dsize-sizeof(struct ctdb_ltdb_header));
                }
        }
 
@@ -4175,7 +4231,7 @@ static int control_pstore(struct ctdb_context *ctdb, int argc, const char **argv
                        talloc_free(tmp_ctx);
                        return -1;
                }
-               ret = read(fd, data.dptr, data.dsize);
+               ret = sys_read(fd, data.dptr, data.dsize);
                if (ret != data.dsize) {
                        DEBUG(DEBUG_ERR,("Failed to read %d bytes of record data\n", (int)data.dsize));
                        close(fd);
@@ -4679,7 +4735,7 @@ static int control_getdbmap(struct ctdb_context *ctdb, int argc, const char **ar
        }
 
        if(options.machinereadable){
-               printf(":ID:Name:Path:Persistent:Sticky:Unhealthy:ReadOnly:\n");
+               printm(":ID:Name:Path:Persistent:Sticky:Unhealthy:ReadOnly:\n");
                for(i=0;i<dbmap->num;i++){
                        const char *path;
                        const char *name;
@@ -4697,7 +4753,7 @@ static int control_getdbmap(struct ctdb_context *ctdb, int argc, const char **ar
                        persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
                        readonly   = dbmap->dbs[i].flags & CTDB_DB_FLAGS_READONLY;
                        sticky     = dbmap->dbs[i].flags & CTDB_DB_FLAGS_STICKY;
-                       printf(":0x%08X:%s:%s:%d:%d:%d:%d:\n",
+                       printm(":0x%08X:%s:%s:%d:%d:%d:%d:\n",
                               dbmap->dbs[i].dbid, name, path,
                               !!(persistent), !!(sticky),
                               !!(health), !!(readonly));
@@ -4898,6 +4954,11 @@ static int control_setvar(struct ctdb_context *ctdb, int argc, const char **argv
                DEBUG(DEBUG_ERR, ("Unable to set tunable variable '%s'\n", name));
                return -1;
        }
+       if (ret == 1) {
+               DEBUG(DEBUG_WARNING,
+                     ("Setting obsolete tunable variable '%s'\n",
+                      name));
+       }
        return 0;
 }
 
@@ -4939,8 +5000,8 @@ static int control_getdebug(struct ctdb_context *ctdb, int argc, const char **ar
                return ret;
        } else {
                if (options.machinereadable){
-                       printf(":Name:Level:\n");
-                       printf(":%s:%d:\n",get_debug_by_level(level),level);
+                       printm(":Name:Level:\n");
+                       printm(":%s:%d:\n",get_debug_by_level(level),level);
                } else {
                        printf("Node %u is at debug level %s (%d)\n", options.pnn, get_debug_by_level(level), level);
                }
@@ -4963,7 +5024,7 @@ static int control_getreclock(struct ctdb_context *ctdb, int argc, const char **
        } else {
                if (options.machinereadable){
                        if (reclock != NULL) {
-                               printf("%s", reclock);
+                               printm("%s", reclock);
                        }
                } else {
                        if (reclock == NULL) {
@@ -5186,6 +5247,109 @@ static int control_attach(struct ctdb_context *ctdb, int argc, const char **argv
        return 0;
 }
 
+/*
+ * detach from a database
+ */
+static int control_detach(struct ctdb_context *ctdb, int argc,
+                         const char **argv)
+{
+       uint32_t db_id;
+       uint8_t flags;
+       int ret, i, status = 0;
+       struct ctdb_node_map *nodemap = NULL;
+       TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
+       uint32_t recmode;
+
+       if (argc < 1) {
+               usage();
+       }
+
+       assert_single_node_only();
+
+       ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, TIMELIMIT(), options.pnn,
+                                   &recmode);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR, ("Database cannot be detached "
+                                 "when recovery is active\n"));
+               talloc_free(tmp_ctx);
+               return -1;
+       }
+
+       ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx,
+                                  &nodemap);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n",
+                                 options.pnn));
+               talloc_free(tmp_ctx);
+               return -1;
+       }
+
+       for (i=0; i<nodemap->num; i++) {
+               uint32_t value;
+
+               if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
+                       continue;
+               }
+
+               if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
+                       continue;
+               }
+
+               if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
+                       DEBUG(DEBUG_ERR, ("Database cannot be detached on "
+                                         "inactive (stopped or banned) node "
+                                         "%u\n", nodemap->nodes[i].pnn));
+                       talloc_free(tmp_ctx);
+                       return -1;
+               }
+
+               ret = ctdb_ctrl_get_tunable(ctdb, TIMELIMIT(),
+                                           nodemap->nodes[i].pnn,
+                                           "AllowClientDBAttach",
+                                           &value);
+               if (ret != 0) {
+                       DEBUG(DEBUG_ERR, ("Unable to get tunable "
+                                         "AllowClientDBAttach from node %u\n",
+                                          nodemap->nodes[i].pnn));
+                       talloc_free(tmp_ctx);
+                       return -1;
+               }
+
+               if (value == 1) {
+                       DEBUG(DEBUG_ERR, ("Database access is still active on "
+                                         "node %u. Set AllowClientDBAttach=0 "
+                                         "on all nodes.\n",
+                                         nodemap->nodes[i].pnn));
+                       talloc_free(tmp_ctx);
+                       return -1;
+               }
+       }
+
+       talloc_free(tmp_ctx);
+
+       for (i=0; i<argc; i++) {
+               if (!db_exists(ctdb, argv[i], &db_id, NULL, &flags)) {
+                       continue;
+               }
+
+               if (flags & CTDB_DB_FLAGS_PERSISTENT) {
+                       DEBUG(DEBUG_ERR, ("Persistent database '%s' "
+                                         "cannot be detached\n", argv[i]));
+                       status = -1;
+                       continue;
+               }
+
+               ret = ctdb_detach(ctdb, db_id);
+               if (ret != 0) {
+                       DEBUG(DEBUG_ERR, ("Database '%s' detach failed\n",
+                                         argv[i]));
+                       status = ret;
+               }
+       }
+
+       return status;
+}
+
 /*
   set db priority
  */
@@ -5517,12 +5681,12 @@ static int control_backupdb(struct ctdb_context *ctdb, int argc, const char **ar
                goto done;
        }
        strncpy(discard_const(dbhdr.name), argv[0], MAX_DB_NAME-1);
-       ret = write(fh, &dbhdr, sizeof(dbhdr));
+       ret = sys_write(fh, &dbhdr, sizeof(dbhdr));
        if (ret == -1) {
                DEBUG(DEBUG_ERR,("write failed: %s\n", strerror(errno)));
                goto done;
        }
-       ret = write(fh, bd->records, bd->len);
+       ret = sys_write(fh, bd->records, bd->len);
        if (ret == -1) {
                DEBUG(DEBUG_ERR,("write failed: %s\n", strerror(errno)));
                goto done;
@@ -5578,7 +5742,7 @@ static int control_restoredb(struct ctdb_context *ctdb, int argc, const char **a
                return -1;
        }
 
-       read(fh, &dbhdr, sizeof(dbhdr));
+       sys_read(fh, &dbhdr, sizeof(dbhdr));
        if (dbhdr.version != DB_VERSION) {
                DEBUG(DEBUG_ERR,("Invalid version of database dump. File is version %lu but expected version was %u\n", dbhdr.version, DB_VERSION));
                close(fh);
@@ -5599,7 +5763,7 @@ static int control_restoredb(struct ctdb_context *ctdb, int argc, const char **a
                talloc_free(tmp_ctx);
                return -1;
        }               
-       read(fh, outdata.dptr, outdata.dsize);
+       sys_read(fh, outdata.dptr, outdata.dsize);
        close(fh);
 
        tm = localtime(&dbhdr.timestamp);
@@ -5774,7 +5938,7 @@ static int control_dumpdbbackup(struct ctdb_context *ctdb, int argc, const char
                return -1;
        }
 
-       read(fh, &dbhdr, sizeof(dbhdr));
+       sys_read(fh, &dbhdr, sizeof(dbhdr));
        if (dbhdr.version != DB_VERSION) {
                DEBUG(DEBUG_ERR,("Invalid version of database dump. File is version %lu but expected version was %u\n", dbhdr.version, DB_VERSION));
                close(fh);
@@ -5790,7 +5954,7 @@ static int control_dumpdbbackup(struct ctdb_context *ctdb, int argc, const char
                talloc_free(tmp_ctx);
                return -1;
        }
-       read(fh, outdata.dptr, outdata.dsize);
+       sys_read(fh, outdata.dptr, outdata.dsize);
        close(fh);
        m = (struct ctdb_marshall_buffer *)outdata.dptr;
 
@@ -6001,7 +6165,7 @@ static int control_dumpmemory(struct ctdb_context *ctdb, int argc, const char **
                talloc_free(tmp_ctx);
                return -1;
        }
-       write(1, data.dptr, data.dsize);
+       sys_write(1, data.dptr, data.dsize);
        talloc_free(tmp_ctx);
        return 0;
 }
@@ -6012,7 +6176,7 @@ static int control_dumpmemory(struct ctdb_context *ctdb, int argc, const char **
 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
                             TDB_DATA data, void *private_data)
 {
-       write(1, data.dptr, data.dsize);
+       sys_write(1, data.dptr, data.dsize);
        exit(0);
 }
 
@@ -6134,16 +6298,11 @@ static int control_listnodes(struct ctdb_context *ctdb, int argc, const char **a
        }
 
        for(pnn_node=pnn_nodes;pnn_node;pnn_node=pnn_node->next) {
-               ctdb_sock_addr addr;
-               if (parse_ip(pnn_node->addr, NULL, 63999, &addr) == 0) {
-                       DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s' in nodes file\n", pnn_node->addr));
-                       talloc_free(mem_ctx);
-                       return -1;
-               }
+               const char *addr = ctdb_addr_to_str(&pnn_node->addr);
                if (options.machinereadable){
-                       printf(":%d:%s:\n", pnn_node->pnn, pnn_node->addr);
+                       printm(":%d:%s:\n", pnn_node->pnn, addr);
                } else {
-                       printf("%s\n", pnn_node->addr);
+                       printf("%s\n", addr);
                }
        }
        talloc_free(mem_ctx);
@@ -6237,6 +6396,7 @@ static const struct {
        { "getlog",          control_getlog,            true,   false,  "get the log data from the in memory ringbuffer", "[<level>] [recoverd]" },
        { "clearlog",          control_clearlog,        true,   false,  "clear the log data from the in memory ringbuffer", "[recoverd]" },
        { "attach",          control_attach,            true,   false,  "attach to a database",                 "<dbname> [persistent]" },
+       { "detach",          control_detach,            false,  false,  "detach from a database",                 "<dbname|dbid> [<dbname|dbid> ...]" },
        { "dumpmemory",      control_dumpmemory,        true,   false,  "dump memory map to stdout" },
        { "rddumpmemory",    control_rddumpmemory,      true,   false,  "dump memory map from the recovery daemon to stdout" },
        { "getpid",          control_getpid,            true,   false,  "get ctdbd process ID" },
@@ -6322,7 +6482,8 @@ static void usage(void)
 "Usage: ctdb [options] <control>\n" \
 "Options:\n" \
 "   -n <node>          choose node number, or 'all' (defaults to local node)\n"
-"   -Y                 generate machinereadable output\n"
+"   -Y                 generate machine readable output\n"
+"   -x <char>          specify delimiter for machine readable output\n"
 "   -v                 generate verbose output\n"
 "   -t <timelimit>     set timelimit for control in seconds (default %u)\n", options.timelimit);
        printf("Controls:\n");
@@ -6349,12 +6510,15 @@ int main(int argc, const char *argv[])
 {
        struct ctdb_context *ctdb;
        char *nodestring = NULL;
+       int machineparsable = 0;
        struct poptOption popt_options[] = {
                POPT_AUTOHELP
                POPT_CTDB_CMDLINE
                { "timelimit", 't', POPT_ARG_INT, &options.timelimit, 0, "timelimit", "integer" },
                { "node",      'n', POPT_ARG_STRING, &nodestring, 0, "node", "integer|all" },
-               { "machinereadable", 'Y', POPT_ARG_NONE, &options.machinereadable, 0, "enable machinereadable output", NULL },
+               { "machinereadable", 'Y', POPT_ARG_NONE, &options.machinereadable, 0, "enable machine readable output", NULL },
+               { NULL, 'x', POPT_ARG_STRING, &options.machineseparator, 0, "specify separator for machine readable output", "char" },
+               { NULL, 'X', POPT_ARG_NONE, &machineparsable, 0, "enable machine parsable output with separator |", NULL },
                { "verbose",    'v', POPT_ARG_NONE, &options.verbose, 0, "enable verbose output", NULL },
                { "maxruntime", 'T', POPT_ARG_INT, &options.maxruntime, 0, "die if runtime exceeds this limit (in seconds)", "integer" },
                { "print-emptyrecords", 0, POPT_ARG_NONE, &options.printemptyrecords, 0, "print the empty records when dumping databases (catdb, cattdb, dumpdbbackup)", NULL },
@@ -6412,6 +6576,23 @@ int main(int argc, const char *argv[])
                }
        }
 
+       if (machineparsable) {
+               options.machineseparator = "|";
+       }
+       if (options.machineseparator != NULL) {
+               if (strlen(options.machineseparator) != 1) {
+                       printf("Invalid separator \"%s\" - "
+                              "must be single character\n",
+                              options.machineseparator);
+                       exit(1);
+               }
+
+               /* -x implies -Y */
+               options.machinereadable = true;
+       } else if (options.machinereadable) {
+               options.machineseparator = ":";
+       }
+
        signal(SIGALRM, ctdb_alarm);
        alarm(options.maxruntime);
 
@@ -6449,7 +6630,14 @@ int main(int argc, const char *argv[])
        ctdb = ctdb_cmdline_client(ev, TIMELIMIT());
 
        if (ctdb == NULL) {
+               uint32_t pnn;
                DEBUG(DEBUG_ERR, ("Failed to init ctdb\n"));
+
+               pnn = find_node_xpnn();
+               if (pnn == -1) {
+                       DEBUG(DEBUG_ERR,
+                             ("Is this node part of a CTDB cluster?\n"));
+               }
                exit(1);
        }