vacuum: correctly send TRY_DELETE_RECORDS ctrl to all active nodes
[ctdb.git] / server / ctdb_vacuum.c
1 /*
2    ctdb vacuuming events
3
4    Copyright (C) Ronnie Sahlberg  2009
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/tevent/tevent.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "system/network.h"
24 #include "system/filesys.h"
25 #include "system/dir.h"
26 #include "../include/ctdb_private.h"
27 #include "db_wrap.h"
28 #include "lib/util/dlinklist.h"
29 #include "lib/tevent/tevent.h"
30 #include "../include/ctdb_private.h"
31 #include "../common/rb_tree.h"
32
33 #define TIMELIMIT() timeval_current_ofs(10, 0) /* timeout handed to the ctdb_ctrl_* calls in this file; presumably "10 seconds from now" -- TODO confirm argument units */
34 #define TUNINGDBNAME "vactune.tdb" /* base file name of the tuning tdb; the users below append ".<pnn>" and prepend a directory */
35 /* state of a vacuum child, kept in ctdb_vacuum_child_context.status; vacuum_child_timeout() sets VACUUM_TIMEOUT */
36 enum vacuum_child_status { VACUUM_RUNNING, VACUUM_OK, VACUUM_ERROR, VACUUM_TIMEOUT};
37 /* bookkeeping for one vacuum child process; vacuum_child_destructor() unlinks it from ctdb->vacuumers */
38 struct ctdb_vacuum_child_context {
39         struct ctdb_vacuum_child_context *next, *prev; /* list linkage, presumably used by the DLIST_* macros */
40         struct ctdb_vacuum_handle *vacuum_handle; /* handle (and thereby database) this child works for */
41         /* fd child writes status to (NOTE(review): presumably a pipe pair -- creation is not visible here) */
42         int fd[2];
43         pid_t child_pid; /* the destructor sends SIGKILL to this pid unless it is -1 */
44         enum vacuum_child_status status;
45         struct timeval start_time; /* used by the destructor to log how long vacuuming took */
46 };
47 /* per-database vacuum handle; used as talloc context and private data when the ctdb_vacuum_event timer is re-armed */
48 struct ctdb_vacuum_handle {
49         struct ctdb_db_context *ctdb_db; /* database this handle vacuums */
50         struct ctdb_vacuum_child_context *child_ctx; /* NOTE(review): presumably the currently running child -- not assigned in the visible code */
51 };
52
53
54 /* working state of one vacuum/repack run: limits, the records to possibly delete, and counters */
55 struct vacuum_data {
56         uint32_t vacuum_limit; /* copy of tunable.vacuum_limit, compared against delete_count */
57         uint32_t repack_limit; /* copy of tunable.repack_limit, compared against the freelist size */
58         struct ctdb_context *ctdb;
59         struct ctdb_db_context *ctdb_db;
60         struct tdb_context *dest_db; /* tdb that repack_traverse() stores records into */
61         trbt_tree_t *delete_tree; /* delete candidates (struct delete_record_data), indexed by ctdb_hash(key) */
62         uint32_t delete_count; /* number of candidates inserted into delete_tree */
63         struct ctdb_marshall_buffer **list; /* one marshall buffer per node (num_nodes entries), indexed by lmaster */
64         struct timeval start; /* start of the run; written to the tuning db */
65         bool traverse_error; /* set by the traverse callbacks when they abort */
66         bool vacuum; /* true: repack_traverse() drops records found in delete_tree */
67         uint32_t total; /* records appended to list[] by vacuum_traverse() */
68         uint32_t vacuumed; /* records dropped by repack_traverse() */
69         uint32_t copied; /* records stored into dest_db by repack_traverse() */
70 };
71
72 /* tuning information stored for every db: written verbatim as the tdb value keyed by the db name, so the layout is an on-disk format */
73 struct vacuum_tuning_data {
74         uint32_t last_num_repack; /* freelist size recorded when the record is first created */
75         uint32_t last_num_empty; /* delete_count recorded when the record is first created */
76         uint32_t last_interval; /* interval the current run was based on */
77         uint32_t new_interval; /* interval get_vacuum_interval() reads (and clamps) for the next run */
78         struct timeval last_start; /* copy of vacuum_data.start */
79         double   last_duration; /* timeval_elapsed() since last_start (presumably seconds) */
80 };
81
82 /* this structure contains the information for one record to be deleted; instances live in vacuum_data.delete_tree */
83 struct delete_record_data {
84         struct ctdb_context *ctdb;
85         struct ctdb_db_context *ctdb_db;
86         struct ctdb_ltdb_header hdr; /* header as seen by vacuum_traverse(); repack_traverse() compares its rsn */
87         TDB_DATA key; /* talloc copy of the record key, owned by this structure */
88 };
89 /* parameter block for delete_traverse(); the buffer is held by pointer because delete_traverse() reallocates it */
90 struct delete_records_list {
91         struct ctdb_marshall_buffer *records; /* marshalled delete candidates sent with TRY_DELETE_RECORDS */
92 };
93 /* forward declaration: vacuum_child_destructor() re-arms this timer callback before its definition */
94 static void ctdb_vacuum_event(struct event_context *ev, struct timed_event *te, 
95                                                           struct timeval t, void *private_data);
96
97
98 /*
99  * traverse function for gathering the records that can be deleted
100  */
101 static int vacuum_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private)
102 {
103         struct vacuum_data *vdata = talloc_get_type(private, struct vacuum_data);
104         struct ctdb_context *ctdb = vdata->ctdb;
105         struct ctdb_db_context *ctdb_db = vdata->ctdb_db;
106         uint32_t lmaster;
107         struct ctdb_ltdb_header *hdr;
108         struct ctdb_rec_data *rec;
109         size_t old_size;
110                
111         lmaster = ctdb_lmaster(ctdb, &key);
112         if (lmaster >= ctdb->vnn_map->size) {
113                 return 0;
114         }
115
116         if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
117                 /* its not a deleted record */
118                 return 0;
119         }
120
121         hdr = (struct ctdb_ltdb_header *)data.dptr;
122
123         if (hdr->dmaster != ctdb->pnn) {
124                 return 0;
125         }
126
127         /* Is this a record we could possibly delete? I.e.
128            if the record is empty and also we are both lmaster
129            and dmaster for the record we should be able to delete it
130         */
131         if (lmaster == ctdb->pnn) {
132                 uint32_t hash;
133
134                 hash = ctdb_hash(&key);
135                 if (trbt_lookup32(vdata->delete_tree, hash)) {
136                         DEBUG(DEBUG_DEBUG, (__location__ " Hash collission when vacuuming, skipping this record.\n"));
137                 } 
138                 else {
139                         struct delete_record_data *dd;
140
141                         /* store key and header indexed by the key hash */
142                         dd = talloc_zero(vdata->delete_tree, struct delete_record_data);
143                         if (dd == NULL) {
144                                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
145                                 return -1;
146                         }
147                         dd->ctdb      = ctdb;
148                         dd->ctdb_db   = ctdb_db;
149                         dd->key.dsize = key.dsize;
150                         dd->key.dptr  = talloc_memdup(dd, key.dptr, key.dsize);
151                         if (dd->key.dptr == NULL) {
152                                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
153                                 return -1;
154                         }
155
156                         dd->hdr = *hdr;
157         
158                         trbt_insert32(vdata->delete_tree, hash, dd);
159
160                         vdata->delete_count++;
161                 }
162         }
163
164         /* add the record to the blob ready to send to the nodes */
165         rec = ctdb_marshall_record(vdata->list[lmaster], ctdb->pnn, key, NULL, tdb_null);
166         if (rec == NULL) {
167                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
168                 vdata->traverse_error = true;
169                 return -1;
170         }
171         old_size = talloc_get_size(vdata->list[lmaster]);
172         vdata->list[lmaster] = talloc_realloc_size(NULL, vdata->list[lmaster], 
173                                                    old_size + rec->length);
174         if (vdata->list[lmaster] == NULL) {
175                 DEBUG(DEBUG_ERR,(__location__ " Failed to expand\n"));
176                 vdata->traverse_error = true;
177                 return -1;
178         }
179         vdata->list[lmaster]->count++;
180         memcpy(old_size+(uint8_t *)vdata->list[lmaster], rec, rec->length);
181         talloc_free(rec);
182
183         vdata->total++;
184
185         return 0;
186 }
187
188 /*
189  * traverse the tree of records to delete and marshall them into
190  * a blob
191  */
192 static void delete_traverse(void *param, void *data)
193 {
194         struct delete_record_data *dd = talloc_get_type(data, struct delete_record_data);
195         struct delete_records_list *recs = talloc_get_type(param, struct delete_records_list);
196         struct ctdb_rec_data *rec;
197         size_t old_size;
198
199         rec = ctdb_marshall_record(dd, recs->records->db_id, dd->key, &dd->hdr, tdb_null);
200         if (rec == NULL) {
201                 DEBUG(DEBUG_ERR, (__location__ " failed to marshall record\n"));
202                 return;
203         }
204
205         old_size = talloc_get_size(recs->records);
206         recs->records = talloc_realloc_size(NULL, recs->records, old_size + rec->length);
207         if (recs->records == NULL) {
208                 DEBUG(DEBUG_ERR,(__location__ " Failed to expand\n"));
209                 return;
210         }
211         recs->records->count++;
212         memcpy(old_size+(uint8_t *)(recs->records), rec, rec->length);
213 }
214
215 /* 
216  * read-only traverse the database in order to find
217  * records that can be deleted and try to delete these
218  * records on the other nodes
219  * this executes in the child context
220  */
221 static int ctdb_vacuum_db(struct ctdb_db_context *ctdb_db, struct vacuum_data *vdata)
222 {
223         struct ctdb_context *ctdb = ctdb_db->ctdb;
224         const char *name = ctdb_db->db_name;
225         int ret, i, pnn;
226
227         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &ctdb->vnn_map);
228         if (ret != 0) {
229                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from local node\n"));
230                 return ret;
231         }
232
233         pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
234         if (pnn == -1) {
235                 DEBUG(DEBUG_ERR, ("Unable to get pnn from local node\n"));
236                 return -1;
237         }
238
239         ctdb->pnn = pnn;
240         /* the list needs to be of length num_nodes */
241         vdata->list = talloc_array(vdata, struct ctdb_marshall_buffer *, ctdb->num_nodes);
242         if (vdata->list == NULL) {
243                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
244                 return -1;
245         }
246         for (i = 0; i < ctdb->num_nodes; i++) {
247                 vdata->list[i] = (struct ctdb_marshall_buffer *)
248                         talloc_zero_size(vdata->list, 
249                                                          offsetof(struct ctdb_marshall_buffer, data));
250                 if (vdata->list[i] == NULL) {
251                         DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
252                         return -1;
253                 }
254                 vdata->list[i]->db_id = ctdb_db->db_id;
255         }
256
257         /* read-only traverse, looking for records that might be able to be vacuumed */
258         if (tdb_traverse_read(ctdb_db->ltdb->tdb, vacuum_traverse, vdata) == -1 ||
259             vdata->traverse_error) {
260                 DEBUG(DEBUG_ERR,(__location__ " Traverse error in vacuuming '%s'\n", name));
261                 return -1;              
262         }
263
264         for (i = 0; i < ctdb->num_nodes; i++) {
265                 if (vdata->list[i]->count == 0) {
266                         continue;
267                 }
268
269                 /* for records where we are not the lmaster, tell the lmaster to fetch the record */
270                 if (ctdb->nodes[i]->pnn != ctdb->pnn) {
271                         TDB_DATA data;
272                         DEBUG(DEBUG_INFO,
273                               ("Found %u records for lmaster %u in '%s'\n",
274                                vdata->list[i]->count, ctdb->nodes[i]->pnn,
275                                name));
276
277                         data.dsize = talloc_get_size(vdata->list[i]);
278                         data.dptr  = (void *)vdata->list[i];
279                         if (ctdb_client_send_message(ctdb, ctdb->nodes[i]->pnn, CTDB_SRVID_VACUUM_FETCH, data) != 0) {
280                                 DEBUG(DEBUG_ERR,(__location__ " Failed to send vacuum fetch message to %u\n",
281                                          ctdb->nodes[i]->pnn));
282                                 return -1;              
283                         }
284                         continue;
285                 }
286         }       
287
288         /* Process all records we can delete (if any) */
289         if (vdata->delete_count > 0) {
290                 struct delete_records_list *recs;
291                 TDB_DATA indata, outdata;
292                 int32_t res;
293                 struct ctdb_node_map *nodemap;
294                 uint32_t *active_nodes;
295                 int num_active_nodes;
296
297                 recs = talloc_zero(vdata, struct delete_records_list);
298                 if (recs == NULL) {
299                         DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
300                         return -1;
301                 }
302                 recs->records = (struct ctdb_marshall_buffer *)
303                         talloc_zero_size(vdata, 
304                                     offsetof(struct ctdb_marshall_buffer, data));
305                 if (recs->records == NULL) {
306                         DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
307                         return -1;
308                 }
309                 recs->records->db_id = ctdb_db->db_id;
310
311                 /* 
312                  * traverse the tree of all records we want to delete and
313                  * create a blob we can send to the other nodes.
314                  */
315                 trbt_traversearray32(vdata->delete_tree, 1, delete_traverse, recs);
316
317                 indata.dsize = talloc_get_size(recs->records);
318                 indata.dptr  = (void *)recs->records;
319
320                 /* 
321                  * now tell all the active nodes to delete all these records
322                  * (if possible)
323                  */
324
325                 ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(),
326                                            CTDB_CURRENT_NODE,
327                                            recs, /* talloc context */
328                                            &nodemap);
329                 if (ret != 0) {
330                         DEBUG(DEBUG_ERR,(__location__ " unable to get node map\n"));
331                         return -1;
332                 }
333
334                 active_nodes = list_of_active_nodes(ctdb, nodemap,
335                                                     nodemap, /* talloc context */
336                                                     false /* include self */);
337                 /* yuck! ;-) */
338                 num_active_nodes = talloc_get_size(active_nodes)/sizeof(*active_nodes);
339
340                 for (i = 0; i < num_active_nodes; i++) {
341                         struct ctdb_marshall_buffer *records;
342                         struct ctdb_rec_data *rec;
343
344                         ret = ctdb_control(ctdb, active_nodes[i], 0,
345                                         CTDB_CONTROL_TRY_DELETE_RECORDS, 0,
346                                         indata, recs, &outdata, &res,
347                                         NULL, NULL);
348                         if (ret != 0 || res != 0) {
349                                 DEBUG(DEBUG_ERR, ("Failed to delete records on "
350                                                   "node %u: ret[%d] res[%d]\n",
351                                                   active_nodes[i], ret, res));
352                                 return -1;
353                         }
354
355                         /* 
356                          * outdata countains the list of records coming back
357                          * from the node which the node could not delete
358                          */
359                         records = (struct ctdb_marshall_buffer *)outdata.dptr;
360                         rec = (struct ctdb_rec_data *)&records->data[0];
361                         while (records->count-- > 1) {
362                                 TDB_DATA reckey, recdata;
363                                 struct ctdb_ltdb_header *rechdr;
364
365                                 reckey.dptr = &rec->data[0];
366                                 reckey.dsize = rec->keylen;
367                                 recdata.dptr = &rec->data[reckey.dsize];
368                                 recdata.dsize = rec->datalen;
369
370                                 if (recdata.dsize < sizeof(struct ctdb_ltdb_header)) {
371                                         DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
372                                         return -1;
373                                 }
374                                 rechdr = (struct ctdb_ltdb_header *)recdata.dptr;
375                                 recdata.dptr += sizeof(*rechdr);
376                                 recdata.dsize -= sizeof(*rechdr);
377
378                                 /* 
379                                  * that other node couldnt delete the record
380                                  * so we should delete it and thereby remove it from the tree
381                                  */
382                                 talloc_free(trbt_lookup32(vdata->delete_tree, ctdb_hash(&reckey)));
383
384                                 rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec);
385                         }           
386                 }
387
388                 /* free nodemap and active_nodes */
389                 talloc_free(nodemap);
390
391                 /* 
392                  * The only records remaining in the tree would be those
393                  * records where all other nodes could successfully
394                  * delete them, so we can safely delete them on the
395                  * lmaster as well. Deletion implictely happens while
396                  * we repack the database. The repack algorithm revisits 
397                  * the tree in order to find the records that don't need
398                  * to be copied / repacked.
399                  */
400         }
401
402         /* this ensures we run our event queue */
403         ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
404
405         return 0;
406 }
407
408
409 /*
410  * traverse function for repacking
411  */
412 static int repack_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private)
413 {
414         struct vacuum_data *vdata = (struct vacuum_data *)private;
415
416         if (vdata->vacuum) {
417                 uint32_t hash = ctdb_hash(&key);
418                 struct delete_record_data *kd;
419                 /*
420                  * check if we can ignore this record because it's in the delete_tree
421                  */
422                 kd = (struct delete_record_data *)trbt_lookup32(vdata->delete_tree, hash);
423                 /*
424                  * there might be hash collisions so we have to compare the keys here to be sure
425                  */
426                 if (kd && kd->key.dsize == key.dsize && memcmp(kd->key.dptr, key.dptr, key.dsize) == 0) {
427                         struct ctdb_ltdb_header *hdr = (struct ctdb_ltdb_header *)data.dptr;
428                         /*
429                          * we have to check if the record hasn't changed in the meantime in order to
430                          * savely remove it from the database
431                          */
432                         if (data.dsize == sizeof(struct ctdb_ltdb_header) &&
433                                 hdr->dmaster == kd->ctdb->pnn &&
434                                 ctdb_lmaster(kd->ctdb, &(kd->key)) == kd->ctdb->pnn &&
435                                 kd->hdr.rsn == hdr->rsn) {
436                                 vdata->vacuumed++;
437                                 return 0;
438                         }
439                 }
440         }
441         if (tdb_store(vdata->dest_db, key, data, TDB_INSERT) != 0) {
442                 vdata->traverse_error = true;
443                 return -1;
444         }
445         vdata->copied++;
446         return 0;
447 }
448
449 /*
450  * repack a tdb
451  */
452 static int ctdb_repack_tdb(struct tdb_context *tdb, TALLOC_CTX *mem_ctx, struct vacuum_data *vdata)
453 {
454         struct tdb_context *tmp_db;
455
456         if (tdb_transaction_start(tdb) != 0) {
457                 DEBUG(DEBUG_ERR,(__location__ " Failed to start transaction\n"));
458                 return -1;
459         }
460
461         tmp_db = tdb_open("tmpdb", tdb_hash_size(tdb),
462                           TDB_INTERNAL|TDB_DISALLOW_NESTING,
463                           O_RDWR|O_CREAT, 0);
464         if (tmp_db == NULL) {
465                 DEBUG(DEBUG_ERR,(__location__ " Failed to create tmp_db\n"));
466                 tdb_transaction_cancel(tdb);
467                 return -1;
468         }
469
470         vdata->traverse_error = false;
471         vdata->dest_db = tmp_db;
472         vdata->vacuum = true;
473         vdata->vacuumed = 0;
474         vdata->copied = 0;
475
476         /*
477          * repack and vacuum on-the-fly by not writing the records that are
478          * no longer needed
479          */
480         if (tdb_traverse_read(tdb, repack_traverse, vdata) == -1) {
481                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse copying out\n"));
482                 tdb_transaction_cancel(tdb);
483                 tdb_close(tmp_db);
484                 return -1;              
485         }
486
487         DEBUG(DEBUG_INFO,(__location__ " %u records vacuumed\n", vdata->vacuumed));
488         
489         if (vdata->traverse_error) {
490                 DEBUG(DEBUG_ERR,(__location__ " Error during traversal\n"));
491                 tdb_transaction_cancel(tdb);
492                 tdb_close(tmp_db);
493                 return -1;
494         }
495
496         if (tdb_wipe_all(tdb) != 0) {
497                 DEBUG(DEBUG_ERR,(__location__ " Failed to wipe database\n"));
498                 tdb_transaction_cancel(tdb);
499                 tdb_close(tmp_db);
500                 return -1;
501         }
502
503         vdata->traverse_error = false;
504         vdata->dest_db = tdb;
505         vdata->vacuum = false;
506         vdata->copied = 0;
507
508         if (tdb_traverse_read(tmp_db, repack_traverse, vdata) == -1) {
509                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse copying back\n"));
510                 tdb_transaction_cancel(tdb);
511                 tdb_close(tmp_db);
512                 return -1;              
513         }
514
515         if (vdata->traverse_error) {
516                 DEBUG(DEBUG_ERR,(__location__ " Error during second traversal\n"));
517                 tdb_transaction_cancel(tdb);
518                 tdb_close(tmp_db);
519                 return -1;
520         }
521
522         tdb_close(tmp_db);
523
524
525         if (tdb_transaction_commit(tdb) != 0) {
526                 DEBUG(DEBUG_ERR,(__location__ " Failed to commit\n"));
527                 return -1;
528         }
529         DEBUG(DEBUG_INFO,(__location__ " %u records copied\n", vdata->copied));
530
531         return 0;
532 }
533
534 static int update_tuning_db(struct ctdb_db_context *ctdb_db, struct vacuum_data *vdata, uint32_t freelist)
535 {
536         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
537         TDB_CONTEXT *tune_tdb;
538         TDB_DATA key, value;
539         struct vacuum_tuning_data tdata;
540         struct vacuum_tuning_data *tptr;
541         char *vac_dbname;
542         int flags;
543
544         vac_dbname = talloc_asprintf(tmp_ctx, "%s/%s.%u",
545                                      ctdb_db->ctdb->db_directory_state,
546                                      TUNINGDBNAME, ctdb_db->ctdb->pnn);
547         if (vac_dbname == NULL) {
548                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory error while allocating '%s'\n", vac_dbname));
549                 talloc_free(tmp_ctx);
550                 return -1;
551         }
552
553         flags  = ctdb_db->ctdb->valgrinding ? TDB_NOMMAP : 0;
554         flags |= TDB_DISALLOW_NESTING;
555         tune_tdb = tdb_open(vac_dbname, 0,
556                             flags,
557                             O_RDWR|O_CREAT, 0600);
558         if (tune_tdb == NULL) {
559                 DEBUG(DEBUG_ERR,(__location__ " Failed to create/open %s\n", TUNINGDBNAME));
560                 talloc_free(tmp_ctx);
561                 return -1;
562         }
563         
564         if (tdb_transaction_start(tune_tdb) != 0) {
565                 DEBUG(DEBUG_ERR,(__location__ " Failed to start transaction\n"));
566                 tdb_close(tune_tdb);
567                 return -1;
568         }
569         key.dptr = discard_const(ctdb_db->db_name);
570         key.dsize = strlen(ctdb_db->db_name);
571         value = tdb_fetch(tune_tdb, key);
572
573         if (value.dptr != NULL && value.dsize == sizeof(struct vacuum_tuning_data)) {
574                 tptr = (struct vacuum_tuning_data *)value.dptr;
575                 tdata = *tptr;
576
577                 /*
578                  * re-calc new vacuum interval:
579                  * in case no limit was reached we continously increase the interval
580                  * until vacuum_max_interval is reached
581                  * in case a limit was reached we divide the current interval by 2
582                  * unless vacuum_min_interval is reached
583                  */
584                 if (freelist < vdata->repack_limit &&
585                     vdata->delete_count < vdata->vacuum_limit) {
586                         if (tdata.last_interval < ctdb_db->ctdb->tunable.vacuum_max_interval) {
587                                 tdata.new_interval = tdata.last_interval * 110 / 100;
588                                 DEBUG(DEBUG_INFO,("Increasing vacuum interval %u -> %u for %s\n", 
589                                         tdata.last_interval, tdata.new_interval, ctdb_db->db_name));
590                         }
591                 } else {
592                         tdata.new_interval = tdata.last_interval / 2;
593                         if (tdata.new_interval < ctdb_db->ctdb->tunable.vacuum_min_interval ||
594                                 tdata.new_interval > ctdb_db->ctdb->tunable.vacuum_max_interval) {
595                                 tdata.new_interval = ctdb_db->ctdb->tunable.vacuum_min_interval;
596                         }               
597                         DEBUG(DEBUG_INFO,("Decreasing vacuum interval %u -> %u for %s\n", 
598                                          tdata.last_interval, tdata.new_interval, ctdb_db->db_name));
599                 }
600                 tdata.last_interval = tdata.new_interval;
601         } else {
602                 DEBUG(DEBUG_DEBUG,(__location__ " Cannot find tunedb record for %s. Using default interval\n", ctdb_db->db_name));
603                 tdata.last_num_repack = freelist;
604                 tdata.last_num_empty = vdata->delete_count;
605                 tdata.last_interval = ctdb_db->ctdb->tunable.vacuum_default_interval;
606         }
607
608         if (value.dptr != NULL) {
609                 free(value.dptr);
610         }
611
612         tdata.last_start = vdata->start;
613         tdata.last_duration = timeval_elapsed(&vdata->start);
614
615         value.dptr = (unsigned char *)&tdata;
616         value.dsize = sizeof(tdata);
617
618         if (tdb_store(tune_tdb, key, value, 0) != 0) {
619                 DEBUG(DEBUG_ERR,(__location__ " Unable to store tundb record for %s\n", ctdb_db->db_name));
620                 tdb_transaction_cancel(tune_tdb);
621                 tdb_close(tune_tdb);
622                 talloc_free(tmp_ctx);
623                 return -1;
624         }
625         tdb_transaction_commit(tune_tdb);
626         tdb_close(tune_tdb);
627         talloc_free(tmp_ctx);
628
629         return 0;
630 }
631
632 /*
633  * repack and vaccum a db
634  * called from the child context
635  */
636 static int ctdb_repack_db(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx)
637 {
638         uint32_t repack_limit = ctdb_db->ctdb->tunable.repack_limit;
639         uint32_t vacuum_limit = ctdb_db->ctdb->tunable.vacuum_limit;
640         const char *name = ctdb_db->db_name;
641         int size;
642         struct vacuum_data *vdata;
643
644         size = tdb_freelist_size(ctdb_db->ltdb->tdb);
645         if (size == -1) {
646                 DEBUG(DEBUG_ERR,(__location__ " Failed to get freelist size for '%s'\n", name));
647                 return -1;
648         }
649
650         vdata = talloc_zero(mem_ctx, struct vacuum_data);
651         if (vdata == NULL) {
652                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
653                 return -1;
654         }
655
656         vdata->ctdb = ctdb_db->ctdb;
657         vdata->vacuum_limit = vacuum_limit;
658         vdata->repack_limit = repack_limit;
659         vdata->delete_tree = trbt_create(vdata, 0);
660         vdata->ctdb_db = ctdb_db;
661         if (vdata->delete_tree == NULL) {
662                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
663                 talloc_free(vdata);
664                 return -1;
665         }
666
667         vdata->start = timeval_current();
668  
669         /*
670          * gather all records that can be deleted in vdata
671          */
672         if (ctdb_vacuum_db(ctdb_db, vdata) != 0) {
673                 DEBUG(DEBUG_ERR,(__location__ " Failed to vacuum '%s'\n", name));
674         }
675
676         /*
677          * decide if a repack is necessary
678          */
679         if (size < repack_limit && vdata->delete_count < vacuum_limit) {
680                 update_tuning_db(ctdb_db, vdata, size);
681                 talloc_free(vdata);
682                 return 0;
683         }
684
685         DEBUG(DEBUG_INFO,("Repacking %s with %u freelist entries and %u records to delete\n", 
686                         name, size, vdata->delete_count));
687
688         /*
689          * repack and implicitely get rid of the records we can delete
690          */
691         if (ctdb_repack_tdb(ctdb_db->ltdb->tdb, mem_ctx, vdata) != 0) {
692                 DEBUG(DEBUG_ERR,(__location__ " Failed to repack '%s'\n", name));
693                 update_tuning_db(ctdb_db, vdata, size);
694                 talloc_free(vdata);
695                 return -1;
696         }
697         update_tuning_db(ctdb_db, vdata, size);
698         talloc_free(vdata);
699
700         return 0;
701 }
702
703 static int get_vacuum_interval(struct ctdb_db_context *ctdb_db)
704 {
705         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
706         TDB_CONTEXT *tdb;
707         TDB_DATA key, value;
708         char *vac_dbname;
709         uint interval = ctdb_db->ctdb->tunable.vacuum_default_interval;
710         struct ctdb_context *ctdb = ctdb_db->ctdb;
711         int flags;
712
713         vac_dbname = talloc_asprintf(tmp_ctx, "%s/%s.%u", ctdb->db_directory, TUNINGDBNAME, ctdb->pnn);
714         if (vac_dbname == NULL) {
715                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory error while allocating '%s'\n", vac_dbname));
716                 talloc_free(tmp_ctx);
717                 return interval;
718         }
719
720         flags  = ctdb_db->ctdb->valgrinding ? TDB_NOMMAP : 0;
721         flags |= TDB_DISALLOW_NESTING;
722         tdb = tdb_open(vac_dbname, 0,
723                        flags,
724                        O_RDWR|O_CREAT, 0600);
725         if (!tdb) {
726                 DEBUG(DEBUG_ERR,("Unable to open/create database %s using default interval. Errno : %s (%d)\n", vac_dbname, strerror(errno), errno));
727                 talloc_free(tmp_ctx);
728                 return interval;
729         }
730
731         key.dptr = discard_const(ctdb_db->db_name);
732         key.dsize = strlen(ctdb_db->db_name);
733
734         value = tdb_fetch(tdb, key);
735
736         if (value.dptr != NULL) {
737                 if (value.dsize == sizeof(struct vacuum_tuning_data)) {
738                         struct vacuum_tuning_data *tptr = (struct vacuum_tuning_data *)value.dptr;
739
740                         interval = tptr->new_interval;
741
742                         if (interval < ctdb->tunable.vacuum_min_interval) {
743                                 interval = ctdb->tunable.vacuum_min_interval;
744                         } 
745                         if (interval > ctdb->tunable.vacuum_max_interval) {
746                                 interval = ctdb->tunable.vacuum_max_interval;
747                         }
748                 }
749                 free(value.dptr);
750         }
751         tdb_close(tdb);
752
753         talloc_free(tmp_ctx);
754
755         return interval;
756 }
757
758 static int vacuum_child_destructor(struct ctdb_vacuum_child_context *child_ctx)
759 {
760         double l = timeval_elapsed(&child_ctx->start_time);
761         struct ctdb_db_context *ctdb_db = child_ctx->vacuum_handle->ctdb_db;
762         struct ctdb_context *ctdb = ctdb_db->ctdb;
763
764         DEBUG(DEBUG_INFO,("Vacuuming took %.3f seconds for database %s\n", l, ctdb_db->db_name));
765
766         if (child_ctx->child_pid != -1) {
767                 kill(child_ctx->child_pid, SIGKILL);
768         }
769
770         DLIST_REMOVE(ctdb->vacuumers, child_ctx);
771
772         event_add_timed(ctdb->ev, child_ctx->vacuum_handle,
773                         timeval_current_ofs(get_vacuum_interval(ctdb_db), 0), 
774                         ctdb_vacuum_event, child_ctx->vacuum_handle);
775
776         return 0;
777 }
778
779 /*
780  * this event is generated when a vacuum child process times out
781  */
782 static void vacuum_child_timeout(struct event_context *ev, struct timed_event *te,
783                                          struct timeval t, void *private_data)
784 {
785         struct ctdb_vacuum_child_context *child_ctx = talloc_get_type(private_data, struct ctdb_vacuum_child_context);
786
787         DEBUG(DEBUG_ERR,("Vacuuming child process timed out for db %s\n", child_ctx->vacuum_handle->ctdb_db->db_name));
788
789         child_ctx->status = VACUUM_TIMEOUT;
790
791         talloc_free(child_ctx);
792 }
793
794
795 /*
796  * this event is generated when a vacuum child process has completed
797  */
798 static void vacuum_child_handler(struct event_context *ev, struct fd_event *fde,
799                              uint16_t flags, void *private_data)
800 {
801         struct ctdb_vacuum_child_context *child_ctx = talloc_get_type(private_data, struct ctdb_vacuum_child_context);
802         char c = 0;
803         int ret;
804
805         DEBUG(DEBUG_INFO,("Vacuuming child process %d finished for db %s\n", child_ctx->child_pid, child_ctx->vacuum_handle->ctdb_db->db_name));
806         child_ctx->child_pid = -1;
807
808         ret = read(child_ctx->fd[0], &c, 1);
809         if (ret != 1 || c != 0) {
810                 child_ctx->status = VACUUM_ERROR;
811                 DEBUG(DEBUG_ERR, ("A vacuum child process failed with an error for database %s. ret=%d c=%d\n", child_ctx->vacuum_handle->ctdb_db->db_name, ret, c));
812         } else {
813                 child_ctx->status = VACUUM_OK;
814         }
815
816         talloc_free(child_ctx);
817 }
818
819 /*
820  * this event is called every time we need to start a new vacuum process
821  */
822 static void
823 ctdb_vacuum_event(struct event_context *ev, struct timed_event *te,
824                                struct timeval t, void *private_data)
825 {
826         struct ctdb_vacuum_handle *vacuum_handle = talloc_get_type(private_data, struct ctdb_vacuum_handle);
827         struct ctdb_db_context *ctdb_db = vacuum_handle->ctdb_db;
828         struct ctdb_context *ctdb = ctdb_db->ctdb;
829         struct ctdb_vacuum_child_context *child_ctx;
830         struct tevent_fd *fde;
831         int ret;
832
833         /* we dont vacuum if we are in recovery mode, or db frozen */
834         if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE ||
835             ctdb->freeze_mode[ctdb_db->priority] != CTDB_FREEZE_NONE) {
836                 DEBUG(DEBUG_INFO, ("Not vacuuming %s (%s)\n", ctdb_db->db_name,
837                                    ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE ? "in recovery"
838                                    : ctdb->freeze_mode[ctdb_db->priority] == CTDB_FREEZE_PENDING
839                                    ? "freeze pending"
840                                    : "frozen"));
841                 event_add_timed(ctdb->ev, vacuum_handle, timeval_current_ofs(ctdb->tunable.vacuum_default_interval, 0), ctdb_vacuum_event, vacuum_handle);
842                 return;
843         }
844
845         child_ctx = talloc(vacuum_handle, struct ctdb_vacuum_child_context);
846         if (child_ctx == NULL) {
847                 DEBUG(DEBUG_CRIT, (__location__ " Failed to allocate child context for vacuuming of %s\n", ctdb_db->db_name));
848                 ctdb_fatal(ctdb, "Out of memory when crating vacuum child context. Shutting down\n");
849         }
850
851
852         ret = pipe(child_ctx->fd);
853         if (ret != 0) {
854                 talloc_free(child_ctx);
855                 DEBUG(DEBUG_ERR, ("Failed to create pipe for vacuum child process.\n"));
856                 event_add_timed(ctdb->ev, vacuum_handle, timeval_current_ofs(ctdb->tunable.vacuum_default_interval, 0), ctdb_vacuum_event, vacuum_handle);
857                 return;
858         }
859
860         child_ctx->child_pid = ctdb_fork(ctdb);
861         if (child_ctx->child_pid == (pid_t)-1) {
862                 close(child_ctx->fd[0]);
863                 close(child_ctx->fd[1]);
864                 talloc_free(child_ctx);
865                 DEBUG(DEBUG_ERR, ("Failed to fork vacuum child process.\n"));
866                 event_add_timed(ctdb->ev, vacuum_handle, timeval_current_ofs(ctdb->tunable.vacuum_default_interval, 0), ctdb_vacuum_event, vacuum_handle);
867                 return;
868         }
869
870
871         if (child_ctx->child_pid == 0) {
872                 char cc = 0;
873                 close(child_ctx->fd[0]);
874
875                 DEBUG(DEBUG_INFO,("Vacuuming child process %d for db %s started\n", getpid(), ctdb_db->db_name));
876         
877                 if (switch_from_server_to_client(ctdb, "vacuum-%s", ctdb_db->db_name) != 0) {
878                         DEBUG(DEBUG_CRIT, (__location__ "ERROR: failed to switch vacuum daemon into client mode. Shutting down.\n"));
879                         _exit(1);
880                 }
881
882                 /* 
883                  * repack the db
884                  */
885                 cc = ctdb_repack_db(ctdb_db, child_ctx);
886
887                 write(child_ctx->fd[1], &cc, 1);
888                 _exit(0);
889         }
890
891         set_close_on_exec(child_ctx->fd[0]);
892         close(child_ctx->fd[1]);
893
894         child_ctx->status = VACUUM_RUNNING;
895         child_ctx->start_time = timeval_current();
896
897         DLIST_ADD(ctdb->vacuumers, child_ctx);
898         talloc_set_destructor(child_ctx, vacuum_child_destructor);
899
900         event_add_timed(ctdb->ev, child_ctx,
901                 timeval_current_ofs(ctdb->tunable.vacuum_max_run_time, 0),
902                 vacuum_child_timeout, child_ctx);
903
904         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to child vacuum process\n", child_ctx->fd[0]));
905
906         fde = event_add_fd(ctdb->ev, child_ctx, child_ctx->fd[0],
907                            EVENT_FD_READ, vacuum_child_handler, child_ctx);
908         tevent_fd_set_auto_close(fde);
909
910         vacuum_handle->child_ctx = child_ctx;
911         child_ctx->vacuum_handle = vacuum_handle;
912 }
913
914 void ctdb_stop_vacuuming(struct ctdb_context *ctdb)
915 {
916         /* Simply free them all. */
917         while (ctdb->vacuumers) {
918                 DEBUG(DEBUG_INFO, ("Aborting vacuuming for %s (%i)\n",
919                            ctdb->vacuumers->vacuum_handle->ctdb_db->db_name,
920                            (int)ctdb->vacuumers->child_pid));
921                 /* vacuum_child_destructor kills it, removes from list */
922                 talloc_free(ctdb->vacuumers);
923         }
924 }
925
926 /* this function initializes the vacuuming context for a database
927  * starts the vacuuming events
928  */
929 int ctdb_vacuum_init(struct ctdb_db_context *ctdb_db)
930 {
931         if (ctdb_db->persistent != 0) {
932                 DEBUG(DEBUG_ERR,("Vacuuming is disabled for persistent database %s\n", ctdb_db->db_name));
933                 return 0;
934         }
935
936         ctdb_db->vacuum_handle = talloc(ctdb_db, struct ctdb_vacuum_handle);
937         CTDB_NO_MEMORY(ctdb_db->ctdb, ctdb_db->vacuum_handle);
938
939         ctdb_db->vacuum_handle->ctdb_db = ctdb_db;
940
941         event_add_timed(ctdb_db->ctdb->ev, ctdb_db->vacuum_handle, 
942                         timeval_current_ofs(get_vacuum_interval(ctdb_db), 0), 
943                         ctdb_vacuum_event, ctdb_db->vacuum_handle);
944
945         return 0;
946 }