ctdb-daemon: Use correct tdb flags when enabling robust mutex support
[obnox/samba/samba-obnox.git] / ctdb / client / ctdb_client.c
index 9c1c27d0f4bb08376feb51fd8d6daf5807bd74fb..da18826fa59d12e64ef5b7231e54c44047ae77ab 100644 (file)
@@ -19,7 +19,7 @@
 */
 
 #include "includes.h"
-#include "db_wrap.h"
+#include "lib/tdb_wrap/tdb_wrap.h"
 #include "tdb.h"
 #include "lib/util/dlinklist.h"
 #include "system/network.h"
@@ -29,8 +29,6 @@
 #include "../include/ctdb_private.h"
 #include "lib/util/dlinklist.h"
 
-pid_t ctdbd_pid;
-
 /*
   allocate a packet for use in client<->daemon communication
  */
@@ -56,7 +54,7 @@ struct ctdb_req_header *_ctdbd_allocate_pkt(struct ctdb_context *ctdb,
        hdr->length       = length;
        hdr->operation    = operation;
        hdr->ctdb_magic   = CTDB_MAGIC;
-       hdr->ctdb_version = CTDB_VERSION;
+       hdr->ctdb_version = CTDB_PROTOCOL;
        hdr->srcnode      = ctdb->pnn;
        if (ctdb->vnn_map) {
                hdr->generation = ctdb->vnn_map->generation;
@@ -218,7 +216,7 @@ void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
                goto done;
        }
 
-       if (hdr->ctdb_version != CTDB_VERSION) {
+       if (hdr->ctdb_version != CTDB_PROTOCOL) {
                ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected in client\n", hdr->ctdb_version);
                goto done;
        }
@@ -253,7 +251,7 @@ int ctdb_socket_connect(struct ctdb_context *ctdb)
 
        memset(&addr, 0, sizeof(addr));
        addr.sun_family = AF_UNIX;
-       strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
+       strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path)-1);
 
        ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
        if (ctdb->daemon.sd == -1) {
@@ -709,6 +707,21 @@ again:
                goto again;
        }
 
+       /* if this is a request for read/write and we have delegations
+          we have to revoke all delegations first
+       */
+       if ((h->header.dmaster == ctdb_db->ctdb->pnn) &&
+           (h->header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
+               ctdb_ltdb_unlock(ctdb_db, key);
+               ret = ctdb_client_force_migration(ctdb_db, key);
+               if (ret != 0) {
+                       DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
+                       talloc_free(h);
+                       return NULL;
+               }
+               goto again;
+       }
+
        DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: we are dmaster - done\n"));
        return h;
 }
@@ -1913,6 +1926,12 @@ int ctdb_ctrl_createdb(struct ctdb_context *ctdb, struct timeval timeout, uint32
                tdb_flags = TDB_INCOMPATIBLE_HASH;
        }
 
+#ifdef TDB_MUTEX_LOCKING
+       if (!persistent && ctdb->tunable.mutex_enabled == 1) {
+               tdb_flags |= (TDB_MUTEX_LOCKING | TDB_CLEAR_IF_FIRST);
+       }
+#endif
+
        ret = ctdb_control(ctdb, destnode, tdb_flags,
                           persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH, 
                           0, data, 
@@ -2036,6 +2055,9 @@ struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb,
        TDB_DATA data;
        int ret;
        int32_t res;
+#ifdef TDB_MUTEX_LOCKING
+       uint32_t mutex_enabled = 0;
+#endif
 
        ctdb_db = ctdb_db_handle(ctdb, name);
        if (ctdb_db) {
@@ -2060,6 +2082,22 @@ struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb,
                tdb_flags |= TDB_INCOMPATIBLE_HASH;
        }
 
+#ifdef TDB_MUTEX_LOCKING
+       if (!persistent) {
+               ret = ctdb_ctrl_get_tunable(ctdb, timeval_current_ofs(3,0),
+                                           CTDB_CURRENT_NODE,
+                                           "TDBMutexEnabled",
+                                           &mutex_enabled);
+               if (ret != 0) {
+                       DEBUG(DEBUG_WARNING, ("Assuming no mutex support.\n"));
+               }
+
+               if (mutex_enabled == 1) {
+                       tdb_flags |= (TDB_MUTEX_LOCKING | TDB_CLEAR_IF_FIRST);
+               }
+       }
+#endif
+
        /* tell ctdb daemon to attach */
        ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, tdb_flags, 
                           persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH,
@@ -2080,13 +2118,23 @@ struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb,
                return NULL;
        }
 
-       tdb_flags = persistent?TDB_DEFAULT:TDB_NOSYNC;
+       if (persistent) {
+               tdb_flags = TDB_DEFAULT;
+       } else {
+               tdb_flags = TDB_NOSYNC;
+#ifdef TDB_MUTEX_LOCKING
+               if (mutex_enabled) {
+                       tdb_flags |= (TDB_MUTEX_LOCKING | TDB_CLEAR_IF_FIRST);
+               }
+#endif
+       }
        if (ctdb->valgrinding) {
                tdb_flags |= TDB_NOMMAP;
        }
        tdb_flags |= TDB_DISALLOW_NESTING;
 
