From b6d9a0396fb4b325778d3810dc656f719f31b9f1 Mon Sep 17 00:00:00 2001 From: Andrew Tridgell Date: Mon, 4 Aug 2008 14:51:51 +1000 Subject: [PATCH] implemented replayable transactions in ctdb to prevent deadlock --- client/ctdb_client.c | 176 ++++++++++++++++++++++++++++++++------- server/ctdb_persistent.c | 6 +- 2 files changed, 146 insertions(+), 36 deletions(-) diff --git a/client/ctdb_client.c b/client/ctdb_client.c index 5004a69b..48eb19d9 100644 --- a/client/ctdb_client.c +++ b/client/ctdb_client.c @@ -2956,7 +2956,12 @@ int ctdb_ctrl_getcapabilities(struct ctdb_context *ctdb, struct timeval timeout, struct ctdb_transaction_handle { struct ctdb_db_context *ctdb_db; - struct ctdb_marshall_buffer *m; + bool in_replay; + /* we store the reads and writes done under a transaction one + list stores both reads and writes, the other just writes + */ + struct ctdb_marshall_buffer *m_all; + struct ctdb_marshall_buffer *m_write; }; /* start a transaction on a database */ @@ -2967,33 +2972,32 @@ static int ctdb_transaction_destructor(struct ctdb_transaction_handle *h) } /* start a transaction on a database */ -struct ctdb_transaction_handle *ctdb_transaction_start(struct ctdb_db_context *ctdb_db, - TALLOC_CTX *mem_ctx) +static int ctdb_transaction_fetch_start(struct ctdb_transaction_handle *h) { struct ctdb_record_handle *rh; - struct ctdb_transaction_handle *h; TDB_DATA key; struct ctdb_ltdb_header header; TALLOC_CTX *tmp_ctx; const char *keyname = CTDB_TRANSACTION_LOCK_KEY; int ret; + struct ctdb_db_context *ctdb_db = h->ctdb_db; key.dptr = discard_const(keyname); key.dsize = strlen(keyname); if (!ctdb_db->persistent) { DEBUG(DEBUG_ERR,(__location__ " Attempted transaction on non-persistent database\n")); - return NULL; + return -1; } again: - tmp_ctx = talloc_new(mem_ctx); + tmp_ctx = talloc_new(h); rh = ctdb_fetch_lock(ctdb_db, tmp_ctx, key, NULL); if (rh == NULL) { DEBUG(DEBUG_ERR,(__location__ " Failed to fetch_lock database\n")); talloc_free(tmp_ctx); - return NULL; + return -1; } talloc_free(rh); @@ -3001,7 +3005,7 @@ again: if (ret != 0) { DEBUG(DEBUG_ERR,(__location__ " Failed to start tdb transaction\n")); talloc_free(tmp_ctx); - return NULL; + return -1; } ret = ctdb_ltdb_fetch(ctdb_db, key, &header, tmp_ctx, NULL); @@ -3013,16 +3017,32 @@ again: talloc_free(tmp_ctx); + return 0; +} + + +/* start a transaction on a database */ +struct ctdb_transaction_handle *ctdb_transaction_start(struct ctdb_db_context *ctdb_db, + TALLOC_CTX *mem_ctx) +{ + struct ctdb_transaction_handle *h; + int ret; + /* we have a good transaction */ h = talloc_zero(mem_ctx, struct ctdb_transaction_handle); if (h == NULL) { - tdb_transaction_cancel(ctdb_db->ltdb->tdb); DEBUG(DEBUG_ERR,(__location__ " oom for transaction handle\n")); return NULL; } h->ctdb_db = ctdb_db; + ret = ctdb_transaction_fetch_start(h); + if (ret != 0) { + talloc_free(h); + return NULL; + } + talloc_set_destructor(h, ctdb_transaction_destructor); return h; @@ -3046,9 +3066,22 @@ int ctdb_transaction_fetch(struct ctdb_transaction_handle *h, if (ret == -1 && header.dmaster == (uint32_t)-1) { /* record doesn't exist yet */ *data = tdb_null; - return 0; + ret = 0; } - return ret; + + if (ret != 0) { + return ret; + } + + if (!h->in_replay) { + h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 1, key, NULL, *data); + if (h->m_all == NULL) { + DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n")); + return -1; + } + } + + return 0; } /* @@ -3079,11 +3112,20 @@ int ctdb_transaction_store(struct ctdb_transaction_handle *h, header.rsn++; - h->m = ctdb_marshall_add(h, h->m, h->ctdb_db->db_id, 0, key, &header, data); - if (h->m == NULL) { - DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n")); - talloc_free(tmp_ctx); - return -1; + if (!h->in_replay) { + h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 0, key, NULL, data); + if (h->m_all == NULL) { + DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n")); + talloc_free(tmp_ctx); + return -1; + } + + h->m_write = ctdb_marshall_add(h, h->m_write, h->ctdb_db->db_id, 0, key, &header, data); + if (h->m_write == NULL) { + DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n")); + talloc_free(tmp_ctx); + return -1; + } } ret = ctdb_ltdb_store(h->ctdb_db, key, &header, data); @@ -3093,6 +3135,61 @@ int ctdb_transaction_store(struct ctdb_transaction_handle *h, return ret; } +/* + replay a transaction + */ +static int ctdb_replay_transaction(struct ctdb_transaction_handle *h) +{ + int ret, i; + struct ctdb_rec_data *rec = NULL; + + h->in_replay = true; + + ret = ctdb_transaction_fetch_start(h); + if (ret != 0) { + return ret; + } + + for (i=0;im_all->count;i++) { + TDB_DATA key, data; + + rec = ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data); + if (rec == NULL) { + DEBUG(DEBUG_ERR, (__location__ " Out of records in ctdb_replay_transaction?\n")); + goto failed; + } + + if (rec->reqid == 0) { + /* its a store */ + if (ctdb_transaction_store(h, key, data) != 0) { + goto failed; + } + } else { + TDB_DATA data2; + TALLOC_CTX *tmp_ctx = talloc_new(h); + + if (ctdb_transaction_fetch(h, tmp_ctx, key, &data2) != 0) { + talloc_free(tmp_ctx); + goto failed; + } + if (data2.dsize != data.dsize || + memcmp(data2.dptr, data.dptr, data.dsize) != 0) { + /* the record has changed on us - we have to give up */ + talloc_free(tmp_ctx); + goto failed; + } + talloc_free(tmp_ctx); + } + } + + return 0; + +failed: + tdb_transaction_cancel(h->ctdb_db->ltdb->tdb); + return -1; +} + + /* commit a transaction */ @@ -3101,23 +3198,47 @@ int ctdb_transaction_commit(struct ctdb_transaction_handle *h) int ret; int32_t status; struct ctdb_context *ctdb = h->ctdb_db->ctdb; + struct timeval timeout; talloc_set_destructor(h, NULL); - if (h->m == NULL) { + if (h->m_write == NULL) { /* no changes were made */ talloc_free(h); return 0; } + /* our commit strategy is quite complex. + + - we first try to commit the changes to all other nodes + + - if that works, then we commit locally and we are done + + - if a commit on another node fails, then we need to cancel + the transaction, then restart the transaction (thus + opening a window of time for a pending recovery to + complete), then replay the transaction, checking all the + reads and writes (checking that reads give the same data, + and writes succeed). Then we retry the transaction to the + other nodes + */ + +again: /* tell ctdbd to commit to the other nodes */ + timeout = timeval_current_ofs(1, 0); ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, CTDB_CONTROL_TRANS2_COMMIT, 0, - ctdb_marshall_finish(h->m), NULL, NULL, &status, NULL, NULL); + ctdb_marshall_finish(h->m_write), NULL, NULL, &status, + &timeout, NULL); if (ret != 0 || status != 0) { - DEBUG(DEBUG_ERR,(__location__ " Control failed for remote transaction commit\n")); - talloc_free(h); - return -1; + tdb_transaction_cancel(h->ctdb_db->ltdb->tdb); + sleep(1); + if (ctdb_replay_transaction(h) != 0) { + DEBUG(DEBUG_ERR,(__location__ " Failed to replay transaction\n")); + talloc_free(h); + return -1; + } + goto again; } /* do the real commit locally */ @@ -3132,16 +3253,9 @@ int ctdb_transaction_commit(struct ctdb_transaction_handle *h) } /* tell ctdbd that we are finished with our local commit */ - ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, - CTDB_CONTROL_TRANS2_FINISHED, 0, - tdb_null, NULL, NULL, &status, NULL, NULL); - if (ret != 0 || status != 0) { - DEBUG(DEBUG_ERR,(__location__ " Control failed to finish transaction commit\n")); - talloc_free(h); - return -1; - } - - + ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, + CTDB_CONTROL_TRANS2_FINISHED, CTDB_CTRL_FLAG_NOREPLY, + tdb_null, NULL, NULL, &status, NULL, NULL); talloc_free(h); return 0; } diff --git a/server/ctdb_persistent.c b/server/ctdb_persistent.c index 81c62399..77cff9c6 100644 --- a/server/ctdb_persistent.c +++ b/server/ctdb_persistent.c @@ -50,8 +50,6 @@ static void ctdb_persistent_callback(struct ctdb_context *ctdb, status, errormsg)); state->status = status; state->errormsg = errormsg; - DEBUG(DEBUG_ERR,(__location__ " Forcing recovery\n")); - ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE; } state->num_pending--; if (state->num_pending == 0) { @@ -69,8 +67,6 @@ static void ctdb_persistent_store_timeout(struct event_context *ev, struct timed struct ctdb_persistent_state *state = talloc_get_type(private_data, struct ctdb_persistent_state); ctdb_request_control_reply(state->ctdb, state->c, NULL, -1, "timeout in ctdb_persistent_state"); - DEBUG(DEBUG_ERR,(__location__ " Forcing recovery\n")); - state->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE; talloc_free(state); } @@ -415,7 +411,7 @@ int32_t ctdb_control_update_record(struct ctdb_context *ctdb, struct ctdb_marshall_buffer *m = (struct ctdb_marshall_buffer *)recdata.dptr; if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) { - DEBUG(DEBUG_DEBUG,("rejecting ctdb_control_update_record when recovery active\n")); + DEBUG(DEBUG_INFO,("rejecting ctdb_control_update_record when recovery active\n")); return -1; } -- 2.34.1