s3:dbwrap_ctdb: maintain a database sequence number that bumps in transactions
[abartlet/samba.git/.git] / source3 / lib / dbwrap_ctdb.c
index 0986083268fd87ff1d0d3e0c0a2d22873adacb02..fb99e1d9cf52c5265d6856dd5bd682c02f279a4c 100644 (file)
@@ -664,6 +664,65 @@ static NTSTATUS db_ctdb_delete_transaction(struct db_record *rec)
        return status;
 }
 
+/**
+ * Fetch the db sequence number of a persistent db directly from the db.
+ */
+static NTSTATUS db_ctdb_fetch_db_seqnum_from_db(struct db_ctdb_ctx *db,
+                                               uint64_t *seqnum)
+{
+       NTSTATUS status;
+       const char *keyname = CTDB_DB_SEQNUM_KEY;
+       TDB_DATA key;
+       TDB_DATA data;
+       struct ctdb_ltdb_header header;
+       TALLOC_CTX *mem_ctx = talloc_stackframe();
+
+       if (seqnum == NULL) {
+               return NT_STATUS_INVALID_PARAMETER;
+       }
+
+       key.dptr = (uint8_t *)discard_const(keyname);
+       key.dsize = strlen(keyname) + 1;
+
+       status = db_ctdb_ltdb_fetch(db, key, &header, mem_ctx, &data);
+       if (!NT_STATUS_IS_OK(status)) {
+               goto done;
+       }
+
+       if (data.dsize != sizeof(uint64_t)) {
+               *seqnum = 0;
+               goto done;
+       }
+
+       *seqnum = *(uint64_t *)data.dptr;
+
+done:
+       TALLOC_FREE(mem_ctx);
+       return status;
+}
+
+/**
+ * Store the database sequence number inside a transaction.
+ */
+static NTSTATUS db_ctdb_store_db_seqnum(struct db_ctdb_transaction_handle *h,
+                                       uint64_t seqnum)
+{
+       NTSTATUS status;
+       const char *keyname = CTDB_DB_SEQNUM_KEY;
+       TDB_DATA key;
+       TDB_DATA data;
+
+       key.dptr = (uint8_t *)discard_const(keyname);
+       key.dsize = strlen(keyname);
+
+       data.dptr = (uint8_t *)&seqnum;
+       data.dsize = sizeof(uint64_t);
+
+       status = db_ctdb_transaction_store(h, key, data);
+
+       return status;
+}
+
 /*
   commit a transaction
  */
@@ -674,6 +733,8 @@ static int db_ctdb_transaction_commit(struct db_context *db)
        NTSTATUS rets;
        int status;
        struct db_ctdb_transaction_handle *h = ctx->transaction;
+       uint64_t old_seqnum, new_seqnum;
+       int ret;
 
        if (h == NULL) {
                DEBUG(0,(__location__ " transaction commit with no open transaction on db 0x%08x\n", ctx->db_id));
@@ -693,6 +754,30 @@ static int db_ctdb_transaction_commit(struct db_context *db)
 
        DEBUG(5,(__location__ " Commit transaction on db 0x%08x\n", ctx->db_id));
 
+       /*
+        * As the last db action before committing, bump the database sequence
+        * number. Note that this undoes all changes to the seqnum records
+        * performed under the transaction. This record is not meant to be
+        * modified by user interaction. It is for internal use only...
+        */
+       rets = db_ctdb_fetch_db_seqnum_from_db(ctx, &old_seqnum);
+       if (!NT_STATUS_IS_OK(rets)) {
+               DEBUG(1, (__location__ " failed to fetch the db sequence number "
+                         "in transaction commit on db 0x%08x\n", ctx->db_id));
+               ret = -1;
+               goto done;
+       }
+
+       new_seqnum = old_seqnum + 1;
+
+       rets = db_ctdb_store_db_seqnum(h, new_seqnum);
+       if (!NT_STATUS_IS_OK(rets)) {
+               DEBUG(1, (__location__ "failed to store the db sequence number "
+                         " in transaction commit on db 0x%08x\n", ctx->db_id));
+               ret = -1;
+               goto done;
+       }
+
 again:
        if (h->m_write == NULL) {
                /* no changes were made, potentially after a retry */
@@ -707,14 +792,40 @@ again:
                                   NULL, NULL, &status);
        if (!NT_STATUS_IS_OK(rets) || status != 0) {
                /*
-                * TODO:
-                * check the database sequence number and
-                * compare it to the seqnum after applying the
-                * marshall buffer. If it is the same: return success.
+                * The TRANS3_COMMIT control should only possibly fail when a
+                * recovery has been running concurrently. In any case, the db
+                * will be the same on all nodes, either the new copy or the
+                * old copy.  This can be detected by comparing the old and new
+                * local sequence numbers.
+                */
+               rets = db_ctdb_fetch_db_seqnum_from_db(ctx, &new_seqnum);
+               if (!NT_STATUS_IS_OK(rets)) {
+                       DEBUG(1, (__location__ " failed to refetch db sequence "
+                                 "number after failed TRANS3_COMMIT\n"));
+                       ret = -1;
+                       goto done;
+               }
+
+               if (new_seqnum == old_seqnum) {
+                       /* Recovery prevented all our changes: retry. */
+                       goto again;
+               } else if (new_seqnum != (old_seqnum + 1)) {
+                       DEBUG(0, (__location__ " ERROR: new_seqnum[%lu] != "
+                                 "old_seqnum[%lu] + (0 or 1) after failed "
+                                 "TRANS3_COMMIT - this should not happen!\n",
+                                 (unsigned long)new_seqnum,
+                                 (unsigned long)old_seqnum));
+                       ret = -1;
+                       goto done;
+               }
+               /*
+                * Recovery propagated our changes to all nodes, completing
+                * our commit for us - succeed.
                 */
-               goto again;
        }
 
+       ret = 0;
+
 done:
        h->ctx->transaction = NULL;
        talloc_free(h);