-       ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 0, tdb_flags, O_RDWR, 0);
+       ctdb_db->ltdb = tdb_wrap_open(ctdb_db, ctdb_db->db_path, 0, tdb_flags,
+                                     O_RDWR, 0);
        if (ctdb_db->ltdb == NULL) {
                ctdb_set_error(ctdb, "Failed to open tdb '%s'\n", ctdb_db->db_path);
                talloc_free(ctdb_db);
@@ -2105,6 +2153,25 @@ struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb,
        return ctdb_db;
 }
 
+/*
+ * detach from a specific database - client call
+ */
+int ctdb_detach(struct ctdb_context *ctdb, uint32_t db_id)
+{
+       int ret;
+       int32_t status;
+       TDB_DATA data;
+
+       data.dsize = sizeof(db_id);
+       data.dptr = (uint8_t *)&db_id;
+
+       ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_DB_DETACH,
+                          0, data, NULL, NULL, &status, NULL, NULL);
+       if (ret != 0 || status != 0) {
+               return -1;
+       }
+       return 0;
+}
 
 /*
   setup a call for a database
@@ -3330,7 +3397,7 @@ struct ctdb_context *ctdb_init(struct event_context *ev)
        ctdb->lastid = INT_MAX-200;
        CTDB_NO_MEMORY_NULL(ctdb, ctdb->idr);
 
-       ret = ctdb_set_socketname(ctdb, CTDB_PATH);
+       ret = ctdb_set_socketname(ctdb, CTDB_SOCKET);
        if (ret != 0) {
                DEBUG(DEBUG_ERR,(__location__ " ctdb_set_socketname failed.\n"));
                talloc_free(ctdb);
@@ -3444,6 +3511,9 @@ static void async_callback(struct ctdb_client_control_state *state)
        int32_t res = -1;
        uint32_t destnode = state->c->hdr.destnode;
 
+       outdata.dsize = 0;
+       outdata.dptr = NULL;
+
        /* one more node has responded with recmode data */
        data->count--;
 
@@ -3750,7 +3820,10 @@ static struct server_id server_id_get(struct ctdb_context *ctdb, uint32_t reqid)
        return id;
 }
 
-static bool server_id_equal(struct server_id *id1, struct server_id *id2)
+/* This is basically a copy from Samba's server_id.*.  However, a
+ * dependency chain stops us from using Samba's version, so use a
+ * renamed copy until a better solution is found. */
+static bool ctdb_server_id_equal(struct server_id *id1, struct server_id *id2)
 {
        if (id1->pid != id2->pid) {
                return false;
@@ -3827,7 +3900,7 @@ static bool g_lock_parse(TALLOC_CTX *mem_ctx, TDB_DATA data,
 
        if (data.dsize % sizeof(struct g_lock_rec) != 0) {
                DEBUG(DEBUG_ERR, (__location__ "invalid data size %lu in g_lock record\n",
-                                 data.dsize));
+                                 (unsigned long)data.dsize));
                talloc_free(recs);
                return false;
        }
@@ -3856,10 +3929,20 @@ static bool g_lock_lock(TALLOC_CTX *mem_ctx,
        struct ctdb_record_handle *h;
        struct g_lock_recs *locks;
        struct server_id id;
+       struct timeval t_start;
        int i;
 
        key.dptr = (uint8_t *)discard_const(keyname);
        key.dsize = strlen(keyname) + 1;
+
+       t_start = timeval_current();
+
+again:
+       /* Keep trying for an hour. */
+       if (timeval_elapsed(&t_start) > 3600) {
+               return false;
+       }
+
        h = ctdb_fetch_lock(ctdb_db, mem_ctx, key, &data);
        if (h == NULL) {
                return false;
@@ -3878,7 +3961,7 @@ static bool g_lock_lock(TALLOC_CTX *mem_ctx,
 
        i = 0;
        while (i < locks->num) {
-               if (server_id_equal(&locks->lock[i].id, &id)) {
+               if (ctdb_server_id_equal(&locks->lock[i].id, &id)) {
                        /* Internal error */
                        talloc_free(h);
                        return false;
@@ -3899,7 +3982,7 @@ static bool g_lock_lock(TALLOC_CTX *mem_ctx,
                                   id.task_id, id.vnn,
                                   (unsigned long long)id.unique_id));
                talloc_free(h);
-               return false;
+               goto again;
        }
 
        locks->lock = talloc_realloc(locks, locks->lock, struct g_lock_rec,
@@ -3966,7 +4049,7 @@ static bool g_lock_unlock(TALLOC_CTX *mem_ctx,
        id = server_id_get(ctdb_db->ctdb, reqid);
 
        for (i=0; i<locks->num; i++) {
-               if (server_id_equal(&locks->lock[i].id, &id)) {
+               if (ctdb_server_id_equal(&locks->lock[i].id, &id)) {
                        if (i < locks->num-1) {
                                locks->lock[i] = locks->lock[locks->num-1];
                        }
@@ -4173,6 +4256,11 @@ static int ctdb_fetch_db_seqnum(struct ctdb_db_context *ctdb_db, uint64_t *seqnu
                return 0;
        }
 
+       if (data.dsize == 0) {
+               *seqnum = 0;
+               return 0;
+       }
+
        if (data.dsize != sizeof(*seqnum)) {
                DEBUG(DEBUG_ERR, (__location__ " Invalid data recived len=%zi\n",
                                  data.dsize));