ReadOnly: Add processing for ReadOnly delegation requests and revoke requests to...
authorRonnie Sahlberg <ronniesahlberg@gmail.com>
Wed, 20 Jul 2011 05:13:47 +0000 (15:13 +1000)
committerRonnie Sahlberg <ronniesahlberg@gmail.com>
Tue, 23 Aug 2011 00:32:02 +0000 (10:32 +1000)
This implements the ReadOnly and ReadWrite request processing, delegation and revoking of delegations for all requests coming in across the network from a remote node.

server/ctdb_call.c

index 7d1dd56c6d56d828b09053c784215c87669774ad..2df86b46c9474a3060e2ff1fdce9492c17ea51fd 100644 (file)
@@ -43,7 +43,6 @@
        return ctdb_db;
 }
 
-
 /*
   a varient of input packet that can be used in lock requeue
 */
@@ -489,6 +488,8 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
        call->key.dsize = c->keylen;
        call->call_data.dptr = c->data + c->keylen;
        call->call_data.dsize = c->calldatalen;
+       call->reply_data.dptr  = NULL;
+       call->reply_data.dsize = 0;
 
        /* determine if we are the dmaster for this key. This also
           fetches the record data (if any), thus avoiding a 2nd fetch of the data 
@@ -505,9 +506,36 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
                return;
        }
 
-       /* if we are not the dmaster, then send a redirect to the
-          requesting node */
-       if (header.dmaster != ctdb->pnn) {
+       /* Dont do READONLY if we dont have a tracking database */
+       if ((c->flags & CTDB_WANT_READONLY) && ctdb_db->rottdb == NULL) {
+               c->flags &= ~CTDB_WANT_READONLY;
+       }
+
+       if (header.flags & CTDB_REC_RO_REVOKE_COMPLETE) {
+               header.flags &= ~(CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY|CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_REVOKE_COMPLETE);
+               if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) {
+                       ctdb_fatal(ctdb, "Failed to write header with cleared REVOKE flag");
+               }
+       }
+
+       /* if we are revoking, we must defer all other calls until the revoke
+        * had completed.
+        */
+       if (header.flags & CTDB_REC_RO_REVOKING_READONLY) {
+               talloc_free(data.dptr);
+               ret = ctdb_ltdb_unlock(ctdb_db, call->key);
+
+               if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, call->key, hdr, ctdb_call_input_pkt, ctdb) != 0) {
+                       ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
+               }
+               talloc_free(call);
+               return;
+       }
+
+       /* if we are not the dmaster and are not hosting any delegations,
+          then send a redirect to the requesting node */
+       if ((header.dmaster != ctdb->pnn) 
+           && (!(header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) ) {
                talloc_free(data.dptr);
                ctdb_call_send_redirect(ctdb, call->key, c, &header);
 
@@ -518,6 +546,80 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
                return;
        }
 
+       if ( (!(c->flags & CTDB_WANT_READONLY))
+       && (header.flags & (CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY)) ) {
+               header.flags   |= CTDB_REC_RO_REVOKING_READONLY;
+               if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) {
+                       ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
+               }
+               ret = ctdb_ltdb_unlock(ctdb_db, call->key);
+
+               if (ctdb_start_revoke_ro_record(ctdb, ctdb_db, call->key, &header, data) != 0) {
+                       ctdb_fatal(ctdb, "Failed to start record revoke");
+               }
+               talloc_free(data.dptr);
+
+               if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, call->key, hdr, ctdb_call_input_pkt, ctdb) != 0) {
+                       ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
+               }
+               talloc_free(call);
+
+               return;
+       }               
+
+       /* If this is the first request for delegation. bump rsn and set
+        * the delegations flag
+        */
+       if ((c->flags & CTDB_WANT_READONLY)
+       &&  (c->callid == CTDB_FETCH_WITH_HEADER_FUNC)
+       &&  (!(header.flags & CTDB_REC_RO_HAVE_DELEGATIONS))) {
+               header.rsn     += 3;
+               header.flags   |= CTDB_REC_RO_HAVE_DELEGATIONS;
+               if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) {
+                       ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
+               }
+       }
+       if ((c->flags & CTDB_WANT_READONLY) 
+       &&  (call->call_id == CTDB_FETCH_WITH_HEADER_FUNC)) {
+               TDB_DATA tdata;
+
+               tdata = tdb_fetch(ctdb_db->rottdb, call->key);
+               if (ctdb_trackingdb_add_pnn(ctdb, &tdata, c->hdr.srcnode) != 0) {
+                       ctdb_fatal(ctdb, "Failed to add node to trackingdb");
+               }
+               if (tdb_store(ctdb_db->rottdb, call->key, tdata, TDB_REPLACE) != 0) {
+                       ctdb_fatal(ctdb, "Failed to store trackingdb data");
+               }
+               free(tdata.dptr);
+
+               ret = ctdb_ltdb_unlock(ctdb_db, call->key);
+               if (ret != 0) {
+                       DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
+               }
+
+               len = offsetof(struct ctdb_reply_call, data) + data.dsize + sizeof(struct ctdb_ltdb_header);
+               r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REPLY_CALL, len, 
+                                           struct ctdb_reply_call);
+               CTDB_NO_MEMORY_FATAL(ctdb, r);
+               r->hdr.destnode  = c->hdr.srcnode;
+               r->hdr.reqid     = c->hdr.reqid;
+               r->status        = 0;
+               r->datalen       = data.dsize + sizeof(struct ctdb_ltdb_header);
+               header.rsn      -= 2;
+               header.flags   |= CTDB_REC_RO_HAVE_READONLY;
+               header.flags   &= ~CTDB_REC_RO_HAVE_DELEGATIONS;
+               memcpy(&r->data[0], &header, sizeof(struct ctdb_ltdb_header));
+
+               if (data.dsize) {
+                       memcpy(&r->data[sizeof(struct ctdb_ltdb_header)], data.dptr, data.dsize);
+               }
+
+               ctdb_queue_packet(ctdb, &r->hdr);
+
+               talloc_free(r);
+               return;
+       }
+
        CTDB_UPDATE_STAT(ctdb, max_hop_count, c->hopcount);
 
        /* Try if possible to migrate the record off to the caller node.
@@ -922,8 +1024,8 @@ struct revokechild_requeue_handle {
        void *ctx;
 };
 
-static void deferred_call_requeue(struct event_context *ev, struct timed_event *te,
-                     struct timeval t, void *private_data)
+static void deferred_call_requeue(struct event_context *ev, struct timed_event *te, 
+                      struct timeval t, void *private_data)
 {
        struct revokechild_requeue_handle *requeue_handle = talloc_get_type(private_data, struct revokechild_requeue_handle);
 
@@ -968,10 +1070,10 @@ static int revokechild_destructor(struct revokechild_handle *rc)
        return 0;
 }
 
-static void revokechild_handler(struct event_context *ev, struct fd_event *fde,
-                    uint16_t flags, void *private_data)
+static void revokechild_handler(struct event_context *ev, struct fd_event *fde, 
+                            uint16_t flags, void *private_data)
 {
-       struct revokechild_handle *rc = talloc_get_type(private_data,
+       struct revokechild_handle *rc = talloc_get_type(private_data, 
                                                     struct revokechild_handle);
        int ret;
        char c;
@@ -1015,8 +1117,8 @@ static void update_record_cb(struct ctdb_client_control_state *state)
        revoke_state = state->async.private_data;
 
        state->async.fn = NULL;
-        ret = ctdb_control_recv(state->ctdb, state, state, NULL, &res, NULL);
-        if ((ret != 0) || (res != 0)) {
+        ret = ctdb_control_recv(state->ctdb, state, state, NULL, &res, NULL);
+        if ((ret != 0) || (res != 0)) {
                DEBUG(DEBUG_ERR,("Recv for revoke update record failed ret:%d res:%d\n", ret, res));
                revoke_state->status = -1;
        }
@@ -1045,8 +1147,8 @@ static void revoke_send_cb(struct ctdb_context *ctdb, uint32_t pnn, void *privat
 
 }
 
-static void ctdb_revoke_timeout_handler(struct event_context *ev, struct timed_event *te,
-                            struct timeval yt, void *private_data)
+static void ctdb_revoke_timeout_handler(struct event_context *ev, struct timed_event *te, 
+                             struct timeval yt, void *private_data)
 {
        struct ctdb_revoke_state *state = private_data;
 
@@ -1064,7 +1166,7 @@ static int ctdb_revoke_all_delegations(struct ctdb_context *ctdb, struct ctdb_db
        state->key     = key;
        state->header  = header;
        state->data    = data;
-
        ctdb_trackingdb_traverse(ctdb, tdata, revoke_send_cb, state);
 
        event_add_timed(ctdb->ev, state, timeval_current_ofs(5, 0), ctdb_revoke_timeout_handler, state);