ctdb-daemon: Use correct tdb flags when enabling robust mutex support
[obnox/samba/samba-obnox.git] / ctdb / server / ctdb_lock.c
index 174779d66cc26c099cc7a1325a6cd574e5d41ff2..7959d40fbfec98e12c78348b74ae64d5a8ff12a2 100644 (file)
@@ -22,7 +22,7 @@
 #include "include/ctdb_protocol.h"
 #include "tevent.h"
 #include "tdb.h"
-#include "db_wrap.h"
+#include "lib/tdb_wrap/tdb_wrap.h"
 #include "system/filesys.h"
 #include "lib/util/dlinklist.h"
 
@@ -33,7 +33,7 @@
  * 2. Once the locks are obtained, signal parent process via fd.
  * 3. Invoke registered callback routine with locking status.
  * 4. If the child process cannot get locks within certain time,
- *    diagnose using /proc/locks and log warning message
+ *    execute an external script to debug.
  *
  * ctdb_lock_record()      - get a lock on a record
  * ctdb_lock_db()          - get a lock on a DB
@@ -272,9 +272,16 @@ static void ctdb_lock_schedule(struct ctdb_context *ctdb);
  */
 static int ctdb_lock_context_destructor(struct lock_context *lock_ctx)
 {
+       if (lock_ctx->request) {
+               lock_ctx->request->lctx = NULL;
+       }
        if (lock_ctx->child > 0) {
                ctdb_kill(lock_ctx->ctdb, lock_ctx->child, SIGKILL);
-               DLIST_REMOVE(lock_ctx->ctdb->lock_current, lock_ctx);
+               if (lock_ctx->type == LOCK_RECORD) {
+                       DLIST_REMOVE(lock_ctx->ctdb_db->lock_current, lock_ctx);
+               } else {
+                       DLIST_REMOVE(lock_ctx->ctdb->lock_current, lock_ctx);
+               }
                if (lock_ctx->ctdb_db) {
                        lock_ctx->ctdb_db->lock_num_current--;
                }
@@ -283,7 +290,11 @@ static int ctdb_lock_context_destructor(struct lock_context *lock_ctx)
                        CTDB_DECREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_current);
                }
        } else {
-               DLIST_REMOVE(lock_ctx->ctdb->lock_pending, lock_ctx);
+               if (lock_ctx->type == LOCK_RECORD) {
+                       DLIST_REMOVE(lock_ctx->ctdb_db->lock_pending, lock_ctx);
+               } else {
+                       DLIST_REMOVE(lock_ctx->ctdb->lock_pending, lock_ctx);
+               }
                CTDB_DECREMENT_STAT(lock_ctx->ctdb, locks.num_pending);
                if (lock_ctx->ctdb_db) {
                        CTDB_DECREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_pending);
@@ -301,20 +312,10 @@ static int ctdb_lock_context_destructor(struct lock_context *lock_ctx)
  */
 static int ctdb_lock_request_destructor(struct lock_request *lock_request)
 {
-       lock_request->lctx->request = NULL;
+       TALLOC_FREE(lock_request->lctx);
        return 0;
 }
 
-void ctdb_lock_free_request_context(struct lock_request *lock_req)
-{
-       struct lock_context *lock_ctx;
-
-       lock_ctx = lock_req->lctx;
-       talloc_free(lock_req);
-       talloc_free(lock_ctx);
-}
-
-
 /*
  * Process all the callbacks waiting for lock
  *
@@ -424,9 +425,7 @@ static void ctdb_lock_handler(struct tevent_context *ev,
        lock_ctx = talloc_get_type_abort(private_data, struct lock_context);
 
        /* cancel the timeout event */
-       if (lock_ctx->ttimer) {
-               TALLOC_FREE(lock_ctx->ttimer);
-       }
+       TALLOC_FREE(lock_ctx->ttimer);
 
        t = timeval_elapsed(&lock_ctx->start_time);
        id = lock_bucket_id(t);
@@ -437,7 +436,7 @@ static void ctdb_lock_handler(struct tevent_context *ev,
        }
 
        /* Read the status from the child process */
-       if (read(lock_ctx->fd[0], &c, 1) != 1) {
+       if (sys_read(lock_ctx->fd[0], &c, 1) != 1) {
                locked = false;
        } else {
                locked = (c == 0 ? true : false);
@@ -491,6 +490,11 @@ static void ctdb_lock_timeout_handler(struct tevent_context *ev,
        lock_ctx = talloc_get_type_abort(private_data, struct lock_context);
        ctdb = lock_ctx->ctdb;
 
+       /* If this node is stopped/banned, don't spam the logs */
+       if (ctdb->nodes[ctdb->pnn]->flags & NODE_FLAGS_INACTIVE) {
+               lock_ctx->ttimer = NULL;
+               return;
+       }
        if (lock_ctx->ctdb_db) {
                DEBUG(DEBUG_WARNING,
                      ("Unable to get %s lock on database %s for %.0lf seconds\n",
@@ -540,13 +544,25 @@ static int db_count_handler(struct ctdb_db_context *ctdb_db, uint32_t priority,
 {
        int *count = (int *)private_data;
 
-       (*count)++;
+       (*count) += 2;
 
        return 0;
 }
 
+static int db_flags(struct ctdb_db_context *ctdb_db)
+{
+       int tdb_flags = TDB_DEFAULT;    /* returned unless the mutex case below applies */
+       /* Choose the tdb flags for this database; callers format the result as "0x%x" */
+#ifdef TDB_MUTEX_LOCKING
+       if (!ctdb_db->persistent && ctdb_db->ctdb->tunable.mutex_enabled) {
+               tdb_flags = (TDB_MUTEX_LOCKING | TDB_CLEAR_IF_FIRST); /* non-persistent DB, tunable on */
+       }
+#endif
+       return tdb_flags;
+}
+
 struct db_namelist {
-       char **names;
+       const char **names;
        int n;
 };
 
@@ -556,15 +572,19 @@ static int db_name_handler(struct ctdb_db_context *ctdb_db, uint32_t priority,
        struct db_namelist *list = (struct db_namelist *)private_data;
 
        list->names[list->n] = talloc_strdup(list->names, ctdb_db->db_path);
-       list->n++;
+       list->names[list->n+1] = talloc_asprintf(list->names, "0x%x",
+                                                db_flags(ctdb_db));
+       list->n += 2;
 
        return 0;
 }
 
-static char **lock_helper_args(TALLOC_CTX *mem_ctx, struct lock_context *lock_ctx, int fd)
+static bool lock_helper_args(TALLOC_CTX *mem_ctx,
+                            struct lock_context *lock_ctx, int fd,
+                            int *argc, const char ***argv)
 {
        struct ctdb_context *ctdb = lock_ctx->ctdb;
-       char **args = NULL;
+       const char **args = NULL;
        int nargs, i;
        int priority;
        struct db_namelist list;
@@ -579,12 +599,12 @@ static char **lock_helper_args(TALLOC_CTX *mem_ctx, struct lock_context *lock_ct
                break;
 
        case LOCK_ALLDB_PRIO:
-               nargs = 4;
+               nargs = 3;
                ctdb_db_iterator(ctdb, lock_ctx->priority, db_count_handler, &nargs);
                break;
 
        case LOCK_ALLDB:
-               nargs = 4;
+               nargs = 3;
                for (priority=1; priority<NUM_DB_PRIORITIES; priority++) {
                        ctdb_db_iterator(ctdb, priority, db_count_handler, &nargs);
                }
@@ -594,19 +614,20 @@ static char **lock_helper_args(TALLOC_CTX *mem_ctx, struct lock_context *lock_ct
        /* Add extra argument for null termination */
        nargs++;
 
-       args = talloc_array(mem_ctx, char *, nargs);
+       args = talloc_array(mem_ctx, const char *, nargs);
        if (args == NULL) {
-               return NULL;
+               return false;
        }
 
-       args[0] = talloc_strdup(args, "ctdb_lock_helper");
-       args[1] = talloc_asprintf(args, "%d", getpid());
-       args[2] = talloc_asprintf(args, "%d", fd);
+       args[0] = talloc_asprintf(args, "%d", getpid());
+       args[1] = talloc_asprintf(args, "%d", fd);
 
        switch (lock_ctx->type) {
        case LOCK_RECORD:
-               args[3] = talloc_strdup(args, "RECORD");
-               args[4] = talloc_strdup(args, lock_ctx->ctdb_db->db_path);
+               args[2] = talloc_strdup(args, "RECORD");
+               args[3] = talloc_strdup(args, lock_ctx->ctdb_db->db_path);
+               args[4] = talloc_asprintf(args, "0x%x",
+                                         db_flags(lock_ctx->ctdb_db));
                if (lock_ctx->key.dsize == 0) {
                        args[5] = talloc_strdup(args, "NULL");
                } else {
@@ -615,21 +636,23 @@ static char **lock_helper_args(TALLOC_CTX *mem_ctx, struct lock_context *lock_ct
                break;
 
        case LOCK_DB:
-               args[3] = talloc_strdup(args, "DB");
-               args[4] = talloc_strdup(args, lock_ctx->ctdb_db->db_path);
+               args[2] = talloc_strdup(args, "DB");
+               args[3] = talloc_strdup(args, lock_ctx->ctdb_db->db_path);
+               args[4] = talloc_asprintf(args, "0x%x",
+                                         db_flags(lock_ctx->ctdb_db));
                break;
 
        case LOCK_ALLDB_PRIO:
-               args[3] = talloc_strdup(args, "DB");
+               args[2] = talloc_strdup(args, "DB");
                list.names = args;
-               list.n = 4;
+               list.n = 3;
                ctdb_db_iterator(ctdb, lock_ctx->priority, db_name_handler, &list);
                break;
 
        case LOCK_ALLDB:
-               args[3] = talloc_strdup(args, "DB");
+               args[2] = talloc_strdup(args, "DB");
                list.names = args;
-               list.n = 4;
+               list.n = 3;
                for (priority=1; priority<NUM_DB_PRIORITIES; priority++) {
                        ctdb_db_iterator(ctdb, priority, db_name_handler, &list);
                }
@@ -642,13 +665,73 @@ static char **lock_helper_args(TALLOC_CTX *mem_ctx, struct lock_context *lock_ct
        for (i=0; i<nargs-1; i++) {
                if (args[i] == NULL) {
                        talloc_free(args);
-                       return NULL;
+                       return false;
                }
        }
 
-       return args;
+       *argc = nargs;
+       *argv = args;
+       return true;
 }
 
+/* Find a pending lock context that can be scheduled, or return NULL if none.
+ * Contexts found with no request attached are unlinked, uncounted and freed.
+ */
+static struct lock_context *ctdb_find_lock_context(struct ctdb_context *ctdb)
+{
+       struct lock_context *lock_ctx, *next_ctx;
+       struct ctdb_db_context *ctdb_db;
+
+       /* First check the daemon-wide pending queue (ctdb->lock_pending) */
+
+       for (lock_ctx = ctdb->lock_pending; lock_ctx != NULL;
+            lock_ctx = next_ctx) {
+
+               if (lock_ctx->request != NULL) {
+                       /* Found a lock context with a request */
+                       return lock_ctx;
+               }
+
+               next_ctx = lock_ctx->next;
+               /* Successor saved above because lock_ctx is freed below */
+               DEBUG(DEBUG_INFO, ("Removing lock context without lock "
+                                  "request\n"));
+               DLIST_REMOVE(ctdb->lock_pending, lock_ctx);
+               CTDB_DECREMENT_STAT(ctdb, locks.num_pending);
+               if (lock_ctx->ctdb_db) {
+                       CTDB_DECREMENT_DB_STAT(lock_ctx->ctdb_db,
+                                              locks.num_pending);
+               }
+               talloc_free(lock_ctx);
+       }
+
+       /* Next check the per-database pending queues (ctdb_db->lock_pending) */
+       for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
+               if (ctdb_db->lock_num_current ==
+                   ctdb->tunable.lock_processes_per_db) {
+                       continue; /* lock_num_current equals the per-DB tunable: skip this DB */
+               }
+
+               for (lock_ctx = ctdb_db->lock_pending; lock_ctx != NULL;
+                    lock_ctx = next_ctx) {
+
+                       next_ctx = lock_ctx->next;
+
+                       if (lock_ctx->request != NULL) {
+                               return lock_ctx;
+                       }
+                       /* No request attached: drop the context and fix up both counters */
+                       DEBUG(DEBUG_INFO, ("Removing lock context without "
+                                          "lock request\n"));
+                       DLIST_REMOVE(ctdb_db->lock_pending, lock_ctx);
+                       CTDB_DECREMENT_STAT(ctdb, locks.num_pending);
+                       CTDB_DECREMENT_DB_STAT(ctdb_db, locks.num_pending);
+                       talloc_free(lock_ctx);
+               }
+       }
+       /* Nothing schedulable in the global queue or in any database queue */
+       return NULL;
+}
 
 /*
  * Schedule a new lock child process
@@ -656,12 +739,12 @@ static char **lock_helper_args(TALLOC_CTX *mem_ctx, struct lock_context *lock_ct
  */
 static void ctdb_lock_schedule(struct ctdb_context *ctdb)
 {
-       struct lock_context *lock_ctx, *next_ctx;
-       int ret;
+       struct lock_context *lock_ctx;
+       int ret, argc;
        TALLOC_CTX *tmp_ctx;
-       const char *helper = BINDIR "/ctdb_lock_helper";
+       const char *helper = CTDB_HELPER_BINDIR "/ctdb_lock_helper";
        static const char *prog = NULL;
-       char **args;
+       const char **args;
 
        if (prog == NULL) {
                const char *t;
@@ -675,32 +758,8 @@ static void ctdb_lock_schedule(struct ctdb_context *ctdb)
                CTDB_NO_MEMORY_VOID(ctdb, prog);
        }
 
-       if (ctdb->lock_pending == NULL) {
-               return;
-       }
-
        /* Find a lock context with requests */
-       lock_ctx = ctdb->lock_pending;
-       while (lock_ctx != NULL) {
-               next_ctx = lock_ctx->next;
-               if (! lock_ctx->request) {
-                       DEBUG(DEBUG_INFO, ("Removing lock context without lock request\n"));
-                       DLIST_REMOVE(ctdb->lock_pending, lock_ctx);
-                       CTDB_DECREMENT_STAT(ctdb, locks.num_pending);
-                       if (lock_ctx->ctdb_db) {
-                               CTDB_DECREMENT_DB_STAT(lock_ctx->ctdb_db, locks.num_pending);
-                       }
-                       talloc_free(lock_ctx);
-               } else {
-                       if (lock_ctx->ctdb_db == NULL ||
-                           lock_ctx->ctdb_db->lock_num_current < ctdb->tunable.lock_processes_per_db) {
-                               /* Found a lock context with lock requests */
-                               break;
-                       }
-               }
-               lock_ctx = next_ctx;
-       }
-
+       lock_ctx = ctdb_find_lock_context(ctdb);
        if (lock_ctx == NULL) {
                return;
        }
@@ -724,8 +783,8 @@ static void ctdb_lock_schedule(struct ctdb_context *ctdb)
        }
 
        /* Create arguments for lock helper */
-       args = lock_helper_args(tmp_ctx, lock_ctx, lock_ctx->fd[1]);
-       if (args == NULL) {
+       if (!lock_helper_args(tmp_ctx, lock_ctx, lock_ctx->fd[1],
+                             &argc, &args)) {
                DEBUG(DEBUG_ERR, ("Failed to create lock helper args\n"));
                close(lock_ctx->fd[0]);
                close(lock_ctx->fd[1]);
@@ -733,9 +792,9 @@ static void ctdb_lock_schedule(struct ctdb_context *ctdb)
                return;
        }
 
-       lock_ctx->child = vfork();
-
-       if (lock_ctx->child == (pid_t)-1) {
+       if (!ctdb_vfork_with_logging(lock_ctx, ctdb, "lock_helper",
+                                    prog, argc, (const char **)args,
+                                    NULL, NULL, &lock_ctx->child)) {
                DEBUG(DEBUG_ERR, ("Failed to create a child in ctdb_lock_schedule\n"));
                close(lock_ctx->fd[0]);
                close(lock_ctx->fd[1]);
@@ -743,19 +802,7 @@ static void ctdb_lock_schedule(struct ctdb_context *ctdb)
                return;
        }
 
-
-       /* Child process */
-       if (lock_ctx->child == 0) {
-               ret = execv(prog, args);
-               if (ret < 0) {
-                       DEBUG(DEBUG_ERR, ("Failed to execute helper %s (%d, %s)\n",
-                                         prog, errno, strerror(errno)));
-               }
-               _exit(1);
-       }
-
        /* Parent process */
-       ctdb_track_child(ctdb, lock_ctx->child);
        close(lock_ctx->fd[1]);
 
        talloc_set_destructor(lock_ctx, ctdb_lock_context_destructor);
@@ -794,8 +841,13 @@ static void ctdb_lock_schedule(struct ctdb_context *ctdb)
        tevent_fd_set_auto_close(lock_ctx->tfd);
 
        /* Move the context from pending to current */
-       DLIST_REMOVE(ctdb->lock_pending, lock_ctx);
-       DLIST_ADD_END(ctdb->lock_current, lock_ctx, NULL);
+       if (lock_ctx->type == LOCK_RECORD) {
+               DLIST_REMOVE(lock_ctx->ctdb_db->lock_pending, lock_ctx);
+               DLIST_ADD_END(lock_ctx->ctdb_db->lock_current, lock_ctx, NULL);
+       } else {
+               DLIST_REMOVE(ctdb->lock_pending, lock_ctx);
+               DLIST_ADD_END(ctdb->lock_current, lock_ctx, NULL);
+       }
        CTDB_DECREMENT_STAT(lock_ctx->ctdb, locks.num_pending);
        CTDB_INCREMENT_STAT(lock_ctx->ctdb, locks.num_current);
        if (lock_ctx->ctdb_db) {
@@ -809,7 +861,8 @@ static void ctdb_lock_schedule(struct ctdb_context *ctdb)
 /*
  * Lock record / db depending on type
  */
-static struct lock_request *ctdb_lock_internal(struct ctdb_context *ctdb,
+static struct lock_request *ctdb_lock_internal(TALLOC_CTX *mem_ctx,
+                                              struct ctdb_context *ctdb,
                                               struct ctdb_db_context *ctdb_db,
                                               TDB_DATA key,
                                               uint32_t priority,
@@ -832,7 +885,7 @@ static struct lock_request *ctdb_lock_internal(struct ctdb_context *ctdb,
                return NULL;
        }
 
-       if ((request = talloc_zero(lock_ctx, struct lock_request)) == NULL) {
+       if ((request = talloc_zero(mem_ctx, struct lock_request)) == NULL) {
                talloc_free(lock_ctx);
                return NULL;
        }
@@ -862,9 +915,9 @@ static struct lock_request *ctdb_lock_internal(struct ctdb_context *ctdb,
         * immediately, so keep them at the head of the pending queue.
         */
        if (lock_ctx->type == LOCK_RECORD) {
-               DLIST_ADD_END(ctdb->lock_pending, lock_ctx, NULL);
+               DLIST_ADD_END(ctdb_db->lock_pending, lock_ctx, NULL);
        } else {
-               DLIST_ADD(ctdb->lock_pending, lock_ctx);
+               DLIST_ADD_END(ctdb->lock_pending, lock_ctx, NULL);
        }
        CTDB_INCREMENT_STAT(ctdb, locks.num_pending);
        if (ctdb_db) {
@@ -889,13 +942,15 @@ static struct lock_request *ctdb_lock_internal(struct ctdb_context *ctdb,
 /*
  * obtain a lock on a record in a database
  */
-struct lock_request *ctdb_lock_record(struct ctdb_db_context *ctdb_db,
+struct lock_request *ctdb_lock_record(TALLOC_CTX *mem_ctx,
+                                     struct ctdb_db_context *ctdb_db,
                                      TDB_DATA key,
                                      bool auto_mark,
                                      void (*callback)(void *, bool),
                                      void *private_data)
 {
-       return ctdb_lock_internal(ctdb_db->ctdb,
+       return ctdb_lock_internal(mem_ctx,
+                                 ctdb_db->ctdb,
                                  ctdb_db,
                                  key,
                                  0,
@@ -909,12 +964,14 @@ struct lock_request *ctdb_lock_record(struct ctdb_db_context *ctdb_db,
 /*
  * obtain a lock on a database
  */
-struct lock_request *ctdb_lock_db(struct ctdb_db_context *ctdb_db,
+struct lock_request *ctdb_lock_db(TALLOC_CTX *mem_ctx,
+                                 struct ctdb_db_context *ctdb_db,
                                  bool auto_mark,
                                  void (*callback)(void *, bool),
                                  void *private_data)
 {
-       return ctdb_lock_internal(ctdb_db->ctdb,
+       return ctdb_lock_internal(mem_ctx,
+                                 ctdb_db->ctdb,
                                  ctdb_db,
                                  tdb_null,
                                  0,
@@ -928,7 +985,8 @@ struct lock_request *ctdb_lock_db(struct ctdb_db_context *ctdb_db,
 /*
  * obtain locks on all databases of specified priority
  */
-struct lock_request *ctdb_lock_alldb_prio(struct ctdb_context *ctdb,
+struct lock_request *ctdb_lock_alldb_prio(TALLOC_CTX *mem_ctx,
+                                         struct ctdb_context *ctdb,
                                          uint32_t priority,
                                          bool auto_mark,
                                          void (*callback)(void *, bool),
@@ -939,7 +997,8 @@ struct lock_request *ctdb_lock_alldb_prio(struct ctdb_context *ctdb,
                return NULL;
        }
 
-       return ctdb_lock_internal(ctdb,
+       return ctdb_lock_internal(mem_ctx,
+                                 ctdb,
                                  NULL,
                                  tdb_null,
                                  priority,
@@ -953,12 +1012,14 @@ struct lock_request *ctdb_lock_alldb_prio(struct ctdb_context *ctdb,
 /*
  * obtain locks on all databases
  */
-struct lock_request *ctdb_lock_alldb(struct ctdb_context *ctdb,
+struct lock_request *ctdb_lock_alldb(TALLOC_CTX *mem_ctx,
+                                    struct ctdb_context *ctdb,
                                     bool auto_mark,
                                     void (*callback)(void *, bool),
                                     void *private_data)
 {
-       return ctdb_lock_internal(ctdb,
+       return ctdb_lock_internal(mem_ctx,
+                                 ctdb,
                                  NULL,
                                  tdb_null,
                                  0,