Revert "recoverd: Disable takeover runs on other nodes for 5 minutes"
[ctdb.git] / server / ctdb_recoverd.c
index c0792178685580a1635eeaf985cd4b472867be92..e5c2887f44f37a78cc7b88b1ba6be357b7d3dd80 100644 (file)
@@ -18,7 +18,6 @@
 */
 
 #include "includes.h"
-#include "lib/tevent/tevent.h"
 #include "system/filesys.h"
 #include "system/time.h"
 #include "system/network.h"
 #include "dlinklist.h"
 
 
-/* list of "ctdb ipreallocate" processes to call back when we have
-   finished the takeover run.
-*/
-struct ip_reallocate_list {
-       struct ip_reallocate_list *next;
-       struct rd_memdump_reply *rd;
+/* List of SRVID requests that need to be processed */
+struct srvid_list {
+       struct srvid_list *next, *prev;
+       struct srvid_request *request;
+};
+
+struct srvid_requests {
+       struct srvid_list *requests;
 };
 
+static void srvid_request_reply(struct ctdb_context *ctdb,
+                               struct srvid_request *request,
+                               TDB_DATA result)
+{
+       /* Someone that sent srvid==0 does not want a reply */
+       if (request->srvid == 0) {
+               talloc_free(request);
+               return;
+       }
+
+       if (ctdb_client_send_message(ctdb, request->pnn, request->srvid,
+                                    result) == 0) {
+               DEBUG(DEBUG_INFO,("Sent SRVID reply to %u:%llu\n",
+                                 (unsigned)request->pnn,
+                                 (unsigned long long)request->srvid));
+       } else {
+               DEBUG(DEBUG_ERR,("Failed to send SRVID reply to %u:%llu\n",
+                                (unsigned)request->pnn,
+                                (unsigned long long)request->srvid));
+       }
+
+       talloc_free(request);
+}
+
+static void srvid_requests_reply(struct ctdb_context *ctdb,
+                                struct srvid_requests **requests,
+                                TDB_DATA result)
+{
+       struct srvid_list *r;
+
+       for (r = (*requests)->requests; r != NULL; r = r->next) {
+               srvid_request_reply(ctdb, r->request, result);
+       }
+
+       /* Free the list structure... */
+       TALLOC_FREE(*requests);
+}
+
+static void srvid_request_add(struct ctdb_context *ctdb,
+                             struct srvid_requests **requests,
+                             struct srvid_request *request)
+{
+       struct srvid_list *t;
+       int32_t ret;
+       TDB_DATA result;
+
+       if (*requests == NULL) {
+               *requests = talloc_zero(ctdb, struct srvid_requests);
+               if (*requests == NULL) {
+                       goto nomem;
+               }
+       }
+
+       t = talloc_zero(*requests, struct srvid_list);
+       if (t == NULL) {
+               /* If *requests was just allocated above then free it */
+               if ((*requests)->requests == NULL) {
+                       TALLOC_FREE(*requests);
+               }
+               goto nomem;
+       }
+
+       t->request = (struct srvid_request *)talloc_steal(t, request);
+       DLIST_ADD((*requests)->requests, t);
+
+       return;
+
+nomem:
+       /* Failed to add the request to the list.  Send a fail. */
+       DEBUG(DEBUG_ERR, (__location__
+                         " Out of memory, failed to queue SRVID request\n"));
+       ret = -ENOMEM;
+       result.dsize = sizeof(ret);
+       result.dptr = (uint8_t *)&ret;
+       srvid_request_reply(ctdb, request, result);
+}
+
 struct ctdb_banning_state {
        uint32_t count;
        struct timeval last_reported_time;
@@ -51,6 +129,7 @@ struct ctdb_recoverd {
        struct ctdb_context *ctdb;
        uint32_t recmaster;
        uint32_t num_active;
+       uint32_t num_lmasters;
        uint32_t num_connected;
        uint32_t last_culprit_node;
        struct ctdb_node_map *nodemap;
@@ -61,11 +140,11 @@ struct ctdb_recoverd {
        struct timed_event *send_election_te;
        struct timed_event *election_timeout;
        struct vacuum_info *vacuum_info;
-       TALLOC_CTX *ip_reallocate_ctx;
-       struct ip_reallocate_list *reallocate_callers;
-       TALLOC_CTX *ip_check_disable_ctx;
+       struct srvid_requests *reallocate_requests;
+       bool takeover_run_in_progress;
+       TALLOC_CTX *takeover_runs_disable_ctx;
        struct ctdb_control_get_ifaces *ifaces;
-       TALLOC_CTX *deferred_rebalance_ctx;
+       uint32_t *force_rebalance_nodes;
 };
 
 #define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
@@ -82,13 +161,13 @@ static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_
        struct ctdb_context *ctdb = rec->ctdb;
        struct ctdb_ban_time bantime;
        
-       DEBUG(DEBUG_NOTICE,("Banning node %u for %u seconds\n", pnn, ban_time));
-
        if (!ctdb_validate_pnn(ctdb, pnn)) {
                DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_ban_node\n", pnn));
                return;
        }
 
+       DEBUG(DEBUG_NOTICE,("Banning node %u for %u seconds\n", pnn, ban_time));
+
        bantime.pnn  = pnn;
        bantime.time = ban_time;
 
@@ -103,33 +182,6 @@ static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_
 enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_ELECTION_NEEDED, MONITOR_FAILED};
 
 
-/*
-  run the "recovered" eventscript on all nodes
- */
-static int run_recovered_eventscript(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, const char *caller)
-{
-       TALLOC_CTX *tmp_ctx;
-       uint32_t *nodes;
-
-       tmp_ctx = talloc_new(ctdb);
-       CTDB_NO_MEMORY(ctdb, tmp_ctx);
-
-       nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
-       if (ctdb_client_async_control(ctdb, CTDB_CONTROL_END_RECOVERY,
-                                       nodes, 0,
-                                       CONTROL_TIMEOUT(), false, tdb_null,
-                                       NULL, NULL,
-                                       NULL) != 0) {
-               DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event when called from %s\n", caller));
-
-               talloc_free(tmp_ctx);
-               return -1;
-       }
-
-       talloc_free(tmp_ctx);
-       return 0;
-}
-
 /*
   remember the trouble maker
  */
@@ -143,6 +195,12 @@ static void ctdb_set_culprit_count(struct ctdb_recoverd *rec, uint32_t culprit,
                return;
        }
 
+       /* If we are banned or stopped, do not set other nodes as culprits */
+       if (rec->node_flags & NODE_FLAGS_INACTIVE) {
+               DEBUG(DEBUG_NOTICE, ("This node is INACTIVE, cannot set culprit node %d\n", culprit));
+               return;
+       }
+
        if (ctdb->nodes[culprit]->ban_state == NULL) {
                ctdb->nodes[culprit]->ban_state = talloc_zero(ctdb->nodes[culprit], struct ctdb_banning_state);
                CTDB_NO_MEMORY_VOID(ctdb, ctdb->nodes[culprit]->ban_state);
@@ -171,6 +229,46 @@ static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
 }
 
 
+/* this callback is called for every node that failed to execute the
+   recovered event
+*/
+static void recovered_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
+{
+       struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
+
+       DEBUG(DEBUG_ERR, (__location__ " Node %u failed the recovered event. Setting it as recovery fail culprit\n", node_pnn));
+
+       ctdb_set_culprit(rec, node_pnn);
+}
+
+/*
+  run the "recovered" eventscript on all nodes
+ */
+static int run_recovered_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, const char *caller)
+{
+       TALLOC_CTX *tmp_ctx;
+       uint32_t *nodes;
+       struct ctdb_context *ctdb = rec->ctdb;
+
+       tmp_ctx = talloc_new(ctdb);
+       CTDB_NO_MEMORY(ctdb, tmp_ctx);
+
+       nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
+       if (ctdb_client_async_control(ctdb, CTDB_CONTROL_END_RECOVERY,
+                                       nodes, 0,
+                                       CONTROL_TIMEOUT(), false, tdb_null,
+                                       NULL, recovered_fail_callback,
+                                       rec) != 0) {
+               DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event when called from %s\n", caller));
+
+               talloc_free(tmp_ctx);
+               return -1;
+       }
+
+       talloc_free(tmp_ctx);
+       return 0;
+}
+
 /* this callback is called for every node that failed to execute the
    start recovery event
 */
@@ -237,7 +335,7 @@ static int update_capabilities(struct ctdb_context *ctdb, struct ctdb_node_map *
        tmp_ctx = talloc_new(ctdb);
        CTDB_NO_MEMORY(ctdb, tmp_ctx);
 
-       nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
+       nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_CAPABILITIES,
                                        nodes, 0,
                                        CONTROL_TIMEOUT(),
@@ -1091,7 +1189,7 @@ static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map *n
                           Since we are the recovery master we can just as
                           well update the flags on all nodes.
                        */
-                       ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, nodemap->nodes[j].flags, ~nodemap->nodes[j].flags);
+                       ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags, ~remote_nodemap->nodes[j].flags);
                        if (ret != 0) {
                                DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
                                return -1;
@@ -1153,7 +1251,7 @@ static struct tdb_wrap *create_recdb(struct ctdb_context *ctdb, TALLOC_CTX *mem_
        if (ctdb->valgrinding) {
                tdb_flags |= TDB_NOMMAP;
        }
-       tdb_flags |= TDB_DISALLOW_NESTING;
+       tdb_flags |= (TDB_INCOMPATIBLE_HASH | TDB_DISALLOW_NESTING);
 
        recdb = tdb_wrap_open(mem_ctx, name, ctdb->tunable.database_hash_size, 
                              tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
@@ -1168,12 +1266,13 @@ static struct tdb_wrap *create_recdb(struct ctdb_context *ctdb, TALLOC_CTX *mem_
 
 
 /* 
-   a traverse function for pulling all relevent records from recdb
+   a traverse function for pulling all relevant records from recdb
  */
 struct recdb_data {
        struct ctdb_context *ctdb;
        struct ctdb_marshall_buffer *recdata;
        uint32_t len;
+       uint32_t allocated_len;
        bool failed;
        bool persistent;
 };
@@ -1184,8 +1283,37 @@ static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data,
        struct ctdb_rec_data *rec;
        struct ctdb_ltdb_header *hdr;
 
-       /* skip empty records */
-       if (data.dsize <= sizeof(struct ctdb_ltdb_header)) {
+       /*
+        * skip empty records - but NOT for persistent databases:
+        *
+        * The record-by-record mode of recovery deletes empty records.
+        * For persistent databases, this can lead to data corruption
+        * by deleting records that should be there:
+        *
+        * - Assume the cluster has been running for a while.
+        *
+        * - A record R in a persistent database has been created and
+        *   deleted a couple of times, the last operation being deletion,
+        *   leaving an empty record with a high RSN, say 10.
+        *
+        * - Now a node N is turned off.
+        *
+        * - This leaves the local database copy of D on N with the empty
+        *   copy of R and RSN 10. On all other nodes, the recovery has deleted
+        *   the copy of record R.
+        *
+        * - Now the record is created again while node N is turned off.
+        *   This creates R with RSN = 1 on all nodes except for N.
+        *
+        * - Now node N is turned on again. The following recovery will chose
+        *   the older empty copy of R due to RSN 10 > RSN 1.
+        *
+        * ==> Hence the record is gone after the recovery.
+        *
+        * On databases like Samba's registry, this can damage the higher-level
+        * data structures built from the various tdb-level records.
+        */
+       if (!params->persistent && data.dsize <= sizeof(struct ctdb_ltdb_header)) {
                return 0;
        }
 
@@ -1202,10 +1330,13 @@ static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data,
                params->failed = true;
                return -1;
        }
-       params->recdata = talloc_realloc_size(NULL, params->recdata, rec->length + params->len);
+       if (params->len + rec->length >= params->allocated_len) {
+               params->allocated_len = rec->length + params->len + params->ctdb->tunable.pulldb_preallocation_size;
+               params->recdata = talloc_realloc_size(NULL, params->recdata, params->allocated_len);
+       }
        if (params->recdata == NULL) {
-               DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata to %u (%u records)\n", 
-                        rec->length + params->len, params->recdata->count));
+               DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata to %u\n",
+                        rec->length + params->len));
                params->failed = true;
                return -1;
        }
