2 ctdb_call protocol code
4 Copyright (C) Andrew Tridgell 2006
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
20 see http://wiki.samba.org/index.php/Samba_%26_Clustering for
21 protocol design and packet details
24 #include "system/network.h"
25 #include "system/filesys.h"
30 #include "lib/util/dlinklist.h"
31 #include "lib/util/debug.h"
32 #include "lib/util/samba_util.h"
33 #include "lib/util/sys_rw.h"
34 #include "lib/util/util_process.h"
36 #include "ctdb_private.h"
37 #include "ctdb_client.h"
39 #include "common/rb_tree.h"
40 #include "common/reqid.h"
41 #include "common/system.h"
42 #include "common/common.h"
43 #include "common/logging.h"
44 #include "common/hash_count.h"
/* Per-record state used to make a hot record "sticky": while a sticky
 * record is pinned down, migrations away from this node are delayed.
 * NOTE(review): the remainder of this declaration is not visible in
 * this view of the file. */
46 struct ctdb_sticky_record {
47 struct ctdb_context *ctdb;
48 struct ctdb_db_context *ctdb_db;
53 find the ctdb_db from a db index
/* Look up a database context by its database id.
 * Walks the singly linked list ctdb->db_list and returns the entry whose
 * db_id matches; presumably returns NULL when no database matches (the
 * return statements are not visible in this view). */
55 struct ctdb_db_context *find_ctdb_db(struct ctdb_context *ctdb, uint32_t id)
57 struct ctdb_db_context *ctdb_db;
59 for (ctdb_db=ctdb->db_list; ctdb_db; ctdb_db=ctdb_db->next) {
60 if (ctdb_db->db_id == id) {
68 a variant of input packet that can be used in lock requeue
/* Adapter with a void* first argument so it can be registered as a
 * requeue callback: recovers the ctdb context from the opaque pointer
 * (type-checked by talloc_get_type) and feeds the packet back into the
 * normal input path. */
70 static void ctdb_call_input_pkt(void *p, struct ctdb_req_header *hdr)
72 struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
73 ctdb_input_pkt(ctdb, hdr);
/* Send a CTDB_REPLY_ERROR packet back to the node that sent 'hdr'.
 * The printf-style message is formatted into the packet payload.
 * Silently drops the reply when the transport is down. */
80 static void ctdb_send_error(struct ctdb_context *ctdb,
81 struct ctdb_req_header *hdr, uint32_t status,
82 const char *fmt, ...) PRINTF_ATTRIBUTE(4,5);
83 static void ctdb_send_error(struct ctdb_context *ctdb,
84 struct ctdb_req_header *hdr, uint32_t status,
88 struct ctdb_reply_error_old *r;
92 if (ctdb->methods == NULL) {
93 DEBUG(DEBUG_INFO,(__location__ " Failed to send error. Transport is DOWN\n"));
/* format the message; allocation failure is fatal */
98 msg = talloc_vasprintf(ctdb, fmt, ap);
100 ctdb_fatal(ctdb, "Unable to allocate error in ctdb_send_error\n");
/* include the NUL terminator in the transmitted message */
104 msglen = strlen(msg)+1;
105 len = offsetof(struct ctdb_reply_error_old, msg);
106 r = ctdb_transport_allocate(ctdb, msg, CTDB_REPLY_ERROR, len + msglen,
107 struct ctdb_reply_error_old);
108 CTDB_NO_MEMORY_FATAL(ctdb, r);
/* reply goes back to the original sender, matched by reqid */
110 r->hdr.destnode = hdr->srcnode;
111 r->hdr.reqid = hdr->reqid;
114 memcpy(&r->msg[0], msg, msglen);
116 ctdb_queue_packet(ctdb, &r->hdr);
123 * send a redirect reply
125 * The logic behind this function is this:
127 * A client wants to grab a record and sends a CTDB_REQ_CALL packet
128 * to its local ctdb (ctdb_request_call). If the node is not itself
129 * the record's DMASTER, it first redirects the packet to the
130 * record's LMASTER. The LMASTER then redirects the call packet to
131 * the current DMASTER. Note that this works because of this: When
132 * a record is migrated off a node, then the new DMASTER is stored
133 * in the record's copy on the former DMASTER.
135 static void ctdb_call_send_redirect(struct ctdb_context *ctdb,
136 struct ctdb_db_context *ctdb_db,
138 struct ctdb_req_call_old *c,
139 struct ctdb_ltdb_header *header)
141 uint32_t lmaster = ctdb_lmaster(ctdb, &key);
/* default: forward to the lmaster; if we ARE the lmaster, forward
 * straight to the dmaster recorded in our copy of the record */
143 c->hdr.destnode = lmaster;
144 if (ctdb->pnn == lmaster) {
145 c->hdr.destnode = header->dmaster;
/* warn periodically if a request keeps bouncing between nodes -
 * a persistently high hopcount indicates a very hot record */
149 if (c->hopcount%100 > 95) {
150 DEBUG(DEBUG_WARNING,("High hopcount %d dbid:%s "
151 "key:0x%08x reqid=%08x pnn:%d src:%d lmaster:%d "
152 "header->dmaster:%d dst:%d\n",
153 c->hopcount, ctdb_db->db_name, ctdb_hash(&key),
154 c->hdr.reqid, ctdb->pnn, c->hdr.srcnode, lmaster,
155 header->dmaster, c->hdr.destnode));
158 ctdb_queue_packet(ctdb, &c->hdr);
165 caller must have the chainlock before calling this routine. Caller must be
/* Sent by the lmaster: records 'new_dmaster' as the record's dmaster in
 * the local copy, then sends a CTDB_REPLY_DMASTER (carrying key, data and
 * record flags) to the new dmaster so it can take over the record. */
168 static void ctdb_send_dmaster_reply(struct ctdb_db_context *ctdb_db,
169 struct ctdb_ltdb_header *header,
170 TDB_DATA key, TDB_DATA data,
171 uint32_t new_dmaster,
174 struct ctdb_context *ctdb = ctdb_db->ctdb;
175 struct ctdb_reply_dmaster_old *r;
/* only the lmaster may hand out dmaster-ship */
179 if (ctdb->pnn != ctdb_lmaster(ctdb, &key)) {
180 DEBUG(DEBUG_ALERT,(__location__ " Caller is not lmaster!\n"));
/* persist the new dmaster locally before telling anyone about it */
184 header->dmaster = new_dmaster;
185 ret = ctdb_ltdb_store(ctdb_db, key, header, data);
187 ctdb_fatal(ctdb, "ctdb_send_dmaster_reply unable to update dmaster");
191 if (ctdb->methods == NULL) {
192 ctdb_fatal(ctdb, "ctdb_send_dmaster_reply cant update dmaster since transport is down");
196 /* put the packet on a temporary context, allowing us to safely free
197 it below even if ctdb_reply_dmaster() has freed it already */
198 tmp_ctx = talloc_new(ctdb);
200 /* send the CTDB_REPLY_DMASTER */
/* payload layout: key bytes, then data bytes, then a trailing
 * uint32_t carrying the record flags */
201 len = offsetof(struct ctdb_reply_dmaster_old, data) + key.dsize + data.dsize + sizeof(uint32_t);
202 r = ctdb_transport_allocate(ctdb, tmp_ctx, CTDB_REPLY_DMASTER, len,
203 struct ctdb_reply_dmaster_old);
204 CTDB_NO_MEMORY_FATAL(ctdb, r);
206 r->hdr.destnode = new_dmaster;
207 r->hdr.reqid = reqid;
208 r->hdr.generation = ctdb_db->generation;
209 r->rsn = header->rsn;
210 r->keylen = key.dsize;
211 r->datalen = data.dsize;
212 r->db_id = ctdb_db->db_id;
213 memcpy(&r->data[0], key.dptr, key.dsize);
214 memcpy(&r->data[key.dsize], data.dptr, data.dsize);
215 memcpy(&r->data[key.dsize+data.dsize], &header->flags, sizeof(uint32_t));
217 ctdb_queue_packet(ctdb, &r->hdr);
219 talloc_free(tmp_ctx);
223 send a dmaster request (give another node the dmaster for a record)
225 This is always sent to the lmaster, which ensures that the lmaster
226 always knows who the dmaster is. The lmaster will then send a
227 CTDB_REPLY_DMASTER to the new dmaster
229 static void ctdb_call_send_dmaster(struct ctdb_db_context *ctdb_db,
230 struct ctdb_req_call_old *c,
231 struct ctdb_ltdb_header *header,
232 TDB_DATA *key, TDB_DATA *data)
234 struct ctdb_req_dmaster_old *r;
235 struct ctdb_context *ctdb = ctdb_db->ctdb;
237 uint32_t lmaster = ctdb_lmaster(ctdb, key);
239 if (ctdb->methods == NULL) {
240 ctdb_fatal(ctdb, "Failed ctdb_call_send_dmaster since transport is down");
/* mark that the record migrated carrying data, so the receiver can
 * distinguish it from an empty migration */
244 if (data->dsize != 0) {
245 header->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
/* short-circuit: if we are the lmaster ourselves, reply directly
 * instead of sending a CTDB_REQ_DMASTER over the wire */
248 if (lmaster == ctdb->pnn) {
249 ctdb_send_dmaster_reply(ctdb_db, header, *key, *data,
250 c->hdr.srcnode, c->hdr.reqid);
/* payload layout matches ctdb_send_dmaster_reply: key, data, flags */
254 len = offsetof(struct ctdb_req_dmaster_old, data) + key->dsize + data->dsize
256 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_DMASTER, len,
257 struct ctdb_req_dmaster_old);
258 CTDB_NO_MEMORY_FATAL(ctdb, r);
259 r->hdr.destnode = lmaster;
260 r->hdr.reqid = c->hdr.reqid;
261 r->hdr.generation = ctdb_db->generation;
263 r->rsn = header->rsn;
/* the requesting node becomes the new dmaster */
264 r->dmaster = c->hdr.srcnode;
265 r->keylen = key->dsize;
266 r->datalen = data->dsize;
267 memcpy(&r->data[0], key->dptr, key->dsize);
268 memcpy(&r->data[key->dsize], data->dptr, data->dsize);
269 memcpy(&r->data[key->dsize + data->dsize], &header->flags, sizeof(uint32_t));
/* update our local copy to point at the new dmaster before sending */
271 header->dmaster = c->hdr.srcnode;
272 if (ctdb_ltdb_store(ctdb_db, *key, header, *data) != 0) {
273 ctdb_fatal(ctdb, "Failed to store record in ctdb_call_send_dmaster");
276 ctdb_queue_packet(ctdb, &r->hdr);
/* Timer callback: the pin-down period for a sticky record has expired.
 * Freeing sr->pindown releases any requests deferred under it so the
 * record may migrate again. */
281 static void ctdb_sticky_pindown_timeout(struct tevent_context *ev,
282 struct tevent_timer *te,
283 struct timeval t, void *private_data)
285 struct ctdb_sticky_record *sr = talloc_get_type(private_data,
286 struct ctdb_sticky_record);
288 DEBUG(DEBUG_ERR,("Pindown timeout db:%s unstick record\n", sr->ctdb_db->db_name));
289 if (sr->pindown != NULL) {
290 talloc_free(sr->pindown);
/* Pin down a sticky record: look up its ctdb_sticky_record (keyed by the
 * record key converted to an idkey array) and, if not already pinned,
 * create a pindown talloc context with a timer that un-pins the record
 * after tunable.sticky_pindown milliseconds. */
296 ctdb_set_sticky_pindown(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key)
298 TALLOC_CTX *tmp_ctx = talloc_new(NULL);
300 struct ctdb_sticky_record *sr;
302 k = ctdb_key_to_idkey(tmp_ctx, key);
304 DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
305 talloc_free(tmp_ctx);
309 sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
311 talloc_free(tmp_ctx);
315 talloc_free(tmp_ctx);
317 if (sr->pindown == NULL) {
318 DEBUG(DEBUG_ERR,("Pinning down record in %s for %d ms\n", ctdb_db->db_name, ctdb->tunable.sticky_pindown));
319 sr->pindown = talloc_new(sr);
320 if (sr->pindown == NULL) {
321 DEBUG(DEBUG_ERR,("Failed to allocate pindown context for sticky record\n"));
/* sticky_pindown is in milliseconds; split into secs/usecs */
324 tevent_add_timer(ctdb->ev, sr->pindown,
325 timeval_current_ofs(ctdb->tunable.sticky_pindown / 1000,
326 (ctdb->tunable.sticky_pindown * 1000) % 1000000),
327 ctdb_sticky_pindown_timeout, sr);
334 called when a CTDB_REPLY_DMASTER packet comes in, or when the lmaster
335 gets a CTDB_REQUEST_DMASTER for itself. We become the dmaster.
337 must be called with the chainlock held. This function releases the chainlock
339 static void ctdb_become_dmaster(struct ctdb_db_context *ctdb_db,
340 struct ctdb_req_header *hdr,
341 TDB_DATA key, TDB_DATA data,
342 uint64_t rsn, uint32_t record_flags)
344 struct ctdb_call_state *state;
345 struct ctdb_context *ctdb = ctdb_db->ctdb;
346 struct ctdb_ltdb_header header;
349 DEBUG(DEBUG_DEBUG,("pnn %u dmaster response %08x\n", ctdb->pnn, ctdb_hash(&key)));
/* we are now the dmaster; take over the record header */
353 header.dmaster = ctdb->pnn;
354 header.flags = record_flags;
/* find the call state that initiated this migration, if any */
356 state = reqid_find(ctdb->idr, hdr->reqid, struct ctdb_call_state);
359 if (state->call->flags & CTDB_CALL_FLAG_VACUUM_MIGRATION) {
361 * We temporarily add the VACUUM_MIGRATED flag to
362 * the record flags, so that ctdb_ltdb_store can
363 * decide whether the record should be stored or
366 header.flags |= CTDB_REC_FLAG_VACUUM_MIGRATED;
370 if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
371 ctdb_fatal(ctdb, "ctdb_reply_dmaster store failed\n");
373 ret = ctdb_ltdb_unlock(ctdb_db, key);
375 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
380 /* we just became DMASTER and this database is "sticky",
381 see if the record is flagged as "hot" and set up a pin-down
382 context to stop migrations for a little while if so
384 if (ctdb_db->sticky) {
385 ctdb_set_sticky_pindown(ctdb, ctdb_db, key);
/* no matching call state: migration arrived for an unknown reqid */
389 DEBUG(DEBUG_ERR,("pnn %u Invalid reqid %u in ctdb_become_dmaster from node %u\n",
390 ctdb->pnn, hdr->reqid, hdr->srcnode));
392 ret = ctdb_ltdb_unlock(ctdb_db, key);
394 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
/* sanity check: the key in the packet must match the key recorded in
 * the call state found via the reqid */
399 if (key.dsize != state->call->key.dsize || memcmp(key.dptr, state->call->key.dptr, key.dsize)) {
400 DEBUG(DEBUG_ERR, ("Got bogus DMASTER packet reqid:%u from node %u. Key does not match key held in matching idr.\n", hdr->reqid, hdr->srcnode));
402 ret = ctdb_ltdb_unlock(ctdb_db, key);
404 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
409 if (hdr->reqid != state->reqid) {
410 /* we found a record but it was the wrong one */
411 DEBUG(DEBUG_ERR, ("Dropped orphan in ctdb_become_dmaster with reqid:%u\n from node %u", hdr->reqid, hdr->srcnode));
413 ret = ctdb_ltdb_unlock(ctdb_db, key);
415 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
/* track migration frequency for hot-key statistics; result ignored */
420 (void) hash_count_increment(ctdb_db->migratedb, key);
/* now that we are dmaster, execute the pending call locally */
422 ctdb_call_local(ctdb_db, state->call, &header, state, &data, true);
424 ret = ctdb_ltdb_unlock(ctdb_db, state->call->key);
426 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
/* complete the call and notify the waiter, if any */
429 state->state = CTDB_CALL_DONE;
430 if (state->async.fn) {
431 state->async.fn(state);
/* One deferred packet waiting for an in-flight dmaster migration of the
 * same key to complete; linked into dmaster_defer_queue.deferred_calls. */
435 struct dmaster_defer_call {
436 struct dmaster_defer_call *next, *prev;
437 struct ctdb_context *ctdb;
438 struct ctdb_req_header *hdr;
/* Per-key queue of deferred calls; registered in ctdb_db->defer_dmaster.
 * NOTE(review): a generation member is referenced by the destructor but
 * its declaration is not visible in this view. */
441 struct dmaster_defer_queue {
442 struct ctdb_db_context *ctdb_db;
444 struct dmaster_defer_call *deferred_calls;
/* Zero-delay timer callback: re-inject a previously deferred packet into
 * the normal input path now that the dmaster migration has finished. */
447 static void dmaster_defer_reprocess(struct tevent_context *ev,
448 struct tevent_timer *te,
452 struct dmaster_defer_call *call = talloc_get_type(
453 private_data, struct dmaster_defer_call);
455 ctdb_input_pkt(call->ctdb, call->hdr);
/* Destructor for a dmaster defer queue: requeue all deferred calls for
 * reprocessing via zero-delay timer events. If a database recovery
 * happened in between (generation mismatch) the deferred requests are
 * dropped, since they will be resent by the clients anyway. */
459 static int dmaster_defer_queue_destructor(struct dmaster_defer_queue *ddq)
461 /* Ignore requests, if database recovery happens in-between. */
462 if (ddq->generation != ddq->ctdb_db->generation) {
466 while (ddq->deferred_calls != NULL) {
467 struct dmaster_defer_call *call = ddq->deferred_calls;
469 DLIST_REMOVE(ddq->deferred_calls, call);
/* reparent onto the long-lived ctdb context so the call survives
 * the destruction of the queue it was queued on */
471 talloc_steal(call->ctdb, call);
472 tevent_add_timer(call->ctdb->ev, call, timeval_zero(),
473 dmaster_defer_reprocess, call);
/* trbt insert callback for defer_dmaster; body not visible in this view -
 * presumably returns the new queue (parm), replacing any existing entry. */
478 static void *insert_ddq_callback(void *parm, void *data)
487 * This function is used to register a key in database that needs to be updated.
488 * Any requests for that key should get deferred till this is completed.
490 static int dmaster_defer_setup(struct ctdb_db_context *ctdb_db,
491 struct ctdb_req_header *hdr,
495 struct dmaster_defer_queue *ddq;
497 k = ctdb_key_to_idkey(hdr, key);
499 DEBUG(DEBUG_ERR, ("Failed to allocate key for dmaster defer setup\n"));
/* an up-to-date queue for this key already exists - reuse it */
504 ddq = trbt_lookuparray32(ctdb_db->defer_dmaster, k[0], k);
506 if (ddq->generation == ctdb_db->generation) {
511 /* Recovery occurred - get rid of old queue. All the deferred
512 * requests will be resent anyway from ctdb_call_resend_db.
/* queue lifetime is tied to the request header: when the request is
 * freed, the destructor requeues/drops the deferred calls */
517 ddq = talloc(hdr, struct dmaster_defer_queue);
519 DEBUG(DEBUG_ERR, ("Failed to allocate dmaster defer queue\n"));
523 ddq->ctdb_db = ctdb_db;
524 ddq->generation = hdr->generation;
525 ddq->deferred_calls = NULL;
527 trbt_insertarray32_callback(ctdb_db->defer_dmaster, k[0], k,
528 insert_ddq_callback, ddq);
529 talloc_set_destructor(ddq, dmaster_defer_queue_destructor);
/* If a dmaster migration is already in flight for 'key', park this packet
 * on the key's defer queue and return success (0); presumably returns
 * non-zero when there is nothing to defer behind (return statements are
 * not visible in this view). */
535 static int dmaster_defer_add(struct ctdb_db_context *ctdb_db,
536 struct ctdb_req_header *hdr,
539 struct dmaster_defer_queue *ddq;
540 struct dmaster_defer_call *call;
543 k = ctdb_key_to_idkey(hdr, key);
545 DEBUG(DEBUG_ERR, ("Failed to allocate key for dmaster defer add\n"));
549 ddq = trbt_lookuparray32(ctdb_db->defer_dmaster, k[0], k);
/* stale queue from before a recovery: disarm its destructor so the
 * old deferred calls are not requeued */
557 if (ddq->generation != hdr->generation) {
558 talloc_set_destructor(ddq, NULL);
563 call = talloc(ddq, struct dmaster_defer_call);
565 DEBUG(DEBUG_ERR, ("Failed to allocate dmaster defer call\n"));
569 call->ctdb = ctdb_db->ctdb;
/* take ownership of the packet so it lives as long as the deferral */
570 call->hdr = talloc_steal(call, hdr);
572 DLIST_ADD_END(ddq->deferred_calls, call);
578 called when a CTDB_REQ_DMASTER packet comes in
580 this comes into the lmaster for a record when the current dmaster
581 wants to give up the dmaster role and give it to someone else
583 void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
585 struct ctdb_req_dmaster_old *c = (struct ctdb_req_dmaster_old *)hdr;
586 TDB_DATA key, data, data2;
587 struct ctdb_ltdb_header header;
588 struct ctdb_db_context *ctdb_db;
589 uint32_t record_flags = 0;
/* unpack key and data from the packet payload (key first, then data) */
594 key.dsize = c->keylen;
595 data.dptr = c->data + c->keylen;
596 data.dsize = c->datalen;
/* record flags trail the key+data, if the packet is long enough */
597 len = offsetof(struct ctdb_req_dmaster_old, data) + key.dsize + data.dsize
599 if (len <= c->hdr.length) {
600 memcpy(&record_flags, &c->data[c->keylen + c->datalen],
601 sizeof(record_flags));
604 ctdb_db = find_ctdb_db(ctdb, c->db_id);
606 ctdb_send_error(ctdb, hdr, -1,
607 "Unknown database in request. db_id==0x%08x",
/* defer any later requests for this key until this migration is done */
612 dmaster_defer_setup(ctdb_db, hdr, key);
614 /* fetch the current record */
615 ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, hdr, &data2,
616 ctdb_call_input_pkt, ctdb, false);
618 ctdb_fatal(ctdb, "ctdb_req_dmaster failed to fetch record");
622 DEBUG(DEBUG_INFO,(__location__ " deferring ctdb_request_dmaster\n"));
/* only the lmaster may process a CTDB_REQ_DMASTER */
626 if (ctdb_lmaster(ctdb, &key) != ctdb->pnn) {
627 DEBUG(DEBUG_ERR, ("dmaster request to non-lmaster "
628 "db=%s lmaster=%u gen=%u curgen=%u\n",
629 ctdb_db->db_name, ctdb_lmaster(ctdb, &key),
630 hdr->generation, ctdb_db->generation));
631 ctdb_fatal(ctdb, "ctdb_req_dmaster to non-lmaster");
634 DEBUG(DEBUG_DEBUG,("pnn %u dmaster request on %08x for %u from %u\n",
635 ctdb->pnn, ctdb_hash(&key), c->dmaster, c->hdr.srcnode));
637 /* its a protocol error if the sending node is not the current dmaster */
638 if (header.dmaster != hdr->srcnode) {
639 DEBUG(DEBUG_ALERT,("pnn %u dmaster request for new-dmaster %u from non-master %u real-dmaster=%u key %08x dbid 0x%08x gen=%u curgen=%u c->rsn=%llu header.rsn=%llu reqid=%u keyval=0x%08x\n",
640 ctdb->pnn, c->dmaster, hdr->srcnode, header.dmaster, ctdb_hash(&key),
641 ctdb_db->db_id, hdr->generation, ctdb->vnn_map->generation,
642 (unsigned long long)c->rsn, (unsigned long long)header.rsn, c->hdr.reqid,
643 (key.dsize >= 4)?(*(uint32_t *)key.dptr):0));
/* an empty record (rsn==0) owned by us is tolerated; anything else
 * indicates inconsistent dmaster state - force a recovery */
644 if (header.rsn != 0 || header.dmaster != ctdb->pnn) {
645 DEBUG(DEBUG_ERR,("ctdb_req_dmaster from non-master. Force a recovery.\n"));
647 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
648 ctdb_ltdb_unlock(ctdb_db, key);
653 if (header.rsn > c->rsn) {
654 DEBUG(DEBUG_ALERT,("pnn %u dmaster request with older RSN new-dmaster %u from %u real-dmaster=%u key %08x dbid 0x%08x gen=%u curgen=%u c->rsn=%llu header.rsn=%llu reqid=%u\n",
655 ctdb->pnn, c->dmaster, hdr->srcnode, header.dmaster, ctdb_hash(&key),
656 ctdb_db->db_id, hdr->generation, ctdb->vnn_map->generation,
657 (unsigned long long)c->rsn, (unsigned long long)header.rsn, c->hdr.reqid));
660 /* use the rsn from the sending node */
663 /* store the record flags from the sending node */
664 header.flags = record_flags;
666 /* check if the new dmaster is the lmaster, in which case we
667 skip the dmaster reply */
668 if (c->dmaster == ctdb->pnn) {
669 ctdb_become_dmaster(ctdb_db, hdr, key, data, c->rsn, record_flags);
671 ctdb_send_dmaster_reply(ctdb_db, &header, key, data, c->dmaster, hdr->reqid);
673 ret = ctdb_ltdb_unlock(ctdb_db, key);
675 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
/* Timer callback: the sticky_duration for a record has expired.
 * NOTE(review): the body that tears the record down is not visible in
 * this view - presumably it frees the ctdb_sticky_record. */
680 static void ctdb_sticky_record_timeout(struct tevent_context *ev,
681 struct tevent_timer *te,
682 struct timeval t, void *private_data)
684 struct ctdb_sticky_record *sr = talloc_get_type(private_data,
685 struct ctdb_sticky_record);
/* trbt insert callback for sticky_records: when an entry already exists
 * for this key, the old one (data) is replaced by the new one (parm). */
689 static void *ctdb_make_sticky_record_callback(void *parm, void *data)
692 DEBUG(DEBUG_ERR,("Already have sticky record registered. Free old %p and create new %p\n", data, parm));
/* Register 'key' as a sticky record in ctdb_db->sticky_records and arm a
 * timer that removes the stickiness after tunable.sticky_duration seconds.
 * If the record is already registered the early talloc_free(tmp_ctx) path
 * returns without creating a duplicate. */
699 ctdb_make_record_sticky(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key)
701 TALLOC_CTX *tmp_ctx = talloc_new(NULL);
703 struct ctdb_sticky_record *sr;
705 k = ctdb_key_to_idkey(tmp_ctx, key);
707 DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
708 talloc_free(tmp_ctx);
712 sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
714 talloc_free(tmp_ctx);
718 sr = talloc(ctdb_db->sticky_records, struct ctdb_sticky_record);
720 talloc_free(tmp_ctx);
721 DEBUG(DEBUG_ERR,("Failed to allocate sticky record structure\n"));
726 sr->ctdb_db = ctdb_db;
729 DEBUG(DEBUG_ERR,("Make record sticky for %d seconds in db %s key:0x%08x.\n",
730 ctdb->tunable.sticky_duration,
731 ctdb_db->db_name, ctdb_hash(&key)));
733 trbt_insertarray32_callback(ctdb_db->sticky_records, k[0], &k[0], ctdb_make_sticky_record_callback, sr);
/* timer owns nothing extra: it is allocated on sr, so it dies with it */
735 tevent_add_timer(ctdb->ev, sr,
736 timeval_current_ofs(ctdb->tunable.sticky_duration, 0),
737 ctdb_sticky_record_timeout, sr);
739 talloc_free(tmp_ctx);
/* Carries a deferred packet into the zero-delay requeue timer event. */
743 struct pinned_down_requeue_handle {
744 struct ctdb_context *ctdb;
745 struct ctdb_req_header *hdr;
/* A request parked under a sticky record's pindown context; its
 * destructor requeues the packet when the pindown is released. */
748 struct pinned_down_deferred_call {
749 struct ctdb_context *ctdb;
750 struct ctdb_req_header *hdr;
/* Zero-delay timer callback: re-inject a packet that was deferred while
 * its record was pinned down. The packet is reparented to the long-lived
 * ctdb context before being fed back into the input path. */
753 static void pinned_down_requeue(struct tevent_context *ev,
754 struct tevent_timer *te,
755 struct timeval t, void *private_data)
757 struct pinned_down_requeue_handle *handle = talloc_get_type(private_data, struct pinned_down_requeue_handle);
758 struct ctdb_context *ctdb = handle->ctdb;
760 talloc_steal(ctdb, handle->hdr);
761 ctdb_call_input_pkt(ctdb, handle->hdr);
/* Destructor: fires when the pindown context is freed (timeout or
 * record un-stick). Moves the deferred packet onto a fresh handle and
 * schedules a zero-delay timer to reprocess it. */
766 static int pinned_down_destructor(struct pinned_down_deferred_call *pinned_down)
768 struct ctdb_context *ctdb = pinned_down->ctdb;
769 struct pinned_down_requeue_handle *handle = talloc(ctdb, struct pinned_down_requeue_handle);
771 handle->ctdb = pinned_down->ctdb;
772 handle->hdr = pinned_down->hdr;
/* the handle takes ownership so the packet outlives 'pinned_down' */
773 talloc_steal(handle, handle->hdr);
775 tevent_add_timer(ctdb->ev, handle, timeval_zero(),
776 pinned_down_requeue, handle);
/* If 'key' is currently pinned down, defer the incoming request under the
 * pindown context and return 0; the destructor requeues the packet when
 * the pindown is released. Non-pinned or unknown keys take the early
 * talloc_free(tmp_ctx) exits (presumably returning non-zero - the return
 * statements are not visible in this view). */
782 ctdb_defer_pinned_down_request(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_req_header *hdr)
784 TALLOC_CTX *tmp_ctx = talloc_new(NULL);
786 struct ctdb_sticky_record *sr;
787 struct pinned_down_deferred_call *pinned_down;
789 k = ctdb_key_to_idkey(tmp_ctx, key);
791 DEBUG(DEBUG_ERR,("Failed to allocate key for sticky record\n"));
792 talloc_free(tmp_ctx);
796 sr = trbt_lookuparray32(ctdb_db->sticky_records, k[0], &k[0]);
798 talloc_free(tmp_ctx);
802 talloc_free(tmp_ctx);
804 if (sr->pindown == NULL) {
/* parent under the pindown context: freeing the pindown fires the
 * destructor and requeues this packet */
808 pinned_down = talloc(sr->pindown, struct pinned_down_deferred_call);
809 if (pinned_down == NULL) {
810 DEBUG(DEBUG_ERR,("Failed to allocate structure for deferred pinned down request\n"));
814 pinned_down->ctdb = ctdb;
815 pinned_down->hdr = hdr;
817 talloc_set_destructor(pinned_down, pinned_down_destructor);
818 talloc_steal(pinned_down, hdr);
/* Maintain the per-database table of MAX_HOT_KEYS hottest keys, ordered so
 * that the smallest count sits at index 0. Updates an existing entry's
 * count, or evicts the coldest entry for a new key, then re-establishes
 * the minimum-at-slot-0 invariant by a single swap pass. */
824 ctdb_update_db_stat_hot_keys(struct ctdb_db_context *ctdb_db, TDB_DATA key,
829 /* smallest value is always at index 0 */
830 if (count <= ctdb_db->statistics.hot_keys[0].count) {
834 /* see if we already know this key */
835 for (i = 0; i < MAX_HOT_KEYS; i++) {
836 if (key.dsize != ctdb_db->statistics.hot_keys[i].key.dsize) {
839 if (memcmp(key.dptr, ctdb_db->statistics.hot_keys[i].key.dptr, key.dsize)) {
842 /* found an entry for this key */
843 if (count <= ctdb_db->statistics.hot_keys[i].count) {
846 ctdb_db->statistics.hot_keys[i].count = count;
/* new key: grow the table if not full, otherwise evict slot 0
 * (the current minimum) */
850 if (ctdb_db->statistics.num_hot_keys < MAX_HOT_KEYS) {
851 id = ctdb_db->statistics.num_hot_keys;
852 ctdb_db->statistics.num_hot_keys++;
857 if (ctdb_db->statistics.hot_keys[id].key.dptr != NULL) {
858 talloc_free(ctdb_db->statistics.hot_keys[id].key.dptr);
860 ctdb_db->statistics.hot_keys[id].key.dsize = key.dsize;
861 ctdb_db->statistics.hot_keys[id].key.dptr = talloc_memdup(ctdb_db, key.dptr, key.dsize);
862 ctdb_db->statistics.hot_keys[id].count = count;
864 ("Updated hot key database=%s key=0x%08x id=%d count=%d\n",
865 ctdb_db->db_name, ctdb_hash(&key), id, count));
/* restore the invariant: move the smallest count back to index 0 */
868 for (i = 1; i < MAX_HOT_KEYS; i++) {
869 if (ctdb_db->statistics.hot_keys[i].count == 0) {
872 if (ctdb_db->statistics.hot_keys[i].count < ctdb_db->statistics.hot_keys[0].count) {
873 count = ctdb_db->statistics.hot_keys[i].count;
874 ctdb_db->statistics.hot_keys[i].count = ctdb_db->statistics.hot_keys[0].count;
875 ctdb_db->statistics.hot_keys[0].count = count;
877 key = ctdb_db->statistics.hot_keys[i].key;
878 ctdb_db->statistics.hot_keys[i].key = ctdb_db->statistics.hot_keys[0].key;
879 ctdb_db->statistics.hot_keys[0].key = key;
885 called when a CTDB_REQ_CALL packet comes in
/* Main entry point for record call requests. Handles, in order:
 * pindown/dmaster deferrals, read-only revoke processing, redirect to
 * lmaster/dmaster when we do not own the record, read-only delegation,
 * hop-count statistics, migration of the record to the caller, and
 * finally local execution of the call with a CTDB_REPLY_CALL back to
 * the requesting node. */
887 void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
889 struct ctdb_req_call_old *c = (struct ctdb_req_call_old *)hdr;
891 struct ctdb_reply_call_old *r;
893 struct ctdb_ltdb_header header;
894 struct ctdb_call *call;
895 struct ctdb_db_context *ctdb_db;
896 int tmp_count, bucket;
898 if (ctdb->methods == NULL) {
899 DEBUG(DEBUG_INFO,(__location__ " Failed ctdb_request_call. Transport is DOWN\n"));
904 ctdb_db = find_ctdb_db(ctdb, c->db_id);
906 ctdb_send_error(ctdb, hdr, -1,
907 "Unknown database in request. db_id==0x%08x",
/* unpack the call out of the wire format (key first, then call data) */
912 call = talloc(hdr, struct ctdb_call);
913 CTDB_NO_MEMORY_FATAL(ctdb, call);
915 call->call_id = c->callid;
916 call->key.dptr = c->data;
917 call->key.dsize = c->keylen;
918 call->call_data.dptr = c->data + c->keylen;
919 call->call_data.dsize = c->calldatalen;
920 call->reply_data.dptr = NULL;
921 call->reply_data.dsize = 0;
924 /* If this record is pinned down we should defer the
925 request until the pindown times out
927 if (ctdb_db->sticky) {
928 if (ctdb_defer_pinned_down_request(ctdb, ctdb_db, call->key, hdr) == 0) {
930 ("Defer request for pinned down record in %s\n", ctdb_db->db_name));
/* a dmaster migration is in flight for this key - park the request */
936 if (dmaster_defer_add(ctdb_db, hdr, call->key) == 0) {
941 /* determine if we are the dmaster for this key. This also
942 fetches the record data (if any), thus avoiding a 2nd fetch of the data
943 if the call will be answered locally */
945 ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, call->key, &header, hdr, &data,
946 ctdb_call_input_pkt, ctdb, false);
948 ctdb_send_error(ctdb, hdr, ret, "ltdb fetch failed in ctdb_request_call");
953 DEBUG(DEBUG_INFO,(__location__ " deferred ctdb_request_call\n"));
958 /* Dont do READONLY if we don't have a tracking database */
959 if ((c->flags & CTDB_WANT_READONLY) && !ctdb_db->readonly) {
960 c->flags &= ~CTDB_WANT_READONLY;
/* a completed revoke: clear all read-only flags and drop tracking */
963 if (header.flags & CTDB_REC_RO_REVOKE_COMPLETE) {
964 header.flags &= ~CTDB_REC_RO_FLAGS;
965 CTDB_INCREMENT_STAT(ctdb, total_ro_revokes);
966 CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_revokes);
967 if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) {
968 ctdb_fatal(ctdb, "Failed to write header with cleared REVOKE flag");
970 /* and clear out the tracking data */
971 if (tdb_delete(ctdb_db->rottdb, call->key) != 0) {
972 DEBUG(DEBUG_ERR,(__location__ " Failed to clear out trackingdb record\n"));
976 /* if we are revoking, we must defer all other calls until the revoke
979 if (header.flags & CTDB_REC_RO_REVOKING_READONLY) {
980 talloc_free(data.dptr);
981 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
983 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, call->key, hdr, ctdb_call_input_pkt, ctdb) != 0) {
984 ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
991 * If we are not the dmaster and are not hosting any delegations,
992 * then we redirect the request to the node that can answer it
993 * (the lmaster or the dmaster).
995 if ((header.dmaster != ctdb->pnn)
996 && (!(header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) ) {
997 talloc_free(data.dptr);
998 ctdb_call_send_redirect(ctdb, ctdb_db, call->key, c, &header);
1000 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
1002 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
/* a read/write request while read-only copies exist: start a revoke
 * of all delegations and defer this call behind it */
1008 if ( (!(c->flags & CTDB_WANT_READONLY))
1009 && (header.flags & (CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY)) ) {
1010 header.flags |= CTDB_REC_RO_REVOKING_READONLY;
1011 if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) {
1012 ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
1014 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
1016 if (ctdb_start_revoke_ro_record(ctdb, ctdb_db, call->key, &header, data) != 0) {
1017 ctdb_fatal(ctdb, "Failed to start record revoke");
1019 talloc_free(data.dptr);
1021 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, call->key, hdr, ctdb_call_input_pkt, ctdb) != 0) {
1022 ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
1029 /* If this is the first request for delegation. bump rsn and set
1030 * the delegations flag
1032 if ((c->flags & CTDB_WANT_READONLY)
1033 && (c->callid == CTDB_FETCH_WITH_HEADER_FUNC)
1034 && (!(header.flags & CTDB_REC_RO_HAVE_DELEGATIONS))) {
1036 header.flags |= CTDB_REC_RO_HAVE_DELEGATIONS;
1037 if (ctdb_ltdb_store(ctdb_db, call->key, &header, data) != 0) {
1038 ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
/* grant a read-only delegation: record the requesting node in the
 * tracking db and send back header+data marked HAVE_READONLY */
1041 if ((c->flags & CTDB_WANT_READONLY)
1042 && (call->call_id == CTDB_FETCH_WITH_HEADER_FUNC)) {
1045 tdata = tdb_fetch(ctdb_db->rottdb, call->key);
1046 if (ctdb_trackingdb_add_pnn(ctdb, &tdata, c->hdr.srcnode) != 0) {
1047 ctdb_fatal(ctdb, "Failed to add node to trackingdb");
1049 if (tdb_store(ctdb_db->rottdb, call->key, tdata, TDB_REPLACE) != 0) {
1050 ctdb_fatal(ctdb, "Failed to store trackingdb data");
1054 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
1056 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
/* the delegation reply carries the ltdb header followed by the data */
1059 len = offsetof(struct ctdb_reply_call_old, data) + data.dsize + sizeof(struct ctdb_ltdb_header);
1060 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REPLY_CALL, len,
1061 struct ctdb_reply_call_old);
1062 CTDB_NO_MEMORY_FATAL(ctdb, r);
1063 r->hdr.destnode = c->hdr.srcnode;
1064 r->hdr.reqid = c->hdr.reqid;
1065 r->hdr.generation = ctdb_db->generation;
1067 r->datalen = data.dsize + sizeof(struct ctdb_ltdb_header);
/* the copy sent to the delegate is a read-only copy, not a delegator */
1069 header.flags |= CTDB_REC_RO_HAVE_READONLY;
1070 header.flags &= ~CTDB_REC_RO_HAVE_DELEGATIONS;
1071 memcpy(&r->data[0], &header, sizeof(struct ctdb_ltdb_header));
1074 memcpy(&r->data[sizeof(struct ctdb_ltdb_header)], data.dptr, data.dsize);
1077 ctdb_queue_packet(ctdb, &r->hdr);
1078 CTDB_INCREMENT_STAT(ctdb, total_ro_delegations);
1079 CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_delegations);
/* update hop-count statistics (bucketed histogram) */
1086 CTDB_UPDATE_STAT(ctdb, max_hop_count, c->hopcount);
1087 tmp_count = c->hopcount;
1093 if (bucket >= MAX_COUNT_BUCKETS) {
1094 bucket = MAX_COUNT_BUCKETS - 1;
1096 CTDB_INCREMENT_STAT(ctdb, hop_count_bucket[bucket]);
1097 CTDB_INCREMENT_DB_STAT(ctdb_db, hop_count_bucket[bucket]);
1099 /* If this database supports sticky records, then check if the
1100 hopcount is big. If it is it means the record is hot and we
1101 should make it sticky.
1103 if (ctdb_db->sticky && c->hopcount >= ctdb->tunable.hopcount_make_sticky) {
1104 ctdb_make_record_sticky(ctdb, ctdb_db, call->key);
1108 /* Try if possible to migrate the record off to the caller node.
1109 * From the clients perspective a fetch of the data is just as
1110 * expensive as a migration.
1112 if (c->hdr.srcnode != ctdb->pnn) {
1113 if (ctdb_db->persistent_state) {
1114 DEBUG(DEBUG_INFO, (__location__ " refusing migration"
1115 " of key %s while transaction is active\n",
1116 (char *)call->key.dptr));
1118 DEBUG(DEBUG_DEBUG,("pnn %u starting migration of %08x to %u\n",
1119 ctdb->pnn, ctdb_hash(&(call->key)), c->hdr.srcnode));
1120 ctdb_call_send_dmaster(ctdb_db, c, &header, &(call->key), &data);
1121 talloc_free(data.dptr);
1123 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
1125 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
/* local case: run the call here and reply with the call result */
1132 ret = ctdb_call_local(ctdb_db, call, &header, hdr, &data, true);
1134 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_local failed\n"));
1138 ret = ctdb_ltdb_unlock(ctdb_db, call->key);
1140 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
1143 len = offsetof(struct ctdb_reply_call_old, data) + call->reply_data.dsize;
1144 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REPLY_CALL, len,
1145 struct ctdb_reply_call_old);
1146 CTDB_NO_MEMORY_FATAL(ctdb, r);
1147 r->hdr.destnode = hdr->srcnode;
1148 r->hdr.reqid = hdr->reqid;
1149 r->hdr.generation = ctdb_db->generation;
1150 r->status = call->status;
1151 r->datalen = call->reply_data.dsize;
1152 if (call->reply_data.dsize) {
1153 memcpy(&r->data[0], call->reply_data.dptr, call->reply_data.dsize);
1156 ctdb_queue_packet(ctdb, &r->hdr);
1163 * called when a CTDB_REPLY_CALL packet comes in
1165 * This packet comes in response to a CTDB_REQ_CALL request packet. It
1166 * contains any reply data from the call
1168 void ctdb_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
1170 struct ctdb_reply_call_old *c = (struct ctdb_reply_call_old *)hdr;
1171 struct ctdb_call_state *state;
1173 state = reqid_find(ctdb->idr, hdr->reqid, struct ctdb_call_state);
1174 if (state == NULL) {
1175 DEBUG(DEBUG_ERR, (__location__ " reqid %u not found\n", hdr->reqid));
1179 if (hdr->reqid != state->reqid) {
1180 /* we found a record but it was the wrong one */
1181 DEBUG(DEBUG_ERR, ("Dropped orphaned call reply with reqid:%u\n",hdr->reqid));
1186 /* read only delegation processing */
1187 /* If we got a FETCH_WITH_HEADER we should check if this is a ro
1188 * delegation since we may need to update the record header
1190 if (state->c->callid == CTDB_FETCH_WITH_HEADER_FUNC) {
1191 struct ctdb_db_context *ctdb_db = state->ctdb_db;
/* the reply payload starts with the remote record header */
1192 struct ctdb_ltdb_header *header = (struct ctdb_ltdb_header *)&c->data[0];
1193 struct ctdb_ltdb_header oldheader;
1194 TDB_DATA key, data, olddata;
/* not a read-only delegation: nothing to store locally */
1197 if (!(header->flags & CTDB_REC_RO_HAVE_READONLY)) {
1202 key.dsize = state->c->keylen;
1203 key.dptr = state->c->data;
1204 ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr,
1205 ctdb_call_input_pkt, ctdb, false);
1210 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock in ctdb_reply_call\n"));
1214 ret = ctdb_ltdb_fetch(ctdb_db, key, &oldheader, state, &olddata);
1216 DEBUG(DEBUG_ERR, ("Failed to fetch old record in ctdb_reply_call\n"));
1217 ctdb_ltdb_unlock(ctdb_db, key);
/* only store if the delegated copy is newer than what we hold */
1221 if (header->rsn <= oldheader.rsn) {
1222 ctdb_ltdb_unlock(ctdb_db, key);
1226 if (c->datalen < sizeof(struct ctdb_ltdb_header)) {
1227 DEBUG(DEBUG_ERR,(__location__ " Got FETCH_WITH_HEADER reply with too little data: %d bytes\n", c->datalen));
1228 ctdb_ltdb_unlock(ctdb_db, key);
/* record data follows the embedded header in the payload */
1232 data.dsize = c->datalen - sizeof(struct ctdb_ltdb_header);
1233 data.dptr = &c->data[sizeof(struct ctdb_ltdb_header)];
1234 ret = ctdb_ltdb_store(ctdb_db, key, header, data);
1236 DEBUG(DEBUG_ERR, ("Failed to store new record in ctdb_reply_call\n"));
1237 ctdb_ltdb_unlock(ctdb_db, key);
1241 ctdb_ltdb_unlock(ctdb_db, key);
/* hand the reply data to the waiting caller; keep the packet alive by
 * reparenting it onto the call state */
1245 state->call->reply_data.dptr = c->data;
1246 state->call->reply_data.dsize = c->datalen;
1247 state->call->status = c->status;
1249 talloc_steal(state, c);
1251 state->state = CTDB_CALL_DONE;
1252 if (state->async.fn) {
1253 state->async.fn(state);
1259 * called when a CTDB_REPLY_DMASTER packet comes in
1261 * This packet comes in from the lmaster in response to a CTDB_REQ_CALL
1262 * request packet. It means that the current dmaster wants to give us
/*
 * Process a CTDB_REPLY_DMASTER packet: the current dmaster is handing
 * ownership of the record to this node.  Unpack key/data (plus optional
 * record flags appended after them), take the record lock (requeueing
 * the packet if the lock is busy), then become dmaster.
 */
1265 void ctdb_reply_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
1267 struct ctdb_reply_dmaster_old *c = (struct ctdb_reply_dmaster_old *)hdr;
1268 struct ctdb_db_context *ctdb_db;
1270 uint32_t record_flags = 0;
1274 ctdb_db = find_ctdb_db(ctdb, c->db_id);
1275 if (ctdb_db == NULL) {
1276 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_reply_dmaster\n", c->db_id));
/* wire layout: key bytes first in c->data, record data after them */
1281 key.dsize = c->keylen;
1282 data.dptr = &c->data[key.dsize];
1283 data.dsize = c->datalen;
/* record_flags are optional trailing data; read only if the packet
 * is long enough to contain them */
1284 len = offsetof(struct ctdb_reply_dmaster_old, data) + key.dsize + data.dsize
1286 if (len <= c->hdr.length) {
1287 memcpy(&record_flags, &c->data[c->keylen + c->datalen],
1288 sizeof(record_flags));
1291 dmaster_defer_setup(ctdb_db, hdr, key);
/* requeue this packet for later if the record lock is not available now */
1293 ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr,
1294 ctdb_call_input_pkt, ctdb, false);
1299 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock in ctdb_reply_dmaster\n"));
1303 ctdb_become_dmaster(ctdb_db, hdr, key, data, c->rsn, record_flags);
1308 called when a CTDB_REPLY_ERROR packet comes in
/*
 * Process a CTDB_REPLY_ERROR packet: mark the matching pending call as
 * failed and expose the error text via state->errmsg.
 */
1310 void ctdb_reply_error(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
1312 struct ctdb_reply_error_old *c = (struct ctdb_reply_error_old *)hdr;
1313 struct ctdb_call_state *state;
1315 state = reqid_find(ctdb->idr, hdr->reqid, struct ctdb_call_state);
1316 if (state == NULL) {
1317 DEBUG(DEBUG_ERR,("pnn %u Invalid reqid %u in ctdb_reply_error\n",
1318 ctdb->pnn, hdr->reqid));
/* stale reply for a reqid that has since been reused */
1322 if (hdr->reqid != state->reqid) {
1323 /* we found a record but it was the wrong one */
1324 DEBUG(DEBUG_ERR, ("Dropped orphaned error reply with reqid:%u\n",hdr->reqid));
/* keep the packet alive as long as state: errmsg points into it */
1328 talloc_steal(state, c);
1330 state->state = CTDB_CALL_ERROR;
1331 state->errmsg = (char *)c->msg;
1332 if (state->async.fn) {
1333 state->async.fn(state);
/*
 * Talloc destructor for a ctdb_call_state: unlink it from the db's
 * pending-call list and release its request id.
 */
1341 static int ctdb_call_destructor(struct ctdb_call_state *state)
1343 DLIST_REMOVE(state->ctdb_db->pending_calls, state);
1344 reqid_remove(state->ctdb_db->ctdb->idr, state->reqid);
1350 called when a ctdb_call needs to be resent after a reconfigure event
/*
 * Resend a pending ctdb_call after a recovery/reconfigure event.
 * A fresh reqid is allocated so a late reply to the old request cannot
 * be matched to this call, and the header generation is refreshed to be
 * valid under the new vnn_map.
 */
1352 static void ctdb_call_resend(struct ctdb_call_state *state)
1354 struct ctdb_context *ctdb = state->ctdb_db->ctdb;
1356 state->generation = state->ctdb_db->generation;
1358 /* use a new reqid, in case the old reply does eventually come in */
1359 reqid_remove(ctdb->idr, state->reqid);
1360 state->reqid = reqid_new(ctdb->idr, state);
1361 state->c->hdr.reqid = state->reqid;
1363 /* update the generation count for this request, so its valid with the new vnn_map */
1364 state->c->hdr.generation = state->generation;
1366 /* send the packet to ourselves, it will be redirected appropriately */
1367 state->c->hdr.destnode = ctdb->pnn;
1369 ctdb_queue_packet(ctdb, &state->c->hdr);
1370 DEBUG(DEBUG_NOTICE,("resent ctdb_call for db %s reqid %u generation %u\n",
1371 state->ctdb_db->db_name, state->reqid, state->generation));
1375 resend all pending calls on recovery
/*
 * Resend all pending calls on one database.  "next" is captured before
 * resending because the resend can move the entry on the list.
 */
1377 void ctdb_call_resend_db(struct ctdb_db_context *ctdb_db)
1379 struct ctdb_call_state *state, *next;
1381 for (state = ctdb_db->pending_calls; state; state = next) {
1383 ctdb_call_resend(state);
/* Resend pending calls on every attached database (used after recovery). */
1387 void ctdb_call_resend_all(struct ctdb_context *ctdb)
1389 struct ctdb_db_context *ctdb_db;
1391 for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
1392 ctdb_call_resend_db(ctdb_db);
1397 this allows the caller to setup a async.fn
/*
 * Zero-timeout timer callback used to deliver completion of a locally
 * processed call from the event loop, after the caller has had a chance
 * to set up async.fn.
 */
1399 static void call_local_trigger(struct tevent_context *ev,
1400 struct tevent_timer *te,
1401 struct timeval t, void *private_data)
1403 struct ctdb_call_state *state = talloc_get_type(private_data, struct ctdb_call_state);
1404 if (state->async.fn) {
1405 state->async.fn(state);
1411 construct an event driven local ctdb_call
1413 this is used so that locally processed ctdb_call requests are processed
1414 in an event driven manner
/*
 * Execute a ctdb_call locally but report completion asynchronously via
 * a zero-timeout tevent timer, so local calls follow the same
 * event-driven pattern as remote ones.  Returns the call state, or NULL
 * on allocation failure.
 */
1416 struct ctdb_call_state *ctdb_call_local_send(struct ctdb_db_context *ctdb_db,
1417 struct ctdb_call *call,
1418 struct ctdb_ltdb_header *header,
1421 struct ctdb_call_state *state;
1422 struct ctdb_context *ctdb = ctdb_db->ctdb;
1425 state = talloc_zero(ctdb_db, struct ctdb_call_state);
1426 CTDB_NO_MEMORY_NULL(ctdb, state);
/* take ownership of the record data for the lifetime of the call */
1428 talloc_steal(state, data->dptr);
1430 state->state = CTDB_CALL_DONE;
1431 state->call = talloc(state, struct ctdb_call);
1432 CTDB_NO_MEMORY_NULL(ctdb, state->call);
1433 *(state->call) = *call;
1434 state->ctdb_db = ctdb_db;
/* run the call now; failure is logged but deliberately not fatal here */
1436 ret = ctdb_call_local(ctdb_db, state->call, header, state, data, true);
1438 DEBUG(DEBUG_DEBUG,("ctdb_call_local() failed, ignoring return code %d\n", ret));
/* deliver the completion from the event loop, not from this stack frame */
1441 tevent_add_timer(ctdb->ev, state, timeval_zero(),
1442 call_local_trigger, state);
1449 make a remote ctdb call - async send. Called in daemon context.
1451 This constructs a ctdb_call request and queues it for processing.
1452 This call never blocks.
/*
 * Build and queue a CTDB_REQ_CALL packet for a remote ctdb_call.
 * Never blocks.  Returns the pending call state, or NULL if the
 * transport is down or allocation fails.
 */
1454 struct ctdb_call_state *ctdb_daemon_call_send_remote(struct ctdb_db_context *ctdb_db,
1455 struct ctdb_call *call,
1456 struct ctdb_ltdb_header *header)
1459 struct ctdb_call_state *state;
1460 struct ctdb_context *ctdb = ctdb_db->ctdb;
1462 if (ctdb->methods == NULL) {
1463 DEBUG(DEBUG_INFO,(__location__ " Failed send packet. Transport is down\n"));
1467 state = talloc_zero(ctdb_db, struct ctdb_call_state);
1468 CTDB_NO_MEMORY_NULL(ctdb, state);
1469 state->call = talloc(state, struct ctdb_call);
1470 CTDB_NO_MEMORY_NULL(ctdb, state->call);
1472 state->reqid = reqid_new(ctdb->idr, state);
1473 state->ctdb_db = ctdb_db;
/* destructor releases the reqid and unlinks from pending_calls */
1474 talloc_set_destructor(state, ctdb_call_destructor);
/* wire format: key bytes then call data, inline after the fixed header */
1476 len = offsetof(struct ctdb_req_call_old, data) + call->key.dsize + call->call_data.dsize;
1477 state->c = ctdb_transport_allocate(ctdb, state, CTDB_REQ_CALL, len,
1478 struct ctdb_req_call_old);
1479 CTDB_NO_MEMORY_NULL(ctdb, state->c);
/* route the request towards the record's current dmaster */
1480 state->c->hdr.destnode = header->dmaster;
1482 /* this limits us to 16k outstanding messages - not unreasonable */
1483 state->c->hdr.reqid = state->reqid;
1484 state->c->hdr.generation = ctdb_db->generation;
1485 state->c->flags = call->flags;
1486 state->c->db_id = ctdb_db->db_id;
1487 state->c->callid = call->call_id;
1488 state->c->hopcount = 0;
1489 state->c->keylen = call->key.dsize;
1490 state->c->calldatalen = call->call_data.dsize;
1491 memcpy(&state->c->data[0], call->key.dptr, call->key.dsize);
1492 memcpy(&state->c->data[call->key.dsize],
1493 call->call_data.dptr, call->call_data.dsize);
1494 *(state->call) = *call;
/* re-point the copied call at the packet's own buffer so the pointers
 * stay valid for the lifetime of the state */
1495 state->call->call_data.dptr = &state->c->data[call->key.dsize];
1496 state->call->key.dptr = &state->c->data[0];
1498 state->state = CTDB_CALL_WAIT;
1499 state->generation = ctdb_db->generation;
/* track it so it can be resent after a recovery */
1501 DLIST_ADD(ctdb_db->pending_calls, state);
1503 ctdb_queue_packet(ctdb, &state->c->hdr);
1509 make a remote ctdb call - async recv - called in daemon context
1511 This is called when the program wants to wait for a ctdb_call to complete and get the
1512 results. This call will block unless the call has already completed.
/*
 * Wait for a previously sent ctdb_call to complete and copy the reply
 * out of the state.  Blocks (pumping the event loop) unless the call
 * has already completed.
 */
1514 int ctdb_daemon_call_recv(struct ctdb_call_state *state, struct ctdb_call *call)
1516 while (state->state < CTDB_CALL_DONE) {
1517 tevent_loop_once(state->ctdb_db->ctdb->ev);
1519 if (state->state != CTDB_CALL_DONE) {
1520 ctdb_set_error(state->ctdb_db->ctdb, "%s", state->errmsg);
/* duplicate the reply data onto the caller's call so it survives the
 * state being freed */
1525 if (state->call->reply_data.dsize) {
1526 call->reply_data.dptr = talloc_memdup(call,
1527 state->call->reply_data.dptr,
1528 state->call->reply_data.dsize);
1529 call->reply_data.dsize = state->call->reply_data.dsize;
1531 call->reply_data.dptr = NULL;
1532 call->reply_data.dsize = 0;
1534 call->status = state->call->status;
1541 send a keepalive packet to the other node
/*
 * Send a CTDB_REQ_KEEPALIVE packet to destnode; a no-op while the
 * transport is down.
 */
1543 void ctdb_send_keepalive(struct ctdb_context *ctdb, uint32_t destnode)
1545 struct ctdb_req_keepalive_old *r;
1547 if (ctdb->methods == NULL) {
1548 DEBUG(DEBUG_INFO,(__location__ " Failed to send keepalive. Transport is DOWN\n"));
1552 r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_KEEPALIVE,
1553 sizeof(struct ctdb_req_keepalive_old),
1554 struct ctdb_req_keepalive_old);
1555 CTDB_NO_MEMORY_FATAL(ctdb, r);
1556 r->hdr.destnode = destnode;
1559 CTDB_INCREMENT_STAT(ctdb, keepalive_packets_sent);
1561 ctdb_queue_packet(ctdb, &r->hdr);
/* A request deferred while a readonly-delegation revoke is in flight;
 * fn is used to requeue hdr once the revoke completes. */
1568 struct revokechild_deferred_call {
1569 struct ctdb_context *ctdb;
1570 struct ctdb_req_header *hdr;
1571 deferred_requeue_fn fn;
/* State for one active revoke child process, linked into the
 * ctdb_db->revokechild_active list. */
1575 struct revokechild_handle {
1576 struct revokechild_handle *next, *prev;
1577 struct ctdb_context *ctdb;
1578 struct ctdb_db_context *ctdb_db;
1579 struct tevent_fd *fde;
/* Carries a deferred request across the requeue timer; see
 * deferred_call_destructor and deferred_call_requeue. */
1586 struct revokechild_requeue_handle {
1587 struct ctdb_context *ctdb;
1588 struct ctdb_req_header *hdr;
1589 deferred_requeue_fn fn;
/* Timer callback: replay a deferred request through its requeue
 * function, then free the handle (which also frees the stolen hdr). */
1593 static void deferred_call_requeue(struct tevent_context *ev,
1594 struct tevent_timer *te,
1595 struct timeval t, void *private_data)
1597 struct revokechild_requeue_handle *requeue_handle = talloc_get_type(private_data, struct revokechild_requeue_handle);
1599 requeue_handle->fn(requeue_handle->ctx, requeue_handle->hdr);
1600 talloc_free(requeue_handle);
/*
 * Destructor for a deferred call: rather than dropping the request,
 * move it onto a one-second timer so it is requeued after the revoke
 * (or the recovery following a failed revoke) has settled.
 */
1603 static int deferred_call_destructor(struct revokechild_deferred_call *deferred_call)
1605 struct ctdb_context *ctdb = deferred_call->ctdb;
1606 struct revokechild_requeue_handle *requeue_handle = talloc(ctdb, struct revokechild_requeue_handle);
1608 requeue_handle->ctdb = ctdb;
1609 requeue_handle->hdr = deferred_call->hdr;
1610 requeue_handle->fn = deferred_call->fn;
1611 requeue_handle->ctx = deferred_call->ctx;
/* keep the packet alive past the deferred_call being freed */
1612 talloc_steal(requeue_handle, requeue_handle->hdr);
1614 /* Always delay revoke requests. Either wait for the read/write
1615 * operation to complete, or if revoking failed wait for recovery to
1618 tevent_add_timer(ctdb->ev, requeue_handle,
1619 timeval_current_ofs(1, 0),
1620 deferred_call_requeue, requeue_handle);
/*
 * Destructor for a revoke child handle: tear down the fd event, close
 * the pipe ends, kill the child process and unlink from the
 * revokechild_active list.
 */
1626 static int revokechild_destructor(struct revokechild_handle *rc)
1628 if (rc->fde != NULL) {
/* free the fd event before closing its fd */
1629 talloc_free(rc->fde);
1632 if (rc->fd[0] != -1) {
1635 if (rc->fd[1] != -1) {
1638 ctdb_kill(rc->ctdb, rc->child, SIGKILL);
1640 DLIST_REMOVE(rc->ctdb_db->revokechild_active, rc);
/*
 * fd event handler: read the one-byte status the revoke child writes
 * on its pipe, logging read failures and non-zero child status.
 */
1644 static void revokechild_handler(struct tevent_context *ev,
1645 struct tevent_fd *fde,
1646 uint16_t flags, void *private_data)
1648 struct revokechild_handle *rc = talloc_get_type(private_data,
1649 struct revokechild_handle);
1653 ret = sys_read(rc->fd[0], &c, 1);
1655 DEBUG(DEBUG_ERR,("Failed to read status from revokechild. errno:%d\n", errno));
1661 DEBUG(DEBUG_ERR,("revokechild returned failure. status:%d\n", c));
/* Aggregate state while update-record controls are in flight to all
 * delegation holders during a revoke. */
1670 struct ctdb_revoke_state {
1671 struct ctdb_db_context *ctdb_db;
1673 struct ctdb_ltdb_header *header;
/*
 * Completion callback for one update-record control sent during a
 * revoke: record any failure in the shared revoke state, and mark the
 * revoke finished once all outstanding controls have answered.
 */
1680 static void update_record_cb(struct ctdb_client_control_state *state)
1682 struct ctdb_revoke_state *revoke_state;
1686 if (state == NULL) {
1689 revoke_state = state->async.private_data;
/* clear fn so the recv below cannot re-enter this callback */
1691 state->async.fn = NULL;
1692 ret = ctdb_control_recv(state->ctdb, state, state, NULL, &res, NULL);
1693 if ((ret != 0) || (res != 0)) {
1694 DEBUG(DEBUG_ERR,("Recv for revoke update record failed ret:%d res:%d\n", ret, res));
1695 revoke_state->status = -1;
1698 revoke_state->count--;
1699 if (revoke_state->count <= 0) {
1700 revoke_state->finished = 1;
/*
 * Tracking-db traverse callback: send an update-record control to one
 * node (pnn) holding a readonly delegation, counting it as outstanding
 * on success and flagging failure in the shared revoke state.
 */
1704 static void revoke_send_cb(struct ctdb_context *ctdb, uint32_t pnn, void *private_data)
1706 struct ctdb_revoke_state *revoke_state = private_data;
1707 struct ctdb_client_control_state *state;
1709 state = ctdb_ctrl_updaterecord_send(ctdb, revoke_state, timeval_current_ofs(ctdb->tunable.control_timeout,0), pnn, revoke_state->ctdb_db, revoke_state->key, revoke_state->header, revoke_state->data);
1710 if (state == NULL) {
1711 DEBUG(DEBUG_ERR,("Failure to send update record to revoke readonly delegation\n"));
1712 revoke_state->status = -1;
1715 state->async.fn = update_record_cb;
1716 state->async.private_data = revoke_state;
1718 revoke_state->count++;
/* Timeout handler: stop waiting for delegation holders and let the
 * revoke wait-loop terminate. */
1722 static void ctdb_revoke_timeout_handler(struct tevent_context *ev,
1723 struct tevent_timer *te,
1724 struct timeval yt, void *private_data)
1726 struct ctdb_revoke_state *state = private_data;
1728 DEBUG(DEBUG_ERR,("Timed out waiting for revoke to finish\n"));
1729 state->finished = 1;
/*
 * Runs in the revoke child process: push the record to every node
 * listed in the tracking data (tdata), wait (bounded by a timeout) for
 * all answers, then re-validate the local record under the chainlock
 * and either mark the revoke complete or clear REVOKING_READONLY so
 * the revoke is retried.
 */
1733 static int ctdb_revoke_all_delegations(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA tdata, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
1735 struct ctdb_revoke_state *state = talloc_zero(ctdb, struct ctdb_revoke_state);
1736 struct ctdb_ltdb_header new_header;
1739 state->ctdb_db = ctdb_db;
1741 state->header = header;
/* fan out one update-record control per delegation holder */
1744 ctdb_trackingdb_traverse(ctdb, tdata, revoke_send_cb, state);
1746 tevent_add_timer(ctdb->ev, state,
1747 timeval_current_ofs(ctdb->tunable.control_timeout, 0),
1748 ctdb_revoke_timeout_handler, state);
/* pump the event loop until all replies arrive or the timer fires */
1750 while (state->finished == 0) {
1751 tevent_loop_once(ctdb->ev);
1754 if (ctdb_ltdb_lock(ctdb_db, key) != 0) {
1755 DEBUG(DEBUG_ERR,("Failed to chainlock the database in revokechild\n"));
1759 if (ctdb_ltdb_fetch(ctdb_db, key, &new_header, state, &new_data) != 0) {
1760 ctdb_ltdb_unlock(ctdb_db, key);
1761 DEBUG(DEBUG_ERR,("Failed for fetch tdb record in revokechild\n"));
/* the record must not have moved on (higher RSN) while we revoked */
1766 if (new_header.rsn > header->rsn) {
1767 ctdb_ltdb_unlock(ctdb_db, key);
1768 DEBUG(DEBUG_ERR,("RSN too high in tdb record in revokechild\n"));
/* both REVOKING_READONLY and HAVE_DELEGATIONS must still be set */
1772 if ( (new_header.flags & (CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS)) != (CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS) ) {
1773 ctdb_ltdb_unlock(ctdb_db, key);
1774 DEBUG(DEBUG_ERR,("Flags are wrong in tdb record in revokechild\n"));
1780 * If revoke on all nodes succeed, revoke is complete. Otherwise,
1781 * remove CTDB_REC_RO_REVOKING_READONLY flag and retry.
1783 if (state->status == 0) {
1785 new_header.flags |= CTDB_REC_RO_REVOKE_COMPLETE;
1787 DEBUG(DEBUG_NOTICE, ("Revoke all delegations failed, retrying.\n"));
1788 new_header.flags &= ~CTDB_REC_RO_REVOKING_READONLY;
1790 if (ctdb_ltdb_store(ctdb_db, key, &new_header, new_data) != 0) {
1791 ctdb_ltdb_unlock(ctdb_db, key);
1792 DEBUG(DEBUG_ERR,("Failed to write new record in revokechild\n"));
1796 ctdb_ltdb_unlock(ctdb_db, key);
/*
 * Start revoking all readonly delegations for one record by forking a
 * child process.  The parent watches the child via a pipe + fd event;
 * the child switches to ctdb client mode, runs
 * ctdb_revoke_all_delegations() and writes a one-byte status back.
 */
1803 int ctdb_start_revoke_ro_record(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
1806 struct revokechild_handle *rc;
1807 pid_t parent = getpid();
/* header to push to delegation holders: strip all RO flags and mark
 * the record as migrated with data */
1810 header->flags &= ~(CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY);
1811 header->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
1814 if ((rc = talloc_zero(ctdb_db, struct revokechild_handle)) == NULL) {
1815 DEBUG(DEBUG_ERR,("Failed to allocate revokechild_handle\n"));
/* the tracking db (rottdb) lists nodes holding delegations for key */
1819 tdata = tdb_fetch(ctdb_db->rottdb, key);
1820 if (tdata.dsize > 0) {
/* reparent the fetched tracking data onto rc */
1824 tdata.dptr = talloc_memdup(rc, tdata.dptr, tdata.dsize);
1830 rc->ctdb_db = ctdb_db;
1834 talloc_set_destructor(rc, revokechild_destructor);
1836 rc->key.dsize = key.dsize;
1837 rc->key.dptr = talloc_memdup(rc, key.dptr, key.dsize);
1838 if (rc->key.dptr == NULL) {
1839 DEBUG(DEBUG_ERR,("Failed to allocate key for revokechild_handle\n"));
1846 DEBUG(DEBUG_ERR,("Failed to allocate key for revokechild_handle\n"));
1852 rc->child = ctdb_fork(ctdb);
1853 if (rc->child == (pid_t)-1) {
1854 DEBUG(DEBUG_ERR,("Failed to fork child for revokechild\n"));
1859 if (rc->child == 0) {
/* child: become a ctdb client and perform the revoke */
1863 prctl_set_comment("ctdb_revokechild");
1864 if (switch_from_server_to_client(ctdb) != 0) {
1865 DEBUG(DEBUG_ERR,("Failed to switch from server to client for revokechild process\n"));
1867 goto child_finished;
1870 c = ctdb_revoke_all_delegations(ctdb, ctdb_db, tdata, key, header, data);
/* report status to the parent, then linger until the parent exits */
1873 sys_write(rc->fd[1], &c, 1);
1874 ctdb_wait_for_process_to_exit(parent);
/* parent: watch the read end of the pipe for the child's status byte */
1880 set_close_on_exec(rc->fd[0]);
1882 /* This is an active revokechild child process */
1883 DLIST_ADD_END(ctdb_db->revokechild_active, rc);
1885 rc->fde = tevent_add_fd(ctdb->ev, rc, rc->fd[0], TEVENT_FD_READ,
1886 revokechild_handler, (void *)rc);
1887 if (rc->fde == NULL) {
1888 DEBUG(DEBUG_ERR,("Failed to set up fd event for revokechild process\n"));
1891 tevent_fd_set_auto_close(rc->fde);
/*
 * Defer a request that hit a record which is mid-revoke: find the
 * matching active revoke child by key and attach the request to it so
 * it is requeued (via deferred_call_destructor) when the revoke
 * structure is freed.
 */
1896 int ctdb_add_revoke_deferred_call(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_req_header *hdr, deferred_requeue_fn fn, void *call_context)
1898 struct revokechild_handle *rc;
1899 struct revokechild_deferred_call *deferred_call;
/* scan active revokes for one operating on the same key */
1901 for (rc = ctdb_db->revokechild_active; rc; rc = rc->next) {
1902 if (rc->key.dsize == 0) {
1905 if (rc->key.dsize != key.dsize) {
1908 if (!memcmp(rc->key.dptr, key.dptr, key.dsize)) {
1914 DEBUG(DEBUG_ERR,("Failed to add deferred call to revoke list. revoke structure not found\n"));
/* parent is rc: freeing rc fires deferred_call_destructor, which
 * schedules the requeue */
1918 deferred_call = talloc(rc, struct revokechild_deferred_call);
1919 if (deferred_call == NULL) {
1920 DEBUG(DEBUG_ERR,("Failed to allocate deferred call structure for revoking record\n"));
1924 deferred_call->ctdb = ctdb;
1925 deferred_call->hdr = hdr;
1926 deferred_call->fn = fn;
1927 deferred_call->ctx = call_context;
1929 talloc_set_destructor(deferred_call, deferred_call_destructor);
/* own the packet so it survives until the requeue fires */
1930 talloc_steal(deferred_call, hdr);
/*
 * hash_count callback: feed one record's migration counter into the
 * database hot-keys statistics, clamped to INT_MAX.
 */
1935 static void ctdb_migration_count_handler(TDB_DATA key, uint64_t counter,
1938 struct ctdb_db_context *ctdb_db = talloc_get_type_abort(
1939 private_data, struct ctdb_db_context);
1942 value = (counter < INT_MAX ? counter : INT_MAX);
1943 ctdb_update_db_stat_hot_keys(ctdb_db, key, value);
/*
 * Periodic (10s) timer: expire stale entries from the migration
 * hash-count db and re-arm the timer; if re-arming fails the
 * migratedb is dropped entirely.
 */
1946 static void ctdb_migration_cleandb_event(struct tevent_context *ev,
1947 struct tevent_timer *te,
1948 struct timeval current_time,
1951 struct ctdb_db_context *ctdb_db = talloc_get_type_abort(
1952 private_data, struct ctdb_db_context);
/* migration tracking may have been torn down already */
1954 if (ctdb_db->migratedb == NULL) {
1958 hash_count_expire(ctdb_db->migratedb, NULL);
1960 te = tevent_add_timer(ctdb_db->ctdb->ev, ctdb_db->migratedb,
1961 tevent_timeval_current_ofs(10, 0),
1962 ctdb_migration_cleandb_event, ctdb_db);
1965 ("Memory error in migration cleandb event for %s\n",
1967 TALLOC_FREE(ctdb_db->migratedb);
1971 int ctdb_migration_init(struct ctdb_db_context *ctdb_db)
1973 struct timeval one_second = { 1, 0 };
1974 struct tevent_timer *te;
1977 if (ctdb_db->persistent) {
1981 ret = hash_count_init(ctdb_db, one_second,
1982 ctdb_migration_count_handler, ctdb_db,
1983 &ctdb_db->migratedb);
1986 ("Memory error in migration init for %s\n",
1991 te = tevent_add_timer(ctdb_db->ctdb->ev, ctdb_db->migratedb,
1992 tevent_timeval_current_ofs(10, 0),
1993 ctdb_migration_cleandb_event, ctdb_db);
1996 ("Memory error in migration init for %s\n",
1998 TALLOC_FREE(ctdb_db->migratedb);