From 8f26219bf65c1ed564f98696833c1aed6d8de988 Mon Sep 17 00:00:00 2001 From: Rusty Russell Date: Fri, 21 Jan 2011 21:17:02 +1030 Subject: [PATCH] ctdb_lockwait: create overflow queue. Once we have more than 200 children waiting on a particular db, don't create any more. Just put them on an overflow queue, and when a child gets a lock search that queue to see if others were after the same lock (they probably were). --- include/ctdb_private.h | 2 ++ server/ctdb_lockwait.c | 67 +++++++++++++++++++++++++++++++++++++----- 2 files changed, 62 insertions(+), 7 deletions(-) diff --git a/include/ctdb_private.h b/include/ctdb_private.h index cb2b066d..6e5c4636 100644 --- a/include/ctdb_private.h +++ b/include/ctdb_private.h @@ -506,6 +506,8 @@ struct ctdb_db_context { bool transaction_active; struct ctdb_vacuum_handle *vacuum_handle; char *unhealthy_reason; + int pending_requests; + struct lockwait_handle *lockwait_overflow; }; diff --git a/server/ctdb_lockwait.c b/server/ctdb_lockwait.c index 1d3a5976..48fa7961 100644 --- a/server/ctdb_lockwait.c +++ b/server/ctdb_lockwait.c @@ -23,10 +23,12 @@ #include "system/wait.h" #include "db_wrap.h" #include "lib/tdb/include/tdb.h" +#include "lib/util/dlinklist.h" #include "../include/ctdb_private.h" struct lockwait_handle { + struct lockwait_handle *next, *prev; struct ctdb_context *ctdb; struct ctdb_db_context *ctdb_db; struct fd_event *fde; @@ -38,6 +40,36 @@ struct lockwait_handle { struct timeval start_time; }; +/* If we managed to obtain a lock, find any overflow records which wanted the + * same one and do all the callbacks at once. */ +static void do_overflow(struct ctdb_db_context *ctdb_db, + TDB_DATA key) +{ + struct lockwait_handle *i, *next; + TALLOC_CTX *tmp_ctx = talloc_new(ctdb_db); + + for (i = ctdb_db->lockwait_overflow; i; i = next) { + /* Careful: destructor removes it from list! */ + next = i->next; + if (key.dsize == i->key.dsize + && memcmp(key.dptr, i->key.dptr, key.dsize) == 0) { + /* Callback might free them, so reparent. */ + talloc_steal(tmp_ctx, i); + i->callback(i->private_data); + } + } + + /* This will free them if callback didn't. */ + talloc_free(tmp_ctx); + + /* Remove one from the overflow queue if there is one. */ + if (ctdb_db->lockwait_overflow) { + i = ctdb->lockwait_overflow; + ctdb_lockwait(ctdb_db, i->key, i->callback, i->private_data); + talloc_free(i); + } +} + static void lockwait_handler(struct event_context *ev, struct fd_event *fde, uint16_t flags, void *private_data) { @@ -51,6 +83,7 @@ static void lockwait_handler(struct event_context *ev, struct fd_event *fde, TALLOC_CTX *tmp_ctx = talloc_new(ev); key.dptr = talloc_memdup(tmp_ctx, key.dptr, key.dsize); + h->ctdb_db->pending_requests--; talloc_set_destructor(h, NULL); CTDB_UPDATE_LATENCY(h->ctdb, h->ctdb_db, "lockwait", lockwait_latency, h->start_time); @@ -69,6 +102,9 @@ static void lockwait_handler(struct event_context *ev, struct fd_event *fde, tdb_chainlock_mark(tdb, key); callback(p); + if (h->ctdb_db->lockwait_overflow) { + do_overflow(h->ctdb_db, key); + } tdb_chainlock_unmark(tdb, key); kill(child, SIGKILL); @@ -79,6 +115,14 @@ static int lockwait_destructor(struct lockwait_handle *h) { CTDB_DECREMENT_STAT(h->ctdb, pending_lockwait_calls); kill(h->child, SIGKILL); + h->ctdb_db->pending_requests--; + return 0; +} + +static int overflow_lockwait_destructor(struct lockwait_handle *h) +{ + CTDB_DECREMENT_STAT(h->ctdb, pending_lockwait_calls); + DLIST_REMOVE(h->ctdb_db->lockwait_overflow, h); return 0; } @@ -109,6 +153,21 @@ struct lockwait_handle *ctdb_lockwait(struct ctdb_db_context *ctdb_db, return NULL; } + result->callback = callback; + result->private_data = private_data; + result->ctdb = ctdb_db->ctdb; + result->ctdb_db = ctdb_db; + result->key = key; + + /* Don't fire off too many children at once! */ + if (ctdb_db->pending_requests > 200) { + DLIST_ADD_END(ctdb_db->lockwait_overflow, result, NULL); + talloc_set_destructor(result, overflow_lockwait_destructor); + DEBUG(DEBUG_DEBUG, (__location__ " Created overflow for %s\n", + ctdb_db->db_name)); + return result; + } + ret = pipe(result->fd); if (ret != 0) { @@ -127,12 +186,6 @@ struct lockwait_handle *ctdb_lockwait(struct ctdb_db_context *ctdb_db, return NULL; } - result->callback = callback; - result->private_data = private_data; - result->ctdb = ctdb_db->ctdb; - result->ctdb_db = ctdb_db; - result->key = key; - if (result->child == 0) { char c = 0; close(result->fd[0]); @@ -151,6 +204,7 @@ struct lockwait_handle *ctdb_lockwait(struct ctdb_db_context *ctdb_db, DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to child lockwait process\n", result->fd[0])); + ctdb_db->pending_requests++; talloc_set_destructor(result, lockwait_destructor); result->fde = event_add_fd(ctdb_db->ctdb->ev, result, result->fd[0], @@ -164,6 +218,5 @@ struct lockwait_handle *ctdb_lockwait(struct ctdb_db_context *ctdb_db, tevent_fd_set_auto_close(result->fde); result->start_time = timeval_current(); - return result; } -- 2.34.1