4 Copyright (C) Ronnie Sahlberg 2009
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "lib/tevent/tevent.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "system/network.h"
24 #include "system/filesys.h"
25 #include "system/dir.h"
26 #include "../include/ctdb_private.h"
28 #include "lib/util/dlinklist.h"
29 #include "lib/tevent/tevent.h"
30 #include "../include/ctdb_private.h"
31 #include "../common/rb_tree.h"
/*
 * TIMELIMIT(): timeout value handed to the ctdb control calls in this file,
 * built as timeval_current_ofs(10, 0) (presumably "now + 10 seconds, 0
 * microseconds" - confirm against the timeval helper).
 */
33 #define TIMELIMIT() timeval_current_ofs(10, 0)
/*
 * Base name of the tuning database. The full path is assembled elsewhere in
 * this file with the format "%s/%s.%u" (directory, TUNINGDBNAME, pnn).
 */
34 #define TUNINGDBNAME "vactune.tdb"
/*
 * State of a vacuum child as stored in ctdb_vacuum_child_context.status:
 * VACUUM_RUNNING is set by ctdb_vacuum_event() after the fork,
 * VACUUM_OK / VACUUM_ERROR by vacuum_child_handler() and
 * VACUUM_TIMEOUT by vacuum_child_timeout().
 */
36 enum vacuum_child_status { VACUUM_RUNNING, VACUUM_OK, VACUUM_ERROR, VACUUM_TIMEOUT};
/*
 * Bookkeeping for one forked vacuum child process.
 * Instances are allocated by ctdb_vacuum_event() as talloc children of the
 * vacuum handle and are linked into ctdb->vacuumers.
 * NOTE(review): other code in this file uses child_ctx->fd[] and
 * child_ctx->child_pid, but their declarations (and the closing brace of
 * this struct) are not present in this listing.
 */
