during shutdown there is a window after we have stopped TCP and disconnected from...
[metze/ctdb/wip.git] / server / ctdb_daemon.c
index c72ef83fead6bedf93b9069ca76f844119603445..5eca7275c685dd5cdf3524e3abf1a3d00a7ba6d1 100644 (file)
 #include "includes.h"
 #include "db_wrap.h"
 #include "lib/tdb/include/tdb.h"
-#include "lib/events/events.h"
+#include "lib/tevent/tevent.h"
 #include "lib/util/dlinklist.h"
 #include "system/network.h"
 #include "system/filesys.h"
 #include "system/wait.h"
-#include "../include/ctdb.h"
+#include "../include/ctdb_client.h"
 #include "../include/ctdb_private.h"
 #include <sys/socket.h>
 
@@ -97,11 +97,12 @@ static void block_signal(int signum)
  */
 static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header *hdr)
 {
-       client->ctdb->statistics.client_packets_sent++;
+       CTDB_INCREMENT_STAT(client->ctdb, client_packets_sent);
        if (hdr->operation == CTDB_REQ_MESSAGE) {
                if (ctdb_queue_length(client->queue) > client->ctdb->tunable.max_queue_depth_drop_msg) {
-                       DEBUG(DEBUG_ERR,("Drop CTDB_REQ_MESSAGE to client. Queue full.\n"));
-                       return 0;
+                       DEBUG(DEBUG_ERR,("CTDB_REQ_MESSAGE queue full - killing client connection.\n"));
+                       talloc_free(client);
+                       return -1;
                }
        }
        return ctdb_queue_send(client->queue, (uint8_t *)hdr, hdr->length);
@@ -134,7 +135,6 @@ static void daemon_message_handler(struct ctdb_context *ctdb, uint64_t srvid,
 
        talloc_free(r);
 }
-                                          
 
 /*
   this is called when the ctdb daemon received a ctdb request to 
@@ -184,9 +184,7 @@ static int ctdb_client_destructor(struct ctdb_client *client)
 
        ctdb_takeover_client_destructor_hook(client);
        ctdb_reqid_remove(client->ctdb, client->client_id);
-       if (client->ctdb->statistics.num_clients) {
-               client->ctdb->statistics.num_clients--;
-       }
+       CTDB_DECREMENT_STAT(client->ctdb, num_clients);
 
        if (client->num_persistent_updates != 0) {
                DEBUG(DEBUG_ERR,(__location__ " Client disconnecting with %u persistent updates in flight. Starting recovery\n", client->num_persistent_updates));
@@ -258,10 +256,9 @@ static void daemon_call_from_client_callback(struct ctdb_call_state *state)
        res = ctdb_daemon_call_recv(state, dstate->call);
        if (res != 0) {
                DEBUG(DEBUG_ERR, (__location__ " ctdbd_call_recv() returned error\n"));
-               if (client->ctdb->statistics.pending_calls > 0) {
-                       client->ctdb->statistics.pending_calls--;
-               }
-               ctdb_latency(ctdb_db, "call_from_client_cb 1", &client->ctdb->statistics.max_call_latency, dstate->start_time);
+               CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
+
+               CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 1", call_latency, dstate->start_time);
                return;
        }
 
@@ -270,10 +267,8 @@ static void daemon_call_from_client_callback(struct ctdb_call_state *state)
                               length, struct ctdb_reply_call);
        if (r == NULL) {
                DEBUG(DEBUG_ERR, (__location__ " Failed to allocate reply_call in ctdb daemon\n"));
-               if (client->ctdb->statistics.pending_calls > 0) {
-                       client->ctdb->statistics.pending_calls--;
-               }
-               ctdb_latency(ctdb_db, "call_from_client_cb 2", &client->ctdb->statistics.max_call_latency, dstate->start_time);
+               CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
+               CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 2", call_latency, dstate->start_time);
                return;
        }
        r->hdr.reqid        = dstate->reqid;
@@ -281,14 +276,16 @@ static void daemon_call_from_client_callback(struct ctdb_call_state *state)
        memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen);
 
        res = daemon_queue_send(client, &r->hdr);
+       if (res == -1) {
+               /* client is dead - return immediately */
+               return;
+       }
        if (res != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Failed to queue packet from daemon to client\n"));
        }
