ctdb-recovery: Update timeout and number of retries during recovery
[samba.git] / ctdb / server / ctdb_recovery_helper.c
1 /*
2    ctdb parallel database recovery
3
4    Copyright (C) Amitay Isaacs  2015
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "replace.h"
21 #include "system/network.h"
22 #include "system/filesys.h"
23
24 #include <talloc.h>
25 #include <tevent.h>
26 #include <tdb.h>
27 #include <libgen.h>
28
29 #include "lib/tdb_wrap/tdb_wrap.h"
30 #include "lib/util/time.h"
31 #include "lib/util/tevent_unix.h"
32
33 #include "protocol/protocol.h"
34 #include "protocol/protocol_api.h"
35 #include "client/client.h"
36
/*
 * Value handed to timeval_current_ofs() as its first argument by TIMEOUT()
 * (presumably seconds -- confirm against lib/util/time.h).
 */
static int recover_timeout = 30;

/* Retry count; no user of this macro is visible in this part of the file. */
#define NUM_RETRIES     3

/* Deadline for a single control, relative to the current time. */
#define TIMEOUT()       (timeval_current_ofs(recover_timeout, 0))
42
/*
 * Write a printf-style formatted message to stderr.
 *
 * fmt - printf format string
 * ... - arguments consumed by fmt
 */
static void LOG(const char *fmt, ...)
{
        va_list args;

        va_start(args, fmt);
        (void)vfprintf(stderr, fmt, args);
        va_end(args);
}
51
52 /*
53  * Utility functions
54  */
55
/*
 * write(2) wrapper that retries while the call fails with EINTR, EAGAIN
 * or (where defined) EWOULDBLOCK.
 *
 * Returns whatever the last write() returned: the number of bytes written
 * (possibly fewer than count) or -1 with errno set.
 */
static ssize_t sys_write(int fd, const void *buf, size_t count)
{
        for (;;) {
                ssize_t nwritten = write(fd, buf, count);

                if (nwritten != -1) {
                        return nwritten;
                }

                if (errno == EINTR || errno == EAGAIN) {
                        continue;
                }
#if defined(EWOULDBLOCK)
                if (errno == EWOULDBLOCK) {
                        continue;
                }
#endif
                return nwritten;
        }
}
69
/*
 * Common receive helper for the tevent requests in this file.
 *
 * Returns true when the request carries no unix error.  Otherwise returns
 * false and, if perr is not NULL, stores the error code in *perr.
 */
