/* Remove type-safety macros. */
#undef ctdb_attachdb_send
-#undef ctdb_readrecordlock_send
+#undef ctdb_readrecordlock_async
/* FIXME: Could be in shared util code with rest of ctdb */
static void close_noerr(int fd)
/* FIXME: Report error. */
ctdb->broken = true;
}
+ ctdb_request_free(req);
}
struct ctdb_connection *ctdb_connect(const char *addr)
goto fail;
ctdb->outq = NULL;
ctdb->doneq = NULL;
- ctdb->immediateq = NULL;
ctdb->in = NULL;
ctdb->message_handlers = NULL;
+ ctdb->next_id = 0;
+ ctdb->broken = false;
memset(&sun, 0, sizeof(sun));
sun.sun_family = AF_UNIX;
if (io_elem_finished(ctdb->outq->io)) {
struct ctdb_request *done = ctdb->outq;
DLIST_REMOVE(ctdb->outq, done);
- DLIST_ADD_END(ctdb->doneq, done,
- struct ctdb_request);
+ /* We add at the head: any dead ones
+ * sit and end. */
+ DLIST_ADD(ctdb->doneq, done);
}
}
}
}
}
- while (ctdb->immediateq) {
- struct ctdb_request *imm = ctdb->immediateq;
- imm->callback(ctdb, imm, imm->priv_data);
- DLIST_REMOVE(ctdb->immediateq, imm);
- }
-
return 0;
}
struct ctdb_request *req;
struct ctdb_req_control *pkt;
- req = new_ctdb_request(sizeof(*pkt) + extra, callback, cbdata);
+ req = new_ctdb_request(offsetof(struct ctdb_req_control, data) + extra, callback, cbdata);
if (!req)
return NULL;
CTDB_REQ_CONTROL, destnode, new_reqid(ctdb));
pkt = req->hdr.control;
+ pkt->pad = 0;
pkt->opcode = opcode;
pkt->srvid = 0;
pkt->client_id = 0;
pkt->flags = 0;
pkt->datalen = extra;
memcpy(pkt->data, extra_data, extra);
- DLIST_ADD_END(ctdb->outq, req, struct ctdb_request);
+ DLIST_ADD(ctdb->outq, req);
return req;
}
if (!reply || reply->status != 0) {
/* We failed. Hand request to user and have them discover it
* via ctdb_attachdb_recv. */
- db->callback(ctdb, req, db);
+ db->callback(ctdb, req, db->private_data);
return;
}
db->id = *(uint32_t *)reply->data;
&db->id, sizeof(db->id),
attachdb_getdbpath_done, db);
if (!req2) {
- db->callback(ctdb, req, db);
+ db->callback(ctdb, req, db->private_data);
return;
}
req->extra = req2;
/* This will always be true by the time user sees this. */
bool held;
struct ctdb_ltdb_header *hdr;
- TDB_DATA data;
/* For convenience, we stash original callback here. */
- ctdb_callback_t callback;
+ ctdb_rrl_callback_t callback;
};
void ctdb_release_lock(struct ctdb_lock *lock)
{
if (lock->held) {
tdb_chainunlock(lock->ctdb_db->tdb, lock->key);
+ lock->held = false;
}
- free(lock->hdr); /* Also frees data */
+}
+
+static void ctdb_free_lock(struct ctdb_lock *lock)
+{
+ if (lock->held) {
+ /* FIXME: report error. Callback never released the lock */
+ ctdb_release_lock(lock);
+ }
+
+ free(lock->hdr);
free(lock);
}
/* We keep the lock if local node is the dmaster. */
-static bool try_readrecordlock(struct ctdb_lock *lock)
+static bool try_readrecordlock(struct ctdb_lock *lock, TDB_DATA *data)
{
struct ctdb_ltdb_header *hdr;
return NULL;
}
- hdr = ctdb_local_fetch(lock->ctdb_db->tdb, lock->key, &lock->data);
+ hdr = ctdb_local_fetch(lock->ctdb_db->tdb, lock->key, data);
if (hdr && hdr->dmaster == lock->ctdb_db->ctdb->pnn) {
lock->held = true;
lock->hdr = hdr;
return NULL;
}
-/* If they cancel *before* we hand them the lock from
- * ctdb_readrecordlock_recv, we free it here. */
+/* If they shutdown before we hand them the lock, we free it here. */
static void destroy_lock(struct ctdb_request *req)
{
ctdb_release_lock(req->extra);
-}
-
-struct ctdb_lock *ctdb_readrecordlock_recv(struct ctdb_db *ctdb_db,
- struct ctdb_request *req,
- TDB_DATA *data)
-{
- struct ctdb_lock *lock = req->extra;
-
- if (!lock->held) {
- /* Something went wrong. */
- return NULL;
- }
-
- /* Now it's their responsibility to free! */
- req->extra_destructor = NULL;
- *data = lock->data;
- return lock;
+ ctdb_free_lock(req->extra);
}
static void readrecordlock_retry(struct ctdb_connection *ctdb,
{
struct ctdb_lock *lock = req->extra;
struct ctdb_reply_call *reply;
+ TDB_DATA data;
/* OK, we've received reply to noop migration */
reply = unpack_reply_call(req, CTDB_NULL_FUNC);
if (!reply || reply->status != 0) {
- lock->callback(ctdb, req, private);
+ lock->callback(lock->ctdb_db, NULL, tdb_null, private);
+ ctdb_request_free(req); /* Also frees lock. */
+ ctdb_free_lock(lock);
return;
}
/* Can we get lock now? */
- if (try_readrecordlock(lock)) {
- lock->callback(ctdb, req, private);
+ if (try_readrecordlock(lock, &data)) {
+ /* Now it's their responsibility to free lock & request! */
+ req->extra_destructor = NULL;
+ lock->callback(lock->ctdb_db, lock, data, private);
+ ctdb_free_lock(lock);
return;
}
/* Retransmit the same request again (we lost race). */
io_elem_reset(req->io);
- DLIST_ADD_END(ctdb->outq, req, struct ctdb_request);
- return;
+ DLIST_ADD(ctdb->outq, req);
}
-struct ctdb_request *
-ctdb_readrecordlock_send(struct ctdb_db *ctdb_db, TDB_DATA key,
- ctdb_callback_t callback, void *cbdata)
+bool
+ctdb_readrecordlock_async(struct ctdb_db *ctdb_db, TDB_DATA key,
+ ctdb_rrl_callback_t callback, void *cbdata)
{
struct ctdb_request *req;
struct ctdb_lock *lock;
+ TDB_DATA data;
- /* Setup lock. */
+ /* Setup lock */
lock = malloc(sizeof(*lock) + key.dsize);
if (!lock) {
- return NULL;
+ return false;
}
lock->key.dptr = (void *)(lock + 1);
memcpy(lock->key.dptr, key.dptr, key.dsize);
lock->hdr = NULL;
lock->held = false;
- /* Get ready in case we need to send a migrate request. */
- req = new_ctdb_request(sizeof(*req->hdr.call)
- + key.dsize, callback, cbdata);
+ /* Fast path. */
+ if (try_readrecordlock(lock, &data)) {
+ callback(ctdb_db, lock, data, cbdata);
+ ctdb_free_lock(lock);
+ return true;
+ }
+
+ /* Slow path: create request. */
+ req = new_ctdb_request(offsetof(struct ctdb_req_call, data)
+ + key.dsize, readrecordlock_retry, cbdata);
if (!req) {
ctdb_release_lock(lock);
+ ctdb_free_lock(lock);
return NULL;
}
req->extra = lock;
req->extra_destructor = destroy_lock;
-
- if (try_readrecordlock(lock)) {
- /* Already got it: prepare for immediate callback. */
- DLIST_ADD_END(ctdb_db->ctdb->immediateq,
- req, struct ctdb_request);
- return req;
- }
-
/* We store the original callback in the lock, and use our own. */
lock->callback = callback;
- req->callback = readrecordlock_retry;
io_elem_init_req_header(req->io, CTDB_REQ_CALL, CTDB_CURRENT_NODE,
new_reqid(ctdb_db->ctdb));
req->hdr.call->keylen = key.dsize;
req->hdr.call->calldatalen = 0;
memcpy(req->hdr.call->data, key.dptr, key.dsize);
- DLIST_ADD_END(ctdb_db->ctdb->outq, req, struct ctdb_request);
- return req;
+ DLIST_ADD(ctdb_db->ctdb->outq, req);
+ return true;
}
int ctdb_writerecord(struct ctdb_lock *lock, TDB_DATA data)
return -1;
}
+ if (!lock->held) {
+ /* FIXME: Report error. */
+ return -1;
+ }
+
return ctdb_local_store(lock->ctdb_db->tdb, lock->key, lock->hdr,
data);
}