Readrecordlock changes:
[rusty/ctdb.git] / libctdb / ctdb.c
index 85875ac40d5c7c27f63ba2ef869f2519dcb93031..52cd38e4a192b6b173554ee3e23db395c2c9c009 100644 (file)
 #include <dlinklist.h>
 #include <ctdb_protocol.h>
 
+/* Remove type-safety macros. */
+#undef ctdb_attachdb_send
+#undef ctdb_readrecordlock_async
+
 /* FIXME: Could be in shared util code with rest of ctdb */
 static void close_noerr(int fd)
 {
@@ -63,14 +67,15 @@ static void set_close_on_exec(int fd)
         fcntl(fd, F_SETFD, v | FD_CLOEXEC);
 }
 
-static void set_pnn(int32_t status, uint32_t pnn, void *private_data)
+static void set_pnn(struct ctdb_connection *ctdb,
+                   struct ctdb_request *req,
+                   void *unused)
 {
-       if (status != 0) {
+       if (ctdb_getpnn_recv(req, &ctdb->pnn) != 0) {
                /* FIXME: Report error. */
-               ((struct ctdb_connection *)private_data)->broken = true;
-       } else {
-               ((struct ctdb_connection *)private_data)->pnn = pnn;
+               ctdb->broken = true;
        }
+       ctdb_request_free(req);
 }
 
 struct ctdb_connection *ctdb_connect(const char *addr)
@@ -83,9 +88,10 @@ struct ctdb_connection *ctdb_connect(const char *addr)
                goto fail;
        ctdb->outq = NULL;
        ctdb->doneq = NULL;
-       ctdb->immediateq = NULL;
        ctdb->in = NULL;
        ctdb->message_handlers = NULL;
+       ctdb->next_id = 0;
+       ctdb->broken = false;
 
        memset(&sun, 0, sizeof(sun));
        sun.sun_family = AF_UNIX;
@@ -103,7 +109,7 @@ struct ctdb_connection *ctdb_connect(const char *addr)
                goto close_fail;
 
        /* Immediately queue a request to get our pnn. */
-       if (!ctdb_getpnn_send(ctdb, CTDB_CURRENT_NODE, set_pnn, ctdb))
+       if (!ctdb_getpnn_send(ctdb, CTDB_CURRENT_NODE, set_pnn, NULL))
                goto close_fail;
 
        return ctdb;
@@ -130,7 +136,8 @@ int ctdb_which_events(struct ctdb_connection *ctdb)
        return events;
 }
 
-struct ctdb_request *new_ctdb_request(size_t len)
+struct ctdb_request *new_ctdb_request(size_t len,
+                                     ctdb_callback_t cb, void *cbdata)
 {
        struct ctdb_request *req = malloc(sizeof(*req));
        if (!req)
@@ -141,105 +148,104 @@ struct ctdb_request *new_ctdb_request(size_t len)
                return NULL;
        }
        req->hdr.hdr = io_elem_data(req->io, NULL);
-       req->cancelled = false;
+       req->reply = NULL;
+       req->callback = cb;
+       req->priv_data = cbdata;
+       req->extra = NULL;
+       req->extra_destructor = NULL;
        return req;
 }
 
