server: rename ctdb_repack_db() to ctdb_vacuum_and_repack_db()
[ctdb.git] / server / ctdb_vacuum.c
1 /*
2    ctdb vacuuming events
3
4    Copyright (C) Ronnie Sahlberg  2009
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/events/events.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "system/network.h"
24 #include "system/filesys.h"
25 #include "system/dir.h"
26 #include "../include/ctdb_private.h"
27 #include "db_wrap.h"
28 #include "lib/util/dlinklist.h"
29 #include "lib/events/events.h"
30 #include "../include/ctdb_private.h"
31 #include "../common/rb_tree.h"
32
/* Timeout value handed to the ctdb_ctrl_*() calls in this file; presumably "10 seconds from now" - confirm against timeval_current_ofs(). */
33 #define TIMELIMIT() timeval_current_ofs(10, 0)
/* Base file name of the per-node vacuum tuning database; ".<pnn>" is appended when the path is built. */
34 #define TUNINGDBNAME "vactune.tdb"
35
/*
 * State of a vacuum child process as tracked in
 * struct ctdb_vacuum_child_context. In the code visible here only
 * VACUUM_TIMEOUT is assigned (by vacuum_child_timeout()); the other
 * values are presumably set by the code that starts the child and
 * collects its result - confirm there.
 */
