2 ctdb control tool - database vacuum
4 Copyright (C) Andrew Tridgell 2008
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "lib/events/events.h"
22 #include "system/filesys.h"
23 #include "system/network.h"
24 #include "../include/ctdb.h"
25 #include "../include/ctdb_private.h"
28 /* Time limit handed to the ctdb control calls made in this file.
      Hard-coded for now - should be tunable.
      NOTE(review): (10, 0) is presumably "10 seconds, 0 microseconds from
      now" - confirm against timeval_current_ofs(). */
29 #define TIMELIMIT() timeval_current_ofs(10, 0)
/*
  Try to vacuum (delete) a single record, identified by key, from ctdb_db.

  Visible flow:
   1. take the chain lock without blocking, fetch the record, unlock
   2. only continue for a record that is exactly one ctdb_ltdb_header in
      size and for which this node is both dmaster and lmaster
   3. marshall the record and send CTDB_CONTROL_DELETE_RECORD to the nodes
      returned by list_of_vnnmap_nodes()
   4. re-take the chain lock, refetch and recheck the record, then delete
      it from the local tdb

  ctdb     - client context; supplies pnn and vnn_map
  key      - key of the record to vacuum
  ctdb_db  - database the record lives in
  count    - NOTE(review): presumably incremented for every record actually
             deleted; the increment is not visible in this listing

  NOTE(review): the embedded source line numbers skip (e.g. 43 -> 47), so
  the declaration of 'data', the return statements, the closing braces and
  any cleanup calls are not shown here. Check the full file before relying
  on this description for return values or memory ownership.
 */
