ctdb-daemon: Fix signed/unsigned comparisons by declaring as unsigned
[samba.git] / ctdb / server / ctdb_vacuum.c
1 /*
2    ctdb vacuuming events
3
4    Copyright (C) Ronnie Sahlberg  2009
5    Copyright (C) Michael Adam 2010-2013
6    Copyright (C) Stefan Metzmacher 2010-2011
7
8    This program is free software; you can redistribute it and/or modify
9    it under the terms of the GNU General Public License as published by
10    the Free Software Foundation; either version 3 of the License, or
11    (at your option) any later version.
12
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, see <http://www.gnu.org/licenses/>.
20 */
21
22 #include "replace.h"
23 #include "system/network.h"
24 #include "system/filesys.h"
25 #include "system/time.h"
26
27 #include <talloc.h>
28 #include <tevent.h>
29
30 #include "lib/tdb_wrap/tdb_wrap.h"
31 #include "lib/util/dlinklist.h"
32 #include "lib/util/debug.h"
33 #include "lib/util/samba_util.h"
34 #include "lib/util/sys_rw.h"
35 #include "lib/util/util_process.h"
36
37 #include "ctdb_private.h"
38 #include "ctdb_client.h"
39
40 #include "common/rb_tree.h"
41 #include "common/common.h"
42 #include "common/logging.h"
43
/*
 * Timeout value handed to control calls made while vacuuming, e.g.
 * ctdb_ctrl_getnodemap().
 * NOTE(review): presumably "10 seconds from now" - confirm against
 * timeval_current_ofs().
 */
#define TIMELIMIT() timeval_current_ofs(10, 0)

/* Values stored in ctdb_vacuum_child_context.status. */
enum vacuum_child_status {
        VACUUM_RUNNING,
        VACUUM_OK,
        VACUUM_ERROR,
        VACUUM_TIMEOUT
};
47
48 struct ctdb_vacuum_child_context {
49         struct ctdb_vacuum_child_context *next, *prev;
50         struct ctdb_vacuum_handle *vacuum_handle;
51         /* fd child writes status to */
52         int fd[2];
53         pid_t child_pid;
54         enum vacuum_child_status status;
55         struct timeval start_time;
56 };
57
/*
 * Per-database vacuuming handle: ties a database to its current
 * vacuuming child context.
 */
struct ctdb_vacuum_handle {
        /* database this handle is for */
        struct ctdb_db_context *ctdb_db;
        /* context of the associated vacuuming child */
        struct ctdb_vacuum_child_context *child_ctx;
        /*
         * NOTE(review): presumably counts fast-path runs towards the
         * VacuumFastPathCount tunable - confirm against the users of
         * this field.
         */
        uint32_t fast_path_count;
};
63
64
65 /*  a list of records to possibly delete */
66 struct vacuum_data {
67         struct ctdb_context *ctdb;
68         struct ctdb_db_context *ctdb_db;
69         struct tdb_context *dest_db;
70         trbt_tree_t *delete_list;
71         struct ctdb_marshall_buffer **vacuum_fetch_list;
72         struct timeval start;
73         bool traverse_error;
74         bool vacuum;
75         struct {
76                 struct {
77                         uint32_t added_to_vacuum_fetch_list;
78                         uint32_t added_to_delete_list;
79                         uint32_t deleted;
80                         uint32_t skipped;
81                         uint32_t error;
82                         uint32_t total;
83                 } delete_queue;
84                 struct {
85                         uint32_t scheduled;
86                         uint32_t skipped;
87                         uint32_t error;
88                         uint32_t total;
89                 } db_traverse;
90                 struct {
91                         uint32_t total;
92                         uint32_t remote_error;
93                         uint32_t local_error;
94                         uint32_t deleted;
95                         uint32_t skipped;
96                         uint32_t left;
97                 } delete_list;
98                 struct {
99                         uint32_t vacuumed;
100                         uint32_t copied;
101                 } repack;
102         } count;
103 };
104
105 /* this structure contains the information for one record to be deleted */
106 struct delete_record_data {
107         struct ctdb_context *ctdb;
108         struct ctdb_db_context *ctdb_db;
109         struct ctdb_ltdb_header hdr;
110         uint32_t remote_fail_count;
111         TDB_DATA key;
112         uint8_t keydata[1];
113 };
114
/*
 * Accumulator used while marshalling the delete list into one blob.
 */
