DLIST_ADD(ctdb_db->calls, call);
return 0;
}
+
+
+/*
+  Kick off a cluster wide traverse of ctdb_db by issuing a
+  CTDB_CONTROL_TRAVERSE_START control to the current node. Each record
+  is then sent as a message to the given srvid.
+
+  Returns 0 on success, -1 if ctdb_control() fails or reports a
+  non-zero status.
+ */
+int ctdb_traverse_all(struct ctdb_db_context *ctdb_db, uint64_t srvid)
+{
+	struct ctdb_traverse_start req;
+	TDB_DATA payload;
+	int32_t res;
+
+	req.db_id = ctdb_db->db_id;
+	req.srvid = srvid;
+	req.reqid = 0;
+
+	payload.dsize = sizeof(req);
+	payload.dptr  = (uint8_t *)&req;
+
+	/* res is only examined when the control call itself succeeded */
+	if (ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_TRAVERSE_START, 0,
+			 payload, NULL, NULL, &res) == 0 &&
+	    res == 0) {
+		return 0;
+	}
+
+	DEBUG(0,("ctdb_traverse_all failed\n"));
+	return -1;
+}
+
+/*
+ state shared between ctdb_list_keys() and list_keys_handler()
+ */
+struct list_keys_state {
+ /* stream the records are printed to */
+ FILE *f;
+ /* set by the handler when the end-of-traverse record arrives */
+ bool done;
+ /* number of records printed so far */
+ uint32_t count;
+};
+
+/*
+  called on each key during a list_keys
+
+  data holds one marshalled struct ctdb_traverse_data. A record with
+  an empty key and empty data marks the end of the traverse and sets
+  state->done. Any other record must carry a ctdb_ltdb_header at the
+  start of its data; the header fields, the key and the remaining data
+  are printed to state->f and state->count is incremented.
+
+  Malformed records are logged and dropped.
+ */
+static void list_keys_handler(struct ctdb_context *ctdb, uint64_t srvid,
+			      TDB_DATA data, void *p)
+{
+	struct list_keys_state *state = (struct list_keys_state *)p;
+	struct ctdb_traverse_data *d = (struct ctdb_traverse_data *)data.dptr;
+	const size_t hdrlen = offsetof(struct ctdb_traverse_data, data);
+	TDB_DATA key;
+	char *keystr, *datastr;
+	struct ctdb_ltdb_header *h;
+
+	if (data.dsize < sizeof(uint32_t) ||
+	    d->length != data.dsize) {
+		/* dsize is a size_t, so cast it to match the %u conversion */
+		DEBUG(0,("Bad data size %u in list_keys_handler\n", (unsigned)data.dsize));
+		return;
+	}
+
+	/* make sure the fixed header is really there, and that the
+	   key and data lengths it advertises stay inside the buffer */
+	if (data.dsize < hdrlen ||
+	    d->keylen > data.dsize - hdrlen ||
+	    d->datalen > data.dsize - hdrlen - d->keylen) {
+		DEBUG(0,("Bad record lengths in list_keys_handler\n"));
+		return;
+	}
+
+	key.dsize = d->keylen;
+	key.dptr = &d->data[0];
+	data.dsize = d->datalen;
+	data.dptr = &d->data[d->keylen];
+
+	if (key.dsize == 0 && data.dsize == 0) {
+		/* end of traverse */
+		state->done = True;
+		return;
+	}
+
+	h = (struct ctdb_ltdb_header *)data.dptr;
+	if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
+		DEBUG(0,("Bad ctdb ltdb header in list_keys_handler\n"));
+		return;
+	}
+
+	keystr  = hex_encode(ctdb, key.dptr, key.dsize);
+	datastr = hex_encode(ctdb, data.dptr+sizeof(*h), data.dsize-sizeof(*h));
+	if (keystr == NULL || datastr == NULL) {
+		/* passing NULL to a %s conversion is undefined, so bail out
+		   (assumes hex_encode() reports failure as NULL) */
+		DEBUG(0,("Out of memory in list_keys_handler\n"));
+		talloc_free(keystr);
+		talloc_free(datastr);
+		return;
+	}
+
+	fprintf(state->f, "dmaster: %u\n", h->dmaster);
+	fprintf(state->f, "rsn: %llu\n", (unsigned long long)h->rsn);
+	fprintf(state->f, "key: %s\ndata: %s\n", keystr, datastr);
+
+	talloc_free(keystr);
+	talloc_free(datastr);
+
+	state->count++;
+}
+
+/*
+  convenience function to list all keys of a database to the stream f
+
+  Registers list_keys_handler() for a srvid derived from the pid,
+  starts a cluster wide traverse, and runs the event loop until the
+  handler has seen the end-of-traverse record.
+
+  Returns the number of records printed, or -1 if the handler could
+  not be set up or the traverse could not be started.
+
+  NOTE(review): the handler stays registered with a pointer to the
+  on-stack state after this function returns; no call to remove it is
+  visible here - confirm whether it needs to be unregistered.
+ */
+int ctdb_list_keys(struct ctdb_db_context *ctdb_db, FILE *f)
+{
+	int ret;
+	uint64_t srvid = (getpid() | 0xFLL<<60);
+	struct list_keys_state state;
+
+	state.f = f;
+	state.done = False;
+	state.count = 0;
+
+	ret = ctdb_set_message_handler(ctdb_db->ctdb, srvid, list_keys_handler, &state);
+	if (ret != 0) {
+		DEBUG(0,("Failed to setup list keys handler\n"));
+		return -1;
+	}
+
+	ret = ctdb_traverse_all(ctdb_db, srvid);
+	if (ret != 0) {
+		/* no end-of-traverse record will ever arrive if the
+		   traverse never started, so waiting would hang forever */
+		DEBUG(0,("Failed to start traverse in ctdb_list_keys\n"));
+		return -1;
+	}
+
+	while (!state.done) {
+		event_loop_once(ctdb_db->ctdb->ev);
+	}
+
+	return state.count;
+}
typedef void (*ctdb_traverse_fn_t)(void *private_data, TDB_DATA key, TDB_DATA data);
-/*
- structure used to pass the data between the child and parent
- */
-struct ctdb_traverse_data {
- uint32_t length;
- uint32_t keylen;
- uint32_t datalen;
- uint8_t data[1];
-};
-
/*
handle returned to caller - freeing this handler will kill the child and
terminate the traverse
*/
-struct ctdb_traverse_handle {
+struct ctdb_traverse_local_handle {
struct ctdb_db_context *ctdb_db;
int fd[2];
pid_t child;
/*
called when data is available from the child
*/
-static void ctdb_traverse_handler(uint8_t *rawdata, size_t length, void *private_data)
+static void ctdb_traverse_local_handler(uint8_t *rawdata, size_t length, void *private_data)
{
- struct ctdb_traverse_handle *h = talloc_get_type(private_data,
- struct ctdb_traverse_handle);
+ struct ctdb_traverse_local_handle *h = talloc_get_type(private_data,
+ struct ctdb_traverse_local_handle);
TDB_DATA key, data;
ctdb_traverse_fn_t callback = h->callback;
void *p = h->private_data;
/*
destroy a in-flight traverse operation
*/
-static int traverse_destructor(struct ctdb_traverse_handle *h)
+static int traverse_local_destructor(struct ctdb_traverse_local_handle *h)
{
close(h->fd[0]);
kill(h->child, SIGKILL);
}
/*
- callback from tdb_traverse_read()x
+ form a ctdb_traverse_data record from a key/data pair
+
+ The record is one talloc allocation made on mem_ctx. It consists of
+ the fixed ctdb_traverse_data header (length, reqid, keylen, datalen)
+ followed by the key bytes and then the data bytes.
+
+ Returns the new record, or NULL if the allocation fails.
*/
-static int ctdb_traverse_fn(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
+static struct ctdb_traverse_data *ctdb_traverse_marshall_record(TALLOC_CTX *mem_ctx,
+ uint32_t reqid,
+ TDB_DATA key, TDB_DATA data)
{
- struct ctdb_traverse_handle *h = talloc_get_type(p, struct ctdb_traverse_handle);
+ size_t length;
struct ctdb_traverse_data *d;
- size_t length = offsetof(struct ctdb_traverse_data, data) + key.dsize + data.dsize;
- d = (struct ctdb_traverse_data *)talloc_size(h, length);
+
+ /* header up to the start of data[], plus room for both payloads */
+ length = offsetof(struct ctdb_traverse_data, data) + key.dsize + data.dsize;
+ d = (struct ctdb_traverse_data *)talloc_size(mem_ctx, length);
if (d == NULL) {
- /* error handling is tricky in this child code .... */
- return -1;
+ return NULL;
}
d->length = length;
+ d->reqid = reqid;
d->keylen = key.dsize;
d->datalen = data.dsize;
+ /* the key goes first, the data follows immediately after it */
memcpy(&d->data[0], key.dptr, key.dsize);
memcpy(&d->data[key.dsize], data.dptr, data.dsize);
- if (ctdb_queue_send(h->queue, (uint8_t *)d, d->length) != 0) {
+ return d;
+}
+
+/*
+  callback from tdb_traverse_read()
+
+  Runs in the traverse child. Records that hold no data beyond the
+  ctdb_ltdb_header, or whose dmaster is not this node's vnn, are
+  skipped. Every other record is marshalled and written to the pipe
+  h->fd[1].
+
+  Returns 0 to continue the traverse, -1 to abort it when marshalling
+  fails or the record could not be written completely.
+ */
+static int ctdb_traverse_local_fn(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
+{
+	struct ctdb_traverse_local_handle *h = talloc_get_type(p,
+							       struct ctdb_traverse_local_handle);
+	struct ctdb_traverse_data *d;
+	struct ctdb_ltdb_header *hdr;
+	size_t length;
+	ssize_t nwritten;
+
+	/* filter out non-authoritative and zero-length records */
+	hdr = (struct ctdb_ltdb_header *)data.dptr;
+	if (data.dsize <= sizeof(struct ctdb_ltdb_header) ||
+	    hdr->dmaster != h->ctdb_db->ctdb->vnn) {
+		return 0;
+	}
+
+	d = ctdb_traverse_marshall_record(h, 0, key, data);
+	if (d == NULL) {
+		/* error handling is tricky in this child code .... */
+		return -1;
+	}
+
+	length = d->length;
+	nwritten = write(h->fd[1], (uint8_t *)d, length);
+
+	/* the record is not needed once it has been handed to write();
+	   without this free every traversed record would stay allocated
+	   on h for the whole traverse */
+	talloc_free(d);
+
+	if (nwritten == -1 || (size_t)nwritten != length) {
return -1;
}
return 0;
}
+
/*
- setup a non-blocking traverse of a tdb. The callback function will
- be called on every record in the local ltdb. To stop the travserse,
- talloc_free() the travserse_handle.
+ setup a non-blocking traverse of a local ltdb. The callback function
+ will be called on every record in the local ltdb. To stop the
+ traverse, talloc_free() the traverse handle.
+
+ The traverse is finished when the callback is called with tdb_null for key and data
*/
-struct ctdb_traverse_handle *ctdb_traverse(struct ctdb_db_context *ctdb_db,
- ctdb_traverse_fn_t callback,
- void *private_data)
+struct ctdb_traverse_local_handle *ctdb_traverse_local(struct ctdb_db_context *ctdb_db,
+ ctdb_traverse_fn_t callback,
+ void *private_data)
{
- struct ctdb_traverse_handle *h;
+ struct ctdb_traverse_local_handle *h;
int ret;
ctdb_db->ctdb->status.traverse_calls++;
- if (!(h = talloc_zero(ctdb_db, struct ctdb_traverse_handle))) {
+ h = talloc_zero(ctdb_db, struct ctdb_traverse_local_handle);
+ if (h == NULL) {
return NULL;
}
if (h->child == 0) {
/* start the traverse in the child */
close(h->fd[0]);
- tdb_traverse_read(ctdb_db->ltdb->tdb, ctdb_traverse_fn, h);
+ tdb_traverse_read(ctdb_db->ltdb->tdb, ctdb_traverse_local_fn, h);
_exit(0);
}
close(h->fd[1]);
- talloc_set_destructor(h, traverse_destructor);
+ talloc_set_destructor(h, traverse_local_destructor);
/*
setup a packet queue between the child and the parent. This
copes with all the async and packet boundary issues
*/
- h->queue = ctdb_queue_setup(ctdb_db->ctdb, h, h->fd[0], 0, ctdb_traverse_handler, h);
+ h->queue = ctdb_queue_setup(ctdb_db->ctdb, h, h->fd[0], 0, ctdb_traverse_local_handler, h);
if (h->queue == NULL) {
talloc_free(h);
return NULL;
return h;
}
+
+
+/*
+ handle for a cluster-wide traverse started with
+ ctdb_daemon_traverse_all()
+ */
+struct ctdb_traverse_all_handle {
+ struct ctdb_context *ctdb;
+ /* id from ctdb_reqid_new(); incoming traverse data is matched on it */
+ uint32_t reqid;
+ /* called with each record received for this traverse */
+ ctdb_traverse_fn_t callback;
+ /* passed as the first argument to callback */
+ void *private_data;
+ /* number of empty (end of traverse) records received so far */
+ uint32_t null_count;
+};
+
+/*
+  talloc destructor for a traverse_all handle: gives back the reqid
+  that was allocated for the handle. Always returns 0.
+ */
+static int ctdb_traverse_all_destructor(struct ctdb_traverse_all_handle *handle)
+{
+	struct ctdb_context *ctdb = handle->ctdb;
+	uint32_t reqid = handle->reqid;
+
+	ctdb_reqid_remove(ctdb, reqid);
+
+	return 0;
+}
+
+/*
+ payload of a CTDB_CONTROL_TRAVERSE_ALL control
+ */
+struct ctdb_traverse_all {
+ /* database to traverse */
+ uint32_t db_id;
+ /* reqid of the originator's traverse handle, copied into each record sent back */
+ uint32_t reqid;
+ /* vnn of the originating node; records are sent back to it */
+ uint32_t vnn;
+};
+
+/*
+  start a cluster-wide non-blocking traverse of a ctdb database
+
+  A CTDB_CONTROL_TRAVERSE_ALL control is broadcast so that the nodes
+  send their records back to this node; the callback is then invoked
+  with the records that arrive for this traverse. To stop the
+  traverse, talloc_free() the returned handle.
+
+  The traverse is finished when the callback is called with an empty
+  key and empty data.
+
+  Returns the new handle (a talloc child of ctdb_db), or NULL if the
+  allocation or the broadcast fails.
+ */
+struct ctdb_traverse_all_handle *ctdb_daemon_traverse_all(struct ctdb_db_context *ctdb_db,
+							    ctdb_traverse_fn_t callback,
+							    void *private_data)
+{
+	struct ctdb_context *ctdb = ctdb_db->ctdb;
+	struct ctdb_traverse_all_handle *handle;
+	struct ctdb_traverse_all req;
+	TDB_DATA payload;
+
+	handle = talloc(ctdb_db, struct ctdb_traverse_all_handle);
+	if (handle == NULL) {
+		return NULL;
+	}
+
+	handle->ctdb         = ctdb;
+	handle->reqid        = ctdb_reqid_new(ctdb, handle);
+	handle->callback     = callback;
+	handle->private_data = private_data;
+	handle->null_count   = 0;
+
+	/* from here on, freeing the handle also releases its reqid */
+	talloc_set_destructor(handle, ctdb_traverse_all_destructor);
+
+	req.db_id = ctdb_db->db_id;
+	req.reqid = handle->reqid;
+	req.vnn   = ctdb->vnn;
+
+	payload.dsize = sizeof(req);
+	payload.dptr  = (uint8_t *)&req;
+
+	/* tell all the nodes in the cluster to start sending records to this node */
+	if (ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNN, 0, CTDB_CONTROL_TRAVERSE_ALL,
+				     CTDB_CTRL_FLAG_NOREPLY, payload, NULL, NULL) != 0) {
+		talloc_free(handle);
+		return NULL;
+	}
+
+	return handle;
+}
+
+/*
+ state kept by a node while it serves a CTDB_CONTROL_TRAVERSE_ALL
+ request
+ */
+struct traverse_all_state {
+ struct ctdb_context *ctdb;
+ /* the traverse of the local ltdb */
+ struct ctdb_traverse_local_handle *h;
+ /* reqid taken from the request, stamped on every record sent back */
+ uint32_t reqid;
+ /* vnn taken from the request; destination of the records */
+ uint32_t srcnode;
+};
+
+/*
+  called for each record during a traverse all
+
+  The record is marshalled with the originator's reqid and sent to
+  state->srcnode as a CTDB_CONTROL_TRAVERSE_DATA control. A record
+  with an empty key and empty data marks the end of the traverse;
+  once it has been sent the state is released.
+ */
+static void traverse_all_callback(void *p, TDB_DATA key, TDB_DATA data)
+{
+	struct traverse_all_state *state = talloc_get_type(p, struct traverse_all_state);
+	int ret;
+	struct ctdb_traverse_data *d;
+	TDB_DATA cdata;
+
+	d = ctdb_traverse_marshall_record(state, state->reqid, key, data);
+	if (d == NULL) {
+		/* darn .... */
+		DEBUG(0,("Out of memory in traverse_all_callback\n"));
+		return;
+	}
+
+	/* a separate TDB_DATA keeps key/data intact for the
+	   end-of-traverse test below */
+	cdata.dptr = (uint8_t *)d;
+	cdata.dsize = d->length;
+
+	ret = ctdb_daemon_send_control(state->ctdb, state->srcnode, 0, CTDB_CONTROL_TRAVERSE_DATA,
+				       CTDB_CTRL_FLAG_NOREPLY, cdata, NULL, NULL);
+	if (ret != 0) {
+		DEBUG(0,("Failed to send traverse data\n"));
+	}
+
+	/* the marshalled copy is only needed for the send; keeping it
+	   would leak one record per callback in the daemon */
+	talloc_free(d);
+
+	if (key.dsize == 0 && data.dsize == 0) {
+		/* end of traverse - nothing else will use this state */
+		talloc_free(state);
+	}
+}
+
+/*
+  handler for an incoming CTDB_CONTROL_TRAVERSE_ALL control
+
+  Starts a traverse of the local ltdb named in the request; each
+  record is sent back to the originator as a
+  CTDB_CONTROL_TRAVERSE_DATA control by traverse_all_callback().
+
+  Returns 0 once the local traverse has been started, -1 on a
+  malformed request, an unknown database, or a setup failure.
+ */
+int32_t ctdb_control_traverse_all(struct ctdb_context *ctdb, TDB_DATA data, TDB_DATA *outdata)
+{
+	struct ctdb_traverse_all *req;
+	struct ctdb_db_context *db;
+	struct traverse_all_state *st;
+
+	if (data.dsize != sizeof(struct ctdb_traverse_all)) {
+		DEBUG(0,("Invalid size in ctdb_control_traverse_all\n"));
+		return -1;
+	}
+	req = (struct ctdb_traverse_all *)data.dptr;
+
+	db = find_ctdb_db(ctdb, req->db_id);
+	if (db == NULL) {
+		return -1;
+	}
+
+	st = talloc(db, struct traverse_all_state);
+	if (st == NULL) {
+		return -1;
+	}
+
+	st->ctdb    = ctdb;
+	st->srcnode = req->vnn;
+	st->reqid   = req->reqid;
+
+	st->h = ctdb_traverse_local(db, traverse_all_callback, st);
+	if (st->h != NULL) {
+		return 0;
+	}
+
+	talloc_free(st);
+	return -1;
+}
+
+
+/*
+  called when a CTDB_CONTROL_TRAVERSE_DATA control comes in. We then
+  call the traverse_all callback with the record
+
+  data holds one marshalled struct ctdb_traverse_data. The handle is
+  looked up from the record's reqid. Records with an empty key and
+  empty data are counted, and only the one that brings the count up to
+  the number of nodes is passed on to the callback, after which the
+  handle is freed.
+
+  Returns 0 on success, -1 for a malformed record or when no matching
+  traverse is found.
+ */
+int32_t ctdb_control_traverse_data(struct ctdb_context *ctdb, TDB_DATA data, TDB_DATA *outdata)
+{
+	struct ctdb_traverse_data *d = (struct ctdb_traverse_data *)data.dptr;
+	const size_t hdrlen = offsetof(struct ctdb_traverse_data, data);
+	struct ctdb_traverse_all_handle *state;
+	TDB_DATA key;
+	ctdb_traverse_fn_t callback;
+	void *private_data;
+
+	/* the whole fixed header must be present before any of its
+	   fields are trusted, and the key and data lengths it carries
+	   must stay inside the received buffer */
+	if (data.dsize < hdrlen ||
+	    data.dsize != d->length ||
+	    d->keylen > data.dsize - hdrlen ||
+	    d->datalen > data.dsize - hdrlen - d->keylen) {
+		DEBUG(0,("Bad record size in ctdb_control_traverse_data\n"));
+		return -1;
+	}
+
+	state = ctdb_reqid_find(ctdb, d->reqid, struct ctdb_traverse_all_handle);
+	if (state == NULL || d->reqid != state->reqid) {
+		/* traverse might have been terminated already */
+		return -1;
+	}
+
+	key.dsize = d->keylen;
+	key.dptr = &d->data[0];
+	data.dsize = d->datalen;
+	data.dptr = &d->data[d->keylen];
+
+	if (key.dsize == 0 && data.dsize == 0) {
+		state->null_count++;
+		if (state->null_count != ctdb_get_num_nodes(ctdb)) {
+			return 0;
+		}
+	}
+
+	/* take copies first, so nothing is read from state between the
+	   callback and the final free */
+	callback = state->callback;
+	private_data = state->private_data;
+
+	callback(private_data, key, data);
+	if (key.dsize == 0 && data.dsize == 0) {
+		/* we've received all of the null replies, so all
+		   nodes are finished */
+		talloc_free(state);
+	}
+	return 0;
+}
+
+/*
+ state for a traverse started by a CTDB_CONTROL_TRAVERSE_START control
+ */
+struct traverse_start_state {
+ struct ctdb_context *ctdb;
+ /* the cluster-wide traverse feeding traverse_start_callback() */
+ struct ctdb_traverse_all_handle *h;
+ /* srcnode argument the control handler was called with */
+ uint32_t srcnode;
+ /* reqid from the request, stamped on every record dispatched */
+ uint32_t reqid;
+ /* srvid from the request; records are dispatched as messages to it */
+ uint64_t srvid;
+};
+
+/*
+  callback which sends records as messages to the client
+
+  Each record is marshalled with the client's reqid and dispatched to
+  state->srvid. A record with an empty key and empty data marks the
+  end of the traverse; once it has been dispatched the state is
+  freed, which also frees the records marshalled onto it.
+ */
+static void traverse_start_callback(void *p, TDB_DATA key, TDB_DATA data)
+{
+	struct traverse_start_state *state;
+	struct ctdb_traverse_data *d;
+	TDB_DATA cdata;
+
+	state = talloc_get_type(p, struct traverse_start_state);
+
+	d = ctdb_traverse_marshall_record(state, state->reqid, key, data);
+	if (d == NULL) {
+		return;
+	}
+
+	/* the marshalled buffer needs its own TDB_DATA: reusing 'data'
+	   would make data.dsize non-zero for every record, so the end
+	   of traverse test below could never be true */
+	cdata.dptr = (uint8_t *)d;
+	cdata.dsize = d->length;
+
+	ctdb_dispatch_message(state->ctdb, state->srvid, cdata);
+	if (key.dsize == 0 && data.dsize == 0) {
+		/* end of traverse */
+		talloc_free(state);
+	}
+}
+
+/*
+  handler for CTDB_CONTROL_TRAVERSE_START - called as a control from a
+  client to start a traverse_all
+
+  The records of the cluster-wide traverse are delivered by
+  traverse_start_callback() as messages to the srvid named in the
+  request.
+
+  Returns 0 once the traverse has been started, -1 on a malformed
+  request, an unknown database, or a setup failure.
+ */
+int32_t ctdb_control_traverse_start(struct ctdb_context *ctdb, TDB_DATA data,
+				    TDB_DATA *outdata, uint32_t srcnode)
+{
+	struct ctdb_traverse_start *req = (struct ctdb_traverse_start *)data.dptr;
+	struct ctdb_db_context *db;
+	struct traverse_start_state *st;
+
+	if (data.dsize != sizeof(*req)) {
+		DEBUG(0,("Bad record size in ctdb_control_traverse_start\n"));
+		return -1;
+	}
+
+	db = find_ctdb_db(ctdb, req->db_id);
+	if (db == NULL) {
+		return -1;
+	}
+
+	st = talloc(db, struct traverse_start_state);
+	if (st == NULL) {
+		return -1;
+	}
+
+	st->ctdb    = ctdb;
+	st->srcnode = srcnode;
+	st->srvid   = req->srvid;
+	st->reqid   = req->reqid;
+
+	st->h = ctdb_daemon_traverse_all(db, traverse_start_callback, st);
+	if (st->h != NULL) {
+		return 0;
+	}
+
+	talloc_free(st);
+	return -1;
+}