ctdbd: Add new runstate CTDB_RUNSTATE_FIRST_RECOVERY
[obnox/ctdb.git] / tools / ctdb.c
index 3f112072732f21f73518da7ba334959d241550c1..48ca93ba0eecf4a2e8e00fd8c30a3a9a371c4486 100644 (file)
 */
 
 #include "includes.h"
-#include "lib/tevent/tevent.h"
 #include "system/time.h"
 #include "system/filesys.h"
 #include "system/network.h"
 #include "system/locale.h"
 #include "popt.h"
 #include "cmdline.h"
+#include "../include/ctdb_version.h"
 #include "../include/ctdb.h"
 #include "../include/ctdb_client.h"
 #include "../include/ctdb_private.h"
@@ -57,15 +57,11 @@ static struct {
 #define TIMELIMIT() timeval_current_ofs(options.timelimit, 0)
 #define LONGTIMELIMIT() timeval_current_ofs(options.timelimit*10, 0)
 
-#ifdef CTDB_VERS
 static int control_version(struct ctdb_context *ctdb, int argc, const char **argv)
 {
-#define STR(x) #x
-#define XSTR(x) STR(x)
-       printf("CTDB version: %s\n", XSTR(CTDB_VERS));
+       printf("CTDB version: %s\n", CTDB_VERSION_STRING);
        return 0;
 }
-#endif
 
 #define CTDB_NOMEM_ABORT(p) do { if (!(p)) {                           \
                DEBUG(DEBUG_ALERT,("ctdb fatal error: %s\n",            \
@@ -245,30 +241,57 @@ static bool parse_nodestring(struct ctdb_context *ctdb,
 /*
  check if a database exists
 */
-static int db_exists(struct ctdb_context *ctdb, const char *db_name, bool *persistent)
+static bool db_exists(struct ctdb_context *ctdb, const char *dbarg, uint32_t *dbid, uint8_t *flags)
 {
        int i, ret;
        struct ctdb_dbid_map *dbmap=NULL;
+       bool dbid_given = false, found = false;
+       uint32_t id;
+       TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
 
-       ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &dbmap);
+       ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &dbmap);
        if (ret != 0) {
                DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
-               return -1;
+               goto fail;
        }
 
-       for(i=0;i<dbmap->num;i++){
-               const char *name;
+       if (strncmp(dbarg, "0x", 2) == 0) {
+               id = strtoul(dbarg, NULL, 0);
+               dbid_given = true;
+       }
 
-               ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &name);
-               if (!strcmp(name, db_name)) {
-                       if (persistent) {
-                               *persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
+       for(i=0; i<dbmap->num; i++) {
+               if (dbid_given) {
+                       if (id == dbmap->dbs[i].dbid) {
+                               found = true;
+                               break;
+                       }
+               } else {
+                       const char *name;
+                       ret = ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, tmp_ctx, &name);
+                       if (ret != 0) {
+                               DEBUG(DEBUG_ERR, ("Unable to get dbname from dbid %u\n", dbmap->dbs[i].dbid));
+                               goto fail;
+                       }
+
+                       if (strcmp(name, dbarg) == 0) {
+                               id = dbmap->dbs[i].dbid;
+                               found = true;
+                               break;
                        }
-                       return 0;
                }
        }
 
-       return -1;
+       if (found) {
+               if (dbid) *dbid = id;
+               if (flags) *flags = dbmap->dbs[i].flags;
+       } else {
+               DEBUG(DEBUG_ERR,("No database matching '%s' found\n", dbarg));
+       }
+
+fail:
+       talloc_free(tmp_ctx);
+       return found;
 }
 
 /*
@@ -335,10 +358,12 @@ static void show_statistics(struct ctdb_statistics *s, int show_header)
                STATISTICS_FIELD(timeouts.call),
                STATISTICS_FIELD(timeouts.control),
                STATISTICS_FIELD(timeouts.traverse),
+               STATISTICS_FIELD(locks.num_calls),
+               STATISTICS_FIELD(locks.num_current),
+               STATISTICS_FIELD(locks.num_pending),
+               STATISTICS_FIELD(locks.num_failed),
                STATISTICS_FIELD(total_calls),
                STATISTICS_FIELD(pending_calls),
-               STATISTICS_FIELD(lockwait_calls),
-               STATISTICS_FIELD(pending_lockwait_calls),
                STATISTICS_FIELD(childwrite_calls),
                STATISTICS_FIELD(pending_childwrite_calls),
                STATISTICS_FIELD(memory_used),
@@ -411,11 +436,6 @@ static void show_statistics(struct ctdb_statistics *s, int show_header)
                printf("%.6f:", s->call_latency.num?s->call_latency.total/s->call_latency.num:0.0);
                printf("%.6f:", s->call_latency.max);
 
-               printf("%d:", s->lockwait_latency.num);
-               printf("%.6f:", s->lockwait_latency.min);
-               printf("%.6f:", s->lockwait_latency.num?s->lockwait_latency.total/s->lockwait_latency.num:0.0);
-               printf("%.6f:", s->lockwait_latency.max);
-
                printf("%d:", s->childwrite_latency.num);
                printf("%.6f:", s->childwrite_latency.min);
                printf("%.6f:", s->childwrite_latency.num?s->childwrite_latency.total/s->childwrite_latency.num:0.0);
@@ -442,17 +462,23 @@ static void show_statistics(struct ctdb_statistics *s, int show_header)
                               preflen?0:4, "",
                               *(uint32_t *)(fields[i].offset+(uint8_t *)s));
                }
-               printf("Max hop count buckets:");
-               for (i=0;i<MAX_HOP_COUNT_BUCKETS;i++) {
+               printf(" hop_count_buckets:");
+               for (i=0;i<MAX_COUNT_BUCKETS;i++) {
                        printf(" %d", s->hop_count_bucket[i]);
                }
                printf("\n");
-               printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n", "reclock_ctdbd       MIN/AVG/MAX", s->reclock.ctdbd.min, s->reclock.ctdbd.num?s->reclock.ctdbd.total/s->reclock.ctdbd.num:0.0, s->reclock.ctdbd.max, s->reclock.ctdbd.num);
+               printf(" lock_buckets:");
+               for (i=0; i<MAX_COUNT_BUCKETS; i++) {
+                       printf(" %d", s->locks.buckets[i]);
+               }
+               printf("\n");
+               printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n", "locks_latency      MIN/AVG/MAX", s->locks.latency.min, s->locks.latency.num?s->locks.latency.total/s->locks.latency.num:0.0, s->locks.latency.max, s->locks.latency.num);
+
+               printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n", "reclock_ctdbd      MIN/AVG/MAX", s->reclock.ctdbd.min, s->reclock.ctdbd.num?s->reclock.ctdbd.total/s->reclock.ctdbd.num:0.0, s->reclock.ctdbd.max, s->reclock.ctdbd.num);
 
                printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n", "reclock_recd       MIN/AVG/MAX", s->reclock.recd.min, s->reclock.recd.num?s->reclock.recd.total/s->reclock.recd.num:0.0, s->reclock.recd.max, s->reclock.recd.num);
 
                printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n", "call_latency       MIN/AVG/MAX", s->call_latency.min, s->call_latency.num?s->call_latency.total/s->call_latency.num:0.0, s->call_latency.max, s->call_latency.num);
-               printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n", "lockwait_latency   MIN/AVG/MAX", s->lockwait_latency.min, s->lockwait_latency.num?s->lockwait_latency.total/s->lockwait_latency.num:0.0, s->lockwait_latency.max, s->lockwait_latency.num);
                printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n", "childwrite_latency MIN/AVG/MAX", s->childwrite_latency.min, s->childwrite_latency.num?s->childwrite_latency.total/s->childwrite_latency.num:0.0, s->childwrite_latency.max, s->childwrite_latency.num);
        }
 
@@ -493,8 +519,6 @@ static int control_statistics_all(struct ctdb_context *ctdb)
                        MAX(statistics.max_hop_count, s1.max_hop_count);
                statistics.call_latency.max = 
                        MAX(statistics.call_latency.max, s1.call_latency.max);
-               statistics.lockwait_latency.max = 
-                       MAX(statistics.lockwait_latency.max, s1.lockwait_latency.max);
        }
        talloc_free(nodes);
        printf("Gathered statistics for %u nodes\n", num_nodes);
@@ -577,46 +601,62 @@ static int control_stats(struct ctdb_context *ctdb, int argc, const char **argv)
 static int control_dbstatistics(struct ctdb_context *ctdb, int argc, const char **argv)
 {
        TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
-       struct ctdb_db_statistics *dbstatistics;
-       struct ctdb_dbid_map *dbmap=NULL;
-       int i, ret;
+       struct ctdb_db_statistics *dbstat;
+       int i;
+       uint32_t db_id;
 
        if (argc < 1) {
                usage();
        }
 
-       ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &dbmap);
-       if (ret != 0) {
-               DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
-               return ret;
-       }
-       for(i=0;i<dbmap->num;i++){
-               const char *name;
-
-               ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, tmp_ctx, &name);
-               if(!strcmp(argv[0], name)){
-                       talloc_free(discard_const(name));
-                       break;
-               }
-               talloc_free(discard_const(name));
-       }
-       if (i == dbmap->num) {
-               DEBUG(DEBUG_ERR,("No database with name '%s' found\n", argv[0]));
-               talloc_free(tmp_ctx);
+       if (!db_exists(ctdb, argv[0], &db_id, NULL)) {
                return -1;
        }
 
-       if (!ctdb_getdbstat(ctdb_connection, options.pnn, dbmap->dbs[i].dbid, &dbstatistics)) {
+       if (!ctdb_getdbstat(ctdb_connection, options.pnn, db_id, &dbstat)) {
                DEBUG(DEBUG_ERR,("Failed to read db statistics from node\n"));
                talloc_free(tmp_ctx);
                return -1;
        }
 
        printf("DB Statistics:\n");
-       printf("RO Delegations: %d\n", dbstatistics->db_ro_delegations);
-       printf("RO Revokes:     %d\n", dbstatistics->db_ro_revokes);
+       printf(" %*s%-22s%*s%10u\n", 0, "", "ro_delegations", 4, "",
+               dbstat->db_ro_delegations);
+       printf(" %*s%-22s%*s%10u\n", 0, "", "ro_revokes", 4, "",
+               dbstat->db_ro_delegations);
+       printf(" %s\n", "locks");
+       printf(" %*s%-22s%*s%10u\n", 4, "", "total", 0, "",
+               dbstat->locks.num_calls);
+       printf(" %*s%-22s%*s%10u\n", 4, "", "failed", 0, "",
+               dbstat->locks.num_failed);
+       printf(" %*s%-22s%*s%10u\n", 4, "", "current", 0, "",
+               dbstat->locks.num_current);
+       printf(" %*s%-22s%*s%10u\n", 4, "", "pending", 0, "",
+               dbstat->locks.num_pending);
+       printf(" %-30s     %.6f/%.6f/%.6f sec out of %d\n",
+               "    latency_ctdbd  MIN/AVG/MAX",
+               dbstat->locks.latency.min,
+               (dbstat->locks.latency.num ?
+                dbstat->locks.latency.total /dbstat->locks.latency.num :
+                0.0),
+               dbstat->locks.latency.max,
+               dbstat->locks.latency.num);
+       printf(" %s", "    buckets:");
+       for (i=0; i<MAX_COUNT_BUCKETS; i++) {
+               printf(" %d", dbstat->hop_count_bucket[i]);
+       }
+       printf("\n");
+       printf("Num Hot Keys:     %d\n", dbstat->num_hot_keys);
+       for (i = 0; i < dbstat->num_hot_keys; i++) {
+               int j;
+               printf("Count:%d Key:", dbstat->hot_keys[i].count);
+               for (j = 0; j < dbstat->hot_keys[i].key.dsize; j++) {
+                       printf("%02x", dbstat->hot_keys[i].key.dptr[j]&0xff);
+               }
+               printf("\n");
+       }
 
-       ctdb_free_dbstat(dbstatistics);
+       ctdb_free_dbstat(dbstat);
        return 0;
 }
 
@@ -823,6 +863,12 @@ static bool is_partially_online(struct ctdb_node_and_flags *node)
        return ret;
 }
 
+static void control_status_header_machine(void)
+{
+       printf(":Node:IP:Disconnected:Banned:Disabled:Unhealthy:Stopped"
+              ":Inactive:PartiallyOnline:ThisNode:\n");
+}
+
 static int control_status_1_machine(int mypnn, struct ctdb_node_and_flags *node)
 {
        printf(":%d:%s:%d:%d:%d:%d:%d:%d:%d:%c:\n", node->pnn,
@@ -869,8 +915,7 @@ static int control_status(struct ctdb_context *ctdb, int argc, const char **argv
        }
 
        if (options.machinereadable) {
-               printf(":Node:IP:Disconnected:Banned:Disabled:Unhealthy:Stopped"
-                      ":Inactive:PartiallyOnline:ThisNode:\n");
+               control_status_header_machine();
                for (i=0;i<nodemap->num;i++) {
                        if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
                                continue;
@@ -936,8 +981,7 @@ static int control_nodestatus(struct ctdb_context *ctdb, int argc, const char **
        }
 
        if (options.machinereadable) {
-               printf(":Node:IP:Disconnected:Banned:Disabled:Unhealthy:Stopped"
-                      ":Inactive:PartiallyOnline:ThisNode:\n");
+               control_status_header_machine();
        } else if (pnn_mode == CTDB_BROADCAST_ALL) {
                printf("Number of nodes:%d\n", (int) talloc_array_length(nodes));
        }
@@ -971,23 +1015,62 @@ struct natgw_node {
        const char *addr;
 };
 
+static int find_natgw(struct ctdb_context *ctdb,
+                      struct ctdb_node_map *nodemap, uint32_t flags,
+                      uint32_t *pnn, const char **ip)
+{
+       int i;
+       uint32_t capabilities;
+
+       for (i=0;i<nodemap->num;i++) {
+               if (!(nodemap->nodes[i].flags & flags)) {
+                       if (!ctdb_getcapabilities(ctdb_connection, nodemap->nodes[i].pnn, &capabilities)) {
+                               DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", nodemap->nodes[i].pnn));
+                               return -1;
+                       }
+                       if (!(capabilities&CTDB_CAP_NATGW)) {
+                               continue;
+                       }
+                       *pnn = nodemap->nodes[i].pnn;
+                       *ip = ctdb_addr_to_str(&nodemap->nodes[i].addr);
+                       return 0;
+               }
+       }
+
+       return 2; /* matches ENOENT */
+}
+
 /*
   display the list of nodes belonging to this natgw configuration
  */
 static int control_natgwlist(struct ctdb_context *ctdb, int argc, const char **argv)
 {
        int i, ret;
-       uint32_t capabilities;
        const char *natgw_list;
        int nlines;
        char **lines;
        struct natgw_node *natgw_nodes = NULL;
        struct natgw_node *natgw_node;
        struct ctdb_node_map *nodemap=NULL;
+       uint32_t mypnn, pnn;
+       const char *ip;
 
+       /* When we have some nodes that could be the NATGW, make a
+        * series of attempts to find the first node that doesn't have
+        * certain status flags set.
+        */
+       uint32_t exclude_flags[] = {
+               /* Look for a nice healthy node */
+               NODE_FLAGS_DISCONNECTED|NODE_FLAGS_STOPPED|NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_UNHEALTHY,
+               /* If not found, an UNHEALTHY/BANNED node will do */
+               NODE_FLAGS_DISCONNECTED|NODE_FLAGS_STOPPED|NODE_FLAGS_DELETED,
+               /* If not found, a STOPPED node will do */
+               NODE_FLAGS_DISCONNECTED|NODE_FLAGS_DELETED,
+               0,
+       };
 
        /* read the natgw nodes file into a linked list */
-       natgw_list = getenv("NATGW_NODES");
+       natgw_list = getenv("CTDB_NATGW_NODES");
        if (natgw_list == NULL) {
                natgw_list = "/etc/ctdb/natgw_nodes";
        }
@@ -996,9 +1079,6 @@ static int control_natgwlist(struct ctdb_context *ctdb, int argc, const char **a
                ctdb_set_error(ctdb, "Failed to load natgw node list '%s'\n", natgw_list);
                return -1;
        }
-       while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) {
-               nlines--;
-       }
        for (i=0;i<nlines;i++) {
                char *node;
 
@@ -1020,12 +1100,14 @@ static int control_natgwlist(struct ctdb_context *ctdb, int argc, const char **a
                natgw_nodes = natgw_node;
        }
 
-       ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
-       if (ret != 0) {
+       if (!ctdb_getnodemap(ctdb_connection, CTDB_CURRENT_NODE, &nodemap)) {
                DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node.\n"));
-               return ret;
+               return -1;
        }
 
+       /* Trim the nodemap so it only includes connected nodes in the
+        * current natgw group.
+        */
        i=0;
        while(i<nodemap->num) {
                for(natgw_node=natgw_nodes;natgw_node;natgw_node=natgw_node->next) {
@@ -1049,79 +1131,56 @@ static int control_natgwlist(struct ctdb_context *ctdb, int argc, const char **a
                }
 
                i++;
-       }               
+       }
 
-       /* pick a node to be natgwmaster
-        * we dont allow STOPPED, DELETED, BANNED or UNHEALTHY nodes to become the natgwmaster
-        */
-       for(i=0;i<nodemap->num;i++){
-               if (!(nodemap->nodes[i].flags & (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_STOPPED|NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_UNHEALTHY))) {
-                       ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, &capabilities);
-                       if (ret != 0) {
-                               DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", nodemap->nodes[i].pnn));
-                               return ret;
-                       }
-                       if (!(capabilities&CTDB_CAP_NATGW)) {
-                               continue;
-                       }
-                       printf("%d %s\n", nodemap->nodes[i].pnn,ctdb_addr_to_str(&nodemap->nodes[i].addr));
-                       break;
+       ret = 2; /* matches ENOENT */
+       pnn = -1;
+       ip = "0.0.0.0";
+       for (i = 0; exclude_flags[i] != 0; i++) {
+               ret = find_natgw(ctdb, nodemap,
+                                exclude_flags[i],
+                                &pnn, &ip);
+               if (ret == -1) {
+                       goto done;
                }
-       }
-       /* we couldnt find any healthy node, try unhealthy ones */
-       if (i == nodemap->num) {
-               for(i=0;i<nodemap->num;i++){
-                       if (!(nodemap->nodes[i].flags & (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_STOPPED|NODE_FLAGS_DELETED))) {
-                               ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, &capabilities);
-                               if (ret != 0) {
-                                       DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", nodemap->nodes[i].pnn));
-                                       return ret;
-                               }
-                               if (!(capabilities&CTDB_CAP_NATGW)) {
-                                       continue;
-                               }
-                               printf("%d %s\n", nodemap->nodes[i].pnn,ctdb_addr_to_str(&nodemap->nodes[i].addr));
-                               break;
-                       }
+               if (ret == 0) {
+                       break;
                }
        }
-       /* unless all nodes are STOPPED, when we pick one anyway */
-       if (i == nodemap->num) {
-               for(i=0;i<nodemap->num;i++){
-                       if (!(nodemap->nodes[i].flags & (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_DELETED))) {
-                               ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, &capabilities);
-                               if (ret != 0) {
-                                       DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", nodemap->nodes[i].pnn));
-                                       return ret;
-                               }
-                               if (!(capabilities&CTDB_CAP_NATGW)) {
-                                       continue;
-                               }
-                               printf("%d %s\n", nodemap->nodes[i].pnn, ctdb_addr_to_str(&nodemap->nodes[i].addr));
-                               break;
-                       }
-               }
-               /* or if we still can not find any */
-               if (i == nodemap->num) {
-                       printf("-1 0.0.0.0\n");
-                       ret = 2; /* matches ENOENT */
-               }
+
+       if (options.machinereadable) {
+               printf(":Node:IP:\n");
+               printf(":%d:%s:\n", pnn, ip);
+       } else {
+               printf("%d %s\n", pnn, ip);
        }
 
        /* print the pruned list of nodes belonging to this natgw list */
+       if (!ctdb_getpnn(ctdb_connection, options.pnn, &mypnn)) {
+               DEBUG(DEBUG_NOTICE, ("Unable to get PNN from node %u\n", options.pnn));
+               /* This is actually harmless and will only result in
+                * the "this node" indication being missing
+                */
+               mypnn = -1;
+       }
+       if (options.machinereadable) {
+               control_status_header_machine();
+       } else {
+               printf("Number of nodes:%d\n", nodemap->num);
+       }
        for(i=0;i<nodemap->num;i++){
                if (nodemap->nodes[i].flags & NODE_FLAGS_DELETED) {
                        continue;
                }
-               printf(":%d:%s:%d:%d:%d:%d:%d\n", nodemap->nodes[i].pnn,
-                       ctdb_addr_to_str(&nodemap->nodes[i].addr),
-                      !!(nodemap->nodes[i].flags&NODE_FLAGS_DISCONNECTED),
-                      !!(nodemap->nodes[i].flags&NODE_FLAGS_BANNED),
-                      !!(nodemap->nodes[i].flags&NODE_FLAGS_PERMANENTLY_DISABLED),
-                      !!(nodemap->nodes[i].flags&NODE_FLAGS_UNHEALTHY),
-                      !!(nodemap->nodes[i].flags&NODE_FLAGS_STOPPED));
+               if (options.machinereadable) {
+                       control_status_1_machine(mypnn, &(nodemap->nodes[i]));
+               } else {
+                       control_status_1_human(mypnn, &(nodemap->nodes[i]));
+               }
        }
 
+done:
+       ctdb_free_nodemap(nodemap);
        return ret;
 }
 
@@ -1535,13 +1594,88 @@ static int move_ip(struct ctdb_context *ctdb, ctdb_sock_addr *addr, uint32_t pnn
        return 0;
 }
 
+
+/* 
+ * scans all other nodes and returns a pnn for another node that can host this 
+ * ip address or -1
+ */
+static int
+find_other_host_for_public_ip(struct ctdb_context *ctdb, ctdb_sock_addr *addr)
+{
+       TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
+       struct ctdb_all_public_ips *ips;
+       struct ctdb_node_map *nodemap=NULL;
+       int i, j, ret;
+
+       ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
+               talloc_free(tmp_ctx);
+               return ret;
+       }
+
+       for(i=0;i<nodemap->num;i++){
+               if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
+                       continue;
+               }
+               if (nodemap->nodes[i].pnn == options.pnn) {
+                       continue;
+               }
+
+               /* read the public ip list from this node */
+               ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips);
+               if (ret != 0) {
+                       DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", nodemap->nodes[i].pnn));
+                       return -1;
+               }
+
+               for (j=0;j<ips->num;j++) {
+                       if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
+                               talloc_free(tmp_ctx);
+                               return nodemap->nodes[i].pnn;
+                       }
+               }
+               talloc_free(ips);
+       }
+
+       talloc_free(tmp_ctx);
+       return -1;
+}
+
+/* If pnn is -1 then try to find a node to move IP to... */
+static bool try_moveip(struct ctdb_context *ctdb, ctdb_sock_addr *addr, uint32_t pnn)
+{
+       bool pnn_specified = (pnn == -1 ? false : true);
+       int retries = 0;
+
+       while (retries < 5) {
+               if (!pnn_specified) {
+                       pnn = find_other_host_for_public_ip(ctdb, addr);
+                       if (pnn == -1) {
+                               return false;
+                       }
+                       DEBUG(DEBUG_NOTICE,
+                             ("Trying to move public IP to node %u\n", pnn));
+               }
+
+               if (move_ip(ctdb, addr, pnn) == 0) {
+                       return true;
+               }
+
+               sleep(3);
+               retries++;
+       }
+
+       return false;
+}
+
+
 /*
   move/failover an ip address to a specific node
  */
 static int control_moveip(struct ctdb_context *ctdb, int argc, const char **argv)
 {
        uint32_t pnn;
-       int ret, retries = 0;
        ctdb_sock_addr addr;
 
        if (argc < 2) {
@@ -1560,16 +1694,8 @@ static int control_moveip(struct ctdb_context *ctdb, int argc, const char **argv
                return -1;
        }
 
-       do {
-               ret = move_ip(ctdb, &addr, pnn);
-               if (ret != 0) {
-                       DEBUG(DEBUG_ERR,("Failed to move ip to node %d. Wait 3 second and try again.\n", pnn));
-                       sleep(3);
-                       retries++;
-               }
-       } while (retries < 5 && ret != 0);
-       if (ret != 0) {
-               DEBUG(DEBUG_ERR,("Failed to move ip to node %d. Giving up.\n", pnn));
+       if (!try_moveip(ctdb, &addr, pnn)) {
+               DEBUG(DEBUG_ERR,("Failed to move IP to node %d.\n", pnn));
                return -1;
        }
 
@@ -1614,6 +1740,80 @@ static int control_rebalancenode(struct ctdb_context *ctdb, int argc, const char
 }
 
 
+static int rebalance_ip(struct ctdb_context *ctdb, ctdb_sock_addr *addr)
+{
+       struct ctdb_public_ip ip;
+       int ret;
+       uint32_t *nodes;
+       uint32_t disable_time;
+       TDB_DATA data;
+       struct ctdb_node_map *nodemap=NULL;
+       TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
+
+       disable_time = 30;
+       data.dptr  = (uint8_t*)&disable_time;
+       data.dsize = sizeof(disable_time);
+       ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_DISABLE_IP_CHECK, data);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR,("Failed to send message to disable ipcheck\n"));
+               return -1;
+       }
+
+       ip.pnn  = -1;
+       ip.addr = *addr;
+
+       data.dptr  = (uint8_t *)&ip;
+       data.dsize = sizeof(ip);
+
+       ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &nodemap);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
+               talloc_free(tmp_ctx);
+               return ret;
+       }
+
+               nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
+       ret = ctdb_client_async_control(ctdb, CTDB_CONTROL_RELEASE_IP,
+                                       nodes, 0,
+                                       LONGTIMELIMIT(),
+                                       false, data,
+                                       NULL, NULL,
+                                       NULL);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR,("Failed to release IP on nodes\n"));
+               talloc_free(tmp_ctx);
+               return -1;
+       }
+
+       talloc_free(tmp_ctx);
+       return 0;
+}
+
+/*
+  release an ip form all nodes and have it re-assigned by recd
+ */
+static int control_rebalanceip(struct ctdb_context *ctdb, int argc, const char **argv)
+{
+       ctdb_sock_addr addr;
+
+       if (argc < 1) {
+               usage();
+               return -1;
+       }
+
+       if (parse_ip(argv[0], NULL, 0, &addr) == 0) {
+               DEBUG(DEBUG_ERR,("Wrongly formed ip address '%s'\n", argv[0]));
+               return -1;
+       }
+
+       if (rebalance_ip(ctdb, &addr) != 0) {
+               DEBUG(DEBUG_ERR,("Error when trying to reassign ip\n"));
+               return -1;
+       }
+
+       return 0;
+}
+
 static int getips_store_callback(void *param, void *data)
 {
        struct ctdb_public_ip *node_ip = (struct ctdb_public_ip *)data;
@@ -1728,54 +1928,7 @@ control_get_all_public_ips(struct ctdb_context *ctdb, TALLOC_CTX *tmp_ctx, struc
 }
 
 
-/* 
- * scans all other nodes and returns a pnn for another node that can host this 
- * ip address or -1
- */
-static int
-find_other_host_for_public_ip(struct ctdb_context *ctdb, ctdb_sock_addr *addr)
-{
-       TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
-       struct ctdb_all_public_ips *ips;
-       struct ctdb_node_map *nodemap=NULL;
-       int i, j, ret;
-
-       ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
-       if (ret != 0) {
-               DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
-               talloc_free(tmp_ctx);
-               return ret;
-       }
-
-       for(i=0;i<nodemap->num;i++){
-               if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
-                       continue;
-               }
-               if (nodemap->nodes[i].pnn == options.pnn) {
-                       continue;
-               }
-
-               /* read the public ip list from this node */
-               ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips);
-               if (ret != 0) {
-                       DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", nodemap->nodes[i].pnn));
-                       return -1;
-               }
-
-               for (j=0;j<ips->num;j++) {
-                       if (ctdb_same_ip(addr, &ips->ips[j].addr)) {
-                               talloc_free(tmp_ctx);
-                               return nodemap->nodes[i].pnn;
-                       }
-               }
-               talloc_free(ips);
-       }
-
-       talloc_free(tmp_ctx);
-       return -1;
-}
-
-static uint32_t ipreallocate_finished;
+static bool ipreallocate_finished;
 
 /*
   handler for receiving the response to ipreallocate
@@ -1783,7 +1936,7 @@ static uint32_t ipreallocate_finished;
 static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid, 
                             TDB_DATA data, void *private_data)
 {
-       ipreallocate_finished = 1;
+       ipreallocate_finished = true;
 }
 
 static void ctdb_every_second(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
@@ -1803,9 +1956,8 @@ static int control_ipreallocate(struct ctdb_context *ctdb, int argc, const char
        int i, ret;
        TDB_DATA data;
        struct takeover_run_reply rd;
-       uint32_t recmaster;
        struct ctdb_node_map *nodemap=NULL;
-       int retries=0;
+       int count;
        struct timeval tv = timeval_current();
 
        /* we need some events to trigger so we can timeout and restart
@@ -1813,99 +1965,59 @@ static int control_ipreallocate(struct ctdb_context *ctdb, int argc, const char
        */
        event_add_timed(ctdb->ev, ctdb, 
                                timeval_current_ofs(1, 0),
-                               ctdb_every_second, ctdb);
-
-       rd.pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
-       if (rd.pnn == -1) {
-               DEBUG(DEBUG_ERR, ("Failed to get pnn of local node\n"));
-               return -1;
-       }
-       rd.srvid = getpid();
-
-       /* register a message port for receiveing the reply so that we
-          can receive the reply
-       */
-       ctdb_client_set_message_handler(ctdb, rd.srvid, ip_reallocate_handler, NULL);
-
-       data.dptr = (uint8_t *)&rd;
-       data.dsize = sizeof(rd);
-
-again:
-       /* check that there are valid nodes available */
-       if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap) != 0) {
-               DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
-               return -1;
-       }
-       for (i=0; i<nodemap->num;i++) {
-               if ((nodemap->nodes[i].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) == 0) {
-                       break;
-               }
-       }
-       if (i==nodemap->num) {
-               DEBUG(DEBUG_ERR,("No recmaster available, no need to wait for cluster convergence\n"));
-               return 0;
-       }
-
+                               ctdb_every_second, ctdb);
 
