s3:dbwrap_ctdb: start rewrite of transactions using the global lock (g_lock)
author Michael Adam <obnox@samba.org>
Thu, 3 Dec 2009 16:29:54 +0000 (17:29 +0100)
committer Karolin Seeger <kseeger@samba.org>
Thu, 1 Apr 2010 07:39:14 +0000 (09:39 +0200)
This simplifies the transaction code a lot:

* transaction_start essentially consists of acquiring a global lock.

* No write operations at all are performed on the local database
  until the transaction is committed: Every store operation is just
  going into the marshall buffer.

* The commit operation calls a new simplified TRANS3_COMMIT control
  in ctdb which rolls out the changes to all nodes including the
  node that is performing the transaction.

Michael
(cherry picked from commit 16bc6ba2268e3660d026076264de8666356e00bf)
(cherry picked from commit c2354aa193617ffc3e4036fe0b9034c2664901fd)

source3/lib/dbwrap_ctdb.c

index 8e188d0ab53a734106effb0dbbf1656627dade85..c32398acdb092c9dbc6e81d122487e71fd11fe48 100644 (file)
 #include "ctdb.h"
 #include "ctdb_private.h"
 #include "ctdbd_conn.h"
+#include "g_lock.h"
 
 struct db_ctdb_transaction_handle {
        struct db_ctdb_ctx *ctx;
-       bool in_replay;
        /*
         * we store the reads and writes done under a transaction:
         * - one list stores both reads and writes (m_all),
@@ -35,6 +35,7 @@ struct db_ctdb_transaction_handle {
        struct ctdb_marshall_buffer *m_write;
        uint32_t nesting;
        bool nested_cancel;
+       char *lock_name;
 };
 
 struct db_ctdb_ctx {
@@ -42,6 +43,7 @@ struct db_ctdb_ctx {
        struct tdb_wrap *wtdb;
        uint32 db_id;
        struct db_ctdb_transaction_handle *transaction;
+       struct g_lock_ctx *lock_ctx;
 };
 
 struct db_ctdb_rec {
@@ -297,145 +299,21 @@ static struct ctdb_rec_data *db_ctdb_marshall_loop_next(struct ctdb_marshall_buf
        return r;
 }
 
-
-static int32_t db_ctdb_transaction_active(uint32_t db_id)
-{
-       int32_t status;
-       NTSTATUS ret;
-       TDB_DATA indata;
-
-       indata.dptr = (uint8_t *)&db_id;
-       indata.dsize = sizeof(db_id);
-
-       ret = ctdbd_control_local(messaging_ctdbd_connection(),
-                                 CTDB_CONTROL_TRANS2_ACTIVE, 0, 0,
-                                 indata, NULL, NULL, &status);
-
-       if (!NT_STATUS_IS_OK(ret)) {
-               DEBUG(2, ("ctdb control TRANS2_ACTIVE failed\n"));
-               return -1;
-       }
-
-       return status;
-}
-
-
 /**
  * CTDB transaction destructor
  */
 static int db_ctdb_transaction_destructor(struct db_ctdb_transaction_handle *h)
 {
-       tdb_transaction_cancel(h->ctx->wtdb->tdb);
-       return 0;
-}
-
-/**
- * start a transaction on a ctdb database:
- * - lock the transaction lock key
- * - start the tdb transaction
- */
-static int db_ctdb_transaction_fetch_start(struct db_ctdb_transaction_handle *h)
-{
-       struct db_record *rh;
-       struct db_ctdb_rec *crec;
-       TDB_DATA key;
-       TALLOC_CTX *tmp_ctx;
-       const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
-       int ret;
-       struct db_ctdb_ctx *ctx = h->ctx;
-       TDB_DATA data;
-       pid_t pid;
        NTSTATUS status;
-       struct ctdb_ltdb_header header;
-       int32_t transaction_status;
 
-       key.dptr = (uint8_t *)discard_const(keyname);
-       key.dsize = strlen(keyname);
-
-again:
-       tmp_ctx = talloc_new(h);
-
-       rh = fetch_locked_internal(ctx, tmp_ctx, key, true);
-       if (rh == NULL) {
-               DEBUG(0,(__location__ " Failed to fetch_lock database\n"));             
-               talloc_free(tmp_ctx);
-               return -1;
-       }
-       crec = talloc_get_type_abort(rh->private_data, struct db_ctdb_rec);
-
-       transaction_status = db_ctdb_transaction_active(ctx->db_id);
-       if (transaction_status == 1) {
-               unsigned long int usec = (1000 + random()) % 100000;
-               DEBUG(3, ("Transaction already active on db_id[0x%08x]."
-                         "Re-trying after %lu microseconds...",
-                         ctx->db_id, usec));
-               talloc_free(tmp_ctx);
-               usleep(usec);
-               goto again;
-       }
-
-       /*
-        * store the pid in the database:
-        * it is not enought that the node is dmaster...
-        */
-       pid = getpid();
-       data.dptr = (unsigned char *)&pid;
-       data.dsize = sizeof(pid_t);
-       crec->header.rsn++;
-       crec->header.dmaster = get_my_vnn();
-       status = db_ctdb_ltdb_store(ctx, key, &(crec->header), data);
+       status = g_lock_unlock(h->ctx->lock_ctx, h->lock_name);
        if (!NT_STATUS_IS_OK(status)) {
-               DEBUG(0, (__location__ " Failed to store pid in transaction "
-                         "record: %s\n", nt_errstr(status)));
-               talloc_free(tmp_ctx);
+               DEBUG(0, ("g_lock_unlock failed: %s\n", nt_errstr(status)));
                return -1;
        }
-
-       talloc_free(rh);
-
-       ret = tdb_transaction_start(ctx->wtdb->tdb);
-       if (ret != 0) {
-               DEBUG(0,(__location__ " Failed to start tdb transaction\n"));
-               talloc_free(tmp_ctx);
-               return -1;
-       }
-
-       status = db_ctdb_ltdb_fetch(ctx, key, &header, tmp_ctx, &data);
-       if (!NT_STATUS_IS_OK(status)) {
-               DEBUG(0, (__location__ " failed to refetch transaction lock "
-                         "record inside transaction: %s - retrying\n",
-                         nt_errstr(status)));
-               tdb_transaction_cancel(ctx->wtdb->tdb);
-               talloc_free(tmp_ctx);
-               goto again;
-       }
-
-       if (header.dmaster != get_my_vnn()) {
-               DEBUG(3, (__location__ " refetch transaction lock record : "
-                         "we are not dmaster any more "
-                         "(dmaster[%u] != my_vnn[%u]) - retrying\n",
-                         header.dmaster, get_my_vnn()));
-               tdb_transaction_cancel(ctx->wtdb->tdb);
-               talloc_free(tmp_ctx);
-               goto again;
-       }
-
-       if ((data.dsize != sizeof(pid_t)) || (*(pid_t *)(data.dptr) != pid)) {
-               DEBUG(3, (__location__ " refetch transaction lock record: "
-                         "another local process has started a transaction "
-                         "(stored pid [%u] != my pid [%u]) - retrying\n",
-                         *(pid_t *)(data.dptr), pid));
-               tdb_transaction_cancel(ctx->wtdb->tdb);
-               talloc_free(tmp_ctx);
-               goto again;
-       }
-
-       talloc_free(tmp_ctx);
-
        return 0;
 }
 
-
 /**
  * CTDB dbwrap API: transaction_start function
  * starts a transaction on a persistent database
@@ -443,7 +321,7 @@ again:
 static int db_ctdb_transaction_start(struct db_context *db)
 {
        struct db_ctdb_transaction_handle *h;
-       int ret;
+       NTSTATUS status;
        struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
                                                        struct db_ctdb_ctx);
 
@@ -466,9 +344,22 @@ static int db_ctdb_transaction_start(struct db_context *db)
 
        h->ctx = ctx;
 
-       ret = db_ctdb_transaction_fetch_start(h);
-       if (ret != 0) {
-               talloc_free(h);
+       h->lock_name = talloc_asprintf(h, "transaction_db_0x%08x",
+                                      (unsigned int)ctx->db_id);
+       if (h->lock_name == NULL) {
+               DEBUG(0, ("talloc_asprintf failed\n"));
+               TALLOC_FREE(h);
+               return -1;
+       }
+
+       /*
+        * Wait a day, i.e. forever...
+        */
+       status = g_lock_lock(ctx->lock_ctx, h->lock_name, G_LOCK_WRITE,
+                            timeval_set(86400, 0));
+       if (!NT_STATUS_IS_OK(status)) {
+               DEBUG(0, ("g_lock_lock failed: %s\n", nt_errstr(status)));
+               TALLOC_FREE(h);
                return -1;
        }
 
@@ -481,7 +372,65 @@ static int db_ctdb_transaction_start(struct db_context *db)
        return 0;
 }
 