@@ -1241,6 +1372,7 @@ static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
        params.ctdb = ctdb;
        params.recdata = recdata;
        params.len = offsetof(struct ctdb_marshall_buffer, data);
+       params.allocated_len = params.len;
        params.failed = false;
        params.persistent = persistent;
 
@@ -1349,15 +1481,6 @@ static int recover_database(struct ctdb_recoverd *rec,
        return 0;
 }
 
-/*
-  reload the nodes file 
-*/
-static void reload_nodes_file(struct ctdb_context *ctdb)
-{
-       ctdb->nodes = NULL;
-       ctdb_load_nodes_file(ctdb);
-}
-
 static int ctdb_reload_remote_public_ips(struct ctdb_context *ctdb,
                                         struct ctdb_recoverd *rec,
                                         struct ctdb_node_map *nodemap,
@@ -1376,57 +1499,62 @@ static int ctdb_reload_remote_public_ips(struct ctdb_context *ctdb,
        }
 
        for (j=0; j<nodemap->num; j++) {
+               /* For readability */
+               struct ctdb_node *node = ctdb->nodes[j];
+
                /* release any existing data */
-               if (ctdb->nodes[j]->known_public_ips) {
-                       talloc_free(ctdb->nodes[j]->known_public_ips);
-                       ctdb->nodes[j]->known_public_ips = NULL;
+               if (node->known_public_ips) {
+                       talloc_free(node->known_public_ips);
+                       node->known_public_ips = NULL;
                }
-               if (ctdb->nodes[j]->available_public_ips) {
-                       talloc_free(ctdb->nodes[j]->available_public_ips);
-                       ctdb->nodes[j]->available_public_ips = NULL;
+               if (node->available_public_ips) {
+                       talloc_free(node->available_public_ips);
+                       node->available_public_ips = NULL;
                }
 
                if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
                        continue;
                }
 
-               /* grab a new shiny list of public ips from the node */
+               /* Retrieve the list of known public IPs from the node */
                ret = ctdb_ctrl_get_public_ips_flags(ctdb,
                                        CONTROL_TIMEOUT(),
-                                       ctdb->nodes[j]->pnn,
+                                       node->pnn,
                                        ctdb->nodes,
                                        0,
-                                       &ctdb->nodes[j]->known_public_ips);
+                                       &node->known_public_ips);
                if (ret != 0) {
-                       DEBUG(DEBUG_ERR,("Failed to read known public ips from node : %u\n",
-                               ctdb->nodes[j]->pnn));
+                       DEBUG(DEBUG_ERR,
+                             ("Failed to read known public IPs from node: %u\n",
+                              node->pnn));
                        if (culprit) {
-                               *culprit = ctdb->nodes[j]->pnn;
+                               *culprit = node->pnn;
                        }
                        return -1;
                }
 
-               if (ctdb->do_checkpublicip) {
-                       if (rec->ip_check_disable_ctx == NULL) {
-                               if (verify_remote_ip_allocation(ctdb, ctdb->nodes[j]->known_public_ips)) {
-                                       DEBUG(DEBUG_ERR,("Node %d has inconsistent public ip allocation and needs update.\n", ctdb->nodes[j]->pnn));
-                                       rec->need_takeover_run = true;
-                               }
-                       }
+               if (ctdb->do_checkpublicip &&
+                   rec->takeover_runs_disable_ctx == NULL &&
+                   verify_remote_ip_allocation(ctdb,
+                                                node->known_public_ips,
+                                                node->pnn)) {
+                       DEBUG(DEBUG_ERR,("Trigger IP reallocation\n"));
+                       rec->need_takeover_run = true;
                }
 
-               /* grab a new shiny list of public ips from the node */
+               /* Retrieve the list of available public IPs from the node */
                ret = ctdb_ctrl_get_public_ips_flags(ctdb,
                                        CONTROL_TIMEOUT(),
-                                       ctdb->nodes[j]->pnn,
+                                       node->pnn,
                                        ctdb->nodes,
                                        CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE,
-                                       &ctdb->nodes[j]->available_public_ips);
+                                       &node->available_public_ips);
                if (ret != 0) {
-                       DEBUG(DEBUG_ERR,("Failed to read available public ips from node : %u\n",
-                               ctdb->nodes[j]->pnn));
+                       DEBUG(DEBUG_ERR,
+                             ("Failed to read available public IPs from node: %u\n",
+                              node->pnn));
                        if (culprit) {
-                               *culprit = ctdb->nodes[j]->pnn;
+                               *culprit = node->pnn;
                        }
                        return -1;
                }
@@ -1470,6 +1598,150 @@ static int sync_recovery_lock_file_across_cluster(struct ctdb_recoverd *rec)
 }
 
 
+/*
+ * this callback is called for every node that failed to execute ctdb_takeover_run()
+ * and set flag to re-run takeover run.
+ */
+static void takeover_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
+{
+       DEBUG(DEBUG_ERR, ("Node %u failed the takeover run\n", node_pnn));
+
+       if (callback_data != NULL) {
+               struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
+
+               DEBUG(DEBUG_ERR, ("Setting node %u as recovery fail culprit\n", node_pnn));
+
+               ctdb_set_culprit(rec, node_pnn);
+       }
+}
+
+
+static void ban_misbehaving_nodes(struct ctdb_recoverd *rec, bool *self_ban)
+{
+       struct ctdb_context *ctdb = rec->ctdb;
+       int i;
+       struct ctdb_banning_state *ban_state;
+
+       *self_ban = false;
+       for (i=0; i<ctdb->num_nodes; i++) {
+               if (ctdb->nodes[i]->ban_state == NULL) {
+                       continue;
+               }
+               ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
+               if (ban_state->count < 2*ctdb->num_nodes) {
+                       continue;
+               }
+
+               DEBUG(DEBUG_NOTICE,("Node %u reached %u banning credits - banning it for %u seconds\n",
+                       ctdb->nodes[i]->pnn, ban_state->count,
+                       ctdb->tunable.recovery_ban_period));
+               ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
+               ban_state->count = 0;
+
+               /* Banning ourself? */
+               if (ctdb->nodes[i]->pnn == rec->ctdb->pnn) {
+                       *self_ban = true;
+               }
+       }
+}
+
+static bool do_takeover_run(struct ctdb_recoverd *rec,
+                           struct ctdb_node_map *nodemap,
+                           bool banning_credits_on_fail)
+{
+       uint32_t *nodes = NULL;
+       struct srvid_request dtr;
+       TDB_DATA data;
+       int i;
+       uint32_t *rebalance_nodes = rec->force_rebalance_nodes;
+       int ret;
+       bool ok;
+
+       DEBUG(DEBUG_NOTICE, ("Takeover run starting\n"));
+
+       if (rec->takeover_run_in_progress) {
+               DEBUG(DEBUG_ERR, (__location__
+                                 " takeover run already in progress \n"));
+               ok = false;
+               goto done;
+       }
+
+       rec->takeover_run_in_progress = true;
+
+       /* If takeover runs are in disabled then fail... */
+       if (rec->takeover_runs_disable_ctx != NULL) {
+               DEBUG(DEBUG_ERR,
+                     ("Takeover runs are disabled so refusing to run one\n"));
+               ok = false;
+               goto done;
+       }
+
+       /* Disable IP checks (takeover runs, really) on other nodes
+        * while doing this takeover run.  This will stop those other
+        * nodes from triggering takeover runs when think they should
+        * be hosting an IP but it isn't yet on an interface.  Don't
+        * wait for replies since a failure here might cause some
+        * noise in the logs but will not actually cause a problem.
+        */
+       dtr.srvid = 0; /* No reply */
+       dtr.pnn = -1;
+
+       data.dptr  = (uint8_t*)&dtr;
+       data.dsize = sizeof(dtr);
+
+       nodes = list_of_connected_nodes(rec->ctdb, nodemap, rec, false);
+
+       /* Disable for 60 seconds.  This can be a tunable later if
+        * necessary.
+        */
+       dtr.data = 60;
+       for (i = 0; i < talloc_array_length(nodes); i++) {
+               if (ctdb_client_send_message(rec->ctdb, nodes[i],
+                                            CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
+                                            data) != 0) {
+                       DEBUG(DEBUG_INFO,("Failed to disable takeover runs\n"));
+               }
+       }
+
+       ret = ctdb_takeover_run(rec->ctdb, nodemap,
+                               rec->force_rebalance_nodes,
+                               takeover_fail_callback,
+                               banning_credits_on_fail ? rec : NULL);
+
+       /* Reenable takeover runs and IP checks on other nodes */
+       dtr.data = 0;
+       for (i = 0; i < talloc_array_length(nodes); i++) {
+               if (ctdb_client_send_message(rec->ctdb, nodes[i],
+                                            CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
+                                            data) != 0) {
+                       DEBUG(DEBUG_INFO,("Failed to reenable takeover runs\n"));
+               }
+       }
+
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR, ("ctdb_takeover_run() failed\n"));
+               ok = false;
+               goto done;
+       }
+
+       ok = true;
+       /* Takeover run was successful so clear force rebalance targets */
+       if (rebalance_nodes == rec->force_rebalance_nodes) {
+               TALLOC_FREE(rec->force_rebalance_nodes);
+       } else {
+               DEBUG(DEBUG_WARNING,
+                     ("Rebalance target nodes changed during takeover run - not clearing\n"));
+       }
+done:
+       rec->need_takeover_run = !ok;
+       talloc_free(nodes);
+       rec->takeover_run_in_progress = false;
+
+       DEBUG(DEBUG_NOTICE, ("Takeover run %s\n", ok ? "completed successfully" : "unsuccessful"));
+       return ok;
+}
+
+
 /*
   we are the recmaster, and recovery is needed - start a recovery run
  */
