2 ctdb control tool - database vacuum
4 Copyright (C) Andrew Tridgell 2008
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "lib/events/events.h"
22 #include "system/filesys.h"
23 #include "system/network.h"
24 #include "include/ctdb_protocol.h"
25 #include "include/ctdb_private.h"
26 #include "common/rb_tree.h"
29 /* should be tunable */
/* 10-second timeout used for every ctdb_ctrl_* / ctdb_control call in this file */
30 #define TIMELIMIT() timeval_current_ofs(10, 0)
/*
 * Per-database state accumulated while traversing a local tdb looking for
 * vacuumable (empty/tombstone) records.
 *
 * NOTE(review): the fused original line numbers jump here (34 -> 37, 42 -> 47),
 * so the "struct vacuum_data {" opener, the closing "};" and some fields
 * (vacuum_traverse below also uses vdata->traverse_error and vdata->total)
 * are missing from this copy — recover them from the upstream source.
 */
34 a list of records to possibly delete
37 	uint32_t vacuum_limit;	/* max records to gather; 0 = unlimited (see vacuum_traverse) */
38 	struct ctdb_context *ctdb;
39 	struct ctdb_db_context *ctdb_db;
40 	trbt_tree_t *delete_tree;	/* rb-tree of delete_record_data, keyed by ctdb_hash(key) */
41 	uint32_t delete_count;	/* number of entries inserted into delete_tree */
42 	struct ctdb_marshall_buffer **list;	/* one marshall blob per vnn_map slot (per lmaster) */
47 /* this structure contains the information for one record to be deleted */
48 struct delete_record_data {
49 	struct ctdb_context *ctdb;
50 	struct ctdb_db_context *ctdb_db;
51 	struct ctdb_ltdb_header hdr;	/* header snapshot; rsn is re-checked before deleting */
/* NOTE(review): numbering jumps 51 -> 56, so at least the "TDB_DATA key;"
   member (used as dd->key throughout) and the closing "};" are missing
   from this copy. */
/*
 * tdb_traverse_read callback: inspect one record and decide whether it is
 * a vacuum candidate.  A candidate is a record whose data is exactly one
 * ctdb_ltdb_header (i.e. empty payload) and whose dmaster is this node.
 * If we are also the lmaster it is queued in vdata->delete_tree for local
 * deletion; in all cases it is marshalled into vdata->list[lmaster] so the
 * lmaster can be told about it.
 *
 * NOTE(review): this listing has gaps in its fused line numbers (58->60,
 * 62->64, 69->73, 111->118, 141->146, ...), so the opening/closing braces,
 * the early "return 0" statements and several declarations (lmaster, hash,
 * old_size, vdata->total) are missing from this copy.  Do not edit from
 * this listing alone.
 */
56 traverse function for vacuuming
58 static int vacuum_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private)
60 	struct vacuum_data *vdata = talloc_get_type(private, struct vacuum_data);
61 	struct ctdb_context *ctdb = vdata->ctdb;
62 	struct ctdb_db_context *ctdb_db = vdata->ctdb_db;
64 	struct ctdb_ltdb_header *hdr;
65 	struct ctdb_rec_data *rec;
68 	lmaster = ctdb_lmaster(ctdb, &key);
/* sanity: lmaster must index into vnn_map (missing branch body presumably bails out) */
69 	if (lmaster >= ctdb->vnn_map->size) {
/* only records whose data is exactly a ltdb header (empty payload) are vacuumable */
73 	if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
74 		/* its not a deleted record */
78 	hdr = (struct ctdb_ltdb_header *)data.dptr;
/* skip records we are not dmaster for */
80 	if (hdr->dmaster != ctdb->pnn) {
85 	/* is this a records we could possibly delete? I.e.
86 	   if the record is empty and also we are both lmaster
87 	   and dmaster for the record we should be able to delete it
89 	if ( (lmaster == ctdb->pnn)
90 	     &&( (vdata->delete_count < vdata->vacuum_limit)
91 		||(vdata->vacuum_limit == 0) ) ){
94 		hash = ctdb_hash(&key);
/* delete_tree is keyed by the 32-bit key hash only, so two different keys
   with the same hash collide; the later one is simply skipped */
95 		if (trbt_lookup32(vdata->delete_tree, hash)) {
96 			DEBUG(DEBUG_INFO, (__location__ " Hash collission when vacuuming, skipping this record.\n"));
98 			struct delete_record_data *dd;
100 			/* store key and header indexed by the key hash */
101 			dd = talloc_zero(vdata->delete_tree, struct delete_record_data);
103 				DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
107 			dd->ctdb_db = ctdb_db;
108 			dd->key.dsize = key.dsize;
/* key bytes are copied; the tdb's dptr is not valid after the traverse callback returns */
109 			dd->key.dptr = talloc_memdup(dd, key.dptr, key.dsize);
110 			if (dd->key.dptr == NULL) {
111 				DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
118 			trbt_insert32(vdata->delete_tree, hash, dd);
120 			vdata->delete_count++;
125 	/* add the record to the blob ready to send to the nodes */
/* key-only record (NULL header, tdb_null data) appended to the lmaster's blob */
126 	rec = ctdb_marshall_record(vdata->list[lmaster], ctdb->pnn, key, NULL, tdb_null);
128 		DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
129 		vdata->traverse_error = true;
/* grow the marshall buffer and append the serialized record at its old end */
132 	old_size = talloc_get_size(vdata->list[lmaster]);
133 	vdata->list[lmaster] = talloc_realloc_size(NULL, vdata->list[lmaster],
134 					old_size + rec->length);
135 	if (vdata->list[lmaster] == NULL) {
136 		DEBUG(DEBUG_ERR,(__location__ " Failed to expand\n"));
137 		vdata->traverse_error = true;
140 	vdata->list[lmaster]->count++;
141 	memcpy(old_size+(uint8_t *)vdata->list[lmaster], rec, rec->length);
146 	/* don't gather too many records */
/* presumably returns non-zero here to abort the traverse once the limit is hit
   — TODO confirm against the missing lines (vdata->total increment not visible) */
147 	if (vdata->vacuum_limit != 0 &&
148 	    vdata->total == vdata->vacuum_limit) {
/* wrapper passed through trbt_traversearray32 so delete_traverse can grow
   a single marshall blob of all records to delete.
   NOTE(review): the closing "};" is on a line missing from this copy. */
155 struct delete_records_list {
156 	struct ctdb_marshall_buffer *records;
/*
 * rb-tree traverse callback: serialize one queued delete_record_data (key +
 * header snapshot) and append it to recs->records, the blob later sent to the
 * other nodes via CTDB_CONTROL_TRY_DELETE_RECORDS.
 *
 * NOTE(review): numbering gaps (160->164, 171->173, 184->...) mean the return
 * type line, the "rec == NULL" test, old_size declaration, error returns and
 * closing brace are missing from this copy.
 */
160 traverse the tree of records to delete and marshall them into
164 delete_traverse(void *param, void *data)
166 	struct delete_record_data *dd = talloc_get_type(data, struct delete_record_data);
167 	struct delete_records_list *recs = talloc_get_type(param, struct delete_records_list);
168 	struct ctdb_rec_data *rec;
/* unlike vacuum_traverse, the saved header is included so receivers can rsn-check */
171 	rec = ctdb_marshall_record(dd, recs->records->db_id, dd->key, &dd->hdr, tdb_null);
173 		DEBUG(DEBUG_ERR, (__location__ " failed to marshall record\n"));
/* append the serialized record at the blob's previous end */
177 	old_size = talloc_get_size(recs->records);
178 	recs->records = talloc_realloc_size(NULL, recs->records, old_size + rec->length);
179 	if (recs->records == NULL) {
180 		DEBUG(DEBUG_ERR,(__location__ " Failed to expand\n"));
183 	recs->records->count++;
184 	memcpy(old_size+(uint8_t *)(recs->records), rec, rec->length);
/*
 * rb-tree traverse callback: locally delete one record that every other node
 * already agreed to delete.  Under the tdb chain lock it re-fetches the
 * record and re-verifies (still empty, we are still dmaster AND lmaster, rsn
 * unchanged since we queued it) before deleting, so a record migrated or
 * updated in the meantime is left alone.
 *
 * NOTE(review): numbering gaps (194->197, 199->203, 227->...) — the TDB_DATA
 * data declaration, early returns, the (*count)++ update implied by the
 * caller's use of 'count', and free(data.dptr) calls are on lines missing
 * from this copy; verify the fetched data is freed on every path upstream.
 */
188 static void delete_record(void *param, void *d)
190 	struct delete_record_data *dd = talloc_get_type(d, struct delete_record_data);
191 	struct ctdb_context *ctdb = dd->ctdb;
192 	struct ctdb_db_context *ctdb_db = dd->ctdb_db;
193 	uint32_t *count = (uint32_t *)param;	/* caller tallies successful deletes here */
194 	struct ctdb_ltdb_header *hdr;
197 	/* its deleted on all other nodes - refetch, check and delete */
/* non-blocking: if the chain is contended we just skip this record this round */
198 	if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, dd->key) != 0) {
199 		/* the chain is busy - come back later */
203 	data = tdb_fetch(ctdb_db->ltdb->tdb, dd->key);
204 	if (data.dptr == NULL) {
205 		tdb_chainunlock(ctdb_db->ltdb->tdb, dd->key);
/* record grew a payload since we queued it — no longer a tombstone */
208 	if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
210 		tdb_chainunlock(ctdb_db->ltdb->tdb, dd->key);
214 	hdr = (struct ctdb_ltdb_header *)data.dptr;
216 	/* if we are not the lmaster and the dmaster then skip the record */
/* rsn comparison guards against the record having been updated (TOCTOU) */
217 	if (hdr->dmaster != ctdb->pnn ||
218 	    ctdb_lmaster(ctdb, &(dd->key)) != ctdb->pnn ||
219 	    dd->hdr.rsn != hdr->rsn) {
220 		tdb_chainunlock(ctdb_db->ltdb->tdb, dd->key);
/* block SIGALRM around the delete — presumably to keep a timer handler from
   interrupting the tdb write; TODO confirm rationale upstream */
225 	ctdb_block_signal(SIGALRM);
226 	tdb_delete(ctdb_db->ltdb->tdb, dd->key);
227 	ctdb_unblock_signal(SIGALRM);
228 	tdb_chainunlock(ctdb_db->ltdb->tdb, dd->key);
/*
 * Vacuum one database:
 *  1. traverse the local ltdb collecting empty records (vacuum_traverse);
 *  2. for records whose lmaster is another node, send that lmaster a
 *     CTDB_SRVID_VACUUM_FETCH blob so it can take them over;
 *  3. for records we are lmaster+dmaster of, ask every other node to delete
 *     them (CTDB_CONTROL_TRY_DELETE_RECORDS), prune from delete_tree any
 *     record some node refused, then delete the survivors locally.
 *
 * NOTE(review): many lines are missing from this copy (returns, brace
 * closers, declarations of name/ret/i/data/count/res, talloc_free(vdata)
 * cleanup, the timeout argument of ctdb_control at line 368, etc.) — the
 * fused numbering jumps throughout.  'map' parameter is unused in the
 * visible lines.
 */
234 /* vacuum one database */
235 static int ctdb_vacuum_db(struct ctdb_context *ctdb, uint32_t db_id, struct ctdb_node_map *map,
236 			  bool persistent, uint32_t vacuum_limit)
238 	struct ctdb_db_context *ctdb_db;
240 	struct vacuum_data *vdata;
243 	vdata = talloc_zero(ctdb, struct vacuum_data);
245 		DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
250 	vdata->vacuum_limit = vacuum_limit;
251 	vdata->delete_tree = trbt_create(vdata, 0);
252 	if (vdata->delete_tree == NULL) {
253 		DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
257 	if (ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, db_id, vdata, &name) != 0) {
258 		DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", db_id));
263 	ret = ctdb_attachdb(ctdb, name, persistent, 0, &ctdb_db);
265 		DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
269 	vdata->ctdb_db = ctdb_db;
271 	/* the list needs to be of length num_nodes */
272 	vdata->list = talloc_array(vdata, struct ctdb_marshall_buffer *, ctdb->vnn_map->size);
273 	if (vdata->list == NULL) {
274 		DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
/* one empty marshall buffer per vnn slot; data[] grows via realloc later */
278 	for (i=0;i<ctdb->vnn_map->size;i++) {
279 		vdata->list[i] = (struct ctdb_marshall_buffer *)
280 			talloc_zero_size(vdata->list,
281 					 offsetof(struct ctdb_marshall_buffer, data));
282 		if (vdata->list[i] == NULL) {
283 			DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
287 		vdata->list[i]->db_id = db_id;
290 	/* traverse, looking for records that might be able to be vacuumed */
291 	if (tdb_traverse_read(ctdb_db->ltdb->tdb, vacuum_traverse, vdata) == -1 ||
292 	    vdata->traverse_error) {
293 		DEBUG(DEBUG_ERR,(__location__ " Traverse error in vacuuming '%s'\n", name));
/* step 2: hand foreign-lmaster records to their lmaster */
299 	for (i=0;i<ctdb->vnn_map->size;i++) {
300 		if (vdata->list[i]->count == 0) {
304 		/* for records where we are not the lmaster, tell the lmaster to fetch the record */
305 		if (ctdb->vnn_map->map[i] != ctdb->pnn) {
307 			printf("Found %u records for lmaster %u in '%s'\n", vdata->list[i]->count, i, name);
/* the whole marshall buffer is sent verbatim as the message payload */
309 			data.dsize = talloc_get_size(vdata->list[i]);
310 			data.dptr  = (void *)vdata->list[i];
311 			if (ctdb_send_message(ctdb, ctdb->vnn_map->map[i], CTDB_SRVID_VACUUM_FETCH, data) != 0) {
312 				DEBUG(DEBUG_ERR,(__location__ " Failed to send vacuum fetch message to %u\n",
313 					 ctdb->vnn_map->map[i]));
322 	/* Process all records we can delete (if any) */
323 	if (vdata->delete_count > 0) {
324 		struct delete_records_list *recs;
325 		TDB_DATA indata, outdata;
329 		recs = talloc_zero(vdata, struct delete_records_list);
331 			DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
334 		recs->records = (struct ctdb_marshall_buffer *)
335 			talloc_zero_size(vdata,
336 				    offsetof(struct ctdb_marshall_buffer, data));
337 		if (recs->records == NULL) {
338 			DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
341 		recs->records->db_id = db_id;
343 		/* traverse the tree of all records we want to delete and
344 		   create a blob we can send to the other nodes.
346 		trbt_traversearray32(vdata->delete_tree, 1, delete_traverse, recs);
348 		indata.dsize = talloc_get_size(recs->records);
349 		indata.dptr  = (void *)recs->records;
351 		/* now tell all the other nodes to delete all these records
354 		for (i=0;i<ctdb->vnn_map->size;i++) {
355 			struct ctdb_marshall_buffer *records;
356 			struct ctdb_rec_data *rec;
358 			if (ctdb->vnn_map->map[i] == ctdb->pnn) {
359 				/* we dont delete the records on the local node
365 			ret = ctdb_control(ctdb, ctdb->vnn_map->map[i], 0,
366 					CTDB_CONTROL_TRY_DELETE_RECORDS, 0,
367 					indata, recs, &outdata, &res,
369 			if (ret != 0 || res != 0) {
370 				DEBUG(DEBUG_ERR,("Failed to delete records on node %u\n", ctdb->vnn_map->map[i]));
374 			/* outdata countains the list of records coming back
375 			   from the node which the node could not delete
/* walk the returned marshall buffer record by record */
377 			records = (struct ctdb_marshall_buffer *)outdata.dptr;
378 			rec = (struct ctdb_rec_data *)&records->data[0];
/* NOTE(review): "> 1" looks like it processes count-1 records — confirm
   against the marshall format whether count includes a sentinel entry,
   otherwise this is an off-by-one */
379 			while (records->count-- > 1) {
380 				TDB_DATA reckey, recdata;
381 				struct ctdb_ltdb_header *rechdr;
383 				reckey.dptr = &rec->data[0];
384 				reckey.dsize = rec->keylen;
385 				recdata.dptr = &rec->data[reckey.dsize];
386 				recdata.dsize = rec->datalen;
388 				if (recdata.dsize < sizeof(struct ctdb_ltdb_header)) {
389 					DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
392 				rechdr = (struct ctdb_ltdb_header *)recdata.dptr;
393 				recdata.dptr += sizeof(*rechdr);
394 				recdata.dsize -= sizeof(*rechdr);
396 				/* that other node couldnt delete the record
397 				   so we shouldnt delete it either.
398 				   remove it from the tree.
/* freeing the tree node removes it from delete_tree (talloc parent is the tree) */
400 				talloc_free(trbt_lookup32(vdata->delete_tree, ctdb_hash(&reckey)));
/* advance to the next variable-length record in the blob */
402 				rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec);
407 		/* the only records remaining in the tree would be those
408 		   records where all other nodes could successfully
409 		   delete them, so we can now safely delete them on the
413 		trbt_traversearray32(vdata->delete_tree, 1, delete_record, &count);
414 		if (vdata->delete_count != 0) {
415 			printf("Deleted %u records out of %u on this node from '%s'\n", count, vdata->delete_count, name);
419 	/* this ensures we run our event queue */
/* a cheap synchronous control whose reply forces pending events to be processed */
420 	ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
/*
 * "ctdb vacuum [max_records]" entry point: fetch the db map, node map and
 * vnn map from the local node, then vacuum every database via
 * ctdb_vacuum_db().  argv[0], if given, caps how many records are gathered
 * per database (0/absent = unlimited).
 *
 * NOTE(review): missing lines hide the declarations of ret/i/pnn, the
 * argc guard around the atoi() call, and the return statements.  atoi()
 * silently yields 0 on garbage input — strtoul() with validation would be
 * preferable once the full source is available.
 */
429 vacuum all our databases
431 int ctdb_vacuum(struct ctdb_context *ctdb, int argc, const char **argv)
433 	struct ctdb_dbid_map *dbmap=NULL;
434 	struct ctdb_node_map *nodemap=NULL;
436 	uint32_t vacuum_limit = 0;
439 		vacuum_limit = atoi(argv[0]);
442 	ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &dbmap);
444 		DEBUG(DEBUG_ERR, ("Unable to get dbids from local node\n"));
448 	ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
450 		DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
/* vnn_map is stored on the ctdb context; ctdb_vacuum_db and vacuum_traverse read it */
454 	ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &ctdb->vnn_map);
456 		DEBUG(DEBUG_ERR, ("Unable to get vnnmap from local node\n"));
460 	pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
462 		DEBUG(DEBUG_ERR, ("Unable to get pnn from local node\n"));
/* vacuum each attached database in turn; failures abort (per missing lines) */
467 	for (i=0;i<dbmap->num;i++) {
468 		if (ctdb_vacuum_db(ctdb, dbmap->dbs[i].dbid, nodemap,
469 				   dbmap->dbs[i].persistent, vacuum_limit) != 0) {
470 			DEBUG(DEBUG_ERR,("Failed to vacuum db 0x%x\n", dbmap->dbs[i].dbid));
/* state handed to repack_traverse while copying records between tdbs.
   NOTE(review): numbering jumps 478 -> 480 -> 484 — an error flag field
   (checked as "Error during traversal" in ctdb_repack_tdb) and the closing
   "};" are missing from this copy. */
478 struct traverse_state {
480 	struct tdb_context *dest_db;	/* tdb that records are copied into */
/*
 * tdb_traverse_read callback for repacking: copy one record into
 * state->dest_db.  TDB_INSERT is correct here because the destination
 * starts empty in both copy directions.
 * NOTE(review): the failure-branch body and return statements are on lines
 * missing from this copy (presumably sets the state error flag).
 */
484 traverse function for repacking
486 static int repack_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private)
488 	struct traverse_state *state = (struct traverse_state *)private;
489 	if (tdb_store(state->dest_db, key, data, TDB_INSERT) != 0) {
/*
 * Repack a tdb to reclaim freelist space: inside a transaction, copy every
 * record to an in-memory (TDB_INTERNAL) temp db, wipe the original, copy
 * everything back, then commit.  The transaction makes the wipe+refill
 * atomic; any failure cancels it, leaving the original intact.
 *
 * NOTE(review): missing lines include the tdb_open mode/flags argument
 * (line 511), the "state.error" checks implied by the "Error during
 * traversal" messages, tdb_close(tmp_db) cleanup and all return statements.
 */
499 static int ctdb_repack_tdb(struct tdb_context *tdb)
501 	struct tdb_context *tmp_db;
502 	struct traverse_state state;
504 	if (tdb_transaction_start(tdb) != 0) {
505 		DEBUG(DEBUG_ERR,(__location__ " Failed to start transaction\n"));
/* TDB_INTERNAL: temp copy lives purely in memory, never touches disk */
509 	tmp_db = tdb_open("tmpdb", tdb_hash_size(tdb),
510 			  TDB_INTERNAL|TDB_DISALLOW_NESTING,
512 	if (tmp_db == NULL) {
513 		DEBUG(DEBUG_ERR,(__location__ " Failed to create tmp_db\n"));
514 		tdb_transaction_cancel(tdb);
/* pass 1: original -> tmp_db */
519 	state.dest_db = tmp_db;
521 	if (tdb_traverse_read(tdb, repack_traverse, &state) == -1) {
522 		DEBUG(DEBUG_ERR,(__location__ " Failed to traverse copying out\n"));
523 		tdb_transaction_cancel(tdb);
529 		DEBUG(DEBUG_ERR,(__location__ " Error during traversal\n"));
530 		tdb_transaction_cancel(tdb);
/* empty the original; refilling it compacts away the freelist */
535 	if (tdb_wipe_all(tdb) != 0) {
536 		DEBUG(DEBUG_ERR,(__location__ " Failed to wipe database\n"));
537 		tdb_transaction_cancel(tdb);
/* pass 2: tmp_db -> original (dest_db re-pointed at tdb on a missing line) */
545 	if (tdb_traverse_read(tmp_db, repack_traverse, &state) == -1) {
546 		DEBUG(DEBUG_ERR,(__location__ " Failed to traverse copying back\n"));
547 		tdb_transaction_cancel(tdb);
553 		DEBUG(DEBUG_ERR,(__location__ " Error during second traversal\n"));
554 		tdb_transaction_cancel(tdb);
561 	if (tdb_transaction_commit(tdb) != 0) {
562 		DEBUG(DEBUG_ERR,(__location__ " Failed to commit\n"));
/*
 * Repack one database if its freelist has grown past repack_limit entries:
 * look up the db name, attach, read the freelist size, and call
 * ctdb_repack_tdb() when size > repack_limit.
 *
 * NOTE(review): declarations of name/ret/size, the ret/size error checks'
 * bodies, and return statements are on lines missing from this copy.
 */
570 /* repack one database */
571 static int ctdb_repack_db(struct ctdb_context *ctdb, uint32_t db_id,
572 			  bool persistent, uint32_t repack_limit)
574 	struct ctdb_db_context *ctdb_db;
578 	if (ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, db_id, ctdb, &name) != 0) {
579 		DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", db_id));
583 	ret = ctdb_attachdb(ctdb, name, persistent, 0, &ctdb_db);
585 		DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
/* freelist length is the fragmentation metric that triggers a repack */
589 	size = tdb_freelist_size(ctdb_db->ltdb->tdb);
591 		DEBUG(DEBUG_ERR,(__location__ " Failed to get freelist size for '%s'\n", name));
/* not fragmented enough — skip (early return on a missing line) */
595 	if (size <= repack_limit) {
599 	printf("Repacking %s with %u freelist entries\n", name, size);
601 	if (ctdb_repack_tdb(ctdb_db->ltdb->tdb) != 0) {
602 		DEBUG(DEBUG_ERR,(__location__ " Failed to repack '%s'\n", name));
/*
 * "ctdb repack [limit]" entry point: repack every local database whose
 * freelist exceeds the limit (default 10000; argv[0] overrides it).
 *
 * NOTE(review): the argc guard around atoi(), declarations of ret/i and
 * the return statements are on lines missing from this copy.  As in
 * ctdb_vacuum, atoi() silently maps bad input to 0 — consider strtoul()
 * with validation when editing the full source.
 */
611 repack all our databases
613 int ctdb_repack(struct ctdb_context *ctdb, int argc, const char **argv)
615 	struct ctdb_dbid_map *dbmap=NULL;
617 	/* a reasonable default limit to prevent us using too much memory */
618 	uint32_t repack_limit = 10000;
621 		repack_limit = atoi(argv[0]);
624 	ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &dbmap);
626 		DEBUG(DEBUG_ERR, ("Unable to get dbids from local node\n"));
630 	for (i=0;i<dbmap->num;i++) {
631 		if (ctdb_repack_db(ctdb, dbmap->dbs[i].dbid,
632 				   dbmap->dbs[i].persistent, repack_limit) != 0) {
633 			DEBUG(DEBUG_ERR,("Failed to repack db 0x%x\n", dbmap->dbs[i].dbid));