LIBCTDB: add support for traverse
authorRonnie Sahlberg <ronniesahlberg@gmail.com>
Fri, 14 Jan 2011 06:35:31 +0000 (17:35 +1100)
committerRonnie Sahlberg <ronniesahlberg@gmail.com>
Fri, 14 Jan 2011 06:35:31 +0000 (17:35 +1100)
include/ctdb.h
include/ctdb_private.h
include/ctdb_protocol.h
libctdb/ctdb.c
libctdb/tst.c

index 1c6cf353f2e6a870d614ba7ca8f20bdd97e4fef7..c95c2e1e26796fc7af934e2077678435e3109a86 100644 (file)
@@ -291,6 +291,47 @@ bool ctdb_writerecord(struct ctdb_db *ctdb_db,
  */
 void ctdb_release_lock(struct ctdb_db *ctdb_db, struct ctdb_lock *lock);
 
+
+
+/**
+ * ctdb_traverse_callback_t - callback for ctdb_traverse_async.
+ * return 0 - to continue traverse
+ * return 1 - to abort the traverse
+ *
+ * See Also:
+ *     ctdb_traverse_async()
+ */
+#define TRAVERSE_STATUS_RECORD         0
+#define TRAVERSE_STATUS_FINISHED       1
+#define TRAVERSE_STATUS_ERROR          2
+typedef int (*ctdb_traverse_callback_t)(struct ctdb_connection *ctdb,
+                                   struct ctdb_db *ctdb_db,
+                                   int status,
+                                   TDB_DATA key,
+                                   TDB_DATA data,
+                                   void *private_data);
+
+/**
+ * ctdb_traverse_async - traverse a database.
+ * @ctdb_db: the database handle from ctdb_attachdb/ctdb_attachdb_recv.
+ * @callback: the callback once the record is locked (typesafe).
+ * @cbdata: the argument to callback()
+ *
+ * This returns true on success.
+ * when successfull, the callback will be invoked for each record
+ * until the traversal is finished.
+ *
+ * status == 
+ * TRAVERSE_STATUS_RECORD         key/data contains a record.
+ * TRAVERSE_STATUS_FINISHED       traverse is finished. key/data is undefined.
+ * TRAVERSE_STATUS_ERROR          an error occured during traverse.
+ *                                key/data is undefined.
+ *
+ * If failure is immediate, false is returned.
+ */
+bool ctdb_traverse_async(struct ctdb_db *ctdb_db,
+                        ctdb_traverse_callback_t callback, void *cbdata);
+
 /**
  * ctdb_message_fn_t - messaging callback for ctdb messages
  *
@@ -480,6 +521,7 @@ ctdb_getpublicips_send(struct ctdb_connection *ctdb,
 bool ctdb_getpublicips_recv(struct ctdb_connection *ctdb,
                      struct ctdb_request *req, struct ctdb_all_public_ips **ips);
 
+
 /**
  * ctdb_getrecmaster_send - read the recovery master of a node
  * @ctdb: the ctdb_connection from ctdb_connect.
index 2d9315f6ac49bf0a79f4b2947c540bdf1ab3098c..cb2b066d2b0b1095069c5040893c272f8e7e78f1 100644 (file)
@@ -837,24 +837,6 @@ int ctdb_control_getnodemap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA
 int ctdb_control_writerecord(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata);
 
 
-struct ctdb_traverse_start {
-       uint32_t db_id;
-       uint32_t reqid;
-       uint64_t srvid;
-};
-
-/*
-  structure used to pass record data between the child and parent
- */
-struct ctdb_rec_data {
-       uint32_t length;
-       uint32_t reqid;
-       uint32_t keylen;
-       uint32_t datalen;
-       uint8_t  data[1];
-};
-                                  
-
 /* structure used for pulldb control */
 struct ctdb_control_pulldb {
        uint32_t db_id;
index d297af42c316362b71f12af6bcff55a466f5d82f..b6b753c3c7ed3488b2b1365deb67c6e6ee2d6450 100644 (file)
@@ -161,6 +161,9 @@ struct ctdb_call_info {
  */
 #define CTDB_SRVID_TEST_RANGE  0xFE03000000000000LL
 
+/* Range of ports reserved for traversals */
+#define CTDB_SRVID_TRAVERSE_RANGE  0xFE04000000000000LL
+
 /* used on the domain socket, send a pdu to the local daemon */
 #define CTDB_CURRENT_NODE     0xF0000001
 /* send a broadcast to all nodes in the cluster, active or not */
@@ -545,6 +548,23 @@ struct latency_counter {
        double total;
 };
 
+/*
+  structure used to pass record data between the child and parent
+ */
+struct ctdb_rec_data {
+       uint32_t length;
+       uint32_t reqid;
+       uint32_t keylen;
+       uint32_t datalen;
+       uint8_t  data[1];
+};
+
+struct ctdb_traverse_start {
+       uint32_t db_id;
+       uint32_t reqid;
+       uint64_t srvid;
+};
+
 /*
   ctdb statistics information
  */
index e06c66ca85497d3b1904ea89d77a31745721b1bd..7115982d1a119651a2d270b06426e80c635a29dd 100644 (file)
@@ -936,3 +936,186 @@ bool ctdb_writerecord(struct ctdb_db *ctdb_db,
                return false;
        }
 }
+
+
+struct ctdb_traverse_state {
+       struct ctdb_request *handle;
+       struct ctdb_db *ctdb_db;
+       uint64_t srvid;
+
+       ctdb_traverse_callback_t callback;
+       void *cbdata;
+};
+
+static void traverse_remhnd_cb(struct ctdb_connection *ctdb,
+                        struct ctdb_request *req, void *private_data)
+{
+       struct ctdb_traverse_state *state = private_data;
+
+       if (!ctdb_remove_message_handler_recv(ctdb, state->handle)) {
+               DEBUG(ctdb, LOG_ERR,
+                               "Failed to remove message handler for"
+                               " traverse.");
+               state->callback(state->ctdb_db->ctdb, state->ctdb_db,
+                               TRAVERSE_STATUS_ERROR,
+                               tdb_null, tdb_null,
+                               state->cbdata);
+       }
+       ctdb_request_free(ctdb, state->handle);
+       state->handle = NULL;
+       free(state);
+}
+       
+static void msg_h(struct ctdb_connection *ctdb, uint64_t srvid,
+          TDB_DATA data, void *private_data)
+{
+       struct ctdb_traverse_state *state = private_data;
+       struct ctdb_db *ctdb_db = state->ctdb_db;
+       struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
+       TDB_DATA key;
+
+       if (data.dsize < sizeof(uint32_t) ||
+           d->length != data.dsize) {
+               DEBUG(ctdb, LOG_ERR,
+                       "Bad data size %u in traverse_handler",
+                       (unsigned)data.dsize);
+               state->callback(state->ctdb_db->ctdb, state->ctdb_db,
+                               TRAVERSE_STATUS_ERROR,
+                               tdb_null, tdb_null,
+                               state->cbdata);
+               state->handle = ctdb_remove_message_handler_send(
+                               state->ctdb_db->ctdb, state->srvid,
+                               msg_h, state,
+                               traverse_remhnd_cb, state);
+               return;
+       }
+
+       key.dsize = d->keylen;
+       key.dptr  = &d->data[0];
+       data.dsize = d->datalen;
+       data.dptr = &d->data[d->keylen];
+
+       if (key.dsize == 0 && data.dsize == 0) {
+               state->callback(state->ctdb_db->ctdb, state->ctdb_db,
+                               TRAVERSE_STATUS_FINISHED,
+                               tdb_null, tdb_null,
+                               state->cbdata);
+               state->handle = ctdb_remove_message_handler_send(
+                               state->ctdb_db->ctdb, state->srvid,
+                               msg_h, state,
+                               traverse_remhnd_cb, state);
+               return;
+       }
+
+       if (data.dsize <= sizeof(struct ctdb_ltdb_header)) {
+               /* empty records are deleted records in ctdb */
+               return;
+       }
+
+       data.dsize -= sizeof(struct ctdb_ltdb_header);
+       data.dptr  += sizeof(struct ctdb_ltdb_header);
+
+       if (state->callback(ctdb, ctdb_db,
+                       TRAVERSE_STATUS_RECORD,
+                       key, data, state->cbdata) != 0) {
+               state->handle = ctdb_remove_message_handler_send(
+                               state->ctdb_db->ctdb, state->srvid,
+                               msg_h, state,
+                               traverse_remhnd_cb, state);
+               return;
+       }
+}
+
+static void traverse_start_cb(struct ctdb_connection *ctdb,
+                        struct ctdb_request *req, void *private_data)
+{
+       struct ctdb_traverse_state *state = private_data;
+
+        ctdb_request_free(ctdb, state->handle);
+       state->handle = NULL;
+}
+
+static void traverse_msghnd_cb(struct ctdb_connection *ctdb,
+                        struct ctdb_request *req, void *private_data)
+{
+       struct ctdb_traverse_state *state = private_data;
+       struct ctdb_db *ctdb_db = state->ctdb_db;
+       struct ctdb_traverse_start t;
+
+       if (!ctdb_set_message_handler_recv(ctdb, state->handle)) {
+               DEBUG(ctdb, LOG_ERR,
+                               "Failed to register message handler for"
+                               " traverse.");
+               state->callback(state->ctdb_db->ctdb, state->ctdb_db,
+                               TRAVERSE_STATUS_ERROR,
+                               tdb_null, tdb_null,
+                               state->cbdata);
+               ctdb_request_free(ctdb, state->handle);
+               state->handle = NULL;
+               free(state);
+               return;
+        }
+        ctdb_request_free(ctdb, state->handle);
+       state->handle = NULL;
+
+       t.db_id = ctdb_db->id;
+       t.srvid = state->srvid;
+       t.reqid = 0;
+
+       state->handle = new_ctdb_control_request(ctdb,
+                               CTDB_CONTROL_TRAVERSE_START,
+                               CTDB_CURRENT_NODE,
+                               &t, sizeof(t),
+                               traverse_start_cb, state);
+       if (state->handle == NULL) {
+               DEBUG(ctdb, LOG_ERR,
+                               "ctdb_traverse_async:"
+                               " failed to send traverse_start control");
+               state->callback(state->ctdb_db->ctdb, state->ctdb_db,
+                               TRAVERSE_STATUS_ERROR,
+                               tdb_null, tdb_null,
+                               state->cbdata);
+               state->handle = ctdb_remove_message_handler_send(
+                               state->ctdb_db->ctdb, state->srvid,
+                               msg_h, state,
+                               traverse_remhnd_cb, state);
+               return;
+       }
+}
+
+bool ctdb_traverse_async(struct ctdb_db *ctdb_db,
+                        ctdb_traverse_callback_t callback, void *cbdata)
+{
+       struct ctdb_connection *ctdb = ctdb_db->ctdb;
+       struct ctdb_traverse_state *state;
+       static uint32_t tid = 0;
+
+       state = malloc(sizeof(struct ctdb_traverse_state));
+       if (state == NULL) {
+               DEBUG(ctdb, LOG_ERR,
+                               "ctdb_traverse_async: no memory."
+                               " allocate state failed");
+               return false;
+       }
+
+       tid++;
+       state->srvid = CTDB_SRVID_TRAVERSE_RANGE|tid;
+
+       state->callback = callback;
+       state->cbdata   = cbdata;
+       state->ctdb_db  = ctdb_db;
+
+       state->handle = ctdb_set_message_handler_send(ctdb_db->ctdb,
+                               state->srvid,
+                               msg_h, state,
+                               traverse_msghnd_cb, state);
+       if (state->handle == NULL) {
+               DEBUG(ctdb, LOG_ERR,
+                       "ctdb_traverse_async:"
+                       " failed ctdb_set_message_handler_send");
+               free(state);
+               return false;
+       }
+
+       return true;
+}
index e61561fc77c4ac2e79b38ffe7d1f0bd3496eedee..0e3531d039a568b3ce20a73256441cea3326893d 100644 (file)
@@ -223,6 +223,25 @@ void message_handler_cb(struct ctdb_connection *ctdb,
        registered = true;
 }
 
+static int traverse_callback(struct ctdb_connection *ctdb_connection, struct ctdb_db *ctdb_db, int status, TDB_DATA key, TDB_DATA data, void *private_data)
+{
+       if (status == TRAVERSE_STATUS_FINISHED) {
+               printf("Traverse finished\n");
+               return 0;
+       }
+       if (status == TRAVERSE_STATUS_ERROR) {
+               printf("Traverse failed\n");
+               return 1;
+       }
+
+       printf("traverse callback   status:%d\n", status);
+       printf("key: %d [%s]\n", key.dsize, key.dptr);
+       printf("data:%d [%s]\n", data.dsize, data.dptr);
+
+       return 0;
+}
+
+
 int main(int argc, char *argv[])
 {
        struct ctdb_connection *ctdb_connection;
@@ -366,6 +385,9 @@ int main(int argc, char *argv[])
        print_nodemap(nodemap);
        ctdb_free_nodemap(nodemap);
 
+       printf("Traverse the test_test.tdb database\n");
+       ctdb_traverse_async(ctdb_db_context, traverse_callback, NULL);
+
        for (;;) {
 
          pfd.events = ctdb_which_events(ctdb_connection);