persistent_callback: print "no error message given" instead of "(null)"
[sahlberg/ctdb.git] / server / ctdb_persistent.c
index dc05cbfa90b0c91e005d4cfe0a6f6528e40daf80..a6fcf48e3a643099ff25ab91aa805990bf69e795 100644 (file)
@@ -52,27 +52,48 @@ static void ctdb_persistent_callback(struct ctdb_context *ctdb,
 {
        struct ctdb_persistent_state *state = talloc_get_type(private_data, 
                                                              struct ctdb_persistent_state);
+       enum ctdb_trans2_commit_error etype;
+
+       if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
+               DEBUG(DEBUG_INFO, ("ctdb_persistent_callback: ignoring reply "
+                                  "during recovery\n"));
+               return;
+       }
 
        if (status != 0) {
                DEBUG(DEBUG_ERR,("ctdb_persistent_callback failed with status %d (%s)\n",
-                        status, errormsg));
+                        status, errormsg?errormsg:"no error message given"));
                state->status = status;
                state->errormsg = errormsg;
                state->num_failed++;
+
+               /*
+                * If a node failed to complete the update_record control,
+                * then either a recovery is already running or something
+                * bad is going on. So trigger a recovery and let the
+                * recovery finish the transaction, sending back the reply
+                * for the trans3_commit control to the client.
+                */
+               ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
+               return;
        }
+
        state->num_pending--;
-       if (state->num_pending == 0) {
-               enum ctdb_trans2_commit_error etype;
-               if (state->num_failed == state->num_sent) {
-                       etype = CTDB_TRANS2_COMMIT_ALLFAIL;
-               } else if (state->num_failed != 0) {
-                       etype = CTDB_TRANS2_COMMIT_SOMEFAIL;
-               } else {
-                       etype = CTDB_TRANS2_COMMIT_SUCCESS;
-               }
-               ctdb_request_control_reply(state->ctdb, state->c, NULL, etype, state->errormsg);
-               talloc_free(state);
+
+       if (state->num_pending != 0) {
+               return;
        }
+
+       if (state->num_failed == state->num_sent) {
+               etype = CTDB_TRANS2_COMMIT_ALLFAIL;
+       } else if (state->num_failed != 0) {
+               etype = CTDB_TRANS2_COMMIT_SOMEFAIL;
+       } else {
+               etype = CTDB_TRANS2_COMMIT_SUCCESS;
+       }
+
+       ctdb_request_control_reply(state->ctdb, state->c, NULL, etype, state->errormsg);
+       talloc_free(state);
 }
 
 /*
@@ -82,7 +103,13 @@ static void ctdb_persistent_store_timeout(struct event_context *ev, struct timed
                                         struct timeval t, void *private_data)
 {
        struct ctdb_persistent_state *state = talloc_get_type(private_data, struct ctdb_persistent_state);
-       
+
+       if (state->ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
+               DEBUG(DEBUG_INFO, ("ctdb_persistent_store_timeout: ignoring "
+                                  "timeout during recovery\n"));
+               return;
+       }
+
        ctdb_request_control_reply(state->ctdb, state->c, NULL, CTDB_TRANS2_COMMIT_TIMEOUT, 
                                   "timeout in ctdb_persistent_state");
 
@@ -455,7 +482,7 @@ struct childwrite_handle {
 
 static int childwrite_destructor(struct childwrite_handle *h)
 {
-       h->ctdb->statistics.pending_childwrite_calls--;
+       CTDB_DECREMENT_STAT(h->ctdb, pending_childwrite_calls);
        kill(h->child, SIGKILL);
        return 0;
 }
@@ -475,8 +502,8 @@ static void childwrite_handler(struct event_context *ev, struct fd_event *fde,
        int ret;
        char c;
 
-       ctdb_latency(h->ctdb_db, "persistent", &h->ctdb->statistics.max_childwrite_latency, h->start_time);
-       h->ctdb->statistics.pending_childwrite_calls--;
+       CTDB_UPDATE_LATENCY(h->ctdb, h->ctdb_db, "persistent", childwrite_latency, h->start_time);
+       CTDB_DECREMENT_STAT(h->ctdb, pending_childwrite_calls);
 
        /* the handle needs to go away when the context is gone - when
           the handle goes away this implicitly closes the pipe, which
@@ -508,11 +535,11 @@ struct childwrite_handle *ctdb_childwrite(struct ctdb_db_context *ctdb_db,
        int ret;
        pid_t parent = getpid();
 
-       ctdb_db->ctdb->statistics.childwrite_calls++;
-       ctdb_db->ctdb->statistics.pending_childwrite_calls++;
+       CTDB_INCREMENT_STAT(ctdb_db->ctdb, childwrite_calls);
+       CTDB_INCREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
 
        if (!(result = talloc_zero(state, struct childwrite_handle))) {
-               ctdb_db->ctdb->statistics.pending_childwrite_calls--;
+               CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
                return NULL;
        }
 
@@ -520,17 +547,17 @@ struct childwrite_handle *ctdb_childwrite(struct ctdb_db_context *ctdb_db,
 
        if (ret != 0) {
                talloc_free(result);
-               ctdb_db->ctdb->statistics.pending_childwrite_calls--;
+               CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
                return NULL;
        }
 
-       result->child = fork();
+       result->child = ctdb_fork(ctdb_db->ctdb);
 
        if (result->child == (pid_t)-1) {
                close(result->fd[0]);
                close(result->fd[1]);
                talloc_free(result);
-               ctdb_db->ctdb->statistics.pending_childwrite_calls--;
+               CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
                return NULL;
        }
 
@@ -571,7 +598,7 @@ struct childwrite_handle *ctdb_childwrite(struct ctdb_db_context *ctdb_db,
                                   (void *)result);
        if (result->fde == NULL) {
                talloc_free(result);
-               ctdb_db->ctdb->statistics.pending_childwrite_calls--;
+               CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_childwrite_calls);
                return NULL;
        }
        tevent_fd_set_auto_close(result->fde);