LibCTDB
[metze/ctdb/wip.git] / libctdb / messages.c
index 6ec35415d3499bcff74f5dd0ba34e0831821ac35..f28ac4a61bee257eaffea05ca00b9651ffac0454 100644 (file)
@@ -6,16 +6,19 @@
 #include <ctdb_protocol.h>
 #include <stdlib.h>
 #include <string.h>
+#include <errno.h>
+
+/* Remove type-safety macros. */
+#undef ctdb_set_message_handler_send
+#undef ctdb_set_message_handler_recv
+#undef ctdb_remove_message_handler_send
 
 struct message_handler_info {
        struct message_handler_info *next, *prev;
-       /* Callback when we're first registered. */
-       ctdb_set_message_handler_cb callback;
 
        uint64_t srvid;
        ctdb_message_fn_t handler;
-       void *private_data;
-       struct ctdb_connection *ctdb;
+       void *handler_data;
 };
 
 void deliver_message(struct ctdb_connection *ctdb, struct ctdb_req_header *hdr)
@@ -23,88 +26,189 @@ void deliver_message(struct ctdb_connection *ctdb, struct ctdb_req_header *hdr)
        struct message_handler_info *i;
        struct ctdb_req_message *msg = (struct ctdb_req_message *)hdr;
        TDB_DATA data;
+       bool found;
 
        data.dptr = msg->data;
        data.dsize = msg->datalen;
 
+       /* Note: we want to call *every* handler: there may be more than one */
        for (i = ctdb->message_handlers; i; i = i->next) {
                if (i->srvid == msg->srvid) {
-                       i->handler(ctdb, msg->srvid, data, i->private_data);
+                       i->handler(ctdb, msg->srvid, data, i->handler_data);
+                       found = true;
                }
        }
-       /* FIXME: Report unknown messages */
+       if (!found) {
+               DEBUG(ctdb, LOG_WARNING,
+                     "ctdb_service: messsage for unregistered srvid %llu",
+                     msg->srvid);
+       }
 }
 
-static void set_message_handler(int status, struct message_handler_info *info)
+void remove_message_handlers(struct ctdb_connection *ctdb)
 {
-       /* If registration failed, tell callback and clean up */
-       if (status < 0) {
-               info->callback(status, info->private_data);
-               free(info);
-               return;
-       } else {
-               /* Put ourselves in list of handlers. */
-               DLIST_ADD_END(info->ctdb->message_handlers, info,
-                             struct message_handler_info);
-               /* Now call callback: it could remove us in theory. */
-               info->callback(status, info->private_data);
+       struct message_handler_info *i;
+
+       /* ctdbd should unregister automatically when we close fd, so we don't
+          need to do that here. */
+       while ((i = ctdb->message_handlers) != NULL) {
+               DLIST_REMOVE(ctdb->message_handlers, i);
+               free(i);
+       }
+}
+
+bool ctdb_set_message_handler_recv(struct ctdb_connection *ctdb,
+                                  struct ctdb_request *req)
+{
+       struct message_handler_info *info = req->extra;
+       struct ctdb_reply_control *reply;
+
+       reply = unpack_reply_control(ctdb, req, CTDB_CONTROL_REGISTER_SRVID);
+       if (!reply) {
+               return false;
+       }
+       if (reply->status != 0) {
+               DEBUG(ctdb, LOG_ERR,
+                     "ctdb_set_message_handler_recv: status %i",
+                     reply->status);
+               return false;
        }
+
+       /* Put ourselves in list of handlers. */
+       DLIST_ADD(ctdb->message_handlers, info);
+       /* Keep safe from destructor */
+       req->extra = NULL;
+       return true;
+}
+
+static void free_info(struct ctdb_connection *ctdb, struct ctdb_request *req)
+{
+       free(req->extra);
 }
 
 struct ctdb_request *
 ctdb_set_message_handler_send(struct ctdb_connection *ctdb, uint64_t srvid,
-                             ctdb_set_message_handler_cb callback,
-                             ctdb_message_fn_t handler, void *private_data)
+                             ctdb_message_fn_t handler, void *handler_data,
+                             ctdb_callback_t callback, void *private_data)
 {
-       struct ctdb_request *req;
        struct message_handler_info *info;
+       struct ctdb_request *req;
 
        info = malloc(sizeof(*info));
        if (!info) {
+               DEBUG(ctdb, LOG_ERR,
+                     "ctdb_set_message_handler_send: allocating info");
                return NULL;
        }
+
        req = new_ctdb_control_request(ctdb, CTDB_CONTROL_REGISTER_SRVID,
-                                      CTDB_CURRENT_NODE, NULL, 0);
+                                      CTDB_CURRENT_NODE, NULL, 0,
+                                      callback, private_data);
        if (!req) {
+               DEBUG(ctdb, LOG_ERR,
+                     "ctdb_set_message_handler_send: allocating request");
                free(info);
                return NULL;
        }
