persistent: add a client context to the persistent_state and track the db_id
[sahlberg/ctdb.git] / server / ctdb_persistent.c
index b686cbdee86ed2ba3af60d710849c0ec293154a4..0627037ea9246ef2dbe70df3de243aa0355a9451 100644 (file)
@@ -19,7 +19,7 @@
 */
 
 #include "includes.h"
-#include "lib/events/events.h"
+#include "lib/tevent/tevent.h"
 #include "system/filesys.h"
 #include "system/wait.h"
 #include "db_wrap.h"
@@ -28,6 +28,8 @@
 
 struct ctdb_persistent_state {
        struct ctdb_context *ctdb;
+       struct ctdb_db_context *ctdb_db; /* used by trans3_commit */
+       struct ctdb_client *client; /* used by trans3_commit */
        struct ctdb_req_control *c;
        const char *errormsg;
        uint32_t num_pending;
@@ -52,27 +54,48 @@ static void ctdb_persistent_callback(struct ctdb_context *ctdb,
 {
        struct ctdb_persistent_state *state = talloc_get_type(private_data, 
                                                              struct ctdb_persistent_state);
+       enum ctdb_trans2_commit_error etype;
+
+       if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
+               DEBUG(DEBUG_INFO, ("ctdb_persistent_callback: ignoring reply "
+                                  "during recovery\n"));
+               return;
+       }
 
        if (status != 0) {
                DEBUG(DEBUG_ERR,("ctdb_persistent_callback failed with status %d (%s)\n",
-                        status, errormsg));
+                        status, errormsg?errormsg:"no error message given"));
                state->status = status;
                state->errormsg = errormsg;
                state->num_failed++;
+
+               /*
+                * If a node failed to complete the update_record control,
+                * then either a recovery is already running or something
+                * bad is going on. So trigger a recovery and let the
+                * recovery finish the transaction, sending back the reply
+                * for the trans3_commit control to the client.
+                */
+               ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
+               return;
        }
+
        state->num_pending--;
