ctdb-daemon: Add tracking of migration records
[samba.git] / ctdb / server / ctdb_ltdb_server.c
index a93e2fa0c950a7cc3c50f2a838c3ff8a724a3ddf..677078be6a87e3d3b29b9b56c4be2b32e02d274c 100644 (file)
    along with this program; if not, see <http://www.gnu.org/licenses/>.
 */
 
-#include "includes.h"
-#include "lib/tevent/tevent.h"
-#include "lib/tdb/include/tdb.h"
+#include "replace.h"
 #include "system/network.h"
 #include "system/filesys.h"
 #include "system/dir.h"
 #include "system/time.h"
-#include "../include/ctdb_private.h"
-#include "../common/rb_tree.h"
-#include "db_wrap.h"
-#include "lib/util/dlinklist.h"
-#include <ctype.h>
+#include "system/locale.h"
 
-#define PERSISTENT_HEALTH_TDB "persistent_health.tdb"
+#include <talloc.h>
+#include <tevent.h>
 
-/*
-  this is the dummy null procedure that all databases support
-*/
-static int ctdb_null_func(struct ctdb_call_info *call)
-{
-       return 0;
-}
+#include "lib/tdb_wrap/tdb_wrap.h"
+#include "lib/util/dlinklist.h"
+#include "lib/util/debug.h"
+#include "lib/util/samba_util.h"
 
-/*
-  this is a plain fetch procedure that all databases support
-*/
-static int ctdb_fetch_func(struct ctdb_call_info *call)
-{
-       call->reply_data = &call->record_data;
-       return 0;
-}
+#include "ctdb_private.h"
+#include "ctdb_client.h"
+
+#include "common/rb_tree.h"
+#include "common/reqid.h"
+#include "common/system.h"
+#include "common/common.h"
+#include "common/logging.h"
 
