ctdb: Make TDB_SEQNUM work synchronously with ctdb
[samba.git] / ctdb / server / ctdb_ltdb_server.c
index 93e90413d0aeba37933e6feec54cc89b4891ceba..8cc6c4ba4cc6b9e0de86e0a0f781f6cd83f3bac5 100644 (file)
    along with this program; if not, see <http://www.gnu.org/licenses/>.
 */
 
-#include "includes.h"
-#include "tdb.h"
+#include "replace.h"
 #include "system/network.h"
 #include "system/filesys.h"
 #include "system/dir.h"
 #include "system/time.h"
-#include "../include/ctdb_private.h"
-#include "../common/rb_tree.h"
+#include "system/locale.h"
+
+#include <talloc.h>
+#include <tevent.h>
+
 #include "lib/tdb_wrap/tdb_wrap.h"
 #include "lib/util/dlinklist.h"
-#include <ctype.h>
+#include "lib/util/debug.h"
+#include "lib/util/samba_util.h"
+
+#include "ctdb_private.h"
+#include "ctdb_client.h"
+
+#include "common/rb_tree.h"
 #include "common/reqid.h"
+#include "common/system.h"
+#include "common/common.h"
+#include "common/logging.h"
+
+#include "server/ctdb_config.h"
 
 #define PERSISTENT_HEALTH_TDB "persistent_health.tdb"
 
