reduce the log level for three vacuuming related log messages
[metze/ctdb/wip.git] / server / ctdb_vacuum.c
1 /*
2    ctdb vacuuming events
3
4    Copyright (C) Ronnie Sahlberg  2009
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/events/events.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "system/network.h"
24 #include "system/filesys.h"
25 #include "system/dir.h"
26 #include "../include/ctdb_private.h"
27 #include "db_wrap.h"
28 #include "lib/util/dlinklist.h"
29 #include "lib/events/events.h"
30 #include "../include/ctdb_private.h"
31 #include "../common/rb_tree.h"
32
/* Timeout value handed to the synchronous ctdb controls issued in this file
 * (presumably "now + 10 seconds" - TODO confirm timeval_current_ofs semantics). */
33 #define TIMELIMIT() timeval_current_ofs(10, 0)
/* Base name of the vacuum tuning database; the full path is built as
 * "<db_directory>/<TUNINGDBNAME>.<pnn>". */
34 #define TUNINGDBNAME "vactune.tdb"
35
/* State/outcome of a vacuum child process as recorded in
 * ctdb_vacuum_child_context.status by the child handler and timeout handler. */
36 enum vacuum_child_status { VACUUM_RUNNING, VACUUM_OK, VACUUM_ERROR, VACUUM_TIMEOUT};
37
/* Bookkeeping kept for one running vacuum child process. */
38 struct ctdb_vacuum_child_context {
39         struct ctdb_vacuum_handle *vacuum_handle; /* handle of the database being vacuumed */
40         int fd[2]; /* fd[0] is read for the child's one-byte result; presumably a pipe pair - TODO confirm */
41         pid_t child_pid; /* pid to SIGKILL on destruction; -1 once the child has reported back */
42         enum vacuum_child_status status; /* last recorded outcome */
43         struct timeval start_time; /* used to log how long vacuuming took */
44 };
45
/* Per-database vacuum handle; it is the private data of the vacuum timer event. */
46 struct ctdb_vacuum_handle {
47         struct ctdb_db_context *ctdb_db; /* database this handle vacuums */
48         struct ctdb_vacuum_child_context *child_ctx; /* child bookkeeping; NOTE(review): assigned outside the visible code */
49 };
50
51
52 /* working state of one vacuum/repack run: the records to possibly delete plus counters */
53 struct vacuum_data {
54         uint32_t vacuum_limit; /* copied from tunable.vacuum_limit */
55         uint32_t repack_limit; /* copied from tunable.repack_limit */
56         struct ctdb_context *ctdb;
57         struct ctdb_db_context *ctdb_db;
58         struct tdb_context *dest_db; /* tdb that repack_traverse() stores records into */
59         trbt_tree_t *delete_tree; /* delete_record_data entries indexed by ctdb_hash(key) */
60         uint32_t delete_count; /* number of entries inserted into delete_tree */
61         struct ctdb_marshall_buffer **list; /* one marshall buffer per vnn_map slot */
62         struct timeval start; /* time the run started */
63         bool traverse_error; /* set by traverse callbacks to report a failure */
64         bool vacuum; /* when true repack_traverse() drops records found in delete_tree */
65         uint32_t total; /* records marshalled by vacuum_traverse() */
66         uint32_t vacuumed; /* records dropped while repacking */
67         uint32_t copied; /* records copied by the current repack pass */
68 };
69
70 /* tuning information stored for every db; written raw as the tuning tdb value, keyed by db name */
71 struct vacuum_tuning_data {
72         uint32_t last_num_repack; /* freelist size recorded when the record was created */
73         uint32_t last_num_empty; /* delete_count recorded when the record was created */
74         uint32_t last_interval; /* interval the last adjustment was based on */
75         uint32_t new_interval; /* interval read back by get_vacuum_interval() */
76         struct timeval last_start; /* start time of the last run */
77         double   last_duration; /* elapsed time of the last run, from timeval_elapsed() */
78 };
79
80 /* this structure contains the information for one record to be deleted; stored in vacuum_data.delete_tree */
81 struct delete_record_data {
82         struct ctdb_context *ctdb;
83         struct ctdb_db_context *ctdb_db;
84         struct ctdb_ltdb_header hdr; /* header snapshot taken during the vacuum traverse (rsn is re-checked on repack) */
85         TDB_DATA key; /* private copy of the record key, allocated under this structure */
86 };
87
/* Accumulator passed to delete_traverse(): the marshalled blob of records to delete. */
88 struct delete_records_list {
89         struct ctdb_marshall_buffer *records; /* grows by one marshalled record per tree entry */
90 };
91
/* Forward declaration: the timer callback is re-armed from vacuum_child_destructor(),
 * which is defined before the callback itself. */