@@ -1485,30 +1757,19 @@ static int do_recovery(struct ctdb_recoverd *rec,
        uint32_t *nodes;
        struct timeval start_time;
        uint32_t culprit = (uint32_t)-1;
+       bool self_ban;
 
        DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));
 
        /* if recovery fails, force it again */
        rec->need_recovery = true;
 
-       for (i=0; i<ctdb->num_nodes; i++) {
-               struct ctdb_banning_state *ban_state;
-
-               if (ctdb->nodes[i]->ban_state == NULL) {
-                       continue;
-               }
-               ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
-               if (ban_state->count < 2*ctdb->num_nodes) {
-                       continue;
-               }
-               DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
-                       ctdb->nodes[i]->pnn, ban_state->count,
-                       ctdb->tunable.recovery_ban_period));
-               ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
-               ban_state->count = 0;
+       ban_misbehaving_nodes(rec, &self_ban);
+       if (self_ban) {
+               DEBUG(DEBUG_NOTICE, ("This node was banned, aborting recovery\n"));
+               return -1;
        }
 
-
         if (ctdb->tunable.verify_recovery_lock != 0) {
                DEBUG(DEBUG_ERR,("Taking out recovery lock from recovery daemon\n"));
                start_time = timeval_current();
@@ -1587,8 +1848,12 @@ static int do_recovery(struct ctdb_recoverd *rec,
 
                ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
                if (ret != 0) {
-                       DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
-                       return -1;
+                       if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
+                               DEBUG(DEBUG_WARNING, (__location__ "Unable to update flags on inactive node %d\n", i));
+                       } else {
+                               DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
+                               return -1;
+                       }
                }
        }
 
