merge from ronnie
[tridge/ctdb.git] / server / ctdb_server.c
1 /* 
2    ctdb main protocol code
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/tdb/include/tdb.h"
22 #include "lib/events/events.h"
23 #include "lib/util/dlinklist.h"
24 #include "system/network.h"
25 #include "system/filesys.h"
26 #include "../include/ctdb_private.h"
27
28 /*
29   choose the transport we will use
30 */
31 int ctdb_set_transport(struct ctdb_context *ctdb, const char *transport)
32 {
33         ctdb->transport = talloc_strdup(ctdb, transport);
34         return 0;
35 }
36
37 /*
38   Check whether an ip is a valid node ip
39   Returns the node id for this ip address or -1
40 */
41 int ctdb_ip_to_nodeid(struct ctdb_context *ctdb, const char *nodeip)
42 {
43         int nodeid;
44
45         for (nodeid=0;nodeid<ctdb->num_nodes;nodeid++) {
46                 if (!strcmp(ctdb->nodes[nodeid]->address.address, nodeip)) {
47                         return nodeid;
48                 }
49         }
50
51         return -1;
52 }
53
54 /*
55   choose the recovery lock file
56 */
57 int ctdb_set_recovery_lock_file(struct ctdb_context *ctdb, const char *file)
58 {
59         ctdb->recovery_lock_file = talloc_strdup(ctdb, file);
60         return 0;
61 }
62
63 /*
64   set the directory for the local databases
65 */
66 int ctdb_set_tdb_dir(struct ctdb_context *ctdb, const char *dir)
67 {
68         ctdb->db_directory = talloc_strdup(ctdb, dir);
69         if (ctdb->db_directory == NULL) {
70                 return -1;
71         }
72         return 0;
73 }
74
75 /*
76   set the directory for the persistent databases
77 */
78 int ctdb_set_tdb_dir_persistent(struct ctdb_context *ctdb, const char *dir)
79 {
80         ctdb->db_directory_persistent = talloc_strdup(ctdb, dir);
81         if (ctdb->db_directory_persistent == NULL) {
82                 return -1;
83         }
84         return 0;
85 }
86
87 /*
88   add a node to the list of active nodes
89 */
90 static int ctdb_add_node(struct ctdb_context *ctdb, char *nstr)
91 {
92         struct ctdb_node *node, **nodep;
93
94         nodep = talloc_realloc(ctdb, ctdb->nodes, struct ctdb_node *, ctdb->num_nodes+1);
95         CTDB_NO_MEMORY(ctdb, nodep);
96
97         ctdb->nodes = nodep;
98         nodep = &ctdb->nodes[ctdb->num_nodes];
99         (*nodep) = talloc_zero(ctdb->nodes, struct ctdb_node);
100         CTDB_NO_MEMORY(ctdb, *nodep);
101         node = *nodep;
102
103         if (ctdb_parse_address(ctdb, node, nstr, &node->address) != 0) {
104                 return -1;
105         }
106         node->ctdb = ctdb;
107         node->name = talloc_asprintf(node, "%s:%u", 
108                                      node->address.address, 
109                                      node->address.port);
110         /* this assumes that the nodes are kept in sorted order, and no gaps */
111         node->pnn = ctdb->num_nodes;
112
113         /* nodes start out disconnected and unhealthy */
114         node->flags = (NODE_FLAGS_DISCONNECTED | NODE_FLAGS_UNHEALTHY);
115
116         if (ctdb->address.address &&
117             ctdb_same_address(&ctdb->address, &node->address)) {
118                 ctdb->pnn = node->pnn;
119                 node->flags &= ~NODE_FLAGS_DISCONNECTED;
120         }
121
122         ctdb->num_nodes++;
123         node->dead_count = 0;
124
125         return 0;
126 }
127
128 /*
129   setup the node list from a file
130 */
131 int ctdb_set_nlist(struct ctdb_context *ctdb, const char *nlist)
132 {
133         char **lines;
134         int nlines;
135         int i;
136
137         talloc_free(ctdb->node_list_file);
138         ctdb->node_list_file = talloc_strdup(ctdb, nlist);
139
140         lines = file_lines_load(nlist, &nlines, ctdb);
141         if (lines == NULL) {
142                 ctdb_set_error(ctdb, "Failed to load nlist '%s'\n", nlist);
143                 return -1;
144         }
145         while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) {
146                 nlines--;
147         }
148
149         for (i=0;i<nlines;i++) {
150                 if (ctdb_add_node(ctdb, lines[i]) != 0) {
151                         talloc_free(lines);
152                         return -1;
153                 }
154         }
155
156         /* initialize the vnn mapping table now that we have num_nodes setup */
157         ctdb->vnn_map = talloc(ctdb, struct ctdb_vnn_map);
158         CTDB_NO_MEMORY(ctdb, ctdb->vnn_map);
159
160         ctdb->vnn_map->generation = INVALID_GENERATION;
161         ctdb->vnn_map->size = ctdb->num_nodes;
162         ctdb->vnn_map->map = talloc_array(ctdb->vnn_map, uint32_t, ctdb->vnn_map->size);
163         CTDB_NO_MEMORY(ctdb, ctdb->vnn_map->map);
164
165         for(i=0;i<ctdb->vnn_map->size;i++) {
166                 ctdb->vnn_map->map[i] = i;
167         }
168         
169         talloc_free(lines);
170         return 0;
171 }
172
173
174 /*
175   setup the local node address
176 */
177 int ctdb_set_address(struct ctdb_context *ctdb, const char *address)
178 {
179         if (ctdb_parse_address(ctdb, ctdb, address, &ctdb->address) != 0) {
180                 return -1;
181         }
182         
183         ctdb->name = talloc_asprintf(ctdb, "%s:%u", 
184                                      ctdb->address.address, 
185                                      ctdb->address.port);
186         return 0;
187 }
188
189
190 /*
191   return the number of active nodes
192 */
193 uint32_t ctdb_get_num_active_nodes(struct ctdb_context *ctdb)
194 {
195         int i;
196         uint32_t count=0;
197         for (i=0;i<ctdb->vnn_map->size;i++) {
198                 struct ctdb_node *node = ctdb->nodes[ctdb->vnn_map->map[i]];
199                 if (!(node->flags & NODE_FLAGS_INACTIVE)) {
200                         count++;
201                 }
202         }
203         return count;
204 }
205
206
207 /*
208   called when we need to process a packet. This can be a requeued packet
209   after a lockwait, or a real packet from another node
210 */
211 void ctdb_input_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
212 {
213         TALLOC_CTX *tmp_ctx;
214
215         /* place the packet as a child of the tmp_ctx. We then use
216            talloc_free() below to free it. If any of the calls want
217            to keep it, then they will steal it somewhere else, and the
218            talloc_free() will only free the tmp_ctx */
219         tmp_ctx = talloc_new(ctdb);
220         talloc_steal(tmp_ctx, hdr);
221
222         DEBUG(DEBUG_DEBUG,(__location__ " ctdb request %u of type %u length %u from "
223                  "node %u to %u\n", hdr->reqid, hdr->operation, hdr->length,
224                  hdr->srcnode, hdr->destnode));
225
226         switch (hdr->operation) {
227         case CTDB_REQ_CALL:
228         case CTDB_REPLY_CALL:
229         case CTDB_REQ_DMASTER:
230         case CTDB_REPLY_DMASTER:
231                 /* we dont allow these calls when banned */
232                 if (ctdb->nodes[ctdb->pnn]->flags & NODE_FLAGS_BANNED) {
233                         DEBUG(DEBUG_DEBUG,(__location__ " ctdb operation %u"
234                                 " request %u"
235                                 " length %u from node %u to %u while node"
236                                 " is banned\n",
237                                  hdr->operation, hdr->reqid,
238                                  hdr->length, 
239                                  hdr->srcnode, hdr->destnode));
240                         goto done;
241                 }
242
243                 /* for ctdb_call inter-node operations verify that the
244                    remote node that sent us the call is running in the
245                    same generation instance as this node
246                 */
247                 if (ctdb->vnn_map->generation != hdr->generation) {
248                         DEBUG(DEBUG_DEBUG,(__location__ " ctdb operation %u"
249                                 " request %u"
250                                 " length %u from node %u to %u had an"
251                                 " invalid generation id:%u while our"
252                                 " generation id is:%u\n", 
253                                  hdr->operation, hdr->reqid,
254                                  hdr->length, 
255                                  hdr->srcnode, hdr->destnode, 
256                                  hdr->generation, ctdb->vnn_map->generation));
257                         goto done;
258                 }
259         }
260
261         switch (hdr->operation) {
262         case CTDB_REQ_CALL:
263                 ctdb->statistics.node.req_call++;
264                 ctdb_request_call(ctdb, hdr);
265                 break;
266
267         case CTDB_REPLY_CALL:
268                 ctdb->statistics.node.reply_call++;
269                 ctdb_reply_call(ctdb, hdr);
270                 break;
271
272         case CTDB_REPLY_ERROR:
273                 ctdb->statistics.node.reply_error++;
274                 ctdb_reply_error(ctdb, hdr);
275                 break;
276
277         case CTDB_REQ_DMASTER:
278                 ctdb->statistics.node.req_dmaster++;
279                 ctdb_request_dmaster(ctdb, hdr);
280                 break;
281
282         case CTDB_REPLY_DMASTER:
283                 ctdb->statistics.node.reply_dmaster++;
284                 ctdb_reply_dmaster(ctdb, hdr);
285                 break;
286
287         case CTDB_REQ_MESSAGE:
288                 ctdb->statistics.node.req_message++;
289                 ctdb_request_message(ctdb, hdr);
290                 break;
291
292         case CTDB_REQ_CONTROL:
293                 ctdb->statistics.node.req_control++;
294                 ctdb_request_control(ctdb, hdr);
295                 break;
296
297         case CTDB_REPLY_CONTROL:
298                 ctdb->statistics.node.reply_control++;
299                 ctdb_reply_control(ctdb, hdr);
300                 break;
301
302         case CTDB_REQ_KEEPALIVE:
303                 ctdb->statistics.keepalive_packets_recv++;
304                 break;
305
306         default:
307                 DEBUG(DEBUG_CRIT,("%s: Packet with unknown operation %u\n", 
308                          __location__, hdr->operation));
309                 break;
310         }
311
312 done:
313         talloc_free(tmp_ctx);
314 }
315
316
317 /*
318   called by the transport layer when a node is dead
319 */
320 void ctdb_node_dead(struct ctdb_node *node)
321 {
322         if (node->flags & NODE_FLAGS_DISCONNECTED) {
323                 DEBUG(DEBUG_INFO,("%s: node %s is already marked disconnected: %u connected\n", 
324                          node->ctdb->name, node->name, 
325                          node->ctdb->num_connected));
326                 return;
327         }
328         node->ctdb->num_connected--;
329         node->flags |= NODE_FLAGS_DISCONNECTED | NODE_FLAGS_UNHEALTHY;
330         node->rx_cnt = 0;
331         node->dead_count = 0;
332
333         DEBUG(DEBUG_NOTICE,("%s: node %s is dead: %u connected\n", 
334                  node->ctdb->name, node->name, node->ctdb->num_connected));
335         ctdb_daemon_cancel_controls(node->ctdb, node);
336
337         node->ctdb->methods->restart(node);
338 }
339
340 /*
341   called by the transport layer when a node is connected
342 */
343 void ctdb_node_connected(struct ctdb_node *node)
344 {
345         if (!(node->flags & NODE_FLAGS_DISCONNECTED)) {
346                 DEBUG(DEBUG_INFO,("%s: node %s is already marked connected: %u connected\n", 
347                          node->ctdb->name, node->name, 
348                          node->ctdb->num_connected));
349                 return;
350         }
351         node->ctdb->num_connected++;
352         node->dead_count = 0;
353         node->flags &= ~NODE_FLAGS_DISCONNECTED;
354         node->flags |= NODE_FLAGS_UNHEALTHY;
355         DEBUG(DEBUG_INFO,("%s: connected to %s - %u connected\n", 
356                  node->ctdb->name, node->name, node->ctdb->num_connected));
357 }
358
359 struct queue_next {
360         struct ctdb_context *ctdb;
361         struct ctdb_req_header *hdr;
362 };
363
364
365 /*
366   trigered when a deferred packet is due
367  */
368 static void queue_next_trigger(struct event_context *ev, struct timed_event *te, 
369                                struct timeval t, void *private_data)
370 {
371         struct queue_next *q = talloc_get_type(private_data, struct queue_next);
372         ctdb_input_pkt(q->ctdb, q->hdr);
373         talloc_free(q);
374 }       
375
376 /*
377   defer a packet, so it is processed on the next event loop
378   this is used for sending packets to ourselves
379  */
380 static void ctdb_defer_packet(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
381 {
382         struct queue_next *q;
383         q = talloc(ctdb, struct queue_next);
384         if (q == NULL) {
385                 DEBUG(DEBUG_ERR,(__location__ " Failed to allocate deferred packet\n"));
386                 return;
387         }
388         q->ctdb = ctdb;
389         q->hdr = talloc_memdup(ctdb, hdr, hdr->length);
390         if (q->hdr == NULL) {
391                 DEBUG(DEBUG_ERR,("Error copying deferred packet to self\n"));
392                 return;
393         }
394 #if 0
395         /* use this to put packets directly into our recv function */
396         ctdb_input_pkt(q->ctdb, q->hdr);
397 #else
398         event_add_timed(ctdb->ev, q, timeval_zero(), queue_next_trigger, q);
399 #endif
400 }
401
402
403 /*
404   broadcast a packet to all nodes
405 */
406 static void ctdb_broadcast_packet_all(struct ctdb_context *ctdb, 
407                                       struct ctdb_req_header *hdr)
408 {
409         int i;
410         for (i=0;i<ctdb->num_nodes;i++) {
411                 hdr->destnode = ctdb->nodes[i]->pnn;
412                 ctdb_queue_packet(ctdb, hdr);
413         }
414 }
415
416 /*
417   broadcast a packet to all nodes in the current vnnmap
418 */
419 static void ctdb_broadcast_packet_vnnmap(struct ctdb_context *ctdb, 
420                                          struct ctdb_req_header *hdr)
421 {
422         int i;
423         for (i=0;i<ctdb->vnn_map->size;i++) {
424                 hdr->destnode = ctdb->vnn_map->map[i];
425                 ctdb_queue_packet(ctdb, hdr);
426         }
427 }
428
429 /*
430   broadcast a packet to all connected nodes
431 */
432 static void ctdb_broadcast_packet_connected(struct ctdb_context *ctdb, 
433                                             struct ctdb_req_header *hdr)
434 {
435         int i;
436         for (i=0;i<ctdb->num_nodes;i++) {
437                 if (!(ctdb->nodes[i]->flags & NODE_FLAGS_DISCONNECTED)) {
438                         hdr->destnode = ctdb->nodes[i]->pnn;
439                         ctdb_queue_packet(ctdb, hdr);
440                 }
441         }
442 }
443
444 /*
445   queue a packet or die
446 */
447 void ctdb_queue_packet(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
448 {
449         struct ctdb_node *node;
450
451         switch (hdr->destnode) {
452         case CTDB_BROADCAST_ALL:
453                 ctdb_broadcast_packet_all(ctdb, hdr);
454                 return;
455         case CTDB_BROADCAST_VNNMAP:
456                 ctdb_broadcast_packet_vnnmap(ctdb, hdr);
457                 return;
458         case CTDB_BROADCAST_CONNECTED:
459                 ctdb_broadcast_packet_connected(ctdb, hdr);
460                 return;
461         }
462
463         ctdb->statistics.node_packets_sent++;
464
465         if (!ctdb_validate_pnn(ctdb, hdr->destnode)) {
466                 DEBUG(DEBUG_CRIT,(__location__ " cant send to node %u that does not exist\n", 
467                          hdr->destnode));
468                 return;
469         }
470
471         node = ctdb->nodes[hdr->destnode];
472
473         if (hdr->destnode == ctdb->pnn) {
474                 ctdb_defer_packet(ctdb, hdr);
475         } else {
476                 node->tx_cnt++;
477                 if (ctdb->methods->queue_pkt(node, (uint8_t *)hdr, hdr->length) != 0) {
478                         ctdb_fatal(ctdb, "Unable to queue packet\n");
479                 }
480         }
481 }
482
483