+#define PERSISTENT_HEALTH_TDB "persistent_health.tdb"
 
 /**
  * write a record to a normal database
@@ -64,22 +57,31 @@ static int ctdb_ltdb_store_server(struct ctdb_db_context *ctdb_db,
                                  TDB_DATA data)
 {
        struct ctdb_context *ctdb = ctdb_db->ctdb;
-       TDB_DATA rec;
+       TDB_DATA rec[2];
+       uint32_t hsize = sizeof(struct ctdb_ltdb_header);
        int ret;
        bool seqnum_suppressed = false;
        bool keep = false;
        bool schedule_for_deletion = false;
+       bool remove_from_delete_queue = false;
        uint32_t lmaster;
 
        if (ctdb->flags & CTDB_FLAG_TORTURE) {
+               TDB_DATA old;
                struct ctdb_ltdb_header *h2;
-               rec = tdb_fetch(ctdb_db->ltdb->tdb, key);
-               h2 = (struct ctdb_ltdb_header *)rec.dptr;
-               if (rec.dptr && rec.dsize >= sizeof(h2) && h2->rsn > header->rsn) {
-                       DEBUG(DEBUG_CRIT,("RSN regression! %llu %llu\n",
-                                (unsigned long long)h2->rsn, (unsigned long long)header->rsn));
+
+               old = tdb_fetch(ctdb_db->ltdb->tdb, key);
+               h2 = (struct ctdb_ltdb_header *)old.dptr;
+               if (old.dptr != NULL &&
+                   old.dsize >= hsize &&
+                   h2->rsn > header->rsn) {
+                       DEBUG(DEBUG_ERR,
+                             ("RSN regression! %"PRIu64" %"PRIu64"\n",
+                              h2->rsn, header->rsn));
+               }
+               if (old.dptr) {
+                       free(old.dptr);
                }
-               if (rec.dptr) free(rec.dptr);
        }
 
        if (ctdb->vnn_map == NULL) {
@@ -100,6 +102,8 @@ static int ctdb_ltdb_store_server(struct ctdb_db_context *ctdb_db,
         */
        if (data.dsize != 0) {
                keep = true;
+       } else if (header->flags & CTDB_REC_RO_FLAGS) {
+               keep = true;
        } else if (ctdb_db->persistent) {
                keep = true;
        } else if (header->flags & CTDB_REC_FLAG_AUTOMATIC) {
@@ -140,12 +144,18 @@ static int ctdb_ltdb_store_server(struct ctdb_db_context *ctdb_db,
                keep = true;
        }
 
-       if (keep &&
-           (data.dsize == 0) &&
-           !ctdb_db->persistent &&
-           (ctdb_db->ctdb->pnn == header->dmaster))
-       {
-               schedule_for_deletion = true;
+       if (keep) {
+               if (!ctdb_db->persistent &&
+                   (ctdb_db->ctdb->pnn == header->dmaster) &&
+                   !(header->flags & CTDB_REC_RO_FLAGS))
+               {
+                       header->rsn++;
+
+                       if (data.dsize == 0) {
+                               schedule_for_deletion = true;
+                       }
+               }
+               remove_from_delete_queue = !schedule_for_deletion;
        }
 
 store:
@@ -176,12 +186,11 @@ store:
         */
        header->flags &= ~CTDB_REC_FLAG_AUTOMATIC;
 
-       rec.dsize = sizeof(*header) + data.dsize;
-       rec.dptr = talloc_size(ctdb, rec.dsize);
-       CTDB_NO_MEMORY(ctdb, rec.dptr);
+       rec[0].dsize = hsize;
+       rec[0].dptr = (uint8_t *)header;
 
-       memcpy(rec.dptr, header, sizeof(*header));
-       memcpy(rec.dptr + sizeof(*header), data.dptr, data.dsize);
+       rec[1].dsize = data.dsize;
+       rec[1].dptr = data.dptr;
 
        /* Databases with seqnum updates enabled only get their seqnum
           changes when/if we modify the data */
@@ -189,14 +198,14 @@ store:
                TDB_DATA old;
                old = tdb_fetch(ctdb_db->ltdb->tdb, key);
 
-               if ( (old.dsize == rec.dsize)
-               && !memcmp(old.dptr+sizeof(struct ctdb_ltdb_header),
-                         rec.dptr+sizeof(struct ctdb_ltdb_header),
-                         rec.dsize-sizeof(struct ctdb_ltdb_header)) ) {
+               if ((old.dsize == hsize + data.dsize) &&
+                   memcmp(old.dptr + hsize, data.dptr, data.dsize) == 0) {
                        tdb_remove_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
                        seqnum_suppressed = true;
                }
-               if (old.dptr) free(old.dptr);
+               if (old.dptr != NULL) {
+                       free(old.dptr);
+               }
        }
 
        DEBUG(DEBUG_DEBUG, (__location__ " db[%s]: %s record: hash[0x%08x]\n",
@@ -205,7 +214,7 @@ store:
                            ctdb_hash(&key)));
 
        if (keep) {
-               ret = tdb_store(ctdb_db->ltdb->tdb, key, rec, TDB_REPLACE);
+               ret = tdb_storev(ctdb_db->ltdb->tdb, key, rec, 2, TDB_REPLACE);
        } else {
                ret = tdb_delete(ctdb_db->ltdb->tdb, key);
        }
@@ -226,26 +235,30 @@ store:
                            tdb_errorstr(ctdb_db->ltdb->tdb)));
 
                schedule_for_deletion = false;
+               remove_from_delete_queue = false;
        }
        if (seqnum_suppressed) {
                tdb_add_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
        }
 
