Record Fetch Collapse: Collapse multiple fetch request into one single request
authorRonnie Sahlberg <ronniesahlberg@gmail.com>
Wed, 7 Mar 2012 04:58:51 +0000 (15:58 +1100)
committerRonnie Sahlberg <ronniesahlberg@gmail.com>
Wed, 7 Mar 2012 04:58:51 +0000 (15:58 +1100)
When multiple clients fetch the same record concurrently, send only one single
fetch across the network and deferr all other fetches locally.
This improves performance for hot records and reduces cpu load on ctdb.

include/ctdb_private.h
server/ctdb_daemon.c
server/ctdb_ltdb_server.c
server/ctdb_tunables.c

index a1db4d1322ec43477f0e9ca396bb638b9cdd07ef..624aa7f95ac4528e4b9914cd625f32dc7c14b5f8 100644 (file)
@@ -122,6 +122,7 @@ struct ctdb_tunable {
        uint32_t vacuum_fast_path_count;
        uint32_t lcp2_public_ip_assignment;
        uint32_t allow_client_db_attach;
+       uint32_t fetch_lock_collapse;
 };
 
 /*
@@ -523,6 +524,10 @@ struct ctdb_db_context {
                                  struct ctdb_ltdb_header *header,
                                  TDB_DATA data);
 
+       /* used to track which records we are currently fetching
+          so we can avoid sending duplicate fetch requests
+       */
+       struct trbt_tree *deferred_fetch;
 };
 
 
index 8b9f2eb2ceb8e29e078d59a8c233773ad2a4619e..3d5a71e0602a27158b6773ceceb2201b2f494bdd 100644 (file)
@@ -27,6 +27,7 @@
 #include "system/wait.h"
 #include "../include/ctdb_client.h"
 #include "../include/ctdb_private.h"
+#include "../common/rb_tree.h"
 #include <sys/socket.h>
 
 struct ctdb_client_pid_list {
@@ -394,6 +395,190 @@ static void daemon_incoming_packet_wrap(void *p, struct ctdb_req_header *hdr)
        daemon_incoming_packet(client, hdr);    
 }
 
