rename ctdb_send_message to ctdb_client_send_message to resolve collision with the...
[rusty/ctdb.git] / server / ctdb_vacuum.c
1 /*
2    ctdb vacuuming events
3
4    Copyright (C) Ronnie Sahlberg  2009
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/events/events.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "system/network.h"
24 #include "system/filesys.h"
25 #include "system/dir.h"
26 #include "../include/ctdb_private.h"
27 #include "db_wrap.h"
28 #include "lib/util/dlinklist.h"
29 #include "lib/events/events.h"
30 #include "../include/ctdb_private.h"
31 #include "../common/rb_tree.h"
32
33 #define TIMELIMIT() timeval_current_ofs(10, 0)
34 #define TUNINGDBNAME "vactune.tdb"
35
/* State of a vacuuming child process as recorded by the parent. */
enum vacuum_child_status {
	VACUUM_RUNNING,
	VACUUM_OK,
	VACUUM_ERROR,
	VACUUM_TIMEOUT
};
37
38 struct ctdb_vacuum_child_context {
39         struct ctdb_vacuum_handle *vacuum_handle;
40         int fd[2];
41         pid_t child_pid;
42         enum vacuum_child_status status;
43         struct timeval start_time;
44 };
45
/* Per-database vacuuming state; passed as private data to ctdb_vacuum_event(). */
struct ctdb_vacuum_handle {
	/* database this handle vacuums */
	struct ctdb_db_context *ctdb_db;

	/* context of a vacuuming child process for this database */
	struct ctdb_vacuum_child_context *child_ctx;
};
50
51
52 /*  a list of records to possibly delete */
53 struct vacuum_data {
54         uint32_t vacuum_limit;
55         uint32_t repack_limit;
56         struct ctdb_context *ctdb;
57         struct ctdb_db_context *ctdb_db;
58         struct tdb_context *dest_db;
59         trbt_tree_t *delete_tree;
60         uint32_t delete_count;
61         struct ctdb_marshall_buffer **list;
62         struct timeval start;
63         bool traverse_error;
64         bool vacuum;
65         uint32_t total;
66         uint32_t vacuumed;
67         uint32_t copied;
68 };
69
/*
 * tuning information stored for every db
 * (the record value kept in the vacuum tuning tdb, keyed by db name)
 */
struct vacuum_tuning_data {
	uint32_t last_num_repack;	/* freelist size seen when the record was created */
	uint32_t last_num_empty;	/* delete count seen when the record was created */
	uint32_t last_interval;		/* interval the next recalculation starts from */
	uint32_t new_interval;		/* interval get_vacuum_interval() hands out */
	struct timeval last_start;	/* start time of the most recent run */
	double   last_duration;		/* elapsed time of the most recent run */
};
79
80 /* this structure contains the information for one record to be deleted */
81 struct delete_record_data {
82         struct ctdb_context *ctdb;
83         struct ctdb_db_context *ctdb_db;
84         struct ctdb_ltdb_header hdr;
85         TDB_DATA key;
86 };
87
/* accumulator handed to delete_traverse(): the marshalled delete candidates */
struct delete_records_list {
	struct ctdb_marshall_buffer *records;
};
91
/* timed event that starts a vacuuming run; defined further down in this file */
static void ctdb_vacuum_event(struct event_context *ev,
			      struct timed_event *te,
			      struct timeval t,
			      void *private_data);