@@ -1748,9 +2013,7 @@ static int do_recovery(struct ctdb_recoverd *rec,
 
        DEBUG(DEBUG_NOTICE, (__location__ " Recovery - disabled recovery mode\n"));
 
-       /*
-         tell nodes to takeover their public IPs
-        */
+       /* Fetch known/available public IPs from each active node */
        ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
        if (ret != 0) {
                DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
@@ -1758,15 +2021,11 @@ static int do_recovery(struct ctdb_recoverd *rec,
                rec->need_takeover_run = true;
                return -1;
        }
-       rec->need_takeover_run = false;
-       ret = ctdb_takeover_run(ctdb, nodemap);
-       if (ret != 0) {
-               DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses. ctdb_takeover_run() failed.\n"));
-               rec->need_takeover_run = true;
-       }
+
+       do_takeover_run(rec, nodemap, false);
 
        /* execute the "recovered" event script on all nodes */
-       ret = run_recovered_eventscript(ctdb, nodemap, "do_recovery");
+       ret = run_recovered_eventscript(rec, nodemap, "do_recovery");
        if (ret!=0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Recovery process failed.\n"));
                return -1;
@@ -1882,12 +2141,12 @@ static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message
        /* we cant win if we are banned */
        if (rec->node_flags & NODE_FLAGS_BANNED) {
                return false;
-       }       
+       }
 
        /* we cant win if we are stopped */
        if (rec->node_flags & NODE_FLAGS_STOPPED) {
                return false;
-       }       
+       }
 
        /* we will automatically win if the other node is banned */
        if (em->node_flags & NODE_FLAGS_BANNED) {
@@ -2011,14 +2270,14 @@ static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid,
        TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
        TDB_DATA *dump;
        int ret;
-       struct rd_memdump_reply *rd;
+       struct srvid_request *rd;
 
-       if (data.dsize != sizeof(struct rd_memdump_reply)) {
+       if (data.dsize != sizeof(struct srvid_request)) {
                DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
                talloc_free(tmp_ctx);
                return;
        }
-       rd = (struct rd_memdump_reply *)data.dptr;
+       rd = (struct srvid_request *)data.dptr;
 
        dump = talloc_zero(tmp_ctx, TDB_DATA);
        if (dump == NULL) {
@@ -2045,6 +2304,47 @@ DEBUG(DEBUG_ERR, ("recovery master memory dump\n"));
        talloc_free(tmp_ctx);
 }
 
+/*
+  handler for getlog
+*/
+static void getlog_handler(struct ctdb_context *ctdb, uint64_t srvid, 
+                          TDB_DATA data, void *private_data)
+{
+       struct ctdb_get_log_addr *log_addr;
+       pid_t child;
+
+       if (data.dsize != sizeof(struct ctdb_get_log_addr)) {
+               DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
+               return;
+       }
+       log_addr = (struct ctdb_get_log_addr *)data.dptr;
+
+       child = ctdb_fork_no_free_ringbuffer(ctdb);
+       if (child == (pid_t)-1) {
+               DEBUG(DEBUG_ERR,("Failed to fork a log collector child\n"));
+               return;
+       }
+
+       if (child == 0) {
+               ctdb_set_process_name("ctdb_rec_log_collector");
+               if (switch_from_server_to_client(ctdb, "recoverd-log-collector") != 0) {
+                       DEBUG(DEBUG_CRIT, (__location__ "ERROR: failed to switch log collector child into client mode.\n"));
+                       _exit(1);
+               }
+               ctdb_collect_log(ctdb, log_addr);
+               _exit(0);
+       }
+}
+
+/*
+  handler for clearlog
+*/
+static void clearlog_handler(struct ctdb_context *ctdb, uint64_t srvid, 
+                            TDB_DATA data, void *private_data)
+{
+       ctdb_clear_log(ctdb);
+}
+
 /*
   handler for reload_nodes
 */
@@ -2055,46 +2355,41 @@ static void reload_nodes_handler(struct ctdb_context *ctdb, uint64_t srvid,
 
        DEBUG(DEBUG_ERR, (__location__ " Reload nodes file from recovery daemon\n"));
 
-       reload_nodes_file(rec->ctdb);
+       ctdb_load_nodes_file(rec->ctdb);
 }
 
 
-static void reenable_ip_check(struct event_context *ev, struct timed_event *te, 
-                             struct timeval yt, void *p)
+static void ctdb_rebalance_timeout(struct event_context *ev,
+                                  struct timed_event *te,
+                                  struct timeval t, void *p)
 {
        struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
 
-       talloc_free(rec->ip_check_disable_ctx);
-       rec->ip_check_disable_ctx = NULL;
-}
-
-
-static void ctdb_rebalance_timeout(struct event_context *ev, struct timed_event *te, 
-                                 struct timeval t, void *p)
-{
-       struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
-       struct ctdb_context *ctdb = rec->ctdb;
-       int ret;
-
-       DEBUG(DEBUG_NOTICE,("Rebalance all nodes that have had ip assignment changes.\n"));
-
-       ret = ctdb_takeover_run(ctdb, rec->nodemap);
-       if (ret != 0) {
-               DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses. ctdb_takeover_run() failed.\n"));
-               rec->need_takeover_run = true;
+       if (rec->force_rebalance_nodes == NULL) {
+               DEBUG(DEBUG_ERR,
+                     ("Rebalance timeout occurred - no nodes to rebalance\n"));
+               return;
        }
 
-       talloc_free(rec->deferred_rebalance_ctx);
-       rec->deferred_rebalance_ctx = NULL;
+       DEBUG(DEBUG_NOTICE,
+             ("Rebalance timeout occurred - do takeover run\n"));
+       do_takeover_run(rec, rec->nodemap, false);
 }
 
        
-static void recd_node_rebalance_handler(struct ctdb_context *ctdb, uint64_t srvid, 
-                            TDB_DATA data, void *private_data)
+static void recd_node_rebalance_handler(struct ctdb_context *ctdb,
+                                       uint64_t srvid,
+                                       TDB_DATA data, void *private_data)
 {
        uint32_t pnn;
+       uint32_t *t;
+       int len;
        struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
 
+       if (rec->recmaster != ctdb_get_pnn(ctdb)) {
+               return;
+       }
+
        if (data.dsize != sizeof(uint32_t)) {
                DEBUG(DEBUG_ERR,(__location__ " Incorrect size of node rebalance message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(uint32_t)));
                return;
@@ -2106,14 +2401,33 @@ static void recd_node_rebalance_handler(struct ctdb_context *ctdb, uint64_t srvi
 
        pnn = *(uint32_t *)&data.dptr[0];
 
-       lcp2_forcerebalance(ctdb, pnn);
-       DEBUG(DEBUG_NOTICE,("Received message to perform node rebalancing for node %d\n", pnn));
+       DEBUG(DEBUG_NOTICE,("Setting up rebalance of IPs to node %u\n", pnn));
 
-       if (rec->deferred_rebalance_ctx != NULL) {
-               talloc_free(rec->deferred_rebalance_ctx);
+       /* Copy any existing list of nodes.  There's probably some
+        * sort of realloc variant that will do this but we need to
+        * make sure that freeing the old array also cancels the timer
+        * event for the timeout... not sure if realloc will do that.
+        */
+       len = (rec->force_rebalance_nodes != NULL) ?
+               talloc_array_length(rec->force_rebalance_nodes) :
+               0;
+
+       /* This allows duplicates to be added but they don't cause
+        * harm.  A call to add a duplicate PNN arguably means that
+        * the timeout should be reset, so this is the simplest
+        * solution.
+        */
+       t = talloc_zero_array(rec, uint32_t, len+1);
+       CTDB_NO_MEMORY_VOID(ctdb, t);
+       if (len > 0) {
+               memcpy(t, rec->force_rebalance_nodes, sizeof(uint32_t) * len);
        }
-       rec->deferred_rebalance_ctx = talloc_new(rec);
-       event_add_timed(ctdb->ev, rec->deferred_rebalance_ctx, 
+       t[len] = pnn;
+
+       talloc_free(rec->force_rebalance_nodes);
+
+       rec->force_rebalance_nodes = t;
+       event_add_timed(ctdb->ev, rec->force_rebalance_nodes,
                        timeval_current_ofs(ctdb->tunable.deferred_rebalance_on_node_add, 0),
                        ctdb_rebalance_timeout, rec);
 }
@@ -2142,75 +2456,163 @@ static void recd_update_ip_handler(struct ctdb_context *ctdb, uint64_t srvid,
 }
 
 
-static void disable_ip_check_handler(struct ctdb_context *ctdb, uint64_t srvid, 
-                            TDB_DATA data, void *private_data)
+static void clear_takeover_runs_disable(struct ctdb_recoverd *rec)
 {
-       struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
+       TALLOC_FREE(rec->takeover_runs_disable_ctx);
+}
+
+static void reenable_takeover_runs(struct event_context *ev,
+                                  struct timed_event *te,
+                                  struct timeval yt, void *p)
+{
+       struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
+
+       DEBUG(DEBUG_NOTICE,("Reenabling takeover runs after timeout\n"));
+       clear_takeover_runs_disable(rec);
+}
+
+static void disable_takeover_runs_handler(struct ctdb_context *ctdb,
+                                         uint64_t srvid, TDB_DATA data,
+                                         void *private_data)
+{
+       struct ctdb_recoverd *rec = talloc_get_type(private_data,
+                                                   struct ctdb_recoverd);
+       struct srvid_request *r;
        uint32_t timeout;
+       TDB_DATA result;
+       int32_t ret = 0;
 
-       if (rec->ip_check_disable_ctx != NULL) {
-               talloc_free(rec->ip_check_disable_ctx);
-               rec->ip_check_disable_ctx = NULL;
+       /* Validate input data */
+       if (data.dsize != sizeof(struct srvid_request)) {
+               DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
+                                "expecting %lu\n", (long unsigned)data.dsize,
+                                (long unsigned)sizeof(struct srvid_request)));
+               ret = -EINVAL;
+               goto done;
        }
+       if (data.dptr == NULL) {
+               DEBUG(DEBUG_ERR,(__location__ " No data received\n"));
+               ret = -EINVAL;
+               goto done;
+       }
+
+       r = (struct srvid_request *)data.dptr;
+       timeout = r->data;
+
+       if (timeout == 0) {
+               DEBUG(DEBUG_NOTICE,("Reenabling takeover runs\n"));
+               clear_takeover_runs_disable(rec);
+               ret = ctdb_get_pnn(ctdb);
+               goto done;
+       }
+
+       if (rec->node_flags & NODE_FLAGS_INACTIVE) {
+               DEBUG(DEBUG_ERR,
+                     ("Refusing to disable takeover runs on inactive node\n"));
+               ret = -EHOSTDOWN;
+               goto done;
+       }
+
+       if (rec->takeover_run_in_progress) {
+               DEBUG(DEBUG_ERR,
+                     ("Unable to disable takeover runs - in progress\n"));
+               ret = -EAGAIN;
+               goto done;
+       }
+
+       DEBUG(DEBUG_NOTICE,("Disabling takeover runs for %u seconds\n", timeout));
+
+       /* Clear any old timers */
+       clear_takeover_runs_disable(rec);
+
+       /* When this is non-NULL it indicates that takeover runs are
+        * disabled.  This context also holds the timeout timer.
+        */
+       rec->takeover_runs_disable_ctx = talloc_new(rec);
+       if (rec->takeover_runs_disable_ctx == NULL) {
+               DEBUG(DEBUG_ERR,(__location__ " Unable to allocate memory\n"));
+               ret = -ENOMEM;
+               goto done;
+       }
+
+       /* Arrange for the timeout to occur */
+       event_add_timed(ctdb->ev, rec->takeover_runs_disable_ctx,
+                       timeval_current_ofs(timeout, 0),
+                       reenable_takeover_runs,
+                       rec);
+
+       /* Returning our PNN tells the caller that we succeeded */
+       ret = ctdb_get_pnn(ctdb);
+done:
+       result.dsize = sizeof(int32_t);
+       result.dptr  = (uint8_t *)&ret;
+       srvid_request_reply(ctdb, r, result);
+}
+
+/* Backward compatibility for this SRVID - call
+ * disable_takeover_runs_handler() instead
+ */
+static void disable_ip_check_handler(struct ctdb_context *ctdb, uint64_t srvid,
+                                    TDB_DATA data, void *private_data)
+{
+       struct ctdb_recoverd *rec = talloc_get_type(private_data,
+                                                   struct ctdb_recoverd);
+       TDB_DATA data2;
+       struct srvid_request *req;
 
        if (data.dsize != sizeof(uint32_t)) {
                DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
-                                "expexting %lu\n", (long unsigned)data.dsize,
+                                "expecting %lu\n", (long unsigned)data.dsize,
                                 (long unsigned)sizeof(uint32_t)));
                return;
        }
        if (data.dptr == NULL) {
-               DEBUG(DEBUG_ERR,(__location__ " No data recaived\n"));
+               DEBUG(DEBUG_ERR,(__location__ " No data received\n"));
                return;
        }
 
-       timeout = *((uint32_t *)data.dptr);
-       DEBUG(DEBUG_NOTICE,("Disabling ip check for %u seconds\n", timeout));
+       req = talloc(ctdb, struct srvid_request);
+       CTDB_NO_MEMORY_VOID(ctdb, req);
 
-       rec->ip_check_disable_ctx = talloc_new(rec);
-       CTDB_NO_MEMORY_VOID(ctdb, rec->ip_check_disable_ctx);
+       req->srvid = 0; /* No reply */
+       req->pnn = -1;
+       req->data = *((uint32_t *)data.dptr); /* Timeout */
 
-       event_add_timed(ctdb->ev, rec->ip_check_disable_ctx, timeval_current_ofs(timeout, 0), reenable_ip_check, rec);
-}
+       data2.dsize = sizeof(*req);
+       data2.dptr = (uint8_t *)req;
 
+       disable_takeover_runs_handler(rec->ctdb,
+                                     CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
+                                     data2, rec);
+}
 
 /*
-  handler for ip reallocate, just add it to the list of callers and 
+  handler for ip reallocate, just add it to the list of requests and 
   handle this later in the monitor_cluster loop so we do not recurse
-  with other callers to takeover_run()
+  with other requests to takeover_run()
 */
-static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid, 
-                            TDB_DATA data, void *private_data)
+static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid,
+                                 TDB_DATA data, void *private_data)
 {
-       struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
-       struct ip_reallocate_list *caller;
+       struct srvid_request *request;
+       struct ctdb_recoverd *rec = talloc_get_type(private_data,
+                                                   struct ctdb_recoverd);
 
-       if (data.dsize != sizeof(struct rd_memdump_reply)) {
+       if (data.dsize != sizeof(struct srvid_request)) {
                DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
                return;
        }
 
-       if (rec->ip_reallocate_ctx == NULL) {
-               rec->ip_reallocate_ctx = talloc_new(rec);
-               CTDB_NO_MEMORY_FATAL(ctdb, rec->ip_reallocate_ctx);
-       }
-
-       caller = talloc(rec->ip_reallocate_ctx, struct ip_reallocate_list);
-       CTDB_NO_MEMORY_FATAL(ctdb, caller);
+       request = (struct srvid_request *)data.dptr;
 
-       caller->rd   = (struct rd_memdump_reply *)talloc_steal(caller, data.dptr);
-       caller->next = rec->reallocate_callers;
-       rec->reallocate_callers = caller;
-
-       return;
+       srvid_request_add(ctdb, &rec->reallocate_requests, request);
 }
 
-static void process_ipreallocate_requests(struct ctdb_context *ctdb, struct ctdb_recoverd *rec)
+static void process_ipreallocate_requests(struct ctdb_context *ctdb,
+                                         struct ctdb_recoverd *rec)
 {
-       TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
        TDB_DATA result;
        int32_t ret;
-       struct ip_reallocate_list *callers;
        uint32_t culprit;
 
        DEBUG(DEBUG_INFO, ("recovery master forced ip reallocation\n"));
@@ -2225,39 +2627,17 @@ static void process_ipreallocate_requests(struct ctdb_context *ctdb, struct ctdb
                rec->need_takeover_run = true;
        }
        if (ret == 0) {
-               ret = ctdb_takeover_run(ctdb, rec->nodemap);
-               if (ret != 0) {
-                       DEBUG(DEBUG_ERR,("Failed to reallocate addresses: ctdb_takeover_run() failed.\n"));
-                       rec->need_takeover_run = true;
+               if (do_takeover_run(rec, rec->nodemap, false)) {
+                       ret = ctdb_get_pnn(ctdb);
+               } else {
+                       ret = -1;
                }
        }
 
        result.dsize = sizeof(int32_t);
        result.dptr  = (uint8_t *)&ret;
 
-       for (callers=rec->reallocate_callers; callers; callers=callers->next) {
-
-               /* Someone that sent srvid==0 does not want a reply */
-               if (callers->rd->srvid == 0) {
-                       continue;
-               }
-               DEBUG(DEBUG_INFO,("Sending ip reallocate reply message to "
-                                 "%u:%llu\n", (unsigned)callers->rd->pnn,
-                                 (unsigned long long)callers->rd->srvid));
-               ret = ctdb_client_send_message(ctdb, callers->rd->pnn, callers->rd->srvid, result);
-               if (ret != 0) {
-                       DEBUG(DEBUG_ERR,("Failed to send ip reallocate reply "
-                                        "message to %u:%llu\n",
-                                        (unsigned)callers->rd->pnn,
-                                        (unsigned long long)callers->rd->srvid));
-               }
-       }
-
-       talloc_free(tmp_ctx);
-       talloc_free(rec->ip_reallocate_ctx);
-       rec->ip_reallocate_ctx = NULL;
-       rec->reallocate_callers = NULL;
-       
+       srvid_requests_reply(ctdb, &rec->reallocate_requests, result);
 }
 
 
@@ -2401,7 +2781,7 @@ static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid,
                return;
        }
 
-       if (nodemap->nodes[i].flags != c->new_flags) {
+       if (c->old_flags != c->new_flags) {
                DEBUG(DEBUG_NOTICE,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
        }
 
@@ -2514,7 +2894,7 @@ static void verify_recmode_normal_callback(struct ctdb_client_control_state *sta
           status field
        */
        if (state->status != CTDB_RECOVERY_NORMAL) {
-               DEBUG(DEBUG_NOTICE, (__location__ " Node:%u was in recovery mode. Restart recovery process\n", state->c->hdr.destnode));
+               DEBUG(DEBUG_NOTICE, ("Node:%u was in recovery mode. Start recovery process\n", state->c->hdr.destnode));
                rmdata->status = MONITOR_RECOVERY_NEEDED;
        }
 
@@ -2605,7 +2985,7 @@ static void verify_recmaster_callback(struct ctdb_client_control_state *state)
           status field
        */
        if (state->status != rmdata->pnn) {
-               DEBUG(DEBUG_ERR,("Node %d does not agree we are the recmaster. Need a new recmaster election\n", state->c->hdr.destnode));
+               DEBUG(DEBUG_ERR,("Node %d thinks node %d is recmaster. Need a new recmaster election\n", state->c->hdr.destnode, state->status));
                ctdb_set_culprit(rmdata->rec, state->c->hdr.destnode);
                rmdata->status = MONITOR_ELECTION_NEEDED;
        }
@@ -2670,18 +3050,74 @@ static enum monitor_result verify_recmaster(struct ctdb_recoverd *rec, struct ct
        return status;
 }
 
+static bool interfaces_have_changed(struct ctdb_context *ctdb,
+                                   struct ctdb_recoverd *rec)
+{
+       struct ctdb_control_get_ifaces *ifaces = NULL;
+       TALLOC_CTX *mem_ctx;
+       bool ret = false;
+
+       mem_ctx = talloc_new(NULL);
+
+       /* Read the interfaces from the local node */
+       if (ctdb_ctrl_get_ifaces(ctdb, CONTROL_TIMEOUT(),
+                                CTDB_CURRENT_NODE, mem_ctx, &ifaces) != 0) {
+               DEBUG(DEBUG_ERR, ("Unable to get interfaces from local node %u\n", ctdb->pnn));
+               /* We could return an error.  However, this will be
+                * rare so we'll decide that the interfaces have
+                * actually changed, just in case.
+                */
+               talloc_free(mem_ctx);
+               return true;
+       }
+
+       if (!rec->ifaces) {
+               /* We haven't been here before so things have changed */
+               DEBUG(DEBUG_NOTICE, ("Initial interface fetched\n"));
+               ret = true;
+       } else if (rec->ifaces->num != ifaces->num) {
+               /* Number of interfaces has changed */
+               DEBUG(DEBUG_NOTICE, ("Interface count changed from %d to %d\n",
+                                    rec->ifaces->num, ifaces->num));
+               ret = true;
+       } else {
+               /* See if interface names or link states have changed */
+               int i;
+               for (i = 0; i < rec->ifaces->num; i++) {
+                       struct ctdb_control_iface_info * iface = &rec->ifaces->ifaces[i];
+                       if (strcmp(iface->name, ifaces->ifaces[i].name) != 0) {
+                               DEBUG(DEBUG_NOTICE,
+                                     ("Interface in slot %d changed: %s => %s\n",
+                                      i, iface->name, ifaces->ifaces[i].name));
+                               ret = true;
+                               break;
+                       }
+                       if (iface->link_state != ifaces->ifaces[i].link_state) {
+                               DEBUG(DEBUG_NOTICE,
+                                     ("Interface %s changed state: %d => %d\n",
+                                      iface->name, iface->link_state,
+                                      ifaces->ifaces[i].link_state));
+                               ret = true;
+                               break;
+                       }
+               }
+       }
+
+       talloc_free(rec->ifaces);
+       rec->ifaces = talloc_steal(rec, ifaces);
+
+       talloc_free(mem_ctx);
+       return ret;
+}
 
 /* called to check that the local allocation of public ip addresses is ok.
 */
 static int verify_local_ip_allocation(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, uint32_t pnn, struct ctdb_node_map *nodemap)
 {
        TALLOC_CTX *mem_ctx = talloc_new(NULL);
-       struct ctdb_control_get_ifaces *ifaces = NULL;
-       struct ctdb_all_public_ips *ips = NULL;
        struct ctdb_uptime *uptime1 = NULL;
        struct ctdb_uptime *uptime2 = NULL;
        int ret, j;
-       bool need_iface_check = false;
        bool need_takeover_run = false;
 
        ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
@@ -2692,38 +3128,13 @@ static int verify_local_ip_allocation(struct ctdb_context *ctdb, struct ctdb_rec
                return -1;
        }
 
-
-       /* read the interfaces from the local node */
-       ret = ctdb_ctrl_get_ifaces(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ifaces);
-       if (ret != 0) {
-               DEBUG(DEBUG_ERR, ("Unable to get interfaces from local node %u\n", pnn));
-               talloc_free(mem_ctx);
-               return -1;
-       }
-
-       if (!rec->ifaces) {
-               need_iface_check = true;
-       } else if (rec->ifaces->num != ifaces->num) {
-               need_iface_check = true;
-       } else if (memcmp(rec->ifaces, ifaces, talloc_get_size(ifaces)) != 0) {
-               need_iface_check = true;
-       }
-
-       if (need_iface_check) {
+       if (interfaces_have_changed(ctdb, rec)) {
                DEBUG(DEBUG_NOTICE, ("The interfaces status has changed on "
                                     "local node %u - force takeover run\n",
                                     pnn));
                need_takeover_run = true;
        }
 
-       /* read the ip allocation from the local node */
-       ret = ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ips);
-       if (ret != 0) {
-               DEBUG(DEBUG_ERR, ("Unable to get public ips from local node %u\n", pnn));
-               talloc_free(mem_ctx);
-               return -1;
-       }
-
        ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
                                CTDB_CURRENT_NODE, &uptime2);
        if (ret != 0) {
@@ -2757,9 +3168,6 @@ static int verify_local_ip_allocation(struct ctdb_context *ctdb, struct ctdb_rec
                return 0;
        }
 
-       talloc_free(rec->ifaces);
-       rec->ifaces = talloc_steal(rec, ifaces);
-
        /* verify that we have the ip addresses we should have
           and we dont have ones we shouldnt have.
           if we find an inconsistency we set recmode to
@@ -2769,29 +3177,59 @@ static int verify_local_ip_allocation(struct ctdb_context *ctdb, struct ctdb_rec
           we also request a ip reallocation.
        */
        if (ctdb->tunable.disable_ip_failover == 0) {
+               struct ctdb_all_public_ips *ips = NULL;
+
+               /* read the *available* IPs from the local node */
+               ret = ctdb_ctrl_get_public_ips_flags(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE, &ips);
+               if (ret != 0) {
+                       DEBUG(DEBUG_ERR, ("Unable to get available public IPs from local node %u\n", pnn));
+                       talloc_free(mem_ctx);
+                       return -1;
+               }
+
                for (j=0; j<ips->num; j++) {
-                       if (ips->ips[j].pnn == -1 && nodemap->nodes[pnn].flags == 0) {
-                               DEBUG(DEBUG_CRIT,("Public address '%s' is not assigned and we could serve this ip\n",
-                                               ctdb_addr_to_str(&ips->ips[j].addr)));
+                       if (ips->ips[j].pnn == -1 &&
+                           nodemap->nodes[pnn].flags == 0) {
+                               DEBUG(DEBUG_CRIT,("Public IP '%s' is not assigned and we could serve it\n",
+                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
                                need_takeover_run = true;
-                       } else if (ips->ips[j].pnn == pnn) {
+                       }
+               }
+
+               talloc_free(ips);
+
+               /* read the *known* IPs from the local node */
+               ret = ctdb_ctrl_get_public_ips_flags(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, 0, &ips);
+               if (ret != 0) {
+                       DEBUG(DEBUG_ERR, ("Unable to get known public IPs from local node %u\n", pnn));
+                       talloc_free(mem_ctx);
+                       return -1;
+               }
+
+               for (j=0; j<ips->num; j++) {
+                       if (ips->ips[j].pnn == pnn) {
                                if (ctdb->do_checkpublicip && !ctdb_sys_have_ip(&ips->ips[j].addr)) {
-                                       DEBUG(DEBUG_CRIT,("Public address '%s' is missing and we should serve this ip\n",
+                                       DEBUG(DEBUG_CRIT,("Public IP '%s' is assigned to us but not on an interface\n",
                                                ctdb_addr_to_str(&ips->ips[j].addr)));
                                        need_takeover_run = true;
                                }
                        } else {
-                               if (ctdb->do_checkpublicip && ctdb_sys_have_ip(&ips->ips[j].addr)) {
-                                       DEBUG(DEBUG_CRIT,("We are still serving a public address '%s' that we should not be serving.\n", 
+                               if (ctdb->do_checkpublicip &&
+                                   ctdb_sys_have_ip(&ips->ips[j].addr)) {
+
+                                       DEBUG(DEBUG_CRIT,("We are still serving a public IP '%s' that we should not be serving. Removing it\n", 
                                                ctdb_addr_to_str(&ips->ips[j].addr)));
-                                       need_takeover_run = true;
+
+                                       if (ctdb_ctrl_release_ip(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ips->ips[j]) != 0) {
+                                               DEBUG(DEBUG_ERR,("Failed to release local IP address\n"));
+                                       }
                                }
                        }
                }
        }
 
        if (need_takeover_run) {
-               struct takeover_run_reply rd;
+               struct srvid_request rd;
                TDB_DATA data;
 
                DEBUG(DEBUG_CRIT,("Trigger takeoverrun\n"));
@@ -2872,7 +3310,7 @@ static int check_reclock_destructor(struct ctdb_check_reclock_state *state)
                close(state->fd[1]);
                state->fd[1] = -1;
        }
-       kill(state->child, SIGKILL);
+       ctdb_kill(ctdb, state->child, SIGKILL);
        return 0;
 }
 
@@ -2962,6 +3400,7 @@ static int check_recovery_lock(struct ctdb_context *ctdb)
                close(state->fd[0]);
                state->fd[0] = -1;
 
+               ctdb_set_process_name("ctdb_rec_reclock");
                debug_extra = talloc_asprintf(NULL, "recovery-lock:");
                if (pread(ctdb->recovery_lock_fd, &cc, 1, 0) == -1) {
                        DEBUG(DEBUG_CRIT,("failed read from recovery_lock_fd - %s\n", strerror(errno)));
@@ -2970,9 +3409,8 @@ static int check_recovery_lock(struct ctdb_context *ctdb)
 
                write(state->fd[1], &cc, 1);
                /* make sure we die when our parent dies */
-               while (kill(parent, 0) == 0 || errno != ESRCH) {
+               while (ctdb_kill(ctdb, parent, 0) == 0 || errno != ESRCH) {
                        sleep(5);
-                       write(state->fd[1], &cc, 1);
                }
                _exit(0);
        }
@@ -3085,11 +3523,11 @@ static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
        struct ctdb_vnn_map *remote_vnnmap=NULL;
        int32_t debug_level;
        int i, j, ret;
-
+       bool self_ban;
 
 
        /* verify that the main daemon is still running */
-       if (kill(ctdb->ctdbd_pid, 0) != 0) {
+       if (ctdb_kill(ctdb, ctdb->ctdbd_pid, 0) != 0) {
                DEBUG(DEBUG_CRIT,("CTDB daemon is no longer available. Shutting down recovery daemon\n"));
                exit(-1);
        }
@@ -3110,28 +3548,6 @@ static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
        }
        LogLevel = debug_level;
 
-
-       /* We must check if we need to ban a node here but we want to do this
-          as early as possible so we dont wait until we have pulled the node
-          map from the local node. thats why we have the hardcoded value 20
-       */
-       for (i=0; i<ctdb->num_nodes; i++) {
-               struct ctdb_banning_state *ban_state;
-
-               if (ctdb->nodes[i]->ban_state == NULL) {
-                       continue;
-               }
-               ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
-               if (ban_state->count < 20) {
-                       continue;
-               }
-               DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
-                       ctdb->nodes[i]->pnn, ban_state->count,
-                       ctdb->tunable.recovery_ban_period));
-               ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
-               ban_state->count = 0;
-       }
-
        /* get relevant tunables */
        ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
        if (ret != 0) {
@@ -3155,11 +3571,7 @@ static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
                }
        }
 
-       pnn = ctdb_ctrl_getpnn(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
-       if (pnn == (uint32_t)-1) {
-               DEBUG(DEBUG_ERR,("Failed to get local pnn - retrying\n"));
-               return;
-       }
+       pnn = ctdb_get_pnn(ctdb);
 
        /* get the vnnmap */
        ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
@@ -3182,73 +3594,96 @@ static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
        }
        nodemap = rec->nodemap;
 
-       /* update the capabilities for all nodes */
-       ret = update_capabilities(ctdb, nodemap);
-       if (ret != 0) {
-               DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
-               return;
-       }
-
-       /* check which node is the recovery master */
-       ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), pnn, &rec->recmaster);
-       if (ret != 0) {
-               DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from node %u\n", pnn));
-               return;
-       }
-
-       /* if we are not the recmaster we can safely ignore any ip reallocate requests */
-       if (rec->recmaster != pnn) {
-               if (rec->ip_reallocate_ctx != NULL) {
-                       talloc_free(rec->ip_reallocate_ctx);
-                       rec->ip_reallocate_ctx = NULL;
-                       rec->reallocate_callers = NULL;
-               }
-       }
+       /* remember our own node flags */
+       rec->node_flags = nodemap->nodes[pnn].flags;
 
-       if (rec->recmaster == (uint32_t)-1) {
-               DEBUG(DEBUG_NOTICE,(__location__ " Initial recovery master set - forcing election\n"));
-               force_election(rec, pnn, nodemap);
+       ban_misbehaving_nodes(rec, &self_ban);
+       if (self_ban) {
+               DEBUG(DEBUG_NOTICE, ("This node was banned, restart main_loop\n"));
                return;
        }
 
-       /* if the local daemon is STOPPED, we verify that the databases are
-          also frozen and thet the recmode is set to active 
+       /* if the local daemon is STOPPED or BANNED, we verify that the databases are
+          also frozen and that the recmode is set to active.
        */
-       if (nodemap->nodes[pnn].flags & NODE_FLAGS_STOPPED) {
+       if (rec->node_flags & (NODE_FLAGS_STOPPED | NODE_FLAGS_BANNED)) {
+               /* If this node has become inactive then we want to
+                * reduce the chances of it taking over the recovery
+                * master role when it becomes active again.  This
+                * helps to stabilise the recovery master role so that
+                * it stays on the most stable node.
+                */
+               rec->priority_time = timeval_current();
+
                ret = ctdb_ctrl_getrecmode(ctdb, mem_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->recovery_mode);
                if (ret != 0) {
                        DEBUG(DEBUG_ERR,(__location__ " Failed to read recmode from local node\n"));
                }
                if (ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
-                       DEBUG(DEBUG_ERR,("Node is stopped but recovery mode is not active. Activate recovery mode and lock databases\n"));
+                       DEBUG(DEBUG_ERR,("Node is stopped or banned but recovery mode is not active. Activate recovery mode and lock databases\n"));
 
                        ret = ctdb_ctrl_freeze_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, 1);
                        if (ret != 0) {
-                               DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node due to node being STOPPED\n"));
+                               DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node in STOPPED or BANNED state\n"));
                                return;
                        }
                        ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
                        if (ret != 0) {
-                               DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode due to node being stopped\n"));
+                               DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode in STOPPED or BANNED state\n"));
 
                                return;
                        }
-                       return;
                }
+
+               /* If this node is stopped or banned then it is not the recovery
+                * master, so don't do anything. This prevents stopped or banned
+                * node from starting election and sending unnecessary controls.
+                */
+               return;
        }
-       /* If the local node is stopped, verify we are not the recmaster 
-          and yield this role if so
-       */
-       if ((nodemap->nodes[pnn].flags & NODE_FLAGS_STOPPED) && (rec->recmaster == pnn)) {
-               DEBUG(DEBUG_ERR,("Local node is STOPPED. Yielding recmaster role\n"));
+
+       /* check which node is the recovery master */
+       ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), pnn, &rec->recmaster);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from node %u\n", pnn));
+               return;
+       }
+
+       /* If we are not the recmaster then do some housekeeping */
+       if (rec->recmaster != pnn) {
+               /* Ignore any IP reallocate requests - only recmaster
+                * processes them
+                */
+               TALLOC_FREE(rec->reallocate_requests);
+               /* Clear any nodes that should be force rebalanced in
+                * the next takeover run.  If the recovery master role
+                * has moved then we don't want to process these some
+                * time in the future.
+                */
+               TALLOC_FREE(rec->force_rebalance_nodes);
+       }
+
+       /* This is a special case.  When recovery daemon is started, recmaster
+        * is set to -1.  If a node is not started in stopped state, then
+        * start election to decide recovery master
+        */
+       if (rec->recmaster == (uint32_t)-1) {
+               DEBUG(DEBUG_NOTICE,(__location__ " Initial recovery master set - forcing election\n"));
                force_election(rec, pnn, nodemap);
                return;
        }
-       
+
+       /* update the capabilities for all nodes */
+       ret = update_capabilities(ctdb, nodemap);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
+               return;
+       }
+
        /*
-        * if the current recmaster do not have CTDB_CAP_RECMASTER,
-        * but we have force an election and try to become the new
-        * recmaster
+        * If the current recmaster does not have CTDB_CAP_RECMASTER,
+        * but we have, then force an election and try to become the new
+        * recmaster.
         */
        if ((rec->ctdb->nodes[rec->recmaster]->capabilities & CTDB_CAP_RECMASTER) == 0 &&
            (rec->ctdb->capabilities & CTDB_CAP_RECMASTER) &&
@@ -3260,20 +3695,16 @@ static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
                return;
        }
 
-       /* check that we (recovery daemon) and the local ctdb daemon
-          agrees on whether we are banned or not
-       */
-//qqq
-
-       /* remember our own node flags */
-       rec->node_flags = nodemap->nodes[pnn].flags;
-
        /* count how many active nodes there are */
        rec->num_active    = 0;
+       rec->num_lmasters  = 0;
        rec->num_connected = 0;
        for (i=0; i<nodemap->num; i++) {
                if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
                        rec->num_active++;
+                       if (rec->ctdb->nodes[i]->capabilities & CTDB_CAP_LMASTER) {
+                               rec->num_lmasters++;
+                       }
                }
                if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
                        rec->num_connected++;
@@ -3301,7 +3732,7 @@ static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
                return;
        }
 
-       /* grap the nodemap from the recovery master to check if it is banned */
+       /* get nodemap from the recovery master to check if it is inactive */
        ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
                                   mem_ctx, &recmaster_nodemap);
        if (ret != 0) {
@@ -3311,21 +3742,26 @@ static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
        }
 
 
-       if (recmaster_nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
+       if ((recmaster_nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) &&
+           (rec->node_flags & NODE_FLAGS_INACTIVE) == 0) {
                DEBUG(DEBUG_NOTICE, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].pnn));
+               /*
+                * update our nodemap to carry the recmaster's notion of
+                * its own flags, so that we don't keep freezing the
+                * inactive recmaster node...
+                */
+               nodemap->nodes[j].flags = recmaster_nodemap->nodes[j].flags;
                force_election(rec, pnn, nodemap);
                return;
        }
 
-
        /* verify that we have all ip addresses we should have and we dont
         * have addresses we shouldnt have.
         */ 
-       if (ctdb->tunable.disable_ip_failover == 0) {
-               if (rec->ip_check_disable_ctx == NULL) {
-                       if (verify_local_ip_allocation(ctdb, rec, pnn, nodemap) != 0) {
-                               DEBUG(DEBUG_ERR, (__location__ " Public IPs were inconsistent.\n"));
-                       }
+       if (ctdb->tunable.disable_ip_failover == 0 &&
+           rec->takeover_runs_disable_ctx == NULL) {
+               if (verify_local_ip_allocation(ctdb, rec, pnn, nodemap) != 0) {
+                       DEBUG(DEBUG_ERR, (__location__ " Public IPs were inconsistent.\n"));
                }
        }
 
@@ -3352,7 +3788,7 @@ static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
 
        if (ctdb->num_nodes != nodemap->num) {
                DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) reloading nodes file\n", ctdb->num_nodes, nodemap->num));
-               reload_nodes_file(ctdb);
+               ctdb_load_nodes_file(ctdb);
                return;
        }
 
@@ -3404,8 +3840,10 @@ static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
                }
        }
 
+
        /* if there are takeovers requested, perform it and notify the waiters */
-       if (rec->reallocate_callers) {
+       if (rec->takeover_runs_disable_ctx == NULL &&
+           rec->reallocate_requests) {
                process_ipreallocate_requests(ctdb, rec);
        }
 
@@ -3463,6 +3901,23 @@ static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
                                return;
                        }
                }
+       }
+
+       /*
+        * Update node flags obtained from each active node. This ensure we have
+        * up-to-date information for all the nodes.
+        */
+       for (j=0; j<nodemap->num; j++) {
+               if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
+                       continue;
+               }
+               nodemap->nodes[j].flags = remote_nodemaps[j]->nodes[j].flags;
+       }
+
+       for (j=0; j<nodemap->num; j++) {
+               if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
+                       continue;
+               }
 
                /* verify the flags are consistent
                */
@@ -3476,7 +3931,7 @@ static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
                                  nodemap->nodes[j].pnn, 
                                  nodemap->nodes[i].pnn, 
                                  remote_nodemaps[j]->nodes[i].flags,
-                                 nodemap->nodes[j].flags));
+                                 nodemap->nodes[i].flags));
                                if (i == j) {
                                        DEBUG(DEBUG_ERR,("Use flags 0x%02x from remote node %d for cluster update of its own flags\n", remote_nodemaps[j]->nodes[i].flags, j));
                                        update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, remote_nodemaps[j]->nodes[i].flags);