+struct ctdb_deferred_fetch_call {
+       struct ctdb_deferred_fetch_call *next, *prev;
+       struct ctdb_req_call *c;
+       struct ctdb_daemon_packet_wrap *w;
+};
+
+struct ctdb_deferred_fetch_queue {
+       struct ctdb_deferred_fetch_call *deferred_calls;
+};
+
+struct ctdb_deferred_requeue {
+       struct ctdb_deferred_fetch_call *dfc;
+       struct ctdb_client *client;
+};
+
+/* called from a timer event and starts reprocessing the deferred call.*/
+static void reprocess_deferred_call(struct event_context *ev, struct timed_event *te, 
+                                      struct timeval t, void *private_data)
+{
+       struct ctdb_deferred_requeue *dfr = (struct ctdb_deferred_requeue *)private_data;
+       struct ctdb_client *client = dfr->client;
+
+       talloc_steal(client, dfr->dfc->c);
+       daemon_incoming_packet(client, (struct ctdb_req_header *)dfr->dfc->c);
+       talloc_free(dfr);
+}
+
+/* the referral context is destroyed either after a timeout or when the initial
+   fetch-lock has finished.
+   at this stage, immediately start reprocessing the queued up deferred
+   calls so they get reprocessed immediately (and since we are dmaster at
+   this stage, trigger the waiting smbd processes to pick up and aquire the
+   record right away.
+*/
+static int deferred_fetch_queue_destructor(struct ctdb_deferred_fetch_queue *dfq)
+{
+
+       /* need to reprocess the packets from the queue explicitely instead of
+          just using a normal destructor since we want, need, to
+          call the clients in the same oder as the requests queued up
+       */
+       while (dfq->deferred_calls != NULL) {
+               struct ctdb_client *client;
+               struct ctdb_deferred_fetch_call *dfc = dfq->deferred_calls;
+               struct ctdb_deferred_requeue *dfr;
+
+               DLIST_REMOVE(dfq->deferred_calls, dfc);
+
+               client = ctdb_reqid_find(dfc->w->ctdb, dfc->w->client_id, struct ctdb_client);
+               if (client == NULL) {
+                       DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
+                                dfc->w->client_id));
+                       continue;
+               }
+
+               /* process it by pushing it back onto the eventloop */
+               dfr = talloc(client, struct ctdb_deferred_requeue);
+               if (dfr == NULL) {
+                       DEBUG(DEBUG_ERR,("Failed to allocate deferred fetch requeue structure\n"));
+                       continue;
+               }
+
+               dfr->dfc    = talloc_steal(dfr, dfc);
+               dfr->client = client;
+
+               event_add_timed(dfc->w->ctdb->ev, client, timeval_zero(), reprocess_deferred_call, dfr);
+       }
+
+       return 0;
+}
+
+/* insert the new deferral context into the rb tree.
+   there should never be a pre-existing context here, but check for it
+   warn and destroy the previous context if there is already a deferral context
+   for this key.
+*/
+static void *insert_dfq_callback(void *parm, void *data)
+{
+        if (data) {
+               DEBUG(DEBUG_ERR,("Already have DFQ registered. Free old %p and create new %p\n", data, parm));
+                talloc_free(data);
+        }
+        return parm;
+}
+
+/* if the original fetch-lock did not complete within a reasonable time,
+   free the context and context for all deferred requests to cause them to be
+   re-inserted into the event system.
+*/
+static void dfq_timeout(struct event_context *ev, struct timed_event *te, 
+                                 struct timeval t, void *private_data)
+{
+       talloc_free(private_data);
+}
+
+/* This function is used in the local daemon to register a KEY in a database
+   for being "fetched"
+   While the remote fetch is in-flight, any futher attempts to re-fetch the
+   same record will be deferred until the fetch completes.
+*/
+static int setup_deferred_fetch_locks(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
+{
+       uint32_t *k;
+       struct ctdb_deferred_fetch_queue *dfq;
+
+       k = talloc_zero_size(call, ((call->key.dsize + 3) & 0xfffffffc) + 4);
+       if (k == NULL) {
+               DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch\n"));
+               return -1;
+       }
+
+       k[0] = (call->key.dsize + 3) / 4 + 1;
+       memcpy(&k[1], call->key.dptr, call->key.dsize);
+
+       dfq  = talloc(call, struct ctdb_deferred_fetch_queue);
+       if (dfq == NULL) {
+               DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch queue structure\n"));
+               talloc_free(k);
+               return -1;
+       }
+       dfq->deferred_calls = NULL;
+
+       trbt_insertarray32_callback(ctdb_db->deferred_fetch, k[0], &k[0], insert_dfq_callback, dfq);
+
+       talloc_set_destructor(dfq, deferred_fetch_queue_destructor);
+
+       /* if the fetch havent completed in 30 seconds, just tear it all down
+          and let it try again as the events are reissued */
+       event_add_timed(ctdb_db->ctdb->ev, dfq, timeval_current_ofs(30, 0), dfq_timeout, dfq);
+
+       talloc_free(k);
+       return 0;
+}
+
+/* check if this is a duplicate request to a fetch already in-flight
+   if it is, make this call deferred to be reprocessed later when
+   the in-flight fetch completes.
+*/
+static int requeue_duplicate_fetch(struct ctdb_db_context *ctdb_db, struct ctdb_client *client, TDB_DATA key, struct ctdb_req_call *c)
+{
+       uint32_t *k;
+       struct ctdb_deferred_fetch_queue *dfq;
+       struct ctdb_deferred_fetch_call *dfc;
+
+       k = talloc_zero_size(c, ((key.dsize + 3) & 0xfffffffc) + 4);
+       if (k == NULL) {
+               DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch\n"));
+               return -1;
+       }
+
+       k[0] = (key.dsize + 3) / 4 + 1;
+       memcpy(&k[1], key.dptr, key.dsize);
+
+       dfq = trbt_lookuparray32(ctdb_db->deferred_fetch, k[0], &k[0]);
+       if (dfq == NULL) {
+               talloc_free(k);
+               return -1;
+       }
+
+
+       talloc_free(k);
+
+       dfc = talloc(dfq, struct ctdb_deferred_fetch_call);
+       if (dfc == NULL) {
+               DEBUG(DEBUG_ERR, ("Failed to allocate deferred fetch call structure\n"));
+               return -1;
+       }
+
+       dfc->w = talloc(dfc, struct ctdb_daemon_packet_wrap);
+       if (dfc->w == NULL) {
+               DEBUG(DEBUG_ERR,("Failed to allocate deferred fetch daemon packet wrap structure\n"));
+               talloc_free(dfc);
+               return -1;
+       }
+
+       dfc->c = talloc_steal(dfc, c);
+       dfc->w->ctdb = ctdb_db->ctdb;
+       dfc->w->client_id = client->client_id;
+
+       DLIST_ADD_END(dfq->deferred_calls, dfc, NULL);
+
+       return 0;
+}
+
 
 /*
   this is called when the ctdb daemon received a ctdb request call
@@ -459,6 +644,16 @@ static void daemon_request_call_from_client(struct ctdb_client *client,
                return;
        }
 
+       if (ctdb->tunable.fetch_lock_collapse == 1) {
+               if (requeue_duplicate_fetch(ctdb_db, client, key, c) == 0) {
+                       ret = ctdb_ltdb_unlock(ctdb_db, key);
+                       if (ret != 0) {
+                               DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
+                       }
+                       return;
+               }
+       }
+
        dstate = talloc(client, struct daemon_call_state);
        if (dstate == NULL) {
                ret = ctdb_ltdb_unlock(ctdb_db, key);
@@ -498,6 +693,14 @@ static void daemon_request_call_from_client(struct ctdb_client *client,
                state = ctdb_call_local_send(ctdb_db, call, &header, &data);
        } else {
                state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
+               if (ctdb->tunable.fetch_lock_collapse == 1) {
+                       /* This request triggered a remote fetch-lock.
+                          set up a deferral for this key so any additional
+                          fetch-locks are deferred until the current one
+                          finishes.
+                        */
+                       setup_deferred_fetch_locks(ctdb_db, call);
+               }
        }
 
        ret = ctdb_ltdb_unlock(ctdb_db, key);
