*/
#include "includes.h"
-#include "db_wrap.h"
+#include "lib/tdb_wrap/tdb_wrap.h"
#include "tdb.h"
#include "lib/util/dlinklist.h"
#include "system/network.h"
#include "../include/ctdb_private.h"
#include "lib/util/dlinklist.h"
-pid_t ctdbd_pid;
-
/*
allocate a packet for use in client<->daemon communication
*/
hdr->length = length;
hdr->operation = operation;
hdr->ctdb_magic = CTDB_MAGIC;
- hdr->ctdb_version = CTDB_VERSION;
+ hdr->ctdb_version = CTDB_PROTOCOL;
hdr->srcnode = ctdb->pnn;
if (ctdb->vnn_map) {
hdr->generation = ctdb->vnn_map->generation;
goto done;
}
- if (hdr->ctdb_version != CTDB_VERSION) {
+ if (hdr->ctdb_version != CTDB_PROTOCOL) {
ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected in client\n", hdr->ctdb_version);
goto done;
}
memset(&addr, 0, sizeof(addr));
addr.sun_family = AF_UNIX;
- strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
+ strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path)-1);
ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
if (ctdb->daemon.sd == -1) {
goto again;
}
+ /* if this is a request for read/write and we have delegations
+ we have to revoke all delegations first
+ */
+ if ((h->header.dmaster == ctdb_db->ctdb->pnn) &&
+ (h->header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
+ ctdb_ltdb_unlock(ctdb_db, key);
+ ret = ctdb_client_force_migration(ctdb_db, key);
+ if (ret != 0) {
+ DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
+ talloc_free(h);
+ return NULL;
+ }
+ goto again;
+ }
+
DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: we are dmaster - done\n"));
return h;
}
tdb_flags = TDB_INCOMPATIBLE_HASH;
}
+#ifdef TDB_MUTEX_LOCKING
+ if (!persistent && ctdb->tunable.mutex_enabled == 1) {
+ tdb_flags |= (TDB_MUTEX_LOCKING | TDB_CLEAR_IF_FIRST);
+ }
+#endif
+
ret = ctdb_control(ctdb, destnode, tdb_flags,
persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH,
0, data,
TDB_DATA data;
int ret;
int32_t res;
+#ifdef TDB_MUTEX_LOCKING
+ uint32_t mutex_enabled = 0;
+#endif
ctdb_db = ctdb_db_handle(ctdb, name);
if (ctdb_db) {
tdb_flags |= TDB_INCOMPATIBLE_HASH;
}
+#ifdef TDB_MUTEX_LOCKING
+ if (!persistent) {
+ ret = ctdb_ctrl_get_tunable(ctdb, timeval_current_ofs(3,0),
+ CTDB_CURRENT_NODE,
+ "TDBMutexEnabled",
+ &mutex_enabled);
+ if (ret != 0) {
+ DEBUG(DEBUG_WARNING, ("Assuming no mutex support.\n"));
+ }
+
+ if (mutex_enabled == 1) {
+ tdb_flags |= (TDB_MUTEX_LOCKING | TDB_CLEAR_IF_FIRST);
+ }
+ }
+#endif
+
/* tell ctdb daemon to attach */
ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, tdb_flags,
persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH,
return NULL;
}
- tdb_flags = persistent?TDB_DEFAULT:TDB_NOSYNC;
+ if (persistent) {
+ tdb_flags = TDB_DEFAULT;
+ } else {
+ tdb_flags = TDB_NOSYNC;
+#ifdef TDB_MUTEX_LOCKING
+ if (mutex_enabled) {
+ tdb_flags |= (TDB_MUTEX_LOCKING | TDB_CLEAR_IF_FIRST);
+ }
+#endif
+ }
if (ctdb->valgrinding) {
tdb_flags |= TDB_NOMMAP;
}
tdb_flags |= TDB_DISALLOW_NESTING;
- ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 0, tdb_flags, O_RDWR, 0);
+ ctdb_db->ltdb = tdb_wrap_open(ctdb_db, ctdb_db->db_path, 0, tdb_flags,
+ O_RDWR, 0);
if (ctdb_db->ltdb == NULL) {
ctdb_set_error(ctdb, "Failed to open tdb '%s'\n", ctdb_db->db_path);
talloc_free(ctdb_db);
return ctdb_db;
}
+/*
+ * detach from a specific database - client call
+ */
+int ctdb_detach(struct ctdb_context *ctdb, uint32_t db_id)
+{
+ int ret;
+ int32_t status;
+ TDB_DATA data;
+
+ data.dsize = sizeof(db_id);
+ data.dptr = (uint8_t *)&db_id;
+
+ ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_DB_DETACH,
+ 0, data, NULL, NULL, &status, NULL, NULL);
+ if (ret != 0 || status != 0) {
+ return -1;
+ }
+ return 0;
+}
/*
setup a call for a database
ctdb->lastid = INT_MAX-200;
CTDB_NO_MEMORY_NULL(ctdb, ctdb->idr);
- ret = ctdb_set_socketname(ctdb, CTDB_PATH);
+ ret = ctdb_set_socketname(ctdb, CTDB_SOCKET);
if (ret != 0) {
DEBUG(DEBUG_ERR,(__location__ " ctdb_set_socketname failed.\n"));
talloc_free(ctdb);
int32_t res = -1;
uint32_t destnode = state->c->hdr.destnode;
+ outdata.dsize = 0;
+ outdata.dptr = NULL;
+
/* one more node has responded with recmode data */
data->count--;
return id;
}
-static bool server_id_equal(struct server_id *id1, struct server_id *id2)
+/* This is basically a copy from Samba's server_id.*. However, a
+ * dependency chain stops us from using Samba's version, so use a
+ * renamed copy until a better solution is found. */
+static bool ctdb_server_id_equal(struct server_id *id1, struct server_id *id2)
{
if (id1->pid != id2->pid) {
return false;
if (data.dsize % sizeof(struct g_lock_rec) != 0) {
DEBUG(DEBUG_ERR, (__location__ "invalid data size %lu in g_lock record\n",
- data.dsize));
+ (unsigned long)data.dsize));
talloc_free(recs);
return false;
}
struct ctdb_record_handle *h;
struct g_lock_recs *locks;
struct server_id id;
+ struct timeval t_start;
int i;
key.dptr = (uint8_t *)discard_const(keyname);
key.dsize = strlen(keyname) + 1;
+
+ t_start = timeval_current();
+
+again:
+ /* Keep trying for an hour. */
+ if (timeval_elapsed(&t_start) > 3600) {
+ return false;
+ }
+
h = ctdb_fetch_lock(ctdb_db, mem_ctx, key, &data);
if (h == NULL) {
return false;
i = 0;
while (i < locks->num) {
- if (server_id_equal(&locks->lock[i].id, &id)) {
+ if (ctdb_server_id_equal(&locks->lock[i].id, &id)) {
/* Internal error */
talloc_free(h);
return false;
id.task_id, id.vnn,
(unsigned long long)id.unique_id));
talloc_free(h);
- return false;
+ goto again;
}
locks->lock = talloc_realloc(locks, locks->lock, struct g_lock_rec,
id = server_id_get(ctdb_db->ctdb, reqid);
for (i=0; i<locks->num; i++) {
- if (server_id_equal(&locks->lock[i].id, &id)) {
+ if (ctdb_server_id_equal(&locks->lock[i].id, &id)) {
if (i < locks->num-1) {
locks->lock[i] = locks->lock[locks->num-1];
}
return true;
}
-/**
- * check whether a transaction is active on a given db on a given node
- */
-int32_t ctdb_ctrl_transaction_active(struct ctdb_context *ctdb,
- uint32_t destnode,
- uint32_t db_id)
-{
- int32_t status;
- int ret;
- TDB_DATA indata;
-
- indata.dptr = (uint8_t *)&db_id;
- indata.dsize = sizeof(db_id);
-
- ret = ctdb_control(ctdb, destnode, 0,
- CTDB_CONTROL_TRANS2_ACTIVE,
- 0, indata, NULL, NULL, &status,
- NULL, NULL);
-
- if (ret != 0) {
- DEBUG(DEBUG_ERR, (__location__ " ctdb control for transaction_active failed\n"));
- return -1;
- }
-
- return status;
-}
-
struct ctdb_transaction_handle {
struct ctdb_db_context *ctdb_db;
- bool in_replay;
+ struct ctdb_db_context *g_lock_db;
+ char *lock_name;
+ uint32_t reqid;
/*
- * we store the reads and writes done under a transaction:
- * - one list stores both reads and writes (m_all),
+ * we store reads and writes done under a transaction:
+ * - one list stores both reads and writes (m_all)
* - the other just writes (m_write)
*/
struct ctdb_marshall_buffer *m_all;
struct ctdb_marshall_buffer *m_write;
};
-/* start a transaction on a database */
static int ctdb_transaction_destructor(struct ctdb_transaction_handle *h)
{
- tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
- return 0;
-}
-
-/* start a transaction on a database */
-static int ctdb_transaction_fetch_start(struct ctdb_transaction_handle *h)
-{
- struct ctdb_record_handle *rh;
- TDB_DATA key;
- TDB_DATA data;
- struct ctdb_ltdb_header header;
- TALLOC_CTX *tmp_ctx;
- const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
- int ret;
- struct ctdb_db_context *ctdb_db = h->ctdb_db;
- pid_t pid;
- int32_t status;
-
- key.dptr = discard_const(keyname);
- key.dsize = strlen(keyname);
-
- if (!ctdb_db->persistent) {
- DEBUG(DEBUG_ERR,(__location__ " Attempted transaction on non-persistent database\n"));
- return -1;
- }
-
-again:
- tmp_ctx = talloc_new(h);
-
- rh = ctdb_fetch_lock(ctdb_db, tmp_ctx, key, NULL);
- if (rh == NULL) {
- DEBUG(DEBUG_ERR,(__location__ " Failed to fetch_lock database\n"));
- talloc_free(tmp_ctx);
- return -1;
- }
-
- status = ctdb_ctrl_transaction_active(ctdb_db->ctdb,
- CTDB_CURRENT_NODE,
- ctdb_db->db_id);
- if (status == 1) {
- unsigned long int usec = (1000 + random()) % 100000;
- DEBUG(DEBUG_DEBUG, (__location__ " transaction is active "
- "on db_id[0x%08x]. waiting for %lu "
- "microseconds\n",
- ctdb_db->db_id, usec));
- talloc_free(tmp_ctx);
- usleep(usec);
- goto again;
- }
-
- /*
- * store the pid in the database:
- * it is not enough that the node is dmaster...
- */
- pid = getpid();
- data.dptr = (unsigned char *)&pid;
- data.dsize = sizeof(pid_t);
- rh->header.rsn++;
- rh->header.dmaster = ctdb_db->ctdb->pnn;
- ret = ctdb_ltdb_store(ctdb_db, key, &(rh->header), data);
- if (ret != 0) {
- DEBUG(DEBUG_ERR, (__location__ " Failed to store pid in "
- "transaction record\n"));
- talloc_free(tmp_ctx);
- return -1;
- }
-
- talloc_free(rh);
-
- ret = tdb_transaction_start(ctdb_db->ltdb->tdb);
- if (ret != 0) {
- DEBUG(DEBUG_ERR,(__location__ " Failed to start tdb transaction\n"));
- talloc_free(tmp_ctx);
- return -1;
- }
-
- ret = ctdb_ltdb_fetch(ctdb_db, key, &header, tmp_ctx, &data);
- if (ret != 0) {
- DEBUG(DEBUG_ERR,(__location__ " Failed to re-fetch transaction "
- "lock record inside transaction\n"));
- tdb_transaction_cancel(ctdb_db->ltdb->tdb);
- talloc_free(tmp_ctx);
- goto again;
- }
-
- if (header.dmaster != ctdb_db->ctdb->pnn) {
- DEBUG(DEBUG_DEBUG,(__location__ " not dmaster any more on "
- "transaction lock record\n"));
- tdb_transaction_cancel(ctdb_db->ltdb->tdb);
- talloc_free(tmp_ctx);
- goto again;
- }
-
- if ((data.dsize != sizeof(pid_t)) || (*(pid_t *)(data.dptr) != pid)) {
- DEBUG(DEBUG_DEBUG, (__location__ " my pid is not stored in "
- "the transaction lock record\n"));
- tdb_transaction_cancel(ctdb_db->ltdb->tdb);
- talloc_free(tmp_ctx);
- goto again;
- }
-
- talloc_free(tmp_ctx);
-
+ g_lock_unlock(h, h->g_lock_db, h->lock_name, h->reqid);
+ ctdb_reqid_remove(h->ctdb_db->ctdb, h->reqid);
return 0;
}
-/* start a transaction on a database */
+/**
+ * start a transaction on a database
+ */
struct ctdb_transaction_handle *ctdb_transaction_start(struct ctdb_db_context *ctdb_db,
TALLOC_CTX *mem_ctx)
{
struct ctdb_transaction_handle *h;
- int ret;
+ struct ctdb_server_id id;
h = talloc_zero(mem_ctx, struct ctdb_transaction_handle);
if (h == NULL) {
- DEBUG(DEBUG_ERR,(__location__ " oom for transaction handle\n"));
+ DEBUG(DEBUG_ERR, (__location__ " memory allocation error\n"));
return NULL;
}
h->ctdb_db = ctdb_db;
+ h->lock_name = talloc_asprintf(h, "transaction_db_0x%08x",
+ (unsigned int)ctdb_db->db_id);
+ if (h->lock_name == NULL) {
+ DEBUG(DEBUG_ERR, (__location__ " talloc asprintf failed\n"));
+ talloc_free(h);
+ return NULL;
+ }
- ret = ctdb_transaction_fetch_start(h);
- if (ret != 0) {
+ h->g_lock_db = ctdb_attach(h->ctdb_db->ctdb, timeval_current_ofs(3,0),
+ "g_lock.tdb", false, 0);
+ if (!h->g_lock_db) {
+ DEBUG(DEBUG_ERR, (__location__ " unable to attach to g_lock.tdb\n"));
talloc_free(h);
return NULL;
}
- talloc_set_destructor(h, ctdb_transaction_destructor);
+ id.type = SERVER_TYPE_SAMBA;
+ id.pnn = ctdb_get_pnn(ctdb_db->ctdb);
+ id.server_id = getpid();
- return h;
-}
+ if (ctdb_ctrl_register_server_id(ctdb_db->ctdb, timeval_current_ofs(3,0),
+ &id) != 0) {
+ DEBUG(DEBUG_ERR, (__location__ " unable to register server id\n"));
+ talloc_free(h);
+ return NULL;
+ }
+ h->reqid = ctdb_reqid_new(h->ctdb_db->ctdb, h);
+ if (!g_lock_lock(h, h->g_lock_db, h->lock_name, h->reqid)) {
+ DEBUG(DEBUG_ERR, (__location__ " Error locking g_lock.tdb\n"));
+ talloc_free(h);
+ return NULL;
+ }
-/*
- fetch a record inside a transaction
+ talloc_set_destructor(h, ctdb_transaction_destructor);
+ return h;
+}
+
+/**
+ * fetch a record inside a transaction
*/
-int ctdb_transaction_fetch(struct ctdb_transaction_handle *h,
- TALLOC_CTX *mem_ctx,
+int ctdb_transaction_fetch(struct ctdb_transaction_handle *h,
+ TALLOC_CTX *mem_ctx,
TDB_DATA key, TDB_DATA *data)
{
struct ctdb_ltdb_header header;
*data = tdb_null;
ret = 0;
}
-
+
if (ret != 0) {
return ret;
}
- if (!h->in_replay) {
- h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 1, key, NULL, *data);
- if (h->m_all == NULL) {
- DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
- return -1;
- }
+ h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 1, key, NULL, *data);
+ if (h->m_all == NULL) {
+ DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
+ return -1;
}
return 0;
}
-/*
- stores a record inside a transaction
+/**
+ * stores a record inside a transaction
*/
-int ctdb_transaction_store(struct ctdb_transaction_handle *h,
+int ctdb_transaction_store(struct ctdb_transaction_handle *h,
TDB_DATA key, TDB_DATA data)
{
TALLOC_CTX *tmp_ctx = talloc_new(h);
TDB_DATA olddata;
int ret;
- ZERO_STRUCT(header);
-
/* we need the header so we can update the RSN */
ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, tmp_ctx, &olddata);
if (ret == -1 && header.dmaster == (uint32_t)-1) {
}
if (data.dsize == olddata.dsize &&
- memcmp(data.dptr, olddata.dptr, data.dsize) == 0) {
+ memcmp(data.dptr, olddata.dptr, data.dsize) == 0 &&
+ header.rsn != 0) {
/* save writing the same data */
talloc_free(tmp_ctx);
return 0;
header.dmaster = h->ctdb_db->ctdb->pnn;
header.rsn++;
- if (!h->in_replay) {
- h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 0, key, NULL, data);
- if (h->m_all == NULL) {
- DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
- talloc_free(tmp_ctx);
- return -1;
- }
- }
+ h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 0, key, NULL, data);
+ if (h->m_all == NULL) {
+ DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
+ talloc_free(tmp_ctx);
+ return -1;
+ }
h->m_write = ctdb_marshall_add(h, h->m_write, h->ctdb_db->db_id, 0, key, &header, data);
if (h->m_write == NULL) {
talloc_free(tmp_ctx);
return -1;
}
-
- ret = ctdb_ltdb_store(h->ctdb_db, key, &header, data);
talloc_free(tmp_ctx);
-
- return ret;
+ return 0;
}
-/*
- replay a transaction
- */
-static int ctdb_replay_transaction(struct ctdb_transaction_handle *h)
+static int ctdb_fetch_db_seqnum(struct ctdb_db_context *ctdb_db, uint64_t *seqnum)
{
- int ret, i;
- struct ctdb_rec_data *rec = NULL;
+ const char *keyname = CTDB_DB_SEQNUM_KEY;
+ TDB_DATA key, data;
+ struct ctdb_ltdb_header header;
+ int ret;
- h->in_replay = true;
- talloc_free(h->m_write);
- h->m_write = NULL;
+ key.dptr = (uint8_t *)discard_const(keyname);
+ key.dsize = strlen(keyname) + 1;
- ret = ctdb_transaction_fetch_start(h);
+ ret = ctdb_ltdb_fetch(ctdb_db, key, &header, ctdb_db, &data);
if (ret != 0) {
- return ret;
+ *seqnum = 0;
+ return 0;
}
- for (i=0;i<h->m_all->count;i++) {
- TDB_DATA key, data;
+ if (data.dsize == 0) {
+ *seqnum = 0;
+ return 0;
+ }
- rec = ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
- if (rec == NULL) {
- DEBUG(DEBUG_ERR, (__location__ " Out of records in ctdb_replay_transaction?\n"));
- goto failed;
- }
+ if (data.dsize != sizeof(*seqnum)) {
+ DEBUG(DEBUG_ERR, (__location__ " Invalid data recived len=%zi\n",
+ data.dsize));
+ talloc_free(data.dptr);
+ return -1;
+ }
- if (rec->reqid == 0) {
- /* its a store */
- if (ctdb_transaction_store(h, key, data) != 0) {
- goto failed;
- }
- } else {
- TDB_DATA data2;
- TALLOC_CTX *tmp_ctx = talloc_new(h);
+ *seqnum = *(uint64_t *)data.dptr;
+ talloc_free(data.dptr);
- if (ctdb_transaction_fetch(h, tmp_ctx, key, &data2) != 0) {
- talloc_free(tmp_ctx);
- goto failed;
- }
- if (data2.dsize != data.dsize ||
- memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
- /* the record has changed on us - we have to give up */
- talloc_free(tmp_ctx);
- goto failed;
- }
- talloc_free(tmp_ctx);
- }
- }
-
return 0;
-
-failed:
- tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
- return -1;
}
-/*
- commit a transaction
- */
-int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
+static int ctdb_store_db_seqnum(struct ctdb_transaction_handle *h,
+ uint64_t seqnum)
{
- int ret, retries=0;
- int32_t status;
- struct ctdb_context *ctdb = h->ctdb_db->ctdb;
- struct timeval timeout;
- enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR;
+ const char *keyname = CTDB_DB_SEQNUM_KEY;
+ TDB_DATA key, data;
- talloc_set_destructor(h, NULL);
+ key.dptr = (uint8_t *)discard_const(keyname);
+ key.dsize = strlen(keyname) + 1;
- /* our commit strategy is quite complex.
+ data.dptr = (uint8_t *)&seqnum;
+ data.dsize = sizeof(seqnum);
- - we first try to commit the changes to all other nodes
+ return ctdb_transaction_store(h, key, data);
+}
- - if that works, then we commit locally and we are done
- - if a commit on another node fails, then we need to cancel
- the transaction, then restart the transaction (thus
- opening a window of time for a pending recovery to
- complete), then replay the transaction, checking all the
- reads and writes (checking that reads give the same data,
- and writes succeed). Then we retry the transaction to the
- other nodes
- */
+/**
+ * commit a transaction
+ */
+int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
+{
+ int ret;
+ uint64_t old_seqnum, new_seqnum;
+ int32_t status;
+ struct timeval timeout;
-again:
if (h->m_write == NULL) {
/* no changes were made */
- tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
talloc_free(h);
return 0;
}
- /* tell ctdbd to commit to the other nodes */
- timeout = timeval_current_ofs(1, 0);
- ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id,
- retries==0?CTDB_CONTROL_TRANS2_COMMIT:CTDB_CONTROL_TRANS2_COMMIT_RETRY, 0,
- ctdb_marshall_finish(h->m_write), NULL, NULL, &status,
- &timeout, NULL);
- if (ret != 0 || status != 0) {
- tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
- DEBUG(DEBUG_NOTICE, (__location__ " transaction commit%s failed"
- ", retrying after 1 second...\n",
- (retries==0)?"":"retry "));
- sleep(1);
+ ret = ctdb_fetch_db_seqnum(h->ctdb_db, &old_seqnum);
+ if (ret != 0) {
+ DEBUG(DEBUG_ERR, (__location__ " failed to fetch db sequence number\n"));
+ ret = -1;
+ goto done;
+ }
+ new_seqnum = old_seqnum + 1;
+ ret = ctdb_store_db_seqnum(h, new_seqnum);
+ if (ret != 0) {
+ DEBUG(DEBUG_ERR, (__location__ " failed to store db sequence number\n"));
+ ret = -1;
+ goto done;
+ }
+
+again:
+ timeout = timeval_current_ofs(3,0);
+ ret = ctdb_control(h->ctdb_db->ctdb, CTDB_CURRENT_NODE,
+ h->ctdb_db->db_id,
+ CTDB_CONTROL_TRANS3_COMMIT, 0,
+ ctdb_marshall_finish(h->m_write), NULL, NULL,
+ &status, &timeout, NULL);
+ if (ret != 0 || status != 0) {
+ /*
+ * TRANS3_COMMIT control will only fail if recovery has been
+ * triggered. Check if the database has been updated or not.
+ */
+ ret = ctdb_fetch_db_seqnum(h->ctdb_db, &new_seqnum);
if (ret != 0) {
- failure_control = CTDB_CONTROL_TRANS2_ERROR;
- } else {
- /* work out what error code we will give if we
- have to fail the operation */
- switch ((enum ctdb_trans2_commit_error)status) {
- case CTDB_TRANS2_COMMIT_SUCCESS:
- case CTDB_TRANS2_COMMIT_SOMEFAIL:
- case CTDB_TRANS2_COMMIT_TIMEOUT:
- failure_control = CTDB_CONTROL_TRANS2_ERROR;
- break;
- case CTDB_TRANS2_COMMIT_ALLFAIL:
- failure_control = CTDB_CONTROL_TRANS2_FINISHED;
- break;
- }
+ DEBUG(DEBUG_ERR, (__location__ " failed to fetch db sequence number\n"));
+ goto done;
}
- if (++retries == 100) {
- DEBUG(DEBUG_ERR,(__location__ " Giving up transaction on db 0x%08x after %d retries failure_control=%u\n",
- h->ctdb_db->db_id, retries, (unsigned)failure_control));
- ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id,
- failure_control, CTDB_CTRL_FLAG_NOREPLY,
- tdb_null, NULL, NULL, NULL, NULL, NULL);
- talloc_free(h);
- return -1;
- }
-
- if (ctdb_replay_transaction(h) != 0) {
- DEBUG(DEBUG_ERR, (__location__ " Failed to replay "
- "transaction on db 0x%08x, "
- "failure control =%u\n",
- h->ctdb_db->db_id,
- (unsigned)failure_control));
- ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id,
- failure_control, CTDB_CTRL_FLAG_NOREPLY,
- tdb_null, NULL, NULL, NULL, NULL, NULL);
- talloc_free(h);
- return -1;
+ if (new_seqnum == old_seqnum) {
+ /* Database not yet updated, try again */
+ goto again;
}
- goto again;
- } else {
- failure_control = CTDB_CONTROL_TRANS2_ERROR;
- }
- /* do the real commit locally */
- ret = tdb_transaction_commit(h->ctdb_db->ltdb->tdb);
- if (ret != 0) {
- DEBUG(DEBUG_ERR, (__location__ " Failed to commit transaction "
- "on db id 0x%08x locally, "
- "failure_control=%u\n",
- h->ctdb_db->db_id,
- (unsigned)failure_control));
- ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id,
- failure_control, CTDB_CTRL_FLAG_NOREPLY,
- tdb_null, NULL, NULL, NULL, NULL, NULL);
- talloc_free(h);
- return ret;
+ if (new_seqnum != (old_seqnum + 1)) {
+ DEBUG(DEBUG_ERR, (__location__ " new seqnum [%llu] != old seqnum [%llu] + 1\n",
+ (long long unsigned)new_seqnum,
+ (long long unsigned)old_seqnum));
+ ret = -1;
+ goto done;
+ }
}
- /* tell ctdbd that we are finished with our local commit */
- ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id,
- CTDB_CONTROL_TRANS2_FINISHED, CTDB_CTRL_FLAG_NOREPLY,
- tdb_null, NULL, NULL, NULL, NULL, NULL);
+ ret = 0;
+
+done:
+ talloc_free(h);
+ return ret;
+}
+
+/**
+ * cancel a transaction
+ */
+int ctdb_transaction_cancel(struct ctdb_transaction_handle *h)
+{
talloc_free(h);
return 0;
}
+
/*
recovery daemon ping to main daemon
*/