server: rename ctdb_repack_db() to ctdb_vacuum_and_repack_db()
[sahlberg/ctdb.git] / server / ctdb_vacuum.c
1 /*
2    ctdb vacuuming events
3
4    Copyright (C) Ronnie Sahlberg  2009
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/tevent/tevent.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "system/network.h"
24 #include "system/filesys.h"
25 #include "system/dir.h"
26 #include "../include/ctdb_private.h"
27 #include "db_wrap.h"
28 #include "lib/util/dlinklist.h"
29 #include "lib/tevent/tevent.h"
30 #include "../include/ctdb_private.h"
31 #include "../common/rb_tree.h"
32
/*
 * Timeout value handed to the ctdb_ctrl_* calls in this file:
 * the current time plus an offset of (10, 0), presumably 10 seconds.
 */
#define TIMELIMIT() timeval_current_ofs(10, 0)

/* Base name of the vacuum tuning database; ".<pnn>" is appended on use. */
#define TUNINGDBNAME "vactune.tdb"

/* Status recorded for a vacuum child process. */
enum vacuum_child_status {
	VACUUM_RUNNING,
	VACUUM_OK,
	VACUUM_ERROR,
	VACUUM_TIMEOUT
};
37
38 struct ctdb_vacuum_child_context {
39         struct ctdb_vacuum_child_context *next, *prev;
40         struct ctdb_vacuum_handle *vacuum_handle;
41         /* fd child writes status to */
42         int fd[2];
43         pid_t child_pid;
44         enum vacuum_child_status status;
45         struct timeval start_time;
46 };
47
/*
 * Per-database vacuum handle: ties a database to the child context
 * that is vacuuming it.
 */
struct ctdb_vacuum_handle {
	/* database being vacuumed */
	struct ctdb_db_context *ctdb_db;
	/* context of the associated vacuum child */
	struct ctdb_vacuum_child_context *child_ctx;
};
52
53
54 /*  a list of records to possibly delete */
55 struct vacuum_data {
56         uint32_t vacuum_limit;
57         uint32_t repack_limit;
58         struct ctdb_context *ctdb;
59         struct ctdb_db_context *ctdb_db;
60         struct tdb_context *dest_db;
61         trbt_tree_t *delete_tree;
62         uint32_t delete_count;
63         struct ctdb_marshall_buffer **list;
64         struct timeval start;
65         bool traverse_error;
66         bool vacuum;
67         uint32_t total;
68         uint32_t vacuumed;
69         uint32_t copied;
70 };
71
/*
 * Tuning information stored for every db.
 * This struct is written verbatim as the value of a tuning db record,
 * keyed by the database name.
 */
struct vacuum_tuning_data {
	/* freelist size recorded when the record is first created */
	uint32_t last_num_repack;
	/* number of delete candidates recorded when the record is first created */
	uint32_t last_num_empty;
	/* interval used for the previous run */
	uint32_t last_interval;
	/* interval to use for the next run */
	uint32_t new_interval;
	/* start time of the most recent run */
	struct timeval last_start;
	/* elapsed time of the most recent run, as returned by timeval_elapsed() */
	double   last_duration;
};
81
82 /* this structure contains the information for one record to be deleted */
83 struct delete_record_data {
84         struct ctdb_context *ctdb;
85         struct ctdb_db_context *ctdb_db;
86         struct ctdb_ltdb_header hdr;
87         TDB_DATA key;
88 };
89
/* Wrapper around the marshall buffer that delete_traverse() fills. */
struct delete_records_list {
	/* marshalled records that the other nodes are asked to delete */
	struct ctdb_marshall_buffer *records;
};
93
/* Timed event handler; re-armed from vacuum_child_destructor(). */
static void ctdb_vacuum_event(struct event_context *ev,
			      struct timed_event *te,
			      struct timeval t,
			      void *private_data);
