server: when we migrate off a record with data, set the MIGRATED_WITH_DATA flag
[metze/ctdb/wip.git] / server / ctdb_call.c
index e4d880d12209751e93cae64aae6daf72effda7fe..f3d93a8d972001abbd272db114972c3125304c7c 100644 (file)
@@ -21,7 +21,7 @@
   protocol design and packet details
 */
 #include "includes.h"
-#include "lib/events/events.h"
+#include "lib/tevent/tevent.h"
 #include "lib/tdb/include/tdb.h"
 #include "lib/util/dlinklist.h"
 #include "system/network.h"
@@ -70,7 +70,7 @@ static void ctdb_send_error(struct ctdb_context *ctdb,
        int msglen, len;
 
        if (ctdb->methods == NULL) {
-               DEBUG(DEBUG_ERR,(__location__ " Failed to send error. Transport is DOWN\n"));
+               DEBUG(DEBUG_INFO,(__location__ " Failed to send error. Transport is DOWN\n"));
                return;
        }
 
@@ -151,7 +151,7 @@ static void ctdb_send_dmaster_reply(struct ctdb_db_context *ctdb_db,
        }
 
        if (ctdb->methods == NULL) {
-               ctdb_fatal(ctdb, "ctdb_send_dmaster_reply cant update dmaster sicne transport is down");
+               ctdb_fatal(ctdb, "ctdb_send_dmaster_reply cant update dmaster since transport is down");
                return;
        }
 
@@ -201,6 +201,10 @@ static void ctdb_call_send_dmaster(struct ctdb_db_context *ctdb_db,
                return;
        }
 
+       if (data->dsize != 0) {
+               header->flags &= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
+       }
+
        if (lmaster == ctdb->pnn) {
                ctdb_send_dmaster_reply(ctdb_db, header, *key, *data, 
                                        c->hdr.srcnode, c->hdr.reqid);
@@ -245,8 +249,9 @@ static void ctdb_become_dmaster(struct ctdb_db_context *ctdb_db,
        struct ctdb_call_state *state;
        struct ctdb_context *ctdb = ctdb_db->ctdb;
        struct ctdb_ltdb_header header;
+       int ret;
 
-       DEBUG(DEBUG_INFO,("pnn %u dmaster response %08x\n", ctdb->pnn, ctdb_hash(&key)));
+       DEBUG(DEBUG_DEBUG,("pnn %u dmaster response %08x\n", ctdb->pnn, ctdb_hash(&key)));
 
        ZERO_STRUCT(header);
        header.rsn = rsn + 1;
@@ -254,7 +259,11 @@ static void ctdb_become_dmaster(struct ctdb_db_context *ctdb_db,
 
        if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
                ctdb_fatal(ctdb, "ctdb_reply_dmaster store failed\n");
-               ctdb_ltdb_unlock(ctdb_db, key);
+
+               ret = ctdb_ltdb_unlock(ctdb_db, key);
+               if (ret != 0) {
+                       DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
+               }
                return;
        }
 
@@ -263,20 +272,41 @@ static void ctdb_become_dmaster(struct ctdb_db_context *ctdb_db,
        if (state == NULL) {
                DEBUG(DEBUG_ERR,("pnn %u Invalid reqid %u in ctdb_become_dmaster from node %u\n",
                         ctdb->pnn, hdr->reqid, hdr->srcnode));
-               ctdb_ltdb_unlock(ctdb_db, key);
+
+               ret = ctdb_ltdb_unlock(ctdb_db, key);
+               if (ret != 0) {
+                       DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
+               }
+               return;
+       }
+
+       if (key.dsize != state->call->key.dsize || memcmp(key.dptr, state->call->key.dptr, key.dsize)) {
+               DEBUG(DEBUG_ERR, ("Got bogus DMASTER packet reqid:%u from node %u. Key does not match key held in matching idr.\n", hdr->reqid, hdr->srcnode));
+
+               ret = ctdb_ltdb_unlock(ctdb_db, key);
+               if (ret != 0) {
+                       DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
+               }
                return;
        }
 
        if (hdr->reqid != state->reqid) {
                /* we found a record  but it was the wrong one */
                DEBUG(DEBUG_ERR, ("Dropped orphan in ctdb_become_dmaster with reqid:%u\n from node %u", hdr->reqid, hdr->srcnode));
-               ctdb_ltdb_unlock(ctdb_db, key);
+
+               ret = ctdb_ltdb_unlock(ctdb_db, key);
+               if (ret != 0) {
+                       DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
+               }
                return;
        }
 
        ctdb_call_local(ctdb_db, state->call, &header, state, &data, ctdb->pnn);
 
-       ctdb_ltdb_unlock(ctdb_db, state->call->key);
+       ret = ctdb_ltdb_unlock(ctdb_db, state->call->key);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
+       }
 
        state->state = CTDB_CALL_DONE;
        if (state->async.fn) {
@@ -332,7 +362,7 @@ void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr
                ctdb_fatal(ctdb, "ctdb_req_dmaster to non-lmaster");
        }
 
-       DEBUG(DEBUG_INFO,("pnn %u dmaster request on %08x for %u from %u\n", 
+       DEBUG(DEBUG_DEBUG,("pnn %u dmaster request on %08x for %u from %u\n", 
                 ctdb->pnn, ctdb_hash(&key), c->dmaster, c->hdr.srcnode));
 
        /* its a protocol error if the sending node is not the current dmaster */
@@ -364,7 +394,11 @@ void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr
                ctdb_become_dmaster(ctdb_db, hdr, key, data, c->rsn);
        } else {
                ctdb_send_dmaster_reply(ctdb_db, &header, key, data, c->dmaster, hdr->reqid);
-               ctdb_ltdb_unlock(ctdb_db, key);
+
+               ret = ctdb_ltdb_unlock(ctdb_db, key);
+               if (ret != 0) {
+                       DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
+               }
        }
 }
 
@@ -382,6 +416,12 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
        struct ctdb_call *call;
        struct ctdb_db_context *ctdb_db;
 
+       if (ctdb->methods == NULL) {
+               DEBUG(DEBUG_INFO,(__location__ " Failed ctdb_request_call. Transport is DOWN\n"));
+               return;
+       }
+
+
        ctdb_db = find_ctdb_db(ctdb, c->db_id);
        if (!ctdb_db) {
                ctdb_send_error(ctdb, hdr, -1,
@@ -419,13 +459,15 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
        if (header.dmaster != ctdb->pnn) {
                talloc_free(data.dptr);
                ctdb_call_send_redirect(ctdb, call->key, c, &header);
-               ctdb_ltdb_unlock(ctdb_db, call->key);
+
+               ret = ctdb_ltdb_unlock(ctdb_db, call->key);
+               if (ret != 0) {
+                       DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
+               }
                return;
        }
 
-       if (c->hopcount > ctdb->statistics.max_hop_count) {
-               ctdb->statistics.max_hop_count = c->hopcount;
-       }
+       CTDB_UPDATE_STAT(ctdb, max_hop_count, c->hopcount);
 
        /* if this nodes has done enough consecutive calls on the same record
           then give them the record
@@ -435,17 +477,30 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
             ((header.laccessor == c->hdr.srcnode
               && header.lacount >= ctdb->tunable.max_lacount)
              || (c->flags & CTDB_IMMEDIATE_MIGRATION)) ) {
-               DEBUG(DEBUG_INFO,("pnn %u starting migration of %08x to %u\n", 
-                        ctdb->pnn, ctdb_hash(&(call->key)), c->hdr.srcnode));
-               ctdb_call_send_dmaster(ctdb_db, c, &header, &(call->key), &data);
-               talloc_free(data.dptr);
-               ctdb_ltdb_unlock(ctdb_db, call->key);
-               return;
+               if (ctdb_db->transaction_active) {
+                       DEBUG(DEBUG_INFO, (__location__ " refusing migration"
+                             " of key %s while transaction is active\n",
+                             (char *)call->key.dptr));
+               } else {
+                       DEBUG(DEBUG_DEBUG,("pnn %u starting migration of %08x to %u\n",
+                                ctdb->pnn, ctdb_hash(&(call->key)), c->hdr.srcnode));
+                       ctdb_call_send_dmaster(ctdb_db, c, &header, &(call->key), &data);
+                       talloc_free(data.dptr);
+
+                       ret = ctdb_ltdb_unlock(ctdb_db, call->key);
+                       if (ret != 0) {
+                               DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
+                       }
+                       return;
+               }
        }
 
        ctdb_call_local(ctdb_db, call, &header, hdr, &data, c->hdr.srcnode);
 
-       ctdb_ltdb_unlock(ctdb_db, call->key);
+       ret = ctdb_ltdb_unlock(ctdb_db, call->key);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
+       }
 
        len = offsetof(struct ctdb_reply_call, data) + call->reply_data.dsize;
        r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REPLY_CALL, len, 
@@ -679,7 +734,7 @@ struct ctdb_call_state *ctdb_daemon_call_send_remote(struct ctdb_db_context *ctd
        struct ctdb_context *ctdb = ctdb_db->ctdb;
 
        if (ctdb->methods == NULL) {
-               DEBUG(DEBUG_ERR,(__location__ " Failed send packet. Transport is down\n"));
+               DEBUG(DEBUG_INFO,(__location__ " Failed send packet. Transport is down\n"));
                return NULL;
        }
 
@@ -762,6 +817,11 @@ void ctdb_send_keepalive(struct ctdb_context *ctdb, uint32_t destnode)
 {
        struct ctdb_req_keepalive *r;
        
+       if (ctdb->methods == NULL) {
+               DEBUG(DEBUG_INFO,(__location__ " Failed to send keepalive. Transport is DOWN\n"));
+               return;
+       }
+
        r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_KEEPALIVE,
                                    sizeof(struct ctdb_req_keepalive), 
                                    struct ctdb_req_keepalive);
@@ -769,7 +829,7 @@ void ctdb_send_keepalive(struct ctdb_context *ctdb, uint32_t destnode)
        r->hdr.destnode  = destnode;
        r->hdr.reqid     = 0;
        
-       ctdb->statistics.keepalive_packets_sent++;
+       CTDB_INCREMENT_STAT(ctdb, keepalive_packets_sent);
 
        ctdb_queue_packet(ctdb, &r->hdr);