4 Copyright (C) Andrew Tridgell 2006
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
22 #include "lib/tdb/include/tdb.h"
23 #include "lib/events/events.h"
24 #include "lib/util/dlinklist.h"
25 #include "system/network.h"
26 #include "system/filesys.h"
27 #include "system/wait.h"
28 #include "../include/ctdb.h"
29 #include "../include/ctdb_private.h"
30 #include <sys/socket.h>
/*
  One entry per connected local client. Entries are linked into
  ctdb->client_pids so that a client can be found from the pid of the
  process that owns the connection.
 */
struct ctdb_client_pid_list {
	struct ctdb_client_pid_list *next;
	struct ctdb_client_pid_list *prev;
	struct ctdb_context *ctdb;
	/* NOTE(review): this member was lost from the damaged listing; it is
	   restored from the client_pid->pid uses elsewhere in this file */
	pid_t pid;
	struct ctdb_client *client;
};
/* forward declaration: processes one packet received from a local client */
static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr);
41 static void print_exit_message(void)
43 DEBUG(DEBUG_NOTICE,("CTDB daemon shutting down\n"));
/*
  Start everything that runs once the "startup" event has finished: the
  transport (fatal if it is missing or fails to start), the recovery
  daemon, an atexit() log hook, and then keepalive, node health monitoring,
  tcp tickle list updates and recovery daemon ping handling.

  NOTE(review): this listing has lost its short lines (braces, blank lines
  and whatever statement follows the "Failed to start recovery daemon"
  message); restore them from a pristine copy before compiling.
 */
