From ebe4dd724338c156423cfdcc10a75b68c2084cde Mon Sep 17 00:00:00 2001 From: Rusty Russell Date: Mon, 24 May 2010 13:17:36 +0930 Subject: [PATCH] libctdb: uniform callbacks, _recv functions to pull out data. This is a bit tricky for those cases where we need to do multiple or zero I/Os (eg. attachdb and readrecordlock), but works well for the simple cases. --- include/ccan/typesafe_cb.h | 177 +++++++++++++++ include/ctdb.h | 108 +++++---- libctdb/control.c | 65 +++--- libctdb/ctdb.c | 447 ++++++++++++++++++++----------------- libctdb/io_elem.c | 5 + libctdb/io_elem.h | 3 + libctdb/libctdb_private.h | 33 ++- libctdb/messages.c | 64 +++--- libctdb/sync.c | 109 ++++----- libctdb/tst.c | 36 ++- 10 files changed, 661 insertions(+), 386 deletions(-) create mode 100644 include/ccan/typesafe_cb.h diff --git a/include/ccan/typesafe_cb.h b/include/ccan/typesafe_cb.h new file mode 100644 index 00000000..b1f2c5f5 --- /dev/null +++ b/include/ccan/typesafe_cb.h @@ -0,0 +1,177 @@ +#ifndef CCAN_CAST_IF_TYPE_H +#define CCAN_CAST_IF_TYPE_H + +#if (__GNUC__ >= 3) +#define HAVE_TYPEOF 1 +#define HAVE_BUILTIN_CHOOSE_EXPR 1 +#define HAVE_BUILTIN_TYPES_COMPATIBLE_P 1 +#endif + +#if HAVE_TYPEOF && HAVE_BUILTIN_CHOOSE_EXPR && HAVE_BUILTIN_TYPES_COMPATIBLE_P +/** + * cast_if_type - only cast an expression if test matches a given type + * @desttype: the type to cast to + * @expr: the expression to cast + * @test: the expression to test + * @oktype: the type we allow + * + * This macro is used to create functions which allow multiple types. + * The result of this macro is used somewhere that a @desttype type is + * expected: if @expr was of type @oktype, it will be cast to + * @desttype type. As a result, if @expr is any type other than + * @oktype or @desttype, a compiler warning will be issued. + * + * This macro can be used in static initializers. + * + * This is merely useful for warnings: if the compiler does not + * support the primitives required for cast_if_type(), it becomes an + * unconditional cast, and the @test and @oktype argument is not used. In + * particular, this means that @oktype can be a type which uses + * the "typeof": it will not be evaluated if typeof is not supported. + * + * Example: + * // We can take either an unsigned long or a void *. + * void _set_some_value(void *val); + * #define set_some_value(e) \ + * _set_some_value(cast_if_type(void *, (e), (e), unsigned long)) + */ +#define cast_if_type(desttype, expr, test, oktype) \ +__builtin_choose_expr(__builtin_types_compatible_p(typeof(1?(test):0), oktype), \ + (desttype)(expr), (expr)) +#else +#define cast_if_type(desttype, expr, test, oktype) ((desttype)(expr)) +#endif + +/** + * cast_if_any - only cast an expression if it is one of the three given types + * @desttype: the type to cast to + * @expr: the expression to cast + * @test: the expression to test + * @ok1: the first type we allow + * @ok2: the second type we allow + * @ok3: the third type we allow + * + * This is a convenient wrapper for multiple cast_if_type() calls. You can + * chain them inside each other (ie. use cast_if_any() for expr) if you need + * more than 3 arguments. + * + * Example: + * // We can take either a long, unsigned long, void * or a const void *. + * void _set_some_value(void *val); + * #define set_some_value(expr) \ + * _set_some_value(cast_if_any(void *, (expr), (expr), \ + * long, unsigned long, const void *)) + */ +#define cast_if_any(desttype, expr, test, ok1, ok2, ok3) \ + cast_if_type(desttype, \ + cast_if_type(desttype, \ + cast_if_type(desttype, (expr), (test), ok1), \ + ok2), \ + ok3) + +/** + * typesafe_cb - cast a callback function if it matches the arg + * @rtype: the return type of the callback function + * @fn: the callback function to cast + * @arg: the (pointer) argument to hand to the callback function. + * + * If a callback function takes a single argument, this macro does + * appropriate casts to a function which takes a single void * argument if the + * callback provided matches the @arg (or a const or volatile version). + * + * It is assumed that @arg is of pointer type: usually @arg is passed + * or assigned to a void * elsewhere anyway. + * + * Example: + * void _register_callback(void (*fn)(void *arg), void *arg); + * #define register_callback(fn, arg) \ + * _register_callback(typesafe_cb(void, (fn), (arg)), (arg)) + */ +#define typesafe_cb(rtype, fn, arg) \ + cast_if_type(rtype (*)(void *), (fn), (fn)(arg), rtype) + +/** + * typesafe_cb_const - cast a const callback function if it matches the arg + * @rtype: the return type of the callback function + * @fn: the callback function to cast + * @arg: the (pointer) argument to hand to the callback function. + * + * If a callback function takes a single argument, this macro does appropriate + * casts to a function which takes a single const void * argument if the + * callback provided matches the @arg. + * + * It is assumed that @arg is of pointer type: usually @arg is passed + * or assigned to a void * elsewhere anyway. + * + * Example: + * void _register_callback(void (*fn)(const void *arg), const void *arg); + * #define register_callback(fn, arg) \ + * _register_callback(typesafe_cb_const(void, (fn), (arg)), (arg)) + */ +#define typesafe_cb_const(rtype, fn, arg) \ + sizeof((fn)((const void *)0)), \ + cast_if_type(rtype (*)(const void *), \ + (fn), (fn)(arg), rtype (*)(typeof(arg))) + +/** + * typesafe_cb_preargs - cast a callback function if it matches the arg + * @rtype: the return type of the callback function + * @fn: the callback function to cast + * @arg: the (pointer) argument to hand to the callback function. + * + * This is a version of typesafe_cb() for callbacks that take other arguments + * before the @arg. + * + * Example: + * void _register_callback(void (*fn)(int, void *arg), void *arg); + * #define register_callback(fn, arg) \ + * _register_callback(typesafe_cb_preargs(void, (fn), (arg), int),\ + * (arg)) + */ +#define typesafe_cb_preargs(rtype, fn, arg, ...) \ + cast_if_type(rtype (*)(__VA_ARGS__, void *), (fn), (fn), \ + rtype (*)(__VA_ARGS__, typeof(arg))) +/** + * typesafe_cb_postargs - cast a callback function if it matches the arg + * @rtype: the return type of the callback function + * @fn: the callback function to cast + * @arg: the (pointer) argument to hand to the callback function. + * + * This is a version of typesafe_cb() for callbacks that take other arguments + * after the @arg. + * + * Example: + * void _register_callback(void (*fn)(void *arg, int), void *arg); + * #define register_callback(fn, arg) \ + * _register_callback(typesafe_cb_postargs(void, (fn), (arg), int),\ + * (arg)) + */ +#define typesafe_cb_postargs(rtype, fn, arg, ...) \ + cast_if_type(rtype (*)(void *, __VA_ARGS__), (fn), (fn), \ + rtype (*)(typeof(arg), __VA_ARGS__)) +/** + * typesafe_cb_cmp - cast a compare function if it matches the arg + * @rtype: the return type of the callback function + * @fn: the callback function to cast + * @arg: the (pointer) argument(s) to hand to the compare function. + * + * If a callback function takes two matching-type arguments, this macro does + * appropriate casts to a function which takes two const void * arguments if + * the callback provided takes two a const pointers to @arg. + * + * It is assumed that @arg is of pointer type: usually @arg is passed + * or assigned to a void * elsewhere anyway. Note also that the type + * arg points to must be defined. + * + * Example: + * void _my_qsort(void *base, size_t nmemb, size_t size, + * int (*cmp)(const void *, const void *)); + * #define my_qsort(base, nmemb, cmpfn) \ + * _my_qsort((base), (nmemb), sizeof(*(base)), \ + * typesafe_cb_cmp(int, (cmpfn), (base)), (arg)) + */ +#define typesafe_cb_cmp(rtype, cmpfn, arg) \ + cast_if_type(rtype (*)(const void *, const void *), (cmpfn), \ + rtype (*)(const typeof(*arg)*, const typeof(*arg)*)) + +#endif /* CCAN_CAST_IF_TYPE_H */ diff --git a/include/ctdb.h b/include/ctdb.h index 8ec5bfa7..9aabc4e2 100644 --- a/include/ctdb.h +++ b/include/ctdb.h @@ -41,6 +41,19 @@ int ctdb_which_events(struct ctdb_connection *ctdb); int ctdb_service(struct ctdb_connection *ctdb, int revents); +struct ctdb_request; + +void ctdb_request_free(struct ctdb_request *req); + +/* + * Callback for completed requests: it would normally unpack the request + * using ctdb_*_recv(). You must free the request using ctdb_request_free(). + * + * Note that due to macro magic, your callback doesn't have to take void *, + * it can take a type which matches the actual private parameter. + */ +typedef void (*ctdb_callback_t)(struct ctdb_connection *ctdb, + struct ctdb_request *req, void *private); /* * Special node addresses : @@ -55,8 +68,6 @@ int ctdb_service(struct ctdb_connection *ctdb, int revents); #define CTDB_BROADCAST_CONNECTED 0xF0000004 -struct ctdb_request; - /* * functions to attach to a database * if the database does not exist it will be created. @@ -65,13 +76,13 @@ struct ctdb_request; */ struct ctdb_db; -typedef void (*ctdb_attachdb_cb)(int status, struct ctdb_db *ctdb_db, void *private_data); - struct ctdb_request * ctdb_attachdb_send(struct ctdb_connection *ctdb, const char *name, int persistent, uint32_t tdb_flags, - ctdb_attachdb_cb callback, - void *private_data); + ctdb_callback_t callback, void *private_data); + +struct ctdb_db *ctdb_attachdb_recv(struct ctdb_request *req); + struct ctdb_db *ctdb_attachdb(struct ctdb_connection *ctdb, const char *name, int persistent, uint32_t tdb_flags); @@ -87,22 +98,16 @@ struct ctdb_lock; * When the lock is released, data is freed too, so make sure to copy the data * before that. */ -typedef void (*ctdb_readrecordlock_cb)(int status, struct ctdb_lock *lock, TDB_DATA data, void *private_data); - struct ctdb_request * -ctdb_readrecordlock_send(struct ctdb_db *ctdb_db, - TDB_DATA key, - ctdb_readrecordlock_cb callback, - void *private_data); -int ctdb_readrecordlock_recv(struct ctdb_connection *ctdb, - struct ctdb_request *handle, - TDB_DATA **data); -int ctdb_readrecordlock(struct ctdb_connection *ctdb, - struct ctdb_db *ctdb_db, - TDB_DATA key, - TDB_DATA **data); - +ctdb_readrecordlock_send(struct ctdb_db *ctdb_db, TDB_DATA key, + ctdb_callback_t callback, void *private_data); +struct ctdb_lock *ctdb_readrecordlock_recv(struct ctdb_db *ctdb_db, + struct ctdb_request *handle, + TDB_DATA *data); +/* Returns null on failure. */ +struct ctdb_lock *ctdb_readrecordlock(struct ctdb_db *ctdb_db, TDB_DATA key, + TDB_DATA *data); /* * Function to write data to a record @@ -124,15 +129,11 @@ void ctdb_release_lock(struct ctdb_lock *lock); */ typedef void (*ctdb_message_fn_t)(struct ctdb_connection *, uint64_t srvid, TDB_DATA data, void *); -/* - * register a message handler and start listening on a service port - */ -typedef void (*ctdb_set_message_handler_cb)(int status, void *private_data); - struct ctdb_request * ctdb_set_message_handler_send(struct ctdb_connection *ctdb, uint64_t srvid, - ctdb_set_message_handler_cb callback, - ctdb_message_fn_t handler, void *private_data); + ctdb_message_fn_t handler, + ctdb_callback_t callback, + void *private_data); int ctdb_set_message_handler_recv(struct ctdb_connection *ctdb, struct ctdb_request *handle); @@ -145,15 +146,12 @@ int ctdb_set_message_handler(struct ctdb_connection *ctdb, uint64_t srvid, /* * unregister a message handler and stop listening on teh specified port */ -typedef void (*ctdb_remove_message_handler_cb)(int status, void *private_data); - struct ctdb_request * ctdb_remove_message_handler_send(struct ctdb_connection *ctdb, uint64_t srvid, - ctdb_remove_message_handler_cb callback, + ctdb_callback_t callback, void *private_data); -int ctdb_remove_message_handler_recv(struct ctdb_connection *ctdb, - struct ctdb_request *handle); +int ctdb_remove_message_handler_recv(struct ctdb_request *handle); int ctdb_remove_message_handler(struct ctdb_connection *ctdb, uint64_t srvid); @@ -170,13 +168,13 @@ int ctdb_send_message(struct ctdb_connection *ctdb, uint32_t pnn, uint64_t srvid /* * functions to read the pnn number of the local node */ -typedef void (*ctdb_getpnn_cb)(int status, uint32_t pnn, void *private_data); - struct ctdb_request * ctdb_getpnn_send(struct ctdb_connection *ctdb, uint32_t destnode, - ctdb_getpnn_cb callback, + ctdb_callback_t callback, void *private_data); +int ctdb_getpnn_recv(struct ctdb_request *req, uint32_t *pnn); + int ctdb_getpnn(struct ctdb_connection *ctdb, uint32_t destnode, uint32_t *pnn); @@ -187,17 +185,13 @@ int ctdb_getpnn(struct ctdb_connection *ctdb, /* * functions to read the recovery master of a node */ -typedef void (*ctdb_getrecmaster_cb)(int status, - uint32_t recmaster, void *private_data); - struct ctdb_request * ctdb_getrecmaster_send(struct ctdb_connection *ctdb, uint32_t destnode, - ctdb_getrecmaster_cb callback, + ctdb_callback_t callback, void *private_data); -int ctdb_getrecmaster_recv(struct ctdb_connection *ctdb, - struct ctdb_request *handle, - uint32_t *recmaster); +int ctdb_getrecmaster_recv(struct ctdb_request *handle, + uint32_t *recmaster); int ctdb_getrecmaster(struct ctdb_connection *ctdb, uint32_t destnode, uint32_t *recmaster); @@ -210,4 +204,34 @@ int ctdb_getrecmaster(struct ctdb_connection *ctdb, */ int ctdb_cancel(struct ctdb_request *); + +/* These ugly macro wrappers make the callbacks typesafe. */ +#include +#define ctdb_sendcb(cb, cbdata) \ + typesafe_cb_preargs(void, (cb), (cbdata), \ + struct ctdb_connection *, struct ctdb_request *) + +#define ctdb_attachdb_send(ctdb, name, persistent, tdb_flags, cb, cbdata) \ + ctdb_attachdb_send((ctdb), (name), (persistent), (tdb_flags), \ + ctdb_sendcb((cb), (cbdata)), (cbdata)) + +#define ctdb_readrecordlock_send(ctdb_db, key, cb, cbdata) \ + ctdb_readrecordlock_send((ctdb_db), (key), \ + ctdb_sendcb((cb), (cbdata)), (cbdata)) + +#define ctdb_set_message_handler_send(ctdb, srvid, handler, cb, cbdata) \ + ctdb_set_message_handler_send((ctdb), (srvid), (handler), \ + ctdb_sendcb((cb), (cbdata)), (cbdata)) + +#define ctdb_remove_message_handler_send(ctdb, srvid, cb, cbdata) \ + ctdb_remove_message_handler_send((ctdb), (srvid), \ + ctdb_sendcb((cb), (cbdata)), (cbdata)) + +#define ctdb_getpnn_send(ctdb, destnode, cb, cbdata) \ + ctdb_getpnn_send((ctdb), (destnode), \ + ctdb_sendcb((cb), (cbdata)), (cbdata)) + +#define ctdb_getrecmaster_send(ctdb, destnode, cb, cbdata) \ + ctdb_getrecmaster_send((ctdb), (destnode), \ + ctdb_sendcb((cb), (cbdata)), (cbdata)) #endif diff --git a/libctdb/control.c b/libctdb/control.c index 84d703e8..92872905 100644 --- a/libctdb/control.c +++ b/libctdb/control.c @@ -20,36 +20,47 @@ #include #include "libctdb_private.h" +/* Remove type-safety macros. */ +#undef ctdb_getrecmaster_send +#undef ctdb_getpnn_send + +int ctdb_getrecmaster_recv(struct ctdb_request *req, uint32_t *recmaster) +{ + struct ctdb_reply_control *reply; + + reply = unpack_reply_control(req, CTDB_CONTROL_GET_RECMASTER); + if (!reply || reply->status == -1) + return -1; + *recmaster = reply->status; + return 0; +} + struct ctdb_request *ctdb_getrecmaster_send(struct ctdb_connection *ctdb, - uint32_t destnode, - ctdb_getrecmaster_cb callback, - void *private_data) + uint32_t destnode, + ctdb_callback_t callback, + void *private_data) { - struct ctdb_request *req; - - req = new_ctdb_control_request(ctdb, CTDB_CONTROL_GET_RECMASTER, - destnode, NULL, 0); - if (!req) - return NULL; - req->callback.getrecmaster = callback; - req->priv_data = private_data; - return req; + return new_ctdb_control_request(ctdb, CTDB_CONTROL_GET_RECMASTER, + destnode, NULL, 0, + callback, private_data); +} + +int ctdb_getpnn_recv(struct ctdb_request *req, uint32_t *pnn) +{ + struct ctdb_reply_control *reply; + + reply = unpack_reply_control(req, CTDB_CONTROL_GET_PNN); + if (!reply || reply->status == -1) + return -1; + *pnn = reply->status; + return 0; } -struct ctdb_request * -ctdb_getpnn_send(struct ctdb_connection *ctdb, - uint32_t destnode, - ctdb_getpnn_cb callback, - void *private_data) +struct ctdb_request *ctdb_getpnn_send(struct ctdb_connection *ctdb, + uint32_t destnode, + ctdb_callback_t callback, + void *private_data) { - struct ctdb_request *req; - - req = new_ctdb_control_request(ctdb, CTDB_CONTROL_GET_PNN, destnode, - NULL, 0); - if (!req) { - return NULL; - } - req->callback.getpnn = callback; - req->priv_data = private_data; - return req; + return new_ctdb_control_request(ctdb, CTDB_CONTROL_GET_PNN, destnode, + NULL, 0, callback, private_data); } diff --git a/libctdb/ctdb.c b/libctdb/ctdb.c index 85875ac4..5461d3c1 100644 --- a/libctdb/ctdb.c +++ b/libctdb/ctdb.c @@ -31,6 +31,10 @@ #include #include +/* Remove type-safety macros. */ +#undef ctdb_attachdb_send +#undef ctdb_readrecordlock_send + /* FIXME: Could be in shared util code with rest of ctdb */ static void close_noerr(int fd) { @@ -63,13 +67,13 @@ static void set_close_on_exec(int fd) fcntl(fd, F_SETFD, v | FD_CLOEXEC); } -static void set_pnn(int32_t status, uint32_t pnn, void *private_data) +static void set_pnn(struct ctdb_connection *ctdb, + struct ctdb_request *req, + void *unused) { - if (status != 0) { + if (ctdb_getpnn_recv(req, &ctdb->pnn) != 0) { /* FIXME: Report error. */ - ((struct ctdb_connection *)private_data)->broken = true; - } else { - ((struct ctdb_connection *)private_data)->pnn = pnn; + ctdb->broken = true; } } @@ -103,7 +107,7 @@ struct ctdb_connection *ctdb_connect(const char *addr) goto close_fail; /* Immediately queue a request to get our pnn. */ - if (!ctdb_getpnn_send(ctdb, CTDB_CURRENT_NODE, set_pnn, ctdb)) + if (!ctdb_getpnn_send(ctdb, CTDB_CURRENT_NODE, set_pnn, NULL)) goto close_fail; return ctdb; @@ -130,7 +134,8 @@ int ctdb_which_events(struct ctdb_connection *ctdb) return events; } -struct ctdb_request *new_ctdb_request(size_t len) +struct ctdb_request *new_ctdb_request(size_t len, + ctdb_callback_t cb, void *cbdata) { struct ctdb_request *req = malloc(sizeof(*req)); if (!req) @@ -141,105 +146,104 @@ struct ctdb_request *new_ctdb_request(size_t len) return NULL; } req->hdr.hdr = io_elem_data(req->io, NULL); - req->cancelled = false; + req->reply = NULL; + req->callback = cb; + req->priv_data = cbdata; + req->extra = NULL; + req->extra_destructor = NULL; return req; } -static struct ctdb_request *new_immediate_request(void) +void ctdb_request_free(struct ctdb_request *req) { - struct ctdb_request *req = malloc(sizeof(*req)); - if (!req) - return NULL; - req->cancelled = false; - req->io = NULL; - req->hdr.hdr = NULL; - return req; -} - -static void free_ctdb_request(struct ctdb_request *req) -{ - /* immediate requests don't have IO */ - if (req->io) { - free_io_elem(req->io); + if (req->extra_destructor) { + req->extra_destructor(req); + } + if (req->reply) { + free_io_elem(req->reply); } + free_io_elem(req->io); free(req); } -static void handle_call_reply(struct ctdb_connection *ctdb, - struct ctdb_req_header *hdr, - struct ctdb_request *i) +/* Sanity-checking wrapper for reply. + * FIXME: logging! */ +static struct ctdb_reply_call *unpack_reply_call(struct ctdb_request *req, + uint32_t callid) { - struct ctdb_req_call *call = i->hdr.call; - struct ctdb_reply_call *reply = (struct ctdb_reply_call *)hdr; + size_t len; + struct ctdb_reply_call *inhdr = io_elem_data(req->reply, &len); - switch (call->callid) { - case CTDB_NULL_FUNC: - /* FIXME: We should let it steal the request, rathe than copy */ - i->callback.nullfunc(reply->status, reply, i->priv_data); - break; + /* ctdbd or our error if this isn't a reply call. */ + if (len < sizeof(*inhdr) || inhdr->hdr.operation != CTDB_REPLY_CALL) { + errno = EIO; + return NULL; } + + /* Library user error if this isn't a reply to a call. */ + if (req->hdr.hdr->operation != CTDB_REQ_CALL + || req->hdr.call->callid != callid) { + errno = EINVAL; + return NULL; + } + + return inhdr; } -static void handle_control_reply(struct ctdb_connection *ctdb, - struct ctdb_req_header *hdr, - struct ctdb_request *i) +/* Sanity-checking wrapper for reply. + * FIXME: logging! */ +struct ctdb_reply_control *unpack_reply_control(struct ctdb_request *req, + enum ctdb_controls control) { - struct ctdb_req_control *control = i->hdr.control; - struct ctdb_reply_control *reply = (struct ctdb_reply_control *)hdr; + size_t len; + struct ctdb_reply_control *inhdr = io_elem_data(req->reply, &len); + + /* Library user error if this isn't a reply to a call. */ + if (len < sizeof(*inhdr) + || req->hdr.hdr->operation != CTDB_REQ_CONTROL) { + errno = EINVAL; + return NULL; + } + + /* ... or if it was a different control from what we expected. */ + if (req->hdr.control->opcode != control) { + errno = EINVAL; + return NULL; + } - switch (control->opcode) { - case CTDB_CONTROL_GET_RECMASTER: - i->callback.getrecmaster(0, reply->status, i->priv_data); - break; - case CTDB_CONTROL_GET_PNN: - i->callback.getpnn(0, reply->status, i->priv_data); - break; - case CTDB_CONTROL_REGISTER_SRVID: - i->callback.register_srvid(reply->status, i->priv_data); - break; - case CTDB_CONTROL_DB_ATTACH_PERSISTENT: - case CTDB_CONTROL_DB_ATTACH: - i->callback.attachdb(reply->status, *(uint32_t *)reply->data, - i->priv_data); - break; - case CTDB_CONTROL_GETDBPATH: - i->callback.getdbpath(reply->status, (char *)reply->data, - i->priv_data); - break; + /* ctdbd or our error if this isn't a reply call. */ + if (inhdr->hdr.operation != CTDB_REPLY_CONTROL) { + errno = EIO; + return NULL; } + + return inhdr; } -static void handle_incoming(struct ctdb_connection *ctdb, - struct ctdb_req_header *hdr, - size_t len /* FIXME: use len to check packet! */) +static void handle_incoming(struct ctdb_connection *ctdb, struct io_elem *in) { + struct ctdb_req_header *hdr; + size_t len; struct ctdb_request *i; + hdr = io_elem_data(in, &len); + /* FIXME: use len to check packet! */ + if (hdr->operation == CTDB_REQ_MESSAGE) { deliver_message(ctdb, hdr); return; } - if (hdr->operation != CTDB_REPLY_CALL - && hdr->operation != CTDB_REPLY_CONTROL) { - /* FIXME: report this error. */ - return; - } - for (i = ctdb->doneq; i; i = i->next) { if (i->hdr.hdr->reqid == hdr->reqid) { - if (!i->cancelled) { - if (hdr->operation == CTDB_REPLY_CALL) - handle_call_reply(ctdb, hdr, i); - else - handle_control_reply(ctdb, hdr, i); - } DLIST_REMOVE(ctdb->doneq, i); - free_ctdb_request(i); + i->reply = in; + i->callback(ctdb, i, i->priv_data); return; } } /* FIXME: report this error. */ + free_io_elem(in); } /* Remove "harmless" errors. */ @@ -266,12 +270,8 @@ int ctdb_service(struct ctdb_connection *ctdb, int revents) if (io_elem_finished(ctdb->outq->io)) { struct ctdb_request *done = ctdb->outq; DLIST_REMOVE(ctdb->outq, done); - if (done->cancelled) { - free_ctdb_request(done); - } else { - DLIST_ADD_END(ctdb->doneq, done, - struct ctdb_request); - } + DLIST_ADD_END(ctdb->doneq, done, + struct ctdb_request); } } } @@ -298,22 +298,15 @@ int ctdb_service(struct ctdb_connection *ctdb, int revents) /* No progress, stop loop. */ revents = 0; } else if (io_elem_finished(ctdb->in)) { - struct ctdb_req_header *hdr; - size_t len; - - hdr = io_elem_data(ctdb->in, &len); - handle_incoming(ctdb, hdr, len); - free_io_elem(ctdb->in); + handle_incoming(ctdb, ctdb->in); ctdb->in = NULL; } } while (ctdb->immediateq) { struct ctdb_request *imm = ctdb->immediateq; - /* This has to handle fake->cancelled internally. */ - imm->callback.immediate(imm, imm->priv_data); + imm->callback(ctdb, imm, imm->priv_data); DLIST_REMOVE(ctdb->immediateq, imm); - free_ctdb_request(imm); } return 0; @@ -349,12 +342,14 @@ struct ctdb_request *new_ctdb_control_request(struct ctdb_connection *ctdb, uint32_t opcode, uint32_t destnode, const void *extra_data, - size_t extra) + size_t extra, + ctdb_callback_t callback, + void *cbdata) { struct ctdb_request *req; struct ctdb_req_control *pkt; - req = new_ctdb_request(sizeof(*pkt) + extra); + req = new_ctdb_request(sizeof(*pkt) + extra, callback, cbdata); if (!req) return NULL; @@ -372,10 +367,17 @@ struct ctdb_request *new_ctdb_control_request(struct ctdb_connection *ctdb, return req; } +void ctdb_cancel_callback(struct ctdb_connection *ctdb, + struct ctdb_request *req, + void *unused) +{ + ctdb_request_free(req); +} + int ctdb_cancel(struct ctdb_request *req) { /* FIXME: If it's not sent, we could just free it right now. */ - req->cancelled = true; + req->callback = ctdb_cancel_callback; return 0; } @@ -386,71 +388,107 @@ struct ctdb_db { uint32_t id; struct tdb_context *tdb; - ctdb_attachdb_cb callback; + ctdb_callback_t callback; void *private_data; }; -static void attachdb_getdbpath_done(int status, const char *path, +static void attachdb_getdbpath_done(struct ctdb_connection *ctdb, + struct ctdb_request *req, void *_db) { struct ctdb_db *db = _db; + + /* Do callback on original request. */ + db->callback(ctdb, req->extra, db->private_data); +} + +struct ctdb_db *ctdb_attachdb_recv(struct ctdb_request *req) +{ + struct ctdb_request *dbpath_req = req->extra; + struct ctdb_reply_control *reply; + struct ctdb_db *db = req->priv_data; uint32_t tdb_flags = db->tdb_flags; - if (status != 0) { - db->callback(status, NULL, db->private_data); - free(db); - return; + /* Never sent the dbpath request? We've failed. */ + if (!dbpath_req) { + /* FIXME: Save errno? */ + errno = EINVAL; + return NULL; + } + + reply = unpack_reply_control(dbpath_req, CTDB_CONTROL_GETDBPATH); + if (!reply || reply->status != 0) { + return NULL; } tdb_flags = db->persistent ? TDB_DEFAULT : TDB_NOSYNC; tdb_flags |= TDB_DISALLOW_NESTING; - db->tdb = tdb_open(path, 0, tdb_flags, O_RDWR, 0); + db->tdb = tdb_open((char *)reply->data, 0, tdb_flags, O_RDWR, 0); if (db->tdb == NULL) { - db->callback(-1, NULL, db->private_data); - free(db); - return; + return NULL; } - /* Finally, we tell the client that we opened the db. */ - db->callback(status, db, db->private_data); + /* Finally, separate the db from the request (see destroy_req_db). */ + req->priv_data = NULL; + return db; } -static void attachdb_done(int status, uint32_t id, struct ctdb_db *db) +static void attachdb_done(struct ctdb_connection *ctdb, + struct ctdb_request *req, + void *_db) { - struct ctdb_request *req; + struct ctdb_db *db = _db; + struct ctdb_request *req2; + struct ctdb_reply_control *reply; + enum ctdb_controls control = CTDB_CONTROL_DB_ATTACH; - if (status != 0) { - db->callback(status, NULL, db->private_data); - free(db); + if (db->persistent) { + control = CTDB_CONTROL_DB_ATTACH_PERSISTENT; + } + + reply = unpack_reply_control(req, control); + if (!reply || reply->status != 0) { + /* We failed. Hand request to user and have them discover it + * via ctdb_attachdb_recv. */ + db->callback(ctdb, req, db); return; } - db->id = id; + db->id = *(uint32_t *)reply->data; /* Now we do another call, to get the dbpath. */ - req = new_ctdb_control_request(db->ctdb, CTDB_CONTROL_GETDBPATH, - CTDB_CURRENT_NODE, &id, sizeof(id)); - if (!req) { - db->callback(-1, NULL, db->private_data); - free(db); + req2 = new_ctdb_control_request(db->ctdb, CTDB_CONTROL_GETDBPATH, + CTDB_CURRENT_NODE, + &db->id, sizeof(db->id), + attachdb_getdbpath_done, db); + if (!req2) { + db->callback(ctdb, req, db); return; } - req->callback.getdbpath = attachdb_getdbpath_done; - req->priv_data = db; + req->extra = req2; + req2->extra = req; +} + +static void destroy_req_db(struct ctdb_request *req) +{ + /* Incomplete db is in priv_data. */ + free(req->priv_data); + /* second request is chained off this one. */ + if (req->extra) { + ctdb_request_free(req->extra); + } } struct ctdb_request * ctdb_attachdb_send(struct ctdb_connection *ctdb, const char *name, int persistent, uint32_t tdb_flags, - ctdb_attachdb_cb callback, - void *private_data) + ctdb_callback_t callback, void *private_data) { struct ctdb_request *req; struct ctdb_db *db; uint32_t opcode; /* FIXME: Search if db already open. */ - db = malloc(sizeof(*db)); if (!db) { return NULL; @@ -463,7 +501,7 @@ ctdb_attachdb_send(struct ctdb_connection *ctdb, } req = new_ctdb_control_request(ctdb, opcode, CTDB_CURRENT_NODE, name, - strlen(name) + 1); + strlen(name) + 1, attachdb_done, db); if (!req) { free(db); return NULL; @@ -475,8 +513,9 @@ ctdb_attachdb_send(struct ctdb_connection *ctdb, db->callback = callback; db->private_data = private_data; - req->callback.attachdb = attachdb_done; - req->priv_data = db; + req->extra_destructor = destroy_req_db; + /* This is set non-NULL when we succeed, see ctdb_attachdb_recv */ + req->extra = NULL; /* Flags get overloaded into srvid. */ req->hdr.control->srvid = tdb_flags; @@ -486,12 +525,14 @@ ctdb_attachdb_send(struct ctdb_connection *ctdb, struct ctdb_lock { struct ctdb_db *ctdb_db; TDB_DATA key; + + /* This will always be true by the time user sees this. */ + bool held; struct ctdb_ltdb_header *hdr; TDB_DATA data; - bool held; - /* For convenience, we stash this here. */ - ctdb_readrecordlock_cb callback; - void *private_data; + + /* For convenience, we stash original callback here. */ + ctdb_callback_t callback; }; void ctdb_release_lock(struct ctdb_lock *lock) @@ -499,136 +540,131 @@ void ctdb_release_lock(struct ctdb_lock *lock) if (lock->held) { tdb_chainunlock(lock->ctdb_db->tdb, lock->key); } - free(lock->key.dptr); - free(lock->hdr); /* Also frees lock->data */ + free(lock->hdr); /* Also frees data */ free(lock); } /* We keep the lock if local node is the dmaster. */ static bool try_readrecordlock(struct ctdb_lock *lock) { + struct ctdb_ltdb_header *hdr; + if (tdb_chainlock(lock->ctdb_db->tdb, lock->key) != 0) { - return false; + return NULL; } - lock->hdr = ctdb_local_fetch(lock->ctdb_db->tdb, - lock->key, &lock->data); - if (lock->hdr && lock->hdr->dmaster == lock->ctdb_db->ctdb->pnn) { + hdr = ctdb_local_fetch(lock->ctdb_db->tdb, lock->key, &lock->data); + if (hdr && hdr->dmaster == lock->ctdb_db->ctdb->pnn) { lock->held = true; + lock->hdr = hdr; return true; } tdb_chainunlock(lock->ctdb_db->tdb, lock->key); - free(lock->hdr); - return false; + free(hdr); + return NULL; } -static void readrecordlock_done(int, struct ctdb_reply_call *, void *); +/* If they cancel *before* we hand them the lock from + * ctdb_readrecordlock_recv, we free it here. */ +static void destroy_lock(struct ctdb_request *req) +{ + ctdb_release_lock(req->extra); +} -static struct ctdb_request *new_readrecordlock_request(struct ctdb_lock *lock) +struct ctdb_lock *ctdb_readrecordlock_recv(struct ctdb_db *ctdb_db, + struct ctdb_request *req, + TDB_DATA *data) { - struct ctdb_request *req; - struct ctdb_req_call *pkt; + struct ctdb_lock *lock = req->extra; - req = new_ctdb_request(sizeof(*pkt) + lock->key.dsize); - if (!req) + if (!lock->held) { + /* Something went wrong. */ return NULL; - req->callback.nullfunc = readrecordlock_done; - req->priv_data = lock; + } - io_elem_init_req_header(req->io, CTDB_REQ_CALL, CTDB_CURRENT_NODE, - new_reqid(lock->ctdb_db->ctdb)); - - pkt = req->hdr.call; - pkt->flags = CTDB_IMMEDIATE_MIGRATION; - pkt->db_id = lock->ctdb_db->id; - pkt->callid = CTDB_NULL_FUNC; - pkt->hopcount = 0; - pkt->keylen = lock->key.dsize; - pkt->calldatalen = 0; - memcpy(pkt->data, lock->key.dptr, lock->key.dsize); - DLIST_ADD_END(lock->ctdb_db->ctdb->outq, req, struct ctdb_request); - return req; + /* Now it's their responsibility to free! */ + req->extra_destructor = NULL; + *data = lock->data; + return lock; } -/* OK, let's try again... */ -static void readrecordlock_done(int status, struct ctdb_reply_call *reply, - void *_lock) +static void readrecordlock_retry(struct ctdb_connection *ctdb, + struct ctdb_request *req, void *private) { - struct ctdb_lock *lock = _lock; + struct ctdb_lock *lock = req->extra; + struct ctdb_reply_call *reply; - if (status != 0) { - lock->callback(status, NULL, tdb_null, lock->private_data); - ctdb_release_lock(lock); + /* OK, we've received reply to noop migration */ + reply = unpack_reply_call(req, CTDB_NULL_FUNC); + if (!reply || reply->status != 0) { + lock->callback(ctdb, req, private); return; } + /* Can we get lock now? */ if (try_readrecordlock(lock)) { - lock->callback(0, lock, lock->data, lock->private_data); + lock->callback(ctdb, req, private); return; } - if (!new_readrecordlock_request(lock)) { - lock->callback(-1, NULL, tdb_null, lock->private_data); - ctdb_release_lock(lock); - } -} - -static void lock_complete(struct ctdb_request *req, void *_lock) -{ - struct ctdb_lock *lock = _lock; - - if (!req->cancelled) { - lock->callback(0, lock, lock->data, lock->private_data); - } else { - ctdb_release_lock(lock); - } + /* Retransmit the same request again (we lost race). */ + io_elem_reset(req->io); + DLIST_ADD_END(ctdb->outq, req, struct ctdb_request); + return; } struct ctdb_request * -ctdb_readrecordlock_send(struct ctdb_db *ctdb_db, - TDB_DATA key, - ctdb_readrecordlock_cb callback, - void *private_data) +ctdb_readrecordlock_send(struct ctdb_db *ctdb_db, TDB_DATA key, + ctdb_callback_t callback, void *cbdata) { struct ctdb_request *req; struct ctdb_lock *lock; - lock = malloc(sizeof(*lock)); - if (!lock) - return NULL; - lock->key.dptr = malloc(key.dsize); - if (!lock->key.dptr) { - free_noerr(lock); + /* Setup lock. */ + lock = malloc(sizeof(*lock) + key.dsize); + if (!lock) { return NULL; } + lock->key.dptr = (void *)(lock + 1); memcpy(lock->key.dptr, key.dptr, key.dsize); lock->key.dsize = key.dsize; lock->ctdb_db = ctdb_db; - lock->callback = callback; - lock->private_data = private_data; lock->hdr = NULL; lock->held = false; + /* Get ready in case we need to send a migrate request. */ + req = new_ctdb_request(sizeof(*req->hdr.call) + + key.dsize, callback, cbdata); + if (!req) { + ctdb_release_lock(lock); + return NULL; + } + req->extra = lock; + req->extra_destructor = destroy_lock; + if (try_readrecordlock(lock)) { - /* We pretend to be async, so we just queue this. */ - req = new_immediate_request(); - if (!req) { - ctdb_release_lock(lock); - return NULL; - } - req->callback.immediate = lock_complete; - req->priv_data = lock; - DLIST_ADD_END(lock->ctdb_db->ctdb->immediateq, + /* Already got it: prepare for immediate callback. */ + DLIST_ADD_END(ctdb_db->ctdb->immediateq, req, struct ctdb_request); return req; } - req = new_readrecordlock_request(lock); - if (!req) { - ctdb_release_lock(lock); - return NULL; - } + /* We store the original callback in the lock, and use our own. */ + lock->callback = callback; + req->callback = readrecordlock_retry; + + io_elem_init_req_header(req->io, CTDB_REQ_CALL, CTDB_CURRENT_NODE, + new_reqid(ctdb_db->ctdb)); + + req->hdr.call->flags = CTDB_IMMEDIATE_MIGRATION; + req->hdr.call->db_id = ctdb_db->id; + req->hdr.call->callid = CTDB_NULL_FUNC; + req->hdr.call->hopcount = 0; + req->hdr.call->keylen = key.dsize; + req->hdr.call->calldatalen = 0; + memcpy(req->hdr.call->data, key.dptr, key.dsize); + DLIST_ADD_END(ctdb_db->ctdb->outq, req, struct ctdb_request); return req; } @@ -639,5 +675,6 @@ int ctdb_writerecord(struct ctdb_lock *lock, TDB_DATA data) return -1; } - return ctdb_local_store(lock->ctdb_db->tdb, lock->key, lock->hdr, data); + return ctdb_local_store(lock->ctdb_db->tdb, lock->key, lock->hdr, + data); } diff --git a/libctdb/io_elem.c b/libctdb/io_elem.c index 91e84cce..ada77780 100644 --- a/libctdb/io_elem.c +++ b/libctdb/io_elem.c @@ -130,3 +130,8 @@ int write_io_elem(int fd, struct io_elem *io) io->off += ret; return ret; } + +void io_elem_reset(struct io_elem *io) +{ + io->off = 0; +} diff --git a/libctdb/io_elem.h b/libctdb/io_elem.h index 5c234fec..e774cdbd 100644 --- a/libctdb/io_elem.h +++ b/libctdb/io_elem.h @@ -14,6 +14,9 @@ void free_io_elem(struct io_elem *io); /* If finished, this returns the request header, otherwise NULL. */ bool io_elem_finished(const struct io_elem *io); +/* Reset an io_elem to the start. */ +void io_elem_reset(struct io_elem *io); + /* Access to raw data: if len is non-NULL it is filled in. */ void *io_elem_data(const struct io_elem *io, size_t *len); diff --git a/libctdb/libctdb_private.h b/libctdb/libctdb_private.h index 6e789cf9..57270127 100644 --- a/libctdb/libctdb_private.h +++ b/libctdb/libctdb_private.h @@ -5,12 +5,16 @@ #include #include #include +#include struct message_handler_info; struct ctdb_reply_call; struct ctdb_request { + struct ctdb_connection *ctdb; struct ctdb_request *next, *prev; + bool cancelled; + struct io_elem *io; union { struct ctdb_req_header *hdr; @@ -18,17 +22,15 @@ struct ctdb_request { struct ctdb_req_control *control; struct ctdb_req_message *message; } hdr; - bool cancelled; - union { - ctdb_getrecmaster_cb getrecmaster; - ctdb_getpnn_cb getpnn; - void (*register_srvid)(int, struct message_handler_info *); - void (*attachdb)(int, uint32_t id, struct ctdb_db *); - void (*getdbpath)(int, const char *, void *); - void (*nullfunc)(int, struct ctdb_reply_call *, void *); - void (*immediate)(struct ctdb_request *, void *); - } callback; + + struct io_elem *reply; + + ctdb_callback_t callback; void *priv_data; + + /* Extra per-request info. */ + void (*extra_destructor)(struct ctdb_request *); + void *extra; }; struct ctdb_connection { @@ -53,11 +55,18 @@ struct ctdb_connection { }; /* ctdb.c */ -struct ctdb_request *new_ctdb_request(size_t len); +struct ctdb_request *new_ctdb_request(size_t len, ctdb_callback_t, void *); struct ctdb_request *new_ctdb_control_request(struct ctdb_connection *ctdb, uint32_t opcode, uint32_t destnode, const void *extra_data, - size_t extra); + size_t extra, + ctdb_callback_t, void *); uint32_t new_reqid(struct ctdb_connection *ctdb); + +struct ctdb_reply_control *unpack_reply_control(struct ctdb_request *req, + enum ctdb_controls control); +void ctdb_cancel_callback(struct ctdb_connection *ctdb, + struct ctdb_request *req, + void *unused); #endif /* _LIBCTDB_PRIVATE_H */ diff --git a/libctdb/messages.c b/libctdb/messages.c index 6ec35415..864ec3e7 100644 --- a/libctdb/messages.c +++ b/libctdb/messages.c @@ -7,15 +7,17 @@ #include #include +/* Remove type-safety macros. */ +#undef ctdb_set_message_handler_send +#undef ctdb_set_message_handler_recv +#undef ctdb_remove_message_handler_send + struct message_handler_info { struct message_handler_info *next, *prev; - /* Callback when we're first registered. */ - ctdb_set_message_handler_cb callback; uint64_t srvid; ctdb_message_fn_t handler; void *private_data; - struct ctdb_connection *ctdb; }; void deliver_message(struct ctdb_connection *ctdb, struct ctdb_req_header *hdr) @@ -35,50 +37,57 @@ void deliver_message(struct ctdb_connection *ctdb, struct ctdb_req_header *hdr) /* FIXME: Report unknown messages */ } -static void set_message_handler(int status, struct message_handler_info *info) +int ctdb_set_message_handler_recv(struct ctdb_connection *ctdb, + struct ctdb_request *req) { - /* If registration failed, tell callback and clean up */ - if (status < 0) { - info->callback(status, info->private_data); - free(info); - return; - } else { - /* Put ourselves in list of handlers. */ - DLIST_ADD_END(info->ctdb->message_handlers, info, - struct message_handler_info); - /* Now call callback: it could remove us in theory. */ - info->callback(status, info->private_data); + struct message_handler_info *info = req->extra; + struct ctdb_reply_control *reply; + + reply = unpack_reply_control(req, CTDB_CONTROL_REGISTER_SRVID); + if (!reply || reply->status != 0) { + return -1; } + + /* Put ourselves in list of handlers. */ + DLIST_ADD_END(ctdb->message_handlers, info, + struct message_handler_info); + /* Keep safe from destructor */ + req->extra = NULL; + return 0; +} + +static void free_info(struct ctdb_request *req) +{ + free(req->extra); } struct ctdb_request * ctdb_set_message_handler_send(struct ctdb_connection *ctdb, uint64_t srvid, - ctdb_set_message_handler_cb callback, - ctdb_message_fn_t handler, void *private_data) + ctdb_message_fn_t handler, + ctdb_callback_t callback, void *private_data) { - struct ctdb_request *req; struct message_handler_info *info; + struct ctdb_request *req; info = malloc(sizeof(*info)); if (!info) { return NULL; } + req = new_ctdb_control_request(ctdb, CTDB_CONTROL_REGISTER_SRVID, - CTDB_CURRENT_NODE, NULL, 0); + CTDB_CURRENT_NODE, NULL, 0, + callback, private_data); if (!req) { free(info); return NULL; } + req->extra = info; + req->extra_destructor = free_info; req->hdr.control->srvid = srvid; info->srvid = srvid; info->handler = handler; - info->callback = callback; info->private_data = private_data; - info->ctdb = ctdb; - - req->callback.register_srvid = set_message_handler; - req->priv_data = info; return req; } @@ -90,7 +99,9 @@ int ctdb_send_message(struct ctdb_connection *ctdb, struct ctdb_request *req; struct ctdb_req_message *pkt; - req = new_ctdb_request(sizeof(*pkt) + data.dsize); + /* We just discard it once it's finished: no reply. */ + req = new_ctdb_request(sizeof(*pkt) + data.dsize, + ctdb_cancel_callback, NULL); if (!req) { return -1; } @@ -98,9 +109,6 @@ int ctdb_send_message(struct ctdb_connection *ctdb, io_elem_init_req_header(req->io, CTDB_REQ_MESSAGE, pnn, new_reqid(ctdb)); - /* There's no reply to this, so we mark it cancelled immediately. */ - req->cancelled = true; - pkt = req->hdr.message; pkt->srvid = srvid; pkt->datalen = data.dsize; diff --git a/libctdb/sync.c b/libctdb/sync.c index 8c4892de..dfafa058 100644 --- a/libctdb/sync.c +++ b/libctdb/sync.c @@ -22,68 +22,56 @@ #include #include -/* FIXME: Find a way to share more code here. */ -struct ctdb_getrecmaster { - bool done; - int status; - uint32_t *recmaster; -}; - -static bool ctdb_service_flush(struct ctdb_connection *ctdb) +/* On failure, frees req and returns NULL. */ +static struct ctdb_request *wait_for(struct ctdb_connection *ctdb, + struct ctdb_request *req, + bool *done) { struct pollfd fds; + /* Pass through allocation failures. */ + if (!req) + return NULL; + fds.fd = ctdb_get_fd(ctdb); - fds.events = ctdb_which_events(ctdb); - if (poll(&fds, 1, -1) < 0) { - /* Signalled is OK, other error is bad. */ - return errno == EINTR; + while (!*done) { + fds.events = ctdb_which_events(ctdb); + if (poll(&fds, 1, -1) < 0) { + /* Signalled is OK, other error is bad. */ + if (errno == EINTR) + continue; + ctdb_request_free(req); + return NULL; + } + if (ctdb_service(ctdb, fds.revents) < 0) { + ctdb_request_free(req); + return NULL; + } } - return ctdb_service(ctdb, fds.revents) >= 0; + return req; } -static void getrecmaster_done(int status, uint32_t recmaster, void *priv_data) +static void set(struct ctdb_connection *ctdb, + struct ctdb_request *req, bool *done) { - struct ctdb_getrecmaster *grm = priv_data; - *grm->recmaster = recmaster; - grm->status = status; - grm->done = true; + *done = true; } int ctdb_getrecmaster(struct ctdb_connection *ctdb, uint32_t destnode, uint32_t *recmaster) { struct ctdb_request *req; - struct ctdb_getrecmaster grm; - - grm.done = false; - grm.recmaster = recmaster; - req = ctdb_getrecmaster_send(ctdb, destnode, getrecmaster_done, &grm); - if (!req) - return -1; - - while (!grm.done) { - if (!ctdb_service_flush(ctdb)) { - ctdb_cancel(req); - return -1; - } + bool done = false; + int ret = -1; + + req = wait_for(ctdb, + ctdb_getrecmaster_send(ctdb, destnode, set, &done), + &done); + if (req != NULL) { + ret = ctdb_getrecmaster_recv(req, recmaster); + ctdb_request_free(req); } - return grm.status; -} - -struct ctdb_attachdb { - bool done; - int status; - struct ctdb_db *ctdb_db; -}; - -static void attachdb_sync_done(int status, - struct ctdb_db *ctdb_db, void *private_data) -{ - struct ctdb_attachdb *atb = private_data; - atb->ctdb_db = ctdb_db; - atb->status = status; - atb->done = true; + return ret; } struct ctdb_db *ctdb_attachdb(struct ctdb_connection *ctdb, @@ -91,21 +79,16 @@ struct ctdb_db *ctdb_attachdb(struct ctdb_connection *ctdb, uint32_t tdb_flags) { struct ctdb_request *req; - struct ctdb_attachdb atb; - - atb.done = false; - req = ctdb_attachdb_send(ctdb, name, persistent, tdb_flags, - attachdb_sync_done, &atb); - if (!req) - return NULL; - - while (!atb.done) { - if (!ctdb_service_flush(ctdb)) { - ctdb_cancel(req); - return NULL; - } + bool done = false; + struct ctdb_db *ret = NULL; + + req = wait_for(ctdb, + ctdb_attachdb_send(ctdb, name, persistent, tdb_flags, + set, &done), + &done); + if (req != NULL) { + ret = ctdb_attachdb_recv(req); + ctdb_request_free(req); } - if (atb.status != 0) - return NULL; - return atb.ctdb_db; + return ret; } diff --git a/libctdb/tst.c b/libctdb/tst.c index 03ce5724..2b0aa14a 100644 --- a/libctdb/tst.c +++ b/libctdb/tst.c @@ -16,8 +16,13 @@ void msg_h(struct ctdb_connection *ctdb, uint64_t srvid, TDB_DATA data, void *pr printf("Message received on port %d : %s\n", (int)srvid, data.dptr); } -void pnn_cb(int32_t status, uint32_t pnn, void *private_data) +static void pnn_cb(struct ctdb_connection *ctdb, + struct ctdb_request *req, void *private) { + int status; + uint32_t pnn; + + status = ctdb_getpnn_recv(req, &pnn); if (status != 0) { printf("Error reading PNN\n"); return; @@ -25,8 +30,13 @@ void pnn_cb(int32_t status, uint32_t pnn, void *private_data) printf("status:%d pnn:%d\n", status, pnn); } -void rm_cb(int status, uint32_t rm, void *private_data) +static void rm_cb(struct ctdb_connection *ctdb, + struct ctdb_request *req, void *private) { + int status; + uint32_t rm; + + status = ctdb_getrecmaster_recv(req, &rm); if (status != 0) { printf("Error reading RECMASTER\n"); return; @@ -35,7 +45,6 @@ void rm_cb(int status, uint32_t rm, void *private_data) printf("GETRECMASTER ASYNC: status:%d recmaster:%d\n", status, rm); } - /* * example on how to first read(non-existing recortds are implicitely created * on demand) a record and change it in the callback. @@ -43,13 +52,17 @@ void rm_cb(int status, uint32_t rm, void *private_data) * * Pure read, or pure write are just special cases of this cycle. */ -void rrl_cb(int32_t status, struct ctdb_lock *lock, TDB_DATA outdata, void *private_data) +static void rrl_cb(struct ctdb_connection *ctdb, + struct ctdb_request *req, void *private) { + struct ctdb_lock *lock; + TDB_DATA outdata; TDB_DATA data; char tmp[256]; - if (status != 0) { - printf("rrl_cb returned error: status %d\n", status); + lock = ctdb_readrecordlock_recv(private, req, &outdata); + if (!lock) { + printf("rrl_cb returned error\n"); return; } @@ -72,9 +85,13 @@ void rrl_cb(int32_t status, struct ctdb_lock *lock, TDB_DATA outdata, void *priv } static bool registered = false; -void message_handler_cb(int status, void *private_data) +void message_handler_cb(struct ctdb_connection *ctdb, + struct ctdb_request *req, void *private) { - printf("Message handler registered: %i\n", status); + if (ctdb_set_message_handler_recv(ctdb, req) != 0) { + err(1, "registering message"); + } + printf("Message handler registered\n"); registered = true; } @@ -94,7 +111,8 @@ int main(int argc, char *argv[]) pfd.fd = ctdb_get_fd(ctdb_connection); - handle = ctdb_set_message_handler_send(ctdb_connection, 55, message_handler_cb, msg_h, NULL); + handle = ctdb_set_message_handler_send(ctdb_connection, 55, msg_h, + message_handler_cb, NULL); if (handle == NULL) { printf("Failed to register message port\n"); exit(10); -- 2.34.1