vacuum: correctly send TRY_DELETE_RECORDS ctrl to all active nodes
[ctdb.git] / server / ctdb_vacuum.c
index 97a7dfbc012fc1faff5352af8367d4b69b65fd5a..2c643f72cc3e9e319dba02f2d4e69ced96d6c967 100644 (file)
@@ -18,7 +18,7 @@
 */
 
 #include "includes.h"
-#include "lib/events/events.h"
+#include "lib/tevent/tevent.h"
 #include "lib/tdb/include/tdb.h"
 #include "system/network.h"
 #include "system/filesys.h"
@@ -26,7 +26,7 @@
 #include "../include/ctdb_private.h"
 #include "db_wrap.h"
 #include "lib/util/dlinklist.h"
-#include "lib/events/events.h"
+#include "lib/tevent/tevent.h"
 #include "../include/ctdb_private.h"
 #include "../common/rb_tree.h"
 
@@ -36,7 +36,9 @@
 enum vacuum_child_status { VACUUM_RUNNING, VACUUM_OK, VACUUM_ERROR, VACUUM_TIMEOUT};
 
 struct ctdb_vacuum_child_context {
+       struct ctdb_vacuum_child_context *next, *prev;
        struct ctdb_vacuum_handle *vacuum_handle;
+       /* fd child writes status to */
        int fd[2];
        pid_t child_pid;
        enum vacuum_child_status status;
@@ -122,7 +124,7 @@ static int vacuum_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data,
                return 0;
        }
 
-       /* is this a records we could possibly delete? I.e.
+       /* Is this a record we could possibly delete? I.e.
           if the record is empty and also we are both lmaster
           and dmaster for the record we should be able to delete it
        */