34 static int ctdb_vacuum_one(struct ctdb_context *ctdb, TDB_DATA key,
35 struct ctdb_db_context *ctdb_db, uint32_t *count)
38 struct ctdb_ltdb_header *hdr;
39 struct ctdb_rec_data *rec;
/* first pass: peek at the record under a non-blocking chain lock */
42 if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, key) != 0) {
43 /* the chain is busy - come back later */
47 data = tdb_fetch(ctdb_db->ltdb->tdb, key);
/* the lock is dropped straight after the fetch, so the record can change
   before the second pass below - hence the refetch and recheck there */
48 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
/* record no longer exists */
49 if (data.dptr == NULL) {
/* only records that consist of nothing but the ltdb header are candidates */
52 if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
58 hdr = (struct ctdb_ltdb_header *)data.dptr;
61 /* skip the record unless we are both its lmaster and its dmaster */
62 if (hdr->dmaster != ctdb->pnn ||
63 ctdb_lmaster(ctdb, &key) != ctdb->pnn) {
/* pack key + header (no data payload: tdb_null) into a ctdb_rec_data blob */
68 rec = ctdb_marshall_record(ctdb, ctdb_db->db_id, key, hdr, tdb_null);
71 /* try it again later */
/* reuse 'data' to carry the marshalled record as the control payload */
75 data.dptr = (void *)rec;
76 data.dsize = rec->length;
/* ask the nodes selected by list_of_vnnmap_nodes() to delete the record */
78 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_DELETE_RECORD,
79 list_of_vnnmap_nodes(ctdb, ctdb->vnn_map, rec, false),
80 TIMELIMIT(), true, data) != 0) {
81 /* one or more nodes failed to delete a record - no problem! */
88 /* it's deleted on all other nodes - refetch, check and delete */
89 if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, key) != 0) {
90 /* the chain is busy - come back later */
/* second pass: from here on the chain lock is held, so every exit path
   below has to release it */
94 data = tdb_fetch(ctdb_db->ltdb->tdb, key);
95 if (data.dptr == NULL) {
96 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
99 if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
101 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
105 hdr = (struct ctdb_ltdb_header *)data.dptr;
107 /* skip the record unless we are both its lmaster and its dmaster */
/* NOTE(review): the condition ends with a trailing '||'; its last operand
   is on a line that is missing from this listing */
108 if (hdr->dmaster != ctdb->pnn ||
109 ctdb_lmaster(ctdb, &key) != ctdb->pnn ||
111 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
/* SIGALRM is blocked for the duration of the local delete.
   NOTE(review): presumably so a timer signal cannot interrupt the tdb
   update - confirm */
116 ctdb_block_signal(SIGALRM);
117 tdb_delete(ctdb_db->ltdb->tdb, key);
118 ctdb_unblock_signal(SIGALRM);
119 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
129 /* vacuum the records for which this node is the lmaster */
/*
  Walk a packed list of marshalled records and call ctdb_vacuum_one() on
  the key of each one.

  ctdb     - client context, passed through to ctdb_vacuum_one()
  list     - blob whose data[] area holds ctdb_rec_data entries laid out
             back to back
  ctdb_db  - database the records belong to
  count    - passed through to ctdb_vacuum_one()

  NOTE(review): the declarations of 'key' and 'i', the first part of the
  for-loop header (initialisation and bound) and the return statements are
  missing from this listing.
 */
131 static int ctdb_vacuum_local(struct ctdb_context *ctdb, struct ctdb_control_pulldb_reply *list,
132 struct ctdb_db_context *ctdb_db, uint32_t *count)
134 struct ctdb_rec_data *r;
/* first record starts at the beginning of the data area */
137 r = (struct ctdb_rec_data *)&list->data[0];
/* loop step: each entry records its own total size in r->length, so the
   next entry starts r->length bytes after the current one */
141 r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r), i++) {
/* the key is the first r->keylen bytes of the entry's data[] */
143 key.dptr = &r->data[0];
144 key.dsize = r->keylen;
145 if (ctdb_vacuum_one(ctdb, key, ctdb_db, count) != 0) {
154 /* a list of records that can possibly be deleted */
/* NOTE(review): the 'struct vacuum_data {' line, the closing brace and the
   'traverse_error' and 'total' members used by vacuum_traverse() are not
   visible in this listing. */
/* maximum number of records to gather in one traverse; 0 disables the
   limit (see the check at the end of vacuum_traverse) */
157 uint32_t vacuum_limit;
/* client context; used for ctdb_lmaster(), pnn and vnn_map lookups */
158 struct ctdb_context *ctdb;
/* array of vnn_map->size record blobs, one per slot, each grown as
   records are appended during the traverse */
159 struct ctdb_control_pulldb_reply **list;
165 /* traverse function for vacuuming */
/*
  tdb traverse callback: collect records that look vacuumable into the
  per-lmaster blobs in struct vacuum_data.

  A record is collected when it is exactly one ctdb_ltdb_header in size
  (i.e. a deleted record) and this node is its dmaster. Only the key is
  marshalled (header NULL, data tdb_null).

  tdb      - database being traversed (not referenced in the visible lines)
  key/data - current record
  private  - struct vacuum_data for this run

  NOTE(review): the declarations of 'lmaster' and 'old_size', the
  'rec == NULL' test, the update of vdata->total and all return statements
  are missing from this listing.
 */
167 static int vacuum_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private)
169 struct vacuum_data *vdata = talloc_get_type(private, struct vacuum_data);
171 struct ctdb_ltdb_header *hdr;
172 struct ctdb_rec_data *rec;
/* NOTE(review): list[] is indexed below by the value ctdb_lmaster()
   returns, while ctdb_vacuum_db() treats index i as a vnn_map slot and
   looks the node up through vnn_map->map[i]. Confirm the two agree. */
175 lmaster = ctdb_lmaster(vdata->ctdb, &key);
/* guard the list[] index: the array has vnn_map->size entries */
176 if (lmaster >= vdata->ctdb->vnn_map->size) {
180 if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
181 /* it's not a deleted record */
185 hdr = (struct ctdb_ltdb_header *)data.dptr;
/* only consider records for which this node is the dmaster */
187 if (hdr->dmaster != vdata->ctdb->pnn) {
192 /* add the record to the blob ready to send to the nodes */
/* the temporary record is allocated as a talloc child of the blob it is
   about to be appended to */
193 rec = ctdb_marshall_record(vdata->list[lmaster], vdata->ctdb->pnn, key, NULL, tdb_null);
195 DEBUG(0,(__location__ " Out of memory\n"));
196 vdata->traverse_error = true;
/* grow the blob by rec->length bytes and append the record at the old end */
199 old_size = talloc_get_size(vdata->list[lmaster]);
200 vdata->list[lmaster] = talloc_realloc_size(NULL, vdata->list[lmaster],
201 old_size + rec->length);
202 if (vdata->list[lmaster] == NULL) {
203 DEBUG(0,(__location__ " Failed to expand\n"));
204 vdata->traverse_error = true;
207 vdata->list[lmaster]->count++;
208 memcpy(old_size+(uint8_t *)vdata->list[lmaster], rec, rec->length);
213 /* don't gather too many records */
/* a vacuum_limit of 0 means "no limit" */
214 if (vdata->vacuum_limit != 0 &&
215 vdata->total == vdata->vacuum_limit) {
223 /* vacuum one database */
/*
  Steps visible here:
   1. look up the database name and attach to it
   2. allocate one empty record blob per vnn_map slot
   3. traverse the local tdb with vacuum_traverse() to fill the blobs
   4. for slots whose node is not us, send the blob to that node with
      CTDB_SRVID_VACUUM_FETCH
   5. hand the remaining non-empty blobs to ctdb_vacuum_local()
   6. issue a getpnn control so the event queue gets run

  ctdb         - client context (pnn and vnn_map must already be set)
  db_id        - id of the database to vacuum
  map          - node map (not referenced in the visible lines)
  persistent   - passed to ctdb_attach()
  vacuum_limit - stored in vdata->vacuum_limit; 0 means no limit

  NOTE(review): declarations of 'name', 'data', 'i' and 'count', several
  'if' headers, the return statements and any cleanup of vdata are missing
  from this listing.
 */
224 static int ctdb_vacuum_db(struct ctdb_context *ctdb, uint32_t db_id, struct ctdb_node_map *map,
225 bool persistent, uint32_t vacuum_limit)
227 struct ctdb_db_context *ctdb_db;
229 struct vacuum_data *vdata;
/* zero-initialised state for this run, parented on ctdb */
232 vdata = talloc_zero(ctdb, struct vacuum_data);
234 DEBUG(0,(__location__ " Out of memory\n"));
239 vdata->vacuum_limit = vacuum_limit;
/* the name string is allocated under vdata */
241 if (ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, db_id, vdata, &name) != 0) {
242 DEBUG(0,(__location__ " Failed to get name of db 0x%x\n", db_id));
247 ctdb_db = ctdb_attach(ctdb, name, persistent);
248 if (ctdb_db == NULL) {
249 DEBUG(0,(__location__ " Failed to attach to database '%s'\n", name));
254 /* the list needs to be of length num_nodes */
255 vdata->list = talloc_array(vdata, struct ctdb_control_pulldb_reply *, ctdb->vnn_map->size);
256 if (vdata->list == NULL) {
257 DEBUG(0,(__location__ " Out of memory\n"));
/* start every blob as a bare header (everything before data[]) with
   count == 0; vacuum_traverse() grows it as records are appended */
261 for (i=0;i<ctdb->vnn_map->size;i++) {
262 vdata->list[i] = (struct ctdb_control_pulldb_reply *)
263 talloc_zero_size(vdata->list,
264 offsetof(struct ctdb_control_pulldb_reply, data));
265 if (vdata->list[i] == NULL) {
266 DEBUG(0,(__location__ " Out of memory\n"));
270 vdata->list[i]->db_id = db_id;
273 /* traverse, looking for records that might be able to be vacuumed */
/* the callback reports its own failures through vdata->traverse_error */
274 if (tdb_traverse_read(ctdb_db->ltdb->tdb, vacuum_traverse, vdata) == -1 ||
275 vdata->traverse_error) {
276 DEBUG(0,(__location__ " Traverse error in vacuuming '%s'\n", name));
/* pass 1: ship non-empty blobs to the other lmasters */
282 for (i=0;i<ctdb->vnn_map->size;i++) {
283 if (vdata->list[i]->count == 0) {
287 /* for records where we are not the lmaster, tell the lmaster to fetch the record */
288 if (ctdb->vnn_map->map[i] != ctdb->pnn) {
/* note: the "lmaster" printed here is the slot index i, not map[i] */
290 printf("Found %u records for lmaster %u in '%s'\n", vdata->list[i]->count, i, name);
/* send the whole blob (header plus appended records) as the message body */
292 data.dsize = talloc_get_size(vdata->list[i]);
293 data.dptr = (void *)vdata->list[i];
294 if (ctdb_send_message(ctdb, ctdb->vnn_map->map[i], CTDB_SRVID_VACUUM_FETCH, data) != 0) {
295 DEBUG(0,(__location__ " Failed to send vacuum fetch message to %u\n",
296 ctdb->vnn_map->map[i]));
/* pass 2: local deletion.
   NOTE(review): the lines between 304 and 307 are missing; presumably they
   declare/reset 'count' and skip slots that belong to other nodes. */
304 for (i=0;i<ctdb->vnn_map->size;i++) {
307 if (vdata->list[i]->count == 0) {
311 /* for records where we are the lmaster, we can try to delete them */
312 if (ctdb_vacuum_local(ctdb, vdata->list[i], ctdb_db, &count) != 0) {
313 DEBUG(0,(__location__ " Deletion error in vacuuming '%s'\n", name));
318 printf("Deleted %u records on this node from '%s'\n", count, name);
322 /* this ensures we run our event queue */
/* the return value is deliberately ignored; the call is made only for its
   side effect of running the event loop */
323 ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
332 /* vacuum all our databases */
/*
  Entry point of the vacuum command: fetch the db map, node map, vnn map
  and pnn from the local node, then run ctdb_vacuum_db() on every database
  in the db map.

  ctdb  - client context; ctdb->vnn_map is filled in here
  argc  - number of command arguments
  argv  - argv[0], when used, is the per-database vacuum limit

  NOTE(review): the declarations of 'ret', 'pnn' and 'i', the tests around
  each call (including the guard on argv[0]) and the return statements are
  missing from this listing.
 */
334 int ctdb_vacuum(struct ctdb_context *ctdb, int argc, const char **argv)
336 struct ctdb_dbid_map *dbmap=NULL;
337 struct ctdb_node_map *nodemap=NULL;
/* default 0 = no limit (see vacuum_traverse) */
339 uint32_t vacuum_limit = 0;
/* NOTE(review): atoi() reports no errors; a non-numeric argument yields 0,
   which is treated as "no limit", and a negative value wraps when stored
   in the uint32_t */
342 vacuum_limit = atoi(argv[0]);
/* the replies below are allocated with ctdb as their talloc parent */
345 ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &dbmap);
347 DEBUG(0, ("Unable to get dbids from local node\n"));
351 ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
353 DEBUG(0, ("Unable to get nodemap from local node\n"));
/* stored directly into the context; the vacuum code reads ctdb->vnn_map */
357 ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &ctdb->vnn_map);
359 DEBUG(0, ("Unable to get vnnmap from local node\n"));
363 pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
365 DEBUG(0, ("Unable to get pnn from local node\n"));
/* vacuum each database in turn */
370 for (i=0;i<dbmap->num;i++) {
371 if (ctdb_vacuum_db(ctdb, dbmap->dbs[i].dbid, nodemap,
372 dbmap->dbs[i].persistent, vacuum_limit) != 0) {
373 DEBUG(0,("Failed to vacuum db 0x%x\n", dbmap->dbs[i].dbid));
/* State handed to repack_traverse() through the traverse 'private' pointer.
   NOTE(review): at least one member line (382) and the closing brace are
   missing from this listing. */
381 struct traverse_state {
/* database that repack_traverse() stores each visited record into */
383 struct tdb_context *dest_db;
387 /* traverse function for repacking */
/*
  tdb traverse callback used while repacking: copy the current record into
  state->dest_db.

  tdb      - database being traversed (not referenced in the visible lines)
  key/data - current record
  private  - struct traverse_state for this repack

  NOTE(review): the failure branch and the return statements are missing
  from this listing.
 */
389 static int repack_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private)
391 struct traverse_state *state = (struct traverse_state *)private;
/* TDB_INSERT: the store is expected to add a new key, not overwrite one */
392 if (tdb_store(state->dest_db, key, data, TDB_INSERT) != 0) {
/*
  Repack a tdb inside a single transaction:
   1. start a transaction on tdb
   2. copy every record into a temporary TDB_INTERNAL database
   3. wipe tdb
   4. copy every record back from the temporary database
   5. commit
  Every failure path visible here cancels the transaction.

  tdb - database to repack

  NOTE(review): the tests that produce the "Error during traversal"
  messages, the line that repoints 'state' before the copy back, the
  closing of tmp_db and the return statements are missing from this
  listing.
 */
402 static int ctdb_repack_tdb(struct tdb_context *tdb)
404 struct tdb_context *tmp_db;
405 struct traverse_state state;
407 if (tdb_transaction_start(tdb) != 0) {
408 DEBUG(0,(__location__ " Failed to start transaction\n"));
/* scratch database opened with TDB_INTERNAL, sized with the same hash
   size as the source */
412 tmp_db = tdb_open("tmpdb", tdb_hash_size(tdb), TDB_INTERNAL, O_RDWR|O_CREAT, 0);
413 if (tmp_db == NULL) {
414 DEBUG(0,(__location__ " Failed to create tmp_db\n"));
415 tdb_transaction_cancel(tdb);
/* copy out: tdb -> tmp_db */
420 state.dest_db = tmp_db;
422 if (tdb_traverse_read(tdb, repack_traverse, &state) == -1) {
423 DEBUG(0,(__location__ " Failed to traverse copying out\n"));
424 tdb_transaction_cancel(tdb);
430 DEBUG(0,(__location__ " Error during traversal\n"));
431 tdb_transaction_cancel(tdb);
/* empty the original so the records can be re-inserted compactly */
436 if (tdb_wipe_all(tdb) != 0) {
437 DEBUG(0,(__location__ " Failed to wipe database\n"));
438 tdb_transaction_cancel(tdb);
/* copy back: tmp_db -> tdb.
   NOTE(review): state.dest_db must point at tdb here; the assignment is
   presumably on one of the lines missing before 446 - confirm */
446 if (tdb_traverse_read(tmp_db, repack_traverse, &state) == -1) {
447 DEBUG(0,(__location__ " Failed to traverse copying back\n"));
448 tdb_transaction_cancel(tdb);
454 DEBUG(0,(__location__ " Error during second traversal\n"));
455 tdb_transaction_cancel(tdb);
462 if (tdb_transaction_commit(tdb) != 0) {
463 DEBUG(0,(__location__ " Failed to commit\n"));
471 /* repack one database */
/*
  Look up the database name, attach to it, and repack its local tdb with
  ctdb_repack_tdb() when the freelist is larger than repack_limit.

  ctdb         - client context
  db_id        - id of the database to repack
  persistent   - passed to ctdb_attach()
  repack_limit - a freelist size at or below this value skips the repack

  NOTE(review): the declarations of 'name' and 'size', the test on the
  tdb_freelist_size() result and the return statements are missing from
  this listing.
 */
472 static int ctdb_repack_db(struct ctdb_context *ctdb, uint32_t db_id,
473 bool persistent, uint32_t repack_limit)
475 struct ctdb_db_context *ctdb_db;
/* the name string is allocated with ctdb as its talloc parent */
479 if (ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, db_id, ctdb, &name) != 0) {
480 DEBUG(0,(__location__ " Failed to get name of db 0x%x\n", db_id));
484 ctdb_db = ctdb_attach(ctdb, name, persistent);
485 if (ctdb_db == NULL) {
486 DEBUG(0,(__location__ " Failed to attach to database '%s'\n", name));
/* number of entries on the tdb freelist */
490 size = tdb_freelist_size(ctdb_db->ltdb->tdb);
492 DEBUG(0,(__location__ " Failed to get freelist size for '%s'\n", name));
/* not fragmented enough to be worth repacking */
496 if (size <= repack_limit) {
500 printf("Repacking %s with %u freelist entries\n", name, size);
502 if (ctdb_repack_tdb(ctdb_db->ltdb->tdb) != 0) {
503 DEBUG(0,(__location__ " Failed to repack '%s'\n", name));
512 /* repack all our databases */
/*
  Entry point of the repack command: fetch the db map from the local node
  and run ctdb_repack_db() on every database in it.

  ctdb  - client context
  argc  - number of command arguments
  argv  - argv[0], when used, overrides the default repack limit

  NOTE(review): the declarations of 'ret' and 'i', the guard on argv[0],
  the test on 'ret' and the return statements are missing from this
  listing.
 */
514 int ctdb_repack(struct ctdb_context *ctdb, int argc, const char **argv)
516 struct ctdb_dbid_map *dbmap=NULL;
518 /* a reasonable default limit to prevent us using too much memory */
519 uint32_t repack_limit = 10000;
/* NOTE(review): atoi() reports no errors; a non-numeric argument yields 0
   (repack whenever the freelist is non-empty), and a negative value wraps
   when stored in the uint32_t */
522 repack_limit = atoi(argv[0]);
/* the reply is allocated with ctdb as its talloc parent */
525 ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &dbmap);
527 DEBUG(0, ("Unable to get dbids from local node\n"));
/* repack each database in turn */
531 for (i=0;i<dbmap->num;i++) {
532 if (ctdb_repack_db(ctdb, dbmap->dbs[i].dbid,
533 dbmap->dbs[i].persistent, repack_limit) != 0) {
534 DEBUG(0,("Failed to repack db 0x%x\n", dbmap->dbs[i].dbid));