-       ctdb_latency(ctdb_db, "call_from_client_cb 3", &client->ctdb->statistics.max_call_latency, dstate->start_time);
+       CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 3", call_latency, dstate->start_time);
+       CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
        talloc_free(dstate);
-       if (client->ctdb->statistics.pending_calls > 0) {
-               client->ctdb->statistics.pending_calls--;
-       }
 }
 
 struct ctdb_daemon_packet_wrap {
@@ -340,21 +337,27 @@ static void daemon_request_call_from_client(struct ctdb_client *client,
        struct ctdb_context *ctdb = client->ctdb;
        struct ctdb_daemon_packet_wrap *w;
 
-       ctdb->statistics.total_calls++;
-       if (client->ctdb->statistics.pending_calls > 0) {
-               ctdb->statistics.pending_calls++;
-       }
+       CTDB_INCREMENT_STAT(ctdb, total_calls);
+       CTDB_DECREMENT_STAT(ctdb, pending_calls);
 
        ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
        if (!ctdb_db) {
                DEBUG(DEBUG_ERR, (__location__ " Unknown database in request. db_id==0x%08x",
                          c->db_id));
-               if (client->ctdb->statistics.pending_calls > 0) {
-                       ctdb->statistics.pending_calls--;
-               }
+               CTDB_DECREMENT_STAT(ctdb, pending_calls);
                return;
        }
 
+       if (ctdb_db->unhealthy_reason) {
+               /*
+                * this is just a warning, as the tdb should be empty anyway,
+                * and only persistent databases can be unhealthy, which don't
+                * use this code path
+                */
+               DEBUG(DEBUG_WARNING,("warn: db(%s) unhealty in daemon_request_call_from_client(): %s\n",
+                                    ctdb_db->db_name, ctdb_db->unhealthy_reason));
+       }
+
        key.dptr = c->data;
        key.dsize = c->keylen;
 
@@ -369,9 +372,7 @@ static void daemon_request_call_from_client(struct ctdb_client *client,
                                           daemon_incoming_packet_wrap, w, True);
        if (ret == -2) {
                /* will retry later */
-               if (client->ctdb->statistics.pending_calls > 0) {
-                       ctdb->statistics.pending_calls--;
-               }
+               CTDB_DECREMENT_STAT(ctdb, pending_calls);
                return;
        }
 
@@ -379,19 +380,19 @@ static void daemon_request_call_from_client(struct ctdb_client *client,
 
        if (ret != 0) {
                DEBUG(DEBUG_ERR,(__location__ " Unable to fetch record\n"));
-               if (client->ctdb->statistics.pending_calls > 0) {
-                       ctdb->statistics.pending_calls--;
-               }
+               CTDB_DECREMENT_STAT(ctdb, pending_calls);
                return;
        }
 
        dstate = talloc(client, struct daemon_call_state);
        if (dstate == NULL) {
-               ctdb_ltdb_unlock(ctdb_db, key);
-               DEBUG(DEBUG_ERR,(__location__ " Unable to allocate dstate\n"));
-               if (client->ctdb->statistics.pending_calls > 0) {
-                       ctdb->statistics.pending_calls--;
+               ret = ctdb_ltdb_unlock(ctdb_db, key);
+               if (ret != 0) {
+                       DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
                }
+
+               DEBUG(DEBUG_ERR,(__location__ " Unable to allocate dstate\n"));
+               CTDB_DECREMENT_STAT(ctdb, pending_calls);
                return;
        }
        dstate->start_time = timeval_current();
@@ -401,12 +402,14 @@ static void daemon_request_call_from_client(struct ctdb_client *client,
 
        call = dstate->call = talloc_zero(dstate, struct ctdb_call);
        if (call == NULL) {
-               ctdb_ltdb_unlock(ctdb_db, key);
-               DEBUG(DEBUG_ERR,(__location__ " Unable to allocate call\n"));
-               if (client->ctdb->statistics.pending_calls > 0) {
-                       ctdb->statistics.pending_calls--;
+               ret = ctdb_ltdb_unlock(ctdb_db, key);
+               if (ret != 0) {
+                       DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
                }
-               ctdb_latency(ctdb_db, "call_from_client 1", &ctdb->statistics.max_call_latency, dstate->start_time);
+
+               DEBUG(DEBUG_ERR,(__location__ " Unable to allocate call\n"));
+               CTDB_DECREMENT_STAT(ctdb, pending_calls);
+               CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 1", call_latency, dstate->start_time);
                return;
        }
 
@@ -422,14 +425,15 @@ static void daemon_request_call_from_client(struct ctdb_client *client,
                state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
        }
 
-       ctdb_ltdb_unlock(ctdb_db, key);
+       ret = ctdb_ltdb_unlock(ctdb_db, key);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
+       }
 
        if (state == NULL) {
                DEBUG(DEBUG_ERR,(__location__ " Unable to setup call send\n"));
-               if (client->ctdb->statistics.pending_calls > 0) {
-                       ctdb->statistics.pending_calls--;
-               }
-               ctdb_latency(ctdb_db, "call_from_client 2", &ctdb->statistics.max_call_latency, dstate->start_time);
+               CTDB_DECREMENT_STAT(ctdb, pending_calls);
+               CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 2", call_latency, dstate->start_time);
                return;
        }
        talloc_steal(state, dstate);
@@ -469,17 +473,17 @@ static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr)
 
        switch (hdr->operation) {
        case CTDB_REQ_CALL:
-               ctdb->statistics.client.req_call++;
+               CTDB_INCREMENT_STAT(ctdb, client.req_call);
                daemon_request_call_from_client(client, (struct ctdb_req_call *)hdr);
                break;
 
        case CTDB_REQ_MESSAGE:
-               ctdb->statistics.client.req_message++;
+               CTDB_INCREMENT_STAT(ctdb, client.req_message);
                daemon_request_message_from_client(client, (struct ctdb_req_message *)hdr);
                break;
 
        case CTDB_REQ_CONTROL:
-               ctdb->statistics.client.req_control++;
+               CTDB_INCREMENT_STAT(ctdb, client.req_control);
                daemon_request_control_from_client(client, (struct ctdb_req_control *)hdr);
                break;
 
@@ -505,7 +509,7 @@ static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
                return;
        }
 
-       client->ctdb->statistics.client_packets_recv++;
+       CTDB_INCREMENT_STAT(client->ctdb, client_packets_recv);
 
        if (cnt < sizeof(*hdr)) {
                ctdb_set_error(client->ctdb, "Bad packet length %u in daemon\n", 
@@ -583,7 +587,7 @@ static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde,
 #else
        if (getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &cr, &crl) == 0) {
 #endif
-               DEBUG(DEBUG_ERR,("Connected client with pid:%u\n", (unsigned)cr.pid));
+               DEBUG(DEBUG_INFO,("Connected client with pid:%u\n", (unsigned)cr.pid));
        }
 
        client->ctdb = ctdb;
@@ -605,11 +609,12 @@ static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde,
        DLIST_ADD(ctdb->client_pids, client_pid);
 
        client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT, 
-                                        ctdb_daemon_read_cb, client);
+                                        ctdb_daemon_read_cb, client,
+                                        "client-%u", client->pid);
 
        talloc_set_destructor(client, ctdb_client_destructor);
        talloc_set_destructor(client_pid, ctdb_clientpid_destructor);
-       ctdb->statistics.num_clients++;
+       CTDB_INCREMENT_STAT(ctdb, num_clients);
 }
 
 
@@ -680,6 +685,22 @@ static void sig_child_handler(struct event_context *ev,
        }
 }
 
+static void ctdb_setup_event_callback(struct ctdb_context *ctdb, int status,
+                                     void *private_data)
+{
+       if (status != 0) {
+               ctdb_fatal(ctdb, "Failed to run setup event\n");
+               return;
+       }
+       ctdb_run_notification_script(ctdb, "setup");
+
+       /* tell all other nodes we've just started up */
+       ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL,
+                                0, CTDB_CONTROL_STARTUP, 0,
+                                CTDB_CTRL_FLAG_NOREPLY,
+                                tdb_null, NULL, NULL);
+}
+
 /*
   start the protocol going as a daemon
 */
