e4b03d9e9ef192e4fb51eb542faf0dd9f16b5b77
[rusty/ctdb.git] / libctdb / messages.c
1 #include "libctdb_private.h"
2 #include "messages.h"
3 #include "io_elem.h"
4 #include <ctdb.h>
5 #include <tdb.h>
6 #include <ctdb_protocol.h>
7 #include <stdlib.h>
8 #include <string.h>
9
10 /* Remove type-safety macros. */
11 #undef ctdb_set_message_handler_send
12 #undef ctdb_set_message_handler_recv
13 #undef ctdb_remove_message_handler_send
14
15 struct message_handler_info {
16         struct message_handler_info *next, *prev;
17
18         uint64_t srvid;
19         ctdb_message_fn_t handler;
20         void *private_data;
21 };
22
23 void deliver_message(struct ctdb_connection *ctdb, struct ctdb_req_header *hdr)
24 {
25         struct message_handler_info *i;
26         struct ctdb_req_message *msg = (struct ctdb_req_message *)hdr;
27         TDB_DATA data;
28         bool found;
29
30         data.dptr = msg->data;
31         data.dsize = msg->datalen;
32
33         for (i = ctdb->message_handlers; i; i = i->next) {
34                 if (i->srvid == msg->srvid) {
35                         i->handler(ctdb, msg->srvid, data, i->private_data);
36                         found = true;
37                 }
38         }
39         if (!found) {
40                 DEBUG(ctdb, LOG_WARNING,
41                       "ctdb_service: messsage for unregistered srvid %llu",
42                       msg->srvid);
43         }
44 }
45
46 int ctdb_set_message_handler_recv(struct ctdb_connection *ctdb,
47                                   struct ctdb_request *req)
48 {
49         struct message_handler_info *info = req->extra;
50         struct ctdb_reply_control *reply;
51
52         reply = unpack_reply_control(ctdb, req, CTDB_CONTROL_REGISTER_SRVID);
53         if (!reply) {
54                 return -1;
55         }
56         if (reply->status != 0) {
57                 DEBUG(ctdb, LOG_WARNING,
58                       "ctdb_set_message_handler_recv: status %i",
59                       reply->status);
60                 return -1;
61         }
62
63         /* Put ourselves in list of handlers. */
64         DLIST_ADD(ctdb->message_handlers, info);
65         /* Keep safe from destructor */
66         req->extra = NULL;
67         return 0;
68 }
69
70 static void free_info(struct ctdb_connection *ctdb, struct ctdb_request *req)
71 {
72         free(req->extra);
73 }
74
75 struct ctdb_request *
76 ctdb_set_message_handler_send(struct ctdb_connection *ctdb, uint64_t srvid,
77                               ctdb_message_fn_t handler,
78                               ctdb_callback_t callback, void *private_data)
79 {
80         struct message_handler_info *info;
81         struct ctdb_request *req;
82
83         info = malloc(sizeof(*info));
84         if (!info) {
85                 DEBUG(ctdb, LOG_ERR,
86                       "ctdb_set_message_handler_send: allocating info");
87                 return NULL;
88         }
89
90         req = new_ctdb_control_request(ctdb, CTDB_CONTROL_REGISTER_SRVID,
91                                        CTDB_CURRENT_NODE, NULL, 0,
92                                        callback, private_data);
93         if (!req) {
94                 DEBUG(ctdb, LOG_ERR,
95                       "ctdb_set_message_handler_send: allocating request");
96                 free(info);
97                 return NULL;
98         }
99         req->extra = info;
100         req->extra_destructor = free_info;
101         req->hdr.control->srvid = srvid;
102
103         info->srvid = srvid;
104         info->handler = handler;
105         info->private_data = private_data;
106
107         DEBUG(ctdb, LOG_DEBUG,
108               "ctdb_set_message_handler_send: sending request %u for id %llu",
109               req->hdr.hdr->reqid, srvid);
110         return req;
111 }
112
113 int ctdb_send_message(struct ctdb_connection *ctdb,
114                       uint32_t pnn, uint64_t srvid,
115                       TDB_DATA data)
116 {
117         struct ctdb_request *req;
118         struct ctdb_req_message *pkt;
119
120         /* We just discard it once it's finished: no reply. */
121         req = new_ctdb_request(offsetof(struct ctdb_req_message, data) + data.dsize,
122                                ctdb_cancel_callback, NULL);
123         if (!req) {
124                 DEBUG(ctdb, LOG_ERR, "ctdb_set_message: allocating message");
125                 return -1;
126         }
127
128         io_elem_init_req_header(req->io,
129                                 CTDB_REQ_MESSAGE, pnn, new_reqid(ctdb));
130
131         pkt = req->hdr.message;
132         pkt->srvid = srvid;
133         pkt->datalen = data.dsize;
134         memcpy(pkt->data, data.dptr, data.dsize);
135         DLIST_ADD_END(ctdb->outq, req, struct ctdb_request);
136         return 0;
137 }