4 Copyright (C) Andrew Tridgell 2006
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
22 #include "lib/tdb/include/tdb.h"
23 #include "lib/tevent/tevent.h"
24 #include "lib/util/dlinklist.h"
25 #include "system/network.h"
26 #include "system/filesys.h"
27 #include "system/wait.h"
28 #include "../include/ctdb_client.h"
29 #include "../include/ctdb_private.h"
30 #include <sys/socket.h>
/* List node kept per connected client: linked through next/prev and
   pointing back at both the daemon context and the owning ctdb_client. */
32 struct ctdb_client_pid_list {
33 struct ctdb_client_pid_list *next, *prev;
34 struct ctdb_context *ctdb;
36 struct ctdb_client *client;
39 static void daemon_incoming_packet(void *, struct ctdb_req_header *);
/* Logs a NOTICE that the daemon is shutting down.  Registered with
   atexit() from ctdb_start_transport(). */
41 static void print_exit_message(void)
43 DEBUG(DEBUG_NOTICE,("CTDB daemon shutting down\n"));
/* Timed-event callback that re-arms itself to fire again one second later.
   private_data is the ctdb_context.  The getpid()/ctdbd_pid comparison
   guards the re-arm; what that branch does is not visible in this listing. */
48 static void ctdb_time_tick(struct event_context *ev, struct timed_event *te,
49 struct timeval t, void *private_data)
51 struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
53 if (getpid() != ctdbd_pid) {
57 event_add_timed(ctdb->ev, ctdb,
58 timeval_current_ofs(1, 0),
59 ctdb_time_tick, ctdb);
62 /* Used to trigger a dummy event once per second, to make
63 * detection of hangs more reliable.
/* Schedules the first ctdb_time_tick() on ctdb->ev, one second from now. */
65 static void ctdb_start_time_tickd(struct ctdb_context *ctdb)
67 event_add_timed(ctdb->ev, ctdb,
68 timeval_current_ofs(1, 0),
69 ctdb_time_tick, ctdb);
73 /* called when the "startup" event script has finished */
/* Starts the transport and then the daemon's periodic services, in order:
   recovery daemon, exit logging, keepalive, health monitoring, tcp tickle
   updates, recovery-daemon ping listener and the one-second time tick.
   A missing transport (ctdb->methods == NULL) or a failing start() is
   reported through ctdb_fatal(). */
74 static void ctdb_start_transport(struct ctdb_context *ctdb)
76 if (ctdb->methods == NULL) {
77 DEBUG(DEBUG_ALERT,(__location__ " startup event finished but transport is DOWN.\n"));
78 ctdb_fatal(ctdb, "transport is not initialized but startup completed");
81 /* start the transport running */
82 if (ctdb->methods->start(ctdb) != 0) {
83 DEBUG(DEBUG_ALERT,("transport failed to start!\n"));
84 ctdb_fatal(ctdb, "transport failed to start");
87 /* start the recovery daemon process */
88 if (ctdb_start_recoverd(ctdb) != 0) {
89 DEBUG(DEBUG_ALERT,("Failed to start recovery daemon\n"));
93 /* Make sure we log something when the daemon terminates */
94 atexit(print_exit_message);
96 /* start monitoring for connected/disconnected nodes */
97 ctdb_start_keepalive(ctdb);
99 /* start monitoring for node health */
100 ctdb_start_monitoring(ctdb);
102 /* start periodic update of tcp tickle lists */
103 ctdb_start_tcp_tickle_update(ctdb);
105 /* start listening for recovery daemon pings */
106 ctdb_control_recd_ping(ctdb);
108 /* start listening to timer ticks */
109 ctdb_start_time_tickd(ctdb);
/* Despite the name, this installs SIG_IGN as the disposition for signum via
   sigaction(), so the signal is ignored.  The sigaction() result is not
   checked. */
112 static void block_signal(int signum)
114 struct sigaction act;
116 memset(&act, 0, sizeof(act));
118 act.sa_handler = SIG_IGN;
119 sigemptyset(&act.sa_mask);
120 sigaddset(&act.sa_mask, signum);
121 sigaction(signum, &act, NULL);
126 send a packet to a client
/* Queues hdr (hdr->length bytes) on the client's queue and counts it in
   client_packets_sent.  For CTDB_REQ_MESSAGE packets the current queue
   length is first compared with the max_queue_depth_drop_msg tunable; when
   it is exceeded an error is logged (the remainder of that branch is not
   visible in this listing).  Returns the result of ctdb_queue_send(). */
128 static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header *hdr)
130 CTDB_INCREMENT_STAT(client->ctdb, client_packets_sent);
131 if (hdr->operation == CTDB_REQ_MESSAGE) {
132 if (ctdb_queue_length(client->queue) > client->ctdb->tunable.max_queue_depth_drop_msg) {
133 DEBUG(DEBUG_ERR,("CTDB_REQ_MESSAGE queue full - killing client connection.\n"));
138 return ctdb_queue_send(client->queue, (uint8_t *)hdr, hdr->length);
142 message handler for when we are in daemon mode. This redirects the message
/* Message handler registered on behalf of a local client (private_data is
   the ctdb_client).  Wraps a copy of data in a freshly allocated
   CTDB_REQ_MESSAGE packet and queues it to that client.  The result of
   daemon_queue_send() is not examined on the visible lines. */
145 static void daemon_message_handler(struct ctdb_context *ctdb, uint64_t srvid,
146 TDB_DATA data, void *private_data)
148 struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client);
149 struct ctdb_req_message *r;
152 /* construct a message to send to the client containing the data */
153 len = offsetof(struct ctdb_req_message, data) + data.dsize;
154 r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE,
155 len, struct ctdb_req_message);
156 CTDB_NO_MEMORY_VOID(ctdb, r);
158 talloc_set_name_const(r, "req_message packet");
161 r->datalen = data.dsize;
162 memcpy(&r->data[0], data.dptr, data.dsize);
164 daemon_queue_send(client, &r->hdr);
170 this is called when the ctdb daemon received a ctdb request to
171 set the srvid from the client
/* Looks up the client by client_id and registers daemon_message_handler for
   srvid, with the client as both talloc parent and private data.  An
   unknown client_id is logged as an error; the registration outcome is
   logged as either an error or an info message. */
173 int daemon_register_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
175 struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
177 if (client == NULL) {
178 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_register_message_handler\n"));
181 res = ctdb_register_message_handler(ctdb, client, srvid, daemon_message_handler, client);
183 DEBUG(DEBUG_ERR,(__location__ " Failed to register handler %llu in daemon\n",
184 (unsigned long long)srvid));
186 DEBUG(DEBUG_INFO,(__location__ " Registered message handler for srvid=%llu\n",
187 (unsigned long long)srvid));
194 this is called when the ctdb daemon received a ctdb request to
195 remove a srvid from the client
/* Looks up the client by client_id (logging an error when it is unknown)
   and returns the result of ctdb_deregister_message_handler() for srvid
   with that client as the handler's private data. */
197 int daemon_deregister_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
199 struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
200 if (client == NULL) {
201 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_deregister_message_handler\n"));
204 return ctdb_deregister_message_handler(ctdb, srvid, client);
209 destroy a ctdb_client
/* talloc destructor for a ctdb_client (installed in ctdb_accept_client()).
   Runs the takeover hook, releases the client's request id and decrements
   num_clients.  If the client still has persistent updates in flight the
   node is put into CTDB_RECOVERY_ACTIVE; the same is done, and the
   database's transaction_active flag cleared, on the "transaction commit
   active" path (the condition guarding that path is not visible in this
   listing). */
211 static int ctdb_client_destructor(struct ctdb_client *client)
213 struct ctdb_db_context *ctdb_db;
215 ctdb_takeover_client_destructor_hook(client);
216 ctdb_reqid_remove(client->ctdb, client->client_id);
217 CTDB_DECREMENT_STAT(client->ctdb, num_clients);
219 if (client->num_persistent_updates != 0) {
220 DEBUG(DEBUG_ERR,(__location__ " Client disconnecting with %u persistent updates in flight. Starting recovery\n", client->num_persistent_updates));
221 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
223 ctdb_db = find_ctdb_db(client->ctdb, client->db_id);
225 DEBUG(DEBUG_ERR, (__location__ " client exit while transaction "
226 "commit active. Forcing recovery.\n"));
227 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
228 ctdb_db->transaction_active = false;
236 this is called when the ctdb daemon received a ctdb request message
237 from a local client over the unix domain socket
/* Routes a CTDB_REQ_MESSAGE received from a local client.  When the
   destination node is this node the packet is handed to
   ctdb_request_message(); otherwise the payload (c->data, c->datalen) is
   forwarded with ctdb_daemon_send_message() and a failure is logged. */
239 static void daemon_request_message_from_client(struct ctdb_client *client,
240 struct ctdb_req_message *c)
245 /* maybe the message is for another client on this node */
246 if (ctdb_get_pnn(client->ctdb)==c->hdr.destnode) {
247 ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c);
251 /* its for a remote node */
252 data.dptr = &c->data[0];
253 data.dsize = c->datalen;
254 res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode,
257 DEBUG(DEBUG_ERR,(__location__ " Failed to send message to remote node %u\n",
/* Per-call bookkeeping for a call issued by a local client: the issuing
   client, the ctdb_call being executed and the time the call started
   (used for latency accounting). */
263 struct daemon_call_state {
264 struct ctdb_client *client;
266 struct ctdb_call *call;
267 struct timeval start_time;
271 complete a call from a client
/* Completion callback for a call started by daemon_request_call_from_client();
   state->async.private_data is the daemon_call_state.  Re-parents dstate
   onto the client and the call onto dstate, collects the call result, builds
   a CTDB_REPLY_CALL packet carrying the reply data and queues it to the
   client.  Every visible exit path decrements pending_calls and records
   call latency. */
273 static void daemon_call_from_client_callback(struct ctdb_call_state *state)
275 struct daemon_call_state *dstate = talloc_get_type(state->async.private_data,
276 struct daemon_call_state);
277 struct ctdb_reply_call *r;
280 struct ctdb_client *client = dstate->client;
281 struct ctdb_db_context *ctdb_db = state->ctdb_db;
283 talloc_steal(client, dstate);
284 talloc_steal(dstate, dstate->call);
286 res = ctdb_daemon_call_recv(state, dstate->call);
288 DEBUG(DEBUG_ERR, (__location__ " ctdbd_call_recv() returned error\n"));
289 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
291 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 1", call_latency, dstate->start_time);
295 length = offsetof(struct ctdb_reply_call, data) + dstate->call->reply_data.dsize;
296 r = ctdbd_allocate_pkt(client->ctdb, dstate, CTDB_REPLY_CALL,
297 length, struct ctdb_reply_call);
299 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate reply_call in ctdb daemon\n"));
300 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
301 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 2", call_latency, dstate->start_time);
304 r->hdr.reqid = dstate->reqid;
305 r->datalen = dstate->call->reply_data.dsize;
306 memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen);
308 res = daemon_queue_send(client, &r->hdr);
310 /* client is dead - return immediately */
314 DEBUG(DEBUG_ERR, (__location__ " Failed to queue packet from daemon to client\n"));
316 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 3", call_latency, dstate->start_time);
317 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
/* Context handed to daemon_incoming_packet_wrap() when a client packet is
   requeued; carries the daemon context so the client can be looked up
   again by id at replay time. */
321 struct ctdb_daemon_packet_wrap {
322 struct ctdb_context *ctdb;
327 a wrapper to catch disconnected clients
/* Replays a requeued client packet.  p must be a ctdb_daemon_packet_wrap
   (a wrong talloc type is logged as CRIT).  The client is re-resolved from
   w->client_id; if it has disconnected in the meantime this is logged,
   otherwise the packet is passed to daemon_incoming_packet(). */
329 static void daemon_incoming_packet_wrap(void *p, struct ctdb_req_header *hdr)
331 struct ctdb_client *client;
332 struct ctdb_daemon_packet_wrap *w = talloc_get_type(p,
333 struct ctdb_daemon_packet_wrap);
335 DEBUG(DEBUG_CRIT,(__location__ " Bad packet type '%s'\n", talloc_get_name(p)));
339 client = ctdb_reqid_find(w->ctdb, w->client_id, struct ctdb_client);
340 if (client == NULL) {
341 DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
349 daemon_incoming_packet(client, hdr);
354 this is called when the ctdb daemon received a ctdb request call
355 from a local client over the unix domain socket
/* Handles a CTDB_REQ_CALL received from a local client.
   Steps visible in this listing: account the call in the statistics, look
   up the database, lock and fetch the record (the packet is requeued
   through daemon_incoming_packet_wrap() when the lock is not immediately
   available), allocate the per-call state, then dispatch the call locally
   when this node is the record's dmaster or remotely otherwise.  The
   result is delivered asynchronously through
   daemon_call_from_client_callback().
   pending_calls accounting: it is incremented once on entry; every failure
   or requeue path below and the completion callback decrement it again. */
357 static void daemon_request_call_from_client(struct ctdb_client *client,
358 struct ctdb_req_call *c)
360 struct ctdb_call_state *state;
361 struct ctdb_db_context *ctdb_db;
362 struct daemon_call_state *dstate;
363 struct ctdb_call *call;
364 struct ctdb_ltdb_header header;
367 struct ctdb_context *ctdb = client->ctdb;
368 struct ctdb_daemon_packet_wrap *w;
370 CTDB_INCREMENT_STAT(ctdb, total_calls);
/* must be an increment: all exit paths and the callback decrement it */
371 CTDB_INCREMENT_STAT(ctdb, pending_calls);
373 ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
375 DEBUG(DEBUG_ERR, (__location__ " Unknown database in request. db_id==0x%08x",
377 CTDB_DECREMENT_STAT(ctdb, pending_calls);
381 if (ctdb_db->unhealthy_reason) {
383 * this is just a warning, as the tdb should be empty anyway,
384 * and only persistent databases can be unhealthy, which doesn't
385 * use this code path
387 DEBUG(DEBUG_WARNING,("warn: db(%s) unhealty in daemon_request_call_from_client(): %s\n",
388 ctdb_db->db_name, ctdb_db->unhealthy_reason));
392 key.dsize = c->keylen;
394 w = talloc(ctdb, struct ctdb_daemon_packet_wrap);
395 CTDB_NO_MEMORY_VOID(ctdb, w);
398 w->client_id = client->client_id;
400 ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header,
401 (struct ctdb_req_header *)c, &data,
402 daemon_incoming_packet_wrap, w, True);
404 /* will retry later */
405 CTDB_DECREMENT_STAT(ctdb, pending_calls);
412 DEBUG(DEBUG_ERR,(__location__ " Unable to fetch record\n"));
413 CTDB_DECREMENT_STAT(ctdb, pending_calls);
417 dstate = talloc(client, struct daemon_call_state);
418 if (dstate == NULL) {
419 ret = ctdb_ltdb_unlock(ctdb_db, key);
421 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
424 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate dstate\n"));
425 CTDB_DECREMENT_STAT(ctdb, pending_calls);
428 dstate->start_time = timeval_current();
429 dstate->client = client;
430 dstate->reqid = c->hdr.reqid;
431 talloc_steal(dstate, data.dptr);
433 call = dstate->call = talloc_zero(dstate, struct ctdb_call);
435 ret = ctdb_ltdb_unlock(ctdb_db, key);
437 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
440 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate call\n"));
441 CTDB_DECREMENT_STAT(ctdb, pending_calls);
442 CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 1", call_latency, dstate->start_time);
446 call->call_id = c->callid;
448 call->call_data.dptr = c->data + c->keylen;
449 call->call_data.dsize = c->calldatalen;
450 call->flags = c->flags;
452 if (header.dmaster == ctdb->pnn) {
453 state = ctdb_call_local_send(ctdb_db, call, &header, &data);
455 state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
458 ret = ctdb_ltdb_unlock(ctdb_db, key);
460 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
464 DEBUG(DEBUG_ERR,(__location__ " Unable to setup call send\n"));
465 CTDB_DECREMENT_STAT(ctdb, pending_calls);
466 CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 2", call_latency, dstate->start_time);
469 talloc_steal(state, dstate);
470 talloc_steal(client, state);
472 state->async.fn = daemon_call_from_client_callback;
473 state->async.private_data = dstate;
477 static void daemon_request_control_from_client(struct ctdb_client *client,
478 struct ctdb_req_control *c);
480 /* data contains a packet from the client */
/* Dispatches one packet received from a local client (p is the
   ctdb_client).  The packet is parented to a temporary talloc context that
   is freed at the end; a handler that wants to keep the packet steals it
   elsewhere first.  The magic and version fields are validated, then the
   packet is routed by hdr->operation to the call, message or control
   handler, with the matching client.req_* statistic incremented.
   Unrecognized operations are logged as CRIT. */
481 static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr)
483 struct ctdb_client *client = talloc_get_type(p, struct ctdb_client);
485 struct ctdb_context *ctdb = client->ctdb;
487 /* place the packet as a child of a tmp_ctx. We then use
488 talloc_free() below to free it. If any of the calls want
489 to keep it, then they will steal it somewhere else, and the
490 talloc_free() will be a no-op */
491 tmp_ctx = talloc_new(client);
492 talloc_steal(tmp_ctx, hdr);
494 if (hdr->ctdb_magic != CTDB_MAGIC) {
495 ctdb_set_error(client->ctdb, "Non CTDB packet rejected in daemon\n");
499 if (hdr->ctdb_version != CTDB_VERSION) {
500 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
504 switch (hdr->operation) {
506 CTDB_INCREMENT_STAT(ctdb, client.req_call);
507 daemon_request_call_from_client(client, (struct ctdb_req_call *)hdr);
510 case CTDB_REQ_MESSAGE:
511 CTDB_INCREMENT_STAT(ctdb, client.req_message);
512 daemon_request_message_from_client(client, (struct ctdb_req_message *)hdr);
515 case CTDB_REQ_CONTROL:
516 CTDB_INCREMENT_STAT(ctdb, client.req_control);
517 daemon_request_control_from_client(client, (struct ctdb_req_control *)hdr);
521 DEBUG(DEBUG_CRIT,(__location__ " daemon: unrecognized operation %u\n",
526 talloc_free(tmp_ctx);
530 called when the daemon gets an incoming packet
/* Queue read callback for a client socket (args is the ctdb_client).
   Counts the packet in client_packets_recv, then validates it before
   dispatch: it must be at least one header long, hdr->length must equal
   the number of bytes received, and the magic and version fields must
   match.  Each failed check records an error with ctdb_set_error().
   A valid packet is logged at DEBUG level and handed to
   daemon_incoming_packet(), which takes over responsibility for freeing
   the data. */
532 static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
534 struct ctdb_client *client = talloc_get_type(args, struct ctdb_client);
535 struct ctdb_req_header *hdr;
542 CTDB_INCREMENT_STAT(client->ctdb, client_packets_recv);
544 if (cnt < sizeof(*hdr)) {
545 ctdb_set_error(client->ctdb, "Bad packet length %u in daemon\n",
549 hdr = (struct ctdb_req_header *)data;
550 if (cnt != hdr->length) {
/* newline belongs at the end of the message, not in the middle */
551 ctdb_set_error(client->ctdb, "Bad header length %u expected %u in daemon\n",
552 (unsigned)hdr->length, (unsigned)cnt);
556 if (hdr->ctdb_magic != CTDB_MAGIC) {
557 ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
561 if (hdr->ctdb_version != CTDB_VERSION) {
562 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
566 DEBUG(DEBUG_DEBUG,(__location__ " client request %u of type %u length %u from "
567 "node %u to %u\n", hdr->reqid, hdr->operation, hdr->length,
568 hdr->srcnode, hdr->destnode));
570 /* it is the responsibility of the incoming packet function to free 'data' */
571 daemon_incoming_packet(client, hdr);
/* talloc destructor for a ctdb_client_pid_list node: unlinks the node from
   ctdb->client_pids, provided that list is not already empty. */
575 static int ctdb_clientpid_destructor(struct ctdb_client_pid_list *client_pid)
577 if (client_pid->ctdb->client_pids != NULL) {
578 DLIST_REMOVE(client_pid->ctdb->client_pids, client_pid);
/* fd-event callback on the daemon's listening socket (private_data is the
   ctdb_context).  Accepts one connection, marks the new fd close-on-exec,
   allocates a ctdb_client, asks the socket for the peer's credentials to
   learn its pid, assigns the client a request id, records it in
   ctdb->client_pids, sets up its packet queue with ctdb_daemon_read_cb as
   the reader, installs the destructors and increments num_clients.
   The two "cr"/"crl" declarations and the two getsockopt() calls
   (SO_PEERID vs SO_PEERCRED) look like per-platform alternatives whose
   preprocessor lines are not visible in this listing - TODO confirm. */
585 static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde,
586 uint16_t flags, void *private_data)
588 struct sockaddr_un addr;
591 struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
592 struct ctdb_client *client;
593 struct ctdb_client_pid_list *client_pid;
595 struct peercred_struct cr;
596 socklen_t crl = sizeof(struct peercred_struct);
599 socklen_t crl = sizeof(struct ucred);
602 memset(&addr, 0, sizeof(addr));
604 fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len);
610 set_close_on_exec(fd);
612 DEBUG(DEBUG_DEBUG,(__location__ " Created SOCKET FD:%d to connected child\n", fd));
614 client = talloc_zero(ctdb, struct ctdb_client);
616 if (getsockopt(fd, SOL_SOCKET, SO_PEERID, &cr, &crl) == 0) {
618 if (getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &cr, &crl) == 0) {
620 DEBUG(DEBUG_INFO,("Connected client with pid:%u\n", (unsigned)cr.pid));
625 client->client_id = ctdb_reqid_new(ctdb, client);
626 client->pid = cr.pid;
628 client_pid = talloc(client, struct ctdb_client_pid_list);
629 if (client_pid == NULL) {
630 DEBUG(DEBUG_ERR,("Failed to allocate client pid structure\n"));
635 client_pid->ctdb = ctdb;
636 client_pid->pid = cr.pid;
637 client_pid->client = client;
639 DLIST_ADD(ctdb->client_pids, client_pid);
641 client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT,
642 ctdb_daemon_read_cb, client,
643 "client-%u", client->pid);
645 talloc_set_destructor(client, ctdb_client_destructor);
646 talloc_set_destructor(client_pid, ctdb_clientpid_destructor);
647 CTDB_INCREMENT_STAT(ctdb, num_clients);
653 create a unix domain socket and bind it
654 return a file descriptor open on the socket
/* Creates the daemon's unix domain stream socket and stores it in
   ctdb->daemon.sd.  The socket is made close-on-exec and non-blocking,
   bound to ctdb->daemon.name, restricted to the effective user/group with
   mode 0700, and put into listening state with a backlog of 100.
   The visible failure handling closes the socket and resets
   ctdb->daemon.sd to -1. */
656 static int ux_socket_bind(struct ctdb_context *ctdb)
658 struct sockaddr_un addr;
660 ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
661 if (ctdb->daemon.sd == -1) {
665 set_close_on_exec(ctdb->daemon.sd);
666 set_nonblocking(ctdb->daemon.sd);
668 memset(&addr, 0, sizeof(addr));
669 addr.sun_family = AF_UNIX;
/* copy at most size-1 bytes so the zeroed last byte always terminates
   sun_path; strncpy() does not terminate when the source fills the buffer */
670 strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path)-1);
672 if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
673 DEBUG(DEBUG_CRIT,("Unable to bind on ctdb socket '%s'\n", ctdb->daemon.name));
677 if (chown(ctdb->daemon.name, geteuid(), getegid()) != 0 ||
678 chmod(ctdb->daemon.name, 0700) != 0) {
679 DEBUG(DEBUG_CRIT,("Unable to secure ctdb socket '%s'\n", ctdb->daemon.name));
684 if (listen(ctdb->daemon.sd, 100) != 0) {
685 DEBUG(DEBUG_CRIT,("Unable to listen on ctdb socket '%s'\n", ctdb->daemon.name));
692 close(ctdb->daemon.sd);
693 ctdb->daemon.sd = -1;
/* SIGCHLD signal-event handler: reaps terminated children with a
   non-blocking waitpid(-1, ..., WNOHANG), logging a waitpid() error with
   errno and logging each reaped pid at DEBUG level.  The loop structure
   around waitpid() is not visible in this listing. */
697 static void sig_child_handler(struct event_context *ev,
698 struct signal_event *se, int signum, int count,
702 // struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
707 pid = waitpid(-1, &status, WNOHANG);
709 DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%d\n", errno));
713 DEBUG(DEBUG_DEBUG, ("SIGCHLD from %d\n", (int)pid));
/* Completion callback for the "setup" event script.  A failure is fatal
   (the status test itself is not visible in this listing).  On success the
   "setup" notification script is run and a no-reply CTDB_CONTROL_STARTUP
   control is broadcast to all nodes. */
718 static void ctdb_setup_event_callback(struct ctdb_context *ctdb, int status,
722 ctdb_fatal(ctdb, "Failed to run setup event\n");
725 ctdb_run_notification_script(ctdb, "setup");
727 /* tell all other nodes we've just started up */
728 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL,
729 0, CTDB_CONTROL_STARTUP, 0,
730 CTDB_CTRL_FLAG_NOREPLY,
731 tdb_null, NULL, NULL);
735 start the protocol going as a daemon
/* Daemon entry point.  Visible sequence:
     1. remove a stale socket file and bind the unix domain socket;
     2. optionally fork (the parent branch is not visible here), reopen all
        tdbs, point stdin at /dev/null and ignore SIGPIPE;
     3. record the daemon pid, optionally switch to the realtime scheduler;
     4. create the event context and set up tevent/child logging and
        statistics;
     5. start in CTDB_RECOVERY_ACTIVE, select and initialise the transport
        ("tcp", or "ib" when built with USE_INFINIBAND);
     6. load the public address list, attach to existing databases, run the
        "init" event and freeze the databases;
     7. start accepting clients, release previously held IPs unless IP
        failover is disabled, start the transport and services, install the
        SIGCHLD handler, schedule the "setup" event, start the syslog
        daemon (condition not visible), lock down memory;
     8. enter the event loop, which is not expected to return.
   Most failures on the visible lines are escalated through ctdb_fatal()
   or logged at ALERT/CRIT level. */
737 int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork, bool use_syslog, const char *public_address_list)
740 struct fd_event *fde;
741 const char *domain_socket_name;
742 struct signal_event *se;
744 /* get rid of any old sockets */
745 unlink(ctdb->daemon.name);
747 /* create a unix domain stream socket to listen to */
748 res = ux_socket_bind(ctdb);
750 DEBUG(DEBUG_ALERT,(__location__ " Failed to open CTDB unix domain socket\n"));
754 if (do_fork && fork()) {
758 tdb_reopen_all(False);
763 if (open("/dev/null", O_RDONLY) != 0) {
764 DEBUG(DEBUG_ALERT,(__location__ " Failed to setup stdin on /dev/null\n"));
768 block_signal(SIGPIPE);
770 ctdbd_pid = getpid();
773 DEBUG(DEBUG_ERR, ("Starting CTDBD as pid : %u\n", ctdbd_pid));
775 if (ctdb->do_setsched) {
776 /* try to set us up as realtime */
777 ctdb_set_scheduler(ctdb);
780 /* ensure the socket is deleted on exit of the daemon */
781 domain_socket_name = talloc_strdup(talloc_autofree_context(), ctdb->daemon.name);
782 if (domain_socket_name == NULL) {
783 DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup failed.\n"));
787 ctdb->ev = event_context_init(NULL);
788 tevent_loop_allow_nesting(ctdb->ev);
789 ret = ctdb_init_tevent_logging(ctdb);
791 DEBUG(DEBUG_ALERT,("Failed to initialize TEVENT logging\n"));
795 ctdb_set_child_logging(ctdb);
797 /* initialize statistics collection */
798 ctdb_statistics_init(ctdb);
800 /* force initial recovery for election */
801 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
803 if (strcmp(ctdb->transport, "tcp") == 0) {
804 int ctdb_tcp_init(struct ctdb_context *);
805 ret = ctdb_tcp_init(ctdb);
807 #ifdef USE_INFINIBAND
808 if (strcmp(ctdb->transport, "ib") == 0) {
809 int ctdb_ibw_init(struct ctdb_context *);
810 ret = ctdb_ibw_init(ctdb);
814 DEBUG(DEBUG_ERR,("Failed to initialise transport '%s'\n", ctdb->transport));
818 if (ctdb->methods == NULL) {
819 DEBUG(DEBUG_ALERT,(__location__ " Can not initialize transport. ctdb->methods is NULL\n"));
820 ctdb_fatal(ctdb, "transport is unavailable. can not initialize.");
823 /* initialise the transport */
824 if (ctdb->methods->initialise(ctdb) != 0) {
825 ctdb_fatal(ctdb, "transport failed to initialise");
827 if (public_address_list) {
828 ret = ctdb_set_public_addresses(ctdb, public_address_list);
830 DEBUG(DEBUG_ALERT,("Unable to setup public address list\n"));
836 /* attach to existing databases */
837 if (ctdb_attach_databases(ctdb) != 0) {
838 ctdb_fatal(ctdb, "Failed to attach to databases\n");
841 ret = ctdb_event_script(ctdb, CTDB_EVENT_INIT);
843 ctdb_fatal(ctdb, "Failed to run init event\n");
845 ctdb_run_notification_script(ctdb, "init");
847 /* start frozen, then let the first election sort things out */
848 if (ctdb_blocking_freeze(ctdb)) {
849 ctdb_fatal(ctdb, "Failed to get initial freeze\n");
852 /* now start accepting clients, only can do this once frozen */
853 fde = event_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd,
855 ctdb_accept_client, ctdb);
856 tevent_fd_set_auto_close(fde);
858 /* release any IPs we hold from previous runs of the daemon */
859 if (ctdb->tunable.disable_ip_failover == 0) {
860 ctdb_release_all_ips(ctdb);
863 /* start the transport going */
864 ctdb_start_transport(ctdb);
866 /* set up a handler to pick up sigchld */
867 se = event_add_signal(ctdb->ev, ctdb,
872 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD\n"));
876 ret = ctdb_event_script_callback(ctdb,
878 ctdb_setup_event_callback,
884 DEBUG(DEBUG_CRIT,("Failed to set up 'setup' event\n"));
889 if (start_syslog_daemon(ctdb)) {
890 DEBUG(DEBUG_CRIT, ("Failed to start syslog daemon\n"));
895 ctdb_lockdown_memory(ctdb);
897 /* go into a wait loop to allow other nodes to complete */
898 event_loop_wait(ctdb->ev);
900 DEBUG(DEBUG_CRIT,("event_loop_wait() returned. this should not happen\n"));
905 allocate a packet for use in daemon<->daemon communication
/* Allocates a packet through the transport's allocate_pkt() hook and fills
   in the common header.  The requested length is raised to at least
   slength and the allocation size is rounded up to a multiple of
   CTDB_DS_ALIGNMENT (the mask arithmetic assumes that constant is a power
   of two).  Only the first slength bytes are zeroed.  hdr->length holds
   the unrounded length.  When the transport is down (ctdb->methods ==
   NULL) or allocation fails, this is logged; the values returned on those
   paths are not visible in this listing. */
907 struct ctdb_req_header *_ctdb_transport_allocate(struct ctdb_context *ctdb,
909 enum ctdb_operation operation,
910 size_t length, size_t slength,
914 struct ctdb_req_header *hdr;
916 length = MAX(length, slength);
917 size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
919 if (ctdb->methods == NULL) {
920 DEBUG(DEBUG_INFO,(__location__ " Unable to allocate transport packet for operation %u of length %u. Transport is DOWN.\n",
921 operation, (unsigned)length));
925 hdr = (struct ctdb_req_header *)ctdb->methods->allocate_pkt(mem_ctx, size);
927 DEBUG(DEBUG_ERR,("Unable to allocate transport packet for operation %u of length %u\n",
928 operation, (unsigned)length));
931 talloc_set_name_const(hdr, type);
932 memset(hdr, 0, slength);
933 hdr->length = length;
934 hdr->operation = operation;
935 hdr->ctdb_magic = CTDB_MAGIC;
936 hdr->ctdb_version = CTDB_VERSION;
937 hdr->generation = ctdb->vnn_map->generation;
938 hdr->srcnode = ctdb->pnn;
/* State for one control request issued by a local client.  next/prev link
   it into the destination node's pending_controls list; client is the
   requester, c the stolen request packet and node the destination node
   whose list it is on. */
943 struct daemon_control_state {
944 struct daemon_control_state *next, *prev;
945 struct ctdb_client *client;
946 struct ctdb_req_control *c;
948 struct ctdb_node *node;
952 callback when a control reply comes in
/* Called when the reply to a client-issued control arrives (private_data
   is the daemon_control_state).  Builds a CTDB_REPLY_CONTROL packet whose
   data area holds the reply payload followed by the error message text
   (copied without a terminating NUL: errorlen is strlen(errormsg)), and
   queues it to the requesting client under the original reqid.
   The conditions guarding the errormsg handling are not visible in this
   listing - presumably errormsg may be NULL; verify against callers. */
954 static void daemon_control_callback(struct ctdb_context *ctdb,
955 int32_t status, TDB_DATA data,
956 const char *errormsg,
959 struct daemon_control_state *state = talloc_get_type(private_data,
960 struct daemon_control_state);
961 struct ctdb_client *client = state->client;
962 struct ctdb_reply_control *r;
966 /* construct a message to send to the client containing the data */
967 len = offsetof(struct ctdb_reply_control, data) + data.dsize;
969 len += strlen(errormsg);
971 r = ctdbd_allocate_pkt(ctdb, state, CTDB_REPLY_CONTROL, len,
972 struct ctdb_reply_control);
973 CTDB_NO_MEMORY_VOID(ctdb, r);
975 r->hdr.reqid = state->reqid;
977 r->datalen = data.dsize;
979 memcpy(&r->data[0], data.dptr, data.dsize);
981 r->errorlen = strlen(errormsg);
982 memcpy(&r->data[r->datalen], errormsg, r->errorlen);
985 ret = daemon_queue_send(client, &r->hdr);
992 fail all pending controls to a disconnected node
/* Drains node->pending_controls: each pending control is unlinked from the
   list and completed through daemon_control_callback() with status -1,
   no data and the error text "node is disconnected". */
994 void ctdb_daemon_cancel_controls(struct ctdb_context *ctdb, struct ctdb_node *node)
996 struct daemon_control_state *state;
997 while ((state = node->pending_controls)) {
998 DLIST_REMOVE(node->pending_controls, state);
999 daemon_control_callback(ctdb, (uint32_t)-1, tdb_null,
1000 "node is disconnected", state);
1005 destroy a daemon_control_state
/* talloc destructor for a daemon_control_state: unlinks the state from its
   node's pending_controls list.  Any guard on state->node is not visible
   in this listing. */
1007 static int daemon_control_destructor(struct daemon_control_state *state)
1010 DLIST_REMOVE(state->node->pending_controls, state);
1016 this is called when the ctdb daemon received a ctdb request control
1017 from a local client over the unix domain socket
/* Handles a CTDB_REQ_CONTROL received from a local client.
   CTDB_CURRENT_NODE is resolved to this node's pnn.  A
   daemon_control_state is allocated under the client, takes ownership of
   the request packet, and - when the destination pnn is valid - is linked
   into that node's pending_controls list.  For NOREPLY controls the state
   is moved under tmp_ctx so it is released by the talloc_free() at the
   end.  The control is then sent with daemon_control_callback as the
   reply handler; a send failure is logged.
   NOTE(review): tmp_ctx is created before the CTDB_NO_MEMORY_VOID() check
   on state; if that macro returns early, tmp_ctx stays allocated under
   the client until the client is freed - confirm. */
1019 static void daemon_request_control_from_client(struct ctdb_client *client,
1020 struct ctdb_req_control *c)
1024 struct daemon_control_state *state;
1025 TALLOC_CTX *tmp_ctx = talloc_new(client);
1027 if (c->hdr.destnode == CTDB_CURRENT_NODE) {
1028 c->hdr.destnode = client->ctdb->pnn;
1031 state = talloc(client, struct daemon_control_state);
1032 CTDB_NO_MEMORY_VOID(client->ctdb, state);
1034 state->client = client;
1035 state->c = talloc_steal(state, c);
1036 state->reqid = c->hdr.reqid;
1037 if (ctdb_validate_pnn(client->ctdb, c->hdr.destnode)) {
1038 state->node = client->ctdb->nodes[c->hdr.destnode];
1039 DLIST_ADD(state->node->pending_controls, state);
1044 talloc_set_destructor(state, daemon_control_destructor);
1046 if (c->flags & CTDB_CTRL_FLAG_NOREPLY) {
1047 talloc_steal(tmp_ctx, state);
1050 data.dptr = &c->data[0];
1051 data.dsize = c->datalen;
1052 res = ctdb_daemon_send_control(client->ctdb, c->hdr.destnode,
1053 c->srvid, c->opcode, client->client_id,
1055 data, daemon_control_callback,
1058 DEBUG(DEBUG_ERR,(__location__ " Failed to send control to remote node %u\n",
1062 talloc_free(tmp_ctx);
1066 register a call function
/* Registers call function fn under id for the database identified by
   db_id: looks up the database, allocates a ctdb_registered_call owned by
   it and adds it to ctdb_db->calls.  The assignments of fn/id and the
   return values are not visible in this listing. */
1068 int ctdb_daemon_set_call(struct ctdb_context *ctdb, uint32_t db_id,
1069 ctdb_fn_t fn, int id)
1071 struct ctdb_registered_call *call;
1072 struct ctdb_db_context *ctdb_db;
1074 ctdb_db = find_ctdb_db(ctdb, db_id);
1075 if (ctdb_db == NULL) {
1079 call = talloc(ctdb_db, struct ctdb_registered_call);
1083 DLIST_ADD(ctdb_db->calls, call);
1090 this local messaging handler is ugly, but is needed to prevent
1091 recursion in ctdb_send_message() when the destination node is the
1092 same as the source node
/* Deferred message addressed to this node; holds the daemon context needed
   by ctdb_local_message_trigger() to dispatch it. */
1094 struct ctdb_local_message {
1095 struct ctdb_context *ctdb;
/* Timed-event callback that delivers a deferred local message
   (private_data is the ctdb_local_message) via ctdb_dispatch_message(),
   logging an error when dispatch fails. */
1100 static void ctdb_local_message_trigger(struct event_context *ev, struct timed_event *te,
1101 struct timeval t, void *private_data)
1103 struct ctdb_local_message *m = talloc_get_type(private_data,
1104 struct ctdb_local_message);
1107 res = ctdb_dispatch_message(m->ctdb, m->srvid, m->data);
1109 DEBUG(DEBUG_ERR, (__location__ " Failed to dispatch message for srvid=%llu\n",
1110 (unsigned long long)m->srvid));
/* Queues a message addressed to this node for delivery from the event
   loop instead of delivering it inline.  A ctdb_local_message is
   allocated, the payload is duplicated under it with talloc_memdup(), and
   a zero-delay timed event parented to the message is scheduled to run
   ctdb_local_message_trigger(). */
1115 static int ctdb_local_message(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data)
1117 struct ctdb_local_message *m;
1118 m = talloc(ctdb, struct ctdb_local_message);
1119 CTDB_NO_MEMORY(ctdb, m);
1124 m->data.dptr = talloc_memdup(m, m->data.dptr, m->data.dsize);
1125 if (m->data.dptr == NULL) {
1130 /* this needs to be done as an event to prevent recursion */
1131 event_add_timed(ctdb->ev, m, timeval_zero(), ctdb_local_message_trigger, m);
/* Sends a message with the given srvid and payload to node pnn.
   Logs and bails out when the transport is down.  A message to this node
   is handed to ctdb_local_message().  Otherwise a CTDB_REQ_MESSAGE
   transport packet is allocated, addressed to pnn, filled with a copy of
   the payload and queued with ctdb_queue_packet(). */
1138 int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t pnn,
1139 uint64_t srvid, TDB_DATA data)
1141 struct ctdb_req_message *r;
1144 if (ctdb->methods == NULL) {
1145 DEBUG(DEBUG_INFO,(__location__ " Failed to send message. Transport is DOWN\n"));
1149 /* see if this is a message to ourselves */
1150 if (pnn == ctdb->pnn) {
1151 return ctdb_local_message(ctdb, srvid, data);
1154 len = offsetof(struct ctdb_req_message, data) + data.dsize;
1155 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_MESSAGE, len,
1156 struct ctdb_req_message);
1157 CTDB_NO_MEMORY(ctdb, r);
1159 r->hdr.destnode = pnn;
1161 r->datalen = data.dsize;
1162 memcpy(&r->data[0], data.dptr, data.dsize);
1164 ctdb_queue_packet(ctdb, &r->hdr);
/* One registered disconnect notification of a client; linked through
   next/prev into the client's notify list and carrying the daemon context
   used by its destructor to send the message. */
1172 struct ctdb_client_notify_list {
1173 struct ctdb_client_notify_list *next, *prev;
1174 struct ctdb_context *ctdb;
/* talloc destructor for a notification entry: broadcasts the stored
   payload under the entry's srvid to all connected nodes
   (CTDB_BROADCAST_CONNECTED) and logs a failure to send. */
1180 static int ctdb_client_notify_destructor(struct ctdb_client_notify_list *nl)
1184 DEBUG(DEBUG_ERR,("Sending client notify message for srvid:%llu\n", (unsigned long long)nl->srvid));
1186 ret = ctdb_daemon_send_message(nl->ctdb, CTDB_BROADCAST_CONNECTED, (unsigned long long)nl->srvid, nl->data);
1188 DEBUG(DEBUG_ERR,("Failed to send client notify message\n"));
/* Control handler: registers a notification message to be sent when the
   client identified by client_id goes away.  indata is interpreted as a
   ctdb_client_notify_register; its size must be at least the fixed header
   and exactly header + notify->len.  The client must be local, and the
   srvid must not already be registered for it.  On success a list entry
   owned by the client is created with a private copy of the payload,
   added to client->notify and given ctdb_client_notify_destructor.
   NOTE(review): the first DEBUG() reads notify->srvid before either
   indata.dsize check has run, so a short control payload is dereferenced
   before it is validated - consider moving that log after the checks. */
1194 int32_t ctdb_control_register_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1196 struct ctdb_client_notify_register *notify = (struct ctdb_client_notify_register *)indata.dptr;
1197 struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
1198 struct ctdb_client_notify_list *nl;
1200 DEBUG(DEBUG_INFO,("Register srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1202 if (indata.dsize < offsetof(struct ctdb_client_notify_register, notify_data)) {
1203 DEBUG(DEBUG_ERR,(__location__ " Too little data in control : %d\n", (int)indata.dsize));
1207 if (indata.dsize != (notify->len + offsetof(struct ctdb_client_notify_register, notify_data))) {
1208 DEBUG(DEBUG_ERR,(__location__ " Wrong amount of data in control. Got %d, expected %d\n", (int)indata.dsize, (int)(notify->len + offsetof(struct ctdb_client_notify_register, notify_data))));
1213 if (client == NULL) {
1214 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1218 for(nl=client->notify; nl; nl=nl->next) {
1219 if (nl->srvid == notify->srvid) {
1224 DEBUG(DEBUG_ERR,(__location__ " Notification for srvid:%llu already exists for this client\n", (unsigned long long)notify->srvid));
1228 nl = talloc(client, struct ctdb_client_notify_list);
1229 CTDB_NO_MEMORY(ctdb, nl);
1231 nl->srvid = notify->srvid;
1232 nl->data.dsize = notify->len;
1233 nl->data.dptr = talloc_size(nl, nl->data.dsize);
1234 CTDB_NO_MEMORY(ctdb, nl->data.dptr);
1235 memcpy(nl->data.dptr, notify->notify_data, nl->data.dsize);
1237 DLIST_ADD(client->notify, nl);
1238 talloc_set_destructor(nl, ctdb_client_notify_destructor);
/* Control handler: removes a previously registered notification for the
   client identified by client_id.  The client must be local and must have
   an entry for notify->srvid; that entry is unlinked from client->notify
   and its destructor cleared, so the notification message is not sent for
   it.
   NOTE(review): no check of indata.dsize is visible before notify->srvid
   is read - confirm the payload size is validated. */
1243 int32_t ctdb_control_deregister_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1245 struct ctdb_client_notify_deregister *notify = (struct ctdb_client_notify_deregister *)indata.dptr;
1246 struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
1247 struct ctdb_client_notify_list *nl;
1249 DEBUG(DEBUG_INFO,("Deregister srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1251 if (client == NULL) {
1252 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1256 for(nl=client->notify; nl; nl=nl->next) {
1257 if (nl->srvid == notify->srvid) {
1262 DEBUG(DEBUG_ERR,(__location__ " No notification for srvid:%llu found for this client\n", (unsigned long long)notify->srvid));
1266 DLIST_REMOVE(client->notify, nl);
1267 talloc_set_destructor(nl, NULL);
/* Walks ctdb->client_pids and returns the client of the first entry whose
   pid equals the given pid.  The result when no entry matches is not
   visible in this listing. */
1273 struct ctdb_client *ctdb_find_client_by_pid(struct ctdb_context *ctdb, pid_t pid)
1275 struct ctdb_client_pid_list *client_pid;
1277 for (client_pid = ctdb->client_pids; client_pid; client_pid=client_pid->next) {
1278 if (client_pid->pid == pid) {
1279 return client_pid->client;
1286 /* This control is used by samba when probing if a process (of a samba daemon)
1288 Samba does this when it needs/wants to check if a subrecord in one of the
1289 databases is still valid, or if it is stale and can be removed.
1290 If the node is in banned or stopped state we just kill off the samba
1291 process holding this sub-record and return to the calling samba that
1292 the process does not exist.
1293 This allows us to forcefully recall subrecords registered by samba processes
1294 on banned and stopped nodes.
/* Control handler probing whether process pid exists.  When this node is
   flagged BANNED or STOPPED and pid belongs to a connected client, that
   client is logged and freed (what follows inside that branch is not
   visible in this listing).  Otherwise the answer is the result of
   kill(pid, 0), which is 0 when the signal could be sent. */
1296 int32_t ctdb_control_process_exists(struct ctdb_context *ctdb, pid_t pid)
1298 struct ctdb_client *client;
1300 if (ctdb->nodes[ctdb->pnn]->flags & (NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1301 client = ctdb_find_client_by_pid(ctdb, pid);
1302 if (client != NULL) {
1303 DEBUG(DEBUG_NOTICE,(__location__ " Killing client with pid:%d on banned/stopped node\n", (int)pid));
1304 talloc_free(client);
1309 return kill(pid, 0);