94
95
96 /*
97  * traverse function for gathering the records that can be deleted
98  */
99 static int vacuum_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private)
100 {
101         struct vacuum_data *vdata = talloc_get_type(private, struct vacuum_data);
102         struct ctdb_context *ctdb = vdata->ctdb;
103         struct ctdb_db_context *ctdb_db = vdata->ctdb_db;
104         uint32_t lmaster;
105         struct ctdb_ltdb_header *hdr;
106         struct ctdb_rec_data *rec;
107         size_t old_size;
108                
109         lmaster = ctdb_lmaster(ctdb, &key);
110         if (lmaster >= ctdb->vnn_map->size) {
111                 return 0;
112         }
113
114         if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
115                 /* its not a deleted record */
116                 return 0;
117         }
118
119         hdr = (struct ctdb_ltdb_header *)data.dptr;
120
121         if (hdr->dmaster != ctdb->pnn) {
122                 return 0;
123         }
124
125         /* is this a records we could possibly delete? I.e.
126            if the record is empty and also we are both lmaster
127            and dmaster for the record we should be able to delete it
128         */
129         if (lmaster == ctdb->pnn) {
130                 uint32_t hash;
131
132                 hash = ctdb_hash(&key);
133                 if (trbt_lookup32(vdata->delete_tree, hash)) {
134                         DEBUG(DEBUG_DEBUG, (__location__ " Hash collission when vacuuming, skipping this record.\n"));
135                 } 
136                 else {
137                         struct delete_record_data *dd;
138
139                         /* store key and header indexed by the key hash */
140                         dd = talloc_zero(vdata->delete_tree, struct delete_record_data);
141                         if (dd == NULL) {
142                                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
143                                 return -1;
144                         }
145                         dd->ctdb      = ctdb;
146                         dd->ctdb_db   = ctdb_db;
147                         dd->key.dsize = key.dsize;
148                         dd->key.dptr  = talloc_memdup(dd, key.dptr, key.dsize);
149                         if (dd->key.dptr == NULL) {
150                                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
151                                 return -1;
152                         }
153
154                         dd->hdr = *hdr;
155         
156                         trbt_insert32(vdata->delete_tree, hash, dd);
157
158                         vdata->delete_count++;
159                 }
160         }
161
162         /* add the record to the blob ready to send to the nodes */
163         rec = ctdb_marshall_record(vdata->list[lmaster], ctdb->pnn, key, NULL, tdb_null);
164         if (rec == NULL) {
165                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
166                 vdata->traverse_error = true;
167                 return -1;
168         }
169         old_size = talloc_get_size(vdata->list[lmaster]);
170         vdata->list[lmaster] = talloc_realloc_size(NULL, vdata->list[lmaster], 
171                                                    old_size + rec->length);
172         if (vdata->list[lmaster] == NULL) {
173                 DEBUG(DEBUG_ERR,(__location__ " Failed to expand\n"));
174                 vdata->traverse_error = true;
175                 return -1;
176         }
177         vdata->list[lmaster]->count++;
178         memcpy(old_size+(uint8_t *)vdata->list[lmaster], rec, rec->length);
179         talloc_free(rec);
180
181         vdata->total++;
182
183         return 0;
184 }
185
186 /*
187  * traverse the tree of records to delete and marshall them into
188  * a blob
189  */
190 static void delete_traverse(void *param, void *data)
191 {
192         struct delete_record_data *dd = talloc_get_type(data, struct delete_record_data);
193         struct delete_records_list *recs = talloc_get_type(param, struct delete_records_list);
194         struct ctdb_rec_data *rec;
195         size_t old_size;
196
197         rec = ctdb_marshall_record(dd, recs->records->db_id, dd->key, &dd->hdr, tdb_null);
198         if (rec == NULL) {
199                 DEBUG(DEBUG_ERR, (__location__ " failed to marshall record\n"));
200                 return;
201         }
202
203         old_size = talloc_get_size(recs->records);
204         recs->records = talloc_realloc_size(NULL, recs->records, old_size + rec->length);
205         if (recs->records == NULL) {
206                 DEBUG(DEBUG_ERR,(__location__ " Failed to expand\n"));
207                 return;
208         }
209         recs->records->count++;
210         memcpy(old_size+(uint8_t *)(recs->records), rec, rec->length);
211 }
212
213 /* 
214  * read-only traverse the database in order to find
215  * records that can be deleted and try to delete these
216  * records on the other nodes
217  * this executes in the child context
218  */
219 static int ctdb_vacuum_db(struct ctdb_db_context *ctdb_db, struct vacuum_data *vdata)
220 {
221         struct ctdb_context *ctdb = ctdb_db->ctdb;
222         const char *name = ctdb_db->db_name;
223         int ret, i, pnn;
224
225         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &ctdb->vnn_map);
226         if (ret != 0) {
227                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from local node\n"));
228                 return ret;
229         }
230
231         pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
232         if (pnn == -1) {
233                 DEBUG(DEBUG_ERR, ("Unable to get pnn from local node\n"));
234                 return -1;
235         }
236
237         ctdb->pnn = pnn;
238         /* the list needs to be of length num_nodes */
239         vdata->list = talloc_array(vdata, struct ctdb_marshall_buffer *, ctdb->vnn_map->size);
240         if (vdata->list == NULL) {
241                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
242                 return -1;
243         }
244         for (i = 0; i < ctdb->vnn_map->size; i++) {
245                 vdata->list[i] = (struct ctdb_marshall_buffer *)
246                         talloc_zero_size(vdata->list, 
247                                                          offsetof(struct ctdb_marshall_buffer, data));
248                 if (vdata->list[i] == NULL) {
249                         DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
250                         return -1;
251                 }
252                 vdata->list[i]->db_id = ctdb_db->db_id;
253         }
254
255         /* read-only traverse, looking for records that might be able to be vacuumed */
256         if (tdb_traverse_read(ctdb_db->ltdb->tdb, vacuum_traverse, vdata) == -1 ||
257             vdata->traverse_error) {
258                 DEBUG(DEBUG_ERR,(__location__ " Traverse error in vacuuming '%s'\n", name));
259                 return -1;              
260         }
261
262         for ( i = 0; i < ctdb->vnn_map->size; i++) {
263                 if (vdata->list[i]->count == 0) {
264                         continue;
265                 }
266
267                 /* for records where we are not the lmaster, tell the lmaster to fetch the record */
268                 if (ctdb->vnn_map->map[i] != ctdb->pnn) {
269                         TDB_DATA data;
270                         DEBUG(DEBUG_INFO,("Found %u records for lmaster %u in '%s'\n", 
271                                                                 vdata->list[i]->count, i, name));
272
273                         data.dsize = talloc_get_size(vdata->list[i]);
274                         data.dptr  = (void *)vdata->list[i];
275                         if (ctdb_client_send_message(ctdb, ctdb->vnn_map->map[i], CTDB_SRVID_VACUUM_FETCH, data) != 0) {
276                                 DEBUG(DEBUG_ERR,(__location__ " Failed to send vacuum fetch message to %u\n",
277                                          ctdb->vnn_map->map[i]));
278                                 return -1;              
279                         }
280                         continue;
281                 }
282         }       
283
284         /* Process all records we can delete (if any) */
285         if (vdata->delete_count > 0) {
286                 struct delete_records_list *recs;
287                 TDB_DATA indata, outdata;
288                 int32_t res;
289
290                 recs = talloc_zero(vdata, struct delete_records_list);
291                 if (recs == NULL) {
292                         DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
293                         return -1;
294                 }
295                 recs->records = (struct ctdb_marshall_buffer *)
296                         talloc_zero_size(vdata, 
297                                     offsetof(struct ctdb_marshall_buffer, data));
298                 if (recs->records == NULL) {
299                         DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
300                         return -1;
301                 }
302                 recs->records->db_id = ctdb_db->db_id;
303
304                 /* 
305                  * traverse the tree of all records we want to delete and
306                  * create a blob we can send to the other nodes.
307                  */
308                 trbt_traversearray32(vdata->delete_tree, 1, delete_traverse, recs);
309
310                 indata.dsize = talloc_get_size(recs->records);
311                 indata.dptr  = (void *)recs->records;
312
313                 /* 
314                  * now tell all the other nodes to delete all these records
315                  * (if possible)
316                  */
317                 for (i = 0; i < ctdb->vnn_map->size; i++) {
318                         struct ctdb_marshall_buffer *records;
319                         struct ctdb_rec_data *rec;
320
321                         if (ctdb->vnn_map->map[i] == ctdb->pnn) {
322                                 /* we dont delete the records on the local node just yet */
323                                 continue;
324                         }
325
326                         ret = ctdb_control(ctdb, ctdb->vnn_map->map[i], 0,
327                                         CTDB_CONTROL_TRY_DELETE_RECORDS, 0,
328                                         indata, recs, &outdata, &res,
329                                         NULL, NULL);
330                         if (ret != 0 || res != 0) {
331                                 DEBUG(DEBUG_ERR,("Failed to delete records on node %u\n", ctdb->vnn_map->map[i]));
332                                 return -1;
333                         }
334
335                         /* 
336                          * outdata countains the list of records coming back
337                          * from the node which the node could not delete
338                          */
339                         records = (struct ctdb_marshall_buffer *)outdata.dptr;
340                         rec = (struct ctdb_rec_data *)&records->data[0];
341                         while (records->count-- > 1) {
342                                 TDB_DATA reckey, recdata;
343                                 struct ctdb_ltdb_header *rechdr;
344
345                                 reckey.dptr = &rec->data[0];
346                                 reckey.dsize = rec->keylen;
347                                 recdata.dptr = &rec->data[reckey.dsize];
348                                 recdata.dsize = rec->datalen;
349
350                                 if (recdata.dsize < sizeof(struct ctdb_ltdb_header)) {
351                                         DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
352                                         return -1;
353                                 }
354                                 rechdr = (struct ctdb_ltdb_header *)recdata.dptr;
355                                 recdata.dptr += sizeof(*rechdr);
356                                 recdata.dsize -= sizeof(*rechdr);
357
358                                 /* 
359                                  * that other node couldnt delete the record
360                                  * so we should delete it and thereby remove it from the tree
361                                  */
362                                 talloc_free(trbt_lookup32(vdata->delete_tree, ctdb_hash(&reckey)));
363
364                                 rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec);
365                         }           
366                 }
367
368                 /* 
369                  * The only records remaining in the tree would be those
370                  * records where all other nodes could successfully
371                  * delete them, so we can safely delete them on the
372                  * lmaster as well. Deletion implictely happens while
373                  * we repack the database. The repack algorithm revisits 
374                  * the tree in order to find the records that don't need
375                  * to be copied / repacked.
376                  */
377         }
378
379         /* this ensures we run our event queue */
380         ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
381
382         return 0;
383 }
384
385
386 /*
387  * traverse function for repacking
388  */
389 static int repack_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private)
390 {
391         struct vacuum_data *vdata = (struct vacuum_data *)private;
392
393         if (vdata->vacuum) {
394                 uint32_t hash = ctdb_hash(&key);
395                 struct delete_record_data *kd;
396                 /*
397                  * check if we can ignore this record because it's in the delete_tree
398                  */
399                 kd = (struct delete_record_data *)trbt_lookup32(vdata->delete_tree, hash);
400                 /*
401                  * there might be hash collisions so we have to compare the keys here to be sure
402                  */
403                 if (kd && kd->key.dsize == key.dsize && memcmp(kd->key.dptr, key.dptr, key.dsize) == 0) {
404                         struct ctdb_ltdb_header *hdr = (struct ctdb_ltdb_header *)data.dptr;
405                         /*
406                          * we have to check if the record hasn't changed in the meantime in order to
407                          * savely remove it from the database
408                          */
409                         if (data.dsize == sizeof(struct ctdb_ltdb_header) &&
410                                 hdr->dmaster == kd->ctdb->pnn &&
411                                 ctdb_lmaster(kd->ctdb, &(kd->key)) == kd->ctdb->pnn &&
412                                 kd->hdr.rsn == hdr->rsn) {
413                                 vdata->vacuumed++;
414                                 return 0;
415                         }
416                 }
417         }
418         if (tdb_store(vdata->dest_db, key, data, TDB_INSERT) != 0) {
419                 vdata->traverse_error = true;
420                 return -1;
421         }
422         vdata->copied++;
423         return 0;
424 }
425
426 /*
427  * repack a tdb
428  */
429 static int ctdb_repack_tdb(struct tdb_context *tdb, TALLOC_CTX *mem_ctx, struct vacuum_data *vdata)
430 {
431         struct tdb_context *tmp_db;
432
433         if (tdb_transaction_start(tdb) != 0) {
434                 DEBUG(DEBUG_ERR,(__location__ " Failed to start transaction\n"));
435                 return -1;
436         }
437
438         tmp_db = tdb_open("tmpdb", tdb_hash_size(tdb),
439                           TDB_INTERNAL|TDB_DISALLOW_NESTING,
440                           O_RDWR|O_CREAT, 0);
441         if (tmp_db == NULL) {
442                 DEBUG(DEBUG_ERR,(__location__ " Failed to create tmp_db\n"));
443                 tdb_transaction_cancel(tdb);
444                 return -1;
445         }
446
447         vdata->traverse_error = false;
448         vdata->dest_db = tmp_db;
449         vdata->vacuum = true;
450         vdata->vacuumed = 0;
451         vdata->copied = 0;
452
453         /*
454          * repack and vacuum on-the-fly by not writing the records that are
455          * no longer needed
456          */
457         if (tdb_traverse_read(tdb, repack_traverse, vdata) == -1) {
458                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse copying out\n"));
459                 tdb_transaction_cancel(tdb);
460                 tdb_close(tmp_db);
461                 return -1;              
462         }
463
464         DEBUG(DEBUG_INFO,(__location__ " %u records vacuumed\n", vdata->vacuumed));
465         
466         if (vdata->traverse_error) {
467                 DEBUG(DEBUG_ERR,(__location__ " Error during traversal\n"));
468                 tdb_transaction_cancel(tdb);
469                 tdb_close(tmp_db);
470                 return -1;
471         }
472
473         if (tdb_wipe_all(tdb) != 0) {
474                 DEBUG(DEBUG_ERR,(__location__ " Failed to wipe database\n"));
475                 tdb_transaction_cancel(tdb);
476                 tdb_close(tmp_db);
477                 return -1;
478         }
479
480         vdata->traverse_error = false;
481         vdata->dest_db = tdb;
482         vdata->vacuum = false;
483         vdata->copied = 0;
484
485         if (tdb_traverse_read(tmp_db, repack_traverse, vdata) == -1) {
486                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse copying back\n"));
487                 tdb_transaction_cancel(tdb);
488                 tdb_close(tmp_db);
489                 return -1;              
490         }
491
492         if (vdata->traverse_error) {
493                 DEBUG(DEBUG_ERR,(__location__ " Error during second traversal\n"));
494                 tdb_transaction_cancel(tdb);
495                 tdb_close(tmp_db);
496                 return -1;
497         }
498
499         tdb_close(tmp_db);
500
501
502         if (tdb_transaction_commit(tdb) != 0) {
503                 DEBUG(DEBUG_ERR,(__location__ " Failed to commit\n"));
504                 return -1;
505         }
506         DEBUG(DEBUG_INFO,(__location__ " %u records copied\n", vdata->copied));
507
508         return 0;
509 }
510
511 static int update_tuning_db(struct ctdb_db_context *ctdb_db, struct vacuum_data *vdata, uint32_t freelist)
512 {
513         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
514         TDB_CONTEXT *tune_tdb;
515         TDB_DATA key, value;
516         struct vacuum_tuning_data tdata;
517         struct vacuum_tuning_data *tptr;
518         char *vac_dbname;
519         int flags;
520
521         vac_dbname = talloc_asprintf(tmp_ctx, "%s/%s.%u",
522                                      ctdb_db->ctdb->db_directory_state,
523                                      TUNINGDBNAME, ctdb_db->ctdb->pnn);
524         if (vac_dbname == NULL) {
525                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory error while allocating '%s'\n", vac_dbname));
526                 talloc_free(tmp_ctx);
527                 return -1;
528         }
529
530         flags  = ctdb_db->ctdb->valgrinding ? TDB_NOMMAP : 0;
531         flags |= TDB_DISALLOW_NESTING;
532         tune_tdb = tdb_open(vac_dbname, 0,
533                             flags,
534                             O_RDWR|O_CREAT, 0600);
535         if (tune_tdb == NULL) {
536                 DEBUG(DEBUG_ERR,(__location__ " Failed to create/open %s\n", TUNINGDBNAME));
537                 talloc_free(tmp_ctx);
538                 return -1;
539         }
540         
541         if (tdb_transaction_start(tune_tdb) != 0) {
542                 DEBUG(DEBUG_ERR,(__location__ " Failed to start transaction\n"));
543                 tdb_close(tune_tdb);
544                 return -1;
545         }
546         key.dptr = discard_const(ctdb_db->db_name);
547         key.dsize = strlen(ctdb_db->db_name);
548         value = tdb_fetch(tune_tdb, key);
549
550         if (value.dptr != NULL && value.dsize == sizeof(struct vacuum_tuning_data)) {
551                 tptr = (struct vacuum_tuning_data *)value.dptr;
552                 tdata = *tptr;
553
554                 /*
555                  * re-calc new vacuum interval:
556                  * in case no limit was reached we continously increase the interval
557                  * until vacuum_max_interval is reached
558                  * in case a limit was reached we divide the current interval by 2
559                  * unless vacuum_min_interval is reached
560                  */
561                 if (freelist < vdata->repack_limit &&
562                     vdata->delete_count < vdata->vacuum_limit) {
563                         if (tdata.last_interval < ctdb_db->ctdb->tunable.vacuum_max_interval) {
564                                 tdata.new_interval = tdata.last_interval * 110 / 100;
565                                 DEBUG(DEBUG_INFO,("Increasing vacuum interval %u -> %u for %s\n", 
566                                         tdata.last_interval, tdata.new_interval, ctdb_db->db_name));
567                         }
568                 } else {
569                         tdata.new_interval = tdata.last_interval / 2;
570                         if (tdata.new_interval < ctdb_db->ctdb->tunable.vacuum_min_interval ||
571                                 tdata.new_interval > ctdb_db->ctdb->tunable.vacuum_max_interval) {
572                                 tdata.new_interval = ctdb_db->ctdb->tunable.vacuum_min_interval;
573                         }               
574                         DEBUG(DEBUG_INFO,("Decreasing vacuum interval %u -> %u for %s\n", 
575                                          tdata.last_interval, tdata.new_interval, ctdb_db->db_name));
576                 }
577                 tdata.last_interval = tdata.new_interval;
578         } else {
579                 DEBUG(DEBUG_ERR,(__location__ " Cannot find tunedb record for %s. Using default interval\n", ctdb_db->db_name));
580                 tdata.last_num_repack = freelist;
581                 tdata.last_num_empty = vdata->delete_count;
582                 tdata.last_interval = ctdb_db->ctdb->tunable.vacuum_default_interval;
583         }
584
585         if (value.dptr != NULL) {
586                 free(value.dptr);
587         }
588
589         tdata.last_start = vdata->start;
590         tdata.last_duration = timeval_elapsed(&vdata->start);
591
592         value.dptr = (unsigned char *)&tdata;
593         value.dsize = sizeof(tdata);
594
595         if (tdb_store(tune_tdb, key, value, 0) != 0) {
596                 DEBUG(DEBUG_ERR,(__location__ " Unable to store tundb record for %s\n", ctdb_db->db_name));
597                 tdb_transaction_cancel(tune_tdb);
598                 tdb_close(tune_tdb);
599                 talloc_free(tmp_ctx);
600                 return -1;
601         }
602         tdb_transaction_commit(tune_tdb);
603         tdb_close(tune_tdb);
604         talloc_free(tmp_ctx);
605
606         return 0;
607 }
608
609 /*
610  * repack and vaccum a db
611  * called from the child context
612  */
613 static int ctdb_repack_db(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx)
614 {
615         uint32_t repack_limit = ctdb_db->ctdb->tunable.repack_limit;
616         uint32_t vacuum_limit = ctdb_db->ctdb->tunable.vacuum_limit;
617         const char *name = ctdb_db->db_name;
618         int size;
619         struct vacuum_data *vdata;
620
621         size = tdb_freelist_size(ctdb_db->ltdb->tdb);
622         if (size == -1) {
623                 DEBUG(DEBUG_ERR,(__location__ " Failed to get freelist size for '%s'\n", name));
624                 return -1;
625         }
626
627         vdata = talloc_zero(mem_ctx, struct vacuum_data);
628         if (vdata == NULL) {
629                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
630                 return -1;
631         }
632
633         vdata->ctdb = ctdb_db->ctdb;
634         vdata->vacuum_limit = vacuum_limit;
635         vdata->repack_limit = repack_limit;
636         vdata->delete_tree = trbt_create(vdata, 0);
637         if (vdata->delete_tree == NULL) {
638                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
639                 talloc_free(vdata);
640                 return -1;
641         }
642
643         vdata->start = timeval_current();
644  
645         /*
646          * gather all records that can be deleted in vdata
647          */
648         if (ctdb_vacuum_db(ctdb_db, vdata) != 0) {
649                 DEBUG(DEBUG_ERR,(__location__ " Failed to vacuum '%s'\n", name));
650         }
651
652         /*
653          * decide if a repack is necessary
654          */
655         if (size < repack_limit && vdata->delete_count < vacuum_limit) {
656                 update_tuning_db(ctdb_db, vdata, size);
657                 talloc_free(vdata);
658                 return 0;
659         }
660
661         DEBUG(DEBUG_INFO,("Repacking %s with %u freelist entries and %u records to delete\n", 
662                         name, size, vdata->delete_count));
663
664         /*
665          * repack and implicitely get rid of the records we can delete
666          */
667         if (ctdb_repack_tdb(ctdb_db->ltdb->tdb, mem_ctx, vdata) != 0) {
668                 DEBUG(DEBUG_ERR,(__location__ " Failed to repack '%s'\n", name));
669                 update_tuning_db(ctdb_db, vdata, size);
670                 talloc_free(vdata);
671                 return -1;
672         }
673         update_tuning_db(ctdb_db, vdata, size);
674         talloc_free(vdata);
675
676         return 0;
677 }
678
679 static int get_vacuum_interval(struct ctdb_db_context *ctdb_db)
680 {
681         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
682         TDB_CONTEXT *tdb;
683         TDB_DATA key, value;
684         char *vac_dbname;
685         uint interval = ctdb_db->ctdb->tunable.vacuum_default_interval;
686         struct ctdb_context *ctdb = ctdb_db->ctdb;
687         int flags;
688
689         vac_dbname = talloc_asprintf(tmp_ctx, "%s/%s.%u", ctdb->db_directory, TUNINGDBNAME, ctdb->pnn);
690         if (vac_dbname == NULL) {
691                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory error while allocating '%s'\n", vac_dbname));
692                 talloc_free(tmp_ctx);
693                 return interval;
694         }
695
696         flags  = ctdb_db->ctdb->valgrinding ? TDB_NOMMAP : 0;
697         flags |= TDB_DISALLOW_NESTING;
698         tdb = tdb_open(vac_dbname, 0,
699                        flags,
700                        O_RDWR|O_CREAT, 0600);
701         if (!tdb) {
702                 DEBUG(DEBUG_ERR,("Unable to open/create database %s using default interval\n", vac_dbname));
703                 talloc_free(tmp_ctx);
704                 return interval;
705         }
706
707         key.dptr = discard_const(ctdb_db->db_name);
708         key.dsize = strlen(ctdb_db->db_name);
709
710         value = tdb_fetch(tdb, key);
711
712         if (value.dptr != NULL) {
713                 if (value.dsize == sizeof(struct vacuum_tuning_data)) {
714                         struct vacuum_tuning_data *tptr = (struct vacuum_tuning_data *)value.dptr;
715
716                         interval = tptr->new_interval;
717
718                         if (interval < ctdb->tunable.vacuum_min_interval) {
719                                 interval = ctdb->tunable.vacuum_min_interval;
720                         } 
721                         if (interval > ctdb->tunable.vacuum_max_interval) {
722                                 interval = ctdb->tunable.vacuum_max_interval;
723                         }
724                 }
725                 free(value.dptr);
726         }
727         tdb_close(tdb);
728
729         talloc_free(tmp_ctx);
730
731         return interval;
732 }
733
734 static int vacuum_child_destructor(struct ctdb_vacuum_child_context *child_ctx)
735 {
736         double l = timeval_elapsed(&child_ctx->start_time);
737         struct ctdb_db_context *ctdb_db = child_ctx->vacuum_handle->ctdb_db;
738         struct ctdb_context *ctdb = ctdb_db->ctdb;
739
740         DEBUG(DEBUG_INFO,("Vacuuming took %.3f seconds for database %s\n", l, ctdb_db->db_name));
741
742         if (child_ctx->child_pid != -1) {
743                 kill(child_ctx->child_pid, SIGKILL);
744         }
745
746         event_add_timed(ctdb->ev, child_ctx->vacuum_handle,
747                         timeval_current_ofs(get_vacuum_interval(ctdb_db), 0), 
748                         ctdb_vacuum_event, child_ctx->vacuum_handle);
749
750         return 0;
751 }
752
753 /*
754  * this event is generated when a vacuum child process times out
755  */
756 static void vacuum_child_timeout(struct event_context *ev, struct timed_event *te,
757                                          struct timeval t, void *private_data)
758 {
759         struct ctdb_vacuum_child_context *child_ctx = talloc_get_type(private_data, struct ctdb_vacuum_child_context);
760
761         DEBUG(DEBUG_ERR,("Vacuuming child process timed out for db %s\n", child_ctx->vacuum_handle->ctdb_db->db_name));
762
763         child_ctx->status = VACUUM_TIMEOUT;
764
765         talloc_free(child_ctx);
766 }
767
768
769 /*
770  * this event is generated when a vacuum child process has completed
771  */
772 static void vacuum_child_handler(struct event_context *ev, struct fd_event *fde,
773                              uint16_t flags, void *private_data)
774 {
775         struct ctdb_vacuum_child_context *child_ctx = talloc_get_type(private_data, struct ctdb_vacuum_child_context);
776         char c = 0;
777         int ret;
778
779         DEBUG(DEBUG_INFO,("Vacuuming child process %d finished for db %s\n", child_ctx->child_pid, child_ctx->vacuum_handle->ctdb_db->db_name));
780         child_ctx->child_pid = -1;
781
782         ret = read(child_ctx->fd[0], &c, 1);
783         if (ret != 1 || c != 0) {
784                 child_ctx->status = VACUUM_ERROR;
785                 DEBUG(DEBUG_ERR, ("A vacuum child process failed with an error for database %s. ret=%d c=%d\n", child_ctx->vacuum_handle->ctdb_db->db_name, ret, c));
786         } else {
787                 child_ctx->status = VACUUM_OK;
788         }
789
790         talloc_free(child_ctx);
791 }
792
793 /*
794  * this event is called every time we need to start a new vacuum process
795  */
796 static void
797 ctdb_vacuum_event(struct event_context *ev, struct timed_event *te,
798                                struct timeval t, void *private_data)
799 {
800         struct ctdb_vacuum_handle *vacuum_handle = talloc_get_type(private_data, struct ctdb_vacuum_handle);
801         struct ctdb_db_context *ctdb_db = vacuum_handle->ctdb_db;
802         struct ctdb_context *ctdb = ctdb_db->ctdb;
803         struct ctdb_vacuum_child_context *child_ctx;
804         int ret;
805
806         /* we dont vacuum if we are in recovery mode */
807         if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE) {
808                 event_add_timed(ctdb->ev, vacuum_handle, timeval_current_ofs(ctdb->tunable.vacuum_default_interval, 0), ctdb_vacuum_event, vacuum_handle);
809                 return;
810         }
811
812         child_ctx = talloc(vacuum_handle, struct ctdb_vacuum_child_context);
813         if (child_ctx == NULL) {
814                 DEBUG(DEBUG_CRIT, (__location__ " Failed to allocate child context for vacuuming of %s\n", ctdb_db->db_name));
815                 ctdb_fatal(ctdb, "Out of memory when crating vacuum child context. Shutting down\n");
816         }
817
818
819         ret = pipe(child_ctx->fd);
820         if (ret != 0) {
821                 talloc_free(child_ctx);
822                 DEBUG(DEBUG_ERR, ("Failed to create pipe for vacuum child process.\n"));
823                 event_add_timed(ctdb->ev, vacuum_handle, timeval_current_ofs(ctdb->tunable.vacuum_default_interval, 0), ctdb_vacuum_event, vacuum_handle);
824                 return;
825         }
826
827         child_ctx->child_pid = fork();
828         if (child_ctx->child_pid == (pid_t)-1) {
829                 close(child_ctx->fd[0]);
830                 close(child_ctx->fd[1]);
831                 talloc_free(child_ctx);
832                 DEBUG(DEBUG_ERR, ("Failed to fork vacuum child process.\n"));
833                 event_add_timed(ctdb->ev, vacuum_handle, timeval_current_ofs(ctdb->tunable.vacuum_default_interval, 0), ctdb_vacuum_event, vacuum_handle);
834                 return;
835         }
836
837
838         if (child_ctx->child_pid == 0) {
839                 char cc = 0;
840                 close(child_ctx->fd[0]);
841
842                 DEBUG(DEBUG_INFO,("Vacuuming child process %d for db %s started\n", getpid(), ctdb_db->db_name));
843         
844                 if (switch_from_server_to_client(ctdb) != 0) {
845                         DEBUG(DEBUG_CRIT, (__location__ "ERROR: failed to switch vacuum daemon into client mode. Shutting down.\n"));
846                         _exit(1);
847                 }
848
849                 /* 
850                  * repack the db
851                  */
852                 cc = ctdb_repack_db(ctdb_db, child_ctx);
853
854                 write(child_ctx->fd[1], &cc, 1);
855                 _exit(0);
856         }
857
858         set_close_on_exec(child_ctx->fd[0]);
859         close(child_ctx->fd[1]);
860
861         child_ctx->status = VACUUM_RUNNING;
862         child_ctx->start_time = timeval_current();
863
864         talloc_set_destructor(child_ctx, vacuum_child_destructor);
865
866         event_add_timed(ctdb->ev, child_ctx,
867                 timeval_current_ofs(ctdb->tunable.vacuum_max_run_time, 0),
868                 vacuum_child_timeout, child_ctx);
869
870         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to child vacuum process\n", child_ctx->fd[0]));
871
872         event_add_fd(ctdb->ev, child_ctx, child_ctx->fd[0],
873                 EVENT_FD_READ|EVENT_FD_AUTOCLOSE,
874                 vacuum_child_handler,
875                 child_ctx);
876
877         vacuum_handle->child_ctx = child_ctx;
878         child_ctx->vacuum_handle = vacuum_handle;
879 }
880
881
882 /* this function initializes the vacuuming context for a database
883  * starts the vacuuming events
884  */
885 int ctdb_vacuum_init(struct ctdb_db_context *ctdb_db)
886 {
887         if (ctdb_db->persistent != 0) {
888                 DEBUG(DEBUG_ERR,("Vacuuming is disabled for persistent database %s\n", ctdb_db->db_name));
889                 return 0;
890         }
891
892         ctdb_db->vacuum_handle = talloc(ctdb_db, struct ctdb_vacuum_handle);
893         CTDB_NO_MEMORY(ctdb_db->ctdb, ctdb_db->vacuum_handle);
894
895         ctdb_db->vacuum_handle->ctdb_db = ctdb_db;
896
897         event_add_timed(ctdb_db->ctdb->ev, ctdb_db->vacuum_handle, 
898                         timeval_current_ofs(get_vacuum_interval(ctdb_db), 0), 
899                         ctdb_vacuum_event, ctdb_db->vacuum_handle);
900
901         return 0;
902 }