864ec3e7ea668f9adf80d80bb51a47d4a4dac94d
[rusty/ctdb.git] / libctdb / messages.c
1 #include "libctdb_private.h"
2 #include "messages.h"
3 #include "io_elem.h"
4 #include <ctdb.h>
5 #include <tdb.h>
6 #include <ctdb_protocol.h>
7 #include <stdlib.h>
8 #include <string.h>
9
10 /* Remove type-safety macros. */
11 #undef ctdb_set_message_handler_send
12 #undef ctdb_set_message_handler_recv
13 #undef ctdb_remove_message_handler_send
14
15 struct message_handler_info {
16         struct message_handler_info *next, *prev;
17
18         uint64_t srvid;
19         ctdb_message_fn_t handler;
20         void *private_data;
21 };
22
23 void deliver_message(struct ctdb_connection *ctdb, struct ctdb_req_header *hdr)
24 {
25         struct message_handler_info *i;
26         struct ctdb_req_message *msg = (struct ctdb_req_message *)hdr;
27         TDB_DATA data;
28
29         data.dptr = msg->data;
30         data.dsize = msg->datalen;
31
32         for (i = ctdb->message_handlers; i; i = i->next) {
33                 if (i->srvid == msg->srvid) {
34                         i->handler(ctdb, msg->srvid, data, i->private_data);
35                 }
36         }
37         /* FIXME: Report unknown messages */
38 }
39
40 int ctdb_set_message_handler_recv(struct ctdb_connection *ctdb,
41                                   struct ctdb_request *req)
42 {
43         struct message_handler_info *info = req->extra;
44         struct ctdb_reply_control *reply;
45
46         reply = unpack_reply_control(req, CTDB_CONTROL_REGISTER_SRVID);
47         if (!reply || reply->status != 0) {
48                 return -1;
49         }
50
51         /* Put ourselves in list of handlers. */
52         DLIST_ADD_END(ctdb->message_handlers, info,
53                       struct message_handler_info);
54         /* Keep safe from destructor */
55         req->extra = NULL;
56         return 0;
57 }
58
59 static void free_info(struct ctdb_request *req)
60 {
61         free(req->extra);
62 }
63
64 struct ctdb_request *
65 ctdb_set_message_handler_send(struct ctdb_connection *ctdb, uint64_t srvid,
66                               ctdb_message_fn_t handler,
67                               ctdb_callback_t callback, void *private_data)
68 {
69         struct message_handler_info *info;
70         struct ctdb_request *req;
71
72         info = malloc(sizeof(*info));
73         if (!info) {
74                 return NULL;
75         }
76
77         req = new_ctdb_control_request(ctdb, CTDB_CONTROL_REGISTER_SRVID,
78                                        CTDB_CURRENT_NODE, NULL, 0,
79                                        callback, private_data);
80         if (!req) {
81                 free(info);
82                 return NULL;
83         }
84         req->extra = info;
85         req->extra_destructor = free_info;
86         req->hdr.control->srvid = srvid;
87
88         info->srvid = srvid;
89         info->handler = handler;
90         info->private_data = private_data;
91
92         return req;
93 }
94
95 int ctdb_send_message(struct ctdb_connection *ctdb,
96                       uint32_t pnn, uint64_t srvid,
97                       TDB_DATA data)
98 {
99         struct ctdb_request *req;
100         struct ctdb_req_message *pkt;
101
102         /* We just discard it once it's finished: no reply. */
103         req = new_ctdb_request(sizeof(*pkt) + data.dsize,
104                                ctdb_cancel_callback, NULL);
105         if (!req) {
106                 return -1;
107         }
108
109         io_elem_init_req_header(req->io,
110                                 CTDB_REQ_MESSAGE, pnn, new_reqid(ctdb));
111
112         pkt = req->hdr.message;
113         pkt->srvid = srvid;
114         pkt->datalen = data.dsize;
115         memcpy(pkt->data, data.dptr, data.dsize);
116         DLIST_ADD_END(ctdb->outq, req, struct ctdb_request);
117         return 0;
118 }