enum vacuum_child_status { VACUUM_RUNNING, VACUUM_OK, VACUUM_ERROR, VACUUM_TIMEOUT};
struct ctdb_vacuum_child_context {
+ struct ctdb_vacuum_child_context *next, *prev;
struct ctdb_vacuum_handle *vacuum_handle;
+ /* fd child writes status to */
int fd[2];
+ /* fd to abort vacuuming. */
+ int abortfd[2];
pid_t child_pid;
enum vacuum_child_status status;
struct timeval start_time;
uint32_t total;
uint32_t vacuumed;
uint32_t copied;
+ int abortfd;
+ bool abort;
};
/* tuning information stored for every db */
struct ctdb_ltdb_header *hdr;
struct ctdb_rec_data *rec;
size_t old_size;
+ char c;
+
+ /* Should we abort? */
+ if (read(vdata->abortfd, &c, 1) == 1) {
+ DEBUG(DEBUG_INFO, ("Abort during vacuum_traverse for %s\n",
+ ctdb_db->db_name));
+ vdata->abort = true;
+ return -1;
+ }
lmaster = ctdb_lmaster(ctdb, &key);
if (lmaster >= ctdb->vnn_map->size) {
DEBUG(DEBUG_ERR,(__location__ " Traverse error in vacuuming '%s'\n", name));
return -1;
}
-
+ if (vdata->abort) {
+ DEBUG(DEBUG_INFO,("Traverse aborted vacuuming '%s'\n", name));
+ return -1;
+ }
for ( i = 0; i < ctdb->vnn_map->size; i++) {
if (vdata->list[i]->count == 0) {
continue;
for (i = 0; i < ctdb->vnn_map->size; i++) {
struct ctdb_marshall_buffer *records;
struct ctdb_rec_data *rec;
+ char c;
if (ctdb->vnn_map->map[i] == ctdb->pnn) {
/* we dont delete the records on the local node just yet */
continue;
}
+ /* Should we abort? */
+ if (read(vdata->abortfd, &c, 1) == 1) {
+ DEBUG(DEBUG_INFO,("Aborted vacuuming '%s'\n", name));
+ return -1;
+ }
+
ret = ctdb_control(ctdb, ctdb->vnn_map->map[i], 0,
CTDB_CONTROL_TRY_DELETE_RECORDS, 0,
indata, recs, &outdata, &res,
* repack and vaccum a db
* called from the child context
*/
-static int ctdb_repack_db(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx)
+static int ctdb_repack_db(struct ctdb_db_context *ctdb_db, int abortfd, TALLOC_CTX *mem_ctx)
{
uint32_t repack_limit = ctdb_db->ctdb->tunable.repack_limit;
uint32_t vacuum_limit = ctdb_db->ctdb->tunable.vacuum_limit;
vdata->vacuum_limit = vacuum_limit;
vdata->repack_limit = repack_limit;
vdata->delete_tree = trbt_create(vdata, 0);
+ vdata->abortfd = abortfd;
if (vdata->delete_tree == NULL) {
DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
talloc_free(vdata);
/*
* decide if a repack is necessary
*/
- if (size < repack_limit && vdata->delete_count < vacuum_limit) {
+ if (vdata->abort || (size < repack_limit && vdata->delete_count < vacuum_limit)) {
update_tuning_db(ctdb_db, vdata, size);
talloc_free(vdata);
return 0;
kill(child_ctx->child_pid, SIGKILL);
}
+ DLIST_REMOVE(ctdb->vacuumers, child_ctx);
+
event_add_timed(ctdb->ev, child_ctx->vacuum_handle,
timeval_current_ofs(get_vacuum_interval(ctdb_db), 0),
ctdb_vacuum_event, child_ctx->vacuum_handle);
return;
}
+ ret = pipe(child_ctx->abortfd);
+ if (ret != 0) {
+ close(child_ctx->fd[0]);
+ close(child_ctx->fd[1]);
+ talloc_free(child_ctx);
+ DEBUG(DEBUG_ERR, ("Failed to create abort pipe for vacuum child process.\n"));
+ event_add_timed(ctdb->ev, vacuum_handle, timeval_current_ofs(ctdb->tunable.vacuum_default_interval, 0), ctdb_vacuum_event, vacuum_handle);
+ return;
+ }
+
child_ctx->child_pid = fork();
if (child_ctx->child_pid == (pid_t)-1) {
close(child_ctx->fd[0]);
close(child_ctx->fd[1]);
+ close(child_ctx->abortfd[0]);
+ close(child_ctx->abortfd[1]);
talloc_free(child_ctx);
DEBUG(DEBUG_ERR, ("Failed to fork vacuum child process.\n"));
event_add_timed(ctdb->ev, vacuum_handle, timeval_current_ofs(ctdb->tunable.vacuum_default_interval, 0), ctdb_vacuum_event, vacuum_handle);
if (child_ctx->child_pid == 0) {
char cc = 0;
close(child_ctx->fd[0]);
+ close(child_ctx->abortfd[1]);
+ set_nonblocking(child_ctx->abortfd[0]);
DEBUG(DEBUG_INFO,("Vacuuming child process %d for db %s started\n", getpid(), ctdb_db->db_name));
/*
* repack the db
*/
- cc = ctdb_repack_db(ctdb_db, child_ctx);
+ cc = ctdb_repack_db(ctdb_db, child_ctx->abortfd[0], child_ctx);
write(child_ctx->fd[1], &cc, 1);
_exit(0);
set_close_on_exec(child_ctx->fd[0]);
close(child_ctx->fd[1]);
+ close(child_ctx->abortfd[0]);
+ set_close_on_exec(child_ctx->abortfd[1]);
+ set_nonblocking(child_ctx->abortfd[1]);
child_ctx->status = VACUUM_RUNNING;
child_ctx->start_time = timeval_current();
+ DLIST_ADD(ctdb->vacuumers, child_ctx);
talloc_set_destructor(child_ctx, vacuum_child_destructor);
event_add_timed(ctdb->ev, child_ctx,
child_ctx->vacuum_handle = vacuum_handle;
}
+void ctdb_stop_vacuuming(struct ctdb_context *ctdb)
+{
+ struct ctdb_vacuum_child_context *i;
+ char c = 1;
+
+ /* FIXME: We don't just free them, since current TDB is not robust
+ * against death during transaction commit. */
+ for (i = ctdb->vacuumers; i; i = i->next) {
+ DEBUG(DEBUG_INFO, ("Aborting vacuuming for %s (%p)\n",
+ i->vacuum_handle->ctdb_db->db_name,
+ i->child_pid));
+ write(i->abortfd[1], &c, 1);
+ }
+}
/* this function initializes the vacuuming context for a database
* starts the vacuuming events