+/**
+ * Look up the most recently written value for a key in a marshall buffer.
+ *
+ * Every record in buf is visited; the data and header of the LAST record
+ * whose key equals key are remembered, so a later write to the same key
+ * wins over an earlier one.
+ *
+ * @param buf      marshall buffer to search; NULL is treated as "not found"
+ * @param key      key to look for
+ * @param pheader  if not NULL, receives the header that
+ *                 db_ctdb_marshall_loop_next() reported for the matching
+ *                 record
+ * @param mem_ctx  talloc parent for the copy handed back through pdata
+ * @param pdata    if not NULL, receives a talloc_memdup() copy of the data
+ *
+ * @return true if a matching record was found (and copied, if requested);
+ *         false if buf is NULL, no record matches, the buffer runs out of
+ *         records before buf->count is reached, or the copy cannot be
+ *         allocated
+ */
+static bool pull_newest_from_marshall_buffer(struct ctdb_marshall_buffer *buf,
+                                            TDB_DATA key,
+                                            struct ctdb_ltdb_header *pheader,
+                                            TALLOC_CTX *mem_ctx,
+                                            TDB_DATA *pdata)
+{
+       struct ctdb_rec_data *rec = NULL;
+       struct ctdb_ltdb_header h;
+       /* must start out false: it is only ever set on a key match */
+       bool found = false;
+       TDB_DATA data;
+       int i;
+
+       if (buf == NULL) {
+               return false;
+       }
+
+       ZERO_STRUCT(h);
+       ZERO_STRUCT(data);
+
+       /*
+        * Walk the list of records written during this
+        * transaction. If we want to read one we have already
+        * written, return the last written sample. Thus we do not do
+        * a "break;" for the first hit, this record might have been
+        * overwritten later.
+        */
+
+       for (i=0; i<buf->count; i++) {
+               TDB_DATA tkey, tdata;
+               uint32_t reqid;
+               struct ctdb_ltdb_header hdr;
+
+               ZERO_STRUCT(hdr);
+
+               rec = db_ctdb_marshall_loop_next(buf, rec, &reqid, &hdr, &tkey,
+                                                &tdata);
+               if (rec == NULL) {
+                       return false;
+               }
+
+               if (tdb_data_equal(key, tkey)) {
+                       /*
+                        * Keep the header together with the data of
+                        * the matching record, so that records for
+                        * other keys that follow in the buffer do
+                        * not clobber it.
+                        */
+                       found = true;
+                       data = tdata;
+                       h = hdr;
+               }
+       }
+
+       if (!found) {
+               return false;
+       }
+
+       if (pdata != NULL) {
+               /* hand the caller its own copy, owned by mem_ctx */
+               data.dptr = (uint8_t *)talloc_memdup(mem_ctx, data.dptr,
+                                                    data.dsize);
+               if ((data.dsize != 0) && (data.dptr == NULL)) {
+                       return false;
+               }
+               *pdata = data;
+       }
+
+       if (pheader != NULL) {
+               *pheader = h;
+       }
 
+       return true;
+}
 
 /*
   fetch a record inside a transaction
@@ -492,6 +441,13 @@ static int db_ctdb_transaction_fetch(struct db_ctdb_ctx *db,
 {
        struct db_ctdb_transaction_handle *h = db->transaction;
        NTSTATUS status;
+       bool found;
+
+       found = pull_newest_from_marshall_buffer(h->m_write, key, NULL,
+                                                mem_ctx, data);
+       if (found) {
+               return 0;
+       }
 
        status = db_ctdb_ltdb_fetch(h->ctx, key, NULL, mem_ctx, data);
 
@@ -501,14 +457,14 @@ static int db_ctdb_transaction_fetch(struct db_ctdb_ctx *db,
                return -1;
        }
 
-       if (!h->in_replay) {
-               h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 1, key, NULL, *data);
-               if (h->m_all == NULL) {
-                       DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
-                       data->dsize = 0;
-                       talloc_free(data->dptr);
-                       return -1;
-               }
+       h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 1, key,
+                                       NULL, *data);
+       if (h->m_all == NULL) {
+               DEBUG(0,(__location__ " Failed to add to marshalling "
+                        "record\n"));
+               data->dsize = 0;
+               talloc_free(data->dptr);
+               return -1;
        }
 
        return 0;
@@ -543,6 +499,11 @@ static struct db_record *db_ctdb_fetch_locked_transaction(struct db_ctdb_ctx *ct
        result->store = db_ctdb_store_transaction;
        result->delete_rec = db_ctdb_delete_transaction;
 
+       if (pull_newest_from_marshall_buffer(ctx->transaction->m_write, key,
+                                            NULL, result, &result->value)) {
+               return result;
+       }
+
        ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
        if (ctdb_data.dptr == NULL) {
                /* create the record */
