ctdb-daemon: Use correct tdb flags when enabling robust mutex support
[obnox/samba/samba-obnox.git] / ctdb / client / ctdb_client.c
index 57dc9ba0025d22f46a5ab925371e2135f4926544..da18826fa59d12e64ef5b7231e54c44047ae77ab 100644 (file)
@@ -19,7 +19,7 @@
 */
 
 #include "includes.h"
-#include "db_wrap.h"
+#include "lib/tdb_wrap/tdb_wrap.h"
 #include "tdb.h"
 #include "lib/util/dlinklist.h"
 #include "system/network.h"
@@ -29,8 +29,6 @@
 #include "../include/ctdb_private.h"
 #include "lib/util/dlinklist.h"
 
-pid_t ctdbd_pid;
-
 /*
   allocate a packet for use in client<->daemon communication
  */
@@ -56,7 +54,7 @@ struct ctdb_req_header *_ctdbd_allocate_pkt(struct ctdb_context *ctdb,
        hdr->length       = length;
        hdr->operation    = operation;
        hdr->ctdb_magic   = CTDB_MAGIC;
-       hdr->ctdb_version = CTDB_VERSION;
+       hdr->ctdb_version = CTDB_PROTOCOL;
        hdr->srcnode      = ctdb->pnn;
        if (ctdb->vnn_map) {
                hdr->generation = ctdb->vnn_map->generation;
@@ -218,7 +216,7 @@ void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
                goto done;
        }
 
-       if (hdr->ctdb_version != CTDB_VERSION) {
+       if (hdr->ctdb_version != CTDB_PROTOCOL) {
                ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected in client\n", hdr->ctdb_version);
                goto done;
        }
@@ -253,7 +251,7 @@ int ctdb_socket_connect(struct ctdb_context *ctdb)
 
        memset(&addr, 0, sizeof(addr));
        addr.sun_family = AF_UNIX;
-       strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
+       strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path)-1);
 
        ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
        if (ctdb->daemon.sd == -1) {
@@ -511,6 +509,42 @@ int ctdb_client_remove_message_handler(struct ctdb_context *ctdb, uint64_t srvid
        return 0;
 }
 
+/*
+ * check server ids
+ */
+int ctdb_client_check_message_handlers(struct ctdb_context *ctdb, uint64_t *ids, uint32_t num,
+                                      uint8_t *result)
+{
+       TDB_DATA indata, outdata;
+       int res;
+       int32_t status;
+       int i;
+
+       indata.dptr = (uint8_t *)ids;
+       indata.dsize = num * sizeof(*ids);
+
+       res = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_CHECK_SRVIDS, 0,
+                          indata, ctdb, &outdata, &status, NULL, NULL);
+       if (res != 0 || status != 0) {
+               DEBUG(DEBUG_ERR, (__location__ " failed to check srvids\n"));
+               return -1;
+       }
+
+       if (outdata.dsize != num*sizeof(uint8_t)) {
+               DEBUG(DEBUG_ERR, (__location__ " expected %lu bytes, received %zi bytes\n",
+                                 (long unsigned int)num*sizeof(uint8_t),
+                                 outdata.dsize));
+               talloc_free(outdata.dptr);
+               return -1;
+       }
+
+       for (i=0; i<num; i++) {
+               result[i] = outdata.dptr[i];
+       }
+
+       talloc_free(outdata.dptr);
+       return 0;
+}
 
 /*
   send a message - from client context
@@ -673,6 +707,21 @@ again:
                goto again;
        }
 
+       /* if this is a request for read/write and we have delegations
+          we have to revoke all delegations first
+       */
+       if ((h->header.dmaster == ctdb_db->ctdb->pnn) &&
+           (h->header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
+               ctdb_ltdb_unlock(ctdb_db, key);
+               ret = ctdb_client_force_migration(ctdb_db, key);
+               if (ret != 0) {
+                       DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
+                       talloc_free(h);
+                       return NULL;
+               }
+               goto again;
+       }
+
        DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: we are dmaster - done\n"));
        return h;
 }
@@ -1197,6 +1246,61 @@ int ctdb_ctrl_statistics(struct ctdb_context *ctdb, uint32_t destnode, struct ct
        return 0;
 }
 
