implemented replayable transactions in ctdb to prevent deadlock tridge obnox/tridge origin/tridge
authorAndrew Tridgell <tridge@samba.org>
Mon, 4 Aug 2008 04:51:51 +0000 (14:51 +1000)
committerAndrew Tridgell <tridge@samba.org>
Mon, 4 Aug 2008 04:51:51 +0000 (14:51 +1000)
client/ctdb_client.c
server/ctdb_persistent.c

index 5004a69b0e9fb280a62b8f0f916862a1dc1b47e8..48eb19d969e6956521d4ed01ecb9631bf68444f5 100644 (file)
@@ -2956,7 +2956,12 @@ int ctdb_ctrl_getcapabilities(struct ctdb_context *ctdb, struct timeval timeout,
 
 struct ctdb_transaction_handle {
        struct ctdb_db_context *ctdb_db;
-       struct ctdb_marshall_buffer *m;
+       bool in_replay;
+       /* we store the reads and writes done under a transaction one
+          list stores both reads and writes, the other just writes
+       */
+       struct ctdb_marshall_buffer *m_all;
+       struct ctdb_marshall_buffer *m_write;
 };
 
 /* start a transaction on a database */
@@ -2967,33 +2972,32 @@ static int ctdb_transaction_destructor(struct ctdb_transaction_handle *h)
 }
 
 /* start a transaction on a database */
-struct ctdb_transaction_handle *ctdb_transaction_start(struct ctdb_db_context *ctdb_db,
-                                                      TALLOC_CTX *mem_ctx)
+static int ctdb_transaction_fetch_start(struct ctdb_transaction_handle *h)
 {
        struct ctdb_record_handle *rh;
-       struct ctdb_transaction_handle *h;
        TDB_DATA key;
        struct ctdb_ltdb_header header;
        TALLOC_CTX *tmp_ctx;
        const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
        int ret;
+       struct ctdb_db_context *ctdb_db = h->ctdb_db;
 
        key.dptr = discard_const(keyname);
        key.dsize = strlen(keyname);
 
        if (!ctdb_db->persistent) {
                DEBUG(DEBUG_ERR,(__location__ " Attempted transaction on non-persistent database\n"));
-               return NULL;
+               return -1;
        }
 
 again:
-       tmp_ctx = talloc_new(mem_ctx);
+       tmp_ctx = talloc_new(h);
 
        rh = ctdb_fetch_lock(ctdb_db, tmp_ctx, key, NULL);
        if (rh == NULL) {
                DEBUG(DEBUG_ERR,(__location__ " Failed to fetch_lock database\n"));             
                talloc_free(tmp_ctx);
-               return NULL;
+               return -1;
        }
        talloc_free(rh);
 
@@ -3001,7 +3005,7 @@ again:
        if (ret != 0) {
                DEBUG(DEBUG_ERR,(__location__ " Failed to start tdb transaction\n"));
                talloc_free(tmp_ctx);
-               return NULL;
+               return -1;
        }
 
        ret = ctdb_ltdb_fetch(ctdb_db, key, &header, tmp_ctx, NULL);
@@ -3013,16 +3017,32 @@ again:
 
        talloc_free(tmp_ctx);
 
+       return 0;
+}
+
+
+/* start a transaction on a database */
+struct ctdb_transaction_handle *ctdb_transaction_start(struct ctdb_db_context *ctdb_db,
+                                                      TALLOC_CTX *mem_ctx)
+{
+       struct ctdb_transaction_handle *h;
+       int ret;
+
        /* we have a good transaction */
        h = talloc_zero(mem_ctx, struct ctdb_transaction_handle);
        if (h == NULL) {
-               tdb_transaction_cancel(ctdb_db->ltdb->tdb);
                DEBUG(DEBUG_ERR,(__location__ " oom for transaction handle\n"));                
                return NULL;
        }
 
        h->ctdb_db = ctdb_db;
 
+       ret = ctdb_transaction_fetch_start(h);
+       if (ret != 0) {
+               talloc_free(h);
+               return NULL;
+       }
+
        talloc_set_destructor(h, ctdb_transaction_destructor);
 
        return h;
@@ -3046,9 +3066,22 @@ int ctdb_transaction_fetch(struct ctdb_transaction_handle *h,
        if (ret == -1 && header.dmaster == (uint32_t)-1) {
                /* record doesn't exist yet */
                *data = tdb_null;
-               return 0;
+               ret = 0;
        }
-       return ret;
+       
+       if (ret != 0) {
+               return ret;
+       }
+
+       if (!h->in_replay) {
+               h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 1, key, NULL, *data);
+               if (h->m_all == NULL) {
+                       DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
+                       return -1;
+               }
+       }
+
+       return 0;
 }
 
 /*
@@ -3079,11 +3112,20 @@ int ctdb_transaction_store(struct ctdb_transaction_handle *h,
 
        header.rsn++;
 
-       h->m = ctdb_marshall_add(h, h->m, h->ctdb_db->db_id, 0, key, &header, data);
-       if (h->m == NULL) {
-               DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
-               talloc_free(tmp_ctx);
-               return -1;
+       if (!h->in_replay) {
+               h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 0, key, NULL, data);
+               if (h->m_all == NULL) {
+                       DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
+                       talloc_free(tmp_ctx);
+                       return -1;
+               }
+               
+               h->m_write = ctdb_marshall_add(h, h->m_write, h->ctdb_db->db_id, 0, key, &header, data);
+               if (h->m_write == NULL) {
+                       DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
+                       talloc_free(tmp_ctx);
+                       return -1;
+               }
        }
        
        ret = ctdb_ltdb_store(h->ctdb_db, key, &header, data);
@@ -3093,6 +3135,61 @@ int ctdb_transaction_store(struct ctdb_transaction_handle *h,
        return ret;
 }
 
+/*
+  replay a transaction
+ */
+static int ctdb_replay_transaction(struct ctdb_transaction_handle *h)
+{
+       int ret, i;
+       struct ctdb_rec_data *rec = NULL;
+
+       h->in_replay = true;
+
+       ret = ctdb_transaction_fetch_start(h);
+       if (ret != 0) {
+               return ret;
+       }
+
+       for (i=0;i<h->m_all->count;i++) {
+               TDB_DATA key, data;
+
+               rec = ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
+               if (rec == NULL) {
+                       DEBUG(DEBUG_ERR, (__location__ " Out of records in ctdb_replay_transaction?\n"));
+                       goto failed;
+               }
+
+               if (rec->reqid == 0) {
+                       /* its a store */
+                       if (ctdb_transaction_store(h, key, data) != 0) {
+                               goto failed;
+                       }
+               } else {
+                       TDB_DATA data2;
+                       TALLOC_CTX *tmp_ctx = talloc_new(h);
+
+                       if (ctdb_transaction_fetch(h, tmp_ctx, key, &data2) != 0) {
+                               talloc_free(tmp_ctx);
+                               goto failed;
+                       }
+                       if (data2.dsize != data.dsize ||
+                           memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
+                               /* the record has changed on us - we have to give up */
+                               talloc_free(tmp_ctx);
+                               goto failed;
+                       }
+                       talloc_free(tmp_ctx);
+               }
+       }
+       
+       return 0;
+
+failed:
+       tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
+       return -1;
+}
+
+
 /*
   commit a transaction
  */
