Revert "server: when we migrate off a record with data, set the MIGRATED_WITH_DATA...
[sahlberg/ctdb.git] / server / ctdb_call.c
index 8129668751c5a16caa72d34ee6d5cabf92c880d9..d6c08666a6c28fc22403f3745dc48853daf1eeaa 100644 (file)
@@ -21,7 +21,7 @@
   protocol design and packet details
 */
 #include "includes.h"
-#include "lib/events/events.h"
+#include "lib/tevent/tevent.h"
 #include "lib/tdb/include/tdb.h"
 #include "lib/util/dlinklist.h"
 #include "system/network.h"
@@ -69,6 +69,11 @@ static void ctdb_send_error(struct ctdb_context *ctdb,
        char *msg;
        int msglen, len;
 
+       if (ctdb->methods == NULL) {
+               DEBUG(DEBUG_INFO,(__location__ " Failed to send error. Transport is DOWN\n"));
+               return;
+       }
+
        va_start(ap, fmt);
        msg = talloc_vasprintf(ctdb, fmt, ap);
        if (msg == NULL) {
@@ -141,7 +146,12 @@ static void ctdb_send_dmaster_reply(struct ctdb_db_context *ctdb_db,
        header->dmaster = new_dmaster;
        ret = ctdb_ltdb_store(ctdb_db, key, header, data);
        if (ret != 0) {
-               ctdb_fatal(ctdb, "ctdb_req_dmaster unable to update dmaster");
+               ctdb_fatal(ctdb, "ctdb_send_dmaster_reply unable to update dmaster");
+               return;
+       }
+
+       if (ctdb->methods == NULL) {
+               ctdb_fatal(ctdb, "ctdb_send_dmaster_reply cant update dmaster since transport is down");
                return;
        }
 
@@ -186,6 +196,11 @@ static void ctdb_call_send_dmaster(struct ctdb_db_context *ctdb_db,
        int len;
        uint32_t lmaster = ctdb_lmaster(ctdb, key);
 
+       if (ctdb->methods == NULL) {
+               ctdb_fatal(ctdb, "Failed ctdb_call_send_dmaster since transport is down");
+               return;
+       }
+
        if (lmaster == ctdb->pnn) {
                ctdb_send_dmaster_reply(ctdb_db, header, *key, *data, 
                                        c->hdr.srcnode, c->hdr.reqid);
@@ -230,8 +245,9 @@ static void ctdb_become_dmaster(struct ctdb_db_context *ctdb_db,
        struct ctdb_call_state *state;
        struct ctdb_context *ctdb = ctdb_db->ctdb;
        struct ctdb_ltdb_header header;
+       int ret;
 
-       DEBUG(DEBUG_INFO,("pnn %u dmaster response %08x\n", ctdb->pnn, ctdb_hash(&key)));
+       DEBUG(DEBUG_DEBUG,("pnn %u dmaster response %08x\n", ctdb->pnn, ctdb_hash(&key)));
 
        ZERO_STRUCT(header);
        header.rsn = rsn + 1;
@@ -239,7 +255,11 @@ static void ctdb_become_dmaster(struct ctdb_db_context *ctdb_db,
 
        if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
                ctdb_fatal(ctdb, "ctdb_reply_dmaster store failed\n");
-               ctdb_ltdb_unlock(ctdb_db, key);
+
+               ret = ctdb_ltdb_unlock(ctdb_db, key);
+               if (ret != 0) {
+                       DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
+               }
                return;
        }
 
@@ -248,20 +268,41 @@ static void ctdb_become_dmaster(struct ctdb_db_context *ctdb_db,
        if (state == NULL) {
                DEBUG(DEBUG_ERR,("pnn %u Invalid reqid %u in ctdb_become_dmaster from node %u\n",
                         ctdb->pnn, hdr->reqid, hdr->srcnode));
-               ctdb_ltdb_unlock(ctdb_db, key);
+
+               ret = ctdb_ltdb_unlock(ctdb_db, key);
+               if (ret != 0) {
+                       DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
+               }
+               return;
+       }
+
+       if (key.dsize != state->call->key.dsize || memcmp(key.dptr, state->call->key.dptr, key.dsize)) {
+               DEBUG(DEBUG_ERR, ("Got bogus DMASTER packet reqid:%u from node %u. Key does not match key held in matching idr.\n", hdr->reqid, hdr->srcnode));
+
+               ret = ctdb_ltdb_unlock(ctdb_db, key);
+               if (ret != 0) {
+                       DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
+               }
                return;
        }
 
        if (hdr->reqid != state->reqid) {
                /* we found a record  but it was the wrong one */
                DEBUG(DEBUG_ERR, ("Dropped orphan in ctdb_become_dmaster with reqid:%u\n from node %u", hdr->reqid, hdr->srcnode));
-               ctdb_ltdb_unlock(ctdb_db, key);
+
+               ret = ctdb_ltdb_unlock(ctdb_db, key);
+               if (ret != 0) {
+                       DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
+               }
                return;
        }
 
-       ctdb_call_local(ctdb_db, &state->call, &header, state, &data, ctdb->pnn);
+       ctdb_call_local(ctdb_db, state->call, &header, state, &data);
 
-       ctdb_ltdb_unlock(ctdb_db, state->call.key);
+       ret = ctdb_ltdb_unlock(ctdb_db, state->call->key);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
+       }
 
        state->state = CTDB_CALL_DONE;
        if (state->async.fn) {
@@ -317,7 +358,7 @@ void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr
                ctdb_fatal(ctdb, "ctdb_req_dmaster to non-lmaster");
        }
 
-       DEBUG(DEBUG_INFO,("pnn %u dmaster request on %08x for %u from %u\n", 
+       DEBUG(DEBUG_DEBUG,("pnn %u dmaster request on %08x for %u from %u\n", 
                 ctdb->pnn, ctdb_hash(&key), c->dmaster, c->hdr.srcnode));
 
        /* its a protocol error if the sending node is not the current dmaster */
@@ -349,7 +390,11 @@ void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr
                ctdb_become_dmaster(ctdb_db, hdr, key, data, c->rsn);
        } else {
                ctdb_send_dmaster_reply(ctdb_db, &header, key, data, c->dmaster, hdr->reqid);
-               ctdb_ltdb_unlock(ctdb_db, key);
+
+               ret = ctdb_ltdb_unlock(ctdb_db, key);
+               if (ret != 0) {
+                       DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
+               }
        }
 }
 
@@ -364,9 +409,15 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
        struct ctdb_reply_call *r;
        int ret, len;
        struct ctdb_ltdb_header header;
-       struct ctdb_call call;
+       struct ctdb_call *call;
        struct ctdb_db_context *ctdb_db;
 
+       if (ctdb->methods == NULL) {
+               DEBUG(DEBUG_INFO,(__location__ " Failed ctdb_request_call. Transport is DOWN\n"));
+               return;
+       }
+
+
        ctdb_db = find_ctdb_db(ctdb, c->db_id);
        if (!ctdb_db) {
                ctdb_send_error(ctdb, hdr, -1,
@@ -375,17 +426,20 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
                return;
        }
 
-       call.call_id  = c->callid;
-       call.key.dptr = c->data;
-       call.key.dsize = c->keylen;
-       call.call_data.dptr = c->data + c->keylen;
-       call.call_data.dsize = c->calldatalen;
+       call = talloc(hdr, struct ctdb_call);
+       CTDB_NO_MEMORY_FATAL(ctdb, call);
+
+       call->call_id  = c->callid;
+       call->key.dptr = c->data;
+       call->key.dsize = c->keylen;
+       call->call_data.dptr = c->data + c->keylen;
+       call->call_data.dsize = c->calldatalen;
 
        /* determine if we are the dmaster for this key. This also
           fetches the record data (if any), thus avoiding a 2nd fetch of the data 
           if the call will be answered locally */
 
-       ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, call.key, &header, hdr, &data,
+       ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, call->key, &header, hdr, &data,
                                           ctdb_call_input_pkt, ctdb, False);
        if (ret == -1) {
                ctdb_send_error(ctdb, hdr, ret, "ltdb fetch failed in ctdb_request_call");
@@ -400,45 +454,57 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
           requesting node */
        if (header.dmaster != ctdb->pnn) {
                talloc_free(data.dptr);
-               ctdb_call_send_redirect(ctdb, call.key, c, &header);
-               ctdb_ltdb_unlock(ctdb_db, call.key);
-               return;
-       }
+               ctdb_call_send_redirect(ctdb, call->key, c, &header);
 
-       if (c->hopcount > ctdb->statistics.max_hop_count) {
-               ctdb->statistics.max_hop_count = c->hopcount;
+               ret = ctdb_ltdb_unlock(ctdb_db, call->key);
+               if (ret != 0) {
+                       DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
+               }
+               return;
        }
 
-       /* if this nodes has done enough consecutive calls on the same record
-          then give them the record
-          or if the node requested an immediate migration
-       */
-       if ( c->hdr.srcnode != ctdb->pnn &&
-            ((header.laccessor == c->hdr.srcnode
-              && header.lacount >= ctdb->tunable.max_lacount)
-             || (c->flags & CTDB_IMMEDIATE_MIGRATION)) ) {
-               DEBUG(DEBUG_INFO,("pnn %u starting migration of %08x to %u\n", 
-                        ctdb->pnn, ctdb_hash(&call.key), c->hdr.srcnode));
-               ctdb_call_send_dmaster(ctdb_db, c, &header, &call.key, &data);
-               talloc_free(data.dptr);
-               ctdb_ltdb_unlock(ctdb_db, call.key);
-               return;
+       CTDB_UPDATE_STAT(ctdb, max_hop_count, c->hopcount);
+
+       /* Try if possible to migrate the record off to the caller node.
+        * From the clients perspective a fetch of the data is just as 
+        * expensive as a migration.
+        */
+       if (c->hdr.srcnode != ctdb->pnn) {
+               if (ctdb_db->transaction_active) {
+                       DEBUG(DEBUG_INFO, (__location__ " refusing migration"
+                             " of key %s while transaction is active\n",
+                             (char *)call->key.dptr));
+               } else {
+                       DEBUG(DEBUG_DEBUG,("pnn %u starting migration of %08x to %u\n",
+                                ctdb->pnn, ctdb_hash(&(call->key)), c->hdr.srcnode));
+                       ctdb_call_send_dmaster(ctdb_db, c, &header, &(call->key), &data);
+                       talloc_free(data.dptr);
+
+                       ret = ctdb_ltdb_unlock(ctdb_db, call->key);
+                       if (ret != 0) {
+                               DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
+                       }
+                       return;
+               }
        }
 
-       ctdb_call_local(ctdb_db, &call, &header, hdr, &data, c->hdr.srcnode);
+       ctdb_call_local(ctdb_db, call, &header, hdr, &data);
 
-       ctdb_ltdb_unlock(ctdb_db, call.key);
+       ret = ctdb_ltdb_unlock(ctdb_db, call->key);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
+       }
 
-       len = offsetof(struct ctdb_reply_call, data) + call.reply_data.dsize;
+       len = offsetof(struct ctdb_reply_call, data) + call->reply_data.dsize;
        r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REPLY_CALL, len, 
                                    struct ctdb_reply_call);
        CTDB_NO_MEMORY_FATAL(ctdb, r);
        r->hdr.destnode  = hdr->srcnode;
        r->hdr.reqid     = hdr->reqid;
-       r->status        = call.status;
-       r->datalen       = call.reply_data.dsize;
-       if (call.reply_data.dsize) {
-               memcpy(&r->data[0], call.reply_data.dptr, call.reply_data.dsize);
+       r->status        = call->status;
+       r->datalen       = call->reply_data.dsize;
+       if (call->reply_data.dsize) {
+               memcpy(&r->data[0], call->reply_data.dptr, call->reply_data.dsize);
        }
 
        ctdb_queue_packet(ctdb, &r->hdr);
@@ -469,9 +535,9 @@ void ctdb_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
                return;
        }
 
-       state->call.reply_data.dptr = c->data;
-       state->call.reply_data.dsize = c->datalen;
-       state->call.status = c->status;
+       state->call->reply_data.dptr = c->data;
+       state->call->reply_data.dsize = c->datalen;
+       state->call->status = c->status;
 
        talloc_steal(state, c);
 
@@ -633,10 +699,12 @@ struct ctdb_call_state *ctdb_call_local_send(struct ctdb_db_context *ctdb_db,
        talloc_steal(state, data->dptr);
 
        state->state = CTDB_CALL_DONE;
-       state->call = *call;
+       state->call  = talloc(state, struct ctdb_call);
+       CTDB_NO_MEMORY_NULL(ctdb, state->call);
+       *(state->call) = *call;
        state->ctdb_db = ctdb_db;
 
-       ret = ctdb_call_local(ctdb_db, &state->call, header, state, data, ctdb->pnn);
+       ret = ctdb_call_local(ctdb_db, state->call, header, state, data);
 
        event_add_timed(ctdb->ev, state, timeval_zero(), call_local_trigger, state);
 
@@ -658,8 +726,16 @@ struct ctdb_call_state *ctdb_daemon_call_send_remote(struct ctdb_db_context *ctd
        struct ctdb_call_state *state;
        struct ctdb_context *ctdb = ctdb_db->ctdb;
 
+       if (ctdb->methods == NULL) {
+               DEBUG(DEBUG_INFO,(__location__ " Failed send packet. Transport is down\n"));
+               return NULL;
+       }
+
        state = talloc_zero(ctdb_db, struct ctdb_call_state);
        CTDB_NO_MEMORY_NULL(ctdb, state);
+       state->call = talloc(state, struct ctdb_call);
+       CTDB_NO_MEMORY_NULL(ctdb, state->call);
+
        state->reqid = ctdb_reqid_new(ctdb, state);
        state->ctdb_db = ctdb_db;
        talloc_set_destructor(state, ctdb_call_destructor);
@@ -681,9 +757,9 @@ struct ctdb_call_state *ctdb_daemon_call_send_remote(struct ctdb_db_context *ctd
        memcpy(&state->c->data[0], call->key.dptr, call->key.dsize);
        memcpy(&state->c->data[call->key.dsize], 
               call->call_data.dptr, call->call_data.dsize);
-       state->call                = *call;
-       state->call.call_data.dptr = &state->c->data[call->key.dsize];
-       state->call.key.dptr       = &state->c->data[0];
+       *(state->call)              = *call;
+       state->call->call_data.dptr = &state->c->data[call->key.dsize];
+       state->call->key.dptr       = &state->c->data[0];
 
        state->state  = CTDB_CALL_WAIT;
        state->generation = ctdb->vnn_map->generation;
@@ -712,16 +788,16 @@ int ctdb_daemon_call_recv(struct ctdb_call_state *state, struct ctdb_call *call)
                return -1;
        }
 
-       if (state->call.reply_data.dsize) {
-               call->reply_data.dptr = talloc_memdup(state->ctdb_db->ctdb,
-                                                     state->call.reply_data.dptr,
-                                                     state->call.reply_data.dsize);
-               call->reply_data.dsize = state->call.reply_data.dsize;
+       if (state->call->reply_data.dsize) {
+               call->reply_data.dptr = talloc_memdup(call,
+                                                     state->call->reply_data.dptr,
+                                                     state->call->reply_data.dsize);
+               call->reply_data.dsize = state->call->reply_data.dsize;
        } else {
                call->reply_data.dptr = NULL;
                call->reply_data.dsize = 0;
        }
-       call->status = state->call.status;
+       call->status = state->call->status;
        talloc_free(state);
        return 0;
 }
@@ -734,6 +810,11 @@ void ctdb_send_keepalive(struct ctdb_context *ctdb, uint32_t destnode)
 {
        struct ctdb_req_keepalive *r;
        
+       if (ctdb->methods == NULL) {
+               DEBUG(DEBUG_INFO,(__location__ " Failed to send keepalive. Transport is DOWN\n"));
+               return;
+       }
+
        r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_KEEPALIVE,
                                    sizeof(struct ctdb_req_keepalive), 
                                    struct ctdb_req_keepalive);
@@ -741,7 +822,7 @@ void ctdb_send_keepalive(struct ctdb_context *ctdb, uint32_t destnode)
        r->hdr.destnode  = destnode;
        r->hdr.reqid     = 0;
        
-       ctdb->statistics.keepalive_packets_sent++;
+       CTDB_INCREMENT_STAT(ctdb, keepalive_packets_sent);
 
        ctdb_queue_packet(ctdb, &r->hdr);