-       if (!ctdb_getrecmaster(ctdb_connection, options.pnn, &recmaster)) {
-               DEBUG(DEBUG_ERR, ("Unable to get recmaster from node %u\n", options.pnn));
+       rd.pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
+       if (rd.pnn == -1) {
+               DEBUG(DEBUG_ERR, ("Failed to get pnn of local node\n"));
                return -1;
        }
+       rd.srvid = getpid();
 
-       /* verify the node exists */
-       if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), recmaster, ctdb, &nodemap) != 0) {
+       /* register a message port for receiveing the reply so that we
+          can receive the reply
+       */
+       ctdb_client_set_message_handler(ctdb, rd.srvid, ip_reallocate_handler, NULL);
+
+       data.dptr = (uint8_t *)&rd;
+       data.dsize = sizeof(rd);
+
+again:
+       /* get the number of nodes and node flags */
+       if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap) != 0) {
                DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
                return -1;
        }
 
-
-       /* check tha there are nodes available that can act as a recmaster */
-       for (i=0; i<nodemap->num; i++) {
-               if (nodemap->nodes[i].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
+       ipreallocate_finished = false;
+       count = 0;
+       for (i=0; i<nodemap->num;i++) {
+               if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
                        continue;
+               } else {
+                       /* Send to all active nodes. Only recmaster will reply. */
+                       ret = ctdb_client_send_message(ctdb, i, CTDB_SRVID_TAKEOVER_RUN, data);
+                       if (ret != 0) {
+                               DEBUG(DEBUG_ERR,("Failed to send ip takeover run request message to %u\n", options.pnn));
+                               return -1;
+                       }
+                       count++;
                }
-               break;
        }
-       if (i == nodemap->num) {
-               DEBUG(DEBUG_ERR,("No possible nodes to host addresses.\n"));
+       if (count == 0) {
+               DEBUG(DEBUG_ERR,("No recmaster available, no need to wait for cluster convergence\n"));
                return 0;
        }
 
-       /* verify the recovery master is not STOPPED, nor BANNED */
-       if (nodemap->nodes[recmaster].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
-               DEBUG(DEBUG_ERR,("No suitable recmaster found. Try again\n"));
-               retries++;
-               sleep(1);
-               goto again;
-       } 
-       
-       /* verify the recovery master is not STOPPED, nor BANNED */
-       if (nodemap->nodes[recmaster].flags & (NODE_FLAGS_DELETED|NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
-               DEBUG(DEBUG_ERR,("No suitable recmaster found. Try again\n"));
-               retries++;
-               sleep(1);
-               goto again;
-       } 
-
-       ipreallocate_finished = 0;
-       ret = ctdb_client_send_message(ctdb, recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
-       if (ret != 0) {
-               DEBUG(DEBUG_ERR,("Failed to send ip takeover run request message to %u\n", options.pnn));
-               return -1;
-       }
-
        tv = timeval_current();
        /* this loop will terminate when we have received the reply */
-       while (timeval_elapsed(&tv) < 5.0 && ipreallocate_finished == 0) {
+       while (timeval_elapsed(&tv) < 5.0 && !ipreallocate_finished) {
                event_loop_once(ctdb->ev);
        }
-       if (ipreallocate_finished == 1) {
-               return 0;
-       }
 
-       retries++;
-       sleep(1);
-       goto again;
+       if (!ipreallocate_finished) {
+               goto again;
+       }
 
        return 0;
 }
@@ -1990,6 +2102,27 @@ static int control_addip(struct ctdb_context *ctdb, int argc, const char **argv)
        return 0;
 }
 
+/*
+  add a public ip address to a node
+ */
+static int control_ipiface(struct ctdb_context *ctdb, int argc, const char **argv)
+{
+       ctdb_sock_addr addr;
+
+       if (argc != 1) {
+               usage();
+       }
+
+       if (!parse_ip(argv[0], NULL, 0, &addr)) {
+               printf("Badly formed ip : %s\n", argv[0]);
+               return -1;
+       }
+
+       printf("IP on interface %s\n", ctdb_sys_find_ifname(&addr));
+
+       return 0;
+}
+
 static int control_delip(struct ctdb_context *ctdb, int argc, const char **argv);
 
 static int control_delip_all(struct ctdb_context *ctdb, int argc, const char **argv, ctdb_sock_addr *addr)
@@ -2066,7 +2199,6 @@ static int control_delip_all(struct ctdb_context *ctdb, int argc, const char **a
 static int control_delip(struct ctdb_context *ctdb, int argc, const char **argv)
 {
        int i, ret;
-       int retries = 0;
        ctdb_sock_addr addr;
        struct ctdb_control_ip_iface pub;
        TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
@@ -2110,22 +2242,12 @@ static int control_delip(struct ctdb_context *ctdb, int argc, const char **argv)
                return -1;
        }
 
+       /* This is an optimisation.  If this node is hosting the IP
+        * then try to move it somewhere else without invoking a full
+        * takeover run.  We don't care if this doesn't work!
+        */
        if (ips->ips[i].pnn == options.pnn) {
-               ret = find_other_host_for_public_ip(ctdb, &addr);
-               if (ret != -1) {
-                       do {
-                               ret = move_ip(ctdb, &addr, ret);
-                               if (ret != 0) {
-                                       DEBUG(DEBUG_ERR,("Failed to move ip to node %d. Wait 3 seconds and try again.\n", options.pnn));
-                                       sleep(3);
-                                       retries++;
-                               }
-                       } while (retries < 5 && ret != 0);
-                       if (ret != 0) {
-                               DEBUG(DEBUG_ERR,("Failed to move ip to node %d. Giving up.\n", options.pnn));
-                               return -1;
-                       }
-               }
+               (void) try_moveip(ctdb, &addr, -1);
        }
 
        ret = ctdb_ctrl_del_public_ip(ctdb, TIMELIMIT(), options.pnn, &pub);
@@ -3041,12 +3163,10 @@ static int control_getmonmode(struct ctdb_context *ctdb, int argc, const char **
 static int control_getcapabilities(struct ctdb_context *ctdb, int argc, const char **argv)
 {
        uint32_t capabilities;
-       int ret;
 
-       ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), options.pnn, &capabilities);
-       if (ret != 0) {
+       if (!ctdb_getcapabilities(ctdb_connection, options.pnn, &capabilities)) {
                DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", options.pnn));
-               return ret;
+               return -1;
        }
        
        if (!options.machinereadable){
@@ -3075,15 +3195,16 @@ static int control_lvs(struct ctdb_context *ctdb, int argc, const char **argv)
        int i, ret;
        int healthy_count = 0;
 
-       ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
-       if (ret != 0) {
+       if (!ctdb_getnodemap(ctdb_connection, options.pnn, &nodemap)) {
                DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
-               return ret;
+               return -1;
        }
 
        capabilities = talloc_array(ctdb, uint32_t, nodemap->num);
        CTDB_NO_MEMORY(ctdb, capabilities);
        
+       ret = 0;
+
        /* collect capabilities for all connected nodes */
        for (i=0; i<nodemap->num; i++) {
                if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
@@ -3093,10 +3214,10 @@ static int control_lvs(struct ctdb_context *ctdb, int argc, const char **argv)
                        continue;
                }
        
-               ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), i, &capabilities[i]);
-               if (ret != 0) {
+               if (!ctdb_getcapabilities(ctdb_connection, i, &capabilities[i])) {
                        DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", i));
-                       return ret;
+                       ret = -1;
+                       goto done;
                }
 
                if (!(capabilities[i] & CTDB_CAP_LVS)) {
@@ -3130,7 +3251,9 @@ static int control_lvs(struct ctdb_context *ctdb, int argc, const char **argv)
                        ctdb_addr_to_str(&nodemap->nodes[i].addr));
        }
 
-       return 0;
+done:
+       ctdb_free_nodemap(nodemap);
+       return ret;
 }
 
 /*
@@ -3143,14 +3266,15 @@ static int control_lvsmaster(struct ctdb_context *ctdb, int argc, const char **a
        int i, ret;
        int healthy_count = 0;
 
-       ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), options.pnn, ctdb, &nodemap);
-       if (ret != 0) {
+       if (!ctdb_getnodemap(ctdb_connection, options.pnn, &nodemap)) {
                DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
-               return ret;
+               return -1;
        }
 
        capabilities = talloc_array(ctdb, uint32_t, nodemap->num);
        CTDB_NO_MEMORY(ctdb, capabilities);
+
+       ret = -1;
        
        /* collect capabilities for all connected nodes */
        for (i=0; i<nodemap->num; i++) {
@@ -3161,10 +3285,10 @@ static int control_lvsmaster(struct ctdb_context *ctdb, int argc, const char **a
                        continue;
                }
        
-               ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), i, &capabilities[i]);
-               if (ret != 0) {
+               if (!ctdb_getcapabilities(ctdb_connection, i, &capabilities[i])) {
                        DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", i));
-                       return ret;
+                       ret = -1;
+                       goto done;
                }
 
                if (!(capabilities[i] & CTDB_CAP_LVS)) {
@@ -3199,11 +3323,14 @@ static int control_lvsmaster(struct ctdb_context *ctdb, int argc, const char **a
                } else {
                        printf("Node %d is LVS master\n", i);
                }
-               return 0;
+               ret = 0;
+               goto done;
        }
 
        printf("There is no LVS master\n");
-       return -1;
+done:
+       ctdb_free_nodemap(nodemap);
+       return ret;
 }
 
 /*
@@ -3250,8 +3377,8 @@ static int control_catdb(struct ctdb_context *ctdb, int argc, const char **argv)
        const char *db_name;
        struct ctdb_db_context *ctdb_db;
        int ret;
-       bool persistent;
        struct ctdb_dump_db_context c;
+       uint8_t flags;
 
        if (argc < 1) {
                usage();
@@ -3259,14 +3386,11 @@ static int control_catdb(struct ctdb_context *ctdb, int argc, const char **argv)
 
        db_name = argv[0];
 
-
-       if (db_exists(ctdb, db_name, &persistent)) {
-               DEBUG(DEBUG_ERR,("Database '%s' does not exist\n", db_name));
+       if (!db_exists(ctdb, db_name, NULL, &flags)) {
                return -1;
        }
 
-       ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, persistent, 0);
-
+       ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, flags & CTDB_DB_FLAGS_PERSISTENT, 0);
        if (ctdb_db == NULL) {
                DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
                return -1;
@@ -3336,7 +3460,7 @@ static int control_cattdb(struct ctdb_context *ctdb, int argc, const char **argv
        const char *db_name;
        struct ctdb_db_context *ctdb_db;
        struct cattdb_data d;
-       bool persistent;
+       uint8_t flags;
 
        if (argc < 1) {
                usage();
@@ -3344,14 +3468,11 @@ static int control_cattdb(struct ctdb_context *ctdb, int argc, const char **argv
 
        db_name = argv[0];
 
-
-       if (db_exists(ctdb, db_name, &persistent)) {
-               DEBUG(DEBUG_ERR,("Database '%s' does not exist\n", db_name));
+       if (!db_exists(ctdb, db_name, NULL, &flags)) {
                return -1;
        }
 
-       ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, false, 0);
-
+       ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, flags & CTDB_DB_FLAGS_PERSISTENT, 0);
        if (ctdb_db == NULL) {
                DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
                return -1;
@@ -3380,7 +3501,7 @@ static int control_readkey(struct ctdb_context *ctdb, int argc, const char **arg
        struct ctdb_record_handle *h;
        TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
        TDB_DATA key, data;
-       bool persistent;
+       uint8_t flags;
 
        if (argc < 2) {
                usage();
@@ -3388,13 +3509,11 @@ static int control_readkey(struct ctdb_context *ctdb, int argc, const char **arg
 
        db_name = argv[0];
 
-       if (db_exists(ctdb, db_name, &persistent)) {
-               DEBUG(DEBUG_ERR,("Database '%s' does not exist\n", db_name));
+       if (!db_exists(ctdb, db_name, NULL, &flags)) {
                return -1;
        }
 
-       ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, persistent, 0);
-
+       ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, flags & CTDB_DB_FLAGS_PERSISTENT, 0);
        if (ctdb_db == NULL) {
                DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
                return -1;
@@ -3428,7 +3547,7 @@ static int control_writekey(struct ctdb_context *ctdb, int argc, const char **ar
        struct ctdb_record_handle *h;
        TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
        TDB_DATA key, data;
-       bool persistent;
+       uint8_t flags;
 
        if (argc < 3) {
                usage();
@@ -3436,13 +3555,11 @@ static int control_writekey(struct ctdb_context *ctdb, int argc, const char **ar
 
        db_name = argv[0];
 
-       if (db_exists(ctdb, db_name, &persistent)) {
-               DEBUG(DEBUG_ERR,("Database '%s' does not exist\n", db_name));
+       if (!db_exists(ctdb, db_name, NULL, &flags)) {
                return -1;
        }
 
-       ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, persistent, 0);
-
+       ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, flags & CTDB_DB_FLAGS_PERSISTENT, 0);
        if (ctdb_db == NULL) {
                DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
                return -1;
@@ -3484,6 +3601,7 @@ static int control_pfetch(struct ctdb_context *ctdb, int argc, const char **argv
        TDB_DATA key, data;
        int fd, ret;
        bool persistent;
+       uint8_t flags;
 
        if (argc < 2) {
                talloc_free(tmp_ctx);
@@ -3492,13 +3610,12 @@ static int control_pfetch(struct ctdb_context *ctdb, int argc, const char **argv
 
        db_name = argv[0];
 
-
-       if (db_exists(ctdb, db_name, &persistent)) {
-               DEBUG(DEBUG_ERR,("Database '%s' does not exist\n", db_name));
+       if (!db_exists(ctdb, db_name, NULL, &flags)) {
                talloc_free(tmp_ctx);
                return -1;
        }
 
+       persistent = flags & CTDB_DB_FLAGS_PERSISTENT;
        if (!persistent) {
                DEBUG(DEBUG_ERR,("Database '%s' is not persistent\n", db_name));
                talloc_free(tmp_ctx);
@@ -3506,7 +3623,6 @@ static int control_pfetch(struct ctdb_context *ctdb, int argc, const char **argv
        }
 
        ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, persistent, 0);
-
        if (ctdb_db == NULL) {
                DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", db_name));
                talloc_free(tmp_ctx);
@@ -3782,6 +3898,73 @@ static int control_pstore(struct ctdb_context *ctdb, int argc, const char **argv
        return 0;
 }
 
+/*
+ * delete a record from a persistent database
+ */
+static int control_pdelete(struct ctdb_context *ctdb, int argc, const char **argv)
+{
+       const char *db_name;
+       struct ctdb_db_context *ctdb_db;
+       TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
+       struct ctdb_transaction_handle *h;
+       TDB_DATA key;
+       int ret;
+       bool persistent;
+       uint8_t flags;
+
+       if (argc < 2) {
+               talloc_free(tmp_ctx);
+               usage();
+       }
+
+       db_name = argv[0];
+
+       if (!db_exists(ctdb, db_name, NULL, &flags)) {
+               talloc_free(tmp_ctx);
+               return -1;
+       }
+
+       persistent = flags & CTDB_DB_FLAGS_PERSISTENT;
+       if (!persistent) {
+               DEBUG(DEBUG_ERR, ("Database '%s' is not persistent\n", db_name));
+               talloc_free(tmp_ctx);
+               return -1;
+       }
+
+       ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), db_name, persistent, 0);
+       if (ctdb_db == NULL) {
+               DEBUG(DEBUG_ERR, ("Unable to attach to database '%s'\n", db_name));
+               talloc_free(tmp_ctx);
+               return -1;
+       }
+
+       h = ctdb_transaction_start(ctdb_db, tmp_ctx);
+       if (h == NULL) {
+               DEBUG(DEBUG_ERR, ("Failed to start transaction on database %s\n", db_name));
+               talloc_free(tmp_ctx);
+               return -1;
+       }
+
+       key.dptr = discard_const(argv[1]);
+       key.dsize = strlen(argv[1]);
+       ret = ctdb_transaction_store(h, key, tdb_null);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR, ("Failed to delete record\n"));
+               talloc_free(tmp_ctx);
+               return -1;
+       }
+
+       ret = ctdb_transaction_commit(h);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR, ("Failed to commit transaction\n"));
+               talloc_free(tmp_ctx);
+               return -1;
+       }
+
+       talloc_free(tmp_ctx);
+       return 0;
+}
+
 /*
   check if a service is bound to a port or not
  */
