Readrecordlock changes:
[rusty/ctdb.git] / libctdb / ctdb.c
index 5461d3c132d21323ac38ed29377987f66fb950b2..52cd38e4a192b6b173554ee3e23db395c2c9c009 100644 (file)
@@ -33,7 +33,7 @@
 
 /* Remove type-safety macros. */
 #undef ctdb_attachdb_send
-#undef ctdb_readrecordlock_send
+#undef ctdb_readrecordlock_async
 
 /* FIXME: Could be in shared util code with rest of ctdb */
 static void close_noerr(int fd)
@@ -75,6 +75,7 @@ static void set_pnn(struct ctdb_connection *ctdb,
                /* FIXME: Report error. */
                ctdb->broken = true;
        }
+       ctdb_request_free(req);
 }
 
 struct ctdb_connection *ctdb_connect(const char *addr)
@@ -87,9 +88,10 @@ struct ctdb_connection *ctdb_connect(const char *addr)
                goto fail;
        ctdb->outq = NULL;
        ctdb->doneq = NULL;
-       ctdb->immediateq = NULL;
        ctdb->in = NULL;
        ctdb->message_handlers = NULL;
+       ctdb->next_id = 0;
+       ctdb->broken = false;
 
        memset(&sun, 0, sizeof(sun));
        sun.sun_family = AF_UNIX;
@@ -270,8 +272,9 @@ int ctdb_service(struct ctdb_connection *ctdb, int revents)
                        if (io_elem_finished(ctdb->outq->io)) {
                                struct ctdb_request *done = ctdb->outq;
                                DLIST_REMOVE(ctdb->outq, done);
-                               DLIST_ADD_END(ctdb->doneq, done,
-                                             struct ctdb_request);
+                               /* We add at the head: any dead ones
+                                * sit and end. */
+                               DLIST_ADD(ctdb->doneq, done);
                        }
                }
        }
@@ -303,12 +306,6 @@ int ctdb_service(struct ctdb_connection *ctdb, int revents)
                }
        }
 
-       while (ctdb->immediateq) {
-               struct ctdb_request *imm = ctdb->immediateq;
-               imm->callback(ctdb, imm, imm->priv_data);
-               DLIST_REMOVE(ctdb->immediateq, imm);
-       }
-
        return 0;
 }
 
@@ -349,7 +346,7 @@ struct ctdb_request *new_ctdb_control_request(struct ctdb_connection *ctdb,
        struct ctdb_request *req;
        struct ctdb_req_control *pkt;
 
-       req = new_ctdb_request(sizeof(*pkt) + extra, callback, cbdata);
+       req = new_ctdb_request(offsetof(struct ctdb_req_control, data) + extra, callback, cbdata);
        if (!req)
                return NULL;
 
@@ -357,13 +354,14 @@ struct ctdb_request *new_ctdb_control_request(struct ctdb_connection *ctdb,
                                CTDB_REQ_CONTROL, destnode, new_reqid(ctdb));
 
        pkt = req->hdr.control;
+       pkt->pad = 0;
        pkt->opcode = opcode;
        pkt->srvid = 0;
        pkt->client_id = 0;
        pkt->flags = 0;
        pkt->datalen = extra;
        memcpy(pkt->data, extra_data, extra);
-       DLIST_ADD_END(ctdb->outq, req, struct ctdb_request);
+       DLIST_ADD(ctdb->outq, req);
        return req;
 }
 
