4 Copyright (C) Andrew Tridgell 2006
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
22 #include "lib/tdb/include/tdb.h"
23 #include "lib/tevent/tevent.h"
24 #include "lib/util/dlinklist.h"
25 #include "system/network.h"
26 #include "system/filesys.h"
27 #include "system/wait.h"
28 #include "../include/ctdb_client.h"
29 #include "../include/ctdb_private.h"
30 #include <sys/socket.h>
/*
 * One node of the doubly linked list (next/prev) rooted at
 * ctdb->client_pids. ctdb_accept_client() adds a node for each accepted
 * connection and ctdb_find_client_by_pid() walks the list to map a pid
 * back to its struct ctdb_client.
 */
32 struct ctdb_client_pid_list {
33 struct ctdb_client_pid_list *next, *prev;
/* owning context; the destructor uses it to unlink from ctdb->client_pids */
34 struct ctdb_context *ctdb;
/* the connected client this entry describes */
36 struct ctdb_client *client;
/* forward declaration: daemon_incoming_packet_wrap() calls this before its definition appears */
39 static void daemon_incoming_packet(void *, struct ctdb_req_header *);
/* atexit() hook (registered in ctdb_start_transport) that logs a notice when the daemon exits */
41 static void print_exit_message(void)
43 DEBUG(DEBUG_NOTICE,("CTDB daemon shutting down\n"));
46 /* called when the "startup" event script has finished */
/*
 * Bring up the node's runtime services in order: start the transport
 * through ctdb->methods->start(), start the recovery daemon, register the
 * exit-message atexit() hook, then start keepalive, health monitoring,
 * tcp tickle list updates and recovery daemon ping handling.
 *
 * ctdb_fatal() is called when no transport methods are set or when the
 * transport fails to start.
 */
47 static void ctdb_start_transport(struct ctdb_context *ctdb)
/* a missing methods table means the transport was never initialised */
49 if (ctdb->methods == NULL) {
50 DEBUG(DEBUG_ALERT,(__location__ " startup event finished but transport is DOWN.\n"));
51 ctdb_fatal(ctdb, "transport is not initialized but startup completed");
54 /* start the transport running */
55 if (ctdb->methods->start(ctdb) != 0) {
56 DEBUG(DEBUG_ALERT,("transport failed to start!\n"));
57 ctdb_fatal(ctdb, "transport failed to start");
60 /* start the recovery daemon process */
61 if (ctdb_start_recoverd(ctdb) != 0) {
62 DEBUG(DEBUG_ALERT,("Failed to start recovery daemon\n"));
/* NOTE(review): what follows the alert above (e.g. an exit) is not visible in this listing */
66 /* Make sure we log something when the daemon terminates */
67 atexit(print_exit_message);
69 /* start monitoring for connected/disconnected nodes */
70 ctdb_start_keepalive(ctdb);
72 /* start monitoring for node health */
73 ctdb_start_monitoring(ctdb);
75 /* start periodic update of tcp tickle lists */
76 ctdb_start_tcp_tickle_update(ctdb);
78 /* start listening for recovery daemon pings */
79 ctdb_control_recd_ping(ctdb);
/*
 * Set the disposition of signal signum to SIG_IGN using sigaction().
 * Despite the name, the signal is ignored rather than added to the
 * process signal mask; signum is only placed in the action's sa_mask.
 * The return value of sigaction() is not checked.
 */
82 static void block_signal(int signum)
86 memset(&act, 0, sizeof(act));
88 act.sa_handler = SIG_IGN;
89 sigemptyset(&act.sa_mask);
90 sigaddset(&act.sa_mask, signum);
91 sigaction(signum, &act, NULL);
96 send a packet to a client
/*
 * Queue hdr->length bytes starting at hdr on the client's queue and
 * return the result of ctdb_queue_send(). The client_packets_sent
 * statistic is incremented on every call.
 */
98 static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header *hdr)
100 CTDB_INCREMENT_STAT(client->ctdb, client_packets_sent);
/* CTDB_REQ_MESSAGE packets are checked against the max_queue_depth_drop_msg tunable */
101 if (hdr->operation == CTDB_REQ_MESSAGE) {
102 if (ctdb_queue_length(client->queue) > client->ctdb->tunable.max_queue_depth_drop_msg) {
103 DEBUG(DEBUG_ERR,("CTDB_REQ_MESSAGE queue full - killing client connection.\n"));
/* NOTE(review): the statements that act on the over-full queue are not visible in this listing */
108 return ctdb_queue_send(client->queue, (uint8_t *)hdr, hdr->length);
112 message handler for when we are in daemon mode. This redirects the message
/*
 * Message handler registered on behalf of a connected client (see
 * daemon_register_message_handler). Wraps the received data in a
 * CTDB_REQ_MESSAGE packet and queues it to that client.
 *
 * private_data is expected to be the struct ctdb_client the handler was
 * registered for. The result of daemon_queue_send() is not examined.
 */
115 static void daemon_message_handler(struct ctdb_context *ctdb, uint64_t srvid,
116 TDB_DATA data, void *private_data)
118 struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client);
119 struct ctdb_req_message *r;
122 /* construct a message to send to the client containing the data */
/* packet size = fixed part of ctdb_req_message up to data[] plus the payload */
123 len = offsetof(struct ctdb_req_message, data) + data.dsize;
124 r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE,
125 len, struct ctdb_req_message);
126 CTDB_NO_MEMORY_VOID(ctdb, r);
128 talloc_set_name_const(r, "req_message packet");
131 r->datalen = data.dsize;
132 memcpy(&r->data[0], data.dptr, data.dsize);
134 daemon_queue_send(client, &r->hdr);
140 this is called when the ctdb daemon received a ctdb request to
141 set the srvid from the client
/*
 * Register daemon_message_handler for srvid on behalf of the client
 * identified by client_id, so that messages for srvid are forwarded to
 * that client. The client is looked up with ctdb_reqid_find(); an unknown
 * client_id is logged as an error. The outcome of the registration is
 * logged (error on failure, info on success).
 */
143 int daemon_register_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
145 struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
147 if (client == NULL) {
148 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_register_message_handler\n"));
/* the client is passed both as the second argument and as the handler's private data */
151 res = ctdb_register_message_handler(ctdb, client, srvid, daemon_message_handler, client);
153 DEBUG(DEBUG_ERR,(__location__ " Failed to register handler %llu in daemon\n",
154 (unsigned long long)srvid));
156 DEBUG(DEBUG_INFO,(__location__ " Registered message handler for srvid=%llu\n",
157 (unsigned long long)srvid));
164 this is called when the ctdb daemon received a ctdb request to
165 remove a srvid from the client
/*
 * Remove the message handler for srvid that belongs to the client
 * identified by client_id. An unknown client_id is logged as an error;
 * otherwise the result of ctdb_deregister_message_handler() is returned.
 */
167 int daemon_deregister_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
169 struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
170 if (client == NULL) {
171 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_deregister_message_handler\n"));
174 return ctdb_deregister_message_handler(ctdb, srvid, client);
179 destroy a ctdb_client
/*
 * talloc destructor for a struct ctdb_client (installed in
 * ctdb_accept_client). Runs the takeover hook, releases the client's
 * request id and decrements the num_clients statistic. If the client still
 * has persistent updates in flight, the node's recovery mode is set to
 * CTDB_RECOVERY_ACTIVE.
 */
181 static int ctdb_client_destructor(struct ctdb_client *client)
183 struct ctdb_db_context *ctdb_db;
185 ctdb_takeover_client_destructor_hook(client);
186 ctdb_reqid_remove(client->ctdb, client->client_id);
187 CTDB_DECREMENT_STAT(client->ctdb, num_clients);
189 if (client->num_persistent_updates != 0) {
190 DEBUG(DEBUG_ERR,(__location__ " Client disconnecting with %u persistent updates in flight. Starting recovery\n", client->num_persistent_updates));
191 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
/* look up the database recorded in client->db_id */
193 ctdb_db = find_ctdb_db(client->ctdb, client->db_id);
/* NOTE(review): the condition guarding the statements below is not visible in this listing */
195 DEBUG(DEBUG_ERR, (__location__ " client exit while transaction "
196 "commit active. Forcing recovery.\n"));
197 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
198 ctdb_db->transaction_active = false;
206 this is called when the ctdb daemon received a ctdb request message
207 from a local client over the unix domain socket
/*
 * Handle a CTDB_REQ_MESSAGE packet received from a local client.
 * A message addressed to this node (destnode equals ctdb_get_pnn()) is
 * passed to ctdb_request_message(); otherwise the payload is forwarded
 * to destnode with ctdb_daemon_send_message() and a failure is logged.
 */
209 static void daemon_request_message_from_client(struct ctdb_client *client,
210 struct ctdb_req_message *c)
215 /* maybe the message is for another client on this node */
216 if (ctdb_get_pnn(client->ctdb)==c->hdr.destnode) {
217 ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c);
221 /* its for a remote node */
/* describe the payload in place; datalen comes from the client-supplied packet */
222 data.dptr = &c->data[0];
223 data.dsize = c->datalen;
224 res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode,
227 DEBUG(DEBUG_ERR,(__location__ " Failed to send message to remote node %u\n",
/*
 * Per-call bookkeeping for a CTDB_REQ_CALL issued by a local client.
 * Allocated in daemon_request_call_from_client() and consumed by
 * daemon_call_from_client_callback().
 */
233 struct daemon_call_state {
/* client that issued the call and will receive the reply */
234 struct ctdb_client *client;
/* the call being executed; its reply_data is copied into the reply packet */
236 struct ctdb_call *call;
/* time the call was accepted; passed to CTDB_UPDATE_LATENCY */
237 struct timeval start_time;
241 complete a call from a client
/*
 * Completion callback installed on a ctdb_call_state by
 * daemon_request_call_from_client(). Collects the result with
 * ctdb_daemon_call_recv(), builds a CTDB_REPLY_CALL packet carrying the
 * call's reply_data and queues it to the originating client.
 *
 * On each path shown the pending_calls statistic is decremented and the
 * call latency is recorded against dstate->start_time.
 */
243 static void daemon_call_from_client_callback(struct ctdb_call_state *state)
245 struct daemon_call_state *dstate = talloc_get_type(state->async.private_data,
246 struct daemon_call_state);
247 struct ctdb_reply_call *r;
250 struct ctdb_client *client = dstate->client;
251 struct ctdb_db_context *ctdb_db = state->ctdb_db;
/* move dstate under the client, and the call under dstate, in the talloc hierarchy */
253 talloc_steal(client, dstate);
254 talloc_steal(dstate, dstate->call);
256 res = ctdb_daemon_call_recv(state, dstate->call);
258 DEBUG(DEBUG_ERR, (__location__ " ctdbd_call_recv() returned error\n"));
259 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
261 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 1", call_latency, dstate->start_time);
/* reply size = fixed part of ctdb_reply_call up to data[] plus the reply payload */
265 length = offsetof(struct ctdb_reply_call, data) + dstate->call->reply_data.dsize;
266 r = ctdbd_allocate_pkt(client->ctdb, dstate, CTDB_REPLY_CALL,
267 length, struct ctdb_reply_call);
269 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate reply_call in ctdb daemon\n"));
270 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
271 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 2", call_latency, dstate->start_time);
/* echo the client's request id so it can match the reply to its request */
274 r->hdr.reqid = dstate->reqid;
275 r->datalen = dstate->call->reply_data.dsize;
276 memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen);
278 res = daemon_queue_send(client, &r->hdr);
280 /* client is dead - return immediately */
284 DEBUG(DEBUG_ERR, (__location__ " Failed to queue packet from daemon to client\n"));
286 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 3", call_latency, dstate->start_time);
287 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
/*
 * Context handed to daemon_incoming_packet_wrap() when a client packet is
 * requeued. The client is identified by id (w->client_id is assigned in
 * daemon_request_call_from_client and looked up again in the wrapper) so
 * that a client which disconnected in the meantime can be detected.
 */
291 struct ctdb_daemon_packet_wrap {
292 struct ctdb_context *ctdb;
297 a wrapper to catch disconnected clients
/*
 * Requeue entry point for client packets. p must be a
 * struct ctdb_daemon_packet_wrap; the client is re-resolved from
 * w->client_id with ctdb_reqid_find(), and a packet whose client no longer
 * exists is reported instead of being processed. Otherwise the packet is
 * handed to daemon_incoming_packet().
 */
299 static void daemon_incoming_packet_wrap(void *p, struct ctdb_req_header *hdr)
301 struct ctdb_client *client;
302 struct ctdb_daemon_packet_wrap *w = talloc_get_type(p,
303 struct ctdb_daemon_packet_wrap);
305 DEBUG(DEBUG_CRIT,(__location__ " Bad packet type '%s'\n", talloc_get_name(p)));
309 client = ctdb_reqid_find(w->ctdb, w->client_id, struct ctdb_client);
310 if (client == NULL) {
311 DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
319 daemon_incoming_packet(client, hdr);
324 this is called when the ctdb daemon received a ctdb request call
325 from a local client over the unix domain socket
/*
 * Handle a CTDB_REQ_CALL packet received from a local client.
 *
 * Flow, as far as visible here: account for the call in the statistics,
 * resolve the database from c->db_id, lock and fetch the record with
 * ctdb_ltdb_lock_fetch_requeue() (which may requeue the packet through
 * daemon_incoming_packet_wrap), allocate the per-call state, then start
 * the call locally when this node is the record's dmaster or remotely
 * otherwise. daemon_call_from_client_callback() is installed to send the
 * reply when the call completes.
 *
 * Statistics contract: pending_calls is incremented once on entry; every
 * early-exit path below and the completion callback decrement it again.
 */
327 static void daemon_request_call_from_client(struct ctdb_client *client,
328 struct ctdb_req_call *c)
330 struct ctdb_call_state *state;
331 struct ctdb_db_context *ctdb_db;
332 struct daemon_call_state *dstate;
333 struct ctdb_call *call;
334 struct ctdb_ltdb_header header;
337 struct ctdb_context *ctdb = client->ctdb;
338 struct ctdb_daemon_packet_wrap *w;
340 CTDB_INCREMENT_STAT(ctdb, total_calls);
/* the call is now in flight: this must be an increment, since all exit
   paths (and the completion callback) decrement pending_calls */
341 CTDB_INCREMENT_STAT(ctdb, pending_calls);
343 ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
345 DEBUG(DEBUG_ERR, (__location__ " Unknown database in request. db_id==0x%08x",
347 CTDB_DECREMENT_STAT(ctdb, pending_calls);
351 if (ctdb_db->unhealthy_reason) {
353 * this is just a warning, as the tdb should be empty anyway,
354 * and only persistent databases can be unhealthy, which doesn't
355 * use this code patch
357 DEBUG(DEBUG_WARNING,("warn: db(%s) unhealty in daemon_request_call_from_client(): %s\n",
358 ctdb_db->db_name, ctdb_db->unhealthy_reason));
362 key.dsize = c->keylen;
/* wrapper context so a requeued packet can detect a disconnected client */
364 w = talloc(ctdb, struct ctdb_daemon_packet_wrap);
365 CTDB_NO_MEMORY_VOID(ctdb, w);
368 w->client_id = client->client_id;
370 ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header,
371 (struct ctdb_req_header *)c, &data,
372 daemon_incoming_packet_wrap, w, True);
374 /* will retry later */
375 CTDB_DECREMENT_STAT(ctdb, pending_calls);
382 DEBUG(DEBUG_ERR,(__location__ " Unable to fetch record\n"));
383 CTDB_DECREMENT_STAT(ctdb, pending_calls);
387 dstate = talloc(client, struct daemon_call_state);
388 if (dstate == NULL) {
/* release the record lock taken by the fetch before bailing out */
389 ret = ctdb_ltdb_unlock(ctdb_db, key);
391 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
394 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate dstate\n"));
395 CTDB_DECREMENT_STAT(ctdb, pending_calls);
398 dstate->start_time = timeval_current();
399 dstate->client = client;
400 dstate->reqid = c->hdr.reqid;
/* tie the fetched record data to the lifetime of dstate */
401 talloc_steal(dstate, data.dptr);
403 call = dstate->call = talloc_zero(dstate, struct ctdb_call);
405 ret = ctdb_ltdb_unlock(ctdb_db, key);
407 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
410 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate call\n"));
411 CTDB_DECREMENT_STAT(ctdb, pending_calls);
412 CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 1", call_latency, dstate->start_time);
416 call->call_id = c->callid;
/* the call data follows the key inside the packet's data area */
418 call->call_data.dptr = c->data + c->keylen;
419 call->call_data.dsize = c->calldatalen;
420 call->flags = c->flags;
/* run the call locally when this node is the dmaster, remotely otherwise */
422 if (header.dmaster == ctdb->pnn) {
423 state = ctdb_call_local_send(ctdb_db, call, &header, &data);
425 state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
428 ret = ctdb_ltdb_unlock(ctdb_db, key);
430 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
434 DEBUG(DEBUG_ERR,(__location__ " Unable to setup call send\n"));
435 CTDB_DECREMENT_STAT(ctdb, pending_calls);
436 CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 2", call_latency, dstate->start_time);
/* dstate lives under the call state, which in turn lives under the client */
439 talloc_steal(state, dstate);
440 talloc_steal(client, state);
442 state->async.fn = daemon_call_from_client_callback;
443 state->async.private_data = dstate;
/* forward declaration: daemon_incoming_packet() dispatches CTDB_REQ_CONTROL packets to this handler before its definition appears */
447 static void daemon_request_control_from_client(struct ctdb_client *client,
448 struct ctdb_req_control *c);
450 /* data contains a packet from the client */
/*
 * Validate and dispatch one packet received from a client. p must be the
 * struct ctdb_client the packet arrived on. Packets with a wrong
 * ctdb_magic or ctdb_version are rejected with ctdb_set_error(); valid
 * packets are dispatched on hdr->operation to the call, message or
 * control handler, with the matching client.req_* statistic incremented.
 * Unrecognized operations are logged at critical level.
 */
451 static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr)
453 struct ctdb_client *client = talloc_get_type(p, struct ctdb_client);
455 struct ctdb_context *ctdb = client->ctdb;
457 /* place the packet as a child of a tmp_ctx. We then use
458 talloc_free() below to free it. If any of the calls want
459 to keep it, then they will steal it somewhere else, and the
460 talloc_free() will be a no-op */
461 tmp_ctx = talloc_new(client);
462 talloc_steal(tmp_ctx, hdr);
464 if (hdr->ctdb_magic != CTDB_MAGIC) {
465 ctdb_set_error(client->ctdb, "Non CTDB packet rejected in daemon\n");
469 if (hdr->ctdb_version != CTDB_VERSION) {
470 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
474 switch (hdr->operation) {
476 CTDB_INCREMENT_STAT(ctdb, client.req_call);
477 daemon_request_call_from_client(client, (struct ctdb_req_call *)hdr);
480 case CTDB_REQ_MESSAGE:
481 CTDB_INCREMENT_STAT(ctdb, client.req_message);
482 daemon_request_message_from_client(client, (struct ctdb_req_message *)hdr);
485 case CTDB_REQ_CONTROL:
486 CTDB_INCREMENT_STAT(ctdb, client.req_control);
487 daemon_request_control_from_client(client, (struct ctdb_req_control *)hdr);
491 DEBUG(DEBUG_CRIT,(__location__ " daemon: unrecognized operation %u\n",
/* frees the packet unless a handler re-parented it elsewhere */
496 talloc_free(tmp_ctx);
500 called when the daemon gets an incoming packet
/*
 * Queue read callback for a client connection (installed by
 * ctdb_accept_client via ctdb_queue_setup). data/cnt describe one received
 * packet and args is the struct ctdb_client it arrived on.
 *
 * The packet is sanity checked before dispatch: it must be at least as
 * large as a ctdb_req_header, its header length must equal cnt, and the
 * magic and version must match. Each failed check is reported with
 * ctdb_set_error(). Valid packets are handed to daemon_incoming_packet(),
 * which takes over responsibility for freeing data.
 */
502 static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
504 struct ctdb_client *client = talloc_get_type(args, struct ctdb_client);
505 struct ctdb_req_header *hdr;
512 CTDB_INCREMENT_STAT(client->ctdb, client_packets_recv);
/* too short to even contain a header */
514 if (cnt < sizeof(*hdr)) {
515 ctdb_set_error(client->ctdb, "Bad packet length %u in daemon\n",
519 hdr = (struct ctdb_req_header *)data;
/* the length claimed in the header must match what was actually read */
520 if (cnt != hdr->length) {
521 ctdb_set_error(client->ctdb, "Bad header length %u expected %u in daemon\n",
522 (unsigned)hdr->length, (unsigned)cnt);
526 if (hdr->ctdb_magic != CTDB_MAGIC) {
527 ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
531 if (hdr->ctdb_version != CTDB_VERSION) {
532 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
536 DEBUG(DEBUG_DEBUG,(__location__ " client request %u of type %u length %u from "
537 "node %u to %u\n", hdr->reqid, hdr->operation, hdr->length,
538 hdr->srcnode, hdr->destnode));
540 /* it is the responsibility of the incoming packet function to free 'data' */
541 daemon_incoming_packet(client, hdr);
/*
 * talloc destructor for a ctdb_client_pid_list entry (installed in
 * ctdb_accept_client): unlinks the entry from ctdb->client_pids, provided
 * that list is not empty.
 */
545 static int ctdb_clientpid_destructor(struct ctdb_client_pid_list *client_pid)
547 if (client_pid->ctdb->client_pids != NULL) {
548 DLIST_REMOVE(client_pid->ctdb->client_pids, client_pid);
555 static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde,
556 uint16_t flags, void *private_data)
558 struct sockaddr_un addr;
561 struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
562 struct ctdb_client *client;
563 struct ctdb_client_pid_list *client_pid;
565 struct peercred_struct cr;
566 socklen_t crl = sizeof(struct peercred_struct);
569 socklen_t crl = sizeof(struct ucred);
572 memset(&addr, 0, sizeof(addr));
574 fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len);
580 set_close_on_exec(fd);
582 DEBUG(DEBUG_DEBUG,(__location__ " Created SOCKET FD:%d to connected child\n", fd));
584 client = talloc_zero(ctdb, struct ctdb_client);
586 if (getsockopt(fd, SOL_SOCKET, SO_PEERID, &cr, &crl) == 0) {
588 if (getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &cr, &crl) == 0) {
590 DEBUG(DEBUG_INFO,("Connected client with pid:%u\n", (unsigned)cr.pid));
595 client->client_id = ctdb_reqid_new(ctdb, client);
596 client->pid = cr.pid;
598 client_pid = talloc(client, struct ctdb_client_pid_list);
599 if (client_pid == NULL) {
600 DEBUG(DEBUG_ERR,("Failed to allocate client pid structure\n"));
605 client_pid->ctdb = ctdb;
606 client_pid->pid = cr.pid;
607 client_pid->client = client;
609 DLIST_ADD(ctdb->client_pids, client_pid);
611 client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT,
612 ctdb_daemon_read_cb, client,
613 "client-%u", client->pid);
615 talloc_set_destructor(client, ctdb_client_destructor);
616 talloc_set_destructor(client_pid, ctdb_clientpid_destructor);
617 CTDB_INCREMENT_STAT(ctdb, num_clients);
623 create a unix domain socket and bind it
624 return a file descriptor open on the socket
/*
 * Create the daemon's unix domain stream socket, bind it to
 * ctdb->daemon.name, restrict it to the effective user (chown to
 * euid/egid, mode 0700) and start listening with a backlog of 100.
 * The descriptor is stored in ctdb->daemon.sd and is made close-on-exec
 * and non-blocking. On failure the socket is closed and
 * ctdb->daemon.sd is reset to -1.
 */
626 static int ux_socket_bind(struct ctdb_context *ctdb)
628 struct sockaddr_un addr;
630 ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
631 if (ctdb->daemon.sd == -1) {
635 set_close_on_exec(ctdb->daemon.sd);
636 set_nonblocking(ctdb->daemon.sd);
638 memset(&addr, 0, sizeof(addr));
639 addr.sun_family = AF_UNIX;
/* copy at most sizeof-1 bytes: addr was zeroed above, so the final byte
   stays NUL and sun_path is always terminated even for an over-long name */
640 strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path)-1);
642 if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
643 DEBUG(DEBUG_CRIT,("Unable to bind on ctdb socket '%s'\n", ctdb->daemon.name));
/* only the daemon's effective user may connect to the socket */
647 if (chown(ctdb->daemon.name, geteuid(), getegid()) != 0 ||
648 chmod(ctdb->daemon.name, 0700) != 0) {
649 DEBUG(DEBUG_CRIT,("Unable to secure ctdb socket '%s'\n", ctdb->daemon.name));
654 if (listen(ctdb->daemon.sd, 100) != 0) {
655 DEBUG(DEBUG_CRIT,("Unable to listen on ctdb socket '%s'\n", ctdb->daemon.name));
/* failure cleanup: release the descriptor and mark the socket as unset */
662 close(ctdb->daemon.sd);
663 ctdb->daemon.sd = -1;
/*
 * SIGCHLD signal event handler. Reaps terminated children without
 * blocking, using waitpid(-1, ..., WNOHANG); a waitpid() error is logged
 * with errno and each reaped pid is logged at debug level.
 */
667 static void sig_child_handler(struct event_context *ev,
668 struct signal_event *se, int signum, int count,
672 // struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
677 pid = waitpid(-1, &status, WNOHANG);
679 DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%d\n", errno));
683 DEBUG(DEBUG_DEBUG, ("SIGCHLD from %d\n", (int)pid));
/*
 * Completion callback for the "setup" event script scheduled by
 * ctdb_start_daemon(). A failed setup event is fatal (the check on status
 * is not visible in this listing). On success the "setup" notification
 * script is run and a CTDB_CONTROL_STARTUP control is broadcast to all
 * nodes without requesting a reply.
 */
688 static void ctdb_setup_event_callback(struct ctdb_context *ctdb, int status,
692 ctdb_fatal(ctdb, "Failed to run setup event\n");
695 ctdb_run_notification_script(ctdb, "setup");
697 /* tell all other nodes we've just started up */
698 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL,
699 0, CTDB_CONTROL_STARTUP, 0,
700 CTDB_CTRL_FLAG_NOREPLY,
701 tdb_null, NULL, NULL);
705 start the protocol going as a daemon
/*
 * Main entry point that turns the process into the ctdb daemon.
 *
 * Visible sequence: remove any stale socket and bind a new unix domain
 * socket; optionally fork (do_fork); reopen tdbs; point stdin at
 * /dev/null; ignore SIGPIPE; record the daemon pid; raise priority;
 * create the event context and tevent logging; initialise statistics;
 * force recovery mode; initialise the transport named by ctdb->transport
 * ("tcp", or "ib" when built with USE_INFINIBAND); optionally load
 * public_address_list; attach databases; run the "init" event; freeze;
 * start accepting clients; release old IPs unless IP failover is
 * disabled; start the transport and services; install the SIGCHLD
 * handler; schedule the "setup" event; lock down memory; and enter
 * event_loop_wait(), which is not expected to return.
 *
 * NOTE(review): how use_syslog is consumed is not visible in this listing;
 * presumably it guards the start_syslog_daemon() call - confirm.
 */
707 int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork, bool use_syslog, const char *public_address_list)
710 struct fd_event *fde;
711 const char *domain_socket_name;
712 struct signal_event *se;
714 /* get rid of any old sockets */
715 unlink(ctdb->daemon.name);
717 /* create a unix domain stream socket to listen to */
718 res = ux_socket_bind(ctdb);
720 DEBUG(DEBUG_ALERT,(__location__ " Failed to open CTDB unix domain socket\n"));
/* fork() returns non-zero in the parent (and on error), so only the child continues past this test */
724 if (do_fork && fork()) {
728 tdb_reopen_all(False);
/* open() is expected to yield descriptor 0, i.e. stdin */
733 if (open("/dev/null", O_RDONLY) != 0) {
734 DEBUG(DEBUG_ALERT,(__location__ " Failed to setup stdin on /dev/null\n"));
738 block_signal(SIGPIPE);
740 ctdbd_pid = getpid();
743 DEBUG(DEBUG_ERR, ("Starting CTDBD as pid : %u\n", ctdbd_pid));
745 ctdb_high_priority(ctdb);
747 /* ensure the socket is deleted on exit of the daemon */
748 domain_socket_name = talloc_strdup(talloc_autofree_context(), ctdb->daemon.name);
749 if (domain_socket_name == NULL) {
750 DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup failed.\n"));
754 ctdb->ev = event_context_init(NULL);
755 tevent_loop_allow_nesting(ctdb->ev);
756 ret = ctdb_init_tevent_logging(ctdb);
758 DEBUG(DEBUG_ALERT,("Failed to initialize TEVENT logging\n"));
762 ctdb_set_child_logging(ctdb);
764 /* initialize statistics collection */
765 ctdb_statistics_init(ctdb);
767 /* force initial recovery for election */
768 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
/* select the transport backend by name; the init functions are declared locally */
770 if (strcmp(ctdb->transport, "tcp") == 0) {
771 int ctdb_tcp_init(struct ctdb_context *);
772 ret = ctdb_tcp_init(ctdb);
774 #ifdef USE_INFINIBAND
775 if (strcmp(ctdb->transport, "ib") == 0) {
776 int ctdb_ibw_init(struct ctdb_context *);
777 ret = ctdb_ibw_init(ctdb);
781 DEBUG(DEBUG_ERR,("Failed to initialise transport '%s'\n", ctdb->transport));
785 if (ctdb->methods == NULL) {
786 DEBUG(DEBUG_ALERT,(__location__ " Can not initialize transport. ctdb->methods is NULL\n"));
787 ctdb_fatal(ctdb, "transport is unavailable. can not initialize.");
790 /* initialise the transport */
791 if (ctdb->methods->initialise(ctdb) != 0) {
792 ctdb_fatal(ctdb, "transport failed to initialise");
794 if (public_address_list) {
795 ret = ctdb_set_public_addresses(ctdb, public_address_list);
797 DEBUG(DEBUG_ALERT,("Unable to setup public address list\n"));
803 /* attach to existing databases */
804 if (ctdb_attach_databases(ctdb) != 0) {
805 ctdb_fatal(ctdb, "Failed to attach to databases\n");
808 ret = ctdb_event_script(ctdb, CTDB_EVENT_INIT);
810 ctdb_fatal(ctdb, "Failed to run init event\n");
812 ctdb_run_notification_script(ctdb, "init");
814 /* start frozen, then let the first election sort things out */
815 if (ctdb_blocking_freeze(ctdb)) {
816 ctdb_fatal(ctdb, "Failed to get initial freeze\n");
819 /* now start accepting clients, only can do this once frozen */
820 fde = event_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd,
822 ctdb_accept_client, ctdb);
823 tevent_fd_set_auto_close(fde);
825 /* release any IPs we hold from previous runs of the daemon */
826 if (ctdb->tunable.disable_ip_failover == 0) {
827 ctdb_release_all_ips(ctdb);
830 /* start the transport going */
831 ctdb_start_transport(ctdb);
833 /* set up a handler to pick up sigchld */
834 se = event_add_signal(ctdb->ev, ctdb,
839 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD\n"));
/* run the "setup" event asynchronously; ctdb_setup_event_callback handles its result */
843 ret = ctdb_event_script_callback(ctdb,
845 ctdb_setup_event_callback,
851 DEBUG(DEBUG_CRIT,("Failed to set up 'setup' event\n"));
856 if (start_syslog_daemon(ctdb)) {
857 DEBUG(DEBUG_CRIT, ("Failed to start syslog daemon\n"));
862 ctdb_lockdown_memory(ctdb);
864 /* go into a wait loop to allow other nodes to complete */
865 event_loop_wait(ctdb->ev);
867 DEBUG(DEBUG_CRIT,("event_loop_wait() returned. this should not happen\n"));
872 allocate a packet for use in daemon<->daemon communication
/*
 * Allocate and initialise a packet for daemon<->daemon communication
 * using the transport's allocate_pkt() method.
 *
 * length is first raised to at least slength, and the allocation size is
 * length rounded up to a multiple of CTDB_DS_ALIGNMENT (the mask
 * arithmetic assumes that alignment is a power of two). The first slength
 * bytes are zeroed and the common header fields (length, operation,
 * magic, version, generation, srcnode) are filled in. type is used as the
 * talloc name of the packet.
 *
 * Failure is logged when the transport is down (ctdb->methods == NULL)
 * or when the allocation fails.
 */
874 struct ctdb_req_header *_ctdb_transport_allocate(struct ctdb_context *ctdb,
876 enum ctdb_operation operation,
877 size_t length, size_t slength,
881 struct ctdb_req_header *hdr;
883 length = MAX(length, slength);
884 size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
886 if (ctdb->methods == NULL) {
887 DEBUG(DEBUG_INFO,(__location__ " Unable to allocate transport packet for operation %u of length %u. Transport is DOWN.\n",
888 operation, (unsigned)length));
892 hdr = (struct ctdb_req_header *)ctdb->methods->allocate_pkt(mem_ctx, size);
894 DEBUG(DEBUG_ERR,("Unable to allocate transport packet for operation %u of length %u\n",
895 operation, (unsigned)length));
898 talloc_set_name_const(hdr, type);
/* only the fixed-size structure part is cleared; the payload is left for the caller to fill */
899 memset(hdr, 0, slength);
/* the header records the requested length, not the aligned allocation size */
900 hdr->length = length;
901 hdr->operation = operation;
902 hdr->ctdb_magic = CTDB_MAGIC;
903 hdr->ctdb_version = CTDB_VERSION;
904 hdr->generation = ctdb->vnn_map->generation;
905 hdr->srcnode = ctdb->pnn;
/*
 * State kept for a control request issued by a local client while the
 * reply is outstanding. Instances are linked (next/prev) into the
 * destination node's pending_controls list so they can be failed by
 * ctdb_daemon_cancel_controls() when that node disconnects.
 */
910 struct daemon_control_state {
911 struct daemon_control_state *next, *prev;
/* client that issued the control and receives the reply */
912 struct ctdb_client *client;
/* the original request packet, re-parented onto this state */
913 struct ctdb_req_control *c;
/* destination node whose pending_controls list holds this state */
915 struct ctdb_node *node;
919 callback when a control reply comes in
/*
 * Called when the reply to a client-issued control arrives (or when the
 * control is cancelled). Builds a CTDB_REPLY_CONTROL packet for the
 * originating client: the reply data is copied first and the error
 * message text, when present, is appended directly after it without a
 * terminating NUL. private_data must be the struct daemon_control_state
 * created in daemon_request_control_from_client().
 */
921 static void daemon_control_callback(struct ctdb_context *ctdb,
922 int32_t status, TDB_DATA data,
923 const char *errormsg,
926 struct daemon_control_state *state = talloc_get_type(private_data,
927 struct daemon_control_state);
928 struct ctdb_client *client = state->client;
929 struct ctdb_reply_control *r;
933 /* construct a message to send to the client containing the data */
934 len = offsetof(struct ctdb_reply_control, data) + data.dsize;
/* room for the error text that follows the data */
936 len += strlen(errormsg);
938 r = ctdbd_allocate_pkt(ctdb, state, CTDB_REPLY_CONTROL, len,
939 struct ctdb_reply_control);
940 CTDB_NO_MEMORY_VOID(ctdb, r);
/* echo the client's request id so it can match the reply */
942 r->hdr.reqid = state->reqid;
944 r->datalen = data.dsize;
946 memcpy(&r->data[0], data.dptr, data.dsize);
948 r->errorlen = strlen(errormsg);
949 memcpy(&r->data[r->datalen], errormsg, r->errorlen);
952 ret = daemon_queue_send(client, &r->hdr);
959 fail all pending controls to a disconnected node
/*
 * Fail every control still pending towards node. Each state is unlinked
 * from node->pending_controls and completed through
 * daemon_control_callback() with status -1, no data and the error message
 * "node is disconnected". The loop runs until the list is empty.
 */
961 void ctdb_daemon_cancel_controls(struct ctdb_context *ctdb, struct ctdb_node *node)
963 struct daemon_control_state *state;
964 while ((state = node->pending_controls)) {
965 DLIST_REMOVE(node->pending_controls, state);
966 daemon_control_callback(ctdb, (uint32_t)-1, tdb_null,
967 "node is disconnected", state);
972 destroy a daemon_control_state
/*
 * talloc destructor for a daemon_control_state: unlinks the state from its
 * node's pending_controls list.
 * NOTE(review): any guard around the removal is not visible in this listing.
 */
974 static int daemon_control_destructor(struct daemon_control_state *state)
977 DLIST_REMOVE(state->node->pending_controls, state);
983 this is called when the ctdb daemon received a ctdb request control
984 from a local client over the unix domain socket
/*
 * Handle a CTDB_REQ_CONTROL packet received from a local client.
 *
 * CTDB_CURRENT_NODE is resolved to this node's pnn. A
 * daemon_control_state is allocated under the client, takes ownership of
 * the request packet, and - when the destination pnn is valid - is linked
 * into that node's pending_controls list. The control is then sent with
 * ctdb_daemon_send_control(), with daemon_control_callback() as the reply
 * handler. For CTDB_CTRL_FLAG_NOREPLY requests the state is moved under a
 * temporary context so that it is released when this function frees it.
 */
986 static void daemon_request_control_from_client(struct ctdb_client *client,
987 struct ctdb_req_control *c)
991 struct daemon_control_state *state;
992 TALLOC_CTX *tmp_ctx = talloc_new(client);
994 if (c->hdr.destnode == CTDB_CURRENT_NODE) {
995 c->hdr.destnode = client->ctdb->pnn;
998 state = talloc(client, struct daemon_control_state);
/* NOTE(review): presumably this macro returns when state is NULL; tmp_ctx would then not be freed on that path - confirm */
999 CTDB_NO_MEMORY_VOID(client->ctdb, state);
1001 state->client = client;
/* keep the request packet alive for as long as the state exists */
1002 state->c = talloc_steal(state, c);
1003 state->reqid = c->hdr.reqid;
1004 if (ctdb_validate_pnn(client->ctdb, c->hdr.destnode)) {
1005 state->node = client->ctdb->nodes[c->hdr.destnode];
1006 DLIST_ADD(state->node->pending_controls, state);
1011 talloc_set_destructor(state, daemon_control_destructor);
/* no reply will arrive, so let the state be freed together with tmp_ctx */
1013 if (c->flags & CTDB_CTRL_FLAG_NOREPLY) {
1014 talloc_steal(tmp_ctx, state);
1017 data.dptr = &c->data[0];
1018 data.dsize = c->datalen;
1019 res = ctdb_daemon_send_control(client->ctdb, c->hdr.destnode,
1020 c->srvid, c->opcode, client->client_id,
1022 data, daemon_control_callback,
1025 DEBUG(DEBUG_ERR,(__location__ " Failed to send control to remote node %u\n",
1029 talloc_free(tmp_ctx);
1033 register a call function
/*
 * Register call function fn under identifier id for the database with
 * id db_id. The database is looked up with find_ctdb_db(); a new
 * ctdb_registered_call is allocated under the database context and
 * added to ctdb_db->calls.
 *
 * NOTE(review): no check of the talloc() result is visible before the
 * entry is linked into the list - confirm whether one exists.
 */
1035 int ctdb_daemon_set_call(struct ctdb_context *ctdb, uint32_t db_id,
1036 ctdb_fn_t fn, int id)
1038 struct ctdb_registered_call *call;
1039 struct ctdb_db_context *ctdb_db;
1041 ctdb_db = find_ctdb_db(ctdb, db_id);
1042 if (ctdb_db == NULL) {
1046 call = talloc(ctdb_db, struct ctdb_registered_call);
1050 DLIST_ADD(ctdb_db->calls, call);
1057 this local messaging handler is ugly, but is needed to prevent
1058 recursion in ctdb_send_message() when the destination node is the
1059 same as the source node
/*
 * A message queued for delivery to this same node. Created by
 * ctdb_local_message() and consumed by ctdb_local_message_trigger(),
 * which dispatches it using the stored context, srvid and data.
 */
1061 struct ctdb_local_message {
1062 struct ctdb_context *ctdb;
/*
 * Timed event handler that delivers a locally queued message.
 * private_data must be the struct ctdb_local_message created by
 * ctdb_local_message(); the message is passed to ctdb_dispatch_message()
 * and a dispatch failure is logged with the srvid.
 */
1067 static void ctdb_local_message_trigger(struct event_context *ev, struct timed_event *te,
1068 struct timeval t, void *private_data)
1070 struct ctdb_local_message *m = talloc_get_type(private_data,
1071 struct ctdb_local_message);
1074 res = ctdb_dispatch_message(m->ctdb, m->srvid, m->data);
1076 DEBUG(DEBUG_ERR, (__location__ " Failed to dispatch message for srvid=%llu\n",
1077 (unsigned long long)m->srvid));
/*
 * Queue a message addressed to this node for deferred delivery. The
 * payload is duplicated onto a freshly allocated ctdb_local_message and
 * delivery is scheduled as a zero-delay timed event, so that dispatch
 * happens from the event loop rather than recursively from the sender.
 */
1082 static int ctdb_local_message(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data)
1084 struct ctdb_local_message *m;
1085 m = talloc(ctdb, struct ctdb_local_message);
1086 CTDB_NO_MEMORY(ctdb, m);
/* take a private copy of the payload, owned by m */
1091 m->data.dptr = talloc_memdup(m, m->data.dptr, m->data.dsize);
1092 if (m->data.dptr == NULL) {
1097 /* this needs to be done as an event to prevent recursion */
1098 event_add_timed(ctdb->ev, m, timeval_zero(), ctdb_local_message_trigger, m);
/*
 * Send a message with the given srvid and data to node pnn.
 *
 * Fails (with an info-level log) when the transport is down. A message
 * addressed to this node is delivered through ctdb_local_message();
 * otherwise a CTDB_REQ_MESSAGE packet is allocated from the transport,
 * filled with the payload and handed to ctdb_queue_packet().
 */
1105 int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t pnn,
1106 uint64_t srvid, TDB_DATA data)
1108 struct ctdb_req_message *r;
1111 if (ctdb->methods == NULL) {
1112 DEBUG(DEBUG_INFO,(__location__ " Failed to send message. Transport is DOWN\n"));
1116 /* see if this is a message to ourselves */
1117 if (pnn == ctdb->pnn) {
1118 return ctdb_local_message(ctdb, srvid, data);
/* packet size = fixed part of ctdb_req_message up to data[] plus the payload */
1121 len = offsetof(struct ctdb_req_message, data) + data.dsize;
1122 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_MESSAGE, len,
1123 struct ctdb_req_message);
1124 CTDB_NO_MEMORY(ctdb, r);
1126 r->hdr.destnode = pnn;
1128 r->datalen = data.dsize;
1129 memcpy(&r->data[0], data.dptr, data.dsize);
1131 ctdb_queue_packet(ctdb, &r->hdr);
/*
 * A notification registered by a client, kept in the doubly linked list
 * client->notify. When the entry is destroyed with its destructor still
 * installed, ctdb_client_notify_destructor() broadcasts the stored data
 * under the stored srvid.
 */
1139 struct ctdb_client_notify_list {
1140 struct ctdb_client_notify_list *next, *prev;
/* context used by the destructor to send the notification */
1141 struct ctdb_context *ctdb;
/*
 * talloc destructor for a ctdb_client_notify_list entry: sends the
 * registered notification data to CTDB_BROADCAST_CONNECTED under the
 * entry's srvid and logs if the send fails. Note that the informational
 * "Sending client notify message" line is emitted at DEBUG_ERR level.
 */
1147 static int ctdb_client_notify_destructor(struct ctdb_client_notify_list *nl)
1151 DEBUG(DEBUG_ERR,("Sending client notify message for srvid:%llu\n", (unsigned long long)nl->srvid));
1153 ret = ctdb_daemon_send_message(nl->ctdb, CTDB_BROADCAST_CONNECTED, (unsigned long long)nl->srvid, nl->data);
1155 DEBUG(DEBUG_ERR,("Failed to send client notify message\n"));
/*
 * Control handler: register a notification for the client identified by
 * client_id. indata must hold a struct ctdb_client_notify_register whose
 * total size equals the fixed part up to notify_data plus notify->len.
 *
 * The request is rejected (with an error log) when indata is too short or
 * has the wrong size, when the client cannot be found, or when the client
 * already has a notification for the same srvid. Otherwise a new list
 * entry with a private copy of the notify data is added to client->notify
 * and ctdb_client_notify_destructor() is installed on it.
 *
 * NOTE(review): the DEBUG_INFO line below reads notify->srvid before the
 * size checks on indata have run - confirm this is acceptable for short
 * or empty input.
 */
1161 int32_t ctdb_control_register_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1163 struct ctdb_client_notify_register *notify = (struct ctdb_client_notify_register *)indata.dptr;
1164 struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
1165 struct ctdb_client_notify_list *nl;
1167 DEBUG(DEBUG_INFO,("Register srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
/* must at least contain the fixed-size part of the request */
1169 if (indata.dsize < offsetof(struct ctdb_client_notify_register, notify_data)) {
1170 DEBUG(DEBUG_ERR,(__location__ " Too little data in control : %d\n", (int)indata.dsize));
/* the declared payload length must account for exactly the remaining bytes */
1174 if (indata.dsize != (notify->len + offsetof(struct ctdb_client_notify_register, notify_data))) {
1175 DEBUG(DEBUG_ERR,(__location__ " Wrong amount of data in control. Got %d, expected %d\n", (int)indata.dsize, (int)(notify->len + offsetof(struct ctdb_client_notify_register, notify_data))));
1180 if (client == NULL) {
1181 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
/* look for an existing registration with the same srvid */
1185 for(nl=client->notify; nl; nl=nl->next) {
1186 if (nl->srvid == notify->srvid) {
1191 DEBUG(DEBUG_ERR,(__location__ " Notification for srvid:%llu already exists for this client\n", (unsigned long long)notify->srvid));
1195 nl = talloc(client, struct ctdb_client_notify_list);
1196 CTDB_NO_MEMORY(ctdb, nl);
1198 nl->srvid = notify->srvid;
1199 nl->data.dsize = notify->len;
1200 nl->data.dptr = talloc_size(nl, nl->data.dsize);
1201 CTDB_NO_MEMORY(ctdb, nl->data.dptr);
1202 memcpy(nl->data.dptr, notify->notify_data, nl->data.dsize);
1204 DLIST_ADD(client->notify, nl);
1205 talloc_set_destructor(nl, ctdb_client_notify_destructor);
/*
 * Control handler: remove the notification registered for notify->srvid
 * by the client identified by client_id. The entry is unlinked from
 * client->notify and its destructor is cleared, so removing it does not
 * trigger the notification broadcast. Errors are logged when the client
 * cannot be found or has no notification for that srvid.
 *
 * NOTE(review): no validation of indata.dsize is visible before
 * notify->srvid is read - confirm against the caller.
 */
1210 int32_t ctdb_control_deregister_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1212 struct ctdb_client_notify_deregister *notify = (struct ctdb_client_notify_deregister *)indata.dptr;
1213 struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
1214 struct ctdb_client_notify_list *nl;
1216 DEBUG(DEBUG_INFO,("Deregister srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1218 if (client == NULL) {
1219 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
/* find the registration matching the requested srvid */
1223 for(nl=client->notify; nl; nl=nl->next) {
1224 if (nl->srvid == notify->srvid) {
1229 DEBUG(DEBUG_ERR,(__location__ " No notification for srvid:%llu found for this client\n", (unsigned long long)notify->srvid));
1233 DLIST_REMOVE(client->notify, nl);
/* clear the destructor so that freeing the entry does not send the notification */
1234 talloc_set_destructor(nl, NULL);
/*
 * Look up a connected client by process id: walks ctdb->client_pids and
 * returns the client of the first entry whose pid matches.
 */
1240 struct ctdb_client *ctdb_find_client_by_pid(struct ctdb_context *ctdb, pid_t pid)
1242 struct ctdb_client_pid_list *client_pid;
1244 for (client_pid = ctdb->client_pids; client_pid; client_pid=client_pid->next) {
1245 if (client_pid->pid == pid) {
1246 return client_pid->client;
1253 /* This control is used by samba when probing if a process (of a samba daemon)
1255 Samba does this when it needs/wants to check if a subrecord in one of the
1256 databases is still valid, or if it is stale and can be removed.
1257 If the node is in unhealthy or stopped state we just kill off the samba
1258 process holding this sub-record and return to the calling samba that
1259 the process does not exist.
1260 This allows us to forcefully recall subrecords registered by samba processes
1261 on banned and stopped nodes.
/*
 * Control handler: report whether process pid exists.
 *
 * When this node is flagged NODE_FLAGS_BANNED or NODE_FLAGS_STOPPED and
 * pid belongs to a connected client, that client is freed (dropping its
 * connection) and a notice is logged. In the general case the result of
 * kill(pid, 0) is returned, i.e. 0 when the process can be signalled.
 */
1263 int32_t ctdb_control_process_exists(struct ctdb_context *ctdb, pid_t pid)
1265 struct ctdb_client *client;
1267 if (ctdb->nodes[ctdb->pnn]->flags & (NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1268 client = ctdb_find_client_by_pid(ctdb, pid);
1269 if (client != NULL) {
1270 DEBUG(DEBUG_NOTICE,(__location__ " Killing client with pid:%d on banned/stopped node\n", (int)pid));
1271 talloc_free(client);
/* signal 0 performs the existence/permission check without delivering a signal */
1276 return kill(pid, 0);