libctdb: uniform callbacks, _recv functions to pull out data.
authorRusty Russell <rusty@rustcorp.com.au>
Mon, 24 May 2010 03:47:36 +0000 (13:17 +0930)
committerRusty Russell <rusty@rustcorp.com.au>
Mon, 24 May 2010 03:47:36 +0000 (13:17 +0930)
This is a bit tricky for those cases where we need to do multiple or
zero I/Os (eg. attachdb and readrecordlock), but works well for the
simple cases.

include/ccan/typesafe_cb.h [new file with mode: 0644]
include/ctdb.h
libctdb/control.c
libctdb/ctdb.c
libctdb/io_elem.c
libctdb/io_elem.h
libctdb/libctdb_private.h
libctdb/messages.c
libctdb/sync.c
libctdb/tst.c

diff --git a/include/ccan/typesafe_cb.h b/include/ccan/typesafe_cb.h
new file mode 100644 (file)
index 0000000..b1f2c5f
--- /dev/null
@@ -0,0 +1,177 @@
+#ifndef CCAN_CAST_IF_TYPE_H
+#define CCAN_CAST_IF_TYPE_H
+
+#if (__GNUC__ >= 3)
+#define HAVE_TYPEOF 1
+#define HAVE_BUILTIN_CHOOSE_EXPR 1
+#define HAVE_BUILTIN_TYPES_COMPATIBLE_P 1
+#endif
+
+#if HAVE_TYPEOF && HAVE_BUILTIN_CHOOSE_EXPR && HAVE_BUILTIN_TYPES_COMPATIBLE_P
+/**
+ * cast_if_type - only cast an expression if test matches a given type
+ * @desttype: the type to cast to
+ * @expr: the expression to cast
+ * @test: the expression to test
+ * @oktype: the type we allow
+ *
+ * This macro is used to create functions which allow multiple types.
+ * The result of this macro is used somewhere that a @desttype type is
+ * expected: if @expr was of type @oktype, it will be cast to
+ * @desttype type.  As a result, if @expr is any type other than
+ * @oktype or @desttype, a compiler warning will be issued.
+ *
+ * This macro can be used in static initializers.
+ *
+ * This is merely useful for warnings: if the compiler does not
+ * support the primitives required for cast_if_type(), it becomes an
+ * unconditional cast, and the @test and @oktype argument is not used.  In
+ * particular, this means that @oktype can be a type which uses
+ * the "typeof": it will not be evaluated if typeof is not supported.
+ *
+ * Example:
+ *     // We can take either an unsigned long or a void *.
+ *     void _set_some_value(void *val);
+ *     #define set_some_value(e)                       \
+ *             _set_some_value(cast_if_type(void *, (e), (e), unsigned long))
+ */
+#define cast_if_type(desttype, expr, test, oktype)                     \
+__builtin_choose_expr(__builtin_types_compatible_p(typeof(1?(test):0), oktype), \
+                       (desttype)(expr), (expr))
+#else
+#define cast_if_type(desttype, expr, test, oktype) ((desttype)(expr))
+#endif
+
+/**
+ * cast_if_any - only cast an expression if it is one of the three given types
+ * @desttype: the type to cast to
+ * @expr: the expression to cast
+ * @test: the expression to test
+ * @ok1: the first type we allow
+ * @ok2: the second type we allow
+ * @ok3: the third type we allow
+ *
+ * This is a convenient wrapper for multiple cast_if_type() calls.  You can
+ * chain them inside each other (ie. use cast_if_any() for expr) if you need
+ * more than 3 arguments.
+ *
+ * Example:
+ *     // We can take either a long, unsigned long, void * or a const void *.
+ *     void _set_some_value(void *val);
+ *     #define set_some_value(expr)                                    \
+ *             _set_some_value(cast_if_any(void *, (expr), (expr),     \
+ *                                         long, unsigned long, const void *))
+ */
+#define cast_if_any(desttype, expr, test, ok1, ok2, ok3)               \
+       cast_if_type(desttype,                                          \
+                    cast_if_type(desttype,                             \
+                                 cast_if_type(desttype, (expr), (test), ok1), \
+                                 ok2),                                 \
+                    ok3)
+
+/**
+ * typesafe_cb - cast a callback function if it matches the arg
+ * @rtype: the return type of the callback function
+ * @fn: the callback function to cast
+ * @arg: the (pointer) argument to hand to the callback function.
+ *
+ * If a callback function takes a single argument, this macro does
+ * appropriate casts to a function which takes a single void * argument if the
+ * callback provided matches the @arg (or a const or volatile version).
+ *
+ * It is assumed that @arg is of pointer type: usually @arg is passed
+ * or assigned to a void * elsewhere anyway.
+ *
+ * Example:
+ *     void _register_callback(void (*fn)(void *arg), void *arg);
+ *     #define register_callback(fn, arg) \
+ *             _register_callback(typesafe_cb(void, (fn), (arg)), (arg))
+ */
+#define typesafe_cb(rtype, fn, arg)                    \
+       cast_if_type(rtype (*)(void *), (fn), (fn)(arg), rtype)
+
+/**
+ * typesafe_cb_const - cast a const callback function if it matches the arg
+ * @rtype: the return type of the callback function
+ * @fn: the callback function to cast
+ * @arg: the (pointer) argument to hand to the callback function.
+ *
+ * If a callback function takes a single argument, this macro does appropriate
+ * casts to a function which takes a single const void * argument if the
+ * callback provided matches the @arg.
+ *
+ * It is assumed that @arg is of pointer type: usually @arg is passed
+ * or assigned to a void * elsewhere anyway.
+ *
+ * Example:
+ *     void _register_callback(void (*fn)(const void *arg), const void *arg);
+ *     #define register_callback(fn, arg) \
+ *             _register_callback(typesafe_cb_const(void, (fn), (arg)), (arg))
+ */
+#define typesafe_cb_const(rtype, fn, arg)                              \
+       sizeof((fn)((const void *)0)),                                  \
+               cast_if_type(rtype (*)(const void *),                   \
+                            (fn), (fn)(arg), rtype (*)(typeof(arg)))
+
+/**
+ * typesafe_cb_preargs - cast a callback function if it matches the arg
+ * @rtype: the return type of the callback function
+ * @fn: the callback function to cast
+ * @arg: the (pointer) argument to hand to the callback function.
+ *
+ * This is a version of typesafe_cb() for callbacks that take other arguments
+ * before the @arg.
+ *
+ * Example:
+ *     void _register_callback(void (*fn)(int, void *arg), void *arg);
+ *     #define register_callback(fn, arg) \
+ *             _register_callback(typesafe_cb_preargs(void, (fn), (arg), int),\
+ *                                (arg))
+ */
+#define typesafe_cb_preargs(rtype, fn, arg, ...)                       \
+       cast_if_type(rtype (*)(__VA_ARGS__, void *), (fn), (fn),        \
+                    rtype (*)(__VA_ARGS__, typeof(arg)))
+/**
+ * typesafe_cb_postargs - cast a callback function if it matches the arg
+ * @rtype: the return type of the callback function
+ * @fn: the callback function to cast
+ * @arg: the (pointer) argument to hand to the callback function.
+ *
+ * This is a version of typesafe_cb() for callbacks that take other arguments
+ * after the @arg.
+ *
+ * Example:
+ *     void _register_callback(void (*fn)(void *arg, int), void *arg);
+ *     #define register_callback(fn, arg) \
+ *             _register_callback(typesafe_cb_postargs(void, (fn), (arg), int),\
+ *                                (arg))
+ */
+#define typesafe_cb_postargs(rtype, fn, arg, ...)                      \
+       cast_if_type(rtype (*)(void *, __VA_ARGS__), (fn), (fn),        \
+                    rtype (*)(typeof(arg), __VA_ARGS__))
+/**
+ * typesafe_cb_cmp - cast a compare function if it matches the arg
+ * @rtype: the return type of the callback function
+ * @fn: the callback function to cast
+ * @arg: the (pointer) argument(s) to hand to the compare function.
+ *
+ * If a callback function takes two matching-type arguments, this macro does
+ * appropriate casts to a function which takes two const void * arguments if
+ * the callback provided takes two a const pointers to @arg.
+ *
+ * It is assumed that @arg is of pointer type: usually @arg is passed
+ * or assigned to a void * elsewhere anyway.  Note also that the type
+ * arg points to must be defined.
+ *
+ * Example:
+ *     void _my_qsort(void *base, size_t nmemb, size_t size,
+ *                    int (*cmp)(const void *, const void *));
+ *     #define my_qsort(base, nmemb, cmpfn) \
+ *             _my_qsort((base), (nmemb), sizeof(*(base)), \
+ *                       typesafe_cb_cmp(int, (cmpfn), (base)), (arg))
+ */
+#define typesafe_cb_cmp(rtype, cmpfn, arg)                             \
+       cast_if_type(rtype (*)(const void *, const void *), (cmpfn),    \
+                    rtype (*)(const typeof(*arg)*, const typeof(*arg)*))
+                    
+#endif /* CCAN_CAST_IF_TYPE_H */
index 8ec5bfa799e5bd4b3128d92d64af01f6e907f4f9..9aabc4e25fd7a29445565e289fe2d525f4fa4948 100644 (file)
@@ -41,6 +41,19 @@ int ctdb_which_events(struct ctdb_connection *ctdb);
 
 int ctdb_service(struct ctdb_connection *ctdb, int revents);
 
