Merge commit 'ronnie-ctdb/master' into tridge
authorroot <root@h01c001mz.VSOFS1.COM>
Thu, 8 May 2008 06:46:23 +0000 (16:46 +1000)
committerroot <root@h01c001mz.VSOFS1.COM>
Thu, 8 May 2008 06:46:23 +0000 (16:46 +1000)
(This used to be ctdb commit 2ef839c95a48cd8a5f0ed385e92bdb89de71b4a2)

14 files changed:
ctdb/client/ctdb_client.c
ctdb/common/ctdb_ltdb.c
ctdb/config/ctdb.init
ctdb/config/ctdb.sysconfig
ctdb/include/ctdb.h
ctdb/include/ctdb_private.h
ctdb/packaging/RPM/ctdb.spec
ctdb/server/ctdb_control.c
ctdb/server/ctdb_ltdb_server.c
ctdb/server/ctdb_recover.c
ctdb/server/ctdb_recoverd.c
ctdb/server/ctdbd.c
ctdb/tcp/tcp_connect.c
ctdb/tools/ctdb.c

index f852e5f99b677755b5068855b7d85ffcfc66eaf3..921392c84485bd6d067a5ddbd6103685289041c8 100644 (file)
@@ -1839,19 +1839,33 @@ int ctdb_traverse(struct ctdb_db_context *ctdb_db, ctdb_traverse_func fn, void *
  */
 static int dumpdb_fn(struct ctdb_context *ctdb, TDB_DATA key, TDB_DATA data, void *p)
 {
+       int i;
        FILE *f = (FILE *)p;
-       char *keystr, *datastr;
        struct ctdb_ltdb_header *h = (struct ctdb_ltdb_header *)data.dptr;
 
-       keystr  = hex_encode_talloc(ctdb, key.dptr, key.dsize);
-       datastr = hex_encode_talloc(ctdb, data.dptr+sizeof(*h), data.dsize-sizeof(*h));
-
        fprintf(f, "dmaster: %u\n", h->dmaster);
        fprintf(f, "rsn: %llu\n", (unsigned long long)h->rsn);
-       fprintf(f, "key: %s\ndata: %s\n", keystr, datastr);
 
-       talloc_free(keystr);
-       talloc_free(datastr);
+       fprintf(f, "key(%d) = \"", key.dsize);
+       for (i=0;i<key.dsize;i++) {
+               if (isascii(key.dptr[i])) {
+                       fprintf(f, "%c", key.dptr[i]);
+               } else {
+                       fprintf(f, "\\%02X", key.dptr[i]);
+               }
+       }
+       fprintf(f, "\"\n");
+
+       fprintf(f, "data(%d) = \"", data.dsize);
+       for (i=sizeof(*h);i<data.dsize;i++) {
+               if (isascii(data.dptr[i])) {
+                       fprintf(f, "%c", data.dptr[i]);
+               } else {
+                       fprintf(f, "\\%02X", data.dptr[i]);
+               }
+       }
+       fprintf(f, "\"\n");
+
        return 0;
 }
 
@@ -2657,8 +2671,11 @@ int ctdb_ctrl_end_recovery(struct ctdb_context *ctdb, struct timeval timeout, ui
 static void async_callback(struct ctdb_client_control_state *state)
 {
        struct client_async_data *data = talloc_get_type(state->async.private_data, struct client_async_data);
+       struct ctdb_context *ctdb = talloc_get_type(state->ctdb, struct ctdb_context);
        int ret;
+       TDB_DATA outdata;
        int32_t res;
+       uint32_t destnode = state->c->hdr.destnode;
 
        /* one more node has responded with recmode data */
        data->count--;
@@ -2676,13 +2693,16 @@ static void async_callback(struct ctdb_client_control_state *state)
        
        state->async.fn = NULL;
 
-       ret = ctdb_control_recv(state->ctdb, state, data, NULL, &res, NULL);
+       ret = ctdb_control_recv(ctdb, state, data, &outdata, &res, NULL);
        if ((ret != 0) || (res != 0)) {
                if ( !data->dont_log_errors) {
                        DEBUG(DEBUG_ERR,("Async operation failed with ret=%d res=%d\n", ret, (int)res));
                }
                data->fail_count++;
        }
+       if ((ret == 0) && (data->callback != NULL)) {
+               data->callback(ctdb, destnode, res, outdata);
+       }
 }
 
 
@@ -2725,15 +2745,17 @@ int ctdb_client_async_control(struct ctdb_context *ctdb,
                                uint32_t *nodes,
                                struct timeval timeout,
                                bool dont_log_errors,
-                               TDB_DATA data)
+                               TDB_DATA data,
+                               client_async_callback client_callback)
 {
        struct client_async_data *async_data;
        struct ctdb_client_control_state *state;
        int j, num_nodes;
-       
+
        async_data = talloc_zero(ctdb, struct client_async_data);
        CTDB_NO_MEMORY_FATAL(ctdb, async_data);
        async_data->dont_log_errors = dont_log_errors;
+       async_data->callback = client_callback;
 
        num_nodes = talloc_get_size(nodes) / sizeof(uint32_t);
 
@@ -2857,3 +2879,44 @@ ctdb_read_pnn_lock(int fd, int32_t pnn)
        return c;
 }
 
+/*
+  get capabilities of a remote node
+ */
+struct ctdb_client_control_state *
+ctdb_ctrl_getcapabilities_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
+{
+       return ctdb_control_send(ctdb, destnode, 0, 
+                          CTDB_CONTROL_GET_CAPABILITIES, 0, tdb_null, 
+                          mem_ctx, &timeout, NULL);
+}
+
+int ctdb_ctrl_getcapabilities_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *capabilities)
+{
+       int ret;
+       int32_t res;
+       TDB_DATA outdata;
+
+       ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
+       if ( (ret != 0) || (res != 0) ) {
+               DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getcapabilities_recv failed\n"));
+               return -1;
+       }
+
+       if (capabilities) {
+               *capabilities = *((uint32_t *)outdata.dptr);
+       }
+
+       return 0;
+}
+
+int ctdb_ctrl_getcapabilities(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *capabilities)
+{
+       struct ctdb_client_control_state *state;
+       TALLOC_CTX *tmp_ctx = talloc_new(NULL);
+       int ret;
+
+       state = ctdb_ctrl_getcapabilities_send(ctdb, tmp_ctx, timeout, destnode);
+       ret = ctdb_ctrl_getcapabilities_recv(ctdb, tmp_ctx, state, capabilities);
+       talloc_free(tmp_ctx);
+       return ret;
+}
index d9e4f2ab74ebd5499f0fdffc9faa9dda8c88e75e..e8a334afecafad5a591fe91b5d5962041ad94235 100644 (file)
@@ -150,25 +150,7 @@ int ctdb_ltdb_store(struct ctdb_db_context *ctdb_db, TDB_DATA key,
        memcpy(rec.dptr, header, sizeof(*header));
        memcpy(rec.dptr + sizeof(*header), data.dptr, data.dsize);
 
-       /* if this is a persistent database without NOSYNC then we
-          will do this via a transaction */
-       if (ctdb_db->persistent && !(ctdb_db->client_tdb_flags & TDB_NOSYNC)) {
-               ret = tdb_transaction_start(ctdb_db->ltdb->tdb);
-               if (ret != 0) {
-                       DEBUG(DEBUG_CRIT, ("Failed to start local transaction\n"));
-                       goto failed;
-               }
-               ret = tdb_store(ctdb_db->ltdb->tdb, key, rec, TDB_REPLACE);
-               if (ret != 0) {
-                       tdb_transaction_cancel(ctdb_db->ltdb->tdb);
-                       goto failed;
-               }
-               ret = tdb_transaction_commit(ctdb_db->ltdb->tdb);
-       } else {
-               ret = tdb_store(ctdb_db->ltdb->tdb, key, rec, TDB_REPLACE);
-       }
-
-failed:
+       ret = tdb_store(ctdb_db->ltdb->tdb, key, rec, TDB_REPLACE);
        talloc_free(rec.dptr);
 
        return ret;
index bae52c2c8bfa2ce42c633fbe6f1ec8f2c59d7861..922a53d57fc6c09280c16d853660a9f5a6314b9d 100755 (executable)
@@ -66,6 +66,12 @@ CTDB_OPTIONS="$CTDB_OPTIONS --reclock=$CTDB_RECOVERY_LOCK"
 [ -z "$CTDB_START_AS_DISABLED" ] || [ "$CTDB_START_AS_DISABLED" != "yes" ] || {
        CTDB_OPTIONS="$CTDB_OPTIONS --start-as-disabled"
 }
+[ -z "$CTDB_CAPABILITY_RECMASTER" ] || [ "$CTDB_CAPABILITY_RECMASTER" != "no" ] || {
+       CTDB_OPTIONS="$CTDB_OPTIONS --no-recmaster"
+}
+[ -z "$CTDB_CAPABILITY_LMASTER" ] || [ "$CTDB_CAPABILITY_LMASTER" != "no" ] || {
+       CTDB_OPTIONS="$CTDB_OPTIONS --no-lmaster"
+}
 
 if [ -x /sbin/startproc ]; then
     init_style="suse"
index 9d1e4341e0e8c396069e8b0111f2664c234f960b..58edbff9d9a7b1b4cb197724b07ccdf30c672c5f 100644 (file)
 # the node with "ctdb enable"
 # CTDB_START_AS_DISABLED="yes"
 
+# LMASTER and RECMASTER capabilities.
+# By default all nodes are capable of both being LMASTER for records and
+# also for taking the RECMASTER role and perform recovery.
+# These parameters can be used to disable these two roles on a node.
+# Note: If there are NO available nodes left in a cluster that can perform
+# the RECMASTER role, the cluster will not be able to recover from a failure
+# and will remain in RECOVERY mode until an RECMASTER capable node becomes
+# available. Same for LMASTER.
+# These parametersd are useful for scenarios where you have one "remote" node
+# in a cluster and you do not want the remote node to be fully participating
+# in the cluster and slow things down.
+# For that case, set both roles to "no" for the remote node on the remote site
+# but leave the roles default to "yes" on the primary nodes in the central
+# datacentre.
+# CTDB_CAPABILITY_RECMASTER=yes
+# CTDB_CAPABILITY_LMASTER=yes
+
 # where to log messages
 # the default is /var/log/log.ctdb
 # CTDB_LOGFILE=/var/log/log.ctdb
index bfba37e178c590d4279432dc07deecdbca2af1a9..95d3f2f3929f8910b2e7868ae806db17bc2d0e4d 100644 (file)
@@ -536,4 +536,13 @@ uint32_t *list_of_vnnmap_nodes(struct ctdb_context *ctdb,
 
 int ctdb_read_pnn_lock(int fd, int32_t pnn);
 
+/*
+  get capabilities of a remote node
+ */
+int ctdb_ctrl_getcapabilities(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *capabilities);
+
+struct ctdb_client_control_state *ctdb_ctrl_getcapabilities_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode);
+
+int ctdb_ctrl_getcapabilities_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *capabilities);
+
 #endif
