2 efficient async ctdb traverse
4 Copyright (C) Andrew Tridgell 2007
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "lib/tevent/tevent.h"
22 #include "system/filesys.h"
23 #include "system/wait.h"
25 #include "lib/tdb/include/tdb.h"
26 #include "../include/ctdb_private.h"
27 #include "lib/util/dlinklist.h"
// Per-record callback type used by the traverse code in this file.
// private_data is the caller's context; key/data describe one record.
// The callers in this file signal end-of-traverse by invoking it with
// tdb_null for both key and data.
29 typedef void (*ctdb_traverse_fn_t)(void *private_data, TDB_DATA key, TDB_DATA data);
32 handle returned to caller - freeing this handler will kill the child and
33 terminate the traverse
// State for one traverse of the local ltdb, run in a forked child process.
// The parent links each handle into ctdb_db->traverse and receives the
// child's records through a ctdb_queue.
//
// NOTE(review): this extract omits members that the code uses elsewhere
// (fd[], child, srvid, private_data); check the full definition.
35 struct ctdb_traverse_local_handle {
// links for the ctdb_db->traverse list (DLIST_ADD / DLIST_REMOVE)
36 struct ctdb_traverse_local_handle *next, *prev;
// database being traversed
37 struct ctdb_db_context *ctdb_db;
// client request id copied from traverse_all_state; matched by TRAVERSE_KILL
41 uint32_t client_reqid;
// called in the parent for each record, then once with tdb_null at the end
43 ctdb_traverse_fn_t callback;
// set to timeval_current() when the traverse has been set up
44 struct timeval start_time;
// packet queue reading the child's records from the pipe read end fd[0]
45 struct ctdb_queue *queue;
49 called when data is available from the child
// Queue handler run in the parent for each packet the traverse child writes.
//
// Each packet is treated as one marshalled struct ctdb_rec_data. Its key and
// data are sliced out of tdata->data[] and passed to h->callback.
//
// The traverse ends, with the callback invoked as tdb_null/tdb_null, when:
//  - the buffer is NULL,
//  - it is shorter than 4 bytes, or
//  - its size disagrees with the record's own length field.
//
// callback and private_data are copied to locals before anything else,
// presumably because the end-of-traverse path frees h (that line is not
// visible in this extract). Confirm against the full source.
//
// NOTE(review): nothing visible here checks that keylen + datalen fit
// inside length before indexing tdata->data[]. The producer is this
// daemon's own forked child, so this may be acceptable, but a bounds check
// would be cheap.
51 static void ctdb_traverse_local_handler(uint8_t *rawdata, size_t length, void *private_data)
53 struct ctdb_traverse_local_handle *h = talloc_get_type(private_data,
54 struct ctdb_traverse_local_handle);
56 ctdb_traverse_fn_t callback = h->callback;
57 void *p = h->private_data;
58 struct ctdb_rec_data *tdata = (struct ctdb_rec_data *)rawdata;
// Short-circuit order matters: tdata->length is read only once rawdata is
// non-NULL and at least 4 bytes long (presumably the leading length field).
60 if (rawdata == NULL || length < 4 || length != tdata->length) {
63 callback(p, tdb_null, tdb_null);
// key occupies the first keylen bytes of data[]; the record value follows it
67 key.dsize = tdata->keylen;
68 key.dptr = &tdata->data[0];
69 data.dsize = tdata->datalen;
70 data.dptr = &tdata->data[tdata->keylen];
72 callback(p, key, data);
76 destroy a in-flight traverse operation
// talloc destructor for struct ctdb_traverse_local_handle, installed by
// ctdb_traverse_local(). It unlinks the handle from ctdb_db->traverse and
// sends SIGKILL to the traverse child, so talloc_free() on the handle aborts
// the traverse. The return value of kill() is not checked here, and reaping
// of the child is not visible in this block.
78 static int traverse_local_destructor(struct ctdb_traverse_local_handle *h)
80 DLIST_REMOVE(h->ctdb_db->traverse, h);
81 kill(h->child, SIGKILL);
86 callback from tdb_traverse_read()
// Per-record callback run inside the forked traverse child. It is passed to
// tdb_traverse_read() in ctdb_traverse_local().
//
// For non-persistent databases two kinds of record are filtered out:
//  - records whose data is not larger than a struct ctdb_ltdb_header
//    (header only, no payload);
//  - records whose header names another node as dmaster.
// Every other record is marshalled with ctdb_marshall_record() (reqid 0)
// and written to h->fd[1], the pipe the parent reads through its ctdb_queue.
//
// The filter and error return values are not visible in this extract. Check
// them against the tdb traverse-callback contract (non-zero stops the walk).
88 static int ctdb_traverse_local_fn(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
90 struct ctdb_traverse_local_handle *h = talloc_get_type(p,
91 struct ctdb_traverse_local_handle);
92 struct ctdb_rec_data *d;
93 struct ctdb_ltdb_header *hdr;
// hdr is only dereferenced below, after the size check has passed
95 hdr = (struct ctdb_ltdb_header *)data.dptr;
97 if (h->ctdb_db->persistent == 0) {
98 /* filter out zero-length records */
99 if (data.dsize <= sizeof(struct ctdb_ltdb_header)) {
103 /* filter out non-authoritative records */
104 if (hdr->dmaster != h->ctdb_db->ctdb->pnn) {
// As far as this extract shows, data is passed unchanged (it still starts
// with the ltdb header) and no separate header is given (NULL).
109 d = ctdb_marshall_record(h, 0, key, NULL, data);
111 /* error handling is tricky in this child code .... */
// Any result other than the full record length takes this branch; its body
// is not visible in this extract.
115 if (write(h->fd[1], (uint8_t *)d, d->length) != d->length) {
// Daemon-side state for servicing one CTDB_CONTROL_TRAVERSE_ALL request on
// this node. Records from the local traverse are sent back to the
// requesting node as CTDB_CONTROL_TRAVERSE_DATA controls.
//
// NOTE(review): this extract omits members used elsewhere in the file
// (reqid, srcnode, srvid); check the full definition.
121 struct traverse_all_state {
// daemon context used for sending controls
122 struct ctdb_context *ctdb;
// local traverse started by ctdb_traverse_local(); allocated as our child
123 struct ctdb_traverse_local_handle *h;
// client request id copied from the incoming control
126 uint32_t client_reqid;
131 setup a non-blocking traverse of a local ltdb. The callback function
132 will be called on every record in the local ltdb. To stop the
133 travserse, talloc_free() the travserse_handle.
135 The traverse is finished when the callback is called with tdb_null for key and data
// all_state plays three roles for the new handle:
//  - it is the handle's talloc parent;
//  - it is the callback's private_data;
//  - its client_reqid and srvid are copied into the handle so that
//    ctdb_control_traverse_kill() can find it later.
//
// The records are read in a child created with ctdb_fork(). The child runs
// tdb_traverse_read() with ctdb_traverse_local_fn(), which writes each
// marshalled record to h->fd[1]. The read end h->fd[0] is wrapped in a
// ctdb_queue whose handler, ctdb_traverse_local_handler(), invokes callback.
// The caller (ctdb_control_traverse_all) treats a NULL return as failure.
137 static struct ctdb_traverse_local_handle *ctdb_traverse_local(struct ctdb_db_context *ctdb_db,
138 ctdb_traverse_fn_t callback,
139 struct traverse_all_state *all_state)
141 struct ctdb_traverse_local_handle *h;
// zeroed and parented on all_state: freeing all_state also frees h
144 h = talloc_zero(all_state, struct ctdb_traverse_local_handle);
156 h->child = ctdb_fork(ctdb_db->ctdb);
158 if (h->child == (pid_t)-1) {
165 h->callback = callback;
166 h->private_data = all_state;
167 h->ctdb_db = ctdb_db;
168 h->client_reqid = all_state->client_reqid;
169 h->srvid = all_state->srvid;
172 /* start the traverse in the child */
// presumably a prefix for the child's debug output (format args partly cut)
174 debug_extra = talloc_asprintf(NULL, "traverse_local-%s:",
176 tdb_traverse_read(ctdb_db->ltdb->tdb, ctdb_traverse_local_fn, h);
// Presumably only the parent gets here. It keeps the read end, which must
// not leak into exec'd programs.
181 set_close_on_exec(h->fd[0]);
// from now on talloc_free(h) unlinks h and kills the child
183 talloc_set_destructor(h, traverse_local_destructor);
185 DLIST_ADD(ctdb_db->traverse, h);
188 setup a packet queue between the child and the parent. This
189 copes with all the async and packet boundary issues
191 DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to child traverse\n", h->fd[0]));
193 h->queue = ctdb_queue_setup(ctdb_db->ctdb, h, h->fd[0], 0, ctdb_traverse_local_handler, h,
195 if (h->queue == NULL) {
200 h->start_time = timeval_current();
// Originator-side state for a cluster-wide traverse started by
// ctdb_daemon_traverse_all(). Its reqid, registered with ctdb_reqid_new(),
// is what incoming CTDB_CONTROL_TRAVERSE_DATA records carry to find it.
//
// NOTE(review): this extract omits members used elsewhere in the file
// (reqid, private_data, null_count); check the full definition.
206 struct ctdb_traverse_all_handle {
// daemon context; also used by the destructor to drop the reqid
207 struct ctdb_context *ctdb;
// database being traversed
208 struct ctdb_db_context *ctdb_db;
// receives every record, then tdb_null/tdb_null at the end or on timeout
210 ctdb_traverse_fn_t callback;
216 destroy a traverse_all op
// talloc destructor for struct ctdb_traverse_all_handle: removes the
// handle's reqid registration. Presumably this is why TRAVERSE_DATA records
// that arrive after the handle is freed fail the ctdb_reqid_find() lookup
// in ctdb_control_traverse_data() and are not delivered.
218 static int ctdb_traverse_all_destructor(struct ctdb_traverse_all_handle *state)
220 ctdb_reqid_remove(state->ctdb, state->reqid);
// Payload of CTDB_CONTROL_TRAVERSE_ALL. It is sent as the raw bytes of the
// struct (data.dsize = sizeof(r) in ctdb_daemon_traverse_all), and the
// receiver requires an exact size match. Sender and receiver must therefore
// agree on its layout.
//
// NOTE(review): this extract omits members used elsewhere in the file
// (db_id, reqid, pnn, srvid); check the full definition.
224 struct ctdb_traverse_all {
// the client's own request id; copied into the remote local-traverse handle
228 uint32_t client_reqid;
232 /* called when a traverse times out */
// Timer callback armed by ctdb_daemon_traverse_all() for
// tunable.traverse_timeout seconds. It does three things:
//  - logs the timeout;
//  - bumps the timeouts.traverse statistic;
//  - ends the traverse by calling the callback with tdb_null/tdb_null.
// The timer is allocated on the handle, so freeing the handle should
// cancel it.
233 static void ctdb_traverse_all_timeout(struct event_context *ev, struct timed_event *te,
234 struct timeval t, void *private_data)
236 struct ctdb_traverse_all_handle *state = talloc_get_type(private_data, struct ctdb_traverse_all_handle);
238 DEBUG(DEBUG_ERR,(__location__ " Traverse all timeout on database:%s\n", state->ctdb_db->db_name));
239 CTDB_INCREMENT_STAT(state->ctdb, timeouts.traverse);
241 state->callback(state->private_data, tdb_null, tdb_null);
// Per-client state for a CTDB_CONTROL_TRAVERSE_START request. It is
// allocated on the requesting ctdb_client, so it is freed with the client.
//
// NOTE(review): this extract omits members used elsewhere in the file
// (srcnode, reqid, srvid, db_id); check the full definition.
245 struct traverse_start_state {
// daemon context used to dispatch messages and send controls
246 struct ctdb_context *ctdb;
// cluster-wide traverse created by ctdb_daemon_traverse_all()
247 struct ctdb_traverse_all_handle *h;
256 setup a cluster-wide non-blocking traverse of a ctdb. The
257 callback function will be called on every record in the local
258 ltdb. To stop the travserse, talloc_free() the traverse_handle.
260 The traverse is finished when the callback is called with tdb_null
// How it works (from the code below):
//  - The handle is allocated on start_state and registered with
//    ctdb_reqid_new(). That reqid goes into the CTDB_CONTROL_TRAVERSE_ALL
//    payload so TRAVERSE_DATA replies can be routed back to this handle.
//  - Non-persistent databases are traversed on every node
//    (CTDB_BROADCAST_VNNMAP).
//  - A persistent database is traversed on a single node: this one if it
//    is in the vnn map, otherwise vnn_map->map[0].
//  - The control is sent with CTDB_CTRL_FLAG_NOREPLY.
//  - A timer of tunable.traverse_timeout seconds ends the traverse via
//    ctdb_traverse_all_timeout().
263 static struct ctdb_traverse_all_handle *ctdb_daemon_traverse_all(struct ctdb_db_context *ctdb_db,
264 ctdb_traverse_fn_t callback,
265 struct traverse_start_state *start_state)
267 struct ctdb_traverse_all_handle *state;
268 struct ctdb_context *ctdb = ctdb_db->ctdb;
271 struct ctdb_traverse_all r;
272 uint32_t destination;
274 state = talloc(start_state, struct ctdb_traverse_all_handle);
280 state->ctdb_db = ctdb_db;
281 state->reqid = ctdb_reqid_new(ctdb_db->ctdb, state);
282 state->callback = callback;
283 state->private_data = start_state;
284 state->null_count = 0;
286 talloc_set_destructor(state, ctdb_traverse_all_destructor);
// r.client_reqid is the client's request id; the remote local traverse
// stores it as client_reqid so TRAVERSE_KILL can match it later.
288 r.db_id = ctdb_db->db_id;
289 r.reqid = state->reqid;
291 r.client_reqid = start_state->reqid;
292 r.srvid = start_state->srvid;
294 data.dptr = (uint8_t *)&r;
295 data.dsize = sizeof(r);
297 if (ctdb_db->persistent == 0) {
298 /* normal database, traverse all nodes */
299 destination = CTDB_BROADCAST_VNNMAP;
302 /* persistent database, traverse one node, preferably
305 destination = ctdb->pnn;
306 /* check we are in the vnnmap */
307 for (i=0; i < ctdb->vnn_map->size; i++) {
308 if (ctdb->vnn_map->map[i] == ctdb->pnn) {
312 /* if we are not in the vnn map we just pick the first
// NOTE(review): if vnn_map->size can be 0, the loop above never runs,
// i == size holds, and map[0] below is read out of bounds. No guard for an
// empty vnn map is visible in this extract.
315 if (i == ctdb->vnn_map->size) {
316 destination = ctdb->vnn_map->map[0];
320 /* tell all the nodes in the cluster to start sending records to this
321 * node, or if it is a persistent database, just tell the local
324 ret = ctdb_daemon_send_control(ctdb, destination, 0,
325 CTDB_CONTROL_TRAVERSE_ALL,
326 0, CTDB_CTRL_FLAG_NOREPLY, data, NULL, NULL);
333 /* timeout the traverse */
334 event_add_timed(ctdb->ev, state,
335 timeval_current_ofs(ctdb->tunable.traverse_timeout, 0),
336 ctdb_traverse_all_timeout, state);
342 called for each record during a traverse all
// Callback for the local traverse started by ctdb_control_traverse_all().
// Each record, including the final tdb_null/tdb_null end marker, is:
//  - marshalled with the originator's reqid, then
//  - sent to state->srcnode as a CTDB_CONTROL_TRAVERSE_DATA control with
//    CTDB_CTRL_FLAG_NOREPLY.
// Marshal and send failures are logged. The rest of their handling is not
// visible in this extract, nor is the body of the end-of-traverse branch.
344 static void traverse_all_callback(void *p, TDB_DATA key, TDB_DATA data)
346 struct traverse_all_state *state = talloc_get_type(p, struct traverse_all_state);
348 struct ctdb_rec_data *d;
// state->reqid is the originator's traverse handle id, so replies route back to it
351 d = ctdb_marshall_record(state, state->reqid, key, NULL, data);
354 DEBUG(DEBUG_ERR,("Out of memory in traverse_all_callback\n"));
358 cdata.dptr = (uint8_t *)d;
359 cdata.dsize = d->length;
361 ret = ctdb_daemon_send_control(state->ctdb, state->srcnode, 0, CTDB_CONTROL_TRAVERSE_DATA,
362 0, CTDB_CTRL_FLAG_NOREPLY, cdata, NULL, NULL);
364 DEBUG(DEBUG_ERR,("Failed to send traverse data\n"));
// end marker: the local traverse for this request has completed
367 if (key.dsize == 0 && data.dsize == 0) {
374 called when a CTDB_CONTROL_TRAVERSE_ALL control comes in. We then
375 setup a traverse of our local ltdb, sending the records as
376 CTDB_CONTROL_TRAVERSE_DATA records back to the originator
// Checks that the payload is exactly one struct ctdb_traverse_all and that
// the database exists.
//
// For an unhealthy database:
//  - allow_unhealthy_db_read == 0: an error is logged (the rejecting return
//    is not visible in this extract);
//  - otherwise: only a warning is logged.
//
// The request state is allocated on ctdb_db. It records the originator's
// reqid, pnn, client_reqid and srvid before the local traverse is started
// with traverse_all_callback.
378 int32_t ctdb_control_traverse_all(struct ctdb_context *ctdb, TDB_DATA data, TDB_DATA *outdata)
380 struct ctdb_traverse_all *c = (struct ctdb_traverse_all *)data.dptr;
381 struct traverse_all_state *state;
382 struct ctdb_db_context *ctdb_db;
// c must not be dereferenced before this exact-size check
384 if (data.dsize != sizeof(struct ctdb_traverse_all)) {
385 DEBUG(DEBUG_ERR,(__location__ " Invalid size in ctdb_control_traverse_all\n"));
389 ctdb_db = find_ctdb_db(ctdb, c->db_id);
390 if (ctdb_db == NULL) {
394 if (ctdb_db->unhealthy_reason) {
395 if (ctdb->tunable.allow_unhealthy_db_read == 0) {
396 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_control_traverse_all: %s\n",
397 ctdb_db->db_name, ctdb_db->unhealthy_reason));
400 DEBUG(DEBUG_WARNING,("warn: db(%s) unhealty in ctdb_control_traverse_all: %s\n",
401 ctdb_db->db_name, ctdb_db->unhealthy_reason));
// parented on ctdb_db, so its lifetime is bounded by the database context
404 state = talloc(ctdb_db, struct traverse_all_state);
409 state->reqid = c->reqid;
410 state->srcnode = c->pnn;
412 state->client_reqid = c->client_reqid;
413 state->srvid = c->srvid;
415 state->h = ctdb_traverse_local(ctdb_db, traverse_all_callback, state);
416 if (state->h == NULL) {
426 called when a CTDB_CONTROL_TRAVERSE_DATA control comes in. We then
427 call the traverse_all callback with the record
// Originator side: looks up the ctdb_traverse_all_handle by the record's
// reqid and passes the record to its callback. Records for an unknown reqid
// (e.g. a traverse already freed) are not delivered.
//
// End markers (empty key and data):
//  - Non-persistent databases: each participating node sends one, so the
//    callback sees the end marker only once state->null_count equals
//    ctdb_get_num_active_nodes(). The increment of null_count is not
//    visible in this extract.
//  - Persistent databases: traversed on one node only, so its single end
//    marker is passed straight through.
//
// NOTE(review): possible out-of-bounds read on malformed input.
//  - The size check only guarantees 4 bytes and that data.dsize equals
//    d->length.
//  - d->reqid, d->keylen and d->datalen are then read, and d->data[] is
//    indexed by keylen, without confirming that the record header plus
//    keylen + datalen fit in data.dsize.
//  - This payload arrives as a control, potentially from another node, so
//    a stricter check would be safer.
429 int32_t ctdb_control_traverse_data(struct ctdb_context *ctdb, TDB_DATA data, TDB_DATA *outdata)
431 struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
432 struct ctdb_traverse_all_handle *state;
434 ctdb_traverse_fn_t callback;
437 if (data.dsize < sizeof(uint32_t) || data.dsize != d->length) {
438 DEBUG(DEBUG_ERR,("Bad record size in ctdb_control_traverse_data\n"));
442 state = ctdb_reqid_find(ctdb, d->reqid, struct ctdb_traverse_all_handle);
443 if (state == NULL || d->reqid != state->reqid) {
444 /* traverse might have been terminated already */
448 key.dsize = d->keylen;
449 key.dptr = &d->data[0];
// the data parameter is reused from here on to hold the record's value
450 data.dsize = d->datalen;
451 data.dptr = &d->data[d->keylen];
453 if (key.dsize == 0 && data.dsize == 0) {
455 /* Persistent databases are only scanned on one node (the local
458 if (state->ctdb_db->persistent == 0) {
459 if (state->null_count != ctdb_get_num_active_nodes(ctdb)) {
// copied to locals before the call (the callback may free state; presumed)
465 callback = state->callback;
466 private_data = state->private_data;
468 callback(private_data, key, data);
473 kill a in-progress traverse, used when a client disconnects
// Handles CTDB_CONTROL_TRAVERSE_KILL, which ctdb_traverse_start_destructor
// broadcasts. It looks for a local traverse handle on this database whose
// client_reqid and srvid match the request. What is done with a match is
// not visible in this extract; freeing the handle would run
// traverse_local_destructor and kill the child.
//
// NOTE(review): unlike ctdb_control_traverse_start(), no data.dsize check is
// visible before d is dereferenced. The sender uses
// sizeof(struct ctdb_traverse_start), so an exact-size check would be
// consistent with the rest of the file.
475 int32_t ctdb_control_traverse_kill(struct ctdb_context *ctdb, TDB_DATA data,
476 TDB_DATA *outdata, uint32_t srcnode)
478 struct ctdb_traverse_start *d = (struct ctdb_traverse_start *)data.dptr;
479 struct ctdb_db_context *ctdb_db;
480 struct ctdb_traverse_local_handle *t;
482 ctdb_db = find_ctdb_db(ctdb, d->db_id);
483 if (ctdb_db == NULL) {
487 for (t=ctdb_db->traverse; t; t=t->next) {
488 if (t->client_reqid == d->reqid &&
489 t->srvid == d->srvid) {
500 this is called when a client disconnects during a traverse
501 we need to notify all the nodes taking part in the search that they
502 should kill their traverse children
// talloc destructor for struct traverse_start_state.
//  - Installed by ctdb_control_traverse_start().
//  - Cleared by traverse_start_callback() at end-of-traverse, so it only
//    fires when the state is freed while the traverse is still running.
//
// It broadcasts CTDB_CONTROL_TRAVERSE_KILL to all connected nodes
// (CTDB_BROADCAST_CONNECTED, no reply). r.reqid is the client's request id,
// which the remote local-traverse handles store as client_reqid; that is
// what ctdb_control_traverse_kill() matches on.
504 static int ctdb_traverse_start_destructor(struct traverse_start_state *state)
506 struct ctdb_traverse_start r;
509 DEBUG(DEBUG_ERR,(__location__ " Traverse cancelled by client disconnect for database:0x%08x\n", state->db_id));
510 r.db_id = state->db_id;
511 r.reqid = state->reqid;
512 r.srvid = state->srvid;
514 data.dptr = (uint8_t *)&r;
515 data.dsize = sizeof(r);
517 ctdb_daemon_send_control(state->ctdb, CTDB_BROADCAST_CONNECTED, 0,
518 CTDB_CONTROL_TRAVERSE_KILL,
519 0, CTDB_CTRL_FLAG_NOREPLY, data, NULL, NULL);
524 callback which sends records as messages to the client
// Callback for the cluster-wide traverse started by
// ctdb_control_traverse_start(). Each record is marshalled with the
// client's reqid and delivered to the client with ctdb_dispatch_message()
// on state->srvid.
//
// The end marker (empty key and data) is delivered the same way. After it,
// the destructor is cleared so that freeing the state no longer broadcasts
// TRAVERSE_KILL. The handling of a failed marshal is not visible in this
// extract.
526 static void traverse_start_callback(void *p, TDB_DATA key, TDB_DATA data)
528 struct traverse_start_state *state;
529 struct ctdb_rec_data *d;
532 state = talloc_get_type(p, struct traverse_start_state);
534 d = ctdb_marshall_record(state, state->reqid, key, NULL, data);
539 cdata.dptr = (uint8_t *)d;
540 cdata.dsize = d->length;
542 ctdb_dispatch_message(state->ctdb, state->srvid, cdata);
543 if (key.dsize == 0 && data.dsize == 0) {
544 /* end of traverse */
// finished normally: no TRAVERSE_KILL is needed when the state is freed
545 talloc_set_destructor(state, NULL);
552 start a traverse_all - called as a control from a client
// Validation steps:
//  - looks up the calling client by client_id;
//  - checks the payload is exactly one struct ctdb_traverse_start;
//  - finds the database.
// For an unhealthy database, an error is logged when
// allow_unhealthy_db_read is 0 (the rejecting return is not visible in this
// extract); otherwise only a warning is logged.
//
// The state is allocated on the client. If the client goes away
// mid-traverse, freeing the client frees the state, and
// ctdb_traverse_start_destructor asks all nodes to kill their traverse
// children. The destructor is installed only after
// ctdb_daemon_traverse_all() has returned a handle.
554 int32_t ctdb_control_traverse_start(struct ctdb_context *ctdb, TDB_DATA data,
555 TDB_DATA *outdata, uint32_t srcnode, uint32_t client_id)
557 struct ctdb_traverse_start *d = (struct ctdb_traverse_start *)data.dptr;
558 struct traverse_start_state *state;
559 struct ctdb_db_context *ctdb_db;
560 struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
562 if (client == NULL) {
563 DEBUG(DEBUG_ERR,(__location__ " No client found\n"));
// d must not be dereferenced before this exact-size check
567 if (data.dsize != sizeof(*d)) {
568 DEBUG(DEBUG_ERR,("Bad record size in ctdb_control_traverse_start\n"));
572 ctdb_db = find_ctdb_db(ctdb, d->db_id);
573 if (ctdb_db == NULL) {
577 if (ctdb_db->unhealthy_reason) {
578 if (ctdb->tunable.allow_unhealthy_db_read == 0) {
579 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_control_traverse_start: %s\n",
580 ctdb_db->db_name, ctdb_db->unhealthy_reason));
583 DEBUG(DEBUG_WARNING,("warn: db(%s) unhealty in ctdb_control_traverse_start: %s\n",
584 ctdb_db->db_name, ctdb_db->unhealthy_reason));
// parented on the client: freed together with it
587 state = talloc(client, struct traverse_start_state);
592 state->srcnode = srcnode;
593 state->reqid = d->reqid;
594 state->srvid = d->srvid;
595 state->db_id = d->db_id;
598 state->h = ctdb_daemon_traverse_all(ctdb_db, traverse_start_callback, state);
599 if (state->h == NULL) {
// installed last, presumably so a failed setup does not broadcast TRAVERSE_KILL
604 talloc_set_destructor(state, ctdb_traverse_start_destructor);