freeze: abort vacuuming when we're going to freeze.
authorRusty Russell <rusty@rustcorp.com.au>
Wed, 21 Jul 2010 02:59:55 +0000 (12:29 +0930)
committerRusty Russell <rusty@rustcorp.com.au>
Wed, 21 Jul 2010 02:59:55 +0000 (12:29 +0930)
There are some reports of freeze timeouts, and it looks like vacuuming might
be the culprit.  So we add code to tell them to abort when a freeze is
going on.

CQ:S1018154 & S1018349
Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
include/ctdb_private.h
server/ctdb_freeze.c
server/ctdb_vacuum.c

index b3e87e429296af651c955f8eaeee6965020d9f0b..41a761a5355fbea779675c63fc58d6ca8fbe7216 100644 (file)
@@ -467,6 +467,8 @@ struct ctdb_context {
 
        TALLOC_CTX *banning_ctx;
 
+       struct ctdb_vacuum_child_context *vacuumers;
+
        /* mapping from pid to ctdb_client * */
        struct ctdb_client_pid_list *client_pids;
 
@@ -1523,6 +1525,7 @@ int ctdb_ctrl_report_recd_lock_latency(struct ctdb_context *ctdb, struct timeval
 int32_t ctdb_control_stop_node(struct ctdb_context *ctdb, struct ctdb_req_control *c, bool *async_reply);
 int32_t ctdb_control_continue_node(struct ctdb_context *ctdb);
 
+void ctdb_stop_vacuuming(struct ctdb_context *ctdb);
 int ctdb_vacuum_init(struct ctdb_db_context *ctdb_db);
 
 int32_t ctdb_control_enable_script(struct ctdb_context *ctdb, TDB_DATA indata);
index bf4c83e88b191f7d65ba388f2c2ff4a997768fd4..0dc86a7acba561b6894a0339a115c91c5b45e896 100644 (file)
@@ -273,6 +273,9 @@ int ctdb_start_freeze(struct ctdb_context *ctdb, uint32_t priority)
                return 0;
        }
 
+       /* Stop any vacuuming going on: we don't want to wait. */
+       ctdb_stop_vacuuming(ctdb);
+
        /* if there isn't a freeze lock child then create one */
        if (ctdb->freeze_handles[priority] == NULL) {
                ctdb->freeze_handles[priority] = ctdb_freeze_lock(ctdb, priority);
index 6ff8eb1ae1d58678d40525d94b2d66427c756201..72d86555e401a1ec6d188d1a19d99caa61a9ffe9 100644 (file)
 enum vacuum_child_status { VACUUM_RUNNING, VACUUM_OK, VACUUM_ERROR, VACUUM_TIMEOUT};
 
 struct ctdb_vacuum_child_context {
+       struct ctdb_vacuum_child_context *next, *prev;
        struct ctdb_vacuum_handle *vacuum_handle;
+       /* fd child writes status to */
        int fd[2];
+       /* fd to abort vacuuming. */
+       int abortfd[2];
        pid_t child_pid;
        enum vacuum_child_status status;
        struct timeval start_time;
@@ -65,6 +69,8 @@ struct vacuum_data {
        uint32_t total;
        uint32_t vacuumed;
        uint32_t copied;
+       int abortfd;
+       bool abort;
 };
 
 /* tuning information stored for every db */
@@ -105,6 +111,15 @@ static int vacuum_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data,
        struct ctdb_ltdb_header *hdr;
        struct ctdb_rec_data *rec;
        size_t old_size;
+       char c;
+
+       /* Should we abort? */
+       if (read(vdata->abortfd, &c, 1) == 1) {
+               DEBUG(DEBUG_INFO, ("Abort during vacuum_traverse for %s\n",
+                                  ctdb_db->db_name));
+               vdata->abort = true;
+               return -1;
+       }
               
        lmaster = ctdb_lmaster(ctdb, &key);
        if (lmaster >= ctdb->vnn_map->size) {
@@ -258,7 +273,10 @@ static int ctdb_vacuum_db(struct ctdb_db_context *ctdb_db, struct vacuum_data *v
                DEBUG(DEBUG_ERR,(__location__ " Traverse error in vacuuming '%s'\n", name));
                return -1;              
        }
-
+       if (vdata->abort) {
+               DEBUG(DEBUG_INFO,("Traverse aborted vacuuming '%s'\n", name));
+               return -1;
+       }
        for ( i = 0; i < ctdb->vnn_map->size; i++) {
                if (vdata->list[i]->count == 0) {
                        continue;
@@ -317,12 +335,19 @@ static int ctdb_vacuum_db(struct ctdb_db_context *ctdb_db, struct vacuum_data *v
                for (i = 0; i < ctdb->vnn_map->size; i++) {
                        struct ctdb_marshall_buffer *records;
                        struct ctdb_rec_data *rec;
+                       char c;
 
                        if (ctdb->vnn_map->map[i] == ctdb->pnn) {
                                /* we dont delete the records on the local node just yet */
                                continue;
                        }
 
+                       /* Should we abort? */
+                       if (read(vdata->abortfd, &c, 1) == 1) {
+                               DEBUG(DEBUG_INFO,("Aborted vacuuming '%s'\n", name));
+                               return -1;
+                       }
+
                        ret = ctdb_control(ctdb, ctdb->vnn_map->map[i], 0,
                                        CTDB_CONTROL_TRY_DELETE_RECORDS, 0,
                                        indata, recs, &outdata, &res,
@@ -610,7 +635,7 @@ static int update_tuning_db(struct ctdb_db_context *ctdb_db, struct vacuum_data
  * repack and vaccum a db
  * called from the child context
  */
-static int ctdb_repack_db(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx)
+static int ctdb_repack_db(struct ctdb_db_context *ctdb_db, int abortfd, TALLOC_CTX *mem_ctx)
 {
        uint32_t repack_limit = ctdb_db->ctdb->tunable.repack_limit;
        uint32_t vacuum_limit = ctdb_db->ctdb->tunable.vacuum_limit;
@@ -634,6 +659,7 @@ static int ctdb_repack_db(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx)
        vdata->vacuum_limit = vacuum_limit;
        vdata->repack_limit = repack_limit;
        vdata->delete_tree = trbt_create(vdata, 0);
+       vdata->abortfd = abortfd;
        if (vdata->delete_tree == NULL) {
                DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
                talloc_free(vdata);
@@ -652,7 +678,7 @@ static int ctdb_repack_db(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx)
        /*
         * decide if a repack is necessary
         */
-       if (size < repack_limit && vdata->delete_count < vacuum_limit) {
+       if (vdata->abort || (size < repack_limit && vdata->delete_count < vacuum_limit)) {
                update_tuning_db(ctdb_db, vdata, size);
                talloc_free(vdata);
                return 0;
@@ -743,6 +769,8 @@ static int vacuum_child_destructor(struct ctdb_vacuum_child_context *child_ctx)
                kill(child_ctx->child_pid, SIGKILL);
        }
 
+       DLIST_REMOVE(ctdb->vacuumers, child_ctx);
+
        event_add_timed(ctdb->ev, child_ctx->vacuum_handle,
                        timeval_current_ofs(get_vacuum_interval(ctdb_db), 0), 
                        ctdb_vacuum_event, child_ctx->vacuum_handle);
@@ -830,10 +858,22 @@ ctdb_vacuum_event(struct event_context *ev, struct timed_event *te,
                return;
        }
 
+       ret = pipe(child_ctx->abortfd);
+       if (ret != 0) {
+               close(child_ctx->fd[0]);
+               close(child_ctx->fd[1]);
+               talloc_free(child_ctx);
+               DEBUG(DEBUG_ERR, ("Failed to create abort pipe for vacuum child process.\n"));
+               event_add_timed(ctdb->ev, vacuum_handle, timeval_current_ofs(ctdb->tunable.vacuum_default_interval, 0), ctdb_vacuum_event, vacuum_handle);
+               return;
+       }
+
        child_ctx->child_pid = fork();
        if (child_ctx->child_pid == (pid_t)-1) {
                close(child_ctx->fd[0]);
                close(child_ctx->fd[1]);
+               close(child_ctx->abortfd[0]);
+               close(child_ctx->abortfd[1]);
                talloc_free(child_ctx);
                DEBUG(DEBUG_ERR, ("Failed to fork vacuum child process.\n"));
                event_add_timed(ctdb->ev, vacuum_handle, timeval_current_ofs(ctdb->tunable.vacuum_default_interval, 0), ctdb_vacuum_event, vacuum_handle);
@@ -844,6 +884,8 @@ ctdb_vacuum_event(struct event_context *ev, struct timed_event *te,
        if (child_ctx->child_pid == 0) {
                char cc = 0;
                close(child_ctx->fd[0]);
+               close(child_ctx->abortfd[1]);
+               set_nonblocking(child_ctx->abortfd[0]);
 
                DEBUG(DEBUG_INFO,("Vacuuming child process %d for db %s started\n", getpid(), ctdb_db->db_name));
        
@@ -855,7 +897,7 @@ ctdb_vacuum_event(struct event_context *ev, struct timed_event *te,
                /* 
                 * repack the db
                 */
-               cc = ctdb_repack_db(ctdb_db, child_ctx);
+               cc = ctdb_repack_db(ctdb_db, child_ctx->abortfd[0], child_ctx);
 
                write(child_ctx->fd[1], &cc, 1);
                _exit(0);
@@ -863,10 +905,14 @@ ctdb_vacuum_event(struct event_context *ev, struct timed_event *te,
 
        set_close_on_exec(child_ctx->fd[0]);
        close(child_ctx->fd[1]);
+       close(child_ctx->abortfd[0]);
+       set_close_on_exec(child_ctx->abortfd[1]);
+       set_nonblocking(child_ctx->abortfd[1]);
 
        child_ctx->status = VACUUM_RUNNING;
        child_ctx->start_time = timeval_current();
 
+       DLIST_ADD(ctdb->vacuumers, child_ctx);
        talloc_set_destructor(child_ctx, vacuum_child_destructor);
 
        event_add_timed(ctdb->ev, child_ctx,
@@ -884,6 +930,20 @@ ctdb_vacuum_event(struct event_context *ev, struct timed_event *te,
        child_ctx->vacuum_handle = vacuum_handle;
 }
 
+void ctdb_stop_vacuuming(struct ctdb_context *ctdb)
+{
+       struct ctdb_vacuum_child_context *i;
+       char c = 1;
+
+       /* FIXME: We don't just free them, since current TDB is not robust
+        * against death during transaction commit. */
+       for (i = ctdb->vacuumers; i; i = i->next) {
+               DEBUG(DEBUG_INFO, ("Aborting vacuuming for %s (%p)\n",
+                                  i->vacuum_handle->ctdb_db->db_name,
+                                  i->child_pid));
+               write(i->abortfd[1], &c, 1);
+       }
+}
 
 /* this function initializes the vacuuming context for a database
  * starts the vacuuming events