index d51e2f7f48f1fc0b84521ae9fd212603a1f312df..d31b148fcaaa91a63efb582f5aad3c2b320d53d3 100644 (file)
@@ -199,6 +199,11 @@ struct ctdb_node {
        uint32_t rx_cnt;
        uint32_t tx_cnt;
 
+       /* used to track node capabilities, is only valid/tracked inside the
+          recovery daemon.
+       */
+       uint32_t capabilities;
+
        /* a list of controls pending to this node, so we can time them out quickly
           if the node becomes disconnected */
        struct daemon_control_state *pending_controls;
@@ -332,6 +337,10 @@ enum ctdb_freeze_mode {CTDB_FREEZE_NONE, CTDB_FREEZE_PENDING, CTDB_FREEZE_FROZEN
 #define CTDB_MONITORING_ACTIVE         0
 #define CTDB_MONITORING_DISABLED       1
 
+/* The different capabilities of the ctdb daemon. */
+#define CTDB_CAP_RECMASTER             0x00000001
+#define CTDB_CAP_LMASTER               0x00000002
+
 /* main state of the ctdb daemon */
 struct ctdb_context {
        struct event_context *ev;
@@ -356,6 +365,7 @@ struct ctdb_context {
        uint32_t num_nodes;
        uint32_t num_connected;
        unsigned flags;
+       uint32_t capabilities;
        struct idr_context *idr;
        uint16_t idr_cnt;
        struct ctdb_node **nodes; /* array of nodes in the cluster - indexed by vnn */
@@ -400,7 +410,6 @@ struct ctdb_db_context {
        struct ctdb_registered_call *calls; /* list of registered calls */
        uint32_t seqnum;
        struct timed_event *te;
-       uint32_t client_tdb_flags;
 };
 
 
@@ -514,6 +523,7 @@ enum ctdb_controls {CTDB_CONTROL_PROCESS_EXISTS          = 0,
                    CTDB_CONTROL_ADD_PUBLIC_IP           = 77,
                    CTDB_CONTROL_DEL_PUBLIC_IP           = 78,
                    CTDB_CONTROL_RUN_EVENTSCRIPTS        = 79,
+                   CTDB_CONTROL_GET_CAPABILITIES        = 80,
 };     
 
 /*
@@ -911,7 +921,7 @@ int ctdb_daemon_send_control(struct ctdb_context *ctdb, uint32_t destnode,
                             void *private_data);
 
 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata, 
-                              TDB_DATA *outdata, uint64_t tdb_flags, bool persistent);
+                              TDB_DATA *outdata, bool persistent);
 
 int ctdb_daemon_set_call(struct ctdb_context *ctdb, uint32_t db_id,
                         ctdb_fn_t fn, int id);
@@ -1271,10 +1281,13 @@ int32_t ctdb_monitoring_mode(struct ctdb_context *ctdb);
 int ctdb_set_child_logging(struct ctdb_context *ctdb);
 
 
+typedef void (*client_async_callback)(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata);
+
 struct client_async_data {
        bool dont_log_errors;
        uint32_t count;
        uint32_t fail_count;
+       client_async_callback callback;
 };
 void ctdb_client_async_add(struct client_async_data *data, struct ctdb_client_control_state *state);
 int ctdb_client_async_wait(struct ctdb_context *ctdb, struct client_async_data *data);
@@ -1283,12 +1296,14 @@ int ctdb_client_async_control(struct ctdb_context *ctdb,
                                uint32_t *nodes,
                                struct timeval timeout,
                                bool dont_log_errors,
-                               TDB_DATA data);
+                               TDB_DATA data,
+                               client_async_callback client_callback);
 
 void ctdb_load_nodes_file(struct ctdb_context *ctdb);
 
 int ctdb_control_reload_nodes_file(struct ctdb_context *ctdb, uint32_t opcode);
 
 int32_t ctdb_dump_memory(struct ctdb_context *ctdb, TDB_DATA *outdata);
+int32_t ctdb_control_get_capabilities(struct ctdb_context *ctdb, TDB_DATA *outdata);
 
 #endif
index 69747390209e7910a92ce2cc287faeb056588c80..c0ed54cae3edba4bf723619187998c8f6a9b7e2a 100644 (file)
@@ -5,7 +5,7 @@ Vendor: Samba Team
 Packager: Samba Team <samba@samba.org>
 Name: ctdb
 Version: 1.0
-Release: 33
+Release: 35
 Epoch: 0
 License: GNU GPL version 3
 Group: System Environment/Daemons
@@ -120,6 +120,36 @@ fi
 %{_includedir}/ctdb_private.h
 
 %changelog
+* Wed May 7 2008 : Version 1.0.35
+ - During recovery, when we define the new set of lmasters (vnnmap)
+   only consider those nodes that have the can-be-lmaster capability
+   when we create the vnnmap. unless there are no nodes available which
+   supports this capability in which case we allow the recmaster to
+   become lmaster capable (temporarily).
+ - Extend the async framework so that we can use paralell async calls
+   to controls that return data.
+ - If we do not have the "can be recmaster" capability, make sure we will
+   lose any recmaster elections, unless there are no nodes available that
+   have the capability, in which case we "take/win" the election anyway.
+ - Close and reopen the reclock pnn file at regular intervals.
+   Make it a non-fatal event if we occasionally fail to open/read/write
+   to this file.
+ - Monitor that the recovery daemon is still running from the main ctdb
+   daemon and shutdown the main daemon when recovery daemon has terminated.
+ - Add a "ctdb getcapabilities" command to read the capabilities off a node.
+ - Define two new capabilities : can be recmaster and can be lmaster
+   and default both capabilities to YES.
+ - Log denied tcp connection attempts with DEBUG_ERR and not DEBUG_WARNING
+* Thu Apr 24 2008 : Version 1.0.34
+ - When deleting a public ip from a node, try to migrate the ip to a different
+   node first.
+ - Change catdb to produce output similar to tdbdump
+ - When adding a new public ip address, if this ip does not exist yet in
+   the cluster, then grab the ip on the local node and activate it.
+ - When a node disagrees with the recmaster on WHO is the recmaster, then
+   mark that node as a recovery culprit so it will eventually become
+   banned.
+ - Make ctdb eventscript support the -n all argument.
 * Thu Apr 10 2008 : Version 1.0.33
  - Add facilities to include site local adaptations to the eventscript
    by /etc/ctdb/rc.local which will be read by all eventscripts.
index 4de27305da21a713faf98e4482de29f95b00008c..6c8a4fc1346fd32cedd17fe1208679fcd6dd051d 100644 (file)
@@ -206,10 +206,10 @@ static int32_t ctdb_control_dispatch(struct ctdb_context *ctdb,
        }
 
        case CTDB_CONTROL_DB_ATTACH:
-               return ctdb_control_db_attach(ctdb, indata, outdata, srvid, false);
+               return ctdb_control_db_attach(ctdb, indata, outdata, false);
 
        case CTDB_CONTROL_DB_ATTACH_PERSISTENT:
-               return ctdb_control_db_attach(ctdb, indata, outdata, srvid, true);
+               return ctdb_control_db_attach(ctdb, indata, outdata, true);
 
        case CTDB_CONTROL_SET_CALL: {
                struct ctdb_control_set_call *sc = 
@@ -389,6 +389,9 @@ static int32_t ctdb_control_dispatch(struct ctdb_context *ctdb,
        case CTDB_CONTROL_DEL_PUBLIC_IP:
                return ctdb_control_del_public_address(ctdb, indata);
 
+       case CTDB_CONTROL_GET_CAPABILITIES:
+               return ctdb_control_get_capabilities(ctdb, outdata);
+
        default:
                DEBUG(DEBUG_CRIT,(__location__ " Unknown CTDB control opcode %u\n", opcode));
                return -1;
index 5146ed86dae60bef1b20ebd56e63e01e917752da..e900f7b9ca8c80b839e4498263864e62569b191f 100644 (file)
@@ -296,19 +296,12 @@ static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name, boo
   a client has asked to attach a new database
  */
 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
-                              TDB_DATA *outdata, uint64_t tdb_flags, 
-                              bool persistent)
+                              TDB_DATA *outdata, bool persistent)
 {
        const char *db_name = (const char *)indata.dptr;
        struct ctdb_db_context *db;
        struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
 
-       /* the client can optionally pass additional tdb flags, but we
-          only allow a subset of those on the database in ctdb. Note
-          that tdb_flags is passed in via the (otherwise unused)
-          srvid to the attach control */
-       tdb_flags &= TDB_NOSYNC;
-
        /* If the node is inactive it is not part of the cluster
           and we should not allow clients to attach to any
           databases
@@ -324,7 +317,6 @@ int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
        if (db) {
                outdata->dptr  = (uint8_t *)&db->db_id;
                outdata->dsize = sizeof(db->db_id);
-               db->client_tdb_flags |= tdb_flags;
                return 0;
        }
 
@@ -338,9 +330,6 @@ int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
                return -1;
        }
 
-       /* remember the flags the client has specified */
-       db->client_tdb_flags = tdb_flags;
-
        outdata->dptr  = (uint8_t *)&db->db_id;
        outdata->dsize = sizeof(db->db_id);
 
index 83e5424abe23d35d5d8fdd262ee816d4f86b5dfa..7a96733e92f5e58f4a0e407781e97dd41b22d444 100644 (file)
@@ -957,3 +957,21 @@ int32_t ctdb_control_try_delete_records(struct ctdb_context *ctdb, TDB_DATA inda
 
        return 0;
 }
+
+/*
+  report capabilities
+ */
+int32_t ctdb_control_get_capabilities(struct ctdb_context *ctdb, TDB_DATA *outdata)
+{
+       uint32_t *capabilities = NULL;
+
+       capabilities = talloc(outdata, uint32_t);
+       CTDB_NO_MEMORY(ctdb, capabilities);
+       *capabilities = ctdb->capabilities;
+
+       outdata->dsize = sizeof(uint32_t);
+       outdata->dptr = (uint8_t *)capabilities;
+
+       return 0;       
+}
+
index 0d1ef02eb988fbec458f7e4640f9c11ee43db85e..c3dff32b4ade0608553c44779444e7089ddfae4c 100644 (file)
@@ -212,7 +212,7 @@ static int run_recovered_eventscript(struct ctdb_context *ctdb, struct ctdb_node
 
        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_END_RECOVERY,
                        list_of_active_nodes(ctdb, nodemap, tmp_ctx, true),
-                       CONTROL_TIMEOUT(), false, tdb_null) != 0) {
+                       CONTROL_TIMEOUT(), false, tdb_null, NULL) != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event. Recovery failed.\n"));
                talloc_free(tmp_ctx);
                return -1;
@@ -234,7 +234,7 @@ static int run_startrecovery_eventscript(struct ctdb_context *ctdb, struct ctdb_
 
        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_START_RECOVERY,
                        list_of_active_nodes(ctdb, nodemap, tmp_ctx, true),
-                       CONTROL_TIMEOUT(), false, tdb_null) != 0) {
+                       CONTROL_TIMEOUT(), false, tdb_null, NULL) != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event. Recovery failed.\n"));
                talloc_free(tmp_ctx);
                return -1;
@@ -244,6 +244,40 @@ static int run_startrecovery_eventscript(struct ctdb_context *ctdb, struct ctdb_
        return 0;
 }
 
+static void async_getcap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata)
+{
+       if ( (outdata.dsize != sizeof(uint32_t)) || (outdata.dptr == NULL) ) {
+               DEBUG(DEBUG_ERR, (__location__ " Invalid lenght/pointer for getcap callback : %d %p\n", outdata.dsize, outdata.dptr));
+               return;
+       }
+       ctdb->nodes[node_pnn]->capabilities = *((uint32_t *)outdata.dptr);
+}
+
+/*
+  update the node capabilities for all connected nodes
+ */
+static int update_capabilities(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
+{
+       uint32_t *nodes;
+       TALLOC_CTX *tmp_ctx;
+
+       tmp_ctx = talloc_new(ctdb);
+       CTDB_NO_MEMORY(ctdb, tmp_ctx);
+
+       nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
+
+       if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_CAPABILITIES,
+                                       nodes, CONTROL_TIMEOUT(),
+                                       false, tdb_null, async_getcap_callback) != 0) {
+               DEBUG(DEBUG_ERR, (__location__ " Failed to read node capabilities.\n"));
+               talloc_free(tmp_ctx);
+               return -1;
+       }
+
+       talloc_free(tmp_ctx);
+       return 0;
+}
+
 /*
   change recovery mode on all nodes
  */