@@ -3834,79 +4017,205 @@ static void log_handler(struct ctdb_context *ctdb, uint64_t srvid,
        exit(0);
 }
 
-/*
-  display a list of log messages from the in memory ringbuffer
- */
-static int control_getlog(struct ctdb_context *ctdb, int argc, const char **argv)
+/*
+  display a list of log messages from the in memory ringbuffer
+ */
+static int control_getlog(struct ctdb_context *ctdb, int argc, const char **argv)
+{
+       int ret, i;
+       bool main_daemon;
+       struct ctdb_get_log_addr log_addr;
+       TDB_DATA data;
+       struct timeval tv;
+
+       /* Since this can fail, do it first */
+       log_addr.pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
+       if (log_addr.pnn == -1) {
+               DEBUG(DEBUG_ERR, ("Failed to get pnn of local node\n"));
+               return -1;
+       }
+
+       /* Process options */
+       main_daemon = true;
+       log_addr.level = DEBUG_NOTICE;
+       for (i = 0; i < argc; i++) {
+               if (strcmp(argv[i], "recoverd") == 0) {
+                       main_daemon = false;
+               } else {
+                       if (isalpha(argv[i][0]) || argv[i][0] == '-') { 
+                               log_addr.level = get_debug_by_desc(argv[i]);
+                       } else {
+                               log_addr.level = strtol(argv[i], NULL, 0);
+                       }
+               }
+       }
+
+       /* Our message port is our PID */
+       log_addr.srvid = getpid();
+
+       data.dptr = (unsigned char *)&log_addr;
+       data.dsize = sizeof(log_addr);
+
+       DEBUG(DEBUG_ERR, ("Pulling logs from node %u\n", options.pnn));
+
+       ctdb_client_set_message_handler(ctdb, log_addr.srvid, log_handler, NULL);
+       sleep(1);
+
+       DEBUG(DEBUG_ERR,("Listen for response on %d\n", (int)log_addr.srvid));
+
+       if (main_daemon) {
+               int32_t res;
+               char *errmsg;
+               TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
+
+               ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_GET_LOG,
+                                  0, data, tmp_ctx, NULL, &res, NULL, &errmsg);
+               if (ret != 0 || res != 0) {
+                       DEBUG(DEBUG_ERR,("Failed to get logs - %s\n", errmsg));
+                       talloc_free(tmp_ctx);
+                       return -1;
+               }
+               talloc_free(tmp_ctx);
+       } else {
+               ret = ctdb_client_send_message(ctdb, options.pnn,
+                                              CTDB_SRVID_GETLOG, data);
+               if (ret != 0) {
+                       DEBUG(DEBUG_ERR,("Failed to send getlog request message to %u\n", options.pnn));
+                       return -1;
+               }
+       }
+
+       tv = timeval_current();
+       /* this loop will terminate when we have received the reply */
+       while (timeval_elapsed(&tv) < (double)options.timelimit) {
+               event_loop_once(ctdb->ev);
+       }
+
+       DEBUG(DEBUG_INFO,("Timed out waiting for log data.\n"));
+
+       return 0;
+}
+
+/*
+  clear the in memory log area
+ */
+static int control_clearlog(struct ctdb_context *ctdb, int argc, const char **argv)
+{
+       int ret;
+
+       if (argc == 0 || (argc >= 1 && strcmp(argv[0], "recoverd") != 0)) {
+               /* "recoverd" not given - get logs from main daemon */
+               int32_t res;
+               char *errmsg;
+               TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
+
+               ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_CLEAR_LOG,
+                                  0, tdb_null, tmp_ctx, NULL, &res, NULL, &errmsg);
+               if (ret != 0 || res != 0) {
+                       DEBUG(DEBUG_ERR,("Failed to clear logs\n"));
+                       talloc_free(tmp_ctx);
+                       return -1;
+               }
+
+               talloc_free(tmp_ctx);
+       } else {
+               TDB_DATA data; /* unused in recoverd... */
+               data.dsize = 0;
+
+               ret = ctdb_client_send_message(ctdb, options.pnn, CTDB_SRVID_CLEARLOG, data);
+               if (ret != 0) {
+                       DEBUG(DEBUG_ERR,("Failed to send clearlog request message to %u\n", options.pnn));
+                       return -1;
+               }
+       }
+
+       return 0;
+}
+
+
+static uint32_t reloadips_finished;
+
+static void reloadips_handler(struct ctdb_context *ctdb, uint64_t srvid, 
+                            TDB_DATA data, void *private_data)
+{
+       reloadips_finished = 1;
+}
+
+static int reloadips_all(struct ctdb_context *ctdb)
 {
-       int ret;
-       int32_t res;
-       struct ctdb_get_log_addr log_addr;
+       struct reloadips_all_reply rips;
+       struct ctdb_node_map *nodemap=NULL;
        TDB_DATA data;
-       TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
-       char *errmsg;
-       struct timeval tv;
+       uint32_t recmaster;
+       int ret, i;
 
-       if (argc != 1) {
-               DEBUG(DEBUG_ERR,("Invalid arguments\n"));
-               talloc_free(tmp_ctx);
-               return -1;
+       /* check that there are valid nodes available */
+       if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
+               DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
+               return 1;
        }
-
-       log_addr.pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
-       log_addr.srvid = getpid();
-       if (isalpha(argv[0][0]) || argv[0][0] == '-') { 
-               log_addr.level = get_debug_by_desc(argv[0]);
-       } else {
-               log_addr.level = strtol(argv[0], NULL, 0);
+       for (i=0; i<nodemap->num;i++) {
+               if (nodemap->nodes[i].flags != 0) {
+                       DEBUG(DEBUG_ERR,("reloadips -n all  can only be used when all nodes are up and healthy. Aborting due to problem with node %d\n", i));
+                       return 1;
+               }
        }
 
 
-       data.dptr = (unsigned char *)&log_addr;
-       data.dsize = sizeof(log_addr);
-
-       DEBUG(DEBUG_ERR, ("Pulling logs from node %u\n", options.pnn));
+       rips.pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
+       if (rips.pnn == -1) {
+               DEBUG(DEBUG_ERR, ("Failed to get pnn of local node\n"));
+               return 1;
+       }
+       rips.srvid = getpid();
 
-       ctdb_client_set_message_handler(ctdb, log_addr.srvid, log_handler, NULL);
-       sleep(1);
 
-       DEBUG(DEBUG_ERR,("Listen for response on %d\n", (int)log_addr.srvid));
+       /* register a message port for receiveing the reply so that we
+          can receive the reply
+       */
+       ctdb_client_set_message_handler(ctdb, rips.srvid, reloadips_handler, NULL);
 
-       ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_GET_LOG,
-                          0, data, tmp_ctx, NULL, &res, NULL, &errmsg);
-       if (ret != 0 || res != 0) {
-               DEBUG(DEBUG_ERR,("Failed to get logs - %s\n", errmsg));
-               talloc_free(tmp_ctx);
+       if (!ctdb_getrecmaster(ctdb_connection, CTDB_CURRENT_NODE, &recmaster)) {
+               DEBUG(DEBUG_ERR, ("Unable to get recmaster from node\n"));
                return -1;
        }
 
 
-       tv = timeval_current();
-       /* this loop will terminate when we have received the reply */
-       while (timeval_elapsed(&tv) < 3.0) {    
-               event_loop_once(ctdb->ev);
+       data.dptr = (uint8_t *)&rips;
+       data.dsize = sizeof(rips);
+
+       ret = ctdb_client_send_message(ctdb, recmaster, CTDB_SRVID_RELOAD_ALL_IPS, data);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR,("Failed to send reload all ips request message to %u\n", options.pnn));
+               return 1;
        }
 
-       DEBUG(DEBUG_INFO,("Timed out waiting for log data.\n"));
+       reloadips_finished = 0;
+       while (reloadips_finished == 0) {
+               event_loop_once(ctdb->ev);
+       }
 
-       talloc_free(tmp_ctx);
        return 0;
 }
 
 /*
-  clear the in memory log area
+  reload public ips on a specific node
  */
