ctdb_lockwait: create overflow queue.
authorRusty Russell <rusty@rustcorp.com.au>
Fri, 21 Jan 2011 10:47:02 +0000 (21:17 +1030)
committerRonnie Sahlberg <ronniesahlberg@gmail.com>
Mon, 24 Jan 2011 01:20:00 +0000 (12:20 +1100)
Once we have more than 200 children waiting on a particular db, don't create
any more.  Just put them on an overflow queue, and when a child gets a lock
search that queue to see if others were after the same lock (they probably
were).

include/ctdb_private.h
server/ctdb_lockwait.c

index cb2b066d2b0b1095069c5040893c272f8e7e78f1..6e5c46365a991a4fdcc923b5ff3ce141a95eec1a 100644 (file)
@@ -506,6 +506,8 @@ struct ctdb_db_context {
        bool transaction_active;
        struct ctdb_vacuum_handle *vacuum_handle;
        char *unhealthy_reason;
+       int pending_requests;
+       struct lockwait_handle *lockwait_overflow;
 };
 
 
index 1d3a59766acdb7cab59abc53c8a23849646129e9..48fa7961986f2c57d4664d02f2e4fd67a164d4dc 100644 (file)
 #include "system/wait.h"
 #include "db_wrap.h"
 #include "lib/tdb/include/tdb.h"
+#include "lib/util/dlinklist.h"
 #include "../include/ctdb_private.h"
 
 
 struct lockwait_handle {
+       struct lockwait_handle *next, *prev;
        struct ctdb_context *ctdb;
        struct ctdb_db_context *ctdb_db;
        struct fd_event *fde;
@@ -38,6 +40,36 @@ struct lockwait_handle {
        struct timeval start_time;
 };
 
+/* If we managed to obtain a lock, find any overflow records which wanted the
+ * same one and do all the callbacks at once. */
+static void do_overflow(struct ctdb_db_context *ctdb_db,
+                       TDB_DATA key)
+{
+       struct lockwait_handle *i, *next;
+       TALLOC_CTX *tmp_ctx = talloc_new(ctdb_db);
+
+       for (i = ctdb_db->lockwait_overflow; i; i = next) {
+               /* Careful: destructor removes it from list! */
+               next = i->next;
+               if (key.dsize == i->key.dsize
+                   && memcmp(key.dptr, i->key.dptr, key.dsize) == 0) {
+                       /* Callback might free them, so reparent. */
+                       talloc_steal(tmp_ctx, i);
+                       i->callback(i->private_data);
+               }
+       }
+
+       /* This will free them if callback didn't. */
+       talloc_free(tmp_ctx);
+
+       /* Remove one from the overflow queue if there is one. */
+       if (ctdb_db->lockwait_overflow) {
+               i = ctdb->lockwait_overflow;
+               ctdb_lockwait(ctdb_db, i->key, i->callback, i->private_data);
+               talloc_free(i);
+       }
+}
+
 static void lockwait_handler(struct event_context *ev, struct fd_event *fde, 
                             uint16_t flags, void *private_data)
 {
@@ -51,6 +83,7 @@ static void lockwait_handler(struct event_context *ev, struct fd_event *fde,
        TALLOC_CTX *tmp_ctx = talloc_new(ev);
 
        key.dptr = talloc_memdup(tmp_ctx, key.dptr, key.dsize);
+       h->ctdb_db->pending_requests--;
 
        talloc_set_destructor(h, NULL);
        CTDB_UPDATE_LATENCY(h->ctdb, h->ctdb_db, "lockwait", lockwait_latency, h->start_time);
@@ -69,6 +102,9 @@ static void lockwait_handler(struct event_context *ev, struct fd_event *fde,
 
        tdb_chainlock_mark(tdb, key);
        callback(p);
+       if (h->ctdb_db->lockwait_overflow) {
+               do_overflow(h->ctdb_db, key);
+       }
        tdb_chainlock_unmark(tdb, key);
 
        kill(child, SIGKILL);
@@ -79,6 +115,14 @@ static int lockwait_destructor(struct lockwait_handle *h)
 {
        CTDB_DECREMENT_STAT(h->ctdb, pending_lockwait_calls);
        kill(h->child, SIGKILL);
+       h->ctdb_db->pending_requests--;
+       return 0;
+}
+
+static int overflow_lockwait_destructor(struct lockwait_handle *h)
+{
+       CTDB_DECREMENT_STAT(h->ctdb, pending_lockwait_calls);
+       DLIST_REMOVE(h->ctdb_db->lockwait_overflow, h);
        return 0;
 }
 
@@ -109,6 +153,21 @@ struct lockwait_handle *ctdb_lockwait(struct ctdb_db_context *ctdb_db,
                return NULL;
        }
 
+       result->callback = callback;
+       result->private_data = private_data;
+       result->ctdb = ctdb_db->ctdb;
+       result->ctdb_db = ctdb_db;
+       result->key = key;
+
+       /* Don't fire off too many children at once! */
+       if (ctdb_db->pending_requests > 200) {
+               DLIST_ADD_END(ctdb_db->lockwait_overflow, result, NULL);
+               talloc_set_destructor(result, overflow_lockwait_destructor);
+               DEBUG(DEBUG_DEBUG, (__location__ " Created overflow for %s\n",
+                                   ctdb_db->db_name));
+               return result;
+       }
+
        ret = pipe(result->fd);
 
        if (ret != 0) {
@@ -127,12 +186,6 @@ struct lockwait_handle *ctdb_lockwait(struct ctdb_db_context *ctdb_db,
                return NULL;
        }
 
-       result->callback = callback;
-       result->private_data = private_data;
-       result->ctdb = ctdb_db->ctdb;
-       result->ctdb_db = ctdb_db;
-       result->key = key;
-
        if (result->child == 0) {
                char c = 0;
                close(result->fd[0]);
@@ -151,6 +204,7 @@ struct lockwait_handle *ctdb_lockwait(struct ctdb_db_context *ctdb_db,
 
        DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to child lockwait process\n", result->fd[0]));
 
+       ctdb_db->pending_requests++;
        talloc_set_destructor(result, lockwait_destructor);
 
        result->fde = event_add_fd(ctdb_db->ctdb->ev, result, result->fd[0],
@@ -164,6 +218,5 @@ struct lockwait_handle *ctdb_lockwait(struct ctdb_db_context *ctdb_db,
        tevent_fd_set_auto_close(result->fde);
 
        result->start_time = timeval_current();
-
        return result;
 }