4 Copyright (C) Andrew Tridgell 2006
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
22 #include "lib/tdb/include/tdb.h"
23 #include "lib/events/events.h"
24 #include "lib/util/dlinklist.h"
25 #include "system/network.h"
26 #include "system/filesys.h"
27 #include "system/wait.h"
28 #include "../include/ctdb.h"
29 #include "../include/ctdb_private.h"
30 #include <sys/socket.h>
/*
  One entry per connected local client, linked into ctdb->client_pids so a
  client can be looked up by the process id of its peer.
*/
struct ctdb_client_pid_list {
	struct ctdb_client_pid_list *next, *prev;	/* DLIST linkage */
	struct ctdb_context *ctdb;			/* owning daemon context */
	pid_t pid;					/* pid of the connected peer */
	struct ctdb_client *client;			/* the client this entry describes */
};
39 static void daemon_incoming_packet(void *, struct ctdb_req_header *);
41 static void print_exit_message(void)
43 DEBUG(DEBUG_NOTICE,("CTDB daemon shutting down\n"));
46 /* called when the "startup" event script has finished */
47 static void ctdb_start_transport(struct ctdb_context *ctdb)
49 if (ctdb->methods == NULL) {
50 DEBUG(DEBUG_ALERT,(__location__ " startup event finished but transport is DOWN.\n"));
51 ctdb_fatal(ctdb, "transport is not initialized but startup completed");
54 /* start the transport running */
55 if (ctdb->methods->start(ctdb) != 0) {
56 DEBUG(DEBUG_ALERT,("transport failed to start!\n"));
57 ctdb_fatal(ctdb, "transport failed to start");
60 /* start the recovery daemon process */
61 if (ctdb_start_recoverd(ctdb) != 0) {
62 DEBUG(DEBUG_ALERT,("Failed to start recovery daemon\n"));
66 /* Make sure we log something when the daemon terminates */
67 atexit(print_exit_message);
69 /* start monitoring for connected/disconnected nodes */
70 ctdb_start_keepalive(ctdb);
72 /* start monitoring for node health */
73 ctdb_start_monitoring(ctdb);
75 /* start periodic update of tcp tickle lists */
76 ctdb_start_tcp_tickle_update(ctdb);
78 /* start listening for recovery daemon pings */
79 ctdb_control_recd_ping(ctdb);
82 static void block_signal(int signum)
86 memset(&act, 0, sizeof(act));
88 act.sa_handler = SIG_IGN;
89 sigemptyset(&act.sa_mask);
90 sigaddset(&act.sa_mask, signum);
91 sigaction(signum, &act, NULL);
96 send a packet to a client
98 static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header *hdr)
100 client->ctdb->statistics.client_packets_sent++;
101 if (hdr->operation == CTDB_REQ_MESSAGE) {
102 if (ctdb_queue_length(client->queue) > client->ctdb->tunable.max_queue_depth_drop_msg) {
103 DEBUG(DEBUG_ERR,("Drop CTDB_REQ_MESSAGE to client. Queue full.\n"));
107 return ctdb_queue_send(client->queue, (uint8_t *)hdr, hdr->length);
111 message handler for when we are in daemon mode. This redirects the message
114 static void daemon_message_handler(struct ctdb_context *ctdb, uint64_t srvid,
115 TDB_DATA data, void *private_data)
117 struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client);
118 struct ctdb_req_message *r;
121 /* construct a message to send to the client containing the data */
122 len = offsetof(struct ctdb_req_message, data) + data.dsize;
123 r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE,
124 len, struct ctdb_req_message);
125 CTDB_NO_MEMORY_VOID(ctdb, r);
127 talloc_set_name_const(r, "req_message packet");
130 r->datalen = data.dsize;
131 memcpy(&r->data[0], data.dptr, data.dsize);
133 daemon_queue_send(client, &r->hdr);
139 this is called when the ctdb daemon received a ctdb request to
140 set the srvid from the client
142 int daemon_register_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
144 struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
146 if (client == NULL) {
147 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_register_message_handler\n"));
150 res = ctdb_register_message_handler(ctdb, client, srvid, daemon_message_handler, client);
152 DEBUG(DEBUG_ERR,(__location__ " Failed to register handler %llu in daemon\n",
153 (unsigned long long)srvid));
155 DEBUG(DEBUG_INFO,(__location__ " Registered message handler for srvid=%llu\n",
156 (unsigned long long)srvid));
163 this is called when the ctdb daemon received a ctdb request to
164 remove a srvid from the client
166 int daemon_deregister_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
168 struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
169 if (client == NULL) {
170 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_deregister_message_handler\n"));
173 return ctdb_deregister_message_handler(ctdb, srvid, client);
178 destroy a ctdb_client
180 static int ctdb_client_destructor(struct ctdb_client *client)
182 struct ctdb_db_context *ctdb_db;
184 ctdb_takeover_client_destructor_hook(client);
185 ctdb_reqid_remove(client->ctdb, client->client_id);
186 if (client->ctdb->statistics.num_clients) {
187 client->ctdb->statistics.num_clients--;
190 if (client->num_persistent_updates != 0) {
191 DEBUG(DEBUG_ERR,(__location__ " Client disconnecting with %u persistent updates in flight. Starting recovery\n", client->num_persistent_updates));
192 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
194 ctdb_db = find_ctdb_db(client->ctdb, client->db_id);
196 DEBUG(DEBUG_ERR, (__location__ " client exit while transaction "
197 "commit active. Forcing recovery.\n"));
198 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
199 ctdb_db->transaction_active = false;
207 this is called when the ctdb daemon received a ctdb request message
208 from a local client over the unix domain socket
210 static void daemon_request_message_from_client(struct ctdb_client *client,
211 struct ctdb_req_message *c)
216 /* maybe the message is for another client on this node */
217 if (ctdb_get_pnn(client->ctdb)==c->hdr.destnode) {
218 ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c);
222 /* its for a remote node */
223 data.dptr = &c->data[0];
224 data.dsize = c->datalen;
225 res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode,
228 DEBUG(DEBUG_ERR,(__location__ " Failed to send message to remote node %u\n",
/* per-call state kept by the daemon while a client's call is in flight */
struct daemon_call_state {
	struct ctdb_client *client;	/* client that issued the call */
	uint32_t reqid;			/* request id to echo in the reply */
	struct ctdb_call *call;		/* the call being executed */
	struct timeval start_time;	/* when the call was accepted, for latency stats */
};
242 complete a call from a client
244 static void daemon_call_from_client_callback(struct ctdb_call_state *state)
246 struct daemon_call_state *dstate = talloc_get_type(state->async.private_data,
247 struct daemon_call_state);
248 struct ctdb_reply_call *r;
251 struct ctdb_client *client = dstate->client;
252 struct ctdb_db_context *ctdb_db = state->ctdb_db;
254 talloc_steal(client, dstate);
255 talloc_steal(dstate, dstate->call);
257 res = ctdb_daemon_call_recv(state, dstate->call);
259 DEBUG(DEBUG_ERR, (__location__ " ctdbd_call_recv() returned error\n"));
260 if (client->ctdb->statistics.pending_calls > 0) {
261 client->ctdb->statistics.pending_calls--;
263 ctdb_latency(ctdb_db, "call_from_client_cb 1", &client->ctdb->statistics.max_call_latency, dstate->start_time);
267 length = offsetof(struct ctdb_reply_call, data) + dstate->call->reply_data.dsize;
268 r = ctdbd_allocate_pkt(client->ctdb, dstate, CTDB_REPLY_CALL,
269 length, struct ctdb_reply_call);
271 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate reply_call in ctdb daemon\n"));
272 if (client->ctdb->statistics.pending_calls > 0) {
273 client->ctdb->statistics.pending_calls--;
275 ctdb_latency(ctdb_db, "call_from_client_cb 2", &client->ctdb->statistics.max_call_latency, dstate->start_time);
278 r->hdr.reqid = dstate->reqid;
279 r->datalen = dstate->call->reply_data.dsize;
280 memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen);
282 res = daemon_queue_send(client, &r->hdr);
284 DEBUG(DEBUG_ERR, (__location__ " Failed to queue packet from daemon to client\n"));
286 ctdb_latency(ctdb_db, "call_from_client_cb 3", &client->ctdb->statistics.max_call_latency, dstate->start_time);
288 if (client->ctdb->statistics.pending_calls > 0) {
289 client->ctdb->statistics.pending_calls--;
/*
  Context handed to a requeued packet. It records the client by id rather
  than by pointer so a client that disconnected in the meantime can be
  detected when the packet is processed again.
*/
struct ctdb_daemon_packet_wrap {
	struct ctdb_context *ctdb;
	uint32_t client_id;
};
299 a wrapper to catch disconnected clients
301 static void daemon_incoming_packet_wrap(void *p, struct ctdb_req_header *hdr)
303 struct ctdb_client *client;
304 struct ctdb_daemon_packet_wrap *w = talloc_get_type(p,
305 struct ctdb_daemon_packet_wrap);
307 DEBUG(DEBUG_CRIT,(__location__ " Bad packet type '%s'\n", talloc_get_name(p)));
311 client = ctdb_reqid_find(w->ctdb, w->client_id, struct ctdb_client);
312 if (client == NULL) {
313 DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
321 daemon_incoming_packet(client, hdr);
326 this is called when the ctdb daemon received a ctdb request call
327 from a local client over the unix domain socket
329 static void daemon_request_call_from_client(struct ctdb_client *client,
330 struct ctdb_req_call *c)
332 struct ctdb_call_state *state;
333 struct ctdb_db_context *ctdb_db;
334 struct daemon_call_state *dstate;
335 struct ctdb_call *call;
336 struct ctdb_ltdb_header header;
339 struct ctdb_context *ctdb = client->ctdb;
340 struct ctdb_daemon_packet_wrap *w;
342 ctdb->statistics.total_calls++;
343 if (client->ctdb->statistics.pending_calls > 0) {
344 ctdb->statistics.pending_calls++;
347 ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
349 DEBUG(DEBUG_ERR, (__location__ " Unknown database in request. db_id==0x%08x",
351 if (client->ctdb->statistics.pending_calls > 0) {
352 ctdb->statistics.pending_calls--;
357 if (ctdb_db->unhealthy_reason) {
359 * this is just a warning, as the tdb should be empty anyway,
360 * and only persistent databases can be unhealthy, which doesn't
361 * use this code patch
363 DEBUG(DEBUG_WARNING,("warn: db(%s) unhealty in daemon_request_call_from_client(): %s\n",
364 ctdb_db->db_name, ctdb_db->unhealthy_reason));
368 key.dsize = c->keylen;
370 w = talloc(ctdb, struct ctdb_daemon_packet_wrap);
371 CTDB_NO_MEMORY_VOID(ctdb, w);
374 w->client_id = client->client_id;
376 ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header,
377 (struct ctdb_req_header *)c, &data,
378 daemon_incoming_packet_wrap, w, True);
380 /* will retry later */
381 if (client->ctdb->statistics.pending_calls > 0) {
382 ctdb->statistics.pending_calls--;
390 DEBUG(DEBUG_ERR,(__location__ " Unable to fetch record\n"));
391 if (client->ctdb->statistics.pending_calls > 0) {
392 ctdb->statistics.pending_calls--;
397 dstate = talloc(client, struct daemon_call_state);
398 if (dstate == NULL) {
399 ctdb_ltdb_unlock(ctdb_db, key);
400 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate dstate\n"));
401 if (client->ctdb->statistics.pending_calls > 0) {
402 ctdb->statistics.pending_calls--;
406 dstate->start_time = timeval_current();
407 dstate->client = client;
408 dstate->reqid = c->hdr.reqid;
409 talloc_steal(dstate, data.dptr);
411 call = dstate->call = talloc_zero(dstate, struct ctdb_call);
413 ctdb_ltdb_unlock(ctdb_db, key);
414 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate call\n"));
415 if (client->ctdb->statistics.pending_calls > 0) {
416 ctdb->statistics.pending_calls--;
418 ctdb_latency(ctdb_db, "call_from_client 1", &ctdb->statistics.max_call_latency, dstate->start_time);
422 call->call_id = c->callid;
424 call->call_data.dptr = c->data + c->keylen;
425 call->call_data.dsize = c->calldatalen;
426 call->flags = c->flags;
428 if (header.dmaster == ctdb->pnn) {
429 state = ctdb_call_local_send(ctdb_db, call, &header, &data);
431 state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
434 ctdb_ltdb_unlock(ctdb_db, key);
437 DEBUG(DEBUG_ERR,(__location__ " Unable to setup call send\n"));
438 if (client->ctdb->statistics.pending_calls > 0) {
439 ctdb->statistics.pending_calls--;
441 ctdb_latency(ctdb_db, "call_from_client 2", &ctdb->statistics.max_call_latency, dstate->start_time);
444 talloc_steal(state, dstate);
445 talloc_steal(client, state);
447 state->async.fn = daemon_call_from_client_callback;
448 state->async.private_data = dstate;
452 static void daemon_request_control_from_client(struct ctdb_client *client,
453 struct ctdb_req_control *c);
455 /* data contains a packet from the client */
456 static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr)
458 struct ctdb_client *client = talloc_get_type(p, struct ctdb_client);
460 struct ctdb_context *ctdb = client->ctdb;
462 /* place the packet as a child of a tmp_ctx. We then use
463 talloc_free() below to free it. If any of the calls want
464 to keep it, then they will steal it somewhere else, and the
465 talloc_free() will be a no-op */
466 tmp_ctx = talloc_new(client);
467 talloc_steal(tmp_ctx, hdr);
469 if (hdr->ctdb_magic != CTDB_MAGIC) {
470 ctdb_set_error(client->ctdb, "Non CTDB packet rejected in daemon\n");
474 if (hdr->ctdb_version != CTDB_VERSION) {
475 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
479 switch (hdr->operation) {
481 ctdb->statistics.client.req_call++;
482 daemon_request_call_from_client(client, (struct ctdb_req_call *)hdr);
485 case CTDB_REQ_MESSAGE:
486 ctdb->statistics.client.req_message++;
487 daemon_request_message_from_client(client, (struct ctdb_req_message *)hdr);
490 case CTDB_REQ_CONTROL:
491 ctdb->statistics.client.req_control++;
492 daemon_request_control_from_client(client, (struct ctdb_req_control *)hdr);
496 DEBUG(DEBUG_CRIT,(__location__ " daemon: unrecognized operation %u\n",
501 talloc_free(tmp_ctx);
505 called when the daemon gets a incoming packet
507 static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
509 struct ctdb_client *client = talloc_get_type(args, struct ctdb_client);
510 struct ctdb_req_header *hdr;
517 client->ctdb->statistics.client_packets_recv++;
519 if (cnt < sizeof(*hdr)) {
520 ctdb_set_error(client->ctdb, "Bad packet length %u in daemon\n",
524 hdr = (struct ctdb_req_header *)data;
525 if (cnt != hdr->length) {
526 ctdb_set_error(client->ctdb, "Bad header length %u expected %u\n in daemon",
527 (unsigned)hdr->length, (unsigned)cnt);
531 if (hdr->ctdb_magic != CTDB_MAGIC) {
532 ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
536 if (hdr->ctdb_version != CTDB_VERSION) {
537 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
541 DEBUG(DEBUG_DEBUG,(__location__ " client request %u of type %u length %u from "
542 "node %u to %u\n", hdr->reqid, hdr->operation, hdr->length,
543 hdr->srcnode, hdr->destnode));
545 /* it is the responsibility of the incoming packet function to free 'data' */
546 daemon_incoming_packet(client, hdr);
550 static int ctdb_clientpid_destructor(struct ctdb_client_pid_list *client_pid)
552 if (client_pid->ctdb->client_pids != NULL) {
553 DLIST_REMOVE(client_pid->ctdb->client_pids, client_pid);
560 static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde,
561 uint16_t flags, void *private_data)
563 struct sockaddr_un addr;
566 struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
567 struct ctdb_client *client;
568 struct ctdb_client_pid_list *client_pid;
570 struct peercred_struct cr;
571 socklen_t crl = sizeof(struct peercred_struct);
574 socklen_t crl = sizeof(struct ucred);
577 memset(&addr, 0, sizeof(addr));
579 fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len);
585 set_close_on_exec(fd);
587 DEBUG(DEBUG_DEBUG,(__location__ " Created SOCKET FD:%d to connected child\n", fd));
589 client = talloc_zero(ctdb, struct ctdb_client);
591 if (getsockopt(fd, SOL_SOCKET, SO_PEERID, &cr, &crl) == 0) {
593 if (getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &cr, &crl) == 0) {
595 DEBUG(DEBUG_INFO,("Connected client with pid:%u\n", (unsigned)cr.pid));
600 client->client_id = ctdb_reqid_new(ctdb, client);
601 client->pid = cr.pid;
603 client_pid = talloc(client, struct ctdb_client_pid_list);
604 if (client_pid == NULL) {
605 DEBUG(DEBUG_ERR,("Failed to allocate client pid structure\n"));
610 client_pid->ctdb = ctdb;
611 client_pid->pid = cr.pid;
612 client_pid->client = client;
614 DLIST_ADD(ctdb->client_pids, client_pid);
616 client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT,
617 ctdb_daemon_read_cb, client);
619 talloc_set_destructor(client, ctdb_client_destructor);
620 talloc_set_destructor(client_pid, ctdb_clientpid_destructor);
621 ctdb->statistics.num_clients++;
627 create a unix domain socket and bind it
628 return a file descriptor open on the socket
630 static int ux_socket_bind(struct ctdb_context *ctdb)
632 struct sockaddr_un addr;
634 ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
635 if (ctdb->daemon.sd == -1) {
639 set_close_on_exec(ctdb->daemon.sd);
640 set_nonblocking(ctdb->daemon.sd);
642 memset(&addr, 0, sizeof(addr));
643 addr.sun_family = AF_UNIX;
644 strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
646 if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
647 DEBUG(DEBUG_CRIT,("Unable to bind on ctdb socket '%s'\n", ctdb->daemon.name));
651 if (chown(ctdb->daemon.name, geteuid(), getegid()) != 0 ||
652 chmod(ctdb->daemon.name, 0700) != 0) {
653 DEBUG(DEBUG_CRIT,("Unable to secure ctdb socket '%s', ctdb->daemon.name\n", ctdb->daemon.name));
658 if (listen(ctdb->daemon.sd, 100) != 0) {
659 DEBUG(DEBUG_CRIT,("Unable to listen on ctdb socket '%s'\n", ctdb->daemon.name));
666 close(ctdb->daemon.sd);
667 ctdb->daemon.sd = -1;
671 static void sig_child_handler(struct event_context *ev,
672 struct signal_event *se, int signum, int count,
676 // struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
681 pid = waitpid(-1, &status, WNOHANG);
683 DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%d\n", errno));
687 DEBUG(DEBUG_DEBUG, ("SIGCHLD from %d\n", (int)pid));
693 start the protocol going as a daemon
695 int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork, bool use_syslog)
698 struct fd_event *fde;
699 const char *domain_socket_name;
700 struct signal_event *se;
702 /* get rid of any old sockets */
703 unlink(ctdb->daemon.name);
705 /* create a unix domain stream socket to listen to */
706 res = ux_socket_bind(ctdb);
708 DEBUG(DEBUG_ALERT,(__location__ " Failed to open CTDB unix domain socket\n"));
712 if (do_fork && fork()) {
716 tdb_reopen_all(False);
721 if (open("/dev/null", O_RDONLY) != 0) {
722 DEBUG(DEBUG_ALERT,(__location__ " Failed to setup stdin on /dev/null\n"));
726 block_signal(SIGPIPE);
728 ctdbd_pid = getpid();
731 DEBUG(DEBUG_ERR, ("Starting CTDBD as pid : %u\n", ctdbd_pid));
733 ctdb_high_priority(ctdb);
735 /* ensure the socket is deleted on exit of the daemon */
736 domain_socket_name = talloc_strdup(talloc_autofree_context(), ctdb->daemon.name);
737 if (domain_socket_name == NULL) {
738 DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup failed.\n"));
742 ctdb->ev = event_context_init(NULL);
744 ctdb_set_child_logging(ctdb);
746 /* force initial recovery for election */
747 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
749 if (strcmp(ctdb->transport, "tcp") == 0) {
750 int ctdb_tcp_init(struct ctdb_context *);
751 ret = ctdb_tcp_init(ctdb);
753 #ifdef USE_INFINIBAND
754 if (strcmp(ctdb->transport, "ib") == 0) {
755 int ctdb_ibw_init(struct ctdb_context *);
756 ret = ctdb_ibw_init(ctdb);
760 DEBUG(DEBUG_ERR,("Failed to initialise transport '%s'\n", ctdb->transport));
764 if (ctdb->methods == NULL) {
765 DEBUG(DEBUG_ALERT,(__location__ " Can not initialize transport. ctdb->methods is NULL\n"));
766 ctdb_fatal(ctdb, "transport is unavailable. can not initialize.");
769 /* initialise the transport */
770 if (ctdb->methods->initialise(ctdb) != 0) {
771 ctdb_fatal(ctdb, "transport failed to initialise");
774 /* attach to existing databases */
775 if (ctdb_attach_databases(ctdb) != 0) {
776 ctdb_fatal(ctdb, "Failed to attach to databases\n");
779 /* start frozen, then let the first election sort things out */
780 if (ctdb_blocking_freeze(ctdb)) {
781 ctdb_fatal(ctdb, "Failed to get initial freeze\n");
784 ret = ctdb_event_script(ctdb, CTDB_EVENT_INIT);
786 ctdb_fatal(ctdb, "Failed to run init event\n");
788 ctdb_run_notification_script(ctdb, "init");
790 /* now start accepting clients, only can do this once frozen */
791 fde = event_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd,
792 EVENT_FD_READ|EVENT_FD_AUTOCLOSE,
793 ctdb_accept_client, ctdb);
795 /* tell all other nodes we've just started up */
796 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL,
797 0, CTDB_CONTROL_STARTUP, 0,
798 CTDB_CTRL_FLAG_NOREPLY,
799 tdb_null, NULL, NULL);
801 /* release any IPs we hold from previous runs of the daemon */
802 ctdb_release_all_ips(ctdb);
804 /* start the transport going */
805 ctdb_start_transport(ctdb);
807 /* set up a handler to pick up sigchld */
808 se = event_add_signal(ctdb->ev, ctdb,
813 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD\n"));
818 if (start_syslog_daemon(ctdb)) {
819 DEBUG(DEBUG_CRIT, ("Failed to start syslog daemon\n"));
824 ctdb_lockdown_memory(ctdb);
826 /* go into a wait loop to allow other nodes to complete */
827 event_loop_wait(ctdb->ev);
829 DEBUG(DEBUG_CRIT,("event_loop_wait() returned. this should not happen\n"));
834 allocate a packet for use in daemon<->daemon communication
836 struct ctdb_req_header *_ctdb_transport_allocate(struct ctdb_context *ctdb,
838 enum ctdb_operation operation,
839 size_t length, size_t slength,
843 struct ctdb_req_header *hdr;
845 length = MAX(length, slength);
846 size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
848 if (ctdb->methods == NULL) {
849 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate transport packet for operation %u of length %u. Transport is DOWN.\n",
850 operation, (unsigned)length));
854 hdr = (struct ctdb_req_header *)ctdb->methods->allocate_pkt(mem_ctx, size);
856 DEBUG(DEBUG_ERR,("Unable to allocate transport packet for operation %u of length %u\n",
857 operation, (unsigned)length));
860 talloc_set_name_const(hdr, type);
861 memset(hdr, 0, slength);
862 hdr->length = length;
863 hdr->operation = operation;
864 hdr->ctdb_magic = CTDB_MAGIC;
865 hdr->ctdb_version = CTDB_VERSION;
866 hdr->generation = ctdb->vnn_map->generation;
867 hdr->srcnode = ctdb->pnn;
/* state kept while a control issued by a local client awaits its reply */
struct daemon_control_state {
	struct daemon_control_state *next, *prev;	/* linkage in node->pending_controls */
	struct ctdb_client *client;			/* client that sent the control */
	struct ctdb_req_control *c;			/* the request packet, owned by this state */
	uint32_t reqid;					/* request id to echo in the reply */
	struct ctdb_node *node;				/* destination node, or NULL if the pnn is not valid */
};
881 callback when a control reply comes in
883 static void daemon_control_callback(struct ctdb_context *ctdb,
884 int32_t status, TDB_DATA data,
885 const char *errormsg,
888 struct daemon_control_state *state = talloc_get_type(private_data,
889 struct daemon_control_state);
890 struct ctdb_client *client = state->client;
891 struct ctdb_reply_control *r;
894 /* construct a message to send to the client containing the data */
895 len = offsetof(struct ctdb_reply_control, data) + data.dsize;
897 len += strlen(errormsg);
899 r = ctdbd_allocate_pkt(ctdb, state, CTDB_REPLY_CONTROL, len,
900 struct ctdb_reply_control);
901 CTDB_NO_MEMORY_VOID(ctdb, r);
903 r->hdr.reqid = state->reqid;
905 r->datalen = data.dsize;
907 memcpy(&r->data[0], data.dptr, data.dsize);
909 r->errorlen = strlen(errormsg);
910 memcpy(&r->data[r->datalen], errormsg, r->errorlen);
913 daemon_queue_send(client, &r->hdr);
919 fail all pending controls to a disconnected node
921 void ctdb_daemon_cancel_controls(struct ctdb_context *ctdb, struct ctdb_node *node)
923 struct daemon_control_state *state;
924 while ((state = node->pending_controls)) {
925 DLIST_REMOVE(node->pending_controls, state);
926 daemon_control_callback(ctdb, (uint32_t)-1, tdb_null,
927 "node is disconnected", state);
932 destroy a daemon_control_state
934 static int daemon_control_destructor(struct daemon_control_state *state)
937 DLIST_REMOVE(state->node->pending_controls, state);
943 this is called when the ctdb daemon received a ctdb request control
944 from a local client over the unix domain socket
946 static void daemon_request_control_from_client(struct ctdb_client *client,
947 struct ctdb_req_control *c)
951 struct daemon_control_state *state;
952 TALLOC_CTX *tmp_ctx = talloc_new(client);
954 if (c->hdr.destnode == CTDB_CURRENT_NODE) {
955 c->hdr.destnode = client->ctdb->pnn;
958 state = talloc(client, struct daemon_control_state);
959 CTDB_NO_MEMORY_VOID(client->ctdb, state);
961 state->client = client;
962 state->c = talloc_steal(state, c);
963 state->reqid = c->hdr.reqid;
964 if (ctdb_validate_pnn(client->ctdb, c->hdr.destnode)) {
965 state->node = client->ctdb->nodes[c->hdr.destnode];
966 DLIST_ADD(state->node->pending_controls, state);
971 talloc_set_destructor(state, daemon_control_destructor);
973 if (c->flags & CTDB_CTRL_FLAG_NOREPLY) {
974 talloc_steal(tmp_ctx, state);
977 data.dptr = &c->data[0];
978 data.dsize = c->datalen;
979 res = ctdb_daemon_send_control(client->ctdb, c->hdr.destnode,
980 c->srvid, c->opcode, client->client_id,
982 data, daemon_control_callback,
985 DEBUG(DEBUG_ERR,(__location__ " Failed to send control to remote node %u\n",
989 talloc_free(tmp_ctx);
993 register a call function
995 int ctdb_daemon_set_call(struct ctdb_context *ctdb, uint32_t db_id,
996 ctdb_fn_t fn, int id)
998 struct ctdb_registered_call *call;
999 struct ctdb_db_context *ctdb_db;
1001 ctdb_db = find_ctdb_db(ctdb, db_id);
1002 if (ctdb_db == NULL) {
1006 call = talloc(ctdb_db, struct ctdb_registered_call);
1010 DLIST_ADD(ctdb_db->calls, call);
1017 this local messaging handler is ugly, but is needed to prevent
1018 recursion in ctdb_send_message() when the destination node is the
1019 same as the source node
1021 struct ctdb_local_message {
1022 struct ctdb_context *ctdb;
1027 static void ctdb_local_message_trigger(struct event_context *ev, struct timed_event *te,
1028 struct timeval t, void *private_data)
1030 struct ctdb_local_message *m = talloc_get_type(private_data,
1031 struct ctdb_local_message);
1034 res = ctdb_dispatch_message(m->ctdb, m->srvid, m->data);
1036 DEBUG(DEBUG_ERR, (__location__ " Failed to dispatch message for srvid=%llu\n",
1037 (unsigned long long)m->srvid));
1042 static int ctdb_local_message(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data)
1044 struct ctdb_local_message *m;
1045 m = talloc(ctdb, struct ctdb_local_message);
1046 CTDB_NO_MEMORY(ctdb, m);
1051 m->data.dptr = talloc_memdup(m, m->data.dptr, m->data.dsize);
1052 if (m->data.dptr == NULL) {
1057 /* this needs to be done as an event to prevent recursion */
1058 event_add_timed(ctdb->ev, m, timeval_zero(), ctdb_local_message_trigger, m);
1065 int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t pnn,
1066 uint64_t srvid, TDB_DATA data)
1068 struct ctdb_req_message *r;
1071 if (ctdb->methods == NULL) {
1072 DEBUG(DEBUG_ERR,(__location__ " Failed to send message. Transport is DOWN\n"));
1076 /* see if this is a message to ourselves */
1077 if (pnn == ctdb->pnn) {
1078 return ctdb_local_message(ctdb, srvid, data);
1081 len = offsetof(struct ctdb_req_message, data) + data.dsize;
1082 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_MESSAGE, len,
1083 struct ctdb_req_message);
1084 CTDB_NO_MEMORY(ctdb, r);
1086 r->hdr.destnode = pnn;
1088 r->datalen = data.dsize;
1089 memcpy(&r->data[0], data.dptr, data.dsize);
1091 ctdb_queue_packet(ctdb, &r->hdr);
1099 struct ctdb_client_notify_list {
1100 struct ctdb_client_notify_list *next, *prev;
1101 struct ctdb_context *ctdb;
1107 static int ctdb_client_notify_destructor(struct ctdb_client_notify_list *nl)
1111 DEBUG(DEBUG_ERR,("Sending client notify message for srvid:%llu\n", (unsigned long long)nl->srvid));
1113 ret = ctdb_daemon_send_message(nl->ctdb, CTDB_BROADCAST_CONNECTED, (unsigned long long)nl->srvid, nl->data);
1115 DEBUG(DEBUG_ERR,("Failed to send client notify message\n"));
1121 int32_t ctdb_control_register_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1123 struct ctdb_client_notify_register *notify = (struct ctdb_client_notify_register *)indata.dptr;
1124 struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
1125 struct ctdb_client_notify_list *nl;
1127 DEBUG(DEBUG_ERR,("Register srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1129 if (indata.dsize < offsetof(struct ctdb_client_notify_register, notify_data)) {
1130 DEBUG(DEBUG_ERR,(__location__ " Too little data in control : %d\n", (int)indata.dsize));
1134 if (indata.dsize != (notify->len + offsetof(struct ctdb_client_notify_register, notify_data))) {
1135 DEBUG(DEBUG_ERR,(__location__ " Wrong amount of data in control. Got %d, expected %d\n", (int)indata.dsize, (int)(notify->len + offsetof(struct ctdb_client_notify_register, notify_data))));
1140 if (client == NULL) {
1141 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1145 for(nl=client->notify; nl; nl=nl->next) {
1146 if (nl->srvid == notify->srvid) {
1151 DEBUG(DEBUG_ERR,(__location__ " Notification for srvid:%llu already exists for this client\n", (unsigned long long)notify->srvid));
1155 nl = talloc(client, struct ctdb_client_notify_list);
1156 CTDB_NO_MEMORY(ctdb, nl);
1158 nl->srvid = notify->srvid;
1159 nl->data.dsize = notify->len;
1160 nl->data.dptr = talloc_size(nl, nl->data.dsize);
1161 CTDB_NO_MEMORY(ctdb, nl->data.dptr);
1162 memcpy(nl->data.dptr, notify->notify_data, nl->data.dsize);
1164 DLIST_ADD(client->notify, nl);
1165 talloc_set_destructor(nl, ctdb_client_notify_destructor);
1170 int32_t ctdb_control_deregister_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1172 struct ctdb_client_notify_deregister *notify = (struct ctdb_client_notify_deregister *)indata.dptr;
1173 struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
1174 struct ctdb_client_notify_list *nl;
1176 DEBUG(DEBUG_ERR,("Deregister srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1178 if (client == NULL) {
1179 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1183 for(nl=client->notify; nl; nl=nl->next) {
1184 if (nl->srvid == notify->srvid) {
1189 DEBUG(DEBUG_ERR,(__location__ " No notification for srvid:%llu found for this client\n", (unsigned long long)notify->srvid));
1193 DLIST_REMOVE(client->notify, nl);
1194 talloc_set_destructor(nl, NULL);
1200 struct ctdb_client *ctdb_find_client_by_pid(struct ctdb_context *ctdb, pid_t pid)
1202 struct ctdb_client_pid_list *client_pid;
1204 for (client_pid = ctdb->client_pids; client_pid; client_pid=client_pid->next) {
1205 if (client_pid->pid == pid) {
1206 return client_pid->client;
1213 /* This control is used by samba when probing if a process (of a samba daemon)
1215 Samba does this when it needs/wants to check if a subrecord in one of the
1216 databases is still valied, or if it is stale and can be removed.
1217 If the node is in unhealthy or stopped state we just kill of the samba
1218 process holding htis sub-record and return to the calling samba that
1219 the process does not exist.
1220 This allows us to forcefully recall subrecords registered by samba processes
1221 on banned and stopped nodes.
1223 int32_t ctdb_control_process_exists(struct ctdb_context *ctdb, pid_t pid)
1225 struct ctdb_client *client;
1227 if (ctdb->nodes[ctdb->pnn]->flags & (NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1228 client = ctdb_find_client_by_pid(ctdb, pid);
1229 if (client != NULL) {
1230 DEBUG(DEBUG_NOTICE,(__location__ " Killing client with pid:%d on banned/stopped node\n", (int)pid));
1231 talloc_free(client);
1236 return kill(pid, 0);