Revert "recoverd: Disable takeover runs on other nodes for 5 minutes"
[ctdb.git] / server / ctdb_recoverd.c
index de733a99d77d8e25c3e524b58a175d6e6f2c3822..e5c2887f44f37a78cc7b88b1ba6be357b7d3dd80 100644 (file)
 #include "dlinklist.h"
 
 
-/* most recent reload all ips request we need to perform during the 
-   next monitoring loop
-*/
-struct reloadips_all_reply *reload_all_ips_request = NULL;
+/* List of SRVID requests that need to be processed */
+struct srvid_list {
+       struct srvid_list *next, *prev;
+       struct srvid_request *request;
+};
 
-/* list of "ctdb ipreallocate" processes to call back when we have
-   finished the takeover run.
-*/
-struct ip_reallocate_list {
-       struct ip_reallocate_list *next;
-       struct rd_memdump_reply *rd;
+struct srvid_requests {
+       struct srvid_list *requests;
 };
 
+static void srvid_request_reply(struct ctdb_context *ctdb,
+                               struct srvid_request *request,
+                               TDB_DATA result)
+{
+       /* Someone that sent srvid==0 does not want a reply */
+       if (request->srvid == 0) {
+               talloc_free(request);
+               return;
+       }
+
+       if (ctdb_client_send_message(ctdb, request->pnn, request->srvid,
+                                    result) == 0) {
+               DEBUG(DEBUG_INFO,("Sent SRVID reply to %u:%llu\n",
+                                 (unsigned)request->pnn,
+                                 (unsigned long long)request->srvid));
+       } else {
+               DEBUG(DEBUG_ERR,("Failed to send SRVID reply to %u:%llu\n",
+                                (unsigned)request->pnn,
+                                (unsigned long long)request->srvid));
+       }
+
+       talloc_free(request);
+}
+
+static void srvid_requests_reply(struct ctdb_context *ctdb,
+                                struct srvid_requests **requests,
+                                TDB_DATA result)
+{
+       struct srvid_list *r;
+
+       for (r = (*requests)->requests; r != NULL; r = r->next) {
+               srvid_request_reply(ctdb, r->request, result);
+       }
+
+       /* Free the list structure... */
+       TALLOC_FREE(*requests);
+}
+
+static void srvid_request_add(struct ctdb_context *ctdb,
+                             struct srvid_requests **requests,
+                             struct srvid_request *request)
+{
+       struct srvid_list *t;
+       int32_t ret;
+       TDB_DATA result;
+
+       if (*requests == NULL) {
+               *requests = talloc_zero(ctdb, struct srvid_requests);
+               if (*requests == NULL) {
+                       goto nomem;
+               }
+       }
+
+       t = talloc_zero(*requests, struct srvid_list);
+       if (t == NULL) {
+               /* If *requests was just allocated above then free it */
+               if ((*requests)->requests == NULL) {
+                       TALLOC_FREE(*requests);
+               }
+               goto nomem;
+       }
+
+       t->request = (struct srvid_request *)talloc_steal(t, request);
+       DLIST_ADD((*requests)->requests, t);
+
+       return;
+
+nomem:
+       /* Failed to add the request to the list.  Send a fail. */
+       DEBUG(DEBUG_ERR, (__location__
+                         " Out of memory, failed to queue SRVID request\n"));
+       ret = -ENOMEM;
+       result.dsize = sizeof(ret);
+       result.dptr = (uint8_t *)&ret;
+       srvid_request_reply(ctdb, request, result);
+}
+
 struct ctdb_banning_state {
        uint32_t count;
        struct timeval last_reported_time;
@@ -55,6 +129,7 @@ struct ctdb_recoverd {
        struct ctdb_context *ctdb;
        uint32_t recmaster;
        uint32_t num_active;
+       uint32_t num_lmasters;
        uint32_t num_connected;
        uint32_t last_culprit_node;
        struct ctdb_node_map *nodemap;
@@ -65,11 +140,11 @@ struct ctdb_recoverd {
        struct timed_event *send_election_te;
        struct timed_event *election_timeout;
        struct vacuum_info *vacuum_info;
-       TALLOC_CTX *ip_reallocate_ctx;
-       struct ip_reallocate_list *reallocate_callers;
-       TALLOC_CTX *ip_check_disable_ctx;
+       struct srvid_requests *reallocate_requests;
+       bool takeover_run_in_progress;
+       TALLOC_CTX *takeover_runs_disable_ctx;
        struct ctdb_control_get_ifaces *ifaces;
-       TALLOC_CTX *deferred_rebalance_ctx;
+       uint32_t *force_rebalance_nodes;
 };
 
 #define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
@@ -120,6 +195,12 @@ static void ctdb_set_culprit_count(struct ctdb_recoverd *rec, uint32_t culprit,
                return;
        }
 
+       /* If we are banned or stopped, do not set other nodes as culprits */
+       if (rec->node_flags & NODE_FLAGS_INACTIVE) {
+               DEBUG(DEBUG_NOTICE, ("This node is INACTIVE, cannot set culprit node %d\n", culprit));
+               return;
+       }
+
        if (ctdb->nodes[culprit]->ban_state == NULL) {
                ctdb->nodes[culprit]->ban_state = talloc_zero(ctdb->nodes[culprit], struct ctdb_banning_state);
                CTDB_NO_MEMORY_VOID(ctdb, ctdb->nodes[culprit]->ban_state);
@@ -1170,7 +1251,7 @@ static struct tdb_wrap *create_recdb(struct ctdb_context *ctdb, TALLOC_CTX *mem_
        if (ctdb->valgrinding) {
                tdb_flags |= TDB_NOMMAP;
        }
-       tdb_flags |= TDB_DISALLOW_NESTING;
+       tdb_flags |= (TDB_INCOMPATIBLE_HASH | TDB_DISALLOW_NESTING);
 
        recdb = tdb_wrap_open(mem_ctx, name, ctdb->tunable.database_hash_size, 
                              tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
@@ -1254,8 +1335,8 @@ static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data,
                params->recdata = talloc_realloc_size(NULL, params->recdata, params->allocated_len);
        }
        if (params->recdata == NULL) {
-               DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata to %u (%u records)\n", 
-                        rec->length + params->len, params->recdata->count));
+               DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata to %u\n",
+                        rec->length + params->len));
                params->failed = true;
                return -1;
        }
@@ -1400,15 +1481,6 @@ static int recover_database(struct ctdb_recoverd *rec,
        return 0;
 }
 
-/*
-  reload the nodes file 
-*/
-static void reload_nodes_file(struct ctdb_context *ctdb)
-{
-       ctdb->nodes = NULL;
-       ctdb_load_nodes_file(ctdb);
-}
-
 static int ctdb_reload_remote_public_ips(struct ctdb_context *ctdb,
                                         struct ctdb_recoverd *rec,
                                         struct ctdb_node_map *nodemap,
@@ -1427,57 +1499,62 @@ static int ctdb_reload_remote_public_ips(struct ctdb_context *ctdb,
        }
 
        for (j=0; j<nodemap->num; j++) {
+               /* For readability */
+               struct ctdb_node *node = ctdb->nodes[j];
+
                /* release any existing data */
-               if (ctdb->nodes[j]->known_public_ips) {
-                       talloc_free(ctdb->nodes[j]->known_public_ips);
-                       ctdb->nodes[j]->known_public_ips = NULL;
+               if (node->known_public_ips) {
+                       talloc_free(node->known_public_ips);
+                       node->known_public_ips = NULL;
                }
-               if (ctdb->nodes[j]->available_public_ips) {
-                       talloc_free(ctdb->nodes[j]->available_public_ips);
-                       ctdb->nodes[j]->available_public_ips = NULL;
+               if (node->available_public_ips) {
+                       talloc_free(node->available_public_ips);
+                       node->available_public_ips = NULL;
                }
 
                if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
                        continue;
                }
 
-               /* grab a new shiny list of public ips from the node */
+               /* Retrieve the list of known public IPs from the node */
                ret = ctdb_ctrl_get_public_ips_flags(ctdb,
                                        CONTROL_TIMEOUT(),
-                                       ctdb->nodes[j]->pnn,
+                                       node->pnn,
                                        ctdb->nodes,
                                        0,
-                                       &ctdb->nodes[j]->known_public_ips);
+                                       &node->known_public_ips);
                if (ret != 0) {
-                       DEBUG(DEBUG_ERR,("Failed to read known public ips from node : %u\n",
-                               ctdb->nodes[j]->pnn));
+                       DEBUG(DEBUG_ERR,
+                             ("Failed to read known public IPs from node: %u\n",
+                              node->pnn));
                        if (culprit) {
-                               *culprit = ctdb->nodes[j]->pnn;
+                               *culprit = node->pnn;
                        }
                        return -1;
                }
 
-               if (ctdb->do_checkpublicip) {
-                       if (rec->ip_check_disable_ctx == NULL) {
-                               if (verify_remote_ip_allocation(ctdb, ctdb->nodes[j]->known_public_ips)) {
-                                       DEBUG(DEBUG_ERR,("Node %d has inconsistent public ip allocation and needs update.\n", ctdb->nodes[j]->pnn));
-                                       rec->need_takeover_run = true;
-                               }
-                       }
+               if (ctdb->do_checkpublicip &&
+                   rec->takeover_runs_disable_ctx == NULL &&
+                   verify_remote_ip_allocation(ctdb,
+                                                node->known_public_ips,
+                                                node->pnn)) {
+                       DEBUG(DEBUG_ERR,("Trigger IP reallocation\n"));
+                       rec->need_takeover_run = true;
                }
 
-               /* grab a new shiny list of public ips from the node */
+               /* Retrieve the list of available public IPs from the node */
                ret = ctdb_ctrl_get_public_ips_flags(ctdb,
                                        CONTROL_TIMEOUT(),
-                                       ctdb->nodes[j]->pnn,
+                                       node->pnn,
                                        ctdb->nodes,
                                        CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE,
-                                       &ctdb->nodes[j]->available_public_ips);
+                                       &node->available_public_ips);
                if (ret != 0) {
-                       DEBUG(DEBUG_ERR,("Failed to read available public ips from node : %u\n",
-                               ctdb->nodes[j]->pnn));
+                       DEBUG(DEBUG_ERR,
+                             ("Failed to read available public IPs from node: %u\n",
+                              node->pnn));
                        if (culprit) {
-                               *culprit = ctdb->nodes[j]->pnn;
+                               *culprit = node->pnn;
                        }
                        return -1;
                }
@@ -1535,11 +1612,136 @@ static void takeover_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn,
                DEBUG(DEBUG_ERR, ("Setting node %u as recovery fail culprit\n", node_pnn));
 
                ctdb_set_culprit(rec, node_pnn);
-               rec->need_takeover_run = true;
        }
 }
 
 
+static void ban_misbehaving_nodes(struct ctdb_recoverd *rec, bool *self_ban)
+{
+       struct ctdb_context *ctdb = rec->ctdb;
+       int i;
+       struct ctdb_banning_state *ban_state;
+
+       *self_ban = false;
+       for (i=0; i<ctdb->num_nodes; i++) {
+               if (ctdb->nodes[i]->ban_state == NULL) {
+                       continue;
+               }
+               ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
+               if (ban_state->count < 2*ctdb->num_nodes) {
+                       continue;
+               }
+
+               DEBUG(DEBUG_NOTICE,("Node %u reached %u banning credits - banning it for %u seconds\n",
+                       ctdb->nodes[i]->pnn, ban_state->count,
+                       ctdb->tunable.recovery_ban_period));
+               ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
+               ban_state->count = 0;
+
+               /* Banning ourself? */
+               if (ctdb->nodes[i]->pnn == rec->ctdb->pnn) {
+                       *self_ban = true;
+               }
+       }
+}
+
+static bool do_takeover_run(struct ctdb_recoverd *rec,
+                           struct ctdb_node_map *nodemap,
+                           bool banning_credits_on_fail)
+{
+       uint32_t *nodes = NULL;
+       struct srvid_request dtr;
+       TDB_DATA data;
+       int i;
+       uint32_t *rebalance_nodes = rec->force_rebalance_nodes;
+       int ret;
+       bool ok;
+
+       DEBUG(DEBUG_NOTICE, ("Takeover run starting\n"));
+
+       if (rec->takeover_run_in_progress) {
+               DEBUG(DEBUG_ERR, (__location__
+                                 " takeover run already in progress \n"));
+               ok = false;
+               goto done;
+       }
+
+       rec->takeover_run_in_progress = true;
+
+       /* If takeover runs are in disabled then fail... */
+       if (rec->takeover_runs_disable_ctx != NULL) {
+               DEBUG(DEBUG_ERR,
+                     ("Takeover runs are disabled so refusing to run one\n"));
+               ok = false;
+               goto done;
+       }
+
+       /* Disable IP checks (takeover runs, really) on other nodes
+        * while doing this takeover run.  This will stop those other
+        * nodes from triggering takeover runs when think they should
+        * be hosting an IP but it isn't yet on an interface.  Don't
+        * wait for replies since a failure here might cause some
+        * noise in the logs but will not actually cause a problem.
+        */
+       dtr.srvid = 0; /* No reply */
+       dtr.pnn = -1;
+
+       data.dptr  = (uint8_t*)&dtr;
+       data.dsize = sizeof(dtr);
+
+       nodes = list_of_connected_nodes(rec->ctdb, nodemap, rec, false);
+
+       /* Disable for 60 seconds.  This can be a tunable later if
+        * necessary.
+        */
+       dtr.data = 60;
+       for (i = 0; i < talloc_array_length(nodes); i++) {
+               if (ctdb_client_send_message(rec->ctdb, nodes[i],
+                                            CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
+                                            data) != 0) {
+                       DEBUG(DEBUG_INFO,("Failed to disable takeover runs\n"));
+               }
+       }
+
+       ret = ctdb_takeover_run(rec->ctdb, nodemap,
+                               rec->force_rebalance_nodes,
+                               takeover_fail_callback,
+                               banning_credits_on_fail ? rec : NULL);
+
+       /* Reenable takeover runs and IP checks on other nodes */
+       dtr.data = 0;
+       for (i = 0; i < talloc_array_length(nodes); i++) {
+               if (ctdb_client_send_message(rec->ctdb, nodes[i],
+                                            CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
+                                            data) != 0) {
+                       DEBUG(DEBUG_INFO,("Failed to reenable takeover runs\n"));
+               }
+       }
+
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR, ("ctdb_takeover_run() failed\n"));
+               ok = false;
+               goto done;
+       }
+
+       ok = true;
+       /* Takeover run was successful so clear force rebalance targets */
+       if (rebalance_nodes == rec->force_rebalance_nodes) {
+               TALLOC_FREE(rec->force_rebalance_nodes);
+       } else {
+               DEBUG(DEBUG_WARNING,
+                     ("Rebalance target nodes changed during takeover run - not clearing\n"));
+       }
+done:
+       rec->need_takeover_run = !ok;
+       talloc_free(nodes);
+       rec->takeover_run_in_progress = false;
+
+       DEBUG(DEBUG_NOTICE, ("Takeover run %s\n", ok ? "completed successfully" : "unsuccessful"));
+       return ok;
+}
+
+
 /*
   we are the recmaster, and recovery is needed - start a recovery run
  */
