91ea74fc5e4bd7bb188dae6d445721860dffb7ea
[sahlberg/ctdb.git] / server / ctdb_vacuum.c
1 /*
2    ctdb vacuuming events
3
4    Copyright (C) Ronnie Sahlberg  2009
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/events/events.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "system/network.h"
24 #include "system/filesys.h"
25 #include "system/dir.h"
26 #include "../include/ctdb_private.h"
27 #include "db_wrap.h"
28 #include "lib/util/dlinklist.h"
29 #include "lib/events/events.h"
30 #include "../include/ctdb_private.h"
31 #include "../common/rb_tree.h"
32
/* Timeout handed to the ctdb_ctrl_*() calls in this file: ten seconds
 * counted from the moment the macro is evaluated. */
#define TIMELIMIT()	timeval_current_ofs(10, 0)

/* Base file name of the vacuum tuning database; the directory and the
 * node's pnn are added when the file is opened. */
#define TUNINGDBNAME	"vactune.tdb"
35
/* Status recorded for a vacuum child in struct ctdb_vacuum_child_context. */
enum vacuum_child_status {
	VACUUM_RUNNING,
	VACUUM_OK,
	VACUUM_ERROR,
	VACUUM_TIMEOUT
};
37
38 struct ctdb_vacuum_child_context {
39         struct ctdb_vacuum_child_context *next, *prev;
40         struct ctdb_vacuum_handle *vacuum_handle;
41         /* fd child writes status to */
42         int fd[2];
43         /* fd to abort vacuuming. */
44         int abortfd[2];
45         pid_t child_pid;
46         enum vacuum_child_status status;
47         struct timeval start_time;
48 };
49
/* Per-database vacuuming handle; also the talloc parent and private data
 * of the timed event that schedules the next vacuum run. */
struct ctdb_vacuum_handle {
	struct ctdb_db_context           *ctdb_db;
	struct ctdb_vacuum_child_context *child_ctx;
};
54
55
56 /*  a list of records to possibly delete */
57 struct vacuum_data {
58         uint32_t vacuum_limit;
59         uint32_t repack_limit;
60         struct ctdb_context *ctdb;
61         struct ctdb_db_context *ctdb_db;
62         struct tdb_context *dest_db;
63         trbt_tree_t *delete_tree;
64         uint32_t delete_count;
65         struct ctdb_marshall_buffer **list;
66         struct timeval start;
67         bool traverse_error;
68         bool vacuum;
69         uint32_t total;
70         uint32_t vacuumed;
71         uint32_t copied;
72         int abortfd;
73         bool abort;
74 };
75
/*
 * Tuning record kept per database in the tuning tdb, keyed by the
 * database name.  The struct is stored as raw bytes, so the field
 * order and types must not change.
 */
struct vacuum_tuning_data {
	uint32_t       last_num_repack;	/* freelist size seen when the record was created */
	uint32_t       last_num_empty;	/* delete_count seen when the record was created */
	uint32_t       last_interval;	/* interval the last calculation started from */
	uint32_t       new_interval;	/* interval to use for the next run */
	struct timeval last_start;	/* start time of the last run */
	double         last_duration;	/* seconds the last run took */
};
85
86 /* this structure contains the information for one record to be deleted */
87 struct delete_record_data {
88         struct ctdb_context *ctdb;
89         struct ctdb_db_context *ctdb_db;
90         struct ctdb_ltdb_header hdr;
91         TDB_DATA key;
92 };
93
/* Wrapper around the marshall buffer that delete_traverse() appends to. */
struct delete_records_list {
	struct ctdb_marshall_buffer *records;
};
97
/* timed-event callback, defined later in this file */
static void ctdb_vacuum_event(struct event_context *ev,
			      struct timed_event *te,
			      struct timeval t,
			      void *private_data);
