4 Copyright (C) Ronnie Sahlberg 2009
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "lib/events/events.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "system/network.h"
24 #include "system/filesys.h"
25 #include "system/dir.h"
26 #include "../include/ctdb_private.h"
28 #include "lib/util/dlinklist.h"
29 #include "lib/events/events.h"
30 #include "../include/ctdb_private.h"
31 #include "../common/rb_tree.h"
/* Timeout argument passed to the controls issued by the vacuuming code
 * (ctdb_ctrl_getvnnmap / ctdb_ctrl_getpnn below); presumably now + 10 seconds,
 * confirm against timeval_current_ofs(). */
33 #define TIMELIMIT() timeval_current_ofs(10, 0)
/* File name component of the per-node vacuum tuning database; the full path
 * is built below as directory/TUNINGDBNAME.pnn */
34 #define TUNINGDBNAME "vactune.tdb"
/* Status of a vacuum child process as recorded by the parent in
 * ctdb_vacuum_child_context.status */
36 enum vacuum_child_status { VACUUM_RUNNING, VACUUM_OK, VACUUM_ERROR, VACUUM_TIMEOUT};
/*
 * Parent-side bookkeeping for one running vacuum child process.
 * Allocated in ctdb_vacuum_event() and freed from the child-finished and
 * child-timeout event handlers.
 */