@@ -1555,30 +1757,19 @@ static int do_recovery(struct ctdb_recoverd *rec,
        uint32_t *nodes;
        struct timeval start_time;
        uint32_t culprit = (uint32_t)-1;
+       bool self_ban;
 
        DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));
 
        /* if recovery fails, force it again */
        rec->need_recovery = true;
 
-       for (i=0; i<ctdb->num_nodes; i++) {
-               struct ctdb_banning_state *ban_state;
-
-               if (ctdb->nodes[i]->ban_state == NULL) {
-                       continue;
-               }
-               ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
-               if (ban_state->count < 2*ctdb->num_nodes) {
-                       continue;
-               }
-               DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
-                       ctdb->nodes[i]->pnn, ban_state->count,
-                       ctdb->tunable.recovery_ban_period));
-               ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
-               ban_state->count = 0;
+       ban_misbehaving_nodes(rec, &self_ban);
+       if (self_ban) {
+               DEBUG(DEBUG_NOTICE, ("This node was banned, aborting recovery\n"));
+               return -1;
        }
 
-
         if (ctdb->tunable.verify_recovery_lock != 0) {
                DEBUG(DEBUG_ERR,("Taking out recovery lock from recovery daemon\n"));
                start_time = timeval_current();
@@ -1657,8 +1848,12 @@ static int do_recovery(struct ctdb_recoverd *rec,
 
                ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
                if (ret != 0) {
-                       DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
-                       return -1;
+                       if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
+                               DEBUG(DEBUG_WARNING, (__location__ "Unable to update flags on inactive node %d\n", i));
+                       } else {
+                               DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
+                               return -1;
+                       }
                }
        }
 
