Split ctdb_release_lock() into a function to release the locvk and another function...
authorRonnie Sahlberg <ronniesahlberg@gmail.com>
Sat, 5 Jun 2010 05:38:11 +0000 (15:38 +1000)
committerRonnie Sahlberg <ronniesahlberg@gmail.com>
Sat, 5 Jun 2010 05:38:11 +0000 (15:38 +1000)
This allows us to keep the datastructure valid after the lock has been released by the application and we can trap and warn when the application is accessing the lock after it has been released. I.e. application bugs.

include/ctdb.h
libctdb/ctdb.c
libctdb/tst.c

index e62bb45a15fa27d44f463014c43b1e20f5477f2c..0dc5da720c647cb53a478bef025a66be2d744342 100644 (file)
@@ -214,6 +214,9 @@ struct ctdb_db *ctdb_attachdb_recv(struct ctdb_connection *ctdb,
  *
  * You MUST NOT block during holding this lock and MUST release it
  * quickly by performing ctdb_release_lock(lock).
+ * Do NOT make any system calls that may block while holding the lock.
+ *
+ * Try to release the lock as quickly as possible.
  */
 struct ctdb_lock;
 
index 9d79f6e013bb5a5667a03d730b5d5e62705429db..d60c8894db15aa1ddfe885869a3bbedfdb52a942 100644 (file)
@@ -642,10 +642,26 @@ void ctdb_release_lock(struct ctdb_lock *lock)
                lock->held = false;
                remove_lock(lock->ctdb_db->ctdb, lock);
        }
+}
+
+static void ctdb_free_lock(struct ctdb_lock *lock)
+{
+       if (lock->held) {
+               errno = EEXIST;
+               DEBUG(lock->ctdb_db->ctdb, LOG_ERR,
+                       "Lock freed before it was released");
+               ctdb_release_lock(lock);
+       }
        free(lock->hdr);
        free(lock);
 }
 
+static void ctdb_destroy_lock(struct ctdb_lock *lock)
+{
+       ctdb_release_lock(lock);
+       ctdb_free_lock(lock);
+}
+
 /* We keep the lock if local node is the dmaster. */
 static bool try_readrecordlock(struct ctdb_lock *lock, TDB_DATA *data)
 {
@@ -676,7 +692,7 @@ static bool try_readrecordlock(struct ctdb_lock *lock, TDB_DATA *data)
 static void destroy_lock(struct ctdb_connection *ctdb,
                         struct ctdb_request *req)
 {
-       ctdb_release_lock(req->extra);
+       ctdb_destroy_lock(req->extra);
 }
 
 static void readrecordlock_retry(struct ctdb_connection *ctdb,
@@ -704,6 +720,7 @@ static void readrecordlock_retry(struct ctdb_connection *ctdb,
                /* Now it's their responsibility to free lock & request! */
                req->extra_destructor = NULL;
                lock->callback(lock->ctdb_db, lock, data, private);
+               ctdb_free_lock(lock);
                return;
        }
 
@@ -743,6 +760,7 @@ ctdb_readrecordlock_async(struct ctdb_db *ctdb_db, TDB_DATA key,
        /* Fast path. */
        if (try_readrecordlock(lock, &data)) {
                callback(ctdb_db, lock, data, cbdata);
+               ctdb_free_lock(lock);
                return true;
        }
 
@@ -752,7 +770,7 @@ ctdb_readrecordlock_async(struct ctdb_db *ctdb_db, TDB_DATA key,
        if (!req) {
                DEBUG(ctdb_db->ctdb, LOG_ERR,
                      "ctdb_readrecordlock_async: allocation failed");
-               ctdb_release_lock(lock);
+               ctdb_destroy_lock(lock);
                return NULL;
        }
        req->extra = lock;
index c8c08cfe7d1ae6162f27982a8cbbeea7b193d2eb..391e92c72768a28bc2f5606472210589426112da 100644 (file)
@@ -109,9 +109,11 @@ static void rrl_cb(struct ctdb_db *ctdb_db,
        data.dsize = strlen(tmp) + 1;
        ctdb_writerecord(lock, data);
 
+       /* Release the lock as quickly as possible */
+       ctdb_release_lock(lock);
+
        printf("Wrote new record : %s\n", tmp);
 
-       ctdb_release_lock(lock);
 }
 
 static bool registered = false;