@@ -236,12 +238,12 @@ static int ctdb_vacuum_db(struct ctdb_db_context *ctdb_db, struct vacuum_data *v
 
        ctdb->pnn = pnn;
        /* the list needs to be of length num_nodes */
-       vdata->list = talloc_array(vdata, struct ctdb_marshall_buffer *, ctdb->vnn_map->size);
+       vdata->list = talloc_array(vdata, struct ctdb_marshall_buffer *, ctdb->num_nodes);
        if (vdata->list == NULL) {
                DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
                return -1;
        }
-       for (i = 0; i < ctdb->vnn_map->size; i++) {
+       for (i = 0; i < ctdb->num_nodes; i++) {
                vdata->list[i] = (struct ctdb_marshall_buffer *)
                        talloc_zero_size(vdata->list, 
                                                         offsetof(struct ctdb_marshall_buffer, data));
@@ -259,22 +261,24 @@ static int ctdb_vacuum_db(struct ctdb_db_context *ctdb_db, struct vacuum_data *v
                return -1;              
        }
 
-       for ( i = 0; i < ctdb->vnn_map->size; i++) {
+       for (i = 0; i < ctdb->num_nodes; i++) {
                if (vdata->list[i]->count == 0) {
                        continue;
                }
 
                /* for records where we are not the lmaster, tell the lmaster to fetch the record */
-               if (ctdb->vnn_map->map[i] != ctdb->pnn) {
+               if (ctdb->nodes[i]->pnn != ctdb->pnn) {
                        TDB_DATA data;
-                       DEBUG(DEBUG_NOTICE,("Found %u records for lmaster %u in '%s'\n", 
-                                                               vdata->list[i]->count, i, name));
+                       DEBUG(DEBUG_INFO,
+                             ("Found %u records for lmaster %u in '%s'\n",
+                              vdata->list[i]->count, ctdb->nodes[i]->pnn,
+                              name));
 
                        data.dsize = talloc_get_size(vdata->list[i]);
                        data.dptr  = (void *)vdata->list[i];
-                       if (ctdb_send_message(ctdb, ctdb->vnn_map->map[i], CTDB_SRVID_VACUUM_FETCH, data) != 0) {
+                       if (ctdb_client_send_message(ctdb, ctdb->nodes[i]->pnn, CTDB_SRVID_VACUUM_FETCH, data) != 0) {
                                DEBUG(DEBUG_ERR,(__location__ " Failed to send vacuum fetch message to %u\n",
-                                        ctdb->vnn_map->map[i]));
+                                        ctdb->nodes[i]->pnn));
                                return -1;              
                        }
                        continue;
@@ -286,6 +290,9 @@ static int ctdb_vacuum_db(struct ctdb_db_context *ctdb_db, struct vacuum_data *v
                struct delete_records_list *recs;
                TDB_DATA indata, outdata;
                int32_t res;
+               struct ctdb_node_map *nodemap;
+               uint32_t *active_nodes;
+               int num_active_nodes;
 
                recs = talloc_zero(vdata, struct delete_records_list);
                if (recs == NULL) {
@@ -311,24 +318,37 @@ static int ctdb_vacuum_db(struct ctdb_db_context *ctdb_db, struct vacuum_data *v
                indata.dptr  = (void *)recs->records;
 
                /* 
-                * now tell all the other nodes to delete all these records
+                * now tell all the active nodes to delete all these records
                 * (if possible)
                 */
-               for (i = 0; i < ctdb->vnn_map->size; i++) {
+
+               ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(),
+                                          CTDB_CURRENT_NODE,
+                                          recs, /* talloc context */
+                                          &nodemap);
+               if (ret != 0) {
+                       DEBUG(DEBUG_ERR,(__location__ " unable to get node map\n"));
+                       return -1;
+               }
+
+               active_nodes = list_of_active_nodes(ctdb, nodemap,
+                                                   nodemap, /* talloc context */
+                                                   false /* include self */);
+               /* yuck! ;-) */
+               num_active_nodes = talloc_get_size(active_nodes)/sizeof(*active_nodes);
+
+               for (i = 0; i < num_active_nodes; i++) {
                        struct ctdb_marshall_buffer *records;
                        struct ctdb_rec_data *rec;
 
-                       if (ctdb->vnn_map->map[i] == ctdb->pnn) {
-                               /* we dont delete the records on the local node just yet */
-                               continue;
-                       }
-
-                       ret = ctdb_control(ctdb, ctdb->vnn_map->map[i], 0,
+                       ret = ctdb_control(ctdb, active_nodes[i], 0,
                                        CTDB_CONTROL_TRY_DELETE_RECORDS, 0,
                                        indata, recs, &outdata, &res,
                                        NULL, NULL);
                        if (ret != 0 || res != 0) {
-                               DEBUG(DEBUG_ERR,("Failed to delete records on node %u\n", ctdb->vnn_map->map[i]));
+                               DEBUG(DEBUG_ERR, ("Failed to delete records on "
+                                                 "node %u: ret[%d] res[%d]\n",
+                                                 active_nodes[i], ret, res));
                                return -1;
                        }
 
@@ -365,6 +385,9 @@ static int ctdb_vacuum_db(struct ctdb_db_context *ctdb_db, struct vacuum_data *v
                        }           
                }
 
+               /* free nodemap and active_nodes */
+               talloc_free(nodemap);
+
                /* 
                 * The only records remaining in the tree would be those
                 * records where all other nodes could successfully
@@ -435,7 +458,9 @@ static int ctdb_repack_tdb(struct tdb_context *tdb, TALLOC_CTX *mem_ctx, struct
                return -1;
        }
 
-       tmp_db = tdb_open("tmpdb", tdb_hash_size(tdb), TDB_INTERNAL, O_RDWR|O_CREAT, 0);
+       tmp_db = tdb_open("tmpdb", tdb_hash_size(tdb),
+                         TDB_INTERNAL|TDB_DISALLOW_NESTING,
+                         O_RDWR|O_CREAT, 0);
        if (tmp_db == NULL) {
                DEBUG(DEBUG_ERR,(__location__ " Failed to create tmp_db\n"));
                tdb_transaction_cancel(tdb);
@@ -459,7 +484,7 @@ static int ctdb_repack_tdb(struct tdb_context *tdb, TALLOC_CTX *mem_ctx, struct
                return -1;              
        }
 
-       DEBUG(DEBUG_NOTICE,(__location__ " %u records vacuumed\n", vdata->vacuumed));
+       DEBUG(DEBUG_INFO,(__location__ " %u records vacuumed\n", vdata->vacuumed));
        
        if (vdata->traverse_error) {
                DEBUG(DEBUG_ERR,(__location__ " Error during traversal\n"));
@@ -501,7 +526,7 @@ static int ctdb_repack_tdb(struct tdb_context *tdb, TALLOC_CTX *mem_ctx, struct
                DEBUG(DEBUG_ERR,(__location__ " Failed to commit\n"));
                return -1;
        }
-       DEBUG(DEBUG_NOTICE,(__location__ " %u records copied\n", vdata->copied));
+       DEBUG(DEBUG_INFO,(__location__ " %u records copied\n", vdata->copied));
 
        return 0;
 }
@@ -514,17 +539,22 @@ static int update_tuning_db(struct ctdb_db_context *ctdb_db, struct vacuum_data
        struct vacuum_tuning_data tdata;
        struct vacuum_tuning_data *tptr;
        char *vac_dbname;
+       int flags;
 
        vac_dbname = talloc_asprintf(tmp_ctx, "%s/%s.%u",
-                                       ctdb_db->ctdb->db_directory, 
-                                       TUNINGDBNAME, ctdb_db->ctdb->pnn);
+                                    ctdb_db->ctdb->db_directory_state,
+                                    TUNINGDBNAME, ctdb_db->ctdb->pnn);
        if (vac_dbname == NULL) {
                DEBUG(DEBUG_CRIT,(__location__ " Out of memory error while allocating '%s'\n", vac_dbname));
                talloc_free(tmp_ctx);
                return -1;
        }
 
-       tune_tdb = tdb_open(vac_dbname, 0, 0, O_RDWR|O_CREAT, 0644);
+       flags  = ctdb_db->ctdb->valgrinding ? TDB_NOMMAP : 0;
+       flags |= TDB_DISALLOW_NESTING;
+       tune_tdb = tdb_open(vac_dbname, 0,
+                           flags,
+                           O_RDWR|O_CREAT, 0600);
        if (tune_tdb == NULL) {
                DEBUG(DEBUG_ERR,(__location__ " Failed to create/open %s\n", TUNINGDBNAME));
                talloc_free(tmp_ctx);
@@ -555,7 +585,7 @@ static int update_tuning_db(struct ctdb_db_context *ctdb_db, struct vacuum_data
                    vdata->delete_count < vdata->vacuum_limit) {
                        if (tdata.last_interval < ctdb_db->ctdb->tunable.vacuum_max_interval) {
                                tdata.new_interval = tdata.last_interval * 110 / 100;
-                               DEBUG(DEBUG_NOTICE,("Increasing vacuum interval %u -> %u for %s\n", 
+                               DEBUG(DEBUG_INFO,("Increasing vacuum interval %u -> %u for %s\n", 
                                        tdata.last_interval, tdata.new_interval, ctdb_db->db_name));
                        }
                } else {
@@ -564,12 +594,12 @@ static int update_tuning_db(struct ctdb_db_context *ctdb_db, struct vacuum_data
                                tdata.new_interval > ctdb_db->ctdb->tunable.vacuum_max_interval) {
                                tdata.new_interval = ctdb_db->ctdb->tunable.vacuum_min_interval;
                        }               
-                       DEBUG(DEBUG_ERR,("Decreasing vacuum interval %u -> %u for %s\n", 
+                       DEBUG(DEBUG_INFO,("Decreasing vacuum interval %u -> %u for %s\n", 
                                         tdata.last_interval, tdata.new_interval, ctdb_db->db_name));
                }
                tdata.last_interval = tdata.new_interval;
        } else {
-               DEBUG(DEBUG_ERR,(__location__ " Cannot find tunedb record for %s. Using default interval\n", ctdb_db->db_name));
+               DEBUG(DEBUG_DEBUG,(__location__ " Cannot find tunedb record for %s. Using default interval\n", ctdb_db->db_name));
                tdata.last_num_repack = freelist;
                tdata.last_num_empty = vdata->delete_count;
                tdata.last_interval = ctdb_db->ctdb->tunable.vacuum_default_interval;
@@ -627,6 +657,7 @@ static int ctdb_repack_db(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx)
        vdata->vacuum_limit = vacuum_limit;
        vdata->repack_limit = repack_limit;
        vdata->delete_tree = trbt_create(vdata, 0);
+       vdata->ctdb_db = ctdb_db;
        if (vdata->delete_tree == NULL) {
                DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
                talloc_free(vdata);
@@ -651,7 +682,7 @@ static int ctdb_repack_db(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx)
                return 0;
        }
 
-       DEBUG(DEBUG_NOTICE,("Repacking %s with %u freelist entries and %u records to delete\n", 
+       DEBUG(DEBUG_INFO,("Repacking %s with %u freelist entries and %u records to delete\n", 
                        name, size, vdata->delete_count));
 
        /*
@@ -677,6 +708,7 @@ static int get_vacuum_interval(struct ctdb_db_context *ctdb_db)
        char *vac_dbname;
        uint interval = ctdb_db->ctdb->tunable.vacuum_default_interval;
        struct ctdb_context *ctdb = ctdb_db->ctdb;
+       int flags;
 
        vac_dbname = talloc_asprintf(tmp_ctx, "%s/%s.%u", ctdb->db_directory, TUNINGDBNAME, ctdb->pnn);
        if (vac_dbname == NULL) {
@@ -685,9 +717,13 @@ static int get_vacuum_interval(struct ctdb_db_context *ctdb_db)
                return interval;
        }
 
-       tdb = tdb_open(vac_dbname, 0, 0, O_RDWR|O_CREAT, 0644);
+       flags  = ctdb_db->ctdb->valgrinding ? TDB_NOMMAP : 0;
+       flags |= TDB_DISALLOW_NESTING;
+       tdb = tdb_open(vac_dbname, 0,
+                      flags,
+                      O_RDWR|O_CREAT, 0600);
        if (!tdb) {
-               DEBUG(DEBUG_ERR,("Unable to open/create database %s using default interval\n", vac_dbname));
+               DEBUG(DEBUG_ERR,("Unable to open/create database %s using default interval. Errno : %s (%d)\n", vac_dbname, strerror(errno), errno));
                talloc_free(tmp_ctx);
                return interval;
        }
@@ -731,6 +767,8 @@ static int vacuum_child_destructor(struct ctdb_vacuum_child_context *child_ctx)
                kill(child_ctx->child_pid, SIGKILL);
        }
 
+       DLIST_REMOVE(ctdb->vacuumers, child_ctx);
+
        event_add_timed(ctdb->ev, child_ctx->vacuum_handle,
                        timeval_current_ofs(get_vacuum_interval(ctdb_db), 0), 
                        ctdb_vacuum_event, child_ctx->vacuum_handle);
@@ -789,10 +827,17 @@ ctdb_vacuum_event(struct event_context *ev, struct timed_event *te,
        struct ctdb_db_context *ctdb_db = vacuum_handle->ctdb_db;
        struct ctdb_context *ctdb = ctdb_db->ctdb;
        struct ctdb_vacuum_child_context *child_ctx;
+       struct tevent_fd *fde;
        int ret;
 
-       /* we dont vacuum if we are in recovery mode */
-       if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE) {
+       /* we dont vacuum if we are in recovery mode, or db frozen */
+       if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE ||
+           ctdb->freeze_mode[ctdb_db->priority] != CTDB_FREEZE_NONE) {
+               DEBUG(DEBUG_INFO, ("Not vacuuming %s (%s)\n", ctdb_db->db_name,
+                                  ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE ? "in recovery"
+                                  : ctdb->freeze_mode[ctdb_db->priority] == CTDB_FREEZE_PENDING
+                                  ? "freeze pending"
+                                  : "frozen"));
                event_add_timed(ctdb->ev, vacuum_handle, timeval_current_ofs(ctdb->tunable.vacuum_default_interval, 0), ctdb_vacuum_event, vacuum_handle);
                return;
        }
@@ -812,7 +857,7 @@ ctdb_vacuum_event(struct event_context *ev, struct timed_event *te,
                return;
        }
 
-       child_ctx->child_pid = fork();
+       child_ctx->child_pid = ctdb_fork(ctdb);
        if (child_ctx->child_pid == (pid_t)-1) {
                close(child_ctx->fd[0]);
                close(child_ctx->fd[1]);
@@ -829,7 +874,7 @@ ctdb_vacuum_event(struct event_context *ev, struct timed_event *te,
 
                DEBUG(DEBUG_INFO,("Vacuuming child process %d for db %s started\n", getpid(), ctdb_db->db_name));
        
-               if (switch_from_server_to_client(ctdb) != 0) {
+               if (switch_from_server_to_client(ctdb, "vacuum-%s", ctdb_db->db_name) != 0) {
                        DEBUG(DEBUG_CRIT, (__location__ "ERROR: failed to switch vacuum daemon into client mode. Shutting down.\n"));
                        _exit(1);
                }
@@ -849,23 +894,34 @@ ctdb_vacuum_event(struct event_context *ev, struct timed_event *te,
        child_ctx->status = VACUUM_RUNNING;
        child_ctx->start_time = timeval_current();
 
+       DLIST_ADD(ctdb->vacuumers, child_ctx);
        talloc_set_destructor(child_ctx, vacuum_child_destructor);
 
        event_add_timed(ctdb->ev, child_ctx,
                timeval_current_ofs(ctdb->tunable.vacuum_max_run_time, 0),
                vacuum_child_timeout, child_ctx);
 
-       DEBUG(DEBUG_NOTICE, (__location__ " Created PIPE FD:%d to child vacuum process\n", child_ctx->fd[0]));
+       DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to child vacuum process\n", child_ctx->fd[0]));
 
-       event_add_fd(ctdb->ev, child_ctx, child_ctx->fd[0],
-               EVENT_FD_READ|EVENT_FD_AUTOCLOSE,
-               vacuum_child_handler,
-               child_ctx);
+       fde = event_add_fd(ctdb->ev, child_ctx, child_ctx->fd[0],
+                          EVENT_FD_READ, vacuum_child_handler, child_ctx);
+       tevent_fd_set_auto_close(fde);
 
        vacuum_handle->child_ctx = child_ctx;
        child_ctx->vacuum_handle = vacuum_handle;
 }
 
+void ctdb_stop_vacuuming(struct ctdb_context *ctdb)
+{
+       /* Abort every in-flight vacuum run: free each child context until the ctdb->vacuumers list is empty. */
+       while (ctdb->vacuumers) {
+               DEBUG(DEBUG_INFO, ("Aborting vacuuming for %s (%i)\n",
+                          ctdb->vacuumers->vacuum_handle->ctdb_db->db_name,
+                          (int)ctdb->vacuumers->child_pid));
+               /* talloc_free runs vacuum_child_destructor, which may SIGKILL the child, unlinks this entry from ctdb->vacuumers (that is what advances the loop) and re-arms the vacuum timer for the db. */
+               talloc_free(ctdb->vacuumers);
+       }
+}
 
 /* this function initializes the vacuuming context for a database
  * starts the vacuuming events