static bool generic_recv(struct tevent_req *req, int *perr)
{
        int unix_err;

        if (!tevent_req_is_unix_error(req, &unix_err)) {
                return true;
        }

        if (perr != NULL) {
                *perr = unix_err;
        }
        return false;
}
83
84 static uint64_t rec_srvid = CTDB_SRVID_RECOVERY;
85
86 static uint64_t srvid_next(void)
87 {
88         rec_srvid += 1;
89         return rec_srvid;
90 }
91
92 /*
93  * Recovery database functions
94  */
95
/* State of one temporary recovery database. */
struct recdb_context {
        /* id of the database being recovered */
        uint32_t db_id;
        /* database name; recdb_create() stores the caller's pointer */
        const char *db_name;
        /* path of the recovery tdb file, "<dir>/recdb.<db_name>" */
        const char *db_path;
        /* handle returned by tdb_wrap_open() for db_path */
        struct tdb_wrap *db;
        /* true if the database being recovered is persistent */
        bool persistent;
};
103
104 static struct recdb_context *recdb_create(TALLOC_CTX *mem_ctx, uint32_t db_id,
105                                           const char *db_name,
106                                           const char *db_path,
107                                           uint32_t hash_size, bool persistent)
108 {
109         static char *db_dir_state = NULL;
110         struct recdb_context *recdb;
111         unsigned int tdb_flags;
112
113         recdb = talloc(mem_ctx, struct recdb_context);
114         if (recdb == NULL) {
115                 return NULL;
116         }
117
118         if (db_dir_state == NULL) {
119                 db_dir_state = getenv("CTDB_DBDIR_STATE");
120         }
121
122         recdb->db_name = db_name;
123         recdb->db_id = db_id;
124         recdb->db_path = talloc_asprintf(recdb, "%s/recdb.%s",
125                                          db_dir_state != NULL ?
126                                             db_dir_state :
127                                             dirname(discard_const(db_path)),
128                                          db_name);
129         if (recdb->db_path == NULL) {
130                 talloc_free(recdb);
131                 return NULL;
132         }
133         unlink(recdb->db_path);
134
135         tdb_flags = TDB_NOLOCK | TDB_INCOMPATIBLE_HASH | TDB_DISALLOW_NESTING;
136         recdb->db = tdb_wrap_open(mem_ctx, recdb->db_path, hash_size,
137                                   tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
138         if (recdb->db == NULL) {
139                 talloc_free(recdb);
140                 LOG("failed to create recovery db %s\n", recdb->db_path);
141         }
142
143         recdb->persistent = persistent;
144
145         return recdb;
146 }
147
148 static uint32_t recdb_id(struct recdb_context *recdb)
149 {
150         return recdb->db_id;
151 }
152
153 static const char *recdb_name(struct recdb_context *recdb)
154 {
155         return recdb->db_name;
156 }
157
158 static const char *recdb_path(struct recdb_context *recdb)
159 {
160         return recdb->db_path;
161 }
162
163 static struct tdb_context *recdb_tdb(struct recdb_context *recdb)
164 {
165         return recdb->db->tdb;
166 }
167
168 static bool recdb_persistent(struct recdb_context *recdb)
169 {
170         return recdb->persistent;
171 }
172
/* Private data handed to recdb_add_traverse(). */
struct recdb_add_traverse_state {
        /* recovery database receiving the records */
        struct recdb_context *recdb;
        /* pnn compared against the dmaster of an existing record */
        int mypnn;
};
177
178 static int recdb_add_traverse(uint32_t reqid, struct ctdb_ltdb_header *header,
179                               TDB_DATA key, TDB_DATA data,
180                               void *private_data)
181 {
182         struct recdb_add_traverse_state *state =
183                 (struct recdb_add_traverse_state *)private_data;
184         struct ctdb_ltdb_header *hdr;
185         TDB_DATA prev_data;
186         int ret;
187
188         /* header is not marshalled separately in the pulldb control */
189         if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
190                 return -1;
191         }
192
193         hdr = (struct ctdb_ltdb_header *)data.dptr;
194
195         /* fetch the existing record, if any */
196         prev_data = tdb_fetch(recdb_tdb(state->recdb), key);
197
198         if (prev_data.dptr != NULL) {
199                 struct ctdb_ltdb_header prev_hdr;
200
201                 prev_hdr = *(struct ctdb_ltdb_header *)prev_data.dptr;
202                 free(prev_data.dptr);
203                 if (hdr->rsn < prev_hdr.rsn ||
204                     (hdr->rsn == prev_hdr.rsn &&
205                      prev_hdr.dmaster != state->mypnn)) {
206                         return 0;
207                 }
208         }
209
210         ret = tdb_store(recdb_tdb(state->recdb), key, data, TDB_REPLACE);
211         if (ret != 0) {
212                 return -1;
213         }
214         return 0;
215 }
216
217 static bool recdb_add(struct recdb_context *recdb, int mypnn,
218                       struct ctdb_rec_buffer *recbuf)
219 {
220         struct recdb_add_traverse_state state;
221         int ret;
222
223         state.recdb = recdb;
224         state.mypnn = mypnn;
225
226         ret = ctdb_rec_buffer_traverse(recbuf, recdb_add_traverse, &state);
227         if (ret != 0) {
228                 return false;
229         }
230
231         return true;
232 }
233
234 /* This function decides which records from recdb are retained */
235 static int recbuf_filter_add(struct ctdb_rec_buffer *recbuf, bool persistent,
236                              uint32_t reqid, uint32_t dmaster,
237                              TDB_DATA key, TDB_DATA data)
238 {
239         struct ctdb_ltdb_header *header;
240         int ret;
241
242         /*
243          * skip empty records - but NOT for persistent databases:
244          *
245          * The record-by-record mode of recovery deletes empty records.
246          * For persistent databases, this can lead to data corruption
247          * by deleting records that should be there:
248          *
249          * - Assume the cluster has been running for a while.
250          *
251          * - A record R in a persistent database has been created and
252          *   deleted a couple of times, the last operation being deletion,
253          *   leaving an empty record with a high RSN, say 10.
254          *
255          * - Now a node N is turned off.
256          *
257          * - This leaves the local database copy of D on N with the empty
258          *   copy of R and RSN 10. On all other nodes, the recovery has deleted
259          *   the copy of record R.
260          *
261          * - Now the record is created again while node N is turned off.
262          *   This creates R with RSN = 1 on all nodes except for N.
263          *
264          * - Now node N is turned on again. The following recovery will chose
265          *   the older empty copy of R due to RSN 10 > RSN 1.
266          *
267          * ==> Hence the record is gone after the recovery.
268          *
269          * On databases like Samba's registry, this can damage the higher-level
270          * data structures built from the various tdb-level records.
271          */
272         if (!persistent && data.dsize <= sizeof(struct ctdb_ltdb_header)) {
273                 return 0;
274         }
275
276         /* update the dmaster field to point to us */
277         header = (struct ctdb_ltdb_header *)data.dptr;
278         if (!persistent) {
279                 header->dmaster = dmaster;
280                 header->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
281         }
282
283         ret = ctdb_rec_buffer_add(recbuf, recbuf, reqid, NULL, key, data);
284         if (ret != 0) {
285                 return ret;
286         }
287
288         return 0;
289 }
290
/* Private data handed to recdb_records_traverse(). */
struct recdb_records_traverse_state {
        /* buffer that collects the retained records */
        struct ctdb_rec_buffer *recbuf;
        /* dmaster written into non-persistent record headers */
        uint32_t dmaster;
        /* reqid passed on to ctdb_rec_buffer_add() */
        uint32_t reqid;
        /* whether the database is persistent */
        bool persistent;
        /* set when adding a record failed */
        bool failed;
};
298
299 static int recdb_records_traverse(struct tdb_context *tdb,
300                                   TDB_DATA key, TDB_DATA data,
301                                   void *private_data)
302 {
303         struct recdb_records_traverse_state *state =
304                 (struct recdb_records_traverse_state *)private_data;
305         int ret;
306
307         ret = recbuf_filter_add(state->recbuf, state->persistent,
308                                 state->reqid, state->dmaster, key, data);
309         if (ret != 0) {
310                 state->failed = true;
311                 return ret;
312         }
313
314         return 0;
315 }
316
317 static struct ctdb_rec_buffer *recdb_records(struct recdb_context *recdb,
318                                              TALLOC_CTX *mem_ctx,
319                                              uint32_t dmaster)
320 {
321         struct recdb_records_traverse_state state;
322         int ret;
323
324         state.recbuf = ctdb_rec_buffer_init(mem_ctx, recdb_id(recdb));
325         if (state.recbuf == NULL) {
326                 return NULL;
327         }
328         state.dmaster = dmaster;
329         state.reqid = 0;
330         state.persistent = recdb_persistent(recdb);
331         state.failed = false;
332
333         ret = tdb_traverse_read(recdb_tdb(recdb), recdb_records_traverse,
334                                 &state);
335         if (ret == -1 || state.failed) {
336                 LOG("Failed to marshall recovery records for %s\n",
337                     recdb_name(recdb));
338                 TALLOC_FREE(state.recbuf);
339                 return NULL;
340         }
341
342         return state.recbuf;
343 }
344
345 struct recdb_file_traverse_state {
346         struct ctdb_rec_buffer *recbuf;
347         struct recdb_context *recdb;
348         TALLOC_CTX *mem_ctx;
349         uint32_t dmaster;
350         uint32_t reqid;
351         bool persistent;
352         bool failed;
353         int fd;
354         int max_size;
355         int num_buffers;
356 };
357
358 static int recdb_file_traverse(struct tdb_context *tdb,
359                                TDB_DATA key, TDB_DATA data,
360                                void *private_data)
361 {
362         struct recdb_file_traverse_state *state =
363                 (struct recdb_file_traverse_state *)private_data;
364         int ret;
365
366         ret = recbuf_filter_add(state->recbuf, state->persistent,
367                                 state->reqid, state->dmaster, key, data);
368         if (ret != 0) {
369                 state->failed = true;
370                 return ret;
371         }
372
373         if (ctdb_rec_buffer_len(state->recbuf) > state->max_size) {
374                 ret = ctdb_rec_buffer_write(state->recbuf, state->fd);
375                 if (ret != 0) {
376                         LOG("Failed to collect recovery records for %s\n",
377                             recdb_name(state->recdb));
378                         state->failed = true;
379                         return ret;
380                 }
381
382                 state->num_buffers += 1;
383
384                 TALLOC_FREE(state->recbuf);
385                 state->recbuf = ctdb_rec_buffer_init(state->mem_ctx,
386                                                      recdb_id(state->recdb));
387                 if (state->recbuf == NULL) {
388                         state->failed = true;
389                         return ENOMEM;
390                 }
391         }
392
393         return 0;
394 }
395
396 static int recdb_file(struct recdb_context *recdb, TALLOC_CTX *mem_ctx,
397                       uint32_t dmaster, int fd, int max_size)
398 {
399         struct recdb_file_traverse_state state;
400         int ret;
401
402         state.recbuf = ctdb_rec_buffer_init(mem_ctx, recdb_id(recdb));
403         if (state.recbuf == NULL) {
404                 return -1;
405         }
406         state.recdb = recdb;
407         state.mem_ctx = mem_ctx;
408         state.dmaster = dmaster;
409         state.reqid = 0;
410         state.persistent = recdb_persistent(recdb);
411         state.failed = false;
412         state.fd = fd;
413         state.max_size = max_size;
414         state.num_buffers = 0;
415
416         ret = tdb_traverse_read(recdb_tdb(recdb), recdb_file_traverse, &state);
417         if (ret == -1 || state.failed) {
418                 TALLOC_FREE(state.recbuf);
419                 return -1;
420         }
421
422         ret = ctdb_rec_buffer_write(state.recbuf, fd);
423         if (ret != 0) {
424                 LOG("Failed to collect recovery records for %s\n",
425                     recdb_name(recdb));
426                 TALLOC_FREE(state.recbuf);
427                 return -1;
428         }
429         state.num_buffers += 1;
430
431         LOG("Wrote %d buffers of recovery records for %s\n",
432             state.num_buffers, recdb_name(recdb));
433
434         return state.num_buffers;
435 }
436
437 /*
438  * Pull database from a single node
439  */
440
441 struct pull_database_state {
442         struct tevent_context *ev;
443         struct ctdb_client_context *client;
444         struct recdb_context *recdb;
445         uint32_t pnn;
446         uint64_t srvid;
447         int num_records;
448 };
449
450 static void pull_database_handler(uint64_t srvid, TDB_DATA data,
451                                   void *private_data);
452 static void pull_database_register_done(struct tevent_req *subreq);
453 static void pull_database_old_done(struct tevent_req *subreq);
454 static void pull_database_unregister_done(struct tevent_req *subreq);
455 static void pull_database_new_done(struct tevent_req *subreq);
456
457 static struct tevent_req *pull_database_send(
458                         TALLOC_CTX *mem_ctx,
459                         struct tevent_context *ev,
460                         struct ctdb_client_context *client,
461                         uint32_t pnn, uint32_t caps,
462                         struct recdb_context *recdb)
463 {
464         struct tevent_req *req, *subreq;
465         struct pull_database_state *state;
466         struct ctdb_req_control request;
467
468         req = tevent_req_create(mem_ctx, &state, struct pull_database_state);
469         if (req == NULL) {
470                 return NULL;
471         }
472
473         state->ev = ev;
474         state->client = client;
475         state->recdb = recdb;
476         state->pnn = pnn;
477         state->srvid = srvid_next();
478
479         if (caps & CTDB_CAP_FRAGMENTED_CONTROLS) {
480                 subreq = ctdb_client_set_message_handler_send(
481                                         state, state->ev, state->client,
482                                         state->srvid, pull_database_handler,
483                                         req);
484                 if (tevent_req_nomem(subreq, req)) {
485                         return tevent_req_post(req, ev);
486                 }
487
488                 tevent_req_set_callback(subreq, pull_database_register_done,
489                                         req);
490
491         } else {
492                 struct ctdb_pulldb pulldb;
493
494                 pulldb.db_id = recdb_id(recdb);
495                 pulldb.lmaster = CTDB_LMASTER_ANY;
496
497                 ctdb_req_control_pull_db(&request, &pulldb);
498                 subreq = ctdb_client_control_send(state, state->ev,
499                                                   state->client,
500                                                   pnn, TIMEOUT(),
501                                                   &request);
502                 if (tevent_req_nomem(subreq, req)) {
503                         return tevent_req_post(req, ev);
504                 }
505                 tevent_req_set_callback(subreq, pull_database_old_done, req);
506         }
507
508         return req;
509 }
510
511 static void pull_database_handler(uint64_t srvid, TDB_DATA data,
512                                   void *private_data)
513 {
514         struct tevent_req *req = talloc_get_type_abort(
515                 private_data, struct tevent_req);
516         struct pull_database_state *state = tevent_req_data(
517                 req, struct pull_database_state);
518         struct ctdb_rec_buffer *recbuf;
519         int ret;
520         bool status;
521
522         if (srvid != state->srvid) {
523                 return;
524         }
525
526         ret = ctdb_rec_buffer_pull(data.dptr, data.dsize, state, &recbuf);
527         if (ret != 0) {
528                 LOG("Invalid data received for DB_PULL messages\n");
529                 return;
530         }
531
532         if (recbuf->db_id != recdb_id(state->recdb)) {
533                 talloc_free(recbuf);
534                 LOG("Invalid dbid:%08x for DB_PULL messages for %s\n",
535                     recbuf->db_id, recdb_name(state->recdb));
536                 return;
537         }
538
539         status = recdb_add(state->recdb, ctdb_client_pnn(state->client),
540                            recbuf);
541         if (! status) {
542                 talloc_free(recbuf);
543                 LOG("Failed to add records to recdb for %s\n",
544                     recdb_name(state->recdb));
545                 return;
546         }
547
548         state->num_records += recbuf->count;
549         talloc_free(recbuf);
550 }
551
552 static void pull_database_register_done(struct tevent_req *subreq)
553 {
554         struct tevent_req *req = tevent_req_callback_data(
555                 subreq, struct tevent_req);
556         struct pull_database_state *state = tevent_req_data(
557                 req, struct pull_database_state);
558         struct ctdb_req_control request;
559         struct ctdb_pulldb_ext pulldb_ext;
560         int ret;
561         bool status;
562
563         status = ctdb_client_set_message_handler_recv(subreq, &ret);
564         TALLOC_FREE(subreq);
565         if (! status) {
566                 LOG("failed to set message handler for DB_PULL for %s\n",
567                     recdb_name(state->recdb));
568                 tevent_req_error(req, ret);
569                 return;
570         }
571
572         pulldb_ext.db_id = recdb_id(state->recdb);
573         pulldb_ext.lmaster = CTDB_LMASTER_ANY;
574         pulldb_ext.srvid = state->srvid;
575
576         ctdb_req_control_db_pull(&request, &pulldb_ext);
577         subreq = ctdb_client_control_send(state, state->ev, state->client,
578                                           state->pnn, TIMEOUT(), &request);
579         if (tevent_req_nomem(subreq, req)) {
580                 return;
581         }
582         tevent_req_set_callback(subreq, pull_database_new_done, req);
583 }
584
585 static void pull_database_old_done(struct tevent_req *subreq)
586 {
587         struct tevent_req *req = tevent_req_callback_data(
588                 subreq, struct tevent_req);
589         struct pull_database_state *state = tevent_req_data(
590                 req, struct pull_database_state);
591         struct ctdb_reply_control *reply;
592         struct ctdb_rec_buffer *recbuf;
593         int ret;
594         bool status;
595
596         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
597         TALLOC_FREE(subreq);
598         if (! status) {
599                 LOG("control PULL_DB failed for %s on node %u, ret=%d\n",
600                     recdb_name(state->recdb), state->pnn, ret);
601                 tevent_req_error(req, ret);
602                 return;
603         }
604
605         ret = ctdb_reply_control_pull_db(reply, state, &recbuf);
606         talloc_free(reply);
607         if (ret != 0) {
608                 tevent_req_error(req, ret);
609                 return;
610         }
611
612         status = recdb_add(state->recdb, ctdb_client_pnn(state->client),
613                            recbuf);
614         if (! status) {
615                 talloc_free(recbuf);
616                 tevent_req_error(req, EIO);
617                 return;
618         }
619
620         state->num_records = recbuf->count;
621         talloc_free(recbuf);
622
623         LOG("Pulled %d records for db %s from node %d\n",
624             state->num_records, recdb_name(state->recdb), state->pnn);
625
626         tevent_req_done(req);
627 }
628
629 static void pull_database_new_done(struct tevent_req *subreq)
630 {
631         struct tevent_req *req = tevent_req_callback_data(
632                 subreq, struct tevent_req);
633         struct pull_database_state *state = tevent_req_data(
634                 req, struct pull_database_state);
635         struct ctdb_reply_control *reply;
636         uint32_t num_records;
637         int ret;
638         bool status;
639
640         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
641         TALLOC_FREE(subreq);
642         if (! status) {
643                 LOG("control DB_PULL failed for %s on node %u, ret=%d\n",
644                     recdb_name(state->recdb), state->pnn, ret);
645                 tevent_req_error(req, ret);
646                 return;
647         }
648
649         ret = ctdb_reply_control_db_pull(reply, &num_records);
650         talloc_free(reply);
651         if (num_records != state->num_records) {
652                 LOG("mismatch (%u != %u) in DB_PULL records for %s\n",
653                     num_records, state->num_records, recdb_name(state->recdb));
654                 tevent_req_error(req, EIO);
655                 return;
656         }
657
658         LOG("Pulled %d records for db %s from node %d\n",
659             state->num_records, recdb_name(state->recdb), state->pnn);
660
661         subreq = ctdb_client_remove_message_handler_send(
662                                         state, state->ev, state->client,
663                                         state->srvid, req);
664         if (tevent_req_nomem(subreq, req)) {
665                 return;
666         }
667         tevent_req_set_callback(subreq, pull_database_unregister_done, req);
668 }
669
670 static void pull_database_unregister_done(struct tevent_req *subreq)
671 {
672         struct tevent_req *req = tevent_req_callback_data(
673                 subreq, struct tevent_req);
674         struct pull_database_state *state = tevent_req_data(
675                 req, struct pull_database_state);
676         int ret;
677         bool status;
678
679         status = ctdb_client_remove_message_handler_recv(subreq, &ret);
680         TALLOC_FREE(subreq);
681         if (! status) {
682                 LOG("failed to remove message handler for DB_PULL for %s\n",
683                     recdb_name(state->recdb));
684                 tevent_req_error(req, ret);
685                 return;
686         }
687
688         tevent_req_done(req);
689 }
690
/*
 * Collect the result of a pull_database_send() request.
 *
 * Returns true on success; on failure returns false and stores the error
 * in *perr when perr is not NULL.
 */
static bool pull_database_recv(struct tevent_req *req, int *perr)
{
        bool ok = generic_recv(req, perr);

        return ok;
}
695
696 /*
697  * Push database to specified nodes (old style)
698  */
699
/* State of one old-style (PUSH_DB) push request. */
struct push_database_old_state {
        struct tevent_context *ev;
        struct ctdb_client_context *client;
        /* recovery database whose records are pushed */
        struct recdb_context *recdb;
        /* nodes to push to, count entries */
        uint32_t *pnn_list;
        int count;
        /* all records, marshalled once and sent to every node */
        struct ctdb_rec_buffer *recbuf;
        /* index into pnn_list of the node currently being pushed to */
        int index;
};

