Record Fetch Collapse: Collapse multiple fetch request into one single request.
authorRonnie Sahlberg <ronniesahlberg@gmail.com>
Mon, 7 Nov 2011 19:55:46 +0000 (06:55 +1100)
committerRonnie Sahlberg <ronniesahlberg@gmail.com>
Tue, 8 Nov 2011 05:08:28 +0000 (16:08 +1100)
When multiple clients fetch the same record concurrently, send only one single
fetch across the network and deferr all other fetches locally.
This improves performance for hot records and reduces cpu load on ctdb.

(This used to be ctdb commit 82d6946ad8b3348e8b9d3d971f24925ade02d1be)

ctdb/include/ctdb_private.h
ctdb/server/ctdb_daemon.c
ctdb/server/ctdb_ltdb_server.c

index 7e5947301338eb738a08052285c8c09e1fc7cade..f1818b98dec7c8af44797d48dbec61bdf520fe5d 100644 (file)
@@ -527,6 +527,10 @@ struct ctdb_db_context {
                                  struct ctdb_ltdb_header *header,
                                  TDB_DATA data);
 
+       /* used to track which records we are currently fetching
+          so we can avoid sending duplicate fetch requests
+       */
+       struct trbt_tree *deferred_fetch;
 };
 
 
index 88d12103f77606a9fe67ec8750a4c7608b0bf723..69fb6fb1f62695d444877a4081527d24303f7deb 100644 (file)
@@ -27,6 +27,7 @@
 #include "system/wait.h"
 #include "../include/ctdb_client.h"
 #include "../include/ctdb_private.h"
+#include "../common/rb_tree.h"
 #include <sys/socket.h>
 
 struct ctdb_client_pid_list {
@@ -359,6 +360,190 @@ static void daemon_incoming_packet_wrap(void *p, struct ctdb_req_header *hdr)
        daemon_incoming_packet(client, hdr);    
 }
 
