4 Copyright (C) Andrew Tridgell 2006
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
22 #include "lib/tdb/include/tdb.h"
23 #include "lib/events/events.h"
24 #include "lib/util/dlinklist.h"
25 #include "system/network.h"
26 #include "system/filesys.h"
27 #include "system/wait.h"
28 #include "../include/ctdb.h"
29 #include "../include/ctdb_private.h"
30 #include <sys/socket.h>
32 struct ctdb_client_pid_list {
33 struct ctdb_client_pid_list *next, *prev;
34 struct ctdb_context *ctdb;
36 struct ctdb_client *client;
39 static void daemon_incoming_packet(void *, struct ctdb_req_header *);
41 static void print_exit_message(void)
43 DEBUG(DEBUG_NOTICE,("CTDB daemon shutting down\n"));
46 /* called when the "startup" event script has finished */
47 static void ctdb_start_transport(struct ctdb_context *ctdb)
49 if (ctdb->methods == NULL) {
50 DEBUG(DEBUG_ALERT,(__location__ " startup event finished but transport is DOWN.\n"));
51 ctdb_fatal(ctdb, "transport is not initialized but startup completed");
54 /* start the transport running */
55 if (ctdb->methods->start(ctdb) != 0) {
56 DEBUG(DEBUG_ALERT,("transport failed to start!\n"));
57 ctdb_fatal(ctdb, "transport failed to start");
60 /* start the recovery daemon process */
61 if (ctdb_start_recoverd(ctdb) != 0) {
62 DEBUG(DEBUG_ALERT,("Failed to start recovery daemon\n"));
66 /* Make sure we log something when the daemon terminates */
67 atexit(print_exit_message);
69 /* start monitoring for connected/disconnected nodes */
70 ctdb_start_keepalive(ctdb);
72 /* start monitoring for node health */
73 ctdb_start_monitoring(ctdb);
75 /* start periodic update of tcp tickle lists */
76 ctdb_start_tcp_tickle_update(ctdb);
78 /* start listening for recovery daemon pings */
79 ctdb_control_recd_ping(ctdb);
82 static void block_signal(int signum)
86 memset(&act, 0, sizeof(act));
88 act.sa_handler = SIG_IGN;
89 sigemptyset(&act.sa_mask);
90 sigaddset(&act.sa_mask, signum);
91 sigaction(signum, &act, NULL);
96 send a packet to a client
98 static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header *hdr)
100 client->ctdb->statistics.client_packets_sent++;
101 if (hdr->operation == CTDB_REQ_MESSAGE) {
102 if (ctdb_queue_length(client->queue) > client->ctdb->tunable.max_queue_depth_drop_msg) {
103 DEBUG(DEBUG_ERR,("Drop CTDB_REQ_MESSAGE to client. Queue full.\n"));
107 return ctdb_queue_send(client->queue, (uint8_t *)hdr, hdr->length);
111 message handler for when we are in daemon mode. This redirects the message
114 static void daemon_message_handler(struct ctdb_context *ctdb, uint64_t srvid,
115 TDB_DATA data, void *private_data)
117 struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client);
118 struct ctdb_req_message *r;
121 /* construct a message to send to the client containing the data */
122 len = offsetof(struct ctdb_req_message, data) + data.dsize;
123 r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE,
124 len, struct ctdb_req_message);
125 CTDB_NO_MEMORY_VOID(ctdb, r);
127 talloc_set_name_const(r, "req_message packet");
130 r->datalen = data.dsize;
131 memcpy(&r->data[0], data.dptr, data.dsize);
133 daemon_queue_send(client, &r->hdr);
139 this is called when the ctdb daemon received a ctdb request to
140 set the srvid from the client
142 int daemon_register_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
144 struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
146 if (client == NULL) {
147 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_register_message_handler\n"));
150 res = ctdb_register_message_handler(ctdb, client, srvid, daemon_message_handler, client);
152 DEBUG(DEBUG_ERR,(__location__ " Failed to register handler %llu in daemon\n",
153 (unsigned long long)srvid));
155 DEBUG(DEBUG_INFO,(__location__ " Registered message handler for srvid=%llu\n",
156 (unsigned long long)srvid));
163 this is called when the ctdb daemon received a ctdb request to
164 remove a srvid from the client
166 int daemon_deregister_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
168 struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
169 if (client == NULL) {
170 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_deregister_message_handler\n"));
173 return ctdb_deregister_message_handler(ctdb, srvid, client);
178 destroy a ctdb_client
180 static int ctdb_client_destructor(struct ctdb_client *client)
182 struct ctdb_db_context *ctdb_db;
184 ctdb_takeover_client_destructor_hook(client);
185 ctdb_reqid_remove(client->ctdb, client->client_id);
186 if (client->ctdb->statistics.num_clients) {
187 client->ctdb->statistics.num_clients--;
190 if (client->num_persistent_updates != 0) {
191 DEBUG(DEBUG_ERR,(__location__ " Client disconnecting with %u persistent updates in flight. Starting recovery\n", client->num_persistent_updates));
192 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
194 ctdb_db = find_ctdb_db(client->ctdb, client->db_id);
196 DEBUG(DEBUG_ERR, (__location__ " client exit while transaction "
197 "commit active. Forcing recovery.\n"));
198 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
199 ctdb_db->transaction_active = false;
207 this is called when the ctdb daemon received a ctdb request message
208 from a local client over the unix domain socket
210 static void daemon_request_message_from_client(struct ctdb_client *client,
211 struct ctdb_req_message *c)
216 /* maybe the message is for another client on this node */
217 if (ctdb_get_pnn(client->ctdb)==c->hdr.destnode) {
218 ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c);
222 /* its for a remote node */
223 data.dptr = &c->data[0];
224 data.dsize = c->datalen;
225 res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode,
228 DEBUG(DEBUG_ERR,(__location__ " Failed to send message to remote node %u\n",
234 struct daemon_call_state {
235 struct ctdb_client *client;
237 struct ctdb_call *call;
238 struct timeval start_time;
242 complete a call from a client
244 static void daemon_call_from_client_callback(struct ctdb_call_state *state)
246 struct daemon_call_state *dstate = talloc_get_type(state->async.private_data,
247 struct daemon_call_state);
248 struct ctdb_reply_call *r;
251 struct ctdb_client *client = dstate->client;
252 struct ctdb_db_context *ctdb_db = state->ctdb_db;
254 talloc_steal(client, dstate);
255 talloc_steal(dstate, dstate->call);
257 res = ctdb_daemon_call_recv(state, dstate->call);
259 DEBUG(DEBUG_ERR, (__location__ " ctdbd_call_recv() returned error\n"));
260 if (client->ctdb->statistics.pending_calls > 0) {
261 client->ctdb->statistics.pending_calls--;
263 ctdb_latency(ctdb_db, "call_from_client_cb 1", &client->ctdb->statistics.max_call_latency, dstate->start_time);
267 length = offsetof(struct ctdb_reply_call, data) + dstate->call->reply_data.dsize;
268 r = ctdbd_allocate_pkt(client->ctdb, dstate, CTDB_REPLY_CALL,
269 length, struct ctdb_reply_call);
271 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate reply_call in ctdb daemon\n"));
272 if (client->ctdb->statistics.pending_calls > 0) {
273 client->ctdb->statistics.pending_calls--;
275 ctdb_latency(ctdb_db, "call_from_client_cb 2", &client->ctdb->statistics.max_call_latency, dstate->start_time);
278 r->hdr.reqid = dstate->reqid;
279 r->datalen = dstate->call->reply_data.dsize;
280 memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen);
282 res = daemon_queue_send(client, &r->hdr);
284 DEBUG(DEBUG_ERR, (__location__ " Failed to queue packet from daemon to client\n"));
286 ctdb_latency(ctdb_db, "call_from_client_cb 3", &client->ctdb->statistics.max_call_latency, dstate->start_time);
288 if (client->ctdb->statistics.pending_calls > 0) {
289 client->ctdb->statistics.pending_calls--;
293 struct ctdb_daemon_packet_wrap {
294 struct ctdb_context *ctdb;
299 a wrapper to catch disconnected clients
301 static void daemon_incoming_packet_wrap(void *p, struct ctdb_req_header *hdr)
303 struct ctdb_client *client;
304 struct ctdb_daemon_packet_wrap *w = talloc_get_type(p,
305 struct ctdb_daemon_packet_wrap);
307 DEBUG(DEBUG_CRIT,(__location__ " Bad packet type '%s'\n", talloc_get_name(p)));
311 client = ctdb_reqid_find(w->ctdb, w->client_id, struct ctdb_client);
312 if (client == NULL) {
313 DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
321 daemon_incoming_packet(client, hdr);
326 this is called when the ctdb daemon received a ctdb request call
327 from a local client over the unix domain socket
329 static void daemon_request_call_from_client(struct ctdb_client *client,
330 struct ctdb_req_call *c)
332 struct ctdb_call_state *state;
333 struct ctdb_db_context *ctdb_db;
334 struct daemon_call_state *dstate;
335 struct ctdb_call *call;
336 struct ctdb_ltdb_header header;
339 struct ctdb_context *ctdb = client->ctdb;
340 struct ctdb_daemon_packet_wrap *w;
342 ctdb->statistics.total_calls++;
343 if (client->ctdb->statistics.pending_calls > 0) {
344 ctdb->statistics.pending_calls++;
347 ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
349 DEBUG(DEBUG_ERR, (__location__ " Unknown database in request. db_id==0x%08x",
351 if (client->ctdb->statistics.pending_calls > 0) {
352 ctdb->statistics.pending_calls--;
358 key.dsize = c->keylen;
360 w = talloc(ctdb, struct ctdb_daemon_packet_wrap);
361 CTDB_NO_MEMORY_VOID(ctdb, w);
364 w->client_id = client->client_id;
366 ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header,
367 (struct ctdb_req_header *)c, &data,
368 daemon_incoming_packet_wrap, w, True);
370 /* will retry later */
371 if (client->ctdb->statistics.pending_calls > 0) {
372 ctdb->statistics.pending_calls--;
380 DEBUG(DEBUG_ERR,(__location__ " Unable to fetch record\n"));
381 if (client->ctdb->statistics.pending_calls > 0) {
382 ctdb->statistics.pending_calls--;
387 dstate = talloc(client, struct daemon_call_state);
388 if (dstate == NULL) {
389 ctdb_ltdb_unlock(ctdb_db, key);
390 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate dstate\n"));
391 if (client->ctdb->statistics.pending_calls > 0) {
392 ctdb->statistics.pending_calls--;
396 dstate->start_time = timeval_current();
397 dstate->client = client;
398 dstate->reqid = c->hdr.reqid;
399 talloc_steal(dstate, data.dptr);
401 call = dstate->call = talloc_zero(dstate, struct ctdb_call);
403 ctdb_ltdb_unlock(ctdb_db, key);
404 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate call\n"));
405 if (client->ctdb->statistics.pending_calls > 0) {
406 ctdb->statistics.pending_calls--;
408 ctdb_latency(ctdb_db, "call_from_client 1", &ctdb->statistics.max_call_latency, dstate->start_time);
412 call->call_id = c->callid;
414 call->call_data.dptr = c->data + c->keylen;
415 call->call_data.dsize = c->calldatalen;
416 call->flags = c->flags;
418 if (header.dmaster == ctdb->pnn) {
419 state = ctdb_call_local_send(ctdb_db, call, &header, &data);
421 state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
424 ctdb_ltdb_unlock(ctdb_db, key);
427 DEBUG(DEBUG_ERR,(__location__ " Unable to setup call send\n"));
428 if (client->ctdb->statistics.pending_calls > 0) {
429 ctdb->statistics.pending_calls--;
431 ctdb_latency(ctdb_db, "call_from_client 2", &ctdb->statistics.max_call_latency, dstate->start_time);
434 talloc_steal(state, dstate);
435 talloc_steal(client, state);
437 state->async.fn = daemon_call_from_client_callback;
438 state->async.private_data = dstate;
442 static void daemon_request_control_from_client(struct ctdb_client *client,
443 struct ctdb_req_control *c);
445 /* data contains a packet from the client */
446 static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr)
448 struct ctdb_client *client = talloc_get_type(p, struct ctdb_client);
450 struct ctdb_context *ctdb = client->ctdb;
452 /* place the packet as a child of a tmp_ctx. We then use
453 talloc_free() below to free it. If any of the calls want
454 to keep it, then they will steal it somewhere else, and the
455 talloc_free() will be a no-op */
456 tmp_ctx = talloc_new(client);
457 talloc_steal(tmp_ctx, hdr);
459 if (hdr->ctdb_magic != CTDB_MAGIC) {
460 ctdb_set_error(client->ctdb, "Non CTDB packet rejected in daemon\n");
464 if (hdr->ctdb_version != CTDB_VERSION) {
465 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
469 switch (hdr->operation) {
471 ctdb->statistics.client.req_call++;
472 daemon_request_call_from_client(client, (struct ctdb_req_call *)hdr);
475 case CTDB_REQ_MESSAGE:
476 ctdb->statistics.client.req_message++;
477 daemon_request_message_from_client(client, (struct ctdb_req_message *)hdr);
480 case CTDB_REQ_CONTROL:
481 ctdb->statistics.client.req_control++;
482 daemon_request_control_from_client(client, (struct ctdb_req_control *)hdr);
486 DEBUG(DEBUG_CRIT,(__location__ " daemon: unrecognized operation %u\n",
491 talloc_free(tmp_ctx);
495 called when the daemon gets a incoming packet
497 static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
499 struct ctdb_client *client = talloc_get_type(args, struct ctdb_client);
500 struct ctdb_req_header *hdr;
507 client->ctdb->statistics.client_packets_recv++;
509 if (cnt < sizeof(*hdr)) {
510 ctdb_set_error(client->ctdb, "Bad packet length %u in daemon\n",
514 hdr = (struct ctdb_req_header *)data;
515 if (cnt != hdr->length) {
516 ctdb_set_error(client->ctdb, "Bad header length %u expected %u\n in daemon",
517 (unsigned)hdr->length, (unsigned)cnt);
521 if (hdr->ctdb_magic != CTDB_MAGIC) {
522 ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
526 if (hdr->ctdb_version != CTDB_VERSION) {
527 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
531 DEBUG(DEBUG_DEBUG,(__location__ " client request %u of type %u length %u from "
532 "node %u to %u\n", hdr->reqid, hdr->operation, hdr->length,
533 hdr->srcnode, hdr->destnode));
535 /* it is the responsibility of the incoming packet function to free 'data' */
536 daemon_incoming_packet(client, hdr);
540 static int ctdb_clientpid_destructor(struct ctdb_client_pid_list *client_pid)
542 if (client_pid->ctdb->client_pids != NULL) {
543 DLIST_REMOVE(client_pid->ctdb->client_pids, client_pid);
550 static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde,
551 uint16_t flags, void *private_data)
553 struct sockaddr_un addr;
556 struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
557 struct ctdb_client *client;
558 struct ctdb_client_pid_list *client_pid;
560 struct peercred_struct cr;
561 socklen_t crl = sizeof(struct peercred_struct);
564 socklen_t crl = sizeof(struct ucred);
567 memset(&addr, 0, sizeof(addr));
569 fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len);
575 set_close_on_exec(fd);
577 DEBUG(DEBUG_DEBUG,(__location__ " Created SOCKET FD:%d to connected child\n", fd));
579 client = talloc_zero(ctdb, struct ctdb_client);
581 if (getsockopt(fd, SOL_SOCKET, SO_PEERID, &cr, &crl) == 0) {
583 if (getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &cr, &crl) == 0) {
585 DEBUG(DEBUG_INFO,("Connected client with pid:%u\n", (unsigned)cr.pid));
590 client->client_id = ctdb_reqid_new(ctdb, client);
591 client->pid = cr.pid;
593 client_pid = talloc(client, struct ctdb_client_pid_list);
594 if (client_pid == NULL) {
595 DEBUG(DEBUG_ERR,("Failed to allocate client pid structure\n"));
600 client_pid->ctdb = ctdb;
601 client_pid->pid = cr.pid;
602 client_pid->client = client;
604 DLIST_ADD(ctdb->client_pids, client_pid);
606 client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT,
607 ctdb_daemon_read_cb, client);
609 talloc_set_destructor(client, ctdb_client_destructor);
610 talloc_set_destructor(client_pid, ctdb_clientpid_destructor);
611 ctdb->statistics.num_clients++;
617 create a unix domain socket and bind it
618 return a file descriptor open on the socket
620 static int ux_socket_bind(struct ctdb_context *ctdb)
622 struct sockaddr_un addr;
624 ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
625 if (ctdb->daemon.sd == -1) {
629 set_close_on_exec(ctdb->daemon.sd);
630 set_nonblocking(ctdb->daemon.sd);
632 memset(&addr, 0, sizeof(addr));
633 addr.sun_family = AF_UNIX;
634 strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
636 if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
637 DEBUG(DEBUG_CRIT,("Unable to bind on ctdb socket '%s'\n", ctdb->daemon.name));
641 if (chown(ctdb->daemon.name, geteuid(), getegid()) != 0 ||
642 chmod(ctdb->daemon.name, 0700) != 0) {
643 DEBUG(DEBUG_CRIT,("Unable to secure ctdb socket '%s', ctdb->daemon.name\n", ctdb->daemon.name));
648 if (listen(ctdb->daemon.sd, 100) != 0) {
649 DEBUG(DEBUG_CRIT,("Unable to listen on ctdb socket '%s'\n", ctdb->daemon.name));
656 close(ctdb->daemon.sd);
657 ctdb->daemon.sd = -1;
661 static void sig_child_handler(struct event_context *ev,
662 struct signal_event *se, int signum, int count,
666 // struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
671 pid = waitpid(-1, &status, WNOHANG);
673 DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%d\n", errno));
677 DEBUG(DEBUG_DEBUG, ("SIGCHLD from %d\n", (int)pid));
683 start the protocol going as a daemon
685 int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork, bool use_syslog)
688 struct fd_event *fde;
689 const char *domain_socket_name;
690 struct signal_event *se;
692 /* get rid of any old sockets */
693 unlink(ctdb->daemon.name);
695 /* create a unix domain stream socket to listen to */
696 res = ux_socket_bind(ctdb);
698 DEBUG(DEBUG_ALERT,(__location__ " Failed to open CTDB unix domain socket\n"));
702 if (do_fork && fork()) {
706 tdb_reopen_all(False);
711 if (open("/dev/null", O_RDONLY) != 0) {
712 DEBUG(DEBUG_ALERT,(__location__ " Failed to setup stdin on /dev/null\n"));
716 block_signal(SIGPIPE);
718 ctdbd_pid = getpid();
721 DEBUG(DEBUG_ERR, ("Starting CTDBD as pid : %u\n", ctdbd_pid));
723 if (ctdb->do_setsched) {
724 /* try to set us up as realtime */
725 ctdb_set_scheduler(ctdb);
728 /* ensure the socket is deleted on exit of the daemon */
729 domain_socket_name = talloc_strdup(talloc_autofree_context(), ctdb->daemon.name);
730 if (domain_socket_name == NULL) {
731 DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup failed.\n"));
735 ctdb->ev = event_context_init(NULL);
737 ctdb_set_child_logging(ctdb);
739 /* force initial recovery for election */
740 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
742 if (strcmp(ctdb->transport, "tcp") == 0) {
743 int ctdb_tcp_init(struct ctdb_context *);
744 ret = ctdb_tcp_init(ctdb);
746 #ifdef USE_INFINIBAND
747 if (strcmp(ctdb->transport, "ib") == 0) {
748 int ctdb_ibw_init(struct ctdb_context *);
749 ret = ctdb_ibw_init(ctdb);
753 DEBUG(DEBUG_ERR,("Failed to initialise transport '%s'\n", ctdb->transport));
757 if (ctdb->methods == NULL) {
758 DEBUG(DEBUG_ALERT,(__location__ " Can not initialize transport. ctdb->methods is NULL\n"));
759 ctdb_fatal(ctdb, "transport is unavailable. can not initialize.");
762 /* initialise the transport */
763 if (ctdb->methods->initialise(ctdb) != 0) {
764 ctdb_fatal(ctdb, "transport failed to initialise");
767 /* attach to any existing persistent databases */
768 if (ctdb_attach_persistent(ctdb) != 0) {
769 ctdb_fatal(ctdb, "Failed to attach to persistent databases\n");
772 /* start frozen, then let the first election sort things out */
773 if (ctdb_blocking_freeze(ctdb)) {
774 ctdb_fatal(ctdb, "Failed to get initial freeze\n");
777 /* now start accepting clients, only can do this once frozen */
778 fde = event_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd,
779 EVENT_FD_READ|EVENT_FD_AUTOCLOSE,
780 ctdb_accept_client, ctdb);
782 /* tell all other nodes we've just started up */
783 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL,
784 0, CTDB_CONTROL_STARTUP, 0,
785 CTDB_CTRL_FLAG_NOREPLY,
786 tdb_null, NULL, NULL);
788 /* release any IPs we hold from previous runs of the daemon */
789 ctdb_release_all_ips(ctdb);
791 /* start the transport going */
792 ctdb_start_transport(ctdb);
794 /* set up a handler to pick up sigchld */
795 se = event_add_signal(ctdb->ev, ctdb,
800 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD\n"));
805 if (start_syslog_daemon(ctdb)) {
806 DEBUG(DEBUG_CRIT, ("Failed to start syslog daemon\n"));
812 /* go into a wait loop to allow other nodes to complete */
813 event_loop_wait(ctdb->ev);
815 DEBUG(DEBUG_CRIT,("event_loop_wait() returned. this should not happen\n"));
820 allocate a packet for use in daemon<->daemon communication
822 struct ctdb_req_header *_ctdb_transport_allocate(struct ctdb_context *ctdb,
824 enum ctdb_operation operation,
825 size_t length, size_t slength,
829 struct ctdb_req_header *hdr;
831 length = MAX(length, slength);
832 size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
834 if (ctdb->methods == NULL) {
835 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate transport packet for operation %u of length %u. Transport is DOWN.\n",
836 operation, (unsigned)length));
840 hdr = (struct ctdb_req_header *)ctdb->methods->allocate_pkt(mem_ctx, size);
842 DEBUG(DEBUG_ERR,("Unable to allocate transport packet for operation %u of length %u\n",
843 operation, (unsigned)length));
846 talloc_set_name_const(hdr, type);
847 memset(hdr, 0, slength);
848 hdr->length = length;
849 hdr->operation = operation;
850 hdr->ctdb_magic = CTDB_MAGIC;
851 hdr->ctdb_version = CTDB_VERSION;
852 hdr->generation = ctdb->vnn_map->generation;
853 hdr->srcnode = ctdb->pnn;
858 struct daemon_control_state {
859 struct daemon_control_state *next, *prev;
860 struct ctdb_client *client;
861 struct ctdb_req_control *c;
863 struct ctdb_node *node;
867 callback when a control reply comes in
869 static void daemon_control_callback(struct ctdb_context *ctdb,
870 int32_t status, TDB_DATA data,
871 const char *errormsg,
874 struct daemon_control_state *state = talloc_get_type(private_data,
875 struct daemon_control_state);
876 struct ctdb_client *client = state->client;
877 struct ctdb_reply_control *r;
880 /* construct a message to send to the client containing the data */
881 len = offsetof(struct ctdb_reply_control, data) + data.dsize;
883 len += strlen(errormsg);
885 r = ctdbd_allocate_pkt(ctdb, state, CTDB_REPLY_CONTROL, len,
886 struct ctdb_reply_control);
887 CTDB_NO_MEMORY_VOID(ctdb, r);
889 r->hdr.reqid = state->reqid;
891 r->datalen = data.dsize;
893 memcpy(&r->data[0], data.dptr, data.dsize);
895 r->errorlen = strlen(errormsg);
896 memcpy(&r->data[r->datalen], errormsg, r->errorlen);
899 daemon_queue_send(client, &r->hdr);
905 fail all pending controls to a disconnected node
907 void ctdb_daemon_cancel_controls(struct ctdb_context *ctdb, struct ctdb_node *node)
909 struct daemon_control_state *state;
910 while ((state = node->pending_controls)) {
911 DLIST_REMOVE(node->pending_controls, state);
912 daemon_control_callback(ctdb, (uint32_t)-1, tdb_null,
913 "node is disconnected", state);
918 destroy a daemon_control_state
920 static int daemon_control_destructor(struct daemon_control_state *state)
923 DLIST_REMOVE(state->node->pending_controls, state);
929 this is called when the ctdb daemon received a ctdb request control
930 from a local client over the unix domain socket
932 static void daemon_request_control_from_client(struct ctdb_client *client,
933 struct ctdb_req_control *c)
937 struct daemon_control_state *state;
938 TALLOC_CTX *tmp_ctx = talloc_new(client);
940 if (c->hdr.destnode == CTDB_CURRENT_NODE) {
941 c->hdr.destnode = client->ctdb->pnn;
944 state = talloc(client, struct daemon_control_state);
945 CTDB_NO_MEMORY_VOID(client->ctdb, state);
947 state->client = client;
948 state->c = talloc_steal(state, c);
949 state->reqid = c->hdr.reqid;
950 if (ctdb_validate_pnn(client->ctdb, c->hdr.destnode)) {
951 state->node = client->ctdb->nodes[c->hdr.destnode];
952 DLIST_ADD(state->node->pending_controls, state);
957 talloc_set_destructor(state, daemon_control_destructor);
959 if (c->flags & CTDB_CTRL_FLAG_NOREPLY) {
960 talloc_steal(tmp_ctx, state);
963 data.dptr = &c->data[0];
964 data.dsize = c->datalen;
965 res = ctdb_daemon_send_control(client->ctdb, c->hdr.destnode,
966 c->srvid, c->opcode, client->client_id,
968 data, daemon_control_callback,
971 DEBUG(DEBUG_ERR,(__location__ " Failed to send control to remote node %u\n",
975 talloc_free(tmp_ctx);
979 register a call function
981 int ctdb_daemon_set_call(struct ctdb_context *ctdb, uint32_t db_id,
982 ctdb_fn_t fn, int id)
984 struct ctdb_registered_call *call;
985 struct ctdb_db_context *ctdb_db;
987 ctdb_db = find_ctdb_db(ctdb, db_id);
988 if (ctdb_db == NULL) {
992 call = talloc(ctdb_db, struct ctdb_registered_call);
996 DLIST_ADD(ctdb_db->calls, call);
1003 this local messaging handler is ugly, but is needed to prevent
1004 recursion in ctdb_send_message() when the destination node is the
1005 same as the source node
1007 struct ctdb_local_message {
1008 struct ctdb_context *ctdb;
1013 static void ctdb_local_message_trigger(struct event_context *ev, struct timed_event *te,
1014 struct timeval t, void *private_data)
1016 struct ctdb_local_message *m = talloc_get_type(private_data,
1017 struct ctdb_local_message);
1020 res = ctdb_dispatch_message(m->ctdb, m->srvid, m->data);
1022 DEBUG(DEBUG_ERR, (__location__ " Failed to dispatch message for srvid=%llu\n",
1023 (unsigned long long)m->srvid));
1028 static int ctdb_local_message(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data)
1030 struct ctdb_local_message *m;
1031 m = talloc(ctdb, struct ctdb_local_message);
1032 CTDB_NO_MEMORY(ctdb, m);
1037 m->data.dptr = talloc_memdup(m, m->data.dptr, m->data.dsize);
1038 if (m->data.dptr == NULL) {
1043 /* this needs to be done as an event to prevent recursion */
1044 event_add_timed(ctdb->ev, m, timeval_zero(), ctdb_local_message_trigger, m);
1051 int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t pnn,
1052 uint64_t srvid, TDB_DATA data)
1054 struct ctdb_req_message *r;
1057 if (ctdb->methods == NULL) {
1058 DEBUG(DEBUG_ERR,(__location__ " Failed to send message. Transport is DOWN\n"));
1062 /* see if this is a message to ourselves */
1063 if (pnn == ctdb->pnn) {
1064 return ctdb_local_message(ctdb, srvid, data);
1067 len = offsetof(struct ctdb_req_message, data) + data.dsize;
1068 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_MESSAGE, len,
1069 struct ctdb_req_message);
1070 CTDB_NO_MEMORY(ctdb, r);
1072 r->hdr.destnode = pnn;
1074 r->datalen = data.dsize;
1075 memcpy(&r->data[0], data.dptr, data.dsize);
1077 ctdb_queue_packet(ctdb, &r->hdr);
1085 struct ctdb_client_notify_list {
1086 struct ctdb_client_notify_list *next, *prev;
1087 struct ctdb_context *ctdb;
1093 static int ctdb_client_notify_destructor(struct ctdb_client_notify_list *nl)
1097 DEBUG(DEBUG_ERR,("Sending client notify message for srvid:%llu\n", (unsigned long long)nl->srvid));
1099 ret = ctdb_daemon_send_message(nl->ctdb, CTDB_BROADCAST_CONNECTED, (unsigned long long)nl->srvid, nl->data);
1101 DEBUG(DEBUG_ERR,("Failed to send client notify message\n"));
1107 int32_t ctdb_control_register_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1109 struct ctdb_client_notify_register *notify = (struct ctdb_client_notify_register *)indata.dptr;
1110 struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
1111 struct ctdb_client_notify_list *nl;
1113 DEBUG(DEBUG_ERR,("Register srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1115 if (indata.dsize < offsetof(struct ctdb_client_notify_register, notify_data)) {
1116 DEBUG(DEBUG_ERR,(__location__ " Too little data in control : %d\n", (int)indata.dsize));
1120 if (indata.dsize != (notify->len + offsetof(struct ctdb_client_notify_register, notify_data))) {
1121 DEBUG(DEBUG_ERR,(__location__ " Wrong amount of data in control. Got %d, expected %d\n", (int)indata.dsize, (int)(notify->len + offsetof(struct ctdb_client_notify_register, notify_data))));
1126 if (client == NULL) {
1127 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1131 for(nl=client->notify; nl; nl=nl->next) {
1132 if (nl->srvid == notify->srvid) {
1137 DEBUG(DEBUG_ERR,(__location__ " Notification for srvid:%llu already exists for this client\n", (unsigned long long)notify->srvid));
1141 nl = talloc(client, struct ctdb_client_notify_list);
1142 CTDB_NO_MEMORY(ctdb, nl);
1144 nl->srvid = notify->srvid;
1145 nl->data.dsize = notify->len;
1146 nl->data.dptr = talloc_size(nl, nl->data.dsize);
1147 CTDB_NO_MEMORY(ctdb, nl->data.dptr);
1148 memcpy(nl->data.dptr, notify->notify_data, nl->data.dsize);
1150 DLIST_ADD(client->notify, nl);
1151 talloc_set_destructor(nl, ctdb_client_notify_destructor);
1156 int32_t ctdb_control_deregister_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1158 struct ctdb_client_notify_deregister *notify = (struct ctdb_client_notify_deregister *)indata.dptr;
1159 struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
1160 struct ctdb_client_notify_list *nl;
1162 DEBUG(DEBUG_ERR,("Deregister srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1164 if (client == NULL) {
1165 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1169 for(nl=client->notify; nl; nl=nl->next) {
1170 if (nl->srvid == notify->srvid) {
1175 DEBUG(DEBUG_ERR,(__location__ " No notification for srvid:%llu found for this client\n", (unsigned long long)notify->srvid));
1179 DLIST_REMOVE(client->notify, nl);
1180 talloc_set_destructor(nl, NULL);
1186 struct ctdb_client *ctdb_find_client_by_pid(struct ctdb_context *ctdb, pid_t pid)
1188 struct ctdb_client_pid_list *client_pid;
1190 for (client_pid = ctdb->client_pids; client_pid; client_pid=client_pid->next) {
1191 if (client_pid->pid == pid) {
1192 return client_pid->client;
1199 /* This control is used by samba when probing if a process (of a samba daemon)
1201 Samba does this when it needs/wants to check if a subrecord in one of the
1202 databases is still valied, or if it is stale and can be removed.
1203 If the node is in unhealthy or stopped state we just kill of the samba
1204 process holding htis sub-record and return to the calling samba that
1205 the process does not exist.
1206 This allows us to forcefully recall subrecords registered by samba processes
1207 on banned and stopped nodes.
1209 int32_t ctdb_control_process_exists(struct ctdb_context *ctdb, pid_t pid)
1211 struct ctdb_client *client;
1213 if (ctdb->nodes[ctdb->pnn]->flags & (NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1214 client = ctdb_find_client_by_pid(ctdb, pid);
1215 if (client != NULL) {
1216 DEBUG(DEBUG_NOTICE,(__location__ " Killing client with pid:%d on banned/stopped node\n", (int)pid));
1217 talloc_free(client);
1222 return kill(pid, 0);