@@ -3497,12 +3952,13 @@ static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
        }
 
 
-       /* there better be the same number of lmasters in the vnn map
-          as there are active nodes or we will have to do a recovery
+       /* There must be the same number of lmasters in the vnn map as
+        * there are active nodes with the lmaster capability...  or
+        * do a recovery.
         */
-       if (vnnmap->size != rec->num_active) {
-               DEBUG(DEBUG_ERR, (__location__ " The vnnmap count is different from the number of active nodes. %u vs %u\n", 
-                         vnnmap->size, rec->num_active));
+       if (vnnmap->size != rec->num_lmasters) {
+               DEBUG(DEBUG_ERR, (__location__ " The vnnmap count is different from the number of active lmaster nodes: %u vs %u\n",
+                         vnnmap->size, rec->num_lmasters));
                ctdb_set_culprit(rec, ctdb->pnn);
                do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
                return;
@@ -3610,14 +4066,21 @@ static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
                        return;
                }
 
-               ret = ctdb_takeover_run(ctdb, nodemap);
-               if (ret != 0) {
-                       DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses. Try again later\n"));
+               /* If takeover run fails, then the offending nodes are
+                * assigned ban culprit counts. And we re-try takeover.
+                * If takeover run fails repeatedly, the node would get
+                * banned.
+                *
+                * If rec->need_takeover_run is not set to true at this
+                * failure, monitoring is disabled cluster-wide (via
+                * startrecovery eventscript) and will not get enabled.
+                */
+               if (!do_takeover_run(rec, nodemap, true)) {
                        return;
                }
 
                /* execute the "recovered" event script on all nodes */
-               ret = run_recovered_eventscript(ctdb, nodemap, "monitor_cluster");
+               ret = run_recovered_eventscript(rec, nodemap, "monitor_cluster");
 #if 0
 // we cant check whether the event completed successfully
 // since this script WILL fail if the node is in recovery mode
@@ -3646,11 +4109,19 @@ static void monitor_cluster(struct ctdb_context *ctdb)
 
        rec->ctdb = ctdb;
 
+       rec->takeover_run_in_progress = false;
+
        rec->priority_time = timeval_current();
 
        /* register a message port for sending memory dumps */
        ctdb_client_set_message_handler(ctdb, CTDB_SRVID_MEM_DUMP, mem_dump_handler, rec);
 
+       /* register a message port for requesting logs */
+       ctdb_client_set_message_handler(ctdb, CTDB_SRVID_GETLOG, getlog_handler, rec);
+
+       /* register a message port for clearing logs */
+       ctdb_client_set_message_handler(ctdb, CTDB_SRVID_CLEARLOG, clearlog_handler, rec);
+
        /* register a message port for recovery elections */
        ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECOVERY, election_handler, rec);
 
@@ -3679,6 +4150,11 @@ static void monitor_cluster(struct ctdb_context *ctdb)
           reallocation */
        ctdb_client_set_message_handler(ctdb, CTDB_SRVID_REBALANCE_NODE, recd_node_rebalance_handler, rec);
 
+       /* Register a message port for disabling takeover runs */
+       ctdb_client_set_message_handler(ctdb,
+                                       CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
+                                       disable_takeover_runs_handler, rec);
+
        for (;;) {
                TALLOC_CTX *mem_ctx = talloc_new(ctdb);
                struct timeval start;
@@ -3721,7 +4197,7 @@ static void ctdb_check_recd(struct event_context *ev, struct timed_event *te,
 {
        struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
 
-       if (kill(ctdb->recoverd_pid, 0) != 0) {
+       if (ctdb_kill(ctdb, ctdb->recoverd_pid, 0) != 0) {
                DEBUG(DEBUG_ERR,("Recovery daemon (pid:%d) is no longer running. Trying to restart recovery daemon.\n", (int)ctdb->recoverd_pid));
 
                event_add_timed(ctdb->ev, ctdb, timeval_zero(), 
@@ -3730,7 +4206,7 @@ static void ctdb_check_recd(struct event_context *ev, struct timed_event *te,
                return;
        }
 
-       event_add_timed(ctdb->ev, ctdb
+       event_add_timed(ctdb->ev, ctdb->recd_ctx,
                        timeval_current_ofs(30, 0),
                        ctdb_check_recd, ctdb);
 }
@@ -3773,14 +4249,18 @@ int ctdb_start_recoverd(struct ctdb_context *ctdb)
 
        ctdb->ctdbd_pid = getpid();
 
-       ctdb->recoverd_pid = fork();
+       ctdb->recoverd_pid = ctdb_fork_no_free_ringbuffer(ctdb);
        if (ctdb->recoverd_pid == -1) {
                return -1;
        }
-       
+
        if (ctdb->recoverd_pid != 0) {
+               talloc_free(ctdb->recd_ctx);
+               ctdb->recd_ctx = talloc_new(ctdb);
+               CTDB_NO_MEMORY(ctdb, ctdb->recd_ctx);
+
                close(fd[0]);
-               event_add_timed(ctdb->ev, ctdb
+               event_add_timed(ctdb->ev, ctdb->recd_ctx,
                                timeval_current_ofs(30, 0),
                                ctdb_check_recd, ctdb);
                return 0;
@@ -3790,6 +4270,10 @@ int ctdb_start_recoverd(struct ctdb_context *ctdb)
 
        srandom(getpid() ^ time(NULL));
 
+       /* Clear the log ringbuffer */
+       ctdb_clear_log(ctdb);
+
+       ctdb_set_process_name("ctdb_recovered");
        if (switch_from_server_to_client(ctdb, "recoverd") != 0) {
                DEBUG(DEBUG_CRIT, (__location__ "ERROR: failed to switch recovery daemon into client mode. shutting down.\n"));
                exit(1);
@@ -3798,7 +4282,7 @@ int ctdb_start_recoverd(struct ctdb_context *ctdb)
        DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to recovery daemon\n", fd[0]));
 
        fde = event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ,
-                    ctdb_recoverd_parent, &fd[0]);     
+                    ctdb_recoverd_parent, &fd[0]);
        tevent_fd_set_auto_close(fde);
 
        /* set up a handler to pick up sigchld */
@@ -3827,7 +4311,10 @@ void ctdb_stop_recoverd(struct ctdb_context *ctdb)
        }
 
        DEBUG(DEBUG_NOTICE,("Shutting down recovery daemon\n"));
-       kill(ctdb->recoverd_pid, SIGTERM);
+       ctdb_kill(ctdb, ctdb->recoverd_pid, SIGTERM);
+
+       TALLOC_FREE(ctdb->recd_ctx);
+       TALLOC_FREE(ctdb->recd_ping_count);
 }
 
 static void ctdb_restart_recd(struct event_context *ev, struct timed_event *te,