2 ctdb_call protocol code
4 Copyright (C) Andrew Tridgell 2006
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
20 see http://wiki.samba.org/index.php/Samba_%26_Clustering for
21 protocol design and packet details
24 #include "lib/tdb/include/tdb.h"
25 #include "lib/util/dlinklist.h"
26 #include "system/network.h"
27 #include "system/filesys.h"
28 #include "../include/ctdb_private.h"
29 #include "../common/rb_tree.h"
/*
  State kept for one "sticky" record. Instances are stored in, and looked up
  from, ctdb_db->sticky_records by the sticky-record functions in this file.
  Those functions also use a `pindown` member of this structure.
*/
31 struct ctdb_sticky_record {
/* ctdb context (NOTE(review): presumably the daemon context - confirm) */
32 struct ctdb_context *ctdb;
/* database this sticky record belongs to; set by ctdb_make_record_sticky() */
33 struct ctdb_db_context *ctdb_db;
38 find the ctdb_db from a db index
/*
  Look up an attached database by its id.

  ctdb - context whose db_list is searched
  id   - database id to match against each entry's db_id

  Callers in this file treat a NULL result as "unknown database".
*/
40 struct ctdb_db_context *find_ctdb_db(struct ctdb_context *ctdb, uint32_t id)
42 struct ctdb_db_context *ctdb_db;
/* linear walk of the linked list of databases */
44 for (ctdb_db=ctdb->db_list; ctdb_db; ctdb_db=ctdb_db->next) {
45 if (ctdb_db->db_id == id) {
53 a variant of the input packet handler that can be used in lock requeue
/*
  Callback with a (void *, header) signature, passed in this file to the
  lock-requeue and deferred-call helpers.

  p   - the ctdb context, recovered with talloc_get_type()
  hdr - the packet to hand back to ctdb_input_pkt()
*/
55 static void ctdb_call_input_pkt(void *p, struct ctdb_req_header *hdr)
57 struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
58 ctdb_input_pkt(ctdb, hdr);
/*
  Send a CTDB_REPLY_ERROR packet back to the node a request came from.

  ctdb    - context whose transport is used
  hdr     - header of the failed request; its srcnode becomes the reply's
            destnode and its reqid is copied into the reply
  status  - status code to report
            (NOTE(review): confirm where it is stored in the reply)
  fmt,... - printf-style message (checked via PRINTF_ATTRIBUTE); the
            formatted text, including its trailing NUL, is copied into
            the reply's msg field
*/
65 static void ctdb_send_error(struct ctdb_context *ctdb,
66 struct ctdb_req_header *hdr, uint32_t status,
67 const char *fmt, ...) PRINTF_ATTRIBUTE(4,5);
68 static void ctdb_send_error(struct ctdb_context *ctdb,
69 struct ctdb_req_header *hdr, uint32_t status,
73 struct ctdb_reply_error *r;
/* no transport methods set up: only log that the error cannot be sent */
77 if (ctdb->methods == NULL) {
78 DEBUG(DEBUG_INFO,(__location__ " Failed to send error. Transport is DOWN\n"));
83 msg = talloc_vasprintf(ctdb, fmt, ap);
85 ctdb_fatal(ctdb, "Unable to allocate error in ctdb_send_error\n");
/* msglen counts the terminating NUL so it travels with the message */
89 msglen = strlen(msg)+1;
90 len = offsetof(struct ctdb_reply_error, msg);
/* the reply packet is allocated with msg as its talloc parent */
91 r = ctdb_transport_allocate(ctdb, msg, CTDB_REPLY_ERROR, len + msglen,
92 struct ctdb_reply_error);
93 CTDB_NO_MEMORY_FATAL(ctdb, r);
95 r->hdr.destnode = hdr->srcnode;
96 r->hdr.reqid = hdr->reqid;
99 memcpy(&r->msg[0], msg, msglen);
101 ctdb_queue_packet(ctdb, &r->hdr);
108 * send a redirect reply
110 * The logic behind this function is this:
112 * A client wants to grab a record and sends a CTDB_REQ_CALL packet
113 * to its local ctdb (ctdb_request_call). If the node is not itself
114 * the record's DMASTER, it first redirects the packet to the
115 * record's LMASTER. The LMASTER then redirects the call packet to
116 * the current DMASTER. But there is a race: The record may have
117 * been migrated off the DMASTER while the redirected packet is
118 * on the wire (or in the local queue). So in case the record has
119 * migrated off the new destination of the call packet, instead of
120 * going back to the LMASTER to get the new DMASTER, we try to
121 * reduce round-trips by first chasing the record a couple of times
122 * before giving up the direct chase and finally going back to the
123 * LMASTER (again). Note that this works because of this: When
124 * a record is migrated off a node, then the new DMASTER is stored
125 * in the record's copy on the former DMASTER.
127 * The maximum number of attempts for direct chase to make before
128 * going back to the LMASTER is configurable by the tunable
129 * "MaxRedirectCount".
/*
  Forward a CTDB_REQ_CALL packet towards the node that should handle it.

  ctdb    - local context
  ctdb_db - database the call refers to (used here for logging only)
  c       - the call packet; its header destnode is rewritten and the
            same packet is queued again
  header  - local copy of the record header, consulted for its dmaster

  The packet is addressed to the key's lmaster, unless this node is the
  lmaster, in which case it goes to header->dmaster.
*/
131 static void ctdb_call_send_redirect(struct ctdb_context *ctdb,
132 struct ctdb_db_context *ctdb_db,
134 struct ctdb_req_call *c,
135 struct ctdb_ltdb_header *header)
138 uint32_t lmaster = ctdb_lmaster(ctdb, &key);
140 c->hdr.destnode = lmaster;
141 if (ctdb->pnn == lmaster) {
142 c->hdr.destnode = header->dmaster;
/* emit a warning whenever the hop count is 99 modulo 100 */
146 if (c->hopcount%100 == 99) {
147 DEBUG(DEBUG_WARNING,("High hopcount %d dbid:0x%08x "
148 "key:0x%08x pnn:%d src:%d lmaster:%d "
149 "header->dmaster:%d dst:%d\n",
150 c->hopcount, ctdb_db->db_id, ctdb_hash(&key),
151 ctdb->pnn, c->hdr.srcnode, lmaster,
152 header->dmaster, c->hdr.destnode));
155 ctdb_queue_packet(ctdb, &c->hdr);
162 caller must have the chainlock before calling this routine. Caller must be
/*
  Record new_dmaster as the dmaster of a record in the local database and
  send it a CTDB_REPLY_DMASTER packet carrying the record.

  ctdb_db     - database holding the record
  header      - record header; its dmaster field is overwritten and it is
                stored locally together with data
  key, data   - record key and contents, copied into the packet
  new_dmaster - node that receives the reply
  (a further request-id argument is used for the reply header's reqid)

  Packet payload layout after the fixed header: key bytes, data bytes,
  then the 32-bit header->flags value.
*/
165 static void ctdb_send_dmaster_reply(struct ctdb_db_context *ctdb_db,
166 struct ctdb_ltdb_header *header,
167 TDB_DATA key, TDB_DATA data,
168 uint32_t new_dmaster,
171 struct ctdb_context *ctdb = ctdb_db->ctdb;
172 struct ctdb_reply_dmaster *r;
/* this routine is only meant to run on the key's lmaster */
176 if (ctdb->pnn != ctdb_lmaster(ctdb, &key)) {
177 DEBUG(DEBUG_ALERT,(__location__ " Caller is not lmaster!\n"));
/* update the local copy first so it names the new dmaster */
181 header->dmaster = new_dmaster;
182 ret = ctdb_ltdb_store(ctdb_db, key, header, data);
184 ctdb_fatal(ctdb, "ctdb_send_dmaster_reply unable to update dmaster");
188 if (ctdb->methods == NULL) {
189 ctdb_fatal(ctdb, "ctdb_send_dmaster_reply cant update dmaster since transport is down");
193 /* put the packet on a temporary context, allowing us to safely free
194 it below even if ctdb_reply_dmaster() has freed it already */
195 tmp_ctx = talloc_new(ctdb);
197 /* send the CTDB_REPLY_DMASTER */
198 len = offsetof(struct ctdb_reply_dmaster, data) + key.dsize + data.dsize + sizeof(uint32_t);
199 r = ctdb_transport_allocate(ctdb, tmp_ctx, CTDB_REPLY_DMASTER, len,
200 struct ctdb_reply_dmaster);
201 CTDB_NO_MEMORY_FATAL(ctdb, r);
203 r->hdr.destnode = new_dmaster;
204 r->hdr.reqid = reqid;
205 r->rsn = header->rsn;
206 r->keylen = key.dsize;
207 r->datalen = data.dsize;
208 r->db_id = ctdb_db->db_id;
209 memcpy(&r->data[0], key.dptr, key.dsize);
210 memcpy(&r->data[key.dsize], data.dptr, data.dsize);
/* record flags travel as a trailing 32-bit value after key and data */
211 memcpy(&r->data[key.dsize+data.dsize], &header->flags, sizeof(uint32_t));
213 ctdb_queue_packet(ctdb, &r->hdr);
215 talloc_free(tmp_ctx);
219 send a dmaster request (give another node the dmaster for a record)
221 This is always sent to the lmaster, which ensures that the lmaster
222 always knows who the dmaster is. The lmaster will then send a
223 CTDB_REPLY_DMASTER to the new dmaster
/*
  Hand the dmaster role for a record to the node that sent call packet c.

  ctdb_db   - database holding the record
  c         - the incoming call; c->hdr.srcnode becomes the new dmaster
              and c->hdr.reqid is reused for the outgoing packet
  header    - record header; flags and dmaster are updated here
  key, data - record key and contents

  When this node is itself the lmaster the reply is produced directly via
  ctdb_send_dmaster_reply(); otherwise a CTDB_REQ_DMASTER packet is built
  for the lmaster, the record is stored locally with the new dmaster, and
  the packet is queued.
*/
225 static void ctdb_call_send_dmaster(struct ctdb_db_context *ctdb_db,
226 struct ctdb_req_call *c,
227 struct ctdb_ltdb_header *header,
228 TDB_DATA *key, TDB_DATA *data)
230 struct ctdb_req_dmaster *r;
231 struct ctdb_context *ctdb = ctdb_db->ctdb;
233 uint32_t lmaster = ctdb_lmaster(ctdb, key);
235 if (ctdb->methods == NULL) {
236 ctdb_fatal(ctdb, "Failed ctdb_call_send_dmaster since transport is down");
/* mark the record as having been migrated together with its data */
240 if (data->dsize != 0) {
241 header->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
/* we are the lmaster: no request packet is needed */
244 if (lmaster == ctdb->pnn) {
245 ctdb_send_dmaster_reply(ctdb_db, header, *key, *data,
246 c->hdr.srcnode, c->hdr.reqid);
250 len = offsetof(struct ctdb_req_dmaster, data) + key->dsize + data->dsize
252 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_DMASTER, len,
253 struct ctdb_req_dmaster);
254 CTDB_NO_MEMORY_FATAL(ctdb, r);
255 r->hdr.destnode = lmaster;
256 r->hdr.reqid = c->hdr.reqid;
258 r->rsn = header->rsn;
259 r->dmaster = c->hdr.srcnode;
260 r->keylen = key->dsize;
261 r->datalen = data->dsize;
/* payload layout: key bytes, data bytes, then the 32-bit record flags */
262 memcpy(&r->data[0], key->dptr, key->dsize);
263 memcpy(&r->data[key->dsize], data->dptr, data->dsize);
264 memcpy(&r->data[key->dsize + data->dsize], &header->flags, sizeof(uint32_t));
/* the local copy is updated before the request is queued */
266 header->dmaster = c->hdr.srcnode;
267 if (ctdb_ltdb_store(ctdb_db, *key, header, *data) != 0) {
268 ctdb_fatal(ctdb, "Failed to store record in ctdb_call_send_dmaster");
271 ctdb_queue_packet(ctdb, &r->hdr);
/*
  Timed-event handler armed by ctdb_set_sticky_pindown().

  private_data - the struct ctdb_sticky_record whose pin-down expired

  Logs the expiry and frees the record's pindown context if one is set.
  Deferred requests are allocated as children of that context elsewhere in
  this file.
*/
276 static void ctdb_sticky_pindown_timeout(struct event_context *ev, struct timed_event *te,
277 struct timeval t, void *private_data)
279 struct ctdb_sticky_record *sr = talloc_get_type(private_data,
280 struct ctdb_sticky_record);
282 DEBUG(DEBUG_ERR,("Pindown timeout db:%s unstick record\n", sr->ctdb_db->db_name));
283 if (sr->pindown != NULL) {
284 talloc_free(sr->pindown);
/*
  If key has a sticky record in ctdb_db and it is not already pinned down,
  create a pin-down context for it and arm a timer that fires after
  ctdb->tunable.sticky_pindown (logged below as milliseconds).

  ctdb    - context providing the event loop and tunables
  ctdb_db - database whose sticky_records tree is searched
  key     - record key
*/
290 ctdb_set_sticky_pindown(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key)
292 TALLOC_CTX *tmp_ctx = talloc_new(NULL);
294 struct ctdb_sticky_record *sr;
/* zeroed buffer: key length rounded up to a multiple of 4, plus one extra word */
296 k = talloc_zero_size(tmp_ctx, ((key.dsize + 3) & 0xfffffffc) + 4);
298 DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
299 talloc_free(tmp_ctx);
/* k[0] holds the array length in 32-bit words (itself included); key bytes follow */
303 k[0] = (key.dsize + 3) / 4 + 1;
304 memcpy(&k[1], key.dptr, key.dsize);
306 sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
308 talloc_free(tmp_ctx);
312 talloc_free(tmp_ctx);
/* only start a new pin-down when none is currently active */
314 if (sr->pindown == NULL) {
315 DEBUG(DEBUG_ERR,("Pinning down record in %s for %d ms\n", ctdb_db->db_name, ctdb->tunable.sticky_pindown));
316 sr->pindown = talloc_new(sr);
317 if (sr->pindown == NULL) {
318 DEBUG(DEBUG_ERR,("Failed to allocate pindown context for sticky record\n"));
/* offset = whole seconds (ms / 1000) plus the remainder expressed in microseconds */
321 event_add_timed(ctdb->ev, sr->pindown, timeval_current_ofs(ctdb->tunable.sticky_pindown / 1000, (ctdb->tunable.sticky_pindown * 1000) % 1000000), ctdb_sticky_pindown_timeout, sr);
328 called when a CTDB_REPLY_DMASTER packet comes in, or when the lmaster
329 gets a CTDB_REQ_DMASTER for itself. We become the dmaster.
331 must be called with the chainlock held. This function releases the chainlock
/*
  Make this node the dmaster of a record and complete the pending call
  that asked for the migration.

  ctdb_db      - database holding the record
  hdr          - header of the packet that triggered this; its reqid is
                 used to find the pending struct ctdb_call_state
  key, data    - record key and contents to store locally
  rsn          - record sequence number supplied by the sender
  record_flags - record flags supplied by the sender

  The record is stored with dmaster set to our pnn. If the database is
  sticky a pin-down is attempted. The packet is then checked against the
  pending call (reqid must resolve, keys must match, reqid must equal the
  state's reqid); on success the call is run locally and the state's
  async callback, if any, is invoked. Each exit path shown here calls
  ctdb_ltdb_unlock() on the record.
*/
333 static void ctdb_become_dmaster(struct ctdb_db_context *ctdb_db,
334 struct ctdb_req_header *hdr,
335 TDB_DATA key, TDB_DATA data,
336 uint64_t rsn, uint32_t record_flags)
338 struct ctdb_call_state *state;
339 struct ctdb_context *ctdb = ctdb_db->ctdb;
340 struct ctdb_ltdb_header header;
343 DEBUG(DEBUG_DEBUG,("pnn %u dmaster response %08x\n", ctdb->pnn, ctdb_hash(&key)));
347 header.dmaster = ctdb->pnn;
348 header.flags = record_flags;
350 state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_call_state);
353 if (state->call->flags & CTDB_CALL_FLAG_VACUUM_MIGRATION) {
355 * We temporarily add the VACUUM_MIGRATED flag to
356 * the record flags, so that ctdb_ltdb_store can
357 * decide whether the record should be stored or
360 header.flags |= CTDB_REC_FLAG_VACUUM_MIGRATED;
364 if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
365 ctdb_fatal(ctdb, "ctdb_reply_dmaster store failed\n");
367 ret = ctdb_ltdb_unlock(ctdb_db, key);
369 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
374 /* we just became DMASTER and this database is "sticky",
375 see if the record is flagged as "hot" and set up a pin-down
376 context to stop migrations for a little while if so
378 if (ctdb_db->sticky) {
379 ctdb_set_sticky_pindown(ctdb, ctdb_db, key);
383 DEBUG(DEBUG_ERR,("pnn %u Invalid reqid %u in ctdb_become_dmaster from node %u\n",
384 ctdb->pnn, hdr->reqid, hdr->srcnode));
386 ret = ctdb_ltdb_unlock(ctdb_db, key);
388 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
393 if (key.dsize != state->call->key.dsize || memcmp(key.dptr, state->call->key.dptr, key.dsize)) {
394 DEBUG(DEBUG_ERR, ("Got bogus DMASTER packet reqid:%u from node %u. Key does not match key held in matching idr.\n", hdr->reqid, hdr->srcnode));
396 ret = ctdb_ltdb_unlock(ctdb_db, key);
398 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
403 if (hdr->reqid != state->reqid) {
404 /* we found a record but it was the wrong one */
/* the newline belongs at the end of the message, not in the middle of it */
405 DEBUG(DEBUG_ERR, ("Dropped orphan in ctdb_become_dmaster with reqid:%u from node %u\n", hdr->reqid, hdr->srcnode));
407 ret = ctdb_ltdb_unlock(ctdb_db, key);
409 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
414 ctdb_call_local(ctdb_db, state->call, &header, state, &data, true, ctdb->pnn);
416 ret = ctdb_ltdb_unlock(ctdb_db, state->call->key);
418 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
421 state->state = CTDB_CALL_DONE;
422 if (state->async.fn) {
423 state->async.fn(state);
430 called when a CTDB_REQ_DMASTER packet comes in
432 this comes into the lmaster for a record when the current dmaster
433 wants to give up the dmaster role and give it to someone else
/*
  Handle an incoming CTDB_REQ_DMASTER packet.

  ctdb - local context
  hdr  - the packet, interpreted as struct ctdb_req_dmaster

  The payload is unpacked as key bytes, data bytes and, when the packet is
  long enough, a trailing 32-bit record-flags value. The current record is
  locked and fetched (the packet may be requeued by the lock helper). The
  request is rejected with a fatal error if this node is not the key's
  lmaster. A sender that is not the recorded dmaster is logged and can
  force recovery mode. Finally the role is either taken locally
  (c->dmaster == our pnn) or passed on with ctdb_send_dmaster_reply().
*/
435 void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
437 struct ctdb_req_dmaster *c = (struct ctdb_req_dmaster *)hdr;
438 TDB_DATA key, data, data2;
439 struct ctdb_ltdb_header header;
440 struct ctdb_db_context *ctdb_db;
441 uint32_t record_flags = 0;
446 key.dsize = c->keylen;
447 data.dptr = c->data + c->keylen;
448 data.dsize = c->datalen;
449 len = offsetof(struct ctdb_req_dmaster, data) + key.dsize + data.dsize
451 if (len <= c->hdr.length) {
/* the flags sit at an arbitrary byte offset, so copy them out rather than
   dereferencing a possibly misaligned uint32_t pointer */
452 memcpy(&record_flags, &c->data[c->keylen + c->datalen], sizeof(record_flags));
455 ctdb_db = find_ctdb_db(ctdb, c->db_id);
457 ctdb_send_error(ctdb, hdr, -1,
458 "Unknown database in request. db_id==0x%08x",
463 /* fetch the current record */
464 ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, hdr, &data2,
465 ctdb_call_input_pkt, ctdb, false);
467 ctdb_fatal(ctdb, "ctdb_req_dmaster failed to fetch record");
471 DEBUG(DEBUG_INFO,(__location__ " deferring ctdb_request_dmaster\n"));
475 if (ctdb_lmaster(ctdb, &key) != ctdb->pnn) {
476 DEBUG(DEBUG_ALERT,("pnn %u dmaster request to non-lmaster lmaster=%u gen=%u curgen=%u\n",
477 ctdb->pnn, ctdb_lmaster(ctdb, &key),
478 hdr->generation, ctdb->vnn_map->generation));
479 ctdb_fatal(ctdb, "ctdb_req_dmaster to non-lmaster");
482 DEBUG(DEBUG_DEBUG,("pnn %u dmaster request on %08x for %u from %u\n",
483 ctdb->pnn, ctdb_hash(&key), c->dmaster, c->hdr.srcnode));
485 /* it's a protocol error if the sending node is not the current dmaster */
486 if (header.dmaster != hdr->srcnode) {
/* NOTE(review): the keyval argument below still reads key.dptr through a
   uint32_t cast; it is log-only but has the same alignment caveat */
487 DEBUG(DEBUG_ALERT,("pnn %u dmaster request for new-dmaster %u from non-master %u real-dmaster=%u key %08x dbid 0x%08x gen=%u curgen=%u c->rsn=%llu header.rsn=%llu reqid=%u keyval=0x%08x\n",
488 ctdb->pnn, c->dmaster, hdr->srcnode, header.dmaster, ctdb_hash(&key),
489 ctdb_db->db_id, hdr->generation, ctdb->vnn_map->generation,
490 (unsigned long long)c->rsn, (unsigned long long)header.rsn, c->hdr.reqid,
491 (key.dsize >= 4)?(*(uint32_t *)key.dptr):0));
492 if (header.rsn != 0 || header.dmaster != ctdb->pnn) {
493 DEBUG(DEBUG_ERR,("ctdb_req_dmaster from non-master. Force a recovery.\n"));
495 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
496 ctdb_ltdb_unlock(ctdb_db, key);
501 if (header.rsn > c->rsn) {
502 DEBUG(DEBUG_ALERT,("pnn %u dmaster request with older RSN new-dmaster %u from %u real-dmaster=%u key %08x dbid 0x%08x gen=%u curgen=%u c->rsn=%llu header.rsn=%llu reqid=%u\n",
503 ctdb->pnn, c->dmaster, hdr->srcnode, header.dmaster, ctdb_hash(&key),
504 ctdb_db->db_id, hdr->generation, ctdb->vnn_map->generation,
505 (unsigned long long)c->rsn, (unsigned long long)header.rsn, c->hdr.reqid));
508 /* use the rsn from the sending node */
511 /* store the record flags from the sending node */
512 header.flags = record_flags;
514 /* check if the new dmaster is the lmaster, in which case we
515 skip the dmaster reply */
516 if (c->dmaster == ctdb->pnn) {
517 ctdb_become_dmaster(ctdb_db, hdr, key, data, c->rsn, record_flags);
519 ctdb_send_dmaster_reply(ctdb_db, &header, key, data, c->dmaster, hdr->reqid);
521 ret = ctdb_ltdb_unlock(ctdb_db, key);
523 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
/*
  Timed-event handler armed by ctdb_make_record_sticky(); it fires after
  ctdb->tunable.sticky_duration seconds.

  private_data - the struct ctdb_sticky_record whose sticky period ended

  NOTE(review): presumably releases the sticky record - confirm against the
  full function body.
*/
528 static void ctdb_sticky_record_timeout(struct event_context *ev, struct timed_event *te,
529 struct timeval t, void *private_data)
531 struct ctdb_sticky_record *sr = talloc_get_type(private_data,
532 struct ctdb_sticky_record);
/*
  Insert callback passed to trbt_insertarray32_callback() by
  ctdb_make_record_sticky().

  parm - the new sticky record being inserted
  data - an entry already registered under the same key, if any; the log
         message below reports that it is replaced by parm
*/
536 static void *ctdb_make_sticky_record_callback(void *parm, void *data)
539 DEBUG(DEBUG_ERR,("Already have sticky record registered. Free old %p and create new %p\n", data, parm));
/*
  Register key as a sticky record in ctdb_db.

  ctdb    - context providing the event loop and the sticky_duration tunable
  ctdb_db - database whose sticky_records tree receives the entry
  key     - record key

  The key is first looked up in the tree. A new struct ctdb_sticky_record
  is allocated as a child of the tree, inserted, and given a timer of
  ctdb->tunable.sticky_duration seconds.
*/
546 ctdb_make_record_sticky(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key)
548 TALLOC_CTX *tmp_ctx = talloc_new(NULL);
550 struct ctdb_sticky_record *sr;
/* zeroed buffer: key length rounded up to a multiple of 4, plus one extra word */
552 k = talloc_zero_size(tmp_ctx, ((key.dsize + 3) & 0xfffffffc) + 4);
554 DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
555 talloc_free(tmp_ctx);
/* k[0] holds the array length in 32-bit words (itself included); key bytes follow */
559 k[0] = (key.dsize + 3) / 4 + 1;
560 memcpy(&k[1], key.dptr, key.dsize);
562 sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
564 talloc_free(tmp_ctx);
/* the record's lifetime is tied to the sticky_records tree */
568 sr = talloc(ctdb_db->sticky_records, struct ctdb_sticky_record);
570 talloc_free(tmp_ctx);
571 DEBUG(DEBUG_ERR,("Failed to allocate sticky record structure\n"));
576 sr->ctdb_db = ctdb_db;
579 DEBUG(DEBUG_ERR,("Make record sticky in db %s\n", ctdb_db->db_name));
581 trbt_insertarray32_callback(ctdb_db->sticky_records, k[0], &k[0], ctdb_make_sticky_record_callback, sr);
/* the timer is allocated on sr and passes sr to the timeout handler */
583 event_add_timed(ctdb->ev, sr, timeval_current_ofs(ctdb->tunable.sticky_duration, 0), ctdb_sticky_record_timeout, sr);
585 talloc_free(tmp_ctx);
/*
  Carrier for a deferred request while it waits for the zero-delay timer
  set up in pinned_down_destructor(); consumed by pinned_down_requeue().
*/
589 struct pinned_down_requeue_handle {
/* context the packet is re-injected into */
590 struct ctdb_context *ctdb;
/* the deferred packet; made a talloc child of the handle by the destructor */
591 struct ctdb_req_header *hdr;
/*
  A request parked on a sticky record's pindown context by
  ctdb_defer_pinned_down_request(). It has pinned_down_destructor() set as
  its talloc destructor.
*/
594 struct pinned_down_deferred_call {
/* context used later to requeue the packet */
595 struct ctdb_context *ctdb;
/* the deferred packet; stolen onto this structure when it is parked */
596 struct ctdb_req_header *hdr;
/*
  Timed-event handler that feeds a previously deferred packet back into
  normal packet processing.

  private_data - the struct pinned_down_requeue_handle created by
                 pinned_down_destructor()
*/
599 static void pinned_down_requeue(struct event_context *ev, struct timed_event *te,
600 struct timeval t, void *private_data)
602 struct pinned_down_requeue_handle *handle = talloc_get_type(private_data, struct pinned_down_requeue_handle);
603 struct ctdb_context *ctdb = handle->ctdb;
/* reparent the packet onto ctdb before dispatching it */
605 talloc_steal(ctdb, handle->hdr);
606 ctdb_call_input_pkt(ctdb, handle->hdr);
/*
  talloc destructor for a struct pinned_down_deferred_call.

  pinned_down - the deferred call being freed

  The deferred packet is moved onto a freshly allocated requeue handle
  (parented on ctdb) and a zero-delay timed event is scheduled so that
  pinned_down_requeue() re-injects the packet.
*/
611 static int pinned_down_destructor(struct pinned_down_deferred_call *pinned_down)
613 struct ctdb_context *ctdb = pinned_down->ctdb;
614 struct pinned_down_requeue_handle *handle = talloc(ctdb, struct pinned_down_requeue_handle);
/* the handle is dereferenced immediately below, so fail loudly on OOM the
   same way the other allocations in this file do */
CTDB_NO_MEMORY_FATAL(ctdb, handle);
616 handle->ctdb = pinned_down->ctdb;
617 handle->hdr = pinned_down->hdr;
/* the packet must outlive pinned_down, so it is reparented onto the handle */
618 talloc_steal(handle, handle->hdr);
620 event_add_timed(ctdb->ev, handle, timeval_zero(), pinned_down_requeue, handle);
/*
  Park an incoming request if its record is currently pinned down.

  ctdb    - local context
  ctdb_db - database whose sticky_records tree is searched
  key     - key of the requested record
  hdr     - the request packet; on success it is stolen onto the deferred
            call structure

  ctdb_request_call() treats a return value of 0 as "request deferred".
  The deferred call is a talloc child of the record's pindown context and
  has pinned_down_destructor() set as its destructor.
*/
626 ctdb_defer_pinned_down_request(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_req_header *hdr)
628 TALLOC_CTX *tmp_ctx = talloc_new(NULL);
630 struct ctdb_sticky_record *sr;
631 struct pinned_down_deferred_call *pinned_down;
/* zeroed buffer: key length rounded up to a multiple of 4, plus one extra word */
633 k = talloc_zero_size(tmp_ctx, ((key.dsize + 3) & 0xfffffffc) + 4);
635 DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
636 talloc_free(tmp_ctx);
/* k[0] holds the array length in 32-bit words (itself included); key bytes follow */
640 k[0] = (key.dsize + 3) / 4 + 1;
641 memcpy(&k[1], key.dptr, key.dsize);
643 sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
645 talloc_free(tmp_ctx);
649 talloc_free(tmp_ctx);
/* a sticky record without a pindown context is not pinned right now */
651 if (sr->pindown == NULL) {
655 pinned_down = talloc(sr->pindown, struct pinned_down_deferred_call);
656 if (pinned_down == NULL) {
657 DEBUG(DEBUG_ERR,("Failed to allocate structure for deferred pinned down request\n"));
661 pinned_down->ctdb = ctdb;
662 pinned_down->hdr = hdr;
664 talloc_set_destructor(pinned_down, pinned_down_destructor);
665 talloc_steal(pinned_down, hdr);
/*
  Track the keys with the highest hop counts seen for a database.

  ctdb_db  - database whose statistics.hot_keys table is updated
  key      - key of the record that was just requested
  hopcount - hop count carried by that request

  Slot 0 of the table is meant to hold the smallest count. A request whose
  hopcount does not exceed slot 0 is ignored. A known key has its count
  raised; an unknown key replaces slot 0 (the key bytes are duplicated onto
  ctdb_db). The final loop then swaps any smaller entry into slot 0.
*/
671 ctdb_update_db_stat_hot_keys(struct ctdb_db_context *ctdb_db, TDB_DATA key, int hopcount)
675 /* smallest value is always at index 0 */
676 if (hopcount <= ctdb_db->statistics.hot_keys[0].count) {
680 /* see if we already know this key */
681 for (i = 0; i < MAX_HOT_KEYS; i++) {
682 if (key.dsize != ctdb_db->statistics.hot_keys[i].key.dsize) {
685 if (memcmp(key.dptr, ctdb_db->statistics.hot_keys[i].key.dptr, key.dsize)) {
688 /* found an entry for this key */
689 if (hopcount <= ctdb_db->statistics.hot_keys[i].count) {
692 ctdb_db->statistics.hot_keys[i].count = hopcount;
696 if (ctdb_db->statistics.hot_keys[0].key.dptr != NULL) {
697 talloc_free(ctdb_db->statistics.hot_keys[0].key.dptr);
699 ctdb_db->statistics.hot_keys[0].key.dsize = key.dsize;
700 ctdb_db->statistics.hot_keys[0].key.dptr = talloc_memdup(ctdb_db, key.dptr, key.dsize);
701 ctdb_db->statistics.hot_keys[0].count = hopcount;
/* start at slot 1: every slot other than 0 must be compared, otherwise
   slot 1 could hold a smaller count than slot 0 */
705 for (i = 1; i < MAX_HOT_KEYS; i++) {
706 if (ctdb_db->statistics.hot_keys[i].count < ctdb_db->statistics.hot_keys[0].count) {
/* swap counts, then keys, reusing hopcount and key as temporaries */
707 hopcount = ctdb_db->statistics.hot_keys[i].count;
708 ctdb_db->statistics.hot_keys[i].count = ctdb_db->statistics.hot_keys[0].count;
709 ctdb_db->statistics.hot_keys[0].count = hopcount;
711 key = ctdb_db->statistics.hot_keys[i].key;
712 ctdb_db->statistics.hot_keys[i].key = ctdb_db->statistics.hot_keys[0].key;
713 ctdb_db->statistics.hot_keys[0].key = key;
719 called when a CTDB_REQ_CALL packet comes in
/*
  Handle an incoming CTDB_REQ_CALL packet.

  ctdb - local context
  hdr  - the packet, interpreted as struct ctdb_req_call

  Steps, in the order they appear below:
   - resolve the database; reply with an error if it is unknown
   - build a struct ctdb_call (allocated on hdr) that points into the packet
   - on sticky databases, defer the request if the record is pinned down
   - lock and fetch the record (the packet may be requeued by the helper)
   - read-only delegation handling: clear a completed revoke, defer while
     a revoke is in progress, start a revoke for a non-read-only request,
     or grant a delegation and reply with header + data
   - redirect the call if this node is not the dmaster and holds no
     delegations
   - update hop-count statistics and hot keys; make hot records sticky
   - migrate the record to the caller when it qualifies, otherwise run the
     call locally and send a CTDB_REPLY_CALL
*/
721 void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
723 struct ctdb_req_call *c = (struct ctdb_req_call *)hdr;
725 struct ctdb_reply_call *r;
727 struct ctdb_ltdb_header header;
728 struct ctdb_call *call;
729 struct ctdb_db_context *ctdb_db;
730 int tmp_count, bucket;
732 if (ctdb->methods == NULL) {
733 DEBUG(DEBUG_INFO,(__location__ " Failed ctdb_request_call. Transport is DOWN\n"));
738 ctdb_db = find_ctdb_db(ctdb, c->db_id);
740 ctdb_send_error(ctdb, hdr, -1,
741 "Unknown database in request. db_id==0x%08x",
746 call = talloc(hdr, struct ctdb_call);
747 CTDB_NO_MEMORY_FATAL(ctdb, call);
749 call->call_id = c->callid;
750 call->key.dptr = c->data;
751 call->key.dsize = c->keylen;
752 call->call_data.dptr = c->data + c->keylen;
753 call->call_data.dsize = c->calldatalen;
754 call->reply_data.dptr = NULL;
755 call->reply_data.dsize = 0;
758 /* If this record is pinned down we should defer the
759 request until the pindown times out
761 if (ctdb_db->sticky) {
762 if (ctdb_defer_pinned_down_request(ctdb, ctdb_db, call->key, hdr) == 0) {
763 DEBUG(DEBUG_WARNING,("Defer request for pinned down record in %s\n", ctdb_db->db_name));
769 /* determine if we are the dmaster for this key. This also
770 fetches the record data (if any), thus avoiding a 2nd fetch of the data
771 if the call will be answered locally */
773 ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, call->key, &header, hdr, &data,
774 ctdb_call_input_pkt, ctdb, false);
776 ctdb_send_error(ctdb, hdr, ret, "ltdb fetch failed in ctdb_request_call");
780 DEBUG(DEBUG_INFO,(__location__ " deferred ctdb_request_call\n"));
784 /* Don't do READONLY if we don't have a tracking database */
785 if ((c->flags & CTDB_WANT_READONLY) && !ctdb_db->readonly) {
786 c->flags &= ~CTDB_WANT_READONLY;
789 if (header.flags & CTDB_REC_RO_REVOKE_COMPLETE) {
790 header.flags &= ~(CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY|CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_REVOKE_COMPLETE);
791 CTDB_INCREMENT_STAT(ctdb, total_ro_revokes);
792 CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_revokes);
793 if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) {
794 ctdb_fatal(ctdb, "Failed to write header with cleared REVOKE flag");
796 /* and clear out the tracking data */
797 if (tdb_delete(ctdb_db->rottdb, call->key) != 0) {
798 DEBUG(DEBUG_ERR,(__location__ " Failed to clear out trackingdb record\n"));
802 /* if we are revoking, we must defer all other calls until the revoke
805 if (header.flags & CTDB_REC_RO_REVOKING_READONLY) {
806 talloc_free(data.dptr);
807 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
809 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, call->key, hdr, ctdb_call_input_pkt, ctdb) != 0) {
810 ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
816 /* if we are not the dmaster and are not hosting any delegations,
817 then send a redirect to the requesting node */
818 if ((header.dmaster != ctdb->pnn)
819 && (!(header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) ) {
820 talloc_free(data.dptr);
821 ctdb_call_send_redirect(ctdb, ctdb_db, call->key, c, &header);
823 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
825 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
830 if ( (!(c->flags & CTDB_WANT_READONLY))
831 && (header.flags & (CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY)) ) {
832 header.flags |= CTDB_REC_RO_REVOKING_READONLY;
833 if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) {
834 ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
836 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
838 if (ctdb_start_revoke_ro_record(ctdb, ctdb_db, call->key, &header, data) != 0) {
839 ctdb_fatal(ctdb, "Failed to start record revoke");
841 talloc_free(data.dptr);
843 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, call->key, hdr, ctdb_call_input_pkt, ctdb) != 0) {
844 ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
851 /* If this is the first request for delegation. bump rsn and set
852 * the delegations flag
854 if ((c->flags & CTDB_WANT_READONLY)
855 && (c->callid == CTDB_FETCH_WITH_HEADER_FUNC)
856 && (!(header.flags & CTDB_REC_RO_HAVE_DELEGATIONS))) {
858 header.flags |= CTDB_REC_RO_HAVE_DELEGATIONS;
859 if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) {
860 ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
863 if ((c->flags & CTDB_WANT_READONLY)
864 && (call->call_id == CTDB_FETCH_WITH_HEADER_FUNC)) {
867 tdata = tdb_fetch(ctdb_db->rottdb, call->key);
868 if (ctdb_trackingdb_add_pnn(ctdb, &tdata, c->hdr.srcnode) != 0) {
869 ctdb_fatal(ctdb, "Failed to add node to trackingdb");
871 if (tdb_store(ctdb_db->rottdb, call->key, tdata, TDB_REPLACE) != 0) {
872 ctdb_fatal(ctdb, "Failed to store trackingdb data");
876 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
878 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
881 len = offsetof(struct ctdb_reply_call, data) + data.dsize + sizeof(struct ctdb_ltdb_header);
882 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REPLY_CALL, len,
883 struct ctdb_reply_call);
884 CTDB_NO_MEMORY_FATAL(ctdb, r);
885 r->hdr.destnode = c->hdr.srcnode;
886 r->hdr.reqid = c->hdr.reqid;
888 r->datalen = data.dsize + sizeof(struct ctdb_ltdb_header);
890 header.flags |= CTDB_REC_RO_HAVE_READONLY;
891 header.flags &= ~CTDB_REC_RO_HAVE_DELEGATIONS;
892 memcpy(&r->data[0], &header, sizeof(struct ctdb_ltdb_header));
895 memcpy(&r->data[sizeof(struct ctdb_ltdb_header)], data.dptr, data.dsize);
898 ctdb_queue_packet(ctdb, &r->hdr);
899 CTDB_INCREMENT_STAT(ctdb, total_ro_delegations);
900 CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_delegations);
906 CTDB_UPDATE_STAT(ctdb, max_hop_count, c->hopcount);
907 tmp_count = c->hopcount;
913 if (bucket >= MAX_COUNT_BUCKETS) {
914 bucket = MAX_COUNT_BUCKETS - 1;
916 CTDB_INCREMENT_STAT(ctdb, hop_count_bucket[bucket]);
917 CTDB_INCREMENT_DB_STAT(ctdb_db, hop_count_bucket[bucket]);
918 ctdb_update_db_stat_hot_keys(ctdb_db, call->key, c->hopcount);
920 /* If this database supports sticky records, then check if the
921 hopcount is big. If it is it means the record is hot and we
922 should make it sticky.
924 if (ctdb_db->sticky && c->hopcount >= ctdb->tunable.hopcount_make_sticky) {
925 DEBUG(DEBUG_ERR, ("Hot record in database %s. Hopcount is %d. Make record sticky for %d seconds\n", ctdb_db->db_name, c->hopcount, ctdb->tunable.sticky_duration));
926 ctdb_make_record_sticky(ctdb, ctdb_db, call->key);
930 /* if this node has done enough consecutive calls on the same record
931 then give them the record
932 or if the node requested an immediate migration
934 if ( c->hdr.srcnode != ctdb->pnn &&
935 ((header.laccessor == c->hdr.srcnode
936 && header.lacount >= ctdb->tunable.max_lacount
937 && ctdb->tunable.max_lacount != 0)
938 || (c->flags & CTDB_IMMEDIATE_MIGRATION)) ) {
939 if (ctdb_db->transaction_active) {
940 DEBUG(DEBUG_INFO, (__location__ " refusing migration"
941 " of key %.*s while transaction is active\n",
942 (int)call->key.dsize, (const char *)call->key.dptr));
944 DEBUG(DEBUG_DEBUG,("pnn %u starting migration of %08x to %u\n",
945 ctdb->pnn, ctdb_hash(&(call->key)), c->hdr.srcnode));
946 ctdb_call_send_dmaster(ctdb_db, c, &header, &(call->key), &data);
947 talloc_free(data.dptr);
949 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
951 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
957 ret = ctdb_call_local(ctdb_db, call, &header, hdr, &data, true, c->hdr.srcnode);
959 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_local failed\n"));
963 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
965 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
968 len = offsetof(struct ctdb_reply_call, data) + call->reply_data.dsize;
969 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REPLY_CALL, len,
970 struct ctdb_reply_call);
971 CTDB_NO_MEMORY_FATAL(ctdb, r);
972 r->hdr.destnode = hdr->srcnode;
973 r->hdr.reqid = hdr->reqid;
974 r->status = call->status;
975 r->datalen = call->reply_data.dsize;
976 if (call->reply_data.dsize) {
977 memcpy(&r->data[0], call->reply_data.dptr, call->reply_data.dsize);
980 ctdb_queue_packet(ctdb, &r->hdr);
986 called when a CTDB_REPLY_CALL packet comes in
988 This packet comes in response to a CTDB_REQ_CALL request packet. It
989 contains any reply data from the call
/*
  Handle an incoming CTDB_REPLY_CALL packet.

  ctdb - local context
  hdr  - the packet, interpreted as struct ctdb_reply_call

  The pending struct ctdb_call_state is found by hdr->reqid; unknown or
  mismatching ids are logged. For a CTDB_FETCH_WITH_HEADER_FUNC call whose
  reply header carries CTDB_REC_RO_HAVE_READONLY, the record is locked, the
  old copy fetched, and the new header + data stored when the reply's rsn
  is newer. Finally the reply payload is attached to the call, the packet
  is stolen onto the state so the payload stays valid, and the state's
  async callback (if any) is invoked.

  NOTE(review): the reply payload is read as a struct ctdb_ltdb_header
  (flags, rsn) before c->datalen is compared against
  sizeof(struct ctdb_ltdb_header); consider doing the size check first.
*/
991 void ctdb_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
993 struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
994 struct ctdb_call_state *state;
996 state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_call_state);
998 DEBUG(DEBUG_ERR, (__location__ " reqid %u not found\n", hdr->reqid));
1002 if (hdr->reqid != state->reqid) {
1003 /* we found a record but it was the wrong one */
1004 DEBUG(DEBUG_ERR, ("Dropped orphaned call reply with reqid:%u\n",hdr->reqid));
1009 /* read only delegation processing */
1010 /* If we got a FETCH_WITH_HEADER we should check if this is a ro
1011 * delegation since we may need to update the record header
1013 if (state->c->callid == CTDB_FETCH_WITH_HEADER_FUNC) {
1014 struct ctdb_db_context *ctdb_db = state->ctdb_db;
1015 struct ctdb_ltdb_header *header = (struct ctdb_ltdb_header *)&c->data[0];
1016 struct ctdb_ltdb_header oldheader;
1017 TDB_DATA key, data, olddata;
1020 if (!(header->flags & CTDB_REC_RO_HAVE_READONLY)) {
1025 key.dsize = state->c->keylen;
1026 key.dptr = state->c->data;
1027 ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr,
1028 ctdb_call_input_pkt, ctdb, false);
1033 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock in ctdb_reply_call\n"));
1037 ret = ctdb_ltdb_fetch(ctdb_db, key, &oldheader, state, &olddata);
1039 DEBUG(DEBUG_ERR, ("Failed to fetch old record in ctdb_reply_call\n"));
1040 ctdb_ltdb_unlock(ctdb_db, key);
1044 if (header->rsn <= oldheader.rsn) {
1045 ctdb_ltdb_unlock(ctdb_db, key);
1049 if (c->datalen < sizeof(struct ctdb_ltdb_header)) {
1050 DEBUG(DEBUG_ERR,(__location__ " Got FETCH_WITH_HEADER reply with too little data: %d bytes\n", c->datalen));
1051 ctdb_ltdb_unlock(ctdb_db, key);
1055 data.dsize = c->datalen - sizeof(struct ctdb_ltdb_header);
1056 data.dptr = &c->data[sizeof(struct ctdb_ltdb_header)];
1057 ret = ctdb_ltdb_store(ctdb_db, key, header, data);
1059 DEBUG(DEBUG_ERR, ("Failed to store new record in ctdb_reply_call\n"));
1060 ctdb_ltdb_unlock(ctdb_db, key);
1064 ctdb_ltdb_unlock(ctdb_db, key);
1068 state->call->reply_data.dptr = c->data;
1069 state->call->reply_data.dsize = c->datalen;
1070 state->call->status = c->status;
1072 talloc_steal(state, c);
1074 state->state = CTDB_CALL_DONE;
1075 if (state->async.fn) {
1076 state->async.fn(state);
1082 called when a CTDB_REPLY_DMASTER packet comes in
1084 This packet comes in from the lmaster in response to a CTDB_REQ_CALL
1085 request packet. It means that the current dmaster wants to give us
/*
  Handle an incoming CTDB_REPLY_DMASTER packet.

  ctdb - local context
  hdr  - the packet, interpreted as struct ctdb_reply_dmaster

  The database is resolved from c->db_id. The payload is unpacked as key
  bytes, data bytes and, when the packet is long enough, a trailing 32-bit
  record-flags value. The record is locked (the packet may be requeued by
  the lock helper) and ctdb_become_dmaster() then takes over the record
  and releases the lock.
*/
1088 void ctdb_reply_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
1090 struct ctdb_reply_dmaster *c = (struct ctdb_reply_dmaster *)hdr;
1091 struct ctdb_db_context *ctdb_db;
1093 uint32_t record_flags = 0;
1097 ctdb_db = find_ctdb_db(ctdb, c->db_id);
1098 if (ctdb_db == NULL) {
1099 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_reply_dmaster\n", c->db_id));
1104 key.dsize = c->keylen;
1105 data.dptr = &c->data[key.dsize];
1106 data.dsize = c->datalen;
1107 len = offsetof(struct ctdb_reply_dmaster, data) + key.dsize + data.dsize
1109 if (len <= c->hdr.length) {
/* the flags sit at an arbitrary byte offset, so copy them out rather than
   dereferencing a possibly misaligned uint32_t pointer */
1110 memcpy(&record_flags, &c->data[c->keylen + c->datalen], sizeof(record_flags));
1113 ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr,
1114 ctdb_call_input_pkt, ctdb, false);
1119 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock in ctdb_reply_dmaster\n"));
1123 ctdb_become_dmaster(ctdb_db, hdr, key, data, c->rsn, record_flags);
1128 called when a CTDB_REPLY_ERROR packet comes in
/*
  Handle an incoming CTDB_REPLY_ERROR packet.

  ctdb - local context
  hdr  - the packet, interpreted as struct ctdb_reply_error

  The pending struct ctdb_call_state is found by hdr->reqid; unknown or
  mismatching ids are logged. Otherwise the call is marked
  CTDB_CALL_ERROR, its errmsg is pointed at the message inside the packet,
  and the state's async callback (if any) is invoked.
*/
1130 void ctdb_reply_error(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
1132 struct ctdb_reply_error *c = (struct ctdb_reply_error *)hdr;
1133 struct ctdb_call_state *state;
1135 state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_call_state);
1136 if (state == NULL) {
1137 DEBUG(DEBUG_ERR,("pnn %u Invalid reqid %u in ctdb_reply_error\n",
1138 ctdb->pnn, hdr->reqid));
1142 if (hdr->reqid != state->reqid) {
1143 /* we found a record but it was the wrong one */
1144 DEBUG(DEBUG_ERR, ("Dropped orphaned error reply with reqid:%u\n",hdr->reqid));
/* keep the packet alive as a child of state: errmsg points into it */
1148 talloc_steal(state, c);
1150 state->state = CTDB_CALL_ERROR;
1151 state->errmsg = (char *)c->msg;
1152 if (state->async.fn) {
1153 state->async.fn(state);
1161 static int ctdb_call_destructor(struct ctdb_call_state *state)
1163 DLIST_REMOVE(state->ctdb_db->ctdb->pending_calls, state);
1164 ctdb_reqid_remove(state->ctdb_db->ctdb, state->reqid);
1170 called when a ctdb_call needs to be resent after a reconfigure event
1172 static void ctdb_call_resend(struct ctdb_call_state *state)
1174 struct ctdb_context *ctdb = state->ctdb_db->ctdb;
1176 state->generation = ctdb->vnn_map->generation;
1178 /* use a new reqid, in case the old reply does eventually come in */
1179 ctdb_reqid_remove(ctdb, state->reqid);
1180 state->reqid = ctdb_reqid_new(ctdb, state);
1181 state->c->hdr.reqid = state->reqid;
1183 /* update the generation count for this request, so its valid with the new vnn_map */
1184 state->c->hdr.generation = state->generation;
1186 /* send the packet to ourselves, it will be redirected appropriately */
1187 state->c->hdr.destnode = ctdb->pnn;
1189 ctdb_queue_packet(ctdb, &state->c->hdr);
1190 DEBUG(DEBUG_NOTICE,("resent ctdb_call\n"));
1194 resend all pending calls on recovery
1196 void ctdb_call_resend_all(struct ctdb_context *ctdb)
1198 struct ctdb_call_state *state, *next;
1199 for (state=ctdb->pending_calls;state;state=next) {
1201 ctdb_call_resend(state);
1206 this allows the caller to setup a async.fn
1208 static void call_local_trigger(struct event_context *ev, struct timed_event *te,
1209 struct timeval t, void *private_data)
1211 struct ctdb_call_state *state = talloc_get_type(private_data, struct ctdb_call_state);
1212 if (state->async.fn) {
1213 state->async.fn(state);
1219 construct an event driven local ctdb_call
1221 this is used so that locally processed ctdb_call requests are processed
1222 in an event driven manner
1224 struct ctdb_call_state *ctdb_call_local_send(struct ctdb_db_context *ctdb_db,
1225 struct ctdb_call *call,
1226 struct ctdb_ltdb_header *header,
1229 struct ctdb_call_state *state;
1230 struct ctdb_context *ctdb = ctdb_db->ctdb;
1233 state = talloc_zero(ctdb_db, struct ctdb_call_state);
1234 CTDB_NO_MEMORY_NULL(ctdb, state);
1236 talloc_steal(state, data->dptr);
1238 state->state = CTDB_CALL_DONE;
1239 state->call = talloc(state, struct ctdb_call);
1240 CTDB_NO_MEMORY_NULL(ctdb, state->call);
1241 *(state->call) = *call;
1242 state->ctdb_db = ctdb_db;
1244 ret = ctdb_call_local(ctdb_db, state->call, header, state, data, true, ctdb->pnn);
1246 DEBUG(DEBUG_DEBUG,("ctdb_call_local() failed, ignoring return code %d\n", ret));
1249 event_add_timed(ctdb->ev, state, timeval_zero(), call_local_trigger, state);
1256 make a remote ctdb call - async send. Called in daemon context.
1258 This constructs a ctdb_call request and queues it for processing.
1259 This call never blocks.
1261 struct ctdb_call_state *ctdb_daemon_call_send_remote(struct ctdb_db_context *ctdb_db,
1262 struct ctdb_call *call,
1263 struct ctdb_ltdb_header *header)
1266 struct ctdb_call_state *state;
1267 struct ctdb_context *ctdb = ctdb_db->ctdb;
1269 if (ctdb->methods == NULL) {
1270 DEBUG(DEBUG_INFO,(__location__ " Failed send packet. Transport is down\n"));
1274 state = talloc_zero(ctdb_db, struct ctdb_call_state);
1275 CTDB_NO_MEMORY_NULL(ctdb, state);
1276 state->call = talloc(state, struct ctdb_call);
1277 CTDB_NO_MEMORY_NULL(ctdb, state->call);
1279 state->reqid = ctdb_reqid_new(ctdb, state);
1280 state->ctdb_db = ctdb_db;
1281 talloc_set_destructor(state, ctdb_call_destructor);
1283 len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
1284 state->c = ctdb_transport_allocate(ctdb, state, CTDB_REQ_CALL, len,
1285 struct ctdb_req_call);
1286 CTDB_NO_MEMORY_NULL(ctdb, state->c);
1287 state->c->hdr.destnode = header->dmaster;
1289 /* this limits us to 16k outstanding messages - not unreasonable */
1290 state->c->hdr.reqid = state->reqid;
1291 state->c->flags = call->flags;
1292 state->c->db_id = ctdb_db->db_id;
1293 state->c->callid = call->call_id;
1294 state->c->hopcount = 0;
1295 state->c->keylen = call->key.dsize;
1296 state->c->calldatalen = call->call_data.dsize;
1297 memcpy(&state->c->data[0], call->key.dptr, call->key.dsize);
1298 memcpy(&state->c->data[call->key.dsize],
1299 call->call_data.dptr, call->call_data.dsize);
1300 *(state->call) = *call;
1301 state->call->call_data.dptr = &state->c->data[call->key.dsize];
1302 state->call->key.dptr = &state->c->data[0];
1304 state->state = CTDB_CALL_WAIT;
1305 state->generation = ctdb->vnn_map->generation;
1307 DLIST_ADD(ctdb->pending_calls, state);
1309 ctdb_queue_packet(ctdb, &state->c->hdr);
1315 make a remote ctdb call - async recv - called in daemon context
1317 This is called when the program wants to wait for a ctdb_call to complete and get the
1318 results. This call will block unless the call has already completed.
1320 int ctdb_daemon_call_recv(struct ctdb_call_state *state, struct ctdb_call *call)
1322 while (state->state < CTDB_CALL_DONE) {
1323 event_loop_once(state->ctdb_db->ctdb->ev);
1325 if (state->state != CTDB_CALL_DONE) {
1326 ctdb_set_error(state->ctdb_db->ctdb, "%s", state->errmsg);
1331 if (state->call->reply_data.dsize) {
1332 call->reply_data.dptr = talloc_memdup(call,
1333 state->call->reply_data.dptr,
1334 state->call->reply_data.dsize);
1335 call->reply_data.dsize = state->call->reply_data.dsize;
1337 call->reply_data.dptr = NULL;
1338 call->reply_data.dsize = 0;
1340 call->status = state->call->status;
1347 send a keepalive packet to the other node
/*
  Send a keepalive packet to destnode.

  Allocates a CTDB_REQ_KEEPALIVE packet, addresses it to destnode, bumps the
  keepalive_packets_sent statistic and queues the packet.

  NOTE(review): this listing omits short source lines (braces, short
  statements). What follows the transport check, what else is set in the
  header, and what happens to r after queueing are not visible here --
  confirm against the full file.
*/
1349 void ctdb_send_keepalive(struct ctdb_context *ctdb, uint32_t destnode)
1351 struct ctdb_req_keepalive *r;
/* no transport methods registered: only a log line is visible for this case */
1353 if (ctdb->methods == NULL) {
1354 DEBUG(DEBUG_INFO,(__location__ " Failed to send keepalive. Transport is DOWN\n"));
/* the second argument (ctdb) is presumably the talloc parent of the packet -- TODO confirm */
1358 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_KEEPALIVE,
1359 sizeof(struct ctdb_req_keepalive),
1360 struct ctdb_req_keepalive);
/* judging by the macro name, an allocation failure is treated as fatal */
1361 CTDB_NO_MEMORY_FATAL(ctdb, r);
1362 r->hdr.destnode = destnode;
1365 CTDB_INCREMENT_STAT(ctdb, keepalive_packets_sent);
1367 ctdb_queue_packet(ctdb, &r->hdr);
/*
  A request that has been parked while a revoke is in progress.

  Instances are allocated as talloc children of the matching
  revokechild_handle (see ctdb_add_revoke_deferred_call); their destructor
  schedules the request to be requeued.

  NOTE(review): the code also uses a 'ctx' member that is not visible in this
  listing.
*/
1374 struct revokechild_deferred_call {
/* daemon context, used by the destructor to schedule the requeue */
1375 struct ctdb_context *ctdb;
/* the parked request packet; owned (talloc_steal) by this structure */
1376 struct ctdb_req_header *hdr;
/* function that is called to reprocess the packet */
1377 deferred_requeue_fn fn;
/*
  State for one active revoke child process.

  Handles are linked into ctdb_db->revokechild_active.

  NOTE(review): the code also uses 'status', 'fd[]', 'child' and 'key'
  members that are not visible in this listing.
*/
1381 struct revokechild_handle {
/* links for the ctdb_db->revokechild_active list */
1382 struct revokechild_handle *next, *prev;
1383 struct ctdb_context *ctdb;
/* database the revoked record belongs to */
1384 struct ctdb_db_context *ctdb_db;
/* read event on fd[0], through which the child reports its status byte */
1385 struct fd_event *fde;
/*
  Carries a deferred request from deferred_call_destructor to the timed
  event (deferred_call_requeue) that reprocesses it.

  NOTE(review): the code also uses a 'ctx' member that is not visible in this
  listing.
*/
1392 struct revokechild_requeue_handle {
1393 struct ctdb_context *ctdb;
/* the request packet; owned (talloc_steal) by this handle */
1394 struct ctdb_req_header *hdr;
/* function that is called with (ctx, hdr) to reprocess the packet */
1395 deferred_requeue_fn fn;
1399 static void deferred_call_requeue(struct event_context *ev, struct timed_event *te,
1400 struct timeval t, void *private_data)
1402 struct revokechild_requeue_handle *requeue_handle = talloc_get_type(private_data, struct revokechild_requeue_handle);
1404 requeue_handle->fn(requeue_handle->ctx, requeue_handle->hdr);
1405 talloc_free(requeue_handle);
1408 static int deferred_call_destructor(struct revokechild_deferred_call *deferred_call)
1410 struct ctdb_context *ctdb = deferred_call->ctdb;
1411 struct revokechild_requeue_handle *requeue_handle = talloc(ctdb, struct revokechild_requeue_handle);
1412 struct ctdb_req_call *c = (struct ctdb_req_call *)deferred_call->hdr;
1414 requeue_handle->ctdb = ctdb;
1415 requeue_handle->hdr = deferred_call->hdr;
1416 requeue_handle->fn = deferred_call->fn;
1417 requeue_handle->ctx = deferred_call->ctx;
1418 talloc_steal(requeue_handle, requeue_handle->hdr);
1420 /* when revoking, any READONLY requests have 1 second grace to let read/write finish first */
1421 event_add_timed(ctdb->ev, requeue_handle, timeval_current_ofs(c->flags & CTDB_WANT_READONLY ? 1 : 0, 0), deferred_call_requeue, requeue_handle);
/*
  destructor for a revokechild_handle: releases the fd event, deals with the
  two pipe descriptors, kills the child process with SIGKILL and unlinks the
  handle from ctdb_db->revokechild_active.

  NOTE(review): this listing omits short source lines; the bodies of the two
  fd checks are not visible here (presumably the descriptors are closed --
  confirm against the full file).
*/
1427 static int revokechild_destructor(struct revokechild_handle *rc)
1429 if (rc->fde != NULL) {
1430 talloc_free(rc->fde);
/* -1 marks a descriptor that is not open */
1433 if (rc->fd[0] != -1) {
1436 if (rc->fd[1] != -1) {
/* the child is killed unconditionally, whether or not it already reported */
1439 ctdb_kill(rc->ctdb, rc->child, SIGKILL);
1441 DLIST_REMOVE(rc->ctdb_db->revokechild_active, rc);
/*
  fd event handler for the read end of the revoke child's pipe: reads the
  single status byte written by the child and logs a read failure or a
  failure status.

  NOTE(review): this listing omits short source lines; the conditions that
  guard the two log messages and any cleanup of rc are not visible here --
  confirm against the full file.
*/
1445 static void revokechild_handler(struct event_context *ev, struct fd_event *fde,
1446 uint16_t flags, void *private_data)
1448 struct revokechild_handle *rc = talloc_get_type(private_data,
1449 struct revokechild_handle);
/* the child reports its result as one byte */
1453 ret = read(rc->fd[0], &c, 1);
1455 DEBUG(DEBUG_ERR,("Failed to read status from revokechild. errno:%d\n", errno));
1461 DEBUG(DEBUG_ERR,("revokechild returned failure. status:%d\n", c));
/*
  Shared state for one ctdb_revoke_all_delegations() run; it is passed as
  private data to the per-node send callback, the per-reply callback and the
  timeout handler.

  NOTE(review): the code also uses 'key', 'data', 'count', 'status' and
  'finished' members that are not visible in this listing.
*/
1470 struct ctdb_revoke_state {
/* database the record being revoked lives in */
1471 struct ctdb_db_context *ctdb_db;
/* header that is sent with the update-record controls */
1473 struct ctdb_ltdb_header *header;
1480 static void update_record_cb(struct ctdb_client_control_state *state)
1482 struct ctdb_revoke_state *revoke_state;
1486 if (state == NULL) {
1489 revoke_state = state->async.private_data;
1491 state->async.fn = NULL;
1492 ret = ctdb_control_recv(state->ctdb, state, state, NULL, &res, NULL);
1493 if ((ret != 0) || (res != 0)) {
1494 DEBUG(DEBUG_ERR,("Recv for revoke update record failed ret:%d res:%d\n", ret, res));
1495 revoke_state->status = -1;
1498 revoke_state->count--;
1499 if (revoke_state->count <= 0) {
1500 revoke_state->finished = 1;
1504 static void revoke_send_cb(struct ctdb_context *ctdb, uint32_t pnn, void *private_data)
1506 struct ctdb_revoke_state *revoke_state = private_data;
1507 struct ctdb_client_control_state *state;
1509 state = ctdb_ctrl_updaterecord_send(ctdb, revoke_state, timeval_current_ofs(5,0), pnn, revoke_state->ctdb_db, revoke_state->key, revoke_state->header, revoke_state->data);
1510 if (state == NULL) {
1511 DEBUG(DEBUG_ERR,("Failure to send update record to revoke readonly delegation\n"));
1512 revoke_state->status = -1;
1515 state->async.fn = update_record_cb;
1516 state->async.private_data = revoke_state;
1518 revoke_state->count++;
/*
  timed event handler armed by ctdb_revoke_all_delegations(): logs the
  timeout and sets 'finished' so that the waiting event loop there stops.

  NOTE(review): this listing omits short source lines; whether the handler
  also changes other fields of the state is not visible here -- confirm
  against the full file.
*/
1522 static void ctdb_revoke_timeout_handler(struct event_context *ev, struct timed_event *te,
1523 struct timeval yt, void *private_data)
1525 struct ctdb_revoke_state *state = private_data;
1527 DEBUG(DEBUG_ERR,("Timed out waiting for revoke to finish\n"));
1528 state->finished = 1;
/*
  Revoke all read-only delegations of one record.

  Sends an update-record control to every node found by traversing the
  tracking data (tdata), waits in a nested event loop until all replies have
  arrived or a 5 second timeout fires, then re-reads the record under the
  chain lock, sanity checks it, and stores it back with
  CTDB_REC_RO_REVOKE_COMPLETE set.

  The log messages refer to "revokechild": the visible caller runs this in
  the forked revoke child process.

  NOTE(review): this listing omits short source lines (braces, returns,
  short statements), so the condition under which the record is rewritten,
  the error returns and any cleanup of 'state' are not visible here.
  NOTE(review): the result of talloc_zero() is used without a visible NULL
  check.
*/
1532 static int ctdb_revoke_all_delegations(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA tdata, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
1534 struct ctdb_revoke_state *state = talloc_zero(ctdb, struct ctdb_revoke_state);
1537 state->ctdb_db = ctdb_db;
1539 state->header = header;
/* revoke_send_cb is invoked per tracked node and sends one control each */
1542 ctdb_trackingdb_traverse(ctdb, tdata, revoke_send_cb, state);
/* safety net: stop waiting after 5 seconds even if replies are missing */
1544 event_add_timed(ctdb->ev, state, timeval_current_ofs(5, 0), ctdb_revoke_timeout_handler, state);
/* finished is set by update_record_cb (last reply) or by the timeout handler */
1546 while (state->finished == 0) {
1547 event_loop_once(ctdb->ev);
1550 status = state->status;
1553 struct ctdb_ltdb_header new_header;
1556 if (ctdb_ltdb_lock(ctdb_db, key) != 0) {
1557 DEBUG(DEBUG_ERR,("Failed to chainlock the database in revokechild\n"));
/* the fetched record data is allocated on 'state' */
1561 if (ctdb_ltdb_fetch(ctdb_db, key, &new_header, state, &new_data) != 0) {
1562 ctdb_ltdb_unlock(ctdb_db, key);
1563 DEBUG(DEBUG_ERR,("Failed for fetch tdb record in revokechild\n"));
/* the stored record must not be newer than the header we were handed */
1568 if (new_header.rsn > header->rsn) {
1569 ctdb_ltdb_unlock(ctdb_db, key);
1570 DEBUG(DEBUG_ERR,("RSN too high in tdb record in revokechild\n"));
/* both REVOKING_READONLY and HAVE_DELEGATIONS must still be set on the stored record */
1574 if ( (new_header.flags & (CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS)) != (CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS) ) {
1575 ctdb_ltdb_unlock(ctdb_db, key);
1576 DEBUG(DEBUG_ERR,("Flags are wrong in tdb record in revokechild\n"));
/* mark the revoke as complete in the stored record */
1581 new_header.flags |= CTDB_REC_RO_REVOKE_COMPLETE;
1582 if (ctdb_ltdb_store(ctdb_db, key, &new_header, new_data) != 0) {
1583 ctdb_ltdb_unlock(ctdb_db, key);
1584 DEBUG(DEBUG_ERR,("Failed to write new record in revokechild\n"));
1588 ctdb_ltdb_unlock(ctdb_db, key);
/*
  Start revoking the read-only delegations of a record.

  Sets up a revokechild_handle for the key, forks a child process that runs
  ctdb_revoke_all_delegations() and reports a one-byte status through a pipe,
  and in the parent registers the handle as active together with an fd event
  (revokechild_handler) on the read end of the pipe.

  NOTE(review): this listing omits short source lines (braces, returns,
  short statements), so the pipe setup, the error returns and the exit of the
  child are not visible here -- confirm against the full file.
*/
1596 int ctdb_start_revoke_ro_record(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
1599 struct revokechild_handle *rc;
/* remembered so that the child can notice when the parent goes away */
1600 pid_t parent = getpid();
/* the header handed on to the child has the read-only bookkeeping flags cleared ... */
1603 header->flags &= ~(CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY);
/* ... and is marked as migrated with data */
1604 header->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
1607 if ((rc = talloc_zero(ctdb_db, struct revokechild_handle)) == NULL) {
1608 DEBUG(DEBUG_ERR,("Failed to allocate revokechild_handle\n"));
/* tracking data for this key, later traversed by the child */
1612 tdata = tdb_fetch(ctdb_db->rottdb, key);
1613 if (tdata.dsize > 0) {
/* keep a talloc copy owned by rc; NOTE(review): no visible check of the copy, and the fate of the original tdb_fetch buffer is not visible here */
1617 tdata.dptr = talloc_memdup(rc, tdata.dptr, tdata.dsize);
1623 rc->ctdb_db = ctdb_db;
1627 talloc_set_destructor(rc, revokechild_destructor);
/* private copy of the key; ctdb_add_revoke_deferred_call matches requests against it */
1629 rc->key.dsize = key.dsize;
1630 rc->key.dptr = talloc_memdup(rc, key.dptr, key.dsize);
1631 if (rc->key.dptr == NULL) {
1632 DEBUG(DEBUG_ERR,("Failed to allocate key for revokechild_handle\n"));
/* NOTE(review): same message as above; the guarding condition is not visible here, so this probably reports a different failure with a copy-pasted text -- confirm */
1639 DEBUG(DEBUG_ERR,("Failed to allocate key for revokechild_handle\n"));
1645 rc->child = ctdb_fork(ctdb);
1646 if (rc->child == (pid_t)-1) {
1647 DEBUG(DEBUG_ERR,("Failed to fork child for revokechild\n"));
/* child process from here on */
1652 if (rc->child == 0) {
/* prefix for the child's log output */
1655 debug_extra = talloc_asprintf(NULL, "revokechild-%s:", ctdb_db->db_name);
1657 if (switch_from_server_to_client(ctdb, "revokechild-%s", ctdb_db->db_name) != 0) {
1658 DEBUG(DEBUG_ERR,("Failed to switch from server to client for revokechild process\n"));
1660 goto child_finished;
1663 c = ctdb_revoke_all_delegations(ctdb, ctdb_db, tdata, key, header, data);
/* report the one-byte status to the parent; NOTE(review): the result of write() is ignored */
1666 write(rc->fd[1], &c, 1);
1667 /* make sure we die when our parent dies */
/* keep looping until signalling the parent fails with ESRCH, i.e. the parent is gone */
1668 while (ctdb_kill(ctdb, parent, 0) == 0 || errno != ESRCH) {
/* parent process from here on */
1676 set_close_on_exec(rc->fd[0]);
1678 /* This is an active revokechild child process */
1679 DLIST_ADD_END(ctdb_db->revokechild_active, rc, NULL);
/* revokechild_handler runs when the child's status byte becomes readable */
1681 rc->fde = event_add_fd(ctdb->ev, rc, rc->fd[0],
1682 EVENT_FD_READ, revokechild_handler,
1684 if (rc->fde == NULL) {
1685 DEBUG(DEBUG_ERR,("Failed to set up fd event for revokechild process\n"));
/* the fd is closed automatically together with the event */
1688 tevent_fd_set_auto_close(rc->fde);
/*
  Park a request for a record that is currently being revoked.

  Searches ctdb_db->revokechild_active for the handle whose key equals 'key'
  and attaches a revokechild_deferred_call to it. The deferred call is a
  talloc child of the handle and takes ownership of hdr; its destructor
  (deferred_call_destructor) schedules fn(call_context, hdr) to be run.

  NOTE(review): this listing omits short source lines and the end of the
  function is not visible, so the loop exits, the not-found condition and the
  return values are not shown here -- confirm against the full file.
*/
1693 int ctdb_add_revoke_deferred_call(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_req_header *hdr, deferred_requeue_fn fn, void *call_context)
1695 struct revokechild_handle *rc;
1696 struct revokechild_deferred_call *deferred_call;
1698 for (rc = ctdb_db->revokechild_active; rc; rc = rc->next) {
/* handles without a key can never match */
1699 if (rc->key.dsize == 0) {
/* different length: cannot be the same key */
1702 if (rc->key.dsize != key.dsize) {
/* same length and same bytes: this is the handle for our record */
1705 if (!memcmp(rc->key.dptr, key.dptr, key.dsize)) {
1711 DEBUG(DEBUG_ERR,("Failed to add deferred call to revoke list. revoke structure not found\n"));
/* child of rc, so freeing the handle triggers the deferred call's destructor */
1715 deferred_call = talloc(rc, struct revokechild_deferred_call);
1716 if (deferred_call == NULL) {
1717 DEBUG(DEBUG_ERR,("Failed to allocate deferred call structure for revoking record\n"));
1721 deferred_call->ctdb = ctdb;
1722 deferred_call->hdr = hdr;
1723 deferred_call->fn = fn;
1724 deferred_call->ctx = call_context;
1726 talloc_set_destructor(deferred_call, deferred_call_destructor);
/* the packet now lives and dies with the deferred call */
1727 talloc_steal(deferred_call, hdr);