index 3d18e06155802947bb1d35108feec4a6750bf820..39dfdf3dba9bb7a3474782db1df026e8d2dee1a3 100644 (file)
@@ -905,6 +905,17 @@ again:
                }
        }
 
+       /* set up a rb tree we can use to track which records we have a 
+          fetch-lock in-flight for so we can defer any additional calls
+          for the same record.
+        */
+       ctdb_db->deferred_fetch = trbt_create(ctdb_db, 0);
+       if (ctdb_db->deferred_fetch == NULL) {
+               DEBUG(DEBUG_ERR,("Failed to create deferred fetch rb tree for ctdb database\n"));
+               talloc_free(ctdb_db);
+               return -1;
+       }
+
        DLIST_ADD(ctdb->db_list, ctdb_db);
 
        /* setting this can help some high churn databases */
index 092c30fe9b404265a11c8db3d7139f2fbab342a2..56eadc5e20ef8e60e071135675435d2c0ef620dc 100644 (file)
@@ -69,7 +69,8 @@ static const struct {
        { "AllowUnhealthyDBRead", 0,  offsetof(struct ctdb_tunable, allow_unhealthy_db_read) },
        { "StatHistoryInterval",  1,  offsetof(struct ctdb_tunable, stat_history_interval) },
        { "DeferredAttachTO",  120,  offsetof(struct ctdb_tunable, deferred_attach_timeout) },
-       { "AllowClientDBAttach", 1, offsetof(struct ctdb_tunable, allow_client_db_attach) }
+       { "AllowClientDBAttach", 1, offsetof(struct ctdb_tunable, allow_client_db_attach) },
+       { "FetchLockCollapse", 1, offsetof(struct ctdb_tunable, fetch_lock_collapse) }
 };
 
 /*