-static struct ctdb_request *new_immediate_request(void)
+void ctdb_request_free(struct ctdb_request *req)
 {
-       struct ctdb_request *req = malloc(sizeof(*req));
-       if (!req)
-               return NULL;
-       req->cancelled = false;
-       req->io = NULL;
-       req->hdr.hdr = NULL;
-       return req;
-}
-
-static void free_ctdb_request(struct ctdb_request *req)
-{
-       /* immediate requests don't have IO */
-       if (req->io) {
-               free_io_elem(req->io);
+       if (req->extra_destructor) {
+               req->extra_destructor(req);
+       }
+       if (req->reply) {
+               free_io_elem(req->reply);
        }
+       free_io_elem(req->io);
        free(req);
 }
 
-static void handle_call_reply(struct ctdb_connection *ctdb,
-                             struct ctdb_req_header *hdr,
-                             struct ctdb_request *i)
+/* Sanity-checking wrapper for reply.
+ * FIXME: logging! */
+static struct ctdb_reply_call *unpack_reply_call(struct ctdb_request *req,
+                                                uint32_t callid)
 {
-       struct ctdb_req_call *call = i->hdr.call;
-       struct ctdb_reply_call *reply = (struct ctdb_reply_call *)hdr;
+       size_t len;
+       struct ctdb_reply_call *inhdr = io_elem_data(req->reply, &len);
 
-       switch (call->callid) {
-       case CTDB_NULL_FUNC:
-               /* FIXME: We should let it steal the request, rathe than copy */
-               i->callback.nullfunc(reply->status, reply, i->priv_data);
-               break;
+       /* ctdbd or our error if this isn't a reply call. */
+       if (len < sizeof(*inhdr) || inhdr->hdr.operation != CTDB_REPLY_CALL) {
+               errno = EIO;
+               return NULL;
        }
+
+       /* Library user error if this isn't a reply to a call. */
+       if (req->hdr.hdr->operation != CTDB_REQ_CALL
+           || req->hdr.call->callid != callid) {
+               errno = EINVAL;
+               return NULL;
+       }
+
+       return inhdr;
 }
 
-static void handle_control_reply(struct ctdb_connection *ctdb,
-                                struct ctdb_req_header *hdr,
-                                struct ctdb_request *i)
+/* Sanity-checking wrapper for reply.
+ * FIXME: logging! */
+struct ctdb_reply_control *unpack_reply_control(struct ctdb_request *req,
+                                               enum ctdb_controls control)
 {
-       struct ctdb_req_control *control = i->hdr.control;
-       struct ctdb_reply_control *reply = (struct ctdb_reply_control *)hdr;
+       size_t len;
+       struct ctdb_reply_control *inhdr = io_elem_data(req->reply, &len);
 
-       switch (control->opcode) {
-       case CTDB_CONTROL_GET_RECMASTER:
-               i->callback.getrecmaster(0, reply->status, i->priv_data);
-               break;
-       case CTDB_CONTROL_GET_PNN:
-               i->callback.getpnn(0, reply->status, i->priv_data);
-               break;
-       case CTDB_CONTROL_REGISTER_SRVID:
-               i->callback.register_srvid(reply->status, i->priv_data);
-               break;
-       case CTDB_CONTROL_DB_ATTACH_PERSISTENT:
-       case CTDB_CONTROL_DB_ATTACH:
-               i->callback.attachdb(reply->status, *(uint32_t *)reply->data,
-                                    i->priv_data);
-               break;
-       case CTDB_CONTROL_GETDBPATH:
-               i->callback.getdbpath(reply->status, (char *)reply->data,
-                                     i->priv_data);
-               break;
+       /* Library user error if this isn't a reply to a call. */
+       if (len < sizeof(*inhdr)
+           || req->hdr.hdr->operation != CTDB_REQ_CONTROL) {
+               errno = EINVAL;
+               return NULL;
+       }
+
+       /* ... or if it was a different control from what we expected. */
+       if (req->hdr.control->opcode != control) {
+               errno = EINVAL;
+               return NULL;
+       }
+
+       /* ctdbd or our error if this isn't a reply call. */
+       if (inhdr->hdr.operation != CTDB_REPLY_CONTROL) {
+               errno = EIO;
+               return NULL;
        }
+
+       return inhdr;
 }
 
-static void handle_incoming(struct ctdb_connection *ctdb,
-                           struct ctdb_req_header *hdr,
-                           size_t len /* FIXME: use len to check packet! */)
+static void handle_incoming(struct ctdb_connection *ctdb, struct io_elem *in)
 {
+       struct ctdb_req_header *hdr;
+       size_t len;
        struct ctdb_request *i;
 
+       hdr = io_elem_data(in, &len);
+       /* FIXME: use len to check packet! */
+
        if (hdr->operation == CTDB_REQ_MESSAGE) {
                deliver_message(ctdb, hdr);
                return;
        }
 
-       if (hdr->operation != CTDB_REPLY_CALL
-           && hdr->operation != CTDB_REPLY_CONTROL) {
-               /* FIXME: report this error. */
-               return;
-       }
-
        for (i = ctdb->doneq; i; i = i->next) {
                if (i->hdr.hdr->reqid == hdr->reqid) {
-                       if (!i->cancelled) {
-                               if (hdr->operation == CTDB_REPLY_CALL)
-                                       handle_call_reply(ctdb, hdr, i);
-                               else
-                                       handle_control_reply(ctdb, hdr, i);
-                       }
                        DLIST_REMOVE(ctdb->doneq, i);
-                       free_ctdb_request(i);
+                       i->reply = in;
+                       i->callback(ctdb, i, i->priv_data);
                        return;
                }
        }
        /* FIXME: report this error. */
+       free_io_elem(in);
 }
 
 /* Remove "harmless" errors. */
@@ -266,12 +272,9 @@ int ctdb_service(struct ctdb_connection *ctdb, int revents)
                        if (io_elem_finished(ctdb->outq->io)) {
                                struct ctdb_request *done = ctdb->outq;
                                DLIST_REMOVE(ctdb->outq, done);
-                               if (done->cancelled) {
-                                       free_ctdb_request(done);
-                               } else {
-                                       DLIST_ADD_END(ctdb->doneq, done,
-                                                     struct ctdb_request);
-                               }
+                               /* We add at the head: any dead ones
+                                * sit and end. */
+                               DLIST_ADD(ctdb->doneq, done);
                        }
                }
        }
@@ -298,24 +301,11 @@ int ctdb_service(struct ctdb_connection *ctdb, int revents)
                        /* No progress, stop loop. */
                        revents = 0;
                } else if (io_elem_finished(ctdb->in)) {
-                       struct ctdb_req_header *hdr;
-                       size_t len;
-
-                       hdr = io_elem_data(ctdb->in, &len);
-                       handle_incoming(ctdb, hdr, len);
-                       free_io_elem(ctdb->in);
+                       handle_incoming(ctdb, ctdb->in);
                        ctdb->in = NULL;
                }
        }
 
