1 /* 
2    ctdb recovery daemon
3
4    Copyright (C) Ronnie Sahlberg  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "replace.h"
21 #include "system/filesys.h"
22 #include "system/time.h"
23 #include "system/network.h"
24 #include "system/wait.h"
25
26 #include <popt.h>
27 #include <talloc.h>
28 #include <tevent.h>
29 #include <tdb.h>
30
31 #include "lib/tdb_wrap/tdb_wrap.h"
32 #include "lib/util/dlinklist.h"
33 #include "lib/util/debug.h"
34 #include "lib/util/samba_util.h"
35
36 #include "ctdb_private.h"
37 #include "ctdb_client.h"
38
39 #include "common/system.h"
40 #include "common/cmdline.h"
41 #include "common/common.h"
42 #include "common/logging.h"
43
44
45 /* List of SRVID requests that need to be processed */
46 struct srvid_list {
47         struct srvid_list *next, *prev;
48         struct ctdb_srvid_message *request;
49 };
50
51 struct srvid_requests {
52         struct srvid_list *requests;
53 };
54
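/* Send a reply to a queued SRVID request and then free it.  A request
 * sent with srvid == 0 does not expect a reply, so it is just freed. */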
55 static void srvid_request_reply(struct ctdb_context *ctdb,
56                                 struct ctdb_srvid_message *request,
57                                 TDB_DATA result)
58 {
59         /* Someone that sent srvid==0 does not want a reply */
60         if (request->srvid == 0) {
61                 talloc_free(request);
62                 return;
63         }
64
65         if (ctdb_client_send_message(ctdb, request->pnn, request->srvid,
66                                      result) == 0) {
67                 DEBUG(DEBUG_INFO,("Sent SRVID reply to %u:%llu\n",
68                                   (unsigned)request->pnn,
69                                   (unsigned long long)request->srvid));
70         } else {
71                 DEBUG(DEBUG_ERR,("Failed to send SRVID reply to %u:%llu\n",
72                                  (unsigned)request->pnn,
73                                  (unsigned long long)request->srvid));
74         }
75
76         talloc_free(request);
77 }
78
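/* Reply to every queued SRVID request with the same result, then free
 * the whole request list. */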
79 static void srvid_requests_reply(struct ctdb_context *ctdb,
80                                  struct srvid_requests **requests,
81                                  TDB_DATA result)
82 {
83         struct srvid_list *r;
84
85         for (r = (*requests)->requests; r != NULL; r = r->next) {
86                 srvid_request_reply(ctdb, r->request, result);
87         }
88
89         /* Free the list structure... */
90         TALLOC_FREE(*requests);
91 }
92
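/* Queue an SRVID request so it can be answered later.  On allocation
 * failure the request is answered immediately with -ENOMEM. */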
93 static void srvid_request_add(struct ctdb_context *ctdb,
94                               struct srvid_requests **requests,
95                               struct ctdb_srvid_message *request)
96 {
97         struct srvid_list *t;
98         int32_t ret;
99         TDB_DATA result;
100
101         if (*requests == NULL) {
102                 *requests = talloc_zero(ctdb, struct srvid_requests);
103                 if (*requests == NULL) {
104                         goto nomem;
105                 }
106         }
107
108         t = talloc_zero(*requests, struct srvid_list);
109         if (t == NULL) {
110                 /* If *requests was just allocated above then free it */
111                 if ((*requests)->requests == NULL) {
112                         TALLOC_FREE(*requests);
113                 }
114                 goto nomem;
115         }
116
117         t->request = (struct ctdb_srvid_message *)talloc_steal(t, request);
118         DLIST_ADD((*requests)->requests, t);
119
120         return;
121
122 nomem:
123         /* Failed to add the request to the list.  Send a fail. */
124         DEBUG(DEBUG_ERR, (__location__
125                           " Out of memory, failed to queue SRVID request\n"));
126         ret = -ENOMEM;
127         result.dsize = sizeof(ret);
128         result.dptr = (uint8_t *)&ret;
129         srvid_request_reply(ctdb, request, result);
130 }
131
132 /* An abstraction to allow an operation (takeover runs, recoveries,
133  * ...) to be disabled for a given timeout */
134 struct ctdb_op_state {
135         struct tevent_timer *timer;
136         bool in_progress;
137         const char *name;
138 };
139
140 static struct ctdb_op_state *ctdb_op_init(TALLOC_CTX *mem_ctx, const char *name)
141 {
142         struct ctdb_op_state *state = talloc_zero(mem_ctx, struct ctdb_op_state);
143
144         if (state != NULL) {
145                 state->in_progress = false;
146                 state->name = name;
147         }
148
149         return state;
150 }
151
152 static bool ctdb_op_is_disabled(struct ctdb_op_state *state)
153 {
154         return state->timer != NULL;
155 }
156
157 static bool ctdb_op_begin(struct ctdb_op_state *state)
158 {
159         if (ctdb_op_is_disabled(state)) {
160                 DEBUG(DEBUG_NOTICE,
161                       ("Unable to begin - %s are disabled\n", state->name));
162                 return false;
163         }
164
165         state->in_progress = true;
166         return true;
167 }
168
169 static bool ctdb_op_end(struct ctdb_op_state *state)
170 {
171         return state->in_progress = false;
172 }
173
174 static bool ctdb_op_is_in_progress(struct ctdb_op_state *state)
175 {
176         return state->in_progress;
177 }
178
179 static void ctdb_op_enable(struct ctdb_op_state *state)
180 {
181         TALLOC_FREE(state->timer);
182 }
183
184 static void ctdb_op_timeout_handler(struct tevent_context *ev,
185                                     struct tevent_timer *te,
186                                     struct timeval yt, void *p)
187 {
188         struct ctdb_op_state *state =
189                 talloc_get_type(p, struct ctdb_op_state);
190
191         DEBUG(DEBUG_NOTICE,("Reenabling %s after timeout\n", state->name));
192         ctdb_op_enable(state);
193 }
194
195 static int ctdb_op_disable(struct ctdb_op_state *state,
196                            struct tevent_context *ev,
197                            uint32_t timeout)
198 {
199         if (timeout == 0) {
200                 DEBUG(DEBUG_NOTICE,("Reenabling %s\n", state->name));
201                 ctdb_op_enable(state);
202                 return 0;
203         }
204
205         if (state->in_progress) {
206                 DEBUG(DEBUG_ERR,
207                       ("Unable to disable %s - in progress\n", state->name));
208                 return -EAGAIN;
209         }
210
211         DEBUG(DEBUG_NOTICE,("Disabling %s for %u seconds\n",
212                             state->name, timeout));
213
214         /* Clear any old timers */
215         talloc_free(state->timer);
216
217         /* Arrange for the timeout to occur */
218         state->timer = tevent_add_timer(ev, state,
219                                         timeval_current_ofs(timeout, 0),
220                                         ctdb_op_timeout_handler, state);
221         if (state->timer == NULL) {
222                 DEBUG(DEBUG_ERR,(__location__ " Unable to setup timer\n"));
223                 return -ENOMEM;
224         }
225
226         return 0;
227 }
228
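/* Per-node banning state: the number of culprit credits a node has
 * accumulated and when it last misbehaved. */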
229 struct ctdb_banning_state {
230         uint32_t count;
231         struct timeval last_reported_time;
232 };
233
234 /*
235   private state of recovery daemon
236  */
237 struct ctdb_recoverd {
238         struct ctdb_context *ctdb;
239         uint32_t recmaster;
240         uint32_t last_culprit_node;
241         struct ctdb_node_map_old *nodemap;
242         struct timeval priority_time;
243         bool need_takeover_run;
244         bool need_recovery;
245         uint32_t node_flags;
246         struct tevent_timer *send_election_te;
247         struct tevent_timer *election_timeout;
248         struct srvid_requests *reallocate_requests;
249         struct ctdb_op_state *takeover_run;
250         struct ctdb_op_state *recovery;
251         struct ctdb_iface_list_old *ifaces;
252         uint32_t *force_rebalance_nodes;
253         struct ctdb_node_capabilities *caps;
254 };
255
256 #define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
257 #define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)
258
259 static void ctdb_restart_recd(struct tevent_context *ev,
260                               struct tevent_timer *te, struct timeval t,
261                               void *private_data);
262
263 /*
264   ban a node for a period of time
265  */
266 static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
267 {
268         int ret;
269         struct ctdb_context *ctdb = rec->ctdb;
270         struct ctdb_ban_state bantime;
271
272         if (!ctdb_validate_pnn(ctdb, pnn)) {
273                 DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_ban_node\n", pnn));
274                 return;
275         }
276
277         DEBUG(DEBUG_NOTICE,("Banning node %u for %u seconds\n", pnn, ban_time));
278
279         bantime.pnn  = pnn;
280         bantime.time = ban_time;
281
282         ret = ctdb_ctrl_set_ban(ctdb, CONTROL_TIMEOUT(), pnn, &bantime);
283         if (ret != 0) {
284                 DEBUG(DEBUG_ERR,(__location__ " Failed to ban node %d\n", pnn));
285                 return;
286         }
287
288 }
289
290 enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_ELECTION_NEEDED, MONITOR_FAILED};
291
292
293 /*
294   remember the trouble maker
295  */
296 static void ctdb_set_culprit_count(struct ctdb_recoverd *rec, uint32_t culprit, uint32_t count)
297 {
298         struct ctdb_context *ctdb = talloc_get_type(rec->ctdb, struct ctdb_context);
299         struct ctdb_banning_state *ban_state;
300
301         if (culprit >= ctdb->num_nodes) {
302                 DEBUG(DEBUG_ERR,("Trying to set culprit %u but num_nodes is %u\n", culprit, ctdb->num_nodes));
303                 return;
304         }
305
306         /* If we are banned or stopped, do not set other nodes as culprits */
307         if (rec->node_flags & NODE_FLAGS_INACTIVE) {
308                 DEBUG(DEBUG_NOTICE, ("This node is INACTIVE, cannot set culprit node %d\n", culprit));
309                 return;
310         }
311
312         if (ctdb->nodes[culprit]->ban_state == NULL) {
313                 ctdb->nodes[culprit]->ban_state = talloc_zero(ctdb->nodes[culprit], struct ctdb_banning_state);
314                 CTDB_NO_MEMORY_VOID(ctdb, ctdb->nodes[culprit]->ban_state);
315
316                 
317         }
318         ban_state = ctdb->nodes[culprit]->ban_state;
319         if (timeval_elapsed(&ban_state->last_reported_time) > ctdb->tunable.recovery_grace_period) {
320                 /* this was the first time in a long while this node
321                    misbehaved so we will forgive any old transgressions.
322                 */
323                 ban_state->count = 0;
324         }
325
326         ban_state->count += count;
327         ban_state->last_reported_time = timeval_current();
328         rec->last_culprit_node = culprit;
329 }
330
331 /*
332   remember the trouble maker
333  */
334 static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
335 {
336         ctdb_set_culprit_count(rec, culprit, 1);
337 }
338
339
340 /* this callback is called for every node that failed to execute the
341    recovered event
342 */
343 static void recovered_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
344 {
345         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
346
347         DEBUG(DEBUG_ERR, (__location__ " Node %u failed the recovered event. Setting it as recovery fail culprit\n", node_pnn));
348
349         ctdb_set_culprit(rec, node_pnn);
350 }
351
352 /*
353   run the "recovered" eventscript on all nodes
354  */
355 static int run_recovered_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map_old *nodemap, const char *caller)
356 {
357         TALLOC_CTX *tmp_ctx;
358         uint32_t *nodes;
359         struct ctdb_context *ctdb = rec->ctdb;
360
361         tmp_ctx = talloc_new(ctdb);
362         CTDB_NO_MEMORY(ctdb, tmp_ctx);
363
364         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
365         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_END_RECOVERY,
366                                         nodes, 0,
367                                         CONTROL_TIMEOUT(), false, tdb_null,
368                                         NULL, recovered_fail_callback,
369                                         rec) != 0) {
370                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event when called from %s\n", caller));
371
372                 talloc_free(tmp_ctx);
373                 return -1;
374         }
375
376         talloc_free(tmp_ctx);
377         return 0;
378 }
379
380 /* this callback is called for every node that failed to execute the
381    start recovery event
382 */
383 static void startrecovery_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
384 {
385         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
386
387         DEBUG(DEBUG_ERR, (__location__ " Node %u failed the startrecovery event. Setting it as recovery fail culprit\n", node_pnn));
388
389         ctdb_set_culprit(rec, node_pnn);
390 }
391
392 /*
393   run the "startrecovery" eventscript on all nodes
394  */
395 static int run_startrecovery_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map_old *nodemap)
396 {
397         TALLOC_CTX *tmp_ctx;
398         uint32_t *nodes;
399         struct ctdb_context *ctdb = rec->ctdb;
400
401         tmp_ctx = talloc_new(ctdb);
402         CTDB_NO_MEMORY(ctdb, tmp_ctx);
403
404         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
405         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_START_RECOVERY,
406                                         nodes, 0,
407                                         CONTROL_TIMEOUT(), false, tdb_null,
408                                         NULL,
409                                         startrecovery_fail_callback,
410                                         rec) != 0) {
411                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event. Recovery failed.\n"));
412                 talloc_free(tmp_ctx);
413                 return -1;
414         }
415
416         talloc_free(tmp_ctx);
417         return 0;
418 }
419
420 /*
421   Retrieve capabilities from all connected nodes
422  */
423 static int update_capabilities(struct ctdb_recoverd *rec,
424                                struct ctdb_node_map_old *nodemap)
425 {
426         uint32_t *capp;
427         TALLOC_CTX *tmp_ctx;
428         struct ctdb_node_capabilities *caps;
429         struct ctdb_context *ctdb = rec->ctdb;
430
431         tmp_ctx = talloc_new(rec);
432         CTDB_NO_MEMORY(ctdb, tmp_ctx);
433
434         caps = ctdb_get_capabilities(ctdb, tmp_ctx,
435                                      CONTROL_TIMEOUT(), nodemap);
436
437         if (caps == NULL) {
438                 DEBUG(DEBUG_ERR,
439                       (__location__ " Failed to get node capabilities\n"));
440                 talloc_free(tmp_ctx);
441                 return -1;
442         }
443
444         capp = ctdb_get_node_capabilities(caps, ctdb_get_pnn(ctdb));
445         if (capp == NULL) {
446                 DEBUG(DEBUG_ERR,
447                       (__location__
448                        " Capabilities don't include current node.\n"));
449                 talloc_free(tmp_ctx);
450                 return -1;
451         }
452         ctdb->capabilities = *capp;
453
454         TALLOC_FREE(rec->caps);
455         rec->caps = talloc_steal(rec, caps);
456
457         talloc_free(tmp_ctx);
458         return 0;
459 }
460
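/* Called for each node that failed to freeze its databases during
 * recovery; charge it one culprit credit per node in the cluster. */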
461 static void set_recmode_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
462 {
463         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
464
465         DEBUG(DEBUG_ERR,("Failed to freeze node %u during recovery. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
466         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
467 }
468
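/* Called for each node that failed to start the recovery transaction;
 * charge it one culprit credit per node in the cluster. */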
469 static void transaction_start_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
470 {
471         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
472
473         DEBUG(DEBUG_ERR,("Failed to start recovery transaction on node %u. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
474         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
475 }
476
477 /*
478   change recovery mode on all nodes
479  */
480 static int set_recovery_mode(struct ctdb_context *ctdb,
481                              struct ctdb_recoverd *rec,
482                              struct ctdb_node_map_old *nodemap,
483                              uint32_t rec_mode, bool freeze)
484 {
485         TDB_DATA data;
486         uint32_t *nodes;
487         TALLOC_CTX *tmp_ctx;
488
489         tmp_ctx = talloc_new(ctdb);
490         CTDB_NO_MEMORY(ctdb, tmp_ctx);
491
492         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
493
494         data.dsize = sizeof(uint32_t);
495         data.dptr = (unsigned char *)&rec_mode;
496
497         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMODE,
498                                         nodes, 0,
499                                         CONTROL_TIMEOUT(),
500                                         false, data,
501                                         NULL, NULL,
502                                         NULL) != 0) {
503                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode. Recovery failed.\n"));
504                 talloc_free(tmp_ctx);
505                 return -1;
506         }
507
508         /* freeze all nodes */
509         if (freeze && rec_mode == CTDB_RECOVERY_ACTIVE) {
510                 int i;
511
512                 for (i=1; i<=NUM_DB_PRIORITIES; i++) {
513                         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
514                                                 nodes, i,
515                                                 CONTROL_TIMEOUT(),
516                                                 false, tdb_null,
517                                                 NULL,
518                                                 set_recmode_fail_callback,
519                                                 rec) != 0) {
520                                 DEBUG(DEBUG_ERR, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
521                                 talloc_free(tmp_ctx);
522                                 return -1;
523                         }
524                 }
525         }
526
527         talloc_free(tmp_ctx);
528         return 0;
529 }
530
531 /* update all remote nodes to use the same db priority that we have.
532    This can fail if the remote node has not yet been upgraded to
533    support this function, so we always return success and never fail
534    a recovery if this call fails.
535 */
536 static int update_db_priority_on_remote_nodes(struct ctdb_context *ctdb,
537         struct ctdb_node_map_old *nodemap, 
538         uint32_t pnn, struct ctdb_dbid_map_old *dbmap, TALLOC_CTX *mem_ctx)
539 {
540         int db;
541
542         /* step through all local databases */
543         for (db=0; db<dbmap->num;db++) {
544                 struct ctdb_db_priority db_prio;
545                 int ret;
546
547                 db_prio.db_id     = dbmap->dbs[db].db_id;
548                 ret = ctdb_ctrl_get_db_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, dbmap->dbs[db].db_id, &db_prio.priority);
549                 if (ret != 0) {
550                         DEBUG(DEBUG_ERR,(__location__ " Failed to read database priority from local node for db 0x%08x\n", dbmap->dbs[db].db_id));
551                         continue;
552                 }
553
554                 DEBUG(DEBUG_INFO,("Update DB priority for db 0x%08x to %u\n", dbmap->dbs[db].db_id, db_prio.priority)); 
555
556                 ret = ctdb_ctrl_set_db_priority(ctdb, CONTROL_TIMEOUT(),
557                                                 CTDB_CURRENT_NODE, &db_prio);
558                 if (ret != 0) {
559                         DEBUG(DEBUG_ERR,(__location__ " Failed to set DB priority for 0x%08x\n",
560                                          db_prio.db_id));
561                 }
562         }
563
564         return 0;
565 }                       
566
567 /*
568   ensure all other nodes have attached to any databases that we have
569  */
570 static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map_old *nodemap, 
571                                            uint32_t pnn, struct ctdb_dbid_map_old *dbmap, TALLOC_CTX *mem_ctx)
572 {
573         int i, j, db, ret;
574         struct ctdb_dbid_map_old *remote_dbmap;
575
576         /* verify that all other nodes have all our databases */
577         for (j=0; j<nodemap->num; j++) {
578                 /* we don't need to check ourselves */
579                 if (nodemap->nodes[j].pnn == pnn) {
580                         continue;
581                 }
582                 /* don't check nodes that are unavailable */
583                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
584                         continue;
585                 }
586
587                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
588                                          mem_ctx, &remote_dbmap);
589                 if (ret != 0) {
590                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", nodemap->nodes[j].pnn));
591                         return -1;
592                 }
593
594                 /* step through all local databases */
595                 for (db=0; db<dbmap->num;db++) {
596                         const char *name;
597
598
599                         for (i=0;i<remote_dbmap->num;i++) {
600                                 if (dbmap->dbs[db].db_id == remote_dbmap->dbs[i].db_id) {
601                                         break;
602                                 }
603                         }
604                         /* the remote node already has this database */
605                         if (i!=remote_dbmap->num) {
606                                 continue;
607                         }
608                         /* ok so we need to create this database */
609                         ret = ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn,
610                                                   dbmap->dbs[db].db_id, mem_ctx,
611                                                   &name);
612                         if (ret != 0) {
613                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", pnn));
614                                 return -1;
615                         }
616                         ret = ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(),
617                                                  nodemap->nodes[j].pnn,
618                                                  mem_ctx, name,
619                                                  dbmap->dbs[db].flags & CTDB_DB_FLAGS_PERSISTENT);
620                         if (ret != 0) {
621                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create remote db:%s\n", name));
622                                 return -1;
623                         }
624                 }
625         }
626
627         return 0;
628 }
629
630
631 /*
632   ensure we are attached to any databases that anyone else is attached to
633  */
634 static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map_old *nodemap, 
635                                           uint32_t pnn, struct ctdb_dbid_map_old **dbmap, TALLOC_CTX *mem_ctx)
636 {
637         int i, j, db, ret;
638         struct ctdb_dbid_map_old *remote_dbmap;
639
640         /* verify that we have all databases any other node has */
641         for (j=0; j<nodemap->num; j++) {
642                 /* we don't need to check ourselves */
643                 if (nodemap->nodes[j].pnn == pnn) {
644                         continue;
645                 }
646                 /* don't check nodes that are unavailable */
647                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
648                         continue;
649                 }
650
651                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
652                                          mem_ctx, &remote_dbmap);
653                 if (ret != 0) {
654                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", nodemap->nodes[j].pnn));
655                         return -1;
656                 }
657
658                 /* step through all databases on the remote node */
659                 for (db=0; db<remote_dbmap->num;db++) {
660                         const char *name;
661
662                         for (i=0;i<(*dbmap)->num;i++) {
663                                 if (remote_dbmap->dbs[db].db_id == (*dbmap)->dbs[i].db_id) {
664                                         break;
665                                 }
666                         }
667                         /* we already have this db locally */
668                         if (i!=(*dbmap)->num) {
669                                 continue;
670                         }
671                         /* ok so we need to create this database and
672                            rebuild dbmap
673                          */
674                         ret = ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn,
675                                                   remote_dbmap->dbs[db].db_id, mem_ctx, &name);
676                         if (ret != 0) {
677                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", 
678                                           nodemap->nodes[j].pnn));
679                                 return -1;
680                         }
681                         ret = ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name,
682                                                  remote_dbmap->dbs[db].flags & CTDB_DB_FLAGS_PERSISTENT);
683                         if (ret != 0) {
684                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create local db:%s\n", name));
685                                 return -1;
686                         }
687                         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
688                         if (ret != 0) {
689                                 DEBUG(DEBUG_ERR, (__location__ " Unable to reread dbmap on node %u\n", pnn));
690                                 return -1;
691                         }
692                 }
693         }
694
695         return 0;
696 }
697
698
699 /*
700   pull the remote database contents from one node into the recdb
701  */
702 static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode,
703                                     struct tdb_wrap *recdb, uint32_t dbid)
704 {
705         int ret;
706         TDB_DATA outdata;
707         struct ctdb_marshall_buffer *reply;
708         struct ctdb_rec_data_old *recdata;
709         int i;
710         TALLOC_CTX *tmp_ctx = talloc_new(recdb);
711
712         ret = ctdb_ctrl_pulldb(ctdb, srcnode, dbid, CTDB_LMASTER_ANY, tmp_ctx,
713                                CONTROL_TIMEOUT(), &outdata);
714         if (ret != 0) {
715                 DEBUG(DEBUG_ERR,(__location__ " Unable to copy db from node %u\n", srcnode));
716                 talloc_free(tmp_ctx);
717                 return -1;
718         }
719
720         reply = (struct ctdb_marshall_buffer *)outdata.dptr;
721
722         if (outdata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
723                 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
724                 talloc_free(tmp_ctx);
725                 return -1;
726         }
727
728         recdata = (struct ctdb_rec_data_old *)&reply->data[0];
729
730         for (i=0;
731              i<reply->count;
732              recdata = (struct ctdb_rec_data_old *)(recdata->length + (uint8_t *)recdata), i++) {
733                 TDB_DATA key, data;
734                 struct ctdb_ltdb_header *hdr;
735                 TDB_DATA existing;
736
737                 key.dptr = &recdata->data[0];
738                 key.dsize = recdata->keylen;
739                 data.dptr = &recdata->data[key.dsize];
740                 data.dsize = recdata->datalen;
741
742                 hdr = (struct ctdb_ltdb_header *)data.dptr;
743
744                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
745                         DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
746                         talloc_free(tmp_ctx);
747                         return -1;
748                 }
749
750                 /* fetch the existing record, if any */
751                 existing = tdb_fetch(recdb->tdb, key);
752
753                 if (existing.dptr != NULL) {
754                         struct ctdb_ltdb_header header;
755                         if (existing.dsize < sizeof(struct ctdb_ltdb_header)) {
756                                 DEBUG(DEBUG_CRIT,(__location__ " Bad record size %u from node %u\n",
757                                          (unsigned)existing.dsize, srcnode));
758                                 free(existing.dptr);
759                                 talloc_free(tmp_ctx);
760                                 return -1;
761                         }
762                         header = *(struct ctdb_ltdb_header *)existing.dptr;
763                         free(existing.dptr);
764                         if (!(header.rsn < hdr->rsn ||
765                               (header.dmaster != ctdb_get_pnn(ctdb) &&
766                                header.rsn == hdr->rsn))) {
767                                 continue;
768                         }
769                 }
770
771                 if (tdb_store(recdb->tdb, key, data, TDB_REPLACE) != 0) {
772                         DEBUG(DEBUG_CRIT,(__location__ " Failed to store record\n"));
773                         talloc_free(tmp_ctx);
774                         return -1;
775                 }
776         }
777
778         talloc_free(tmp_ctx);
779
780         return 0;
781 }
782
783
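/* State shared by the GET_DB_SEQNUM callbacks below: the highest
 * sequence number seen so far and the node that reported it. */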
784 struct pull_seqnum_cbdata {
785         int failed;
786         uint32_t pnn;
787         uint64_t seqnum;
788 };
789
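/* GET_DB_SEQNUM reply handler: remember the node reporting the highest
 * sequence number seen so far. */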
790 static void pull_seqnum_cb(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
791 {
792         struct pull_seqnum_cbdata *cb_data = talloc_get_type(callback_data, struct pull_seqnum_cbdata);
793         uint64_t seqnum;
794
795         if (cb_data->failed != 0) {
796                 DEBUG(DEBUG_ERR, ("Got seqnum from node %d but we have already failed the entire operation\n", node_pnn));
797                 return;
798         }
799
800         if (res != 0) {
801                 DEBUG(DEBUG_ERR, ("Error when pulling seqnum from node %d\n", node_pnn));
802                 cb_data->failed = 1;
803                 return;
804         }
805
806         if (outdata.dsize != sizeof(uint64_t)) {
807                 DEBUG(DEBUG_ERR, ("Error when reading pull seqnum from node %d, got %d bytes but expected %d\n", node_pnn, (int)outdata.dsize, (int)sizeof(uint64_t)));
808                 cb_data->failed = 1;
809                 return;
810         }
811
812         seqnum = *((uint64_t *)outdata.dptr);
813
814         if (seqnum > cb_data->seqnum ||
815             (cb_data->pnn == -1 && seqnum == 0)) {
816                 cb_data->seqnum = seqnum;
817                 cb_data->pnn = node_pnn;
818         }
819 }
820
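/* GET_DB_SEQNUM failure handler: mark the whole scan as failed. */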
821 static void pull_seqnum_fail_cb(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
822 {
823         struct pull_seqnum_cbdata *cb_data = talloc_get_type(callback_data, struct pull_seqnum_cbdata);
824
825         DEBUG(DEBUG_ERR, ("Failed to pull db seqnum from node %d\n", node_pnn));
826         cb_data->failed = 1;
827 }
828
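/* For a persistent database, find the connected node holding the highest
 * sequence number and pull the complete database from that node only. */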
829 static int pull_highest_seqnum_pdb(struct ctdb_context *ctdb,
830                                 struct ctdb_recoverd *rec, 
831                                 struct ctdb_node_map_old *nodemap, 
832                                 struct tdb_wrap *recdb, uint32_t dbid)
833 {
834         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
835         uint32_t *nodes;
836         TDB_DATA data;
837         uint32_t outdata[2];
838         struct pull_seqnum_cbdata *cb_data;
839
840         DEBUG(DEBUG_NOTICE, ("Scan for highest seqnum pdb for db:0x%08x\n", dbid));
841
842         outdata[0] = dbid;
843         outdata[1] = 0;
844
845         data.dsize = sizeof(outdata);
846         data.dptr  = (uint8_t *)&outdata[0];
847
848         cb_data = talloc(tmp_ctx, struct pull_seqnum_cbdata);
849         if (cb_data == NULL) {
850                 DEBUG(DEBUG_ERR, ("Failed to allocate pull highest seqnum cb_data structure\n"));
851                 talloc_free(tmp_ctx);
852                 return -1;
853         }
854
855         cb_data->failed = 0;
856         cb_data->pnn    = -1;
857         cb_data->seqnum = 0;
858         
859         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
860         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_DB_SEQNUM,
861                                         nodes, 0,
862                                         CONTROL_TIMEOUT(), false, data,
863                                         pull_seqnum_cb,
864                                         pull_seqnum_fail_cb,
865                                         cb_data) != 0) {
866                 DEBUG(DEBUG_ERR, (__location__ " Failed to run async GET_DB_SEQNUM\n"));
867
868                 talloc_free(tmp_ctx);
869                 return -1;
870         }
871
872         if (cb_data->failed != 0) {
873                 DEBUG(DEBUG_NOTICE, ("Failed to pull sequence numbers for DB 0x%08x\n", dbid));
874                 talloc_free(tmp_ctx);
875                 return -1;
876         }
877
878         if (cb_data->pnn == -1) {
879                 DEBUG(DEBUG_NOTICE, ("Failed to find a node with highest sequence numbers for DB 0x%08x\n", dbid));
880                 talloc_free(tmp_ctx);
881                 return -1;
882         }
883
884         DEBUG(DEBUG_NOTICE, ("Pull persistent db:0x%08x from node %d with highest seqnum:%lld\n", dbid, cb_data->pnn, (long long)cb_data->seqnum)); 
885
886         if (pull_one_remote_database(ctdb, cb_data->pnn, recdb, dbid) != 0) {
887                 DEBUG(DEBUG_ERR, ("Failed to pull highest seqnum database 0x%08x from node %d\n", dbid, cb_data->pnn));
888                 talloc_free(tmp_ctx);
889                 return -1;
890         }
891
892         talloc_free(tmp_ctx);
893         return 0;
894 }
895
896
897 /*
898   pull all the remote database contents into the recdb
899  */
900 static int pull_remote_database(struct ctdb_context *ctdb,
901                                 struct ctdb_recoverd *rec, 
902                                 struct ctdb_node_map_old *nodemap, 
903                                 struct tdb_wrap *recdb, uint32_t dbid,
904                                 bool persistent)
905 {
906         int j;
907
908         if (persistent && ctdb->tunable.recover_pdb_by_seqnum != 0) {
909                 int ret;
910                 ret = pull_highest_seqnum_pdb(ctdb, rec, nodemap, recdb, dbid);
911                 if (ret == 0) {
912                         return 0;
913                 }
914         }
915
916         /* pull all records from all other nodes across onto this node
917            (this merges based on rsn)
918         */
919         for (j=0; j<nodemap->num; j++) {
920                 /* don't merge from nodes that are unavailable */
921                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
922                         continue;
923                 }
924                 if (pull_one_remote_database(ctdb, nodemap->nodes[j].pnn, recdb, dbid) != 0) {
925                         DEBUG(DEBUG_ERR,(__location__ " Failed to pull remote database from node %u\n", 
926                                  nodemap->nodes[j].pnn));
927                         ctdb_set_culprit_count(rec, nodemap->nodes[j].pnn, nodemap->num);
928                         return -1;
929                 }
930         }
931         
932         return 0;
933 }
934
935
936 /*
937   update flags on all active nodes
938  */
939 static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map_old *nodemap, uint32_t pnn, uint32_t flags)
940 {
941         int ret;
942
943         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, flags, ~flags);
944         if (ret != 0) {
945                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
946                 return -1;
947         }
948
949         return 0;
950 }
951
952 /*
953   ensure all nodes have the same vnnmap we do
954  */
955 static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map_old *nodemap, 
956                                       uint32_t pnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
957 {
958         int j, ret;
959
960         /* push the new vnn map out to all the nodes */
961         for (j=0; j<nodemap->num; j++) {
962                 /* don't push to nodes that are unavailable */
963                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
964                         continue;
965                 }
966
967                 ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, vnnmap);
968                 if (ret != 0) {
969                         DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", nodemap->nodes[j].pnn));
970                         return -1;
971                 }
972         }
973
974         return 0;
975 }
976
977
978 /*
979   called when a vacuum fetch has completed - just free it and do the next one
980  */
981 static void vacuum_fetch_callback(struct ctdb_client_call_state *state)
982 {
983         talloc_free(state);
984 }
985
986
987 /**
988  * Process one element of the vacuum fetch list:
989  * Migrate it over to us with the special flag
990  * CTDB_CALL_FLAG_VACUUM_MIGRATION.
991  */
992 static bool vacuum_fetch_process_one(struct ctdb_db_context *ctdb_db,
993                                      uint32_t pnn,
994                                      struct ctdb_rec_data_old *r)
995 {
996         struct ctdb_client_call_state *state;
997         TDB_DATA data;
998         struct ctdb_ltdb_header *hdr;
999         struct ctdb_call call;
1000
1001         ZERO_STRUCT(call);
1002         call.call_id = CTDB_NULL_FUNC;
1003         call.flags = CTDB_IMMEDIATE_MIGRATION;
1004         call.flags |= CTDB_CALL_FLAG_VACUUM_MIGRATION;
1005
1006         call.key.dptr = &r->data[0];
1007         call.key.dsize = r->keylen;
1008
1009         /* ensure we don't block this daemon - just skip a record if we can't get
1010            the chainlock */
1011         if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, call.key) != 0) {
1012                 return true;
1013         }
1014
1015         data = tdb_fetch(ctdb_db->ltdb->tdb, call.key);
1016         if (data.dptr == NULL) {
1017                 tdb_chainunlock(ctdb_db->ltdb->tdb, call.key);
1018                 return true;
1019         }
1020
1021         if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
1022                 free(data.dptr);
1023                 tdb_chainunlock(ctdb_db->ltdb->tdb, call.key);
1024                 return true;
1025         }
1026
1027         hdr = (struct ctdb_ltdb_header *)data.dptr;
1028         if (hdr->dmaster == pnn) {
1029                 /* it's already local */
1030                 free(data.dptr);
1031                 tdb_chainunlock(ctdb_db->ltdb->tdb, call.key);
1032                 return true;
1033         }
1034
1035         free(data.dptr);
1036
1037         state = ctdb_call_send(ctdb_db, &call);
1038         tdb_chainunlock(ctdb_db->ltdb->tdb, call.key);
1039         if (state == NULL) {
1040                 DEBUG(DEBUG_ERR,(__location__ " Failed to setup vacuum fetch call\n"));
1041                 return false;
1042         }
1043         state->async.fn = vacuum_fetch_callback;
1044         state->async.private_data = NULL;
1045
1046         return true;
1047 }
1048
1049
1050 /*
1051   handler for vacuum fetch
1052 */
1053 static void vacuum_fetch_handler(uint64_t srvid, TDB_DATA data,
1054                                  void *private_data)
1055 {
1056         struct ctdb_recoverd *rec = talloc_get_type(
1057                 private_data, struct ctdb_recoverd);
1058         struct ctdb_context *ctdb = rec->ctdb;
1059         struct ctdb_marshall_buffer *recs;
1060         int ret, i;
1061         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1062         const char *name;
1063         struct ctdb_dbid_map_old *dbmap=NULL;
1064         bool persistent = false;
1065         struct ctdb_db_context *ctdb_db;
1066         struct ctdb_rec_data_old *r;
1067
1068         recs = (struct ctdb_marshall_buffer *)data.dptr;
1069
1070         if (recs->count == 0) {
1071                 goto done;
1072         }
1073
1074         /* work out if the database is persistent */
1075         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &dbmap);
1076         if (ret != 0) {
1077                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from local node\n"));
1078                 goto done;
1079         }
1080
1081         for (i=0;i<dbmap->num;i++) {
1082                 if (dbmap->dbs[i].db_id == recs->db_id) {
1083                         persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
1084                         break;
1085                 }
1086         }
1087         if (i == dbmap->num) {
1088                 DEBUG(DEBUG_ERR, (__location__ " Unable to find db_id 0x%x on local node\n", recs->db_id));
1089                 goto done;
1090         }
1091
1092         /* find the name of this database */
1093         if (ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, recs->db_id, tmp_ctx, &name) != 0) {
1094                 DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", recs->db_id));
1095                 goto done;
1096         }
1097
1098         /* attach to it */
1099         ctdb_db = ctdb_attach(ctdb, CONTROL_TIMEOUT(), name, persistent, 0);
1100         if (ctdb_db == NULL) {
1101                 DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
1102                 goto done;
1103         }
1104
1105         r = (struct ctdb_rec_data_old *)&recs->data[0];
1106         while (recs->count) {
1107                 bool ok;
1108
1109                 ok = vacuum_fetch_process_one(ctdb_db, rec->ctdb->pnn, r);
1110                 if (!ok) {
1111                         break;
1112                 }
1113
1114                 r = (struct ctdb_rec_data_old *)(r->length + (uint8_t *)r);
1115                 recs->count--;
1116         }
1117
1118 done:
1119         talloc_free(tmp_ctx);
1120 }
1121
1122
1123 /*
1124  * handler for database detach
1125  */
1126 static void detach_database_handler(uint64_t srvid, TDB_DATA data,
1127                                     void *private_data)
1128 {
1129         struct ctdb_recoverd *rec = talloc_get_type(
1130                 private_data, struct ctdb_recoverd);
1131         struct ctdb_context *ctdb = rec->ctdb;
1132         uint32_t db_id;
1133         struct ctdb_db_context *ctdb_db;
1134
1135         if (data.dsize != sizeof(db_id)) {
1136                 return;
1137         }
1138         db_id = *(uint32_t *)data.dptr;
1139
1140         ctdb_db = find_ctdb_db(ctdb, db_id);
1141         if (ctdb_db == NULL) {
1142                 /* database is not attached */
1143                 return;
1144         }
1145
1146         DLIST_REMOVE(ctdb->db_list, ctdb_db);
1147
1148         DEBUG(DEBUG_NOTICE, ("Detached from database '%s'\n",
1149                              ctdb_db->db_name));
1150         talloc_free(ctdb_db);
1151 }
1152
1153 /*
1154   called when ctdb_wait_timeout should finish
1155  */
1156 static void ctdb_wait_handler(struct tevent_context *ev,
1157                               struct tevent_timer *te,
1158                               struct timeval yt, void *p)
1159 {
1160         uint32_t *timed_out = (uint32_t *)p;
1161         (*timed_out) = 1;
1162 }
1163
1164 /*
1165   wait for a given number of seconds
1166  */
1167 static void ctdb_wait_timeout(struct ctdb_context *ctdb, double secs)
1168 {
1169         uint32_t timed_out = 0;
1170         time_t usecs = (secs - (time_t)secs) * 1000000;
1171         tevent_add_timer(ctdb->ev, ctdb, timeval_current_ofs(secs, usecs),
1172                          ctdb_wait_handler, &timed_out);
1173         while (!timed_out) {
1174                 tevent_loop_once(ctdb->ev);
1175         }
1176 }
1177
1178 /*
1179   called when an election times out (ends)
1180  */
1181 static void ctdb_election_timeout(struct tevent_context *ev,
1182                                   struct tevent_timer *te,
1183                                   struct timeval t, void *p)
1184 {
1185         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1186         rec->election_timeout = NULL;
1187         fast_start = false;
1188
1189         DEBUG(DEBUG_WARNING,("Election period ended\n"));
1190 }
1191
1192
1193 /*
1194   wait for an election to finish. It finishes election_timeout seconds after
1195   the last election packet is received
1196  */
1197 static void ctdb_wait_election(struct ctdb_recoverd *rec)
1198 {
1199         struct ctdb_context *ctdb = rec->ctdb;
1200         while (rec->election_timeout) {
1201                 tevent_loop_once(ctdb->ev);
1202         }
1203 }
1204
1205 /*
1206   Update our local flags from all remote connected nodes.
1207   This is only run when we are, or believe we are, the recovery master
1208  */
1209 static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map_old *nodemap)
1210 {
1211         int j;
1212         struct ctdb_context *ctdb = rec->ctdb;
1213         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
1214
1215         /* get the nodemap for all active remote nodes and verify
1216            they are the same as for this node
1217          */
1218         for (j=0; j<nodemap->num; j++) {
1219                 struct ctdb_node_map_old *remote_nodemap=NULL;
1220                 int ret;
1221
1222                 if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
1223                         continue;
1224                 }
1225                 if (nodemap->nodes[j].pnn == ctdb->pnn) {
1226                         continue;
1227                 }
1228
1229                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
1230                                            mem_ctx, &remote_nodemap);
1231                 if (ret != 0) {
1232                         DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from remote node %u\n", 
1233                                   nodemap->nodes[j].pnn));
1234                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
1235                         talloc_free(mem_ctx);
1236                         return MONITOR_FAILED;
1237                 }
1238                 if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
1239                         /* We should tell our daemon about this so it
1240                            updates its flags or else we will log the same 
1241                            message again in the next iteration of recovery.
1242                            Since we are the recovery master we can just as
1243                            well update the flags on all nodes.
1244                         */
1245                         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags, ~remote_nodemap->nodes[j].flags);
1246                         if (ret != 0) {
1247                                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
1248                                 talloc_free(mem_ctx);
                                 return MONITOR_FAILED;
1249                         }
1250
1251                         /* Update our local copy of the flags in the recovery
1252                            daemon.
1253                         */
1254                         DEBUG(DEBUG_NOTICE,("Remote node %u had flags 0x%x, local had 0x%x - updating local\n",
1255                                  nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags,
1256                                  nodemap->nodes[j].flags));
1257                         nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
1258                 }
1259                 talloc_free(remote_nodemap);
1260         }
1261         talloc_free(mem_ctx);
1262         return MONITOR_OK;
1263 }
1264
1265
1266 /* Create a new random generation id.
1267    The generation id can not be the INVALID_GENERATION id
1268 */
1269 static uint32_t new_generation(void)
1270 {
1271         uint32_t generation;
1272
1273         while (1) {
1274                 generation = random();
1275
1276                 if (generation != INVALID_GENERATION) {
1277                         break;
1278                 }
1279         }
1280
1281         return generation;
1282 }
1283
1284
1285 /*
1286   create a temporary working database
1287  */
1288 static struct tdb_wrap *create_recdb(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx)
1289 {
1290         char *name;
1291         struct tdb_wrap *recdb;
1292         unsigned tdb_flags;
1293
1294         /* open up the temporary recovery database */
1295         name = talloc_asprintf(mem_ctx, "%s/recdb.tdb.%u",
1296                                ctdb->db_directory_state,
1297                                ctdb->pnn);
1298         if (name == NULL) {
1299                 return NULL;
1300         }
1301         unlink(name);
1302
1303         tdb_flags = TDB_NOLOCK;
1304         if (ctdb->valgrinding) {
1305                 tdb_flags |= TDB_NOMMAP;
1306         }
1307         tdb_flags |= (TDB_INCOMPATIBLE_HASH | TDB_DISALLOW_NESTING);
1308
1309         recdb = tdb_wrap_open(mem_ctx, name, ctdb->tunable.database_hash_size, 
1310                               tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
1311         if (recdb == NULL) {
1312                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temp recovery database '%s'\n", name));
1313         }
1314
1315         talloc_free(name);
1316
1317         return recdb;
1318 }
1319
1320
1321 /* 
1322    a traverse function for pulling all relevant records from recdb
1323  */
1324 struct recdb_data {
1325         struct ctdb_context *ctdb;
1326         struct ctdb_marshall_buffer *recdata;
1327         uint32_t len;
1328         uint32_t allocated_len;
1329         bool failed;
1330         bool persistent;
1331 };
1332
1333 static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
1334 {
1335         struct recdb_data *params = (struct recdb_data *)p;
1336         struct ctdb_rec_data_old *recdata;
1337         struct ctdb_ltdb_header *hdr;
1338
1339         /*
1340          * skip empty records - but NOT for persistent databases:
1341          *
1342          * The record-by-record mode of recovery deletes empty records.
1343          * For persistent databases, this can lead to data corruption
1344          * by deleting records that should be there:
1345          *
1346          * - Assume the cluster has been running for a while.
1347          *
1348          * - A record R in a persistent database has been created and
1349          *   deleted a couple of times, the last operation being deletion,
1350          *   leaving an empty record with a high RSN, say 10.
1351          *
1352          * - Now a node N is turned off.
1353          *
1354          * - This leaves the local database copy on N with the empty
1355          *   copy of R and RSN 10. On all other nodes, the recovery has deleted
1356          *   the copy of record R.
1357          *
1358          * - Now the record is created again while node N is turned off.
1359          *   This creates R with RSN = 1 on all nodes except for N.
1360          *
1361          * - Now node N is turned on again. The following recovery will choose
1362          *   the older empty copy of R due to RSN 10 > RSN 1.
1363          *
1364          * ==> Hence the record is gone after the recovery.
1365          *
1366          * On databases like Samba's registry, this can damage the higher-level
1367          * data structures built from the various tdb-level records.
1368          */
1369         if (!params->persistent && data.dsize <= sizeof(struct ctdb_ltdb_header)) {
1370                 return 0;
1371         }
1372
1373         /* update the dmaster field to point to us */
1374         hdr = (struct ctdb_ltdb_header *)data.dptr;
1375         if (!params->persistent) {
1376                 hdr->dmaster = params->ctdb->pnn;
1377                 hdr->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
1378         }
1379
1380         /* add the record to the blob ready to send to the nodes */
1381         recdata = ctdb_marshall_record(params->recdata, 0, key, NULL, data);
1382         if (recdata == NULL) {
1383                 params->failed = true;
1384                 return -1;
1385         }
1386         if (params->len + recdata->length >= params->allocated_len) {
1387                 params->allocated_len = recdata->length + params->len + params->ctdb->tunable.pulldb_preallocation_size;
1388                 params->recdata = talloc_realloc_size(NULL, params->recdata, params->allocated_len);
1389         }
1390         if (params->recdata == NULL) {
1391                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata to %u\n",
1392                          recdata->length + params->len));
1393                 params->failed = true;
1394                 return -1;
1395         }
1396         params->recdata->count++;
1397         memcpy(params->len+(uint8_t *)params->recdata, recdata, recdata->length);
1398         params->len += recdata->length;
1399         talloc_free(recdata);
1400
1401         return 0;
1402 }
1403
1404 /*
1405   push the recdb database out to all nodes
1406  */
1407 static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
1408                                bool persistent,
1409                                struct tdb_wrap *recdb, struct ctdb_node_map_old *nodemap)
1410 {
1411         struct recdb_data params;
1412         struct ctdb_marshall_buffer *recdata;
1413         TDB_DATA outdata;
1414         TALLOC_CTX *tmp_ctx;
1415         uint32_t *nodes;
1416
1417         tmp_ctx = talloc_new(ctdb);
1418         CTDB_NO_MEMORY(ctdb, tmp_ctx);
1419
1420         recdata = talloc_zero(recdb, struct ctdb_marshall_buffer);
1421         CTDB_NO_MEMORY(ctdb, recdata);
1422
1423         recdata->db_id = dbid;
1424
1425         params.ctdb = ctdb;
1426         params.recdata = recdata;
1427         params.len = offsetof(struct ctdb_marshall_buffer, data);
1428         params.allocated_len = params.len;
1429         params.failed = false;
1430         params.persistent = persistent;
1431
1432         if (tdb_traverse_read(recdb->tdb, traverse_recdb, &params) == -1) {
1433                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1434                 talloc_free(params.recdata);
1435                 talloc_free(tmp_ctx);
1436                 return -1;
1437         }
1438
1439         if (params.failed) {
1440                 DEBUG(DEBUG_ERR,(__location__ " Failed to marshall recdb records\n"));
1441                 talloc_free(params.recdata);
1442                 talloc_free(tmp_ctx);
1443                 return -1;
1444         }
1445
1446         recdata = params.recdata;
1447
1448         outdata.dptr = (void *)recdata;
1449         outdata.dsize = params.len;
1450
1451         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
1452         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
1453                                         nodes, 0,
1454                                         CONTROL_TIMEOUT(), false, outdata,
1455                                         NULL, NULL,
1456                                         NULL) != 0) {
1457                 DEBUG(DEBUG_ERR,(__location__ " Failed to push recdb records to nodes for db 0x%x\n", dbid));
1458                 talloc_free(recdata);
1459                 talloc_free(tmp_ctx);
1460                 return -1;
1461         }
1462
1463         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pushed remote database 0x%x (%u records)\n",
1464                   dbid, recdata->count));
1465
1466         talloc_free(recdata);
1467         talloc_free(tmp_ctx);
1468
1469         return 0;
1470 }
1471
1472
1473 /*
1474   go through a full recovery on one database 
1475  */
1476 static int recover_database(struct ctdb_recoverd *rec, 
1477                             TALLOC_CTX *mem_ctx,
1478                             uint32_t dbid,
1479                             bool persistent,
1480                             uint32_t pnn, 
1481                             struct ctdb_node_map_old *nodemap,
1482                             uint32_t transaction_id)
1483 {
1484         struct tdb_wrap *recdb;
1485         int ret;
1486         struct ctdb_context *ctdb = rec->ctdb;
1487         TDB_DATA data;
1488         struct ctdb_transdb w;
1489         uint32_t *nodes;
1490
1491         recdb = create_recdb(ctdb, mem_ctx);
1492         if (recdb == NULL) {
1493                 return -1;
1494         }
1495
1496         /* pull all remote databases onto the recdb */
1497         ret = pull_remote_database(ctdb, rec, nodemap, recdb, dbid, persistent);
1498         if (ret != 0) {
1499                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull remote database 0x%x\n", dbid));
1500                 return -1;
1501         }
1502
1503         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pulled remote database 0x%x\n", dbid));
1504
1505         /* wipe all the remote databases. This is safe as we are in a transaction */
1506         w.db_id = dbid;
1507         w.tid = transaction_id;
1508
1509         data.dptr = (void *)&w;
1510         data.dsize = sizeof(w);
1511
1512         nodes = list_of_active_nodes(ctdb, nodemap, recdb, true);
1513         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
1514                                         nodes, 0,
1515                                         CONTROL_TIMEOUT(), false, data,
1516                                         NULL, NULL,
1517                                         NULL) != 0) {
1518                 DEBUG(DEBUG_ERR, (__location__ " Unable to wipe database. Recovery failed.\n"));
1519                 talloc_free(recdb);
1520                 return -1;
1521         }
1522         
1523         /* push out the correct database. This sets the dmaster and skips 
1524            the empty records */
1525         ret = push_recdb_database(ctdb, dbid, persistent, recdb, nodemap);
1526         if (ret != 0) {
1527                 talloc_free(recdb);
1528                 return -1;
1529         }
1530
1531         /* all done with this database */
1532         talloc_free(recdb);
1533
1534         return 0;
1535 }
1536
1537 /* when we start a recovery, make sure all nodes use the same reclock file
1538    setting
1539 */
1540 static int sync_recovery_lock_file_across_cluster(struct ctdb_recoverd *rec)
1541 {
1542         struct ctdb_context *ctdb = rec->ctdb;
1543         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
1544         TDB_DATA data;
1545         uint32_t *nodes;
1546
1547         if (ctdb->recovery_lock_file == NULL) {
1548                 data.dptr  = NULL;
1549                 data.dsize = 0;
1550         } else {
1551                 data.dsize = strlen(ctdb->recovery_lock_file) + 1;
1552                 data.dptr  = (uint8_t *)ctdb->recovery_lock_file;
1553         }
1554
1555         nodes = list_of_active_nodes(ctdb, rec->nodemap, tmp_ctx, true);
1556         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECLOCK_FILE,
1557                                         nodes, 0,
1558                                         CONTROL_TIMEOUT(),
1559                                         false, data,
1560                                         NULL, NULL,
1561                                         rec) != 0) {
1562                 DEBUG(DEBUG_ERR, (__location__ " Failed to sync reclock file settings\n"));
1563                 talloc_free(tmp_ctx);
1564                 return -1;
1565         }
1566
1567         talloc_free(tmp_ctx);
1568         return 0;
1569 }
1570
1571
1572 /*
1573  * this callback is called for every node that failed to execute ctdb_takeover_run();
1574  * when banning credits are enabled it records that node as the culprit.
1575  */
1576 static void takeover_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
1577 {
1578         DEBUG(DEBUG_ERR, ("Node %u failed the takeover run\n", node_pnn));
1579
1580         if (callback_data != NULL) {
1581                 struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
1582
1583                 DEBUG(DEBUG_ERR, ("Setting node %u as recovery fail culprit\n", node_pnn));
1584
1585                 ctdb_set_culprit(rec, node_pnn);
1586         }
1587 }
1588
1589
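/*
  ban any node that has collected too many banning credits
  (2 * num_nodes or more) and report via *self_ban whether we ended up
  banning ourselves
 */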
1590 static void ban_misbehaving_nodes(struct ctdb_recoverd *rec, bool *self_ban)
1591 {
1592         struct ctdb_context *ctdb = rec->ctdb;
1593         int i;
1594         struct ctdb_banning_state *ban_state;
1595
1596         *self_ban = false;
1597         for (i=0; i<ctdb->num_nodes; i++) {
1598                 if (ctdb->nodes[i]->ban_state == NULL) {
1599                         continue;
1600                 }
1601                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
1602                 if (ban_state->count < 2*ctdb->num_nodes) {
1603                         continue;
1604                 }
1605
1606                 DEBUG(DEBUG_NOTICE,("Node %u reached %u banning credits - banning it for %u seconds\n",
1607                         ctdb->nodes[i]->pnn, ban_state->count,
1608                         ctdb->tunable.recovery_ban_period));
1609                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
1610                 ban_state->count = 0;
1611
1612                 /* Banning ourself? */
1613                 if (ctdb->nodes[i]->pnn == rec->ctdb->pnn) {
1614                         *self_ban = true;
1615                 }
1616         }
1617 }
1618
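/*
  perform a takeover run: temporarily disable takeover runs on the other
  nodes, call ctdb_takeover_run() and then re-enable them again.
  Returns true on success; on failure the run is flagged to be retried
  later via rec->need_takeover_run.
 */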
1619 static bool do_takeover_run(struct ctdb_recoverd *rec,
1620                             struct ctdb_node_map_old *nodemap,
1621                             bool banning_credits_on_fail)
1622 {
1623         uint32_t *nodes = NULL;
1624         struct ctdb_disable_message dtr;
1625         TDB_DATA data;
1626         int i;
1627         uint32_t *rebalance_nodes = rec->force_rebalance_nodes;
1628         int ret;
1629         bool ok;
1630
1631         DEBUG(DEBUG_NOTICE, ("Takeover run starting\n"));
1632
1633         if (ctdb_op_is_in_progress(rec->takeover_run)) {
1634                 DEBUG(DEBUG_ERR, (__location__
1635                                   " takeover run already in progress \n"));
1636                 ok = false;
1637                 goto done;
1638         }
1639
1640         if (!ctdb_op_begin(rec->takeover_run)) {
1641                 ok = false;
1642                 goto done;
1643         }
1644
1645         /* Disable IP checks (takeover runs, really) on other nodes
1646          * while doing this takeover run.  This will stop those other
1647          * nodes from triggering takeover runs when they think they should
1648          * be hosting an IP but it isn't yet on an interface.  Don't
1649          * wait for replies since a failure here might cause some
1650          * noise in the logs but will not actually cause a problem.
1651          */
1652         dtr.srvid = 0; /* No reply */
1653         dtr.pnn = -1;
1654
1655         data.dptr  = (uint8_t*)&dtr;
1656         data.dsize = sizeof(dtr);
1657
1658         nodes = list_of_connected_nodes(rec->ctdb, nodemap, rec, false);
1659
1660         /* Disable for 60 seconds.  This can be a tunable later if
1661          * necessary.
1662          */
1663         dtr.timeout = 60;
1664         for (i = 0; i < talloc_array_length(nodes); i++) {
1665                 if (ctdb_client_send_message(rec->ctdb, nodes[i],
1666                                              CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
1667                                              data) != 0) {
1668                         DEBUG(DEBUG_INFO,("Failed to disable takeover runs\n"));
1669                 }
1670         }
1671
1672         ret = ctdb_takeover_run(rec->ctdb, nodemap,
1673                                 rec->force_rebalance_nodes,
1674                                 takeover_fail_callback,
1675                                 banning_credits_on_fail ? rec : NULL);
1676
1677         /* Reenable takeover runs and IP checks on other nodes */
1678         dtr.timeout = 0;
1679         for (i = 0; i < talloc_array_length(nodes); i++) {
1680                 if (ctdb_client_send_message(rec->ctdb, nodes[i],
1681                                              CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
1682                                              data) != 0) {
1683                         DEBUG(DEBUG_INFO,("Failed to re-enable takeover runs\n"));
1684                 }
1685         }
1686
1687         if (ret != 0) {
1688                 DEBUG(DEBUG_ERR, ("ctdb_takeover_run() failed\n"));
1689                 ok = false;
1690                 goto done;
1691         }
1692
1693         ok = true;
1694         /* Takeover run was successful so clear force rebalance targets */
1695         if (rebalance_nodes == rec->force_rebalance_nodes) {
1696                 TALLOC_FREE(rec->force_rebalance_nodes);
1697         } else {
1698                 DEBUG(DEBUG_WARNING,
1699                       ("Rebalance target nodes changed during takeover run - not clearing\n"));
1700         }
1701 done:
1702         rec->need_takeover_run = !ok;
1703         talloc_free(nodes);
1704         ctdb_op_end(rec->takeover_run);
1705
1706         DEBUG(DEBUG_NOTICE, ("Takeover run %s\n", ok ? "completed successfully" : "unsuccessful"));
1707         return ok;
1708 }
1709
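/* state used to drive the external recovery helper from db_recovery_parallel() */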
1710 struct recovery_helper_state {
1711         int fd[2];
1712         pid_t pid;
1713         int result;
1714         bool done;
1715 };
1716
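/*
  read the result the recovery helper writes to its pipe and mark the
  helper run as done
 */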
1717 static void ctdb_recovery_handler(struct tevent_context *ev,
1718                                   struct tevent_fd *fde,
1719                                   uint16_t flags, void *private_data)
1720 {
1721         struct recovery_helper_state *state = talloc_get_type_abort(
1722                 private_data, struct recovery_helper_state);
1723         int ret;
1724
1725         ret = sys_read(state->fd[0], &state->result, sizeof(state->result));
1726         if (ret != sizeof(state->result)) {
1727                 state->result = EPIPE;
1728         }
1729
1730         state->done = true;
1731 }
1732
1733
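/*
  recover the databases in parallel by starting the ctdb_recovery_helper
  binary and waiting for it to report its result back over a pipe
 */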
1734 static int db_recovery_parallel(struct ctdb_recoverd *rec, TALLOC_CTX *mem_ctx)
1735 {
1736         static char prog[PATH_MAX+1] = "";
1737         const char **args;
1738         struct recovery_helper_state *state;
1739         struct tevent_fd *fde;
1740         int nargs, ret;
1741
1742         if (!ctdb_set_helper("recovery_helper", prog, sizeof(prog),
1743                              "CTDB_RECOVERY_HELPER", CTDB_HELPER_BINDIR,
1744                              "ctdb_recovery_helper")) {
1745                 ctdb_die(rec->ctdb, "Unable to set recovery helper\n");
1746         }
1747
1748         state = talloc_zero(mem_ctx, struct recovery_helper_state);
1749         if (state == NULL) {
1750                 DEBUG(DEBUG_ERR, (__location__ " memory error\n"));
1751                 return -1;
1752         }
1753
1754         state->pid = -1;
1755
1756         ret = pipe(state->fd);
1757         if (ret != 0) {
1758                 DEBUG(DEBUG_ERR,
1759                       ("Failed to create pipe for recovery helper\n"));
1760                 goto fail;
1761         }
1762
1763         set_close_on_exec(state->fd[0]);
1764
1765         nargs = 4;
1766         args = talloc_array(state, const char *, nargs);
1767         if (args == NULL) {
1768                 DEBUG(DEBUG_ERR, (__location__ " memory error\n"));
1769                 goto fail;
1770         }
1771
1772         args[0] = talloc_asprintf(args, "%d", state->fd[1]);
1773         args[1] = rec->ctdb->daemon.name;
1774         args[2] = talloc_asprintf(args, "%u", new_generation());
1775         args[3] = NULL;
1776
1777         if (args[0] == NULL || args[2] == NULL) {
1778                 DEBUG(DEBUG_ERR, (__location__ " memory error\n"));
1779                 goto fail;
1780         }
1781
1782         if (!ctdb_vfork_with_logging(state, rec->ctdb, "recovery", prog, nargs,
1783                                      args, NULL, NULL, &state->pid)) {
1784                 DEBUG(DEBUG_ERR,
1785                       ("Failed to create child for recovery helper\n"));
1786                 goto fail;
1787         }
1788
1789         close(state->fd[1]);
1790         state->fd[1] = -1;
1791
1792         state->done = false;
1793
1794         fde = tevent_add_fd(rec->ctdb->ev, rec->ctdb, state->fd[0],
1795                             TEVENT_FD_READ, ctdb_recovery_handler, state);
1796         if (fde == NULL) {
1797                 goto fail;
1798         }
1799         tevent_fd_set_auto_close(fde);
1800
1801         while (!state->done) {
1802                 tevent_loop_once(rec->ctdb->ev);
1803         }
1804
1805         close(state->fd[0]);
1806         state->fd[0] = -1;
1807
1808         if (state->result != 0) {
1809                 goto fail;
1810         }
1811
1812         ctdb_kill(rec->ctdb, state->pid, SIGKILL);
1813         talloc_free(state);
1814         return 0;
1815
1816 fail:
1817         if (state->fd[0] != -1) {
1818                 close(state->fd[0]);
1819         }
1820         if (state->fd[1] != -1) {
1821                 close(state->fd[1]);
1822         }
1823         if (state->pid != -1) {
1824                 ctdb_kill(rec->ctdb, state->pid, SIGKILL);
1825         }
1826         talloc_free(state);
1827         return -1;
1828 }
1829
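/*
  recover all databases serially from within the recovery daemon:
  activate recovery mode, start a transaction on all active nodes,
  recover each database, commit, rebuild the vnnmap and finally return
  the cluster to normal recovery mode
 */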
1830 static int db_recovery_serial(struct ctdb_recoverd *rec, TALLOC_CTX *mem_ctx,
1831                               uint32_t pnn, struct ctdb_node_map_old *nodemap,
1832                               struct ctdb_vnn_map *vnnmap,
1833                               struct ctdb_dbid_map_old *dbmap)
1834 {
1835         struct ctdb_context *ctdb = rec->ctdb;
1836         uint32_t generation;
1837         TDB_DATA data;
1838         uint32_t *nodes;
1839         int ret, i, j;
1840
1841         /* set recovery mode to active on all nodes */
1842         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE, true);
1843         if (ret != 0) {
1844                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1845                 return -1;
1846         }
1847
1848         /* execute the "startrecovery" event script on all nodes */
1849         ret = run_startrecovery_eventscript(rec, nodemap);
1850         if (ret!=0) {
1851                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
1852                 return -1;
1853         }
1854
1855         /* pick a new generation number */
1856         generation = new_generation();
1857
1858         /* change the vnnmap on this node to use the new generation
1859            number but not on any other nodes.
1860            This guarantees that if we abort the recovery prematurely
1861            for some reason (a node stops responding?), we can just
1862            return immediately and we will re-enter recovery shortly
1863            afterwards.
1864            I.e. we deliberately leave the cluster with an inconsistent
1865            generation id to allow us to abort recovery at any stage and
1866            just restart it from scratch.
1867          */
1868         vnnmap->generation = generation;
1869         ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, vnnmap);
1870         if (ret != 0) {
1871                 DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
1872                 return -1;
1873         }
1874
1875         /* Database generations are updated when the transaction is committed to
1876          * the databases.  So make sure to use the final generation as the
1877          * transaction id
1878          */
1879         generation = new_generation();
1880
1881         data.dptr = (void *)&generation;
1882         data.dsize = sizeof(uint32_t);
1883
1884         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
1885         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
1886                                         nodes, 0,
1887                                         CONTROL_TIMEOUT(), false, data,
1888                                         NULL,
1889                                         transaction_start_fail_callback,
1890                                         rec) != 0) {
1891                 DEBUG(DEBUG_ERR, (__location__ " Unable to start transactions. Recovery failed.\n"));
1892                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_CANCEL,
1893                                         nodes, 0,
1894                                         CONTROL_TIMEOUT(), false, tdb_null,
1895                                         NULL,
1896                                         NULL,
1897                                         NULL) != 0) {
1898                         DEBUG(DEBUG_ERR,("Failed to cancel recovery transaction\n"));
1899                 }
1900                 return -1;
1901         }
1902
1903         DEBUG(DEBUG_NOTICE,(__location__ " started transactions on all nodes\n"));
1904
1905         for (i=0;i<dbmap->num;i++) {
1906                 ret = recover_database(rec, mem_ctx,
1907                                        dbmap->dbs[i].db_id,
1908                                        dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT,
1909                                        pnn, nodemap, generation);
1910                 if (ret != 0) {
1911                         DEBUG(DEBUG_ERR, (__location__ " Failed to recover database 0x%x\n", dbmap->dbs[i].db_id));
1912                         return -1;
1913                 }
1914         }
1915
1916         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - starting database commits\n"));
1917
1918         /* commit all the changes */
1919         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
1920                                         nodes, 0,
1921                                         CONTROL_TIMEOUT(), false, data,
1922                                         NULL, NULL,
1923                                         NULL) != 0) {
1924                 DEBUG(DEBUG_ERR, (__location__ " Unable to commit recovery changes. Recovery failed.\n"));
1925                 return -1;
1926         }
1927
1928         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - committed databases\n"));
1929
1930         /* build a new vnn map with all the currently active and
1931            unbanned nodes */
1932         vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
1933         CTDB_NO_MEMORY(ctdb, vnnmap);
1934         vnnmap->generation = generation;
1935         vnnmap->size = 0;
1936         vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
1937         CTDB_NO_MEMORY(ctdb, vnnmap->map);
1938         for (i=j=0;i<nodemap->num;i++) {
1939                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1940                         continue;
1941                 }
1942                 if (!ctdb_node_has_capabilities(rec->caps,
1943                                                 ctdb->nodes[i]->pnn,
1944                                                 CTDB_CAP_LMASTER)) {
1945                         /* this node cannot be an lmaster */
1946                         DEBUG(DEBUG_DEBUG, ("Node %d can't be an lmaster, skipping it\n", i));
1947                         continue;
1948                 }
1949
1950                 vnnmap->size++;
1951                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1952                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1953                 vnnmap->map[j++] = nodemap->nodes[i].pnn;
1954
1955         }
1956         if (vnnmap->size == 0) {
1957                 DEBUG(DEBUG_NOTICE, ("No suitable lmasters found. Adding local node (recmaster) anyway.\n"));
1958                 vnnmap->size++;
1959                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1960                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1961                 vnnmap->map[0] = pnn;
1962         }
1963
1964         /* update to the new vnnmap on all nodes */
1965         ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
1966         if (ret != 0) {
1967                 DEBUG(DEBUG_ERR, (__location__ " Unable to update vnnmap on all nodes\n"));
1968                 return -1;
1969         }
1970
1971         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated vnnmap\n"));
1972
1973         /* disable recovery mode */
1974         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_NORMAL, false);
1975         if (ret != 0) {
1976                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to normal on cluster\n"));
1977                 return -1;
1978         }
1979
1980         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - disabled recovery mode\n"));
1981
1982         return 0;
1983 }
1984
1985 /*
1986   we are the recmaster, and recovery is needed - start a recovery run
1987  */
1988 static int do_recovery(struct ctdb_recoverd *rec,
1989                        TALLOC_CTX *mem_ctx, uint32_t pnn,
1990                        struct ctdb_node_map_old *nodemap, struct ctdb_vnn_map *vnnmap)
1991 {
1992         struct ctdb_context *ctdb = rec->ctdb;
1993         int i, ret;
1994         struct ctdb_dbid_map_old *dbmap;
1995         struct timeval start_time;
1996         bool self_ban;
1997         bool par_recovery;
1998
1999         DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));
2000
2001         /* Check if the current node is still the recmaster.  It's possible that
2002          * re-election has changed the recmaster.
2003          */
2004         if (pnn != rec->recmaster) {
2005                 DEBUG(DEBUG_NOTICE,
2006                       ("Recovery master changed to %u, aborting recovery\n",
2007                        rec->recmaster));
2008                 return -1;
2009         }
2010
2011         /* if recovery fails, force it again */
2012         rec->need_recovery = true;
2013
2014         if (!ctdb_op_begin(rec->recovery)) {
2015                 return -1;
2016         }
2017
2018         if (rec->election_timeout) {
2019                 /* an election is in progress */
2020                 DEBUG(DEBUG_ERR, ("do_recovery called while election in progress - try again later\n"));
2021                 goto fail;
2022         }
2023
2024         ban_misbehaving_nodes(rec, &self_ban);
2025         if (self_ban) {
2026                 DEBUG(DEBUG_NOTICE, ("This node was banned, aborting recovery\n"));
2027                 goto fail;
2028         }
2029
2030         if (ctdb->recovery_lock_file != NULL) {
2031                 if (ctdb_recovery_have_lock(ctdb)) {
2032                         DEBUG(DEBUG_NOTICE, ("Already holding recovery lock\n"));
2033                 } else {
2034                         start_time = timeval_current();
2035                         DEBUG(DEBUG_NOTICE, ("Attempting to take recovery lock (%s)\n",
2036                                              ctdb->recovery_lock_file));
2037                         if (!ctdb_recovery_lock(ctdb)) {
2038                                 if (ctdb->runstate == CTDB_RUNSTATE_FIRST_RECOVERY) {
2039                                         /* If ctdb is trying first recovery, it's
2040                                          * possible that current node does not know
2041                                          * yet who the recmaster is.
2042                                          */
2043                                         DEBUG(DEBUG_ERR, ("Unable to get recovery lock"
2044                                                           " - retrying recovery\n"));
2045                                         goto fail;
2046                                 }
2047
2048                                 DEBUG(DEBUG_ERR,("Unable to get recovery lock - aborting recovery "
2049                                                  "and banning ourselves for %u seconds\n",
2050                                                  ctdb->tunable.recovery_ban_period));
2051                                 ctdb_ban_node(rec, pnn, ctdb->tunable.recovery_ban_period);
2052                                 goto fail;
2053                         }
2054                         ctdb_ctrl_report_recd_lock_latency(ctdb,
2055                                                            CONTROL_TIMEOUT(),
2056                                                            timeval_elapsed(&start_time));
2057                         DEBUG(DEBUG_NOTICE,
2058                               ("Recovery lock taken successfully by recovery daemon\n"));
2059                 }
2060         }
2061
2062         DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", rec->last_culprit_node));
2063
2064         /* get a list of all databases */
2065         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
2066         if (ret != 0) {
2067                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
2068                 goto fail;
2069         }
2070
2071         /* we do the db creation before we set the recovery mode, so the freeze happens
2072            on all databases we will be dealing with. */
2073
2074         /* verify that we have all the databases any other node has */
2075         ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
2076         if (ret != 0) {
2077                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing local databases\n"));
2078                 goto fail;
2079         }
2080
2081         /* verify that all other nodes have all our databases */
2082         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
2083         if (ret != 0) {
2084                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing remote databases\n"));
2085                 goto fail;
2086         }
2087         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - created remote databases\n"));
2088
2089         /* update the database priority for all remote databases */
2090         ret = update_db_priority_on_remote_nodes(ctdb, nodemap, pnn, dbmap, mem_ctx);
2091         if (ret != 0) {
2092                 DEBUG(DEBUG_ERR, (__location__ " Unable to set db priority on remote nodes\n"));
2093         }
2094         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated db priority for all databases\n"));
2095
2096
2097         /* update all other nodes to use the same setting for reclock files
2098            as the local recovery master.
2099         */
2100         sync_recovery_lock_file_across_cluster(rec);
2101
2102         /* Retrieve capabilities from all connected nodes */
2103         ret = update_capabilities(rec, nodemap);
2104         if (ret!=0) {
2105                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
2106                 goto fail;
2107         }
2108
2109         /*
2110           update all nodes to have the same flags that we have
2111          */
2112         for (i=0;i<nodemap->num;i++) {
2113                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
2114                         continue;
2115                 }
2116
2117                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
2118                 if (ret != 0) {
2119                         if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2120                                 DEBUG(DEBUG_WARNING, (__location__ "Unable to update flags on inactive node %d\n", i));
2121                         } else {
2122                                 DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
2123                                 goto fail;
2124                         }
2125                 }
2126         }
2127
2128         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
2129
2130         /* Check if all participating nodes have parallel recovery capability */
2131         par_recovery = true;
2132         for (i=0; i<nodemap->num; i++) {
2133                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2134                         continue;
2135                 }
2136
2137                 if (!(rec->caps[i].capabilities &
2138                       CTDB_CAP_PARALLEL_RECOVERY)) {
2139                         par_recovery = false;
2140                         break;
2141                 }
2142         }
2143
2144         if (par_recovery) {
2145                 ret = db_recovery_parallel(rec, mem_ctx);
2146         } else {
2147                 ret = db_recovery_serial(rec, mem_ctx, pnn, nodemap, vnnmap,
2148                                          dbmap);
2149         }
2150
2151         if (ret != 0) {
2152                 goto fail;
2153         }
2154
2155         do_takeover_run(rec, nodemap, false);
2156
2157         /* execute the "recovered" event script on all nodes */
2158         ret = run_recovered_eventscript(rec, nodemap, "do_recovery");
2159         if (ret!=0) {
2160                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Recovery process failed.\n"));
2161                 goto fail;
2162         }
2163
2164         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - finished the recovered event\n"));
2165
2166         /* send a message to all clients telling them that the cluster 
2167            has been reconfigured */
2168         ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED,
2169                                        CTDB_SRVID_RECONFIGURE, tdb_null);
2170         if (ret != 0) {
2171                 DEBUG(DEBUG_ERR, (__location__ " Failed to send reconfigure message\n"));
2172                 goto fail;
2173         }
2174
2175         DEBUG(DEBUG_NOTICE, (__location__ " Recovery complete\n"));
2176
2177         rec->need_recovery = false;
2178         ctdb_op_end(rec->recovery);
2179
2180         /* we managed to complete a full recovery, make sure to forgive
2181            any past sins by the nodes that could now participate in the
2182            recovery.
2183         */
2184         DEBUG(DEBUG_ERR,("Resetting ban count to 0 for all nodes\n"));
2185         for (i=0;i<nodemap->num;i++) {
2186                 struct ctdb_banning_state *ban_state;
2187
2188                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
2189                         continue;
2190                 }
2191
2192                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[nodemap->nodes[i].pnn]->ban_state;
2193                 if (ban_state == NULL) {
2194                         continue;
2195                 }
2196
2197                 ban_state->count = 0;
2198         }
2199
2200         /* We just finished a recovery successfully.
2201            We now wait for rerecovery_timeout before we allow
2202            another recovery to take place.
2203         */
2204         DEBUG(DEBUG_NOTICE, ("Just finished a recovery. New recoveries will now be suppressed for the rerecovery timeout (%d seconds)\n", ctdb->tunable.rerecovery_timeout));
2205         ctdb_op_disable(rec->recovery, ctdb->ev,
2206                         ctdb->tunable.rerecovery_timeout);
2207         return 0;
2208
2209 fail:
2210         ctdb_op_end(rec->recovery);
2211         return -1;
2212 }
2213
2214
2215 /*
2216   elections are won by first checking the number of connected nodes, then
2217   the priority time, then the pnn
2218  */
2219 struct election_message {
2220         uint32_t num_connected;
2221         struct timeval priority_time;
2222         uint32_t pnn;
2223         uint32_t node_flags;
2224 };
2225
2226 /*
2227   form this nodes election data
2228  */
2229 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
2230 {
2231         int ret, i;
2232         struct ctdb_node_map_old *nodemap;
2233         struct ctdb_context *ctdb = rec->ctdb;
2234
2235         ZERO_STRUCTP(em);
2236
2237         em->pnn = rec->ctdb->pnn;
2238         em->priority_time = rec->priority_time;
2239
2240         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
2241         if (ret != 0) {
2242                 DEBUG(DEBUG_ERR,(__location__ " unable to get node map\n"));
2243                 return;
2244         }
2245
2246         rec->node_flags = nodemap->nodes[ctdb->pnn].flags;
2247         em->node_flags = rec->node_flags;
2248
2249         for (i=0;i<nodemap->num;i++) {
2250                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
2251                         em->num_connected++;
2252                 }
2253         }
2254
2255         /* we shouldn't try to win this election if we can't be a recmaster */
2256         if ((ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
2257                 em->num_connected = 0;
2258                 em->priority_time = timeval_current();
2259         }
2260
2261         talloc_free(nodemap);
2262 }
2263
2264 /*
2265   see if the given election data wins
2266  */
2267 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
2268 {
2269         struct election_message myem;
2270         int cmp = 0;
2271
2272         ctdb_election_data(rec, &myem);
2273
2274         /* we can't win if we don't have the recmaster capability */
2275         if ((rec->ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
2276                 return false;
2277         }
2278
2279         /* we can't win if we are banned */
2280         if (rec->node_flags & NODE_FLAGS_BANNED) {
2281                 return false;
2282         }
2283
2284         /* we can't win if we are stopped */
2285         if (rec->node_flags & NODE_FLAGS_STOPPED) {
2286                 return false;
2287         }
2288
2289         /* we will automatically win if the other node is banned */
2290         if (em->node_flags & NODE_FLAGS_BANNED) {
2291                 return true;
2292         }
2293
2294         /* we will automatically win if the other node is stopped */
2295         if (em->node_flags & NODE_FLAGS_STOPPED) {
2296                 return true;
2297         }
2298
        /* try to use the most connected node */
        if (cmp == 0) {
                cmp = (int)myem.num_connected - (int)em->num_connected;
        }

2299         /* then the longest running node */
2300         if (cmp == 0) {
2301                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
2302         }
2303
2304         if (cmp == 0) {
2305                 cmp = (int)myem.pnn - (int)em->pnn;
2306         }
2307
2308         return cmp > 0;
2309 }
2310
2311 /*
2312   send out an election request
2313  */
2314 static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn)
2315 {
2316         int ret;
2317         TDB_DATA election_data;
2318         struct election_message emsg;
2319         uint64_t srvid;
2320         struct ctdb_context *ctdb = rec->ctdb;
2321
2322         srvid = CTDB_SRVID_ELECTION;
2323
2324         ctdb_election_data(rec, &emsg);
2325
2326         election_data.dsize = sizeof(struct election_message);
2327         election_data.dptr  = (unsigned char *)&emsg;
2328
2329
2330         /* first we assume we will win the election and set the
2331            recovery master to be ourselves on the current node
2332          */
2333         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(),
2334                                      CTDB_CURRENT_NODE, pnn);
2335         if (ret != 0) {
2336                 DEBUG(DEBUG_ERR, (__location__ " failed to set recmaster\n"));
2337                 return -1;
2338         }
2339         rec->recmaster = pnn;
2340
2341         /* send an election message to all active nodes */
2342         DEBUG(DEBUG_INFO,(__location__ " Send election request to all active nodes\n"));
2343         return ctdb_client_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
2344 }
2345
2346 /*
2347   this function will unban all nodes in the cluster
2348 */
2349 static void unban_all_nodes(struct ctdb_context *ctdb)
2350 {
2351         int ret, i;
2352         struct ctdb_node_map_old *nodemap;
2353         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2354         
2355         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2356         if (ret != 0) {
2357                 DEBUG(DEBUG_ERR,(__location__ " failed to get nodemap to unban all nodes\n"));
2358                 return;
2359         }
2360
2361         for (i=0;i<nodemap->num;i++) {
2362                 if ( (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED))
2363                   && (nodemap->nodes[i].flags & NODE_FLAGS_BANNED) ) {
2364                         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(),
2365                                                  nodemap->nodes[i].pnn, 0,
2366                                                  NODE_FLAGS_BANNED);
2367                         if (ret != 0) {
2368                                 DEBUG(DEBUG_ERR, (__location__ " failed to reset ban state\n"));
2369                         }
2370                 }
2371         }
2372
2373         talloc_free(tmp_ctx);
2374 }
2375
2376
2377 /*
2378   we think we are winning the election - send a broadcast election request
2379  */
2380 static void election_send_request(struct tevent_context *ev,
2381                                   struct tevent_timer *te,
2382                                   struct timeval t, void *p)
2383 {
2384         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
2385         int ret;
2386
2387         ret = send_election_request(rec, ctdb_get_pnn(rec->ctdb));
2388         if (ret != 0) {
2389                 DEBUG(DEBUG_ERR,("Failed to send election request!\n"));
2390         }
2391
2392         TALLOC_FREE(rec->send_election_te);
2393 }
2394
2395 /*
2396   handler for memory dumps
2397 */
2398 static void mem_dump_handler(uint64_t srvid, TDB_DATA data, void *private_data)
2399 {
2400         struct ctdb_recoverd *rec = talloc_get_type(
2401                 private_data, struct ctdb_recoverd);
2402         struct ctdb_context *ctdb = rec->ctdb;
2403         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2404         TDB_DATA *dump;
2405         int ret;
2406         struct ctdb_srvid_message *rd;
2407
2408         if (data.dsize != sizeof(struct ctdb_srvid_message)) {
2409                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2410                 talloc_free(tmp_ctx);
2411                 return;
2412         }
2413         rd = (struct ctdb_srvid_message *)data.dptr;
2414
2415         dump = talloc_zero(tmp_ctx, TDB_DATA);
2416         if (dump == NULL) {
2417                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate memory for memdump\n"));
2418                 talloc_free(tmp_ctx);
2419                 return;
2420         }
2421         ret = ctdb_dump_memory(ctdb, dump);
2422         if (ret != 0) {
2423                 DEBUG(DEBUG_ERR, (__location__ " ctdb_dump_memory() failed\n"));
2424                 talloc_free(tmp_ctx);
2425                 return;
2426         }
2427
2428         DEBUG(DEBUG_ERR, ("recovery master memory dump\n"));
2429
2430         ret = ctdb_client_send_message(ctdb, rd->pnn, rd->srvid, *dump);
2431         if (ret != 0) {
2432                 DEBUG(DEBUG_ERR,("Failed to send rd memdump reply message\n"));
2433                 talloc_free(tmp_ctx);
2434                 return;
2435         }
2436
2437         talloc_free(tmp_ctx);
2438 }
2439
2440 /*
2441   handler for reload_nodes
2442 */
2443 static void reload_nodes_handler(uint64_t srvid, TDB_DATA data,
2444                                  void *private_data)
2445 {
2446         struct ctdb_recoverd *rec = talloc_get_type(
2447                 private_data, struct ctdb_recoverd);
2448
2449         DEBUG(DEBUG_ERR, (__location__ " Reload nodes file from recovery daemon\n"));
2450
2451         ctdb_load_nodes_file(rec->ctdb);
2452 }
2453
2454
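/*
  a deferred rebalance has timed out - trigger a takeover run if there
  are still nodes waiting to have IPs rebalanced to them
 */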
2455 static void ctdb_rebalance_timeout(struct tevent_context *ev,
2456                                    struct tevent_timer *te,
2457                                    struct timeval t, void *p)
2458 {
2459         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
2460
2461         if (rec->force_rebalance_nodes == NULL) {
2462                 DEBUG(DEBUG_ERR,
2463                       ("Rebalance timeout occurred - no nodes to rebalance\n"));
2464                 return;
2465         }
2466
2467         DEBUG(DEBUG_NOTICE,
2468               ("Rebalance timeout occurred - trigger takeover run\n"));
2469         rec->need_takeover_run = true;
2470 }
2471
2472
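/*
  handler for node rebalance requests: remember the target node so that
  IPs can be rebalanced to it, optionally via a deferred takeover run
 */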
2473 static void recd_node_rebalance_handler(uint64_t srvid, TDB_DATA data,
2474                                         void *private_data)
2475 {
2476         struct ctdb_recoverd *rec = talloc_get_type(
2477                 private_data, struct ctdb_recoverd);
2478         struct ctdb_context *ctdb = rec->ctdb;
2479         uint32_t pnn;
2480         uint32_t *t;
2481         int len;
2482         uint32_t deferred_rebalance;
2483
2484         if (rec->recmaster != ctdb_get_pnn(ctdb)) {
2485                 return;
2486         }
2487
2488         if (data.dsize != sizeof(uint32_t)) {
2489                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of node rebalance message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(uint32_t)));
2490                 return;
2491         }
2492
2493         pnn = *(uint32_t *)&data.dptr[0];
2494
2495         DEBUG(DEBUG_NOTICE,("Setting up rebalance of IPs to node %u\n", pnn));
2496
2497         /* Copy any existing list of nodes.  There's probably some
2498          * sort of realloc variant that will do this but we need to
2499          * make sure that freeing the old array also cancels the timer
2500          * event for the timeout... not sure if realloc will do that.
2501          */
2502         len = (rec->force_rebalance_nodes != NULL) ?
2503                 talloc_array_length(rec->force_rebalance_nodes) :
2504                 0;
2505
2506         /* This allows duplicates to be added but they don't cause
2507          * harm.  A call to add a duplicate PNN arguably means that
2508          * the timeout should be reset, so this is the simplest
2509          * solution.
2510          */
2511         t = talloc_zero_array(rec, uint32_t, len+1);
2512         CTDB_NO_MEMORY_VOID(ctdb, t);
2513         if (len > 0) {
2514                 memcpy(t, rec->force_rebalance_nodes, sizeof(uint32_t) * len);
2515         }
2516         t[len] = pnn;
2517
2518         talloc_free(rec->force_rebalance_nodes);
2519
2520         rec->force_rebalance_nodes = t;
2521
2522         /* If configured, setup a deferred takeover run to make sure
2523          * that certain nodes get IPs rebalanced to them.  This will
2524          * be cancelled if a successful takeover run happens before
2525          * the timeout.  Assign tunable value to variable for
2526          * readability.
2527          */
2528         deferred_rebalance = ctdb->tunable.deferred_rebalance_on_node_add;
2529         if (deferred_rebalance != 0) {
2530                 tevent_add_timer(ctdb->ev, rec->force_rebalance_nodes,
2531                                  timeval_current_ofs(deferred_rebalance, 0),
2532                                  ctdb_rebalance_timeout, rec);
2533         }
2534 }
2535
2536
2537
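/*
  handler for public IP update messages - only the recovery master
  records the new assignment in its IP tree
 */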
2538 static void recd_update_ip_handler(uint64_t srvid, TDB_DATA data,
2539                                    void *private_data)
2540 {
2541         struct ctdb_recoverd *rec = talloc_get_type(
2542                 private_data, struct ctdb_recoverd);
2543         struct ctdb_public_ip *ip;
2544
2545         if (rec->recmaster != rec->ctdb->pnn) {
2546                 DEBUG(DEBUG_INFO,("Not recmaster, ignore update ip message\n"));
2547                 return;
2548         }
2549
2550         if (data.dsize != sizeof(struct ctdb_public_ip)) {
2551                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of recd update ip message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(struct ctdb_public_ip)));
2552                 return;
2553         }
2554
2555         ip = (struct ctdb_public_ip *)data.dptr;
2556
2557         update_ip_assignment_tree(rec->ctdb, ip);
2558 }
2559
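/*
  disable the given operation for the requested timeout and reply with
  our pnn on success, or an error code on failure
 */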
2560 static void srvid_disable_and_reply(struct ctdb_context *ctdb,
2561                                     TDB_DATA data,
2562                                     struct ctdb_op_state *op_state)
2563 {
2564         struct ctdb_disable_message *r;
2565         uint32_t timeout;
2566         TDB_DATA result;
2567         int32_t ret = 0;
2568
2569         /* Validate input data */
2570         if (data.dsize != sizeof(struct ctdb_disable_message)) {
2571                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
2572                                  "expecting %lu\n", (long unsigned)data.dsize,
2573                                  (long unsigned)sizeof(struct ctdb_disable_message)));
2574                 return;
2575         }
2576         if (data.dptr == NULL) {
2577                 DEBUG(DEBUG_ERR,(__location__ " No data received\n"));
2578                 return;
2579         }
2580
2581         r = (struct ctdb_disable_message *)data.dptr;
2582         timeout = r->timeout;
2583
2584         ret = ctdb_op_disable(op_state, ctdb->ev, timeout);
2585         if (ret != 0) {
2586                 goto done;
2587         }
2588
2589         /* Returning our PNN tells the caller that we succeeded */
2590         ret = ctdb_get_pnn(ctdb);
2591 done:
2592         result.dsize = sizeof(int32_t);
2593         result.dptr  = (uint8_t *)&ret;
2594         srvid_request_reply(ctdb, (struct ctdb_srvid_message *)r, result);
2595 }
2596
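/* handler for requests to temporarily disable takeover runs */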
2597 static void disable_takeover_runs_handler(uint64_t srvid, TDB_DATA data,
2598                                           void *private_data)
2599 {
2600         struct ctdb_recoverd *rec = talloc_get_type(
2601                 private_data, struct ctdb_recoverd);
2602
2603         srvid_disable_and_reply(rec->ctdb, data, rec->takeover_run);
2604 }
2605
2606 /* Backward compatibility for this SRVID */
2607 static void disable_ip_check_handler(uint64_t srvid, TDB_DATA data,
2608                                      void *private_data)
2609 {
2610         struct ctdb_recoverd *rec = talloc_get_type(
2611                 private_data, struct ctdb_recoverd);
2612         uint32_t timeout;
2613
2614         if (data.dsize != sizeof(uint32_t)) {
2615                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
2616                                  "expecting %lu\n", (long unsigned)data.dsize,
2617                                  (long unsigned)sizeof(uint32_t)));
2618                 return;
2619         }
2620         if (data.dptr == NULL) {
2621                 DEBUG(DEBUG_ERR,(__location__ " No data received\n"));
2622                 return;
2623         }
2624
2625         timeout = *((uint32_t *)data.dptr);
2626
2627         ctdb_op_disable(rec->takeover_run, rec->ctdb->ev, timeout);
2628 }
2629
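/* handler for requests to temporarily disable recoveries */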
2630 static void disable_recoveries_handler(uint64_t srvid, TDB_DATA data,
2631                                        void *private_data)
2632 {
2633         struct ctdb_recoverd *rec = talloc_get_type(
2634                 private_data, struct ctdb_recoverd);
2635
2636         srvid_disable_and_reply(rec->ctdb, data, rec->recovery);
2637 }
2638
2639 /*
2640   handler for ip reallocate requests; just add them to the list of
2641   requests and handle them later in the monitor_cluster loop so we do
2642   not recurse into takeover_run() while handling other requests
2643 */
2644 static void ip_reallocate_handler(uint64_t srvid, TDB_DATA data,
2645                                   void *private_data)
2646 {
2647         struct ctdb_srvid_message *request;
2648         struct ctdb_recoverd *rec = talloc_get_type(
2649                 private_data, struct ctdb_recoverd);
2650
2651         if (data.dsize != sizeof(struct ctdb_srvid_message)) {
2652                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2653                 return;
2654         }
2655
2656         request = (struct ctdb_srvid_message *)data.dptr;
2657
2658         srvid_request_add(rec->ctdb, &rec->reallocate_requests, request);
2659 }
2660
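/*
  process all pending ip reallocation requests with a single takeover
  run and send the result back to every requester
 */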
2661 static void process_ipreallocate_requests(struct ctdb_context *ctdb,
2662                                           struct ctdb_recoverd *rec)
2663 {
2664         TDB_DATA result;
2665         int32_t ret;
2666         struct srvid_requests *current;
2667
2668         DEBUG(DEBUG_INFO, ("recovery master forced ip reallocation\n"));
2669
2670         /* Only process requests that are currently pending.  More
2671          * might come in while the takeover run is in progress and
2672          * they will need to be processed later since they might
2673          * be in response to flag changes.
2674          */
2675         current = rec->reallocate_requests;
2676         rec->reallocate_requests = NULL;
2677
2678         if (do_takeover_run(rec, rec->nodemap, false)) {
2679                 ret = ctdb_get_pnn(ctdb);
2680         } else {
2681                 ret = -1;
2682         }
2683
2684         result.dsize = sizeof(int32_t);
2685         result.dptr  = (uint8_t *)&ret;
2686
2687         srvid_requests_reply(ctdb, &current, result);
2688 }
2689
2690
2691 /*
2692   handler for recovery master elections
2693 */
2694 static void election_handler(uint64_t srvid, TDB_DATA data, void *private_data)
2695 {
2696         struct ctdb_recoverd *rec = talloc_get_type(
2697                 private_data, struct ctdb_recoverd);
2698         struct ctdb_context *ctdb = rec->ctdb;
2699         int ret;
2700         struct election_message *em = (struct election_message *)data.dptr;
2701
2702         /* Ignore election packets from ourself */
2703         if (ctdb->pnn == em->pnn) {
2704                 return;
2705         }
2706
2707         /* we got an election packet - update the timeout for the election */
2708         talloc_free(rec->election_timeout);
2709         rec->election_timeout = tevent_add_timer(
2710                         ctdb->ev, ctdb,
2711                         fast_start ?
2712                                 timeval_current_ofs(0, 500000) :
2713                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0),
2714                         ctdb_election_timeout, rec);
2715
2716         /* someone called an election. check their election data
2717            and if we disagree and we would rather be the elected node, 
2718            send a new election message to all other nodes
2719          */
2720         if (ctdb_election_win(rec, em)) {
2721                 if (!rec->send_election_te) {
2722                         rec->send_election_te = tevent_add_timer(
2723                                         ctdb->ev, rec,
2724                                         timeval_current_ofs(0, 500000),
2725                                         election_send_request, rec);
2726                 }
2727                 /*unban_all_nodes(ctdb);*/
2728                 return;
2729         }
2730
2731         /* we didn't win */
2732         TALLOC_FREE(rec->send_election_te);
2733
2734         /* Release the recovery lock file */
2735         if (ctdb_recovery_have_lock(ctdb)) {
2736                 ctdb_recovery_unlock(ctdb);
2737                 unban_all_nodes(ctdb);
2738         }
2739
2740         clear_ip_assignment_tree(ctdb);
2741
2742         /* ok, let that guy become recmaster then */
2743         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(),
2744                                      CTDB_CURRENT_NODE, em->pnn);
2745         if (ret != 0) {
2746                 DEBUG(DEBUG_ERR, (__location__ " failed to set recmaster"));
2747                 return;
2748         }
2749         rec->recmaster = em->pnn;
2750
2751         return;
2752 }
2753
2754
2755 /*
2756   force the start of the election process
2757  */
2758 static void force_election(struct ctdb_recoverd *rec, uint32_t pnn, 
2759                            struct ctdb_node_map_old *nodemap)
2760 {
2761         int ret;
2762         struct ctdb_context *ctdb = rec->ctdb;
2763
2764         DEBUG(DEBUG_INFO,(__location__ " Force an election\n"));
2765
2766         /* set all nodes to recovery mode to stop all internode traffic */
2767         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE, false);
2768         if (ret != 0) {
2769                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
2770                 return;
2771         }
2772
2773         talloc_free(rec->election_timeout);
2774         rec->election_timeout = tevent_add_timer(
2775                         ctdb->ev, ctdb,
2776                         fast_start ?
2777                                 timeval_current_ofs(0, 500000) :
2778                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0),
2779                         ctdb_election_timeout, rec);
2780
2781         ret = send_election_request(rec, pnn);
2782         if (ret!=0) {
2783                 DEBUG(DEBUG_ERR, (__location__ " failed to initiate recmaster election"));
2784                 return;
2785         }
2786
2787         /* wait for a few seconds to collect all responses */
2788         ctdb_wait_election(rec);
2789 }
2790
2791
2792
2793 /*
2794   handler for when a node changes its flags
2795 */
2796 static void monitor_handler(uint64_t srvid, TDB_DATA data, void *private_data)
2797 {
2798         struct ctdb_recoverd *rec = talloc_get_type(
2799                 private_data, struct ctdb_recoverd);
2800         struct ctdb_context *ctdb = rec->ctdb;
2801         int ret;
2802         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2803         struct ctdb_node_map_old *nodemap=NULL;
2804         TALLOC_CTX *tmp_ctx;
2805         int i;
2806         int disabled_flag_changed;
2807
2808         if (data.dsize != sizeof(*c)) {
2809                 DEBUG(DEBUG_ERR,(__location__ "Invalid data in ctdb_node_flag_change\n"));
2810                 return;
2811         }
2812
2813         tmp_ctx = talloc_new(ctdb);
2814         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
2815
2816         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2817         if (ret != 0) {
2818                 DEBUG(DEBUG_ERR,(__location__ "ctdb_ctrl_getnodemap failed in monitor_handler\n"));
2819                 talloc_free(tmp_ctx);
2820                 return;         
2821         }
2822
2823
2824         for (i=0;i<nodemap->num;i++) {
2825                 if (nodemap->nodes[i].pnn == c->pnn) break;
2826         }
2827
2828         if (i == nodemap->num) {
2829                 DEBUG(DEBUG_CRIT,(__location__ " Flag change for non-existent node %u\n", c->pnn));
2830                 talloc_free(tmp_ctx);
2831                 return;
2832         }
2833
2834         if (c->old_flags != c->new_flags) {
2835                 DEBUG(DEBUG_NOTICE,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
2836         }
2837
2838         disabled_flag_changed =  (nodemap->nodes[i].flags ^ c->new_flags) & NODE_FLAGS_DISABLED;
2839
2840         nodemap->nodes[i].flags = c->new_flags;
2841
2842         ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, CONTROL_TIMEOUT(),
2843                                    CTDB_CURRENT_NODE, &ctdb->recovery_mode);
2844
2845         if (ret == 0 &&
2846             rec->recmaster == ctdb->pnn &&
2847             ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2848                 /* Only do the takeover run if the permanently-disabled or
2849                    unhealthy flags changed, since these will cause an IP
2850                    failover but not a recovery.
2851                    If the node became disconnected or banned this will also
2852                    lead to an IP address failover, but that is handled
2853                    during recovery.
2854                 */
2855                 if (disabled_flag_changed) {
2856                         rec->need_takeover_run = true;
2857                 }
2858         }
2859
2860         talloc_free(tmp_ctx);
2861 }
2862
2863 /*
2864   handler for when we need to push out flag changes to all other nodes
2865 */
2866 static void push_flags_handler(uint64_t srvid, TDB_DATA data,
2867                                void *private_data)
2868 {
2869         struct ctdb_recoverd *rec = talloc_get_type(
2870                 private_data, struct ctdb_recoverd);
2871         struct ctdb_context *ctdb = rec->ctdb;
2872         int ret;
2873         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2874         struct ctdb_node_map_old *nodemap=NULL;
2875         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2876         uint32_t *nodes;
2877
2878         /* read the node flags from the recmaster */
2879         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), rec->recmaster,
2880                                    tmp_ctx, &nodemap);
2881         if (ret != 0) {
2882                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recmaster node %u\n", rec->recmaster));
2883                 talloc_free(tmp_ctx);
2884                 return;
2885         }
2886         if (c->pnn >= nodemap->num) {
2887                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", c->pnn));
2888                 talloc_free(tmp_ctx);
2889                 return;
2890         }
2891
2892         /* send the flags update to all connected nodes */
2893         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2894
2895         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2896                                       nodes, 0, CONTROL_TIMEOUT(),
2897                                       false, data,
2898                                       NULL, NULL,
2899                                       NULL) != 0) {
2900                 DEBUG(DEBUG_ERR, (__location__ " ctdb_control to modify node flags failed\n"));
2901
2902                 talloc_free(tmp_ctx);
2903                 return;
2904         }
2905
2906         talloc_free(tmp_ctx);
2907 }
2908
2909
2910 struct verify_recmode_normal_data {
2911         uint32_t count;
2912         enum monitor_result status;
2913 };
2914
2915 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
2916 {
2917         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmode_normal_data);
2918
2919
2920         /* one more node has responded with recmode data*/
2921         rmdata->count--;
2922
2923         /* if we failed to get the recmode, then return an error and let
2924            the main loop try again.
2925         */
2926         if (state->state != CTDB_CONTROL_DONE) {
2927                 if (rmdata->status == MONITOR_OK) {
2928                         rmdata->status = MONITOR_FAILED;
2929                 }
2930                 return;
2931         }
2932
2933         /* if we got a response, then the recmode will be stored in the
2934            status field
2935         */
2936         if (state->status != CTDB_RECOVERY_NORMAL) {
2937                 DEBUG(DEBUG_NOTICE, ("Node:%u was in recovery mode. Start recovery process\n", state->c->hdr.destnode));
2938                 rmdata->status = MONITOR_RECOVERY_NEEDED;
2939         }
2940
2941         return;
2942 }
2943
2944
2945 /* verify that all nodes are in normal recovery mode */
2946 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map_old *nodemap)
2947 {
2948         struct verify_recmode_normal_data *rmdata;
2949         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2950         struct ctdb_client_control_state *state;
2951         enum monitor_result status;
2952         int j;
2953         
2954         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
2955         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2956         rmdata->count  = 0;
2957         rmdata->status = MONITOR_OK;
2958
2959         /* loop over all active nodes and send an async getrecmode call to 
2960            them*/
2961         for (j=0; j<nodemap->num; j++) {
2962                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2963                         continue;
2964                 }
2965                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
2966                                         CONTROL_TIMEOUT(), 
2967                                         nodemap->nodes[j].pnn);
2968                 if (state == NULL) {
2969                         /* we failed to send the control, treat this as 
2970                            an error and try again next iteration
2971                         */                      
2972                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
2973                         talloc_free(mem_ctx);
2974                         return MONITOR_FAILED;
2975                 }
2976
2977                 /* set up the callback functions */
2978                 state->async.fn = verify_recmode_normal_callback;
2979                 state->async.private_data = rmdata;
2980
2981                 /* one more control to wait for to complete */
2982                 rmdata->count++;
2983         }
2984
2985
2986         /* now wait for up to the maximum number of seconds allowed
2987            or until all nodes we expect a response from have replied
2988         */
2989         while (rmdata->count > 0) {
2990                 tevent_loop_once(ctdb->ev);
2991         }
2992
2993         status = rmdata->status;
2994         talloc_free(mem_ctx);
2995         return status;
2996 }
2997
2998
2999 struct verify_recmaster_data {
3000         struct ctdb_recoverd *rec;
3001         uint32_t count;
3002         uint32_t pnn;
3003         enum monitor_result status;
3004 };
3005
3006 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
3007 {
3008         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmaster_data);
3009
3010
3011         /* one more node has responded with recmaster data*/
3012         rmdata->count--;
3013
3014         /* if we failed to get the recmaster, then return an error and let
3015            the main loop try again.
3016         */
3017         if (state->state != CTDB_CONTROL_DONE) {
3018                 if (rmdata->status == MONITOR_OK) {
3019                         rmdata->status = MONITOR_FAILED;
3020                 }
3021                 return;
3022         }
3023
3024         /* if we got a response, then the recmaster will be stored in the
3025            status field
3026         */
3027         if (state->status != rmdata->pnn) {
3028                 DEBUG(DEBUG_ERR,("Node %d thinks node %d is recmaster. Need a new recmaster election\n", state->c->hdr.destnode, state->status));
3029                 ctdb_set_culprit(rmdata->rec, state->c->hdr.destnode);
3030                 rmdata->status = MONITOR_ELECTION_NEEDED;
3031         }
3032
3033         return;
3034 }
3035
3036
3037 /* verify that all nodes agree that we are the recmaster */
3038 static enum monitor_result verify_recmaster(struct ctdb_recoverd *rec, struct ctdb_node_map_old *nodemap, uint32_t pnn)
3039 {
3040         struct ctdb_context *ctdb = rec->ctdb;
3041         struct verify_recmaster_data *rmdata;
3042         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
3043         struct ctdb_client_control_state *state;
3044         enum monitor_result status;
3045         int j;
3046         
3047         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
3048         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
3049         rmdata->rec    = rec;
3050         rmdata->count  = 0;
3051         rmdata->pnn    = pnn;
3052         rmdata->status = MONITOR_OK;
3053
3054         /* loop over all active nodes and send an async getrecmaster call to
3055            them*/
3056         for (j=0; j<nodemap->num; j++) {
3057                 if (nodemap->nodes[j].pnn == rec->recmaster) {
3058                         continue;
3059                 }
3060                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3061                         continue;
3062                 }
3063                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
3064                                         CONTROL_TIMEOUT(),
3065                                         nodemap->nodes[j].pnn);
3066                 if (state == NULL) {
3067                         /* we failed to send the control, treat this as 
3068                            an error and try again next iteration
3069                         */                      
3070                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
3071                         talloc_free(mem_ctx);
3072                         return MONITOR_FAILED;
3073                 }
3074
3075                 /* set up the callback functions */
3076                 state->async.fn = verify_recmaster_callback;
3077                 state->async.private_data = rmdata;
3078
3079                 /* one more control to wait for to complete */
3080                 rmdata->count++;
3081         }
3082
3083
3084         /* now wait for up to the maximum number of seconds allowed
3085            or until all nodes we expect a response from have replied
3086         */
3087         while (rmdata->count > 0) {
3088                 tevent_loop_once(ctdb->ev);
3089         }
3090
3091         status = rmdata->status;
3092         talloc_free(mem_ctx);
3093         return status;
3094 }
3095
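/*
  Check whether the local node's public network interfaces have changed
  (count, names or link states) since the previous call.  The latest list
  is cached on rec->ifaces so the next call compares against it.
 */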
3096 static bool interfaces_have_changed(struct ctdb_context *ctdb,
3097                                     struct ctdb_recoverd *rec)
3098 {
3099         struct ctdb_iface_list_old *ifaces = NULL;
3100         TALLOC_CTX *mem_ctx;
3101         bool ret = false;
3102
3103         mem_ctx = talloc_new(NULL);
3104
3105         /* Read the interfaces from the local node */
3106         if (ctdb_ctrl_get_ifaces(ctdb, CONTROL_TIMEOUT(),
3107                                  CTDB_CURRENT_NODE, mem_ctx, &ifaces) != 0) {
3108                 DEBUG(DEBUG_ERR, ("Unable to get interfaces from local node %u\n", ctdb->pnn));
3109                 /* We could return an error.  However, this will be
3110                  * rare so we'll decide that the interfaces have
3111                  * actually changed, just in case.
3112                  */
3113                 talloc_free(mem_ctx);
3114                 return true;
3115         }
3116
3117         if (!rec->ifaces) {
3118                 /* We haven't been here before so things have changed */
3119                 DEBUG(DEBUG_NOTICE, ("Initial interface fetched\n"));
3120                 ret = true;
3121         } else if (rec->ifaces->num != ifaces->num) {
3122                 /* Number of interfaces has changed */
3123                 DEBUG(DEBUG_NOTICE, ("Interface count changed from %d to %d\n",
3124                                      rec->ifaces->num, ifaces->num));
3125                 ret = true;
3126         } else {
3127                 /* See if interface names or link states have changed */
3128                 int i;
3129                 for (i = 0; i < rec->ifaces->num; i++) {
3130                         struct ctdb_iface * iface = &rec->ifaces->ifaces[i];
3131                         if (strcmp(iface->name, ifaces->ifaces[i].name) != 0) {
3132                                 DEBUG(DEBUG_NOTICE,
3133                                       ("Interface in slot %d changed: %s => %s\n",
3134                                        i, iface->name, ifaces->ifaces[i].name));
3135                                 ret = true;
3136                                 break;
3137                         }
3138                         if (iface->link_state != ifaces->ifaces[i].link_state) {
3139                                 DEBUG(DEBUG_NOTICE,
3140                                       ("Interface %s changed state: %d => %d\n",
3141                                        iface->name, iface->link_state,
3142                                        ifaces->ifaces[i].link_state));
3143                                 ret = true;
3144                                 break;
3145                         }
3146                 }
3147         }
3148
3149         talloc_free(rec->ifaces);
3150         rec->ifaces = talloc_steal(rec, ifaces);
3151
3152         talloc_free(mem_ctx);
3153         return ret;
3154 }
3155
3156 /* called to check that the local allocation of public ip addresses is ok.
3157 */
3158 static int verify_local_ip_allocation(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, uint32_t pnn, struct ctdb_node_map_old *nodemap)
3159 {
3160         TALLOC_CTX *mem_ctx = talloc_new(NULL);
3161         int ret, j;
3162         bool need_takeover_run = false;
3163
3164         if (interfaces_have_changed(ctdb, rec)) {
3165                 DEBUG(DEBUG_NOTICE, ("The interface status has changed on "
3166                                      "local node %u - force takeover run\n",
3167                                      pnn));
3168                 need_takeover_run = true;
3169         }
3170
3171         /* verify that we have the IP addresses we should have
3172            and we don't have ones we shouldn't have.
3173            if we find an inconsistency we set recmode to
3174            active on the local node and wait for the recmaster
3175            to do a full-blown recovery.
3176            also, if an IP's pnn is -1 and we are healthy and can host the
3177            IP, we request an IP reallocation.
3178         */
3179         if (ctdb->tunable.disable_ip_failover == 0) {
3180                 struct ctdb_public_ip_list_old *ips = NULL;
3181
3182                 /* read the *available* IPs from the local node */
3183                 ret = ctdb_ctrl_get_public_ips_flags(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE, &ips);
3184                 if (ret != 0) {
3185                         DEBUG(DEBUG_ERR, ("Unable to get available public IPs from local node %u\n", pnn));
3186                         talloc_free(mem_ctx);
3187                         return -1;
3188                 }
3189
3190                 for (j=0; j<ips->num; j++) {
3191                         if (ips->ips[j].pnn == -1 &&
3192                             nodemap->nodes[pnn].flags == 0) {
3193                                 DEBUG(DEBUG_CRIT,("Public IP '%s' is not assigned and we could serve it\n",
3194                                                   ctdb_addr_to_str(&ips->ips[j].addr)));
3195                                 need_takeover_run = true;
3196                         }
3197                 }
3198
3199                 talloc_free(ips);
3200
3201                 /* read the *known* IPs from the local node */
3202                 ret = ctdb_ctrl_get_public_ips_flags(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, 0, &ips);
3203                 if (ret != 0) {
3204                         DEBUG(DEBUG_ERR, ("Unable to get known public IPs from local node %u\n", pnn));
3205                         talloc_free(mem_ctx);
3206                         return -1;
3207                 }
3208
3209                 for (j=0; j<ips->num; j++) {
3210                         if (ips->ips[j].pnn == pnn) {
3211                                 if (ctdb->do_checkpublicip && !ctdb_sys_have_ip(&ips->ips[j].addr)) {
3212                                         DEBUG(DEBUG_CRIT,("Public IP '%s' is assigned to us but not on an interface\n",
3213                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
3214                                         need_takeover_run = true;
3215                                 }
3216                         } else {
3217                                 if (ctdb->do_checkpublicip &&
3218                                     ctdb_sys_have_ip(&ips->ips[j].addr)) {
3219
3220                                         DEBUG(DEBUG_CRIT,("We are still serving a public IP '%s' that we should not be serving. Removing it\n", 
3221                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
3222
3223                                         if (ctdb_ctrl_release_ip(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ips->ips[j]) != 0) {
3224                                                 DEBUG(DEBUG_ERR,("Failed to release local IP address\n"));
3225                                         }
3226                                 }
3227                         }
3228                 }
3229         }
3230
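        /* If anything looked inconsistent, ask the recovery master to
         * schedule a takeover run; only the recmaster processes these
         * requests.
         */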
3231         if (need_takeover_run) {
3232                 struct ctdb_srvid_message rd;
3233                 TDB_DATA data;
3234
3235                 DEBUG(DEBUG_CRIT,("Trigger takeoverrun\n"));
3236
3237                 rd.pnn = ctdb->pnn;
3238                 rd.srvid = 0;
3239                 data.dptr = (uint8_t *)&rd;
3240                 data.dsize = sizeof(rd);
3241
3242                 ret = ctdb_client_send_message(ctdb, rec->recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
3243                 if (ret != 0) {
3244                         DEBUG(DEBUG_ERR,(__location__ " Failed to send ipreallocate to recmaster :%d\n", (int)rec->recmaster));
3245                 }
3246         }
3247         talloc_free(mem_ctx);
3248         return 0;
3249 }
3250
3251
3252 static void async_getnodemap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
3253 {
3254         struct ctdb_node_map_old **remote_nodemaps = callback_data;
3255
3256         if (node_pnn >= ctdb->num_nodes) {
3257                 DEBUG(DEBUG_ERR,(__location__ " pnn from invalid node\n"));
3258                 return;
3259         }
3260
3261         remote_nodemaps[node_pnn] = (struct ctdb_node_map_old *)talloc_steal(remote_nodemaps, outdata.dptr);
3262
3263 }
3264
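/*
  Fetch the nodemap from every active node in parallel.  Each reply is
  stored in remote_nodemaps[], indexed by the replying node's PNN, by
  async_getnodemap_callback() above.
 */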
3265 static int get_remote_nodemaps(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
3266         struct ctdb_node_map_old *nodemap,
3267         struct ctdb_node_map_old **remote_nodemaps)
3268 {
3269         uint32_t *nodes;
3270
3271         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
3272         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_NODEMAP,
3273                                         nodes, 0,
3274                                         CONTROL_TIMEOUT(), false, tdb_null,
3275                                         async_getnodemap_callback,
3276                                         NULL,
3277                                         remote_nodemaps) != 0) {
3278                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull all remote nodemaps\n"));
3279
3280                 return -1;
3281         }
3282
3283         return 0;
3284 }
3285
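/*
  Re-read the recovery lock file setting from the main daemon.  If it has
  been disabled, enabled or changed, update our copy and drop any lock we
  hold so that the lock is re-taken on the new file.
 */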
3286 static int update_recovery_lock_file(struct ctdb_context *ctdb)
3287 {
3288         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3289         const char *reclockfile;
3290
3291         if (ctdb_ctrl_getreclock(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &reclockfile) != 0) {
3292                 DEBUG(DEBUG_ERR,("Failed to read reclock file from daemon\n"));
3293                 talloc_free(tmp_ctx);
3294                 return -1;      
3295         }
3296
3297         if (reclockfile == NULL) {
3298                 if (ctdb->recovery_lock_file != NULL) {
3299                         DEBUG(DEBUG_NOTICE,("Recovery lock file disabled\n"));
3300                         talloc_free(ctdb->recovery_lock_file);
3301                         ctdb->recovery_lock_file = NULL;
3302                         ctdb_recovery_unlock(ctdb);
3303                 }
3304                 talloc_free(tmp_ctx);
3305                 return 0;
3306         }
3307
3308         if (ctdb->recovery_lock_file == NULL) {
3309                 DEBUG(DEBUG_NOTICE,
3310                       ("Recovery lock file enabled (%s)\n", reclockfile));
3311                 ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
3312                 ctdb_recovery_unlock(ctdb);
3313                 talloc_free(tmp_ctx);
3314                 return 0;
3315         }
3316
3317
3318         if (!strcmp(reclockfile, ctdb->recovery_lock_file)) {
3319                 talloc_free(tmp_ctx);
3320                 return 0;
3321         }
3322
3323         DEBUG(DEBUG_NOTICE,
3324               ("Recovery lock file changed (now %s)\n", reclockfile));
3325         talloc_free(ctdb->recovery_lock_file);
3326         ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
3327         ctdb_recovery_unlock(ctdb);
3328
3329         talloc_free(tmp_ctx);
3330         return 0;
3331 }
3332
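/*
  One pass of the recovery daemon's monitoring logic: verify that the main
  daemon is alive, refresh tunables, runstate and the nodemap, make sure a
  sane recovery master exists (forcing an election if not) and, if we are
  the recmaster, cross-check nodemaps, flags and vnnmaps across the cluster,
  starting a recovery or takeover run when an inconsistency is found.
 */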
3333 static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
3334                       TALLOC_CTX *mem_ctx)
3335 {
3336         uint32_t pnn;
3337         struct ctdb_node_map_old *nodemap=NULL;
3338         struct ctdb_node_map_old *recmaster_nodemap=NULL;
3339         struct ctdb_node_map_old **remote_nodemaps=NULL;
3340         struct ctdb_vnn_map *vnnmap=NULL;
3341         struct ctdb_vnn_map *remote_vnnmap=NULL;
3342         uint32_t num_lmasters;
3343         int32_t debug_level;
3344         int i, j, ret;
3345         bool self_ban;
3346
3347
3348         /* verify that the main daemon is still running */
3349         if (ctdb_kill(ctdb, ctdb->ctdbd_pid, 0) != 0) {
3350                 DEBUG(DEBUG_CRIT,("CTDB daemon is no longer available. Shutting down recovery daemon\n"));
3351                 exit(-1);
3352         }
3353
3354         /* ping the local daemon to tell it we are alive */
3355         ctdb_ctrl_recd_ping(ctdb);
3356
3357         if (rec->election_timeout) {
3358                 /* an election is in progress */
3359                 return;
3360         }
3361
3362         /* read the debug level from the parent and update locally */
3363         ret = ctdb_ctrl_get_debuglevel(ctdb, CTDB_CURRENT_NODE, &debug_level);
3364         if (ret !=0) {
3365                 DEBUG(DEBUG_ERR, (__location__ " Failed to read debuglevel from parent\n"));
3366                 return;
3367         }
3368         DEBUGLEVEL = debug_level;
3369
3370         /* get relevant tunables */
3371         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
3372         if (ret != 0) {
3373                 DEBUG(DEBUG_ERR,("Failed to get tunables - retrying\n"));
3374                 return;
3375         }
3376
3377         /* get runstate */
3378         ret = ctdb_ctrl_get_runstate(ctdb, CONTROL_TIMEOUT(),
3379                                      CTDB_CURRENT_NODE, &ctdb->runstate);
3380         if (ret != 0) {
3381                 DEBUG(DEBUG_ERR, ("Failed to get runstate - retrying\n"));
3382                 return;
3383         }
3384
3385         /* get the current recovery lock file from the server */
3386         if (update_recovery_lock_file(ctdb) != 0) {
3387                 DEBUG(DEBUG_ERR,("Failed to update the recovery lock file\n"));
3388                 return;
3389         }
3390
3391         pnn = ctdb_get_pnn(ctdb);
3392
3393         /* get nodemap */
3394         TALLOC_FREE(rec->nodemap);
3395         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, rec, &rec->nodemap);
3396         if (ret != 0) {
3397                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", pnn));
3398                 return;
3399         }
3400         nodemap = rec->nodemap;
3401
3402         /* remember our own node flags */
3403         rec->node_flags = nodemap->nodes[pnn].flags;
3404
3405         ban_misbehaving_nodes(rec, &self_ban);
3406         if (self_ban) {
3407                 DEBUG(DEBUG_NOTICE, ("This node was banned, restart main_loop\n"));
3408                 return;
3409         }
3410
3411         /* if the local daemon is STOPPED or BANNED, we verify that the databases are
3412            also frozen and that the recmode is set to active.
3413         */
3414         if (rec->node_flags & (NODE_FLAGS_STOPPED | NODE_FLAGS_BANNED)) {
3415                 /* If this node has become inactive then we want to
3416                  * reduce the chances of it taking over the recovery
3417                  * master role when it becomes active again.  This
3418                  * helps to stabilise the recovery master role so that
3419                  * it stays on the most stable node.
3420                  */
3421                 rec->priority_time = timeval_current();
3422
3423                 ret = ctdb_ctrl_getrecmode(ctdb, mem_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->recovery_mode);
3424                 if (ret != 0) {
3425                         DEBUG(DEBUG_ERR,(__location__ " Failed to read recmode from local node\n"));
3426                 }
3427                 if (ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
3428                         DEBUG(DEBUG_ERR,("Node is stopped or banned but recovery mode is not active. Activate recovery mode and lock databases\n"));
3429
3430                         ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
3431                         if (ret != 0) {
3432                                 DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode in STOPPED or BANNED state\n"));
3433
3434                                 return;
3435                         }
3436                         ret = ctdb_ctrl_freeze(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
3437                         if (ret != 0) {
3438                                 DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node in STOPPED or BANNED state\n"));
3439                                 return;
3440                         }
3441                 }
3442
3443                 /* If this node is stopped or banned then it is not the recovery
3444                  * master, so don't do anything. This prevents a stopped or banned
3445                  * node from starting an election and sending unnecessary controls.
3446                  */
3447                 return;
3448         }
3449
3450         /* If we are not the recmaster then do some housekeeping */
3451         if (rec->recmaster != pnn) {
3452                 /* Ignore any IP reallocate requests - only recmaster
3453                  * processes them
3454                  */
3455                 TALLOC_FREE(rec->reallocate_requests);
3456                 /* Clear any nodes that should be force rebalanced in
3457                  * the next takeover run.  If the recovery master role
3458                  * has moved then we don't want to process these some
3459                  * time in the future.
3460                  */
3461                 TALLOC_FREE(rec->force_rebalance_nodes);
3462         }
3463
3464         /* Retrieve capabilities from all connected nodes */
3465         ret = update_capabilities(rec, nodemap);
3466         if (ret != 0) {
3467                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
3468                 return;
3469         }
3470
3471         /* When recovery daemon is started, recmaster is set to
3472          * "unknown" so it knows to start an election.
3473          */
3474         if (rec->recmaster == CTDB_UNKNOWN_PNN) {
3475                 DEBUG(DEBUG_NOTICE,(__location__ " Initial recovery master set - forcing election\n"));
3476                 force_election(rec, pnn, nodemap);
3477                 return;
3478         }
3479
3480         /*
3481          * If the current recmaster does not have CTDB_CAP_RECMASTER,
3482          * but we have, then force an election and try to become the new
3483          * recmaster.
3484          */
3485         if (!ctdb_node_has_capabilities(rec->caps,
3486                                         rec->recmaster,
3487                                         CTDB_CAP_RECMASTER) &&
3488             (rec->ctdb->capabilities & CTDB_CAP_RECMASTER) &&
3489             !(nodemap->nodes[pnn].flags & NODE_FLAGS_INACTIVE)) {
3490                 DEBUG(DEBUG_ERR, (__location__ " Current recmaster node %u does not have CAP_RECMASTER,"
3491                                   " but we (node %u) have - force an election\n",
3492                                   rec->recmaster, pnn));
3493                 force_election(rec, pnn, nodemap);
3494                 return;
3495         }
3496
3497         /* Verify that the master node has not been deleted.  This
3498          * should not happen because a node should always be shutdown
3499          * before being deleted, causing a new master to be elected
3500          * before now.  However, if something strange has happened
3501          * then checking here will ensure we don't index beyond the
3502          * end of the nodemap array. */
3503         if (rec->recmaster >= nodemap->num) {
3504                 DEBUG(DEBUG_ERR,
3505                       ("Recmaster node %u has been deleted. Force election\n",
3506                        rec->recmaster));
3507                 force_election(rec, pnn, nodemap);
3508                 return;
3509         }
3510
3511         /* if recovery master is disconnected/deleted we must elect a new recmaster */
3512         if (nodemap->nodes[rec->recmaster].flags &
3513             (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_DELETED)) {
3514                 DEBUG(DEBUG_NOTICE,
3515                       ("Recmaster node %u is disconnected/deleted. Force election\n",
3516                        rec->recmaster));
3517                 force_election(rec, pnn, nodemap);
3518                 return;
3519         }
3520
3521         /* get nodemap from the recovery master to check if it is inactive */
3522         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), rec->recmaster,
3523                                    mem_ctx, &recmaster_nodemap);
3524         if (ret != 0) {
3525                 DEBUG(DEBUG_ERR,
3526                       (__location__
3527                        " Unable to get nodemap from recovery master %u\n",
3528                           rec->recmaster));
3529                 return;
3530         }
3531
3532
3533         if ((recmaster_nodemap->nodes[rec->recmaster].flags & NODE_FLAGS_INACTIVE) &&
3534             (rec->node_flags & NODE_FLAGS_INACTIVE) == 0) {
3535                 DEBUG(DEBUG_NOTICE,
3536                       ("Recmaster node %u is inactive. Force election\n",
3537                        rec->recmaster));
3538                 /*
3539                  * update our nodemap to carry the recmaster's notion of
3540                  * its own flags, so that we don't keep freezing the
3541                  * inactive recmaster node...
3542                  */
3543                 nodemap->nodes[rec->recmaster].flags =
3544                         recmaster_nodemap->nodes[rec->recmaster].flags;
3545                 force_election(rec, pnn, nodemap);
3546                 return;
3547         }
3548
3549         /* verify that we have all the IP addresses we should have and we don't
3550          * have addresses we shouldn't have.
3551          */ 
3552         if (ctdb->tunable.disable_ip_failover == 0 &&
3553             !ctdb_op_is_disabled(rec->takeover_run)) {
3554                 if (verify_local_ip_allocation(ctdb, rec, pnn, nodemap) != 0) {
3555                         DEBUG(DEBUG_ERR, (__location__ " Public IPs were inconsistent.\n"));
3556                 }
3557         }
3558
3559
3560         /* if we are not the recmaster then we do not need to check
3561            if recovery is needed
3562          */
3563         if (pnn != rec->recmaster) {
3564                 return;
3565         }
3566
3567
3568         /* ensure our local copies of flags are right */
3569         ret = update_local_flags(rec, nodemap);
3570         if (ret == MONITOR_ELECTION_NEEDED) {
3571                 DEBUG(DEBUG_NOTICE,("update_local_flags() called for a re-election.\n"));
3572                 force_election(rec, pnn, nodemap);
3573                 return;
3574         }
3575         if (ret != MONITOR_OK) {
3576                 DEBUG(DEBUG_ERR,("Unable to update local flags\n"));
3577                 return;
3578         }
3579
3580         if (ctdb->num_nodes != nodemap->num) {
3581                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) reloading nodes file\n", ctdb->num_nodes, nodemap->num));
3582                 ctdb_load_nodes_file(ctdb);
3583                 return;
3584         }
3585
3586         /* verify that all active nodes agree that we are the recmaster */
3587         switch (verify_recmaster(rec, nodemap, pnn)) {
3588         case MONITOR_RECOVERY_NEEDED:
3589                 /* can not happen */
3590                 return;
3591         case MONITOR_ELECTION_NEEDED:
3592                 force_election(rec, pnn, nodemap);
3593                 return;
3594         case MONITOR_OK:
3595                 break;
3596         case MONITOR_FAILED:
3597                 return;
3598         }
3599
3600
3601         /* get the vnnmap */
3602         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
3603         if (ret != 0) {
3604                 DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from node %u\n", pnn));
3605                 return;
3606         }
3607
3608         if (rec->need_recovery) {
3609                 /* a previous recovery didn't finish */
3610                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3611                 return;
3612         }
3613
3614         /* verify that all active nodes are in normal mode 
3615            and not in recovery mode 
3616         */
3617         switch (verify_recmode(ctdb, nodemap)) {
3618         case MONITOR_RECOVERY_NEEDED:
3619                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3620                 return;
3621         case MONITOR_FAILED:
3622                 return;
3623         case MONITOR_ELECTION_NEEDED:
3624                 /* can not happen */
3625         case MONITOR_OK:
3626                 break;
3627         }
3628
3629
3630         if (ctdb->recovery_lock_file != NULL) {
3631                 /* We must already hold the recovery lock */
3632                 if (!ctdb_recovery_have_lock(ctdb)) {
3633                         DEBUG(DEBUG_ERR,("Failed recovery lock sanity check.  Force a recovery\n"));
3634                         ctdb_set_culprit(rec, ctdb->pnn);
3635                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3636                         return;
3637                 }
3638         }
3639
3640
3641         /* if there are takeovers requested, perform it and notify the waiters */
3642         if (!ctdb_op_is_disabled(rec->takeover_run) &&
3643             rec->reallocate_requests) {
3644                 process_ipreallocate_requests(ctdb, rec);
3645         }
3646
3647         /* If recoveries are disabled then there is no use doing any
3648          * nodemap or flags checks.  Recoveries might be disabled due
3649          * to "reloadnodes", so doing these checks might cause an
3650          * unnecessary recovery.  */
3651         if (ctdb_op_is_disabled(rec->recovery)) {
3652                 return;
3653         }
3654
3655         /* get the nodemap for all active remote nodes
3656          */
3657         remote_nodemaps = talloc_array(mem_ctx, struct ctdb_node_map_old *, nodemap->num);
3658         if (remote_nodemaps == NULL) {
3659                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate remote nodemap array\n"));
3660                 return;
3661         }
3662         for(i=0; i<nodemap->num; i++) {
3663                 remote_nodemaps[i] = NULL;
3664         }
3665         if (get_remote_nodemaps(ctdb, mem_ctx, nodemap, remote_nodemaps) != 0) {
3666                 DEBUG(DEBUG_ERR,(__location__ " Failed to read remote nodemaps\n"));
3667                 return;
3668         } 
3669
3670         /* verify that all other nodes have the same nodemap as we have
3671         */
3672         for (j=0; j<nodemap->num; j++) {
3673                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3674                         continue;
3675                 }
3676
3677                 if (remote_nodemaps[j] == NULL) {
3678                         DEBUG(DEBUG_ERR,(__location__ " Did not get a remote nodemap for node %d, restarting monitoring\n", j));
3679                         ctdb_set_culprit(rec, j);
3680
3681                         return;
3682                 }
3683
3684                 /* if the nodes disagree on how many nodes there are
3685                    then this is a good reason to try recovery
3686                  */
3687                 if (remote_nodemaps[j]->num != nodemap->num) {
3688                         DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
3689                                   nodemap->nodes[j].pnn, remote_nodemaps[j]->num, nodemap->num));
3690                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3691                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3692                         return;
3693                 }
3694
3695                 /* if the nodes disagree on which nodes exist and are
3696                    active, then that is also a good reason to do recovery
3697                  */
3698                 for (i=0;i<nodemap->num;i++) {
3699                         if (remote_nodemaps[j]->nodes[i].pnn != nodemap->nodes[i].pnn) {
3700                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
3701                                           nodemap->nodes[j].pnn, i, 
3702                                           remote_nodemaps[j]->nodes[i].pnn, nodemap->nodes[i].pnn));
3703                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3704                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3705                                             vnnmap);
3706                                 return;
3707                         }
3708                 }
3709         }
3710
3711         /*
3712          * Update node flags obtained from each active node. This ensures we have
3713          * up-to-date information for all the nodes.
3714          */
3715         for (j=0; j<nodemap->num; j++) {
3716                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3717                         continue;
3718                 }
3719                 nodemap->nodes[j].flags = remote_nodemaps[j]->nodes[j].flags;
3720         }
3721
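        /* Cross-check every node's flags against each remote nodemap.  If a
         * node disagrees about its own flags, its own view wins; otherwise
         * the local (recmaster) view wins.  Either way the chosen flags are
         * pushed to all nodes and a recovery is started.
         */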
3722         for (j=0; j<nodemap->num; j++) {
3723                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3724                         continue;
3725                 }
3726
3727                 /* verify the flags are consistent
3728                 */
3729                 for (i=0; i<nodemap->num; i++) {
3730                         if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3731                                 continue;
3732                         }
3733                         
3734                         if (nodemap->nodes[i].flags != remote_nodemaps[j]->nodes[i].flags) {
3735                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different flags for node %u. It has 0x%02x vs our 0x%02x\n", 
3736                                   nodemap->nodes[j].pnn, 
3737                                   nodemap->nodes[i].pnn, 
3738                                   remote_nodemaps[j]->nodes[i].flags,
3739                                   nodemap->nodes[i].flags));
3740                                 if (i == j) {
3741                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from remote node %d for cluster update of its own flags\n", remote_nodemaps[j]->nodes[i].flags, j));
3742                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, remote_nodemaps[j]->nodes[i].flags);
3743                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3744                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3745                                                     vnnmap);
3746                                         return;
3747                                 } else {
3748                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from local recmaster node for cluster update of node %d flags\n", nodemap->nodes[i].flags, i));
3749                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, nodemap->nodes[i].flags);
3750                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3751                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3752                                                     vnnmap);
3753                                         return;
3754                                 }
3755                         }
3756                 }
3757         }
3758
3759
3760         /* count how many active nodes have the lmaster capability */
3761         num_lmasters  = 0;
3762         for (i=0; i<nodemap->num; i++) {
3763                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
3764                         if (ctdb_node_has_capabilities(rec->caps,
3765                                                        ctdb->nodes[i]->pnn,
3766                                                        CTDB_CAP_LMASTER)) {
3767                                 num_lmasters++;
3768                         }
3769                 }
3770         }
3771
3772
3773         /* There must be the same number of lmasters in the vnn map as
3774          * there are active nodes with the lmaster capability...  or
3775          * do a recovery.
3776          */
3777         if (vnnmap->size != num_lmasters) {
3778                 DEBUG(DEBUG_ERR, (__location__ " The vnnmap count is different from the number of active lmaster nodes: %u vs %u\n",
3779                           vnnmap->size, num_lmasters));
3780                 ctdb_set_culprit(rec, ctdb->pnn);
3781                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3782                 return;
3783         }
3784
3785         /* verify that all active nodes in the nodemap also exist in 
3786            the vnnmap.
3787          */
3788         for (j=0; j<nodemap->num; j++) {
3789                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3790                         continue;
3791                 }
3792                 if (nodemap->nodes[j].pnn == pnn) {
3793                         continue;
3794                 }
3795
3796                 for (i=0; i<vnnmap->size; i++) {
3797                         if (vnnmap->map[i] == nodemap->nodes[j].pnn) {
3798                                 break;
3799                         }
3800                 }
3801                 if (i == vnnmap->size) {
3802                         DEBUG(DEBUG_ERR, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", 
3803                                   nodemap->nodes[j].pnn));
3804                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3805                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3806                         return;
3807                 }
3808         }
3809
3810         
3811         /* verify that all other nodes have the same vnnmap
3812            and are from the same generation
3813          */
3814         for (j=0; j<nodemap->num; j++) {
3815                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3816                         continue;
3817                 }
3818                 if (nodemap->nodes[j].pnn == pnn) {
3819                         continue;
3820                 }
3821
3822                 ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3823                                           mem_ctx, &remote_vnnmap);
3824                 if (ret != 0) {
3825                         DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from remote node %u\n", 
3826                                   nodemap->nodes[j].pnn));
3827                         return;
3828                 }
3829
3830                 /* verify the vnnmap generation is the same */
3831                 if (vnnmap->generation != remote_vnnmap->generation) {
3832                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
3833                                   nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
3834                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3835                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3836                         return;
3837                 }
3838
3839                 /* verify the vnnmap size is the same */
3840                 if (vnnmap->size != remote_vnnmap->size) {
3841                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
3842                                   nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
3843                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3844                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3845                         return;
3846                 }
3847
3848                 /* verify the vnnmap is the same */
3849                 for (i=0;i<vnnmap->size;i++) {
3850                         if (remote_vnnmap->map[i] != vnnmap->map[i]) {
3851                                 DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different vnnmap.\n", 
3852                                           nodemap->nodes[j].pnn));
3853                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3854                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3855                                             vnnmap);
3856                                 return;
3857                         }
3858                 }
3859         }
3860
3861         /* we might need to change who has what IP assigned */
3862         if (rec->need_takeover_run) {
3863                 /* If takeover run fails, then the offending nodes are
3864                  * assigned ban culprit counts. And we re-try takeover.
3865                  * If takeover run fails repeatedly, the node would get
3866                  * banned.
3867                  */
3868                 do_takeover_run(rec, nodemap, true);
3869         }
3870 }
3871
3872 /*
3873   the main monitoring loop
3874  */
3875 static void monitor_cluster(struct ctdb_context *ctdb)
3876 {
3877         struct ctdb_recoverd *rec;
3878
3879         DEBUG(DEBUG_NOTICE,("monitor_cluster starting\n"));
3880
3881         rec = talloc_zero(ctdb, struct ctdb_recoverd);
3882         CTDB_NO_MEMORY_FATAL(ctdb, rec);
3883
3884         rec->ctdb = ctdb;
3885         rec->recmaster = CTDB_UNKNOWN_PNN;
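        /* Starting with an unknown recmaster makes the first pass of
         * main_loop() force an election.
         */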
3886
3887         rec->takeover_run = ctdb_op_init(rec, "takeover runs");
3888         CTDB_NO_MEMORY_FATAL(ctdb, rec->takeover_run);
3889
3890         rec->recovery = ctdb_op_init(rec, "recoveries");
3891         CTDB_NO_MEMORY_FATAL(ctdb, rec->recovery);
3892
3893         rec->priority_time = timeval_current();
3894
3895         /* register a message port for sending memory dumps */
3896         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_MEM_DUMP, mem_dump_handler, rec);
3897
3898         /* register a message port for recovery elections */
3899         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_ELECTION, election_handler, rec);
3900
3901         /* when nodes are disabled/enabled */
3902         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_SET_NODE_FLAGS, monitor_handler, rec);
3903
3904         /* when we are asked to push out a flag change */
3905         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_PUSH_NODE_FLAGS, push_flags_handler, rec);
3906
3907         /* register a message port for vacuum fetch */
3908         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_VACUUM_FETCH, vacuum_fetch_handler, rec);
3909
3910         /* register a message port for reloadnodes  */
3911         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RELOAD_NODES, reload_nodes_handler, rec);
3912
3913         /* register a message port for performing a takeover run */
3914         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_TAKEOVER_RUN, ip_reallocate_handler, rec);
3915
3916         /* register a message port for disabling the ip check for a short while */
3917         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_DISABLE_IP_CHECK, disable_ip_check_handler, rec);
3918
3919         /* register a message port for updating the recovery daemons node assignment for an ip */
3920         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECD_UPDATE_IP, recd_update_ip_handler, rec);
3921
3922         /* register a message port for forcing a rebalance of a node next
3923            reallocation */
3924         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_REBALANCE_NODE, recd_node_rebalance_handler, rec);
3925
3926         /* Register a message port for disabling takeover runs */
3927         ctdb_client_set_message_handler(ctdb,
3928                                         CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
3929                                         disable_takeover_runs_handler, rec);
3930
3931         /* Register a message port for disabling recoveries */
3932         ctdb_client_set_message_handler(ctdb,
3933                                         CTDB_SRVID_DISABLE_RECOVERIES,
3934                                         disable_recoveries_handler, rec);
3935
3936         /* register a message port for detaching database */
3937         ctdb_client_set_message_handler(ctdb,
3938                                         CTDB_SRVID_DETACH_DATABASE,
3939                                         detach_database_handler, rec);
3940
3941         for (;;) {
3942                 TALLOC_CTX *mem_ctx = talloc_new(ctdb);
3943                 struct timeval start;
3944                 double elapsed;
3945
3946                 if (!mem_ctx) {
3947                         DEBUG(DEBUG_CRIT,(__location__
3948                                           " Failed to create temp context\n"));
3949                         exit(-1);
3950                 }
3951
3952                 start = timeval_current();
3953                 main_loop(ctdb, rec, mem_ctx);
3954                 talloc_free(mem_ctx);
3955
3956                 /* we only check for recovery once every second */
3957                 elapsed = timeval_elapsed(&start);
3958                 if (elapsed < ctdb->tunable.recover_interval) {
3959                         ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval
3960                                           - elapsed);
3961                 }
3962         }
3963 }
3964
3965 /*
3966   event handler for when the main ctdbd dies
3967  */
3968 static void ctdb_recoverd_parent(struct tevent_context *ev,
3969                                  struct tevent_fd *fde,
3970                                  uint16_t flags, void *private_data)
3971 {
3972         DEBUG(DEBUG_ALERT,("recovery daemon parent died - exiting\n"));
3973         _exit(1);
3974 }
3975
3976 /*
3977   called regularly to verify that the recovery daemon is still running
3978  */
3979 static void ctdb_check_recd(struct tevent_context *ev,
3980                             struct tevent_timer *te,
3981                             struct timeval yt, void *p)
3982 {
3983         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
3984
3985         if (ctdb_kill(ctdb, ctdb->recoverd_pid, 0) != 0) {
3986                 DEBUG(DEBUG_ERR,("Recovery daemon (pid:%d) is no longer running. Trying to restart recovery daemon.\n", (int)ctdb->recoverd_pid));
3987
3988                 tevent_add_timer(ctdb->ev, ctdb, timeval_zero(),
3989                                  ctdb_restart_recd, ctdb);
3990
3991                 return;
3992         }
3993
3994         tevent_add_timer(ctdb->ev, ctdb->recd_ctx,
3995                          timeval_current_ofs(30, 0),
3996                          ctdb_check_recd, ctdb);
3997 }
3998
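/*
  SIGCHLD handler for the recovery daemon: reap any exited child processes
  so they do not remain as zombies.
 */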
3999 static void recd_sig_child_handler(struct tevent_context *ev,
4000                                    struct tevent_signal *se, int signum,
4001                                    int count, void *dont_care,
4002                                    void *private_data)
4003 {
4004 //      struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
4005         int status;
4006         pid_t pid = -1;
4007
4008         while (pid != 0) {
4009                 pid = waitpid(-1, &status, WNOHANG);
4010                 if (pid == -1) {
4011                         if (errno != ECHILD) {
4012                                 DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%s(%d)\n", strerror(errno),errno));
4013                         }
4014                         return;
4015                 }
4016                 if (pid > 0) {
4017                         DEBUG(DEBUG_DEBUG, ("RECD SIGCHLD from %d\n", (int)pid));
4018                 }
4019         }
4020 }
4021
4022 /*
4023   startup the recovery daemon as a child of the main ctdb daemon
4024  */
4025 int ctdb_start_recoverd(struct ctdb_context *ctdb)
4026 {
4027         int fd[2];
4028         struct tevent_signal *se;
4029         struct tevent_fd *fde;
4030
4031         if (pipe(fd) != 0) {
4032                 return -1;
4033         }
4034
4035         ctdb->recoverd_pid = ctdb_fork(ctdb);
4036         if (ctdb->recoverd_pid == -1) {
4037                 return -1;
4038         }
4039
4040         if (ctdb->recoverd_pid != 0) {
4041                 talloc_free(ctdb->recd_ctx);
4042                 ctdb->recd_ctx = talloc_new(ctdb);
4043                 CTDB_NO_MEMORY(ctdb, ctdb->recd_ctx);
4044
4045                 close(fd[0]);
4046                 tevent_add_timer(ctdb->ev, ctdb->recd_ctx,
4047                                  timeval_current_ofs(30, 0),
4048                                  ctdb_check_recd, ctdb);
4049                 return 0;
4050         }
4051
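        /* Child process: from here on we become the recovery daemon itself. */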
4052         close(fd[1]);
4053
4054         srandom(getpid() ^ time(NULL));
4055
4056         ctdb_set_process_name("ctdb_recoverd");
4057         if (switch_from_server_to_client(ctdb, "recoverd") != 0) {
4058                 DEBUG(DEBUG_CRIT, (__location__ "ERROR: failed to switch recovery daemon into client mode. shutting down.\n"));
4059                 exit(1);
4060         }
4061
4062         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to recovery daemon\n", fd[0]));
4063
4064         fde = tevent_add_fd(ctdb->ev, ctdb, fd[0], TEVENT_FD_READ,
4065                             ctdb_recoverd_parent, &fd[0]);
4066         tevent_fd_set_auto_close(fde);
4067
4068         /* set up a handler to pick up sigchld */
4069         se = tevent_add_signal(ctdb->ev, ctdb, SIGCHLD, 0,
4070                                recd_sig_child_handler, ctdb);
4071         if (se == NULL) {
4072                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD in recovery daemon\n"));
4073                 exit(1);
4074         }
4075
4076         monitor_cluster(ctdb);
4077
4078         DEBUG(DEBUG_ALERT,("ERROR: ctdb_recoverd finished!?\n"));
4079         return -1;
4080 }
4081
4082 /*
4083   shutdown the recovery daemon
4084  */
4085 void ctdb_stop_recoverd(struct ctdb_context *ctdb)
4086 {
4087         if (ctdb->recoverd_pid == 0) {
4088                 return;
4089         }
4090
4091         DEBUG(DEBUG_NOTICE,("Shutting down recovery daemon\n"));
4092         ctdb_kill(ctdb, ctdb->recoverd_pid, SIGTERM);
4093
4094         TALLOC_FREE(ctdb->recd_ctx);
4095         TALLOC_FREE(ctdb->recd_ping_count);
4096 }
4097
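/*
  Timer callback scheduled by ctdb_check_recd() when the recovery daemon has
  died: stop any remains of the old daemon and start a new one.
 */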
4098 static void ctdb_restart_recd(struct tevent_context *ev,
4099                               struct tevent_timer *te,
4100                               struct timeval t, void *private_data)
4101 {
4102         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
4103
4104         DEBUG(DEBUG_ERR,("Restarting recovery daemon\n"));
4105         ctdb_stop_recoverd(ctdb);
4106         ctdb_start_recoverd(ctdb);
4107 }