@@ -262,7 +296,7 @@ static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_node_map *no
        if (rec_mode == CTDB_RECOVERY_ACTIVE) {
                if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
                                                nodes, CONTROL_TIMEOUT(),
-                                               false, tdb_null) != 0) {
+                                               false, tdb_null, NULL) != 0) {
                        DEBUG(DEBUG_ERR, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
                        talloc_free(tmp_ctx);
                        return -1;
@@ -275,7 +309,7 @@ static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_node_map *no
 
        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMODE,
                                        nodes, CONTROL_TIMEOUT(),
-                                       false, data) != 0) {
+                                       false, data, NULL) != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode. Recovery failed.\n"));
                talloc_free(tmp_ctx);
                return -1;
@@ -284,7 +318,7 @@ static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_node_map *no
        if (rec_mode == CTDB_RECOVERY_NORMAL) {
                if (ctdb_client_async_control(ctdb, CTDB_CONTROL_THAW,
                                                nodes, CONTROL_TIMEOUT(),
-                                               false, tdb_null) != 0) {
+                                               false, tdb_null, NULL) != 0) {
                        DEBUG(DEBUG_ERR, (__location__ " Unable to thaw nodes. Recovery failed.\n"));
                        talloc_free(tmp_ctx);
                        return -1;
@@ -311,7 +345,7 @@ static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *
 
        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMASTER,
                        list_of_active_nodes(ctdb, nodemap, tmp_ctx, true),
-                       CONTROL_TIMEOUT(), false, data) != 0) {
+                       CONTROL_TIMEOUT(), false, data, NULL) != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to set recmaster. Recovery failed.\n"));
                talloc_free(tmp_ctx);
                return -1;
