*/
struct ctdb_methods {
int (*initialise)(struct ctdb_context *); /* initialise transport structures */
- int (*start)(struct ctdb_context *); /* start protocol processing */
+ int (*start)(struct ctdb_context *); /* start the transport */
int (*add_node)(struct ctdb_node *); /* setup a new node */
+ int (*connect_node)(struct ctdb_node *); /* connect to node */
int (*queue_pkt)(struct ctdb_node *, uint8_t *data, uint32_t length);
void *(*allocate_pkt)(TALLOC_CTX *mem_ctx, size_t );
void (*shutdown)(struct ctdb_context *); /* shutdown transport */
ctdb_reload_nodes_event(struct event_context *ev, struct timed_event *te,
struct timeval t, void *private_data)
{
- int i;
-
+ int i, num_nodes;
struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
+ TALLOC_CTX *tmp_ctx;
+ struct ctdb_node **nodes;
+
+ tmp_ctx = talloc_new(ctdb);
+
+ /* steal the old nodes file for a while */
+ talloc_steal(tmp_ctx, ctdb->nodes);
+ nodes = ctdb->nodes;
+ ctdb->nodes = NULL;
+ num_nodes = ctdb->num_nodes;
+ ctdb->num_nodes = 0;
+ /* load the new nodes file */
ctdb_load_nodes_file(ctdb);
for (i=0; i<ctdb->num_nodes; i++) {
+ /* keep any identical pre-existing nodes and connections */
+ if ((i < num_nodes) && ctdb_same_address(&ctdb->nodes[i]->address, &nodes[i]->address)) {
+ talloc_free(ctdb->nodes[i]);
+ ctdb->nodes[i] = talloc_steal(ctdb->nodes, nodes[i]);
+ continue;
+ }
+
+ /* any new or different nodes must be added */
if (ctdb->methods->add_node(ctdb->nodes[i]) != 0) {
DEBUG(DEBUG_CRIT, (__location__ " methods->add_node failed at %d\n", i));
ctdb_fatal(ctdb, "failed to add node. shutting down\n");
}
+ if (ctdb->methods->connect_node(ctdb->nodes[i]) != 0) {
+ DEBUG(DEBUG_CRIT, (__location__ " methods->add_connect failed at %d\n", i));
+ ctdb_fatal(ctdb, "failed to connect to node. shutting down\n");
+ }
}
- ctdb->methods->start(ctdb);
+ talloc_free(tmp_ctx);
return;
}
/*
called when a complete packet has come in - should not happen on this socket
+ unless the other side closes the connection with RST or FIN
*/
void ctdb_tcp_tnode_cb(uint8_t *data, size_t cnt, void *private_data)
{
}
ctdb_tcp_stop_connection(node);
- tnode->connect_te = event_add_timed(node->ctdb->ev, tnode, timeval_zero(),
+ tnode->connect_te = event_add_timed(node->ctdb->ev, tnode,
+ timeval_current_ofs(3, 0),
ctdb_tcp_node_connect, node);
}
return;
}
+DEBUG(DEBUG_ERR,("create socket...\n"));
tnode->fd = socket(sock_out.sa.sa_family, SOCK_STREAM, IPPROTO_TCP);
set_nonblocking(tnode->fd);
set_close_on_exec(tnode->fd);
#include "../include/ctdb_private.h"
#include "ctdb_tcp.h"
+static int tnode_destructor(struct ctdb_tcp_node *tnode)
+{
+ struct ctdb_node *node = talloc_find_parent_bytype(tnode, struct ctdb_node);
+
+ if (tnode->fd != -1) {
+ close(tnode->fd);
+ tnode->fd = -1;
+ }
+
+ return 0;
+}
/*
initialise tcp portion of a ctdb node
tnode->fd = -1;
node->private_data = tnode;
+ talloc_set_destructor(tnode, tnode_destructor);
tnode->out_queue = ctdb_queue_setup(node->ctdb, node, tnode->fd, CTDB_TCP_ALIGNMENT,
ctdb_tcp_tnode_cb, node);
/*
start the protocol going
*/
-static int ctdb_tcp_start(struct ctdb_context *ctdb)
+static int ctdb_tcp_connect_node(struct ctdb_node *node)
{
- int i;
+ struct ctdb_context *ctdb = node->ctdb;
+ struct ctdb_tcp_node *tnode = talloc_get_type(
+ node->private_data, struct ctdb_tcp_node);
- /* startup connections to the other servers - will happen on
+ /* startup connection to the other server - will happen on
next event loop */
- for (i=0;i<ctdb->num_nodes;i++) {
- struct ctdb_node *node = *(ctdb->nodes + i);
- struct ctdb_tcp_node *tnode = talloc_get_type(
- node->private_data, struct ctdb_tcp_node);
- if (!ctdb_same_address(&ctdb->address, &node->address)) {
- tnode->connect_te = event_add_timed(ctdb->ev, tnode,
- timeval_zero(),
- ctdb_tcp_node_connect, node);
- }
+ if (!ctdb_same_address(&ctdb->address, &node->address)) {
+ tnode->connect_te = event_add_timed(ctdb->ev, tnode,
+ timeval_zero(),
+ ctdb_tcp_node_connect, node);
}
return 0;
ctdb->private_data = NULL;
}
+/*
+ start the transport
+*/
+static int ctdb_tcp_start(struct ctdb_context *ctdb)
+{
+ int i;
+
+ for (i=0; i<ctdb->num_nodes; i++) {
+ ctdb_tcp_connect_node(ctdb->nodes[i]);
+ }
+
+ return 0;
+}
+
/*
transport packet allocator - allows transport to control memory for packets
.start = ctdb_tcp_start,
.queue_pkt = ctdb_tcp_queue_pkt,
.add_node = ctdb_tcp_add_node,
+ .connect_node = ctdb_tcp_connect_node,
.allocate_pkt = ctdb_tcp_allocate_pkt,
.shutdown = ctdb_tcp_shutdown,
.restart = ctdb_tcp_restart,
DEBUG(DEBUG_ERR, ("ERROR: Failed to reload nodes file on node %u. You MUST fix that node manually!\n", mypnn));
}
+ /* initiate a recovery */
+ control_recover(ctdb, argc, argv);
+
return 0;
}