-static int control_clearlog(struct ctdb_context *ctdb, int argc, const char **argv)
+static int control_reloadips(struct ctdb_context *ctdb, int argc, const char **argv)
 {
        int ret;
        int32_t res;
        char *errmsg;
        TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
 
-       ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_CLEAR_LOG,
+       if (options.pnn == CTDB_BROADCAST_ALL) {
+               return reloadips_all(ctdb);
+       }
+
+       ret = ctdb_control(ctdb, options.pnn, 0, CTDB_CONTROL_RELOAD_PUBLIC_IPS,
                           0, tdb_null, tmp_ctx, NULL, &res, NULL, &errmsg);
        if (ret != 0 || res != 0) {
-               DEBUG(DEBUG_ERR,("Failed to clear logs\n"));
+               DEBUG(DEBUG_ERR,("Failed to reload ips\n"));
                talloc_free(tmp_ctx);
                return -1;
        }
@@ -3915,8 +4224,6 @@ static int control_clearlog(struct ctdb_context *ctdb, int argc, const char **ar
        return 0;
 }
 
-
-
 /*
   display a list of the databases on a remote ctdb
  */
@@ -3932,13 +4239,14 @@ static int control_getdbmap(struct ctdb_context *ctdb, int argc, const char **ar
        }
 
        if(options.machinereadable){
-               printf(":ID:Name:Path:Persistent:Unhealthy:ReadOnly:\n");
+               printf(":ID:Name:Path:Persistent:Sticky:Unhealthy:ReadOnly:\n");
                for(i=0;i<dbmap->num;i++){
                        const char *path;
                        const char *name;
                        const char *health;
                        bool persistent;
                        bool readonly;
+                       bool sticky;
 
                        ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn,
                                            dbmap->dbs[i].dbid, ctdb, &path);
@@ -3948,9 +4256,11 @@ static int control_getdbmap(struct ctdb_context *ctdb, int argc, const char **ar
                                              dbmap->dbs[i].dbid, ctdb, &health);
                        persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
                        readonly   = dbmap->dbs[i].flags & CTDB_DB_FLAGS_READONLY;
-                       printf(":0x%08X:%s:%s:%d:%d:%d:\n",
+                       sticky     = dbmap->dbs[i].flags & CTDB_DB_FLAGS_STICKY;
+                       printf(":0x%08X:%s:%s:%d:%d:%d:%d:\n",
                               dbmap->dbs[i].dbid, name, path,
-                              !!(persistent), !!(health), !!(readonly));
+                              !!(persistent), !!(sticky),
+                              !!(health), !!(readonly));
                }
                return 0;
        }