@@ -1142,7 +1176,7 @@ static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
 
        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
                        list_of_active_nodes(ctdb, nodemap, tmp_ctx, true),
-                       CONTROL_TIMEOUT(), false, outdata) != 0) {
+                       CONTROL_TIMEOUT(), false, outdata, NULL) != 0) {
                DEBUG(DEBUG_ERR,(__location__ " Failed to push recdb records to nodes for db 0x%x\n", dbid));
                talloc_free(recdata);
                talloc_free(tmp_ctx);
@@ -1198,7 +1232,7 @@ static int recover_database(struct ctdb_recoverd *rec,
 
        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
                        list_of_active_nodes(ctdb, nodemap, recdb, true),
-                       CONTROL_TIMEOUT(), false, data) != 0) {
+                       CONTROL_TIMEOUT(), false, data, NULL) != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to wipe database. Recovery failed.\n"));
                talloc_free(recdb);
                return -1;
@@ -1321,7 +1355,7 @@ static int do_recovery(struct ctdb_recoverd *rec,
 
        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
                        list_of_active_nodes(ctdb, nodemap, mem_ctx, true),
-                       CONTROL_TIMEOUT(), false, data) != 0) {
+                       CONTROL_TIMEOUT(), false, data, NULL) != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to start transactions. Recovery failed.\n"));
                return -1;
        }
@@ -1340,7 +1374,7 @@ static int do_recovery(struct ctdb_recoverd *rec,
        /* commit all the changes */
        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
                        list_of_active_nodes(ctdb, nodemap, mem_ctx, true),
-                       CONTROL_TIMEOUT(), false, data) != 0) {
+                       CONTROL_TIMEOUT(), false, data, NULL) != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to commit recovery changes. Recovery failed.\n"));
                return -1;
        }
@@ -1348,19 +1382,45 @@ static int do_recovery(struct ctdb_recoverd *rec,
        DEBUG(DEBUG_NOTICE, (__location__ " Recovery - committed databases\n"));
        
 
+       /* update the capabilities for all nodes */
+       ret = update_capabilities(ctdb, nodemap);
+       if (ret!=0) {
+               DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
+               return -1;
+       }
+
        /* build a new vnn map with all the currently active and
           unbanned nodes */
        generation = new_generation();
        vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
        CTDB_NO_MEMORY(ctdb, vnnmap);
        vnnmap->generation = generation;
-       vnnmap->size = rec->num_active;
+       vnnmap->size = 0;
        vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
+       CTDB_NO_MEMORY(ctdb, vnnmap->map);
        for (i=j=0;i<nodemap->num;i++) {
-               if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
-                       vnnmap->map[j++] = nodemap->nodes[i].pnn;
+               if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
+                       continue;
                }
+               if (!(ctdb->nodes[i]->capabilities & CTDB_CAP_LMASTER)) {
+                       /* this node can not be an lmaster */
+                       DEBUG(DEBUG_DEBUG, ("Node %d cant be a LMASTER, skipping it\n", i));
+                       continue;
+               }
+
+               vnnmap->size++;
+               vnnmap->map = talloc_realloc_size(vnnmap, vnnmap->map, vnnmap->size);
+               CTDB_NO_MEMORY(ctdb, vnnmap->map);
+               vnnmap->map[j++] = nodemap->nodes[i].pnn;
+
        }