@@ -619,27 +580,35 @@ static int db_ctdb_transaction_store(struct db_ctdb_transaction_handle *h,
                                     TDB_DATA key, TDB_DATA data)
 {
        TALLOC_CTX *tmp_ctx = talloc_new(h);
-       int ret;
        TDB_DATA rec;
        struct ctdb_ltdb_header header;
-       NTSTATUS status;
+
+       ZERO_STRUCT(header);
 
        /* we need the header so we can update the RSN */
-       rec = tdb_fetch(h->ctx->wtdb->tdb, key);
-       if (rec.dptr == NULL) {
-               /* the record doesn't exist - create one with us as dmaster.
-                  This is only safe because we are in a transaction and this
-                  is a persistent database */
-               ZERO_STRUCT(header);
-       } else {
-               memcpy(&header, rec.dptr, sizeof(struct ctdb_ltdb_header));
-               rec.dsize -= sizeof(struct ctdb_ltdb_header);
-               /* a special case, we are writing the same data that is there now */
-               if (data.dsize == rec.dsize &&
-                   memcmp(data.dptr, rec.dptr + sizeof(struct ctdb_ltdb_header), data.dsize) == 0) {
-                       SAFE_FREE(rec.dptr);
-                       talloc_free(tmp_ctx);
-                       return 0;
+
+       if (!pull_newest_from_marshall_buffer(h->m_write, key, &header,
+                                             NULL, NULL)) {
+
+               rec = tdb_fetch(h->ctx->wtdb->tdb, key);
+
+               if (rec.dptr != NULL) {
+                       memcpy(&header, rec.dptr,
+                              sizeof(struct ctdb_ltdb_header));
+                       rec.dsize -= sizeof(struct ctdb_ltdb_header);
+
+                       /*
+                        * a special case, we are writing the same
+                        * data that is there now
+                        */
+                       if (data.dsize == rec.dsize &&
+                           memcmp(data.dptr,
+                                  rec.dptr + sizeof(struct ctdb_ltdb_header),
+                                  data.dsize) == 0) {
+                               SAFE_FREE(rec.dptr);
+                               talloc_free(tmp_ctx);
+                               return 0;
+                       }
                }
                SAFE_FREE(rec.dptr);
        }
@@ -647,13 +616,13 @@ static int db_ctdb_transaction_store(struct db_ctdb_transaction_handle *h,
        header.dmaster = get_my_vnn();
        header.rsn++;
 
-       if (!h->in_replay) {
-               h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 0, key, NULL, data);
-               if (h->m_all == NULL) {
-                       DEBUG(0,(__location__ " Failed to add to marshalling record\n"));
-                       talloc_free(tmp_ctx);
-                       return -1;
-               }
+       h->m_all = db_ctdb_marshall_add(h, h->m_all, h->ctx->db_id, 0, key,
+                                       NULL, data);
+       if (h->m_all == NULL) {
+               DEBUG(0,(__location__ " Failed to add to marshalling "
+                        "record\n"));
+               talloc_free(tmp_ctx);
+               return -1;
        }
 
        h->m_write = db_ctdb_marshall_add(h, h->m_write, h->ctx->db_id, 0, key, &header, data);
@@ -663,16 +632,8 @@ static int db_ctdb_transaction_store(struct db_ctdb_transaction_handle *h,
                return -1;
        }
 
-       status = db_ctdb_ltdb_store(h->ctx, key, &header, data);
-       if (NT_STATUS_IS_OK(status)) {
-               ret = 0;
-       } else {
-               ret = -1;
-       }
-
        talloc_free(tmp_ctx);
-
-       return ret;
+       return 0;
 }
 
 
@@ -708,64 +669,6 @@ static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec)
        return NT_STATUS_OK;
 }
 
-
-/*
-  replay a transaction
- */
-static int ctdb_replay_transaction(struct db_ctdb_transaction_handle *h)
-{
-       int ret, i;
-       struct ctdb_rec_data *rec = NULL;
-
-       h->in_replay = true;
-       talloc_free(h->m_write);
-       h->m_write = NULL;
-
-       ret = db_ctdb_transaction_fetch_start(h);
-       if (ret != 0) {
-               return ret;
-       }
-
-       for (i=0;i<h->m_all->count;i++) {
-               TDB_DATA key, data;
-
-               rec = db_ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
-               if (rec == NULL) {
-                       DEBUG(0, (__location__ " Out of records in ctdb_replay_transaction?\n"));
-                       goto failed;
-               }
-
-               if (rec->reqid == 0) {
-                       /* its a store */
-                       if (db_ctdb_transaction_store(h, key, data) != 0) {
-                               goto failed;
-                       }
-               } else {
-                       TDB_DATA data2;
-                       TALLOC_CTX *tmp_ctx = talloc_new(h);
-
-                       if (db_ctdb_transaction_fetch(h->ctx, tmp_ctx, key, &data2) != 0) {
-                               talloc_free(tmp_ctx);
-                               goto failed;
-                       }
-                       if (data2.dsize != data.dsize ||
-                           memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
-                               /* the record has changed on us - we have to give up */
-                               talloc_free(tmp_ctx);
-                               goto failed;
-                       }
-                       talloc_free(tmp_ctx);
-               }
-       }
-
-       return 0;
-
-failed:
-       tdb_transaction_cancel(h->ctx->wtdb->tdb);
-       return -1;
-}
-
-
 /*
   commit a transaction
  */
@@ -774,11 +677,8 @@ static int db_ctdb_transaction_commit(struct db_context *db)
        struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
                                                        struct db_ctdb_ctx);
        NTSTATUS rets;
