#include "system/wait.h"
#include "../include/ctdb_client.h"
#include "../include/ctdb_private.h"
-#include "../common/rb_tree.h"
#include <sys/socket.h>
struct ctdb_client_pid_list {
daemon_incoming_packet(client, hdr);
}
-struct ctdb_deferred_fetch_call {
- struct ctdb_deferred_fetch_call *next, *prev;
- struct ctdb_req_call *c;
- struct ctdb_daemon_packet_wrap *w;
-};
-
-struct ctdb_deferred_fetch_queue {
- struct ctdb_deferred_fetch_call *deferred_calls;
-};
-
-struct ctdb_deferred_requeue {
- struct ctdb_deferred_fetch_call *dfc;
- struct ctdb_client *client;
-};
-
-/* called from a timer event and starts reprocessing the deferred call.*/
-static void reprocess_deferred_call(struct event_context *ev, struct timed_event *te,
- struct timeval t, void *private_data)
-{
- struct ctdb_deferred_requeue *dfr = (struct ctdb_deferred_requeue *)private_data;
- struct ctdb_client *client = dfr->client;
-
- talloc_steal(client, dfr->dfc->c);
- daemon_incoming_packet(client, (struct ctdb_req_header *)dfr->dfc->c);
- talloc_free(dfr);
-}
-
-/* the referral context is destroyed either after a timeout or when the initial
- fetch-lock has finished.
- at this stage, immediately start reprocessing the queued up deferred
- calls so they get reprocessed immediately (and since we are dmaster at
- this stage, trigger the waiting smbd processes to pick up and aquire the
- record right away.
-*/
-static int deferred_fetch_queue_destructor(struct ctdb_deferred_fetch_queue *dfq)
-{
-
- /* need to reprocess the packets from the queue explicitely instead of
- just using a normal destructor since we want, need, to
- call the clients in the same oder as the requests queued up
- */
- while (dfq->deferred_calls != NULL) {
- struct ctdb_client *client;
- struct ctdb_deferred_fetch_call *dfc = dfq->deferred_calls;
- struct ctdb_deferred_requeue *dfr;
-
- DLIST_REMOVE(dfq->deferred_calls, dfc);
-
- client = ctdb_reqid_find(dfc->w->ctdb, dfc->w->client_id, struct ctdb_client);
- if (client == NULL) {
- DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
- dfc->w->client_id));
- continue;
- }
-
- /* process it by pushing it back onto the eventloop */
- dfr = talloc(client, struct ctdb_deferred_requeue);
- if (dfr == NULL) {
- DEBUG(DEBUG_ERR,("Failed to allocate deferred fetch requeue structure\n"));
- continue;
- }
-
- dfr->dfc = talloc_steal(dfr, dfc);
- dfr->client = client;
-
- event_add_timed(dfc->w->ctdb->ev, client, timeval_zero(), reprocess_deferred_call, dfr);
- }
-
- return 0;
-}
-
-/* insert the new deferral context into the rb tree.
- there should never be a pre-existing context here, but check for it
- warn and destroy the previous context if there is already a deferral context
- for this key.
-*/
-static void *insert_dfq_callback(void *parm, void *data)
-{
- if (data) {
- DEBUG(DEBUG_ERR,("Already have DFQ registered. Free old %p and create new %p\n", data, parm));
- talloc_free(data);
- }
- return parm;
-}
-
-/* if the original fetch-lock did not complete within a reasonable time,
- free the context and context for all deferred requests to cause them to be
- re-inserted into the event system.
-*/
-static void dfq_timeout(struct event_context *ev, struct timed_event *te,
- struct timeval t, void *private_data)
-{
- talloc_free(private_data);
-}
-
-/* This function is used in the local daemon to register a KEY in a database
- for being "fetched"
- While the remote fetch is in-flight, any futher attempts to re-fetch the
- same record will be deferred until the fetch completes.
-*/
-static int setup_deferred_fetch_locks(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
-{
- uint32_t *k;
- struct ctdb_deferred_fetch_queue *dfq;
-
- k = talloc_zero_size(call, ((call->key.dsize + 3) & 0xfffffffc) + 4);
- if (k == NULL) {
- DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch\n"));
- return -1;
- }
-
- k[0] = (call->key.dsize + 3) / 4 + 1;
- memcpy(&k[1], call->key.dptr, call->key.dsize);
-
- dfq = talloc(call, struct ctdb_deferred_fetch_queue);
- if (dfq == NULL) {
- DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch queue structure\n"));
- talloc_free(k);
- return -1;
- }
- dfq->deferred_calls = NULL;
-
- trbt_insertarray32_callback(ctdb_db->deferred_fetch, k[0], &k[0], insert_dfq_callback, dfq);
-
- talloc_set_destructor(dfq, deferred_fetch_queue_destructor);
-
- /* if the fetch havent completed in 30 seconds, just tear it all down
- and let it try again as the events are reissued */
- event_add_timed(ctdb_db->ctdb->ev, dfq, timeval_current_ofs(30, 0), dfq_timeout, dfq);
-
- talloc_free(k);
- return 0;
-}
-
-/* check if this is a duplicate request to a fetch already in-flight
- if it is, make this call deferred to be reprocessed later when
- the in-flight fetch completes.
-*/
-static int requeue_duplicate_fetch(struct ctdb_db_context *ctdb_db, struct ctdb_client *client, TDB_DATA key, struct ctdb_req_call *c)
-{
- uint32_t *k;
- struct ctdb_deferred_fetch_queue *dfq;
- struct ctdb_deferred_fetch_call *dfc;
-
- k = talloc_zero_size(c, ((key.dsize + 3) & 0xfffffffc) + 4);
- if (k == NULL) {
- DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch\n"));
- return -1;
- }
-
- k[0] = (key.dsize + 3) / 4 + 1;
- memcpy(&k[1], key.dptr, key.dsize);
-
- dfq = trbt_lookuparray32(ctdb_db->deferred_fetch, k[0], &k[0]);
- if (dfq == NULL) {
- talloc_free(k);
- return -1;
- }
-
-
- talloc_free(k);
-
- dfc = talloc(dfq, struct ctdb_deferred_fetch_call);
- if (dfc == NULL) {
- DEBUG(DEBUG_ERR, ("Failed to allocate deferred fetch call structure\n"));
- return -1;
- }
-
- dfc->w = talloc(dfc, struct ctdb_daemon_packet_wrap);
- if (dfc->w == NULL) {
- DEBUG(DEBUG_ERR,("Failed to allocate deferred fetch daemon packet wrap structure\n"));
- talloc_free(dfc);
- return -1;
- }
-
- dfc->c = talloc_steal(dfc, c);
- dfc->w->ctdb = ctdb_db->ctdb;
- dfc->w->client_id = client->client_id;
-
- DLIST_ADD_END(dfq->deferred_calls, dfc, NULL);
-
- return 0;
-}
-
/*
this is called when the ctdb daemon received a ctdb request call
return;
}
- if (ctdb->tunable.fetch_lock_collapse == 1) {
- if (requeue_duplicate_fetch(ctdb_db, client, key, c) == 0) {
- ret = ctdb_ltdb_unlock(ctdb_db, key);
- if (ret != 0) {
- DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
- }
- return;
- }
- }
-
dstate = talloc(client, struct daemon_call_state);
if (dstate == NULL) {
ret = ctdb_ltdb_unlock(ctdb_db, key);
state = ctdb_call_local_send(ctdb_db, call, &header, &data);
} else {
state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
- if (ctdb->tunable.fetch_lock_collapse == 1) {
- /* This request triggered a remote fetch-lock.
- set up a deferral for this key so any additional
- fetch-locks are deferred until the current one
- finishes.
- */
- setup_deferred_fetch_locks(ctdb_db, call);
- }
}
ret = ctdb_ltdb_unlock(ctdb_db, key);