X-Git-Url: http://git.samba.org/?a=blobdiff_plain;f=ctdb%2Fserver%2Fctdb_call.c;h=ed943f91b3cbad9a679a5e6736089b0b4da1103f;hb=5da471919d2b7ed45be574a8cb7c88351f894797;hp=a6fd85e623ae99b0477dbb955a5b35f647fe7822;hpb=621bfe8b0d49c45a5db566e8bc12b36122179088;p=samba.git diff --git a/ctdb/server/ctdb_call.c b/ctdb/server/ctdb_call.c index a6fd85e623a..ed943f91b3c 100644 --- a/ctdb/server/ctdb_call.c +++ b/ctdb/server/ctdb_call.c @@ -20,13 +20,28 @@ see http://wiki.samba.org/index.php/Samba_%26_Clustering for protocol design and packet details */ -#include "includes.h" -#include "tdb.h" -#include "lib/util/dlinklist.h" +#include "replace.h" #include "system/network.h" #include "system/filesys.h" -#include "../include/ctdb_private.h" -#include "../common/rb_tree.h" + +#include +#include + +#include "lib/util/dlinklist.h" +#include "lib/util/debug.h" +#include "lib/util/samba_util.h" +#include "lib/util/sys_rw.h" +#include "lib/util/util_process.h" + +#include "ctdb_private.h" +#include "ctdb_client.h" + +#include "common/rb_tree.h" +#include "common/reqid.h" +#include "common/system.h" +#include "common/common.h" +#include "common/logging.h" +#include "common/hash_count.h" struct ctdb_sticky_record { struct ctdb_context *ctdb; @@ -70,7 +85,7 @@ static void ctdb_send_error(struct ctdb_context *ctdb, const char *fmt, ...) { va_list ap; - struct ctdb_reply_error *r; + struct ctdb_reply_error_old *r; char *msg; int msglen, len; @@ -87,9 +102,9 @@ static void ctdb_send_error(struct ctdb_context *ctdb, va_end(ap); msglen = strlen(msg)+1; - len = offsetof(struct ctdb_reply_error, msg); + len = offsetof(struct ctdb_reply_error_old, msg); r = ctdb_transport_allocate(ctdb, msg, CTDB_REPLY_ERROR, len + msglen, - struct ctdb_reply_error); + struct ctdb_reply_error_old); CTDB_NO_MEMORY_FATAL(ctdb, r); r->hdr.destnode = hdr->srcnode; @@ -120,7 +135,7 @@ static void ctdb_send_error(struct ctdb_context *ctdb, static void ctdb_call_send_redirect(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key, - struct ctdb_req_call *c, + struct ctdb_req_call_old *c, struct ctdb_ltdb_header *header) { uint32_t lmaster = ctdb_lmaster(ctdb, &key); @@ -157,7 +172,7 @@ static void ctdb_send_dmaster_reply(struct ctdb_db_context *ctdb_db, uint32_t reqid) { struct ctdb_context *ctdb = ctdb_db->ctdb; - struct ctdb_reply_dmaster *r; + struct ctdb_reply_dmaster_old *r; int ret, len; TALLOC_CTX *tmp_ctx; @@ -183,13 +198,14 @@ static void ctdb_send_dmaster_reply(struct ctdb_db_context *ctdb_db, tmp_ctx = talloc_new(ctdb); /* send the CTDB_REPLY_DMASTER */ - len = offsetof(struct ctdb_reply_dmaster, data) + key.dsize + data.dsize + sizeof(uint32_t); + len = offsetof(struct ctdb_reply_dmaster_old, data) + key.dsize + data.dsize + sizeof(uint32_t); r = ctdb_transport_allocate(ctdb, tmp_ctx, CTDB_REPLY_DMASTER, len, - struct ctdb_reply_dmaster); + struct ctdb_reply_dmaster_old); CTDB_NO_MEMORY_FATAL(ctdb, r); r->hdr.destnode = new_dmaster; r->hdr.reqid = reqid; + r->hdr.generation = ctdb_db->generation; r->rsn = header->rsn; r->keylen = key.dsize; r->datalen = data.dsize; @@ -211,11 +227,11 @@ static void ctdb_send_dmaster_reply(struct ctdb_db_context *ctdb_db, CTDB_REPLY_DMASTER to the new dmaster */ static void ctdb_call_send_dmaster(struct ctdb_db_context *ctdb_db, - struct ctdb_req_call *c, + struct ctdb_req_call_old *c, struct ctdb_ltdb_header *header, TDB_DATA *key, TDB_DATA *data) { - struct ctdb_req_dmaster *r; + struct ctdb_req_dmaster_old *r; struct ctdb_context *ctdb = ctdb_db->ctdb; int len; uint32_t lmaster = ctdb_lmaster(ctdb, key); @@ -235,13 +251,14 @@ static void ctdb_call_send_dmaster(struct ctdb_db_context *ctdb_db, return; } - len = offsetof(struct ctdb_req_dmaster, data) + key->dsize + data->dsize + len = offsetof(struct ctdb_req_dmaster_old, data) + key->dsize + data->dsize + sizeof(uint32_t); r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_DMASTER, len, - struct ctdb_req_dmaster); + struct ctdb_req_dmaster_old); CTDB_NO_MEMORY_FATAL(ctdb, r); r->hdr.destnode = lmaster; r->hdr.reqid = c->hdr.reqid; + r->hdr.generation = ctdb_db->generation; r->db_id = c->db_id; r->rsn = header->rsn; r->dmaster = c->hdr.srcnode; @@ -261,8 +278,9 @@ static void ctdb_call_send_dmaster(struct ctdb_db_context *ctdb_db, talloc_free(r); } -static void ctdb_sticky_pindown_timeout(struct event_context *ev, struct timed_event *te, - struct timeval t, void *private_data) +static void ctdb_sticky_pindown_timeout(struct tevent_context *ev, + struct tevent_timer *te, + struct timeval t, void *private_data) { struct ctdb_sticky_record *sr = talloc_get_type(private_data, struct ctdb_sticky_record); @@ -281,16 +299,13 @@ ctdb_set_sticky_pindown(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_ uint32_t *k; struct ctdb_sticky_record *sr; - k = talloc_zero_size(tmp_ctx, ((key.dsize + 3) & 0xfffffffc) + 4); + k = ctdb_key_to_idkey(tmp_ctx, key); if (k == NULL) { DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n")); talloc_free(tmp_ctx); return -1; } - k[0] = (key.dsize + 3) / 4 + 1; - memcpy(&k[1], key.dptr, key.dsize); - sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]); if (sr == NULL) { talloc_free(tmp_ctx); @@ -306,7 +321,10 @@ ctdb_set_sticky_pindown(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_ DEBUG(DEBUG_ERR,("Failed to allocate pindown context for sticky record\n")); return -1; } - event_add_timed(ctdb->ev, sr->pindown, timeval_current_ofs(ctdb->tunable.sticky_pindown / 1000, (ctdb->tunable.sticky_pindown * 1000) % 1000000), ctdb_sticky_pindown_timeout, sr); + tevent_add_timer(ctdb->ev, sr->pindown, + timeval_current_ofs(ctdb->tunable.sticky_pindown / 1000, + (ctdb->tunable.sticky_pindown * 1000) % 1000000), + ctdb_sticky_pindown_timeout, sr); } return 0; @@ -335,7 +353,7 @@ static void ctdb_become_dmaster(struct ctdb_db_context *ctdb_db, header.dmaster = ctdb->pnn; header.flags = record_flags; - state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_call_state); + state = reqid_find(ctdb->idr, hdr->reqid, struct ctdb_call_state); if (state) { if (state->call->flags & CTDB_CALL_FLAG_VACUUM_MIGRATION) { @@ -399,7 +417,9 @@ static void ctdb_become_dmaster(struct ctdb_db_context *ctdb_db, return; } - ctdb_call_local(ctdb_db, state->call, &header, state, &data, true, ctdb->pnn); + (void) hash_count_increment(ctdb_db->migratedb, key); + + ctdb_call_local(ctdb_db, state->call, &header, state, &data, true); ret = ctdb_ltdb_unlock(ctdb_db, state->call->key); if (ret != 0) { @@ -412,7 +432,147 @@ static void ctdb_become_dmaster(struct ctdb_db_context *ctdb_db, } } +struct dmaster_defer_call { + struct dmaster_defer_call *next, *prev; + struct ctdb_context *ctdb; + struct ctdb_req_header *hdr; +}; + +struct dmaster_defer_queue { + struct ctdb_db_context *ctdb_db; + uint32_t generation; + struct dmaster_defer_call *deferred_calls; +}; + +static void dmaster_defer_reprocess(struct tevent_context *ev, + struct tevent_timer *te, + struct timeval t, + void *private_data) +{ + struct dmaster_defer_call *call = talloc_get_type( + private_data, struct dmaster_defer_call); + + ctdb_input_pkt(call->ctdb, call->hdr); + talloc_free(call); +} + +static int dmaster_defer_queue_destructor(struct dmaster_defer_queue *ddq) +{ + /* Ignore requests, if database recovery happens in-between. */ + if (ddq->generation != ddq->ctdb_db->generation) { + return 0; + } + + while (ddq->deferred_calls != NULL) { + struct dmaster_defer_call *call = ddq->deferred_calls; + + DLIST_REMOVE(ddq->deferred_calls, call); + + talloc_steal(call->ctdb, call); + tevent_add_timer(call->ctdb->ev, call, timeval_zero(), + dmaster_defer_reprocess, call); + } + return 0; +} + +static void *insert_ddq_callback(void *parm, void *data) +{ + if (data) { + talloc_free(data); + } + return parm; +} + +/** + * This function is used to reigster a key in database that needs to be updated. + * Any requests for that key should get deferred till this is completed. + */ +static int dmaster_defer_setup(struct ctdb_db_context *ctdb_db, + struct ctdb_req_header *hdr, + TDB_DATA key) +{ + uint32_t *k; + struct dmaster_defer_queue *ddq; + + k = ctdb_key_to_idkey(hdr, key); + if (k == NULL) { + DEBUG(DEBUG_ERR, ("Failed to allocate key for dmaster defer setup\n")); + return -1; + } + + /* Already exists */ + ddq = trbt_lookuparray32(ctdb_db->defer_dmaster, k[0], k); + if (ddq != NULL) { + if (ddq->generation == ctdb_db->generation) { + talloc_free(k); + return 0; + } + + /* Recovery ocurred - get rid of old queue. All the deferred + * requests will be resent anyway from ctdb_call_resend_db. + */ + talloc_free(ddq); + } + + ddq = talloc(hdr, struct dmaster_defer_queue); + if (ddq == NULL) { + DEBUG(DEBUG_ERR, ("Failed to allocate dmaster defer queue\n")); + talloc_free(k); + return -1; + } + ddq->ctdb_db = ctdb_db; + ddq->generation = hdr->generation; + ddq->deferred_calls = NULL; + + trbt_insertarray32_callback(ctdb_db->defer_dmaster, k[0], k, + insert_ddq_callback, ddq); + talloc_set_destructor(ddq, dmaster_defer_queue_destructor); + + talloc_free(k); + return 0; +} + +static int dmaster_defer_add(struct ctdb_db_context *ctdb_db, + struct ctdb_req_header *hdr, + TDB_DATA key) +{ + struct dmaster_defer_queue *ddq; + struct dmaster_defer_call *call; + uint32_t *k; + + k = ctdb_key_to_idkey(hdr, key); + if (k == NULL) { + DEBUG(DEBUG_ERR, ("Failed to allocate key for dmaster defer add\n")); + return -1; + } + + ddq = trbt_lookuparray32(ctdb_db->defer_dmaster, k[0], k); + if (ddq == NULL) { + talloc_free(k); + return -1; + } + + talloc_free(k); + + if (ddq->generation != hdr->generation) { + talloc_set_destructor(ddq, NULL); + talloc_free(ddq); + return -1; + } + + call = talloc(ddq, struct dmaster_defer_call); + if (call == NULL) { + DEBUG(DEBUG_ERR, ("Failed to allocate dmaster defer call\n")); + return -1; + } + + call->ctdb = ctdb_db->ctdb; + call->hdr = talloc_steal(call, hdr); + DLIST_ADD_END(ddq->deferred_calls, call); + + return 0; +} /* called when a CTDB_REQ_DMASTER packet comes in @@ -422,7 +582,7 @@ static void ctdb_become_dmaster(struct ctdb_db_context *ctdb_db, */ void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) { - struct ctdb_req_dmaster *c = (struct ctdb_req_dmaster *)hdr; + struct ctdb_req_dmaster_old *c = (struct ctdb_req_dmaster_old *)hdr; TDB_DATA key, data, data2; struct ctdb_ltdb_header header; struct ctdb_db_context *ctdb_db; @@ -434,10 +594,11 @@ void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr key.dsize = c->keylen; data.dptr = c->data + c->keylen; data.dsize = c->datalen; - len = offsetof(struct ctdb_req_dmaster, data) + key.dsize + data.dsize + len = offsetof(struct ctdb_req_dmaster_old, data) + key.dsize + data.dsize + sizeof(uint32_t); if (len <= c->hdr.length) { - record_flags = *(uint32_t *)&c->data[c->keylen + c->datalen]; + memcpy(&record_flags, &c->data[c->keylen + c->datalen], + sizeof(record_flags)); } ctdb_db = find_ctdb_db(ctdb, c->db_id); @@ -447,7 +608,9 @@ void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr c->db_id); return; } - + + dmaster_defer_setup(ctdb_db, hdr, key); + /* fetch the current record */ ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, hdr, &data2, ctdb_call_input_pkt, ctdb, false); @@ -461,9 +624,10 @@ void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr } if (ctdb_lmaster(ctdb, &key) != ctdb->pnn) { - DEBUG(DEBUG_ALERT,("pnn %u dmaster request to non-lmaster lmaster=%u gen=%u curgen=%u\n", - ctdb->pnn, ctdb_lmaster(ctdb, &key), - hdr->generation, ctdb->vnn_map->generation)); + DEBUG(DEBUG_ERR, ("dmaster request to non-lmaster " + "db=%s lmaster=%u gen=%u curgen=%u\n", + ctdb_db->db_name, ctdb_lmaster(ctdb, &key), + hdr->generation, ctdb_db->generation)); ctdb_fatal(ctdb, "ctdb_req_dmaster to non-lmaster"); } @@ -513,7 +677,8 @@ void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr } } -static void ctdb_sticky_record_timeout(struct event_context *ev, struct timed_event *te, +static void ctdb_sticky_record_timeout(struct tevent_context *ev, + struct tevent_timer *te, struct timeval t, void *private_data) { struct ctdb_sticky_record *sr = talloc_get_type(private_data, @@ -537,16 +702,13 @@ ctdb_make_record_sticky(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_ uint32_t *k; struct ctdb_sticky_record *sr; - k = talloc_zero_size(tmp_ctx, ((key.dsize + 3) & 0xfffffffc) + 4); + k = ctdb_key_to_idkey(tmp_ctx, key); if (k == NULL) { DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n")); talloc_free(tmp_ctx); return -1; } - k[0] = (key.dsize + 3) / 4 + 1; - memcpy(&k[1], key.dptr, key.dsize); - sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]); if (sr != NULL) { talloc_free(tmp_ctx); @@ -570,7 +732,9 @@ ctdb_make_record_sticky(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_ trbt_insertarray32_callback(ctdb_db->sticky_records, k[0], &k[0], ctdb_make_sticky_record_callback, sr); - event_add_timed(ctdb->ev, sr, timeval_current_ofs(ctdb->tunable.sticky_duration, 0), ctdb_sticky_record_timeout, sr); + tevent_add_timer(ctdb->ev, sr, + timeval_current_ofs(ctdb->tunable.sticky_duration, 0), + ctdb_sticky_record_timeout, sr); talloc_free(tmp_ctx); return 0; @@ -586,8 +750,9 @@ struct pinned_down_deferred_call { struct ctdb_req_header *hdr; }; -static void pinned_down_requeue(struct event_context *ev, struct timed_event *te, - struct timeval t, void *private_data) +static void pinned_down_requeue(struct tevent_context *ev, + struct tevent_timer *te, + struct timeval t, void *private_data) { struct pinned_down_requeue_handle *handle = talloc_get_type(private_data, struct pinned_down_requeue_handle); struct ctdb_context *ctdb = handle->ctdb; @@ -607,7 +772,8 @@ static int pinned_down_destructor(struct pinned_down_deferred_call *pinned_down) handle->hdr = pinned_down->hdr; talloc_steal(handle, handle->hdr); - event_add_timed(ctdb->ev, handle, timeval_zero(), pinned_down_requeue, handle); + tevent_add_timer(ctdb->ev, handle, timeval_zero(), + pinned_down_requeue, handle); return 0; } @@ -620,16 +786,13 @@ ctdb_defer_pinned_down_request(struct ctdb_context *ctdb, struct ctdb_db_context struct ctdb_sticky_record *sr; struct pinned_down_deferred_call *pinned_down; - k = talloc_zero_size(tmp_ctx, ((key.dsize + 3) & 0xfffffffc) + 4); + k = ctdb_key_to_idkey(tmp_ctx, key); if (k == NULL) { DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n")); talloc_free(tmp_ctx); return -1; } - k[0] = (key.dsize + 3) / 4 + 1; - memcpy(&k[1], key.dptr, key.dsize); - sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]); if (sr == NULL) { talloc_free(tmp_ctx); @@ -658,12 +821,13 @@ ctdb_defer_pinned_down_request(struct ctdb_context *ctdb, struct ctdb_db_context } static void -ctdb_update_db_stat_hot_keys(struct ctdb_db_context *ctdb_db, TDB_DATA key, int hopcount) +ctdb_update_db_stat_hot_keys(struct ctdb_db_context *ctdb_db, TDB_DATA key, + int count) { int i, id; /* smallest value is always at index 0 */ - if (hopcount <= ctdb_db->statistics.hot_keys[0].count) { + if (count <= ctdb_db->statistics.hot_keys[0].count) { return; } @@ -676,10 +840,10 @@ ctdb_update_db_stat_hot_keys(struct ctdb_db_context *ctdb_db, TDB_DATA key, int continue; } /* found an entry for this key */ - if (hopcount <= ctdb_db->statistics.hot_keys[i].count) { + if (count <= ctdb_db->statistics.hot_keys[i].count) { return; } - ctdb_db->statistics.hot_keys[i].count = hopcount; + ctdb_db->statistics.hot_keys[i].count = count; goto sort_keys; } @@ -695,7 +859,10 @@ ctdb_update_db_stat_hot_keys(struct ctdb_db_context *ctdb_db, TDB_DATA key, int } ctdb_db->statistics.hot_keys[id].key.dsize = key.dsize; ctdb_db->statistics.hot_keys[id].key.dptr = talloc_memdup(ctdb_db, key.dptr, key.dsize); - ctdb_db->statistics.hot_keys[id].count = hopcount; + ctdb_db->statistics.hot_keys[id].count = count; + DEBUG(DEBUG_NOTICE, + ("Updated hot key database=%s key=0x%08x id=%d count=%d\n", + ctdb_db->db_name, ctdb_hash(&key), id, count)); sort_keys: for (i = 1; i < MAX_HOT_KEYS; i++) { @@ -703,9 +870,9 @@ sort_keys: continue; } if (ctdb_db->statistics.hot_keys[i].count < ctdb_db->statistics.hot_keys[0].count) { - hopcount = ctdb_db->statistics.hot_keys[i].count; + count = ctdb_db->statistics.hot_keys[i].count; ctdb_db->statistics.hot_keys[i].count = ctdb_db->statistics.hot_keys[0].count; - ctdb_db->statistics.hot_keys[0].count = hopcount; + ctdb_db->statistics.hot_keys[0].count = count; key = ctdb_db->statistics.hot_keys[i].key; ctdb_db->statistics.hot_keys[i].key = ctdb_db->statistics.hot_keys[0].key; @@ -719,9 +886,9 @@ sort_keys: */ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) { - struct ctdb_req_call *c = (struct ctdb_req_call *)hdr; + struct ctdb_req_call_old *c = (struct ctdb_req_call_old *)hdr; TDB_DATA data; - struct ctdb_reply_call *r; + struct ctdb_reply_call_old *r; int ret, len; struct ctdb_ltdb_header header; struct ctdb_call *call; @@ -759,11 +926,17 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) */ if (ctdb_db->sticky) { if (ctdb_defer_pinned_down_request(ctdb, ctdb_db, call->key, hdr) == 0) { - DEBUG(DEBUG_WARNING,("Defer request for pinned down record in %s\n", ctdb_db->db_name)); + DEBUG(DEBUG_WARNING, + ("Defer request for pinned down record in %s\n", ctdb_db->db_name)); + talloc_free(call); return; } } + if (dmaster_defer_add(ctdb_db, hdr, call->key) == 0) { + talloc_free(call); + return; + } /* determine if we are the dmaster for this key. This also fetches the record data (if any), thus avoiding a 2nd fetch of the data @@ -773,14 +946,16 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) ctdb_call_input_pkt, ctdb, false); if (ret == -1) { ctdb_send_error(ctdb, hdr, ret, "ltdb fetch failed in ctdb_request_call"); + talloc_free(call); return; } if (ret == -2) { DEBUG(DEBUG_INFO,(__location__ " deferred ctdb_request_call\n")); + talloc_free(call); return; } - /* Dont do READONLY if we dont have a tracking database */ + /* Dont do READONLY if we don't have a tracking database */ if ((c->flags & CTDB_WANT_READONLY) && !ctdb_db->readonly) { c->flags &= ~CTDB_WANT_READONLY; } @@ -826,6 +1001,7 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) if (ret != 0) { DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret)); } + talloc_free(call); return; } @@ -880,12 +1056,13 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret)); } - len = offsetof(struct ctdb_reply_call, data) + data.dsize + sizeof(struct ctdb_ltdb_header); + len = offsetof(struct ctdb_reply_call_old, data) + data.dsize + sizeof(struct ctdb_ltdb_header); r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REPLY_CALL, len, - struct ctdb_reply_call); + struct ctdb_reply_call_old); CTDB_NO_MEMORY_FATAL(ctdb, r); r->hdr.destnode = c->hdr.srcnode; r->hdr.reqid = c->hdr.reqid; + r->hdr.generation = ctdb_db->generation; r->status = 0; r->datalen = data.dsize + sizeof(struct ctdb_ltdb_header); header.rsn -= 2; @@ -902,6 +1079,7 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_delegations); talloc_free(r); + talloc_free(call); return; } @@ -917,7 +1095,6 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) } CTDB_INCREMENT_STAT(ctdb, hop_count_bucket[bucket]); CTDB_INCREMENT_DB_STAT(ctdb_db, hop_count_bucket[bucket]); - ctdb_update_db_stat_hot_keys(ctdb_db, call->key, c->hopcount); /* If this database supports sticky records, then check if the hopcount is big. If it is it means the record is hot and we @@ -928,16 +1105,12 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) } - /* if this nodes has done enough consecutive calls on the same record - then give them the record - or if the node requested an immediate migration - */ - if ( c->hdr.srcnode != ctdb->pnn && - ((header.laccessor == c->hdr.srcnode - && header.lacount >= ctdb->tunable.max_lacount - && ctdb->tunable.max_lacount != 0) - || (c->flags & CTDB_IMMEDIATE_MIGRATION)) ) { - if (ctdb_db->transaction_active) { + /* Try if possible to migrate the record off to the caller node. + * From the clients perspective a fetch of the data is just as + * expensive as a migration. + */ + if (c->hdr.srcnode != ctdb->pnn) { + if (ctdb_db->persistent_state) { DEBUG(DEBUG_INFO, (__location__ " refusing migration" " of key %s while transaction is active\n", (char *)call->key.dptr)); @@ -951,11 +1124,12 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) if (ret != 0) { DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret)); } - return; } + talloc_free(call); + return; } - ret = ctdb_call_local(ctdb_db, call, &header, hdr, &data, true, c->hdr.srcnode); + ret = ctdb_call_local(ctdb_db, call, &header, hdr, &data, true); if (ret != 0) { DEBUG(DEBUG_ERR,(__location__ " ctdb_call_local failed\n")); call->status = -1; @@ -966,12 +1140,13 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret)); } - len = offsetof(struct ctdb_reply_call, data) + call->reply_data.dsize; + len = offsetof(struct ctdb_reply_call_old, data) + call->reply_data.dsize; r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REPLY_CALL, len, - struct ctdb_reply_call); + struct ctdb_reply_call_old); CTDB_NO_MEMORY_FATAL(ctdb, r); r->hdr.destnode = hdr->srcnode; r->hdr.reqid = hdr->reqid; + r->hdr.generation = ctdb_db->generation; r->status = call->status; r->datalen = call->reply_data.dsize; if (call->reply_data.dsize) { @@ -981,6 +1156,7 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) ctdb_queue_packet(ctdb, &r->hdr); talloc_free(r); + talloc_free(call); } /** @@ -991,10 +1167,10 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) */ void ctdb_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) { - struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr; + struct ctdb_reply_call_old *c = (struct ctdb_reply_call_old *)hdr; struct ctdb_call_state *state; - state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_call_state); + state = reqid_find(ctdb->idr, hdr->reqid, struct ctdb_call_state); if (state == NULL) { DEBUG(DEBUG_ERR, (__location__ " reqid %u not found\n", hdr->reqid)); return; @@ -1079,16 +1255,16 @@ finished_ro: } -/* - called when a CTDB_REPLY_DMASTER packet comes in - - This packet comes in from the lmaster in response to a CTDB_REQ_CALL - request packet. It means that the current dmaster wants to give us - the dmaster role. -*/ +/** + * called when a CTDB_REPLY_DMASTER packet comes in + * + * This packet comes in from the lmaster in response to a CTDB_REQ_CALL + * request packet. It means that the current dmaster wants to give us + * the dmaster role. + */ void ctdb_reply_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) { - struct ctdb_reply_dmaster *c = (struct ctdb_reply_dmaster *)hdr; + struct ctdb_reply_dmaster_old *c = (struct ctdb_reply_dmaster_old *)hdr; struct ctdb_db_context *ctdb_db; TDB_DATA key, data; uint32_t record_flags = 0; @@ -1105,12 +1281,15 @@ void ctdb_reply_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) key.dsize = c->keylen; data.dptr = &c->data[key.dsize]; data.dsize = c->datalen; - len = offsetof(struct ctdb_reply_dmaster, data) + key.dsize + data.dsize + len = offsetof(struct ctdb_reply_dmaster_old, data) + key.dsize + data.dsize + sizeof(uint32_t); if (len <= c->hdr.length) { - record_flags = *(uint32_t *)&c->data[c->keylen + c->datalen]; + memcpy(&record_flags, &c->data[c->keylen + c->datalen], + sizeof(record_flags)); } + dmaster_defer_setup(ctdb_db, hdr, key); + ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, ctdb_call_input_pkt, ctdb, false); if (ret == -2) { @@ -1130,10 +1309,10 @@ void ctdb_reply_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) */ void ctdb_reply_error(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) { - struct ctdb_reply_error *c = (struct ctdb_reply_error *)hdr; + struct ctdb_reply_error_old *c = (struct ctdb_reply_error_old *)hdr; struct ctdb_call_state *state; - state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_call_state); + state = reqid_find(ctdb->idr, hdr->reqid, struct ctdb_call_state); if (state == NULL) { DEBUG(DEBUG_ERR,("pnn %u Invalid reqid %u in ctdb_reply_error\n", ctdb->pnn, hdr->reqid)); @@ -1161,8 +1340,8 @@ void ctdb_reply_error(struct ctdb_context *ctdb, struct ctdb_req_header *hdr) */ static int ctdb_call_destructor(struct ctdb_call_state *state) { - DLIST_REMOVE(state->ctdb_db->ctdb->pending_calls, state); - ctdb_reqid_remove(state->ctdb_db->ctdb, state->reqid); + DLIST_REMOVE(state->ctdb_db->pending_calls, state); + reqid_remove(state->ctdb_db->ctdb->idr, state->reqid); return 0; } @@ -1174,11 +1353,11 @@ static void ctdb_call_resend(struct ctdb_call_state *state) { struct ctdb_context *ctdb = state->ctdb_db->ctdb; - state->generation = ctdb->vnn_map->generation; + state->generation = state->ctdb_db->generation; /* use a new reqid, in case the old reply does eventually come in */ - ctdb_reqid_remove(ctdb, state->reqid); - state->reqid = ctdb_reqid_new(ctdb, state); + reqid_remove(ctdb->idr, state->reqid); + state->reqid = reqid_new(ctdb->idr, state); state->c->hdr.reqid = state->reqid; /* update the generation count for this request, so its valid with the new vnn_map */ @@ -1188,26 +1367,38 @@ static void ctdb_call_resend(struct ctdb_call_state *state) state->c->hdr.destnode = ctdb->pnn; ctdb_queue_packet(ctdb, &state->c->hdr); - DEBUG(DEBUG_NOTICE,("resent ctdb_call\n")); + DEBUG(DEBUG_NOTICE,("resent ctdb_call for db %s reqid %u generation %u\n", + state->ctdb_db->db_name, state->reqid, state->generation)); } /* resend all pending calls on recovery */ -void ctdb_call_resend_all(struct ctdb_context *ctdb) +void ctdb_call_resend_db(struct ctdb_db_context *ctdb_db) { struct ctdb_call_state *state, *next; - for (state=ctdb->pending_calls;state;state=next) { + + for (state = ctdb_db->pending_calls; state; state = next) { next = state->next; ctdb_call_resend(state); } } +void ctdb_call_resend_all(struct ctdb_context *ctdb) +{ + struct ctdb_db_context *ctdb_db; + + for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) { + ctdb_call_resend_db(ctdb_db); + } +} + /* this allows the caller to setup a async.fn */ -static void call_local_trigger(struct event_context *ev, struct timed_event *te, - struct timeval t, void *private_data) +static void call_local_trigger(struct tevent_context *ev, + struct tevent_timer *te, + struct timeval t, void *private_data) { struct ctdb_call_state *state = talloc_get_type(private_data, struct ctdb_call_state); if (state->async.fn) { @@ -1242,12 +1433,13 @@ struct ctdb_call_state *ctdb_call_local_send(struct ctdb_db_context *ctdb_db, *(state->call) = *call; state->ctdb_db = ctdb_db; - ret = ctdb_call_local(ctdb_db, state->call, header, state, data, true, ctdb->pnn); + ret = ctdb_call_local(ctdb_db, state->call, header, state, data, true); if (ret != 0) { DEBUG(DEBUG_DEBUG,("ctdb_call_local() failed, ignoring return code %d\n", ret)); } - event_add_timed(ctdb->ev, state, timeval_zero(), call_local_trigger, state); + tevent_add_timer(ctdb->ev, state, timeval_zero(), + call_local_trigger, state); return state; } @@ -1277,18 +1469,19 @@ struct ctdb_call_state *ctdb_daemon_call_send_remote(struct ctdb_db_context *ctd state->call = talloc(state, struct ctdb_call); CTDB_NO_MEMORY_NULL(ctdb, state->call); - state->reqid = ctdb_reqid_new(ctdb, state); + state->reqid = reqid_new(ctdb->idr, state); state->ctdb_db = ctdb_db; talloc_set_destructor(state, ctdb_call_destructor); - len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize; + len = offsetof(struct ctdb_req_call_old, data) + call->key.dsize + call->call_data.dsize; state->c = ctdb_transport_allocate(ctdb, state, CTDB_REQ_CALL, len, - struct ctdb_req_call); + struct ctdb_req_call_old); CTDB_NO_MEMORY_NULL(ctdb, state->c); state->c->hdr.destnode = header->dmaster; /* this limits us to 16k outstanding messages - not unreasonable */ state->c->hdr.reqid = state->reqid; + state->c->hdr.generation = ctdb_db->generation; state->c->flags = call->flags; state->c->db_id = ctdb_db->db_id; state->c->callid = call->call_id; @@ -1303,9 +1496,9 @@ struct ctdb_call_state *ctdb_daemon_call_send_remote(struct ctdb_db_context *ctd state->call->key.dptr = &state->c->data[0]; state->state = CTDB_CALL_WAIT; - state->generation = ctdb->vnn_map->generation; + state->generation = ctdb_db->generation; - DLIST_ADD(ctdb->pending_calls, state); + DLIST_ADD(ctdb_db->pending_calls, state); ctdb_queue_packet(ctdb, &state->c->hdr); @@ -1321,7 +1514,7 @@ struct ctdb_call_state *ctdb_daemon_call_send_remote(struct ctdb_db_context *ctd int ctdb_daemon_call_recv(struct ctdb_call_state *state, struct ctdb_call *call) { while (state->state < CTDB_CALL_DONE) { - event_loop_once(state->ctdb_db->ctdb->ev); + tevent_loop_once(state->ctdb_db->ctdb->ev); } if (state->state != CTDB_CALL_DONE) { ctdb_set_error(state->ctdb_db->ctdb, "%s", state->errmsg); @@ -1349,7 +1542,7 @@ int ctdb_daemon_call_recv(struct ctdb_call_state *state, struct ctdb_call *call) */ void ctdb_send_keepalive(struct ctdb_context *ctdb, uint32_t destnode) { - struct ctdb_req_keepalive *r; + struct ctdb_req_keepalive_old *r; if (ctdb->methods == NULL) { DEBUG(DEBUG_INFO,(__location__ " Failed to send keepalive. Transport is DOWN\n")); @@ -1357,8 +1550,8 @@ void ctdb_send_keepalive(struct ctdb_context *ctdb, uint32_t destnode) } r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_KEEPALIVE, - sizeof(struct ctdb_req_keepalive), - struct ctdb_req_keepalive); + sizeof(struct ctdb_req_keepalive_old), + struct ctdb_req_keepalive_old); CTDB_NO_MEMORY_FATAL(ctdb, r); r->hdr.destnode = destnode; r->hdr.reqid = 0; @@ -1383,7 +1576,7 @@ struct revokechild_handle { struct revokechild_handle *next, *prev; struct ctdb_context *ctdb; struct ctdb_db_context *ctdb_db; - struct fd_event *fde; + struct tevent_fd *fde; int status; int fd[2]; pid_t child; @@ -1397,8 +1590,9 @@ struct revokechild_requeue_handle { void *ctx; }; -static void deferred_call_requeue(struct event_context *ev, struct timed_event *te, - struct timeval t, void *private_data) +static void deferred_call_requeue(struct tevent_context *ev, + struct tevent_timer *te, + struct timeval t, void *private_data) { struct revokechild_requeue_handle *requeue_handle = talloc_get_type(private_data, struct revokechild_requeue_handle); @@ -1410,7 +1604,6 @@ static int deferred_call_destructor(struct revokechild_deferred_call *deferred_c { struct ctdb_context *ctdb = deferred_call->ctdb; struct revokechild_requeue_handle *requeue_handle = talloc(ctdb, struct revokechild_requeue_handle); - struct ctdb_req_call *c = (struct ctdb_req_call *)deferred_call->hdr; requeue_handle->ctdb = ctdb; requeue_handle->hdr = deferred_call->hdr; @@ -1418,8 +1611,13 @@ static int deferred_call_destructor(struct revokechild_deferred_call *deferred_c requeue_handle->ctx = deferred_call->ctx; talloc_steal(requeue_handle, requeue_handle->hdr); - /* when revoking, any READONLY requests have 1 second grace to let read/write finish first */ - event_add_timed(ctdb->ev, requeue_handle, timeval_current_ofs(c->flags & CTDB_WANT_READONLY ? 1 : 0, 0), deferred_call_requeue, requeue_handle); + /* Always delay revoke requests. Either wait for the read/write + * operation to complete, or if revoking failed wait for recovery to + * complete + */ + tevent_add_timer(ctdb->ev, requeue_handle, + timeval_current_ofs(1, 0), + deferred_call_requeue, requeue_handle); return 0; } @@ -1443,15 +1641,16 @@ static int revokechild_destructor(struct revokechild_handle *rc) return 0; } -static void revokechild_handler(struct event_context *ev, struct fd_event *fde, - uint16_t flags, void *private_data) +static void revokechild_handler(struct tevent_context *ev, + struct tevent_fd *fde, + uint16_t flags, void *private_data) { struct revokechild_handle *rc = talloc_get_type(private_data, struct revokechild_handle); int ret; char c; - ret = read(rc->fd[0], &c, 1); + ret = sys_read(rc->fd[0], &c, 1); if (ret != 1) { DEBUG(DEBUG_ERR,("Failed to read status from revokechild. errno:%d\n", errno)); rc->status = -1; @@ -1507,7 +1706,7 @@ static void revoke_send_cb(struct ctdb_context *ctdb, uint32_t pnn, void *privat struct ctdb_revoke_state *revoke_state = private_data; struct ctdb_client_control_state *state; - state = ctdb_ctrl_updaterecord_send(ctdb, revoke_state, timeval_current_ofs(5,0), pnn, revoke_state->ctdb_db, revoke_state->key, revoke_state->header, revoke_state->data); + state = ctdb_ctrl_updaterecord_send(ctdb, revoke_state, timeval_current_ofs(ctdb->tunable.control_timeout,0), pnn, revoke_state->ctdb_db, revoke_state->key, revoke_state->header, revoke_state->data); if (state == NULL) { DEBUG(DEBUG_ERR,("Failure to send update record to revoke readonly delegation\n")); revoke_state->status = -1; @@ -1520,8 +1719,9 @@ static void revoke_send_cb(struct ctdb_context *ctdb, uint32_t pnn, void *privat } -static void ctdb_revoke_timeout_handler(struct event_context *ev, struct timed_event *te, - struct timeval yt, void *private_data) +static void ctdb_revoke_timeout_handler(struct tevent_context *ev, + struct tevent_timer *te, + struct timeval yt, void *private_data) { struct ctdb_revoke_state *state = private_data; @@ -1533,7 +1733,8 @@ static void ctdb_revoke_timeout_handler(struct event_context *ev, struct timed_e static int ctdb_revoke_all_delegations(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA tdata, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data) { struct ctdb_revoke_state *state = talloc_zero(ctdb, struct ctdb_revoke_state); - int status; + struct ctdb_ltdb_header new_header; + TDB_DATA new_data; state->ctdb_db = ctdb_db; state->key = key; @@ -1542,55 +1743,60 @@ static int ctdb_revoke_all_delegations(struct ctdb_context *ctdb, struct ctdb_db ctdb_trackingdb_traverse(ctdb, tdata, revoke_send_cb, state); - event_add_timed(ctdb->ev, state, timeval_current_ofs(5, 0), ctdb_revoke_timeout_handler, state); + tevent_add_timer(ctdb->ev, state, + timeval_current_ofs(ctdb->tunable.control_timeout, 0), + ctdb_revoke_timeout_handler, state); while (state->finished == 0) { - event_loop_once(ctdb->ev); + tevent_loop_once(ctdb->ev); } - status = state->status; - - if (status == 0) { - struct ctdb_ltdb_header new_header; - TDB_DATA new_data; + if (ctdb_ltdb_lock(ctdb_db, key) != 0) { + DEBUG(DEBUG_ERR,("Failed to chainlock the database in revokechild\n")); + talloc_free(state); + return -1; + } + if (ctdb_ltdb_fetch(ctdb_db, key, &new_header, state, &new_data) != 0) { + ctdb_ltdb_unlock(ctdb_db, key); + DEBUG(DEBUG_ERR,("Failed for fetch tdb record in revokechild\n")); + talloc_free(state); + return -1; + } + header->rsn++; + if (new_header.rsn > header->rsn) { + ctdb_ltdb_unlock(ctdb_db, key); + DEBUG(DEBUG_ERR,("RSN too high in tdb record in revokechild\n")); + talloc_free(state); + return -1; + } + if ( (new_header.flags & (CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS)) != (CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS) ) { + ctdb_ltdb_unlock(ctdb_db, key); + DEBUG(DEBUG_ERR,("Flags are wrong in tdb record in revokechild\n")); + talloc_free(state); + return -1; + } - if (ctdb_ltdb_lock(ctdb_db, key) != 0) { - DEBUG(DEBUG_ERR,("Failed to chainlock the database in revokechild\n")); - talloc_free(state); - return -1; - } - if (ctdb_ltdb_fetch(ctdb_db, key, &new_header, state, &new_data) != 0) { - ctdb_ltdb_unlock(ctdb_db, key); - DEBUG(DEBUG_ERR,("Failed for fetch tdb record in revokechild\n")); - talloc_free(state); - return -1; - } - header->rsn++; - if (new_header.rsn > header->rsn) { - ctdb_ltdb_unlock(ctdb_db, key); - DEBUG(DEBUG_ERR,("RSN too high in tdb record in revokechild\n")); - talloc_free(state); - return -1; - } - if ( (new_header.flags & (CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS)) != (CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS) ) { - ctdb_ltdb_unlock(ctdb_db, key); - DEBUG(DEBUG_ERR,("Flags are wrong in tdb record in revokechild\n")); - talloc_free(state); - return -1; - } + /* + * If revoke on all nodes succeed, revoke is complete. Otherwise, + * remove CTDB_REC_RO_REVOKING_READONLY flag and retry. + */ + if (state->status == 0) { new_header.rsn++; new_header.flags |= CTDB_REC_RO_REVOKE_COMPLETE; - if (ctdb_ltdb_store(ctdb_db, key, &new_header, new_data) != 0) { - ctdb_ltdb_unlock(ctdb_db, key); - DEBUG(DEBUG_ERR,("Failed to write new record in revokechild\n")); - talloc_free(state); - return -1; - } + } else { + DEBUG(DEBUG_NOTICE, ("Revoke all delegations failed, retrying.\n")); + new_header.flags &= ~CTDB_REC_RO_REVOKING_READONLY; + } + if (ctdb_ltdb_store(ctdb_db, key, &new_header, new_data) != 0) { ctdb_ltdb_unlock(ctdb_db, key); + DEBUG(DEBUG_ERR,("Failed to write new record in revokechild\n")); + talloc_free(state); + return -1; } + ctdb_ltdb_unlock(ctdb_db, key); talloc_free(state); - return status; + return 0; } @@ -1653,10 +1859,9 @@ int ctdb_start_revoke_ro_record(struct ctdb_context *ctdb, struct ctdb_db_contex if (rc->child == 0) { char c = 0; close(rc->fd[0]); - debug_extra = talloc_asprintf(NULL, "revokechild-%s:", ctdb_db->db_name); - ctdb_set_process_name("ctdb_revokechild"); - if (switch_from_server_to_client(ctdb, "revokechild-%s", ctdb_db->db_name) != 0) { + prctl_set_comment("ctdb_revokechild"); + if (switch_from_server_to_client(ctdb) != 0) { DEBUG(DEBUG_ERR,("Failed to switch from server to client for revokechild process\n")); c = 1; goto child_finished; @@ -1665,11 +1870,8 @@ int ctdb_start_revoke_ro_record(struct ctdb_context *ctdb, struct ctdb_db_contex c = ctdb_revoke_all_delegations(ctdb, ctdb_db, tdata, key, header, data); child_finished: - write(rc->fd[1], &c, 1); - /* make sure we die when our parent dies */ - while (ctdb_kill(ctdb, parent, 0) == 0 || errno != ESRCH) { - sleep(5); - } + sys_write(rc->fd[1], &c, 1); + ctdb_wait_for_process_to_exit(parent); _exit(0); } @@ -1678,11 +1880,10 @@ child_finished: set_close_on_exec(rc->fd[0]); /* This is an active revokechild child process */ - DLIST_ADD_END(ctdb_db->revokechild_active, rc, NULL); + DLIST_ADD_END(ctdb_db->revokechild_active, rc); - rc->fde = event_add_fd(ctdb->ev, rc, rc->fd[0], - EVENT_FD_READ, revokechild_handler, - (void *)rc); + rc->fde = tevent_add_fd(ctdb->ev, rc, rc->fd[0], TEVENT_FD_READ, + revokechild_handler, (void *)rc); if (rc->fde == NULL) { DEBUG(DEBUG_ERR,("Failed to set up fd event for revokechild process\n")); talloc_free(rc); @@ -1730,3 +1931,73 @@ int ctdb_add_revoke_deferred_call(struct ctdb_context *ctdb, struct ctdb_db_cont return 0; } + +static void ctdb_migration_count_handler(TDB_DATA key, uint64_t counter, + void *private_data) +{ + struct ctdb_db_context *ctdb_db = talloc_get_type_abort( + private_data, struct ctdb_db_context); + int value; + + value = (counter < INT_MAX ? counter : INT_MAX); + ctdb_update_db_stat_hot_keys(ctdb_db, key, value); +} + +static void ctdb_migration_cleandb_event(struct tevent_context *ev, + struct tevent_timer *te, + struct timeval current_time, + void *private_data) +{ + struct ctdb_db_context *ctdb_db = talloc_get_type_abort( + private_data, struct ctdb_db_context); + + if (ctdb_db->migratedb == NULL) { + return; + } + + hash_count_expire(ctdb_db->migratedb, NULL); + + te = tevent_add_timer(ctdb_db->ctdb->ev, ctdb_db->migratedb, + tevent_timeval_current_ofs(10, 0), + ctdb_migration_cleandb_event, ctdb_db); + if (te == NULL) { + DEBUG(DEBUG_ERR, + ("Memory error in migration cleandb event for %s\n", + ctdb_db->db_name)); + TALLOC_FREE(ctdb_db->migratedb); + } +} + +int ctdb_migration_init(struct ctdb_db_context *ctdb_db) +{ + struct timeval one_second = { 1, 0 }; + struct tevent_timer *te; + int ret; + + if (ctdb_db->persistent) { + return 0; + } + + ret = hash_count_init(ctdb_db, one_second, + ctdb_migration_count_handler, ctdb_db, + &ctdb_db->migratedb); + if (ret != 0) { + DEBUG(DEBUG_ERR, + ("Memory error in migration init for %s\n", + ctdb_db->db_name)); + return -1; + } + + te = tevent_add_timer(ctdb_db->ctdb->ev, ctdb_db->migratedb, + tevent_timeval_current_ofs(10, 0), + ctdb_migration_cleandb_event, ctdb_db); + if (te == NULL) { + DEBUG(DEBUG_ERR, + ("Memory error in migration init for %s\n", + ctdb_db->db_name)); + TALLOC_FREE(ctdb_db->migratedb); + return -1; + } + + return 0; +}