ctdb-recovery: Ban a node that causes recovery failure
[samba.git] / ctdb / server / ctdb_recovery_helper.c
1 /*
2    ctdb parallel database recovery
3
4    Copyright (C) Amitay Isaacs  2015
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "replace.h"
21 #include "system/network.h"
22 #include "system/filesys.h"
23
24 #include <talloc.h>
25 #include <tevent.h>
26 #include <tdb.h>
27 #include <libgen.h>
28
29 #include "lib/tdb_wrap/tdb_wrap.h"
30 #include "lib/util/sys_rw.h"
31 #include "lib/util/time.h"
32 #include "lib/util/tevent_unix.h"
33
34 #include "protocol/protocol.h"
35 #include "protocol/protocol_api.h"
36 #include "client/client.h"
37
38 #include "common/logging.h"
39
/*
 * Value passed as the seconds argument of timeval_current_ofs() when
 * computing the deadline of every control sent by this helper.
 */
static int recover_timeout = 30;

/* NOTE(review): no user of NUM_RETRIES is visible in this part of the file */
#define NUM_RETRIES     3

/* Deadline for a control: "now" plus recover_timeout */
#define TIMEOUT()       timeval_current_ofs(recover_timeout, 0)
45
46 /*
47  * Utility functions
48  */
49
/*
 * Shared "recv" helper for the tevent requests in this file.
 *
 * Returns true when req finished without error.  Otherwise returns false
 * and, if perr is not NULL, stores the request's unix error in *perr.
 */
static bool generic_recv(struct tevent_req *req, int *perr)
{
        int unix_err;

        if (! tevent_req_is_unix_error(req, &unix_err)) {
                return true;
        }

        if (perr != NULL) {
                *perr = unix_err;
        }
        return false;
}
63
64 static uint64_t rec_srvid = CTDB_SRVID_RECOVERY;
65
66 static uint64_t srvid_next(void)
67 {
68         rec_srvid += 1;
69         return rec_srvid;
70 }
71
72 /*
73  * Recovery database functions
74  */
75
/* Temporary tdb into which the records pulled for one database are merged */
struct recdb_context {
        uint32_t db_id;         /* id of the database being recovered */
        const char *db_name;    /* borrowed from the caller, not copied */
        const char *db_path;    /* path of the recovery tdb file itself */
        struct tdb_wrap *db;    /* open handle on the recovery tdb */
        bool persistent;        /* persistent databases keep their headers as-is */
};
83
84 static struct recdb_context *recdb_create(TALLOC_CTX *mem_ctx, uint32_t db_id,
85                                           const char *db_name,
86                                           const char *db_path,
87                                           uint32_t hash_size, bool persistent)
88 {
89         static char *db_dir_state = NULL;
90         struct recdb_context *recdb;
91         unsigned int tdb_flags;
92
93         recdb = talloc(mem_ctx, struct recdb_context);
94         if (recdb == NULL) {
95                 return NULL;
96         }
97
98         if (db_dir_state == NULL) {
99                 db_dir_state = getenv("CTDB_DBDIR_STATE");
100         }
101
102         recdb->db_name = db_name;
103         recdb->db_id = db_id;
104         recdb->db_path = talloc_asprintf(recdb, "%s/recdb.%s",
105                                          db_dir_state != NULL ?
106                                             db_dir_state :
107                                             dirname(discard_const(db_path)),
108                                          db_name);
109         if (recdb->db_path == NULL) {
110                 talloc_free(recdb);
111                 return NULL;
112         }
113         unlink(recdb->db_path);
114
115         tdb_flags = TDB_NOLOCK | TDB_INCOMPATIBLE_HASH | TDB_DISALLOW_NESTING;
116         recdb->db = tdb_wrap_open(mem_ctx, recdb->db_path, hash_size,
117                                   tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
118         if (recdb->db == NULL) {
119                 talloc_free(recdb);
120                 D_ERR("failed to create recovery db %s\n", recdb->db_path);
121                 return NULL;
122         }
123
124         recdb->persistent = persistent;
125
126         return recdb;
127 }
128
129 static uint32_t recdb_id(struct recdb_context *recdb)
130 {
131         return recdb->db_id;
132 }
133
134 static const char *recdb_name(struct recdb_context *recdb)
135 {
136         return recdb->db_name;
137 }
138
139 static const char *recdb_path(struct recdb_context *recdb)
140 {
141         return recdb->db_path;
142 }
143
144 static struct tdb_context *recdb_tdb(struct recdb_context *recdb)
145 {
146         return recdb->db->tdb;
147 }
148
149 static bool recdb_persistent(struct recdb_context *recdb)
150 {
151         return recdb->persistent;
152 }
153
/* Private data handed to recdb_add_traverse() */
struct recdb_add_traverse_state {
        struct recdb_context *recdb;    /* recovery db receiving the records */
        int mypnn;                      /* compared against the stored dmaster */
};
158
159 static int recdb_add_traverse(uint32_t reqid, struct ctdb_ltdb_header *header,
160                               TDB_DATA key, TDB_DATA data,
161                               void *private_data)
162 {
163         struct recdb_add_traverse_state *state =
164                 (struct recdb_add_traverse_state *)private_data;
165         struct ctdb_ltdb_header *hdr;
166         TDB_DATA prev_data;
167         int ret;
168
169         /* header is not marshalled separately in the pulldb control */
170         if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
171                 return -1;
172         }
173
174         hdr = (struct ctdb_ltdb_header *)data.dptr;
175
176         /* fetch the existing record, if any */
177         prev_data = tdb_fetch(recdb_tdb(state->recdb), key);
178
179         if (prev_data.dptr != NULL) {
180                 struct ctdb_ltdb_header prev_hdr;
181
182                 prev_hdr = *(struct ctdb_ltdb_header *)prev_data.dptr;
183                 free(prev_data.dptr);
184                 if (hdr->rsn < prev_hdr.rsn ||
185                     (hdr->rsn == prev_hdr.rsn &&
186                      prev_hdr.dmaster != state->mypnn)) {
187                         return 0;
188                 }
189         }
190
191         ret = tdb_store(recdb_tdb(state->recdb), key, data, TDB_REPLACE);
192         if (ret != 0) {
193                 return -1;
194         }
195         return 0;
196 }
197
198 static bool recdb_add(struct recdb_context *recdb, int mypnn,
199                       struct ctdb_rec_buffer *recbuf)
200 {
201         struct recdb_add_traverse_state state;
202         int ret;
203
204         state.recdb = recdb;
205         state.mypnn = mypnn;
206
207         ret = ctdb_rec_buffer_traverse(recbuf, recdb_add_traverse, &state);
208         if (ret != 0) {
209                 return false;
210         }
211
212         return true;
213 }
214
215 /* This function decides which records from recdb are retained */
216 static int recbuf_filter_add(struct ctdb_rec_buffer *recbuf, bool persistent,
217                              uint32_t reqid, uint32_t dmaster,
218                              TDB_DATA key, TDB_DATA data)
219 {
220         struct ctdb_ltdb_header *header;
221         int ret;
222
223         /* Skip empty records */
224         if (data.dsize <= sizeof(struct ctdb_ltdb_header)) {
225                 return 0;
226         }
227
228         /* update the dmaster field to point to us */
229         header = (struct ctdb_ltdb_header *)data.dptr;
230         if (!persistent) {
231                 header->dmaster = dmaster;
232                 header->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
233         }
234
235         ret = ctdb_rec_buffer_add(recbuf, recbuf, reqid, NULL, key, data);
236         if (ret != 0) {
237                 return ret;
238         }
239
240         return 0;
241 }
242
/* Private data handed to recdb_records_traverse() */
struct recdb_records_traverse_state {
        struct ctdb_rec_buffer *recbuf; /* buffer being filled */
        uint32_t dmaster;               /* passed on to recbuf_filter_add() */
        uint32_t reqid;                 /* passed on to recbuf_filter_add() */
        bool persistent;                /* passed on to recbuf_filter_add() */
        bool failed;                    /* set when adding a record failed */
};
250
251 static int recdb_records_traverse(struct tdb_context *tdb,
252                                   TDB_DATA key, TDB_DATA data,
253                                   void *private_data)
254 {
255         struct recdb_records_traverse_state *state =
256                 (struct recdb_records_traverse_state *)private_data;
257         int ret;
258
259         ret = recbuf_filter_add(state->recbuf, state->persistent,
260                                 state->reqid, state->dmaster, key, data);
261         if (ret != 0) {
262                 state->failed = true;
263                 return ret;
264         }
265
266         return 0;
267 }
268
269 static struct ctdb_rec_buffer *recdb_records(struct recdb_context *recdb,
270                                              TALLOC_CTX *mem_ctx,
271                                              uint32_t dmaster)
272 {
273         struct recdb_records_traverse_state state;
274         int ret;
275
276         state.recbuf = ctdb_rec_buffer_init(mem_ctx, recdb_id(recdb));
277         if (state.recbuf == NULL) {
278                 return NULL;
279         }
280         state.dmaster = dmaster;
281         state.reqid = 0;
282         state.persistent = recdb_persistent(recdb);
283         state.failed = false;
284
285         ret = tdb_traverse_read(recdb_tdb(recdb), recdb_records_traverse,
286                                 &state);
287         if (ret == -1 || state.failed) {
288                 D_ERR("Failed to marshall recovery records for %s\n",
289                       recdb_name(recdb));
290                 TALLOC_FREE(state.recbuf);
291                 return NULL;
292         }
293
294         return state.recbuf;
295 }
296
297 struct recdb_file_traverse_state {
298         struct ctdb_rec_buffer *recbuf;
299         struct recdb_context *recdb;
300         TALLOC_CTX *mem_ctx;
301         uint32_t dmaster;
302         uint32_t reqid;
303         bool persistent;
304         bool failed;
305         int fd;
306         int max_size;
307         int num_buffers;
308 };
309
310 static int recdb_file_traverse(struct tdb_context *tdb,
311                                TDB_DATA key, TDB_DATA data,
312                                void *private_data)
313 {
314         struct recdb_file_traverse_state *state =
315                 (struct recdb_file_traverse_state *)private_data;
316         int ret;
317
318         ret = recbuf_filter_add(state->recbuf, state->persistent,
319                                 state->reqid, state->dmaster, key, data);
320         if (ret != 0) {
321                 state->failed = true;
322                 return ret;
323         }
324
325         if (ctdb_rec_buffer_len(state->recbuf) > state->max_size) {
326                 ret = ctdb_rec_buffer_write(state->recbuf, state->fd);
327                 if (ret != 0) {
328                         D_ERR("Failed to collect recovery records for %s\n",
329                               recdb_name(state->recdb));
330                         state->failed = true;
331                         return ret;
332                 }
333
334                 state->num_buffers += 1;
335
336                 TALLOC_FREE(state->recbuf);
337                 state->recbuf = ctdb_rec_buffer_init(state->mem_ctx,
338                                                      recdb_id(state->recdb));
339                 if (state->recbuf == NULL) {
340                         state->failed = true;
341                         return ENOMEM;
342                 }
343         }
344
345         return 0;
346 }
347
348 static int recdb_file(struct recdb_context *recdb, TALLOC_CTX *mem_ctx,
349                       uint32_t dmaster, int fd, int max_size)
350 {
351         struct recdb_file_traverse_state state;
352         int ret;
353
354         state.recbuf = ctdb_rec_buffer_init(mem_ctx, recdb_id(recdb));
355         if (state.recbuf == NULL) {
356                 return -1;
357         }
358         state.recdb = recdb;
359         state.mem_ctx = mem_ctx;
360         state.dmaster = dmaster;
361         state.reqid = 0;
362         state.persistent = recdb_persistent(recdb);
363         state.failed = false;
364         state.fd = fd;
365         state.max_size = max_size;
366         state.num_buffers = 0;
367
368         ret = tdb_traverse_read(recdb_tdb(recdb), recdb_file_traverse, &state);
369         if (ret == -1 || state.failed) {
370                 TALLOC_FREE(state.recbuf);
371                 return -1;
372         }
373
374         ret = ctdb_rec_buffer_write(state.recbuf, fd);
375         if (ret != 0) {
376                 D_ERR("Failed to collect recovery records for %s\n",
377                       recdb_name(recdb));
378                 TALLOC_FREE(state.recbuf);
379                 return -1;
380         }
381         state.num_buffers += 1;
382
383         D_DEBUG("Wrote %d buffers of recovery records for %s\n",
384                 state.num_buffers, recdb_name(recdb));
385
386         return state.num_buffers;
387 }
388
389 /*
390  * Pull database from a single node
391  */
392
393 struct pull_database_state {
394         struct tevent_context *ev;
395         struct ctdb_client_context *client;
396         struct recdb_context *recdb;
397         uint32_t pnn;
398         uint64_t srvid;
399         int num_records;
400         int result;
401 };
402
403 static void pull_database_handler(uint64_t srvid, TDB_DATA data,
404                                   void *private_data);
405 static void pull_database_register_done(struct tevent_req *subreq);
406 static void pull_database_old_done(struct tevent_req *subreq);
407 static void pull_database_unregister_done(struct tevent_req *subreq);
408 static void pull_database_new_done(struct tevent_req *subreq);
409
410 static struct tevent_req *pull_database_send(
411                         TALLOC_CTX *mem_ctx,
412                         struct tevent_context *ev,
413                         struct ctdb_client_context *client,
414                         uint32_t pnn, uint32_t caps,
415                         struct recdb_context *recdb)
416 {
417         struct tevent_req *req, *subreq;
418         struct pull_database_state *state;
419         struct ctdb_req_control request;
420
421         req = tevent_req_create(mem_ctx, &state, struct pull_database_state);
422         if (req == NULL) {
423                 return NULL;
424         }
425
426         state->ev = ev;
427         state->client = client;
428         state->recdb = recdb;
429         state->pnn = pnn;
430         state->srvid = srvid_next();
431
432         if (caps & CTDB_CAP_FRAGMENTED_CONTROLS) {
433                 subreq = ctdb_client_set_message_handler_send(
434                                         state, state->ev, state->client,
435                                         state->srvid, pull_database_handler,
436                                         req);
437                 if (tevent_req_nomem(subreq, req)) {
438                         return tevent_req_post(req, ev);
439                 }
440
441                 tevent_req_set_callback(subreq, pull_database_register_done,
442                                         req);
443
444         } else {
445                 struct ctdb_pulldb pulldb;
446
447                 pulldb.db_id = recdb_id(recdb);
448                 pulldb.lmaster = CTDB_LMASTER_ANY;
449
450                 ctdb_req_control_pull_db(&request, &pulldb);
451                 subreq = ctdb_client_control_send(state, state->ev,
452                                                   state->client,
453                                                   pnn, TIMEOUT(),
454                                                   &request);
455                 if (tevent_req_nomem(subreq, req)) {
456                         return tevent_req_post(req, ev);
457                 }
458                 tevent_req_set_callback(subreq, pull_database_old_done, req);
459         }
460
461         return req;
462 }
463
464 static void pull_database_handler(uint64_t srvid, TDB_DATA data,
465                                   void *private_data)
466 {
467         struct tevent_req *req = talloc_get_type_abort(
468                 private_data, struct tevent_req);
469         struct pull_database_state *state = tevent_req_data(
470                 req, struct pull_database_state);
471         struct ctdb_rec_buffer *recbuf;
472         size_t np;
473         int ret;
474         bool status;
475
476         if (srvid != state->srvid) {
477                 return;
478         }
479
480         ret = ctdb_rec_buffer_pull(data.dptr, data.dsize, state, &recbuf, &np);
481         if (ret != 0) {
482                 D_ERR("Invalid data received for DB_PULL messages\n");
483                 return;
484         }
485
486         if (recbuf->db_id != recdb_id(state->recdb)) {
487                 talloc_free(recbuf);
488                 D_ERR("Invalid dbid:%08x for DB_PULL messages for %s\n",
489                       recbuf->db_id, recdb_name(state->recdb));
490                 return;
491         }
492
493         status = recdb_add(state->recdb, ctdb_client_pnn(state->client),
494                            recbuf);
495         if (! status) {
496                 talloc_free(recbuf);
497                 D_ERR("Failed to add records to recdb for %s\n",
498                       recdb_name(state->recdb));
499                 return;
500         }
501
502         state->num_records += recbuf->count;
503         talloc_free(recbuf);
504 }
505
506 static void pull_database_register_done(struct tevent_req *subreq)
507 {
508         struct tevent_req *req = tevent_req_callback_data(
509                 subreq, struct tevent_req);
510         struct pull_database_state *state = tevent_req_data(
511                 req, struct pull_database_state);
512         struct ctdb_req_control request;
513         struct ctdb_pulldb_ext pulldb_ext;
514         int ret;
515         bool status;
516
517         status = ctdb_client_set_message_handler_recv(subreq, &ret);
518         TALLOC_FREE(subreq);
519         if (! status) {
520                 D_ERR("Failed to set message handler for DB_PULL for %s\n",
521                       recdb_name(state->recdb));
522                 tevent_req_error(req, ret);
523                 return;
524         }
525
526         pulldb_ext.db_id = recdb_id(state->recdb);
527         pulldb_ext.lmaster = CTDB_LMASTER_ANY;
528         pulldb_ext.srvid = state->srvid;
529
530         ctdb_req_control_db_pull(&request, &pulldb_ext);
531         subreq = ctdb_client_control_send(state, state->ev, state->client,
532                                           state->pnn, TIMEOUT(), &request);
533         if (tevent_req_nomem(subreq, req)) {
534                 return;
535         }
536         tevent_req_set_callback(subreq, pull_database_new_done, req);
537 }
538
539 static void pull_database_old_done(struct tevent_req *subreq)
540 {
541         struct tevent_req *req = tevent_req_callback_data(
542                 subreq, struct tevent_req);
543         struct pull_database_state *state = tevent_req_data(
544                 req, struct pull_database_state);
545         struct ctdb_reply_control *reply;
546         struct ctdb_rec_buffer *recbuf;
547         int ret;
548         bool status;
549
550         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
551         TALLOC_FREE(subreq);
552         if (! status) {
553                 D_ERR("control PULL_DB failed for %s on node %u, ret=%d\n",
554                       recdb_name(state->recdb), state->pnn, ret);
555                 tevent_req_error(req, ret);
556                 return;
557         }
558
559         ret = ctdb_reply_control_pull_db(reply, state, &recbuf);
560         talloc_free(reply);
561         if (ret != 0) {
562                 tevent_req_error(req, ret);
563                 return;
564         }
565
566         status = recdb_add(state->recdb, ctdb_client_pnn(state->client),
567                            recbuf);
568         if (! status) {
569                 talloc_free(recbuf);
570                 tevent_req_error(req, EIO);
571                 return;
572         }
573
574         state->num_records = recbuf->count;
575         talloc_free(recbuf);
576
577         D_INFO("Pulled %d records for db %s from node %d\n",
578                state->num_records, recdb_name(state->recdb), state->pnn);
579
580         tevent_req_done(req);
581 }
582
583 static void pull_database_new_done(struct tevent_req *subreq)
584 {
585         struct tevent_req *req = tevent_req_callback_data(
586                 subreq, struct tevent_req);
587         struct pull_database_state *state = tevent_req_data(
588                 req, struct pull_database_state);
589         struct ctdb_reply_control *reply;
590         uint32_t num_records;
591         int ret;
592         bool status;
593
594         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
595         TALLOC_FREE(subreq);
596         if (! status) {
597                 D_ERR("control DB_PULL failed for %s on node %u, ret=%d\n",
598                       recdb_name(state->recdb), state->pnn, ret);
599                 state->result = ret;
600                 goto unregister;
601         }
602
603         ret = ctdb_reply_control_db_pull(reply, &num_records);
604         talloc_free(reply);
605         if (num_records != state->num_records) {
606                 D_ERR("mismatch (%u != %u) in DB_PULL records for db %s\n",
607                       num_records, state->num_records,
608                       recdb_name(state->recdb));
609                 state->result = EIO;
610                 goto unregister;
611         }
612
613         D_INFO("Pulled %d records for db %s from node %d\n",
614                state->num_records, recdb_name(state->recdb), state->pnn);
615
616 unregister:
617
618         subreq = ctdb_client_remove_message_handler_send(
619                                         state, state->ev, state->client,
620                                         state->srvid, req);
621         if (tevent_req_nomem(subreq, req)) {
622                 return;
623         }
624         tevent_req_set_callback(subreq, pull_database_unregister_done, req);
625 }
626
627 static void pull_database_unregister_done(struct tevent_req *subreq)
628 {
629         struct tevent_req *req = tevent_req_callback_data(
630                 subreq, struct tevent_req);
631         struct pull_database_state *state = tevent_req_data(
632                 req, struct pull_database_state);
633         int ret;
634         bool status;
635
636         status = ctdb_client_remove_message_handler_recv(subreq, &ret);
637         TALLOC_FREE(subreq);
638         if (! status) {
639                 D_ERR("failed to remove message handler for DB_PULL for db %s\n",
640                       recdb_name(state->recdb));
641                 tevent_req_error(req, ret);
642                 return;
643         }
644
645         if (state->result != 0) {
646                 tevent_req_error(req, state->result);
647                 return;
648         }
649
650         tevent_req_done(req);
651 }
652
/* Collect the result of pull_database_send(); see generic_recv() */
static bool pull_database_recv(struct tevent_req *req, int *perr)
{
        bool ok = generic_recv(req, perr);

        return ok;
}
657
658 /*
659  * Push database to specified nodes (old style)
660  */
661
/* State of one push_database_old_send() request */
struct push_database_old_state {
        struct tevent_context *ev;
        struct ctdb_client_context *client;
        struct recdb_context *recdb;    /* source of the records */
        uint32_t *pnn_list;             /* nodes to push to, one at a time */
        int count;                      /* number of entries in pnn_list */
        struct ctdb_rec_buffer *recbuf; /* all records, marshalled once */
        int index;                      /* pnn_list entry being pushed to */
};

