added event repacking
[ctdb.git] / server / ctdb_vacuum.c
1 /*
2    ctdb vacuuming events
3
4    Copyright (C) Ronnie Sahlberg  2009
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/events/events.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "system/network.h"
24 #include "system/filesys.h"
25 #include "system/dir.h"
26 #include "../include/ctdb_private.h"
27 #include "db_wrap.h"
28 #include "lib/util/dlinklist.h"
29 #include "lib/events/events.h"
30 #include "../include/ctdb_private.h"
31
32
/*
 * state of a vacuum child process as recorded by the parent:
 * RUNNING while the child is active, OK/ERROR once its result byte has
 * been read, TIMEOUT when the run-time limit expired first
 */
enum vacuum_child_status {
	VACUUM_RUNNING,
	VACUUM_OK,
	VACUUM_ERROR,
	VACUUM_TIMEOUT
};
34
35 struct ctdb_vacuum_child_context {
36         struct ctdb_vacuum_handle *vacuum_handle;
37         int fd[2];
38         pid_t child_pid;
39         enum vacuum_child_status status;
40         struct timeval start_time;
41 };
42
/*
 * per-database vacuuming state; also used as the talloc parent for the
 * vacuum timer events and the child contexts
 */
struct ctdb_vacuum_handle {
	/* the database this handle vacuums */
	struct ctdb_db_context *ctdb_db;

	/* context of the most recently started vacuum child */
	struct ctdb_vacuum_child_context *child_ctx;
};
47
48
/* forward declaration: the child destructor re-arms this timer handler */
static void ctdb_vacuum_event(struct event_context *ev,
			      struct timed_event *te,
			      struct timeval t,
			      void *private_data);
50
/*
 * state shared between ctdb_repack_tdb() and its traverse callback
 */
struct traverse_state {
	/* set by the callback when storing a record failed */
	bool error;

