2 ctdb main protocol code
4 Copyright (C) Andrew Tridgell 2006
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 2 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, write to the Free Software
18 Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22 #include "lib/tdb/include/tdb.h"
23 #include "lib/events/events.h"
24 #include "lib/util/dlinklist.h"
25 #include "system/network.h"
26 #include "system/filesys.h"
27 #include "../include/ctdb_private.h"
30 choose the transport we will use
/*
  Store a copy of the transport name in ctdb->transport.

  ctdb:      context that receives the copy; it is also passed to
             talloc_strdup() as the talloc parent of the new string
  transport: name of the transport to use

  NOTE(review): the result of talloc_strdup() is not checked on the lines
  visible here, and the value returned to the caller is not visible in
  this view - confirm against the full file.
*/
32 int ctdb_set_transport(struct ctdb_context *ctdb, const char *transport)
34 ctdb->transport = talloc_strdup(ctdb, transport);
39 choose the recovery lock file
/*
  Store a copy of the recovery lock file path in ctdb->recovery_lock_file.

  ctdb: context that receives the copy; it is also passed to
        talloc_strdup() as the talloc parent of the new string
  file: path of the recovery lock file

  NOTE(review): the result of talloc_strdup() is not checked on the lines
  visible here, and the value returned to the caller is not visible in
  this view - confirm against the full file.
*/
41 int ctdb_set_recovery_lock_file(struct ctdb_context *ctdb, const char *file)
43 ctdb->recovery_lock_file = talloc_strdup(ctdb, file);
48 choose the logfile location
/*
  Record the log file location in ctdb->logfile and open the file.

  A copy of logfile is stored on the context. When the copy succeeded and
  the name is not a single dash, the file is opened write-only in append
  mode, and created with mode 0666 if it does not exist yet.

  NOTE(review): what happens to the opened descriptor, how an open()
  failure is handled, and the return value are not visible in this view.
  The trailing comment suggests stderr is pointed at the log file as
  well - confirm against the full file.
*/
50 int ctdb_set_logfile(struct ctdb_context *ctdb, const char *logfile)
52 ctdb->logfile = talloc_strdup(ctdb, logfile);
53 if (ctdb->logfile != NULL && strcmp(logfile, "-") != 0) {
57 fd = open(ctdb->logfile, O_WRONLY|O_APPEND|O_CREAT, 0666);
65 /* also catch stderr of subcommands to the log file */
/*
  Set flag bits on the ctdb context.

  NOTE(review): the body is not visible in this view. Presumably it ORs
  flags into ctdb->flags, mirroring ctdb_clear_flags() - confirm against
  the full file.
*/
75 void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags)
/*
  Clear flag bits on the ctdb context: every bit that is set in flags is
  cleared in ctdb->flags, all other bits are left as they are.
*/
83 void ctdb_clear_flags(struct ctdb_context *ctdb, unsigned flags)
85 ctdb->flags &= ~flags;
89 set max access count before a dmaster migration
/*
  Overwrite ctdb->max_lacount with count. No validation is applied to the
  new value on the lines visible here.
*/
91 void ctdb_set_max_lacount(struct ctdb_context *ctdb, unsigned count)
93 ctdb->max_lacount = count;
97 set the directory for the local databases
/*
  Store a copy of the database directory path in ctdb->db_directory.

  ctdb: context that receives the copy; it is also the talloc parent of
        the new string
  dir:  directory for the local databases

  The copy is checked for allocation failure.

  NOTE(review): the failure branch and the return values are not visible
  in this view - confirm against the full file.
*/
99 int ctdb_set_tdb_dir(struct ctdb_context *ctdb, const char *dir)
101 ctdb->db_directory = talloc_strdup(ctdb, dir);
102 if (ctdb->db_directory == NULL) {
109 add a node to the list of active nodes
/*
  Append one node, described by the address string nstr, to ctdb->nodes.

  The node pointer array is grown by one slot, a zeroed struct ctdb_node
  is allocated as a talloc child of the array, nstr is parsed into
  node->address and a printable name is built for the node. The new node
  gets vnn equal to its index in the array.

  When the local address is already set and matches the address of the new
  node, the node is taken to be this daemon: ctdb->vnn is set to its vnn
  and the node is flagged NODE_FLAGS_CONNECTED.

  NOTE(review): several lines are not visible in this view, among them the
  assignment of the reallocated array back to ctdb->nodes, the assignment
  of the local variable node, the second argument of the name format, the
  update of ctdb->num_nodes and all return statements. CTDB_NO_MEMORY is
  presumably an early-return-on-NULL macro - confirm in ctdb_private.h.
*/
111 static int ctdb_add_node(struct ctdb_context *ctdb, char *nstr)
113 struct ctdb_node *node, **nodep;
/* make room for one more node pointer */
115 nodep = talloc_realloc(ctdb, ctdb->nodes, struct ctdb_node *, ctdb->num_nodes+1);
116 CTDB_NO_MEMORY(ctdb, nodep);
/* nodep is reused to point at the new last slot of the array */
119 nodep = &ctdb->nodes[ctdb->num_nodes];
120 (*nodep) = talloc_zero(ctdb->nodes, struct ctdb_node);
121 CTDB_NO_MEMORY(ctdb, *nodep);
124 if (ctdb_parse_address(ctdb, node, nstr, &node->address) != 0) {
128 node->name = talloc_asprintf(node, "%s:%u",
129 node->address.address,
131 /* this assumes that the nodes are kept in sorted order, and no gaps */
132 node->vnn = ctdb->num_nodes;
/* a node carrying our own address is this daemon itself */
134 if (ctdb->address.address &&
135 ctdb_same_address(&ctdb->address, &node->address)) {
136 ctdb->vnn = node->vnn;
137 node->flags |= NODE_FLAGS_CONNECTED;
141 node->dead_count = 0;
147 setup the node list from a file
/*
  Load the node list file nlist and set up the node table and vnn map.

  Steps visible here:
    - any previous ctdb->node_list_file is freed and replaced by a copy
      of nlist
    - the file is read into an array of lines with file_lines_load()
    - a load failure is reported through ctdb_set_error()
    - a loop runs while the last line is the empty string
    - every remaining line is handed to ctdb_add_node()
    - ctdb->vnn_map is allocated with generation 1, size equal to
      ctdb->num_nodes, and map[i] set to i for every slot

  NOTE(review): the local declarations, the body of the trailing-line
  loop (presumably it decrements nlines), the error branches, the
  release of lines and the return values are not visible in this view -
  confirm against the full file.
*/
149 int ctdb_set_nlist(struct ctdb_context *ctdb, const char *nlist)
155 talloc_free(ctdb->node_list_file);
156 ctdb->node_list_file = talloc_strdup(ctdb, nlist);
158 lines = file_lines_load(nlist, &nlines, ctdb);
160 ctdb_set_error(ctdb, "Failed to load nlist '%s'\n", nlist);
/* ignore empty lines at the end of the file */
163 while (nlines > 0 && strcmp(lines[nlines-1], "") == 0) {
167 for (i=0;i<nlines;i++) {
168 if (ctdb_add_node(ctdb, lines[i]) != 0) {
174 /* initialize the vnn mapping table now that we have num_nodes setup */
176 XXX we currently initialize it to the maximum number of nodes to
177 XXX make it behave the same way as previously.
178 XXX Once we have recovery working we should initialize this always to
179 XXX generation==0 (==invalid) and let the recovery tool populate this
180 XXX table for the daemons.
182 ctdb->vnn_map = talloc(ctdb, struct ctdb_vnn_map);
183 CTDB_NO_MEMORY(ctdb, ctdb->vnn_map);
185 ctdb->vnn_map->generation = 1;
186 ctdb->vnn_map->size = ctdb->num_nodes;
187 ctdb->vnn_map->map = talloc_array(ctdb->vnn_map, uint32_t, ctdb->vnn_map->size);
188 CTDB_NO_MEMORY(ctdb, ctdb->vnn_map->map);
/* identity mapping: slot i of the map refers to node i */
190 for(i=0;i<ctdb->vnn_map->size;i++) {
191 ctdb->vnn_map->map[i] = i;
200 setup the local node address
/*
  Set the address of the local node.

  address is parsed by ctdb_parse_address() into ctdb->address, and a
  printable name is built into ctdb->name with talloc_asprintf(), starting
  with ctdb->address.address followed by a colon and an unsigned value.

  NOTE(review): the second format argument, the parse-failure branch and
  the return values are not visible in this view - confirm against the
  full file.
*/
202 int ctdb_set_address(struct ctdb_context *ctdb, const char *address)
204 if (ctdb_parse_address(ctdb, ctdb, address, &ctdb->address) != 0) {
208 ctdb->name = talloc_asprintf(ctdb, "%s:%u",
209 ctdb->address.address,
216 setup the local socket name
/*
  Store a copy of the local socket name in ctdb->daemon.name.

  ctdb:       context that receives the copy; it is also the talloc
              parent of the new string
  socketname: name of the local socket

  NOTE(review): the result of talloc_strdup() is not checked on the lines
  visible here, and the value returned to the caller is not visible in
  this view - confirm against the full file.
*/
218 int ctdb_set_socketname(struct ctdb_context *ctdb, const char *socketname)
220 ctdb->daemon.name = talloc_strdup(ctdb, socketname);
224 return the vnn of this node
/*
  Return the vnn of this node.

  NOTE(review): the body is not visible in this view. Presumably it
  returns ctdb->vnn - confirm against the full file.
*/
226 uint32_t ctdb_get_vnn(struct ctdb_context *ctdb)
232 return the number of connected nodes
/*
  Count the connected nodes of the current vnn map.

  Every entry of ctdb->vnn_map->map is used as an index into ctdb->nodes,
  and the node found there is tested for NODE_FLAGS_CONNECTED. Nodes that
  are not listed in the vnn map are not looked at.

  NOTE(review): the counter, its increment and the return statement are
  not visible in this view. The map entries are used as array indexes
  without a range check on the visible lines - confirm that the map is
  always kept within ctdb->num_nodes.
*/
234 uint32_t ctdb_get_num_connected_nodes(struct ctdb_context *ctdb)
238 for (i=0;i<ctdb->vnn_map->size;i++) {
239 if (ctdb->nodes[ctdb->vnn_map->map[i]]->flags & NODE_FLAGS_CONNECTED) {
248 called when we need to process a packet. This can be a requeued packet
249 after a lockwait, or a real packet from another node
/*
  Process one packet and dispatch it to the handler for its operation.

  ctdb: daemon context
  hdr:  packet header; the packet is stolen onto a temporary talloc
        context that is freed at the end of this function, so a handler
        that wants to keep the packet has to steal it elsewhere

  Flow visible here:
    - the packet is logged at debug level 3
    - for CTDB_REPLY_CALL, CTDB_REQ_DMASTER and CTDB_REPLY_DMASTER the
      generation in the header is compared with ctdb->vnn_map->generation
      and a mismatch is logged at debug level 0
    - a second switch on hdr->operation bumps the matching statistics
      counter and calls the matching handler (call, reply call, reply
      error, request dmaster, reply dmaster, message, finished, control,
      reply control); keepalive packets only bump a counter
    - any other operation is logged as unknown at debug level 0
    - the temporary context is freed

  NOTE(review): some case labels, the break and default lines, and the
  action taken after a generation mismatch are not visible in this view -
  confirm against the full file.
*/
251 void ctdb_input_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
255 /* place the packet as a child of the tmp_ctx. We then use
256 talloc_free() below to free it. If any of the calls want
257 to keep it, then they will steal it somewhere else, and the
258 talloc_free() will only free the tmp_ctx */
259 tmp_ctx = talloc_new(ctdb);
260 talloc_steal(tmp_ctx, hdr);
262 DEBUG(3,(__location__ " ctdb request %u of type %u length %u from "
263 "node %u to %u\n", hdr->reqid, hdr->operation, hdr->length,
264 hdr->srcnode, hdr->destnode));
266 switch (hdr->operation) {
268 case CTDB_REPLY_CALL:
269 case CTDB_REQ_DMASTER:
270 case CTDB_REPLY_DMASTER:
271 /* for ctdb_call inter-node operations verify that the
272 remote node that sent us the call is running in the
273 same generation instance as this node
275 if (ctdb->vnn_map->generation != hdr->generation) {
276 DEBUG(0,(__location__ " ctdb request %u"
277 " length %u from node %u to %u had an"
278 " invalid generation id:%u while our"
279 " generation id is:%u\n",
280 hdr->reqid, hdr->length,
281 hdr->srcnode, hdr->destnode,
282 hdr->generation, ctdb->vnn_map->generation));
287 switch (hdr->operation) {
289 ctdb->statistics.node.req_call++;
290 ctdb_request_call(ctdb, hdr);
293 case CTDB_REPLY_CALL:
294 ctdb->statistics.node.reply_call++;
295 ctdb_reply_call(ctdb, hdr);
298 case CTDB_REPLY_ERROR:
299 ctdb->statistics.node.reply_error++;
300 ctdb_reply_error(ctdb, hdr);
303 case CTDB_REQ_DMASTER:
304 ctdb->statistics.node.req_dmaster++;
305 ctdb_request_dmaster(ctdb, hdr);
308 case CTDB_REPLY_DMASTER:
309 ctdb->statistics.node.reply_dmaster++;
310 ctdb_reply_dmaster(ctdb, hdr);
313 case CTDB_REQ_MESSAGE:
314 ctdb->statistics.node.req_message++;
315 ctdb_request_message(ctdb, hdr);
318 case CTDB_REQ_FINISHED:
319 ctdb->statistics.node.req_finished++;
320 ctdb_request_finished(ctdb, hdr);
323 case CTDB_REQ_CONTROL:
324 ctdb->statistics.node.req_control++;
325 ctdb_request_control(ctdb, hdr);
328 case CTDB_REPLY_CONTROL:
329 ctdb->statistics.node.reply_control++;
330 ctdb_reply_control(ctdb, hdr);
333 case CTDB_REQ_KEEPALIVE:
334 ctdb->statistics.keepalive_packets_recv++;
338 DEBUG(0,("%s: Packet with unknown operation %u\n",
339 __location__, hdr->operation));
344 talloc_free(tmp_ctx);
349 called by the transport layer when a packet comes in
/*
  Upcall used by the transport layer for every packet that comes in.

  ctdb:   daemon context
  data:   raw packet, interpreted in place as a struct ctdb_req_header
  length: size of data in bytes

  The global receive counter is bumped. When the source node number is
  valid, the rx_cnt of that node is bumped too, except for a CTDB_REQ_CALL
  with a non-zero hopcount. The packet is then handed to ctdb_input_pkt().

  NOTE(review): length is not used on the lines visible here, and header
  fields are read without checking that the packet is large enough to
  hold a full header - confirm that the transport validates the size
  before making this upcall.
*/
351 static void ctdb_recv_pkt(struct ctdb_context *ctdb, uint8_t *data, uint32_t length)
353 struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
355 ctdb->statistics.node_packets_recv++;
357 /* up the counter for this source node, so we know its alive */
358 if (ctdb_validate_vnn(ctdb, hdr->srcnode)) {
359 /* as a special case, redirected calls don't increment the rx_cnt */
360 if (hdr->operation != CTDB_REQ_CALL ||
361 ((struct ctdb_req_call *)hdr)->hopcount == 0) {
362 ctdb->nodes[hdr->srcnode]->rx_cnt++;
366 ctdb_input_pkt(ctdb, hdr);
371 called by the transport layer when a node is dead
/*
  Upcall used by the transport layer when a node is found to be dead.

  A node that is not flagged NODE_FLAGS_CONNECTED only produces a debug
  message. Otherwise the connected count of the owning context is
  decremented, the connected flag is cleared, dead_count is reset to zero,
  the event is logged and ctdb_daemon_cancel_controls() is called for the
  node.

  NOTE(review): the end of the already-disconnected branch is not visible
  in this view. Presumably it returns before the counter is touched -
  confirm against the full file.
*/
373 void ctdb_node_dead(struct ctdb_node *node)
375 if (!(node->flags & NODE_FLAGS_CONNECTED)) {
376 DEBUG(1,("%s: node %s is already marked disconnected: %u connected\n",
377 node->ctdb->name, node->name,
378 node->ctdb->num_connected));
381 node->ctdb->num_connected--;
382 node->flags &= ~NODE_FLAGS_CONNECTED;
384 node->dead_count = 0;
385 DEBUG(1,("%s: node %s is dead: %u connected\n",
386 node->ctdb->name, node->name, node->ctdb->num_connected));
387 ctdb_daemon_cancel_controls(node->ctdb, node);
391 called by the transport layer when a node is connected
/*
  Upcall used by the transport layer when a node becomes connected.

  A node that is already flagged NODE_FLAGS_CONNECTED only produces a
  debug message. Otherwise the connected count of the owning context is
  incremented, dead_count is reset to zero, the connected flag is set and
  the event is logged.

  NOTE(review): the end of the already-connected branch is not visible in
  this view. Presumably it returns before the counter is touched -
  confirm against the full file.
*/
393 void ctdb_node_connected(struct ctdb_node *node)
395 if (node->flags & NODE_FLAGS_CONNECTED) {
396 DEBUG(1,("%s: node %s is already marked connected: %u connected\n",
397 node->ctdb->name, node->name,
398 node->ctdb->num_connected));
401 node->ctdb->num_connected++;
402 node->dead_count = 0;
403 node->flags |= NODE_FLAGS_CONNECTED;
404 DEBUG(1,("%s: connected to %s - %u connected\n",
405 node->ctdb->name, node->name, node->ctdb->num_connected));
409 wait for all nodes to be connected
/*
  Block until the expected number of nodes is connected.

  The expected count starts at ctdb->num_nodes - 1 and is adjusted when
  CTDB_FLAG_SELF_CONNECT is set. The event loop is then run one iteration
  at a time until ctdb->num_connected equals the expected count. There is
  no timeout on the lines visible here.

  NOTE(review): the adjustment made for CTDB_FLAG_SELF_CONNECT is not
  visible in this view (presumably expected is incremented). expected is
  declared int but is printed with an unsigned format - confirm both
  against the full file.
*/
411 void ctdb_daemon_connect_wait(struct ctdb_context *ctdb)
413 int expected = ctdb->num_nodes - 1;
414 if (ctdb->flags & CTDB_FLAG_SELF_CONNECT) {
417 while (ctdb->num_connected != expected) {
418 DEBUG(3,("ctdb_connect_wait: waiting for %u nodes (have %u)\n",
419 expected, ctdb->num_connected));
420 event_loop_once(ctdb->ev);
422 DEBUG(3,("ctdb_connect_wait: got all %u nodes\n", expected));
/*
  State carried by a deferred packet (fields of struct queue_next; the
  opening line of the struct is not visible in this view).
*/
/* context the deferred packet is delivered to */
426 struct ctdb_context *ctdb;
/* the deferred packet itself */
427 struct ctdb_req_header *hdr;
432 triggered when a deferred packet is due
/*
  Timed event callback that delivers a deferred packet.

  private_data is expected to be a struct queue_next; it is checked with
  talloc_get_type(). The stored packet is passed to ctdb_input_pkt() on
  the stored context. ev, te and t are not used on the lines visible here.

  NOTE(review): the release of the queue_next structure is not visible in
  this view - confirm against the full file.
*/
434 static void queue_next_trigger(struct event_context *ev, struct timed_event *te,
435 struct timeval t, void *private_data)
437 struct queue_next *q = talloc_get_type(private_data, struct queue_next);
438 ctdb_input_pkt(q->ctdb, q->hdr);
443 defer a packet, so it is processed on the next event loop
444 this is used for sending packets to ourselves
/*
  Defer a packet so that it is processed later through the event loop.

  A struct queue_next is allocated as a talloc child of ctdb, and the
  packet (hdr->length bytes) is duplicated, also as a child of ctdb rather
  than of the queue entry. A timed event with a zero timeval is added with
  the queue entry as its talloc parent and as its private data, so
  queue_next_trigger() runs from the event loop.

  NOTE(review): both a direct ctdb_input_pkt() call and the timed event
  are visible below. Presumably only one of them is compiled in, selected
  by preprocessor lines that are not visible in this view - confirm.

  NOTE(review): when the packet copy fails, no release of q is visible on
  these lines. That looks like a leak of the queue entry - confirm
  against the full file.
*/
446 static void ctdb_defer_packet(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
448 struct queue_next *q;
449 q = talloc(ctdb, struct queue_next);
451 DEBUG(0,(__location__ " Failed to allocate deferred packet\n"));
455 q->hdr = talloc_memdup(ctdb, hdr, hdr->length);
456 if (q->hdr == NULL) {
457 DEBUG(0,("Error copying deferred packet to self\n"));
461 /* use this to put packets directly into our recv function */
462 ctdb_input_pkt(q->ctdb, q->hdr);
464 event_add_timed(ctdb->ev, q, timeval_zero(), queue_next_trigger, q);
470 broadcast a packet to all nodes
/*
  Send a packet to every node in ctdb->nodes.

  For each of the ctdb->num_nodes entries, hdr->destnode is overwritten
  with the vnn of that node and the packet is passed to
  ctdb_queue_packet(). The header of the caller is modified in place, so
  on return hdr->destnode holds the vnn of the last node.
*/
472 static void ctdb_broadcast_packet_all(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
475 for (i=0;i<ctdb->num_nodes;i++) {
476 hdr->destnode = ctdb->nodes[i]->vnn;
477 ctdb_queue_packet(ctdb, hdr);
482 broadcast a packet to all nodes in the current vnnmap
/*
  Send a packet to every node listed in the current vnn map.

  For each of the ctdb->vnn_map->size entries, hdr->destnode is
  overwritten with the map entry and the packet is passed to
  ctdb_queue_packet(). The header of the caller is modified in place, so
  on return hdr->destnode holds the last map entry.
*/
484 static void ctdb_broadcast_packet_vnnmap(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
487 for (i=0;i<ctdb->vnn_map->size;i++) {
488 hdr->destnode = ctdb->vnn_map->map[i];
489 ctdb_queue_packet(ctdb, hdr);
494 queue a packet or die
/*
  Queue a packet for its destination node, or abort the daemon.

  ctdb: daemon context
  hdr:  packet to send; hdr->destnode selects the target

  Flow visible here:
    - CTDB_BROADCAST_ALL and CTDB_BROADCAST_VNNMAP are handed to the
      matching broadcast helper
    - the sent-packet counter is bumped
    - a destination that fails ctdb_validate_vnn() is logged at debug
      level 0
    - a packet addressed to this node is deferred with ctdb_defer_packet()
      unless CTDB_FLAG_SELF_CONNECT is set
    - the packet is passed to the queue_pkt method of the transport, and
      a non-zero result ends in ctdb_fatal()

  NOTE(review): the return and break lines, and whether the queue_pkt call
  sits in an else branch of the self-delivery test, are not visible in
  this view - confirm against the full file.
*/
496 void ctdb_queue_packet(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
498 struct ctdb_node *node;
500 switch (hdr->destnode) {
501 case CTDB_BROADCAST_ALL:
502 ctdb_broadcast_packet_all(ctdb, hdr);
504 case CTDB_BROADCAST_VNNMAP:
505 ctdb_broadcast_packet_vnnmap(ctdb, hdr);
509 ctdb->statistics.node_packets_sent++;
511 if (!ctdb_validate_vnn(ctdb, hdr->destnode)) {
512 DEBUG(0,(__location__ " cant send to node %u that does not exist\n",
517 node = ctdb->nodes[hdr->destnode];
/* packets to ourselves go through the event loop, not the transport */
519 if (hdr->destnode == ctdb->vnn && !(ctdb->flags & CTDB_FLAG_SELF_CONNECT)) {
520 ctdb_defer_packet(ctdb, hdr);
523 if (ctdb->methods->queue_pkt(node, (uint8_t *)hdr, hdr->length) != 0) {
524 ctdb_fatal(ctdb, "Unable to queue packet\n");
/*
  Table of callbacks that this file offers to the transport layer:
  packet reception, node death and node connection.
*/
530 static const struct ctdb_upcalls ctdb_upcalls = {
531 .recv_pkt = ctdb_recv_pkt,
532 .node_dead = ctdb_node_dead,
533 .node_connected = ctdb_node_connected
537 initialise the ctdb daemon.
539 NOTE: In current code the daemon does not fork. This is for testing purposes only
540 and to simplify the code.
542 struct ctdb_context *ctdb_init(struct event_context *ev)
544 struct ctdb_context *ctdb;
546 ctdb = talloc_zero(ev, struct ctdb_context);
548 ctdb->recovery_mode = CTDB_RECOVERY_NORMAL;
549 ctdb->recovery_master = (uint32_t)-1;
550 ctdb->upcalls = &ctdb_upcalls;
551 ctdb->idr = idr_init(ctdb);
552 ctdb->max_lacount = CTDB_DEFAULT_MAX_LACOUNT;
553 ctdb->seqnum_frequency = CTDB_DEFAULT_SEQNUM_FREQUENCY;
554 ctdb->recovery_lock_fd = -1;
555 ctdb->monitoring_mode = CTDB_MONITORING_ACTIVE;