vacuum event framework
authorRonnie Sahlberg <ronniesahlberg@gmail.com>
Thu, 23 Jul 2009 06:03:39 +0000 (16:03 +1000)
committerRonnie Sahlberg <ronniesahlberg@gmail.com>
Fri, 2 Oct 2009 00:00:36 +0000 (10:00 +1000)
Signed-off-by: Ronnie Sahlberg <ronniesahlberg@gmail.com>
Signed-off-by: Wolfgang Mueller-Friedt <wolfmuel@de.ibm.com>
Makefile.in
server/ctdb_ltdb_server.c
server/ctdb_tunables.c
server/ctdb_vacuum.c [new file with mode: 0644]

index 9fd020ec73d7819e97d04c2c2372bde802c7fbb3..7e1ee967433d1afbbadd40064afba4fd27ba4c44 100755 (executable)
@@ -56,6 +56,7 @@ CTDB_SERVER_OBJ = server/ctdbd.o server/ctdb_daemon.o server/ctdb_lockwait.o \
        server/ctdb_traverse.o server/eventscript.o server/ctdb_takeover.o \
        server/ctdb_serverids.o server/ctdb_persistent.o \
        server/ctdb_keepalive.o server/ctdb_logging.o server/ctdb_uptime.c \
+       server/ctdb_vacuum.c \
        $(CTDB_CLIENT_OBJ) $(CTDB_TCP_OBJ) @INFINIBAND_WRAPPER_OBJ@
 
 TEST_BINS=tests/bin/ctdb_bench tests/bin/ctdb_fetch tests/bin/ctdb_store \
index b330768dcb59df4b73d5ec57ac27841bc29d6ae4..e76a50a552b6cf35ee65b1c65250fd6d4f73c4bc 100644 (file)
@@ -291,6 +291,14 @@ static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name, boo
                return -1;
        }
 
+       ret = ctdb_vacuum_init(ctdb_db);
+       if (ret != 0) {
+               DEBUG(DEBUG_CRIT,("Failed to setup vacuuming for database '%s'\n", ctdb_db->db_name));
+               talloc_free(ctdb_db);
+               return -1;
+       }
+
+
        DEBUG(DEBUG_INFO,("Attached to database '%s'\n", ctdb_db->db_path));
        
        /* success */
