client: Reimplement persistent transaction code using TRANS3_COMMIT
authorAmitay Isaacs <amitay@gmail.com>
Mon, 23 Sep 2013 08:30:04 +0000 (18:30 +1000)
committerAmitay Isaacs <amitay@gmail.com>
Fri, 4 Oct 2013 05:46:15 +0000 (15:46 +1000)
Implementing persistent trasnaction code from Samba.

Persistent transaction code was reimplemented in Samba using g_lock.tdb
to hold transaction locks and using TRANS3_COMMIT control.

Implementation details:

1. When starting a transaction, create a record with "transaction-<dbid>"
   as key and store current server_id in the structure.

2. If a record already exists, some other client has already started a
   transaction.  Verify that the process corresponding to server_id stored
   in the record really exists or it's a stale record and overwrite it.

3. All modifications to the actual persistent database are stored in a
   marshal buffer.

4. When transaction is committed, read the sequence number of the
   persistent database and increment it.  Sequence number record is also
   stored in the marshal buffer.

5. Send the changed records (marshal buffer) in TRANS3_COMMIT control
   to all the active nodes.

6. If all controls succeed, verify that the sequence number has been
   incremented.  Commit is successful.  If any of the controls fail,
   abort the transaction.

7. In case sequence number has not yet been incremented, then database
   recovery has been triggered.  So repeat from step 5.

Signed-off-by: Amitay Isaacs <amitay@gmail.com>
client/ctdb_client.c
include/ctdb_client.h

index 2666ca2a31e5730a0537475d358d1feec92f1740..3d9c6ed5ffc74faa7951722756f956d3135fcdba 100644 (file)
@@ -3994,6 +3994,295 @@ static bool g_lock_unlock(TALLOC_CTX *mem_ctx,
        return true;
 }
 