@@ -451,7 +449,7 @@ static void attachdb_done(struct ctdb_connection *ctdb,
        if (!reply || reply->status != 0) {
                /* We failed.  Hand request to user and have them discover it
                 * via ctdb_attachdb_recv. */
-               db->callback(ctdb, req, db);
+               db->callback(ctdb, req, db->private_data);
                return;
        }
        db->id = *(uint32_t *)reply->data;
@@ -462,7 +460,7 @@ static void attachdb_done(struct ctdb_connection *ctdb,
                                        &db->id, sizeof(db->id),
                                        attachdb_getdbpath_done, db);
        if (!req2) {
-               db->callback(ctdb, req, db);
+               db->callback(ctdb, req, db->private_data);
                return;
        }
        req->extra = req2;
@@ -529,23 +527,32 @@ struct ctdb_lock {
        /* This will always be true by the time user sees this. */
        bool held;
        struct ctdb_ltdb_header *hdr;
-       TDB_DATA data;
 
        /* For convenience, we stash original callback here. */
-       ctdb_callback_t callback;
+       ctdb_rrl_callback_t callback;
 };
 
 void ctdb_release_lock(struct ctdb_lock *lock)
 {
        if (lock->held) {
                tdb_chainunlock(lock->ctdb_db->tdb, lock->key);
+               lock->held = false;
        }
-       free(lock->hdr); /* Also frees data */
+}
+
+static void ctdb_free_lock(struct ctdb_lock *lock)
+{
+       if (lock->held) {
+               /* FIXME: report error. Callback never released the lock */
+               ctdb_release_lock(lock);
+       }
+
+       free(lock->hdr);
        free(lock);
 }
 
 /* We keep the lock if local node is the dmaster. */
-static bool try_readrecordlock(struct ctdb_lock *lock)
+static bool try_readrecordlock(struct ctdb_lock *lock, TDB_DATA *data)
 {
        struct ctdb_ltdb_header *hdr;
 
@@ -553,7 +560,7 @@ static bool try_readrecordlock(struct ctdb_lock *lock)
                return NULL;
        }
 
-       hdr = ctdb_local_fetch(lock->ctdb_db->tdb, lock->key, &lock->data);
+       hdr = ctdb_local_fetch(lock->ctdb_db->tdb, lock->key, data);
        if (hdr && hdr->dmaster == lock->ctdb_db->ctdb->pnn) {
                lock->held = true;
                lock->hdr = hdr;
@@ -565,28 +572,11 @@ static bool try_readrecordlock(struct ctdb_lock *lock)
        return NULL;
 }
 
-/* If they cancel *before* we hand them the lock from
- * ctdb_readrecordlock_recv, we free it here. */
+/* If they shutdown before we hand them the lock, we free it here. */
 static void destroy_lock(struct ctdb_request *req)
 {
        ctdb_release_lock(req->extra);
-}
-
-struct ctdb_lock *ctdb_readrecordlock_recv(struct ctdb_db *ctdb_db,
-                                          struct ctdb_request *req,
-                                          TDB_DATA *data)
-{
-       struct ctdb_lock *lock = req->extra;
-
-       if (!lock->held) {
-               /* Something went wrong. */
-               return NULL;
-       }
-
-       /* Now it's their responsibility to free! */
-       req->extra_destructor = NULL;
-       *data = lock->data;
-       return lock;
+       ctdb_free_lock(req->extra);
 }
 
 static void readrecordlock_retry(struct ctdb_connection *ctdb,
@@ -594,37 +584,43 @@ static void readrecordlock_retry(struct ctdb_connection *ctdb,
 {
        struct ctdb_lock *lock = req->extra;
        struct ctdb_reply_call *reply;
+       TDB_DATA data;
 
        /* OK, we've received reply to noop migration */
        reply = unpack_reply_call(req, CTDB_NULL_FUNC);
        if (!reply || reply->status != 0) {
-               lock->callback(ctdb, req, private);
+               lock->callback(lock->ctdb_db, NULL, tdb_null, private);
+               ctdb_request_free(req); /* Also frees lock. */
+               ctdb_free_lock(lock);
                return;
        }
 
        /* Can we get lock now? */
-       if (try_readrecordlock(lock)) {
-               lock->callback(ctdb, req, private);
+       if (try_readrecordlock(lock, &data)) {
+               /* Now it's their responsibility to free lock & request! */
+               req->extra_destructor = NULL;
+               lock->callback(lock->ctdb_db, lock, data, private);
+               ctdb_free_lock(lock);
                return;
        }
 
        /* Retransmit the same request again (we lost race). */
        io_elem_reset(req->io);
-       DLIST_ADD_END(ctdb->outq, req, struct ctdb_request);
-       return;
+       DLIST_ADD(ctdb->outq, req);
 }
 
-struct ctdb_request *
-ctdb_readrecordlock_send(struct ctdb_db *ctdb_db, TDB_DATA key,
-                        ctdb_callback_t callback, void *cbdata)
+bool
+ctdb_readrecordlock_async(struct ctdb_db *ctdb_db, TDB_DATA key,
+                         ctdb_rrl_callback_t callback, void *cbdata)
 {
        struct ctdb_request *req;
        struct ctdb_lock *lock;
+       TDB_DATA data;
 
-       /* Setup lock. */
+       /* Setup lock */
        lock = malloc(sizeof(*lock) + key.dsize);
        if (!lock) {
-               return NULL;
+               return false;
        }
        lock->key.dptr = (void *)(lock + 1);
        memcpy(lock->key.dptr, key.dptr, key.dsize);
@@ -633,26 +629,25 @@ ctdb_readrecordlock_send(struct ctdb_db *ctdb_db, TDB_DATA key,
        lock->hdr = NULL;
        lock->held = false;
 
-       /* Get ready in case we need to send a migrate request. */
-       req = new_ctdb_request(sizeof(*req->hdr.call)
-                              + key.dsize, callback, cbdata);
+       /* Fast path. */
+       if (try_readrecordlock(lock, &data)) {
+               callback(ctdb_db, lock, data, cbdata);
+               ctdb_free_lock(lock);
+               return true;
+       }
+
+       /* Slow path: create request. */
+       req = new_ctdb_request(offsetof(struct ctdb_req_call, data)
+                              + key.dsize, readrecordlock_retry, cbdata);
        if (!req) {
                ctdb_release_lock(lock);
+               ctdb_free_lock(lock);
                return NULL;
        }
        req->extra = lock;
        req->extra_destructor = destroy_lock;
-
-       if (try_readrecordlock(lock)) {
-               /* Already got it: prepare for immediate callback. */
-               DLIST_ADD_END(ctdb_db->ctdb->immediateq,
-                             req, struct ctdb_request);
-               return req;
-       }
-
        /* We store the original callback in the lock, and use our own. */
        lock->callback = callback;
-       req->callback = readrecordlock_retry;
 
        io_elem_init_req_header(req->io, CTDB_REQ_CALL, CTDB_CURRENT_NODE,
                                new_reqid(ctdb_db->ctdb));
@@ -664,8 +659,8 @@ ctdb_readrecordlock_send(struct ctdb_db *ctdb_db, TDB_DATA key,
        req->hdr.call->keylen = key.dsize;
        req->hdr.call->calldatalen = 0;
        memcpy(req->hdr.call->data, key.dptr, key.dsize);
-       DLIST_ADD_END(ctdb_db->ctdb->outq, req, struct ctdb_request);
-       return req;
+       DLIST_ADD(ctdb_db->ctdb->outq, req);
+       return true;
 }
 
 int ctdb_writerecord(struct ctdb_lock *lock, TDB_DATA data)
@@ -675,6 +670,11 @@ int ctdb_writerecord(struct ctdb_lock *lock, TDB_DATA data)
                return -1;
        }
 
+       if (!lock->held) {
+               /* FIXME: Report error. */
+               return -1;
+       }
+
        return ctdb_local_store(lock->ctdb_db->tdb, lock->key, lock->hdr,
                                data);
 }