index 6a8876fc2b7ee0eede30d3e7bbcb5410acad91c4..456c9746085b74be4f19c6dafc387e996d998b9f 100644 (file)
@@ -56,6 +56,8 @@ static const struct {
        { "RecLockLatencyMs",  1000,  offsetof(struct ctdb_tunable, reclock_latency_ms) },
        { "RecoveryDropAllIPs",  60,  offsetof(struct ctdb_tunable, recovery_drop_all_ips) },
        { "VerifyRecoveryLock",   1,  offsetof(struct ctdb_tunable, verify_recovery_lock) },
+       { "VacuumDefaultInterval", 10,  offsetof(struct ctdb_tunable, vacuum_default_interval) },
+       { "VacuumMaxRunTime",      5,  offsetof(struct ctdb_tunable, vacuum_max_run_time) },
 };
 
 /*
diff --git a/server/ctdb_vacuum.c b/server/ctdb_vacuum.c
new file mode 100644 (file)
index 0000000..8c9cd8c
--- /dev/null
@@ -0,0 +1,210 @@
+/*
+   ctdb vacuuming events
+
+   Copyright (C) Ronnie Sahlberg  2009
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; either version 3 of the License, or
+   (at your option) any later version.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "includes.h"
+#include "lib/events/events.h"
+#include "lib/tdb/include/tdb.h"
+#include "system/network.h"
+#include "system/filesys.h"
+#include "system/dir.h"
+#include "../include/ctdb_private.h"
+#include "db_wrap.h"
+#include "lib/util/dlinklist.h"
+#include "lib/events/events.h"
+#include "../include/ctdb_private.h"
+
+
+enum vacuum_child_status { VACUUM_RUNNING, VACUUM_OK, VACUUM_ERROR, VACUUM_TIMEOUT};
+
+struct ctdb_vacuum_child_context {
+       struct ctdb_vacuum_handle *vacuum_handle;
+       int fd[2];
+       pid_t child_pid;
+       enum vacuum_child_status status;
+       struct timeval start_time;
+};
+
+struct ctdb_vacuum_handle {
+       struct ctdb_db_context *ctdb_db;
+       struct ctdb_vacuum_child_context *child_ctx;
+};
+
+static void ctdb_vacuum_event(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data);
+
+static int vacuum_child_destructor(struct ctdb_vacuum_child_context *child_ctx)
+{
+       double l = timeval_elapsed(&child_ctx->start_time);
+       struct ctdb_db_context *ctdb_db = child_ctx->vacuum_handle->ctdb_db;
+       struct ctdb_context *ctdb = ctdb_db->ctdb;
+
+       DEBUG(DEBUG_ERR,("Vacuuming took %.3f seconds for database %s\n", l, ctdb_db->db_name));
+
+       if (child_ctx->child_pid != -1) {
+               kill(child_ctx->child_pid, SIGKILL);
+       }
+
+       /* here calculate a new interval */
+       /* child_ctx->status */
+
+       DEBUG(DEBUG_ERR, ("Start new vacuum event for %s\n", ctdb_db->db_name));
+
+       event_add_timed(ctdb->ev, child_ctx->vacuum_handle, timeval_current_ofs(ctdb->tunable.vacuum_default_interval, 0), ctdb_vacuum_event, child_ctx->vacuum_handle);
+
+       return 0;
+}
+
+/*
+ * this event is generated when a vacuum child process times out
+ */
+static void vacuum_child_timeout(struct event_context *ev, struct timed_event *te,
+                                        struct timeval t, void *private_data)
+{
+       struct ctdb_vacuum_child_context *child_ctx = talloc_get_type(private_data, struct ctdb_vacuum_child_context);
+
+       DEBUG(DEBUG_ERR,("Vacuuming child process timed out for db %s\n", child_ctx->vacuum_handle->ctdb_db->db_name));
+
+       child_ctx->status = VACUUM_TIMEOUT;
+
+       talloc_free(child_ctx);
+}
+
+
+/*
+ * this event is generated when a vacuum child process has completed
+ */
+static void vacuum_child_handler(struct event_context *ev, struct fd_event *fde,
+                            uint16_t flags, void *private_data)
+{
+       struct ctdb_vacuum_child_context *child_ctx = talloc_get_type(private_data, struct ctdb_vacuum_child_context);
+       char c = 0;
+       int ret;
+
+       DEBUG(DEBUG_ERR,("Vacuuming child finished for db %s\n", child_ctx->vacuum_handle->ctdb_db->db_name));
+
+       child_ctx->child_pid = -1;
+
+       ret = read(child_ctx->fd[0], &c, 1);
+       if (ret != 1 || c != 0) {
+               child_ctx->status = VACUUM_ERROR;
+               DEBUG(DEBUG_ERR, ("A vacuum child process failed with an error for database %s. ret=%d c=%d\n", child_ctx->vacuum_handle->ctdb_db->db_name, ret, c));
+       } else {
+               child_ctx->status = VACUUM_OK;
+       }
+
+       talloc_free(child_ctx);
+}
+
+/*
+ * this event is called every time we need to start a new vacuum process
+ */
+static void
+ctdb_vacuum_event(struct event_context *ev, struct timed_event *te,
+                              struct timeval t, void *private_data)
+{
+       struct ctdb_vacuum_handle *vacuum_handle = talloc_get_type(private_data, struct ctdb_vacuum_handle);
+       struct ctdb_db_context *ctdb_db = vacuum_handle->ctdb_db;
+       struct ctdb_context *ctdb = ctdb_db->ctdb;
+       struct ctdb_vacuum_child_context *child_ctx;
+       int ret;
+
+       DEBUG(DEBUG_ERR,("Start a vacuuming child process for db %s\n", ctdb_db->db_name));
+
+       /* we dont vacuum if we are in recovery mode */
+       if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE) {
+               event_add_timed(ctdb->ev, vacuum_handle, timeval_current_ofs(ctdb->tunable.vacuum_default_interval, 0), ctdb_vacuum_event, vacuum_handle);
+               return;
+       }
+
+
+       child_ctx = talloc(vacuum_handle, struct ctdb_vacuum_child_context);
+       if (child_ctx == NULL) {
+               DEBUG(DEBUG_CRIT, (__location__ " Failed to allocate child context for vacuuming of %s\n", ctdb_db->db_name));
+               ctdb_fatal(ctdb, "Out of memory when crating vacuum child context. Shutting down\n");
+       }
+
+
+       ret = pipe(child_ctx->fd);
+       if (ret != 0) {
+               talloc_free(child_ctx);
+               DEBUG(DEBUG_ERR, ("Failed to create pipe for vacuum child process.\n"));
+               event_add_timed(ctdb->ev, vacuum_handle, timeval_current_ofs(ctdb->tunable.vacuum_default_interval, 0), ctdb_vacuum_event, vacuum_handle);
+               return;
+       }
+
+       child_ctx->child_pid = fork();
+       if (child_ctx->child_pid == (pid_t)-1) {
+               close(child_ctx->fd[0]);
+               close(child_ctx->fd[1]);
+               talloc_free(child_ctx);
+               DEBUG(DEBUG_ERR, ("Failed to fork vacuum child process.\n"));
+               event_add_timed(ctdb->ev, vacuum_handle, timeval_current_ofs(ctdb->tunable.vacuum_default_interval, 0), ctdb_vacuum_event, vacuum_handle);
+               return;
+       }
+
+
+       if (child_ctx->child_pid == 0) {
+               char cc = 0;
+               close(child_ctx->fd[0]);
+
+               DEBUG(DEBUG_ERR,("Child process doing vacuuming stuff on db %s\n", ctdb_db->db_name));
+
+               write(child_ctx->fd[1], &cc, 1);
+               _exit(0);
+       }
+
+       set_close_on_exec(child_ctx->fd[0]);
+       close(child_ctx->fd[1]);
+
+       child_ctx->status = VACUUM_RUNNING;
+       child_ctx->start_time = timeval_current();
+
+       talloc_set_destructor(child_ctx, vacuum_child_destructor);
+
+       event_add_timed(ctdb->ev, child_ctx,
+               timeval_current_ofs(ctdb->tunable.vacuum_max_run_time, 0),
+               vacuum_child_timeout, child_ctx);
+
+       event_add_fd(ctdb->ev, child_ctx, child_ctx->fd[0],
+               EVENT_FD_READ|EVENT_FD_AUTOCLOSE,
+               vacuum_child_handler,
+               child_ctx);
+
+       vacuum_handle->child_ctx = child_ctx;
+       child_ctx->vacuum_handle = vacuum_handle;
+}
+
+
+/* this function initializes the vacuuming context for a database
+ * starts the vacuuming events
+ */
+int ctdb_vacuum_init(struct ctdb_db_context *ctdb_db)
+{
+       struct ctdb_context *ctdb = ctdb_db->ctdb;
+
+       DEBUG(DEBUG_ERR,("Start vacuuming process for database %s\n", ctdb_db->db_name));
+
+       ctdb_db->vacuum_handle = talloc(ctdb_db, struct ctdb_vacuum_handle);
+       CTDB_NO_MEMORY(ctdb, ctdb_db->vacuum_handle);
+
+       ctdb_db->vacuum_handle->ctdb_db = ctdb_db;
+
+       event_add_timed(ctdb->ev, ctdb_db->vacuum_handle, timeval_current_ofs(ctdb->tunable.vacuum_default_interval, 0), ctdb_vacuum_event, ctdb_db->vacuum_handle);
+
+       return 0;
+}