Merge commit 'origin/master' into martins
[sahlberg/ctdb.git] / server / ctdb_traverse.c
1 /* 
2    efficient async ctdb traverse
3
4    Copyright (C) Andrew Tridgell  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/events/events.h"
22 #include "system/filesys.h"
23 #include "system/wait.h"
24 #include "db_wrap.h"
25 #include "lib/tdb/include/tdb.h"
26 #include "../include/ctdb_private.h"
27
28 typedef void (*ctdb_traverse_fn_t)(void *private_data, TDB_DATA key, TDB_DATA data);
29
30 /*
31   handle returned to caller - freeing this handler will kill the child and 
32   terminate the traverse
33  */
34 struct ctdb_traverse_local_handle {
35         struct ctdb_db_context *ctdb_db;
36         int fd[2];
37         pid_t child;
38         void *private_data;
39         ctdb_traverse_fn_t callback;
40         struct timeval start_time;
41         struct ctdb_queue *queue;
42 };
43
44 /*
45   called when data is available from the child
46  */
47 static void ctdb_traverse_local_handler(uint8_t *rawdata, size_t length, void *private_data)
48 {
49         struct ctdb_traverse_local_handle *h = talloc_get_type(private_data, 
50                                                                struct ctdb_traverse_local_handle);
51         TDB_DATA key, data;
52         ctdb_traverse_fn_t callback = h->callback;
53         void *p = h->private_data;
54         struct ctdb_rec_data *tdata = (struct ctdb_rec_data *)rawdata;
55
56         if (rawdata == NULL || length < 4 || length != tdata->length) {
57                 /* end of traverse */
58                 talloc_free(h);
59                 callback(p, tdb_null, tdb_null);
60                 return;
61         }
62
63         key.dsize = tdata->keylen;
64         key.dptr  = &tdata->data[0];
65         data.dsize = tdata->datalen;
66         data.dptr = &tdata->data[tdata->keylen];
67
68         callback(p, key, data); 
69 }
70
71 /*
72   destroy a in-flight traverse operation
73  */
74 static int traverse_local_destructor(struct ctdb_traverse_local_handle *h)
75 {
76         kill(h->child, SIGKILL);
77         return 0;
78 }
79
80 /*
81   callback from tdb_traverse_read()
82  */
83 static int ctdb_traverse_local_fn(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
84 {
85         struct ctdb_traverse_local_handle *h = talloc_get_type(p, 
86                                                                struct ctdb_traverse_local_handle);
87         struct ctdb_rec_data *d;
88         struct ctdb_ltdb_header *hdr;
89
90         /* filter out non-authoritative and zero-length records */
91         hdr = (struct ctdb_ltdb_header *)data.dptr;
92         if (data.dsize <= sizeof(struct ctdb_ltdb_header) ||
93             hdr->dmaster != h->ctdb_db->ctdb->pnn) {
94                 return 0;
95         }
96
97         d = ctdb_marshall_record(h, 0, key, NULL, data);
98         if (d == NULL) {
99                 /* error handling is tricky in this child code .... */
100                 return -1;
101         }
102
103         if (write(h->fd[1], (uint8_t *)d, d->length) != d->length) {
104                 return -1;
105         }
106         return 0;
107 }
108
109
110 /*
111   setup a non-blocking traverse of a local ltdb. The callback function
112   will be called on every record in the local ltdb. To stop the
113   travserse, talloc_free() the travserse_handle.
114
115   The traverse is finished when the callback is called with tdb_null for key and data
116  */
117 static struct ctdb_traverse_local_handle *ctdb_traverse_local(struct ctdb_db_context *ctdb_db,
118                                                               ctdb_traverse_fn_t callback,
119                                                               void *private_data)
120 {
121         struct ctdb_traverse_local_handle *h;
122         int ret;
123
124         h = talloc_zero(ctdb_db, struct ctdb_traverse_local_handle);
125         if (h == NULL) {
126                 return NULL;
127         }
128
129         ret = pipe(h->fd);
130
131         if (ret != 0) {
132                 talloc_free(h);
133                 return NULL;
134         }
135
136         h->child = fork();
137
138         if (h->child == (pid_t)-1) {
139                 close(h->fd[0]);
140                 close(h->fd[1]);
141                 talloc_free(h);
142                 return NULL;
143         }
144
145         h->callback = callback;
146         h->private_data = private_data;
147         h->ctdb_db = ctdb_db;
148
149         if (h->child == 0) {
150                 /* start the traverse in the child */
151                 close(h->fd[0]);
152                 tdb_traverse_read(ctdb_db->ltdb->tdb, ctdb_traverse_local_fn, h);
153                 _exit(0);
154         }
155
156         close(h->fd[1]);
157         talloc_set_destructor(h, traverse_local_destructor);
158
159         /*
160           setup a packet queue between the child and the parent. This
161           copes with all the async and packet boundary issues
162          */
163         h->queue = ctdb_queue_setup(ctdb_db->ctdb, h, h->fd[0], 0, ctdb_traverse_local_handler, h);
164         if (h->queue == NULL) {
165                 talloc_free(h);
166                 return NULL;
167         }
168
169         h->start_time = timeval_current();
170
171         return h;
172 }
173
174
175 struct ctdb_traverse_all_handle {
176         struct ctdb_context *ctdb;
177         uint32_t reqid;
178         ctdb_traverse_fn_t callback;
179         void *private_data;
180         uint32_t null_count;
181 };
182
183 /*
184   destroy a traverse_all op
185  */
186 static int ctdb_traverse_all_destructor(struct ctdb_traverse_all_handle *state)
187 {
188         ctdb_reqid_remove(state->ctdb, state->reqid);
189         return 0;
190 }
191
192 struct ctdb_traverse_all {
193         uint32_t db_id;
194         uint32_t reqid;
195         uint32_t pnn;
196 };
197
198 /* called when a traverse times out */
199 static void ctdb_traverse_all_timeout(struct event_context *ev, struct timed_event *te, 
200                                       struct timeval t, void *private_data)
201 {
202         struct ctdb_traverse_all_handle *state = talloc_get_type(private_data, struct ctdb_traverse_all_handle);
203
204         state->ctdb->statistics.timeouts.traverse++;
205
206         state->callback(state->private_data, tdb_null, tdb_null);
207         talloc_free(state);
208 }
209
210 /*
211   setup a cluster-wide non-blocking traverse of a ctdb. The
212   callback function will be called on every record in the local
213   ltdb. To stop the travserse, talloc_free() the traverse_handle.
214
215   The traverse is finished when the callback is called with tdb_null
216   for key and data
217  */
218 static struct ctdb_traverse_all_handle *ctdb_daemon_traverse_all(struct ctdb_db_context *ctdb_db,
219                                                                  ctdb_traverse_fn_t callback,
220                                                                  void *private_data)
221 {
222         struct ctdb_traverse_all_handle *state;
223         struct ctdb_context *ctdb = ctdb_db->ctdb;
224         int ret;
225         TDB_DATA data;
226         struct ctdb_traverse_all r;
227
228         state = talloc(ctdb_db, struct ctdb_traverse_all_handle);
229         if (state == NULL) {
230                 return NULL;
231         }
232
233         state->ctdb = ctdb;
234         state->reqid = ctdb_reqid_new(ctdb_db->ctdb, state);
235         state->callback = callback;
236         state->private_data = private_data;
237         state->null_count = 0;
238         
239         talloc_set_destructor(state, ctdb_traverse_all_destructor);
240
241         r.db_id = ctdb_db->db_id;
242         r.reqid = state->reqid;
243         r.pnn   = ctdb->pnn;
244
245         data.dptr = (uint8_t *)&r;
246         data.dsize = sizeof(r);
247
248         /* tell all the nodes in the cluster to start sending records to this node */
249         ret = ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0, 
250                                        CTDB_CONTROL_TRAVERSE_ALL,
251                                        0, CTDB_CTRL_FLAG_NOREPLY, data, NULL, NULL);
252         if (ret != 0) {
253                 talloc_free(state);
254                 return NULL;
255         }
256
257         /* timeout the traverse */
258         event_add_timed(ctdb->ev, state, 
259                         timeval_current_ofs(ctdb->tunable.traverse_timeout, 0), 
260                         ctdb_traverse_all_timeout, state);
261
262         return state;
263 }
264
265 struct traverse_all_state {
266         struct ctdb_context *ctdb;
267         struct ctdb_traverse_local_handle *h;
268         uint32_t reqid;
269         uint32_t srcnode;
270 };
271
272 /*
273   called for each record during a traverse all 
274  */
275 static void traverse_all_callback(void *p, TDB_DATA key, TDB_DATA data)
276 {
277         struct traverse_all_state *state = talloc_get_type(p, struct traverse_all_state);
278         int ret;
279         struct ctdb_rec_data *d;
280         TDB_DATA cdata;
281
282         d = ctdb_marshall_record(state, state->reqid, key, NULL, data);
283         if (d == NULL) {
284                 /* darn .... */
285                 DEBUG(DEBUG_ERR,("Out of memory in traverse_all_callback\n"));
286                 return;
287         }
288
289         cdata.dptr = (uint8_t *)d;
290         cdata.dsize = d->length;
291
292         ret = ctdb_daemon_send_control(state->ctdb, state->srcnode, 0, CTDB_CONTROL_TRAVERSE_DATA,
293                                        0, CTDB_CTRL_FLAG_NOREPLY, cdata, NULL, NULL);
294         if (ret != 0) {
295                 DEBUG(DEBUG_ERR,("Failed to send traverse data\n"));
296         }
297
298         if (key.dsize == 0 && data.dsize == 0) {
299                 /* we're done */
300                 talloc_free(state);
301         }
302 }
303
304 /*
305   called when a CTDB_CONTROL_TRAVERSE_ALL control comes in. We then
306   setup a traverse of our local ltdb, sending the records as
307   CTDB_CONTROL_TRAVERSE_DATA records back to the originator
308  */
309 int32_t ctdb_control_traverse_all(struct ctdb_context *ctdb, TDB_DATA data, TDB_DATA *outdata)
310 {
311         struct ctdb_traverse_all *c = (struct ctdb_traverse_all *)data.dptr;
312         struct traverse_all_state *state;
313         struct ctdb_db_context *ctdb_db;
314
315         if (data.dsize != sizeof(struct ctdb_traverse_all)) {
316                 DEBUG(DEBUG_ERR,("Invalid size in ctdb_control_traverse_all\n"));
317                 return -1;
318         }
319
320         ctdb_db = find_ctdb_db(ctdb, c->db_id);
321         if (ctdb_db == NULL) {
322                 return -1;
323         }
324
325         state = talloc(ctdb_db, struct traverse_all_state);
326         if (state == NULL) {
327                 return -1;
328         }
329
330         state->reqid = c->reqid;
331         state->srcnode = c->pnn;
332         state->ctdb = ctdb;
333
334         state->h = ctdb_traverse_local(ctdb_db, traverse_all_callback, state);
335         if (state->h == NULL) {
336                 talloc_free(state);
337                 return -1;
338         }
339
340         return 0;
341 }
342
343
344 /*
345   called when a CTDB_CONTROL_TRAVERSE_DATA control comes in. We then
346   call the traverse_all callback with the record
347  */
348 int32_t ctdb_control_traverse_data(struct ctdb_context *ctdb, TDB_DATA data, TDB_DATA *outdata)
349 {
350         struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
351         struct ctdb_traverse_all_handle *state;
352         TDB_DATA key;
353         ctdb_traverse_fn_t callback;
354         void *private_data;
355
356         if (data.dsize < sizeof(uint32_t) || data.dsize != d->length) {
357                 DEBUG(DEBUG_ERR,("Bad record size in ctdb_control_traverse_data\n"));
358                 return -1;
359         }
360
361         state = ctdb_reqid_find(ctdb, d->reqid, struct ctdb_traverse_all_handle);
362         if (state == NULL || d->reqid != state->reqid) {
363                 /* traverse might have been terminated already */
364                 return -1;
365         }
366
367         key.dsize = d->keylen;
368         key.dptr  = &d->data[0];
369         data.dsize = d->datalen;
370         data.dptr = &d->data[d->keylen];
371
372         if (key.dsize == 0 && data.dsize == 0) {
373                 state->null_count++;
374                 if (state->null_count != ctdb_get_num_active_nodes(ctdb)) {
375                         return 0;
376                 }
377         }
378
379         callback = state->callback;
380         private_data = state->private_data;
381
382         callback(private_data, key, data);
383         if (key.dsize == 0 && data.dsize == 0) {
384                 /* we've received all of the null replies, so all
385                    nodes are finished */
386                 talloc_free(state);
387         }
388         return 0;
389 }       
390
391 struct traverse_start_state {
392         struct ctdb_context *ctdb;
393         struct ctdb_traverse_all_handle *h;
394         uint32_t srcnode;
395         uint32_t reqid;
396         uint64_t srvid;
397 };
398
399 /*
400   callback which sends records as messages to the client
401  */
402 static void traverse_start_callback(void *p, TDB_DATA key, TDB_DATA data)
403 {
404         struct traverse_start_state *state;
405         struct ctdb_rec_data *d;
406         TDB_DATA cdata;
407
408         state = talloc_get_type(p, struct traverse_start_state);
409
410         d = ctdb_marshall_record(state, state->reqid, key, NULL, data);
411         if (d == NULL) {
412                 return;
413         }
414
415         cdata.dptr = (uint8_t *)d;
416         cdata.dsize = d->length;
417
418         ctdb_dispatch_message(state->ctdb, state->srvid, cdata);
419         if (key.dsize == 0 && data.dsize == 0) {
420                 /* end of traverse */
421                 talloc_free(state);
422         }
423 }
424
425 /*
426   start a traverse_all - called as a control from a client
427  */
428 int32_t ctdb_control_traverse_start(struct ctdb_context *ctdb, TDB_DATA data, 
429                                     TDB_DATA *outdata, uint32_t srcnode)
430 {
431         struct ctdb_traverse_start *d = (struct ctdb_traverse_start *)data.dptr;
432         struct traverse_start_state *state;
433         struct ctdb_db_context *ctdb_db;
434
435         if (data.dsize != sizeof(*d)) {
436                 DEBUG(DEBUG_ERR,("Bad record size in ctdb_control_traverse_start\n"));
437                 return -1;
438         }
439
440         ctdb_db = find_ctdb_db(ctdb, d->db_id);
441         if (ctdb_db == NULL) {
442                 return -1;
443         }
444
445         state = talloc(ctdb_db, struct traverse_start_state);
446         if (state == NULL) {
447                 return -1;
448         }
449         
450         state->srcnode = srcnode;
451         state->reqid = d->reqid;
452         state->srvid = d->srvid;
453         state->ctdb = ctdb;
454
455         state->h = ctdb_daemon_traverse_all(ctdb_db, traverse_start_callback, state);
456         if (state->h == NULL) {
457                 talloc_free(state);
458                 return -1;
459         }
460
461         return 0;
462 }