@@ -3962,15 +4272,18 @@ static int control_getdbmap(struct ctdb_context *ctdb, int argc, const char **ar
                const char *health;
                bool persistent;
                bool readonly;
+               bool sticky;
 
                ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &path);
                ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &name);
                ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &health);
                persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
                readonly   = dbmap->dbs[i].flags & CTDB_DB_FLAGS_READONLY;
-               printf("dbid:0x%08x name:%s path:%s%s%s%s\n",
+               sticky     = dbmap->dbs[i].flags & CTDB_DB_FLAGS_STICKY;
+               printf("dbid:0x%08x name:%s path:%s%s%s%s%s\n",
                       dbmap->dbs[i].dbid, name, path,
                       persistent?" PERSISTENT":"",
+                      sticky?" STICKY":"",
                       readonly?" READONLY":"",
                       health?" UNHEALTHY":"");
        }
@@ -3983,9 +4296,11 @@ static int control_getdbmap(struct ctdb_context *ctdb, int argc, const char **ar
  */
 static int control_getdbstatus(struct ctdb_context *ctdb, int argc, const char **argv)
 {
-       int i, ret;
-       struct ctdb_dbid_map *dbmap=NULL;
        const char *db_name;
+       uint32_t db_id;
+       uint8_t flags;
+       const char *path;
+       const char *health;
 
        if (argc < 1) {
                usage();
@@ -3993,37 +4308,19 @@ static int control_getdbstatus(struct ctdb_context *ctdb, int argc, const char *
 
        db_name = argv[0];
 
-       ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, ctdb, &dbmap);
-       if (ret != 0) {
-               DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
-               return ret;
+       if (!db_exists(ctdb, db_name, &db_id, &flags)) {
+               return -1;
        }
 
-       for(i=0;i<dbmap->num;i++){
-               const char *path;
-               const char *name;
-               const char *health;
-               bool persistent;
-               bool readonly;
-
-               ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &name);
-               if (strcmp(name, db_name) != 0) {
-                       continue;
-               }
-
-               ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &path);
-               ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &health);
-               persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
-               readonly   = dbmap->dbs[i].flags & CTDB_DB_FLAGS_READONLY;
-               printf("dbid: 0x%08x\nname: %s\npath: %s\nPERSISTENT: %s\nREADONLY: %s\nHEALTH: %s\n",
-                      dbmap->dbs[i].dbid, name, path,
-                      persistent?"yes":"no",
-                      readonly?"yes":"no",
-                      health?health:"OK");
-               return 0;
-       }
+       ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn, db_id, ctdb, &path);
+       ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn, db_id, ctdb, &health);
+       printf("dbid: 0x%08x\nname: %s\npath: %s\nPERSISTENT: %s\nSTICKY: %s\nREADONLY: %s\nHEALTH: %s\n",
+              db_id, db_name, path,
+              (flags & CTDB_DB_FLAGS_PERSISTENT ? "yes" : "no"),
+              (flags & CTDB_DB_FLAGS_STICKY ? "yes" : "no"),
+              (flags & CTDB_DB_FLAGS_READONLY ? "yes" : "no"),
+              (health ? health : "OK"));
 
