60a0b0a3ee7b410e054e63b79de9aebe70f58755
[sahlberg/ctdb.git] / tools / ctdb_vacuum.c
1 /* 
2    ctdb control tool - database vacuum 
3
4    Copyright (C) Andrew Tridgell  2008
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/events/events.h"
22 #include "system/filesys.h"
23 #include "system/network.h"
24 #include "../include/ctdb.h"
25 #include "../include/ctdb_private.h"
26 #include "../common/rb_tree.h"
27 #include "db_wrap.h"
28
/* Absolute timeout handed to every control this tool sends, computed as
   timeval_current_ofs(10, 0) at each use (presumably 10 seconds from now).
   Hard-coded for the time being; it should become a tunable. */
#define TIMELIMIT() timeval_current_ofs(10, 0)
31
32
33 /* 
34    a list of records to possibly delete
35  */
36 struct vacuum_data {
37         uint32_t vacuum_limit;
38         struct ctdb_context *ctdb;
39         struct ctdb_db_context *ctdb_db;
40         trbt_tree_t *delete_tree;
41         uint32_t delete_count;
42         struct ctdb_control_pulldb_reply **list;
43         bool traverse_error;
44         uint32_t total;
45 };
46
47 /* this structure contains the information for one record to be deleted */
48 struct delete_record_data {
49         struct ctdb_context *ctdb;
50         struct ctdb_db_context *ctdb_db;
51         struct ctdb_ltdb_header hdr;
52         TDB_DATA key;
53 };
54
55 /*
56   traverse function for vacuuming
57  */
58 static int vacuum_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private)
59 {
60         struct vacuum_data *vdata = talloc_get_type(private, struct vacuum_data);
61         struct ctdb_context *ctdb = vdata->ctdb;
62         struct ctdb_db_context *ctdb_db = vdata->ctdb_db;
63         uint32_t lmaster;
64         struct ctdb_ltdb_header *hdr;
65         struct ctdb_rec_data *rec;
66         size_t old_size;
67                
68         lmaster = ctdb_lmaster(ctdb, &key);
69         if (lmaster >= ctdb->vnn_map->size) {
70                 return 0;
71         }
72
73         if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
74                 /* its not a deleted record */
75                 return 0;
76         }
77
78         hdr = (struct ctdb_ltdb_header *)data.dptr;
79
80         if (hdr->dmaster != ctdb->pnn) {
81                 return 0;
82         }
83
84
85         /* is this a records we could possibly delete? I.e.
86            if the record is empty and also we are both lmaster
87            and dmaster for the record we should be able to delete it
88         */
89         if ( (lmaster == ctdb->pnn)
90            &&( (vdata->delete_count < vdata->vacuum_limit)
91              ||(vdata->vacuum_limit == 0) ) ){
92                 uint32_t hash;
93
94                 hash = ctdb_hash(&key);
95                 if (trbt_lookup32(vdata->delete_tree, hash)) {
96                         DEBUG(DEBUG_INFO, (__location__ " Hash collission when vacuuming, skipping this record.\n"));
97                 } else {
98                         struct delete_record_data *dd;
99
100                         /* store key and header indexed by the key hash */
101                         dd = talloc_zero(vdata->delete_tree, struct delete_record_data);
102                         if (dd == NULL) {
103                                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
104                                 return -1;
105                         }
106                         dd->ctdb      = ctdb;
107                         dd->ctdb_db   = ctdb_db;
108                         dd->key.dsize = key.dsize;
109                         dd->key.dptr  = talloc_memdup(dd, key.dptr, key.dsize);
110                         if (dd->key.dptr == NULL) {
111                                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
112                                 return -1;
113                         }
114
115                         dd->hdr = *hdr;
116
117         
118                         trbt_insert32(vdata->delete_tree, hash, dd);
119
120                         vdata->delete_count++;
121                 }
122         }
123
124
125         /* add the record to the blob ready to send to the nodes */
126         rec = ctdb_marshall_record(vdata->list[lmaster], ctdb->pnn, key, NULL, tdb_null);
127         if (rec == NULL) {
128                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
129                 vdata->traverse_error = true;
130                 return -1;
131         }
132         old_size = talloc_get_size(vdata->list[lmaster]);
133         vdata->list[lmaster] = talloc_realloc_size(NULL, vdata->list[lmaster], 
134                                                    old_size + rec->length);
135         if (vdata->list[lmaster] == NULL) {
136                 DEBUG(DEBUG_ERR,(__location__ " Failed to expand\n"));
137                 vdata->traverse_error = true;
138                 return -1;
139         }
140         vdata->list[lmaster]->count++;
141         memcpy(old_size+(uint8_t *)vdata->list[lmaster], rec, rec->length);
142         talloc_free(rec);
143
144         vdata->total++;
145
146         /* don't gather too many records */
147         if (vdata->vacuum_limit != 0 &&
148             vdata->total == vdata->vacuum_limit) {
149                 return -1;
150         }
151
152         return 0;
153 }
154
155 struct delete_records_list {
156         struct ctdb_control_pulldb_reply *records;
157 };
158
159 /*
160  traverse the tree of records to delete and marshall them into
161  a blob
162 */
163 static void
164 delete_traverse(void *param, void *data)
165 {
166         struct delete_record_data *dd = talloc_get_type(data, struct delete_record_data);
167         struct delete_records_list *recs = talloc_get_type(param, struct delete_records_list);
168         struct ctdb_rec_data *rec;
169         size_t old_size;
170
171         rec = ctdb_marshall_record(dd, recs->records->db_id, dd->key, &dd->hdr, tdb_null);
172         if (rec == NULL) {
173                 DEBUG(DEBUG_ERR, (__location__ " failed to marshall record\n"));
174                 return;
175         }
176
177         old_size = talloc_get_size(recs->records);
178         recs->records = talloc_realloc_size(NULL, recs->records, old_size + rec->length);
179         if (recs->records == NULL) {
180                 DEBUG(DEBUG_ERR,(__location__ " Failed to expand\n"));
181                 return;
182         }
183         recs->records->count++;
184         memcpy(old_size+(uint8_t *)(recs->records), rec, rec->length);
185 }
186
187
188 static void delete_record(void *param, void *d)
189 {
190         struct delete_record_data *dd = talloc_get_type(d, struct delete_record_data);
191         struct ctdb_context *ctdb = dd->ctdb;
192         struct ctdb_db_context *ctdb_db = dd->ctdb_db;
193         uint32_t *count = (uint32_t *)param;
194         struct ctdb_ltdb_header *hdr;
195         TDB_DATA data;
196
197         /* its deleted on all other nodes - refetch, check and delete */
198         if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, dd->key) != 0) {
199                 /* the chain is busy - come back later */
200                 return;
201         }
202
203         data = tdb_fetch(ctdb_db->ltdb->tdb, dd->key);
204         if (data.dptr == NULL) {
205                 tdb_chainunlock(ctdb_db->ltdb->tdb, dd->key);
206                 return;
207         }
208         if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
209                 free(data.dptr);
210                 tdb_chainunlock(ctdb_db->ltdb->tdb, dd->key);
211                 return;
212         }
213
214         hdr = (struct ctdb_ltdb_header *)data.dptr;
215
216         /* if we are not the lmaster and the dmaster then skip the record */
217         if (hdr->dmaster != ctdb->pnn ||
218             ctdb_lmaster(ctdb, &(dd->key)) != ctdb->pnn ||
219             dd->hdr.rsn != hdr->rsn) {
220                 tdb_chainunlock(ctdb_db->ltdb->tdb, dd->key);
221                 free(data.dptr);
222                 return;
223         }
224
225         ctdb_block_signal(SIGALRM);
226         tdb_delete(ctdb_db->ltdb->tdb, dd->key);
227         ctdb_unblock_signal(SIGALRM);
228         tdb_chainunlock(ctdb_db->ltdb->tdb, dd->key);
229         free(data.dptr);
230
231         (*count)++;
232 }
233
234 /* vacuum one database */
235 static int ctdb_vacuum_db(struct ctdb_context *ctdb, uint32_t db_id, struct ctdb_node_map *map,
236                           bool persistent, uint32_t vacuum_limit)
237 {
238         struct ctdb_db_context *ctdb_db;
239         const char *name;
240         struct vacuum_data *vdata;
241         int i;
242
243         vdata = talloc_zero(ctdb, struct vacuum_data);
244         if (vdata == NULL) {
245                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
246                 return -1;
247         }
248
249         vdata->ctdb = ctdb;
250         vdata->vacuum_limit = vacuum_limit;
251         vdata->delete_tree = trbt_create(vdata, 0);
252         if (vdata->delete_tree == NULL) {
253                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
254                 return -1;
255         }
256
257         if (ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, db_id, vdata, &name) != 0) {
258                 DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", db_id));
259                 talloc_free(vdata);
260                 return -1;
261         }
262
263         ctdb_db = ctdb_attach(ctdb, name, persistent, 0);
264         if (ctdb_db == NULL) {
265                 DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
266                 talloc_free(vdata);
267                 return -1;
268         }
269         vdata->ctdb_db = ctdb_db;
270
271         /* the list needs to be of length num_nodes */
272         vdata->list = talloc_array(vdata, struct ctdb_control_pulldb_reply *, ctdb->vnn_map->size);
273         if (vdata->list == NULL) {
274                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
275                 talloc_free(vdata);
276                 return -1;
277         }
278         for (i=0;i<ctdb->vnn_map->size;i++) {
279                 vdata->list[i] = (struct ctdb_control_pulldb_reply *)
280                         talloc_zero_size(vdata->list, 
281                                     offsetof(struct ctdb_control_pulldb_reply, data));
282                 if (vdata->list[i] == NULL) {
283                         DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
284                         talloc_free(vdata);
285                         return -1;
286                 }
287                 vdata->list[i]->db_id = db_id;
288         }
289
290         /* traverse, looking for records that might be able to be vacuumed */
291         if (tdb_traverse_read(ctdb_db->ltdb->tdb, vacuum_traverse, vdata) == -1 ||
292             vdata->traverse_error) {
293                 DEBUG(DEBUG_ERR,(__location__ " Traverse error in vacuuming '%s'\n", name));
294                 talloc_free(vdata);
295                 return -1;              
296         }
297
298
299         for (i=0;i<ctdb->vnn_map->size;i++) {
300                 if (vdata->list[i]->count == 0) {
301                         continue;
302                 }
303
304                 /* for records where we are not the lmaster, tell the lmaster to fetch the record */
305                 if (ctdb->vnn_map->map[i] != ctdb->pnn) {
306                         TDB_DATA data;
307                         printf("Found %u records for lmaster %u in '%s'\n", vdata->list[i]->count, i, name);
308
309                         data.dsize = talloc_get_size(vdata->list[i]);
310                         data.dptr  = (void *)vdata->list[i];
311                         if (ctdb_send_message(ctdb, ctdb->vnn_map->map[i], CTDB_SRVID_VACUUM_FETCH, data) != 0) {
312                                 DEBUG(DEBUG_ERR,(__location__ " Failed to send vacuum fetch message to %u\n",
313                                          ctdb->vnn_map->map[i]));
314                                 talloc_free(vdata);
315                                 return -1;              
316                         }
317                         continue;
318                 }
319         }       
320
321
322         /* Process all records we can delete (if any) */
323         if (vdata->delete_count > 0) {
324                 struct delete_records_list *recs;
325                 TDB_DATA indata, outdata;
326                 int ret;
327                 int32_t res;
328                 uint32_t count;
329
330                 recs = talloc_zero(vdata, struct delete_records_list);
331                 if (recs == NULL) {
332                         DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
333                         return -1;
334                 }
335                 recs->records = (struct ctdb_control_pulldb_reply *)
336                         talloc_zero_size(vdata, 
337                                     offsetof(struct ctdb_control_pulldb_reply, data));
338                 if (recs->records == NULL) {
339                         DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
340                         return -1;
341                 }
342                 recs->records->db_id = db_id;
343
344                 /* traverse the tree of all records we want to delete and
345                    create a blob we can send to the other nodes.
346                 */
347                 trbt_traversearray32(vdata->delete_tree, 1, delete_traverse, recs);
348
349                 indata.dsize = talloc_get_size(recs->records);
350                 indata.dptr  = (void *)recs->records;
351
352                 /* now tell all the other nodes to delete all these records
353                    (if possible)
354                  */
355                 for (i=0;i<ctdb->vnn_map->size;i++) {
356                         struct ctdb_control_pulldb_reply *records;
357                         struct ctdb_rec_data *rec;
358
359                         if (ctdb->vnn_map->map[i] == ctdb->pnn) {
360                                 /* we dont delete the records on the local node
361                                    just yet
362                                 */
363                                 continue;
364                         }
365
366                         ret = ctdb_control(ctdb, ctdb->vnn_map->map[i], 0,
367                                         CTDB_CONTROL_TRY_DELETE_RECORDS, 0,
368                                         indata, recs, &outdata, &res,
369                                         NULL, NULL);
370                         if (ret != 0 || res != 0) {
371                                 DEBUG(DEBUG_ERR,("Failed to delete records on node %u\n", ctdb->vnn_map->map[i]));
372                                 exit(10);
373                         }
374
375                         /* outdata countains the list of records coming back
376                            from the node which the node could not delete
377                         */
378                         records = (struct ctdb_control_pulldb_reply *)outdata.dptr;
379                         rec = (struct ctdb_rec_data *)&records->data[0];
380                         while (records->count-- > 1) {
381                                 TDB_DATA reckey, recdata;
382                                 struct ctdb_ltdb_header *rechdr;
383
384                                 reckey.dptr = &rec->data[0];
385                                 reckey.dsize = rec->keylen;
386                                 recdata.dptr = &rec->data[reckey.dsize];
387                                 recdata.dsize = rec->datalen;
388
389                                 if (recdata.dsize < sizeof(struct ctdb_ltdb_header)) {
390                                         DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
391                                         exit(10);
392                                 }
393                                 rechdr = (struct ctdb_ltdb_header *)recdata.dptr;
394                                 recdata.dptr += sizeof(*rechdr);
395                                 recdata.dsize -= sizeof(*rechdr);
396
397                                 /* that other node couldnt delete the record
398                                    so we shouldnt delete it either.
399                                    remove it from the tree.
400                                 */
401                                 talloc_free(trbt_lookup32(vdata->delete_tree, ctdb_hash(&reckey)));
402
403                                 rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec);
404                         }           
405                 }
406
407
408                 /* the only records remaining in the tree would be those
409                    records where all other nodes could successfully
410                    delete them, so we can now safely delete them on the
411                    lmaster as well.
412                 */
413                 count = 0;
414                 trbt_traversearray32(vdata->delete_tree, 1, delete_record, &count);
415                 if (vdata->delete_count != 0) {
416                         printf("Deleted %u records out of %u on this node from '%s'\n", count, vdata->delete_count, name);
417                 }
418         }
419
420         /* this ensures we run our event queue */
421         ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
422
423         talloc_free(vdata);
424
425         return 0;
426 }
427
428
429 /*
430   vacuum all our databases
431  */
432 int ctdb_vacuum(struct ctdb_context *ctdb, int argc, const char **argv)
433 {
434         struct ctdb_dbid_map *dbmap=NULL;
435         struct ctdb_node_map *nodemap=NULL;
436         int ret, i, pnn;
437         uint32_t vacuum_limit = 0;
438
439         if (argc > 0) {
440                 vacuum_limit = atoi(argv[0]);
441         }
442
443         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &dbmap);
444         if (ret != 0) {
445                 DEBUG(DEBUG_ERR, ("Unable to get dbids from local node\n"));
446                 return ret;
447         }
448
449         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
450         if (ret != 0) {
451                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
452                 return ret;
453         }
454
455         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &ctdb->vnn_map);
456         if (ret != 0) {
457                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from local node\n"));
458                 return ret;
459         }
460
461         pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
462         if (pnn == -1) {
463                 DEBUG(DEBUG_ERR, ("Unable to get pnn from local node\n"));
464                 return -1;
465         }
466         ctdb->pnn = pnn;
467
468         for (i=0;i<dbmap->num;i++) {
469                 if (ctdb_vacuum_db(ctdb, dbmap->dbs[i].dbid, nodemap, 
470                                    dbmap->dbs[i].persistent, vacuum_limit) != 0) {
471                         DEBUG(DEBUG_ERR,("Failed to vacuum db 0x%x\n", dbmap->dbs[i].dbid));
472                         return -1;
473                 }
474         }
475
476         return 0;
477 }
478
479 struct traverse_state {
480         bool error;
481         struct tdb_context *dest_db;
482 };
483
484 /*
485   traverse function for repacking
486  */
487 static int repack_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private)
488 {
489         struct traverse_state *state = (struct traverse_state *)private;
490         if (tdb_store(state->dest_db, key, data, TDB_INSERT) != 0) {
491                 state->error = true;
492                 return -1;
493         }
494         return 0;
495 }
496
497 /*
498   repack a tdb
499  */
500 static int ctdb_repack_tdb(struct tdb_context *tdb)
501 {
502         struct tdb_context *tmp_db;
503         struct traverse_state state;
504
505         if (tdb_transaction_start(tdb) != 0) {
506                 DEBUG(DEBUG_ERR,(__location__ " Failed to start transaction\n"));
507                 return -1;
508         }
509
510         tmp_db = tdb_open("tmpdb", tdb_hash_size(tdb), TDB_INTERNAL, O_RDWR|O_CREAT, 0);
511         if (tmp_db == NULL) {
512                 DEBUG(DEBUG_ERR,(__location__ " Failed to create tmp_db\n"));
513                 tdb_transaction_cancel(tdb);
514                 return -1;
515         }
516
517         state.error = false;
518         state.dest_db = tmp_db;
519
520         if (tdb_traverse_read(tdb, repack_traverse, &state) == -1) {
521                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse copying out\n"));
522                 tdb_transaction_cancel(tdb);
523                 tdb_close(tmp_db);
524                 return -1;              
525         }
526
527         if (state.error) {
528                 DEBUG(DEBUG_ERR,(__location__ " Error during traversal\n"));
529                 tdb_transaction_cancel(tdb);
530                 tdb_close(tmp_db);
531                 return -1;
532         }
533
534         if (tdb_wipe_all(tdb) != 0) {
535                 DEBUG(DEBUG_ERR,(__location__ " Failed to wipe database\n"));
536                 tdb_transaction_cancel(tdb);
537                 tdb_close(tmp_db);
538                 return -1;
539         }
540
541         state.error = false;
542         state.dest_db = tdb;
543
544         if (tdb_traverse_read(tmp_db, repack_traverse, &state) == -1) {
545                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse copying back\n"));
546                 tdb_transaction_cancel(tdb);
547                 tdb_close(tmp_db);
548                 return -1;              
549         }
550
551         if (state.error) {
552                 DEBUG(DEBUG_ERR,(__location__ " Error during second traversal\n"));
553                 tdb_transaction_cancel(tdb);
554                 tdb_close(tmp_db);
555                 return -1;
556         }
557
558         tdb_close(tmp_db);
559
560         if (tdb_transaction_commit(tdb) != 0) {
561                 DEBUG(DEBUG_ERR,(__location__ " Failed to commit\n"));
562                 return -1;
563         }
564
565         return 0;
566 }
567
568
569 /* repack one database */
570 static int ctdb_repack_db(struct ctdb_context *ctdb, uint32_t db_id, 
571                           bool persistent, uint32_t repack_limit)
572 {
573         struct ctdb_db_context *ctdb_db;
574         const char *name;
575         int size;
576
577         if (ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, db_id, ctdb, &name) != 0) {
578                 DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", db_id));
579                 return -1;
580         }
581
582         ctdb_db = ctdb_attach(ctdb, name, persistent, 0);
583         if (ctdb_db == NULL) {
584                 DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
585                 return -1;
586         }
587
588         size = tdb_freelist_size(ctdb_db->ltdb->tdb);
589         if (size == -1) {
590                 DEBUG(DEBUG_ERR,(__location__ " Failed to get freelist size for '%s'\n", name));
591                 return -1;
592         }
593
594         if (size <= repack_limit) {
595                 return 0;
596         }
597
598         printf("Repacking %s with %u freelist entries\n", name, size);
599
600         if (ctdb_repack_tdb(ctdb_db->ltdb->tdb) != 0) {
601                 DEBUG(DEBUG_ERR,(__location__ " Failed to repack '%s'\n", name));
602                 return -1;
603         }
604
605         return 0;
606 }
607
608
609 /*
610   repack all our databases
611  */
612 int ctdb_repack(struct ctdb_context *ctdb, int argc, const char **argv)
613 {
614         struct ctdb_dbid_map *dbmap=NULL;
615         int ret, i;
616         /* a reasonable default limit to prevent us using too much memory */
617         uint32_t repack_limit = 10000; 
618
619         if (argc > 0) {
620                 repack_limit = atoi(argv[0]);
621         }
622
623         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &dbmap);
624         if (ret != 0) {
625                 DEBUG(DEBUG_ERR, ("Unable to get dbids from local node\n"));
626                 return ret;
627         }
628
629         for (i=0;i<dbmap->num;i++) {
630                 if (ctdb_repack_db(ctdb, dbmap->dbs[i].dbid, 
631                                    dbmap->dbs[i].persistent, repack_limit) != 0) {
632                         DEBUG(DEBUG_ERR,("Failed to repack db 0x%x\n", dbmap->dbs[i].dbid));
633                         return -1;
634                 }
635         }
636
637         return 0;
638 }