libctdb: track lock for each ctdb_db, complain if they hold too long.
authorRusty Russell <rusty@rustcorp.com.au>
Fri, 4 Jun 2010 10:11:42 +0000 (19:41 +0930)
committerRusty Russell <rusty@rustcorp.com.au>
Fri, 4 Jun 2010 10:11:42 +0000 (19:41 +0930)
In particular, this stops them grabbing two (with wrappers so we can
enhance this logic once we support threads), and warns them if they
re-enter ctdb_service() holding a lock (you are not supposed to block!).

Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
libctdb/ctdb.c
libctdb/libctdb_private.h

index 915e496f54233d76d7c7173181ff84d4c32c9d9d..5e4121fc2ff058114fbd5cda54afd5ece7849892 100644 (file)
 #undef ctdb_readrecordlock_async
 #undef ctdb_connect
 
+struct ctdb_lock {
+       struct ctdb_lock *next, *prev;
+
+       struct ctdb_db *ctdb_db;
+       TDB_DATA key;
+
+       /* This will always be true by the time user sees this. */
+       bool held;
+       struct ctdb_ltdb_header *hdr;
+
+       /* For convenience, we stash original callback here. */
+       ctdb_rrl_callback_t callback;
+};
+
+static void remove_lock(struct ctdb_connection *ctdb, struct ctdb_lock *lock)
+{
+       DLIST_REMOVE(ctdb->locks, lock);
+}
+
+/* FIXME: for thread safety, need tid info too. */
+static bool holding_lock(struct ctdb_connection *ctdb)
+{
+       /* For the moment, you can't ever hold more than 1 lock. */
+       return (ctdb->locks != NULL);
+}
+
+static void add_lock(struct ctdb_connection *ctdb, struct ctdb_lock *lock)
+{
+       DLIST_ADD(ctdb->locks, lock);
+}
+
 /* FIXME: Could be in shared util code with rest of ctdb */
 static void close_noerr(int fd)
 {
@@ -302,6 +333,10 @@ int ctdb_service(struct ctdb_connection *ctdb, int revents)
                return -1;
        }
 
+       if (holding_lock(ctdb)) {
+               DEBUG(ctdb, LOG_WARNING, "Do not block while holding lock!");
+       }
+
        if (revents & POLLOUT) {
                while (ctdb->outq) {
                        if (real_error(write_io_elem(ctdb->fd,
@@ -435,6 +470,9 @@ struct ctdb_db {
        uint32_t id;
        struct tdb_context *tdb;
 
+       /* The lock we are holding, if any (we can only have one!) */
+       struct ctdb_lock *lock;
+
        ctdb_callback_t callback;
        void *private_data;
 };
@@ -595,35 +633,15 @@ ctdb_attachdb_send(struct ctdb_connection *ctdb,
        return req;
 }
 
-struct ctdb_lock {
-       struct ctdb_db *ctdb_db;
-       TDB_DATA key;
-
-       /* This will always be true by the time user sees this. */
-       bool held;
-       struct ctdb_ltdb_header *hdr;
-
-       /* For convenience, we stash original callback here. */
-       ctdb_rrl_callback_t callback;
-};
-
 void ctdb_release_lock(struct ctdb_lock *lock)
 {
        if (lock->held) {
-               DEBUG(lock->ctdb_db->ctdb, LOG_DEBUG,
-                     "ctdb_attachdb_send: ctdb_release_lock %p", lock);
                tdb_chainunlock(lock->ctdb_db->tdb, lock->key);
+               DEBUG(lock->ctdb_db->ctdb, LOG_DEBUG,
+                     "ctdb_release_lock %p", lock);
                lock->held = false;
+               remove_lock(lock->ctdb_db->ctdb, lock);
        }
-}
-
-static void ctdb_free_lock(struct ctdb_lock *lock)
-{
-       if (lock->held) {
-               /* FIXME: report error. Callback never released the lock */
-               ctdb_release_lock(lock);
-       }
-
        free(lock->hdr);
        free(lock);
 }
@@ -645,6 +663,7 @@ static bool try_readrecordlock(struct ctdb_lock *lock, TDB_DATA *data)
                      "ctdb_readrecordlock_async: got local lock");
                lock->held = true;
                lock->hdr = hdr;
+               add_lock(lock->ctdb_db->ctdb, lock);
                return true;
        }
 
@@ -658,7 +677,6 @@ static void destroy_lock(struct ctdb_connection *ctdb,
                         struct ctdb_request *req)
 {
        ctdb_release_lock(req->extra);
-       ctdb_free_lock(req->extra);
 }
 
 static void readrecordlock_retry(struct ctdb_connection *ctdb,
@@ -678,7 +696,6 @@ static void readrecordlock_retry(struct ctdb_connection *ctdb,
                }
                lock->callback(lock->ctdb_db, NULL, tdb_null, private);
                ctdb_request_free(ctdb, req); /* Also frees lock. */
-               ctdb_free_lock(lock);
                return;
        }
 
@@ -687,7 +704,6 @@ static void readrecordlock_retry(struct ctdb_connection *ctdb,
                /* Now it's their responsibility to free lock & request! */
                req->extra_destructor = NULL;
                lock->callback(lock->ctdb_db, lock, data, private);
-               ctdb_free_lock(lock);
                return;
        }
 
@@ -704,6 +720,12 @@ ctdb_readrecordlock_async(struct ctdb_db *ctdb_db, TDB_DATA key,
        struct ctdb_lock *lock;
        TDB_DATA data;
 
+       if (holding_lock(ctdb_db->ctdb)) {
+               DEBUG(ctdb_db->ctdb, LOG_ERR,
+                     "ctdb_readrecordlock_async: already holding lock");
+               return false;
+       }
+
        /* Setup lock */
        lock = malloc(sizeof(*lock) + key.dsize);
        if (!lock) {
@@ -721,7 +743,6 @@ ctdb_readrecordlock_async(struct ctdb_db *ctdb_db, TDB_DATA key,
        /* Fast path. */
        if (try_readrecordlock(lock, &data)) {
                callback(ctdb_db, lock, data, cbdata);
-               ctdb_free_lock(lock);
                return true;
        }
 
@@ -732,7 +753,6 @@ ctdb_readrecordlock_async(struct ctdb_db *ctdb_db, TDB_DATA key,
                DEBUG(ctdb_db->ctdb, LOG_ERR,
                      "ctdb_readrecordlock_async: allocation failed");
                ctdb_release_lock(lock);
-               ctdb_free_lock(lock);
                return NULL;
        }
        req->extra = lock;
@@ -763,11 +783,6 @@ int ctdb_writerecord(struct ctdb_lock *lock, TDB_DATA data)
                return -1;
        }
 
-       if (!lock->held) {
-               /* FIXME: Report error. */
-               return -1;
-       }
-
        return ctdb_local_store(lock->ctdb_db->tdb, lock->key, lock->hdr,
                                data);
 }
index 9e8783e533a35ca3818b89c8a167aced93753325..51b9305a086e8c33f36972777bc5dfcf6d4a7a26 100644 (file)
@@ -69,6 +69,8 @@ struct ctdb_connection {
        struct message_handler_info *message_handlers;
        /* PNN of this ctdb: valid by the time we do our first db connection. */
        uint32_t pnn;
+       /* Chain of locks we hold. */
+       struct ctdb_lock *locks;
        /* Extra logging. */
        ctdb_log_fn_t log;
        void *log_priv;