+/*
+ * get db statistics
+ */
+int ctdb_ctrl_dbstatistics(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid,
+                          TALLOC_CTX *mem_ctx, struct ctdb_db_statistics **dbstat)
+{
+       int ret;
+       TDB_DATA indata, outdata;
+       int32_t res;
+       struct ctdb_db_statistics *wire, *s;
+       char *ptr;
+       int i;
+
+       indata.dptr = (uint8_t *)&dbid;
+       indata.dsize = sizeof(dbid);
+
+       ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DB_STATISTICS,
+                          0, indata, ctdb, &outdata, &res, NULL, NULL);
+       if (ret != 0 || res != 0) {
+               DEBUG(DEBUG_ERR,(__location__ " ctdb_control for dbstatistics failed\n"));
+               return -1;
+       }
+
+       if (outdata.dsize < offsetof(struct ctdb_db_statistics, hot_keys_wire)) {
+               DEBUG(DEBUG_ERR,(__location__ " Wrong dbstatistics size %zi - expected >= %lu\n",
+                                outdata.dsize,
+                                (long unsigned int)sizeof(struct ctdb_statistics)));
+               return -1;
+       }
+
+       s = talloc_zero(mem_ctx, struct ctdb_db_statistics);
+       if (s == NULL) {
+               talloc_free(outdata.dptr);
+               CTDB_NO_MEMORY(ctdb, s);
+       }
+
+       wire = (struct ctdb_db_statistics *)outdata.dptr;
+       *s = *wire;
+       ptr = &wire->hot_keys_wire[0];
+       for (i=0; i<wire->num_hot_keys; i++) {
+               s->hot_keys[i].key.dptr = talloc_size(mem_ctx, s->hot_keys[i].key.dsize);
+               if (s->hot_keys[i].key.dptr == NULL) {
+                       talloc_free(outdata.dptr);
+                       CTDB_NO_MEMORY(ctdb, s->hot_keys[i].key.dptr);
+               }
+
+               memcpy(s->hot_keys[i].key.dptr, ptr, s->hot_keys[i].key.dsize);
+               ptr += wire->hot_keys[i].key.dsize;
+       }
+
+       talloc_free(outdata.dptr);
+       *dbstat = s;
+       return 0;
+}
+
 /*
   shutdown a remote ctdb node
  */
@@ -1769,6 +1873,40 @@ int ctdb_ctrl_getdbhealth(struct ctdb_context *ctdb,
        return 0;
 }
 
+/*
+ * get db sequence number
+ */
+int ctdb_ctrl_getdbseqnum(struct ctdb_context *ctdb, struct timeval timeout,
+                         uint32_t destnode, uint32_t dbid, uint64_t *seqnum)
+{
+       int ret;
+       int32_t res;
+       TDB_DATA data, outdata;
+
+       data.dptr = (uint8_t *)&dbid;
+       data.dsize = sizeof(uint64_t);  /* This is just wrong */
+
+       ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DB_SEQNUM,
+                          0, data, ctdb, &outdata, &res, &timeout, NULL);
+       if (ret != 0 || res != 0) {
+               DEBUG(DEBUG_ERR,("ctdb_control for getdbesqnum failed\n"));
+               return -1;
+       }
+
+       if (outdata.dsize != sizeof(uint64_t)) {
+               DEBUG(DEBUG_ERR,("Invalid return data in get_dbseqnum\n"));
+               talloc_free(outdata.dptr);
+               return -1;
+       }
+
+       if (seqnum != NULL) {
+               *seqnum = *(uint64_t *)outdata.dptr;
+       }
+       talloc_free(outdata.dptr);
+
+       return 0;
+}
+
 /*
   create a database
  */
@@ -1788,6 +1926,12 @@ int ctdb_ctrl_createdb(struct ctdb_context *ctdb, struct timeval timeout, uint32
                tdb_flags = TDB_INCOMPATIBLE_HASH;
        }
 
+#ifdef TDB_MUTEX_LOCKING
+       if (!persistent && ctdb->tunable.mutex_enabled == 1) {
+               tdb_flags |= (TDB_MUTEX_LOCKING | TDB_CLEAR_IF_FIRST);
+       }
+#endif
+
        ret = ctdb_control(ctdb, destnode, tdb_flags,
                           persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH, 
                           0, data, 
@@ -1911,6 +2055,9 @@ struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb,
        TDB_DATA data;
        int ret;
        int32_t res;
+#ifdef TDB_MUTEX_LOCKING
+       uint32_t mutex_enabled = 0;
+#endif
 
        ctdb_db = ctdb_db_handle(ctdb, name);
        if (ctdb_db) {
@@ -1935,6 +2082,22 @@ struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb,
                tdb_flags |= TDB_INCOMPATIBLE_HASH;
        }
 
+#ifdef TDB_MUTEX_LOCKING
+       if (!persistent) {
+               ret = ctdb_ctrl_get_tunable(ctdb, timeval_current_ofs(3,0),
+                                           CTDB_CURRENT_NODE,
+                                           "TDBMutexEnabled",
+                                           &mutex_enabled);
+               if (ret != 0) {
+                       DEBUG(DEBUG_WARNING, ("Assuming no mutex support.\n"));
+               }
+
+               if (mutex_enabled == 1) {
+                       tdb_flags |= (TDB_MUTEX_LOCKING | TDB_CLEAR_IF_FIRST);
+               }
+       }
+#endif
+
        /* tell ctdb daemon to attach */
        ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, tdb_flags, 
                           persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH,
@@ -1955,13 +2118,23 @@ struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb,
                return NULL;
        }
 
-       tdb_flags = persistent?TDB_DEFAULT:TDB_NOSYNC;
+       if (persistent) {
+               tdb_flags = TDB_DEFAULT;
+       } else {
+               tdb_flags = TDB_NOSYNC;
+#ifdef TDB_MUTEX_LOCKING
+               if (mutex_enabled) {
+                       tdb_flags |= (TDB_MUTEX_LOCKING | TDB_CLEAR_IF_FIRST);
+               }
+#endif
+       }
        if (ctdb->valgrinding) {
                tdb_flags |= TDB_NOMMAP;
        }
        tdb_flags |= TDB_DISALLOW_NESTING;
 
