return true;
}
+
+struct ctdb_transaction_handle {
+ struct ctdb_db_context *ctdb_db;
+ struct ctdb_db_context *g_lock_db;
+ char *lock_name;
+ uint32_t reqid;
+ /*
+ * we store reads and writes done under a transaction:
+ * - one list stores both reads and writes (m_all)
+ * - the other just writes (m_write)
+ */
+ struct ctdb_marshall_buffer *m_all;
+ struct ctdb_marshall_buffer *m_write;
+};
+
+static int ctdb_transaction_destructor(struct ctdb_transaction_handle *h)
+{
+ g_lock_unlock(h, h->g_lock_db, h->lock_name, h->reqid);
+ ctdb_reqid_remove(h->ctdb_db->ctdb, h->reqid);
+ return 0;
+}
+
+
+/**
+ * start a transaction on a database
+ */
+struct ctdb_transaction_handle *ctdb_transaction_start(struct ctdb_db_context *ctdb_db,
+ TALLOC_CTX *mem_ctx)
+{
+ struct ctdb_transaction_handle *h;
+ struct ctdb_server_id id;
+
+ h = talloc_zero(mem_ctx, struct ctdb_transaction_handle);
+ if (h == NULL) {
+ DEBUG(DEBUG_ERR, (__location__ " memory allocation error\n"));
+ return NULL;
+ }
+
+ h->ctdb_db = ctdb_db;
+ h->lock_name = talloc_asprintf(h, "transaction_db_0x%08x",
+ (unsigned int)ctdb_db->db_id);
+ if (h->lock_name == NULL) {
+ DEBUG(DEBUG_ERR, (__location__ " talloc asprintf failed\n"));
+ talloc_free(h);
+ return NULL;
+ }
+
+ h->g_lock_db = ctdb_attach(h->ctdb_db->ctdb, timeval_current_ofs(3,0),
+ "g_lock.tdb", false, 0);
+ if (!h->g_lock_db) {
+ DEBUG(DEBUG_ERR, (__location__ " unable to attach to g_lock.tdb\n"));
+ talloc_free(h);
+ return NULL;
+ }
+
+ id.type = SERVER_TYPE_SAMBA;
+ id.pnn = ctdb_get_pnn(ctdb_db->ctdb);
+ id.server_id = getpid();
+
+ if (ctdb_ctrl_register_server_id(ctdb_db->ctdb, timeval_current_ofs(3,0),
+ &id) != 0) {
+ DEBUG(DEBUG_ERR, (__location__ " unable to register server id\n"));
+ talloc_free(h);
+ return NULL;
+ }
+
+ h->reqid = ctdb_reqid_new(h->ctdb_db->ctdb, h);
+
+ if (!g_lock_lock(h, h->g_lock_db, h->lock_name, h->reqid)) {
+ DEBUG(DEBUG_ERR, (__location__ " Error locking g_lock.tdb\n"));
+ talloc_free(h);
+ return NULL;
+ }
+
+ talloc_set_destructor(h, ctdb_transaction_destructor);
+ return h;
+}
+
+/**
+ * fetch a record inside a transaction
+ */
+int ctdb_transaction_fetch(struct ctdb_transaction_handle *h,
+ TALLOC_CTX *mem_ctx,
+ TDB_DATA key, TDB_DATA *data)
+{
+ struct ctdb_ltdb_header header;
+ int ret;
+
+ ZERO_STRUCT(header);
+
+ ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, mem_ctx, data);
+ if (ret == -1 && header.dmaster == (uint32_t)-1) {
+ /* record doesn't exist yet */
+ *data = tdb_null;
+ ret = 0;
+ }
+
+ if (ret != 0) {
+ return ret;
+ }
+
+ h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 1, key, NULL, *data);
+ if (h->m_all == NULL) {
+ DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
+ return -1;
+ }
+
+ return 0;
+}
+
+/**
+ * stores a record inside a transaction
+ */
+int ctdb_transaction_store(struct ctdb_transaction_handle *h,
+ TDB_DATA key, TDB_DATA data)
+{
+ TALLOC_CTX *tmp_ctx = talloc_new(h);
+ struct ctdb_ltdb_header header;
+ TDB_DATA olddata;
+ int ret;
+
+ /* we need the header so we can update the RSN */
+ ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, tmp_ctx, &olddata);
+ if (ret == -1 && header.dmaster == (uint32_t)-1) {
+ /* the record doesn't exist - create one with us as dmaster.
+ This is only safe because we are in a transaction and this
+ is a persistent database */
+ ZERO_STRUCT(header);
+ } else if (ret != 0) {
+ DEBUG(DEBUG_ERR,(__location__ " Failed to fetch record\n"));
+ talloc_free(tmp_ctx);
+ return ret;
+ }
+
+ if (data.dsize == olddata.dsize &&
+ memcmp(data.dptr, olddata.dptr, data.dsize) == 0 &&
+ header.rsn != 0) {
+ /* save writing the same data */
+ talloc_free(tmp_ctx);
+ return 0;
+ }
+
+ header.dmaster = h->ctdb_db->ctdb->pnn;
+ header.rsn++;
+
+ h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 0, key, NULL, data);
+ if (h->m_all == NULL) {
+ DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
+ talloc_free(tmp_ctx);
+ return -1;
+ }
+
+ h->m_write = ctdb_marshall_add(h, h->m_write, h->ctdb_db->db_id, 0, key, &header, data);
+ if (h->m_write == NULL) {
+ DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
+ talloc_free(tmp_ctx);
+ return -1;
+ }
+
+ talloc_free(tmp_ctx);
+ return 0;
+}
+
+static int ctdb_fetch_db_seqnum(struct ctdb_db_context *ctdb_db, uint64_t *seqnum)
+{
+ const char *keyname = CTDB_DB_SEQNUM_KEY;
+ TDB_DATA key, data;
+ struct ctdb_ltdb_header header;
+ int ret;
+
+ key.dptr = (uint8_t *)discard_const(keyname);
+ key.dsize = strlen(keyname) + 1;
+
+ ret = ctdb_ltdb_fetch(ctdb_db, key, &header, ctdb_db, &data);
+ if (ret != 0) {
+ *seqnum = 0;
+ return 0;
+ }
+
+ if (data.dsize != sizeof(*seqnum)) {
+ DEBUG(DEBUG_ERR, (__location__ " Invalid data recived len=%zi\n",
+ data.dsize));
+ talloc_free(data.dptr);
+ return -1;
+ }
+
+ *seqnum = *(uint64_t *)data.dptr;
+ talloc_free(data.dptr);
+
+ return 0;
+}
+
+
+static int ctdb_store_db_seqnum(struct ctdb_transaction_handle *h,
+ uint64_t seqnum)
+{
+ const char *keyname = CTDB_DB_SEQNUM_KEY;
+ TDB_DATA key, data;
+
+ key.dptr = (uint8_t *)discard_const(keyname);
+ key.dsize = strlen(keyname) + 1;
+
+ data.dptr = (uint8_t *)&seqnum;
+ data.dsize = sizeof(seqnum);
+
+ return ctdb_transaction_store(h, key, data);
+}
+
+
+/**
+ * commit a transaction
+ */
+int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
+{
+ int ret;
+ uint64_t old_seqnum, new_seqnum;
+ int32_t status;
+ struct timeval timeout;
+
+ if (h->m_write == NULL) {
+ /* no changes were made */
+ talloc_free(h);
+ return 0;
+ }
+
+ ret = ctdb_fetch_db_seqnum(h->ctdb_db, &old_seqnum);
+ if (ret != 0) {
+ DEBUG(DEBUG_ERR, (__location__ " failed to fetch db sequence number\n"));
+ ret = -1;
+ goto done;
+ }
+
+ new_seqnum = old_seqnum + 1;
+ ret = ctdb_store_db_seqnum(h, new_seqnum);
+ if (ret != 0) {
+ DEBUG(DEBUG_ERR, (__location__ " failed to store db sequence number\n"));
+ ret = -1;
+ goto done;
+ }
+
+again:
+ timeout = timeval_current_ofs(3,0);
+ ret = ctdb_control(h->ctdb_db->ctdb, CTDB_CURRENT_NODE,
+ h->ctdb_db->db_id,
+ CTDB_CONTROL_TRANS3_COMMIT, 0,
+ ctdb_marshall_finish(h->m_write), NULL, NULL,
+ &status, &timeout, NULL);
+ if (ret != 0 || status != 0) {
+ /*
+ * TRANS3_COMMIT control will only fail if recovery has been
+ * triggered. Check if the database has been updated or not.
+ */
+ ret = ctdb_fetch_db_seqnum(h->ctdb_db, &new_seqnum);
+ if (ret != 0) {
+ DEBUG(DEBUG_ERR, (__location__ " failed to fetch db sequence number\n"));
+ goto done;
+ }
+
+ if (new_seqnum == old_seqnum) {
+ /* Database not yet updated, try again */
+ goto again;
+ }
+
+ if (new_seqnum != (old_seqnum + 1)) {
+ DEBUG(DEBUG_ERR, (__location__ " new seqnum [%llu] != old seqnum [%llu] + 1\n",
+ (long long unsigned)new_seqnum,
+ (long long unsigned)old_seqnum));
+ ret = -1;
+ goto done;
+ }
+ }
+
+ ret = 0;
+
+done:
+ talloc_free(h);
+ return ret;
+}
+
+/**
+ * cancel a transaction
+ */
+int ctdb_transaction_cancel(struct ctdb_transaction_handle *h)
+{
+ talloc_free(h);
+ return 0;
+}
+
+#if 0
/**
* check whether a transaction is active on a given db on a given node
*/
talloc_free(h);
return 0;
}
+#endif
/*
recovery daemon ping to main daemon