this allows us to have a generic signature for all callbacks instead of different types for each operation.
uint32_t pnn;
ctdb_handle *handle;
- handle = ctdb_getpnn_send(ctdb, destnode, NULL, NULL);
+ handle = ctdb_getpnn_send(ctdb, destnode);
if (handle == NULL) {
DEBUG(DEBUG_ERR, (__location__ " Failed to send getpnn control\n"));
return -1;
/* Functions are not thread safe so all function calls must be wrapped
* inside a pthread_mutex for threaded applications.
*
- * All _send() functions are guaranteed to be non-blocking and fully
+ * All *_send() functions are guaranteed to be non-blocking and fully
* asynchronous.
*
- * Avoid using the synchronous calls
+ * The return data from a _send() call can be accessed through two different
+ * mechanisms.
+ *
+ * 1, by calling *_recv() directly on the handle.
+ * This function will block until the response is received so it
+ * should be avoided.
+ * The exception is when called from in the registered callback,
+ * in this case the fucntion is guaranteed not to block.
+ *
+ * 2, Registering an async callback to be invoked when the call completes.
+ * From inside the callback you use the *_recv() function to extract the
+ * response data.
+ *
+ * After the *_recv() function returns, the handle will have been destroyed.
*/
/*
typedef void ctdb_handle;
+/*
+ * After issuing a *_send() command, you can use this function to register a
+ * a callback function to be automatically called once the call
+ * finishes.
+ *
+ * Once the callback function returns, the handle will be automatically
+ * destroyed.
+ *
+ * If using ctdb_free() to abort a call in flight, you have to take care
+ * to avoid the race condition that would exist between the callback and
+ * ctdb_free().
+ *
+ * Possible method could be :
+ * * take pthreads mutex
+ * * ctdb_set_callback(handle, NULL, NULL)
+ * * verify that the callback has not yet been called
+ * (if it has handle is no longer valid)
+ * * ctdb_free(handle)
+ * * release pthreads mutex
+ */
+typedef void (*ctdb_callback)(int32_t status, struct ctdb_context *ctdb, ctdb_handle *, void *private_data);
+
+int ctdb_set_callback(ctdb_handle *handle, ctdb_callback callback, void *private_data);
+
+
+
+
/*
* functions to attach to a database
* if the database does not exist it will be created.
/*
* functions to read the pnn number of the local node
*/
-typedef void (*ctdb_getpnn_cb)(int32_t status, int32_t pnn, void *private_data);
-
ctdb_handle *
ctdb_getpnn_send(struct ctdb_context *ctdb,
- uint32_t destnode,
- ctdb_getpnn_cb callback,
- void *private_data);
+ uint32_t destnode);
int ctdb_getpnn_recv(struct ctdb_context *ctdb,
ctdb_handle *handle,
uint32_t *pnn);
#include "system/filesys.h"
+struct ctdb_control_cb_data {
+ void *callback;
+ void *private_data;
+ uint32_t db_id;
+};
+
+
+static void
+ctdb_control_cb(struct ctdb_client_control_state *state)
+{
+ struct ctdb_control_cb_data *cb_data = state->async.private_data;
+ ctdb_callback callback = (ctdb_callback)cb_data->callback;
+
+ /* dont recurse */
+ state->async.fn = NULL;
+
+ if (state->state != CTDB_CONTROL_DONE) {
+ DEBUG(DEBUG_ERR, (__location__ " ctdb_getpnn_recv_cb failed with state:%d\n", state->state));
+ callback(-1, NULL, NULL, cb_data->private_data);
+ return;
+ }
+
+ callback(0, state->ctdb, state, cb_data->private_data);
+}
+
+
+/*
+ * This function is used to set the callback action for a handle
+ */
+int ctdb_set_callback(ctdb_handle *handle, ctdb_callback callback, void *private_data)
+{
+ struct ctdb_client_control_state *control_state = talloc_get_type(handle, struct ctdb_client_control_state);
+
+ if (control_state != NULL) {
+ struct ctdb_control_cb_data *cb_data;
+
+ if (callback == NULL) {
+ if (control_state->async.private_data != NULL) {
+ talloc_free(control_state->async.private_data);
+ control_state->async.private_data = NULL;
+ }
+ control_state->async.fn = NULL;
+
+ return 0;
+ }
+
+ cb_data = talloc(control_state, struct ctdb_control_cb_data);
+ if (cb_data == NULL) {
+ DEBUG(DEBUG_ERR, (__location__ " Failed to alloc cb_data\n"));
+ return -1;
+ }
+
+ cb_data->callback = callback;
+ cb_data->private_data = private_data;
+
+ control_state->async.fn = ctdb_control_cb;
+ control_state->async.private_data = cb_data;
+
+ return 0;
+ }
+
+ DEBUG(DEBUG_ERR, (__location__ " Unknown type of handle passed to ctdb_set_callback.\n"));
+ return -1;
+}
+
+
+
/*
this is the dummy null procedure that all databases support
*/
return 0;
}
-struct ctdb_control_cb_data {
- void *callback;
- void *private_data;
- uint32_t db_id;
-};
-
/*************************
* GET PNN of local node *
*************************/
-static void
-ctdb_getpnn_recv_cb(struct ctdb_client_control_state *state)
-{
- struct ctdb_control_cb_data *cb_data = state->async.private_data;
- ctdb_getpnn_cb callback = (ctdb_getpnn_cb)cb_data->callback;
-
- if (state->state != CTDB_CONTROL_DONE) {
- DEBUG(DEBUG_ERR, (__location__ " ctdb_getpnn_recv_cb failed with state:%d\n", state->state));
- callback(-1, 0, cb_data->private_data);
- return;
- }
-
- callback(0, state->status, cb_data->private_data);
-}
-
ctdb_handle *
ctdb_getpnn_send(struct ctdb_context *ctdb,
- uint32_t destnode,
- ctdb_getpnn_cb callback,
- void *private_data)
+ uint32_t destnode)
{
struct ctdb_client_control_state *state;
- struct ctdb_control_cb_data *cb_data;
state = ctdb_control_send(ctdb, destnode, 0,
CTDB_CONTROL_GET_PNN, 0, tdb_null,
return NULL;
}
- if (callback != NULL) {
- cb_data = talloc(state, struct ctdb_control_cb_data);
- cb_data->callback = callback;
- cb_data->private_data = private_data;
-
- state->async.fn = ctdb_getpnn_recv_cb;
- state->async.private_data = cb_data;
- }
-
return (ctdb_handle *)state;
}
{
struct ctdb_client_control_state *state = talloc_get_type(handle, struct ctdb_client_control_state);
int ret;
- int32_t res;
- ret = ctdb_control_recv(ctdb, state, state, NULL, &res, NULL);
+ ret = ctdb_control_recv(ctdb, state, state, NULL, NULL, NULL);
if (ret != 0) {
DEBUG(DEBUG_ERR,(__location__ " ctdb_getpnn_recv failed\n"));
return -1;
}
if (pnn != NULL) {
- *pnn = (uint32_t)res;
+ *pnn = (uint32_t)state->status;
}
return 0;
{
struct ctdb_client_control_state *state;
- state = ctdb_getpnn_send(ctdb, destnode, NULL, NULL);
+ state = ctdb_getpnn_send(ctdb, destnode);
if (state == NULL) {
DEBUG(DEBUG_ERR,(__location__ " ctdb_getpnn_send() failed.\n"));
return -1;
}
-void pnn_cb(int32_t status, int32_t pnn, void *private_data)
+void pnn_cb(int32_t status, struct ctdb_context *ctdb, ctdb_handle *handle, void *private_data)
{
+ uint32_t pnn;
+ int ret;
+
+ if (status != 0) {
+ printf("Error reading PNN\n");
+ return;
+ }
+
+ ret = ctdb_getpnn_recv(ctdb, handle, &pnn);
+ if (ret != 0) {
+ printf("Failed to read getpnn reply\n");
+ return;
+ }
+
printf("status:%d pnn:%d\n", status, pnn);
}
int main(int argc, char *argv[])
{
- struct ctdb_context *ctdb_context;
+ struct ctdb_context *ctdb;
struct ctdb_db_context *ctdb_db_context;
ctdb_handle *handle;
struct pollfd pfd;
key.dptr = "Test Record";
key.dsize = strlen(key.dptr);
- ctdb_context = ctdb_connect("/tmp/ctdb.socket");
+ ctdb = ctdb_connect("/tmp/ctdb.socket");
- handle = ctdb_set_message_handler_send(ctdb_context, 55, NULL, msg_h, NULL);
+ handle = ctdb_set_message_handler_send(ctdb, 55, NULL, msg_h, NULL);
if (handle == NULL) {
printf("Failed to register message port\n");
exit(10);
}
- ret = ctdb_set_message_handler_recv(ctdb_context, handle);
+ ret = ctdb_set_message_handler_recv(ctdb, handle);
if (ret != 0) {
printf("Failed to receive set_message_handler reply\n");
exit(10);
msg.dptr="HelloWorld";
msg.dsize = strlen(msg.dptr);
- ret = ctdb_send_message(ctdb_context, 0, 55, msg);
+ ret = ctdb_send_message(ctdb, 0, 55, msg);
if (ret != 0) {
printf("Failed to send message. Aborting\n");
exit(10);
}
- handle = ctdb_attachdb_send(ctdb_context, "test_test.tdb", 0, 0, NULL, NULL);
+ handle = ctdb_attachdb_send(ctdb, "test_test.tdb", 0, 0, NULL, NULL);
if (handle == NULL) {
printf("Failed to send attachdb control\n");
exit(10);
}
- ret = ctdb_attachdb_recv(ctdb_context, handle, &ctdb_db_context);
+ ret = ctdb_attachdb_recv(ctdb, handle, &ctdb_db_context);
if (ret != 0 ) {
printf("Failed to attach to database\n");
exit(10);
}
- handle = ctdb_getpnn_send(ctdb_context, CTDB_CURRENT_NODE, pnn_cb, NULL);
+
+
+ handle = ctdb_getpnn_send(ctdb, CTDB_CURRENT_NODE);
if (handle == NULL) {
printf("Failed to send get_pnn control\n");
exit(10);
}
+ ret = ctdb_set_callback(handle, pnn_cb, NULL);
+ if (ret != 0) {
+ printf("Failed to set callback for getpnn\n");
+ ctdb_free(handle);
+ exit(10);
+ }
+
+
+
- handle = ctdb_readrecordlock_send(ctdb_context, ctdb_db_context, key, rrl_cb, NULL);
+ handle = ctdb_readrecordlock_send(ctdb, ctdb_db_context, key, rrl_cb, NULL);
if (handle == NULL) {
printf("Failed to send READRECORDLOCK\n");
exit(10);
}
- pfd.fd = ctdb_get_fd(ctdb_context);
+ pfd.fd = ctdb_get_fd(ctdb);
for (;;) {
- pfd.events = ctdb_which_events(ctdb_context);
+ pfd.events = ctdb_which_events(ctdb);
if (poll(&pfd, 1, -1) < 0) {
printf("Poll failed");
exit(10);
}
- ctdb_service(ctdb_context);
+ ctdb_service(ctdb);
}
return 0;