struct delete_records_list {
        /* marshall buffer being built; replaced whenever it grows */
        struct ctdb_marshall_buffer *records;
        /* vacuuming run the records belong to */
        struct vacuum_data *vdata;
};
119
120 static int insert_record_into_delete_queue(struct ctdb_db_context *ctdb_db,
121                                            const struct ctdb_ltdb_header *hdr,
122                                            TDB_DATA key);
123
124 /**
125  * Store key and header in a tree, indexed by the key hash.
126  */
127 static int insert_delete_record_data_into_tree(struct ctdb_context *ctdb,
128                                                struct ctdb_db_context *ctdb_db,
129                                                trbt_tree_t *tree,
130                                                const struct ctdb_ltdb_header *hdr,
131                                                TDB_DATA key)
132 {
133         struct delete_record_data *dd;
134         uint32_t hash;
135         size_t len;
136
137         len = offsetof(struct delete_record_data, keydata) + key.dsize;
138
139         dd = (struct delete_record_data *)talloc_size(tree, len);
140         if (dd == NULL) {
141                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
142                 return -1;
143         }
144         talloc_set_name_const(dd, "struct delete_record_data");
145
146         dd->ctdb      = ctdb;
147         dd->ctdb_db   = ctdb_db;
148         dd->key.dsize = key.dsize;
149         dd->key.dptr  = dd->keydata;
150         memcpy(dd->keydata, key.dptr, key.dsize);
151
152         dd->hdr = *hdr;
153         dd->remote_fail_count = 0;
154
155         hash = ctdb_hash(&key);
156
157         trbt_insert32(tree, hash, dd);
158
159         return 0;
160 }
161
162 static int add_record_to_delete_list(struct vacuum_data *vdata, TDB_DATA key,
163                                      struct ctdb_ltdb_header *hdr)
164 {
165         struct ctdb_context *ctdb = vdata->ctdb;
166         struct ctdb_db_context *ctdb_db = vdata->ctdb_db;
167         uint32_t hash;
168         int ret;
169
170         hash = ctdb_hash(&key);
171
172         if (trbt_lookup32(vdata->delete_list, hash)) {
173                 DEBUG(DEBUG_INFO, (__location__ " Hash collision when vacuuming, skipping this record.\n"));
174                 return 0;
175         }
176
177         ret = insert_delete_record_data_into_tree(ctdb, ctdb_db,
178                                                   vdata->delete_list,
179                                                   hdr, key);
180         if (ret != 0) {
181                 return -1;
182         }
183
184         vdata->count.delete_list.total++;
185
186         return 0;
187 }
188
189 /**
190  * Add a record to the list of records to be sent
191  * to their lmaster with VACUUM_FETCH.
192  */
193 static int add_record_to_vacuum_fetch_list(struct vacuum_data *vdata,
194                                            TDB_DATA key)
195 {
196         struct ctdb_context *ctdb = vdata->ctdb;
197         uint32_t lmaster;
198         struct ctdb_marshall_buffer *vfl;
199
200         lmaster = ctdb_lmaster(ctdb, &key);
201
202         vfl = vdata->vacuum_fetch_list[lmaster];
203
204         vfl = ctdb_marshall_add(ctdb, vfl, vfl->db_id, ctdb->pnn,
205                                 key, NULL, tdb_null);
206         if (vfl == NULL) {
207                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
208                 vdata->traverse_error = true;
209                 return -1;
210         }
211
212         vdata->vacuum_fetch_list[lmaster] = vfl;
213
214         return 0;
215 }
216
217
218 static void ctdb_vacuum_event(struct tevent_context *ev,
219                               struct tevent_timer *te,
220                               struct timeval t, void *private_data);
221
222 static int vacuum_record_parser(TDB_DATA key, TDB_DATA data, void *private_data)
223 {
224         struct ctdb_ltdb_header *header =
225                 (struct ctdb_ltdb_header *)private_data;
226
227         if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
228                 return -1;
229         }
230
231         *header = *(struct ctdb_ltdb_header *)data.dptr;
232
233         return 0;
234 }
235
236 /*
237  * traverse function for gathering the records that can be deleted
238  */
239 static int vacuum_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data,
240                            void *private_data)
241 {
242         struct vacuum_data *vdata = talloc_get_type(private_data,
243                                                     struct vacuum_data);
244         struct ctdb_context *ctdb = vdata->ctdb;
245         struct ctdb_db_context *ctdb_db = vdata->ctdb_db;
246         uint32_t lmaster;
247         struct ctdb_ltdb_header *hdr;
248         int res = 0;
249
250         vdata->count.db_traverse.total++;
251
252         lmaster = ctdb_lmaster(ctdb, &key);
253         if (lmaster >= ctdb->num_nodes) {
254                 vdata->count.db_traverse.error++;
255                 DEBUG(DEBUG_CRIT, (__location__
256                                    " lmaster[%u] >= ctdb->num_nodes[%u] for key"
257                                    " with hash[%u]!\n",
258                                    (unsigned)lmaster,
259                                    (unsigned)ctdb->num_nodes,
260                                    (unsigned)ctdb_hash(&key)));
261                 return -1;
262         }
263
264         if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
265                 /* it is not a deleted record */
266                 vdata->count.db_traverse.skipped++;
267                 return 0;
268         }
269
270         hdr = (struct ctdb_ltdb_header *)data.dptr;
271
272         if (hdr->dmaster != ctdb->pnn) {
273                 vdata->count.db_traverse.skipped++;
274                 return 0;
275         }
276
277         /*
278          * Add the record to this process's delete_queue for processing
279          * in the subsequent traverse in the fast vacuum run.
280          */
281         res = insert_record_into_delete_queue(ctdb_db, hdr, key);
282         if (res != 0) {
283                 vdata->count.db_traverse.error++;
284         } else {
285                 vdata->count.db_traverse.scheduled++;
286         }
287
288         return 0;
289 }
290
291 /*
292  * traverse the tree of records to delete and marshall them into
293  * a blob
294  */
295 static int delete_marshall_traverse(void *param, void *data)
296 {
297         struct delete_record_data *dd = talloc_get_type(data, struct delete_record_data);
298         struct delete_records_list *recs = talloc_get_type(param, struct delete_records_list);
299         struct ctdb_marshall_buffer *m;
300
301         m = ctdb_marshall_add(recs, recs->records, recs->records->db_id,
302                               recs->records->db_id,
303                               dd->key, &dd->hdr, tdb_null);
304         if (m == NULL) {
305                 DEBUG(DEBUG_ERR, (__location__ " failed to marshall record\n"));
306                 return -1;
307         }
308
309         recs->records = m;
310         return 0;
311 }
312
313 /**
314  * traverse function for the traversal of the delete_queue,
315  * the fast-path vacuuming list.
316  *
317  *  - If the record has been migrated off the node
318  *    or has been revived (filled with data) on the node,
319  *    then skip the record.
320  *
321  *  - If the current node is the record's lmaster and it is
322  *    a record that has never been migrated with data, then
323  *    delete the record from the local tdb.
324  *
325  *  - If the current node is the record's lmaster and it has
326  *    been migrated with data, then schedule it for the normal
327  *    vacuuming procedure (i.e. add it to the delete_list).
328  *
329  *  - If the current node is NOT the record's lmaster then
330  *    add it to the list of records that are to be sent to
331  *    the lmaster with the VACUUM_FETCH message.
332  */
333 static int delete_queue_traverse(void *param, void *data)
334 {
335         struct delete_record_data *dd =
336                 talloc_get_type(data, struct delete_record_data);
337         struct vacuum_data *vdata = talloc_get_type(param, struct vacuum_data);
338         struct ctdb_db_context *ctdb_db = dd->ctdb_db;
339         struct ctdb_context *ctdb = ctdb_db->ctdb; /* or dd->ctdb ??? */
340         int res;
341         struct ctdb_ltdb_header header;
342         uint32_t lmaster;
343         uint32_t hash = ctdb_hash(&(dd->key));
344
345         vdata->count.delete_queue.total++;
346
347         res = tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, dd->key);
348         if (res != 0) {
349                 vdata->count.delete_queue.error++;
350                 return 0;
351         }
352
353         res = tdb_parse_record(ctdb_db->ltdb->tdb, dd->key,
354                                vacuum_record_parser, &header);
355         if (res != 0) {
356                 goto skipped;
357         }
358
359         if (header.dmaster != ctdb->pnn) {
360                 /* The record has been migrated off the node. Skip. */
361                 goto skipped;
362         }
363
364         if (header.rsn != dd->hdr.rsn) {
365                 /*
366                  * The record has been migrated off the node and back again.
367                  * But not requeued for deletion. Skip it.
368                  */
369                 goto skipped;
370         }
371
372         /*
373          * We are dmaster, and the record has no data, and it has
374          * not been migrated after it has been queued for deletion.
375          *
376          * At this stage, the record could still have been revived locally
377          * and last been written with empty data. This can only be
378          * fixed with the addition of an active or delete flag. (TODO)
379          */
380
381         lmaster = ctdb_lmaster(ctdb_db->ctdb, &dd->key);
382
383         if (lmaster != ctdb->pnn) {
384                 res = add_record_to_vacuum_fetch_list(vdata, dd->key);
385
386                 if (res != 0) {
387                         DEBUG(DEBUG_ERR,
388                               (__location__ " Error adding record to list "
389                                "of records to send to lmaster.\n"));
390                         vdata->count.delete_queue.error++;
391                 } else {
392                         vdata->count.delete_queue.added_to_vacuum_fetch_list++;
393                 }
394                 goto done;
395         }
396
397         /* use header->flags or dd->hdr.flags ?? */
398         if (dd->hdr.flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) {
399                 res = add_record_to_delete_list(vdata, dd->key, &dd->hdr);
400
401                 if (res != 0) {
402                         DEBUG(DEBUG_ERR,
403                               (__location__ " Error adding record to list "
404                                "of records for deletion on lmaster.\n"));
405                         vdata->count.delete_queue.error++;
406                 } else {
407                         vdata->count.delete_queue.added_to_delete_list++;
408                 }
409         } else {
410                 res = tdb_delete(ctdb_db->ltdb->tdb, dd->key);
411
412                 if (res != 0) {
413                         DEBUG(DEBUG_ERR,
414                               (__location__ " Error deleting record with key "
415                                "hash [0x%08x] from local data base db[%s].\n",
416                                hash, ctdb_db->db_name));
417                         vdata->count.delete_queue.error++;
418                         goto done;
419                 }
420
421                 DEBUG(DEBUG_DEBUG,
422                       (__location__ " Deleted record with key hash "
423                        "[0x%08x] from local data base db[%s].\n",
424                        hash, ctdb_db->db_name));
425                 vdata->count.delete_queue.deleted++;
426         }
427
428         goto done;
429
430 skipped:
431         vdata->count.delete_queue.skipped++;
432
433 done:
434         tdb_chainunlock(ctdb_db->ltdb->tdb, dd->key);
435
436         return 0;
437 }
438
439 /**
440  * Delete the records that we are lmaster and dmaster for and
441  * that could be deleted on all other nodes via the TRY_DELETE_RECORDS
442  * control.
443  */
444 static int delete_record_traverse(void *param, void *data)
445 {
446         struct delete_record_data *dd =
447                 talloc_get_type(data, struct delete_record_data);
448         struct vacuum_data *vdata = talloc_get_type(param, struct vacuum_data);
449         struct ctdb_db_context *ctdb_db = dd->ctdb_db;
450         struct ctdb_context *ctdb = ctdb_db->ctdb;
451         int res;
452         struct ctdb_ltdb_header header;
453         uint32_t lmaster;
454         uint32_t hash = ctdb_hash(&(dd->key));
455
456         if (dd->remote_fail_count > 0) {
457                 vdata->count.delete_list.remote_error++;
458                 vdata->count.delete_list.left--;
459                 talloc_free(dd);
460                 return 0;
461         }
462
463         res = tdb_chainlock(ctdb_db->ltdb->tdb, dd->key);
464         if (res != 0) {
465                 DEBUG(DEBUG_ERR,
466                       (__location__ " Error getting chainlock on record with "
467                        "key hash [0x%08x] on database db[%s].\n",
468                        hash, ctdb_db->db_name));
469                 vdata->count.delete_list.local_error++;
470                 vdata->count.delete_list.left--;
471                 talloc_free(dd);
472                 return 0;
473         }
474
475         /*
476          * Verify that the record is still empty, its RSN has not
477          * changed and that we are still its lmaster and dmaster.
478          */
479
480         res = tdb_parse_record(ctdb_db->ltdb->tdb, dd->key,
481                                vacuum_record_parser, &header);
482         if (res != 0) {
483                 goto skip;
484         }
485
486         if (header.flags & CTDB_REC_RO_FLAGS) {
487                 DEBUG(DEBUG_INFO, (__location__ ": record with hash [0x%08x] "
488                                    "on database db[%s] has read-only flags. "
489                                    "skipping.\n",
490                                    hash, ctdb_db->db_name));
491                 goto skip;
492         }
493
494         if (header.dmaster != ctdb->pnn) {
495                 DEBUG(DEBUG_INFO, (__location__ ": record with hash [0x%08x] "
496                                    "on database db[%s] has been migrated away. "
497                                    "skipping.\n",
498                                    hash, ctdb_db->db_name));
499                 goto skip;
500         }
501
502         if (header.rsn != dd->hdr.rsn) {
503                 /*
504                  * The record has been migrated off the node and back again.
505                  * But not requeued for deletion. Skip it.
506                  */
507                 DEBUG(DEBUG_INFO, (__location__ ": record with hash [0x%08x] "
508                                    "on database db[%s] seems to have been "
509                                    "migrated away and back again (with empty "
510                                    "data). skipping.\n",
511                                    hash, ctdb_db->db_name));
512                 goto skip;
513         }
514
515         lmaster = ctdb_lmaster(ctdb_db->ctdb, &dd->key);
516
517         if (lmaster != ctdb->pnn) {
518                 DEBUG(DEBUG_INFO, (__location__ ": not lmaster for record in "
519                                    "delete list (key hash [0x%08x], db[%s]). "
520                                    "Strange! skipping.\n",
521                                    hash, ctdb_db->db_name));
522                 goto skip;
523         }
524
525         res = tdb_delete(ctdb_db->ltdb->tdb, dd->key);
526
527         if (res != 0) {
528                 DEBUG(DEBUG_ERR,
529                       (__location__ " Error deleting record with key hash "
530                        "[0x%08x] from local data base db[%s].\n",
531                        hash, ctdb_db->db_name));
532                 vdata->count.delete_list.local_error++;
533                 goto done;
534         }
535
536         DEBUG(DEBUG_DEBUG,
537               (__location__ " Deleted record with key hash [0x%08x] from "
538                "local data base db[%s].\n", hash, ctdb_db->db_name));
539
540         vdata->count.delete_list.deleted++;
541         goto done;
542
543 skip:
544         vdata->count.delete_list.skipped++;
545
546 done:
547         tdb_chainunlock(ctdb_db->ltdb->tdb, dd->key);
548
549         talloc_free(dd);
550         vdata->count.delete_list.left--;
551
552         return 0;
553 }
554
555 /**
556  * Traverse the delete_queue.
557  * Records are either deleted directly or filled
558  * into the delete list or the vacuum fetch lists
559  * for further processing.
560  */
561 static void ctdb_process_delete_queue(struct ctdb_db_context *ctdb_db,
562                                       struct vacuum_data *vdata)
563 {
564         uint32_t sum;
565         int ret;
566
567         ret = trbt_traversearray32(ctdb_db->delete_queue, 1,
568                                    delete_queue_traverse, vdata);
569
570         if (ret != 0) {
571                 DEBUG(DEBUG_ERR, (__location__ " Error traversing "
572                       "the delete queue.\n"));
573         }
574
575         sum = vdata->count.delete_queue.deleted
576             + vdata->count.delete_queue.skipped
577             + vdata->count.delete_queue.error
578             + vdata->count.delete_queue.added_to_delete_list
579             + vdata->count.delete_queue.added_to_vacuum_fetch_list;
580
581         if (vdata->count.delete_queue.total != sum) {
582                 DEBUG(DEBUG_ERR, (__location__ " Inconsistency in fast vacuum "
583                       "counts for db[%s]: total[%u] != sum[%u]\n",
584                       ctdb_db->db_name,
585                       (unsigned)vdata->count.delete_queue.total,
586                       (unsigned)sum));
587         }
588
589         if (vdata->count.delete_queue.total > 0) {
590                 DEBUG(DEBUG_INFO,
591                       (__location__
592                        " fast vacuuming delete_queue traverse statistics: "
593                        "db[%s] "
594                        "total[%u] "
595                        "del[%u] "
596                        "skp[%u] "
597                        "err[%u] "
598                        "adl[%u] "
599                        "avf[%u]\n",
600                        ctdb_db->db_name,
601                        (unsigned)vdata->count.delete_queue.total,
602                        (unsigned)vdata->count.delete_queue.deleted,
603                        (unsigned)vdata->count.delete_queue.skipped,
604                        (unsigned)vdata->count.delete_queue.error,
605                        (unsigned)vdata->count.delete_queue.added_to_delete_list,
606                        (unsigned)vdata->count.delete_queue.added_to_vacuum_fetch_list));
607         }
608
609         return;
610 }
611
612 /**
613  * read-only traverse of the database, looking for records that
614  * might be able to be vacuumed.
615  *
616  * This is not done each time but only every tunable
617  * VacuumFastPathCount times.
618  */
619 static void ctdb_vacuum_traverse_db(struct ctdb_db_context *ctdb_db,
620                                     struct vacuum_data *vdata)
621 {
622         int ret;
623
624         ret = tdb_traverse_read(ctdb_db->ltdb->tdb, vacuum_traverse, vdata);
625         if (ret == -1 || vdata->traverse_error) {
626                 DEBUG(DEBUG_ERR, (__location__ " Traverse error in vacuuming "
627                                   "'%s'\n", ctdb_db->db_name));
628                 return;
629         }
630
631         if (vdata->count.db_traverse.total > 0) {
632                 DEBUG(DEBUG_INFO,
633                       (__location__
634                        " full vacuuming db traverse statistics: "
635                        "db[%s] "
636                        "total[%u] "
637                        "skp[%u] "
638                        "err[%u] "
639                        "sched[%u]\n",
640                        ctdb_db->db_name,
641                        (unsigned)vdata->count.db_traverse.total,
642                        (unsigned)vdata->count.db_traverse.skipped,
643                        (unsigned)vdata->count.db_traverse.error,
644                        (unsigned)vdata->count.db_traverse.scheduled));
645         }
646
647         return;
648 }
649
650 /**
651  * Process the vacuum fetch lists:
652  * For records for which we are not the lmaster, tell the lmaster to
653  * fetch the record.
654  */
655 static void ctdb_process_vacuum_fetch_lists(struct ctdb_db_context *ctdb_db,
656                                             struct vacuum_data *vdata)
657 {
658         unsigned int i;
659         struct ctdb_context *ctdb = ctdb_db->ctdb;
660
661         for (i = 0; i < ctdb->num_nodes; i++) {
662                 TDB_DATA data;
663                 struct ctdb_marshall_buffer *vfl = vdata->vacuum_fetch_list[i];
664
665                 if (ctdb->nodes[i]->pnn == ctdb->pnn) {
666                         continue;
667                 }
668
669                 if (vfl->count == 0) {
670                         continue;
671                 }
672
673                 DEBUG(DEBUG_INFO, ("Found %u records for lmaster %u in '%s'\n",
674                                    vfl->count, ctdb->nodes[i]->pnn,
675                                    ctdb_db->db_name));
676
677                 data = ctdb_marshall_finish(vfl);
678                 if (ctdb_client_send_message(ctdb, ctdb->nodes[i]->pnn,
679                                              CTDB_SRVID_VACUUM_FETCH,
680                                              data) != 0)
681                 {
682                         DEBUG(DEBUG_ERR, (__location__ " Failed to send vacuum "
683                                           "fetch message to %u\n",
684                                           ctdb->nodes[i]->pnn));
685                 }
686         }
687
688         return;
689 }
690
/**
 * Process the delete list:
 *
 * This is the last step of vacuuming that consistently deletes
 * those records that have been migrated with data and can hence
 * not be deleted when leaving a node.
 *
 * In this step, the lmaster does the final deletion of those empty
 * records that it is also dmaster for. It has usually received
 * at least some of these records previously from the former dmasters
 * with the vacuum fetch message.
 *
 *  1) Send the records to all active nodes with the TRY_DELETE_RECORDS
 *     control. The remote nodes delete their local copy.
 *  2) The lmaster locally deletes its copies of all records that
 *     could successfully be deleted remotely in step #1.
 */