+struct ctdb_request;
+
+void ctdb_request_free(struct ctdb_request *req);
+
+/*
+ * Callback for completed requests: it would normally unpack the request
+ * using ctdb_*_recv().  You must free the request using ctdb_request_free().
+ *
+ * Note that due to macro magic, your callback doesn't have to take void *,
+ * it can take a type which matches the actual private parameter.
+ */
+typedef void (*ctdb_callback_t)(struct ctdb_connection *ctdb,
+                               struct ctdb_request *req, void *private);
 
 /*
  * Special node addresses :
@@ -55,8 +68,6 @@ int ctdb_service(struct ctdb_connection *ctdb, int revents);
 #define CTDB_BROADCAST_CONNECTED 0xF0000004
 
 
-struct ctdb_request;
-
 /*
  * functions to attach to a database
  * if the database does not exist it will be created.
@@ -65,13 +76,13 @@ struct ctdb_request;
  */
 struct ctdb_db;
 
-typedef void (*ctdb_attachdb_cb)(int status, struct ctdb_db *ctdb_db, void *private_data);
-
 struct ctdb_request *
 ctdb_attachdb_send(struct ctdb_connection *ctdb,
                   const char *name, int persistent, uint32_t tdb_flags,
-                  ctdb_attachdb_cb callback,
-                  void *private_data);
+                  ctdb_callback_t callback, void *private_data);
+
+struct ctdb_db *ctdb_attachdb_recv(struct ctdb_request *req);
+
 struct ctdb_db *ctdb_attachdb(struct ctdb_connection *ctdb,
                              const char *name, int persistent,
                              uint32_t tdb_flags);
@@ -87,22 +98,16 @@ struct ctdb_lock;
  * When the lock is released, data is freed too, so make sure to copy the data
  * before that.
  */
-typedef void (*ctdb_readrecordlock_cb)(int status, struct ctdb_lock *lock, TDB_DATA data, void *private_data);
-
 struct ctdb_request *
-ctdb_readrecordlock_send(struct ctdb_db *ctdb_db,
-               TDB_DATA key,
-               ctdb_readrecordlock_cb callback,
-               void *private_data);
-int ctdb_readrecordlock_recv(struct ctdb_connection *ctdb,
-               struct ctdb_request *handle,
-               TDB_DATA **data);
-int ctdb_readrecordlock(struct ctdb_connection *ctdb,
-               struct ctdb_db *ctdb_db,
-               TDB_DATA key,
-               TDB_DATA **data);
-
+ctdb_readrecordlock_send(struct ctdb_db *ctdb_db, TDB_DATA key,
+                        ctdb_callback_t callback, void *private_data);
+struct ctdb_lock *ctdb_readrecordlock_recv(struct ctdb_db *ctdb_db,
+                                          struct ctdb_request *handle,
+                                          TDB_DATA *data);
 
+/* Returns null on failure. */
+struct ctdb_lock *ctdb_readrecordlock(struct ctdb_db *ctdb_db, TDB_DATA key,
+                                     TDB_DATA *data);
 
 /*
  * Function to write data to a record
@@ -124,15 +129,11 @@ void ctdb_release_lock(struct ctdb_lock *lock);
  */
 typedef void (*ctdb_message_fn_t)(struct ctdb_connection *, uint64_t srvid, TDB_DATA data, void *);
 
-/*
- * register a message handler and start listening on a service port
- */
-typedef void (*ctdb_set_message_handler_cb)(int status, void *private_data);
-
 struct ctdb_request *
 ctdb_set_message_handler_send(struct ctdb_connection *ctdb, uint64_t srvid,
-                             ctdb_set_message_handler_cb callback,
-                             ctdb_message_fn_t handler, void *private_data);
+                             ctdb_message_fn_t handler,
+                             ctdb_callback_t callback,
+                             void *private_data);
 
 int ctdb_set_message_handler_recv(struct ctdb_connection *ctdb,
                                  struct ctdb_request *handle);
