libctdb: clarify logging levels
[rusty/ctdb.git] / libctdb / ctdb.c
index d60c8894db15aa1ddfe885869a3bbedfdb52a942..afe7e1dff7aa04e025d9cfb6e3629c5387bdbbb6 100644 (file)
@@ -42,8 +42,8 @@ struct ctdb_lock {
        struct ctdb_db *ctdb_db;
        TDB_DATA key;
 
-       /* This will always be true by the time user sees this. */
-       bool held;
+       /* This will always be set by the time user sees this. */
+       unsigned long held_magic;
        struct ctdb_ltdb_header *hdr;
 
        /* For convenience, we stash original callback here. */
@@ -222,7 +222,7 @@ static struct ctdb_reply_call *unpack_reply_call(struct ctdb_connection *ctdb,
        /* Library user error if this isn't a reply to a call. */
        if (req->hdr.hdr->operation != CTDB_REQ_CALL) {
                errno = EINVAL;
-               DEBUG(ctdb, LOG_ERR,
+               DEBUG(ctdb, LOG_ALERT,
                      "This was not a ctdbd call request: operation %u",
                      req->hdr.hdr->operation);
                return NULL;
@@ -230,7 +230,7 @@ static struct ctdb_reply_call *unpack_reply_call(struct ctdb_connection *ctdb,
 
        if (req->hdr.call->callid != callid) {
                errno = EINVAL;
-               DEBUG(ctdb, LOG_ERR,
+               DEBUG(ctdb, LOG_ALERT,
                      "This was not a ctdbd %u call request: %u",
                      callid, req->hdr.call->callid);
                return NULL;
@@ -259,13 +259,13 @@ struct ctdb_reply_control *unpack_reply_control(struct ctdb_connection *ctdb,
        /* Library user error if this isn't a reply to a call. */
        if (len < sizeof(*inhdr)) {
                errno = EINVAL;
-               DEBUG(ctdb, LOG_CRIT,
+               DEBUG(ctdb, LOG_ALERT,
                      "Short ctdbd control reply: %zu bytes", len);
                return NULL;
        }
        if (req->hdr.hdr->operation != CTDB_REQ_CONTROL) {
                errno = EINVAL;
-               DEBUG(ctdb, LOG_ERR,
+               DEBUG(ctdb, LOG_ALERT,
                      "This was not a ctdbd control request: operation %u",
                      req->hdr.hdr->operation);
                return NULL;
@@ -274,7 +274,7 @@ struct ctdb_reply_control *unpack_reply_control(struct ctdb_connection *ctdb,
        /* ... or if it was a different control from what we expected. */
        if (req->hdr.control->opcode != control) {
                errno = EINVAL;
-               DEBUG(ctdb, LOG_ERR,
+               DEBUG(ctdb, LOG_ALERT,
                      "This was not an opcode %u ctdbd control request: %u",
                      control, req->hdr.control->opcode);
                return NULL;
@@ -335,7 +335,7 @@ bool ctdb_service(struct ctdb_connection *ctdb, int revents)
        }
 
        if (holding_lock(ctdb)) {
-               DEBUG(ctdb, LOG_WARNING, "Do not block while holding lock!");
+               DEBUG(ctdb, LOG_ALERT, "Do not block while holding lock!");
        }
 
        if (revents & POLLOUT) {
@@ -633,35 +633,43 @@ ctdb_attachdb_send(struct ctdb_connection *ctdb,
        return req;
 }
 
-void ctdb_release_lock(struct ctdb_lock *lock)
+static unsigned long lock_magic(struct ctdb_lock *lock)
 {
-       if (lock->held) {
-               tdb_chainunlock(lock->ctdb_db->tdb, lock->key);
-               DEBUG(lock->ctdb_db->ctdb, LOG_DEBUG,
-                     "ctdb_release_lock %p", lock);
-               lock->held = false;
-               remove_lock(lock->ctdb_db->ctdb, lock);
-       }
+       /* A non-zero magic specific to this structure. */
+       return ((unsigned long)lock->key.dptr
+               ^ (((unsigned long)lock->key.dptr) << 16)
+               ^ 0xBADC0FFEEBADC0DEULL)
+               | 1;
 }
 
-static void ctdb_free_lock(struct ctdb_lock *lock)
+/* This is only called on locks before they're held. */
+static void free_lock(struct ctdb_lock *lock)
 {
-       if (lock->held) {
-               errno = EEXIST;
-               DEBUG(lock->ctdb_db->ctdb, LOG_ERR,
-                       "Lock freed before it was released");
-               ctdb_release_lock(lock);
+       if (lock->held_magic) {
+               DEBUG(lock->ctdb_db->ctdb, LOG_ALERT,
+                     "free_lock invalid lock %p", lock);
        }
        free(lock->hdr);
        free(lock);
 }
 
-static void ctdb_destroy_lock(struct ctdb_lock *lock)
+
+void ctdb_release_lock(struct ctdb_lock *lock)
 {
-       ctdb_release_lock(lock);
-       ctdb_free_lock(lock);
+       if (lock->held_magic != lock_magic(lock)) {
+               DEBUG(lock->ctdb_db->ctdb, LOG_ALERT,
+                     "ctdb_release_lock invalid lock %p", lock);
+       } else {
+               tdb_chainunlock(lock->ctdb_db->tdb, lock->key);
+               DEBUG(lock->ctdb_db->ctdb, LOG_DEBUG,
+                     "ctdb_release_lock %p", lock);
+               remove_lock(lock->ctdb_db->ctdb, lock);
+       }
+       lock->held_magic = 0;
+       free_lock(lock);
 }
 
+
 /* We keep the lock if local node is the dmaster. */
 static bool try_readrecordlock(struct ctdb_lock *lock, TDB_DATA *data)
 {
@@ -677,7 +685,7 @@ static bool try_readrecordlock(struct ctdb_lock *lock, TDB_DATA *data)
        if (hdr && hdr->dmaster == lock->ctdb_db->ctdb->pnn) {
                DEBUG(lock->ctdb_db->ctdb, LOG_DEBUG,
                      "ctdb_readrecordlock_async: got local lock");
-               lock->held = true;
+               lock->held_magic = lock_magic(lock);
                lock->hdr = hdr;
                add_lock(lock->ctdb_db->ctdb, lock);
                return true;
@@ -692,7 +700,7 @@ static bool try_readrecordlock(struct ctdb_lock *lock, TDB_DATA *data)
 static void destroy_lock(struct ctdb_connection *ctdb,
                         struct ctdb_request *req)
 {
-       ctdb_destroy_lock(req->extra);
+       free_lock(req->extra);
 }
 
 static void readrecordlock_retry(struct ctdb_connection *ctdb,
@@ -720,7 +728,6 @@ static void readrecordlock_retry(struct ctdb_connection *ctdb,
                /* Now it's their responsibility to free lock & request! */
                req->extra_destructor = NULL;
                lock->callback(lock->ctdb_db, lock, data, private);
-               ctdb_free_lock(lock);
                return;
        }
 
@@ -738,7 +745,7 @@ ctdb_readrecordlock_async(struct ctdb_db *ctdb_db, TDB_DATA key,
        TDB_DATA data;
 
        if (holding_lock(ctdb_db->ctdb)) {
-               DEBUG(ctdb_db->ctdb, LOG_ERR,
+               DEBUG(ctdb_db->ctdb, LOG_ALERT,
                      "ctdb_readrecordlock_async: already holding lock");
                return false;
        }
@@ -755,12 +762,11 @@ ctdb_readrecordlock_async(struct ctdb_db *ctdb_db, TDB_DATA key,
        lock->key.dsize = key.dsize;
        lock->ctdb_db = ctdb_db;
        lock->hdr = NULL;
-       lock->held = false;
+       lock->held_magic = 0;
 
        /* Fast path. */
        if (try_readrecordlock(lock, &data)) {
                callback(ctdb_db, lock, data, cbdata);
-               ctdb_free_lock(lock);
                return true;
        }
 
@@ -770,7 +776,7 @@ ctdb_readrecordlock_async(struct ctdb_db *ctdb_db, TDB_DATA key,
        if (!req) {
                DEBUG(ctdb_db->ctdb, LOG_ERR,
                      "ctdb_readrecordlock_async: allocation failed");
-               ctdb_destroy_lock(lock);
+               free_lock(lock);
                return NULL;
        }
        req->extra = lock;
@@ -794,9 +800,16 @@ ctdb_readrecordlock_async(struct ctdb_db *ctdb_db, TDB_DATA key,
 
 int ctdb_writerecord(struct ctdb_lock *lock, TDB_DATA data)
 {
+       if (lock->held_magic != lock_magic(lock)) {
+               errno = EBADF;
+               DEBUG(lock->ctdb_db->ctdb, LOG_ALERT,
+                     "ctdb_writerecord: Can not write. Lock has been released.");
+               return -1;
+       }
+               
        if (lock->ctdb_db->persistent) {
                errno = EINVAL;
-               DEBUG(lock->ctdb_db->ctdb, LOG_ERR,
+               DEBUG(lock->ctdb_db->ctdb, LOG_ALERT,
                      "ctdb_writerecord: cannot write to persistent db");
                return -1;
        }