Add back la-count based migration
[ctdb.git] / server / ctdb_call.c
index b666a9ed1bc80d58d4467578e9bab457743c29f6..c6065710bec24552a68190dc085540992ee353ae 100644 (file)
@@ -21,7 +21,7 @@
   protocol design and packet details
 */
 #include "includes.h"
-#include "lib/events/events.h"
+#include "lib/tevent/tevent.h"
 #include "lib/tdb/include/tdb.h"
 #include "lib/util/dlinklist.h"
 #include "system/network.h"
@@ -70,7 +70,7 @@ static void ctdb_send_error(struct ctdb_context *ctdb,
        int msglen, len;
 
        if (ctdb->methods == NULL) {
-               DEBUG(DEBUG_ERR,(__location__ " Failed to send error. Transport is DOWN\n"));
+               DEBUG(DEBUG_INFO,(__location__ " Failed to send error. Transport is DOWN\n"));
                return;
        }
 
@@ -99,9 +99,30 @@ static void ctdb_send_error(struct ctdb_context *ctdb,
 }
 
 
-/*
-  send a redirect reply
-*/
+/**
+ * send a redirect reply
+ *
+ * The logic behind this function is this:
+ *
+ * A client wants to grab a record and sends a CTDB_REQ_CALL packet
+ * to its local ctdb (ctdb_request_call). If the node is not itself
+ * the record's DMASTER, it first redirects the packet to  the
+ * record's LMASTER. The LMASTER then redirects the call packet to
+ * the current DMASTER. But there is a race: The record may have
+ * been migrated off the DMASTER while the redirected packet is
+ * on the wire (or in the local queue). So in case the record has
+ * migrated off the new destinaton of the call packet, instead of
+ * going back to the LMASTER to get the new DMASTER, we try to
+ * reduce rountrips by fist chasing the record a couple of times
+ * before giving up the direct chase and finally going back to the
+ * LMASTER (again). Note that this works because auf this: When
+ * a record is migrated off a node, then the new DMASTER is stored
+ * in the record's copy on the former DMASTER.
+ *
+ * The maxiumum number of attempts for direct chase to make before
+ * going back to the LMASTER is configurable by the tunable
+ * "MaxRedirectCount".
+ */
 static void ctdb_call_send_redirect(struct ctdb_context *ctdb, 
                                    TDB_DATA key,
                                    struct ctdb_req_call *c, 
@@ -151,7 +172,7 @@ static void ctdb_send_dmaster_reply(struct ctdb_db_context *ctdb_db,
        }
 
        if (ctdb->methods == NULL) {
-               ctdb_fatal(ctdb, "ctdb_send_dmaster_reply cant update dmaster sicne transport is down");
+               ctdb_fatal(ctdb, "ctdb_send_dmaster_reply cant update dmaster since transport is down");
                return;
        }
 
@@ -160,7 +181,7 @@ static void ctdb_send_dmaster_reply(struct ctdb_db_context *ctdb_db,
        tmp_ctx = talloc_new(ctdb);
 
        /* send the CTDB_REPLY_DMASTER */
-       len = offsetof(struct ctdb_reply_dmaster, data) + key.dsize + data.dsize;
+       len = offsetof(struct ctdb_reply_dmaster, data) + key.dsize + data.dsize + sizeof(uint32_t);
        r = ctdb_transport_allocate(ctdb, tmp_ctx, CTDB_REPLY_DMASTER, len,
                                    struct ctdb_reply_dmaster);
        CTDB_NO_MEMORY_FATAL(ctdb, r);
@@ -173,6 +194,7 @@ static void ctdb_send_dmaster_reply(struct ctdb_db_context *ctdb_db,
        r->db_id         = ctdb_db->db_id;
        memcpy(&r->data[0], key.dptr, key.dsize);
        memcpy(&r->data[key.dsize], data.dptr, data.dsize);
+       memcpy(&r->data[key.dsize+data.dsize], &header->flags, sizeof(uint32_t));
 
        ctdb_queue_packet(ctdb, &r->hdr);
 
@@ -201,13 +223,18 @@ static void ctdb_call_send_dmaster(struct ctdb_db_context *ctdb_db,
                return;
        }
 
+       if (data->dsize != 0) {
+               header->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
+       }
+
        if (lmaster == ctdb->pnn) {
                ctdb_send_dmaster_reply(ctdb_db, header, *key, *data, 
                                        c->hdr.srcnode, c->hdr.reqid);
                return;
        }
        
-       len = offsetof(struct ctdb_req_dmaster, data) + key->dsize + data->dsize;
+       len = offsetof(struct ctdb_req_dmaster, data) + key->dsize + data->dsize
+                       + sizeof(uint32_t);
        r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_DMASTER, len, 
                                    struct ctdb_req_dmaster);
        CTDB_NO_MEMORY_FATAL(ctdb, r);
@@ -220,6 +247,7 @@ static void ctdb_call_send_dmaster(struct ctdb_db_context *ctdb_db,
        r->datalen       = data->dsize;
        memcpy(&r->data[0], key->dptr, key->dsize);
        memcpy(&r->data[key->dsize], data->dptr, data->dsize);
+       memcpy(&r->data[key->dsize + data->dsize], &header->flags, sizeof(uint32_t));
 
        header->dmaster = c->hdr.srcnode;
        if (ctdb_ltdb_store(ctdb_db, *key, header, *data) != 0) {
@@ -237,46 +265,86 @@ static void ctdb_call_send_dmaster(struct ctdb_db_context *ctdb_db,
 
   must be called with the chainlock held. This function releases the chainlock
 */
-static void ctdb_become_dmaster(struct ctdb_db_context *ctdb_db, 
+static void ctdb_become_dmaster(struct ctdb_db_context *ctdb_db,
                                struct ctdb_req_header *hdr,
                                TDB_DATA key, TDB_DATA data,
-                               uint64_t rsn)
+                               uint64_t rsn, uint32_t record_flags)
 {
        struct ctdb_call_state *state;
        struct ctdb_context *ctdb = ctdb_db->ctdb;
        struct ctdb_ltdb_header header;
+       int ret;
 
-       DEBUG(DEBUG_INFO,("pnn %u dmaster response %08x\n", ctdb->pnn, ctdb_hash(&key)));
+       DEBUG(DEBUG_DEBUG,("pnn %u dmaster response %08x\n", ctdb->pnn, ctdb_hash(&key)));
 
        ZERO_STRUCT(header);
        header.rsn = rsn + 1;
        header.dmaster = ctdb->pnn;
+       header.flags = record_flags;
+
+       state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_call_state);
+
+       if (state) {
+               if (state->call->flags & CTDB_CALL_FLAG_VACUUM_MIGRATION) {
+                       /*
+                        * We temporarily add the VACUUM_MIGRATED flag to
+                        * the record flags, so that ctdb_ltdb_store can
+                        * decide whether the record should be stored or
+                        * deleted.
+                        */
+                       header.flags |= CTDB_REC_FLAG_VACUUM_MIGRATED;
+               }
+       }
 
        if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
                ctdb_fatal(ctdb, "ctdb_reply_dmaster store failed\n");
-               ctdb_ltdb_unlock(ctdb_db, key);
+
+               ret = ctdb_ltdb_unlock(ctdb_db, key);
+               if (ret != 0) {
+                       DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
+               }
                return;
        }
 
-       state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_call_state);
 
        if (state == NULL) {
                DEBUG(DEBUG_ERR,("pnn %u Invalid reqid %u in ctdb_become_dmaster from node %u\n",
                         ctdb->pnn, hdr->reqid, hdr->srcnode));
-               ctdb_ltdb_unlock(ctdb_db, key);
+
+               ret = ctdb_ltdb_unlock(ctdb_db, key);
+               if (ret != 0) {
+                       DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
+               }
+               return;
+       }
+
+       if (key.dsize != state->call->key.dsize || memcmp(key.dptr, state->call->key.dptr, key.dsize)) {
+               DEBUG(DEBUG_ERR, ("Got bogus DMASTER packet reqid:%u from node %u. Key does not match key held in matching idr.\n", hdr->reqid, hdr->srcnode));
+
+               ret = ctdb_ltdb_unlock(ctdb_db, key);
+               if (ret != 0) {
+                       DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
+               }
                return;
        }
 
        if (hdr->reqid != state->reqid) {
                /* we found a record  but it was the wrong one */
                DEBUG(DEBUG_ERR, ("Dropped orphan in ctdb_become_dmaster with reqid:%u\n from node %u", hdr->reqid, hdr->srcnode));
-               ctdb_ltdb_unlock(ctdb_db, key);
+
+               ret = ctdb_ltdb_unlock(ctdb_db, key);
+               if (ret != 0) {
+                       DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
+               }
                return;
        }
 
        ctdb_call_local(ctdb_db, state->call, &header, state, &data, ctdb->pnn);
 
-       ctdb_ltdb_unlock(ctdb_db, state->call->key);
+       ret = ctdb_ltdb_unlock(ctdb_db, state->call->key);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
+       }
 
        state->state = CTDB_CALL_DONE;
        if (state->async.fn) {
@@ -298,12 +366,19 @@ void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr
        TDB_DATA key, data, data2;
        struct ctdb_ltdb_header header;
        struct ctdb_db_context *ctdb_db;
+       uint32_t record_flags = 0;
+       size_t len;
        int ret;
 
        key.dptr = c->data;
        key.dsize = c->keylen;
        data.dptr = c->data + c->keylen;
        data.dsize = c->datalen;
+       len = offsetof(struct ctdb_req_dmaster, data) + key.dsize + data.dsize
+                       + sizeof(uint32_t);
+       if (len <= c->hdr.length) {
+               record_flags = *(uint32_t *)&c->data[c->keylen + c->datalen];
+       }
 
        ctdb_db = find_ctdb_db(ctdb, c->db_id);
        if (!ctdb_db) {
@@ -332,7 +407,7 @@ void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr
                ctdb_fatal(ctdb, "ctdb_req_dmaster to non-lmaster");
        }
 
-       DEBUG(DEBUG_INFO,("pnn %u dmaster request on %08x for %u from %u\n", 
+       DEBUG(DEBUG_DEBUG,("pnn %u dmaster request on %08x for %u from %u\n", 
                 ctdb->pnn, ctdb_hash(&key), c->dmaster, c->hdr.srcnode));
 
        /* its a protocol error if the sending node is not the current dmaster */
@@ -343,7 +418,10 @@ void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr
                         (unsigned long long)c->rsn, (unsigned long long)header.rsn, c->hdr.reqid,
                         (key.dsize >= 4)?(*(uint32_t *)key.dptr):0));
                if (header.rsn != 0 || header.dmaster != ctdb->pnn) {
-                       ctdb_fatal(ctdb, "ctdb_req_dmaster from non-master");
+                       DEBUG(DEBUG_ERR,("ctdb_req_dmaster from non-master. Force a recovery.\n"));
+
+                       ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
+                       ctdb_ltdb_unlock(ctdb_db, key);
                        return;
                }
        }
@@ -358,13 +436,20 @@ void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr
        /* use the rsn from the sending node */
        header.rsn = c->rsn;
 
+       /* store the record flags from the sending node */
+       header.flags = record_flags;
+
        /* check if the new dmaster is the lmaster, in which case we
           skip the dmaster reply */
        if (c->dmaster == ctdb->pnn) {
-               ctdb_become_dmaster(ctdb_db, hdr, key, data, c->rsn);
+               ctdb_become_dmaster(ctdb_db, hdr, key, data, c->rsn, record_flags);
        } else {
                ctdb_send_dmaster_reply(ctdb_db, &header, key, data, c->dmaster, hdr->reqid);
-               ctdb_ltdb_unlock(ctdb_db, key);
+
+               ret = ctdb_ltdb_unlock(ctdb_db, key);
+               if (ret != 0) {
+                       DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
+               }
        }
 }
 
@@ -383,7 +468,7 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
        struct ctdb_db_context *ctdb_db;
 
        if (ctdb->methods == NULL) {
-               DEBUG(DEBUG_ERR,(__location__ " Failed ctdb_request_call. Transport is DOWN\n"));
+               DEBUG(DEBUG_INFO,(__location__ " Failed ctdb_request_call. Transport is DOWN\n"));
                return;
        }
 
@@ -425,13 +510,15 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
        if (header.dmaster != ctdb->pnn) {
                talloc_free(data.dptr);
                ctdb_call_send_redirect(ctdb, call->key, c, &header);
-               ctdb_ltdb_unlock(ctdb_db, call->key);
+
+               ret = ctdb_ltdb_unlock(ctdb_db, call->key);
+               if (ret != 0) {
+                       DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
+               }
                return;
        }
 
-       if (c->hopcount > ctdb->statistics.max_hop_count) {
-               ctdb->statistics.max_hop_count = c->hopcount;
-       }
+       CTDB_UPDATE_STAT(ctdb, max_hop_count, c->hopcount);
 
        /* if this nodes has done enough consecutive calls on the same record
           then give them the record
@@ -439,19 +526,33 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
        */
        if ( c->hdr.srcnode != ctdb->pnn &&
             ((header.laccessor == c->hdr.srcnode
-              && header.lacount >= ctdb->tunable.max_lacount)
+              && header.lacount >= ctdb->tunable.max_lacount
+              && ctdb->tunable.max_lacount != 0)
              || (c->flags & CTDB_IMMEDIATE_MIGRATION)) ) {
-               DEBUG(DEBUG_INFO,("pnn %u starting migration of %08x to %u\n", 
-                        ctdb->pnn, ctdb_hash(&(call->key)), c->hdr.srcnode));
-               ctdb_call_send_dmaster(ctdb_db, c, &header, &(call->key), &data);
-               talloc_free(data.dptr);
-               ctdb_ltdb_unlock(ctdb_db, call->key);
-               return;
+               if (ctdb_db->transaction_active) {
+                       DEBUG(DEBUG_INFO, (__location__ " refusing migration"
+                             " of key %s while transaction is active\n",
+                             (char *)call->key.dptr));
+               } else {
+                       DEBUG(DEBUG_DEBUG,("pnn %u starting migration of %08x to %u\n",
+                                ctdb->pnn, ctdb_hash(&(call->key)), c->hdr.srcnode));
+                       ctdb_call_send_dmaster(ctdb_db, c, &header, &(call->key), &data);
+                       talloc_free(data.dptr);
+
+                       ret = ctdb_ltdb_unlock(ctdb_db, call->key);
+                       if (ret != 0) {
+                               DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
+                       }
+                       return;
+               }
        }
 
        ctdb_call_local(ctdb_db, call, &header, hdr, &data, c->hdr.srcnode);
 
-       ctdb_ltdb_unlock(ctdb_db, call->key);
+       ret = ctdb_ltdb_unlock(ctdb_db, call->key);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
+       }
 
        len = offsetof(struct ctdb_reply_call, data) + call->reply_data.dsize;
        r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REPLY_CALL, len, 