@@ -145,15 +146,12 @@ int ctdb_set_message_handler(struct ctdb_connection *ctdb, uint64_t srvid,
 /*
  * unregister a message handler and stop listening on teh specified port
  */
-typedef void (*ctdb_remove_message_handler_cb)(int status, void *private_data);
-
 struct ctdb_request *
 ctdb_remove_message_handler_send(struct ctdb_connection *ctdb, uint64_t srvid,
-                                ctdb_remove_message_handler_cb callback,
+                                ctdb_callback_t callback,
                                 void *private_data);
 
-int ctdb_remove_message_handler_recv(struct ctdb_connection *ctdb,
-                                 struct ctdb_request *handle);
+int ctdb_remove_message_handler_recv(struct ctdb_request *handle);
 
 int ctdb_remove_message_handler(struct ctdb_connection *ctdb, uint64_t srvid);
 
@@ -170,13 +168,13 @@ int ctdb_send_message(struct ctdb_connection *ctdb, uint32_t pnn, uint64_t srvid
 /*
  * functions to read the pnn number of the local node
  */
-typedef void (*ctdb_getpnn_cb)(int status, uint32_t pnn, void *private_data);
-
 struct ctdb_request *
 ctdb_getpnn_send(struct ctdb_connection *ctdb,
                 uint32_t destnode,
-                ctdb_getpnn_cb callback,
+                ctdb_callback_t callback,
                 void *private_data);
+int ctdb_getpnn_recv(struct ctdb_request *req, uint32_t *pnn);
+
 int ctdb_getpnn(struct ctdb_connection *ctdb,
                uint32_t destnode,
                uint32_t *pnn);
@@ -187,17 +185,13 @@ int ctdb_getpnn(struct ctdb_connection *ctdb,
 /*
  * functions to read the recovery master of a node
  */
-typedef void (*ctdb_getrecmaster_cb)(int status,
-                                    uint32_t recmaster, void *private_data);
-
 struct ctdb_request *
 ctdb_getrecmaster_send(struct ctdb_connection *ctdb,
                        uint32_t destnode,
-                       ctdb_getrecmaster_cb callback,
+                       ctdb_callback_t callback,
                        void *private_data);
-int ctdb_getrecmaster_recv(struct ctdb_connection *ctdb,
-                       struct ctdb_request *handle,
-                       uint32_t *recmaster);
+int ctdb_getrecmaster_recv(struct ctdb_request *handle,
+                          uint32_t *recmaster);
 int ctdb_getrecmaster(struct ctdb_connection *ctdb,
                        uint32_t destnode,
                        uint32_t *recmaster);
@@ -210,4 +204,34 @@ int ctdb_getrecmaster(struct ctdb_connection *ctdb,
  */
 int ctdb_cancel(struct ctdb_request *);
 
+
+/* These ugly macro wrappers make the callbacks typesafe. */
+#include <ccan/typesafe_cb.h>
+#define ctdb_sendcb(cb, cbdata)                                                \
+        typesafe_cb_preargs(void, (cb), (cbdata),                      \
+                            struct ctdb_connection *, struct ctdb_request *)
+
+#define ctdb_attachdb_send(ctdb, name, persistent, tdb_flags, cb, cbdata) \
+       ctdb_attachdb_send((ctdb), (name), (persistent), (tdb_flags),   \
+                          ctdb_sendcb((cb), (cbdata)), (cbdata))
+
+#define ctdb_readrecordlock_send(ctdb_db, key, cb, cbdata)             \
+       ctdb_readrecordlock_send((ctdb_db), (key),                      \
+                                ctdb_sendcb((cb), (cbdata)), (cbdata))
+
+#define ctdb_set_message_handler_send(ctdb, srvid, handler, cb, cbdata)        \
+       ctdb_set_message_handler_send((ctdb), (srvid), (handler),       \
+             ctdb_sendcb((cb), (cbdata)), (cbdata))
+
+#define ctdb_remove_message_handler_send(ctdb, srvid, cb, cbdata)      \
+       ctdb_remove_message_handler_send((ctdb), (srvid),               \
+             ctdb_sendcb((cb), (cbdata)), (cbdata))
+
+#define ctdb_getpnn_send(ctdb, destnode, cb, cbdata)                   \
+       ctdb_getpnn_send((ctdb), (destnode),                            \
+                        ctdb_sendcb((cb), (cbdata)), (cbdata))
+
+#define ctdb_getrecmaster_send(ctdb, destnode, cb, cbdata)             \
+       ctdb_getrecmaster_send((ctdb), (destnode),                      \
+                              ctdb_sendcb((cb), (cbdata)), (cbdata))
 #endif
index 84d703e81e9ab3aeb763d347358626f0a7f1f053..928729050323d1dea66471c134cdc88a6d4d3d7f 100644 (file)
 #include <ctdb_protocol.h>
 #include "libctdb_private.h"
 
+/* Remove type-safety macros. */
+#undef ctdb_getrecmaster_send
+#undef ctdb_getpnn_send
+
+int ctdb_getrecmaster_recv(struct ctdb_request *req, uint32_t *recmaster)
+{
+       struct ctdb_reply_control *reply;
+
+       reply = unpack_reply_control(req, CTDB_CONTROL_GET_RECMASTER);
+       if (!reply || reply->status == -1)
+               return -1;
+       *recmaster = reply->status;
+       return 0;
+}
+
 struct ctdb_request *ctdb_getrecmaster_send(struct ctdb_connection *ctdb,
-                                   uint32_t destnode,
-                                   ctdb_getrecmaster_cb callback,
-                                   void *private_data)
+                                           uint32_t destnode,
+                                           ctdb_callback_t callback,
+                                           void *private_data)
 {
-       struct ctdb_request *req;
-
-       req = new_ctdb_control_request(ctdb, CTDB_CONTROL_GET_RECMASTER,
-                                      destnode, NULL, 0);
-       if (!req)
-               return NULL;
-       req->callback.getrecmaster = callback;
-       req->priv_data = private_data;
-       return req;
+       return new_ctdb_control_request(ctdb, CTDB_CONTROL_GET_RECMASTER,
+                                       destnode, NULL, 0,
+                                       callback, private_data);
+}
+
+int ctdb_getpnn_recv(struct ctdb_request *req, uint32_t *pnn)
+{
+       struct ctdb_reply_control *reply;
+
+       reply = unpack_reply_control(req, CTDB_CONTROL_GET_PNN);
+       if (!reply || reply->status == -1)
+               return -1;
+       *pnn = reply->status;
+       return 0;
 }
 
-struct ctdb_request *
-ctdb_getpnn_send(struct ctdb_connection *ctdb,
-                uint32_t destnode,
-                ctdb_getpnn_cb callback,
-                void *private_data)
+struct ctdb_request *ctdb_getpnn_send(struct ctdb_connection *ctdb,
+                                     uint32_t destnode,
+                                     ctdb_callback_t callback,
+                                     void *private_data)
 {
-       struct ctdb_request *req;
-
-       req = new_ctdb_control_request(ctdb, CTDB_CONTROL_GET_PNN, destnode,
-                                      NULL, 0);
-       if (!req) {
-               return NULL;
-       }
-       req->callback.getpnn = callback;
-       req->priv_data = private_data;
-       return req;
+       return new_ctdb_control_request(ctdb, CTDB_CONTROL_GET_PNN, destnode,
+                                       NULL, 0, callback, private_data);
 }
index 85875ac40d5c7c27f63ba2ef869f2519dcb93031..5461d3c132d21323ac38ed29377987f66fb950b2 100644 (file)
 #include <dlinklist.h>
 #include <ctdb_protocol.h>
 
+/* Remove type-safety macros. */
+#undef ctdb_attachdb_send
+#undef ctdb_readrecordlock_send
+
 /* FIXME: Could be in shared util code with rest of ctdb */
 static void close_noerr(int fd)
 {
@@ -63,13 +67,13 @@ static void set_close_on_exec(int fd)
         fcntl(fd, F_SETFD, v | FD_CLOEXEC);
 }
 
-static void set_pnn(int32_t status, uint32_t pnn, void *private_data)
+static void set_pnn(struct ctdb_connection *ctdb,
+                   struct ctdb_request *req,
+                   void *unused)
 {
-       if (status != 0) {
+       if (ctdb_getpnn_recv(req, &ctdb->pnn) != 0) {
                /* FIXME: Report error. */
-               ((struct ctdb_connection *)private_data)->broken = true;
-       } else {
-               ((struct ctdb_connection *)private_data)->pnn = pnn;
+               ctdb->broken = true;
        }
 }
 
@@ -103,7 +107,7 @@ struct ctdb_connection *ctdb_connect(const char *addr)
                goto close_fail;
 
        /* Immediately queue a request to get our pnn. */
-       if (!ctdb_getpnn_send(ctdb, CTDB_CURRENT_NODE, set_pnn, ctdb))
+       if (!ctdb_getpnn_send(ctdb, CTDB_CURRENT_NODE, set_pnn, NULL))
                goto close_fail;
 
        return ctdb;
@@ -130,7 +134,8 @@ int ctdb_which_events(struct ctdb_connection *ctdb)
        return events;
 }
 
-struct ctdb_request *new_ctdb_request(size_t len)
+struct ctdb_request *new_ctdb_request(size_t len,
+                                     ctdb_callback_t cb, void *cbdata)
 {
        struct ctdb_request *req = malloc(sizeof(*req));
        if (!req)
@@ -141,105 +146,104 @@ struct ctdb_request *new_ctdb_request(size_t len)
                return NULL;
        }
        req->hdr.hdr = io_elem_data(req->io, NULL);
-       req->cancelled = false;
+       req->reply = NULL;
+       req->callback = cb;
+       req->priv_data = cbdata;
+       req->extra = NULL;
+       req->extra_destructor = NULL;
        return req;
 }
 
-static struct ctdb_request *new_immediate_request(void)
+void ctdb_request_free(struct ctdb_request *req)
 {
-       struct ctdb_request *req = malloc(sizeof(*req));
-       if (!req)
-               return NULL;
-       req->cancelled = false;
-       req->io = NULL;
-       req->hdr.hdr = NULL;
-       return req;
-}
-
-static void free_ctdb_request(struct ctdb_request *req)
-{
-       /* immediate requests don't have IO */
-       if (req->io) {
-               free_io_elem(req->io);
+       if (req->extra_destructor) {
+               req->extra_destructor(req);
+       }
+       if (req->reply) {
+               free_io_elem(req->reply);
        }
+       free_io_elem(req->io);
        free(req);
 }
 
-static void handle_call_reply(struct ctdb_connection *ctdb,
-                             struct ctdb_req_header *hdr,
-                             struct ctdb_request *i)
+/* Sanity-checking wrapper for reply.
+ * FIXME: logging! */
+static struct ctdb_reply_call *unpack_reply_call(struct ctdb_request *req,
+                                                uint32_t callid)
 {
-       struct ctdb_req_call *call = i->hdr.call;
-       struct ctdb_reply_call *reply = (struct ctdb_reply_call *)hdr;
+       size_t len;
+       struct ctdb_reply_call *inhdr = io_elem_data(req->reply, &len);
 
-       switch (call->callid) {
-       case CTDB_NULL_FUNC:
-               /* FIXME: We should let it steal the request, rathe than copy */
-               i->callback.nullfunc(reply->status, reply, i->priv_data);
-               break;
+       /* ctdbd or our error if this isn't a reply call. */
+       if (len < sizeof(*inhdr) || inhdr->hdr.operation != CTDB_REPLY_CALL) {
+               errno = EIO;
+               return NULL;
        }
+
+       /* Library user error if this isn't a reply to a call. */
+       if (req->hdr.hdr->operation != CTDB_REQ_CALL
+           || req->hdr.call->callid != callid) {
+               errno = EINVAL;
+               return NULL;
+       }
+
+       return inhdr;
 }
 
-static void handle_control_reply(struct ctdb_connection *ctdb,
-                                struct ctdb_req_header *hdr,
-                                struct ctdb_request *i)
+/* Sanity-checking wrapper for reply.
+ * FIXME: logging! */
+struct ctdb_reply_control *unpack_reply_control(struct ctdb_request *req,
+                                               enum ctdb_controls control)
 {
-       struct ctdb_req_control *control = i->hdr.control;
-       struct ctdb_reply_control *reply = (struct ctdb_reply_control *)hdr;
+       size_t len;
+       struct ctdb_reply_control *inhdr = io_elem_data(req->reply, &len);
+
+       /* Library user error if this isn't a reply to a call. */
+       if (len < sizeof(*inhdr)
+           || req->hdr.hdr->operation != CTDB_REQ_CONTROL) {
+               errno = EINVAL;
+               return NULL;
+       }
+
+       /* ... or if it was a different control from what we expected. */
+       if (req->hdr.control->opcode != control) {
+               errno = EINVAL;
+               return NULL;
+       }
 
-       switch (control->opcode) {
-       case CTDB_CONTROL_GET_RECMASTER:
-               i->callback.getrecmaster(0, reply->status, i->priv_data);
-               break;
-       case CTDB_CONTROL_GET_PNN:
-               i->callback.getpnn(0, reply->status, i->priv_data);
-               break;
-       case CTDB_CONTROL_REGISTER_SRVID:
-               i->callback.register_srvid(reply->status, i->priv_data);
-               break;
-       case CTDB_CONTROL_DB_ATTACH_PERSISTENT:
-       case CTDB_CONTROL_DB_ATTACH:
-               i->callback.attachdb(reply->status, *(uint32_t *)reply->data,
-                                    i->priv_data);
-               break;
-       case CTDB_CONTROL_GETDBPATH:
-               i->callback.getdbpath(reply->status, (char *)reply->data,
-                                     i->priv_data);
-               break;
+       /* ctdbd or our error if this isn't a reply call. */
+       if (inhdr->hdr.operation != CTDB_REPLY_CONTROL) {
+               errno = EIO;
+               return NULL;
        }
+
+       return inhdr;
 }
 
-static void handle_incoming(struct ctdb_connection *ctdb,
-                           struct ctdb_req_header *hdr,
-                           size_t len /* FIXME: use len to check packet! */)
+static void handle_incoming(struct ctdb_connection *ctdb, struct io_elem *in)
 {
+       struct ctdb_req_header *hdr;
+       size_t len;
        struct ctdb_request *i;
 
+       hdr = io_elem_data(in, &len);
+       /* FIXME: use len to check packet! */
+
        if (hdr->operation == CTDB_REQ_MESSAGE) {
                deliver_message(ctdb, hdr);
                return;
        }
 
-       if (hdr->operation != CTDB_REPLY_CALL
-           && hdr->operation != CTDB_REPLY_CONTROL) {
-               /* FIXME: report this error. */
-               return;
-       }
-
        for (i = ctdb->doneq; i; i = i->next) {
                if (i->hdr.hdr->reqid == hdr->reqid) {
-                       if (!i->cancelled) {
-                               if (hdr->operation == CTDB_REPLY_CALL)
-                                       handle_call_reply(ctdb, hdr, i);
-                               else
-                                       handle_control_reply(ctdb, hdr, i);
-                       }
                        DLIST_REMOVE(ctdb->doneq, i);
-                       free_ctdb_request(i);
+                       i->reply = in;
+                       i->callback(ctdb, i, i->priv_data);
                        return;
                }
        }
        /* FIXME: report this error. */
+       free_io_elem(in);
 }
 
 /* Remove "harmless" errors. */
@@ -266,12 +270,8 @@ int ctdb_service(struct ctdb_connection *ctdb, int revents)
                        if (io_elem_finished(ctdb->outq->io)) {
                                struct ctdb_request *done = ctdb->outq;
                                DLIST_REMOVE(ctdb->outq, done);
-                               if (done->cancelled) {
-                                       free_ctdb_request(done);
-                               } else {
-                                       DLIST_ADD_END(ctdb->doneq, done,
-                                                     struct ctdb_request);
-                               }
+                               DLIST_ADD_END(ctdb->doneq, done,
+                                             struct ctdb_request);
                        }
                }
        }
@@ -298,22 +298,15 @@ int ctdb_service(struct ctdb_connection *ctdb, int revents)
                        /* No progress, stop loop. */
                        revents = 0;
                } else if (io_elem_finished(ctdb->in)) {
-                       struct ctdb_req_header *hdr;
-                       size_t len;
-
-                       hdr = io_elem_data(ctdb->in, &len);
-                       handle_incoming(ctdb, hdr, len);
-                       free_io_elem(ctdb->in);
+                       handle_incoming(ctdb, ctdb->in);
                        ctdb->in = NULL;
                }
        }
 
        while (ctdb->immediateq) {
                struct ctdb_request *imm = ctdb->immediateq;
-               /* This has to handle fake->cancelled internally. */
-               imm->callback.immediate(imm, imm->priv_data);
+               imm->callback(ctdb, imm, imm->priv_data);
                DLIST_REMOVE(ctdb->immediateq, imm);
-               free_ctdb_request(imm);
        }
 
        return 0;
@@ -349,12 +342,14 @@ struct ctdb_request *new_ctdb_control_request(struct ctdb_connection *ctdb,
                                              uint32_t opcode,
                                              uint32_t destnode,
                                              const void *extra_data,
-                                             size_t extra)
+                                             size_t extra,
+                                             ctdb_callback_t callback,
+                                             void *cbdata)
 {
        struct ctdb_request *req;
        struct ctdb_req_control *pkt;
 
-       req = new_ctdb_request(sizeof(*pkt) + extra);
+       req = new_ctdb_request(sizeof(*pkt) + extra, callback, cbdata);
        if (!req)
                return NULL;
 
@@ -372,10 +367,17 @@ struct ctdb_request *new_ctdb_control_request(struct ctdb_connection *ctdb,
        return req;
 }
 
+void ctdb_cancel_callback(struct ctdb_connection *ctdb,
+                         struct ctdb_request *req,
+                         void *unused)
+{
+       ctdb_request_free(req);
+}
+
 int ctdb_cancel(struct ctdb_request *req)
 {
        /* FIXME: If it's not sent, we could just free it right now. */
-       req->cancelled = true;
+       req->callback = ctdb_cancel_callback;
        return 0;
 }
 
@@ -386,71 +388,107 @@ struct ctdb_db {
        uint32_t id;
        struct tdb_context *tdb;
 
-       ctdb_attachdb_cb callback;
+       ctdb_callback_t callback;
        void *private_data;
 };
 
-static void attachdb_getdbpath_done(int status, const char *path,
+static void attachdb_getdbpath_done(struct ctdb_connection *ctdb,
+                                   struct ctdb_request *req,
                                    void *_db)
 {
        struct ctdb_db *db = _db;
+
+       /* Do callback on original request. */
+       db->callback(ctdb, req->extra, db->private_data);
+}
+
+struct ctdb_db *ctdb_attachdb_recv(struct ctdb_request *req)
+{
+       struct ctdb_request *dbpath_req = req->extra;
+       struct ctdb_reply_control *reply;
+       struct ctdb_db *db = req->priv_data;
        uint32_t tdb_flags = db->tdb_flags;
 
-       if (status != 0) {
-               db->callback(status, NULL, db->private_data);
-               free(db);
-               return;
+       /* Never sent the dbpath request?  We've failed. */
+       if (!dbpath_req) {
+               /* FIXME: Save errno? */
+               errno = EINVAL;
+               return NULL;
+       }
+
+       reply = unpack_reply_control(dbpath_req, CTDB_CONTROL_GETDBPATH);
+       if (!reply || reply->status != 0) {
+               return NULL;
        }
 
        tdb_flags = db->persistent ? TDB_DEFAULT : TDB_NOSYNC;
        tdb_flags |= TDB_DISALLOW_NESTING;
 
-       db->tdb = tdb_open(path, 0, tdb_flags, O_RDWR, 0);
+       db->tdb = tdb_open((char *)reply->data, 0, tdb_flags, O_RDWR, 0);
        if (db->tdb == NULL) {
-               db->callback(-1, NULL, db->private_data);
-               free(db);
-               return;
+               return NULL;
        }
 
-       /* Finally, we tell the client that we opened the db. */
-       db->callback(status, db, db->private_data);
+       /* Finally, separate the db from the request (see destroy_req_db). */
+       req->priv_data = NULL;
+       return db;
 }
 
-static void attachdb_done(int status, uint32_t id, struct ctdb_db *db)
+static void attachdb_done(struct ctdb_connection *ctdb,
+                         struct ctdb_request *req,
+                         void *_db)
 {
-       struct ctdb_request *req;
+       struct ctdb_db *db = _db;
+       struct ctdb_request *req2;
+       struct ctdb_reply_control *reply;
+       enum ctdb_controls control = CTDB_CONTROL_DB_ATTACH;
 
-       if (status != 0) {
-               db->callback(status, NULL, db->private_data);
-               free(db);
+       if (db->persistent) {
+               control = CTDB_CONTROL_DB_ATTACH_PERSISTENT;
+       }
+
+       reply = unpack_reply_control(req, control);
+       if (!reply || reply->status != 0) {
+               /* We failed.  Hand request to user and have them discover it
+                * via ctdb_attachdb_recv. */
+               db->callback(ctdb, req, db);
                return;
        }
-       db->id = id;
+       db->id = *(uint32_t *)reply->data;
 
        /* Now we do another call, to get the dbpath. */
-       req = new_ctdb_control_request(db->ctdb, CTDB_CONTROL_GETDBPATH,
-                                      CTDB_CURRENT_NODE, &id, sizeof(id));
-       if (!req) {
-               db->callback(-1, NULL, db->private_data);
-               free(db);
+       req2 = new_ctdb_control_request(db->ctdb, CTDB_CONTROL_GETDBPATH,
+                                       CTDB_CURRENT_NODE,
+                                       &db->id, sizeof(db->id),
+                                       attachdb_getdbpath_done, db);
+       if (!req2) {
+               db->callback(ctdb, req, db);
                return;
        }
-       req->callback.getdbpath = attachdb_getdbpath_done;
-       req->priv_data = db;
+       req->extra = req2;
+       req2->extra = req;
+}
+
+static void destroy_req_db(struct ctdb_request *req)
+{
+       /* Incomplete db is in priv_data. */
+       free(req->priv_data);
+       /* second request is chained off this one. */
+       if (req->extra) {
+               ctdb_request_free(req->extra);
+       }
 }
 
 struct ctdb_request *
 ctdb_attachdb_send(struct ctdb_connection *ctdb,
                   const char *name, int persistent, uint32_t tdb_flags,
-                  ctdb_attachdb_cb callback,
-                  void *private_data)
+                  ctdb_callback_t callback, void *private_data)
 {
        struct ctdb_request *req;
        struct ctdb_db *db;
        uint32_t opcode;
 
        /* FIXME: Search if db already open. */
-
        db = malloc(sizeof(*db));
        if (!db) {
                return NULL;
@@ -463,7 +501,7 @@ ctdb_attachdb_send(struct ctdb_connection *ctdb,
        }
 
        req = new_ctdb_control_request(ctdb, opcode, CTDB_CURRENT_NODE, name,
-                                      strlen(name) + 1);
+                                      strlen(name) + 1, attachdb_done, db);
        if (!req) {
                free(db);
                return NULL;
@@ -475,8 +513,9 @@ ctdb_attachdb_send(struct ctdb_connection *ctdb,
        db->callback = callback;
        db->private_data = private_data;
 
-       req->callback.attachdb = attachdb_done;
-       req->priv_data = db;
+       req->extra_destructor = destroy_req_db;
+       /* This is set non-NULL when we succeed, see ctdb_attachdb_recv */
+       req->extra = NULL;
 
        /* Flags get overloaded into srvid. */
        req->hdr.control->srvid = tdb_flags;
@@ -486,12 +525,14 @@ ctdb_attachdb_send(struct ctdb_connection *ctdb,
 struct ctdb_lock {
        struct ctdb_db *ctdb_db;
        TDB_DATA key;
+
+       /* This will always be true by the time user sees this. */
+       bool held;
        struct ctdb_ltdb_header *hdr;
        TDB_DATA data;
-       bool held;
-       /* For convenience, we stash this here. */
-       ctdb_readrecordlock_cb callback;
-       void *private_data;
+
+       /* For convenience, we stash original callback here. */
+       ctdb_callback_t callback;
 };
 
 void ctdb_release_lock(struct ctdb_lock *lock)
@@ -499,136 +540,131 @@ void ctdb_release_lock(struct ctdb_lock *lock)
        if (lock->held) {
                tdb_chainunlock(lock->ctdb_db->tdb, lock->key);
        }
-       free(lock->key.dptr);
-       free(lock->hdr); /* Also frees lock->data */
+       free(lock->hdr); /* Also frees data */
        free(lock);
 }
 
 /* We keep the lock if local node is the dmaster. */
 static bool try_readrecordlock(struct ctdb_lock *lock)
 {
+       struct ctdb_ltdb_header *hdr;
+
        if (tdb_chainlock(lock->ctdb_db->tdb, lock->key) != 0) {
-               return false;
+               return NULL;
        }
 
-       lock->hdr = ctdb_local_fetch(lock->ctdb_db->tdb,
-                                    lock->key, &lock->data);
-       if (lock->hdr && lock->hdr->dmaster == lock->ctdb_db->ctdb->pnn) {
+       hdr = ctdb_local_fetch(lock->ctdb_db->tdb, lock->key, &lock->data);
+       if (hdr && hdr->dmaster == lock->ctdb_db->ctdb->pnn) {
                lock->held = true;
+               lock->hdr = hdr;
                return true;
        }
 
        tdb_chainunlock(lock->ctdb_db->tdb, lock->key);
-       free(lock->hdr);
-       return false;
+       free(hdr);
+       return NULL;
 }
 
-static void readrecordlock_done(int, struct ctdb_reply_call *, void *);
+/* If they cancel *before* we hand them the lock from
+ * ctdb_readrecordlock_recv, we free it here. */
+static void destroy_lock(struct ctdb_request *req)
+{
+       ctdb_release_lock(req->extra);
+}
 
-static struct ctdb_request *new_readrecordlock_request(struct ctdb_lock *lock)
+struct ctdb_lock *ctdb_readrecordlock_recv(struct ctdb_db *ctdb_db,
+                                          struct ctdb_request *req,
+                                          TDB_DATA *data)
 {
-       struct ctdb_request *req;
-       struct ctdb_req_call *pkt;
+       struct ctdb_lock *lock = req->extra;
 
-       req = new_ctdb_request(sizeof(*pkt) + lock->key.dsize);
-       if (!req)
+       if (!lock->held) {
+               /* Something went wrong. */
                return NULL;
-       req->callback.nullfunc = readrecordlock_done;
-       req->priv_data = lock;
+       }
 
-       io_elem_init_req_header(req->io, CTDB_REQ_CALL, CTDB_CURRENT_NODE,
-                               new_reqid(lock->ctdb_db->ctdb));
-
-       pkt = req->hdr.call;
-       pkt->flags = CTDB_IMMEDIATE_MIGRATION;
-       pkt->db_id = lock->ctdb_db->id;
-       pkt->callid = CTDB_NULL_FUNC;
-       pkt->hopcount = 0;
-       pkt->keylen = lock->key.dsize;
-       pkt->calldatalen = 0;
-       memcpy(pkt->data, lock->key.dptr, lock->key.dsize);
-       DLIST_ADD_END(lock->ctdb_db->ctdb->outq, req, struct ctdb_request);
-       return req;
+       /* Now it's their responsibility to free! */
+       req->extra_destructor = NULL;
+       *data = lock->data;
+       return lock;
 }
 
-/* OK, let's try again... */
-static void readrecordlock_done(int status, struct ctdb_reply_call *reply,
-                               void *_lock)
+static void readrecordlock_retry(struct ctdb_connection *ctdb,
+                                struct ctdb_request *req, void *private)
 {
-       struct ctdb_lock *lock = _lock;
+       struct ctdb_lock *lock = req->extra;
+       struct ctdb_reply_call *reply;
 
-       if (status != 0) {
-               lock->callback(status, NULL, tdb_null, lock->private_data);
-               ctdb_release_lock(lock);
+       /* OK, we've received reply to noop migration */
+       reply = unpack_reply_call(req, CTDB_NULL_FUNC);
+       if (!reply || reply->status != 0) {
+               lock->callback(ctdb, req, private);
                return;
        }
 
+       /* Can we get lock now? */
        if (try_readrecordlock(lock)) {
-               lock->callback(0, lock, lock->data, lock->private_data);
+               lock->callback(ctdb, req, private);
                return;
        }
 
-       if (!new_readrecordlock_request(lock)) {
-               lock->callback(-1, NULL, tdb_null, lock->private_data);
-               ctdb_release_lock(lock);
-       }
-}
-
-static void lock_complete(struct ctdb_request *req, void *_lock)
-{
-       struct ctdb_lock *lock = _lock;
-
-       if (!req->cancelled) {
-               lock->callback(0, lock, lock->data, lock->private_data);
-       } else {
-               ctdb_release_lock(lock);
-       }
+       /* Retransmit the same request again (we lost race). */
+       io_elem_reset(req->io);
+       DLIST_ADD_END(ctdb->outq, req, struct ctdb_request);
+       return;
 }
 
 struct ctdb_request *
-ctdb_readrecordlock_send(struct ctdb_db *ctdb_db,
-                        TDB_DATA key,
-                        ctdb_readrecordlock_cb callback,
-                        void *private_data)
+ctdb_readrecordlock_send(struct ctdb_db *ctdb_db, TDB_DATA key,
+                        ctdb_callback_t callback, void *cbdata)
 {
        struct ctdb_request *req;
        struct ctdb_lock *lock;
 
-       lock = malloc(sizeof(*lock));
-       if (!lock)
-               return NULL;
-       lock->key.dptr = malloc(key.dsize);
-       if (!lock->key.dptr) {
-               free_noerr(lock);
+       /* Setup lock. */
+       lock = malloc(sizeof(*lock) + key.dsize);
+       if (!lock) {
                return NULL;
        }
+       lock->key.dptr = (void *)(lock + 1);
        memcpy(lock->key.dptr, key.dptr, key.dsize);
        lock->key.dsize = key.dsize;
        lock->ctdb_db = ctdb_db;
-       lock->callback = callback;
-       lock->private_data = private_data;
        lock->hdr = NULL;
        lock->held = false;
 
+       /* Get ready in case we need to send a migrate request. */
+       req = new_ctdb_request(sizeof(*req->hdr.call)
+                              + key.dsize, callback, cbdata);
+       if (!req) {
+               ctdb_release_lock(lock);
+               return NULL;
+       }
+       req->extra = lock;
+       req->extra_destructor = destroy_lock;
+
        if (try_readrecordlock(lock)) {
-               /* We pretend to be async, so we just queue this. */
-               req = new_immediate_request();
-               if (!req) {
-                       ctdb_release_lock(lock);
-                       return NULL;
-               }
-               req->callback.immediate = lock_complete;
-               req->priv_data = lock;
-               DLIST_ADD_END(lock->ctdb_db->ctdb->immediateq,
+               /* Already got it: prepare for immediate callback. */
+               DLIST_ADD_END(ctdb_db->ctdb->immediateq,
                              req, struct ctdb_request);
                return req;
        }
 
-       req = new_readrecordlock_request(lock);
-       if (!req) {
-               ctdb_release_lock(lock);
-               return NULL;
-       }
+       /* We store the original callback in the lock, and use our own. */
+       lock->callback = callback;
+       req->callback = readrecordlock_retry;
+
+       io_elem_init_req_header(req->io, CTDB_REQ_CALL, CTDB_CURRENT_NODE,
+                               new_reqid(ctdb_db->ctdb));
+
+       req->hdr.call->flags = CTDB_IMMEDIATE_MIGRATION;
+       req->hdr.call->db_id = ctdb_db->id;
+       req->hdr.call->callid = CTDB_NULL_FUNC;
+       req->hdr.call->hopcount = 0;
+       req->hdr.call->keylen = key.dsize;
+       req->hdr.call->calldatalen = 0;
+       memcpy(req->hdr.call->data, key.dptr, key.dsize);
+       DLIST_ADD_END(ctdb_db->ctdb->outq, req, struct ctdb_request);
        return req;
 }
 
@@ -639,5 +675,6 @@ int ctdb_writerecord(struct ctdb_lock *lock, TDB_DATA data)
                return -1;
        }
 
-       return ctdb_local_store(lock->ctdb_db->tdb, lock->key, lock->hdr, data);
+       return ctdb_local_store(lock->ctdb_db->tdb, lock->key, lock->hdr,
+                               data);
 }
index 91e84cce5c24f743d8a971ff771514630fdab1ec..ada77780575d27bca7a85512ea674006d77810ac 100644 (file)
@@ -130,3 +130,8 @@ int write_io_elem(int fd, struct io_elem *io)
        io->off += ret;
        return ret;
 }
+
+void io_elem_reset(struct io_elem *io)
+{
+       io->off = 0;
+}
index 5c234fec1dee0d2775005d2fbcd67b62f75a646e..e774cdbd369a8d7cfd45db3e86674c3061a53414 100644 (file)
@@ -14,6 +14,9 @@ void free_io_elem(struct io_elem *io);
 /* If finished, this returns the request header, otherwise NULL. */
 bool io_elem_finished(const struct io_elem *io);
 
+/* Reset an io_elem to the start. */
+void io_elem_reset(struct io_elem *io);
+
 /* Access to raw data: if len is non-NULL it is filled in. */
 void *io_elem_data(const struct io_elem *io, size_t *len);
 
index 6e789cf9e9dcc638a3c03d6af2d56bf9231e6aa2..5727012747286117d8996401a1b0cadda33432ce 100644 (file)
@@ -5,12 +5,16 @@
 #include <stdint.h>
 #include <stdlib.h>
 #include <ctdb.h>
+#include <ctdb_protocol.h>
 
 struct message_handler_info;
 struct ctdb_reply_call;
 
 struct ctdb_request {
+       struct ctdb_connection *ctdb;
        struct ctdb_request *next, *prev;
+       bool cancelled;
+
        struct io_elem *io;
        union {
                struct ctdb_req_header *hdr;
@@ -18,17 +22,15 @@ struct ctdb_request {
                struct ctdb_req_control *control;
                struct ctdb_req_message *message;
        } hdr;
-       bool cancelled;
-       union {
-               ctdb_getrecmaster_cb getrecmaster;
-               ctdb_getpnn_cb getpnn;
-               void (*register_srvid)(int, struct message_handler_info *);
-               void (*attachdb)(int, uint32_t id, struct ctdb_db *);
-               void (*getdbpath)(int, const char *, void *);
-               void (*nullfunc)(int, struct ctdb_reply_call *, void *);
-               void (*immediate)(struct ctdb_request *, void *);
-       } callback;
+
+       struct io_elem *reply;
+
+       ctdb_callback_t callback;
        void *priv_data;
+
+       /* Extra per-request info. */
+       void (*extra_destructor)(struct ctdb_request *);
+       void *extra;
 };
 
 struct ctdb_connection {
@@ -53,11 +55,18 @@ struct ctdb_connection {
 };
 
 /* ctdb.c */
-struct ctdb_request *new_ctdb_request(size_t len);
+struct ctdb_request *new_ctdb_request(size_t len, ctdb_callback_t, void *);
 struct ctdb_request *new_ctdb_control_request(struct ctdb_connection *ctdb,
                                              uint32_t opcode,
                                              uint32_t destnode,
                                              const void *extra_data,
-                                             size_t extra);
+                                             size_t extra,
+                                             ctdb_callback_t, void *);
 uint32_t new_reqid(struct ctdb_connection *ctdb);
+
+struct ctdb_reply_control *unpack_reply_control(struct ctdb_request *req,
+                                               enum ctdb_controls control);
+void ctdb_cancel_callback(struct ctdb_connection *ctdb,
+                         struct ctdb_request *req,
+                         void *unused);
 #endif /* _LIBCTDB_PRIVATE_H */
index 6ec35415d3499bcff74f5dd0ba34e0831821ac35..864ec3e7ea668f9adf80d80bb51a47d4a4dac94d 100644 (file)
@@ -7,15 +7,17 @@
 #include <stdlib.h>
 #include <string.h>
 
+/* Remove type-safety macros. */
+#undef ctdb_set_message_handler_send
+#undef ctdb_set_message_handler_recv
+#undef ctdb_remove_message_handler_send
+
 struct message_handler_info {
        struct message_handler_info *next, *prev;
-       /* Callback when we're first registered. */
-       ctdb_set_message_handler_cb callback;
 
        uint64_t srvid;
        ctdb_message_fn_t handler;
        void *private_data;
-       struct ctdb_connection *ctdb;
 };
 
 void deliver_message(struct ctdb_connection *ctdb, struct ctdb_req_header *hdr)
@@ -35,50 +37,57 @@ void deliver_message(struct ctdb_connection *ctdb, struct ctdb_req_header *hdr)
        /* FIXME: Report unknown messages */
 }
 
-static void set_message_handler(int status, struct message_handler_info *info)
+int ctdb_set_message_handler_recv(struct ctdb_connection *ctdb,
+                                 struct ctdb_request *req)
 {
-       /* If registration failed, tell callback and clean up */
-       if (status < 0) {
-               info->callback(status, info->private_data);
-               free(info);
-               return;
-       } else {
-               /* Put ourselves in list of handlers. */
-               DLIST_ADD_END(info->ctdb->message_handlers, info,
-                             struct message_handler_info);
-               /* Now call callback: it could remove us in theory. */
-               info->callback(status, info->private_data);
+       struct message_handler_info *info = req->extra;
+       struct ctdb_reply_control *reply;
+
+       reply = unpack_reply_control(req, CTDB_CONTROL_REGISTER_SRVID);
+       if (!reply || reply->status != 0) {
+               return -1;
        }
+
+       /* Put ourselves in list of handlers. */
+       DLIST_ADD_END(ctdb->message_handlers, info,
+                     struct message_handler_info);
+       /* Keep safe from destructor */
+       req->extra = NULL;
+       return 0;
+}
+
+static void free_info(struct ctdb_request *req)
+{
+       free(req->extra);
 }
 
 struct ctdb_request *
 ctdb_set_message_handler_send(struct ctdb_connection *ctdb, uint64_t srvid,
-                             ctdb_set_message_handler_cb callback,
-                             ctdb_message_fn_t handler, void *private_data)
+                             ctdb_message_fn_t handler,
+                             ctdb_callback_t callback, void *private_data)
 {
-       struct ctdb_request *req;
        struct message_handler_info *info;
+       struct ctdb_request *req;
 
        info = malloc(sizeof(*info));
        if (!info) {
                return NULL;
        }
+
        req = new_ctdb_control_request(ctdb, CTDB_CONTROL_REGISTER_SRVID,
-                                      CTDB_CURRENT_NODE, NULL, 0);
+                                      CTDB_CURRENT_NODE, NULL, 0,
+                                      callback, private_data);
        if (!req) {
                free(info);
                return NULL;
        }
+       req->extra = info;
+       req->extra_destructor = free_info;
        req->hdr.control->srvid = srvid;
 
        info->srvid = srvid;
        info->handler = handler;
-       info->callback = callback;
        info->private_data = private_data;
-       info->ctdb = ctdb;
-
-       req->callback.register_srvid = set_message_handler;
-       req->priv_data = info;
 
        return req;
 }
@@ -90,7 +99,9 @@ int ctdb_send_message(struct ctdb_connection *ctdb,
        struct ctdb_request *req;
        struct ctdb_req_message *pkt;
 
-       req = new_ctdb_request(sizeof(*pkt) + data.dsize);
+       /* We just discard it once it's finished: no reply. */
+       req = new_ctdb_request(sizeof(*pkt) + data.dsize,
+                              ctdb_cancel_callback, NULL);
        if (!req) {
                return -1;
        }
@@ -98,9 +109,6 @@ int ctdb_send_message(struct ctdb_connection *ctdb,
        io_elem_init_req_header(req->io,
                                CTDB_REQ_MESSAGE, pnn, new_reqid(ctdb));
 
-       /* There's no reply to this, so we mark it cancelled immediately. */
-       req->cancelled = true;
-
        pkt = req->hdr.message;
        pkt->srvid = srvid;
        pkt->datalen = data.dsize;
index 8c4892de54665335a4a53793b56a2b217d240893..dfafa058da5a2e3c53ee9bd7bc39cc905c85ddb5 100644 (file)
 #include <errno.h>
 #include <stdlib.h>
 
-/* FIXME: Find a way to share more code here. */
-struct ctdb_getrecmaster {
-       bool done;
-       int status;
-       uint32_t *recmaster;
-};
-
-static bool ctdb_service_flush(struct ctdb_connection *ctdb)
+/* On failure, frees req and returns NULL. */
+static struct ctdb_request *wait_for(struct ctdb_connection *ctdb,
+                                    struct ctdb_request *req,
+                                    bool *done)
 {
        struct pollfd fds;
 
+       /* Pass through allocation failures. */
+       if (!req)
+               return NULL;
+
        fds.fd = ctdb_get_fd(ctdb);
-       fds.events = ctdb_which_events(ctdb);
-       if (poll(&fds, 1, -1) < 0) {
-               /* Signalled is OK, other error is bad. */
-               return errno == EINTR;
+       while (!*done) {
+               fds.events = ctdb_which_events(ctdb);
+               if (poll(&fds, 1, -1) < 0) {
+                       /* Signalled is OK, other error is bad. */
+                       if (errno == EINTR)
+                               continue;
+                       ctdb_request_free(req);
+                       return NULL;
+               }
+               if (ctdb_service(ctdb, fds.revents) < 0) {
+                       ctdb_request_free(req);
+                       return NULL;
+               }
        }
-       return ctdb_service(ctdb, fds.revents) >= 0;
+       return req;
 }
 
-static void getrecmaster_done(int status, uint32_t recmaster, void *priv_data)
+static void set(struct ctdb_connection *ctdb,
+               struct ctdb_request *req, bool *done)
 {
-       struct ctdb_getrecmaster *grm = priv_data;
-       *grm->recmaster = recmaster;
-       grm->status = status;
-       grm->done = true;
+       *done = true;
 }
 
 int ctdb_getrecmaster(struct ctdb_connection *ctdb,
                      uint32_t destnode, uint32_t *recmaster)
 {
        struct ctdb_request *req;
-       struct ctdb_getrecmaster grm;
-
-       grm.done = false;
-       grm.recmaster = recmaster;
-       req = ctdb_getrecmaster_send(ctdb, destnode, getrecmaster_done, &grm);
-       if (!req)
-               return -1;
-
-       while (!grm.done) {
-               if (!ctdb_service_flush(ctdb)) {
-                       ctdb_cancel(req);
-                       return -1;
-               }
+       bool done = false;
+       int ret = -1;
+
+       req = wait_for(ctdb,
+                      ctdb_getrecmaster_send(ctdb, destnode, set, &done),
+                      &done);
+       if (req != NULL) {
+               ret = ctdb_getrecmaster_recv(req, recmaster);
+               ctdb_request_free(req);
        }
-       return grm.status;
-}
-
-struct ctdb_attachdb {
-       bool done;
-       int status;
-       struct ctdb_db *ctdb_db;
-};
-
-static void attachdb_sync_done(int status,
-                              struct ctdb_db *ctdb_db, void *private_data)
-{
-       struct ctdb_attachdb *atb = private_data;
-       atb->ctdb_db = ctdb_db;
-       atb->status = status;
-       atb->done = true;
+       return ret;
 }
 
 struct ctdb_db *ctdb_attachdb(struct ctdb_connection *ctdb,
@@ -91,21 +79,16 @@ struct ctdb_db *ctdb_attachdb(struct ctdb_connection *ctdb,
                              uint32_t tdb_flags)
 {
        struct ctdb_request *req;
-       struct ctdb_attachdb atb;
-
-       atb.done = false;
-       req = ctdb_attachdb_send(ctdb, name, persistent, tdb_flags,
-                                attachdb_sync_done, &atb);
-       if (!req)
-               return NULL;
-
-       while (!atb.done) {
-               if (!ctdb_service_flush(ctdb)) {
-                       ctdb_cancel(req);
-                       return NULL;
-               }
+       bool done = false;
+       struct ctdb_db *ret = NULL;
+
+       req = wait_for(ctdb,
+                      ctdb_attachdb_send(ctdb, name, persistent, tdb_flags,
+                                         set, &done),
+                      &done);
+       if (req != NULL) {
+               ret = ctdb_attachdb_recv(req);
+               ctdb_request_free(req);
        }
-       if (atb.status != 0)
-               return NULL;
-       return atb.ctdb_db;
+       return ret;
 }
index 03ce572402f526adbf8c3181285386678a7c3718..2b0aa14a801b383364703ac5985ba51d2ec11160 100644 (file)
@@ -16,8 +16,13 @@ void msg_h(struct ctdb_connection *ctdb, uint64_t srvid, TDB_DATA data, void *pr
        printf("Message received on port %d : %s\n", (int)srvid, data.dptr);
 }
 
-void pnn_cb(int32_t status, uint32_t pnn, void *private_data)
+static void pnn_cb(struct ctdb_connection *ctdb,
+                  struct ctdb_request *req, void *private)
 {
+       int status;
+       uint32_t pnn;
+
+       status = ctdb_getpnn_recv(req, &pnn);
        if (status != 0) {
                printf("Error reading PNN\n");
                return;
@@ -25,8 +30,13 @@ void pnn_cb(int32_t status, uint32_t pnn, void *private_data)
        printf("status:%d pnn:%d\n", status, pnn);
 }
 
-void rm_cb(int status, uint32_t rm, void *private_data)
+static void rm_cb(struct ctdb_connection *ctdb,
+                 struct ctdb_request *req, void *private)
 {
+       int status;
+       uint32_t rm;
+
+       status = ctdb_getrecmaster_recv(req, &rm);
        if (status != 0) {
                printf("Error reading RECMASTER\n");
                return;
@@ -35,7 +45,6 @@ void rm_cb(int status, uint32_t rm, void *private_data)
        printf("GETRECMASTER ASYNC: status:%d recmaster:%d\n", status, rm);
 }
 
-
 /*
  * example on how to first read(non-existing recortds are implicitely created
  * on demand) a record and change it in the callback.
@@ -43,13 +52,17 @@ void rm_cb(int status, uint32_t rm, void *private_data)
  *
  * Pure read, or pure write are just special cases of this cycle.
  */
-void rrl_cb(int32_t status, struct ctdb_lock *lock, TDB_DATA outdata, void *private_data)
+static void rrl_cb(struct ctdb_connection *ctdb,
+                 struct ctdb_request *req, void *private)
 {
+       struct ctdb_lock *lock;
+       TDB_DATA outdata;
        TDB_DATA data;
        char tmp[256];
 
-       if (status != 0) {
-               printf("rrl_cb returned error: status %d\n", status);
+       lock = ctdb_readrecordlock_recv(private, req, &outdata);
+       if (!lock) {
+               printf("rrl_cb returned error\n");
                return;
        }
 
@@ -72,9 +85,13 @@ void rrl_cb(int32_t status, struct ctdb_lock *lock, TDB_DATA outdata, void *priv
 }
 
 static bool registered = false;
-void message_handler_cb(int status, void *private_data)
+void message_handler_cb(struct ctdb_connection *ctdb,
+                       struct ctdb_request *req, void *private)
 {
-       printf("Message handler registered: %i\n", status);
+       if (ctdb_set_message_handler_recv(ctdb, req) != 0) {
+               err(1, "registering message");
+       }
+       printf("Message handler registered\n");
        registered = true;
 }
 
@@ -94,7 +111,8 @@ int main(int argc, char *argv[])
 
        pfd.fd = ctdb_get_fd(ctdb_connection);
 
-       handle = ctdb_set_message_handler_send(ctdb_connection, 55, message_handler_cb, msg_h, NULL);
+       handle = ctdb_set_message_handler_send(ctdb_connection, 55, msg_h,
+                                              message_handler_cb, NULL);
        if (handle == NULL) {
                printf("Failed to register message port\n");
                exit(10);