remove the old ctdb_attach() function and move all callers over to the new ctdb_attac...
[sahlberg/ctdb.git] / tools / ctdb_vacuum.c
1 /* 
2    ctdb control tool - database vacuum 
3
4    Copyright (C) Andrew Tridgell  2008
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/events/events.h"
22 #include "system/filesys.h"
23 #include "system/network.h"
24 #include "include/ctdb_protocol.h"
25 #include "include/ctdb_private.h"
26 #include "common/rb_tree.h"
27 #include "db_wrap.h"
28
/*
   Timeout value handed to the ctdb_ctrl_*() calls in this file.
   Hard-coded to timeval_current_ofs(10, 0) - presumably "10 seconds
   from now", confirm against the timeval helper.  Should be tunable.
 */
#define TIMELIMIT() timeval_current_ofs(10, 0)
31
32
/* 
   a list of records to possibly delete

   One instance is created per vacuum run of a single database and is
   passed to vacuum_traverse() as the traverse callback's private data.
 */
struct vacuum_data {
	/* maximum number of records to gather; 0 means no limit */
	uint32_t vacuum_limit;
	struct ctdb_context *ctdb;
	/* the database being vacuumed */
	struct ctdb_db_context *ctdb_db;
	/* tree of struct delete_record_data, indexed by ctdb_hash() of the key */
	trbt_tree_t *delete_tree;
	/* number of entries that have been inserted into delete_tree */
	uint32_t delete_count;
	/* array of vnn_map->size marshall buffers, one per lmaster */
	struct ctdb_marshall_buffer **list;
	/* set by the traverse callback when it had to abort on an error */
	bool traverse_error;
	/* number of records marshalled into the buffers in list */
	uint32_t total;
};
46
/* this structure contains the information for one record to be deleted;
   instances are stored in vacuum_data.delete_tree */