-       while (ctdb->immediateq) {
-               struct ctdb_request *imm = ctdb->immediateq;
-               /* This has to handle fake->cancelled internally. */
-               imm->callback.immediate(imm, imm->priv_data);
-               DLIST_REMOVE(ctdb->immediateq, imm);
-               free_ctdb_request(imm);
-       }
-
        return 0;
 }
 
@@ -349,12 +339,14 @@ struct ctdb_request *new_ctdb_control_request(struct ctdb_connection *ctdb,
                                              uint32_t opcode,
                                              uint32_t destnode,
                                              const void *extra_data,
-                                             size_t extra)
+                                             size_t extra,
+                                             ctdb_callback_t callback,
+                                             void *cbdata)
 {
        struct ctdb_request *req;
        struct ctdb_req_control *pkt;
 
-       req = new_ctdb_request(sizeof(*pkt) + extra);
+       req = new_ctdb_request(offsetof(struct ctdb_req_control, data) + extra, callback, cbdata);
        if (!req)
                return NULL;
 
@@ -362,20 +354,28 @@ struct ctdb_request *new_ctdb_control_request(struct ctdb_connection *ctdb,
                                CTDB_REQ_CONTROL, destnode, new_reqid(ctdb));
 
        pkt = req->hdr.control;
+       pkt->pad = 0;
        pkt->opcode = opcode;
        pkt->srvid = 0;
        pkt->client_id = 0;
        pkt->flags = 0;
        pkt->datalen = extra;
        memcpy(pkt->data, extra_data, extra);
-       DLIST_ADD_END(ctdb->outq, req, struct ctdb_request);
+       DLIST_ADD(ctdb->outq, req);
        return req;
 }
 