/* completion callback for each PUSH_DB control */
static void push_database_old_push_done(struct tevent_req *subreq);
711
712 static struct tevent_req *push_database_old_send(
713                         TALLOC_CTX *mem_ctx,
714                         struct tevent_context *ev,
715                         struct ctdb_client_context *client,
716                         uint32_t *pnn_list, int count,
717                         struct recdb_context *recdb)
718 {
719         struct tevent_req *req, *subreq;
720         struct push_database_old_state *state;
721         struct ctdb_req_control request;
722         uint32_t pnn;
723
724         req = tevent_req_create(mem_ctx, &state,
725                                 struct push_database_old_state);
726         if (req == NULL) {
727                 return NULL;
728         }
729
730         state->ev = ev;
731         state->client = client;
732         state->recdb = recdb;
733         state->pnn_list = pnn_list;
734         state->count = count;
735         state->index = 0;
736
737         state->recbuf = recdb_records(recdb, state,
738                                       ctdb_client_pnn(client));
739         if (tevent_req_nomem(state->recbuf, req)) {
740                 return tevent_req_post(req, ev);
741         }
742
743         pnn = state->pnn_list[state->index];
744
745         ctdb_req_control_push_db(&request, state->recbuf);
746         subreq = ctdb_client_control_send(state, ev, client, pnn,
747                                           TIMEOUT(), &request);
748         if (tevent_req_nomem(subreq, req)) {
749                 return tevent_req_post(req, ev);
750         }
751         tevent_req_set_callback(subreq, push_database_old_push_done, req);
752
753         return req;
754 }
755
756 static void push_database_old_push_done(struct tevent_req *subreq)
757 {
758         struct tevent_req *req = tevent_req_callback_data(
759                 subreq, struct tevent_req);
760         struct push_database_old_state *state = tevent_req_data(
761                 req, struct push_database_old_state);
762         struct ctdb_req_control request;
763         uint32_t pnn;
764         int ret;
765         bool status;
766
767         status = ctdb_client_control_recv(subreq, &ret, NULL, NULL);
768         TALLOC_FREE(subreq);
769         if (! status) {
770                 LOG("control PUSH_DB failed for db %s on node %u, ret=%d\n",
771                     recdb_name(state->recdb), state->pnn_list[state->index],
772                     ret);
773                 tevent_req_error(req, ret);
774                 return;
775         }
776
777         state->index += 1;
778         if (state->index == state->count) {
779                 TALLOC_FREE(state->recbuf);
780                 tevent_req_done(req);
781                 return;
782         }
783
784         pnn = state->pnn_list[state->index];
785
786         ctdb_req_control_push_db(&request, state->recbuf);
787         subreq = ctdb_client_control_send(state, state->ev, state->client,
788                                           pnn, TIMEOUT(), &request);
789         if (tevent_req_nomem(subreq, req)) {
790                 return;
791         }
792         tevent_req_set_callback(subreq, push_database_old_push_done, req);
793 }
794
/*
 * Collect the result of a push_database_old_send() request.
 *
 * Returns true on success; on failure returns false and stores the error
 * in *perr when perr is not NULL.
 */
static bool push_database_old_recv(struct tevent_req *req, int *perr)
{
        bool ok = generic_recv(req, perr);

        return ok;
}
799
800 /*
801  * Push database to specified nodes (new style)
802  */
803
/* State of one new-style (DB_PUSH_START based) push request. */
struct push_database_new_state {
        struct tevent_context *ev;
        struct ctdb_client_context *client;
        /* recovery database whose records are pushed */
        struct recdb_context *recdb;
        /* nodes to push to, count entries */
        uint32_t *pnn_list;
        int count;
        /* srvid announced to the nodes in DB_PUSH_START */
        uint64_t srvid;
        /* pnn of this node, written into record headers */
        uint32_t dmaster;
        /* temporary file holding the marshalled record buffers */
        int fd;
        /* number of buffers written to fd */
        int num_buffers;
        /* number of buffers sent so far */
        int num_buffers_sent;
        int num_records;
};

static void push_database_new_started(struct tevent_req *subreq);
static void push_database_new_send_msg(struct tevent_req *req);
static void push_database_new_send_done(struct tevent_req *subreq);
static void push_database_new_confirmed(struct tevent_req *subreq);
822
823 static struct tevent_req *push_database_new_send(
824                         TALLOC_CTX *mem_ctx,
825                         struct tevent_context *ev,
826                         struct ctdb_client_context *client,
827                         uint32_t *pnn_list, int count,
828                         struct recdb_context *recdb,
829                         int max_size)
830 {
831         struct tevent_req *req, *subreq;
832         struct push_database_new_state *state;
833         struct ctdb_req_control request;
834         struct ctdb_pulldb_ext pulldb_ext;
835         char *filename;
836         off_t offset;
837
838         req = tevent_req_create(mem_ctx, &state,
839                                 struct push_database_new_state);
840         if (req == NULL) {
841                 return NULL;
842         }
843
844         state->ev = ev;
845         state->client = client;
846         state->recdb = recdb;
847         state->pnn_list = pnn_list;
848         state->count = count;
849
850         state->srvid = srvid_next();
851         state->dmaster = ctdb_client_pnn(client);
852         state->num_buffers_sent = 0;
853         state->num_records = 0;
854
855         filename = talloc_asprintf(state, "%s.dat", recdb_path(recdb));
856         if (tevent_req_nomem(filename, req)) {
857                 return tevent_req_post(req, ev);
858         }
859
860         state->fd = open(filename, O_RDWR|O_CREAT, 0644);
861         if (state->fd == -1) {
862                 tevent_req_error(req, errno);
863                 return tevent_req_post(req, ev);
864         }
865         unlink(filename);
866         talloc_free(filename);
867
868         state->num_buffers = recdb_file(recdb, state, state->dmaster,
869                                         state->fd, max_size);
870         if (state->num_buffers == -1) {
871                 tevent_req_error(req, ENOMEM);
872                 return tevent_req_post(req, ev);
873         }
874
875         offset = lseek(state->fd, 0, SEEK_SET);
876         if (offset != 0) {
877                 tevent_req_error(req, EIO);
878                 return tevent_req_post(req, ev);
879         }
880
881         pulldb_ext.db_id = recdb_id(recdb);
882         pulldb_ext.srvid = state->srvid;
883
884         ctdb_req_control_db_push_start(&request, &pulldb_ext);
885         subreq = ctdb_client_control_multi_send(state, ev, client,
886                                                 pnn_list, count,
887                                                 TIMEOUT(), &request);
888         if (tevent_req_nomem(subreq, req)) {
889                 return tevent_req_post(req, ev);
890         }
891         tevent_req_set_callback(subreq, push_database_new_started, req);
892
893         return req;
894 }
895
896 static void push_database_new_started(struct tevent_req *subreq)
897 {
898         struct tevent_req *req = tevent_req_callback_data(
899                 subreq, struct tevent_req);
900         struct push_database_new_state *state = tevent_req_data(
901                 req, struct push_database_new_state);
902         int *err_list;
903         int ret;
904         bool status;
905
906         status = ctdb_client_control_multi_recv(subreq, &ret, state,
907                                                 &err_list, NULL);
908         TALLOC_FREE(subreq);
909         if (! status) {
910                 int ret2;
911                 uint32_t pnn;
912
913                 ret2 = ctdb_client_control_multi_error(state->pnn_list,
914                                                        state->count,
915                                                        err_list, &pnn);
916                 if (ret2 != 0) {
917                         LOG("control DB_PUSH_START failed for db %s "
918                             "on node %u, ret=%d\n",
919                             recdb_name(state->recdb), pnn, ret2);
920                 } else {
921                         LOG("control DB_PUSH_START failed for db %s, ret=%d\n",
922                             recdb_name(state->recdb), ret);
923                 }
924                 talloc_free(err_list);
925
926                 tevent_req_error(req, ret);
927                 return;
928         }
929
930         push_database_new_send_msg(req);
931 }
932
933 static void push_database_new_send_msg(struct tevent_req *req)
934 {
935         struct push_database_new_state *state = tevent_req_data(
936                 req, struct push_database_new_state);
937         struct tevent_req *subreq;
938         struct ctdb_rec_buffer *recbuf;
939         struct ctdb_req_message message;
940         TDB_DATA data;
941         int ret;
942
943         if (state->num_buffers_sent == state->num_buffers) {
944                 struct ctdb_req_control request;
945
946                 ctdb_req_control_db_push_confirm(&request,
947                                                  recdb_id(state->recdb));
948                 subreq = ctdb_client_control_multi_send(state, state->ev,
949                                                         state->client,
950                                                         state->pnn_list,
951                                                         state->count,
952                                                         TIMEOUT(), &request);
953                 if (tevent_req_nomem(subreq, req)) {
954                         return;
955                 }
956                 tevent_req_set_callback(subreq, push_database_new_confirmed,
957                                         req);
958                 return;
959         }
960
961         ret = ctdb_rec_buffer_read(state->fd, state, &recbuf);
962         if (ret != 0) {
963                 tevent_req_error(req, ret);
964                 return;
965         }
966
967         data.dsize = ctdb_rec_buffer_len(recbuf);
968         data.dptr = talloc_size(state, data.dsize);
969         if (tevent_req_nomem(data.dptr, req)) {
970                 return;
971         }
972
973         ctdb_rec_buffer_push(recbuf, data.dptr);
974
975         message.srvid = state->srvid;
976         message.data.data = data;
977
978         LOG("Pushing buffer %d with %d records for %s\n",
979             state->num_buffers_sent, recbuf->count, recdb_name(state->recdb));
980
981         subreq = ctdb_client_message_multi_send(state, state->ev,
982                                                 state->client,
983                                                 state->pnn_list, state->count,
984                                                 &message);
985         if (tevent_req_nomem(subreq, req)) {
986                 return;
987         }
988         tevent_req_set_callback(subreq, push_database_new_send_done, req);
989
990         state->num_records += recbuf->count;
991
992         talloc_free(data.dptr);
993         talloc_free(recbuf);
994 }
995
996 static void push_database_new_send_done(struct tevent_req *subreq)
997 {
998         struct tevent_req *req = tevent_req_callback_data(
999                 subreq, struct tevent_req);
1000         struct push_database_new_state *state = tevent_req_data(
1001                 req, struct push_database_new_state);
1002         bool status;
1003         int ret;
1004
1005         status = ctdb_client_message_multi_recv(subreq, &ret, NULL, NULL);
1006         TALLOC_FREE(subreq);
1007         if (! status) {
1008                 LOG("Sending recovery records failed for %s\n",
1009                     recdb_name(state->recdb));
1010                 tevent_req_error(req, ret);
1011                 return;
1012         }
1013
1014         state->num_buffers_sent += 1;
1015
1016         push_database_new_send_msg(req);
1017 }
1018
1019 static void push_database_new_confirmed(struct tevent_req *subreq)
1020 {
1021         struct tevent_req *req = tevent_req_callback_data(
1022                 subreq, struct tevent_req);
1023         struct push_database_new_state *state = tevent_req_data(
1024                 req, struct push_database_new_state);
1025         struct ctdb_reply_control **reply;
1026         int *err_list;
1027         bool status;
1028         int ret, i;
1029         uint32_t num_records;
1030
1031         status = ctdb_client_control_multi_recv(subreq, &ret, state,
1032                                                 &err_list, &reply);
1033         TALLOC_FREE(subreq);
1034         if (! status) {
1035                 int ret2;
1036                 uint32_t pnn;
1037
1038                 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1039                                                        state->count, err_list,
1040                                                        &pnn);
1041                 if (ret2 != 0) {
1042                         LOG("control DB_PUSH_CONFIRM failed for %s on node %u,"
1043                             " ret=%d\n", recdb_name(state->recdb), pnn, ret2);
1044                 } else {
1045                         LOG("control DB_PUSH_CONFIRM failed for %s, ret=%d\n",
1046                             recdb_name(state->recdb), ret);
1047                 }
1048                 tevent_req_error(req, ret);
1049                 return;
1050         }
1051
1052         for (i=0; i<state->count; i++) {
1053                 ret = ctdb_reply_control_db_push_confirm(reply[i],
1054                                                          &num_records);
1055                 if (ret != 0) {
1056                         tevent_req_error(req, EPROTO);
1057                         return;
1058                 }
1059
1060                 if (num_records != state->num_records) {
1061                         LOG("Node %u received %d of %d records for %s\n",
1062                             state->pnn_list[i], num_records,
1063                             state->num_records, recdb_name(state->recdb));
1064                         tevent_req_error(req, EPROTO);
1065                         return;
1066                 }
1067         }
1068
1069         talloc_free(reply);
1070
1071         LOG("Pushed %d records for db %s\n",
1072             state->num_records, recdb_name(state->recdb));
1073
1074         tevent_req_done(req);
1075 }
1076
/*
 * Collect the result of push_database_new_send().
 *
 * Returns whatever generic_recv() reports for the request; perr is passed
 * through to it unchanged.
 */
