vacuum: fix crash on vacuum abort
[metze/ctdb/wip.git] / server / ctdb_vacuum.c
index b1927bab67f6b10b4fa79b10b71a95f43985953c..59ce0ec284350e36ae362d9047a019350b5dba15 100644 (file)
@@ -36,7 +36,9 @@
 enum vacuum_child_status { VACUUM_RUNNING, VACUUM_OK, VACUUM_ERROR, VACUUM_TIMEOUT};
 
 struct ctdb_vacuum_child_context {
+       struct ctdb_vacuum_child_context *next, *prev;
        struct ctdb_vacuum_handle *vacuum_handle;
+       /* fd child writes status to */
        int fd[2];
        pid_t child_pid;
        enum vacuum_child_status status;
@@ -131,7 +133,7 @@ static int vacuum_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data,
 
                hash = ctdb_hash(&key);
                if (trbt_lookup32(vdata->delete_tree, hash)) {
-                       DEBUG(DEBUG_INFO, (__location__ " Hash collission when vacuuming, skipping this record.\n"));
+                       DEBUG(DEBUG_DEBUG, (__location__ " Hash collission when vacuuming, skipping this record.\n"));
                } 
                else {
                        struct delete_record_data *dd;
@@ -267,12 +269,12 @@ static int ctdb_vacuum_db(struct ctdb_db_context *ctdb_db, struct vacuum_data *v
                /* for records where we are not the lmaster, tell the lmaster to fetch the record */
                if (ctdb->vnn_map->map[i] != ctdb->pnn) {
                        TDB_DATA data;
-                       DEBUG(DEBUG_NOTICE,("Found %u records for lmaster %u in '%s'\n", 
+                       DEBUG(DEBUG_INFO,("Found %u records for lmaster %u in '%s'\n", 
                                                                vdata->list[i]->count, i, name));
 
                        data.dsize = talloc_get_size(vdata->list[i]);
                        data.dptr  = (void *)vdata->list[i];
-                       if (ctdb_send_message(ctdb, ctdb->vnn_map->map[i], CTDB_SRVID_VACUUM_FETCH, data) != 0) {
+                       if (ctdb_client_send_message(ctdb, ctdb->vnn_map->map[i], CTDB_SRVID_VACUUM_FETCH, data) != 0) {
                                DEBUG(DEBUG_ERR,(__location__ " Failed to send vacuum fetch message to %u\n",
                                         ctdb->vnn_map->map[i]));
                                return -1;              
@@ -401,8 +403,18 @@ static int repack_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data,
                 * there might be hash collisions so we have to compare the keys here to be sure
                 */
                if (kd && kd->key.dsize == key.dsize && memcmp(kd->key.dptr, key.dptr, key.dsize) == 0) {
-                       vdata->vacuumed++;
-                       return 0;
+                       struct ctdb_ltdb_header *hdr = (struct ctdb_ltdb_header *)data.dptr;
+                       /*
+                        * we have to check if the record hasn't changed in the meantime in order to
+                        * savely remove it from the database
+                        */
+                       if (data.dsize == sizeof(struct ctdb_ltdb_header) &&
+                               hdr->dmaster == kd->ctdb->pnn &&
+                               ctdb_lmaster(kd->ctdb, &(kd->key)) == kd->ctdb->pnn &&
+                               kd->hdr.rsn == hdr->rsn) {
+                               vdata->vacuumed++;
+                               return 0;
+                       }
                }
        }
        if (tdb_store(vdata->dest_db, key, data, TDB_INSERT) != 0) {
@@ -425,7 +437,9 @@ static int ctdb_repack_tdb(struct tdb_context *tdb, TALLOC_CTX *mem_ctx, struct
                return -1;
        }
 
-       tmp_db = tdb_open("tmpdb", tdb_hash_size(tdb), TDB_INTERNAL, O_RDWR|O_CREAT, 0);
+       tmp_db = tdb_open("tmpdb", tdb_hash_size(tdb),
+                         TDB_INTERNAL|TDB_DISALLOW_NESTING,
+                         O_RDWR|O_CREAT, 0);
        if (tmp_db == NULL) {
                DEBUG(DEBUG_ERR,(__location__ " Failed to create tmp_db\n"));
                tdb_transaction_cancel(tdb);
@@ -449,7 +463,7 @@ static int ctdb_repack_tdb(struct tdb_context *tdb, TALLOC_CTX *mem_ctx, struct
                return -1;              
        }
 
-       DEBUG(DEBUG_NOTICE,(__location__ " %u records vacuumed\n", vdata->vacuumed));
+       DEBUG(DEBUG_INFO,(__location__ " %u records vacuumed\n", vdata->vacuumed));
        
        if (vdata->traverse_error) {
                DEBUG(DEBUG_ERR,(__location__ " Error during traversal\n"));
@@ -491,7 +505,7 @@ static int ctdb_repack_tdb(struct tdb_context *tdb, TALLOC_CTX *mem_ctx, struct
                DEBUG(DEBUG_ERR,(__location__ " Failed to commit\n"));
                return -1;
        }
-       DEBUG(DEBUG_NOTICE,(__location__ " %u records copied\n", vdata->copied));
+       DEBUG(DEBUG_INFO,(__location__ " %u records copied\n", vdata->copied));
 
        return 0;
 }
@@ -504,17 +518,22 @@ static int update_tuning_db(struct ctdb_db_context *ctdb_db, struct vacuum_data
        struct vacuum_tuning_data tdata;
        struct vacuum_tuning_data *tptr;
        char *vac_dbname;
+       int flags;
 
        vac_dbname = talloc_asprintf(tmp_ctx, "%s/%s.%u",
-                                       ctdb_db->ctdb->db_directory, 
-                                       TUNINGDBNAME, ctdb_db->ctdb->pnn);
+                                    ctdb_db->ctdb->db_directory_state,
+                                    TUNINGDBNAME, ctdb_db->ctdb->pnn);
        if (vac_dbname == NULL) {
                DEBUG(DEBUG_CRIT,(__location__ " Out of memory error while allocating '%s'\n", vac_dbname));
                talloc_free(tmp_ctx);
                return -1;
        }
 
-       tune_tdb = tdb_open(vac_dbname, 0, 0, O_RDWR|O_CREAT, 0600);
+       flags  = ctdb_db->ctdb->valgrinding ? TDB_NOMMAP : 0;
+       flags |= TDB_DISALLOW_NESTING;
+       tune_tdb = tdb_open(vac_dbname, 0,
+                           flags,
+                           O_RDWR|O_CREAT, 0600);
        if (tune_tdb == NULL) {
                DEBUG(DEBUG_ERR,(__location__ " Failed to create/open %s\n", TUNINGDBNAME));
                talloc_free(tmp_ctx);
@@ -545,7 +564,7 @@ static int update_tuning_db(struct ctdb_db_context *ctdb_db, struct vacuum_data
                    vdata->delete_count < vdata->vacuum_limit) {
                        if (tdata.last_interval < ctdb_db->ctdb->tunable.vacuum_max_interval) {
                                tdata.new_interval = tdata.last_interval * 110 / 100;
-                               DEBUG(DEBUG_NOTICE,("Increasing vacuum interval %u -> %u for %s\n", 
+                               DEBUG(DEBUG_INFO,("Increasing vacuum interval %u -> %u for %s\n", 
                                        tdata.last_interval, tdata.new_interval, ctdb_db->db_name));
                        }
                } else {
@@ -554,7 +573,7 @@ static int update_tuning_db(struct ctdb_db_context *ctdb_db, struct vacuum_data
                                tdata.new_interval > ctdb_db->ctdb->tunable.vacuum_max_interval) {
                                tdata.new_interval = ctdb_db->ctdb->tunable.vacuum_min_interval;
                        }               
-                       DEBUG(DEBUG_ERR,("Decreasing vacuum interval %u -> %u for %s\n", 
+                       DEBUG(DEBUG_INFO,("Decreasing vacuum interval %u -> %u for %s\n", 
                                         tdata.last_interval, tdata.new_interval, ctdb_db->db_name));
                }
                tdata.last_interval = tdata.new_interval;
@@ -617,6 +636,7 @@ static int ctdb_repack_db(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx)
        vdata->vacuum_limit = vacuum_limit;
        vdata->repack_limit = repack_limit;
        vdata->delete_tree = trbt_create(vdata, 0);
+       vdata->ctdb_db = ctdb_db;
        if (vdata->delete_tree == NULL) {
                DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
                talloc_free(vdata);
@@ -636,12 +656,12 @@ static int ctdb_repack_db(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx)
         * decide if a repack is necessary
         */
        if (size < repack_limit && vdata->delete_count < vacuum_limit) {
-               talloc_free(vdata);
                update_tuning_db(ctdb_db, vdata, size);
+               talloc_free(vdata);
                return 0;
        }
 
-       DEBUG(DEBUG_NOTICE,("Repacking %s with %u freelist entries and %u records to delete\n", 
+       DEBUG(DEBUG_INFO,("Repacking %s with %u freelist entries and %u records to delete\n", 
                        name, size, vdata->delete_count));
 
        /*
@@ -667,6 +687,7 @@ static int get_vacuum_interval(struct ctdb_db_context *ctdb_db)
        char *vac_dbname;
        uint interval = ctdb_db->ctdb->tunable.vacuum_default_interval;
        struct ctdb_context *ctdb = ctdb_db->ctdb;
+       int flags;
 
        vac_dbname = talloc_asprintf(tmp_ctx, "%s/%s.%u", ctdb->db_directory, TUNINGDBNAME, ctdb->pnn);
        if (vac_dbname == NULL) {
@@ -675,9 +696,13 @@ static int get_vacuum_interval(struct ctdb_db_context *ctdb_db)
                return interval;
        }
 
-       tdb = tdb_open(vac_dbname, 0, 0, O_RDONLY, 0600);
+       flags  = ctdb_db->ctdb->valgrinding ? TDB_NOMMAP : 0;
+       flags |= TDB_DISALLOW_NESTING;
+       tdb = tdb_open(vac_dbname, 0,
+                      flags,
+                      O_RDWR|O_CREAT, 0600);
        if (!tdb) {
-               DEBUG(DEBUG_ERR,("Unable to open database %s using default interval\n", vac_dbname));
+               DEBUG(DEBUG_ERR,("Unable to open/create database %s using default interval\n", vac_dbname));
                talloc_free(tmp_ctx);
                return interval;
        }
@@ -701,8 +726,6 @@ static int get_vacuum_interval(struct ctdb_db_context *ctdb_db)
                        }
                }
                free(value.dptr);
-
-               DEBUG(DEBUG_NOTICE,("Using new interval %u for database %s\n", interval, ctdb_db->db_name));
        }
        tdb_close(tdb);
 
@@ -717,12 +740,14 @@ static int vacuum_child_destructor(struct ctdb_vacuum_child_context *child_ctx)
        struct ctdb_db_context *ctdb_db = child_ctx->vacuum_handle->ctdb_db;
        struct ctdb_context *ctdb = ctdb_db->ctdb;
 
-       DEBUG(DEBUG_ERR,("Vacuuming took %.3f seconds for database %s\n", l, ctdb_db->db_name));
+       DEBUG(DEBUG_INFO,("Vacuuming took %.3f seconds for database %s\n", l, ctdb_db->db_name));
 
        if (child_ctx->child_pid != -1) {
                kill(child_ctx->child_pid, SIGKILL);
        }
 
+       DLIST_REMOVE(ctdb->vacuumers, child_ctx);
+
        event_add_timed(ctdb->ev, child_ctx->vacuum_handle,
                        timeval_current_ofs(get_vacuum_interval(ctdb_db), 0), 
                        ctdb_vacuum_event, child_ctx->vacuum_handle);
@@ -756,7 +781,7 @@ static void vacuum_child_handler(struct event_context *ev, struct fd_event *fde,
        char c = 0;
        int ret;
 
-       DEBUG(DEBUG_NOTICE,("Vacuuming child finished for db %s\n", child_ctx->vacuum_handle->ctdb_db->db_name));
+       DEBUG(DEBUG_INFO,("Vacuuming child process %d finished for db %s\n", child_ctx->child_pid, child_ctx->vacuum_handle->ctdb_db->db_name));
        child_ctx->child_pid = -1;
 
        ret = read(child_ctx->fd[0], &c, 1);
@@ -789,8 +814,6 @@ ctdb_vacuum_event(struct event_context *ev, struct timed_event *te,
                return;
        }
 
-       DEBUG(DEBUG_NOTICE,("Start a vacuuming child process for db %s\n", ctdb_db->db_name));
-
        child_ctx = talloc(vacuum_handle, struct ctdb_vacuum_child_context);
        if (child_ctx == NULL) {
                DEBUG(DEBUG_CRIT, (__location__ " Failed to allocate child context for vacuuming of %s\n", ctdb_db->db_name));
@@ -821,6 +844,8 @@ ctdb_vacuum_event(struct event_context *ev, struct timed_event *te,
                char cc = 0;
                close(child_ctx->fd[0]);
 
+               DEBUG(DEBUG_INFO,("Vacuuming child process %d for db %s started\n", getpid(), ctdb_db->db_name));
+       
                if (switch_from_server_to_client(ctdb) != 0) {
                        DEBUG(DEBUG_CRIT, (__location__ "ERROR: failed to switch vacuum daemon into client mode. Shutting down.\n"));
                        _exit(1);
@@ -841,13 +866,14 @@ ctdb_vacuum_event(struct event_context *ev, struct timed_event *te,
        child_ctx->status = VACUUM_RUNNING;
        child_ctx->start_time = timeval_current();
 
+       DLIST_ADD(ctdb->vacuumers, child_ctx);
        talloc_set_destructor(child_ctx, vacuum_child_destructor);
 
        event_add_timed(ctdb->ev, child_ctx,
                timeval_current_ofs(ctdb->tunable.vacuum_max_run_time, 0),
                vacuum_child_timeout, child_ctx);
 
-       DEBUG(DEBUG_NOTICE, (__location__ " Created PIPE FD:%d to child vacuum process\n", child_ctx->fd[0]));
+       DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to child vacuum process\n", child_ctx->fd[0]));
 
        event_add_fd(ctdb->ev, child_ctx, child_ctx->fd[0],
                EVENT_FD_READ|EVENT_FD_AUTOCLOSE,
@@ -858,12 +884,28 @@ ctdb_vacuum_event(struct event_context *ev, struct timed_event *te,
        child_ctx->vacuum_handle = vacuum_handle;
 }
 
+void ctdb_stop_vacuuming(struct ctdb_context *ctdb)
+{
+       /* Simply free them all. */
+       while (ctdb->vacuumers) {
+               DEBUG(DEBUG_INFO, ("Aborting vacuuming for %s (%i)\n",
+                          ctdb->vacuumers->vacuum_handle->ctdb_db->db_name,
+                          (int)ctdb->vacuumers->child_pid));
+               /* vacuum_child_destructor kills it, removes from list */
+               talloc_free(ctdb->vacuumers);
+       }
+}
 
 /* this function initializes the vacuuming context for a database
  * starts the vacuuming events
  */
 int ctdb_vacuum_init(struct ctdb_db_context *ctdb_db)
 {
+       if (ctdb_db->persistent != 0) {
+               DEBUG(DEBUG_ERR,("Vacuuming is disabled for persistent database %s\n", ctdb_db->db_name));
+               return 0;
+       }
+
        ctdb_db->vacuum_handle = talloc(ctdb_db, struct ctdb_vacuum_handle);
        CTDB_NO_MEMORY(ctdb_db->ctdb, ctdb_db->vacuum_handle);