struct ctdb_db *ctdb_db;
TDB_DATA key;
- /* This will always be true by the time user sees this. */
- bool held;
+ /* This will always be set by the time user sees this. */
+ unsigned long held_magic;
struct ctdb_ltdb_header *hdr;
/* For convenience, we stash original callback here. */
/* Library user error if this isn't a reply to a call. */
if (req->hdr.hdr->operation != CTDB_REQ_CALL) {
errno = EINVAL;
- DEBUG(ctdb, LOG_ERR,
+ DEBUG(ctdb, LOG_ALERT,
"This was not a ctdbd call request: operation %u",
req->hdr.hdr->operation);
return NULL;
if (req->hdr.call->callid != callid) {
errno = EINVAL;
- DEBUG(ctdb, LOG_ERR,
+ DEBUG(ctdb, LOG_ALERT,
"This was not a ctdbd %u call request: %u",
callid, req->hdr.call->callid);
return NULL;
/* Library user error if this isn't a reply to a call. */
if (len < sizeof(*inhdr)) {
errno = EINVAL;
- DEBUG(ctdb, LOG_CRIT,
+ DEBUG(ctdb, LOG_ALERT,
"Short ctdbd control reply: %zu bytes", len);
return NULL;
}
if (req->hdr.hdr->operation != CTDB_REQ_CONTROL) {
errno = EINVAL;
- DEBUG(ctdb, LOG_ERR,
+ DEBUG(ctdb, LOG_ALERT,
"This was not a ctdbd control request: operation %u",
req->hdr.hdr->operation);
return NULL;
/* ... or if it was a different control from what we expected. */
if (req->hdr.control->opcode != control) {
errno = EINVAL;
- DEBUG(ctdb, LOG_ERR,
+ DEBUG(ctdb, LOG_ALERT,
"This was not an opcode %u ctdbd control request: %u",
control, req->hdr.control->opcode);
return NULL;
}
if (holding_lock(ctdb)) {
- DEBUG(ctdb, LOG_WARNING, "Do not block while holding lock!");
+ DEBUG(ctdb, LOG_ALERT, "Do not block while holding lock!");
}
if (revents & POLLOUT) {
return req;
}
-void ctdb_release_lock(struct ctdb_lock *lock)
+static unsigned long lock_magic(struct ctdb_lock *lock)
{
- if (lock->held) {
- tdb_chainunlock(lock->ctdb_db->tdb, lock->key);
- DEBUG(lock->ctdb_db->ctdb, LOG_DEBUG,
- "ctdb_release_lock %p", lock);
- lock->held = false;
- remove_lock(lock->ctdb_db->ctdb, lock);
- }
+ /* A non-zero magic specific to this structure. */
+ return ((unsigned long)lock->key.dptr
+ ^ (((unsigned long)lock->key.dptr) << 16)
+ ^ 0xBADC0FFEEBADC0DEULL)
+ | 1;
}
-static void ctdb_free_lock(struct ctdb_lock *lock)
+/* This is only called on locks before they're held. */
+static void free_lock(struct ctdb_lock *lock)
{
- if (lock->held) {
- errno = EEXIST;
- DEBUG(lock->ctdb_db->ctdb, LOG_ERR,
- "Lock freed before it was released");
- ctdb_release_lock(lock);
+ if (lock->held_magic) {
+ DEBUG(lock->ctdb_db->ctdb, LOG_ALERT,
+ "free_lock invalid lock %p", lock);
}
free(lock->hdr);
free(lock);
}
-static void ctdb_destroy_lock(struct ctdb_lock *lock)
+
+void ctdb_release_lock(struct ctdb_lock *lock)
{
- ctdb_release_lock(lock);
- ctdb_free_lock(lock);
+ if (lock->held_magic != lock_magic(lock)) {
+ DEBUG(lock->ctdb_db->ctdb, LOG_ALERT,
+ "ctdb_release_lock invalid lock %p", lock);
+ } else {
+ tdb_chainunlock(lock->ctdb_db->tdb, lock->key);
+ DEBUG(lock->ctdb_db->ctdb, LOG_DEBUG,
+ "ctdb_release_lock %p", lock);
+ remove_lock(lock->ctdb_db->ctdb, lock);
+ }
+ lock->held_magic = 0;
+ free_lock(lock);
}
+
/* We keep the lock if local node is the dmaster. */
static bool try_readrecordlock(struct ctdb_lock *lock, TDB_DATA *data)
{
if (hdr && hdr->dmaster == lock->ctdb_db->ctdb->pnn) {
DEBUG(lock->ctdb_db->ctdb, LOG_DEBUG,
"ctdb_readrecordlock_async: got local lock");
- lock->held = true;
+ lock->held_magic = lock_magic(lock);
lock->hdr = hdr;
add_lock(lock->ctdb_db->ctdb, lock);
return true;
static void destroy_lock(struct ctdb_connection *ctdb,
struct ctdb_request *req)
{
- ctdb_destroy_lock(req->extra);
+ free_lock(req->extra);
}
static void readrecordlock_retry(struct ctdb_connection *ctdb,
/* Now it's their responsibility to free lock & request! */
req->extra_destructor = NULL;
lock->callback(lock->ctdb_db, lock, data, private);
- ctdb_free_lock(lock);
return;
}
TDB_DATA data;
if (holding_lock(ctdb_db->ctdb)) {
- DEBUG(ctdb_db->ctdb, LOG_ERR,
+ DEBUG(ctdb_db->ctdb, LOG_ALERT,
"ctdb_readrecordlock_async: already holding lock");
return false;
}
lock->key.dsize = key.dsize;
lock->ctdb_db = ctdb_db;
lock->hdr = NULL;
- lock->held = false;
+ lock->held_magic = 0;
/* Fast path. */
if (try_readrecordlock(lock, &data)) {
callback(ctdb_db, lock, data, cbdata);
- ctdb_free_lock(lock);
return true;
}
if (!req) {
DEBUG(ctdb_db->ctdb, LOG_ERR,
"ctdb_readrecordlock_async: allocation failed");
- ctdb_destroy_lock(lock);
+ free_lock(lock);
return NULL;
}
req->extra = lock;
int ctdb_writerecord(struct ctdb_lock *lock, TDB_DATA data)
{
+ if (lock->held_magic != lock_magic(lock)) {
+ errno = EBADF;
+ DEBUG(lock->ctdb_db->ctdb, LOG_ALERT,
+ "ctdb_writerecord: Can not write. Lock has been released.");
+ return -1;
+ }
+
if (lock->ctdb_db->persistent) {
errno = EINVAL;
- DEBUG(lock->ctdb_db->ctdb, LOG_ERR,
+ DEBUG(lock->ctdb_db->ctdb, LOG_ALERT,
"ctdb_writerecord: cannot write to persistent db");
return -1;
}