/* 
   ctdb control tool - database vacuum 

   Copyright (C) Andrew Tridgell  2008

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 3 of the License, or
   (at your option) any later version.
   
   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.
   
   You should have received a copy of the GNU General Public License
   along with this program; if not, see <http://www.gnu.org/licenses/>.
*/
#include "includes.h"
#include "lib/tevent/tevent.h"
#include "system/filesys.h"
#include "system/network.h"
#include "../include/ctdb_client.h"
#include "../include/ctdb_private.h"
#include "../common/rb_tree.h"
/* timeout used for every control sent during a vacuum run;
 * should be tunable */
#define TIMELIMIT() timeval_current_ofs(10, 0)
34 a list of records to possibly delete
37 uint32_t vacuum_limit;
38 struct ctdb_context *ctdb;
39 struct ctdb_db_context *ctdb_db;
40 trbt_tree_t *delete_tree;
41 uint32_t delete_count;
42 struct ctdb_marshall_buffer **list;
47 /* this structure contains the information for one record to be deleted */
48 struct delete_record_data {
49 struct ctdb_context *ctdb;
50 struct ctdb_db_context *ctdb_db;
51 struct ctdb_ltdb_header hdr;
56 traverse function for vacuuming
58 static int vacuum_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private)
60 struct vacuum_data *vdata = talloc_get_type(private, struct vacuum_data);
61 struct ctdb_context *ctdb = vdata->ctdb;
62 struct ctdb_db_context *ctdb_db = vdata->ctdb_db;
64 struct ctdb_ltdb_header *hdr;
65 struct ctdb_rec_data *rec;
68 lmaster = ctdb_lmaster(ctdb, &key);
69 if (lmaster >= ctdb->vnn_map->size) {
73 if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
74 /* its not a deleted record */
78 hdr = (struct ctdb_ltdb_header *)data.dptr;
80 if (hdr->dmaster != ctdb->pnn) {
85 /* is this a records we could possibly delete? I.e.
86 if the record is empty and also we are both lmaster
87 and dmaster for the record we should be able to delete it
89 if ( (lmaster == ctdb->pnn)
90 &&( (vdata->delete_count < vdata->vacuum_limit)
91 ||(vdata->vacuum_limit == 0) ) ){
94 hash = ctdb_hash(&key);
95 if (trbt_lookup32(vdata->delete_tree, hash)) {
96 DEBUG(DEBUG_INFO, (__location__ " Hash collission when vacuuming, skipping this record.\n"));
98 struct delete_record_data *dd;
100 /* store key and header indexed by the key hash */
101 dd = talloc_zero(vdata->delete_tree, struct delete_record_data);
103 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
107 dd->ctdb_db = ctdb_db;
108 dd->key.dsize = key.dsize;
109 dd->key.dptr = talloc_memdup(dd, key.dptr, key.dsize);
110 if (dd->key.dptr == NULL) {
111 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
118 trbt_insert32(vdata->delete_tree, hash, dd);
120 vdata->delete_count++;
125 /* add the record to the blob ready to send to the nodes */
126 rec = ctdb_marshall_record(vdata->list[lmaster], ctdb->pnn, key, NULL, tdb_null);
128 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
129 vdata->traverse_error = true;
132 old_size = talloc_get_size(vdata->list[lmaster]);
133 vdata->list[lmaster] = talloc_realloc_size(NULL, vdata->list[lmaster],
134 old_size + rec->length);
135 if (vdata->list[lmaster] == NULL) {
136 DEBUG(DEBUG_ERR,(__location__ " Failed to expand\n"));
137 vdata->traverse_error = true;
140 vdata->list[lmaster]->count++;
141 memcpy(old_size+(uint8_t *)vdata->list[lmaster], rec, rec->length);
146 /* don't gather too many records */
147 if (vdata->vacuum_limit != 0 &&
148 vdata->total == vdata->vacuum_limit) {
/* wrapper around the marshall buffer built while walking the
 * delete tree, so delete_traverse() can grow it via realloc */
struct delete_records_list {
	struct ctdb_marshall_buffer *records;
};
160 traverse the tree of records to delete and marshall them into
164 delete_traverse(void *param, void *data)
166 struct delete_record_data *dd = talloc_get_type(data, struct delete_record_data);
167 struct delete_records_list *recs = talloc_get_type(param, struct delete_records_list);
168 struct ctdb_rec_data *rec;
171 rec = ctdb_marshall_record(dd, recs->records->db_id, dd->key, &dd->hdr, tdb_null);
173 DEBUG(DEBUG_ERR, (__location__ " failed to marshall record\n"));
177 old_size = talloc_get_size(recs->records);
178 recs->records = talloc_realloc_size(NULL, recs->records, old_size + rec->length);
179 if (recs->records == NULL) {
180 DEBUG(DEBUG_ERR,(__location__ " Failed to expand\n"));
183 recs->records->count++;
184 memcpy(old_size+(uint8_t *)(recs->records), rec, rec->length);
189 static int delete_record(void *param, void *d)
191 struct delete_record_data *dd = talloc_get_type(d, struct delete_record_data);
192 struct ctdb_context *ctdb = dd->ctdb;
193 struct ctdb_db_context *ctdb_db = dd->ctdb_db;
194 uint32_t *count = (uint32_t *)param;
195 struct ctdb_ltdb_header *hdr;
198 /* its deleted on all other nodes - refetch, check and delete */
199 if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, dd->key) != 0) {
200 /* the chain is busy - come back later */
204 data = tdb_fetch(ctdb_db->ltdb->tdb, dd->key);
205 if (data.dptr == NULL) {
206 tdb_chainunlock(ctdb_db->ltdb->tdb, dd->key);
209 if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
211 tdb_chainunlock(ctdb_db->ltdb->tdb, dd->key);
215 hdr = (struct ctdb_ltdb_header *)data.dptr;
217 /* if we are not the lmaster and the dmaster then skip the record */
218 if (hdr->dmaster != ctdb->pnn ||
219 ctdb_lmaster(ctdb, &(dd->key)) != ctdb->pnn ||
220 dd->hdr.rsn != hdr->rsn) {
221 tdb_chainunlock(ctdb_db->ltdb->tdb, dd->key);
226 ctdb_block_signal(SIGALRM);
227 tdb_delete(ctdb_db->ltdb->tdb, dd->key);
228 ctdb_unblock_signal(SIGALRM);
229 tdb_chainunlock(ctdb_db->ltdb->tdb, dd->key);
236 /* vacuum one database */
237 static int ctdb_vacuum_db(struct ctdb_context *ctdb, uint32_t db_id, struct ctdb_node_map *map,
238 bool persistent, uint32_t vacuum_limit)
240 struct ctdb_db_context *ctdb_db;
242 struct vacuum_data *vdata;
245 vdata = talloc_zero(ctdb, struct vacuum_data);
247 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
252 vdata->vacuum_limit = vacuum_limit;
253 vdata->delete_tree = trbt_create(vdata, 0);
254 if (vdata->delete_tree == NULL) {
255 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
259 if (ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, db_id, vdata, &name) != 0) {
260 DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", db_id));
265 ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), name, persistent, 0);
266 if (ctdb_db == NULL) {
267 DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
271 vdata->ctdb_db = ctdb_db;
273 /* the list needs to be of length num_nodes */
274 vdata->list = talloc_array(vdata, struct ctdb_marshall_buffer *, ctdb->vnn_map->size);
275 if (vdata->list == NULL) {
276 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
280 for (i=0;i<ctdb->vnn_map->size;i++) {
281 vdata->list[i] = (struct ctdb_marshall_buffer *)
282 talloc_zero_size(vdata->list,
283 offsetof(struct ctdb_marshall_buffer, data));
284 if (vdata->list[i] == NULL) {
285 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
289 vdata->list[i]->db_id = db_id;
292 /* traverse, looking for records that might be able to be vacuumed */
293 if (tdb_traverse_read(ctdb_db->ltdb->tdb, vacuum_traverse, vdata) == -1 ||
294 vdata->traverse_error) {
295 DEBUG(DEBUG_ERR,(__location__ " Traverse error in vacuuming '%s'\n", name));
301 for (i=0;i<ctdb->vnn_map->size;i++) {
302 if (vdata->list[i]->count == 0) {
306 /* for records where we are not the lmaster, tell the lmaster to fetch the record */
307 if (ctdb->vnn_map->map[i] != ctdb->pnn) {
309 printf("Found %u records for lmaster %u in '%s'\n", vdata->list[i]->count, i, name);
311 data.dsize = talloc_get_size(vdata->list[i]);
312 data.dptr = (void *)vdata->list[i];
313 if (ctdb_client_send_message(ctdb, ctdb->vnn_map->map[i], CTDB_SRVID_VACUUM_FETCH, data) != 0) {
314 DEBUG(DEBUG_ERR,(__location__ " Failed to send vacuum fetch message to %u\n",
315 ctdb->vnn_map->map[i]));
324 /* Process all records we can delete (if any) */
325 if (vdata->delete_count > 0) {
326 struct delete_records_list *recs;
327 TDB_DATA indata, outdata;
332 recs = talloc_zero(vdata, struct delete_records_list);
334 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
337 recs->records = (struct ctdb_marshall_buffer *)
338 talloc_zero_size(vdata,
339 offsetof(struct ctdb_marshall_buffer, data));
340 if (recs->records == NULL) {
341 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
344 recs->records->db_id = db_id;
346 /* traverse the tree of all records we want to delete and
347 create a blob we can send to the other nodes.
349 trbt_traversearray32(vdata->delete_tree, 1, delete_traverse, recs);
351 indata.dsize = talloc_get_size(recs->records);
352 indata.dptr = (void *)recs->records;
354 /* now tell all the other nodes to delete all these records
357 for (i=0;i<ctdb->vnn_map->size;i++) {
358 struct ctdb_marshall_buffer *records;
359 struct ctdb_rec_data *rec;
361 if (ctdb->vnn_map->map[i] == ctdb->pnn) {
362 /* we dont delete the records on the local node
368 ret = ctdb_control(ctdb, ctdb->vnn_map->map[i], 0,
369 CTDB_CONTROL_TRY_DELETE_RECORDS, 0,
370 indata, recs, &outdata, &res,
372 if (ret != 0 || res != 0) {
373 DEBUG(DEBUG_ERR,("Failed to delete records on node %u\n", ctdb->vnn_map->map[i]));
377 /* outdata countains the list of records coming back
378 from the node which the node could not delete
380 records = (struct ctdb_marshall_buffer *)outdata.dptr;
381 rec = (struct ctdb_rec_data *)&records->data[0];
382 while (records->count-- > 1) {
383 TDB_DATA reckey, recdata;
384 struct ctdb_ltdb_header *rechdr;
386 reckey.dptr = &rec->data[0];
387 reckey.dsize = rec->keylen;
388 recdata.dptr = &rec->data[reckey.dsize];
389 recdata.dsize = rec->datalen;
391 if (recdata.dsize < sizeof(struct ctdb_ltdb_header)) {
392 DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
395 rechdr = (struct ctdb_ltdb_header *)recdata.dptr;
396 recdata.dptr += sizeof(*rechdr);
397 recdata.dsize -= sizeof(*rechdr);
399 /* that other node couldnt delete the record
400 so we shouldnt delete it either.
401 remove it from the tree.
403 talloc_free(trbt_lookup32(vdata->delete_tree, ctdb_hash(&reckey)));
405 rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec);
410 /* the only records remaining in the tree would be those
411 records where all other nodes could successfully
412 delete them, so we can now safely delete them on the
416 trbt_traversearray32(vdata->delete_tree, 1, delete_record, &count);
417 if (vdata->delete_count != 0) {
418 printf("Deleted %u records out of %u on this node from '%s'\n", count, vdata->delete_count, name);
422 /* this ensures we run our event queue */
423 ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
432 vacuum all our databases
434 int ctdb_vacuum(struct ctdb_context *ctdb, int argc, const char **argv)
436 struct ctdb_dbid_map *dbmap=NULL;
437 struct ctdb_node_map *nodemap=NULL;
439 uint32_t vacuum_limit = 0;
442 vacuum_limit = atoi(argv[0]);
445 ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &dbmap);
447 DEBUG(DEBUG_ERR, ("Unable to get dbids from local node\n"));
451 ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
453 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
457 ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &ctdb->vnn_map);
459 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from local node\n"));
463 pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
465 DEBUG(DEBUG_ERR, ("Unable to get pnn from local node\n"));
470 for (i=0;i<dbmap->num;i++) {
471 if (ctdb_vacuum_db(ctdb, dbmap->dbs[i].dbid, nodemap,
472 dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT, vacuum_limit) != 0) {
473 DEBUG(DEBUG_ERR,("Failed to vacuum db 0x%x\n", dbmap->dbs[i].dbid));
481 struct traverse_state {
483 struct tdb_context *dest_db;
487 traverse function for repacking
489 static int repack_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private)
491 struct traverse_state *state = (struct traverse_state *)private;
492 if (tdb_store(state->dest_db, key, data, TDB_INSERT) != 0) {
502 static int ctdb_repack_tdb(struct tdb_context *tdb)
504 struct tdb_context *tmp_db;
505 struct traverse_state state;
507 if (tdb_transaction_start(tdb) != 0) {
508 DEBUG(DEBUG_ERR,(__location__ " Failed to start transaction\n"));
512 tmp_db = tdb_open("tmpdb", tdb_hash_size(tdb),
513 TDB_INTERNAL|TDB_DISALLOW_NESTING,
515 if (tmp_db == NULL) {
516 DEBUG(DEBUG_ERR,(__location__ " Failed to create tmp_db\n"));
517 tdb_transaction_cancel(tdb);
522 state.dest_db = tmp_db;
524 if (tdb_traverse_read(tdb, repack_traverse, &state) == -1) {
525 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse copying out\n"));
526 tdb_transaction_cancel(tdb);
532 DEBUG(DEBUG_ERR,(__location__ " Error during traversal\n"));
533 tdb_transaction_cancel(tdb);
538 if (tdb_wipe_all(tdb) != 0) {
539 DEBUG(DEBUG_ERR,(__location__ " Failed to wipe database\n"));
540 tdb_transaction_cancel(tdb);
548 if (tdb_traverse_read(tmp_db, repack_traverse, &state) == -1) {
549 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse copying back\n"));
550 tdb_transaction_cancel(tdb);
556 DEBUG(DEBUG_ERR,(__location__ " Error during second traversal\n"));
557 tdb_transaction_cancel(tdb);
564 if (tdb_transaction_commit(tdb) != 0) {
565 DEBUG(DEBUG_ERR,(__location__ " Failed to commit\n"));
573 /* repack one database */
574 static int ctdb_repack_db(struct ctdb_context *ctdb, uint32_t db_id,
575 bool persistent, uint32_t repack_limit)
577 struct ctdb_db_context *ctdb_db;
581 if (ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, db_id, ctdb, &name) != 0) {
582 DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", db_id));
586 ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), name, persistent, 0);
587 if (ctdb_db == NULL) {
588 DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
592 size = tdb_freelist_size(ctdb_db->ltdb->tdb);
594 DEBUG(DEBUG_ERR,(__location__ " Failed to get freelist size for '%s'\n", name));
598 if (size <= repack_limit) {
602 printf("Repacking %s with %u freelist entries\n", name, size);
604 if (ctdb_repack_tdb(ctdb_db->ltdb->tdb) != 0) {
605 DEBUG(DEBUG_ERR,(__location__ " Failed to repack '%s'\n", name));
614 repack all our databases
616 int ctdb_repack(struct ctdb_context *ctdb, int argc, const char **argv)
618 struct ctdb_dbid_map *dbmap=NULL;
620 /* a reasonable default limit to prevent us using too much memory */
621 uint32_t repack_limit = 10000;
624 repack_limit = atoi(argv[0]);
627 ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &dbmap);
629 DEBUG(DEBUG_ERR, ("Unable to get dbids from local node\n"));
633 for (i=0;i<dbmap->num;i++) {
634 if (ctdb_repack_db(ctdb, dbmap->dbs[i].dbid,
635 dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT, repack_limit) != 0) {
636 DEBUG(DEBUG_ERR,("Failed to repack db 0x%x\n", dbmap->dbs[i].dbid));