@@ -3101,23 +3198,47 @@ int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
        int ret;
        int32_t status;
        struct ctdb_context *ctdb = h->ctdb_db->ctdb;
+       struct timeval timeout;
 
        talloc_set_destructor(h, NULL);
 
-       if (h->m == NULL) {
+       if (h->m_write == NULL) {
                /* no changes were made */
                talloc_free(h);
                return 0;
        }
 
+       /* our commit strategy is quite complex.
+
+          - we first try to commit the changes to all other nodes
+
+          - if that works, then we commit locally and we are done
+
+          - if a commit on another node fails, then we need to cancel
+            the transaction, then restart the transaction (thus
+            opening a window of time for a pending recovery to
+            complete), then replay the transaction, checking all the
+            reads and writes (checking that reads give the same data,
+            and writes succeed). Then we retry the transaction to the
+            other nodes
+       */
+
+again:
        /* tell ctdbd to commit to the other nodes */
+       timeout = timeval_current_ofs(1, 0);
        ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
                           CTDB_CONTROL_TRANS2_COMMIT, 0, 
-                          ctdb_marshall_finish(h->m), NULL, NULL, &status, NULL, NULL);
+                          ctdb_marshall_finish(h->m_write), NULL, NULL, &status, 
+                          &timeout, NULL);
        if (ret != 0 || status != 0) {
-               DEBUG(DEBUG_ERR,(__location__ " Control failed for remote transaction commit\n"));
-               talloc_free(h);
-               return -1;
+               tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
+               sleep(1);
+               if (ctdb_replay_transaction(h) != 0) {
+                       DEBUG(DEBUG_ERR,(__location__ " Failed to replay transaction\n"));
+                       talloc_free(h);
+                       return -1;
+               }
+               goto again;
        }
 
        /* do the real commit locally */
@@ -3132,16 +3253,9 @@ int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
        }
 
        /* tell ctdbd that we are finished with our local commit */
-       ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
-                          CTDB_CONTROL_TRANS2_FINISHED, 0, 
-                          tdb_null, NULL, NULL, &status, NULL, NULL);
-       if (ret != 0 || status != 0) {
-               DEBUG(DEBUG_ERR,(__location__ " Control failed to finish transaction commit\n"));
-               talloc_free(h);
-               return -1;
-       }
-
-
+       ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
+                    CTDB_CONTROL_TRANS2_FINISHED, CTDB_CTRL_FLAG_NOREPLY, 
+                    tdb_null, NULL, NULL, &status, NULL, NULL);
        talloc_free(h);
        return 0;
 }
index 81c623999a8a70e6a902ae9320cf27e73b3a5eef..77cff9c6e3211cb4fb15d3d02c53b00008cef79f 100644 (file)
@@ -50,8 +50,6 @@ static void ctdb_persistent_callback(struct ctdb_context *ctdb,
                         status, errormsg));
                state->status = status;
                state->errormsg = errormsg;
-               DEBUG(DEBUG_ERR,(__location__ " Forcing recovery\n"));
-               ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
        }
        state->num_pending--;
        if (state->num_pending == 0) {
@@ -69,8 +67,6 @@ static void ctdb_persistent_store_timeout(struct event_context *ev, struct timed
        struct ctdb_persistent_state *state = talloc_get_type(private_data, struct ctdb_persistent_state);
        
        ctdb_request_control_reply(state->ctdb, state->c, NULL, -1, "timeout in ctdb_persistent_state");
-       DEBUG(DEBUG_ERR,(__location__ " Forcing recovery\n"));
-       state->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
 
        talloc_free(state);
 }
@@ -415,7 +411,7 @@ int32_t ctdb_control_update_record(struct ctdb_context *ctdb,
        struct ctdb_marshall_buffer *m = (struct ctdb_marshall_buffer *)recdata.dptr;
 
        if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
-               DEBUG(DEBUG_DEBUG,("rejecting ctdb_control_update_record when recovery active\n"));
+               DEBUG(DEBUG_INFO,("rejecting ctdb_control_update_record when recovery active\n"));
                return -1;
        }