/* Completion callback, defined below */
static void push_database_old_push_done(struct tevent_req *subreq);
673
674 static struct tevent_req *push_database_old_send(
675                         TALLOC_CTX *mem_ctx,
676                         struct tevent_context *ev,
677                         struct ctdb_client_context *client,
678                         uint32_t *pnn_list, int count,
679                         struct recdb_context *recdb)
680 {
681         struct tevent_req *req, *subreq;
682         struct push_database_old_state *state;
683         struct ctdb_req_control request;
684         uint32_t pnn;
685
686         req = tevent_req_create(mem_ctx, &state,
687                                 struct push_database_old_state);
688         if (req == NULL) {
689                 return NULL;
690         }
691
692         state->ev = ev;
693         state->client = client;
694         state->recdb = recdb;
695         state->pnn_list = pnn_list;
696         state->count = count;
697         state->index = 0;
698
699         state->recbuf = recdb_records(recdb, state,
700                                       ctdb_client_pnn(client));
701         if (tevent_req_nomem(state->recbuf, req)) {
702                 return tevent_req_post(req, ev);
703         }
704
705         pnn = state->pnn_list[state->index];
706
707         ctdb_req_control_push_db(&request, state->recbuf);
708         subreq = ctdb_client_control_send(state, ev, client, pnn,
709                                           TIMEOUT(), &request);
710         if (tevent_req_nomem(subreq, req)) {
711                 return tevent_req_post(req, ev);
712         }
713         tevent_req_set_callback(subreq, push_database_old_push_done, req);
714
715         return req;
716 }
717
718 static void push_database_old_push_done(struct tevent_req *subreq)
719 {
720         struct tevent_req *req = tevent_req_callback_data(
721                 subreq, struct tevent_req);
722         struct push_database_old_state *state = tevent_req_data(
723                 req, struct push_database_old_state);
724         struct ctdb_req_control request;
725         uint32_t pnn;
726         int ret;
727         bool status;
728
729         status = ctdb_client_control_recv(subreq, &ret, NULL, NULL);
730         TALLOC_FREE(subreq);
731         if (! status) {
732                 D_ERR("control PUSH_DB failed for db %s on node %u, ret=%d\n",
733                       recdb_name(state->recdb), state->pnn_list[state->index],
734                       ret);
735                 tevent_req_error(req, ret);
736                 return;
737         }
738
739         state->index += 1;
740         if (state->index == state->count) {
741                 TALLOC_FREE(state->recbuf);
742                 tevent_req_done(req);
743                 return;
744         }
745
746         pnn = state->pnn_list[state->index];
747
748         ctdb_req_control_push_db(&request, state->recbuf);
749         subreq = ctdb_client_control_send(state, state->ev, state->client,
750                                           pnn, TIMEOUT(), &request);
751         if (tevent_req_nomem(subreq, req)) {
752                 return;
753         }
754         tevent_req_set_callback(subreq, push_database_old_push_done, req);
755 }
756
/* Collect the result of push_database_old_send(); see generic_recv() */
static bool push_database_old_recv(struct tevent_req *req, int *perr)
{
        bool ok = generic_recv(req, perr);

        return ok;
}
761
762 /*
763  * Push database to specified nodes (new style)
764  */
765
/* State of one push_database_new_send() request */
struct push_database_new_state {
        struct tevent_context *ev;
        struct ctdb_client_context *client;
        struct recdb_context *recdb;    /* source of the records */
        uint32_t *pnn_list;             /* nodes to push to */
        int count;                      /* number of entries in pnn_list */
        uint64_t srvid;                 /* srvid the record messages go to */
        uint32_t dmaster;               /* pnn of the local node */
        int fd;                         /* unlinked file holding the buffers */
        int num_buffers;                /* buffers written to fd */
        int num_buffers_sent;           /* buffers sent so far */
        int num_records;                /* starts at 0; NOTE(review): updated
                                         * outside the code visible here */
};