36 enum vacuum_child_status { VACUUM_RUNNING, VACUUM_OK, VACUUM_ERROR, VACUUM_TIMEOUT};
37
/* Bookkeeping for one running vacuum child process. */
38 struct ctdb_vacuum_child_context {
39         struct ctdb_vacuum_handle *vacuum_handle; /* owning handle; used to reach the db and to re-arm the vacuum timer */
40         int fd[2]; /* NOTE(review): presumably the pipe between parent and child - not used in the code shown here */
41         pid_t child_pid; /* pid of the child; sent SIGKILL by the destructor unless it is -1 */
42         enum vacuum_child_status status; /* set to VACUUM_TIMEOUT when the timeout event fires */
43         struct timeval start_time; /* the destructor logs the time elapsed since this timestamp */
44 };
45
/* Per-database vacuuming handle; also used as talloc parent and private data of the vacuum timer event. */
46 struct ctdb_vacuum_handle {
47         struct ctdb_db_context *ctdb_db; /* database this handle vacuums */
48         struct ctdb_vacuum_child_context *child_ctx; /* NOTE(review): presumably the currently running child, if any - not referenced in the code shown here */
49 };
50
51
52 /*  a list of records to possibly delete */
53 struct vacuum_data {
54         uint32_t vacuum_limit; /* copy of the vacuum_limit tunable; compared against delete_count */
55         uint32_t repack_limit; /* copy of the repack_limit tunable; compared against the freelist size */
56         struct ctdb_context *ctdb;
57         struct ctdb_db_context *ctdb_db;
58         struct tdb_context *dest_db; /* database repack_traverse() stores records into */
59         trbt_tree_t *delete_tree; /* deletion candidates (struct delete_record_data), indexed by the hash of the key */
60         uint32_t delete_count; /* number of candidates inserted into delete_tree */
61         struct ctdb_marshall_buffer **list; /* one marshall buffer per node (num_nodes entries), indexed by lmaster */
62         struct timeval start; /* when this vacuum run started; saved into the tuning record */
63         bool traverse_error; /* set by the traverse callbacks when they fail */
64         bool vacuum; /* true: repack_traverse() drops records found in delete_tree; false: plain copy */
65         uint32_t total; /* records appended to the per-lmaster lists */
66         uint32_t vacuumed; /* records dropped by repack_traverse() */
67         uint32_t copied; /* records stored into dest_db by repack_traverse() */
68 };
69
70 /* tuning information stored for every db */
/* NOTE: this struct is written to and read from the tuning tdb as raw bytes, so its layout is part of the on-disk format. */
71 struct vacuum_tuning_data {
72         uint32_t last_num_repack; /* freelist size; only written when the record is first created */
73         uint32_t last_num_empty; /* delete_count; only written when the record is first created */
74         uint32_t last_interval; /* interval the adaptation in update_tuning_db() starts from */
75         uint32_t new_interval; /* interval get_vacuum_interval() hands out (after clamping to min/max) */
76         struct timeval last_start; /* start time of the most recent vacuum run */
77         double   last_duration; /* timeval_elapsed() since last_start at the time the record was written */
78 };
79
80 /* this structure contains the information for one record to be deleted */
81 struct delete_record_data {
82         struct ctdb_context *ctdb;
83         struct ctdb_db_context *ctdb_db;
84         struct ctdb_ltdb_header hdr; /* copy of the record header seen during the vacuum traverse; its rsn is re-checked before the record is dropped */
85         TDB_DATA key; /* talloc'ed copy of the record key, allocated as a child of this struct */
86 };
87
/* Accumulator passed to delete_traverse(): the marshall buffer that is sent with CTDB_CONTROL_TRY_DELETE_RECORDS. */
88 struct delete_records_list {
89         struct ctdb_marshall_buffer *records; /* grown in place as records are appended */
90 };
91
/* Timed-event handler for vacuuming; declared here because vacuum_child_destructor() re-arms the timer with it before its definition. */
92 static void ctdb_vacuum_event(struct event_context *ev, struct timed_event *te, 
93                                                           struct timeval t, void *private_data);
94
95
96 /*
97  * traverse function for gathering the records that can be deleted
98  */
99 static int vacuum_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private)
100 {
101         struct vacuum_data *vdata = talloc_get_type(private, struct vacuum_data);
102         struct ctdb_context *ctdb = vdata->ctdb;
103         struct ctdb_db_context *ctdb_db = vdata->ctdb_db;
104         uint32_t lmaster;
105         struct ctdb_ltdb_header *hdr;
106         struct ctdb_rec_data *rec;
107         size_t old_size;
108                
109         lmaster = ctdb_lmaster(ctdb, &key);
110         if (lmaster >= ctdb->num_nodes) {
111                 DEBUG(DEBUG_CRIT, (__location__
112                                    " lmaster[%u] >= ctdb->num_nodes[%u] for key"
113                                    " with hash[%u]!\n",
114                                    (unsigned)lmaster,
115                                    (unsigned)ctdb->num_nodes,
116                                    (unsigned)ctdb_hash(&key)));
117                 return -1;
118         }
119
120         if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
121                 /* its not a deleted record */
122                 return 0;
123         }
124
125         hdr = (struct ctdb_ltdb_header *)data.dptr;
126
127         if (hdr->dmaster != ctdb->pnn) {
128                 return 0;
129         }
130
131         /* Is this a record we could possibly delete? I.e.
132            if the record is empty and also we are both lmaster
133            and dmaster for the record we should be able to delete it
134         */
135         if (lmaster == ctdb->pnn) {
136                 uint32_t hash;
137
138                 hash = ctdb_hash(&key);
139                 if (trbt_lookup32(vdata->delete_tree, hash)) {
140                         DEBUG(DEBUG_DEBUG, (__location__ " Hash collission when vacuuming, skipping this record.\n"));
141                 } 
142                 else {
143                         struct delete_record_data *dd;
144
145                         /* store key and header indexed by the key hash */
146                         dd = talloc_zero(vdata->delete_tree, struct delete_record_data);
147                         if (dd == NULL) {
148                                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
149                                 return -1;
150                         }
151                         dd->ctdb      = ctdb;
152                         dd->ctdb_db   = ctdb_db;
153                         dd->key.dsize = key.dsize;
154                         dd->key.dptr  = talloc_memdup(dd, key.dptr, key.dsize);
155                         if (dd->key.dptr == NULL) {
156                                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
157                                 return -1;
158                         }
159
160                         dd->hdr = *hdr;
161         
162                         trbt_insert32(vdata->delete_tree, hash, dd);
163
164                         vdata->delete_count++;
165                 }
166         }
167
168         /* add the record to the blob ready to send to the nodes */
169         rec = ctdb_marshall_record(vdata->list[lmaster], ctdb->pnn, key, NULL, tdb_null);
170         if (rec == NULL) {
171                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
172                 vdata->traverse_error = true;
173                 return -1;
174         }
175         old_size = talloc_get_size(vdata->list[lmaster]);
176         vdata->list[lmaster] = talloc_realloc_size(NULL, vdata->list[lmaster], 
177                                                    old_size + rec->length);
178         if (vdata->list[lmaster] == NULL) {
179                 DEBUG(DEBUG_ERR,(__location__ " Failed to expand\n"));
180                 vdata->traverse_error = true;
181                 return -1;
182         }
183         vdata->list[lmaster]->count++;
184         memcpy(old_size+(uint8_t *)vdata->list[lmaster], rec, rec->length);
185         talloc_free(rec);
186
187         vdata->total++;
188
189         return 0;
190 }
191
192 /*
193  * traverse the tree of records to delete and marshall them into
194  * a blob
195  */
196 static void delete_traverse(void *param, void *data)
197 {
198         struct delete_record_data *dd = talloc_get_type(data, struct delete_record_data);
199         struct delete_records_list *recs = talloc_get_type(param, struct delete_records_list);
200         struct ctdb_rec_data *rec;
201         size_t old_size;
202
203         rec = ctdb_marshall_record(dd, recs->records->db_id, dd->key, &dd->hdr, tdb_null);
204         if (rec == NULL) {
205                 DEBUG(DEBUG_ERR, (__location__ " failed to marshall record\n"));
206                 return;
207         }
208
209         old_size = talloc_get_size(recs->records);
210         recs->records = talloc_realloc_size(NULL, recs->records, old_size + rec->length);
211         if (recs->records == NULL) {
212                 DEBUG(DEBUG_ERR,(__location__ " Failed to expand\n"));
213                 return;
214         }
215         recs->records->count++;
216         memcpy(old_size+(uint8_t *)(recs->records), rec, rec->length);
217 }
218
219 /* 
220  * read-only traverse the database in order to find
221  * records that can be deleted and try to delete these
222  * records on the other nodes
223  * this executes in the child context
224  */
225 static int ctdb_vacuum_db(struct ctdb_db_context *ctdb_db, struct vacuum_data *vdata)
226 {
227         struct ctdb_context *ctdb = ctdb_db->ctdb;
228         const char *name = ctdb_db->db_name;
229         int ret, i, pnn;
230
231         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &ctdb->vnn_map);
232         if (ret != 0) {
233                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from local node\n"));
234                 return ret;
235         }
236
237         pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
238         if (pnn == -1) {
239                 DEBUG(DEBUG_ERR, ("Unable to get pnn from local node\n"));
240                 return -1;
241         }
242
243         ctdb->pnn = pnn;
244         /* the list needs to be of length num_nodes */
245         vdata->list = talloc_array(vdata, struct ctdb_marshall_buffer *, ctdb->num_nodes);
246         if (vdata->list == NULL) {
247                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
248                 return -1;
249         }
250         for (i = 0; i < ctdb->num_nodes; i++) {
251                 vdata->list[i] = (struct ctdb_marshall_buffer *)
252                         talloc_zero_size(vdata->list, 
253                                                          offsetof(struct ctdb_marshall_buffer, data));
254                 if (vdata->list[i] == NULL) {
255                         DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
256                         return -1;
257                 }
258                 vdata->list[i]->db_id = ctdb_db->db_id;
259         }
260
261         /* read-only traverse, looking for records that might be able to be vacuumed */
262         if (tdb_traverse_read(ctdb_db->ltdb->tdb, vacuum_traverse, vdata) == -1 ||
263             vdata->traverse_error) {
264                 DEBUG(DEBUG_ERR,(__location__ " Traverse error in vacuuming '%s'\n", name));
265                 return -1;              
266         }
267
268         /*
269          * For records where we are not the lmaster,
270          * tell the lmaster to fetch the record.
271          */
272         for (i = 0; i < ctdb->num_nodes; i++) {
273                 TDB_DATA data;
274
275                 if (ctdb->nodes[i]->pnn == ctdb->pnn) {
276                         continue;
277                 }
278
279                 if (vdata->list[i]->count == 0) {
280                         continue;
281                 }
282
283                 DEBUG(DEBUG_INFO, ("Found %u records for lmaster %u in '%s'\n",
284                                    vdata->list[i]->count, ctdb->nodes[i]->pnn,
285                                    name));
286
287                 data.dsize = talloc_get_size(vdata->list[i]);
288                 data.dptr  = (void *)vdata->list[i];
289                 if (ctdb_send_message(ctdb, ctdb->nodes[i]->pnn, CTDB_SRVID_VACUUM_FETCH, data) != 0) {
290                         DEBUG(DEBUG_ERR, (__location__ " Failed to send vacuum "
291                                           "fetch message to %u\n",
292                                           ctdb->nodes[i]->pnn));
293                         return -1;
294                 }
295         }       
296
297         /* Process all records we can delete (if any) */
298         if (vdata->delete_count > 0) {
299                 struct delete_records_list *recs;
300                 TDB_DATA indata, outdata;
301                 int32_t res;
302                 struct ctdb_node_map *nodemap;
303                 uint32_t *active_nodes;
304                 int num_active_nodes;
305
306                 recs = talloc_zero(vdata, struct delete_records_list);
307                 if (recs == NULL) {
308                         DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
309                         return -1;
310                 }
311                 recs->records = (struct ctdb_marshall_buffer *)
312                         talloc_zero_size(vdata, 
313                                     offsetof(struct ctdb_marshall_buffer, data));
314                 if (recs->records == NULL) {
315                         DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
316                         return -1;
317                 }
318                 recs->records->db_id = ctdb_db->db_id;
319
320                 /* 
321                  * traverse the tree of all records we want to delete and
322                  * create a blob we can send to the other nodes.
323                  */
324                 trbt_traversearray32(vdata->delete_tree, 1, delete_traverse, recs);
325
326                 indata.dsize = talloc_get_size(recs->records);
327                 indata.dptr  = (void *)recs->records;
328
329                 /* 
330                  * now tell all the active nodes to delete all these records
331                  * (if possible)
332                  */
333
334                 ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(),
335                                            CTDB_CURRENT_NODE,
336                                            recs, /* talloc context */
337                                            &nodemap);
338                 if (ret != 0) {
339                         DEBUG(DEBUG_ERR,(__location__ " unable to get node map\n"));
340                         return -1;
341                 }
342
343                 active_nodes = list_of_active_nodes(ctdb, nodemap,
344                                                     nodemap, /* talloc context */
345                                                     false /* include self */);
346                 /* yuck! ;-) */
347                 num_active_nodes = talloc_get_size(active_nodes)/sizeof(*active_nodes);
348
349                 for (i = 0; i < num_active_nodes; i++) {
350                         struct ctdb_marshall_buffer *records;
351                         struct ctdb_rec_data *rec;
352
353                         ret = ctdb_control(ctdb, active_nodes[i], 0,
354                                         CTDB_CONTROL_TRY_DELETE_RECORDS, 0,
355                                         indata, recs, &outdata, &res,
356                                         NULL, NULL);
357                         if (ret != 0 || res != 0) {
358                                 DEBUG(DEBUG_ERR, ("Failed to delete records on "
359                                                   "node %u: ret[%d] res[%d]\n",
360                                                   active_nodes[i], ret, res));
361                                 return -1;
362                         }
363
364                         /* 
365                          * outdata countains the list of records coming back
366                          * from the node which the node could not delete
367                          */
368                         records = (struct ctdb_marshall_buffer *)outdata.dptr;
369                         rec = (struct ctdb_rec_data *)&records->data[0];
370                         while (records->count-- > 1) {
371                                 TDB_DATA reckey, recdata;
372                                 struct ctdb_ltdb_header *rechdr;
373
374                                 reckey.dptr = &rec->data[0];
375                                 reckey.dsize = rec->keylen;
376                                 recdata.dptr = &rec->data[reckey.dsize];
377                                 recdata.dsize = rec->datalen;
378
379                                 if (recdata.dsize < sizeof(struct ctdb_ltdb_header)) {
380                                         DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
381                                         return -1;
382                                 }
383                                 rechdr = (struct ctdb_ltdb_header *)recdata.dptr;
384                                 recdata.dptr += sizeof(*rechdr);
385                                 recdata.dsize -= sizeof(*rechdr);
386
387                                 /* 
388                                  * that other node couldnt delete the record
389                                  * so we should delete it and thereby remove it from the tree
390                                  */
391                                 talloc_free(trbt_lookup32(vdata->delete_tree, ctdb_hash(&reckey)));
392
393                                 rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec);
394                         }           
395                 }
396
397                 /* free nodemap and active_nodes */
398                 talloc_free(nodemap);
399
400                 /* 
401                  * The only records remaining in the tree would be those
402                  * records where all other nodes could successfully
403                  * delete them, so we can safely delete them on the
404                  * lmaster as well. Deletion implictely happens while
405                  * we repack the database. The repack algorithm revisits 
406                  * the tree in order to find the records that don't need
407                  * to be copied / repacked.
408                  */
409         }
410
411         /* this ensures we run our event queue */
412         ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
413
414         return 0;
415 }
416
417
418 /*
419  * traverse function for repacking
420  */
421 static int repack_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private)
422 {
423         struct vacuum_data *vdata = (struct vacuum_data *)private;
424
425         if (vdata->vacuum) {
426                 uint32_t hash = ctdb_hash(&key);
427                 struct delete_record_data *kd;
428                 /*
429                  * check if we can ignore this record because it's in the delete_tree
430                  */
431                 kd = (struct delete_record_data *)trbt_lookup32(vdata->delete_tree, hash);
432                 /*
433                  * there might be hash collisions so we have to compare the keys here to be sure
434                  */
435                 if (kd && kd->key.dsize == key.dsize && memcmp(kd->key.dptr, key.dptr, key.dsize) == 0) {
436                         struct ctdb_ltdb_header *hdr = (struct ctdb_ltdb_header *)data.dptr;
437                         /*
438                          * we have to check if the record hasn't changed in the meantime in order to
439                          * savely remove it from the database
440                          */
441                         if (data.dsize == sizeof(struct ctdb_ltdb_header) &&
442                                 hdr->dmaster == kd->ctdb->pnn &&
443                                 ctdb_lmaster(kd->ctdb, &(kd->key)) == kd->ctdb->pnn &&
444                                 kd->hdr.rsn == hdr->rsn) {
445                                 vdata->vacuumed++;
446                                 return 0;
447                         }
448                 }
449         }
450         if (tdb_store(vdata->dest_db, key, data, TDB_INSERT) != 0) {
451                 vdata->traverse_error = true;
452                 return -1;
453         }
454         vdata->copied++;
455         return 0;
456 }
457
458 /*
459  * repack a tdb
460  */
461 static int ctdb_repack_tdb(struct tdb_context *tdb, TALLOC_CTX *mem_ctx, struct vacuum_data *vdata)
462 {
463         struct tdb_context *tmp_db;
464
465         if (tdb_transaction_start(tdb) != 0) {
466                 DEBUG(DEBUG_ERR,(__location__ " Failed to start transaction\n"));
467                 return -1;
468         }
469
470         tmp_db = tdb_open("tmpdb", tdb_hash_size(tdb),
471                           TDB_INTERNAL|TDB_DISALLOW_NESTING,
472                           O_RDWR|O_CREAT, 0);
473         if (tmp_db == NULL) {
474                 DEBUG(DEBUG_ERR,(__location__ " Failed to create tmp_db\n"));
475                 tdb_transaction_cancel(tdb);
476                 return -1;
477         }
478
479         vdata->traverse_error = false;
480         vdata->dest_db = tmp_db;
481         vdata->vacuum = true;
482         vdata->vacuumed = 0;
483         vdata->copied = 0;
484
485         /*
486          * repack and vacuum on-the-fly by not writing the records that are
487          * no longer needed
488          */
489         if (tdb_traverse_read(tdb, repack_traverse, vdata) == -1) {
490                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse copying out\n"));
491                 tdb_transaction_cancel(tdb);
492                 tdb_close(tmp_db);
493                 return -1;              
494         }
495
496         DEBUG(DEBUG_INFO,(__location__ " %u records vacuumed\n", vdata->vacuumed));
497         
498         if (vdata->traverse_error) {
499                 DEBUG(DEBUG_ERR,(__location__ " Error during traversal\n"));
500                 tdb_transaction_cancel(tdb);
501                 tdb_close(tmp_db);
502                 return -1;
503         }
504
505         if (tdb_wipe_all(tdb) != 0) {
506                 DEBUG(DEBUG_ERR,(__location__ " Failed to wipe database\n"));
507                 tdb_transaction_cancel(tdb);
508                 tdb_close(tmp_db);
509                 return -1;
510         }
511
512         vdata->traverse_error = false;
513         vdata->dest_db = tdb;
514         vdata->vacuum = false;
515         vdata->copied = 0;
516
517         if (tdb_traverse_read(tmp_db, repack_traverse, vdata) == -1) {
518                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse copying back\n"));
519                 tdb_transaction_cancel(tdb);
520                 tdb_close(tmp_db);
521                 return -1;              
522         }
523
524         if (vdata->traverse_error) {
525                 DEBUG(DEBUG_ERR,(__location__ " Error during second traversal\n"));
526                 tdb_transaction_cancel(tdb);
527                 tdb_close(tmp_db);
528                 return -1;
529         }
530
531         tdb_close(tmp_db);
532
533
534         if (tdb_transaction_commit(tdb) != 0) {
535                 DEBUG(DEBUG_ERR,(__location__ " Failed to commit\n"));
536                 return -1;
537         }
538         DEBUG(DEBUG_INFO,(__location__ " %u records copied\n", vdata->copied));
539
540         return 0;
541 }
542
543 static int update_tuning_db(struct ctdb_db_context *ctdb_db, struct vacuum_data *vdata, uint32_t freelist)
544 {
545         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
546         TDB_CONTEXT *tune_tdb;
547         TDB_DATA key, value;
548         struct vacuum_tuning_data tdata;
549         struct vacuum_tuning_data *tptr;
550         char *vac_dbname;
551         int flags;
552
553         vac_dbname = talloc_asprintf(tmp_ctx, "%s/%s.%u",
554                                      ctdb_db->ctdb->db_directory_state,
555                                      TUNINGDBNAME, ctdb_db->ctdb->pnn);
556         if (vac_dbname == NULL) {
557                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory error while allocating '%s'\n", vac_dbname));
558                 talloc_free(tmp_ctx);
559                 return -1;
560         }
561
562         flags  = ctdb_db->ctdb->valgrinding ? TDB_NOMMAP : 0;
563         flags |= TDB_DISALLOW_NESTING;
564         tune_tdb = tdb_open(vac_dbname, 0,
565                             flags,
566                             O_RDWR|O_CREAT, 0600);
567         if (tune_tdb == NULL) {
568                 DEBUG(DEBUG_ERR,(__location__ " Failed to create/open %s\n", TUNINGDBNAME));
569                 talloc_free(tmp_ctx);
570                 return -1;
571         }
572         
573         if (tdb_transaction_start(tune_tdb) != 0) {
574                 DEBUG(DEBUG_ERR,(__location__ " Failed to start transaction\n"));
575                 tdb_close(tune_tdb);
576                 return -1;
577         }
578         key.dptr = discard_const(ctdb_db->db_name);
579         key.dsize = strlen(ctdb_db->db_name);
580         value = tdb_fetch(tune_tdb, key);
581
582         if (value.dptr != NULL && value.dsize == sizeof(struct vacuum_tuning_data)) {
583                 tptr = (struct vacuum_tuning_data *)value.dptr;
584                 tdata = *tptr;
585
586                 /*
587                  * re-calc new vacuum interval:
588                  * in case no limit was reached we continously increase the interval
589                  * until vacuum_max_interval is reached
590                  * in case a limit was reached we divide the current interval by 2
591                  * unless vacuum_min_interval is reached
592                  */
593                 if (freelist < vdata->repack_limit &&
594                     vdata->delete_count < vdata->vacuum_limit) {
595                         if (tdata.last_interval < ctdb_db->ctdb->tunable.vacuum_max_interval) {
596                                 tdata.new_interval = tdata.last_interval * 110 / 100;
597                                 DEBUG(DEBUG_INFO,("Increasing vacuum interval %u -> %u for %s\n", 
598                                         tdata.last_interval, tdata.new_interval, ctdb_db->db_name));
599                         }
600                 } else {
601                         tdata.new_interval = tdata.last_interval / 2;
602                         if (tdata.new_interval < ctdb_db->ctdb->tunable.vacuum_min_interval ||
603                                 tdata.new_interval > ctdb_db->ctdb->tunable.vacuum_max_interval) {
604                                 tdata.new_interval = ctdb_db->ctdb->tunable.vacuum_min_interval;
605                         }               
606                         DEBUG(DEBUG_INFO,("Decreasing vacuum interval %u -> %u for %s\n", 
607                                          tdata.last_interval, tdata.new_interval, ctdb_db->db_name));
608                 }
609                 tdata.last_interval = tdata.new_interval;
610         } else {
611                 DEBUG(DEBUG_ERR,(__location__ " Cannot find tunedb record for %s. Using default interval\n", ctdb_db->db_name));
612                 tdata.last_num_repack = freelist;
613                 tdata.last_num_empty = vdata->delete_count;
614                 tdata.last_interval = ctdb_db->ctdb->tunable.vacuum_default_interval;
615         }
616
617         if (value.dptr != NULL) {
618                 free(value.dptr);
619         }
620
621         tdata.last_start = vdata->start;
622         tdata.last_duration = timeval_elapsed(&vdata->start);
623
624         value.dptr = (unsigned char *)&tdata;
625         value.dsize = sizeof(tdata);
626
627         if (tdb_store(tune_tdb, key, value, 0) != 0) {
628                 DEBUG(DEBUG_ERR,(__location__ " Unable to store tundb record for %s\n", ctdb_db->db_name));
629                 tdb_transaction_cancel(tune_tdb);
630                 tdb_close(tune_tdb);
631                 talloc_free(tmp_ctx);
632                 return -1;
633         }
634         tdb_transaction_commit(tune_tdb);
635         tdb_close(tune_tdb);
636         talloc_free(tmp_ctx);
637
638         return 0;
639 }
640
641 /*
642  * repack and vaccum a db
643  * called from the child context
644  */
645 static int ctdb_vacuum_and_repack_db(struct ctdb_db_context *ctdb_db,
646                                      TALLOC_CTX *mem_ctx)
647 {
648         uint32_t repack_limit = ctdb_db->ctdb->tunable.repack_limit;
649         uint32_t vacuum_limit = ctdb_db->ctdb->tunable.vacuum_limit;
650         const char *name = ctdb_db->db_name;
651         int size;
652         struct vacuum_data *vdata;
653
654         size = tdb_freelist_size(ctdb_db->ltdb->tdb);
655         if (size == -1) {
656                 DEBUG(DEBUG_ERR,(__location__ " Failed to get freelist size for '%s'\n", name));
657                 return -1;
658         }
659
660         vdata = talloc_zero(mem_ctx, struct vacuum_data);
661         if (vdata == NULL) {
662                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
663                 return -1;
664         }
665
666         vdata->ctdb = ctdb_db->ctdb;
667         vdata->vacuum_limit = vacuum_limit;
668         vdata->repack_limit = repack_limit;
669         vdata->delete_tree = trbt_create(vdata, 0);
670         if (vdata->delete_tree == NULL) {
671                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
672                 talloc_free(vdata);
673                 return -1;
674         }
675
676         vdata->start = timeval_current();
677  
678         /*
679          * gather all records that can be deleted in vdata
680          */
681         if (ctdb_vacuum_db(ctdb_db, vdata) != 0) {
682                 DEBUG(DEBUG_ERR,(__location__ " Failed to vacuum '%s'\n", name));
683         }
684
685         /*
686          * decide if a repack is necessary
687          */
688         if (size < repack_limit && vdata->delete_count < vacuum_limit) {
689                 update_tuning_db(ctdb_db, vdata, size);
690                 talloc_free(vdata);
691                 return 0;
692         }
693
694         DEBUG(DEBUG_INFO,("Repacking %s with %u freelist entries and %u records to delete\n", 
695                         name, size, vdata->delete_count));
696
697         /*
698          * repack and implicitely get rid of the records we can delete
699          */
700         if (ctdb_repack_tdb(ctdb_db->ltdb->tdb, mem_ctx, vdata) != 0) {
701                 DEBUG(DEBUG_ERR,(__location__ " Failed to repack '%s'\n", name));
702                 update_tuning_db(ctdb_db, vdata, size);
703                 talloc_free(vdata);
704                 return -1;
705         }
706         update_tuning_db(ctdb_db, vdata, size);
707         talloc_free(vdata);
708
709         return 0;
710 }
711
712 static int get_vacuum_interval(struct ctdb_db_context *ctdb_db)
713 {
714         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
715         TDB_CONTEXT *tdb;
716         TDB_DATA key, value;
717         char *vac_dbname;
718         uint interval = ctdb_db->ctdb->tunable.vacuum_default_interval;
719         struct ctdb_context *ctdb = ctdb_db->ctdb;
720         int flags;
721
722         vac_dbname = talloc_asprintf(tmp_ctx, "%s/%s.%u", ctdb->db_directory, TUNINGDBNAME, ctdb->pnn);
723         if (vac_dbname == NULL) {
724                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory error while allocating '%s'\n", vac_dbname));
725                 talloc_free(tmp_ctx);
726                 return interval;
727         }
728
729         flags  = ctdb_db->ctdb->valgrinding ? TDB_NOMMAP : 0;
730         flags |= TDB_DISALLOW_NESTING;
731         tdb = tdb_open(vac_dbname, 0,
732                        flags,
733                        O_RDWR|O_CREAT, 0600);
734         if (!tdb) {
735                 DEBUG(DEBUG_ERR,("Unable to open/create database %s using default interval\n", vac_dbname));
736                 talloc_free(tmp_ctx);
737                 return interval;
738         }
739
740         key.dptr = discard_const(ctdb_db->db_name);
741         key.dsize = strlen(ctdb_db->db_name);
742
743         value = tdb_fetch(tdb, key);
744
745         if (value.dptr != NULL) {
746                 if (value.dsize == sizeof(struct vacuum_tuning_data)) {
747                         struct vacuum_tuning_data *tptr = (struct vacuum_tuning_data *)value.dptr;
748
749                         interval = tptr->new_interval;
750
751                         if (interval < ctdb->tunable.vacuum_min_interval) {
752                                 interval = ctdb->tunable.vacuum_min_interval;
753                         } 
754                         if (interval > ctdb->tunable.vacuum_max_interval) {
755                                 interval = ctdb->tunable.vacuum_max_interval;
756                         }
757                 }
758                 free(value.dptr);
759         }
760         tdb_close(tdb);
761
762         talloc_free(tmp_ctx);
763
764         return interval;
765 }
766
767 static int vacuum_child_destructor(struct ctdb_vacuum_child_context *child_ctx)
768 {
769         double l = timeval_elapsed(&child_ctx->start_time);
770         struct ctdb_db_context *ctdb_db = child_ctx->vacuum_handle->ctdb_db;
771         struct ctdb_context *ctdb = ctdb_db->ctdb;
772
773         DEBUG(DEBUG_INFO,("Vacuuming took %.3f seconds for database %s\n", l, ctdb_db->db_name));
774
775         if (child_ctx->child_pid != -1) {
776                 kill(child_ctx->child_pid, SIGKILL);
777         }
778
779         event_add_timed(ctdb->ev, child_ctx->vacuum_handle,
780                         timeval_current_ofs(get_vacuum_interval(ctdb_db), 0), 
781                         ctdb_vacuum_event, child_ctx->vacuum_handle);
782
783         return 0;
784 }
785
786 /*
787  * this event is generated when a vacuum child process times out
788  */
789 static void vacuum_child_timeout(struct event_context *ev, struct timed_event *te,
790                                          struct timeval t, void *private_data)
791 {
792         struct ctdb_vacuum_child_context *child_ctx = talloc_get_type(private_data, struct ctdb_vacuum_child_context);
793
794         DEBUG(DEBUG_ERR,("Vacuuming child process timed out for db %s\n", child_ctx->vacuum_handle->ctdb_db->db_name));
795
796         child_ctx->status = VACUUM_TIMEOUT;
797
798         talloc_free(child_ctx);
799 }
800
801
802 /*
803  * this event is generated when a vacuum child process has completed
804  */
805 static void vacuum_child_handler(struct event_context *ev, struct fd_event *fde,
806                              uint16_t flags, void *private_data)
807 {
808         struct ctdb_vacuum_child_context *child_ctx = talloc_get_type(private_data, struct ctdb_vacuum_child_context);
809         char c = 0;
810         int ret;
811
812         DEBUG(DEBUG_INFO,("Vacuuming child process %d finished for db %s\n", child_ctx->child_pid, child_ctx->vacuum_handle->ctdb_db->db_name));
813         child_ctx->child_pid = -1;
814
815         ret = read(child_ctx->fd[0], &c, 1);
816         if (ret != 1 || c != 0) {
817                 child_ctx->status = VACUUM_ERROR;
818                 DEBUG(DEBUG_ERR, ("A vacuum child process failed with an error for database %s. ret=%d c=%d\n", child_ctx->vacuum_handle->ctdb_db->db_name, ret, c));
819         } else {
820                 child_ctx->status = VACUUM_OK;
821         }
822
823         talloc_free(child_ctx);
824 }
825
826 /*
827  * this event is called every time we need to start a new vacuum process
828  */
829 static void
830 ctdb_vacuum_event(struct event_context *ev, struct timed_event *te,
831                                struct timeval t, void *private_data)
832 {
833         struct ctdb_vacuum_handle *vacuum_handle = talloc_get_type(private_data, struct ctdb_vacuum_handle);
834         struct ctdb_db_context *ctdb_db = vacuum_handle->ctdb_db;
835         struct ctdb_context *ctdb = ctdb_db->ctdb;
836         struct ctdb_vacuum_child_context *child_ctx;
837         int ret;
838
839         /* we dont vacuum if we are in recovery mode */
840         if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE) {
841                 event_add_timed(ctdb->ev, vacuum_handle, timeval_current_ofs(ctdb->tunable.vacuum_default_interval, 0), ctdb_vacuum_event, vacuum_handle);
842                 return;
843         }
844
845         child_ctx = talloc(vacuum_handle, struct ctdb_vacuum_child_context);
846         if (child_ctx == NULL) {
847                 DEBUG(DEBUG_CRIT, (__location__ " Failed to allocate child context for vacuuming of %s\n", ctdb_db->db_name));
848                 ctdb_fatal(ctdb, "Out of memory when crating vacuum child context. Shutting down\n");
849         }
850
851
852         ret = pipe(child_ctx->fd);
853         if (ret != 0) {
854                 talloc_free(child_ctx);
855                 DEBUG(DEBUG_ERR, ("Failed to create pipe for vacuum child process.\n"));
856                 event_add_timed(ctdb->ev, vacuum_handle, timeval_current_ofs(ctdb->tunable.vacuum_default_interval, 0), ctdb_vacuum_event, vacuum_handle);
857                 return;
858         }
859
860         child_ctx->child_pid = fork();
861         if (child_ctx->child_pid == (pid_t)-1) {
862                 close(child_ctx->fd[0]);
863                 close(child_ctx->fd[1]);
864                 talloc_free(child_ctx);
865                 DEBUG(DEBUG_ERR, ("Failed to fork vacuum child process.\n"));
866                 event_add_timed(ctdb->ev, vacuum_handle, timeval_current_ofs(ctdb->tunable.vacuum_default_interval, 0), ctdb_vacuum_event, vacuum_handle);
867                 return;
868         }
869
870
871         if (child_ctx->child_pid == 0) {
872                 char cc = 0;
873                 close(child_ctx->fd[0]);
874
875                 DEBUG(DEBUG_INFO,("Vacuuming child process %d for db %s started\n", getpid(), ctdb_db->db_name));
876         
877                 if (switch_from_server_to_client(ctdb) != 0) {
878                         DEBUG(DEBUG_CRIT, (__location__ "ERROR: failed to switch vacuum daemon into client mode. Shutting down.\n"));
879                         _exit(1);
880                 }
881
882                 /* 
883                  * repack the db
884                  */
885                 cc = ctdb_vacuum_and_repack_db(ctdb_db, child_ctx);
886
887                 write(child_ctx->fd[1], &cc, 1);
888                 _exit(0);
889         }
890
891         set_close_on_exec(child_ctx->fd[0]);
892         close(child_ctx->fd[1]);
893
894         child_ctx->status = VACUUM_RUNNING;
895         child_ctx->start_time = timeval_current();
896
897         talloc_set_destructor(child_ctx, vacuum_child_destructor);
898
899         /*
900          * Clear the fastpath vacuuming list in the parent.
901          */
902         talloc_free(ctdb_db->delete_queue);
903         ctdb_db->delete_queue = trbt_create(ctdb_db, 0);
904         if (ctdb_db->delete_queue == NULL) {
905                 /* fatal here? ... */
906                 ctdb_fatal(ctdb, "Out of memory when re-creating vacuum tree "
907                                  "in parent context. Shutting down\n");
908         }
909
910         event_add_timed(ctdb->ev, child_ctx,
911                 timeval_current_ofs(ctdb->tunable.vacuum_max_run_time, 0),
912                 vacuum_child_timeout, child_ctx);
913
914         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to child vacuum process\n", child_ctx->fd[0]));
915
916         event_add_fd(ctdb->ev, child_ctx, child_ctx->fd[0],
917                 EVENT_FD_READ|EVENT_FD_AUTOCLOSE,
918                 vacuum_child_handler,
919                 child_ctx);
920
921         vacuum_handle->child_ctx = child_ctx;
922         child_ctx->vacuum_handle = vacuum_handle;
923 }
924
925
926 /* this function initializes the vacuuming context for a database
927  * starts the vacuuming events
928  */
929 int ctdb_vacuum_init(struct ctdb_db_context *ctdb_db)
930 {
931         if (ctdb_db->persistent != 0) {
932                 DEBUG(DEBUG_ERR,("Vacuuming is disabled for persistent database %s\n", ctdb_db->db_name));
933                 return 0;
934         }
935
936         ctdb_db->vacuum_handle = talloc(ctdb_db, struct ctdb_vacuum_handle);
937         CTDB_NO_MEMORY(ctdb_db->ctdb, ctdb_db->vacuum_handle);
938
939         ctdb_db->vacuum_handle->ctdb_db = ctdb_db;
940
941         event_add_timed(ctdb_db->ctdb->ev, ctdb_db->vacuum_handle, 
942                         timeval_current_ofs(get_vacuum_interval(ctdb_db), 0), 
943                         ctdb_vacuum_event, ctdb_db->vacuum_handle);
944
945         return 0;
946 }