static bool push_database_new_recv(struct tevent_req *req, int *perr)
{
        bool ok = generic_recv(req, perr);

        return ok;
}
1081
1082 /*
1083  * wrapper for push_database_old and push_database_new
1084  */
1085
/* Tracks completion of the two push variants started by push_database_send */
struct push_database_state {
        /* push_database_old has finished, or was not needed */
        bool old_done;
        /* push_database_new has finished, or was not needed */
        bool new_done;
};
1089
1090 static void push_database_old_done(struct tevent_req *subreq);
1091 static void push_database_new_done(struct tevent_req *subreq);
1092
1093 static struct tevent_req *push_database_send(
1094                         TALLOC_CTX *mem_ctx,
1095                         struct tevent_context *ev,
1096                         struct ctdb_client_context *client,
1097                         uint32_t *pnn_list, int count, uint32_t *caps,
1098                         struct ctdb_tunable_list *tun_list,
1099                         struct recdb_context *recdb)
1100 {
1101         struct tevent_req *req, *subreq;
1102         struct push_database_state *state;
1103         uint32_t *old_list, *new_list;
1104         int old_count, new_count;
1105         int i;
1106
1107         req = tevent_req_create(mem_ctx, &state, struct push_database_state);
1108         if (req == NULL) {
1109                 return NULL;
1110         }
1111
1112         state->old_done = false;
1113         state->new_done = false;
1114
1115         old_count = 0;
1116         new_count = 0;
1117         old_list = talloc_array(state, uint32_t, count);
1118         new_list = talloc_array(state, uint32_t, count);
1119         if (tevent_req_nomem(old_list, req) ||
1120             tevent_req_nomem(new_list,req)) {
1121                 return tevent_req_post(req, ev);
1122         }
1123
1124         for (i=0; i<count; i++) {
1125                 uint32_t pnn = pnn_list[i];
1126
1127                 if (caps[pnn] & CTDB_CAP_FRAGMENTED_CONTROLS) {
1128                         new_list[new_count] = pnn;
1129                         new_count += 1;
1130                 } else {
1131                         old_list[old_count] = pnn;
1132                         old_count += 1;
1133                 }
1134         }
1135
1136         if (old_count > 0) {
1137                 subreq = push_database_old_send(state, ev, client,
1138                                                 old_list, old_count, recdb);
1139                 if (tevent_req_nomem(subreq, req)) {
1140                         return tevent_req_post(req, ev);
1141                 }
1142                 tevent_req_set_callback(subreq, push_database_old_done, req);
1143         } else {
1144                 state->old_done = true;
1145         }
1146
1147         if (new_count > 0) {
1148                 subreq = push_database_new_send(state, ev, client,
1149                                                 new_list, new_count, recdb,
1150                                                 tun_list->rec_buffer_size_limit);
1151                 if (tevent_req_nomem(subreq, req)) {
1152                         return tevent_req_post(req, ev);
1153                 }
1154                 tevent_req_set_callback(subreq, push_database_new_done, req);
1155         } else {
1156                 state->new_done = true;
1157         }
1158
1159         return req;
1160 }
1161
1162 static void push_database_old_done(struct tevent_req *subreq)
1163 {
1164         struct tevent_req *req = tevent_req_callback_data(
1165                 subreq, struct tevent_req);
1166         struct push_database_state *state = tevent_req_data(
1167                 req, struct push_database_state);
1168         bool status;
1169         int ret;
1170
1171         status = push_database_old_recv(subreq, &ret);
1172         if (! status) {
1173                 tevent_req_error(req, ret);
1174                 return;
1175         }
1176
1177         state->old_done = true;
1178
1179         if (state->old_done && state->new_done) {
1180                 tevent_req_done(req);
1181         }
1182 }
1183
1184 static void push_database_new_done(struct tevent_req *subreq)
1185 {
1186         struct tevent_req *req = tevent_req_callback_data(
1187                 subreq, struct tevent_req);
1188         struct push_database_state *state = tevent_req_data(
1189                 req, struct push_database_state);
1190         bool status;
1191         int ret;
1192
1193         status = push_database_new_recv(subreq, &ret);
1194         if (! status) {
1195                 tevent_req_error(req, ret);
1196                 return;
1197         }
1198
1199         state->new_done = true;
1200
1201         if (state->old_done && state->new_done) {
1202                 tevent_req_done(req);
1203         }
1204 }
1205
/*
 * Collect the result of push_database_send().
 *
 * Returns whatever generic_recv() reports for the request; perr is passed
 * through to it unchanged.
 */
static bool push_database_recv(struct tevent_req *req, int *perr)
{
        bool ok = generic_recv(req, perr);

        return ok;
}
1210
1211 /*
1212  * Collect databases using highest sequence number
1213  */
1214
/* State for collecting a database from the node with the highest seqnum */
struct collect_highseqnum_db_state {
        /* Event context and client connection used for all sub-requests */
        struct tevent_context *ev;
        struct ctdb_client_context *client;

        /* Nodes to query, and the number of entries in pnn_list */
        uint32_t *pnn_list;
        int count;

        /* Capabilities, indexed by PNN */
        uint32_t *caps;
        /* Ban credits, indexed by PNN; bumped when pulling from a node fails */
        uint32_t *ban_credits;

        /* Database being recovered and the recovery database it is pulled into */
        uint32_t db_id;
        struct recdb_context *recdb;

        /* Node chosen as the source for the pull */
        uint32_t max_pnn;
};
1226
1227 static void collect_highseqnum_db_seqnum_done(struct tevent_req *subreq);
1228 static void collect_highseqnum_db_pulldb_done(struct tevent_req *subreq);
1229
1230 static struct tevent_req *collect_highseqnum_db_send(
1231                         TALLOC_CTX *mem_ctx,
1232                         struct tevent_context *ev,
1233                         struct ctdb_client_context *client,
1234                         uint32_t *pnn_list, int count, uint32_t *caps,
1235                         uint32_t *ban_credits, uint32_t db_id,
1236                         struct recdb_context *recdb)
1237 {
1238         struct tevent_req *req, *subreq;
1239         struct collect_highseqnum_db_state *state;
1240         struct ctdb_req_control request;
1241
1242         req = tevent_req_create(mem_ctx, &state,
1243                                 struct collect_highseqnum_db_state);
1244         if (req == NULL) {
1245                 return NULL;
1246         }
1247
1248         state->ev = ev;
1249         state->client = client;
1250         state->pnn_list = pnn_list;
1251         state->count = count;
1252         state->caps = caps;
1253         state->ban_credits = ban_credits;
1254         state->db_id = db_id;
1255         state->recdb = recdb;
1256
1257         ctdb_req_control_get_db_seqnum(&request, db_id);
1258         subreq = ctdb_client_control_multi_send(mem_ctx, ev, client,
1259                                                 state->pnn_list, state->count,
1260                                                 TIMEOUT(), &request);
1261         if (tevent_req_nomem(subreq, req)) {
1262                 return tevent_req_post(req, ev);
1263         }
1264         tevent_req_set_callback(subreq, collect_highseqnum_db_seqnum_done,
1265                                 req);
1266
1267         return req;
1268 }
1269
1270 static void collect_highseqnum_db_seqnum_done(struct tevent_req *subreq)
1271 {
1272         struct tevent_req *req = tevent_req_callback_data(
1273                 subreq, struct tevent_req);
1274         struct collect_highseqnum_db_state *state = tevent_req_data(
1275                 req, struct collect_highseqnum_db_state);
1276         struct ctdb_reply_control **reply;
1277         int *err_list;
1278         bool status;
1279         int ret, i;
1280         uint64_t seqnum, max_seqnum;
1281
1282         status = ctdb_client_control_multi_recv(subreq, &ret, state,
1283                                                 &err_list, &reply);
1284         TALLOC_FREE(subreq);
1285         if (! status) {
1286                 int ret2;
1287                 uint32_t pnn;
1288
1289                 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1290                                                        state->count, err_list,
1291                                                        &pnn);
1292                 if (ret2 != 0) {
1293                         LOG("control GET_DB_SEQNUM failed for %s on node %u,"
1294                             " ret=%d\n", recdb_name(state->recdb), pnn, ret2);
1295                 } else {
1296                         LOG("control GET_DB_SEQNUM failed for %s, ret=%d\n",
1297                             recdb_name(state->recdb), ret);
1298                 }
1299                 tevent_req_error(req, ret);
1300                 return;
1301         }
1302
1303         max_seqnum = 0;
1304         state->max_pnn = state->pnn_list[0];
1305         for (i=0; i<state->count; i++) {
1306                 ret = ctdb_reply_control_get_db_seqnum(reply[i], &seqnum);
1307                 if (ret != 0) {
1308                         tevent_req_error(req, EPROTO);
1309                         return;
1310                 }
1311
1312                 if (max_seqnum < seqnum) {
1313                         max_seqnum = seqnum;
1314                         state->max_pnn = state->pnn_list[i];
1315                 }
1316         }
1317
1318         talloc_free(reply);
1319
1320         LOG("Pull persistent db %s from node %d with seqnum 0x%"PRIx64"\n",
1321             recdb_name(state->recdb), state->max_pnn, max_seqnum);
1322
1323         subreq = pull_database_send(state, state->ev, state->client,
1324                                     state->max_pnn,
1325                                     state->caps[state->max_pnn],
1326                                     state->recdb);
1327         if (tevent_req_nomem(subreq, req)) {
1328                 return;
1329         }
1330         tevent_req_set_callback(subreq, collect_highseqnum_db_pulldb_done,
1331                                 req);
1332 }
1333
1334 static void collect_highseqnum_db_pulldb_done(struct tevent_req *subreq)
1335 {
1336         struct tevent_req *req = tevent_req_callback_data(
1337                 subreq, struct tevent_req);
1338         struct collect_highseqnum_db_state *state = tevent_req_data(
1339                 req, struct collect_highseqnum_db_state);
1340         int ret;
1341         bool status;
1342
1343         status = pull_database_recv(subreq, &ret);
1344         TALLOC_FREE(subreq);
1345         if (! status) {
1346                 state->ban_credits[state->max_pnn] += 1;
1347                 tevent_req_error(req, ret);
1348                 return;
1349         }
1350
1351         tevent_req_done(req);
1352 }
1353
/*
 * Collect the result of collect_highseqnum_db_send().
 *
 * Returns whatever generic_recv() reports for the request; perr is passed
 * through to it unchanged.
 */
static bool collect_highseqnum_db_recv(struct tevent_req *req, int *perr)
{
        bool ok = generic_recv(req, perr);

        return ok;
}
1358
1359 /*
1360  * Collect all databases
1361  */
1362
1363 struct collect_all_db_state {
1364         struct tevent_context *ev;
1365         struct ctdb_client_context *client;
1366         uint32_t *pnn_list;
1367         int count;
1368         uint32_t *caps;
1369         uint32_t *ban_credits;
1370         uint32_t db_id;
1371         struct recdb_context *recdb;
1372         struct ctdb_pulldb pulldb;
1373         int index;
1374 };
1375
1376 static void collect_all_db_pulldb_done(struct tevent_req *subreq);
1377
1378 static struct tevent_req *collect_all_db_send(
1379                         TALLOC_CTX *mem_ctx,
1380                         struct tevent_context *ev,
1381                         struct ctdb_client_context *client,
1382                         uint32_t *pnn_list, int count, uint32_t *caps,
1383                         uint32_t *ban_credits, uint32_t db_id,
1384                         struct recdb_context *recdb)
1385 {
1386         struct tevent_req *req, *subreq;
1387         struct collect_all_db_state *state;
1388         uint32_t pnn;
1389
1390         req = tevent_req_create(mem_ctx, &state,
1391                                 struct collect_all_db_state);
1392         if (req == NULL) {
1393                 return NULL;
1394         }
1395
1396         state->ev = ev;
1397         state->client = client;
1398         state->pnn_list = pnn_list;
1399         state->count = count;
1400         state->caps = caps;
1401         state->db_id = db_id;
1402         state->recdb = recdb;
1403         state->index = 0;
1404
1405         pnn = state->pnn_list[state->index];
1406
1407         subreq = pull_database_send(state, ev, client, pnn, caps[pnn], recdb);
1408         if (tevent_req_nomem(subreq, req)) {
1409                 return tevent_req_post(req, ev);
1410         }
1411         tevent_req_set_callback(subreq, collect_all_db_pulldb_done, req);
1412
1413         return req;
1414 }
1415
1416 static void collect_all_db_pulldb_done(struct tevent_req *subreq)
1417 {
1418         struct tevent_req *req = tevent_req_callback_data(
1419                 subreq, struct tevent_req);
1420         struct collect_all_db_state *state = tevent_req_data(
1421                 req, struct collect_all_db_state);
1422         uint32_t pnn;
1423         int ret;
1424         bool status;
1425
1426         status = pull_database_recv(subreq, &ret);
1427         TALLOC_FREE(subreq);
1428         if (! status) {
1429                 pnn = state->pnn_list[state->index];
1430                 state->ban_credits[pnn] += 1;
1431                 tevent_req_error(req, ret);
1432                 return;
1433         }
1434
1435         state->index += 1;
1436         if (state->index == state->count) {
1437                 tevent_req_done(req);
1438                 return;
1439         }
1440
1441         pnn = state->pnn_list[state->index];
1442         subreq = pull_database_send(state, state->ev, state->client,
1443                                     pnn, state->caps[pnn], state->recdb);
1444         if (tevent_req_nomem(subreq, req)) {
1445                 return;
1446         }
1447         tevent_req_set_callback(subreq, collect_all_db_pulldb_done, req);
1448 }
1449
/*
 * Collect the result of collect_all_db_send().
 *
 * Returns whatever generic_recv() reports for the request; perr is passed
 * through to it unchanged.
 */