/* Steps of the new-style push, defined below */
static void push_database_new_started(struct tevent_req *subreq);
static void push_database_new_send_msg(struct tevent_req *req);
static void push_database_new_send_done(struct tevent_req *subreq);
static void push_database_new_confirmed(struct tevent_req *subreq);
784
785 static struct tevent_req *push_database_new_send(
786                         TALLOC_CTX *mem_ctx,
787                         struct tevent_context *ev,
788                         struct ctdb_client_context *client,
789                         uint32_t *pnn_list, int count,
790                         struct recdb_context *recdb,
791                         int max_size)
792 {
793         struct tevent_req *req, *subreq;
794         struct push_database_new_state *state;
795         struct ctdb_req_control request;
796         struct ctdb_pulldb_ext pulldb_ext;
797         char *filename;
798         off_t offset;
799
800         req = tevent_req_create(mem_ctx, &state,
801                                 struct push_database_new_state);
802         if (req == NULL) {
803                 return NULL;
804         }
805
806         state->ev = ev;
807         state->client = client;
808         state->recdb = recdb;
809         state->pnn_list = pnn_list;
810         state->count = count;
811
812         state->srvid = srvid_next();
813         state->dmaster = ctdb_client_pnn(client);
814         state->num_buffers_sent = 0;
815         state->num_records = 0;
816
817         filename = talloc_asprintf(state, "%s.dat", recdb_path(recdb));
818         if (tevent_req_nomem(filename, req)) {
819                 return tevent_req_post(req, ev);
820         }
821
822         state->fd = open(filename, O_RDWR|O_CREAT, 0644);
823         if (state->fd == -1) {
824                 tevent_req_error(req, errno);
825                 return tevent_req_post(req, ev);
826         }
827         unlink(filename);
828         talloc_free(filename);
829
830         state->num_buffers = recdb_file(recdb, state, state->dmaster,
831                                         state->fd, max_size);
832         if (state->num_buffers == -1) {
833                 tevent_req_error(req, ENOMEM);
834                 return tevent_req_post(req, ev);
835         }
836
837         offset = lseek(state->fd, 0, SEEK_SET);
838         if (offset != 0) {
839                 tevent_req_error(req, EIO);
840                 return tevent_req_post(req, ev);
841         }
842
843         pulldb_ext.db_id = recdb_id(recdb);
844         pulldb_ext.srvid = state->srvid;
845
846         ctdb_req_control_db_push_start(&request, &pulldb_ext);
847         subreq = ctdb_client_control_multi_send(state, ev, client,
848                                                 pnn_list, count,
849                                                 TIMEOUT(), &request);
850         if (tevent_req_nomem(subreq, req)) {
851                 return tevent_req_post(req, ev);
852         }
853         tevent_req_set_callback(subreq, push_database_new_started, req);
854
855         return req;
856 }
857
858 static void push_database_new_started(struct tevent_req *subreq)
859 {
860         struct tevent_req *req = tevent_req_callback_data(
861                 subreq, struct tevent_req);
862         struct push_database_new_state *state = tevent_req_data(
863                 req, struct push_database_new_state);
864         int *err_list;
865         int ret;
866         bool status;
867
868         status = ctdb_client_control_multi_recv(subreq, &ret, state,
869                                                 &err_list, NULL);
870         TALLOC_FREE(subreq);
871         if (! status) {
872                 int ret2;
873                 uint32_t pnn;
874
875                 ret2 = ctdb_client_control_multi_error(state->pnn_list,
876                                                        state->count,
877                                                        err_list, &pnn);
878                 if (ret2 != 0) {
879                         D_ERR("control DB_PUSH_START failed for db %s"
880                               " on node %u, ret=%d\n",
881                               recdb_name(state->recdb), pnn, ret2);
882                 } else {
883                         D_ERR("control DB_PUSH_START failed for db %s,"
884                               " ret=%d\n",
885                               recdb_name(state->recdb), ret);
886                 }
887                 talloc_free(err_list);
888
889                 tevent_req_error(req, ret);
890                 return;
891         }
892
893         push_database_new_send_msg(req);
894 }
895
896 static void push_database_new_send_msg(struct tevent_req *req)
897 {
898         struct push_database_new_state *state = tevent_req_data(
899                 req, struct push_database_new_state);
900         struct tevent_req *subreq;
901         struct ctdb_rec_buffer *recbuf;
902         struct ctdb_req_message message;
903         TDB_DATA data;
904         size_t np;
905         int ret;
906
907         if (state->num_buffers_sent == state->num_buffers) {
908                 struct ctdb_req_control request;
909
910                 ctdb_req_control_db_push_confirm(&request,
911                                                  recdb_id(state->recdb));
912                 subreq = ctdb_client_control_multi_send(state, state->ev,
913                                                         state->client,
914                                                         state->pnn_list,
915                                                         state->count,
916                                                         TIMEOUT(), &request);
917                 if (tevent_req_nomem(subreq, req)) {
918                         return;
919                 }
920                 tevent_req_set_callback(subreq, push_database_new_confirmed,
921                                         req);
922                 return;
923         }
924
925         ret = ctdb_rec_buffer_read(state->fd, state, &recbuf);
926         if (ret != 0) {
927                 tevent_req_error(req, ret);
928                 return;
929         }
930
931         data.dsize = ctdb_rec_buffer_len(recbuf);
932         data.dptr = talloc_size(state, data.dsize);
933         if (tevent_req_nomem(data.dptr, req)) {
934                 return;
935         }
936
937         ctdb_rec_buffer_push(recbuf, data.dptr, &np);
938
939         message.srvid = state->srvid;
940         message.data.data = data;
941
942         D_DEBUG("Pushing buffer %d with %d records for db %s\n",
943                 state->num_buffers_sent, recbuf->count,
944                 recdb_name(state->recdb));
945
946         subreq = ctdb_client_message_multi_send(state, state->ev,
947                                                 state->client,
948                                                 state->pnn_list, state->count,
949                                                 &message);
950         if (tevent_req_nomem(subreq, req)) {
951                 return;
952         }
953         tevent_req_set_callback(subreq, push_database_new_send_done, req);
954
955         state->num_records += recbuf->count;
956
957         talloc_free(data.dptr);
958         talloc_free(recbuf);
959 }
960
961 static void push_database_new_send_done(struct tevent_req *subreq)
962 {
963         struct tevent_req *req = tevent_req_callback_data(
964                 subreq, struct tevent_req);
965         struct push_database_new_state *state = tevent_req_data(
966                 req, struct push_database_new_state);
967         bool status;
968         int ret;
969
970         status = ctdb_client_message_multi_recv(subreq, &ret, NULL, NULL);
971         TALLOC_FREE(subreq);
972         if (! status) {
973                 D_ERR("Sending recovery records failed for %s\n",
974                       recdb_name(state->recdb));
975                 tevent_req_error(req, ret);
976                 return;
977         }
978
979         state->num_buffers_sent += 1;
980
981         push_database_new_send_msg(req);
982 }
983
984 static void push_database_new_confirmed(struct tevent_req *subreq)
985 {
986         struct tevent_req *req = tevent_req_callback_data(
987                 subreq, struct tevent_req);
988         struct push_database_new_state *state = tevent_req_data(
989                 req, struct push_database_new_state);
990         struct ctdb_reply_control **reply;
991         int *err_list;
992         bool status;
993         int ret, i;
994         uint32_t num_records;
995
996         status = ctdb_client_control_multi_recv(subreq, &ret, state,
997                                                 &err_list, &reply);
998         TALLOC_FREE(subreq);
999         if (! status) {
1000                 int ret2;
1001                 uint32_t pnn;
1002
1003                 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1004                                                        state->count, err_list,
1005                                                        &pnn);
1006                 if (ret2 != 0) {
1007                         D_ERR("control DB_PUSH_CONFIRM failed for db %s"
1008                               " on node %u, ret=%d\n",
1009                               recdb_name(state->recdb), pnn, ret2);
1010                 } else {
1011                         D_ERR("control DB_PUSH_CONFIRM failed for db %s,"
1012                               " ret=%d\n",
1013                               recdb_name(state->recdb), ret);
1014                 }
1015                 tevent_req_error(req, ret);
1016                 return;
1017         }
1018
1019         for (i=0; i<state->count; i++) {
1020                 ret = ctdb_reply_control_db_push_confirm(reply[i],
1021                                                          &num_records);
1022                 if (ret != 0) {
1023                         tevent_req_error(req, EPROTO);
1024                         return;
1025                 }
1026
1027                 if (num_records != state->num_records) {
1028                         D_ERR("Node %u received %d of %d records for %s\n",
1029                               state->pnn_list[i], num_records,
1030                               state->num_records, recdb_name(state->recdb));
1031                         tevent_req_error(req, EPROTO);
1032                         return;
1033                 }
1034         }
1035
1036         talloc_free(reply);
1037
1038         D_INFO("Pushed %d records for db %s\n",
1039                state->num_records, recdb_name(state->recdb));
1040
1041         tevent_req_done(req);
1042 }
1043
1044 static bool push_database_new_recv(struct tevent_req *req, int *perr)
1045 {
1046         return generic_recv(req, perr);
1047 }
1048
1049 /*
1050  * wrapper for push_database_old and push_database_new
1051  */
1052
1053 struct push_database_state {
1054         bool old_done, new_done;
1055 };
1056
1057 static void push_database_old_done(struct tevent_req *subreq);
1058 static void push_database_new_done(struct tevent_req *subreq);
1059
1060 static struct tevent_req *push_database_send(
1061                         TALLOC_CTX *mem_ctx,
1062                         struct tevent_context *ev,
1063                         struct ctdb_client_context *client,
1064                         uint32_t *pnn_list, int count, uint32_t *caps,
1065                         struct ctdb_tunable_list *tun_list,
1066                         struct recdb_context *recdb)
1067 {
1068         struct tevent_req *req, *subreq;
1069         struct push_database_state *state;
1070         uint32_t *old_list, *new_list;
1071         unsigned int old_count, new_count;
1072         int i;
1073
1074         req = tevent_req_create(mem_ctx, &state, struct push_database_state);
1075         if (req == NULL) {
1076                 return NULL;
1077         }
1078
1079         state->old_done = false;
1080         state->new_done = false;
1081
1082         old_count = 0;
1083         new_count = 0;
1084         old_list = talloc_array(state, uint32_t, count);
1085         new_list = talloc_array(state, uint32_t, count);
1086         if (tevent_req_nomem(old_list, req) ||
1087             tevent_req_nomem(new_list,req)) {
1088                 return tevent_req_post(req, ev);
1089         }
1090
1091         for (i=0; i<count; i++) {
1092                 uint32_t pnn = pnn_list[i];
1093
1094                 if (caps[pnn] & CTDB_CAP_FRAGMENTED_CONTROLS) {
1095                         new_list[new_count] = pnn;
1096                         new_count += 1;
1097                 } else {
1098                         old_list[old_count] = pnn;
1099                         old_count += 1;
1100                 }
1101         }
1102
1103         if (old_count > 0) {
1104                 subreq = push_database_old_send(state, ev, client,
1105                                                 old_list, old_count, recdb);
1106                 if (tevent_req_nomem(subreq, req)) {
1107                         return tevent_req_post(req, ev);
1108                 }
1109                 tevent_req_set_callback(subreq, push_database_old_done, req);
1110         } else {
1111                 state->old_done = true;
1112         }
1113
1114         if (new_count > 0) {
1115                 subreq = push_database_new_send(state, ev, client,
1116                                                 new_list, new_count, recdb,
1117                                                 tun_list->rec_buffer_size_limit);
1118                 if (tevent_req_nomem(subreq, req)) {
1119                         return tevent_req_post(req, ev);
1120                 }
1121                 tevent_req_set_callback(subreq, push_database_new_done, req);
1122         } else {
1123                 state->new_done = true;
1124         }
1125
1126         return req;
1127 }
1128
1129 static void push_database_old_done(struct tevent_req *subreq)
1130 {
1131         struct tevent_req *req = tevent_req_callback_data(
1132                 subreq, struct tevent_req);
1133         struct push_database_state *state = tevent_req_data(
1134                 req, struct push_database_state);
1135         bool status;
1136         int ret;
1137
1138         status = push_database_old_recv(subreq, &ret);
1139         if (! status) {
1140                 tevent_req_error(req, ret);
1141                 return;
1142         }
1143
1144         state->old_done = true;
1145
1146         if (state->old_done && state->new_done) {
1147                 tevent_req_done(req);
1148         }
1149 }
1150
1151 static void push_database_new_done(struct tevent_req *subreq)
1152 {
1153         struct tevent_req *req = tevent_req_callback_data(
1154                 subreq, struct tevent_req);
1155         struct push_database_state *state = tevent_req_data(
1156                 req, struct push_database_state);
1157         bool status;
1158         int ret;
1159
1160         status = push_database_new_recv(subreq, &ret);
1161         if (! status) {
1162                 tevent_req_error(req, ret);
1163                 return;
1164         }
1165
1166         state->new_done = true;
1167
1168         if (state->old_done && state->new_done) {
1169                 tevent_req_done(req);
1170         }
1171 }
1172
1173 static bool push_database_recv(struct tevent_req *req, int *perr)
1174 {
1175         return generic_recv(req, perr);
1176 }
1177
1178 /*
1179  * Collect databases using highest sequence number
1180  */
1181
1182 struct collect_highseqnum_db_state {
1183         struct tevent_context *ev;
1184         struct ctdb_client_context *client;
1185         uint32_t *pnn_list;
1186         int count;
1187         uint32_t *caps;
1188         uint32_t *ban_credits;
1189         uint32_t db_id;
1190         struct recdb_context *recdb;
1191         uint32_t max_pnn;
1192 };
1193
1194 static void collect_highseqnum_db_seqnum_done(struct tevent_req *subreq);
1195 static void collect_highseqnum_db_pulldb_done(struct tevent_req *subreq);
1196
1197 static struct tevent_req *collect_highseqnum_db_send(
1198                         TALLOC_CTX *mem_ctx,
1199                         struct tevent_context *ev,
1200                         struct ctdb_client_context *client,
1201                         uint32_t *pnn_list, int count, uint32_t *caps,
1202                         uint32_t *ban_credits, uint32_t db_id,
1203                         struct recdb_context *recdb)
1204 {
1205         struct tevent_req *req, *subreq;
1206         struct collect_highseqnum_db_state *state;
1207         struct ctdb_req_control request;
1208
1209         req = tevent_req_create(mem_ctx, &state,
1210                                 struct collect_highseqnum_db_state);
1211         if (req == NULL) {
1212                 return NULL;
1213         }
1214
1215         state->ev = ev;
1216         state->client = client;
1217         state->pnn_list = pnn_list;
1218         state->count = count;
1219         state->caps = caps;
1220         state->ban_credits = ban_credits;
1221         state->db_id = db_id;
1222         state->recdb = recdb;
1223
1224         ctdb_req_control_get_db_seqnum(&request, db_id);
1225         subreq = ctdb_client_control_multi_send(mem_ctx, ev, client,
1226                                                 state->pnn_list, state->count,
1227                                                 TIMEOUT(), &request);
1228         if (tevent_req_nomem(subreq, req)) {
1229                 return tevent_req_post(req, ev);
1230         }
1231         tevent_req_set_callback(subreq, collect_highseqnum_db_seqnum_done,
1232                                 req);
1233
1234         return req;
1235 }
1236
1237 static void collect_highseqnum_db_seqnum_done(struct tevent_req *subreq)
1238 {
1239         struct tevent_req *req = tevent_req_callback_data(
1240                 subreq, struct tevent_req);
1241         struct collect_highseqnum_db_state *state = tevent_req_data(
1242                 req, struct collect_highseqnum_db_state);
1243         struct ctdb_reply_control **reply;
1244         int *err_list;
1245         bool status;
1246         int ret, i;
1247         uint64_t seqnum, max_seqnum;
1248
1249         status = ctdb_client_control_multi_recv(subreq, &ret, state,
1250                                                 &err_list, &reply);
1251         TALLOC_FREE(subreq);
1252         if (! status) {
1253                 int ret2;
1254                 uint32_t pnn;
1255
1256                 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1257                                                        state->count, err_list,
1258                                                        &pnn);
1259                 if (ret2 != 0) {
1260                         D_ERR("control GET_DB_SEQNUM failed for db %s"
1261                               " on node %u, ret=%d\n",
1262                               recdb_name(state->recdb), pnn, ret2);
1263                 } else {
1264                         D_ERR("control GET_DB_SEQNUM failed for db %s,"
1265                               " ret=%d\n",
1266                               recdb_name(state->recdb), ret);
1267                 }
1268                 tevent_req_error(req, ret);
1269                 return;
1270         }
1271
1272         max_seqnum = 0;
1273         state->max_pnn = state->pnn_list[0];
1274         for (i=0; i<state->count; i++) {
1275                 ret = ctdb_reply_control_get_db_seqnum(reply[i], &seqnum);
1276                 if (ret != 0) {
1277                         tevent_req_error(req, EPROTO);
1278                         return;
1279                 }
1280
1281                 if (max_seqnum < seqnum) {
1282                         max_seqnum = seqnum;
1283                         state->max_pnn = state->pnn_list[i];
1284                 }
1285         }
1286
1287         talloc_free(reply);
1288
1289         D_INFO("Pull persistent db %s from node %d with seqnum 0x%"PRIx64"\n",
1290                recdb_name(state->recdb), state->max_pnn, max_seqnum);
1291
1292         subreq = pull_database_send(state, state->ev, state->client,
1293                                     state->max_pnn,
1294                                     state->caps[state->max_pnn],
1295                                     state->recdb);
1296         if (tevent_req_nomem(subreq, req)) {
1297                 return;
1298         }
1299         tevent_req_set_callback(subreq, collect_highseqnum_db_pulldb_done,
1300                                 req);
1301 }
1302
1303 static void collect_highseqnum_db_pulldb_done(struct tevent_req *subreq)
1304 {
1305         struct tevent_req *req = tevent_req_callback_data(
1306                 subreq, struct tevent_req);
1307         struct collect_highseqnum_db_state *state = tevent_req_data(
1308                 req, struct collect_highseqnum_db_state);
1309         int ret;
1310         bool status;
1311
1312         status = pull_database_recv(subreq, &ret);
1313         TALLOC_FREE(subreq);
1314         if (! status) {
1315                 state->ban_credits[state->max_pnn] += 1;
1316                 tevent_req_error(req, ret);
1317                 return;
1318         }
1319
1320         tevent_req_done(req);
1321 }
1322
1323 static bool collect_highseqnum_db_recv(struct tevent_req *req, int *perr)
1324 {
1325         return generic_recv(req, perr);
1326 }
1327
1328 /*
1329  * Collect all databases
1330  */
1331
1332 struct collect_all_db_state {
1333         struct tevent_context *ev;
1334         struct ctdb_client_context *client;
1335         uint32_t *pnn_list;
1336         int count;
1337         uint32_t *caps;
1338         uint32_t *ban_credits;
1339         uint32_t db_id;
1340         struct recdb_context *recdb;
1341         struct ctdb_pulldb pulldb;
1342         int index;
1343 };
1344
1345 static void collect_all_db_pulldb_done(struct tevent_req *subreq);
1346
1347 static struct tevent_req *collect_all_db_send(
1348                         TALLOC_CTX *mem_ctx,
1349                         struct tevent_context *ev,
1350                         struct ctdb_client_context *client,
1351                         uint32_t *pnn_list, int count, uint32_t *caps,
1352                         uint32_t *ban_credits, uint32_t db_id,
1353                         struct recdb_context *recdb)
1354 {
1355         struct tevent_req *req, *subreq;
1356         struct collect_all_db_state *state;
1357         uint32_t pnn;
1358
1359         req = tevent_req_create(mem_ctx, &state,
1360                                 struct collect_all_db_state);
1361         if (req == NULL) {
1362                 return NULL;
1363         }
1364
1365         state->ev = ev;
1366         state->client = client;
1367         state->pnn_list = pnn_list;
1368         state->count = count;
1369         state->caps = caps;
1370         state->ban_credits = ban_credits;
1371         state->db_id = db_id;
1372         state->recdb = recdb;
1373         state->index = 0;
1374
1375         pnn = state->pnn_list[state->index];
1376
1377         subreq = pull_database_send(state, ev, client, pnn, caps[pnn], recdb);
1378         if (tevent_req_nomem(subreq, req)) {
1379                 return tevent_req_post(req, ev);
1380         }
1381         tevent_req_set_callback(subreq, collect_all_db_pulldb_done, req);
1382
1383         return req;
1384 }
1385
1386 static void collect_all_db_pulldb_done(struct tevent_req *subreq)
1387 {
1388         struct tevent_req *req = tevent_req_callback_data(
1389                 subreq, struct tevent_req);
1390         struct collect_all_db_state *state = tevent_req_data(
1391                 req, struct collect_all_db_state);
1392         uint32_t pnn;
1393         int ret;
1394         bool status;
1395
1396         status = pull_database_recv(subreq, &ret);
1397         TALLOC_FREE(subreq);
1398         if (! status) {
1399                 pnn = state->pnn_list[state->index];
1400                 state->ban_credits[pnn] += 1;
1401                 tevent_req_error(req, ret);
1402                 return;
1403         }
1404
1405         state->index += 1;
1406         if (state->index == state->count) {
1407                 tevent_req_done(req);
1408                 return;
1409         }
1410
1411         pnn = state->pnn_list[state->index];
1412         subreq = pull_database_send(state, state->ev, state->client,
1413                                     pnn, state->caps[pnn], state->recdb);
1414         if (tevent_req_nomem(subreq, req)) {
1415                 return;
1416         }
1417         tevent_req_set_callback(subreq, collect_all_db_pulldb_done, req);
1418 }
1419
1420 static bool collect_all_db_recv(struct tevent_req *req, int *perr)
1421 {
1422         return generic_recv(req, perr);
1423 }
1424
1425
1426 /**
1427  * For each database do the following:
1428  *  - Get DB name
1429  *  - Get DB path
1430  *  - Freeze database on all nodes
1431  *  - Start transaction on all nodes
1432  *  - Collect database from all nodes
1433  *  - Wipe database on all nodes
1434  *  - Push database to all nodes
1435  *  - Commit transaction on all nodes
1436  *  - Thaw database on all nodes
1437  */
1438
1439 struct recover_db_state {
1440         struct tevent_context *ev;
1441         struct ctdb_client_context *client;
1442         struct ctdb_tunable_list *tun_list;
1443         uint32_t *pnn_list;
1444         int count;
1445         uint32_t *caps;
1446         uint32_t *ban_credits;
1447         uint32_t db_id;
1448         uint8_t db_flags;
1449
1450         uint32_t destnode;
1451         struct ctdb_transdb transdb;
1452
1453         const char *db_name, *db_path;
1454         struct recdb_context *recdb;
1455 };
1456
1457 static void recover_db_name_done(struct tevent_req *subreq);
1458 static void recover_db_path_done(struct tevent_req *subreq);
1459 static void recover_db_freeze_done(struct tevent_req *subreq);
1460 static void recover_db_transaction_started(struct tevent_req *subreq);
1461 static void recover_db_collect_done(struct tevent_req *subreq);
1462 static void recover_db_wipedb_done(struct tevent_req *subreq);
1463 static void recover_db_pushdb_done(struct tevent_req *subreq);
1464 static void recover_db_transaction_committed(struct tevent_req *subreq);
1465 static void recover_db_thaw_done(struct tevent_req *subreq);
1466
1467 static struct tevent_req *recover_db_send(TALLOC_CTX *mem_ctx,
1468                                           struct tevent_context *ev,
1469                                           struct ctdb_client_context *client,
1470                                           struct ctdb_tunable_list *tun_list,
1471                                           uint32_t *pnn_list, int count,
1472                                           uint32_t *caps,
1473                                           uint32_t *ban_credits,
1474                                           uint32_t generation,
1475                                           uint32_t db_id, uint8_t db_flags)
1476 {
1477         struct tevent_req *req, *subreq;
1478         struct recover_db_state *state;
1479         struct ctdb_req_control request;
1480
1481         req = tevent_req_create(mem_ctx, &state, struct recover_db_state);
1482         if (req == NULL) {
1483                 return NULL;
1484         }
1485
1486         state->ev = ev;
1487         state->client = client;
1488         state->tun_list = tun_list;
1489         state->pnn_list = pnn_list;
1490         state->count = count;
1491         state->caps = caps;
1492         state->ban_credits = ban_credits;
1493         state->db_id = db_id;
1494         state->db_flags = db_flags;
1495
1496         state->destnode = ctdb_client_pnn(client);
1497         state->transdb.db_id = db_id;
1498         state->transdb.tid = generation;
1499
1500         ctdb_req_control_get_dbname(&request, db_id);
1501         subreq = ctdb_client_control_send(state, ev, client, state->destnode,
1502                                           TIMEOUT(), &request);
1503         if (tevent_req_nomem(subreq, req)) {
1504                 return tevent_req_post(req, ev);
1505         }
1506         tevent_req_set_callback(subreq, recover_db_name_done, req);
1507
1508         return req;
1509 }
1510
1511 static void recover_db_name_done(struct tevent_req *subreq)
1512 {
1513         struct tevent_req *req = tevent_req_callback_data(
1514                 subreq, struct tevent_req);
1515         struct recover_db_state *state = tevent_req_data(
1516                 req, struct recover_db_state);
1517         struct ctdb_reply_control *reply;
1518         struct ctdb_req_control request;
1519         int ret;
1520         bool status;
1521
1522         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
1523         TALLOC_FREE(subreq);
1524         if (! status) {
1525                 D_ERR("control GET_DBNAME failed for db=0x%x, ret=%d\n",
1526                       state->db_id, ret);
1527                 tevent_req_error(req, ret);
1528                 return;
1529         }
1530
1531         ret = ctdb_reply_control_get_dbname(reply, state, &state->db_name);
1532         if (ret != 0) {
1533                 D_ERR("control GET_DBNAME failed for db=0x%x, ret=%d\n",
1534                       state->db_id, ret);
1535                 tevent_req_error(req, EPROTO);
1536                 return;
1537         }
1538
1539         talloc_free(reply);
1540
1541         ctdb_req_control_getdbpath(&request, state->db_id);
1542         subreq = ctdb_client_control_send(state, state->ev, state->client,
1543                                           state->destnode, TIMEOUT(),
1544                                           &request);
1545         if (tevent_req_nomem(subreq, req)) {
1546                 return;
1547         }
1548         tevent_req_set_callback(subreq, recover_db_path_done, req);
1549 }
1550
1551 static void recover_db_path_done(struct tevent_req *subreq)
1552 {
1553         struct tevent_req *req = tevent_req_callback_data(
1554                 subreq, struct tevent_req);
1555         struct recover_db_state *state = tevent_req_data(
1556                 req, struct recover_db_state);
1557         struct ctdb_reply_control *reply;
1558         struct ctdb_req_control request;
1559         int ret;
1560         bool status;
1561
1562         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
1563         TALLOC_FREE(subreq);
1564         if (! status) {
1565                 D_ERR("control GETDBPATH failed for db %s, ret=%d\n",
1566                       state->db_name, ret);
1567                 tevent_req_error(req, ret);
1568                 return;
1569         }
1570
1571         ret = ctdb_reply_control_getdbpath(reply, state, &state->db_path);
1572         if (ret != 0) {
1573                 D_ERR("control GETDBPATH failed for db %s, ret=%d\n",
1574                       state->db_name, ret);
1575                 tevent_req_error(req, EPROTO);
1576                 return;
1577         }
1578
1579         talloc_free(reply);
1580
1581         ctdb_req_control_db_freeze(&request, state->db_id);
1582         subreq = ctdb_client_control_multi_send(state, state->ev,
1583                                                 state->client,
1584                                                 state->pnn_list, state->count,
1585                                                 TIMEOUT(), &request);
1586         if (tevent_req_nomem(subreq, req)) {
1587                 return;
1588         }
1589         tevent_req_set_callback(subreq, recover_db_freeze_done, req);
1590 }
1591
1592 static void recover_db_freeze_done(struct tevent_req *subreq)
1593 {
1594         struct tevent_req *req = tevent_req_callback_data(
1595                 subreq, struct tevent_req);
1596         struct recover_db_state *state = tevent_req_data(
1597                 req, struct recover_db_state);
1598         struct ctdb_req_control request;
1599         int *err_list;
1600         int ret;
1601         bool status;
1602
1603         status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
1604                                                 NULL);
1605         TALLOC_FREE(subreq);
1606         if (! status) {
1607                 int ret2;
1608                 uint32_t pnn;
1609
1610                 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1611                                                        state->count, err_list,
1612                                                        &pnn);
1613                 if (ret2 != 0) {
1614                         D_ERR("control FREEZE_DB failed for db %s"
1615                               " on node %u, ret=%d\n",
1616                               state->db_name, pnn, ret2);
1617                         state->ban_credits[pnn] += 1;
1618                 } else {
1619                         D_ERR("control FREEZE_DB failed for db %s, ret=%d\n",
1620                               state->db_name, ret);
1621                 }
1622                 tevent_req_error(req, ret);
1623                 return;
1624         }
1625
1626         ctdb_req_control_db_transaction_start(&request, &state->transdb);
1627         subreq = ctdb_client_control_multi_send(state, state->ev,
1628                                                 state->client,
1629                                                 state->pnn_list, state->count,
1630                                                 TIMEOUT(), &request);
1631         if (tevent_req_nomem(subreq, req)) {
1632                 return;
1633         }
1634         tevent_req_set_callback(subreq, recover_db_transaction_started, req);
1635 }
1636
1637 static void recover_db_transaction_started(struct tevent_req *subreq)
1638 {
1639         struct tevent_req *req = tevent_req_callback_data(
1640                 subreq, struct tevent_req);
1641         struct recover_db_state *state = tevent_req_data(
1642                 req, struct recover_db_state);
1643         int *err_list;
1644         int ret;
1645         bool status;
1646
1647         status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
1648                                                 NULL);
1649         TALLOC_FREE(subreq);
1650         if (! status) {
1651                 int ret2;
1652                 uint32_t pnn;
1653
1654                 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1655                                                        state->count,
1656                                                        err_list, &pnn);
1657                 if (ret2 != 0) {
1658                         D_ERR("control TRANSACTION_DB failed for db=%s"
1659                               " on node %u, ret=%d\n",
1660                               state->db_name, pnn, ret2);
1661                 } else {
1662                         D_ERR("control TRANSACTION_DB failed for db=%s,"
1663                               " ret=%d\n", state->db_name, ret);
1664                 }
1665                 tevent_req_error(req, ret);
1666                 return;
1667         }
1668
1669         state->recdb = recdb_create(state, state->db_id, state->db_name,
1670                                     state->db_path,
1671                                     state->tun_list->database_hash_size,
1672                                     state->db_flags & CTDB_DB_FLAGS_PERSISTENT);
1673         if (tevent_req_nomem(state->recdb, req)) {
1674                 return;
1675         }
1676
1677         if ((state->db_flags & CTDB_DB_FLAGS_PERSISTENT) ||
1678             (state->db_flags & CTDB_DB_FLAGS_REPLICATED)) {
1679                 subreq = collect_highseqnum_db_send(
1680                                 state, state->ev, state->client,
1681                                 state->pnn_list, state->count, state->caps,
1682                                 state->ban_credits, state->db_id,
1683                                 state->recdb);
1684         } else {
1685                 subreq = collect_all_db_send(
1686                                 state, state->ev, state->client,
1687                                 state->pnn_list, state->count, state->caps,
1688                                 state->ban_credits, state->db_id,
1689                                 state->recdb);
1690         }
1691         if (tevent_req_nomem(subreq, req)) {
1692                 return;
1693         }
1694         tevent_req_set_callback(subreq, recover_db_collect_done, req);
1695 }
1696
1697 static void recover_db_collect_done(struct tevent_req *subreq)
1698 {
1699         struct tevent_req *req = tevent_req_callback_data(
1700                 subreq, struct tevent_req);
1701         struct recover_db_state *state = tevent_req_data(
1702                 req, struct recover_db_state);
1703         struct ctdb_req_control request;
1704         int ret;
1705         bool status;
1706
1707         if ((state->db_flags & CTDB_DB_FLAGS_PERSISTENT) ||
1708             (state->db_flags & CTDB_DB_FLAGS_REPLICATED)) {
1709                 status = collect_highseqnum_db_recv(subreq, &ret);
1710         } else {
1711                 status = collect_all_db_recv(subreq, &ret);
1712         }
1713         TALLOC_FREE(subreq);
1714         if (! status) {
1715                 tevent_req_error(req, ret);
1716                 return;
1717         }
1718
1719         ctdb_req_control_wipe_database(&request, &state->transdb);
1720         subreq = ctdb_client_control_multi_send(state, state->ev,
1721                                                 state->client,
1722                                                 state->pnn_list, state->count,
1723                                                 TIMEOUT(), &request);
1724         if (tevent_req_nomem(subreq, req)) {
1725                 return;
1726         }
1727         tevent_req_set_callback(subreq, recover_db_wipedb_done, req);
1728 }
1729
1730 static void recover_db_wipedb_done(struct tevent_req *subreq)
1731 {
1732         struct tevent_req *req = tevent_req_callback_data(
1733                 subreq, struct tevent_req);
1734         struct recover_db_state *state = tevent_req_data(
1735                 req, struct recover_db_state);
1736         int *err_list;
1737         int ret;
1738         bool status;
1739
1740         status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
1741                                                 NULL);
1742         TALLOC_FREE(subreq);
1743         if (! status) {
1744                 int ret2;
1745                 uint32_t pnn;
1746
1747                 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1748                                                        state->count,
1749                                                        err_list, &pnn);
1750                 if (ret2 != 0) {
1751                         D_ERR("control WIPEDB failed for db %s on node %u,"
1752                               " ret=%d\n", state->db_name, pnn, ret2);
1753                 } else {
1754                         D_ERR("control WIPEDB failed for db %s, ret=%d\n",
1755                               state->db_name, ret);
1756                 }
1757                 tevent_req_error(req, ret);
1758                 return;
1759         }
1760
1761         subreq = push_database_send(state, state->ev, state->client,
1762                                     state->pnn_list, state->count,
1763                                     state->caps, state->tun_list,
1764                                     state->recdb);
1765         if (tevent_req_nomem(subreq, req)) {
1766                 return;
1767         }
1768         tevent_req_set_callback(subreq, recover_db_pushdb_done, req);
1769 }
1770
1771 static void recover_db_pushdb_done(struct tevent_req *subreq)
1772 {
1773         struct tevent_req *req = tevent_req_callback_data(
1774                 subreq, struct tevent_req);
1775         struct recover_db_state *state = tevent_req_data(
1776                 req, struct recover_db_state);
1777         struct ctdb_req_control request;
1778         int ret;
1779         bool status;
1780
1781         status = push_database_recv(subreq, &ret);
1782         TALLOC_FREE(subreq);
1783         if (! status) {
1784                 tevent_req_error(req, ret);
1785                 return;
1786         }
1787
1788         TALLOC_FREE(state->recdb);
1789
1790         ctdb_req_control_db_transaction_commit(&request, &state->transdb);
1791         subreq = ctdb_client_control_multi_send(state, state->ev,
1792                                                 state->client,
1793                                                 state->pnn_list, state->count,
1794                                                 TIMEOUT(), &request);
1795         if (tevent_req_nomem(subreq, req)) {
1796                 return;
1797         }
1798         tevent_req_set_callback(subreq, recover_db_transaction_committed, req);
1799 }
1800
1801 static void recover_db_transaction_committed(struct tevent_req *subreq)
1802 {
1803         struct tevent_req *req = tevent_req_callback_data(
1804                 subreq, struct tevent_req);
1805         struct recover_db_state *state = tevent_req_data(
1806                 req, struct recover_db_state);
1807         struct ctdb_req_control request;
1808         int *err_list;
1809         int ret;
1810         bool status;
1811
1812         status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
1813                                                 NULL);
1814         TALLOC_FREE(subreq);
1815         if (! status) {
1816                 int ret2;
1817                 uint32_t pnn;
1818
1819                 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1820                                                        state->count,
1821                                                        err_list, &pnn);
1822                 if (ret2 != 0) {
1823                         D_ERR("control DB_TRANSACTION_COMMIT failed for db %s"
1824                               " on node %u, ret=%d\n",
1825                               state->db_name, pnn, ret2);
1826                 } else {
1827                         D_ERR("control DB_TRANSACTION_COMMIT failed for db %s,"
1828                               " ret=%d\n", state->db_name, ret);
1829                 }
1830                 tevent_req_error(req, ret);
1831                 return;
1832         }
1833
1834         ctdb_req_control_db_thaw(&request, state->db_id);
1835         subreq = ctdb_client_control_multi_send(state, state->ev,
1836                                                 state->client,
1837                                                 state->pnn_list, state->count,
1838                                                 TIMEOUT(), &request);
1839         if (tevent_req_nomem(subreq, req)) {
1840                 return;
1841         }
1842         tevent_req_set_callback(subreq, recover_db_thaw_done, req);
1843 }
1844
1845 static void recover_db_thaw_done(struct tevent_req *subreq)
1846 {
1847         struct tevent_req *req = tevent_req_callback_data(
1848                 subreq, struct tevent_req);
1849         struct recover_db_state *state = tevent_req_data(
1850                 req, struct recover_db_state);
1851         int *err_list;
1852         int ret;
1853         bool status;
1854
1855         status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
1856                                                 NULL);
1857         TALLOC_FREE(subreq);
1858         if (! status) {
1859                 int ret2;
1860                 uint32_t pnn;
1861
1862                 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1863                                                        state->count,
1864                                                        err_list, &pnn);
1865                 if (ret2 != 0) {
1866                         D_ERR("control DB_THAW failed for db %s on node %u,"
1867                               " ret=%d\n", state->db_name, pnn, ret2);
1868                 } else {
1869                         D_ERR("control DB_THAW failed for db %s, ret=%d\n",
1870                               state->db_name, ret);
1871                 }
1872                 tevent_req_error(req, ret);
1873                 return;
1874         }
1875
1876         tevent_req_done(req);
1877 }
1878
1879 static bool recover_db_recv(struct tevent_req *req)
1880 {
1881         return generic_recv(req, NULL);
1882 }
1883
1884
1885 /*
1886  * Start database recovery for each database
1887  *
1888  * Try to recover each database up to NUM_RETRIES (3) times before failing recovery.
1889  */
1890
1891 struct db_recovery_state {  /* state of a db_recovery request covering all databases */
1892         struct tevent_context *ev;  /* event context, reused when a database recovery is retried */
1893         struct ctdb_dbid_map *dbmap;  /* databases to recover; the request finishes after dbmap->num replies */
1894         int num_replies;  /* databases that have produced a final result (success or failure) */
1895         int num_failed;  /* databases counted as failed */
1896 };
1897
1898 struct db_recovery_one_state {  /* per-database state; callback data for db_recovery_one_done() */
1899         struct tevent_req *req;  /* parent db_recovery request */
1900         struct ctdb_client_context *client;  /* saved so recover_db_send() can be re-issued on retry */
1901         struct ctdb_dbid_map *dbmap;  /* dbmap pointer as given to db_recovery_send() */
1902         struct ctdb_tunable_list *tun_list;  /* saved recover_db_send() argument */
1903         uint32_t *pnn_list;  /* saved recover_db_send() argument */
1904         int count;  /* count passed together with pnn_list */
1905         uint32_t *caps;  /* saved recover_db_send() argument */
1906         uint32_t *ban_credits;  /* saved recover_db_send() argument */
1907         uint32_t generation;  /* saved recover_db_send() argument */
1908         uint32_t db_id;  /* id of the database this entry recovers */
1909         uint8_t db_flags;  /* flags of that database, copied from its dbmap entry */
1910         int num_fails;  /* failed attempts so far; compared against NUM_RETRIES */
1911 };
1912
1913 static void db_recovery_one_done(struct tevent_req *subreq);
1914
1915 static struct tevent_req *db_recovery_send(TALLOC_CTX *mem_ctx,
1916                                            struct tevent_context *ev,
1917                                            struct ctdb_client_context *client,
1918                                            struct ctdb_dbid_map *dbmap,
1919                                            struct ctdb_tunable_list *tun_list,
1920                                            uint32_t *pnn_list, int count,
1921                                            uint32_t *caps,
1922                                            uint32_t *ban_credits,
1923                                            uint32_t generation)
1924 {
1925         struct tevent_req *req, *subreq;
1926         struct db_recovery_state *state;
1927         int i;
1928
1929         req = tevent_req_create(mem_ctx, &state, struct db_recovery_state);
1930         if (req == NULL) {
1931                 return NULL;
1932         }
1933
1934         state->ev = ev;
1935         state->dbmap = dbmap;
1936         state->num_replies = 0;
1937         state->num_failed = 0;
1938
1939         if (dbmap->num == 0) {
1940                 tevent_req_done(req);
1941                 return tevent_req_post(req, ev);
1942         }
1943
1944         for (i=0; i<dbmap->num; i++) {
1945                 struct db_recovery_one_state *substate;
1946
1947                 substate = talloc_zero(state, struct db_recovery_one_state);
1948                 if (tevent_req_nomem(substate, req)) {
1949                         return tevent_req_post(req, ev);
1950                 }
1951
1952                 substate->req = req;
1953                 substate->client = client;
1954                 substate->dbmap = dbmap;
1955                 substate->tun_list = tun_list;
1956                 substate->pnn_list = pnn_list;
1957                 substate->count = count;
1958                 substate->caps = caps;
1959                 substate->ban_credits = ban_credits;
1960                 substate->generation = generation;
1961                 substate->db_id = dbmap->dbs[i].db_id;
1962                 substate->db_flags = dbmap->dbs[i].flags;
1963
1964                 subreq = recover_db_send(state, ev, client, tun_list,
1965                                          pnn_list, count, caps, ban_credits,
1966                                          generation, substate->db_id,
1967                                          substate->db_flags);
1968                 if (tevent_req_nomem(subreq, req)) {
1969                         return tevent_req_post(req, ev);
1970                 }
1971                 tevent_req_set_callback(subreq, db_recovery_one_done,
1972                                         substate);
1973                 D_NOTICE("recover database 0x%08x\n", substate->db_id);
1974         }
1975
1976         return req;
1977 }
1978
1979 static void db_recovery_one_done(struct tevent_req *subreq)
1980 {
1981         struct db_recovery_one_state *substate = tevent_req_callback_data(
1982                 subreq, struct db_recovery_one_state);
1983         struct tevent_req *req = substate->req;
1984         struct db_recovery_state *state = tevent_req_data(
1985                 req, struct db_recovery_state);
1986         bool status;
1987
1988         status = recover_db_recv(subreq);
1989         TALLOC_FREE(subreq);
1990
1991         if (status) {
1992                 talloc_free(substate);
1993                 goto done;
1994         }
1995
1996         substate->num_fails += 1;
1997         if (substate->num_fails < NUM_RETRIES) {
1998                 subreq = recover_db_send(state, state->ev, substate->client,
1999                                          substate->tun_list,
2000                                          substate->pnn_list, substate->count,
2001                                          substate->caps, substate->ban_credits,
2002                                          substate->generation, substate->db_id,
2003                                          substate->db_flags);
2004                 if (tevent_req_nomem(subreq, req)) {
2005                         goto failed;
2006                 }
2007                 tevent_req_set_callback(subreq, db_recovery_one_done, substate);
2008                 D_NOTICE("recover database 0x%08x, attempt %d\n",
2009                          substate->db_id, substate->num_fails+1);
2010                 return;
2011         }
2012
2013 failed:
2014         state->num_failed += 1;
2015
2016 done:
2017         state->num_replies += 1;
2018
2019         if (state->num_replies == state->dbmap->num) {
2020                 tevent_req_done(req);
2021         }
2022 }
2023
2024 static bool db_recovery_recv(struct tevent_req *req, int *count)
2025 {
2026         struct db_recovery_state *state = tevent_req_data(
2027                 req, struct db_recovery_state);
2028         int err;
2029
2030         if (tevent_req_is_unix_error(req, &err)) {
2031                 *count = 0;
2032                 return false;
2033         }
2034
2035         *count = state->num_replies - state->num_failed;
2036
2037         if (state->num_failed > 0) {
2038                 return false;
2039         }
2040
2041         return true;
2042 }
2043
2044
2045 /*
2046  * Run the parallel database recovery
2047  *
2048  * - Get tunables
2049  * - Get nodemap
2050  * - Get vnnmap
2051  * - Get capabilities from all nodes
2052  * - Get dbmap
2053  * - Set RECOVERY_ACTIVE
2054  * - Send START_RECOVERY
2055  * - Update vnnmap on all nodes
2056  * - Run database recovery
2057  * - Set RECOVERY_NORMAL
2058  * - Send END_RECOVERY
2059  */
2060
2061 struct recovery_state {  /* state shared by the recovery_*_done() callback chain */
2062         struct tevent_context *ev;  /* event context used for every sub-request */
2063         struct ctdb_client_context *client;  /* client connection used for every control */
2064         uint32_t generation;  /* generation stored in the newly built vnnmap */
2065         uint32_t *pnn_list;  /* filled in by list_of_active_nodes() */
2066         int count;  /* return value of list_of_active_nodes() */
2067         uint32_t destnode;  /* ctdb_client_pnn(client); target of the single-node controls */
2068         struct ctdb_node_map *nodemap;  /* parsed GET_NODEMAP reply */
2069         uint32_t *caps;  /* capabilities, nodemap->num entries, stored per pnn */
2070         uint32_t *ban_credits;  /* zero-initialised array of nodemap->num entries */
2071         struct ctdb_tunable_list *tun_list;  /* parsed GET_ALL_TUNABLES reply */
2072         struct ctdb_vnn_map *vnnmap;  /* parsed GETVNNMAP reply, later replaced by the new map */
2073         struct ctdb_dbid_map *dbmap;  /* parsed GET_DBMAP reply */
2074 };
2075
2076 static void recovery_tunables_done(struct tevent_req *subreq);
2077 static void recovery_nodemap_done(struct tevent_req *subreq);
2078 static void recovery_vnnmap_done(struct tevent_req *subreq);
2079 static void recovery_capabilities_done(struct tevent_req *subreq);
2080 static void recovery_dbmap_done(struct tevent_req *subreq);
2081 static void recovery_active_done(struct tevent_req *subreq);
2082 static void recovery_start_recovery_done(struct tevent_req *subreq);
2083 static void recovery_vnnmap_update_done(struct tevent_req *subreq);
2084 static void recovery_db_recovery_done(struct tevent_req *subreq);
2085 static void recovery_failed_done(struct tevent_req *subreq);
2086 static void recovery_normal_done(struct tevent_req *subreq);
2087 static void recovery_end_recovery_done(struct tevent_req *subreq);
2088
2089 static struct tevent_req *recovery_send(TALLOC_CTX *mem_ctx,
2090                                         struct tevent_context *ev,
2091                                         struct ctdb_client_context *client,
2092                                         uint32_t generation)
2093 {
2094         struct tevent_req *req, *subreq;
2095         struct recovery_state *state;
2096         struct ctdb_req_control request;
2097
2098         req = tevent_req_create(mem_ctx, &state, struct recovery_state);
2099         if (req == NULL) {
2100                 return NULL;
2101         }
2102
2103         state->ev = ev;
2104         state->client = client;
2105         state->generation = generation;
2106         state->destnode = ctdb_client_pnn(client);
2107
2108         ctdb_req_control_get_all_tunables(&request);
2109         subreq = ctdb_client_control_send(state, state->ev, state->client,
2110                                           state->destnode, TIMEOUT(),
2111                                           &request);
2112         if (tevent_req_nomem(subreq, req)) {
2113                 return tevent_req_post(req, ev);
2114         }
2115         tevent_req_set_callback(subreq, recovery_tunables_done, req);
2116
2117         return req;
2118 }
2119
2120 static void recovery_tunables_done(struct tevent_req *subreq)
2121 {
2122         struct tevent_req *req = tevent_req_callback_data(
2123                 subreq, struct tevent_req);
2124         struct recovery_state *state = tevent_req_data(
2125                 req, struct recovery_state);
2126         struct ctdb_reply_control *reply;
2127         struct ctdb_req_control request;
2128         int ret;
2129         bool status;
2130
2131         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2132         TALLOC_FREE(subreq);
2133         if (! status) {
2134                 D_ERR("control GET_ALL_TUNABLES failed, ret=%d\n", ret);
2135                 tevent_req_error(req, ret);
2136                 return;
2137         }
2138
2139         ret = ctdb_reply_control_get_all_tunables(reply, state,
2140                                                   &state->tun_list);
2141         if (ret != 0) {
2142                 D_ERR("control GET_ALL_TUNABLES failed, ret=%d\n", ret);
2143                 tevent_req_error(req, EPROTO);
2144                 return;
2145         }
2146
2147         talloc_free(reply);
2148
2149         recover_timeout = state->tun_list->recover_timeout;
2150
2151         ctdb_req_control_get_nodemap(&request);
2152         subreq = ctdb_client_control_send(state, state->ev, state->client,
2153                                           state->destnode, TIMEOUT(),
2154                                           &request);
2155         if (tevent_req_nomem(subreq, req)) {
2156                 return;
2157         }
2158         tevent_req_set_callback(subreq, recovery_nodemap_done, req);
2159 }
2160
2161 static void recovery_nodemap_done(struct tevent_req *subreq)
2162 {
2163         struct tevent_req *req = tevent_req_callback_data(
2164                 subreq, struct tevent_req);
2165         struct recovery_state *state = tevent_req_data(
2166                 req, struct recovery_state);
2167         struct ctdb_reply_control *reply;
2168         struct ctdb_req_control request;
2169         bool status;
2170         int ret;
2171
2172         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2173         TALLOC_FREE(subreq);
2174         if (! status) {
2175                 D_ERR("control GET_NODEMAP failed to node %u, ret=%d\n",
2176                       state->destnode, ret);
2177                 tevent_req_error(req, ret);
2178                 return;
2179         }
2180
2181         ret = ctdb_reply_control_get_nodemap(reply, state, &state->nodemap);
2182         if (ret != 0) {
2183                 D_ERR("control GET_NODEMAP failed, ret=%d\n", ret);
2184                 tevent_req_error(req, ret);
2185                 return;
2186         }
2187
2188         state->count = list_of_active_nodes(state->nodemap, CTDB_UNKNOWN_PNN,
2189                                             state, &state->pnn_list);
2190         if (state->count <= 0) {
2191                 tevent_req_error(req, ENOMEM);
2192                 return;
2193         }
2194
2195         state->ban_credits = talloc_zero_array(state, uint32_t,
2196                                                state->nodemap->num);
2197         if (tevent_req_nomem(state->ban_credits, req)) {
2198                 return;
2199         }
2200
2201         ctdb_req_control_getvnnmap(&request);
2202         subreq = ctdb_client_control_send(state, state->ev, state->client,
2203                                           state->destnode, TIMEOUT(),
2204                                           &request);
2205         if (tevent_req_nomem(subreq, req)) {
2206                 return;
2207         }
2208         tevent_req_set_callback(subreq, recovery_vnnmap_done, req);
2209 }
2210
2211 static void recovery_vnnmap_done(struct tevent_req *subreq)
2212 {
2213         struct tevent_req *req = tevent_req_callback_data(
2214                 subreq, struct tevent_req);
2215         struct recovery_state *state = tevent_req_data(
2216                 req, struct recovery_state);
2217         struct ctdb_reply_control *reply;
2218         struct ctdb_req_control request;
2219         bool status;
2220         int ret;
2221
2222         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2223         TALLOC_FREE(subreq);
2224         if (! status) {
2225                 D_ERR("control GETVNNMAP failed to node %u, ret=%d\n",
2226                       state->destnode, ret);
2227                 tevent_req_error(req, ret);
2228                 return;
2229         }
2230
2231         ret = ctdb_reply_control_getvnnmap(reply, state, &state->vnnmap);
2232         if (ret != 0) {
2233                 D_ERR("control GETVNNMAP failed, ret=%d\n", ret);
2234                 tevent_req_error(req, ret);
2235                 return;
2236         }
2237
2238         ctdb_req_control_get_capabilities(&request);
2239         subreq = ctdb_client_control_multi_send(state, state->ev,
2240                                                 state->client,
2241                                                 state->pnn_list, state->count,
2242                                                 TIMEOUT(), &request);
2243         if (tevent_req_nomem(subreq, req)) {
2244                 return;
2245         }
2246         tevent_req_set_callback(subreq, recovery_capabilities_done, req);
2247 }
2248
2249 static void recovery_capabilities_done(struct tevent_req *subreq)
2250 {
2251         struct tevent_req *req = tevent_req_callback_data(
2252                 subreq, struct tevent_req);
2253         struct recovery_state *state = tevent_req_data(
2254                 req, struct recovery_state);
2255         struct ctdb_reply_control **reply;
2256         struct ctdb_req_control request;
2257         int *err_list;
2258         int ret, i;
2259         bool status;
2260
2261         status = ctdb_client_control_multi_recv(subreq, &ret, state, &err_list,
2262                                                 &reply);
2263         TALLOC_FREE(subreq);
2264         if (! status) {
2265                 int ret2;
2266                 uint32_t pnn;
2267
2268                 ret2 = ctdb_client_control_multi_error(state->pnn_list,
2269                                                        state->count,
2270                                                        err_list, &pnn);
2271                 if (ret2 != 0) {
2272                         D_ERR("control GET_CAPABILITIES failed on node %u,"
2273                               " ret=%d\n", pnn, ret2);
2274                 } else {
2275                         D_ERR("control GET_CAPABILITIES failed, ret=%d\n",
2276                               ret);
2277                 }
2278                 tevent_req_error(req, ret);
2279                 return;
2280         }
2281
2282         /* Make the array size same as nodemap */
2283         state->caps = talloc_zero_array(state, uint32_t,
2284                                         state->nodemap->num);
2285         if (tevent_req_nomem(state->caps, req)) {
2286                 return;
2287         }
2288
2289         for (i=0; i<state->count; i++) {
2290                 uint32_t pnn;
2291
2292                 pnn = state->pnn_list[i];
2293                 ret = ctdb_reply_control_get_capabilities(reply[i],
2294                                                           &state->caps[pnn]);
2295                 if (ret != 0) {
2296                         D_ERR("control GET_CAPABILITIES failed on node %u\n",
2297                               pnn);
2298                         tevent_req_error(req, EPROTO);
2299                         return;
2300                 }
2301         }
2302
2303         talloc_free(reply);
2304
2305         ctdb_req_control_get_dbmap(&request);
2306         subreq = ctdb_client_control_send(state, state->ev, state->client,
2307                                           state->destnode, TIMEOUT(),
2308                                           &request);
2309         if (tevent_req_nomem(subreq, req)) {
2310                 return;
2311         }
2312         tevent_req_set_callback(subreq, recovery_dbmap_done, req);
2313 }
2314
2315 static void recovery_dbmap_done(struct tevent_req *subreq)
2316 {
2317         struct tevent_req *req = tevent_req_callback_data(
2318                 subreq, struct tevent_req);
2319         struct recovery_state *state = tevent_req_data(
2320                 req, struct recovery_state);
2321         struct ctdb_reply_control *reply;
2322         struct ctdb_req_control request;
2323         int ret;
2324         bool status;
2325
2326         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2327         TALLOC_FREE(subreq);
2328         if (! status) {
2329                 D_ERR("control GET_DBMAP failed to node %u, ret=%d\n",
2330                       state->destnode, ret);
2331                 tevent_req_error(req, ret);
2332                 return;
2333         }
2334
2335         ret = ctdb_reply_control_get_dbmap(reply, state, &state->dbmap);
2336         if (ret != 0) {
2337                 D_ERR("control GET_DBMAP failed, ret=%d\n", ret);
2338                 tevent_req_error(req, ret);
2339                 return;
2340         }
2341
2342         ctdb_req_control_set_recmode(&request, CTDB_RECOVERY_ACTIVE);
2343         subreq = ctdb_client_control_multi_send(state, state->ev,
2344                                                 state->client,
2345                                                 state->pnn_list, state->count,
2346                                                 TIMEOUT(), &request);
2347         if (tevent_req_nomem(subreq, req)) {
2348                 return;
2349         }
2350         tevent_req_set_callback(subreq, recovery_active_done, req);
2351 }
2352
2353 static void recovery_active_done(struct tevent_req *subreq)
2354 {
2355         struct tevent_req *req = tevent_req_callback_data(
2356                 subreq, struct tevent_req);
2357         struct recovery_state *state = tevent_req_data(
2358                 req, struct recovery_state);
2359         struct ctdb_req_control request;
2360         struct ctdb_vnn_map *vnnmap;
2361         int *err_list;
2362         int ret, i;
2363         unsigned int count;
2364         bool status;
2365
2366         status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
2367                                                 NULL);
2368         TALLOC_FREE(subreq);
2369         if (! status) {
2370                 int ret2;
2371                 uint32_t pnn;
2372
2373                 ret2 = ctdb_client_control_multi_error(state->pnn_list,
2374                                                        state->count,
2375                                                        err_list, &pnn);
2376                 if (ret2 != 0) {
2377                         D_ERR("failed to set recovery mode ACTIVE on node %u,"
2378                               " ret=%d\n", pnn, ret2);
2379                 } else {
2380                         D_ERR("failed to set recovery mode ACTIVE, ret=%d\n",
2381                               ret);
2382                 }
2383                 tevent_req_error(req, ret);
2384                 return;
2385         }
2386
2387         D_ERR("Set recovery mode to ACTIVE\n");
2388
2389         /* Calculate new VNNMAP */
2390         count = 0;
2391         for (i=0; i<state->nodemap->num; i++) {
2392                 if (state->nodemap->node[i].flags & NODE_FLAGS_INACTIVE) {
2393                         continue;
2394                 }
2395                 if (!(state->caps[i] & CTDB_CAP_LMASTER)) {
2396                         continue;
2397                 }
2398                 count += 1;
2399         }
2400
2401         if (count == 0) {
2402                 D_WARNING("No active lmasters found. Adding recmaster anyway\n");
2403         }
2404
2405         vnnmap = talloc_zero(state, struct ctdb_vnn_map);
2406         if (tevent_req_nomem(vnnmap, req)) {
2407                 return;
2408         }
2409
2410         vnnmap->size = (count == 0 ? 1 : count);
2411         vnnmap->map = talloc_array(vnnmap, uint32_t, vnnmap->size);
2412         if (tevent_req_nomem(vnnmap->map, req)) {
2413                 return;
2414         }
2415
2416         if (count == 0) {
2417                 vnnmap->map[0] = state->destnode;
2418         } else {
2419                 count = 0;
2420                 for (i=0; i<state->nodemap->num; i++) {
2421                         if (state->nodemap->node[i].flags &
2422                             NODE_FLAGS_INACTIVE) {
2423                                 continue;
2424                         }
2425                         if (!(state->caps[i] & CTDB_CAP_LMASTER)) {
2426                                 continue;
2427                         }
2428
2429                         vnnmap->map[count] = state->nodemap->node[i].pnn;
2430                         count += 1;
2431                 }
2432         }
2433
2434         vnnmap->generation = state->generation;
2435
2436         talloc_free(state->vnnmap);
2437         state->vnnmap = vnnmap;
2438
2439         ctdb_req_control_start_recovery(&request);
2440         subreq = ctdb_client_control_multi_send(state, state->ev,
2441                                                 state->client,
2442                                                 state->pnn_list, state->count,
2443                                                 TIMEOUT(), &request);
2444         if (tevent_req_nomem(subreq, req)) {
2445                 return;
2446         }
2447         tevent_req_set_callback(subreq, recovery_start_recovery_done, req);
2448 }
2449
2450 static void recovery_start_recovery_done(struct tevent_req *subreq)
2451 {
2452         struct tevent_req *req = tevent_req_callback_data(
2453                 subreq, struct tevent_req);
2454         struct recovery_state *state = tevent_req_data(
2455                 req, struct recovery_state);
2456         struct ctdb_req_control request;
2457         int *err_list;
2458         int ret;
2459         bool status;
2460
2461         status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
2462                                                 NULL);
2463         TALLOC_FREE(subreq);
2464         if (! status) {
2465                 int ret2;
2466                 uint32_t pnn;
2467
2468                 ret2 = ctdb_client_control_multi_error(state->pnn_list,
2469                                                        state->count,
2470                                                        err_list, &pnn);
2471                 if (ret2 != 0) {
2472                         D_ERR("failed to run start_recovery event on node %u,"
2473                               " ret=%d\n", pnn, ret2);
2474                 } else {
2475                         D_ERR("failed to run start_recovery event, ret=%d\n",
2476                               ret);
2477                 }
2478                 tevent_req_error(req, ret);
2479                 return;
2480         }
2481
2482         D_ERR("start_recovery event finished\n");
2483
2484         ctdb_req_control_setvnnmap(&request, state->vnnmap);
2485         subreq = ctdb_client_control_multi_send(state, state->ev,
2486                                                 state->client,
2487                                                 state->pnn_list, state->count,
2488                                                 TIMEOUT(), &request);
2489         if (tevent_req_nomem(subreq, req)) {
2490                 return;
2491         }
2492         tevent_req_set_callback(subreq, recovery_vnnmap_update_done, req);
2493 }
2494
2495 static void recovery_vnnmap_update_done(struct tevent_req *subreq)
2496 {
2497         struct tevent_req *req = tevent_req_callback_data(
2498                 subreq, struct tevent_req);
2499         struct recovery_state *state = tevent_req_data(
2500                 req, struct recovery_state);
2501         int *err_list;
2502         int ret;
2503         bool status;
2504
2505         status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
2506                                                 NULL);
2507         TALLOC_FREE(subreq);
2508         if (! status) {
2509                 int ret2;
2510                 uint32_t pnn;
2511
2512                 ret2 = ctdb_client_control_multi_error(state->pnn_list,
2513                                                        state->count,
2514                                                        err_list, &pnn);
2515                 if (ret2 != 0) {
2516                         D_ERR("failed to update VNNMAP on node %u, ret=%d\n",
2517                               pnn, ret2);
2518                 } else {
2519                         D_ERR("failed to update VNNMAP, ret=%d\n", ret);
2520                 }
2521                 tevent_req_error(req, ret);
2522                 return;
2523         }
2524
2525         D_NOTICE("updated VNNMAP\n");
2526
2527         subreq = db_recovery_send(state, state->ev, state->client,
2528                                   state->dbmap, state->tun_list,
2529                                   state->pnn_list, state->count,
2530                                   state->caps, state->ban_credits,
2531                                   state->vnnmap->generation);
2532         if (tevent_req_nomem(subreq, req)) {
2533                 return;
2534         }
2535         tevent_req_set_callback(subreq, recovery_db_recovery_done, req);
2536 }
2537
2538 static void recovery_db_recovery_done(struct tevent_req *subreq)
2539 {
2540         struct tevent_req *req = tevent_req_callback_data(
2541                 subreq, struct tevent_req);
2542         struct recovery_state *state = tevent_req_data(
2543                 req, struct recovery_state);
2544         struct ctdb_req_control request;
2545         bool status;
2546         int count;
2547
2548         status = db_recovery_recv(subreq, &count);
2549         TALLOC_FREE(subreq);
2550
2551         D_ERR("%d of %d databases recovered\n", count, state->dbmap->num);
2552
2553         if (! status) {
2554                 uint32_t max_pnn = CTDB_UNKNOWN_PNN, max_credits = 0;
2555                 int i;
2556
2557                 /* Bans are not enabled */
2558                 if (state->tun_list->enable_bans == 0) {
2559                         tevent_req_error(req, EIO);
2560                         return;
2561                 }
2562
2563                 for (i=0; i<state->count; i++) {
2564                         uint32_t pnn;
2565                         pnn = state->pnn_list[i];
2566                         if (state->ban_credits[pnn] > max_credits) {
2567                                 max_pnn = pnn;
2568                                 max_credits = state->ban_credits[pnn];
2569                         }
2570                 }
2571
2572                 /* If pulling database fails multiple times */
2573                 if (max_credits >= NUM_RETRIES) {
2574                         struct ctdb_ban_state ban_state = {
2575                                 .pnn = max_pnn,
2576                                 .time = state->tun_list->recovery_ban_period,
2577                         };
2578
2579                         D_ERR("Banning node %u for %u seconds\n",
2580                               ban_state.pnn,
2581                               ban_state.time);
2582
2583                         ctdb_req_control_set_ban_state(&request,
2584                                                        &ban_state);
2585                         subreq = ctdb_client_control_send(state,
2586                                                           state->ev,
2587                                                           state->client,
2588                                                           ban_state.pnn,
2589                                                           TIMEOUT(),
2590                                                           &request);
2591                         if (tevent_req_nomem(subreq, req)) {
2592                                 return;
2593                         }
2594                         tevent_req_set_callback(subreq,
2595                                                 recovery_failed_done,
2596                                                 req);
2597                 } else {
2598                         tevent_req_error(req, EIO);
2599                 }
2600                 return;
2601         }
2602
2603         ctdb_req_control_set_recmode(&request, CTDB_RECOVERY_NORMAL);
2604         subreq = ctdb_client_control_multi_send(state, state->ev,
2605                                                 state->client,
2606                                                 state->pnn_list, state->count,
2607                                                 TIMEOUT(), &request);
2608         if (tevent_req_nomem(subreq, req)) {
2609                 return;
2610         }
2611         tevent_req_set_callback(subreq, recovery_normal_done, req);
2612 }
2613
2614 static void recovery_failed_done(struct tevent_req *subreq)
2615 {
2616         struct tevent_req *req = tevent_req_callback_data(
2617                 subreq, struct tevent_req);
2618         struct recovery_state *state = tevent_req_data(
2619                 req, struct recovery_state);
2620         struct ctdb_reply_control *reply;
2621         int ret;
2622         bool status;
2623
2624         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2625         TALLOC_FREE(subreq);
2626         if (! status) {
2627                 D_ERR("failed to ban node, ret=%d\n", ret);
2628                 goto done;
2629         }
2630
2631         ret = ctdb_reply_control_set_ban_state(reply);
2632         if (ret != 0) {
2633                 D_ERR("control SET_BAN_STATE failed, ret=%d\n", ret);
2634         }
2635
2636 done:
2637         tevent_req_error(req, EIO);
2638 }
2639
2640 static void recovery_normal_done(struct tevent_req *subreq)
2641 {
2642         struct tevent_req *req = tevent_req_callback_data(
2643                 subreq, struct tevent_req);
2644         struct recovery_state *state = tevent_req_data(
2645                 req, struct recovery_state);
2646         struct ctdb_req_control request;
2647         int *err_list;
2648         int ret;
2649         bool status;
2650
2651         status = ctdb_client_control_multi_recv(subreq, &ret, state, &err_list,
2652                                                 NULL);
2653         TALLOC_FREE(subreq);
2654         if (! status) {
2655                 int ret2;
2656                 uint32_t pnn;
2657
2658                 ret2 = ctdb_client_control_multi_error(state->pnn_list,
2659                                                        state->count,
2660                                                        err_list, &pnn);
2661                 if (ret2 != 0) {
2662                         D_ERR("failed to set recovery mode NORMAL on node %u,"
2663                               " ret=%d\n", pnn, ret2);
2664                 } else {
2665                         D_ERR("failed to set recovery mode NORMAL, ret=%d\n",
2666                               ret);
2667                 }
2668                 tevent_req_error(req, ret);
2669                 return;
2670         }
2671
2672         D_ERR("Set recovery mode to NORMAL\n");
2673
2674         ctdb_req_control_end_recovery(&request);
2675         subreq = ctdb_client_control_multi_send(state, state->ev,
2676                                                 state->client,
2677                                                 state->pnn_list, state->count,
2678                                                 TIMEOUT(), &request);
2679         if (tevent_req_nomem(subreq, req)) {
2680                 return;
2681         }
2682         tevent_req_set_callback(subreq, recovery_end_recovery_done, req);
2683 }
2684
2685 static void recovery_end_recovery_done(struct tevent_req *subreq)
2686 {
2687         struct tevent_req *req = tevent_req_callback_data(
2688                 subreq, struct tevent_req);
2689         struct recovery_state *state = tevent_req_data(
2690                 req, struct recovery_state);
2691         int *err_list;
2692         int ret;
2693         bool status;
2694
2695         status = ctdb_client_control_multi_recv(subreq, &ret, state, &err_list,
2696                                                 NULL);
2697         TALLOC_FREE(subreq);
2698         if (! status) {
2699                 int ret2;
2700                 uint32_t pnn;
2701
2702                 ret2 = ctdb_client_control_multi_error(state->pnn_list,
2703                                                        state->count,
2704                                                        err_list, &pnn);
2705                 if (ret2 != 0) {
2706                         D_ERR("failed to run recovered event on node %u,"
2707                               " ret=%d\n", pnn, ret2);
2708                 } else {
2709                         D_ERR("failed to run recovered event, ret=%d\n", ret);
2710                 }
2711                 tevent_req_error(req, ret);
2712                 return;
2713         }
2714
2715         D_ERR("recovered event finished\n");
2716
2717         tevent_req_done(req);
2718 }
2719
/*
 * Collect the result of a recovery request.
 *
 * If perr is not NULL, *perr is always written: it receives the error
 * code of a failed request, or 0 when the request succeeded.  Callers
 * can therefore test *perr without having to initialise it first.
 */
