common/messaging: Abstract db related operations inside db functions
[ctdb.git] / common / ctdb_message.c
1 /* 
2    ctdb_message protocol code
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Amitay Isaacs  2013
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20 /*
21   see http://wiki.samba.org/index.php/Samba_%26_Clustering for
22   protocol design and packet details
23 */
24 #include "includes.h"
25 #include "lib/tevent/tevent.h"
26 #include "lib/tdb/include/tdb.h"
27 #include "system/network.h"
28 #include "system/filesys.h"
29 #include "../include/ctdb_private.h"
30 #include "lib/util/dlinklist.h"
31
32 static int message_list_db_init(struct ctdb_context *ctdb)
33 {
34         ctdb->message_list_indexdb = tdb_open("messagedb", 8192,
35                                               TDB_INTERNAL|TDB_DISALLOW_NESTING,
36                                               O_RDWR|O_CREAT, 0);
37         if (ctdb->message_list_indexdb == NULL) {
38                 DEBUG(DEBUG_ERR, ("Failed to create message list indexdb\n"));
39                 return -1;
40         }
41
42         return 0;
43 }
44
45 static int message_list_db_add(struct ctdb_context *ctdb, uint64_t srvid,
46                                struct ctdb_message_list_header *h)
47 {
48         int ret;
49         TDB_DATA key, data;
50
51         if (ctdb->message_list_indexdb == NULL) {
52                 ret = message_list_db_init(ctdb);
53                 if (ret < 0) {
54                         return -1;
55                 }
56         }
57
58         key.dptr = (uint8_t *)&srvid;
59         key.dsize = sizeof(uint64_t);
60
61         data.dptr = (uint8_t *)&h;
62         data.dsize = sizeof(struct ctdb_message_list_header *);
63
64         ret = tdb_store(ctdb->message_list_indexdb, key, data, TDB_INSERT);
65         if (ret < 0) {
66                 DEBUG(DEBUG_ERR, ("Failed to add message list handler (%s)\n",
67                                   tdb_errorstr(ctdb->message_list_indexdb)));
68                 return -1;
69         }
70
71         return 0;
72 }
73
74 static int message_list_db_delete(struct ctdb_context *ctdb, uint64_t srvid)
75 {
76         int ret;
77         TDB_DATA key;
78
79         if (ctdb->message_list_indexdb == NULL) {
80                 return -1;
81         }
82
83         key.dptr = (uint8_t *)&srvid;
84         key.dsize = sizeof(uint64_t);
85
86         ret = tdb_delete(ctdb->message_list_indexdb, key);
87         if (ret < 0) {
88                 DEBUG(DEBUG_ERR, ("Failed to delete message list handler (%s)\n",
89                                   tdb_errorstr(ctdb->message_list_indexdb)));
90                 return -1;
91         }
92
93         return 0;
94 }
95
96 static int message_list_db_fetch(struct ctdb_context *ctdb, uint64_t srvid,
97                                  struct ctdb_message_list_header **h)
98 {
99         TDB_DATA key, data;
100
101         if (ctdb->message_list_indexdb == NULL) {
102                 return -1;
103         }
104
105         key.dptr = (uint8_t *)&srvid;
106         key.dsize = sizeof(uint64_t);
107
108         data = tdb_fetch(ctdb->message_list_indexdb, key);
109         if (data.dsize != sizeof(struct ctdb_message_list_header *)) {
110                 talloc_free(data.dptr);
111                 return -1;
112         }
113
114         *h = *(struct ctdb_message_list_header **)data.dptr;
115         talloc_free(data.dptr);
116
117         return 0;
118 }
119
120 /*
121   this dispatches the messages to the registered ctdb message handler
122 */
123 int ctdb_dispatch_message(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data)
124 {
125         struct ctdb_message_list_header *h;
126         struct ctdb_message_list *m;
127         uint64_t srvid_all = CTDB_SRVID_ALL;
128         int ret;
129
130         ret = message_list_db_fetch(ctdb, srvid, &h);
131         if (ret == 0) {
132                 for (m=h->m; m; m=m->next) {
133                         m->message_handler(ctdb, srvid, data, m->message_private);
134                 }
135         }
136
137         ret = message_list_db_fetch(ctdb, srvid_all, &h);
138         if (ret == 0) {
139                 for(m=h->m; m; m=m->next) {
140                         m->message_handler(ctdb, srvid, data, m->message_private);
141                 }
142         }
143
144         return 0;
145 }
146
147 /*
148   called when a CTDB_REQ_MESSAGE packet comes in
149 */
150 void ctdb_request_message(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
151 {
152         struct ctdb_req_message *c = (struct ctdb_req_message *)hdr;
153         TDB_DATA data;
154
155         data.dsize = c->datalen;
156         data.dptr = talloc_memdup(c, &c->data[0], c->datalen);
157
158         ctdb_dispatch_message(ctdb, c->srvid, data);
159 }
160
161 /*
162  * When header is freed, remove all the srvid handlers
163  */
164 static int message_header_destructor(struct ctdb_message_list_header *h)
165 {
166         struct ctdb_message_list *m;
167
168         while (h->m != NULL) {
169                 m = h->m;
170                 DLIST_REMOVE(h->m, m);
171                 TALLOC_FREE(m);
172         }
173
174         message_list_db_delete(h->ctdb, h->srvid);
175         DLIST_REMOVE(h->ctdb->message_list_header, h);
176
177         return 0;
178 }
179
180 /*
181   when a client goes away, we need to remove its srvid handler from the list
182  */
183 static int message_handler_destructor(struct ctdb_message_list *m)
184 {
185         struct ctdb_message_list_header *h = m->h;
186
187         DLIST_REMOVE(h->m, m);
188         if (h->m == NULL) {
189                 talloc_free(h);
190         }
191         return 0;
192 }
193
194 /*
195   setup handler for receipt of ctdb messages from ctdb_send_message()
196 */
197 int ctdb_register_message_handler(struct ctdb_context *ctdb, 
198                                   TALLOC_CTX *mem_ctx,
199                                   uint64_t srvid,
200                                   ctdb_msg_fn_t handler,
201                                   void *private_data)
202 {
203         struct ctdb_message_list_header *h;
204         struct ctdb_message_list *m;
205         int ret;
206
207         m = talloc_zero(mem_ctx, struct ctdb_message_list);
208         CTDB_NO_MEMORY(ctdb, m);
209
210         m->message_handler = handler;
211         m->message_private = private_data;
212
213         ret = message_list_db_fetch(ctdb, srvid, &h);
214         if (ret != 0) {
215                 /* srvid not registered yet */
216                 h = talloc_zero(ctdb, struct ctdb_message_list_header);
217                 CTDB_NO_MEMORY(ctdb, h);
218
219                 h->ctdb = ctdb;
220                 h->srvid = srvid;
221
222                 ret = message_list_db_add(ctdb, srvid, h);
223                 if (ret < 0) {
224                         talloc_free(m);
225                         talloc_free(h);
226                         return -1;
227                 }
228
229                 DLIST_ADD(ctdb->message_list_header, h);
230                 talloc_set_destructor(h, message_header_destructor);
231         }
232
233         m->h = h;
234         DLIST_ADD(h->m, m);
235         talloc_set_destructor(m, message_handler_destructor);
236         return 0;
237 }
238
239
240 /*
241   setup handler for receipt of ctdb messages from ctdb_send_message()
242 */
243 int ctdb_deregister_message_handler(struct ctdb_context *ctdb, uint64_t srvid, void *private_data)
244 {
245         struct ctdb_message_list_header *h;
246         struct ctdb_message_list *m;
247         int ret;
248
249         ret = message_list_db_fetch(ctdb, srvid, &h);
250         if (ret != 0) {
251                 return -1;
252         }
253
254         for (m=h->m; m; m=m->next) {
255                 if (m->message_private == private_data) {
256                         talloc_free(m);
257                         return 0;
258                 }
259         }
260
261         return -1;
262 }
263
264
265 /*
266  * check if the given srvid exists
267  */
268 bool ctdb_check_message_handler(struct ctdb_context *ctdb, uint64_t srvid)
269 {
270         struct ctdb_message_list_header *h;
271         int ret;
272
273         ret = message_list_db_fetch(ctdb, srvid, &h);
274         if (ret != 0 || h->m == NULL) {
275                 return false;
276         }
277
278         return true;
279 }