ctdb-daemon: Use correct tdb flags when enabling robust mutex support
[obnox/samba/samba-obnox.git] / ctdb / server / ctdb_lock.c
index 5b183fc003648c5351f010ff13131fcd7c8551ec..7959d40fbfec98e12c78348b74ae64d5a8ff12a2 100644 (file)
@@ -22,7 +22,7 @@
 #include "include/ctdb_protocol.h"
 #include "tevent.h"
 #include "tdb.h"
-#include "db_wrap.h"
+#include "lib/tdb_wrap/tdb_wrap.h"
 #include "system/filesys.h"
 #include "lib/util/dlinklist.h"
 
@@ -33,7 +33,7 @@
  * 2. Once the locks are obtained, signal parent process via fd.
  * 3. Invoke registered callback routine with locking status.
  * 4. If the child process cannot get locks within certain time,
- *    diagnose using /proc/locks and log warning message
+ *    execute an external script to debug.
  *
  * ctdb_lock_record()      - get a lock on a record
  * ctdb_lock_db()          - get a lock on a DB
@@ -43,9 +43,6 @@
  *  auto_mark              - whether to mark/unmark DBs in before/after callback
  */
 
-/* FIXME: Add a tunable max_lock_processes_per_db */
-#define MAX_LOCK_PROCESSES_PER_DB              (100)
-
 enum lock_type {
        LOCK_RECORD,
        LOCK_DB,
@@ -57,7 +54,7 @@ static const char * const lock_type_str[] = {
        "lock_record",
        "lock_db",
        "lock_alldb_prio",
-       "lock_db",
+       "lock_alldb",
 };
 
 struct lock_request;
@@ -71,19 +68,18 @@ struct lock_context {
        TDB_DATA key;
        uint32_t priority;
        bool auto_mark;
-       struct lock_request *req_queue;
+       struct lock_request *request;
        pid_t child;
        int fd[2];
        struct tevent_fd *tfd;
        struct tevent_timer *ttimer;
-       pid_t block_child;
-       int block_fd[2];
        struct timeval start_time;
+       uint32_t key_hash;
+       bool can_schedule;
 };
 
 /* lock_request is the client specific part for a lock request */
 struct lock_request {
-       struct lock_request *next, *prev;
        struct lock_context *lctx;
        void (*callback)(void *, bool);
        void *private_data;
@@ -156,63 +152,24 @@ static int ctdb_db_iterator(struct ctdb_context *ctdb, uint32_t priority,
 
 
 /*
- * lock all databases
+ * lock all databases - mark only
  */
-static int db_lock_handler(struct ctdb_db_context *ctdb_db, uint32_t priority,
-                          void *private_data)
+static int db_lock_mark_handler(struct ctdb_db_context *ctdb_db, uint32_t priority,
+                               void *private_data)
 {
-       if (priority == 0) {
-               DEBUG(DEBUG_INFO, ("locking database %s\n",
-                                  ctdb_db->db_name));
-       } else {
-               DEBUG(DEBUG_INFO, ("locking database %s, priority:%u\n",
-                                  ctdb_db->db_name, priority));
-       }
+       int tdb_transaction_write_lock_mark(struct tdb_context *);
 
-       if (tdb_lockall(ctdb_db->ltdb->tdb) != 0) {
-               DEBUG(DEBUG_ERR, ("Failed to lock database %s\n",
+       DEBUG(DEBUG_INFO, ("marking locked database %s, priority:%u\n",
+                          ctdb_db->db_name, priority));
+
+       if (tdb_transaction_write_lock_mark(ctdb_db->ltdb->tdb) != 0) {
+               DEBUG(DEBUG_ERR, ("Failed to mark (transaction lock) database %s\n",
                                  ctdb_db->db_name));
                return -1;
        }
 
-       return 0;
-}
-
-int ctdb_lockall_prio(struct ctdb_context *ctdb, uint32_t priority)
-{
-       return ctdb_db_iterator(ctdb, priority, db_lock_handler, NULL);
-}
-
-static int ctdb_lockall(struct ctdb_context *ctdb)
-{
-       uint32_t priority;
-
-       for (priority=1; priority<=NUM_DB_PRIORITIES; priority++) {
-               if (ctdb_db_iterator(ctdb, priority, db_lock_handler, NULL) != 0) {
-                       return -1;
-               }
-       }
-
-       return 0;
-}
-
-
-/*
- * unlock all databases
- */
-static int db_unlock_handler(struct ctdb_db_context *ctdb_db, uint32_t priority,
-                            void *private_data)
-{
-       if (priority == 0) {
-               DEBUG(DEBUG_INFO, ("unlocking database %s\n",
-                                  ctdb_db->db_name));
-       } else {
-               DEBUG(DEBUG_INFO, ("unlocking database %s, priority:%u\n",
-                                  ctdb_db->db_name, priority));
-       }
-
-       if (tdb_unlockall(ctdb_db->ltdb->tdb) != 0) {
-               DEBUG(DEBUG_ERR, ("Failed to unlock database %s\n",
+       if (tdb_lockall_mark(ctdb_db->ltdb->tdb) != 0) {
+               DEBUG(DEBUG_ERR, ("Failed to mark (all lock) database %s\n",
                                  ctdb_db->db_name));
                return -1;
        }
@@ -220,48 +177,8 @@ static int db_unlock_handler(struct ctdb_db_context *ctdb_db, uint32_t priority,
        return 0;
 }
 
-int ctdb_unlockall_prio(struct ctdb_context *ctdb, uint32_t priority)
-{
-       struct ctdb_db_context *ctdb_db;
-
-       for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
-               if (ctdb_db->priority != priority) {
-                       continue;
-               }
-               DEBUG(DEBUG_INFO, ("unlocking database %s, priority:%u\n",
-                                  ctdb_db->db_name, priority));
-               if (tdb_unlockall(ctdb_db->ltdb->tdb) != 0) {
-                       DEBUG(DEBUG_ERR, ("Failed to unlock database %s\n",
-                                         ctdb_db->db_name));
-                       return -1;
-               }
-       }
-
-       return 0;
-}
-
-static int ctdb_unlockall(struct ctdb_context *ctdb)
-{
-       uint32_t priority;
-
-       for (priority=NUM_DB_PRIORITIES; priority>=0; priority--) {
-               if (ctdb_unlockall_prio(ctdb, priority) != 0) {
-                       return -1;
-               }
-       }
-
-       return 0;
-}
-
-
-/*
- * lock all databases - mark only
- */
 int ctdb_lockall_mark_prio(struct ctdb_context *ctdb, uint32_t priority)
 {
-       struct ctdb_db_context *ctdb_db;
-       int tdb_transaction_write_lock_mark(struct tdb_context *);
-
        /*
         * This function is only used by the main dameon during recovery.
         * At this stage, the databases have already been locked, by a
@@ -274,41 +191,7 @@ int ctdb_lockall_mark_prio(struct ctdb_context *ctdb, uint32_t priority)
                return -1;
        }
 
-       for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
-               if (ctdb_db->priority != priority) {
-                       continue;
-               }
-               if (later_db(ctdb, ctdb_db->db_name)) {
-                       continue;
-               }
-               if (tdb_transaction_write_lock_mark(ctdb_db->ltdb->tdb) != 0) {
-                       return -1;
-               }
-               if (tdb_lockall_mark(ctdb_db->ltdb->tdb) != 0) {
-                       /* FIXME: Shouldn't we unmark here? */
-                       return -1;
-               }
-       }
-
-       /* If priority != 1, later_db check is not required and can return */
-       if (priority != 1) {
-               return 0;
-       }
-
-       for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
-               if (!later_db(ctdb, ctdb_db->db_name)) {
-                       continue;
-               }
-               if (tdb_transaction_write_lock_mark(ctdb_db->ltdb->tdb) != 0) {
-                       return -1;
-               }
-               if (tdb_lockall_mark(ctdb_db->ltdb->tdb) != 0) {
-                       /* FIXME: Shouldn't we unmark here? */
-                       return -1;
-               }
-       }
-
-       return 0;
+       return ctdb_db_iterator(ctdb, priority, db_lock_mark_handler, NULL);
 }
 
 static int ctdb_lockall_mark(struct ctdb_context *ctdb)
@@ -316,7 +199,7 @@ static int ctdb_lockall_mark(struct ctdb_context *ctdb)
        uint32_t priority;
 
        for (priority=1; priority<=NUM_DB_PRIORITIES; priority++) {
-               if (ctdb_lockall_mark_prio(ctdb, priority) != 0) {
+               if (ctdb_db_iterator(ctdb, priority, db_lock_mark_handler, NULL) != 0) {
                        return -1;
                }
        }
@@ -328,13 +211,33 @@ static int ctdb_lockall_mark(struct ctdb_context *ctdb)
 /*
  * lock all databases - unmark only
  */
-int ctdb_lockall_unmark_prio(struct ctdb_context *ctdb, uint32_t priority)
+static int db_lock_unmark_handler(struct ctdb_db_context *ctdb_db, uint32_t priority,
+                                 void *private_data)
 {
-       struct ctdb_db_context *ctdb_db;
        int tdb_transaction_write_lock_unmark(struct tdb_context *);
 
+       DEBUG(DEBUG_INFO, ("unmarking locked database %s, priority:%u\n",
+                          ctdb_db->db_name, priority));
+
+       if (tdb_transaction_write_lock_unmark(ctdb_db->ltdb->tdb) != 0) {
+               DEBUG(DEBUG_ERR, ("Failed to unmark (transaction lock) database %s\n",
+                                 ctdb_db->db_name));
+               return -1;
+       }
+
+       if (tdb_lockall_unmark(ctdb_db->ltdb->tdb) != 0) {
+               DEBUG(DEBUG_ERR, ("Failed to unmark (all lock) database %s\n",
+                                 ctdb_db->db_name));
+               return -1;
+       }
+
+       return 0;
+}
+
+int ctdb_lockall_unmark_prio(struct ctdb_context *ctdb, uint32_t priority)
+{
        /*
-        * This function is only used by the main dameon during recovery.
+        * This function is only used by the main daemon during recovery.
         * At this stage, the databases have already been locked, by a
         * dedicated child process. The freeze_mode variable is used to track
         * whether the actual locks are held by the child process or not.
@@ -345,27 +248,15 @@ int ctdb_lockall_unmark_prio(struct ctdb_context *ctdb, uint32_t priority)
                return -1;
        }
 
-       for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
-               if (ctdb_db->priority != priority) {
-                       continue;
-               }
-               if (tdb_transaction_write_lock_unmark(ctdb_db->ltdb->tdb) != 0) {
-                       return -1;
-               }
-               if (tdb_lockall_unmark(ctdb_db->ltdb->tdb) != 0) {
-                       return -1;
-               }
-       }
-
-       return 0;
+       return ctdb_db_iterator(ctdb, priority, db_lock_unmark_handler, NULL);
 }
 
 static int ctdb_lockall_unmark(struct ctdb_context *ctdb)
 {
        uint32_t priority;
 
-       for (priority=NUM_DB_PRIORITIES; priority>=0; priority--) {
-               if (ctdb_lockall_unmark_prio(ctdb, priority) != 0) {
+       for (priority=NUM_DB_PRIORITIES; priority>0; priority--) {
+               if (ctdb_db_iterator(ctdb, priority, db_lock_unmark_handler, NULL) != 0) {
                        return -1;
                }
        }
@@ -374,68 +265,6 @@ static int ctdb_lockall_unmark(struct ctdb_context *ctdb)
 }
 
 
-/*
- * Lock record / db depending on lock_ctx->type
- * Called from child context.
- */
-static bool ctdb_lock_item(struct lock_context *lock_ctx)
-{
-       bool status = false;
-
-       switch (lock_ctx->type) {
-       case LOCK_RECORD:
-               if (tdb_chainlock(lock_ctx->ctdb_db->ltdb->tdb, lock_ctx->key) == 0) {
-                       status = true;
-               }
-               break;
-
-       case LOCK_DB:
-               if (tdb_lockall(lock_ctx->ctdb_db->ltdb->tdb) == 0) {
-                       status = true;
-               }
-               break;
-
-       case LOCK_ALLDB_PRIO:
-               if (ctdb_lockall_prio(lock_ctx->ctdb, lock_ctx->priority) == 0) {
-                       status = true;
-               }
-               break;
-
-       case LOCK_ALLDB:
-               if (ctdb_lockall(lock_ctx->ctdb) == 0) {
-                       status = true;
-               }
-               break;
-       }
-
-       return status;
-}
-
-
-/*
- * Unlock record / db depending on lock_ctx->type
- */
-void ctdb_unlock_item(struct lock_context *lock_ctx)
-{
-       switch (lock_ctx->type) {
-       case LOCK_RECORD:
-               tdb_chainunlock(lock_ctx->ctdb_db->ltdb->tdb, lock_ctx->key);
-               break;
-
-       case LOCK_DB:
-               tdb_unlockall(lock_ctx->ctdb_db->ltdb->tdb);
-               break;
-
-       case LOCK_ALLDB_PRIO:
-               ctdb_unlockall_prio(lock_ctx->ctdb, lock_ctx->priority);
-               break;
-
-       case LOCK_ALLDB:
-               ctdb_unlockall(lock_ctx->ctdb);
-               break;
-       }
-}
-
 static void ctdb_lock_schedule(struct ctdb_context *ctdb);
 
 /*
@@ -443,19 +272,31 @@ static void ctdb_lock_schedule(struct ctdb_context *ctdb);
  */
 static int ctdb_lock_context_destructor(struct lock_context *lock_ctx)
 {
+       if (lock_ctx->request) {
+               lock_ctx->request->lctx = NULL;
+       }
        if (lock_ctx->child > 0) {
                ctdb_kill(lock_ctx->ctdb, lock_ctx->child, SIGKILL);
-               DLIST_REMOVE(lock_ctx->ctdb->lock_current, lock_ctx);
-               lock_ctx->ctdb->lock_num_current--;
+               if (lock_ctx->type == LOCK_RECORD) {
+                       DLIST_REMOVE(lock_ctx->ctdb_db->lock_current, lock_ctx);
+               } else {
+                       DLIST_REMOVE(lock_ctx->ctdb->lock_current, lock_ctx);
+               }
+               if (lock_ctx->ctdb_db) {
+                       lock_ctx->ctdb_db->lock_num_current--;
+               }
                CTDB_DECREMENT_STAT(lock_ctx->ctdb, locks.num_current);
-               if (lock_ctx->type == LOCK_RECORD || lock_ctx->type == LOCK_DB) {
+               if (lock_ctx->ctdb_db) {
                        CTDB_DECREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_current);
                }
        } else {
-               DLIST_REMOVE(lock_ctx->ctdb->lock_pending, lock_ctx);
-               lock_ctx->ctdb->lock_num_pending--;
+               if (lock_ctx->type == LOCK_RECORD) {
+                       DLIST_REMOVE(lock_ctx->ctdb_db->lock_pending, lock_ctx);
+               } else {
+                       DLIST_REMOVE(lock_ctx->ctdb->lock_pending, lock_ctx);
+               }
                CTDB_DECREMENT_STAT(lock_ctx->ctdb, locks.num_pending);
-               if (lock_ctx->type == LOCK_RECORD || lock_ctx->type == LOCK_DB) {
+               if (lock_ctx->ctdb_db) {
                        CTDB_DECREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_pending);
                }
        }
@@ -471,21 +312,10 @@ static int ctdb_lock_context_destructor(struct lock_context *lock_ctx)
  */
 static int ctdb_lock_request_destructor(struct lock_request *lock_request)
 {
-       DLIST_REMOVE(lock_request->lctx->req_queue, lock_request);
+       TALLOC_FREE(lock_request->lctx);
        return 0;
 }
 
-
-void ctdb_lock_free_request_context(struct lock_request *lock_req)
-{
-       struct lock_context *lock_ctx;
-
-       lock_ctx = lock_req->lctx;
-       talloc_free(lock_req);
-       talloc_free(lock_ctx);
-}
-
-
 /*
  * Process all the callbacks waiting for lock
  *
@@ -493,7 +323,7 @@ void ctdb_lock_free_request_context(struct lock_request *lock_req)
  */
 static void process_callbacks(struct lock_context *lock_ctx, bool locked)
 {
-       struct lock_request *request, *next;
+       struct lock_request *request;
 
        if (lock_ctx->auto_mark && locked) {
                switch (lock_ctx->type) {
@@ -515,19 +345,12 @@ static void process_callbacks(struct lock_context *lock_ctx, bool locked)
                }
        }
 
-       /* Iterate through all callbacks */
-       request = lock_ctx->req_queue;
-       while (request) {
-               if (lock_ctx->auto_mark) {
-                       /* Reset the destructor, so request is not removed from the list */
-                       talloc_set_destructor(request, NULL);
-               }
-
-               /* In case, callback frees the request, store next */
-               next = request->next;
-               request->callback(request->private_data, locked);
-               request = next;
+       request = lock_ctx->request;
+       if (lock_ctx->auto_mark) {
+               /* Reset the destructor, so request is not removed from the list */
+               talloc_set_destructor(request, NULL);
        }
+       request->callback(request->private_data, locked);
 
        if (lock_ctx->auto_mark && locked) {
                switch (lock_ctx->type) {
@@ -553,27 +376,31 @@ static void process_callbacks(struct lock_context *lock_ctx, bool locked)
 
 static int lock_bucket_id(double t)
 {
-       double us = 1.e-6, ms = 1.e-3, s = 1;
+       double ms = 1.e-3, s = 1;
        int id;
 
-       if (t < 1*us) {
+       if (t < 1*ms) {
                id = 0;
-       } else if (t < 10*us) {
+       } else if (t < 10*ms) {
                id = 1;
-       } else if (t < 100*us) {
+       } else if (t < 100*ms) {
                id = 2;
-       } else if (t < 1*ms) {
+       } else if (t < 1*s) {
                id = 3;
-       } else if (t < 10*ms) {
+       } else if (t < 2*s) {
                id = 4;
-       } else if (t < 100*ms) {
+       } else if (t < 4*s) {
                id = 5;
-       } else if (t < 1*s) {
+       } else if (t < 8*s) {
                id = 6;
-       } else if (t < 10*s) {
+       } else if (t < 16*s) {
                id = 7;
-       } else {
+       } else if (t < 32*s) {
                id = 8;
+       } else if (t < 64*s) {
+               id = 9;
+       } else {
+               id = 10;
        }
 
        return id;
@@ -589,7 +416,7 @@ static void ctdb_lock_handler(struct tevent_context *ev,
                            void *private_data)
 {
        struct lock_context *lock_ctx;
-       TALLOC_CTX *tmp_ctx;
+       TALLOC_CTX *tmp_ctx = NULL;
        char c;
        bool locked;
        double t;
@@ -598,9 +425,7 @@ static void ctdb_lock_handler(struct tevent_context *ev,
        lock_ctx = talloc_get_type_abort(private_data, struct lock_context);
 
        /* cancel the timeout event */
-       if (lock_ctx->ttimer) {
-               TALLOC_FREE(lock_ctx->ttimer);
-       }
+       TALLOC_FREE(lock_ctx->ttimer);
 
        t = timeval_elapsed(&lock_ctx->start_time);
        id = lock_bucket_id(t);
@@ -611,22 +436,25 @@ static void ctdb_lock_handler(struct tevent_context *ev,
        }
 
        /* Read the status from the child process */
-       read(lock_ctx->fd[0], &c, 1);
-       locked = (c == 0 ? true : false);
+       if (sys_read(lock_ctx->fd[0], &c, 1) != 1) {
+               locked = false;
+       } else {
+               locked = (c == 0 ? true : false);
+       }
 
        /* Update statistics */
-       CTDB_DECREMENT_STAT(lock_ctx->ctdb, locks.num_pending);
        CTDB_INCREMENT_STAT(lock_ctx->ctdb, locks.num_calls);
        if (lock_ctx->ctdb_db) {
-               CTDB_DECREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_pending);
                CTDB_INCREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_calls);
        }
 
        if (locked) {
-               CTDB_INCREMENT_STAT(lock_ctx->ctdb, locks.num_current);
-               CTDB_INCREMENT_STAT(lock_ctx->ctdb, locks.buckets[id]);
                if (lock_ctx->ctdb_db) {
-                       CTDB_INCREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_current);
+                       CTDB_INCREMENT_STAT(lock_ctx->ctdb, locks.buckets[id]);
+                       CTDB_UPDATE_LATENCY(lock_ctx->ctdb, lock_ctx->ctdb_db,
+                                           lock_type_str[lock_ctx->type], locks.latency,
+                                           lock_ctx->start_time);
+
                        CTDB_UPDATE_DB_LATENCY(lock_ctx->ctdb_db, lock_type_str[lock_ctx->type], locks.latency, t);
                        CTDB_INCREMENT_DB_STAT(lock_ctx->ctdb_db, locks.buckets[id]);
                }
@@ -645,8 +473,6 @@ static void ctdb_lock_handler(struct tevent_context *ev,
 }
 
 
-static void ctdb_lock_find_blocker(struct lock_context *lock_ctx);
-
 /*
  * Callback routine when required locks are not obtained within timeout
  * Called from parent context
@@ -656,15 +482,51 @@ static void ctdb_lock_timeout_handler(struct tevent_context *ev,
                                    struct timeval current_time,
                                    void *private_data)
 {
+       static const char * debug_locks = NULL;
        struct lock_context *lock_ctx;
        struct ctdb_context *ctdb;
+       pid_t pid;
 
        lock_ctx = talloc_get_type_abort(private_data, struct lock_context);
        ctdb = lock_ctx->ctdb;
 
-       /* fire a child process to find the blocking process */
-       if (lock_ctx->block_child == -1) {
-               ctdb_lock_find_blocker(lock_ctx);
+       /* If a node stopped/banned, don't spam the logs */
+       if (ctdb->nodes[ctdb->pnn]->flags & NODE_FLAGS_INACTIVE) {
+               lock_ctx->ttimer = NULL;
+               return;
+       }
+       if (lock_ctx->ctdb_db) {
+               DEBUG(DEBUG_WARNING,
+                     ("Unable to get %s lock on database %s for %.0lf seconds\n",
+                      (lock_ctx->type == LOCK_RECORD ? "RECORD" : "DB"),
+                      lock_ctx->ctdb_db->db_name,
+                      timeval_elapsed(&lock_ctx->start_time)));
+       } else {
+               DEBUG(DEBUG_WARNING,
+                     ("Unable to get ALLDB locks for %.0lf seconds\n",
+                      timeval_elapsed(&lock_ctx->start_time)));
+       }
+
+       /* Fire a child process to find the blocking process. */
+       if (debug_locks == NULL) {
+               debug_locks = getenv("CTDB_DEBUG_LOCKS");
+               if (debug_locks == NULL) {
+                       debug_locks = talloc_asprintf(ctdb,
+                                                     "%s/debug_locks.sh",
+                                                     getenv("CTDB_BASE"));
+               }
+       }
+       if (debug_locks != NULL) {
+               pid = vfork();
+               if (pid == 0) {
+                       execl(debug_locks, debug_locks, NULL);
+                       _exit(0);
+               }
+               ctdb_track_child(ctdb, pid);
+       } else {
+               DEBUG(DEBUG_WARNING,
+                     (__location__
+                      " Unable to setup lock debugging - no memory?\n"));
        }
 
        /* reset the timeout timer */
@@ -677,74 +539,227 @@ static void ctdb_lock_timeout_handler(struct tevent_context *ev,
 }
 
 
-static char *lock_child_log_prefix(struct lock_context *lock_ctx)
+static int db_count_handler(struct ctdb_db_context *ctdb_db, uint32_t priority,
+                           void *private_data)
 {
-       char *prefix;
-       pid_t pid;
+       int *count = (int *)private_data;
+
+       (*count) += 2;
 
-       pid = getpid();
+       return 0;
+}
+
+static int db_flags(struct ctdb_db_context *ctdb_db)
+{
+       int tdb_flags = TDB_DEFAULT;
+
+#ifdef TDB_MUTEX_LOCKING
+       if (!ctdb_db->persistent && ctdb_db->ctdb->tunable.mutex_enabled) {
+               tdb_flags = (TDB_MUTEX_LOCKING | TDB_CLEAR_IF_FIRST);
+       }
+#endif
+       return tdb_flags;
+}
+
+struct db_namelist {
+       const char **names;
+       int n;
+};
+
+static int db_name_handler(struct ctdb_db_context *ctdb_db, uint32_t priority,
+                          void *private_data)
+{
+       struct db_namelist *list = (struct db_namelist *)private_data;
+
+       list->names[list->n] = talloc_strdup(list->names, ctdb_db->db_path);
+       list->names[list->n+1] = talloc_asprintf(list->names, "0x%x",
+                                                db_flags(ctdb_db));
+       list->n += 2;
+
+       return 0;
+}
+
+static bool lock_helper_args(TALLOC_CTX *mem_ctx,
+                            struct lock_context *lock_ctx, int fd,
+                            int *argc, const char ***argv)
+{
+       struct ctdb_context *ctdb = lock_ctx->ctdb;
+       const char **args = NULL;
+       int nargs, i;
+       int priority;
+       struct db_namelist list;
 
        switch (lock_ctx->type) {
        case LOCK_RECORD:
-               prefix = talloc_asprintf(NULL, "lockR(%d): ", pid);
+               nargs = 6;
                break;
 
        case LOCK_DB:
-               prefix = talloc_asprintf(NULL, "lockD(%d): ", pid);
+               nargs = 5;
                break;
 
        case LOCK_ALLDB_PRIO:
-               prefix = talloc_asprintf(NULL, "lockP(%d): ", pid);
+               nargs = 3;
+               ctdb_db_iterator(ctdb, lock_ctx->priority, db_count_handler, &nargs);
                break;
 
        case LOCK_ALLDB:
-               prefix = talloc_asprintf(NULL, "lockA(%d): ", pid);
+               nargs = 3;
+               for (priority=1; priority<NUM_DB_PRIORITIES; priority++) {
+                       ctdb_db_iterator(ctdb, priority, db_count_handler, &nargs);
+               }
                break;
        }
 
-       return prefix;
-}
+       /* Add extra argument for null termination */
+       nargs++;
+
+       args = talloc_array(mem_ctx, const char *, nargs);
+       if (args == NULL) {
+               return false;
+       }
+
+       args[0] = talloc_asprintf(args, "%d", getpid());
+       args[1] = talloc_asprintf(args, "%d", fd);
+
+       switch (lock_ctx->type) {
+       case LOCK_RECORD:
+               args[2] = talloc_strdup(args, "RECORD");
+               args[3] = talloc_strdup(args, lock_ctx->ctdb_db->db_path);
+               args[4] = talloc_asprintf(args, "0x%x",
+                                         db_flags(lock_ctx->ctdb_db));
+               if (lock_ctx->key.dsize == 0) {
+                       args[5] = talloc_strdup(args, "NULL");
+               } else {
+                       args[5] = hex_encode_talloc(args, lock_ctx->key.dptr, lock_ctx->key.dsize);
+               }
+               break;
+
+       case LOCK_DB:
+               args[2] = talloc_strdup(args, "DB");
+               args[3] = talloc_strdup(args, lock_ctx->ctdb_db->db_path);
+               args[4] = talloc_asprintf(args, "0x%x",
+                                         db_flags(lock_ctx->ctdb_db));
+               break;
+
+       case LOCK_ALLDB_PRIO:
+               args[2] = talloc_strdup(args, "DB");
+               list.names = args;
+               list.n = 3;
+               ctdb_db_iterator(ctdb, lock_ctx->priority, db_name_handler, &list);
+               break;
+
+       case LOCK_ALLDB:
+               args[2] = talloc_strdup(args, "DB");
+               list.names = args;
+               list.n = 3;
+               for (priority=1; priority<NUM_DB_PRIORITIES; priority++) {
+                       ctdb_db_iterator(ctdb, priority, db_name_handler, &list);
+               }
+               break;
+       }
+
+       /* Make sure last argument is NULL */
+       args[nargs-1] = NULL;
 
+       for (i=0; i<nargs-1; i++) {
+               if (args[i] == NULL) {
+                       talloc_free(args);
+                       return false;
+               }
+       }
+
+       *argc = nargs;
+       *argv = args;
+       return true;
+}
 
 /*
- * Schedule a new lock child process
- * Set up callback handler and timeout handler
+ * Find a lock request that can be scheduled
  */
-static void ctdb_lock_schedule(struct ctdb_context *ctdb)
+static struct lock_context *ctdb_find_lock_context(struct ctdb_context *ctdb)
 {
        struct lock_context *lock_ctx, *next_ctx;
-       int ret;
-       pid_t parent;
+       struct ctdb_db_context *ctdb_db;
 
-       if (ctdb->lock_num_current >= MAX_LOCK_PROCESSES_PER_DB) {
-               return;
-       }
+       /* First check if there are database lock requests */
 
-       if (ctdb->lock_pending == NULL) {
-               return;
+       for (lock_ctx = ctdb->lock_pending; lock_ctx != NULL;
+            lock_ctx = next_ctx) {
+
+               if (lock_ctx->request != NULL) {
+                       /* Found a lock context with a request */
+                       return lock_ctx;
+               }
+
+               next_ctx = lock_ctx->next;
+
+               DEBUG(DEBUG_INFO, ("Removing lock context without lock "
+                                  "request\n"));
+               DLIST_REMOVE(ctdb->lock_pending, lock_ctx);
+               CTDB_DECREMENT_STAT(ctdb, locks.num_pending);
+               if (lock_ctx->ctdb_db) {
+                       CTDB_DECREMENT_DB_STAT(lock_ctx->ctdb_db,
+                                              locks.num_pending);
+               }
+               talloc_free(lock_ctx);
        }
 
-       /* Find a lock context with requests */
-       lock_ctx = ctdb->lock_pending;
-       while (lock_ctx != NULL) {
-               if (! lock_ctx->req_queue) {
+       /* Next check database queues */
+       for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
+               if (ctdb_db->lock_num_current ==
+                   ctdb->tunable.lock_processes_per_db) {
+                       continue;
+               }
+
+               for (lock_ctx = ctdb_db->lock_pending; lock_ctx != NULL;
+                    lock_ctx = next_ctx) {
+
                        next_ctx = lock_ctx->next;
-                       DEBUG(DEBUG_INFO, ("Removing lock context without lock requests\n"));
-                       DLIST_REMOVE(ctdb->lock_pending, lock_ctx);
-                       ctdb->lock_num_pending--;
-                       CTDB_DECREMENT_STAT(ctdb, locks.num_pending);
-                       if (lock_ctx->ctdb_db) {
-                               CTDB_DECREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_pending);
+
+                       if (lock_ctx->request != NULL) {
+                               return lock_ctx;
                        }
+
+                       DEBUG(DEBUG_INFO, ("Removing lock context without "
+                                          "lock request\n"));
+                       DLIST_REMOVE(ctdb_db->lock_pending, lock_ctx);
+                       CTDB_DECREMENT_STAT(ctdb, locks.num_pending);
+                       CTDB_DECREMENT_DB_STAT(ctdb_db, locks.num_pending);
                        talloc_free(lock_ctx);
-                       lock_ctx = next_ctx;
-                       continue;
+               }
+       }
+
+       return NULL;
+}
+
+/*
+ * Schedule a new lock child process
+ * Set up callback handler and timeout handler
+ */
+static void ctdb_lock_schedule(struct ctdb_context *ctdb)
+{
+       struct lock_context *lock_ctx;
+       int ret, argc;
+       TALLOC_CTX *tmp_ctx;
+       const char *helper = CTDB_HELPER_BINDIR "/ctdb_lock_helper";
+       static const char *prog = NULL;
+       const char **args;
+
+       if (prog == NULL) {
+               const char *t;
+
+               t = getenv("CTDB_LOCK_HELPER");
+               if (t != NULL) {
+                       prog = talloc_strdup(ctdb, t);
                } else {
-                       /* Found a lock context with lock requests */
-                       break;
+                       prog = talloc_strdup(ctdb, helper);
                }
+               CTDB_NO_MEMORY_VOID(ctdb, prog);
        }
 
+       /* Find a lock context with requests */
+       lock_ctx = ctdb_find_lock_context(ctdb);
        if (lock_ctx == NULL) {
                return;
        }
@@ -756,41 +771,44 @@ static void ctdb_lock_schedule(struct ctdb_context *ctdb)
                return;
        }
 
-       parent = getpid();
-       lock_ctx->child = ctdb_fork(ctdb);
+       set_close_on_exec(lock_ctx->fd[0]);
 
-       if (lock_ctx->child == (pid_t)-1) {
-               DEBUG(DEBUG_ERR, ("Failed to create a child in ctdb_lock_schedule\n"));
+       /* Create data for child process */
+       tmp_ctx = talloc_new(lock_ctx);
+       if (tmp_ctx == NULL) {
+               DEBUG(DEBUG_ERR, ("Failed to allocate memory for helper args\n"));
                close(lock_ctx->fd[0]);
                close(lock_ctx->fd[1]);
                return;
        }
 
-       /* Child process */
-       if (lock_ctx->child == 0) {
-               char c;
+       /* Create arguments for lock helper */
+       if (!lock_helper_args(tmp_ctx, lock_ctx, lock_ctx->fd[1],
+                             &argc, &args)) {
+               DEBUG(DEBUG_ERR, ("Failed to create lock helper args\n"));
                close(lock_ctx->fd[0]);
-               debug_extra = lock_child_log_prefix(lock_ctx);
-               if (ctdb_lock_item(lock_ctx)) {
-                       c = 0;
-               } else {
-                       c = 1;
-               }
-               write(lock_ctx->fd[1], &c, 1);
+               close(lock_ctx->fd[1]);
+               talloc_free(tmp_ctx);
+               return;
+       }
 
-               /* Hang around, but if parent dies, terminate */
-               while (kill(parent, 0) == 0 || errno != ESRCH) {
-                       sleep(5);
-               }
-               _exit(0);
+       if (!ctdb_vfork_with_logging(lock_ctx, ctdb, "lock_helper",
+                                    prog, argc, (const char **)args,
+                                    NULL, NULL, &lock_ctx->child)) {
+               DEBUG(DEBUG_ERR, ("Failed to create a child in ctdb_lock_schedule\n"));
+               close(lock_ctx->fd[0]);
+               close(lock_ctx->fd[1]);
+               talloc_free(tmp_ctx);
+               return;
        }
 
        /* Parent process */
        close(lock_ctx->fd[1]);
-       set_close_on_exec(lock_ctx->fd[0]);
 
        talloc_set_destructor(lock_ctx, ctdb_lock_context_destructor);
 
+       talloc_free(tmp_ctx);
+
        /* Set up timeout handler */
        lock_ctx->ttimer = tevent_add_timer(ctdb->ev,
                                            lock_ctx,
@@ -823,70 +841,28 @@ static void ctdb_lock_schedule(struct ctdb_context *ctdb)
        tevent_fd_set_auto_close(lock_ctx->tfd);
 
        /* Move the context from pending to current */
-       DLIST_REMOVE(ctdb->lock_pending, lock_ctx);
-       ctdb->lock_num_pending--;
-       DLIST_ADD_END(ctdb->lock_current, lock_ctx, NULL);
-       ctdb->lock_num_current++;
-}
-
-
-/*
- * Find the lock context of a given type
- */
-static struct lock_context *find_lock_context(struct lock_context *lock_list,
-                                             struct ctdb_db_context *ctdb_db,
-                                             TDB_DATA key,
-                                             uint32_t priority,
-                                             enum lock_type type)
-{
-       struct lock_context *lock_ctx;
-
-       /* Search active locks */
-       for (lock_ctx=lock_list; lock_ctx; lock_ctx=lock_ctx->next) {
-               if (lock_ctx->type != type) {
-                       continue;
-               }
-
-               switch (lock_ctx->type) {
-               case LOCK_RECORD:
-                       if (ctdb_db == lock_ctx->ctdb_db &&
-                           key.dsize == lock_ctx->key.dsize &&
-                           memcmp(key.dptr, lock_ctx->key.dptr, key.dsize) == 0) {
-                               goto done;
-                       }
-                       break;
-
-               case LOCK_DB:
-                       if (ctdb_db == lock_ctx->ctdb_db) {
-                               goto done;
-                       }
-                       break;
-
-               case LOCK_ALLDB_PRIO:
-                       if (priority == lock_ctx->priority) {
-                               goto done;
-                       }
-                       break;
-
-               case LOCK_ALLDB:
-                       goto done;
-                       break;
-               }
+       if (lock_ctx->type == LOCK_RECORD) {
+               DLIST_REMOVE(lock_ctx->ctdb_db->lock_pending, lock_ctx);
+               DLIST_ADD_END(lock_ctx->ctdb_db->lock_current, lock_ctx, NULL);
+       } else {
+               DLIST_REMOVE(ctdb->lock_pending, lock_ctx);
+               DLIST_ADD_END(ctdb->lock_current, lock_ctx, NULL);
+       }
+       CTDB_DECREMENT_STAT(lock_ctx->ctdb, locks.num_pending);
+       CTDB_INCREMENT_STAT(lock_ctx->ctdb, locks.num_current);
+       if (lock_ctx->ctdb_db) {
+               lock_ctx->ctdb_db->lock_num_current++;
+               CTDB_DECREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_pending);
+               CTDB_INCREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_current);
        }
-
-       /* Did not find the lock context we are searching for */
-       lock_ctx = NULL;
-
-done:
-       return lock_ctx;
-
 }
 
 
 /*
  * Lock record / db depending on type
  */
-static struct lock_request *ctdb_lock_internal(struct ctdb_context *ctdb,
+static struct lock_request *ctdb_lock_internal(TALLOC_CTX *mem_ctx,
+                                              struct ctdb_context *ctdb,
                                               struct ctdb_db_context *ctdb_db,
                                               TDB_DATA key,
                                               uint32_t priority,
@@ -895,7 +871,7 @@ static struct lock_request *ctdb_lock_internal(struct ctdb_context *ctdb,
                                               enum lock_type type,
                                               bool auto_mark)
 {
-       struct lock_context *lock_ctx;
+       struct lock_context *lock_ctx = NULL;
        struct lock_request *request;
 
        if (callback == NULL) {
@@ -903,54 +879,59 @@ static struct lock_request *ctdb_lock_internal(struct ctdb_context *ctdb,
                return NULL;
        }
 
-       /* get a context for this key - search only the pending contexts,
-        * current contexts might in the middle of processing callbacks */
-       lock_ctx = find_lock_context(ctdb->lock_pending, ctdb_db, key, priority, type);
-
-       /* No existing context, create one */
+       lock_ctx = talloc_zero(ctdb, struct lock_context);
        if (lock_ctx == NULL) {
-               lock_ctx = talloc_zero(ctdb, struct lock_context);
-               if (lock_ctx == NULL) {
-                       DEBUG(DEBUG_ERR, ("Failed to create a new lock context\n"));
-                       return NULL;
-               }
+               DEBUG(DEBUG_ERR, ("Failed to create a new lock context\n"));
+               return NULL;
+       }
 
-               lock_ctx->type = type;
-               lock_ctx->ctdb = ctdb;
-               lock_ctx->ctdb_db = ctdb_db;
-               lock_ctx->key.dsize = key.dsize;
-               if (key.dsize > 0) {
-                       lock_ctx->key.dptr = talloc_memdup(lock_ctx, key.dptr, key.dsize);
-               } else {
-                       lock_ctx->key.dptr = NULL;
+       if ((request = talloc_zero(mem_ctx, struct lock_request)) == NULL) {
+               talloc_free(lock_ctx);
+               return NULL;
+       }
+
+       lock_ctx->type = type;
+       lock_ctx->ctdb = ctdb;
+       lock_ctx->ctdb_db = ctdb_db;
+       lock_ctx->key.dsize = key.dsize;
+       if (key.dsize > 0) {
+               lock_ctx->key.dptr = talloc_memdup(lock_ctx, key.dptr, key.dsize);
+               if (lock_ctx->key.dptr == NULL) {
+                       DEBUG(DEBUG_ERR, (__location__ "Memory allocation error\n"));
+                       talloc_free(lock_ctx);
+                       return NULL;
                }
-               lock_ctx->priority = priority;
-               lock_ctx->auto_mark = auto_mark;
+               lock_ctx->key_hash = ctdb_hash(&key);
+       } else {
+               lock_ctx->key.dptr = NULL;
+       }
+       lock_ctx->priority = priority;
+       lock_ctx->auto_mark = auto_mark;
 
-               lock_ctx->child = -1;
-               lock_ctx->block_child = -1;
+       lock_ctx->request = request;
+       lock_ctx->child = -1;
 
+       /* Non-record locks are required by recovery and should be scheduled
+        * immediately, so keep them at the head of the pending queue.
+        */
+       if (lock_ctx->type == LOCK_RECORD) {
+               DLIST_ADD_END(ctdb_db->lock_pending, lock_ctx, NULL);
+       } else {
                DLIST_ADD_END(ctdb->lock_pending, lock_ctx, NULL);
-               ctdb->lock_num_pending++;
-               CTDB_INCREMENT_STAT(ctdb, locks.num_pending);
-               if (ctdb_db) {
-                       CTDB_INCREMENT_DB_STAT(ctdb_db, locks.num_pending);
-               }
-
-               /* Start the timer when we activate the context */
-               lock_ctx->start_time = timeval_current();
        }
-
-       if ((request = talloc_zero(lock_ctx, struct lock_request)) == NULL) {
-               return NULL;
+       CTDB_INCREMENT_STAT(ctdb, locks.num_pending);
+       if (ctdb_db) {
+               CTDB_INCREMENT_DB_STAT(ctdb_db, locks.num_pending);
        }
 
+       /* Start the timer when we activate the context */
+       lock_ctx->start_time = timeval_current();
+
        request->lctx = lock_ctx;
        request->callback = callback;
        request->private_data = private_data;
 
        talloc_set_destructor(request, ctdb_lock_request_destructor);
-       DLIST_ADD_END(lock_ctx->req_queue, request, NULL);
 
        ctdb_lock_schedule(ctdb);
 
@@ -961,13 +942,15 @@ static struct lock_request *ctdb_lock_internal(struct ctdb_context *ctdb,
 /*
  * obtain a lock on a record in a database
  */
-struct lock_request *ctdb_lock_record(struct ctdb_db_context *ctdb_db,
+struct lock_request *ctdb_lock_record(TALLOC_CTX *mem_ctx,
+                                     struct ctdb_db_context *ctdb_db,
                                      TDB_DATA key,
                                      bool auto_mark,
                                      void (*callback)(void *, bool),
                                      void *private_data)
 {
-       return ctdb_lock_internal(ctdb_db->ctdb,
+       return ctdb_lock_internal(mem_ctx,
+                                 ctdb_db->ctdb,
                                  ctdb_db,
                                  key,
                                  0,
@@ -981,12 +964,14 @@ struct lock_request *ctdb_lock_record(struct ctdb_db_context *ctdb_db,
 /*
  * obtain a lock on a database
  */
-struct lock_request *ctdb_lock_db(struct ctdb_db_context *ctdb_db,
+struct lock_request *ctdb_lock_db(TALLOC_CTX *mem_ctx,
+                                 struct ctdb_db_context *ctdb_db,
                                  bool auto_mark,
                                  void (*callback)(void *, bool),
                                  void *private_data)
 {
-       return ctdb_lock_internal(ctdb_db->ctdb,
+       return ctdb_lock_internal(mem_ctx,
+                                 ctdb_db->ctdb,
                                  ctdb_db,
                                  tdb_null,
                                  0,
@@ -1000,18 +985,20 @@ struct lock_request *ctdb_lock_db(struct ctdb_db_context *ctdb_db,
 /*
  * obtain locks on all databases of specified priority
  */
-struct lock_request *ctdb_lock_alldb_prio(struct ctdb_context *ctdb,
+struct lock_request *ctdb_lock_alldb_prio(TALLOC_CTX *mem_ctx,
+                                         struct ctdb_context *ctdb,
                                          uint32_t priority,
                                          bool auto_mark,
                                          void (*callback)(void *, bool),
                                          void *private_data)
 {
-       if (priority < 0 || priority > NUM_DB_PRIORITIES) {
+       if (priority < 1 || priority > NUM_DB_PRIORITIES) {
                DEBUG(DEBUG_ERR, ("Invalid db priority: %u\n", priority));
                return NULL;
        }
 
-       return ctdb_lock_internal(ctdb,
+       return ctdb_lock_internal(mem_ctx,
+                                 ctdb,
                                  NULL,
                                  tdb_null,
                                  priority,
@@ -1025,12 +1012,14 @@ struct lock_request *ctdb_lock_alldb_prio(struct ctdb_context *ctdb,
 /*
  * obtain locks on all databases
  */
-struct lock_request *ctdb_lock_alldb(struct ctdb_context *ctdb,
+struct lock_request *ctdb_lock_alldb(TALLOC_CTX *mem_ctx,
+                                    struct ctdb_context *ctdb,
                                     bool auto_mark,
                                     void (*callback)(void *, bool),
                                     void *private_data)
 {
-       return ctdb_lock_internal(ctdb,
+       return ctdb_lock_internal(mem_ctx,
+                                 ctdb,
                                  NULL,
                                  tdb_null,
                                  0,
@@ -1040,151 +1029,3 @@ struct lock_request *ctdb_lock_alldb(struct ctdb_context *ctdb,
                                  auto_mark);
 }
 
-/*
- * Callback routine to read the PID of blocking process from the child and log
- *
- */
-void ctdb_lock_blocked_handler(struct tevent_context *ev,
-                               struct tevent_fd *tfd,
-                               uint16_t flags,
-                               void *private_data)
-{
-       struct lock_context *lock_ctx;
-       pid_t blocker_pid = -1;
-       char *process_name = NULL;
-       const char *db_name = NULL;
-       ino_t inode;
-       struct ctdb_db_context *ctdb_db;
-       int fd;
-       struct stat stat_buf;
-
-       lock_ctx = talloc_get_type_abort(private_data, struct lock_context);
-
-       if (read(lock_ctx->block_fd[0], &blocker_pid, sizeof(blocker_pid)) != sizeof(blocker_pid)) {
-               DEBUG(DEBUG_ERR, ("Error reading blocker process pid from child\n"));
-               goto failed;
-       }
-       if (read(lock_ctx->block_fd[0], &inode, sizeof(inode)) != sizeof(inode)) {
-               DEBUG(DEBUG_ERR, ("Error reading blocked inode from child\n"));
-               goto failed;
-       }
-
-       if (blocker_pid < 0) {
-               goto failed;
-       }
-
-       process_name = ctdb_get_process_name(blocker_pid);
-
-       if (lock_ctx->type == LOCK_RECORD || lock_ctx->type == LOCK_DB) {
-               db_name = lock_ctx->ctdb_db->ltdb->name;
-       } else {
-               for (ctdb_db = lock_ctx->ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
-                       fd = tdb_fd(ctdb_db->ltdb->tdb);
-                       if (fstat(fd, &stat_buf) == 0) {
-                               if (stat_buf.st_ino == inode) {
-                                       db_name = ctdb_db->ltdb->name;
-                                       break;
-                               }
-                       }
-               }
-       }
-
-       if (db_name) {
-               DEBUG(DEBUG_WARNING,
-                     ("Process (pid=%d) blocked in locking\n", lock_ctx->child));
-               DEBUG(DEBUG_WARNING,
-                     ("Process %s (pid=%d) locked database %s (inode %lu) for %.0lf seconds\n",
-                      (process_name ? process_name : "unknown"),
-                      blocker_pid, db_name, (unsigned long)inode,
-                      timeval_elapsed(&lock_ctx->start_time)));
-       } else {
-               DEBUG(DEBUG_WARNING,
-                     ("Process %s (pid=%d) locked database (inode %lu) for %.0lf seconds\n",
-                      (process_name ? process_name : "unknown"),
-                      blocker_pid, (unsigned long)inode,
-                      timeval_elapsed(&lock_ctx->start_time)));
-       }
-
-       /*
-        * If ctdb is blocked by smbd for deadlock_interval, detect it as a deadlock
-        * and kill smbd process.
-        */
-       if (lock_ctx->ctdb->tunable.deadlock_timeout > 0 &&
-           timeval_elapsed(&lock_ctx->start_time) > lock_ctx->ctdb->tunable.deadlock_timeout &&
-           process_name && strstr(process_name, "smbd")) {
-               DEBUG(DEBUG_WARNING,
-                     ("Deadlock detected. Killing smbd process (pid=%d)", blocker_pid));
-               kill(blocker_pid, SIGKILL);
-       }
-
-       free(process_name);
-
-failed:
-       if (lock_ctx->block_child > 0) {
-               ctdb_kill(lock_ctx->ctdb, lock_ctx->block_child, SIGKILL);
-       }
-       lock_ctx->block_child = -1;
-       talloc_free(tfd);
-}
-
-
-/*
- * Find processes that holds lock we are interested in
- */
-void ctdb_lock_find_blocker(struct lock_context *lock_ctx)
-{
-       struct tevent_fd *tfd;
-       pid_t parent;
-
-       if (pipe(lock_ctx->block_fd) < 0) {
-               return;
-       }
-
-       parent = getpid();
-
-       lock_ctx->block_child = ctdb_fork(lock_ctx->ctdb);
-       if (lock_ctx->block_child == -1) {
-               close(lock_ctx->block_fd[0]);
-               close(lock_ctx->block_fd[1]);
-               return;
-       }
-
-       /* Child process */
-       if (lock_ctx->block_child == 0) {
-               struct ctdb_lock_info reqlock;
-               pid_t blocker_pid = -1;
-               bool status;
-
-               close(lock_ctx->block_fd[0]);
-               if (ctdb_get_lock_info(lock_ctx->child, &reqlock)) {
-                       status = ctdb_get_blocker_pid(&reqlock, &blocker_pid);
-                       if (!status) {
-                               /* Could not find blocker pid */
-                               blocker_pid = -2;
-                       }
-               }
-               write(lock_ctx->block_fd[1], &blocker_pid, sizeof(blocker_pid));
-               write(lock_ctx->block_fd[1], &reqlock.inode, sizeof(reqlock.inode));
-
-               /* Hang around till parent dies */
-               while (kill(parent, 0) == 0 || errno != ESRCH) {
-                       sleep(5);
-               }
-               _exit(0);
-       }
-
-       /* Parent process */
-       close(lock_ctx->block_fd[1]);
-       set_close_on_exec(lock_ctx->block_fd[0]);
-
-       tfd = tevent_add_fd(lock_ctx->ctdb->ev,
-                               lock_ctx,
-                               lock_ctx->block_fd[0],
-                               EVENT_FD_READ,
-                               ctdb_lock_blocked_handler,
-                               (void *)lock_ctx);
-       if (tfd == NULL) {
-               ctdb_kill(lock_ctx->ctdb, lock_ctx->block_child, SIGKILL);
-               close(lock_ctx->block_fd[0]);
-       }
-}