ctdb_lockwait: create overflow queue.
[sahlberg/ctdb.git] / server / ctdb_lockwait.c
index de802187cf19c4212392229ce90c86b022875f97..48fa7961986f2c57d4664d02f2e4fd67a164d4dc 100644 (file)
 #include "system/wait.h"
 #include "db_wrap.h"
 #include "lib/tdb/include/tdb.h"
+#include "lib/util/dlinklist.h"
 #include "../include/ctdb_private.h"
 
 
 struct lockwait_handle {
+       struct lockwait_handle *next, *prev;
        struct ctdb_context *ctdb;
        struct ctdb_db_context *ctdb_db;
        struct fd_event *fde;
@@ -38,6 +40,36 @@ struct lockwait_handle {
        struct timeval start_time;
 };
 
+/* If we managed to obtain a lock, find any overflow records which wanted the
+ * same one and do all the callbacks at once. */
+static void do_overflow(struct ctdb_db_context *ctdb_db,
+                       TDB_DATA key)
+{
+       struct lockwait_handle *i, *next;
+       TALLOC_CTX *tmp_ctx = talloc_new(ctdb_db);
+
+       for (i = ctdb_db->lockwait_overflow; i; i = next) {
+               /* Careful: destructor removes it from list! */
+               next = i->next;
+               if (key.dsize == i->key.dsize
+                   && memcmp(key.dptr, i->key.dptr, key.dsize) == 0) {
+                       /* Callback might free them, so reparent. */
+                       talloc_steal(tmp_ctx, i);
+                       i->callback(i->private_data);
+               }
+       }
+
+       /* This will free them if callback didn't. */
+       talloc_free(tmp_ctx);
+
+       /* Remove one from the overflow queue if there is one. */
+       if (ctdb_db->lockwait_overflow) {
+               i = ctdb->lockwait_overflow;
+               ctdb_lockwait(ctdb_db, i->key, i->callback, i->private_data);
+               talloc_free(i);
+       }
+}
+
 static void lockwait_handler(struct event_context *ev, struct fd_event *fde, 
                             uint16_t flags, void *private_data)
 {
@@ -51,10 +83,11 @@ static void lockwait_handler(struct event_context *ev, struct fd_event *fde,
        TALLOC_CTX *tmp_ctx = talloc_new(ev);
 
        key.dptr = talloc_memdup(tmp_ctx, key.dptr, key.dsize);
+       h->ctdb_db->pending_requests--;
 
        talloc_set_destructor(h, NULL);
-       ctdb_latency(h->ctdb_db, "lockwait", &h->ctdb->statistics.max_lockwait_latency, h->start_time);
-       h->ctdb->statistics.pending_lockwait_calls--;
+       CTDB_UPDATE_LATENCY(h->ctdb, h->ctdb_db, "lockwait", lockwait_latency, h->start_time);
+       CTDB_DECREMENT_STAT(h->ctdb, pending_lockwait_calls);
 
        /* the handle needs to go away when the context is gone - when
           the handle goes away this implicitly closes the pipe, which
@@ -69,6 +102,9 @@ static void lockwait_handler(struct event_context *ev, struct fd_event *fde,
 
        tdb_chainlock_mark(tdb, key);
        callback(p);
+       if (h->ctdb_db->lockwait_overflow) {
+               do_overflow(h->ctdb_db, key);
+       }
        tdb_chainlock_unmark(tdb, key);
 
        kill(child, SIGKILL);
@@ -77,8 +113,16 @@ static void lockwait_handler(struct event_context *ev, struct fd_event *fde,
 
 static int lockwait_destructor(struct lockwait_handle *h)
 {
-       h->ctdb->statistics.pending_lockwait_calls--;
+       CTDB_DECREMENT_STAT(h->ctdb, pending_lockwait_calls);
        kill(h->child, SIGKILL);
+       h->ctdb_db->pending_requests--;
+       return 0;
+}
+
+static int overflow_lockwait_destructor(struct lockwait_handle *h)
+{
+       CTDB_DECREMENT_STAT(h->ctdb, pending_lockwait_calls);
+       DLIST_REMOVE(h->ctdb_db->lockwait_overflow, h);
        return 0;
 }
 
@@ -101,41 +145,51 @@ struct lockwait_handle *ctdb_lockwait(struct ctdb_db_context *ctdb_db,
        int ret;
        pid_t parent = getpid();
 
-       ctdb_db->ctdb->statistics.lockwait_calls++;
-       ctdb_db->ctdb->statistics.pending_lockwait_calls++;
+       CTDB_INCREMENT_STAT(ctdb_db->ctdb, lockwait_calls);
+       CTDB_INCREMENT_STAT(ctdb_db->ctdb, pending_lockwait_calls);
 
        if (!(result = talloc_zero(private_data, struct lockwait_handle))) {
-               ctdb_db->ctdb->statistics.pending_lockwait_calls--;
+               CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_lockwait_calls);
                return NULL;
        }
 
+       result->callback = callback;
+       result->private_data = private_data;
+       result->ctdb = ctdb_db->ctdb;
+       result->ctdb_db = ctdb_db;
+       result->key = key;
+
+       /* Don't fire off too many children at once! */
+       if (ctdb_db->pending_requests > 200) {
+               DLIST_ADD_END(ctdb_db->lockwait_overflow, result, NULL);
+               talloc_set_destructor(result, overflow_lockwait_destructor);
+               DEBUG(DEBUG_DEBUG, (__location__ " Created overflow for %s\n",
+                                   ctdb_db->db_name));
+               return result;
+       }
+
        ret = pipe(result->fd);
 
        if (ret != 0) {
                talloc_free(result);
-               ctdb_db->ctdb->statistics.pending_lockwait_calls--;
+               CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_lockwait_calls);
                return NULL;
        }
 
-       result->child = fork();
+       result->child = ctdb_fork(ctdb_db->ctdb);
 
        if (result->child == (pid_t)-1) {
                close(result->fd[0]);
                close(result->fd[1]);
                talloc_free(result);
-               ctdb_db->ctdb->statistics.pending_lockwait_calls--;
+               CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_lockwait_calls);
                return NULL;
        }
 
-       result->callback = callback;
-       result->private_data = private_data;
-       result->ctdb = ctdb_db->ctdb;
-       result->ctdb_db = ctdb_db;
-       result->key = key;
-
        if (result->child == 0) {
                char c = 0;
                close(result->fd[0]);
+               debug_extra = talloc_asprintf(NULL, "chainlock-%s:", ctdb_db->db_name);
                tdb_chainlock(ctdb_db->ltdb->tdb, key);
                write(result->fd[1], &c, 1);
                /* make sure we die when our parent dies */
@@ -150,6 +204,7 @@ struct lockwait_handle *ctdb_lockwait(struct ctdb_db_context *ctdb_db,
 
        DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to child lockwait process\n", result->fd[0]));
 
+       ctdb_db->pending_requests++;
        talloc_set_destructor(result, lockwait_destructor);
 
        result->fde = event_add_fd(ctdb_db->ctdb->ev, result, result->fd[0],
@@ -157,12 +212,11 @@ struct lockwait_handle *ctdb_lockwait(struct ctdb_db_context *ctdb_db,
                                   (void *)result);
        if (result->fde == NULL) {
                talloc_free(result);
-               ctdb_db->ctdb->statistics.pending_lockwait_calls--;
+               CTDB_DECREMENT_STAT(ctdb_db->ctdb, pending_lockwait_calls);
                return NULL;
        }
        tevent_fd_set_auto_close(result->fde);
 
        result->start_time = timeval_current();
-
        return result;
 }