Dont return error if trying to set db priority on a db that does not yet exist.
[sahlberg/ctdb.git] / server / ctdb_ltdb_server.c
index 47a2d6a693c2d2c488391a86ecdf53cc80bee1a1..07fdec0d4459b3434ff6da4e8c313138550f4ba0 100644 (file)
@@ -18,7 +18,7 @@
 */
 
 #include "includes.h"
-#include "lib/events/events.h"
+#include "lib/tevent/tevent.h"
 #include "lib/tdb/include/tdb.h"
 #include "system/network.h"
 #include "system/filesys.h"
@@ -141,7 +141,6 @@ int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db,
        /* now the contended path */
        h = ctdb_lockwait(ctdb_db, key, lock_fetch_callback, state);
        if (h == NULL) {
-               tdb_chainunlock(tdb, key);
                return -1;
        }
 
@@ -170,7 +169,11 @@ int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db,
        if (ret == 0) {
                ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
                if (ret != 0) {
-                       ctdb_ltdb_unlock(ctdb_db, key);
+                       int uret;
+                       uret = ctdb_ltdb_unlock(ctdb_db, key);
+                       if (uret != 0) {
+                               DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", uret));
+                       }
                }
        }
        return ret;
@@ -510,7 +513,8 @@ int32_t ctdb_control_db_get_health(struct ctdb_context *ctdb,
   return 0 on success, -1 on failure
  */
 static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
-                            bool persistent, const char *unhealthy_reason)
+                            bool persistent, const char *unhealthy_reason,
+                            bool jenkinshash)
 {
        struct ctdb_db_context *ctdb_db, *tmp_db;
        int ret;
@@ -589,10 +593,13 @@ static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
                                           db_name, ctdb->pnn);
 
        tdb_flags = persistent? TDB_DEFAULT : TDB_CLEAR_IF_FIRST | TDB_NOSYNC;
-       if (!ctdb->do_setsched) {
+       if (ctdb->valgrinding) {
                tdb_flags |= TDB_NOMMAP;
        }
        tdb_flags |= TDB_DISALLOW_NESTING;
+       if (jenkinshash) {
+               tdb_flags |= TDB_INCOMPATIBLE_HASH;
+       }
 
 again:
        ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 
@@ -738,32 +745,116 @@ again:
 }
 
 
+struct ctdb_deferred_attach_context {
+       struct ctdb_deferred_attach_context *next, *prev;
+       struct ctdb_context *ctdb;
+       struct ctdb_req_control *c;
+};
+
+
+static int ctdb_deferred_attach_destructor(struct ctdb_deferred_attach_context *da_ctx)
+{
+       DLIST_REMOVE(da_ctx->ctdb->deferred_attach, da_ctx);
+
+       return 0;
+}
+
+static void ctdb_deferred_attach_timeout(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data)
+{
+       struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
+       struct ctdb_context *ctdb = da_ctx->ctdb;
+
+       ctdb_request_control_reply(ctdb, da_ctx->c, NULL, -1, NULL);
+       talloc_free(da_ctx);
+}
+
+static void ctdb_deferred_attach_callback(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data)
+{
+       struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
+       struct ctdb_context *ctdb = da_ctx->ctdb;
+
+       /* This talloc-steals the packet ->c */
+       ctdb_input_pkt(ctdb, (struct ctdb_req_header *)da_ctx->c);
+       talloc_free(da_ctx);
+}
+
+int ctdb_process_deferred_attach(struct ctdb_context *ctdb)
+{
+       struct ctdb_deferred_attach_context *da_ctx;
+
+       /* call it from the main event loop as soon as the current event 
+          finishes.
+        */
+       while ((da_ctx = ctdb->deferred_attach) != NULL) {
+               DLIST_REMOVE(ctdb->deferred_attach, da_ctx);
+               event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(1,0), ctdb_deferred_attach_callback, da_ctx);
+       }
+
+       return 0;
+}
+
 /*
   a client has asked to attach a new database
  */
 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
                               TDB_DATA *outdata, uint64_t tdb_flags, 
