Readrecordlock changes:
[rusty/ctdb.git] / libctdb / ctdb.c
index c9adfdaa2ce28185d88e9d355fc92b5e10e114f8..52cd38e4a192b6b173554ee3e23db395c2c9c009 100644 (file)
@@ -33,7 +33,7 @@
 
 /* Remove type-safety macros. */
 #undef ctdb_attachdb_send
-#undef ctdb_readrecordlock_send
+#undef ctdb_readrecordlock_async
 
 /* FIXME: Could be in shared util code with rest of ctdb */
 static void close_noerr(int fd)
@@ -75,6 +75,7 @@ static void set_pnn(struct ctdb_connection *ctdb,
                /* FIXME: Report error. */
                ctdb->broken = true;
        }
+       ctdb_request_free(req);
 }
 
 struct ctdb_connection *ctdb_connect(const char *addr)
@@ -345,7 +346,7 @@ struct ctdb_request *new_ctdb_control_request(struct ctdb_connection *ctdb,
        struct ctdb_request *req;
        struct ctdb_req_control *pkt;
 
-       req = new_ctdb_request(sizeof(*pkt) + extra, callback, cbdata);
+       req = new_ctdb_request(offsetof(struct ctdb_req_control, data) + extra, callback, cbdata);
        if (!req)
                return NULL;
 
@@ -353,6 +354,7 @@ struct ctdb_request *new_ctdb_control_request(struct ctdb_connection *ctdb,
                                CTDB_REQ_CONTROL, destnode, new_reqid(ctdb));
 
        pkt = req->hdr.control;
+       pkt->pad = 0;
        pkt->opcode = opcode;
        pkt->srvid = 0;
        pkt->client_id = 0;
@@ -447,7 +449,7 @@ static void attachdb_done(struct ctdb_connection *ctdb,
        if (!reply || reply->status != 0) {
                /* We failed.  Hand request to user and have them discover it
                 * via ctdb_attachdb_recv. */
-               db->callback(ctdb, req, db);
+               db->callback(ctdb, req, db->private_data);
                return;
        }
        db->id = *(uint32_t *)reply->data;
@@ -458,7 +460,7 @@ static void attachdb_done(struct ctdb_connection *ctdb,
                                        &db->id, sizeof(db->id),
                                        attachdb_getdbpath_done, db);
        if (!req2) {
-               db->callback(ctdb, req, db);
+               db->callback(ctdb, req, db->private_data);
                return;
        }
        req->extra = req2;
@@ -525,23 +527,32 @@ struct ctdb_lock {
        /* This will always be true by the time user sees this. */
        bool held;
        struct ctdb_ltdb_header *hdr;
-       TDB_DATA data;
 
        /* For convenience, we stash original callback here. */
-       ctdb_callback_t callback;
+       ctdb_rrl_callback_t callback;
 };
 
 void ctdb_release_lock(struct ctdb_lock *lock)
 {
        if (lock->held) {
                tdb_chainunlock(lock->ctdb_db->tdb, lock->key);
+               lock->held = false;
+       }
+}
+
+static void ctdb_free_lock(struct ctdb_lock *lock)
+{
+       if (lock->held) {
+               /* FIXME: report error. Callback never released the lock */
+               ctdb_release_lock(lock);
        }
-       free(lock->hdr); /* Also frees data */
+
+       free(lock->hdr);
        free(lock);
 }
 
 /* We keep the lock if local node is the dmaster. */
-static bool try_readrecordlock(struct ctdb_lock *lock)
+static bool try_readrecordlock(struct ctdb_lock *lock, TDB_DATA *data)
 {
        struct ctdb_ltdb_header *hdr;
 
@@ -549,7 +560,7 @@ static bool try_readrecordlock(struct ctdb_lock *lock)
                return NULL;
        }
 
-       hdr = ctdb_local_fetch(lock->ctdb_db->tdb, lock->key, &lock->data);
+       hdr = ctdb_local_fetch(lock->ctdb_db->tdb, lock->key, data);
        if (hdr && hdr->dmaster == lock->ctdb_db->ctdb->pnn) {
                lock->held = true;
                lock->hdr = hdr;
@@ -561,28 +572,11 @@ static bool try_readrecordlock(struct ctdb_lock *lock)
        return NULL;
 }
 
-/* If they cancel *before* we hand them the lock from
- * ctdb_readrecordlock_recv, we free it here. */
+/* If they shutdown before we hand them the lock, we free it here. */
 static void destroy_lock(struct ctdb_request *req)
 {
        ctdb_release_lock(req->extra);
-}
-
-struct ctdb_lock *ctdb_readrecordlock_recv(struct ctdb_db *ctdb_db,
-                                          struct ctdb_request *req,
-                                          TDB_DATA *data)
-{
-       struct ctdb_lock *lock = req->extra;
-
-       if (!lock->held) {
-               /* Something went wrong. */
-               return NULL;
-       }
-
-       /* Now it's their responsibility to free! */
-       req->extra_destructor = NULL;
-       *data = lock->data;
-       return lock;
+       ctdb_free_lock(req->extra);
 }
 
 static void readrecordlock_retry(struct ctdb_connection *ctdb,
@@ -590,38 +584,43 @@ static void readrecordlock_retry(struct ctdb_connection *ctdb,
 {
        struct ctdb_lock *lock = req->extra;
        struct ctdb_reply_call *reply;
+       TDB_DATA data;
 
        /* OK, we've received reply to noop migration */
        reply = unpack_reply_call(req, CTDB_NULL_FUNC);
        if (!reply || reply->status != 0) {
-               lock->callback(ctdb, req, private);
+               lock->callback(lock->ctdb_db, NULL, tdb_null, private);
+               ctdb_request_free(req); /* Also frees lock. */
+               ctdb_free_lock(lock);
                return;
        }
 
        /* Can we get lock now? */
-       if (try_readrecordlock(lock)) {
-               lock->callback(ctdb, req, private);
+       if (try_readrecordlock(lock, &data)) {
+               /* Now it's their responsibility to free lock & request! */
+               req->extra_destructor = NULL;
+               lock->callback(lock->ctdb_db, lock, data, private);
+               ctdb_free_lock(lock);
                return;
        }
 
        /* Retransmit the same request again (we lost race). */
        io_elem_reset(req->io);
        DLIST_ADD(ctdb->outq, req);
-       return;
 }
 
 bool
-ctdb_readrecordlock_send(struct ctdb_db *ctdb_db, TDB_DATA key,
-                        struct ctdb_request **reqp,
-                        ctdb_callback_t callback, void *cbdata)
+ctdb_readrecordlock_async(struct ctdb_db *ctdb_db, TDB_DATA key,
+                         ctdb_rrl_callback_t callback, void *cbdata)
 {
        struct ctdb_request *req;
        struct ctdb_lock *lock;
+       TDB_DATA data;
 
-       /* Setup lock. */
+       /* Setup lock */
        lock = malloc(sizeof(*lock) + key.dsize);
        if (!lock) {
-               return NULL;
+               return false;
        }
        lock->key.dptr = (void *)(lock + 1);
        memcpy(lock->key.dptr, key.dptr, key.dsize);
@@ -630,25 +629,25 @@ ctdb_readrecordlock_send(struct ctdb_db *ctdb_db, TDB_DATA key,
        lock->hdr = NULL;
        lock->held = false;
 
-       /* Get ready in case we need to send a migrate request. */
-       req = new_ctdb_request(sizeof(*req->hdr.call)
-                              + key.dsize, callback, cbdata);
+       /* Fast path. */
+       if (try_readrecordlock(lock, &data)) {
+               callback(ctdb_db, lock, data, cbdata);
+               ctdb_free_lock(lock);
+               return true;
+       }
+
+       /* Slow path: create request. */
+       req = new_ctdb_request(offsetof(struct ctdb_req_call, data)
+                              + key.dsize, readrecordlock_retry, cbdata);
        if (!req) {
                ctdb_release_lock(lock);
+               ctdb_free_lock(lock);
                return NULL;
        }
        req->extra = lock;
        req->extra_destructor = destroy_lock;
-
-       if (try_readrecordlock(lock)) {
-               *reqp = NULL;
-               callback(ctdb_db->ctdb, req, cbdata);
-               return true;
-       }
-
        /* We store the original callback in the lock, and use our own. */
        lock->callback = callback;
-       req->callback = readrecordlock_retry;
 
        io_elem_init_req_header(req->io, CTDB_REQ_CALL, CTDB_CURRENT_NODE,
                                new_reqid(ctdb_db->ctdb));
@@ -661,7 +660,6 @@ ctdb_readrecordlock_send(struct ctdb_db *ctdb_db, TDB_DATA key,
        req->hdr.call->calldatalen = 0;
        memcpy(req->hdr.call->data, key.dptr, key.dsize);
        DLIST_ADD(ctdb_db->ctdb->outq, req);
-       *reqp = req;
        return true;
 }
 
@@ -672,6 +670,11 @@ int ctdb_writerecord(struct ctdb_lock *lock, TDB_DATA data)
                return -1;
        }
 
+       if (!lock->held) {
+               /* FIXME: Report error. */
+               return -1;
+       }
+
        return ctdb_local_store(lock->ctdb_db->tdb, lock->key, lock->hdr,
                                data);
 }