persistent_callback: print "no error message given" instead of "(null)"
[sahlberg/ctdb.git] / server / ctdb_persistent.c
index 8672c5b3094b27d533166a4339b11875a92b3355..a6fcf48e3a643099ff25ab91aa805990bf69e795 100644 (file)
@@ -52,27 +52,48 @@ static void ctdb_persistent_callback(struct ctdb_context *ctdb,
 {
        struct ctdb_persistent_state *state = talloc_get_type(private_data, 
                                                              struct ctdb_persistent_state);
+       enum ctdb_trans2_commit_error etype;
+
+       if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
+               DEBUG(DEBUG_INFO, ("ctdb_persistent_callback: ignoring reply "
+                                  "during recovery\n"));
+               return;
+       }
 
        if (status != 0) {
                DEBUG(DEBUG_ERR,("ctdb_persistent_callback failed with status %d (%s)\n",
-                        status, errormsg));
+                        status, errormsg?errormsg:"no error message given"));
                state->status = status;
                state->errormsg = errormsg;
                state->num_failed++;
+
+               /*
+                * If a node failed to complete the update_record control,
+                * then either a recovery is already running or something
+                * bad is going on. So trigger a recovery and let the
+                * recovery finish the transaction, sending back the reply
+                * for the trans3_commit control to the client.
+                */
+               ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
+               return;
        }
+
        state->num_pending--;
-       if (state->num_pending == 0) {
-               enum ctdb_trans2_commit_error etype;
-               if (state->num_failed == state->num_sent) {
-                       etype = CTDB_TRANS2_COMMIT_ALLFAIL;
-               } else if (state->num_failed != 0) {
-                       etype = CTDB_TRANS2_COMMIT_SOMEFAIL;
-               } else {
-                       etype = CTDB_TRANS2_COMMIT_SUCCESS;
-               }
-               ctdb_request_control_reply(state->ctdb, state->c, NULL, etype, state->errormsg);
-               talloc_free(state);
+
+       if (state->num_pending != 0) {
+               return;
        }
+
+       if (state->num_failed == state->num_sent) {
+               etype = CTDB_TRANS2_COMMIT_ALLFAIL;
+       } else if (state->num_failed != 0) {
+               etype = CTDB_TRANS2_COMMIT_SOMEFAIL;
+       } else {
+               etype = CTDB_TRANS2_COMMIT_SUCCESS;
+       }
+
+       ctdb_request_control_reply(state->ctdb, state->c, NULL, etype, state->errormsg);
+       talloc_free(state);
 }
 
 /*
@@ -82,7 +103,13 @@ static void ctdb_persistent_store_timeout(struct event_context *ev, struct timed
                                         struct timeval t, void *private_data)
 {
        struct ctdb_persistent_state *state = talloc_get_type(private_data, struct ctdb_persistent_state);
-       
+
+       if (state->ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
+               DEBUG(DEBUG_INFO, ("ctdb_persistent_store_timeout: ignoring "
+                                  "timeout during recovery\n"));
+               return;
+       }
+
        ctdb_request_control_reply(state->ctdb, state->c, NULL, CTDB_TRANS2_COMMIT_TIMEOUT, 
                                   "timeout in ctdb_persistent_state");
 
@@ -475,7 +502,7 @@ static void childwrite_handler(struct event_context *ev, struct fd_event *fde,
        int ret;
        char c;
 
-       CTDB_UPDATE_LATENCY(h->ctdb, h->ctdb_db, "persistent", max_childwrite_latency, h->start_time);
+       CTDB_UPDATE_LATENCY(h->ctdb, h->ctdb_db, "persistent", childwrite_latency, h->start_time);
        CTDB_DECREMENT_STAT(h->ctdb, pending_childwrite_calls);
 
        /* the handle needs to go away when the context is gone - when
@@ -524,7 +551,7 @@ struct childwrite_handle *ctdb_childwrite(struct ctdb_db_context *ctdb_db,
                return NULL;
        }
 
-       result->child = fork();
+       result->child = ctdb_fork(ctdb_db->ctdb);
 
        if (result->child == (pid_t)-1) {
                close(result->fd[0]);