-       DEBUG(DEBUG_ERR, ("db %s doesn't exist on node %u\n", db_name, options.pnn));
        return 0;
 }
 
@@ -4075,6 +4372,49 @@ static int control_ping(struct ctdb_context *ctdb, int argc, const char **argv)
 }
 
 
+/*
+  get a node's runstate
+ */
+static int control_runstate(struct ctdb_context *ctdb, int argc, const char **argv)
+{
+       int ret;
+       enum ctdb_runstate runstate;
+
+       ret = ctdb_ctrl_get_runstate(ctdb, TIMELIMIT(), options.pnn, &runstate);
+       if (ret == -1) {
+               printf("Unable to get runstate response from node %u\n",
+                      options.pnn);
+               return -1;
+       } else {
+               bool found = true;
+               enum ctdb_runstate t;
+               int i;
+               for (i=0; i<argc; i++) {
+                       found = false;
+                       t = runstate_from_string(argv[i]);
+                       if (t == CTDB_RUNSTATE_UNKNOWN) {
+                               printf("Invalid run state (%s)\n", argv[i]);
+                               return -1;
+                       }
+
+                       if (t == runstate) {
+                               found = true;
+                               break;
+                       }
+               }
+
+               if (!found) {
+                       printf("CTDB not in required run state (got %s)\n", 
+                              runstate_to_string((enum ctdb_runstate)runstate));
+                       return -1;
+               }
+       }
+
+       printf("%s\n", runstate_to_string(runstate));
+       return 0;
+}
+
+
 /*
   get a tunable
  */
@@ -4444,7 +4784,9 @@ static int control_getdbprio(struct ctdb_context *ctdb, int argc, const char **a
                usage();
        }
 
