/*
   Copyright (C) Andrew Tridgell  2006

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 3 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, see <http://www.gnu.org/licenses/>.
*/

#include "lib/tdb/include/tdb.h"
#include "lib/events/events.h"
#include "lib/util/dlinklist.h"
#include "system/network.h"
#include "system/filesys.h"
#include "system/wait.h"
#include "../include/ctdb.h"
#include "../include/ctdb_private.h"
#include <sys/socket.h>
#include <errno.h>
#include <signal.h>
#include <string.h>
#include <unistd.h>

/*
  one entry in the daemon's list of connected client processes
  (ctdb->client_pids), used to map a process id back to its ctdb_client
 */
struct ctdb_client_pid_list {
	struct ctdb_client_pid_list *next, *prev;	/* DLIST linkage */
	struct ctdb_context *ctdb;			/* daemon context owning the list */
	pid_t pid;					/* process id of the connected client */
	struct ctdb_client *client;			/* the client connection for that pid */
};

/* forward declaration: the definition appears further down in this file */
static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr);

41 static void print_exit_message(void)
43 DEBUG(DEBUG_NOTICE,("CTDB daemon shutting down\n"));
46 /* called when the "startup" event script has finished */
47 static void ctdb_start_transport(struct ctdb_context *ctdb)
49 if (ctdb->methods == NULL) {
50 DEBUG(DEBUG_ALERT,(__location__ " startup event finished but transport is DOWN.\n"));
51 ctdb_fatal(ctdb, "transport is not initialized but startup completed");
54 /* start the transport running */
55 if (ctdb->methods->start(ctdb) != 0) {
56 DEBUG(DEBUG_ALERT,("transport failed to start!\n"));
57 ctdb_fatal(ctdb, "transport failed to start");
60 /* start the recovery daemon process */
61 if (ctdb_start_recoverd(ctdb) != 0) {
62 DEBUG(DEBUG_ALERT,("Failed to start recovery daemon\n"));
66 /* Make sure we log something when the daemon terminates */
67 atexit(print_exit_message);
69 /* start monitoring for connected/disconnected nodes */
70 ctdb_start_keepalive(ctdb);
72 /* start monitoring for node health */
73 ctdb_start_monitoring(ctdb);
75 /* start periodic update of tcp tickle lists */
76 ctdb_start_tcp_tickle_update(ctdb);
78 /* start listening for recovery daemon pings */
79 ctdb_control_recd_ping(ctdb);
/*
  make the process ignore a signal

  despite the name the signal is not masked: its disposition is set to
  SIG_IGN, so deliveries of signum are discarded.
 */
static void block_signal(int signum)
{
	struct sigaction act;

	memset(&act, 0, sizeof(act));

	act.sa_handler = SIG_IGN;
	sigemptyset(&act.sa_mask);
	sigaddset(&act.sa_mask, signum);
	sigaction(signum, &act, NULL);
}

96 send a packet to a client
98 static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header *hdr)
100 client->ctdb->statistics.client_packets_sent++;
101 if (hdr->operation == CTDB_REQ_MESSAGE) {
102 if (ctdb_queue_length(client->queue) > client->ctdb->tunable.max_queue_depth_drop_msg) {
103 DEBUG(DEBUG_ERR,("CTDB_REQ_MESSAGE queue full - killing client connection.\n"));
108 return ctdb_queue_send(client->queue, (uint8_t *)hdr, hdr->length);
112 message handler for when we are in daemon mode. This redirects the message
115 static void daemon_message_handler(struct ctdb_context *ctdb, uint64_t srvid,
116 TDB_DATA data, void *private_data)
118 struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client);
119 struct ctdb_req_message *r;
122 /* construct a message to send to the client containing the data */
123 len = offsetof(struct ctdb_req_message, data) + data.dsize;
124 r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE,
125 len, struct ctdb_req_message);
126 CTDB_NO_MEMORY_VOID(ctdb, r);
128 talloc_set_name_const(r, "req_message packet");
131 r->datalen = data.dsize;
132 memcpy(&r->data[0], data.dptr, data.dsize);
134 daemon_queue_send(client, &r->hdr);
140 this is called when the ctdb daemon received a ctdb request to
141 set the srvid from the client
143 int daemon_register_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
145 struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
147 if (client == NULL) {
148 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_register_message_handler\n"));
151 res = ctdb_register_message_handler(ctdb, client, srvid, daemon_message_handler, client);
153 DEBUG(DEBUG_ERR,(__location__ " Failed to register handler %llu in daemon\n",
154 (unsigned long long)srvid));
156 DEBUG(DEBUG_INFO,(__location__ " Registered message handler for srvid=%llu\n",
157 (unsigned long long)srvid));
164 this is called when the ctdb daemon received a ctdb request to
165 remove a srvid from the client
167 int daemon_deregister_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
169 struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
170 if (client == NULL) {
171 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_deregister_message_handler\n"));
174 return ctdb_deregister_message_handler(ctdb, srvid, client);
179 destroy a ctdb_client
181 static int ctdb_client_destructor(struct ctdb_client *client)
183 struct ctdb_db_context *ctdb_db;
185 ctdb_takeover_client_destructor_hook(client);
186 ctdb_reqid_remove(client->ctdb, client->client_id);
187 if (client->ctdb->statistics.num_clients) {
188 client->ctdb->statistics.num_clients--;
191 if (client->num_persistent_updates != 0) {
192 DEBUG(DEBUG_ERR,(__location__ " Client disconnecting with %u persistent updates in flight. Starting recovery\n", client->num_persistent_updates));
193 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
195 ctdb_db = find_ctdb_db(client->ctdb, client->db_id);
197 DEBUG(DEBUG_ERR, (__location__ " client exit while transaction "
198 "commit active. Forcing recovery.\n"));
199 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
200 ctdb_db->transaction_active = false;
208 this is called when the ctdb daemon received a ctdb request message
209 from a local client over the unix domain socket
211 static void daemon_request_message_from_client(struct ctdb_client *client,
212 struct ctdb_req_message *c)
217 /* maybe the message is for another client on this node */
218 if (ctdb_get_pnn(client->ctdb)==c->hdr.destnode) {
219 ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c);
223 /* its for a remote node */
224 data.dptr = &c->data[0];
225 data.dsize = c->datalen;
226 res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode,
229 DEBUG(DEBUG_ERR,(__location__ " Failed to send message to remote node %u\n",
/* per-call state kept while a client's CTDB_REQ_CALL is being processed */
struct daemon_call_state {
	struct ctdb_client *client;	/* client that issued the call */
	uint32_t reqid;			/* request id to echo in the reply */
	struct ctdb_call *call;		/* the call being executed */
	struct timeval start_time;	/* when processing started, for latency stats */
};

243 complete a call from a client
245 static void daemon_call_from_client_callback(struct ctdb_call_state *state)
247 struct daemon_call_state *dstate = talloc_get_type(state->async.private_data,
248 struct daemon_call_state);
249 struct ctdb_reply_call *r;
252 struct ctdb_client *client = dstate->client;
253 struct ctdb_db_context *ctdb_db = state->ctdb_db;
255 talloc_steal(client, dstate);
256 talloc_steal(dstate, dstate->call);
258 res = ctdb_daemon_call_recv(state, dstate->call);
260 DEBUG(DEBUG_ERR, (__location__ " ctdbd_call_recv() returned error\n"));
261 if (client->ctdb->statistics.pending_calls > 0) {
262 client->ctdb->statistics.pending_calls--;
264 ctdb_latency(ctdb_db, "call_from_client_cb 1", &client->ctdb->statistics.max_call_latency, dstate->start_time);
268 length = offsetof(struct ctdb_reply_call, data) + dstate->call->reply_data.dsize;
269 r = ctdbd_allocate_pkt(client->ctdb, dstate, CTDB_REPLY_CALL,
270 length, struct ctdb_reply_call);
272 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate reply_call in ctdb daemon\n"));
273 if (client->ctdb->statistics.pending_calls > 0) {
274 client->ctdb->statistics.pending_calls--;
276 ctdb_latency(ctdb_db, "call_from_client_cb 2", &client->ctdb->statistics.max_call_latency, dstate->start_time);
279 r->hdr.reqid = dstate->reqid;
280 r->datalen = dstate->call->reply_data.dsize;
281 memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen);
283 res = daemon_queue_send(client, &r->hdr);
285 /* client is dead - return immediately */
289 DEBUG(DEBUG_ERR, (__location__ " Failed to queue packet from daemon to client\n"));
291 ctdb_latency(ctdb_db, "call_from_client_cb 3", &client->ctdb->statistics.max_call_latency, dstate->start_time);
293 if (client->ctdb->statistics.pending_calls > 0) {
294 client->ctdb->statistics.pending_calls--;
/*
  context handed to daemon_incoming_packet_wrap(): the client is
  remembered by id rather than by pointer so that a client which has
  disconnected in the meantime can be detected
 */
struct ctdb_daemon_packet_wrap {
	struct ctdb_context *ctdb;
	uint32_t client_id;
};

304 a wrapper to catch disconnected clients
306 static void daemon_incoming_packet_wrap(void *p, struct ctdb_req_header *hdr)
308 struct ctdb_client *client;
309 struct ctdb_daemon_packet_wrap *w = talloc_get_type(p,
310 struct ctdb_daemon_packet_wrap);
312 DEBUG(DEBUG_CRIT,(__location__ " Bad packet type '%s'\n", talloc_get_name(p)));
316 client = ctdb_reqid_find(w->ctdb, w->client_id, struct ctdb_client);
317 if (client == NULL) {
318 DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
326 daemon_incoming_packet(client, hdr);
331 this is called when the ctdb daemon received a ctdb request call
332 from a local client over the unix domain socket
334 static void daemon_request_call_from_client(struct ctdb_client *client,
335 struct ctdb_req_call *c)
337 struct ctdb_call_state *state;
338 struct ctdb_db_context *ctdb_db;
339 struct daemon_call_state *dstate;
340 struct ctdb_call *call;
341 struct ctdb_ltdb_header header;
344 struct ctdb_context *ctdb = client->ctdb;
345 struct ctdb_daemon_packet_wrap *w;
347 ctdb->statistics.total_calls++;
348 if (client->ctdb->statistics.pending_calls > 0) {
349 ctdb->statistics.pending_calls++;
352 ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
354 DEBUG(DEBUG_ERR, (__location__ " Unknown database in request. db_id==0x%08x",
356 if (client->ctdb->statistics.pending_calls > 0) {
357 ctdb->statistics.pending_calls--;
362 if (ctdb_db->unhealthy_reason) {
364 * this is just a warning, as the tdb should be empty anyway,
365 * and only persistent databases can be unhealthy, which doesn't
366 * use this code patch
368 DEBUG(DEBUG_WARNING,("warn: db(%s) unhealty in daemon_request_call_from_client(): %s\n",
369 ctdb_db->db_name, ctdb_db->unhealthy_reason));
373 key.dsize = c->keylen;
375 w = talloc(ctdb, struct ctdb_daemon_packet_wrap);
376 CTDB_NO_MEMORY_VOID(ctdb, w);
379 w->client_id = client->client_id;
381 ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header,
382 (struct ctdb_req_header *)c, &data,
383 daemon_incoming_packet_wrap, w, True);
385 /* will retry later */
386 if (client->ctdb->statistics.pending_calls > 0) {
387 ctdb->statistics.pending_calls--;
395 DEBUG(DEBUG_ERR,(__location__ " Unable to fetch record\n"));
396 if (client->ctdb->statistics.pending_calls > 0) {
397 ctdb->statistics.pending_calls--;
402 dstate = talloc(client, struct daemon_call_state);
403 if (dstate == NULL) {
404 ctdb_ltdb_unlock(ctdb_db, key);
405 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate dstate\n"));
406 if (client->ctdb->statistics.pending_calls > 0) {
407 ctdb->statistics.pending_calls--;
411 dstate->start_time = timeval_current();
412 dstate->client = client;
413 dstate->reqid = c->hdr.reqid;
414 talloc_steal(dstate, data.dptr);
416 call = dstate->call = talloc_zero(dstate, struct ctdb_call);
418 ctdb_ltdb_unlock(ctdb_db, key);
419 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate call\n"));
420 if (client->ctdb->statistics.pending_calls > 0) {
421 ctdb->statistics.pending_calls--;
423 ctdb_latency(ctdb_db, "call_from_client 1", &ctdb->statistics.max_call_latency, dstate->start_time);
427 call->call_id = c->callid;
429 call->call_data.dptr = c->data + c->keylen;
430 call->call_data.dsize = c->calldatalen;
431 call->flags = c->flags;
433 if (header.dmaster == ctdb->pnn) {
434 state = ctdb_call_local_send(ctdb_db, call, &header, &data);
436 state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
439 ctdb_ltdb_unlock(ctdb_db, key);
442 DEBUG(DEBUG_ERR,(__location__ " Unable to setup call send\n"));
443 if (client->ctdb->statistics.pending_calls > 0) {
444 ctdb->statistics.pending_calls--;
446 ctdb_latency(ctdb_db, "call_from_client 2", &ctdb->statistics.max_call_latency, dstate->start_time);
449 talloc_steal(state, dstate);
450 talloc_steal(client, state);
452 state->async.fn = daemon_call_from_client_callback;
453 state->async.private_data = dstate;
/* forward declaration: the definition appears further down in this file */
static void daemon_request_control_from_client(struct ctdb_client *client,
					       struct ctdb_req_control *c);

460 /* data contains a packet from the client */
461 static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr)
463 struct ctdb_client *client = talloc_get_type(p, struct ctdb_client);
465 struct ctdb_context *ctdb = client->ctdb;
467 /* place the packet as a child of a tmp_ctx. We then use
468 talloc_free() below to free it. If any of the calls want
469 to keep it, then they will steal it somewhere else, and the
470 talloc_free() will be a no-op */
471 tmp_ctx = talloc_new(client);
472 talloc_steal(tmp_ctx, hdr);
474 if (hdr->ctdb_magic != CTDB_MAGIC) {
475 ctdb_set_error(client->ctdb, "Non CTDB packet rejected in daemon\n");
479 if (hdr->ctdb_version != CTDB_VERSION) {
480 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
484 switch (hdr->operation) {
486 ctdb->statistics.client.req_call++;
487 daemon_request_call_from_client(client, (struct ctdb_req_call *)hdr);
490 case CTDB_REQ_MESSAGE:
491 ctdb->statistics.client.req_message++;
492 daemon_request_message_from_client(client, (struct ctdb_req_message *)hdr);
495 case CTDB_REQ_CONTROL:
496 ctdb->statistics.client.req_control++;
497 daemon_request_control_from_client(client, (struct ctdb_req_control *)hdr);
501 DEBUG(DEBUG_CRIT,(__location__ " daemon: unrecognized operation %u\n",
506 talloc_free(tmp_ctx);
510 called when the daemon gets a incoming packet
512 static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
514 struct ctdb_client *client = talloc_get_type(args, struct ctdb_client);
515 struct ctdb_req_header *hdr;
522 client->ctdb->statistics.client_packets_recv++;
524 if (cnt < sizeof(*hdr)) {
525 ctdb_set_error(client->ctdb, "Bad packet length %u in daemon\n",
529 hdr = (struct ctdb_req_header *)data;
530 if (cnt != hdr->length) {
531 ctdb_set_error(client->ctdb, "Bad header length %u expected %u\n in daemon",
532 (unsigned)hdr->length, (unsigned)cnt);
536 if (hdr->ctdb_magic != CTDB_MAGIC) {
537 ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
541 if (hdr->ctdb_version != CTDB_VERSION) {
542 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
546 DEBUG(DEBUG_DEBUG,(__location__ " client request %u of type %u length %u from "
547 "node %u to %u\n", hdr->reqid, hdr->operation, hdr->length,
548 hdr->srcnode, hdr->destnode));
550 /* it is the responsibility of the incoming packet function to free 'data' */
551 daemon_incoming_packet(client, hdr);
555 static int ctdb_clientpid_destructor(struct ctdb_client_pid_list *client_pid)
557 if (client_pid->ctdb->client_pids != NULL) {
558 DLIST_REMOVE(client_pid->ctdb->client_pids, client_pid);
565 static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde,
566 uint16_t flags, void *private_data)
568 struct sockaddr_un addr;
571 struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
572 struct ctdb_client *client;
573 struct ctdb_client_pid_list *client_pid;
575 struct peercred_struct cr;
576 socklen_t crl = sizeof(struct peercred_struct);
579 socklen_t crl = sizeof(struct ucred);
582 memset(&addr, 0, sizeof(addr));
584 fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len);
590 set_close_on_exec(fd);
592 DEBUG(DEBUG_DEBUG,(__location__ " Created SOCKET FD:%d to connected child\n", fd));
594 client = talloc_zero(ctdb, struct ctdb_client);
596 if (getsockopt(fd, SOL_SOCKET, SO_PEERID, &cr, &crl) == 0) {
598 if (getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &cr, &crl) == 0) {
600 DEBUG(DEBUG_INFO,("Connected client with pid:%u\n", (unsigned)cr.pid));
605 client->client_id = ctdb_reqid_new(ctdb, client);
606 client->pid = cr.pid;
608 client_pid = talloc(client, struct ctdb_client_pid_list);
609 if (client_pid == NULL) {
610 DEBUG(DEBUG_ERR,("Failed to allocate client pid structure\n"));
615 client_pid->ctdb = ctdb;
616 client_pid->pid = cr.pid;
617 client_pid->client = client;
619 DLIST_ADD(ctdb->client_pids, client_pid);
621 client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT,
622 ctdb_daemon_read_cb, client);
624 talloc_set_destructor(client, ctdb_client_destructor);
625 talloc_set_destructor(client_pid, ctdb_clientpid_destructor);
626 ctdb->statistics.num_clients++;
632 create a unix domain socket and bind it
633 return a file descriptor open on the socket
635 static int ux_socket_bind(struct ctdb_context *ctdb)
637 struct sockaddr_un addr;
639 ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
640 if (ctdb->daemon.sd == -1) {
644 set_close_on_exec(ctdb->daemon.sd);
645 set_nonblocking(ctdb->daemon.sd);
647 memset(&addr, 0, sizeof(addr));
648 addr.sun_family = AF_UNIX;
649 strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
651 if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
652 DEBUG(DEBUG_CRIT,("Unable to bind on ctdb socket '%s'\n", ctdb->daemon.name));
656 if (chown(ctdb->daemon.name, geteuid(), getegid()) != 0 ||
657 chmod(ctdb->daemon.name, 0700) != 0) {
658 DEBUG(DEBUG_CRIT,("Unable to secure ctdb socket '%s', ctdb->daemon.name\n", ctdb->daemon.name));
663 if (listen(ctdb->daemon.sd, 100) != 0) {
664 DEBUG(DEBUG_CRIT,("Unable to listen on ctdb socket '%s'\n", ctdb->daemon.name));
671 close(ctdb->daemon.sd);
672 ctdb->daemon.sd = -1;
676 static void sig_child_handler(struct event_context *ev,
677 struct signal_event *se, int signum, int count,
681 // struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
686 pid = waitpid(-1, &status, WNOHANG);
688 DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%d\n", errno));
692 DEBUG(DEBUG_DEBUG, ("SIGCHLD from %d\n", (int)pid));
697 static void ctdb_setup_event_callback(struct ctdb_context *ctdb, int status,
701 ctdb_fatal(ctdb, "Failed to run setup event\n");
704 ctdb_run_notification_script(ctdb, "setup");
706 /* tell all other nodes we've just started up */
707 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL,
708 0, CTDB_CONTROL_STARTUP, 0,
709 CTDB_CTRL_FLAG_NOREPLY,
710 tdb_null, NULL, NULL);
714 start the protocol going as a daemon
716 int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork, bool use_syslog)
719 struct fd_event *fde;
720 const char *domain_socket_name;
721 struct signal_event *se;
723 /* get rid of any old sockets */
724 unlink(ctdb->daemon.name);
726 /* create a unix domain stream socket to listen to */
727 res = ux_socket_bind(ctdb);
729 DEBUG(DEBUG_ALERT,(__location__ " Failed to open CTDB unix domain socket\n"));
733 if (do_fork && fork()) {
737 tdb_reopen_all(False);
742 if (open("/dev/null", O_RDONLY) != 0) {
743 DEBUG(DEBUG_ALERT,(__location__ " Failed to setup stdin on /dev/null\n"));
747 block_signal(SIGPIPE);
749 ctdbd_pid = getpid();
752 DEBUG(DEBUG_ERR, ("Starting CTDBD as pid : %u\n", ctdbd_pid));
754 ctdb_high_priority(ctdb);
756 /* ensure the socket is deleted on exit of the daemon */
757 domain_socket_name = talloc_strdup(talloc_autofree_context(), ctdb->daemon.name);
758 if (domain_socket_name == NULL) {
759 DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup failed.\n"));
763 ctdb->ev = event_context_init(NULL);
765 ctdb_set_child_logging(ctdb);
767 /* force initial recovery for election */
768 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
770 if (strcmp(ctdb->transport, "tcp") == 0) {
771 int ctdb_tcp_init(struct ctdb_context *);
772 ret = ctdb_tcp_init(ctdb);
774 #ifdef USE_INFINIBAND
775 if (strcmp(ctdb->transport, "ib") == 0) {
776 int ctdb_ibw_init(struct ctdb_context *);
777 ret = ctdb_ibw_init(ctdb);
781 DEBUG(DEBUG_ERR,("Failed to initialise transport '%s'\n", ctdb->transport));
785 if (ctdb->methods == NULL) {
786 DEBUG(DEBUG_ALERT,(__location__ " Can not initialize transport. ctdb->methods is NULL\n"));
787 ctdb_fatal(ctdb, "transport is unavailable. can not initialize.");
790 /* initialise the transport */
791 if (ctdb->methods->initialise(ctdb) != 0) {
792 ctdb_fatal(ctdb, "transport failed to initialise");
795 /* attach to existing databases */
796 if (ctdb_attach_databases(ctdb) != 0) {
797 ctdb_fatal(ctdb, "Failed to attach to databases\n");
800 /* start frozen, then let the first election sort things out */
801 if (ctdb_blocking_freeze(ctdb)) {
802 ctdb_fatal(ctdb, "Failed to get initial freeze\n");
805 ret = ctdb_event_script(ctdb, CTDB_EVENT_INIT);
807 ctdb_fatal(ctdb, "Failed to run init event\n");
809 ctdb_run_notification_script(ctdb, "init");
811 /* now start accepting clients, only can do this once frozen */
812 fde = event_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd,
813 EVENT_FD_READ|EVENT_FD_AUTOCLOSE,
814 ctdb_accept_client, ctdb);
816 /* release any IPs we hold from previous runs of the daemon */
817 ctdb_release_all_ips(ctdb);
819 /* start the transport going */
820 ctdb_start_transport(ctdb);
822 /* set up a handler to pick up sigchld */
823 se = event_add_signal(ctdb->ev, ctdb,
828 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD\n"));
832 ret = ctdb_event_script_callback(ctdb,
834 ctdb_setup_event_callback,
840 DEBUG(DEBUG_CRIT,("Failed to set up 'setup' event\n"));
845 if (start_syslog_daemon(ctdb)) {
846 DEBUG(DEBUG_CRIT, ("Failed to start syslog daemon\n"));
851 ctdb_lockdown_memory(ctdb);
853 /* go into a wait loop to allow other nodes to complete */
854 event_loop_wait(ctdb->ev);
856 DEBUG(DEBUG_CRIT,("event_loop_wait() returned. this should not happen\n"));
861 allocate a packet for use in daemon<->daemon communication
863 struct ctdb_req_header *_ctdb_transport_allocate(struct ctdb_context *ctdb,
865 enum ctdb_operation operation,
866 size_t length, size_t slength,
870 struct ctdb_req_header *hdr;
872 length = MAX(length, slength);
873 size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
875 if (ctdb->methods == NULL) {
876 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate transport packet for operation %u of length %u. Transport is DOWN.\n",
877 operation, (unsigned)length));
881 hdr = (struct ctdb_req_header *)ctdb->methods->allocate_pkt(mem_ctx, size);
883 DEBUG(DEBUG_ERR,("Unable to allocate transport packet for operation %u of length %u\n",
884 operation, (unsigned)length));
887 talloc_set_name_const(hdr, type);
888 memset(hdr, 0, slength);
889 hdr->length = length;
890 hdr->operation = operation;
891 hdr->ctdb_magic = CTDB_MAGIC;
892 hdr->ctdb_version = CTDB_VERSION;
893 hdr->generation = ctdb->vnn_map->generation;
894 hdr->srcnode = ctdb->pnn;
/* state of one control request issued by a local client */
struct daemon_control_state {
	struct daemon_control_state *next, *prev;	/* linkage in node->pending_controls */
	struct ctdb_client *client;			/* client that sent the control */
	struct ctdb_req_control *c;			/* the request packet, owned by this state */
	uint32_t reqid;					/* request id to echo in the reply */
	struct ctdb_node *node;				/* destination node, NULL if the pnn is not valid */
};

908 callback when a control reply comes in
910 static void daemon_control_callback(struct ctdb_context *ctdb,
911 int32_t status, TDB_DATA data,
912 const char *errormsg,
915 struct daemon_control_state *state = talloc_get_type(private_data,
916 struct daemon_control_state);
917 struct ctdb_client *client = state->client;
918 struct ctdb_reply_control *r;
922 /* construct a message to send to the client containing the data */
923 len = offsetof(struct ctdb_reply_control, data) + data.dsize;
925 len += strlen(errormsg);
927 r = ctdbd_allocate_pkt(ctdb, state, CTDB_REPLY_CONTROL, len,
928 struct ctdb_reply_control);
929 CTDB_NO_MEMORY_VOID(ctdb, r);
931 r->hdr.reqid = state->reqid;
933 r->datalen = data.dsize;
935 memcpy(&r->data[0], data.dptr, data.dsize);
937 r->errorlen = strlen(errormsg);
938 memcpy(&r->data[r->datalen], errormsg, r->errorlen);
941 ret = daemon_queue_send(client, &r->hdr);
948 fail all pending controls to a disconnected node
950 void ctdb_daemon_cancel_controls(struct ctdb_context *ctdb, struct ctdb_node *node)
952 struct daemon_control_state *state;
953 while ((state = node->pending_controls)) {
954 DLIST_REMOVE(node->pending_controls, state);
955 daemon_control_callback(ctdb, (uint32_t)-1, tdb_null,
956 "node is disconnected", state);
961 destroy a daemon_control_state
963 static int daemon_control_destructor(struct daemon_control_state *state)
966 DLIST_REMOVE(state->node->pending_controls, state);
972 this is called when the ctdb daemon received a ctdb request control
973 from a local client over the unix domain socket
975 static void daemon_request_control_from_client(struct ctdb_client *client,
976 struct ctdb_req_control *c)
980 struct daemon_control_state *state;
981 TALLOC_CTX *tmp_ctx = talloc_new(client);
983 if (c->hdr.destnode == CTDB_CURRENT_NODE) {
984 c->hdr.destnode = client->ctdb->pnn;
987 state = talloc(client, struct daemon_control_state);
988 CTDB_NO_MEMORY_VOID(client->ctdb, state);
990 state->client = client;
991 state->c = talloc_steal(state, c);
992 state->reqid = c->hdr.reqid;
993 if (ctdb_validate_pnn(client->ctdb, c->hdr.destnode)) {
994 state->node = client->ctdb->nodes[c->hdr.destnode];
995 DLIST_ADD(state->node->pending_controls, state);
1000 talloc_set_destructor(state, daemon_control_destructor);
1002 if (c->flags & CTDB_CTRL_FLAG_NOREPLY) {
1003 talloc_steal(tmp_ctx, state);
1006 data.dptr = &c->data[0];
1007 data.dsize = c->datalen;
1008 res = ctdb_daemon_send_control(client->ctdb, c->hdr.destnode,
1009 c->srvid, c->opcode, client->client_id,
1011 data, daemon_control_callback,
1014 DEBUG(DEBUG_ERR,(__location__ " Failed to send control to remote node %u\n",
1018 talloc_free(tmp_ctx);
1022 register a call function
1024 int ctdb_daemon_set_call(struct ctdb_context *ctdb, uint32_t db_id,
1025 ctdb_fn_t fn, int id)
1027 struct ctdb_registered_call *call;
1028 struct ctdb_db_context *ctdb_db;
1030 ctdb_db = find_ctdb_db(ctdb, db_id);
1031 if (ctdb_db == NULL) {
1035 call = talloc(ctdb_db, struct ctdb_registered_call);
1039 DLIST_ADD(ctdb_db->calls, call);
1046 this local messaging handler is ugly, but is needed to prevent
1047 recursion in ctdb_send_message() when the destination node is the
1048 same as the source node
1050 struct ctdb_local_message {
1051 struct ctdb_context *ctdb;
1056 static void ctdb_local_message_trigger(struct event_context *ev, struct timed_event *te,
1057 struct timeval t, void *private_data)
1059 struct ctdb_local_message *m = talloc_get_type(private_data,
1060 struct ctdb_local_message);
1063 res = ctdb_dispatch_message(m->ctdb, m->srvid, m->data);
1065 DEBUG(DEBUG_ERR, (__location__ " Failed to dispatch message for srvid=%llu\n",
1066 (unsigned long long)m->srvid));
1071 static int ctdb_local_message(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data)
1073 struct ctdb_local_message *m;
1074 m = talloc(ctdb, struct ctdb_local_message);
1075 CTDB_NO_MEMORY(ctdb, m);
1080 m->data.dptr = talloc_memdup(m, m->data.dptr, m->data.dsize);
1081 if (m->data.dptr == NULL) {
1086 /* this needs to be done as an event to prevent recursion */
1087 event_add_timed(ctdb->ev, m, timeval_zero(), ctdb_local_message_trigger, m);
1094 int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t pnn,
1095 uint64_t srvid, TDB_DATA data)
1097 struct ctdb_req_message *r;
1100 if (ctdb->methods == NULL) {
1101 DEBUG(DEBUG_ERR,(__location__ " Failed to send message. Transport is DOWN\n"));
1105 /* see if this is a message to ourselves */
1106 if (pnn == ctdb->pnn) {
1107 return ctdb_local_message(ctdb, srvid, data);
1110 len = offsetof(struct ctdb_req_message, data) + data.dsize;
1111 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_MESSAGE, len,
1112 struct ctdb_req_message);
1113 CTDB_NO_MEMORY(ctdb, r);
1115 r->hdr.destnode = pnn;
1117 r->datalen = data.dsize;
1118 memcpy(&r->data[0], data.dptr, data.dsize);
1120 ctdb_queue_packet(ctdb, &r->hdr);
1128 struct ctdb_client_notify_list {
1129 struct ctdb_client_notify_list *next, *prev;
1130 struct ctdb_context *ctdb;
1136 static int ctdb_client_notify_destructor(struct ctdb_client_notify_list *nl)
1140 DEBUG(DEBUG_ERR,("Sending client notify message for srvid:%llu\n", (unsigned long long)nl->srvid));
1142 ret = ctdb_daemon_send_message(nl->ctdb, CTDB_BROADCAST_CONNECTED, (unsigned long long)nl->srvid, nl->data);
1144 DEBUG(DEBUG_ERR,("Failed to send client notify message\n"));
1150 int32_t ctdb_control_register_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1152 struct ctdb_client_notify_register *notify = (struct ctdb_client_notify_register *)indata.dptr;
1153 struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
1154 struct ctdb_client_notify_list *nl;
1156 DEBUG(DEBUG_INFO,("Register srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1158 if (indata.dsize < offsetof(struct ctdb_client_notify_register, notify_data)) {
1159 DEBUG(DEBUG_ERR,(__location__ " Too little data in control : %d\n", (int)indata.dsize));
1163 if (indata.dsize != (notify->len + offsetof(struct ctdb_client_notify_register, notify_data))) {
1164 DEBUG(DEBUG_ERR,(__location__ " Wrong amount of data in control. Got %d, expected %d\n", (int)indata.dsize, (int)(notify->len + offsetof(struct ctdb_client_notify_register, notify_data))));
1169 if (client == NULL) {
1170 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1174 for(nl=client->notify; nl; nl=nl->next) {
1175 if (nl->srvid == notify->srvid) {
1180 DEBUG(DEBUG_ERR,(__location__ " Notification for srvid:%llu already exists for this client\n", (unsigned long long)notify->srvid));
1184 nl = talloc(client, struct ctdb_client_notify_list);
1185 CTDB_NO_MEMORY(ctdb, nl);
1187 nl->srvid = notify->srvid;
1188 nl->data.dsize = notify->len;
1189 nl->data.dptr = talloc_size(nl, nl->data.dsize);
1190 CTDB_NO_MEMORY(ctdb, nl->data.dptr);
1191 memcpy(nl->data.dptr, notify->notify_data, nl->data.dsize);
1193 DLIST_ADD(client->notify, nl);
1194 talloc_set_destructor(nl, ctdb_client_notify_destructor);
1199 int32_t ctdb_control_deregister_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1201 struct ctdb_client_notify_deregister *notify = (struct ctdb_client_notify_deregister *)indata.dptr;
1202 struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
1203 struct ctdb_client_notify_list *nl;
1205 DEBUG(DEBUG_INFO,("Deregister srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1207 if (client == NULL) {
1208 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1212 for(nl=client->notify; nl; nl=nl->next) {
1213 if (nl->srvid == notify->srvid) {
1218 DEBUG(DEBUG_ERR,(__location__ " No notification for srvid:%llu found for this client\n", (unsigned long long)notify->srvid));
1222 DLIST_REMOVE(client->notify, nl);
1223 talloc_set_destructor(nl, NULL);
1229 struct ctdb_client *ctdb_find_client_by_pid(struct ctdb_context *ctdb, pid_t pid)
1231 struct ctdb_client_pid_list *client_pid;
1233 for (client_pid = ctdb->client_pids; client_pid; client_pid=client_pid->next) {
1234 if (client_pid->pid == pid) {
1235 return client_pid->client;
1242 /* This control is used by samba when probing if a process (of a samba daemon)
1244 Samba does this when it needs/wants to check if a subrecord in one of the
1245 databases is still valied, or if it is stale and can be removed.
1246 If the node is in unhealthy or stopped state we just kill of the samba
1247 process holding htis sub-record and return to the calling samba that
1248 the process does not exist.
1249 This allows us to forcefully recall subrecords registered by samba processes
1250 on banned and stopped nodes.
1252 int32_t ctdb_control_process_exists(struct ctdb_context *ctdb, pid_t pid)
1254 struct ctdb_client *client;
1256 if (ctdb->nodes[ctdb->pnn]->flags & (NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1257 client = ctdb_find_client_by_pid(ctdb, pid);
1258 if (client != NULL) {
1259 DEBUG(DEBUG_NOTICE,(__location__ " Killing client with pid:%d on banned/stopped node\n", (int)pid));
1260 talloc_free(client);
1265 return kill(pid, 0);