static bool collect_all_db_recv(struct tevent_req *req, int *perr)
{
        bool ok = generic_recv(req, perr);

        return ok;
}
1454
1455
1456 /**
1457  * For each database do the following:
1458  *  - Get DB name
1459  *  - Get DB path
1460  *  - Freeze database on all nodes
1461  *  - Start transaction on all nodes
1462  *  - Collect database from all nodes
1463  *  - Wipe database on all nodes
1464  *  - Push database to all nodes
1465  *  - Commit transaction on all nodes
1466  *  - Thaw database on all nodes
1467  */
1468
1469 struct recover_db_state {
1470         struct tevent_context *ev;
1471         struct ctdb_client_context *client;
1472         struct ctdb_tunable_list *tun_list;
1473         uint32_t *pnn_list;
1474         int count;
1475         uint32_t *caps;
1476         uint32_t *ban_credits;
1477         uint32_t db_id;
1478         bool persistent;
1479
1480         uint32_t destnode;
1481         struct ctdb_transdb transdb;
1482
1483         const char *db_name, *db_path;
1484         struct recdb_context *recdb;
1485 };
1486
1487 static void recover_db_name_done(struct tevent_req *subreq);
1488 static void recover_db_path_done(struct tevent_req *subreq);
1489 static void recover_db_freeze_done(struct tevent_req *subreq);
1490 static void recover_db_transaction_started(struct tevent_req *subreq);
1491 static void recover_db_collect_done(struct tevent_req *subreq);
1492 static void recover_db_wipedb_done(struct tevent_req *subreq);
1493 static void recover_db_pushdb_done(struct tevent_req *subreq);
1494 static void recover_db_transaction_committed(struct tevent_req *subreq);
1495 static void recover_db_thaw_done(struct tevent_req *subreq);
1496
1497 static struct tevent_req *recover_db_send(TALLOC_CTX *mem_ctx,
1498                                           struct tevent_context *ev,
1499                                           struct ctdb_client_context *client,
1500                                           struct ctdb_tunable_list *tun_list,
1501                                           uint32_t *pnn_list, int count,
1502                                           uint32_t *caps,
1503                                           uint32_t *ban_credits,
1504                                           uint32_t generation,
1505                                           uint32_t db_id, bool persistent)
1506 {
1507         struct tevent_req *req, *subreq;
1508         struct recover_db_state *state;
1509         struct ctdb_req_control request;
1510
1511         req = tevent_req_create(mem_ctx, &state, struct recover_db_state);
1512         if (req == NULL) {
1513                 return NULL;
1514         }
1515
1516         state->ev = ev;
1517         state->client = client;
1518         state->tun_list = tun_list;
1519         state->pnn_list = pnn_list;
1520         state->count = count;
1521         state->caps = caps;
1522         state->ban_credits = ban_credits;
1523         state->db_id = db_id;
1524         state->persistent = persistent;
1525
1526         state->destnode = ctdb_client_pnn(client);
1527         state->transdb.db_id = db_id;
1528         state->transdb.tid = generation;
1529
1530         ctdb_req_control_get_dbname(&request, db_id);
1531         subreq = ctdb_client_control_send(state, ev, client, state->destnode,
1532                                           TIMEOUT(), &request);
1533         if (tevent_req_nomem(subreq, req)) {
1534                 return tevent_req_post(req, ev);
1535         }
1536         tevent_req_set_callback(subreq, recover_db_name_done, req);
1537
1538         return req;
1539 }
1540
1541 static void recover_db_name_done(struct tevent_req *subreq)
1542 {
1543         struct tevent_req *req = tevent_req_callback_data(
1544                 subreq, struct tevent_req);
1545         struct recover_db_state *state = tevent_req_data(
1546                 req, struct recover_db_state);
1547         struct ctdb_reply_control *reply;
1548         struct ctdb_req_control request;
1549         int ret;
1550         bool status;
1551
1552         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
1553         TALLOC_FREE(subreq);
1554         if (! status) {
1555                 LOG("control GET_DBNAME failed for db=0x%x, ret=%d\n",
1556                     state->db_id, ret);
1557                 tevent_req_error(req, ret);
1558                 return;
1559         }
1560
1561         ret = ctdb_reply_control_get_dbname(reply, state, &state->db_name);
1562         if (ret != 0) {
1563                 LOG("control GET_DBNAME failed for db=0x%x, ret=%d\n",
1564                     state->db_id, ret);
1565                 tevent_req_error(req, EPROTO);
1566                 return;
1567         }
1568
1569         talloc_free(reply);
1570
1571         ctdb_req_control_getdbpath(&request, state->db_id);
1572         subreq = ctdb_client_control_send(state, state->ev, state->client,
1573                                           state->destnode, TIMEOUT(),
1574                                           &request);
1575         if (tevent_req_nomem(subreq, req)) {
1576                 return;
1577         }
1578         tevent_req_set_callback(subreq, recover_db_path_done, req);
1579 }
1580
1581 static void recover_db_path_done(struct tevent_req *subreq)
1582 {
1583         struct tevent_req *req = tevent_req_callback_data(
1584                 subreq, struct tevent_req);
1585         struct recover_db_state *state = tevent_req_data(
1586                 req, struct recover_db_state);
1587         struct ctdb_reply_control *reply;
1588         struct ctdb_req_control request;
1589         int ret;
1590         bool status;
1591
1592         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
1593         TALLOC_FREE(subreq);
1594         if (! status) {
1595                 LOG("control GETDBPATH failed for db %s, ret=%d\n",
1596                     state->db_name, ret);
1597                 tevent_req_error(req, ret);
1598                 return;
1599         }
1600
1601         ret = ctdb_reply_control_getdbpath(reply, state, &state->db_path);
1602         if (ret != 0) {
1603                 LOG("control GETDBPATH failed for db %s, ret=%d\n",
1604                     state->db_name, ret);
1605                 tevent_req_error(req, EPROTO);
1606                 return;
1607         }
1608
1609         talloc_free(reply);
1610
1611         ctdb_req_control_db_freeze(&request, state->db_id);
1612         subreq = ctdb_client_control_multi_send(state, state->ev,
1613                                                 state->client,
1614                                                 state->pnn_list, state->count,
1615                                                 TIMEOUT(), &request);
1616         if (tevent_req_nomem(subreq, req)) {
1617                 return;
1618         }
1619         tevent_req_set_callback(subreq, recover_db_freeze_done, req);
1620 }
1621
1622 static void recover_db_freeze_done(struct tevent_req *subreq)
1623 {
1624         struct tevent_req *req = tevent_req_callback_data(
1625                 subreq, struct tevent_req);
1626         struct recover_db_state *state = tevent_req_data(
1627                 req, struct recover_db_state);
1628         struct ctdb_req_control request;
1629         int *err_list;
1630         int ret;
1631         bool status;
1632
1633         status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
1634                                                 NULL);
1635         TALLOC_FREE(subreq);
1636         if (! status) {
1637                 int ret2;
1638                 uint32_t pnn;
1639
1640                 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1641                                                        state->count, err_list,
1642                                                        &pnn);
1643                 if (ret2 != 0) {
1644                         LOG("control FREEZE_DB failed for db %s on node %u,"
1645                             " ret=%d\n", state->db_name, pnn, ret2);
1646                 } else {
1647                         LOG("control FREEZE_DB failed for db %s, ret=%d\n",
1648                             state->db_name, ret);
1649                 }
1650                 tevent_req_error(req, ret);
1651                 return;
1652         }
1653
1654         ctdb_req_control_db_transaction_start(&request, &state->transdb);
1655         subreq = ctdb_client_control_multi_send(state, state->ev,
1656                                                 state->client,
1657                                                 state->pnn_list, state->count,
1658                                                 TIMEOUT(), &request);
1659         if (tevent_req_nomem(subreq, req)) {
1660                 return;
1661         }
1662         tevent_req_set_callback(subreq, recover_db_transaction_started, req);
1663 }
1664
1665 static void recover_db_transaction_started(struct tevent_req *subreq)
1666 {
1667         struct tevent_req *req = tevent_req_callback_data(
1668                 subreq, struct tevent_req);
1669         struct recover_db_state *state = tevent_req_data(
1670                 req, struct recover_db_state);
1671         int *err_list;
1672         int ret;
1673         bool status;
1674
1675         status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
1676                                                 NULL);
1677         TALLOC_FREE(subreq);
1678         if (! status) {
1679                 int ret2;
1680                 uint32_t pnn;
1681
1682                 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1683                                                        state->count,
1684                                                        err_list, &pnn);
1685                 if (ret2 != 0) {
1686                         LOG("control TRANSACTION_DB failed for db=%s,"
1687                             " ret=%d\n", state->db_name, pnn, ret2);
1688                 } else {
1689                         LOG("control TRANSACTION_DB failed for db=%s,"
1690                             " ret=%d\n", state->db_name, ret);
1691                 }
1692                 tevent_req_error(req, ret);
1693                 return;
1694         }
1695
1696         state->recdb = recdb_create(state, state->db_id, state->db_name,
1697                                     state->db_path,
1698                                     state->tun_list->database_hash_size,
1699                                     state->persistent);
1700         if (tevent_req_nomem(state->recdb, req)) {
1701                 return;
1702         }
1703
1704         if (state->persistent && state->tun_list->recover_pdb_by_seqnum != 0) {
1705                 subreq = collect_highseqnum_db_send(
1706                                 state, state->ev, state->client,
1707                                 state->pnn_list, state->count, state->caps,
1708                                 state->ban_credits, state->db_id,
1709                                 state->recdb);
1710         } else {
1711                 subreq = collect_all_db_send(
1712                                 state, state->ev, state->client,
1713                                 state->pnn_list, state->count, state->caps,
1714                                 state->ban_credits, state->db_id,
1715                                 state->recdb);
1716         }
1717         if (tevent_req_nomem(subreq, req)) {
1718                 return;
1719         }
1720         tevent_req_set_callback(subreq, recover_db_collect_done, req);
1721 }
1722
1723 static void recover_db_collect_done(struct tevent_req *subreq)
1724 {
1725         struct tevent_req *req = tevent_req_callback_data(
1726                 subreq, struct tevent_req);
1727         struct recover_db_state *state = tevent_req_data(
1728                 req, struct recover_db_state);
1729         struct ctdb_req_control request;
1730         int ret;
1731         bool status;
1732
1733         if (state->persistent && state->tun_list->recover_pdb_by_seqnum != 0) {
1734                 status = collect_highseqnum_db_recv(subreq, &ret);
1735         } else {
1736                 status = collect_all_db_recv(subreq, &ret);
1737         }
1738         TALLOC_FREE(subreq);
1739         if (! status) {
1740                 tevent_req_error(req, ret);
1741                 return;
1742         }
1743
1744         ctdb_req_control_wipe_database(&request, &state->transdb);
1745         subreq = ctdb_client_control_multi_send(state, state->ev,
1746                                                 state->client,
1747                                                 state->pnn_list, state->count,
1748                                                 TIMEOUT(), &request);
1749         if (tevent_req_nomem(subreq, req)) {
1750                 return;
1751         }
1752         tevent_req_set_callback(subreq, recover_db_wipedb_done, req);
1753 }
1754
1755 static void recover_db_wipedb_done(struct tevent_req *subreq)
1756 {
1757         struct tevent_req *req = tevent_req_callback_data(
1758                 subreq, struct tevent_req);
1759         struct recover_db_state *state = tevent_req_data(
1760                 req, struct recover_db_state);
1761         int *err_list;
1762         int ret;
1763         bool status;
1764
1765         status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
1766                                                 NULL);
1767         TALLOC_FREE(subreq);
1768         if (! status) {
1769                 int ret2;
1770                 uint32_t pnn;
1771
1772                 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1773                                                        state->count,
1774                                                        err_list, &pnn);
1775                 if (ret2 != 0) {
1776                         LOG("control WIPEDB failed for db %s on node %u,"
1777                             " ret=%d\n", state->db_name, pnn, ret2);
1778                 } else {
1779                         LOG("control WIPEDB failed for db %s, ret=%d\n",
1780                             state->db_name, pnn, ret);
1781                 }
1782                 tevent_req_error(req, ret);
1783                 return;
1784         }
1785
1786         subreq = push_database_send(state, state->ev, state->client,
1787                                     state->pnn_list, state->count,
1788                                     state->caps, state->tun_list,
1789                                     state->recdb);
1790         if (tevent_req_nomem(subreq, req)) {
1791                 return;
1792         }
1793         tevent_req_set_callback(subreq, recover_db_pushdb_done, req);
1794 }
1795
1796 static void recover_db_pushdb_done(struct tevent_req *subreq)
1797 {
1798         struct tevent_req *req = tevent_req_callback_data(
1799                 subreq, struct tevent_req);
1800         struct recover_db_state *state = tevent_req_data(
1801                 req, struct recover_db_state);
1802         struct ctdb_req_control request;
1803         int ret;
1804         bool status;
1805
1806         status = push_database_recv(subreq, &ret);
1807         TALLOC_FREE(subreq);
1808         if (! status) {
1809                 tevent_req_error(req, ret);
1810                 return;
1811         }
1812
1813         TALLOC_FREE(state->recdb);
1814
1815         ctdb_req_control_db_transaction_commit(&request, &state->transdb);
1816         subreq = ctdb_client_control_multi_send(state, state->ev,
1817                                                 state->client,
1818                                                 state->pnn_list, state->count,
1819                                                 TIMEOUT(), &request);
1820         if (tevent_req_nomem(subreq, req)) {
1821                 return;
1822         }
1823         tevent_req_set_callback(subreq, recover_db_transaction_committed, req);
1824 }
1825
1826 static void recover_db_transaction_committed(struct tevent_req *subreq)
1827 {
1828         struct tevent_req *req = tevent_req_callback_data(
1829                 subreq, struct tevent_req);
1830         struct recover_db_state *state = tevent_req_data(
1831                 req, struct recover_db_state);
1832         struct ctdb_req_control request;
1833         int *err_list;
1834         int ret;
1835         bool status;
1836
1837         status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
1838                                                 NULL);
1839         TALLOC_FREE(subreq);
1840         if (! status) {
1841                 int ret2;
1842                 uint32_t pnn;
1843
1844                 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1845                                                        state->count,
1846                                                        err_list, &pnn);
1847                 if (ret2 != 0) {
1848                         LOG("control DB_TRANSACTION_COMMIT failed for db %s"
1849                             " on node %u, ret=%d\n", state->db_name, pnn, ret2);
1850                 } else {
1851                         LOG("control DB_TRANSACTION_COMMIT failed for db %s,"
1852                             " ret=%d\n", state->db_name, ret);
1853                 }
1854                 tevent_req_error(req, ret);
1855                 return;
1856         }
1857
1858         ctdb_req_control_db_thaw(&request, state->db_id);
1859         subreq = ctdb_client_control_multi_send(state, state->ev,
1860                                                 state->client,
1861                                                 state->pnn_list, state->count,
1862                                                 TIMEOUT(), &request);
1863         if (tevent_req_nomem(subreq, req)) {
1864                 return;
1865         }
1866         tevent_req_set_callback(subreq, recover_db_thaw_done, req);
1867 }
1868
1869 static void recover_db_thaw_done(struct tevent_req *subreq)
1870 {
1871         struct tevent_req *req = tevent_req_callback_data(
1872                 subreq, struct tevent_req);
1873         struct recover_db_state *state = tevent_req_data(
1874                 req, struct recover_db_state);
1875         int *err_list;
1876         int ret;
1877         bool status;
1878
1879         status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
1880                                                 NULL);
1881         TALLOC_FREE(subreq);
1882         if (! status) {
1883                 int ret2;
1884                 uint32_t pnn;
1885
1886                 ret2 = ctdb_client_control_multi_error(state->pnn_list,
1887                                                        state->count,
1888                                                        err_list, &pnn);
1889                 if (ret2 != 0) {
1890                         LOG("control DB_THAW failed for db %s on node %u,"
1891                             " ret=%d\n", state->db_name, pnn, ret2);
1892                 } else {
1893                         LOG("control DB_THAW failed for db %s, ret=%d\n",
1894                             state->db_name, ret);
1895                 }
1896                 tevent_req_error(req, ret);
1897                 return;
1898         }
1899
1900         tevent_req_done(req);
1901 }
1902
1903 static bool recover_db_recv(struct tevent_req *req)
1904 {
1905         return generic_recv(req, NULL);
1906 }
1907
1908
/*
 * Start database recovery for each database
 *
 * Try to recover each database up to NUM_RETRIES times before failing
 * recovery.
 */
