ctdb-client: Fix implementation of transaction commit
authorAmitay Isaacs <amitay@gmail.com>
Fri, 15 Apr 2016 07:44:14 +0000 (17:44 +1000)
committerMartin Schwenke <martins@samba.org>
Tue, 5 Jul 2016 08:53:14 +0000 (10:53 +0200)
There is no need to explicitly check that recovery is not active before
sending TRANS33_COMMIT control.  Just try TRANS3_COMMIT control and if
recovery occurs before the control is completed, the control will fail
and it can be retried.

Make sure g_lock lock is released after the transaction is complete.
Also, add timeout to the client api.

Signed-off-by: Amitay Isaacs <amitay@gmail.com>
Reviewed-by: Martin Schwenke <martin@meltin.net>
ctdb/client/client.h
ctdb/client/client_db.c

index 3ad098fd699297cf144d2abfac1b3a2eb377933c..f5b8601aa2caadfc35143658ba01b5182710a92b 100644 (file)
@@ -828,6 +828,7 @@ int ctdb_transaction_delete_record(struct ctdb_transaction_handle *h,
 struct tevent_req *ctdb_transaction_commit_send(
                                        TALLOC_CTX *mem_ctx,
                                        struct tevent_context *ev,
+                                       struct timeval timeout,
                                        struct ctdb_transaction_handle *h);
 
 bool ctdb_transaction_commit_recv(struct tevent_req *req, int *perr);
index e3b0ea1c187a5b7a9d5a1b1c55a8b62a84f910b7..c353ece1ec76864d44933d97ac2661205cb01159 100644 (file)
@@ -1970,20 +1970,23 @@ static int ctdb_transaction_store_db_seqnum(struct ctdb_transaction_handle *h,
 
 struct ctdb_transaction_commit_state {
        struct tevent_context *ev;
+       struct timeval timeout;
        struct ctdb_transaction_handle *h;
        uint64_t seqnum;
 };
 
-static void ctdb_transaction_commit_try(struct tevent_req *subreq);
 static void ctdb_transaction_commit_done(struct tevent_req *subreq);
+static void ctdb_transaction_commit_g_lock_done(struct tevent_req *subreq);
 
 struct tevent_req *ctdb_transaction_commit_send(
                                        TALLOC_CTX *mem_ctx,
                                        struct tevent_context *ev,
+                                       struct timeval timeout,
                                        struct ctdb_transaction_handle *h)
 {
        struct tevent_req *req, *subreq;
        struct ctdb_transaction_commit_state *state;
+       struct ctdb_req_control request;
        int ret;
 
        req = tevent_req_create(mem_ctx, &state,
@@ -1993,6 +1996,7 @@ struct tevent_req *ctdb_transaction_commit_send(
        }
 
        state->ev = ev;
+       state->timeout = timeout;
        state->h = h;
 
        ret = ctdb_transaction_fetch_db_seqnum(h, &state->seqnum);
@@ -2007,42 +2011,18 @@ struct tevent_req *ctdb_transaction_commit_send(
                return tevent_req_post(req, ev);
        }
 
-       subreq = ctdb_recovery_wait_send(state, ev, h->client);
+       ctdb_req_control_trans3_commit(&request, h->recbuf);
+       subreq = ctdb_client_control_send(state, ev, h->client,
+                                         ctdb_client_pnn(h->client),
+                                         timeout, &request);
        if (tevent_req_nomem(subreq, req)) {
                return tevent_req_post(req, ev);
        }
-       tevent_req_set_callback(subreq, ctdb_transaction_commit_try, req);
+       tevent_req_set_callback(subreq, ctdb_transaction_commit_done, req);
 
        return req;
 }
 
-static void ctdb_transaction_commit_try(struct tevent_req *subreq)
-{
-       struct tevent_req *req = tevent_req_callback_data(
-               subreq, struct tevent_req);
-       struct ctdb_transaction_commit_state *state = tevent_req_data(
-               req, struct ctdb_transaction_commit_state);
-       struct ctdb_req_control request;
-       int ret;
-       bool status;
-
-       status = ctdb_recovery_wait_recv(subreq, &ret);
-       TALLOC_FREE(subreq);
-       if (! status) {
-               tevent_req_error(req, ret);
-               return;
-       }
-
-       ctdb_req_control_trans3_commit(&request, state->h->recbuf);
-       subreq = ctdb_client_control_send(state, state->ev, state->h->client,
-                                         state->h->client->pnn,
-                                         tevent_timeval_zero(), &request);
-       if (tevent_req_nomem(subreq, req)) {
-               return;
-       }
-       tevent_req_set_callback(subreq, ctdb_transaction_commit_done, req);
-}
-
 static void ctdb_transaction_commit_done(struct tevent_req *subreq)
 {
        struct tevent_req *req = tevent_req_callback_data(
@@ -2063,39 +2043,69 @@ static void ctdb_transaction_commit_done(struct tevent_req *subreq)
        }
 
        ret = ctdb_reply_control_trans3_commit(reply);
-       if (ret < 0) {
+       talloc_free(reply);
+
+       if (ret != 0) {
                /* Control failed due to recovery */
-               subreq = ctdb_recovery_wait_send(state, state->ev, h->client);
-               if (tevent_req_nomem(subreq, req)) {
+
+               ret = ctdb_transaction_fetch_db_seqnum(h, &seqnum);
+               if (ret != 0) {
+                       tevent_req_error(req, ret);
                        return;
                }
-               tevent_req_set_callback(subreq, ctdb_transaction_commit_try,
-                                       req);
-               return;
-       }
 
-       ret = ctdb_transaction_fetch_db_seqnum(h, &seqnum);
-       if (ret != 0) {
-               tevent_req_error(req, ret);
-               return;
-       }
+               if (seqnum == state->seqnum) {
+                       struct ctdb_req_control request;
 
-       if (seqnum == state->seqnum) {
-               /* try again */
-               subreq = ctdb_recovery_wait_send(state, state->ev, h->client);
-               if (tevent_req_nomem(subreq, req)) {
+                       /* try again */
+                       ctdb_req_control_trans3_commit(&request,
+                                                      state->h->recbuf);
+                       subreq = ctdb_client_control_send(
+                                       state, state->ev, state->h->client,
+                                       ctdb_client_pnn(state->h->client),
+                                       state->timeout, &request);
+                       if (tevent_req_nomem(subreq, req)) {
+                               return;
+                       }
+                       tevent_req_set_callback(subreq,
+                                               ctdb_transaction_commit_done,
+                                               req);
                        return;
                }
-               tevent_req_set_callback(subreq, ctdb_transaction_commit_try,
-                                       req);
+
+               if (seqnum != state->seqnum + 1) {
+                       tevent_req_error(req, EIO);
+                       return;
+               }
+       }
+
+       /* trans3_commit successful */
+       subreq = ctdb_g_lock_unlock_send(state, state->ev, h->client,
+                                        h->db_g_lock, h->lock_name, h->sid);
+       if (tevent_req_nomem(subreq, req)) {
                return;
        }
+       tevent_req_set_callback(subreq, ctdb_transaction_commit_g_lock_done,
+                               req);
+}
 
-       if (seqnum != state->seqnum + 1) {
-               tevent_req_error(req, EIO);
+static void ctdb_transaction_commit_g_lock_done(struct tevent_req *subreq)
+{
+       struct tevent_req *req = tevent_req_callback_data(
+               subreq, struct tevent_req);
+       struct ctdb_transaction_commit_state *state = tevent_req_data(
+               req, struct ctdb_transaction_commit_state);
+       int ret;
+       bool status;
+
+       status = ctdb_g_lock_unlock_recv(subreq, &ret);
+       TALLOC_FREE(subreq);
+       if (! status) {
+               tevent_req_error(req, ret);
                return;
        }
 
+       talloc_free(state->h);
        tevent_req_done(req);
 }
 
@@ -2115,30 +2125,37 @@ bool ctdb_transaction_commit_recv(struct tevent_req *req, int *perr)
 
 int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
 {
+       struct tevent_context *ev = h->ev;
+       TALLOC_CTX *mem_ctx;
        struct tevent_req *req;
        int ret;
        bool status;
 
        if (h->readonly || ! h->updated) {
-               talloc_free(h);
-               return 0;
+               return ctdb_transaction_cancel(h);
        }
 
-       req = ctdb_transaction_commit_send(h, h->ev, h);
+       mem_ctx = talloc_new(NULL);
+       if (mem_ctx == NULL) {
+               return ENOMEM;
+       }
+
+       req = ctdb_transaction_commit_send(mem_ctx, ev,
+                                          tevent_timeval_zero(), h);
        if (req == NULL) {
-               talloc_free(h);
+               talloc_free(mem_ctx);
                return ENOMEM;
        }
 
-       tevent_req_poll(req, h->ev);
+       tevent_req_poll(req, ev);
 
        status = ctdb_transaction_commit_recv(req, &ret);
        if (! status) {
-               talloc_free(h);
+               talloc_free(mem_ctx);
                return ret;
        }
 
-       talloc_free(h);
+       talloc_free(mem_ctx);
        return 0;
 }