ctdb-daemon: Add tracking of migration records
[samba.git] / ctdb / server / ctdb_call.c
index a6fd85e623ae99b0477dbb955a5b35f647fe7822..ed943f91b3cbad9a679a5e6736089b0b4da1103f 100644 (file)
   see http://wiki.samba.org/index.php/Samba_%26_Clustering for
   protocol design and packet details
 */
-#include "includes.h"
-#include "tdb.h"
-#include "lib/util/dlinklist.h"
+#include "replace.h"
 #include "system/network.h"
 #include "system/filesys.h"
-#include "../include/ctdb_private.h"
-#include "../common/rb_tree.h"
+
+#include <talloc.h>
+#include <tevent.h>
+
+#include "lib/util/dlinklist.h"
+#include "lib/util/debug.h"
+#include "lib/util/samba_util.h"
+#include "lib/util/sys_rw.h"
+#include "lib/util/util_process.h"
+
+#include "ctdb_private.h"
+#include "ctdb_client.h"
+
+#include "common/rb_tree.h"
+#include "common/reqid.h"
+#include "common/system.h"
+#include "common/common.h"
+#include "common/logging.h"
+#include "common/hash_count.h"
 
 struct ctdb_sticky_record {
        struct ctdb_context *ctdb;
@@ -70,7 +85,7 @@ static void ctdb_send_error(struct ctdb_context *ctdb,
                            const char *fmt, ...)
 {
        va_list ap;
-       struct ctdb_reply_error *r;
+       struct ctdb_reply_error_old *r;
        char *msg;
        int msglen, len;
 
@@ -87,9 +102,9 @@ static void ctdb_send_error(struct ctdb_context *ctdb,
        va_end(ap);
 
        msglen = strlen(msg)+1;
-       len = offsetof(struct ctdb_reply_error, msg);
+       len = offsetof(struct ctdb_reply_error_old, msg);
        r = ctdb_transport_allocate(ctdb, msg, CTDB_REPLY_ERROR, len + msglen, 
-                                   struct ctdb_reply_error);
+                                   struct ctdb_reply_error_old);
        CTDB_NO_MEMORY_FATAL(ctdb, r);
 
        r->hdr.destnode  = hdr->srcnode;
@@ -120,7 +135,7 @@ static void ctdb_send_error(struct ctdb_context *ctdb,
 static void ctdb_call_send_redirect(struct ctdb_context *ctdb,
                                    struct ctdb_db_context *ctdb_db,
                                    TDB_DATA key,
-                                   struct ctdb_req_call *c, 
+                                   struct ctdb_req_call_old *c, 
                                    struct ctdb_ltdb_header *header)
 {
        uint32_t lmaster = ctdb_lmaster(ctdb, &key);
@@ -157,7 +172,7 @@ static void ctdb_send_dmaster_reply(struct ctdb_db_context *ctdb_db,
                                    uint32_t reqid)
 {
        struct ctdb_context *ctdb = ctdb_db->ctdb;
-       struct ctdb_reply_dmaster *r;
+       struct ctdb_reply_dmaster_old *r;
        int ret, len;
        TALLOC_CTX *tmp_ctx;
 
@@ -183,13 +198,14 @@ static void ctdb_send_dmaster_reply(struct ctdb_db_context *ctdb_db,
        tmp_ctx = talloc_new(ctdb);
 
        /* send the CTDB_REPLY_DMASTER */
-       len = offsetof(struct ctdb_reply_dmaster, data) + key.dsize + data.dsize + sizeof(uint32_t);
+       len = offsetof(struct ctdb_reply_dmaster_old, data) + key.dsize + data.dsize + sizeof(uint32_t);
        r = ctdb_transport_allocate(ctdb, tmp_ctx, CTDB_REPLY_DMASTER, len,
-                                   struct ctdb_reply_dmaster);
+                                   struct ctdb_reply_dmaster_old);
        CTDB_NO_MEMORY_FATAL(ctdb, r);
 
        r->hdr.destnode  = new_dmaster;
        r->hdr.reqid     = reqid;
+       r->hdr.generation = ctdb_db->generation;
        r->rsn           = header->rsn;
        r->keylen        = key.dsize;
        r->datalen       = data.dsize;
@@ -211,11 +227,11 @@ static void ctdb_send_dmaster_reply(struct ctdb_db_context *ctdb_db,
   CTDB_REPLY_DMASTER to the new dmaster
 */
 static void ctdb_call_send_dmaster(struct ctdb_db_context *ctdb_db, 
-                                  struct ctdb_req_call *c, 
+                                  struct ctdb_req_call_old *c, 
                                   struct ctdb_ltdb_header *header,
                                   TDB_DATA *key, TDB_DATA *data)
 {
-       struct ctdb_req_dmaster *r;
+       struct ctdb_req_dmaster_old *r;
        struct ctdb_context *ctdb = ctdb_db->ctdb;
        int len;
        uint32_t lmaster = ctdb_lmaster(ctdb, key);
@@ -235,13 +251,14 @@ static void ctdb_call_send_dmaster(struct ctdb_db_context *ctdb_db,
                return;
        }
        
-       len = offsetof(struct ctdb_req_dmaster, data) + key->dsize + data->dsize
+       len = offsetof(struct ctdb_req_dmaster_old, data) + key->dsize + data->dsize
                        + sizeof(uint32_t);
        r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_DMASTER, len, 
-                                   struct ctdb_req_dmaster);
+                                   struct ctdb_req_dmaster_old);
        CTDB_NO_MEMORY_FATAL(ctdb, r);
        r->hdr.destnode  = lmaster;
        r->hdr.reqid     = c->hdr.reqid;
+       r->hdr.generation = ctdb_db->generation;
        r->db_id         = c->db_id;
        r->rsn           = header->rsn;
        r->dmaster       = c->hdr.srcnode;
@@ -261,8 +278,9 @@ static void ctdb_call_send_dmaster(struct ctdb_db_context *ctdb_db,
        talloc_free(r);
 }
 
-static void ctdb_sticky_pindown_timeout(struct event_context *ev, struct timed_event *te, 
-                                      struct timeval t, void *private_data)
+static void ctdb_sticky_pindown_timeout(struct tevent_context *ev,
+                                       struct tevent_timer *te,
+                                       struct timeval t, void *private_data)
 {
        struct ctdb_sticky_record *sr = talloc_get_type(private_data, 
                                                       struct ctdb_sticky_record);
@@ -281,16 +299,13 @@ ctdb_set_sticky_pindown(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_
        uint32_t *k;
        struct ctdb_sticky_record *sr;
 
-       k = talloc_zero_size(tmp_ctx, ((key.dsize + 3) & 0xfffffffc) + 4);
+       k = ctdb_key_to_idkey(tmp_ctx, key);
        if (k == NULL) {
                DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
                talloc_free(tmp_ctx);
                return -1;
        }
 
-       k[0] = (key.dsize + 3) / 4 + 1;
-       memcpy(&k[1], key.dptr, key.dsize);
-
        sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
        if (sr == NULL) {
                talloc_free(tmp_ctx);
@@ -306,7 +321,10 @@ ctdb_set_sticky_pindown(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_
                        DEBUG(DEBUG_ERR,("Failed to allocate pindown context for sticky record\n"));
                        return -1;
                }
-               event_add_timed(ctdb->ev, sr->pindown, timeval_current_ofs(ctdb->tunable.sticky_pindown / 1000, (ctdb->tunable.sticky_pindown * 1000) % 1000000), ctdb_sticky_pindown_timeout, sr);
+               tevent_add_timer(ctdb->ev, sr->pindown,
+                                timeval_current_ofs(ctdb->tunable.sticky_pindown / 1000,
+                                                    (ctdb->tunable.sticky_pindown * 1000) % 1000000),
+                                ctdb_sticky_pindown_timeout, sr);
        }
 
        return 0;
@@ -335,7 +353,7 @@ static void ctdb_become_dmaster(struct ctdb_db_context *ctdb_db,
        header.dmaster = ctdb->pnn;
        header.flags = record_flags;
 
-       state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_call_state);
+       state = reqid_find(ctdb->idr, hdr->reqid, struct ctdb_call_state);
 
        if (state) {
                if (state->call->flags & CTDB_CALL_FLAG_VACUUM_MIGRATION) {
@@ -399,7 +417,9 @@ static void ctdb_become_dmaster(struct ctdb_db_context *ctdb_db,
                return;
        }
 
-       ctdb_call_local(ctdb_db, state->call, &header, state, &data, true, ctdb->pnn);
+       (void) hash_count_increment(ctdb_db->migratedb, key);
+
+       ctdb_call_local(ctdb_db, state->call, &header, state, &data, true);
 
        ret = ctdb_ltdb_unlock(ctdb_db, state->call->key);
        if (ret != 0) {
@@ -412,7 +432,147 @@ static void ctdb_become_dmaster(struct ctdb_db_context *ctdb_db,
        }
 }
 
+struct dmaster_defer_call {
+       struct dmaster_defer_call *next, *prev;
+       struct ctdb_context *ctdb;
+       struct ctdb_req_header *hdr;
+};
+
+struct dmaster_defer_queue {
+       struct ctdb_db_context *ctdb_db;
+       uint32_t generation;
+       struct dmaster_defer_call *deferred_calls;
+};
+
+static void dmaster_defer_reprocess(struct tevent_context *ev,
+                                   struct tevent_timer *te,
+                                   struct timeval t,
+                                   void *private_data)
+{
+       struct dmaster_defer_call *call = talloc_get_type(
+               private_data, struct dmaster_defer_call);
+
+       ctdb_input_pkt(call->ctdb, call->hdr);
+       talloc_free(call);
+}
+
+static int dmaster_defer_queue_destructor(struct dmaster_defer_queue *ddq)
+{
+       /* Ignore requests, if database recovery happens in-between. */
+       if (ddq->generation != ddq->ctdb_db->generation) {
+               return 0;
+       }
+
+       while (ddq->deferred_calls != NULL) {
+               struct dmaster_defer_call *call = ddq->deferred_calls;
+
+               DLIST_REMOVE(ddq->deferred_calls, call);
+
+               talloc_steal(call->ctdb, call);
+               tevent_add_timer(call->ctdb->ev, call, timeval_zero(),
+                                dmaster_defer_reprocess, call);
+       }
+       return 0;
+}
+
+static void *insert_ddq_callback(void *parm, void *data)
+{
+       if (data) {
+               talloc_free(data);
+       }
+       return parm;
+}
+
+/**
+ * This function is used to reigster a key in database that needs to be updated.
+ * Any requests for that key should get deferred till this is completed.
+ */
+static int dmaster_defer_setup(struct ctdb_db_context *ctdb_db,
+                              struct ctdb_req_header *hdr,
+                              TDB_DATA key)
+{
+       uint32_t *k;
+       struct dmaster_defer_queue *ddq;
+
+       k = ctdb_key_to_idkey(hdr, key);
+       if (k == NULL) {
+               DEBUG(DEBUG_ERR, ("Failed to allocate key for dmaster defer setup\n"));
+               return -1;
+       }
+
+       /* Already exists */
+       ddq = trbt_lookuparray32(ctdb_db->defer_dmaster, k[0], k);
+       if (ddq != NULL) {
+               if (ddq->generation == ctdb_db->generation) {
+                       talloc_free(k);
+                       return 0;
+               }
+
+               /* Recovery ocurred - get rid of old queue. All the deferred
+                * requests will be resent anyway from ctdb_call_resend_db.
+                */
+               talloc_free(ddq);
+       }
+
+       ddq = talloc(hdr, struct dmaster_defer_queue);
+       if (ddq == NULL) {
+               DEBUG(DEBUG_ERR, ("Failed to allocate dmaster defer queue\n"));
+               talloc_free(k);
+               return -1;
+       }
+       ddq->ctdb_db = ctdb_db;
+       ddq->generation = hdr->generation;
+       ddq->deferred_calls = NULL;
+
+       trbt_insertarray32_callback(ctdb_db->defer_dmaster, k[0], k,
+                                   insert_ddq_callback, ddq);
+       talloc_set_destructor(ddq, dmaster_defer_queue_destructor);
+
+       talloc_free(k);
+       return 0;
+}
+
+static int dmaster_defer_add(struct ctdb_db_context *ctdb_db,
+                            struct ctdb_req_header *hdr,
+                            TDB_DATA key)
+{
+       struct dmaster_defer_queue *ddq;
+       struct dmaster_defer_call *call;
+       uint32_t *k;
+
+       k = ctdb_key_to_idkey(hdr, key);
+       if (k == NULL) {
+               DEBUG(DEBUG_ERR, ("Failed to allocate key for dmaster defer add\n"));
+               return -1;
+       }
+
+       ddq = trbt_lookuparray32(ctdb_db->defer_dmaster, k[0], k);
+       if (ddq == NULL) {
+               talloc_free(k);
+               return -1;
+       }
+
+       talloc_free(k);
+
+       if (ddq->generation != hdr->generation) {
+               talloc_set_destructor(ddq, NULL);
+               talloc_free(ddq);
+               return -1;
+       }
+
+       call = talloc(ddq, struct dmaster_defer_call);
+       if (call == NULL) {
+               DEBUG(DEBUG_ERR, ("Failed to allocate dmaster defer call\n"));
+               return -1;
+       }
+
+       call->ctdb = ctdb_db->ctdb;
+       call->hdr = talloc_steal(call, hdr);
 
+       DLIST_ADD_END(ddq->deferred_calls, call);
+
+       return 0;
+}
 
 /*
   called when a CTDB_REQ_DMASTER packet comes in
@@ -422,7 +582,7 @@ static void ctdb_become_dmaster(struct ctdb_db_context *ctdb_db,
 */
 void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
 {
-       struct ctdb_req_dmaster *c = (struct ctdb_req_dmaster *)hdr;
+       struct ctdb_req_dmaster_old *c = (struct ctdb_req_dmaster_old *)hdr;
        TDB_DATA key, data, data2;
        struct ctdb_ltdb_header header;
        struct ctdb_db_context *ctdb_db;
@@ -434,10 +594,11 @@ void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr
        key.dsize = c->keylen;
        data.dptr = c->data + c->keylen;
        data.dsize = c->datalen;
-       len = offsetof(struct ctdb_req_dmaster, data) + key.dsize + data.dsize
+       len = offsetof(struct ctdb_req_dmaster_old, data) + key.dsize + data.dsize
                        + sizeof(uint32_t);
        if (len <= c->hdr.length) {
-               record_flags = *(uint32_t *)&c->data[c->keylen + c->datalen];
+               memcpy(&record_flags, &c->data[c->keylen + c->datalen],
+                      sizeof(record_flags));
        }
 
        ctdb_db = find_ctdb_db(ctdb, c->db_id);
@@ -447,7 +608,9 @@ void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr
                                c->db_id);
                return;
        }
-       
+
+       dmaster_defer_setup(ctdb_db, hdr, key);
+
        /* fetch the current record */
        ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, hdr, &data2,
                                           ctdb_call_input_pkt, ctdb, false);
@@ -461,9 +624,10 @@ void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr
        }
 
        if (ctdb_lmaster(ctdb, &key) != ctdb->pnn) {
-               DEBUG(DEBUG_ALERT,("pnn %u dmaster request to non-lmaster lmaster=%u gen=%u curgen=%u\n",
-                        ctdb->pnn, ctdb_lmaster(ctdb, &key), 
-                        hdr->generation, ctdb->vnn_map->generation));
+               DEBUG(DEBUG_ERR, ("dmaster request to non-lmaster "
+                                 "db=%s lmaster=%u gen=%u curgen=%u\n",
+                                 ctdb_db->db_name, ctdb_lmaster(ctdb, &key),
+                                 hdr->generation, ctdb_db->generation));
                ctdb_fatal(ctdb, "ctdb_req_dmaster to non-lmaster");
        }
 
@@ -513,7 +677,8 @@ void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr
        }
 }
 
-static void ctdb_sticky_record_timeout(struct event_context *ev, struct timed_event *te, 
+static void ctdb_sticky_record_timeout(struct tevent_context *ev,
+                                      struct tevent_timer *te,
                                       struct timeval t, void *private_data)
 {
        struct ctdb_sticky_record *sr = talloc_get_type(private_data, 
@@ -537,16 +702,13 @@ ctdb_make_record_sticky(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_
        uint32_t *k;
        struct ctdb_sticky_record *sr;
 
-       k = talloc_zero_size(tmp_ctx, ((key.dsize + 3) & 0xfffffffc) + 4);
+       k = ctdb_key_to_idkey(tmp_ctx, key);
        if (k == NULL) {
                DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
                talloc_free(tmp_ctx);
                return -1;
        }
 
-       k[0] = (key.dsize + 3) / 4 + 1;
-       memcpy(&k[1], key.dptr, key.dsize);
-
        sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
        if (sr != NULL) {
                talloc_free(tmp_ctx);
@@ -570,7 +732,9 @@ ctdb_make_record_sticky(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_
 
        trbt_insertarray32_callback(ctdb_db->sticky_records, k[0], &k[0], ctdb_make_sticky_record_callback, sr);
 
-       event_add_timed(ctdb->ev, sr, timeval_current_ofs(ctdb->tunable.sticky_duration, 0), ctdb_sticky_record_timeout, sr);
+       tevent_add_timer(ctdb->ev, sr,
+                        timeval_current_ofs(ctdb->tunable.sticky_duration, 0),
+                        ctdb_sticky_record_timeout, sr);
 
        talloc_free(tmp_ctx);
        return 0;
@@ -586,8 +750,9 @@ struct pinned_down_deferred_call {
        struct ctdb_req_header *hdr;
 };
 
-static void pinned_down_requeue(struct event_context *ev, struct timed_event *te, 
-                      struct timeval t, void *private_data)
+static void pinned_down_requeue(struct tevent_context *ev,
+                               struct tevent_timer *te,
+                               struct timeval t, void *private_data)
 {
        struct pinned_down_requeue_handle *handle = talloc_get_type(private_data, struct pinned_down_requeue_handle);
        struct ctdb_context *ctdb = handle->ctdb;
@@ -607,7 +772,8 @@ static int pinned_down_destructor(struct pinned_down_deferred_call *pinned_down)
        handle->hdr  = pinned_down->hdr;
        talloc_steal(handle, handle->hdr);
 
-       event_add_timed(ctdb->ev, handle, timeval_zero(), pinned_down_requeue, handle);
+       tevent_add_timer(ctdb->ev, handle, timeval_zero(),
+                        pinned_down_requeue, handle);
 
        return 0;
 }
@@ -620,16 +786,13 @@ ctdb_defer_pinned_down_request(struct ctdb_context *ctdb, struct ctdb_db_context
        struct ctdb_sticky_record *sr;
        struct pinned_down_deferred_call *pinned_down;
 
-       k = talloc_zero_size(tmp_ctx, ((key.dsize + 3) & 0xfffffffc) + 4);
+       k = ctdb_key_to_idkey(tmp_ctx, key);
        if (k == NULL) {
                DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
                talloc_free(tmp_ctx);
                return -1;
        }
 
-       k[0] = (key.dsize + 3) / 4 + 1;
-       memcpy(&k[1], key.dptr, key.dsize);
-
        sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
        if (sr == NULL) {
                talloc_free(tmp_ctx);
@@ -658,12 +821,13 @@ ctdb_defer_pinned_down_request(struct ctdb_context *ctdb, struct ctdb_db_context
 }
 
 static void
-ctdb_update_db_stat_hot_keys(struct ctdb_db_context *ctdb_db, TDB_DATA key, int hopcount)
+ctdb_update_db_stat_hot_keys(struct ctdb_db_context *ctdb_db, TDB_DATA key,
+                            int count)
 {
        int i, id;
 
        /* smallest value is always at index 0 */
-       if (hopcount <= ctdb_db->statistics.hot_keys[0].count) {
+       if (count <= ctdb_db->statistics.hot_keys[0].count) {
                return;
        }
 
@@ -676,10 +840,10 @@ ctdb_update_db_stat_hot_keys(struct ctdb_db_context *ctdb_db, TDB_DATA key, int
                        continue;
                }
                /* found an entry for this key */
-               if (hopcount <= ctdb_db->statistics.hot_keys[i].count) {
+               if (count <= ctdb_db->statistics.hot_keys[i].count) {
                        return;
                }
-               ctdb_db->statistics.hot_keys[i].count = hopcount;
+               ctdb_db->statistics.hot_keys[i].count = count;
                goto sort_keys;
        }
 
@@ -695,7 +859,10 @@ ctdb_update_db_stat_hot_keys(struct ctdb_db_context *ctdb_db, TDB_DATA key, int
        }
        ctdb_db->statistics.hot_keys[id].key.dsize = key.dsize;
        ctdb_db->statistics.hot_keys[id].key.dptr  = talloc_memdup(ctdb_db, key.dptr, key.dsize);
-       ctdb_db->statistics.hot_keys[id].count = hopcount;
+       ctdb_db->statistics.hot_keys[id].count = count;
+       DEBUG(DEBUG_NOTICE,
+             ("Updated hot key database=%s key=0x%08x id=%d count=%d\n",
+              ctdb_db->db_name, ctdb_hash(&key), id, count));
 
 sort_keys:
        for (i = 1; i < MAX_HOT_KEYS; i++) {
@@ -703,9 +870,9 @@ sort_keys:
                        continue;
                }
                if (ctdb_db->statistics.hot_keys[i].count < ctdb_db->statistics.hot_keys[0].count) {
-                       hopcount = ctdb_db->statistics.hot_keys[i].count;
+                       count = ctdb_db->statistics.hot_keys[i].count;
                        ctdb_db->statistics.hot_keys[i].count = ctdb_db->statistics.hot_keys[0].count;
-                       ctdb_db->statistics.hot_keys[0].count = hopcount;
+                       ctdb_db->statistics.hot_keys[0].count = count;
 
                        key = ctdb_db->statistics.hot_keys[i].key;
                        ctdb_db->statistics.hot_keys[i].key = ctdb_db->statistics.hot_keys[0].key;
@@ -719,9 +886,9 @@ sort_keys:
 */
 void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
 {
-       struct ctdb_req_call *c = (struct ctdb_req_call *)hdr;
+       struct ctdb_req_call_old *c = (struct ctdb_req_call_old *)hdr;
        TDB_DATA data;
-       struct ctdb_reply_call *r;
+       struct ctdb_reply_call_old *r;
        int ret, len;
        struct ctdb_ltdb_header header;
        struct ctdb_call *call;
@@ -759,11 +926,17 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
        */
        if (ctdb_db->sticky) {
                if (ctdb_defer_pinned_down_request(ctdb, ctdb_db, call->key, hdr) == 0) {
-                 DEBUG(DEBUG_WARNING,("Defer request for pinned down record in %s\n", ctdb_db->db_name));
+                       DEBUG(DEBUG_WARNING,
+                             ("Defer request for pinned down record in %s\n", ctdb_db->db_name));
+                       talloc_free(call);
                        return;
                }
        }
 
+       if (dmaster_defer_add(ctdb_db, hdr, call->key) == 0) {
+               talloc_free(call);
+               return;
+       }
 
        /* determine if we are the dmaster for this key. This also
           fetches the record data (if any), thus avoiding a 2nd fetch of the data 
@@ -773,14 +946,16 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
                                           ctdb_call_input_pkt, ctdb, false);
        if (ret == -1) {
                ctdb_send_error(ctdb, hdr, ret, "ltdb fetch failed in ctdb_request_call");
+               talloc_free(call);
                return;
        }
        if (ret == -2) {
                DEBUG(DEBUG_INFO,(__location__ " deferred ctdb_request_call\n"));
+               talloc_free(call);
                return;
        }
 
-       /* Dont do READONLY if we dont have a tracking database */
+       /* Dont do READONLY if we don't have a tracking database */
        if ((c->flags & CTDB_WANT_READONLY) && !ctdb_db->readonly) {
                c->flags &= ~CTDB_WANT_READONLY;
        }
@@ -826,6 +1001,7 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
                if (ret != 0) {
                        DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
                }
+               talloc_free(call);
                return;
        }
 
@@ -880,12 +1056,13 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
                        DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
                }
 
-               len = offsetof(struct ctdb_reply_call, data) + data.dsize + sizeof(struct ctdb_ltdb_header);
+               len = offsetof(struct ctdb_reply_call_old, data) + data.dsize + sizeof(struct ctdb_ltdb_header);
                r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REPLY_CALL, len, 
-                                           struct ctdb_reply_call);
+                                           struct ctdb_reply_call_old);
                CTDB_NO_MEMORY_FATAL(ctdb, r);
                r->hdr.destnode  = c->hdr.srcnode;
                r->hdr.reqid     = c->hdr.reqid;
+               r->hdr.generation = ctdb_db->generation;
                r->status        = 0;
                r->datalen       = data.dsize + sizeof(struct ctdb_ltdb_header);
                header.rsn      -= 2;
@@ -902,6 +1079,7 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
                CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_delegations);
 
                talloc_free(r);
+               talloc_free(call);
                return;
        }
 
@@ -917,7 +1095,6 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
        }
        CTDB_INCREMENT_STAT(ctdb, hop_count_bucket[bucket]);
        CTDB_INCREMENT_DB_STAT(ctdb_db, hop_count_bucket[bucket]);
-       ctdb_update_db_stat_hot_keys(ctdb_db, call->key, c->hopcount);
 
        /* If this database supports sticky records, then check if the
           hopcount is big. If it is it means the record is hot and we
@@ -928,16 +1105,12 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
        }
 
 
-       /* if this nodes has done enough consecutive calls on the same record
-          then give them the record
-          or if the node requested an immediate migration
-       */
-       if ( c->hdr.srcnode != ctdb->pnn &&
-            ((header.laccessor == c->hdr.srcnode
-              && header.lacount >= ctdb->tunable.max_lacount
-              && ctdb->tunable.max_lacount != 0)
-             || (c->flags & CTDB_IMMEDIATE_MIGRATION)) ) {
-               if (ctdb_db->transaction_active) {
+       /* Try if possible to migrate the record off to the caller node.
+        * From the clients perspective a fetch of the data is just as 
+        * expensive as a migration.
+        */
+       if (c->hdr.srcnode != ctdb->pnn) {
+               if (ctdb_db->persistent_state) {
                        DEBUG(DEBUG_INFO, (__location__ " refusing migration"
                              " of key %s while transaction is active\n",
                              (char *)call->key.dptr));
@@ -951,11 +1124,12 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
                        if (ret != 0) {
                                DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
                        }
-                       return;
                }
+               talloc_free(call);
+               return;
        }
 
-       ret = ctdb_call_local(ctdb_db, call, &header, hdr, &data, true, c->hdr.srcnode);
+       ret = ctdb_call_local(ctdb_db, call, &header, hdr, &data, true);
        if (ret != 0) {
                DEBUG(DEBUG_ERR,(__location__ " ctdb_call_local failed\n"));
                call->status = -1;
@@ -966,12 +1140,13 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
                DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
        }
 
-       len = offsetof(struct ctdb_reply_call, data) + call->reply_data.dsize;
+       len = offsetof(struct ctdb_reply_call_old, data) + call->reply_data.dsize;
        r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REPLY_CALL, len, 
-                                   struct ctdb_reply_call);
+                                   struct ctdb_reply_call_old);
        CTDB_NO_MEMORY_FATAL(ctdb, r);
        r->hdr.destnode  = hdr->srcnode;
        r->hdr.reqid     = hdr->reqid;
+       r->hdr.generation = ctdb_db->generation;
        r->status        = call->status;
        r->datalen       = call->reply_data.dsize;
        if (call->reply_data.dsize) {
@@ -981,6 +1156,7 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
        ctdb_queue_packet(ctdb, &r->hdr);
 
        talloc_free(r);
+       talloc_free(call);
 }
 
 /**
@@ -991,10 +1167,10 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
  */
 void ctdb_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
 {
-       struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
+       struct ctdb_reply_call_old *c = (struct ctdb_reply_call_old *)hdr;
        struct ctdb_call_state *state;
 
-       state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_call_state);
+       state = reqid_find(ctdb->idr, hdr->reqid, struct ctdb_call_state);
        if (state == NULL) {
                DEBUG(DEBUG_ERR, (__location__ " reqid %u not found\n", hdr->reqid));
                return;
@@ -1079,16 +1255,16 @@ finished_ro:
 }
 
 
-/*
-  called when a CTDB_REPLY_DMASTER packet comes in
-
-  This packet comes in from the lmaster in response to a CTDB_REQ_CALL
-  request packet. It means that the current dmaster wants to give us
-  the dmaster role.
-*/
+/**
* called when a CTDB_REPLY_DMASTER packet comes in
+ *
* This packet comes in from the lmaster in response to a CTDB_REQ_CALL
* request packet. It means that the current dmaster wants to give us
* the dmaster role.
+ */
 void ctdb_reply_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
 {
-       struct ctdb_reply_dmaster *c = (struct ctdb_reply_dmaster *)hdr;
+       struct ctdb_reply_dmaster_old *c = (struct ctdb_reply_dmaster_old *)hdr;
        struct ctdb_db_context *ctdb_db;
        TDB_DATA key, data;
        uint32_t record_flags = 0;
@@ -1105,12 +1281,15 @@ void ctdb_reply_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
        key.dsize = c->keylen;
        data.dptr = &c->data[key.dsize];
        data.dsize = c->datalen;
-       len = offsetof(struct ctdb_reply_dmaster, data) + key.dsize + data.dsize
+       len = offsetof(struct ctdb_reply_dmaster_old, data) + key.dsize + data.dsize
                + sizeof(uint32_t);
        if (len <= c->hdr.length) {
-               record_flags = *(uint32_t *)&c->data[c->keylen + c->datalen];
+               memcpy(&record_flags, &c->data[c->keylen + c->datalen],
+                      sizeof(record_flags));
        }
 
+       dmaster_defer_setup(ctdb_db, hdr, key);
+
        ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr,
                                     ctdb_call_input_pkt, ctdb, false);
        if (ret == -2) {
@@ -1130,10 +1309,10 @@ void ctdb_reply_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
 */
 void ctdb_reply_error(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
 {
-       struct ctdb_reply_error *c = (struct ctdb_reply_error *)hdr;
+       struct ctdb_reply_error_old *c = (struct ctdb_reply_error_old *)hdr;
        struct ctdb_call_state *state;
 
-       state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_call_state);
+       state = reqid_find(ctdb->idr, hdr->reqid, struct ctdb_call_state);
        if (state == NULL) {
                DEBUG(DEBUG_ERR,("pnn %u Invalid reqid %u in ctdb_reply_error\n",
                         ctdb->pnn, hdr->reqid));
@@ -1161,8 +1340,8 @@ void ctdb_reply_error(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
 */
 static int ctdb_call_destructor(struct ctdb_call_state *state)
 {
-       DLIST_REMOVE(state->ctdb_db->ctdb->pending_calls, state);
-       ctdb_reqid_remove(state->ctdb_db->ctdb, state->reqid);
+       DLIST_REMOVE(state->ctdb_db->pending_calls, state);
+       reqid_remove(state->ctdb_db->ctdb->idr, state->reqid);
        return 0;
 }
 
@@ -1174,11 +1353,11 @@ static void ctdb_call_resend(struct ctdb_call_state *state)
 {
        struct ctdb_context *ctdb = state->ctdb_db->ctdb;
 
-       state->generation = ctdb->vnn_map->generation;
+       state->generation = state->ctdb_db->generation;
 
        /* use a new reqid, in case the old reply does eventually come in */
-       ctdb_reqid_remove(ctdb, state->reqid);
-       state->reqid = ctdb_reqid_new(ctdb, state);
+       reqid_remove(ctdb->idr, state->reqid);
+       state->reqid = reqid_new(ctdb->idr, state);
        state->c->hdr.reqid = state->reqid;
 
        /* update the generation count for this request, so its valid with the new vnn_map */
@@ -1188,26 +1367,38 @@ static void ctdb_call_resend(struct ctdb_call_state *state)
        state->c->hdr.destnode = ctdb->pnn;
 
        ctdb_queue_packet(ctdb, &state->c->hdr);
-       DEBUG(DEBUG_NOTICE,("resent ctdb_call\n"));
+       DEBUG(DEBUG_NOTICE,("resent ctdb_call for db %s reqid %u generation %u\n",
+                           state->ctdb_db->db_name, state->reqid, state->generation));
 }
 
 /*
   resend all pending calls on recovery
  */
-void ctdb_call_resend_all(struct ctdb_context *ctdb)
+void ctdb_call_resend_db(struct ctdb_db_context *ctdb_db)
 {
        struct ctdb_call_state *state, *next;
-       for (state=ctdb->pending_calls;state;state=next) {
+
+       for (state = ctdb_db->pending_calls; state; state = next) {
                next = state->next;
                ctdb_call_resend(state);
        }
 }
 
+void ctdb_call_resend_all(struct ctdb_context *ctdb)
+{
+       struct ctdb_db_context *ctdb_db;
+
+       for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
+               ctdb_call_resend_db(ctdb_db);
+       }
+}
+
 /*
   this allows the caller to setup a async.fn 
 */
-static void call_local_trigger(struct event_context *ev, struct timed_event *te, 
-                      struct timeval t, void *private_data)
+static void call_local_trigger(struct tevent_context *ev,
+                              struct tevent_timer *te,
+                              struct timeval t, void *private_data)
 {
        struct ctdb_call_state *state = talloc_get_type(private_data, struct ctdb_call_state);
        if (state->async.fn) {
@@ -1242,12 +1433,13 @@ struct ctdb_call_state *ctdb_call_local_send(struct ctdb_db_context *ctdb_db,
        *(state->call) = *call;
        state->ctdb_db = ctdb_db;
 
-       ret = ctdb_call_local(ctdb_db, state->call, header, state, data, true, ctdb->pnn);
+       ret = ctdb_call_local(ctdb_db, state->call, header, state, data, true);
        if (ret != 0) {
                DEBUG(DEBUG_DEBUG,("ctdb_call_local() failed, ignoring return code %d\n", ret));
        }
 
-       event_add_timed(ctdb->ev, state, timeval_zero(), call_local_trigger, state);
+       tevent_add_timer(ctdb->ev, state, timeval_zero(),
+                        call_local_trigger, state);
 
        return state;
 }
@@ -1277,18 +1469,19 @@ struct ctdb_call_state *ctdb_daemon_call_send_remote(struct ctdb_db_context *ctd
        state->call = talloc(state, struct ctdb_call);
        CTDB_NO_MEMORY_NULL(ctdb, state->call);
 
-       state->reqid = ctdb_reqid_new(ctdb, state);
+       state->reqid = reqid_new(ctdb->idr, state);
        state->ctdb_db = ctdb_db;
        talloc_set_destructor(state, ctdb_call_destructor);
 
-       len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
+       len = offsetof(struct ctdb_req_call_old, data) + call->key.dsize + call->call_data.dsize;
        state->c = ctdb_transport_allocate(ctdb, state, CTDB_REQ_CALL, len, 
-                                          struct ctdb_req_call);
+                                          struct ctdb_req_call_old);
        CTDB_NO_MEMORY_NULL(ctdb, state->c);
        state->c->hdr.destnode  = header->dmaster;
 
        /* this limits us to 16k outstanding messages - not unreasonable */
        state->c->hdr.reqid     = state->reqid;
+       state->c->hdr.generation = ctdb_db->generation;
        state->c->flags         = call->flags;
        state->c->db_id         = ctdb_db->db_id;
        state->c->callid        = call->call_id;
@@ -1303,9 +1496,9 @@ struct ctdb_call_state *ctdb_daemon_call_send_remote(struct ctdb_db_context *ctd
        state->call->key.dptr       = &state->c->data[0];
 
        state->state  = CTDB_CALL_WAIT;
-       state->generation = ctdb->vnn_map->generation;
+       state->generation = ctdb_db->generation;
 
-       DLIST_ADD(ctdb->pending_calls, state);
+       DLIST_ADD(ctdb_db->pending_calls, state);
 
        ctdb_queue_packet(ctdb, &state->c->hdr);
 
@@ -1321,7 +1514,7 @@ struct ctdb_call_state *ctdb_daemon_call_send_remote(struct ctdb_db_context *ctd
 int ctdb_daemon_call_recv(struct ctdb_call_state *state, struct ctdb_call *call)
 {
        while (state->state < CTDB_CALL_DONE) {
-               event_loop_once(state->ctdb_db->ctdb->ev);
+               tevent_loop_once(state->ctdb_db->ctdb->ev);
        }
        if (state->state != CTDB_CALL_DONE) {
                ctdb_set_error(state->ctdb_db->ctdb, "%s", state->errmsg);
@@ -1349,7 +1542,7 @@ int ctdb_daemon_call_recv(struct ctdb_call_state *state, struct ctdb_call *call)
 */
 void ctdb_send_keepalive(struct ctdb_context *ctdb, uint32_t destnode)
 {
-       struct ctdb_req_keepalive *r;
+       struct ctdb_req_keepalive_old *r;
        
        if (ctdb->methods == NULL) {
                DEBUG(DEBUG_INFO,(__location__ " Failed to send keepalive. Transport is DOWN\n"));
@@ -1357,8 +1550,8 @@ void ctdb_send_keepalive(struct ctdb_context *ctdb, uint32_t destnode)
        }
 
        r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_KEEPALIVE,
-                                   sizeof(struct ctdb_req_keepalive), 
-                                   struct ctdb_req_keepalive);
+                                   sizeof(struct ctdb_req_keepalive_old), 
+                                   struct ctdb_req_keepalive_old);
        CTDB_NO_MEMORY_FATAL(ctdb, r);
        r->hdr.destnode  = destnode;
        r->hdr.reqid     = 0;
@@ -1383,7 +1576,7 @@ struct revokechild_handle {
        struct revokechild_handle *next, *prev;
        struct ctdb_context *ctdb;
        struct ctdb_db_context *ctdb_db;
-       struct fd_event *fde;
+       struct tevent_fd *fde;
        int status;
        int fd[2];
        pid_t child;
@@ -1397,8 +1590,9 @@ struct revokechild_requeue_handle {
        void *ctx;
 };
 
-static void deferred_call_requeue(struct event_context *ev, struct timed_event *te, 
-                      struct timeval t, void *private_data)
+static void deferred_call_requeue(struct tevent_context *ev,
+                                 struct tevent_timer *te,
+                                 struct timeval t, void *private_data)
 {
        struct revokechild_requeue_handle *requeue_handle = talloc_get_type(private_data, struct revokechild_requeue_handle);
 
@@ -1410,7 +1604,6 @@ static int deferred_call_destructor(struct revokechild_deferred_call *deferred_c
 {
        struct ctdb_context *ctdb = deferred_call->ctdb;
        struct revokechild_requeue_handle *requeue_handle = talloc(ctdb, struct revokechild_requeue_handle);
-       struct ctdb_req_call *c = (struct ctdb_req_call *)deferred_call->hdr;
 
        requeue_handle->ctdb = ctdb;
        requeue_handle->hdr  = deferred_call->hdr;
@@ -1418,8 +1611,13 @@ static int deferred_call_destructor(struct revokechild_deferred_call *deferred_c
        requeue_handle->ctx  = deferred_call->ctx;
        talloc_steal(requeue_handle, requeue_handle->hdr);
 
-       /* when revoking, any READONLY requests have 1 second grace to let read/write finish first */
-       event_add_timed(ctdb->ev, requeue_handle, timeval_current_ofs(c->flags & CTDB_WANT_READONLY ? 1 : 0, 0), deferred_call_requeue, requeue_handle);
+       /* Always delay revoke requests.  Either wait for the read/write
+        * operation to complete, or if revoking failed wait for recovery to
+        * complete
+        */
+       tevent_add_timer(ctdb->ev, requeue_handle,
+                        timeval_current_ofs(1, 0),
+                        deferred_call_requeue, requeue_handle);
 
        return 0;
 }
@@ -1443,15 +1641,16 @@ static int revokechild_destructor(struct revokechild_handle *rc)
        return 0;
 }
 
-static void revokechild_handler(struct event_context *ev, struct fd_event *fde, 
-                            uint16_t flags, void *private_data)
+static void revokechild_handler(struct tevent_context *ev,
+                               struct tevent_fd *fde,
+                               uint16_t flags, void *private_data)
 {
        struct revokechild_handle *rc = talloc_get_type(private_data, 
                                                     struct revokechild_handle);
        int ret;
        char c;
 
-       ret = read(rc->fd[0], &c, 1);
+       ret = sys_read(rc->fd[0], &c, 1);
        if (ret != 1) {
                DEBUG(DEBUG_ERR,("Failed to read status from revokechild. errno:%d\n", errno));
                rc->status = -1;
@@ -1507,7 +1706,7 @@ static void revoke_send_cb(struct ctdb_context *ctdb, uint32_t pnn, void *privat
        struct ctdb_revoke_state *revoke_state = private_data;
        struct ctdb_client_control_state *state;
 
-       state = ctdb_ctrl_updaterecord_send(ctdb, revoke_state, timeval_current_ofs(5,0), pnn, revoke_state->ctdb_db, revoke_state->key, revoke_state->header, revoke_state->data);
+       state = ctdb_ctrl_updaterecord_send(ctdb, revoke_state, timeval_current_ofs(ctdb->tunable.control_timeout,0), pnn, revoke_state->ctdb_db, revoke_state->key, revoke_state->header, revoke_state->data);
        if (state == NULL) {
                DEBUG(DEBUG_ERR,("Failure to send update record to revoke readonly delegation\n"));
                revoke_state->status = -1;
@@ -1520,8 +1719,9 @@ static void revoke_send_cb(struct ctdb_context *ctdb, uint32_t pnn, void *privat
 
 }
 
-static void ctdb_revoke_timeout_handler(struct event_context *ev, struct timed_event *te, 
-                             struct timeval yt, void *private_data)
+static void ctdb_revoke_timeout_handler(struct tevent_context *ev,
+                                       struct tevent_timer *te,
+                                       struct timeval yt, void *private_data)
 {
        struct ctdb_revoke_state *state = private_data;
 
@@ -1533,7 +1733,8 @@ static void ctdb_revoke_timeout_handler(struct event_context *ev, struct timed_e
 static int ctdb_revoke_all_delegations(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA tdata, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
 {
        struct ctdb_revoke_state *state = talloc_zero(ctdb, struct ctdb_revoke_state);
-       int status;
+       struct ctdb_ltdb_header new_header;
+       TDB_DATA new_data;
 
        state->ctdb_db = ctdb_db;
        state->key     = key;
@@ -1542,55 +1743,60 @@ static int ctdb_revoke_all_delegations(struct ctdb_context *ctdb, struct ctdb_db
  
        ctdb_trackingdb_traverse(ctdb, tdata, revoke_send_cb, state);
 
-       event_add_timed(ctdb->ev, state, timeval_current_ofs(5, 0), ctdb_revoke_timeout_handler, state);
+       tevent_add_timer(ctdb->ev, state,
+                        timeval_current_ofs(ctdb->tunable.control_timeout, 0),
+                        ctdb_revoke_timeout_handler, state);
 
        while (state->finished == 0) {
-               event_loop_once(ctdb->ev);
+               tevent_loop_once(ctdb->ev);
        }
 
-       status = state->status;
-
-       if (status == 0) {
-               struct ctdb_ltdb_header new_header;
-               TDB_DATA new_data;
+       if (ctdb_ltdb_lock(ctdb_db, key) != 0) {
+               DEBUG(DEBUG_ERR,("Failed to chainlock the database in revokechild\n"));
+               talloc_free(state);
+               return -1;
+       }
+       if (ctdb_ltdb_fetch(ctdb_db, key, &new_header, state, &new_data) != 0) {
+               ctdb_ltdb_unlock(ctdb_db, key);
+               DEBUG(DEBUG_ERR,("Failed for fetch tdb record in revokechild\n"));
+               talloc_free(state);
+               return -1;
+       }
+       header->rsn++;
+       if (new_header.rsn > header->rsn) {
+               ctdb_ltdb_unlock(ctdb_db, key);
+               DEBUG(DEBUG_ERR,("RSN too high in tdb record in revokechild\n"));
+               talloc_free(state);
+               return -1;
+       }
+       if ( (new_header.flags & (CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS)) != (CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS) ) {
+               ctdb_ltdb_unlock(ctdb_db, key);
+               DEBUG(DEBUG_ERR,("Flags are wrong in tdb record in revokechild\n"));
+               talloc_free(state);
+               return -1;
+       }
 
-               if (ctdb_ltdb_lock(ctdb_db, key) != 0) {
-                       DEBUG(DEBUG_ERR,("Failed to chainlock the database in revokechild\n"));
-                       talloc_free(state);
-                       return -1;
-               }
-               if (ctdb_ltdb_fetch(ctdb_db, key, &new_header, state, &new_data) != 0) {
-                       ctdb_ltdb_unlock(ctdb_db, key);
-                       DEBUG(DEBUG_ERR,("Failed for fetch tdb record in revokechild\n"));
-                       talloc_free(state);
-                       return -1;
-               }
-               header->rsn++;
-               if (new_header.rsn > header->rsn) {
-                       ctdb_ltdb_unlock(ctdb_db, key);
-                       DEBUG(DEBUG_ERR,("RSN too high in tdb record in revokechild\n"));
-                       talloc_free(state);
-                       return -1;
-               }
-               if ( (new_header.flags & (CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS)) != (CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS) ) {
-                       ctdb_ltdb_unlock(ctdb_db, key);
-                       DEBUG(DEBUG_ERR,("Flags are wrong in tdb record in revokechild\n"));
-                       talloc_free(state);
-                       return -1;
-               }
+       /*
+        * If revoke on all nodes succeed, revoke is complete.  Otherwise,
+        * remove CTDB_REC_RO_REVOKING_READONLY flag and retry.
+        */
+       if (state->status == 0) {
                new_header.rsn++;
                new_header.flags |= CTDB_REC_RO_REVOKE_COMPLETE;
-               if (ctdb_ltdb_store(ctdb_db, key, &new_header, new_data) != 0) {
-                       ctdb_ltdb_unlock(ctdb_db, key);
-                       DEBUG(DEBUG_ERR,("Failed to write new record in revokechild\n"));
-                       talloc_free(state);
-                       return -1;
-               }
+       } else {
+               DEBUG(DEBUG_NOTICE, ("Revoke all delegations failed, retrying.\n"));
+               new_header.flags &= ~CTDB_REC_RO_REVOKING_READONLY;
+       }
+       if (ctdb_ltdb_store(ctdb_db, key, &new_header, new_data) != 0) {
                ctdb_ltdb_unlock(ctdb_db, key);
+               DEBUG(DEBUG_ERR,("Failed to write new record in revokechild\n"));
+               talloc_free(state);
+               return -1;
        }
+       ctdb_ltdb_unlock(ctdb_db, key);
 
        talloc_free(state);
-       return status;
+       return 0;
 }
 
 
@@ -1653,10 +1859,9 @@ int ctdb_start_revoke_ro_record(struct ctdb_context *ctdb, struct ctdb_db_contex
        if (rc->child == 0) {
                char c = 0;
                close(rc->fd[0]);
-               debug_extra = talloc_asprintf(NULL, "revokechild-%s:", ctdb_db->db_name);
 
-               ctdb_set_process_name("ctdb_revokechild");
-               if (switch_from_server_to_client(ctdb, "revokechild-%s", ctdb_db->db_name) != 0) {
+               prctl_set_comment("ctdb_revokechild");
+               if (switch_from_server_to_client(ctdb) != 0) {
                        DEBUG(DEBUG_ERR,("Failed to switch from server to client for revokechild process\n"));
                        c = 1;
                        goto child_finished;
@@ -1665,11 +1870,8 @@ int ctdb_start_revoke_ro_record(struct ctdb_context *ctdb, struct ctdb_db_contex
                c = ctdb_revoke_all_delegations(ctdb, ctdb_db, tdata, key, header, data);
 
 child_finished:
-               write(rc->fd[1], &c, 1);
-               /* make sure we die when our parent dies */
-               while (ctdb_kill(ctdb, parent, 0) == 0 || errno != ESRCH) {
-                       sleep(5);
-               }
+               sys_write(rc->fd[1], &c, 1);
+               ctdb_wait_for_process_to_exit(parent);
                _exit(0);
        }
 
@@ -1678,11 +1880,10 @@ child_finished:
        set_close_on_exec(rc->fd[0]);
 
        /* This is an active revokechild child process */
-       DLIST_ADD_END(ctdb_db->revokechild_active, rc, NULL);
+       DLIST_ADD_END(ctdb_db->revokechild_active, rc);
 
-       rc->fde = event_add_fd(ctdb->ev, rc, rc->fd[0],
-                                  EVENT_FD_READ, revokechild_handler,
-                                  (void *)rc);
+       rc->fde = tevent_add_fd(ctdb->ev, rc, rc->fd[0], TEVENT_FD_READ,
+                               revokechild_handler, (void *)rc);
        if (rc->fde == NULL) {
                DEBUG(DEBUG_ERR,("Failed to set up fd event for revokechild process\n"));
                talloc_free(rc);
@@ -1730,3 +1931,73 @@ int ctdb_add_revoke_deferred_call(struct ctdb_context *ctdb, struct ctdb_db_cont
 
        return 0;
 }
+
+static void ctdb_migration_count_handler(TDB_DATA key, uint64_t counter,
+                                        void *private_data)
+{
+       struct ctdb_db_context *ctdb_db = talloc_get_type_abort(
+               private_data, struct ctdb_db_context);
+       int value;
+
+       value = (counter < INT_MAX ? counter : INT_MAX);
+       ctdb_update_db_stat_hot_keys(ctdb_db, key, value);
+}
+
+static void ctdb_migration_cleandb_event(struct tevent_context *ev,
+                                        struct tevent_timer *te,
+                                        struct timeval current_time,
+                                        void *private_data)
+{
+       struct ctdb_db_context *ctdb_db = talloc_get_type_abort(
+               private_data, struct ctdb_db_context);
+
+       if (ctdb_db->migratedb == NULL) {
+               return;
+       }
+
+       hash_count_expire(ctdb_db->migratedb, NULL);
+
+       te = tevent_add_timer(ctdb_db->ctdb->ev, ctdb_db->migratedb,
+                             tevent_timeval_current_ofs(10, 0),
+                             ctdb_migration_cleandb_event, ctdb_db);
+       if (te == NULL) {
+               DEBUG(DEBUG_ERR,
+                     ("Memory error in migration cleandb event for %s\n",
+                      ctdb_db->db_name));
+               TALLOC_FREE(ctdb_db->migratedb);
+       }
+}
+
+int ctdb_migration_init(struct ctdb_db_context *ctdb_db)
+{
+       struct timeval one_second = { 1, 0 };
+       struct tevent_timer *te;
+       int ret;
+
+       if (ctdb_db->persistent) {
+               return 0;
+       }
+
+       ret = hash_count_init(ctdb_db, one_second,
+                             ctdb_migration_count_handler, ctdb_db,
+                             &ctdb_db->migratedb);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR,
+                     ("Memory error in migration init for %s\n",
+                      ctdb_db->db_name));
+               return -1;
+       }
+
+       te = tevent_add_timer(ctdb_db->ctdb->ev, ctdb_db->migratedb,
+                             tevent_timeval_current_ofs(10, 0),
+                             ctdb_migration_cleandb_event, ctdb_db);
+       if (te == NULL) {
+               DEBUG(DEBUG_ERR,
+                     ("Memory error in migration init for %s\n",
+                      ctdb_db->db_name));
+               TALLOC_FREE(ctdb_db->migratedb);
+               return -1;
+       }
+
+       return 0;
+}