46 /* called when the "startup" event script has finished */
47 static void ctdb_start_transport(struct ctdb_context *ctdb)
49 if (ctdb->methods == NULL) {
50 DEBUG(DEBUG_ALERT,(__location__ " startup event finished but transport is DOWN.\n"));
51 ctdb_fatal(ctdb, "transport is not initialized but startup completed");
54 /* start the transport running */
55 if (ctdb->methods->start(ctdb) != 0) {
56 DEBUG(DEBUG_ALERT,("transport failed to start!\n"));
57 ctdb_fatal(ctdb, "transport failed to start");
60 /* start the recovery daemon process */
61 if (ctdb_start_recoverd(ctdb) != 0) {
62 DEBUG(DEBUG_ALERT,("Failed to start recovery daemon\n"));
66 /* Make sure we log something when the daemon terminates */
67 atexit(print_exit_message);
69 /* start monitoring for connected/disconnected nodes */
70 ctdb_start_keepalive(ctdb);
72 /* start monitoring for node health */
73 ctdb_start_monitoring(ctdb);
75 /* start periodic update of tcp tickle lists */
76 ctdb_start_tcp_tickle_update(ctdb);
78 /* start listening for recovery daemon pings */
79 ctdb_control_recd_ping(ctdb);
82 static void block_signal(int signum)
86 memset(&act, 0, sizeof(act));
88 act.sa_handler = SIG_IGN;
89 sigemptyset(&act.sa_mask);
90 sigaddset(&act.sa_mask, signum);
91 sigaction(signum, &act, NULL);
/*
  Queue hdr (hdr->length bytes) on the client's queue with
  ctdb_queue_send() and count it in statistics.client_packets_sent.
  CTDB_REQ_MESSAGE packets are subject to the max_queue_depth_drop_msg
  tunable: when the queue is longer than that, "killing client connection"
  is logged.

  NOTE(review): the statements following that log message were lost from
  this listing (embedded numbering jumps from 103 to 108) - presumably the
  client is freed and an error returned; confirm against a pristine copy.
 */
96 send a packet to a client
98 static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header *hdr)
100 client->ctdb->statistics.client_packets_sent++;
101 if (hdr->operation == CTDB_REQ_MESSAGE) {
102 if (ctdb_queue_length(client->queue) > client->ctdb->tunable.max_queue_depth_drop_msg) {
103 DEBUG(DEBUG_ERR,("CTDB_REQ_MESSAGE queue full - killing client connection.\n"));
108 return ctdb_queue_send(client->queue, (uint8_t *)hdr, hdr->length);
/*
  Message handler installed on behalf of a local client. It copies data
  into a newly allocated CTDB_REQ_MESSAGE packet and queues that packet to
  the client passed in private_data.

  NOTE(review): short lines are missing from this listing (the declaration
  of len and the statements at embedded lines 129 and 135), so how srvid is
  stored in the packet and when r is released cannot be seen here - verify
  against a pristine copy.
 */
112 message handler for when we are in daemon mode. This redirects the message
115 static void daemon_message_handler(struct ctdb_context *ctdb, uint64_t srvid,
116 TDB_DATA data, void *private_data)
118 struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client);
119 struct ctdb_req_message *r;
122 /* construct a message to send to the client containing the data */
123 len = offsetof(struct ctdb_req_message, data) + data.dsize;
124 r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE,
125 len, struct ctdb_req_message);
126 CTDB_NO_MEMORY_VOID(ctdb, r);
128 talloc_set_name_const(r, "req_message packet");
131 r->datalen = data.dsize;
132 memcpy(&r->data[0], data.dptr, data.dsize);
/* the result of daemon_queue_send() is not examined on this line */
134 daemon_queue_send(client, &r->hdr);
140 this is called when the ctdb daemon received a ctdb request to
141 set the srvid from the client
143 int daemon_register_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
145 struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
147 if (client == NULL) {
148 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_register_message_handler\n"));
151 res = ctdb_register_message_handler(ctdb, client, srvid, daemon_message_handler, client);
153 DEBUG(DEBUG_ERR,(__location__ " Failed to register handler %llu in daemon\n",
154 (unsigned long long)srvid));
156 DEBUG(DEBUG_INFO,(__location__ " Registered message handler for srvid=%llu\n",
157 (unsigned long long)srvid));
164 this is called when the ctdb daemon received a ctdb request to
165 remove a srvid from the client
167 int daemon_deregister_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
169 struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
170 if (client == NULL) {
171 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_deregister_message_handler\n"));
174 return ctdb_deregister_message_handler(ctdb, srvid, client);
/*
  talloc destructor for a client connection: runs the takeover hook,
  releases the client's reqid and decrements statistics.num_clients when it
  is non-zero. Recovery mode is forced to CTDB_RECOVERY_ACTIVE when the
  client goes away with persistent updates in flight.

  NOTE(review): the condition guarding the "transaction commit active"
  branch (embedded line 196) is missing from this listing, as are the
  braces and the return statement - confirm against a pristine copy.
 */
179 destroy a ctdb_client
181 static int ctdb_client_destructor(struct ctdb_client *client)
183 struct ctdb_db_context *ctdb_db;
185 ctdb_takeover_client_destructor_hook(client);
186 ctdb_reqid_remove(client->ctdb, client->client_id);
187 if (client->ctdb->statistics.num_clients) {
188 client->ctdb->statistics.num_clients--;
191 if (client->num_persistent_updates != 0) {
192 DEBUG(DEBUG_ERR,(__location__ " Client disconnecting with %u persistent updates in flight. Starting recovery\n", client->num_persistent_updates));
193 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
195 ctdb_db = find_ctdb_db(client->ctdb, client->db_id);
197 DEBUG(DEBUG_ERR, (__location__ " client exit while transaction "
198 "commit active. Forcing recovery.\n"));
199 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
200 ctdb_db->transaction_active = false;
/*
  Route a message sent by a local client. A message addressed to this node
  (destnode equals ctdb_get_pnn()) is passed to ctdb_request_message();
  otherwise the payload is handed to ctdb_daemon_send_message() and a
  failure is logged.

  NOTE(review): the declarations of data and res, whatever follows the
  local delivery, and the trailing arguments of two calls are missing from
  this listing.
 */
208 this is called when the ctdb daemon received a ctdb request message
209 from a local client over the unix domain socket
211 static void daemon_request_message_from_client(struct ctdb_client *client,
212 struct ctdb_req_message *c)
217 /* maybe the message is for another client on this node */
218 if (ctdb_get_pnn(client->ctdb)==c->hdr.destnode) {
219 ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c);
223 /* its for a remote node */
224 data.dptr = &c->data[0];
225 data.dsize = c->datalen;
226 res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode,
229 DEBUG(DEBUG_ERR,(__location__ " Failed to send message to remote node %u\n",
/*
  Per-call bookkeeping for a call issued on behalf of a local client; it is
  handed to the completion callback as async.private_data.
 */
struct daemon_call_state {
	struct ctdb_client *client;	/* client that issued the call */
	/* NOTE(review): this member was lost from the damaged listing; it is
	   restored from the dstate->reqid uses (copied from and to hdr.reqid) */
	uint32_t reqid;
	struct ctdb_call *call;
	struct timeval start_time;	/* taken with timeval_current() for latency stats */
};
/*
  Completion callback for a call issued on behalf of a local client. It
  collects the result with ctdb_daemon_call_recv(), builds a
  CTDB_REPLY_CALL packet carrying the reply data and queues it to the
  client. Each exit path visible here decrements statistics.pending_calls
  (when non-zero) and records latency with ctdb_latency().

  NOTE(review): the declarations of res and length, every if-condition
  that tests res or r, the returns and the talloc_free calls are missing
  from this listing; do not infer ownership from what is shown.
 */
243 complete a call from a client
245 static void daemon_call_from_client_callback(struct ctdb_call_state *state)
247 struct daemon_call_state *dstate = talloc_get_type(state->async.private_data,
248 struct daemon_call_state);
249 struct ctdb_reply_call *r;
252 struct ctdb_client *client = dstate->client;
253 struct ctdb_db_context *ctdb_db = state->ctdb_db;
/* reparent dstate (and its call) under the client */
255 talloc_steal(client, dstate);
256 talloc_steal(dstate, dstate->call);
258 res = ctdb_daemon_call_recv(state, dstate->call);
260 DEBUG(DEBUG_ERR, (__location__ " ctdbd_call_recv() returned error\n"));
261 if (client->ctdb->statistics.pending_calls > 0) {
262 client->ctdb->statistics.pending_calls--;
264 ctdb_latency(ctdb_db, "call_from_client_cb 1", &client->ctdb->statistics.max_call_latency, dstate->start_time);
268 length = offsetof(struct ctdb_reply_call, data) + dstate->call->reply_data.dsize;
269 r = ctdbd_allocate_pkt(client->ctdb, dstate, CTDB_REPLY_CALL,
270 length, struct ctdb_reply_call);
272 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate reply_call in ctdb daemon\n"));
273 if (client->ctdb->statistics.pending_calls > 0) {
274 client->ctdb->statistics.pending_calls--;
276 ctdb_latency(ctdb_db, "call_from_client_cb 2", &client->ctdb->statistics.max_call_latency, dstate->start_time);
279 r->hdr.reqid = dstate->reqid;
280 r->datalen = dstate->call->reply_data.dsize;
281 memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen);
283 res = daemon_queue_send(client, &r->hdr);
285 /* client is dead - return immediately */
289 DEBUG(DEBUG_ERR, (__location__ " Failed to queue packet from daemon to client\n"));
291 ctdb_latency(ctdb_db, "call_from_client_cb 3", &client->ctdb->statistics.max_call_latency, dstate->start_time);
293 if (client->ctdb->statistics.pending_calls > 0) {
294 client->ctdb->statistics.pending_calls--;
/*
  Context for a requeued client packet. The client is remembered by id
  rather than by pointer so that it can be looked up again (and found to
  be gone) when the packet is replayed.
 */
struct ctdb_daemon_packet_wrap {
	struct ctdb_context *ctdb;
	/* NOTE(review): this member was lost from the damaged listing; it is
	   restored from the w->client_id uses in this file */
	uint32_t client_id;
};
/*
  Replay entry point for a requeued packet. p must be a
  ctdb_daemon_packet_wrap; the client is looked up again by id and, if it
  still exists, the packet is passed to daemon_incoming_packet().

  NOTE(review): the NULL test on w, the returns, the argument of the
  "disconnected client" message and any freeing of w or hdr are missing
  from this listing.
 */
304 a wrapper to catch disconnected clients
306 static void daemon_incoming_packet_wrap(void *p, struct ctdb_req_header *hdr)
308 struct ctdb_client *client;
309 struct ctdb_daemon_packet_wrap *w = talloc_get_type(p,
310 struct ctdb_daemon_packet_wrap);
312 DEBUG(DEBUG_CRIT,(__location__ " Bad packet type '%s'\n", talloc_get_name(p)));
316 client = ctdb_reqid_find(w->ctdb, w->client_id, struct ctdb_client);
317 if (client == NULL) {
318 DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
326 daemon_incoming_packet(client, hdr);
/*
  Handle a CTDB_REQ_CALL from a local client: find the database, lock and
  fetch the record with ctdb_ltdb_lock_fetch_requeue() (which may requeue
  the packet through daemon_incoming_packet_wrap), build a ctdb_call from
  the request and dispatch it locally when this node is the dmaster
  (header.dmaster == ctdb->pnn) or remotely otherwise. The completion
  callback is daemon_call_from_client_callback.

  NOTE(review): many short lines are missing from this listing
  (declarations of key/data/ret, the tests on ctdb_db/ret/call/state, the
  returns, and assignments such as the key pointer and w->ctdb). Do not
  rebuild this function from what is shown; restore it from a pristine
  copy.
 */
331 this is called when the ctdb daemon received a ctdb request call
332 from a local client over the unix domain socket
334 static void daemon_request_call_from_client(struct ctdb_client *client,
335 struct ctdb_req_call *c)
337 struct ctdb_call_state *state;
338 struct ctdb_db_context *ctdb_db;
339 struct daemon_call_state *dstate;
340 struct ctdb_call *call;
341 struct ctdb_ltdb_header header;
344 struct ctdb_context *ctdb = client->ctdb;
345 struct ctdb_daemon_packet_wrap *w;
347 ctdb->statistics.total_calls++;
/* NOTE(review): as written the increment below only happens when
   pending_calls is already non-zero, so the counter can never leave zero.
   The guard looks copied from the decrement paths - confirm and fix. */
348 if (client->ctdb->statistics.pending_calls > 0) {
349 ctdb->statistics.pending_calls++;
352 ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
/* NOTE(review): the format string below has no trailing newline */
354 DEBUG(DEBUG_ERR, (__location__ " Unknown database in request. db_id==0x%08x",
356 if (client->ctdb->statistics.pending_calls > 0) {
357 ctdb->statistics.pending_calls--;
362 if (ctdb_db->unhealthy_reason) {
364 * this is just a warning, as the tdb should be empty anyway,
365 * and only persistent databases can be unhealthy, which doesn't
366 * use this code patch
368 DEBUG(DEBUG_WARNING,("warn: db(%s) unhealty in daemon_request_call_from_client(): %s\n",
369 ctdb_db->db_name, ctdb_db->unhealthy_reason));
373 key.dsize = c->keylen;
375 w = talloc(ctdb, struct ctdb_daemon_packet_wrap);
376 CTDB_NO_MEMORY_VOID(ctdb, w);
379 w->client_id = client->client_id;
381 ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header,
382 (struct ctdb_req_header *)c, &data,
383 daemon_incoming_packet_wrap, w, True);
385 /* will retry later */
386 if (client->ctdb->statistics.pending_calls > 0) {
387 ctdb->statistics.pending_calls--;
395 DEBUG(DEBUG_ERR,(__location__ " Unable to fetch record\n"));
396 if (client->ctdb->statistics.pending_calls > 0) {
397 ctdb->statistics.pending_calls--;
402 dstate = talloc(client, struct daemon_call_state);
403 if (dstate == NULL) {
404 ctdb_ltdb_unlock(ctdb_db, key);
405 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate dstate\n"));
406 if (client->ctdb->statistics.pending_calls > 0) {
407 ctdb->statistics.pending_calls--;
411 dstate->start_time = timeval_current();
412 dstate->client = client;
413 dstate->reqid = c->hdr.reqid;
/* the fetched record data is reparented under dstate */
414 talloc_steal(dstate, data.dptr);
416 call = dstate->call = talloc_zero(dstate, struct ctdb_call);
418 ctdb_ltdb_unlock(ctdb_db, key);
419 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate call\n"));
420 if (client->ctdb->statistics.pending_calls > 0) {
421 ctdb->statistics.pending_calls--;
423 ctdb_latency(ctdb_db, "call_from_client 1", &ctdb->statistics.max_call_latency, dstate->start_time);
427 call->call_id = c->callid;
/* call data follows the key inside the request's data area */
429 call->call_data.dptr = c->data + c->keylen;
430 call->call_data.dsize = c->calldatalen;
431 call->flags = c->flags;
433 if (header.dmaster == ctdb->pnn) {
434 state = ctdb_call_local_send(ctdb_db, call, &header, &data);
436 state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
439 ctdb_ltdb_unlock(ctdb_db, key);
442 DEBUG(DEBUG_ERR,(__location__ " Unable to setup call send\n"));
443 if (client->ctdb->statistics.pending_calls > 0) {
444 ctdb->statistics.pending_calls--;
446 ctdb_latency(ctdb_db, "call_from_client 2", &ctdb->statistics.max_call_latency, dstate->start_time);
/* dstate becomes a child of state, and state a child of the client */
449 talloc_steal(state, dstate);
450 talloc_steal(client, state);
452 state->async.fn = daemon_call_from_client_callback;
453 state->async.private_data = dstate;
/* forward declaration: handles a CTDB_REQ_CONTROL sent by a local client */
static void daemon_request_control_from_client(struct ctdb_client *client,
					       struct ctdb_req_control *c);
/*
  Dispatch one packet from a local client. The packet is reparented under a
  temporary talloc context, its magic and version are checked, and it is
  then routed by hdr->operation to the call, message or control handler
  while the matching statistics.client counter is incremented. The
  temporary context is freed at the end.

  NOTE(review): the declaration of tmp_ctx, the "case CTDB_REQ_CALL:" and
  "default:" labels, the break statements and whatever follows the two
  ctdb_set_error() calls are missing from this listing.
 */
460 /* data contains a packet from the client */
461 static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr)
463 struct ctdb_client *client = talloc_get_type(p, struct ctdb_client);
/* client is dereferenced here without a NULL check on the talloc_get_type() result */
465 struct ctdb_context *ctdb = client->ctdb;
467 /* place the packet as a child of a tmp_ctx. We then use
468 talloc_free() below to free it. If any of the calls want
469 to keep it, then they will steal it somewhere else, and the
470 talloc_free() will be a no-op */
471 tmp_ctx = talloc_new(client);
472 talloc_steal(tmp_ctx, hdr);
474 if (hdr->ctdb_magic != CTDB_MAGIC) {
475 ctdb_set_error(client->ctdb, "Non CTDB packet rejected in daemon\n");
479 if (hdr->ctdb_version != CTDB_VERSION) {
480 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
484 switch (hdr->operation) {
486 ctdb->statistics.client.req_call++;
487 daemon_request_call_from_client(client, (struct ctdb_req_call *)hdr);
490 case CTDB_REQ_MESSAGE:
491 ctdb->statistics.client.req_message++;
492 daemon_request_message_from_client(client, (struct ctdb_req_message *)hdr);
495 case CTDB_REQ_CONTROL:
496 ctdb->statistics.client.req_control++;
497 daemon_request_control_from_client(client, (struct ctdb_req_control *)hdr);
501 DEBUG(DEBUG_CRIT,(__location__ " daemon: unrecognized operation %u\n",
506 talloc_free(tmp_ctx);
/*
  Queue read callback for a client connection. It counts the packet in
  statistics.client_packets_recv, checks that cnt covers a full header and
  equals hdr->length, checks magic and version, then passes the packet to
  daemon_incoming_packet().

  NOTE(review): embedded lines 516-521 (between the declarations and the
  statistics update) are missing from this listing, as are the returns
  after each rejected packet; confirm how a zero-length read and rejected
  buffers are handled in a pristine copy.
 */
510 called when the daemon gets a incoming packet
512 static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
514 struct ctdb_client *client = talloc_get_type(args, struct ctdb_client);
515 struct ctdb_req_header *hdr;
522 client->ctdb->statistics.client_packets_recv++;
524 if (cnt < sizeof(*hdr)) {
525 ctdb_set_error(client->ctdb, "Bad packet length %u in daemon\n",
529 hdr = (struct ctdb_req_header *)data;
530 if (cnt != hdr->length) {
/* NOTE(review): the newline sits in the middle of this format string
   ("...%u\n in daemon"); it probably belongs at the end */
531 ctdb_set_error(client->ctdb, "Bad header length %u expected %u\n in daemon",
532 (unsigned)hdr->length, (unsigned)cnt);
536 if (hdr->ctdb_magic != CTDB_MAGIC) {
537 ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
541 if (hdr->ctdb_version != CTDB_VERSION) {
542 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
546 DEBUG(DEBUG_DEBUG,(__location__ " client request %u of type %u length %u from "
547 "node %u to %u\n", hdr->reqid, hdr->operation, hdr->length,
548 hdr->srcnode, hdr->destnode));
550 /* it is the responsibility of the incoming packet function to free 'data' */
551 daemon_incoming_packet(client, hdr);
555 static int ctdb_clientpid_destructor(struct ctdb_client_pid_list *client_pid)
557 if (client_pid->ctdb->client_pids != NULL) {
558 DLIST_REMOVE(client_pid->ctdb->client_pids, client_pid);
/*
  fd event handler for the listening unix domain socket. It accepts one
  connection, creates a ctdb_client for it, records the peer pid obtained
  with getsockopt(), registers the client in ctdb->client_pids, sets up the
  packet queue with ctdb_daemon_read_cb as reader, installs the destructors
  and increments statistics.num_clients.

  NOTE(review): this listing lost its preprocessor conditionals. Two
  declarations of crl and two getsockopt() calls (SO_PEERID and
  SO_PEERCRED) appear back to back and are presumably alternative
  platform branches. Also missing are the declarations of fd, len and one
  credential struct, the handling of a failed accept(), and the
  assignments of client->ctdb and client->fd, if any.
 */
565 static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde,
566 uint16_t flags, void *private_data)
568 struct sockaddr_un addr;
571 struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
572 struct ctdb_client *client;
573 struct ctdb_client_pid_list *client_pid;
575 struct peercred_struct cr;
576 socklen_t crl = sizeof(struct peercred_struct);
579 socklen_t crl = sizeof(struct ucred);
582 memset(&addr, 0, sizeof(addr));
584 fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len);
590 set_close_on_exec(fd);
592 DEBUG(DEBUG_DEBUG,(__location__ " Created SOCKET FD:%d to connected child\n", fd));
/* NOTE(review): no NULL check on client is visible before it is used below */
594 client = talloc_zero(ctdb, struct ctdb_client);
596 if (getsockopt(fd, SOL_SOCKET, SO_PEERID, &cr, &crl) == 0) {
598 if (getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &cr, &crl) == 0) {
600 DEBUG(DEBUG_INFO,("Connected client with pid:%u\n", (unsigned)cr.pid));
605 client->client_id = ctdb_reqid_new(ctdb, client);
/* NOTE(review): cr.pid is read here even if getsockopt() failed - confirm
   cr is initialised on that path */
606 client->pid = cr.pid;
/* the pid entry is a talloc child of the client */
608 client_pid = talloc(client, struct ctdb_client_pid_list);
609 if (client_pid == NULL) {
610 DEBUG(DEBUG_ERR,("Failed to allocate client pid structure\n"));
615 client_pid->ctdb = ctdb;
616 client_pid->pid = cr.pid;
617 client_pid->client = client;
619 DLIST_ADD(ctdb->client_pids, client_pid);
621 client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT,
622 ctdb_daemon_read_cb, client);
624 talloc_set_destructor(client, ctdb_client_destructor);
625 talloc_set_destructor(client_pid, ctdb_clientpid_destructor);
626 ctdb->statistics.num_clients++;
632 create a unix domain socket and bind it
633 return a file descriptor open on the socket
635 static int ux_socket_bind(struct ctdb_context *ctdb)
637 struct sockaddr_un addr;
639 ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
640 if (ctdb->daemon.sd == -1) {
644 set_close_on_exec(ctdb->daemon.sd);
645 set_nonblocking(ctdb->daemon.sd);
647 memset(&addr, 0, sizeof(addr));
648 addr.sun_family = AF_UNIX;
649 strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
651 if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
652 DEBUG(DEBUG_CRIT,("Unable to bind on ctdb socket '%s'\n", ctdb->daemon.name));
656 if (chown(ctdb->daemon.name, geteuid(), getegid()) != 0 ||
657 chmod(ctdb->daemon.name, 0700) != 0) {
658 DEBUG(DEBUG_CRIT,("Unable to secure ctdb socket '%s', ctdb->daemon.name\n", ctdb->daemon.name));
663 if (listen(ctdb->daemon.sd, 100) != 0) {
664 DEBUG(DEBUG_CRIT,("Unable to listen on ctdb socket '%s'\n", ctdb->daemon.name));
671 close(ctdb->daemon.sd);
672 ctdb->daemon.sd = -1;
/*
  SIGCHLD event handler: reaps children with
  waitpid(-1, &status, WNOHANG), logging an error (with errno) when
  waitpid() fails and a debug line naming the pid otherwise.

  NOTE(review): the remaining parameter, the declarations of status and
  pid, and the control flow around waitpid() (any loop and the tests on
  pid) are missing from this listing.
 */
676 static void sig_child_handler(struct event_context *ev,
677 struct signal_event *se, int signum, int count,
681 // struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
686 pid = waitpid(-1, &status, WNOHANG);
688 DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%d\n", errno));
692 DEBUG(DEBUG_DEBUG, ("SIGCHLD from %d\n", (int)pid));
/*
  Daemon entry point. In order: remove a stale socket and bind a new one,
  fork when do_fork is set, reopen the tdbs, point stdin at /dev/null,
  ignore SIGPIPE, raise priority, create the event context, initialise the
  configured transport ("tcp", or "ib" when built with USE_INFINIBAND),
  attach the existing databases, freeze them, run the "init" event, start
  accepting clients, broadcast CTDB_CONTROL_STARTUP, release old IPs, start
  the transport, install the SIGCHLD handler, optionally start the syslog
  daemon, lock down memory and enter event_loop_wait(), which is not
  expected to return.

  NOTE(review): many short lines are missing from this listing: the
  declarations of res and ret, every error return/exit, the parent-side
  code after fork(), the #endif for USE_INFINIBAND, the tests on ret/se,
  the arguments of event_add_signal() and the test on use_syslog. Restore
  the function from a pristine copy rather than from this text.
 */
698 start the protocol going as a daemon
700 int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork, bool use_syslog)
703 struct fd_event *fde;
704 const char *domain_socket_name;
705 struct signal_event *se;
707 /* get rid of any old sockets */
708 unlink(ctdb->daemon.name);
710 /* create a unix domain stream socket to listen to */
711 res = ux_socket_bind(ctdb);
713 DEBUG(DEBUG_ALERT,(__location__ " Failed to open CTDB unix domain socket\n"));
/* fork() returns non-zero in the parent (and on failure), so this branch
   is not taken in the child */
717 if (do_fork && fork()) {
721 tdb_reopen_all(False);
/* open() must return descriptor 0 here, i.e. stdin is expected to have
   been closed on a line missing from this listing - TODO confirm */
726 if (open("/dev/null", O_RDONLY) != 0) {
727 DEBUG(DEBUG_ALERT,(__location__ " Failed to setup stdin on /dev/null\n"));
731 block_signal(SIGPIPE);
733 ctdbd_pid = getpid();
736 DEBUG(DEBUG_ERR, ("Starting CTDBD as pid : %u\n", ctdbd_pid));
738 ctdb_high_priority(ctdb);
740 /* ensure the socket is deleted on exit of the daemon */
741 domain_socket_name = talloc_strdup(talloc_autofree_context(), ctdb->daemon.name);
742 if (domain_socket_name == NULL) {
743 DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup failed.\n"));
747 ctdb->ev = event_context_init(NULL);
749 ctdb_set_child_logging(ctdb);
751 /* force initial recovery for election */
752 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
754 if (strcmp(ctdb->transport, "tcp") == 0) {
755 int ctdb_tcp_init(struct ctdb_context *);
756 ret = ctdb_tcp_init(ctdb);
758 #ifdef USE_INFINIBAND
759 if (strcmp(ctdb->transport, "ib") == 0) {
760 int ctdb_ibw_init(struct ctdb_context *);
761 ret = ctdb_ibw_init(ctdb);
765 DEBUG(DEBUG_ERR,("Failed to initialise transport '%s'\n", ctdb->transport));
769 if (ctdb->methods == NULL) {
770 DEBUG(DEBUG_ALERT,(__location__ " Can not initialize transport. ctdb->methods is NULL\n"));
771 ctdb_fatal(ctdb, "transport is unavailable. can not initialize.");
774 /* initialise the transport */
775 if (ctdb->methods->initialise(ctdb) != 0) {
776 ctdb_fatal(ctdb, "transport failed to initialise");
779 /* attach to existing databases */
780 if (ctdb_attach_databases(ctdb) != 0) {
781 ctdb_fatal(ctdb, "Failed to attach to databases\n");
784 /* start frozen, then let the first election sort things out */
785 if (ctdb_blocking_freeze(ctdb)) {
786 ctdb_fatal(ctdb, "Failed to get initial freeze\n");
789 ret = ctdb_event_script(ctdb, CTDB_EVENT_INIT);
791 ctdb_fatal(ctdb, "Failed to run init event\n");
793 ctdb_run_notification_script(ctdb, "init");
795 /* now start accepting clients, only can do this once frozen */
796 fde = event_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd,
797 EVENT_FD_READ|EVENT_FD_AUTOCLOSE,
798 ctdb_accept_client, ctdb);
800 /* tell all other nodes we've just started up */
801 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL,
802 0, CTDB_CONTROL_STARTUP, 0,
803 CTDB_CTRL_FLAG_NOREPLY,
804 tdb_null, NULL, NULL);
806 /* release any IPs we hold from previous runs of the daemon */
807 ctdb_release_all_ips(ctdb);
809 /* start the transport going */
810 ctdb_start_transport(ctdb);
812 /* set up a handler to pick up sigchld */
813 se = event_add_signal(ctdb->ev, ctdb,
818 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD\n"));
823 if (start_syslog_daemon(ctdb)) {
824 DEBUG(DEBUG_CRIT, ("Failed to start syslog daemon\n"));
829 ctdb_lockdown_memory(ctdb);
831 /* go into a wait loop to allow other nodes to complete */
832 event_loop_wait(ctdb->ev);
834 DEBUG(DEBUG_CRIT,("event_loop_wait() returned. this should not happen\n"));
/*
  Allocate and pre-fill a packet through the transport's allocate_pkt()
  method. The requested length is raised to at least slength, and the
  allocation size is that length rounded up to a multiple of
  CTDB_DS_ALIGNMENT (the mask arithmetic assumes the alignment is a power
  of two - TODO confirm). The first slength bytes are zeroed and the
  common header fields are set: length, operation, magic, version,
  generation and srcnode. The talloc name of the packet is set to type.

  NOTE(review): two parameter lines (mem_ctx and type, both used below),
  the declaration of size, the NULL test on hdr and all return statements
  are missing from this listing.
 */
839 allocate a packet for use in daemon<->daemon communication
841 struct ctdb_req_header *_ctdb_transport_allocate(struct ctdb_context *ctdb,
843 enum ctdb_operation operation,
844 size_t length, size_t slength,
848 struct ctdb_req_header *hdr;
850 length = MAX(length, slength);
851 size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
853 if (ctdb->methods == NULL) {
854 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate transport packet for operation %u of length %u. Transport is DOWN.\n",
855 operation, (unsigned)length));
859 hdr = (struct ctdb_req_header *)ctdb->methods->allocate_pkt(mem_ctx, size);
861 DEBUG(DEBUG_ERR,("Unable to allocate transport packet for operation %u of length %u\n",
862 operation, (unsigned)length));
865 talloc_set_name_const(hdr, type);
866 memset(hdr, 0, slength);
/* hdr->length carries the unpadded length, not the rounded-up size */
867 hdr->length = length;
868 hdr->operation = operation;
869 hdr->ctdb_magic = CTDB_MAGIC;
870 hdr->ctdb_version = CTDB_VERSION;
871 hdr->generation = ctdb->vnn_map->generation;
872 hdr->srcnode = ctdb->pnn;
/*
  State kept for a control issued by a local client while the reply is
  outstanding. When the destination pnn is valid, the state is also linked
  into that node's pending_controls list.
 */
struct daemon_control_state {
	struct daemon_control_state *next;
	struct daemon_control_state *prev;
	struct ctdb_client *client;	/* client that sent the control */
	struct ctdb_req_control *c;	/* the request packet, owned by this state */
	/* NOTE(review): this member was lost from the damaged listing; it is
	   restored from the state->reqid uses (copied from and to hdr.reqid) */
	uint32_t reqid;
	struct ctdb_node *node;
};
/*
  Build the CTDB_REPLY_CONTROL packet for a finished control and queue it
  to the client that issued it. The payload is data followed by the error
  message bytes; errorlen is strlen(errormsg), so the message is copied
  without its terminating NUL.

  NOTE(review): the last parameter (private_data, used below), the
  declarations of len and ret, the tests guarding the errormsg handling,
  the assignment of the status field and whatever follows
  daemon_queue_send() are missing from this listing.
 */
886 callback when a control reply comes in
888 static void daemon_control_callback(struct ctdb_context *ctdb,
889 int32_t status, TDB_DATA data,
890 const char *errormsg,
893 struct daemon_control_state *state = talloc_get_type(private_data,
894 struct daemon_control_state);
895 struct ctdb_client *client = state->client;
896 struct ctdb_reply_control *r;
900 /* construct a message to send to the client containing the data */
901 len = offsetof(struct ctdb_reply_control, data) + data.dsize;
903 len += strlen(errormsg);
/* the reply is allocated as a talloc child of state */
905 r = ctdbd_allocate_pkt(ctdb, state, CTDB_REPLY_CONTROL, len,
906 struct ctdb_reply_control);
907 CTDB_NO_MEMORY_VOID(ctdb, r);
909 r->hdr.reqid = state->reqid;
911 r->datalen = data.dsize;
913 memcpy(&r->data[0], data.dptr, data.dsize);
915 r->errorlen = strlen(errormsg);
/* the error text is stored directly after the data bytes */
916 memcpy(&r->data[r->datalen], errormsg, r->errorlen);
919 ret = daemon_queue_send(client, &r->hdr);
926 fail all pending controls to a disconnected node
928 void ctdb_daemon_cancel_controls(struct ctdb_context *ctdb, struct ctdb_node *node)
930 struct daemon_control_state *state;
931 while ((state = node->pending_controls)) {
932 DLIST_REMOVE(node->pending_controls, state);
933 daemon_control_callback(ctdb, (uint32_t)-1, tdb_null,
934 "node is disconnected", state);
/*
  talloc destructor for a daemon_control_state: unlinks the state from its
  node's pending_controls list.

  NOTE(review): embedded line 943 (just before the DLIST_REMOVE) and the
  return statement are missing from this listing. state->node is only
  assigned in the visible code when the destination pnn is valid, so a
  guard on state->node presumably belongs here - confirm.
 */
939 destroy a daemon_control_state
941 static int daemon_control_destructor(struct daemon_control_state *state)
944 DLIST_REMOVE(state->node->pending_controls, state);
/*
  Handle a CTDB_REQ_CONTROL from a local client. CTDB_CURRENT_NODE is
  rewritten to this node's pnn, a daemon_control_state owning the request
  is created under the client and (for a valid destination pnn) linked
  into that node's pending_controls list, and the control is sent with
  ctdb_daemon_send_control() using daemon_control_callback as completion.
  For CTDB_CTRL_FLAG_NOREPLY controls the state is reparented to tmp_ctx,
  so it is released by the talloc_free(tmp_ctx) at the end.

  NOTE(review): the declarations of data and res, the else-branch after
  the pnn validation, two arguments of ctdb_daemon_send_control() and the
  test on res are missing from this listing. If CTDB_NO_MEMORY_VOID
  returns early, tmp_ctx stays allocated under the client - confirm
  whether that matters.
 */
950 this is called when the ctdb daemon received a ctdb request control
951 from a local client over the unix domain socket
953 static void daemon_request_control_from_client(struct ctdb_client *client,
954 struct ctdb_req_control *c)
958 struct daemon_control_state *state;
959 TALLOC_CTX *tmp_ctx = talloc_new(client);
961 if (c->hdr.destnode == CTDB_CURRENT_NODE) {
962 c->hdr.destnode = client->ctdb->pnn;
965 state = talloc(client, struct daemon_control_state);
966 CTDB_NO_MEMORY_VOID(client->ctdb, state);
968 state->client = client;
/* the request packet becomes a child of state */
969 state->c = talloc_steal(state, c);
970 state->reqid = c->hdr.reqid;
971 if (ctdb_validate_pnn(client->ctdb, c->hdr.destnode)) {
972 state->node = client->ctdb->nodes[c->hdr.destnode];
973 DLIST_ADD(state->node->pending_controls, state);
978 talloc_set_destructor(state, daemon_control_destructor);
980 if (c->flags & CTDB_CTRL_FLAG_NOREPLY) {
981 talloc_steal(tmp_ctx, state);
984 data.dptr = &c->data[0];
985 data.dsize = c->datalen;
986 res = ctdb_daemon_send_control(client->ctdb, c->hdr.destnode,
987 c->srvid, c->opcode, client->client_id,
989 data, daemon_control_callback,
992 DEBUG(DEBUG_ERR,(__location__ " Failed to send control to remote node %u\n",
996 talloc_free(tmp_ctx);
/*
  Register a call function for database db_id: the database is looked up
  with find_ctdb_db() and a newly allocated ctdb_registered_call is added
  to ctdb_db->calls.

  NOTE(review): the lines that store fn and id in the new entry and all
  return statements are missing from this listing. No NULL check on the
  talloc() result is visible before the entry is linked - confirm.
 */
1000 register a call function
1002 int ctdb_daemon_set_call(struct ctdb_context *ctdb, uint32_t db_id,
1003 ctdb_fn_t fn, int id)
1005 struct ctdb_registered_call *call;
1006 struct ctdb_db_context *ctdb_db;
1008 ctdb_db = find_ctdb_db(ctdb, db_id);
1009 if (ctdb_db == NULL) {
1013 call = talloc(ctdb_db, struct ctdb_registered_call);
1017 DLIST_ADD(ctdb_db->calls, call);
1024 this local messaging handler is ugly, but is needed to prevent
1025 recursion in ctdb_send_message() when the destination node is the
1026 same as the source node
1028 struct ctdb_local_message {
1029 struct ctdb_context *ctdb;
/*
  Timed event callback that delivers a locally addressed message with
  ctdb_dispatch_message() and logs a failure.

  NOTE(review): the declaration of res, the test on res and whatever
  releases m afterwards are missing from this listing.
 */
1034 static void ctdb_local_message_trigger(struct event_context *ev, struct timed_event *te,
1035 struct timeval t, void *private_data)
1037 struct ctdb_local_message *m = talloc_get_type(private_data,
1038 struct ctdb_local_message);
1041 res = ctdb_dispatch_message(m->ctdb, m->srvid, m->data);
1043 DEBUG(DEBUG_ERR, (__location__ " Failed to dispatch message for srvid=%llu\n",
1044 (unsigned long long)m->srvid));
/*
  Send a message to this node itself. The payload is duplicated into a
  ctdb_local_message and delivery is deferred to a zero-timeout timed
  event (ctdb_local_message_trigger), which avoids recursing into the
  dispatcher from the sender's stack.

  NOTE(review): the assignments that fill m (ctdb, srvid, data) before the
  talloc_memdup(), the failure handling and the return statements are
  missing from this listing. The result of event_add_timed() is not
  examined on the visible line.
 */
1049 static int ctdb_local_message(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data)
1051 struct ctdb_local_message *m;
1052 m = talloc(ctdb, struct ctdb_local_message);
1053 CTDB_NO_MEMORY(ctdb, m);
/* take a private copy of the payload, owned by m */
1058 m->data.dptr = talloc_memdup(m, m->data.dptr, m->data.dsize);
1059 if (m->data.dptr == NULL) {
1064 /* this needs to be done as an event to prevent recursion */
1065 event_add_timed(ctdb->ev, m, timeval_zero(), ctdb_local_message_trigger, m);
/*
  Send a message with the given srvid and payload to node pnn. Nothing is
  sent while the transport is down (ctdb->methods == NULL). A message to
  this node is delivered through ctdb_local_message(); anything else is
  packed into a CTDB_REQ_MESSAGE transport packet and queued with
  ctdb_queue_packet().

  NOTE(review): the declaration of len, the assignment of the srvid field,
  the return statements and any release of r after queueing are missing
  from this listing.
 */
1072 int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t pnn,
1073 uint64_t srvid, TDB_DATA data)
1075 struct ctdb_req_message *r;
1078 if (ctdb->methods == NULL) {
1079 DEBUG(DEBUG_ERR,(__location__ " Failed to send message. Transport is DOWN\n"));
1083 /* see if this is a message to ourselves */
1084 if (pnn == ctdb->pnn) {
1085 return ctdb_local_message(ctdb, srvid, data);
1088 len = offsetof(struct ctdb_req_message, data) + data.dsize;
1089 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_MESSAGE, len,
1090 struct ctdb_req_message);
1091 CTDB_NO_MEMORY(ctdb, r);
1093 r->hdr.destnode = pnn;
1095 r->datalen = data.dsize;
1096 memcpy(&r->data[0], data.dptr, data.dsize);
1098 ctdb_queue_packet(ctdb, &r->hdr);
1106 struct ctdb_client_notify_list {
1107 struct ctdb_client_notify_list *next, *prev;
1108 struct ctdb_context *ctdb;
1114 static int ctdb_client_notify_destructor(struct ctdb_client_notify_list *nl)
1118 DEBUG(DEBUG_ERR,("Sending client notify message for srvid:%llu\n", (unsigned long long)nl->srvid));
1120 ret = ctdb_daemon_send_message(nl->ctdb, CTDB_BROADCAST_CONNECTED, (unsigned long long)nl->srvid, nl->data);
1122 DEBUG(DEBUG_ERR,("Failed to send client notify message\n"));
1128 int32_t ctdb_control_register_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1130 struct ctdb_client_notify_register *notify = (struct ctdb_client_notify_register *)indata.dptr;
1131 struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
1132 struct ctdb_client_notify_list *nl;
1134 DEBUG(DEBUG_ERR,("Register srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1136 if (indata.dsize < offsetof(struct ctdb_client_notify_register, notify_data)) {
1137 DEBUG(DEBUG_ERR,(__location__ " Too little data in control : %d\n", (int)indata.dsize));
1141 if (indata.dsize != (notify->len + offsetof(struct ctdb_client_notify_register, notify_data))) {
1142 DEBUG(DEBUG_ERR,(__location__ " Wrong amount of data in control. Got %d, expected %d\n", (int)indata.dsize, (int)(notify->len + offsetof(struct ctdb_client_notify_register, notify_data))));
1147 if (client == NULL) {
1148 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1152 for(nl=client->notify; nl; nl=nl->next) {
1153 if (nl->srvid == notify->srvid) {
1158 DEBUG(DEBUG_ERR,(__location__ " Notification for srvid:%llu already exists for this client\n", (unsigned long long)notify->srvid));
1162 nl = talloc(client, struct ctdb_client_notify_list);
1163 CTDB_NO_MEMORY(ctdb, nl);
1165 nl->srvid = notify->srvid;
1166 nl->data.dsize = notify->len;
1167 nl->data.dptr = talloc_size(nl, nl->data.dsize);
1168 CTDB_NO_MEMORY(ctdb, nl->data.dptr);
1169 memcpy(nl->data.dptr, notify->notify_data, nl->data.dsize);
1171 DLIST_ADD(client->notify, nl);
1172 talloc_set_destructor(nl, ctdb_client_notify_destructor);
1177 int32_t ctdb_control_deregister_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1179 struct ctdb_client_notify_deregister *notify = (struct ctdb_client_notify_deregister *)indata.dptr;
1180 struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
1181 struct ctdb_client_notify_list *nl;
1183 DEBUG(DEBUG_ERR,("Deregister srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1185 if (client == NULL) {
1186 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1190 for(nl=client->notify; nl; nl=nl->next) {
1191 if (nl->srvid == notify->srvid) {
1196 DEBUG(DEBUG_ERR,(__location__ " No notification for srvid:%llu found for this client\n", (unsigned long long)notify->srvid));
1200 DLIST_REMOVE(client->notify, nl);
1201 talloc_set_destructor(nl, NULL);
1207 struct ctdb_client *ctdb_find_client_by_pid(struct ctdb_context *ctdb, pid_t pid)
1209 struct ctdb_client_pid_list *client_pid;
1211 for (client_pid = ctdb->client_pids; client_pid; client_pid=client_pid->next) {
1212 if (client_pid->pid == pid) {
1213 return client_pid->client;
1220 /* This control is used by samba when probing if a process (of a samba daemon)
1222 Samba does this when it needs/wants to check if a subrecord in one of the
1223 databases is still valied, or if it is stale and can be removed.
1224 If the node is in unhealthy or stopped state we just kill of the samba
1225 process holding htis sub-record and return to the calling samba that
1226 the process does not exist.
1227 This allows us to forcefully recall subrecords registered by samba processes
1228 on banned and stopped nodes.
1230 int32_t ctdb_control_process_exists(struct ctdb_context *ctdb, pid_t pid)
1232 struct ctdb_client *client;
1234 if (ctdb->nodes[ctdb->pnn]->flags & (NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1235 client = ctdb_find_client_by_pid(ctdb, pid);
1236 if (client != NULL) {
1237 DEBUG(DEBUG_NOTICE,(__location__ " Killing client with pid:%d on banned/stopped node\n", (int)pid));
1238 talloc_free(client);
1243 return kill(pid, 0);