-       db_id = strtoul(argv[0], NULL, 0);
+       if (!db_exists(ctdb, argv[0], &db_id, NULL)) {
+               return -1;
+       }
 
        ret = ctdb_ctrl_get_db_priority(ctdb, TIMELIMIT(), options.pnn, db_id, &priority);
        if (ret != 0) {
@@ -4457,6 +4799,34 @@ static int control_getdbprio(struct ctdb_context *ctdb, int argc, const char **a
        return 0;
 }
 
+/*
+  set the sticky records capability for a database
+ */
+static int control_setdbsticky(struct ctdb_context *ctdb, int argc, const char **argv)
+{
+       TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
+       uint32_t db_id;
+       int ret;
+
+       if (argc < 1) {
+               usage();
+       }
+
+       if (!db_exists(ctdb, argv[0], &db_id, NULL)) {
+               return -1;
+       }
+
+       ret = ctdb_ctrl_set_db_sticky(ctdb, options.pnn, db_id);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR,("Unable to set db to support sticky records\n"));
+               talloc_free(tmp_ctx);
+               return -1;
+       }
+
+       talloc_free(tmp_ctx);
+       return 0;
+}
+
 /*
   set the readonly capability for a database
  */
@@ -4464,38 +4834,14 @@ static int control_setdbreadonly(struct ctdb_context *ctdb, int argc, const char
 {
        TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
        uint32_t db_id;
-       struct ctdb_dbid_map *dbmap=NULL;
-       int i, ret;
+       int ret;
 
        if (argc < 1) {
                usage();
        }
 
-       if (!strncmp(argv[0], "0x", 2)) {
-               db_id = strtoul(argv[0] + 2, NULL, 0);
-       } else {
-               ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &dbmap);
-               if (ret != 0) {
-                       DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
-                       talloc_free(tmp_ctx);
-                       return ret;
-               }
-               for(i=0;i<dbmap->num;i++){
-                       const char *name;
-
-                       ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, tmp_ctx, &name);
-                       if(!strcmp(argv[0], name)){
-                               talloc_free(discard_const(name));
-                               break;
-                       }
-                       talloc_free(discard_const(name));
-               }
-               if (i == dbmap->num) {
-                       DEBUG(DEBUG_ERR,("No database with name '%s' found\n", argv[0]));
-                       talloc_free(tmp_ctx);
-                       return -1;
-               }
-               db_id = dbmap->dbs[i].dbid;
+       if (!db_exists(ctdb, argv[0], &db_id, NULL)) {
+               return -1;
        }
 
        ret = ctdb_ctrl_set_db_readonly(ctdb, options.pnn, db_id);
@@ -4522,7 +4868,9 @@ static int control_getdbseqnum(struct ctdb_context *ctdb, int argc, const char *
                usage();
        }
 
-       db_id = strtoul(argv[0], NULL, 0);
+       if (!db_exists(ctdb, argv[0], &db_id, NULL)) {
+               return -1;
+       }
 
        ret = ctdb_getdbseqnum(ctdb_connection, options.pnn, db_id, &seqnum);
        if (!ret) {
@@ -4535,6 +4883,91 @@ static int control_getdbseqnum(struct ctdb_context *ctdb, int argc, const char *
        return 0;
 }
 
+/*
+ * set db seqnum
+ */
+static int control_setdbseqnum(struct ctdb_context *ctdb, int argc, const char **argv)
+{
+       bool ret;
+       struct ctdb_db_context *ctdb_db;
+       uint32_t db_id;
+       uint8_t flags;
+       uint64_t old_seqnum, new_seqnum;
+       TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
+       struct ctdb_transaction_handle *h;
+       TDB_DATA key, data;
+       bool persistent;
+
+       if (argc != 2) {
+               talloc_free(tmp_ctx);
+               usage();
+       }
+
+       if (!db_exists(ctdb, argv[0], &db_id, &flags)) {
+               talloc_free(tmp_ctx);
+               return -1;
+       }
+
+       persistent = flags & CTDB_DB_FLAGS_PERSISTENT;
+       if (!persistent) {
+               DEBUG(DEBUG_ERR,("Database '%s' is not persistent\n", argv[0]));
+               talloc_free(tmp_ctx);
+               return -1;
+       }
+
+       ret = ctdb_getdbseqnum(ctdb_connection, options.pnn, db_id, &old_seqnum);
+       if (!ret) {
+               DEBUG(DEBUG_ERR, ("Unable to get seqnum from node."));
+               talloc_free(tmp_ctx);
+               return -1;
+       }
+
+       new_seqnum = strtoull(argv[1], NULL, 0);
+       if (new_seqnum <= old_seqnum) {
+               DEBUG(DEBUG_ERR, ("New sequence number is less than current sequence number\n"));
+               talloc_free(tmp_ctx);
+               return -1;
+       }
+
+       ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), argv[0], persistent, 0);
+       if (ctdb_db == NULL) {
+               DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", argv[0]));
+               talloc_free(tmp_ctx);
+               return -1;
+       }
+
+       h = ctdb_transaction_start(ctdb_db, tmp_ctx);
+       if (h == NULL) {
+               DEBUG(DEBUG_ERR,("Failed to start transaction on database %s\n", argv[0]));
+               talloc_free(tmp_ctx);
+               return -1;
+       }
+
+       key.dptr  = (uint8_t *)discard_const(CTDB_DB_SEQNUM_KEY);
+       key.dsize = strlen(CTDB_DB_SEQNUM_KEY) + 1;
+
+       data.dsize = sizeof(new_seqnum);
+       data.dptr = talloc_size(tmp_ctx, data.dsize);
+       *data.dptr = new_seqnum;
+
+       ret = ctdb_transaction_store(h, key, data);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR,("Failed to store record\n"));
+               talloc_free(tmp_ctx);
+               return -1;
+       }
+
+       ret = ctdb_transaction_commit(h);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR,("Failed to commit transaction\n"));
+               talloc_free(tmp_ctx);
+               return -1;
+       }
+
+       talloc_free(tmp_ctx);
+       return 0;
+}
+
 /*
   run an eventscript on a node
  */
@@ -4616,8 +5049,7 @@ static int backup_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data,
  */
 static int control_backupdb(struct ctdb_context *ctdb, int argc, const char **argv)
 {
-       int i, ret;
-       struct ctdb_dbid_map *dbmap=NULL;
+       int ret;
        TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
        struct db_file_header dbhdr;
        struct ctdb_db_context *ctdb_db;
@@ -4625,36 +5057,20 @@ static int control_backupdb(struct ctdb_context *ctdb, int argc, const char **ar
        int fh = -1;
        int status = -1;
        const char *reason = NULL;
+       uint32_t db_id;
+       uint8_t flags;
 
        if (argc != 2) {
                DEBUG(DEBUG_ERR,("Invalid arguments\n"));
                return -1;
        }
 
-       ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &dbmap);
-       if (ret != 0) {
-               DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n", options.pnn));
-               return ret;
-       }
-
-       for(i=0;i<dbmap->num;i++){
-               const char *name;
-
-               ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, tmp_ctx, &name);
-               if(!strcmp(argv[0], name)){
-                       talloc_free(discard_const(name));
-                       break;
-               }
-               talloc_free(discard_const(name));
-       }
-       if (i == dbmap->num) {
-               DEBUG(DEBUG_ERR,("No database with name '%s' found\n", argv[0]));
-               talloc_free(tmp_ctx);
+       if (!db_exists(ctdb, argv[0], &db_id, &flags)) {
                return -1;
        }
 
        ret = ctdb_ctrl_getdbhealth(ctdb, TIMELIMIT(), options.pnn,
-                                   dbmap->dbs[i].dbid, tmp_ctx, &reason);
+                                   db_id, tmp_ctx, &reason);
        if (ret != 0) {
                DEBUG(DEBUG_ERR,("Unable to get dbhealth for database '%s'\n",
                                 argv[0]));
@@ -4685,7 +5101,7 @@ static int control_backupdb(struct ctdb_context *ctdb, int argc, const char **ar
                                     allow_unhealthy));
        }
 