1914
/*
 * State of the db_recovery request, shared by the per-database
 * recoveries it starts.
 */
struct db_recovery_state {
        /* Event context, reused when a database recovery is retried */
        struct tevent_context *ev;
        /* Databases to recover; dbmap->num replies complete the request */
        struct ctdb_dbid_map *dbmap;
        /* Databases finished so far, successfully or not */
        int num_replies;
        /* Databases given up on */
        int num_failed;
};
1921
/*
 * Per-database context handed to db_recovery_one_done().
 *
 * It keeps every argument needed to call recover_db_send() again for
 * the same database, plus the number of attempts that failed so far.
 */
struct db_recovery_one_state {
        /* The parent db_recovery request */
        struct tevent_req *req;

        /* Arguments forwarded unchanged to recover_db_send() */
        struct ctdb_client_context *client;
        struct ctdb_dbid_map *dbmap;
        struct ctdb_tunable_list *tun_list;
        uint32_t *pnn_list;
        int count;
        uint32_t *caps;
        uint32_t *ban_credits;
        uint32_t generation;

        /* Database handled by this context */
        uint32_t db_id;
        /* Set when the database has CTDB_DB_FLAGS_PERSISTENT */
        bool persistent;

        /* Failed recovery attempts for this database */
        int num_fails;
};

/* Completion callback of every recover_db request started by db_recovery */
static void db_recovery_one_done(struct tevent_req *subreq);
1938
1939 static struct tevent_req *db_recovery_send(TALLOC_CTX *mem_ctx,
1940                                            struct tevent_context *ev,
1941                                            struct ctdb_client_context *client,
1942                                            struct ctdb_dbid_map *dbmap,
1943                                            struct ctdb_tunable_list *tun_list,
1944                                            uint32_t *pnn_list, int count,
1945                                            uint32_t *caps,
1946                                            uint32_t *ban_credits,
1947                                            uint32_t generation)
1948 {
1949         struct tevent_req *req, *subreq;
1950         struct db_recovery_state *state;
1951         int i;
1952
1953         req = tevent_req_create(mem_ctx, &state, struct db_recovery_state);
1954         if (req == NULL) {
1955                 return NULL;
1956         }
1957
1958         state->ev = ev;
1959         state->dbmap = dbmap;
1960         state->num_replies = 0;
1961         state->num_failed = 0;
1962
1963         if (dbmap->num == 0) {
1964                 tevent_req_done(req);
1965                 return tevent_req_post(req, ev);
1966         }
1967
1968         for (i=0; i<dbmap->num; i++) {
1969                 struct db_recovery_one_state *substate;
1970
1971                 substate = talloc_zero(state, struct db_recovery_one_state);
1972                 if (tevent_req_nomem(substate, req)) {
1973                         return tevent_req_post(req, ev);
1974                 }
1975
1976                 substate->req = req;
1977                 substate->client = client;
1978                 substate->dbmap = dbmap;
1979                 substate->tun_list = tun_list;
1980                 substate->pnn_list = pnn_list;
1981                 substate->count = count;
1982                 substate->caps = caps;
1983                 substate->ban_credits = ban_credits;
1984                 substate->generation = generation;
1985                 substate->db_id = dbmap->dbs[i].db_id;
1986                 substate->persistent = dbmap->dbs[i].flags &
1987                                        CTDB_DB_FLAGS_PERSISTENT;
1988
1989                 subreq = recover_db_send(state, ev, client, tun_list,
1990                                          pnn_list, count, caps, ban_credits,
1991                                          generation, substate->db_id,
1992                                          substate->persistent);
1993                 if (tevent_req_nomem(subreq, req)) {
1994                         return tevent_req_post(req, ev);
1995                 }
1996                 tevent_req_set_callback(subreq, db_recovery_one_done,
1997                                         substate);
1998                 LOG("recover database 0x%08x\n", substate->db_id);
1999         }
2000
2001         return req;
2002 }
2003
2004 static void db_recovery_one_done(struct tevent_req *subreq)
2005 {
2006         struct db_recovery_one_state *substate = tevent_req_callback_data(
2007                 subreq, struct db_recovery_one_state);
2008         struct tevent_req *req = substate->req;
2009         struct db_recovery_state *state = tevent_req_data(
2010                 req, struct db_recovery_state);
2011         bool status;
2012
2013         status = recover_db_recv(subreq);
2014         TALLOC_FREE(subreq);
2015
2016         if (status) {
2017                 talloc_free(substate);
2018                 goto done;
2019         }
2020
2021         substate->num_fails += 1;
2022         if (substate->num_fails < NUM_RETRIES) {
2023                 subreq = recover_db_send(state, state->ev, substate->client,
2024                                          substate->tun_list,
2025                                          substate->pnn_list, substate->count,
2026                                          substate->caps, substate->ban_credits,
2027                                          substate->generation, substate->db_id,
2028                                          substate->persistent);
2029                 if (tevent_req_nomem(subreq, req)) {
2030                         goto failed;
2031                 }
2032                 tevent_req_set_callback(subreq, db_recovery_one_done, substate);
2033                 LOG("recover database 0x%08x, attempt %d\n", substate->db_id,
2034                     substate->num_fails+1);
2035                 return;
2036         }
2037
2038 failed:
2039         state->num_failed += 1;
2040
2041 done:
2042         state->num_replies += 1;
2043
2044         if (state->num_replies == state->dbmap->num) {
2045                 tevent_req_done(req);
2046         }
2047 }
2048
2049 static bool db_recovery_recv(struct tevent_req *req, int *count)
2050 {
2051         struct db_recovery_state *state = tevent_req_data(
2052                 req, struct db_recovery_state);
2053         int err;
2054
2055         if (tevent_req_is_unix_error(req, &err)) {
2056                 *count = 0;
2057                 return false;
2058         }
2059
2060         *count = state->num_replies - state->num_failed;
2061
2062         if (state->num_failed > 0) {
2063                 return false;
2064         }
2065
2066         return true;
2067 }
2068
2069
2070 /*
2071  * Run the parallel database recovery
2072  *
2073  * - Get tunables
2074  * - Get nodemap
2075  * - Get vnnmap
2076  * - Get capabilities from all nodes
2077  * - Get dbmap
2078  * - Set RECOVERY_ACTIVE
2079  * - Send START_RECOVERY
2080  * - Update vnnmap on all nodes
2081  * - Run database recovery
2082  * - Send END_RECOVERY
2083  * - Set RECOVERY_NORMAL
2084  */
2085
/* State carried through all steps of the recovery request */
struct recovery_state {
        struct tevent_context *ev;
        struct ctdb_client_context *client;
        /* Generation for the new VNN map, also passed to db recovery */
        uint32_t generation;
        /* Active nodes and their number, from list_of_active_nodes() */
        uint32_t *pnn_list;
        int count;
        /* Node that answers the single-node controls: ctdb_client_pnn() */
        uint32_t destnode;
        struct ctdb_node_map *nodemap;
        /* Capabilities stored per PNN; nodemap->num entries */
        uint32_t *caps;
        /* Zero-initialised array with nodemap->num entries */
        uint32_t *ban_credits;
        struct ctdb_tunable_list *tun_list;
        /* VNN map as fetched, later replaced by the recalculated one */
        struct ctdb_vnn_map *vnnmap;
        struct ctdb_dbid_map *dbmap;
};