-       ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 0, tdb_flags, O_RDWR, 0);
+       ctdb_db->ltdb = tdb_wrap_open(ctdb_db, ctdb_db->db_path, 0, tdb_flags,
+                                     O_RDWR, 0);
        if (ctdb_db->ltdb == NULL) {
                ctdb_set_error(ctdb, "Failed to open tdb '%s'\n", ctdb_db->db_path);
                talloc_free(ctdb_db);
@@ -1980,6 +2153,25 @@ struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb,
        return ctdb_db;
 }
 
+/*
+ * detach from a specific database - client call
+ */
+int ctdb_detach(struct ctdb_context *ctdb, uint32_t db_id)
+{
+       int ret;
+       int32_t status;
+       TDB_DATA data;
+
+       data.dsize = sizeof(db_id);
+       data.dptr = (uint8_t *)&db_id;
+
+       ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_DB_DETACH,
+                          0, data, NULL, NULL, &status, NULL, NULL);
+       if (ret != 0 || status != 0) {
+               return -1;
+       }
+       return 0;
+}
 
 /*
   setup a call for a database
@@ -3205,7 +3397,7 @@ struct ctdb_context *ctdb_init(struct event_context *ev)
        ctdb->lastid = INT_MAX-200;
        CTDB_NO_MEMORY_NULL(ctdb, ctdb->idr);
 
-       ret = ctdb_set_socketname(ctdb, CTDB_PATH);
+       ret = ctdb_set_socketname(ctdb, CTDB_SOCKET);
        if (ret != 0) {
                DEBUG(DEBUG_ERR,(__location__ " ctdb_set_socketname failed.\n"));
                talloc_free(ctdb);
@@ -3319,6 +3511,9 @@ static void async_callback(struct ctdb_client_control_state *state)
        int32_t res = -1;
        uint32_t destnode = state->c->hdr.destnode;
 
+       outdata.dsize = 0;
+       outdata.dptr = NULL;
+
        /* one more node has responded with recmode data */
        data->count--;
 
@@ -3605,189 +3800,366 @@ int ctdb_ctrl_getcapabilities(struct ctdb_context *ctdb, struct timeval timeout,
        return ret;
 }
 
-/**
- * check whether a transaction is active on a given db on a given node
- */
-int32_t ctdb_ctrl_transaction_active(struct ctdb_context *ctdb,
-                                    uint32_t destnode,
-                                    uint32_t db_id)
+struct server_id {
+       uint64_t pid;
+       uint32_t task_id;
+       uint32_t vnn;
+       uint64_t unique_id;
+};
+
+static struct server_id server_id_get(struct ctdb_context *ctdb, uint32_t reqid)
 {
-       int32_t status;
-       int ret;
-       TDB_DATA indata;
+       struct server_id id;
 
-       indata.dptr = (uint8_t *)&db_id;
-       indata.dsize = sizeof(db_id);
+       id.pid = getpid();
+       id.task_id = reqid;
+       id.vnn = ctdb_get_pnn(ctdb);
+       id.unique_id = id.vnn;
+       id.unique_id = (id.unique_id << 32) | reqid;
 
-       ret = ctdb_control(ctdb, destnode, 0,
-                          CTDB_CONTROL_TRANS2_ACTIVE,
-                          0, indata, NULL, NULL, &status,
-                          NULL, NULL);
+       return id;
+}
 
+/* This is basically a copy from Samba's server_id.*.  However, a
+ * dependency chain stops us from using Samba's version, so use a
+ * renamed copy until a better solution is found. */
+static bool ctdb_server_id_equal(struct server_id *id1, struct server_id *id2)
+{
+       if (id1->pid != id2->pid) {
+               return false;
+       }
+
+       if (id1->task_id != id2->task_id) {
+               return false;
+       }
+
+       if (id1->vnn != id2->vnn) {
+               return false;
+       }
+
+       if (id1->unique_id != id2->unique_id) {
+               return false;
+       }
+
+       return true;
+}
+
+static bool server_id_exists(struct ctdb_context *ctdb, struct server_id *id)
+{
+       struct ctdb_server_id sid;
+       int ret;
+       uint32_t result;
+
+       sid.type = SERVER_TYPE_SAMBA;
+       sid.pnn = id->vnn;
+       sid.server_id = id->pid;
+
+       ret = ctdb_ctrl_check_server_id(ctdb, timeval_current_ofs(3,0),
+                                       id->vnn, &sid, &result);
        if (ret != 0) {
-               DEBUG(DEBUG_ERR, (__location__ " ctdb control for transaction_active failed\n"));
-               return -1;
+               /* If control times out, assume server_id exists. */
+               return true;
        }
 
-       return status;
+       if (result) {
+               return true;
+       }
+
+       return false;
 }
 
 