+void ctdb_cancel_callback(struct ctdb_connection *ctdb,
+                         struct ctdb_request *req,
+                         void *unused)
+{
+       ctdb_request_free(req);
+}
+
 int ctdb_cancel(struct ctdb_request *req)
 {
        /* FIXME: If it's not sent, we could just free it right now. */
-       req->cancelled = true;
+       req->callback = ctdb_cancel_callback;
        return 0;
 }
 
@@ -386,71 +386,107 @@ struct ctdb_db {
        uint32_t id;
        struct tdb_context *tdb;
 
-       ctdb_attachdb_cb callback;
+       ctdb_callback_t callback;
        void *private_data;
 };
 
-static void attachdb_getdbpath_done(int status, const char *path,
+static void attachdb_getdbpath_done(struct ctdb_connection *ctdb,
+                                   struct ctdb_request *req,
                                    void *_db)
 {
        struct ctdb_db *db = _db;
+
+       /* Do callback on original request. */
+       db->callback(ctdb, req->extra, db->private_data);
+}
+
+struct ctdb_db *ctdb_attachdb_recv(struct ctdb_request *req)
+{
+       struct ctdb_request *dbpath_req = req->extra;
+       struct ctdb_reply_control *reply;
+       struct ctdb_db *db = req->priv_data;
        uint32_t tdb_flags = db->tdb_flags;
 
-       if (status != 0) {
-               db->callback(status, NULL, db->private_data);
-               free(db);
-               return;
+       /* Never sent the dbpath request?  We've failed. */
+       if (!dbpath_req) {
+               /* FIXME: Save errno? */
+               errno = EINVAL;
+               return NULL;
+       }
+
+       reply = unpack_reply_control(dbpath_req, CTDB_CONTROL_GETDBPATH);
+       if (!reply || reply->status != 0) {
+               return NULL;
        }
 
        tdb_flags = db->persistent ? TDB_DEFAULT : TDB_NOSYNC;
        tdb_flags |= TDB_DISALLOW_NESTING;
 
-       db->tdb = tdb_open(path, 0, tdb_flags, O_RDWR, 0);
+       db->tdb = tdb_open((char *)reply->data, 0, tdb_flags, O_RDWR, 0);
        if (db->tdb == NULL) {
-               db->callback(-1, NULL, db->private_data);
-               free(db);
-               return;
+               return NULL;
        }
 
-       /* Finally, we tell the client that we opened the db. */
-       db->callback(status, db, db->private_data);
+       /* Finally, separate the db from the request (see destroy_req_db). */
+       req->priv_data = NULL;
+       return db;
 }
 
-static void attachdb_done(int status, uint32_t id, struct ctdb_db *db)
+static void attachdb_done(struct ctdb_connection *ctdb,
+                         struct ctdb_request *req,
+                         void *_db)
 {
-       struct ctdb_request *req;
+       struct ctdb_db *db = _db;
+       struct ctdb_request *req2;
+       struct ctdb_reply_control *reply;
+       enum ctdb_controls control = CTDB_CONTROL_DB_ATTACH;
 
-       if (status != 0) {
-               db->callback(status, NULL, db->private_data);
-               free(db);
+       if (db->persistent) {
+               control = CTDB_CONTROL_DB_ATTACH_PERSISTENT;
+       }
+
+       reply = unpack_reply_control(req, control);
+       if (!reply || reply->status != 0) {
+               /* We failed.  Hand request to user and have them discover it
+                * via ctdb_attachdb_recv. */
+               db->callback(ctdb, req, db->private_data);
                return;
        }
-       db->id = id;
+       db->id = *(uint32_t *)reply->data;
 
        /* Now we do another call, to get the dbpath. */
-       req = new_ctdb_control_request(db->ctdb, CTDB_CONTROL_GETDBPATH,
-                                      CTDB_CURRENT_NODE, &id, sizeof(id));
-       if (!req) {
-               db->callback(-1, NULL, db->private_data);
-               free(db);
+       req2 = new_ctdb_control_request(db->ctdb, CTDB_CONTROL_GETDBPATH,
+                                       CTDB_CURRENT_NODE,
+                                       &db->id, sizeof(db->id),
+                                       attachdb_getdbpath_done, db);
+       if (!req2) {
+               db->callback(ctdb, req, db->private_data);
                return;
        }
-       req->callback.getdbpath = attachdb_getdbpath_done;
-       req->priv_data = db;
+       req->extra = req2;
+       req2->extra = req;
+}
+
+static void destroy_req_db(struct ctdb_request *req)
+{
+       /* Incomplete db is in priv_data. */
+       free(req->priv_data);
+       /* second request is chained off this one. */
+       if (req->extra) {
+               ctdb_request_free(req->extra);
+       }
 }
 
 struct ctdb_request *
 ctdb_attachdb_send(struct ctdb_connection *ctdb,
                   const char *name, int persistent, uint32_t tdb_flags,
-                  ctdb_attachdb_cb callback,
-                  void *private_data)
+                  ctdb_callback_t callback, void *private_data)
 {
        struct ctdb_request *req;
        struct ctdb_db *db;
        uint32_t opcode;
 
        /* FIXME: Search if db already open. */
-
        db = malloc(sizeof(*db));
        if (!db) {
                return NULL;
@@ -463,7 +499,7 @@ ctdb_attachdb_send(struct ctdb_connection *ctdb,
        }
 
        req = new_ctdb_control_request(ctdb, opcode, CTDB_CURRENT_NODE, name,
-                                      strlen(name) + 1);
+                                      strlen(name) + 1, attachdb_done, db);
        if (!req) {
                free(db);
                return NULL;
@@ -475,8 +511,9 @@ ctdb_attachdb_send(struct ctdb_connection *ctdb,
        db->callback = callback;
        db->private_data = private_data;
 
-       req->callback.attachdb = attachdb_done;
-       req->priv_data = db;
+       req->extra_destructor = destroy_req_db;
+       /* This is set non-NULL when we succeed, see ctdb_attachdb_recv */
+       req->extra = NULL;
 
        /* Flags get overloaded into srvid. */
        req->hdr.control->srvid = tdb_flags;
@@ -486,150 +523,144 @@ ctdb_attachdb_send(struct ctdb_connection *ctdb,
 struct ctdb_lock {
        struct ctdb_db *ctdb_db;
        TDB_DATA key;
-       struct ctdb_ltdb_header *hdr;
-       TDB_DATA data;
+
+       /* This will always be true by the time user sees this. */
        bool held;
-       /* For convenience, we stash this here. */
-       ctdb_readrecordlock_cb callback;
-       void *private_data;
+       struct ctdb_ltdb_header *hdr;
+
+       /* For convenience, we stash original callback here. */
+       ctdb_rrl_callback_t callback;
 };
 
 void ctdb_release_lock(struct ctdb_lock *lock)
 {
        if (lock->held) {
                tdb_chainunlock(lock->ctdb_db->tdb, lock->key);
+               lock->held = false;
        }
-       free(lock->key.dptr);
-       free(lock->hdr); /* Also frees lock->data */
+}
+
+static void ctdb_free_lock(struct ctdb_lock *lock)
+{
+       if (lock->held) {
+               /* FIXME: report error. Callback never released the lock */
+               ctdb_release_lock(lock);
+       }
+
+       free(lock->hdr);
        free(lock);
 }
 
 /* We keep the lock if local node is the dmaster. */
-static bool try_readrecordlock(struct ctdb_lock *lock)
+static bool try_readrecordlock(struct ctdb_lock *lock, TDB_DATA *data)
 {
+       struct ctdb_ltdb_header *hdr;
+
        if (tdb_chainlock(lock->ctdb_db->tdb, lock->key) != 0) {
-               return false;
+               return NULL;
        }
 
-       lock->hdr = ctdb_local_fetch(lock->ctdb_db->tdb,
-                                    lock->key, &lock->data);
-       if (lock->hdr && lock->hdr->dmaster == lock->ctdb_db->ctdb->pnn) {
+       hdr = ctdb_local_fetch(lock->ctdb_db->tdb, lock->key, data);
+       if (hdr && hdr->dmaster == lock->ctdb_db->ctdb->pnn) {
                lock->held = true;
+               lock->hdr = hdr;
                return true;
        }
 
        tdb_chainunlock(lock->ctdb_db->tdb, lock->key);
-       free(lock->hdr);
-       return false;
+       free(hdr);
+       return NULL;
 }
 
-static void readrecordlock_done(int, struct ctdb_reply_call *, void *);
-
-static struct ctdb_request *new_readrecordlock_request(struct ctdb_lock *lock)
+/* If they shutdown before we hand them the lock, we free it here. */
+static void destroy_lock(struct ctdb_request *req)
 {
-       struct ctdb_request *req;
-       struct ctdb_req_call *pkt;
-
-       req = new_ctdb_request(sizeof(*pkt) + lock->key.dsize);
-       if (!req)
-               return NULL;
-       req->callback.nullfunc = readrecordlock_done;
-       req->priv_data = lock;
-
-       io_elem_init_req_header(req->io, CTDB_REQ_CALL, CTDB_CURRENT_NODE,
-                               new_reqid(lock->ctdb_db->ctdb));
-
-       pkt = req->hdr.call;
-       pkt->flags = CTDB_IMMEDIATE_MIGRATION;
-       pkt->db_id = lock->ctdb_db->id;
-       pkt->callid = CTDB_NULL_FUNC;
-       pkt->hopcount = 0;
-       pkt->keylen = lock->key.dsize;
-       pkt->calldatalen = 0;
-       memcpy(pkt->data, lock->key.dptr, lock->key.dsize);
-       DLIST_ADD_END(lock->ctdb_db->ctdb->outq, req, struct ctdb_request);
-       return req;
+       ctdb_release_lock(req->extra);
+       ctdb_free_lock(req->extra);
 }
 
-/* OK, let's try again... */
-static void readrecordlock_done(int status, struct ctdb_reply_call *reply,
-                               void *_lock)
+static void readrecordlock_retry(struct ctdb_connection *ctdb,
+                                struct ctdb_request *req, void *private)
 {
-       struct ctdb_lock *lock = _lock;
+       struct ctdb_lock *lock = req->extra;
+       struct ctdb_reply_call *reply;
+       TDB_DATA data;
 
-       if (status != 0) {
-               lock->callback(status, NULL, tdb_null, lock->private_data);
-               ctdb_release_lock(lock);
+       /* OK, we've received reply to noop migration */
+       reply = unpack_reply_call(req, CTDB_NULL_FUNC);
+       if (!reply || reply->status != 0) {
+               lock->callback(lock->ctdb_db, NULL, tdb_null, private);
+               ctdb_request_free(req); /* Also frees lock. */
+               ctdb_free_lock(lock);
                return;
        }
 
-       if (try_readrecordlock(lock)) {
-               lock->callback(0, lock, lock->data, lock->private_data);
+       /* Can we get lock now? */
+       if (try_readrecordlock(lock, &data)) {
+               /* Now it's their responsibility to free lock & request! */
+               req->extra_destructor = NULL;
+               lock->callback(lock->ctdb_db, lock, data, private);
+               ctdb_free_lock(lock);
                return;
        }
 
-       if (!new_readrecordlock_request(lock)) {
-               lock->callback(-1, NULL, tdb_null, lock->private_data);
-               ctdb_release_lock(lock);
-       }
-}
-
-static void lock_complete(struct ctdb_request *req, void *_lock)
-{
-       struct ctdb_lock *lock = _lock;
-
-       if (!req->cancelled) {
-               lock->callback(0, lock, lock->data, lock->private_data);
-       } else {
-               ctdb_release_lock(lock);
-       }
+       /* Retransmit the same request again (we lost race). */
+       io_elem_reset(req->io);
+       DLIST_ADD(ctdb->outq, req);
 }
 
-struct ctdb_request *
-ctdb_readrecordlock_send(struct ctdb_db *ctdb_db,
-                        TDB_DATA key,
-                        ctdb_readrecordlock_cb callback,
-                        void *private_data)
+bool
+ctdb_readrecordlock_async(struct ctdb_db *ctdb_db, TDB_DATA key,
+                         ctdb_rrl_callback_t callback, void *cbdata)
 {
        struct ctdb_request *req;
        struct ctdb_lock *lock;
+       TDB_DATA data;
 
-       lock = malloc(sizeof(*lock));
-       if (!lock)
-               return NULL;
-       lock->key.dptr = malloc(key.dsize);
-       if (!lock->key.dptr) {
-               free_noerr(lock);
-               return NULL;
+       /* Setup lock */
+       lock = malloc(sizeof(*lock) + key.dsize);
+       if (!lock) {
+               return false;
        }
+       lock->key.dptr = (void *)(lock + 1);
        memcpy(lock->key.dptr, key.dptr, key.dsize);
        lock->key.dsize = key.dsize;
        lock->ctdb_db = ctdb_db;
-       lock->callback = callback;
-       lock->private_data = private_data;
        lock->hdr = NULL;
        lock->held = false;
 
-       if (try_readrecordlock(lock)) {
-               /* We pretend to be async, so we just queue this. */
-               req = new_immediate_request();
-               if (!req) {
-                       ctdb_release_lock(lock);
-                       return NULL;
-               }
-               req->callback.immediate = lock_complete;
-               req->priv_data = lock;
-               DLIST_ADD_END(lock->ctdb_db->ctdb->immediateq,
-                             req, struct ctdb_request);
-               return req;
+       /* Fast path. */
+       if (try_readrecordlock(lock, &data)) {
+               callback(ctdb_db, lock, data, cbdata);
+               ctdb_free_lock(lock);
+               return true;
        }
 
-       req = new_readrecordlock_request(lock);
+       /* Slow path: create request. */
+       req = new_ctdb_request(offsetof(struct ctdb_req_call, data)
+                              + key.dsize, readrecordlock_retry, cbdata);
        if (!req) {
                ctdb_release_lock(lock);
+               ctdb_free_lock(lock);
                return NULL;
        }
-       return req;
+       req->extra = lock;
+       req->extra_destructor = destroy_lock;
+       /* We store the original callback in the lock, and use our own. */
+       lock->callback = callback;
+
+       io_elem_init_req_header(req->io, CTDB_REQ_CALL, CTDB_CURRENT_NODE,
+                               new_reqid(ctdb_db->ctdb));
+
+       req->hdr.call->flags = CTDB_IMMEDIATE_MIGRATION;
+       req->hdr.call->db_id = ctdb_db->id;
+       req->hdr.call->callid = CTDB_NULL_FUNC;
+       req->hdr.call->hopcount = 0;
+       req->hdr.call->keylen = key.dsize;
+       req->hdr.call->calldatalen = 0;
+       memcpy(req->hdr.call->data, key.dptr, key.dsize);
+       DLIST_ADD(ctdb_db->ctdb->outq, req);
+       return true;
 }
 
 int ctdb_writerecord(struct ctdb_lock *lock, TDB_DATA data)
@@ -639,5 +670,11 @@ int ctdb_writerecord(struct ctdb_lock *lock, TDB_DATA data)
                return -1;
        }
 
-       return ctdb_local_store(lock->ctdb_db->tdb, lock->key, lock->hdr, data);
+       if (!lock->held) {
+               /* FIXME: Report error. */
+               return -1;
+       }
+
+       return ctdb_local_store(lock->ctdb_db->tdb, lock->key, lock->hdr,
+                               data);
 }