-       ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), argv[0], dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT, 0);
+       ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), argv[0], flags & CTDB_DB_FLAGS_PERSISTENT, 0);
        if (ctdb_db == NULL) {
                DEBUG(DEBUG_ERR,("Unable to attach to database '%s'\n", argv[0]));
                talloc_free(tmp_ctx);
@@ -4737,7 +5153,7 @@ static int control_backupdb(struct ctdb_context *ctdb, int argc, const char **ar
 
        dbhdr.version = DB_VERSION;
        dbhdr.timestamp = time(NULL);
-       dbhdr.persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
+       dbhdr.persistent = flags & CTDB_DB_FLAGS_PERSISTENT;
        dbhdr.size = bd->len;
        if (strlen(argv[0]) >= MAX_DB_NAME) {
                DEBUG(DEBUG_ERR,("Too long dbname\n"));
@@ -5060,40 +5476,18 @@ static int control_wipedb(struct ctdb_context *ctdb, int argc,
        struct ctdb_control_wipe_database w;
        uint32_t *nodes;
        uint32_t generation;
-       struct ctdb_dbid_map *dbmap = NULL;
+       uint8_t flags;
 
        if (argc != 1) {
                DEBUG(DEBUG_ERR,("Invalid arguments\n"));
                return -1;
        }
 
-       ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), options.pnn, tmp_ctx,
-                                &dbmap);
-       if (ret != 0) {
-               DEBUG(DEBUG_ERR, ("Unable to get dbids from node %u\n",
-                                 options.pnn));
-               return ret;
-       }
-
-       for(i=0;i<dbmap->num;i++){
-               const char *name;
-
-               ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn,
-                                   dbmap->dbs[i].dbid, tmp_ctx, &name);
-               if(!strcmp(argv[0], name)){
-                       talloc_free(discard_const(name));
-                       break;
-               }
-               talloc_free(discard_const(name));
-       }
-       if (i == dbmap->num) {
-               DEBUG(DEBUG_ERR, ("No database with name '%s' found\n",
-                                 argv[0]));
-               talloc_free(tmp_ctx);
+       if (!db_exists(ctdb, argv[0], NULL, &flags)) {
                return -1;
        }
 
-       ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), argv[0], dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT, 0);
+       ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), argv[0], flags & CTDB_DB_FLAGS_PERSISTENT, 0);
        if (ctdb_db == NULL) {
                DEBUG(DEBUG_ERR, ("Unable to attach to database '%s'\n",
                                  argv[0]));
@@ -5449,12 +5843,11 @@ static const struct {
        const char *msg;
        const char *args;
 } ctdb_commands[] = {
-#ifdef CTDB_VERS
-       { "version",         control_version,           true,   false,  "show version of ctdb" },
-#endif
+       { "version",         control_version,           true,   true,   "show version of ctdb" },
        { "status",          control_status,            true,   false,  "show node status" },
        { "uptime",          control_uptime,            true,   false,  "show node uptime" },
        { "ping",            control_ping,              true,   false,  "ping all nodes" },
+       { "runstate",        control_runstate,          true,   false,  "get/check runstate of a node", "[setup|first_recovery|startup|running]" },
        { "getvar",          control_getvar,            true,   false,  "get a tunable variable",               "<name>"},
        { "setvar",          control_setvar,            true,   false,  "set a tunable variable",               "<name> <value>"},
        { "listvars",        control_listvars,          true,   false,  "list tunable variables"},
@@ -5467,9 +5860,9 @@ static const struct {
        { "setifacelink",    control_setifacelink,      true,   false,  "set interface link status", "<iface> <status>" },
        { "process-exists",  control_process_exists,    true,   false,  "check if a process exists on a node",  "<pid>"},
        { "getdbmap",        control_getdbmap,          true,   false,  "show the database map" },
-       { "getdbstatus",     control_getdbstatus,       true,   false,  "show the status of a database", "<dbname>" },
-       { "catdb",           control_catdb,             true,   false,  "dump a ctdb database" ,                     "<dbname>"},
-       { "cattdb",          control_cattdb,            true,   false,  "dump a local tdb database" ,                     "<dbname>"},
+       { "getdbstatus",     control_getdbstatus,       true,   false,  "show the status of a database", "<dbname|dbid>" },
+       { "catdb",           control_catdb,             true,   false,  "dump a ctdb database" ,                     "<dbname|dbid>"},
+       { "cattdb",          control_cattdb,            true,   false,  "dump a local tdb database" ,                     "<dbname|dbid>"},
        { "getmonmode",      control_getmonmode,        true,   false,  "show monitoring mode" },
        { "getcapabilities", control_getcapabilities,   true,   false,  "show node capabilities" },
        { "pnn",             control_pnn,               true,   false,  "show the pnn of the currnet node" },
@@ -5479,8 +5872,8 @@ static const struct {
        { "enablemonitor",      control_enable_monmode, true,   false,  "set monitoring mode to ACTIVE" },
        { "setdebug",        control_setdebug,          true,   false,  "set debug level",                      "<EMERG|ALERT|CRIT|ERR|WARNING|NOTICE|INFO|DEBUG>" },
        { "getdebug",        control_getdebug,          true,   false,  "get debug level" },
-       { "getlog",          control_getlog,            true,   false,  "get the log data from the in memory ringbuffer", "<level>" },
-       { "clearlog",          control_clearlog,        true,   false,  "clear the log data from the in memory ringbuffer" },
+       { "getlog",          control_getlog,            true,   false,  "get the log data from the in memory ringbuffer", "[<level>] [recoverd]" },
+       { "clearlog",          control_clearlog,        true,   false,  "clear the log data from the in memory ringbuffer", "[recoverd]" },
        { "attach",          control_attach,            true,   false,  "attach to a database",                 "<dbname> [persistent]" },
        { "dumpmemory",      control_dumpmemory,        true,   false,  "dump memory map to stdout" },
        { "rddumpmemory",    control_rddumpmemory,      true,   false,  "dump memory map from the recovery daemon to stdout" },
@@ -5494,7 +5887,7 @@ static const struct {
        { "showban",         control_showban,           true,   false,  "show ban information"},
        { "shutdown",        control_shutdown,          true,   false,  "shutdown ctdbd" },
        { "recover",         control_recover,           true,   false,  "force recovery" },
-       { "sync",            control_ipreallocate,      true,   false,  "wait until ctdbd has synced all state changes" },
+       { "sync",            control_ipreallocate,      false,  false,  "wait until ctdbd has synced all state changes" },
        { "ipreallocate",    control_ipreallocate,      true,   false,  "force the recovery daemon to perform a ip reallocation procedure" },
        { "thaw",            control_thaw,              true,   false,  "thaw databases", "[priority:1-3]" },
        { "isnotrecmaster",  control_isnotrecmaster,    false,  false,  "check if the local node is recmaster or not" },
@@ -5516,14 +5909,15 @@ static const struct {
        { "listnodes",       control_listnodes,         false,  true, "list all nodes in the cluster"},
        { "reloadnodes",     control_reload_nodes_file, false,  false, "reload the nodes file and restart the transport on all nodes"},
        { "moveip",          control_moveip,            false,  false, "move/failover an ip address to another node", "<ip> <node>"},
+       { "rebalanceip",     control_rebalanceip,       false,  false, "release an ip from the node and let recd rebalance it", "<ip>"},
        { "addip",           control_addip,             true,   false, "add a ip address to a node", "<ip/mask> <iface>"},
        { "delip",           control_delip,             false,  false, "delete an ip address from a node", "<ip>"},
        { "eventscript",     control_eventscript,       true,   false, "run the eventscript with the given parameters on a node", "<arguments>"},
-       { "backupdb",        control_backupdb,          false,  false, "backup the database into a file.", "<database> <file>"},
+       { "backupdb",        control_backupdb,          false,  false, "backup the database into a file.", "<dbname|dbid> <file>"},
        { "restoredb",        control_restoredb,        false,  false, "restore the database from a file.", "<file> [dbname]"},
        { "dumpdbbackup",    control_dumpdbbackup,      false,  true,  "dump database backup from a file.", "<file>"},
-       { "wipedb",           control_wipedb,        false,     false, "wipe the contents of a database.", "<dbname>"},
-       { "recmaster",        control_recmaster,        false,  false, "show the pnn for the recovery master."},
+       { "wipedb",           control_wipedb,        false,     false, "wipe the contents of a database.", "<dbname|dbid>"},
+       { "recmaster",        control_recmaster,        true,   false, "show the pnn for the recovery master."},
        { "scriptstatus",     control_scriptstatus,     true,   false, "show the status of the monitoring scripts (or all scripts)", "[all]"},
        { "enablescript",     control_enablescript,  false,     false, "enable an eventscript", "<script>"},
        { "disablescript",    control_disablescript,  false,    false, "disable an eventscript", "<script>"},
@@ -5534,23 +5928,27 @@ static const struct {
        { "setnatgwstate",    control_setnatgwstate,    false,  false, "Set NATGW state to on/off", "{on|off}"},
        { "setlmasterrole",   control_setlmasterrole,   false,  false, "Set LMASTER role to on/off", "{on|off}"},
        { "setrecmasterrole", control_setrecmasterrole, false,  false, "Set RECMASTER role to on/off", "{on|off}"},
-       { "setdbprio",        control_setdbprio,        false,  false, "Set DB priority", "<dbid> <prio:1-3>"},
-       { "getdbprio",        control_getdbprio,        false,  false, "Get DB priority", "<dbid>"},
-       { "setdbreadonly",    control_setdbreadonly,    false,  false, "Set DB readonly capable", "<dbid>"},
+       { "setdbprio",        control_setdbprio,        false,  false, "Set DB priority", "<dbname|dbid> <prio:1-3>"},
+       { "getdbprio",        control_getdbprio,        false,  false, "Get DB priority", "<dbname|dbid>"},
+       { "setdbreadonly",    control_setdbreadonly,    false,  false, "Set DB readonly capable", "<dbname|dbid>"},
+       { "setdbsticky",      control_setdbsticky,      false,  false, "Set DB sticky-records capable", "<dbname|dbid>"},
        { "msglisten",        control_msglisten,        false,  false, "Listen on a srvid port for messages", "<msg srvid>"},
        { "msgsend",          control_msgsend,  false,  false, "Send a message to srvid", "<srvid> <message>"},
-       { "sync",            control_ipreallocate,      false,  false,  "wait until ctdbd has synced all state changes" },
-       { "pfetch",          control_pfetch,            false,  false,  "fetch a record from a persistent database", "<db> <key> [<file>]" },
-       { "pstore",          control_pstore,            false,  false,  "write a record to a persistent database", "<db> <key> <file containing record>" },
+       { "pfetch",          control_pfetch,            false,  false,  "fetch a record from a persistent database", "<dbname|dbid> <key> [<file>]" },
+       { "pstore",          control_pstore,            false,  false,  "write a record to a persistent database", "<dbname|dbid> <key> <file containing record>" },
+       { "pdelete",         control_pdelete,           false,  false,  "delete a record from a persistent database", "<dbname|dbid> <key>" },
        { "tfetch",          control_tfetch,            false,  true,  "fetch a record from a [c]tdb-file [-v]", "<tdb-file> <key> [<file>]" },
        { "tstore",          control_tstore,            false,  true,  "store a record (including ltdb header)", "<tdb-file> <key> <data+header>" },
        { "readkey",         control_readkey,           true,   false,  "read the content off a database key", "<tdb-file> <key>" },
        { "writekey",        control_writekey,          true,   false,  "write to a database key", "<tdb-file> <key> <value>" },
        { "checktcpport",    control_chktcpport,        false,  true,  "check if a service is bound to a specific tcp port or not", "<port>" },
        { "rebalancenode",     control_rebalancenode,   false,  false, "release a node by allowing it to takeover ips", "<pnn>"},
-       { "getdbseqnum",     control_getdbseqnum,       false,  false, "get the sequence number off a database", "<dbid>" },
+       { "getdbseqnum",     control_getdbseqnum,       false,  false, "get the sequence number off a database", "<dbname|dbid>" },
+       { "setdbseqnum",     control_setdbseqnum,       false,  false, "set the sequence number for a database", "<dbname|dbid> <seqnum>" },
        { "nodestatus",      control_nodestatus,        true,   false,  "show and return node status" },
-       { "dbstatistics",    control_dbstatistics,      false,  false, "show db statistics", "<db>" },
+       { "dbstatistics",    control_dbstatistics,      false,  false, "show db statistics", "<dbname|dbid>" },
+       { "reloadips",       control_reloadips,         false,  false, "reload the public addresses file on a node" },
+       { "ipiface",         control_ipiface,           true,   true,  "Find which interface an ip address is hsoted on", "<ip>" },
 };
 
 /*
@@ -5664,7 +6062,6 @@ int main(int argc, const char *argv[])
                DEBUG(DEBUG_ERR, ("Failed to initialize event system\n"));
                exit(1);
        }
-       tevent_loop_allow_nesting(ev);
 
        for (i=0;i<ARRAY_SIZE(ctdb_commands);i++) {
                if (strcmp(control, ctdb_commands[i].name) == 0) {
@@ -5729,6 +6126,7 @@ int main(int argc, const char *argv[])
 
        ctdb_disconnect(ctdb_connection);
        talloc_free(ctdb);
+       talloc_free(ev);
        (void)poptFreeContext(pc);
 
        return ret;