@@ -721,10 +742,7 @@ int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork, bool use_syslog)
 
        DEBUG(DEBUG_ERR, ("Starting CTDBD as pid : %u\n", ctdbd_pid));
 
-       if (ctdb->do_setsched) {
-               /* try to set us up as realtime */
-               ctdb_set_scheduler(ctdb);
-       }
+       ctdb_high_priority(ctdb);
 
        /* ensure the socket is deleted on exit of the daemon */
        domain_socket_name = talloc_strdup(talloc_autofree_context(), ctdb->daemon.name);
@@ -734,9 +752,18 @@ int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork, bool use_syslog)
        }
 
        ctdb->ev = event_context_init(NULL);
+       tevent_loop_allow_nesting(ctdb->ev);
+       ret = ctdb_init_tevent_logging(ctdb);
+       if (ret != 0) {
+               DEBUG(DEBUG_ALERT,("Failed to initialize TEVENT logging\n"));
+               exit(1);
+       }
 
        ctdb_set_child_logging(ctdb);
 
+       /* initialize statistics collection */
+       ctdb_statistics_init(ctdb);
+
        /* force initial recovery for election */
        ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
 
@@ -765,10 +792,16 @@ int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork, bool use_syslog)
                ctdb_fatal(ctdb, "transport failed to initialise");
        }
 