+       if (vnnmap->size == 0) {
+               DEBUG(DEBUG_NOTICE, ("No suitable lmasters found. Adding local node (recmaster) anyway.\n"));
+               vnnmap->size++;
+               vnnmap->map = talloc_realloc_size(vnnmap, vnnmap->map, vnnmap->size);
+               CTDB_NO_MEMORY(ctdb, vnnmap->map);
+               vnnmap->map[0] = pnn;
+       }       
 
        /* update to the new vnnmap on all nodes */
        ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
@@ -1481,6 +1541,13 @@ static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_messag
                        em->num_connected++;
                }
        }
+
+       /* we shouldnt try to win this election if we cant be a recmaster */
+       if ((ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
+               em->num_connected = 0;
+               em->priority_time = timeval_current();
+       }
+
        talloc_free(nodemap);
 }
 
@@ -1494,6 +1561,11 @@ static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message
 
        ctdb_election_data(rec, &myem);
 
+       /* we cant win if we dont have the recmaster capability */
+       if ((rec->ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
+               return false;
+       }
+
        /* we cant win if we are banned */
        if (rec->node_flags & NODE_FLAGS_BANNED) {
                return false;
@@ -1914,6 +1986,7 @@ static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb
 
 
 struct verify_recmaster_data {
+       struct ctdb_recoverd *rec;
        uint32_t count;
        uint32_t pnn;
        enum monitor_result status;
@@ -1942,6 +2015,7 @@ static void verify_recmaster_callback(struct ctdb_client_control_state *state)
        */
        if (state->status != rmdata->pnn) {
                DEBUG(DEBUG_ERR,("Node %d does not agree we are the recmaster. Need a new recmaster election\n", state->c->hdr.destnode));
+               ctdb_set_culprit(rmdata->rec, state->c->hdr.destnode);
                rmdata->status = MONITOR_ELECTION_NEEDED;
        }
 
@@ -1950,8 +2024,9 @@ static void verify_recmaster_callback(struct ctdb_client_control_state *state)
 
 
 /* verify that all nodes agree that we are the recmaster */
-static enum monitor_result verify_recmaster(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
+static enum monitor_result verify_recmaster(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t pnn)
 {
+       struct ctdb_context *ctdb = rec->ctdb;
        struct verify_recmaster_data *rmdata;
        TALLOC_CTX *mem_ctx = talloc_new(ctdb);
        struct ctdb_client_control_state *state;
@@ -1960,6 +2035,7 @@ static enum monitor_result verify_recmaster(struct ctdb_context *ctdb, struct ct
        
        rmdata = talloc(mem_ctx, struct verify_recmaster_data);
        CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
+       rmdata->rec    = rec;
        rmdata->count  = 0;
        rmdata->pnn    = pnn;
        rmdata->status = MONITOR_OK;
@@ -2013,8 +2089,15 @@ ctdb_recoverd_write_pnn_connect_count(struct ctdb_recoverd *rec)
        const char count = rec->num_connected;
        struct ctdb_context *ctdb = talloc_get_type(rec->ctdb, struct ctdb_context);
 
+       if (rec->rec_file_fd == -1) {
+               DEBUG(DEBUG_CRIT,(__location__ " Unable to write pnn count. pnnfile is not open.\n"));
+               return;
+       } 
+
        if (pwrite(rec->rec_file_fd, &count, 1, ctdb->pnn) == -1) {
                DEBUG(DEBUG_CRIT, (__location__ " Failed to write pnn count\n"));
+               close(rec->rec_file_fd);
+               rec->rec_file_fd = -1;
        }
 }
 
@@ -2034,8 +2117,8 @@ ctdb_recoverd_get_pnn_lock(struct ctdb_recoverd *rec)
        DEBUG(DEBUG_INFO, ("Setting PNN lock for pnn:%d\n", ctdb->pnn));
 
        if (rec->rec_file_fd != -1) {
-               DEBUG(DEBUG_CRIT, (__location__ " rec_lock_fd is already open. Aborting\n"));
-               exit(10);
+               close(rec->rec_file_fd);
+               rec->rec_file_fd = -1;
        }
 
        pnnfile = talloc_asprintf(rec, "%s.pnn", ctdb->recovery_lock_file);
@@ -2045,7 +2128,8 @@ ctdb_recoverd_get_pnn_lock(struct ctdb_recoverd *rec)
        if (rec->rec_file_fd == -1) {
                DEBUG(DEBUG_CRIT,(__location__ " Unable to open %s - (%s)\n", 
                         pnnfile, strerror(errno)));
-               exit(10);
+               talloc_free(pnnfile);
+               return;
        }
 
        set_close_on_exec(rec->rec_file_fd);
@@ -2059,12 +2143,12 @@ ctdb_recoverd_get_pnn_lock(struct ctdb_recoverd *rec)
                close(rec->rec_file_fd);
                rec->rec_file_fd = -1;
                DEBUG(DEBUG_CRIT,(__location__ " Failed to get pnn lock on '%s'\n", pnnfile));
-               exit(10);
+               talloc_free(pnnfile);
+               return;
        }
 
 
        DEBUG(DEBUG_NOTICE,(__location__ " Got pnn lock on '%s'\n", pnnfile));
-
        talloc_free(pnnfile);
 
        /* we start out with 0 connected nodes */
@@ -2082,6 +2166,9 @@ static void ctdb_update_pnn_count(struct event_context *ev, struct timed_event *
        struct ctdb_context *ctdb     = rec->ctdb;
        struct ctdb_node_map *nodemap = rec->nodemap;
 
+       /* close and reopen the pnn lock file */
+       ctdb_recoverd_get_pnn_lock(rec);
+
        ctdb_recoverd_write_pnn_connect_count(rec);
 
        event_add_timed(rec->ctdb->ev, rec->ctdb,
@@ -2104,6 +2191,10 @@ static void ctdb_update_pnn_count(struct event_context *ev, struct timed_event *
                return;
        }
        if (ctdb->recovery_lock_fd == -1) {
+               DEBUG(DEBUG_ERR, (__location__ " Lost reclock pnn file. Yielding recmaster role\n"));
+               close(ctdb->recovery_lock_fd);
+               ctdb->recovery_lock_fd = -1;
+               force_election(rec, ctdb->pnn, rec->nodemap);
                return;
        }
        for (i=0; i<nodemap->num; i++) {
@@ -2453,7 +2544,7 @@ again:
 
 
        /* verify that all active nodes agree that we are the recmaster */
-       switch (verify_recmaster(ctdb, nodemap, pnn)) {
+       switch (verify_recmaster(rec, nodemap, pnn)) {
        case MONITOR_RECOVERY_NEEDED:
                /* can not happen */
                goto again;
@@ -2683,6 +2774,37 @@ static void ctdb_recoverd_parent(struct event_context *ev, struct fd_event *fde,
        _exit(1);
 }
 
+/*
+  called regularly to verify that the recovery daemon is still running
+ */
+static void ctdb_check_recd(struct event_context *ev, struct timed_event *te, 
+                             struct timeval yt, void *p)
+{
+       struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
+
+       /* make sure we harvest the child if signals are blocked for some
+          reason
+       */
+       waitpid(ctdb->recoverd_pid, 0, WNOHANG);
+
+       if (kill(ctdb->recoverd_pid, 0) != 0) {
+               DEBUG(DEBUG_ERR,("Recovery daemon (pid:%d) is no longer running. Shutting down main daemon\n", (int)ctdb->recoverd_pid));
+
+               ctdb_stop_recoverd(ctdb);
+               ctdb_stop_keepalive(ctdb);
+               ctdb_stop_monitoring(ctdb);
+               ctdb_release_all_ips(ctdb);
+               ctdb->methods->shutdown(ctdb);
+               ctdb_event_script(ctdb, "shutdown");
+
+               exit(10);       
+       }
+
+       event_add_timed(ctdb->ev, ctdb, 
+                       timeval_current_ofs(30, 0),
+                       ctdb_check_recd, ctdb);
+}
+
 /*
   startup the recovery daemon as a child of the main ctdb daemon
  */
@@ -2704,6 +2826,9 @@ int ctdb_start_recoverd(struct ctdb_context *ctdb)
        
        if (ctdb->recoverd_pid != 0) {
                close(fd[0]);
+               event_add_timed(ctdb->ev, ctdb, 
+                               timeval_current_ofs(30, 0),
+                               ctdb_check_recd, ctdb);
                return 0;
        }
 
index 0686e0b8296a0e9eba1d6ed74059cb3eb8963cc9..c21d4349e94fe186b894e35706c173962d893590 100644 (file)
@@ -43,6 +43,8 @@ static struct {
        int         no_setsched;
        int         use_syslog;
        int         start_as_disabled;
+       int         no_lmaster;
+       int         no_recmaster;
 } options = {
        .nlist = ETCDIR "/ctdb/nodes",
        .transport = "tcp",
@@ -120,6 +122,8 @@ int main(int argc, const char *argv[])
                { "nosetsched", 0, POPT_ARG_NONE, &options.no_setsched, 0, "disable setscheduler SCHED_FIFO call", NULL },
                { "syslog", 0, POPT_ARG_NONE, &options.use_syslog, 0, "log messages to syslog", NULL },
                { "start-as-disabled", 0, POPT_ARG_NONE, &options.start_as_disabled, 0, "Node starts in disabled state", NULL },
+               { "no-lmaster", 0, POPT_ARG_NONE, &options.no_lmaster, 0, "disable lmaster role on this node", NULL },
+               { "no-recmaster", 0, POPT_ARG_NONE, &options.no_recmaster, 0, "disable recmaster role on this node", NULL },
                POPT_TABLEEND
        };
        int opt, ret;
@@ -200,6 +204,15 @@ int main(int argc, const char *argv[])
                }
        }
 
+       /* set ctdbd capabilities */
+       ctdb->capabilities = 0;
+       if (options.no_lmaster == 0) {
+               ctdb->capabilities |= CTDB_CAP_LMASTER;
+       }
+       if (options.no_recmaster == 0) {
+               ctdb->capabilities |= CTDB_CAP_RECMASTER;
+       }
+
        /* tell ctdb what nodes are available */
        ctdb_load_nodes_file(ctdb);
 
index 0ead087c5bd5d56846798c3e3f27ed910dc3607e..f3b4f7d3089ea9d18dc9904116843862dbf539c0 100644 (file)
@@ -217,7 +217,7 @@ static void ctdb_listen_event(struct event_context *ev, struct fd_event *fde,
        nodeid = ctdb_ip_to_nodeid(ctdb, incoming_node);
 
        if (nodeid == -1) {
-               DEBUG(DEBUG_WARNING, ("Refused connection from unknown node %s\n", incoming_node));
+               DEBUG(DEBUG_ERR, ("Refused connection from unknown node %s\n", incoming_node));
                close(fd);
                return;
        }
index 94e681fed4a79b80138c0c8703eb929d06f5e0e6..6f78b6540841fc0732f8aa3dc8eea8988cd33eea 100644 (file)
@@ -28,6 +28,7 @@
 #include "cmdline.h"
 #include "../include/ctdb.h"
 #include "../include/ctdb_private.h"
+#include "../common/rb_tree.h"
 
 static void usage(void);
 
@@ -416,6 +417,40 @@ static int control_get_tickles(struct ctdb_context *ctdb, int argc, const char *
        return 0;
 }
 
+/* send a release ip to all nodes */
+static int control_send_release(struct ctdb_context *ctdb, uint32_t pnn,
+struct sockaddr_in *sin)
+{
+       int ret;
+       struct ctdb_public_ip pip;
+       TDB_DATA data;
+       struct ctdb_node_map *nodemap=NULL;
+
+       ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
+               return ret;
+       }
+
+       /* send a moveip message to the recovery master */
+       pip.pnn = pnn;
+       pip.sin.sin_family = AF_INET;
+       pip.sin.sin_addr   = sin->sin_addr;
+       data.dsize = sizeof(pip);
+       data.dptr = (unsigned char *)&pip;
+
+
+       /* send release ip to all nodes */
+       if (ctdb_client_async_control(ctdb, CTDB_CONTROL_RELEASE_IP,
+                       list_of_active_nodes(ctdb, nodemap, ctdb, true),
+                       TIMELIMIT(), false, data, NULL) != 0) {
+               DEBUG(DEBUG_ERR, (__location__ " Unable to send 'ReleaseIP' to all nodes.\n"));
+               return -1;
+       }
+
+       return 0;
+}
+
 /*
   move/failover an ip address to a specific node
  */
@@ -425,9 +460,6 @@ static int control_moveip(struct ctdb_context *ctdb, int argc, const char **argv
        struct sockaddr_in ip;
        uint32_t value;
        struct ctdb_all_public_ips *ips;
-       struct ctdb_public_ip pip;
-       TDB_DATA data;
-       struct ctdb_node_map *nodemap=NULL;
        int i, ret;
 
        if (argc < 2) {
@@ -441,12 +473,6 @@ static int control_moveip(struct ctdb_context *ctdb, int argc, const char **argv
        }
 
 
-       ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
-       if (ret != 0) {
-               DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
-               return ret;
-       }
-
        if (sscanf(argv[1], "%u", &pnn) != 1) {
                DEBUG(DEBUG_ERR, ("Badly formed pnn\n"));
                return -1;
@@ -495,47 +521,175 @@ static int control_moveip(struct ctdb_context *ctdb, int argc, const char **argv
                return -1;
        }
 
-       /* send a moveip message to the recovery master */
-       pip.pnn = pnn;
-       pip.sin.sin_family = AF_INET;
-       pip.sin.sin_addr   = ips->ips[i].sin.sin_addr;
-       data.dsize = sizeof(pip);
-       data.dptr = (unsigned char *)&pip;
+       ret = control_send_release(ctdb, pnn, &ips->ips[i].sin);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR, ("Failed to send 'change ip' to all nodes\n"));;
+               return -1;
+       }
 
+       return 0;
+}
 
-       /* send release ip to all nodes */
-       if (ctdb_client_async_control(ctdb, CTDB_CONTROL_RELEASE_IP,
-                       list_of_active_nodes(ctdb, nodemap, ctdb, true),
-                       TIMELIMIT(), false, data) != 0) {
-               DEBUG(DEBUG_ERR, (__location__ " Unable to send 'ReleaseIP' to all nodes.\n"));
-               return -1;
+struct node_ip {
+       uint32_t pnn;
+       struct sockaddr_in sin;
+};
+
+void getips_store_callback(void *param, void *data)
+{
+       struct node_ip *node_ip = (struct node_ip *)data;
+       struct ctdb_all_public_ips *ips = param;
+       int i;
+
+       i = ips->num++;
+       ips->ips[i].pnn = node_ip->pnn;
+       ips->ips[i].sin = node_ip->sin;
+}
+
+void getips_count_callback(void *param, void *data)
+{
+       uint32_t *count = param;
+
+       (*count)++;
+}
+
+static int
+control_get_all_public_ips(struct ctdb_context *ctdb, TALLOC_CTX *tmp_ctx, struct ctdb_all_public_ips **ips)
+{
+       struct ctdb_all_public_ips *tmp_ips;
+       struct ctdb_node_map *nodemap=NULL;
+       trbt_tree_t *tree;
+       int i, j, len, ret;
+       uint32_t count;
+
+       ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
+               return ret;
        }
 
+       tree = trbt_create(tmp_ctx, 0);
+
+       for(i=0;i<nodemap->num;i++){
+               if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
+                       continue;
+               }
+
+               /* read the public ip list from this node */
+               ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &tmp_ips);
+               if (ret != 0) {
+                       DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", nodemap->nodes[i].pnn));
+                       return -1;
+               }
+       
+               for (j=0; j<tmp_ips->num;j++) {
+                       struct node_ip *node_ip;
+
+                       node_ip = talloc(tmp_ctx, struct node_ip);
+                       node_ip->pnn = tmp_ips->ips[j].pnn;
+                       node_ip->sin = tmp_ips->ips[j].sin;
+
+                       trbt_insert32(tree, tmp_ips->ips[j].sin.sin_addr.s_addr, node_ip);
+               }
+               talloc_free(tmp_ips);
+       }
+
+       /* traverse */
+       count = 0;
+       trbt_traversearray32(tree, 1, getips_count_callback, &count);
+
+       len = offsetof(struct ctdb_all_public_ips, ips) + 
+               count*sizeof(struct ctdb_public_ip);
+       tmp_ips = talloc_zero_size(tmp_ctx, len);
+       trbt_traversearray32(tree, 1, getips_store_callback, tmp_ips);
+
+       *ips = tmp_ips;
+
        return 0;
 }
 
+
+/* 
+ * scans all other nodes and returns a pnn for another node that can host this 
+ * ip address or -1
+ */
+static int
+find_other_host_for_public_ip(struct ctdb_context *ctdb, struct sockaddr_in *addr)
+{
+       TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
+       struct ctdb_all_public_ips *ips;
+       struct ctdb_node_map *nodemap=NULL;
+       int i, j, ret;
+
+       ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR, ("Unable to get nodemap from node %u\n", options.pnn));
+               talloc_free(tmp_ctx);
+               return ret;
+       }
+
+       for(i=0;i<nodemap->num;i++){
+               if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
+                       continue;
+               }
+               if (nodemap->nodes[i].pnn == options.pnn) {
+                       continue;
+               }
+
+               /* read the public ip list from this node */
+               ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), nodemap->nodes[i].pnn, tmp_ctx, &ips);
+               if (ret != 0) {
+                       DEBUG(DEBUG_ERR, ("Unable to get public ip list from node %u\n", nodemap->nodes[i].pnn));
+                       return -1;
+               }
+
+               for (j=0;j<ips->num;j++) {
+                       if (ctdb_same_ip(addr, &ips->ips[j].sin)) {
+                               talloc_free(tmp_ctx);
+                               return nodemap->nodes[i].pnn;
+                       }
+               }
+               talloc_free(ips);
+       }
+
+       talloc_free(tmp_ctx);
+       return -1;
+}
+
 /*
   add a public ip address to a node
  */
 static int control_addip(struct ctdb_context *ctdb, int argc, const char **argv)
 {
-       int ret;
+       int i, ret;
        int len;
        unsigned mask;
        struct sockaddr_in addr;
        struct ctdb_control_ip_iface *pub;
+       TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
+       struct ctdb_all_public_ips *ips;
 
        if (argc != 2) {
+               talloc_free(tmp_ctx);
                usage();
        }
 
        if (!parse_ip_mask(argv[0], &addr, &mask)) {
                DEBUG(DEBUG_ERR, ("Badly formed ip/mask : %s\n", argv[0]));
+               talloc_free(tmp_ctx);
                return -1;
        }
 
+       ret = control_get_all_public_ips(ctdb, tmp_ctx, &ips);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR, ("Unable to get public ip list from cluster\n"));
+               talloc_free(tmp_ctx);
+               return ret;
+       }
+
+
        len = offsetof(struct ctdb_control_ip_iface, iface) + strlen(argv[1]) + 1;