	/* database that traversed records are copied into */
	struct tdb_context *dest_db;
};
55
56 /*
57   traverse function for repacking
58  */
59 static int repack_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private)
60 {
61         struct traverse_state *state = (struct traverse_state *)private;
62         if (tdb_store(state->dest_db, key, data, TDB_INSERT) != 0) {
63                 state->error = true;
64                 return -1;
65         }
66         return 0;
67 }
68
69 /*
70   repack a tdb
71  */
72 static int ctdb_repack_tdb(struct tdb_context *tdb, TALLOC_CTX *mem_ctx)
73 {
74         struct tdb_context *tmp_db;
75         struct traverse_state *state;
76
77         state = talloc(mem_ctx, struct traverse_state);
78         if (!state) {
79                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
80                 return -1;
81         }
82
83         if (tdb_transaction_start(tdb) != 0) {
84                 DEBUG(DEBUG_ERR,(__location__ " Failed to start transaction\n"));
85                 return -1;
86         }
87
88         tmp_db = tdb_open("tmpdb", tdb_hash_size(tdb), TDB_INTERNAL, O_RDWR|O_CREAT, 0);
89         if (tmp_db == NULL) {
90                 DEBUG(DEBUG_ERR,(__location__ " Failed to create tmp_db\n"));
91                 tdb_transaction_cancel(tdb);
92                 return -1;
93         }
94
95         state->error = false;
96         state->dest_db = tmp_db;
97
98         if (tdb_traverse_read(tdb, repack_traverse, state) == -1) {
99                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse copying out\n"));
100                 tdb_transaction_cancel(tdb);
101                 tdb_close(tmp_db);
102                 return -1;              
103         }
104
105         if (state->error) {
106                 DEBUG(DEBUG_ERR,(__location__ " Error during traversal\n"));
107                 tdb_transaction_cancel(tdb);
108                 tdb_close(tmp_db);
109                 return -1;
110         }
111
112         if (tdb_wipe_all(tdb) != 0) {
113                 DEBUG(DEBUG_ERR,(__location__ " Failed to wipe database\n"));
114                 tdb_transaction_cancel(tdb);
115                 tdb_close(tmp_db);
116                 return -1;
117         }
118
119         state->error = false;
120         state->dest_db = tdb;
121
122         if (tdb_traverse_read(tmp_db, repack_traverse, state) == -1) {
123                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse copying back\n"));
124                 tdb_transaction_cancel(tdb);
125                 tdb_close(tmp_db);
126                 return -1;              
127         }
128
129         if (state->error) {
130                 DEBUG(DEBUG_ERR,(__location__ " Error during second traversal\n"));
131                 tdb_transaction_cancel(tdb);
132                 tdb_close(tmp_db);
133                 return -1;
134         }
135
136         tdb_close(tmp_db);
137
138         if (tdb_transaction_commit(tdb) != 0) {
139                 DEBUG(DEBUG_ERR,(__location__ " Failed to commit\n"));
140                 return -1;
141         }
142
143         return 0;
144 }
145
146
147 static int ctdb_repack_db(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx)
148 {
149         uint32_t repack_limit = 10000;   /* should be made tunable */
150         const char *name = ctdb_db->db_name;
151         int size = tdb_freelist_size(ctdb_db->ltdb->tdb);
152
153         if (size == -1) {
154                 DEBUG(DEBUG_ERR,(__location__ " Failed to get freelist size for '%s'\n", name));
155                 return -1;
156         }
157
158         if (size <= repack_limit) {
159                 return 0;
160         }
161
162         DEBUG(DEBUG_ERR,("Repacking %s with %u freelist entries\n", name, size));
163
164         if (ctdb_repack_tdb(ctdb_db->ltdb->tdb, mem_ctx) != 0) {
165                 DEBUG(DEBUG_ERR,(__location__ " Failed to repack '%s'\n", name));
166                 return -1;
167         }
168
169         return 0;
170 }
171
172 static int vacuum_child_destructor(struct ctdb_vacuum_child_context *child_ctx)
173 {
174         double l = timeval_elapsed(&child_ctx->start_time);
175         struct ctdb_db_context *ctdb_db = child_ctx->vacuum_handle->ctdb_db;
176         struct ctdb_context *ctdb = ctdb_db->ctdb;
177
178         DEBUG(DEBUG_ERR,("Vacuuming took %.3f seconds for database %s\n", l, ctdb_db->db_name));
179
180         if (child_ctx->child_pid != -1) {
181                 kill(child_ctx->child_pid, SIGKILL);
182         }
183
184         /* here calculate a new interval */
185         /* child_ctx->status */
186
187         DEBUG(DEBUG_ERR, ("Start new vacuum event for %s\n", ctdb_db->db_name));
188
189         event_add_timed(ctdb->ev, child_ctx->vacuum_handle, timeval_current_ofs(ctdb->tunable.vacuum_default_interval, 0), ctdb_vacuum_event, child_ctx->vacuum_handle);
190
191         return 0;
192 }
193
194 /*
195  * this event is generated when a vacuum child process times out
196  */
197 static void vacuum_child_timeout(struct event_context *ev, struct timed_event *te,
198                                          struct timeval t, void *private_data)
199 {
200         struct ctdb_vacuum_child_context *child_ctx = talloc_get_type(private_data, struct ctdb_vacuum_child_context);
201
202         DEBUG(DEBUG_ERR,("Vacuuming child process timed out for db %s\n", child_ctx->vacuum_handle->ctdb_db->db_name));
203
204         child_ctx->status = VACUUM_TIMEOUT;
205
206         talloc_free(child_ctx);
207 }
208
209
210 /*
211  * this event is generated when a vacuum child process has completed
212  */
213 static void vacuum_child_handler(struct event_context *ev, struct fd_event *fde,
214                              uint16_t flags, void *private_data)
215 {
216         struct ctdb_vacuum_child_context *child_ctx = talloc_get_type(private_data, struct ctdb_vacuum_child_context);
217         char c = 0;
218         int ret;
219
220         DEBUG(DEBUG_ERR,("Vacuuming child finished for db %s\n", child_ctx->vacuum_handle->ctdb_db->db_name));
221
222         child_ctx->child_pid = -1;
223
224         ret = read(child_ctx->fd[0], &c, 1);
225         if (ret != 1 || c != 0) {
226                 child_ctx->status = VACUUM_ERROR;
227                 DEBUG(DEBUG_ERR, ("A vacuum child process failed with an error for database %s. ret=%d c=%d\n", child_ctx->vacuum_handle->ctdb_db->db_name, ret, c));
228         } else {
229                 child_ctx->status = VACUUM_OK;
230         }
231
232         talloc_free(child_ctx);
233 }
234
235 /*
236  * this event is called every time we need to start a new vacuum process
237  */
238 static void
239 ctdb_vacuum_event(struct event_context *ev, struct timed_event *te,
240                                struct timeval t, void *private_data)
241 {
242         struct ctdb_vacuum_handle *vacuum_handle = talloc_get_type(private_data, struct ctdb_vacuum_handle);
243         struct ctdb_db_context *ctdb_db = vacuum_handle->ctdb_db;
244         struct ctdb_context *ctdb = ctdb_db->ctdb;
245         struct ctdb_vacuum_child_context *child_ctx;
246         int ret;
247
248         DEBUG(DEBUG_ERR,("Start a vacuuming child process for db %s\n", ctdb_db->db_name));
249
250         /* we dont vacuum if we are in recovery mode */
251         if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE) {
252                 event_add_timed(ctdb->ev, vacuum_handle, timeval_current_ofs(ctdb->tunable.vacuum_default_interval, 0), ctdb_vacuum_event, vacuum_handle);
253                 return;
254         }
255
256
257         child_ctx = talloc(vacuum_handle, struct ctdb_vacuum_child_context);
258         if (child_ctx == NULL) {
259                 DEBUG(DEBUG_CRIT, (__location__ " Failed to allocate child context for vacuuming of %s\n", ctdb_db->db_name));
260                 ctdb_fatal(ctdb, "Out of memory when crating vacuum child context. Shutting down\n");
261         }
262
263
264         ret = pipe(child_ctx->fd);
265         if (ret != 0) {
266                 talloc_free(child_ctx);
267                 DEBUG(DEBUG_ERR, ("Failed to create pipe for vacuum child process.\n"));
268                 event_add_timed(ctdb->ev, vacuum_handle, timeval_current_ofs(ctdb->tunable.vacuum_default_interval, 0), ctdb_vacuum_event, vacuum_handle);
269                 return;
270         }
271
272         child_ctx->child_pid = fork();
273         if (child_ctx->child_pid == (pid_t)-1) {
274                 close(child_ctx->fd[0]);
275                 close(child_ctx->fd[1]);
276                 talloc_free(child_ctx);
277                 DEBUG(DEBUG_ERR, ("Failed to fork vacuum child process.\n"));
278                 event_add_timed(ctdb->ev, vacuum_handle, timeval_current_ofs(ctdb->tunable.vacuum_default_interval, 0), ctdb_vacuum_event, vacuum_handle);
279                 return;
280         }
281
282
283         if (child_ctx->child_pid == 0) {
284                 char cc = 0;
285                 close(child_ctx->fd[0]);
286
287                 /* 
288                  * repack the db; next patch will include vacuuming here
289                  */
290                 cc = ctdb_repack_db(ctdb_db, child_ctx);
291
292                 write(child_ctx->fd[1], &cc, 1);
293                 _exit(0);
294         }
295
296         set_close_on_exec(child_ctx->fd[0]);
297         close(child_ctx->fd[1]);
298
299         child_ctx->status = VACUUM_RUNNING;
300         child_ctx->start_time = timeval_current();
301
302         talloc_set_destructor(child_ctx, vacuum_child_destructor);
303
304         event_add_timed(ctdb->ev, child_ctx,
305                 timeval_current_ofs(ctdb->tunable.vacuum_max_run_time, 0),
306                 vacuum_child_timeout, child_ctx);
307
308         event_add_fd(ctdb->ev, child_ctx, child_ctx->fd[0],
309                 EVENT_FD_READ|EVENT_FD_AUTOCLOSE,
310                 vacuum_child_handler,
311                 child_ctx);
312
313         vacuum_handle->child_ctx = child_ctx;
314         child_ctx->vacuum_handle = vacuum_handle;
315 }
316
317
318 /* this function initializes the vacuuming context for a database
319  * starts the vacuuming events
320  */
321 int ctdb_vacuum_init(struct ctdb_db_context *ctdb_db)
322 {
323         struct ctdb_context *ctdb = ctdb_db->ctdb;
324
325         DEBUG(DEBUG_ERR,("Start vacuuming process for database %s\n", ctdb_db->db_name));
326
327         ctdb_db->vacuum_handle = talloc(ctdb_db, struct ctdb_vacuum_handle);
328         CTDB_NO_MEMORY(ctdb, ctdb_db->vacuum_handle);
329
330         ctdb_db->vacuum_handle->ctdb_db = ctdb_db;
331
332         event_add_timed(ctdb->ev, ctdb_db->vacuum_handle, timeval_current_ofs(ctdb->tunable.vacuum_default_interval, 0), ctdb_vacuum_event, ctdb_db->vacuum_handle);
333
334         return 0;
335 }