util: Add extra max_size argument to file_lines_load()
[ctdb.git] / tools / ctdb.c
index e75a92239f7da32a93d87dd6ce91678e450e8cc9..608d50a3000953be14f0846441aa605d224d9a9f 100644 (file)
 #include "system/locale.h"
 #include "popt.h"
 #include "cmdline.h"
-#include "../include/ctdb.h"
+#include "../include/ctdb_version.h"
 #include "../include/ctdb_client.h"
 #include "../include/ctdb_private.h"
 #include "../common/rb_tree.h"
 #include "db_wrap.h"
+#include "lib/util/dlinklist.h"
 
 #define ERR_TIMEOUT    20      /* timed out trying to reach node */
 #define ERR_NONODE     21      /* node does not exist */
 #define ERR_DISNODE    22      /* node is disconnected */
 
-struct ctdb_connection *ctdb_connection;
-
 static void usage(void);
 
 static struct {
@@ -44,6 +43,7 @@ static struct {
        uint32_t pnn;
        uint32_t *nodes;
        int machinereadable;
+       const char *machineseparator;
        int verbose;
        int maxruntime;
        int printemptyrecords;
@@ -53,18 +53,39 @@ static struct {
        int printrecordflags;
 } options;
 
+#define LONGTIMEOUT options.timelimit*10
+
 #define TIMELIMIT() timeval_current_ofs(options.timelimit, 0)
-#define LONGTIMELIMIT() timeval_current_ofs(options.timelimit*10, 0)
+#define LONGTIMELIMIT() timeval_current_ofs(LONGTIMEOUT, 0)
 
-#ifdef CTDB_VERS
 static int control_version(struct ctdb_context *ctdb, int argc, const char **argv)
 {
-#define STR(x) #x
-#define XSTR(x) STR(x)
-       printf("CTDB version: %s\n", XSTR(CTDB_VERS));
+       printf("CTDB version: %s\n", CTDB_VERSION_STRING);
        return 0;
 }
-#endif
+
+/* Like printf(3) but substitute for separator in format */
+static int printm(const char *format, ...) PRINTF_ATTRIBUTE(1,2);
+static int printm(const char *format, ...)
+{
+       va_list ap;
+       int ret;
+       size_t len = strlen(format);
+       char new_format[len+1];
+
+       strcpy(new_format, format);
+
+       if (options.machineseparator[0] != ':') {
+               all_string_sub(new_format,
+                              ":", options.machineseparator, len + 1);
+       }
+
+       va_start(ap, format);
+       ret = vprintf(new_format, ap);
+       va_end(ap);
+
+       return ret;
+}
 
 #define CTDB_NOMEM_ABORT(p) do { if (!(p)) {                           \
                DEBUG(DEBUG_ALERT,("ctdb fatal error: %s\n",            \
@@ -72,6 +93,32 @@ static int control_version(struct ctdb_context *ctdb, int argc, const char **arg
                abort();                                                \
        }} while (0)
 
+static uint32_t getpnn(struct ctdb_context *ctdb)
+{
+       if ((options.pnn == CTDB_BROADCAST_ALL) ||
+           (options.pnn == CTDB_MULTICAST)) {
+               DEBUG(DEBUG_ERR,
+                     ("Cannot get PNN for node %u\n", options.pnn));
+               exit(1);
+       }
+
+       if (options.pnn == CTDB_CURRENT_NODE) {
+               return ctdb_get_pnn(ctdb);
+       } else {
+               return options.pnn;
+       }
+}
+
+static void assert_single_node_only(void)
+{
+       if ((options.pnn == CTDB_BROADCAST_ALL) ||
+           (options.pnn == CTDB_MULTICAST)) {
+               DEBUG(DEBUG_ERR,
+                     ("This control can not be applied to multiple PNNs\n"));
+               exit(1);
+       }
+}
+
 /* Pretty print the flags to a static buffer in human-readable format.
  * This never returns NULL!
  */
@@ -98,8 +145,9 @@ static const char *pretty_print_flags(uint32_t flags)
                        if (flags_str[0] == '\0') {
                                (void) strcpy(flags_str, flag_names[j].name);
                        } else {
-                               (void) strcat(flags_str, "|");
-                               (void) strcat(flags_str, flag_names[j].name);
+                               (void) strncat(flags_str, "|", sizeof(flags_str)-1);
+                               (void) strncat(flags_str, flag_names[j].name,
+                                              sizeof(flags_str)-1);
                        }
                }
        }
@@ -144,27 +192,34 @@ static TDB_DATA hextodata(TALLOC_CTX *mem_ctx, const char *str)
  * explicitly specified.
  */
 static bool parse_nodestring(struct ctdb_context *ctdb,
+                            TALLOC_CTX *mem_ctx,
                             const char * nodestring,
                             uint32_t current_pnn,
                             bool dd_ok,
                             uint32_t **nodes,
                             uint32_t *pnn_mode)
 {
+       TALLOC_CTX *tmp_ctx = talloc_new(mem_ctx);
        int n;
        uint32_t i;
        struct ctdb_node_map *nodemap;
-       
+       int ret;
+
        *nodes = NULL;
 
-       if (!ctdb_getnodemap(ctdb_connection, CTDB_CURRENT_NODE, &nodemap)) {
+       ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
+       if (ret != 0) {
                DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
+               talloc_free(tmp_ctx);
                exit(10);
        }
 
        if (nodestring != NULL) {
-               *nodes = talloc_array(ctdb, uint32_t, 0);
-               CTDB_NOMEM_ABORT(*nodes);
-               
+               *nodes = talloc_array(mem_ctx, uint32_t, 0);
+               if (*nodes == NULL) {
+                       goto failed;
+               }
+
                n = 0;
 
                if (strcmp(nodestring, "all") == 0) {
@@ -172,44 +227,58 @@ static bool parse_nodestring(struct ctdb_context *ctdb,
 
                        /* all */
                        for (i = 0; i < nodemap->num; i++) {
-                               if ((nodemap->nodes[i].flags & 
+                               if ((nodemap->nodes[i].flags &
                                     (NODE_FLAGS_DISCONNECTED |
                                      NODE_FLAGS_DELETED)) && !dd_ok) {
                                        continue;
                                }
-                               *nodes = talloc_realloc(ctdb, *nodes,
+                               *nodes = talloc_realloc(mem_ctx, *nodes,
                                                        uint32_t, n+1);
-                               CTDB_NOMEM_ABORT(*nodes);
+                               if (*nodes == NULL) {
+                                       goto failed;
+                               }
                                (*nodes)[n] = i;
                                n++;
                        }
                } else {
                        /* x{,y...} */
                        char *ns, *tok;
-                       
-                       ns = talloc_strdup(ctdb, nodestring);
+
+                       ns = talloc_strdup(tmp_ctx, nodestring);
                        tok = strtok(ns, ",");
                        while (tok != NULL) {
                                uint32_t pnn;
-                               i = (uint32_t)strtoul(tok, NULL, 0);
+                               char *endptr;
+                               i = (uint32_t)strtoul(tok, &endptr, 0);
+                               if (i == 0 && tok == endptr) {
+                                       DEBUG(DEBUG_ERR,
+                                             ("Invalid node %s\n", tok));
+                                       talloc_free(tmp_ctx);
+                                       exit(ERR_NONODE);
+                               }
                                if (i >= nodemap->num) {
                                        DEBUG(DEBUG_ERR, ("Node %u does not exist\n", i));
+                                       talloc_free(tmp_ctx);
                                        exit(ERR_NONODE);
                                }
                                if ((nodemap->nodes[i].flags & 
                                     (NODE_FLAGS_DISCONNECTED |
                                      NODE_FLAGS_DELETED)) && !dd_ok) {
                                        DEBUG(DEBUG_ERR, ("Node %u has status %s\n", i, pretty_print_flags(nodemap->nodes[i].flags)));
+                                       talloc_free(tmp_ctx);
                                        exit(ERR_DISNODE);
                                }
-                               if (!ctdb_getpnn(ctdb_connection, i, &pnn)) {
+                               if ((pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), i)) < 0) {
                                        DEBUG(DEBUG_ERR, ("Can not access node %u. Node is not operational.\n", i));
+                                       talloc_free(tmp_ctx);
                                        exit(10);
                                }
 
-                               *nodes = talloc_realloc(ctdb, *nodes,
+                               *nodes = talloc_realloc(mem_ctx, *nodes,
                                                        uint32_t, n+1);
-                               CTDB_NOMEM_ABORT(*nodes);
+                               if (*nodes == NULL) {
+                                       goto failed;
+                               }
 
                                (*nodes)[n] = i;
                                n++;
@@ -226,48 +295,90 @@ static bool parse_nodestring(struct ctdb_context *ctdb,
                }
        } else {
                /* default - no nodes specified */
-               *nodes = talloc_array(ctdb, uint32_t, 1);
-               CTDB_NOMEM_ABORT(*nodes);
+               *nodes = talloc_array(mem_ctx, uint32_t, 1);
+               if (*nodes == NULL) {
+                       goto failed;
+               }
                *pnn_mode = CTDB_CURRENT_NODE;
 
-               if (!ctdb_getpnn(ctdb_connection, current_pnn,
-                                &((*nodes)[0]))) {
-                       return false;
+               if (((*nodes)[0] = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), current_pnn)) < 0) {
+                       goto failed;
                }
        }
 
-       ctdb_free_nodemap(nodemap);
-
+       talloc_free(tmp_ctx);
        return true;
+
+failed:
+       talloc_free(tmp_ctx);
+       return false;
 }
 
 /*
  check if a database exists
 */
-static int db_exists(struct ctdb_context *ctdb, const char *db_name, bool *persistent)
+static bool db_exists(struct ctdb_context *ctdb, const char *dbarg,
+                     uint32_t *dbid, const char **dbname, uint8_t *flags)
 {
        int i, ret;
        struct ctdb_dbid_map *dbmap=NULL;
+       bool dbid_given = false, found = false;
+       uint32_t id;
+       TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
+       const char *name;
 
-       ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &dbmap);
+       ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &dbmap);
        if (ret != 0) {
                DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
-               return -1;
+               goto fail;
        }
 
-       for(i=0;i<dbmap->num;i++){
-               const char *name;
+       if (strncmp(dbarg, "0x", 2) == 0) {
+               id = strtoul(dbarg, NULL, 0);
+               dbid_given = true;
+       }
 
-               ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &name);
-               if (!strcmp(name, db_name)) {
-                       if (persistent) {
-                               *persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
+       for(i=0; i<dbmap->num; i++) {
+               if (dbid_given) {
+                       if (id == dbmap->dbs[i].dbid) {
+                               found = true;
+                               break;
+                       }
+               } else {
+                       ret = ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, tmp_ctx, &name);
+                       if (ret != 0) {
+                               DEBUG(DEBUG_ERR, ("Unable to get dbname from dbid %u\n", dbmap->dbs[i].dbid));
+                               goto fail;
+                       }
+
+                       if (strcmp(name, dbarg) == 0) {
+                               id = dbmap->dbs[i].dbid;
+                               found = true;
+                               break;
                        }
-                       return 0;
                }
        }
 
-       return -1;
+       if (found && dbid_given && dbname != NULL) {
+               ret = ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, tmp_ctx, &name);
+               if (ret != 0) {
+                       DEBUG(DEBUG_ERR, ("Unable to get dbname from dbid %u\n", dbmap->dbs[i].dbid));
+                       found = false;
+                       goto fail;
+               }
+       }
+
+       if (found) {
+               if (dbid) *dbid = id;
+               if (dbname) *dbname = talloc_strdup(ctdb, name);
+               if (flags) *flags = dbmap->dbs[i].flags;
+       } else {
+               DEBUG(DEBUG_ERR,("No database matching '%s' found\n", dbarg));
+       }
+
+fail:
+       talloc_free(tmp_ctx);
+       return found;
 }
 
 /*
@@ -334,10 +445,12 @@ static void show_statistics(struct ctdb_statistics *s, int show_header)
                STATISTICS_FIELD(timeouts.call),
                STATISTICS_FIELD(timeouts.control),
                STATISTICS_FIELD(timeouts.traverse),
+               STATISTICS_FIELD(locks.num_calls),
+               STATISTICS_FIELD(locks.num_current),
+               STATISTICS_FIELD(locks.num_pending),
+               STATISTICS_FIELD(locks.num_failed),
                STATISTICS_FIELD(total_calls),
                STATISTICS_FIELD(pending_calls),
-               STATISTICS_FIELD(lockwait_calls),
-               STATISTICS_FIELD(pending_lockwait_calls),
                STATISTICS_FIELD(childwrite_calls),
                STATISTICS_FIELD(pending_childwrite_calls),
                STATISTICS_FIELD(memory_used),
@@ -357,69 +470,64 @@ static void show_statistics(struct ctdb_statistics *s, int show_header)
 
        if (options.machinereadable){
                if (show_header) {
-                       printf("CTDB version:");
-                       printf("Current time of statistics:");
-                       printf("Statistics collected since:");
+                       printm("CTDB version:");
+                       printm("Current time of statistics:");
+                       printm("Statistics collected since:");
                        for (i=0;i<ARRAY_SIZE(fields);i++) {
-                               printf("%s:", fields[i].name);
+                               printm("%s:", fields[i].name);
                        }
-                       printf("num_reclock_ctdbd_latency:");
-                       printf("min_reclock_ctdbd_latency:");
-                       printf("avg_reclock_ctdbd_latency:");
-                       printf("max_reclock_ctdbd_latency:");
-
-                       printf("num_reclock_recd_latency:");
-                       printf("min_reclock_recd_latency:");
-                       printf("avg_reclock_recd_latency:");
-                       printf("max_reclock_recd_latency:");
-
-                       printf("num_call_latency:");
-                       printf("min_call_latency:");
-                       printf("avg_call_latency:");
-                       printf("max_call_latency:");
-
-                       printf("num_lockwait_latency:");
-                       printf("min_lockwait_latency:");
-                       printf("avg_lockwait_latency:");
-                       printf("max_lockwait_latency:");
-
-                       printf("num_childwrite_latency:");
-                       printf("min_childwrite_latency:");
-                       printf("avg_childwrite_latency:");
-                       printf("max_childwrite_latency:");
-                       printf("\n");
-               }
-               printf("%d:", CTDB_VERSION);
-               printf("%d:", (int)s->statistics_current_time.tv_sec);
-               printf("%d:", (int)s->statistics_start_time.tv_sec);
+                       printm("num_reclock_ctdbd_latency:");
+                       printm("min_reclock_ctdbd_latency:");
+                       printm("avg_reclock_ctdbd_latency:");
+                       printm("max_reclock_ctdbd_latency:");
+
+                       printm("num_reclock_recd_latency:");
+                       printm("min_reclock_recd_latency:");
+                       printm("avg_reclock_recd_latency:");
+                       printm("max_reclock_recd_latency:");
+
+                       printm("num_call_latency:");
+                       printm("min_call_latency:");
+                       printm("avg_call_latency:");
+                       printm("max_call_latency:");
+
+                       printm("num_lockwait_latency:");
+                       printm("min_lockwait_latency:");
+                       printm("avg_lockwait_latency:");
+                       printm("max_lockwait_latency:");
+
+                       printm("num_childwrite_latency:");
+                       printm("min_childwrite_latency:");
+                       printm("avg_childwrite_latency:");
+                       printm("max_childwrite_latency:");
+                       printm("\n");
+               }
+               printm("%d:", CTDB_VERSION);
+               printm("%d:", (int)s->statistics_current_time.tv_sec);
+               printm("%d:", (int)s->statistics_start_time.tv_sec);
                for (i=0;i<ARRAY_SIZE(fields);i++) {
-                       printf("%d:", *(uint32_t *)(fields[i].offset+(uint8_t *)s));
-               }
-               printf("%d:", s->reclock.ctdbd.num);
-               printf("%.6f:", s->reclock.ctdbd.min);
-               printf("%.6f:", s->reclock.ctdbd.num?s->reclock.ctdbd.total/s->reclock.ctdbd.num:0.0);
-               printf("%.6f:", s->reclock.ctdbd.max);
-
-               printf("%d:", s->reclock.recd.num);
-               printf("%.6f:", s->reclock.recd.min);
-               printf("%.6f:", s->reclock.recd.num?s->reclock.recd.total/s->reclock.recd.num:0.0);
-               printf("%.6f:", s->reclock.recd.max);
-
-               printf("%d:", s->call_latency.num);
-               printf("%.6f:", s->call_latency.min);
-               printf("%.6f:", s->call_latency.num?s->call_latency.total/s->call_latency.num:0.0);
-               printf("%.6f:", s->call_latency.max);
-
-               printf("%d:", s->lockwait_latency.num);
-               printf("%.6f:", s->lockwait_latency.min);
-               printf("%.6f:", s->lockwait_latency.num?s->lockwait_latency.total/s->lockwait_latency.num:0.0);
-               printf("%.6f:", s->lockwait_latency.max);
-
-               printf("%d:", s->childwrite_latency.num);
-               printf("%.6f:", s->childwrite_latency.min);
-               printf("%.6f:", s->childwrite_latency.num?s->childwrite_latency.total/s->childwrite_latency.num:0.0);
-               printf("%.6f:", s->childwrite_latency.max);
-               printf("\n");
+                       printm("%d:", *(uint32_t *)(fields[i].offset+(uint8_t *)s));
+               }
+               printm("%d:", s->reclock.ctdbd.num);
+               printm("%.6f:", s->reclock.ctdbd.min);
+               printm("%.6f:", s->reclock.ctdbd.num?s->reclock.ctdbd.total/s->reclock.ctdbd.num:0.0);
+               printm("%.6f:", s->reclock.ctdbd.max);
+
+               printm("%d:", s->reclock.recd.num);
+               printm("%.6f:", s->reclock.recd.min);
+               printm("%.6f:", s->reclock.recd.num?s->reclock.recd.total/s->reclock.recd.num:0.0);
+               printm("%.6f:", s->reclock.recd.max);
+
+               printm("%d:", s->call_latency.num);
+               printm("%.6f:", s->call_latency.min);
+               printm("%.6f:", s->call_latency.num?s->call_latency.total/s->call_latency.num:0.0);
+               printm("%.6f:", s->call_latency.max);
+
+               printm("%d:", s->childwrite_latency.num);
+               printm("%.6f:", s->childwrite_latency.min);
+               printm("%.6f:", s->childwrite_latency.num?s->childwrite_latency.total/s->childwrite_latency.num:0.0);
+               printm("%.6f:", s->childwrite_latency.max);
+               printm("\n");
        } else {
                printf("CTDB version %u\n", CTDB_VERSION);
                printf("Current time of statistics  :                %s", ctime(&s->statistics_current_time.tv_sec));
@@ -446,12 +554,18 @@ static void show_statistics(struct ctdb_statistics *s, int show_header)
                        printf(" %d", s->hop_count_bucket[i]);
                }
                printf("\n");
-               printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n", "reclock_ctdbd       MIN/AVG/MAX", s->reclock.ctdbd.min, s->reclock.ctdbd.num?s->reclock.ctdbd.total/s->reclock.ctdbd.num:0.0, s->reclock.ctdbd.max, s->reclock.ctdbd.num);
+               printf(" lock_buckets:");
+               for (i=0; i<MAX_COUNT_BUCKETS; i++) {
+                       printf(" %d", s->locks.buckets[i]);
+               }
+               printf("\n");
+               printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n", "locks_latency      MIN/AVG/MAX", s->locks.latency.min, s->locks.latency.num?s->locks.latency.total/s->locks.latency.num:0.0, s->locks.latency.max, s->locks.latency.num);
+
+               printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n", "reclock_ctdbd      MIN/AVG/MAX", s->reclock.ctdbd.min, s->reclock.ctdbd.num?s->reclock.ctdbd.total/s->reclock.ctdbd.num:0.0, s->reclock.ctdbd.max, s->reclock.ctdbd.num);
 
                printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n", "reclock_recd       MIN/AVG/MAX", s->reclock.recd.min, s->reclock.recd.num?s->reclock.recd.total/s->reclock.recd.num:0.0, s->reclock.recd.max, s->reclock.recd.num);
 
                printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n", "call_latency       MIN/AVG/MAX", s->call_latency.min, s->call_latency.num?s->call_latency.total/s->call_latency.num:0.0, s->call_latency.max, s->call_latency.num);
-               printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n", "lockwait_latency   MIN/AVG/MAX", s->lockwait_latency.min, s->lockwait_latency.num?s->lockwait_latency.total/s->lockwait_latency.num:0.0, s->lockwait_latency.max, s->lockwait_latency.num);
                printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n", "childwrite_latency MIN/AVG/MAX", s->childwrite_latency.min, s->childwrite_latency.num?s->childwrite_latency.total/s->childwrite_latency.num:0.0, s->childwrite_latency.max, s->childwrite_latency.num);
        }
 
@@ -492,8 +606,6 @@ static int control_statistics_all(struct ctdb_context *ctdb)
                        MAX(statistics.max_hop_count, s1.max_hop_count);
                statistics.call_latency.max = 
                        MAX(statistics.call_latency.max, s1.call_latency.max);
-               statistics.lockwait_latency.max = 
-                       MAX(statistics.lockwait_latency.max, s1.lockwait_latency.max);
        }
        talloc_free(nodes);
        printf("Gathered statistics for %u nodes\n", num_nodes);
@@ -548,6 +660,8 @@ static int control_stats(struct ctdb_context *ctdb, int argc, const char **argv)
        struct ctdb_statistics_wire *stats;
        int i, num_records = -1;
 
+       assert_single_node_only();
+
        if (argc ==1) {
                num_records = atoi(argv[0]) - 1;
        }
@@ -576,60 +690,86 @@ static int control_stats(struct ctdb_context *ctdb, int argc, const char **argv)
 static int control_dbstatistics(struct ctdb_context *ctdb, int argc, const char **argv)
 {
        TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
-       struct ctdb_db_statistics *dbstatistics;
-       struct ctdb_dbid_map *dbmap=NULL;
-       int i, ret;
+       struct ctdb_db_statistics *dbstat;
+       int i;
+       uint32_t db_id;
+       int num_hot_keys;
+       int ret;
 
        if (argc < 1) {
                usage();
        }
 
-       ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &dbmap);
-       if (ret != 0) {
-               DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
-               return ret;
-       }
-       for(i=0;i<dbmap->num;i++){
-               const char *name;
-
-               ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, tmp_ctx, &name);
-               if(!strcmp(argv[0], name)){
-                       talloc_free(discard_const(name));
-                       break;
-               }
-               talloc_free(discard_const(name));
-       }
-       if (i == dbmap->num) {
-               DEBUG(DEBUG_ERR,("No database with name '%s' found\n", argv[0]));
-               talloc_free(tmp_ctx);
+       if (!db_exists(ctdb, argv[0], &db_id, NULL, NULL)) {
                return -1;
        }
 
-       if (!ctdb_getdbstat(ctdb_connection, options.pnn, dbmap->dbs[i].dbid, &dbstatistics)) {
+       ret = ctdb_ctrl_dbstatistics(ctdb, options.pnn, db_id, tmp_ctx, &dbstat);
+       if (ret != 0) {
                DEBUG(DEBUG_ERR,("Failed to read db statistics from node\n"));
                talloc_free(tmp_ctx);
                return -1;
        }
 
-       printf("DB Statistics:\n");
-       printf("RO Delegations: %d\n", dbstatistics->db_ro_delegations);
-       printf("RO Revokes:     %d\n", dbstatistics->db_ro_revokes);
-       printf(" hop_count_buckets:");
-       for (i=0;i<MAX_COUNT_BUCKETS;i++) {
-               printf(" %d", dbstatistics->hop_count_bucket[i]);
+       printf("DB Statistics: %s\n", argv[0]);
+       printf(" %*s%-22s%*s%10u\n", 0, "", "ro_delegations", 4, "",
+               dbstat->db_ro_delegations);
+       printf(" %*s%-22s%*s%10u\n", 0, "", "ro_revokes", 4, "",
+               dbstat->db_ro_delegations);
+       printf(" %s\n", "locks");
+       printf(" %*s%-22s%*s%10u\n", 4, "", "total", 0, "",
+               dbstat->locks.num_calls);
+       printf(" %*s%-22s%*s%10u\n", 4, "", "failed", 0, "",
+               dbstat->locks.num_failed);
+       printf(" %*s%-22s%*s%10u\n", 4, "", "current", 0, "",
+               dbstat->locks.num_current);
+       printf(" %*s%-22s%*s%10u\n", 4, "", "pending", 0, "",
+               dbstat->locks.num_pending);
+       printf(" %s", "hop_count_buckets:");
+       for (i=0; i<MAX_COUNT_BUCKETS; i++) {
+               printf(" %d", dbstat->hop_count_bucket[i]);
+       }
+       printf("\n");
+       printf(" %s", "lock_buckets:");
+       for (i=0; i<MAX_COUNT_BUCKETS; i++) {
+               printf(" %d", dbstat->locks.buckets[i]);
        }
        printf("\n");
-       printf("Num Hot Keys:     %d\n", dbstatistics->num_hot_keys);
-       for (i = 0; i < dbstatistics->num_hot_keys; i++) {
+       printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n",
+               "locks_latency      MIN/AVG/MAX",
+               dbstat->locks.latency.min,
+               (dbstat->locks.latency.num ?
+                dbstat->locks.latency.total /dbstat->locks.latency.num :
+                0.0),
+               dbstat->locks.latency.max,
+               dbstat->locks.latency.num);
+       printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n",
+               "vacuum_latency     MIN/AVG/MAX",
+               dbstat->vacuum.latency.min,
+               (dbstat->vacuum.latency.num ?
+                dbstat->vacuum.latency.total /dbstat->vacuum.latency.num :
+                0.0),
+               dbstat->vacuum.latency.max,
+               dbstat->vacuum.latency.num);
+       num_hot_keys = 0;
+       for (i=0; i<dbstat->num_hot_keys; i++) {
+               if (dbstat->hot_keys[i].count > 0) {
+                       num_hot_keys++;
+               }
+       }
+       dbstat->num_hot_keys = num_hot_keys;
+
+       printf(" Num Hot Keys:     %d\n", dbstat->num_hot_keys);
+       for (i = 0; i < dbstat->num_hot_keys; i++) {
                int j;
-               printf("Count:%d Key:", dbstatistics->hot_keys[i].count);
-               for (j = 0; j < dbstatistics->hot_keys[i].key.dsize; j++) {
-                       printf("%02x", dbstatistics->hot_keys[i].key.dptr[j]&0xff);
+               printf("     Count:%d Key:", dbstat->hot_keys[i].count);
+               for (j = 0; j < dbstat->hot_keys[i].key.dsize; j++) {
+                       printf("%02x", dbstat->hot_keys[i].key.dptr[j]&0xff);
                }
                printf("\n");
        }
 
-       ctdb_free_dbstat(dbstatistics);
+       talloc_free(tmp_ctx);
        return 0;
 }
 
@@ -649,8 +789,8 @@ static int control_uptime(struct ctdb_context *ctdb, int argc, const char **argv
        }
 
        if (options.machinereadable){
-               printf(":Current Node Time:Ctdb Start Time:Last Recovery/Failover Time:Last Recovery/IPFailover Duration:\n");
-               printf(":%u:%u:%u:%lf\n",
+               printm(":Current Node Time:Ctdb Start Time:Last Recovery/Failover Time:Last Recovery/IPFailover Duration:\n");
+               printm(":%u:%u:%u:%lf\n",
                        (unsigned int)uptime->current_time.tv_sec,
                        (unsigned int)uptime->ctdbd_start_time.tv_sec,
                        (unsigned int)uptime->last_recovery_finished.tv_sec,
@@ -695,13 +835,8 @@ static int control_uptime(struct ctdb_context *ctdb, int argc, const char **argv
 static int control_pnn(struct ctdb_context *ctdb, int argc, const char **argv)
 {
        uint32_t mypnn;
-       bool ret;
 
-       ret = ctdb_getpnn(ctdb_connection, options.pnn, &mypnn);
-       if (!ret) {
-               DEBUG(DEBUG_ERR, ("Unable to get pnn from node."));
-               return -1;
-       }
+       mypnn = getpnn(ctdb);
 
        printf("PNN:%d\n", mypnn);
        return 0;
@@ -709,33 +844,24 @@ static int control_pnn(struct ctdb_context *ctdb, int argc, const char **argv)
 
 
 struct pnn_node {
-       struct pnn_node *next;
-       const char *addr;
+       struct pnn_node *next, *prev;
+       ctdb_sock_addr addr;
        int pnn;
 };
 
-static struct pnn_node *read_nodes_file(TALLOC_CTX *mem_ctx)
+static struct pnn_node *read_pnn_node_file(TALLOC_CTX *mem_ctx,
+                                          const char *file)
 {
-       const char *nodes_list;
        int nlines;
        char **lines;
        int i, pnn;
        struct pnn_node *pnn_nodes = NULL;
        struct pnn_node *pnn_node;
-       struct pnn_node *tmp_node;
 
-       /* read the nodes file */
-       nodes_list = getenv("CTDB_NODES");
-       if (nodes_list == NULL) {
-               nodes_list = "/etc/ctdb/nodes";
-       }
-       lines = file_lines_load(nodes_list, &nlines, mem_ctx);
+       lines = file_lines_load(file, &nlines, 0, mem_ctx);
        if (lines == NULL) {
                return NULL;
        }
-       while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) {
-               nlines--;
-       }
        for (i=0, pnn=0; i<nlines; i++) {
                char *node;
 
@@ -753,35 +879,50 @@ static struct pnn_node *read_nodes_file(TALLOC_CTX *mem_ctx)
                }
                pnn_node = talloc(mem_ctx, struct pnn_node);
                pnn_node->pnn = pnn++;
-               pnn_node->addr = talloc_strdup(pnn_node, node);
-               pnn_node->next = pnn_nodes;
-               pnn_nodes = pnn_node;
-       }
 
-       /* swap them around so we return them in incrementing order */
-       pnn_node = pnn_nodes;
-       pnn_nodes = NULL;
-       while (pnn_node) {
-               tmp_node = pnn_node;
-               pnn_node = pnn_node->next;
+               if (!parse_ip(node, NULL, 0, &pnn_node->addr)) {
+                       DEBUG(DEBUG_ERR,
+                             ("Invalid IP address '%s' in file %s\n",
+                              node, file));
+                       /* Caller will free mem_ctx */
+                       return NULL;
+               }
 
-               tmp_node->next = pnn_nodes;
-               pnn_nodes = tmp_node;
+               DLIST_ADD_END(pnn_nodes, pnn_node, NULL);
        }
 
        return pnn_nodes;
 }
 
+static struct pnn_node *read_nodes_file(TALLOC_CTX *mem_ctx)
+{
+       const char *nodes_list;
+
+       /* read the nodes file */
+       nodes_list = getenv("CTDB_NODES");
+       if (nodes_list == NULL) {
+               nodes_list = talloc_asprintf(mem_ctx, "%s/nodes",
+                                            getenv("CTDB_BASE"));
+               if (nodes_list == NULL) {
+                       DEBUG(DEBUG_ALERT,(__location__ " Out of memory\n"));
+                       exit(1);
+               }
+       }
+
+       return read_pnn_node_file(mem_ctx, nodes_list);
+}
+
 /*
   show the PNN of the current node
   discover the pnn by loading the nodes file and try to bind to all
   addresses one at a time until the ip address is found.
  */
-static int control_xpnn(struct ctdb_context *ctdb, int argc, const char **argv)
+static int find_node_xpnn(void)
 {
        TALLOC_CTX *mem_ctx = talloc_new(NULL);
        struct pnn_node *pnn_nodes;
        struct pnn_node *pnn_node;
+       int pnn;
 
        pnn_nodes = read_nodes_file(mem_ctx);
        if (pnn_nodes == NULL) {
@@ -791,18 +932,10 @@ static int control_xpnn(struct ctdb_context *ctdb, int argc, const char **argv)
        }
 
        for(pnn_node=pnn_nodes;pnn_node;pnn_node=pnn_node->next) {
-               ctdb_sock_addr addr;
-
-               if (parse_ip(pnn_node->addr, NULL, 63999, &addr) == 0) {
-                       DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s' in nodes file\n", pnn_node->addr));
-                       talloc_free(mem_ctx);
-                       return -1;
-               }
-
-               if (ctdb_sys_have_ip(&addr)) {
-                       printf("PNN:%d\n", pnn_node->pnn);
+               if (ctdb_sys_have_ip(&pnn_node->addr)) {
+                       pnn = pnn_node->pnn;
                        talloc_free(mem_ctx);
-                       return 0;
+                       return pnn;
                }
        }
 
@@ -811,17 +944,34 @@ static int control_xpnn(struct ctdb_context *ctdb, int argc, const char **argv)
        return -1;
 }
 
+static int control_xpnn(struct ctdb_context *ctdb, int argc, const char **argv)
+{
+       uint32_t pnn;
+
+       assert_single_node_only();
+
+       pnn = find_node_xpnn();
+       if (pnn == -1) {
+               return -1;
+       }
+
+       printf("PNN:%d\n", pnn);
+       return 0;
+}
+
 /* Helpers for ctdb status
  */
-static bool is_partially_online(struct ctdb_node_and_flags *node)
+static bool is_partially_online(struct ctdb_context *ctdb, struct ctdb_node_and_flags *node)
 {
+       TALLOC_CTX *tmp_ctx = talloc_new(NULL);
        int j;
        bool ret = false;
 
        if (node->flags == 0) {
-               struct ctdb_ifaces_list *ifaces;
+               struct ctdb_control_get_ifaces *ifaces;
 
-               if (ctdb_getifaces(ctdb_connection, node->pnn, &ifaces)) {
+               if (ctdb_ctrl_get_ifaces(ctdb, TIMELIMIT(), node->pnn,
+                                        tmp_ctx, &ifaces) == 0) {
                        for (j=0; j < ifaces->num; j++) {
                                if (ifaces->ifaces[j].link_state != 0) {
                                        continue;
@@ -829,22 +979,23 @@ static bool is_partially_online(struct ctdb_node_and_flags *node)
                                ret = true;
                                break;
                        }
-                       ctdb_free_ifaces(ifaces);
                }
        }
+       talloc_free(tmp_ctx);
 
        return ret;
 }
 
 static void control_status_header_machine(void)
 {
-       printf(":Node:IP:Disconnected:Banned:Disabled:Unhealthy:Stopped"
+       printm(":Node:IP:Disconnected:Banned:Disabled:Unhealthy:Stopped"
               ":Inactive:PartiallyOnline:ThisNode:\n");
 }
 
-static int control_status_1_machine(int mypnn, struct ctdb_node_and_flags *node)
+static int control_status_1_machine(struct ctdb_context *ctdb, int mypnn,
+                                   struct ctdb_node_and_flags *node)
 {
-       printf(":%d:%s:%d:%d:%d:%d:%d:%d:%d:%c:\n", node->pnn,
+       printm(":%d:%s:%d:%d:%d:%d:%d:%d:%d:%c:\n", node->pnn,
               ctdb_addr_to_str(&node->addr),
               !!(node->flags&NODE_FLAGS_DISCONNECTED),
               !!(node->flags&NODE_FLAGS_BANNED),
@@ -852,17 +1003,18 @@ static int control_status_1_machine(int mypnn, struct ctdb_node_and_flags *node)
               !!(node->flags&NODE_FLAGS_UNHEALTHY),
               !!(node->flags&NODE_FLAGS_STOPPED),
               !!(node->flags&NODE_FLAGS_INACTIVE),
-              is_partially_online(node) ? 1 : 0,
+              is_partially_online(ctdb, node) ? 1 : 0,
               (node->pnn == mypnn)?'Y':'N');
 
        return node->flags;
 }
 
-static int control_status_1_human(int mypnn, struct ctdb_node_and_flags *node)
+static int control_status_1_human(struct ctdb_context *ctdb, int mypnn,
+                                 struct ctdb_node_and_flags *node)
 {
        printf("pnn:%d %-16s %s%s\n", node->pnn,
               ctdb_addr_to_str(&node->addr),
-              is_partially_online(node) ? "PARTIALLYONLINE" : pretty_print_flags(node->flags),
+              is_partially_online(ctdb, node) ? "PARTIALLYONLINE" : pretty_print_flags(node->flags),
               node->pnn == mypnn?" (THIS NODE)":"");
 
        return node->flags;
@@ -873,17 +1025,20 @@ static int control_status_1_human(int mypnn, struct ctdb_node_and_flags *node)
  */
 static int control_status(struct ctdb_context *ctdb, int argc, const char **argv)
 {
+       TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
        int i;
        struct ctdb_vnn_map *vnnmap=NULL;
        struct ctdb_node_map *nodemap=NULL;
        uint32_t recmode, recmaster, mypnn;
+       int num_deleted_nodes = 0;
+       int ret;
 
-       if (!ctdb_getpnn(ctdb_connection, options.pnn, &mypnn)) {
-               return -1;
-       }
+       mypnn = getpnn(ctdb);
 
-       if (!ctdb_getnodemap(ctdb_connection, options.pnn, &nodemap)) {
+       ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &nodemap);
+       if (ret != 0) {
                DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
+               talloc_free(tmp_ctx);
                return -1;
        }
 
@@ -893,22 +1048,35 @@ static int control_status(struct ctdb_context *ctdb, int argc, const char **argv
                        if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
                                continue;
                        }
-                       (void) control_status_1_machine(mypnn,
+                       (void) control_status_1_machine(ctdb, mypnn,
                                                        &nodemap->nodes[i]);
                }
+               talloc_free(tmp_ctx);
                return 0;
        }
 
-       printf("Number of nodes:%d\n", nodemap->num);
+       for (i=0; i<nodemap->num; i++) {
+               if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
+                       num_deleted_nodes++;
+               }
+       }
+       if (num_deleted_nodes == 0) {
+               printf("Number of nodes:%d\n", nodemap->num);
+       } else {
+               printf("Number of nodes:%d (including %d deleted nodes)\n",
+                      nodemap->num, num_deleted_nodes);
+       }
        for(i=0;i<nodemap->num;i++){
                if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
                        continue;
                }
-               (void) control_status_1_human(mypnn, &nodemap->nodes[i]);
+               (void) control_status_1_human(ctdb, mypnn, &nodemap->nodes[i]);
        }
 
-       if (!ctdb_getvnnmap(ctdb_connection, options.pnn, &vnnmap)) {
+       ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &vnnmap);
+       if (ret != 0) {
                DEBUG(DEBUG_ERR, ("Unable to get vnnmap from node %u\n", options.pnn));
+               talloc_free(tmp_ctx);
                return -1;
        }
        if (vnnmap->generation == INVALID_GENERATION) {
@@ -920,25 +1088,30 @@ static int control_status(struct ctdb_context *ctdb, int argc, const char **argv
        for(i=0;i<vnnmap->size;i++){
                printf("hash:%d lmaster:%d\n", i, vnnmap->map[i]);
        }
-       ctdb_free_vnnmap(vnnmap);
 
-       if (!ctdb_getrecmode(ctdb_connection, options.pnn, &recmode)) {
+       ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, TIMELIMIT(), options.pnn, &recmode);
+       if (ret != 0) {
                DEBUG(DEBUG_ERR, ("Unable to get recmode from node %u\n", options.pnn));
+               talloc_free(tmp_ctx);
                return -1;
        }
        printf("Recovery mode:%s (%d)\n",recmode==CTDB_RECOVERY_NORMAL?"NORMAL":"RECOVERY",recmode);
 
-       if (!ctdb_getrecmaster(ctdb_connection, options.pnn, &recmaster)) {
+       ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, TIMELIMIT(), options.pnn, &recmaster);
+       if (ret != 0) {
                DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
+               talloc_free(tmp_ctx);
                return -1;
        }
        printf("Recovery master:%d\n",recmaster);
 
+       talloc_free(tmp_ctx);
        return 0;
 }
 
 static int control_nodestatus(struct ctdb_context *ctdb, int argc, const char **argv)
 {
+       TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
        int i, ret;
        struct ctdb_node_map *nodemap=NULL;
        uint32_t * nodes;
@@ -948,7 +1121,7 @@ static int control_nodestatus(struct ctdb_context *ctdb, int argc, const char **
                usage();
        }
 
-       if (!parse_nodestring(ctdb, argc == 1 ? argv[0] : NULL,
+       if (!parse_nodestring(ctdb, tmp_ctx, argc == 1 ? argv[0] : NULL,
                              options.pnn, true, &nodes, &pnn_mode)) {
                return -1;
        }
@@ -959,13 +1132,12 @@ static int control_nodestatus(struct ctdb_context *ctdb, int argc, const char **
                printf("Number of nodes:%d\n", (int) talloc_array_length(nodes));
        }
 
-       if (!ctdb_getpnn(ctdb_connection, options.pnn, &mypnn)) {
-               DEBUG(DEBUG_ERR, ("Unable to get PNN from local node\n"));
-               return -1;
-       }
+       mypnn = getpnn(ctdb);
 
-       if (!ctdb_getnodemap(ctdb_connection, options.pnn, &nodemap)) {
+       ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &nodemap);
+       if (ret != 0) {
                DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
+               talloc_free(tmp_ctx);
                return -1;
        }
 
@@ -973,44 +1145,152 @@ static int control_nodestatus(struct ctdb_context *ctdb, int argc, const char **
 
        for (i = 0; i < talloc_array_length(nodes); i++) {
                if (options.machinereadable) {
-                       ret |= control_status_1_machine(mypnn,
+                       ret |= control_status_1_machine(ctdb, mypnn,
                                                        &nodemap->nodes[nodes[i]]);
                } else {
-                       ret |= control_status_1_human(mypnn,
+                       ret |= control_status_1_human(ctdb, mypnn,
                                                      &nodemap->nodes[nodes[i]]);
                }
        }
+
+       talloc_free(tmp_ctx);
        return ret;
 }
 
-struct natgw_node {
-       struct natgw_node *next;
-       const char *addr;
-};
+static struct pnn_node *read_natgw_nodes_file(struct ctdb_context *ctdb,
+                                             TALLOC_CTX *mem_ctx)
+{
+       const char *natgw_list;
+       struct pnn_node *natgw_nodes = NULL;
+
+       natgw_list = getenv("CTDB_NATGW_NODES");
+       if (natgw_list == NULL) {
+               natgw_list = talloc_asprintf(mem_ctx, "%s/natgw_nodes",
+                                            getenv("CTDB_BASE"));
+               if (natgw_list == NULL) {
+                       DEBUG(DEBUG_ALERT,(__location__ " Out of memory\n"));
+                       exit(1);
+               }
+       }
+       /* The PNNs will be junk but they're not used */
+       natgw_nodes = read_pnn_node_file(mem_ctx, natgw_list);
+       if (natgw_nodes == NULL) {
+               DEBUG(DEBUG_ERR,
+                     ("Failed to load natgw node list '%s'\n", natgw_list));
+       }
+       return natgw_nodes;
+}
+
+
+/* talloc off the existing nodemap... */
+static struct ctdb_node_map *talloc_nodemap(struct ctdb_node_map *nodemap)
+{
+       return talloc_zero_size(nodemap,
+                               offsetof(struct ctdb_node_map, nodes) +
+                               nodemap->num * sizeof(struct ctdb_node_and_flags));
+}
 
-static int find_natgw(struct ctdb_context *ctdb,
-                      struct ctdb_node_map *nodemap, uint32_t flags,
-                      uint32_t *pnn, const char **ip)
+static struct ctdb_node_map *
+filter_nodemap_by_addrs(struct ctdb_context *ctdb,
+                       struct ctdb_node_map *nodemap,
+                       struct pnn_node *nodes)
 {
        int i;
-       uint32_t capabilities;
+       struct pnn_node *n;
+       struct ctdb_node_map *ret;
 
-       for (i=0;i<nodemap->num;i++) {
-               if (!(nodemap->nodes[i].flags & flags)) {
-                       if (!ctdb_getcapabilities(ctdb_connection, nodemap->nodes[i].pnn, &capabilities)) {
-                               DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", nodemap->nodes[i].pnn));
-                               return -1;
-                       }
-                       if (!(capabilities&CTDB_CAP_NATGW)) {
-                               continue;
+       ret = talloc_nodemap(nodemap);
+       CTDB_NO_MEMORY_NULL(ctdb, ret);
+
+       ret->num = 0;
+
+       for (i = 0; i < nodemap->num; i++) {
+               for(n = nodes; n != NULL ; n = n->next) {
+                       if (ctdb_same_ip(&n->addr,
+                                        &nodemap->nodes[i].addr)) {
+                               break;
                        }
-                       *pnn = nodemap->nodes[i].pnn;
-                       *ip = ctdb_addr_to_str(&nodemap->nodes[i].addr);
-                       return 0;
+               }
+               if (n == NULL) {
+                       continue;
+               }
+
+               ret->nodes[ret->num] = nodemap->nodes[i];
+               ret->num++;
+       }
+
+       return ret;
+}
+
+static struct ctdb_node_map *
+filter_nodemap_by_capabilities(struct ctdb_context *ctdb,
+                              struct ctdb_node_map *nodemap,
+                              uint32_t required_capabilities,
+                              bool first_only)
+{
+       int i;
+       uint32_t capabilities;
+       struct ctdb_node_map *ret;
+
+       ret = talloc_nodemap(nodemap);
+       CTDB_NO_MEMORY_NULL(ctdb, ret);
+
+       ret->num = 0;
+
+       for (i = 0; i < nodemap->num; i++) {
+               int res;
+
+               /* Disconnected nodes have no capabilities! */
+               if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
+                       continue;
+               }
+
+               res = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(),
+                                               nodemap->nodes[i].pnn,
+                                               &capabilities);
+               if (res != 0) {
+                       DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n",
+                                         nodemap->nodes[i].pnn));
+                       talloc_free(ret);
+                       return NULL;
+               }
+               if (!(capabilities & required_capabilities)) {
+                       continue;
+               }
+
+               ret->nodes[ret->num] = nodemap->nodes[i];
+               ret->num++;
+               if (first_only) {
+                       break;
                }
        }
 
-       return 2; /* matches ENOENT */
+       return ret;
+}
+
+static struct ctdb_node_map *
+filter_nodemap_by_flags(struct ctdb_context *ctdb,
+                       struct ctdb_node_map *nodemap,
+                       uint32_t flags_mask)
+{
+       int i;
+       struct ctdb_node_map *ret;
+
+       ret = talloc_nodemap(nodemap);
+       CTDB_NO_MEMORY_NULL(ctdb, ret);
+
+       ret->num = 0;
+
+       for (i = 0; i < nodemap->num; i++) {
+               if (nodemap->nodes[i].flags & flags_mask) {
+                       continue;
+               }
+
+               ret->nodes[ret->num] = nodemap->nodes[i];
+               ret->num++;
+       }
+
+       return ret;
 }
 
 /*
@@ -1018,13 +1298,11 @@ static int find_natgw(struct ctdb_context *ctdb,
  */
 static int control_natgwlist(struct ctdb_context *ctdb, int argc, const char **argv)
 {
+       TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
        int i, ret;
-       const char *natgw_list;
-       int nlines;
-       char **lines;
-       struct natgw_node *natgw_nodes = NULL;
-       struct natgw_node *natgw_node;
-       struct ctdb_node_map *nodemap=NULL;
+       struct pnn_node *natgw_nodes = NULL;
+       struct ctdb_node_map *orig_nodemap=NULL;
+       struct ctdb_node_map *nodemap;
        uint32_t mypnn, pnn;
        const char *ip;
 
@@ -1043,103 +1321,74 @@ static int control_natgwlist(struct ctdb_context *ctdb, int argc, const char **a
        };
 
        /* read the natgw nodes file into a linked list */
-       natgw_list = getenv("CTDB_NATGW_NODES");
-       if (natgw_list == NULL) {
-               natgw_list = "/etc/ctdb/natgw_nodes";
+       natgw_nodes = read_natgw_nodes_file(ctdb, tmp_ctx);
+       if (natgw_nodes == NULL) {
+               ret = -1;
+               goto done;
        }
-       lines = file_lines_load(natgw_list, &nlines, ctdb);
-       if (lines == NULL) {
-               ctdb_set_error(ctdb, "Failed to load natgw node list '%s'\n", natgw_list);
+
+       ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE,
+                                  tmp_ctx, &orig_nodemap);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node.\n"));
+               talloc_free(tmp_ctx);
                return -1;
        }
-       for (i=0;i<nlines;i++) {
-               char *node;
 
-               node = lines[i];
-               /* strip leading spaces */
-               while((*node == ' ') || (*node == '\t')) {
-                       node++;
-               }
-               if (*node == '#') {
-                       continue;
-               }
-               if (strcmp(node, "") == 0) {
-                       continue;
-               }
-               natgw_node = talloc(ctdb, struct natgw_node);
-               natgw_node->addr = talloc_strdup(natgw_node, node);
-               CTDB_NO_MEMORY(ctdb, natgw_node->addr);
-               natgw_node->next = natgw_nodes;
-               natgw_nodes = natgw_node;
-       }
-
-       if (!ctdb_getnodemap(ctdb_connection, CTDB_CURRENT_NODE, &nodemap)) {
-               DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node.\n"));
-               return -1;
-       }
-
-       /* Trim the nodemap so it only includes connected nodes in the
-        * current natgw group.
-        */
-       i=0;
-       while(i<nodemap->num) {
-               for(natgw_node=natgw_nodes;natgw_node;natgw_node=natgw_node->next) {
-                       if (!strcmp(natgw_node->addr, ctdb_addr_to_str(&nodemap->nodes[i].addr))) {
-                               break;
-                       }
-               }
-
-               /* this node was not in the natgw so we just remove it from
-                * the list
-                */
-               if ((natgw_node == NULL) 
-               ||  (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) ) {
-                       int j;
-
-                       for (j=i+1; j<nodemap->num; j++) {
-                               nodemap->nodes[j-1] = nodemap->nodes[j];
-                       }
-                       nodemap->num--;
-                       continue;
-               }
-
-               i++;
-       }
-
-       if (options.machinereadable) {
-               printf(":Node:IP:\n");
+       /* Get a nodemap that includes only the nodes in the NATGW
+        * group */
+       nodemap = filter_nodemap_by_addrs(ctdb, orig_nodemap, natgw_nodes);
+       if (nodemap == NULL) {
+               ret = -1;
+               goto done;
        }
 
        ret = 2; /* matches ENOENT */
        pnn = -1;
        ip = "0.0.0.0";
+       /* For each flag mask... */
        for (i = 0; exclude_flags[i] != 0; i++) {
-               ret = find_natgw(ctdb, nodemap,
-                                exclude_flags[i],
-                                &pnn, &ip);
-               if (ret == -1) {
+               /* ... get a nodemap that excludes nodes with with
+                * masked flags... */
+               struct ctdb_node_map *t =
+                       filter_nodemap_by_flags(ctdb, nodemap,
+                                               exclude_flags[i]);
+               if (t == NULL) {
+                       /* No memory */
+                       ret = -1;
                        goto done;
                }
-               if (ret == 0) {
-                       break;
+               if (t->num > 0) {
+                       /* ... and find the first node with the NATGW
+                        * capability */
+                       struct ctdb_node_map *n;
+                       n = filter_nodemap_by_capabilities(ctdb, t,
+                                                          CTDB_CAP_NATGW,
+                                                          true);
+                       if (n == NULL) {
+                               /* No memory */
+                               ret = -1;
+                               goto done;
+                       }
+                       if (n->num > 0) {
+                               ret = 0;
+                               pnn = n->nodes[0].pnn;
+                               ip = ctdb_addr_to_str(&n->nodes[0].addr);
+                               break;
+                       }
                }
+               talloc_free(t);
        }
 
        if (options.machinereadable) {
-               printf(":Node:IP:\n");
-               printf(":%d:%s:\n", pnn, ip);
+               printm(":Node:IP:\n");
+               printm(":%d:%s:\n", pnn, ip);
        } else {
                printf("%d %s\n", pnn, ip);
        }
 
        /* print the pruned list of nodes belonging to this natgw list */
-       if (!ctdb_getpnn(ctdb_connection, options.pnn, &mypnn)) {
-               DEBUG(DEBUG_NOTICE, ("Unable to get PNN from node %u\n", options.pnn));
-               /* This is actually harmless and will only result in
-                * the "this node" indication being missing
-                */
-               mypnn = -1;
-       }
+       mypnn = getpnn(ctdb);
        if (options.machinereadable) {
                control_status_header_machine();
        } else {
@@ -1150,14 +1399,14 @@ static int control_natgwlist(struct ctdb_context *ctdb, int argc, const char **a
                        continue;
                }
                if (options.machinereadable) {
-                       control_status_1_machine(mypnn, &(nodemap->nodes[i]));
+                       control_status_1_machine(ctdb, mypnn, &(nodemap->nodes[i]));
                } else {
-                       control_status_1_human(mypnn, &(nodemap->nodes[i]));
+                       control_status_1_human(ctdb, mypnn, &(nodemap->nodes[i]));
                }
        }
 
 done:
-       ctdb_free_nodemap(nodemap);
+       talloc_free(tmp_ctx);
        return ret;
 }
 
@@ -1185,8 +1434,14 @@ static int control_one_scriptstatus(struct ctdb_context *ctdb,
        }
 
        if (!options.machinereadable) {
+               int num_run = 0;
+               for (i=0; i<script_status->num_scripts; i++) {
+                       if (script_status->scripts[i].status != -ENOEXEC) {
+                               num_run++;
+                       }
+               }
                printf("%d scripts were executed last %s cycle\n",
-                      script_status->num_scripts,
+                      num_run,
                       ctdb_eventscript_call_names[type]);
        }
        for (i=0; i<script_status->num_scripts; i++) {
@@ -1208,7 +1463,7 @@ static int control_one_scriptstatus(struct ctdb_context *ctdb,
                        break;
                }
                if (options.machinereadable) {
-                       printf(":%s:%s:%i:%s:%lu.%06lu:%lu.%06lu:%s:\n",
+                       printm(":%s:%s:%i:%s:%lu.%06lu:%lu.%06lu:%s:\n",
                               ctdb_eventscript_call_names[type],
                               script_status->scripts[i].name,
                               script_status->scripts[i].status,
@@ -1284,7 +1539,7 @@ static int control_scriptstatus(struct ctdb_context *ctdb,
        }
 
        if (options.machinereadable) {
-               printf(":Type:Name:Code:Status:Start:End:Error Output...:\n");
+               printm(":Type:Name:Code:Status:Start:End:Error Output...:\n");
        }
 
        for (type = min; type < max; type++) {
@@ -1343,8 +1598,10 @@ static int control_disablescript(struct ctdb_context *ctdb, int argc, const char
 static int control_recmaster(struct ctdb_context *ctdb, int argc, const char **argv)
 {
        uint32_t recmaster;
+       int ret;
 
-       if (!ctdb_getrecmaster(ctdb_connection, options.pnn, &recmaster)) {
+       ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
+       if (ret != 0) {
                DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
                return -1;
        }
@@ -1362,6 +1619,8 @@ static int control_add_tickle(struct ctdb_context *ctdb, int argc, const char **
        TDB_DATA data;
        int ret;
 
+       assert_single_node_only();
+
        if (argc < 2) {
                usage();
        }
@@ -1399,6 +1658,8 @@ static int control_del_tickle(struct ctdb_context *ctdb, int argc, const char **
        TDB_DATA data;
        int ret;
 
+       assert_single_node_only();
+
        if (argc < 2) {
                usage();
        }
@@ -1437,6 +1698,8 @@ static int control_get_tickles(struct ctdb_context *ctdb, int argc, const char *
        int i, ret;
        unsigned port = 0;
 
+       assert_single_node_only();
+
        if (argc < 1) {
                usage();
        }
@@ -1457,13 +1720,13 @@ static int control_get_tickles(struct ctdb_context *ctdb, int argc, const char *
        }
 
        if (options.machinereadable){
-               printf(":source ip:port:destination ip:port:\n");
+               printm(":source ip:port:destination ip:port:\n");
                for (i=0;i<list->tickles.num;i++) {
                        if (port && port != ntohs(list->tickles.connections[i].dst_addr.ip.sin_port)) {
                                continue;
                        }
-                       printf(":%s:%u", ctdb_addr_to_str(&list->tickles.connections[i].src_addr), ntohs(list->tickles.connections[i].src_addr.ip.sin_port));
-                       printf(":%s:%u:\n", ctdb_addr_to_str(&list->tickles.connections[i].dst_addr), ntohs(list->tickles.connections[i].dst_addr.ip.sin_port));
+                       printm(":%s:%u", ctdb_addr_to_str(&list->tickles.connections[i].src_addr), ntohs(list->tickles.connections[i].src_addr.ip.sin_port));
+                       printm(":%s:%u:\n", ctdb_addr_to_str(&list->tickles.connections[i].dst_addr), ntohs(list->tickles.connections[i].dst_addr.ip.sin_port));
                }
        } else {
                printf("Tickles for ip:%s\n", ctdb_addr_to_str(&list->addr));
@@ -1538,7 +1801,7 @@ static int move_ip(struct ctdb_context *ctdb, ctdb_sock_addr *addr, uint32_t pnn
                return ret;
        }
 
-               nodes = list_of_active_nodes_except_pnn(ctdb, nodemap, tmp_ctx, pnn);
+       nodes = list_of_nodes(ctdb, nodemap, tmp_ctx, NODE_FLAGS_INACTIVE, pnn);
        ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_RELEASE_IP,
                                        nodes, 0,
                                        LONGTIMELIMIT(),
@@ -1571,15 +1834,94 @@ static int move_ip(struct ctdb_context *ctdb, ctdb_sock_addr *addr, uint32_t pnn
        return 0;
 }
 
+
+/* 
+ * scans all other nodes and returns a pnn for another node that can host this 
+ * ip address or -1
+ */
+static int
+find_other_host_for_public_ip(struct ctdb_context *ctdb, ctdb_sock_addr *addr)
+{
+       TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
+       struct ctdb_all_public_ips *ips;
+       struct ctdb_node_map *nodemap=NULL;
+       int i, j, ret;
+       int pnn;
+
+       ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
+               talloc_free(tmp_ctx);
+               return ret;
+       }
+
+       for(i=0;i<nodemap->num;i++){
+               if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
+                       continue;
+               }
+               if (nodemap->nodes[i].pnn == options.pnn) {
+                       continue;
+               }
+
+               /* read the public ip list from this node */
+               ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips);
+               if (ret != 0) {
+                       DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", nodemap->nodes[i].pnn));
+                       return -1;
+               }
+
+               for (j=0;j<ips->num;j++) {
+                       if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
+                               pnn = nodemap->nodes[i].pnn;
+                               talloc_free(tmp_ctx);
+                               return pnn;
+                       }
+               }
+               talloc_free(ips);
+       }
+
+       talloc_free(tmp_ctx);
+       return -1;
+}
+
+/* If pnn is -1 then try to find a node to move IP to... */
+static bool try_moveip(struct ctdb_context *ctdb, ctdb_sock_addr *addr, uint32_t pnn)
+{
+       bool pnn_specified = (pnn == -1 ? false : true);
+       int retries = 0;
+
+       while (retries < 5) {
+               if (!pnn_specified) {
+                       pnn = find_other_host_for_public_ip(ctdb, addr);
+                       if (pnn == -1) {
+                               return false;
+                       }
+                       DEBUG(DEBUG_NOTICE,
+                             ("Trying to move public IP to node %u\n", pnn));
+               }
+
+               if (move_ip(ctdb, addr, pnn) == 0) {
+                       return true;
+               }
+
+               sleep(3);
+               retries++;
+       }
+
+       return false;
+}
+
+
 /*
   move/failover an ip address to a specific node
  */
 static int control_moveip(struct ctdb_context *ctdb, int argc, const char **argv)
 {
        uint32_t pnn;
-       int ret, retries = 0;
        ctdb_sock_addr addr;
 
+       assert_single_node_only();
+
        if (argc < 2) {
                usage();
                return -1;
@@ -1596,16 +1938,8 @@ static int control_moveip(struct ctdb_context *ctdb, int argc, const char **argv
                return -1;
        }
 
-       do {
-               ret = move_ip(ctdb, &addr, pnn);
-               if (ret != 0) {
-                       DEBUG(DEBUG_ERR,("Failed to move ip to node %d. Wait 3 second and try again.\n", pnn));
-                       sleep(3);
-                       retries++;
-               }
-       } while (retries < 5 && ret != 0);
-       if (ret != 0) {
-               DEBUG(DEBUG_ERR,("Failed to move ip to node %d. Giving up.\n", pnn));
+       if (!try_moveip(ctdb, &addr, pnn)) {
+               DEBUG(DEBUG_ERR,("Failed to move IP to node %d.\n", pnn));
                return -1;
        }
 
@@ -1614,18 +1948,14 @@ static int control_moveip(struct ctdb_context *ctdb, int argc, const char **argv
 
 static int rebalance_node(struct ctdb_context *ctdb, uint32_t pnn)
 {
-       uint32_t recmaster;
        TDB_DATA data;
 
-       if (ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), pnn, &recmaster) != 0) {
-               DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", pnn));
-               return -1;
-       }
-
        data.dptr  = (uint8_t *)&pnn;
        data.dsize = sizeof(uint32_t);
-       if (ctdb_client_send_message(ctdb, recmaster, CTDB_SRVID_REBALANCE_NODE, data) != 0) {
-               DEBUG(DEBUG_ERR,("Failed to send message to force node reallocation\n"));
+       if (ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_REBALANCE_NODE, data) != 0) {
+               DEBUG(DEBUG_ERR,
+                     ("Failed to send message to force node %u to be a rebalancing target\n",
+                      pnn));
                return -1;
        }
 
@@ -1639,16 +1969,34 @@ static int rebalance_node(struct ctdb_context *ctdb, uint32_t pnn)
  */
 static int control_rebalancenode(struct ctdb_context *ctdb, int argc, const char **argv)
 {
-       switch (options.pnn) {
-       case CTDB_BROADCAST_ALL:
-       case CTDB_CURRENT_NODE:
-               DEBUG(DEBUG_ERR,("You must specify a node number with -n <pnn> for the node to rebalance\n"));
-               return -1;
+       TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
+       uint32_t *nodes;
+       uint32_t pnn_mode;
+       int i, ret;
+
+       assert_single_node_only();
+
+       if (argc > 1) {
+               usage();
        }
 
-       return rebalance_node(ctdb, options.pnn);
-}
+       /* Determine the nodes where IPs need to be reloaded */
+       if (!parse_nodestring(ctdb, tmp_ctx, argc == 1 ? argv[0] : NULL,
+                             options.pnn, true, &nodes, &pnn_mode)) {
+               ret = -1;
+               goto done;
+       }
+
+       for (i = 0; i < talloc_array_length(nodes); i++) {
+               if (!rebalance_node(ctdb, nodes[i])) {
+                       ret = -1;
+               }
+       }
 
+done:
+       talloc_free(tmp_ctx);
+       return ret;
+}
 
 static int rebalance_ip(struct ctdb_context *ctdb, ctdb_sock_addr *addr)
 {
@@ -1706,6 +2054,8 @@ static int control_rebalanceip(struct ctdb_context *ctdb, int argc, const char *
 {
        ctdb_sock_addr addr;
 
+       assert_single_node_only();
+
        if (argc < 1) {
                usage();
                return -1;
@@ -1838,188 +2188,186 @@ control_get_all_public_ips(struct ctdb_context *ctdb, TALLOC_CTX *tmp_ctx, struc
 }
 
 
-/* 
- * scans all other nodes and returns a pnn for another node that can host this 
- * ip address or -1
- */
-static int
-find_other_host_for_public_ip(struct ctdb_context *ctdb, ctdb_sock_addr *addr)
+static void ctdb_every_second(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
 {
-       TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
-       struct ctdb_all_public_ips *ips;
-       struct ctdb_node_map *nodemap=NULL;
-       int i, j, ret;
+       struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
 
-       ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
-       if (ret != 0) {
-               DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
-               talloc_free(tmp_ctx);
-               return ret;
-       }
+       event_add_timed(ctdb->ev, ctdb, 
+                               timeval_current_ofs(1, 0),
+                               ctdb_every_second, ctdb);
+}
 
-       for(i=0;i<nodemap->num;i++){
-               if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
-                       continue;
-               }
-               if (nodemap->nodes[i].pnn == options.pnn) {
-                       continue;
-               }
+struct srvid_reply_handler_data {
+       bool done;
+       bool wait_for_all;
+       uint32_t *nodes;
+       const char *srvid_str;
+};
 
-               /* read the public ip list from this node */
-               ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips);
-               if (ret != 0) {
-                       DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", nodemap->nodes[i].pnn));
-                       return -1;
-               }
+static void srvid_broadcast_reply_handler(struct ctdb_context *ctdb,
+                                        uint64_t srvid,
+                                        TDB_DATA data,
+                                        void *private_data)
+{
+       struct srvid_reply_handler_data *d =
+               (struct srvid_reply_handler_data *)private_data;
+       int i;
+       int32_t ret;
 
-               for (j=0;j<ips->num;j++) {
-                       if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
-                               talloc_free(tmp_ctx);
-                               return nodemap->nodes[i].pnn;
-                       }
-               }
-               talloc_free(ips);
+       if (data.dsize != sizeof(ret)) {
+               DEBUG(DEBUG_ERR, (__location__ " Wrong reply size\n"));
+               return;
        }
 
-       talloc_free(tmp_ctx);
-       return -1;
-}
-
-static uint32_t ipreallocate_finished;
-
-/*
-  handler for receiving the response to ipreallocate
-*/
-static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid, 
-                            TDB_DATA data, void *private_data)
-{
-       ipreallocate_finished = 1;
-}
+       /* ret will be a PNN (i.e. >=0) on success, or negative on error */
+       ret = *(int32_t *)data.dptr;
+       if (ret < 0) {
+               DEBUG(DEBUG_ERR,
+                     ("%s failed with result %d\n", d->srvid_str, ret));
+               return;
+       }
 
-static void ctdb_every_second(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
-{
-       struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
+       if (!d->wait_for_all) {
+               d->done = true;
+               return;
+       }
 
-       event_add_timed(ctdb->ev, ctdb, 
-                               timeval_current_ofs(1, 0),
-                               ctdb_every_second, ctdb);
+       /* Wait for all replies */
+       d->done = true;
+       for (i = 0; i < talloc_array_length(d->nodes); i++) {
+               if (d->nodes[i] == ret) {
+                       DEBUG(DEBUG_INFO,
+                             ("%s reply received from node %u\n",
+                              d->srvid_str, ret));
+                       d->nodes[i] = -1;
+               }
+               if (d->nodes[i] != -1) {
+                       /* Found a node that hasn't yet replied */
+                       d->done = false;
+               }
+       }
 }
 
-/*
-  ask the recovery daemon on the recovery master to perform a ip reallocation
+/* Broadcast the given SRVID to all connected nodes.  Wait for 1 reply
+ * or replies from all connected nodes.  arg is the data argument to
+ * pass in the srvid_request structure - pass 0 if this isn't needed.
  */
-static int control_ipreallocate(struct ctdb_context *ctdb, int argc, const char **argv)
+static int srvid_broadcast(struct ctdb_context *ctdb,
+                          uint64_t srvid, uint32_t *arg,
+                          const char *srvid_str, bool wait_for_all)
 {
-       int i, ret;
+       int ret;
        TDB_DATA data;
-       struct takeover_run_reply rd;
-       uint32_t recmaster;
-       struct ctdb_node_map *nodemap=NULL;
-       int retries=0;
-       struct timeval tv = timeval_current();
+       uint32_t pnn;
+       uint64_t reply_srvid;
+       struct srvid_request request;
+       struct srvid_request_data request_data;
+       struct srvid_reply_handler_data reply_data;
+       struct timeval tv;
 
-       /* we need some events to trigger so we can timeout and restart
-          the loop
-       */
+       ZERO_STRUCT(request);
+
+       /* Time ticks to enable timeouts to be processed */
        event_add_timed(ctdb->ev, ctdb, 
                                timeval_current_ofs(1, 0),
                                ctdb_every_second, ctdb);
 
-       rd.pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
-       if (rd.pnn == -1) {
-               DEBUG(DEBUG_ERR, ("Failed to get pnn of local node\n"));
-               return -1;
+       pnn = ctdb_get_pnn(ctdb);
+       reply_srvid = getpid();
+
+       if (arg == NULL) {
+               request.pnn = pnn;
+               request.srvid = reply_srvid;
+
+               data.dptr = (uint8_t *)&request;
+               data.dsize = sizeof(request);
+       } else {
+               request_data.pnn = pnn;
+               request_data.srvid = reply_srvid;
+               request_data.data = *arg;
+
+               data.dptr = (uint8_t *)&request_data;
+               data.dsize = sizeof(request_data);
        }
-       rd.srvid = getpid();
 
-       /* register a message port for receiveing the reply so that we
-          can receive the reply
-       */
-       ctdb_client_set_message_handler(ctdb, rd.srvid, ip_reallocate_handler, NULL);
+       /* Register message port for reply from recovery master */
+       ctdb_client_set_message_handler(ctdb, reply_srvid,
+                                       srvid_broadcast_reply_handler,
+                                       &reply_data);
 
-       data.dptr = (uint8_t *)&rd;
-       data.dsize = sizeof(rd);
+       reply_data.wait_for_all = wait_for_all;
+       reply_data.nodes = NULL;
+       reply_data.srvid_str = srvid_str;
 
 again:
-       /* check that there are valid nodes available */
-       if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap) != 0) {
-               DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
-               return -1;
-       }
-       for (i=0; i<nodemap->num;i++) {
-               if ((nodemap->nodes[i].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) == 0) {
-                       break;
+       reply_data.done = false;
+
+       if (wait_for_all) {
+               struct ctdb_node_map *nodemap;
+
+               ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(),
+                                          CTDB_CURRENT_NODE, ctdb, &nodemap);
+               if (ret != 0) {
+                       DEBUG(DEBUG_ERR,
+                             ("Unable to get nodemap from current node, try again\n"));
+                       sleep(1);
+                       goto again;
                }
-       }
-       if (i==nodemap->num) {
-               DEBUG(DEBUG_ERR,("No recmaster available, no need to wait for cluster convergence\n"));
-               return 0;
-       }
 
+               if (reply_data.nodes != NULL) {
+                       talloc_free(reply_data.nodes);
+               }
+               reply_data.nodes = list_of_connected_nodes(ctdb, nodemap,
+                                                          NULL, true);
 
-       if (!ctdb_getrecmaster(ctdb_connection, options.pnn, &recmaster)) {
-               DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
-               return -1;
+               talloc_free(nodemap);
        }
 
-       /* verify the node exists */
-       if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), recmaster, ctdb, &nodemap) != 0) {
-               DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
+       /* Send to all connected nodes. Only recmaster replies */
+       ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED,
+                                      srvid, data);
+       if (ret != 0) {
+               /* This can only happen if the socket is closed and
+                * there's no way to recover from that, so don't try
+                * again.
+                */
+               DEBUG(DEBUG_ERR,
+                     ("Failed to send %s request to connected nodes\n",
+                      srvid_str));
                return -1;
        }
 
-
-       /* check tha there are nodes available that can act as a recmaster */
-       for (i=0; i<nodemap->num; i++) {
-               if (nodemap->nodes[i].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
-                       continue;
-               }
-               break;
-       }
-       if (i == nodemap->num) {
-               DEBUG(DEBUG_ERR,("No possible nodes to host addresses.\n"));
-               return 0;
+       tv = timeval_current();
+       /* This loop terminates the reply is received */
+       while (timeval_elapsed(&tv) < 5.0 && !reply_data.done) {
+               event_loop_once(ctdb->ev);
        }
 
-       /* verify the recovery master is not STOPPED, nor BANNED */
-       if (nodemap->nodes[recmaster].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
-               DEBUG(DEBUG_ERR,("No suitable recmaster found. Try again\n"));
-               retries++;
-               sleep(1);
-               goto again;
-       } 
-       
-       /* verify the recovery master is not STOPPED, nor BANNED */
-       if (nodemap->nodes[recmaster].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
-               DEBUG(DEBUG_ERR,("No suitable recmaster found. Try again\n"));
-               retries++;
+       if (!reply_data.done) {
+               DEBUG(DEBUG_NOTICE,
+                     ("Still waiting for confirmation of %s\n", srvid_str));
                sleep(1);
                goto again;
-       } 
-
-       ipreallocate_finished = 0;
-       ret = ctdb_client_send_message(ctdb, recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
-       if (ret != 0) {
-               DEBUG(DEBUG_ERR,("Failed to send ip takeover run request message to %u\n", options.pnn));
-               return -1;
        }
 
-       tv = timeval_current();
-       /* this loop will terminate when we have received the reply */
-       while (timeval_elapsed(&tv) < 5.0 && ipreallocate_finished == 0) {
-               event_loop_once(ctdb->ev);
-       }
-       if (ipreallocate_finished == 1) {
-               return 0;
-       }
+       ctdb_client_remove_message_handler(ctdb, reply_srvid, &reply_data);
 
-       retries++;
-       sleep(1);
-       goto again;
+       talloc_free(reply_data.nodes);
 
        return 0;
 }
 
+static int ipreallocate(struct ctdb_context *ctdb)
+{
+       return srvid_broadcast(ctdb, CTDB_SRVID_TAKEOVER_RUN, NULL,
+                              "IP reallocation", false);
+}
+
+
+static int control_ipreallocate(struct ctdb_context *ctdb, int argc, const char **argv)
+{
+       return ipreallocate(ctdb);
+}
 
 /*
   add a public ip address to a node
@@ -2141,7 +2489,8 @@ static int control_delip_all(struct ctdb_context *ctdb, int argc, const char **a
                if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
                        continue;
                }
-               if (ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips) != 0) {
+               ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips);
+               if (ret != 0) {
                        DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %d\n", nodemap->nodes[i].pnn));
                        continue;
                }
@@ -2169,7 +2518,8 @@ static int control_delip_all(struct ctdb_context *ctdb, int argc, const char **a
                if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
                        continue;
                }
-               if (ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips) != 0) {
+               ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips);
+               if (ret != 0) {
                        DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %d\n", nodemap->nodes[i].pnn));
                        continue;
                }
@@ -2197,7 +2547,6 @@ static int control_delip_all(struct ctdb_context *ctdb, int argc, const char **a
 static int control_delip(struct ctdb_context *ctdb, int argc, const char **argv)
 {
        int i, ret;
-       int retries = 0;
        ctdb_sock_addr addr;
        struct ctdb_control_ip_iface pub;
        TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
@@ -2241,22 +2590,12 @@ static int control_delip(struct ctdb_context *ctdb, int argc, const char **argv)
                return -1;
        }
 
+       /* This is an optimisation.  If this node is hosting the IP
+        * then try to move it somewhere else without invoking a full
+        * takeover run.  We don't care if this doesn't work!
+        */
        if (ips->ips[i].pnn == options.pnn) {
-               ret = find_other_host_for_public_ip(ctdb, &addr);
-               if (ret != -1) {
-                       do {
-                               ret = move_ip(ctdb, &addr, ret);
-                               if (ret != 0) {
-                                       DEBUG(DEBUG_ERR,("Failed to move ip to node %d. Wait 3 seconds and try again.\n", options.pnn));
-                                       sleep(3);
-                                       retries++;
-                               }
-                       } while (retries < 5 && ret != 0);
-                       if (ret != 0) {
-                               DEBUG(DEBUG_ERR,("Failed to move ip to node %d. Giving up.\n", options.pnn));
-                               return -1;
-                       }
-               }
+               (void) try_moveip(ctdb, &addr, -1);
        }
 
        ret = ctdb_ctrl_del_public_ip(ctdb, TIMELIMIT(), options.pnn, &pub);
@@ -2270,6 +2609,106 @@ static int control_delip(struct ctdb_context *ctdb, int argc, const char **argv)
        return 0;
 }
 
+static int kill_tcp_from_file(struct ctdb_context *ctdb,
+                             int argc, const char **argv)
+{
+       struct ctdb_control_killtcp *killtcp;
+       int max_entries, current, i;
+       struct timeval timeout;
+       char line[128], src[128], dst[128];
+       int linenum;
+       TDB_DATA data;
+       struct client_async_data *async_data;
+       struct ctdb_client_control_state *state;
+
+       if (argc != 0) {
+               usage();
+       }
+
+       linenum = 1;
+       killtcp = NULL;
+       max_entries = 0;
+       current = 0;
+       while (!feof(stdin)) {
+               if (fgets(line, sizeof(line), stdin) == NULL) {
+                       continue;
+               }
+
+               /* Silently skip empty lines */
+               if (line[0] == '\n') {
+                       continue;
+               }
+
+               if (sscanf(line, "%s %s\n", src, dst) != 2) {
+                       DEBUG(DEBUG_ERR, ("Bad line [%d]: '%s'\n",
+                                         linenum, line));
+                       talloc_free(killtcp);
+                       return -1;
+               }
+
+               if (current >= max_entries) {
+                       max_entries += 1024;
+                       killtcp = talloc_realloc(ctdb, killtcp,
+                                                struct ctdb_control_killtcp,
+                                                max_entries);
+                       CTDB_NO_MEMORY(ctdb, killtcp);
+               }
+
+               if (!parse_ip_port(src, &killtcp[current].src_addr)) {
+                       DEBUG(DEBUG_ERR, ("Bad IP:port on line [%d]: '%s'\n",
+                                         linenum, src));
+                       talloc_free(killtcp);
+                       return -1;
+               }
+
+               if (!parse_ip_port(dst, &killtcp[current].dst_addr)) {
+                       DEBUG(DEBUG_ERR, ("Bad IP:port on line [%d]: '%s'\n",
+                                         linenum, dst));
+                       talloc_free(killtcp);
+                       return -1;
+               }
+
+               current++;
+       }
+
+       async_data = talloc_zero(ctdb, struct client_async_data);
+       if (async_data == NULL) {
+               talloc_free(killtcp);
+               return -1;
+       }
+
+       for (i = 0; i < current; i++) {
+
+               data.dsize = sizeof(struct ctdb_control_killtcp);
+               data.dptr  = (unsigned char *)&killtcp[i];
+
+               timeout = TIMELIMIT();
+               state = ctdb_control_send(ctdb, options.pnn, 0,
+                                         CTDB_CONTROL_KILL_TCP, 0, data,
+                                         async_data, &timeout, NULL);
+
+               if (state == NULL) {
+                       DEBUG(DEBUG_ERR,
+                             ("Failed to call async killtcp control to node %u\n",
+                              options.pnn));
+                       talloc_free(killtcp);
+                       return -1;
+               }
+               
+               ctdb_client_async_add(async_data, state);
+       }
+
+       if (ctdb_client_async_wait(ctdb, async_data) != 0) {
+               DEBUG(DEBUG_ERR,("killtcp failed\n"));
+               talloc_free(killtcp);
+               return -1;
+       }
+
+       talloc_free(killtcp);
+       return 0;
+}
+
+
 /*
   kill a tcp connection
  */
@@ -2278,6 +2717,12 @@ static int kill_tcp(struct ctdb_context *ctdb, int argc, const char **argv)
        int ret;
        struct ctdb_control_killtcp killtcp;
 
+       assert_single_node_only();
+
+       if (argc == 0) {
+               return kill_tcp_from_file(ctdb, argc, argv);
+       }
+
        if (argc < 2) {
                usage();
        }
@@ -2310,6 +2755,8 @@ static int control_gratious_arp(struct ctdb_context *ctdb, int argc, const char
        int ret;
        ctdb_sock_addr addr;
 
+       assert_single_node_only();
+
        if (argc < 2) {
                usage();
        }
@@ -2455,8 +2902,7 @@ static int check_srvids(struct ctdb_context *ctdb, int argc, const char **argv)
                ids[i] = strtoull(argv[i], NULL, 0);
        }
 
-       if (!ctdb_check_message_handlers(ctdb_connection,
-               options.pnn, argc, ids, result)) {
+       if (!ctdb_client_check_message_handlers(ctdb, ids, argc, result)) {
                DEBUG(DEBUG_ERR, ("Unable to check server_id from node %u\n",
                                  options.pnn));
                talloc_free(tmp_ctx);
@@ -2527,11 +2973,11 @@ static int control_ip(struct ctdb_context *ctdb, int argc, const char **argv)
        }
 
        if (options.machinereadable){
-               printf(":Public IP:Node:");
+               printm(":Public IP:Node:");
                if (options.verbose){
-                       printf("ActiveInterface:AvailableInterfaces:ConfiguredInterfaces:");
+                       printm("ActiveInterface:AvailableInterfaces:ConfiguredInterfaces:");
                }
-               printf("\n");
+               printm("\n");
        } else {
                if (options.pnn == CTDB_BROADCAST_ALL) {
                        printf("Public IPs on ALL nodes\n");
@@ -2591,11 +3037,11 @@ static int control_ip(struct ctdb_context *ctdb, int argc, const char **argv)
                }
 
                if (options.machinereadable){
-                       printf(":%s:%d:",
+                       printm(":%s:%d:",
                                ctdb_addr_to_str(&ips->ips[ips->num-i].addr),
                                ips->ips[ips->num-i].pnn);
                        if (options.verbose){
-                               printf("%s:%s:%s:",
+                               printm("%s:%s:%s:",
                                        aciface?aciface:"",
                                        avifaces?avifaces:"",
                                        cifaces?cifaces:"");
@@ -2679,25 +3125,29 @@ static int control_ipinfo(struct ctdb_context *ctdb, int argc, const char **argv
  */
 static int control_ifaces(struct ctdb_context *ctdb, int argc, const char **argv)
 {
+       TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
        int i;
-       struct ctdb_ifaces_list *ifaces;
+       struct ctdb_control_get_ifaces *ifaces;
+       int ret;
 
        /* read the public ip list from this node */
-       if (!ctdb_getifaces(ctdb_connection, options.pnn, &ifaces)) {
+       ret = ctdb_ctrl_get_ifaces(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &ifaces);
+       if (ret != 0) {
                DEBUG(DEBUG_ERR, ("Unable to get interfaces from node %u\n",
                                  options.pnn));
+               talloc_free(tmp_ctx);
                return -1;
        }
 
        if (options.machinereadable){
-               printf(":Name:LinkStatus:References:\n");
+               printm(":Name:LinkStatus:References:\n");
        } else {
                printf("Interfaces on node %u\n", options.pnn);
        }
 
        for (i=0; i<ifaces->num; i++) {
                if (options.machinereadable){
-                       printf(":%s:%s:%u\n",
+                       printm(":%s:%s:%u:\n",
                               ifaces->ifaces[i].name,
                               ifaces->ifaces[i].link_state?"1":"0",
                               (unsigned int)ifaces->ifaces[i].references);
@@ -2709,7 +3159,7 @@ static int control_ifaces(struct ctdb_context *ctdb, int argc, const char **argv
                }
        }
 
-       ctdb_free_ifaces(ifaces);
+       talloc_free(tmp_ctx);
        return 0;
 }
 
@@ -2780,165 +3230,139 @@ static int control_getpid(struct ctdb_context *ctdb, int argc, const char **argv
        return 0;
 }
 
-/*
-  disable a remote node
- */
-static int control_disable(struct ctdb_context *ctdb, int argc, const char **argv)
+typedef bool update_flags_handler_t(struct ctdb_context *ctdb, void *data);
+
+static int update_flags_and_ipreallocate(struct ctdb_context *ctdb,
+                                             void *data,
+                                             update_flags_handler_t handler,
+                                             uint32_t flag,
+                                             const char *desc,
+                                             bool set_flag)
 {
+       struct ctdb_node_map *nodemap = NULL;
+       bool flag_is_set;
        int ret;
-       struct ctdb_node_map *nodemap=NULL;
 
-       /* check if the node is already disabled */
-       if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
+       /* Check if the node is already in the desired state */
+       ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
+       if (ret != 0) {
                DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
                exit(10);
        }
-       if (nodemap->nodes[options.pnn].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
-               DEBUG(DEBUG_ERR,("Node %d is already disabled.\n", options.pnn));
+       flag_is_set = nodemap->nodes[options.pnn].flags & flag;
+       if (set_flag == flag_is_set) {
+               DEBUG(DEBUG_NOTICE, ("Node %d is %s %s\n", options.pnn,
+                                    (set_flag ? "already" : "not"), desc));
                return 0;
        }
 
        do {
-               ret = ctdb_ctrl_modflags(ctdb, TIMELIMIT(), options.pnn, NODE_FLAGS_PERMANENTLY_DISABLED, 0);
-               if (ret != 0) {
-                       DEBUG(DEBUG_ERR, ("Unable to disable node %u\n", options.pnn));
-                       return ret;
+               if (!handler(ctdb, data)) {
+                       DEBUG(DEBUG_WARNING,
+                             ("Failed to send control to set state %s on node %u, try again\n",
+                              desc, options.pnn));
                }
 
                sleep(1);
 
-               /* read the nodemap and verify the change took effect */
-               if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
-                       DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
-                       exit(10);
+               /* Read the nodemap and verify the change took effect.
+                * Even if the above control/hanlder timed out then it
+                * could still have worked!
+                */
+               ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE,
+                                        ctdb, &nodemap);
+               if (ret != 0) {
+                       DEBUG(DEBUG_WARNING,
+                             ("Unable to get nodemap from local node, try again\n"));
                }
+               flag_is_set = nodemap->nodes[options.pnn].flags & flag;
+       } while (nodemap == NULL || (set_flag != flag_is_set));
 
-       } while (!(nodemap->nodes[options.pnn].flags & NODE_FLAGS_PERMANENTLY_DISABLED));
-       ret = control_ipreallocate(ctdb, argc, argv);
-       if (ret != 0) {
-               DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
-               return ret;
-       }
-
-       return 0;
+       return ipreallocate(ctdb);
 }
 
-/*
-  enable a disabled remote node
- */
-static int control_enable(struct ctdb_context *ctdb, int argc, const char **argv)
+/* Administratively disable a node */
+static bool update_flags_disabled(struct ctdb_context *ctdb, void *data)
 {
        int ret;
 
-       struct ctdb_node_map *nodemap=NULL;
-
-
-       /* check if the node is already enabled */
-       if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
-               DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
-               exit(10);
-       }
-       if (!(nodemap->nodes[options.pnn].flags & NODE_FLAGS_PERMANENTLY_DISABLED)) {
-               DEBUG(DEBUG_ERR,("Node %d is already enabled.\n", options.pnn));
-               return 0;
-       }
-
-       do {
-               ret = ctdb_ctrl_modflags(ctdb, TIMELIMIT(), options.pnn, 0, NODE_FLAGS_PERMANENTLY_DISABLED);
-               if (ret != 0) {
-                       DEBUG(DEBUG_ERR, ("Unable to enable node %u\n", options.pnn));
-                       return ret;
-               }
-
-               sleep(1);
+       ret = ctdb_ctrl_modflags(ctdb, TIMELIMIT(), options.pnn,
+                                NODE_FLAGS_PERMANENTLY_DISABLED, 0);
+       return ret == 0;
+}
 
-               /* read the nodemap and verify the change took effect */
-               if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
-                       DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
-                       exit(10);
-               }
+static int control_disable(struct ctdb_context *ctdb, int argc, const char **argv)
+{
+       return update_flags_and_ipreallocate(ctdb, NULL,
+                                                 update_flags_disabled,
+                                                 NODE_FLAGS_PERMANENTLY_DISABLED,
+                                                 "disabled",
+                                                 true /* set_flag*/);
+}
 
-       } while (nodemap->nodes[options.pnn].flags & NODE_FLAGS_PERMANENTLY_DISABLED);
+/* Administratively re-enable a node */
+static bool update_flags_not_disabled(struct ctdb_context *ctdb, void *data)
+{
+       int ret;
 
-       ret = control_ipreallocate(ctdb, argc, argv);
-       if (ret != 0) {
-               DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
-               return ret;
-       }
+       ret = ctdb_ctrl_modflags(ctdb, TIMELIMIT(), options.pnn,
+                                0, NODE_FLAGS_PERMANENTLY_DISABLED);
+       return ret == 0;
+}
 
-       return 0;
+static int control_enable(struct ctdb_context *ctdb,  int argc, const char **argv)
+{
+       return update_flags_and_ipreallocate(ctdb, NULL,
+                                                 update_flags_not_disabled,
+                                                 NODE_FLAGS_PERMANENTLY_DISABLED,
+                                                 "disabled",
+                                                 false /* set_flag*/);
 }
 
-/*
-  stop a remote node
- */
-static int control_stop(struct ctdb_context *ctdb, int argc, const char **argv)
+/* Stop a node */
+static bool update_flags_stopped(struct ctdb_context *ctdb, void *data)
 {
        int ret;
-       struct ctdb_node_map *nodemap=NULL;
-
-       do {
-               ret = ctdb_ctrl_stop_node(ctdb, TIMELIMIT(), options.pnn);
-               if (ret != 0) {
-                       DEBUG(DEBUG_ERR, ("Unable to stop node %u   try again\n", options.pnn));
-               }
-       
-               sleep(1);
 
-               /* read the nodemap and verify the change took effect */
-               if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
-                       DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
-                       exit(10);
-               }
+       ret = ctdb_ctrl_stop_node(ctdb, TIMELIMIT(), options.pnn);
 
-       } while (!(nodemap->nodes[options.pnn].flags & NODE_FLAGS_STOPPED));
-       ret = control_ipreallocate(ctdb, argc, argv);
-       if (ret != 0) {
-               DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
-               return ret;
-       }
+       return ret == 0;
+}
 
-       return 0;
+static int control_stop(struct ctdb_context *ctdb, int argc, const char **argv)
+{
+       return update_flags_and_ipreallocate(ctdb, NULL,
+                                                 update_flags_stopped,
+                                                 NODE_FLAGS_STOPPED,
+                                                 "stopped",
+                                                 true /* set_flag*/);
 }
 
-/*
-  restart a stopped remote node
- */
-static int control_continue(struct ctdb_context *ctdb, int argc, const char **argv)
+/* Continue a stopped node */
+static bool update_flags_not_stopped(struct ctdb_context *ctdb, void *data)
 {
        int ret;
 
-       struct ctdb_node_map *nodemap=NULL;
-
-       do {
-               ret = ctdb_ctrl_continue_node(ctdb, TIMELIMIT(), options.pnn);
-               if (ret != 0) {
-                       DEBUG(DEBUG_ERR, ("Unable to continue node %u\n", options.pnn));
-                       return ret;
-               }
-       
-               sleep(1);
-
-               /* read the nodemap and verify the change took effect */
-               if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
-                       DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
-                       exit(10);
-               }
+       ret = ctdb_ctrl_continue_node(ctdb, TIMELIMIT(), options.pnn);
 
-       } while (nodemap->nodes[options.pnn].flags & NODE_FLAGS_STOPPED);
-       ret = control_ipreallocate(ctdb, argc, argv);
-       if (ret != 0) {
-               DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
-               return ret;
-       }
+       return ret == 0;
+}
 
-       return 0;
+static int control_continue(struct ctdb_context *ctdb, int argc, const char **argv)
+{
+       return update_flags_and_ipreallocate(ctdb, NULL,
+                                                 update_flags_not_stopped,
+                                                 NODE_FLAGS_STOPPED,
+                                                 "stopped",
+                                                 false /* set_flag */);
 }
 
 static uint32_t get_generation(struct ctdb_context *ctdb)
 {
+       TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
        struct ctdb_vnn_map *vnnmap=NULL;
        int ret;
+       uint32_t generation;
 
        /* wait until the recmaster is not in recovery mode */
        while (1) {
@@ -2950,116 +3374,88 @@ static uint32_t get_generation(struct ctdb_context *ctdb)
                }
 
                /* get the recmaster */
-               if (!ctdb_getrecmaster(ctdb_connection, CTDB_CURRENT_NODE, &recmaster)) {
+               ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, TIMELIMIT(), CTDB_CURRENT_NODE, &recmaster);
+               if (ret != 0) {
                        DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
+                       talloc_free(tmp_ctx);
                        exit(10);
                }
 
                /* get recovery mode */
-               if (!ctdb_getrecmode(ctdb_connection, recmaster, &recmode)) {
+               ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, TIMELIMIT(), recmaster, &recmode);
+               if (ret != 0) {
                        DEBUG(DEBUG_ERR, ("Unable to get recmode from node %u\n", options.pnn));
+                       talloc_free(tmp_ctx);
                        exit(10);
                }
 
                /* get the current generation number */
-               ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), recmaster, ctdb, &vnnmap);
+               ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), recmaster, tmp_ctx, &vnnmap);
                if (ret != 0) {
                        DEBUG(DEBUG_ERR, ("Unable to get vnnmap from recmaster (%u)\n", recmaster));
+                       talloc_free(tmp_ctx);
                        exit(10);
                }
 
-               if ((recmode == CTDB_RECOVERY_NORMAL)
-               &&  (vnnmap->generation != 1)){
-                       return vnnmap->generation;
+               if ((recmode == CTDB_RECOVERY_NORMAL) && (vnnmap->generation != 1)) {
+                       generation = vnnmap->generation;
+                       talloc_free(tmp_ctx);
+                       return generation;
                }
                sleep(1);
        }
 }
 
-/*
-  ban a node from the cluster
- */
-static int control_ban(struct ctdb_context *ctdb, int argc, const char **argv)
+/* Ban a node */
+static bool update_state_banned(struct ctdb_context *ctdb, void *data)
 {
+       struct ctdb_ban_time *bantime = (struct ctdb_ban_time *)data;
        int ret;
-       struct ctdb_node_map *nodemap=NULL;
+
+       ret = ctdb_ctrl_set_ban(ctdb, TIMELIMIT(), options.pnn, bantime);
+
+       return ret == 0;
+}
+
+static int control_ban(struct ctdb_context *ctdb, int argc, const char **argv)
+{
        struct ctdb_ban_time bantime;
 
        if (argc < 1) {
                usage();
        }
        
-       /* verify the node exists */
-       ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
-       if (ret != 0) {
-               DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
-               return ret;
-       }
-
-       if (nodemap->nodes[options.pnn].flags & NODE_FLAGS_BANNED) {
-               DEBUG(DEBUG_ERR,("Node %u is already banned.\n", options.pnn));
-               return -1;
-       }
-
        bantime.pnn  = options.pnn;
        bantime.time = strtoul(argv[0], NULL, 0);
 
-       ret = ctdb_ctrl_set_ban(ctdb, TIMELIMIT(), options.pnn, &bantime);
-       if (ret != 0) {
-               DEBUG(DEBUG_ERR,("Banning node %d for %d seconds failed.\n", bantime.pnn, bantime.time));
+       if (bantime.time == 0) {
+               DEBUG(DEBUG_ERR, ("Invalid ban time specified - must be >0\n"));
                return -1;
-       }       
-
-       ret = control_ipreallocate(ctdb, argc, argv);
-       if (ret != 0) {
-               DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
-               return ret;
        }
 
-       return 0;
+       return update_flags_and_ipreallocate(ctdb, &bantime,
+                                                 update_state_banned,
+                                                 NODE_FLAGS_BANNED,
+                                                 "banned",
+                                                 true /* set_flag*/);
 }
 
 
-/*
-  unban a node from the cluster
- */
+/* Unban a node */
 static int control_unban(struct ctdb_context *ctdb, int argc, const char **argv)
 {
-       int ret;
-       struct ctdb_node_map *nodemap=NULL;
        struct ctdb_ban_time bantime;
 
-       /* verify the node exists */
-       ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
-       if (ret != 0) {
-               DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
-               return ret;
-       }
-
-       if (!(nodemap->nodes[options.pnn].flags & NODE_FLAGS_BANNED)) {
-               DEBUG(DEBUG_ERR,("Node %u is not banned.\n", options.pnn));
-               return -1;
-       }
-
        bantime.pnn  = options.pnn;
        bantime.time = 0;
 
-       ret = ctdb_ctrl_set_ban(ctdb, TIMELIMIT(), options.pnn, &bantime);
-       if (ret != 0) {
-               DEBUG(DEBUG_ERR,("Unbanning node %d failed.\n", bantime.pnn));
-               return -1;
-       }       
-
-       ret = control_ipreallocate(ctdb, argc, argv);
-       if (ret != 0) {
-               DEBUG(DEBUG_ERR, ("IP Reallocate failed on node %u\n", options.pnn));
-               return ret;
-       }
-
-       return 0;
+       return update_flags_and_ipreallocate(ctdb, &bantime,
+                                                 update_state_banned,
+                                                 NODE_FLAGS_BANNED,
+                                                 "banned",
+                                                 false /* set_flag*/);
 }
 
-
 /*
   show ban information for a node
  */
@@ -3085,7 +3481,8 @@ static int control_showban(struct ctdb_context *ctdb, int argc, const char **arg
        if (bantime->time == 0) {
                printf("Node %u is not banned\n", bantime->pnn);
        } else {
-               printf("Node %u is banned banned for %d seconds\n", bantime->pnn, bantime->time);
+               printf("Node %u is banned, %d seconds remaining\n",
+                      bantime->pnn, bantime->time);
        }
 
        return 0;
@@ -3118,12 +3515,6 @@ static int control_recover(struct ctdb_context *ctdb, int argc, const char **arg
        /* record the current generation number */
        generation = get_generation(ctdb);
 
-       ret = ctdb_ctrl_freeze_priority(ctdb, TIMELIMIT(), options.pnn, 1);
-       if (ret != 0) {
-               DEBUG(DEBUG_ERR, ("Unable to freeze node\n"));
-               return ret;
-       }
-
        ret = ctdb_ctrl_setrecmode(ctdb, TIMELIMIT(), options.pnn, CTDB_RECOVERY_ACTIVE);
        if (ret != 0) {
                DEBUG(DEBUG_ERR, ("Unable to set recovery mode\n"));
@@ -3159,8 +3550,8 @@ static int control_getmonmode(struct ctdb_context *ctdb, int argc, const char **
        if (!options.machinereadable){
                printf("Monitoring mode:%s (%d)\n",monmode==CTDB_MONITORING_ACTIVE?"ACTIVE":"DISABLED",monmode);
        } else {
-               printf(":mode:\n");
-               printf(":%d:\n",monmode);
+               printm(":mode:\n");
+               printm(":%d:\n",monmode);
        }
        return 0;
 }
@@ -3172,8 +3563,10 @@ static int control_getmonmode(struct ctdb_context *ctdb, int argc, const char **
 static int control_getcapabilities(struct ctdb_context *ctdb, int argc, const char **argv)
 {
        uint32_t capabilities;
+       int ret;
 
-       if (!ctdb_getcapabilities(ctdb_connection, options.pnn, &capabilities)) {
+       ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), options.pnn, &capabilities);
+       if (ret != 0) {
                DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", options.pnn));
                return -1;
        }
@@ -3184,8 +3577,8 @@ static int control_getcapabilities(struct ctdb_context *ctdb, int argc, const ch
                printf("LVS: %s\n", (capabilities&CTDB_CAP_LVS)?"YES":"NO");
                printf("NATGW: %s\n", (capabilities&CTDB_CAP_NATGW)?"YES":"NO");
        } else {
-               printf(":RECMASTER:LMASTER:LVS:NATGW:\n");
-               printf(":%d:%d:%d:%d:\n",
+               printm(":RECMASTER:LMASTER:LVS:NATGW:\n");
+               printm(":%d:%d:%d:%d:\n",
                        !!(capabilities&CTDB_CAP_RECMASTER),
                        !!(capabilities&CTDB_CAP_LMASTER),
                        !!(capabilities&CTDB_CAP_LVS),
@@ -3197,71 +3590,62 @@ static int control_getcapabilities(struct ctdb_context *ctdb, int argc, const ch
 /*
   display lvs configuration
  */
+
+static uint32_t lvs_exclude_flags[] = {
+       /* Look for a nice healthy node */
+       NODE_FLAGS_INACTIVE|NODE_FLAGS_DISABLED,
+       /* If not found, an UNHEALTHY node will do */
+       NODE_FLAGS_INACTIVE|NODE_FLAGS_PERMANENTLY_DISABLED,
+       0,
+};
+
 static int control_lvs(struct ctdb_context *ctdb, int argc, const char **argv)
 {
-       uint32_t *capabilities;
-       struct ctdb_node_map *nodemap=NULL;
+       TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
+       struct ctdb_node_map *orig_nodemap=NULL;
+       struct ctdb_node_map *nodemap;
        int i, ret;
-       int healthy_count = 0;
 
-       if (!ctdb_getnodemap(ctdb_connection, options.pnn, &nodemap)) {
+       ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn,
+                                  tmp_ctx, &orig_nodemap);
+       if (ret != 0) {
                DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
+               talloc_free(tmp_ctx);
                return -1;
        }
 
-       capabilities = talloc_array(ctdb, uint32_t, nodemap->num);
-       CTDB_NO_MEMORY(ctdb, capabilities);
-       
+       nodemap = filter_nodemap_by_capabilities(ctdb, orig_nodemap,
+                                                CTDB_CAP_LVS, false);
+       if (nodemap == NULL) {
+               /* No memory */
+               ret = -1;
+               goto done;
+       }
+
        ret = 0;
 
-       /* collect capabilities for all connected nodes */
-       for (i=0; i<nodemap->num; i++) {
-               if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
-                       continue;
-               }
-               if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
-                       continue;
-               }
-       
-               if (!ctdb_getcapabilities(ctdb_connection, i, &capabilities[i])) {
-                       DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", i));
+       for (i = 0; lvs_exclude_flags[i] != 0; i++) {
+               struct ctdb_node_map *t =
+                       filter_nodemap_by_flags(ctdb, nodemap,
+                                               lvs_exclude_flags[i]);
+               if (t == NULL) {
+                       /* No memory */
                        ret = -1;
                        goto done;
                }
-
-               if (!(capabilities[i] & CTDB_CAP_LVS)) {
-                       continue;
-               }
-
-               if (!(nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY)) {
-                       healthy_count++;
-               }
-       }
-
-       /* Print all LVS nodes */
-       for (i=0; i<nodemap->num; i++) {
-               if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
-                       continue;
-               }
-               if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
-                       continue;
-               }
-               if (!(capabilities[i] & CTDB_CAP_LVS)) {
-                       continue;
-               }
-
-               if (healthy_count != 0) {
-                       if (nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY) {
-                               continue;
+               if (t->num > 0) {
+                       /* At least 1 node without excluded flags */
+                       int j;
+                       for (j = 0; j < t->num; j++) {
+                               printf("%d:%s\n", t->nodes[j].pnn,
+                                      ctdb_addr_to_str(&t->nodes[j].addr));
                        }
+                       goto done;
                }
-
-               printf("%d:%s\n", i, 
-                       ctdb_addr_to_str(&nodemap->nodes[i].addr));
+               talloc_free(t);
        }
-
 done:
-       ctdb_free_nodemap(nodemap);
+       talloc_free(tmp_ctx);
        return ret;
 }
 
@@ -3270,75 +3654,55 @@ done:
  */
 static int control_lvsmaster(struct ctdb_context *ctdb, int argc, const char **argv)
 {
-       uint32_t *capabilities;
+       TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
        struct ctdb_node_map *nodemap=NULL;
        int i, ret;
-       int healthy_count = 0;
 
-       if (!ctdb_getnodemap(ctdb_connection, options.pnn, &nodemap)) {
+       ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn,
+                                  tmp_ctx, &nodemap);
+       if (ret != 0) {
                DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
+               talloc_free(tmp_ctx);
                return -1;
        }
 
-       capabilities = talloc_array(ctdb, uint32_t, nodemap->num);
-       CTDB_NO_MEMORY(ctdb, capabilities);
-
-       ret = -1;
-       
-       /* collect capabilities for all connected nodes */
-       for (i=0; i<nodemap->num; i++) {
-               if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
-                       continue;
-               }
-               if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
-                       continue;
-               }
-       
-               if (!ctdb_getcapabilities(ctdb_connection, i, &capabilities[i])) {
-                       DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", i));
+       for (i = 0; lvs_exclude_flags[i] != 0; i++) {
+               struct ctdb_node_map *t =
+                       filter_nodemap_by_flags(ctdb, nodemap,
+                                               lvs_exclude_flags[i]);
+               if (t == NULL) {
+                       /* No memory */
                        ret = -1;
                        goto done;
                }
-
-               if (!(capabilities[i] & CTDB_CAP_LVS)) {
-                       continue;
-               }
-
-               if (!(nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY)) {
-                       healthy_count++;
-               }
-       }
-
-       /* find and show the lvsmaster */
-       for (i=0; i<nodemap->num; i++) {
-               if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
-                       continue;
-               }
-               if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
-                       continue;
-               }
-               if (!(capabilities[i] & CTDB_CAP_LVS)) {
-                       continue;
-               }
-
-               if (healthy_count != 0) {
-                       if (nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY) {
-                               continue;
+               if (t->num > 0) {
+                       struct ctdb_node_map *n;
+                       n = filter_nodemap_by_capabilities(ctdb,
+                                                          t,
+                                                          CTDB_CAP_LVS,
+                                                          true);
+                       if (n == NULL) {
+                               /* No memory */
+                               ret = -1;
+                               goto done;
+                       }
+                       if (n->num > 0) {
+                               ret = 0;
+                               if (options.machinereadable) {
+                                       printm("%d\n", n->nodes[0].pnn);
+                               } else {
+                                       printf("Node %d is LVS master\n", n->nodes[0].pnn);
+                               }
+                               goto done;
                        }
                }
-
-               if (options.machinereadable){
-                       printf("%d\n", i);
-               } else {
-                       printf("Node %d is LVS master\n", i);
-               }
-               ret = 0;
-               goto done;
+               talloc_free(t);
        }
 
        printf("There is no LVS master\n");
+       ret = 255;
 done:
-       ctdb_free_nodemap(nodemap);
+       talloc_free(tmp_ctx);
        return ret;
 }
 
@@ -3386,23 +3750,18 @@ static int control_catdb(struct ctdb_context *ctdb, int argc, const char **argv)
        const char *db_name;
        struct ctdb_db_context *ctdb_db;
        int ret;
-       bool persistent;
        struct ctdb_dump_db_context c;
+       uint8_t flags;
 
        if (argc < 1) {
                usage();
        }
 
-       db_name = argv[0];
-
-
-       if (db_exists(ctdb, db_name, &persistent)) {
-               DEBUG(DEBUG_ERR,("Database '%s' does not exist\n", db_name));
+       if (!db_exists(ctdb, argv[0], NULL, &db_name, &flags)) {
                return -1;
        }
 
-       ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, persistent, 0);
-
+       ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, flags & CTDB_DB_FLAGS_PERSISTENT, 0);
        if (ctdb_db == NULL) {
                DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
                return -1;
@@ -3472,22 +3831,17 @@ static int control_cattdb(struct ctdb_context *ctdb, int argc, const char **argv
        const char *db_name;
        struct ctdb_db_context *ctdb_db;
        struct cattdb_data d;
-       bool persistent;
+       uint8_t flags;
 
        if (argc < 1) {
                usage();
        }
 
-       db_name = argv[0];
-
-
-       if (db_exists(ctdb, db_name, &persistent)) {
-               DEBUG(DEBUG_ERR,("Database '%s' does not exist\n", db_name));
+       if (!db_exists(ctdb, argv[0], NULL, &db_name, &flags)) {
                return -1;
        }
 
-       ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, persistent, 0);
-
+       ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, flags & CTDB_DB_FLAGS_PERSISTENT, 0);
        if (ctdb_db == NULL) {
                DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
                return -1;
@@ -3516,21 +3870,17 @@ static int control_readkey(struct ctdb_context *ctdb, int argc, const char **arg
        struct ctdb_record_handle *h;
        TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
        TDB_DATA key, data;
-       bool persistent;
+       uint8_t flags;
 
        if (argc < 2) {
                usage();
        }
 
-       db_name = argv[0];
-
-       if (db_exists(ctdb, db_name, &persistent)) {
-               DEBUG(DEBUG_ERR,("Database '%s' does not exist\n", db_name));
+       if (!db_exists(ctdb, argv[0], NULL, &db_name, &flags)) {
                return -1;
        }
 
-       ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, persistent, 0);
-
+       ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, flags & CTDB_DB_FLAGS_PERSISTENT, 0);
        if (ctdb_db == NULL) {
                DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
                return -1;
@@ -3547,10 +3897,10 @@ static int control_readkey(struct ctdb_context *ctdb, int argc, const char **arg
                exit(10);
        }
 
-       printf("Data: size:%d ptr:[%s]\n", (int)data.dsize, data.dptr);
+       printf("Data: size:%d ptr:[%.*s]\n", (int)data.dsize, (int)data.dsize, data.dptr);
 
-       talloc_free(ctdb_db);
        talloc_free(tmp_ctx);
+       talloc_free(ctdb_db);
        return 0;
 }
 
@@ -3564,21 +3914,17 @@ static int control_writekey(struct ctdb_context *ctdb, int argc, const char **ar
        struct ctdb_record_handle *h;
        TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
        TDB_DATA key, data;
-       bool persistent;
+       uint8_t flags;
 
        if (argc < 3) {
                usage();
        }
 
-       db_name = argv[0];
-
-       if (db_exists(ctdb, db_name, &persistent)) {
-               DEBUG(DEBUG_ERR,("Database '%s' does not exist\n", db_name));
+       if (!db_exists(ctdb, argv[0], NULL, &db_name, &flags)) {
                return -1;
        }
 
-       ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, persistent, 0);
-
+       ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, flags & CTDB_DB_FLAGS_PERSISTENT, 0);
        if (ctdb_db == NULL) {
                DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
                return -1;
@@ -3603,8 +3949,8 @@ static int control_writekey(struct ctdb_context *ctdb, int argc, const char **ar
        }
 
        talloc_free(h);
-       talloc_free(ctdb_db);
        talloc_free(tmp_ctx);
+       talloc_free(ctdb_db);
        return 0;
 }
 
@@ -3620,21 +3966,19 @@ static int control_pfetch(struct ctdb_context *ctdb, int argc, const char **argv
        TDB_DATA key, data;
        int fd, ret;
        bool persistent;
+       uint8_t flags;
 
        if (argc < 2) {
                talloc_free(tmp_ctx);
                usage();
        }
 
-       db_name = argv[0];
-
-
-       if (db_exists(ctdb, db_name, &persistent)) {
-               DEBUG(DEBUG_ERR,("Database '%s' does not exist\n", db_name));
+       if (!db_exists(ctdb, argv[0], NULL, &db_name, &flags)) {
                talloc_free(tmp_ctx);
                return -1;
        }
 
+       persistent = flags & CTDB_DB_FLAGS_PERSISTENT;
        if (!persistent) {
                DEBUG(DEBUG_ERR,("Database '%s' is not persistent\n", db_name));
                talloc_free(tmp_ctx);
@@ -3642,7 +3986,6 @@ static int control_pfetch(struct ctdb_context *ctdb, int argc, const char **argv
        }
 
        ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, persistent, 0);
-
        if (ctdb_db == NULL) {
                DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
                talloc_free(tmp_ctx);
@@ -3678,10 +4021,10 @@ static int control_pfetch(struct ctdb_context *ctdb, int argc, const char **argv
                        talloc_free(tmp_ctx);
                        return -1;
                }
-               write(fd, data.dptr, data.dsize);
+               sys_write(fd, data.dptr, data.dsize);
                close(fd);
        } else {
-               write(1, data.dptr, data.dsize);
+               sys_write(1, data.dptr, data.dsize);
        }
 
        /* abort the transaction */
@@ -3742,16 +4085,16 @@ static int control_tfetch(struct ctdb_context *ctdb, int argc, const char **argv
                        return -1;
                }
                if (options.verbose){
-                       write(fd, data.dptr, data.dsize);
+                       sys_write(fd, data.dptr, data.dsize);
                } else {
-                       write(fd, data.dptr+sizeof(struct ctdb_ltdb_header), data.dsize-sizeof(struct ctdb_ltdb_header));
+                       sys_write(fd, data.dptr+sizeof(struct ctdb_ltdb_header), data.dsize-sizeof(struct ctdb_ltdb_header));
                }
                close(fd);
        } else {
                if (options.verbose){
-                       write(1, data.dptr, data.dsize);
+                       sys_write(1, data.dptr, data.dsize);
                } else {
-                       write(1, data.dptr+sizeof(struct ctdb_ltdb_header), data.dsize-sizeof(struct ctdb_ltdb_header));
+                       sys_write(1, data.dptr+sizeof(struct ctdb_ltdb_header), data.dsize-sizeof(struct ctdb_ltdb_header));
                }
        }
 
@@ -3766,8 +4109,9 @@ static int control_tstore(struct ctdb_context *ctdb, int argc, const char **argv
 {
        const char *tdb_file;
        TDB_CONTEXT *tdb;
-       TDB_DATA key, data;
+       TDB_DATA key, value, data;
        TALLOC_CTX *tmp_ctx = talloc_new(NULL);
+       struct ctdb_ltdb_header header;
 
        if (argc < 3) {
                usage();
@@ -3793,20 +4137,37 @@ static int control_tstore(struct ctdb_context *ctdb, int argc, const char **argv
        }
 
        if (!strncmp(argv[2], "0x", 2)) {
-               data = hextodata(tmp_ctx, argv[2] + 2);
-               if (data.dsize == 0) {
+               value = hextodata(tmp_ctx, argv[2] + 2);
+               if (value.dsize == 0) {
                        printf("Failed to convert \"%s\" into a TDB_DATA\n", argv[2]);
                        return -1;
                }
        } else {
-               data.dptr  = discard_const(argv[2]);
-               data.dsize = strlen(argv[2]);
+               value.dptr  = discard_const(argv[2]);
+               value.dsize = strlen(argv[2]);
+       }
+
+       ZERO_STRUCT(header);
+       if (argc > 3) {
+               header.rsn = atoll(argv[3]);
+       }
+       if (argc > 4) {
+               header.dmaster = atoi(argv[4]);
+       }
+       if (argc > 5) {
+               header.flags = atoi(argv[5]);
        }
 
-       if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
-               printf("Not enough data. You must specify the full ctdb_ltdb_header too when storing\n");
+       data.dsize = sizeof(struct ctdb_ltdb_header) + value.dsize;
+       data.dptr = talloc_size(tmp_ctx, data.dsize);
+       if (data.dptr == NULL) {
+               printf("Failed to allocate header+value\n");
                return -1;
        }
+
+       *(struct ctdb_ltdb_header *)data.dptr = header;
+       memcpy(data.dptr + sizeof(struct ctdb_ltdb_header), value.dptr, value.dsize);
+
        if (tdb_store(tdb, key, data, TDB_REPLACE) != 0) {
                printf("Failed to write record %s to tdb %s\n", argv[1], tdb_file);
                tdb_close(tdb);
@@ -3870,7 +4231,7 @@ static int control_pstore(struct ctdb_context *ctdb, int argc, const char **argv
                        talloc_free(tmp_ctx);
                        return -1;
                }
-               ret = read(fd, data.dptr, data.dsize);
+               ret = sys_read(fd, data.dptr, data.dsize);
                if (ret != data.dsize) {
                        DEBUG(DEBUG_ERR,("Failed to read %d bytes of record data\n", (int)data.dsize));
                        close(fd);
@@ -3918,13 +4279,227 @@ static int control_pstore(struct ctdb_context *ctdb, int argc, const char **argv
        return 0;
 }
 
+/*
+ * delete a record from a persistent database
+ */
+static int control_pdelete(struct ctdb_context *ctdb, int argc, const char **argv)
+{
+       const char *db_name;
+       struct ctdb_db_context *ctdb_db;
+       TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
+       struct ctdb_transaction_handle *h;
+       TDB_DATA key;
+       int ret;
+       bool persistent;
+       uint8_t flags;
+
+       if (argc < 2) {
+               talloc_free(tmp_ctx);
+               usage();
+       }
+
+       if (!db_exists(ctdb, argv[0], NULL, &db_name, &flags)) {
+               talloc_free(tmp_ctx);
+               return -1;
+       }
+
+       persistent = flags & CTDB_DB_FLAGS_PERSISTENT;
+       if (!persistent) {
+               DEBUG(DEBUG_ERR, ("Database '%s' is not persistent\n", db_name));
+               talloc_free(tmp_ctx);
+               return -1;
+       }
+
+       ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, persistent, 0);
+       if (ctdb_db == NULL) {
+               DEBUG(DEBUG_ERR, ("Unable to attach to database '%s'\n", db_name));
+               talloc_free(tmp_ctx);
+               return -1;
+       }
+
+       h = ctdb_transaction_start(ctdb_db, tmp_ctx);
+       if (h == NULL) {
+               DEBUG(DEBUG_ERR, ("Failed to start transaction on database %s\n", db_name));
+               talloc_free(tmp_ctx);
+               return -1;
+       }
+
+       key.dptr = discard_const(argv[1]);
+       key.dsize = strlen(argv[1]);
+       ret = ctdb_transaction_store(h, key, tdb_null);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR, ("Failed to delete record\n"));
+               talloc_free(tmp_ctx);
+               return -1;
+       }
+
+       ret = ctdb_transaction_commit(h);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR, ("Failed to commit transaction\n"));
+               talloc_free(tmp_ctx);
+               return -1;
+       }
+
+       talloc_free(tmp_ctx);
+       return 0;
+}
+
+static const char *ptrans_parse_string(TALLOC_CTX *mem_ctx, const char *s,
+                                      TDB_DATA *data)
+{
+       const char *t;
+       size_t n;
+       const char *ret; /* Next byte after successfully parsed value */
+
+       /* Error, unless someone says otherwise */
+       ret = NULL;
+       /* Indicates no value to parse */
+       *data = tdb_null;
+
+       /* Skip whitespace */
+       n = strspn(s, " \t");
+       t = s + n;
+
+       if (t[0] == '"') {
+               /* Quoted ASCII string - no wide characters! */
+               t++;
+               n = strcspn(t, "\"");
+               if (t[n] == '"') {
+                       if (n > 0) {
+                               data->dsize = n;
+                               data->dptr = talloc_memdup(mem_ctx, t, n);
+                               CTDB_NOMEM_ABORT(data->dptr);
+                       }
+                       ret = t + n + 1;
+               } else {
+                       DEBUG(DEBUG_WARNING,("Unmatched \" in input %s\n", s));
+               }
+       } else {
+               DEBUG(DEBUG_WARNING,("Unsupported input format in %s\n", s));
+       }
+
+       return ret;
+}
+
+static bool ptrans_get_key_value(TALLOC_CTX *mem_ctx, FILE *file,
+                                TDB_DATA *key, TDB_DATA *value)
+{
+       char line [1024]; /* FIXME: make this more flexible? */
+       const char *t;
+       char *ptr;
+
+       ptr = fgets(line, sizeof(line), file);
+
+       if (ptr == NULL) {
+               return false;
+       }
+
+       /* Get key */
+       t = ptrans_parse_string(mem_ctx, line, key);
+       if (t == NULL || key->dptr == NULL) {
+               /* Line Ignored but not EOF */
+               return true;
+       }
+
+       /* Get value */
+       t = ptrans_parse_string(mem_ctx, t, value);
+       if (t == NULL) {
+               /* Line Ignored but not EOF */
+               talloc_free(key->dptr);
+               *key = tdb_null;
+               return true;
+       }
+
+       return true;
+}
+
+/*
+ * Update a persistent database as per file/stdin
+ */
+static int control_ptrans(struct ctdb_context *ctdb,
+                         int argc, const char **argv)
+{
+       const char *db_name;
+       struct ctdb_db_context *ctdb_db;
+       TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
+       struct ctdb_transaction_handle *h;
+       TDB_DATA key, value;
+       FILE *file;
+       int ret;
+
+       if (argc < 1) {
+               talloc_free(tmp_ctx);
+               usage();
+       }
+
+       file = stdin;
+       if (argc == 2) {
+               file = fopen(argv[1], "r");
+               if (file == NULL) {
+                       DEBUG(DEBUG_ERR,("Unable to open file for reading '%s'\n", argv[1]));
+                       talloc_free(tmp_ctx);
+                       return -1;
+               }
+       }
+
+       db_name = argv[0];
+
+       ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, true, 0);
+       if (ctdb_db == NULL) {
+               DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
+               goto error;
+       }
+
+       h = ctdb_transaction_start(ctdb_db, tmp_ctx);
+       if (h == NULL) {
+               DEBUG(DEBUG_ERR,("Failed to start transaction on database %s\n", db_name));
+               goto error;
+       }
+
+       while (ptrans_get_key_value(tmp_ctx, file, &key, &value)) {
+               if (key.dsize != 0) {
+                       ret = ctdb_transaction_store(h, key, value);
+                       /* Minimise memory use */
+                       talloc_free(key.dptr);
+                       if (value.dptr != NULL) {
+                               talloc_free(value.dptr);
+                       }
+                       if (ret != 0) {
+                               DEBUG(DEBUG_ERR,("Failed to store record\n"));
+                               ctdb_transaction_cancel(h);
+                               goto error;
+                       }
+               }
+       }
+
+       ret = ctdb_transaction_commit(h);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR,("Failed to commit transaction\n"));
+               goto error;
+       }
+
+       if (file != stdin) {
+               fclose(file);
+       }
+       talloc_free(tmp_ctx);
+       return 0;
+
+error:
+       if (file != stdin) {
+               fclose(file);
+       }
+
+       talloc_free(tmp_ctx);
+       return -1;
+}
+
 /*
   check if a service is bound to a port or not
  */
 static int control_chktcpport(struct ctdb_context *ctdb, int argc, const char **argv)
 {
        int s, ret;
-       unsigned v;
+       int v;
        int port;
         struct sockaddr_in sin;
 
@@ -3942,7 +4517,9 @@ static int control_chktcpport(struct ctdb_context *ctdb, int argc, const char **
        }
 
        v = fcntl(s, F_GETFL, 0);
-        fcntl(s, F_SETFL, v | O_NONBLOCK);
+       if (v == -1 || fcntl(s, F_SETFL, v | O_NONBLOCK) != 0) {
+               printf("Unable to set socket non-blocking: %s\n", strerror(errno));
+       }
 
        bzero(&sin, sizeof(sin));
        sin.sin_family = PF_INET;
@@ -3975,28 +4552,30 @@ static void log_handler(struct ctdb_context *ctdb, uint64_t srvid,
  */
 static int control_getlog(struct ctdb_context *ctdb, int argc, const char **argv)
 {
-       int ret;
-       int32_t res;
+       int ret, i;
+       bool main_daemon;
        struct ctdb_get_log_addr log_addr;
        TDB_DATA data;
-       TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
-       char *errmsg;
        struct timeval tv;
 
-       if (argc != 1) {
-               DEBUG(DEBUG_ERR,("Invalid arguments\n"));
-               talloc_free(tmp_ctx);
-               return -1;
+       /* Process options */
+       main_daemon = true;
+       log_addr.pnn = ctdb_get_pnn(ctdb);
+       log_addr.level = DEBUG_NOTICE;
+       for (i = 0; i < argc; i++) {
+               if (strcmp(argv[i], "recoverd") == 0) {
+                       main_daemon = false;
+               } else {
+                       if (isalpha(argv[i][0]) || argv[i][0] == '-') { 
+                               log_addr.level = get_debug_by_desc(argv[i]);
+                       } else {
+                               log_addr.level = strtol(argv[i], NULL, 0);
+                       }
+               }
        }
 
-       log_addr.pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
+       /* Our message port is our PID */
        log_addr.srvid = getpid();
-       if (isalpha(argv[0][0]) || argv[0][0] == '-') { 
-               log_addr.level = get_debug_by_desc(argv[0]);
-       } else {
-               log_addr.level = strtol(argv[0], NULL, 0);
-       }
-
 
        data.dptr = (unsigned char *)&log_addr;
        data.dsize = sizeof(log_addr);
@@ -4008,24 +4587,36 @@ static int control_getlog(struct ctdb_context *ctdb, int argc, const char **argv
 
        DEBUG(DEBUG_ERR,("Listen for response on %d\n", (int)log_addr.srvid));
 
-       ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_GET_LOG,
-                          0, data, tmp_ctx, NULL, &res, NULL, &errmsg);
-       if (ret != 0 || res != 0) {
-               DEBUG(DEBUG_ERR,("Failed to get logs - %s\n", errmsg));
+       if (main_daemon) {
+               int32_t res;
+               char *errmsg;
+               TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
+
+               ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_GET_LOG,
+                                  0, data, tmp_ctx, NULL, &res, NULL, &errmsg);
+               if (ret != 0 || res != 0) {
+                       DEBUG(DEBUG_ERR,("Failed to get logs - %s\n", errmsg));
+                       talloc_free(tmp_ctx);
+                       return -1;
+               }
                talloc_free(tmp_ctx);
-               return -1;
+       } else {
+               ret = ctdb_client_send_message(ctdb, options.pnn,
+                                              CTDB_SRVID_GETLOG, data);
+               if (ret != 0) {
+                       DEBUG(DEBUG_ERR,("Failed to send getlog request message to %u\n", options.pnn));
+                       return -1;
+               }
        }
 
-
        tv = timeval_current();
        /* this loop will terminate when we have received the reply */
-       while (timeval_elapsed(&tv) < 3.0) {    
+       while (timeval_elapsed(&tv) < (double)options.timelimit) {
                event_loop_once(ctdb->ev);
        }
 
        DEBUG(DEBUG_INFO,("Timed out waiting for log data.\n"));
 
-       talloc_free(tmp_ctx);
        return 0;
 }
 
@@ -4035,112 +4626,98 @@ static int control_getlog(struct ctdb_context *ctdb, int argc, const char **argv
 static int control_clearlog(struct ctdb_context *ctdb, int argc, const char **argv)
 {
        int ret;
-       int32_t res;
-       char *errmsg;
-       TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
-
-       ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_CLEAR_LOG,
-                          0, tdb_null, tmp_ctx, NULL, &res, NULL, &errmsg);
-       if (ret != 0 || res != 0) {
-               DEBUG(DEBUG_ERR,("Failed to clear logs\n"));
-               talloc_free(tmp_ctx);
-               return -1;
-       }
-
-       talloc_free(tmp_ctx);
-       return 0;
-}
-
-
-static uint32_t reloadips_finished;
 
-static void reloadips_handler(struct ctdb_context *ctdb, uint64_t srvid, 
-                            TDB_DATA data, void *private_data)
-{
-       reloadips_finished = 1;
-}
-
-static int reloadips_all(struct ctdb_context *ctdb)
-{
-       struct reloadips_all_reply rips;
-       struct ctdb_node_map *nodemap=NULL;
-       TDB_DATA data;
-       uint32_t recmaster;
-       int ret, i;
+       if (argc == 0 || (argc >= 1 && strcmp(argv[0], "recoverd") != 0)) {
+               /* "recoverd" not given - get logs from main daemon */
+               int32_t res;
+               char *errmsg;
+               TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
 
-       /* check that there are valid nodes available */
-       if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
-               DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
-               return 1;
-       }
-       for (i=0; i<nodemap->num;i++) {
-               if (nodemap->nodes[i].flags != 0) {
-                       DEBUG(DEBUG_ERR,("reloadips -n all  can only be used when all nodes are up and healthy. Aborting due to problem with node %d\n", i));
-                       return 1;
+               ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_CLEAR_LOG,
+                                  0, tdb_null, tmp_ctx, NULL, &res, NULL, &errmsg);
+               if (ret != 0 || res != 0) {
+                       DEBUG(DEBUG_ERR,("Failed to clear logs\n"));
+                       talloc_free(tmp_ctx);
+                       return -1;
                }
-       }
-
-
-       rips.pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
-       if (rips.pnn == -1) {
-               DEBUG(DEBUG_ERR, ("Failed to get pnn of local node\n"));
-               return 1;
-       }
-       rips.srvid = getpid();
-
-
-       /* register a message port for receiveing the reply so that we
-          can receive the reply
-       */
-       ctdb_client_set_message_handler(ctdb, rips.srvid, reloadips_handler, NULL);
-
-       if (!ctdb_getrecmaster(ctdb_connection, CTDB_CURRENT_NODE, &recmaster)) {
-               DEBUG(DEBUG_ERR, ("Unable to get recmaster from node\n"));
-               return -1;
-       }
-
 
-       data.dptr = (uint8_t *)&rips;
-       data.dsize = sizeof(rips);
-
-       ret = ctdb_client_send_message(ctdb, recmaster, CTDB_SRVID_RELOAD_ALL_IPS, data);
-       if (ret != 0) {
-               DEBUG(DEBUG_ERR,("Failed to send reload all ips request message to %u\n", options.pnn));
-               return 1;
-       }
+               talloc_free(tmp_ctx);
+       } else {
+               TDB_DATA data; /* unused in recoverd... */
+               data.dsize = 0;
 
-       reloadips_finished = 0;
-       while (reloadips_finished == 0) {
-               event_loop_once(ctdb->ev);
+               ret = ctdb_client_send_message(ctdb, options.pnn, CTDB_SRVID_CLEARLOG, data);
+               if (ret != 0) {
+                       DEBUG(DEBUG_ERR,("Failed to send clearlog request message to %u\n", options.pnn));
+                       return -1;
+               }
        }
 
        return 0;
 }
 
-/*
-  reload public ips on a specific node
- */
-static int control_reloadips(struct ctdb_context *ctdb, int argc, const char **argv)
-{
-       int ret;
-       int32_t res;
-       char *errmsg;
+/* Reload public IPs on a specified nodes */
+static int control_reloadips(struct ctdb_context *ctdb, int argc, const char **argv)
+{
        TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
+       uint32_t *nodes;
+       uint32_t pnn_mode;
+       uint32_t timeout;
+       int ret;
 
-       if (options.pnn == CTDB_BROADCAST_ALL) {
-               return reloadips_all(ctdb);
+       assert_single_node_only();
+
+       if (argc > 1) {
+               usage();
        }
 
-       ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_RELOAD_PUBLIC_IPS,
-                          0, tdb_null, tmp_ctx, NULL, &res, NULL, &errmsg);
-       if (ret != 0 || res != 0) {
-               DEBUG(DEBUG_ERR,("Failed to reload ips\n"));
-               talloc_free(tmp_ctx);
-               return -1;
+       /* Determine the nodes where IPs need to be reloaded */
+       if (!parse_nodestring(ctdb, tmp_ctx, argc == 1 ? argv[0] : NULL,
+                             options.pnn, true, &nodes, &pnn_mode)) {
+               ret = -1;
+               goto done;
+       }
+
+again:
+       /* Disable takeover runs on all connected nodes.  A reply
+        * indicating success is needed from each node so all nodes
+        * will need to be active.  This will retry until maxruntime
+        * is exceeded, hence no error handling.
+        * 
+        * A check could be added to not allow reloading of IPs when
+        * there are disconnected nodes.  However, this should
+        * probably be left up to the administrator.
+        */
+       timeout = LONGTIMEOUT;
+       srvid_broadcast(ctdb, CTDB_SRVID_DISABLE_TAKEOVER_RUNS, &timeout,
+                       "Disable takeover runs", true);
+
+       /* Now tell all the desired nodes to reload their public IPs.
+        * Keep trying this until it succeeds.  This assumes all
+        * failures are transient, which might not be true...
+        */
+       if (ctdb_client_async_control(ctdb, CTDB_CONTROL_RELOAD_PUBLIC_IPS,
+                                     nodes, 0, LONGTIMELIMIT(),
+                                     false, tdb_null,
+                                     NULL, NULL, NULL) != 0) {
+               DEBUG(DEBUG_ERR,
+                     ("Unable to reload IPs on some nodes, try again.\n"));
+               goto again;
        }
 
+       /* It isn't strictly necessary to wait until takeover runs are
+        * re-enabled but doing so can't hurt.
+        */
+       timeout = 0;
+       srvid_broadcast(ctdb, CTDB_SRVID_DISABLE_TAKEOVER_RUNS, &timeout,
+                       "Enable takeover runs", true);
+
+       ipreallocate(ctdb);
+
+       ret = 0;
+done:
        talloc_free(tmp_ctx);
-       return 0;
+       return ret;
 }
 
 /*
@@ -4158,7 +4735,7 @@ static int control_getdbmap(struct ctdb_context *ctdb, int argc, const char **ar
        }
 
        if(options.machinereadable){
-               printf(":ID:Name:Path:Persistent:Sticky:Unhealthy:ReadOnly:\n");
+               printm(":ID:Name:Path:Persistent:Sticky:Unhealthy:ReadOnly:\n");
                for(i=0;i<dbmap->num;i++){
                        const char *path;
                        const char *name;
@@ -4176,7 +4753,7 @@ static int control_getdbmap(struct ctdb_context *ctdb, int argc, const char **ar
                        persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
                        readonly   = dbmap->dbs[i].flags & CTDB_DB_FLAGS_READONLY;
                        sticky     = dbmap->dbs[i].flags & CTDB_DB_FLAGS_STICKY;
-                       printf(":0x%08X:%s:%s:%d:%d:%d:%d:\n",
+                       printm(":0x%08X:%s:%s:%d:%d:%d:%d:\n",
                               dbmap->dbs[i].dbid, name, path,
                               !!(persistent), !!(sticky),
                               !!(health), !!(readonly));
@@ -4215,50 +4792,29 @@ static int control_getdbmap(struct ctdb_context *ctdb, int argc, const char **ar
  */
 static int control_getdbstatus(struct ctdb_context *ctdb, int argc, const char **argv)
 {
-       int i, ret;
-       struct ctdb_dbid_map *dbmap=NULL;
        const char *db_name;
+       uint32_t db_id;
+       uint8_t flags;
+       const char *path;
+       const char *health;
 
        if (argc < 1) {
                usage();
        }
 
-       db_name = argv[0];
-
-       ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &dbmap);
-       if (ret != 0) {
-               DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
-               return ret;
+       if (!db_exists(ctdb, argv[0], &db_id, &db_name, &flags)) {
+               return -1;
        }
 
-       for(i=0;i<dbmap->num;i++){
-               const char *path;
-               const char *name;
-               const char *health;
-               bool persistent;
-               bool readonly;
-               bool sticky;
-
-               ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &name);
-               if (strcmp(name, db_name) != 0) {
-                       continue;
-               }
-
-               ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &path);
-               ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &health);
-               persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
-               readonly   = dbmap->dbs[i].flags & CTDB_DB_FLAGS_READONLY;
-               sticky     = dbmap->dbs[i].flags & CTDB_DB_FLAGS_STICKY;
-               printf("dbid: 0x%08x\nname: %s\npath: %s\nPERSISTENT: %s\nSTICKY: %s\nREADONLY: %s\nHEALTH: %s\n",
-                      dbmap->dbs[i].dbid, name, path,
-                      persistent?"yes":"no",
-                      sticky?"yes":"no",
-                      readonly?"yes":"no",
-                      health?health:"OK");
-               return 0;
-       }
+       ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn, db_id, ctdb, &path);
+       ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn, db_id, ctdb, &health);
+       printf("dbid: 0x%08x\nname: %s\npath: %s\nPERSISTENT: %s\nSTICKY: %s\nREADONLY: %s\nHEALTH: %s\n",
+              db_id, db_name, path,
+              (flags & CTDB_DB_FLAGS_PERSISTENT ? "yes" : "no"),
+              (flags & CTDB_DB_FLAGS_STICKY ? "yes" : "no"),
+              (flags & CTDB_DB_FLAGS_READONLY ? "yes" : "no"),
+              (health ? health : "OK"));
 
-       DEBUG(DEBUG_ERR, ("db %s doesn't exist on node %u\n", db_name, options.pnn));
        return 0;
 }
 
@@ -4270,14 +4826,14 @@ static int control_getdbstatus(struct ctdb_context *ctdb, int argc, const char *
 static int control_isnotrecmaster(struct ctdb_context *ctdb, int argc, const char **argv)
 {
        uint32_t mypnn, recmaster;
+       int ret;
 
-       mypnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), options.pnn);
-       if (mypnn == -1) {
-               printf("Failed to get pnn of node\n");
-               return 1;
-       }
+       assert_single_node_only();
 
-       if (!ctdb_getrecmaster(ctdb_connection, options.pnn, &recmaster)) {
+       mypnn = getpnn(ctdb);
+
+       ret = ctdb_ctrl_getrecmaster(ctdb, ctdb, TIMELIMIT(), options.pnn, &recmaster);
+       if (ret != 0) {
                printf("Failed to get the recmaster\n");
                return 1;
        }
@@ -4310,6 +4866,49 @@ static int control_ping(struct ctdb_context *ctdb, int argc, const char **argv)
 }
 
 
+/*
+  get a node's runstate
+ */
+static int control_runstate(struct ctdb_context *ctdb, int argc, const char **argv)
+{
+       int ret;
+       enum ctdb_runstate runstate;
+
+       ret = ctdb_ctrl_get_runstate(ctdb, TIMELIMIT(), options.pnn, &runstate);
+       if (ret == -1) {
+               printf("Unable to get runstate response from node %u\n",
+                      options.pnn);
+               return -1;
+       } else {
+               bool found = true;
+               enum ctdb_runstate t;
+               int i;
+               for (i=0; i<argc; i++) {
+                       found = false;
+                       t = runstate_from_string(argv[i]);
+                       if (t == CTDB_RUNSTATE_UNKNOWN) {
+                               printf("Invalid run state (%s)\n", argv[i]);
+                               return -1;
+                       }
+
+                       if (t == runstate) {
+                               found = true;
+                               break;
+                       }
+               }
+
+               if (!found) {
+                       printf("CTDB not in required run state (got %s)\n", 
+                              runstate_to_string((enum ctdb_runstate)runstate));
+                       return -1;
+               }
+       }
+
+       printf("%s\n", runstate_to_string(runstate));
+       return 0;
+}
+
+
 /*
   get a tunable
  */
@@ -4325,7 +4924,7 @@ static int control_getvar(struct ctdb_context *ctdb, int argc, const char **argv
 
        name = argv[0];
        ret = ctdb_ctrl_get_tunable(ctdb, TIMELIMIT(), options.pnn, name, &value);
-       if (ret == -1) {
+       if (ret != 0) {
                DEBUG(DEBUG_ERR, ("Unable to get tunable variable '%s'\n", name));
                return -1;
        }
@@ -4355,6 +4954,11 @@ static int control_setvar(struct ctdb_context *ctdb, int argc, const char **argv
                DEBUG(DEBUG_ERR, ("Unable to set tunable variable '%s'\n", name));
                return -1;
        }
+       if (ret == 1) {
+               DEBUG(DEBUG_WARNING,
+                     ("Setting obsolete tunable variable '%s'\n",
+                      name));
+       }
        return 0;
 }
 
@@ -4396,8 +5000,8 @@ static int control_getdebug(struct ctdb_context *ctdb, int argc, const char **ar
                return ret;
        } else {
                if (options.machinereadable){
-                       printf(":Name:Level:\n");
-                       printf(":%s:%d:\n",get_debug_by_level(level),level);
+                       printm(":Name:Level:\n");
+                       printm(":%s:%d:\n",get_debug_by_level(level),level);
                } else {
                        printf("Node %u is at debug level %s (%d)\n", options.pnn, get_debug_by_level(level), level);
                }
@@ -4420,7 +5024,7 @@ static int control_getreclock(struct ctdb_context *ctdb, int argc, const char **
        } else {
                if (options.machinereadable){
                        if (reclock != NULL) {
-                               printf("%s", reclock);
+                               printm("%s", reclock);
                        }
                } else {
                        if (reclock == NULL) {
@@ -4643,6 +5247,109 @@ static int control_attach(struct ctdb_context *ctdb, int argc, const char **argv
        return 0;
 }
 
+/*
+ * detach from a database
+ */
+static int control_detach(struct ctdb_context *ctdb, int argc,
+                         const char **argv)
+{
+       uint32_t db_id;
+       uint8_t flags;
+       int ret, i, status = 0;
+       struct ctdb_node_map *nodemap = NULL;
+       TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
+       uint32_t recmode;
+
+       if (argc < 1) {
+               usage();
+       }
+
+       assert_single_node_only();
+
+       ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, TIMELIMIT(), options.pnn,
+                                   &recmode);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR, ("Database cannot be detached "
+                                 "when recovery is active\n"));
+               talloc_free(tmp_ctx);
+               return -1;
+       }
+
+       ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx,
+                                  &nodemap);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n",
+                                 options.pnn));
+               talloc_free(tmp_ctx);
+               return -1;
+       }
+
+       for (i=0; i<nodemap->num; i++) {
+               uint32_t value;
+
+               if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
+                       continue;
+               }
+
+               if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
+                       continue;
+               }
+
+               if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
+                       DEBUG(DEBUG_ERR, ("Database cannot be detached on "
+                                         "inactive (stopped or banned) node "
+                                         "%u\n", nodemap->nodes[i].pnn));
+                       talloc_free(tmp_ctx);
+                       return -1;
+               }
+
+               ret = ctdb_ctrl_get_tunable(ctdb, TIMELIMIT(),
+                                           nodemap->nodes[i].pnn,
+                                           "AllowClientDBAttach",
+                                           &value);
+               if (ret != 0) {
+                       DEBUG(DEBUG_ERR, ("Unable to get tunable "
+                                         "AllowClientDBAttach from node %u\n",
+                                          nodemap->nodes[i].pnn));
+                       talloc_free(tmp_ctx);
+                       return -1;
+               }
+
+               if (value == 1) {
+                       DEBUG(DEBUG_ERR, ("Database access is still active on "
+                                         "node %u. Set AllowClientDBAttach=0 "
+                                         "on all nodes.\n",
+                                         nodemap->nodes[i].pnn));
+                       talloc_free(tmp_ctx);
+                       return -1;
+               }
+       }
+
+       talloc_free(tmp_ctx);
+
+       for (i=0; i<argc; i++) {
+               if (!db_exists(ctdb, argv[i], &db_id, NULL, &flags)) {
+                       continue;
+               }
+
+               if (flags & CTDB_DB_FLAGS_PERSISTENT) {
+                       DEBUG(DEBUG_ERR, ("Persistent database '%s' "
+                                         "cannot be detached\n", argv[i]));
+                       status = -1;
+                       continue;
+               }
+
+               ret = ctdb_detach(ctdb, db_id);
+               if (ret != 0) {
+                       DEBUG(DEBUG_ERR, ("Database '%s' detach failed\n",
+                                         argv[i]));
+                       status = ret;
+               }
+       }
+
+       return status;
+}
+
 /*
   set db priority
  */
@@ -4679,7 +5386,9 @@ static int control_getdbprio(struct ctdb_context *ctdb, int argc, const char **a
                usage();
        }
 
-       db_id = strtoul(argv[0], NULL, 0);
+       if (!db_exists(ctdb, argv[0], &db_id, NULL, NULL)) {
+               return -1;
+       }
 
        ret = ctdb_ctrl_get_db_priority(ctdb, TIMELIMIT(), options.pnn, db_id, &priority);
        if (ret != 0) {
@@ -4699,38 +5408,14 @@ static int control_setdbsticky(struct ctdb_context *ctdb, int argc, const char *
 {
        TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
        uint32_t db_id;
-       struct ctdb_dbid_map *dbmap=NULL;
-       int i, ret;
+       int ret;
 
        if (argc < 1) {
                usage();
        }
 
-       if (!strncmp(argv[0], "0x", 2)) {
-               db_id = strtoul(argv[0] + 2, NULL, 0);
-       } else {
-               ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &dbmap);
-               if (ret != 0) {
-                       DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
-                       talloc_free(tmp_ctx);
-                       return ret;
-               }
-               for(i=0;i<dbmap->num;i++){
-                       const char *name;
-
-                       ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, tmp_ctx, &name);
-                       if(!strcmp(argv[0], name)){
-                               talloc_free(discard_const(name));
-                               break;
-                       }
-                       talloc_free(discard_const(name));
-               }
-               if (i == dbmap->num) {
-                       DEBUG(DEBUG_ERR,("No database with name '%s' found\n", argv[0]));
-                       talloc_free(tmp_ctx);
-                       return -1;
-               }
-               db_id = dbmap->dbs[i].dbid;
+       if (!db_exists(ctdb, argv[0], &db_id, NULL, NULL)) {
+               return -1;
        }
 
        ret = ctdb_ctrl_set_db_sticky(ctdb, options.pnn, db_id);
@@ -4751,38 +5436,14 @@ static int control_setdbreadonly(struct ctdb_context *ctdb, int argc, const char
 {
        TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
        uint32_t db_id;
-       struct ctdb_dbid_map *dbmap=NULL;
-       int i, ret;
+       int ret;
 
        if (argc < 1) {
                usage();
        }
 
-       if (!strncmp(argv[0], "0x", 2)) {
-               db_id = strtoul(argv[0] + 2, NULL, 0);
-       } else {
-               ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &dbmap);
-               if (ret != 0) {
-                       DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
-                       talloc_free(tmp_ctx);
-                       return ret;
-               }
-               for(i=0;i<dbmap->num;i++){
-                       const char *name;
-
-                       ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, tmp_ctx, &name);
-                       if(!strcmp(argv[0], name)){
-                               talloc_free(discard_const(name));
-                               break;
-                       }
-                       talloc_free(discard_const(name));
-               }
-               if (i == dbmap->num) {
-                       DEBUG(DEBUG_ERR,("No database with name '%s' found\n", argv[0]));
-                       talloc_free(tmp_ctx);
-                       return -1;
-               }
-               db_id = dbmap->dbs[i].dbid;
+       if (!db_exists(ctdb, argv[0], &db_id, NULL, NULL)) {
+               return -1;
        }
 
        ret = ctdb_ctrl_set_db_readonly(ctdb, options.pnn, db_id);
@@ -4801,18 +5462,20 @@ static int control_setdbreadonly(struct ctdb_context *ctdb, int argc, const char
  */
 static int control_getdbseqnum(struct ctdb_context *ctdb, int argc, const char **argv)
 {
-       bool ret;
        uint32_t db_id;
        uint64_t seqnum;
+       int ret;
 
        if (argc < 1) {
                usage();
        }
 
-       db_id = strtoul(argv[0], NULL, 0);
+       if (!db_exists(ctdb, argv[0], &db_id, NULL, NULL)) {
+               return -1;
+       }
 
-       ret = ctdb_getdbseqnum(ctdb_connection, options.pnn, db_id, &seqnum);
-       if (!ret) {
+       ret = ctdb_ctrl_getdbseqnum(ctdb, TIMELIMIT(), options.pnn, db_id, &seqnum);
+       if (ret != 0) {
                DEBUG(DEBUG_ERR, ("Unable to get seqnum from node."));
                return -1;
        }
@@ -4903,8 +5566,8 @@ static int backup_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data,
  */
 static int control_backupdb(struct ctdb_context *ctdb, int argc, const char **argv)
 {
-       int i, ret;
-       struct ctdb_dbid_map *dbmap=NULL;
+       const char *db_name;
+       int ret;
        TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
        struct db_file_header dbhdr;
        struct ctdb_db_context *ctdb_db;
@@ -4912,36 +5575,22 @@ static int control_backupdb(struct ctdb_context *ctdb, int argc, const char **ar
        int fh = -1;
        int status = -1;
        const char *reason = NULL;
+       uint32_t db_id;
+       uint8_t flags;
+
+       assert_single_node_only();
 
        if (argc != 2) {
                DEBUG(DEBUG_ERR,("Invalid arguments\n"));
                return -1;
        }
 
-       ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &dbmap);
-       if (ret != 0) {
-               DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
-               return ret;
-       }
-
-       for(i=0;i<dbmap->num;i++){
-               const char *name;
-
-               ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, tmp_ctx, &name);
-               if(!strcmp(argv[0], name)){
-                       talloc_free(discard_const(name));
-                       break;
-               }
-               talloc_free(discard_const(name));
-       }
-       if (i == dbmap->num) {
-               DEBUG(DEBUG_ERR,("No database with name '%s' found\n", argv[0]));
-               talloc_free(tmp_ctx);
+       if (!db_exists(ctdb, argv[0], &db_id, &db_name, &flags)) {
                return -1;
        }
 
        ret = ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn,
-                                   dbmap->dbs[i].dbid, tmp_ctx, &reason);
+                                   db_id, tmp_ctx, &reason);
        if (ret != 0) {
                DEBUG(DEBUG_ERR,("Unable to get dbhealth for database '%s'\n",
                                 argv[0]));
@@ -4972,7 +5621,7 @@ static int control_backupdb(struct ctdb_context *ctdb, int argc, const char **ar
                                     allow_unhealthy));
        }
 
-       ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), argv[0], dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT, 0);
+       ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, flags & CTDB_DB_FLAGS_PERSISTENT, 0);
        if (ctdb_db == NULL) {
                DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", argv[0]));
                talloc_free(tmp_ctx);
@@ -5022,21 +5671,22 @@ static int control_backupdb(struct ctdb_context *ctdb, int argc, const char **ar
                return -1;
        }
 
+       ZERO_STRUCT(dbhdr);
        dbhdr.version = DB_VERSION;
        dbhdr.timestamp = time(NULL);
-       dbhdr.persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
+       dbhdr.persistent = flags & CTDB_DB_FLAGS_PERSISTENT;
        dbhdr.size = bd->len;
        if (strlen(argv[0]) >= MAX_DB_NAME) {
                DEBUG(DEBUG_ERR,("Too long dbname\n"));
                goto done;
        }
-       strncpy(discard_const(dbhdr.name), argv[0], MAX_DB_NAME);
-       ret = write(fh, &dbhdr, sizeof(dbhdr));
+       strncpy(discard_const(dbhdr.name), argv[0], MAX_DB_NAME-1);
+       ret = sys_write(fh, &dbhdr, sizeof(dbhdr));
        if (ret == -1) {
                DEBUG(DEBUG_ERR,("write failed: %s\n", strerror(errno)));
                goto done;
        }
-       ret = write(fh, bd->records, bd->len);
+       ret = sys_write(fh, bd->records, bd->len);
        if (ret == -1) {
                DEBUG(DEBUG_ERR,("write failed: %s\n", strerror(errno)));
                goto done;
@@ -5078,6 +5728,8 @@ static int control_restoredb(struct ctdb_context *ctdb, int argc, const char **a
        char tbuf[100];
        char *dbname;
 
+       assert_single_node_only();
+
        if (argc < 1 || argc > 2) {
                DEBUG(DEBUG_ERR,("Invalid arguments\n"));
                return -1;
@@ -5090,9 +5742,10 @@ static int control_restoredb(struct ctdb_context *ctdb, int argc, const char **a
                return -1;
        }
 
-       read(fh, &dbhdr, sizeof(dbhdr));
+       sys_read(fh, &dbhdr, sizeof(dbhdr));
        if (dbhdr.version != DB_VERSION) {
                DEBUG(DEBUG_ERR,("Invalid version of database dump. File is version %lu but expected version was %u\n", dbhdr.version, DB_VERSION));
+               close(fh);
                talloc_free(tmp_ctx);
                return -1;
        }
@@ -5110,7 +5763,7 @@ static int control_restoredb(struct ctdb_context *ctdb, int argc, const char **a
                talloc_free(tmp_ctx);
                return -1;
        }               
-       read(fh, outdata.dptr, outdata.dsize);
+       sys_read(fh, outdata.dptr, outdata.dsize);
        close(fh);
 
        tm = localtime(&dbhdr.timestamp);
@@ -5271,6 +5924,8 @@ static int control_dumpdbbackup(struct ctdb_context *ctdb, int argc, const char
        struct ctdb_marshall_buffer *m;
        struct ctdb_dump_db_context c;
 
+       assert_single_node_only();
+
        if (argc != 1) {
                DEBUG(DEBUG_ERR,("Invalid arguments\n"));
                return -1;
@@ -5283,9 +5938,10 @@ static int control_dumpdbbackup(struct ctdb_context *ctdb, int argc, const char
                return -1;
        }
 
-       read(fh, &dbhdr, sizeof(dbhdr));
+       sys_read(fh, &dbhdr, sizeof(dbhdr));
        if (dbhdr.version != DB_VERSION) {
                DEBUG(DEBUG_ERR,("Invalid version of database dump. File is version %lu but expected version was %u\n", dbhdr.version, DB_VERSION));
+               close(fh);
                talloc_free(tmp_ctx);
                return -1;
        }
@@ -5298,7 +5954,7 @@ static int control_dumpdbbackup(struct ctdb_context *ctdb, int argc, const char
                talloc_free(tmp_ctx);
                return -1;
        }
-       read(fh, outdata.dptr, outdata.dsize);
+       sys_read(fh, outdata.dptr, outdata.dsize);
        close(fh);
        m = (struct ctdb_marshall_buffer *)outdata.dptr;
 
@@ -5337,6 +5993,7 @@ static int control_dumpdbbackup(struct ctdb_context *ctdb, int argc, const char
 static int control_wipedb(struct ctdb_context *ctdb, int argc,
                          const char **argv)
 {
+       const char *db_name;
        int ret;
        TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
        TDB_DATA data;
@@ -5347,40 +6004,20 @@ static int control_wipedb(struct ctdb_context *ctdb, int argc,
        struct ctdb_control_wipe_database w;
        uint32_t *nodes;
        uint32_t generation;
-       struct ctdb_dbid_map *dbmap = NULL;
+       uint8_t flags;
+
+       assert_single_node_only();
 
        if (argc != 1) {
                DEBUG(DEBUG_ERR,("Invalid arguments\n"));
                return -1;
        }
 
-       ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx,
-                                &dbmap);
-       if (ret != 0) {
-               DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n",
-                                 options.pnn));
-               return ret;
-       }
-
-       for(i=0;i<dbmap->num;i++){
-               const char *name;
-
-               ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn,
-                                   dbmap->dbs[i].dbid, tmp_ctx, &name);
-               if(!strcmp(argv[0], name)){
-                       talloc_free(discard_const(name));
-                       break;
-               }
-               talloc_free(discard_const(name));
-       }
-       if (i == dbmap->num) {
-               DEBUG(DEBUG_ERR, ("No database with name '%s' found\n",
-                                 argv[0]));
-               talloc_free(tmp_ctx);
+       if (!db_exists(ctdb, argv[0], NULL, &db_name, &flags)) {
                return -1;
        }
 
-       ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), argv[0], dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT, 0);
+       ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, flags & CTDB_DB_FLAGS_PERSISTENT, 0);
        if (ctdb_db == NULL) {
                DEBUG(DEBUG_ERR, ("Unable to attach to database '%s'\n",
                                  argv[0]));
@@ -5528,7 +6165,7 @@ static int control_dumpmemory(struct ctdb_context *ctdb, int argc, const char **
                talloc_free(tmp_ctx);
                return -1;
        }
-       write(1, data.dptr, data.dsize);
+       sys_write(1, data.dptr, data.dsize);
        talloc_free(tmp_ctx);
        return 0;
 }
@@ -5539,7 +6176,7 @@ static int control_dumpmemory(struct ctdb_context *ctdb, int argc, const char **
 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
                             TDB_DATA data, void *private_data)
 {
-       write(1, data.dptr, data.dsize);
+       sys_write(1, data.dptr, data.dsize);
        exit(0);
 }
 
@@ -5550,13 +6187,9 @@ static int control_rddumpmemory(struct ctdb_context *ctdb, int argc, const char
 {
        int ret;
        TDB_DATA data;
-       struct rd_memdump_reply rd;
+       struct srvid_request rd;
 
-       rd.pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
-       if (rd.pnn == -1) {
-               DEBUG(DEBUG_ERR, ("Failed to get pnn of local node\n"));
-               return -1;
-       }
+       rd.pnn = ctdb_get_pnn(ctdb);
        rd.srvid = getpid();
 
        /* register a message port for receiveing the reply so that we
@@ -5655,6 +6288,8 @@ static int control_listnodes(struct ctdb_context *ctdb, int argc, const char **a
        struct pnn_node *pnn_nodes;
        struct pnn_node *pnn_node;
 
+       assert_single_node_only();
+
        pnn_nodes = read_nodes_file(mem_ctx);
        if (pnn_nodes == NULL) {
                DEBUG(DEBUG_ERR,("Failed to read nodes file\n"));
@@ -5663,16 +6298,11 @@ static int control_listnodes(struct ctdb_context *ctdb, int argc, const char **a
        }
 
        for(pnn_node=pnn_nodes;pnn_node;pnn_node=pnn_node->next) {
-               ctdb_sock_addr addr;
-               if (parse_ip(pnn_node->addr, NULL, 63999, &addr) == 0) {
-                       DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s' in nodes file\n", pnn_node->addr));
-                       talloc_free(mem_ctx);
-                       return -1;
-               }
+               const char *addr = ctdb_addr_to_str(&pnn_node->addr);
                if (options.machinereadable){
-                       printf(":%d:%s:\n", pnn_node->pnn, pnn_node->addr);
+                       printm(":%d:%s:\n", pnn_node->pnn, addr);
                } else {
-                       printf("%s\n", pnn_node->addr);
+                       printf("%s\n", addr);
                }
        }
        talloc_free(mem_ctx);
@@ -5689,11 +6319,9 @@ static int control_reload_nodes_file(struct ctdb_context *ctdb, int argc, const
        int mypnn;
        struct ctdb_node_map *nodemap=NULL;
 
-       mypnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
-       if (mypnn == -1) {
-               DEBUG(DEBUG_ERR, ("Failed to read pnn of local node\n"));
-               return -1;
-       }
+       assert_single_node_only();
+
+       mypnn = ctdb_get_pnn(ctdb);
 
        ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
        if (ret != 0) {
@@ -5736,12 +6364,11 @@ static const struct {
        const char *msg;
        const char *args;
 } ctdb_commands[] = {
-#ifdef CTDB_VERS
-       { "version",         control_version,           true,   false,  "show version of ctdb" },
-#endif
+       { "version",         control_version,           true,   true,   "show version of ctdb" },
        { "status",          control_status,            true,   false,  "show node status" },
        { "uptime",          control_uptime,            true,   false,  "show node uptime" },
        { "ping",            control_ping,              true,   false,  "ping all nodes" },
+       { "runstate",        control_runstate,          true,   false,  "get/check runstate of a node", "[setup|first_recovery|startup|running]" },
        { "getvar",          control_getvar,            true,   false,  "get a tunable variable",               "<name>"},
        { "setvar",          control_setvar,            true,   false,  "set a tunable variable",               "<name> <value>"},
        { "listvars",        control_listvars,          true,   false,  "list tunable variables"},
@@ -5754,9 +6381,9 @@ static const struct {
        { "setifacelink",    control_setifacelink,      true,   false,  "set interface link status", "<iface> <status>" },
        { "process-exists",  control_process_exists,    true,   false,  "check if a process exists on a node",  "<pid>"},
        { "getdbmap",        control_getdbmap,          true,   false,  "show the database map" },
-       { "getdbstatus",     control_getdbstatus,       true,   false,  "show the status of a database", "<dbname>" },
-       { "catdb",           control_catdb,             true,   false,  "dump a ctdb database" ,                     "<dbname>"},
-       { "cattdb",          control_cattdb,            true,   false,  "dump a local tdb database" ,                     "<dbname>"},
+       { "getdbstatus",     control_getdbstatus,       true,   false,  "show the status of a database", "<dbname|dbid>" },
+       { "catdb",           control_catdb,             true,   false,  "dump a ctdb database" ,                     "<dbname|dbid>"},
+       { "cattdb",          control_cattdb,            true,   false,  "dump a local tdb database" ,                     "<dbname|dbid>"},
        { "getmonmode",      control_getmonmode,        true,   false,  "show monitoring mode" },
        { "getcapabilities", control_getcapabilities,   true,   false,  "show node capabilities" },
        { "pnn",             control_pnn,               true,   false,  "show the pnn of the currnet node" },
@@ -5766,9 +6393,10 @@ static const struct {
        { "enablemonitor",      control_enable_monmode, true,   false,  "set monitoring mode to ACTIVE" },
        { "setdebug",        control_setdebug,          true,   false,  "set debug level",                      "<EMERG|ALERT|CRIT|ERR|WARNING|NOTICE|INFO|DEBUG>" },
        { "getdebug",        control_getdebug,          true,   false,  "get debug level" },
-       { "getlog",          control_getlog,            true,   false,  "get the log data from the in memory ringbuffer", "<level>" },
-       { "clearlog",          control_clearlog,        true,   false,  "clear the log data from the in memory ringbuffer" },
+       { "getlog",          control_getlog,            true,   false,  "get the log data from the in memory ringbuffer", "[<level>] [recoverd]" },
+       { "clearlog",          control_clearlog,        true,   false,  "clear the log data from the in memory ringbuffer", "[recoverd]" },
        { "attach",          control_attach,            true,   false,  "attach to a database",                 "<dbname> [persistent]" },
+       { "detach",          control_detach,            false,  false,  "detach from a database",                 "<dbname|dbid> [<dbname|dbid> ...]" },
        { "dumpmemory",      control_dumpmemory,        true,   false,  "dump memory map to stdout" },
        { "rddumpmemory",    control_rddumpmemory,      true,   false,  "dump memory map from the recovery daemon to stdout" },
        { "getpid",          control_getpid,            true,   false,  "get ctdbd process ID" },
@@ -5776,16 +6404,16 @@ static const struct {
        { "enable",          control_enable,            true,   false,  "enable a nodes public IP" },
        { "stop",            control_stop,              true,   false,  "stop a node" },
        { "continue",        control_continue,          true,   false,  "re-start a stopped node" },
-       { "ban",             control_ban,               true,   false,  "ban a node from the cluster",          "<bantime|0>"},
+       { "ban",             control_ban,               true,   false,  "ban a node from the cluster",          "<bantime>"},
        { "unban",           control_unban,             true,   false,  "unban a node" },
        { "showban",         control_showban,           true,   false,  "show ban information"},
        { "shutdown",        control_shutdown,          true,   false,  "shutdown ctdbd" },
        { "recover",         control_recover,           true,   false,  "force recovery" },
-       { "sync",            control_ipreallocate,      true,   false,  "wait until ctdbd has synced all state changes" },
-       { "ipreallocate",    control_ipreallocate,      true,   false,  "force the recovery daemon to perform a ip reallocation procedure" },
+       { "sync",            control_ipreallocate,      false,  false,  "wait until ctdbd has synced all state changes" },
+       { "ipreallocate",    control_ipreallocate,      false,  false,  "force the recovery daemon to perform a ip reallocation procedure" },
        { "thaw",            control_thaw,              true,   false,  "thaw databases", "[priority:1-3]" },
        { "isnotrecmaster",  control_isnotrecmaster,    false,  false,  "check if the local node is recmaster or not" },
-       { "killtcp",         kill_tcp,                  false,  false, "kill a tcp connection.", "<srcip:port> <dstip:port>" },
+       { "killtcp",         kill_tcp,                  false,  false, "kill a tcp connection.", "[<srcip:port> <dstip:port>]" },
        { "gratiousarp",     control_gratious_arp,      false,  false, "send a gratious arp", "<ip> <interface>" },
        { "tickle",          tickle_tcp,                false,  false, "send a tcp tickle ack", "<srcip:port> <dstip:port>" },
        { "gettickles",      control_get_tickles,       false,  false, "get the list of tickles registered for this ip", "<ip> [<port>]" },
@@ -5798,7 +6426,6 @@ static const struct {
        { "chksrvid",        chksrvid,                  false,  false, "check if a server id exists", "<pnn> <type> <id>" },
        { "getsrvids",       getsrvids,                 false,  false, "get a list of all server ids"},
        { "check_srvids",    check_srvids,              false,  false, "check if a srvid exists", "<id>+" },
-       { "vacuum",          ctdb_vacuum,               false,  true, "vacuum the databases of empty records", "[max_records]"},
        { "repack",          ctdb_repack,               false,  false, "repack all databases", "[max_freelist]"},
        { "listnodes",       control_listnodes,         false,  true, "list all nodes in the cluster"},
        { "reloadnodes",     control_reload_nodes_file, false,  false, "reload the nodes file and restart the transport on all nodes"},
@@ -5807,41 +6434,42 @@ static const struct {
        { "addip",           control_addip,             true,   false, "add a ip address to a node", "<ip/mask> <iface>"},
        { "delip",           control_delip,             false,  false, "delete an ip address from a node", "<ip>"},
        { "eventscript",     control_eventscript,       true,   false, "run the eventscript with the given parameters on a node", "<arguments>"},
-       { "backupdb",        control_backupdb,          false,  false, "backup the database into a file.", "<database> <file>"},
+       { "backupdb",        control_backupdb,          false,  false, "backup the database into a file.", "<dbname|dbid> <file>"},
        { "restoredb",        control_restoredb,        false,  false, "restore the database from a file.", "<file> [dbname]"},
        { "dumpdbbackup",    control_dumpdbbackup,      false,  true,  "dump database backup from a file.", "<file>"},
-       { "wipedb",           control_wipedb,        false,     false, "wipe the contents of a database.", "<dbname>"},
+       { "wipedb",           control_wipedb,        false,     false, "wipe the contents of a database.", "<dbname|dbid>"},
        { "recmaster",        control_recmaster,        true,   false, "show the pnn for the recovery master."},
        { "scriptstatus",     control_scriptstatus,     true,   false, "show the status of the monitoring scripts (or all scripts)", "[all]"},
-       { "enablescript",     control_enablescript,  false,     false, "enable an eventscript", "<script>"},
-       { "disablescript",    control_disablescript,  false,    false, "disable an eventscript", "<script>"},
-       { "natgwlist",        control_natgwlist,        false,  false, "show the nodes belonging to this natgw configuration"},
-       { "xpnn",             control_xpnn,             true,   true,  "find the pnn of the local node without talking to the daemon (unreliable)" },
-       { "getreclock",       control_getreclock,       false,  false, "Show the reclock file of a node"},
-       { "setreclock",       control_setreclock,       false,  false, "Set/clear the reclock file of a node", "[filename]"},
+       { "enablescript",     control_enablescript,  true,      false, "enable an eventscript", "<script>"},
+       { "disablescript",    control_disablescript,  true,     false, "disable an eventscript", "<script>"},
+       { "natgwlist",        control_natgwlist,        true,   false, "show the nodes belonging to this natgw configuration"},
+       { "xpnn",             control_xpnn,             false,  true,  "find the pnn of the local node without talking to the daemon (unreliable)" },
+       { "getreclock",       control_getreclock,       true,   false, "Show the reclock file of a node"},
+       { "setreclock",       control_setreclock,       true,   false, "Set/clear the reclock file of a node", "[filename]"},
        { "setnatgwstate",    control_setnatgwstate,    false,  false, "Set NATGW state to on/off", "{on|off}"},
        { "setlmasterrole",   control_setlmasterrole,   false,  false, "Set LMASTER role to on/off", "{on|off}"},
        { "setrecmasterrole", control_setrecmasterrole, false,  false, "Set RECMASTER role to on/off", "{on|off}"},
-       { "setdbprio",        control_setdbprio,        false,  false, "Set DB priority", "<dbid> <prio:1-3>"},
-       { "getdbprio",        control_getdbprio,        false,  false, "Get DB priority", "<dbid>"},
-       { "setdbreadonly",    control_setdbreadonly,    false,  false, "Set DB readonly capable", "<dbid>|<name>"},
-       { "setdbsticky",      control_setdbsticky,      false,  false, "Set DB sticky-records capable", "<dbid>|<name>"},
+       { "setdbprio",        control_setdbprio,        false,  false, "Set DB priority", "<dbname|dbid> <prio:1-3>"},
+       { "getdbprio",        control_getdbprio,        false,  false, "Get DB priority", "<dbname|dbid>"},
+       { "setdbreadonly",    control_setdbreadonly,    false,  false, "Set DB readonly capable", "<dbname|dbid>"},
+       { "setdbsticky",      control_setdbsticky,      false,  false, "Set DB sticky-records capable", "<dbname|dbid>"},
        { "msglisten",        control_msglisten,        false,  false, "Listen on a srvid port for messages", "<msg srvid>"},
        { "msgsend",          control_msgsend,  false,  false, "Send a message to srvid", "<srvid> <message>"},
-       { "sync",            control_ipreallocate,      false,  false,  "wait until ctdbd has synced all state changes" },
-       { "pfetch",          control_pfetch,            false,  false,  "fetch a record from a persistent database", "<db> <key> [<file>]" },
-       { "pstore",          control_pstore,            false,  false,  "write a record to a persistent database", "<db> <key> <file containing record>" },
+       { "pfetch",          control_pfetch,            false,  false,  "fetch a record from a persistent database", "<dbname|dbid> <key> [<file>]" },
+       { "pstore",          control_pstore,            false,  false,  "write a record to a persistent database", "<dbname|dbid> <key> <file containing record>" },
+       { "pdelete",         control_pdelete,           false,  false,  "delete a record from a persistent database", "<dbname|dbid> <key>" },
+       { "ptrans",          control_ptrans,            false,  false,  "update a persistent database (from stdin)", "<dbname|dbid>" },
        { "tfetch",          control_tfetch,            false,  true,  "fetch a record from a [c]tdb-file [-v]", "<tdb-file> <key> [<file>]" },
-       { "tstore",          control_tstore,            false,  true,  "store a record (including ltdb header)", "<tdb-file> <key> <data+header>" },
-       { "readkey",         control_readkey,           true,   false,  "read the content off a database key", "<tdb-file> <key>" },
-       { "writekey",        control_writekey,          true,   false,  "write to a database key", "<tdb-file> <key> <value>" },
+       { "tstore",          control_tstore,            false,  true,  "store a record (including ltdb header)", "<tdb-file> <key> <data> [<rsn> <dmaster> <flags>]" },
+       { "readkey",         control_readkey,           true,   false,  "read the content off a database key", "<dbname|dbid> <key>" },
+       { "writekey",        control_writekey,          true,   false,  "write to a database key", "<dbname|dbid> <key> <value>" },
        { "checktcpport",    control_chktcpport,        false,  true,  "check if a service is bound to a specific tcp port or not", "<port>" },
-       { "rebalancenode",     control_rebalancenode,   false,  false, "release a node by allowing it to takeover ips", "<pnn>"},
-       { "getdbseqnum",     control_getdbseqnum,       false,  false, "get the sequence number off a database", "<dbid>" },
-       { "nodestatus",      control_nodestatus,        true,   false,  "show and return node status" },
-       { "dbstatistics",    control_dbstatistics,      false,  false, "show db statistics", "<db>" },
-       { "reloadips",       control_reloadips,         false,  false, "reload the public addresses file on a node" },
-       { "ipiface",         control_ipiface,           true,   true,  "Find which interface an ip address is hsoted on", "<ip>" },
+       { "rebalancenode",     control_rebalancenode,   false,  false, "mark nodes as forced IP rebalancing targets", "[<pnn-list>]"},
+       { "getdbseqnum",     control_getdbseqnum,       false,  false, "get the sequence number off a database", "<dbname|dbid>" },
+       { "nodestatus",      control_nodestatus,        true,   false,  "show and return node status", "[<pnn-list>]" },
+       { "dbstatistics",    control_dbstatistics,      false,  false, "show db statistics", "<dbname|dbid>" },
+       { "reloadips",       control_reloadips,         false,  false, "reload the public addresses file on specified nodes" , "[<pnn-list>]" },
+       { "ipiface",         control_ipiface,           false,  true,  "Find which interface an ip address is hosted on", "<ip>" },
 };
 
 /*
@@ -5854,7 +6482,8 @@ static void usage(void)
 "Usage: ctdb [options] <control>\n" \
 "Options:\n" \
 "   -n <node>          choose node number, or 'all' (defaults to local node)\n"
-"   -Y                 generate machinereadable output\n"
+"   -Y                 generate machine readable output\n"
+"   -x <char>          specify delimiter for machine readable output\n"
 "   -v                 generate verbose output\n"
 "   -t <timelimit>     set timelimit for control in seconds (default %u)\n", options.timelimit);
        printf("Controls:\n");
@@ -5881,12 +6510,15 @@ int main(int argc, const char *argv[])
 {
        struct ctdb_context *ctdb;
        char *nodestring = NULL;
+       int machineparsable = 0;
        struct poptOption popt_options[] = {
                POPT_AUTOHELP
                POPT_CTDB_CMDLINE
                { "timelimit", 't', POPT_ARG_INT, &options.timelimit, 0, "timelimit", "integer" },
                { "node",      'n', POPT_ARG_STRING, &nodestring, 0, "node", "integer|all" },
-               { "machinereadable", 'Y', POPT_ARG_NONE, &options.machinereadable, 0, "enable machinereadable output", NULL },
+               { "machinereadable", 'Y', POPT_ARG_NONE, &options.machinereadable, 0, "enable machine readable output", NULL },
+               { NULL, 'x', POPT_ARG_STRING, &options.machineseparator, 0, "specify separator for machine readable output", "char" },
+               { NULL, 'X', POPT_ARG_NONE, &machineparsable, 0, "enable machine parsable output with separator |", NULL },
                { "verbose",    'v', POPT_ARG_NONE, &options.verbose, 0, "enable verbose output", NULL },
                { "maxruntime", 'T', POPT_ARG_INT, &options.maxruntime, 0, "die if runtime exceeds this limit (in seconds)", "integer" },
                { "print-emptyrecords", 0, POPT_ARG_NONE, &options.printemptyrecords, 0, "print the empty records when dumping databases (catdb, cattdb, dumpdbbackup)", NULL },
@@ -5903,13 +6535,12 @@ int main(int argc, const char *argv[])
        poptContext pc;
        struct event_context *ev;
        const char *control;
-       const char *socket_name;
 
        setlinebuf(stdout);
        
        /* set some defaults */
        options.maxruntime = 0;
-       options.timelimit = 3;
+       options.timelimit = 10;
        options.pnn = CTDB_CURRENT_NODE;
 
        pc = poptGetContext(argv[0], argc, argv, popt_options, POPT_CONTEXT_KEEP_FIRST);
@@ -5945,11 +6576,31 @@ int main(int argc, const char *argv[])
                }
        }
 
+       if (machineparsable) {
+               options.machineseparator = "|";
+       }
+       if (options.machineseparator != NULL) {
+               if (strlen(options.machineseparator) != 1) {
+                       printf("Invalid separator \"%s\" - "
+                              "must be single character\n",
+                              options.machineseparator);
+                       exit(1);
+               }
+
+               /* -x implies -Y */
+               options.machinereadable = true;
+       } else if (options.machinereadable) {
+               options.machineseparator = ":";
+       }
+
        signal(SIGALRM, ctdb_alarm);
        alarm(options.maxruntime);
 
        control = extra_argv[0];
 
+       /* Default value for CTDB_BASE - don't override */
+       setenv("CTDB_BASE", ETCDIR "/ctdb", 0);
+
        ev = event_context_init(NULL);
        if (!ev) {
                DEBUG(DEBUG_ERR, ("Failed to initialize event system\n"));
@@ -5972,7 +6623,6 @@ int main(int argc, const char *argv[])
                        DEBUG(DEBUG_ERR, ("Can't specify node(s) with \"ctdb %s\"\n", control));
                        exit(1);
                }
-               close(2);
                return ctdb_commands[i].fn(NULL, extra_argc-1, extra_argv+1);
        }
 
@@ -5980,21 +6630,19 @@ int main(int argc, const char *argv[])
        ctdb = ctdb_cmdline_client(ev, TIMELIMIT());
 
        if (ctdb == NULL) {
+               uint32_t pnn;
                DEBUG(DEBUG_ERR, ("Failed to init ctdb\n"));
-               exit(1);
-       }
 
-       /* initialize a libctdb connection as well */
-       socket_name = ctdb_get_socketname(ctdb);
-       ctdb_connection = ctdb_connect(socket_name,
-                                      ctdb_log_file, stderr);
-       if (ctdb_connection == NULL) {
-               DEBUG(DEBUG_ERR, ("Failed to connect to daemon from libctdb\n"));
+               pnn = find_node_xpnn();
+               if (pnn == -1) {
+                       DEBUG(DEBUG_ERR,
+                             ("Is this node part of a CTDB cluster?\n"));
+               }
                exit(1);
-       }                               
+       }
 
        /* setup the node number(s) to contact */
-       if (!parse_nodestring(ctdb, nodestring, CTDB_CURRENT_NODE, false,
+       if (!parse_nodestring(ctdb, ctdb, nodestring, CTDB_CURRENT_NODE, false,
                              &options.nodes, &options.pnn)) {
                usage();
        }
@@ -6017,7 +6665,6 @@ int main(int argc, const char *argv[])
                ret = ctdb_commands[i].fn(ctdb, extra_argc-1, extra_argv+1);
        }
 
-       ctdb_disconnect(ctdb_connection);
        talloc_free(ctdb);
        talloc_free(ev);
        (void)poptFreeContext(pc);