*/
void ctdb_release_lock(struct ctdb_db *ctdb_db, struct ctdb_lock *lock);
+
+
+/**
+ * ctdb_traverse_callback_t - callback for ctdb_traverse_async.
+ * return 0 - to continue traverse
+ * return 1 - to abort the traverse
+ *
+ * See Also:
+ * ctdb_traverse_async()
+ */
+#define TRAVERSE_STATUS_RECORD 0
+#define TRAVERSE_STATUS_FINISHED 1
+#define TRAVERSE_STATUS_ERROR 2
+typedef int (*ctdb_traverse_callback_t)(struct ctdb_connection *ctdb,
+ struct ctdb_db *ctdb_db,
+ int status,
+ TDB_DATA key,
+ TDB_DATA data,
+ void *private_data);
+
+/**
+ * ctdb_traverse_async - traverse a database.
+ * @ctdb_db: the database handle from ctdb_attachdb/ctdb_attachdb_recv.
+ * @callback: the callback once the record is locked (typesafe).
+ * @cbdata: the argument to callback()
+ *
+ * This returns true on success.
+ * when successfull, the callback will be invoked for each record
+ * until the traversal is finished.
+ *
+ * status ==
+ * TRAVERSE_STATUS_RECORD key/data contains a record.
+ * TRAVERSE_STATUS_FINISHED traverse is finished. key/data is undefined.
+ * TRAVERSE_STATUS_ERROR an error occured during traverse.
+ * key/data is undefined.
+ *
+ * If failure is immediate, false is returned.
+ */
+bool ctdb_traverse_async(struct ctdb_db *ctdb_db,
+ ctdb_traverse_callback_t callback, void *cbdata);
+
/**
* ctdb_message_fn_t - messaging callback for ctdb messages
*
bool ctdb_getpublicips_recv(struct ctdb_connection *ctdb,
struct ctdb_request *req, struct ctdb_all_public_ips **ips);
+
/**
* ctdb_getrecmaster_send - read the recovery master of a node
* @ctdb: the ctdb_connection from ctdb_connect.
return false;
}
}
+
+
+struct ctdb_traverse_state {
+ struct ctdb_request *handle;
+ struct ctdb_db *ctdb_db;
+ uint64_t srvid;
+
+ ctdb_traverse_callback_t callback;
+ void *cbdata;
+};
+
+static void traverse_remhnd_cb(struct ctdb_connection *ctdb,
+ struct ctdb_request *req, void *private_data)
+{
+ struct ctdb_traverse_state *state = private_data;
+
+ if (!ctdb_remove_message_handler_recv(ctdb, state->handle)) {
+ DEBUG(ctdb, LOG_ERR,
+ "Failed to remove message handler for"
+ " traverse.");
+ state->callback(state->ctdb_db->ctdb, state->ctdb_db,
+ TRAVERSE_STATUS_ERROR,
+ tdb_null, tdb_null,
+ state->cbdata);
+ }
+ ctdb_request_free(ctdb, state->handle);
+ state->handle = NULL;
+ free(state);
+}
+
+static void msg_h(struct ctdb_connection *ctdb, uint64_t srvid,
+ TDB_DATA data, void *private_data)
+{
+ struct ctdb_traverse_state *state = private_data;
+ struct ctdb_db *ctdb_db = state->ctdb_db;
+ struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
+ TDB_DATA key;
+
+ if (data.dsize < sizeof(uint32_t) ||
+ d->length != data.dsize) {
+ DEBUG(ctdb, LOG_ERR,
+ "Bad data size %u in traverse_handler",
+ (unsigned)data.dsize);
+ state->callback(state->ctdb_db->ctdb, state->ctdb_db,
+ TRAVERSE_STATUS_ERROR,
+ tdb_null, tdb_null,
+ state->cbdata);
+ state->handle = ctdb_remove_message_handler_send(
+ state->ctdb_db->ctdb, state->srvid,
+ msg_h, state,
+ traverse_remhnd_cb, state);
+ return;
+ }
+
+ key.dsize = d->keylen;
+ key.dptr = &d->data[0];
+ data.dsize = d->datalen;
+ data.dptr = &d->data[d->keylen];
+
+ if (key.dsize == 0 && data.dsize == 0) {
+ state->callback(state->ctdb_db->ctdb, state->ctdb_db,
+ TRAVERSE_STATUS_FINISHED,
+ tdb_null, tdb_null,
+ state->cbdata);
+ state->handle = ctdb_remove_message_handler_send(
+ state->ctdb_db->ctdb, state->srvid,
+ msg_h, state,
+ traverse_remhnd_cb, state);
+ return;
+ }
+
+ if (data.dsize <= sizeof(struct ctdb_ltdb_header)) {
+ /* empty records are deleted records in ctdb */
+ return;
+ }
+
+ data.dsize -= sizeof(struct ctdb_ltdb_header);
+ data.dptr += sizeof(struct ctdb_ltdb_header);
+
+ if (state->callback(ctdb, ctdb_db,
+ TRAVERSE_STATUS_RECORD,
+ key, data, state->cbdata) != 0) {
+ state->handle = ctdb_remove_message_handler_send(
+ state->ctdb_db->ctdb, state->srvid,
+ msg_h, state,
+ traverse_remhnd_cb, state);
+ return;
+ }
+}
+
+static void traverse_start_cb(struct ctdb_connection *ctdb,
+ struct ctdb_request *req, void *private_data)
+{
+ struct ctdb_traverse_state *state = private_data;
+
+ ctdb_request_free(ctdb, state->handle);
+ state->handle = NULL;
+}
+
+static void traverse_msghnd_cb(struct ctdb_connection *ctdb,
+ struct ctdb_request *req, void *private_data)
+{
+ struct ctdb_traverse_state *state = private_data;
+ struct ctdb_db *ctdb_db = state->ctdb_db;
+ struct ctdb_traverse_start t;
+
+ if (!ctdb_set_message_handler_recv(ctdb, state->handle)) {
+ DEBUG(ctdb, LOG_ERR,
+ "Failed to register message handler for"
+ " traverse.");
+ state->callback(state->ctdb_db->ctdb, state->ctdb_db,
+ TRAVERSE_STATUS_ERROR,
+ tdb_null, tdb_null,
+ state->cbdata);
+ ctdb_request_free(ctdb, state->handle);
+ state->handle = NULL;
+ free(state);
+ return;
+ }
+ ctdb_request_free(ctdb, state->handle);
+ state->handle = NULL;
+
+ t.db_id = ctdb_db->id;
+ t.srvid = state->srvid;
+ t.reqid = 0;
+
+ state->handle = new_ctdb_control_request(ctdb,
+ CTDB_CONTROL_TRAVERSE_START,
+ CTDB_CURRENT_NODE,
+ &t, sizeof(t),
+ traverse_start_cb, state);
+ if (state->handle == NULL) {
+ DEBUG(ctdb, LOG_ERR,
+ "ctdb_traverse_async:"
+ " failed to send traverse_start control");
+ state->callback(state->ctdb_db->ctdb, state->ctdb_db,
+ TRAVERSE_STATUS_ERROR,
+ tdb_null, tdb_null,
+ state->cbdata);
+ state->handle = ctdb_remove_message_handler_send(
+ state->ctdb_db->ctdb, state->srvid,
+ msg_h, state,
+ traverse_remhnd_cb, state);
+ return;
+ }
+}
+
+bool ctdb_traverse_async(struct ctdb_db *ctdb_db,
+ ctdb_traverse_callback_t callback, void *cbdata)
+{
+ struct ctdb_connection *ctdb = ctdb_db->ctdb;
+ struct ctdb_traverse_state *state;
+ static uint32_t tid = 0;
+
+ state = malloc(sizeof(struct ctdb_traverse_state));
+ if (state == NULL) {
+ DEBUG(ctdb, LOG_ERR,
+ "ctdb_traverse_async: no memory."
+ " allocate state failed");
+ return false;
+ }
+
+ tid++;
+ state->srvid = CTDB_SRVID_TRAVERSE_RANGE|tid;
+
+ state->callback = callback;
+ state->cbdata = cbdata;
+ state->ctdb_db = ctdb_db;
+
+ state->handle = ctdb_set_message_handler_send(ctdb_db->ctdb,
+ state->srvid,
+ msg_h, state,
+ traverse_msghnd_cb, state);
+ if (state->handle == NULL) {
+ DEBUG(ctdb, LOG_ERR,
+ "ctdb_traverse_async:"
+ " failed ctdb_set_message_handler_send");
+ free(state);
+ return false;
+ }
+
+ return true;
+}
registered = true;
}
+static int traverse_callback(struct ctdb_connection *ctdb_connection, struct ctdb_db *ctdb_db, int status, TDB_DATA key, TDB_DATA data, void *private_data)
+{
+ if (status == TRAVERSE_STATUS_FINISHED) {
+ printf("Traverse finished\n");
+ return 0;
+ }
+ if (status == TRAVERSE_STATUS_ERROR) {
+ printf("Traverse failed\n");
+ return 1;
+ }
+
+ printf("traverse callback status:%d\n", status);
+ printf("key: %d [%s]\n", key.dsize, key.dptr);
+ printf("data:%d [%s]\n", data.dsize, data.dptr);
+
+ return 0;
+}
+
+
int main(int argc, char *argv[])
{
struct ctdb_connection *ctdb_connection;
print_nodemap(nodemap);
ctdb_free_nodemap(nodemap);
+ printf("Traverse the test_test.tdb database\n");
+ ctdb_traverse_async(ctdb_db_context, traverse_callback, NULL);
+
for (;;) {
pfd.events = ctdb_which_events(ctdb_connection);