92 static void ctdb_vacuum_event(struct event_context *ev, struct timed_event *te, 
93                                                           struct timeval t, void *private_data);
94
95
96 /*
97  * traverse function for gathering the records that can be deleted
98  */
99 static int vacuum_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private)
100 {
101         struct vacuum_data *vdata = talloc_get_type(private, struct vacuum_data);
102         struct ctdb_context *ctdb = vdata->ctdb;
103         struct ctdb_db_context *ctdb_db = vdata->ctdb_db;
104         uint32_t lmaster;
105         struct ctdb_ltdb_header *hdr;
106         struct ctdb_rec_data *rec;
107         size_t old_size;
108                
109         lmaster = ctdb_lmaster(ctdb, &key);
110         if (lmaster >= ctdb->vnn_map->size) {
111                 return 0;
112         }
113
114         if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
115                 /* its not a deleted record */
116                 return 0;
117         }
118
119         hdr = (struct ctdb_ltdb_header *)data.dptr;
120
121         if (hdr->dmaster != ctdb->pnn) {
122                 return 0;
123         }
124
125         /* is this a records we could possibly delete? I.e.
126            if the record is empty and also we are both lmaster
127            and dmaster for the record we should be able to delete it
128         */
129         if (lmaster == ctdb->pnn) {
130                 uint32_t hash;
131
132                 hash = ctdb_hash(&key);
133                 if (trbt_lookup32(vdata->delete_tree, hash)) {
134                         DEBUG(DEBUG_DEBUG, (__location__ " Hash collission when vacuuming, skipping this record.\n"));
135                 } 
136                 else {
137                         struct delete_record_data *dd;
138
139                         /* store key and header indexed by the key hash */
140                         dd = talloc_zero(vdata->delete_tree, struct delete_record_data);
141                         if (dd == NULL) {
142                                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
143                                 return -1;
144                         }
145                         dd->ctdb      = ctdb;
146                         dd->ctdb_db   = ctdb_db;
147                         dd->key.dsize = key.dsize;
148                         dd->key.dptr  = talloc_memdup(dd, key.dptr, key.dsize);
149                         if (dd->key.dptr == NULL) {
150                                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
151                                 return -1;
152                         }
153
154                         dd->hdr = *hdr;
155         
156                         trbt_insert32(vdata->delete_tree, hash, dd);
157
158                         vdata->delete_count++;
159                 }
160         }
161
162         /* add the record to the blob ready to send to the nodes */
163         rec = ctdb_marshall_record(vdata->list[lmaster], ctdb->pnn, key, NULL, tdb_null);
164         if (rec == NULL) {
165                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
166                 vdata->traverse_error = true;
167                 return -1;
168         }
169         old_size = talloc_get_size(vdata->list[lmaster]);
170         vdata->list[lmaster] = talloc_realloc_size(NULL, vdata->list[lmaster], 
171                                                    old_size + rec->length);
172         if (vdata->list[lmaster] == NULL) {
173                 DEBUG(DEBUG_ERR,(__location__ " Failed to expand\n"));
174                 vdata->traverse_error = true;
175                 return -1;
176         }
177         vdata->list[lmaster]->count++;
178         memcpy(old_size+(uint8_t *)vdata->list[lmaster], rec, rec->length);
179         talloc_free(rec);
180
181         vdata->total++;
182
183         return 0;
184 }
185
186 /*
187  * traverse the tree of records to delete and marshall them into
188  * a blob
189  */
190 static void delete_traverse(void *param, void *data)
191 {
192         struct delete_record_data *dd = talloc_get_type(data, struct delete_record_data);
193         struct delete_records_list *recs = talloc_get_type(param, struct delete_records_list);
194         struct ctdb_rec_data *rec;
195         size_t old_size;
196
197         rec = ctdb_marshall_record(dd, recs->records->db_id, dd->key, &dd->hdr, tdb_null);
198         if (rec == NULL) {
199                 DEBUG(DEBUG_ERR, (__location__ " failed to marshall record\n"));
200                 return;
201         }
202
203         old_size = talloc_get_size(recs->records);
204         recs->records = talloc_realloc_size(NULL, recs->records, old_size + rec->length);
205         if (recs->records == NULL) {
206                 DEBUG(DEBUG_ERR,(__location__ " Failed to expand\n"));
207                 return;
208         }
209         recs->records->count++;
210         memcpy(old_size+(uint8_t *)(recs->records), rec, rec->length);
211 }
212
213 /* 
214  * read-only traverse the database in order to find
215  * records that can be deleted and try to delete these
216  * records on the other nodes
217  * this executes in the child context
218  */
219 static int ctdb_vacuum_db(struct ctdb_db_context *ctdb_db, struct vacuum_data *vdata)
220 {
221         struct ctdb_context *ctdb = ctdb_db->ctdb;
222         const char *name = ctdb_db->db_name;
223         int ret, i, pnn;
224
225         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &ctdb->vnn_map);
226         if (ret != 0) {
227                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from local node\n"));
228                 return ret;
229         }
230
231         pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
232         if (pnn == -1) {
233                 DEBUG(DEBUG_ERR, ("Unable to get pnn from local node\n"));
234                 return -1;
235         }
236
237         ctdb->pnn = pnn;
238         /* the list needs to be of length num_nodes */
239         vdata->list = talloc_array(vdata, struct ctdb_marshall_buffer *, ctdb->vnn_map->size);
240         if (vdata->list == NULL) {
241                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
242                 return -1;
243         }
244         for (i = 0; i < ctdb->vnn_map->size; i++) {
245                 vdata->list[i] = (struct ctdb_marshall_buffer *)
246                         talloc_zero_size(vdata->list, 
247                                                          offsetof(struct ctdb_marshall_buffer, data));
248                 if (vdata->list[i] == NULL) {
249                         DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
250                         return -1;
251                 }
252                 vdata->list[i]->db_id = ctdb_db->db_id;
253         }
254
255         /* read-only traverse, looking for records that might be able to be vacuumed */
256         if (tdb_traverse_read(ctdb_db->ltdb->tdb, vacuum_traverse, vdata) == -1 ||
257             vdata->traverse_error) {
258                 DEBUG(DEBUG_ERR,(__location__ " Traverse error in vacuuming '%s'\n", name));
259                 return -1;              
260         }
261
262         for ( i = 0; i < ctdb->vnn_map->size; i++) {
263                 if (vdata->list[i]->count == 0) {
264                         continue;
265                 }
266
267                 /* for records where we are not the lmaster, tell the lmaster to fetch the record */
268                 if (ctdb->vnn_map->map[i] != ctdb->pnn) {
269                         TDB_DATA data;
270                         DEBUG(DEBUG_NOTICE,("Found %u records for lmaster %u in '%s'\n", 
271                                                                 vdata->list[i]->count, i, name));
272
273                         data.dsize = talloc_get_size(vdata->list[i]);
274                         data.dptr  = (void *)vdata->list[i];
275                         if (ctdb_send_message(ctdb, ctdb->vnn_map->map[i], CTDB_SRVID_VACUUM_FETCH, data) != 0) {
276                                 DEBUG(DEBUG_ERR,(__location__ " Failed to send vacuum fetch message to %u\n",
277                                          ctdb->vnn_map->map[i]));
278                                 return -1;              
279                         }
280                         continue;
281                 }
282         }       
283
284         /* Process all records we can delete (if any) */
285         if (vdata->delete_count > 0) {
286                 struct delete_records_list *recs;
287                 TDB_DATA indata, outdata;
288                 int32_t res;
289
290                 recs = talloc_zero(vdata, struct delete_records_list);
291                 if (recs == NULL) {
292                         DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
293                         return -1;
294                 }
295                 recs->records = (struct ctdb_marshall_buffer *)
296                         talloc_zero_size(vdata, 
297                                     offsetof(struct ctdb_marshall_buffer, data));
298                 if (recs->records == NULL) {
299                         DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
300                         return -1;
301                 }
302                 recs->records->db_id = ctdb_db->db_id;
303
304                 /* 
305                  * traverse the tree of all records we want to delete and
306                  * create a blob we can send to the other nodes.
307                  */
308                 trbt_traversearray32(vdata->delete_tree, 1, delete_traverse, recs);
309
310                 indata.dsize = talloc_get_size(recs->records);
311                 indata.dptr  = (void *)recs->records;
312
313                 /* 
314                  * now tell all the other nodes to delete all these records
315                  * (if possible)
316                  */
317                 for (i = 0; i < ctdb->vnn_map->size; i++) {
318                         struct ctdb_marshall_buffer *records;
319                         struct ctdb_rec_data *rec;
320
321                         if (ctdb->vnn_map->map[i] == ctdb->pnn) {
322                                 /* we dont delete the records on the local node just yet */
323                                 continue;
324                         }
325
326                         ret = ctdb_control(ctdb, ctdb->vnn_map->map[i], 0,
327                                         CTDB_CONTROL_TRY_DELETE_RECORDS, 0,
328                                         indata, recs, &outdata, &res,
329                                         NULL, NULL);
330                         if (ret != 0 || res != 0) {
331                                 DEBUG(DEBUG_ERR,("Failed to delete records on node %u\n", ctdb->vnn_map->map[i]));
332                                 return -1;
333                         }
334
335                         /* 
336                          * outdata countains the list of records coming back
337                          * from the node which the node could not delete
338                          */
339                         records = (struct ctdb_marshall_buffer *)outdata.dptr;
340                         rec = (struct ctdb_rec_data *)&records->data[0];
341                         while (records->count-- > 1) {
342                                 TDB_DATA reckey, recdata;
343                                 struct ctdb_ltdb_header *rechdr;
344
345                                 reckey.dptr = &rec->data[0];
346                                 reckey.dsize = rec->keylen;
347                                 recdata.dptr = &rec->data[reckey.dsize];
348                                 recdata.dsize = rec->datalen;
349
350                                 if (recdata.dsize < sizeof(struct ctdb_ltdb_header)) {
351                                         DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
352                                         return -1;
353                                 }
354                                 rechdr = (struct ctdb_ltdb_header *)recdata.dptr;
355                                 recdata.dptr += sizeof(*rechdr);
356                                 recdata.dsize -= sizeof(*rechdr);
357
358                                 /* 
359                                  * that other node couldnt delete the record
360                                  * so we should delete it and thereby remove it from the tree
361                                  */
362                                 talloc_free(trbt_lookup32(vdata->delete_tree, ctdb_hash(&reckey)));
363
364                                 rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec);
365                         }           
366                 }
367
368                 /* 
369                  * The only records remaining in the tree would be those
370                  * records where all other nodes could successfully
371                  * delete them, so we can safely delete them on the
372                  * lmaster as well. Deletion implictely happens while
373                  * we repack the database. The repack algorithm revisits 
374                  * the tree in order to find the records that don't need
375                  * to be copied / repacked.
376                  */
377         }
378
379         /* this ensures we run our event queue */
380         ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
381
382         return 0;
383 }
384
385
386 /*
387  * traverse function for repacking
388  */
389 static int repack_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private)
390 {
391         struct vacuum_data *vdata = (struct vacuum_data *)private;
392
393         if (vdata->vacuum) {
394                 uint32_t hash = ctdb_hash(&key);
395                 struct delete_record_data *kd;
396                 /*
397                  * check if we can ignore this record because it's in the delete_tree
398                  */
399                 kd = (struct delete_record_data *)trbt_lookup32(vdata->delete_tree, hash);
400                 /*
401                  * there might be hash collisions so we have to compare the keys here to be sure
402                  */
403                 if (kd && kd->key.dsize == key.dsize && memcmp(kd->key.dptr, key.dptr, key.dsize) == 0) {
404                         struct ctdb_ltdb_header *hdr = (struct ctdb_ltdb_header *)data.dptr;
405                         /*
406                          * we have to check if the record hasn't changed in the meantime in order to
407                          * savely remove it from the database
408                          */
409                         if (data.dsize == sizeof(struct ctdb_ltdb_header) &&
410                                 hdr->dmaster == kd->ctdb->pnn &&
411                                 ctdb_lmaster(kd->ctdb, &(kd->key)) == kd->ctdb->pnn &&
412                                 kd->hdr.rsn == hdr->rsn) {
413                                 vdata->vacuumed++;
414                                 return 0;
415                         }
416                 }
417         }
418         if (tdb_store(vdata->dest_db, key, data, TDB_INSERT) != 0) {
419                 vdata->traverse_error = true;
420                 return -1;
421         }
422         vdata->copied++;
423         return 0;
424 }
425
426 /*
427  * repack a tdb
428  */
429 static int ctdb_repack_tdb(struct tdb_context *tdb, TALLOC_CTX *mem_ctx, struct vacuum_data *vdata)
430 {
431         struct tdb_context *tmp_db;
432
433         if (tdb_transaction_start(tdb) != 0) {
434                 DEBUG(DEBUG_ERR,(__location__ " Failed to start transaction\n"));
435                 return -1;
436         }
437
438         tmp_db = tdb_open("tmpdb", tdb_hash_size(tdb), TDB_INTERNAL, O_RDWR|O_CREAT, 0);
439         if (tmp_db == NULL) {
440                 DEBUG(DEBUG_ERR,(__location__ " Failed to create tmp_db\n"));
441                 tdb_transaction_cancel(tdb);
442                 return -1;
443         }
444
445         vdata->traverse_error = false;
446         vdata->dest_db = tmp_db;
447         vdata->vacuum = true;
448         vdata->vacuumed = 0;
449         vdata->copied = 0;
450
451         /*
452          * repack and vacuum on-the-fly by not writing the records that are
453          * no longer needed
454          */
455         if (tdb_traverse_read(tdb, repack_traverse, vdata) == -1) {
456                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse copying out\n"));
457                 tdb_transaction_cancel(tdb);
458                 tdb_close(tmp_db);
459                 return -1;              
460         }
461
462         DEBUG(DEBUG_NOTICE,(__location__ " %u records vacuumed\n", vdata->vacuumed));
463         
464         if (vdata->traverse_error) {
465                 DEBUG(DEBUG_ERR,(__location__ " Error during traversal\n"));
466                 tdb_transaction_cancel(tdb);
467                 tdb_close(tmp_db);
468                 return -1;
469         }
470
471         if (tdb_wipe_all(tdb) != 0) {
472                 DEBUG(DEBUG_ERR,(__location__ " Failed to wipe database\n"));
473                 tdb_transaction_cancel(tdb);
474                 tdb_close(tmp_db);
475                 return -1;
476         }
477
478         vdata->traverse_error = false;
479         vdata->dest_db = tdb;
480         vdata->vacuum = false;
481         vdata->copied = 0;
482
483         if (tdb_traverse_read(tmp_db, repack_traverse, vdata) == -1) {
484                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse copying back\n"));
485                 tdb_transaction_cancel(tdb);
486                 tdb_close(tmp_db);
487                 return -1;              
488         }
489
490         if (vdata->traverse_error) {
491                 DEBUG(DEBUG_ERR,(__location__ " Error during second traversal\n"));
492                 tdb_transaction_cancel(tdb);
493                 tdb_close(tmp_db);
494                 return -1;
495         }
496
497         tdb_close(tmp_db);
498
499
500         if (tdb_transaction_commit(tdb) != 0) {
501                 DEBUG(DEBUG_ERR,(__location__ " Failed to commit\n"));
502                 return -1;
503         }
504         DEBUG(DEBUG_NOTICE,(__location__ " %u records copied\n", vdata->copied));
505
506         return 0;
507 }
508
509 static int update_tuning_db(struct ctdb_db_context *ctdb_db, struct vacuum_data *vdata, uint32_t freelist)
510 {
511         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
512         TDB_CONTEXT *tune_tdb;
513         TDB_DATA key, value;
514         struct vacuum_tuning_data tdata;
515         struct vacuum_tuning_data *tptr;
516         char *vac_dbname;
517
518         vac_dbname = talloc_asprintf(tmp_ctx, "%s/%s.%u",
519                                         ctdb_db->ctdb->db_directory, 
520                                         TUNINGDBNAME, ctdb_db->ctdb->pnn);
521         if (vac_dbname == NULL) {
522                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory error while allocating '%s'\n", vac_dbname));
523                 talloc_free(tmp_ctx);
524                 return -1;
525         }
526
527         tune_tdb = tdb_open(vac_dbname, 0, 0, O_RDWR|O_CREAT, 0644);
528         if (tune_tdb == NULL) {
529                 DEBUG(DEBUG_ERR,(__location__ " Failed to create/open %s\n", TUNINGDBNAME));
530                 talloc_free(tmp_ctx);
531                 return -1;
532         }
533         
534         if (tdb_transaction_start(tune_tdb) != 0) {
535                 DEBUG(DEBUG_ERR,(__location__ " Failed to start transaction\n"));
536                 tdb_close(tune_tdb);
537                 return -1;
538         }
539         key.dptr = discard_const(ctdb_db->db_name);
540         key.dsize = strlen(ctdb_db->db_name);
541         value = tdb_fetch(tune_tdb, key);
542
543         if (value.dptr != NULL && value.dsize == sizeof(struct vacuum_tuning_data)) {
544                 tptr = (struct vacuum_tuning_data *)value.dptr;
545                 tdata = *tptr;
546
547                 /*
548                  * re-calc new vacuum interval:
549                  * in case no limit was reached we continously increase the interval
550                  * until vacuum_max_interval is reached
551                  * in case a limit was reached we divide the current interval by 2
552                  * unless vacuum_min_interval is reached
553                  */
554                 if (freelist < vdata->repack_limit &&
555                     vdata->delete_count < vdata->vacuum_limit) {
556                         if (tdata.last_interval < ctdb_db->ctdb->tunable.vacuum_max_interval) {
557                                 tdata.new_interval = tdata.last_interval * 110 / 100;
558                                 DEBUG(DEBUG_NOTICE,("Increasing vacuum interval %u -> %u for %s\n", 
559                                         tdata.last_interval, tdata.new_interval, ctdb_db->db_name));
560                         }
561                 } else {
562                         tdata.new_interval = tdata.last_interval / 2;
563                         if (tdata.new_interval < ctdb_db->ctdb->tunable.vacuum_min_interval ||
564                                 tdata.new_interval > ctdb_db->ctdb->tunable.vacuum_max_interval) {
565                                 tdata.new_interval = ctdb_db->ctdb->tunable.vacuum_min_interval;
566                         }               
567                         DEBUG(DEBUG_ERR,("Decreasing vacuum interval %u -> %u for %s\n", 
568                                          tdata.last_interval, tdata.new_interval, ctdb_db->db_name));
569                 }
570                 tdata.last_interval = tdata.new_interval;
571         } else {
572                 DEBUG(DEBUG_ERR,(__location__ " Cannot find tunedb record for %s. Using default interval\n", ctdb_db->db_name));
573                 tdata.last_num_repack = freelist;
574                 tdata.last_num_empty = vdata->delete_count;
575                 tdata.last_interval = ctdb_db->ctdb->tunable.vacuum_default_interval;
576         }
577
578         if (value.dptr != NULL) {
579                 free(value.dptr);
580         }
581
582         tdata.last_start = vdata->start;
583         tdata.last_duration = timeval_elapsed(&vdata->start);
584
585         value.dptr = (unsigned char *)&tdata;
586         value.dsize = sizeof(tdata);
587
588         if (tdb_store(tune_tdb, key, value, 0) != 0) {
589                 DEBUG(DEBUG_ERR,(__location__ " Unable to store tundb record for %s\n", ctdb_db->db_name));
590                 tdb_transaction_cancel(tune_tdb);
591                 tdb_close(tune_tdb);
592                 talloc_free(tmp_ctx);
593                 return -1;
594         }
595         tdb_transaction_commit(tune_tdb);
596         tdb_close(tune_tdb);
597         talloc_free(tmp_ctx);
598
599         return 0;
600 }
601
602 /*
603  * repack and vaccum a db
604  * called from the child context
605  */
606 static int ctdb_repack_db(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx)
607 {
608         uint32_t repack_limit = ctdb_db->ctdb->tunable.repack_limit;
609         uint32_t vacuum_limit = ctdb_db->ctdb->tunable.vacuum_limit;
610         const char *name = ctdb_db->db_name;
611         int size;
612         struct vacuum_data *vdata;
613
614         size = tdb_freelist_size(ctdb_db->ltdb->tdb);
615         if (size == -1) {
616                 DEBUG(DEBUG_ERR,(__location__ " Failed to get freelist size for '%s'\n", name));
617                 return -1;
618         }
619
620         vdata = talloc_zero(mem_ctx, struct vacuum_data);
621         if (vdata == NULL) {
622                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
623                 return -1;
624         }
625
626         vdata->ctdb = ctdb_db->ctdb;
627         vdata->vacuum_limit = vacuum_limit;
628         vdata->repack_limit = repack_limit;
629         vdata->delete_tree = trbt_create(vdata, 0);
630         if (vdata->delete_tree == NULL) {
631                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
632                 talloc_free(vdata);
633                 return -1;
634         }
635
636         vdata->start = timeval_current();
637  
638         /*
639          * gather all records that can be deleted in vdata
640          */
641         if (ctdb_vacuum_db(ctdb_db, vdata) != 0) {
642                 DEBUG(DEBUG_ERR,(__location__ " Failed to vacuum '%s'\n", name));
643         }
644
645         /*
646          * decide if a repack is necessary
647          */
648         if (size < repack_limit && vdata->delete_count < vacuum_limit) {
649                 update_tuning_db(ctdb_db, vdata, size);
650                 talloc_free(vdata);
651                 return 0;
652         }
653
654         DEBUG(DEBUG_NOTICE,("Repacking %s with %u freelist entries and %u records to delete\n", 
655                         name, size, vdata->delete_count));
656
657         /*
658          * repack and implicitely get rid of the records we can delete
659          */
660         if (ctdb_repack_tdb(ctdb_db->ltdb->tdb, mem_ctx, vdata) != 0) {
661                 DEBUG(DEBUG_ERR,(__location__ " Failed to repack '%s'\n", name));
662                 update_tuning_db(ctdb_db, vdata, size);
663                 talloc_free(vdata);
664                 return -1;
665         }
666         update_tuning_db(ctdb_db, vdata, size);
667         talloc_free(vdata);
668
669         return 0;
670 }
671
672 static int get_vacuum_interval(struct ctdb_db_context *ctdb_db)
673 {
674         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
675         TDB_CONTEXT *tdb;
676         TDB_DATA key, value;
677         char *vac_dbname;
678         uint interval = ctdb_db->ctdb->tunable.vacuum_default_interval;
679         struct ctdb_context *ctdb = ctdb_db->ctdb;
680
681         vac_dbname = talloc_asprintf(tmp_ctx, "%s/%s.%u", ctdb->db_directory, TUNINGDBNAME, ctdb->pnn);
682         if (vac_dbname == NULL) {
683                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory error while allocating '%s'\n", vac_dbname));
684                 talloc_free(tmp_ctx);
685                 return interval;
686         }
687
688         tdb = tdb_open(vac_dbname, 0, 0, O_RDWR|O_CREAT, 0644);
689         if (!tdb) {
690                 DEBUG(DEBUG_ERR,("Unable to open/create database %s using default interval\n", vac_dbname));
691                 talloc_free(tmp_ctx);
692                 return interval;
693         }
694
695         key.dptr = discard_const(ctdb_db->db_name);
696         key.dsize = strlen(ctdb_db->db_name);
697
698         value = tdb_fetch(tdb, key);
699
700         if (value.dptr != NULL) {
701                 if (value.dsize == sizeof(struct vacuum_tuning_data)) {
702                         struct vacuum_tuning_data *tptr = (struct vacuum_tuning_data *)value.dptr;
703
704                         interval = tptr->new_interval;
705
706                         if (interval < ctdb->tunable.vacuum_min_interval) {
707                                 interval = ctdb->tunable.vacuum_min_interval;
708                         } 
709                         if (interval > ctdb->tunable.vacuum_max_interval) {
710                                 interval = ctdb->tunable.vacuum_max_interval;
711                         }
712                 }
713                 free(value.dptr);
714         }
715         tdb_close(tdb);
716
717         talloc_free(tmp_ctx);
718
719         return interval;
720 }
721
722 static int vacuum_child_destructor(struct ctdb_vacuum_child_context *child_ctx)
723 {
724         double l = timeval_elapsed(&child_ctx->start_time);
725         struct ctdb_db_context *ctdb_db = child_ctx->vacuum_handle->ctdb_db;
726         struct ctdb_context *ctdb = ctdb_db->ctdb;
727
728         DEBUG(DEBUG_INFO,("Vacuuming took %.3f seconds for database %s\n", l, ctdb_db->db_name));
729
730         if (child_ctx->child_pid != -1) {
731                 kill(child_ctx->child_pid, SIGKILL);
732         }
733
734         event_add_timed(ctdb->ev, child_ctx->vacuum_handle,
735                         timeval_current_ofs(get_vacuum_interval(ctdb_db), 0), 
736                         ctdb_vacuum_event, child_ctx->vacuum_handle);
737
738         return 0;
739 }
740
741 /*
742  * this event is generated when a vacuum child process times out
743  */
744 static void vacuum_child_timeout(struct event_context *ev, struct timed_event *te,
745                                          struct timeval t, void *private_data)
746 {
747         struct ctdb_vacuum_child_context *child_ctx = talloc_get_type(private_data, struct ctdb_vacuum_child_context);
748
749         DEBUG(DEBUG_ERR,("Vacuuming child process timed out for db %s\n", child_ctx->vacuum_handle->ctdb_db->db_name));
750
751         child_ctx->status = VACUUM_TIMEOUT;
752
753         talloc_free(child_ctx);
754 }
755
756
757 /*
758  * this event is generated when a vacuum child process has completed
759  */
760 static void vacuum_child_handler(struct event_context *ev, struct fd_event *fde,
761                              uint16_t flags, void *private_data)
762 {
763         struct ctdb_vacuum_child_context *child_ctx = talloc_get_type(private_data, struct ctdb_vacuum_child_context);
764         char c = 0;
765         int ret;
766
767         DEBUG(DEBUG_INFO,("Vacuuming child process %d finished for db %s\n", child_ctx->child_pid, child_ctx->vacuum_handle->ctdb_db->db_name));
768         child_ctx->child_pid = -1;
769
770         ret = read(child_ctx->fd[0], &c, 1);
771         if (ret != 1 || c != 0) {
772                 child_ctx->status = VACUUM_ERROR;
773                 DEBUG(DEBUG_ERR, ("A vacuum child process failed with an error for database %s. ret=%d c=%d\n", child_ctx->vacuum_handle->ctdb_db->db_name, ret, c));
774         } else {
775                 child_ctx->status = VACUUM_OK;
776         }
777
778         talloc_free(child_ctx);
779 }
780
781 /*
782  * this event is called every time we need to start a new vacuum process
783  */
784 static void
785 ctdb_vacuum_event(struct event_context *ev, struct timed_event *te,
786                                struct timeval t, void *private_data)
787 {
788         struct ctdb_vacuum_handle *vacuum_handle = talloc_get_type(private_data, struct ctdb_vacuum_handle);
789         struct ctdb_db_context *ctdb_db = vacuum_handle->ctdb_db;
790         struct ctdb_context *ctdb = ctdb_db->ctdb;
791         struct ctdb_vacuum_child_context *child_ctx;
792         int ret;
793
794         /* we dont vacuum if we are in recovery mode */
795         if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE) {
796                 event_add_timed(ctdb->ev, vacuum_handle, timeval_current_ofs(ctdb->tunable.vacuum_default_interval, 0), ctdb_vacuum_event, vacuum_handle);
797                 return;
798         }
799
800         child_ctx = talloc(vacuum_handle, struct ctdb_vacuum_child_context);
801         if (child_ctx == NULL) {
802                 DEBUG(DEBUG_CRIT, (__location__ " Failed to allocate child context for vacuuming of %s\n", ctdb_db->db_name));
803                 ctdb_fatal(ctdb, "Out of memory when crating vacuum child context. Shutting down\n");
804         }
805
806
807         ret = pipe(child_ctx->fd);
808         if (ret != 0) {
809                 talloc_free(child_ctx);
810                 DEBUG(DEBUG_ERR, ("Failed to create pipe for vacuum child process.\n"));
811                 event_add_timed(ctdb->ev, vacuum_handle, timeval_current_ofs(ctdb->tunable.vacuum_default_interval, 0), ctdb_vacuum_event, vacuum_handle);
812                 return;
813         }
814
815         child_ctx->child_pid = fork();
816         if (child_ctx->child_pid == (pid_t)-1) {
817                 close(child_ctx->fd[0]);
818                 close(child_ctx->fd[1]);
819                 talloc_free(child_ctx);
820                 DEBUG(DEBUG_ERR, ("Failed to fork vacuum child process.\n"));
821                 event_add_timed(ctdb->ev, vacuum_handle, timeval_current_ofs(ctdb->tunable.vacuum_default_interval, 0), ctdb_vacuum_event, vacuum_handle);
822                 return;
823         }
824
825
826         if (child_ctx->child_pid == 0) {
827                 char cc = 0;
828                 close(child_ctx->fd[0]);
829
830                 DEBUG(DEBUG_INFO,("Vacuuming child process %d for db %s started\n", getpid(), ctdb_db->db_name));
831         
832                 if (switch_from_server_to_client(ctdb) != 0) {
833                         DEBUG(DEBUG_CRIT, (__location__ "ERROR: failed to switch vacuum daemon into client mode. Shutting down.\n"));
834                         _exit(1);
835                 }
836
837                 /* 
838                  * repack the db
839                  */
840                 cc = ctdb_repack_db(ctdb_db, child_ctx);
841
842                 write(child_ctx->fd[1], &cc, 1);
843                 _exit(0);
844         }
845
846         set_close_on_exec(child_ctx->fd[0]);
847         close(child_ctx->fd[1]);
848
849         child_ctx->status = VACUUM_RUNNING;
850         child_ctx->start_time = timeval_current();
851
852         talloc_set_destructor(child_ctx, vacuum_child_destructor);
853
854         event_add_timed(ctdb->ev, child_ctx,
855                 timeval_current_ofs(ctdb->tunable.vacuum_max_run_time, 0),
856                 vacuum_child_timeout, child_ctx);
857
858         DEBUG(DEBUG_NOTICE, (__location__ " Created PIPE FD:%d to child vacuum process\n", child_ctx->fd[0]));
859
860         event_add_fd(ctdb->ev, child_ctx, child_ctx->fd[0],
861                 EVENT_FD_READ|EVENT_FD_AUTOCLOSE,
862                 vacuum_child_handler,
863                 child_ctx);
864
865         vacuum_handle->child_ctx = child_ctx;
866         child_ctx->vacuum_handle = vacuum_handle;
867 }
868
869
870 /* this function initializes the vacuuming context for a database
871  * starts the vacuuming events
872  */
873 int ctdb_vacuum_init(struct ctdb_db_context *ctdb_db)
874 {
875         if (ctdb_db->persistent != 0) {
876                 DEBUG(DEBUG_ERR,("Vacuuming is disabled for persistent database %s\n", ctdb_db->db_name));
877                 return 0;
878         }
879
880         ctdb_db->vacuum_handle = talloc(ctdb_db, struct ctdb_vacuum_handle);
881         CTDB_NO_MEMORY(ctdb_db->ctdb, ctdb_db->vacuum_handle);
882
883         ctdb_db->vacuum_handle->ctdb_db = ctdb_db;
884
885         event_add_timed(ctdb_db->ctdb->ev, ctdb_db->vacuum_handle, 
886                         timeval_current_ofs(get_vacuum_interval(ctdb_db), 0), 
887                         ctdb_vacuum_event, ctdb_db->vacuum_handle);
888
889         return 0;
890 }