38 struct ctdb_vacuum_child_context {
/* list linkage; presumably what DLIST_ADD / DLIST_REMOVE operate on */
39 struct ctdb_vacuum_child_context *next, *prev;
/* handle (and through it the database) this child is vacuuming for */
40 struct ctdb_vacuum_handle *vacuum_handle;
41 /* fd child writes status to */
/* current state of the child, one of enum vacuum_child_status */
44 enum vacuum_child_status status;
/* set when the child is started; used to log how long vacuuming took */
45 struct timeval start_time;
/*
 * Per-database vacuuming handle. ctdb_vacuum_init() stores one in
 * ctdb_db->vacuum_handle and passes it as private data to
 * ctdb_vacuum_event(); it is also the talloc parent of the timed events
 * and of the child contexts created for that database.
 */
48 struct ctdb_vacuum_handle {
/* database this handle belongs to */
49 struct ctdb_db_context *ctdb_db;
/* child context most recently created by ctdb_vacuum_event() */
50 struct ctdb_vacuum_child_context *child_ctx;
/*
 * Working state of one vacuum/repack run. The fields below are accessed
 * through "struct vacuum_data *vdata" elsewhere in this file.
 * NOTE(review): the "struct vacuum_data {" line, the closing brace and
 * several members used by the code (traverse_error, vacuum, vacuumed,
 * copied, start) are not present in this listing.
 */
54 /* a list of records to possibly delete */
/* limits copied from the ctdb tunables of the same name */
56 uint32_t vacuum_limit;
57 uint32_t repack_limit;
/* daemon context and the database being vacuumed */
58 struct ctdb_context *ctdb;
59 struct ctdb_db_context *ctdb_db;
/* destination of tdb_store() in repack_traverse() */
60 struct tdb_context *dest_db;
/* tree of struct delete_record_data, keyed by ctdb_hash() of the record key */
61 trbt_tree_t *delete_tree;
/* incremented by vacuum_traverse() for each record added to delete_tree */
62 uint32_t delete_count;
/* one marshall buffer per node (array of ctdb->num_nodes), indexed by lmaster */
63 struct ctdb_marshall_buffer **list;
72 /* tuning information stored for every db */
/*
 * This struct is the value stored in the tuning tdb under the database name;
 * readers only accept a record whose size equals sizeof(struct
 * vacuum_tuning_data).
 * NOTE(review): update_tuning_db() also assigns tdata.last_duration, whose
 * declaration (and the closing brace) is not present in this listing.
 */
73 struct vacuum_tuning_data {
/* freelist size passed to update_tuning_db() (set when no record existed) */
74 uint32_t last_num_repack;
/* vdata->delete_count of the run (set when no record existed) */
75 uint32_t last_num_empty;
/* interval used as the base for computing new_interval */
76 uint32_t last_interval;
/* interval read back by get_vacuum_interval() for the next run */
77 uint32_t new_interval;
/* start time of the run, copied from vdata->start */
78 struct timeval last_start;
82 /* this structure contains the information for one record to be deleted */
/*
 * Entries are allocated by vacuum_traverse() as talloc children of
 * vdata->delete_tree and inserted into that tree under the key hash.
 * NOTE(review): the code also uses a "key" member (dd->key.dptr /
 * dd->key.dsize); its declaration and the closing brace are not present in
 * this listing.
 */
83 struct delete_record_data {
84 struct ctdb_context *ctdb;
85 struct ctdb_db_context *ctdb_db;
/* header marshalled together with the key by delete_traverse() */
86 struct ctdb_ltdb_header hdr;
/*
 * Accumulator handed to delete_traverse(): "records" is the marshall buffer
 * that grows by one entry per record in the delete tree and is then sent as
 * the input of CTDB_CONTROL_TRY_DELETE_RECORDS.
 * NOTE(review): the closing brace is not present in this listing.
 */
90 struct delete_records_list {
91 struct ctdb_marshall_buffer *records;
/*
 * Forward declaration: the timed-event callback that starts a vacuum run.
 * It is needed here because vacuum_child_destructor() re-arms it before its
 * definition appears further down in the file.
 */
94 static void ctdb_vacuum_event(struct event_context *ev, struct timed_event *te,
95 struct timeval t, void *private_data);
99 * traverse function for gathering the records that can be deleted
/*
 * Callback passed to tdb_traverse_read() by ctdb_vacuum_db().
 *
 * tdb      - database being traversed (not referenced in the visible lines)
 * key/data - the current record
 * private  - the struct vacuum_data of this run
 *
 * Visible behaviour for each record:
 *  - the lmaster is computed with ctdb_lmaster(); a value >= ctdb->num_nodes
 *    is logged at DEBUG_CRIT
 *  - the record is only a candidate if its data is exactly one
 *    struct ctdb_ltdb_header (otherwise it is "not a deleted record")
 *  - the header's dmaster is compared against ctdb->pnn
 *  - when lmaster == ctdb->pnn, a struct delete_record_data holding a copy of
 *    the key is inserted into vdata->delete_tree under ctdb_hash(&key) and
 *    vdata->delete_count is incremented; if an entry with that hash already
 *    exists, only a debug message about the hash collision is logged
 *  - the record is marshalled (key only, no header, no data) and appended to
 *    vdata->list[lmaster], whose count is incremented
 *  - allocation failures while appending set vdata->traverse_error
 *
 * NOTE(review): return statements, braces and some declarations (lmaster,
 * hash, old_size) are missing from this listing, so which branches return
 * early, and whether the append also happens for lmaster == pnn, cannot be
 * confirmed from here.
 */
101 static int vacuum_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private)
103 struct vacuum_data *vdata = talloc_get_type(private, struct vacuum_data);
104 struct ctdb_context *ctdb = vdata->ctdb;
105 struct ctdb_db_context *ctdb_db = vdata->ctdb_db;
107 struct ctdb_ltdb_header *hdr;
108 struct ctdb_rec_data *rec;
/* lmaster is later used as an index into vdata->list[], hence the range check */
111 lmaster = ctdb_lmaster(ctdb, &key);
112 if (lmaster >= ctdb->num_nodes) {
113 DEBUG(DEBUG_CRIT, (__location__
114 " lmaster[%u] >= ctdb->num_nodes[%u] for key"
117 (unsigned)ctdb->num_nodes,
118 (unsigned)ctdb_hash(&key)));
122 if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
123 /* its not a deleted record */
127 hdr = (struct ctdb_ltdb_header *)data.dptr;
129 if (hdr->dmaster != ctdb->pnn) {
133 /* Is this a record we could possibly delete? I.e.
134 if the record is empty and also we are both lmaster
135 and dmaster for the record we should be able to delete it
137 if (lmaster == ctdb->pnn) {
140 hash = ctdb_hash(&key);
141 if (trbt_lookup32(vdata->delete_tree, hash)) {
142 DEBUG(DEBUG_DEBUG, (__location__ " Hash collission when vacuuming, skipping this record.\n"));
145 struct delete_record_data *dd;
147 /* store key and header indexed by the key hash */
148 dd = talloc_zero(vdata->delete_tree, struct delete_record_data);
150 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
154 dd->ctdb_db = ctdb_db;
155 dd->key.dsize = key.dsize;
156 dd->key.dptr = talloc_memdup(dd, key.dptr, key.dsize);
157 if (dd->key.dptr == NULL) {
158 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
164 trbt_insert32(vdata->delete_tree, hash, dd);
166 vdata->delete_count++;
170 /* add the record to the blob ready to send to the nodes */
171 rec = ctdb_marshall_record(vdata->list[lmaster], ctdb->pnn, key, NULL, tdb_null);
173 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
174 vdata->traverse_error = true;
/* grow the per-lmaster buffer by rec->length and copy the new record to its old end */
177 old_size = talloc_get_size(vdata->list[lmaster]);
178 vdata->list[lmaster] = talloc_realloc_size(NULL, vdata->list[lmaster],
179 old_size + rec->length);
180 if (vdata->list[lmaster] == NULL) {
181 DEBUG(DEBUG_ERR,(__location__ " Failed to expand\n"));
182 vdata->traverse_error = true;
185 vdata->list[lmaster]->count++;
186 memcpy(old_size+(uint8_t *)vdata->list[lmaster], rec, rec->length);
195 * traverse the tree of records to delete and marshall them into
/*
 * Callback passed to trbt_traversearray32() by ctdb_vacuum_db().
 *
 * param - the struct delete_records_list being built
 * data  - one struct delete_record_data taken from the delete tree
 *
 * Marshalls the entry's key together with its header (dd->hdr) and appends
 * the result to recs->records, bumping recs->records->count.
 * Failures are only logged; the function returns void.
 *
 * NOTE(review): early returns, braces and the old_size declaration are
 * missing from this listing.
 */
198 static void delete_traverse(void *param, void *data)
200 struct delete_record_data *dd = talloc_get_type(data, struct delete_record_data);
201 struct delete_records_list *recs = talloc_get_type(param, struct delete_records_list);
202 struct ctdb_rec_data *rec;
/* the marshalled record is allocated with dd as talloc context */
205 rec = ctdb_marshall_record(dd, recs->records->db_id, dd->key, &dd->hdr, tdb_null);
207 DEBUG(DEBUG_ERR, (__location__ " failed to marshall record\n"));
/* grow the blob by rec->length and copy the record to its previous end */
211 old_size = talloc_get_size(recs->records);
212 recs->records = talloc_realloc_size(NULL, recs->records, old_size + rec->length);
213 if (recs->records == NULL) {
214 DEBUG(DEBUG_ERR,(__location__ " Failed to expand\n"));
217 recs->records->count++;
218 memcpy(old_size+(uint8_t *)(recs->records), rec, rec->length);
222 * read-only traverse the database in order to find
223 * records that can be deleted and try to delete these
224 * records on the other nodes
225 * this executes in the child context
/*
 * Collect vacuum candidates for one database and negotiate their deletion
 * with the other nodes.
 *
 * ctdb_db - database to vacuum
 * vdata   - state of this run; list, delete_tree and delete_count are filled
 *
 * Steps visible in this listing:
 *  1. refresh ctdb->vnn_map and fetch the local pnn from the local node
 *  2. allocate vdata->list with one empty marshall buffer per node
 *  3. tdb_traverse_read() the local tdb with vacuum_traverse()
 *  4. for each node index, consult ctdb->nodes[i]->pnn == ctdb->pnn and
 *     vdata->list[i]->count == 0, then send vdata->list[i] as a
 *     CTDB_SRVID_VACUUM_FETCH message to that node
 *  5. if delete_count > 0: marshall the delete tree into one blob, send it
 *     with CTDB_CONTROL_TRY_DELETE_RECORDS to every active node, and for the
 *     records a node reports back, free the matching delete tree entry
 *  6. issue one more ctdb_ctrl_getpnn() call to run the event queue
 *
 * NOTE(review): return statements, braces and several declarations are
 * missing from this listing, so the exact error paths and return values
 * cannot be confirmed from here.
 */
227 static int ctdb_vacuum_db(struct ctdb_db_context *ctdb_db, struct vacuum_data *vdata)
229 struct ctdb_context *ctdb = ctdb_db->ctdb;
230 const char *name = ctdb_db->db_name;
233 ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &ctdb->vnn_map);
235 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from local node\n"));
239 pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
241 DEBUG(DEBUG_ERR, ("Unable to get pnn from local node\n"));
246 /* the list needs to be of length num_nodes */
247 vdata->list = talloc_array(vdata, struct ctdb_marshall_buffer *, ctdb->num_nodes);
248 if (vdata->list == NULL) {
249 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
/* each buffer starts out as a bare header (no record data) tagged with this db's id */
252 for (i = 0; i < ctdb->num_nodes; i++) {
253 vdata->list[i] = (struct ctdb_marshall_buffer *)
254 talloc_zero_size(vdata->list,
255 offsetof(struct ctdb_marshall_buffer, data));
256 if (vdata->list[i] == NULL) {
257 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
260 vdata->list[i]->db_id = ctdb_db->db_id;
263 /* read-only traverse, looking for records that might be able to be vacuumed */
264 if (tdb_traverse_read(ctdb_db->ltdb->tdb, vacuum_traverse, vdata) == -1 ||
265 vdata->traverse_error) {
266 DEBUG(DEBUG_ERR,(__location__ " Traverse error in vacuuming '%s'\n", name));
271 * For records where we are not the lmaster,
272 * tell the lmaster to fetch the record.
274 for (i = 0; i < ctdb->num_nodes; i++) {
277 if (ctdb->nodes[i]->pnn == ctdb->pnn) {
281 if (vdata->list[i]->count == 0) {
285 DEBUG(DEBUG_INFO, ("Found %u records for lmaster %u in '%s'\n",
286 vdata->list[i]->count, ctdb->nodes[i]->pnn,
/* the whole talloc'ed marshall buffer is sent as the message payload */
289 data.dsize = talloc_get_size(vdata->list[i]);
290 data.dptr = (void *)vdata->list[i];
291 if (ctdb_client_send_message(ctdb, ctdb->nodes[i]->pnn, CTDB_SRVID_VACUUM_FETCH, data) != 0) {
292 DEBUG(DEBUG_ERR, (__location__ " Failed to send vacuum "
293 "fetch message to %u\n",
294 ctdb->nodes[i]->pnn));
299 /* Process all records we can delete (if any) */
300 if (vdata->delete_count > 0) {
301 struct delete_records_list *recs;
302 TDB_DATA indata, outdata;
304 struct ctdb_node_map *nodemap;
305 uint32_t *active_nodes;
306 int num_active_nodes;
308 recs = talloc_zero(vdata, struct delete_records_list);
310 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
313 recs->records = (struct ctdb_marshall_buffer *)
314 talloc_zero_size(vdata,
315 offsetof(struct ctdb_marshall_buffer, data));
316 if (recs->records == NULL) {
317 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
320 recs->records->db_id = ctdb_db->db_id;
323 * traverse the tree of all records we want to delete and
324 * create a blob we can send to the other nodes.
326 trbt_traversearray32(vdata->delete_tree, 1, delete_traverse, recs);
328 indata.dsize = talloc_get_size(recs->records);
329 indata.dptr = (void *)recs->records;
332 * now tell all the active nodes to delete all these records
336 ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(),
338 recs, /* talloc context */
341 DEBUG(DEBUG_ERR,(__location__ " unable to get node map\n"));
345 active_nodes = list_of_active_nodes(ctdb, nodemap,
346 nodemap, /* talloc context */
347 false /* include self */);
/* the node count is derived from the size of the talloc'ed array */
349 num_active_nodes = talloc_get_size(active_nodes)/sizeof(*active_nodes);
351 for (i = 0; i < num_active_nodes; i++) {
352 struct ctdb_marshall_buffer *records;
353 struct ctdb_rec_data *rec;
355 ret = ctdb_control(ctdb, active_nodes[i], 0,
356 CTDB_CONTROL_TRY_DELETE_RECORDS, 0,
357 indata, recs, &outdata, &res,
359 if (ret != 0 || res != 0) {
360 DEBUG(DEBUG_ERR, ("Failed to delete records on "
361 "node %u: ret[%d] res[%d]\n",
362 active_nodes[i], ret, res));
367 * outdata countains the list of records coming back
368 * from the node which the node could not delete
370 records = (struct ctdb_marshall_buffer *)outdata.dptr;
371 rec = (struct ctdb_rec_data *)&records->data[0];
/*
 * NOTE(review): with "count-- > 1" the loop body runs count - 1 times, so
 * the last returned record is never processed - confirm this is intended.
 */
372 while (records->count-- > 1) {
373 TDB_DATA reckey, recdata;
374 struct ctdb_ltdb_header *rechdr;
/* each returned record is laid out as key bytes followed by data bytes */
376 reckey.dptr = &rec->data[0];
377 reckey.dsize = rec->keylen;
378 recdata.dptr = &rec->data[reckey.dsize];
379 recdata.dsize = rec->datalen;
381 if (recdata.dsize < sizeof(struct ctdb_ltdb_header)) {
382 DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
385 rechdr = (struct ctdb_ltdb_header *)recdata.dptr;
386 recdata.dptr += sizeof(*rechdr);
387 recdata.dsize -= sizeof(*rechdr);
390 * that other node couldnt delete the record
391 * so we should delete it and thereby remove it from the tree
/*
 * NOTE(review): the entry is looked up by hash only; the stored key is not
 * compared with reckey before the entry is freed.
 */
393 talloc_free(trbt_lookup32(vdata->delete_tree, ctdb_hash(&reckey)));
/* advance to the next marshalled record */
395 rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec);
399 /* free nodemap and active_nodes */
400 talloc_free(nodemap);
403 * The only records remaining in the tree would be those
404 * records where all other nodes could successfully
405 * delete them, so we can safely delete them on the
406 * lmaster as well. Deletion implictely happens while
407 * we repack the database. The repack algorithm revisits
408 * the tree in order to find the records that don't need
409 * to be copied / repacked.
413 /* this ensures we run our event queue */
414 ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
421 * traverse function for repacking
/*
 * Callback passed to tdb_traverse_read() by ctdb_repack_tdb(), for both the
 * copy into the temporary database and the copy back.
 *
 * tdb      - database being traversed (not referenced in the visible lines)
 * key/data - the current record
 * private  - the struct vacuum_data of this run (plain cast, no type check)
 *
 * Visible behaviour:
 *  - the delete tree is searched by ctdb_hash(&key) and the stored key is
 *    compared byte-wise, because different keys may share a hash
 *  - a matching record is further tested for being unchanged: data is
 *    exactly one ltdb header, dmaster and lmaster are both the local pnn and
 *    the rsn equals the one remembered in the tree entry
 *  - records are written to vdata->dest_db with TDB_INSERT; a failing store
 *    sets vdata->traverse_error
 *
 * NOTE(review): the lines deciding what happens to a record that passes the
 * "unchanged" test, any check of vdata->vacuum, the counters and the return
 * statements are missing from this listing - presumably such records are
 * skipped rather than stored; confirm against the full source.
 */
423 static int repack_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private)
425 struct vacuum_data *vdata = (struct vacuum_data *)private;
428 uint32_t hash = ctdb_hash(&key);
429 struct delete_record_data *kd;
431 * check if we can ignore this record because it's in the delete_tree
433 kd = (struct delete_record_data *)trbt_lookup32(vdata->delete_tree, hash);
435 * there might be hash collisions so we have to compare the keys here to be sure
437 if (kd && kd->key.dsize == key.dsize && memcmp(kd->key.dptr, key.dptr, key.dsize) == 0) {
438 struct ctdb_ltdb_header *hdr = (struct ctdb_ltdb_header *)data.dptr;
440 * we have to check if the record hasn't changed in the meantime in order to
441 * savely remove it from the database
443 if (data.dsize == sizeof(struct ctdb_ltdb_header) &&
444 hdr->dmaster == kd->ctdb->pnn &&
445 ctdb_lmaster(kd->ctdb, &(kd->key)) == kd->ctdb->pnn &&
446 kd->hdr.rsn == hdr->rsn) {
452 if (tdb_store(vdata->dest_db, key, data, TDB_INSERT) != 0) {
453 vdata->traverse_error = true;
/*
 * Repack a tdb inside a single transaction by copying it out and back in.
 *
 * tdb     - database to repack
 * mem_ctx - talloc context (not referenced in the visible lines)
 * vdata   - run state; dest_db, vacuum and traverse_error are rewritten here
 *
 * Sequence visible in this listing:
 *  1. start a transaction on tdb
 *  2. open an in-memory temporary tdb (TDB_INTERNAL) with the same hash size
 *  3. traverse tdb with repack_traverse(), storing into the temporary tdb
 *     with vdata->vacuum set to true
 *  4. wipe tdb
 *  5. traverse the temporary tdb with repack_traverse(), storing back into
 *     tdb with vdata->vacuum set to false
 *  6. commit the transaction
 * Every failure before the commit cancels the transaction.
 *
 * NOTE(review): return statements, braces, the remaining tdb_open()
 * arguments and any tdb_close(tmp_db) calls are missing from this listing,
 * so the cleanup of tmp_db on the error paths cannot be confirmed from here.
 */
463 static int ctdb_repack_tdb(struct tdb_context *tdb, TALLOC_CTX *mem_ctx, struct vacuum_data *vdata)
465 struct tdb_context *tmp_db;
467 if (tdb_transaction_start(tdb) != 0) {
468 DEBUG(DEBUG_ERR,(__location__ " Failed to start transaction\n"));
472 tmp_db = tdb_open("tmpdb", tdb_hash_size(tdb),
473 TDB_INTERNAL|TDB_DISALLOW_NESTING,
475 if (tmp_db == NULL) {
476 DEBUG(DEBUG_ERR,(__location__ " Failed to create tmp_db\n"));
477 tdb_transaction_cancel(tdb);
/* first pass: copy from tdb into tmp_db */
481 vdata->traverse_error = false;
482 vdata->dest_db = tmp_db;
483 vdata->vacuum = true;
488 * repack and vacuum on-the-fly by not writing the records that are
491 if (tdb_traverse_read(tdb, repack_traverse, vdata) == -1) {
492 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse copying out\n"));
493 tdb_transaction_cancel(tdb);
498 DEBUG(DEBUG_INFO,(__location__ " %u records vacuumed\n", vdata->vacuumed));
/* store failures inside the callback are reported through traverse_error */
500 if (vdata->traverse_error) {
501 DEBUG(DEBUG_ERR,(__location__ " Error during traversal\n"));
502 tdb_transaction_cancel(tdb);
507 if (tdb_wipe_all(tdb) != 0) {
508 DEBUG(DEBUG_ERR,(__location__ " Failed to wipe database\n"));
509 tdb_transaction_cancel(tdb);
/* second pass: copy everything from tmp_db back into the wiped tdb */
514 vdata->traverse_error = false;
515 vdata->dest_db = tdb;
516 vdata->vacuum = false;
519 if (tdb_traverse_read(tmp_db, repack_traverse, vdata) == -1) {
520 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse copying back\n"));
521 tdb_transaction_cancel(tdb);
526 if (vdata->traverse_error) {
527 DEBUG(DEBUG_ERR,(__location__ " Error during second traversal\n"));
528 tdb_transaction_cancel(tdb);
536 if (tdb_transaction_commit(tdb) != 0) {
537 DEBUG(DEBUG_ERR,(__location__ " Failed to commit\n"));
540 DEBUG(DEBUG_INFO,(__location__ " %u records copied\n", vdata->copied));
/*
 * Record the outcome of a vacuum run in the tuning database and compute the
 * interval to use before the next run.
 *
 * ctdb_db  - database that was vacuumed; its name is the record key
 * vdata    - run state (limits, delete_count, start time)
 * freelist - freelist size measured for the database
 *
 * Visible behaviour:
 *  - opens (creating if needed) "<db_directory_state>/vactune.tdb.<pnn>" and
 *    starts a transaction on it
 *  - if a record of the expected size exists and neither limit was reached,
 *    the interval is grown by 10% while below vacuum_max_interval; if a limit
 *    was reached it is halved, and reset to vacuum_min_interval when the
 *    result falls outside [vacuum_min_interval, vacuum_max_interval]
 *  - without a usable record, vacuum_default_interval and the current
 *    counters are used
 *  - last_start / last_duration are filled and the record is stored; a failed
 *    store cancels the transaction, otherwise it is committed
 *
 * NOTE(review): return statements, braces, several declarations, the copy
 * from *tptr into tdata and any tdb_close()/free() calls are missing from
 * this listing and cannot be confirmed from here.
 */
545 static int update_tuning_db(struct ctdb_db_context *ctdb_db, struct vacuum_data *vdata, uint32_t freelist)
547 TALLOC_CTX *tmp_ctx = talloc_new(NULL);
548 TDB_CONTEXT *tune_tdb;
550 struct vacuum_tuning_data tdata;
551 struct vacuum_tuning_data *tptr;
555 vac_dbname = talloc_asprintf(tmp_ctx, "%s/%s.%u",
556 ctdb_db->ctdb->db_directory_state,
557 TUNINGDBNAME, ctdb_db->ctdb->pnn);
/* NOTE(review): vac_dbname is NULL in this branch yet is passed to a %s conversion */
558 if (vac_dbname == NULL) {
559 DEBUG(DEBUG_CRIT,(__location__ " Out of memory error while allocating '%s'\n", vac_dbname));
560 talloc_free(tmp_ctx);
/* mmap is disabled for the tuning tdb when running under valgrind */
564 flags = ctdb_db->ctdb->valgrinding ? TDB_NOMMAP : 0;
565 flags |= TDB_DISALLOW_NESTING;
566 tune_tdb = tdb_open(vac_dbname, 0,
568 O_RDWR|O_CREAT, 0600);
569 if (tune_tdb == NULL) {
570 DEBUG(DEBUG_ERR,(__location__ " Failed to create/open %s\n", TUNINGDBNAME));
571 talloc_free(tmp_ctx);
575 if (tdb_transaction_start(tune_tdb) != 0) {
576 DEBUG(DEBUG_ERR,(__location__ " Failed to start transaction\n"));
/* the record key is the database name without its terminating NUL */
580 key.dptr = discard_const(ctdb_db->db_name);
581 key.dsize = strlen(ctdb_db->db_name);
582 value = tdb_fetch(tune_tdb, key);
584 if (value.dptr != NULL && value.dsize == sizeof(struct vacuum_tuning_data)) {
585 tptr = (struct vacuum_tuning_data *)value.dptr;
589 * re-calc new vacuum interval:
590 * in case no limit was reached we continously increase the interval
591 * until vacuum_max_interval is reached
592 * in case a limit was reached we divide the current interval by 2
593 * unless vacuum_min_interval is reached
595 if (freelist < vdata->repack_limit &&
596 vdata->delete_count < vdata->vacuum_limit) {
597 if (tdata.last_interval < ctdb_db->ctdb->tunable.vacuum_max_interval) {
/*
 * NOTE(review): integer arithmetic - for last_interval below 10 the result
 * of "* 110 / 100" equals last_interval, so the interval does not grow.
 */
598 tdata.new_interval = tdata.last_interval * 110 / 100;
599 DEBUG(DEBUG_INFO,("Increasing vacuum interval %u -> %u for %s\n",
600 tdata.last_interval, tdata.new_interval, ctdb_db->db_name));
603 tdata.new_interval = tdata.last_interval / 2;
604 if (tdata.new_interval < ctdb_db->ctdb->tunable.vacuum_min_interval ||
605 tdata.new_interval > ctdb_db->ctdb->tunable.vacuum_max_interval) {
606 tdata.new_interval = ctdb_db->ctdb->tunable.vacuum_min_interval;
608 DEBUG(DEBUG_INFO,("Decreasing vacuum interval %u -> %u for %s\n",
609 tdata.last_interval, tdata.new_interval, ctdb_db->db_name));
611 tdata.last_interval = tdata.new_interval;
613 DEBUG(DEBUG_DEBUG,(__location__ " Cannot find tunedb record for %s. Using default interval\n", ctdb_db->db_name));
614 tdata.last_num_repack = freelist;
615 tdata.last_num_empty = vdata->delete_count;
616 tdata.last_interval = ctdb_db->ctdb->tunable.vacuum_default_interval;
619 if (value.dptr != NULL) {
623 tdata.last_start = vdata->start;
624 tdata.last_duration = timeval_elapsed(&vdata->start);
/* from here on "value" points at the local tdata that is written back */
626 value.dptr = (unsigned char *)&tdata;
627 value.dsize = sizeof(tdata);
629 if (tdb_store(tune_tdb, key, value, 0) != 0) {
630 DEBUG(DEBUG_ERR,(__location__ " Unable to store tundb record for %s\n", ctdb_db->db_name));
631 tdb_transaction_cancel(tune_tdb);
633 talloc_free(tmp_ctx);
636 tdb_transaction_commit(tune_tdb);
638 talloc_free(tmp_ctx);
644 * repack and vacuum a db
645 * called from the child context
/*
 * Entry point of a vacuum run; ctdb_vacuum_event() calls it in the forked
 * child.
 *
 * ctdb_db - database to vacuum
 * (a second parameter, used below as the talloc context mem_ctx, is declared
 * on a line that is missing from this listing)
 *
 * Visible behaviour:
 *  - reads the freelist size of the local tdb and the repack_limit /
 *    vacuum_limit tunables
 *  - allocates and initialises a struct vacuum_data with an empty delete tree
 *  - runs ctdb_vacuum_db() to gather the deletable records
 *  - when both the freelist size and delete_count are below their limits,
 *    only the tuning database is updated
 *  - otherwise the tdb is repacked with ctdb_repack_tdb(); the tuning
 *    database is updated whether or not the repack succeeds
 *
 * NOTE(review): return statements, braces and some declarations are missing
 * from this listing, so the return values cannot be confirmed from here.
 */
647 static int ctdb_vacuum_and_repack_db(struct ctdb_db_context *ctdb_db,
650 uint32_t repack_limit = ctdb_db->ctdb->tunable.repack_limit;
651 uint32_t vacuum_limit = ctdb_db->ctdb->tunable.vacuum_limit;
652 const char *name = ctdb_db->db_name;
654 struct vacuum_data *vdata;
656 size = tdb_freelist_size(ctdb_db->ltdb->tdb);
658 DEBUG(DEBUG_ERR,(__location__ " Failed to get freelist size for '%s'\n", name));
662 vdata = talloc_zero(mem_ctx, struct vacuum_data);
664 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
668 vdata->ctdb = ctdb_db->ctdb;
669 vdata->vacuum_limit = vacuum_limit;
670 vdata->repack_limit = repack_limit;
/* the tree is a talloc child of vdata */
671 vdata->delete_tree = trbt_create(vdata, 0);
672 vdata->ctdb_db = ctdb_db;
673 if (vdata->delete_tree == NULL) {
674 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
/* start time later recorded in the tuning database */
679 vdata->start = timeval_current();
682 * gather all records that can be deleted in vdata
684 if (ctdb_vacuum_db(ctdb_db, vdata) != 0) {
685 DEBUG(DEBUG_ERR,(__location__ " Failed to vacuum '%s'\n", name));
689 * decide if a repack is necessary
691 if (size < repack_limit && vdata->delete_count < vacuum_limit) {
692 update_tuning_db(ctdb_db, vdata, size);
697 DEBUG(DEBUG_INFO,("Repacking %s with %u freelist entries and %u records to delete\n",
698 name, size, vdata->delete_count));
701 * repack and implicitely get rid of the records we can delete
703 if (ctdb_repack_tdb(ctdb_db->ltdb->tdb, mem_ctx, vdata) != 0) {
704 DEBUG(DEBUG_ERR,(__location__ " Failed to repack '%s'\n", name));
705 update_tuning_db(ctdb_db, vdata, size);
709 update_tuning_db(ctdb_db, vdata, size);
/*
 * Look up the vacuum interval to use for a database.
 *
 * ctdb_db - database whose interval is wanted; its name is the record key
 *
 * Starts from the vacuum_default_interval tunable. If the tuning database
 * holds a record of the expected size for this database, its new_interval is
 * used instead, clamped to [vacuum_min_interval, vacuum_max_interval].
 * Callers pass the result as the seconds argument of timeval_current_ofs().
 *
 * NOTE(review): this function builds the tuning db path from
 * ctdb->db_directory while update_tuning_db() uses
 * ctdb_db->ctdb->db_directory_state - confirm both refer to the same file.
 * NOTE(review): return statements, braces, declarations and any
 * tdb_close()/free() calls are missing from this listing.
 */
715 static int get_vacuum_interval(struct ctdb_db_context *ctdb_db)
717 TALLOC_CTX *tmp_ctx = talloc_new(NULL);
721 uint interval = ctdb_db->ctdb->tunable.vacuum_default_interval;
722 struct ctdb_context *ctdb = ctdb_db->ctdb;
725 vac_dbname = talloc_asprintf(tmp_ctx, "%s/%s.%u", ctdb->db_directory, TUNINGDBNAME, ctdb->pnn);
/* NOTE(review): vac_dbname is NULL in this branch yet is passed to a %s conversion */
726 if (vac_dbname == NULL) {
727 DEBUG(DEBUG_CRIT,(__location__ " Out of memory error while allocating '%s'\n", vac_dbname));
728 talloc_free(tmp_ctx);
732 flags = ctdb_db->ctdb->valgrinding ? TDB_NOMMAP : 0;
733 flags |= TDB_DISALLOW_NESTING;
734 tdb = tdb_open(vac_dbname, 0,
736 O_RDWR|O_CREAT, 0600);
738 DEBUG(DEBUG_ERR,("Unable to open/create database %s using default interval. Errno : %s (%d)\n", vac_dbname, strerror(errno), errno));
739 talloc_free(tmp_ctx);
/* the record key is the database name without its terminating NUL */
743 key.dptr = discard_const(ctdb_db->db_name);
744 key.dsize = strlen(ctdb_db->db_name);
746 value = tdb_fetch(tdb, key);
748 if (value.dptr != NULL) {
749 if (value.dsize == sizeof(struct vacuum_tuning_data)) {
750 struct vacuum_tuning_data *tptr = (struct vacuum_tuning_data *)value.dptr;
752 interval = tptr->new_interval;
/* clamp the stored value to the currently configured bounds */
754 if (interval < ctdb->tunable.vacuum_min_interval) {
755 interval = ctdb->tunable.vacuum_min_interval;
757 if (interval > ctdb->tunable.vacuum_max_interval) {
758 interval = ctdb->tunable.vacuum_max_interval;
765 talloc_free(tmp_ctx);
/*
 * talloc destructor installed on every child context by ctdb_vacuum_event().
 *
 * child_ctx - the context being freed
 *
 * Logs how long the run took, sends SIGKILL to the child unless child_pid
 * is -1 (vacuum_child_handler() sets it to -1 before freeing the context),
 * unlinks the context from ctdb->vacuumers and schedules the next
 * ctdb_vacuum_event() after get_vacuum_interval() seconds.
 *
 * NOTE(review): braces and the return statement are missing from this
 * listing.
 */
770 static int vacuum_child_destructor(struct ctdb_vacuum_child_context *child_ctx)
772 double l = timeval_elapsed(&child_ctx->start_time);
773 struct ctdb_db_context *ctdb_db = child_ctx->vacuum_handle->ctdb_db;
774 struct ctdb_context *ctdb = ctdb_db->ctdb;
776 DEBUG(DEBUG_INFO,("Vacuuming took %.3f seconds for database %s\n", l, ctdb_db->db_name));
778 if (child_ctx->child_pid != -1) {
779 kill(child_ctx->child_pid, SIGKILL);
782 DLIST_REMOVE(ctdb->vacuumers, child_ctx);
/* the new timer is a talloc child of the vacuum handle, not of child_ctx */
784 event_add_timed(ctdb->ev, child_ctx->vacuum_handle,
785 timeval_current_ofs(get_vacuum_interval(ctdb_db), 0),
786 ctdb_vacuum_event, child_ctx->vacuum_handle);
792 * this event is generated when a vacuum child process times out
/*
 * Timed-event callback armed by ctdb_vacuum_event() with the
 * vacuum_max_run_time tunable.
 *
 * ev, te, t    - event arguments (not referenced in the visible lines)
 * private_data - the struct ctdb_vacuum_child_context of the child
 *
 * Marks the child as VACUUM_TIMEOUT and frees its context; child_pid is left
 * untouched, so the destructor's kill() branch applies.
 */
794 static void vacuum_child_timeout(struct event_context *ev, struct timed_event *te,
795 struct timeval t, void *private_data)
797 struct ctdb_vacuum_child_context *child_ctx = talloc_get_type(private_data, struct ctdb_vacuum_child_context);
799 DEBUG(DEBUG_ERR,("Vacuuming child process timed out for db %s\n", child_ctx->vacuum_handle->ctdb_db->db_name));
801 child_ctx->status = VACUUM_TIMEOUT;
803 talloc_free(child_ctx);
808 * this event is generated when a vacuum child process has completed
/*
 * fd-event callback registered by ctdb_vacuum_event() on the read end of the
 * pipe to the child.
 *
 * ev, fde, flags - event arguments (not referenced in the visible lines)
 * private_data   - the struct ctdb_vacuum_child_context of the child
 *
 * Sets child_pid to -1 so that the destructor does not signal the process,
 * reads the single status byte written by the child and sets the status to
 * VACUUM_OK when exactly one byte with value 0 was read, VACUUM_ERROR
 * otherwise. The context is then freed.
 *
 * NOTE(review): the declarations of c and ret and the braces are missing
 * from this listing.
 */
810 static void vacuum_child_handler(struct event_context *ev, struct fd_event *fde,
811 uint16_t flags, void *private_data)
813 struct ctdb_vacuum_child_context *child_ctx = talloc_get_type(private_data, struct ctdb_vacuum_child_context);
817 DEBUG(DEBUG_INFO,("Vacuuming child process %d finished for db %s\n", child_ctx->child_pid, child_ctx->vacuum_handle->ctdb_db->db_name));
818 child_ctx->child_pid = -1;
820 ret = read(child_ctx->fd[0], &c, 1);
821 if (ret != 1 || c != 0) {
822 child_ctx->status = VACUUM_ERROR;
823 DEBUG(DEBUG_ERR, ("A vacuum child process failed with an error for database %s. ret=%d c=%d\n", child_ctx->vacuum_handle->ctdb_db->db_name, ret, c));
825 child_ctx->status = VACUUM_OK;
828 talloc_free(child_ctx);
832 * this event is called every time we need to start a new vacuum process
/*
 * Timed-event callback that starts one vacuum run for a database.
 *
 * ev, te, t    - event arguments (not referenced in the visible lines)
 * private_data - the struct ctdb_vacuum_handle of the database
 *
 * Visible behaviour:
 *  - if the daemon is in recovery or the database's priority is frozen, the
 *    run is skipped and the event is re-armed with vacuum_default_interval
 *  - a child context is allocated (allocation failure is fatal) and a pipe is
 *    created; pipe or fork failure re-arms the event with the default
 *    interval
 *  - child (child_pid == 0): closes the read end, switches to client mode,
 *    runs ctdb_vacuum_and_repack_db() and writes one byte of the result to
 *    the pipe
 *  - parent: closes the write end, registers the context in ctdb->vacuumers
 *    with vacuum_child_destructor, recreates ctdb_db->delete_queue, and arms
 *    both a timeout (vacuum_max_run_time) and a read event on the pipe
 *
 * NOTE(review): the "static void" line, return statements, braces, some
 * declarations and the child's exit call are missing from this listing.
 */
835 ctdb_vacuum_event(struct event_context *ev, struct timed_event *te,
836 struct timeval t, void *private_data)
838 struct ctdb_vacuum_handle *vacuum_handle = talloc_get_type(private_data, struct ctdb_vacuum_handle);
839 struct ctdb_db_context *ctdb_db = vacuum_handle->ctdb_db;
840 struct ctdb_context *ctdb = ctdb_db->ctdb;
841 struct ctdb_vacuum_child_context *child_ctx;
842 struct tevent_fd *fde;
845 /* we dont vacuum if we are in recovery mode, or db frozen */
846 if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE ||
847 ctdb->freeze_mode[ctdb_db->priority] != CTDB_FREEZE_NONE) {
848 DEBUG(DEBUG_INFO, ("Not vacuuming %s (%s)\n", ctdb_db->db_name,
849 ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE ? "in recovery"
850 : ctdb->freeze_mode[ctdb_db->priority] == CTDB_FREEZE_PENDING
853 event_add_timed(ctdb->ev, vacuum_handle, timeval_current_ofs(ctdb->tunable.vacuum_default_interval, 0), ctdb_vacuum_event, vacuum_handle);
/* plain talloc(): the members are not zero-initialised here */
857 child_ctx = talloc(vacuum_handle, struct ctdb_vacuum_child_context);
858 if (child_ctx == NULL) {
859 DEBUG(DEBUG_CRIT, (__location__ " Failed to allocate child context for vacuuming of %s\n", ctdb_db->db_name));
860 ctdb_fatal(ctdb, "Out of memory when crating vacuum child context. Shutting down\n");
864 ret = pipe(child_ctx->fd);
866 talloc_free(child_ctx);
867 DEBUG(DEBUG_ERR, ("Failed to create pipe for vacuum child process.\n"));
868 event_add_timed(ctdb->ev, vacuum_handle, timeval_current_ofs(ctdb->tunable.vacuum_default_interval, 0), ctdb_vacuum_event, vacuum_handle);
872 child_ctx->child_pid = ctdb_fork(ctdb);
873 if (child_ctx->child_pid == (pid_t)-1) {
874 close(child_ctx->fd[0]);
875 close(child_ctx->fd[1]);
876 talloc_free(child_ctx);
877 DEBUG(DEBUG_ERR, ("Failed to fork vacuum child process.\n"));
878 event_add_timed(ctdb->ev, vacuum_handle, timeval_current_ofs(ctdb->tunable.vacuum_default_interval, 0), ctdb_vacuum_event, vacuum_handle);
/* child side: only the write end of the pipe is kept */
883 if (child_ctx->child_pid == 0) {
885 close(child_ctx->fd[0]);
887 DEBUG(DEBUG_INFO,("Vacuuming child process %d for db %s started\n", getpid(), ctdb_db->db_name));
889 if (switch_from_server_to_client(ctdb, "vacuum-%s", ctdb_db->db_name) != 0) {
890 DEBUG(DEBUG_CRIT, (__location__ "ERROR: failed to switch vacuum daemon into client mode. Shutting down.\n"));
897 cc = ctdb_vacuum_and_repack_db(ctdb_db, child_ctx);
/*
 * NOTE(review): the return value of write() is ignored and only one byte of
 * cc is sent; the declaration of cc is not visible in this listing.
 */
899 write(child_ctx->fd[1], &cc, 1);
/* parent side: only the read end of the pipe is kept */
903 set_close_on_exec(child_ctx->fd[0]);
904 close(child_ctx->fd[1]);
906 child_ctx->status = VACUUM_RUNNING;
907 child_ctx->start_time = timeval_current();
909 DLIST_ADD(ctdb->vacuumers, child_ctx);
910 talloc_set_destructor(child_ctx, vacuum_child_destructor);
913 * Clear the fastpath vacuuming list in the parent.
915 talloc_free(ctdb_db->delete_queue);
916 ctdb_db->delete_queue = trbt_create(ctdb_db, 0);
917 if (ctdb_db->delete_queue == NULL) {
918 /* fatal here? ... */
919 ctdb_fatal(ctdb, "Out of memory when re-creating vacuum tree "
920 "in parent context. Shutting down\n");
/* both events are talloc children of child_ctx, so freeing it removes them */
923 event_add_timed(ctdb->ev, child_ctx,
924 timeval_current_ofs(ctdb->tunable.vacuum_max_run_time, 0),
925 vacuum_child_timeout, child_ctx);
927 DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to child vacuum process\n", child_ctx->fd[0]));
929 fde = event_add_fd(ctdb->ev, child_ctx, child_ctx->fd[0],
930 EVENT_FD_READ, vacuum_child_handler, child_ctx);
931 tevent_fd_set_auto_close(fde);
933 vacuum_handle->child_ctx = child_ctx;
934 child_ctx->vacuum_handle = vacuum_handle;
/*
 * Abort every vacuum child currently registered in ctdb->vacuumers.
 *
 * ctdb - daemon context owning the list
 *
 * Each iteration frees the list head; the loop relies on
 * vacuum_child_destructor() removing the freed context from the list so
 * that ctdb->vacuumers advances.
 *
 * NOTE(review): the braces are missing from this listing.
 */
937 void ctdb_stop_vacuuming(struct ctdb_context *ctdb)
939 /* Simply free them all. */
940 while (ctdb->vacuumers) {
941 DEBUG(DEBUG_INFO, ("Aborting vacuuming for %s (%i)\n",
942 ctdb->vacuumers->vacuum_handle->ctdb_db->db_name,
943 (int)ctdb->vacuumers->child_pid));
944 /* vacuum_child_destructor kills it, removes from list */
945 talloc_free(ctdb->vacuumers);
949 /* this function initializes the vacuuming context for a database
950 * starts the vacuuming events
/*
 * Set up vacuuming for one database.
 *
 * ctdb_db - database to set up
 *
 * For a persistent database only a message saying that vacuuming is disabled
 * is logged. Otherwise a vacuum handle is allocated as a talloc child of
 * ctdb_db (CTDB_NO_MEMORY handles allocation failure) and the first
 * ctdb_vacuum_event() is scheduled after get_vacuum_interval() seconds.
 *
 * NOTE(review): return statements and braces are missing from this listing,
 * so the return values cannot be confirmed from here. The handle comes from
 * plain talloc(), so its child_ctx member is not initialised at this point.
 */
952 int ctdb_vacuum_init(struct ctdb_db_context *ctdb_db)
954 if (ctdb_db->persistent != 0) {
955 DEBUG(DEBUG_ERR,("Vacuuming is disabled for persistent database %s\n", ctdb_db->db_name));
959 ctdb_db->vacuum_handle = talloc(ctdb_db, struct ctdb_vacuum_handle);
960 CTDB_NO_MEMORY(ctdb_db->ctdb, ctdb_db->vacuum_handle);
962 ctdb_db->vacuum_handle->ctdb_db = ctdb_db;
964 event_add_timed(ctdb_db->ctdb->ev, ctdb_db->vacuum_handle,
965 timeval_current_ofs(get_vacuum_interval(ctdb_db), 0),
966 ctdb_vacuum_event, ctdb_db->vacuum_handle);