-       if (state->num_pending == 0) {
-               enum ctdb_trans2_commit_error etype;
-               if (state->num_failed == state->num_sent) {
-                       etype = CTDB_TRANS2_COMMIT_ALLFAIL;
-               } else if (state->num_failed != 0) {
-                       etype = CTDB_TRANS2_COMMIT_SOMEFAIL;
-               } else {
-                       etype = CTDB_TRANS2_COMMIT_SUCCESS;
-               }
-               ctdb_request_control_reply(state->ctdb, state->c, NULL, etype, state->errormsg);
-               talloc_free(state);
+
+       if (state->num_pending != 0) {
+               return;
+       }
+
+       if (state->num_failed == state->num_sent) {
+               etype = CTDB_TRANS2_COMMIT_ALLFAIL;
+       } else if (state->num_failed != 0) {
+               etype = CTDB_TRANS2_COMMIT_SOMEFAIL;
+       } else {
+               etype = CTDB_TRANS2_COMMIT_SUCCESS;
        }
+
+       ctdb_request_control_reply(state->ctdb, state->c, NULL, etype, state->errormsg);
+       talloc_free(state);
 }
 
 /*
@@ -82,7 +105,13 @@ static void ctdb_persistent_store_timeout(struct event_context *ev, struct timed
                                         struct timeval t, void *private_data)
 {
        struct ctdb_persistent_state *state = talloc_get_type(private_data, struct ctdb_persistent_state);
-       
+
+       if (state->ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
+               DEBUG(DEBUG_INFO, ("ctdb_persistent_store_timeout: ignoring "
+                                  "timeout during recovery\n"));
+               return;
+       }
+
        ctdb_request_control_reply(state->ctdb, state->c, NULL, CTDB_TRANS2_COMMIT_TIMEOUT, 
                                   "timeout in ctdb_persistent_state");
 
@@ -247,6 +276,18 @@ int32_t ctdb_control_trans2_commit(struct ctdb_context *ctdb,
        return 0;
 }
 
+static int ctdb_persistent_state_destructor(struct ctdb_persistent_state *state)
+{
+       if (state->client != NULL) {
+               state->client->db_id = 0;
+       }
+
+       if (state->ctdb_db != NULL) {
+               state->ctdb_db->persistent_state = NULL;
+       }
+
+       return 0;
+}
 
 /*
  * Store a set of persistent records.
@@ -267,6 +308,21 @@ int32_t ctdb_control_trans3_commit(struct ctdb_context *ctdb,
                return -1;
        }
 
+       client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
+       if (client == NULL) {
+               DEBUG(DEBUG_ERR,(__location__ " can not match persistent_store "
+                                "to a client. Returning error\n"));
+               return -1;
+       }
+
+       if (client->db_id != 0) {
+               DEBUG(DEBUG_ERR,(__location__ " ERROR: trans3_commit: "
+                                "client-db_id[0x%08x] != 0 "
+                                "(client_id[0x%08x]): trans3_commit active?\n",
+                                client->db_id, client->client_id));
+               return -1;
+       }
+
        ctdb_db = find_ctdb_db(ctdb, m->db_id);
        if (ctdb_db == NULL) {
                DEBUG(DEBUG_ERR,(__location__ " ctdb_control_trans3_commit: "
@@ -274,18 +330,27 @@ int32_t ctdb_control_trans3_commit(struct ctdb_context *ctdb,
                return -1;
        }
 
-       client = ctdb_reqid_find(ctdb, c->client_id, struct ctdb_client);
-       if (client == NULL) {
-               DEBUG(DEBUG_ERR,(__location__ " can not match persistent_store "
-                                "to a client. Returning error\n"));
+       if (ctdb_db->persistent_state != NULL) {
+               DEBUG(DEBUG_ERR, (__location__ " Error: "
+                                 "ctdb_control_trans3_commit "
+                                 "called while a transaction commit is "
+                                 "active. db_id[0x%08x]\n", m->db_id));
                return -1;
        }
 
-       state = talloc_zero(ctdb, struct ctdb_persistent_state);
-       CTDB_NO_MEMORY(ctdb, state);
+       ctdb_db->persistent_state = talloc_zero(ctdb_db,
+                                               struct ctdb_persistent_state);
+       CTDB_NO_MEMORY(ctdb, ctdb_db->persistent_state);
+
+       client->db_id = m->db_id;
 
+       state = ctdb_db->persistent_state;
        state->ctdb = ctdb;
+       state->ctdb_db = ctdb_db;
        state->c    = c;
+       state->client = client;
+
+       talloc_set_destructor(state, ctdb_persistent_state_destructor);
 
        for (i = 0; i < ctdb->vnn_map->size; i++) {
                struct ctdb_node *node = ctdb->nodes[ctdb->vnn_map->map[i]];
@@ -455,7 +520,7 @@ struct childwrite_handle {
 
 static int childwrite_destructor(struct childwrite_handle *h)
 {
-       h->ctdb->statistics.pending_childwrite_calls--;
+       CTDB_DECREMENT_STAT(h->ctdb, pending_childwrite_calls);
        kill(h->child, SIGKILL);
        return 0;
 }
@@ -475,8 +540,8 @@ static void childwrite_handler(struct event_context *ev, struct fd_event *fde,
        int ret;
        char c;
 
-       ctdb_latency(h->ctdb_db, "persistent", &h->ctdb->statistics.max_childwrite_latency, h->start_time);
-       h->ctdb->statistics.pending_childwrite_calls--;
+       CTDB_UPDATE_LATENCY(h->ctdb, h->ctdb_db, "persistent", childwrite_latency, h->start_time);
+       CTDB_DECREMENT_STAT(h->ctdb, pending_childwrite_calls);
 
        /* the handle needs to go away when the context is gone - when
           the handle goes away this implicitly closes the pipe, which
@@ -508,11 +573,11 @@ struct childwrite_handle *ctdb_childwrite(struct ctdb_db_context *ctdb_db,
        int ret;
        pid_t parent = getpid();
 
-       ctdb_db->ctdb->statistics.childwrite_calls++;
-       ctdb_db->ctdb->statistics.pending_childwrite_calls++;
+       CTDB_INCREMENT_STAT(ctdb_db->ctdb, childwrite_calls);
+       CTDB_INCREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
 
        if (!(result = talloc_zero(state, struct childwrite_handle))) {
-               ctdb_db->ctdb->statistics.pending_childwrite_calls--;
+               CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
                return NULL;
        }
 
@@ -520,17 +585,17 @@ struct childwrite_handle *ctdb_childwrite(struct ctdb_db_context *ctdb_db,
 
        if (ret != 0) {
                talloc_free(result);
-               ctdb_db->ctdb->statistics.pending_childwrite_calls--;
+               CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
                return NULL;
        }
 
-       result->child = fork();
+       result->child = ctdb_fork(ctdb_db->ctdb);
 
        if (result->child == (pid_t)-1) {
                close(result->fd[0]);
                close(result->fd[1]);
                talloc_free(result);
-               ctdb_db->ctdb->statistics.pending_childwrite_calls--;
+               CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
                return NULL;
        }
 
@@ -543,6 +608,7 @@ struct childwrite_handle *ctdb_childwrite(struct ctdb_db_context *ctdb_db,
                char c = 0;
 
                close(result->fd[0]);
+               debug_extra = talloc_asprintf(NULL, "childwrite-%s:", ctdb_db->db_name);
                ret = ctdb_persistent_store(state);
                if (ret != 0) {
                        DEBUG(DEBUG_ERR, (__location__ " Failed to write persistent data\n"));
@@ -563,16 +629,17 @@ struct childwrite_handle *ctdb_childwrite(struct ctdb_db_context *ctdb_db,
 
        talloc_set_destructor(result, childwrite_destructor);
 
-       DEBUG(DEBUG_NOTICE, (__location__ " Created PIPE FD:%d for ctdb_childwrite\n", result->fd[0]));
+       DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for ctdb_childwrite\n", result->fd[0]));
 
        result->fde = event_add_fd(ctdb_db->ctdb->ev, result, result->fd[0],
-                                  EVENT_FD_READ|EVENT_FD_AUTOCLOSE, childwrite_handler,
+                                  EVENT_FD_READ, childwrite_handler,
                                   (void *)result);
        if (result->fde == NULL) {
                talloc_free(result);
-               ctdb_db->ctdb->statistics.pending_childwrite_calls--;
+               CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
                return NULL;
        }
+       tevent_fd_set_auto_close(result->fde);
 
        result->start_time = timeval_current();