struct delete_record_data {
	struct ctdb_context *ctdb;
	struct ctdb_db_context *ctdb_db;
	/* copy of the ltdb header as it looked when the record was scanned */
	struct ctdb_ltdb_header hdr;
	/* private talloc copy of the record key */
	TDB_DATA key;
};
54
55 /*
56   traverse function for vacuuming
57  */
58 static int vacuum_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private)
59 {
60         struct vacuum_data *vdata = talloc_get_type(private, struct vacuum_data);
61         struct ctdb_context *ctdb = vdata->ctdb;
62         struct ctdb_db_context *ctdb_db = vdata->ctdb_db;
63         uint32_t lmaster;
64         struct ctdb_ltdb_header *hdr;
65         struct ctdb_rec_data *rec;
66         size_t old_size;
67                
68         lmaster = ctdb_lmaster(ctdb, &key);
69         if (lmaster >= ctdb->vnn_map->size) {
70                 return 0;
71         }
72
73         if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
74                 /* its not a deleted record */
75                 return 0;
76         }
77
78         hdr = (struct ctdb_ltdb_header *)data.dptr;
79
80         if (hdr->dmaster != ctdb->pnn) {
81                 return 0;
82         }
83
84
85         /* is this a records we could possibly delete? I.e.
86            if the record is empty and also we are both lmaster
87            and dmaster for the record we should be able to delete it
88         */
89         if ( (lmaster == ctdb->pnn)
90            &&( (vdata->delete_count < vdata->vacuum_limit)
91              ||(vdata->vacuum_limit == 0) ) ){
92                 uint32_t hash;
93
94                 hash = ctdb_hash(&key);
95                 if (trbt_lookup32(vdata->delete_tree, hash)) {
96                         DEBUG(DEBUG_INFO, (__location__ " Hash collission when vacuuming, skipping this record.\n"));
97                 } else {
98                         struct delete_record_data *dd;
99
100                         /* store key and header indexed by the key hash */
101                         dd = talloc_zero(vdata->delete_tree, struct delete_record_data);
102                         if (dd == NULL) {
103                                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
104                                 return -1;
105                         }
106                         dd->ctdb      = ctdb;
107                         dd->ctdb_db   = ctdb_db;
108                         dd->key.dsize = key.dsize;
109                         dd->key.dptr  = talloc_memdup(dd, key.dptr, key.dsize);
110                         if (dd->key.dptr == NULL) {
111                                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
112                                 return -1;
113                         }
114
115                         dd->hdr = *hdr;
116
117         
118                         trbt_insert32(vdata->delete_tree, hash, dd);
119
120                         vdata->delete_count++;
121                 }
122         }
123
124
125         /* add the record to the blob ready to send to the nodes */
126         rec = ctdb_marshall_record(vdata->list[lmaster], ctdb->pnn, key, NULL, tdb_null);
127         if (rec == NULL) {
128                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
129                 vdata->traverse_error = true;
130                 return -1;
131         }
132         old_size = talloc_get_size(vdata->list[lmaster]);
133         vdata->list[lmaster] = talloc_realloc_size(NULL, vdata->list[lmaster], 
134                                                    old_size + rec->length);
135         if (vdata->list[lmaster] == NULL) {
136                 DEBUG(DEBUG_ERR,(__location__ " Failed to expand\n"));
137                 vdata->traverse_error = true;
138                 return -1;
139         }
140         vdata->list[lmaster]->count++;
141         memcpy(old_size+(uint8_t *)vdata->list[lmaster], rec, rec->length);
142         talloc_free(rec);
143
144         vdata->total++;
145
146         /* don't gather too many records */
147         if (vdata->vacuum_limit != 0 &&
148             vdata->total == vdata->vacuum_limit) {
149                 return -1;
150         }
151
152         return 0;
153 }
154
/* accumulator handed to delete_traverse() while the delete tree is walked */
struct delete_records_list {
	/* marshall buffer that grows by one record per visited tree entry */
	struct ctdb_marshall_buffer *records;
};
158
159 /*
160  traverse the tree of records to delete and marshall them into
161  a blob
162 */
163 static void
164 delete_traverse(void *param, void *data)
165 {
166         struct delete_record_data *dd = talloc_get_type(data, struct delete_record_data);
167         struct delete_records_list *recs = talloc_get_type(param, struct delete_records_list);
168         struct ctdb_rec_data *rec;
169         size_t old_size;
170
171         rec = ctdb_marshall_record(dd, recs->records->db_id, dd->key, &dd->hdr, tdb_null);
172         if (rec == NULL) {
173                 DEBUG(DEBUG_ERR, (__location__ " failed to marshall record\n"));
174                 return;
175         }
176
177         old_size = talloc_get_size(recs->records);
178         recs->records = talloc_realloc_size(NULL, recs->records, old_size + rec->length);
179         if (recs->records == NULL) {
180                 DEBUG(DEBUG_ERR,(__location__ " Failed to expand\n"));
181                 return;
182         }
183         recs->records->count++;
184         memcpy(old_size+(uint8_t *)(recs->records), rec, rec->length);
185 }
186
187
188 static void delete_record(void *param, void *d)
189 {
190         struct delete_record_data *dd = talloc_get_type(d, struct delete_record_data);
191         struct ctdb_context *ctdb = dd->ctdb;
192         struct ctdb_db_context *ctdb_db = dd->ctdb_db;
193         uint32_t *count = (uint32_t *)param;
194         struct ctdb_ltdb_header *hdr;
195         TDB_DATA data;
196
197         /* its deleted on all other nodes - refetch, check and delete */
198         if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, dd->key) != 0) {
199                 /* the chain is busy - come back later */
200                 return;
201         }
202
203         data = tdb_fetch(ctdb_db->ltdb->tdb, dd->key);
204         if (data.dptr == NULL) {
205                 tdb_chainunlock(ctdb_db->ltdb->tdb, dd->key);
206                 return;
207         }
208         if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
209                 free(data.dptr);
210                 tdb_chainunlock(ctdb_db->ltdb->tdb, dd->key);
211                 return;
212         }
213
214         hdr = (struct ctdb_ltdb_header *)data.dptr;
215
216         /* if we are not the lmaster and the dmaster then skip the record */
217         if (hdr->dmaster != ctdb->pnn ||
218             ctdb_lmaster(ctdb, &(dd->key)) != ctdb->pnn ||
219             dd->hdr.rsn != hdr->rsn) {
220                 tdb_chainunlock(ctdb_db->ltdb->tdb, dd->key);
221                 free(data.dptr);
222                 return;
223         }
224
225         ctdb_block_signal(SIGALRM);
226         tdb_delete(ctdb_db->ltdb->tdb, dd->key);
227         ctdb_unblock_signal(SIGALRM);
228         tdb_chainunlock(ctdb_db->ltdb->tdb, dd->key);
229         free(data.dptr);
230
231         (*count)++;
232 }
233
234 /* vacuum one database */
235 static int ctdb_vacuum_db(struct ctdb_context *ctdb, uint32_t db_id, struct ctdb_node_map *map,
236                           bool persistent, uint32_t vacuum_limit)
237 {
238         struct ctdb_db_context *ctdb_db;
239         const char *name;
240         struct vacuum_data *vdata;
241         int ret, i;
242
243         vdata = talloc_zero(ctdb, struct vacuum_data);
244         if (vdata == NULL) {
245                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
246                 return -1;
247         }
248
249         vdata->ctdb = ctdb;
250         vdata->vacuum_limit = vacuum_limit;
251         vdata->delete_tree = trbt_create(vdata, 0);
252         if (vdata->delete_tree == NULL) {
253                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
254                 return -1;
255         }
256
257         if (ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, db_id, vdata, &name) != 0) {
258                 DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", db_id));
259                 talloc_free(vdata);
260                 return -1;
261         }
262
263         ret = ctdb_attachdb(ctdb, name, persistent, 0, &ctdb_db);
264         if (ret != 0) {
265                 DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
266                 talloc_free(vdata);
267                 return -1;
268         }
269         vdata->ctdb_db = ctdb_db;
270
271         /* the list needs to be of length num_nodes */
272         vdata->list = talloc_array(vdata, struct ctdb_marshall_buffer *, ctdb->vnn_map->size);
273         if (vdata->list == NULL) {
274                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
275                 talloc_free(vdata);
276                 return -1;
277         }
278         for (i=0;i<ctdb->vnn_map->size;i++) {
279                 vdata->list[i] = (struct ctdb_marshall_buffer *)
280                         talloc_zero_size(vdata->list, 
281                                     offsetof(struct ctdb_marshall_buffer, data));
282                 if (vdata->list[i] == NULL) {
283                         DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
284                         talloc_free(vdata);
285                         return -1;
286                 }
287                 vdata->list[i]->db_id = db_id;
288         }
289
290         /* traverse, looking for records that might be able to be vacuumed */
291         if (tdb_traverse_read(ctdb_db->ltdb->tdb, vacuum_traverse, vdata) == -1 ||
292             vdata->traverse_error) {
293                 DEBUG(DEBUG_ERR,(__location__ " Traverse error in vacuuming '%s'\n", name));
294                 talloc_free(vdata);
295                 return -1;              
296         }
297
298
299         for (i=0;i<ctdb->vnn_map->size;i++) {
300                 if (vdata->list[i]->count == 0) {
301                         continue;
302                 }
303
304                 /* for records where we are not the lmaster, tell the lmaster to fetch the record */
305                 if (ctdb->vnn_map->map[i] != ctdb->pnn) {
306                         TDB_DATA data;
307                         printf("Found %u records for lmaster %u in '%s'\n", vdata->list[i]->count, i, name);
308
309                         data.dsize = talloc_get_size(vdata->list[i]);
310                         data.dptr  = (void *)vdata->list[i];
311                         if (ctdb_send_message(ctdb, ctdb->vnn_map->map[i], CTDB_SRVID_VACUUM_FETCH, data) != 0) {
312                                 DEBUG(DEBUG_ERR,(__location__ " Failed to send vacuum fetch message to %u\n",
313                                          ctdb->vnn_map->map[i]));
314                                 talloc_free(vdata);
315                                 return -1;              
316                         }
317                         continue;
318                 }
319         }       
320
321
322         /* Process all records we can delete (if any) */
323         if (vdata->delete_count > 0) {
324                 struct delete_records_list *recs;
325                 TDB_DATA indata, outdata;
326                 int32_t res;
327                 uint32_t count;
328
329                 recs = talloc_zero(vdata, struct delete_records_list);
330                 if (recs == NULL) {
331                         DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
332                         return -1;
333                 }
334                 recs->records = (struct ctdb_marshall_buffer *)
335                         talloc_zero_size(vdata, 
336                                     offsetof(struct ctdb_marshall_buffer, data));
337                 if (recs->records == NULL) {
338                         DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
339                         return -1;
340                 }
341                 recs->records->db_id = db_id;
342
343                 /* traverse the tree of all records we want to delete and
344                    create a blob we can send to the other nodes.
345                 */
346                 trbt_traversearray32(vdata->delete_tree, 1, delete_traverse, recs);
347
348                 indata.dsize = talloc_get_size(recs->records);
349                 indata.dptr  = (void *)recs->records;
350
351                 /* now tell all the other nodes to delete all these records
352                    (if possible)
353                  */
354                 for (i=0;i<ctdb->vnn_map->size;i++) {
355                         struct ctdb_marshall_buffer *records;
356                         struct ctdb_rec_data *rec;
357
358                         if (ctdb->vnn_map->map[i] == ctdb->pnn) {
359                                 /* we dont delete the records on the local node
360                                    just yet
361                                 */
362                                 continue;
363                         }
364
365                         ret = ctdb_control(ctdb, ctdb->vnn_map->map[i], 0,
366                                         CTDB_CONTROL_TRY_DELETE_RECORDS, 0,
367                                         indata, recs, &outdata, &res,
368                                         NULL, NULL);
369                         if (ret != 0 || res != 0) {
370                                 DEBUG(DEBUG_ERR,("Failed to delete records on node %u\n", ctdb->vnn_map->map[i]));
371                                 exit(10);
372                         }
373
374                         /* outdata countains the list of records coming back
375                            from the node which the node could not delete
376                         */
377                         records = (struct ctdb_marshall_buffer *)outdata.dptr;
378                         rec = (struct ctdb_rec_data *)&records->data[0];
379                         while (records->count-- > 1) {
380                                 TDB_DATA reckey, recdata;
381                                 struct ctdb_ltdb_header *rechdr;
382
383                                 reckey.dptr = &rec->data[0];
384                                 reckey.dsize = rec->keylen;
385                                 recdata.dptr = &rec->data[reckey.dsize];
386                                 recdata.dsize = rec->datalen;
387
388                                 if (recdata.dsize < sizeof(struct ctdb_ltdb_header)) {
389                                         DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
390                                         exit(10);
391                                 }
392                                 rechdr = (struct ctdb_ltdb_header *)recdata.dptr;
393                                 recdata.dptr += sizeof(*rechdr);
394                                 recdata.dsize -= sizeof(*rechdr);
395
396                                 /* that other node couldnt delete the record
397                                    so we shouldnt delete it either.
398                                    remove it from the tree.
399                                 */
400                                 talloc_free(trbt_lookup32(vdata->delete_tree, ctdb_hash(&reckey)));
401
402                                 rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec);
403                         }           
404                 }
405
406
407                 /* the only records remaining in the tree would be those
408                    records where all other nodes could successfully
409                    delete them, so we can now safely delete them on the
410                    lmaster as well.
411                 */
412                 count = 0;
413                 trbt_traversearray32(vdata->delete_tree, 1, delete_record, &count);
414                 if (vdata->delete_count != 0) {
415                         printf("Deleted %u records out of %u on this node from '%s'\n", count, vdata->delete_count, name);
416                 }
417         }
418
419         /* this ensures we run our event queue */
420         ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
421
422         talloc_free(vdata);
423
424         return 0;
425 }
426
427
428 /*
429   vacuum all our databases
430  */
431 int ctdb_vacuum(struct ctdb_context *ctdb, int argc, const char **argv)
432 {
433         struct ctdb_dbid_map *dbmap=NULL;
434         struct ctdb_node_map *nodemap=NULL;
435         int ret, i, pnn;
436         uint32_t vacuum_limit = 0;
437
438         if (argc > 0) {
439                 vacuum_limit = atoi(argv[0]);
440         }
441
442         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &dbmap);
443         if (ret != 0) {
444                 DEBUG(DEBUG_ERR, ("Unable to get dbids from local node\n"));
445                 return ret;
446         }
447
448         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
449         if (ret != 0) {
450                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
451                 return ret;
452         }
453
454         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &ctdb->vnn_map);
455         if (ret != 0) {
456                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from local node\n"));
457                 return ret;
458         }
459
460         pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
461         if (pnn == -1) {
462                 DEBUG(DEBUG_ERR, ("Unable to get pnn from local node\n"));
463                 return -1;
464         }
465         ctdb->pnn = pnn;
466
467         for (i=0;i<dbmap->num;i++) {
468                 if (ctdb_vacuum_db(ctdb, dbmap->dbs[i].dbid, nodemap, 
469                                    dbmap->dbs[i].persistent, vacuum_limit) != 0) {
470                         DEBUG(DEBUG_ERR,("Failed to vacuum db 0x%x\n", dbmap->dbs[i].dbid));
471                         return -1;
472                 }
473         }
474
475         return 0;
476 }
477
/* private data of the repack_traverse() callback */
struct traverse_state {
	/* set when storing a record into dest_db failed */
	bool error;
	/* the tdb that the traversed records are copied into */
	struct tdb_context *dest_db;
};
482
483 /*
484   traverse function for repacking
485  */
486 static int repack_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private)
487 {
488         struct traverse_state *state = (struct traverse_state *)private;
489         if (tdb_store(state->dest_db, key, data, TDB_INSERT) != 0) {
490                 state->error = true;
491                 return -1;
492         }
493         return 0;
494 }
495
496 /*
497   repack a tdb
498  */
499 static int ctdb_repack_tdb(struct tdb_context *tdb)
500 {
501         struct tdb_context *tmp_db;
502         struct traverse_state state;
503
504         if (tdb_transaction_start(tdb) != 0) {
505                 DEBUG(DEBUG_ERR,(__location__ " Failed to start transaction\n"));
506                 return -1;
507         }
508
509         tmp_db = tdb_open("tmpdb", tdb_hash_size(tdb),
510                           TDB_INTERNAL|TDB_DISALLOW_NESTING,
511                           O_RDWR|O_CREAT, 0);
512         if (tmp_db == NULL) {
513                 DEBUG(DEBUG_ERR,(__location__ " Failed to create tmp_db\n"));
514                 tdb_transaction_cancel(tdb);
515                 return -1;
516         }
517
518         state.error = false;
519         state.dest_db = tmp_db;
520
521         if (tdb_traverse_read(tdb, repack_traverse, &state) == -1) {
522                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse copying out\n"));
523                 tdb_transaction_cancel(tdb);
524                 tdb_close(tmp_db);
525                 return -1;              
526         }
527
528         if (state.error) {
529                 DEBUG(DEBUG_ERR,(__location__ " Error during traversal\n"));
530                 tdb_transaction_cancel(tdb);
531                 tdb_close(tmp_db);
532                 return -1;
533         }
534
535         if (tdb_wipe_all(tdb) != 0) {
536                 DEBUG(DEBUG_ERR,(__location__ " Failed to wipe database\n"));
537                 tdb_transaction_cancel(tdb);
538                 tdb_close(tmp_db);
539                 return -1;
540         }
541
542         state.error = false;
543         state.dest_db = tdb;
544
545         if (tdb_traverse_read(tmp_db, repack_traverse, &state) == -1) {
546                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse copying back\n"));
547                 tdb_transaction_cancel(tdb);
548                 tdb_close(tmp_db);
549                 return -1;              
550         }
551
552         if (state.error) {
553                 DEBUG(DEBUG_ERR,(__location__ " Error during second traversal\n"));
554                 tdb_transaction_cancel(tdb);
555                 tdb_close(tmp_db);
556                 return -1;
557         }
558
559         tdb_close(tmp_db);
560
561         if (tdb_transaction_commit(tdb) != 0) {
562                 DEBUG(DEBUG_ERR,(__location__ " Failed to commit\n"));
563                 return -1;
564         }
565
566         return 0;
567 }
568
569
570 /* repack one database */
571 static int ctdb_repack_db(struct ctdb_context *ctdb, uint32_t db_id, 
572                           bool persistent, uint32_t repack_limit)
573 {
574         struct ctdb_db_context *ctdb_db;
575         const char *name;
576         int ret, size;
577
578         if (ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, db_id, ctdb, &name) != 0) {
579                 DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", db_id));
580                 return -1;
581         }
582
583         ret = ctdb_attachdb(ctdb, name, persistent, 0, &ctdb_db);
584         if (ret != 0) {
585                 DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
586                 return -1;
587         }
588
589         size = tdb_freelist_size(ctdb_db->ltdb->tdb);
590         if (size == -1) {
591                 DEBUG(DEBUG_ERR,(__location__ " Failed to get freelist size for '%s'\n", name));
592                 return -1;
593         }
594
595         if (size <= repack_limit) {
596                 return 0;
597         }
598
599         printf("Repacking %s with %u freelist entries\n", name, size);
600
601         if (ctdb_repack_tdb(ctdb_db->ltdb->tdb) != 0) {
602                 DEBUG(DEBUG_ERR,(__location__ " Failed to repack '%s'\n", name));
603                 return -1;
604         }
605
606         return 0;
607 }
608
609
610 /*
611   repack all our databases
612  */
613 int ctdb_repack(struct ctdb_context *ctdb, int argc, const char **argv)
614 {
615         struct ctdb_dbid_map *dbmap=NULL;
616         int ret, i;
617         /* a reasonable default limit to prevent us using too much memory */
618         uint32_t repack_limit = 10000; 
619
620         if (argc > 0) {
621                 repack_limit = atoi(argv[0]);
622         }
623
624         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &dbmap);
625         if (ret != 0) {
626                 DEBUG(DEBUG_ERR, ("Unable to get dbids from local node\n"));
627                 return ret;
628         }
629
630         for (i=0;i<dbmap->num;i++) {
631                 if (ctdb_repack_db(ctdb, dbmap->dbs[i].dbid, 
632                                    dbmap->dbs[i].persistent, repack_limit) != 0) {
633                         DEBUG(DEBUG_ERR,("Failed to repack db 0x%x\n", dbmap->dbs[i].dbid));
634                         return -1;
635                 }
636         }
637
638         return 0;
639 }