return ctdb_db;
}
-
/*
a varient of input packet that can be used in lock requeue
*/
call->key.dsize = c->keylen;
call->call_data.dptr = c->data + c->keylen;
call->call_data.dsize = c->calldatalen;
+ call->reply_data.dptr = NULL;
+ call->reply_data.dsize = 0;
/* determine if we are the dmaster for this key. This also
fetches the record data (if any), thus avoiding a 2nd fetch of the data
return;
}
- /* if we are not the dmaster, then send a redirect to the
- requesting node */
- if (header.dmaster != ctdb->pnn) {
+ /* Dont do READONLY if we dont have a tracking database */
+ if ((c->flags & CTDB_WANT_READONLY) && ctdb_db->rottdb == NULL) {
+ c->flags &= ~CTDB_WANT_READONLY;
+ }
+
+ if (header.flags & CTDB_REC_RO_REVOKE_COMPLETE) {
+ header.flags &= ~(CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY|CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_REVOKE_COMPLETE);
+ if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) {
+ ctdb_fatal(ctdb, "Failed to write header with cleared REVOKE flag");
+ }
+ }
+
+ /* if we are revoking, we must defer all other calls until the revoke
+ * had completed.
+ */
+ if (header.flags & CTDB_REC_RO_REVOKING_READONLY) {
+ talloc_free(data.dptr);
+ ret = ctdb_ltdb_unlock(ctdb_db, call->key);
+
+ if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, call->key, hdr, ctdb_call_input_pkt, ctdb) != 0) {
+ ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
+ }
+ talloc_free(call);
+ return;
+ }
+
+ /* if we are not the dmaster and are not hosting any delegations,
+ then send a redirect to the requesting node */
+ if ((header.dmaster != ctdb->pnn)
+ && (!(header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) ) {
talloc_free(data.dptr);
ctdb_call_send_redirect(ctdb, call->key, c, &header);
return;
}
+ if ( (!(c->flags & CTDB_WANT_READONLY))
+ && (header.flags & (CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY)) ) {
+ header.flags |= CTDB_REC_RO_REVOKING_READONLY;
+ if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) {
+ ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
+ }
+ ret = ctdb_ltdb_unlock(ctdb_db, call->key);
+
+ if (ctdb_start_revoke_ro_record(ctdb, ctdb_db, call->key, &header, data) != 0) {
+ ctdb_fatal(ctdb, "Failed to start record revoke");
+ }
+ talloc_free(data.dptr);
+
+ if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, call->key, hdr, ctdb_call_input_pkt, ctdb) != 0) {
+ ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
+ }
+ talloc_free(call);
+
+ return;
+ }
+
+ /* If this is the first request for delegation. bump rsn and set
+ * the delegations flag
+ */
+ if ((c->flags & CTDB_WANT_READONLY)
+ && (c->callid == CTDB_FETCH_WITH_HEADER_FUNC)
+ && (!(header.flags & CTDB_REC_RO_HAVE_DELEGATIONS))) {
+ header.rsn += 3;
+ header.flags |= CTDB_REC_RO_HAVE_DELEGATIONS;
+ if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) {
+ ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
+ }
+ }
+ if ((c->flags & CTDB_WANT_READONLY)
+ && (call->call_id == CTDB_FETCH_WITH_HEADER_FUNC)) {
+ TDB_DATA tdata;
+
+ tdata = tdb_fetch(ctdb_db->rottdb, call->key);
+ if (ctdb_trackingdb_add_pnn(ctdb, &tdata, c->hdr.srcnode) != 0) {
+ ctdb_fatal(ctdb, "Failed to add node to trackingdb");
+ }
+ if (tdb_store(ctdb_db->rottdb, call->key, tdata, TDB_REPLACE) != 0) {
+ ctdb_fatal(ctdb, "Failed to store trackingdb data");
+ }
+ free(tdata.dptr);
+
+ ret = ctdb_ltdb_unlock(ctdb_db, call->key);
+ if (ret != 0) {
+ DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
+ }
+
+ len = offsetof(struct ctdb_reply_call, data) + data.dsize + sizeof(struct ctdb_ltdb_header);
+ r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REPLY_CALL, len,
+ struct ctdb_reply_call);
+ CTDB_NO_MEMORY_FATAL(ctdb, r);
+ r->hdr.destnode = c->hdr.srcnode;
+ r->hdr.reqid = c->hdr.reqid;
+ r->status = 0;
+ r->datalen = data.dsize + sizeof(struct ctdb_ltdb_header);
+ header.rsn -= 2;
+ header.flags |= CTDB_REC_RO_HAVE_READONLY;
+ header.flags &= ~CTDB_REC_RO_HAVE_DELEGATIONS;
+ memcpy(&r->data[0], &header, sizeof(struct ctdb_ltdb_header));
+
+ if (data.dsize) {
+ memcpy(&r->data[sizeof(struct ctdb_ltdb_header)], data.dptr, data.dsize);
+ }
+
+ ctdb_queue_packet(ctdb, &r->hdr);
+
+ talloc_free(r);
+ return;
+ }
+
CTDB_UPDATE_STAT(ctdb, max_hop_count, c->hopcount);
/* Try if possible to migrate the record off to the caller node.
void *ctx;
};
-static void deferred_call_requeue(struct event_context *ev, struct timed_event *te,
- struct timeval t, void *private_data)
+static void deferred_call_requeue(struct event_context *ev, struct timed_event *te,
+ struct timeval t, void *private_data)
{
struct revokechild_requeue_handle *requeue_handle = talloc_get_type(private_data, struct revokechild_requeue_handle);
return 0;
}
-static void revokechild_handler(struct event_context *ev, struct fd_event *fde,
- uint16_t flags, void *private_data)
+static void revokechild_handler(struct event_context *ev, struct fd_event *fde,
+ uint16_t flags, void *private_data)
{
- struct revokechild_handle *rc = talloc_get_type(private_data,
+ struct revokechild_handle *rc = talloc_get_type(private_data,
struct revokechild_handle);
int ret;
char c;
revoke_state = state->async.private_data;
state->async.fn = NULL;
- ret = ctdb_control_recv(state->ctdb, state, state, NULL, &res, NULL);
- if ((ret != 0) || (res != 0)) {
+ ret = ctdb_control_recv(state->ctdb, state, state, NULL, &res, NULL);
+ if ((ret != 0) || (res != 0)) {
DEBUG(DEBUG_ERR,("Recv for revoke update record failed ret:%d res:%d\n", ret, res));
revoke_state->status = -1;
}
}
-static void ctdb_revoke_timeout_handler(struct event_context *ev, struct timed_event *te,
- struct timeval yt, void *private_data)
+static void ctdb_revoke_timeout_handler(struct event_context *ev, struct timed_event *te,
+ struct timeval yt, void *private_data)
{
struct ctdb_revoke_state *state = private_data;
state->key = key;
state->header = header;
state->data = data;
-
+
ctdb_trackingdb_traverse(ctdb, tdata, revoke_send_cb, state);
event_add_timed(ctdb->ev, state, timeval_current_ofs(5, 0), ctdb_revoke_timeout_handler, state);