-struct ctdb_transaction_handle {
-       struct ctdb_db_context *ctdb_db;
-       bool in_replay;
-       /*
-        * we store the reads and writes done under a transaction:
-        * - one list stores both reads and writes (m_all),
-        * - the other just writes (m_write)
-        */
-       struct ctdb_marshall_buffer *m_all;
-       struct ctdb_marshall_buffer *m_write;
+enum g_lock_type {
+       G_LOCK_READ = 0,
+       G_LOCK_WRITE = 1,
 };
 
-/* start a transaction on a database */
-static int ctdb_transaction_destructor(struct ctdb_transaction_handle *h)
+struct g_lock_rec {
+       enum g_lock_type type;
+       struct server_id id;
+};
+
+struct g_lock_recs {
+       unsigned int num;
+       struct g_lock_rec *lock;
+};
+
+static bool g_lock_parse(TALLOC_CTX *mem_ctx, TDB_DATA data,
+                        struct g_lock_recs **locks)
 {
-       tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
-       return 0;
+       struct g_lock_recs *recs;
+
+       recs = talloc_zero(mem_ctx, struct g_lock_recs);
+       if (recs == NULL) {
+               return false;
+       }
+
+       if (data.dsize == 0) {
+               goto done;
+       }
+
+       if (data.dsize % sizeof(struct g_lock_rec) != 0) {
+               DEBUG(DEBUG_ERR, (__location__ "invalid data size %lu in g_lock record\n",
+                                 (unsigned long)data.dsize));
+               talloc_free(recs);
+               return false;
+       }
+
+       recs->num = data.dsize / sizeof(struct g_lock_rec);
+       recs->lock = talloc_memdup(mem_ctx, data.dptr, data.dsize);
+       if (recs->lock == NULL) {
+               talloc_free(recs);
+               return false;
+       }
+
+done:
+       if (locks != NULL) {
+               *locks = recs;
+       }
+
+       return true;
 }
 
-/* start a transaction on a database */
-static int ctdb_transaction_fetch_start(struct ctdb_transaction_handle *h)
+
+static bool g_lock_lock(TALLOC_CTX *mem_ctx,
+                       struct ctdb_db_context *ctdb_db,
+                       const char *keyname, uint32_t reqid)
 {
-       struct ctdb_record_handle *rh;
-       TDB_DATA key;
-       TDB_DATA data;
-       struct ctdb_ltdb_header header;
-       TALLOC_CTX *tmp_ctx;
-       const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
-       int ret;
-       struct ctdb_db_context *ctdb_db = h->ctdb_db;
-       pid_t pid;
-       int32_t status;
+       TDB_DATA key, data;
+       struct ctdb_record_handle *h;
+       struct g_lock_recs *locks;
+       struct server_id id;
+       struct timeval t_start;
+       int i;
 
-       key.dptr = discard_const(keyname);
-       key.dsize = strlen(keyname);
+       key.dptr = (uint8_t *)discard_const(keyname);
+       key.dsize = strlen(keyname) + 1;
 
-       if (!ctdb_db->persistent) {
-               DEBUG(DEBUG_ERR,(__location__ " Attempted transaction on non-persistent database\n"));
-               return -1;
-       }
+       t_start = timeval_current();
 
 again:
-       tmp_ctx = talloc_new(h);
+       /* Keep trying for an hour. */
+       if (timeval_elapsed(&t_start) > 3600) {
+               return false;
+       }
 
-       rh = ctdb_fetch_lock(ctdb_db, tmp_ctx, key, NULL);
-       if (rh == NULL) {
-               DEBUG(DEBUG_ERR,(__location__ " Failed to fetch_lock database\n"));
-               talloc_free(tmp_ctx);
-               return -1;
+       h = ctdb_fetch_lock(ctdb_db, mem_ctx, key, &data);
+       if (h == NULL) {
+               return false;
        }
 
-       status = ctdb_ctrl_transaction_active(ctdb_db->ctdb,
-                                             CTDB_CURRENT_NODE,
-                                             ctdb_db->db_id);
-       if (status == 1) {
-               unsigned long int usec = (1000 + random()) % 100000;
-               DEBUG(DEBUG_DEBUG, (__location__ " transaction is active "
-                                   "on db_id[0x%08x]. waiting for %lu "
-                                   "microseconds\n",
-                                   ctdb_db->db_id, usec));
-               talloc_free(tmp_ctx);
-               usleep(usec);
+       if (!g_lock_parse(h, data, &locks)) {
+               DEBUG(DEBUG_ERR, ("g_lock: error parsing locks\n"));
+               talloc_free(data.dptr);
+               talloc_free(h);
+               return false;
+       }
+
+       talloc_free(data.dptr);
+
+       id = server_id_get(ctdb_db->ctdb, reqid);
+
+       i = 0;
+       while (i < locks->num) {
+               if (ctdb_server_id_equal(&locks->lock[i].id, &id)) {
+                       /* Internal error */
+                       talloc_free(h);
+                       return false;
+               }
+
+               if (!server_id_exists(ctdb_db->ctdb, &locks->lock[i].id)) {
+                       if (i < locks->num-1) {
+                               locks->lock[i] = locks->lock[locks->num-1];
+                       }
+                       locks->num--;
+                       continue;
+               }
+
+               /* This entry is locked. */
+               DEBUG(DEBUG_INFO, ("g_lock: lock already granted for "
+                                  "pid=0x%llx taskid=%x vnn=%d id=0x%llx\n",
+                                  (unsigned long long)id.pid,
+                                  id.task_id, id.vnn,
+                                  (unsigned long long)id.unique_id));
+               talloc_free(h);
                goto again;
        }
 
-       /*
-        * store the pid in the database:
-        * it is not enough that the node is dmaster...
-        */
-       pid = getpid();
-       data.dptr = (unsigned char *)&pid;
-       data.dsize = sizeof(pid_t);
-       rh->header.rsn++;
-       rh->header.dmaster = ctdb_db->ctdb->pnn;
-       ret = ctdb_ltdb_store(ctdb_db, key, &(rh->header), data);
-       if (ret != 0) {
-               DEBUG(DEBUG_ERR, (__location__ " Failed to store pid in "
-                                 "transaction record\n"));
-               talloc_free(tmp_ctx);
-               return -1;
+       locks->lock = talloc_realloc(locks, locks->lock, struct g_lock_rec,
+                                    locks->num+1);
+       if (locks->lock == NULL) {
+               talloc_free(h);
+               return false;
        }
 
-       talloc_free(rh);
+       locks->lock[locks->num].type = G_LOCK_WRITE;
+       locks->lock[locks->num].id = id;
+       locks->num++;
 
-       ret = tdb_transaction_start(ctdb_db->ltdb->tdb);
-       if (ret != 0) {
-               DEBUG(DEBUG_ERR,(__location__ " Failed to start tdb transaction\n"));
-               talloc_free(tmp_ctx);
-               return -1;
+       data.dptr = (uint8_t *)locks->lock;
+       data.dsize = locks->num * sizeof(struct g_lock_rec);
+
+       if (ctdb_record_store(h, data) != 0) {
+               DEBUG(DEBUG_ERR, ("g_lock: failed to write transaction lock for "
+                                 "pid=0x%llx taskid=%x vnn=%d id=0x%llx\n",
+                                 (unsigned long long)id.pid,
+                                 id.task_id, id.vnn,
+                                 (unsigned long long)id.unique_id));
+               talloc_free(h);
+               return false;
        }
 
-       ret = ctdb_ltdb_fetch(ctdb_db, key, &header, tmp_ctx, &data);
-       if (ret != 0) {
-               DEBUG(DEBUG_ERR,(__location__ " Failed to re-fetch transaction "
-                                "lock record inside transaction\n"));
-               tdb_transaction_cancel(ctdb_db->ltdb->tdb);
-               talloc_free(tmp_ctx);
-               goto again;
+       DEBUG(DEBUG_INFO, ("g_lock: lock granted for "
+                          "pid=0x%llx taskid=%x vnn=%d id=0x%llx\n",
+                          (unsigned long long)id.pid,
+                          id.task_id, id.vnn,
+                          (unsigned long long)id.unique_id));
+
+       talloc_free(h);
+       return true;
+}
+
+static bool g_lock_unlock(TALLOC_CTX *mem_ctx,
+                         struct ctdb_db_context *ctdb_db,
+                         const char *keyname, uint32_t reqid)
+{
+       TDB_DATA key, data;
+       struct ctdb_record_handle *h;
+       struct g_lock_recs *locks;
+       struct server_id id;
+       int i;
+       bool found = false;
+
+       key.dptr = (uint8_t *)discard_const(keyname);
+       key.dsize = strlen(keyname) + 1;
+       h = ctdb_fetch_lock(ctdb_db, mem_ctx, key, &data);
+       if (h == NULL) {
+               return false;
        }
 
-       if (header.dmaster != ctdb_db->ctdb->pnn) {
-               DEBUG(DEBUG_DEBUG,(__location__ " not dmaster any more on "
-                                  "transaction lock record\n"));
-               tdb_transaction_cancel(ctdb_db->ltdb->tdb);
-               talloc_free(tmp_ctx);
-               goto again;
+       if (!g_lock_parse(h, data, &locks)) {
+               DEBUG(DEBUG_ERR, ("g_lock: error parsing locks\n"));
+               talloc_free(data.dptr);
+               talloc_free(h);
+               return false;
        }
 
-       if ((data.dsize != sizeof(pid_t)) || (*(pid_t *)(data.dptr) != pid)) {
-               DEBUG(DEBUG_DEBUG, (__location__ " my pid is not stored in "
-                                   "the transaction lock record\n"));
-               tdb_transaction_cancel(ctdb_db->ltdb->tdb);
-               talloc_free(tmp_ctx);
-               goto again;
+       talloc_free(data.dptr);
+
+       id = server_id_get(ctdb_db->ctdb, reqid);
+
+       for (i=0; i<locks->num; i++) {
+               if (ctdb_server_id_equal(&locks->lock[i].id, &id)) {
+                       if (i < locks->num-1) {
+                               locks->lock[i] = locks->lock[locks->num-1];
+                       }
+                       locks->num--;
+                       found = true;
+                       break;
+               }
        }
 
-       talloc_free(tmp_ctx);
+       if (!found) {
+               DEBUG(DEBUG_ERR, ("g_lock: lock not found\n"));
+               talloc_free(h);
+               return false;
+       }
+
+       data.dptr = (uint8_t *)locks->lock;
+       data.dsize = locks->num * sizeof(struct g_lock_rec);
+
+       if (ctdb_record_store(h, data) != 0) {
+               talloc_free(h);
+               return false;
+       }
+
+       talloc_free(h);
+       return true;
+}
+
 
+struct ctdb_transaction_handle {
+       struct ctdb_db_context *ctdb_db;
+       struct ctdb_db_context *g_lock_db;
+       char *lock_name;
+       uint32_t reqid;
+       /*
+        * we store reads and writes done under a transaction:
+        * - one list stores both reads and writes (m_all)
+        * - the other just writes (m_write)
+        */
+       struct ctdb_marshall_buffer *m_all;
+       struct ctdb_marshall_buffer *m_write;
+};
+
+static int ctdb_transaction_destructor(struct ctdb_transaction_handle *h)
+{
+       g_lock_unlock(h, h->g_lock_db, h->lock_name, h->reqid);
+       ctdb_reqid_remove(h->ctdb_db->ctdb, h->reqid);
        return 0;
 }
 
 
-/* start a transaction on a database */
+/**
+ * start a transaction on a database
+ */
 struct ctdb_transaction_handle *ctdb_transaction_start(struct ctdb_db_context *ctdb_db,
                                                       TALLOC_CTX *mem_ctx)
 {
        struct ctdb_transaction_handle *h;
-       int ret;
+       struct ctdb_server_id id;
 
        h = talloc_zero(mem_ctx, struct ctdb_transaction_handle);
        if (h == NULL) {
-               DEBUG(DEBUG_ERR,(__location__ " oom for transaction handle\n"));                
+               DEBUG(DEBUG_ERR, (__location__ " memory allocation error\n"));
                return NULL;
        }
 
        h->ctdb_db = ctdb_db;
+       h->lock_name = talloc_asprintf(h, "transaction_db_0x%08x",
+                                      (unsigned int)ctdb_db->db_id);
+       if (h->lock_name == NULL) {
+               DEBUG(DEBUG_ERR, (__location__ " talloc asprintf failed\n"));
+               talloc_free(h);
+               return NULL;
+       }
 
-       ret = ctdb_transaction_fetch_start(h);
-       if (ret != 0) {
+       h->g_lock_db = ctdb_attach(h->ctdb_db->ctdb, timeval_current_ofs(3,0),
+                                  "g_lock.tdb", false, 0);
+       if (!h->g_lock_db) {
+               DEBUG(DEBUG_ERR, (__location__ " unable to attach to g_lock.tdb\n"));
                talloc_free(h);
                return NULL;
        }
 
-       talloc_set_destructor(h, ctdb_transaction_destructor);
+       id.type = SERVER_TYPE_SAMBA;
+       id.pnn = ctdb_get_pnn(ctdb_db->ctdb);
+       id.server_id = getpid();
 
-       return h;
-}
+       if (ctdb_ctrl_register_server_id(ctdb_db->ctdb, timeval_current_ofs(3,0),
+                                        &id) != 0) {
+               DEBUG(DEBUG_ERR, (__location__ " unable to register server id\n"));
+               talloc_free(h);
+               return NULL;
+       }
 
+       h->reqid = ctdb_reqid_new(h->ctdb_db->ctdb, h);
 
+       if (!g_lock_lock(h, h->g_lock_db, h->lock_name, h->reqid)) {
+               DEBUG(DEBUG_ERR, (__location__ " Error locking g_lock.tdb\n"));
+               talloc_free(h);
+               return NULL;
+       }
 
-/*
-  fetch a record inside a transaction
+       talloc_set_destructor(h, ctdb_transaction_destructor);
+       return h;
+}
+
+/**
+ * fetch a record inside a transaction
  */
-int ctdb_transaction_fetch(struct ctdb_transaction_handle *h, 
-                          TALLOC_CTX *mem_ctx, 
+int ctdb_transaction_fetch(struct ctdb_transaction_handle *h,
+                          TALLOC_CTX *mem_ctx,
                           TDB_DATA key, TDB_DATA *data)
 {
        struct ctdb_ltdb_header header;
@@ -3801,26 +4173,24 @@ int ctdb_transaction_fetch(struct ctdb_transaction_handle *h,
                *data = tdb_null;
                ret = 0;
        }
-       
+
        if (ret != 0) {
                return ret;
        }
 
-       if (!h->in_replay) {
-               h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 1, key, NULL, *data);
-               if (h->m_all == NULL) {
-                       DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
-                       return -1;
-               }
+       h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 1, key, NULL, *data);
+       if (h->m_all == NULL) {
+               DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
+               return -1;
        }
 
        return 0;
 }
 
-/*
-  stores a record inside a transaction
+/**
* stores a record inside a transaction
  */
-int ctdb_transaction_store(struct ctdb_transaction_handle *h, 
+int ctdb_transaction_store(struct ctdb_transaction_handle *h,
                           TDB_DATA key, TDB_DATA data)
 {
        TALLOC_CTX *tmp_ctx = talloc_new(h);
@@ -3828,8 +4198,6 @@ int ctdb_transaction_store(struct ctdb_transaction_handle *h,
        TDB_DATA olddata;
        int ret;
 
-       ZERO_STRUCT(header);
-
        /* we need the header so we can update the RSN */
        ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, tmp_ctx, &olddata);
        if (ret == -1 && header.dmaster == (uint32_t)-1) {
@@ -3844,7 +4212,8 @@ int ctdb_transaction_store(struct ctdb_transaction_handle *h,
        }
 
        if (data.dsize == olddata.dsize &&
-           memcmp(data.dptr, olddata.dptr, data.dsize) == 0) {
+           memcmp(data.dptr, olddata.dptr, data.dsize) == 0 &&
+           header.rsn != 0) {
                /* save writing the same data */
                talloc_free(tmp_ctx);
                return 0;
@@ -3853,14 +4222,12 @@ int ctdb_transaction_store(struct ctdb_transaction_handle *h,
        header.dmaster = h->ctdb_db->ctdb->pnn;
        header.rsn++;
 
-       if (!h->in_replay) {
-               h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 0, key, NULL, data);
-               if (h->m_all == NULL) {
-                       DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
-                       talloc_free(tmp_ctx);
-                       return -1;
-               }
-       }               
+       h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 0, key, NULL, data);
+       if (h->m_all == NULL) {
+               DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
+               talloc_free(tmp_ctx);
+               return -1;
+       }
 
        h->m_write = ctdb_marshall_add(h, h->m_write, h->ctdb_db->db_id, 0, key, &header, data);
        if (h->m_write == NULL) {
@@ -3868,187 +4235,142 @@ int ctdb_transaction_store(struct ctdb_transaction_handle *h,
                talloc_free(tmp_ctx);
                return -1;
        }
-       
-       ret = ctdb_ltdb_store(h->ctdb_db, key, &header, data);
 
        talloc_free(tmp_ctx);
-       
-       return ret;
+       return 0;
 }
 
-/*
-  replay a transaction
- */
-static int ctdb_replay_transaction(struct ctdb_transaction_handle *h)
+static int ctdb_fetch_db_seqnum(struct ctdb_db_context *ctdb_db, uint64_t *seqnum)
 {
-       int ret, i;
-       struct ctdb_rec_data *rec = NULL;
+       const char *keyname = CTDB_DB_SEQNUM_KEY;
+       TDB_DATA key, data;
+       struct ctdb_ltdb_header header;
+       int ret;
 
-       h->in_replay = true;
-       talloc_free(h->m_write);
-       h->m_write = NULL;
+       key.dptr = (uint8_t *)discard_const(keyname);
+       key.dsize = strlen(keyname) + 1;
 
-       ret = ctdb_transaction_fetch_start(h);
+       ret = ctdb_ltdb_fetch(ctdb_db, key, &header, ctdb_db, &data);
        if (ret != 0) {
-               return ret;
+               *seqnum = 0;
+               return 0;
        }
 
-       for (i=0;i<h->m_all->count;i++) {
-               TDB_DATA key, data;
+       if (data.dsize == 0) {
+               *seqnum = 0;
+               return 0;
+       }
 
-               rec = ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
-               if (rec == NULL) {
-                       DEBUG(DEBUG_ERR, (__location__ " Out of records in ctdb_replay_transaction?\n"));
-                       goto failed;
-               }
+       if (data.dsize != sizeof(*seqnum)) {
+               DEBUG(DEBUG_ERR, (__location__ " Invalid data recived len=%zi\n",
+                                 data.dsize));
+               talloc_free(data.dptr);
+               return -1;
+       }
 
-               if (rec->reqid == 0) {
-                       /* its a store */
-                       if (ctdb_transaction_store(h, key, data) != 0) {
-                               goto failed;
-                       }
-               } else {
-                       TDB_DATA data2;
-                       TALLOC_CTX *tmp_ctx = talloc_new(h);
+       *seqnum = *(uint64_t *)data.dptr;
+       talloc_free(data.dptr);
 
-                       if (ctdb_transaction_fetch(h, tmp_ctx, key, &data2) != 0) {
-                               talloc_free(tmp_ctx);
-                               goto failed;
-                       }
-                       if (data2.dsize != data.dsize ||
-                           memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
-                               /* the record has changed on us - we have to give up */
-                               talloc_free(tmp_ctx);
-                               goto failed;
-                       }
-                       talloc_free(tmp_ctx);
-               }
-       }
-       
        return 0;
-
-failed:
-       tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
-       return -1;
 }
 
 
-/*
-  commit a transaction
- */
-int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
+static int ctdb_store_db_seqnum(struct ctdb_transaction_handle *h,
+                               uint64_t seqnum)
 {
-       int ret, retries=0;
-       int32_t status;
-       struct ctdb_context *ctdb = h->ctdb_db->ctdb;
-       struct timeval timeout;
-       enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR;
+       const char *keyname = CTDB_DB_SEQNUM_KEY;
+       TDB_DATA key, data;
 
-       talloc_set_destructor(h, NULL);
+       key.dptr = (uint8_t *)discard_const(keyname);
+       key.dsize = strlen(keyname) + 1;
 
-       /* our commit strategy is quite complex.
+       data.dptr = (uint8_t *)&seqnum;
+       data.dsize = sizeof(seqnum);
 
-          - we first try to commit the changes to all other nodes
+       return ctdb_transaction_store(h, key, data);
+}
 
-          - if that works, then we commit locally and we are done
 
-          - if a commit on another node fails, then we need to cancel
-            the transaction, then restart the transaction (thus
-            opening a window of time for a pending recovery to
-            complete), then replay the transaction, checking all the
-            reads and writes (checking that reads give the same data,
-            and writes succeed). Then we retry the transaction to the
-            other nodes
-       */
+/**
+ * commit a transaction
+ */
+int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
+{
+       int ret;
+       uint64_t old_seqnum, new_seqnum;
+       int32_t status;
+       struct timeval timeout;
 
-again:
        if (h->m_write == NULL) {
                /* no changes were made */
-               tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
                talloc_free(h);
                return 0;
        }
 
-       /* tell ctdbd to commit to the other nodes */
-       timeout = timeval_current_ofs(1, 0);
-       ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
-                          retries==0?CTDB_CONTROL_TRANS2_COMMIT:CTDB_CONTROL_TRANS2_COMMIT_RETRY, 0, 
-                          ctdb_marshall_finish(h->m_write), NULL, NULL, &status, 
-                          &timeout, NULL);
-       if (ret != 0 || status != 0) {
-               tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
-               DEBUG(DEBUG_NOTICE, (__location__ " transaction commit%s failed"
-                                    ", retrying after 1 second...\n",
-                                    (retries==0)?"":"retry "));
-               sleep(1);
+       ret = ctdb_fetch_db_seqnum(h->ctdb_db, &old_seqnum);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR, (__location__ " failed to fetch db sequence number\n"));
+               ret = -1;
+               goto done;
+       }
 
+       new_seqnum = old_seqnum + 1;
+       ret = ctdb_store_db_seqnum(h, new_seqnum);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR, (__location__ " failed to store db sequence number\n"));
+               ret = -1;
+               goto done;
+       }
+
+again:
+       timeout = timeval_current_ofs(3,0);
+       ret = ctdb_control(h->ctdb_db->ctdb, CTDB_CURRENT_NODE,
+                          h->ctdb_db->db_id,
+                          CTDB_CONTROL_TRANS3_COMMIT, 0,
+                          ctdb_marshall_finish(h->m_write), NULL, NULL,
+                          &status, &timeout, NULL);
+       if (ret != 0 || status != 0) {
+               /*
+                * TRANS3_COMMIT control will only fail if recovery has been
+                * triggered.  Check if the database has been updated or not.
+                */
+               ret = ctdb_fetch_db_seqnum(h->ctdb_db, &new_seqnum);
                if (ret != 0) {
-                       failure_control = CTDB_CONTROL_TRANS2_ERROR;
-               } else {
-                       /* work out what error code we will give if we 
-                          have to fail the operation */
-                       switch ((enum ctdb_trans2_commit_error)status) {
-                       case CTDB_TRANS2_COMMIT_SUCCESS:
-                       case CTDB_TRANS2_COMMIT_SOMEFAIL:
-                       case CTDB_TRANS2_COMMIT_TIMEOUT:
-                               failure_control = CTDB_CONTROL_TRANS2_ERROR;
-                               break;
-                       case CTDB_TRANS2_COMMIT_ALLFAIL:
-                               failure_control = CTDB_CONTROL_TRANS2_FINISHED;
-                               break;
-                       }
+                       DEBUG(DEBUG_ERR, (__location__ " failed to fetch db sequence number\n"));
+                       goto done;
                }
 
-               if (++retries == 100) {
-                       DEBUG(DEBUG_ERR,(__location__ " Giving up transaction on db 0x%08x after %d retries failure_control=%u\n", 
-                                        h->ctdb_db->db_id, retries, (unsigned)failure_control));
-                       ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
-                                    failure_control, CTDB_CTRL_FLAG_NOREPLY, 
-                                    tdb_null, NULL, NULL, NULL, NULL, NULL);           
-                       talloc_free(h);
-                       return -1;
-               }               
-
-               if (ctdb_replay_transaction(h) != 0) {
-                       DEBUG(DEBUG_ERR, (__location__ " Failed to replay "
-                                         "transaction on db 0x%08x, "
-                                         "failure control =%u\n",
-                                         h->ctdb_db->db_id,
-                                         (unsigned)failure_control));
-                       ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
-                                    failure_control, CTDB_CTRL_FLAG_NOREPLY, 
-                                    tdb_null, NULL, NULL, NULL, NULL, NULL);           
-                       talloc_free(h);
-                       return -1;
+               if (new_seqnum == old_seqnum) {
+                       /* Database not yet updated, try again */
+                       goto again;
                }
-               goto again;
-       } else {
-               failure_control = CTDB_CONTROL_TRANS2_ERROR;
-       }
 
-       /* do the real commit locally */
-       ret = tdb_transaction_commit(h->ctdb_db->ltdb->tdb);
-       if (ret != 0) {
-               DEBUG(DEBUG_ERR, (__location__ " Failed to commit transaction "
-                                 "on db id 0x%08x locally, "
-                                 "failure_control=%u\n",
-                                 h->ctdb_db->db_id,
-                                 (unsigned)failure_control));
-               ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
-                            failure_control, CTDB_CTRL_FLAG_NOREPLY, 
-                            tdb_null, NULL, NULL, NULL, NULL, NULL);           
-               talloc_free(h);
-               return ret;
+               if (new_seqnum != (old_seqnum + 1)) {
+                       DEBUG(DEBUG_ERR, (__location__ " new seqnum [%llu] != old seqnum [%llu] + 1\n",
+                                         (long long unsigned)new_seqnum,
+                                         (long long unsigned)old_seqnum));
+                       ret = -1;
+                       goto done;
+               }
        }
 
-       /* tell ctdbd that we are finished with our local commit */
-       ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
-                    CTDB_CONTROL_TRANS2_FINISHED, CTDB_CTRL_FLAG_NOREPLY, 
-                    tdb_null, NULL, NULL, NULL, NULL, NULL);
+       ret = 0;
+
+done:
+       talloc_free(h);
+       return ret;
+}
+
+/**
+ * cancel a transaction
+ */
+int ctdb_transaction_cancel(struct ctdb_transaction_handle *h)
+{
        talloc_free(h);
        return 0;
 }
 
+
 /*
   recovery daemon ping to main daemon
  */