/* Completion callbacks for the individual recovery steps */
static void recovery_tunables_done(struct tevent_req *subreq);
static void recovery_nodemap_done(struct tevent_req *subreq);
static void recovery_vnnmap_done(struct tevent_req *subreq);
static void recovery_capabilities_done(struct tevent_req *subreq);
static void recovery_dbmap_done(struct tevent_req *subreq);
static void recovery_active_done(struct tevent_req *subreq);
static void recovery_start_recovery_done(struct tevent_req *subreq);
static void recovery_vnnmap_update_done(struct tevent_req *subreq);
static void recovery_db_recovery_done(struct tevent_req *subreq);
static void recovery_failed_done(struct tevent_req *subreq);
static void recovery_normal_done(struct tevent_req *subreq);
static void recovery_end_recovery_done(struct tevent_req *subreq);
2113
2114 static struct tevent_req *recovery_send(TALLOC_CTX *mem_ctx,
2115                                         struct tevent_context *ev,
2116                                         struct ctdb_client_context *client,
2117                                         uint32_t generation)
2118 {
2119         struct tevent_req *req, *subreq;
2120         struct recovery_state *state;
2121         struct ctdb_req_control request;
2122
2123         req = tevent_req_create(mem_ctx, &state, struct recovery_state);
2124         if (req == NULL) {
2125                 return NULL;
2126         }
2127
2128         state->ev = ev;
2129         state->client = client;
2130         state->generation = generation;
2131         state->destnode = ctdb_client_pnn(client);
2132
2133         ctdb_req_control_get_all_tunables(&request);
2134         subreq = ctdb_client_control_send(state, state->ev, state->client,
2135                                           state->destnode, TIMEOUT(),
2136                                           &request);
2137         if (tevent_req_nomem(subreq, req)) {
2138                 return tevent_req_post(req, ev);
2139         }
2140         tevent_req_set_callback(subreq, recovery_tunables_done, req);
2141
2142         return req;
2143 }
2144
2145 static void recovery_tunables_done(struct tevent_req *subreq)
2146 {
2147         struct tevent_req *req = tevent_req_callback_data(
2148                 subreq, struct tevent_req);
2149         struct recovery_state *state = tevent_req_data(
2150                 req, struct recovery_state);
2151         struct ctdb_reply_control *reply;
2152         struct ctdb_req_control request;
2153         int ret;
2154         bool status;
2155
2156         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2157         TALLOC_FREE(subreq);
2158         if (! status) {
2159                 LOG("control GET_ALL_TUNABLES failed, ret=%d\n", ret);
2160                 tevent_req_error(req, ret);
2161                 return;
2162         }
2163
2164         ret = ctdb_reply_control_get_all_tunables(reply, state,
2165                                                   &state->tun_list);
2166         if (ret != 0) {
2167                 LOG("control GET_ALL_TUNABLES failed, ret=%d\n", ret);
2168                 tevent_req_error(req, EPROTO);
2169                 return;
2170         }
2171
2172         talloc_free(reply);
2173
2174         recover_timeout = state->tun_list->recover_timeout;
2175
2176         ctdb_req_control_get_nodemap(&request);
2177         subreq = ctdb_client_control_send(state, state->ev, state->client,
2178                                           state->destnode, TIMEOUT(),
2179                                           &request);
2180         if (tevent_req_nomem(subreq, req)) {
2181                 return;
2182         }
2183         tevent_req_set_callback(subreq, recovery_nodemap_done, req);
2184 }
2185
2186 static void recovery_nodemap_done(struct tevent_req *subreq)
2187 {
2188         struct tevent_req *req = tevent_req_callback_data(
2189                 subreq, struct tevent_req);
2190         struct recovery_state *state = tevent_req_data(
2191                 req, struct recovery_state);
2192         struct ctdb_reply_control *reply;
2193         struct ctdb_req_control request;
2194         bool status;
2195         int ret;
2196
2197         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2198         TALLOC_FREE(subreq);
2199         if (! status) {
2200                 LOG("control GET_NODEMAP failed to node %u, ret=%d\n",
2201                     state->destnode, ret);
2202                 tevent_req_error(req, ret);
2203                 return;
2204         }
2205
2206         ret = ctdb_reply_control_get_nodemap(reply, state, &state->nodemap);
2207         if (ret != 0) {
2208                 LOG("control GET_NODEMAP failed, ret=%d\n", ret);
2209                 tevent_req_error(req, ret);
2210                 return;
2211         }
2212
2213         state->count = list_of_active_nodes(state->nodemap, CTDB_UNKNOWN_PNN,
2214                                             state, &state->pnn_list);
2215         if (state->count <= 0) {
2216                 tevent_req_error(req, ENOMEM);
2217                 return;
2218         }
2219
2220         state->ban_credits = talloc_zero_array(state, uint32_t,
2221                                                state->nodemap->num);
2222         if (tevent_req_nomem(state->ban_credits, req)) {
2223                 return;
2224         }
2225
2226         ctdb_req_control_getvnnmap(&request);
2227         subreq = ctdb_client_control_send(state, state->ev, state->client,
2228                                           state->destnode, TIMEOUT(),
2229                                           &request);
2230         if (tevent_req_nomem(subreq, req)) {
2231                 return;
2232         }
2233         tevent_req_set_callback(subreq, recovery_vnnmap_done, req);
2234 }
2235
2236 static void recovery_vnnmap_done(struct tevent_req *subreq)
2237 {
2238         struct tevent_req *req = tevent_req_callback_data(
2239                 subreq, struct tevent_req);
2240         struct recovery_state *state = tevent_req_data(
2241                 req, struct recovery_state);
2242         struct ctdb_reply_control *reply;
2243         struct ctdb_req_control request;
2244         bool status;
2245         int ret;
2246
2247         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2248         TALLOC_FREE(subreq);
2249         if (! status) {
2250                 LOG("control GETVNNMAP failed to node %u, ret=%d\n",
2251                     state->destnode, ret);
2252                 tevent_req_error(req, ret);
2253                 return;
2254         }
2255
2256         ret = ctdb_reply_control_getvnnmap(reply, state, &state->vnnmap);
2257         if (ret != 0) {
2258                 LOG("control GETVNNMAP failed, ret=%d\n", ret);
2259                 tevent_req_error(req, ret);
2260                 return;
2261         }
2262
2263         ctdb_req_control_get_capabilities(&request);
2264         subreq = ctdb_client_control_multi_send(state, state->ev,
2265                                                 state->client,
2266                                                 state->pnn_list, state->count,
2267                                                 TIMEOUT(), &request);
2268         if (tevent_req_nomem(subreq, req)) {
2269                 return;
2270         }
2271         tevent_req_set_callback(subreq, recovery_capabilities_done, req);
2272 }
2273
2274 static void recovery_capabilities_done(struct tevent_req *subreq)
2275 {
2276         struct tevent_req *req = tevent_req_callback_data(
2277                 subreq, struct tevent_req);
2278         struct recovery_state *state = tevent_req_data(
2279                 req, struct recovery_state);
2280         struct ctdb_reply_control **reply;
2281         struct ctdb_req_control request;
2282         int *err_list;
2283         int ret, i;
2284         bool status;
2285
2286         status = ctdb_client_control_multi_recv(subreq, &ret, state, &err_list,
2287                                                 &reply);
2288         TALLOC_FREE(subreq);
2289         if (! status) {
2290                 int ret2;
2291                 uint32_t pnn;
2292
2293                 ret2 = ctdb_client_control_multi_error(state->pnn_list,
2294                                                        state->count,
2295                                                        err_list, &pnn);
2296                 if (ret2 != 0) {
2297                         LOG("control GET_CAPABILITIES failed on node %u,"
2298                             " ret=%d\n", pnn, ret2);
2299                 } else {
2300                         LOG("control GET_CAPABILITIES failed, ret=%d\n", ret);
2301                 }
2302                 tevent_req_error(req, ret);
2303                 return;
2304         }
2305
2306         /* Make the array size same as nodemap */
2307         state->caps = talloc_zero_array(state, uint32_t,
2308                                         state->nodemap->num);
2309         if (tevent_req_nomem(state->caps, req)) {
2310                 return;
2311         }
2312
2313         for (i=0; i<state->count; i++) {
2314                 uint32_t pnn;
2315
2316                 pnn = state->pnn_list[i];
2317                 ret = ctdb_reply_control_get_capabilities(reply[i],
2318                                                           &state->caps[pnn]);
2319                 if (ret != 0) {
2320                         LOG("control GET_CAPABILITIES failed on node %u\n", pnn);
2321                         tevent_req_error(req, EPROTO);
2322                         return;
2323                 }
2324         }
2325
2326         talloc_free(reply);
2327
2328         ctdb_req_control_get_dbmap(&request);
2329         subreq = ctdb_client_control_send(state, state->ev, state->client,
2330                                           state->destnode, TIMEOUT(),
2331                                           &request);
2332         if (tevent_req_nomem(subreq, req)) {
2333                 return;
2334         }
2335         tevent_req_set_callback(subreq, recovery_dbmap_done, req);
2336 }
2337
2338 static void recovery_dbmap_done(struct tevent_req *subreq)
2339 {
2340         struct tevent_req *req = tevent_req_callback_data(
2341                 subreq, struct tevent_req);
2342         struct recovery_state *state = tevent_req_data(
2343                 req, struct recovery_state);
2344         struct ctdb_reply_control *reply;
2345         struct ctdb_req_control request;
2346         int ret;
2347         bool status;
2348
2349         status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2350         TALLOC_FREE(subreq);
2351         if (! status) {
2352                 LOG("control GET_DBMAP failed to node %u, ret=%d\n",
2353                     state->destnode, ret);
2354                 tevent_req_error(req, ret);
2355                 return;
2356         }
2357
2358         ret = ctdb_reply_control_get_dbmap(reply, state, &state->dbmap);
2359         if (ret != 0) {
2360                 LOG("control GET_DBMAP failed, ret=%d\n", ret);
2361                 tevent_req_error(req, ret);
2362                 return;
2363         }
2364
2365         ctdb_req_control_set_recmode(&request, CTDB_RECOVERY_ACTIVE);
2366         subreq = ctdb_client_control_multi_send(state, state->ev,
2367                                                 state->client,
2368                                                 state->pnn_list, state->count,
2369                                                 TIMEOUT(), &request);
2370         if (tevent_req_nomem(subreq, req)) {
2371                 return;
2372         }
2373         tevent_req_set_callback(subreq, recovery_active_done, req);
2374 }
2375
2376 static void recovery_active_done(struct tevent_req *subreq)
2377 {
2378         struct tevent_req *req = tevent_req_callback_data(
2379                 subreq, struct tevent_req);
2380         struct recovery_state *state = tevent_req_data(
2381                 req, struct recovery_state);
2382         struct ctdb_req_control request;
2383         struct ctdb_vnn_map *vnnmap;
2384         int *err_list;
2385         int ret, count, i;
2386         bool status;
2387
2388         status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
2389                                                 NULL);
2390         TALLOC_FREE(subreq);
2391         if (! status) {
2392                 int ret2;
2393                 uint32_t pnn;
2394
2395                 ret2 = ctdb_client_control_multi_error(state->pnn_list,
2396                                                        state->count,
2397                                                        err_list, &pnn);
2398                 if (ret2 != 0) {
2399                         LOG("failed to set recovery mode to ACTIVE on node %u,"
2400                             " ret=%d\n", pnn, ret2);
2401                 } else {
2402                         LOG("failed to set recovery mode to ACTIVE, ret=%d\n",
2403                             ret);
2404                 }
2405                 tevent_req_error(req, ret);
2406                 return;
2407         }
2408
2409         LOG("set recovery mode to ACTIVE\n");
2410
2411         /* Calculate new VNNMAP */
2412         count = 0;
2413         for (i=0; i<state->nodemap->num; i++) {
2414                 if (state->nodemap->node[i].flags & NODE_FLAGS_INACTIVE) {
2415                         continue;
2416                 }
2417                 if (!(state->caps[i] & CTDB_CAP_LMASTER)) {
2418                         continue;
2419                 }
2420                 count += 1;
2421         }
2422
2423         if (count == 0) {
2424                 LOG("no active lmasters found. Adding recmaster anyway\n");
2425         }
2426
2427         vnnmap = talloc_zero(state, struct ctdb_vnn_map);
2428         if (tevent_req_nomem(vnnmap, req)) {
2429                 return;
2430         }
2431
2432         vnnmap->size = (count == 0 ? 1 : count);
2433         vnnmap->map = talloc_array(vnnmap, uint32_t, vnnmap->size);
2434         if (tevent_req_nomem(vnnmap->map, req)) {
2435                 return;
2436         }
2437
2438         if (count == 0) {
2439                 vnnmap->map[0] = state->destnode;
2440         } else {
2441                 count = 0;
2442                 for (i=0; i<state->nodemap->num; i++) {
2443                         if (state->nodemap->node[i].flags &
2444                             NODE_FLAGS_INACTIVE) {
2445                                 continue;
2446                         }
2447                         if (!(state->caps[i] & CTDB_CAP_LMASTER)) {
2448                                 continue;
2449                         }
2450
2451                         vnnmap->map[count] = state->nodemap->node[i].pnn;
2452                         count += 1;
2453                 }
2454         }
2455
2456         vnnmap->generation = state->generation;
2457
2458         talloc_free(state->vnnmap);
2459         state->vnnmap = vnnmap;
2460
2461         ctdb_req_control_start_recovery(&request);
2462         subreq = ctdb_client_control_multi_send(state, state->ev,
2463                                                 state->client,
2464                                                 state->pnn_list, state->count,
2465                                                 TIMEOUT(), &request);
2466         if (tevent_req_nomem(subreq, req)) {
2467                 return;
2468         }
2469         tevent_req_set_callback(subreq, recovery_start_recovery_done, req);
2470 }
2471
2472 static void recovery_start_recovery_done(struct tevent_req *subreq)
2473 {
2474         struct tevent_req *req = tevent_req_callback_data(
2475                 subreq, struct tevent_req);
2476         struct recovery_state *state = tevent_req_data(
2477                 req, struct recovery_state);
2478         struct ctdb_req_control request;
2479         int *err_list;
2480         int ret;
2481         bool status;
2482
2483         status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
2484                                                 NULL);
2485         TALLOC_FREE(subreq);
2486         if (! status) {
2487                 int ret2;
2488                 uint32_t pnn;
2489
2490                 ret2 = ctdb_client_control_multi_error(state->pnn_list,
2491                                                        state->count,
2492                                                        err_list, &pnn);
2493                 if (ret2 != 0) {
2494                         LOG("failed to run start_recovery event on node %u,"
2495                             " ret=%d\n", pnn, ret2);
2496                 } else {
2497                         LOG("failed to run start_recovery event, ret=%d\n",
2498                             ret);
2499                 }
2500                 tevent_req_error(req, ret);
2501                 return;
2502         }
2503
2504         LOG("start_recovery event finished\n");
2505
2506         ctdb_req_control_setvnnmap(&request, state->vnnmap);
2507         subreq = ctdb_client_control_multi_send(state, state->ev,
2508                                                 state->client,
2509                                                 state->pnn_list, state->count,
2510                                                 TIMEOUT(), &request);
2511         if (tevent_req_nomem(subreq, req)) {
2512                 return;
2513         }
2514         tevent_req_set_callback(subreq, recovery_vnnmap_update_done, req);
2515 }
2516
2517 static void recovery_vnnmap_update_done(struct tevent_req *subreq)
2518 {
2519         struct tevent_req *req = tevent_req_callback_data(
2520                 subreq, struct tevent_req);
2521         struct recovery_state *state = tevent_req_data(
2522                 req, struct recovery_state);
2523         int *err_list;
2524         int ret;
2525         bool status;
2526
2527         status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list,
2528                                                 NULL);
2529         TALLOC_FREE(subreq);
2530         if (! status) {
2531                 int ret2;
2532                 uint32_t pnn;
2533
2534                 ret2 = ctdb_client_control_multi_error(state->pnn_list,
2535                                                        state->count,
2536                                                        err_list, &pnn);
2537                 if (ret2 != 0) {
2538                         LOG("failed to update VNNMAP on node %u, ret=%d\n",
2539                             pnn, ret2);
2540                 } else {
2541                         LOG("failed to update VNNMAP, ret=%d\n", ret);
2542                 }
2543                 tevent_req_error(req, ret);
2544                 return;
2545         }
2546
2547         LOG("updated VNNMAP\n");
2548
2549         subreq = db_recovery_send(state, state->ev, state->client,
2550                                   state->dbmap, state->tun_list,
2551                                   state->pnn_list, state->count,
2552                                   state->caps, state->ban_credits,
2553                                   state->vnnmap->generation);
2554         if (tevent_req_nomem(subreq, req)) {
2555                 return;
2556         }
2557         tevent_req_set_callback(subreq, recovery_db_recovery_done, req);
2558 }
2559
2560 static void recovery_db_recovery_done(struct tevent_req *subreq)
2561 {
2562         struct tevent_req *req = tevent_req_callback_data(
2563                 subreq, struct tevent_req);
2564         struct recovery_state *state = tevent_req_data(
2565                 req, struct recovery_state);
2566         struct ctdb_req_control request;
2567         bool status;
2568         int count;
2569
2570         status = db_recovery_recv(subreq, &count);
2571         TALLOC_FREE(subreq);
2572
2573         LOG("%d of %d databases recovered\n", count, state->dbmap->num);
2574
2575         if (! status) {
2576                 uint32_t max_pnn = CTDB_UNKNOWN_PNN, max_credits = 0;
2577                 int i;
2578
2579                 /* Bans are not enabled */
2580                 if (state->tun_list->enable_bans == 0) {
2581                         tevent_req_error(req, EIO);
2582                         return;
2583                 }
2584
2585                 for (i=0; i<state->count; i++) {
2586                         uint32_t pnn;
2587                         pnn = state->pnn_list[i];
2588                         if (state->ban_credits[pnn] > max_credits) {
2589                                 max_pnn = pnn;
2590                                 max_credits = state->ban_credits[pnn];
2591                         }
2592                 }
2593
2594                 /* If pulling database fails multiple times */
2595                 if (max_credits >= NUM_RETRIES) {
2596                         struct ctdb_req_message message;
2597
2598                         LOG("Assigning banning credits to node %u\n", max_pnn);
2599
2600                         message.srvid = CTDB_SRVID_BANNING;
2601                         message.data.pnn = max_pnn;
2602
2603                         subreq = ctdb_client_message_send(
2604                                         state, state->ev, state->client,
2605                                         ctdb_client_pnn(state->client),
2606                                         &message);
2607                         if (tevent_req_nomem(subreq, req)) {
2608                                 return;
2609                         }
2610                         tevent_req_set_callback(subreq, recovery_failed_done,
2611                                                 req);
2612                 }
2613                 return;
2614         }
2615
2616         ctdb_req_control_set_recmode(&request, CTDB_RECOVERY_NORMAL);
2617         subreq = ctdb_client_control_multi_send(state, state->ev,
2618                                                 state->client,
2619                                                 state->pnn_list, state->count,
2620                                                 TIMEOUT(), &request);
2621         if (tevent_req_nomem(subreq, req)) {
2622                 return;
2623         }
2624         tevent_req_set_callback(subreq, recovery_normal_done, req);
2625 }
2626
2627 static void recovery_failed_done(struct tevent_req *subreq)
2628 {
2629         struct tevent_req *req = tevent_req_callback_data(
2630                 subreq, struct tevent_req);
2631         int ret;
2632         bool status;
2633
2634         status = ctdb_client_message_recv(subreq, &ret);
2635         TALLOC_FREE(subreq);
2636         if (! status) {
2637                 LOG("failed to assign banning credits, ret=%d\n", ret);
2638         }
2639
2640         tevent_req_error(req, EIO);
2641 }
2642
2643 static void recovery_normal_done(struct tevent_req *subreq)
2644 {
2645         struct tevent_req *req = tevent_req_callback_data(
2646                 subreq, struct tevent_req);
2647         struct recovery_state *state = tevent_req_data(
2648                 req, struct recovery_state);
2649         struct ctdb_req_control request;
2650         int *err_list;
2651         int ret;
2652         bool status;
2653
2654         status = ctdb_client_control_multi_recv(subreq, &ret, state, &err_list,
2655                                                 NULL);
2656         TALLOC_FREE(subreq);
2657         if (! status) {
2658                 int ret2;
2659                 uint32_t pnn;
2660
2661                 ret2 = ctdb_client_control_multi_error(state->pnn_list,
2662                                                        state->count,
2663                                                        err_list, &pnn);
2664                 if (ret2 != 0) {
2665                         LOG("failed to set recovery mode to NORMAL on node %u,"
2666                             " ret=%d\n", pnn, ret2);
2667                 } else {
2668                         LOG("failed to set recovery mode to NORMAL, ret=%d\n",
2669                             ret);
2670                 }
2671                 tevent_req_error(req, ret);
2672                 return;
2673         }
2674
2675         LOG("set recovery mode to NORMAL\n");
2676
2677         ctdb_req_control_end_recovery(&request);
2678         subreq = ctdb_client_control_multi_send(state, state->ev,
2679                                                 state->client,
2680                                                 state->pnn_list, state->count,
2681                                                 TIMEOUT(), &request);
2682         if (tevent_req_nomem(subreq, req)) {
2683                 return;
2684         }
2685         tevent_req_set_callback(subreq, recovery_end_recovery_done, req);
2686 }
2687
2688 static void recovery_end_recovery_done(struct tevent_req *subreq)
2689 {
2690         struct tevent_req *req = tevent_req_callback_data(
2691                 subreq, struct tevent_req);
2692         struct recovery_state *state = tevent_req_data(
2693                 req, struct recovery_state);
2694         int *err_list;
2695         int ret;
2696         bool status;
2697
2698         status = ctdb_client_control_multi_recv(subreq, &ret, state, &err_list,
2699                                                 NULL);
2700         TALLOC_FREE(subreq);
2701         if (! status) {
2702                 int ret2;
2703                 uint32_t pnn;
2704
2705                 ret2 = ctdb_client_control_multi_error(state->pnn_list,
2706                                                        state->count,
2707                                                        err_list, &pnn);
2708                 if (ret2 != 0) {
2709                         LOG("failed to run recovered event on node %u,"
2710                             " ret=%d\n", pnn, ret2);
2711                 } else {
2712                         LOG("failed to run recovered event, ret=%d\n", ret);
2713                 }
2714                 tevent_req_error(req, ret);
2715                 return;
2716         }
2717
2718         LOG("recovered event finished\n");
2719
2720         tevent_req_done(req);
2721 }
2722
/*
 * Collect the outcome of a request started with recovery_send().
 *
 * Thin wrapper: req and perr are handed straight to generic_recv() and
 * nothing is returned to the caller.
 */