@@ -46,23 +59,30 @@ static int ctdb_ltdb_store_server(struct ctdb_db_context *ctdb_db,
                                  TDB_DATA data)
 {
        struct ctdb_context *ctdb = ctdb_db->ctdb;
-       TDB_DATA rec;
+       TDB_DATA rec[2];
+       uint32_t hsize = sizeof(struct ctdb_ltdb_header);
        int ret;
-       bool seqnum_suppressed = false;
        bool keep = false;
        bool schedule_for_deletion = false;
        bool remove_from_delete_queue = false;
        uint32_t lmaster;
 
        if (ctdb->flags & CTDB_FLAG_TORTURE) {
+               TDB_DATA old;
                struct ctdb_ltdb_header *h2;
-               rec = tdb_fetch(ctdb_db->ltdb->tdb, key);
-               h2 = (struct ctdb_ltdb_header *)rec.dptr;
-               if (rec.dptr && rec.dsize >= sizeof(h2) && h2->rsn > header->rsn) {
-                       DEBUG(DEBUG_CRIT,("RSN regression! %llu %llu\n",
-                                (unsigned long long)h2->rsn, (unsigned long long)header->rsn));
+
+               old = tdb_fetch(ctdb_db->ltdb->tdb, key);
+               h2 = (struct ctdb_ltdb_header *)old.dptr;
+               if (old.dptr != NULL &&
+                   old.dsize >= hsize &&
+                   h2->rsn > header->rsn) {
+                       DEBUG(DEBUG_ERR,
+                             ("RSN regression! %"PRIu64" %"PRIu64"\n",
+                              h2->rsn, header->rsn));
+               }
+               if (old.dptr) {
+                       free(old.dptr);
                }
-               if (rec.dptr) free(rec.dptr);
        }
 
        if (ctdb->vnn_map == NULL) {
@@ -85,8 +105,6 @@ static int ctdb_ltdb_store_server(struct ctdb_db_context *ctdb_db,
                keep = true;
        } else if (header->flags & CTDB_REC_RO_FLAGS) {
                keep = true;
-       } else if (ctdb_db->persistent) {
-               keep = true;
        } else if (header->flags & CTDB_REC_FLAG_AUTOMATIC) {
                /*
                 * The record is not created by the client but
@@ -126,7 +144,7 @@ static int ctdb_ltdb_store_server(struct ctdb_db_context *ctdb_db,
        }
 
        if (keep) {
-               if (!ctdb_db->persistent &&
+               if (ctdb_db_volatile(ctdb_db) &&
                    (ctdb_db->ctdb->pnn == header->dmaster) &&
                    !(header->flags & CTDB_REC_RO_FLAGS))
                {
@@ -167,28 +185,11 @@ store:
         */
        header->flags &= ~CTDB_REC_FLAG_AUTOMATIC;
 
-       rec.dsize = sizeof(*header) + data.dsize;
-       rec.dptr = talloc_size(ctdb, rec.dsize);
-       CTDB_NO_MEMORY(ctdb, rec.dptr);
-
-       memcpy(rec.dptr, header, sizeof(*header));
-       memcpy(rec.dptr + sizeof(*header), data.dptr, data.dsize);
-
-       /* Databases with seqnum updates enabled only get their seqnum
-          changes when/if we modify the data */
-       if (ctdb_db->seqnum_update != NULL) {
-               TDB_DATA old;
-               old = tdb_fetch(ctdb_db->ltdb->tdb, key);
+       rec[0].dsize = hsize;
+       rec[0].dptr = (uint8_t *)header;
 
-               if ( (old.dsize == rec.dsize)
-               && !memcmp(old.dptr+sizeof(struct ctdb_ltdb_header),
-                         rec.dptr+sizeof(struct ctdb_ltdb_header),
-                         rec.dsize-sizeof(struct ctdb_ltdb_header)) ) {
-                       tdb_remove_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
-                       seqnum_suppressed = true;
-               }
-               if (old.dptr) free(old.dptr);
-       }
+       rec[1].dsize = data.dsize;
+       rec[1].dptr = data.dptr;
 
        DEBUG(DEBUG_DEBUG, (__location__ " db[%s]: %s record: hash[0x%08x]\n",
                            ctdb_db->db_name,
@@ -196,7 +197,7 @@ store:
                            ctdb_hash(&key)));
 
        if (keep) {
-               ret = tdb_store(ctdb_db->ltdb->tdb, key, rec, TDB_REPLACE);
+               ret = tdb_storev(ctdb_db->ltdb->tdb, key, rec, 2, TDB_REPLACE);
        } else {
                ret = tdb_delete(ctdb_db->ltdb->tdb, key);
        }
@@ -219,11 +220,6 @@ store:
                schedule_for_deletion = false;
                remove_from_delete_queue = false;
        }
-       if (seqnum_suppressed) {
-               tdb_add_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
-       }
-
-       talloc_free(rec.dptr);
 
        if (schedule_for_deletion) {
                int ret2;
@@ -357,14 +353,17 @@ int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db,
 
        ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt, 
                                     recv_context, ignore_generation);
-       if (ret == 0) {
-               ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
-               if (ret != 0) {
-                       int uret;
-                       uret = ctdb_ltdb_unlock(ctdb_db, key);
-                       if (uret != 0) {
-                               DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", uret));
-                       }
+       if (ret != 0) {
+               return ret;
+       }
+
+       ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
+       if (ret != 0) {
+               int uret;
+               uret = ctdb_ltdb_unlock(ctdb_db, key);
+               if (uret != 0) {
+                       DBG_ERR("ctdb_ltdb_unlock() failed with error %d\n",
+                               uret);
                }
        }
        return ret;
@@ -372,7 +371,7 @@ int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db,
 
 
 /*
-  paraoid check to see if the db is empty
+  paranoid check to see if the db is empty
  */
 static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
 {
@@ -593,7 +592,7 @@ int ctdb_recheck_persistent_health(struct ctdb_context *ctdb)
        int fail = 0;
 
        for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
-               if (!ctdb_db->persistent) {
+               if (!ctdb_db_persistent(ctdb_db)) {
                        continue;
                }
 
@@ -619,7 +618,7 @@ int ctdb_recheck_persistent_health(struct ctdb_context *ctdb)
                                   ctdb_db->db_path,
                                   ctdb_db->unhealthy_reason));
        }
-       DEBUG((fail!=0)?DEBUG_ALERT:DEBUG_NOTICE,
+       DEBUG(DEBUG_NOTICE,
              ("ctdb_recheck_persistent_health: OK[%d] FAIL[%d]\n",
               ok, fail));
 
@@ -704,12 +703,13 @@ int ctdb_set_db_readonly(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb
 {
        char *ropath;
 
-       if (ctdb_db->readonly) {
+       if (ctdb_db_readonly(ctdb_db)) {
                return 0;
        }
 
-       if (ctdb_db->persistent) {
-               DEBUG(DEBUG_ERR,("Persistent databases do not support readonly property\n"));
+       if (! ctdb_db_volatile(ctdb_db)) {
+               DEBUG(DEBUG_ERR,
+                     ("Non-volatile databases do not support readonly flag\n"));
                return -1;
        }
 
@@ -721,7 +721,7 @@ int ctdb_set_db_readonly(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb
        ctdb_db->rottdb = tdb_open(ropath, 
                              ctdb->tunable.database_hash_size, 
                              TDB_NOLOCK|TDB_CLEAR_IF_FIRST|TDB_NOSYNC,
-                             O_CREAT|O_RDWR, 0);
+                             O_CREAT|O_RDWR, 0600);
        if (ctdb_db->rottdb == NULL) {
                DEBUG(DEBUG_CRIT,("Failed to open/create the tracking database '%s'\n", ropath));
                talloc_free(ropath);
@@ -730,7 +730,7 @@ int ctdb_set_db_readonly(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb
 
        DEBUG(DEBUG_NOTICE,("OPENED tracking database : '%s'\n", ropath));
 
-       ctdb_db->readonly = true;
+       ctdb_db_set_readonly(ctdb_db);
 
        DEBUG(DEBUG_NOTICE, ("Readonly property set on DB %s\n", ctdb_db->db_name));
 
@@ -743,20 +743,18 @@ int ctdb_set_db_readonly(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb
   return 0 on success, -1 on failure
  */
 static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
-                            bool persistent, const char *unhealthy_reason,
-                            bool jenkinshash, bool mutexes)
+                            uint8_t db_flags, const char *unhealthy_reason)
 {
        struct ctdb_db_context *ctdb_db, *tmp_db;
        int ret;
        struct TDB_DATA key;
-       unsigned tdb_flags;
+       int tdb_flags;
        int mode = 0600;
        int remaining_tries = 0;
 
        ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
        CTDB_NO_MEMORY(ctdb, ctdb_db);
 
-       ctdb_db->priority = 1;
        ctdb_db->ctdb = ctdb;
        ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
        CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
@@ -764,9 +762,9 @@ static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
        key.dsize = strlen(db_name)+1;
        key.dptr  = discard_const(db_name);
        ctdb_db->db_id = ctdb_hash(&key);
-       ctdb_db->persistent = persistent;
+       ctdb_db->db_flags = db_flags;
 
-       if (!ctdb_db->persistent) {
+       if (ctdb_db_volatile(ctdb_db)) {
                ctdb_db->delete_queue = trbt_create(ctdb_db, 0);
                if (ctdb_db->delete_queue == NULL) {
                        CTDB_NO_MEMORY(ctdb, ctdb_db->delete_queue);
@@ -785,7 +783,7 @@ static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
                }
        }
 
-       if (persistent) {
+       if (ctdb_db_persistent(ctdb_db)) {
                if (unhealthy_reason) {
                        ret = ctdb_update_persistent_health(ctdb, ctdb_db,
                                                            unhealthy_reason, 0);
@@ -827,24 +825,15 @@ static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
        }
 
        /* open the database */
-       ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u", 
-                                          persistent?ctdb->db_directory_persistent:ctdb->db_directory, 
+       ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u",
+                                          ctdb_db_persistent(ctdb_db) ?
+                                               ctdb->db_directory_persistent :
+                                               ctdb->db_directory,
                                           db_name, ctdb->pnn);
 
-       tdb_flags = persistent? TDB_DEFAULT : TDB_CLEAR_IF_FIRST | TDB_NOSYNC;
-       if (ctdb->valgrinding) {
-               tdb_flags |= TDB_NOMMAP;
-       }
-       tdb_flags |= TDB_DISALLOW_NESTING;
-       if (jenkinshash) {
-               tdb_flags |= TDB_INCOMPATIBLE_HASH;
-       }
-#ifdef TDB_MUTEX_LOCKING
-       if (ctdb->tunable.mutex_enabled && mutexes &&
-           tdb_runtime_check_for_robust_mutexes()) {
-               tdb_flags |= (TDB_MUTEX_LOCKING | TDB_CLEAR_IF_FIRST);
-       }
-#endif
+       tdb_flags = ctdb_db_tdb_flags(db_flags,
+                                     ctdb->valgrinding,
+                                     ctdb_config.tdb_mutexes);
 
 again:
        ctdb_db->ltdb = tdb_wrap_open(ctdb_db, ctdb_db->db_path,
@@ -855,7 +844,7 @@ again:
                struct stat st;
                int saved_errno = errno;
 
-               if (!persistent) {
+               if (! ctdb_db_persistent(ctdb_db)) {
                        DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
                                          ctdb_db->db_path,
                                          saved_errno,
@@ -901,7 +890,7 @@ again:
                goto again;
        }
 
-       if (!persistent) {
+       if (!ctdb_db_persistent(ctdb_db)) {
                ctdb_check_db_empty(ctdb_db);
        } else {
                ret = tdb_check(ctdb_db->ltdb->tdb, NULL, NULL);
@@ -947,6 +936,10 @@ again:
                }
        }
 
+       /* remember the flags the client has specified */
+       tdb_add_flags(ctdb_db->ltdb->tdb, tdb_flags);
+
+
        /* set up a rb tree we can use to track which records we have a 
           fetch-lock in-flight for so we can defer any additional calls
           for the same record.
@@ -1012,6 +1005,25 @@ again:
                return -1;
        }
 
+       ret = ctdb_migration_init(ctdb_db);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR,
+                     ("Failed to setup migration tracking for db '%s'\n",
+                      ctdb_db->db_name));
+               talloc_free(ctdb_db);
+               return -1;
+       }
+
+       ret = db_hash_init(ctdb_db, "lock_log", 2048, DB_HASH_COMPLEX,
+                          &ctdb_db->lock_log);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR,
+                     ("Failed to setup lock logging for db '%s'\n",
+                      ctdb_db->db_name));
+               talloc_free(ctdb_db);
+               return -1;
+       }
+
        ctdb_db->generation = ctdb->vnn_map->generation;
 
        DEBUG(DEBUG_NOTICE,("Attached to database '%s' with flags 0x%x\n",
@@ -1025,7 +1037,7 @@ again:
 struct ctdb_deferred_attach_context {
        struct ctdb_deferred_attach_context *next, *prev;
        struct ctdb_context *ctdb;
-       struct ctdb_req_control *c;
+       struct ctdb_req_control_old *c;
 };
 
 
@@ -1036,7 +1048,9 @@ static int ctdb_deferred_attach_destructor(struct ctdb_deferred_attach_context *
        return 0;
 }
 
-static void ctdb_deferred_attach_timeout(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data)
+static void ctdb_deferred_attach_timeout(struct tevent_context *ev,
+                                        struct tevent_timer *te,
+                                        struct timeval t, void *private_data)
 {
        struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
        struct ctdb_context *ctdb = da_ctx->ctdb;
@@ -1045,7 +1059,9 @@ static void ctdb_deferred_attach_timeout(struct event_context *ev, struct timed_
        talloc_free(da_ctx);
 }
 
-static void ctdb_deferred_attach_callback(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data)
+static void ctdb_deferred_attach_callback(struct tevent_context *ev,
+                                         struct tevent_timer *te,
+                                         struct timeval t, void *private_data)
 {
        struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
        struct ctdb_context *ctdb = da_ctx->ctdb;
@@ -1064,7 +1080,9 @@ int ctdb_process_deferred_attach(struct ctdb_context *ctdb)
         */
        while ((da_ctx = ctdb->deferred_attach) != NULL) {
                DLIST_REMOVE(ctdb->deferred_attach, da_ctx);
-               event_add_timed(ctdb->ev, da_ctx, timeval_current_ofs(1,0), ctdb_deferred_attach_callback, da_ctx);
+               tevent_add_timer(ctdb->ev, da_ctx,
+                                timeval_current_ofs(1,0),
+                                ctdb_deferred_attach_callback, da_ctx);
        }
 
        return 0;
@@ -1073,17 +1091,20 @@ int ctdb_process_deferred_attach(struct ctdb_context *ctdb)
 /*
   a client has asked to attach a new database
  */
-int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
-                              TDB_DATA *outdata, uint64_t tdb_flags, 
-                              bool persistent, uint32_t client_id,
-                              struct ctdb_req_control *c,
+int32_t ctdb_control_db_attach(struct ctdb_context *ctdb,
+                              TDB_DATA indata,
+                              TDB_DATA *outdata,
+                              uint8_t db_flags,
+                              uint32_t srcnode,
+                              uint32_t client_id,
+                              struct ctdb_req_control_old *c,
                               bool *async_reply)
 {
        const char *db_name = (const char *)indata.dptr;
        struct ctdb_db_context *db;
        struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
        struct ctdb_client *client = NULL;
-       bool with_jenkinshash, with_mutexes;
+       uint32_t opcode;
 
        if (ctdb->tunable.allow_client_db_attach == 0) {
                DEBUG(DEBUG_ERR, ("DB Attach to database %s denied by tunable "
@@ -1091,12 +1112,12 @@ int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
                return -1;
        }
 
-       /* dont allow any local clients to attach while we are in recovery mode
+       /* don't allow any local clients to attach while we are in recovery mode
         * except for the recovery daemon.
         * allow all attach from the network since these are always from remote
         * recovery daemons.
         */
-       if (client_id != 0) {
+       if (srcnode == ctdb->pnn && client_id != 0) {
                client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
        }
        if (client != NULL) {
@@ -1124,7 +1145,9 @@ int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
                        talloc_set_destructor(da_ctx, ctdb_deferred_attach_destructor);
                        DLIST_ADD(ctdb->deferred_attach, da_ctx);
 
-                       event_add_timed(ctdb->ev, da_ctx, timeval_current_ofs(ctdb->tunable.deferred_attach_timeout, 0), ctdb_deferred_attach_timeout, da_ctx);
+                       tevent_add_timer(ctdb->ev, da_ctx,
+                                        timeval_current_ofs(ctdb->tunable.deferred_attach_timeout, 0),
+                                        ctdb_deferred_attach_timeout, da_ctx);
 
                        DEBUG(DEBUG_ERR,("DB Attach to database %s deferred for client with pid:%d since node is in recovery mode.\n", db_name, client->pid));
                        *async_reply = true;
@@ -1132,40 +1155,22 @@ int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
                }
        }
 
-       /* the client can optionally pass additional tdb flags, but we
-          only allow a subset of those on the database in ctdb. Note
-          that tdb_flags is passed in via the (otherwise unused)
-          srvid to the attach control */
-#ifdef TDB_MUTEX_LOCKING
-       tdb_flags &= (TDB_NOSYNC|TDB_INCOMPATIBLE_HASH|TDB_MUTEX_LOCKING|TDB_CLEAR_IF_FIRST);
-#else
-       tdb_flags &= (TDB_NOSYNC|TDB_INCOMPATIBLE_HASH);
-#endif
-
        /* see if we already have this name */
        db = ctdb_db_handle(ctdb, db_name);
        if (db) {
-               if (db->persistent != persistent) {
-                       DEBUG(DEBUG_ERR, ("ERROR: DB Attach %spersistent to %spersistent "
-                                         "database %s\n", persistent ? "" : "non-",
-                                         db-> persistent ? "" : "non-", db_name));
+               if ((db->db_flags & db_flags) != db_flags) {
+                       DEBUG(DEBUG_ERR,
+                             ("Error: Failed to re-attach with 0x%x flags,"
+                              " database has 0x%x flags\n", db_flags,
+                              db->db_flags));
                        return -1;
                }
                outdata->dptr  = (uint8_t *)&db->db_id;
                outdata->dsize = sizeof(db->db_id);
-               tdb_add_flags(db->ltdb->tdb, tdb_flags);
                return 0;
        }
 
-       with_jenkinshash = (tdb_flags & TDB_INCOMPATIBLE_HASH) ? true : false;
-#ifdef TDB_MUTEX_LOCKING
-       with_mutexes = (tdb_flags & TDB_MUTEX_LOCKING) ? true : false;
-#else
-       with_mutexes = false;
-#endif
-
-       if (ctdb_local_attach(ctdb, db_name, persistent, NULL,
-                             with_jenkinshash, with_mutexes) != 0) {
+       if (ctdb_local_attach(ctdb, db_name, db_flags, NULL) != 0) {
                return -1;
        }
 
@@ -1175,19 +1180,22 @@ int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
                return -1;
        }
 
-       /* remember the flags the client has specified */
-       tdb_add_flags(db->ltdb->tdb, tdb_flags);
-
        outdata->dptr  = (uint8_t *)&db->db_id;
        outdata->dsize = sizeof(db->db_id);
 
        /* Try to ensure it's locked in mem */
        lockdown_memory(ctdb->valgrinding);
 
+       if (ctdb_db_persistent(db)) {
+               opcode = CTDB_CONTROL_DB_ATTACH_PERSISTENT;
+       } else if (ctdb_db_replicated(db)) {
+               opcode = CTDB_CONTROL_DB_ATTACH_REPLICATED;
+       } else {
+               opcode = CTDB_CONTROL_DB_ATTACH;
+       }
+
        /* tell all the other nodes about this database */
-       ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, tdb_flags,
-                                persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:
-                                               CTDB_CONTROL_DB_ATTACH,
+       ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_CONNECTED, 0, opcode,
                                 0, CTDB_CTRL_FLAG_NOREPLY,
                                 indata, NULL, NULL);
 
@@ -1221,9 +1229,10 @@ int32_t ctdb_control_db_detach(struct ctdb_context *ctdb, TDB_DATA indata,
                return -1;
        }
 
-       if (ctdb_db->persistent) {
-               DEBUG(DEBUG_ERR, ("DB detach from persistent database %s "
-                                 "denied\n", ctdb_db->db_name));
+       if (! ctdb_db_volatile(ctdb_db)) {
+               DEBUG(DEBUG_ERR,
+                     ("Detaching non-volatile database %s denied\n",
+                      ctdb_db->db_name));
                return -1;
        }
 
@@ -1240,7 +1249,8 @@ int32_t ctdb_control_db_detach(struct ctdb_context *ctdb, TDB_DATA indata,
                client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
                if (client != NULL) {
                        /* forward the control to all the nodes */
-                       ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, 0,
+                       ctdb_daemon_send_control(ctdb,
+                                                CTDB_BROADCAST_CONNECTED, 0,
                                                 CTDB_CONTROL_DB_DETACH, 0,
                                                 CTDB_CTRL_FLAG_NOREPLY,
                                                 indata, NULL, NULL);
@@ -1277,7 +1287,7 @@ int32_t ctdb_control_db_detach(struct ctdb_context *ctdb, TDB_DATA indata,
        }
 
        /* Free readonly tracking database */
-       if (ctdb_db->readonly) {
+       if (ctdb_db_readonly(ctdb_db)) {
                talloc_free(ctdb_db->rottdb);
        }
 
@@ -1338,7 +1348,7 @@ static int ctdb_attach_persistent(struct ctdb_context *ctdb,
                }
                p[4] = 0;
 
-               if (ctdb_local_attach(ctdb, s, true, unhealthy_reason, false, false) != 0) {
+               if (ctdb_local_attach(ctdb, s, CTDB_DB_FLAGS_PERSISTENT, unhealthy_reason) != 0) {
                        DEBUG(DEBUG_ERR,("Failed to attach to persistent database '%s'\n", de->d_name));
                        closedir(d);
                        talloc_free(s);
@@ -1500,30 +1510,38 @@ int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint3
 }
 
 /*
-  timer to check for seqnum changes in a ltdb and propogate them
+  timer to check for seqnum changes in a ltdb and propagate them
  */
-static void ctdb_ltdb_seqnum_check(struct event_context *ev, struct timed_event *te, 
+static void ctdb_ltdb_seqnum_check(struct tevent_context *ev,
+                                  struct tevent_timer *te,
                                   struct timeval t, void *p)
 {
        struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
        struct ctdb_context *ctdb = ctdb_db->ctdb;
        uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
        if (new_seqnum != ctdb_db->seqnum) {
-               /* something has changed - propogate it */
+               /* something has changed - propagate it */
                TDB_DATA data;
                data.dptr = (uint8_t *)&ctdb_db->db_id;
                data.dsize = sizeof(uint32_t);
-               ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
-                                        CTDB_CONTROL_UPDATE_SEQNUM, 0, CTDB_CTRL_FLAG_NOREPLY,
-                                        data, NULL, NULL);             
+               ctdb_daemon_send_control(ctdb,
+                                        CTDB_BROADCAST_ACTIVE,
+                                        0,
+                                        CTDB_CONTROL_UPDATE_SEQNUM,
+                                        0,
+                                        CTDB_CTRL_FLAG_NOREPLY,
+                                        data,
+                                        NULL,
+                                        NULL);
        }
        ctdb_db->seqnum = new_seqnum;
 
        /* setup a new timer */
        ctdb_db->seqnum_update =
-               event_add_timed(ctdb->ev, ctdb_db, 
-                               timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
-                               ctdb_ltdb_seqnum_check, ctdb_db);
+               tevent_add_timer(ctdb->ev, ctdb_db,
+                                timeval_current_ofs(ctdb->tunable.seqnum_interval/1000,
+                                                    (ctdb->tunable.seqnum_interval%1000)*1000),
+                                ctdb_ltdb_seqnum_check, ctdb_db);
 }
 
 /*
@@ -1539,10 +1557,11 @@ int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
        }
 
        if (ctdb_db->seqnum_update == NULL) {
-               ctdb_db->seqnum_update =
-                       event_add_timed(ctdb->ev, ctdb_db, 
-                                       timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
-                                       ctdb_ltdb_seqnum_check, ctdb_db);
+               ctdb_db->seqnum_update = tevent_add_timer(
+                       ctdb->ev, ctdb_db,
+                       timeval_current_ofs(ctdb->tunable.seqnum_interval/1000,
+                                           (ctdb->tunable.seqnum_interval%1000)*1000),
+                       ctdb_ltdb_seqnum_check, ctdb_db);
        }
 
        tdb_enable_seqnum(ctdb_db->ltdb->tdb);
@@ -1550,54 +1569,21 @@ int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
        return 0;
 }
 
-int32_t ctdb_control_set_db_priority(struct ctdb_context *ctdb, TDB_DATA indata,
-                                    uint32_t client_id)
-{
-       struct ctdb_db_priority *db_prio = (struct ctdb_db_priority *)indata.dptr;
-       struct ctdb_db_context *ctdb_db;
-
-       ctdb_db = find_ctdb_db(ctdb, db_prio->db_id);
-       if (!ctdb_db) {
-               if (!(ctdb->nodes[ctdb->pnn]->flags & NODE_FLAGS_INACTIVE)) {
-                       DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_set_db_priority\n",
-                                        db_prio->db_id));
-               }
-               return 0;
-       }
-
-       if ((db_prio->priority<1) || (db_prio->priority>NUM_DB_PRIORITIES)) {
-               DEBUG(DEBUG_ERR,("Trying to set invalid priority : %u\n", db_prio->priority));
-               return 0;
-       }
-
-       ctdb_db->priority = db_prio->priority;
-       DEBUG(DEBUG_INFO,("Setting DB priority to %u for db 0x%08x\n", db_prio->priority, db_prio->db_id));
-
-       if (client_id != 0) {
-               /* Broadcast the update to the rest of the cluster */
-               ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, 0,
-                                        CTDB_CONTROL_SET_DB_PRIORITY, 0,
-                                        CTDB_CTRL_FLAG_NOREPLY, indata,
-                                        NULL, NULL);
-       }
-       return 0;
-}
-
-
 int ctdb_set_db_sticky(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
 {
-       if (ctdb_db->sticky) {
+       if (ctdb_db_sticky(ctdb_db)) {
                return 0;
        }
 
-       if (ctdb_db->persistent) {
-               DEBUG(DEBUG_ERR,("Trying to set persistent database with sticky property\n"));
+       if (! ctdb_db_volatile(ctdb_db)) {
+               DEBUG(DEBUG_ERR,
+                     ("Non-volatile databases do not support sticky flag\n"));
                return -1;
        }
 
        ctdb_db->sticky_records = trbt_create(ctdb_db, 0);
 
-       ctdb_db->sticky = true;
+       ctdb_db_set_sticky(ctdb_db);
 
        DEBUG(DEBUG_NOTICE,("set db sticky %s\n", ctdb_db->db_name));
 
@@ -1606,7 +1592,7 @@ int ctdb_set_db_sticky(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_d
 
 void ctdb_db_statistics_reset(struct ctdb_db_context *ctdb_db)
 {
-       struct ctdb_db_statistics *s = &ctdb_db->statistics;
+       struct ctdb_db_statistics_old *s = &ctdb_db->statistics;
        int i;
 
        for (i=0; i<MAX_HOT_KEYS; i++) {
@@ -1623,7 +1609,7 @@ int32_t ctdb_control_get_db_statistics(struct ctdb_context *ctdb,
                                TDB_DATA *outdata)
 {
        struct ctdb_db_context *ctdb_db;
-       struct ctdb_db_statistics *stats;
+       struct ctdb_db_statistics_old *stats;
        int i;
        int len;
        char *ptr;
@@ -1634,7 +1620,7 @@ int32_t ctdb_control_get_db_statistics(struct ctdb_context *ctdb,
                return -1;
        }
 
-       len = offsetof(struct ctdb_db_statistics, hot_keys_wire);
+       len = offsetof(struct ctdb_db_statistics_old, hot_keys_wire);
        for (i = 0; i < MAX_HOT_KEYS; i++) {
                len += ctdb_db->statistics.hot_keys[i].key.dsize;
        }
@@ -1646,7 +1632,7 @@ int32_t ctdb_control_get_db_statistics(struct ctdb_context *ctdb,
        }
 
        memcpy(stats, &ctdb_db->statistics,
-              offsetof(struct ctdb_db_statistics, hot_keys_wire));
+              offsetof(struct ctdb_db_statistics_old, hot_keys_wire));
 
        stats->num_hot_keys = MAX_HOT_KEYS;