38 struct ctdb_vacuum_child_context {
/* list linkage: entries are kept on ctdb->vacuumers via DLIST_ADD / DLIST_REMOVE */
39 struct ctdb_vacuum_child_context *next, *prev;
/* back pointer to the owning per-database handle, set in ctdb_vacuum_event() */
40 struct ctdb_vacuum_handle *vacuum_handle;
41 /* fd child writes status to */
43 /* fd to abort vacuuming. */
/* VACUUM_RUNNING while the child runs; updated by the event handlers */
46 enum vacuum_child_status status;
/* time the child was started; the destructor uses it to log the run time */
47 struct timeval start_time;
/*
 * Per-database vacuuming handle, allocated in ctdb_vacuum_init().
 * It is the talloc parent and private_data of the recurring
 * ctdb_vacuum_event timer.
 */
50 struct ctdb_vacuum_handle {
/* database this handle vacuums */
51 struct ctdb_db_context *ctdb_db;
/* most recently started child context (assigned in ctdb_vacuum_event) */
52 struct ctdb_vacuum_child_context *child_ctx;
/*
 * State of one vacuum/repack run inside the child process
 * (struct vacuum_data, allocated in ctdb_repack_db).
 */
56 /* a list of records to possibly delete */
/* copies of the vacuum_limit / repack_limit tunables taken at start of the run */
58 uint32_t vacuum_limit;
59 uint32_t repack_limit;
60 struct ctdb_context *ctdb;
61 struct ctdb_db_context *ctdb_db;
/* database repack_traverse() stores records into */
62 struct tdb_context *dest_db;
/* tree of struct delete_record_data, indexed by ctdb_hash() of the record key */
63 trbt_tree_t *delete_tree;
/* number of entries inserted into delete_tree */
64 uint32_t delete_count;
/* array of marshall buffers, allocated with vnn_map->size entries in ctdb_vacuum_db() */
65 struct ctdb_marshall_buffer **list;
76 /* tuning information stored for every db */
/*
 * This struct is written verbatim as the value of the tuning tdb record
 * whose key is the database name (see update_tuning_db), so its layout is
 * effectively an on-disk format.
 */
77 struct vacuum_tuning_data {
/* freelist size and delete count recorded when no previous record existed */
78 uint32_t last_num_repack;
79 uint32_t last_num_empty;
/* interval the last calculation started from */
80 uint32_t last_interval;
/* interval read back (and clamped) by get_vacuum_interval() */
81 uint32_t new_interval;
/* start time of the last vacuum run (copied from vacuum_data.start) */
82 struct timeval last_start;
86 /* this structure contains the information for one record to be deleted */
/* Instances are allocated in vacuum_traverse() and stored in vacuum_data.delete_tree. */
87 struct delete_record_data {
/* used by repack_traverse() to look up the local pnn and the lmaster of the key */
88 struct ctdb_context *ctdb;
89 struct ctdb_db_context *ctdb_db;
/* ltdb header of the record; repack_traverse() compares its rsn with the
 * current header before dropping the record */
90 struct ctdb_ltdb_header hdr;
/*
 * Accumulator handed to delete_traverse(): the marshall buffer is grown by
 * one record per tree entry and then sent as the input data of
 * CTDB_CONTROL_TRY_DELETE_RECORDS.
 */
94 struct delete_records_list {
95 struct ctdb_marshall_buffer *records;
/* Forward declaration: vacuum_child_destructor() re-arms this timer handler
 * before its definition appears further down. */
98 static void ctdb_vacuum_event(struct event_context *ev, struct timed_event *te,
99 struct timeval t, void *private_data);
103 * traverse function for gathering the records that can be deleted
/*
 * tdb traverse callback used by ctdb_vacuum_db().
 * private is the struct vacuum_data of the current run.
 * For every record whose data is exactly one ltdb header and whose dmaster
 * is this node, the key is appended to the marshall buffer
 * vdata->list[lmaster]; when this node is also the lmaster the key and header
 * are additionally remembered in vdata->delete_tree.
 * Allocation failures while marshalling set vdata->traverse_error.
 */
105 static int vacuum_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private)
107 struct vacuum_data *vdata = talloc_get_type(private, struct vacuum_data);
108 struct ctdb_context *ctdb = vdata->ctdb;
109 struct ctdb_db_context *ctdb_db = vdata->ctdb_db;
111 struct ctdb_ltdb_header *hdr;
112 struct ctdb_rec_data *rec;
/* a byte readable on abortfd is the abort request written by ctdb_stop_vacuuming() */
116 /* Should we abort? */
117 if (read(vdata->abortfd, &c, 1) == 1) {
118 DEBUG(DEBUG_INFO, ("Abort during vacuum_traverse for %s\n",
/* ignore records whose lmaster value is outside the current vnn map */
124 lmaster = ctdb_lmaster(ctdb, &key);
125 if (lmaster >= ctdb->vnn_map->size) {
/* only records that consist of a bare ltdb header (no payload) are candidates */
129 if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
130 /* its not a deleted record */
134 hdr = (struct ctdb_ltdb_header *)data.dptr;
/* only the dmaster of a record may vacuum it */
136 if (hdr->dmaster != ctdb->pnn) {
140 /* is this a records we could possibly delete? I.e.
141 if the record is empty and also we are both lmaster
142 and dmaster for the record we should be able to delete it
144 if (lmaster == ctdb->pnn) {
147 hash = ctdb_hash(&key);
148 if (trbt_lookup32(vdata->delete_tree, hash)) {
149 DEBUG(DEBUG_DEBUG, (__location__ " Hash collission when vacuuming, skipping this record.\n"));
152 struct delete_record_data *dd;
154 /* store key and header indexed by the key hash */
155 dd = talloc_zero(vdata->delete_tree, struct delete_record_data);
157 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
161 dd->ctdb_db = ctdb_db;
162 dd->key.dsize = key.dsize;
163 dd->key.dptr = talloc_memdup(dd, key.dptr, key.dsize);
164 if (dd->key.dptr == NULL) {
165 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
171 trbt_insert32(vdata->delete_tree, hash, dd);
173 vdata->delete_count++;
/* NOTE(review): lmaster is used directly as an index into vdata->list, while
 * ctdb_vacuum_db() addresses the same list by vnn map position and sends
 * list[i] to vnn_map->map[i]. Confirm that ctdb_lmaster() returns a map
 * index and not a node number. */
177 /* add the record to the blob ready to send to the nodes */
178 rec = ctdb_marshall_record(vdata->list[lmaster], ctdb->pnn, key, NULL, tdb_null);
180 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
181 vdata->traverse_error = true;
/* grow the marshall buffer by the size of the new record and append it */
184 old_size = talloc_get_size(vdata->list[lmaster]);
185 vdata->list[lmaster] = talloc_realloc_size(NULL, vdata->list[lmaster],
186 old_size + rec->length);
187 if (vdata->list[lmaster] == NULL) {
188 DEBUG(DEBUG_ERR,(__location__ " Failed to expand\n"));
189 vdata->traverse_error = true;
192 vdata->list[lmaster]->count++;
193 memcpy(old_size+(uint8_t *)vdata->list[lmaster], rec, rec->length);
202 * traverse the tree of records to delete and marshall them into
/*
 * Tree traverse callback used by ctdb_vacuum_db().
 * param is the struct delete_records_list accumulator, data is one
 * struct delete_record_data from the delete tree. The record key and header
 * are marshalled and appended to recs->records.
 */
205 static void delete_traverse(void *param, void *data)
207 struct delete_record_data *dd = talloc_get_type(data, struct delete_record_data);
208 struct delete_records_list *recs = talloc_get_type(param, struct delete_records_list);
209 struct ctdb_rec_data *rec;
/* the marshalled record is allocated as a talloc child of dd */
212 rec = ctdb_marshall_record(dd, recs->records->db_id, dd->key, &dd->hdr, tdb_null);
214 DEBUG(DEBUG_ERR, (__location__ " failed to marshall record\n"));
/* NOTE(review): if the realloc below fails, recs->records is overwritten with
 * NULL. The callback has no way to report the failure, so a later invocation
 * would dereference recs->records->db_id above, and the caller passes
 * recs->records to talloc_get_size(). Confirm how this is meant to be
 * handled. */
218 old_size = talloc_get_size(recs->records);
219 recs->records = talloc_realloc_size(NULL, recs->records, old_size + rec->length);
220 if (recs->records == NULL) {
221 DEBUG(DEBUG_ERR,(__location__ " Failed to expand\n"));
224 recs->records->count++;
225 memcpy(old_size+(uint8_t *)(recs->records), rec, rec->length);
229 * read-only traverse the database in order to find
230 * records that can be deleted and try to delete these
231 * records on the other nodes
232 * this executes in the child context
234 static int ctdb_vacuum_db(struct ctdb_db_context *ctdb_db, struct vacuum_data *vdata)
236 struct ctdb_context *ctdb = ctdb_db->ctdb;
237 const char *name = ctdb_db->db_name;
240 ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &ctdb->vnn_map);
242 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from local node\n"));
246 pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
248 DEBUG(DEBUG_ERR, ("Unable to get pnn from local node\n"));
253 /* the list needs to be of length num_nodes */
254 vdata->list = talloc_array(vdata, struct ctdb_marshall_buffer *, ctdb->vnn_map->size);
255 if (vdata->list == NULL) {
256 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
259 for (i = 0; i < ctdb->vnn_map->size; i++) {
260 vdata->list[i] = (struct ctdb_marshall_buffer *)
261 talloc_zero_size(vdata->list,
262 offsetof(struct ctdb_marshall_buffer, data));
263 if (vdata->list[i] == NULL) {
264 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
267 vdata->list[i]->db_id = ctdb_db->db_id;
270 /* read-only traverse, looking for records that might be able to be vacuumed */
271 if (tdb_traverse_read(ctdb_db->ltdb->tdb, vacuum_traverse, vdata) == -1 ||
272 vdata->traverse_error) {
273 DEBUG(DEBUG_ERR,(__location__ " Traverse error in vacuuming '%s'\n", name));
277 DEBUG(DEBUG_INFO,("Traverse aborted vacuuming '%s'\n", name));
280 for ( i = 0; i < ctdb->vnn_map->size; i++) {
281 if (vdata->list[i]->count == 0) {
285 /* for records where we are not the lmaster, tell the lmaster to fetch the record */
286 if (ctdb->vnn_map->map[i] != ctdb->pnn) {
288 DEBUG(DEBUG_INFO,("Found %u records for lmaster %u in '%s'\n",
289 vdata->list[i]->count, i, name));
291 data.dsize = talloc_get_size(vdata->list[i]);
292 data.dptr = (void *)vdata->list[i];
293 if (ctdb_send_message(ctdb, ctdb->vnn_map->map[i], CTDB_SRVID_VACUUM_FETCH, data) != 0) {
294 DEBUG(DEBUG_ERR,(__location__ " Failed to send vacuum fetch message to %u\n",
295 ctdb->vnn_map->map[i]));
302 /* Process all records we can delete (if any) */
303 if (vdata->delete_count > 0) {
304 struct delete_records_list *recs;
305 TDB_DATA indata, outdata;
308 recs = talloc_zero(vdata, struct delete_records_list);
310 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
313 recs->records = (struct ctdb_marshall_buffer *)
314 talloc_zero_size(vdata,
315 offsetof(struct ctdb_marshall_buffer, data));
316 if (recs->records == NULL) {
317 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
320 recs->records->db_id = ctdb_db->db_id;
323 * traverse the tree of all records we want to delete and
324 * create a blob we can send to the other nodes.
326 trbt_traversearray32(vdata->delete_tree, 1, delete_traverse, recs);
328 indata.dsize = talloc_get_size(recs->records);
329 indata.dptr = (void *)recs->records;
332 * now tell all the other nodes to delete all these records
335 for (i = 0; i < ctdb->vnn_map->size; i++) {
336 struct ctdb_marshall_buffer *records;
337 struct ctdb_rec_data *rec;
340 if (ctdb->vnn_map->map[i] == ctdb->pnn) {
341 /* we dont delete the records on the local node just yet */
345 /* Should we abort? */
346 if (read(vdata->abortfd, &c, 1) == 1) {
347 DEBUG(DEBUG_INFO,("Aborted vacuuming '%s'\n", name));
351 ret = ctdb_control(ctdb, ctdb->vnn_map->map[i], 0,
352 CTDB_CONTROL_TRY_DELETE_RECORDS, 0,
353 indata, recs, &outdata, &res,
355 if (ret != 0 || res != 0) {
356 DEBUG(DEBUG_ERR,("Failed to delete records on node %u\n", ctdb->vnn_map->map[i]));
361 * outdata countains the list of records coming back
362 * from the node which the node could not delete
364 records = (struct ctdb_marshall_buffer *)outdata.dptr;
365 rec = (struct ctdb_rec_data *)&records->data[0];
366 while (records->count-- > 1) {
367 TDB_DATA reckey, recdata;
368 struct ctdb_ltdb_header *rechdr;
370 reckey.dptr = &rec->data[0];
371 reckey.dsize = rec->keylen;
372 recdata.dptr = &rec->data[reckey.dsize];
373 recdata.dsize = rec->datalen;
375 if (recdata.dsize < sizeof(struct ctdb_ltdb_header)) {
376 DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
379 rechdr = (struct ctdb_ltdb_header *)recdata.dptr;
380 recdata.dptr += sizeof(*rechdr);
381 recdata.dsize -= sizeof(*rechdr);
384 * that other node couldnt delete the record
385 * so we should delete it and thereby remove it from the tree
387 talloc_free(trbt_lookup32(vdata->delete_tree, ctdb_hash(&reckey)));
389 rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec);
394 * The only records remaining in the tree would be those
395 * records where all other nodes could successfully
396 * delete them, so we can safely delete them on the
397 * lmaster as well. Deletion implictely happens while
398 * we repack the database. The repack algorithm revisits
399 * the tree in order to find the records that don't need
400 * to be copied / repacked.
404 /* this ensures we run our event queue */
405 ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
412 * traverse function for repacking
/*
 * tdb traverse callback used by ctdb_repack_tdb() for both copy passes.
 * private is the struct vacuum_data of the run; records are stored into
 * vdata->dest_db. A record is a candidate for being dropped when the delete
 * tree holds an entry with the same key and the on-disk record is still an
 * unchanged, empty record owned by this node.
 * A failed store sets vdata->traverse_error.
 */
414 static int repack_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private)
416 struct vacuum_data *vdata = (struct vacuum_data *)private;
419 uint32_t hash = ctdb_hash(&key);
420 struct delete_record_data *kd;
422 * check if we can ignore this record because it's in the delete_tree
424 kd = (struct delete_record_data *)trbt_lookup32(vdata->delete_tree, hash);
426 * there might be hash collisions so we have to compare the keys here to be sure
428 if (kd && kd->key.dsize == key.dsize && memcmp(kd->key.dptr, key.dptr, key.dsize) == 0) {
429 struct ctdb_ltdb_header *hdr = (struct ctdb_ltdb_header *)data.dptr;
431 * we have to check if the record hasn't changed in the meantime in order to
432 * savely remove it from the database
/* still header-only, we are still dmaster and lmaster, and the rsn matches
 * the one captured by vacuum_traverse() */
434 if (data.dsize == sizeof(struct ctdb_ltdb_header) &&
435 hdr->dmaster == kd->ctdb->pnn &&
436 ctdb_lmaster(kd->ctdb, &(kd->key)) == kd->ctdb->pnn &&
437 kd->hdr.rsn == hdr->rsn) {
/* TDB_INSERT: the destination is not expected to hold the key already */
443 if (tdb_store(vdata->dest_db, key, data, TDB_INSERT) != 0) {
444 vdata->traverse_error = true;
/*
 * Repack tdb inside a single transaction:
 *  - copy the records into an in-memory (TDB_INTERNAL) temporary database
 *    using repack_traverse() with vdata->vacuum set to true
 *  - wipe the original database
 *  - copy everything back from the temporary database with vdata->vacuum
 *    set to false
 *  - commit the transaction
 * Every failure path visible here cancels the transaction.
 */
454 static int ctdb_repack_tdb(struct tdb_context *tdb, TALLOC_CTX *mem_ctx, struct vacuum_data *vdata)
456 struct tdb_context *tmp_db;
458 if (tdb_transaction_start(tdb) != 0) {
459 DEBUG(DEBUG_ERR,(__location__ " Failed to start transaction\n"));
/* temporary database with the same hash size as the database being repacked */
463 tmp_db = tdb_open("tmpdb", tdb_hash_size(tdb),
464 TDB_INTERNAL|TDB_DISALLOW_NESTING,
466 if (tmp_db == NULL) {
467 DEBUG(DEBUG_ERR,(__location__ " Failed to create tmp_db\n"));
468 tdb_transaction_cancel(tdb);
/* first pass: original database into tmp_db */
472 vdata->traverse_error = false;
473 vdata->dest_db = tmp_db;
474 vdata->vacuum = true;
479 * repack and vacuum on-the-fly by not writing the records that are
482 if (tdb_traverse_read(tdb, repack_traverse, vdata) == -1) {
483 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse copying out\n"));
484 tdb_transaction_cancel(tdb);
489 DEBUG(DEBUG_INFO,(__location__ " %u records vacuumed\n", vdata->vacuumed));
/* store errors inside the callback are reported through traverse_error */
491 if (vdata->traverse_error) {
492 DEBUG(DEBUG_ERR,(__location__ " Error during traversal\n"));
493 tdb_transaction_cancel(tdb);
498 if (tdb_wipe_all(tdb) != 0) {
499 DEBUG(DEBUG_ERR,(__location__ " Failed to wipe database\n"));
500 tdb_transaction_cancel(tdb);
/* second pass: tmp_db back into the wiped original database */
505 vdata->traverse_error = false;
506 vdata->dest_db = tdb;
507 vdata->vacuum = false;
510 if (tdb_traverse_read(tmp_db, repack_traverse, vdata) == -1) {
511 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse copying back\n"));
512 tdb_transaction_cancel(tdb);
517 if (vdata->traverse_error) {
518 DEBUG(DEBUG_ERR,(__location__ " Error during second traversal\n"));
519 tdb_transaction_cancel(tdb);
527 if (tdb_transaction_commit(tdb) != 0) {
528 DEBUG(DEBUG_ERR,(__location__ " Failed to commit\n"));
531 DEBUG(DEBUG_INFO,(__location__ " %u records copied\n", vdata->copied));
536 static int update_tuning_db(struct ctdb_db_context *ctdb_db, struct vacuum_data *vdata, uint32_t freelist)
538 TALLOC_CTX *tmp_ctx = talloc_new(NULL);
539 TDB_CONTEXT *tune_tdb;
541 struct vacuum_tuning_data tdata;
542 struct vacuum_tuning_data *tptr;
546 vac_dbname = talloc_asprintf(tmp_ctx, "%s/%s.%u",
547 ctdb_db->ctdb->db_directory_state,
548 TUNINGDBNAME, ctdb_db->ctdb->pnn);
549 if (vac_dbname == NULL) {
550 DEBUG(DEBUG_CRIT,(__location__ " Out of memory error while allocating '%s'\n", vac_dbname));
551 talloc_free(tmp_ctx);
555 flags = ctdb_db->ctdb->valgrinding ? TDB_NOMMAP : 0;
556 flags |= TDB_DISALLOW_NESTING;
557 tune_tdb = tdb_open(vac_dbname, 0,
559 O_RDWR|O_CREAT, 0600);
560 if (tune_tdb == NULL) {
561 DEBUG(DEBUG_ERR,(__location__ " Failed to create/open %s\n", TUNINGDBNAME));
562 talloc_free(tmp_ctx);
566 if (tdb_transaction_start(tune_tdb) != 0) {
567 DEBUG(DEBUG_ERR,(__location__ " Failed to start transaction\n"));
571 key.dptr = discard_const(ctdb_db->db_name);
572 key.dsize = strlen(ctdb_db->db_name);
573 value = tdb_fetch(tune_tdb, key);
575 if (value.dptr != NULL && value.dsize == sizeof(struct vacuum_tuning_data)) {
576 tptr = (struct vacuum_tuning_data *)value.dptr;
580 * re-calc new vacuum interval:
581 * in case no limit was reached we continously increase the interval
582 * until vacuum_max_interval is reached
583 * in case a limit was reached we divide the current interval by 2
584 * unless vacuum_min_interval is reached
586 if (freelist < vdata->repack_limit &&
587 vdata->delete_count < vdata->vacuum_limit) {
588 if (tdata.last_interval < ctdb_db->ctdb->tunable.vacuum_max_interval) {
589 tdata.new_interval = tdata.last_interval * 110 / 100;
590 DEBUG(DEBUG_INFO,("Increasing vacuum interval %u -> %u for %s\n",
591 tdata.last_interval, tdata.new_interval, ctdb_db->db_name));
594 tdata.new_interval = tdata.last_interval / 2;
595 if (tdata.new_interval < ctdb_db->ctdb->tunable.vacuum_min_interval ||
596 tdata.new_interval > ctdb_db->ctdb->tunable.vacuum_max_interval) {
597 tdata.new_interval = ctdb_db->ctdb->tunable.vacuum_min_interval;
599 DEBUG(DEBUG_INFO,("Decreasing vacuum interval %u -> %u for %s\n",
600 tdata.last_interval, tdata.new_interval, ctdb_db->db_name));
602 tdata.last_interval = tdata.new_interval;
604 DEBUG(DEBUG_ERR,(__location__ " Cannot find tunedb record for %s. Using default interval\n", ctdb_db->db_name));
605 tdata.last_num_repack = freelist;
606 tdata.last_num_empty = vdata->delete_count;
607 tdata.last_interval = ctdb_db->ctdb->tunable.vacuum_default_interval;
610 if (value.dptr != NULL) {
614 tdata.last_start = vdata->start;
615 tdata.last_duration = timeval_elapsed(&vdata->start);
617 value.dptr = (unsigned char *)&tdata;
618 value.dsize = sizeof(tdata);
620 if (tdb_store(tune_tdb, key, value, 0) != 0) {
621 DEBUG(DEBUG_ERR,(__location__ " Unable to store tundb record for %s\n", ctdb_db->db_name));
622 tdb_transaction_cancel(tune_tdb);
624 talloc_free(tmp_ctx);
627 tdb_transaction_commit(tune_tdb);
629 talloc_free(tmp_ctx);
635 * repack and vacuum a db
636 * called from the child context
/*
 * Entry point of the vacuum child for one database.
 * ctdb_db  database to process
 * abortfd  read end of the abort pipe, stored in vdata->abortfd
 * mem_ctx  talloc parent for the run state
 * Collects deletable records with ctdb_vacuum_db(), repacks the database
 * with ctdb_repack_tdb() when a limit was reached, and records the outcome
 * with update_tuning_db().
 */
638 static int ctdb_repack_db(struct ctdb_db_context *ctdb_db, int abortfd, TALLOC_CTX *mem_ctx)
640 uint32_t repack_limit = ctdb_db->ctdb->tunable.repack_limit;
641 uint32_t vacuum_limit = ctdb_db->ctdb->tunable.vacuum_limit;
642 const char *name = ctdb_db->db_name;
644 struct vacuum_data *vdata;
/* freelist size before the run; compared against repack_limit below */
646 size = tdb_freelist_size(ctdb_db->ltdb->tdb);
648 DEBUG(DEBUG_ERR,(__location__ " Failed to get freelist size for '%s'\n", name));
652 vdata = talloc_zero(mem_ctx, struct vacuum_data);
654 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
658 vdata->ctdb = ctdb_db->ctdb;
659 vdata->vacuum_limit = vacuum_limit;
660 vdata->repack_limit = repack_limit;
/* the tree is a talloc child of vdata; its result is checked a few lines below */
661 vdata->delete_tree = trbt_create(vdata, 0);
662 vdata->abortfd = abortfd;
663 vdata->ctdb_db = ctdb_db;
664 if (vdata->delete_tree == NULL) {
665 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
670 vdata->start = timeval_current();
673 * gather all records that can be deleted in vdata
/* NOTE(review): a failure here looks like it is only logged and the function
 * carries on to the decision below; confirm against the full source. */
675 if (ctdb_vacuum_db(ctdb_db, vdata) != 0) {
676 DEBUG(DEBUG_ERR,(__location__ " Failed to vacuum '%s'\n", name));
680 * decide if a repack is necessary
/* no repack after an abort, or while both counters are below their limits */
682 if (vdata->abort || (size < repack_limit && vdata->delete_count < vacuum_limit)) {
683 update_tuning_db(ctdb_db, vdata, size);
688 DEBUG(DEBUG_INFO,("Repacking %s with %u freelist entries and %u records to delete\n",
689 name, size, vdata->delete_count));
692 * repack and implicitely get rid of the records we can delete
694 if (ctdb_repack_tdb(ctdb_db->ltdb->tdb, mem_ctx, vdata) != 0) {
695 DEBUG(DEBUG_ERR,(__location__ " Failed to repack '%s'\n", name));
696 update_tuning_db(ctdb_db, vdata, size);
700 update_tuning_db(ctdb_db, vdata, size);
706 static int get_vacuum_interval(struct ctdb_db_context *ctdb_db)
708 TALLOC_CTX *tmp_ctx = talloc_new(NULL);
712 uint interval = ctdb_db->ctdb->tunable.vacuum_default_interval;
713 struct ctdb_context *ctdb = ctdb_db->ctdb;
716 vac_dbname = talloc_asprintf(tmp_ctx, "%s/%s.%u", ctdb->db_directory, TUNINGDBNAME, ctdb->pnn);
717 if (vac_dbname == NULL) {
718 DEBUG(DEBUG_CRIT,(__location__ " Out of memory error while allocating '%s'\n", vac_dbname));
719 talloc_free(tmp_ctx);
723 flags = ctdb_db->ctdb->valgrinding ? TDB_NOMMAP : 0;
724 flags |= TDB_DISALLOW_NESTING;
725 tdb = tdb_open(vac_dbname, 0,
727 O_RDWR|O_CREAT, 0600);
729 DEBUG(DEBUG_ERR,("Unable to open/create database %s using default interval\n", vac_dbname));
730 talloc_free(tmp_ctx);
734 key.dptr = discard_const(ctdb_db->db_name);
735 key.dsize = strlen(ctdb_db->db_name);
737 value = tdb_fetch(tdb, key);
739 if (value.dptr != NULL) {
740 if (value.dsize == sizeof(struct vacuum_tuning_data)) {
741 struct vacuum_tuning_data *tptr = (struct vacuum_tuning_data *)value.dptr;
743 interval = tptr->new_interval;
745 if (interval < ctdb->tunable.vacuum_min_interval) {
746 interval = ctdb->tunable.vacuum_min_interval;
748 if (interval > ctdb->tunable.vacuum_max_interval) {
749 interval = ctdb->tunable.vacuum_max_interval;
756 talloc_free(tmp_ctx);
761 static int vacuum_child_destructor(struct ctdb_vacuum_child_context *child_ctx)
763 double l = timeval_elapsed(&child_ctx->start_time);
764 struct ctdb_db_context *ctdb_db = child_ctx->vacuum_handle->ctdb_db;
765 struct ctdb_context *ctdb = ctdb_db->ctdb;
767 DEBUG(DEBUG_INFO,("Vacuuming took %.3f seconds for database %s\n", l, ctdb_db->db_name));
769 if (child_ctx->child_pid != -1) {
770 kill(child_ctx->child_pid, SIGKILL);
773 DLIST_REMOVE(ctdb->vacuumers, child_ctx);
775 event_add_timed(ctdb->ev, child_ctx->vacuum_handle,
776 timeval_current_ofs(get_vacuum_interval(ctdb_db), 0),
777 ctdb_vacuum_event, child_ctx->vacuum_handle);
783 * this event is generated when a vacuum child process times out
785 static void vacuum_child_timeout(struct event_context *ev, struct timed_event *te,
786 struct timeval t, void *private_data)
788 struct ctdb_vacuum_child_context *child_ctx = talloc_get_type(private_data, struct ctdb_vacuum_child_context);
790 DEBUG(DEBUG_ERR,("Vacuuming child process timed out for db %s\n", child_ctx->vacuum_handle->ctdb_db->db_name));
792 child_ctx->status = VACUUM_TIMEOUT;
794 talloc_free(child_ctx);
799 * this event is generated when a vacuum child process has completed
801 static void vacuum_child_handler(struct event_context *ev, struct fd_event *fde,
802 uint16_t flags, void *private_data)
804 struct ctdb_vacuum_child_context *child_ctx = talloc_get_type(private_data, struct ctdb_vacuum_child_context);
808 DEBUG(DEBUG_INFO,("Vacuuming child process %d finished for db %s\n", child_ctx->child_pid, child_ctx->vacuum_handle->ctdb_db->db_name));
809 child_ctx->child_pid = -1;
811 ret = read(child_ctx->fd[0], &c, 1);
812 if (ret != 1 || c != 0) {
813 child_ctx->status = VACUUM_ERROR;
814 DEBUG(DEBUG_ERR, ("A vacuum child process failed with an error for database %s. ret=%d c=%d\n", child_ctx->vacuum_handle->ctdb_db->db_name, ret, c));
816 child_ctx->status = VACUUM_OK;
819 talloc_free(child_ctx);
823 * this event is called every time we need to start a new vacuum process
/*
 * Timer callback that starts one vacuum run for a database.
 * private_data is the ctdb_vacuum_handle of the database.
 * The run happens in a forked child; the parent keeps a
 * ctdb_vacuum_child_context with a status pipe (child to parent) and an
 * abort pipe (parent to child). On every early failure visible here the
 * timer is re-armed with the default interval.
 */
826 ctdb_vacuum_event(struct event_context *ev, struct timed_event *te,
827 struct timeval t, void *private_data)
829 struct ctdb_vacuum_handle *vacuum_handle = talloc_get_type(private_data, struct ctdb_vacuum_handle);
830 struct ctdb_db_context *ctdb_db = vacuum_handle->ctdb_db;
831 struct ctdb_context *ctdb = ctdb_db->ctdb;
832 struct ctdb_vacuum_child_context *child_ctx;
835 /* we dont vacuum if we are in recovery mode, or db frozen */
836 if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE ||
837 ctdb->freeze_mode[ctdb_db->priority] != CTDB_FREEZE_NONE) {
838 DEBUG(DEBUG_INFO, ("Not vacuuming %s (%s)\n", ctdb_db->db_name,
839 ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE ? "in recovery"
840 : ctdb->freeze_mode[ctdb_db->priority] == CTDB_FREEZE_PENDING
843 event_add_timed(ctdb->ev, vacuum_handle, timeval_current_ofs(ctdb->tunable.vacuum_default_interval, 0), ctdb_vacuum_event, vacuum_handle);
/* plain talloc(): the fields are not zeroed and are filled in step by step below */
847 child_ctx = talloc(vacuum_handle, struct ctdb_vacuum_child_context);
848 if (child_ctx == NULL) {
849 DEBUG(DEBUG_CRIT, (__location__ " Failed to allocate child context for vacuuming of %s\n", ctdb_db->db_name));
850 ctdb_fatal(ctdb, "Out of memory when crating vacuum child context. Shutting down\n");
/* status pipe: the child writes its result, the parent reads it */
854 ret = pipe(child_ctx->fd);
856 talloc_free(child_ctx);
857 DEBUG(DEBUG_ERR, ("Failed to create pipe for vacuum child process.\n"));
858 event_add_timed(ctdb->ev, vacuum_handle, timeval_current_ofs(ctdb->tunable.vacuum_default_interval, 0), ctdb_vacuum_event, vacuum_handle);
/* abort pipe: the parent writes a byte to ask the child to stop */
862 ret = pipe(child_ctx->abortfd);
864 close(child_ctx->fd[0]);
865 close(child_ctx->fd[1]);
866 talloc_free(child_ctx);
867 DEBUG(DEBUG_ERR, ("Failed to create abort pipe for vacuum child process.\n"));
868 event_add_timed(ctdb->ev, vacuum_handle, timeval_current_ofs(ctdb->tunable.vacuum_default_interval, 0), ctdb_vacuum_event, vacuum_handle);
872 child_ctx->child_pid = fork();
873 if (child_ctx->child_pid == (pid_t)-1) {
874 close(child_ctx->fd[0]);
875 close(child_ctx->fd[1]);
876 close(child_ctx->abortfd[0]);
877 close(child_ctx->abortfd[1]);
878 talloc_free(child_ctx);
879 DEBUG(DEBUG_ERR, ("Failed to fork vacuum child process.\n"));
880 event_add_timed(ctdb->ev, vacuum_handle, timeval_current_ofs(ctdb->tunable.vacuum_default_interval, 0), ctdb_vacuum_event, vacuum_handle);
/* child: keep the write end of the status pipe and the read end of the abort
 * pipe; the abort fd is non-blocking so polling it with read() never stalls */
885 if (child_ctx->child_pid == 0) {
887 close(child_ctx->fd[0]);
888 close(child_ctx->abortfd[1]);
889 set_nonblocking(child_ctx->abortfd[0]);
891 DEBUG(DEBUG_INFO,("Vacuuming child process %d for db %s started\n", getpid(), ctdb_db->db_name));
893 if (switch_from_server_to_client(ctdb, "vacuum-%s", ctdb_db->db_name) != 0) {
894 DEBUG(DEBUG_CRIT, (__location__ "ERROR: failed to switch vacuum daemon into client mode. Shutting down.\n"));
/* run the vacuum and report one status byte; vacuum_child_handler() treats a
 * zero byte as success */
901 cc = ctdb_repack_db(ctdb_db, child_ctx->abortfd[0], child_ctx);
903 write(child_ctx->fd[1], &cc, 1);
/* parent: keep the read end of the status pipe and the write end of the
 * abort pipe */
907 set_close_on_exec(child_ctx->fd[0]);
908 close(child_ctx->fd[1]);
909 close(child_ctx->abortfd[0]);
910 set_close_on_exec(child_ctx->abortfd[1]);
911 set_nonblocking(child_ctx->abortfd[1]);
913 child_ctx->status = VACUUM_RUNNING;
914 child_ctx->start_time = timeval_current();
/* from here on freeing child_ctx runs vacuum_child_destructor(), which
 * re-arms this timer */
916 DLIST_ADD(ctdb->vacuumers, child_ctx);
917 talloc_set_destructor(child_ctx, vacuum_child_destructor);
/* give up on the child after vacuum_max_run_time seconds */
919 event_add_timed(ctdb->ev, child_ctx,
920 timeval_current_ofs(ctdb->tunable.vacuum_max_run_time, 0),
921 vacuum_child_timeout, child_ctx);
923 DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to child vacuum process\n", child_ctx->fd[0]));
/* AUTOCLOSE: the read end of the status pipe is closed together with the fd event */
925 event_add_fd(ctdb->ev, child_ctx, child_ctx->fd[0],
926 EVENT_FD_READ|EVENT_FD_AUTOCLOSE,
927 vacuum_child_handler,
930 vacuum_handle->child_ctx = child_ctx;
931 child_ctx->vacuum_handle = vacuum_handle;
/*
 * Ask every running vacuum child to stop by writing one byte to the write
 * end of its abort pipe. The children poll the read end in vacuum_traverse()
 * and ctdb_vacuum_db(). The result of the write is not checked.
 */
934 void ctdb_stop_vacuuming(struct ctdb_context *ctdb)
936 struct ctdb_vacuum_child_context *i;
939 /* FIXME: We don't just free them, since current TDB is not robust
940 * against death during transaction commit. */
941 for (i = ctdb->vacuumers; i; i = i->next) {
942 DEBUG(DEBUG_INFO, ("Aborting vacuuming for %s (%i)\n",
943 i->vacuum_handle->ctdb_db->db_name,
945 write(i->abortfd[1], &c, 1);
949 /* this function initializes the vacuuming context for a database
950 * starts the vacuuming events
/*
 * Set up vacuuming for ctdb_db: allocate the vacuum handle as a talloc child
 * of the database context and schedule the first ctdb_vacuum_event after
 * get_vacuum_interval() seconds. Persistent databases are not vacuumed.
 */
952 int ctdb_vacuum_init(struct ctdb_db_context *ctdb_db)
954 if (ctdb_db->persistent != 0) {
955 DEBUG(DEBUG_ERR,("Vacuuming is disabled for persistent database %s\n", ctdb_db->db_name));
/* plain talloc(): only ctdb_db is initialised here, child_ctx is assigned
 * later in ctdb_vacuum_event() */
959 ctdb_db->vacuum_handle = talloc(ctdb_db, struct ctdb_vacuum_handle);
960 CTDB_NO_MEMORY(ctdb_db->ctdb, ctdb_db->vacuum_handle);
962 ctdb_db->vacuum_handle->ctdb_db = ctdb_db;
964 event_add_timed(ctdb_db->ctdb->ev, ctdb_db->vacuum_handle,
965 timeval_current_ofs(get_vacuum_interval(ctdb_db), 0),
966 ctdb_vacuum_event, ctdb_db->vacuum_handle);