static void recovery_recv(struct tevent_req *req, int *perr)
{
        int err = 0;

        /*
         * generic_recv() stores a code only when the request failed,
         * so err is still 0 after a successful request.
         */
        generic_recv(req, &err);

        if (perr != NULL) {
                *perr = err;
        }
}
2724
/*
 * Write the expected command line of this helper to stderr.
 * progname is the program name to show in the message.
 */
static void usage(const char *progname)
{
        fprintf(stderr,
                "\nUsage: %s <output-fd> <ctdb-socket-path> <generation>\n",
                progname);
}
2730
2731
2732 /*
2733  * Arguments - log fd, write fd, socket path, generation
2734  */
2735 int main(int argc, char *argv[])
2736 {
2737         int write_fd;
2738         const char *sockpath;
2739         TALLOC_CTX *mem_ctx;
2740         struct tevent_context *ev;
2741         struct ctdb_client_context *client;
2742         int ret;
2743         struct tevent_req *req;
2744         uint32_t generation;
2745
2746         if (argc != 4) {
2747                 usage(argv[0]);
2748                 exit(1);
2749         }
2750
2751         write_fd = atoi(argv[1]);
2752         sockpath = argv[2];
2753         generation = (uint32_t)strtoul(argv[3], NULL, 0);
2754
2755         mem_ctx = talloc_new(NULL);
2756         if (mem_ctx == NULL) {
2757                 fprintf(stderr, "recovery: talloc_new() failed\n");
2758                 goto failed;
2759         }
2760
2761         ret = logging_init(mem_ctx, NULL, NULL, "ctdb-recovery");
2762         if (ret != 0) {
2763                 fprintf(stderr, "recovery: Unable to initialize logging\n");
2764                 goto failed;
2765         }
2766
2767         ev = tevent_context_init(mem_ctx);
2768         if (ev == NULL) {
2769                 D_ERR("tevent_context_init() failed\n");
2770                 goto failed;
2771         }
2772
2773         ret = ctdb_client_init(mem_ctx, ev, sockpath, &client);
2774         if (ret != 0) {
2775                 D_ERR("ctdb_client_init() failed, ret=%d\n", ret);
2776                 goto failed;
2777         }
2778
2779         req = recovery_send(mem_ctx, ev, client, generation);
2780         if (req == NULL) {
2781                 D_ERR("database_recover_send() failed\n");
2782                 goto failed;
2783         }
2784
2785         if (! tevent_req_poll(req, ev)) {
2786                 D_ERR("tevent_req_poll() failed\n");
2787                 goto failed;
2788         }
2789
2790         recovery_recv(req, &ret);
2791         TALLOC_FREE(req);
2792         if (ret != 0) {
2793                 D_ERR("database recovery failed, ret=%d\n", ret);
2794                 goto failed;
2795         }
2796
2797         sys_write(write_fd, &ret, sizeof(ret));
2798         return 0;
2799
2800 failed:
2801         TALLOC_FREE(mem_ctx);
2802         return 1;
2803 }