-                              bool persistent)
+                              bool persistent, uint32_t client_id,
+                              struct ctdb_req_control *c,
+                              bool *async_reply)
 {
        const char *db_name = (const char *)indata.dptr;
        struct ctdb_db_context *db;
        struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
 
+       /* dont allow any local clients to attach while we are in recovery mode
+        * except for the recovery daemon.
+        * allow all attach from the network since these are always from remote
+        * recovery daemons.
+        */
+       if (client_id != 0) {
+               struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
+
+               if (client == NULL) {
+                       DEBUG(DEBUG_ERR,("DB Attach to database %s refused. Can not match clientid:%d to a client structure.\n", db_name, client_id));
+                       return -1;
+               }
+
+               /* If the node is inactive it is not part of the cluster
+                  and we should not allow clients to attach to any
+                  databases
+               */
+               if (node->flags & NODE_FLAGS_INACTIVE) {
+                       DEBUG(DEBUG_ERR,("DB Attach to database %s refused since node is inactive (disconnected or banned)\n", db_name));
+                       return -1;
+               }
+
+               if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE
+                && client->pid != ctdb->recoverd_pid) {
+                       struct ctdb_deferred_attach_context *da_ctx = talloc(client, struct ctdb_deferred_attach_context);
+
+                       if (da_ctx == NULL) {
+                               DEBUG(DEBUG_ERR,("DB Attach to database %s deferral for client with pid:%d failed due to OOM.\n", db_name, client->pid));
+                               return -1;
+                       }
+
+                       da_ctx->ctdb = ctdb;
+                       da_ctx->c = talloc_steal(da_ctx, c);
+                       talloc_set_destructor(da_ctx, ctdb_deferred_attach_destructor);
+                       DLIST_ADD(ctdb->deferred_attach, da_ctx);
+
+                       event_add_timed(ctdb->ev, da_ctx, timeval_current_ofs(ctdb->tunable.deferred_attach_timeout, 0), ctdb_deferred_attach_timeout, da_ctx);
+
+                       DEBUG(DEBUG_ERR,("DB Attach to database %s deferred for client with pid:%d since node is in recovery mode.\n", db_name, client->pid));
+                       *async_reply = true;
+                       return 0;
+               }
+       }
+
        /* the client can optionally pass additional tdb flags, but we
           only allow a subset of those on the database in ctdb. Note
           that tdb_flags is passed in via the (otherwise unused)
           srvid to the attach control */
-       tdb_flags &= TDB_NOSYNC;
-
-       /* If the node is inactive it is not part of the cluster
-          and we should not allow clients to attach to any
-          databases
-       */
-       if (node->flags & NODE_FLAGS_INACTIVE) {
-               DEBUG(DEBUG_ERR,("DB Attach to database %s refused since node is inactive (disconnected or banned)\n", db_name));
-               return -1;
-       }
-
+       tdb_flags &= (TDB_NOSYNC|TDB_INCOMPATIBLE_HASH);
 
        /* see if we already have this name */
        db = ctdb_db_handle(ctdb, db_name);
@@ -774,7 +865,7 @@ int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
                return 0;
        }
 
-       if (ctdb_local_attach(ctdb, db_name, persistent, NULL) != 0) {
+       if (ctdb_local_attach(ctdb, db_name, persistent, NULL, (tdb_flags&TDB_INCOMPATIBLE_HASH)?true:false) != 0) {
                return -1;
        }
 
@@ -790,8 +881,11 @@ int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
        outdata->dptr  = (uint8_t *)&db->db_id;
        outdata->dsize = sizeof(db->db_id);
 
+       /* Try to ensure it's locked in mem */
+       ctdb_lockdown_memory(ctdb);
+
        /* tell all the other nodes about this database */
-       ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, 0,
+       ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, tdb_flags,
                                 persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:
                                                CTDB_CONTROL_DB_ATTACH,
                                 0, CTDB_CTRL_FLAG_NOREPLY,
@@ -847,7 +941,7 @@ static int ctdb_attach_persistent(struct ctdb_context *ctdb,
                }
                p[4] = 0;
 
-               if (ctdb_local_attach(ctdb, s, true, unhealthy_reason) != 0) {
+               if (ctdb_local_attach(ctdb, s, true, unhealthy_reason, 0) != 0) {
                        DEBUG(DEBUG_ERR,("Failed to attach to persistent database '%s'\n", de->d_name));
                        closedir(d);
                        talloc_free(s);
@@ -1101,12 +1195,12 @@ int32_t ctdb_control_set_db_priority(struct ctdb_context *ctdb, TDB_DATA indata)
        ctdb_db = find_ctdb_db(ctdb, db_prio->db_id);
        if (!ctdb_db) {
                DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_set_db_priority\n", db_prio->db_id));
-               return -1;
+               return 0;
        }
 
        if ((db_prio->priority<1) || (db_prio->priority>NUM_DB_PRIORITIES)) {
                DEBUG(DEBUG_ERR,("Trying to set invalid priority : %u\n", db_prio->priority));
-               return -1;
+               return 0;
        }
 
        ctdb_db->priority = db_prio->priority;