b21e0954cd427773fa0fd8c985e81eb397560571
[metze/samba/wip.git] / ctdb / server / ctdb_vacuum.c
1 /*
2    ctdb vacuuming events
3
4    Copyright (C) Ronnie Sahlberg  2009
5    Copyright (C) Michael Adam 2010-2013
6    Copyright (C) Stefan Metzmacher 2010-2011
7
8    This program is free software; you can redistribute it and/or modify
9    it under the terms of the GNU General Public License as published by
10    the Free Software Foundation; either version 3 of the License, or
11    (at your option) any later version.
12
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, see <http://www.gnu.org/licenses/>.
20 */
21
22 #include "replace.h"
23 #include "system/network.h"
24 #include "system/filesys.h"
25 #include "system/time.h"
26
27 #include <talloc.h>
28 #include <tevent.h>
29
30 #include "lib/tdb_wrap/tdb_wrap.h"
31 #include "lib/util/dlinklist.h"
32 #include "lib/util/debug.h"
33 #include "lib/util/samba_util.h"
34 #include "lib/util/sys_rw.h"
35 #include "lib/util/util_process.h"
36
37 #include "ctdb_private.h"
38 #include "ctdb_client.h"
39
40 #include "common/rb_tree.h"
41 #include "common/common.h"
42 #include "common/logging.h"
43
44 #include "protocol/protocol_api.h"
45
/*
 * Expands to timeval_current_ofs(10, 0).
 * NOTE(review): presumably "now + 10 seconds", used as a control timeout;
 * confirm against timeval_current_ofs() and the call sites.
 */
#define TIMELIMIT() timeval_current_ofs(10, 0)
47
/* Status of a vacuuming child, stored in ctdb_vacuum_child_context.status. */
enum vacuum_child_status { VACUUM_RUNNING, VACUUM_OK, VACUUM_ERROR, VACUUM_TIMEOUT};
49
/*
 * Bookkeeping for one vacuuming child process.
 * NOTE(review): the code that forks the child and fills these fields is
 * not part of this section; field notes below are based on names only.
 */
struct ctdb_vacuum_child_context {
        /* handle of the vacuuming run this child belongs to */
        struct ctdb_vacuum_handle *vacuum_handle;
        /* fd child writes status to */
        int fd[2];
        /* presumably the pid of the forked child - confirm at the fork site */
        pid_t child_pid;
        /* one of enum vacuum_child_status */
        enum vacuum_child_status status;
        /* presumably the time the child was started - confirm */
        struct timeval start_time;
};
58
/* Per-database vacuuming handle. */
struct ctdb_vacuum_handle {
        /* database this handle refers to */
        struct ctdb_db_context *ctdb_db;
        /*
         * NOTE(review): presumably counts fast-path runs so that a full
         * database traverse happens only every VacuumFastPathCount runs;
         * confirm where this counter is updated.
         */
        uint32_t fast_path_count;
};
63
64
/*
 * State of one vacuuming run on a database: the lists of records that
 * may be deleted plus the statistics gathered in each phase.
 */
struct vacuum_data {
        struct ctdb_context *ctdb;
        struct ctdb_db_context *ctdb_db;
        /* NOTE(review): not referenced in this section; purpose to be confirmed */
        struct tdb_context *dest_db;
        /* tree of struct delete_record_data, keyed by the 32-bit key hash */
        trbt_tree_t *delete_list;
        /* array of marshall buffers, indexed by the lmaster of the record */
        struct ctdb_marshall_buffer **vacuum_fetch_list;
        /* NOTE(review): not referenced in this section; presumably run start time */
        struct timeval start;
        /* set when adding a record to a vacuum fetch list fails */
        bool traverse_error;
        /* NOTE(review): not referenced in this section; purpose to be confirmed */
        bool vacuum;
        struct {
                /* statistics of the delete_queue (fast path) traverse */
                struct {
                        uint32_t added_to_vacuum_fetch_list;
                        uint32_t added_to_delete_list;
                        uint32_t deleted;
                        uint32_t skipped;
                        uint32_t error;
                        uint32_t total;
                } delete_queue;
                /* statistics of the read-only traverse of the whole database */
                struct {
                        uint32_t scheduled;
                        uint32_t skipped;
                        uint32_t error;
                        uint32_t total;
                } db_traverse;
                /* statistics of the processing of the delete_list */
                struct {
                        uint32_t total;
                        uint32_t remote_error;
                        uint32_t local_error;
                        uint32_t deleted;
                        uint32_t skipped;
                        /* entries of the delete_list not yet processed */
                        uint32_t left;
                } delete_list;
                /* NOTE(review): not referenced in this section */
                struct {
                        uint32_t vacuumed;
                        uint32_t copied;
                } repack;
        } count;
};
104
/*
 * this structure contains the information for one record to be deleted
 *
 * Instances are allocated with room for the whole key after the fixed
 * part (offsetof(keydata) + key.dsize), so keydata[] is really a
 * variable-length trailing array and key.dptr points into it.
 */
struct delete_record_data {
        struct ctdb_context *ctdb;
        struct ctdb_db_context *ctdb_db;
        /* copy of the record header taken when the record was queued */
        struct ctdb_ltdb_header hdr;
        /* starts at 0; a record with a count > 0 is not deleted locally */
        uint32_t remote_fail_count;
        /* key of the record; dptr refers to keydata below */
        TDB_DATA key;
        /* first byte of the inline copy of the key */
        uint8_t keydata[1];
};
114
/*
 * Accumulator used while marshalling the delete list:
 * delete_marshall_traverse() appends one record per tree entry.
 */
struct delete_records_list {
        /* marshall buffer the records are appended to (may be reallocated) */
        struct ctdb_marshall_buffer *records;
        /* the vacuuming run this list belongs to */
        struct vacuum_data *vdata;
};
119
/*
 * One entry of the fetch queue: the key of a record to be migrated.
 * NOTE(review): the allocation is not part of this section; presumably
 * the key is stored inline in keydata[] as in struct delete_record_data.
 */
struct fetch_record_data {
        TDB_DATA key;
        uint8_t keydata[1];
};
124
/*
 * Forward declaration; the definition is not part of this section.
 * vacuum_traverse() calls it and treats a non-zero return as an error.
 */
static int insert_record_into_delete_queue(struct ctdb_db_context *ctdb_db,
                                           const struct ctdb_ltdb_header *hdr,
                                           TDB_DATA key);
128
129 /**
130  * Store key and header in a tree, indexed by the key hash.
131  */
132 static int insert_delete_record_data_into_tree(struct ctdb_context *ctdb,
133                                                struct ctdb_db_context *ctdb_db,
134                                                trbt_tree_t *tree,
135                                                const struct ctdb_ltdb_header *hdr,
136                                                TDB_DATA key)
137 {
138         struct delete_record_data *dd;
139         uint32_t hash;
140         size_t len;
141
142         len = offsetof(struct delete_record_data, keydata) + key.dsize;
143
144         dd = (struct delete_record_data *)talloc_size(tree, len);
145         if (dd == NULL) {
146                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
147                 return -1;
148         }
149         talloc_set_name_const(dd, "struct delete_record_data");
150
151         dd->ctdb      = ctdb;
152         dd->ctdb_db   = ctdb_db;
153         dd->key.dsize = key.dsize;
154         dd->key.dptr  = dd->keydata;
155         memcpy(dd->keydata, key.dptr, key.dsize);
156
157         dd->hdr = *hdr;
158         dd->remote_fail_count = 0;
159
160         hash = ctdb_hash(&key);
161
162         trbt_insert32(tree, hash, dd);
163
164         return 0;
165 }
166
167 static int add_record_to_delete_list(struct vacuum_data *vdata, TDB_DATA key,
168                                      struct ctdb_ltdb_header *hdr)
169 {
170         struct ctdb_context *ctdb = vdata->ctdb;
171         struct ctdb_db_context *ctdb_db = vdata->ctdb_db;
172         uint32_t hash;
173         int ret;
174
175         hash = ctdb_hash(&key);
176
177         if (trbt_lookup32(vdata->delete_list, hash)) {
178                 DEBUG(DEBUG_INFO, (__location__ " Hash collision when vacuuming, skipping this record.\n"));
179                 return 0;
180         }
181
182         ret = insert_delete_record_data_into_tree(ctdb, ctdb_db,
183                                                   vdata->delete_list,
184                                                   hdr, key);
185         if (ret != 0) {
186                 return -1;
187         }
188
189         vdata->count.delete_list.total++;
190
191         return 0;
192 }
193
194 /**
195  * Add a record to the list of records to be sent
196  * to their lmaster with VACUUM_FETCH.
197  */
198 static int add_record_to_vacuum_fetch_list(struct vacuum_data *vdata,
199                                            TDB_DATA key)
200 {
201         struct ctdb_context *ctdb = vdata->ctdb;
202         uint32_t lmaster;
203         struct ctdb_marshall_buffer *vfl;
204
205         lmaster = ctdb_lmaster(ctdb, &key);
206
207         vfl = vdata->vacuum_fetch_list[lmaster];
208
209         vfl = ctdb_marshall_add(ctdb, vfl, vfl->db_id, ctdb->pnn,
210                                 key, NULL, tdb_null);
211         if (vfl == NULL) {
212                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
213                 vdata->traverse_error = true;
214                 return -1;
215         }
216
217         vdata->vacuum_fetch_list[lmaster] = vfl;
218
219         return 0;
220 }
221
222
/*
 * Forward declaration of a tevent timer handler; the definition is not
 * part of this section.
 */
