1d63526e70d09c0c4b10dc9d94aa231bb6c7a8ee
[obnox/samba/samba-obnox.git] / ctdb / server / ctdb_recoverd.c
1 /* 
2    ctdb recovery daemon
3
4    Copyright (C) Ronnie Sahlberg  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "replace.h"
21 #include "system/filesys.h"
22 #include "system/time.h"
23 #include "system/network.h"
24 #include "system/wait.h"
25
26 #include <popt.h>
27 #include <talloc.h>
28 #include <tevent.h>
29 #include <tdb.h>
30
31 #include "lib/tdb_wrap/tdb_wrap.h"
32 #include "lib/util/dlinklist.h"
33 #include "lib/util/debug.h"
34 #include "lib/util/samba_util.h"
35 #include "lib/util/util_process.h"
36
37 #include "ctdb_private.h"
38 #include "ctdb_client.h"
39
40 #include "common/system.h"
41 #include "common/cmdline.h"
42 #include "common/common.h"
43 #include "common/logging.h"
44
45
46 /* List of SRVID requests that need to be processed */
47 struct srvid_list {
48         struct srvid_list *next, *prev;
49         struct ctdb_srvid_message *request;
50 };
51
52 struct srvid_requests {
53         struct srvid_list *requests;
54 };
55
56 static void srvid_request_reply(struct ctdb_context *ctdb,
57                                 struct ctdb_srvid_message *request,
58                                 TDB_DATA result)
59 {
60         /* Someone that sent srvid==0 does not want a reply */
61         if (request->srvid == 0) {
62                 talloc_free(request);
63                 return;
64         }
65
66         if (ctdb_client_send_message(ctdb, request->pnn, request->srvid,
67                                      result) == 0) {
68                 DEBUG(DEBUG_INFO,("Sent SRVID reply to %u:%llu\n",
69                                   (unsigned)request->pnn,
70                                   (unsigned long long)request->srvid));
71         } else {
72                 DEBUG(DEBUG_ERR,("Failed to send SRVID reply to %u:%llu\n",
73                                  (unsigned)request->pnn,
74                                  (unsigned long long)request->srvid));
75         }
76
77         talloc_free(request);
78 }
79
80 static void srvid_requests_reply(struct ctdb_context *ctdb,
81                                  struct srvid_requests **requests,
82                                  TDB_DATA result)
83 {
84         struct srvid_list *r;
85
86         for (r = (*requests)->requests; r != NULL; r = r->next) {
87                 srvid_request_reply(ctdb, r->request, result);
88         }
89
90         /* Free the list structure... */
91         TALLOC_FREE(*requests);
92 }
93
94 static void srvid_request_add(struct ctdb_context *ctdb,
95                               struct srvid_requests **requests,
96                               struct ctdb_srvid_message *request)
97 {
98         struct srvid_list *t;
99         int32_t ret;
100         TDB_DATA result;
101
102         if (*requests == NULL) {
103                 *requests = talloc_zero(ctdb, struct srvid_requests);
104                 if (*requests == NULL) {
105                         goto nomem;
106                 }
107         }
108
109         t = talloc_zero(*requests, struct srvid_list);
110         if (t == NULL) {
111                 /* If *requests was just allocated above then free it */
112                 if ((*requests)->requests == NULL) {
113                         TALLOC_FREE(*requests);
114                 }
115                 goto nomem;
116         }
117
118         t->request = (struct ctdb_srvid_message *)talloc_steal(t, request);
119         DLIST_ADD((*requests)->requests, t);
120
121         return;
122
123 nomem:
124         /* Failed to add the request to the list.  Send a fail. */
125         DEBUG(DEBUG_ERR, (__location__
126                           " Out of memory, failed to queue SRVID request\n"));
127         ret = -ENOMEM;
128         result.dsize = sizeof(ret);
129         result.dptr = (uint8_t *)&ret;
130         srvid_request_reply(ctdb, request, result);
131 }
132
133 /* An abstraction to allow an operation (takeover runs, recoveries,
134  * ...) to be disabled for a given timeout */
135 struct ctdb_op_state {
136         struct tevent_timer *timer;
137         bool in_progress;
138         const char *name;
139 };
140
141 static struct ctdb_op_state *ctdb_op_init(TALLOC_CTX *mem_ctx, const char *name)
142 {
143         struct ctdb_op_state *state = talloc_zero(mem_ctx, struct ctdb_op_state);
144
145         if (state != NULL) {
146                 state->in_progress = false;
147                 state->name = name;
148         }
149
150         return state;
151 }
152
153 static bool ctdb_op_is_disabled(struct ctdb_op_state *state)
154 {
155         return state->timer != NULL;
156 }
157
158 static bool ctdb_op_begin(struct ctdb_op_state *state)
159 {
160         if (ctdb_op_is_disabled(state)) {
161                 DEBUG(DEBUG_NOTICE,
162                       ("Unable to begin - %s are disabled\n", state->name));
163                 return false;
164         }
165
166         state->in_progress = true;
167         return true;
168 }
169
170 static bool ctdb_op_end(struct ctdb_op_state *state)
171 {
172         return state->in_progress = false;
173 }
174
175 static bool ctdb_op_is_in_progress(struct ctdb_op_state *state)
176 {
177         return state->in_progress;
178 }
179
180 static void ctdb_op_enable(struct ctdb_op_state *state)
181 {
182         TALLOC_FREE(state->timer);
183 }
184
185 static void ctdb_op_timeout_handler(struct tevent_context *ev,
186                                     struct tevent_timer *te,
187                                     struct timeval yt, void *p)
188 {
189         struct ctdb_op_state *state =
190                 talloc_get_type(p, struct ctdb_op_state);
191
192         DEBUG(DEBUG_NOTICE,("Reenabling %s after timeout\n", state->name));
193         ctdb_op_enable(state);
194 }
195
196 static int ctdb_op_disable(struct ctdb_op_state *state,
197                            struct tevent_context *ev,
198                            uint32_t timeout)
199 {
200         if (timeout == 0) {
201                 DEBUG(DEBUG_NOTICE,("Reenabling %s\n", state->name));
202                 ctdb_op_enable(state);
203                 return 0;
204         }
205
206         if (state->in_progress) {
207                 DEBUG(DEBUG_ERR,
208                       ("Unable to disable %s - in progress\n", state->name));
209                 return -EAGAIN;
210         }
211
212         DEBUG(DEBUG_NOTICE,("Disabling %s for %u seconds\n",
213                             state->name, timeout));
214
215         /* Clear any old timers */
216         talloc_free(state->timer);
217
218         /* Arrange for the timeout to occur */
219         state->timer = tevent_add_timer(ev, state,
220                                         timeval_current_ofs(timeout, 0),
221                                         ctdb_op_timeout_handler, state);
222         if (state->timer == NULL) {
223                 DEBUG(DEBUG_ERR,(__location__ " Unable to setup timer\n"));
224                 return -ENOMEM;
225         }
226
227         return 0;
228 }
229
230 struct ctdb_banning_state {
231         uint32_t count;
232         struct timeval last_reported_time;
233 };
234
235 /*
236   private state of recovery daemon
237  */
238 struct ctdb_recoverd {
239         struct ctdb_context *ctdb;
240         uint32_t recmaster;
241         uint32_t last_culprit_node;
242         struct ctdb_node_map_old *nodemap;
243         struct timeval priority_time;
244         bool need_takeover_run;
245         bool need_recovery;
246         uint32_t node_flags;
247         struct tevent_timer *send_election_te;
248         struct tevent_timer *election_timeout;
249         struct srvid_requests *reallocate_requests;
250         struct ctdb_op_state *takeover_run;
251         struct ctdb_op_state *recovery;
252         struct ctdb_iface_list_old *ifaces;
253         uint32_t *force_rebalance_nodes;
254         struct ctdb_node_capabilities *caps;
255 };
256
257 #define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
258 #define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)
259
260 static void ctdb_restart_recd(struct tevent_context *ev,
261                               struct tevent_timer *te, struct timeval t,
262                               void *private_data);
263
264 /*
265   ban a node for a period of time
266  */
267 static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
268 {
269         int ret;
270         struct ctdb_context *ctdb = rec->ctdb;
271         struct ctdb_ban_state bantime;
272
273         if (!ctdb_validate_pnn(ctdb, pnn)) {
274                 DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_ban_node\n", pnn));
275                 return;
276         }
277
278         DEBUG(DEBUG_NOTICE,("Banning node %u for %u seconds\n", pnn, ban_time));
279
280         bantime.pnn  = pnn;
281         bantime.time = ban_time;
282
283         ret = ctdb_ctrl_set_ban(ctdb, CONTROL_TIMEOUT(), pnn, &bantime);
284         if (ret != 0) {
285                 DEBUG(DEBUG_ERR,(__location__ " Failed to ban node %d\n", pnn));
286                 return;
287         }
288
289 }
290
291 enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_ELECTION_NEEDED, MONITOR_FAILED};
292
293
294 /*
295   remember the trouble maker
296  */
297 static void ctdb_set_culprit_count(struct ctdb_recoverd *rec, uint32_t culprit, uint32_t count)
298 {
299         struct ctdb_context *ctdb = talloc_get_type(rec->ctdb, struct ctdb_context);
300         struct ctdb_banning_state *ban_state;
301
302         if (culprit > ctdb->num_nodes) {
303                 DEBUG(DEBUG_ERR,("Trying to set culprit %d but num_nodes is %d\n", culprit, ctdb->num_nodes));
304                 return;
305         }
306
307         /* If we are banned or stopped, do not set other nodes as culprits */
308         if (rec->node_flags & NODE_FLAGS_INACTIVE) {
309                 DEBUG(DEBUG_NOTICE, ("This node is INACTIVE, cannot set culprit node %d\n", culprit));
310                 return;
311         }
312
313         if (ctdb->nodes[culprit]->ban_state == NULL) {
314                 ctdb->nodes[culprit]->ban_state = talloc_zero(ctdb->nodes[culprit], struct ctdb_banning_state);
315                 CTDB_NO_MEMORY_VOID(ctdb, ctdb->nodes[culprit]->ban_state);
316
317                 
318         }
319         ban_state = ctdb->nodes[culprit]->ban_state;
320         if (timeval_elapsed(&ban_state->last_reported_time) > ctdb->tunable.recovery_grace_period) {
321                 /* this was the first time in a long while this node
322                    misbehaved so we will forgive any old transgressions.
323                 */
324                 ban_state->count = 0;
325         }
326
327         ban_state->count += count;
328         ban_state->last_reported_time = timeval_current();
329         rec->last_culprit_node = culprit;
330 }
331
332 /*
333   remember the trouble maker
334  */
335 static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
336 {
337         ctdb_set_culprit_count(rec, culprit, 1);
338 }
339
340
341 /* this callback is called for every node that failed to execute the
342    recovered event
343 */
344 static void recovered_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
345 {
346         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
347
348         DEBUG(DEBUG_ERR, (__location__ " Node %u failed the recovered event. Setting it as recovery fail culprit\n", node_pnn));
349
350         ctdb_set_culprit(rec, node_pnn);
351 }
352
353 /*
354   run the "recovered" eventscript on all nodes
355  */
356 static int run_recovered_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map_old *nodemap, const char *caller)
357 {
358         TALLOC_CTX *tmp_ctx;
359         uint32_t *nodes;
360         struct ctdb_context *ctdb = rec->ctdb;
361
362         tmp_ctx = talloc_new(ctdb);
363         CTDB_NO_MEMORY(ctdb, tmp_ctx);
364
365         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
366         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_END_RECOVERY,
367                                         nodes, 0,
368                                         CONTROL_TIMEOUT(), false, tdb_null,
369                                         NULL, recovered_fail_callback,
370                                         rec) != 0) {
371                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event when called from %s\n", caller));
372
373                 talloc_free(tmp_ctx);
374                 return -1;
375         }
376
377         talloc_free(tmp_ctx);
378         return 0;
379 }
380
381 /* this callback is called for every node that failed to execute the
382    start recovery event
383 */
384 static void startrecovery_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
385 {
386         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
387
388         DEBUG(DEBUG_ERR, (__location__ " Node %u failed the startrecovery event. Setting it as recovery fail culprit\n", node_pnn));
389
390         ctdb_set_culprit(rec, node_pnn);
391 }
392
393 /*
394   run the "startrecovery" eventscript on all nodes
395  */
396 static int run_startrecovery_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map_old *nodemap)
397 {
398         TALLOC_CTX *tmp_ctx;
399         uint32_t *nodes;
400         struct ctdb_context *ctdb = rec->ctdb;
401
402         tmp_ctx = talloc_new(ctdb);
403         CTDB_NO_MEMORY(ctdb, tmp_ctx);
404
405         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
406         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_START_RECOVERY,
407                                         nodes, 0,
408                                         CONTROL_TIMEOUT(), false, tdb_null,
409                                         NULL,
410                                         startrecovery_fail_callback,
411                                         rec) != 0) {
412                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event. Recovery failed.\n"));
413                 talloc_free(tmp_ctx);
414                 return -1;
415         }
416
417         talloc_free(tmp_ctx);
418         return 0;
419 }
420
421 /*
422   Retrieve capabilities from all connected nodes
423  */
424 static int update_capabilities(struct ctdb_recoverd *rec,
425                                struct ctdb_node_map_old *nodemap)
426 {
427         uint32_t *capp;
428         TALLOC_CTX *tmp_ctx;
429         struct ctdb_node_capabilities *caps;
430         struct ctdb_context *ctdb = rec->ctdb;
431
432         tmp_ctx = talloc_new(rec);
433         CTDB_NO_MEMORY(ctdb, tmp_ctx);
434
435         caps = ctdb_get_capabilities(ctdb, tmp_ctx,
436                                      CONTROL_TIMEOUT(), nodemap);
437
438         if (caps == NULL) {
439                 DEBUG(DEBUG_ERR,
440                       (__location__ " Failed to get node capabilities\n"));
441                 talloc_free(tmp_ctx);
442                 return -1;
443         }
444
445         capp = ctdb_get_node_capabilities(caps, ctdb_get_pnn(ctdb));
446         if (capp == NULL) {
447                 DEBUG(DEBUG_ERR,
448                       (__location__
449                        " Capabilities don't include current node.\n"));
450                 talloc_free(tmp_ctx);
451                 return -1;
452         }
453         ctdb->capabilities = *capp;
454
455         TALLOC_FREE(rec->caps);
456         rec->caps = talloc_steal(rec, caps);
457
458         talloc_free(tmp_ctx);
459         return 0;
460 }
461
462 static void set_recmode_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
463 {
464         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
465
466         DEBUG(DEBUG_ERR,("Failed to freeze node %u during recovery. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
467         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
468 }
469
470 static void transaction_start_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
471 {
472         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
473
474         DEBUG(DEBUG_ERR,("Failed to start recovery transaction on node %u. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
475         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
476 }
477
478 /*
479   change recovery mode on all nodes
480  */
481 static int set_recovery_mode(struct ctdb_context *ctdb,
482                              struct ctdb_recoverd *rec,
483                              struct ctdb_node_map_old *nodemap,
484                              uint32_t rec_mode, bool freeze)
485 {
486         TDB_DATA data;
487         uint32_t *nodes;
488         TALLOC_CTX *tmp_ctx;
489
490         tmp_ctx = talloc_new(ctdb);
491         CTDB_NO_MEMORY(ctdb, tmp_ctx);
492
493         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
494
495         data.dsize = sizeof(uint32_t);
496         data.dptr = (unsigned char *)&rec_mode;
497
498         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMODE,
499                                         nodes, 0,
500                                         CONTROL_TIMEOUT(),
501                                         false, data,
502                                         NULL, NULL,
503                                         NULL) != 0) {
504                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode. Recovery failed.\n"));
505                 talloc_free(tmp_ctx);
506                 return -1;
507         }
508
509         /* freeze all nodes */
510         if (freeze && rec_mode == CTDB_RECOVERY_ACTIVE) {
511                 int i;
512
513                 for (i=1; i<=NUM_DB_PRIORITIES; i++) {
514                         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
515                                                 nodes, i,
516                                                 CONTROL_TIMEOUT(),
517                                                 false, tdb_null,
518                                                 NULL,
519                                                 set_recmode_fail_callback,
520                                                 rec) != 0) {
521                                 DEBUG(DEBUG_ERR, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
522                                 talloc_free(tmp_ctx);
523                                 return -1;
524                         }
525                 }
526         }
527
528         talloc_free(tmp_ctx);
529         return 0;
530 }
531
532 /* update all remote nodes to use the same db priority that we have
533    this can fail if the remove node has not yet been upgraded to 
534    support this function, so we always return success and never fail
535    a recovery if this call fails.
536 */
537 static int update_db_priority_on_remote_nodes(struct ctdb_context *ctdb,
538         struct ctdb_node_map_old *nodemap, 
539         uint32_t pnn, struct ctdb_dbid_map_old *dbmap, TALLOC_CTX *mem_ctx)
540 {
541         int db;
542
543         /* step through all local databases */
544         for (db=0; db<dbmap->num;db++) {
545                 struct ctdb_db_priority db_prio;
546                 int ret;
547
548                 db_prio.db_id     = dbmap->dbs[db].db_id;
549                 ret = ctdb_ctrl_get_db_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, dbmap->dbs[db].db_id, &db_prio.priority);
550                 if (ret != 0) {
551                         DEBUG(DEBUG_ERR,(__location__ " Failed to read database priority from local node for db 0x%08x\n", dbmap->dbs[db].db_id));
552                         continue;
553                 }
554
555                 DEBUG(DEBUG_INFO,("Update DB priority for db 0x%08x to %u\n", dbmap->dbs[db].db_id, db_prio.priority)); 
556
557                 ret = ctdb_ctrl_set_db_priority(ctdb, CONTROL_TIMEOUT(),
558                                                 CTDB_CURRENT_NODE, &db_prio);
559                 if (ret != 0) {
560                         DEBUG(DEBUG_ERR,(__location__ " Failed to set DB priority for 0x%08x\n",
561                                          db_prio.db_id));
562                 }
563         }
564
565         return 0;
566 }                       
567
568 /*
569   ensure all other nodes have attached to any databases that we have
570  */
571 static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map_old *nodemap, 
572                                            uint32_t pnn, struct ctdb_dbid_map_old *dbmap, TALLOC_CTX *mem_ctx)
573 {
574         int i, j, db, ret;
575         struct ctdb_dbid_map_old *remote_dbmap;
576
577         /* verify that all other nodes have all our databases */
578         for (j=0; j<nodemap->num; j++) {
579                 /* we don't need to ourself ourselves */
580                 if (nodemap->nodes[j].pnn == pnn) {
581                         continue;
582                 }
583                 /* don't check nodes that are unavailable */
584                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
585                         continue;
586                 }
587
588                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
589                                          mem_ctx, &remote_dbmap);
590                 if (ret != 0) {
591                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
592                         return -1;
593                 }
594
595                 /* step through all local databases */
596                 for (db=0; db<dbmap->num;db++) {
597                         const char *name;
598
599
600                         for (i=0;i<remote_dbmap->num;i++) {
601                                 if (dbmap->dbs[db].db_id == remote_dbmap->dbs[i].db_id) {
602                                         break;
603                                 }
604                         }
605                         /* the remote node already have this database */
606                         if (i!=remote_dbmap->num) {
607                                 continue;
608                         }
609                         /* ok so we need to create this database */
610                         ret = ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn,
611                                                   dbmap->dbs[db].db_id, mem_ctx,
612                                                   &name);
613                         if (ret != 0) {
614                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", pnn));
615                                 return -1;
616                         }
617                         ret = ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(),
618                                                  nodemap->nodes[j].pnn,
619                                                  mem_ctx, name,
620                                                  dbmap->dbs[db].flags & CTDB_DB_FLAGS_PERSISTENT);
621                         if (ret != 0) {
622                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create remote db:%s\n", name));
623                                 return -1;
624                         }
625                 }
626         }
627
628         return 0;
629 }
630
631
632 /*
633   ensure we are attached to any databases that anyone else is attached to
634  */
635 static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map_old *nodemap, 
636                                           uint32_t pnn, struct ctdb_dbid_map_old **dbmap, TALLOC_CTX *mem_ctx)
637 {
638         int i, j, db, ret;
639         struct ctdb_dbid_map_old *remote_dbmap;
640
641         /* verify that we have all database any other node has */
642         for (j=0; j<nodemap->num; j++) {
643                 /* we don't need to ourself ourselves */
644                 if (nodemap->nodes[j].pnn == pnn) {
645                         continue;
646                 }
647                 /* don't check nodes that are unavailable */
648                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
649                         continue;
650                 }
651
652                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
653                                          mem_ctx, &remote_dbmap);
654                 if (ret != 0) {
655                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
656                         return -1;
657                 }
658
659                 /* step through all databases on the remote node */
660                 for (db=0; db<remote_dbmap->num;db++) {
661                         const char *name;
662
663                         for (i=0;i<(*dbmap)->num;i++) {
664                                 if (remote_dbmap->dbs[db].db_id == (*dbmap)->dbs[i].db_id) {
665                                         break;
666                                 }
667                         }
668                         /* we already have this db locally */
669                         if (i!=(*dbmap)->num) {
670                                 continue;
671                         }
672                         /* ok so we need to create this database and
673                            rebuild dbmap
674                          */
675                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
676                                             remote_dbmap->dbs[db].db_id, mem_ctx, &name);
677                         if (ret != 0) {
678                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", 
679                                           nodemap->nodes[j].pnn));
680                                 return -1;
681                         }
682                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name, 
683                                            remote_dbmap->dbs[db].flags & CTDB_DB_FLAGS_PERSISTENT);
684                         if (ret != 0) {
685                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create local db:%s\n", name));
686                                 return -1;
687                         }
688                         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
689                         if (ret != 0) {
690                                 DEBUG(DEBUG_ERR, (__location__ " Unable to reread dbmap on node %u\n", pnn));
691                                 return -1;
692                         }
693                 }
694         }
695
696         return 0;
697 }
698
699
700 /*
701   pull the remote database contents from one node into the recdb
702  */
703 static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode,
704                                     struct tdb_wrap *recdb, uint32_t dbid)
705 {
706         int ret;
707         TDB_DATA outdata;
708         struct ctdb_marshall_buffer *reply;
709         struct ctdb_rec_data_old *recdata;
710         int i;
711         TALLOC_CTX *tmp_ctx = talloc_new(recdb);
712
713         ret = ctdb_ctrl_pulldb(ctdb, srcnode, dbid, CTDB_LMASTER_ANY, tmp_ctx,
714                                CONTROL_TIMEOUT(), &outdata);
715         if (ret != 0) {
716                 DEBUG(DEBUG_ERR,(__location__ " Unable to copy db from node %u\n", srcnode));
717                 talloc_free(tmp_ctx);
718                 return -1;
719         }
720
721         reply = (struct ctdb_marshall_buffer *)outdata.dptr;
722
723         if (outdata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
724                 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
725                 talloc_free(tmp_ctx);
726                 return -1;
727         }
728
729         recdata = (struct ctdb_rec_data_old *)&reply->data[0];
730
731         for (i=0;
732              i<reply->count;
733              recdata = (struct ctdb_rec_data_old *)(recdata->length + (uint8_t *)recdata), i++) {
734                 TDB_DATA key, data;
735                 struct ctdb_ltdb_header *hdr;
736                 TDB_DATA existing;
737
738                 key.dptr = &recdata->data[0];
739                 key.dsize = recdata->keylen;
740                 data.dptr = &recdata->data[key.dsize];
741                 data.dsize = recdata->datalen;
742
743                 hdr = (struct ctdb_ltdb_header *)data.dptr;
744
745                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
746                         DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
747                         talloc_free(tmp_ctx);
748                         return -1;
749                 }
750
751                 /* fetch the existing record, if any */
752                 existing = tdb_fetch(recdb->tdb, key);
753
754                 if (existing.dptr != NULL) {
755                         struct ctdb_ltdb_header header;
756                         if (existing.dsize < sizeof(struct ctdb_ltdb_header)) {
757                                 DEBUG(DEBUG_CRIT,(__location__ " Bad record size %u from node %u\n",
758                                          (unsigned)existing.dsize, srcnode));
759                                 free(existing.dptr);
760                                 talloc_free(tmp_ctx);
761                                 return -1;
762                         }
763                         header = *(struct ctdb_ltdb_header *)existing.dptr;
764                         free(existing.dptr);
765                         if (!(header.rsn < hdr->rsn ||
766                               (header.dmaster != ctdb_get_pnn(ctdb) &&
767                                header.rsn == hdr->rsn))) {
768                                 continue;
769                         }
770                 }
771
772                 if (tdb_store(recdb->tdb, key, data, TDB_REPLACE) != 0) {
773                         DEBUG(DEBUG_CRIT,(__location__ " Failed to store record\n"));
774                         talloc_free(tmp_ctx);
775                         return -1;
776                 }
777         }
778
779         talloc_free(tmp_ctx);
780
781         return 0;
782 }
783
784
785 struct pull_seqnum_cbdata {
786         int failed;
787         uint32_t pnn;
788         uint64_t seqnum;
789 };
790
791 static void pull_seqnum_cb(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
792 {
793         struct pull_seqnum_cbdata *cb_data = talloc_get_type(callback_data, struct pull_seqnum_cbdata);
794         uint64_t seqnum;
795
796         if (cb_data->failed != 0) {
797                 DEBUG(DEBUG_ERR, ("Got seqnum from node %d but we have already failed the entire operation\n", node_pnn));
798                 return;
799         }
800
801         if (res != 0) {
802                 DEBUG(DEBUG_ERR, ("Error when pulling seqnum from node %d\n", node_pnn));
803                 cb_data->failed = 1;
804                 return;
805         }
806
807         if (outdata.dsize != sizeof(uint64_t)) {
808                 DEBUG(DEBUG_ERR, ("Error when reading pull seqnum from node %d, got %d bytes but expected %d\n", node_pnn, (int)outdata.dsize, (int)sizeof(uint64_t)));
809                 cb_data->failed = -1;
810                 return;
811         }
812
813         seqnum = *((uint64_t *)outdata.dptr);
814
815         if (seqnum > cb_data->seqnum ||
816             (cb_data->pnn == -1 && seqnum == 0)) {
817                 cb_data->seqnum = seqnum;
818                 cb_data->pnn = node_pnn;
819         }
820 }
821
822 static void pull_seqnum_fail_cb(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
823 {
824         struct pull_seqnum_cbdata *cb_data = talloc_get_type(callback_data, struct pull_seqnum_cbdata);
825
826         DEBUG(DEBUG_ERR, ("Failed to pull db seqnum from node %d\n", node_pnn));
827         cb_data->failed = 1;
828 }
829
830 static int pull_highest_seqnum_pdb(struct ctdb_context *ctdb,
831                                 struct ctdb_recoverd *rec, 
832                                 struct ctdb_node_map_old *nodemap, 
833                                 struct tdb_wrap *recdb, uint32_t dbid)
834 {
835         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
836         uint32_t *nodes;
837         TDB_DATA data;
838         uint32_t outdata[2];
839         struct pull_seqnum_cbdata *cb_data;
840
841         DEBUG(DEBUG_NOTICE, ("Scan for highest seqnum pdb for db:0x%08x\n", dbid));
842
843         outdata[0] = dbid;
844         outdata[1] = 0;
845
846         data.dsize = sizeof(outdata);
847         data.dptr  = (uint8_t *)&outdata[0];
848
849         cb_data = talloc(tmp_ctx, struct pull_seqnum_cbdata);
850         if (cb_data == NULL) {
851                 DEBUG(DEBUG_ERR, ("Failed to allocate pull highest seqnum cb_data structure\n"));
852                 talloc_free(tmp_ctx);
853                 return -1;
854         }
855
856         cb_data->failed = 0;
857         cb_data->pnn    = -1;
858         cb_data->seqnum = 0;
859         
860         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
861         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_DB_SEQNUM,
862                                         nodes, 0,
863                                         CONTROL_TIMEOUT(), false, data,
864                                         pull_seqnum_cb,
865                                         pull_seqnum_fail_cb,
866                                         cb_data) != 0) {
867                 DEBUG(DEBUG_ERR, (__location__ " Failed to run async GET_DB_SEQNUM\n"));
868
869                 talloc_free(tmp_ctx);
870                 return -1;
871         }
872
873         if (cb_data->failed != 0) {
874                 DEBUG(DEBUG_NOTICE, ("Failed to pull sequence numbers for DB 0x%08x\n", dbid));
875                 talloc_free(tmp_ctx);
876                 return -1;
877         }
878
879         if (cb_data->pnn == -1) {
880                 DEBUG(DEBUG_NOTICE, ("Failed to find a node with highest sequence numbers for DB 0x%08x\n", dbid));
881                 talloc_free(tmp_ctx);
882                 return -1;
883         }
884
885         DEBUG(DEBUG_NOTICE, ("Pull persistent db:0x%08x from node %d with highest seqnum:%lld\n", dbid, cb_data->pnn, (long long)cb_data->seqnum)); 
886
887         if (pull_one_remote_database(ctdb, cb_data->pnn, recdb, dbid) != 0) {
888                 DEBUG(DEBUG_ERR, ("Failed to pull higest seqnum database 0x%08x from node %d\n", dbid, cb_data->pnn));
889                 talloc_free(tmp_ctx);
890                 return -1;
891         }
892
893         talloc_free(tmp_ctx);
894         return 0;
895 }
896
897
898 /*
899   pull all the remote database contents into the recdb
900  */
901 static int pull_remote_database(struct ctdb_context *ctdb,
902                                 struct ctdb_recoverd *rec, 
903                                 struct ctdb_node_map_old *nodemap, 
904                                 struct tdb_wrap *recdb, uint32_t dbid,
905                                 bool persistent)
906 {
907         int j;
908
909         if (persistent && ctdb->tunable.recover_pdb_by_seqnum != 0) {
910                 int ret;
911                 ret = pull_highest_seqnum_pdb(ctdb, rec, nodemap, recdb, dbid);
912                 if (ret == 0) {
913                         return 0;
914                 }
915         }
916
917         /* pull all records from all other nodes across onto this node
918            (this merges based on rsn)
919         */
920         for (j=0; j<nodemap->num; j++) {
921                 /* don't merge from nodes that are unavailable */
922                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
923                         continue;
924                 }
925                 if (pull_one_remote_database(ctdb, nodemap->nodes[j].pnn, recdb, dbid) != 0) {
926                         DEBUG(DEBUG_ERR,(__location__ " Failed to pull remote database from node %u\n", 
927                                  nodemap->nodes[j].pnn));
928                         ctdb_set_culprit_count(rec, nodemap->nodes[j].pnn, nodemap->num);
929                         return -1;
930                 }
931         }
932         
933         return 0;
934 }
935
936
937 /*
938   update flags on all active nodes
939  */
940 static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map_old *nodemap, uint32_t pnn, uint32_t flags)
941 {
942         int ret;
943
944         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, flags, ~flags);
945                 if (ret != 0) {
946                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
947                 return -1;
948         }
949
950         return 0;
951 }
952
953 /*
954   ensure all nodes have the same vnnmap we do
955  */
956 static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map_old *nodemap, 
957                                       uint32_t pnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
958 {
959         int j, ret;
960
961         /* push the new vnn map out to all the nodes */
962         for (j=0; j<nodemap->num; j++) {
963                 /* don't push to nodes that are unavailable */
964                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
965                         continue;
966                 }
967
968                 ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, vnnmap);
969                 if (ret != 0) {
970                         DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
971                         return -1;
972                 }
973         }
974
975         return 0;
976 }
977
978
979 /*
980   called when a vacuum fetch has completed - just free it and do the next one
981  */
982 static void vacuum_fetch_callback(struct ctdb_client_call_state *state)
983 {
984         talloc_free(state);
985 }
986
987
988 /**
989  * Process one elements of the vacuum fetch list:
990  * Migrate it over to us with the special flag
991  * CTDB_CALL_FLAG_VACUUM_MIGRATION.
992  */
993 static bool vacuum_fetch_process_one(struct ctdb_db_context *ctdb_db,
994                                      uint32_t pnn,
995                                      struct ctdb_rec_data_old *r)
996 {
997         struct ctdb_client_call_state *state;
998         TDB_DATA data;
999         struct ctdb_ltdb_header *hdr;
1000         struct ctdb_call call;
1001
1002         ZERO_STRUCT(call);
1003         call.call_id = CTDB_NULL_FUNC;
1004         call.flags = CTDB_IMMEDIATE_MIGRATION;
1005         call.flags |= CTDB_CALL_FLAG_VACUUM_MIGRATION;
1006
1007         call.key.dptr = &r->data[0];
1008         call.key.dsize = r->keylen;
1009
1010         /* ensure we don't block this daemon - just skip a record if we can't get
1011            the chainlock */
1012         if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, call.key) != 0) {
1013                 return true;
1014         }
1015
1016         data = tdb_fetch(ctdb_db->ltdb->tdb, call.key);
1017         if (data.dptr == NULL) {
1018                 tdb_chainunlock(ctdb_db->ltdb->tdb, call.key);
1019                 return true;
1020         }
1021
1022         if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
1023                 free(data.dptr);
1024                 tdb_chainunlock(ctdb_db->ltdb->tdb, call.key);
1025                 return true;
1026         }
1027
1028         hdr = (struct ctdb_ltdb_header *)data.dptr;
1029         if (hdr->dmaster == pnn) {
1030                 /* its already local */
1031                 free(data.dptr);
1032                 tdb_chainunlock(ctdb_db->ltdb->tdb, call.key);
1033                 return true;
1034         }
1035
1036         free(data.dptr);
1037
1038         state = ctdb_call_send(ctdb_db, &call);
1039         tdb_chainunlock(ctdb_db->ltdb->tdb, call.key);
1040         if (state == NULL) {
1041                 DEBUG(DEBUG_ERR,(__location__ " Failed to setup vacuum fetch call\n"));
1042                 return false;
1043         }
1044         state->async.fn = vacuum_fetch_callback;
1045         state->async.private_data = NULL;
1046
1047         return true;
1048 }
1049
1050
1051 /*
1052   handler for vacuum fetch
1053 */
1054 static void vacuum_fetch_handler(uint64_t srvid, TDB_DATA data,
1055                                  void *private_data)
1056 {
1057         struct ctdb_recoverd *rec = talloc_get_type(
1058                 private_data, struct ctdb_recoverd);
1059         struct ctdb_context *ctdb = rec->ctdb;
1060         struct ctdb_marshall_buffer *recs;
1061         int ret, i;
1062         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1063         const char *name;
1064         struct ctdb_dbid_map_old *dbmap=NULL;
1065         bool persistent = false;
1066         struct ctdb_db_context *ctdb_db;
1067         struct ctdb_rec_data_old *r;
1068
1069         recs = (struct ctdb_marshall_buffer *)data.dptr;
1070
1071         if (recs->count == 0) {
1072                 goto done;
1073         }
1074
1075         /* work out if the database is persistent */
1076         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &dbmap);
1077         if (ret != 0) {
1078                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from local node\n"));
1079                 goto done;
1080         }
1081
1082         for (i=0;i<dbmap->num;i++) {
1083                 if (dbmap->dbs[i].db_id == recs->db_id) {
1084                         persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
1085                         break;
1086                 }
1087         }
1088         if (i == dbmap->num) {
1089                 DEBUG(DEBUG_ERR, (__location__ " Unable to find db_id 0x%x on local node\n", recs->db_id));
1090                 goto done;
1091         }
1092
1093         /* find the name of this database */
1094         if (ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, recs->db_id, tmp_ctx, &name) != 0) {
1095                 DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", recs->db_id));
1096                 goto done;
1097         }
1098
1099         /* attach to it */
1100         ctdb_db = ctdb_attach(ctdb, CONTROL_TIMEOUT(), name, persistent, 0);
1101         if (ctdb_db == NULL) {
1102                 DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
1103                 goto done;
1104         }
1105
1106         r = (struct ctdb_rec_data_old *)&recs->data[0];
1107         while (recs->count) {
1108                 bool ok;
1109
1110                 ok = vacuum_fetch_process_one(ctdb_db, rec->ctdb->pnn, r);
1111                 if (!ok) {
1112                         break;
1113                 }
1114
1115                 r = (struct ctdb_rec_data_old *)(r->length + (uint8_t *)r);
1116                 recs->count--;
1117         }
1118
1119 done:
1120         talloc_free(tmp_ctx);
1121 }
1122
1123
1124 /*
1125  * handler for database detach
1126  */
1127 static void detach_database_handler(uint64_t srvid, TDB_DATA data,
1128                                     void *private_data)
1129 {
1130         struct ctdb_recoverd *rec = talloc_get_type(
1131                 private_data, struct ctdb_recoverd);
1132         struct ctdb_context *ctdb = rec->ctdb;
1133         uint32_t db_id;
1134         struct ctdb_db_context *ctdb_db;
1135
1136         if (data.dsize != sizeof(db_id)) {
1137                 return;
1138         }
1139         db_id = *(uint32_t *)data.dptr;
1140
1141         ctdb_db = find_ctdb_db(ctdb, db_id);
1142         if (ctdb_db == NULL) {
1143                 /* database is not attached */
1144                 return;
1145         }
1146
1147         DLIST_REMOVE(ctdb->db_list, ctdb_db);
1148
1149         DEBUG(DEBUG_NOTICE, ("Detached from database '%s'\n",
1150                              ctdb_db->db_name));
1151         talloc_free(ctdb_db);
1152 }
1153
1154 /*
1155   called when ctdb_wait_timeout should finish
1156  */
1157 static void ctdb_wait_handler(struct tevent_context *ev,
1158                               struct tevent_timer *te,
1159                               struct timeval yt, void *p)
1160 {
1161         uint32_t *timed_out = (uint32_t *)p;
1162         (*timed_out) = 1;
1163 }
1164
1165 /*
1166   wait for a given number of seconds
1167  */
1168 static void ctdb_wait_timeout(struct ctdb_context *ctdb, double secs)
1169 {
1170         uint32_t timed_out = 0;
1171         time_t usecs = (secs - (time_t)secs) * 1000000;
1172         tevent_add_timer(ctdb->ev, ctdb, timeval_current_ofs(secs, usecs),
1173                          ctdb_wait_handler, &timed_out);
1174         while (!timed_out) {
1175                 tevent_loop_once(ctdb->ev);
1176         }
1177 }
1178
1179 /*
1180   called when an election times out (ends)
1181  */
1182 static void ctdb_election_timeout(struct tevent_context *ev,
1183                                   struct tevent_timer *te,
1184                                   struct timeval t, void *p)
1185 {
1186         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1187         rec->election_timeout = NULL;
1188         fast_start = false;
1189
1190         DEBUG(DEBUG_WARNING,("Election period ended\n"));
1191 }
1192
1193
1194 /*
1195   wait for an election to finish. It finished election_timeout seconds after
1196   the last election packet is received
1197  */
1198 static void ctdb_wait_election(struct ctdb_recoverd *rec)
1199 {
1200         struct ctdb_context *ctdb = rec->ctdb;
1201         while (rec->election_timeout) {
1202                 tevent_loop_once(ctdb->ev);
1203         }
1204 }
1205
1206 /*
1207   Update our local flags from all remote connected nodes. 
1208   This is only run when we are or we belive we are the recovery master
1209  */
1210 static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map_old *nodemap)
1211 {
1212         int j;
1213         struct ctdb_context *ctdb = rec->ctdb;
1214         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
1215
1216         /* get the nodemap for all active remote nodes and verify
1217            they are the same as for this node
1218          */
1219         for (j=0; j<nodemap->num; j++) {
1220                 struct ctdb_node_map_old *remote_nodemap=NULL;
1221                 int ret;
1222
1223                 if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
1224                         continue;
1225                 }
1226                 if (nodemap->nodes[j].pnn == ctdb->pnn) {
1227                         continue;
1228                 }
1229
1230                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
1231                                            mem_ctx, &remote_nodemap);
1232                 if (ret != 0) {
1233                         DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from remote node %u\n", 
1234                                   nodemap->nodes[j].pnn));
1235                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
1236                         talloc_free(mem_ctx);
1237                         return MONITOR_FAILED;
1238                 }
1239                 if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
1240                         /* We should tell our daemon about this so it
1241                            updates its flags or else we will log the same 
1242                            message again in the next iteration of recovery.
1243                            Since we are the recovery master we can just as
1244                            well update the flags on all nodes.
1245                         */
1246                         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags, ~remote_nodemap->nodes[j].flags);
1247                         if (ret != 0) {
1248                                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
1249                                 return -1;
1250                         }
1251
1252                         /* Update our local copy of the flags in the recovery
1253                            daemon.
1254                         */
1255                         DEBUG(DEBUG_NOTICE,("Remote node %u had flags 0x%x, local had 0x%x - updating local\n",
1256                                  nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags,
1257                                  nodemap->nodes[j].flags));
1258                         nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
1259                 }
1260                 talloc_free(remote_nodemap);
1261         }
1262         talloc_free(mem_ctx);
1263         return MONITOR_OK;
1264 }
1265
1266
1267 /* Create a new random generation id.
1268    The generation id can not be the INVALID_GENERATION id
1269 */
1270 static uint32_t new_generation(void)
1271 {
1272         uint32_t generation;
1273
1274         while (1) {
1275                 generation = random();
1276
1277                 if (generation != INVALID_GENERATION) {
1278                         break;
1279                 }
1280         }
1281
1282         return generation;
1283 }
1284
1285
1286 /*
1287   create a temporary working database
1288  */
1289 static struct tdb_wrap *create_recdb(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx)
1290 {
1291         char *name;
1292         struct tdb_wrap *recdb;
1293         unsigned tdb_flags;
1294
1295         /* open up the temporary recovery database */
1296         name = talloc_asprintf(mem_ctx, "%s/recdb.tdb.%u",
1297                                ctdb->db_directory_state,
1298                                ctdb->pnn);
1299         if (name == NULL) {
1300                 return NULL;
1301         }
1302         unlink(name);
1303
1304         tdb_flags = TDB_NOLOCK;
1305         if (ctdb->valgrinding) {
1306                 tdb_flags |= TDB_NOMMAP;
1307         }
1308         tdb_flags |= (TDB_INCOMPATIBLE_HASH | TDB_DISALLOW_NESTING);
1309
1310         recdb = tdb_wrap_open(mem_ctx, name, ctdb->tunable.database_hash_size, 
1311                               tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
1312         if (recdb == NULL) {
1313                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temp recovery database '%s'\n", name));
1314         }
1315
1316         talloc_free(name);
1317
1318         return recdb;
1319 }
1320
1321
1322 /* 
1323    a traverse function for pulling all relevant records from recdb
1324  */
1325 struct recdb_data {
1326         struct ctdb_context *ctdb;
1327         struct ctdb_marshall_buffer *recdata;
1328         uint32_t len;
1329         uint32_t allocated_len;
1330         bool failed;
1331         bool persistent;
1332 };
1333
1334 static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
1335 {
1336         struct recdb_data *params = (struct recdb_data *)p;
1337         struct ctdb_rec_data_old *recdata;
1338         struct ctdb_ltdb_header *hdr;
1339
1340         /*
1341          * skip empty records - but NOT for persistent databases:
1342          *
1343          * The record-by-record mode of recovery deletes empty records.
1344          * For persistent databases, this can lead to data corruption
1345          * by deleting records that should be there:
1346          *
1347          * - Assume the cluster has been running for a while.
1348          *
1349          * - A record R in a persistent database has been created and
1350          *   deleted a couple of times, the last operation being deletion,
1351          *   leaving an empty record with a high RSN, say 10.
1352          *
1353          * - Now a node N is turned off.
1354          *
1355          * - This leaves the local database copy of D on N with the empty
1356          *   copy of R and RSN 10. On all other nodes, the recovery has deleted
1357          *   the copy of record R.
1358          *
1359          * - Now the record is created again while node N is turned off.
1360          *   This creates R with RSN = 1 on all nodes except for N.
1361          *
1362          * - Now node N is turned on again. The following recovery will chose
1363          *   the older empty copy of R due to RSN 10 > RSN 1.
1364          *
1365          * ==> Hence the record is gone after the recovery.
1366          *
1367          * On databases like Samba's registry, this can damage the higher-level
1368          * data structures built from the various tdb-level records.
1369          */
1370         if (!params->persistent && data.dsize <= sizeof(struct ctdb_ltdb_header)) {
1371                 return 0;
1372         }
1373
1374         /* update the dmaster field to point to us */
1375         hdr = (struct ctdb_ltdb_header *)data.dptr;
1376         if (!params->persistent) {
1377                 hdr->dmaster = params->ctdb->pnn;
1378                 hdr->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
1379         }
1380
1381         /* add the record to the blob ready to send to the nodes */
1382         recdata = ctdb_marshall_record(params->recdata, 0, key, NULL, data);
1383         if (recdata == NULL) {
1384                 params->failed = true;
1385                 return -1;
1386         }
1387         if (params->len + recdata->length >= params->allocated_len) {
1388                 params->allocated_len = recdata->length + params->len + params->ctdb->tunable.pulldb_preallocation_size;
1389                 params->recdata = talloc_realloc_size(NULL, params->recdata, params->allocated_len);
1390         }
1391         if (params->recdata == NULL) {
1392                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata to %u\n",
1393                          recdata->length + params->len));
1394                 params->failed = true;
1395                 return -1;
1396         }
1397         params->recdata->count++;
1398         memcpy(params->len+(uint8_t *)params->recdata, recdata, recdata->length);
1399         params->len += recdata->length;
1400         talloc_free(recdata);
1401
1402         return 0;
1403 }
1404
1405 /*
1406   push the recdb database out to all nodes
1407  */
1408 static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
1409                                bool persistent,
1410                                struct tdb_wrap *recdb, struct ctdb_node_map_old *nodemap)
1411 {
1412         struct recdb_data params;
1413         struct ctdb_marshall_buffer *recdata;
1414         TDB_DATA outdata;
1415         TALLOC_CTX *tmp_ctx;
1416         uint32_t *nodes;
1417
1418         tmp_ctx = talloc_new(ctdb);
1419         CTDB_NO_MEMORY(ctdb, tmp_ctx);
1420
1421         recdata = talloc_zero(recdb, struct ctdb_marshall_buffer);
1422         CTDB_NO_MEMORY(ctdb, recdata);
1423
1424         recdata->db_id = dbid;
1425
1426         params.ctdb = ctdb;
1427         params.recdata = recdata;
1428         params.len = offsetof(struct ctdb_marshall_buffer, data);
1429         params.allocated_len = params.len;
1430         params.failed = false;
1431         params.persistent = persistent;
1432
1433         if (tdb_traverse_read(recdb->tdb, traverse_recdb, &params) == -1) {
1434                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1435                 talloc_free(params.recdata);
1436                 talloc_free(tmp_ctx);
1437                 return -1;
1438         }
1439
1440         if (params.failed) {
1441                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1442                 talloc_free(params.recdata);
1443                 talloc_free(tmp_ctx);
1444                 return -1;              
1445         }
1446
1447         recdata = params.recdata;
1448
1449         outdata.dptr = (void *)recdata;
1450         outdata.dsize = params.len;
1451
1452         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
1453         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
1454                                         nodes, 0,
1455                                         CONTROL_TIMEOUT(), false, outdata,
1456                                         NULL, NULL,
1457                                         NULL) != 0) {
1458                 DEBUG(DEBUG_ERR,(__location__ " Failed to push recdb records to nodes for db 0x%x\n", dbid));
1459                 talloc_free(recdata);
1460                 talloc_free(tmp_ctx);
1461                 return -1;
1462         }
1463
1464         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pushed remote database 0x%x of size %u\n", 
1465                   dbid, recdata->count));
1466
1467         talloc_free(recdata);
1468         talloc_free(tmp_ctx);
1469
1470         return 0;
1471 }
1472
1473
1474 /*
1475   go through a full recovery on one database 
1476  */
1477 static int recover_database(struct ctdb_recoverd *rec, 
1478                             TALLOC_CTX *mem_ctx,
1479                             uint32_t dbid,
1480                             bool persistent,
1481                             uint32_t pnn, 
1482                             struct ctdb_node_map_old *nodemap,
1483                             uint32_t transaction_id)
1484 {
1485         struct tdb_wrap *recdb;
1486         int ret;
1487         struct ctdb_context *ctdb = rec->ctdb;
1488         TDB_DATA data;
1489         struct ctdb_transdb w;
1490         uint32_t *nodes;
1491
1492         recdb = create_recdb(ctdb, mem_ctx);
1493         if (recdb == NULL) {
1494                 return -1;
1495         }
1496
1497         /* pull all remote databases onto the recdb */
1498         ret = pull_remote_database(ctdb, rec, nodemap, recdb, dbid, persistent);
1499         if (ret != 0) {
1500                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull remote database 0x%x\n", dbid));
1501                 return -1;
1502         }
1503
1504         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pulled remote database 0x%x\n", dbid));
1505
1506         /* wipe all the remote databases. This is safe as we are in a transaction */
1507         w.db_id = dbid;
1508         w.tid = transaction_id;
1509
1510         data.dptr = (void *)&w;
1511         data.dsize = sizeof(w);
1512
1513         nodes = list_of_active_nodes(ctdb, nodemap, recdb, true);
1514         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
1515                                         nodes, 0,
1516                                         CONTROL_TIMEOUT(), false, data,
1517                                         NULL, NULL,
1518                                         NULL) != 0) {
1519                 DEBUG(DEBUG_ERR, (__location__ " Unable to wipe database. Recovery failed.\n"));
1520                 talloc_free(recdb);
1521                 return -1;
1522         }
1523         
1524         /* push out the correct database. This sets the dmaster and skips 
1525            the empty records */
1526         ret = push_recdb_database(ctdb, dbid, persistent, recdb, nodemap);
1527         if (ret != 0) {
1528                 talloc_free(recdb);
1529                 return -1;
1530         }
1531
1532         /* all done with this database */
1533         talloc_free(recdb);
1534
1535         return 0;
1536 }
1537
1538 /* when we start a recovery, make sure all nodes use the same reclock file
1539    setting
1540 */
1541 static int sync_recovery_lock_file_across_cluster(struct ctdb_recoverd *rec)
1542 {
1543         struct ctdb_context *ctdb = rec->ctdb;
1544         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
1545         TDB_DATA data;
1546         uint32_t *nodes;
1547
1548         if (ctdb->recovery_lock_file == NULL) {
1549                 data.dptr  = NULL;
1550                 data.dsize = 0;
1551         } else {
1552                 data.dsize = strlen(ctdb->recovery_lock_file) + 1;
1553                 data.dptr  = (uint8_t *)ctdb->recovery_lock_file;
1554         }
1555
1556         nodes = list_of_active_nodes(ctdb, rec->nodemap, tmp_ctx, true);
1557         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECLOCK_FILE,
1558                                         nodes, 0,
1559                                         CONTROL_TIMEOUT(),
1560                                         false, data,
1561                                         NULL, NULL,
1562                                         rec) != 0) {
1563                 DEBUG(DEBUG_ERR, (__location__ " Failed to sync reclock file settings\n"));
1564                 talloc_free(tmp_ctx);
1565                 return -1;
1566         }
1567
1568         talloc_free(tmp_ctx);
1569         return 0;
1570 }
1571
1572
1573 /*
1574  * this callback is called for every node that failed to execute ctdb_takeover_run()
1575  * and set flag to re-run takeover run.
1576  */
1577 static void takeover_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
1578 {
1579         DEBUG(DEBUG_ERR, ("Node %u failed the takeover run\n", node_pnn));
1580
1581         if (callback_data != NULL) {
1582                 struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
1583
1584                 DEBUG(DEBUG_ERR, ("Setting node %u as recovery fail culprit\n", node_pnn));
1585
1586                 ctdb_set_culprit(rec, node_pnn);
1587         }
1588 }
1589
1590
1591 static void ban_misbehaving_nodes(struct ctdb_recoverd *rec, bool *self_ban)
1592 {
1593         struct ctdb_context *ctdb = rec->ctdb;
1594         int i;
1595         struct ctdb_banning_state *ban_state;
1596
1597         *self_ban = false;
1598         for (i=0; i<ctdb->num_nodes; i++) {
1599                 if (ctdb->nodes[i]->ban_state == NULL) {
1600                         continue;
1601                 }
1602                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
1603                 if (ban_state->count < 2*ctdb->num_nodes) {
1604                         continue;
1605                 }
1606
1607                 DEBUG(DEBUG_NOTICE,("Node %u reached %u banning credits - banning it for %u seconds\n",
1608                         ctdb->nodes[i]->pnn, ban_state->count,
1609                         ctdb->tunable.recovery_ban_period));
1610                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
1611                 ban_state->count = 0;
1612
1613                 /* Banning ourself? */
1614                 if (ctdb->nodes[i]->pnn == rec->ctdb->pnn) {
1615                         *self_ban = true;
1616                 }
1617         }
1618 }
1619
1620 static bool do_takeover_run(struct ctdb_recoverd *rec,
1621                             struct ctdb_node_map_old *nodemap,
1622                             bool banning_credits_on_fail)
1623 {
1624         uint32_t *nodes = NULL;
1625         struct ctdb_disable_message dtr;
1626         TDB_DATA data;
1627         int i;
1628         uint32_t *rebalance_nodes = rec->force_rebalance_nodes;
1629         int ret;
1630         bool ok;
1631
1632         DEBUG(DEBUG_NOTICE, ("Takeover run starting\n"));
1633
1634         if (ctdb_op_is_in_progress(rec->takeover_run)) {
1635                 DEBUG(DEBUG_ERR, (__location__
1636                                   " takeover run already in progress \n"));
1637                 ok = false;
1638                 goto done;
1639         }
1640
1641         if (!ctdb_op_begin(rec->takeover_run)) {
1642                 ok = false;
1643                 goto done;
1644         }
1645
1646         /* Disable IP checks (takeover runs, really) on other nodes
1647          * while doing this takeover run.  This will stop those other
1648          * nodes from triggering takeover runs when think they should
1649          * be hosting an IP but it isn't yet on an interface.  Don't
1650          * wait for replies since a failure here might cause some
1651          * noise in the logs but will not actually cause a problem.
1652          */
1653         dtr.srvid = 0; /* No reply */
1654         dtr.pnn = -1;
1655
1656         data.dptr  = (uint8_t*)&dtr;
1657         data.dsize = sizeof(dtr);
1658
1659         nodes = list_of_connected_nodes(rec->ctdb, nodemap, rec, false);
1660
1661         /* Disable for 60 seconds.  This can be a tunable later if
1662          * necessary.
1663          */
1664         dtr.timeout = 60;
1665         for (i = 0; i < talloc_array_length(nodes); i++) {
1666                 if (ctdb_client_send_message(rec->ctdb, nodes[i],
1667                                              CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
1668                                              data) != 0) {
1669                         DEBUG(DEBUG_INFO,("Failed to disable takeover runs\n"));
1670                 }
1671         }
1672
1673         ret = ctdb_takeover_run(rec->ctdb, nodemap,
1674                                 rec->force_rebalance_nodes,
1675                                 takeover_fail_callback,
1676                                 banning_credits_on_fail ? rec : NULL);
1677
1678         /* Reenable takeover runs and IP checks on other nodes */
1679         dtr.timeout = 0;
1680         for (i = 0; i < talloc_array_length(nodes); i++) {
1681                 if (ctdb_client_send_message(rec->ctdb, nodes[i],
1682                                              CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
1683                                              data) != 0) {
1684                         DEBUG(DEBUG_INFO,("Failed to re-enable takeover runs\n"));
1685                 }
1686         }
1687
1688         if (ret != 0) {
1689                 DEBUG(DEBUG_ERR, ("ctdb_takeover_run() failed\n"));
1690                 ok = false;
1691                 goto done;
1692         }
1693
1694         ok = true;
1695         /* Takeover run was successful so clear force rebalance targets */
1696         if (rebalance_nodes == rec->force_rebalance_nodes) {
1697                 TALLOC_FREE(rec->force_rebalance_nodes);
1698         } else {
1699                 DEBUG(DEBUG_WARNING,
1700                       ("Rebalance target nodes changed during takeover run - not clearing\n"));
1701         }
1702 done:
1703         rec->need_takeover_run = !ok;
1704         talloc_free(nodes);
1705         ctdb_op_end(rec->takeover_run);
1706
1707         DEBUG(DEBUG_NOTICE, ("Takeover run %s\n", ok ? "completed successfully" : "unsuccessful"));
1708         return ok;
1709 }
1710
1711 struct recovery_helper_state {
1712         int fd[2];
1713         pid_t pid;
1714         int result;
1715         bool done;
1716 };
1717
1718 static void ctdb_recovery_handler(struct tevent_context *ev,
1719                                   struct tevent_fd *fde,
1720                                   uint16_t flags, void *private_data)
1721 {
1722         struct recovery_helper_state *state = talloc_get_type_abort(
1723                 private_data, struct recovery_helper_state);
1724         int ret;
1725
1726         ret = sys_read(state->fd[0], &state->result, sizeof(state->result));
1727         if (ret != sizeof(state->result)) {
1728                 state->result = EPIPE;
1729         }
1730
1731         state->done = true;
1732 }
1733
1734
1735 static int db_recovery_parallel(struct ctdb_recoverd *rec, TALLOC_CTX *mem_ctx)
1736 {
1737         static char prog[PATH_MAX+1] = "";
1738         const char **args;
1739         struct recovery_helper_state *state;
1740         struct tevent_fd *fde;
1741         int nargs, ret;
1742
1743         if (!ctdb_set_helper("recovery_helper", prog, sizeof(prog),
1744                              "CTDB_RECOVERY_HELPER", CTDB_HELPER_BINDIR,
1745                              "ctdb_recovery_helper")) {
1746                 ctdb_die(rec->ctdb, "Unable to set recovery helper\n");
1747         }
1748
1749         state = talloc_zero(mem_ctx, struct recovery_helper_state);
1750         if (state == NULL) {
1751                 DEBUG(DEBUG_ERR, (__location__ " memory error\n"));
1752                 return -1;
1753         }
1754
1755         state->pid = -1;
1756
1757         ret = pipe(state->fd);
1758         if (ret != 0) {
1759                 DEBUG(DEBUG_ERR,
1760                       ("Failed to create pipe for recovery helper\n"));
1761                 goto fail;
1762         }
1763
1764         set_close_on_exec(state->fd[0]);
1765
1766         nargs = 4;
1767         args = talloc_array(state, const char *, nargs);
1768         if (args == NULL) {
1769                 DEBUG(DEBUG_ERR, (__location__ " memory error\n"));
1770                 goto fail;
1771         }
1772
1773         args[0] = talloc_asprintf(args, "%d", state->fd[1]);
1774         args[1] = rec->ctdb->daemon.name;
1775         args[2] = talloc_asprintf(args, "%u", new_generation());
1776         args[3] = NULL;
1777
1778         if (args[0] == NULL || args[2] == NULL) {
1779                 DEBUG(DEBUG_ERR, (__location__ " memory error\n"));
1780                 goto fail;
1781         }
1782
1783         if (!ctdb_vfork_with_logging(state, rec->ctdb, "recovery", prog, nargs,
1784                                      args, NULL, NULL, &state->pid)) {
1785                 DEBUG(DEBUG_ERR,
1786                       ("Failed to create child for recovery helper\n"));
1787                 goto fail;
1788         }
1789
1790         close(state->fd[1]);
1791         state->fd[1] = -1;
1792
1793         state->done = false;
1794
1795         fde = tevent_add_fd(rec->ctdb->ev, rec->ctdb, state->fd[0],
1796                             TEVENT_FD_READ, ctdb_recovery_handler, state);
1797         if (fde == NULL) {
1798                 goto fail;
1799         }
1800         tevent_fd_set_auto_close(fde);
1801
1802         while (!state->done) {
1803                 tevent_loop_once(rec->ctdb->ev);
1804         }
1805
1806         close(state->fd[0]);
1807         state->fd[0] = -1;
1808
1809         if (state->result != 0) {
1810                 goto fail;
1811         }
1812
1813         ctdb_kill(rec->ctdb, state->pid, SIGKILL);
1814         talloc_free(state);
1815         return 0;
1816
1817 fail:
1818         if (state->fd[0] != -1) {
1819                 close(state->fd[0]);
1820         }
1821         if (state->fd[1] != -1) {
1822                 close(state->fd[1]);
1823         }
1824         if (state->pid != -1) {
1825                 ctdb_kill(rec->ctdb, state->pid, SIGKILL);
1826         }
1827         talloc_free(state);
1828         return -1;
1829 }
1830
1831 static int db_recovery_serial(struct ctdb_recoverd *rec, TALLOC_CTX *mem_ctx,
1832                               uint32_t pnn, struct ctdb_node_map_old *nodemap,
1833                               struct ctdb_vnn_map *vnnmap,
1834                               struct ctdb_dbid_map_old *dbmap)
1835 {
1836         struct ctdb_context *ctdb = rec->ctdb;
1837         uint32_t generation;
1838         TDB_DATA data;
1839         uint32_t *nodes;
1840         int ret, i, j;
1841
1842         /* set recovery mode to active on all nodes */
1843         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE, true);
1844         if (ret != 0) {
1845                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1846                 return -1;
1847         }
1848
1849         /* execute the "startrecovery" event script on all nodes */
1850         ret = run_startrecovery_eventscript(rec, nodemap);
1851         if (ret!=0) {
1852                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
1853                 return -1;
1854         }
1855
1856         /* pick a new generation number */
1857         generation = new_generation();
1858
1859         /* change the vnnmap on this node to use the new generation 
1860            number but not on any other nodes.
1861            this guarantees that if we abort the recovery prematurely
1862            for some reason (a node stops responding?)
1863            that we can just return immediately and we will reenter
1864            recovery shortly again.
1865            I.e. we deliberately leave the cluster with an inconsistent
1866            generation id to allow us to abort recovery at any stage and
1867            just restart it from scratch.
1868          */
1869         vnnmap->generation = generation;
1870         ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, vnnmap);
1871         if (ret != 0) {
1872                 DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
1873                 return -1;
1874         }
1875
1876         /* Database generations are updated when the transaction is commited to
1877          * the databases.  So make sure to use the final generation as the
1878          * transaction id
1879          */
1880         generation = new_generation();
1881
1882         data.dptr = (void *)&generation;
1883         data.dsize = sizeof(uint32_t);
1884
1885         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
1886         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
1887                                         nodes, 0,
1888                                         CONTROL_TIMEOUT(), false, data,
1889                                         NULL,
1890                                         transaction_start_fail_callback,
1891                                         rec) != 0) {
1892                 DEBUG(DEBUG_ERR, (__location__ " Unable to start transactions. Recovery failed.\n"));
1893                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_CANCEL,
1894                                         nodes, 0,
1895                                         CONTROL_TIMEOUT(), false, tdb_null,
1896                                         NULL,
1897                                         NULL,
1898                                         NULL) != 0) {
1899                         DEBUG(DEBUG_ERR,("Failed to cancel recovery transaction\n"));
1900                 }
1901                 return -1;
1902         }
1903
1904         DEBUG(DEBUG_NOTICE,(__location__ " started transactions on all nodes\n"));
1905
1906         for (i=0;i<dbmap->num;i++) {
1907                 ret = recover_database(rec, mem_ctx,
1908                                        dbmap->dbs[i].db_id,
1909                                        dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT,
1910                                        pnn, nodemap, generation);
1911                 if (ret != 0) {
1912                         DEBUG(DEBUG_ERR, (__location__ " Failed to recover database 0x%x\n", dbmap->dbs[i].db_id));
1913                         return -1;
1914                 }
1915         }
1916
1917         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - starting database commits\n"));
1918
1919         /* commit all the changes */
1920         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
1921                                         nodes, 0,
1922                                         CONTROL_TIMEOUT(), false, data,
1923                                         NULL, NULL,
1924                                         NULL) != 0) {
1925                 DEBUG(DEBUG_ERR, (__location__ " Unable to commit recovery changes. Recovery failed.\n"));
1926                 return -1;
1927         }
1928
1929         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - committed databases\n"));
1930
1931         /* build a new vnn map with all the currently active and
1932            unbanned nodes */
1933         vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
1934         CTDB_NO_MEMORY(ctdb, vnnmap);
1935         vnnmap->generation = generation;
1936         vnnmap->size = 0;
1937         vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
1938         CTDB_NO_MEMORY(ctdb, vnnmap->map);
1939         for (i=j=0;i<nodemap->num;i++) {
1940                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1941                         continue;
1942                 }
1943                 if (!ctdb_node_has_capabilities(rec->caps,
1944                                                 ctdb->nodes[i]->pnn,
1945                                                 CTDB_CAP_LMASTER)) {
1946                         /* this node can not be an lmaster */
1947                         DEBUG(DEBUG_DEBUG, ("Node %d cant be a LMASTER, skipping it\n", i));
1948                         continue;
1949                 }
1950
1951                 vnnmap->size++;
1952                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1953                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1954                 vnnmap->map[j++] = nodemap->nodes[i].pnn;
1955
1956         }
1957         if (vnnmap->size == 0) {
1958                 DEBUG(DEBUG_NOTICE, ("No suitable lmasters found. Adding local node (recmaster) anyway.\n"));
1959                 vnnmap->size++;
1960                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1961                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1962                 vnnmap->map[0] = pnn;
1963         }
1964
1965         /* update to the new vnnmap on all nodes */
1966         ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
1967         if (ret != 0) {
1968                 DEBUG(DEBUG_ERR, (__location__ " Unable to update vnnmap on all nodes\n"));
1969                 return -1;
1970         }
1971
1972         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated vnnmap\n"));
1973
1974         /* disable recovery mode */
1975         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_NORMAL, false);
1976         if (ret != 0) {
1977                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to normal on cluster\n"));
1978                 return -1;
1979         }
1980
1981         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - disabled recovery mode\n"));
1982
1983         return 0;
1984 }
1985
1986 /*
1987   we are the recmaster, and recovery is needed - start a recovery run
1988  */
1989 static int do_recovery(struct ctdb_recoverd *rec,
1990                        TALLOC_CTX *mem_ctx, uint32_t pnn,
1991                        struct ctdb_node_map_old *nodemap, struct ctdb_vnn_map *vnnmap)
1992 {
1993         struct ctdb_context *ctdb = rec->ctdb;
1994         int i, ret;
1995         struct ctdb_dbid_map_old *dbmap;
1996         struct timeval start_time;
1997         bool self_ban;
1998         bool par_recovery;
1999
2000         DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));
2001
2002         /* Check if the current node is still the recmaster.  It's possible that
2003          * re-election has changed the recmaster.
2004          */
2005         if (pnn != rec->recmaster) {
2006                 DEBUG(DEBUG_NOTICE,
2007                       ("Recovery master changed to %u, aborting recovery\n",
2008                        rec->recmaster));
2009                 return -1;
2010         }
2011
2012         /* if recovery fails, force it again */
2013         rec->need_recovery = true;
2014
2015         if (!ctdb_op_begin(rec->recovery)) {
2016                 return -1;
2017         }
2018
2019         if (rec->election_timeout) {
2020                 /* an election is in progress */
2021                 DEBUG(DEBUG_ERR, ("do_recovery called while election in progress - try again later\n"));
2022                 goto fail;
2023         }
2024
2025         ban_misbehaving_nodes(rec, &self_ban);
2026         if (self_ban) {
2027                 DEBUG(DEBUG_NOTICE, ("This node was banned, aborting recovery\n"));
2028                 goto fail;
2029         }
2030
2031         if (ctdb->recovery_lock_file != NULL) {
2032                 if (ctdb_recovery_have_lock(ctdb)) {
2033                         DEBUG(DEBUG_NOTICE, ("Already holding recovery lock\n"));
2034                 } else {
2035                         start_time = timeval_current();
2036                         DEBUG(DEBUG_NOTICE, ("Attempting to take recovery lock (%s)\n",
2037                                              ctdb->recovery_lock_file));
2038                         if (!ctdb_recovery_lock(ctdb)) {
2039                                 if (ctdb->runstate == CTDB_RUNSTATE_FIRST_RECOVERY) {
2040                                         /* If ctdb is trying first recovery, it's
2041                                          * possible that current node does not know
2042                                          * yet who the recmaster is.
2043                                          */
2044                                         DEBUG(DEBUG_ERR, ("Unable to get recovery lock"
2045                                                           " - retrying recovery\n"));
2046                                         goto fail;
2047                                 }
2048
2049                                 DEBUG(DEBUG_ERR,("Unable to get recovery lock - aborting recovery "
2050                                                  "and ban ourself for %u seconds\n",
2051                                                  ctdb->tunable.recovery_ban_period));
2052                                 ctdb_ban_node(rec, pnn, ctdb->tunable.recovery_ban_period);
2053                                 goto fail;
2054                         }
2055                         ctdb_ctrl_report_recd_lock_latency(ctdb,
2056                                                            CONTROL_TIMEOUT(),
2057                                                            timeval_elapsed(&start_time));
2058                         DEBUG(DEBUG_NOTICE,
2059                               ("Recovery lock taken successfully by recovery daemon\n"));
2060                 }
2061         }
2062
2063         DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", rec->last_culprit_node));
2064
2065         /* get a list of all databases */
2066         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
2067         if (ret != 0) {
2068                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node :%u\n", pnn));
2069                 goto fail;
2070         }
2071
2072         /* we do the db creation before we set the recovery mode, so the freeze happens
2073            on all databases we will be dealing with. */
2074
2075         /* verify that we have all the databases any other node has */
2076         ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
2077         if (ret != 0) {
2078                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing local databases\n"));
2079                 goto fail;
2080         }
2081
2082         /* verify that all other nodes have all our databases */
2083         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
2084         if (ret != 0) {
2085                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing remote databases\n"));
2086                 goto fail;
2087         }
2088         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - created remote databases\n"));
2089
2090         /* update the database priority for all remote databases */
2091         ret = update_db_priority_on_remote_nodes(ctdb, nodemap, pnn, dbmap, mem_ctx);
2092         if (ret != 0) {
2093                 DEBUG(DEBUG_ERR, (__location__ " Unable to set db priority on remote nodes\n"));
2094         }
2095         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated db priority for all databases\n"));
2096
2097
2098         /* update all other nodes to use the same setting for reclock files
2099            as the local recovery master.
2100         */
2101         sync_recovery_lock_file_across_cluster(rec);
2102
2103         /* Retrieve capabilities from all connected nodes */
2104         ret = update_capabilities(rec, nodemap);
2105         if (ret!=0) {
2106                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
2107                 return -1;
2108         }
2109
2110         /*
2111           update all nodes to have the same flags that we have
2112          */
2113         for (i=0;i<nodemap->num;i++) {
2114                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
2115                         continue;
2116                 }
2117
2118                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
2119                 if (ret != 0) {
2120                         if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2121                                 DEBUG(DEBUG_WARNING, (__location__ "Unable to update flags on inactive node %d\n", i));
2122                         } else {
2123                                 DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
2124                                 return -1;
2125                         }
2126                 }
2127         }
2128
2129         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
2130
2131         /* Check if all participating nodes have parallel recovery capability */
2132         par_recovery = true;
2133         for (i=0; i<nodemap->num; i++) {
2134                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2135                         continue;
2136                 }
2137
2138                 if (!(rec->caps[i].capabilities &
2139                       CTDB_CAP_PARALLEL_RECOVERY)) {
2140                         par_recovery = false;
2141                         break;
2142                 }
2143         }
2144
2145         if (par_recovery) {
2146                 ret = db_recovery_parallel(rec, mem_ctx);
2147         } else {
2148                 ret = db_recovery_serial(rec, mem_ctx, pnn, nodemap, vnnmap,
2149                                          dbmap);
2150         }
2151
2152         if (ret != 0) {
2153                 goto fail;
2154         }
2155
2156         do_takeover_run(rec, nodemap, false);
2157
2158         /* execute the "recovered" event script on all nodes */
2159         ret = run_recovered_eventscript(rec, nodemap, "do_recovery");
2160         if (ret!=0) {
2161                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Recovery process failed.\n"));
2162                 goto fail;
2163         }
2164
2165         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - finished the recovered event\n"));
2166
2167         /* send a message to all clients telling them that the cluster 
2168            has been reconfigured */
2169         ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED,
2170                                        CTDB_SRVID_RECONFIGURE, tdb_null);
2171         if (ret != 0) {
2172                 DEBUG(DEBUG_ERR, (__location__ " Failed to send reconfigure message\n"));
2173                 goto fail;
2174         }
2175
2176         DEBUG(DEBUG_NOTICE, (__location__ " Recovery complete\n"));
2177
2178         rec->need_recovery = false;
2179         ctdb_op_end(rec->recovery);
2180
2181         /* we managed to complete a full recovery, make sure to forgive
2182            any past sins by the nodes that could now participate in the
2183            recovery.
2184         */
2185         DEBUG(DEBUG_ERR,("Resetting ban count to 0 for all nodes\n"));
2186         for (i=0;i<nodemap->num;i++) {
2187                 struct ctdb_banning_state *ban_state;
2188
2189                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
2190                         continue;
2191                 }
2192
2193                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[nodemap->nodes[i].pnn]->ban_state;
2194                 if (ban_state == NULL) {
2195                         continue;
2196                 }
2197
2198                 ban_state->count = 0;
2199         }
2200
2201         /* We just finished a recovery successfully.
2202            We now wait for rerecovery_timeout before we allow
2203            another recovery to take place.
2204         */
2205         DEBUG(DEBUG_NOTICE, ("Just finished a recovery. New recoveries will now be supressed for the rerecovery timeout (%d seconds)\n", ctdb->tunable.rerecovery_timeout));
2206         ctdb_op_disable(rec->recovery, ctdb->ev,
2207                         ctdb->tunable.rerecovery_timeout);
2208         return 0;
2209
2210 fail:
2211         ctdb_op_end(rec->recovery);
2212         return -1;
2213 }
2214
2215
2216 /*
2217   elections are won by first checking the number of connected nodes, then
2218   the priority time, then the pnn
2219  */
2220 struct election_message {
2221         uint32_t num_connected;
2222         struct timeval priority_time;
2223         uint32_t pnn;
2224         uint32_t node_flags;
2225 };
2226
2227 /*
2228   form this nodes election data
2229  */
2230 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
2231 {
2232         int ret, i;
2233         struct ctdb_node_map_old *nodemap;
2234         struct ctdb_context *ctdb = rec->ctdb;
2235
2236         ZERO_STRUCTP(em);
2237
2238         em->pnn = rec->ctdb->pnn;
2239         em->priority_time = rec->priority_time;
2240
2241         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
2242         if (ret != 0) {
2243                 DEBUG(DEBUG_ERR,(__location__ " unable to get node map\n"));
2244                 return;
2245         }
2246
2247         rec->node_flags = nodemap->nodes[ctdb->pnn].flags;
2248         em->node_flags = rec->node_flags;
2249
2250         for (i=0;i<nodemap->num;i++) {
2251                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
2252                         em->num_connected++;
2253                 }
2254         }
2255
2256         /* we shouldnt try to win this election if we cant be a recmaster */
2257         if ((ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
2258                 em->num_connected = 0;
2259                 em->priority_time = timeval_current();
2260         }
2261
2262         talloc_free(nodemap);
2263 }
2264
2265 /*
2266   see if the given election data wins
2267  */
2268 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
2269 {
2270         struct election_message myem;
2271         int cmp = 0;
2272
2273         ctdb_election_data(rec, &myem);
2274
2275         /* we cant win if we don't have the recmaster capability */
2276         if ((rec->ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
2277                 return false;
2278         }
2279
2280         /* we cant win if we are banned */
2281         if (rec->node_flags & NODE_FLAGS_BANNED) {
2282                 return false;
2283         }
2284
2285         /* we cant win if we are stopped */
2286         if (rec->node_flags & NODE_FLAGS_STOPPED) {
2287                 return false;
2288         }
2289
2290         /* we will automatically win if the other node is banned */
2291         if (em->node_flags & NODE_FLAGS_BANNED) {
2292                 return true;
2293         }
2294
2295         /* we will automatically win if the other node is banned */
2296         if (em->node_flags & NODE_FLAGS_STOPPED) {
2297                 return true;
2298         }
2299
2300         /* then the longest running node */
2301         if (cmp == 0) {
2302                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
2303         }
2304
2305         if (cmp == 0) {
2306                 cmp = (int)myem.pnn - (int)em->pnn;
2307         }
2308
2309         return cmp > 0;
2310 }
2311
2312 /*
2313   send out an election request
2314  */
2315 static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn)
2316 {
2317         int ret;
2318         TDB_DATA election_data;
2319         struct election_message emsg;
2320         uint64_t srvid;
2321         struct ctdb_context *ctdb = rec->ctdb;
2322
2323         srvid = CTDB_SRVID_ELECTION;
2324
2325         ctdb_election_data(rec, &emsg);
2326
2327         election_data.dsize = sizeof(struct election_message);
2328         election_data.dptr  = (unsigned char *)&emsg;
2329
2330
2331         /* first we assume we will win the election and set 
2332            recoverymaster to be ourself on the current node
2333          */
2334         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(),
2335                                      CTDB_CURRENT_NODE, pnn);
2336         if (ret != 0) {
2337                 DEBUG(DEBUG_ERR, (__location__ " failed to set recmaster\n"));
2338                 return -1;
2339         }
2340         rec->recmaster = pnn;
2341
2342         /* send an election message to all active nodes */
2343         DEBUG(DEBUG_INFO,(__location__ " Send election request to all active nodes\n"));
2344         return ctdb_client_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
2345 }
2346
2347 /*
2348   we think we are winning the election - send a broadcast election request
2349  */
2350 static void election_send_request(struct tevent_context *ev,
2351                                   struct tevent_timer *te,
2352                                   struct timeval t, void *p)
2353 {
2354         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
2355         int ret;
2356
2357         ret = send_election_request(rec, ctdb_get_pnn(rec->ctdb));
2358         if (ret != 0) {
2359                 DEBUG(DEBUG_ERR,("Failed to send election request!\n"));
2360         }
2361
2362         TALLOC_FREE(rec->send_election_te);
2363 }
2364
2365 /*
2366   handler for memory dumps
2367 */
2368 static void mem_dump_handler(uint64_t srvid, TDB_DATA data, void *private_data)
2369 {
2370         struct ctdb_recoverd *rec = talloc_get_type(
2371                 private_data, struct ctdb_recoverd);
2372         struct ctdb_context *ctdb = rec->ctdb;
2373         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2374         TDB_DATA *dump;
2375         int ret;
2376         struct ctdb_srvid_message *rd;
2377
2378         if (data.dsize != sizeof(struct ctdb_srvid_message)) {
2379                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2380                 talloc_free(tmp_ctx);
2381                 return;
2382         }
2383         rd = (struct ctdb_srvid_message *)data.dptr;
2384
2385         dump = talloc_zero(tmp_ctx, TDB_DATA);
2386         if (dump == NULL) {
2387                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate memory for memdump\n"));
2388                 talloc_free(tmp_ctx);
2389                 return;
2390         }
2391         ret = ctdb_dump_memory(ctdb, dump);
2392         if (ret != 0) {
2393                 DEBUG(DEBUG_ERR, (__location__ " ctdb_dump_memory() failed\n"));
2394                 talloc_free(tmp_ctx);
2395                 return;
2396         }
2397
2398 DEBUG(DEBUG_ERR, ("recovery master memory dump\n"));            
2399
2400         ret = ctdb_client_send_message(ctdb, rd->pnn, rd->srvid, *dump);
2401         if (ret != 0) {
2402                 DEBUG(DEBUG_ERR,("Failed to send rd memdump reply message\n"));
2403                 talloc_free(tmp_ctx);
2404                 return;
2405         }
2406
2407         talloc_free(tmp_ctx);
2408 }
2409
2410 /*
2411   handler for reload_nodes
2412 */
2413 static void reload_nodes_handler(uint64_t srvid, TDB_DATA data,
2414                                  void *private_data)
2415 {
2416         struct ctdb_recoverd *rec = talloc_get_type(
2417                 private_data, struct ctdb_recoverd);
2418
2419         DEBUG(DEBUG_ERR, (__location__ " Reload nodes file from recovery daemon\n"));
2420
2421         ctdb_load_nodes_file(rec->ctdb);
2422 }
2423
2424
2425 static void ctdb_rebalance_timeout(struct tevent_context *ev,
2426                                    struct tevent_timer *te,
2427                                    struct timeval t, void *p)
2428 {
2429         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
2430
2431         if (rec->force_rebalance_nodes == NULL) {
2432                 DEBUG(DEBUG_ERR,
2433                       ("Rebalance timeout occurred - no nodes to rebalance\n"));
2434                 return;
2435         }
2436
2437         DEBUG(DEBUG_NOTICE,
2438               ("Rebalance timeout occurred - trigger takeover run\n"));
2439         rec->need_takeover_run = true;
2440 }
2441
2442
2443 static void recd_node_rebalance_handler(uint64_t srvid, TDB_DATA data,
2444                                         void *private_data)
2445 {
2446         struct ctdb_recoverd *rec = talloc_get_type(
2447                 private_data, struct ctdb_recoverd);
2448         struct ctdb_context *ctdb = rec->ctdb;
2449         uint32_t pnn;
2450         uint32_t *t;
2451         int len;
2452         uint32_t deferred_rebalance;
2453
2454         if (rec->recmaster != ctdb_get_pnn(ctdb)) {
2455                 return;
2456         }
2457
2458         if (data.dsize != sizeof(uint32_t)) {
2459                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of node rebalance message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(uint32_t)));
2460                 return;
2461         }
2462
2463         pnn = *(uint32_t *)&data.dptr[0];
2464
2465         DEBUG(DEBUG_NOTICE,("Setting up rebalance of IPs to node %u\n", pnn));
2466
2467         /* Copy any existing list of nodes.  There's probably some
2468          * sort of realloc variant that will do this but we need to
2469          * make sure that freeing the old array also cancels the timer
2470          * event for the timeout... not sure if realloc will do that.
2471          */
2472         len = (rec->force_rebalance_nodes != NULL) ?
2473                 talloc_array_length(rec->force_rebalance_nodes) :
2474                 0;
2475
2476         /* This allows duplicates to be added but they don't cause
2477          * harm.  A call to add a duplicate PNN arguably means that
2478          * the timeout should be reset, so this is the simplest
2479          * solution.
2480          */
2481         t = talloc_zero_array(rec, uint32_t, len+1);
2482         CTDB_NO_MEMORY_VOID(ctdb, t);
2483         if (len > 0) {
2484                 memcpy(t, rec->force_rebalance_nodes, sizeof(uint32_t) * len);
2485         }
2486         t[len] = pnn;
2487
2488         talloc_free(rec->force_rebalance_nodes);
2489
2490         rec->force_rebalance_nodes = t;
2491
2492         /* If configured, setup a deferred takeover run to make sure
2493          * that certain nodes get IPs rebalanced to them.  This will
2494          * be cancelled if a successful takeover run happens before
2495          * the timeout.  Assign tunable value to variable for
2496          * readability.
2497          */
2498         deferred_rebalance = ctdb->tunable.deferred_rebalance_on_node_add;
2499         if (deferred_rebalance != 0) {
2500                 tevent_add_timer(ctdb->ev, rec->force_rebalance_nodes,
2501                                  timeval_current_ofs(deferred_rebalance, 0),
2502                                  ctdb_rebalance_timeout, rec);
2503         }
2504 }
2505
2506
2507
2508 static void recd_update_ip_handler(uint64_t srvid, TDB_DATA data,
2509                                    void *private_data)
2510 {
2511         struct ctdb_recoverd *rec = talloc_get_type(
2512                 private_data, struct ctdb_recoverd);
2513         struct ctdb_public_ip *ip;
2514
2515         if (rec->recmaster != rec->ctdb->pnn) {
2516                 DEBUG(DEBUG_INFO,("Not recmaster, ignore update ip message\n"));
2517                 return;
2518         }
2519
2520         if (data.dsize != sizeof(struct ctdb_public_ip)) {
2521                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of recd update ip message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(struct ctdb_public_ip)));
2522                 return;
2523         }
2524
2525         ip = (struct ctdb_public_ip *)data.dptr;
2526
2527         update_ip_assignment_tree(rec->ctdb, ip);
2528 }
2529
2530 static void srvid_disable_and_reply(struct ctdb_context *ctdb,
2531                                     TDB_DATA data,
2532                                     struct ctdb_op_state *op_state)
2533 {
2534         struct ctdb_disable_message *r;
2535         uint32_t timeout;
2536         TDB_DATA result;
2537         int32_t ret = 0;
2538
2539         /* Validate input data */
2540         if (data.dsize != sizeof(struct ctdb_disable_message)) {
2541                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
2542                                  "expecting %lu\n", (long unsigned)data.dsize,
2543                                  (long unsigned)sizeof(struct ctdb_srvid_message)));
2544                 return;
2545         }
2546         if (data.dptr == NULL) {
2547                 DEBUG(DEBUG_ERR,(__location__ " No data received\n"));
2548                 return;
2549         }
2550
2551         r = (struct ctdb_disable_message *)data.dptr;
2552         timeout = r->timeout;
2553
2554         ret = ctdb_op_disable(op_state, ctdb->ev, timeout);
2555         if (ret != 0) {
2556                 goto done;
2557         }
2558
2559         /* Returning our PNN tells the caller that we succeeded */
2560         ret = ctdb_get_pnn(ctdb);
2561 done:
2562         result.dsize = sizeof(int32_t);
2563         result.dptr  = (uint8_t *)&ret;
2564         srvid_request_reply(ctdb, (struct ctdb_srvid_message *)r, result);
2565 }
2566
2567 static void disable_takeover_runs_handler(uint64_t srvid, TDB_DATA data,
2568                                           void *private_data)
2569 {
2570         struct ctdb_recoverd *rec = talloc_get_type(
2571                 private_data, struct ctdb_recoverd);
2572
2573         srvid_disable_and_reply(rec->ctdb, data, rec->takeover_run);
2574 }
2575
2576 /* Backward compatibility for this SRVID */
2577 static void disable_ip_check_handler(uint64_t srvid, TDB_DATA data,
2578                                      void *private_data)
2579 {
2580         struct ctdb_recoverd *rec = talloc_get_type(
2581                 private_data, struct ctdb_recoverd);
2582         uint32_t timeout;
2583
2584         if (data.dsize != sizeof(uint32_t)) {
2585                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
2586                                  "expecting %lu\n", (long unsigned)data.dsize,
2587                                  (long unsigned)sizeof(uint32_t)));
2588                 return;
2589         }
2590         if (data.dptr == NULL) {
2591                 DEBUG(DEBUG_ERR,(__location__ " No data received\n"));
2592                 return;
2593         }
2594
2595         timeout = *((uint32_t *)data.dptr);
2596
2597         ctdb_op_disable(rec->takeover_run, rec->ctdb->ev, timeout);
2598 }
2599
2600 static void disable_recoveries_handler(uint64_t srvid, TDB_DATA data,
2601                                        void *private_data)
2602 {
2603         struct ctdb_recoverd *rec = talloc_get_type(
2604                 private_data, struct ctdb_recoverd);
2605
2606         srvid_disable_and_reply(rec->ctdb, data, rec->recovery);
2607 }
2608
2609 /*
2610   handler for ip reallocate, just add it to the list of requests and 
2611   handle this later in the monitor_cluster loop so we do not recurse
2612   with other requests to takeover_run()
2613 */
2614 static void ip_reallocate_handler(uint64_t srvid, TDB_DATA data,
2615                                   void *private_data)
2616 {
2617         struct ctdb_srvid_message *request;
2618         struct ctdb_recoverd *rec = talloc_get_type(
2619                 private_data, struct ctdb_recoverd);
2620
2621         if (data.dsize != sizeof(struct ctdb_srvid_message)) {
2622                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2623                 return;
2624         }
2625
2626         request = (struct ctdb_srvid_message *)data.dptr;
2627
2628         srvid_request_add(rec->ctdb, &rec->reallocate_requests, request);
2629 }
2630
2631 static void process_ipreallocate_requests(struct ctdb_context *ctdb,
2632                                           struct ctdb_recoverd *rec)
2633 {
2634         TDB_DATA result;
2635         int32_t ret;
2636         struct srvid_requests *current;
2637
2638         DEBUG(DEBUG_INFO, ("recovery master forced ip reallocation\n"));
2639
2640         /* Only process requests that are currently pending.  More
2641          * might come in while the takeover run is in progress and
2642          * they will need to be processed later since they might
2643          * be in response flag changes.
2644          */
2645         current = rec->reallocate_requests;
2646         rec->reallocate_requests = NULL;
2647
2648         if (do_takeover_run(rec, rec->nodemap, false)) {
2649                 ret = ctdb_get_pnn(ctdb);
2650         } else {
2651                 ret = -1;
2652         }
2653
2654         result.dsize = sizeof(int32_t);
2655         result.dptr  = (uint8_t *)&ret;
2656
2657         srvid_requests_reply(ctdb, &current, result);
2658 }
2659
2660
2661 /*
2662   handler for recovery master elections
2663 */
2664 static void election_handler(uint64_t srvid, TDB_DATA data, void *private_data)
2665 {
2666         struct ctdb_recoverd *rec = talloc_get_type(
2667                 private_data, struct ctdb_recoverd);
2668         struct ctdb_context *ctdb = rec->ctdb;
2669         int ret;
2670         struct election_message *em = (struct election_message *)data.dptr;
2671
2672         /* Ignore election packets from ourself */
2673         if (ctdb->pnn == em->pnn) {
2674                 return;
2675         }
2676
2677         /* we got an election packet - update the timeout for the election */
2678         talloc_free(rec->election_timeout);
2679         rec->election_timeout = tevent_add_timer(
2680                         ctdb->ev, ctdb,
2681                         fast_start ?
2682                                 timeval_current_ofs(0, 500000) :
2683                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0),
2684                         ctdb_election_timeout, rec);
2685
2686         /* someone called an election. check their election data
2687            and if we disagree and we would rather be the elected node, 
2688            send a new election message to all other nodes
2689          */
2690         if (ctdb_election_win(rec, em)) {
2691                 if (!rec->send_election_te) {
2692                         rec->send_election_te = tevent_add_timer(
2693                                         ctdb->ev, rec,
2694                                         timeval_current_ofs(0, 500000),
2695                                         election_send_request, rec);
2696                 }
2697                 return;
2698         }
2699
2700         /* we didn't win */
2701         TALLOC_FREE(rec->send_election_te);
2702
2703         /* Release the recovery lock file */
2704         if (ctdb_recovery_have_lock(ctdb)) {
2705                 ctdb_recovery_unlock(ctdb);
2706         }
2707
2708         clear_ip_assignment_tree(ctdb);
2709
2710         /* ok, let that guy become recmaster then */
2711         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(),
2712                                      CTDB_CURRENT_NODE, em->pnn);
2713         if (ret != 0) {
2714                 DEBUG(DEBUG_ERR, (__location__ " failed to set recmaster"));
2715                 return;
2716         }
2717         rec->recmaster = em->pnn;
2718
2719         return;
2720 }
2721
2722
2723 /*
2724   force the start of the election process
2725  */
2726 static void force_election(struct ctdb_recoverd *rec, uint32_t pnn, 
2727                            struct ctdb_node_map_old *nodemap)
2728 {
2729         int ret;
2730         struct ctdb_context *ctdb = rec->ctdb;
2731
2732         DEBUG(DEBUG_INFO,(__location__ " Force an election\n"));
2733
2734         /* set all nodes to recovery mode to stop all internode traffic */
2735         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE, false);
2736         if (ret != 0) {
2737                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
2738                 return;
2739         }
2740
2741         talloc_free(rec->election_timeout);
2742         rec->election_timeout = tevent_add_timer(
2743                         ctdb->ev, ctdb,
2744                         fast_start ?
2745                                 timeval_current_ofs(0, 500000) :
2746                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0),
2747                         ctdb_election_timeout, rec);
2748
2749         ret = send_election_request(rec, pnn);
2750         if (ret!=0) {
2751                 DEBUG(DEBUG_ERR, (__location__ " failed to initiate recmaster election"));
2752                 return;
2753         }
2754
2755         /* wait for a few seconds to collect all responses */
2756         ctdb_wait_election(rec);
2757 }
2758
2759
2760
2761 /*
2762   handler for when a node changes its flags
2763 */
2764 static void monitor_handler(uint64_t srvid, TDB_DATA data, void *private_data)
2765 {
2766         struct ctdb_recoverd *rec = talloc_get_type(
2767                 private_data, struct ctdb_recoverd);
2768         struct ctdb_context *ctdb = rec->ctdb;
2769         int ret;
2770         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2771         struct ctdb_node_map_old *nodemap=NULL;
2772         TALLOC_CTX *tmp_ctx;
2773         int i;
2774         int disabled_flag_changed;
2775
2776         if (data.dsize != sizeof(*c)) {
2777                 DEBUG(DEBUG_ERR,(__location__ "Invalid data in ctdb_node_flag_change\n"));
2778                 return;
2779         }
2780
2781         tmp_ctx = talloc_new(ctdb);
2782         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
2783
2784         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2785         if (ret != 0) {
2786                 DEBUG(DEBUG_ERR,(__location__ "ctdb_ctrl_getnodemap failed in monitor_handler\n"));
2787                 talloc_free(tmp_ctx);
2788                 return;         
2789         }
2790
2791
2792         for (i=0;i<nodemap->num;i++) {
2793                 if (nodemap->nodes[i].pnn == c->pnn) break;
2794         }
2795
2796         if (i == nodemap->num) {
2797                 DEBUG(DEBUG_CRIT,(__location__ "Flag change for non-existant node %u\n", c->pnn));
2798                 talloc_free(tmp_ctx);
2799                 return;
2800         }
2801
2802         if (c->old_flags != c->new_flags) {
2803                 DEBUG(DEBUG_NOTICE,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
2804         }
2805
2806         disabled_flag_changed =  (nodemap->nodes[i].flags ^ c->new_flags) & NODE_FLAGS_DISABLED;
2807
2808         nodemap->nodes[i].flags = c->new_flags;
2809
2810         ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, CONTROL_TIMEOUT(),
2811                                    CTDB_CURRENT_NODE, &ctdb->recovery_mode);
2812
2813         if (ret == 0 &&
2814             rec->recmaster == ctdb->pnn &&
2815             ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2816                 /* Only do the takeover run if the perm disabled or unhealthy
2817                    flags changed since these will cause an ip failover but not
2818                    a recovery.
2819                    If the node became disconnected or banned this will also
2820                    lead to an ip address failover but that is handled 
2821                    during recovery
2822                 */
2823                 if (disabled_flag_changed) {
2824                         rec->need_takeover_run = true;
2825                 }
2826         }
2827
2828         talloc_free(tmp_ctx);
2829 }
2830
2831 /*
2832   handler for when we need to push out flag changes ot all other nodes
2833 */
2834 static void push_flags_handler(uint64_t srvid, TDB_DATA data,
2835                                void *private_data)
2836 {
2837         struct ctdb_recoverd *rec = talloc_get_type(
2838                 private_data, struct ctdb_recoverd);
2839         struct ctdb_context *ctdb = rec->ctdb;
2840         int ret;
2841         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2842         struct ctdb_node_map_old *nodemap=NULL;
2843         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2844         uint32_t *nodes;
2845
2846         /* read the node flags from the recmaster */
2847         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), rec->recmaster,
2848                                    tmp_ctx, &nodemap);
2849         if (ret != 0) {
2850                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", c->pnn));
2851                 talloc_free(tmp_ctx);
2852                 return;
2853         }
2854         if (c->pnn >= nodemap->num) {
2855                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", c->pnn));
2856                 talloc_free(tmp_ctx);
2857                 return;
2858         }
2859
2860         /* send the flags update to all connected nodes */
2861         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2862
2863         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2864                                       nodes, 0, CONTROL_TIMEOUT(),
2865                                       false, data,
2866                                       NULL, NULL,
2867                                       NULL) != 0) {
2868                 DEBUG(DEBUG_ERR, (__location__ " ctdb_control to modify node flags failed\n"));
2869
2870                 talloc_free(tmp_ctx);
2871                 return;
2872         }
2873
2874         talloc_free(tmp_ctx);
2875 }
2876
2877
2878 struct verify_recmode_normal_data {
2879         uint32_t count;
2880         enum monitor_result status;
2881 };
2882
2883 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
2884 {
2885         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmode_normal_data);
2886
2887
2888         /* one more node has responded with recmode data*/
2889         rmdata->count--;
2890
2891         /* if we failed to get the recmode, then return an error and let
2892            the main loop try again.
2893         */
2894         if (state->state != CTDB_CONTROL_DONE) {
2895                 if (rmdata->status == MONITOR_OK) {
2896                         rmdata->status = MONITOR_FAILED;
2897                 }
2898                 return;
2899         }
2900
2901         /* if we got a response, then the recmode will be stored in the
2902            status field
2903         */
2904         if (state->status != CTDB_RECOVERY_NORMAL) {
2905                 DEBUG(DEBUG_NOTICE, ("Node:%u was in recovery mode. Start recovery process\n", state->c->hdr.destnode));
2906                 rmdata->status = MONITOR_RECOVERY_NEEDED;
2907         }
2908
2909         return;
2910 }
2911
2912
2913 /* verify that all nodes are in normal recovery mode */
2914 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map_old *nodemap)
2915 {
2916         struct verify_recmode_normal_data *rmdata;
2917         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2918         struct ctdb_client_control_state *state;
2919         enum monitor_result status;
2920         int j;
2921         
2922         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
2923         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2924         rmdata->count  = 0;
2925         rmdata->status = MONITOR_OK;
2926
2927         /* loop over all active nodes and send an async getrecmode call to 
2928            them*/
2929         for (j=0; j<nodemap->num; j++) {
2930                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2931                         continue;
2932                 }
2933                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
2934                                         CONTROL_TIMEOUT(), 
2935                                         nodemap->nodes[j].pnn);
2936                 if (state == NULL) {
2937                         /* we failed to send the control, treat this as 
2938                            an error and try again next iteration
2939                         */                      
2940                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
2941                         talloc_free(mem_ctx);
2942                         return MONITOR_FAILED;
2943                 }
2944
2945                 /* set up the callback functions */
2946                 state->async.fn = verify_recmode_normal_callback;
2947                 state->async.private_data = rmdata;
2948
2949                 /* one more control to wait for to complete */
2950                 rmdata->count++;
2951         }
2952
2953
2954         /* now wait for up to the maximum number of seconds allowed
2955            or until all nodes we expect a response from has replied
2956         */
2957         while (rmdata->count > 0) {
2958                 tevent_loop_once(ctdb->ev);
2959         }
2960
2961         status = rmdata->status;
2962         talloc_free(mem_ctx);
2963         return status;
2964 }
2965
2966
2967 struct verify_recmaster_data {
2968         struct ctdb_recoverd *rec;
2969         uint32_t count;
2970         uint32_t pnn;
2971         enum monitor_result status;
2972 };
2973
2974 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
2975 {
2976         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmaster_data);
2977
2978
2979         /* one more node has responded with recmaster data*/
2980         rmdata->count--;
2981
2982         /* if we failed to get the recmaster, then return an error and let
2983            the main loop try again.
2984         */
2985         if (state->state != CTDB_CONTROL_DONE) {
2986                 if (rmdata->status == MONITOR_OK) {
2987                         rmdata->status = MONITOR_FAILED;
2988                 }
2989                 return;
2990         }
2991
2992         /* if we got a response, then the recmaster will be stored in the
2993            status field
2994         */
2995         if (state->status != rmdata->pnn) {
2996                 DEBUG(DEBUG_ERR,("Node %d thinks node %d is recmaster. Need a new recmaster election\n", state->c->hdr.destnode, state->status));
2997                 ctdb_set_culprit(rmdata->rec, state->c->hdr.destnode);
2998                 rmdata->status = MONITOR_ELECTION_NEEDED;
2999         }
3000
3001         return;
3002 }
3003
3004
3005 /* verify that all nodes agree that we are the recmaster */
3006 static enum monitor_result verify_recmaster(struct ctdb_recoverd *rec, struct ctdb_node_map_old *nodemap, uint32_t pnn)
3007 {
3008         struct ctdb_context *ctdb = rec->ctdb;
3009         struct verify_recmaster_data *rmdata;
3010         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
3011         struct ctdb_client_control_state *state;
3012         enum monitor_result status;
3013         int j;
3014         
3015         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
3016         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
3017         rmdata->rec    = rec;
3018         rmdata->count  = 0;
3019         rmdata->pnn    = pnn;
3020         rmdata->status = MONITOR_OK;
3021
3022         /* loop over all active nodes and send an async getrecmaster call to
3023            them*/
3024         for (j=0; j<nodemap->num; j++) {
3025                 if (nodemap->nodes[j].pnn == rec->recmaster) {
3026                         continue;
3027                 }
3028                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3029                         continue;
3030                 }
3031                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
3032                                         CONTROL_TIMEOUT(),
3033                                         nodemap->nodes[j].pnn);
3034                 if (state == NULL) {
3035                         /* we failed to send the control, treat this as 
3036                            an error and try again next iteration
3037                         */                      
3038                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
3039                         talloc_free(mem_ctx);
3040                         return MONITOR_FAILED;
3041                 }
3042
3043                 /* set up the callback functions */
3044                 state->async.fn = verify_recmaster_callback;
3045                 state->async.private_data = rmdata;
3046
3047                 /* one more control to wait for to complete */
3048                 rmdata->count++;
3049         }
3050
3051
3052         /* now wait for up to the maximum number of seconds allowed
3053            or until all nodes we expect a response from has replied
3054         */
3055         while (rmdata->count > 0) {
3056                 tevent_loop_once(ctdb->ev);
3057         }
3058
3059         status = rmdata->status;
3060         talloc_free(mem_ctx);
3061         return status;
3062 }
3063
3064 static bool interfaces_have_changed(struct ctdb_context *ctdb,
3065                                     struct ctdb_recoverd *rec)
3066 {
3067         struct ctdb_iface_list_old *ifaces = NULL;
3068         TALLOC_CTX *mem_ctx;
3069         bool ret = false;
3070
3071         mem_ctx = talloc_new(NULL);
3072
3073         /* Read the interfaces from the local node */
3074         if (ctdb_ctrl_get_ifaces(ctdb, CONTROL_TIMEOUT(),
3075                                  CTDB_CURRENT_NODE, mem_ctx, &ifaces) != 0) {
3076                 DEBUG(DEBUG_ERR, ("Unable to get interfaces from local node %u\n", ctdb->pnn));
3077                 /* We could return an error.  However, this will be
3078                  * rare so we'll decide that the interfaces have
3079                  * actually changed, just in case.
3080                  */
3081                 talloc_free(mem_ctx);
3082                 return true;
3083         }
3084
3085         if (!rec->ifaces) {
3086                 /* We haven't been here before so things have changed */
3087                 DEBUG(DEBUG_NOTICE, ("Initial interface fetched\n"));
3088                 ret = true;
3089         } else if (rec->ifaces->num != ifaces->num) {
3090                 /* Number of interfaces has changed */
3091                 DEBUG(DEBUG_NOTICE, ("Interface count changed from %d to %d\n",
3092                                      rec->ifaces->num, ifaces->num));
3093                 ret = true;
3094         } else {
3095                 /* See if interface names or link states have changed */
3096                 int i;
3097                 for (i = 0; i < rec->ifaces->num; i++) {
3098                         struct ctdb_iface * iface = &rec->ifaces->ifaces[i];
3099                         if (strcmp(iface->name, ifaces->ifaces[i].name) != 0) {
3100                                 DEBUG(DEBUG_NOTICE,
3101                                       ("Interface in slot %d changed: %s => %s\n",
3102                                        i, iface->name, ifaces->ifaces[i].name));
3103                                 ret = true;
3104                                 break;
3105                         }
3106                         if (iface->link_state != ifaces->ifaces[i].link_state) {
3107                                 DEBUG(DEBUG_NOTICE,
3108                                       ("Interface %s changed state: %d => %d\n",
3109                                        iface->name, iface->link_state,
3110                                        ifaces->ifaces[i].link_state));
3111                                 ret = true;
3112                                 break;
3113                         }
3114                 }
3115         }
3116
3117         talloc_free(rec->ifaces);
3118         rec->ifaces = talloc_steal(rec, ifaces);
3119
3120         talloc_free(mem_ctx);
3121         return ret;
3122 }
3123
3124 /* called to check that the local allocation of public ip addresses is ok.
3125 */
3126 static int verify_local_ip_allocation(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, uint32_t pnn, struct ctdb_node_map_old *nodemap)
3127 {
3128         TALLOC_CTX *mem_ctx = talloc_new(NULL);
3129         int ret, j;
3130         bool need_takeover_run = false;
3131
3132         if (interfaces_have_changed(ctdb, rec)) {
3133                 DEBUG(DEBUG_NOTICE, ("The interfaces status has changed on "
3134                                      "local node %u - force takeover run\n",
3135                                      pnn));
3136                 need_takeover_run = true;
3137         }
3138
3139         /* verify that we have the ip addresses we should have
3140            and we don't have ones we shouldnt have.
3141            if we find an inconsistency we set recmode to
3142            active on the local node and wait for the recmaster
3143            to do a full blown recovery.
3144            also if the pnn is -1 and we are healthy and can host the ip
3145            we also request a ip reallocation.
3146         */
3147         if (ctdb->tunable.disable_ip_failover == 0) {
3148                 struct ctdb_public_ip_list_old *ips = NULL;
3149
3150                 /* read the *available* IPs from the local node */
3151                 ret = ctdb_ctrl_get_public_ips_flags(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE, &ips);
3152                 if (ret != 0) {
3153                         DEBUG(DEBUG_ERR, ("Unable to get available public IPs from local node %u\n", pnn));
3154                         talloc_free(mem_ctx);
3155                         return -1;
3156                 }
3157
3158                 for (j=0; j<ips->num; j++) {
3159                         if (ips->ips[j].pnn == -1 &&
3160                             nodemap->nodes[pnn].flags == 0) {
3161                                 DEBUG(DEBUG_CRIT,("Public IP '%s' is not assigned and we could serve it\n",
3162                                                   ctdb_addr_to_str(&ips->ips[j].addr)));
3163                                 need_takeover_run = true;
3164                         }
3165                 }
3166
3167                 talloc_free(ips);
3168
3169                 /* read the *known* IPs from the local node */
3170                 ret = ctdb_ctrl_get_public_ips_flags(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, 0, &ips);
3171                 if (ret != 0) {
3172                         DEBUG(DEBUG_ERR, ("Unable to get known public IPs from local node %u\n", pnn));
3173                         talloc_free(mem_ctx);
3174                         return -1;
3175                 }
3176
3177                 for (j=0; j<ips->num; j++) {
3178                         if (ips->ips[j].pnn == pnn) {
3179                                 if (ctdb->do_checkpublicip && !ctdb_sys_have_ip(&ips->ips[j].addr)) {
3180                                         DEBUG(DEBUG_CRIT,("Public IP '%s' is assigned to us but not on an interface\n",
3181                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
3182                                         need_takeover_run = true;
3183                                 }
3184                         } else {
3185                                 if (ctdb->do_checkpublicip &&
3186                                     ctdb_sys_have_ip(&ips->ips[j].addr)) {
3187
3188                                         DEBUG(DEBUG_CRIT,("We are still serving a public IP '%s' that we should not be serving. Removing it\n", 
3189                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
3190
3191                                         if (ctdb_ctrl_release_ip(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ips->ips[j]) != 0) {
3192                                                 DEBUG(DEBUG_ERR,("Failed to release local IP address\n"));
3193                                         }
3194                                 }
3195                         }
3196                 }
3197         }
3198
3199         if (need_takeover_run) {
3200                 struct ctdb_srvid_message rd;
3201                 TDB_DATA data;
3202
3203                 DEBUG(DEBUG_CRIT,("Trigger takeoverrun\n"));
3204
3205                 rd.pnn = ctdb->pnn;
3206                 rd.srvid = 0;
3207                 data.dptr = (uint8_t *)&rd;
3208                 data.dsize = sizeof(rd);
3209
3210                 ret = ctdb_client_send_message(ctdb, rec->recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
3211                 if (ret != 0) {
3212                         DEBUG(DEBUG_ERR,(__location__ " Failed to send ipreallocate to recmaster :%d\n", (int)rec->recmaster));
3213                 }
3214         }
3215         talloc_free(mem_ctx);
3216         return 0;
3217 }
3218
3219
3220 static void async_getnodemap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
3221 {
3222         struct ctdb_node_map_old **remote_nodemaps = callback_data;
3223
3224         if (node_pnn >= ctdb->num_nodes) {
3225                 DEBUG(DEBUG_ERR,(__location__ " pnn from invalid node\n"));
3226                 return;
3227         }
3228
3229         remote_nodemaps[node_pnn] = (struct ctdb_node_map_old *)talloc_steal(remote_nodemaps, outdata.dptr);
3230
3231 }
3232
3233 static int get_remote_nodemaps(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
3234         struct ctdb_node_map_old *nodemap,
3235         struct ctdb_node_map_old **remote_nodemaps)
3236 {
3237         uint32_t *nodes;
3238
3239         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
3240         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_NODEMAP,
3241                                         nodes, 0,
3242                                         CONTROL_TIMEOUT(), false, tdb_null,
3243                                         async_getnodemap_callback,
3244                                         NULL,
3245                                         remote_nodemaps) != 0) {
3246                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull all remote nodemaps\n"));
3247
3248                 return -1;
3249         }
3250
3251         return 0;
3252 }
3253
3254 static int update_recovery_lock_file(struct ctdb_context *ctdb)
3255 {
3256         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3257         const char *reclockfile;
3258
3259         if (ctdb_ctrl_getreclock(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &reclockfile) != 0) {
3260                 DEBUG(DEBUG_ERR,("Failed to read reclock file from daemon\n"));
3261                 talloc_free(tmp_ctx);
3262                 return -1;      
3263         }
3264
3265         if (reclockfile == NULL) {
3266                 if (ctdb->recovery_lock_file != NULL) {
3267                         DEBUG(DEBUG_NOTICE,("Recovery lock file disabled\n"));
3268                         talloc_free(ctdb->recovery_lock_file);
3269                         ctdb->recovery_lock_file = NULL;
3270                         ctdb_recovery_unlock(ctdb);
3271                 }
3272                 talloc_free(tmp_ctx);
3273                 return 0;
3274         }
3275
3276         if (ctdb->recovery_lock_file == NULL) {
3277                 DEBUG(DEBUG_NOTICE,
3278                       ("Recovery lock file enabled (%s)\n", reclockfile));
3279                 ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
3280                 ctdb_recovery_unlock(ctdb);
3281                 talloc_free(tmp_ctx);
3282                 return 0;
3283         }
3284
3285
3286         if (!strcmp(reclockfile, ctdb->recovery_lock_file)) {
3287                 talloc_free(tmp_ctx);
3288                 return 0;
3289         }
3290
3291         DEBUG(DEBUG_NOTICE,
3292               ("Recovery lock file changed (now %s)\n", reclockfile));
3293         talloc_free(ctdb->recovery_lock_file);
3294         ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
3295         ctdb_recovery_unlock(ctdb);
3296
3297         talloc_free(tmp_ctx);
3298         return 0;
3299 }
3300
3301 static enum monitor_result validate_recovery_master(struct ctdb_recoverd *rec,
3302                                                     TALLOC_CTX *mem_ctx)
3303 {
3304         struct ctdb_context *ctdb = rec->ctdb;
3305         uint32_t pnn = ctdb_get_pnn(ctdb);
3306         struct ctdb_node_map_old *nodemap = rec->nodemap;
3307         struct ctdb_node_map_old *recmaster_nodemap = NULL;
3308         int ret;
3309
3310         /* When recovery daemon is started, recmaster is set to
3311          * "unknown" so it knows to start an election.
3312          */
3313         if (rec->recmaster == CTDB_UNKNOWN_PNN) {
3314                 DEBUG(DEBUG_NOTICE,
3315                       ("Initial recovery master set - forcing election\n"));
3316                 return MONITOR_ELECTION_NEEDED;
3317         }
3318
3319         /*
3320          * If the current recmaster does not have CTDB_CAP_RECMASTER,
3321          * but we have, then force an election and try to become the new
3322          * recmaster.
3323          */
3324         if (!ctdb_node_has_capabilities(rec->caps,
3325                                         rec->recmaster,
3326                                         CTDB_CAP_RECMASTER) &&
3327             (rec->ctdb->capabilities & CTDB_CAP_RECMASTER) &&
3328             !(nodemap->nodes[pnn].flags & NODE_FLAGS_INACTIVE)) {
3329                 DEBUG(DEBUG_ERR,
3330                       (" Current recmaster node %u does not have CAP_RECMASTER,"
3331                        " but we (node %u) have - force an election\n",
3332                        rec->recmaster, pnn));
3333                 return MONITOR_ELECTION_NEEDED;
3334         }
3335
3336         /* Verify that the master node has not been deleted.  This
3337          * should not happen because a node should always be shutdown
3338          * before being deleted, causing a new master to be elected
3339          * before now.  However, if something strange has happened
3340          * then checking here will ensure we don't index beyond the
3341          * end of the nodemap array. */
3342         if (rec->recmaster >= nodemap->num) {
3343                 DEBUG(DEBUG_ERR,
3344                       ("Recmaster node %u has been deleted. Force election\n",
3345                        rec->recmaster));
3346                 return MONITOR_ELECTION_NEEDED;
3347         }
3348
3349         /* if recovery master is disconnected/deleted we must elect a new recmaster */
3350         if (nodemap->nodes[rec->recmaster].flags &
3351             (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_DELETED)) {
3352                 DEBUG(DEBUG_NOTICE,
3353                       ("Recmaster node %u is disconnected/deleted. Force election\n",
3354                        rec->recmaster));
3355                 return MONITOR_ELECTION_NEEDED;
3356         }
3357
3358         /* get nodemap from the recovery master to check if it is inactive */
3359         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), rec->recmaster,
3360                                    mem_ctx, &recmaster_nodemap);
3361         if (ret != 0) {
3362                 DEBUG(DEBUG_ERR,
3363                       (__location__
3364                        " Unable to get nodemap from recovery master %u\n",
3365                           rec->recmaster));
3366                 return MONITOR_FAILED;
3367         }
3368
3369
3370         if ((recmaster_nodemap->nodes[rec->recmaster].flags & NODE_FLAGS_INACTIVE) &&
3371             (rec->node_flags & NODE_FLAGS_INACTIVE) == 0) {
3372                 DEBUG(DEBUG_NOTICE,
3373                       ("Recmaster node %u is inactive. Force election\n",
3374                        rec->recmaster));
3375                 /*
3376                  * update our nodemap to carry the recmaster's notion of
3377                  * its own flags, so that we don't keep freezing the
3378                  * inactive recmaster node...
3379                  */
3380                 nodemap->nodes[rec->recmaster].flags =
3381                         recmaster_nodemap->nodes[rec->recmaster].flags;
3382                 return MONITOR_ELECTION_NEEDED;
3383         }
3384
3385         return MONITOR_OK;
3386 }
3387
3388 static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
3389                       TALLOC_CTX *mem_ctx)
3390 {
3391         uint32_t pnn;
3392         struct ctdb_node_map_old *nodemap=NULL;
3393         struct ctdb_node_map_old **remote_nodemaps=NULL;
3394         struct ctdb_vnn_map *vnnmap=NULL;
3395         struct ctdb_vnn_map *remote_vnnmap=NULL;
3396         uint32_t num_lmasters;
3397         int32_t debug_level;
3398         int i, j, ret;
3399         bool self_ban;
3400
3401
3402         /* verify that the main daemon is still running */
3403         if (ctdb_kill(ctdb, ctdb->ctdbd_pid, 0) != 0) {
3404                 DEBUG(DEBUG_CRIT,("CTDB daemon is no longer available. Shutting down recovery daemon\n"));
3405                 exit(-1);
3406         }
3407
3408         /* ping the local daemon to tell it we are alive */
3409         ctdb_ctrl_recd_ping(ctdb);
3410
3411         if (rec->election_timeout) {
3412                 /* an election is in progress */
3413                 return;
3414         }
3415
3416         /* read the debug level from the parent and update locally */
3417         ret = ctdb_ctrl_get_debuglevel(ctdb, CTDB_CURRENT_NODE, &debug_level);
3418         if (ret !=0) {
3419                 DEBUG(DEBUG_ERR, (__location__ " Failed to read debuglevel from parent\n"));
3420                 return;
3421         }
3422         DEBUGLEVEL = debug_level;
3423
3424         /* get relevant tunables */
3425         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
3426         if (ret != 0) {
3427                 DEBUG(DEBUG_ERR,("Failed to get tunables - retrying\n"));
3428                 return;
3429         }
3430
3431         /* get runstate */
3432         ret = ctdb_ctrl_get_runstate(ctdb, CONTROL_TIMEOUT(),
3433                                      CTDB_CURRENT_NODE, &ctdb->runstate);
3434         if (ret != 0) {
3435                 DEBUG(DEBUG_ERR, ("Failed to get runstate - retrying\n"));
3436                 return;
3437         }
3438
3439         /* get the current recovery lock file from the server */
3440         if (update_recovery_lock_file(ctdb) != 0) {
3441                 DEBUG(DEBUG_ERR,("Failed to update the recovery lock file\n"));
3442                 return;
3443         }
3444
3445         pnn = ctdb_get_pnn(ctdb);
3446
3447         /* get nodemap */
3448         TALLOC_FREE(rec->nodemap);
3449         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, rec, &rec->nodemap);
3450         if (ret != 0) {
3451                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", pnn));
3452                 return;
3453         }
3454         nodemap = rec->nodemap;
3455
3456         /* remember our own node flags */
3457         rec->node_flags = nodemap->nodes[pnn].flags;
3458
3459         ban_misbehaving_nodes(rec, &self_ban);
3460         if (self_ban) {
3461                 DEBUG(DEBUG_NOTICE, ("This node was banned, restart main_loop\n"));
3462                 return;
3463         }
3464
3465         /* if the local daemon is STOPPED or BANNED, we verify that the databases are
3466            also frozen and that the recmode is set to active.
3467         */
3468         if (rec->node_flags & (NODE_FLAGS_STOPPED | NODE_FLAGS_BANNED)) {
3469                 /* If this node has become inactive then we want to
3470                  * reduce the chances of it taking over the recovery
3471                  * master role when it becomes active again.  This
3472                  * helps to stabilise the recovery master role so that
3473                  * it stays on the most stable node.
3474                  */
3475                 rec->priority_time = timeval_current();
3476
3477                 ret = ctdb_ctrl_getrecmode(ctdb, mem_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->recovery_mode);
3478                 if (ret != 0) {
3479                         DEBUG(DEBUG_ERR,(__location__ " Failed to read recmode from local node\n"));
3480                 }
3481                 if (ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
3482                         DEBUG(DEBUG_ERR,("Node is stopped or banned but recovery mode is not active. Activate recovery mode and lock databases\n"));
3483
3484                         ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
3485                         if (ret != 0) {
3486                                 DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode in STOPPED or BANNED state\n"));
3487
3488                                 return;
3489                         }
3490                         ret = ctdb_ctrl_freeze(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
3491                         if (ret != 0) {
3492                                 DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node in STOPPED or BANNED state\n"));
3493                                 return;
3494                         }
3495                 }
3496
3497                 /* If this node is stopped or banned then it is not the recovery
3498                  * master, so don't do anything. This prevents stopped or banned
3499                  * node from starting election and sending unnecessary controls.
3500                  */
3501                 return;
3502         }
3503
3504         /* If we are not the recmaster then do some housekeeping */
3505         if (rec->recmaster != pnn) {
3506                 /* Ignore any IP reallocate requests - only recmaster
3507                  * processes them
3508                  */
3509                 TALLOC_FREE(rec->reallocate_requests);
3510                 /* Clear any nodes that should be force rebalanced in
3511                  * the next takeover run.  If the recovery master role
3512                  * has moved then we don't want to process these some
3513                  * time in the future.
3514                  */
3515                 TALLOC_FREE(rec->force_rebalance_nodes);
3516         }
3517
3518         /* Retrieve capabilities from all connected nodes */
3519         ret = update_capabilities(rec, nodemap);
3520         if (ret != 0) {
3521                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
3522                 return;
3523         }
3524
3525         switch (validate_recovery_master(rec, mem_ctx)) {
3526         case MONITOR_RECOVERY_NEEDED:
3527                 /* can not happen */
3528                 return;
3529         case MONITOR_ELECTION_NEEDED:
3530                 force_election(rec, pnn, nodemap);
3531                 return;
3532         case MONITOR_OK:
3533                 break;
3534         case MONITOR_FAILED:
3535                 return;
3536         }
3537
3538         /* verify that we have all ip addresses we should have and we dont
3539          * have addresses we shouldnt have.
3540          */ 
3541         if (ctdb->tunable.disable_ip_failover == 0 &&
3542             !ctdb_op_is_disabled(rec->takeover_run)) {
3543                 if (verify_local_ip_allocation(ctdb, rec, pnn, nodemap) != 0) {
3544                         DEBUG(DEBUG_ERR, (__location__ " Public IPs were inconsistent.\n"));
3545                 }
3546         }
3547
3548
3549         /* if we are not the recmaster then we do not need to check
3550            if recovery is needed
3551          */
3552         if (pnn != rec->recmaster) {
3553                 return;
3554         }
3555
3556
3557         /* ensure our local copies of flags are right */
3558         ret = update_local_flags(rec, nodemap);
3559         if (ret == MONITOR_ELECTION_NEEDED) {
3560                 DEBUG(DEBUG_NOTICE,("update_local_flags() called for a re-election.\n"));
3561                 force_election(rec, pnn, nodemap);
3562                 return;
3563         }
3564         if (ret != MONITOR_OK) {
3565                 DEBUG(DEBUG_ERR,("Unable to update local flags\n"));
3566                 return;
3567         }
3568
3569         if (ctdb->num_nodes != nodemap->num) {
3570                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) reloading nodes file\n", ctdb->num_nodes, nodemap->num));
3571                 ctdb_load_nodes_file(ctdb);
3572                 return;
3573         }
3574
3575         /* verify that all active nodes agree that we are the recmaster */
3576         switch (verify_recmaster(rec, nodemap, pnn)) {
3577         case MONITOR_RECOVERY_NEEDED:
3578                 /* can not happen */
3579                 return;
3580         case MONITOR_ELECTION_NEEDED:
3581                 force_election(rec, pnn, nodemap);
3582                 return;
3583         case MONITOR_OK:
3584                 break;
3585         case MONITOR_FAILED:
3586                 return;
3587         }
3588
3589
3590         /* get the vnnmap */
3591         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
3592         if (ret != 0) {
3593                 DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from node %u\n", pnn));
3594                 return;
3595         }
3596
3597         if (rec->need_recovery) {
3598                 /* a previous recovery didn't finish */
3599                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3600                 return;
3601         }
3602
3603         /* verify that all active nodes are in normal mode 
3604            and not in recovery mode 
3605         */
3606         switch (verify_recmode(ctdb, nodemap)) {
3607         case MONITOR_RECOVERY_NEEDED:
3608                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3609                 return;
3610         case MONITOR_FAILED:
3611                 return;
3612         case MONITOR_ELECTION_NEEDED:
3613                 /* can not happen */
3614         case MONITOR_OK:
3615                 break;
3616         }
3617
3618
3619         if (ctdb->recovery_lock_file != NULL) {
3620                 /* We must already hold the recovery lock */
3621                 if (!ctdb_recovery_have_lock(ctdb)) {
3622                         DEBUG(DEBUG_ERR,("Failed recovery lock sanity check.  Force a recovery\n"));
3623                         ctdb_set_culprit(rec, ctdb->pnn);
3624                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3625                         return;
3626                 }
3627         }
3628
3629
3630         /* if there are takeovers requested, perform it and notify the waiters */
3631         if (!ctdb_op_is_disabled(rec->takeover_run) &&
3632             rec->reallocate_requests) {
3633                 process_ipreallocate_requests(ctdb, rec);
3634         }
3635
3636         /* If recoveries are disabled then there is no use doing any
3637          * nodemap or flags checks.  Recoveries might be disabled due
3638          * to "reloadnodes", so doing these checks might cause an
3639          * unnecessary recovery.  */
3640         if (ctdb_op_is_disabled(rec->recovery)) {
3641                 return;
3642         }
3643
3644         /* get the nodemap for all active remote nodes
3645          */
3646         remote_nodemaps = talloc_array(mem_ctx, struct ctdb_node_map_old *, nodemap->num);
3647         if (remote_nodemaps == NULL) {
3648                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate remote nodemap array\n"));
3649                 return;
3650         }
3651         for(i=0; i<nodemap->num; i++) {
3652                 remote_nodemaps[i] = NULL;
3653         }
3654         if (get_remote_nodemaps(ctdb, mem_ctx, nodemap, remote_nodemaps) != 0) {
3655                 DEBUG(DEBUG_ERR,(__location__ " Failed to read remote nodemaps\n"));
3656                 return;
3657         } 
3658
3659         /* verify that all other nodes have the same nodemap as we have
3660         */
3661         for (j=0; j<nodemap->num; j++) {
3662                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3663                         continue;
3664                 }
3665
3666                 if (remote_nodemaps[j] == NULL) {
3667                         DEBUG(DEBUG_ERR,(__location__ " Did not get a remote nodemap for node %d, restarting monitoring\n", j));
3668                         ctdb_set_culprit(rec, j);
3669
3670                         return;
3671                 }
3672
3673                 /* if the nodes disagree on how many nodes there are
3674                    then this is a good reason to try recovery
3675                  */
3676                 if (remote_nodemaps[j]->num != nodemap->num) {
3677                         DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
3678                                   nodemap->nodes[j].pnn, remote_nodemaps[j]->num, nodemap->num));
3679                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3680                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3681                         return;
3682                 }
3683
3684                 /* if the nodes disagree on which nodes exist and are
3685                    active, then that is also a good reason to do recovery
3686                  */
3687                 for (i=0;i<nodemap->num;i++) {
3688                         if (remote_nodemaps[j]->nodes[i].pnn != nodemap->nodes[i].pnn) {
3689                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
3690                                           nodemap->nodes[j].pnn, i, 
3691                                           remote_nodemaps[j]->nodes[i].pnn, nodemap->nodes[i].pnn));
3692                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3693                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3694                                             vnnmap);
3695                                 return;
3696                         }
3697                 }
3698         }
3699
3700         /*
3701          * Update node flags obtained from each active node. This ensure we have
3702          * up-to-date information for all the nodes.
3703          */
3704         for (j=0; j<nodemap->num; j++) {
3705                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3706                         continue;
3707                 }
3708                 nodemap->nodes[j].flags = remote_nodemaps[j]->nodes[j].flags;
3709         }
3710
3711         for (j=0; j<nodemap->num; j++) {
3712                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3713                         continue;
3714                 }
3715
3716                 /* verify the flags are consistent
3717                 */
3718                 for (i=0; i<nodemap->num; i++) {
3719                         if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3720                                 continue;
3721                         }
3722                         
3723                         if (nodemap->nodes[i].flags != remote_nodemaps[j]->nodes[i].flags) {
3724                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different flags for node %u. It has 0x%02x vs our 0x%02x\n", 
3725                                   nodemap->nodes[j].pnn, 
3726                                   nodemap->nodes[i].pnn, 
3727                                   remote_nodemaps[j]->nodes[i].flags,
3728                                   nodemap->nodes[i].flags));
3729                                 if (i == j) {
3730                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from remote node %d for cluster update of its own flags\n", remote_nodemaps[j]->nodes[i].flags, j));
3731                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, remote_nodemaps[j]->nodes[i].flags);
3732                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3733                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3734                                                     vnnmap);
3735                                         return;
3736                                 } else {
3737                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from local recmaster node for cluster update of node %d flags\n", nodemap->nodes[i].flags, i));
3738                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, nodemap->nodes[i].flags);
3739                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3740                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3741                                                     vnnmap);
3742                                         return;
3743                                 }
3744                         }
3745                 }
3746         }
3747
3748
3749         /* count how many active nodes there are */
3750         num_lmasters  = 0;
3751         for (i=0; i<nodemap->num; i++) {
3752                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
3753                         if (ctdb_node_has_capabilities(rec->caps,
3754                                                        ctdb->nodes[i]->pnn,
3755                                                        CTDB_CAP_LMASTER)) {
3756                                 num_lmasters++;
3757                         }
3758                 }
3759         }
3760
3761
3762         /* There must be the same number of lmasters in the vnn map as
3763          * there are active nodes with the lmaster capability...  or
3764          * do a recovery.
3765          */
3766         if (vnnmap->size != num_lmasters) {
3767                 DEBUG(DEBUG_ERR, (__location__ " The vnnmap count is different from the number of active lmaster nodes: %u vs %u\n",
3768                           vnnmap->size, num_lmasters));
3769                 ctdb_set_culprit(rec, ctdb->pnn);
3770                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3771                 return;
3772         }
3773
3774         /* verify that all active nodes in the nodemap also exist in 
3775            the vnnmap.
3776          */
3777         for (j=0; j<nodemap->num; j++) {
3778                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3779                         continue;
3780                 }
3781                 if (nodemap->nodes[j].pnn == pnn) {
3782                         continue;
3783                 }
3784
3785                 for (i=0; i<vnnmap->size; i++) {
3786                         if (vnnmap->map[i] == nodemap->nodes[j].pnn) {
3787                                 break;
3788                         }
3789                 }
3790                 if (i == vnnmap->size) {
3791                         DEBUG(DEBUG_ERR, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", 
3792                                   nodemap->nodes[j].pnn));
3793                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3794                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3795                         return;
3796                 }
3797         }
3798
3799         
3800         /* verify that all other nodes have the same vnnmap
3801            and are from the same generation
3802          */
3803         for (j=0; j<nodemap->num; j++) {
3804                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3805                         continue;
3806                 }
3807                 if (nodemap->nodes[j].pnn == pnn) {
3808                         continue;
3809                 }
3810
3811                 ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3812                                           mem_ctx, &remote_vnnmap);
3813                 if (ret != 0) {
3814                         DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from remote node %u\n", 
3815                                   nodemap->nodes[j].pnn));
3816                         return;
3817                 }
3818
3819                 /* verify the vnnmap generation is the same */
3820                 if (vnnmap->generation != remote_vnnmap->generation) {
3821                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
3822                                   nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
3823                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3824                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3825                         return;
3826                 }
3827
3828                 /* verify the vnnmap size is the same */
3829                 if (vnnmap->size != remote_vnnmap->size) {
3830                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
3831                                   nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
3832                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3833                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3834                         return;
3835                 }
3836
3837                 /* verify the vnnmap is the same */
3838                 for (i=0;i<vnnmap->size;i++) {
3839                         if (remote_vnnmap->map[i] != vnnmap->map[i]) {
3840                                 DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different vnnmap.\n", 
3841                                           nodemap->nodes[j].pnn));
3842                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3843                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3844                                             vnnmap);
3845                                 return;
3846                         }
3847                 }
3848         }
3849
3850         /* we might need to change who has what IP assigned */
3851         if (rec->need_takeover_run) {
3852                 /* If takeover run fails, then the offending nodes are
3853                  * assigned ban culprit counts. And we re-try takeover.
3854                  * If takeover run fails repeatedly, the node would get
3855                  * banned.
3856                  */
3857                 do_takeover_run(rec, nodemap, true);
3858         }
3859 }
3860
3861 /*
3862   the main monitoring loop
3863  */
3864 static void monitor_cluster(struct ctdb_context *ctdb)
3865 {
3866         struct ctdb_recoverd *rec;
3867
3868         DEBUG(DEBUG_NOTICE,("monitor_cluster starting\n"));
3869
3870         rec = talloc_zero(ctdb, struct ctdb_recoverd);
3871         CTDB_NO_MEMORY_FATAL(ctdb, rec);
3872
3873         rec->ctdb = ctdb;
3874         rec->recmaster = CTDB_UNKNOWN_PNN;
3875
3876         rec->takeover_run = ctdb_op_init(rec, "takeover runs");
3877         CTDB_NO_MEMORY_FATAL(ctdb, rec->takeover_run);
3878
3879         rec->recovery = ctdb_op_init(rec, "recoveries");
3880         CTDB_NO_MEMORY_FATAL(ctdb, rec->recovery);
3881
3882         rec->priority_time = timeval_current();
3883
3884         /* register a message port for sending memory dumps */
3885         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_MEM_DUMP, mem_dump_handler, rec);
3886
3887         /* register a message port for recovery elections */
3888         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_ELECTION, election_handler, rec);
3889
3890         /* when nodes are disabled/enabled */
3891         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_SET_NODE_FLAGS, monitor_handler, rec);
3892
3893         /* when we are asked to puch out a flag change */
3894         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_PUSH_NODE_FLAGS, push_flags_handler, rec);
3895
3896         /* register a message port for vacuum fetch */
3897         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_VACUUM_FETCH, vacuum_fetch_handler, rec);
3898
3899         /* register a message port for reloadnodes  */
3900         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RELOAD_NODES, reload_nodes_handler, rec);
3901
3902         /* register a message port for performing a takeover run */
3903         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_TAKEOVER_RUN, ip_reallocate_handler, rec);
3904
3905         /* register a message port for disabling the ip check for a short while */
3906         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_DISABLE_IP_CHECK, disable_ip_check_handler, rec);
3907
3908         /* register a message port for updating the recovery daemons node assignment for an ip */
3909         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECD_UPDATE_IP, recd_update_ip_handler, rec);
3910
3911         /* register a message port for forcing a rebalance of a node next
3912            reallocation */
3913         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_REBALANCE_NODE, recd_node_rebalance_handler, rec);
3914
3915         /* Register a message port for disabling takeover runs */
3916         ctdb_client_set_message_handler(ctdb,
3917                                         CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
3918                                         disable_takeover_runs_handler, rec);
3919
3920         /* Register a message port for disabling recoveries */
3921         ctdb_client_set_message_handler(ctdb,
3922                                         CTDB_SRVID_DISABLE_RECOVERIES,
3923                                         disable_recoveries_handler, rec);
3924
3925         /* register a message port for detaching database */
3926         ctdb_client_set_message_handler(ctdb,
3927                                         CTDB_SRVID_DETACH_DATABASE,
3928                                         detach_database_handler, rec);
3929
3930         for (;;) {
3931                 TALLOC_CTX *mem_ctx = talloc_new(ctdb);
3932                 struct timeval start;
3933                 double elapsed;
3934
3935                 if (!mem_ctx) {
3936                         DEBUG(DEBUG_CRIT,(__location__
3937                                           " Failed to create temp context\n"));
3938                         exit(-1);
3939                 }
3940
3941                 start = timeval_current();
3942                 main_loop(ctdb, rec, mem_ctx);
3943                 talloc_free(mem_ctx);
3944
3945                 /* we only check for recovery once every second */
3946                 elapsed = timeval_elapsed(&start);
3947                 if (elapsed < ctdb->tunable.recover_interval) {
3948                         ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval
3949                                           - elapsed);
3950                 }
3951         }
3952 }
3953
3954 /*
3955   event handler for when the main ctdbd dies
3956  */
3957 static void ctdb_recoverd_parent(struct tevent_context *ev,
3958                                  struct tevent_fd *fde,
3959                                  uint16_t flags, void *private_data)
3960 {
3961         DEBUG(DEBUG_ALERT,("recovery daemon parent died - exiting\n"));
3962         _exit(1);
3963 }
3964
3965 /*
3966   called regularly to verify that the recovery daemon is still running
3967  */
3968 static void ctdb_check_recd(struct tevent_context *ev,
3969                             struct tevent_timer *te,
3970                             struct timeval yt, void *p)
3971 {
3972         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
3973
3974         if (ctdb_kill(ctdb, ctdb->recoverd_pid, 0) != 0) {
3975                 DEBUG(DEBUG_ERR,("Recovery daemon (pid:%d) is no longer running. Trying to restart recovery daemon.\n", (int)ctdb->recoverd_pid));
3976
3977                 tevent_add_timer(ctdb->ev, ctdb, timeval_zero(),
3978                                  ctdb_restart_recd, ctdb);
3979
3980                 return;
3981         }
3982
3983         tevent_add_timer(ctdb->ev, ctdb->recd_ctx,
3984                          timeval_current_ofs(30, 0),
3985                          ctdb_check_recd, ctdb);
3986 }
3987
3988 static void recd_sig_child_handler(struct tevent_context *ev,
3989                                    struct tevent_signal *se, int signum,
3990                                    int count, void *dont_care,
3991                                    void *private_data)
3992 {
3993 //      struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
3994         int status;
3995         pid_t pid = -1;
3996
3997         while (pid != 0) {
3998                 pid = waitpid(-1, &status, WNOHANG);
3999                 if (pid == -1) {
4000                         if (errno != ECHILD) {
4001                                 DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%s(%d)\n", strerror(errno),errno));
4002                         }
4003                         return;
4004                 }
4005                 if (pid > 0) {
4006                         DEBUG(DEBUG_DEBUG, ("RECD SIGCHLD from %d\n", (int)pid));
4007                 }
4008         }
4009 }
4010
4011 /*
4012   startup the recovery daemon as a child of the main ctdb daemon
4013  */
4014 int ctdb_start_recoverd(struct ctdb_context *ctdb)
4015 {
4016         int fd[2];
4017         struct tevent_signal *se;
4018         struct tevent_fd *fde;
4019
4020         if (pipe(fd) != 0) {
4021                 return -1;
4022         }
4023
4024         ctdb->recoverd_pid = ctdb_fork(ctdb);
4025         if (ctdb->recoverd_pid == -1) {
4026                 return -1;
4027         }
4028
4029         if (ctdb->recoverd_pid != 0) {
4030                 talloc_free(ctdb->recd_ctx);
4031                 ctdb->recd_ctx = talloc_new(ctdb);
4032                 CTDB_NO_MEMORY(ctdb, ctdb->recd_ctx);
4033
4034                 close(fd[0]);
4035                 tevent_add_timer(ctdb->ev, ctdb->recd_ctx,
4036                                  timeval_current_ofs(30, 0),
4037                                  ctdb_check_recd, ctdb);
4038                 return 0;
4039         }
4040
4041         close(fd[1]);
4042
4043         srandom(getpid() ^ time(NULL));
4044
4045         prctl_set_comment("ctdb_recovered");
4046         if (switch_from_server_to_client(ctdb, "recoverd") != 0) {
4047                 DEBUG(DEBUG_CRIT, (__location__ "ERROR: failed to switch recovery daemon into client mode. shutting down.\n"));
4048                 exit(1);
4049         }
4050
4051         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to recovery daemon\n", fd[0]));
4052
4053         fde = tevent_add_fd(ctdb->ev, ctdb, fd[0], TEVENT_FD_READ,
4054                             ctdb_recoverd_parent, &fd[0]);
4055         tevent_fd_set_auto_close(fde);
4056
4057         /* set up a handler to pick up sigchld */
4058         se = tevent_add_signal(ctdb->ev, ctdb, SIGCHLD, 0,
4059                                recd_sig_child_handler, ctdb);
4060         if (se == NULL) {
4061                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD in recovery daemon\n"));
4062                 exit(1);
4063         }
4064
4065         monitor_cluster(ctdb);
4066
4067         DEBUG(DEBUG_ALERT,("ERROR: ctdb_recoverd finished!?\n"));
4068         return -1;
4069 }
4070
4071 /*
4072   shutdown the recovery daemon
4073  */
4074 void ctdb_stop_recoverd(struct ctdb_context *ctdb)
4075 {
4076         if (ctdb->recoverd_pid == 0) {
4077                 return;
4078         }
4079
4080         DEBUG(DEBUG_NOTICE,("Shutting down recovery daemon\n"));
4081         ctdb_kill(ctdb, ctdb->recoverd_pid, SIGTERM);
4082
4083         TALLOC_FREE(ctdb->recd_ctx);
4084         TALLOC_FREE(ctdb->recd_ping_count);
4085 }
4086
4087 static void ctdb_restart_recd(struct tevent_context *ev,
4088                               struct tevent_timer *te,
4089                               struct timeval t, void *private_data)
4090 {
4091         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
4092
4093         DEBUG(DEBUG_ERR,("Restarting recovery daemon\n"));
4094         ctdb_stop_recoverd(ctdb);
4095         ctdb_start_recoverd(ctdb);
4096 }