-       pub = talloc_size(ctdb, len); 
+       pub = talloc_size(tmp_ctx, len); 
        CTDB_NO_MEMORY(ctdb, pub);
 
        pub->sin   = addr;
@@ -546,9 +700,32 @@ static int control_addip(struct ctdb_context *ctdb, int argc, const char **argv)
        ret = ctdb_ctrl_add_public_ip(ctdb, TIMELIMIT(), options.pnn, pub);
        if (ret != 0) {
                DEBUG(DEBUG_ERR, ("Unable to add public ip to node %u\n", options.pnn));
+               talloc_free(tmp_ctx);
                return ret;
        }
 
+
+       /* check if some other node is already serving this ip, if not,
+        * we will claim it
+        */
+       for (i=0;i<ips->num;i++) {
+               if (ctdb_same_ip(&addr, &ips->ips[i].sin)) {
+                       break;
+               }
+       }
+       /* no one has this ip so we claim it */
+       if (i == ips->num) {
+               ret = control_send_release(ctdb, options.pnn, &addr);
+       } else {
+               ret = control_send_release(ctdb, ips->ips[i].pnn, &addr);
+       }
+
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR, ("Failed to send 'change ip' to all nodes\n"));
+               return -1;
+       }
+
+       talloc_free(tmp_ctx);
        return 0;
 }
 
