#include <ctdb_protocol.h>
#include <stdlib.h>
#include <string.h>
+#include <errno.h>
+
+/* Remove type-safety macros. */
+#undef ctdb_set_message_handler_send
+#undef ctdb_set_message_handler_recv
+#undef ctdb_remove_message_handler_send
struct message_handler_info {
struct message_handler_info *next, *prev;
- /* Callback when we're first registered. */
- ctdb_set_message_handler_cb callback;
uint64_t srvid;
ctdb_message_fn_t handler;
- void *private_data;
- struct ctdb_connection *ctdb;
+ void *handler_data;
};
void deliver_message(struct ctdb_connection *ctdb, struct ctdb_req_header *hdr)
struct message_handler_info *i;
struct ctdb_req_message *msg = (struct ctdb_req_message *)hdr;
TDB_DATA data;
+ bool found;
data.dptr = msg->data;
data.dsize = msg->datalen;
+ /* Note: we want to call *every* handler: there may be more than one */
for (i = ctdb->message_handlers; i; i = i->next) {
if (i->srvid == msg->srvid) {
- i->handler(ctdb, msg->srvid, data, i->private_data);
+ i->handler(ctdb, msg->srvid, data, i->handler_data);
+ found = true;
}
}
- /* FIXME: Report unknown messages */
+ if (!found) {
+ DEBUG(ctdb, LOG_WARNING,
+ "ctdb_service: messsage for unregistered srvid %llu",
+ msg->srvid);
+ }
}
-static void set_message_handler(int status, struct message_handler_info *info)
+void remove_message_handlers(struct ctdb_connection *ctdb)
{
- /* If registration failed, tell callback and clean up */
- if (status < 0) {
- info->callback(status, info->private_data);
- free(info);
- return;
- } else {
- /* Put ourselves in list of handlers. */
- DLIST_ADD_END(info->ctdb->message_handlers, info,
- struct message_handler_info);
- /* Now call callback: it could remove us in theory. */
- info->callback(status, info->private_data);
+ struct message_handler_info *i;
+
+ /* ctdbd should unregister automatically when we close fd, so we don't
+ need to do that here. */
+ while ((i = ctdb->message_handlers) != NULL) {
+ DLIST_REMOVE(ctdb->message_handlers, i);
+ free(i);
+ }
+}
+
+bool ctdb_set_message_handler_recv(struct ctdb_connection *ctdb,
+ struct ctdb_request *req)
+{
+ struct message_handler_info *info = req->extra;
+ struct ctdb_reply_control *reply;
+
+ reply = unpack_reply_control(ctdb, req, CTDB_CONTROL_REGISTER_SRVID);
+ if (!reply) {
+ return false;
+ }
+ if (reply->status != 0) {
+ DEBUG(ctdb, LOG_ERR,
+ "ctdb_set_message_handler_recv: status %i",
+ reply->status);
+ return false;
}
+
+ /* Put ourselves in list of handlers. */
+ DLIST_ADD(ctdb->message_handlers, info);
+ /* Keep safe from destructor */
+ req->extra = NULL;
+ return true;
+}
+
+static void free_info(struct ctdb_connection *ctdb, struct ctdb_request *req)
+{
+ free(req->extra);
}
struct ctdb_request *
ctdb_set_message_handler_send(struct ctdb_connection *ctdb, uint64_t srvid,
- ctdb_set_message_handler_cb callback,
- ctdb_message_fn_t handler, void *private_data)
+ ctdb_message_fn_t handler, void *handler_data,
+ ctdb_callback_t callback, void *private_data)
{
- struct ctdb_request *req;
struct message_handler_info *info;
+ struct ctdb_request *req;
info = malloc(sizeof(*info));
if (!info) {
+ DEBUG(ctdb, LOG_ERR,
+ "ctdb_set_message_handler_send: allocating info");
return NULL;
}
+
req = new_ctdb_control_request(ctdb, CTDB_CONTROL_REGISTER_SRVID,
- CTDB_CURRENT_NODE, NULL, 0);
+ CTDB_CURRENT_NODE, NULL, 0,
+ callback, private_data);
if (!req) {
+ DEBUG(ctdb, LOG_ERR,
+ "ctdb_set_message_handler_send: allocating request");
free(info);
return NULL;
}
+ req->extra = info;
+ req->extra_destructor = free_info;
req->hdr.control->srvid = srvid;
info->srvid = srvid;
info->handler = handler;
- info->callback = callback;
- info->private_data = private_data;
- info->ctdb = ctdb;
+ info->handler_data = handler_data;
+
+ DEBUG(ctdb, LOG_DEBUG,
+ "ctdb_set_message_handler_send: sending request %u for id %llx",
+ req->hdr.hdr->reqid, srvid);
+ return req;
+}
+
+struct ctdb_request *
+ctdb_remove_message_handler_send(struct ctdb_connection *ctdb, uint64_t srvid,
+ ctdb_message_fn_t handler, void *hdata,
+ ctdb_callback_t callback, void *cbdata)
+{
+ struct message_handler_info *i;
+ struct ctdb_request *req;
- req->callback.register_srvid = set_message_handler;
- req->priv_data = info;
+ for (i = ctdb->message_handlers; i; i = i->next) {
+ if (i->srvid == srvid
+ && i->handler == handler && i->handler_data == hdata) {
+ break;
+ }
+ }
+ if (!i) {
+ DEBUG(ctdb, LOG_ALERT,
+ "ctdb_remove_message_handler_send: no such handler");
+ errno = ENOENT;
+ return NULL;
+ }
+
+ req = new_ctdb_control_request(ctdb, CTDB_CONTROL_DEREGISTER_SRVID,
+ CTDB_CURRENT_NODE, NULL, 0,
+ callback, cbdata);
+ if (!req) {
+ DEBUG(ctdb, LOG_ERR,
+ "ctdb_remove_message_handler_send: allocating request");
+ return NULL;
+ }
+ req->hdr.control->srvid = srvid;
+ req->extra = i;
+ DEBUG(ctdb, LOG_DEBUG,
+ "ctdb_set_remove_handler_send: sending request %u for id %llu",
+ req->hdr.hdr->reqid, srvid);
return req;
}
-int ctdb_send_message(struct ctdb_connection *ctdb,
+bool ctdb_remove_message_handler_recv(struct ctdb_connection *ctdb,
+ struct ctdb_request *req)
+{
+ struct message_handler_info *handler = req->extra;
+ struct ctdb_reply_control *reply;
+
+ reply = unpack_reply_control(ctdb, req, CTDB_CONTROL_DEREGISTER_SRVID);
+ if (!reply) {
+ return false;
+ }
+ if (reply->status != 0) {
+ DEBUG(ctdb, LOG_ERR,
+ "ctdb_remove_message_handler_recv: status %i",
+ reply->status);
+ return false;
+ }
+
+ /* Remove ourselves from list of handlers. */
+ DLIST_REMOVE(ctdb->message_handlers, handler);
+ free(handler);
+ /* Crash if they call this again! */
+ req->extra = NULL;
+ return true;
+}
+
+bool ctdb_send_message(struct ctdb_connection *ctdb,
uint32_t pnn, uint64_t srvid,
TDB_DATA data)
{
struct ctdb_request *req;
struct ctdb_req_message *pkt;
- req = new_ctdb_request(sizeof(*pkt) + data.dsize);
+ /* We just discard it once it's finished: no reply. */
+ req = new_ctdb_request(offsetof(struct ctdb_req_message, data) + data.dsize,
+ ctdb_cancel_callback, NULL);
if (!req) {
- return -1;
+ DEBUG(ctdb, LOG_ERR, "ctdb_set_message: allocating message");
+ return false;
}
- io_elem_init_req_header(req->io,
+ io_elem_init_req_header(req->pdu,
CTDB_REQ_MESSAGE, pnn, new_reqid(ctdb));
- /* There's no reply to this, so we mark it cancelled immediately. */
- req->cancelled = true;
-
pkt = req->hdr.message;
pkt->srvid = srvid;
pkt->datalen = data.dsize;
memcpy(pkt->data, data.dptr, data.dsize);
DLIST_ADD_END(ctdb->outq, req, struct ctdb_request);
- return 0;
+ return true;
}