+struct ctdb_deferred_fetch_call {
+       struct ctdb_deferred_fetch_call *next, *prev;
+       struct ctdb_req_call *c;
+       struct ctdb_daemon_packet_wrap *w;
+};
+
+struct ctdb_deferred_fetch_queue {
+       struct ctdb_deferred_fetch_call *deferred_calls;
+};
+
+struct ctdb_deferred_requeue {
+       struct ctdb_deferred_fetch_call *dfc;
+       struct ctdb_client *client;
+};
+
+/* called from a timer event and starts reprocessing the deferred call.*/
+static void reprocess_deferred_call(struct event_context *ev, struct timed_event *te, 
+                                      struct timeval t, void *private_data)
+{
+       struct ctdb_deferred_requeue *dfr = (struct ctdb_deferred_requeue *)private_data;
+       struct ctdb_client *client = dfr->client;
+
+       talloc_steal(client, dfr->dfc->c);
+       daemon_incoming_packet(client, (struct ctdb_req_header *)dfr->dfc->c);
+       talloc_free(dfr);
+}
+
+/* the referral context is destroyed either after a timeout or when the initial
+   fetch-lock has finished.
+   at this stage, immediately start reprocessing the queued up deferred
+   calls so they get reprocessed immediately (and since we are dmaster at
+   this stage, trigger the waiting smbd processes to pick up and aquire the
+   record right away.
+*/
+static int deferred_fetch_queue_destructor(struct ctdb_deferred_fetch_queue *dfq)
+{
+
+       /* need to reprocess the packets from the queue explicitely instead of
+          just using a normal destructor since we want, need, to
+          call the clients in the same oder as the requests queued up
+       */
+       while (dfq->deferred_calls != NULL) {
+               struct ctdb_client *client;
+               struct ctdb_deferred_fetch_call *dfc = dfq->deferred_calls;
+               struct ctdb_deferred_requeue *dfr;
+
+               DLIST_REMOVE(dfq->deferred_calls, dfc);
+
+               client = ctdb_reqid_find(dfc->w->ctdb, dfc->w->client_id, struct ctdb_client);
+               if (client == NULL) {
+                       DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
+                                dfc->w->client_id));
+                       continue;
+               }
+
+               /* process it by pushing it back onto the eventloop */
+               dfr = talloc(client, struct ctdb_deferred_requeue);
+               if (dfr == NULL) {
+                       DEBUG(DEBUG_ERR,("Failed to allocate deferred fetch requeue structure\n"));
+                       continue;
+               }
+
+               dfr->dfc    = talloc_steal(dfr, dfc);
+               dfr->client = client;
+
+               event_add_timed(dfc->w->ctdb->ev, client, timeval_zero(), reprocess_deferred_call, dfr);
+       }
+
+       return 0;
+}
+
+/* insert the new deferral context into the rb tree.
+   there should never be a pre-existing context here, but check for it
+   warn and destroy the previous context if there is already a deferral context
+   for this key.
+*/
+static void *insert_dfq_callback(void *parm, void *data)
+{
+        if (data) {
+               DEBUG(DEBUG_ERR,("Already have DFQ registered. Free old %p and create new %p\n", data, parm));
+                talloc_free(data);
+        }
+        return parm;
+}
+
+/* if the original fetch-lock did not complete within a reasonable time,
+   free the context and context for all deferred requests to cause them to be
+   re-inserted into the event system.
+*/
+static void dfq_timeout(struct event_context *ev, struct timed_event *te, 
+                                 struct timeval t, void *private_data)
+{
+       talloc_free(private_data);
+}
+
+/* This function is used in the local daemon to register a KEY in a database
+   for being "fetched"
+   While the remote fetch is in-flight, any futher attempts to re-fetch the
+   same record will be deferred until the fetch completes.
+*/
+static int setup_deferred_fetch_locks(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
+{
+       uint32_t *k;
+       struct ctdb_deferred_fetch_queue *dfq;
+
+       k = talloc_zero_size(call, ((call->key.dsize + 3) & 0xfffffffc) + 4);
+       if (k == NULL) {
+               DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch\n"));
+               return -1;
+       }
+
+       k[0] = (call->key.dsize + 3) / 4 + 1;
+       memcpy(&k[1], call->key.dptr, call->key.dsize);
+
+       dfq  = talloc(call, struct ctdb_deferred_fetch_queue);
+       if (dfq == NULL) {
+               DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch queue structure\n"));
+               talloc_free(k);
+               return -1;
+       }
+       dfq->deferred_calls = NULL;
+
+       trbt_insertarray32_callback(ctdb_db->deferred_fetch, k[0], &k[0], insert_dfq_callback, dfq);
+
+       talloc_set_destructor(dfq, deferred_fetch_queue_destructor);
+
+       /* if the fetch havent completed in 30 seconds, just tear it all down
+          and let it try again as the events are reissued */
+       event_add_timed(ctdb_db->ctdb->ev, dfq, timeval_current_ofs(30, 0), dfq_timeout, dfq);
+
+       talloc_free(k);
+       return 0;
+}
+
+/* check if this is a duplicate request to a fetch already in-flight
+   if it is, make this call deferred to be reprocessed later when
+   the in-flight fetch completes.
+*/
+static int requeue_duplicate_fetch(struct ctdb_db_context *ctdb_db, struct ctdb_client *client, TDB_DATA key, struct ctdb_req_call *c)
+{
+       uint32_t *k;
+       struct ctdb_deferred_fetch_queue *dfq;
+       struct ctdb_deferred_fetch_call *dfc;
+
+       k = talloc_zero_size(c, ((key.dsize + 3) & 0xfffffffc) + 4);
+       if (k == NULL) {
+               DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch\n"));
+               return -1;
+       }
+
+       k[0] = (key.dsize + 3) / 4 + 1;
+       memcpy(&k[1], key.dptr, key.dsize);
+
+       dfq = trbt_lookuparray32(ctdb_db->deferred_fetch, k[0], &k[0]);
+       if (dfq == NULL) {
+               talloc_free(k);
+               return -1;
+       }
+
+
+       talloc_free(k);
+
+       dfc = talloc(dfq, struct ctdb_deferred_fetch_call);
+       if (dfc == NULL) {
+               DEBUG(DEBUG_ERR, ("Failed to allocate deferred fetch call structure\n"));
+               return -1;
+       }
+
+       dfc->w = talloc(dfc, struct ctdb_daemon_packet_wrap);
+       if (dfc->w == NULL) {
+               DEBUG(DEBUG_ERR,("Failed to allocate deferred fetch daemon packet wrap structure\n"));
+               talloc_free(dfc);
+               return -1;
+       }
+
+       dfc->c = talloc_steal(dfc, c);
+       dfc->w->ctdb = ctdb_db->ctdb;
+       dfc->w->client_id = client->client_id;
+
+       DLIST_ADD_END(dfq->deferred_calls, dfc, NULL);
+
+       return 0;
+}
+
 
 /*
   this is called when the ctdb daemon received a ctdb request call
@@ -424,6 +609,20 @@ static void daemon_request_call_from_client(struct ctdb_client *client,
                return;
        }
 
+       if (c->flags & CTDB_IMMEDIATE_MIGRATION) {
+               /* check if this fetch-lock request is a duplicate for a
+                  request we already have in flight. If so defer it until
+                  the first request completes.
+                */
+               if (requeue_duplicate_fetch(ctdb_db, client, key, c) == 0) {
+                       ret = ctdb_ltdb_unlock(ctdb_db, key);
+                       if (ret != 0) {
+                               DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
+                       }
+                       return;
+               }
+       }
+
        /* Dont do READONLY if we dont have a tracking database */
        if ((c->flags & CTDB_WANT_READONLY) && !ctdb_db->readonly) {
                c->flags &= ~CTDB_WANT_READONLY;
@@ -513,6 +712,14 @@ static void daemon_request_call_from_client(struct ctdb_client *client,
                state = ctdb_call_local_send(ctdb_db, call, &header, &data);
        } else {
                state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
+               if (call->flags & CTDB_IMMEDIATE_MIGRATION) {
+                       /* This request triggered a remote fetch-lock.
+                          set up a deferral for this key so any additional
+                          fetch-locks are deferred until the current one
+                          finishes.
+                        */
+                       setup_deferred_fetch_locks(ctdb_db, call);
+               }
        }
 
        ret = ctdb_ltdb_unlock(ctdb_db, key);
index 745b3d256bc6e696541ac3c9b0e92a1029e75b2d..713606d75930efdf11facceb8546121e4475db24 100644 (file)
@@ -962,6 +962,17 @@ again:
                }
        }
 
+       /* set up a rb tree we can use to track which records we have a 
+          fetch-lock in-flight for so we can defer any additional calls
+          for the same record.
+        */
+       ctdb_db->deferred_fetch = trbt_create(ctdb_db, 0);
+       if (ctdb_db->deferred_fetch == NULL) {
+               DEBUG(DEBUG_ERR,("Failed to create deferred fetch rb tree for ctdb database\n"));
+               talloc_free(ctdb_db);
+               return -1;
+       }
+
        DLIST_ADD(ctdb->db_list, ctdb_db);
 
        /* setting this can help some high churn databases */