100
101
102 /*
103  * traverse function for gathering the records that can be deleted
104  */
105 static int vacuum_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private)
106 {
107         struct vacuum_data *vdata = talloc_get_type(private, struct vacuum_data);
108         struct ctdb_context *ctdb = vdata->ctdb;
109         struct ctdb_db_context *ctdb_db = vdata->ctdb_db;
110         uint32_t lmaster;
111         struct ctdb_ltdb_header *hdr;
112         struct ctdb_rec_data *rec;
113         size_t old_size;
114         char c;
115
116         /* Should we abort? */
117         if (read(vdata->abortfd, &c, 1) == 1) {
118                 DEBUG(DEBUG_INFO, ("Abort during vacuum_traverse for %s\n",
119                                    ctdb_db->db_name));
120                 vdata->abort = true;
121                 return -1;
122         }
123                
124         lmaster = ctdb_lmaster(ctdb, &key);
125         if (lmaster >= ctdb->vnn_map->size) {
126                 return 0;
127         }
128
129         if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
130                 /* its not a deleted record */
131                 return 0;
132         }
133
134         hdr = (struct ctdb_ltdb_header *)data.dptr;
135
136         if (hdr->dmaster != ctdb->pnn) {
137                 return 0;
138         }
139
140         /* is this a records we could possibly delete? I.e.
141            if the record is empty and also we are both lmaster
142            and dmaster for the record we should be able to delete it
143         */
144         if (lmaster == ctdb->pnn) {
145                 uint32_t hash;
146
147                 hash = ctdb_hash(&key);
148                 if (trbt_lookup32(vdata->delete_tree, hash)) {
149                         DEBUG(DEBUG_DEBUG, (__location__ " Hash collission when vacuuming, skipping this record.\n"));
150                 } 
151                 else {
152                         struct delete_record_data *dd;
153
154                         /* store key and header indexed by the key hash */
155                         dd = talloc_zero(vdata->delete_tree, struct delete_record_data);
156                         if (dd == NULL) {
157                                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
158                                 return -1;
159                         }
160                         dd->ctdb      = ctdb;
161                         dd->ctdb_db   = ctdb_db;
162                         dd->key.dsize = key.dsize;
163                         dd->key.dptr  = talloc_memdup(dd, key.dptr, key.dsize);
164                         if (dd->key.dptr == NULL) {
165                                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
166                                 return -1;
167                         }
168
169                         dd->hdr = *hdr;
170         
171                         trbt_insert32(vdata->delete_tree, hash, dd);
172
173                         vdata->delete_count++;
174                 }
175         }
176
177         /* add the record to the blob ready to send to the nodes */
178         rec = ctdb_marshall_record(vdata->list[lmaster], ctdb->pnn, key, NULL, tdb_null);
179         if (rec == NULL) {
180                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
181                 vdata->traverse_error = true;
182                 return -1;
183         }
184         old_size = talloc_get_size(vdata->list[lmaster]);
185         vdata->list[lmaster] = talloc_realloc_size(NULL, vdata->list[lmaster], 
186                                                    old_size + rec->length);
187         if (vdata->list[lmaster] == NULL) {
188                 DEBUG(DEBUG_ERR,(__location__ " Failed to expand\n"));
189                 vdata->traverse_error = true;
190                 return -1;
191         }
192         vdata->list[lmaster]->count++;
193         memcpy(old_size+(uint8_t *)vdata->list[lmaster], rec, rec->length);
194         talloc_free(rec);
195
196         vdata->total++;
197
198         return 0;
199 }
200
201 /*
202  * traverse the tree of records to delete and marshall them into
203  * a blob
204  */
205 static void delete_traverse(void *param, void *data)
206 {
207         struct delete_record_data *dd = talloc_get_type(data, struct delete_record_data);
208         struct delete_records_list *recs = talloc_get_type(param, struct delete_records_list);
209         struct ctdb_rec_data *rec;
210         size_t old_size;
211
212         rec = ctdb_marshall_record(dd, recs->records->db_id, dd->key, &dd->hdr, tdb_null);
213         if (rec == NULL) {
214                 DEBUG(DEBUG_ERR, (__location__ " failed to marshall record\n"));
215                 return;
216         }
217
218         old_size = talloc_get_size(recs->records);
219         recs->records = talloc_realloc_size(NULL, recs->records, old_size + rec->length);
220         if (recs->records == NULL) {
221                 DEBUG(DEBUG_ERR,(__location__ " Failed to expand\n"));
222                 return;
223         }
224         recs->records->count++;
225         memcpy(old_size+(uint8_t *)(recs->records), rec, rec->length);
226 }
227
228 /* 
229  * read-only traverse the database in order to find
230  * records that can be deleted and try to delete these
231  * records on the other nodes
232  * this executes in the child context
233  */
234 static int ctdb_vacuum_db(struct ctdb_db_context *ctdb_db, struct vacuum_data *vdata)
235 {
236         struct ctdb_context *ctdb = ctdb_db->ctdb;
237         const char *name = ctdb_db->db_name;
238         int ret, i, pnn;
239
240         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &ctdb->vnn_map);
241         if (ret != 0) {
242                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from local node\n"));
243                 return ret;
244         }
245
246         pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
247         if (pnn == -1) {
248                 DEBUG(DEBUG_ERR, ("Unable to get pnn from local node\n"));
249                 return -1;
250         }
251
252         ctdb->pnn = pnn;
253         /* the list needs to be of length num_nodes */
254         vdata->list = talloc_array(vdata, struct ctdb_marshall_buffer *, ctdb->vnn_map->size);
255         if (vdata->list == NULL) {
256                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
257                 return -1;
258         }
259         for (i = 0; i < ctdb->vnn_map->size; i++) {
260                 vdata->list[i] = (struct ctdb_marshall_buffer *)
261                         talloc_zero_size(vdata->list, 
262                                                          offsetof(struct ctdb_marshall_buffer, data));
263                 if (vdata->list[i] == NULL) {
264                         DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
265                         return -1;
266                 }
267                 vdata->list[i]->db_id = ctdb_db->db_id;
268         }
269
270         /* read-only traverse, looking for records that might be able to be vacuumed */
271         if (tdb_traverse_read(ctdb_db->ltdb->tdb, vacuum_traverse, vdata) == -1 ||
272             vdata->traverse_error) {
273                 DEBUG(DEBUG_ERR,(__location__ " Traverse error in vacuuming '%s'\n", name));
274                 return -1;              
275         }
276         if (vdata->abort) {
277                 DEBUG(DEBUG_INFO,("Traverse aborted vacuuming '%s'\n", name));
278                 return -1;
279         }
280         for ( i = 0; i < ctdb->vnn_map->size; i++) {
281                 if (vdata->list[i]->count == 0) {
282                         continue;
283                 }
284
285                 /* for records where we are not the lmaster, tell the lmaster to fetch the record */
286                 if (ctdb->vnn_map->map[i] != ctdb->pnn) {
287                         TDB_DATA data;
288                         DEBUG(DEBUG_INFO,("Found %u records for lmaster %u in '%s'\n", 
289                                                                 vdata->list[i]->count, i, name));
290
291                         data.dsize = talloc_get_size(vdata->list[i]);
292                         data.dptr  = (void *)vdata->list[i];
293                         if (ctdb_send_message(ctdb, ctdb->vnn_map->map[i], CTDB_SRVID_VACUUM_FETCH, data) != 0) {
294                                 DEBUG(DEBUG_ERR,(__location__ " Failed to send vacuum fetch message to %u\n",
295                                          ctdb->vnn_map->map[i]));
296                                 return -1;              
297                         }
298                         continue;
299                 }
300         }       
301
302         /* Process all records we can delete (if any) */
303         if (vdata->delete_count > 0) {
304                 struct delete_records_list *recs;
305                 TDB_DATA indata, outdata;
306                 int32_t res;
307
308                 recs = talloc_zero(vdata, struct delete_records_list);
309                 if (recs == NULL) {
310                         DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
311                         return -1;
312                 }
313                 recs->records = (struct ctdb_marshall_buffer *)
314                         talloc_zero_size(vdata, 
315                                     offsetof(struct ctdb_marshall_buffer, data));
316                 if (recs->records == NULL) {
317                         DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
318                         return -1;
319                 }
320                 recs->records->db_id = ctdb_db->db_id;
321
322                 /* 
323                  * traverse the tree of all records we want to delete and
324                  * create a blob we can send to the other nodes.
325                  */
326                 trbt_traversearray32(vdata->delete_tree, 1, delete_traverse, recs);
327
328                 indata.dsize = talloc_get_size(recs->records);
329                 indata.dptr  = (void *)recs->records;
330
331                 /* 
332                  * now tell all the other nodes to delete all these records
333                  * (if possible)
334                  */
335                 for (i = 0; i < ctdb->vnn_map->size; i++) {
336                         struct ctdb_marshall_buffer *records;
337                         struct ctdb_rec_data *rec;
338                         char c;
339
340                         if (ctdb->vnn_map->map[i] == ctdb->pnn) {
341                                 /* we dont delete the records on the local node just yet */
342                                 continue;
343                         }
344
345                         /* Should we abort? */
346                         if (read(vdata->abortfd, &c, 1) == 1) {
347                                 DEBUG(DEBUG_INFO,("Aborted vacuuming '%s'\n", name));
348                                 return -1;
349                         }
350
351                         ret = ctdb_control(ctdb, ctdb->vnn_map->map[i], 0,
352                                         CTDB_CONTROL_TRY_DELETE_RECORDS, 0,
353                                         indata, recs, &outdata, &res,
354                                         NULL, NULL);
355                         if (ret != 0 || res != 0) {
356                                 DEBUG(DEBUG_ERR,("Failed to delete records on node %u\n", ctdb->vnn_map->map[i]));
357                                 return -1;
358                         }
359
360                         /* 
361                          * outdata countains the list of records coming back
362                          * from the node which the node could not delete
363                          */
364                         records = (struct ctdb_marshall_buffer *)outdata.dptr;
365                         rec = (struct ctdb_rec_data *)&records->data[0];
366                         while (records->count-- > 1) {
367                                 TDB_DATA reckey, recdata;
368                                 struct ctdb_ltdb_header *rechdr;
369
370                                 reckey.dptr = &rec->data[0];
371                                 reckey.dsize = rec->keylen;
372                                 recdata.dptr = &rec->data[reckey.dsize];
373                                 recdata.dsize = rec->datalen;
374
375                                 if (recdata.dsize < sizeof(struct ctdb_ltdb_header)) {
376                                         DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
377                                         return -1;
378                                 }
379                                 rechdr = (struct ctdb_ltdb_header *)recdata.dptr;
380                                 recdata.dptr += sizeof(*rechdr);
381                                 recdata.dsize -= sizeof(*rechdr);
382
383                                 /* 
384                                  * that other node couldnt delete the record
385                                  * so we should delete it and thereby remove it from the tree
386                                  */
387                                 talloc_free(trbt_lookup32(vdata->delete_tree, ctdb_hash(&reckey)));
388
389                                 rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec);
390                         }           
391                 }
392
393                 /* 
394                  * The only records remaining in the tree would be those
395                  * records where all other nodes could successfully
396                  * delete them, so we can safely delete them on the
397                  * lmaster as well. Deletion implictely happens while
398                  * we repack the database. The repack algorithm revisits 
399                  * the tree in order to find the records that don't need
400                  * to be copied / repacked.
401                  */
402         }
403
404         /* this ensures we run our event queue */
405         ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
406
407         return 0;
408 }
409
410
411 /*
412  * traverse function for repacking
413  */
414 static int repack_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private)
415 {
416         struct vacuum_data *vdata = (struct vacuum_data *)private;
417
418         if (vdata->vacuum) {
419                 uint32_t hash = ctdb_hash(&key);
420                 struct delete_record_data *kd;
421                 /*
422                  * check if we can ignore this record because it's in the delete_tree
423                  */
424                 kd = (struct delete_record_data *)trbt_lookup32(vdata->delete_tree, hash);
425                 /*
426                  * there might be hash collisions so we have to compare the keys here to be sure
427                  */
428                 if (kd && kd->key.dsize == key.dsize && memcmp(kd->key.dptr, key.dptr, key.dsize) == 0) {
429                         struct ctdb_ltdb_header *hdr = (struct ctdb_ltdb_header *)data.dptr;
430                         /*
431                          * we have to check if the record hasn't changed in the meantime in order to
432                          * savely remove it from the database
433                          */
434                         if (data.dsize == sizeof(struct ctdb_ltdb_header) &&
435                                 hdr->dmaster == kd->ctdb->pnn &&
436                                 ctdb_lmaster(kd->ctdb, &(kd->key)) == kd->ctdb->pnn &&
437                                 kd->hdr.rsn == hdr->rsn) {
438                                 vdata->vacuumed++;
439                                 return 0;
440                         }
441                 }
442         }
443         if (tdb_store(vdata->dest_db, key, data, TDB_INSERT) != 0) {
444                 vdata->traverse_error = true;
445                 return -1;
446         }
447         vdata->copied++;
448         return 0;
449 }
450
451 /*
452  * repack a tdb
453  */
454 static int ctdb_repack_tdb(struct tdb_context *tdb, TALLOC_CTX *mem_ctx, struct vacuum_data *vdata)
455 {
456         struct tdb_context *tmp_db;
457
458         if (tdb_transaction_start(tdb) != 0) {
459                 DEBUG(DEBUG_ERR,(__location__ " Failed to start transaction\n"));
460                 return -1;
461         }
462
463         tmp_db = tdb_open("tmpdb", tdb_hash_size(tdb),
464                           TDB_INTERNAL|TDB_DISALLOW_NESTING,
465                           O_RDWR|O_CREAT, 0);
466         if (tmp_db == NULL) {
467                 DEBUG(DEBUG_ERR,(__location__ " Failed to create tmp_db\n"));
468                 tdb_transaction_cancel(tdb);
469                 return -1;
470         }
471
472         vdata->traverse_error = false;
473         vdata->dest_db = tmp_db;
474         vdata->vacuum = true;
475         vdata->vacuumed = 0;
476         vdata->copied = 0;
477
478         /*
479          * repack and vacuum on-the-fly by not writing the records that are
480          * no longer needed
481          */
482         if (tdb_traverse_read(tdb, repack_traverse, vdata) == -1) {
483                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse copying out\n"));
484                 tdb_transaction_cancel(tdb);
485                 tdb_close(tmp_db);
486                 return -1;              
487         }
488
489         DEBUG(DEBUG_INFO,(__location__ " %u records vacuumed\n", vdata->vacuumed));
490         
491         if (vdata->traverse_error) {
492                 DEBUG(DEBUG_ERR,(__location__ " Error during traversal\n"));
493                 tdb_transaction_cancel(tdb);
494                 tdb_close(tmp_db);
495                 return -1;
496         }
497
498         if (tdb_wipe_all(tdb) != 0) {
499                 DEBUG(DEBUG_ERR,(__location__ " Failed to wipe database\n"));
500                 tdb_transaction_cancel(tdb);
501                 tdb_close(tmp_db);
502                 return -1;
503         }
504
505         vdata->traverse_error = false;
506         vdata->dest_db = tdb;
507         vdata->vacuum = false;
508         vdata->copied = 0;
509
510         if (tdb_traverse_read(tmp_db, repack_traverse, vdata) == -1) {
511                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse copying back\n"));
512                 tdb_transaction_cancel(tdb);
513                 tdb_close(tmp_db);
514                 return -1;              
515         }
516
517         if (vdata->traverse_error) {
518                 DEBUG(DEBUG_ERR,(__location__ " Error during second traversal\n"));
519                 tdb_transaction_cancel(tdb);
520                 tdb_close(tmp_db);
521                 return -1;
522         }
523
524         tdb_close(tmp_db);
525
526
527         if (tdb_transaction_commit(tdb) != 0) {
528                 DEBUG(DEBUG_ERR,(__location__ " Failed to commit\n"));
529                 return -1;
530         }
531         DEBUG(DEBUG_INFO,(__location__ " %u records copied\n", vdata->copied));
532
533         return 0;
534 }
535
536 static int update_tuning_db(struct ctdb_db_context *ctdb_db, struct vacuum_data *vdata, uint32_t freelist)
537 {
538         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
539         TDB_CONTEXT *tune_tdb;
540         TDB_DATA key, value;
541         struct vacuum_tuning_data tdata;
542         struct vacuum_tuning_data *tptr;
543         char *vac_dbname;
544         int flags;
545
546         vac_dbname = talloc_asprintf(tmp_ctx, "%s/%s.%u",
547                                      ctdb_db->ctdb->db_directory_state,
548                                      TUNINGDBNAME, ctdb_db->ctdb->pnn);
549         if (vac_dbname == NULL) {
550                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory error while allocating '%s'\n", vac_dbname));
551                 talloc_free(tmp_ctx);
552                 return -1;
553         }
554
555         flags  = ctdb_db->ctdb->valgrinding ? TDB_NOMMAP : 0;
556         flags |= TDB_DISALLOW_NESTING;
557         tune_tdb = tdb_open(vac_dbname, 0,
558                             flags,
559                             O_RDWR|O_CREAT, 0600);
560         if (tune_tdb == NULL) {
561                 DEBUG(DEBUG_ERR,(__location__ " Failed to create/open %s\n", TUNINGDBNAME));
562                 talloc_free(tmp_ctx);
563                 return -1;
564         }
565         
566         if (tdb_transaction_start(tune_tdb) != 0) {
567                 DEBUG(DEBUG_ERR,(__location__ " Failed to start transaction\n"));
568                 tdb_close(tune_tdb);
569                 return -1;
570         }
571         key.dptr = discard_const(ctdb_db->db_name);
572         key.dsize = strlen(ctdb_db->db_name);
573         value = tdb_fetch(tune_tdb, key);
574
575         if (value.dptr != NULL && value.dsize == sizeof(struct vacuum_tuning_data)) {
576                 tptr = (struct vacuum_tuning_data *)value.dptr;
577                 tdata = *tptr;
578
579                 /*
580                  * re-calc new vacuum interval:
581                  * in case no limit was reached we continously increase the interval
582                  * until vacuum_max_interval is reached
583                  * in case a limit was reached we divide the current interval by 2
584                  * unless vacuum_min_interval is reached
585                  */
586                 if (freelist < vdata->repack_limit &&
587                     vdata->delete_count < vdata->vacuum_limit) {
588                         if (tdata.last_interval < ctdb_db->ctdb->tunable.vacuum_max_interval) {
589                                 tdata.new_interval = tdata.last_interval * 110 / 100;
590                                 DEBUG(DEBUG_INFO,("Increasing vacuum interval %u -> %u for %s\n", 
591                                         tdata.last_interval, tdata.new_interval, ctdb_db->db_name));
592                         }
593                 } else {
594                         tdata.new_interval = tdata.last_interval / 2;
595                         if (tdata.new_interval < ctdb_db->ctdb->tunable.vacuum_min_interval ||
596                                 tdata.new_interval > ctdb_db->ctdb->tunable.vacuum_max_interval) {
597                                 tdata.new_interval = ctdb_db->ctdb->tunable.vacuum_min_interval;
598                         }               
599                         DEBUG(DEBUG_INFO,("Decreasing vacuum interval %u -> %u for %s\n", 
600                                          tdata.last_interval, tdata.new_interval, ctdb_db->db_name));
601                 }
602                 tdata.last_interval = tdata.new_interval;
603         } else {
604                 DEBUG(DEBUG_ERR,(__location__ " Cannot find tunedb record for %s. Using default interval\n", ctdb_db->db_name));
605                 tdata.last_num_repack = freelist;
606                 tdata.last_num_empty = vdata->delete_count;
607                 tdata.last_interval = ctdb_db->ctdb->tunable.vacuum_default_interval;
608         }
609
610         if (value.dptr != NULL) {
611                 free(value.dptr);
612         }
613
614         tdata.last_start = vdata->start;
615         tdata.last_duration = timeval_elapsed(&vdata->start);
616
617         value.dptr = (unsigned char *)&tdata;
618         value.dsize = sizeof(tdata);
619
620         if (tdb_store(tune_tdb, key, value, 0) != 0) {
621                 DEBUG(DEBUG_ERR,(__location__ " Unable to store tundb record for %s\n", ctdb_db->db_name));
622                 tdb_transaction_cancel(tune_tdb);
623                 tdb_close(tune_tdb);
624                 talloc_free(tmp_ctx);
625                 return -1;
626         }
627         tdb_transaction_commit(tune_tdb);
628         tdb_close(tune_tdb);
629         talloc_free(tmp_ctx);
630
631         return 0;
632 }
633
634 /*
635  * repack and vaccum a db
636  * called from the child context
637  */
638 static int ctdb_repack_db(struct ctdb_db_context *ctdb_db, int abortfd, TALLOC_CTX *mem_ctx)
639 {
640         uint32_t repack_limit = ctdb_db->ctdb->tunable.repack_limit;
641         uint32_t vacuum_limit = ctdb_db->ctdb->tunable.vacuum_limit;
642         const char *name = ctdb_db->db_name;
643         int size;
644         struct vacuum_data *vdata;
645
646         size = tdb_freelist_size(ctdb_db->ltdb->tdb);
647         if (size == -1) {
648                 DEBUG(DEBUG_ERR,(__location__ " Failed to get freelist size for '%s'\n", name));
649                 return -1;
650         }
651
652         vdata = talloc_zero(mem_ctx, struct vacuum_data);
653         if (vdata == NULL) {
654                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
655                 return -1;
656         }
657
658         vdata->ctdb = ctdb_db->ctdb;
659         vdata->vacuum_limit = vacuum_limit;
660         vdata->repack_limit = repack_limit;
661         vdata->delete_tree = trbt_create(vdata, 0);
662         vdata->abortfd = abortfd;
663         vdata->ctdb_db = ctdb_db;
664         if (vdata->delete_tree == NULL) {
665                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
666                 talloc_free(vdata);
667                 return -1;
668         }
669
670         vdata->start = timeval_current();
671  
672         /*
673          * gather all records that can be deleted in vdata
674          */
675         if (ctdb_vacuum_db(ctdb_db, vdata) != 0) {
676                 DEBUG(DEBUG_ERR,(__location__ " Failed to vacuum '%s'\n", name));
677         }
678
679         /*
680          * decide if a repack is necessary
681          */
682         if (vdata->abort || (size < repack_limit && vdata->delete_count < vacuum_limit)) {
683                 update_tuning_db(ctdb_db, vdata, size);
684                 talloc_free(vdata);
685                 return 0;
686         }
687
688         DEBUG(DEBUG_INFO,("Repacking %s with %u freelist entries and %u records to delete\n", 
689                         name, size, vdata->delete_count));
690
691         /*
692          * repack and implicitely get rid of the records we can delete
693          */
694         if (ctdb_repack_tdb(ctdb_db->ltdb->tdb, mem_ctx, vdata) != 0) {
695                 DEBUG(DEBUG_ERR,(__location__ " Failed to repack '%s'\n", name));
696                 update_tuning_db(ctdb_db, vdata, size);
697                 talloc_free(vdata);
698                 return -1;
699         }
700         update_tuning_db(ctdb_db, vdata, size);
701         talloc_free(vdata);
702
703         return 0;
704 }
705
706 static int get_vacuum_interval(struct ctdb_db_context *ctdb_db)
707 {
708         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
709         TDB_CONTEXT *tdb;
710         TDB_DATA key, value;
711         char *vac_dbname;
712         uint interval = ctdb_db->ctdb->tunable.vacuum_default_interval;
713         struct ctdb_context *ctdb = ctdb_db->ctdb;
714         int flags;
715
716         vac_dbname = talloc_asprintf(tmp_ctx, "%s/%s.%u", ctdb->db_directory, TUNINGDBNAME, ctdb->pnn);
717         if (vac_dbname == NULL) {
718                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory error while allocating '%s'\n", vac_dbname));
719                 talloc_free(tmp_ctx);
720                 return interval;
721         }
722
723         flags  = ctdb_db->ctdb->valgrinding ? TDB_NOMMAP : 0;
724         flags |= TDB_DISALLOW_NESTING;
725         tdb = tdb_open(vac_dbname, 0,
726                        flags,
727                        O_RDWR|O_CREAT, 0600);
728         if (!tdb) {
729                 DEBUG(DEBUG_ERR,("Unable to open/create database %s using default interval\n", vac_dbname));
730                 talloc_free(tmp_ctx);
731                 return interval;
732         }
733
734         key.dptr = discard_const(ctdb_db->db_name);
735         key.dsize = strlen(ctdb_db->db_name);
736
737         value = tdb_fetch(tdb, key);
738
739         if (value.dptr != NULL) {
740                 if (value.dsize == sizeof(struct vacuum_tuning_data)) {
741                         struct vacuum_tuning_data *tptr = (struct vacuum_tuning_data *)value.dptr;
742
743                         interval = tptr->new_interval;
744
745                         if (interval < ctdb->tunable.vacuum_min_interval) {
746                                 interval = ctdb->tunable.vacuum_min_interval;
747                         } 
748                         if (interval > ctdb->tunable.vacuum_max_interval) {
749                                 interval = ctdb->tunable.vacuum_max_interval;
750                         }
751                 }
752                 free(value.dptr);
753         }
754         tdb_close(tdb);
755
756         talloc_free(tmp_ctx);
757
758         return interval;
759 }
760
761 static int vacuum_child_destructor(struct ctdb_vacuum_child_context *child_ctx)
762 {
763         double l = timeval_elapsed(&child_ctx->start_time);
764         struct ctdb_db_context *ctdb_db = child_ctx->vacuum_handle->ctdb_db;
765         struct ctdb_context *ctdb = ctdb_db->ctdb;
766
767         DEBUG(DEBUG_INFO,("Vacuuming took %.3f seconds for database %s\n", l, ctdb_db->db_name));
768
769         if (child_ctx->child_pid != -1) {
770                 kill(child_ctx->child_pid, SIGKILL);
771         }
772
773         DLIST_REMOVE(ctdb->vacuumers, child_ctx);
774
775         event_add_timed(ctdb->ev, child_ctx->vacuum_handle,
776                         timeval_current_ofs(get_vacuum_interval(ctdb_db), 0), 
777                         ctdb_vacuum_event, child_ctx->vacuum_handle);
778
779         return 0;
780 }
781
782 /*
783  * this event is generated when a vacuum child process times out
784  */
785 static void vacuum_child_timeout(struct event_context *ev, struct timed_event *te,
786                                          struct timeval t, void *private_data)
787 {
788         struct ctdb_vacuum_child_context *child_ctx = talloc_get_type(private_data, struct ctdb_vacuum_child_context);
789
790         DEBUG(DEBUG_ERR,("Vacuuming child process timed out for db %s\n", child_ctx->vacuum_handle->ctdb_db->db_name));
791
792         child_ctx->status = VACUUM_TIMEOUT;
793
794         talloc_free(child_ctx);
795 }
796
797
798 /*
799  * this event is generated when a vacuum child process has completed
800  */
801 static void vacuum_child_handler(struct event_context *ev, struct fd_event *fde,
802                              uint16_t flags, void *private_data)
803 {
804         struct ctdb_vacuum_child_context *child_ctx = talloc_get_type(private_data, struct ctdb_vacuum_child_context);
805         char c = 0;
806         int ret;
807
808         DEBUG(DEBUG_INFO,("Vacuuming child process %d finished for db %s\n", child_ctx->child_pid, child_ctx->vacuum_handle->ctdb_db->db_name));
809         child_ctx->child_pid = -1;
810
811         ret = read(child_ctx->fd[0], &c, 1);
812         if (ret != 1 || c != 0) {
813                 child_ctx->status = VACUUM_ERROR;
814                 DEBUG(DEBUG_ERR, ("A vacuum child process failed with an error for database %s. ret=%d c=%d\n", child_ctx->vacuum_handle->ctdb_db->db_name, ret, c));
815         } else {
816                 child_ctx->status = VACUUM_OK;
817         }
818
819         talloc_free(child_ctx);
820 }
821
822 /*
823  * this event is called every time we need to start a new vacuum process
824  */
825 static void
826 ctdb_vacuum_event(struct event_context *ev, struct timed_event *te,
827                                struct timeval t, void *private_data)
828 {
829         struct ctdb_vacuum_handle *vacuum_handle = talloc_get_type(private_data, struct ctdb_vacuum_handle);
830         struct ctdb_db_context *ctdb_db = vacuum_handle->ctdb_db;
831         struct ctdb_context *ctdb = ctdb_db->ctdb;
832         struct ctdb_vacuum_child_context *child_ctx;
833         int ret;
834
835         /* we dont vacuum if we are in recovery mode, or db frozen */
836         if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE ||
837             ctdb->freeze_mode[ctdb_db->priority] != CTDB_FREEZE_NONE) {
838                 DEBUG(DEBUG_INFO, ("Not vacuuming %s (%s)\n", ctdb_db->db_name,
839                                    ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE ? "in recovery"
840                                    : ctdb->freeze_mode[ctdb_db->priority] == CTDB_FREEZE_PENDING
841                                    ? "freeze pending"
842                                    : "frozen"));
843                 event_add_timed(ctdb->ev, vacuum_handle, timeval_current_ofs(ctdb->tunable.vacuum_default_interval, 0), ctdb_vacuum_event, vacuum_handle);
844                 return;
845         }
846
847         child_ctx = talloc(vacuum_handle, struct ctdb_vacuum_child_context);
848         if (child_ctx == NULL) {
849                 DEBUG(DEBUG_CRIT, (__location__ " Failed to allocate child context for vacuuming of %s\n", ctdb_db->db_name));
850                 ctdb_fatal(ctdb, "Out of memory when crating vacuum child context. Shutting down\n");
851         }
852
853
854         ret = pipe(child_ctx->fd);
855         if (ret != 0) {
856                 talloc_free(child_ctx);
857                 DEBUG(DEBUG_ERR, ("Failed to create pipe for vacuum child process.\n"));
858                 event_add_timed(ctdb->ev, vacuum_handle, timeval_current_ofs(ctdb->tunable.vacuum_default_interval, 0), ctdb_vacuum_event, vacuum_handle);
859                 return;
860         }
861
862         ret = pipe(child_ctx->abortfd);
863         if (ret != 0) {
864                 close(child_ctx->fd[0]);
865                 close(child_ctx->fd[1]);
866                 talloc_free(child_ctx);
867                 DEBUG(DEBUG_ERR, ("Failed to create abort pipe for vacuum child process.\n"));
868                 event_add_timed(ctdb->ev, vacuum_handle, timeval_current_ofs(ctdb->tunable.vacuum_default_interval, 0), ctdb_vacuum_event, vacuum_handle);
869                 return;
870         }
871
872         child_ctx->child_pid = fork();
873         if (child_ctx->child_pid == (pid_t)-1) {
874                 close(child_ctx->fd[0]);
875                 close(child_ctx->fd[1]);
876                 close(child_ctx->abortfd[0]);
877                 close(child_ctx->abortfd[1]);
878                 talloc_free(child_ctx);
879                 DEBUG(DEBUG_ERR, ("Failed to fork vacuum child process.\n"));
880                 event_add_timed(ctdb->ev, vacuum_handle, timeval_current_ofs(ctdb->tunable.vacuum_default_interval, 0), ctdb_vacuum_event, vacuum_handle);
881                 return;
882         }
883
884
885         if (child_ctx->child_pid == 0) {
886                 char cc = 0;
887                 close(child_ctx->fd[0]);
888                 close(child_ctx->abortfd[1]);
889                 set_nonblocking(child_ctx->abortfd[0]);
890
891                 DEBUG(DEBUG_INFO,("Vacuuming child process %d for db %s started\n", getpid(), ctdb_db->db_name));
892         
893                 if (switch_from_server_to_client(ctdb, "vacuum-%s", ctdb_db->db_name) != 0) {
894                         DEBUG(DEBUG_CRIT, (__location__ "ERROR: failed to switch vacuum daemon into client mode. Shutting down.\n"));
895                         _exit(1);
896                 }
897
898                 /* 
899                  * repack the db
900                  */
901                 cc = ctdb_repack_db(ctdb_db, child_ctx->abortfd[0], child_ctx);
902
903                 write(child_ctx->fd[1], &cc, 1);
904                 _exit(0);
905         }
906
907         set_close_on_exec(child_ctx->fd[0]);
908         close(child_ctx->fd[1]);
909         close(child_ctx->abortfd[0]);
910         set_close_on_exec(child_ctx->abortfd[1]);
911         set_nonblocking(child_ctx->abortfd[1]);
912
913         child_ctx->status = VACUUM_RUNNING;
914         child_ctx->start_time = timeval_current();
915
916         DLIST_ADD(ctdb->vacuumers, child_ctx);
917         talloc_set_destructor(child_ctx, vacuum_child_destructor);
918
919         event_add_timed(ctdb->ev, child_ctx,
920                 timeval_current_ofs(ctdb->tunable.vacuum_max_run_time, 0),
921                 vacuum_child_timeout, child_ctx);
922
923         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to child vacuum process\n", child_ctx->fd[0]));
924
925         event_add_fd(ctdb->ev, child_ctx, child_ctx->fd[0],
926                 EVENT_FD_READ|EVENT_FD_AUTOCLOSE,
927                 vacuum_child_handler,
928                 child_ctx);
929
930         vacuum_handle->child_ctx = child_ctx;
931         child_ctx->vacuum_handle = vacuum_handle;
932 }
933
934 void ctdb_stop_vacuuming(struct ctdb_context *ctdb)
935 {
936         struct ctdb_vacuum_child_context *i;
937         char c = 1;
938
939         /* FIXME: We don't just free them, since current TDB is not robust
940          * against death during transaction commit. */
941         for (i = ctdb->vacuumers; i; i = i->next) {
942                 DEBUG(DEBUG_INFO, ("Aborting vacuuming for %s (%i)\n",
943                                    i->vacuum_handle->ctdb_db->db_name,
944                                    (int)i->child_pid));
945                 write(i->abortfd[1], &c, 1);
946         }
947 }
948
949 /* this function initializes the vacuuming context for a database
950  * starts the vacuuming events
951  */
952 int ctdb_vacuum_init(struct ctdb_db_context *ctdb_db)
953 {
954         if (ctdb_db->persistent != 0) {
955                 DEBUG(DEBUG_ERR,("Vacuuming is disabled for persistent database %s\n", ctdb_db->db_name));
956                 return 0;
957         }
958
959         ctdb_db->vacuum_handle = talloc(ctdb_db, struct ctdb_vacuum_handle);
960         CTDB_NO_MEMORY(ctdb_db->ctdb, ctdb_db->vacuum_handle);
961
962         ctdb_db->vacuum_handle->ctdb_db = ctdb_db;
963
964         event_add_timed(ctdb_db->ctdb->ev, ctdb_db->vacuum_handle, 
965                         timeval_current_ofs(get_vacuum_interval(ctdb_db), 0), 
966                         ctdb_vacuum_event, ctdb_db->vacuum_handle);
967
968         return 0;
969 }