f412c0430386f792ce7f9c89b319d531c1b8f362
[rusty/ctdb.git] / tools / ctdb_vacuum.c
1 /* 
2    ctdb control tool - database vacuum 
3
4    Copyright (C) Andrew Tridgell  2008
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/events/events.h"
22 #include "system/filesys.h"
23 #include "system/network.h"
24 #include "../include/ctdb.h"
25 #include "../include/ctdb_private.h"
26 #include "db_wrap.h"
27
28 /* should be tunable */
29 #define TIMELIMIT() timeval_current_ofs(10, 0)
30
31 /*
32   vacuum one record
33  */
34 static int ctdb_vacuum_one(struct ctdb_context *ctdb, TDB_DATA key, 
35                            struct ctdb_db_context *ctdb_db, uint32_t *count)
36 {
37         TDB_DATA data;
38         struct ctdb_ltdb_header *hdr;
39         struct ctdb_rec_data *rec;
40         uint64_t rsn;
41
42         if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, key) != 0) {
43                 /* the chain is busy - come back later */
44                 return 0;
45         }
46
47         data = tdb_fetch(ctdb_db->ltdb->tdb, key);
48         tdb_chainunlock(ctdb_db->ltdb->tdb, key);
49         if (data.dptr == NULL) {
50                 return 0;
51         }
52         if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
53                 free(data.dptr);
54                 return 0;
55         }
56
57
58         hdr = (struct ctdb_ltdb_header *)data.dptr;
59         rsn = hdr->rsn;
60
61         /* if we are not the lmaster and the dmaster then skip the record */
62         if (hdr->dmaster != ctdb->pnn ||
63             ctdb_lmaster(ctdb, &key) != ctdb->pnn) {
64                 free(data.dptr);
65                 return 0;
66         }
67
68         rec = ctdb_marshall_record(ctdb, ctdb_db->db_id, key, hdr, tdb_null);
69         free(data.dptr);
70         if (rec == NULL) {
71                 /* try it again later */
72                 return 0;
73         }
74
75         data.dptr = (void *)rec;
76         data.dsize = rec->length;
77
78         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_DELETE_RECORD,
79                         list_of_vnnmap_nodes(ctdb, ctdb->vnn_map, rec, false),
80                         TIMELIMIT(), true, data) != 0) {
81                 /* one or more nodes failed to delete a record - no problem! */
82                 talloc_free(rec);
83                 return 0;
84         }
85
86         talloc_free(rec);
87
88         /* its deleted on all other nodes - refetch, check and delete */
89         if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, key) != 0) {
90                 /* the chain is busy - come back later */
91                 return 0;
92         }
93
94         data = tdb_fetch(ctdb_db->ltdb->tdb, key);
95         if (data.dptr == NULL) {
96                 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
97                 return 0;
98         }
99         if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
100                 free(data.dptr);
101                 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
102                 return 0;
103         }
104
105         hdr = (struct ctdb_ltdb_header *)data.dptr;
106
107         /* if we are not the lmaster and the dmaster then skip the record */
108         if (hdr->dmaster != ctdb->pnn ||
109             ctdb_lmaster(ctdb, &key) != ctdb->pnn ||
110             rsn != hdr->rsn) {
111                 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
112                 free(data.dptr);
113                 return 0;
114         }
115
116         ctdb_block_signal(SIGALRM);
117         tdb_delete(ctdb_db->ltdb->tdb, key);
118         ctdb_unblock_signal(SIGALRM);
119         tdb_chainunlock(ctdb_db->ltdb->tdb, key);
120         free(data.dptr);
121
122         (*count)++;
123
124         return 0;
125 }
126
127
128 /*
129   vacuum records for which we are the lmaster 
130  */
131 static int ctdb_vacuum_local(struct ctdb_context *ctdb, struct ctdb_control_pulldb_reply *list, 
132                              struct ctdb_db_context *ctdb_db, uint32_t *count)
133 {
134         struct ctdb_rec_data *r;
135         int i;
136
137         r = (struct ctdb_rec_data *)&list->data[0];
138         
139         for (i=0;
140              i<list->count;
141              r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r), i++) {
142                 TDB_DATA key;
143                 key.dptr = &r->data[0];
144                 key.dsize = r->keylen;
145                 if (ctdb_vacuum_one(ctdb, key, ctdb_db, count) != 0) {
146                         return -1;
147                 }
148         }
149
150         return 0;       
151 }
152
153 /* 
154    a list of records to possibly delete
155  */
156 struct vacuum_data {
157         uint32_t vacuum_limit;
158         struct ctdb_context *ctdb;
159         struct ctdb_control_pulldb_reply **list;
160         bool traverse_error;
161         uint32_t total;
162 };
163
164 /*
165   traverse function for vacuuming
166  */
167 static int vacuum_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private)
168 {
169         struct vacuum_data *vdata = talloc_get_type(private, struct vacuum_data);
170         uint32_t lmaster;
171         struct ctdb_ltdb_header *hdr;
172         struct ctdb_rec_data *rec;
173         size_t old_size;
174                
175         lmaster = ctdb_lmaster(vdata->ctdb, &key);
176         if (lmaster >= vdata->ctdb->vnn_map->size) {
177                 return 0;
178         }
179
180         if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
181                 /* its not a deleted record */
182                 return 0;
183         }
184
185         hdr = (struct ctdb_ltdb_header *)data.dptr;
186
187         if (hdr->dmaster != vdata->ctdb->pnn) {
188                 return 0;
189         }
190
191
192         /* add the record to the blob ready to send to the nodes */
193         rec = ctdb_marshall_record(vdata->list[lmaster], vdata->ctdb->pnn, key, NULL, tdb_null);
194         if (rec == NULL) {
195                 DEBUG(0,(__location__ " Out of memory\n"));
196                 vdata->traverse_error = true;
197                 return -1;
198         }
199         old_size = talloc_get_size(vdata->list[lmaster]);
200         vdata->list[lmaster] = talloc_realloc_size(NULL, vdata->list[lmaster], 
201                                                    old_size + rec->length);
202         if (vdata->list[lmaster] == NULL) {
203                 DEBUG(0,(__location__ " Failed to expand\n"));
204                 vdata->traverse_error = true;
205                 return -1;
206         }
207         vdata->list[lmaster]->count++;
208         memcpy(old_size+(uint8_t *)vdata->list[lmaster], rec, rec->length);
209         talloc_free(rec);
210
211         vdata->total++;
212
213         /* don't gather too many records */
214         if (vdata->vacuum_limit != 0 &&
215             vdata->total == vdata->vacuum_limit) {
216                 return -1;
217         }
218
219         return 0;
220 }
221
222
223 /* vacuum one database */
224 static int ctdb_vacuum_db(struct ctdb_context *ctdb, uint32_t db_id, struct ctdb_node_map *map,
225                           bool persistent, uint32_t vacuum_limit)
226 {
227         struct ctdb_db_context *ctdb_db;
228         const char *name;
229         struct vacuum_data *vdata;
230         int i;
231
232         vdata = talloc_zero(ctdb, struct vacuum_data);
233         if (vdata == NULL) {
234                 DEBUG(0,(__location__ " Out of memory\n"));
235                 return -1;
236         }
237
238         vdata->ctdb = ctdb;
239         vdata->vacuum_limit = vacuum_limit;
240
241         if (ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, db_id, vdata, &name) != 0) {
242                 DEBUG(0,(__location__ " Failed to get name of db 0x%x\n", db_id));
243                 talloc_free(vdata);
244                 return -1;
245         }
246
247         ctdb_db = ctdb_attach(ctdb, name, persistent);
248         if (ctdb_db == NULL) {
249                 DEBUG(0,(__location__ " Failed to attach to database '%s'\n", name));
250                 talloc_free(vdata);
251                 return -1;
252         }
253
254         /* the list needs to be of length num_nodes */
255         vdata->list = talloc_array(vdata, struct ctdb_control_pulldb_reply *, ctdb->vnn_map->size);
256         if (vdata->list == NULL) {
257                 DEBUG(0,(__location__ " Out of memory\n"));
258                 talloc_free(vdata);
259                 return -1;
260         }
261         for (i=0;i<ctdb->vnn_map->size;i++) {
262                 vdata->list[i] = (struct ctdb_control_pulldb_reply *)
263                         talloc_zero_size(vdata->list, 
264                                     offsetof(struct ctdb_control_pulldb_reply, data));
265                 if (vdata->list[i] == NULL) {
266                         DEBUG(0,(__location__ " Out of memory\n"));
267                         talloc_free(vdata);
268                         return -1;
269                 }
270                 vdata->list[i]->db_id = db_id;
271         }
272
273         /* traverse, looking for records that might be able to be vacuumed */
274         if (tdb_traverse_read(ctdb_db->ltdb->tdb, vacuum_traverse, vdata) == -1 ||
275             vdata->traverse_error) {
276                 DEBUG(0,(__location__ " Traverse error in vacuuming '%s'\n", name));
277                 talloc_free(vdata);
278                 return -1;              
279         }
280
281
282         for (i=0;i<ctdb->vnn_map->size;i++) {
283                 if (vdata->list[i]->count == 0) {
284                         continue;
285                 }
286
287                 /* for records where we are not the lmaster, tell the lmaster to fetch the record */
288                 if (ctdb->vnn_map->map[i] != ctdb->pnn) {
289                         TDB_DATA data;
290                         printf("Found %u records for lmaster %u in '%s'\n", vdata->list[i]->count, i, name);
291
292                         data.dsize = talloc_get_size(vdata->list[i]);
293                         data.dptr  = (void *)vdata->list[i];
294                         if (ctdb_send_message(ctdb, ctdb->vnn_map->map[i], CTDB_SRVID_VACUUM_FETCH, data) != 0) {
295                                 DEBUG(0,(__location__ " Failed to send vacuum fetch message to %u\n",
296                                          ctdb->vnn_map->map[i]));
297                                 talloc_free(vdata);
298                                 return -1;              
299                         }
300                         continue;
301                 }
302         }       
303
304         for (i=0;i<ctdb->vnn_map->size;i++) {
305                 uint32_t count = 0;
306
307                 if (vdata->list[i]->count == 0) {
308                         continue;
309                 }
310
311                 /* for records where we are the lmaster, we can try to delete them */
312                 if (ctdb_vacuum_local(ctdb, vdata->list[i], ctdb_db, &count) != 0) {
313                         DEBUG(0,(__location__ " Deletion error in vacuuming '%s'\n", name));
314                         talloc_free(vdata);
315                         return -1;                                      
316                 }
317                 if (count != 0) {
318                         printf("Deleted %u records on this node from '%s'\n", count, name);
319                 }
320         }       
321
322         /* this ensures we run our event queue */
323         ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
324
325         talloc_free(vdata);
326
327         return 0;
328 }
329
330
331 /*
332   vacuum all our databases
333  */
334 int ctdb_vacuum(struct ctdb_context *ctdb, int argc, const char **argv)
335 {
336         struct ctdb_dbid_map *dbmap=NULL;
337         struct ctdb_node_map *nodemap=NULL;
338         int ret, i, pnn;
339         uint32_t vacuum_limit = 0;
340
341         if (argc > 0) {
342                 vacuum_limit = atoi(argv[0]);
343         }
344
345         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &dbmap);
346         if (ret != 0) {
347                 DEBUG(0, ("Unable to get dbids from local node\n"));
348                 return ret;
349         }
350
351         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
352         if (ret != 0) {
353                 DEBUG(0, ("Unable to get nodemap from local node\n"));
354                 return ret;
355         }
356
357         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &ctdb->vnn_map);
358         if (ret != 0) {
359                 DEBUG(0, ("Unable to get vnnmap from local node\n"));
360                 return ret;
361         }
362
363         pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
364         if (pnn == -1) {
365                 DEBUG(0, ("Unable to get pnn from local node\n"));
366                 return -1;
367         }
368         ctdb->pnn = pnn;
369
370         for (i=0;i<dbmap->num;i++) {
371                 if (ctdb_vacuum_db(ctdb, dbmap->dbs[i].dbid, nodemap, 
372                                    dbmap->dbs[i].persistent, vacuum_limit) != 0) {
373                         DEBUG(0,("Failed to vacuum db 0x%x\n", dbmap->dbs[i].dbid));
374                         return -1;
375                 }
376         }
377
378         return 0;
379 }
380
381 struct traverse_state {
382         bool error;
383         struct tdb_context *dest_db;
384 };
385
386 /*
387   traverse function for repacking
388  */
389 static int repack_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private)
390 {
391         struct traverse_state *state = (struct traverse_state *)private;
392         if (tdb_store(state->dest_db, key, data, TDB_INSERT) != 0) {
393                 state->error = true;
394                 return -1;
395         }
396         return 0;
397 }
398
399 /*
400   repack a tdb
401  */
402 static int ctdb_repack_tdb(struct tdb_context *tdb)
403 {
404         struct tdb_context *tmp_db;
405         struct traverse_state state;
406
407         if (tdb_transaction_start(tdb) != 0) {
408                 DEBUG(0,(__location__ " Failed to start transaction\n"));
409                 return -1;
410         }
411
412         tmp_db = tdb_open("tmpdb", tdb_hash_size(tdb), TDB_INTERNAL, O_RDWR|O_CREAT, 0);
413         if (tmp_db == NULL) {
414                 DEBUG(0,(__location__ " Failed to create tmp_db\n"));
415                 tdb_transaction_cancel(tdb);
416                 return -1;
417         }
418
419         state.error = false;
420         state.dest_db = tmp_db;
421
422         if (tdb_traverse_read(tdb, repack_traverse, &state) == -1) {
423                 DEBUG(0,(__location__ " Failed to traverse copying out\n"));
424                 tdb_transaction_cancel(tdb);
425                 tdb_close(tmp_db);
426                 return -1;              
427         }
428
429         if (state.error) {
430                 DEBUG(0,(__location__ " Error during traversal\n"));
431                 tdb_transaction_cancel(tdb);
432                 tdb_close(tmp_db);
433                 return -1;
434         }
435
436         if (tdb_wipe_all(tdb) != 0) {
437                 DEBUG(0,(__location__ " Failed to wipe database\n"));
438                 tdb_transaction_cancel(tdb);
439                 tdb_close(tmp_db);
440                 return -1;
441         }
442
443         state.error = false;
444         state.dest_db = tdb;
445
446         if (tdb_traverse_read(tmp_db, repack_traverse, &state) == -1) {
447                 DEBUG(0,(__location__ " Failed to traverse copying back\n"));
448                 tdb_transaction_cancel(tdb);
449                 tdb_close(tmp_db);
450                 return -1;              
451         }
452
453         if (state.error) {
454                 DEBUG(0,(__location__ " Error during second traversal\n"));
455                 tdb_transaction_cancel(tdb);
456                 tdb_close(tmp_db);
457                 return -1;
458         }
459
460         tdb_close(tmp_db);
461
462         if (tdb_transaction_commit(tdb) != 0) {
463                 DEBUG(0,(__location__ " Failed to commit\n"));
464                 return -1;
465         }
466
467         return 0;
468 }
469
470
471 /* repack one database */
472 static int ctdb_repack_db(struct ctdb_context *ctdb, uint32_t db_id, 
473                           bool persistent, uint32_t repack_limit)
474 {
475         struct ctdb_db_context *ctdb_db;
476         const char *name;
477         int size;
478
479         if (ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, db_id, ctdb, &name) != 0) {
480                 DEBUG(0,(__location__ " Failed to get name of db 0x%x\n", db_id));
481                 return -1;
482         }
483
484         ctdb_db = ctdb_attach(ctdb, name, persistent);
485         if (ctdb_db == NULL) {
486                 DEBUG(0,(__location__ " Failed to attach to database '%s'\n", name));
487                 return -1;
488         }
489
490         size = tdb_freelist_size(ctdb_db->ltdb->tdb);
491         if (size == -1) {
492                 DEBUG(0,(__location__ " Failed to get freelist size for '%s'\n", name));
493                 return -1;
494         }
495
496         if (size <= repack_limit) {
497                 return 0;
498         }
499
500         printf("Repacking %s with %u freelist entries\n", name, size);
501
502         if (ctdb_repack_tdb(ctdb_db->ltdb->tdb) != 0) {
503                 DEBUG(0,(__location__ " Failed to repack '%s'\n", name));
504                 return -1;
505         }
506
507         return 0;
508 }
509
510
511 /*
512   repack all our databases
513  */
514 int ctdb_repack(struct ctdb_context *ctdb, int argc, const char **argv)
515 {
516         struct ctdb_dbid_map *dbmap=NULL;
517         int ret, i;
518         /* a reasonable default limit to prevent us using too much memory */
519         uint32_t repack_limit = 10000; 
520
521         if (argc > 0) {
522                 repack_limit = atoi(argv[0]);
523         }
524
525         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &dbmap);
526         if (ret != 0) {
527                 DEBUG(0, ("Unable to get dbids from local node\n"));
528                 return ret;
529         }
530
531         for (i=0;i<dbmap->num;i++) {
532                 if (ctdb_repack_db(ctdb, dbmap->dbs[i].dbid, 
533                                    dbmap->dbs[i].persistent, repack_limit) != 0) {
534                         DEBUG(0,("Failed to repack db 0x%x\n", dbmap->dbs[i].dbid));
535                         return -1;
536                 }
537         }
538
539         return 0;
540 }