+       req->extra = info;
+       req->extra_destructor = free_info;
        req->hdr.control->srvid = srvid;
 
        info->srvid = srvid;
        info->handler = handler;
-       info->callback = callback;
-       info->private_data = private_data;
-       info->ctdb = ctdb;
+       info->handler_data = handler_data;
+
+       DEBUG(ctdb, LOG_DEBUG,
+             "ctdb_set_message_handler_send: sending request %u for id %llx",
+             req->hdr.hdr->reqid, srvid);
+       return req;
+}
+
+struct ctdb_request *
+ctdb_remove_message_handler_send(struct ctdb_connection *ctdb, uint64_t srvid,
+                                ctdb_message_fn_t handler, void *hdata,
+                                ctdb_callback_t callback, void *cbdata)
+{
+       struct message_handler_info *i;
+       struct ctdb_request *req;
 
-       req->callback.register_srvid = set_message_handler;
-       req->priv_data = info;
+       for (i = ctdb->message_handlers; i; i = i->next) {
+               if (i->srvid == srvid
+                   && i->handler == handler && i->handler_data == hdata) {
+                       break;
+               }
+       }
+       if (!i) {
+               DEBUG(ctdb, LOG_ALERT,
+                     "ctdb_remove_message_handler_send: no such handler");
+               errno = ENOENT;
+               return NULL;
+       }
+
+       req = new_ctdb_control_request(ctdb, CTDB_CONTROL_DEREGISTER_SRVID,
+                                      CTDB_CURRENT_NODE, NULL, 0,
+                                      callback, cbdata);
+       if (!req) {
+               DEBUG(ctdb, LOG_ERR,
+                     "ctdb_remove_message_handler_send: allocating request");
+               return NULL;
+       }
+       req->hdr.control->srvid = srvid;
+       req->extra = i;
 
+       DEBUG(ctdb, LOG_DEBUG,
+             "ctdb_set_remove_handler_send: sending request %u for id %llu",
+             req->hdr.hdr->reqid, srvid);
        return req;
 }
 
-int ctdb_send_message(struct ctdb_connection *ctdb,
+bool ctdb_remove_message_handler_recv(struct ctdb_connection *ctdb,
+                                     struct ctdb_request *req)
+{
+       struct message_handler_info *handler = req->extra;
+       struct ctdb_reply_control *reply;
+
+       reply = unpack_reply_control(ctdb, req, CTDB_CONTROL_DEREGISTER_SRVID);
+       if (!reply) {
+               return false;
+       }
+       if (reply->status != 0) {
+               DEBUG(ctdb, LOG_ERR,
+                     "ctdb_remove_message_handler_recv: status %i",
+                     reply->status);
+               return false;
+       }
+
+       /* Remove ourselves from list of handlers. */
+       DLIST_REMOVE(ctdb->message_handlers, handler);
+       free(handler);
+       /* Crash if they call this again! */
+       req->extra = NULL;
+       return true;
+}
+
+bool ctdb_send_message(struct ctdb_connection *ctdb,
                      uint32_t pnn, uint64_t srvid,
                      TDB_DATA data)
 {
        struct ctdb_request *req;
        struct ctdb_req_message *pkt;
 
-       req = new_ctdb_request(sizeof(*pkt) + data.dsize);
+       /* We just discard it once it's finished: no reply. */
+       req = new_ctdb_request(offsetof(struct ctdb_req_message, data) + data.dsize,
+                              ctdb_cancel_callback, NULL);
        if (!req) {
-               return -1;
+               DEBUG(ctdb, LOG_ERR, "ctdb_set_message: allocating message");
+               return false;
        }
 
-       io_elem_init_req_header(req->io,
+       io_elem_init_req_header(req->pdu,
                                CTDB_REQ_MESSAGE, pnn, new_reqid(ctdb));
 
-       /* There's no reply to this, so we mark it cancelled immediately. */
-       req->cancelled = true;
-
        pkt = req->hdr.message;
        pkt->srvid = srvid;
        pkt->datalen = data.dsize;
        memcpy(pkt->data, data.dptr, data.dsize);
        DLIST_ADD_END(ctdb->outq, req, struct ctdb_request);
-       return 0;
+       return true;
 }