static void recovery_recv(struct tevent_req *req, int *perr)
{
	(void) generic_recv(req, perr);
}
2727
/*
 * Print the command line synopsis for this helper to stderr.
 *
 * progname is the program name substituted into the message.
 */
static void usage(const char *progname)
{
	fprintf(stderr,
		"\nUsage: %s "
		"<log-fd> <output-fd> <ctdb-socket-path> <generation>\n",
		progname);
}
2733
2734
2735 /*
2736  * Arguments - log fd, write fd, socket path, generation
2737  */
2738 int main(int argc, char *argv[])
2739 {
2740         int log_fd, write_fd;
2741         const char *sockpath;
2742         TALLOC_CTX *mem_ctx;
2743         struct tevent_context *ev;
2744         struct ctdb_client_context *client;
2745         int ret;
2746         struct tevent_req *req;
2747         uint32_t generation;
2748
2749         if (argc != 5) {
2750                 usage(argv[0]);
2751                 exit(1);
2752         }
2753
2754         log_fd = atoi(argv[1]);
2755         if (log_fd != STDOUT_FILENO && log_fd != STDERR_FILENO) {
2756                 close(STDOUT_FILENO);
2757                 close(STDERR_FILENO);
2758                 dup2(log_fd, STDOUT_FILENO);
2759                 dup2(log_fd, STDERR_FILENO);
2760         }
2761         close(log_fd);
2762
2763         write_fd = atoi(argv[2]);
2764         sockpath = argv[3];
2765         generation = (uint32_t)strtoul(argv[4], NULL, 0);
2766
2767         mem_ctx = talloc_new(NULL);
2768         if (mem_ctx == NULL) {
2769                 LOG("talloc_new() failed\n");
2770                 goto failed;
2771         }
2772
2773         ev = tevent_context_init(mem_ctx);
2774         if (ev == NULL) {
2775                 LOG("tevent_context_init() failed\n");
2776                 goto failed;
2777         }
2778
2779         ret = ctdb_client_init(mem_ctx, ev, sockpath, &client);
2780         if (ret != 0) {
2781                 LOG("ctdb_client_init() failed, ret=%d\n", ret);
2782                 goto failed;
2783         }
2784
2785         req = recovery_send(mem_ctx, ev, client, generation);
2786         if (req == NULL) {
2787                 LOG("database_recover_send() failed\n");
2788                 goto failed;
2789         }
2790
2791         if (! tevent_req_poll(req, ev)) {
2792                 LOG("tevent_req_poll() failed\n");
2793                 goto failed;
2794         }
2795
2796         recovery_recv(req, &ret);
2797         TALLOC_FREE(req);
2798         if (ret != 0) {
2799                 LOG("database recovery failed, ret=%d\n", ret);
2800                 goto failed;
2801         }
2802
2803         sys_write(write_fd, &ret, sizeof(ret));
2804         return 0;
2805
2806 failed:
2807         talloc_free(mem_ctx);
2808         return 1;
2809 }