-       talloc_free(rec.dptr);
-
        if (schedule_for_deletion) {
                int ret2;
                ret2 = ctdb_local_schedule_for_deletion(ctdb_db, header, key);
-               if (ret != 0) {
+               if (ret2 != 0) {
                        DEBUG(DEBUG_ERR, (__location__ " ctdb_local_schedule_for_deletion failed.\n"));
                }
        }
 
+       if (remove_from_delete_queue) {
+               ctdb_local_remove_from_delete_queue(ctdb_db, header, key);
+       }
+
        return ret;
 }
 
 struct lock_fetch_state {
        struct ctdb_context *ctdb;
+       struct ctdb_db_context *ctdb_db;
        void (*recv_pkt)(void *, struct ctdb_req_header *);
        void *recv_context;
        struct ctdb_req_header *hdr;
@@ -256,11 +269,11 @@ struct lock_fetch_state {
 /*
   called when we should retry the operation
  */
-static void lock_fetch_callback(void *p)
+static void lock_fetch_callback(void *p, bool locked)
 {
        struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
        if (!state->ignore_generation &&
-           state->generation != state->ctdb->vnn_map->generation) {
+           state->generation != state->ctdb_db->generation) {
                DEBUG(DEBUG_NOTICE,("Discarding previous generation lockwait packet\n"));
                talloc_free(state->hdr);
                return;
@@ -279,9 +292,9 @@ static void lock_fetch_callback(void *p)
    1) tries to get the chainlock. If it succeeds, then it returns 0
 
    2) if it fails to get a chainlock immediately then it sets up a
-   non-blocking chainlock via ctdb_lockwait, and when it gets the
+   non-blocking chainlock via ctdb_lock_record, and when it gets the
    chainlock it re-submits this ctdb request to the main packet
-   receive function
+   receive function.
 
    This effectively queues all ctdb requests that cannot be
    immediately satisfied until it can get the lock. This means that
@@ -301,7 +314,7 @@ int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db,
 {
        int ret;
        struct tdb_context *tdb = ctdb_db->ltdb->tdb;
-       struct lockwait_handle *h;
+       struct lock_request *lreq;
        struct lock_fetch_state *state;
        
        ret = tdb_chainlock_nonblock(tdb, key);
@@ -326,22 +339,22 @@ int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db,
 
        state = talloc(hdr, struct lock_fetch_state);
        state->ctdb = ctdb_db->ctdb;
+       state->ctdb_db = ctdb_db;
        state->hdr = hdr;
        state->recv_pkt = recv_pkt;
        state->recv_context = recv_context;
-       state->generation = ctdb_db->ctdb->vnn_map->generation;
+       state->generation = ctdb_db->generation;
        state->ignore_generation = ignore_generation;
 
        /* now the contended path */
-       h = ctdb_lockwait(ctdb_db, key, lock_fetch_callback, state);
-       if (h == NULL) {
+       lreq = ctdb_lock_record(state, ctdb_db, key, true, lock_fetch_callback, state);
+       if (lreq == NULL) {
                return -1;
        }
 
        /* we need to move the packet off the temporary context in ctdb_input_pkt(),
           so it won't be freed yet */
        talloc_steal(state, hdr);
-       talloc_steal(state, h);
 
        /* now tell the caller than we will retry asynchronously */
        return -2;
@@ -622,8 +635,8 @@ int ctdb_recheck_persistent_health(struct ctdb_context *ctdb)
                                   ctdb_db->db_path,
                                   ctdb_db->unhealthy_reason));
        }
-       DEBUG((fail!=0)?DEBUG_ALERT:DEBUG_NOTICE,
-             ("ctdb_recheck_presistent_health: OK[%d] FAIL[%d]\n",
+       DEBUG(DEBUG_NOTICE,
+             ("ctdb_recheck_persistent_health: OK[%d] FAIL[%d]\n",
               ok, fail));
 
        if (fail != 0) {
@@ -662,7 +675,7 @@ int32_t ctdb_control_db_set_healthy(struct ctdb_context *ctdb, TDB_DATA indata)
                return -1;
        }
 
-       if (may_recover && !ctdb->done_startup) {
+       if (may_recover && ctdb->runstate == CTDB_RUNSTATE_STARTUP) {
                DEBUG(DEBUG_ERR, (__location__ " db %s become healthy  - force recovery for startup\n",
                                  ctdb_db->db_name));
                ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
@@ -702,13 +715,52 @@ int32_t ctdb_control_db_get_health(struct ctdb_context *ctdb,
        return 0;
 }
 
+
+int ctdb_set_db_readonly(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
+{
+       char *ropath;
+
+       if (ctdb_db->readonly) {
+               return 0;
+       }
+
+       if (ctdb_db->persistent) {
+               DEBUG(DEBUG_ERR,("Persistent databases do not support readonly property\n"));
+               return -1;
+       }
+
+       ropath = talloc_asprintf(ctdb_db, "%s.RO", ctdb_db->db_path);
+       if (ropath == NULL) {
+               DEBUG(DEBUG_CRIT,("Failed to asprintf the tracking database\n"));
+               return -1;
+       }
+       ctdb_db->rottdb = tdb_open(ropath, 
+                             ctdb->tunable.database_hash_size, 
+                             TDB_NOLOCK|TDB_CLEAR_IF_FIRST|TDB_NOSYNC,
+                             O_CREAT|O_RDWR, 0600);
+       if (ctdb_db->rottdb == NULL) {
+               DEBUG(DEBUG_CRIT,("Failed to open/create the tracking database '%s'\n", ropath));
+               talloc_free(ropath);
+               return -1;
+       }
+
+       DEBUG(DEBUG_NOTICE,("OPENED tracking database : '%s'\n", ropath));
+
+       ctdb_db->readonly = true;
+
+       DEBUG(DEBUG_NOTICE, ("Readonly property set on DB %s\n", ctdb_db->db_name));
+
+       talloc_free(ropath);
+       return 0;
+}
+
 /*
   attach to a database, handling both persistent and non-persistent databases
   return 0 on success, -1 on failure
  */
 static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
                             bool persistent, const char *unhealthy_reason,
-                            bool jenkinshash)
+                            bool jenkinshash, bool mutexes)
 {
        struct ctdb_db_context *ctdb_db, *tmp_db;
        int ret;
@@ -720,7 +772,6 @@ static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
        ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
        CTDB_NO_MEMORY(ctdb, ctdb_db);
 
-       ctdb_db->priority = 1;
        ctdb_db->ctdb = ctdb;
        ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
        CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
@@ -764,7 +815,7 @@ static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
                if (ctdb->max_persistent_check_errors > 0) {
                        remaining_tries = 1;
                }
-               if (ctdb->done_startup) {
+               if (ctdb->runstate == CTDB_RUNSTATE_RUNNING) {
                        remaining_tries = 0;
                }
 
@@ -803,9 +854,15 @@ static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
        if (jenkinshash) {
                tdb_flags |= TDB_INCOMPATIBLE_HASH;
        }
+#ifdef TDB_MUTEX_LOCKING
+       if (ctdb->tunable.mutex_enabled && mutexes &&
+           tdb_runtime_check_for_robust_mutexes()) {
+               tdb_flags |= (TDB_MUTEX_LOCKING | TDB_CLEAR_IF_FIRST);
+       }
+#endif
 
 again:
-       ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 
+       ctdb_db->ltdb = tdb_wrap_open(ctdb_db, ctdb_db->db_path,
                                      ctdb->tunable.database_hash_size, 
                                      tdb_flags, 
                                      O_CREAT|O_RDWR, mode);
@@ -905,6 +962,25 @@ again:
                }
        }
 
+       /* set up a rb tree we can use to track which records we have a 
+          fetch-lock in-flight for so we can defer any additional calls
+          for the same record.
+        */
+       ctdb_db->deferred_fetch = trbt_create(ctdb_db, 0);
+       if (ctdb_db->deferred_fetch == NULL) {
+               DEBUG(DEBUG_ERR,("Failed to create deferred fetch rb tree for ctdb database\n"));
+               talloc_free(ctdb_db);
+               return -1;
+       }
+
+       ctdb_db->defer_dmaster = trbt_create(ctdb_db, 0);
+       if (ctdb_db->defer_dmaster == NULL) {
+               DEBUG(DEBUG_ERR, ("Failed to create defer dmaster rb tree for %s\n",
+                                 ctdb_db->db_name));
+               talloc_free(ctdb_db);
+               return -1;
+       }
+
        DLIST_ADD(ctdb->db_list, ctdb_db);
 
        /* setting this can help some high churn databases */
@@ -932,6 +1008,17 @@ again:
                return -1;
        }
 
+       /* 
+          all databases support the "fetch_with_header" function. we need this
+          for efficient readonly record fetches
+       */
+       ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
+       if (ret != 0) {
+               DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
+               talloc_free(ctdb_db);
+               return -1;
+       }
+
        ret = ctdb_vacuum_init(ctdb_db);
        if (ret != 0) {
                DEBUG(DEBUG_CRIT,("Failed to setup vacuuming for "
@@ -940,9 +1027,20 @@ again:
                return -1;
        }
 
+       ret = ctdb_migration_init(ctdb_db);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR,
+                     ("Failed to setup migration tracking for db '%s'\n",
+                      ctdb_db->db_name));
+               talloc_free(ctdb_db);
+               return -1;
+       }
+
+       ctdb_db->generation = ctdb->vnn_map->generation;
+
+       DEBUG(DEBUG_NOTICE,("Attached to database '%s' with flags 0x%x\n",
+                           ctdb_db->db_path, tdb_flags));
 
-       DEBUG(DEBUG_INFO,("Attached to database '%s'\n", ctdb_db->db_path));
-       
        /* success */
        return 0;
 }
@@ -951,7 +1049,7 @@ again:
 struct ctdb_deferred_attach_context {
        struct ctdb_deferred_attach_context *next, *prev;
        struct ctdb_context *ctdb;
-       struct ctdb_req_control *c;
+       struct ctdb_req_control_old *c;
 };
 
 
@@ -962,7 +1060,9 @@ static int ctdb_deferred_attach_destructor(struct ctdb_deferred_attach_context *
        return 0;
 }
 
-static void ctdb_deferred_attach_timeout(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data)
+static void ctdb_deferred_attach_timeout(struct tevent_context *ev,
+                                        struct tevent_timer *te,
+                                        struct timeval t, void *private_data)
 {
        struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
        struct ctdb_context *ctdb = da_ctx->ctdb;
@@ -971,7 +1071,9 @@ static void ctdb_deferred_attach_timeout(struct event_context *ev, struct timed_
        talloc_free(da_ctx);
 }
 
-static void ctdb_deferred_attach_callback(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data)
+static void ctdb_deferred_attach_callback(struct tevent_context *ev,
+                                         struct tevent_timer *te,
+                                         struct timeval t, void *private_data)
 {
        struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
        struct ctdb_context *ctdb = da_ctx->ctdb;
@@ -990,7 +1092,9 @@ int ctdb_process_deferred_attach(struct ctdb_context *ctdb)
         */
        while ((da_ctx = ctdb->deferred_attach) != NULL) {
                DLIST_REMOVE(ctdb->deferred_attach, da_ctx);
-               event_add_timed(ctdb->ev, da_ctx, timeval_current_ofs(1,0), ctdb_deferred_attach_callback, da_ctx);
+               tevent_add_timer(ctdb->ev, da_ctx,
+                                timeval_current_ofs(1,0),
+                                ctdb_deferred_attach_callback, da_ctx);
        }
 
        return 0;
@@ -1002,21 +1106,28 @@ int ctdb_process_deferred_attach(struct ctdb_context *ctdb)
 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
                               TDB_DATA *outdata, uint64_t tdb_flags, 
                               bool persistent, uint32_t client_id,
-                              struct ctdb_req_control *c,
+                              struct ctdb_req_control_old *c,
                               bool *async_reply)
 {
        const char *db_name = (const char *)indata.dptr;
        struct ctdb_db_context *db;
        struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
        struct ctdb_client *client = NULL;
+       bool with_jenkinshash, with_mutexes;
 
-       /* dont allow any local clients to attach while we are in recovery mode
+       if (ctdb->tunable.allow_client_db_attach == 0) {
+               DEBUG(DEBUG_ERR, ("DB Attach to database %s denied by tunable "
+                                 "AllowClientDBAccess == 0\n", db_name));
+               return -1;
+       }
+
+       /* don't allow any local clients to attach while we are in recovery mode
         * except for the recovery daemon.
         * allow all attach from the network since these are always from remote
         * recovery daemons.
         */
        if (client_id != 0) {
-               client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
+               client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
        }
        if (client != NULL) {
                /* If the node is inactive it is not part of the cluster
@@ -1024,13 +1135,13 @@ int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
                   databases
                */
                if (node->flags & NODE_FLAGS_INACTIVE) {
-                       DEBUG(DEBUG_ERR,("DB Attach to database %s refused since node is inactive (disconnected or banned)\n", db_name));
+                       DEBUG(DEBUG_ERR,("DB Attach to database %s refused since node is inactive (flags=0x%x)\n", db_name, node->flags));
                        return -1;
                }
 
-               if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE
-                && client->pid != ctdb->recoverd_pid
-                && !ctdb->done_startup) {
+               if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE &&
+                   client->pid != ctdb->recoverd_pid &&
+                   ctdb->runstate < CTDB_RUNSTATE_RUNNING) {
                        struct ctdb_deferred_attach_context *da_ctx = talloc(client, struct ctdb_deferred_attach_context);
 
                        if (da_ctx == NULL) {
@@ -1043,7 +1154,9 @@ int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
                        talloc_set_destructor(da_ctx, ctdb_deferred_attach_destructor);
                        DLIST_ADD(ctdb->deferred_attach, da_ctx);
 
-                       event_add_timed(ctdb->ev, da_ctx, timeval_current_ofs(ctdb->tunable.deferred_attach_timeout, 0), ctdb_deferred_attach_timeout, da_ctx);
+                       tevent_add_timer(ctdb->ev, da_ctx,
+                                        timeval_current_ofs(ctdb->tunable.deferred_attach_timeout, 0),
+                                        ctdb_deferred_attach_timeout, da_ctx);
 
                        DEBUG(DEBUG_ERR,("DB Attach to database %s deferred for client with pid:%d since node is in recovery mode.\n", db_name, client->pid));
                        *async_reply = true;
@@ -1055,18 +1168,36 @@ int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
           only allow a subset of those on the database in ctdb. Note
           that tdb_flags is passed in via the (otherwise unused)
           srvid to the attach control */
+#ifdef TDB_MUTEX_LOCKING
+       tdb_flags &= (TDB_NOSYNC|TDB_INCOMPATIBLE_HASH|TDB_MUTEX_LOCKING|TDB_CLEAR_IF_FIRST);
+#else
        tdb_flags &= (TDB_NOSYNC|TDB_INCOMPATIBLE_HASH);
+#endif
 
        /* see if we already have this name */
        db = ctdb_db_handle(ctdb, db_name);
        if (db) {
+               if (db->persistent != persistent) {
+                       DEBUG(DEBUG_ERR, ("ERROR: DB Attach %spersistent to %spersistent "
+                                         "database %s\n", persistent ? "" : "non-",
+                                         db-> persistent ? "" : "non-", db_name));
+                       return -1;
+               }
                outdata->dptr  = (uint8_t *)&db->db_id;
                outdata->dsize = sizeof(db->db_id);
                tdb_add_flags(db->ltdb->tdb, tdb_flags);
                return 0;
        }
 
-       if (ctdb_local_attach(ctdb, db_name, persistent, NULL, (tdb_flags&TDB_INCOMPATIBLE_HASH)?true:false) != 0) {
+       with_jenkinshash = (tdb_flags & TDB_INCOMPATIBLE_HASH) ? true : false;
+#ifdef TDB_MUTEX_LOCKING
+       with_mutexes = (tdb_flags & TDB_MUTEX_LOCKING) ? true : false;
+#else
+       with_mutexes = false;
+#endif
+
+       if (ctdb_local_attach(ctdb, db_name, persistent, NULL,
+                             with_jenkinshash, with_mutexes) != 0) {
                return -1;
        }
 
@@ -1083,7 +1214,7 @@ int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
        outdata->dsize = sizeof(db->db_id);
 
        /* Try to ensure it's locked in mem */
-       ctdb_lockdown_memory(ctdb);
+       lockdown_memory(ctdb->valgrinding);
 
        /* tell all the other nodes about this database */
        ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, tdb_flags,
@@ -1096,6 +1227,100 @@ int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
        return 0;
 }
 
+/*
+ * a client has asked to detach from a database
+ */
+int32_t ctdb_control_db_detach(struct ctdb_context *ctdb, TDB_DATA indata,
+                              uint32_t client_id)
+{
+       uint32_t db_id;
+       struct ctdb_db_context *ctdb_db;
+       struct ctdb_client *client = NULL;
+
+       db_id = *(uint32_t *)indata.dptr;
+       ctdb_db = find_ctdb_db(ctdb, db_id);
+       if (ctdb_db == NULL) {
+               DEBUG(DEBUG_ERR, ("Invalid dbid 0x%08x in DB detach\n",
+                                 db_id));
+               return -1;
+       }
+
+       if (ctdb->tunable.allow_client_db_attach == 1) {
+               DEBUG(DEBUG_ERR, ("DB detach from database %s denied. "
+                                 "Clients are allowed access to databases "
+                                 "(AllowClientDBAccess == 1)\n",
+                                 ctdb_db->db_name));
+               return -1;
+       }
+
+       if (ctdb_db->persistent) {
+               DEBUG(DEBUG_ERR, ("DB detach from persistent database %s "
+                                 "denied\n", ctdb_db->db_name));
+               return -1;
+       }
+
+       /* Cannot detach from database when in recovery */
+       if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE) {
+               DEBUG(DEBUG_ERR, ("DB detach denied while in recovery\n"));
+               return -1;
+       }
+
+       /* If a control comes from a client, then broadcast it to all nodes.
+        * Do the actual detach only if the control comes from other daemons.
+        */
+       if (client_id != 0) {
+               client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
+               if (client != NULL) {
+                       /* forward the control to all the nodes */
+                       ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, 0,
+                                                CTDB_CONTROL_DB_DETACH, 0,
+                                                CTDB_CTRL_FLAG_NOREPLY,
+                                                indata, NULL, NULL);
+                       return 0;
+               }
+               DEBUG(DEBUG_ERR, ("Client has gone away. Failing DB detach "
+                                 "for database '%s'\n", ctdb_db->db_name));
+               return -1;
+       }
+
+       /* Detach database from recoverd */
+       if (ctdb_daemon_send_message(ctdb, ctdb->pnn,
+                                    CTDB_SRVID_DETACH_DATABASE,
+                                    indata) != 0) {
+               DEBUG(DEBUG_ERR, ("Unable to detach DB from recoverd\n"));
+               return -1;
+       }
+
+       /* Disable vacuuming and drop all vacuuming data */
+       talloc_free(ctdb_db->vacuum_handle);
+       talloc_free(ctdb_db->delete_queue);
+
+       /* Terminate any deferred fetch */
+       talloc_free(ctdb_db->deferred_fetch);
+
+       /* Terminate any traverses */
+       while (ctdb_db->traverse) {
+               talloc_free(ctdb_db->traverse);
+       }
+
+       /* Terminate any revokes */
+       while (ctdb_db->revokechild_active) {
+               talloc_free(ctdb_db->revokechild_active);
+       }
+
+       /* Free readonly tracking database */
+       if (ctdb_db->readonly) {
+               talloc_free(ctdb_db->rottdb);
+       }
+
+       DLIST_REMOVE(ctdb->db_list, ctdb_db);
+
+       DEBUG(DEBUG_NOTICE, ("Detached from database '%s'\n",
+                            ctdb_db->db_name));
+       talloc_free(ctdb_db);
+
+       return 0;
+}
 
 /*
   attach to all existing persistent databases
@@ -1119,7 +1344,10 @@ static int ctdb_attach_persistent(struct ctdb_context *ctdb,
                int invalid_name = 0;
                
                s = talloc_strdup(ctdb, de->d_name);
-               CTDB_NO_MEMORY(ctdb, s);
+               if (s == NULL) {
+                       closedir(d);
+                       CTDB_NO_MEMORY(ctdb, s);
+               }
 
                /* only accept names ending in .tdb */
                p = strstr(s, ".tdb.");
@@ -1142,7 +1370,7 @@ static int ctdb_attach_persistent(struct ctdb_context *ctdb,
                }
                p[4] = 0;
 
-               if (ctdb_local_attach(ctdb, s, true, unhealthy_reason, 0) != 0) {
+               if (ctdb_local_attach(ctdb, s, true, unhealthy_reason, false, false) != 0) {
                        DEBUG(DEBUG_ERR,("Failed to attach to persistent database '%s'\n", de->d_name));
                        closedir(d);
                        talloc_free(s);
@@ -1164,40 +1392,6 @@ int ctdb_attach_databases(struct ctdb_context *ctdb)
        char *unhealthy_reason = NULL;
        bool first_try = true;
 
-       if (ctdb->db_directory == NULL) {
-               ctdb->db_directory = VARDIR "/ctdb";
-       }
-       if (ctdb->db_directory_persistent == NULL) {
-               ctdb->db_directory_persistent = VARDIR "/ctdb/persistent";
-       }
-       if (ctdb->db_directory_state == NULL) {
-               ctdb->db_directory_state = VARDIR "/ctdb/state";
-       }
-
-       /* make sure the db directory exists */
-       ret = mkdir(ctdb->db_directory, 0700);
-       if (ret == -1 && errno != EEXIST) {
-               DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb directory '%s'\n",
-                        ctdb->db_directory));
-               return -1;
-       }
-
-       /* make sure the persistent db directory exists */
-       ret = mkdir(ctdb->db_directory_persistent, 0700);
-       if (ret == -1 && errno != EEXIST) {
-               DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb persistent directory '%s'\n",
-                        ctdb->db_directory_persistent));
-               return -1;
-       }
-
-       /* make sure the internal state db directory exists */
-       ret = mkdir(ctdb->db_directory_state, 0700);
-       if (ret == -1 && errno != EEXIST) {
-               DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb state directory '%s'\n",
-                        ctdb->db_directory_state));
-               return -1;
-       }
-
        persistent_health_path = talloc_asprintf(ctdb, "%s/%s.%u",
                                                 ctdb->db_directory_state,
                                                 PERSISTENT_HEALTH_TDB,
@@ -1340,7 +1534,8 @@ int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint3
 /*
   timer to check for seqnum changes in a ltdb and propogate them
  */
-static void ctdb_ltdb_seqnum_check(struct event_context *ev, struct timed_event *te, 
+static void ctdb_ltdb_seqnum_check(struct tevent_context *ev,
+                                  struct tevent_timer *te,
                                   struct timeval t, void *p)
 {
        struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
@@ -1359,9 +1554,10 @@ static void ctdb_ltdb_seqnum_check(struct event_context *ev, struct timed_event
 
        /* setup a new timer */
        ctdb_db->seqnum_update =
-               event_add_timed(ctdb->ev, ctdb_db, 
-                               timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
-                               ctdb_ltdb_seqnum_check, ctdb_db);
+               tevent_add_timer(ctdb->ev, ctdb_db,
+                                timeval_current_ofs(ctdb->tunable.seqnum_interval/1000,
+                                                    (ctdb->tunable.seqnum_interval%1000)*1000),
+                                ctdb_ltdb_seqnum_check, ctdb_db);
 }
 
 /*
@@ -1377,10 +1573,11 @@ int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
        }
 
        if (ctdb_db->seqnum_update == NULL) {
-               ctdb_db->seqnum_update =
-                       event_add_timed(ctdb->ev, ctdb_db, 
-                                       timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
-                                       ctdb_ltdb_seqnum_check, ctdb_db);
+               ctdb_db->seqnum_update = tevent_add_timer(
+                       ctdb->ev, ctdb_db,
+                       timeval_current_ofs(ctdb->tunable.seqnum_interval/1000,
+                                           (ctdb->tunable.seqnum_interval%1000)*1000),
+                       ctdb_ltdb_seqnum_check, ctdb_db);
        }
 
        tdb_enable_seqnum(ctdb_db->ltdb->tdb);
@@ -1388,25 +1585,81 @@ int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
        return 0;
 }
 
-int32_t ctdb_control_set_db_priority(struct ctdb_context *ctdb, TDB_DATA indata)
+int ctdb_set_db_sticky(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
+{
+       if (ctdb_db->sticky) {
+               return 0;
+       }
+
+       if (ctdb_db->persistent) {
+               DEBUG(DEBUG_ERR,("Trying to set persistent database with sticky property\n"));
+               return -1;
+       }
+
+       ctdb_db->sticky_records = trbt_create(ctdb_db, 0);
+
+       ctdb_db->sticky = true;
+
+       DEBUG(DEBUG_NOTICE,("set db sticky %s\n", ctdb_db->db_name));
+
+       return 0;
+}
+
+void ctdb_db_statistics_reset(struct ctdb_db_context *ctdb_db)
+{
+       struct ctdb_db_statistics_old *s = &ctdb_db->statistics;
+       int i;
+
+       for (i=0; i<MAX_HOT_KEYS; i++) {
+               if (s->hot_keys[i].key.dsize > 0) {
+                       talloc_free(s->hot_keys[i].key.dptr);
+               }
+       }
+
+       ZERO_STRUCT(ctdb_db->statistics);
+}
+
+int32_t ctdb_control_get_db_statistics(struct ctdb_context *ctdb,
+                               uint32_t db_id,
+                               TDB_DATA *outdata)
 {
-       struct ctdb_db_priority *db_prio = (struct ctdb_db_priority *)indata.dptr;
        struct ctdb_db_context *ctdb_db;
+       struct ctdb_db_statistics_old *stats;
+       int i;
+       int len;
+       char *ptr;
 
-       ctdb_db = find_ctdb_db(ctdb, db_prio->db_id);
+       ctdb_db = find_ctdb_db(ctdb, db_id);
        if (!ctdb_db) {
-               DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_set_db_priority\n", db_prio->db_id));
-               return 0;
+               DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in get_db_statistics\n", db_id));
+               return -1;
        }
 
-       if ((db_prio->priority<1) || (db_prio->priority>NUM_DB_PRIORITIES)) {
-               DEBUG(DEBUG_ERR,("Trying to set invalid priority : %u\n", db_prio->priority));
-               return 0;
+       len = offsetof(struct ctdb_db_statistics_old, hot_keys_wire);
+       for (i = 0; i < MAX_HOT_KEYS; i++) {
+               len += ctdb_db->statistics.hot_keys[i].key.dsize;
+       }
+
+       stats = talloc_size(outdata, len);
+       if (stats == NULL) {
+               DEBUG(DEBUG_ERR,("Failed to allocate db statistics structure\n"));
+               return -1;
        }
 
-       ctdb_db->priority = db_prio->priority;
-       DEBUG(DEBUG_INFO,("Setting DB priority to %u for db 0x%08x\n", db_prio->priority, db_prio->db_id));
+       memcpy(stats, &ctdb_db->statistics,
+              offsetof(struct ctdb_db_statistics_old, hot_keys_wire));
+
+       stats->num_hot_keys = MAX_HOT_KEYS;
+
+       ptr = &stats->hot_keys_wire[0];
+       for (i = 0; i < MAX_HOT_KEYS; i++) {
+               memcpy(ptr, ctdb_db->statistics.hot_keys[i].key.dptr,
+                      ctdb_db->statistics.hot_keys[i].key.dsize);
+               ptr += ctdb_db->statistics.hot_keys[i].key.dsize;
+       }
+
+       outdata->dptr  = (uint8_t *)stats;
+       outdata->dsize = len;
 
        return 0;
 }
-