-       /* attach to any existing persistent databases */
-       if (ctdb_attach_persistent(ctdb) != 0) {
-               ctdb_fatal(ctdb, "Failed to attach to persistent databases\n");         
+       /* attach to existing databases */
+       if (ctdb_attach_databases(ctdb) != 0) {
+               ctdb_fatal(ctdb, "Failed to attach to databases\n");
+       }
+
+       ret = ctdb_event_script(ctdb, CTDB_EVENT_INIT);
+       if (ret != 0) {
+               ctdb_fatal(ctdb, "Failed to run init event\n");
        }
+       ctdb_run_notification_script(ctdb, "init");
 
        /* start frozen, then let the first election sort things out */
        if (ctdb_blocking_freeze(ctdb)) {
@@ -777,14 +810,9 @@ int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork, bool use_syslog)
 
        /* now start accepting clients, only can do this once frozen */
        fde = event_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd, 
-                          EVENT_FD_READ|EVENT_FD_AUTOCLOSE, 
+                          EVENT_FD_READ,
                           ctdb_accept_client, ctdb);
-
-       /* tell all other nodes we've just started up */
-       ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL,
-                                0, CTDB_CONTROL_STARTUP, 0,
-                                CTDB_CTRL_FLAG_NOREPLY,
-                                tdb_null, NULL, NULL);
+       tevent_fd_set_auto_close(fde);
 
        /* release any IPs we hold from previous runs of the daemon */
        ctdb_release_all_ips(ctdb);
@@ -802,6 +830,18 @@ int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork, bool use_syslog)
                exit(1);
        }
 
+       ret = ctdb_event_script_callback(ctdb,
+                                        ctdb,
+                                        ctdb_setup_event_callback,
+                                        ctdb,
+                                        false,
+                                        CTDB_EVENT_SETUP,
+                                        "");
+       if (ret != 0) {
+               DEBUG(DEBUG_CRIT,("Failed to set up 'setup' event\n"));
+               exit(1);
+       }
+
        if (use_syslog) {
                if (start_syslog_daemon(ctdb)) {
                        DEBUG(DEBUG_CRIT, ("Failed to start syslog daemon\n"));
@@ -809,6 +849,7 @@ int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork, bool use_syslog)
                }
        }
 
+       ctdb_lockdown_memory(ctdb);
          
        /* go into a wait loop to allow other nodes to complete */
        event_loop_wait(ctdb->ev);
