all persistent databases now do all stores via automatic transactions
authorAndrew Tridgell <tridge@samba.org>
Thu, 7 Aug 2008 09:14:16 +0000 (19:14 +1000)
committerMichael Adam <obnox@samba.org>
Wed, 13 Aug 2008 09:54:09 +0000 (11:54 +0200)
(This used to be commit 76fbe56e827193d939676da23a580aa0f9394dd1)

source3/lib/dbwrap_ctdb.c

index a6bda8e403331c7f8a30f1a6676fd8c908c5e6dc..d46b64dba59af4ec209fb2532284b7f4d6d39892 100644 (file)
@@ -34,6 +34,7 @@ struct db_ctdb_transaction_handle {
 };
 
 struct db_ctdb_ctx {
+       struct db_context *db;
        struct tdb_wrap *wtdb;
        uint32 db_id;
        struct db_ctdb_transaction_handle *transaction;
@@ -400,6 +401,40 @@ static struct db_record *db_ctdb_fetch_locked_transaction(struct db_ctdb_ctx *ct
        return result;
 }
 
+static int db_ctdb_record_destructor(struct db_record *rec)
+{
+       struct db_ctdb_transaction_handle *h = talloc_get_type_abort(
+               rec->private_data, struct db_ctdb_transaction_handle);
+       h->ctx->db->transaction_cancel(h->ctx->db);
+       return 0;
+}
+
+/*
+  auto-create a transaction for persistent databases
+ */
+static struct db_record *db_ctdb_fetch_locked_persistent(struct db_ctdb_ctx *ctx,
+                                                        TALLOC_CTX *mem_ctx,
+                                                        TDB_DATA key)
+{
+       int res;
+       struct db_record *rec;
+
+       res = db_ctdb_transaction_start(ctx->db);
+       if (res == -1) {
+               return NULL;
+       }
+
+       rec = db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
+       if (rec == NULL) {
+               ctx->db->transaction_cancel(ctx->db);           
+               return NULL;
+       }
+
+       /* destroy this transaction when we release the lock */
+       talloc_set_destructor((struct db_record *)talloc_new(rec), db_ctdb_record_destructor);
+       return rec;
+}
+
 
 /*
   stores a record inside a transaction
@@ -681,130 +716,6 @@ static NTSTATUS db_ctdb_store(struct db_record *rec, TDB_DATA data, int flag)
 }
 
 
-/* for persistent databases the store is a bit different. We have to
-   ask the ctdb daemon to push the record to all nodes after the
-   store */
-static NTSTATUS db_ctdb_store_persistent(struct db_record *rec, TDB_DATA data, int flag)
-{
-       struct db_ctdb_rec *crec;
-       struct db_record *record;
-       TDB_DATA cdata;
-       int ret;
-       NTSTATUS status;
-       uint32_t count;
-       int max_retries = lp_parm_int(-1, "dbwrap ctdb", "max store retries", 5);
-
-       for (count = 0, status = NT_STATUS_UNSUCCESSFUL, record = rec;
-            (count < max_retries) && !NT_STATUS_IS_OK(status);
-            count++)
-       {
-               if (count > 0) {
-                       /* retry */
-                       /*
-                        * There is a hack here: We use rec as a memory
-                        * context and re-use it as the record struct ptr.
-                        * We don't free the record data allocated
-                        * in each turn. So all gets freed when the caller
-                        * releases the original record. This is because
-                        * we don't get the record passed in by reference
-                        * in the first place and the caller relies on
-                        * having to free the record himself.
-                        */
-                       record = fetch_locked_internal(crec->ctdb_ctx,
-                                                      rec,
-                                                      rec->key,
-                                                      true /* persistent */);
-                       if (record == NULL) {
-                               DEBUG(5, ("fetch_locked_internal failed.\n"));
-                               status = NT_STATUS_NO_MEMORY;
-                               break;
-                       }
-               }
-
-               crec = talloc_get_type_abort(record->private_data,
-                                            struct db_ctdb_rec);
-
-               cdata.dsize = sizeof(crec->header) + data.dsize;
-
-               if (!(cdata.dptr = SMB_MALLOC_ARRAY(uint8, cdata.dsize))) {
-                       return NT_STATUS_NO_MEMORY;
-               }
-
-               crec->header.rsn++;
-
-               memcpy(cdata.dptr, &crec->header, sizeof(crec->header));
-               memcpy(cdata.dptr + sizeof(crec->header), data.dptr, data.dsize);
-
-               status = ctdbd_start_persistent_update(
-                               messaging_ctdbd_connection(),
-                               crec->ctdb_ctx->db_id,
-                               rec->key,
-                               cdata);
-
-               if (NT_STATUS_IS_OK(status)) {
-                       ret = tdb_store(crec->ctdb_ctx->wtdb->tdb, rec->key,
-                                       cdata, TDB_REPLACE);
-                       status = (ret == 0)
-                                       ? NT_STATUS_OK
-                                       : tdb_error_to_ntstatus(
-                                               crec->ctdb_ctx->wtdb->tdb);
-               }
-
-               /*
-                * release the lock *now* in order to prevent deadlocks.
-                *
-                * There is a tradeoff: Usually, the record is still locked
-                * after db->store operation. This lock is usually released
-                * via the talloc destructor with the TALLOC_FREE to
-                * the record. So we have two choices:
-                *
-                * - Either re-lock the record after the call to persistent_store
-                *   or cancel_persistent update and this way not changing any
-                *   assumptions callers may have about the state, but possibly
-                *   introducing new race conditions.
-                *
-                * - Or don't lock the record again but just remove the
-                *   talloc_destructor. This is less racy but assumes that
-                *   the lock is always released via TALLOC_FREE of the record.
-                *
-                * I choose the first variant for now since it seems less racy.
-                * We can't guarantee that we succeed in getting the lock
-                * anyways. The only real danger here is that a caller
-                * performs multiple store operations after a fetch_locked()
-                * which is currently not the case.
-                */
-               tdb_chainunlock(crec->ctdb_ctx->wtdb->tdb, rec->key);
-               talloc_set_destructor(record, NULL);
-
-               /* now tell ctdbd to update this record on all other nodes */
-               if (NT_STATUS_IS_OK(status)) {
-                       status = ctdbd_persistent_store(
-                                       messaging_ctdbd_connection(),
-                                       crec->ctdb_ctx->db_id,
-                                       rec->key,
-                                       cdata);
-               } else {
-                       ctdbd_cancel_persistent_update(
-                                       messaging_ctdbd_connection(),
-                                       crec->ctdb_ctx->db_id,
-                                       rec->key,
-                                       cdata);
-                       break;
-               }
-
-               SAFE_FREE(cdata.dptr);
-       } /* retry-loop */
-
-       if (!NT_STATUS_IS_OK(status)) {
-               DEBUG(5, ("ctdbd_persistent_store failed after "
-                         "%d retries with error %s - giving up.\n",
-                         count, nt_errstr(status)));
-       }
-
-       SAFE_FREE(cdata.dptr);
-
-       return status;
-}
 
 static NTSTATUS db_ctdb_delete(struct db_record *rec)
 {
@@ -821,21 +732,6 @@ static NTSTATUS db_ctdb_delete(struct db_record *rec)
 
 }
 
-static NTSTATUS db_ctdb_delete_persistent(struct db_record *rec)
-{
-       TDB_DATA data;
-
-       /*
-        * We have to store the header with empty data. TODO: Fix the
-        * tdb-level cleanup
-        */
-
-       ZERO_STRUCT(data);
-
-       return db_ctdb_store_persistent(rec, data, 0);
-
-}
-
 static int db_ctdb_record_destr(struct db_record* data)
 {
        struct db_ctdb_rec *crec = talloc_get_type_abort(
@@ -867,10 +763,6 @@ static struct db_record *fetch_locked_internal(struct db_ctdb_ctx *ctx,
        TDB_DATA ctdb_data;
        int migrate_attempts = 0;
 
-       if (ctx->transaction != NULL) {
-               return db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
-       }
-
        if (!(result = talloc(mem_ctx, struct db_record))) {
                DEBUG(0, ("talloc failed\n"));
                return NULL;
@@ -913,13 +805,8 @@ again:
                return NULL;
        }
 
-       if (persistent) {
-               result->store = db_ctdb_store_persistent;
-               result->delete_rec = db_ctdb_delete_persistent;
-       } else {
-               result->store = db_ctdb_store;
-               result->delete_rec = db_ctdb_delete;
-       }
+       result->store = db_ctdb_store;
+       result->delete_rec = db_ctdb_delete;
        talloc_set_destructor(result, db_ctdb_record_destr);
 
        ctdb_data = tdb_fetch(ctx->wtdb->tdb, key);
@@ -988,6 +875,14 @@ static struct db_record *db_ctdb_fetch_locked(struct db_context *db,
        struct db_ctdb_ctx *ctx = talloc_get_type_abort(db->private_data,
                                                        struct db_ctdb_ctx);
 
+       if (ctx->transaction != NULL) {
+               return db_ctdb_fetch_locked_transaction(ctx, mem_ctx, key);
+       }
+
+       if (db->persistent) {
+               return db_ctdb_fetch_locked_persistent(ctx, mem_ctx, key);
+       }
+
        return fetch_locked_internal(ctx, mem_ctx, key, db->persistent);
 }
 
@@ -1210,6 +1105,7 @@ struct db_context *db_open_ctdb(TALLOC_CTX *mem_ctx,
        }
 
        db_ctdb->transaction = NULL;
+       db_ctdb->db = result;
 
        if (!NT_STATUS_IS_OK(ctdbd_db_attach(messaging_ctdbd_connection(),name, &db_ctdb->db_id, tdb_flags))) {
                DEBUG(0, ("ctdbd_db_attach failed for %s\n", name));