ctdb: Make TDB_SEQNUM work synchronously with ctdb
[samba.git] / ctdb / server / ctdb_ltdb_server.c
index 57611e29fbcc0957a8bf36d6facc2b47e3cfec49..8cc6c4ba4cc6b9e0de86e0a0f781f6cd83f3bac5 100644 (file)
@@ -41,6 +41,8 @@
 #include "common/common.h"
 #include "common/logging.h"
 
+#include "server/ctdb_config.h"
+
 #define PERSISTENT_HEALTH_TDB "persistent_health.tdb"
 
 /**
@@ -60,7 +62,6 @@ static int ctdb_ltdb_store_server(struct ctdb_db_context *ctdb_db,
        TDB_DATA rec[2];
        uint32_t hsize = sizeof(struct ctdb_ltdb_header);
        int ret;
-       bool seqnum_suppressed = false;
        bool keep = false;
        bool schedule_for_deletion = false;
        bool remove_from_delete_queue = false;
@@ -190,22 +191,6 @@ store:
        rec[1].dsize = data.dsize;
        rec[1].dptr = data.dptr;
 
-       /* Databases with seqnum updates enabled only get their seqnum
-          changes when/if we modify the data */
-       if (ctdb_db->seqnum_update != NULL) {
-               TDB_DATA old;
-               old = tdb_fetch(ctdb_db->ltdb->tdb, key);
-
-               if ((old.dsize == hsize + data.dsize) &&
-                   memcmp(old.dptr + hsize, data.dptr, data.dsize) == 0) {
-                       tdb_remove_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
-                       seqnum_suppressed = true;
-               }
-               if (old.dptr != NULL) {
-                       free(old.dptr);
-               }
-       }
-
        DEBUG(DEBUG_DEBUG, (__location__ " db[%s]: %s record: hash[0x%08x]\n",
                            ctdb_db->db_name,
                            keep?"storing":"deleting",
@@ -235,9 +220,6 @@ store:
                schedule_for_deletion = false;
                remove_from_delete_queue = false;
        }
-       if (seqnum_suppressed) {
-               tdb_add_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
-       }
 
        if (schedule_for_deletion) {
                int ret2;
@@ -371,14 +353,17 @@ int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db,
 
        ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt, 
                                     recv_context, ignore_generation);
-       if (ret == 0) {
-               ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
-               if (ret != 0) {
-                       int uret;
-                       uret = ctdb_ltdb_unlock(ctdb_db, key);
-                       if (uret != 0) {
-                               DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", uret));
-                       }
+       if (ret != 0) {
+               return ret;
+       }
+
+       ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
+       if (ret != 0) {
+               int uret;
+               uret = ctdb_ltdb_unlock(ctdb_db, key);
+               if (uret != 0) {
+                       DBG_ERR("ctdb_ltdb_unlock() failed with error %d\n",
+                               uret);
                }
        }
        return ret;
@@ -386,7 +371,7 @@ int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db,
 
 
 /*
-  paraoid check to see if the db is empty
+  paranoid check to see if the db is empty
  */
 static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
 {
@@ -758,7 +743,7 @@ int ctdb_set_db_readonly(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb
   return 0 on success, -1 on failure
  */
 static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
-                            bool persistent, const char *unhealthy_reason)
+                            uint8_t db_flags, const char *unhealthy_reason)
 {
        struct ctdb_db_context *ctdb_db, *tmp_db;
        int ret;
@@ -766,7 +751,6 @@ static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
        int tdb_flags;
        int mode = 0600;
        int remaining_tries = 0;
-       uint8_t db_flags = 0;
 
        ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
        CTDB_NO_MEMORY(ctdb, ctdb_db);
@@ -778,7 +762,7 @@ static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
        key.dsize = strlen(db_name)+1;
        key.dptr  = discard_const(db_name);
        ctdb_db->db_id = ctdb_hash(&key);
-       ctdb_db->persistent = persistent;
+       ctdb_db->db_flags = db_flags;
 
        if (ctdb_db_volatile(ctdb_db)) {
                ctdb_db->delete_queue = trbt_create(ctdb_db, 0);
@@ -799,7 +783,7 @@ static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
                }
        }
 
-       if (persistent) {
+       if (ctdb_db_persistent(ctdb_db)) {
                if (unhealthy_reason) {
                        ret = ctdb_update_persistent_health(ctdb, ctdb_db,
                                                            unhealthy_reason, 0);
@@ -841,16 +825,15 @@ static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
        }
 
        /* open the database */
-       ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u", 
-                                          persistent?ctdb->db_directory_persistent:ctdb->db_directory, 
+       ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u",
+                                          ctdb_db_persistent(ctdb_db) ?
+                                               ctdb->db_directory_persistent :
+                                               ctdb->db_directory,
                                           db_name, ctdb->pnn);
 
-       if (persistent) {
-               db_flags = CTDB_DB_FLAGS_PERSISTENT;
-       }
-
-       tdb_flags = ctdb_db_tdb_flags(db_flags, ctdb->valgrinding,
-                                     ctdb->tunable.mutex_enabled);
+       tdb_flags = ctdb_db_tdb_flags(db_flags,
+                                     ctdb->valgrinding,
+                                     ctdb_config.tdb_mutexes);
 
 again:
        ctdb_db->ltdb = tdb_wrap_open(ctdb_db, ctdb_db->db_path,
@@ -861,7 +844,7 @@ again:
                struct stat st;
                int saved_errno = errno;
 
-               if (!persistent) {
+               if (! ctdb_db_persistent(ctdb_db)) {
                        DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
                                          ctdb_db->db_path,
                                          saved_errno,
@@ -907,7 +890,7 @@ again:
                goto again;
        }
 
-       if (!persistent) {
+       if (!ctdb_db_persistent(ctdb_db)) {
                ctdb_check_db_empty(ctdb_db);
        } else {
                ret = tdb_check(ctdb_db->ltdb->tdb, NULL, NULL);
@@ -1108,9 +1091,12 @@ int ctdb_process_deferred_attach(struct ctdb_context *ctdb)
 /*
   a client has asked to attach a new database
  */
-int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
+int32_t ctdb_control_db_attach(struct ctdb_context *ctdb,
+                              TDB_DATA indata,
                               TDB_DATA *outdata,
-                              bool persistent, uint32_t client_id,
+                              uint8_t db_flags,
+                              uint32_t srcnode,
+                              uint32_t client_id,
                               struct ctdb_req_control_old *c,
                               bool *async_reply)
 {
@@ -1118,6 +1104,7 @@ int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
        struct ctdb_db_context *db;
        struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
        struct ctdb_client *client = NULL;
+       uint32_t opcode;
 
        if (ctdb->tunable.allow_client_db_attach == 0) {
                DEBUG(DEBUG_ERR, ("DB Attach to database %s denied by tunable "
@@ -1130,7 +1117,7 @@ int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
         * allow all attach from the network since these are always from remote
         * recovery daemons.
         */
-       if (client_id != 0) {
+       if (srcnode == ctdb->pnn && client_id != 0) {
                client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
        }
        if (client != NULL) {
@@ -1171,10 +1158,11 @@ int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
        /* see if we already have this name */
        db = ctdb_db_handle(ctdb, db_name);
        if (db) {
-               if (ctdb_db_persistent(db) != persistent) {
-                       DEBUG(DEBUG_ERR, ("ERROR: DB Attach %spersistent to %spersistent "
-                                         "database %s\n", persistent ? "" : "non-",
-                                         ctdb_db_persistent(db) ? "" : "non-", db_name));
+               if ((db->db_flags & db_flags) != db_flags) {
+                       DEBUG(DEBUG_ERR,
+                             ("Error: Failed to re-attach with 0x%x flags,"
+                              " database has 0x%x flags\n", db_flags,
+                              db->db_flags));
                        return -1;
                }
                outdata->dptr  = (uint8_t *)&db->db_id;
@@ -1182,7 +1170,7 @@ int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
                return 0;
        }
 
-       if (ctdb_local_attach(ctdb, db_name, persistent, NULL) != 0) {
+       if (ctdb_local_attach(ctdb, db_name, db_flags, NULL) != 0) {
                return -1;
        }
 
@@ -1198,10 +1186,16 @@ int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
        /* Try to ensure it's locked in mem */
        lockdown_memory(ctdb->valgrinding);
 
+       if (ctdb_db_persistent(db)) {
+               opcode = CTDB_CONTROL_DB_ATTACH_PERSISTENT;
+       } else if (ctdb_db_replicated(db)) {
+               opcode = CTDB_CONTROL_DB_ATTACH_REPLICATED;
+       } else {
+               opcode = CTDB_CONTROL_DB_ATTACH;
+       }
+
        /* tell all the other nodes about this database */
-       ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, 0,
-                                persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:
-                                               CTDB_CONTROL_DB_ATTACH,
+       ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_CONNECTED, 0, opcode,
                                 0, CTDB_CTRL_FLAG_NOREPLY,
                                 indata, NULL, NULL);
 
@@ -1255,7 +1249,8 @@ int32_t ctdb_control_db_detach(struct ctdb_context *ctdb, TDB_DATA indata,
                client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
                if (client != NULL) {
                        /* forward the control to all the nodes */
-                       ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, 0,
+                       ctdb_daemon_send_control(ctdb,
+                                                CTDB_BROADCAST_CONNECTED, 0,
                                                 CTDB_CONTROL_DB_DETACH, 0,
                                                 CTDB_CTRL_FLAG_NOREPLY,
                                                 indata, NULL, NULL);
@@ -1353,7 +1348,7 @@ static int ctdb_attach_persistent(struct ctdb_context *ctdb,
                }
                p[4] = 0;
 
-               if (ctdb_local_attach(ctdb, s, true, unhealthy_reason) != 0) {
+               if (ctdb_local_attach(ctdb, s, CTDB_DB_FLAGS_PERSISTENT, unhealthy_reason) != 0) {
                        DEBUG(DEBUG_ERR,("Failed to attach to persistent database '%s'\n", de->d_name));
                        closedir(d);
                        talloc_free(s);
@@ -1515,7 +1510,7 @@ int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint3
 }
 
 /*
-  timer to check for seqnum changes in a ltdb and propogate them
+  timer to check for seqnum changes in a ltdb and propagate them
  */
 static void ctdb_ltdb_seqnum_check(struct tevent_context *ev,
                                   struct tevent_timer *te,
@@ -1525,13 +1520,19 @@ static void ctdb_ltdb_seqnum_check(struct tevent_context *ev,
        struct ctdb_context *ctdb = ctdb_db->ctdb;
        uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
        if (new_seqnum != ctdb_db->seqnum) {
-               /* something has changed - propogate it */
+               /* something has changed - propagate it */
                TDB_DATA data;
                data.dptr = (uint8_t *)&ctdb_db->db_id;
                data.dsize = sizeof(uint32_t);
-               ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
-                                        CTDB_CONTROL_UPDATE_SEQNUM, 0, CTDB_CTRL_FLAG_NOREPLY,
-                                        data, NULL, NULL);             
+               ctdb_daemon_send_control(ctdb,
+                                        CTDB_BROADCAST_ACTIVE,
+                                        0,
+                                        CTDB_CONTROL_UPDATE_SEQNUM,
+                                        0,
+                                        CTDB_CTRL_FLAG_NOREPLY,
+                                        data,
+                                        NULL,
+                                        NULL);
        }
        ctdb_db->seqnum = new_seqnum;