@@ -518,6 +619,8 @@ void ctdb_reply_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
        struct ctdb_reply_dmaster *c = (struct ctdb_reply_dmaster *)hdr;
        struct ctdb_db_context *ctdb_db;
        TDB_DATA key, data;
+       uint32_t record_flags = 0;
+       size_t len;
        int ret;
 
        ctdb_db = find_ctdb_db(ctdb, c->db_id);
@@ -530,6 +633,11 @@ void ctdb_reply_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
        key.dsize = c->keylen;
        data.dptr = &c->data[key.dsize];
        data.dsize = c->datalen;
+       len = offsetof(struct ctdb_reply_dmaster, data) + key.dsize + data.dsize
+               + sizeof(uint32_t);
+       if (len <= c->hdr.length) {
+               record_flags = *(uint32_t *)&c->data[c->keylen + c->datalen];
+       }
 
        ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr,
                                     ctdb_call_input_pkt, ctdb, False);
@@ -541,7 +649,7 @@ void ctdb_reply_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
                return;
        }
 
-       ctdb_become_dmaster(ctdb_db, hdr, key, data, c->rsn);
+       ctdb_become_dmaster(ctdb_db, hdr, key, data, c->rsn, record_flags);
 }
 
 
@@ -685,7 +793,7 @@ struct ctdb_call_state *ctdb_daemon_call_send_remote(struct ctdb_db_context *ctd
        struct ctdb_context *ctdb = ctdb_db->ctdb;
 
        if (ctdb->methods == NULL) {
-               DEBUG(DEBUG_ERR,(__location__ " Failed send packet. Transport is down\n"));
+               DEBUG(DEBUG_INFO,(__location__ " Failed send packet. Transport is down\n"));
                return NULL;
        }
 
@@ -769,7 +877,7 @@ void ctdb_send_keepalive(struct ctdb_context *ctdb, uint32_t destnode)
        struct ctdb_req_keepalive *r;
        
        if (ctdb->methods == NULL) {
-               DEBUG(DEBUG_ERR,(__location__ " Failed to send keepalive. Transport is DOWN\n"));
+               DEBUG(DEBUG_INFO,(__location__ " Failed to send keepalive. Transport is DOWN\n"));
                return;
        }
 
@@ -780,7 +888,7 @@ void ctdb_send_keepalive(struct ctdb_context *ctdb, uint32_t destnode)
        r->hdr.destnode  = destnode;
        r->hdr.reqid     = 0;
        
-       ctdb->statistics.keepalive_packets_sent++;
+       CTDB_INCREMENT_STAT(ctdb, keepalive_packets_sent);
 
        ctdb_queue_packet(ctdb, &r->hdr);