@@ -557,11 +734,14 @@ static int control_addip(struct ctdb_context *ctdb, int argc, const char **argv)
  */
 static int control_delip(struct ctdb_context *ctdb, int argc, const char **argv)
 {
-       int ret;
+       int i, ret;
        struct sockaddr_in addr;
        struct ctdb_control_ip_iface pub;
+       TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
+       struct ctdb_all_public_ips *ips;
 
        if (argc != 1) {
+               talloc_free(tmp_ctx);
                usage();
        }
 
@@ -575,12 +755,44 @@ static int control_delip(struct ctdb_context *ctdb, int argc, const char **argv)
        pub.mask  = 0;
        pub.len   = 0;
 
+       ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &ips);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR, ("Unable to get public ip list from cluster\n"));
+               talloc_free(tmp_ctx);
+               return ret;
+       }
+       
+       for (i=0;i<ips->num;i++) {
+               if (ctdb_same_ip(&addr, &ips->ips[i].sin)) {
+                       break;
+               }
+       }
+
+       if (i==ips->num) {
+               DEBUG(DEBUG_ERR, ("This node does not support this public address '%s'\n",
+                       inet_ntoa(addr.sin_addr)));
+               talloc_free(tmp_ctx);
+               return -1;
+       }
+
+       if (ips->ips[i].pnn == options.pnn) {
+               ret = find_other_host_for_public_ip(ctdb, &addr);
+               if (ret != -1) {
+                       ret = control_send_release(ctdb, ret, &addr);
+                       if (ret != 0) {
+                               DEBUG(DEBUG_ERR, ("Failed to migrate this ip to another node. Use moveip of recover to reassign this address to a node\n"));
+                       }
+               }
+       }
+
        ret = ctdb_ctrl_del_public_ip(ctdb, TIMELIMIT(), options.pnn, &pub);
        if (ret != 0) {
                DEBUG(DEBUG_ERR, ("Unable to del public ip from node %u\n", options.pnn));
+               talloc_free(tmp_ctx);
                return ret;
        }
 