@@ -833,7 +874,7 @@ struct ctdb_req_header *_ctdb_transport_allocate(struct ctdb_context *ctdb,
        size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
 
        if (ctdb->methods == NULL) {
-               DEBUG(DEBUG_ERR,(__location__ " Unable to allocate transport packet for operation %u of length %u. Transport is DOWN.\n",
+               DEBUG(DEBUG_INFO,(__location__ " Unable to allocate transport packet for operation %u of length %u. Transport is DOWN.\n",
                         operation, (unsigned)length));
                return NULL;
        }
@@ -877,6 +918,7 @@ static void daemon_control_callback(struct ctdb_context *ctdb,
        struct ctdb_client *client = state->client;
        struct ctdb_reply_control *r;
        size_t len;
+       int ret;
 
        /* construct a message to send to the client containing the data */
        len = offsetof(struct ctdb_reply_control, data) + data.dsize;
@@ -897,9 +939,10 @@ static void daemon_control_callback(struct ctdb_context *ctdb,
                memcpy(&r->data[r->datalen], errormsg, r->errorlen);
        }
 
-       daemon_queue_send(client, &r->hdr);
-
-       talloc_free(state);
+       ret = daemon_queue_send(client, &r->hdr);
+       if (ret != -1) {
+               talloc_free(state);
+       }
 }
 
 /*
@@ -1056,7 +1099,7 @@ int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t pnn,
        int len;
 
        if (ctdb->methods == NULL) {
-               DEBUG(DEBUG_ERR,(__location__ " Failed to send message. Transport is DOWN\n"));
+               DEBUG(DEBUG_INFO,(__location__ " Failed to send message. Transport is DOWN\n"));
                return -1;
        }
 
@@ -1111,7 +1154,7 @@ int32_t ctdb_control_register_notify(struct ctdb_context *ctdb, uint32_t client_
         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client); 
        struct ctdb_client_notify_list *nl;
 
-       DEBUG(DEBUG_ERR,("Register srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
+       DEBUG(DEBUG_INFO,("Register srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
 
        if (indata.dsize < offsetof(struct ctdb_client_notify_register, notify_data)) {
                DEBUG(DEBUG_ERR,(__location__ " Too little data in control : %d\n", (int)indata.dsize));
@@ -1160,7 +1203,7 @@ int32_t ctdb_control_deregister_notify(struct ctdb_context *ctdb, uint32_t clien
         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client); 
        struct ctdb_client_notify_list *nl;
 
-       DEBUG(DEBUG_ERR,("Deregister srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
+       DEBUG(DEBUG_INFO,("Deregister srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
 
         if (client == NULL) {
                 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
@@ -1184,3 +1227,41 @@ int32_t ctdb_control_deregister_notify(struct ctdb_context *ctdb, uint32_t clien
        return 0;
 }
 
+struct ctdb_client *ctdb_find_client_by_pid(struct ctdb_context *ctdb, pid_t pid)
+{
+       struct ctdb_client_pid_list *client_pid;
+
+       for (client_pid = ctdb->client_pids; client_pid; client_pid=client_pid->next) {
+               if (client_pid->pid == pid) {
+                       return client_pid->client;
+               }
+       }
+       return NULL;
+}
+
+
+/* This control is used by samba when probing if a process (of a samba daemon)
+   exists on the node.
+   Samba does this when it needs/wants to check if a subrecord in one of the
+   databases is still valid, or if it is stale and can be removed.
+   If the node is in banned or stopped state we just kill off the samba
+   process holding this sub-record and return to the calling samba that
+   the process does not exist.
+   This allows us to forcefully recall subrecords registered by samba processes
+   on banned and stopped nodes.
+*/
+int32_t ctdb_control_process_exists(struct ctdb_context *ctdb, pid_t pid)
+{
+        struct ctdb_client *client;
+
+       if (ctdb->nodes[ctdb->pnn]->flags & (NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
+               client = ctdb_find_client_by_pid(ctdb, pid);
+               if (client != NULL) {
+                       DEBUG(DEBUG_NOTICE,(__location__ " Killing client with pid:%d on banned/stopped node\n", (int)pid));
+                       talloc_free(client);
+               }
+               return -1;
+       }
+
+       return kill(pid, 0);
+}