ctdb_call: remove requirement that read-only requests be FETCH_WITH_HEADER
[rusty/ctdb.git] / tools / ctdb_vacuum.c
1 /* 
2    ctdb control tool - database vacuum 
3
4    Copyright (C) Andrew Tridgell  2008
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/tevent/tevent.h"
22 #include "system/filesys.h"
23 #include "system/network.h"
24 #include "../include/ctdb_client.h"
25 #include "../include/ctdb_private.h"
26 #include "../common/rb_tree.h"
27 #include "db_wrap.h"
28
29 /* time limit argument handed to the ctdb_ctrl_*() and ctdb_attach() calls in this file; hard-coded for now, should be tunable */
30 #define TIMELIMIT() timeval_current_ofs(10, 0) /* NOTE(review): presumably "10 seconds, 0 microseconds from now" - confirm against timeval_current_ofs() */
31
32
33 /* 
34    a list of records to possibly delete
35  */
36 struct vacuum_data { /* state of one vacuum run over one database; handed to vacuum_traverse as its private pointer */
37         uint32_t vacuum_limit; /* cap on records gathered / delete candidates per run; 0 means no cap */
38         struct ctdb_context *ctdb; /* client context the run operates on */
39         struct ctdb_db_context *ctdb_db; /* database being vacuumed */
40         trbt_tree_t *delete_tree; /* delete_record_data entries, keyed by the 32-bit hash of the record key */
41         uint32_t delete_count; /* number of entries inserted into delete_tree by vacuum_traverse */
42         struct ctdb_marshall_buffer **list; /* array with vnn_map->size marshall buffers; vacuum_traverse appends keys to list[lmaster] */
43         bool traverse_error; /* set by vacuum_traverse when marshalling a record fails */
44         uint32_t total; /* number of records appended to the list buffers so far */
45 };
46
47 /* this structure contains the information for one record to be deleted */
48 struct delete_record_data {
49         struct ctdb_context *ctdb; /* client context, copied from the vacuum_data of the run */
50         struct ctdb_db_context *ctdb_db; /* database the record lives in */
51         struct ctdb_ltdb_header hdr; /* copy of the record header as seen during the traverse; its rsn is re-checked before the local delete */
52         TDB_DATA key; /* private copy of the record key (dptr is talloc'ed as a child of this structure) */
53 };
54
55 /*
56   traverse function for vacuuming
57  */
58 static int vacuum_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private)
59 {
60         struct vacuum_data *vdata = talloc_get_type(private, struct vacuum_data);
61         struct ctdb_context *ctdb = vdata->ctdb;
62         struct ctdb_db_context *ctdb_db = vdata->ctdb_db;
63         uint32_t lmaster;
64         struct ctdb_ltdb_header *hdr;
65         struct ctdb_rec_data *rec;
66         size_t old_size;
67                
68         lmaster = ctdb_lmaster(ctdb, &key);
69         if (lmaster >= ctdb->vnn_map->size) {
70                 return 0;
71         }
72
73         if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
74                 /* its not a deleted record */
75                 return 0;
76         }
77
78         hdr = (struct ctdb_ltdb_header *)data.dptr;
79
80         if (hdr->dmaster != ctdb->pnn) {
81                 return 0;
82         }
83
84
85         /* is this a records we could possibly delete? I.e.
86            if the record is empty and also we are both lmaster
87            and dmaster for the record we should be able to delete it
88         */
89         if ( (lmaster == ctdb->pnn)
90            &&( (vdata->delete_count < vdata->vacuum_limit)
91              ||(vdata->vacuum_limit == 0) ) ){
92                 uint32_t hash;
93
94                 hash = ctdb_hash(&key);
95                 if (trbt_lookup32(vdata->delete_tree, hash)) {
96                         DEBUG(DEBUG_INFO, (__location__ " Hash collission when vacuuming, skipping this record.\n"));
97                 } else {
98                         struct delete_record_data *dd;
99
100                         /* store key and header indexed by the key hash */
101                         dd = talloc_zero(vdata->delete_tree, struct delete_record_data);
102                         if (dd == NULL) {
103                                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
104                                 return -1;
105                         }
106                         dd->ctdb      = ctdb;
107                         dd->ctdb_db   = ctdb_db;
108                         dd->key.dsize = key.dsize;
109                         dd->key.dptr  = talloc_memdup(dd, key.dptr, key.dsize);
110                         if (dd->key.dptr == NULL) {
111                                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
112                                 return -1;
113                         }
114
115                         dd->hdr = *hdr;
116
117         
118                         trbt_insert32(vdata->delete_tree, hash, dd);
119
120                         vdata->delete_count++;
121                 }
122         }
123
124
125         /* add the record to the blob ready to send to the nodes */
126         rec = ctdb_marshall_record(vdata->list[lmaster], ctdb->pnn, key, NULL, tdb_null);
127         if (rec == NULL) {
128                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
129                 vdata->traverse_error = true;
130                 return -1;
131         }
132         old_size = talloc_get_size(vdata->list[lmaster]);
133         vdata->list[lmaster] = talloc_realloc_size(NULL, vdata->list[lmaster], 
134                                                    old_size + rec->length);
135         if (vdata->list[lmaster] == NULL) {
136                 DEBUG(DEBUG_ERR,(__location__ " Failed to expand\n"));
137                 vdata->traverse_error = true;
138                 return -1;
139         }
140         vdata->list[lmaster]->count++;
141         memcpy(old_size+(uint8_t *)vdata->list[lmaster], rec, rec->length);
142         talloc_free(rec);
143
144         vdata->total++;
145
146         /* don't gather too many records */
147         if (vdata->vacuum_limit != 0 &&
148             vdata->total == vdata->vacuum_limit) {
149                 return -1;
150         }
151
152         return 0;
153 }
154
155 struct delete_records_list { /* accumulator handed to delete_traverse as its param pointer */
156         struct ctdb_marshall_buffer *records; /* marshall buffer the delete candidates are appended to; reallocated as it grows */
157 };
158
159 /*
160  traverse the tree of records to delete and marshall them into
161  a blob
162 */
163 static int
164 delete_traverse(void *param, void *data)
165 {
166         struct delete_record_data *dd = talloc_get_type(data, struct delete_record_data);
167         struct delete_records_list *recs = talloc_get_type(param, struct delete_records_list);
168         struct ctdb_rec_data *rec;
169         size_t old_size;
170
171         rec = ctdb_marshall_record(dd, recs->records->db_id, dd->key, &dd->hdr, tdb_null);
172         if (rec == NULL) {
173                 DEBUG(DEBUG_ERR, (__location__ " failed to marshall record\n"));
174                 return 0;
175         }
176
177         old_size = talloc_get_size(recs->records);
178         recs->records = talloc_realloc_size(NULL, recs->records, old_size + rec->length);
179         if (recs->records == NULL) {
180                 DEBUG(DEBUG_ERR,(__location__ " Failed to expand\n"));
181                 return 0;
182         }
183         recs->records->count++;
184         memcpy(old_size+(uint8_t *)(recs->records), rec, rec->length);
185         return 0;
186 }
187
188
189 static int delete_record(void *param, void *d)
190 {
191         struct delete_record_data *dd = talloc_get_type(d, struct delete_record_data);
192         struct ctdb_context *ctdb = dd->ctdb;
193         struct ctdb_db_context *ctdb_db = dd->ctdb_db;
194         uint32_t *count = (uint32_t *)param;
195         struct ctdb_ltdb_header *hdr;
196         TDB_DATA data;
197
198         /* its deleted on all other nodes - refetch, check and delete */
199         if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, dd->key) != 0) {
200                 /* the chain is busy - come back later */
201                 return 0;
202         }
203
204         data = tdb_fetch(ctdb_db->ltdb->tdb, dd->key);
205         if (data.dptr == NULL) {
206                 tdb_chainunlock(ctdb_db->ltdb->tdb, dd->key);
207                 return 0;
208         }
209         if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
210                 free(data.dptr);
211                 tdb_chainunlock(ctdb_db->ltdb->tdb, dd->key);
212                 return 0;
213         }
214
215         hdr = (struct ctdb_ltdb_header *)data.dptr;
216
217         /* if we are not the lmaster and the dmaster then skip the record */
218         if (hdr->dmaster != ctdb->pnn ||
219             ctdb_lmaster(ctdb, &(dd->key)) != ctdb->pnn ||
220             dd->hdr.rsn != hdr->rsn) {
221                 tdb_chainunlock(ctdb_db->ltdb->tdb, dd->key);
222                 free(data.dptr);
223                 return 0;
224         }
225
226         ctdb_block_signal(SIGALRM);
227         tdb_delete(ctdb_db->ltdb->tdb, dd->key);
228         ctdb_unblock_signal(SIGALRM);
229         tdb_chainunlock(ctdb_db->ltdb->tdb, dd->key);
230         free(data.dptr);
231
232         (*count)++;
233         return 0;
234 }
235
236 /* vacuum one database */
237 static int ctdb_vacuum_db(struct ctdb_context *ctdb, uint32_t db_id, struct ctdb_node_map *map,
238                           bool persistent, uint32_t vacuum_limit)
239 {
240         struct ctdb_db_context *ctdb_db;
241         const char *name;
242         struct vacuum_data *vdata;
243         int i;
244
245         vdata = talloc_zero(ctdb, struct vacuum_data);
246         if (vdata == NULL) {
247                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
248                 return -1;
249         }
250
251         vdata->ctdb = ctdb;
252         vdata->vacuum_limit = vacuum_limit;
253         vdata->delete_tree = trbt_create(vdata, 0);
254         if (vdata->delete_tree == NULL) {
255                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
256                 return -1;
257         }
258
259         if (ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, db_id, vdata, &name) != 0) {
260                 DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", db_id));
261                 talloc_free(vdata);
262                 return -1;
263         }
264
265         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), name, persistent, 0);
266         if (ctdb_db == NULL) {
267                 DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
268                 talloc_free(vdata);
269                 return -1;
270         }
271         vdata->ctdb_db = ctdb_db;
272
273         /* the list needs to be of length num_nodes */
274         vdata->list = talloc_array(vdata, struct ctdb_marshall_buffer *, ctdb->vnn_map->size);
275         if (vdata->list == NULL) {
276                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
277                 talloc_free(vdata);
278                 return -1;
279         }
280         for (i=0;i<ctdb->vnn_map->size;i++) {
281                 vdata->list[i] = (struct ctdb_marshall_buffer *)
282                         talloc_zero_size(vdata->list, 
283                                     offsetof(struct ctdb_marshall_buffer, data));
284                 if (vdata->list[i] == NULL) {
285                         DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
286                         talloc_free(vdata);
287                         return -1;
288                 }
289                 vdata->list[i]->db_id = db_id;
290         }
291
292         /* traverse, looking for records that might be able to be vacuumed */
293         if (tdb_traverse_read(ctdb_db->ltdb->tdb, vacuum_traverse, vdata) == -1 ||
294             vdata->traverse_error) {
295                 DEBUG(DEBUG_ERR,(__location__ " Traverse error in vacuuming '%s'\n", name));
296                 talloc_free(vdata);
297                 return -1;              
298         }
299
300
301         for (i=0;i<ctdb->vnn_map->size;i++) {
302                 if (vdata->list[i]->count == 0) {
303                         continue;
304                 }
305
306                 /* for records where we are not the lmaster, tell the lmaster to fetch the record */
307                 if (ctdb->vnn_map->map[i] != ctdb->pnn) {
308                         TDB_DATA data;
309                         printf("Found %u records for lmaster %u in '%s'\n", vdata->list[i]->count, i, name);
310
311                         data.dsize = talloc_get_size(vdata->list[i]);
312                         data.dptr  = (void *)vdata->list[i];
313                         if (ctdb_client_send_message(ctdb, ctdb->vnn_map->map[i], CTDB_SRVID_VACUUM_FETCH, data) != 0) {
314                                 DEBUG(DEBUG_ERR,(__location__ " Failed to send vacuum fetch message to %u\n",
315                                          ctdb->vnn_map->map[i]));
316                                 talloc_free(vdata);
317                                 return -1;              
318                         }
319                         continue;
320                 }
321         }       
322
323
324         /* Process all records we can delete (if any) */
325         if (vdata->delete_count > 0) {
326                 struct delete_records_list *recs;
327                 TDB_DATA indata, outdata;
328                 int ret;
329                 int32_t res;
330                 uint32_t count;
331
332                 recs = talloc_zero(vdata, struct delete_records_list);
333                 if (recs == NULL) {
334                         DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
335                         return -1;
336                 }
337                 recs->records = (struct ctdb_marshall_buffer *)
338                         talloc_zero_size(vdata, 
339                                     offsetof(struct ctdb_marshall_buffer, data));
340                 if (recs->records == NULL) {
341                         DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
342                         return -1;
343                 }
344                 recs->records->db_id = db_id;
345
346                 /* traverse the tree of all records we want to delete and
347                    create a blob we can send to the other nodes.
348                 */
349                 trbt_traversearray32(vdata->delete_tree, 1, delete_traverse, recs);
350
351                 indata.dsize = talloc_get_size(recs->records);
352                 indata.dptr  = (void *)recs->records;
353
354                 /* now tell all the other nodes to delete all these records
355                    (if possible)
356                  */
357                 for (i=0;i<ctdb->vnn_map->size;i++) {
358                         struct ctdb_marshall_buffer *records;
359                         struct ctdb_rec_data *rec;
360
361                         if (ctdb->vnn_map->map[i] == ctdb->pnn) {
362                                 /* we dont delete the records on the local node
363                                    just yet
364                                 */
365                                 continue;
366                         }
367
368                         ret = ctdb_control(ctdb, ctdb->vnn_map->map[i], 0,
369                                         CTDB_CONTROL_TRY_DELETE_RECORDS, 0,
370                                         indata, recs, &outdata, &res,
371                                         NULL, NULL);
372                         if (ret != 0 || res != 0) {
373                                 DEBUG(DEBUG_ERR,("Failed to delete records on node %u\n", ctdb->vnn_map->map[i]));
374                                 exit(10);
375                         }
376
377                         /* outdata countains the list of records coming back
378                            from the node which the node could not delete
379                         */
380                         records = (struct ctdb_marshall_buffer *)outdata.dptr;
381                         rec = (struct ctdb_rec_data *)&records->data[0];
382                         while (records->count-- > 1) {
383                                 TDB_DATA reckey, recdata;
384                                 struct ctdb_ltdb_header *rechdr;
385
386                                 reckey.dptr = &rec->data[0];
387                                 reckey.dsize = rec->keylen;
388                                 recdata.dptr = &rec->data[reckey.dsize];
389                                 recdata.dsize = rec->datalen;
390
391                                 if (recdata.dsize < sizeof(struct ctdb_ltdb_header)) {
392                                         DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
393                                         exit(10);
394                                 }
395                                 rechdr = (struct ctdb_ltdb_header *)recdata.dptr;
396                                 recdata.dptr += sizeof(*rechdr);
397                                 recdata.dsize -= sizeof(*rechdr);
398
399                                 /* that other node couldnt delete the record
400                                    so we shouldnt delete it either.
401                                    remove it from the tree.
402                                 */
403                                 talloc_free(trbt_lookup32(vdata->delete_tree, ctdb_hash(&reckey)));
404
405                                 rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec);
406                         }           
407                 }
408
409
410                 /* the only records remaining in the tree would be those
411                    records where all other nodes could successfully
412                    delete them, so we can now safely delete them on the
413                    lmaster as well.
414                 */
415                 count = 0;
416                 trbt_traversearray32(vdata->delete_tree, 1, delete_record, &count);
417                 if (vdata->delete_count != 0) {
418                         printf("Deleted %u records out of %u on this node from '%s'\n", count, vdata->delete_count, name);
419                 }
420         }
421
422         /* this ensures we run our event queue */
423         ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
424
425         talloc_free(vdata);
426
427         return 0;
428 }
429
430
431 /*
432   vacuum all our databases
433  */
434 int ctdb_vacuum(struct ctdb_context *ctdb, int argc, const char **argv)
435 {
436         struct ctdb_dbid_map *dbmap=NULL;
437         struct ctdb_node_map *nodemap=NULL;
438         int ret, i, pnn;
439         uint32_t vacuum_limit = 0;
440
441         if (argc > 0) {
442                 vacuum_limit = atoi(argv[0]);
443         }
444
445         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &dbmap);
446         if (ret != 0) {
447                 DEBUG(DEBUG_ERR, ("Unable to get dbids from local node\n"));
448                 return ret;
449         }
450
451         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
452         if (ret != 0) {
453                 DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
454                 return ret;
455         }
456
457         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &ctdb->vnn_map);
458         if (ret != 0) {
459                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from local node\n"));
460                 return ret;
461         }
462
463         pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
464         if (pnn == -1) {
465                 DEBUG(DEBUG_ERR, ("Unable to get pnn from local node\n"));
466                 return -1;
467         }
468         ctdb->pnn = pnn;
469
470         for (i=0;i<dbmap->num;i++) {
471                 if (ctdb_vacuum_db(ctdb, dbmap->dbs[i].dbid, nodemap, 
472                                    dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT, vacuum_limit) != 0) {
473                         DEBUG(DEBUG_ERR,("Failed to vacuum db 0x%x\n", dbmap->dbs[i].dbid));
474                         return -1;
475                 }
476         }
477
478         return 0;
479 }
480
481 struct traverse_state { /* private data handed to repack_traverse */
482         bool error; /* set by repack_traverse when storing a record into dest_db fails */
483         struct tdb_context *dest_db; /* tdb the traversed records are copied into */
484 };
485
486 /*
487   traverse function for repacking
488  */
489 static int repack_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private)
490 {
491         struct traverse_state *state = (struct traverse_state *)private;
492         if (tdb_store(state->dest_db, key, data, TDB_INSERT) != 0) {
493                 state->error = true;
494                 return -1;
495         }
496         return 0;
497 }
498
499 /*
500   repack a tdb
501  */
502 static int ctdb_repack_tdb(struct tdb_context *tdb)
503 {
504         struct tdb_context *tmp_db;
505         struct traverse_state state;
506
507         if (tdb_transaction_start(tdb) != 0) {
508                 DEBUG(DEBUG_ERR,(__location__ " Failed to start transaction\n"));
509                 return -1;
510         }
511
512         tmp_db = tdb_open("tmpdb", tdb_hash_size(tdb),
513                           TDB_INTERNAL|TDB_DISALLOW_NESTING,
514                           O_RDWR|O_CREAT, 0);
515         if (tmp_db == NULL) {
516                 DEBUG(DEBUG_ERR,(__location__ " Failed to create tmp_db\n"));
517                 tdb_transaction_cancel(tdb);
518                 return -1;
519         }
520
521         state.error = false;
522         state.dest_db = tmp_db;
523
524         if (tdb_traverse_read(tdb, repack_traverse, &state) == -1) {
525                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse copying out\n"));
526                 tdb_transaction_cancel(tdb);
527                 tdb_close(tmp_db);
528                 return -1;              
529         }
530
531         if (state.error) {
532                 DEBUG(DEBUG_ERR,(__location__ " Error during traversal\n"));
533                 tdb_transaction_cancel(tdb);
534                 tdb_close(tmp_db);
535                 return -1;
536         }
537
538         if (tdb_wipe_all(tdb) != 0) {
539                 DEBUG(DEBUG_ERR,(__location__ " Failed to wipe database\n"));
540                 tdb_transaction_cancel(tdb);
541                 tdb_close(tmp_db);
542                 return -1;
543         }
544
545         state.error = false;
546         state.dest_db = tdb;
547
548         if (tdb_traverse_read(tmp_db, repack_traverse, &state) == -1) {
549                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse copying back\n"));
550                 tdb_transaction_cancel(tdb);
551                 tdb_close(tmp_db);
552                 return -1;              
553         }
554
555         if (state.error) {
556                 DEBUG(DEBUG_ERR,(__location__ " Error during second traversal\n"));
557                 tdb_transaction_cancel(tdb);
558                 tdb_close(tmp_db);
559                 return -1;
560         }
561
562         tdb_close(tmp_db);
563
564         if (tdb_transaction_commit(tdb) != 0) {
565                 DEBUG(DEBUG_ERR,(__location__ " Failed to commit\n"));
566                 return -1;
567         }
568
569         return 0;
570 }
571
572
573 /* repack one database */
574 static int ctdb_repack_db(struct ctdb_context *ctdb, uint32_t db_id, 
575                           bool persistent, uint32_t repack_limit)
576 {
577         struct ctdb_db_context *ctdb_db;
578         const char *name;
579         int size;
580
581         if (ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, db_id, ctdb, &name) != 0) {
582                 DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", db_id));
583                 return -1;
584         }
585
586         ctdb_db = ctdb_attach(ctdb, TIMELIMIT(), name, persistent, 0);
587         if (ctdb_db == NULL) {
588                 DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
589                 return -1;
590         }
591
592         size = tdb_freelist_size(ctdb_db->ltdb->tdb);
593         if (size == -1) {
594                 DEBUG(DEBUG_ERR,(__location__ " Failed to get freelist size for '%s'\n", name));
595                 return -1;
596         }
597
598         if (size <= repack_limit) {
599                 return 0;
600         }
601
602         printf("Repacking %s with %u freelist entries\n", name, size);
603
604         if (ctdb_repack_tdb(ctdb_db->ltdb->tdb) != 0) {
605                 DEBUG(DEBUG_ERR,(__location__ " Failed to repack '%s'\n", name));
606                 return -1;
607         }
608
609         return 0;
610 }
611
612
613 /*
614   repack all our databases
615  */
616 int ctdb_repack(struct ctdb_context *ctdb, int argc, const char **argv)
617 {
618         struct ctdb_dbid_map *dbmap=NULL;
619         int ret, i;
620         /* a reasonable default limit to prevent us using too much memory */
621         uint32_t repack_limit = 10000; 
622
623         if (argc > 0) {
624                 repack_limit = atoi(argv[0]);
625         }
626
627         ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &dbmap);
628         if (ret != 0) {
629                 DEBUG(DEBUG_ERR, ("Unable to get dbids from local node\n"));
630                 return ret;
631         }
632
633         for (i=0;i<dbmap->num;i++) {
634                 if (ctdb_repack_db(ctdb, dbmap->dbs[i].dbid, 
635                                    dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT, repack_limit) != 0) {
636                         DEBUG(DEBUG_ERR,("Failed to repack db 0x%x\n", dbmap->dbs[i].dbid));
637                         return -1;
638                 }
639         }
640
641         return 0;
642 }