implement a scheme where nodes are banned if they continuously caused the cluster
authorAndrew Tridgell <tridge@samba.org>
Thu, 7 Jun 2007 05:18:55 +0000 (15:18 +1000)
committerAndrew Tridgell <tridge@samba.org>
Thu, 7 Jun 2007 05:18:55 +0000 (15:18 +1000)
to start a recovery session. The node is banned from the cluster for the RecoveryBanPeriod (default of 5 minutes)

13 files changed:
1  2 
common/ctdb.c
common/ctdb_client.c
common/ctdb_control.c
common/ctdb_daemon.c
common/ctdb_monitor.c
common/ctdb_recoverd.c
common/ctdb_tunables.c
include/ctdb.h
include/ctdb_private.h
takeover/ctdb_takeover.c
tcp/tcp_connect.c
tests/nodes.txt
tools/ctdb_control.c

diff --cc common/ctdb.c
index 01654fe518268583483af4009489e3c43ebf31b7,01654fe518268583483af4009489e3c43ebf31b7..151005a8ebffcf2d94c6b0c6f36e4ace93673707
@@@ -116,10 -116,10 +116,13 @@@ static int ctdb_add_node(struct ctdb_co
        /* this assumes that the nodes are kept in sorted order, and no gaps */
        node->vnn = ctdb->num_nodes;
  
++      /* nodes start out disconnected */
++      node->flags |= NODE_FLAGS_DISCONNECTED;
++
        if (ctdb->address.address &&
            ctdb_same_address(&ctdb->address, &node->address)) {
                ctdb->vnn = node->vnn;
--              node->flags |= NODE_FLAGS_CONNECTED;
++              node->flags &= ~NODE_FLAGS_DISCONNECTED;
        }
  
        ctdb->num_nodes++;
@@@ -222,8 -222,8 +225,7 @@@ uint32_t ctdb_get_num_enabled_nodes(str
        uint32_t count=0;
        for (i=0;i<ctdb->vnn_map->size;i++) {
                struct ctdb_node *node = ctdb->nodes[ctdb->vnn_map->map[i]];
--              if ((node->flags & NODE_FLAGS_CONNECTED) &&
--                  !(node->flags & NODE_FLAGS_DISABLED)) {
++              if (!(node->flags & (NODE_FLAGS_INACTIVE|NODE_FLAGS_DISABLED))) {
                        count++;
                }
        }
@@@ -354,14 -354,14 +356,14 @@@ static void ctdb_recv_pkt(struct ctdb_c
  */
  void ctdb_node_dead(struct ctdb_node *node)
  {
--      if (!(node->flags & NODE_FLAGS_CONNECTED)) {
++      if (node->flags & NODE_FLAGS_DISCONNECTED) {
                DEBUG(1,("%s: node %s is already marked disconnected: %u connected\n", 
                         node->ctdb->name, node->name, 
                         node->ctdb->num_connected));
                return;
        }
        node->ctdb->num_connected--;
--      node->flags &= ~NODE_FLAGS_CONNECTED;
++      node->flags |= NODE_FLAGS_DISCONNECTED;
        node->rx_cnt = 0;
        node->dead_count = 0;
        DEBUG(1,("%s: node %s is dead: %u connected\n", 
  */
  void ctdb_node_connected(struct ctdb_node *node)
  {
--      if (node->flags & NODE_FLAGS_CONNECTED) {
++      if (!(node->flags & NODE_FLAGS_DISCONNECTED)) {
                DEBUG(1,("%s: node %s is already marked connected: %u connected\n", 
                         node->ctdb->name, node->name, 
                         node->ctdb->num_connected));
        }
        node->ctdb->num_connected++;
        node->dead_count = 0;
--      node->flags |= NODE_FLAGS_CONNECTED;
++      node->flags &= ~NODE_FLAGS_DISCONNECTED;
        DEBUG(1,("%s: connected to %s - %u connected\n", 
                 node->ctdb->name, node->name, node->ctdb->num_connected));
  }
index 271a34085f9eed3b73172bd6eb24b8dd93b6f2d3,271a34085f9eed3b73172bd6eb24b8dd93b6f2d3..74b79426d53a1eea6782db77168b8d04ac89a942
@@@ -1277,7 -1277,7 +1277,7 @@@ uint32_t *ctdb_get_connected_nodes(stru
        }
  
        for (i=0;i<map->num;i++) {
--              if (map->nodes[i].flags & NODE_FLAGS_CONNECTED) {
++              if (!(map->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
                        nodes[*num_nodes] = map->nodes[i].vnn;
                        (*num_nodes)++;
                }
@@@ -1921,20 -1921,20 +1921,25 @@@ int ctdb_ctrl_get_public_ips(struct ctd
  /*
    set/clear the permanent disabled bit on a remote node
   */
--int ctdb_ctrl_permdisable(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t mode)
++int ctdb_ctrl_modflags(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
++                     uint32_t set, uint32_t clear)
  {
        int ret;
        TDB_DATA data;
++      struct ctdb_node_modflags m;
        int32_t res;
  
--      data.dsize = sizeof(uint32_t);
--      data.dptr = (unsigned char *)&mode;
++      m.set = set;
++      m.clear = clear;
++
++      data.dsize = sizeof(m);
++      data.dptr = (unsigned char *)&m;
  
        ret = ctdb_control(ctdb, destnode, 0, 
--                         CTDB_CONTROL_PERMANENTLY_DISABLE, 0, data, 
++                         CTDB_CONTROL_MODIFY_FLAGS, 0, data, 
                           NULL, NULL, &res, &timeout, NULL);
        if (ret != 0 || res != 0) {
--              DEBUG(0,(__location__ " ctdb_control for setpermdisable failed\n"));
++              DEBUG(0,(__location__ " ctdb_control for modflags failed\n"));
                return -1;
        }
  
index e22cc7f07325e5c7b902b0e7018c08632e64c702,83a4ee33a6472df8c302688b642e23a3e33a6ddc..fbb0662dc27eb6a9ebfb235802bbb1d3dd1bcee6
@@@ -288,9 -288,14 +288,9 @@@ static int32_t ctdb_control_dispatch(st
        case CTDB_CONTROL_LIST_TUNABLES:
                return ctdb_control_list_tunables(ctdb, outdata);
  
--      case CTDB_CONTROL_PERMANENTLY_DISABLE:
--              CHECK_CONTROL_DATA_SIZE(sizeof(uint32_t));
-               return ctdb_control_permdisable(ctdb, indata);
 -              if ( *(uint32_t *)indata.dptr ){
 -                      ctdb->nodes[ctdb->vnn]->flags |= NODE_FLAGS_PERMANENTLY_DISABLED;
 -              } else {
 -                      ctdb->nodes[ctdb->vnn]->flags &= ~NODE_FLAGS_PERMANENTLY_DISABLED;
 -              }
 -              return 0;
++      case CTDB_CONTROL_MODIFY_FLAGS:
++              CHECK_CONTROL_DATA_SIZE(sizeof(struct ctdb_node_modflags));
++              return ctdb_control_modflags(ctdb, indata);
  
        default:
                DEBUG(0,(__location__ " Unknown CTDB control opcode %u\n", opcode));
@@@ -445,7 -450,7 +445,7 @@@ int ctdb_daemon_send_control(struct ctd
  
        if (destnode != CTDB_BROADCAST_VNNMAP && destnode != CTDB_BROADCAST_ALL && 
            (!ctdb_validate_vnn(ctdb, destnode) || 
--           !(ctdb->nodes[destnode]->flags & NODE_FLAGS_CONNECTED))) {
++           (ctdb->nodes[destnode]->flags & NODE_FLAGS_DISCONNECTED))) {
                if (!(flags & CTDB_CTRL_FLAG_NOREPLY)) {
                        callback(ctdb, -1, tdb_null, "ctdb_control to disconnected node", private_data);
                }
index 78f5313babc11e784b15db61c975eebaa7850a01,78f5313babc11e784b15db61c975eebaa7850a01..761e8fb33e316eb46568baca15ddef4ec33d5390
@@@ -44,10 -44,10 +44,10 @@@ static void flag_change_handler(struct 
                return;
        }
  
--      /* don't get the connected flag from the other node */
++      /* don't get the disconnected flag from the other node */
        ctdb->nodes[c->vnn]->flags = 
--              (ctdb->nodes[c->vnn]->flags&NODE_FLAGS_CONNECTED) 
--              | (c->flags & ~NODE_FLAGS_CONNECTED);   
++              (ctdb->nodes[c->vnn]->flags&NODE_FLAGS_DISCONNECTED) 
++              | (c->flags & ~NODE_FLAGS_DISCONNECTED);        
  }
  
  /* called when the "startup" event script has finished */
index 3b838e6e047437c22275d0f56a4778e54b7549d9,8461dca242c385be72388728a6ea62852ef3e2d6..1bf1128b0296f5dde6eeaf1a1cc52872809e06ee
@@@ -48,7 -48,7 +48,7 @@@ static void ctdb_check_for_dead_nodes(s
                        continue;
                }
                
--              if (!(node->flags & NODE_FLAGS_CONNECTED)) {
++              if (node->flags & NODE_FLAGS_DISCONNECTED) {
                        /* it might have come alive again */
                        if (node->rx_cnt != 0) {
                                ctdb_node_connected(node);
@@@ -182,33 -186,3 +182,36 @@@ void ctdb_start_monitoring(struct ctdb_
                             ctdb_check_health, ctdb);
        CTDB_NO_MEMORY_FATAL(ctdb, te);
  }
-   administratively disable/enable a node 
 +
 +
 +/*
- int32_t ctdb_control_permdisable(struct ctdb_context *ctdb, TDB_DATA indata)
++  modify flags on a node
 + */
-       uint32_t set = *(uint32_t *)indata.dptr;
++int32_t ctdb_control_modflags(struct ctdb_context *ctdb, TDB_DATA indata)
 +{
-       if (set) {
-               node->flags |= NODE_FLAGS_PERMANENTLY_DISABLED;
-       } else {
-               node->flags &= ~NODE_FLAGS_PERMANENTLY_DISABLED;
++      struct ctdb_node_modflags *m = (struct ctdb_node_modflags *)indata.dptr;
 +      TDB_DATA data;
 +      struct ctdb_node_flag_change c;
 +      struct ctdb_node *node = ctdb->nodes[ctdb->vnn];
++      uint32_t old_flags = node->flags;
 +
++      node->flags |= m->set;
++      node->flags &= ~m->clear;
++
++      if (node->flags == old_flags) {
++              /* no change */
++              return 0;
 +      }
 +
 +      c.vnn = ctdb->vnn;
 +      c.flags = node->flags;
 +
 +      data.dptr = (uint8_t *)&c;
 +      data.dsize = sizeof(c);
 +
 +      /* tell the other nodes that something has changed */
 +      ctdb_daemon_send_message(ctdb, CTDB_BROADCAST_VNNMAP,
 +                               CTDB_SRVID_NODE_FLAGS_CHANGED, data);
 +      
 +      return 0;
 +}
index 8a376d66f3671aaa35cd3cfdd5e92358e025dde1,8a376d66f3671aaa35cd3cfdd5e92358e025dde1..b7a401730774901528b014e1cb136f6c2379dcef
  #include "../include/ctdb.h"
  #include "../include/ctdb_private.h"
  
--static int timed_out = 0;
--
--static void timeout_func(struct event_context *ev, struct timed_event *te, 
--      struct timeval t, void *private_data)
--{
--      timed_out = 1;
--}
++/*
++  private state of recovery daemon
++ */
++struct ctdb_recoverd {
++      struct ctdb_context *ctdb;
++      TALLOC_CTX *mem_ctx;
++      uint32_t last_culprit;
++      uint32_t culprit_counter;
++      struct timeval first_recover_time;
++      bool *banned_nodes;
++};
  
  #define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
  #define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)
  
++/*
++  change recovery mode on all nodes
++ */
  static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t rec_mode)
  {
        int j, ret;
@@@ -45,7 -45,7 +52,7 @@@
        /* set recovery mode to active on all nodes */
        for (j=0; j<nodemap->num; j++) {
                /* dont change it for nodes that are unavailable */
--              if (!(nodemap->nodes[j].flags&NODE_FLAGS_CONNECTED)) {
++              if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
                        continue;
                }
  
@@@ -75,6 -75,6 +82,9 @@@
        return 0;
  }
  
++/*
++  change recovery master on all node
++ */
  static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t vnn)
  {
        int j, ret;
@@@ -82,7 -82,7 +92,7 @@@
        /* set recovery master to vnn on all nodes */
        for (j=0; j<nodemap->num; j++) {
                /* dont change it for nodes that are unavailable */
--              if (!(nodemap->nodes[j].flags&NODE_FLAGS_CONNECTED)) {
++              if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
                        continue;
                }
  
        return 0;
  }
  
--static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t vnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
++
++/*
++  ensure all other nodes have attached to any databases that we have
++ */
++static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
++                                         uint32_t vnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
  {
        int i, j, db, ret;
        struct ctdb_dbid_map *remote_dbmap;
                        continue;
                }
                /* dont check nodes that are unavailable */
--              if (!(nodemap->nodes[j].flags&NODE_FLAGS_CONNECTED)) {
++              if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
                        continue;
                }
  
  }
  
  
--static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t vnn, struct ctdb_dbid_map **dbmap, TALLOC_CTX *mem_ctx)
++/*
++  ensure we are attached to any databases that anyone else is attached to
++ */
++static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
++                                        uint32_t vnn, struct ctdb_dbid_map **dbmap, TALLOC_CTX *mem_ctx)
  {
        int i, j, db, ret;
        struct ctdb_dbid_map *remote_dbmap;
                        continue;
                }
                /* dont check nodes that are unavailable */
--              if (!(nodemap->nodes[j].flags&NODE_FLAGS_CONNECTED)) {
++              if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
                        continue;
                }
  
  }
  
  
--static int pull_all_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t vnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
++/*
++  pull all the remote database contents into ours
++ */
++static int pull_all_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
++                                   uint32_t vnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
  {
        int i, j, ret;
  
                                continue;
                        }
                        /* dont merge from nodes that are unavailable */
--                      if (!(nodemap->nodes[j].flags&NODE_FLAGS_CONNECTED)) {
++                      if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
                                continue;
                        }
                        ret = ctdb_ctrl_copydb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].vnn, vnn, dbmap->dbids[i], CTDB_LMASTER_ANY, mem_ctx);
  }
  
  
--
++/*
++  change the dmaster on all databases to point to us
++ */
  static int update_dmaster_on_all_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
                                           uint32_t vnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
  {
        for (i=0;i<dbmap->num;i++) {
                for (j=0; j<nodemap->num; j++) {
                        /* dont repoint nodes that are unavailable */
--                      if (!(nodemap->nodes[j].flags&NODE_FLAGS_CONNECTED)) {
++                      if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
                                continue;
                        }
                        ret = ctdb_ctrl_setdmaster(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].vnn, ctdb, dbmap->dbids[i], vnn);
        return 0;
  }
  
++
++/*
++  update flags on all active nodes
++ */
++static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
++{
++      int i;
++      for (i=0;i<nodemap->num;i++) {
++              struct ctdb_node_flag_change c;
++              TDB_DATA data;
++              uint32_t flags = nodemap->nodes[i].flags;
++
++              if (flags & NODE_FLAGS_DISCONNECTED) {
++                      continue;
++              }
++
++              c.vnn = nodemap->nodes[i].vnn;
++              c.flags = nodemap->nodes[i].flags;
++
++              data.dptr = (uint8_t *)&c;
++              data.dsize = sizeof(c);
++
++              ctdb_send_message(ctdb, CTDB_BROADCAST_VNNMAP,
++                                CTDB_SRVID_NODE_FLAGS_CHANGED, data);
++
++      }
++      return 0;
++}
++
  /*
    vacuum one database
   */
@@@ -279,7 -279,7 +333,7 @@@ static int vacuum_db(struct ctdb_contex
  
        /* set rsn on non-empty records to max_rsn+1 */
        for (i=0;i<nodemap->num;i++) {
--              if (!nodemap->nodes[i].flags & NODE_FLAGS_CONNECTED) {
++              if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
                        continue;
                }
                ret = ctdb_ctrl_set_rsn_nonempty(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].vnn,
  
        /* delete records with rsn < max_rsn+1 on all nodes */
        for (i=0;i<nodemap->num;i++) {
--              if (!nodemap->nodes[i].flags & NODE_FLAGS_CONNECTED) {
++              if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
                        continue;
                }
                ret = ctdb_ctrl_delete_low_rsn(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].vnn,
  }
  
  
++/*
++  vacuum all attached databases
++ */
  static int vacuum_all_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
                                struct ctdb_dbid_map *dbmap)
  {
  }
  
  
++/*
++  push out all our database contents to all other nodes
++ */
  static int push_all_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
                                    uint32_t vnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
  {
                                continue;
                        }
                        /* dont push to nodes that are unavailable */
--                      if (!(nodemap->nodes[j].flags&NODE_FLAGS_CONNECTED)) {
++                      if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
                                continue;
                        }
                        ret = ctdb_ctrl_copydb(ctdb, CONTROL_TIMEOUT(), vnn, nodemap->nodes[j].vnn, dbmap->dbids[i], CTDB_LMASTER_ANY, mem_ctx);
  }
  
  
++/*
++  ensure all nodes have the same vnnmap we do
++ */
  static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
                                      uint32_t vnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
  {
        /* push the new vnn map out to all the nodes */
        for (j=0; j<nodemap->num; j++) {
                /* dont push to nodes that are unavailable */
--              if (!(nodemap->nodes[j].flags&NODE_FLAGS_CONNECTED)) {
++              if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
                        continue;
                }
  
        return 0;
  }
  
++
++struct ban_state {
++      struct ctdb_recoverd *rec;
++      uint32_t banned_node;
++};
++
++/*
++  called when a ban has timed out
++ */
++static void ctdb_ban_timeout(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
++{
++      struct ban_state *state = talloc_get_type(p, struct ban_state);
++      DEBUG(0,("Node %u in now unbanned\n", state->banned_node));
++      
++      state->rec->banned_nodes[state->banned_node] = false;   
++      talloc_free(state);
++}
++
++
  /*
    we are the recmaster, and recovery is needed - start a recovery run
   */
--static int do_recovery(struct ctdb_context *ctdb
++static int do_recovery(struct ctdb_recoverd *rec
                       TALLOC_CTX *mem_ctx, uint32_t vnn, uint32_t num_active,
--                     struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap)
++                     struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap,
++                     uint32_t culprit)
  {
++      struct ctdb_context *ctdb = rec->ctdb;
        int i, j, ret;
        uint32_t generation;
        struct ctdb_dbid_map *dbmap;
  
++      if (rec->last_culprit != culprit ||
++          timeval_elapsed(&rec->first_recover_time) > ctdb->tunable.recovery_grace_period) {
++              /* either a new node is the culprit, or we've decide to forgive them */
++              rec->last_culprit = culprit;
++              rec->first_recover_time = timeval_current();
++              rec->culprit_counter = 0;
++      }
++      rec->culprit_counter++;
++
++      if (rec->culprit_counter > 2*nodemap->num) {
++              struct ban_state *state;
++
++              DEBUG(0,("Node %u has caused %u recoveries in %.0f seconds - banning it for %u seconds\n",
++                       culprit, rec->culprit_counter, timeval_elapsed(&rec->first_recover_time),
++                       ctdb->tunable.recovery_ban_period));
++              ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), culprit, NODE_FLAGS_BANNED, 0);
++              rec->banned_nodes[culprit] = true;
++
++              state = talloc(rec->mem_ctx, struct ban_state);
++              CTDB_NO_MEMORY_FATAL(ctdb, state);
++
++              state->rec = rec;
++              state->banned_node = culprit;
++
++              event_add_timed(ctdb->ev, state, timeval_current_ofs(ctdb->tunable.recovery_ban_period, 0),
++                              ctdb_ban_timeout, state);
++      }
++
        if (!ctdb_recovery_lock(ctdb, true)) {
                DEBUG(0,("Unable to get recovery lock - aborting recovery\n"));
                return -1;
                return -1;
        }
  
--      DEBUG(0, (__location__ " Recovery initiated\n"));
++      DEBUG(0, (__location__ " Recovery initiated due to problem with node %u\n", culprit));
  
        /* pick a new generation number */
        generation = random();
  
  
  
--      /* build a new vnn map with all the currently active nodes */
++      /* build a new vnn map with all the currently active and
++         unbanned nodes */
        generation = random();
        vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
        CTDB_NO_MEMORY(ctdb, vnnmap);
        vnnmap->size = num_active;
        vnnmap->map = talloc_array(vnnmap, uint32_t, vnnmap->size);
        for (i=j=0;i<nodemap->num;i++) {
--              if (nodemap->nodes[i].flags&NODE_FLAGS_CONNECTED) {
++              if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
                        vnnmap->map[j++] = nodemap->nodes[i].vnn;
                }
        }
                return -1;
        }
  
++      /*
++        update all nodes to have the same flags that we have
++       */
++      ret = update_flags_on_all_nodes(ctdb, nodemap);
++      if (ret != 0) {
++              DEBUG(0, (__location__ " Unable to update flags on all nodes\n"));
++              return -1;
++      }
++      
        /*
          run a vacuum operation on empty records
         */
@@@ -553,6 -553,6 +675,10 @@@ struct election_message 
        uint32_t vnn;
  };
  
++
++/*
++  send out an election request
++ */
  static int send_election_request(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, uint32_t vnn)
  {
        int ret;
    handler for recovery master elections
  */
  static void election_handler(struct ctdb_context *ctdb, uint64_t srvid, 
--                               TDB_DATA data, void *private_data)
++                           TDB_DATA data, void *private_data)
  {
++      struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
        int ret;
        struct election_message *em = (struct election_message *)data.dptr;
        TALLOC_CTX *mem_ctx;
                return;
        }
  
++      /* release any ban information */
++      talloc_free(rec->mem_ctx);
++      rec->mem_ctx = talloc_new(rec);
++      CTDB_NO_MEMORY_FATAL(rec->mem_ctx, rec->banned_nodes);
++
++      rec->last_culprit = (uint32_t)-1;
++      talloc_free(rec->banned_nodes);
++      rec->banned_nodes = talloc_zero_array(rec, bool, ctdb->num_nodes);
++      CTDB_NO_MEMORY_FATAL(ctdb, rec->banned_nodes);
++
        talloc_free(mem_ctx);
        return;
  }
  
  
++/*
++  called when ctdb_wait_timeout should finish
++ */
++static void ctdb_wait_handler(struct event_context *ev, struct timed_event *te, 
++                            struct timeval yt, void *p)
++{
++      uint32_t *timed_out = (uint32_t *)p;
++      (*timed_out) = 1;
++}
++
++/*
++  wait for a given number of seconds
++ */
++static void ctdb_wait_timeout(struct ctdb_context *ctdb, uint32_t secs)
++{
++      uint32_t timed_out = 0;
++      event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(secs, 0), ctdb_wait_handler, &timed_out);
++      while (!timed_out) {
++              event_loop_once(ctdb->ev);
++      }
++}
++
++/*
++  force the start of the election process
++ */
  static void force_election(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, uint32_t vnn, struct ctdb_node_map *nodemap)
  {
        int ret;
        }
  
        /* wait for a few seconds to collect all responses */
--      timed_out = 0;
--      event_add_timed(ctdb->ev, mem_ctx, timeval_current_ofs(ctdb->tunable.election_timeout, 0),
--                      timeout_func, ctdb);
--      while (!timed_out) {
--              event_loop_once(ctdb->ev);
++      ctdb_wait_timeout(ctdb, ctdb->tunable.election_timeout);
++}
++
++
++
++/*
++  handler for when a node changes its flags
++*/
++static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid, 
++                          TDB_DATA data, void *private_data)
++{
++      int ret;
++      struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
++      struct ctdb_node_map *nodemap=NULL;
++      TALLOC_CTX *tmp_ctx;
++      int i;
++
++      if (data.dsize != sizeof(*c)) {
++              DEBUG(0,(__location__ "Invalid data in ctdb_node_flag_change\n"));
++              return;
++      }
++
++      tmp_ctx = talloc_new(ctdb);
++      CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
++
++      ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
++
++      for (i=0;i<nodemap->num;i++) {
++              if (nodemap->nodes[i].vnn == c->vnn) break;
++      }
++
++      if (i == nodemap->num) {
++              DEBUG(0,(__location__ "Flag change for non-existant node %u\n", c->vnn));
++              talloc_free(tmp_ctx);
++              return;
++      }
++
++      if (nodemap->nodes[i].flags != c->flags) {
++              DEBUG(0,("Node %u has changed flags - now 0x%x\n", c->vnn, c->flags));
++      }
++
++      nodemap->nodes[i].flags = c->flags;
++
++      ret = ctdb_ctrl_getrecmaster(ctdb, CONTROL_TIMEOUT(), 
++                                   CTDB_CURRENT_NODE, &ctdb->recovery_master);
++
++      if (ret == 0) {
++              ret = ctdb_ctrl_getrecmode(ctdb, CONTROL_TIMEOUT(), 
++                                         CTDB_CURRENT_NODE, &ctdb->recovery_mode);
        }
++      
++      if (ret == 0 &&
++          ctdb->recovery_master == ctdb->vnn &&
++          ctdb->recovery_mode == CTDB_RECOVERY_NORMAL &&
++          ctdb->takeover.enabled) {
++              ret = ctdb_takeover_run(ctdb, nodemap);
++              if (ret != 0) {
++                      DEBUG(0, (__location__ " Unable to setup public takeover addresses\n"));
++              }
++      }
++
++      talloc_free(tmp_ctx);
  }
  
++
++
  /*
    the main monitoring loop
   */
@@@ -672,6 -672,6 +892,23 @@@ static void monitor_cluster(struct ctdb
        struct ctdb_vnn_map *remote_vnnmap=NULL;
        int i, j, ret;
        bool need_takeover_run;
++      struct ctdb_recoverd *rec;
++
++      rec = talloc_zero(ctdb, struct ctdb_recoverd);
++      CTDB_NO_MEMORY_FATAL(ctdb, rec);
++
++      rec->ctdb = ctdb;
++      rec->banned_nodes = talloc_zero_array(rec, bool, ctdb->num_nodes);
++      CTDB_NO_MEMORY_FATAL(ctdb, rec->banned_nodes);
++
++      rec->mem_ctx = talloc_new(rec);
++      CTDB_NO_MEMORY_FATAL(ctdb, rec->mem_ctx);
++
++      /* register a message port for recovery elections */
++      ctdb_set_message_handler(ctdb, CTDB_SRVID_RECOVERY, election_handler, rec);
++
++      /* and one for when nodes are disabled/enabled */
++      ctdb_set_message_handler(ctdb, CTDB_SRVID_NODE_FLAGS_CHANGED, monitor_handler, rec);
        
  again:
        need_takeover_run = false;
        }
  
        /* we only check for recovery once every second */
--      timed_out = 0;
--      event_add_timed(ctdb->ev, mem_ctx, MONITOR_TIMEOUT(), timeout_func, ctdb);
--      while (!timed_out) {
--              event_loop_once(ctdb->ev);
--      }
++      ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval);
  
        /* get relevant tunables */
        ctdb_ctrl_get_tunable(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, 
                              "ElectionTimeout", &ctdb->tunable.election_timeout);
        ctdb_ctrl_get_tunable(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, 
                              "TakeoverTimeout", &ctdb->tunable.takeover_timeout);
++      ctdb_ctrl_get_tunable(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, 
++                            "RecoveryGracePeriod", &ctdb->tunable.recovery_grace_period);
++      ctdb_ctrl_get_tunable(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, 
++                            "RecoveryBanPeriod", &ctdb->tunable.recovery_ban_period);
  
        vnn = ctdb_ctrl_getvnn(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
        if (vnn == (uint32_t)-1) {
                goto again;
        }
  
--      ctdb->vnn = vnn;
--
        /* get the vnnmap */
        ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), vnn, mem_ctx, &vnnmap);
        if (ret != 0) {
        /* count how many active nodes there are */
        num_active = 0;
        for (i=0; i<nodemap->num; i++) {
--              if (nodemap->nodes[i].flags&NODE_FLAGS_CONNECTED) {
++              if (rec->banned_nodes[nodemap->nodes[i].vnn]) {
++                      nodemap->nodes[i].flags |= NODE_FLAGS_BANNED;
++              } else {
++                      nodemap->nodes[i].flags &= ~NODE_FLAGS_BANNED;
++              }
++              if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
                        num_active++;
                }
        }
                goto again;
        }
  
--      if (!(nodemap->nodes[j].flags&NODE_FLAGS_CONNECTED)) {
++      if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
                DEBUG(0, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].vnn));
                force_election(ctdb, mem_ctx, vnn, nodemap);
                goto again;
  
        /* verify that all active nodes agree that we are the recmaster */
        for (j=0; j<nodemap->num; j++) {
--              if (!(nodemap->nodes[j].flags&NODE_FLAGS_CONNECTED)) {
++              if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
                        continue;
                }
                if (nodemap->nodes[j].vnn == vnn) {
           and not in recovery mode 
         */
        for (j=0; j<nodemap->num; j++) {
--              if (!(nodemap->nodes[j].flags&NODE_FLAGS_CONNECTED)) {
++              if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
                        continue;
                }
  
                        DEBUG(0, ("Unable to get recmode from node %u\n", vnn));
                        goto again;
                }
--              if (recmode!=CTDB_RECOVERY_NORMAL) {
++              if (recmode != CTDB_RECOVERY_NORMAL) {
                        DEBUG(0, (__location__ " Node:%u was in recovery mode. Restart recovery process\n", nodemap->nodes[j].vnn));
--                      do_recovery(ctdb, mem_ctx, vnn, num_active, nodemap, vnnmap);
++                      do_recovery(rec, mem_ctx, vnn, num_active, nodemap, vnnmap, nodemap->nodes[j].vnn);
                        goto again;
                }
        }
           they are the same as for this node
         */
        for (j=0; j<nodemap->num; j++) {
--              if (!(nodemap->nodes[j].flags&NODE_FLAGS_CONNECTED)) {
++              if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
                        continue;
                }
                if (nodemap->nodes[j].vnn == vnn) {
                   then this is a good reason to try recovery
                 */
                if (remote_nodemap->num != nodemap->num) {
--                      DEBUG(0, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n", nodemap->nodes[j].vnn, remote_nodemap->num, nodemap->num));
--                      do_recovery(ctdb, mem_ctx, vnn, num_active, nodemap, vnnmap);
++                      DEBUG(0, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
++                                nodemap->nodes[j].vnn, remote_nodemap->num, nodemap->num));
++                      do_recovery(rec, mem_ctx, vnn, num_active, nodemap, vnnmap, nodemap->nodes[j].vnn);
                        goto again;
                }
  
                 */
                for (i=0;i<nodemap->num;i++) {
                        if ((remote_nodemap->nodes[i].vnn != nodemap->nodes[i].vnn)
--                          || ((remote_nodemap->nodes[i].flags&NODE_FLAGS_CONNECTED) != 
--                              (nodemap->nodes[i].flags & NODE_FLAGS_CONNECTED))) {
--                              DEBUG(0, (__location__ " Remote node:%u has different nodemap.\n", nodemap->nodes[j].vnn));
--                              do_recovery(ctdb, mem_ctx, vnn, num_active, nodemap, vnnmap);
++                          || ((remote_nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) != 
++                              (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE))) {
++                              DEBUG(0, (__location__ " Remote node:%u has different nodemap.\n", 
++                                        nodemap->nodes[j].vnn));
++                              do_recovery(rec, mem_ctx, vnn, num_active, nodemap, vnnmap, nodemap->nodes[j].vnn);
                                goto again;
                        }
                }
           as there are active nodes or we will have to do a recovery
         */
        if (vnnmap->size != num_active) {
--              DEBUG(0, (__location__ " The vnnmap count is different from the number of active nodes. %u vs %u\n", vnnmap->size, num_active));
--              do_recovery(ctdb, mem_ctx, vnn, num_active, nodemap, vnnmap);
++              DEBUG(0, (__location__ " The vnnmap count is different from the number of active nodes. %u vs %u\n", 
++                        vnnmap->size, num_active));
++              do_recovery(rec, mem_ctx, vnn, num_active, nodemap, vnnmap, ctdb->vnn);
                goto again;
        }
  
           the vnnmap.
         */
        for (j=0; j<nodemap->num; j++) {
--              if (!(nodemap->nodes[j].flags & NODE_FLAGS_CONNECTED)) {
++              if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
                        continue;
                }
                if (nodemap->nodes[j].vnn == vnn) {
                                break;
                        }
                }
--              if (i==vnnmap->size) {
--                      DEBUG(0, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", nodemap->nodes[j].vnn));
--                      do_recovery(ctdb, mem_ctx, vnn, num_active, nodemap, vnnmap);
++              if (i == vnnmap->size) {
++                      DEBUG(0, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", 
++                                nodemap->nodes[j].vnn));
++                      do_recovery(rec, mem_ctx, vnn, num_active, nodemap, vnnmap, nodemap->nodes[j].vnn);
                        goto again;
                }
        }
           and are from the same generation
         */
        for (j=0; j<nodemap->num; j++) {
--              if (!(nodemap->nodes[j].flags & NODE_FLAGS_CONNECTED)) {
++              if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
                        continue;
                }
                if (nodemap->nodes[j].vnn == vnn) {
  
                /* verify the vnnmap generation is the same */
                if (vnnmap->generation != remote_vnnmap->generation) {
--                      DEBUG(0, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", nodemap->nodes[j].vnn, remote_vnnmap->generation, vnnmap->generation));
--                      do_recovery(ctdb, mem_ctx, vnn, num_active, nodemap, vnnmap);
++                      DEBUG(0, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
++                                nodemap->nodes[j].vnn, remote_vnnmap->generation, vnnmap->generation));
++                      do_recovery(rec, mem_ctx, vnn, num_active, nodemap, vnnmap, nodemap->nodes[j].vnn);
                        goto again;
                }
  
                /* verify the vnnmap size is the same */
                if (vnnmap->size != remote_vnnmap->size) {
--                      DEBUG(0, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", nodemap->nodes[j].vnn, remote_vnnmap->size, vnnmap->size));
--                      do_recovery(ctdb, mem_ctx, vnn, num_active, nodemap, vnnmap);
++                      DEBUG(0, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
++                                nodemap->nodes[j].vnn, remote_vnnmap->size, vnnmap->size));
++                      do_recovery(rec, mem_ctx, vnn, num_active, nodemap, vnnmap, nodemap->nodes[j].vnn);
                        goto again;
                }
  
                /* verify the vnnmap is the same */
                for (i=0;i<vnnmap->size;i++) {
                        if (remote_vnnmap->map[i] != vnnmap->map[i]) {
--                              DEBUG(0, (__location__ " Remote node %u has different vnnmap.\n", nodemap->nodes[j].vnn));
--                              do_recovery(ctdb, mem_ctx, vnn, num_active, nodemap, vnnmap);
++                              DEBUG(0, (__location__ " Remote node %u has different vnnmap.\n", 
++                                        nodemap->nodes[j].vnn));
++                              do_recovery(rec, mem_ctx, vnn, num_active, nodemap, vnnmap, nodemap->nodes[j].vnn);
                                goto again;
                        }
                }
  
  }
  
--
  /*
--  handler for when a node changes its flags
--*/
--static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid, 
--                          TDB_DATA data, void *private_data)
--{
--      int ret;
--      struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
--      struct ctdb_node_map *nodemap=NULL;
--      TALLOC_CTX *tmp_ctx;
--      int i;
--
--      if (data.dsize != sizeof(*c)) {
--              DEBUG(0,(__location__ "Invalid data in ctdb_node_flag_change\n"));
--              return;
--      }
--
--      tmp_ctx = talloc_new(ctdb);
--      CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
--
--      ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
--
--      for (i=0;i<nodemap->num;i++) {
--              if (nodemap->nodes[i].vnn == c->vnn) break;
--      }
--
--      if (i == nodemap->num) {
--              DEBUG(0,(__location__ "Flag change for non-existant node %u\n", c->vnn));
--              talloc_free(tmp_ctx);
--              return;
--      }
--
--      if (c->vnn != ctdb->vnn) {
--              DEBUG(0,("Node %u has changed flags - now 0x%x\n", c->vnn, c->flags));
--      }
--
--      nodemap->nodes[i].flags = c->flags;
--
--      ret = ctdb_ctrl_getrecmaster(ctdb, CONTROL_TIMEOUT(), 
--                                   CTDB_CURRENT_NODE, &ctdb->recovery_master);
--
--      if (ret == 0) {
--              ret = ctdb_ctrl_getrecmode(ctdb, CONTROL_TIMEOUT(), 
--                                         CTDB_CURRENT_NODE, &ctdb->recovery_mode);
--      }
--      
--      if (ret == 0 &&
--          ctdb->recovery_master == ctdb->vnn &&
--          ctdb->recovery_mode == CTDB_RECOVERY_NORMAL &&
--          ctdb->takeover.enabled) {
--              ret = ctdb_takeover_run(ctdb, nodemap);
--              if (ret != 0) {
--                      DEBUG(0, (__location__ " Unable to setup public takeover addresses\n"));
--              }
--      }
--
--      talloc_free(tmp_ctx);
--}
--
--
++  event handler for when the main ctdbd dies
++ */
  static void ctdb_recoverd_parent(struct event_context *ev, struct fd_event *fde, 
                                 uint16_t flags, void *private_data)
  {
        _exit(1);
  }
  
++
++
++/*
++  startup the recovery daemon as a child of the main ctdb daemon
++ */
  int ctdb_start_recoverd(struct ctdb_context *ctdb)
  {
        int ret;
                exit(1);
        }
  
--      /* register a message port for recovery elections */
--      ctdb_set_message_handler(ctdb, CTDB_SRVID_RECOVERY, election_handler, NULL);
--
--      /* and one for when nodes are disabled/enabled */
--      ctdb_set_message_handler(ctdb, CTDB_SRVID_NODE_FLAGS_CHANGED, monitor_handler, NULL);
--
        monitor_cluster(ctdb);
  
        DEBUG(0,("ERROR: ctdb_recoverd finished!?\n"));
index 09ba515abe29c86be70aacd4a42adb62c3605696,09ba515abe29c86be70aacd4a42adb62c3605696..2005f3ada43db58e0d7bf3f8ecbcab57f37284db
@@@ -25,19 -25,19 +25,21 @@@ static const struct 
        uint32_t default_v;
        size_t offset;  
  } tunable_map[] = {
--      { "MaxRedirectCount",    3,  offsetof(struct ctdb_tunable, max_redirect_count) },
--      { "SeqnumFrequency",     1,  offsetof(struct ctdb_tunable, seqnum_frequency) },
--      { "ControlTimeout",     60, offsetof(struct ctdb_tunable, control_timeout) },
--      { "TraverseTimeout",    20, offsetof(struct ctdb_tunable, traverse_timeout) },
--      { "KeepaliveInterval",   2,  offsetof(struct ctdb_tunable, keepalive_interval) },
--      { "KeepaliveLimit",      3,  offsetof(struct ctdb_tunable, keepalive_limit) },
--      { "MaxLACount",          7,  offsetof(struct ctdb_tunable, max_lacount) },
--      { "RecoverTimeout",      5,  offsetof(struct ctdb_tunable, recover_timeout) },
--      { "RecoverInterval",     1,  offsetof(struct ctdb_tunable, recover_interval) },
--      { "ElectionTimeout",     3,  offsetof(struct ctdb_tunable, election_timeout) },
--      { "TakeoverTimeout",     5,  offsetof(struct ctdb_tunable, takeover_timeout) },
--      { "MonitorInterval",    15,  offsetof(struct ctdb_tunable, monitor_interval) },
--      { "EventScriptTimeout", 20,  offsetof(struct ctdb_tunable, script_timeout) },
++      { "MaxRedirectCount",     3,  offsetof(struct ctdb_tunable, max_redirect_count) },
++      { "SeqnumFrequency",      1,  offsetof(struct ctdb_tunable, seqnum_frequency) },
++      { "ControlTimeout",      60, offsetof(struct ctdb_tunable, control_timeout) },
++      { "TraverseTimeout",     20, offsetof(struct ctdb_tunable, traverse_timeout) },
++      { "KeepaliveInterval",    2,  offsetof(struct ctdb_tunable, keepalive_interval) },
++      { "KeepaliveLimit",       3,  offsetof(struct ctdb_tunable, keepalive_limit) },
++      { "MaxLACount",           7,  offsetof(struct ctdb_tunable, max_lacount) },
++      { "RecoverTimeout",       5,  offsetof(struct ctdb_tunable, recover_timeout) },
++      { "RecoverInterval",      1,  offsetof(struct ctdb_tunable, recover_interval) },
++      { "ElectionTimeout",      3,  offsetof(struct ctdb_tunable, election_timeout) },
++      { "TakeoverTimeout",      5,  offsetof(struct ctdb_tunable, takeover_timeout) },
++      { "MonitorInterval",     15,  offsetof(struct ctdb_tunable, monitor_interval) },
++      { "EventScriptTimeout",  20,  offsetof(struct ctdb_tunable, script_timeout) },
++      { "RecoveryGracePeriod", 60,  offsetof(struct ctdb_tunable, recovery_grace_period) },
++      { "RecoveryBanPeriod",  300,  offsetof(struct ctdb_tunable, recovery_ban_period) },
  };
  
  /*
diff --cc include/ctdb.h
index 4e8fe4fa3c018094cc639ba3cc5b8e401d41b02e,4e8fe4fa3c018094cc639ba3cc5b8e401d41b02e..545120b1742b2f51fed603b44adaf8a80f123db4
@@@ -351,9 -351,9 +351,9 @@@ int ctdb_ctrl_list_tunables(struct ctdb
                            TALLOC_CTX *mem_ctx,
                            const char ***list, uint32_t *count);
  
--int ctdb_ctrl_permdisable(struct ctdb_context *ctdb, 
--                        struct timeval timeout, 
--                        uint32_t destnode, 
--                        uint32_t mode);
++int ctdb_ctrl_modflags(struct ctdb_context *ctdb, 
++                     struct timeval timeout, 
++                     uint32_t destnode, 
++                     uint32_t set, uint32_t clear);
  
  #endif
index 2ec30817620ae50d26fa3a4f3d86a9e7ee78a0a8,c22c8a6d99665bac6e5288ad39a36eedda7e0c9e..aced667b1540fc0d94acf701b0e4d2b7ba4faf82
@@@ -53,6 -53,6 +53,8 @@@ struct ctdb_tunable 
        uint32_t takeover_timeout;
        uint32_t monitor_interval;
        uint32_t script_timeout;
++      uint32_t recovery_grace_period;
++      uint32_t recovery_ban_period;
  };
  
  /*
@@@ -111,10 -111,9 +113,12 @@@ struct ctdb_node 
        const char *name; /* for debug messages */
        void *private_data; /* private to transport */
        uint32_t vnn;
--#define NODE_FLAGS_CONNECTED          0x00000001
- #define NODE_FLAGS_UNHEALTHY                  0x00000002
 -#define NODE_FLAGS_DISABLED           0x00000002
--#define NODE_FLAGS_PERMANENTLY_DISABLED       0x00000004
++#define NODE_FLAGS_DISCONNECTED               0x00000001 /* node isn't connected */
++#define NODE_FLAGS_UNHEALTHY                  0x00000002 /* monitoring says node is unhealthy */
++#define NODE_FLAGS_PERMANENTLY_DISABLED       0x00000004 /* administrator has disabled node */
++#define NODE_FLAGS_BANNED             0x00000008 /* recovery daemon has banned the node */
 +#define NODE_FLAGS_DISABLED           (NODE_FLAGS_UNHEALTHY|NODE_FLAGS_PERMANENTLY_DISABLED)
++#define NODE_FLAGS_INACTIVE           (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_BANNED)
        uint32_t flags;
  
        /* used by the dead node monitoring */
@@@ -414,7 -413,7 +418,7 @@@ enum ctdb_controls {CTDB_CONTROL_PROCES
                    CTDB_CONTROL_GET_TUNABLE             = 49,
                    CTDB_CONTROL_LIST_TUNABLES           = 50,
                    CTDB_CONTROL_GET_PUBLIC_IPS          = 51,
--                  CTDB_CONTROL_PERMANENTLY_DISABLE     = 52,
++                  CTDB_CONTROL_MODIFY_FLAGS            = 52,
  };
  
  /*
@@@ -467,6 -466,6 +471,14 @@@ struct ctdb_node_flag_change 
        uint32_t flags;
  };
  
++/*
++  structure to change flags on a node
++ */
++struct ctdb_node_modflags {
++      uint32_t set;
++      uint32_t clear;
++};
++
  enum call_state {CTDB_CALL_WAIT, CTDB_CALL_DONE, CTDB_CALL_ERROR};
  
  #define CTDB_LMASTER_ANY      0xffffffff
@@@ -1010,6 -1009,4 +1022,6 @@@ int32_t ctdb_control_list_tunables(stru
  
  void ctdb_tunables_set_defaults(struct ctdb_context *ctdb);
  
- int32_t ctdb_control_permdisable(struct ctdb_context *ctdb, TDB_DATA indata);
++int32_t ctdb_control_modflags(struct ctdb_context *ctdb, TDB_DATA indata);
 +
  #endif
index a90a1a08c9c127a0d9d4188a4b4daa0bbec735d5,537c3d296036c33e63a607ad35521553aaa428ce..c18203ff58a79ec1ac2ef63d1f537125c7c1b8fa
@@@ -404,6 -404,6 +404,29 @@@ static bool ctdb_same_subnet(const cha
        return true;
  }
  
++
++/*
++  try to find an available node to take a given nodes IP that meets the
++  criterion given by the flags
++ */
++static void ctdb_takeover_find_node(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap,
++                                  int start_node, uint32_t mask_flags)
++{
++      int j;
++      for (j=(start_node+1)%nodemap->num;
++           j != start_node;
++           j=(j+1)%nodemap->num) {
++              if (!(nodemap->nodes[j].flags & mask_flags) &&
++                  ctdb_same_subnet(ctdb->nodes[j]->public_address, 
++                                   ctdb->nodes[start_node]->public_address, 
++                                   ctdb->nodes[j]->public_netmask_bits)) {
++                      ctdb->nodes[start_node]->takeover_vnn = nodemap->nodes[j].vnn;
++                      break;
++              }
++      }
++}
++
++
  /*
    make any IP alias changes for public addresses that are necessary 
   */
@@@ -412,52 -412,81 +435,33 @@@ int ctdb_takeover_run(struct ctdb_conte
        int i, j;
        int ret;
        struct ctdb_public_ip ip;
 -      int takeover_node = 0;
  
-       /* work out which node will look after each public IP */
+       /* Work out which node will look after each public IP.
+        * takeover_node cycles over the nodes and is incremented each time a 
+        * node has been assigned to take over for another node.
+        * This spreads the failed nodes out across the remaining
+        * nodes more evenly
+        */
        for (i=0;i<nodemap->num;i++) {
--              if ((nodemap->nodes[i].flags & NODE_FLAGS_CONNECTED) && 
--                  !(nodemap->nodes[i].flags & NODE_FLAGS_DISABLED)) {
++              if (!(nodemap->nodes[i].flags & (NODE_FLAGS_INACTIVE|NODE_FLAGS_DISABLED))) {
                        ctdb->nodes[i]->takeover_vnn = nodemap->nodes[i].vnn;
                } else {
 -                      j = takeover_node;
 -
--                      /* assign this dead nodes IP to the next higher node */
-                       for (j=(i+1)%nodemap->num;
-                            j != i;
-                            j=(j+1)%nodemap->num) {
 -                      ctdb->nodes[i]->takeover_vnn = -1;
 -                      while(1) {
--                              if ((nodemap->nodes[j].flags & NODE_FLAGS_CONNECTED) &&
--                                  !(nodemap->nodes[j].flags & NODE_FLAGS_DISABLED) &&
--                                  ctdb_same_subnet(ctdb->nodes[j]->public_address, 
--                                                   ctdb->nodes[i]->public_address, 
--                                                   ctdb->nodes[j]->public_netmask_bits)) {
--                                      ctdb->nodes[i]->takeover_vnn = nodemap->nodes[j].vnn;
 -                                      takeover_node = (takeover_node+1)%nodemap->num;
 -                                      break;
 -                              }
 -                              /* ok, try the next node instead then */
 -                              j=(j+1)%nodemap->num;
 -
 -                              /* If we wrapped back to the takeover node node
 -                               * without finding any ENABLED nodes
 -                               * there is not much to do 
 -                               */
 -                              if (j==takeover_node) {
--                                      break;
--                              }
 -                              
--                      }
++                      ctdb->nodes[i]->takeover_vnn = (uint32_t)-1;    
++
++                      ctdb_takeover_find_node(ctdb, nodemap, i, NODE_FLAGS_INACTIVE|NODE_FLAGS_DISABLED);
                        
                        /* if no enabled node can take it, then we
--                         might as well use any connected node. It
++                         might as well use any active node. It
                           probably means that some subsystem (such as
                           NFS) is sick on all nodes. Best we can do
                           is to keep the other services up. */
-                       if (j == i) {
-                               for (j=(i+1)%nodemap->num;
-                                    j != i;
-                                    j=(j+1)%nodemap->num) {
 -                      if (ctdb->nodes[i]->takeover_vnn == -1) {
 -                              j = takeover_node;
 -                              while(1) {
--                                      if ((nodemap->nodes[j].flags & NODE_FLAGS_CONNECTED) &&
--                                          ctdb_same_subnet(ctdb->nodes[j]->public_address, 
--                                                           ctdb->nodes[i]->public_address, 
--                                                           ctdb->nodes[j]->public_netmask_bits)) {
--                                              ctdb->nodes[i]->takeover_vnn = nodemap->nodes[j].vnn;
--                                              DEBUG(0,("All available nodes disabled for %s - using a connected node\n",
--                                                       ctdb->nodes[i]->public_address));
 -                                              takeover_node = (takeover_node+1)%nodemap->num;
 -                                              break;
 -                                      }
 -                                      /* ok, try the next node instead then */
 -                                      j=(j+1)%nodemap->num;
 -
 -                                      /* If we wrapped back to the takeover node node
 -                                       * without finding any ENABLED nodes
 -                                       * there is not much to do 
 -                                       */
 -                                      if (j==takeover_node) {
--                                              break;
--                                      }
--                              }
++                      if (ctdb->nodes[i]->takeover_vnn == (uint32_t)-1) {
++                              ctdb_takeover_find_node(ctdb, nodemap, i, NODE_FLAGS_INACTIVE);
                        }
--                      
-                       if (j == i) {
 -                      if (ctdb->nodes[i]->takeover_vnn == -1) {
++
++                      if (ctdb->nodes[i]->takeover_vnn == (uint32_t)-1) {
                                DEBUG(0,(__location__ " No node available on same network to take %s\n",
                                         ctdb->nodes[i]->public_address));
--                              ctdb->nodes[i]->takeover_vnn = -1;      
                        }
                }
        }       
           hold the given alias */
        for (i=0;i<nodemap->num;i++) {
                /* don't talk to unconnected nodes */
--              if (!(nodemap->nodes[i].flags & NODE_FLAGS_CONNECTED)) continue;
++              if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
++                      continue;
++              }
  
                /* tell this node to delete all of the aliases that it should not have */
                for (j=0;j<nodemap->num;j++) {
index e44fbfb923d9d698cd571058119fd40cfb0db5ff,e44fbfb923d9d698cd571058119fd40cfb0db5ff..79717845f51470afcda609fc1a343b46de044386
@@@ -274,7 -274,7 +274,7 @@@ static int ctdb_tcp_listen_automatic(st
                                     ctdb->address.address, 
                                     ctdb->address.port);
        ctdb->vnn = ctdb->nodes[i]->vnn;
--      ctdb->nodes[i]->flags |= NODE_FLAGS_CONNECTED;
++      ctdb->nodes[i]->flags &= ~NODE_FLAGS_DISCONNECTED;
        DEBUG(1,("ctdb chose network address %s:%u vnn %u\n", 
                 ctdb->address.address, 
                 ctdb->address.port, 
diff --cc tests/nodes.txt
index 876f0c241a9f2cf805492160dadeae1dfecc47cb,876f0c241a9f2cf805492160dadeae1dfecc47cb..99b07328b383bd468885a27f4b84aa7ed35e331d
@@@ -1,3 -1,3 +1,4 @@@
  127.0.0.1
  127.0.0.2
--
++127.0.0.3
++127.0.0.4
index 3525fd672c5052723511167343ea7ff605f63686,94c5be3b41d712267c4f2f3f9b20254b5d72f3f7..f7e7ea6ce9dac8d02ac392add214a1e2a70fa0fa
@@@ -285,13 -285,13 +285,13 @@@ static int control_status(struct ctdb_c
        }
  
        if(options.machinereadable){
--              printf(":Node:IP:Connected:Disabled:Permanently Disabled:\n");
++              printf(":Node:IP:Disonnected:Disabled:Permanently Disabled:\n");
                for(i=0;i<nodemap->num;i++){
                        printf(":%d:%s:%d:%d:%d:\n", nodemap->nodes[i].vnn,
                                inet_ntoa(nodemap->nodes[i].sin.sin_addr),
--                              !!(nodemap->nodes[i].flags&NODE_FLAGS_CONNECTED),
-                               !!(nodemap->nodes[i].flags&NODE_FLAGS_UNHEALTHY),
 -                              !!(nodemap->nodes[i].flags&NODE_FLAGS_DISABLED),
--                              !!(nodemap->nodes[i].flags&NODE_FLAGS_PERMANENTLY_DISABLED));
++                             !!(nodemap->nodes[i].flags&NODE_FLAGS_DISCONNECTED),
++                             !!(nodemap->nodes[i].flags&NODE_FLAGS_UNHEALTHY),
++                             !!(nodemap->nodes[i].flags&NODE_FLAGS_PERMANENTLY_DISABLED));
                }
                return 0;
        }
        for(i=0;i<nodemap->num;i++){
                const char *flags_str;
                if (nodemap->nodes[i].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
 -                      flags_str = "PERM DISABLED";
 -              } else if (nodemap->nodes[i].flags & NODE_FLAGS_DISABLED) {
                        flags_str = "DISABLED";
 -              } else if (nodemap->nodes[i].flags & NODE_FLAGS_CONNECTED) {
 -                      flags_str = "CONNECTED";
 +              } else if (nodemap->nodes[i].flags & NODE_FLAGS_UNHEALTHY) {
 +                      flags_str = "UNHEALTHY";
-               } else if (nodemap->nodes[i].flags & NODE_FLAGS_CONNECTED) {
-                       flags_str = "CONNECTED";
++              } else if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
++                      flags_str = "DISCONNECTED";
                } else {
--                      flags_str = "UNAVAILABLE";
++                      flags_str = "OK";
                }
                printf("vnn:%d %-16s %s%s\n", nodemap->nodes[i].vnn,
                       inet_ntoa(nodemap->nodes[i].sin.sin_addr),
@@@ -405,7 -405,7 +405,7 @@@ static int control_disable(struct ctdb_
  {
        int ret;
  
--      ret = ctdb_ctrl_permdisable(ctdb, TIMELIMIT(), options.vnn, NODE_FLAGS_PERMANENTLY_DISABLED);
++      ret = ctdb_ctrl_modflags(ctdb, TIMELIMIT(), options.vnn, NODE_FLAGS_PERMANENTLY_DISABLED, 0);
        if (ret != 0) {
                printf("Unable to disable node %u\n", options.vnn);
                return ret;
@@@ -421,7 -421,7 +421,7 @@@ static int control_enable(struct ctdb_c
  {
        int ret;
  
--      ret = ctdb_ctrl_permdisable(ctdb, TIMELIMIT(), options.vnn, 0);
++      ret = ctdb_ctrl_modflags(ctdb, TIMELIMIT(), options.vnn, 0, NODE_FLAGS_PERMANENTLY_DISABLED);
        if (ret != 0) {
                printf("Unable to enable node %u\n", options.vnn);
                return ret;
@@@ -618,7 -618,7 +618,7 @@@ static int control_getvar(struct ctdb_c
                return -1;
        }
  
--      printf("%-18s = %u\n", name, value);
++      printf("%-19s = %u\n", name, value);
        return 0;
  }