708 static void ctdb_process_delete_list(struct ctdb_db_context *ctdb_db,
709                                      struct vacuum_data *vdata)
710 {
711         int ret, i;
712         struct ctdb_context *ctdb = ctdb_db->ctdb;
713         struct delete_records_list *recs;
714         TDB_DATA indata;
715         struct ctdb_node_map_old *nodemap;
716         uint32_t *active_nodes;
717         int num_active_nodes;
718         TALLOC_CTX *tmp_ctx;
719         uint32_t sum;
720
721         if (vdata->count.delete_list.total == 0) {
722                 return;
723         }
724
725         tmp_ctx = talloc_new(vdata);
726         if (tmp_ctx == NULL) {
727                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
728                 return;
729         }
730
731         vdata->count.delete_list.left = vdata->count.delete_list.total;
732
733         /*
734          * get the list of currently active nodes
735          */
736
737         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(),
738                                    CTDB_CURRENT_NODE,
739                                    tmp_ctx,
740                                    &nodemap);
741         if (ret != 0) {
742                 DEBUG(DEBUG_ERR,(__location__ " unable to get node map\n"));
743                 goto done;
744         }
745
746         active_nodes = list_of_active_nodes(ctdb, nodemap,
747                                             nodemap, /* talloc context */
748                                             false /* include self */);
749         /* yuck! ;-) */
750         num_active_nodes = talloc_get_size(active_nodes)/sizeof(*active_nodes);
751
752         /*
753          * Now delete the records all active nodes in a two-phase process:
754          * 1) tell all active remote nodes to delete all their copy
755          * 2) if all remote nodes deleted their record copy, delete it locally
756          */
757
758         recs = talloc_zero(tmp_ctx, struct delete_records_list);
759         if (recs == NULL) {
760                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
761                 goto done;
762         }
763
764         /*
765          * Step 1:
766          * Send all records to all active nodes for deletion.
767          */
768
769         /*
770          * Create a marshall blob from the remaining list of records to delete.
771          */
772
773         recs->records = (struct ctdb_marshall_buffer *)
774                 talloc_zero_size(recs,
775                                  offsetof(struct ctdb_marshall_buffer, data));
776         if (recs->records == NULL) {
777                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
778                 goto done;
779         }
780         recs->records->db_id = ctdb_db->db_id;
781
782         ret = trbt_traversearray32(vdata->delete_list, 1,
783                                    delete_marshall_traverse, recs);
784         if (ret != 0) {
785                 DEBUG(DEBUG_ERR, (__location__ " Error traversing the "
786                       "delete list for second marshalling.\n"));
787                 goto done;
788         }
789
790         indata = ctdb_marshall_finish(recs->records);
791
792         for (i = 0; i < num_active_nodes; i++) {
793                 struct ctdb_marshall_buffer *records;
794                 struct ctdb_rec_data_old *rec;
795                 int32_t res;
796                 TDB_DATA outdata;
797
798                 ret = ctdb_control(ctdb, active_nodes[i], 0,
799                                 CTDB_CONTROL_TRY_DELETE_RECORDS, 0,
800                                 indata, recs, &outdata, &res,
801                                 NULL, NULL);
802                 if (ret != 0 || res != 0) {
803                         DEBUG(DEBUG_ERR, ("Failed to delete records on "
804                                           "node %u: ret[%d] res[%d]\n",
805                                           active_nodes[i], ret, res));
806                         goto done;
807                 }
808
809                 /*
810                  * outdata contains the list of records coming back
811                  * from the node: These are the records that the
812                  * remote node could not delete. We remove these from
813                  * the list to delete locally.
814                  */
815                 records = (struct ctdb_marshall_buffer *)outdata.dptr;
816                 rec = (struct ctdb_rec_data_old *)&records->data[0];
817                 while (records->count-- > 1) {
818                         TDB_DATA reckey, recdata;
819                         struct ctdb_ltdb_header *rechdr;
820                         struct delete_record_data *dd;
821
822                         reckey.dptr = &rec->data[0];
823                         reckey.dsize = rec->keylen;
824                         recdata.dptr = &rec->data[reckey.dsize];
825                         recdata.dsize = rec->datalen;
826
827                         if (recdata.dsize < sizeof(struct ctdb_ltdb_header)) {
828                                 DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
829                                 goto done;
830                         }
831                         rechdr = (struct ctdb_ltdb_header *)recdata.dptr;
832                         recdata.dptr += sizeof(*rechdr);
833                         recdata.dsize -= sizeof(*rechdr);
834
835                         dd = (struct delete_record_data *)trbt_lookup32(
836                                         vdata->delete_list,
837                                         ctdb_hash(&reckey));
838                         if (dd != NULL) {
839                                 /*
840                                  * The remote node could not delete the
841                                  * record.  Since other remote nodes can
842                                  * also fail, we just mark the record.
843                                  */
844                                 dd->remote_fail_count++;
845                         } else {
846                                 DEBUG(DEBUG_ERR, (__location__ " Failed to "
847                                       "find record with hash 0x%08x coming "
848                                       "back from TRY_DELETE_RECORDS "
849                                       "control in delete list.\n",
850                                       ctdb_hash(&reckey)));
851                         }
852
853                         rec = (struct ctdb_rec_data_old *)(rec->length + (uint8_t *)rec);
854                 }
855         }
856
857         /*
858          * Step 2:
859          * Delete the remaining records locally.
860          *
861          * These records have successfully been deleted on all
862          * active remote nodes.
863          */
864
865         ret = trbt_traversearray32(vdata->delete_list, 1,
866                                    delete_record_traverse, vdata);
867         if (ret != 0) {
868                 DEBUG(DEBUG_ERR, (__location__ " Error traversing the "
869                       "delete list for deletion.\n"));
870         }
871
872         if (vdata->count.delete_list.left != 0) {
873                 DEBUG(DEBUG_ERR, (__location__ " Vaccum db[%s] error: "
874                       "there are %u records left for deletion after "
875                       "processing delete list\n",
876                       ctdb_db->db_name,
877                       (unsigned)vdata->count.delete_list.left));
878         }
879
880         sum = vdata->count.delete_list.deleted
881             + vdata->count.delete_list.skipped
882             + vdata->count.delete_list.remote_error
883             + vdata->count.delete_list.local_error
884             + vdata->count.delete_list.left;
885
886         if (vdata->count.delete_list.total != sum) {
887                 DEBUG(DEBUG_ERR, (__location__ " Inconsistency in vacuum "
888                       "delete list counts for db[%s]: total[%u] != sum[%u]\n",
889                       ctdb_db->db_name,
890                       (unsigned)vdata->count.delete_list.total,
891                       (unsigned)sum));
892         }
893
894         if (vdata->count.delete_list.total > 0) {
895                 DEBUG(DEBUG_INFO,
896                       (__location__
897                        " vacuum delete list statistics: "
898                        "db[%s] "
899                        "total[%u] "
900                        "del[%u] "
901                        "skip[%u] "
902                        "rem.err[%u] "
903                        "loc.err[%u] "
904                        "left[%u]\n",
905                        ctdb_db->db_name,
906                        (unsigned)vdata->count.delete_list.total,
907                        (unsigned)vdata->count.delete_list.deleted,
908                        (unsigned)vdata->count.delete_list.skipped,
909                        (unsigned)vdata->count.delete_list.remote_error,
910                        (unsigned)vdata->count.delete_list.local_error,
911                        (unsigned)vdata->count.delete_list.left));
912         }
913
914 done:
915         talloc_free(tmp_ctx);
916
917         return;
918 }
919
920 /**
921  * initialize the vacuum_data
922  */
923 static struct vacuum_data *ctdb_vacuum_init_vacuum_data(
924                                         struct ctdb_db_context *ctdb_db,
925                                         TALLOC_CTX *mem_ctx)
926 {
927         unsigned int i;
928         struct ctdb_context *ctdb = ctdb_db->ctdb;
929         struct vacuum_data *vdata;
930
931         vdata = talloc_zero(mem_ctx, struct vacuum_data);
932         if (vdata == NULL) {
933                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
934                 return NULL;
935         }
936
937         vdata->ctdb = ctdb_db->ctdb;
938         vdata->ctdb_db = ctdb_db;
939         vdata->delete_list = trbt_create(vdata, 0);
940         if (vdata->delete_list == NULL) {
941                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
942                 goto fail;
943         }
944
945         vdata->start = timeval_current();
946
947         vdata->count.delete_queue.added_to_delete_list = 0;
948         vdata->count.delete_queue.added_to_vacuum_fetch_list = 0;
949         vdata->count.delete_queue.deleted = 0;
950         vdata->count.delete_queue.skipped = 0;
951         vdata->count.delete_queue.error = 0;
952         vdata->count.delete_queue.total = 0;
953         vdata->count.db_traverse.scheduled = 0;
954         vdata->count.db_traverse.skipped = 0;
955         vdata->count.db_traverse.error = 0;
956         vdata->count.db_traverse.total = 0;
957         vdata->count.delete_list.total = 0;
958         vdata->count.delete_list.left = 0;
959         vdata->count.delete_list.remote_error = 0;
960         vdata->count.delete_list.local_error = 0;
961         vdata->count.delete_list.skipped = 0;
962         vdata->count.delete_list.deleted = 0;
963
964         /* the list needs to be of length num_nodes */
965         vdata->vacuum_fetch_list = talloc_zero_array(vdata,
966                                                 struct ctdb_marshall_buffer *,
967                                                 ctdb->num_nodes);
968         if (vdata->vacuum_fetch_list == NULL) {
969                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
970                 goto fail;
971         }
972         for (i = 0; i < ctdb->num_nodes; i++) {
973                 vdata->vacuum_fetch_list[i] = (struct ctdb_marshall_buffer *)
974                         talloc_zero_size(vdata->vacuum_fetch_list,
975                                          offsetof(struct ctdb_marshall_buffer, data));
976                 if (vdata->vacuum_fetch_list[i] == NULL) {
977                         DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
978                         talloc_free(vdata);
979                         return NULL;
980                 }
981                 vdata->vacuum_fetch_list[i]->db_id = ctdb_db->db_id;
982         }
983
984         return vdata;
985
986 fail:
987         talloc_free(vdata);
988         return NULL;
989 }
990
991 /**
992  * Vacuum a DB:
993  *  - Always do the fast vacuuming run, which traverses
994  *    the in-memory delete queue: these records have been
995  *    scheduled for deletion.
996  *  - Only if explicitly requested, the database is traversed
997  *    in order to use the traditional heuristics on empty records
998  *    to trigger deletion.
999  *    This is done only every VacuumFastPathCount'th vacuuming run.
1000  *
1001  * The traverse runs fill two lists:
1002  *
1003  * - The delete_list:
1004  *   This is the list of empty records the current
1005  *   node is lmaster and dmaster for. These records are later
1006  *   deleted first on other nodes and then locally.
1007  *
1008  *   The fast vacuuming run has a short cut for those records
1009  *   that have never been migrated with data: these records
1010  *   are immediately deleted locally, since they have left
1011  *   no trace on other nodes.
1012  *
1013  * - The vacuum_fetch lists
1014  *   (one for each other lmaster node):
1015  *   The records in this list are sent for deletion to
1016  *   their lmaster in a bulk VACUUM_FETCH message.
1017  *
1018  *   The lmaster then migrates all these records to itelf
1019  *   so that they can be vacuumed there.
1020  *
1021  * This executes in the child context.
1022  */
1023 static int ctdb_vacuum_db(struct ctdb_db_context *ctdb_db,
1024                           bool full_vacuum_run)
1025 {
1026         struct ctdb_context *ctdb = ctdb_db->ctdb;
1027         int ret, pnn;
1028         struct vacuum_data *vdata;
1029         TALLOC_CTX *tmp_ctx;
1030
1031         DEBUG(DEBUG_INFO, (__location__ " Entering %s vacuum run for db "
1032                            "%s db_id[0x%08x]\n",
1033                            full_vacuum_run ? "full" : "fast",
1034                            ctdb_db->db_name, ctdb_db->db_id));
1035
1036         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &ctdb->vnn_map);
1037         if (ret != 0) {
1038                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from local node\n"));
1039                 return ret;
1040         }
1041
1042         pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
1043         if (pnn == -1) {
1044                 DEBUG(DEBUG_ERR, ("Unable to get pnn from local node\n"));
1045                 return -1;
1046         }
1047
1048         ctdb->pnn = pnn;
1049
1050         tmp_ctx = talloc_new(ctdb_db);
1051         if (tmp_ctx == NULL) {
1052                 DEBUG(DEBUG_ERR, ("Out of memory!\n"));
1053                 return -1;
1054         }
1055
1056         vdata = ctdb_vacuum_init_vacuum_data(ctdb_db, tmp_ctx);
1057         if (vdata == NULL) {
1058                 talloc_free(tmp_ctx);
1059                 return -1;
1060         }
1061
1062         if (full_vacuum_run) {
1063                 ctdb_vacuum_traverse_db(ctdb_db, vdata);
1064         }
1065
1066         ctdb_process_delete_queue(ctdb_db, vdata);
1067
1068         ctdb_process_vacuum_fetch_lists(ctdb_db, vdata);
1069
1070         ctdb_process_delete_list(ctdb_db, vdata);
1071
1072         talloc_free(tmp_ctx);
1073
1074         /* this ensures we run our event queue */
1075         ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
1076
1077         return 0;
1078 }
1079
1080 /*
1081  * repack and vaccum a db
1082  * called from the child context
1083  */
1084 static int ctdb_vacuum_and_repack_db(struct ctdb_db_context *ctdb_db,
1085                                      bool full_vacuum_run)
1086 {
1087         uint32_t repack_limit = ctdb_db->ctdb->tunable.repack_limit;
1088         const char *name = ctdb_db->db_name;
1089         int freelist_size = 0;
1090         int ret;
1091
1092         if (ctdb_vacuum_db(ctdb_db, full_vacuum_run) != 0) {
1093                 DEBUG(DEBUG_ERR,(__location__ " Failed to vacuum '%s'\n", name));
1094         }
1095
1096         freelist_size = tdb_freelist_size(ctdb_db->ltdb->tdb);
1097         if (freelist_size == -1) {
1098                 DEBUG(DEBUG_ERR,(__location__ " Failed to get freelist size for '%s'\n", name));
1099                 return -1;
1100         }
1101
1102         /*
1103          * decide if a repack is necessary
1104          */
1105         if ((repack_limit == 0 || (uint32_t)freelist_size < repack_limit))
1106         {
1107                 return 0;
1108         }
1109
1110         DEBUG(DEBUG_INFO, ("Repacking %s with %u freelist entries\n",
1111                            name, freelist_size));
1112
1113         ret = tdb_repack(ctdb_db->ltdb->tdb);
1114         if (ret != 0) {
1115                 DEBUG(DEBUG_ERR,(__location__ " Failed to repack '%s'\n", name));
1116                 return -1;
1117         }
1118
1119         return 0;
1120 }
1121
1122 static uint32_t get_vacuum_interval(struct ctdb_db_context *ctdb_db)
1123 {
1124         uint32_t interval = ctdb_db->ctdb->tunable.vacuum_interval;
1125
1126         return interval;
1127 }
1128
1129 static int vacuum_child_destructor(struct ctdb_vacuum_child_context *child_ctx)
1130 {
1131         double l = timeval_elapsed(&child_ctx->start_time);
1132         struct ctdb_db_context *ctdb_db = child_ctx->vacuum_handle->ctdb_db;
1133         struct ctdb_context *ctdb = ctdb_db->ctdb;
1134
1135         CTDB_UPDATE_DB_LATENCY(ctdb_db, "vacuum", vacuum.latency, l);
1136         DEBUG(DEBUG_INFO,("Vacuuming took %.3f seconds for database %s\n", l, ctdb_db->db_name));
1137
1138         if (child_ctx->child_pid != -1) {
1139                 ctdb_kill(ctdb, child_ctx->child_pid, SIGKILL);
1140         } else {
1141                 /* Bump the number of successful fast-path runs. */
1142                 child_ctx->vacuum_handle->fast_path_count++;
1143         }
1144
1145         DLIST_REMOVE(ctdb->vacuumers, child_ctx);
1146
1147         tevent_add_timer(ctdb->ev, child_ctx->vacuum_handle,
1148                          timeval_current_ofs(get_vacuum_interval(ctdb_db), 0),
1149                          ctdb_vacuum_event, child_ctx->vacuum_handle);
1150
1151         return 0;
1152 }
1153
1154 /*
1155  * this event is generated when a vacuum child process times out
1156  */
1157 static void vacuum_child_timeout(struct tevent_context *ev,
1158                                  struct tevent_timer *te,
1159                                  struct timeval t, void *private_data)
1160 {
1161         struct ctdb_vacuum_child_context *child_ctx = talloc_get_type(private_data, struct ctdb_vacuum_child_context);
1162
1163         DEBUG(DEBUG_ERR,("Vacuuming child process timed out for db %s\n", child_ctx->vacuum_handle->ctdb_db->db_name));
1164
1165         child_ctx->status = VACUUM_TIMEOUT;
1166
1167         talloc_free(child_ctx);
1168 }
1169
1170
1171 /*
1172  * this event is generated when a vacuum child process has completed
1173  */
1174 static void vacuum_child_handler(struct tevent_context *ev,
1175                                  struct tevent_fd *fde,
1176                                  uint16_t flags, void *private_data)
1177 {
1178         struct ctdb_vacuum_child_context *child_ctx = talloc_get_type(private_data, struct ctdb_vacuum_child_context);
1179         char c = 0;
1180         int ret;
1181
1182         DEBUG(DEBUG_INFO,("Vacuuming child process %d finished for db %s\n", child_ctx->child_pid, child_ctx->vacuum_handle->ctdb_db->db_name));
1183         child_ctx->child_pid = -1;
1184
1185         ret = sys_read(child_ctx->fd[0], &c, 1);
1186         if (ret != 1 || c != 0) {
1187                 child_ctx->status = VACUUM_ERROR;
1188                 DEBUG(DEBUG_ERR, ("A vacuum child process failed with an error for database %s. ret=%d c=%d\n", child_ctx->vacuum_handle->ctdb_db->db_name, ret, c));
1189         } else {
1190                 child_ctx->status = VACUUM_OK;
1191         }
1192
1193         talloc_free(child_ctx);
1194 }
1195
1196 /*
1197  * this event is called every time we need to start a new vacuum process
1198  */
1199 static void ctdb_vacuum_event(struct tevent_context *ev,
1200                               struct tevent_timer *te,
1201                               struct timeval t, void *private_data)
1202 {
1203         struct ctdb_vacuum_handle *vacuum_handle = talloc_get_type(private_data, struct ctdb_vacuum_handle);
1204         struct ctdb_db_context *ctdb_db = vacuum_handle->ctdb_db;
1205         struct ctdb_context *ctdb = ctdb_db->ctdb;
1206         struct ctdb_vacuum_child_context *child_ctx;
1207         struct tevent_fd *fde;
1208         bool full_vacuum_run = false;
1209         int ret;
1210
1211         /* we don't vacuum if we are in recovery mode, or db frozen */
1212         if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE ||
1213             ctdb_db_frozen(ctdb_db)) {
1214                 DEBUG(DEBUG_INFO, ("Not vacuuming %s (%s)\n", ctdb_db->db_name,
1215                                    ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE ?
1216                                         "in recovery" : "frozen"));
1217                 tevent_add_timer(ctdb->ev, vacuum_handle,
1218                                  timeval_current_ofs(get_vacuum_interval(ctdb_db), 0),
1219                                  ctdb_vacuum_event, vacuum_handle);
1220                 return;
1221         }
1222
1223         /* Do not allow multiple vacuuming child processes to be active at the
1224          * same time.  If there is vacuuming child process active, delay
1225          * new vacuuming event to stagger vacuuming events.
1226          */
1227         if (ctdb->vacuumers != NULL) {
1228                 tevent_add_timer(ctdb->ev, vacuum_handle,
1229                                  timeval_current_ofs(0, 500*1000),
1230                                  ctdb_vacuum_event, vacuum_handle);
1231                 return;
1232         }
1233
1234         child_ctx = talloc(vacuum_handle, struct ctdb_vacuum_child_context);
1235         if (child_ctx == NULL) {
1236                 DEBUG(DEBUG_CRIT, (__location__ " Failed to allocate child context for vacuuming of %s\n", ctdb_db->db_name));
1237                 ctdb_fatal(ctdb, "Out of memory when crating vacuum child context. Shutting down\n");
1238         }
1239
1240
1241         ret = pipe(child_ctx->fd);
1242         if (ret != 0) {
1243                 talloc_free(child_ctx);
1244                 DEBUG(DEBUG_ERR, ("Failed to create pipe for vacuum child process.\n"));
1245                 tevent_add_timer(ctdb->ev, vacuum_handle,
1246                                  timeval_current_ofs(get_vacuum_interval(ctdb_db), 0),
1247                                  ctdb_vacuum_event, vacuum_handle);
1248                 return;
1249         }
1250
1251         if (vacuum_handle->fast_path_count >=
1252             ctdb->tunable.vacuum_fast_path_count) {
1253                 if (ctdb->tunable.vacuum_fast_path_count > 0) {
1254                         full_vacuum_run = true;
1255                 }
1256                 vacuum_handle->fast_path_count = 0;
1257         }
1258
1259         child_ctx->child_pid = ctdb_fork(ctdb);
1260         if (child_ctx->child_pid == (pid_t)-1) {
1261                 close(child_ctx->fd[0]);
1262                 close(child_ctx->fd[1]);
1263                 talloc_free(child_ctx);
1264                 DEBUG(DEBUG_ERR, ("Failed to fork vacuum child process.\n"));
1265                 tevent_add_timer(ctdb->ev, vacuum_handle,
1266                                  timeval_current_ofs(get_vacuum_interval(ctdb_db), 0),
1267                                  ctdb_vacuum_event, vacuum_handle);
1268                 return;
1269         }
1270
1271
1272         if (child_ctx->child_pid == 0) {
1273                 char cc = 0;
1274                 close(child_ctx->fd[0]);
1275
1276                 DEBUG(DEBUG_INFO,("Vacuuming child process %d for db %s started\n", getpid(), ctdb_db->db_name));
1277                 prctl_set_comment("ctdb_vacuum");
1278                 if (switch_from_server_to_client(ctdb) != 0) {
1279                         DEBUG(DEBUG_CRIT, (__location__ "ERROR: failed to switch vacuum daemon into client mode. Shutting down.\n"));
1280                         _exit(1);
1281                 }
1282
1283                 cc = ctdb_vacuum_and_repack_db(ctdb_db, full_vacuum_run);
1284
1285                 sys_write(child_ctx->fd[1], &cc, 1);
1286                 _exit(0);
1287         }
1288
1289         set_close_on_exec(child_ctx->fd[0]);
1290         close(child_ctx->fd[1]);
1291
1292         child_ctx->status = VACUUM_RUNNING;
1293         child_ctx->start_time = timeval_current();
1294
1295         DLIST_ADD(ctdb->vacuumers, child_ctx);
1296         talloc_set_destructor(child_ctx, vacuum_child_destructor);
1297
1298         /*
1299          * Clear the fastpath vacuuming list in the parent.
1300          */
1301         talloc_free(ctdb_db->delete_queue);
1302         ctdb_db->delete_queue = trbt_create(ctdb_db, 0);
1303         if (ctdb_db->delete_queue == NULL) {
1304                 /* fatal here? ... */
1305                 ctdb_fatal(ctdb, "Out of memory when re-creating vacuum tree "
1306                                  "in parent context. Shutting down\n");
1307         }
1308
1309         tevent_add_timer(ctdb->ev, child_ctx,
1310                          timeval_current_ofs(ctdb->tunable.vacuum_max_run_time, 0),
1311                          vacuum_child_timeout, child_ctx);
1312
1313         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to child vacuum process\n", child_ctx->fd[0]));
1314
1315         fde = tevent_add_fd(ctdb->ev, child_ctx, child_ctx->fd[0],
1316                             TEVENT_FD_READ, vacuum_child_handler, child_ctx);
1317         tevent_fd_set_auto_close(fde);
1318
1319         vacuum_handle->child_ctx = child_ctx;
1320         child_ctx->vacuum_handle = vacuum_handle;
1321 }
1322
1323 void ctdb_stop_vacuuming(struct ctdb_context *ctdb)
1324 {
1325         /* Simply free them all. */
1326         while (ctdb->vacuumers) {
1327                 DEBUG(DEBUG_INFO, ("Aborting vacuuming for %s (%i)\n",
1328                            ctdb->vacuumers->vacuum_handle->ctdb_db->db_name,
1329                            (int)ctdb->vacuumers->child_pid));
1330                 /* vacuum_child_destructor kills it, removes from list */
1331                 talloc_free(ctdb->vacuumers);
1332         }
1333 }
1334
1335 /* this function initializes the vacuuming context for a database
1336  * starts the vacuuming events
1337  */
1338 int ctdb_vacuum_init(struct ctdb_db_context *ctdb_db)
1339 {
1340         if (! ctdb_db_volatile(ctdb_db)) {
1341                 DEBUG(DEBUG_ERR,
1342                       ("Vacuuming is disabled for non-volatile database %s\n",
1343                        ctdb_db->db_name));
1344                 return 0;
1345         }
1346
1347         ctdb_db->vacuum_handle = talloc(ctdb_db, struct ctdb_vacuum_handle);
1348         CTDB_NO_MEMORY(ctdb_db->ctdb, ctdb_db->vacuum_handle);
1349
1350         ctdb_db->vacuum_handle->ctdb_db         = ctdb_db;
1351         ctdb_db->vacuum_handle->fast_path_count = 0;
1352
1353         tevent_add_timer(ctdb_db->ctdb->ev, ctdb_db->vacuum_handle,
1354                          timeval_current_ofs(get_vacuum_interval(ctdb_db), 0),
1355                          ctdb_vacuum_event, ctdb_db->vacuum_handle);
1356
1357         return 0;
1358 }
1359
1360 static void remove_record_from_delete_queue(struct ctdb_db_context *ctdb_db,
1361                                             const struct ctdb_ltdb_header *hdr,
1362                                             const TDB_DATA key)
1363 {
1364         struct delete_record_data *kd;
1365         uint32_t hash;
1366
1367         hash = (uint32_t)ctdb_hash(&key);
1368
1369         DEBUG(DEBUG_DEBUG, (__location__
1370                             " remove_record_from_delete_queue: "
1371                             "db[%s] "
1372                             "db_id[0x%08x] "
1373                             "key_hash[0x%08x] "
1374                             "lmaster[%u] "
1375                             "migrated_with_data[%s]\n",
1376                              ctdb_db->db_name, ctdb_db->db_id,
1377                              hash,
1378                              ctdb_lmaster(ctdb_db->ctdb, &key),
1379                              hdr->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA ? "yes" : "no"));
1380
1381         kd = (struct delete_record_data *)trbt_lookup32(ctdb_db->delete_queue, hash);
1382         if (kd == NULL) {
1383                 DEBUG(DEBUG_DEBUG, (__location__
1384                                     " remove_record_from_delete_queue: "
1385                                     "record not in queue (hash[0x%08x])\n.",
1386                                     hash));
1387                 return;
1388         }
1389
1390         if ((kd->key.dsize != key.dsize) ||
1391             (memcmp(kd->key.dptr, key.dptr, key.dsize) != 0))
1392         {
1393                 DEBUG(DEBUG_DEBUG, (__location__
1394                                     " remove_record_from_delete_queue: "
1395                                     "hash collision for key with hash[0x%08x] "
1396                                     "in db[%s] - skipping\n",
1397                                     hash, ctdb_db->db_name));
1398                 return;
1399         }
1400
1401         DEBUG(DEBUG_DEBUG, (__location__
1402                             " remove_record_from_delete_queue: "
1403                             "removing key with hash[0x%08x]\n",
1404                              hash));
1405
1406         talloc_free(kd);
1407
1408         return;
1409 }
1410
1411 /**
1412  * Insert a record into the ctdb_db context's delete queue,
1413  * handling hash collisions.
1414  */
1415 static int insert_record_into_delete_queue(struct ctdb_db_context *ctdb_db,
1416                                            const struct ctdb_ltdb_header *hdr,
1417                                            TDB_DATA key)
1418 {
1419         struct delete_record_data *kd;
1420         uint32_t hash;
1421         int ret;
1422
1423         hash = (uint32_t)ctdb_hash(&key);
1424
1425         DEBUG(DEBUG_DEBUG, (__location__ " schedule for deletion: db[%s] "
1426                             "db_id[0x%08x] "
1427                             "key_hash[0x%08x] "
1428                             "lmaster[%u] "
1429                             "migrated_with_data[%s]\n",
1430                             ctdb_db->db_name, ctdb_db->db_id,
1431                             hash,
1432                             ctdb_lmaster(ctdb_db->ctdb, &key),
1433                             hdr->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA ? "yes" : "no"));
1434
1435         kd = (struct delete_record_data *)trbt_lookup32(ctdb_db->delete_queue, hash);
1436         if (kd != NULL) {
1437                 if ((kd->key.dsize != key.dsize) ||
1438                     (memcmp(kd->key.dptr, key.dptr, key.dsize) != 0))
1439                 {
1440                         DEBUG(DEBUG_INFO,
1441                               (__location__ " schedule for deletion: "
1442                                "hash collision for key hash [0x%08x]. "
1443                                "Skipping the record.\n", hash));
1444                         return 0;
1445                 } else {
1446                         DEBUG(DEBUG_DEBUG,
1447                               (__location__ " schedule for deletion: "
1448                                "updating entry for key with hash [0x%08x].\n",
1449                                hash));
1450                 }
1451         }
1452
1453         ret = insert_delete_record_data_into_tree(ctdb_db->ctdb, ctdb_db,
1454                                                   ctdb_db->delete_queue,
1455                                                   hdr, key);
1456         if (ret != 0) {
1457                 DEBUG(DEBUG_INFO,
1458                       (__location__ " schedule for deletion: error "
1459                        "inserting key with hash [0x%08x] into delete queue\n",
1460                        hash));
1461                 return -1;
1462         }
1463
1464         return 0;
1465 }
1466
1467 /**
1468  * Schedule a record for deletetion.
1469  * Called from the parent context.
1470  */
1471 int32_t ctdb_control_schedule_for_deletion(struct ctdb_context *ctdb,
1472                                            TDB_DATA indata)
1473 {
1474         struct ctdb_control_schedule_for_deletion *dd;
1475         struct ctdb_db_context *ctdb_db;
1476         int ret;
1477         TDB_DATA key;
1478
1479         dd = (struct ctdb_control_schedule_for_deletion *)indata.dptr;
1480
1481         ctdb_db = find_ctdb_db(ctdb, dd->db_id);
1482         if (ctdb_db == NULL) {
1483                 DEBUG(DEBUG_ERR, (__location__ " Unknown db id 0x%08x\n",
1484                                   dd->db_id));
1485                 return -1;
1486         }
1487
1488         key.dsize = dd->keylen;
1489         key.dptr = dd->key;
1490
1491         ret = insert_record_into_delete_queue(ctdb_db, &dd->hdr, key);
1492
1493         return ret;
1494 }
1495
1496 int32_t ctdb_local_schedule_for_deletion(struct ctdb_db_context *ctdb_db,
1497                                          const struct ctdb_ltdb_header *hdr,
1498                                          TDB_DATA key)
1499 {
1500         int ret;
1501         struct ctdb_control_schedule_for_deletion *dd;
1502         TDB_DATA indata;
1503         int32_t status;
1504
1505         if (ctdb_db->ctdb->ctdbd_pid == getpid()) {
1506                 /* main daemon - directly queue */
1507                 ret = insert_record_into_delete_queue(ctdb_db, hdr, key);
1508
1509                 return ret;
1510         }
1511
1512         /* if we don't have a connection to the daemon we can not send
1513            a control. For example sometimes from update_record control child
1514            process.
1515         */
1516         if (!ctdb_db->ctdb->can_send_controls) {
1517                 return -1;
1518         }
1519
1520
1521         /* child process: send the main daemon a control */
1522         indata.dsize = offsetof(struct ctdb_control_schedule_for_deletion, key) + key.dsize;
1523         indata.dptr = talloc_zero_array(ctdb_db, uint8_t, indata.dsize);
1524         if (indata.dptr == NULL) {
1525                 DEBUG(DEBUG_ERR, (__location__ " out of memory\n"));
1526                 return -1;
1527         }
1528         dd = (struct ctdb_control_schedule_for_deletion *)(void *)indata.dptr;
1529         dd->db_id = ctdb_db->db_id;
1530         dd->hdr = *hdr;
1531         dd->keylen = key.dsize;
1532         memcpy(dd->key, key.dptr, key.dsize);
1533
1534         ret = ctdb_control(ctdb_db->ctdb,
1535                            CTDB_CURRENT_NODE,
1536                            ctdb_db->db_id,
1537                            CTDB_CONTROL_SCHEDULE_FOR_DELETION,
1538                            CTDB_CTRL_FLAG_NOREPLY, /* flags */
1539                            indata,
1540                            NULL, /* mem_ctx */
1541                            NULL, /* outdata */
1542                            &status,
1543                            NULL, /* timeout : NULL == wait forever */
1544                            NULL); /* error message */
1545
1546         talloc_free(indata.dptr);
1547
1548         if (ret != 0 || status != 0) {
1549                 DEBUG(DEBUG_ERR, (__location__ " Error sending "
1550                                   "SCHEDULE_FOR_DELETION "
1551                                   "control.\n"));
1552                 if (status != 0) {
1553                         ret = -1;
1554                 }
1555         }
1556
1557         return ret;
1558 }
1559
1560 void ctdb_local_remove_from_delete_queue(struct ctdb_db_context *ctdb_db,
1561                                          const struct ctdb_ltdb_header *hdr,
1562                                          const TDB_DATA key)
1563 {
1564         if (ctdb_db->ctdb->ctdbd_pid != getpid()) {
1565                 /*
1566                  * Only remove the record from the delete queue if called
1567                  * in the main daemon.
1568                  */
1569                 return;
1570         }
1571
1572         remove_record_from_delete_queue(ctdb_db, hdr, key);
1573
1574         return;
1575 }