4 Copyright (C) Andrew Tridgell 2006
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "system/network.h"
22 #include "system/filesys.h"
23 #include "system/wait.h"
24 #include "system/time.h"
27 /* Allow use of deprecated function tevent_loop_allow_nesting() */
28 #define TEVENT_DEPRECATED
32 #include "lib/tdb_wrap/tdb_wrap.h"
33 #include "lib/util/dlinklist.h"
34 #include "lib/util/debug.h"
35 #include "lib/util/time.h"
36 #include "lib/util/blocking.h"
37 #include "lib/util/become_daemon.h"
39 #include "common/version.h"
40 #include "ctdb_private.h"
41 #include "ctdb_client.h"
43 #include "common/rb_tree.h"
44 #include "common/reqid.h"
45 #include "common/system.h"
46 #include "common/common.h"
47 #include "common/logging.h"
48 #include "common/pidfile.h"
49 #include "common/sock_io.h"
/* Per-client pid tracking node, linked into ctdb->client_pids so the
 * daemon can find clients by process id.
 * NOTE(review): this chunk is an extraction with gaps - the pid member
 * and the closing brace of this struct are not visible here; confirm
 * against the full file. */
51 struct ctdb_client_pid_list {
/* doubly-linked list pointers (dlinklist.h convention) */
52 struct ctdb_client_pid_list *next, *prev;
/* owning daemon context */
53 struct ctdb_context *ctdb;
/* the connected client this entry refers to */
55 struct ctdb_client *client;
/* Path of the daemon pid file; set externally, NULL means no pid file
 * is written (see ctdb_create_pidfile below). */
58 const char *ctdbd_pidfile = NULL;
/* Context whose lifetime controls the pid file; freed by
 * ctdb_remove_pidfile() at exit. */
59 static struct pidfile_context *ctdbd_pidfile_ctx = NULL;
/* forward declaration: dispatcher for packets arriving from clients */
61 static void daemon_incoming_packet(void *, struct ctdb_req_header *);
/* pid of the main daemon process; the atexit handler below compares
 * against it so forked children stay quiet */
63 static pid_t __ctdbd_pid;
65 static void print_exit_message(void)
67 if (getpid() == __ctdbd_pid) {
68 DEBUG(DEBUG_NOTICE,("CTDB daemon shutting down\n"));
70 /* Wait a second to allow pending log messages to be flushed */
77 static void ctdb_time_tick(struct tevent_context *ev, struct tevent_timer *te,
78 struct timeval t, void *private_data)
80 struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
82 if (getpid() != ctdb->ctdbd_pid) {
86 tevent_add_timer(ctdb->ev, ctdb,
87 timeval_current_ofs(1, 0),
88 ctdb_time_tick, ctdb);
91 /* Used to trigger a dummy event once per second, to make
92 * detection of hangs more reliable.
94 static void ctdb_start_time_tickd(struct ctdb_context *ctdb)
96 tevent_add_timer(ctdb->ev, ctdb,
97 timeval_current_ofs(1, 0),
98 ctdb_time_tick, ctdb);
/*
 * Kick off all recurring daemon activity: node keepalives, tcp tickle
 * list updates, recovery-daemon ping monitoring and the 1s timer tick.
 */
static void ctdb_start_periodic_events(struct ctdb_context *ctdb)
{
	/* start monitoring for connected/disconnected nodes */
	ctdb_start_keepalive(ctdb);

	/* start periodic update of tcp tickle lists */
	ctdb_start_tcp_tickle_update(ctdb);

	/* start listening for recovery daemon pings */
	ctdb_control_recd_ping(ctdb);

	/* start listening to timer ticks */
	ctdb_start_time_tickd(ctdb);
}
116 static void ignore_signal(int signum)
118 struct sigaction act;
120 memset(&act, 0, sizeof(act));
122 act.sa_handler = SIG_IGN;
123 sigemptyset(&act.sa_mask);
124 sigaddset(&act.sa_mask, signum);
125 sigaction(signum, &act, NULL);
130 send a packet to a client
132 static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header *hdr)
134 CTDB_INCREMENT_STAT(client->ctdb, client_packets_sent);
135 if (hdr->operation == CTDB_REQ_MESSAGE) {
136 if (ctdb_queue_length(client->queue) > client->ctdb->tunable.max_queue_depth_drop_msg) {
137 DEBUG(DEBUG_ERR,("CTDB_REQ_MESSAGE queue full - killing client connection.\n"));
142 return ctdb_queue_send(client->queue, (uint8_t *)hdr, hdr->length);
146 message handler for when we are in daemon mode. This redirects the message
149 static void daemon_message_handler(uint64_t srvid, TDB_DATA data,
152 struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client);
153 struct ctdb_req_message_old *r;
156 /* construct a message to send to the client containing the data */
157 len = offsetof(struct ctdb_req_message_old, data) + data.dsize;
158 r = ctdbd_allocate_pkt(client->ctdb, client->ctdb, CTDB_REQ_MESSAGE,
159 len, struct ctdb_req_message_old);
160 CTDB_NO_MEMORY_VOID(client->ctdb, r);
162 talloc_set_name_const(r, "req_message packet");
165 r->datalen = data.dsize;
166 memcpy(&r->data[0], data.dptr, data.dsize);
168 daemon_queue_send(client, &r->hdr);
174 this is called when the ctdb daemon received a ctdb request to
175 set the srvid from the client
177 int daemon_register_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
179 struct ctdb_client *client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
181 if (client == NULL) {
182 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_register_message_handler\n"));
185 res = srvid_register(ctdb->srv, client, srvid, daemon_message_handler,
188 DEBUG(DEBUG_ERR,(__location__ " Failed to register handler %llu in daemon\n",
189 (unsigned long long)srvid));
191 DEBUG(DEBUG_INFO,(__location__ " Registered message handler for srvid=%llu\n",
192 (unsigned long long)srvid));
199 this is called when the ctdb daemon received a ctdb request to
200 remove a srvid from the client
202 int daemon_deregister_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
204 struct ctdb_client *client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
205 if (client == NULL) {
206 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_deregister_message_handler\n"));
209 return srvid_deregister(ctdb->srv, srvid, client);
212 void daemon_tunnel_handler(uint64_t tunnel_id, TDB_DATA data,
215 struct ctdb_client *client =
216 talloc_get_type_abort(private_data, struct ctdb_client);
217 struct ctdb_req_tunnel_old *c, *pkt;
220 pkt = (struct ctdb_req_tunnel_old *)data.dptr;
222 len = offsetof(struct ctdb_req_tunnel_old, data) + pkt->datalen;
223 c = ctdbd_allocate_pkt(client->ctdb, client->ctdb, CTDB_REQ_TUNNEL,
224 len, struct ctdb_req_tunnel_old);
226 DEBUG(DEBUG_ERR, ("Memory error in daemon_tunnel_handler\n"));
230 talloc_set_name_const(c, "req_tunnel packet");
232 c->tunnel_id = tunnel_id;
233 c->flags = pkt->flags;
234 c->datalen = pkt->datalen;
235 memcpy(c->data, pkt->data, pkt->datalen);
237 daemon_queue_send(client, &c->hdr);
243 destroy a ctdb_client
245 static int ctdb_client_destructor(struct ctdb_client *client)
247 struct ctdb_db_context *ctdb_db;
249 ctdb_takeover_client_destructor_hook(client);
250 reqid_remove(client->ctdb->idr, client->client_id);
251 client->ctdb->num_clients--;
253 if (client->num_persistent_updates != 0) {
254 DEBUG(DEBUG_ERR,(__location__ " Client disconnecting with %u persistent updates in flight. Starting recovery\n", client->num_persistent_updates));
255 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
257 ctdb_db = find_ctdb_db(client->ctdb, client->db_id);
259 DEBUG(DEBUG_ERR, (__location__ " client exit while transaction "
260 "commit active. Forcing recovery.\n"));
261 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
264 * trans3 transaction state:
266 * The destructor sets the pointer to NULL.
268 talloc_free(ctdb_db->persistent_state);
276 this is called when the ctdb daemon received a ctdb request message
277 from a local client over the unix domain socket
279 static void daemon_request_message_from_client(struct ctdb_client *client,
280 struct ctdb_req_message_old *c)
285 if (c->hdr.destnode == CTDB_CURRENT_NODE) {
286 c->hdr.destnode = ctdb_get_pnn(client->ctdb);
289 /* maybe the message is for another client on this node */
290 if (ctdb_get_pnn(client->ctdb)==c->hdr.destnode) {
291 ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c);
295 /* its for a remote node */
296 data.dptr = &c->data[0];
297 data.dsize = c->datalen;
298 res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode,
301 DEBUG(DEBUG_ERR,(__location__ " Failed to send message to remote node %u\n",
/* State kept while a client-originated call is in flight so the reply
 * can be routed back to the requesting client.
 * NOTE(review): extraction gap - a reqid member (read in
 * daemon_call_from_client_callback as dstate->reqid) and the closing
 * brace are not visible in this chunk. */
307 struct daemon_call_state {
/* client that issued the call */
308 struct ctdb_client *client;
/* the call being processed; ownership moves with talloc_steal */
310 struct ctdb_call *call;
/* used for call latency statistics */
311 struct timeval start_time;
313 /* readonly request ? */
314 uint32_t readonly_fetch;
/* call id the client originally asked for, so a readonly FETCH that
 * was remapped to FETCH_WITH_HEADER can be translated back */
315 uint32_t client_callid;
/*
 * Completion callback for a call issued on behalf of a local client:
 * collect the result, build a CTDB_REPLY_CALL packet and queue it back
 * to the client.
 * NOTE(review): extraction gaps - braces, local declarations (res,
 * length) and the early-return statements of the error paths are not
 * visible in this chunk; the comments below describe the visible flow.
 */
319 complete a call from a client
321 static void daemon_call_from_client_callback(struct ctdb_call_state *state)
323 struct daemon_call_state *dstate = talloc_get_type(state->async.private_data,
324 struct daemon_call_state);
325 struct ctdb_reply_call_old *r;
328 struct ctdb_client *client = dstate->client;
329 struct ctdb_db_context *ctdb_db = state->ctdb_db;
/* reparent: reply handling now belongs to the client / dstate */
331 talloc_steal(client, dstate);
332 talloc_steal(dstate, dstate->call);
/* collect the result; on error update stats/latency and bail out */
334 res = ctdb_daemon_call_recv(state, dstate->call);
336 DEBUG(DEBUG_ERR, (__location__ " ctdbd_call_recv() returned error\n"));
337 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
339 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 1", call_latency, dstate->start_time);
/* size the reply packet from the returned payload */
343 length = offsetof(struct ctdb_reply_call_old, data) + dstate->call->reply_data.dsize;
344 /* If the client asked for readonly FETCH, we remapped this to
345 FETCH_WITH_HEADER when calling the daemon. So we must
346 strip the extra header off the reply data before passing
347 it back to the client.
349 if (dstate->readonly_fetch
350 && dstate->client_callid == CTDB_FETCH_FUNC) {
351 length -= sizeof(struct ctdb_ltdb_header);
354 r = ctdbd_allocate_pkt(client->ctdb, dstate, CTDB_REPLY_CALL,
355 length, struct ctdb_reply_call_old);
357 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate reply_call in ctdb daemon\n"));
358 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
359 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 2", call_latency, dstate->start_time);
/* fill in the reply header/status and copy the payload */
362 r->hdr.reqid = dstate->reqid;
363 r->status = dstate->call->status;
365 if (dstate->readonly_fetch
366 && dstate->client_callid == CTDB_FETCH_FUNC) {
367 /* client only asked for a FETCH so we must strip off
368 the extra ctdb_ltdb header
370 r->datalen = dstate->call->reply_data.dsize - sizeof(struct ctdb_ltdb_header);
371 memcpy(&r->data[0], dstate->call->reply_data.dptr + sizeof(struct ctdb_ltdb_header), r->datalen);
373 r->datalen = dstate->call->reply_data.dsize;
374 memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen);
/* queue the reply; a -1 from daemon_queue_send means the client has
 * already been torn down */
377 res = daemon_queue_send(client, &r->hdr);
379 /* client is dead - return immediately */
383 DEBUG(DEBUG_ERR, (__location__ " Failed to queue packet from daemon to client\n"));
385 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 3", call_latency, dstate->start_time);
386 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
/* Identifies a client by numeric id rather than pointer, so a request
 * that gets requeued can detect that the client has since disconnected.
 * NOTE(review): extraction gap - the client_id member and closing brace
 * are not visible in this chunk (client_id is read in
 * daemon_incoming_packet_wrap below). */
390 struct ctdb_daemon_packet_wrap {
391 struct ctdb_context *ctdb;
396 a wrapper to catch disconnected clients
398 static void daemon_incoming_packet_wrap(void *p, struct ctdb_req_header *hdr)
400 struct ctdb_client *client;
401 struct ctdb_daemon_packet_wrap *w = talloc_get_type(p,
402 struct ctdb_daemon_packet_wrap);
404 DEBUG(DEBUG_CRIT,(__location__ " Bad packet type '%s'\n", talloc_get_name(p)));
408 client = reqid_find(w->ctdb->idr, w->client_id, struct ctdb_client);
409 if (client == NULL) {
410 DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
418 daemon_incoming_packet(client, hdr);
/* One duplicate fetch request, parked behind an in-flight fetch for the
 * same key. */
421 struct ctdb_deferred_fetch_call {
422 struct ctdb_deferred_fetch_call *next, *prev;
/* the original client request packet (stolen from the caller) */
423 struct ctdb_req_call_old *c;
/* identifies the requesting client by id, surviving disconnects */
424 struct ctdb_daemon_packet_wrap *w;
/* Queue of deferred calls for one key; draining happens in the
 * destructor when the initial fetch completes or times out. */
427 struct ctdb_deferred_fetch_queue {
428 struct ctdb_deferred_fetch_call *deferred_calls;
/* Pairs a deferred call with the client it is replayed for, handed to
 * reprocess_deferred_call() via a zero-timeout timer. */
431 struct ctdb_deferred_requeue {
432 struct ctdb_deferred_fetch_call *dfc;
433 struct ctdb_client *client;
436 /* called from a timer event and starts reprocessing the deferred call.*/
437 static void reprocess_deferred_call(struct tevent_context *ev,
438 struct tevent_timer *te,
439 struct timeval t, void *private_data)
441 struct ctdb_deferred_requeue *dfr = (struct ctdb_deferred_requeue *)private_data;
442 struct ctdb_client *client = dfr->client;
444 talloc_steal(client, dfr->dfc->c);
445 daemon_incoming_packet(client, (struct ctdb_req_header *)dfr->dfc->c);
449 /* the referral context is destroyed either after a timeout or when the initial
450 fetch-lock has finished.
451 at this stage, immediately start reprocessing the queued up deferred
452 calls so they get reprocessed immediately (and since we are dmaster at
453 this stage, trigger the waiting smbd processes to pick up and aquire the
456 static int deferred_fetch_queue_destructor(struct ctdb_deferred_fetch_queue *dfq)
459 /* need to reprocess the packets from the queue explicitely instead of
460 just using a normal destructor since we want, need, to
461 call the clients in the same oder as the requests queued up
463 while (dfq->deferred_calls != NULL) {
464 struct ctdb_client *client;
465 struct ctdb_deferred_fetch_call *dfc = dfq->deferred_calls;
466 struct ctdb_deferred_requeue *dfr;
468 DLIST_REMOVE(dfq->deferred_calls, dfc);
470 client = reqid_find(dfc->w->ctdb->idr, dfc->w->client_id, struct ctdb_client);
471 if (client == NULL) {
472 DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
477 /* process it by pushing it back onto the eventloop */
478 dfr = talloc(client, struct ctdb_deferred_requeue);
480 DEBUG(DEBUG_ERR,("Failed to allocate deferred fetch requeue structure\n"));
484 dfr->dfc = talloc_steal(dfr, dfc);
485 dfr->client = client;
487 tevent_add_timer(dfc->w->ctdb->ev, client, timeval_zero(),
488 reprocess_deferred_call, dfr);
494 /* insert the new deferral context into the rb tree.
495 there should never be a pre-existing context here, but check for it
496 warn and destroy the previous context if there is already a deferral context
499 static void *insert_dfq_callback(void *parm, void *data)
502 DEBUG(DEBUG_ERR,("Already have DFQ registered. Free old %p and create new %p\n", data, parm));
508 /* if the original fetch-lock did not complete within a reasonable time,
509 free the context and context for all deferred requests to cause them to be
510 re-inserted into the event system.
512 static void dfq_timeout(struct tevent_context *ev, struct tevent_timer *te,
513 struct timeval t, void *private_data)
515 talloc_free(private_data);
518 /* This function is used in the local daemon to register a KEY in a database
520 While the remote fetch is in-flight, any futher attempts to re-fetch the
521 same record will be deferred until the fetch completes.
523 static int setup_deferred_fetch_locks(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
526 struct ctdb_deferred_fetch_queue *dfq;
528 k = ctdb_key_to_idkey(call, call->key);
530 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch\n"));
534 dfq = talloc(call, struct ctdb_deferred_fetch_queue);
536 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch queue structure\n"));
540 dfq->deferred_calls = NULL;
542 trbt_insertarray32_callback(ctdb_db->deferred_fetch, k[0], &k[0], insert_dfq_callback, dfq);
544 talloc_set_destructor(dfq, deferred_fetch_queue_destructor);
546 /* if the fetch havent completed in 30 seconds, just tear it all down
547 and let it try again as the events are reissued */
548 tevent_add_timer(ctdb_db->ctdb->ev, dfq, timeval_current_ofs(30, 0),
555 /* check if this is a duplicate request to a fetch already in-flight
556 if it is, make this call deferred to be reprocessed later when
557 the in-flight fetch completes.
559 static int requeue_duplicate_fetch(struct ctdb_db_context *ctdb_db, struct ctdb_client *client, TDB_DATA key, struct ctdb_req_call_old *c)
562 struct ctdb_deferred_fetch_queue *dfq;
563 struct ctdb_deferred_fetch_call *dfc;
565 k = ctdb_key_to_idkey(c, key);
567 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch\n"));
571 dfq = trbt_lookuparray32(ctdb_db->deferred_fetch, k[0], &k[0]);
580 dfc = talloc(dfq, struct ctdb_deferred_fetch_call);
582 DEBUG(DEBUG_ERR, ("Failed to allocate deferred fetch call structure\n"));
586 dfc->w = talloc(dfc, struct ctdb_daemon_packet_wrap);
587 if (dfc->w == NULL) {
588 DEBUG(DEBUG_ERR,("Failed to allocate deferred fetch daemon packet wrap structure\n"));
593 dfc->c = talloc_steal(dfc, c);
594 dfc->w->ctdb = ctdb_db->ctdb;
595 dfc->w->client_id = client->client_id;
597 DLIST_ADD_END(dfq->deferred_calls, dfc);
/*
 * Handle a CTDB_REQ_CALL from a local client: look up the database,
 * lock/fetch the record, apply read-only delegation/revoke handling,
 * then dispatch the call locally or to the remote dmaster.
 * NOTE(review): extraction gaps - braces, several local declarations
 * (ret, key, data), early returns and some error-path statements are
 * missing from this chunk; comments describe the visible flow only.
 */
604 this is called when the ctdb daemon received a ctdb request call
605 from a local client over the unix domain socket
607 static void daemon_request_call_from_client(struct ctdb_client *client,
608 struct ctdb_req_call_old *c)
610 struct ctdb_call_state *state;
611 struct ctdb_db_context *ctdb_db;
612 struct daemon_call_state *dstate;
613 struct ctdb_call *call;
614 struct ctdb_ltdb_header header;
617 struct ctdb_context *ctdb = client->ctdb;
618 struct ctdb_daemon_packet_wrap *w;
620 CTDB_INCREMENT_STAT(ctdb, total_calls);
621 CTDB_INCREMENT_STAT(ctdb, pending_calls);
/* resolve the target database from the request */
623 ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
625 DEBUG(DEBUG_ERR, (__location__ " Unknown database in request. db_id==0x%08x",
627 CTDB_DECREMENT_STAT(ctdb, pending_calls);
631 if (ctdb_db->unhealthy_reason) {
633 * this is just a warning, as the tdb should be empty anyway,
634 * and only persistent databases can be unhealthy, which doesn't
635 * use this code patch
637 DEBUG(DEBUG_WARNING,("warn: db(%s) unhealty in daemon_request_call_from_client(): %s\n",
638 ctdb_db->db_name, ctdb_db->unhealthy_reason));
642 key.dsize = c->keylen;
/* wrap identifies the client by id so a requeued request can detect
 * a disconnect */
644 w = talloc(ctdb, struct ctdb_daemon_packet_wrap);
645 CTDB_NO_MEMORY_VOID(ctdb, w);
648 w->client_id = client->client_id;
/* lock and fetch the record; may requeue the request and retry later */
650 ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header,
651 (struct ctdb_req_header *)c, &data,
652 daemon_incoming_packet_wrap, w, true);
654 /* will retry later */
655 CTDB_DECREMENT_STAT(ctdb, pending_calls);
662 DEBUG(DEBUG_ERR,(__location__ " Unable to fetch record\n"));
663 CTDB_DECREMENT_STAT(ctdb, pending_calls);
668 /* check if this fetch request is a duplicate for a
669 request we already have in flight. If so defer it until
670 the first request completes.
672 if (ctdb->tunable.fetch_collapse == 1) {
673 if (requeue_duplicate_fetch(ctdb_db, client, key, c) == 0) {
674 ret = ctdb_ltdb_unlock(ctdb_db, key);
676 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
678 CTDB_DECREMENT_STAT(ctdb, pending_calls);
679 talloc_free(data.dptr);
684 /* Dont do READONLY if we don't have a tracking database */
685 if ((c->flags & CTDB_WANT_READONLY) && !ctdb_db_readonly(ctdb_db)) {
686 c->flags &= ~CTDB_WANT_READONLY;
/* a completed revoke clears all read-only flags and tracking data */
689 if (header.flags & CTDB_REC_RO_REVOKE_COMPLETE) {
690 header.flags &= ~CTDB_REC_RO_FLAGS;
691 CTDB_INCREMENT_STAT(ctdb, total_ro_revokes);
692 CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_revokes);
693 if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
694 ctdb_fatal(ctdb, "Failed to write header with cleared REVOKE flag");
696 /* and clear out the tracking data */
697 if (tdb_delete(ctdb_db->rottdb, key) != 0) {
698 DEBUG(DEBUG_ERR,(__location__ " Failed to clear out trackingdb record\n"));
702 /* if we are revoking, we must defer all other calls until the revoke
705 if (header.flags & CTDB_REC_RO_REVOKING_READONLY) {
706 talloc_free(data.dptr);
707 ret = ctdb_ltdb_unlock(ctdb_db, key);
709 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, key, (struct ctdb_req_header *)c, daemon_incoming_packet, client) != 0) {
710 ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
712 CTDB_DECREMENT_STAT(ctdb, pending_calls);
/* we are dmaster, the request wants a writable record, and read-only
 * delegations exist: start a revoke and defer this call */
716 if ((header.dmaster == ctdb->pnn)
717 && (!(c->flags & CTDB_WANT_READONLY))
718 && (header.flags & (CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY)) ) {
719 header.flags |= CTDB_REC_RO_REVOKING_READONLY;
720 if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
721 ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
723 ret = ctdb_ltdb_unlock(ctdb_db, key);
725 if (ctdb_start_revoke_ro_record(ctdb, ctdb_db, key, &header, data) != 0) {
726 ctdb_fatal(ctdb, "Failed to start record revoke");
728 talloc_free(data.dptr);
730 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, key, (struct ctdb_req_header *)c, daemon_incoming_packet, client) != 0) {
731 ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
734 CTDB_DECREMENT_STAT(ctdb, pending_calls);
/* set up the per-call reply state */
738 dstate = talloc(client, struct daemon_call_state);
739 if (dstate == NULL) {
740 ret = ctdb_ltdb_unlock(ctdb_db, key);
742 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
745 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate dstate\n"));
746 CTDB_DECREMENT_STAT(ctdb, pending_calls);
749 dstate->start_time = timeval_current();
750 dstate->client = client;
751 dstate->reqid = c->hdr.reqid;
752 talloc_steal(dstate, data.dptr);
754 call = dstate->call = talloc_zero(dstate, struct ctdb_call);
756 ret = ctdb_ltdb_unlock(ctdb_db, key);
758 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
761 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate call\n"));
762 CTDB_DECREMENT_STAT(ctdb, pending_calls);
763 CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 1", call_latency, dstate->start_time);
767 dstate->readonly_fetch = 0;
768 call->call_id = c->callid;
770 call->call_data.dptr = c->data + c->keylen;
771 call->call_data.dsize = c->calldatalen;
772 call->flags = c->flags;
774 if (c->flags & CTDB_WANT_READONLY) {
775 /* client wants readonly record, so translate this into a
776 fetch with header. remember what the client asked for
777 so we can remap the reply back to the proper format for
778 the client in the reply
780 dstate->client_callid = call->call_id;
781 call->call_id = CTDB_FETCH_WITH_HEADER_FUNC;
782 dstate->readonly_fetch = 1;
/* dispatch locally if we are dmaster, otherwise go remote */
785 if (header.dmaster == ctdb->pnn) {
786 state = ctdb_call_local_send(ctdb_db, call, &header, &data);
788 state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
789 if (ctdb->tunable.fetch_collapse == 1) {
790 /* This request triggered a remote fetch-lock.
791 set up a deferral for this key so any additional
792 fetch-locks are deferred until the current one
795 setup_deferred_fetch_locks(ctdb_db, call);
799 ret = ctdb_ltdb_unlock(ctdb_db, key);
801 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
805 DEBUG(DEBUG_ERR,(__location__ " Unable to setup call send\n"));
806 CTDB_DECREMENT_STAT(ctdb, pending_calls);
807 CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 2", call_latency, dstate->start_time);
/* hand the reply path over to the completion callback */
810 talloc_steal(state, dstate);
811 talloc_steal(client, state);
813 state->async.fn = daemon_call_from_client_callback;
814 state->async.private_data = dstate;
818 static void daemon_request_control_from_client(struct ctdb_client *client,
819 struct ctdb_req_control_old *c);
/*
 * Central dispatcher for packets from local clients: validate magic and
 * protocol version, then route by operation type.
 * NOTE(review): extraction gaps - braces, the tmp_ctx declaration,
 * `goto done` / `break` statements and case labels such as
 * CTDB_REQ_CALL are not fully visible in this chunk.
 */
821 /* data contains a packet from the client */
822 static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr)
824 struct ctdb_client *client = talloc_get_type(p, struct ctdb_client);
826 struct ctdb_context *ctdb = client->ctdb;
828 /* place the packet as a child of a tmp_ctx. We then use
829 talloc_free() below to free it. If any of the calls want
830 to keep it, then they will steal it somewhere else, and the
831 talloc_free() will be a no-op */
832 tmp_ctx = talloc_new(client);
833 talloc_steal(tmp_ctx, hdr);
/* reject packets that are not valid CTDB protocol */
835 if (hdr->ctdb_magic != CTDB_MAGIC) {
836 ctdb_set_error(client->ctdb, "Non CTDB packet rejected in daemon\n");
840 if (hdr->ctdb_version != CTDB_PROTOCOL) {
841 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
/* route by operation type, updating per-operation statistics */
845 switch (hdr->operation) {
847 CTDB_INCREMENT_STAT(ctdb, client.req_call);
848 daemon_request_call_from_client(client, (struct ctdb_req_call_old *)hdr);
851 case CTDB_REQ_MESSAGE:
852 CTDB_INCREMENT_STAT(ctdb, client.req_message);
853 daemon_request_message_from_client(client, (struct ctdb_req_message_old *)hdr);
856 case CTDB_REQ_CONTROL:
857 CTDB_INCREMENT_STAT(ctdb, client.req_control);
858 daemon_request_control_from_client(client, (struct ctdb_req_control_old *)hdr);
862 DEBUG(DEBUG_CRIT,(__location__ " daemon: unrecognized operation %u\n",
867 talloc_free(tmp_ctx);
/*
 * Queue read callback: validate the raw bytes received from a client
 * socket as a CTDB packet header before dispatching.
 * NOTE(review): extraction gaps - braces, early returns and the
 * cnt==0 disconnect handling are not visible in this chunk.
 */
871 called when the daemon gets a incoming packet
873 static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
875 struct ctdb_client *client = talloc_get_type(args, struct ctdb_client);
876 struct ctdb_req_header *hdr;
883 CTDB_INCREMENT_STAT(client->ctdb, client_packets_recv);
/* must have at least a complete header */
885 if (cnt < sizeof(*hdr)) {
886 ctdb_set_error(client->ctdb, "Bad packet length %u in daemon\n",
890 hdr = (struct ctdb_req_header *)data;
/* the header's claimed length must match what was received */
891 if (cnt != hdr->length) {
892 ctdb_set_error(client->ctdb, "Bad header length %u expected %u\n in daemon",
893 (unsigned)hdr->length, (unsigned)cnt);
897 if (hdr->ctdb_magic != CTDB_MAGIC) {
898 ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
902 if (hdr->ctdb_version != CTDB_PROTOCOL) {
903 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
907 DEBUG(DEBUG_DEBUG,(__location__ " client request %u of type %u length %u from "
908 "node %u to %u\n", hdr->reqid, hdr->operation, hdr->length,
909 hdr->srcnode, hdr->destnode));
911 /* it is the responsibility of the incoming packet function to free 'data' */
912 daemon_incoming_packet(client, hdr);
916 static int ctdb_clientpid_destructor(struct ctdb_client_pid_list *client_pid)
918 if (client_pid->ctdb->client_pids != NULL) {
919 DLIST_REMOVE(client_pid->ctdb->client_pids, client_pid);
/*
 * fd event handler on the listening unix-domain socket: accept a new
 * client connection and set up its client structure, pid tracking and
 * packet queue.
 * NOTE(review): extraction gaps - braces, declarations (fd, len, ret,
 * peer_pid) and several error-path statements (close(fd)/returns) are
 * not visible in this chunk.
 */
926 static void ctdb_accept_client(struct tevent_context *ev,
927 struct tevent_fd *fde, uint16_t flags,
930 struct sockaddr_un addr;
933 struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
934 struct ctdb_client *client;
935 struct ctdb_client_pid_list *client_pid;
939 memset(&addr, 0, sizeof(addr));
941 fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len);
/* client fds must be non-blocking and not leak across exec */
946 ret = set_blocking(fd, false);
950 " failed to set socket non-blocking (%s)\n",
956 set_close_on_exec(fd);
958 DEBUG(DEBUG_DEBUG,(__location__ " Created SOCKET FD:%d to connected child\n", fd));
960 client = talloc_zero(ctdb, struct ctdb_client);
961 if (ctdb_get_peer_pid(fd, &peer_pid) == 0) {
962 DEBUG(DEBUG_INFO,("Connected client with pid:%u\n", (unsigned)peer_pid));
967 client->client_id = reqid_new(ctdb->idr, client);
968 client->pid = peer_pid;
/* track the client's pid so it can be found by process id */
970 client_pid = talloc(client, struct ctdb_client_pid_list);
971 if (client_pid == NULL) {
972 DEBUG(DEBUG_ERR,("Failed to allocate client pid structure\n"));
977 client_pid->ctdb = ctdb;
978 client_pid->pid = peer_pid;
979 client_pid->client = client;
981 DLIST_ADD(ctdb->client_pids, client_pid);
/* all traffic to/from this client flows through this queue */
983 client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT,
984 ctdb_daemon_read_cb, client,
985 "client-%u", client->pid);
987 talloc_set_destructor(client, ctdb_client_destructor);
988 talloc_set_destructor(client_pid, ctdb_clientpid_destructor);
995 create a unix domain socket and bind it
996 return a file descriptor open on the socket
998 static int ux_socket_bind(struct ctdb_context *ctdb)
1000 struct sockaddr_un addr;
1003 ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
1004 if (ctdb->daemon.sd == -1) {
1008 memset(&addr, 0, sizeof(addr));
1009 addr.sun_family = AF_UNIX;
1010 strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path)-1);
1012 if (! sock_clean(ctdb->daemon.name)) {
1016 set_close_on_exec(ctdb->daemon.sd);
1018 ret = set_blocking(ctdb->daemon.sd, false);
1022 " failed to set socket non-blocking (%s)\n",
1027 if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
1028 DEBUG(DEBUG_CRIT,("Unable to bind on ctdb socket '%s'\n", ctdb->daemon.name));
1032 if (chown(ctdb->daemon.name, geteuid(), getegid()) != 0 ||
1033 chmod(ctdb->daemon.name, 0700) != 0) {
1034 DEBUG(DEBUG_CRIT,("Unable to secure ctdb socket '%s', ctdb->daemon.name\n", ctdb->daemon.name));
1039 if (listen(ctdb->daemon.sd, 100) != 0) {
1040 DEBUG(DEBUG_CRIT,("Unable to listen on ctdb socket '%s'\n", ctdb->daemon.name));
1044 DEBUG(DEBUG_NOTICE, ("Listening to ctdb socket %s\n",
1045 ctdb->daemon.name));
1049 close(ctdb->daemon.sd);
1050 ctdb->daemon.sd = -1;
1054 static void initialise_node_flags (struct ctdb_context *ctdb)
1056 if (ctdb->pnn == -1) {
1057 ctdb_fatal(ctdb, "PNN is set to -1 (unknown value)");
1060 ctdb->nodes[ctdb->pnn]->flags &= ~NODE_FLAGS_DISCONNECTED;
1062 /* do we start out in DISABLED mode? */
1063 if (ctdb->start_as_disabled != 0) {
1065 ("This node is configured to start in DISABLED state\n"));
1066 ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_DISABLED;
1068 /* do we start out in STOPPED mode? */
1069 if (ctdb->start_as_stopped != 0) {
1071 ("This node is configured to start in STOPPED state\n"));
1072 ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_STOPPED;
1076 static void ctdb_setup_event_callback(struct ctdb_context *ctdb, int status,
1080 ctdb_die(ctdb, "Failed to run setup event");
1082 ctdb_run_notification_script(ctdb, "setup");
1084 /* tell all other nodes we've just started up */
1085 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL,
1086 0, CTDB_CONTROL_STARTUP, 0,
1087 CTDB_CTRL_FLAG_NOREPLY,
1088 tdb_null, NULL, NULL);
1090 /* Start the recovery daemon */
1091 if (ctdb_start_recoverd(ctdb) != 0) {
1092 DEBUG(DEBUG_ALERT,("Failed to start recovery daemon\n"));
1096 ctdb_start_periodic_events(ctdb);
1098 ctdb_wait_for_first_recovery(ctdb);
1101 static struct timeval tevent_before_wait_ts;
1102 static struct timeval tevent_after_wait_ts;
1104 static void ctdb_tevent_trace_init(void)
1108 now = timeval_current();
1110 tevent_before_wait_ts = now;
1111 tevent_after_wait_ts = now;
1114 static void ctdb_tevent_trace(enum tevent_trace_point tp,
1117 struct timeval diff;
1119 struct ctdb_context *ctdb =
1120 talloc_get_type(private_data, struct ctdb_context);
1122 if (getpid() != ctdb->ctdbd_pid) {
1126 now = timeval_current();
1129 case TEVENT_TRACE_BEFORE_WAIT:
1130 diff = timeval_until(&tevent_after_wait_ts, &now);
1131 if (diff.tv_sec > 3) {
1133 ("Handling event took %ld seconds!\n",
1134 (long)diff.tv_sec));
1136 tevent_before_wait_ts = now;
1139 case TEVENT_TRACE_AFTER_WAIT:
1140 diff = timeval_until(&tevent_before_wait_ts, &now);
1141 if (diff.tv_sec > 3) {
1143 ("No event for %ld seconds!\n",
1144 (long)diff.tv_sec));
1146 tevent_after_wait_ts = now;
1150 /* Do nothing for future tevent trace points */ ;
1154 static void ctdb_remove_pidfile(void)
1156 TALLOC_FREE(ctdbd_pidfile_ctx);
1159 static void ctdb_create_pidfile(TALLOC_CTX *mem_ctx)
1161 if (ctdbd_pidfile != NULL) {
1162 int ret = pidfile_context_create(mem_ctx, ctdbd_pidfile,
1163 &ctdbd_pidfile_ctx);
1166 ("Failed to create PID file %s\n",
1171 DEBUG(DEBUG_NOTICE, ("Created PID file %s\n", ctdbd_pidfile));
1172 atexit(ctdb_remove_pidfile);
1176 static void ctdb_initialise_vnn_map(struct ctdb_context *ctdb)
1180 /* initialize the vnn mapping table, skipping any deleted nodes */
1181 ctdb->vnn_map = talloc(ctdb, struct ctdb_vnn_map);
1182 CTDB_NO_MEMORY_FATAL(ctdb, ctdb->vnn_map);
1185 for (i = 0; i < ctdb->num_nodes; i++) {
1186 if ((ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) == 0) {
1191 ctdb->vnn_map->generation = INVALID_GENERATION;
1192 ctdb->vnn_map->size = count;
1193 ctdb->vnn_map->map = talloc_array(ctdb->vnn_map, uint32_t, ctdb->vnn_map->size);
1194 CTDB_NO_MEMORY_FATAL(ctdb, ctdb->vnn_map->map);
1196 for(i=0, j=0; i < ctdb->vnn_map->size; i++) {
1197 if (ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) {
1200 ctdb->vnn_map->map[j] = i;
/*
 * Determine this node's PNN from its configured address and store it in
 * ctdb->pnn.  Fatal if the address is unset or not in the node list.
 * NOTE(review): extraction gaps - braces, the nodeid declaration and
 * the exact error-branch conditions around ctdb_ip_to_nodeid() are not
 * visible in this chunk.
 */
1205 static void ctdb_set_my_pnn(struct ctdb_context *ctdb)
1209 if (ctdb->address == NULL) {
1211 "Can not determine PNN - node address is not set\n");
1214 nodeid = ctdb_ip_to_nodeid(ctdb, ctdb->address);
1217 "Can not determine PNN - node address not found in node list\n");
1220 ctdb->pnn = ctdb->nodes[nodeid]->pnn;
1221 DEBUG(DEBUG_NOTICE, ("PNN is %u\n", ctdb->pnn));
1225   start the protocol going as a daemon
/* Main daemon startup sequence: daemonize, create sockets, initialise the
 * event loop, transport, databases and event scripts, then enter the
 * tevent wait loop.  This function does not return in normal operation. */
1227 int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork)
1230 	struct tevent_fd *fde;
1232 	become_daemon(do_fork, false, false);
	/* SIGPIPE would kill us on writes to closed sockets; SIGUSR1 is unused */
1234 	ignore_signal(SIGPIPE);
1235 	ignore_signal(SIGUSR1);
1237 	ctdb->ctdbd_pid = getpid();
1238 	DEBUG(DEBUG_ERR, ("Starting CTDBD (Version %s) as PID: %u\n",
1239 			  ctdb_version_string, ctdb->ctdbd_pid));
1240 	ctdb_create_pidfile(ctdb);
1242 	/* create a unix domain stream socket to listen to */
1243 	res = ux_socket_bind(ctdb);
1245 		DEBUG(DEBUG_ALERT,("Cannot continue. Exiting!\n"));
1249 	/* Make sure we log something when the daemon terminates.
1250 	 * This must be the first exit handler to run (so the last to
	 * be registered — atexit handlers run in reverse order). */
1253 	__ctdbd_pid = getpid();
1254 	atexit(print_exit_message);
1256 	if (ctdb->do_setsched) {
1257 		/* try to set us up as realtime */
1258 		if (!set_scheduler()) {
1261 		DEBUG(DEBUG_NOTICE, ("Set real-time scheduler priority\n"));
	/* Event context is the heart of the daemon; nesting is allowed for
	 * legacy code paths and tracing is hooked for watchdog diagnostics */
1264 	ctdb->ev = tevent_context_init(NULL);
1265 	if (ctdb->ev == NULL) {
1266 		DEBUG(DEBUG_ALERT,("tevent_context_init() failed\n"));
1269 	tevent_loop_allow_nesting(ctdb->ev);
1270 	ctdb_tevent_trace_init();
1271 	tevent_set_trace_callback(ctdb->ev, ctdb_tevent_trace, ctdb);
1273 	/* set up a handler to pick up sigchld */
1274 	if (ctdb_init_sigchld(ctdb) == NULL) {
1275 		DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD\n"));
1280 	ctdb_set_child_logging(ctdb);
	/* (Re)create the srvid contexts used for message dispatch and tunnels */
1283 	TALLOC_FREE(ctdb->srv);
1284 	if (srvid_init(ctdb, &ctdb->srv) != 0) {
1285 		DEBUG(DEBUG_CRIT,("Failed to setup message srvid context\n"));
1289 	TALLOC_FREE(ctdb->tunnels);
1290 	if (srvid_init(ctdb, &ctdb->tunnels) != 0) {
1291 		DEBUG(DEBUG_ERR, ("Failed to setup tunnels context\n"));
1295 	/* initialize statistics collection */
1296 	ctdb_statistics_init(ctdb);
1298 	/* force initial recovery for election */
1299 	ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
1301 	if (ctdb_start_eventd(ctdb) != 0) {
1302 		DEBUG(DEBUG_ERR, ("Failed to start event daemon\n"));
	/* Run the "init" event script synchronously before anything else */
1306 	ctdb_set_runstate(ctdb, CTDB_RUNSTATE_INIT);
1307 	ret = ctdb_event_script(ctdb, CTDB_EVENT_INIT);
1309 		ctdb_die(ctdb, "Failed to run init event\n");
1311 	ctdb_run_notification_script(ctdb, "init");
	/* Select the inter-node transport; "ib" is only available when built
	 * with infiniband support */
1313 	if (strcmp(ctdb->transport, "tcp") == 0) {
1314 		ret = ctdb_tcp_init(ctdb);
1316 #ifdef USE_INFINIBAND
1317 	if (strcmp(ctdb->transport, "ib") == 0) {
1318 		ret = ctdb_ibw_init(ctdb);
1322 		DEBUG(DEBUG_ERR,("Failed to initialise transport '%s'\n", ctdb->transport));
1326 	if (ctdb->methods == NULL) {
1327 		DEBUG(DEBUG_ALERT,(__location__ " Can not initialize transport. ctdb->methods is NULL\n"));
1328 		ctdb_fatal(ctdb, "transport is unavailable. can not initialize.");
1331 	/* Initialise the transport.  This sets the node address if it
1332 	 * was not set via the command-line. */
1333 	if (ctdb->methods->initialise(ctdb) != 0) {
1334 		ctdb_fatal(ctdb, "transport failed to initialise");
	/* Address is known now, so we can work out our own PNN and flags */
1337 	ctdb_set_my_pnn(ctdb);
1339 	initialise_node_flags(ctdb);
1341 	if (ctdb->public_addresses_file) {
1342 		ret = ctdb_set_public_addresses(ctdb, true);
1344 			DEBUG(DEBUG_ALERT,("Unable to setup public address list\n"));
1349 	ctdb_initialise_vnn_map(ctdb);
1351 	/* attach to existing databases */
1352 	if (ctdb_attach_databases(ctdb) != 0) {
1353 		ctdb_fatal(ctdb, "Failed to attach to databases\n");
1356 	/* start frozen, then let the first election sort things out */
1357 	if (!ctdb_blocking_freeze(ctdb)) {
1358 		ctdb_fatal(ctdb, "Failed to get initial freeze\n");
1361 	/* now start accepting clients, only can do this once frozen */
1362 	fde = tevent_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd, TEVENT_FD_READ,
1363 			    ctdb_accept_client, ctdb);
1365 		ctdb_fatal(ctdb, "Failed to add daemon socket to event loop");
1367 	tevent_fd_set_auto_close(fde);
1369 	/* Start the transport */
1370 	if (ctdb->methods->start(ctdb) != 0) {
1371 		DEBUG(DEBUG_ALERT,("transport failed to start!\n"));
1372 		ctdb_fatal(ctdb, "transport failed to start");
1375 	/* Recovery daemon and timed events are started from the
1376 	 * callback, only after the setup event completes
	 */
1379 	ctdb_set_runstate(ctdb, CTDB_RUNSTATE_SETUP);
1380 	ret = ctdb_event_script_callback(ctdb,
1382 					 ctdb_setup_event_callback,
1388 		DEBUG(DEBUG_CRIT,("Failed to set up 'setup' event\n"));
	/* Pin memory (when requested) to avoid page faults in realtime mode */
1392 	lockdown_memory(ctdb->valgrinding);
1394 	/* go into a wait loop to allow other nodes to complete */
1395 	tevent_loop_wait(ctdb->ev);
1397 	DEBUG(DEBUG_CRIT,("event_loop_wait() returned. this should not happen\n"));
1402   allocate a packet for use in daemon<->daemon communication
/* Allocate and zero a transport packet of at least slength bytes, rounded
 * up to CTDB_DS_ALIGNMENT, and fill in the common header fields (magic,
 * version, generation, source node).  Returns NULL if the transport is
 * down or allocation fails. */
1404 struct ctdb_req_header *_ctdb_transport_allocate(struct ctdb_context *ctdb,
1405 						 TALLOC_CTX *mem_ctx,
1406 						 enum ctdb_operation operation,
1407 						 size_t length, size_t slength,
1411 	struct ctdb_req_header *hdr;
	/* Wire length is the larger of requested and struct length,
	 * then rounded up to the transport alignment */
1413 	length = MAX(length, slength);
1414 	size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
1416 	if (ctdb->methods == NULL) {
1417 		DEBUG(DEBUG_INFO,(__location__ " Unable to allocate transport packet for operation %u of length %u. Transport is DOWN.\n",
1418 			 operation, (unsigned)length));
	/* Allocation is delegated to the transport so it can use
	 * transport-specific (e.g. registered) memory */
1422 	hdr = (struct ctdb_req_header *)ctdb->methods->allocate_pkt(mem_ctx, size);
1424 		DEBUG(DEBUG_ERR,("Unable to allocate transport packet for operation %u of length %u\n",
1425 			 operation, (unsigned)length));
1428 	talloc_set_name_const(hdr, type);
	/* Only the structured part is zeroed; trailing payload is caller-filled */
1429 	memset(hdr, 0, slength);
1430 	hdr->length       = length;
1431 	hdr->operation    = operation;
1432 	hdr->ctdb_magic   = CTDB_MAGIC;
1433 	hdr->ctdb_version = CTDB_PROTOCOL;
1434 	hdr->generation   = ctdb->vnn_map->generation;
1435 	hdr->srcnode      = ctdb->pnn;
/* Tracks one in-flight control request forwarded on behalf of a local
 * client; linked into node->pending_controls so it can be failed if the
 * destination node disconnects. */
1440 struct daemon_control_state {
1441 	struct daemon_control_state *next, *prev;
1442 	struct ctdb_client *client;
1443 	struct ctdb_req_control_old *c;
1445 	struct ctdb_node *node;
1449   callback when a control reply comes in
/* Package the control's status/data/error into a CTDB_REPLY_CONTROL
 * packet and queue it back to the originating local client.  The error
 * string, when present, is appended after the data payload. */
1451 static void daemon_control_callback(struct ctdb_context *ctdb,
1452 				    int32_t status, TDB_DATA data,
1453 				    const char *errormsg,
1456 	struct daemon_control_state *state = talloc_get_type(private_data,
1457 							     struct daemon_control_state);
1458 	struct ctdb_client *client = state->client;
1459 	struct ctdb_reply_control_old *r;
1463 	/* construct a message to send to the client containing the data */
1464 	len = offsetof(struct ctdb_reply_control_old, data) + data.dsize;
1466 		len += strlen(errormsg);
1468 	r = ctdbd_allocate_pkt(ctdb, state, CTDB_REPLY_CONTROL, len,
1469 			       struct ctdb_reply_control_old);
1470 	CTDB_NO_MEMORY_VOID(ctdb, r);
1472 	r->hdr.reqid     = state->reqid;
1474 	r->datalen       = data.dsize;
1476 	memcpy(&r->data[0], data.dptr, data.dsize);
	/* Error text (when any) goes directly after the data bytes */
1478 		r->errorlen = strlen(errormsg);
1479 		memcpy(&r->data[r->datalen], errormsg, r->errorlen);
1482 	ret = daemon_queue_send(client, &r->hdr);
1489   fail all pending controls to a disconnected node
/* Called when a node drops: complete every queued control for that node
 * with an error status so waiting clients are unblocked. */
1491 void ctdb_daemon_cancel_controls(struct ctdb_context *ctdb, struct ctdb_node *node)
1493 	struct daemon_control_state *state;
1494 	while ((state = node->pending_controls)) {
1495 		DLIST_REMOVE(node->pending_controls, state);
		/* (uint32_t)-1 acts as the generic failure status here */
1496 		daemon_control_callback(ctdb, (uint32_t)-1, tdb_null,
1497 					"node is disconnected", state);
1502   destroy a daemon_control_state
/* talloc destructor: unlink the state from the node's pending list so a
 * freed request cannot be failed/completed later. */
1504 static int daemon_control_destructor(struct daemon_control_state *state)
1507 		DLIST_REMOVE(state->node->pending_controls, state);
1513   this is called when the ctdb daemon received a ctdb request control
1514   from a local client over the unix domain socket
/* Forward a client's control to the destination node (possibly ourselves),
 * tracking it in daemon_control_state so the reply — or a node disconnect —
 * can be routed back to the client. */
1516 static void daemon_request_control_from_client(struct ctdb_client *client,
1517 					       struct ctdb_req_control_old *c)
1521 	struct daemon_control_state *state;
1522 	TALLOC_CTX *tmp_ctx = talloc_new(client);
	/* Resolve the "current node" pseudo-destination to our real PNN */
1524 	if (c->hdr.destnode == CTDB_CURRENT_NODE) {
1525 		c->hdr.destnode = client->ctdb->pnn;
1528 	state = talloc(client, struct daemon_control_state);
1529 	CTDB_NO_MEMORY_VOID(client->ctdb, state);
1531 	state->client = client;
1532 	state->c = talloc_steal(state, c);
1533 	state->reqid = c->hdr.reqid;
	/* Only track against a node when the destination PNN is valid */
1534 	if (ctdb_validate_pnn(client->ctdb, c->hdr.destnode)) {
1535 		state->node = client->ctdb->nodes[c->hdr.destnode];
1536 		DLIST_ADD(state->node->pending_controls, state);
1541 	talloc_set_destructor(state, daemon_control_destructor);
	/* NOREPLY controls: hand state to tmp_ctx so it is freed on exit
	 * rather than waiting for a reply that will never be matched */
1543 	if (c->flags & CTDB_CTRL_FLAG_NOREPLY) {
1544 		talloc_steal(tmp_ctx, state);
1547 	data.dptr = &c->data[0];
1548 	data.dsize = c->datalen;
1549 	res = ctdb_daemon_send_control(client->ctdb, c->hdr.destnode,
1550 				       c->srvid, c->opcode, client->client_id,
1552 				       data, daemon_control_callback,
1555 		DEBUG(DEBUG_ERR,(__location__ " Failed to send control to remote node %u\n",
1559 	talloc_free(tmp_ctx);
1563   register a call function
/* Register a call handler (fn, id) for the database identified by db_id.
 * Fails if the database is not attached. */
1565 int ctdb_daemon_set_call(struct ctdb_context *ctdb, uint32_t db_id,
1566 			 ctdb_fn_t fn, int id)
1568 	struct ctdb_registered_call *call;
1569 	struct ctdb_db_context *ctdb_db;
1571 	ctdb_db = find_ctdb_db(ctdb, db_id);
1572 	if (ctdb_db == NULL) {
1576 	call = talloc(ctdb_db, struct ctdb_registered_call);
	/* New calls are simply prepended to the database's call list */
1580 	DLIST_ADD(ctdb_db->calls, call);
1587   this local messaging handler is ugly, but is needed to prevent
1588   recursion in ctdb_send_message() when the destination node is the
1589   same as the source node
/* Holds a self-addressed message until a zero-timeout tevent timer
 * delivers it, breaking the recursion described above. */
1591 struct ctdb_local_message {
1592 	struct ctdb_context *ctdb;
/* Timer callback: dispatch the deferred local message to all matching
 * srvid handlers, then (presumably) free the holder — tail elided here. */
1597 static void ctdb_local_message_trigger(struct tevent_context *ev,
1598 				       struct tevent_timer *te,
1599 				       struct timeval t, void *private_data)
1601 	struct ctdb_local_message *m = talloc_get_type(
1602 		private_data, struct ctdb_local_message);
1604 	srvid_dispatch(m->ctdb->srv, m->srvid, CTDB_SRVID_ALL, m->data);
/* Deliver a message destined for this node: copy the payload and schedule
 * a zero-timeout timer event so dispatch happens from the event loop
 * instead of recursively from the sender's stack. */
1608 static int ctdb_local_message(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data)
1610 	struct ctdb_local_message *m;
1611 	m = talloc(ctdb, struct ctdb_local_message);
1612 	CTDB_NO_MEMORY(ctdb, m);
	/* Duplicate the caller's buffer — it may be freed before the timer fires */
1617 	m->data.dptr = talloc_memdup(m, m->data.dptr, m->data.dsize);
1618 	if (m->data.dptr == NULL) {
1623 	/* this needs to be done as an event to prevent recursion */
1624 	tevent_add_timer(ctdb->ev, m, timeval_zero(),
1625 			 ctdb_local_message_trigger, m);
/* Send a srvid message to node pnn.  Messages to ourselves are delivered
 * locally via ctdb_local_message(); remote messages are packed into a
 * CTDB_REQ_MESSAGE packet and queued on the transport. */
1632 int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t pnn,
1633 			     uint64_t srvid, TDB_DATA data)
1635 	struct ctdb_req_message_old *r;
1638 	if (ctdb->methods == NULL) {
1639 		DEBUG(DEBUG_INFO,(__location__ " Failed to send message. Transport is DOWN\n"));
1643 	/* see if this is a message to ourselves */
1644 	if (pnn == ctdb->pnn) {
1645 		return ctdb_local_message(ctdb, srvid, data);
1648 	len = offsetof(struct ctdb_req_message_old, data) + data.dsize;
1649 	r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_MESSAGE, len,
1650 				    struct ctdb_req_message_old);
1651 	CTDB_NO_MEMORY(ctdb, r);
1653 	r->hdr.destnode  = pnn;
1655 	r->datalen       = data.dsize;
1656 	memcpy(&r->data[0], data.dptr, data.dsize);
1658 	ctdb_queue_packet(ctdb, &r->hdr);
/* Per-client list entry for a registered "death notification": when the
 * client goes away, its destructor broadcasts the stored message. */
1666 struct ctdb_client_notify_list {
1667 	struct ctdb_client_notify_list *next, *prev;
1668 	struct ctdb_context *ctdb;
/* talloc destructor: broadcast the registered notification message to all
 * connected nodes when the owning client is destroyed. */
1674 static int ctdb_client_notify_destructor(struct ctdb_client_notify_list *nl)
1678 	DEBUG(DEBUG_ERR,("Sending client notify message for srvid:%llu\n", (unsigned long long)nl->srvid));
1680 	ret = ctdb_daemon_send_message(nl->ctdb, CTDB_BROADCAST_CONNECTED, (unsigned long long)nl->srvid, nl->data);
1682 		DEBUG(DEBUG_ERR,("Failed to send client notify message\n"));
/* Control handler: register a death-notification message for the local
 * client identified by client_id.  Validates the input size, rejects
 * duplicates per srvid, and arms delivery via a talloc destructor.
 * Only meaningful on the node the client is connected to. */
1688 int32_t ctdb_control_register_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1690 	struct ctdb_notify_data_old *notify = (struct ctdb_notify_data_old *)indata.dptr;
1691 	struct ctdb_client *client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1692 	struct ctdb_client_notify_list *nl;
1694 	DEBUG(DEBUG_INFO,("Register srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
	/* Size sanity: must at least cover the fixed header ... */
1696 	if (indata.dsize < offsetof(struct ctdb_notify_data_old, notify_data)) {
1697 		DEBUG(DEBUG_ERR,(__location__ " Too little data in control : %d\n", (int)indata.dsize));
	/* ... and exactly match header + declared payload length */
1701 	if (indata.dsize != (notify->len + offsetof(struct ctdb_notify_data_old, notify_data))) {
1702 		DEBUG(DEBUG_ERR,(__location__ " Wrong amount of data in control. Got %d, expected %d\n", (int)indata.dsize, (int)(notify->len + offsetof(struct ctdb_notify_data_old, notify_data))));
1707 	if (client == NULL) {
1708 		DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
	/* Reject a second registration for the same srvid by this client */
1712 	for(nl=client->notify; nl; nl=nl->next) {
1713 		if (nl->srvid == notify->srvid) {
1718 		DEBUG(DEBUG_ERR,(__location__ " Notification for srvid:%llu already exists for this client\n", (unsigned long long)notify->srvid));
1722 	nl = talloc(client, struct ctdb_client_notify_list);
1723 	CTDB_NO_MEMORY(ctdb, nl);
1725 	nl->srvid = notify->srvid;
1726 	nl->data.dsize = notify->len;
1727 	nl->data.dptr = talloc_memdup(nl, notify->notify_data,
1729 	CTDB_NO_MEMORY(ctdb, nl->data.dptr);
	/* Destructor fires when the client (parent talloc context) dies */
1731 	DLIST_ADD(client->notify, nl);
1732 	talloc_set_destructor(nl, ctdb_client_notify_destructor);
/* Control handler: remove a previously registered death notification for
 * this client.  The destructor is cleared before removal so the message
 * is NOT sent when the entry is freed. */
1737 int32_t ctdb_control_deregister_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1739 	uint64_t srvid = *(uint64_t *)indata.dptr;
1740 	struct ctdb_client *client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1741 	struct ctdb_client_notify_list *nl;
1743 	DEBUG(DEBUG_INFO,("Deregister srvid %llu for client %d\n", (unsigned long long)srvid, client_id));
1745 	if (client == NULL) {
1746 		DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1750 	for(nl=client->notify; nl; nl=nl->next) {
1751 		if (nl->srvid == srvid) {
1756 		DEBUG(DEBUG_ERR,(__location__ " No notification for srvid:%llu found for this client\n", (unsigned long long)srvid));
	/* Clear the destructor first: deregistration must not broadcast */
1760 	DLIST_REMOVE(client->notify, nl);
1761 	talloc_set_destructor(nl, NULL);
/* Linear search of the registered client-pid list; returns the client
 * owning the given pid, or (presumably) NULL when not found — tail elided. */
1767 struct ctdb_client *ctdb_find_client_by_pid(struct ctdb_context *ctdb, pid_t pid)
1769 	struct ctdb_client_pid_list *client_pid;
1771 	for (client_pid = ctdb->client_pids; client_pid; client_pid=client_pid->next) {
1772 		if (client_pid->pid == pid) {
1773 			return client_pid->client;
1780 /* This control is used by samba when probing if a process (of a samba daemon)
1782    Samba does this when it needs/wants to check if a subrecord in one of the
1783    databases is still valid, or if it is stale and can be removed.
1784    If the node is in unhealthy or stopped state we just kill of the samba
1785    process holding this sub-record and return to the calling samba that
1786    the process does not exist.
1787    This allows us to forcefully recall subrecords registered by samba processes
1788    on banned and stopped nodes.
1790 int32_t ctdb_control_process_exists(struct ctdb_context *ctdb, pid_t pid)
1792         struct ctdb_client *client;
1794 	client = ctdb_find_client_by_pid(ctdb, pid);
1795 	if (client == NULL) {
	/* On an inactive (banned/stopped) node, kill the client instead of
	 * reporting it alive, forcing its subrecords to be recalled */
1799 	if (ctdb->nodes[ctdb->pnn]->flags & NODE_FLAGS_INACTIVE) {
1801 		      ("Killing client with pid:%d on banned/stopped node\n",
1803 		talloc_free(client);
	/* kill(pid, 0) probes existence without sending a signal:
	 * 0 = alive, -1 = gone (or not signalable) */
1807 	return kill(pid, 0);
/* Control handler: check that the given pid belongs to a registered local
 * client AND that the client has the given srvid registered.  Input is a
 * packed pid_t followed by a uint64_t srvid. */
1810 int32_t ctdb_control_check_pid_srvid(struct ctdb_context *ctdb,
1813 	struct ctdb_client_pid_list *client_pid;
1818 	pid = *(pid_t *)indata.dptr;
1819 	srvid = *(uint64_t *)(indata.dptr + sizeof(pid_t));
1821 	for (client_pid = ctdb->client_pids;
1823 	     client_pid = client_pid->next) {
1824 		if (client_pid->pid == pid) {
1825 			ret = srvid_exists(ctdb->srv, srvid,
1826 					   client_pid->client);
/* Control handler: re-read the nodes file from disk and return its parsed
 * node map in outdata.  The returned buffer is talloc-allocated, so its
 * size is recovered with talloc_get_size(). */
1836 int ctdb_control_getnodesfile(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
1838 	struct ctdb_node_map_old *node_map = NULL;
1840 	CHECK_CONTROL_DATA_SIZE(0);
1842 	node_map = ctdb_read_nodes_file(ctdb, ctdb->nodes_file);
1843 	if (node_map == NULL) {
1844 		DEBUG(DEBUG_ERR, ("Failed to read nodes file\n"));
1848 	outdata->dptr  = (unsigned char *)node_map;
1849 	outdata->dsize = talloc_get_size(outdata->dptr);
/* Orderly daemon shutdown: stop subsystems, release public IPs, run the
 * "shutdown" event script, shut down the transport and exit.  Guarded
 * against re-entry via the SHUTDOWN runstate. */
1854 void ctdb_shutdown_sequence(struct ctdb_context *ctdb, int exit_code)
1856 	if (ctdb->runstate == CTDB_RUNSTATE_SHUTDOWN) {
1857 		DEBUG(DEBUG_NOTICE,("Already shutting down so will not proceed.\n"));
1861 	DEBUG(DEBUG_ERR,("Shutdown sequence commencing.\n"));
1862 	ctdb_set_runstate(ctdb, CTDB_RUNSTATE_SHUTDOWN);
	/* Order matters: stop recovery and monitoring before dropping IPs,
	 * run the shutdown event before stopping eventd itself */
1863 	ctdb_stop_recoverd(ctdb);
1864 	ctdb_stop_keepalive(ctdb);
1865 	ctdb_stop_monitoring(ctdb);
1866 	ctdb_release_all_ips(ctdb);
1867 	ctdb_event_script(ctdb, CTDB_EVENT_SHUTDOWN);
1868 	ctdb_stop_eventd(ctdb);
1869 	if (ctdb->methods != NULL && ctdb->methods->shutdown != NULL) {
1870 		ctdb->methods->shutdown(ctdb);
1873 	DEBUG(DEBUG_ERR,("Shutdown sequence complete, exiting.\n"));
1877 /* When forking the main daemon and the child process needs to connect
1878 * back to the daemon as a client process, this function can be used
1879 * to change the ctdb context from daemon into client mode. The child
1880 * process must be created using ctdb_fork() and not fork() -
1881 * ctdb_fork() does some necessary housekeeping.
1883 int switch_from_server_to_client(struct ctdb_context *ctdb)
1887 /* get a new event context */
1888 ctdb->ev = tevent_context_init(ctdb);
1889 if (ctdb->ev == NULL) {
1890 DEBUG(DEBUG_ALERT,("tevent_context_init() failed\n"));
1893 tevent_loop_allow_nesting(ctdb->ev);
1895 /* Connect to main CTDB daemon */
1896 ret = ctdb_socket_connect(ctdb);
1898 DEBUG(DEBUG_ALERT, (__location__ " Failed to init ctdb client\n"));
1902 ctdb->can_send_controls = true;