2 ctdb parallel database recovery
4 Copyright (C) Amitay Isaacs 2015
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "system/network.h"
22 #include "system/filesys.h"
29 #include "lib/tdb_wrap/tdb_wrap.h"
30 #include "lib/util/sys_rw.h"
31 #include "lib/util/time.h"
32 #include "lib/util/tevent_unix.h"
34 #include "protocol/protocol.h"
35 #include "protocol/protocol_api.h"
36 #include "client/client.h"
38 #include "common/logging.h"
/* Number of seconds fed into TIMEOUT() when building request deadlines. */
40 static int recover_timeout = 30;
/*
 * Timeout value passed to the client control/message calls below:
 * timeval_current_ofs() with recover_timeout seconds and 0 microseconds.
 */
44 #define TIMEOUT() timeval_current_ofs(recover_timeout, 0)
/*
 * Shared helper behind the *_recv() wrappers in this file.
 *
 * Checks the finished request with tevent_req_is_unix_error(). The rest of
 * the body is not visible in this excerpt; presumably the error code is
 * handed back through perr and the return value signals success -- confirm.
 */
50 static bool generic_recv(struct tevent_req *req, int *perr)
54 if (tevent_req_is_unix_error(req, &err)) {
/* File-wide service id state, initialised to CTDB_SRVID_RECOVERY. */
64 static uint64_t rec_srvid = CTDB_SRVID_RECOVERY;
/*
 * Supplies the srvid stored in state->srvid by the pull/push senders below.
 * Body not visible in this excerpt -- presumably derives a new value from
 * rec_srvid; confirm.
 */
66 static uint64_t srvid_next(void)
73 * Recovery database functions
/*
 * Handle for a recovery database.
 *
 * The member list is not visible in this excerpt; code below reads and
 * writes db_id, db_name, db_path, db (a tdb wrapper) and persistent.
 */
76 struct recdb_context {
/*
 * Create a recovery database and return its handle.
 *
 * The handle is allocated on mem_ctx. The tdb file path is built as
 * "<dir>/recdb.<...>" (the second format argument is not visible here),
 * where <dir> is taken from the CTDB_DBDIR_STATE environment variable when
 * the lookup yields a value, and otherwise appears to be the directory part
 * of db_path. Any existing file at that path is unlinked before the tdb is
 * opened with O_RDWR|O_CREAT|O_EXCL and mode 0600.
 *
 * Error-path statements between the visible lines are not shown in this
 * excerpt, so the exact return values on failure cannot be stated here.
 */
84 static struct recdb_context *recdb_create(TALLOC_CTX *mem_ctx, uint32_t db_id,
87 uint32_t hash_size, bool persistent)
89 static char *db_dir_state = NULL;
90 struct recdb_context *recdb;
91 unsigned int tdb_flags;
93 recdb = talloc(mem_ctx, struct recdb_context);
/* The environment lookup result is kept in a function-local static. */
98 if (db_dir_state == NULL) {
99 db_dir_state = getenv("CTDB_DBDIR_STATE");
102 recdb->db_name = db_name;
103 recdb->db_id = db_id;
104 recdb->db_path = talloc_asprintf(recdb, "%s/recdb.%s",
105 db_dir_state != NULL ?
107 dirname(discard_const(db_path)),
109 if (recdb->db_path == NULL) {
/* Remove any file left at this path so the O_EXCL create below can succeed. */
113 unlink(recdb->db_path);
115 tdb_flags = TDB_NOLOCK | TDB_INCOMPATIBLE_HASH | TDB_DISALLOW_NESTING;
/* NOTE(review): the tdb wrapper is parented to mem_ctx, not to recdb -- confirm intended ownership. */
116 recdb->db = tdb_wrap_open(mem_ctx, recdb->db_path, hash_size,
117 tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
118 if (recdb->db == NULL) {
/*
 * NOTE(review): this log reads recdb->db_path; the preceding line is not
 * visible here -- confirm recdb has not already been freed at this point.
 */
120 D_ERR("failed to create recovery db %s\n", recdb->db_path);
124 recdb->persistent = persistent;
/*
 * Accessor for the database id of a recovery db handle.
 * Body not visible in this excerpt -- presumably returns recdb->db_id.
 */
129 static uint32_t recdb_id(struct recdb_context *recdb)
/* Returns the db_name pointer stored in the recovery db handle. */
134 static const char *recdb_name(struct recdb_context *recdb)
136 return recdb->db_name;
/* Returns the db_path string stored in the recovery db handle. */
139 static const char *recdb_path(struct recdb_context *recdb)
141 return recdb->db_path;
/* Returns the raw tdb context held inside the handle's tdb wrapper. */
144 static struct tdb_context *recdb_tdb(struct recdb_context *recdb)
146 return recdb->db->tdb;
/* Returns the persistent flag recorded when the handle was created. */
149 static bool recdb_persistent(struct recdb_context *recdb)
151 return recdb->persistent;
/*
 * Private data handed to recdb_add_traverse().
 * Only the recdb member is visible in this excerpt; the callback also reads
 * a mypnn member.
 */
154 struct recdb_add_traverse_state {
155 struct recdb_context *recdb;
/*
 * ctdb_rec_buffer_traverse() callback: merge one received record into the
 * recovery db.
 *
 * The ltdb header is read from the start of data (data shorter than a header
 * is handled by a branch whose body is not visible here). When the recovery
 * db already holds the key, the incoming RSN is compared with the stored
 * RSN and, on a tie, the stored dmaster with state->mypnn. The action taken
 * when that test matches is not visible in this excerpt -- presumably the
 * incoming record is not stored; confirm. Otherwise the record is written
 * with TDB_REPLACE.
 */
159 static int recdb_add_traverse(uint32_t reqid, struct ctdb_ltdb_header *header,
160 TDB_DATA key, TDB_DATA data,
163 struct recdb_add_traverse_state *state =
164 (struct recdb_add_traverse_state *)private_data;
165 struct ctdb_ltdb_header *hdr;
169 /* header is not marshalled separately in the pulldb control */
170 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
174 hdr = (struct ctdb_ltdb_header *)data.dptr;
176 /* fetch the existing record, if any */
177 prev_data = tdb_fetch(recdb_tdb(state->recdb), key);
179 if (prev_data.dptr != NULL) {
180 struct ctdb_ltdb_header prev_hdr;
/* Copy the stored header out before the fetched buffer is released. */
182 prev_hdr = *(struct ctdb_ltdb_header *)prev_data.dptr;
183 free(prev_data.dptr);
184 if (hdr->rsn < prev_hdr.rsn ||
185 (hdr->rsn == prev_hdr.rsn &&
186 prev_hdr.dmaster != state->mypnn)) {
191 ret = tdb_store(recdb_tdb(state->recdb), key, data, TDB_REPLACE);
/*
 * Feed every record of recbuf through recdb_add_traverse().
 *
 * Setup of the traverse state and handling of ret are not visible in this
 * excerpt; presumably recdb and mypnn are stored in state and the result is
 * reported as a bool -- confirm.
 */
198 static bool recdb_add(struct recdb_context *recdb, int mypnn,
199 struct ctdb_rec_buffer *recbuf)
201 struct recdb_add_traverse_state state;
207 ret = ctdb_rec_buffer_traverse(recbuf, recdb_add_traverse, &state);
/*
 * Append one recovery-db record to recbuf.
 *
 * Data not larger than a ctdb_ltdb_header is treated as an empty record
 * (the branch body is not visible here -- presumably the record is skipped).
 * The header at the start of data is modified in place: dmaster is set to
 * the given dmaster and CTDB_REC_FLAG_MIGRATED_WITH_DATA is OR-ed into
 * flags. The line preceding those assignments is not visible, so they may
 * be conditional (possibly on persistent) -- confirm.
 * The record is then added with ctdb_rec_buffer_add().
 */
215 /* This function decides which records from recdb are retained */
216 static int recbuf_filter_add(struct ctdb_rec_buffer *recbuf, bool persistent,
217 uint32_t reqid, uint32_t dmaster,
218 TDB_DATA key, TDB_DATA data)
220 struct ctdb_ltdb_header *header;
223 /* Skip empty records */
224 if (data.dsize <= sizeof(struct ctdb_ltdb_header)) {
228 /* update the dmaster field to point to us */
229 header = (struct ctdb_ltdb_header *)data.dptr;
231 header->dmaster = dmaster;
232 header->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
235 ret = ctdb_rec_buffer_add(recbuf, recbuf, reqid, NULL, key, data);
/*
 * Private data for recdb_records_traverse().
 * Only recbuf is visible in this excerpt; the code below also uses
 * dmaster, reqid, persistent and failed members.
 */
243 struct recdb_records_traverse_state {
244 struct ctdb_rec_buffer *recbuf;
/*
 * tdb traverse callback used by recdb_records().
 *
 * Hands the record to recbuf_filter_add(). state->failed is set under a
 * condition that is not visible in this excerpt -- presumably when that
 * call reports an error.
 */
251 static int recdb_records_traverse(struct tdb_context *tdb,
252 TDB_DATA key, TDB_DATA data,
255 struct recdb_records_traverse_state *state =
256 (struct recdb_records_traverse_state *)private_data;
259 ret = recbuf_filter_add(state->recbuf, state->persistent,
260 state->reqid, state->dmaster, key, data);
262 state->failed = true;
/*
 * Marshal the records of recdb into a single ctdb_rec_buffer.
 *
 * A buffer is initialised on mem_ctx with the db id, then filled by a
 * read-only traverse of the recovery tdb using recdb_records_traverse().
 * If the traverse returns -1 or the callback flagged a failure, an error is
 * logged and the buffer is freed. The return statements are not visible in
 * this excerpt.
 */
269 static struct ctdb_rec_buffer *recdb_records(struct recdb_context *recdb,
273 struct recdb_records_traverse_state state;
276 state.recbuf = ctdb_rec_buffer_init(mem_ctx, recdb_id(recdb));
277 if (state.recbuf == NULL) {
280 state.dmaster = dmaster;
282 state.persistent = recdb_persistent(recdb);
283 state.failed = false;
285 ret = tdb_traverse_read(recdb_tdb(recdb), recdb_records_traverse,
287 if (ret == -1 || state.failed) {
288 D_ERR("Failed to marshall recovery records for %s\n",
290 TALLOC_FREE(state.recbuf);
/*
 * Private data for recdb_file_traverse().
 * Only recbuf and recdb are visible in this excerpt; the code below also
 * uses mem_ctx, dmaster, reqid, persistent, failed, fd, max_size and
 * num_buffers members.
 */
297 struct recdb_file_traverse_state {
298 struct ctdb_rec_buffer *recbuf;
299 struct recdb_context *recdb;
/*
 * tdb traverse callback used by recdb_file().
 *
 * Adds the record to the current buffer via recbuf_filter_add(). Once the
 * marshalled buffer length exceeds state->max_size, the buffer is written
 * to state->fd, num_buffers is incremented, and the buffer is freed and
 * replaced by a freshly initialised one on state->mem_ctx. Failures set
 * state->failed; the conditions guarding those assignments and the return
 * values are not visible in this excerpt.
 */
310 static int recdb_file_traverse(struct tdb_context *tdb,
311 TDB_DATA key, TDB_DATA data,
314 struct recdb_file_traverse_state *state =
315 (struct recdb_file_traverse_state *)private_data;
318 ret = recbuf_filter_add(state->recbuf, state->persistent,
319 state->reqid, state->dmaster, key, data);
321 state->failed = true;
/* Flush to the file whenever the buffer has grown past the size limit. */
325 if (ctdb_rec_buffer_len(state->recbuf) > state->max_size) {
326 ret = ctdb_rec_buffer_write(state->recbuf, state->fd);
328 D_ERR("Failed to collect recovery records for %s\n",
329 recdb_name(state->recdb));
330 state->failed = true;
334 state->num_buffers += 1;
/* Start a new, empty buffer for the following records. */
336 TALLOC_FREE(state->recbuf);
337 state->recbuf = ctdb_rec_buffer_init(state->mem_ctx,
338 recdb_id(state->recdb));
339 if (state->recbuf == NULL) {
340 state->failed = true;
/*
 * Write the records of recdb to fd as a sequence of record buffers.
 *
 * The tdb is traversed read-only with recdb_file_traverse(), which flushes
 * a buffer to fd each time it exceeds max_size. After a successful
 * traverse the remaining buffer is written as well and counted.
 *
 * Returns the number of buffers written (state.num_buffers) on the visible
 * success path. The error returns are not visible in this excerpt; the
 * caller in this file tests the result against -1.
 */
348 static int recdb_file(struct recdb_context *recdb, TALLOC_CTX *mem_ctx,
349 uint32_t dmaster, int fd, int max_size)
351 struct recdb_file_traverse_state state;
354 state.recbuf = ctdb_rec_buffer_init(mem_ctx, recdb_id(recdb));
355 if (state.recbuf == NULL) {
359 state.mem_ctx = mem_ctx;
360 state.dmaster = dmaster;
362 state.persistent = recdb_persistent(recdb);
363 state.failed = false;
365 state.max_size = max_size;
366 state.num_buffers = 0;
368 ret = tdb_traverse_read(recdb_tdb(recdb), recdb_file_traverse, &state);
369 if (ret == -1 || state.failed) {
370 TALLOC_FREE(state.recbuf);
/* Write out whatever is left in the last buffer. */
374 ret = ctdb_rec_buffer_write(state.recbuf, fd);
376 D_ERR("Failed to collect recovery records for %s\n",
378 TALLOC_FREE(state.recbuf);
381 state.num_buffers += 1;
383 D_DEBUG("Wrote %d buffers of recovery records for %s\n",
384 state.num_buffers, recdb_name(recdb));
386 return state.num_buffers;
390 * Pull database from a single node
/*
 * Request state for pull_database_send().
 * Only ev, client and recdb are visible in this excerpt; the code below
 * also uses pnn, srvid, num_records and result members.
 */
393 struct pull_database_state {
394 struct tevent_context *ev;
395 struct ctdb_client_context *client;
396 struct recdb_context *recdb;
/* Message handler and completion callbacks of the pull request, defined below. */
403 static void pull_database_handler(uint64_t srvid, TDB_DATA data,
405 static void pull_database_register_done(struct tevent_req *subreq);
406 static void pull_database_old_done(struct tevent_req *subreq);
407 static void pull_database_unregister_done(struct tevent_req *subreq);
408 static void pull_database_new_done(struct tevent_req *subreq);
/*
 * Start pulling the records of recdb's database from node pnn.
 *
 * Two paths are chosen by the node's capability bits:
 *  - CTDB_CAP_FRAGMENTED_CONTROLS set: a message handler is first
 *    registered for a fresh srvid; the pull continues in
 *    pull_database_register_done().
 *  - otherwise: a PULL_DB control with lmaster CTDB_LMASTER_ANY is sent
 *    and completed in pull_database_old_done().
 *
 * Allocation failures of a subrequest finish the request immediately via
 * tevent_req_post(). Collect the result with pull_database_recv().
 */
410 static struct tevent_req *pull_database_send(
412 struct tevent_context *ev,
413 struct ctdb_client_context *client,
414 uint32_t pnn, uint32_t caps,
415 struct recdb_context *recdb)
417 struct tevent_req *req, *subreq;
418 struct pull_database_state *state;
419 struct ctdb_req_control request;
421 req = tevent_req_create(mem_ctx, &state, struct pull_database_state);
427 state->client = client;
428 state->recdb = recdb;
430 state->srvid = srvid_next();
432 if (caps & CTDB_CAP_FRAGMENTED_CONTROLS) {
433 subreq = ctdb_client_set_message_handler_send(
434 state, state->ev, state->client,
435 state->srvid, pull_database_handler,
437 if (tevent_req_nomem(subreq, req)) {
438 return tevent_req_post(req, ev);
441 tevent_req_set_callback(subreq, pull_database_register_done,
445 struct ctdb_pulldb pulldb;
447 pulldb.db_id = recdb_id(recdb);
448 pulldb.lmaster = CTDB_LMASTER_ANY;
450 ctdb_req_control_pull_db(&request, &pulldb);
451 subreq = ctdb_client_control_send(state, state->ev,
455 if (tevent_req_nomem(subreq, req)) {
456 return tevent_req_post(req, ev);
458 tevent_req_set_callback(subreq, pull_database_old_done, req);
/*
 * Message handler for DB_PULL record messages.
 *
 * private_data is the pull request. The srvid is compared with the one
 * allocated for this pull (the mismatch branch is not visible here). The
 * payload is unmarshalled into a record buffer, its db id is compared with
 * the recovery db's id, and the records are merged with recdb_add() using
 * the local pnn. state->num_records accumulates the record count of each
 * buffer reaching the last visible line. What is recorded on the error
 * branches is not visible in this excerpt.
 */
464 static void pull_database_handler(uint64_t srvid, TDB_DATA data,
467 struct tevent_req *req = talloc_get_type_abort(
468 private_data, struct tevent_req);
469 struct pull_database_state *state = tevent_req_data(
470 req, struct pull_database_state);
471 struct ctdb_rec_buffer *recbuf;
476 if (srvid != state->srvid) {
480 ret = ctdb_rec_buffer_pull(data.dptr, data.dsize, state, &recbuf, &np);
482 D_ERR("Invalid data received for DB_PULL messages\n");
486 if (recbuf->db_id != recdb_id(state->recdb)) {
488 D_ERR("Invalid dbid:%08x for DB_PULL messages for %s\n",
489 recbuf->db_id, recdb_name(state->recdb));
493 status = recdb_add(state->recdb, ctdb_client_pnn(state->client),
497 D_ERR("Failed to add records to recdb for %s\n",
498 recdb_name(state->recdb));
502 state->num_records += recbuf->count;
/*
 * Completion of the message handler registration.
 *
 * On the failure path shown, an error is logged and the request fails with
 * ret. Otherwise a DB_PULL control is sent to state->pnn carrying the db
 * id, lmaster CTDB_LMASTER_ANY and the srvid on which records are to be
 * delivered; completion continues in pull_database_new_done().
 */
506 static void pull_database_register_done(struct tevent_req *subreq)
508 struct tevent_req *req = tevent_req_callback_data(
509 subreq, struct tevent_req);
510 struct pull_database_state *state = tevent_req_data(
511 req, struct pull_database_state);
512 struct ctdb_req_control request;
513 struct ctdb_pulldb_ext pulldb_ext;
517 status = ctdb_client_set_message_handler_recv(subreq, &ret);
520 D_ERR("Failed to set message handler for DB_PULL for %s\n",
521 recdb_name(state->recdb));
522 tevent_req_error(req, ret);
526 pulldb_ext.db_id = recdb_id(state->recdb);
527 pulldb_ext.lmaster = CTDB_LMASTER_ANY;
528 pulldb_ext.srvid = state->srvid;
530 ctdb_req_control_db_pull(&request, &pulldb_ext);
531 subreq = ctdb_client_control_send(state, state->ev, state->client,
532 state->pnn, TIMEOUT(), &request);
533 if (tevent_req_nomem(subreq, req)) {
536 tevent_req_set_callback(subreq, pull_database_new_done, req);
/*
 * Completion of the old-style PULL_DB control.
 *
 * The reply is parsed into a record buffer, which is merged into the
 * recovery db with recdb_add(). Visible failure paths end the request with
 * ret or EIO (their guarding conditions are not visible in this excerpt).
 * On success the record count is stored, logged, and the request is marked
 * done.
 */
539 static void pull_database_old_done(struct tevent_req *subreq)
541 struct tevent_req *req = tevent_req_callback_data(
542 subreq, struct tevent_req);
543 struct pull_database_state *state = tevent_req_data(
544 req, struct pull_database_state);
545 struct ctdb_reply_control *reply;
546 struct ctdb_rec_buffer *recbuf;
550 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
553 D_ERR("control PULL_DB failed for %s on node %u, ret=%d\n",
554 recdb_name(state->recdb), state->pnn, ret);
555 tevent_req_error(req, ret);
559 ret = ctdb_reply_control_pull_db(reply, state, &recbuf);
562 tevent_req_error(req, ret);
566 status = recdb_add(state->recdb, ctdb_client_pnn(state->client),
570 tevent_req_error(req, EIO);
574 state->num_records = recbuf->count;
577 D_INFO("Pulled %d records for db %s from node %d\n",
578 state->num_records, recdb_name(state->recdb), state->pnn);
580 tevent_req_done(req);
/*
 * Completion of the new-style DB_PULL control.
 *
 * The record count reported in the reply is compared with the number of
 * records accumulated by pull_database_handler(); a mismatch is logged.
 * What is stored on the failure branches is not visible in this excerpt
 * (pull_database_unregister_done() later consults state->result).
 * Finally removal of the message handler is started; the request completes
 * in pull_database_unregister_done().
 */
583 static void pull_database_new_done(struct tevent_req *subreq)
585 struct tevent_req *req = tevent_req_callback_data(
586 subreq, struct tevent_req);
587 struct pull_database_state *state = tevent_req_data(
588 req, struct pull_database_state);
589 struct ctdb_reply_control *reply;
590 uint32_t num_records;
594 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
597 D_ERR("control DB_PULL failed for %s on node %u, ret=%d\n",
598 recdb_name(state->recdb), state->pnn, ret);
603 ret = ctdb_reply_control_db_pull(reply, &num_records);
605 if (num_records != state->num_records) {
606 D_ERR("mismatch (%u != %u) in DB_PULL records for db %s\n",
607 num_records, state->num_records,
608 recdb_name(state->recdb));
613 D_INFO("Pulled %d records for db %s from node %d\n",
614 state->num_records, recdb_name(state->recdb), state->pnn);
618 subreq = ctdb_client_remove_message_handler_send(
619 state, state->ev, state->client,
621 if (tevent_req_nomem(subreq, req)) {
624 tevent_req_set_callback(subreq, pull_database_unregister_done, req);
/*
 * Final step of the new-style pull: the message handler has been removed.
 *
 * A failure to remove the handler is logged and fails the request with
 * ret. Otherwise a non-zero state->result fails the request with that
 * value; with result 0 the request is marked done.
 */
627 static void pull_database_unregister_done(struct tevent_req *subreq)
629 struct tevent_req *req = tevent_req_callback_data(
630 subreq, struct tevent_req);
631 struct pull_database_state *state = tevent_req_data(
632 req, struct pull_database_state);
636 status = ctdb_client_remove_message_handler_recv(subreq, &ret);
639 D_ERR("failed to remove message handler for DB_PULL for db %s\n",
640 recdb_name(state->recdb));
641 tevent_req_error(req, ret);
645 if (state->result != 0) {
646 tevent_req_error(req, state->result);
650 tevent_req_done(req);
/* Collects the result of a pull_database_send() request via generic_recv(). */
653 static bool pull_database_recv(struct tevent_req *req, int *perr)
655 return generic_recv(req, perr);
659 * Push database to specified nodes (old style)
/*
 * Request state for push_database_old_send().
 * Visible members: event context, client, recovery db and the marshalled
 * record buffer. The code below also uses pnn_list, count and index members
 * that are not visible in this excerpt.
 */
662 struct push_database_old_state {
663 struct tevent_context *ev;
664 struct ctdb_client_context *client;
665 struct recdb_context *recdb;
668 struct ctdb_rec_buffer *recbuf;
/* Completion callback for each PUSH_DB control, defined below. */
672 static void push_database_old_push_done(struct tevent_req *subreq);
/*
 * Start an old-style push of the recovery db to the nodes in pnn_list.
 *
 * All records are marshalled once into state->recbuf by recdb_records(),
 * with the local pnn as dmaster. A PUSH_DB control carrying that buffer is
 * then sent to pnn_list[state->index]; the initial value of index is not
 * visible in this excerpt. Further nodes are handled one at a time in
 * push_database_old_push_done().
 *
 * NOTE(review): a NULL result from recdb_records() is reported through
 * tevent_req_nomem(), although recdb_records() also yields NULL-ed buffers
 * on traverse failure -- confirm this is acceptable.
 */
674 static struct tevent_req *push_database_old_send(
676 struct tevent_context *ev,
677 struct ctdb_client_context *client,
678 uint32_t *pnn_list, int count,
679 struct recdb_context *recdb)
681 struct tevent_req *req, *subreq;
682 struct push_database_old_state *state;
683 struct ctdb_req_control request;
686 req = tevent_req_create(mem_ctx, &state,
687 struct push_database_old_state);
693 state->client = client;
694 state->recdb = recdb;
695 state->pnn_list = pnn_list;
696 state->count = count;
699 state->recbuf = recdb_records(recdb, state,
700 ctdb_client_pnn(client));
701 if (tevent_req_nomem(state->recbuf, req)) {
702 return tevent_req_post(req, ev);
705 pnn = state->pnn_list[state->index];
707 ctdb_req_control_push_db(&request, state->recbuf);
708 subreq = ctdb_client_control_send(state, ev, client, pnn,
709 TIMEOUT(), &request);
710 if (tevent_req_nomem(subreq, req)) {
711 return tevent_req_post(req, ev);
713 tevent_req_set_callback(subreq, push_database_old_push_done, req);
/*
 * Completion of one PUSH_DB control.
 *
 * A failed control is logged with the node it was sent to and fails the
 * request. Once state->index equals state->count, the record buffer is
 * freed and the request is done; otherwise the same buffer is pushed to
 * pnn_list[state->index]. The statement advancing index is not visible in
 * this excerpt.
 */
718 static void push_database_old_push_done(struct tevent_req *subreq)
720 struct tevent_req *req = tevent_req_callback_data(
721 subreq, struct tevent_req);
722 struct push_database_old_state *state = tevent_req_data(
723 req, struct push_database_old_state);
724 struct ctdb_req_control request;
729 status = ctdb_client_control_recv(subreq, &ret, NULL, NULL);
732 D_ERR("control PUSH_DB failed for db %s on node %u, ret=%d\n",
733 recdb_name(state->recdb), state->pnn_list[state->index],
735 tevent_req_error(req, ret);
740 if (state->index == state->count) {
741 TALLOC_FREE(state->recbuf);
742 tevent_req_done(req);
746 pnn = state->pnn_list[state->index];
748 ctdb_req_control_push_db(&request, state->recbuf);
749 subreq = ctdb_client_control_send(state, state->ev, state->client,
750 pnn, TIMEOUT(), &request);
751 if (tevent_req_nomem(subreq, req)) {
754 tevent_req_set_callback(subreq, push_database_old_push_done, req);
/* Collects the result of a push_database_old_send() request via generic_recv(). */
757 static bool push_database_old_recv(struct tevent_req *req, int *perr)
759 return generic_recv(req, perr);
763 * Push database to specified nodes (new style)
/*
 * Request state for push_database_new_send().
 * Visible members: event context, client, recovery db and the count of
 * buffers already sent. The code below also uses pnn_list, count, srvid,
 * dmaster, fd, num_buffers and num_records members that are not visible in
 * this excerpt.
 */
766 struct push_database_new_state {
767 struct tevent_context *ev;
768 struct ctdb_client_context *client;
769 struct recdb_context *recdb;
776 int num_buffers_sent;
/* Steps of the new-style push, defined below. */
780 static void push_database_new_started(struct tevent_req *subreq);
781 static void push_database_new_send_msg(struct tevent_req *req);
782 static void push_database_new_send_done(struct tevent_req *subreq);
783 static void push_database_new_confirmed(struct tevent_req *subreq);
/*
 * Start a new-style (fragmented) push of the recovery db to pnn_list.
 *
 * The records are first spooled into a file named "<recdb path>.dat" by
 * recdb_file(), split into buffers bounded by max_size, and the file offset
 * is rewound to the start. A DB_PUSH_START control carrying the db id and a
 * fresh srvid is then sent to the target nodes; the push continues in
 * push_database_new_started().
 *
 * Visible failure codes: errno from open(), ENOMEM when recdb_file()
 * returns -1, EIO on the lseek() failure path.
 *
 * NOTE(review): on the visible error paths after open() succeeded, no
 * close(state->fd) appears in this excerpt -- confirm the descriptor is
 * released elsewhere (e.g. by a destructor).
 */
785 static struct tevent_req *push_database_new_send(
787 struct tevent_context *ev,
788 struct ctdb_client_context *client,
789 uint32_t *pnn_list, int count,
790 struct recdb_context *recdb,
793 struct tevent_req *req, *subreq;
794 struct push_database_new_state *state;
795 struct ctdb_req_control request;
796 struct ctdb_pulldb_ext pulldb_ext;
800 req = tevent_req_create(mem_ctx, &state,
801 struct push_database_new_state);
807 state->client = client;
808 state->recdb = recdb;
809 state->pnn_list = pnn_list;
810 state->count = count;
812 state->srvid = srvid_next();
813 state->dmaster = ctdb_client_pnn(client);
814 state->num_buffers_sent = 0;
815 state->num_records = 0;
817 filename = talloc_asprintf(state, "%s.dat", recdb_path(recdb));
818 if (tevent_req_nomem(filename, req)) {
819 return tevent_req_post(req, ev);
822 state->fd = open(filename, O_RDWR|O_CREAT, 0644);
823 if (state->fd == -1) {
824 tevent_req_error(req, errno);
825 return tevent_req_post(req, ev);
828 talloc_free(filename);
830 state->num_buffers = recdb_file(recdb, state, state->dmaster,
831 state->fd, max_size);
832 if (state->num_buffers == -1) {
833 tevent_req_error(req, ENOMEM);
834 return tevent_req_post(req, ev);
/* Rewind so the buffers can be read back from the beginning. */
837 offset = lseek(state->fd, 0, SEEK_SET);
839 tevent_req_error(req, EIO);
840 return tevent_req_post(req, ev);
843 pulldb_ext.db_id = recdb_id(recdb);
844 pulldb_ext.srvid = state->srvid;
846 ctdb_req_control_db_push_start(&request, &pulldb_ext);
847 subreq = ctdb_client_control_multi_send(state, ev, client,
849 TIMEOUT(), &request);
850 if (tevent_req_nomem(subreq, req)) {
851 return tevent_req_post(req, ev);
853 tevent_req_set_callback(subreq, push_database_new_started, req);
/*
 * Completion of the DB_PUSH_START multi-control.
 *
 * On failure, ctdb_client_control_multi_error() is consulted and one of two
 * error messages is logged (with or without a specific node; the selecting
 * condition is not visible here), the error list is freed and the request
 * fails with ret. On success the first buffer is sent via
 * push_database_new_send_msg().
 */
858 static void push_database_new_started(struct tevent_req *subreq)
860 struct tevent_req *req = tevent_req_callback_data(
861 subreq, struct tevent_req);
862 struct push_database_new_state *state = tevent_req_data(
863 req, struct push_database_new_state);
868 status = ctdb_client_control_multi_recv(subreq, &ret, state,
875 ret2 = ctdb_client_control_multi_error(state->pnn_list,
879 D_ERR("control DB_PUSH_START failed for db %s"
880 " on node %u, ret=%d\n",
881 recdb_name(state->recdb), pnn, ret2);
883 D_ERR("control DB_PUSH_START failed for db %s,"
885 recdb_name(state->recdb), ret);
887 talloc_free(err_list);
889 tevent_req_error(req, ret);
893 push_database_new_send_msg(req);
/*
 * Send the next spooled record buffer, or confirm once all have been sent.
 *
 * When num_buffers_sent equals num_buffers, a DB_PUSH_CONFIRM control for
 * the db id is sent to the target nodes and completion continues in
 * push_database_new_confirmed(). Otherwise the next buffer is read from
 * state->fd, marshalled into a talloc'ed blob and sent as a message on
 * state->srvid to all nodes in pnn_list; completion continues in
 * push_database_new_send_done(). The buffer's record count is added to
 * state->num_records and the marshalled blob is freed after the send has
 * been queued.
 */
896 static void push_database_new_send_msg(struct tevent_req *req)
898 struct push_database_new_state *state = tevent_req_data(
899 req, struct push_database_new_state);
900 struct tevent_req *subreq;
901 struct ctdb_rec_buffer *recbuf;
902 struct ctdb_req_message message;
907 if (state->num_buffers_sent == state->num_buffers) {
908 struct ctdb_req_control request;
910 ctdb_req_control_db_push_confirm(&request,
911 recdb_id(state->recdb));
912 subreq = ctdb_client_control_multi_send(state, state->ev,
916 TIMEOUT(), &request);
917 if (tevent_req_nomem(subreq, req)) {
920 tevent_req_set_callback(subreq, push_database_new_confirmed,
925 ret = ctdb_rec_buffer_read(state->fd, state, &recbuf);
927 tevent_req_error(req, ret);
931 data.dsize = ctdb_rec_buffer_len(recbuf);
932 data.dptr = talloc_size(state, data.dsize);
933 if (tevent_req_nomem(data.dptr, req)) {
937 ctdb_rec_buffer_push(recbuf, data.dptr, &np);
939 message.srvid = state->srvid;
940 message.data.data = data;
942 D_DEBUG("Pushing buffer %d with %d records for db %s\n",
943 state->num_buffers_sent, recbuf->count,
944 recdb_name(state->recdb));
946 subreq = ctdb_client_message_multi_send(state, state->ev,
948 state->pnn_list, state->count,
950 if (tevent_req_nomem(subreq, req)) {
953 tevent_req_set_callback(subreq, push_database_new_send_done, req);
955 state->num_records += recbuf->count;
957 talloc_free(data.dptr);
/*
 * Completion of one record-buffer message.
 *
 * A failed send is logged and fails the request with ret. Otherwise the
 * sent-buffer counter is advanced and push_database_new_send_msg() is
 * called again for the next buffer (or the confirm step).
 */
961 static void push_database_new_send_done(struct tevent_req *subreq)
963 struct tevent_req *req = tevent_req_callback_data(
964 subreq, struct tevent_req);
965 struct push_database_new_state *state = tevent_req_data(
966 req, struct push_database_new_state);
970 status = ctdb_client_message_multi_recv(subreq, &ret, NULL, NULL);
973 D_ERR("Sending recovery records failed for %s\n",
974 recdb_name(state->recdb));
975 tevent_req_error(req, ret);
979 state->num_buffers_sent += 1;
981 push_database_new_send_msg(req);
/*
 * Completion of the DB_PUSH_CONFIRM multi-control.
 *
 * A failed control is logged (per node when one can be identified) and
 * fails the request with ret. Otherwise each node's reply is parsed for the
 * number of records it received; a parse failure or a count different from
 * state->num_records fails the request with EPROTO. When every node
 * matches, the total is logged and the request is done.
 */
984 static void push_database_new_confirmed(struct tevent_req *subreq)
986 struct tevent_req *req = tevent_req_callback_data(
987 subreq, struct tevent_req);
988 struct push_database_new_state *state = tevent_req_data(
989 req, struct push_database_new_state);
990 struct ctdb_reply_control **reply;
994 uint32_t num_records;
996 status = ctdb_client_control_multi_recv(subreq, &ret, state,
1003 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1004 state->count, err_list,
1007 D_ERR("control DB_PUSH_CONFIRM failed for db %s"
1008 " on node %u, ret=%d\n",
1009 recdb_name(state->recdb), pnn, ret2);
1011 D_ERR("control DB_PUSH_CONFIRM failed for db %s,"
1013 recdb_name(state->recdb), ret);
1015 tevent_req_error(req, ret);
/* Replies are indexed in the same order as state->pnn_list. */
1019 for (i=0; i<state->count; i++) {
1020 ret = ctdb_reply_control_db_push_confirm(reply[i],
1023 tevent_req_error(req, EPROTO);
1027 if (num_records != state->num_records) {
1028 D_ERR("Node %u received %d of %d records for %s\n",
1029 state->pnn_list[i], num_records,
1030 state->num_records, recdb_name(state->recdb));
1031 tevent_req_error(req, EPROTO);
1038 D_INFO("Pushed %d records for db %s\n",
1039 state->num_records, recdb_name(state->recdb));
1041 tevent_req_done(req);
/* Collects the result of a push_database_new_send() request via generic_recv(). */
1044 static bool push_database_new_recv(struct tevent_req *req, int *perr)
1046 return generic_recv(req, perr);
1050 * wrapper for push_database_old and push_database_new
/*
 * Request state for push_database_send(): one completion flag for the
 * old-style push and one for the new-style push.
 */
1053 struct push_database_state {
1054 bool old_done, new_done;
/* Completion callbacks of the two sub-pushes, defined below. */
1057 static void push_database_old_done(struct tevent_req *subreq);
1058 static void push_database_new_done(struct tevent_req *subreq);
/*
 * Push the recovery db to all nodes in pnn_list, choosing the protocol per
 * node.
 *
 * Nodes whose capability word (caps[pnn]) has CTDB_CAP_FRAGMENTED_CONTROLS
 * go into new_list, the others into old_list. A push_database_old_send()
 * and/or push_database_new_send() subrequest is started for each non-empty
 * list; the new-style push is limited by tun_list->rec_buffer_size_limit.
 * The old_done/new_done flags are also set to true on lines whose guarding
 * branch is not visible in this excerpt -- presumably when the matching
 * list is empty.
 *
 * The initialisation and increments of old_count/new_count and the final
 * return are not visible in this excerpt.
 */
1060 static struct tevent_req *push_database_send(
1061 TALLOC_CTX *mem_ctx,
1062 struct tevent_context *ev,
1063 struct ctdb_client_context *client,
1064 uint32_t *pnn_list, int count, uint32_t *caps,
1065 struct ctdb_tunable_list *tun_list,
1066 struct recdb_context *recdb)
1068 struct tevent_req *req, *subreq;
1069 struct push_database_state *state;
1070 uint32_t *old_list, *new_list;
1071 unsigned int old_count, new_count;
1074 req = tevent_req_create(mem_ctx, &state, struct push_database_state);
1079 state->old_done = false;
1080 state->new_done = false;
/* Both lists are sized for the worst case of every node landing in one list. */
1084 old_list = talloc_array(state, uint32_t, count);
1085 new_list = talloc_array(state, uint32_t, count);
1086 if (tevent_req_nomem(old_list, req) ||
1087 tevent_req_nomem(new_list,req)) {
1088 return tevent_req_post(req, ev);
1091 for (i=0; i<count; i++) {
1092 uint32_t pnn = pnn_list[i];
/* caps is indexed by pnn, not by position in pnn_list. */
1094 if (caps[pnn] & CTDB_CAP_FRAGMENTED_CONTROLS) {
1095 new_list[new_count] = pnn;
1098 old_list[old_count] = pnn;
1103 if (old_count > 0) {
1104 subreq = push_database_old_send(state, ev, client,
1105 old_list, old_count, recdb);
1106 if (tevent_req_nomem(subreq, req)) {
1107 return tevent_req_post(req, ev);
1109 tevent_req_set_callback(subreq, push_database_old_done, req);
1111 state->old_done = true;
1114 if (new_count > 0) {
1115 subreq = push_database_new_send(state, ev, client,
1116 new_list, new_count, recdb,
1117 tun_list->rec_buffer_size_limit);
1118 if (tevent_req_nomem(subreq, req)) {
1119 return tevent_req_post(req, ev);
1121 tevent_req_set_callback(subreq, push_database_new_done, req);
1123 state->new_done = true;
/*
 * Completion of the old-style sub-push.
 *
 * On the failure path the request fails with ret. Otherwise old_done is
 * set, and the request is marked done once the new-style push has finished
 * as well.
 */
1129 static void push_database_old_done(struct tevent_req *subreq)
1131 struct tevent_req *req = tevent_req_callback_data(
1132 subreq, struct tevent_req);
1133 struct push_database_state *state = tevent_req_data(
1134 req, struct push_database_state);
1138 status = push_database_old_recv(subreq, &ret);
1140 tevent_req_error(req, ret);
1144 state->old_done = true;
1146 if (state->old_done && state->new_done) {
1147 tevent_req_done(req);
/*
 * Completion of the new-style sub-push.
 *
 * On the failure path the request fails with ret. Otherwise new_done is
 * set, and the request is marked done once the old-style push has finished
 * as well.
 */
1151 static void push_database_new_done(struct tevent_req *subreq)
1153 struct tevent_req *req = tevent_req_callback_data(
1154 subreq, struct tevent_req);
1155 struct push_database_state *state = tevent_req_data(
1156 req, struct push_database_state);
1160 status = push_database_new_recv(subreq, &ret);
1162 tevent_req_error(req, ret);
1166 state->new_done = true;
1168 if (state->old_done && state->new_done) {
1169 tevent_req_done(req);
/* Collects the result of a push_database_send() request via generic_recv(). */
1173 static bool push_database_recv(struct tevent_req *req, int *perr)
1175 return generic_recv(req, perr);
1179 * Collect databases using highest sequence number
/*
 * Request state for collect_highseqnum_db_send().
 * Visible members: event context, client, per-node ban credit counters and
 * the recovery db. The code below also uses pnn_list, count, caps, db_id
 * and max_pnn members that are not visible in this excerpt.
 */
1182 struct collect_highseqnum_db_state {
1183 struct tevent_context *ev;
1184 struct ctdb_client_context *client;
1188 uint32_t *ban_credits;
1190 struct recdb_context *recdb;
/* Steps of the highest-seqnum collection, defined below. */
1194 static void collect_highseqnum_db_seqnum_done(struct tevent_req *subreq);
1195 static void collect_highseqnum_db_pulldb_done(struct tevent_req *subreq);
/*
 * Start collecting a database from the node holding the highest sequence
 * number.
 *
 * A GET_DB_SEQNUM control for db_id is sent to every node in pnn_list;
 * the node selection and the pull happen in
 * collect_highseqnum_db_seqnum_done().
 *
 * NOTE(review): the subrequest here is allocated on mem_ctx, whereas the
 * other senders in this file allocate their subrequests on state --
 * confirm whether that difference is intended.
 */
1197 static struct tevent_req *collect_highseqnum_db_send(
1198 TALLOC_CTX *mem_ctx,
1199 struct tevent_context *ev,
1200 struct ctdb_client_context *client,
1201 uint32_t *pnn_list, int count, uint32_t *caps,
1202 uint32_t *ban_credits, uint32_t db_id,
1203 struct recdb_context *recdb)
1205 struct tevent_req *req, *subreq;
1206 struct collect_highseqnum_db_state *state;
1207 struct ctdb_req_control request;
1209 req = tevent_req_create(mem_ctx, &state,
1210 struct collect_highseqnum_db_state);
1216 state->client = client;
1217 state->pnn_list = pnn_list;
1218 state->count = count;
1220 state->ban_credits = ban_credits;
1221 state->db_id = db_id;
1222 state->recdb = recdb;
1224 ctdb_req_control_get_db_seqnum(&request, db_id);
1225 subreq = ctdb_client_control_multi_send(mem_ctx, ev, client,
1226 state->pnn_list, state->count,
1227 TIMEOUT(), &request);
1228 if (tevent_req_nomem(subreq, req)) {
1229 return tevent_req_post(req, ev);
1231 tevent_req_set_callback(subreq, collect_highseqnum_db_seqnum_done,
/*
 * Completion of the GET_DB_SEQNUM multi-control.
 *
 * A failed control is logged (per node when one can be identified) and
 * fails the request with ret. Otherwise each reply is parsed (EPROTO on
 * the parse failure path) and the node with the largest sequence number is
 * remembered in state->max_pnn, starting from pnn_list[0]. The initial
 * value of max_seqnum is not visible in this excerpt. The database is then
 * pulled from that node with pull_database_send(), using its capability
 * word caps[max_pnn]; completion continues in
 * collect_highseqnum_db_pulldb_done().
 */
1237 static void collect_highseqnum_db_seqnum_done(struct tevent_req *subreq)
1239 struct tevent_req *req = tevent_req_callback_data(
1240 subreq, struct tevent_req);
1241 struct collect_highseqnum_db_state *state = tevent_req_data(
1242 req, struct collect_highseqnum_db_state);
1243 struct ctdb_reply_control **reply;
1247 uint64_t seqnum, max_seqnum;
1249 status = ctdb_client_control_multi_recv(subreq, &ret, state,
1251 TALLOC_FREE(subreq);
1256 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1257 state->count, err_list,
1260 D_ERR("control GET_DB_SEQNUM failed for db %s"
1261 " on node %u, ret=%d\n",
1262 recdb_name(state->recdb), pnn, ret2);
1264 D_ERR("control GET_DB_SEQNUM failed for db %s,"
1266 recdb_name(state->recdb), ret);
1268 tevent_req_error(req, ret);
1273 state->max_pnn = state->pnn_list[0];
1274 for (i=0; i<state->count; i++) {
1275 ret = ctdb_reply_control_get_db_seqnum(reply[i], &seqnum);
1277 tevent_req_error(req, EPROTO);
/* Strict comparison: on equal sequence numbers the earlier node is kept. */
1281 if (max_seqnum < seqnum) {
1282 max_seqnum = seqnum;
1283 state->max_pnn = state->pnn_list[i];
1289 D_INFO("Pull persistent db %s from node %d with seqnum 0x%"PRIx64"\n",
1290 recdb_name(state->recdb), state->max_pnn, max_seqnum);
1292 subreq = pull_database_send(state, state->ev, state->client,
1294 state->caps[state->max_pnn],
1296 if (tevent_req_nomem(subreq, req)) {
1299 tevent_req_set_callback(subreq, collect_highseqnum_db_pulldb_done,
/*
 * Completion of the pull from the highest-seqnum node.
 *
 * On the failure path the ban credit counter of that node is incremented
 * and the request fails with ret (the guarding condition is not visible in
 * this excerpt). Otherwise the request is done.
 */
1303 static void collect_highseqnum_db_pulldb_done(struct tevent_req *subreq)
1305 struct tevent_req *req = tevent_req_callback_data(
1306 subreq, struct tevent_req);
1307 struct collect_highseqnum_db_state *state = tevent_req_data(
1308 req, struct collect_highseqnum_db_state);
1312 status = pull_database_recv(subreq, &ret);
1313 TALLOC_FREE(subreq);
1315 state->ban_credits[state->max_pnn] += 1;
1316 tevent_req_error(req, ret);
1320 tevent_req_done(req);
/* Collects the result of a collect_highseqnum_db_send() request via generic_recv(). */
1323 static bool collect_highseqnum_db_recv(struct tevent_req *req, int *perr)
1325 return generic_recv(req, perr);
1329 * Collect all databases
/*
 * Request state for collect_all_db_send().
 * Visible members: event context, client, per-node ban credit counters,
 * the recovery db and a pulldb control payload. The code below also uses
 * pnn_list, count, caps, db_id and index members that are not visible in
 * this excerpt.
 */
1332 struct collect_all_db_state {
1333 struct tevent_context *ev;
1334 struct ctdb_client_context *client;
1338 uint32_t *ban_credits;
1340 struct recdb_context *recdb;
1341 struct ctdb_pulldb pulldb;
/* Completion callback for each per-node pull, defined below. */
1345 static void collect_all_db_pulldb_done(struct tevent_req *subreq);
/*
 * Start collecting a database from every node in pnn_list, one node at a
 * time.
 *
 * The first pull is issued to pnn_list[state->index] with that node's
 * capability word caps[pnn]; the initial value of index is not visible in
 * this excerpt. The remaining nodes are handled in
 * collect_all_db_pulldb_done().
 */
1347 static struct tevent_req *collect_all_db_send(
1348 TALLOC_CTX *mem_ctx,
1349 struct tevent_context *ev,
1350 struct ctdb_client_context *client,
1351 uint32_t *pnn_list, int count, uint32_t *caps,
1352 uint32_t *ban_credits, uint32_t db_id,
1353 struct recdb_context *recdb)
1355 struct tevent_req *req, *subreq;
1356 struct collect_all_db_state *state;
1359 req = tevent_req_create(mem_ctx, &state,
1360 struct collect_all_db_state);
1366 state->client = client;
1367 state->pnn_list = pnn_list;
1368 state->count = count;
1370 state->ban_credits = ban_credits;
1371 state->db_id = db_id;
1372 state->recdb = recdb;
1375 pnn = state->pnn_list[state->index];
1377 subreq = pull_database_send(state, ev, client, pnn, caps[pnn], recdb);
1378 if (tevent_req_nomem(subreq, req)) {
1379 return tevent_req_post(req, ev);
1381 tevent_req_set_callback(subreq, collect_all_db_pulldb_done, req);
/*
 * Completion of one per-node pull.
 *
 * On the failure path the ban credit counter of the node just pulled from
 * is incremented and the request fails with ret (the guarding condition is
 * not visible in this excerpt). Once state->index equals state->count the
 * request is done; otherwise the next node in pnn_list is pulled. The
 * statement advancing index is not visible here.
 */
1386 static void collect_all_db_pulldb_done(struct tevent_req *subreq)
1388 struct tevent_req *req = tevent_req_callback_data(
1389 subreq, struct tevent_req);
1390 struct collect_all_db_state *state = tevent_req_data(
1391 req, struct collect_all_db_state);
1396 status = pull_database_recv(subreq, &ret);
1397 TALLOC_FREE(subreq);
1399 pnn = state->pnn_list[state->index];
1400 state->ban_credits[pnn] += 1;
1401 tevent_req_error(req, ret);
1406 if (state->index == state->count) {
1407 tevent_req_done(req);
1411 pnn = state->pnn_list[state->index];
1412 subreq = pull_database_send(state, state->ev, state->client,
1413 pnn, state->caps[pnn], state->recdb);
1414 if (tevent_req_nomem(subreq, req)) {
1417 tevent_req_set_callback(subreq, collect_all_db_pulldb_done, req);
/* Collects the result of a collect_all_db_send() request via generic_recv(). */
1420 static bool collect_all_db_recv(struct tevent_req *req, int *perr)
1422 return generic_recv(req, perr);
1427 * For each database do the following:
1430 * - Freeze database on all nodes
1431 * - Start transaction on all nodes
1432 * - Collect database from all nodes
1433 * - Wipe database on all nodes
1434 * - Push database to all nodes
1435 * - Commit transaction on all nodes
1436 * - Thaw database on all nodes
/*
 * Request state for recover_db_send().
 * Visible members: event context, client, tunables, per-node ban credit
 * counters, the transaction descriptor (db id + tid), the database name
 * and path, and the recovery db. The code below also uses pnn_list, count,
 * db_id, db_flags and destnode members that are not visible in this
 * excerpt.
 */
1439 struct recover_db_state {
1440 struct tevent_context *ev;
1441 struct ctdb_client_context *client;
1442 struct ctdb_tunable_list *tun_list;
1446 uint32_t *ban_credits;
1451 struct ctdb_transdb transdb;
1453 const char *db_name, *db_path;
1454 struct recdb_context *recdb;
/* Steps of the per-database recovery, in the order they are declared here. */
1457 static void recover_db_name_done(struct tevent_req *subreq);
1458 static void recover_db_path_done(struct tevent_req *subreq);
1459 static void recover_db_freeze_done(struct tevent_req *subreq);
1460 static void recover_db_transaction_started(struct tevent_req *subreq);
1461 static void recover_db_collect_done(struct tevent_req *subreq);
1462 static void recover_db_wipedb_done(struct tevent_req *subreq);
1463 static void recover_db_pushdb_done(struct tevent_req *subreq);
1464 static void recover_db_transaction_committed(struct tevent_req *subreq);
1465 static void recover_db_thaw_done(struct tevent_req *subreq);
/*
 * Start the recovery of one database.
 *
 * The arguments are stored in the request state; the local node (the
 * client's pnn) becomes destnode and the transaction descriptor is filled
 * with db_id and generation as its tid. The first step sends a GET_DBNAME
 * control to the local node; the sequence continues in
 * recover_db_name_done().
 */
1467 static struct tevent_req *recover_db_send(TALLOC_CTX *mem_ctx,
1468 struct tevent_context *ev,
1469 struct ctdb_client_context *client,
1470 struct ctdb_tunable_list *tun_list,
1471 uint32_t *pnn_list, int count,
1473 uint32_t *ban_credits,
1474 uint32_t generation,
1475 uint32_t db_id, uint8_t db_flags)
1477 struct tevent_req *req, *subreq;
1478 struct recover_db_state *state;
1479 struct ctdb_req_control request;
1481 req = tevent_req_create(mem_ctx, &state, struct recover_db_state);
1487 state->client = client;
1488 state->tun_list = tun_list;
1489 state->pnn_list = pnn_list;
1490 state->count = count;
1492 state->ban_credits = ban_credits;
1493 state->db_id = db_id;
1494 state->db_flags = db_flags;
1496 state->destnode = ctdb_client_pnn(client);
1497 state->transdb.db_id = db_id;
1498 state->transdb.tid = generation;
1500 ctdb_req_control_get_dbname(&request, db_id);
1501 subreq = ctdb_client_control_send(state, ev, client, state->destnode,
1502 TIMEOUT(), &request);
1503 if (tevent_req_nomem(subreq, req)) {
1504 return tevent_req_post(req, ev);
1506 tevent_req_set_callback(subreq, recover_db_name_done, req);
/*
 * GET_DBNAME completed.
 *
 * Two failure paths are visible: one fails the request with the ret from
 * ctdb_client_control_recv(), the other (after parsing the reply) fails it
 * with EPROTO. Both log the same message. The guarding conditions are not
 * visible in this view. Otherwise the name is stored in state->db_name and
 * a GETDBPATH control is sent to state->destnode.
 */
1511 static void recover_db_name_done(struct tevent_req *subreq)
1513 struct tevent_req *req = tevent_req_callback_data(
1514 subreq, struct tevent_req);
1515 struct recover_db_state *state = tevent_req_data(
1516 req, struct recover_db_state);
1517 struct ctdb_reply_control *reply;
1518 struct ctdb_req_control request;
1522 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
1523 TALLOC_FREE(subreq);
1525 D_ERR("control GET_DBNAME failed for db=0x%x, ret=%d\n",
1527 tevent_req_error(req, ret);
/* The db_name string is allocated on state. */
1531 ret = ctdb_reply_control_get_dbname(reply, state, &state->db_name);
1533 D_ERR("control GET_DBNAME failed for db=0x%x, ret=%d\n",
1535 tevent_req_error(req, EPROTO);
/* Next step: ask the same node for the database path. */
1541 ctdb_req_control_getdbpath(&request, state->db_id);
1542 subreq = ctdb_client_control_send(state, state->ev, state->client,
1543 state->destnode, TIMEOUT(),
1545 if (tevent_req_nomem(subreq, req)) {
1548 tevent_req_set_callback(subreq, recover_db_path_done, req);
/*
 * GETDBPATH completed.
 *
 * On the visible failure paths the request is failed with ret (control
 * failure) or EPROTO (after parsing the reply). Otherwise the path is
 * stored in state->db_path and a DB_FREEZE control for the database is sent
 * to all state->count nodes in state->pnn_list.
 */
1551 static void recover_db_path_done(struct tevent_req *subreq)
1553 struct tevent_req *req = tevent_req_callback_data(
1554 subreq, struct tevent_req);
1555 struct recover_db_state *state = tevent_req_data(
1556 req, struct recover_db_state);
1557 struct ctdb_reply_control *reply;
1558 struct ctdb_req_control request;
1562 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
1563 TALLOC_FREE(subreq);
1565 D_ERR("control GETDBPATH failed for db %s, ret=%d\n",
1566 state->db_name, ret);
1567 tevent_req_error(req, ret);
1571 ret = ctdb_reply_control_getdbpath(reply, state, &state->db_path);
1573 D_ERR("control GETDBPATH failed for db %s, ret=%d\n",
1574 state->db_name, ret);
1575 tevent_req_error(req, EPROTO);
/* Step "Freeze database on all nodes". */
1581 ctdb_req_control_db_freeze(&request, state->db_id);
1582 subreq = ctdb_client_control_multi_send(state, state->ev,
1584 state->pnn_list, state->count,
1585 TIMEOUT(), &request);
1586 if (tevent_req_nomem(subreq, req)) {
1589 tevent_req_set_callback(subreq, recover_db_freeze_done, req);
/*
 * DB_FREEZE completed on all nodes.
 *
 * Visible failure handling: ctdb_client_control_multi_error() is asked for
 * the failing node and its error. In the branch that logs a specific node,
 * that node's ban credit counter is incremented. The request is then failed
 * with ret. Otherwise a DB_TRANSACTION_START control carrying
 * state->transdb is sent to all nodes.
 */
1592 static void recover_db_freeze_done(struct tevent_req *subreq)
1594 struct tevent_req *req = tevent_req_callback_data(
1595 subreq, struct tevent_req);
1596 struct recover_db_state *state = tevent_req_data(
1597 req, struct recover_db_state);
1598 struct ctdb_req_control request;
1603 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
1605 TALLOC_FREE(subreq);
1610 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1611 state->count, err_list,
1614 D_ERR("control FREEZE_DB failed for db %s"
1615 " on node %u, ret=%d\n",
1616 state->db_name, pnn, ret2);
/* ban_credits is indexed by PNN here. */
1617 state->ban_credits[pnn] += 1;
1619 D_ERR("control FREEZE_DB failed for db %s, ret=%d\n",
1620 state->db_name, ret);
1622 tevent_req_error(req, ret);
/* Step "Start transaction on all nodes". */
1626 ctdb_req_control_db_transaction_start(&request, &state->transdb);
1627 subreq = ctdb_client_control_multi_send(state, state->ev,
1629 state->pnn_list, state->count,
1630 TIMEOUT(), &request);
1631 if (tevent_req_nomem(subreq, req)) {
1634 tevent_req_set_callback(subreq, recover_db_transaction_started, req);
/*
 * DB_TRANSACTION_START completed on all nodes.
 *
 * On the visible failure path the failing node (if identified) is logged
 * and the request is failed with ret. Otherwise the temporary recovery
 * database is created with recdb_create() and the collect step is started.
 * Two collectors are visible: collect_highseqnum_db_send(), which follows
 * the PERSISTENT/REPLICATED flag test, and collect_all_db_send().
 */
1637 static void recover_db_transaction_started(struct tevent_req *subreq)
1639 struct tevent_req *req = tevent_req_callback_data(
1640 subreq, struct tevent_req);
1641 struct recover_db_state *state = tevent_req_data(
1642 req, struct recover_db_state);
1647 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
1649 TALLOC_FREE(subreq);
1654 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1658 D_ERR("control TRANSACTION_DB failed for db=%s"
1659 " on node %u, ret=%d\n",
1660 state->db_name, pnn, ret2);
1662 D_ERR("control TRANSACTION_DB failed for db=%s,"
1663 " ret=%d\n", state->db_name, ret);
1665 tevent_req_error(req, ret);
/* recdb is allocated on state; a NULL result is reported as out of memory. */
1669 state->recdb = recdb_create(state, state->db_id, state->db_name,
1671 state->tun_list->database_hash_size,
1672 state->db_flags & CTDB_DB_FLAGS_PERSISTENT);
1673 if (tevent_req_nomem(state->recdb, req)) {
/* The same flag test selects the matching recv in recover_db_collect_done(). */
1677 if ((state->db_flags & CTDB_DB_FLAGS_PERSISTENT) ||
1678 (state->db_flags & CTDB_DB_FLAGS_REPLICATED)) {
1679 subreq = collect_highseqnum_db_send(
1680 state, state->ev, state->client,
1681 state->pnn_list, state->count, state->caps,
1682 state->ban_credits, state->db_id,
1685 subreq = collect_all_db_send(
1686 state, state->ev, state->client,
1687 state->pnn_list, state->count, state->caps,
1688 state->ban_credits, state->db_id,
1691 if (tevent_req_nomem(subreq, req)) {
1694 tevent_req_set_callback(subreq, recover_db_collect_done, req);
/*
 * Database collection completed.
 *
 * Receives the result with the recv function matching the collector chosen
 * in recover_db_transaction_started() (same flag test). The visible failure
 * path fails the request with ret. Otherwise a WIPE_DATABASE control
 * carrying state->transdb is sent to all nodes.
 */
1697 static void recover_db_collect_done(struct tevent_req *subreq)
1699 struct tevent_req *req = tevent_req_callback_data(
1700 subreq, struct tevent_req);
1701 struct recover_db_state *state = tevent_req_data(
1702 req, struct recover_db_state);
1703 struct ctdb_req_control request;
1707 if ((state->db_flags & CTDB_DB_FLAGS_PERSISTENT) ||
1708 (state->db_flags & CTDB_DB_FLAGS_REPLICATED)) {
1709 status = collect_highseqnum_db_recv(subreq, &ret);
1711 status = collect_all_db_recv(subreq, &ret);
1713 TALLOC_FREE(subreq);
1715 tevent_req_error(req, ret);
/* Step "Wipe database on all nodes". */
1719 ctdb_req_control_wipe_database(&request, &state->transdb);
1720 subreq = ctdb_client_control_multi_send(state, state->ev,
1722 state->pnn_list, state->count,
1723 TIMEOUT(), &request);
1724 if (tevent_req_nomem(subreq, req)) {
1727 tevent_req_set_callback(subreq, recover_db_wipedb_done, req);
/*
 * WIPE_DATABASE completed on all nodes.
 *
 * On the visible failure path the failing node (if identified) is logged
 * and the request is failed with ret. Otherwise push_database_send() is
 * started for all nodes.
 */
1730 static void recover_db_wipedb_done(struct tevent_req *subreq)
1732 struct tevent_req *req = tevent_req_callback_data(
1733 subreq, struct tevent_req);
1734 struct recover_db_state *state = tevent_req_data(
1735 req, struct recover_db_state);
1740 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
1742 TALLOC_FREE(subreq);
1747 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1751 D_ERR("control WIPEDB failed for db %s on node %u,"
1752 " ret=%d\n", state->db_name, pnn, ret2);
1754 D_ERR("control WIPEDB failed for db %s, ret=%d\n",
1755 state->db_name, ret);
1757 tevent_req_error(req, ret);
/* Step "Push database to all nodes". */
1761 subreq = push_database_send(state, state->ev, state->client,
1762 state->pnn_list, state->count,
1763 state->caps, state->tun_list,
1765 if (tevent_req_nomem(subreq, req)) {
1768 tevent_req_set_callback(subreq, recover_db_pushdb_done, req);
/*
 * Database push completed.
 *
 * The visible failure path fails the request with the ret from
 * push_database_recv(). Otherwise the temporary recovery database is freed
 * and a DB_TRANSACTION_COMMIT control carrying state->transdb is sent to
 * all nodes.
 */
1771 static void recover_db_pushdb_done(struct tevent_req *subreq)
1773 struct tevent_req *req = tevent_req_callback_data(
1774 subreq, struct tevent_req);
1775 struct recover_db_state *state = tevent_req_data(
1776 req, struct recover_db_state);
1777 struct ctdb_req_control request;
1781 status = push_database_recv(subreq, &ret);
1782 TALLOC_FREE(subreq);
1784 tevent_req_error(req, ret);
/* The recovery database is no longer needed once it has been pushed. */
1788 TALLOC_FREE(state->recdb);
/* Step "Commit transaction on all nodes". */
1790 ctdb_req_control_db_transaction_commit(&request, &state->transdb);
1791 subreq = ctdb_client_control_multi_send(state, state->ev,
1793 state->pnn_list, state->count,
1794 TIMEOUT(), &request);
1795 if (tevent_req_nomem(subreq, req)) {
1798 tevent_req_set_callback(subreq, recover_db_transaction_committed, req);
/*
 * DB_TRANSACTION_COMMIT completed on all nodes.
 *
 * On the visible failure path the failing node (if identified) is logged
 * and the request is failed with ret. Otherwise a DB_THAW control for the
 * database is sent to all nodes.
 */
1801 static void recover_db_transaction_committed(struct tevent_req *subreq)
1803 struct tevent_req *req = tevent_req_callback_data(
1804 subreq, struct tevent_req);
1805 struct recover_db_state *state = tevent_req_data(
1806 req, struct recover_db_state);
1807 struct ctdb_req_control request;
1812 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
1814 TALLOC_FREE(subreq);
1819 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1823 D_ERR("control DB_TRANSACTION_COMMIT failed for db %s"
1824 " on node %u, ret=%d\n",
1825 state->db_name, pnn, ret2);
1827 D_ERR("control DB_TRANSACTION_COMMIT failed for db %s,"
1828 " ret=%d\n", state->db_name, ret);
1830 tevent_req_error(req, ret);
/* Step "Thaw database on all nodes". */
1834 ctdb_req_control_db_thaw(&request, state->db_id);
1835 subreq = ctdb_client_control_multi_send(state, state->ev,
1837 state->pnn_list, state->count,
1838 TIMEOUT(), &request);
1839 if (tevent_req_nomem(subreq, req)) {
1842 tevent_req_set_callback(subreq, recover_db_thaw_done, req);
/*
 * DB_THAW completed on all nodes; this is the last step of recover_db.
 *
 * On the visible failure path the failing node (if identified) is logged
 * and the request is failed with ret. Otherwise the request is marked done.
 */
1845 static void recover_db_thaw_done(struct tevent_req *subreq)
1847 struct tevent_req *req = tevent_req_callback_data(
1848 subreq, struct tevent_req);
1849 struct recover_db_state *state = tevent_req_data(
1850 req, struct recover_db_state);
1855 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
1857 TALLOC_FREE(subreq);
1862 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1866 D_ERR("control DB_THAW failed for db %s on node %u,"
1867 " ret=%d\n", state->db_name, pnn, ret2);
1869 D_ERR("control DB_THAW failed for db %s, ret=%d\n",
1870 state->db_name, ret);
1872 tevent_req_error(req, ret);
1876 tevent_req_done(req);
/*
 * Collect the result of a recover_db request. Returns whatever
 * generic_recv() returns; no error code is requested (perr is NULL).
 */
1879 static bool recover_db_recv(struct tevent_req *req)
1881 return generic_recv(req, NULL);
1886 * Start database recovery for each database
1888 * Try to recover each database 5 times before failing recovery.
/*
 * State of the db_recovery request, which recovers every database in dbmap.
 *
 * NOTE(review): the num_replies and num_failed counters used by
 * db_recovery_send(), db_recovery_one_done() and db_recovery_recv() are not
 * visible in this view.
 */
1891 struct db_recovery_state {
1892 struct tevent_context *ev;
1893 struct ctdb_dbid_map *dbmap;
/*
 * Per-database bookkeeping for db_recovery: the parent request plus the
 * arguments needed to call recover_db_send() again on a retry.
 *
 * NOTE(review): pnn_list, count, caps, db_id, db_flags and num_fails are
 * used by db_recovery_send()/db_recovery_one_done() but are not visible in
 * this view.
 */
1898 struct db_recovery_one_state {
/* Parent db_recovery request that this database belongs to. */
1899 struct tevent_req *req;
1900 struct ctdb_client_context *client;
1901 struct ctdb_dbid_map *dbmap;
1902 struct ctdb_tunable_list *tun_list;
1906 uint32_t *ban_credits;
1907 uint32_t generation;
/* Completion callback for each per-database recover_db request. */
1913 static void db_recovery_one_done(struct tevent_req *subreq);
/*
 * Start recovery of every database listed in dbmap.
 *
 * If dbmap is empty the request completes immediately. Otherwise one
 * db_recovery_one_state is allocated per database and a recover_db request
 * is started for it inside the loop, without waiting for earlier ones to
 * finish. Each completion is handled by db_recovery_one_done().
 *
 * An allocation failure for a substate or sub-request fails the whole
 * request and posts it on ev.
 */
1915 static struct tevent_req *db_recovery_send(TALLOC_CTX *mem_ctx,
1916 struct tevent_context *ev,
1917 struct ctdb_client_context *client,
1918 struct ctdb_dbid_map *dbmap,
1919 struct ctdb_tunable_list *tun_list,
1920 uint32_t *pnn_list, int count,
1922 uint32_t *ban_credits,
1923 uint32_t generation)
1925 struct tevent_req *req, *subreq;
1926 struct db_recovery_state *state;
1929 req = tevent_req_create(mem_ctx, &state, struct db_recovery_state);
1935 state->dbmap = dbmap;
1936 state->num_replies = 0;
1937 state->num_failed = 0;
/* Nothing to recover. */
1939 if (dbmap->num == 0) {
1940 tevent_req_done(req);
1941 return tevent_req_post(req, ev);
1944 for (i=0; i<dbmap->num; i++) {
1945 struct db_recovery_one_state *substate;
1947 substate = talloc_zero(state, struct db_recovery_one_state);
1948 if (tevent_req_nomem(substate, req)) {
1949 return tevent_req_post(req, ev);
/* Remember everything needed to retry this database later. */
1952 substate->req = req;
1953 substate->client = client;
1954 substate->dbmap = dbmap;
1955 substate->tun_list = tun_list;
1956 substate->pnn_list = pnn_list;
1957 substate->count = count;
1958 substate->caps = caps;
1959 substate->ban_credits = ban_credits;
1960 substate->generation = generation;
1961 substate->db_id = dbmap->dbs[i].db_id;
1962 substate->db_flags = dbmap->dbs[i].flags;
1964 subreq = recover_db_send(state, ev, client, tun_list,
1965 pnn_list, count, caps, ban_credits,
1966 generation, substate->db_id,
1967 substate->db_flags);
1968 if (tevent_req_nomem(subreq, req)) {
1969 return tevent_req_post(req, ev);
1971 tevent_req_set_callback(subreq, db_recovery_one_done,
1973 D_NOTICE("recover database 0x%08x\n", substate->db_id);
/*
 * One recover_db request finished.
 *
 * Visible behaviour: the substate is freed on one path. Otherwise the
 * failure count for this database is incremented and, while it is below
 * NUM_RETRIES, recover_db_send() is called again with the saved arguments.
 * The tail of the function counts the database as failed and/or replied,
 * and completes the parent request once num_replies equals the number of
 * databases. The branch structure (conditions, returns, labels) is not
 * visible in this view.
 */
1979 static void db_recovery_one_done(struct tevent_req *subreq)
1981 struct db_recovery_one_state *substate = tevent_req_callback_data(
1982 subreq, struct db_recovery_one_state);
1983 struct tevent_req *req = substate->req;
1984 struct db_recovery_state *state = tevent_req_data(
1985 req, struct db_recovery_state);
1988 status = recover_db_recv(subreq);
1989 TALLOC_FREE(subreq);
1992 talloc_free(substate);
1996 substate->num_fails += 1;
1997 if (substate->num_fails < NUM_RETRIES) {
1998 subreq = recover_db_send(state, state->ev, substate->client,
2000 substate->pnn_list, substate->count,
2001 substate->caps, substate->ban_credits,
2002 substate->generation, substate->db_id,
2003 substate->db_flags);
2004 if (tevent_req_nomem(subreq, req)) {
2007 tevent_req_set_callback(subreq, db_recovery_one_done, substate);
/* num_fails+1 is the number of the attempt that was just started. */
2008 D_NOTICE("recover database 0x%08x, attempt %d\n",
2009 substate->db_id, substate->num_fails+1);
2014 state->num_failed += 1;
2017 state->num_replies += 1;
/* The parent request completes once every database has been accounted for. */
2019 if (state->num_replies == state->dbmap->num) {
2020 tevent_req_done(req);
/*
 * Collect the result of db_recovery.
 *
 * *count is set to the number of databases recovered successfully
 * (num_replies - num_failed). The return statements are not visible in this
 * view; the visible tests are for a unix error on the request and for
 * num_failed > 0.
 */
2024 static bool db_recovery_recv(struct tevent_req *req, int *count)
2026 struct db_recovery_state *state = tevent_req_data(
2027 req, struct db_recovery_state);
2030 if (tevent_req_is_unix_error(req, &err)) {
2035 *count = state->num_replies - state->num_failed;
2037 if (state->num_failed > 0) {
2046 * Run the parallel database recovery
2051 * - Get capabilities from all nodes
2053 * - Set RECOVERY_ACTIVE
2054 * - Send START_RECOVERY
2055 * - Update vnnmap on all nodes
2056 * - Run database recovery
2057 * - Set RECOVERY_NORMAL
2058 * - Send END_RECOVERY
/*
 * State of the top-level recovery request: event context, client handle,
 * the generation passed in by the caller, and the cluster data fetched or
 * computed along the way (nodemap, ban credits, tunables, vnnmap, dbmap).
 *
 * NOTE(review): destnode, pnn_list, count and caps are used by the
 * functions below but are not visible in this view.
 */
2061 struct recovery_state {
2062 struct tevent_context *ev;
2063 struct ctdb_client_context *client;
2064 uint32_t generation;
2068 struct ctdb_node_map *nodemap;
/* Allocated with nodemap->num entries in recovery_nodemap_done(). */
2070 uint32_t *ban_credits;
2071 struct ctdb_tunable_list *tun_list;
/* Replaced by the newly computed map in recovery_active_done(). */
2072 struct ctdb_vnn_map *vnnmap;
2073 struct ctdb_dbid_map *dbmap;
/*
 * Completion callbacks of the recovery request. The main chain is
 * tunables -> nodemap -> vnnmap -> capabilities -> dbmap -> active ->
 * start_recovery -> vnnmap_update -> db_recovery -> normal -> end_recovery;
 * recovery_failed_done() handles the reply to the ban request issued when
 * database recovery fails.
 */
2076 static void recovery_tunables_done(struct tevent_req *subreq);
2077 static void recovery_nodemap_done(struct tevent_req *subreq);
2078 static void recovery_vnnmap_done(struct tevent_req *subreq);
2079 static void recovery_capabilities_done(struct tevent_req *subreq);
2080 static void recovery_dbmap_done(struct tevent_req *subreq);
2081 static void recovery_active_done(struct tevent_req *subreq);
2082 static void recovery_start_recovery_done(struct tevent_req *subreq);
2083 static void recovery_vnnmap_update_done(struct tevent_req *subreq);
2084 static void recovery_db_recovery_done(struct tevent_req *subreq);
2085 static void recovery_failed_done(struct tevent_req *subreq);
2086 static void recovery_normal_done(struct tevent_req *subreq);
2087 static void recovery_end_recovery_done(struct tevent_req *subreq);
/*
 * Start the full recovery run for the given generation.
 *
 * Creates the request, records client and generation, uses the node
 * returned by ctdb_client_pnn(client) as destnode, and sends
 * GET_ALL_TUNABLES to it. The sequence continues in
 * recovery_tunables_done().
 */
2089 static struct tevent_req *recovery_send(TALLOC_CTX *mem_ctx,
2090 struct tevent_context *ev,
2091 struct ctdb_client_context *client,
2092 uint32_t generation)
2094 struct tevent_req *req, *subreq;
2095 struct recovery_state *state;
2096 struct ctdb_req_control request;
2098 req = tevent_req_create(mem_ctx, &state, struct recovery_state);
2104 state->client = client;
2105 state->generation = generation;
2106 state->destnode = ctdb_client_pnn(client);
2108 ctdb_req_control_get_all_tunables(&request);
2109 subreq = ctdb_client_control_send(state, state->ev, state->client,
2110 state->destnode, TIMEOUT(),
2112 if (tevent_req_nomem(subreq, req)) {
2113 return tevent_req_post(req, ev);
2115 tevent_req_set_callback(subreq, recovery_tunables_done, req);
/*
 * GET_ALL_TUNABLES completed.
 *
 * The visible failure paths fail the request with ret (control failure) or
 * EPROTO (after parsing the reply). Otherwise the file-level
 * recover_timeout is replaced by the tunable's value, so every later
 * TIMEOUT() uses it, and GET_NODEMAP is sent to destnode.
 */
2120 static void recovery_tunables_done(struct tevent_req *subreq)
2122 struct tevent_req *req = tevent_req_callback_data(
2123 subreq, struct tevent_req);
2124 struct recovery_state *state = tevent_req_data(
2125 req, struct recovery_state);
2126 struct ctdb_reply_control *reply;
2127 struct ctdb_req_control request;
2131 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2132 TALLOC_FREE(subreq);
2134 D_ERR("control GET_ALL_TUNABLES failed, ret=%d\n", ret);
2135 tevent_req_error(req, ret);
2139 ret = ctdb_reply_control_get_all_tunables(reply, state,
2142 D_ERR("control GET_ALL_TUNABLES failed, ret=%d\n", ret);
2143 tevent_req_error(req, EPROTO);
/* Overrides the built-in default used by TIMEOUT() up to this point. */
2149 recover_timeout = state->tun_list->recover_timeout;
2151 ctdb_req_control_get_nodemap(&request);
2152 subreq = ctdb_client_control_send(state, state->ev, state->client,
2153 state->destnode, TIMEOUT(),
2155 if (tevent_req_nomem(subreq, req)) {
2158 tevent_req_set_callback(subreq, recovery_nodemap_done, req);
/*
 * GET_NODEMAP completed.
 *
 * Stores the nodemap, builds the list of active nodes (pnn_list/count) that
 * all later multi-node controls are sent to, allocates a zeroed ban credit
 * array with one entry per nodemap node, and sends GETVNNMAP to destnode.
 * Both visible failure paths fail the request with ret.
 */
2161 static void recovery_nodemap_done(struct tevent_req *subreq)
2163 struct tevent_req *req = tevent_req_callback_data(
2164 subreq, struct tevent_req);
2165 struct recovery_state *state = tevent_req_data(
2166 req, struct recovery_state);
2167 struct ctdb_reply_control *reply;
2168 struct ctdb_req_control request;
2172 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2173 TALLOC_FREE(subreq);
2175 D_ERR("control GET_NODEMAP failed to node %u, ret=%d\n",
2176 state->destnode, ret);
2177 tevent_req_error(req, ret);
2181 ret = ctdb_reply_control_get_nodemap(reply, state, &state->nodemap);
2183 D_ERR("control GET_NODEMAP failed, ret=%d\n", ret);
2184 tevent_req_error(req, ret);
2188 state->count = list_of_active_nodes(state->nodemap, CTDB_UNKNOWN_PNN,
2189 state, &state->pnn_list);
/*
 * NOTE(review): any non-positive count, including zero active nodes, is
 * reported as ENOMEM - confirm that is the intended error code.
 */
2190 if (state->count <= 0) {
2191 tevent_req_error(req, ENOMEM);
2195 state->ban_credits = talloc_zero_array(state, uint32_t,
2196 state->nodemap->num);
2197 if (tevent_req_nomem(state->ban_credits, req)) {
2201 ctdb_req_control_getvnnmap(&request);
2202 subreq = ctdb_client_control_send(state, state->ev, state->client,
2203 state->destnode, TIMEOUT(),
2205 if (tevent_req_nomem(subreq, req)) {
2208 tevent_req_set_callback(subreq, recovery_vnnmap_done, req);
/*
 * GETVNNMAP completed.
 *
 * Stores the current vnnmap in state (both visible failure paths fail the
 * request with ret) and sends GET_CAPABILITIES to all active nodes.
 */
2211 static void recovery_vnnmap_done(struct tevent_req *subreq)
2213 struct tevent_req *req = tevent_req_callback_data(
2214 subreq, struct tevent_req);
2215 struct recovery_state *state = tevent_req_data(
2216 req, struct recovery_state);
2217 struct ctdb_reply_control *reply;
2218 struct ctdb_req_control request;
2222 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2223 TALLOC_FREE(subreq);
2225 D_ERR("control GETVNNMAP failed to node %u, ret=%d\n",
2226 state->destnode, ret);
2227 tevent_req_error(req, ret);
2231 ret = ctdb_reply_control_getvnnmap(reply, state, &state->vnnmap);
2233 D_ERR("control GETVNNMAP failed, ret=%d\n", ret);
2234 tevent_req_error(req, ret);
2238 ctdb_req_control_get_capabilities(&request);
2239 subreq = ctdb_client_control_multi_send(state, state->ev,
2241 state->pnn_list, state->count,
2242 TIMEOUT(), &request);
2243 if (tevent_req_nomem(subreq, req)) {
2246 tevent_req_set_callback(subreq, recovery_capabilities_done, req);
/*
 * GET_CAPABILITIES completed on all active nodes.
 *
 * Allocates a zeroed caps array with one entry per nodemap node, parses the
 * reply of each active node (a parse failure fails the request with
 * EPROTO), and sends GET_DBMAP to destnode. The index used to store each
 * node's capabilities is not visible in this view.
 */
2249 static void recovery_capabilities_done(struct tevent_req *subreq)
2251 struct tevent_req *req = tevent_req_callback_data(
2252 subreq, struct tevent_req);
2253 struct recovery_state *state = tevent_req_data(
2254 req, struct recovery_state);
2255 struct ctdb_reply_control **reply;
2256 struct ctdb_req_control request;
2261 status = ctdb_client_control_multi_recv(subreq, &ret, state, &err_list,
2263 TALLOC_FREE(subreq);
2268 ret2 = ctdb_client_control_multi_error(state->pnn_list,
2272 D_ERR("control GET_CAPABILITIES failed on node %u,"
2273 " ret=%d\n", pnn, ret2);
2275 D_ERR("control GET_CAPABILITIES failed, ret=%d\n",
2278 tevent_req_error(req, ret);
2282 /* Make the array size same as nodemap */
2283 state->caps = talloc_zero_array(state, uint32_t,
2284 state->nodemap->num);
2285 if (tevent_req_nomem(state->caps, req)) {
/* reply[i] is the answer from the node pnn_list[i]. */
2289 for (i=0; i<state->count; i++) {
2292 pnn = state->pnn_list[i];
2293 ret = ctdb_reply_control_get_capabilities(reply[i],
2296 D_ERR("control GET_CAPABILITIES failed on node %u\n",
2298 tevent_req_error(req, EPROTO);
2305 ctdb_req_control_get_dbmap(&request);
2306 subreq = ctdb_client_control_send(state, state->ev, state->client,
2307 state->destnode, TIMEOUT(),
2309 if (tevent_req_nomem(subreq, req)) {
2312 tevent_req_set_callback(subreq, recovery_dbmap_done, req);
/*
 * GET_DBMAP completed.
 *
 * Stores the database map in state (both visible failure paths fail the
 * request with ret) and sends SET_RECMODE with CTDB_RECOVERY_ACTIVE to all
 * active nodes.
 */
2315 static void recovery_dbmap_done(struct tevent_req *subreq)
2317 struct tevent_req *req = tevent_req_callback_data(
2318 subreq, struct tevent_req);
2319 struct recovery_state *state = tevent_req_data(
2320 req, struct recovery_state);
2321 struct ctdb_reply_control *reply;
2322 struct ctdb_req_control request;
2326 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2327 TALLOC_FREE(subreq);
2329 D_ERR("control GET_DBMAP failed to node %u, ret=%d\n",
2330 state->destnode, ret);
2331 tevent_req_error(req, ret);
2335 ret = ctdb_reply_control_get_dbmap(reply, state, &state->dbmap);
2337 D_ERR("control GET_DBMAP failed, ret=%d\n", ret);
2338 tevent_req_error(req, ret);
/* Step "Set RECOVERY_ACTIVE". */
2342 ctdb_req_control_set_recmode(&request, CTDB_RECOVERY_ACTIVE);
2343 subreq = ctdb_client_control_multi_send(state, state->ev,
2345 state->pnn_list, state->count,
2346 TIMEOUT(), &request);
2347 if (tevent_req_nomem(subreq, req)) {
2350 tevent_req_set_callback(subreq, recovery_active_done, req);
/*
 * SET_RECMODE(ACTIVE) completed on all active nodes.
 *
 * Computes the new VNN map in two passes over the nodemap, testing
 * NODE_FLAGS_INACTIVE and CTDB_CAP_LMASTER for each node. The first pass
 * determines the map size; the second fills the map with node PNNs. If no
 * node qualifies, the map has a single entry holding destnode. The new map
 * takes state->generation, replaces state->vnnmap, and START_RECOVERY is
 * sent to all active nodes.
 *
 * NOTE(review): state->caps is indexed here by nodemap position i, not by
 * node[i].pnn - confirm this matches how recovery_capabilities_done() fills
 * the array.
 */
2353 static void recovery_active_done(struct tevent_req *subreq)
2355 struct tevent_req *req = tevent_req_callback_data(
2356 subreq, struct tevent_req);
2357 struct recovery_state *state = tevent_req_data(
2358 req, struct recovery_state);
2359 struct ctdb_req_control request;
2360 struct ctdb_vnn_map *vnnmap;
2366 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
2368 TALLOC_FREE(subreq);
2373 ret2 = ctdb_client_control_multi_error(state->pnn_list,
2377 D_ERR("failed to set recovery mode ACTIVE on node %u,"
2378 " ret=%d\n", pnn, ret2);
2380 D_ERR("failed to set recovery mode ACTIVE, ret=%d\n",
2383 tevent_req_error(req, ret);
2387 D_ERR("Set recovery mode to ACTIVE\n");
2389 /* Calculate new VNNMAP */
/* First pass: determine how many map entries are needed. */
2391 for (i=0; i<state->nodemap->num; i++) {
2392 if (state->nodemap->node[i].flags & NODE_FLAGS_INACTIVE) {
2395 if (!(state->caps[i] & CTDB_CAP_LMASTER)) {
2402 D_WARNING("No active lmasters found. Adding recmaster anyway\n");
2405 vnnmap = talloc_zero(state, struct ctdb_vnn_map);
2406 if (tevent_req_nomem(vnnmap, req)) {
/* At least one slot, so that destnode can be used as a fallback. */
2410 vnnmap->size = (count == 0 ? 1 : count);
2411 vnnmap->map = talloc_array(vnnmap, uint32_t, vnnmap->size);
2412 if (tevent_req_nomem(vnnmap->map, req)) {
2417 vnnmap->map[0] = state->destnode;
/* Second pass: fill the map with the PNNs of the qualifying nodes. */
2420 for (i=0; i<state->nodemap->num; i++) {
2421 if (state->nodemap->node[i].flags &
2422 NODE_FLAGS_INACTIVE) {
2425 if (!(state->caps[i] & CTDB_CAP_LMASTER)) {
2429 vnnmap->map[count] = state->nodemap->node[i].pnn;
2434 vnnmap->generation = state->generation;
/* Drop the map fetched earlier; the new one is pushed to the nodes later. */
2436 talloc_free(state->vnnmap);
2437 state->vnnmap = vnnmap;
2439 ctdb_req_control_start_recovery(&request);
2440 subreq = ctdb_client_control_multi_send(state, state->ev,
2442 state->pnn_list, state->count,
2443 TIMEOUT(), &request);
2444 if (tevent_req_nomem(subreq, req)) {
2447 tevent_req_set_callback(subreq, recovery_start_recovery_done, req);
/*
 * START_RECOVERY completed on all active nodes.
 *
 * On the visible failure path the failing node (if identified) is logged
 * and the request is failed with ret. Otherwise the newly computed
 * state->vnnmap is sent to all active nodes with SETVNNMAP.
 */
2450 static void recovery_start_recovery_done(struct tevent_req *subreq)
2452 struct tevent_req *req = tevent_req_callback_data(
2453 subreq, struct tevent_req);
2454 struct recovery_state *state = tevent_req_data(
2455 req, struct recovery_state);
2456 struct ctdb_req_control request;
2461 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
2463 TALLOC_FREE(subreq);
2468 ret2 = ctdb_client_control_multi_error(state->pnn_list,
2472 D_ERR("failed to run start_recovery event on node %u,"
2473 " ret=%d\n", pnn, ret2);
2475 D_ERR("failed to run start_recovery event, ret=%d\n",
2478 tevent_req_error(req, ret);
2482 D_ERR("start_recovery event finished\n");
/* Step "Update vnnmap on all nodes". */
2484 ctdb_req_control_setvnnmap(&request, state->vnnmap);
2485 subreq = ctdb_client_control_multi_send(state, state->ev,
2487 state->pnn_list, state->count,
2488 TIMEOUT(), &request);
2489 if (tevent_req_nomem(subreq, req)) {
2492 tevent_req_set_callback(subreq, recovery_vnnmap_update_done, req);
/*
 * SETVNNMAP completed on all active nodes.
 *
 * On the visible failure path the failing node (if identified) is logged
 * and the request is failed with ret. Otherwise db_recovery_send() is
 * started for every database in state->dbmap, using the generation of the
 * new vnnmap.
 */
2495 static void recovery_vnnmap_update_done(struct tevent_req *subreq)
2497 struct tevent_req *req = tevent_req_callback_data(
2498 subreq, struct tevent_req);
2499 struct recovery_state *state = tevent_req_data(
2500 req, struct recovery_state);
2505 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
2507 TALLOC_FREE(subreq);
2512 ret2 = ctdb_client_control_multi_error(state->pnn_list,
2516 D_ERR("failed to update VNNMAP on node %u, ret=%d\n",
2519 D_ERR("failed to update VNNMAP, ret=%d\n", ret);
2521 tevent_req_error(req, ret);
2525 D_NOTICE("updated VNNMAP\n");
/* Step "Run database recovery". */
2527 subreq = db_recovery_send(state, state->ev, state->client,
2528 state->dbmap, state->tun_list,
2529 state->pnn_list, state->count,
2530 state->caps, state->ban_credits,
2531 state->vnnmap->generation);
2532 if (tevent_req_nomem(subreq, req)) {
2535 tevent_req_set_callback(subreq, recovery_db_recovery_done, req);
/*
 * Database recovery finished for all databases.
 *
 * Logs how many databases were recovered. Visible failure handling: if the
 * enable_bans tunable is 0 the request fails with EIO. Otherwise the active
 * node with the most ban credits is looked up and, when its credits reach
 * NUM_RETRIES, a SET_BAN_STATE control with the recovery_ban_period tunable
 * as ban time is sent and handled by recovery_failed_done(). A further
 * tevent_req_error(req, EIO) covers the remaining failure path.
 *
 * On success SET_RECMODE with CTDB_RECOVERY_NORMAL is sent to all active
 * nodes.
 */
2538 static void recovery_db_recovery_done(struct tevent_req *subreq)
2540 struct tevent_req *req = tevent_req_callback_data(
2541 subreq, struct tevent_req);
2542 struct recovery_state *state = tevent_req_data(
2543 req, struct recovery_state);
2544 struct ctdb_req_control request;
2548 status = db_recovery_recv(subreq, &count);
2549 TALLOC_FREE(subreq);
2551 D_ERR("%d of %d databases recovered\n", count, state->dbmap->num);
2554 uint32_t max_pnn = CTDB_UNKNOWN_PNN, max_credits = 0;
2557 /* Bans are not enabled */
2558 if (state->tun_list->enable_bans == 0) {
2559 tevent_req_error(req, EIO);
/* Find the active node that accumulated the most ban credits. */
2563 for (i=0; i<state->count; i++) {
2565 pnn = state->pnn_list[i];
2566 if (state->ban_credits[pnn] > max_credits) {
2568 max_credits = state->ban_credits[pnn];
2572 /* If pulling database fails multiple times */
2573 if (max_credits >= NUM_RETRIES) {
2574 struct ctdb_ban_state ban_state = {
2576 .time = state->tun_list->recovery_ban_period,
2579 D_ERR("Banning node %u for %u seconds\n",
2583 ctdb_req_control_set_ban_state(&request,
2585 subreq = ctdb_client_control_send(state,
2591 if (tevent_req_nomem(subreq, req)) {
2594 tevent_req_set_callback(subreq,
2595 recovery_failed_done,
2598 tevent_req_error(req, EIO);
/* Step "Set RECOVERY_NORMAL". */
2603 ctdb_req_control_set_recmode(&request, CTDB_RECOVERY_NORMAL);
2604 subreq = ctdb_client_control_multi_send(state, state->ev,
2606 state->pnn_list, state->count,
2607 TIMEOUT(), &request);
2608 if (tevent_req_nomem(subreq, req)) {
2611 tevent_req_set_callback(subreq, recovery_normal_done, req);
/*
 * The SET_BAN_STATE control issued after a failed database recovery
 * completed.
 *
 * Problems with the control or its reply are logged. The function ends by
 * failing the request with EIO; whether an earlier path returns before that
 * is not visible in this view.
 */
2614 static void recovery_failed_done(struct tevent_req *subreq)
2616 struct tevent_req *req = tevent_req_callback_data(
2617 subreq, struct tevent_req);
2618 struct recovery_state *state = tevent_req_data(
2619 req, struct recovery_state);
2620 struct ctdb_reply_control *reply;
2624 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2625 TALLOC_FREE(subreq);
2627 D_ERR("failed to ban node, ret=%d\n", ret);
2631 ret = ctdb_reply_control_set_ban_state(reply);
2633 D_ERR("control SET_BAN_STATE failed, ret=%d\n", ret);
2637 tevent_req_error(req, EIO);
/*
 * SET_RECMODE(NORMAL) completed on all active nodes.
 *
 * On the visible failure path the failing node (if identified) is logged
 * and the request is failed with ret. Otherwise END_RECOVERY is sent to all
 * active nodes.
 */
2640 static void recovery_normal_done(struct tevent_req *subreq)
2642 struct tevent_req *req = tevent_req_callback_data(
2643 subreq, struct tevent_req);
2644 struct recovery_state *state = tevent_req_data(
2645 req, struct recovery_state);
2646 struct ctdb_req_control request;
2651 status = ctdb_client_control_multi_recv(subreq, &ret, state, &err_list,
2653 TALLOC_FREE(subreq);
2658 ret2 = ctdb_client_control_multi_error(state->pnn_list,
2662 D_ERR("failed to set recovery mode NORMAL on node %u,"
2663 " ret=%d\n", pnn, ret2);
2665 D_ERR("failed to set recovery mode NORMAL, ret=%d\n",
2668 tevent_req_error(req, ret);
2672 D_ERR("Set recovery mode to NORMAL\n");
/* Step "Send END_RECOVERY". */
2674 ctdb_req_control_end_recovery(&request);
2675 subreq = ctdb_client_control_multi_send(state, state->ev,
2677 state->pnn_list, state->count,
2678 TIMEOUT(), &request);
2679 if (tevent_req_nomem(subreq, req)) {
2682 tevent_req_set_callback(subreq, recovery_end_recovery_done, req);
/*
 * END_RECOVERY completed on all active nodes; last step of the recovery.
 *
 * On the visible failure path the failing node (if identified) is logged
 * and the request is failed with ret. Otherwise the request is marked done.
 */
2685 static void recovery_end_recovery_done(struct tevent_req *subreq)
2687 struct tevent_req *req = tevent_req_callback_data(
2688 subreq, struct tevent_req);
2689 struct recovery_state *state = tevent_req_data(
2690 req, struct recovery_state);
2695 status = ctdb_client_control_multi_recv(subreq, &ret, state, &err_list,
2697 TALLOC_FREE(subreq);
2702 ret2 = ctdb_client_control_multi_error(state->pnn_list,
2706 D_ERR("failed to run recovered event on node %u,"
2707 " ret=%d\n", pnn, ret2);
2709 D_ERR("failed to run recovered event, ret=%d\n", ret);
2711 tevent_req_error(req, ret);
2715 D_ERR("recovered event finished\n");
2717 tevent_req_done(req);
/*
 * Collect the result of the recovery request. perr is handed to
 * generic_recv(); the boolean result of generic_recv() is discarded.
 */
2720 static void recovery_recv(struct tevent_req *req, int *perr)
2722 generic_recv(req, perr);
/* Print the expected command line to stderr. */
2725 static void usage(const char *progname)
2727 fprintf(stderr, "\nUsage: %s <output-fd> <ctdb-socket-path> <generation>\n",
2733 * Arguments - output (write) fd, ctdb socket path, generation
/*
 * Entry point of the recovery helper.
 *
 * Parses the write fd (argv[1]) and generation (argv[3]), sets up talloc,
 * logging, the tevent context and the ctdb client connection, runs the
 * recovery request to completion with tevent_req_poll(), and writes the
 * resulting int status to the write fd.
 *
 * NOTE(review): argv[1] is converted with atoi() and argv[3] with
 * strtoul() without any visible validation of the input.
 */
2735 int main(int argc, char *argv[])
2738 const char *sockpath;
2739 TALLOC_CTX *mem_ctx;
2740 struct tevent_context *ev;
2741 struct ctdb_client_context *client;
2743 struct tevent_req *req;
2744 uint32_t generation;
2751 write_fd = atoi(argv[1]);
2753 generation = (uint32_t)strtoul(argv[3], NULL, 0);
2755 mem_ctx = talloc_new(NULL);
2756 if (mem_ctx == NULL) {
2757 fprintf(stderr, "recovery: talloc_new() failed\n");
/* Until logging is initialised, errors go to stderr. */
2761 ret = logging_init(mem_ctx, NULL, NULL, "ctdb-recovery");
2763 fprintf(stderr, "recovery: Unable to initialize logging\n");
2767 ev = tevent_context_init(mem_ctx);
2769 D_ERR("tevent_context_init() failed\n");
2773 ret = ctdb_client_init(mem_ctx, ev, sockpath, &client);
2775 D_ERR("ctdb_client_init() failed, ret=%d\n", ret);
2779 req = recovery_send(mem_ctx, ev, client, generation);
/*
 * NOTE(review): the message names database_recover_send(), but the call
 * that failed is recovery_send().
 */
2781 D_ERR("database_recover_send() failed\n");
/* Run the event loop until the recovery request has finished. */
2785 if (! tevent_req_poll(req, ev)) {
2786 D_ERR("tevent_req_poll() failed\n");
2790 recovery_recv(req, &ret);
2793 D_ERR("database recovery failed, ret=%d\n", ret);
/*
 * Report the result to whoever holds the other end of write_fd.
 * NOTE(review): the return value of sys_write() is not checked.
 */
2797 sys_write(write_fd, &ret, sizeof(ret));
2801 TALLOC_FREE(mem_ctx);