4 Copyright (C) Andrew Tridgell 2006
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
22 #include "lib/tdb/include/tdb.h"
23 #include "lib/tevent/tevent.h"
24 #include "lib/util/dlinklist.h"
25 #include "system/network.h"
26 #include "system/filesys.h"
27 #include "system/wait.h"
28 #include "../include/ctdb_client.h"
29 #include "../include/ctdb_private.h"
30 #include <sys/socket.h>
/*
  one entry per connected client in the ctdb->client_pids doubly linked
  list (next/prev). ctdb_accept_client() fills in the owning ctdb
  context, the peer pid and the client; ctdb_find_client_by_pid() walks
  the list to map a pid back to its client.
*/
32 struct ctdb_client_pid_list {
33 struct ctdb_client_pid_list *next, *prev;
34 struct ctdb_context *ctdb;
36 struct ctdb_client *client;
/* forward declaration: daemon_incoming_packet_wrap() calls this before its definition */
39 static void daemon_incoming_packet(void *, struct ctdb_req_header *);
/*
  log a notice that the daemon is shutting down. registered with
  atexit() by ctdb_start_transport()
*/
41 static void print_exit_message(void)
43 DEBUG(DEBUG_NOTICE,("CTDB daemon shutting down\n"));
46 /* called when the "startup" event script has finished */
/*
  start the transport and the services that depend on it: the recovery
  daemon, keepalive, health monitoring, tcp tickle updates and recovery
  daemon ping handling. a missing transport (ctdb->methods == NULL) or a
  transport that fails to start is reported through ctdb_fatal().
  also registers print_exit_message() with atexit().
*/
47 static void ctdb_start_transport(struct ctdb_context *ctdb)
49 if (ctdb->methods == NULL) {
50 DEBUG(DEBUG_ALERT,(__location__ " startup event finished but transport is DOWN.\n"));
51 ctdb_fatal(ctdb, "transport is not initialized but startup completed");
54 /* start the transport running */
55 if (ctdb->methods->start(ctdb) != 0) {
56 DEBUG(DEBUG_ALERT,("transport failed to start!\n"));
57 ctdb_fatal(ctdb, "transport failed to start");
60 /* start the recovery daemon process */
61 if (ctdb_start_recoverd(ctdb) != 0) {
62 DEBUG(DEBUG_ALERT,("Failed to start recovery daemon\n"));
66 /* Make sure we log something when the daemon terminates */
67 atexit(print_exit_message);
69 /* start monitoring for connected/disconnected nodes */
70 ctdb_start_keepalive(ctdb);
72 /* start monitoring for node health */
73 ctdb_start_monitoring(ctdb);
75 /* start periodic update of tcp tickle lists */
76 ctdb_start_tcp_tickle_update(ctdb);
78 /* start listening for recovery daemon pings */
79 ctdb_control_recd_ping(ctdb);
/*
  ignore signal 'signum': installs SIG_IGN as its handler through
  sigaction(). the sigaction() return value is not checked.
*/
82 static void block_signal(int signum)
86 memset(&act, 0, sizeof(act));
88 act.sa_handler = SIG_IGN;
89 sigemptyset(&act.sa_mask);
90 sigaddset(&act.sa_mask, signum);
91 sigaction(signum, &act, NULL);
96 send a packet to a client
/*
  queue packet 'hdr' (hdr->length bytes) on the client's queue and count
  it in statistics.client_packets_sent.
  for CTDB_REQ_MESSAGE packets the current queue length is first checked
  against the max_queue_depth_drop_msg tunable; exceeding it is logged
  as "queue full - killing client connection".
  otherwise returns the result of ctdb_queue_send().
*/
98 static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header *hdr)
100 client->ctdb->statistics.client_packets_sent++;
101 if (hdr->operation == CTDB_REQ_MESSAGE) {
102 if (ctdb_queue_length(client->queue) > client->ctdb->tunable.max_queue_depth_drop_msg) {
103 DEBUG(DEBUG_ERR,("CTDB_REQ_MESSAGE queue full - killing client connection.\n"));
108 return ctdb_queue_send(client->queue, (uint8_t *)hdr, hdr->length);
112 message handler for when we are in daemon mode. This redirects the message
/*
  message callback registered per client by
  daemon_register_message_handler(); private_data is the ctdb_client.
  builds a CTDB_REQ_MESSAGE packet carrying a copy of 'data' and hands
  it to daemon_queue_send() for that client. the send result is not
  examined here.
*/
115 static void daemon_message_handler(struct ctdb_context *ctdb, uint64_t srvid,
116 TDB_DATA data, void *private_data)
118 struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client);
119 struct ctdb_req_message *r;
122 /* construct a message to send to the client containing the data */
123 len = offsetof(struct ctdb_req_message, data) + data.dsize;
124 r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE,
125 len, struct ctdb_req_message);
126 CTDB_NO_MEMORY_VOID(ctdb, r);
128 talloc_set_name_const(r, "req_message packet");
131 r->datalen = data.dsize;
132 memcpy(&r->data[0], data.dptr, data.dsize);
134 daemon_queue_send(client, &r->hdr);
140 this is called when the ctdb daemon received a ctdb request to
141 set the srvid from the client
/*
  register daemon_message_handler() for 'srvid' on behalf of the client
  identified by client_id (looked up with ctdb_reqid_find()).
  an unknown client_id is logged as an error; the outcome of
  ctdb_register_message_handler() is logged as either a failure or a
  successful registration.
*/
143 int daemon_register_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
145 struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
147 if (client == NULL) {
148 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_register_message_handler\n"));
151 res = ctdb_register_message_handler(ctdb, client, srvid, daemon_message_handler, client);
153 DEBUG(DEBUG_ERR,(__location__ " Failed to register handler %llu in daemon\n",
154 (unsigned long long)srvid));
156 DEBUG(DEBUG_INFO,(__location__ " Registered message handler for srvid=%llu\n",
157 (unsigned long long)srvid));
164 this is called when the ctdb daemon received a ctdb request to
165 remove a srvid from the client
/*
  remove the message handler for 'srvid' that belongs to the client
  identified by client_id. an unknown client_id is logged as an error;
  otherwise returns the result of ctdb_deregister_message_handler().
*/
167 int daemon_deregister_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
169 struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
170 if (client == NULL) {
171 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_deregister_message_handler\n"));
174 return ctdb_deregister_message_handler(ctdb, srvid, client);
179 destroy a ctdb_client
/*
  talloc destructor for a ctdb_client (installed by ctdb_accept_client).
  runs the takeover hook, releases the client's request id and lowers
  statistics.num_clients when it is non-zero.
  a client that goes away with persistent updates in flight, or (per the
  log text) while a transaction commit is active on its database, puts
  the node into CTDB_RECOVERY_ACTIVE.
*/
181 static int ctdb_client_destructor(struct ctdb_client *client)
183 struct ctdb_db_context *ctdb_db;
185 ctdb_takeover_client_destructor_hook(client);
186 ctdb_reqid_remove(client->ctdb, client->client_id);
187 if (client->ctdb->statistics.num_clients) {
188 client->ctdb->statistics.num_clients--;
191 if (client->num_persistent_updates != 0) {
192 DEBUG(DEBUG_ERR,(__location__ " Client disconnecting with %u persistent updates in flight. Starting recovery\n", client->num_persistent_updates));
193 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
195 ctdb_db = find_ctdb_db(client->ctdb, client->db_id);
197 DEBUG(DEBUG_ERR, (__location__ " client exit while transaction "
198 "commit active. Forcing recovery.\n"));
199 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
200 ctdb_db->transaction_active = false;
208 this is called when the ctdb daemon received a ctdb request message
209 from a local client over the unix domain socket
/*
  route a CTDB_REQ_MESSAGE received from a local client.
  when the destination node is this node the packet is passed to
  ctdb_request_message(); otherwise the payload (c->data, c->datalen)
  is forwarded with ctdb_daemon_send_message() and a failure is logged.
*/
211 static void daemon_request_message_from_client(struct ctdb_client *client,
212 struct ctdb_req_message *c)
217 /* maybe the message is for another client on this node */
218 if (ctdb_get_pnn(client->ctdb)==c->hdr.destnode) {
219 ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c);
223 /* its for a remote node */
224 data.dptr = &c->data[0];
225 data.dsize = c->datalen;
226 res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode,
229 DEBUG(DEBUG_ERR,(__location__ " Failed to send message to remote node %u\n",
/*
  per-call bookkeeping for a call issued by a local client: the client
  that asked, the call being executed and the time the request arrived
  (start_time is passed to ctdb_latency() when the call completes)
*/
235 struct daemon_call_state {
236 struct ctdb_client *client;
238 struct ctdb_call *call;
239 struct timeval start_time;
243 complete a call from a client
/*
  async completion callback for a call started by
  daemon_request_call_from_client(); state->async.private_data is the
  daemon_call_state.
  re-parents dstate to the client, collects the result with
  ctdb_daemon_call_recv(), builds a CTDB_REPLY_CALL packet holding the
  reply data and queues it to the client.
  each exit path shown lowers statistics.pending_calls (when non-zero)
  and records latency against max_call_latency, except the
  "client is dead" path after daemon_queue_send().
*/
245 static void daemon_call_from_client_callback(struct ctdb_call_state *state)
247 struct daemon_call_state *dstate = talloc_get_type(state->async.private_data,
248 struct daemon_call_state);
249 struct ctdb_reply_call *r;
252 struct ctdb_client *client = dstate->client;
253 struct ctdb_db_context *ctdb_db = state->ctdb_db;
255 talloc_steal(client, dstate);
256 talloc_steal(dstate, dstate->call);
258 res = ctdb_daemon_call_recv(state, dstate->call);
260 DEBUG(DEBUG_ERR, (__location__ " ctdbd_call_recv() returned error\n"));
261 if (client->ctdb->statistics.pending_calls > 0) {
262 client->ctdb->statistics.pending_calls--;
264 ctdb_latency(ctdb_db, "call_from_client_cb 1", &client->ctdb->statistics.max_call_latency, dstate->start_time);
268 length = offsetof(struct ctdb_reply_call, data) + dstate->call->reply_data.dsize;
269 r = ctdbd_allocate_pkt(client->ctdb, dstate, CTDB_REPLY_CALL,
270 length, struct ctdb_reply_call);
272 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate reply_call in ctdb daemon\n"));
273 if (client->ctdb->statistics.pending_calls > 0) {
274 client->ctdb->statistics.pending_calls--;
276 ctdb_latency(ctdb_db, "call_from_client_cb 2", &client->ctdb->statistics.max_call_latency, dstate->start_time);
279 r->hdr.reqid = dstate->reqid;
280 r->datalen = dstate->call->reply_data.dsize;
281 memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen);
283 res = daemon_queue_send(client, &r->hdr);
285 /* client is dead - return immediately */
289 DEBUG(DEBUG_ERR, (__location__ " Failed to queue packet from daemon to client\n"));
291 ctdb_latency(ctdb_db, "call_from_client_cb 3", &client->ctdb->statistics.max_call_latency, dstate->start_time);
293 if (client->ctdb->statistics.pending_calls > 0) {
294 client->ctdb->statistics.pending_calls--;
/*
  context handed to daemon_incoming_packet_wrap() for a requeued packet.
  it records the ctdb context and the client's id (w->client_id) rather
  than a client pointer, so the client is looked up again on replay.
*/
298 struct ctdb_daemon_packet_wrap {
299 struct ctdb_context *ctdb;
304 a wrapper to catch disconnected clients
/*
  replay entry point for a deferred packet: 'p' must be a
  ctdb_daemon_packet_wrap (anything else is logged as a bad packet
  type). the client is re-resolved from w->client_id; if it has
  disconnected in the meantime that is logged, otherwise the packet is
  passed on to daemon_incoming_packet().
*/
306 static void daemon_incoming_packet_wrap(void *p, struct ctdb_req_header *hdr)
308 struct ctdb_client *client;
309 struct ctdb_daemon_packet_wrap *w = talloc_get_type(p,
310 struct ctdb_daemon_packet_wrap);
312 DEBUG(DEBUG_CRIT,(__location__ " Bad packet type '%s'\n", talloc_get_name(p)));
316 client = ctdb_reqid_find(w->ctdb, w->client_id, struct ctdb_client);
317 if (client == NULL) {
318 DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
326 daemon_incoming_packet(client, hdr);
331 this is called when the ctdb daemon received a ctdb request call
332 from a local client over the unix domain socket
334 static void daemon_request_call_from_client(struct ctdb_client *client,
335 struct ctdb_req_call *c)
337 struct ctdb_call_state *state;
338 struct ctdb_db_context *ctdb_db;
339 struct daemon_call_state *dstate;
340 struct ctdb_call *call;
341 struct ctdb_ltdb_header header;
344 struct ctdb_context *ctdb = client->ctdb;
345 struct ctdb_daemon_packet_wrap *w;
347 ctdb->statistics.total_calls++;
348 if (client->ctdb->statistics.pending_calls > 0) {
349 ctdb->statistics.pending_calls++;
352 ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
354 DEBUG(DEBUG_ERR, (__location__ " Unknown database in request. db_id==0x%08x",
356 if (client->ctdb->statistics.pending_calls > 0) {
357 ctdb->statistics.pending_calls--;
362 if (ctdb_db->unhealthy_reason) {
364 * this is just a warning, as the tdb should be empty anyway,
365 * and only persistent databases can be unhealthy, which doesn't
366 * use this code patch
368 DEBUG(DEBUG_WARNING,("warn: db(%s) unhealty in daemon_request_call_from_client(): %s\n",
369 ctdb_db->db_name, ctdb_db->unhealthy_reason));
373 key.dsize = c->keylen;
375 w = talloc(ctdb, struct ctdb_daemon_packet_wrap);
376 CTDB_NO_MEMORY_VOID(ctdb, w);
379 w->client_id = client->client_id;
381 ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header,
382 (struct ctdb_req_header *)c, &data,
383 daemon_incoming_packet_wrap, w, True);
385 /* will retry later */
386 if (client->ctdb->statistics.pending_calls > 0) {
387 ctdb->statistics.pending_calls--;
395 DEBUG(DEBUG_ERR,(__location__ " Unable to fetch record\n"));
396 if (client->ctdb->statistics.pending_calls > 0) {
397 ctdb->statistics.pending_calls--;
402 dstate = talloc(client, struct daemon_call_state);
403 if (dstate == NULL) {
404 ret = ctdb_ltdb_unlock(ctdb_db, key);
406 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
409 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate dstate\n"));
410 if (client->ctdb->statistics.pending_calls > 0) {
411 ctdb->statistics.pending_calls--;
415 dstate->start_time = timeval_current();
416 dstate->client = client;
417 dstate->reqid = c->hdr.reqid;
418 talloc_steal(dstate, data.dptr);
420 call = dstate->call = talloc_zero(dstate, struct ctdb_call);
422 ret = ctdb_ltdb_unlock(ctdb_db, key);
424 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
427 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate call\n"));
428 if (client->ctdb->statistics.pending_calls > 0) {
429 ctdb->statistics.pending_calls--;
431 ctdb_latency(ctdb_db, "call_from_client 1", &ctdb->statistics.max_call_latency, dstate->start_time);
435 call->call_id = c->callid;
437 call->call_data.dptr = c->data + c->keylen;
438 call->call_data.dsize = c->calldatalen;
439 call->flags = c->flags;
441 if (header.dmaster == ctdb->pnn) {
442 state = ctdb_call_local_send(ctdb_db, call, &header, &data);
444 state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
447 ret = ctdb_ltdb_unlock(ctdb_db, key);
449 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
453 DEBUG(DEBUG_ERR,(__location__ " Unable to setup call send\n"));
454 if (client->ctdb->statistics.pending_calls > 0) {
455 ctdb->statistics.pending_calls--;
457 ctdb_latency(ctdb_db, "call_from_client 2", &ctdb->statistics.max_call_latency, dstate->start_time);
460 talloc_steal(state, dstate);
461 talloc_steal(client, state);
463 state->async.fn = daemon_call_from_client_callback;
464 state->async.private_data = dstate;
/* forward declaration: daemon_incoming_packet() dispatches CTDB_REQ_CONTROL to this before its definition */
468 static void daemon_request_control_from_client(struct ctdb_client *client,
469 struct ctdb_req_control *c);
471 /* data contains a packet from the client */
/*
  dispatch one packet received from a local client; 'p' is the
  ctdb_client. the packet is re-parented to a temporary talloc context
  that is freed at the end, so handlers that want to keep the packet
  must steal it.
  packets with a wrong magic or version are rejected with
  ctdb_set_error(). call, message and control requests are counted in
  statistics.client.* and passed to their handlers; any other operation
  is logged as unrecognized.
*/
472 static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr)
474 struct ctdb_client *client = talloc_get_type(p, struct ctdb_client);
476 struct ctdb_context *ctdb = client->ctdb;
478 /* place the packet as a child of a tmp_ctx. We then use
479 talloc_free() below to free it. If any of the calls want
480 to keep it, then they will steal it somewhere else, and the
481 talloc_free() will be a no-op */
482 tmp_ctx = talloc_new(client);
483 talloc_steal(tmp_ctx, hdr);
485 if (hdr->ctdb_magic != CTDB_MAGIC) {
486 ctdb_set_error(client->ctdb, "Non CTDB packet rejected in daemon\n");
490 if (hdr->ctdb_version != CTDB_VERSION) {
491 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
495 switch (hdr->operation) {
497 ctdb->statistics.client.req_call++;
498 daemon_request_call_from_client(client, (struct ctdb_req_call *)hdr);
501 case CTDB_REQ_MESSAGE:
502 ctdb->statistics.client.req_message++;
503 daemon_request_message_from_client(client, (struct ctdb_req_message *)hdr);
506 case CTDB_REQ_CONTROL:
507 ctdb->statistics.client.req_control++;
508 daemon_request_control_from_client(client, (struct ctdb_req_control *)hdr);
512 DEBUG(DEBUG_CRIT,(__location__ " daemon: unrecognized operation %u\n",
517 talloc_free(tmp_ctx);
521 called when the daemon gets a incoming packet
523 static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
525 struct ctdb_client *client = talloc_get_type(args, struct ctdb_client);
526 struct ctdb_req_header *hdr;
533 client->ctdb->statistics.client_packets_recv++;
535 if (cnt < sizeof(*hdr)) {
536 ctdb_set_error(client->ctdb, "Bad packet length %u in daemon\n",
540 hdr = (struct ctdb_req_header *)data;
541 if (cnt != hdr->length) {
542 ctdb_set_error(client->ctdb, "Bad header length %u expected %u\n in daemon",
543 (unsigned)hdr->length, (unsigned)cnt);
547 if (hdr->ctdb_magic != CTDB_MAGIC) {
548 ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
552 if (hdr->ctdb_version != CTDB_VERSION) {
553 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
557 DEBUG(DEBUG_DEBUG,(__location__ " client request %u of type %u length %u from "
558 "node %u to %u\n", hdr->reqid, hdr->operation, hdr->length,
559 hdr->srcnode, hdr->destnode));
561 /* it is the responsibility of the incoming packet function to free 'data' */
562 daemon_incoming_packet(client, hdr);
/*
  talloc destructor for a ctdb_client_pid_list entry: unlinks the entry
  from ctdb->client_pids, provided the list is not already empty
*/
566 static int ctdb_clientpid_destructor(struct ctdb_client_pid_list *client_pid)
568 if (client_pid->ctdb->client_pids != NULL) {
569 DLIST_REMOVE(client_pid->ctdb->client_pids, client_pid);
576 static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde,
577 uint16_t flags, void *private_data)
579 struct sockaddr_un addr;
582 struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
583 struct ctdb_client *client;
584 struct ctdb_client_pid_list *client_pid;
586 struct peercred_struct cr;
587 socklen_t crl = sizeof(struct peercred_struct);
590 socklen_t crl = sizeof(struct ucred);
593 memset(&addr, 0, sizeof(addr));
595 fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len);
601 set_close_on_exec(fd);
603 DEBUG(DEBUG_DEBUG,(__location__ " Created SOCKET FD:%d to connected child\n", fd));
605 client = talloc_zero(ctdb, struct ctdb_client);
607 if (getsockopt(fd, SOL_SOCKET, SO_PEERID, &cr, &crl) == 0) {
609 if (getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &cr, &crl) == 0) {
611 DEBUG(DEBUG_INFO,("Connected client with pid:%u\n", (unsigned)cr.pid));
616 client->client_id = ctdb_reqid_new(ctdb, client);
617 client->pid = cr.pid;
619 client_pid = talloc(client, struct ctdb_client_pid_list);
620 if (client_pid == NULL) {
621 DEBUG(DEBUG_ERR,("Failed to allocate client pid structure\n"));
626 client_pid->ctdb = ctdb;
627 client_pid->pid = cr.pid;
628 client_pid->client = client;
630 DLIST_ADD(ctdb->client_pids, client_pid);
632 client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT,
633 ctdb_daemon_read_cb, client,
634 "client-%u", client->pid);
636 talloc_set_destructor(client, ctdb_client_destructor);
637 talloc_set_destructor(client_pid, ctdb_clientpid_destructor);
638 ctdb->statistics.num_clients++;
644 create a unix domain socket and bind it
645 return a file descriptor open on the socket
647 static int ux_socket_bind(struct ctdb_context *ctdb)
649 struct sockaddr_un addr;
651 ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
652 if (ctdb->daemon.sd == -1) {
656 set_close_on_exec(ctdb->daemon.sd);
657 set_nonblocking(ctdb->daemon.sd);
659 memset(&addr, 0, sizeof(addr));
660 addr.sun_family = AF_UNIX;
661 strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
663 if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
664 DEBUG(DEBUG_CRIT,("Unable to bind on ctdb socket '%s'\n", ctdb->daemon.name));
668 if (chown(ctdb->daemon.name, geteuid(), getegid()) != 0 ||
669 chmod(ctdb->daemon.name, 0700) != 0) {
670 DEBUG(DEBUG_CRIT,("Unable to secure ctdb socket '%s', ctdb->daemon.name\n", ctdb->daemon.name));
675 if (listen(ctdb->daemon.sd, 100) != 0) {
676 DEBUG(DEBUG_CRIT,("Unable to listen on ctdb socket '%s'\n", ctdb->daemon.name));
683 close(ctdb->daemon.sd);
684 ctdb->daemon.sd = -1;
/*
  SIGCHLD handler: collects exited children with a non-blocking
  waitpid(-1, ..., WNOHANG). a waitpid() error is logged with errno,
  and a reaped child is logged at debug level with its pid.
*/
688 static void sig_child_handler(struct event_context *ev,
689 struct signal_event *se, int signum, int count,
693 // struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
698 pid = waitpid(-1, &status, WNOHANG);
700 DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%d\n", errno));
704 DEBUG(DEBUG_DEBUG, ("SIGCHLD from %d\n", (int)pid));
/*
  completion callback for the "setup" event started by
  ctdb_start_daemon(). a failed setup event is reported through
  ctdb_fatal(). otherwise the "setup" notification script is run and a
  CTDB_CONTROL_STARTUP control is broadcast to all nodes with
  CTDB_CTRL_FLAG_NOREPLY, so no reply is awaited.
*/
709 static void ctdb_setup_event_callback(struct ctdb_context *ctdb, int status,
713 ctdb_fatal(ctdb, "Failed to run setup event\n");
716 ctdb_run_notification_script(ctdb, "setup");
718 /* tell all other nodes we've just started up */
719 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL,
720 0, CTDB_CONTROL_STARTUP, 0,
721 CTDB_CTRL_FLAG_NOREPLY,
722 tdb_null, NULL, NULL);
726 start the protocol going as a daemon
/*
  daemon entry point. in order, the visible steps are:
    - remove a stale socket file and bind the unix domain socket
    - optionally fork (do_fork), reopen tdbs, put /dev/null on stdin
    - ignore SIGPIPE, record the daemon pid, raise scheduling priority
    - create the event context and tevent/child logging
    - force recovery mode so the first election sorts things out
    - initialise the configured transport ("tcp", or "ib" when built
      with USE_INFINIBAND)
    - attach to existing databases, run the "init" event, freeze
    - start accepting clients, release old IPs, start the transport
    - install the SIGCHLD handler and kick off the "setup" event
    - lock down memory and enter the event loop
  event_loop_wait() is not expected to return; if it does, that is
  logged as critical.
  NOTE(review): use_syslog presumably gates start_syslog_daemon() -
  the condition is not visible here, confirm.
*/
728 int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork, bool use_syslog)
731 struct fd_event *fde;
732 const char *domain_socket_name;
733 struct signal_event *se;
735 /* get rid of any old sockets */
736 unlink(ctdb->daemon.name);
738 /* create a unix domain stream socket to listen to */
739 res = ux_socket_bind(ctdb);
741 DEBUG(DEBUG_ALERT,(__location__ " Failed to open CTDB unix domain socket\n"));
745 if (do_fork && fork()) {
749 tdb_reopen_all(False);
754 if (open("/dev/null", O_RDONLY) != 0) {
755 DEBUG(DEBUG_ALERT,(__location__ " Failed to setup stdin on /dev/null\n"));
759 block_signal(SIGPIPE);
761 ctdbd_pid = getpid();
764 DEBUG(DEBUG_ERR, ("Starting CTDBD as pid : %u\n", ctdbd_pid));
766 ctdb_high_priority(ctdb);
768 /* ensure the socket is deleted on exit of the daemon */
769 domain_socket_name = talloc_strdup(talloc_autofree_context(), ctdb->daemon.name);
770 if (domain_socket_name == NULL) {
771 DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup failed.\n"));
775 ctdb->ev = event_context_init(NULL);
776 tevent_loop_allow_nesting(ctdb->ev);
777 ret = ctdb_init_tevent_logging(ctdb);
779 DEBUG(DEBUG_ALERT,("Failed to initialize TEVENT logging\n"));
783 ctdb_set_child_logging(ctdb);
785 /* force initial recovery for election */
786 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
788 if (strcmp(ctdb->transport, "tcp") == 0) {
789 int ctdb_tcp_init(struct ctdb_context *);
790 ret = ctdb_tcp_init(ctdb);
792 #ifdef USE_INFINIBAND
793 if (strcmp(ctdb->transport, "ib") == 0) {
794 int ctdb_ibw_init(struct ctdb_context *);
795 ret = ctdb_ibw_init(ctdb);
799 DEBUG(DEBUG_ERR,("Failed to initialise transport '%s'\n", ctdb->transport));
803 if (ctdb->methods == NULL) {
804 DEBUG(DEBUG_ALERT,(__location__ " Can not initialize transport. ctdb->methods is NULL\n"));
805 ctdb_fatal(ctdb, "transport is unavailable. can not initialize.");
808 /* initialise the transport */
809 if (ctdb->methods->initialise(ctdb) != 0) {
810 ctdb_fatal(ctdb, "transport failed to initialise");
813 /* attach to existing databases */
814 if (ctdb_attach_databases(ctdb) != 0) {
815 ctdb_fatal(ctdb, "Failed to attach to databases\n");
818 ret = ctdb_event_script(ctdb, CTDB_EVENT_INIT);
820 ctdb_fatal(ctdb, "Failed to run init event\n");
822 ctdb_run_notification_script(ctdb, "init");
824 /* start frozen, then let the first election sort things out */
825 if (ctdb_blocking_freeze(ctdb)) {
826 ctdb_fatal(ctdb, "Failed to get initial freeze\n");
829 /* now start accepting clients, only can do this once frozen */
830 fde = event_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd,
832 ctdb_accept_client, ctdb);
833 tevent_fd_set_auto_close(fde);
835 /* release any IPs we hold from previous runs of the daemon */
836 ctdb_release_all_ips(ctdb);
838 /* start the transport going */
839 ctdb_start_transport(ctdb);
841 /* set up a handler to pick up sigchld */
842 se = event_add_signal(ctdb->ev, ctdb,
847 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD\n"));
851 ret = ctdb_event_script_callback(ctdb,
853 ctdb_setup_event_callback,
859 DEBUG(DEBUG_CRIT,("Failed to set up 'setup' event\n"));
864 if (start_syslog_daemon(ctdb)) {
865 DEBUG(DEBUG_CRIT, ("Failed to start syslog daemon\n"));
870 ctdb_lockdown_memory(ctdb);
872 /* go into a wait loop to allow other nodes to complete */
873 event_loop_wait(ctdb->ev);
875 DEBUG(DEBUG_CRIT,("event_loop_wait() returned. this should not happen\n"));
880 allocate a packet for use in daemon<->daemon communication
/*
  allocate and initialise a packet header through the transport's
  allocate_pkt() method.
  length:  requested packet length; raised to at least 'slength'
  slength: size of the fixed structure, which is zeroed
  the allocation size is 'length' rounded up to a multiple of
  CTDB_DS_ALIGNMENT, while hdr->length keeps the unrounded value.
  the header is stamped with the operation, magic, version, current
  vnn_map generation and this node's pnn.
  failure (no transport, or allocate_pkt() failing) is logged.
*/
882 struct ctdb_req_header *_ctdb_transport_allocate(struct ctdb_context *ctdb,
884 enum ctdb_operation operation,
885 size_t length, size_t slength,
889 struct ctdb_req_header *hdr;
891 length = MAX(length, slength);
892 size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
894 if (ctdb->methods == NULL) {
895 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate transport packet for operation %u of length %u. Transport is DOWN.\n",
896 operation, (unsigned)length));
900 hdr = (struct ctdb_req_header *)ctdb->methods->allocate_pkt(mem_ctx, size);
902 DEBUG(DEBUG_ERR,("Unable to allocate transport packet for operation %u of length %u\n",
903 operation, (unsigned)length));
906 talloc_set_name_const(hdr, type);
907 memset(hdr, 0, slength);
908 hdr->length = length;
909 hdr->operation = operation;
910 hdr->ctdb_magic = CTDB_MAGIC;
911 hdr->ctdb_version = CTDB_VERSION;
912 hdr->generation = ctdb->vnn_map->generation;
913 hdr->srcnode = ctdb->pnn;
/*
  state for a control request issued by a local client. entries are
  linked (next/prev) on the destination node's pending_controls list so
  ctdb_daemon_cancel_controls() can fail them if that node disconnects.
  'c' is the original request packet, stolen onto this state.
*/
918 struct daemon_control_state {
919 struct daemon_control_state *next, *prev;
920 struct ctdb_client *client;
921 struct ctdb_req_control *c;
923 struct ctdb_node *node;
927 callback when a control reply comes in
/*
  completion callback for a client-issued control; private_data is the
  daemon_control_state. builds a CTDB_REPLY_CONTROL packet holding the
  reply data followed by the error message text (appended at
  data[datalen], without a terminating NUL) and queues it to the
  client that made the request.
*/
929 static void daemon_control_callback(struct ctdb_context *ctdb,
930 int32_t status, TDB_DATA data,
931 const char *errormsg,
934 struct daemon_control_state *state = talloc_get_type(private_data,
935 struct daemon_control_state);
936 struct ctdb_client *client = state->client;
937 struct ctdb_reply_control *r;
941 /* construct a message to send to the client containing the data */
942 len = offsetof(struct ctdb_reply_control, data) + data.dsize;
944 len += strlen(errormsg);
946 r = ctdbd_allocate_pkt(ctdb, state, CTDB_REPLY_CONTROL, len,
947 struct ctdb_reply_control);
948 CTDB_NO_MEMORY_VOID(ctdb, r);
950 r->hdr.reqid = state->reqid;
952 r->datalen = data.dsize;
954 memcpy(&r->data[0], data.dptr, data.dsize);
956 r->errorlen = strlen(errormsg);
957 memcpy(&r->data[r->datalen], errormsg, r->errorlen);
960 ret = daemon_queue_send(client, &r->hdr);
967 fail all pending controls to a disconnected node
/*
  fail every control still pending against 'node': each state is taken
  off node->pending_controls and completed through
  daemon_control_callback() with status -1, no data and the error text
  "node is disconnected"
*/
969 void ctdb_daemon_cancel_controls(struct ctdb_context *ctdb, struct ctdb_node *node)
971 struct daemon_control_state *state;
972 while ((state = node->pending_controls)) {
973 DLIST_REMOVE(node->pending_controls, state);
974 daemon_control_callback(ctdb, (uint32_t)-1, tdb_null,
975 "node is disconnected", state);
980 destroy a daemon_control_state
/*
  talloc destructor for a daemon_control_state: unlinks the state from
  its node's pending_controls list
*/
982 static int daemon_control_destructor(struct daemon_control_state *state)
985 DLIST_REMOVE(state->node->pending_controls, state);
991 this is called when the ctdb daemon received a ctdb request control
992 from a local client over the unix domain socket
994 static void daemon_request_control_from_client(struct ctdb_client *client,
995 struct ctdb_req_control *c)
999 struct daemon_control_state *state;
1000 TALLOC_CTX *tmp_ctx = talloc_new(client);
1002 if (c->hdr.destnode == CTDB_CURRENT_NODE) {
1003 c->hdr.destnode = client->ctdb->pnn;
1006 state = talloc(client, struct daemon_control_state);
1007 CTDB_NO_MEMORY_VOID(client->ctdb, state);
1009 state->client = client;
1010 state->c = talloc_steal(state, c);
1011 state->reqid = c->hdr.reqid;
1012 if (ctdb_validate_pnn(client->ctdb, c->hdr.destnode)) {
1013 state->node = client->ctdb->nodes[c->hdr.destnode];
1014 DLIST_ADD(state->node->pending_controls, state);
1019 talloc_set_destructor(state, daemon_control_destructor);
1021 if (c->flags & CTDB_CTRL_FLAG_NOREPLY) {
1022 talloc_steal(tmp_ctx, state);
1025 data.dptr = &c->data[0];
1026 data.dsize = c->datalen;
1027 res = ctdb_daemon_send_control(client->ctdb, c->hdr.destnode,
1028 c->srvid, c->opcode, client->client_id,
1030 data, daemon_control_callback,
1033 DEBUG(DEBUG_ERR,(__location__ " Failed to send control to remote node %u\n",
1037 talloc_free(tmp_ctx);
1041 register a call function
1043 int ctdb_daemon_set_call(struct ctdb_context *ctdb, uint32_t db_id,
1044 ctdb_fn_t fn, int id)
1046 struct ctdb_registered_call *call;
1047 struct ctdb_db_context *ctdb_db;
1049 ctdb_db = find_ctdb_db(ctdb, db_id);
1050 if (ctdb_db == NULL) {
1054 call = talloc(ctdb_db, struct ctdb_registered_call);
1058 DLIST_ADD(ctdb_db->calls, call);
1065 this local messaging handler is ugly, but is needed to prevent
1066 recursion in ctdb_send_message() when the destination node is the
1067 same as the source node
/*
  a message addressed to this node, parked until the event loop runs
  ctdb_local_message_trigger(). ctdb_local_message() fills in the
  context, and the trigger also reads m->srvid and m->data.
*/
1069 struct ctdb_local_message {
1070 struct ctdb_context *ctdb;
/*
  timed event handler scheduled by ctdb_local_message(): delivers the
  stored message with ctdb_dispatch_message() and logs a failure to
  dispatch
*/
1075 static void ctdb_local_message_trigger(struct event_context *ev, struct timed_event *te,
1076 struct timeval t, void *private_data)
1078 struct ctdb_local_message *m = talloc_get_type(private_data,
1079 struct ctdb_local_message);
1082 res = ctdb_dispatch_message(m->ctdb, m->srvid, m->data);
1084 DEBUG(DEBUG_ERR, (__location__ " Failed to dispatch message for srvid=%llu\n",
1085 (unsigned long long)m->srvid));
/*
  queue a message for delivery on this node. the payload is duplicated
  onto a ctdb_local_message and delivery is deferred to a zero-delay
  timed event (ctdb_local_message_trigger) instead of being dispatched
  directly, to avoid recursion.
  NOTE(review): the event_add_timed() result is not checked.
*/
1090 static int ctdb_local_message(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data)
1092 struct ctdb_local_message *m;
1093 m = talloc(ctdb, struct ctdb_local_message);
1094 CTDB_NO_MEMORY(ctdb, m);
1099 m->data.dptr = talloc_memdup(m, m->data.dptr, m->data.dsize);
1100 if (m->data.dptr == NULL) {
1105 /* this needs to be done as an event to prevent recursion */
1106 event_add_timed(ctdb->ev, m, timeval_zero(), ctdb_local_message_trigger, m);
/*
  send a message with the given srvid and payload to node 'pnn'.
  a missing transport is logged as an error. a message to this node
  is handed to ctdb_local_message(); otherwise a CTDB_REQ_MESSAGE
  packet is built with a copy of the payload and queued with
  ctdb_queue_packet().
*/
1113 int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t pnn,
1114 uint64_t srvid, TDB_DATA data)
1116 struct ctdb_req_message *r;
1119 if (ctdb->methods == NULL) {
1120 DEBUG(DEBUG_ERR,(__location__ " Failed to send message. Transport is DOWN\n"));
1124 /* see if this is a message to ourselves */
1125 if (pnn == ctdb->pnn) {
1126 return ctdb_local_message(ctdb, srvid, data);
1129 len = offsetof(struct ctdb_req_message, data) + data.dsize;
1130 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_MESSAGE, len,
1131 struct ctdb_req_message);
1132 CTDB_NO_MEMORY(ctdb, r);
1134 r->hdr.destnode = pnn;
1136 r->datalen = data.dsize;
1137 memcpy(&r->data[0], data.dptr, data.dsize);
1139 ctdb_queue_packet(ctdb, &r->hdr);
/*
  a notification registered by a client, linked (next/prev) on
  client->notify. when the entry is destroyed,
  ctdb_client_notify_destructor() broadcasts its stored data under its
  srvid.
*/
1147 struct ctdb_client_notify_list {
1148 struct ctdb_client_notify_list *next, *prev;
1149 struct ctdb_context *ctdb;
/*
  talloc destructor for a registered notification: broadcasts the
  stored payload to all connected nodes (CTDB_BROADCAST_CONNECTED)
  under the registered srvid, and logs a failure to send.
  ctdb_control_deregister_notify() clears this destructor first.
*/
1155 static int ctdb_client_notify_destructor(struct ctdb_client_notify_list *nl)
1159 DEBUG(DEBUG_ERR,("Sending client notify message for srvid:%llu\n", (unsigned long long)nl->srvid));
1161 ret = ctdb_daemon_send_message(nl->ctdb, CTDB_BROADCAST_CONNECTED, (unsigned long long)nl->srvid, nl->data);
1163 DEBUG(DEBUG_ERR,("Failed to send client notify message\n"));
1169 int32_t ctdb_control_register_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1171 struct ctdb_client_notify_register *notify = (struct ctdb_client_notify_register *)indata.dptr;
1172 struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
1173 struct ctdb_client_notify_list *nl;
1175 DEBUG(DEBUG_INFO,("Register srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1177 if (indata.dsize < offsetof(struct ctdb_client_notify_register, notify_data)) {
1178 DEBUG(DEBUG_ERR,(__location__ " Too little data in control : %d\n", (int)indata.dsize));
1182 if (indata.dsize != (notify->len + offsetof(struct ctdb_client_notify_register, notify_data))) {
1183 DEBUG(DEBUG_ERR,(__location__ " Wrong amount of data in control. Got %d, expected %d\n", (int)indata.dsize, (int)(notify->len + offsetof(struct ctdb_client_notify_register, notify_data))));
1188 if (client == NULL) {
1189 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1193 for(nl=client->notify; nl; nl=nl->next) {
1194 if (nl->srvid == notify->srvid) {
1199 DEBUG(DEBUG_ERR,(__location__ " Notification for srvid:%llu already exists for this client\n", (unsigned long long)notify->srvid));
1203 nl = talloc(client, struct ctdb_client_notify_list);
1204 CTDB_NO_MEMORY(ctdb, nl);
1206 nl->srvid = notify->srvid;
1207 nl->data.dsize = notify->len;
1208 nl->data.dptr = talloc_size(nl, nl->data.dsize);
1209 CTDB_NO_MEMORY(ctdb, nl->data.dptr);
1210 memcpy(nl->data.dptr, notify->notify_data, nl->data.dsize);
1212 DLIST_ADD(client->notify, nl);
1213 talloc_set_destructor(nl, ctdb_client_notify_destructor);
1218 int32_t ctdb_control_deregister_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1220 struct ctdb_client_notify_deregister *notify = (struct ctdb_client_notify_deregister *)indata.dptr;
1221 struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
1222 struct ctdb_client_notify_list *nl;
1224 DEBUG(DEBUG_INFO,("Deregister srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1226 if (client == NULL) {
1227 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1231 for(nl=client->notify; nl; nl=nl->next) {
1232 if (nl->srvid == notify->srvid) {
1237 DEBUG(DEBUG_ERR,(__location__ " No notification for srvid:%llu found for this client\n", (unsigned long long)notify->srvid));
1241 DLIST_REMOVE(client->notify, nl);
1242 talloc_set_destructor(nl, NULL);
/*
  look up a connected client by process id: walks ctdb->client_pids and
  returns the client of the first entry whose pid matches
*/
1248 struct ctdb_client *ctdb_find_client_by_pid(struct ctdb_context *ctdb, pid_t pid)
1250 struct ctdb_client_pid_list *client_pid;
1252 for (client_pid = ctdb->client_pids; client_pid; client_pid=client_pid->next) {
1253 if (client_pid->pid == pid) {
1254 return client_pid->client;
1261 /* This control is used by samba when probing if a process (of a samba daemon)
1263 Samba does this when it needs/wants to check if a subrecord in one of the
1264 databases is still valid, or if it is stale and can be removed.
1265 If the node is in banned or stopped state we just kill off the samba
1266 process holding this sub-record and return to the calling samba that
1267 the process does not exist.
1268 This allows us to forcefully recall subrecords registered by samba processes
1269 on banned and stopped nodes.
/*
  probe whether process 'pid' exists.
  when this node is flagged NODE_FLAGS_BANNED or NODE_FLAGS_STOPPED, a
  connected client with that pid is disconnected by freeing its
  ctdb_client. otherwise the answer is the result of kill(pid, 0),
  which sends no signal and only tests whether the process can be
  signalled.
*/
1271 int32_t ctdb_control_process_exists(struct ctdb_context *ctdb, pid_t pid)
1273 struct ctdb_client *client;
1275 if (ctdb->nodes[ctdb->pnn]->flags & (NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1276 client = ctdb_find_client_by_pid(ctdb, pid);
1277 if (client != NULL) {
1278 DEBUG(DEBUG_NOTICE,(__location__ " Killing client with pid:%d on banned/stopped node\n", (int)pid));
1279 talloc_free(client);
1284 return kill(pid, 0);