+
+struct ctdb_transaction_handle {
+       struct ctdb_db_context *ctdb_db;
+       struct ctdb_db_context *g_lock_db;
+       char *lock_name;
+       uint32_t reqid;
+       /*
+        * we store reads and writes done under a transaction:
+        * - one list stores both reads and writes (m_all)
+        * - the other just writes (m_write)
+        */
+       struct ctdb_marshall_buffer *m_all;
+       struct ctdb_marshall_buffer *m_write;
+};
+
+static int ctdb_transaction_destructor(struct ctdb_transaction_handle *h)
+{
+       g_lock_unlock(h, h->g_lock_db, h->lock_name, h->reqid);
+       ctdb_reqid_remove(h->ctdb_db->ctdb, h->reqid);
+       return 0;
+}
+
+
+/**
+ * start a transaction on a database
+ */
+struct ctdb_transaction_handle *ctdb_transaction_start(struct ctdb_db_context *ctdb_db,
+                                                      TALLOC_CTX *mem_ctx)
+{
+       struct ctdb_transaction_handle *h;
+       struct ctdb_server_id id;
+
+       h = talloc_zero(mem_ctx, struct ctdb_transaction_handle);
+       if (h == NULL) {
+               DEBUG(DEBUG_ERR, (__location__ " memory allocation error\n"));
+               return NULL;
+       }
+
+       h->ctdb_db = ctdb_db;
+       h->lock_name = talloc_asprintf(h, "transaction_db_0x%08x",
+                                      (unsigned int)ctdb_db->db_id);
+       if (h->lock_name == NULL) {
+               DEBUG(DEBUG_ERR, (__location__ " talloc asprintf failed\n"));
+               talloc_free(h);
+               return NULL;
+       }
+
+       h->g_lock_db = ctdb_attach(h->ctdb_db->ctdb, timeval_current_ofs(3,0),
+                                  "g_lock.tdb", false, 0);
+       if (!h->g_lock_db) {
+               DEBUG(DEBUG_ERR, (__location__ " unable to attach to g_lock.tdb\n"));
+               talloc_free(h);
+               return NULL;
+       }
+
+       id.type = SERVER_TYPE_SAMBA;
+       id.pnn = ctdb_get_pnn(ctdb_db->ctdb);
+       id.server_id = getpid();
+
+       if (ctdb_ctrl_register_server_id(ctdb_db->ctdb, timeval_current_ofs(3,0),
+                                        &id) != 0) {
+               DEBUG(DEBUG_ERR, (__location__ " unable to register server id\n"));
+               talloc_free(h);
+               return NULL;
+       }
+
+       h->reqid = ctdb_reqid_new(h->ctdb_db->ctdb, h);
+
+       if (!g_lock_lock(h, h->g_lock_db, h->lock_name, h->reqid)) {
+               DEBUG(DEBUG_ERR, (__location__ " Error locking g_lock.tdb\n"));
+               talloc_free(h);
+               return NULL;
+       }
+
+       talloc_set_destructor(h, ctdb_transaction_destructor);
+       return h;
+}
+
+/**
+ * fetch a record inside a transaction
+ */
+int ctdb_transaction_fetch(struct ctdb_transaction_handle *h,
+                          TALLOC_CTX *mem_ctx,
+                          TDB_DATA key, TDB_DATA *data)
+{
+       struct ctdb_ltdb_header header;
+       int ret;
+
+       ZERO_STRUCT(header);
+
+       ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, mem_ctx, data);
+       if (ret == -1 && header.dmaster == (uint32_t)-1) {
+               /* record doesn't exist yet */
+               *data = tdb_null;
+               ret = 0;
+       }
+
+       if (ret != 0) {
+               return ret;
+       }
+
+       h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 1, key, NULL, *data);
+       if (h->m_all == NULL) {
+               DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
+               return -1;
+       }
+
+       return 0;
+}
+
+/**
+ * stores a record inside a transaction
+ */
+int ctdb_transaction_store(struct ctdb_transaction_handle *h,
+                          TDB_DATA key, TDB_DATA data)
+{
+       TALLOC_CTX *tmp_ctx = talloc_new(h);
+       struct ctdb_ltdb_header header;
+       TDB_DATA olddata;
+       int ret;
+
+       /* we need the header so we can update the RSN */
+       ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, tmp_ctx, &olddata);
+       if (ret == -1 && header.dmaster == (uint32_t)-1) {
+               /* the record doesn't exist - create one with us as dmaster.
+                  This is only safe because we are in a transaction and this
+                  is a persistent database */
+               ZERO_STRUCT(header);
+       } else if (ret != 0) {
+               DEBUG(DEBUG_ERR,(__location__ " Failed to fetch record\n"));
+               talloc_free(tmp_ctx);
+               return ret;
+       }
+
+       if (data.dsize == olddata.dsize &&
+           memcmp(data.dptr, olddata.dptr, data.dsize) == 0 &&
+           header.rsn != 0) {
+               /* save writing the same data */
+               talloc_free(tmp_ctx);
+               return 0;
+       }
+
+       header.dmaster = h->ctdb_db->ctdb->pnn;
+       header.rsn++;
+
+       h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 0, key, NULL, data);
+       if (h->m_all == NULL) {
+               DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
+               talloc_free(tmp_ctx);
+               return -1;
+       }
+
+       h->m_write = ctdb_marshall_add(h, h->m_write, h->ctdb_db->db_id, 0, key, &header, data);
+       if (h->m_write == NULL) {
+               DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
+               talloc_free(tmp_ctx);
+               return -1;
+       }
+
+       talloc_free(tmp_ctx);
+       return 0;
+}
+
+static int ctdb_fetch_db_seqnum(struct ctdb_db_context *ctdb_db, uint64_t *seqnum)
+{
+       const char *keyname = CTDB_DB_SEQNUM_KEY;
+       TDB_DATA key, data;
+       struct ctdb_ltdb_header header;
+       int ret;
+
+       key.dptr = (uint8_t *)discard_const(keyname);
+       key.dsize = strlen(keyname) + 1;
+
+       ret = ctdb_ltdb_fetch(ctdb_db, key, &header, ctdb_db, &data);
+       if (ret != 0) {
+               *seqnum = 0;
+               return 0;
+       }
+
+       if (data.dsize != sizeof(*seqnum)) {
+               DEBUG(DEBUG_ERR, (__location__ " Invalid data recived len=%zi\n",
+                                 data.dsize));
+               talloc_free(data.dptr);
+               return -1;
+       }
+
+       *seqnum = *(uint64_t *)data.dptr;
+       talloc_free(data.dptr);
+
+       return 0;
+}
+
+
+static int ctdb_store_db_seqnum(struct ctdb_transaction_handle *h,
+                               uint64_t seqnum)
+{
+       const char *keyname = CTDB_DB_SEQNUM_KEY;
+       TDB_DATA key, data;
+
+       key.dptr = (uint8_t *)discard_const(keyname);
+       key.dsize = strlen(keyname) + 1;
+
+       data.dptr = (uint8_t *)&seqnum;
+       data.dsize = sizeof(seqnum);
+
+       return ctdb_transaction_store(h, key, data);
+}
+
+
+/**
+ * commit a transaction
+ */
+int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
+{
+       int ret;
+       uint64_t old_seqnum, new_seqnum;
+       int32_t status;
+       struct timeval timeout;
+
+       if (h->m_write == NULL) {
+               /* no changes were made */
+               talloc_free(h);
+               return 0;
+       }
+
+       ret = ctdb_fetch_db_seqnum(h->ctdb_db, &old_seqnum);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR, (__location__ " failed to fetch db sequence number\n"));
+               ret = -1;
+               goto done;
+       }
+
+       new_seqnum = old_seqnum + 1;
+       ret = ctdb_store_db_seqnum(h, new_seqnum);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR, (__location__ " failed to store db sequence number\n"));
+               ret = -1;
+               goto done;
+       }
+
+again:
+       timeout = timeval_current_ofs(3,0);
+       ret = ctdb_control(h->ctdb_db->ctdb, CTDB_CURRENT_NODE,
+                          h->ctdb_db->db_id,
+                          CTDB_CONTROL_TRANS3_COMMIT, 0,
+                          ctdb_marshall_finish(h->m_write), NULL, NULL,
+                          &status, &timeout, NULL);
+       if (ret != 0 || status != 0) {
+               /*
+                * TRANS3_COMMIT control will only fail if recovery has been
+                * triggered.  Check if the database has been updated or not.
+                */
+               ret = ctdb_fetch_db_seqnum(h->ctdb_db, &new_seqnum);
+               if (ret != 0) {
+                       DEBUG(DEBUG_ERR, (__location__ " failed to fetch db sequence number\n"));
+                       goto done;
+               }
+
+               if (new_seqnum == old_seqnum) {
+                       /* Database not yet updated, try again */
+                       goto again;
+               }
+
+               if (new_seqnum != (old_seqnum + 1)) {
+                       DEBUG(DEBUG_ERR, (__location__ " new seqnum [%llu] != old seqnum [%llu] + 1\n",
+                                         (long long unsigned)new_seqnum,
+                                         (long long unsigned)old_seqnum));
+                       ret = -1;
+                       goto done;
+               }
+       }
+
+       ret = 0;
+
+done:
+       talloc_free(h);
+       return ret;
+}
+
+/**
+ * cancel a transaction
+ */
+int ctdb_transaction_cancel(struct ctdb_transaction_handle *h)
+{
+       talloc_free(h);
+       return 0;
+}
+
+#if 0
 /**
  * check whether a transaction is active on a given db on a given node
  */
@@ -4437,6 +4726,7 @@ again:
        talloc_free(h);
        return 0;
 }
+#endif
 
 /*
   recovery daemon ping to main daemon
index 35ce6d1b26fdbbbbd8610a38d35a7aac14db9e00..ca2f7620e5b04d523e17cfa10a1606a307ab72b9 100644 (file)
@@ -546,11 +546,6 @@ struct ctdb_client_control_state *ctdb_ctrl_getcapabilities_send(struct ctdb_con
 
 int ctdb_ctrl_getcapabilities_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *capabilities);
 
-
-int32_t ctdb_ctrl_transaction_active(struct ctdb_context *ctdb,
-                                    uint32_t destnode,
-                                    uint32_t db_id);
-
 struct ctdb_marshall_buffer *ctdb_marshall_add(TALLOC_CTX *mem_ctx,
                                               struct ctdb_marshall_buffer *m,
                                               uint64_t db_id,