-       int ret;
        int status;
-       int retries = 0;
        struct db_ctdb_transaction_handle *h = ctx->transaction;
-       enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR;
 
        if (h == NULL) {
                DEBUG(0,(__location__ " transaction commit with no open transaction on db 0x%08x\n", ctx->db_id));
@@ -798,102 +698,29 @@ static int db_ctdb_transaction_commit(struct db_context *db)
 
        DEBUG(5,(__location__ " Commit transaction on db 0x%08x\n", ctx->db_id));
 
-       talloc_set_destructor(h, NULL);
-
-       /* our commit strategy is quite complex.
-
-          - we first try to commit the changes to all other nodes
-
-          - if that works, then we commit locally and we are done
-
-          - if a commit on another node fails, then we need to cancel
-            the transaction, then restart the transaction (thus
-            opening a window of time for a pending recovery to
-            complete), then replay the transaction, checking all the
-            reads and writes (checking that reads give the same data,
-            and writes succeed). Then we retry the transaction to the
-            other nodes
-       */
-
 again:
        if (h->m_write == NULL) {
                /* no changes were made, potentially after a retry */
-               tdb_transaction_cancel(h->ctx->wtdb->tdb);
-               talloc_free(h);
-               ctx->transaction = NULL;
-               return 0;
+               goto done;
        }
 
        /* tell ctdbd to commit to the other nodes */
-       rets = ctdbd_control_local(messaging_ctdbd_connection(), 
-                                  retries==0?CTDB_CONTROL_TRANS2_COMMIT:CTDB_CONTROL_TRANS2_COMMIT_RETRY, 
+       rets = ctdbd_control_local(messaging_ctdbd_connection(),
+                                  CTDB_CONTROL_TRANS3_COMMIT,
                                   h->ctx->db_id, 0,
-                                  db_ctdb_marshall_finish(h->m_write), NULL, NULL, &status);
+                                  db_ctdb_marshall_finish(h->m_write),
+                                  NULL, NULL, &status);
        if (!NT_STATUS_IS_OK(rets) || status != 0) {
-               tdb_transaction_cancel(h->ctx->wtdb->tdb);
-               sleep(1);
-
-               if (!NT_STATUS_IS_OK(rets)) {
-                       failure_control = CTDB_CONTROL_TRANS2_ERROR;                    
-               } else {
-                       /* work out what error code we will give if we 
-                          have to fail the operation */
-                       switch ((enum ctdb_trans2_commit_error)status) {
-                       case CTDB_TRANS2_COMMIT_SUCCESS:
-                       case CTDB_TRANS2_COMMIT_SOMEFAIL:
-                       case CTDB_TRANS2_COMMIT_TIMEOUT:
-                               failure_control = CTDB_CONTROL_TRANS2_ERROR;
-                               break;
-                       case CTDB_TRANS2_COMMIT_ALLFAIL:
-                               failure_control = CTDB_CONTROL_TRANS2_FINISHED;
-                               break;
-                       }
-               }
-
-               if (++retries == 100) {
-                       DEBUG(0,(__location__ " Giving up transaction on db 0x%08x after %d retries failure_control=%u\n", 
-                                h->ctx->db_id, retries, (unsigned)failure_control));
-                       ctdbd_control_local(messaging_ctdbd_connection(), failure_control,
-                                           h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY, 
-                                           tdb_null, NULL, NULL, NULL);
-                       h->ctx->transaction = NULL;
-                       talloc_free(h);
-                       ctx->transaction = NULL;
-                       return -1;                      
-               }
-
-               if (ctdb_replay_transaction(h) != 0) {
-                       DEBUG(0,(__location__ " Failed to replay transaction failure_control=%u\n",
-                                (unsigned)failure_control));
-                       ctdbd_control_local(messaging_ctdbd_connection(), failure_control,
-                                           h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY, 
-                                           tdb_null, NULL, NULL, NULL);
-                       h->ctx->transaction = NULL;
-                       talloc_free(h);
-                       ctx->transaction = NULL;
-                       return -1;
-               }
+               /*
+                * TODO:
+                * check the database sequence number and
+                * compare it to the seqnum after applying the
+                * marshall buffer. If it is the same: return success.
+                */
                goto again;
-       } else {
-               failure_control = CTDB_CONTROL_TRANS2_ERROR;
        }
 
-       /* do the real commit locally */
-       ret = tdb_transaction_commit(h->ctx->wtdb->tdb);
-       if (ret != 0) {
-               DEBUG(0,(__location__ " Failed to commit transaction failure_control=%u\n",
-                        (unsigned)failure_control));
-               ctdbd_control_local(messaging_ctdbd_connection(), failure_control, h->ctx->db_id, 
-                                   CTDB_CTRL_FLAG_NOREPLY, tdb_null, NULL, NULL, NULL);
-               h->ctx->transaction = NULL;
-               talloc_free(h);
-               return ret;
-       }
-
-       /* tell ctdbd that we are finished with our local commit */
-       ctdbd_control_local(messaging_ctdbd_connection(), CTDB_CONTROL_TRANS2_FINISHED, 
-                           h->ctx->db_id, CTDB_CTRL_FLAG_NOREPLY, 
-                           tdb_null, NULL, NULL, NULL);
+done:
        h->ctx->transaction = NULL;
        talloc_free(h);
        return 0;
@@ -1314,6 +1141,7 @@ struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
        struct db_context *result;
        struct db_ctdb_ctx *db_ctdb;
        char *db_path;
+       struct ctdbd_connection *conn;
 
        if (!lp_clustering()) {
                DEBUG(10, ("Clustering disabled -- no ctdb\n"));
@@ -1335,13 +1163,15 @@ struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
        db_ctdb->transaction = NULL;
        db_ctdb->db = result;
 
-       if (!NT_STATUS_IS_OK(ctdbd_db_attach(messaging_ctdbd_connection(),name, &db_ctdb->db_id, tdb_flags))) {
+       conn = messaging_ctdbd_connection();
+
+       if (!NT_STATUS_IS_OK(ctdbd_db_attach(conn, name, &db_ctdb->db_id, tdb_flags))) {
                DEBUG(0, ("ctdbd_db_attach failed for %s\n", name));
                TALLOC_FREE(result);
                return NULL;
        }
 
-       db_path = ctdbd_dbpath(messaging_ctdbd_connection(), db_ctdb, db_ctdb->db_id);
+       db_path = ctdbd_dbpath(conn, db_ctdb, db_ctdb->db_id);
 
        result->persistent = ((tdb_flags & TDB_CLEAR_IF_FIRST) == 0);
 
@@ -1361,6 +1191,16 @@ struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
        }
        talloc_free(db_path);
 
+       if (result->persistent) {
+               db_ctdb->lock_ctx = g_lock_ctx_init(db_ctdb,
+                                                   ctdb_conn_msg_ctx(conn));
+               if (db_ctdb->lock_ctx == NULL) {
+                       DEBUG(0, ("g_lock_ctx_init failed\n"));
+                       TALLOC_FREE(result);
+                       return NULL;
+               }
+       }
+
        result->private_data = (void *)db_ctdb;
        result->fetch_locked = db_ctdb_fetch_locked;
        result->fetch = db_ctdb_fetch;