logging: give a unique logging name to each forked child.
[metze/ctdb/wip.git] / server / ctdb_vacuum.c
index b1927bab67f6b10b4fa79b10b71a95f43985953c..2f751b326d1eb5180b7de605334c9d45ba87076b 100644 (file)
@@ -131,7 +131,7 @@ static int vacuum_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data,
 
                hash = ctdb_hash(&key);
                if (trbt_lookup32(vdata->delete_tree, hash)) {
-                       DEBUG(DEBUG_INFO, (__location__ " Hash collission when vacuuming, skipping this record.\n"));
+                       DEBUG(DEBUG_DEBUG, (__location__ " Hash collission when vacuuming, skipping this record.\n"));
                } 
                else {
                        struct delete_record_data *dd;
@@ -267,7 +267,7 @@ static int ctdb_vacuum_db(struct ctdb_db_context *ctdb_db, struct vacuum_data *v
                /* for records where we are not the lmaster, tell the lmaster to fetch the record */
                if (ctdb->vnn_map->map[i] != ctdb->pnn) {
                        TDB_DATA data;
-                       DEBUG(DEBUG_NOTICE,("Found %u records for lmaster %u in '%s'\n", 
+                       DEBUG(DEBUG_INFO,("Found %u records for lmaster %u in '%s'\n", 
                                                                vdata->list[i]->count, i, name));
 
                        data.dsize = talloc_get_size(vdata->list[i]);
@@ -401,8 +401,18 @@ static int repack_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data,
                 * there might be hash collisions so we have to compare the keys here to be sure
                 */
                if (kd && kd->key.dsize == key.dsize && memcmp(kd->key.dptr, key.dptr, key.dsize) == 0) {
-                       vdata->vacuumed++;
-                       return 0;
+                       struct ctdb_ltdb_header *hdr = (struct ctdb_ltdb_header *)data.dptr;
+                       /*
+                        * we have to check if the record hasn't changed in the meantime in order to
+                        * savely remove it from the database
+                        */
+                       if (data.dsize == sizeof(struct ctdb_ltdb_header) &&
+                               hdr->dmaster == kd->ctdb->pnn &&
+                               ctdb_lmaster(kd->ctdb, &(kd->key)) == kd->ctdb->pnn &&
+                               kd->hdr.rsn == hdr->rsn) {
+                               vdata->vacuumed++;
+                               return 0;
+                       }
                }
        }
        if (tdb_store(vdata->dest_db, key, data, TDB_INSERT) != 0) {
@@ -425,7 +435,9 @@ static int ctdb_repack_tdb(struct tdb_context *tdb, TALLOC_CTX *mem_ctx, struct
                return -1;
        }
 
-       tmp_db = tdb_open("tmpdb", tdb_hash_size(tdb), TDB_INTERNAL, O_RDWR|O_CREAT, 0);
+       tmp_db = tdb_open("tmpdb", tdb_hash_size(tdb),
+                         TDB_INTERNAL|TDB_DISALLOW_NESTING,
+                         O_RDWR|O_CREAT, 0);
        if (tmp_db == NULL) {
                DEBUG(DEBUG_ERR,(__location__ " Failed to create tmp_db\n"));
                tdb_transaction_cancel(tdb);
@@ -449,7 +461,7 @@ static int ctdb_repack_tdb(struct tdb_context *tdb, TALLOC_CTX *mem_ctx, struct
                return -1;              
        }
 
-       DEBUG(DEBUG_NOTICE,(__location__ " %u records vacuumed\n", vdata->vacuumed));
+       DEBUG(DEBUG_INFO,(__location__ " %u records vacuumed\n", vdata->vacuumed));
        
        if (vdata->traverse_error) {
                DEBUG(DEBUG_ERR,(__location__ " Error during traversal\n"));
@@ -491,7 +503,7 @@ static int ctdb_repack_tdb(struct tdb_context *tdb, TALLOC_CTX *mem_ctx, struct
                DEBUG(DEBUG_ERR,(__location__ " Failed to commit\n"));
                return -1;
        }
-       DEBUG(DEBUG_NOTICE,(__location__ " %u records copied\n", vdata->copied));
+       DEBUG(DEBUG_INFO,(__location__ " %u records copied\n", vdata->copied));
 
        return 0;
 }
@@ -504,17 +516,22 @@ static int update_tuning_db(struct ctdb_db_context *ctdb_db, struct vacuum_data
        struct vacuum_tuning_data tdata;
        struct vacuum_tuning_data *tptr;
        char *vac_dbname;
+       int flags;
 
        vac_dbname = talloc_asprintf(tmp_ctx, "%s/%s.%u",
-                                       ctdb_db->ctdb->db_directory, 
-                                       TUNINGDBNAME, ctdb_db->ctdb->pnn);
+                                    ctdb_db->ctdb->db_directory_state,
+                                    TUNINGDBNAME, ctdb_db->ctdb->pnn);
        if (vac_dbname == NULL) {
                DEBUG(DEBUG_CRIT,(__location__ " Out of memory error while allocating '%s'\n", vac_dbname));
                talloc_free(tmp_ctx);
                return -1;
        }
 
-       tune_tdb = tdb_open(vac_dbname, 0, 0, O_RDWR|O_CREAT, 0600);
+       flags  = ctdb_db->ctdb->valgrinding ? TDB_NOMMAP : 0;
+       flags |= TDB_DISALLOW_NESTING;
+       tune_tdb = tdb_open(vac_dbname, 0,
+                           flags,
+                           O_RDWR|O_CREAT, 0600);
        if (tune_tdb == NULL) {
                DEBUG(DEBUG_ERR,(__location__ " Failed to create/open %s\n", TUNINGDBNAME));
                talloc_free(tmp_ctx);
@@ -545,7 +562,7 @@ static int update_tuning_db(struct ctdb_db_context *ctdb_db, struct vacuum_data
                    vdata->delete_count < vdata->vacuum_limit) {
                        if (tdata.last_interval < ctdb_db->ctdb->tunable.vacuum_max_interval) {
                                tdata.new_interval = tdata.last_interval * 110 / 100;
-                               DEBUG(DEBUG_NOTICE,("Increasing vacuum interval %u -> %u for %s\n", 
+                               DEBUG(DEBUG_INFO,("Increasing vacuum interval %u -> %u for %s\n", 
                                        tdata.last_interval, tdata.new_interval, ctdb_db->db_name));
                        }
                } else {
@@ -554,7 +571,7 @@ static int update_tuning_db(struct ctdb_db_context *ctdb_db, struct vacuum_data
                                tdata.new_interval > ctdb_db->ctdb->tunable.vacuum_max_interval) {
                                tdata.new_interval = ctdb_db->ctdb->tunable.vacuum_min_interval;
                        }               
-                       DEBUG(DEBUG_ERR,("Decreasing vacuum interval %u -> %u for %s\n", 
+                       DEBUG(DEBUG_INFO,("Decreasing vacuum interval %u -> %u for %s\n", 
                                         tdata.last_interval, tdata.new_interval, ctdb_db->db_name));
                }
                tdata.last_interval = tdata.new_interval;
@@ -636,12 +653,12 @@ static int ctdb_repack_db(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx)
         * decide if a repack is necessary
         */
        if (size < repack_limit && vdata->delete_count < vacuum_limit) {
-               talloc_free(vdata);
                update_tuning_db(ctdb_db, vdata, size);
+               talloc_free(vdata);
                return 0;
        }
 
-       DEBUG(DEBUG_NOTICE,("Repacking %s with %u freelist entries and %u records to delete\n", 
+       DEBUG(DEBUG_INFO,("Repacking %s with %u freelist entries and %u records to delete\n", 
                        name, size, vdata->delete_count));
 
        /*
@@ -667,6 +684,7 @@ static int get_vacuum_interval(struct ctdb_db_context *ctdb_db)
        char *vac_dbname;
        uint interval = ctdb_db->ctdb->tunable.vacuum_default_interval;
        struct ctdb_context *ctdb = ctdb_db->ctdb;
+       int flags;
 
        vac_dbname = talloc_asprintf(tmp_ctx, "%s/%s.%u", ctdb->db_directory, TUNINGDBNAME, ctdb->pnn);
        if (vac_dbname == NULL) {
@@ -675,9 +693,13 @@ static int get_vacuum_interval(struct ctdb_db_context *ctdb_db)
                return interval;
        }
 
-       tdb = tdb_open(vac_dbname, 0, 0, O_RDONLY, 0600);
+       flags  = ctdb_db->ctdb->valgrinding ? TDB_NOMMAP : 0;
+       flags |= TDB_DISALLOW_NESTING;
+       tdb = tdb_open(vac_dbname, 0,
+                      flags,
+                      O_RDWR|O_CREAT, 0600);
        if (!tdb) {
-               DEBUG(DEBUG_ERR,("Unable to open database %s using default interval\n", vac_dbname));
+               DEBUG(DEBUG_ERR,("Unable to open/create database %s using default interval\n", vac_dbname));
                talloc_free(tmp_ctx);
                return interval;
        }
@@ -701,8 +723,6 @@ static int get_vacuum_interval(struct ctdb_db_context *ctdb_db)
                        }
                }
                free(value.dptr);
-
-               DEBUG(DEBUG_NOTICE,("Using new interval %u for database %s\n", interval, ctdb_db->db_name));
        }
        tdb_close(tdb);
 
@@ -717,7 +737,7 @@ static int vacuum_child_destructor(struct ctdb_vacuum_child_context *child_ctx)
        struct ctdb_db_context *ctdb_db = child_ctx->vacuum_handle->ctdb_db;
        struct ctdb_context *ctdb = ctdb_db->ctdb;
 
-       DEBUG(DEBUG_ERR,("Vacuuming took %.3f seconds for database %s\n", l, ctdb_db->db_name));
+       DEBUG(DEBUG_INFO,("Vacuuming took %.3f seconds for database %s\n", l, ctdb_db->db_name));
 
        if (child_ctx->child_pid != -1) {
                kill(child_ctx->child_pid, SIGKILL);
@@ -756,7 +776,7 @@ static void vacuum_child_handler(struct event_context *ev, struct fd_event *fde,
        char c = 0;
        int ret;
 
-       DEBUG(DEBUG_NOTICE,("Vacuuming child finished for db %s\n", child_ctx->vacuum_handle->ctdb_db->db_name));
+       DEBUG(DEBUG_INFO,("Vacuuming child process %d finished for db %s\n", child_ctx->child_pid, child_ctx->vacuum_handle->ctdb_db->db_name));
        child_ctx->child_pid = -1;
 
        ret = read(child_ctx->fd[0], &c, 1);
@@ -789,8 +809,6 @@ ctdb_vacuum_event(struct event_context *ev, struct timed_event *te,
                return;
        }
 
-       DEBUG(DEBUG_NOTICE,("Start a vacuuming child process for db %s\n", ctdb_db->db_name));
-
        child_ctx = talloc(vacuum_handle, struct ctdb_vacuum_child_context);
        if (child_ctx == NULL) {
                DEBUG(DEBUG_CRIT, (__location__ " Failed to allocate child context for vacuuming of %s\n", ctdb_db->db_name));
@@ -821,7 +839,9 @@ ctdb_vacuum_event(struct event_context *ev, struct timed_event *te,
                char cc = 0;
                close(child_ctx->fd[0]);
 
-               if (switch_from_server_to_client(ctdb) != 0) {
+               DEBUG(DEBUG_INFO,("Vacuuming child process %d for db %s started\n", getpid(), ctdb_db->db_name));
+       
+               if (switch_from_server_to_client(ctdb, "vacuum-%s", ctdb_db->db_name) != 0) {
                        DEBUG(DEBUG_CRIT, (__location__ "ERROR: failed to switch vacuum daemon into client mode. Shutting down.\n"));
                        _exit(1);
                }
@@ -847,7 +867,7 @@ ctdb_vacuum_event(struct event_context *ev, struct timed_event *te,
                timeval_current_ofs(ctdb->tunable.vacuum_max_run_time, 0),
                vacuum_child_timeout, child_ctx);
 
-       DEBUG(DEBUG_NOTICE, (__location__ " Created PIPE FD:%d to child vacuum process\n", child_ctx->fd[0]));
+       DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to child vacuum process\n", child_ctx->fd[0]));
 
        event_add_fd(ctdb->ev, child_ctx, child_ctx->fd[0],
                EVENT_FD_READ|EVENT_FD_AUTOCLOSE,
@@ -864,6 +884,11 @@ ctdb_vacuum_event(struct event_context *ev, struct timed_event *te,
  */
 int ctdb_vacuum_init(struct ctdb_db_context *ctdb_db)
 {
+       if (ctdb_db->persistent != 0) {
+               DEBUG(DEBUG_ERR,("Vacuuming is disabled for persistent database %s\n", ctdb_db->db_name));
+               return 0;
+       }
+
        ctdb_db->vacuum_handle = talloc(ctdb_db, struct ctdb_vacuum_handle);
        CTDB_NO_MEMORY(ctdb_db->ctdb, ctdb_db->vacuum_handle);