+       talloc_free(tmp_ctx);
        return 0;
 }
 
@@ -794,8 +1006,13 @@ static int control_ip(struct ctdb_context *ctdb, int argc, const char **argv)
        TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
        struct ctdb_all_public_ips *ips;
 
-       /* read the public ip list from this node */
-       ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &ips);
+       if (options.pnn == CTDB_BROADCAST_ALL) {
+               /* read the list of public ips from all nodes */
+               ret = control_get_all_public_ips(ctdb, tmp_ctx, &ips);
+       } else {
+               /* read the public ip list from this node */
+               ret = ctdb_ctrl_get_public_ips(ctdb, TIMELIMIT(), options.pnn, tmp_ctx, &ips);
+       }
        if (ret != 0) {
                DEBUG(DEBUG_ERR, ("Unable to get public ips from node %u\n", options.pnn));
                talloc_free(tmp_ctx);
@@ -805,7 +1022,11 @@ static int control_ip(struct ctdb_context *ctdb, int argc, const char **argv)
        if (options.machinereadable){
                printf(":Public IP:Node:\n");
        } else {
-               printf("Public IPs on node %u\n", options.pnn);
+               if (options.pnn == CTDB_BROADCAST_ALL) {
+                       printf("Public IPs on ALL nodes\n");
+               } else {
+                       printf("Public IPs on node %u\n", options.pnn);
+               }
        }
 
        for (i=1;i<=ips->num;i++) {
@@ -981,6 +1202,32 @@ static int control_getmonmode(struct ctdb_context *ctdb, int argc, const char **
 }
 
 
+/*
+  display capabilities of a remote node
+ */
+static int control_getcapabilities(struct ctdb_context *ctdb, int argc, const char **argv)
+{
+       uint32_t capabilities;
+       int ret;
+
+       ret = ctdb_ctrl_getcapabilities(ctdb, TIMELIMIT(), options.pnn, &capabilities);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR, ("Unable to get capabilities from node %u\n", options.pnn));
+               return ret;
+       }
+       
+       if (!options.machinereadable){
+               printf("RECMASTER: %s\n", (capabilities&CTDB_CAP_RECMASTER)?"YES":"NO");
+               printf("LMASTER: %s\n", (capabilities&CTDB_CAP_LMASTER)?"YES":"NO");
+       } else {
+               printf(":RECMASTER:LMASTER:\n");
+               printf(":%d:%d:\n",
+                       !!(capabilities&CTDB_CAP_RECMASTER),
+                       !!(capabilities&CTDB_CAP_LMASTER));
+       }
+       return 0;
+}
+
 /*
   disable monitoring on a  node
  */
@@ -1582,11 +1829,12 @@ static const struct {
        { "listvars",        control_listvars,          true,  "list tunable variables"},
        { "statistics",      control_statistics,        false, "show statistics" },
        { "statisticsreset", control_statistics_reset,  true,  "reset statistics"},
-       { "ip",              control_ip,                true,  "show which public ip's that ctdb manages" },
+       { "ip",              control_ip,                false,  "show which public ip's that ctdb manages" },
        { "process-exists",  control_process_exists,    true,  "check if a process exists on a node",  "<pid>"},
        { "getdbmap",        control_getdbmap,          true,  "show the database map" },
        { "catdb",           control_catdb,             true,  "dump a database" ,                     "<dbname>"},
        { "getmonmode",      control_getmonmode,        true,  "show monitoring mode" },
+       { "getcapabilities", control_getcapabilities,   true,  "show node capabilities" },
        { "disablemonitor",      control_disable_monmode,        true,  "set monitoring mode to DISABLE" },
        { "enablemonitor",      control_enable_monmode,        true,  "set monitoring mode to ACTIVE" },
        { "setdebug",        control_setdebug,          true,  "set debug level",                      "<EMERG|ALERT|CRIT|ERR|WARNING|NOTICE|INFO|DEBUG>" },
@@ -1619,9 +1867,9 @@ static const struct {
        { "reloadnodes",     control_reload_nodes_file,         false, "reload the nodes file and restart the transport on all nodes"},
        { "getreclock",      control_getreclock,        false,  "get the path to the reclock file" },
        { "moveip",          control_moveip,            false, "move/failover an ip address to another node", "<ip> <node>"},
-       { "addip",           control_addip,             false, "add a ip address to a node", "<ip/mask> <iface>"},
+       { "addip",           control_addip,             true, "add a ip address to a node", "<ip/mask> <iface>"},
        { "delip",           control_delip,             false, "delete an ip address from a node", "<ip>"},
-       { "eventscript",     control_eventscript,       false, "run the eventscript with the given parameters on a node", "<arguments>"},
+       { "eventscript",     control_eventscript,       true, "run the eventscript with the given parameters on a node", "<arguments>"},
 };
 
 /*