96
97
98 /*
99  * traverse function for gathering the records that can be deleted
100  */
101 static int vacuum_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private)
102 {
103         struct vacuum_data *vdata = talloc_get_type(private, struct vacuum_data);
104         struct ctdb_context *ctdb = vdata->ctdb;
105         struct ctdb_db_context *ctdb_db = vdata->ctdb_db;
106         uint32_t lmaster;
107         struct ctdb_ltdb_header *hdr;
108         struct ctdb_rec_data *rec;
109         size_t old_size;
110                
111         lmaster = ctdb_lmaster(ctdb, &key);
112         if (lmaster >= ctdb->num_nodes) {
113                 DEBUG(DEBUG_CRIT, (__location__
114                                    " lmaster[%u] >= ctdb->num_nodes[%u] for key"
115                                    " with hash[%u]!\n",
116                                    (unsigned)lmaster,
117                                    (unsigned)ctdb->num_nodes,
118                                    (unsigned)ctdb_hash(&key)));
119                 return -1;
120         }
121
122         if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
123                 /* its not a deleted record */
124                 return 0;
125         }
126
127         hdr = (struct ctdb_ltdb_header *)data.dptr;
128
129         if (hdr->dmaster != ctdb->pnn) {
130                 return 0;
131         }
132
133         /* Is this a record we could possibly delete? I.e.
134            if the record is empty and also we are both lmaster
135            and dmaster for the record we should be able to delete it
136         */
137         if (lmaster == ctdb->pnn) {
138                 uint32_t hash;
139
140                 hash = ctdb_hash(&key);
141                 if (trbt_lookup32(vdata->delete_tree, hash)) {
142                         DEBUG(DEBUG_DEBUG, (__location__ " Hash collission when vacuuming, skipping this record.\n"));
143                 } 
144                 else {
145                         struct delete_record_data *dd;
146
147                         /* store key and header indexed by the key hash */
148                         dd = talloc_zero(vdata->delete_tree, struct delete_record_data);
149                         if (dd == NULL) {
150                                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
151                                 return -1;
152                         }
153                         dd->ctdb      = ctdb;
154                         dd->ctdb_db   = ctdb_db;
155                         dd->key.dsize = key.dsize;
156                         dd->key.dptr  = talloc_memdup(dd, key.dptr, key.dsize);
157                         if (dd->key.dptr == NULL) {
158                                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
159                                 return -1;
160                         }
161
162                         dd->hdr = *hdr;
163         
164                         trbt_insert32(vdata->delete_tree, hash, dd);
165
166                         vdata->delete_count++;
167                 }
168         }
169
170         /* add the record to the blob ready to send to the nodes */
171         rec = ctdb_marshall_record(vdata->list[lmaster], ctdb->pnn, key, NULL, tdb_null);
172         if (rec == NULL) {
173                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
174                 vdata->traverse_error = true;
175                 return -1;
176         }
177         old_size = talloc_get_size(vdata->list[lmaster]);
178         vdata->list[lmaster] = talloc_realloc_size(NULL, vdata->list[lmaster], 
179                                                    old_size + rec->length);
180         if (vdata->list[lmaster] == NULL) {
181                 DEBUG(DEBUG_ERR,(__location__ " Failed to expand\n"));
182                 vdata->traverse_error = true;
183                 return -1;
184         }
185         vdata->list[lmaster]->count++;
186         memcpy(old_size+(uint8_t *)vdata->list[lmaster], rec, rec->length);
187         talloc_free(rec);
188
189         vdata->total++;
190
191         return 0;
192 }
193
194 /*
195  * traverse the tree of records to delete and marshall them into
196  * a blob
197  */
198 static void delete_traverse(void *param, void *data)
199 {
200         struct delete_record_data *dd = talloc_get_type(data, struct delete_record_data);
201         struct delete_records_list *recs = talloc_get_type(param, struct delete_records_list);
202         struct ctdb_rec_data *rec;
203         size_t old_size;
204
205         rec = ctdb_marshall_record(dd, recs->records->db_id, dd->key, &dd->hdr, tdb_null);
206         if (rec == NULL) {
207                 DEBUG(DEBUG_ERR, (__location__ " failed to marshall record\n"));
208                 return;
209         }
210
211         old_size = talloc_get_size(recs->records);
212         recs->records = talloc_realloc_size(NULL, recs->records, old_size + rec->length);
213         if (recs->records == NULL) {
214                 DEBUG(DEBUG_ERR,(__location__ " Failed to expand\n"));
215                 return;
216         }
217         recs->records->count++;
218         memcpy(old_size+(uint8_t *)(recs->records), rec, rec->length);
219 }
220
221 /* 
222  * read-only traverse the database in order to find
223  * records that can be deleted and try to delete these
224  * records on the other nodes
225  * this executes in the child context
226  */
227 static int ctdb_vacuum_db(struct ctdb_db_context *ctdb_db, struct vacuum_data *vdata)
228 {
229         struct ctdb_context *ctdb = ctdb_db->ctdb;
230         const char *name = ctdb_db->db_name;
231         int ret, i, pnn;
232
233         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &ctdb->vnn_map);
234         if (ret != 0) {
235                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from local node\n"));
236                 return ret;
237         }
238
239         pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
240         if (pnn == -1) {
241                 DEBUG(DEBUG_ERR, ("Unable to get pnn from local node\n"));
242                 return -1;
243         }
244
245         ctdb->pnn = pnn;
246         /* the list needs to be of length num_nodes */
247         vdata->list = talloc_array(vdata, struct ctdb_marshall_buffer *, ctdb->num_nodes);
248         if (vdata->list == NULL) {
249                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
250                 return -1;
251         }
252         for (i = 0; i < ctdb->num_nodes; i++) {
253                 vdata->list[i] = (struct ctdb_marshall_buffer *)
254                         talloc_zero_size(vdata->list, 
255                                                          offsetof(struct ctdb_marshall_buffer, data));
256                 if (vdata->list[i] == NULL) {
257                         DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
258                         return -1;
259                 }
260                 vdata->list[i]->db_id = ctdb_db->db_id;
261         }
262
263         /* read-only traverse, looking for records that might be able to be vacuumed */
264         if (tdb_traverse_read(ctdb_db->ltdb->tdb, vacuum_traverse, vdata) == -1 ||
265             vdata->traverse_error) {
266                 DEBUG(DEBUG_ERR,(__location__ " Traverse error in vacuuming '%s'\n", name));
267                 return -1;              
268         }
269
270         /*
271          * For records where we are not the lmaster,
272          * tell the lmaster to fetch the record.
273          */
274         for (i = 0; i < ctdb->num_nodes; i++) {
275                 TDB_DATA data;
276
277                 if (ctdb->nodes[i]->pnn == ctdb->pnn) {
278                         continue;
279                 }
280
281                 if (vdata->list[i]->count == 0) {
282                         continue;
283                 }
284
285                 DEBUG(DEBUG_INFO, ("Found %u records for lmaster %u in '%s'\n",
286                                    vdata->list[i]->count, ctdb->nodes[i]->pnn,
287                                    name));
288
289                 data.dsize = talloc_get_size(vdata->list[i]);
290                 data.dptr  = (void *)vdata->list[i];
291                 if (ctdb_client_send_message(ctdb, ctdb->nodes[i]->pnn, CTDB_SRVID_VACUUM_FETCH, data) != 0) {
292                         DEBUG(DEBUG_ERR, (__location__ " Failed to send vacuum "
293                                           "fetch message to %u\n",
294                                           ctdb->nodes[i]->pnn));
295                         return -1;
296                 }
297         }       
298
299         /* Process all records we can delete (if any) */
300         if (vdata->delete_count > 0) {
301                 struct delete_records_list *recs;
302                 TDB_DATA indata, outdata;
303                 int32_t res;
304                 struct ctdb_node_map *nodemap;
305                 uint32_t *active_nodes;
306                 int num_active_nodes;
307
308                 recs = talloc_zero(vdata, struct delete_records_list);
309                 if (recs == NULL) {
310                         DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
311                         return -1;
312                 }
313                 recs->records = (struct ctdb_marshall_buffer *)
314                         talloc_zero_size(vdata, 
315                                     offsetof(struct ctdb_marshall_buffer, data));
316                 if (recs->records == NULL) {
317                         DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
318                         return -1;
319                 }
320                 recs->records->db_id = ctdb_db->db_id;
321
322                 /* 
323                  * traverse the tree of all records we want to delete and
324                  * create a blob we can send to the other nodes.
325                  */
326                 trbt_traversearray32(vdata->delete_tree, 1, delete_traverse, recs);
327
328                 indata.dsize = talloc_get_size(recs->records);
329                 indata.dptr  = (void *)recs->records;
330
331                 /* 
332                  * now tell all the active nodes to delete all these records
333                  * (if possible)
334                  */
335
336                 ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(),
337                                            CTDB_CURRENT_NODE,
338                                            recs, /* talloc context */
339                                            &nodemap);
340                 if (ret != 0) {
341                         DEBUG(DEBUG_ERR,(__location__ " unable to get node map\n"));
342                         return -1;
343                 }
344
345                 active_nodes = list_of_active_nodes(ctdb, nodemap,
346                                                     nodemap, /* talloc context */
347                                                     false /* include self */);
348                 /* yuck! ;-) */
349                 num_active_nodes = talloc_get_size(active_nodes)/sizeof(*active_nodes);
350
351                 for (i = 0; i < num_active_nodes; i++) {
352                         struct ctdb_marshall_buffer *records;
353                         struct ctdb_rec_data *rec;
354
355                         ret = ctdb_control(ctdb, active_nodes[i], 0,
356                                         CTDB_CONTROL_TRY_DELETE_RECORDS, 0,
357                                         indata, recs, &outdata, &res,
358                                         NULL, NULL);
359                         if (ret != 0 || res != 0) {
360                                 DEBUG(DEBUG_ERR, ("Failed to delete records on "
361                                                   "node %u: ret[%d] res[%d]\n",
362                                                   active_nodes[i], ret, res));
363                                 return -1;
364                         }
365
366                         /* 
367                          * outdata countains the list of records coming back
368                          * from the node which the node could not delete
369                          */
370                         records = (struct ctdb_marshall_buffer *)outdata.dptr;
371                         rec = (struct ctdb_rec_data *)&records->data[0];
372                         while (records->count-- > 1) {
373                                 TDB_DATA reckey, recdata;
374                                 struct ctdb_ltdb_header *rechdr;
375
376                                 reckey.dptr = &rec->data[0];
377                                 reckey.dsize = rec->keylen;
378                                 recdata.dptr = &rec->data[reckey.dsize];
379                                 recdata.dsize = rec->datalen;
380
381                                 if (recdata.dsize < sizeof(struct ctdb_ltdb_header)) {
382                                         DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
383                                         return -1;
384                                 }
385                                 rechdr = (struct ctdb_ltdb_header *)recdata.dptr;
386                                 recdata.dptr += sizeof(*rechdr);
387                                 recdata.dsize -= sizeof(*rechdr);
388
389                                 /* 
390                                  * that other node couldnt delete the record
391                                  * so we should delete it and thereby remove it from the tree
392                                  */
393                                 talloc_free(trbt_lookup32(vdata->delete_tree, ctdb_hash(&reckey)));
394
395                                 rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec);
396                         }           
397                 }
398
399                 /* free nodemap and active_nodes */
400                 talloc_free(nodemap);
401
402                 /* 
403                  * The only records remaining in the tree would be those
404                  * records where all other nodes could successfully
405                  * delete them, so we can safely delete them on the
406                  * lmaster as well. Deletion implictely happens while
407                  * we repack the database. The repack algorithm revisits 
408                  * the tree in order to find the records that don't need
409                  * to be copied / repacked.
410                  */
411         }
412
413         /* this ensures we run our event queue */
414         ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
415
416         return 0;
417 }
418
419
420 /*
421  * traverse function for repacking
422  */
423 static int repack_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private)
424 {
425         struct vacuum_data *vdata = (struct vacuum_data *)private;
426
427         if (vdata->vacuum) {
428                 uint32_t hash = ctdb_hash(&key);
429                 struct delete_record_data *kd;
430                 /*
431                  * check if we can ignore this record because it's in the delete_tree
432                  */
433                 kd = (struct delete_record_data *)trbt_lookup32(vdata->delete_tree, hash);
434                 /*
435                  * there might be hash collisions so we have to compare the keys here to be sure
436                  */
437                 if (kd && kd->key.dsize == key.dsize && memcmp(kd->key.dptr, key.dptr, key.dsize) == 0) {
438                         struct ctdb_ltdb_header *hdr = (struct ctdb_ltdb_header *)data.dptr;
439                         /*
440                          * we have to check if the record hasn't changed in the meantime in order to
441                          * savely remove it from the database
442                          */
443                         if (data.dsize == sizeof(struct ctdb_ltdb_header) &&
444                                 hdr->dmaster == kd->ctdb->pnn &&
445                                 ctdb_lmaster(kd->ctdb, &(kd->key)) == kd->ctdb->pnn &&
446                                 kd->hdr.rsn == hdr->rsn) {
447                                 vdata->vacuumed++;
448                                 return 0;
449                         }
450                 }
451         }
452         if (tdb_store(vdata->dest_db, key, data, TDB_INSERT) != 0) {
453                 vdata->traverse_error = true;
454                 return -1;
455         }
456         vdata->copied++;
457         return 0;
458 }
459
460 /*
461  * repack a tdb
462  */
463 static int ctdb_repack_tdb(struct tdb_context *tdb, TALLOC_CTX *mem_ctx, struct vacuum_data *vdata)
464 {
465         struct tdb_context *tmp_db;
466
467         if (tdb_transaction_start(tdb) != 0) {
468                 DEBUG(DEBUG_ERR,(__location__ " Failed to start transaction\n"));
469                 return -1;
470         }
471
472         tmp_db = tdb_open("tmpdb", tdb_hash_size(tdb),
473                           TDB_INTERNAL|TDB_DISALLOW_NESTING,
474                           O_RDWR|O_CREAT, 0);
475         if (tmp_db == NULL) {
476                 DEBUG(DEBUG_ERR,(__location__ " Failed to create tmp_db\n"));
477                 tdb_transaction_cancel(tdb);
478                 return -1;
479         }
480
481         vdata->traverse_error = false;
482         vdata->dest_db = tmp_db;
483         vdata->vacuum = true;
484         vdata->vacuumed = 0;
485         vdata->copied = 0;
486
487         /*
488          * repack and vacuum on-the-fly by not writing the records that are
489          * no longer needed
490          */
491         if (tdb_traverse_read(tdb, repack_traverse, vdata) == -1) {
492                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse copying out\n"));
493                 tdb_transaction_cancel(tdb);
494                 tdb_close(tmp_db);
495                 return -1;              
496         }
497
498         DEBUG(DEBUG_INFO,(__location__ " %u records vacuumed\n", vdata->vacuumed));
499         
500         if (vdata->traverse_error) {
501                 DEBUG(DEBUG_ERR,(__location__ " Error during traversal\n"));
502                 tdb_transaction_cancel(tdb);
503                 tdb_close(tmp_db);
504                 return -1;
505         }
506
507         if (tdb_wipe_all(tdb) != 0) {
508                 DEBUG(DEBUG_ERR,(__location__ " Failed to wipe database\n"));
509                 tdb_transaction_cancel(tdb);
510                 tdb_close(tmp_db);
511                 return -1;
512         }
513
514         vdata->traverse_error = false;
515         vdata->dest_db = tdb;
516         vdata->vacuum = false;
517         vdata->copied = 0;
518
519         if (tdb_traverse_read(tmp_db, repack_traverse, vdata) == -1) {
520                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse copying back\n"));
521                 tdb_transaction_cancel(tdb);
522                 tdb_close(tmp_db);
523                 return -1;              
524         }
525
526         if (vdata->traverse_error) {
527                 DEBUG(DEBUG_ERR,(__location__ " Error during second traversal\n"));
528                 tdb_transaction_cancel(tdb);
529                 tdb_close(tmp_db);
530                 return -1;
531         }
532
533         tdb_close(tmp_db);
534
535
536         if (tdb_transaction_commit(tdb) != 0) {
537                 DEBUG(DEBUG_ERR,(__location__ " Failed to commit\n"));
538                 return -1;
539         }
540         DEBUG(DEBUG_INFO,(__location__ " %u records copied\n", vdata->copied));
541
542         return 0;
543 }
544
545 static int update_tuning_db(struct ctdb_db_context *ctdb_db, struct vacuum_data *vdata, uint32_t freelist)
546 {
547         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
548         TDB_CONTEXT *tune_tdb;
549         TDB_DATA key, value;
550         struct vacuum_tuning_data tdata;
551         struct vacuum_tuning_data *tptr;
552         char *vac_dbname;
553         int flags;
554
555         vac_dbname = talloc_asprintf(tmp_ctx, "%s/%s.%u",
556                                      ctdb_db->ctdb->db_directory_state,
557                                      TUNINGDBNAME, ctdb_db->ctdb->pnn);
558         if (vac_dbname == NULL) {
559                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory error while allocating '%s'\n", vac_dbname));
560                 talloc_free(tmp_ctx);
561                 return -1;
562         }
563
564         flags  = ctdb_db->ctdb->valgrinding ? TDB_NOMMAP : 0;
565         flags |= TDB_DISALLOW_NESTING;
566         tune_tdb = tdb_open(vac_dbname, 0,
567                             flags,
568                             O_RDWR|O_CREAT, 0600);
569         if (tune_tdb == NULL) {
570                 DEBUG(DEBUG_ERR,(__location__ " Failed to create/open %s\n", TUNINGDBNAME));
571                 talloc_free(tmp_ctx);
572                 return -1;
573         }
574         
575         if (tdb_transaction_start(tune_tdb) != 0) {
576                 DEBUG(DEBUG_ERR,(__location__ " Failed to start transaction\n"));
577                 tdb_close(tune_tdb);
578                 return -1;
579         }
580         key.dptr = discard_const(ctdb_db->db_name);
581         key.dsize = strlen(ctdb_db->db_name);
582         value = tdb_fetch(tune_tdb, key);
583
584         if (value.dptr != NULL && value.dsize == sizeof(struct vacuum_tuning_data)) {
585                 tptr = (struct vacuum_tuning_data *)value.dptr;
586                 tdata = *tptr;
587
588                 /*
589                  * re-calc new vacuum interval:
590                  * in case no limit was reached we continously increase the interval
591                  * until vacuum_max_interval is reached
592                  * in case a limit was reached we divide the current interval by 2
593                  * unless vacuum_min_interval is reached
594                  */
595                 if (freelist < vdata->repack_limit &&
596                     vdata->delete_count < vdata->vacuum_limit) {
597                         if (tdata.last_interval < ctdb_db->ctdb->tunable.vacuum_max_interval) {
598                                 tdata.new_interval = tdata.last_interval * 110 / 100;
599                                 DEBUG(DEBUG_INFO,("Increasing vacuum interval %u -> %u for %s\n", 
600                                         tdata.last_interval, tdata.new_interval, ctdb_db->db_name));
601                         }
602                 } else {
603                         tdata.new_interval = tdata.last_interval / 2;
604                         if (tdata.new_interval < ctdb_db->ctdb->tunable.vacuum_min_interval ||
605                                 tdata.new_interval > ctdb_db->ctdb->tunable.vacuum_max_interval) {
606                                 tdata.new_interval = ctdb_db->ctdb->tunable.vacuum_min_interval;
607                         }               
608                         DEBUG(DEBUG_INFO,("Decreasing vacuum interval %u -> %u for %s\n", 
609                                          tdata.last_interval, tdata.new_interval, ctdb_db->db_name));
610                 }
611                 tdata.last_interval = tdata.new_interval;
612         } else {
613                 DEBUG(DEBUG_DEBUG,(__location__ " Cannot find tunedb record for %s. Using default interval\n", ctdb_db->db_name));
614                 tdata.last_num_repack = freelist;
615                 tdata.last_num_empty = vdata->delete_count;
616                 tdata.last_interval = ctdb_db->ctdb->tunable.vacuum_default_interval;
617         }
618
619         if (value.dptr != NULL) {
620                 free(value.dptr);
621         }
622
623         tdata.last_start = vdata->start;
624         tdata.last_duration = timeval_elapsed(&vdata->start);
625
626         value.dptr = (unsigned char *)&tdata;
627         value.dsize = sizeof(tdata);
628
629         if (tdb_store(tune_tdb, key, value, 0) != 0) {
630                 DEBUG(DEBUG_ERR,(__location__ " Unable to store tundb record for %s\n", ctdb_db->db_name));
631                 tdb_transaction_cancel(tune_tdb);
632                 tdb_close(tune_tdb);
633                 talloc_free(tmp_ctx);
634                 return -1;
635         }
636         tdb_transaction_commit(tune_tdb);
637         tdb_close(tune_tdb);
638         talloc_free(tmp_ctx);
639
640         return 0;
641 }
642
643 /*
644  * repack and vaccum a db
645  * called from the child context
646  */
647 static int ctdb_vacuum_and_repack_db(struct ctdb_db_context *ctdb_db,
648                                      TALLOC_CTX *mem_ctx)
649 {
650         uint32_t repack_limit = ctdb_db->ctdb->tunable.repack_limit;
651         uint32_t vacuum_limit = ctdb_db->ctdb->tunable.vacuum_limit;
652         const char *name = ctdb_db->db_name;
653         int size;
654         struct vacuum_data *vdata;
655
656         size = tdb_freelist_size(ctdb_db->ltdb->tdb);
657         if (size == -1) {
658                 DEBUG(DEBUG_ERR,(__location__ " Failed to get freelist size for '%s'\n", name));
659                 return -1;
660         }
661
662         vdata = talloc_zero(mem_ctx, struct vacuum_data);
663         if (vdata == NULL) {
664                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
665                 return -1;
666         }
667
668         vdata->ctdb = ctdb_db->ctdb;
669         vdata->vacuum_limit = vacuum_limit;
670         vdata->repack_limit = repack_limit;
671         vdata->delete_tree = trbt_create(vdata, 0);
672         vdata->ctdb_db = ctdb_db;
673         if (vdata->delete_tree == NULL) {
674                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
675                 talloc_free(vdata);
676                 return -1;
677         }
678
679         vdata->start = timeval_current();
680  
681         /*
682          * gather all records that can be deleted in vdata
683          */
684         if (ctdb_vacuum_db(ctdb_db, vdata) != 0) {
685                 DEBUG(DEBUG_ERR,(__location__ " Failed to vacuum '%s'\n", name));
686         }
687
688         /*
689          * decide if a repack is necessary
690          */
691         if (size < repack_limit && vdata->delete_count < vacuum_limit) {
692                 update_tuning_db(ctdb_db, vdata, size);
693                 talloc_free(vdata);
694                 return 0;
695         }
696
697         DEBUG(DEBUG_INFO,("Repacking %s with %u freelist entries and %u records to delete\n", 
698                         name, size, vdata->delete_count));
699
700         /*
701          * repack and implicitely get rid of the records we can delete
702          */
703         if (ctdb_repack_tdb(ctdb_db->ltdb->tdb, mem_ctx, vdata) != 0) {
704                 DEBUG(DEBUG_ERR,(__location__ " Failed to repack '%s'\n", name));
705                 update_tuning_db(ctdb_db, vdata, size);
706                 talloc_free(vdata);
707                 return -1;
708         }
709         update_tuning_db(ctdb_db, vdata, size);
710         talloc_free(vdata);
711
712         return 0;
713 }
714
715 static int get_vacuum_interval(struct ctdb_db_context *ctdb_db)
716 {
717         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
718         TDB_CONTEXT *tdb;
719         TDB_DATA key, value;
720         char *vac_dbname;
721         uint interval = ctdb_db->ctdb->tunable.vacuum_default_interval;
722         struct ctdb_context *ctdb = ctdb_db->ctdb;
723         int flags;
724
725         vac_dbname = talloc_asprintf(tmp_ctx, "%s/%s.%u", ctdb->db_directory, TUNINGDBNAME, ctdb->pnn);
726         if (vac_dbname == NULL) {
727                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory error while allocating '%s'\n", vac_dbname));
728                 talloc_free(tmp_ctx);
729                 return interval;
730         }
731
732         flags  = ctdb_db->ctdb->valgrinding ? TDB_NOMMAP : 0;
733         flags |= TDB_DISALLOW_NESTING;
734         tdb = tdb_open(vac_dbname, 0,
735                        flags,
736                        O_RDWR|O_CREAT, 0600);
737         if (!tdb) {
738                 DEBUG(DEBUG_ERR,("Unable to open/create database %s using default interval. Errno : %s (%d)\n", vac_dbname, strerror(errno), errno));
739                 talloc_free(tmp_ctx);
740                 return interval;
741         }
742
743         key.dptr = discard_const(ctdb_db->db_name);
744         key.dsize = strlen(ctdb_db->db_name);
745
746         value = tdb_fetch(tdb, key);
747
748         if (value.dptr != NULL) {
749                 if (value.dsize == sizeof(struct vacuum_tuning_data)) {
750                         struct vacuum_tuning_data *tptr = (struct vacuum_tuning_data *)value.dptr;
751
752                         interval = tptr->new_interval;
753
754                         if (interval < ctdb->tunable.vacuum_min_interval) {
755                                 interval = ctdb->tunable.vacuum_min_interval;
756                         } 
757                         if (interval > ctdb->tunable.vacuum_max_interval) {
758                                 interval = ctdb->tunable.vacuum_max_interval;
759                         }
760                 }
761                 free(value.dptr);
762         }
763         tdb_close(tdb);
764
765         talloc_free(tmp_ctx);
766
767         return interval;
768 }
769
770 static int vacuum_child_destructor(struct ctdb_vacuum_child_context *child_ctx)
771 {
772         double l = timeval_elapsed(&child_ctx->start_time);
773         struct ctdb_db_context *ctdb_db = child_ctx->vacuum_handle->ctdb_db;
774         struct ctdb_context *ctdb = ctdb_db->ctdb;
775
776         DEBUG(DEBUG_INFO,("Vacuuming took %.3f seconds for database %s\n", l, ctdb_db->db_name));
777
778         if (child_ctx->child_pid != -1) {
779                 kill(child_ctx->child_pid, SIGKILL);
780         }
781
782         DLIST_REMOVE(ctdb->vacuumers, child_ctx);
783
784         event_add_timed(ctdb->ev, child_ctx->vacuum_handle,
785                         timeval_current_ofs(get_vacuum_interval(ctdb_db), 0), 
786                         ctdb_vacuum_event, child_ctx->vacuum_handle);
787
788         return 0;
789 }
790
791 /*
792  * this event is generated when a vacuum child process times out
793  */
794 static void vacuum_child_timeout(struct event_context *ev, struct timed_event *te,
795                                          struct timeval t, void *private_data)
796 {
797         struct ctdb_vacuum_child_context *child_ctx = talloc_get_type(private_data, struct ctdb_vacuum_child_context);
798
799         DEBUG(DEBUG_ERR,("Vacuuming child process timed out for db %s\n", child_ctx->vacuum_handle->ctdb_db->db_name));
800
801         child_ctx->status = VACUUM_TIMEOUT;
802
803         talloc_free(child_ctx);
804 }
805
806
807 /*
808  * this event is generated when a vacuum child process has completed
809  */
810 static void vacuum_child_handler(struct event_context *ev, struct fd_event *fde,
811                              uint16_t flags, void *private_data)
812 {
813         struct ctdb_vacuum_child_context *child_ctx = talloc_get_type(private_data, struct ctdb_vacuum_child_context);
814         char c = 0;
815         int ret;
816
817         DEBUG(DEBUG_INFO,("Vacuuming child process %d finished for db %s\n", child_ctx->child_pid, child_ctx->vacuum_handle->ctdb_db->db_name));
818         child_ctx->child_pid = -1;
819
820         ret = read(child_ctx->fd[0], &c, 1);
821         if (ret != 1 || c != 0) {
822                 child_ctx->status = VACUUM_ERROR;
823                 DEBUG(DEBUG_ERR, ("A vacuum child process failed with an error for database %s. ret=%d c=%d\n", child_ctx->vacuum_handle->ctdb_db->db_name, ret, c));
824         } else {
825                 child_ctx->status = VACUUM_OK;
826         }
827
828         talloc_free(child_ctx);
829 }
830
831 /*
832  * this event is called every time we need to start a new vacuum process
833  */
834 static void
835 ctdb_vacuum_event(struct event_context *ev, struct timed_event *te,
836                                struct timeval t, void *private_data)
837 {
838         struct ctdb_vacuum_handle *vacuum_handle = talloc_get_type(private_data, struct ctdb_vacuum_handle);
839         struct ctdb_db_context *ctdb_db = vacuum_handle->ctdb_db;
840         struct ctdb_context *ctdb = ctdb_db->ctdb;
841         struct ctdb_vacuum_child_context *child_ctx;
842         struct tevent_fd *fde;
843         int ret;
844
845         /* we dont vacuum if we are in recovery mode, or db frozen */
846         if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE ||
847             ctdb->freeze_mode[ctdb_db->priority] != CTDB_FREEZE_NONE) {
848                 DEBUG(DEBUG_INFO, ("Not vacuuming %s (%s)\n", ctdb_db->db_name,
849                                    ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE ? "in recovery"
850                                    : ctdb->freeze_mode[ctdb_db->priority] == CTDB_FREEZE_PENDING
851                                    ? "freeze pending"
852                                    : "frozen"));
853                 event_add_timed(ctdb->ev, vacuum_handle, timeval_current_ofs(ctdb->tunable.vacuum_default_interval, 0), ctdb_vacuum_event, vacuum_handle);
854                 return;
855         }
856
857         child_ctx = talloc(vacuum_handle, struct ctdb_vacuum_child_context);
858         if (child_ctx == NULL) {
859                 DEBUG(DEBUG_CRIT, (__location__ " Failed to allocate child context for vacuuming of %s\n", ctdb_db->db_name));
860                 ctdb_fatal(ctdb, "Out of memory when crating vacuum child context. Shutting down\n");
861         }
862
863
864         ret = pipe(child_ctx->fd);
865         if (ret != 0) {
866                 talloc_free(child_ctx);
867                 DEBUG(DEBUG_ERR, ("Failed to create pipe for vacuum child process.\n"));
868                 event_add_timed(ctdb->ev, vacuum_handle, timeval_current_ofs(ctdb->tunable.vacuum_default_interval, 0), ctdb_vacuum_event, vacuum_handle);
869                 return;
870         }
871
872         child_ctx->child_pid = ctdb_fork(ctdb);
873         if (child_ctx->child_pid == (pid_t)-1) {
874                 close(child_ctx->fd[0]);
875                 close(child_ctx->fd[1]);
876                 talloc_free(child_ctx);
877                 DEBUG(DEBUG_ERR, ("Failed to fork vacuum child process.\n"));
878                 event_add_timed(ctdb->ev, vacuum_handle, timeval_current_ofs(ctdb->tunable.vacuum_default_interval, 0), ctdb_vacuum_event, vacuum_handle);
879                 return;
880         }
881
882
883         if (child_ctx->child_pid == 0) {
884                 char cc = 0;
885                 close(child_ctx->fd[0]);
886
887                 DEBUG(DEBUG_INFO,("Vacuuming child process %d for db %s started\n", getpid(), ctdb_db->db_name));
888         
889                 if (switch_from_server_to_client(ctdb, "vacuum-%s", ctdb_db->db_name) != 0) {
890                         DEBUG(DEBUG_CRIT, (__location__ "ERROR: failed to switch vacuum daemon into client mode. Shutting down.\n"));
891                         _exit(1);
892                 }
893
894                 /* 
895                  * repack the db
896                  */
897                 cc = ctdb_vacuum_and_repack_db(ctdb_db, child_ctx);
898
899                 write(child_ctx->fd[1], &cc, 1);
900                 _exit(0);
901         }
902
903         set_close_on_exec(child_ctx->fd[0]);
904         close(child_ctx->fd[1]);
905
906         child_ctx->status = VACUUM_RUNNING;
907         child_ctx->start_time = timeval_current();
908
909         DLIST_ADD(ctdb->vacuumers, child_ctx);
910         talloc_set_destructor(child_ctx, vacuum_child_destructor);
911
912         /*
913          * Clear the fastpath vacuuming list in the parent.
914          */
915         talloc_free(ctdb_db->delete_queue);
916         ctdb_db->delete_queue = trbt_create(ctdb_db, 0);
917         if (ctdb_db->delete_queue == NULL) {
918                 /* fatal here? ... */
919                 ctdb_fatal(ctdb, "Out of memory when re-creating vacuum tree "
920                                  "in parent context. Shutting down\n");
921         }
922
923         event_add_timed(ctdb->ev, child_ctx,
924                 timeval_current_ofs(ctdb->tunable.vacuum_max_run_time, 0),
925                 vacuum_child_timeout, child_ctx);
926
927         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to child vacuum process\n", child_ctx->fd[0]));
928
929         fde = event_add_fd(ctdb->ev, child_ctx, child_ctx->fd[0],
930                            EVENT_FD_READ, vacuum_child_handler, child_ctx);
931         tevent_fd_set_auto_close(fde);
932
933         vacuum_handle->child_ctx = child_ctx;
934         child_ctx->vacuum_handle = vacuum_handle;
935 }
936
937 void ctdb_stop_vacuuming(struct ctdb_context *ctdb)
938 {
939         /* Simply free them all. */
940         while (ctdb->vacuumers) {
941                 DEBUG(DEBUG_INFO, ("Aborting vacuuming for %s (%i)\n",
942                            ctdb->vacuumers->vacuum_handle->ctdb_db->db_name,
943                            (int)ctdb->vacuumers->child_pid));
944                 /* vacuum_child_destructor kills it, removes from list */
945                 talloc_free(ctdb->vacuumers);
946         }
947 }
948
949 /* this function initializes the vacuuming context for a database
950  * starts the vacuuming events
951  */
952 int ctdb_vacuum_init(struct ctdb_db_context *ctdb_db)
953 {
954         if (ctdb_db->persistent != 0) {
955                 DEBUG(DEBUG_ERR,("Vacuuming is disabled for persistent database %s\n", ctdb_db->db_name));
956                 return 0;
957         }
958
959         ctdb_db->vacuum_handle = talloc(ctdb_db, struct ctdb_vacuum_handle);
960         CTDB_NO_MEMORY(ctdb_db->ctdb, ctdb_db->vacuum_handle);
961
962         ctdb_db->vacuum_handle->ctdb_db = ctdb_db;
963
964         event_add_timed(ctdb_db->ctdb->ev, ctdb_db->vacuum_handle, 
965                         timeval_current_ofs(get_vacuum_interval(ctdb_db), 0), 
966                         ctdb_vacuum_event, ctdb_db->vacuum_handle);
967
968         return 0;
969 }