@@ -1818,9 +2013,7 @@ static int do_recovery(struct ctdb_recoverd *rec,
 
        DEBUG(DEBUG_NOTICE, (__location__ " Recovery - disabled recovery mode\n"));
 
-       /*
-         tell nodes to takeover their public IPs
-        */
+       /* Fetch known/available public IPs from each active node */
        ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
        if (ret != 0) {
                DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
@@ -1828,12 +2021,8 @@ static int do_recovery(struct ctdb_recoverd *rec,
                rec->need_takeover_run = true;
                return -1;
        }
-       rec->need_takeover_run = false;
-       ret = ctdb_takeover_run(ctdb, nodemap, takeover_fail_callback, NULL);
-       if (ret != 0) {
-               DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses. ctdb_takeover_run() failed.\n"));
-               rec->need_takeover_run = true;
-       }
+
+       do_takeover_run(rec, nodemap, false);
 
        /* execute the "recovered" event script on all nodes */
        ret = run_recovered_eventscript(rec, nodemap, "do_recovery");
@@ -1952,12 +2141,12 @@ static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message
        /* we cant win if we are banned */
        if (rec->node_flags & NODE_FLAGS_BANNED) {
                return false;
-       }       
+       }
 
        /* we cant win if we are stopped */
        if (rec->node_flags & NODE_FLAGS_STOPPED) {
                return false;
-       }       
+       }
 
        /* we will automatically win if the other node is banned */
        if (em->node_flags & NODE_FLAGS_BANNED) {
@@ -2081,14 +2270,14 @@ static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid,
        TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
        TDB_DATA *dump;
        int ret;
-       struct rd_memdump_reply *rd;
+       struct srvid_request *rd;
 
-       if (data.dsize != sizeof(struct rd_memdump_reply)) {
+       if (data.dsize != sizeof(struct srvid_request)) {
                DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
                talloc_free(tmp_ctx);
                return;
        }
-       rd = (struct rd_memdump_reply *)data.dptr;
+       rd = (struct srvid_request *)data.dptr;
 
        dump = talloc_zero(tmp_ctx, TDB_DATA);
        if (dump == NULL) {
@@ -2137,6 +2326,7 @@ static void getlog_handler(struct ctdb_context *ctdb, uint64_t srvid,
        }
 
        if (child == 0) {
+               ctdb_set_process_name("ctdb_rec_log_collector");
                if (switch_from_server_to_client(ctdb, "recoverd-log-collector") != 0) {
                        DEBUG(DEBUG_CRIT, (__location__ "ERROR: failed to switch log collector child into client mode.\n"));
                        _exit(1);
@@ -2165,46 +2355,41 @@ static void reload_nodes_handler(struct ctdb_context *ctdb, uint64_t srvid,
 
        DEBUG(DEBUG_ERR, (__location__ " Reload nodes file from recovery daemon\n"));
 
-       reload_nodes_file(rec->ctdb);
+       ctdb_load_nodes_file(rec->ctdb);
 }
 
 
-static void reenable_ip_check(struct event_context *ev, struct timed_event *te, 
-                             struct timeval yt, void *p)
+static void ctdb_rebalance_timeout(struct event_context *ev,
+                                  struct timed_event *te,
+                                  struct timeval t, void *p)
 {
        struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
 
-       talloc_free(rec->ip_check_disable_ctx);
-       rec->ip_check_disable_ctx = NULL;
-}
-
-
-static void ctdb_rebalance_timeout(struct event_context *ev, struct timed_event *te, 
-                                 struct timeval t, void *p)
-{
-       struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
-       struct ctdb_context *ctdb = rec->ctdb;
-       int ret;
-
-       DEBUG(DEBUG_NOTICE,("Rebalance all nodes that have had ip assignment changes.\n"));
-
-       ret = ctdb_takeover_run(ctdb, rec->nodemap, takeover_fail_callback, NULL);
-       if (ret != 0) {
-               DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses. ctdb_takeover_run() failed.\n"));
-               rec->need_takeover_run = true;
+       if (rec->force_rebalance_nodes == NULL) {
+               DEBUG(DEBUG_ERR,
+                     ("Rebalance timeout occurred - no nodes to rebalance\n"));
+               return;
        }
 
-       talloc_free(rec->deferred_rebalance_ctx);
-       rec->deferred_rebalance_ctx = NULL;
+       DEBUG(DEBUG_NOTICE,
+             ("Rebalance timeout occurred - do takeover run\n"));
+       do_takeover_run(rec, rec->nodemap, false);
 }
 
        
-static void recd_node_rebalance_handler(struct ctdb_context *ctdb, uint64_t srvid, 
-                            TDB_DATA data, void *private_data)
+static void recd_node_rebalance_handler(struct ctdb_context *ctdb,
+                                       uint64_t srvid,
+                                       TDB_DATA data, void *private_data)
 {
        uint32_t pnn;
+       uint32_t *t;
+       int len;
        struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
 
+       if (rec->recmaster != ctdb_get_pnn(ctdb)) {
+               return;
+       }
+
        if (data.dsize != sizeof(uint32_t)) {
                DEBUG(DEBUG_ERR,(__location__ " Incorrect size of node rebalance message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(uint32_t)));
                return;
@@ -2216,14 +2401,33 @@ static void recd_node_rebalance_handler(struct ctdb_context *ctdb, uint64_t srvi
 
        pnn = *(uint32_t *)&data.dptr[0];
 
-       lcp2_forcerebalance(ctdb, pnn);
-       DEBUG(DEBUG_NOTICE,("Received message to perform node rebalancing for node %d\n", pnn));
+       DEBUG(DEBUG_NOTICE,("Setting up rebalance of IPs to node %u\n", pnn));
 
-       if (rec->deferred_rebalance_ctx != NULL) {
-               talloc_free(rec->deferred_rebalance_ctx);
+       /* Copy any existing list of nodes.  There's probably some
+        * sort of realloc variant that will do this but we need to
+        * make sure that freeing the old array also cancels the timer
+        * event for the timeout... not sure if realloc will do that.
+        */
+       len = (rec->force_rebalance_nodes != NULL) ?
+               talloc_array_length(rec->force_rebalance_nodes) :
+               0;
+
+       /* This allows duplicates to be added but they don't cause
+        * harm.  A call to add a duplicate PNN arguably means that
+        * the timeout should be reset, so this is the simplest
+        * solution.
+        */
+       t = talloc_zero_array(rec, uint32_t, len+1);
+       CTDB_NO_MEMORY_VOID(ctdb, t);
+       if (len > 0) {
+               memcpy(t, rec->force_rebalance_nodes, sizeof(uint32_t) * len);
        }
-       rec->deferred_rebalance_ctx = talloc_new(rec);
-       event_add_timed(ctdb->ev, rec->deferred_rebalance_ctx, 
+       t[len] = pnn;
+
+       talloc_free(rec->force_rebalance_nodes);
+
+       rec->force_rebalance_nodes = t;
+       event_add_timed(ctdb->ev, rec->force_rebalance_nodes,
                        timeval_current_ofs(ctdb->tunable.deferred_rebalance_on_node_add, 0),
                        ctdb_rebalance_timeout, rec);
 }
@@ -2252,154 +2456,163 @@ static void recd_update_ip_handler(struct ctdb_context *ctdb, uint64_t srvid,
 }
 
 
-static void disable_ip_check_handler(struct ctdb_context *ctdb, uint64_t srvid, 
-                            TDB_DATA data, void *private_data)
+static void clear_takeover_runs_disable(struct ctdb_recoverd *rec)
 {
-       struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
-       uint32_t timeout;
+       TALLOC_FREE(rec->takeover_runs_disable_ctx);
+}
 
-       if (rec->ip_check_disable_ctx != NULL) {
-               talloc_free(rec->ip_check_disable_ctx);
-               rec->ip_check_disable_ctx = NULL;
-       }
+static void reenable_takeover_runs(struct event_context *ev,
+                                  struct timed_event *te,
+                                  struct timeval yt, void *p)
+{
+       struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
 
-       if (data.dsize != sizeof(uint32_t)) {
+       DEBUG(DEBUG_NOTICE,("Reenabling takeover runs after timeout\n"));
+       clear_takeover_runs_disable(rec);
+}
+
+static void disable_takeover_runs_handler(struct ctdb_context *ctdb,
+                                         uint64_t srvid, TDB_DATA data,
+                                         void *private_data)
+{
+       struct ctdb_recoverd *rec = talloc_get_type(private_data,
+                                                   struct ctdb_recoverd);
+       struct srvid_request *r;
+       uint32_t timeout;
+       TDB_DATA result;
+       int32_t ret = 0;
+
+       /* Validate input data */
+       if (data.dsize != sizeof(struct srvid_request)) {
                DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
-                                "expexting %lu\n", (long unsigned)data.dsize,
-                                (long unsigned)sizeof(uint32_t)));
-               return;
+                                "expecting %lu\n", (long unsigned)data.dsize,
+                                (long unsigned)sizeof(struct srvid_request)));
+               ret = -EINVAL;
+               goto done;
        }
        if (data.dptr == NULL) {
-               DEBUG(DEBUG_ERR,(__location__ " No data recaived\n"));
-               return;
+               DEBUG(DEBUG_ERR,(__location__ " No data received\n"));
+               ret = -EINVAL;
+               goto done;
        }
 
-       timeout = *((uint32_t *)data.dptr);
+       r = (struct srvid_request *)data.dptr;
+       timeout = r->data;
 
        if (timeout == 0) {
-               DEBUG(DEBUG_NOTICE,("Reenabling ip check\n"));
-               return;
+               DEBUG(DEBUG_NOTICE,("Reenabling takeover runs\n"));
+               clear_takeover_runs_disable(rec);
+               ret = ctdb_get_pnn(ctdb);
+               goto done;
        }
-               
-       DEBUG(DEBUG_NOTICE,("Disabling ip check for %u seconds\n", timeout));
-
-       rec->ip_check_disable_ctx = talloc_new(rec);
-       CTDB_NO_MEMORY_VOID(ctdb, rec->ip_check_disable_ctx);
-
-       event_add_timed(ctdb->ev, rec->ip_check_disable_ctx, timeval_current_ofs(timeout, 0), reenable_ip_check, rec);
-}
-
 
-/*
-  handler for reload all ips.
-*/
-static void ip_reloadall_handler(struct ctdb_context *ctdb, uint64_t srvid, 
-                            TDB_DATA data, void *private_data)
-{
-       struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
-
-       if (data.dsize != sizeof(struct reloadips_all_reply)) {
-               DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
-               return;
+       if (rec->node_flags & NODE_FLAGS_INACTIVE) {
+               DEBUG(DEBUG_ERR,
+                     ("Refusing to disable takeover runs on inactive node\n"));
+               ret = -EHOSTDOWN;
+               goto done;
        }
 
-       reload_all_ips_request = (struct reloadips_all_reply *)talloc_steal(rec, data.dptr);
+       if (rec->takeover_run_in_progress) {
+               DEBUG(DEBUG_ERR,
+                     ("Unable to disable takeover runs - in progress\n"));
+               ret = -EAGAIN;
+               goto done;
+       }
 
-       DEBUG(DEBUG_NOTICE,("RELOAD_ALL_IPS message received from node:%d srvid:%d\n", reload_all_ips_request->pnn, (int)reload_all_ips_request->srvid));
-       return;
-}
+       DEBUG(DEBUG_NOTICE,("Disabling takeover runs for %u seconds\n", timeout));
 
-static void async_reloadips_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
-{
-       uint32_t *status = callback_data;
+       /* Clear any old timers */
+       clear_takeover_runs_disable(rec);
 
-       if (res != 0) {
-               DEBUG(DEBUG_ERR,("Reload ips all failed on node %d\n", node_pnn));
-               *status = 1;
-       }
+       /* When this is non-NULL it indicates that takeover runs are
+        * disabled.  This context also holds the timeout timer.
+        */
+       rec->takeover_runs_disable_ctx = talloc_new(rec);
+       if (rec->takeover_runs_disable_ctx == NULL) {
+               DEBUG(DEBUG_ERR,(__location__ " Unable to allocate memory\n"));
+               ret = -ENOMEM;
+               goto done;
+       }
+
+       /* Arrange for the timeout to occur */
+       event_add_timed(ctdb->ev, rec->takeover_runs_disable_ctx,
+                       timeval_current_ofs(timeout, 0),
+                       reenable_takeover_runs,
+                       rec);
+
+       /* Returning our PNN tells the caller that we succeeded */
+       ret = ctdb_get_pnn(ctdb);
+done:
+       result.dsize = sizeof(int32_t);
+       result.dptr  = (uint8_t *)&ret;
+       srvid_request_reply(ctdb, r, result);
 }
 
-static int
-reload_all_ips(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, struct reloadips_all_reply *rips)
+/* Backward compatibility for this SRVID - call
+ * disable_takeover_runs_handler() instead
+ */
+static void disable_ip_check_handler(struct ctdb_context *ctdb, uint64_t srvid,
+                                    TDB_DATA data, void *private_data)
 {
-       TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
-       uint32_t *nodes;
-       uint32_t status;
-       int i;
+       struct ctdb_recoverd *rec = talloc_get_type(private_data,
+                                                   struct ctdb_recoverd);
+       TDB_DATA data2;
+       struct srvid_request *req;
 
-       DEBUG(DEBUG_ERR,("RELOAD ALL IPS on all active nodes\n"));
-       for (i = 0; i< nodemap->num; i++) {
-               if (nodemap->nodes[i].flags != 0) {
-                       DEBUG(DEBUG_ERR, ("Can not reload ips on all nodes. Node %d is not up and healthy\n", i));
-                       talloc_free(tmp_ctx);
-                       return -1;
-               }
+       if (data.dsize != sizeof(uint32_t)) {
+               DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
+                                "expecting %lu\n", (long unsigned)data.dsize,
+                                (long unsigned)sizeof(uint32_t)));
+               return;
        }
-
-       /* send the flags update to all connected nodes */
-       nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
-       status = 0;
-       if (ctdb_client_async_control(ctdb, CTDB_CONTROL_RELOAD_PUBLIC_IPS,
-                                       nodes, 0,
-                                       CONTROL_TIMEOUT(),
-                                       false, tdb_null,
-                                       async_reloadips_callback, NULL,
-                                       &status) != 0) {
-               DEBUG(DEBUG_ERR, (__location__ " Failed to reloadips on all nodes.\n"));
-               talloc_free(tmp_ctx);
-               return -1;
+       if (data.dptr == NULL) {
+               DEBUG(DEBUG_ERR,(__location__ " No data received\n"));
+               return;
        }
 
-       if (status != 0) {
-               DEBUG(DEBUG_ERR, (__location__ " Failed to reloadips on all nodes.\n"));
-               talloc_free(tmp_ctx);
-               return -1;
-       }
+       req = talloc(ctdb, struct srvid_request);
+       CTDB_NO_MEMORY_VOID(ctdb, req);
 
-       ctdb_client_send_message(ctdb, rips->pnn, rips->srvid, tdb_null);
+       req->srvid = 0; /* No reply */
+       req->pnn = -1;
+       req->data = *((uint32_t *)data.dptr); /* Timeout */
 
-       talloc_free(tmp_ctx);
-       return 0;
-}
+       data2.dsize = sizeof(*req);
+       data2.dptr = (uint8_t *)req;
 
+       disable_takeover_runs_handler(rec->ctdb,
+                                     CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
+                                     data2, rec);
+}
 
 /*
-  handler for ip reallocate, just add it to the list of callers and 
+  handler for ip reallocate, just add it to the list of requests and 
   handle this later in the monitor_cluster loop so we do not recurse
-  with other callers to takeover_run()
+  with other requests to takeover_run()
 */
-static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid, 
-                            TDB_DATA data, void *private_data)
+static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid,
+                                 TDB_DATA data, void *private_data)
 {
-       struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
-       struct ip_reallocate_list *caller;
+       struct srvid_request *request;
+       struct ctdb_recoverd *rec = talloc_get_type(private_data,
+                                                   struct ctdb_recoverd);
 
-       if (data.dsize != sizeof(struct rd_memdump_reply)) {
+       if (data.dsize != sizeof(struct srvid_request)) {
                DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
                return;
        }
 
-       if (rec->ip_reallocate_ctx == NULL) {
-               rec->ip_reallocate_ctx = talloc_new(rec);
-               CTDB_NO_MEMORY_FATAL(ctdb, rec->ip_reallocate_ctx);
-       }
+       request = (struct srvid_request *)data.dptr;
 
-       caller = talloc(rec->ip_reallocate_ctx, struct ip_reallocate_list);
-       CTDB_NO_MEMORY_FATAL(ctdb, caller);
-
-       caller->rd   = (struct rd_memdump_reply *)talloc_steal(caller, data.dptr);
-       caller->next = rec->reallocate_callers;
-       rec->reallocate_callers = caller;
-
-       return;
+       srvid_request_add(ctdb, &rec->reallocate_requests, request);
 }
 
-static void process_ipreallocate_requests(struct ctdb_context *ctdb, struct ctdb_recoverd *rec)
+static void process_ipreallocate_requests(struct ctdb_context *ctdb,
+                                         struct ctdb_recoverd *rec)
 {
-       TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
        TDB_DATA result;
        int32_t ret;
-       struct ip_reallocate_list *callers;
        uint32_t culprit;
 
        DEBUG(DEBUG_INFO, ("recovery master forced ip reallocation\n"));
@@ -2414,39 +2627,17 @@ static void process_ipreallocate_requests(struct ctdb_context *ctdb, struct ctdb
                rec->need_takeover_run = true;
        }
        if (ret == 0) {
-               ret = ctdb_takeover_run(ctdb, rec->nodemap, takeover_fail_callback, NULL);
-               if (ret != 0) {
-                       DEBUG(DEBUG_ERR,("Failed to reallocate addresses: ctdb_takeover_run() failed.\n"));
-                       rec->need_takeover_run = true;
+               if (do_takeover_run(rec, rec->nodemap, false)) {
+                       ret = ctdb_get_pnn(ctdb);
+               } else {
+                       ret = -1;
                }
        }
 
        result.dsize = sizeof(int32_t);
        result.dptr  = (uint8_t *)&ret;
 
-       for (callers=rec->reallocate_callers; callers; callers=callers->next) {
-
-               /* Someone that sent srvid==0 does not want a reply */
-               if (callers->rd->srvid == 0) {
-                       continue;
-               }
-               DEBUG(DEBUG_INFO,("Sending ip reallocate reply message to "
-                                 "%u:%llu\n", (unsigned)callers->rd->pnn,
-                                 (unsigned long long)callers->rd->srvid));
-               ret = ctdb_client_send_message(ctdb, callers->rd->pnn, callers->rd->srvid, result);
-               if (ret != 0) {
-                       DEBUG(DEBUG_ERR,("Failed to send ip reallocate reply "
-                                        "message to %u:%llu\n",
-                                        (unsigned)callers->rd->pnn,
-                                        (unsigned long long)callers->rd->srvid));
-               }
-       }
-
-       talloc_free(tmp_ctx);
-       talloc_free(rec->ip_reallocate_ctx);
-       rec->ip_reallocate_ctx = NULL;
-       rec->reallocate_callers = NULL;
-       
+       srvid_requests_reply(ctdb, &rec->reallocate_requests, result);
 }
 
 
@@ -2590,8 +2781,8 @@ static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid,
                return;
        }
 
-       if (nodemap->nodes[i].flags != c->new_flags) {
-               DEBUG(DEBUG_NOTICE,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, nodemap->nodes[i].flags));
+       if (c->old_flags != c->new_flags) {
+               DEBUG(DEBUG_NOTICE,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
        }
 
        disabled_flag_changed =  (nodemap->nodes[i].flags ^ c->new_flags) & NODE_FLAGS_DISABLED;
@@ -2703,7 +2894,7 @@ static void verify_recmode_normal_callback(struct ctdb_client_control_state *sta
           status field
        */
        if (state->status != CTDB_RECOVERY_NORMAL) {
-               DEBUG(DEBUG_NOTICE, (__location__ " Node:%u was in recovery mode. Restart recovery process\n", state->c->hdr.destnode));
+               DEBUG(DEBUG_NOTICE, ("Node:%u was in recovery mode. Start recovery process\n", state->c->hdr.destnode));
                rmdata->status = MONITOR_RECOVERY_NEEDED;
        }
 
@@ -2794,7 +2985,7 @@ static void verify_recmaster_callback(struct ctdb_client_control_state *state)
           status field
        */
        if (state->status != rmdata->pnn) {
-               DEBUG(DEBUG_ERR,("Node %d does not agree we are the recmaster. Need a new recmaster election\n", state->c->hdr.destnode));
+               DEBUG(DEBUG_ERR,("Node %d thinks node %d is recmaster. Need a new recmaster election\n", state->c->hdr.destnode, state->status));
                ctdb_set_culprit(rmdata->rec, state->c->hdr.destnode);
                rmdata->status = MONITOR_ELECTION_NEEDED;
        }
@@ -2882,17 +3073,30 @@ static bool interfaces_have_changed(struct ctdb_context *ctdb,
 
        if (!rec->ifaces) {
                /* We haven't been here before so things have changed */
+               DEBUG(DEBUG_NOTICE, ("Initial interface fetched\n"));
                ret = true;
        } else if (rec->ifaces->num != ifaces->num) {
                /* Number of interfaces has changed */
+               DEBUG(DEBUG_NOTICE, ("Interface count changed from %d to %d\n",
+                                    rec->ifaces->num, ifaces->num));
                ret = true;
        } else {
                /* See if interface names or link states have changed */
                int i;
                for (i = 0; i < rec->ifaces->num; i++) {
                        struct ctdb_control_iface_info * iface = &rec->ifaces->ifaces[i];
-                       if (strcmp(iface->name, ifaces->ifaces[i].name) != 0 ||
-                           iface->link_state != ifaces->ifaces[i].link_state) {
+                       if (strcmp(iface->name, ifaces->ifaces[i].name) != 0) {
+                               DEBUG(DEBUG_NOTICE,
+                                     ("Interface in slot %d changed: %s => %s\n",
+                                      i, iface->name, ifaces->ifaces[i].name));
+                               ret = true;
+                               break;
+                       }
+                       if (iface->link_state != ifaces->ifaces[i].link_state) {
+                               DEBUG(DEBUG_NOTICE,
+                                     ("Interface %s changed state: %d => %d\n",
+                                      iface->name, iface->link_state,
+                                      ifaces->ifaces[i].link_state));
                                ret = true;
                                break;
                        }
@@ -3025,7 +3229,7 @@ static int verify_local_ip_allocation(struct ctdb_context *ctdb, struct ctdb_rec
        }
 
        if (need_takeover_run) {
-               struct takeover_run_reply rd;
+               struct srvid_request rd;
                TDB_DATA data;
 
                DEBUG(DEBUG_CRIT,("Trigger takeoverrun\n"));
@@ -3196,6 +3400,7 @@ static int check_recovery_lock(struct ctdb_context *ctdb)
                close(state->fd[0]);
                state->fd[0] = -1;
 
+               ctdb_set_process_name("ctdb_rec_reclock");
                debug_extra = talloc_asprintf(NULL, "recovery-lock:");
                if (pread(ctdb->recovery_lock_fd, &cc, 1, 0) == -1) {
                        DEBUG(DEBUG_CRIT,("failed read from recovery_lock_fd - %s\n", strerror(errno)));
@@ -3206,7 +3411,6 @@ static int check_recovery_lock(struct ctdb_context *ctdb)
                /* make sure we die when our parent dies */
                while (ctdb_kill(ctdb, parent, 0) == 0 || errno != ESRCH) {
                        sleep(5);
-                       write(state->fd[1], &cc, 1);
                }
                _exit(0);
        }
@@ -3319,7 +3523,7 @@ static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
        struct ctdb_vnn_map *remote_vnnmap=NULL;
        int32_t debug_level;
        int i, j, ret;
-
+       bool self_ban;
 
 
        /* verify that the main daemon is still running */
@@ -3344,28 +3548,6 @@ static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
        }
        LogLevel = debug_level;
 
-
-       /* We must check if we need to ban a node here but we want to do this
-          as early as possible so we dont wait until we have pulled the node
-          map from the local node. thats why we have the hardcoded value 20
-       */
-       for (i=0; i<ctdb->num_nodes; i++) {
-               struct ctdb_banning_state *ban_state;
-
-               if (ctdb->nodes[i]->ban_state == NULL) {
-                       continue;
-               }
-               ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
-               if (ban_state->count < 20) {
-                       continue;
-               }
-               DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
-                       ctdb->nodes[i]->pnn, ban_state->count,
-                       ctdb->tunable.recovery_ban_period));
-               ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
-               ban_state->count = 0;
-       }
-
        /* get relevant tunables */
        ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
        if (ret != 0) {
@@ -3389,11 +3571,7 @@ static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
                }
        }
 
-       pnn = ctdb_ctrl_getpnn(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
-       if (pnn == (uint32_t)-1) {
-               DEBUG(DEBUG_ERR,("Failed to get local pnn - retrying\n"));
-               return;
-       }
+       pnn = ctdb_get_pnn(ctdb);
 
        /* get the vnnmap */
        ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
@@ -3416,73 +3594,96 @@ static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
        }
        nodemap = rec->nodemap;
 
-       /* update the capabilities for all nodes */
-       ret = update_capabilities(ctdb, nodemap);
-       if (ret != 0) {
-               DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
-               return;
-       }
-
-       /* check which node is the recovery master */
-       ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), pnn, &rec->recmaster);
-       if (ret != 0) {
-               DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from node %u\n", pnn));
-               return;
-       }
-
-       /* if we are not the recmaster we can safely ignore any ip reallocate requests */
-       if (rec->recmaster != pnn) {
-               if (rec->ip_reallocate_ctx != NULL) {
-                       talloc_free(rec->ip_reallocate_ctx);
-                       rec->ip_reallocate_ctx = NULL;
-                       rec->reallocate_callers = NULL;
-               }
-       }
+       /* remember our own node flags */
+       rec->node_flags = nodemap->nodes[pnn].flags;
 
-       if (rec->recmaster == (uint32_t)-1) {
-               DEBUG(DEBUG_NOTICE,(__location__ " Initial recovery master set - forcing election\n"));
-               force_election(rec, pnn, nodemap);
+       ban_misbehaving_nodes(rec, &self_ban);
+       if (self_ban) {
+               DEBUG(DEBUG_NOTICE, ("This node was banned, restart main_loop\n"));
                return;
        }
 
-       /* if the local daemon is STOPPED, we verify that the databases are
-          also frozen and thet the recmode is set to active 
+       /* if the local daemon is STOPPED or BANNED, we verify that the databases are
+          also frozen and that the recmode is set to active.
        */
-       if (nodemap->nodes[pnn].flags & NODE_FLAGS_STOPPED) {
+       if (rec->node_flags & (NODE_FLAGS_STOPPED | NODE_FLAGS_BANNED)) {
+               /* If this node has become inactive then we want to
+                * reduce the chances of it taking over the recovery
+                * master role when it becomes active again.  This
+                * helps to stabilise the recovery master role so that
+                * it stays on the most stable node.
+                */
+               rec->priority_time = timeval_current();
+
                ret = ctdb_ctrl_getrecmode(ctdb, mem_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->recovery_mode);
                if (ret != 0) {
                        DEBUG(DEBUG_ERR,(__location__ " Failed to read recmode from local node\n"));
                }
                if (ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
-                       DEBUG(DEBUG_ERR,("Node is stopped but recovery mode is not active. Activate recovery mode and lock databases\n"));
+                       DEBUG(DEBUG_ERR,("Node is stopped or banned but recovery mode is not active. Activate recovery mode and lock databases\n"));
 
                        ret = ctdb_ctrl_freeze_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, 1);
                        if (ret != 0) {
-                               DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node in STOPPED state\n"));
+                               DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node in STOPPED or BANNED state\n"));
                                return;
                        }
                        ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
                        if (ret != 0) {
-                               DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode in STOPPED state\n"));
+                               DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode in STOPPED or BANNED state\n"));
 
                                return;
                        }
-                       return;
                }
+
+               /* If this node is stopped or banned then it is not the recovery
+                * master, so don't do anything. This prevents stopped or banned
+                * node from starting election and sending unnecessary controls.
+                */
+               return;
        }
-       /* If the local node is stopped, verify we are not the recmaster 
-          and yield this role if so
-       */
-       if ((nodemap->nodes[pnn].flags & NODE_FLAGS_INACTIVE) && (rec->recmaster == pnn)) {
-               DEBUG(DEBUG_ERR,("Local node is INACTIVE. Yielding recmaster role\n"));
+
+       /* check which node is the recovery master */
+       ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), pnn, &rec->recmaster);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from node %u\n", pnn));
+               return;
+       }
+
+       /* If we are not the recmaster then do some housekeeping */
+       if (rec->recmaster != pnn) {
+               /* Ignore any IP reallocate requests - only recmaster
+                * processes them
+                */
+               TALLOC_FREE(rec->reallocate_requests);
+               /* Clear any nodes that should be force rebalanced in
+                * the next takeover run.  If the recovery master role
+                * has moved then we don't want to process these some
+                * time in the future.
+                */
+               TALLOC_FREE(rec->force_rebalance_nodes);
+       }
+
+       /* This is a special case.  When recovery daemon is started, recmaster
+        * is set to -1.  If a node is not started in stopped state, then
+        * start election to decide recovery master
+        */
+       if (rec->recmaster == (uint32_t)-1) {
+               DEBUG(DEBUG_NOTICE,(__location__ " Initial recovery master set - forcing election\n"));
                force_election(rec, pnn, nodemap);
                return;
        }
-       
+
+       /* update the capabilities for all nodes */
+       ret = update_capabilities(ctdb, nodemap);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
+               return;
+       }
+
        /*
-        * if the current recmaster do not have CTDB_CAP_RECMASTER,
-        * but we have force an election and try to become the new
-        * recmaster
+        * If the current recmaster does not have CTDB_CAP_RECMASTER,
+        * but we have, then force an election and try to become the new
+        * recmaster.
         */
        if ((rec->ctdb->nodes[rec->recmaster]->capabilities & CTDB_CAP_RECMASTER) == 0 &&
            (rec->ctdb->capabilities & CTDB_CAP_RECMASTER) &&
@@ -3494,19 +3695,16 @@ static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
                return;
        }
 
-       /* check that we (recovery daemon) and the local ctdb daemon
-          agrees on whether we are banned or not
-       */
-
-       /* remember our own node flags */
-       rec->node_flags = nodemap->nodes[pnn].flags;
-
        /* count how many active nodes there are */
        rec->num_active    = 0;
+       rec->num_lmasters  = 0;
        rec->num_connected = 0;
        for (i=0; i<nodemap->num; i++) {
                if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
                        rec->num_active++;
+                       if (rec->ctdb->nodes[i]->capabilities & CTDB_CAP_LMASTER) {
+                               rec->num_lmasters++;
+                       }
                }
                if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
                        rec->num_connected++;
@@ -3547,27 +3745,23 @@ static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
        if ((recmaster_nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) &&
            (rec->node_flags & NODE_FLAGS_INACTIVE) == 0) {
                DEBUG(DEBUG_NOTICE, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].pnn));
+               /*
+                * update our nodemap to carry the recmaster's notion of
+                * its own flags, so that we don't keep freezing the
+                * inactive recmaster node...
+                */
+               nodemap->nodes[j].flags = recmaster_nodemap->nodes[j].flags;
                force_election(rec, pnn, nodemap);
                return;
        }
 
-       /* If this node is stopped then it is not the recovery master
-        * so the only remaining action is to potentially to verify
-        * the local IP allocation below.  This won't accomplish
-        * anything useful so skip it.
-        */
-       if (rec->node_flags & NODE_FLAGS_STOPPED) {
-               return;
-       }
-
        /* verify that we have all ip addresses we should have and we dont
         * have addresses we shouldnt have.
         */ 
-       if (ctdb->tunable.disable_ip_failover == 0) {
-               if (rec->ip_check_disable_ctx == NULL) {
-                       if (verify_local_ip_allocation(ctdb, rec, pnn, nodemap) != 0) {
-                               DEBUG(DEBUG_ERR, (__location__ " Public IPs were inconsistent.\n"));
-                       }
+       if (ctdb->tunable.disable_ip_failover == 0 &&
+           rec->takeover_runs_disable_ctx == NULL) {
+               if (verify_local_ip_allocation(ctdb, rec, pnn, nodemap) != 0) {
+                       DEBUG(DEBUG_ERR, (__location__ " Public IPs were inconsistent.\n"));
                }
        }
 
@@ -3594,7 +3788,7 @@ static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
 
        if (ctdb->num_nodes != nodemap->num) {
                DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) reloading nodes file\n", ctdb->num_nodes, nodemap->num));
-               reload_nodes_file(ctdb);
+               ctdb_load_nodes_file(ctdb);
                return;
        }
 
@@ -3647,15 +3841,9 @@ static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
        }
 
 
-       /* is there a pending reload all ips ? */
-       if (reload_all_ips_request != NULL) {
-               reload_all_ips(ctdb, rec, nodemap, reload_all_ips_request);
-               talloc_free(reload_all_ips_request);
-               reload_all_ips_request = NULL;
-       }
-
        /* if there are takeovers requested, perform it and notify the waiters */
-       if (rec->reallocate_callers) {
+       if (rec->takeover_runs_disable_ctx == NULL &&
+           rec->reallocate_requests) {
                process_ipreallocate_requests(ctdb, rec);
        }
 
@@ -3713,6 +3901,23 @@ static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
                                return;
                        }
                }
+       }
+
+       /*
+        * Update node flags obtained from each active node. This ensure we have
+        * up-to-date information for all the nodes.
+        */
+       for (j=0; j<nodemap->num; j++) {
+               if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
+                       continue;
+               }
+               nodemap->nodes[j].flags = remote_nodemaps[j]->nodes[j].flags;
+       }
+
+       for (j=0; j<nodemap->num; j++) {
+               if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
+                       continue;
+               }
 
                /* verify the flags are consistent
                */
@@ -3747,12 +3952,13 @@ static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
        }
 
 
-       /* there better be the same number of lmasters in the vnn map
-          as there are active nodes or we will have to do a recovery
+       /* There must be the same number of lmasters in the vnn map as
+        * there are active nodes with the lmaster capability...  or
+        * do a recovery.
         */
-       if (vnnmap->size != rec->num_active) {
-               DEBUG(DEBUG_ERR, (__location__ " The vnnmap count is different from the number of active nodes. %u vs %u\n", 
-                         vnnmap->size, rec->num_active));
+       if (vnnmap->size != rec->num_lmasters) {
+               DEBUG(DEBUG_ERR, (__location__ " The vnnmap count is different from the number of active lmaster nodes: %u vs %u\n",
+                         vnnmap->size, rec->num_lmasters));
                ctdb_set_culprit(rec, ctdb->pnn);
                do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
                return;
@@ -3869,9 +4075,7 @@ static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
                 * failure, monitoring is disabled cluster-wide (via
                 * startrecovery eventscript) and will not get enabled.
                 */
-               ret = ctdb_takeover_run(ctdb, nodemap, takeover_fail_callback, rec);
-               if (ret != 0) {
-                       DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses. Trying again\n"));
+               if (!do_takeover_run(rec, nodemap, true)) {
                        return;
                }
 
@@ -3905,6 +4109,8 @@ static void monitor_cluster(struct ctdb_context *ctdb)
 
        rec->ctdb = ctdb;
 
+       rec->takeover_run_in_progress = false;
+
        rec->priority_time = timeval_current();
 
        /* register a message port for sending memory dumps */
@@ -3934,9 +4140,6 @@ static void monitor_cluster(struct ctdb_context *ctdb)
        /* register a message port for performing a takeover run */
        ctdb_client_set_message_handler(ctdb, CTDB_SRVID_TAKEOVER_RUN, ip_reallocate_handler, rec);
 
-       /* register a message port for performing a reload all ips */
-       ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RELOAD_ALL_IPS, ip_reloadall_handler, rec);
-
        /* register a message port for disabling the ip check for a short while */
        ctdb_client_set_message_handler(ctdb, CTDB_SRVID_DISABLE_IP_CHECK, disable_ip_check_handler, rec);
 
@@ -3947,6 +4150,11 @@ static void monitor_cluster(struct ctdb_context *ctdb)
           reallocation */
        ctdb_client_set_message_handler(ctdb, CTDB_SRVID_REBALANCE_NODE, recd_node_rebalance_handler, rec);
 
+       /* Register a message port for disabling takeover runs */
+       ctdb_client_set_message_handler(ctdb,
+                                       CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
+                                       disable_takeover_runs_handler, rec);
+
        for (;;) {
                TALLOC_CTX *mem_ctx = talloc_new(ctdb);
                struct timeval start;
@@ -4065,6 +4273,7 @@ int ctdb_start_recoverd(struct ctdb_context *ctdb)
        /* Clear the log ringbuffer */
        ctdb_clear_log(ctdb);
 
+       ctdb_set_process_name("ctdb_recovered");
        if (switch_from_server_to_client(ctdb, "recoverd") != 0) {
                DEBUG(DEBUG_CRIT, (__location__ "ERROR: failed to switch recovery daemon into client mode. shutting down.\n"));
                exit(1);