static void ctdb_vacuum_event(struct tevent_context *ev,
                              struct tevent_timer *te,
                              struct timeval t, void *private_data);
226
227 static int vacuum_record_parser(TDB_DATA key, TDB_DATA data, void *private_data)
228 {
229         struct ctdb_ltdb_header *header =
230                 (struct ctdb_ltdb_header *)private_data;
231
232         if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
233                 return -1;
234         }
235
236         *header = *(struct ctdb_ltdb_header *)data.dptr;
237
238         return 0;
239 }
240
241 /*
242  * traverse function for gathering the records that can be deleted
243  */
244 static int vacuum_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data,
245                            void *private_data)
246 {
247         struct vacuum_data *vdata = talloc_get_type(private_data,
248                                                     struct vacuum_data);
249         struct ctdb_context *ctdb = vdata->ctdb;
250         struct ctdb_db_context *ctdb_db = vdata->ctdb_db;
251         uint32_t lmaster;
252         struct ctdb_ltdb_header *hdr;
253         int res = 0;
254
255         vdata->count.db_traverse.total++;
256
257         lmaster = ctdb_lmaster(ctdb, &key);
258         if (lmaster >= ctdb->num_nodes) {
259                 vdata->count.db_traverse.error++;
260                 DEBUG(DEBUG_CRIT, (__location__
261                                    " lmaster[%u] >= ctdb->num_nodes[%u] for key"
262                                    " with hash[%u]!\n",
263                                    (unsigned)lmaster,
264                                    (unsigned)ctdb->num_nodes,
265                                    (unsigned)ctdb_hash(&key)));
266                 return -1;
267         }
268
269         if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
270                 /* it is not a deleted record */
271                 vdata->count.db_traverse.skipped++;
272                 return 0;
273         }
274
275         hdr = (struct ctdb_ltdb_header *)data.dptr;
276
277         if (hdr->dmaster != ctdb->pnn) {
278                 vdata->count.db_traverse.skipped++;
279                 return 0;
280         }
281
282         /*
283          * Add the record to this process's delete_queue for processing
284          * in the subsequent traverse in the fast vacuum run.
285          */
286         res = insert_record_into_delete_queue(ctdb_db, hdr, key);
287         if (res != 0) {
288                 vdata->count.db_traverse.error++;
289         } else {
290                 vdata->count.db_traverse.scheduled++;
291         }
292
293         return 0;
294 }
295
296 /*
297  * traverse the tree of records to delete and marshall them into
298  * a blob
299  */
300 static int delete_marshall_traverse(void *param, void *data)
301 {
302         struct delete_record_data *dd = talloc_get_type(data, struct delete_record_data);
303         struct delete_records_list *recs = talloc_get_type(param, struct delete_records_list);
304         struct ctdb_marshall_buffer *m;
305
306         m = ctdb_marshall_add(recs, recs->records, recs->records->db_id,
307                               recs->records->db_id,
308                               dd->key, &dd->hdr, tdb_null);
309         if (m == NULL) {
310                 DEBUG(DEBUG_ERR, (__location__ " failed to marshall record\n"));
311                 return -1;
312         }
313
314         recs->records = m;
315         return 0;
316 }
317
/* State shared by all migrations started from one fetch queue run. */
struct fetch_queue_state {
        struct ctdb_db_context *ctdb_db;
        /* number of migration calls sent whose callback has not run yet */
        int count;
};
322
/* Per-record state of one migration call started by fetch_queue_traverse(). */
struct fetch_record_migrate_state {
        /* the run this migration belongs to (holds the pending counter) */
        struct fetch_queue_state *fetch_queue;
        /* copy of the record key, allocated as talloc child of this state */
        TDB_DATA key;
};
327
328 static void fetch_record_migrate_callback(struct ctdb_client_call_state *state)
329 {
330         struct fetch_record_migrate_state *fetch = talloc_get_type_abort(
331                 state->async.private_data, struct fetch_record_migrate_state);
332         struct fetch_queue_state *fetch_queue = fetch->fetch_queue;
333         struct ctdb_ltdb_header hdr;
334         struct ctdb_call call = { 0 };
335         int ret;
336
337         ret = ctdb_call_recv(state, &call);
338         fetch_queue->count--;
339         if (ret != 0) {
340                 D_ERR("Failed to migrate record for vacuuming\n");
341                 goto done;
342         }
343
344         ret = tdb_chainlock_nonblock(fetch_queue->ctdb_db->ltdb->tdb,
345                                      fetch->key);
346         if (ret != 0) {
347                 goto done;
348         }
349
350         ret = tdb_parse_record(fetch_queue->ctdb_db->ltdb->tdb,
351                                fetch->key,
352                                vacuum_record_parser,
353                                &hdr);
354
355         tdb_chainunlock(fetch_queue->ctdb_db->ltdb->tdb, fetch->key);
356
357         if (ret != 0) {
358                 goto done;
359         }
360
361         D_INFO("Vacuum Fetch record, key=%.*s\n",
362                (int)fetch->key.dsize,
363                fetch->key.dptr);
364
365         (void) ctdb_local_schedule_for_deletion(fetch_queue->ctdb_db,
366                                                 &hdr,
367                                                 fetch->key);
368
369 done:
370         talloc_free(fetch);
371 }
372
373 static int fetch_record_parser(TDB_DATA key, TDB_DATA data, void *private_data)
374 {
375         struct ctdb_ltdb_header *header =
376                 (struct ctdb_ltdb_header *)private_data;
377
378         if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
379                 return -1;
380         }
381
382         memcpy(header, data.dptr, sizeof(*header));
383         return 0;
384 }
385
386 /**
387  * traverse function for the traversal of the fetch_queue.
388  *
389  * Send a record migration request.
390  */
391 static int fetch_queue_traverse(void *param, void *data)
392 {
393         struct fetch_record_data *rd = talloc_get_type_abort(
394                 data, struct fetch_record_data);
395         struct fetch_queue_state *fetch_queue =
396                 (struct fetch_queue_state *)param;
397         struct ctdb_db_context *ctdb_db = fetch_queue->ctdb_db;
398         struct ctdb_client_call_state *state;
399         struct fetch_record_migrate_state *fetch;
400         struct ctdb_call call = { 0 };
401         struct ctdb_ltdb_header header;
402         int ret;
403
404         ret = tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, rd->key);
405         if (ret != 0) {
406                 return 0;
407         }
408
409         ret = tdb_parse_record(ctdb_db->ltdb->tdb,
410                                rd->key,
411                                fetch_record_parser,
412                                &header);
413
414         tdb_chainunlock(ctdb_db->ltdb->tdb, rd->key);
415
416         if (ret != 0) {
417                 goto skipped;
418         }
419
420         if (header.dmaster == ctdb_db->ctdb->pnn) {
421                 /* If the record is already migrated, skip */
422                 goto skipped;
423         }
424
425         fetch = talloc_zero(ctdb_db, struct fetch_record_migrate_state);
426         if (fetch == NULL) {
427                 D_ERR("Failed to setup fetch record migrate state\n");
428                 return 0;
429         }
430
431         fetch->fetch_queue = fetch_queue;
432
433         fetch->key.dsize = rd->key.dsize;
434         fetch->key.dptr = talloc_memdup(fetch, rd->key.dptr, rd->key.dsize);
435         if (fetch->key.dptr == NULL) {
436                 D_ERR("Memory error in fetch_queue_traverse\n");
437                 talloc_free(fetch);
438                 return 0;
439         }
440
441         call.call_id = CTDB_NULL_FUNC;
442         call.flags = CTDB_IMMEDIATE_MIGRATION |
443                      CTDB_CALL_FLAG_VACUUM_MIGRATION;
444         call.key = fetch->key;
445
446         state = ctdb_call_send(ctdb_db, &call);
447         if (state == NULL) {
448                 DEBUG(DEBUG_ERR, ("Failed to setup vacuum fetch call\n"));
449                 talloc_free(fetch);
450                 return 0;
451         }
452
453         state->async.fn = fetch_record_migrate_callback;
454         state->async.private_data = fetch;
455
456         fetch_queue->count++;
457
458         return 0;
459
460 skipped:
461         D_INFO("Skipped Fetch record, key=%.*s\n",
462                (int)rd->key.dsize,
463                rd->key.dptr);
464         return 0;
465 }
466
467 /**
468  * Traverse the fetch.
469  * Records are migrated to the local node and
470  * added to delete queue for further processing.
471  */
472 static void ctdb_process_fetch_queue(struct ctdb_db_context *ctdb_db)
473 {
474         struct fetch_queue_state state;
475         int ret;
476
477         state.ctdb_db = ctdb_db;
478         state.count = 0;
479
480         ret = trbt_traversearray32(ctdb_db->fetch_queue, 1,
481                                    fetch_queue_traverse, &state);
482         if (ret != 0) {
483                 DEBUG(DEBUG_ERR, (__location__ " Error traversing "
484                       "the fetch queue.\n"));
485         }
486
487         /* Wait for all migrations to complete */
488         while (state.count > 0) {
489                 tevent_loop_once(ctdb_db->ctdb->ev);
490         }
491 }
492
493 /**
494  * traverse function for the traversal of the delete_queue,
495  * the fast-path vacuuming list.
496  *
497  *  - If the record has been migrated off the node
498  *    or has been revived (filled with data) on the node,
499  *    then skip the record.
500  *
501  *  - If the current node is the record's lmaster and it is
502  *    a record that has never been migrated with data, then
503  *    delete the record from the local tdb.
504  *
505  *  - If the current node is the record's lmaster and it has
506  *    been migrated with data, then schedule it for the normal
507  *    vacuuming procedure (i.e. add it to the delete_list).
508  *
509  *  - If the current node is NOT the record's lmaster then
510  *    add it to the list of records that are to be sent to
511  *    the lmaster with the VACUUM_FETCH message.
512  */
513 static int delete_queue_traverse(void *param, void *data)
514 {
515         struct delete_record_data *dd =
516                 talloc_get_type(data, struct delete_record_data);
517         struct vacuum_data *vdata = talloc_get_type(param, struct vacuum_data);
518         struct ctdb_db_context *ctdb_db = dd->ctdb_db;
519         struct ctdb_context *ctdb = ctdb_db->ctdb; /* or dd->ctdb ??? */
520         int res;
521         struct ctdb_ltdb_header header;
522         uint32_t lmaster;
523         uint32_t hash = ctdb_hash(&(dd->key));
524
525         vdata->count.delete_queue.total++;
526
527         res = tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, dd->key);
528         if (res != 0) {
529                 vdata->count.delete_queue.error++;
530                 return 0;
531         }
532
533         res = tdb_parse_record(ctdb_db->ltdb->tdb, dd->key,
534                                vacuum_record_parser, &header);
535         if (res != 0) {
536                 goto skipped;
537         }
538
539         if (header.dmaster != ctdb->pnn) {
540                 /* The record has been migrated off the node. Skip. */
541                 goto skipped;
542         }
543
544         if (header.rsn != dd->hdr.rsn) {
545                 /*
546                  * The record has been migrated off the node and back again.
547                  * But not requeued for deletion. Skip it.
548                  */
549                 goto skipped;
550         }
551
552         /*
553          * We are dmaster, and the record has no data, and it has
554          * not been migrated after it has been queued for deletion.
555          *
556          * At this stage, the record could still have been revived locally
557          * and last been written with empty data. This can only be
558          * fixed with the addition of an active or delete flag. (TODO)
559          */
560
561         lmaster = ctdb_lmaster(ctdb_db->ctdb, &dd->key);
562
563         if (lmaster != ctdb->pnn) {
564                 res = add_record_to_vacuum_fetch_list(vdata, dd->key);
565
566                 if (res != 0) {
567                         DEBUG(DEBUG_ERR,
568                               (__location__ " Error adding record to list "
569                                "of records to send to lmaster.\n"));
570                         vdata->count.delete_queue.error++;
571                 } else {
572                         vdata->count.delete_queue.added_to_vacuum_fetch_list++;
573                 }
574                 goto done;
575         }
576
577         /* use header->flags or dd->hdr.flags ?? */
578         if (dd->hdr.flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) {
579                 res = add_record_to_delete_list(vdata, dd->key, &dd->hdr);
580
581                 if (res != 0) {
582                         DEBUG(DEBUG_ERR,
583                               (__location__ " Error adding record to list "
584                                "of records for deletion on lmaster.\n"));
585                         vdata->count.delete_queue.error++;
586                 } else {
587                         vdata->count.delete_queue.added_to_delete_list++;
588                 }
589         } else {
590                 res = tdb_delete(ctdb_db->ltdb->tdb, dd->key);
591
592                 if (res != 0) {
593                         DEBUG(DEBUG_ERR,
594                               (__location__ " Error deleting record with key "
595                                "hash [0x%08x] from local data base db[%s].\n",
596                                hash, ctdb_db->db_name));
597                         vdata->count.delete_queue.error++;
598                         goto done;
599                 }
600
601                 DEBUG(DEBUG_DEBUG,
602                       (__location__ " Deleted record with key hash "
603                        "[0x%08x] from local data base db[%s].\n",
604                        hash, ctdb_db->db_name));
605                 vdata->count.delete_queue.deleted++;
606         }
607
608         goto done;
609
610 skipped:
611         vdata->count.delete_queue.skipped++;
612
613 done:
614         tdb_chainunlock(ctdb_db->ltdb->tdb, dd->key);
615
616         return 0;
617 }
618
619 /**
620  * Delete the records that we are lmaster and dmaster for and
621  * that could be deleted on all other nodes via the TRY_DELETE_RECORDS
622  * control.
623  */
624 static int delete_record_traverse(void *param, void *data)
625 {
626         struct delete_record_data *dd =
627                 talloc_get_type(data, struct delete_record_data);
628         struct vacuum_data *vdata = talloc_get_type(param, struct vacuum_data);
629         struct ctdb_db_context *ctdb_db = dd->ctdb_db;
630         struct ctdb_context *ctdb = ctdb_db->ctdb;
631         int res;
632         struct ctdb_ltdb_header header;
633         uint32_t lmaster;
634         uint32_t hash = ctdb_hash(&(dd->key));
635
636         if (dd->remote_fail_count > 0) {
637                 vdata->count.delete_list.remote_error++;
638                 vdata->count.delete_list.left--;
639                 talloc_free(dd);
640                 return 0;
641         }
642
643         res = tdb_chainlock(ctdb_db->ltdb->tdb, dd->key);
644         if (res != 0) {
645                 DEBUG(DEBUG_ERR,
646                       (__location__ " Error getting chainlock on record with "
647                        "key hash [0x%08x] on database db[%s].\n",
648                        hash, ctdb_db->db_name));
649                 vdata->count.delete_list.local_error++;
650                 vdata->count.delete_list.left--;
651                 talloc_free(dd);
652                 return 0;
653         }
654
655         /*
656          * Verify that the record is still empty, its RSN has not
657          * changed and that we are still its lmaster and dmaster.
658          */
659
660         res = tdb_parse_record(ctdb_db->ltdb->tdb, dd->key,
661                                vacuum_record_parser, &header);
662         if (res != 0) {
663                 goto skip;
664         }
665
666         if (header.flags & CTDB_REC_RO_FLAGS) {
667                 DEBUG(DEBUG_INFO, (__location__ ": record with hash [0x%08x] "
668                                    "on database db[%s] has read-only flags. "
669                                    "skipping.\n",
670                                    hash, ctdb_db->db_name));
671                 goto skip;
672         }
673
674         if (header.dmaster != ctdb->pnn) {
675                 DEBUG(DEBUG_INFO, (__location__ ": record with hash [0x%08x] "
676                                    "on database db[%s] has been migrated away. "
677                                    "skipping.\n",
678                                    hash, ctdb_db->db_name));
679                 goto skip;
680         }
681
682         if (header.rsn != dd->hdr.rsn) {
683                 /*
684                  * The record has been migrated off the node and back again.
685                  * But not requeued for deletion. Skip it.
686                  */
687                 DEBUG(DEBUG_INFO, (__location__ ": record with hash [0x%08x] "
688                                    "on database db[%s] seems to have been "
689                                    "migrated away and back again (with empty "
690                                    "data). skipping.\n",
691                                    hash, ctdb_db->db_name));
692                 goto skip;
693         }
694
695         lmaster = ctdb_lmaster(ctdb_db->ctdb, &dd->key);
696
697         if (lmaster != ctdb->pnn) {
698                 DEBUG(DEBUG_INFO, (__location__ ": not lmaster for record in "
699                                    "delete list (key hash [0x%08x], db[%s]). "
700                                    "Strange! skipping.\n",
701                                    hash, ctdb_db->db_name));
702                 goto skip;
703         }
704
705         res = tdb_delete(ctdb_db->ltdb->tdb, dd->key);
706
707         if (res != 0) {
708                 DEBUG(DEBUG_ERR,
709                       (__location__ " Error deleting record with key hash "
710                        "[0x%08x] from local data base db[%s].\n",
711                        hash, ctdb_db->db_name));
712                 vdata->count.delete_list.local_error++;
713                 goto done;
714         }
715
716         DEBUG(DEBUG_DEBUG,
717               (__location__ " Deleted record with key hash [0x%08x] from "
718                "local data base db[%s].\n", hash, ctdb_db->db_name));
719
720         vdata->count.delete_list.deleted++;
721         goto done;
722
723 skip:
724         vdata->count.delete_list.skipped++;
725
726 done:
727         tdb_chainunlock(ctdb_db->ltdb->tdb, dd->key);
728
729         talloc_free(dd);
730         vdata->count.delete_list.left--;
731
732         return 0;
733 }
734
735 /**
736  * Traverse the delete_queue.
737  * Records are either deleted directly or filled
738  * into the delete list or the vacuum fetch lists
739  * for further processing.
740  */
741 static void ctdb_process_delete_queue(struct ctdb_db_context *ctdb_db,
742                                       struct vacuum_data *vdata)
743 {
744         uint32_t sum;
745         int ret;
746
747         ret = trbt_traversearray32(ctdb_db->delete_queue, 1,
748                                    delete_queue_traverse, vdata);
749
750         if (ret != 0) {
751                 DEBUG(DEBUG_ERR, (__location__ " Error traversing "
752                       "the delete queue.\n"));
753         }
754
755         sum = vdata->count.delete_queue.deleted
756             + vdata->count.delete_queue.skipped
757             + vdata->count.delete_queue.error
758             + vdata->count.delete_queue.added_to_delete_list
759             + vdata->count.delete_queue.added_to_vacuum_fetch_list;
760
761         if (vdata->count.delete_queue.total != sum) {
762                 DEBUG(DEBUG_ERR, (__location__ " Inconsistency in fast vacuum "
763                       "counts for db[%s]: total[%u] != sum[%u]\n",
764                       ctdb_db->db_name,
765                       (unsigned)vdata->count.delete_queue.total,
766                       (unsigned)sum));
767         }
768
769         if (vdata->count.delete_queue.total > 0) {
770                 DEBUG(DEBUG_INFO,
771                       (__location__
772                        " fast vacuuming delete_queue traverse statistics: "
773                        "db[%s] "
774                        "total[%u] "
775                        "del[%u] "
776                        "skp[%u] "
777                        "err[%u] "
778                        "adl[%u] "
779                        "avf[%u]\n",
780                        ctdb_db->db_name,
781                        (unsigned)vdata->count.delete_queue.total,
782                        (unsigned)vdata->count.delete_queue.deleted,
783                        (unsigned)vdata->count.delete_queue.skipped,
784                        (unsigned)vdata->count.delete_queue.error,
785                        (unsigned)vdata->count.delete_queue.added_to_delete_list,
786                        (unsigned)vdata->count.delete_queue.added_to_vacuum_fetch_list));
787         }
788
789         return;
790 }
791
792 /**
793  * read-only traverse of the database, looking for records that
794  * might be able to be vacuumed.
795  *
796  * This is not done each time but only every tunable
797  * VacuumFastPathCount times.
798  */
799 static void ctdb_vacuum_traverse_db(struct ctdb_db_context *ctdb_db,
800                                     struct vacuum_data *vdata)
801 {
802         int ret;
803
804         ret = tdb_traverse_read(ctdb_db->ltdb->tdb, vacuum_traverse, vdata);
805         if (ret == -1 || vdata->traverse_error) {
806                 DEBUG(DEBUG_ERR, (__location__ " Traverse error in vacuuming "
807                                   "'%s'\n", ctdb_db->db_name));
808                 return;
809         }
810
811         if (vdata->count.db_traverse.total > 0) {
812                 DEBUG(DEBUG_INFO,
813                       (__location__
814                        " full vacuuming db traverse statistics: "
815                        "db[%s] "
816                        "total[%u] "
817                        "skp[%u] "
818                        "err[%u] "
819                        "sched[%u]\n",
820                        ctdb_db->db_name,
821                        (unsigned)vdata->count.db_traverse.total,
822                        (unsigned)vdata->count.db_traverse.skipped,
823                        (unsigned)vdata->count.db_traverse.error,
824                        (unsigned)vdata->count.db_traverse.scheduled));
825         }
826
827         return;
828 }
829
830 /**
831  * Process the vacuum fetch lists:
832  * For records for which we are not the lmaster, tell the lmaster to
833  * fetch the record.
834  */
835 static void ctdb_process_vacuum_fetch_lists(struct ctdb_db_context *ctdb_db,
836                                             struct vacuum_data *vdata)
837 {
838         unsigned int i;
839         struct ctdb_context *ctdb = ctdb_db->ctdb;
840         int ret, res;
841
842         for (i = 0; i < ctdb->num_nodes; i++) {
843                 TDB_DATA data;
844                 struct ctdb_marshall_buffer *vfl = vdata->vacuum_fetch_list[i];
845
846                 if (ctdb->nodes[i]->pnn == ctdb->pnn) {
847                         continue;
848                 }
849
850                 if (vfl->count == 0) {
851                         continue;
852                 }
853
854                 DEBUG(DEBUG_INFO, ("Found %u records for lmaster %u in '%s'\n",
855                                    vfl->count, ctdb->nodes[i]->pnn,
856                                    ctdb_db->db_name));
857
858                 data = ctdb_marshall_finish(vfl);
859
860                 ret = ctdb_control(ctdb, ctdb->nodes[i]->pnn, 0,
861                                    CTDB_CONTROL_VACUUM_FETCH, 0,
862                                    data, NULL, NULL, &res, NULL, NULL);
863                 if (ret != 0 || res != 0) {
864                         DEBUG(DEBUG_ERR, ("Failed to send vacuum "
865                                           "fetch control to node %u\n",
866                                           ctdb->nodes[i]->pnn));
867                 }
868         }
869 }
870
871 /**
872  * Process the delete list:
873  *
874  * This is the last step of vacuuming that consistently deletes
875  * those records that have been migrated with data and can hence
876  * not be deleted when leaving a node.
877  *
878  * In this step, the lmaster does the final deletion of those empty
879  * records that it is also dmaster for. It has ususally received
880  * at least some of these records previously from the former dmasters
881  * with the vacuum fetch message.
882  *
883  *  1) Send the records to all active nodes with the TRY_DELETE_RECORDS
884  *     control. The remote notes delete their local copy.
885  *  2) The lmaster locally deletes its copies of all records that
886  *     could successfully be deleted remotely in step #2.
887  */
888 static void ctdb_process_delete_list(struct ctdb_db_context *ctdb_db,
889                                      struct vacuum_data *vdata)
890 {
891         int ret, i;
892         struct ctdb_context *ctdb = ctdb_db->ctdb;
893         struct delete_records_list *recs;
894         TDB_DATA indata;
895         struct ctdb_node_map_old *nodemap;
896         uint32_t *active_nodes;
897         int num_active_nodes;
898         TALLOC_CTX *tmp_ctx;
899         uint32_t sum;
900
901         if (vdata->count.delete_list.total == 0) {
902                 return;
903         }
904
905         tmp_ctx = talloc_new(vdata);
906         if (tmp_ctx == NULL) {
907                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
908                 return;
909         }
910
911         vdata->count.delete_list.left = vdata->count.delete_list.total;
912
913         /*
914          * get the list of currently active nodes
915          */
916
917         ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(),
918                                    CTDB_CURRENT_NODE,
919                                    tmp_ctx,
920                                    &nodemap);
921         if (ret != 0) {
922                 DEBUG(DEBUG_ERR,(__location__ " unable to get node map\n"));
923                 goto done;
924         }
925
926         active_nodes = list_of_active_nodes(ctdb, nodemap,
927                                             nodemap, /* talloc context */
928                                             false /* include self */);
929         /* yuck! ;-) */
930         num_active_nodes = talloc_get_size(active_nodes)/sizeof(*active_nodes);
931
932         /*
933          * Now delete the records all active nodes in a two-phase process:
934          * 1) tell all active remote nodes to delete all their copy
935          * 2) if all remote nodes deleted their record copy, delete it locally
936          */
937
938         recs = talloc_zero(tmp_ctx, struct delete_records_list);
939         if (recs == NULL) {
940                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
941                 goto done;
942         }
943
944         /*
945          * Step 1:
946          * Send all records to all active nodes for deletion.
947          */
948
949         /*
950          * Create a marshall blob from the remaining list of records to delete.
951          */
952
953         recs->records = (struct ctdb_marshall_buffer *)
954                 talloc_zero_size(recs,
955                                  offsetof(struct ctdb_marshall_buffer, data));
956         if (recs->records == NULL) {
957                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
958                 goto done;
959         }
960         recs->records->db_id = ctdb_db->db_id;
961
962         ret = trbt_traversearray32(vdata->delete_list, 1,
963                                    delete_marshall_traverse, recs);
964         if (ret != 0) {
965                 DEBUG(DEBUG_ERR, (__location__ " Error traversing the "
966                       "delete list for second marshalling.\n"));
967                 goto done;
968         }
969
970         indata = ctdb_marshall_finish(recs->records);
971
972         for (i = 0; i < num_active_nodes; i++) {
973                 struct ctdb_marshall_buffer *records;
974                 struct ctdb_rec_data_old *rec;
975                 int32_t res;
976                 TDB_DATA outdata;
977
978                 ret = ctdb_control(ctdb, active_nodes[i], 0,
979                                 CTDB_CONTROL_TRY_DELETE_RECORDS, 0,
980                                 indata, recs, &outdata, &res,
981                                 NULL, NULL);
982                 if (ret != 0 || res != 0) {
983                         DEBUG(DEBUG_ERR, ("Failed to delete records on "
984                                           "node %u: ret[%d] res[%d]\n",
985                                           active_nodes[i], ret, res));
986                         goto done;
987                 }
988
989                 /*
990                  * outdata contains the list of records coming back
991                  * from the node: These are the records that the
992                  * remote node could not delete. We remove these from
993                  * the list to delete locally.
994                  */
995                 records = (struct ctdb_marshall_buffer *)outdata.dptr;
996                 rec = (struct ctdb_rec_data_old *)&records->data[0];
997                 while (records->count-- > 0) {
998                         TDB_DATA reckey, recdata;
999                         struct ctdb_ltdb_header *rechdr;
1000                         struct delete_record_data *dd;
1001
1002                         reckey.dptr = &rec->data[0];
1003                         reckey.dsize = rec->keylen;
1004                         recdata.dptr = &rec->data[reckey.dsize];
1005                         recdata.dsize = rec->datalen;
1006
1007                         if (recdata.dsize < sizeof(struct ctdb_ltdb_header)) {
1008                                 DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
1009                                 goto done;
1010                         }
1011                         rechdr = (struct ctdb_ltdb_header *)recdata.dptr;
1012                         recdata.dptr += sizeof(*rechdr);
1013                         recdata.dsize -= sizeof(*rechdr);
1014
1015                         dd = (struct delete_record_data *)trbt_lookup32(
1016                                         vdata->delete_list,
1017                                         ctdb_hash(&reckey));
1018                         if (dd != NULL) {
1019                                 /*
1020                                  * The remote node could not delete the
1021                                  * record.  Since other remote nodes can
1022                                  * also fail, we just mark the record.
1023                                  */
1024                                 dd->remote_fail_count++;
1025                         } else {
1026                                 DEBUG(DEBUG_ERR, (__location__ " Failed to "
1027                                       "find record with hash 0x%08x coming "
1028                                       "back from TRY_DELETE_RECORDS "
1029                                       "control in delete list.\n",
1030                                       ctdb_hash(&reckey)));
1031                         }
1032
1033                         rec = (struct ctdb_rec_data_old *)(rec->length + (uint8_t *)rec);
1034                 }
1035         }
1036
1037         /*
1038          * Step 2:
1039          * Delete the remaining records locally.
1040          *
1041          * These records have successfully been deleted on all
1042          * active remote nodes.
1043          */
1044
1045         ret = trbt_traversearray32(vdata->delete_list, 1,
1046                                    delete_record_traverse, vdata);
1047         if (ret != 0) {
1048                 DEBUG(DEBUG_ERR, (__location__ " Error traversing the "
1049                       "delete list for deletion.\n"));
1050         }
1051
1052         if (vdata->count.delete_list.left != 0) {
1053                 DEBUG(DEBUG_ERR, (__location__ " Vaccum db[%s] error: "
1054                       "there are %u records left for deletion after "
1055                       "processing delete list\n",
1056                       ctdb_db->db_name,
1057                       (unsigned)vdata->count.delete_list.left));
1058         }
1059
1060         sum = vdata->count.delete_list.deleted
1061             + vdata->count.delete_list.skipped
1062             + vdata->count.delete_list.remote_error
1063             + vdata->count.delete_list.local_error
1064             + vdata->count.delete_list.left;
1065
1066         if (vdata->count.delete_list.total != sum) {
1067                 DEBUG(DEBUG_ERR, (__location__ " Inconsistency in vacuum "
1068                       "delete list counts for db[%s]: total[%u] != sum[%u]\n",
1069                       ctdb_db->db_name,
1070                       (unsigned)vdata->count.delete_list.total,
1071                       (unsigned)sum));
1072         }
1073
1074         if (vdata->count.delete_list.total > 0) {
1075                 DEBUG(DEBUG_INFO,
1076                       (__location__
1077                        " vacuum delete list statistics: "
1078                        "db[%s] "
1079                        "total[%u] "
1080                        "del[%u] "
1081                        "skip[%u] "
1082                        "rem.err[%u] "
1083                        "loc.err[%u] "
1084                        "left[%u]\n",
1085                        ctdb_db->db_name,
1086                        (unsigned)vdata->count.delete_list.total,
1087                        (unsigned)vdata->count.delete_list.deleted,
1088                        (unsigned)vdata->count.delete_list.skipped,
1089                        (unsigned)vdata->count.delete_list.remote_error,
1090                        (unsigned)vdata->count.delete_list.local_error,
1091                        (unsigned)vdata->count.delete_list.left));
1092         }
1093
1094 done:
1095         talloc_free(tmp_ctx);
1096
1097         return;
1098 }
1099
1100 /**
1101  * initialize the vacuum_data
1102  */
1103 static struct vacuum_data *ctdb_vacuum_init_vacuum_data(
1104                                         struct ctdb_db_context *ctdb_db,
1105                                         TALLOC_CTX *mem_ctx)
1106 {
1107         unsigned int i;
1108         struct ctdb_context *ctdb = ctdb_db->ctdb;
1109         struct vacuum_data *vdata;
1110
1111         vdata = talloc_zero(mem_ctx, struct vacuum_data);
1112         if (vdata == NULL) {
1113                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
1114                 return NULL;
1115         }
1116
1117         vdata->ctdb = ctdb_db->ctdb;
1118         vdata->ctdb_db = ctdb_db;
1119         vdata->delete_list = trbt_create(vdata, 0);
1120         if (vdata->delete_list == NULL) {
1121                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
1122                 goto fail;
1123         }
1124
1125         vdata->start = timeval_current();
1126
1127         vdata->count.delete_queue.added_to_delete_list = 0;
1128         vdata->count.delete_queue.added_to_vacuum_fetch_list = 0;
1129         vdata->count.delete_queue.deleted = 0;
1130         vdata->count.delete_queue.skipped = 0;
1131         vdata->count.delete_queue.error = 0;
1132         vdata->count.delete_queue.total = 0;
1133         vdata->count.db_traverse.scheduled = 0;
1134         vdata->count.db_traverse.skipped = 0;
1135         vdata->count.db_traverse.error = 0;
1136         vdata->count.db_traverse.total = 0;
1137         vdata->count.delete_list.total = 0;
1138         vdata->count.delete_list.left = 0;
1139         vdata->count.delete_list.remote_error = 0;
1140         vdata->count.delete_list.local_error = 0;
1141         vdata->count.delete_list.skipped = 0;
1142         vdata->count.delete_list.deleted = 0;
1143
1144         /* the list needs to be of length num_nodes */
1145         vdata->vacuum_fetch_list = talloc_zero_array(vdata,
1146                                                 struct ctdb_marshall_buffer *,
1147                                                 ctdb->num_nodes);
1148         if (vdata->vacuum_fetch_list == NULL) {
1149                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
1150                 goto fail;
1151         }
1152         for (i = 0; i < ctdb->num_nodes; i++) {
1153                 vdata->vacuum_fetch_list[i] = (struct ctdb_marshall_buffer *)
1154                         talloc_zero_size(vdata->vacuum_fetch_list,
1155                                          offsetof(struct ctdb_marshall_buffer, data));
1156                 if (vdata->vacuum_fetch_list[i] == NULL) {
1157                         DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
1158                         talloc_free(vdata);
1159                         return NULL;
1160                 }
1161                 vdata->vacuum_fetch_list[i]->db_id = ctdb_db->db_id;
1162         }
1163
1164         return vdata;
1165
1166 fail:
1167         talloc_free(vdata);
1168         return NULL;
1169 }
1170
1171 /**
1172  * Vacuum a DB:
1173  *  - Always do the fast vacuuming run, which traverses
1174  *    - the in-memory fetch queue: these records have been
1175  *      scheduled for migration
1176  *    - the in-memory delete queue: these records have been
1177  *      scheduled for deletion.
1178  *  - Only if explicitly requested, the database is traversed
1179  *    in order to use the traditional heuristics on empty records
1180  *    to trigger deletion.
1181  *    This is done only every VacuumFastPathCount'th vacuuming run.
1182  *
1183  * The traverse runs fill two lists:
1184  *
1185  * - The delete_list:
1186  *   This is the list of empty records the current
1187  *   node is lmaster and dmaster for. These records are later
1188  *   deleted first on other nodes and then locally.
1189  *
1190  *   The fast vacuuming run has a short cut for those records
1191  *   that have never been migrated with data: these records
1192  *   are immediately deleted locally, since they have left
1193  *   no trace on other nodes.
1194  *
1195  * - The vacuum_fetch lists
1196  *   (one for each other lmaster node):
1197  *   The records in this list are sent for deletion to
1198  *   their lmaster in a bulk VACUUM_FETCH control.
1199  *
1200  *   The lmaster then migrates all these records to itelf
1201  *   so that they can be vacuumed there.
1202  *
1203  * This executes in the child context.
1204  */
1205 static int ctdb_vacuum_db(struct ctdb_db_context *ctdb_db,
1206                           bool full_vacuum_run)
1207 {
1208         struct ctdb_context *ctdb = ctdb_db->ctdb;
1209         int ret, pnn;
1210         struct vacuum_data *vdata;
1211         TALLOC_CTX *tmp_ctx;
1212
1213         DEBUG(DEBUG_INFO, (__location__ " Entering %s vacuum run for db "
1214                            "%s db_id[0x%08x]\n",
1215                            full_vacuum_run ? "full" : "fast",
1216                            ctdb_db->db_name, ctdb_db->db_id));
1217
1218         ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &ctdb->vnn_map);
1219         if (ret != 0) {
1220                 DEBUG(DEBUG_ERR, ("Unable to get vnnmap from local node\n"));
1221                 return ret;
1222         }
1223
1224         pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
1225         if (pnn == -1) {
1226                 DEBUG(DEBUG_ERR, ("Unable to get pnn from local node\n"));
1227                 return -1;
1228         }
1229
1230         ctdb->pnn = pnn;
1231
1232         tmp_ctx = talloc_new(ctdb_db);
1233         if (tmp_ctx == NULL) {
1234                 DEBUG(DEBUG_ERR, ("Out of memory!\n"));
1235                 return -1;
1236         }
1237
1238         vdata = ctdb_vacuum_init_vacuum_data(ctdb_db, tmp_ctx);
1239         if (vdata == NULL) {
1240                 talloc_free(tmp_ctx);
1241                 return -1;
1242         }
1243
1244         if (full_vacuum_run) {
1245                 ctdb_vacuum_traverse_db(ctdb_db, vdata);
1246         }
1247
1248         ctdb_process_fetch_queue(ctdb_db);
1249
1250         ctdb_process_delete_queue(ctdb_db, vdata);
1251
1252         ctdb_process_vacuum_fetch_lists(ctdb_db, vdata);
1253
1254         ctdb_process_delete_list(ctdb_db, vdata);
1255
1256         talloc_free(tmp_ctx);
1257
1258         return 0;
1259 }
1260
1261 /*
1262  * repack and vaccum a db
1263  * called from the child context
1264  */
1265 static int ctdb_vacuum_and_repack_db(struct ctdb_db_context *ctdb_db,
1266                                      bool full_vacuum_run)
1267 {
1268         uint32_t repack_limit = ctdb_db->ctdb->tunable.repack_limit;
1269         const char *name = ctdb_db->db_name;
1270         int freelist_size = 0;
1271         int ret;
1272
1273         if (ctdb_vacuum_db(ctdb_db, full_vacuum_run) != 0) {
1274                 DEBUG(DEBUG_ERR,(__location__ " Failed to vacuum '%s'\n", name));
1275         }
1276
1277         freelist_size = tdb_freelist_size(ctdb_db->ltdb->tdb);
1278         if (freelist_size == -1) {
1279                 DEBUG(DEBUG_ERR,(__location__ " Failed to get freelist size for '%s'\n", name));
1280                 return -1;
1281         }
1282
1283         /*
1284          * decide if a repack is necessary
1285          */
1286         if ((repack_limit == 0 || (uint32_t)freelist_size < repack_limit))
1287         {
1288                 return 0;
1289         }
1290
1291         D_NOTICE("Repacking %s with %u freelist entries\n",
1292                  name,
1293                  freelist_size);
1294
1295         ret = tdb_repack(ctdb_db->ltdb->tdb);
1296         if (ret != 0) {
1297                 DEBUG(DEBUG_ERR,(__location__ " Failed to repack '%s'\n", name));
1298                 return -1;
1299         }
1300
1301         return 0;
1302 }
1303
1304 static uint32_t get_vacuum_interval(struct ctdb_db_context *ctdb_db)
1305 {
1306         uint32_t interval = ctdb_db->ctdb->tunable.vacuum_interval;
1307
1308         return interval;
1309 }
1310
1311 static int vacuum_child_destructor(struct ctdb_vacuum_child_context *child_ctx)
1312 {
1313         double l = timeval_elapsed(&child_ctx->start_time);
1314         struct ctdb_db_context *ctdb_db = child_ctx->vacuum_handle->ctdb_db;
1315         struct ctdb_context *ctdb = ctdb_db->ctdb;
1316
1317         CTDB_UPDATE_DB_LATENCY(ctdb_db, "vacuum", vacuum.latency, l);
1318         DEBUG(DEBUG_INFO,("Vacuuming took %.3f seconds for database %s\n", l, ctdb_db->db_name));
1319
1320         if (child_ctx->child_pid != -1) {
1321                 ctdb_kill(ctdb, child_ctx->child_pid, SIGKILL);
1322         } else {
1323                 /* Bump the number of successful fast-path runs. */
1324                 child_ctx->vacuum_handle->fast_path_count++;
1325         }
1326
1327         ctdb->vacuumer = NULL;
1328
1329         tevent_add_timer(ctdb->ev, child_ctx->vacuum_handle,
1330                          timeval_current_ofs(get_vacuum_interval(ctdb_db), 0),
1331                          ctdb_vacuum_event, child_ctx->vacuum_handle);
1332
1333         return 0;
1334 }
1335
1336 /*
1337  * this event is generated when a vacuum child process times out
1338  */
1339 static void vacuum_child_timeout(struct tevent_context *ev,
1340                                  struct tevent_timer *te,
1341                                  struct timeval t, void *private_data)
1342 {
1343         struct ctdb_vacuum_child_context *child_ctx = talloc_get_type(private_data, struct ctdb_vacuum_child_context);
1344
1345         DEBUG(DEBUG_ERR,("Vacuuming child process timed out for db %s\n", child_ctx->vacuum_handle->ctdb_db->db_name));
1346
1347         child_ctx->status = VACUUM_TIMEOUT;
1348
1349         talloc_free(child_ctx);
1350 }
1351
1352
1353 /*
1354  * this event is generated when a vacuum child process has completed
1355  */
1356 static void vacuum_child_handler(struct tevent_context *ev,
1357                                  struct tevent_fd *fde,
1358                                  uint16_t flags, void *private_data)
1359 {
1360         struct ctdb_vacuum_child_context *child_ctx = talloc_get_type(private_data, struct ctdb_vacuum_child_context);
1361         char c = 0;
1362         int ret;
1363
1364         DEBUG(DEBUG_INFO,("Vacuuming child process %d finished for db %s\n", child_ctx->child_pid, child_ctx->vacuum_handle->ctdb_db->db_name));
1365         child_ctx->child_pid = -1;
1366
1367         ret = sys_read(child_ctx->fd[0], &c, 1);
1368         if (ret != 1 || c != 0) {
1369                 child_ctx->status = VACUUM_ERROR;
1370                 DEBUG(DEBUG_ERR, ("A vacuum child process failed with an error for database %s. ret=%d c=%d\n", child_ctx->vacuum_handle->ctdb_db->db_name, ret, c));
1371         } else {
1372                 child_ctx->status = VACUUM_OK;
1373         }
1374
1375         talloc_free(child_ctx);
1376 }
1377
1378 /*
1379  * this event is called every time we need to start a new vacuum process
1380  */
1381 static int vacuum_db_child(TALLOC_CTX *mem_ctx,
1382                            struct ctdb_db_context *ctdb_db,
1383                            bool full_vacuum_run,
1384                            struct ctdb_vacuum_child_context **out)
1385 {
1386         struct ctdb_context *ctdb = ctdb_db->ctdb;
1387         struct ctdb_vacuum_child_context *child_ctx;
1388         struct tevent_fd *fde;
1389         int ret;
1390
1391         /* we don't vacuum if we are in recovery mode, or db frozen */
1392         if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE ||
1393             ctdb_db_frozen(ctdb_db)) {
1394                 D_INFO("Not vacuuming %s (%s)\n", ctdb_db->db_name,
1395                        ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE ?
1396                        "in recovery" : "frozen");
1397                 return EAGAIN;
1398         }
1399
1400         /* Do not allow multiple vacuuming child processes to be active at the
1401          * same time.  If there is vacuuming child process active, delay
1402          * new vacuuming event to stagger vacuuming events.
1403          */
1404         if (ctdb->vacuumer != NULL) {
1405                 return EBUSY;
1406         }
1407
1408         child_ctx = talloc_zero(mem_ctx, struct ctdb_vacuum_child_context);
1409         if (child_ctx == NULL) {
1410                 DBG_ERR("Failed to allocate child context for vacuuming of %s\n",
1411                         ctdb_db->db_name);
1412                 return ENOMEM;
1413         }
1414
1415
1416         ret = pipe(child_ctx->fd);
1417         if (ret != 0) {
1418                 talloc_free(child_ctx);
1419                 D_ERR("Failed to create pipe for vacuum child process.\n");
1420                 return EAGAIN;
1421         }
1422
1423         child_ctx->child_pid = ctdb_fork(ctdb);
1424         if (child_ctx->child_pid == (pid_t)-1) {
1425                 close(child_ctx->fd[0]);
1426                 close(child_ctx->fd[1]);
1427                 talloc_free(child_ctx);
1428                 D_ERR("Failed to fork vacuum child process.\n");
1429                 return EAGAIN;
1430         }
1431
1432
1433         if (child_ctx->child_pid == 0) {
1434                 char cc = 0;
1435                 close(child_ctx->fd[0]);
1436
1437                 D_INFO("Vacuuming child process %d for db %s started\n",
1438                        getpid(),
1439                        ctdb_db->db_name);
1440                 prctl_set_comment("ctdb_vacuum");
1441                 ret = switch_from_server_to_client(ctdb);
1442                 if (ret != 0) {
1443                         DBG_ERR("ERROR: failed to switch vacuum daemon "
1444                                 "into client mode.\n");
1445                         return EIO;
1446                 }
1447
1448                 cc = ctdb_vacuum_and_repack_db(ctdb_db, full_vacuum_run);
1449
1450                 sys_write(child_ctx->fd[1], &cc, 1);
1451                 _exit(0);
1452         }
1453
1454         set_close_on_exec(child_ctx->fd[0]);
1455         close(child_ctx->fd[1]);
1456
1457         child_ctx->status = VACUUM_RUNNING;
1458         child_ctx->start_time = timeval_current();
1459
1460         ctdb->vacuumer = child_ctx;
1461         talloc_set_destructor(child_ctx, vacuum_child_destructor);
1462
1463         /*
1464          * Clear the fastpath vacuuming list in the parent.
1465          */
1466         talloc_free(ctdb_db->delete_queue);
1467         ctdb_db->delete_queue = trbt_create(ctdb_db, 0);
1468         if (ctdb_db->delete_queue == NULL) {
1469                 DBG_ERR("Out of memory when re-creating vacuum tree\n");
1470                 return ENOMEM;
1471         }
1472
1473         talloc_free(ctdb_db->fetch_queue);
1474         ctdb_db->fetch_queue = trbt_create(ctdb_db, 0);
1475         if (ctdb_db->fetch_queue == NULL) {
1476                 ctdb_fatal(ctdb, "Out of memory when re-create fetch queue "
1477                                  " in parent context. Shutting down\n");
1478         }
1479
1480         tevent_add_timer(ctdb->ev, child_ctx,
1481                          timeval_current_ofs(ctdb->tunable.vacuum_max_run_time,
1482                                              0),
1483                          vacuum_child_timeout, child_ctx);
1484
1485         DBG_DEBUG(" Created PIPE FD:%d to child vacuum process\n",
1486                   child_ctx->fd[0]);
1487
1488         fde = tevent_add_fd(ctdb->ev, child_ctx, child_ctx->fd[0],
1489                             TEVENT_FD_READ, vacuum_child_handler, child_ctx);
1490         tevent_fd_set_auto_close(fde);
1491
1492         child_ctx->vacuum_handle = ctdb_db->vacuum_handle;
1493
1494         *out = child_ctx;
1495         return 0;
1496 }
1497
1498 static void ctdb_vacuum_event(struct tevent_context *ev,
1499                               struct tevent_timer *te,
1500                               struct timeval t, void *private_data)
1501 {
1502         struct ctdb_vacuum_handle *vacuum_handle = talloc_get_type(
1503                 private_data, struct ctdb_vacuum_handle);
1504         struct ctdb_db_context *ctdb_db = vacuum_handle->ctdb_db;
1505         struct ctdb_context *ctdb = ctdb_db->ctdb;
1506         struct ctdb_vacuum_child_context *child_ctx = NULL;
1507         uint32_t fast_path_max = ctdb->tunable.vacuum_fast_path_count;
1508         bool full_vacuum_run = false;
1509         int ret;
1510
1511         if (vacuum_handle->fast_path_count >= fast_path_max) {
1512                 if (fast_path_max > 0) {
1513                         full_vacuum_run = true;
1514                 }
1515                 vacuum_handle->fast_path_count = 0;
1516         }
1517
1518         ret = vacuum_db_child(vacuum_handle,
1519                               ctdb_db,
1520                               full_vacuum_run,
1521                               &child_ctx);
1522
1523         if (ret == 0) {
1524                 return;
1525         }
1526
1527         switch (ret) {
1528         case EBUSY:
1529                 /* Stagger */
1530                 tevent_add_timer(ctdb->ev,
1531                                  vacuum_handle,
1532                                  timeval_current_ofs(0, 500*1000),
1533                                  ctdb_vacuum_event,
1534                                  vacuum_handle);
1535                 break;
1536
1537         default:
1538                 /* Temporary failure, schedule next attempt */
1539                 tevent_add_timer(ctdb->ev,
1540                                  vacuum_handle,
1541                                  timeval_current_ofs(
1542                                          get_vacuum_interval(ctdb_db), 0),
1543                                  ctdb_vacuum_event,
1544                                  vacuum_handle);
1545         }
1546
1547 }
1548
1549 void ctdb_stop_vacuuming(struct ctdb_context *ctdb)
1550 {
1551         if (ctdb->vacuumer != NULL) {
1552                 D_INFO("Aborting vacuuming for %s (%i)\n",
1553                        ctdb->vacuumer->vacuum_handle->ctdb_db->db_name,
1554                        (int)ctdb->vacuumer->child_pid);
1555                 /* vacuum_child_destructor kills it, removes from list */
1556                 talloc_free(ctdb->vacuumer);
1557         }
1558 }
1559
1560 /* this function initializes the vacuuming context for a database
1561  * starts the vacuuming events
1562  */
1563 int ctdb_vacuum_init(struct ctdb_db_context *ctdb_db)
1564 {
1565         if (! ctdb_db_volatile(ctdb_db)) {
1566                 DEBUG(DEBUG_ERR,
1567                       ("Vacuuming is disabled for non-volatile database %s\n",
1568                        ctdb_db->db_name));
1569                 return 0;
1570         }
1571
1572         ctdb_db->vacuum_handle = talloc(ctdb_db, struct ctdb_vacuum_handle);
1573         CTDB_NO_MEMORY(ctdb_db->ctdb, ctdb_db->vacuum_handle);
1574
1575         ctdb_db->vacuum_handle->ctdb_db         = ctdb_db;
1576         ctdb_db->vacuum_handle->fast_path_count = 0;
1577
1578         tevent_add_timer(ctdb_db->ctdb->ev, ctdb_db->vacuum_handle,
1579                          timeval_current_ofs(get_vacuum_interval(ctdb_db), 0),
1580                          ctdb_vacuum_event, ctdb_db->vacuum_handle);
1581
1582         return 0;
1583 }
1584
1585 static void remove_record_from_delete_queue(struct ctdb_db_context *ctdb_db,
1586                                             const struct ctdb_ltdb_header *hdr,
1587                                             const TDB_DATA key)
1588 {
1589         struct delete_record_data *kd;
1590         uint32_t hash;
1591
1592         hash = (uint32_t)ctdb_hash(&key);
1593
1594         DEBUG(DEBUG_DEBUG, (__location__
1595                             " remove_record_from_delete_queue: "
1596                             "db[%s] "
1597                             "db_id[0x%08x] "
1598                             "key_hash[0x%08x] "
1599                             "lmaster[%u] "
1600                             "migrated_with_data[%s]\n",
1601                              ctdb_db->db_name, ctdb_db->db_id,
1602                              hash,
1603                              ctdb_lmaster(ctdb_db->ctdb, &key),
1604                              hdr->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA ? "yes" : "no"));
1605
1606         kd = (struct delete_record_data *)trbt_lookup32(ctdb_db->delete_queue, hash);
1607         if (kd == NULL) {
1608                 DEBUG(DEBUG_DEBUG, (__location__
1609                                     " remove_record_from_delete_queue: "
1610                                     "record not in queue (hash[0x%08x])\n.",
1611                                     hash));
1612                 return;
1613         }
1614
1615         if ((kd->key.dsize != key.dsize) ||
1616             (memcmp(kd->key.dptr, key.dptr, key.dsize) != 0))
1617         {
1618                 DEBUG(DEBUG_DEBUG, (__location__
1619                                     " remove_record_from_delete_queue: "
1620                                     "hash collision for key with hash[0x%08x] "
1621                                     "in db[%s] - skipping\n",
1622                                     hash, ctdb_db->db_name));
1623                 return;
1624         }
1625
1626         DEBUG(DEBUG_DEBUG, (__location__
1627                             " remove_record_from_delete_queue: "
1628                             "removing key with hash[0x%08x]\n",
1629                              hash));
1630
1631         talloc_free(kd);
1632
1633         return;
1634 }
1635
1636 /**
1637  * Insert a record into the ctdb_db context's delete queue,
1638  * handling hash collisions.
1639  */
1640 static int insert_record_into_delete_queue(struct ctdb_db_context *ctdb_db,
1641                                            const struct ctdb_ltdb_header *hdr,
1642                                            TDB_DATA key)
1643 {
1644         struct delete_record_data *kd;
1645         uint32_t hash;
1646         int ret;
1647
1648         hash = (uint32_t)ctdb_hash(&key);
1649
1650         DEBUG(DEBUG_DEBUG, (__location__ " schedule for deletion: db[%s] "
1651                             "db_id[0x%08x] "
1652                             "key_hash[0x%08x] "
1653                             "lmaster[%u] "
1654                             "migrated_with_data[%s]\n",
1655                             ctdb_db->db_name, ctdb_db->db_id,
1656                             hash,
1657                             ctdb_lmaster(ctdb_db->ctdb, &key),
1658                             hdr->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA ? "yes" : "no"));
1659
1660         kd = (struct delete_record_data *)trbt_lookup32(ctdb_db->delete_queue, hash);
1661         if (kd != NULL) {
1662                 if ((kd->key.dsize != key.dsize) ||
1663                     (memcmp(kd->key.dptr, key.dptr, key.dsize) != 0))
1664                 {
1665                         DEBUG(DEBUG_INFO,
1666                               (__location__ " schedule for deletion: "
1667                                "hash collision for key hash [0x%08x]. "
1668                                "Skipping the record.\n", hash));
1669                         return 0;
1670                 } else {
1671                         DEBUG(DEBUG_DEBUG,
1672                               (__location__ " schedule for deletion: "
1673                                "updating entry for key with hash [0x%08x].\n",
1674                                hash));
1675                 }
1676         }
1677
1678         ret = insert_delete_record_data_into_tree(ctdb_db->ctdb, ctdb_db,
1679                                                   ctdb_db->delete_queue,
1680                                                   hdr, key);
1681         if (ret != 0) {
1682                 DEBUG(DEBUG_INFO,
1683                       (__location__ " schedule for deletion: error "
1684                        "inserting key with hash [0x%08x] into delete queue\n",
1685                        hash));
1686                 return -1;
1687         }
1688
1689         return 0;
1690 }
1691
1692 /**
1693  * Schedule a record for deletetion.
1694  * Called from the parent context.
1695  */
1696 int32_t ctdb_control_schedule_for_deletion(struct ctdb_context *ctdb,
1697                                            TDB_DATA indata)
1698 {
1699         struct ctdb_control_schedule_for_deletion *dd;
1700         struct ctdb_db_context *ctdb_db;
1701         int ret;
1702         TDB_DATA key;
1703
1704         dd = (struct ctdb_control_schedule_for_deletion *)indata.dptr;
1705
1706         ctdb_db = find_ctdb_db(ctdb, dd->db_id);
1707         if (ctdb_db == NULL) {
1708                 DEBUG(DEBUG_ERR, (__location__ " Unknown db id 0x%08x\n",
1709                                   dd->db_id));
1710                 return -1;
1711         }
1712
1713         key.dsize = dd->keylen;
1714         key.dptr = dd->key;
1715
1716         ret = insert_record_into_delete_queue(ctdb_db, &dd->hdr, key);
1717
1718         return ret;
1719 }
1720
1721 int32_t ctdb_local_schedule_for_deletion(struct ctdb_db_context *ctdb_db,
1722                                          const struct ctdb_ltdb_header *hdr,
1723                                          TDB_DATA key)
1724 {
1725         int ret;
1726         struct ctdb_control_schedule_for_deletion *dd;
1727         TDB_DATA indata;
1728         int32_t status;
1729
1730         if (ctdb_db->ctdb->ctdbd_pid == getpid()) {
1731                 /* main daemon - directly queue */
1732                 ret = insert_record_into_delete_queue(ctdb_db, hdr, key);
1733
1734                 return ret;
1735         }
1736
1737         /* if we don't have a connection to the daemon we can not send
1738            a control. For example sometimes from update_record control child
1739            process.
1740         */
1741         if (!ctdb_db->ctdb->can_send_controls) {
1742                 return -1;
1743         }
1744
1745
1746         /* child process: send the main daemon a control */
1747         indata.dsize = offsetof(struct ctdb_control_schedule_for_deletion, key) + key.dsize;
1748         indata.dptr = talloc_zero_array(ctdb_db, uint8_t, indata.dsize);
1749         if (indata.dptr == NULL) {
1750                 DEBUG(DEBUG_ERR, (__location__ " out of memory\n"));
1751                 return -1;
1752         }
1753         dd = (struct ctdb_control_schedule_for_deletion *)(void *)indata.dptr;
1754         dd->db_id = ctdb_db->db_id;
1755         dd->hdr = *hdr;
1756         dd->keylen = key.dsize;
1757         memcpy(dd->key, key.dptr, key.dsize);
1758
1759         ret = ctdb_control(ctdb_db->ctdb,
1760                            CTDB_CURRENT_NODE,
1761                            ctdb_db->db_id,
1762                            CTDB_CONTROL_SCHEDULE_FOR_DELETION,
1763                            CTDB_CTRL_FLAG_NOREPLY, /* flags */
1764                            indata,
1765                            NULL, /* mem_ctx */
1766                            NULL, /* outdata */
1767                            &status,
1768                            NULL, /* timeout : NULL == wait forever */
1769                            NULL); /* error message */
1770
1771         talloc_free(indata.dptr);
1772
1773         if (ret != 0 || status != 0) {
1774                 DEBUG(DEBUG_ERR, (__location__ " Error sending "
1775                                   "SCHEDULE_FOR_DELETION "
1776                                   "control.\n"));
1777                 if (status != 0) {
1778                         ret = -1;
1779                 }
1780         }
1781
1782         return ret;
1783 }
1784
1785 void ctdb_local_remove_from_delete_queue(struct ctdb_db_context *ctdb_db,
1786                                          const struct ctdb_ltdb_header *hdr,
1787                                          const TDB_DATA key)
1788 {
1789         if (ctdb_db->ctdb->ctdbd_pid != getpid()) {
1790                 /*
1791                  * Only remove the record from the delete queue if called
1792                  * in the main daemon.
1793                  */
1794                 return;
1795         }
1796
1797         remove_record_from_delete_queue(ctdb_db, hdr, key);
1798
1799         return;
1800 }
1801
1802 static int vacuum_fetch_parser(uint32_t reqid,
1803                                struct ctdb_ltdb_header *header,
1804                                TDB_DATA key, TDB_DATA data,
1805                                void *private_data)
1806 {
1807         struct ctdb_db_context *ctdb_db = talloc_get_type_abort(
1808                 private_data, struct ctdb_db_context);
1809         struct fetch_record_data *rd;
1810         size_t len;
1811         uint32_t hash;
1812
1813         len = offsetof(struct fetch_record_data, keydata) + key.dsize;
1814
1815         rd = (struct fetch_record_data *)talloc_size(ctdb_db->fetch_queue,
1816                                                      len);
1817         if (rd == NULL) {
1818                 DEBUG(DEBUG_ERR, (__location__ " Memory error\n"));
1819                 return -1;
1820         }
1821         talloc_set_name_const(rd, "struct fetch_record_data");
1822
1823         rd->key.dsize = key.dsize;
1824         rd->key.dptr = rd->keydata;
1825         memcpy(rd->keydata, key.dptr, key.dsize);
1826
1827         hash = ctdb_hash(&key);
1828
1829         trbt_insert32(ctdb_db->fetch_queue, hash, rd);
1830
1831         return 0;
1832 }
1833
1834 int32_t ctdb_control_vacuum_fetch(struct ctdb_context *ctdb, TDB_DATA indata)
1835 {
1836         struct ctdb_rec_buffer *recbuf;
1837         struct ctdb_db_context *ctdb_db;
1838         size_t npull;
1839         int ret;
1840
1841         ret = ctdb_rec_buffer_pull(indata.dptr, indata.dsize, ctdb, &recbuf,
1842                                    &npull);
1843         if (ret != 0) {
1844                 DEBUG(DEBUG_ERR, ("Invalid data in vacuum_fetch\n"));
1845                 return -1;
1846         }
1847
1848         ctdb_db = find_ctdb_db(ctdb, recbuf->db_id);
1849         if (ctdb_db == NULL) {
1850                 talloc_free(recbuf);
1851                 DEBUG(DEBUG_ERR, (__location__ " Unknown db 0x%08x\n",
1852                                   recbuf->db_id));
1853                 return -1;
1854         }
1855
1856         ret = ctdb_rec_buffer_traverse(recbuf, vacuum_fetch_parser, ctdb_db);
1857         talloc_free(recbuf);
1858         return ret;
1859 }