ctdb/server/ctdb_recoverd.c
1 /* 
2    ctdb recovery daemon
3
4    Copyright (C) Ronnie Sahlberg  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "replace.h"
21 #include "system/filesys.h"
22 #include "system/time.h"
23 #include "system/network.h"
24 #include "system/wait.h"
25
26 #include <popt.h>
27 #include <talloc.h>
28 #include <tevent.h>
29 #include <tdb.h>
30
31 #include "lib/tdb_wrap/tdb_wrap.h"
32 #include "lib/util/dlinklist.h"
33 #include "lib/util/debug.h"
34 #include "lib/util/samba_util.h"
35 #include "lib/util/util_process.h"
36
37 #include "ctdb_private.h"
38 #include "ctdb_client.h"
39
40 #include "common/system.h"
41 #include "common/cmdline.h"
42 #include "common/common.h"
43 #include "common/logging.h"
44
45
46 /* List of SRVID requests that need to be processed */
47 struct srvid_list {
48         struct srvid_list *next, *prev;
49         struct ctdb_srvid_message *request;
50 };
51
52 struct srvid_requests {
53         struct srvid_list *requests;
54 };
55
56 static void srvid_request_reply(struct ctdb_context *ctdb,
57                                 struct ctdb_srvid_message *request,
58                                 TDB_DATA result)
59 {
60         /* Someone that sent srvid==0 does not want a reply */
61         if (request->srvid == 0) {
62                 talloc_free(request);
63                 return;
64         }
65
66         if (ctdb_client_send_message(ctdb, request->pnn, request->srvid,
67                                      result) == 0) {
68                 DEBUG(DEBUG_INFO,("Sent SRVID reply to %u:%llu\n",
69                                   (unsigned)request->pnn,
70                                   (unsigned long long)request->srvid));
71         } else {
72                 DEBUG(DEBUG_ERR,("Failed to send SRVID reply to %u:%llu\n",
73                                  (unsigned)request->pnn,
74                                  (unsigned long long)request->srvid));
75         }
76
77         talloc_free(request);
78 }
79
80 static void srvid_requests_reply(struct ctdb_context *ctdb,
81                                  struct srvid_requests **requests,
82                                  TDB_DATA result)
83 {
84         struct srvid_list *r;
85
86         for (r = (*requests)->requests; r != NULL; r = r->next) {
87                 srvid_request_reply(ctdb, r->request, result);
88         }
89
90         /* Free the list structure... */
91         TALLOC_FREE(*requests);
92 }
93
94 static void srvid_request_add(struct ctdb_context *ctdb,
95                               struct srvid_requests **requests,
96                               struct ctdb_srvid_message *request)
97 {
98         struct srvid_list *t;
99         int32_t ret;
100         TDB_DATA result;
101
102         if (*requests == NULL) {
103                 *requests = talloc_zero(ctdb, struct srvid_requests);
104                 if (*requests == NULL) {
105                         goto nomem;
106                 }
107         }
108
109         t = talloc_zero(*requests, struct srvid_list);
110         if (t == NULL) {
111                 /* If *requests was just allocated above then free it */
112                 if ((*requests)->requests == NULL) {
113                         TALLOC_FREE(*requests);
114                 }
115                 goto nomem;
116         }
117
118         t->request = (struct ctdb_srvid_message *)talloc_steal(t, request);
119         DLIST_ADD((*requests)->requests, t);
120
121         return;
122
123 nomem:
124         /* Failed to add the request to the list.  Send a fail. */
125         DEBUG(DEBUG_ERR, (__location__
126                           " Out of memory, failed to queue SRVID request\n"));
127         ret = -ENOMEM;
128         result.dsize = sizeof(ret);
129         result.dptr = (uint8_t *)&ret;
130         srvid_request_reply(ctdb, request, result);
131 }
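/*
 * Illustrative sketch: how a queued list of SRVID requests is typically
 * answered once the deferred operation completes.  The result is an int32_t
 * status packed into a TDB_DATA blob, matching the "nomem" path above.
 * Function and variable names here are only for illustration.
 */
#if 0
static void example_reply_to_queued_requests(struct ctdb_context *ctdb,
					     struct srvid_requests **requests,
					     int32_t status)
{
	TDB_DATA result;

	result.dsize = sizeof(status);
	result.dptr = (uint8_t *)&status;

	if (*requests != NULL) {
		/* Replies to every queued request and frees the list */
		srvid_requests_reply(ctdb, requests, result);
	}
}
#endif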
132
133 /* An abstraction to allow an operation (takeover runs, recoveries,
134  * ...) to be disabled for a given timeout */
135 struct ctdb_op_state {
136         struct tevent_timer *timer;
137         bool in_progress;
138         const char *name;
139 };
140
141 static struct ctdb_op_state *ctdb_op_init(TALLOC_CTX *mem_ctx, const char *name)
142 {
143         struct ctdb_op_state *state = talloc_zero(mem_ctx, struct ctdb_op_state);
144
145         if (state != NULL) {
146                 state->in_progress = false;
147                 state->name = name;
148         }
149
150         return state;
151 }
152
153 static bool ctdb_op_is_disabled(struct ctdb_op_state *state)
154 {
155         return state->timer != NULL;
156 }
157
158 static bool ctdb_op_begin(struct ctdb_op_state *state)
159 {
160         if (ctdb_op_is_disabled(state)) {
161                 DEBUG(DEBUG_NOTICE,
162                       ("Unable to begin - %s are disabled\n", state->name));
163                 return false;
164         }
165
166         state->in_progress = true;
167         return true;
168 }
169
170 static bool ctdb_op_end(struct ctdb_op_state *state)
171 {
172         return state->in_progress = false;
173 }
174
175 static bool ctdb_op_is_in_progress(struct ctdb_op_state *state)
176 {
177         return state->in_progress;
178 }
179
180 static void ctdb_op_enable(struct ctdb_op_state *state)
181 {
182         TALLOC_FREE(state->timer);
183 }
184
185 static void ctdb_op_timeout_handler(struct tevent_context *ev,
186                                     struct tevent_timer *te,
187                                     struct timeval yt, void *p)
188 {
189         struct ctdb_op_state *state =
190                 talloc_get_type(p, struct ctdb_op_state);
191
192         DEBUG(DEBUG_NOTICE,("Reenabling %s after timeout\n", state->name));
193         ctdb_op_enable(state);
194 }
195
196 static int ctdb_op_disable(struct ctdb_op_state *state,
197                            struct tevent_context *ev,
198                            uint32_t timeout)
199 {
200         if (timeout == 0) {
201                 DEBUG(DEBUG_NOTICE,("Reenabling %s\n", state->name));
202                 ctdb_op_enable(state);
203                 return 0;
204         }
205
206         if (state->in_progress) {
207                 DEBUG(DEBUG_ERR,
208                       ("Unable to disable %s - in progress\n", state->name));
209                 return -EAGAIN;
210         }
211
212         DEBUG(DEBUG_NOTICE,("Disabling %s for %u seconds\n",
213                             state->name, timeout));
214
215         /* Clear any old timers */
216         talloc_free(state->timer);
217
218         /* Arrange for the timeout to occur */
219         state->timer = tevent_add_timer(ev, state,
220                                         timeval_current_ofs(timeout, 0),
221                                         ctdb_op_timeout_handler, state);
222         if (state->timer == NULL) {
223                 DEBUG(DEBUG_ERR,(__location__ " Unable to setup timer\n"));
224                 return -ENOMEM;
225         }
226
227         return 0;
228 }
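/*
 * Illustrative sketch: typical use of the ctdb_op_state abstraction.  An
 * operation object is created once with ctdb_op_init(), gated with
 * ctdb_op_begin()/ctdb_op_end(), and can be disabled for a while, e.g. from
 * a message handler.  Names are only for illustration.
 */
#if 0
static void example_op_usage(struct ctdb_op_state *op,
			     struct tevent_context *ev)
{
	/* Disable the operation for 60 seconds; this fails with -EAGAIN
	 * if the operation is currently running. */
	if (ctdb_op_disable(op, ev, 60) != 0) {
		return;
	}

	/* ... later, when the operation should run again ... */
	if (!ctdb_op_begin(op)) {
		return;		/* still disabled by the timer */
	}
	/* ... perform the operation ... */
	ctdb_op_end(op);
}
#endif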
229
230 struct ctdb_banning_state {
231         uint32_t count;
232         struct timeval last_reported_time;
233 };
234
235 /*
236   private state of recovery daemon
237  */
238 struct ctdb_recoverd {
239         struct ctdb_context *ctdb;
240         uint32_t recmaster;
241         uint32_t last_culprit_node;
242         struct ctdb_node_map_old *nodemap;
243         struct timeval priority_time;
244         bool need_takeover_run;
245         bool need_recovery;
246         uint32_t node_flags;
247         struct tevent_timer *send_election_te;
248         struct tevent_timer *election_timeout;
249         struct srvid_requests *reallocate_requests;
250         struct ctdb_op_state *takeover_run;
251         struct ctdb_op_state *recovery;
252         struct ctdb_iface_list_old *ifaces;
253         uint32_t *force_rebalance_nodes;
254         struct ctdb_node_capabilities *caps;
255 };
256
257 #define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
258 #define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)
259
260 static void ctdb_restart_recd(struct tevent_context *ev,
261                               struct tevent_timer *te, struct timeval t,
262                               void *private_data);
263
264 /*
265   ban a node for a period of time
266  */
267 static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
268 {
269         int ret;
270         struct ctdb_context *ctdb = rec->ctdb;
271         struct ctdb_ban_state bantime;
272
273         if (!ctdb_validate_pnn(ctdb, pnn)) {
274                 DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_ban_node\n", pnn));
275                 return;
276         }
277
278         DEBUG(DEBUG_NOTICE,("Banning node %u for %u seconds\n", pnn, ban_time));
279
280         bantime.pnn  = pnn;
281         bantime.time = ban_time;
282
283         ret = ctdb_ctrl_set_ban(ctdb, CONTROL_TIMEOUT(), pnn, &bantime);
284         if (ret != 0) {
285                 DEBUG(DEBUG_ERR,(__location__ " Failed to ban node %d\n", pnn));
286                 return;
287         }
288
289 }
290
291 enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_ELECTION_NEEDED, MONITOR_FAILED};
292
293
294 /*
295   remember the trouble maker
296  */
297 static void ctdb_set_culprit_count(struct ctdb_recoverd *rec, uint32_t culprit, uint32_t count)
298 {
299         struct ctdb_context *ctdb = talloc_get_type(rec->ctdb, struct ctdb_context);
300         struct ctdb_banning_state *ban_state;
301
302         if (culprit >= ctdb->num_nodes) {
303                 DEBUG(DEBUG_ERR,("Trying to set culprit %d but num_nodes is %d\n", culprit, ctdb->num_nodes));
304                 return;
305         }
306
307         /* If we are banned or stopped, do not set other nodes as culprits */
308         if (rec->node_flags & NODE_FLAGS_INACTIVE) {
309                 DEBUG(DEBUG_NOTICE, ("This node is INACTIVE, cannot set culprit node %d\n", culprit));
310                 return;
311         }
312
313         if (ctdb->nodes[culprit]->ban_state == NULL) {
314                 ctdb->nodes[culprit]->ban_state = talloc_zero(ctdb->nodes[culprit], struct ctdb_banning_state);
315                 CTDB_NO_MEMORY_VOID(ctdb, ctdb->nodes[culprit]->ban_state);
316
317                 
318         }
319         ban_state = ctdb->nodes[culprit]->ban_state;
320         if (timeval_elapsed(&ban_state->last_reported_time) > ctdb->tunable.recovery_grace_period) {
321                 /* this was the first time in a long while this node
322                    misbehaved so we will forgive any old transgressions.
323                 */
324                 ban_state->count = 0;
325         }
326
327         ban_state->count += count;
328         ban_state->last_reported_time = timeval_current();
329         rec->last_culprit_node = culprit;
330 }
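/*
 * Illustrative sketch: how the accumulated ban credits could be acted upon.
 * The actual threshold and the place where the ban decision is taken live
 * elsewhere in the recovery daemon; the "2 * num_nodes" threshold below is
 * only an assumption for illustration.
 */
#if 0
static void example_check_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
{
	struct ctdb_context *ctdb = rec->ctdb;
	struct ctdb_banning_state *ban_state = ctdb->nodes[culprit]->ban_state;

	if (ban_state != NULL && ban_state->count > 2 * ctdb->num_nodes) {
		ctdb_ban_node(rec, culprit, ctdb->tunable.recovery_ban_period);
		ban_state->count = 0;
	}
}
#endif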
331
332 /*
333   remember the trouble maker
334  */
335 static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
336 {
337         ctdb_set_culprit_count(rec, culprit, 1);
338 }
339
340
341 /* this callback is called for every node that failed to execute the
342    recovered event
343 */
344 static void recovered_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
345 {
346         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
347
348         DEBUG(DEBUG_ERR, (__location__ " Node %u failed the recovered event. Setting it as recovery fail culprit\n", node_pnn));
349
350         ctdb_set_culprit(rec, node_pnn);
351 }
352
353 /*
354   run the "recovered" eventscript on all nodes
355  */
356 static int run_recovered_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map_old *nodemap, const char *caller)
357 {
358         TALLOC_CTX *tmp_ctx;
359         uint32_t *nodes;
360         struct ctdb_context *ctdb = rec->ctdb;
361
362         tmp_ctx = talloc_new(ctdb);
363         CTDB_NO_MEMORY(ctdb, tmp_ctx);
364
365         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
366         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_END_RECOVERY,
367                                         nodes, 0,
368                                         CONTROL_TIMEOUT(), false, tdb_null,
369                                         NULL, recovered_fail_callback,
370                                         rec) != 0) {
371                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event when called from %s\n", caller));
372
373                 talloc_free(tmp_ctx);
374                 return -1;
375         }
376
377         talloc_free(tmp_ctx);
378         return 0;
379 }
380
381 /* this callback is called for every node that failed to execute the
382    start recovery event
383 */
384 static void startrecovery_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
385 {
386         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
387
388         DEBUG(DEBUG_ERR, (__location__ " Node %u failed the startrecovery event. Setting it as recovery fail culprit\n", node_pnn));
389
390         ctdb_set_culprit(rec, node_pnn);
391 }
392
393 /*
394   run the "startrecovery" eventscript on all nodes
395  */
396 static int run_startrecovery_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map_old *nodemap)
397 {
398         TALLOC_CTX *tmp_ctx;
399         uint32_t *nodes;
400         struct ctdb_context *ctdb = rec->ctdb;
401
402         tmp_ctx = talloc_new(ctdb);
403         CTDB_NO_MEMORY(ctdb, tmp_ctx);
404
405         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
406         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_START_RECOVERY,
407                                         nodes, 0,
408                                         CONTROL_TIMEOUT(), false, tdb_null,
409                                         NULL,
410                                         startrecovery_fail_callback,
411                                         rec) != 0) {
412                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event. Recovery failed.\n"));
413                 talloc_free(tmp_ctx);
414                 return -1;
415         }
416
417         talloc_free(tmp_ctx);
418         return 0;
419 }
420
421 /*
422   Retrieve capabilities from all connected nodes
423  */
424 static int update_capabilities(struct ctdb_recoverd *rec,
425                                struct ctdb_node_map_old *nodemap)
426 {
427         uint32_t *capp;
428         TALLOC_CTX *tmp_ctx;
429         struct ctdb_node_capabilities *caps;
430         struct ctdb_context *ctdb = rec->ctdb;
431
432         tmp_ctx = talloc_new(rec);
433         CTDB_NO_MEMORY(ctdb, tmp_ctx);
434
435         caps = ctdb_get_capabilities(ctdb, tmp_ctx,
436                                      CONTROL_TIMEOUT(), nodemap);
437
438         if (caps == NULL) {
439                 DEBUG(DEBUG_ERR,
440                       (__location__ " Failed to get node capabilities\n"));
441                 talloc_free(tmp_ctx);
442                 return -1;
443         }
444
445         capp = ctdb_get_node_capabilities(caps, ctdb_get_pnn(ctdb));
446         if (capp == NULL) {
447                 DEBUG(DEBUG_ERR,
448                       (__location__
449                        " Capabilities don't include current node.\n"));
450                 talloc_free(tmp_ctx);
451                 return -1;
452         }
453         ctdb->capabilities = *capp;
454
455         TALLOC_FREE(rec->caps);
456         rec->caps = talloc_steal(rec, caps);
457
458         talloc_free(tmp_ctx);
459         return 0;
460 }
461
462 static void set_recmode_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
463 {
464         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
465
466         DEBUG(DEBUG_ERR,("Failed to freeze node %u during recovery. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
467         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
468 }
469
470 static void transaction_start_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
471 {
472         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
473
474         DEBUG(DEBUG_ERR,("Failed to start recovery transaction on node %u. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
475         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
476 }
477
478 /*
479   change recovery mode on all nodes
480  */
481 static int set_recovery_mode(struct ctdb_context *ctdb,
482                              struct ctdb_recoverd *rec,
483                              struct ctdb_node_map_old *nodemap,
484                              uint32_t rec_mode, bool freeze)
485 {
486         TDB_DATA data;
487         uint32_t *nodes;
488         TALLOC_CTX *tmp_ctx;
489
490         tmp_ctx = talloc_new(ctdb);
491         CTDB_NO_MEMORY(ctdb, tmp_ctx);
492
493         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
494
495         data.dsize = sizeof(uint32_t);
496         data.dptr = (unsigned char *)&rec_mode;
497
498         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMODE,
499                                         nodes, 0,
500                                         CONTROL_TIMEOUT(),
501                                         false, data,
502                                         NULL, NULL,
503                                         NULL) != 0) {
504                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode. Recovery failed.\n"));
505                 talloc_free(tmp_ctx);
506                 return -1;
507         }
508
509         /* freeze all nodes */
510         if (freeze && rec_mode == CTDB_RECOVERY_ACTIVE) {
511                 int i;
512
513                 for (i=1; i<=NUM_DB_PRIORITIES; i++) {
514                         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
515                                                 nodes, i,
516                                                 CONTROL_TIMEOUT(),
517                                                 false, tdb_null,
518                                                 NULL,
519                                                 set_recmode_fail_callback,
520                                                 rec) != 0) {
521                                 DEBUG(DEBUG_ERR, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
522                                 talloc_free(tmp_ctx);
523                                 return -1;
524                         }
525                 }
526         }
527
528         talloc_free(tmp_ctx);
529         return 0;
530 }
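/*
 * Illustrative sketch: how set_recovery_mode() is typically driven during a
 * recovery.  Entering recovery sets the mode to ACTIVE and freezes the
 * databases; leaving recovery switches back to NORMAL without freezing.
 * This is only a sketch of the calling pattern, not the full recovery logic.
 */
#if 0
static void example_recmode_usage(struct ctdb_context *ctdb,
				  struct ctdb_recoverd *rec,
				  struct ctdb_node_map_old *nodemap)
{
	if (set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE, true) != 0) {
		return;
	}

	/* ... rebuild the databases while everything is frozen ... */

	set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_NORMAL, false);
}
#endif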
531
532 /* update all remote nodes to use the same db priority that we have
533    this can fail if the remote node has not yet been upgraded to 
534    support this function, so we always return success and never fail
535    a recovery if this call fails.
536 */
537 static int update_db_priority_on_remote_nodes(struct ctdb_context *ctdb,
538         struct ctdb_node_map_old *nodemap, 
539         uint32_t pnn, struct ctdb_dbid_map_old *dbmap, TALLOC_CTX *mem_ctx)
540 {
541         int db;
542
543         /* step through all local databases */
544         for (db=0; db<dbmap->num;db++) {
545                 struct ctdb_db_priority db_prio;
546                 int ret;
547
548                 db_prio.db_id     = dbmap->dbs[db].db_id;
549                 ret = ctdb_ctrl_get_db_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, dbmap->dbs[db].db_id, &db_prio.priority);
550                 if (ret != 0) {
551                         DEBUG(DEBUG_ERR,(__location__ " Failed to read database priority from local node for db 0x%08x\n", dbmap->dbs[db].db_id));
552                         continue;
553                 }
554
555                 DEBUG(DEBUG_INFO,("Update DB priority for db 0x%08x to %u\n", dbmap->dbs[db].db_id, db_prio.priority)); 
556
557                 ret = ctdb_ctrl_set_db_priority(ctdb, CONTROL_TIMEOUT(),
558                                                 CTDB_CURRENT_NODE, &db_prio);
559                 if (ret != 0) {
560                         DEBUG(DEBUG_ERR,(__location__ " Failed to set DB priority for 0x%08x\n",
561                                          db_prio.db_id));
562                 }
563         }
564
565         return 0;
566 }                       
567
568 /*
569   ensure all other nodes have attached to any databases that we have
570  */
571 static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map_old *nodemap, 
572                                            uint32_t pnn, struct ctdb_dbid_map_old *dbmap, TALLOC_CTX *mem_ctx)
573 {
574         int i, j, db, ret;
575         struct ctdb_dbid_map_old *remote_dbmap;
576
577         /* verify that all other nodes have all our databases */
578         for (j=0; j<nodemap->num; j++) {
579                 /* we don't need to check ourselves */
580                 if (nodemap->nodes[j].pnn == pnn) {
581                         continue;
582                 }
583                 /* don't check nodes that are unavailable */
584                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
585                         continue;
586                 }
587
588                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
589                                          mem_ctx, &remote_dbmap);
590                 if (ret != 0) {
591                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
592                         return -1;
593                 }
594
595                 /* step through all local databases */
596                 for (db=0; db<dbmap->num;db++) {
597                         const char *name;
598
599
600                         for (i=0;i<remote_dbmap->num;i++) {
601                                 if (dbmap->dbs[db].db_id == remote_dbmap->dbs[i].db_id) {
602                                         break;
603                                 }
604                         }
605                         /* the remote node already has this database */
606                         if (i!=remote_dbmap->num) {
607                                 continue;
608                         }
609                         /* ok so we need to create this database */
610                         ret = ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn,
611                                                   dbmap->dbs[db].db_id, mem_ctx,
612                                                   &name);
613                         if (ret != 0) {
614                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", pnn));
615                                 return -1;
616                         }
617                         ret = ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(),
618                                                  nodemap->nodes[j].pnn,
619                                                  mem_ctx, name,
620                                                  dbmap->dbs[db].flags & CTDB_DB_FLAGS_PERSISTENT);
621                         if (ret != 0) {
622                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create remote db:%s\n", name));
623                                 return -1;
624                         }
625                 }
626         }
627
628         return 0;
629 }
630
631
632 /*
633   ensure we are attached to any databases that anyone else is attached to
634  */
635 static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map_old *nodemap, 
636                                           uint32_t pnn, struct ctdb_dbid_map_old **dbmap, TALLOC_CTX *mem_ctx)
637 {
638         int i, j, db, ret;
639         struct ctdb_dbid_map_old *remote_dbmap;
640
641         /* verify that we have all databases any other node has */
642         for (j=0; j<nodemap->num; j++) {
643                 /* we don't need to check ourselves */
644                 if (nodemap->nodes[j].pnn == pnn) {
645                         continue;
646                 }
647                 /* don't check nodes that are unavailable */
648                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
649                         continue;
650                 }
651
652                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
653                                          mem_ctx, &remote_dbmap);
654                 if (ret != 0) {
655                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
656                         return -1;
657                 }
658
659                 /* step through all databases on the remote node */
660                 for (db=0; db<remote_dbmap->num;db++) {
661                         const char *name;
662
663                         for (i=0;i<(*dbmap)->num;i++) {
664                                 if (remote_dbmap->dbs[db].db_id == (*dbmap)->dbs[i].db_id) {
665                                         break;
666                                 }
667                         }
668                         /* we already have this db locally */
669                         if (i!=(*dbmap)->num) {
670                                 continue;
671                         }
672                         /* ok so we need to create this database and
673                            rebuild dbmap
674                          */
675                         ret = ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn,
676                                                   remote_dbmap->dbs[db].db_id, mem_ctx, &name);
677                         if (ret != 0) {
678                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", 
679                                           nodemap->nodes[j].pnn));
680                                 return -1;
681                         }
682                         ret = ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name,
683                                                  remote_dbmap->dbs[db].flags & CTDB_DB_FLAGS_PERSISTENT);
684                         if (ret != 0) {
685                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create local db:%s\n", name));
686                                 return -1;
687                         }
688                         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
689                         if (ret != 0) {
690                                 DEBUG(DEBUG_ERR, (__location__ " Unable to reread dbmap on node %u\n", pnn));
691                                 return -1;
692                         }
693                 }
694         }
695
696         return 0;
697 }
698
699
700 /*
701   pull the remote database contents from one node into the recdb
702  */
703 static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode,
704                                     struct tdb_wrap *recdb, uint32_t dbid)
705 {
706         int ret;
707         TDB_DATA outdata;
708         struct ctdb_marshall_buffer *reply;
709         struct ctdb_rec_data_old *recdata;
710         int i;
711         TALLOC_CTX *tmp_ctx = talloc_new(recdb);
712
713         ret = ctdb_ctrl_pulldb(ctdb, srcnode, dbid, CTDB_LMASTER_ANY, tmp_ctx,
714                                CONTROL_TIMEOUT(), &outdata);
715         if (ret != 0) {
716                 DEBUG(DEBUG_ERR,(__location__ " Unable to copy db from node %u\n", srcnode));
717                 talloc_free(tmp_ctx);
718                 return -1;
719         }
720
721         reply = (struct ctdb_marshall_buffer *)outdata.dptr;
722
723         if (outdata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
724                 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
725                 talloc_free(tmp_ctx);
726                 return -1;
727         }
728
729         recdata = (struct ctdb_rec_data_old *)&reply->data[0];
730
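        /* The reply is a packed stream of ctdb_rec_data_old records: each
         * record's data[] holds the key (keylen bytes) immediately followed
         * by the value (datalen bytes, starting with a ctdb_ltdb_header).
         * The next record starts "length" bytes further on. */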
731         for (i=0;
732              i<reply->count;
733              recdata = (struct ctdb_rec_data_old *)(recdata->length + (uint8_t *)recdata), i++) {
734                 TDB_DATA key, data;
735                 struct ctdb_ltdb_header *hdr;
736                 TDB_DATA existing;
737
738                 key.dptr = &recdata->data[0];
739                 key.dsize = recdata->keylen;
740                 data.dptr = &recdata->data[key.dsize];
741                 data.dsize = recdata->datalen;
742
743                 hdr = (struct ctdb_ltdb_header *)data.dptr;
744
745                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
746                         DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
747                         talloc_free(tmp_ctx);
748                         return -1;
749                 }
750
751                 /* fetch the existing record, if any */
752                 existing = tdb_fetch(recdb->tdb, key);
753
754                 if (existing.dptr != NULL) {
755                         struct ctdb_ltdb_header header;
756                         if (existing.dsize < sizeof(struct ctdb_ltdb_header)) {
757                                 DEBUG(DEBUG_CRIT,(__location__ " Bad record size %u from node %u\n",
758                                          (unsigned)existing.dsize, srcnode));
759                                 free(existing.dptr);
760                                 talloc_free(tmp_ctx);
761                                 return -1;
762                         }
763                         header = *(struct ctdb_ltdb_header *)existing.dptr;
764                         free(existing.dptr);
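                        /* Keep the pulled copy only if it has a higher RSN
                         * than what is already in the recdb, or the same RSN
                         * while the existing copy's dmaster is not this node;
                         * otherwise the existing copy wins. */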
765                         if (!(header.rsn < hdr->rsn ||
766                               (header.dmaster != ctdb_get_pnn(ctdb) &&
767                                header.rsn == hdr->rsn))) {
768                                 continue;
769                         }
770                 }
771
772                 if (tdb_store(recdb->tdb, key, data, TDB_REPLACE) != 0) {
773                         DEBUG(DEBUG_CRIT,(__location__ " Failed to store record\n"));
774                         talloc_free(tmp_ctx);
775                         return -1;
776                 }
777         }
778
779         talloc_free(tmp_ctx);
780
781         return 0;
782 }
783
784
785 struct pull_seqnum_cbdata {
786         int failed;
787         uint32_t pnn;
788         uint64_t seqnum;
789 };
790
791 static void pull_seqnum_cb(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
792 {
793         struct pull_seqnum_cbdata *cb_data = talloc_get_type(callback_data, struct pull_seqnum_cbdata);
794         uint64_t seqnum;
795
796         if (cb_data->failed != 0) {
797                 DEBUG(DEBUG_ERR, ("Got seqnum from node %d but we have already failed the entire operation\n", node_pnn));
798                 return;
799         }
800
801         if (res != 0) {
802                 DEBUG(DEBUG_ERR, ("Error when pulling seqnum from node %d\n", node_pnn));
803                 cb_data->failed = 1;
804                 return;
805         }
806
807         if (outdata.dsize != sizeof(uint64_t)) {
808                 DEBUG(DEBUG_ERR, ("Error when reading pull seqnum from node %d, got %d bytes but expected %d\n", node_pnn, (int)outdata.dsize, (int)sizeof(uint64_t)));
809                 cb_data->failed = -1;
810                 return;
811         }
812
813         seqnum = *((uint64_t *)outdata.dptr);
814
815         if (seqnum > cb_data->seqnum ||
816             (cb_data->pnn == -1 && seqnum == 0)) {
817                 cb_data->seqnum = seqnum;
818                 cb_data->pnn = node_pnn;
819         }
820 }
821
822 static void pull_seqnum_fail_cb(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
823 {
824         struct pull_seqnum_cbdata *cb_data = talloc_get_type(callback_data, struct pull_seqnum_cbdata);
825
826         DEBUG(DEBUG_ERR, ("Failed to pull db seqnum from node %d\n", node_pnn));
827         cb_data->failed = 1;
828 }
829
830 static int pull_highest_seqnum_pdb(struct ctdb_context *ctdb,
831                                 struct ctdb_recoverd *rec, 
832                                 struct ctdb_node_map_old *nodemap, 
833                                 struct tdb_wrap *recdb, uint32_t dbid)
834 {
835         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
836         uint32_t *nodes;
837         TDB_DATA data;
838         uint32_t outdata[2];
839         struct pull_seqnum_cbdata *cb_data;
840
841         DEBUG(DEBUG_NOTICE, ("Scan for highest seqnum pdb for db:0x%08x\n", dbid));
842
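        /* The GET_DB_SEQNUM control appears to take the database id as a
         * 64-bit value: pack the 32-bit db id into the low word and zero the
         * high word. */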
843         outdata[0] = dbid;
844         outdata[1] = 0;
845
846         data.dsize = sizeof(outdata);
847         data.dptr  = (uint8_t *)&outdata[0];
848
849         cb_data = talloc(tmp_ctx, struct pull_seqnum_cbdata);
850         if (cb_data == NULL) {
851                 DEBUG(DEBUG_ERR, ("Failed to allocate pull highest seqnum cb_data structure\n"));
852                 talloc_free(tmp_ctx);
853                 return -1;
854         }
855
856         cb_data->failed = 0;
857         cb_data->pnn    = -1;
858         cb_data->seqnum = 0;
859         
860         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
861         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_DB_SEQNUM,
862                                         nodes, 0,
863                                         CONTROL_TIMEOUT(), false, data,
864                                         pull_seqnum_cb,
865                                         pull_seqnum_fail_cb,
866                                         cb_data) != 0) {
867                 DEBUG(DEBUG_ERR, (__location__ " Failed to run async GET_DB_SEQNUM\n"));
868
869                 talloc_free(tmp_ctx);
870                 return -1;
871         }
872
873         if (cb_data->failed != 0) {
874                 DEBUG(DEBUG_NOTICE, ("Failed to pull sequence numbers for DB 0x%08x\n", dbid));
875                 talloc_free(tmp_ctx);
876                 return -1;
877         }
878
879         if (cb_data->pnn == -1) {
880                 DEBUG(DEBUG_NOTICE, ("Failed to find a node with highest sequence numbers for DB 0x%08x\n", dbid));
881                 talloc_free(tmp_ctx);
882                 return -1;
883         }
884
885         DEBUG(DEBUG_NOTICE, ("Pull persistent db:0x%08x from node %d with highest seqnum:%lld\n", dbid, cb_data->pnn, (long long)cb_data->seqnum)); 
886
887         if (pull_one_remote_database(ctdb, cb_data->pnn, recdb, dbid) != 0) {
888                 DEBUG(DEBUG_ERR, ("Failed to pull higest seqnum database 0x%08x from node %d\n", dbid, cb_data->pnn));
889                 talloc_free(tmp_ctx);
890                 return -1;
891         }
892
893         talloc_free(tmp_ctx);
894         return 0;
895 }
896
897
898 /*
899   pull all the remote database contents into the recdb
900  */
901 static int pull_remote_database(struct ctdb_context *ctdb,
902                                 struct ctdb_recoverd *rec, 
903                                 struct ctdb_node_map_old *nodemap, 
904                                 struct tdb_wrap *recdb, uint32_t dbid,
905                                 bool persistent)
906 {
907         int j;
908
909         if (persistent && ctdb->tunable.recover_pdb_by_seqnum != 0) {
910                 int ret;
911                 ret = pull_highest_seqnum_pdb(ctdb, rec, nodemap, recdb, dbid);
912                 if (ret == 0) {
913                         return 0;
914                 }
915         }
916
917         /* pull all records from all other nodes across onto this node
918            (this merges based on rsn)
919         */
920         for (j=0; j<nodemap->num; j++) {
921                 /* don't merge from nodes that are unavailable */
922                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
923                         continue;
924                 }
925                 if (pull_one_remote_database(ctdb, nodemap->nodes[j].pnn, recdb, dbid) != 0) {
926                         DEBUG(DEBUG_ERR,(__location__ " Failed to pull remote database from node %u\n", 
927                                  nodemap->nodes[j].pnn));
928                         ctdb_set_culprit_count(rec, nodemap->nodes[j].pnn, nodemap->num);
929                         return -1;
930                 }
931         }
932         
933         return 0;
934 }
935
936
937 /*
938   update flags on all active nodes
939  */
940 static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map_old *nodemap, uint32_t pnn, uint32_t flags)
941 {
942         int ret;
943
944         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, flags, ~flags);
945         if (ret != 0) {
946                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
947                 return -1;
948         }
949
950         return 0;
951 }
952
953 /*
954   ensure all nodes have the same vnnmap as we do
955  */
956 static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map_old *nodemap, 
957                                       uint32_t pnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
958 {
959         int j, ret;
960
961         /* push the new vnn map out to all the nodes */
962         for (j=0; j<nodemap->num; j++) {
963                 /* don't push to nodes that are unavailable */
964                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
965                         continue;
966                 }
967
968                 ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, vnnmap);
969                 if (ret != 0) {
970                         DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
971                         return -1;
972                 }
973         }
974
975         return 0;
976 }
977
978
979 /*
980   called when a vacuum fetch has completed - just free it and do the next one
981  */
982 static void vacuum_fetch_callback(struct ctdb_client_call_state *state)
983 {
984         talloc_free(state);
985 }
986
987
988 /**
989  * Process one element of the vacuum fetch list:
990  * Migrate it over to us with the special flag
991  * CTDB_CALL_FLAG_VACUUM_MIGRATION.
992  */
993 static bool vacuum_fetch_process_one(struct ctdb_db_context *ctdb_db,
994                                      uint32_t pnn,
995                                      struct ctdb_rec_data_old *r)
996 {
997         struct ctdb_client_call_state *state;
998         TDB_DATA data;
999         struct ctdb_ltdb_header *hdr;
1000         struct ctdb_call call;
1001
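        /* Build a no-op call (CTDB_NULL_FUNC) whose only effect is to migrate
         * the record to this node; the vacuum-migration flag presumably lets
         * the main daemon distinguish this from a client-triggered
         * migration. */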
1002         ZERO_STRUCT(call);
1003         call.call_id = CTDB_NULL_FUNC;
1004         call.flags = CTDB_IMMEDIATE_MIGRATION;
1005         call.flags |= CTDB_CALL_FLAG_VACUUM_MIGRATION;
1006
1007         call.key.dptr = &r->data[0];
1008         call.key.dsize = r->keylen;
1009
1010         /* ensure we don't block this daemon - just skip a record if we can't get
1011            the chainlock */
1012         if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, call.key) != 0) {
1013                 return true;
1014         }
1015
1016         data = tdb_fetch(ctdb_db->ltdb->tdb, call.key);
1017         if (data.dptr == NULL) {
1018                 tdb_chainunlock(ctdb_db->ltdb->tdb, call.key);
1019                 return true;
1020         }
1021
1022         if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
1023                 free(data.dptr);
1024                 tdb_chainunlock(ctdb_db->ltdb->tdb, call.key);
1025                 return true;
1026         }
1027
1028         hdr = (struct ctdb_ltdb_header *)data.dptr;
1029         if (hdr->dmaster == pnn) {
1030                 /* it's already local */
1031                 free(data.dptr);
1032                 tdb_chainunlock(ctdb_db->ltdb->tdb, call.key);
1033                 return true;
1034         }
1035
1036         free(data.dptr);
1037
1038         state = ctdb_call_send(ctdb_db, &call);
1039         tdb_chainunlock(ctdb_db->ltdb->tdb, call.key);
1040         if (state == NULL) {
1041                 DEBUG(DEBUG_ERR,(__location__ " Failed to setup vacuum fetch call\n"));
1042                 return false;
1043         }
1044         state->async.fn = vacuum_fetch_callback;
1045         state->async.private_data = NULL;
1046
1047         return true;
1048 }
1049
1050
1051 /*
1052   handler for vacuum fetch
1053 */
1054 static void vacuum_fetch_handler(uint64_t srvid, TDB_DATA data,
1055                                  void *private_data)
1056 {
1057         struct ctdb_recoverd *rec = talloc_get_type(
1058                 private_data, struct ctdb_recoverd);
1059         struct ctdb_context *ctdb = rec->ctdb;
1060         struct ctdb_marshall_buffer *recs;
1061         int ret, i;
1062         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1063         const char *name;
1064         struct ctdb_dbid_map_old *dbmap=NULL;
1065         bool persistent = false;
1066         struct ctdb_db_context *ctdb_db;
1067         struct ctdb_rec_data_old *r;
1068
1069         recs = (struct ctdb_marshall_buffer *)data.dptr;
1070
1071         if (recs->count == 0) {
1072                 goto done;
1073         }
1074
1075         /* work out if the database is persistent */
1076         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &dbmap);
1077         if (ret != 0) {
1078                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from local node\n"));
1079                 goto done;
1080         }
1081
1082         for (i=0;i<dbmap->num;i++) {
1083                 if (dbmap->dbs[i].db_id == recs->db_id) {
1084                         persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
1085                         break;
1086                 }
1087         }
1088         if (i == dbmap->num) {
1089                 DEBUG(DEBUG_ERR, (__location__ " Unable to find db_id 0x%x on local node\n", recs->db_id));
1090                 goto done;
1091         }
1092
1093         /* find the name of this database */
1094         if (ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, recs->db_id, tmp_ctx, &name) != 0) {
1095                 DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", recs->db_id));
1096                 goto done;
1097         }
1098
1099         /* attach to it */
1100         ctdb_db = ctdb_attach(ctdb, CONTROL_TIMEOUT(), name, persistent, 0);
1101         if (ctdb_db == NULL) {
1102                 DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
1103                 goto done;
1104         }
1105
1106         r = (struct ctdb_rec_data_old *)&recs->data[0];
1107         while (recs->count) {
1108                 bool ok;
1109
1110                 ok = vacuum_fetch_process_one(ctdb_db, rec->ctdb->pnn, r);
1111                 if (!ok) {
1112                         break;
1113                 }
1114
1115                 r = (struct ctdb_rec_data_old *)(r->length + (uint8_t *)r);
1116                 recs->count--;
1117         }
1118
1119 done:
1120         talloc_free(tmp_ctx);
1121 }
1122
1123
1124 /*
1125  * handler for database detach
1126  */
1127 static void detach_database_handler(uint64_t srvid, TDB_DATA data,
1128                                     void *private_data)
1129 {
1130         struct ctdb_recoverd *rec = talloc_get_type(
1131                 private_data, struct ctdb_recoverd);
1132         struct ctdb_context *ctdb = rec->ctdb;
1133         uint32_t db_id;
1134         struct ctdb_db_context *ctdb_db;
1135
1136         if (data.dsize != sizeof(db_id)) {
1137                 return;
1138         }
1139         db_id = *(uint32_t *)data.dptr;
1140
1141         ctdb_db = find_ctdb_db(ctdb, db_id);
1142         if (ctdb_db == NULL) {
1143                 /* database is not attached */
1144                 return;
1145         }
1146
1147         DLIST_REMOVE(ctdb->db_list, ctdb_db);
1148
1149         DEBUG(DEBUG_NOTICE, ("Detached from database '%s'\n",
1150                              ctdb_db->db_name));
1151         talloc_free(ctdb_db);
1152 }
1153
1154 /*
1155   called when ctdb_wait_timeout should finish
1156  */
1157 static void ctdb_wait_handler(struct tevent_context *ev,
1158                               struct tevent_timer *te,
1159                               struct timeval yt, void *p)
1160 {
1161         uint32_t *timed_out = (uint32_t *)p;
1162         (*timed_out) = 1;
1163 }
1164
1165 /*
1166   wait for a given number of seconds
1167  */
1168 static void ctdb_wait_timeout(struct ctdb_context *ctdb, double secs)
1169 {
1170         uint32_t timed_out = 0;
1171         time_t usecs = (secs - (time_t)secs) * 1000000;
1172         tevent_add_timer(ctdb->ev, ctdb, timeval_current_ofs(secs, usecs),
1173                          ctdb_wait_handler, &timed_out);
1174         while (!timed_out) {
1175                 tevent_loop_once(ctdb->ev);
1176         }
1177 }
1178
1179 /*
1180   called when an election times out (ends)
1181  */
1182 static void ctdb_election_timeout(struct tevent_context *ev,
1183                                   struct tevent_timer *te,
1184                                   struct timeval t, void *p)
1185 {
1186         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1187         rec->election_timeout = NULL;
1188         fast_start = false;
1189
1190         DEBUG(DEBUG_WARNING,("Election period ended\n"));
1191 }
1192
1193
1194 /*
1195   wait for an election to finish. It finishes election_timeout seconds after
1196   the last election packet is received
1197  */
1198 static void ctdb_wait_election(struct ctdb_recoverd *rec)
1199 {
1200         struct ctdb_context *ctdb = rec->ctdb;
1201         while (rec->election_timeout) {
1202                 tevent_loop_once(ctdb->ev);
1203         }
1204 }
1205
1206 /*
1207   Update our local flags from all connected remote nodes.
1208   This is only run when we are, or believe we are, the recovery master
1209  */
1210 static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map_old *nodemap)
1211 {
1212         int j;
1213         struct ctdb_context *ctdb = rec->ctdb;
1214         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
1215
1216         /* get the nodemap for all active remote nodes and verify
1217            they are the same as for this node
1218          */
1219         for (j=0; j<nodemap->num; j++) {
1220                 struct ctdb_node_map_old *remote_nodemap=NULL;
1221                 int ret;
1222
1223                 if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
1224                         continue;
1225                 }
1226                 if (nodemap->nodes[j].pnn == ctdb->pnn) {
1227                         continue;
1228                 }
1229
1230                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
1231                                            mem_ctx, &remote_nodemap);
1232                 if (ret != 0) {
1233                         DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from remote node %u\n", 
1234                                   nodemap->nodes[j].pnn));
1235                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
1236                         talloc_free(mem_ctx);
1237                         return MONITOR_FAILED;
1238                 }
1239                 if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
1240                         /* We should tell our daemon about this so it
1241                            updates its flags or else we will log the same 
1242                            message again in the next iteration of recovery.
1243                            Since we are the recovery master we can just as
1244                            well update the flags on all nodes.
1245                         */
1246                         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags, ~remote_nodemap->nodes[j].flags);
1247                         if (ret != 0) {
1248                                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
1249                                 return -1;
1250                         }
1251
1252                         /* Update our local copy of the flags in the recovery
1253                            daemon.
1254                         */
1255                         DEBUG(DEBUG_NOTICE,("Remote node %u had flags 0x%x, local had 0x%x - updating local\n",
1256                                  nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags,
1257                                  nodemap->nodes[j].flags));
1258                         nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
1259                 }
1260                 talloc_free(remote_nodemap);
1261         }
1262         talloc_free(mem_ctx);
1263         return MONITOR_OK;
1264 }
1265
1266
1267 /* Create a new random generation id.
1268    The generation id can not be the INVALID_GENERATION id
1269 */
1270 static uint32_t new_generation(void)
1271 {
1272         uint32_t generation;
1273
1274         while (1) {
1275                 generation = random();
1276
1277                 if (generation != INVALID_GENERATION) {
1278                         break;
1279                 }
1280         }
1281
1282         return generation;
1283 }
1284
1285
1286 /*
1287   create a temporary working database
1288  */
1289 static struct tdb_wrap *create_recdb(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx)
1290 {
1291         char *name;
1292         struct tdb_wrap *recdb;
1293         unsigned tdb_flags;
1294
1295         /* open up the temporary recovery database */
1296         name = talloc_asprintf(mem_ctx, "%s/recdb.tdb.%u",
1297                                ctdb->db_directory_state,
1298                                ctdb->pnn);
1299         if (name == NULL) {
1300                 return NULL;
1301         }
1302         unlink(name);
1303
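        /* The scratch database is private to the recovery daemon, so TDB
         * locking can presumably be skipped; mmap is avoided when running
         * under valgrind so that accesses remain checkable. */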
1304         tdb_flags = TDB_NOLOCK;
1305         if (ctdb->valgrinding) {
1306                 tdb_flags |= TDB_NOMMAP;
1307         }
1308         tdb_flags |= (TDB_INCOMPATIBLE_HASH | TDB_DISALLOW_NESTING);
1309
1310         recdb = tdb_wrap_open(mem_ctx, name, ctdb->tunable.database_hash_size, 
1311                               tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
1312         if (recdb == NULL) {
1313                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temp recovery database '%s'\n", name));
1314         }
1315
1316         talloc_free(name);
1317
1318         return recdb;
1319 }
1320
1321
1322 /* 
1323    a traverse function for pulling all relevant records from recdb
1324  */
1325 struct recdb_data {
1326         struct ctdb_context *ctdb;
1327         struct ctdb_marshall_buffer *recdata;
1328         uint32_t len;
1329         uint32_t allocated_len;
1330         bool failed;
1331         bool persistent;
1332 };
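/*
 * In traverse_recdb() below, "recdata" is the marshall buffer being
 * assembled, "len" is the number of bytes used so far, "allocated_len" is
 * the current talloc'd size of that buffer, "failed" records that an error
 * occurred and "persistent" switches off the skipping of empty records.
 */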
1333
1334 static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
1335 {
1336         struct recdb_data *params = (struct recdb_data *)p;
1337         struct ctdb_rec_data_old *recdata;
1338         struct ctdb_ltdb_header *hdr;
1339
1340         /*
1341          * skip empty records - but NOT for persistent databases:
1342          *
1343          * The record-by-record mode of recovery deletes empty records.
1344          * For persistent databases, this can lead to data corruption
1345          * by deleting records that should be there:
1346          *
1347          * - Assume the cluster has been running for a while.
1348          *
1349          * - A record R in a persistent database has been created and
1350          *   deleted a couple of times, the last operation being deletion,
1351          *   leaving an empty record with a high RSN, say 10.
1352          *
1353          * - Now a node N is turned off.
1354          *
1355          * - This leaves the local copy of the database on N with the empty
1356          *   copy of R and RSN 10. On all other nodes, the recovery has deleted
1357          *   the copy of record R.
1358          *
1359          * - Now the record is created again while node N is turned off.
1360          *   This creates R with RSN = 1 on all nodes except for N.
1361          *
1362          * - Now node N is turned on again. The following recovery will choose
1363          *   the older empty copy of R due to RSN 10 > RSN 1.
1364          *
1365          * ==> Hence the record is gone after the recovery.
1366          *
1367          * On databases like Samba's registry, this can damage the higher-level
1368          * data structures built from the various tdb-level records.
1369          */
1370         if (!params->persistent && data.dsize <= sizeof(struct ctdb_ltdb_header)) {
1371                 return 0;
1372         }
1373
1374         /* update the dmaster field to point to us */
1375         hdr = (struct ctdb_ltdb_header *)data.dptr;
1376         if (!params->persistent) {
1377                 hdr->dmaster = params->ctdb->pnn;
1378                 hdr->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
1379         }
1380
1381         /* add the record to the blob ready to send to the nodes */
1382         recdata = ctdb_marshall_record(params->recdata, 0, key, NULL, data);
1383         if (recdata == NULL) {
1384                 params->failed = true;
1385                 return -1;
1386         }
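        /* Grow the marshall buffer in chunks: allocate the configured
         * preallocation size on top of what is needed now, so that appending
         * each record does not force a talloc_realloc. */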
1387         if (params->len + recdata->length >= params->allocated_len) {
1388                 params->allocated_len = recdata->length + params->len + params->ctdb->tunable.pulldb_preallocation_size;
1389                 params->recdata = talloc_realloc_size(NULL, params->recdata, params->allocated_len);
1390         }
1391         if (params->recdata == NULL) {
1392                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata to %u\n",
1393                          recdata->length + params->len));
1394                 params->failed = true;
1395                 return -1;
1396         }
1397         params->recdata->count++;
1398         memcpy(params->len+(uint8_t *)params->recdata, recdata, recdata->length);
1399         params->len += recdata->length;
1400         talloc_free(recdata);
1401
1402         return 0;
1403 }
1404
1405 /*
1406   push the recdb database out to all nodes
1407  */
1408 static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
1409                                bool persistent,
1410                                struct tdb_wrap *recdb, struct ctdb_node_map_old *nodemap)
1411 {
1412         struct recdb_data params;
1413         struct ctdb_marshall_buffer *recdata;
1414         TDB_DATA outdata;
1415         TALLOC_CTX *tmp_ctx;
1416         uint32_t *nodes;
1417
1418         tmp_ctx = talloc_new(ctdb);
1419         CTDB_NO_MEMORY(ctdb, tmp_ctx);
1420
1421         recdata = talloc_zero(recdb, struct ctdb_marshall_buffer);
1422         CTDB_NO_MEMORY(ctdb, recdata);
1423
1424         recdata->db_id = dbid;
1425
1426         params.ctdb = ctdb;
1427         params.recdata = recdata;
1428         params.len = offsetof(struct ctdb_marshall_buffer, data);
1429         params.allocated_len = params.len;
1430         params.failed = false;
1431         params.persistent = persistent;
1432
1433         if (tdb_traverse_read(recdb->tdb, traverse_recdb, &params) == -1) {
1434                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1435                 talloc_free(params.recdata);
1436                 talloc_free(tmp_ctx);
1437                 return -1;
1438         }
1439
1440         if (params.failed) {
1441                 DEBUG(DEBUG_ERR,(__location__ " Failed to marshall records from recdb database\n"));
1442                 talloc_free(params.recdata);
1443                 talloc_free(tmp_ctx);
1444                 return -1;
1445         }
1446
1447         recdata = params.recdata;
1448
1449         outdata.dptr = (void *)recdata;
1450         outdata.dsize = params.len;
1451
1452         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
1453         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
1454                                         nodes, 0,
1455                                         CONTROL_TIMEOUT(), false, outdata,
1456                                         NULL, NULL,
1457                                         NULL) != 0) {
1458                 DEBUG(DEBUG_ERR,(__location__ " Failed to push recdb records to nodes for db 0x%x\n", dbid));
1459                 talloc_free(recdata);
1460                 talloc_free(tmp_ctx);
1461                 return -1;
1462         }
1463
1464         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pushed remote database 0x%x with %u records\n",
1465                   dbid, recdata->count));
1466
1467         talloc_free(recdata);
1468         talloc_free(tmp_ctx);
1469
1470         return 0;
1471 }
1472
1473
1474 /*
1475   go through a full recovery on one database: pull all remote copies, wipe the db cluster-wide inside the transaction, then push the merged copy back out
1476  */
1477 static int recover_database(struct ctdb_recoverd *rec, 
1478                             TALLOC_CTX *mem_ctx,
1479                             uint32_t dbid,
1480                             bool persistent,
1481                             uint32_t pnn, 
1482                             struct ctdb_node_map_old *nodemap,
1483                             uint32_t transaction_id)
1484 {
1485         struct tdb_wrap *recdb;
1486         int ret;
1487         struct ctdb_context *ctdb = rec->ctdb;
1488         TDB_DATA data;
1489         struct ctdb_transdb w;
1490         uint32_t *nodes;
1491
1492         recdb = create_recdb(ctdb, mem_ctx);
1493         if (recdb == NULL) {
1494                 return -1;
1495         }
1496
1497         /* pull all remote databases onto the recdb */
1498         ret = pull_remote_database(ctdb, rec, nodemap, recdb, dbid, persistent);
1499         if (ret != 0) {
1500                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull remote database 0x%x\n", dbid));
1501                 return -1;
1502         }
1503
1504         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pulled remote database 0x%x\n", dbid));
1505
1506         /* wipe all the remote databases. This is safe as we are in a transaction */
1507         w.db_id = dbid;
1508         w.tid = transaction_id;
1509
1510         data.dptr = (void *)&w;
1511         data.dsize = sizeof(w);
1512
1513         nodes = list_of_active_nodes(ctdb, nodemap, recdb, true);
1514         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
1515                                         nodes, 0,
1516                                         CONTROL_TIMEOUT(), false, data,
1517                                         NULL, NULL,
1518                                         NULL) != 0) {
1519                 DEBUG(DEBUG_ERR, (__location__ " Unable to wipe database. Recovery failed.\n"));
1520                 talloc_free(recdb);
1521                 return -1;
1522         }
1523         
1524         /* push out the correct database. This sets the dmaster and skips 
1525            the empty records */
1526         ret = push_recdb_database(ctdb, dbid, persistent, recdb, nodemap);
1527         if (ret != 0) {
1528                 talloc_free(recdb);
1529                 return -1;
1530         }
1531
1532         /* all done with this database */
1533         talloc_free(recdb);
1534
1535         return 0;
1536 }
1537
1538 /* when we start a recovery, make sure all nodes use the same reclock file
1539    setting
1540 */
1541 static int sync_recovery_lock_file_across_cluster(struct ctdb_recoverd *rec)
1542 {
1543         struct ctdb_context *ctdb = rec->ctdb;
1544         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
1545         TDB_DATA data;
1546         uint32_t *nodes;
1547
1548         if (ctdb->recovery_lock_file == NULL) {
1549                 data.dptr  = NULL;
1550                 data.dsize = 0;
1551         } else {
1552                 data.dsize = strlen(ctdb->recovery_lock_file) + 1;
1553                 data.dptr  = (uint8_t *)ctdb->recovery_lock_file;
1554         }
1555
1556         nodes = list_of_active_nodes(ctdb, rec->nodemap, tmp_ctx, true);
1557         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECLOCK_FILE,
1558                                         nodes, 0,
1559                                         CONTROL_TIMEOUT(),
1560                                         false, data,
1561                                         NULL, NULL,
1562                                         rec) != 0) {
1563                 DEBUG(DEBUG_ERR, (__location__ " Failed to sync reclock file settings\n"));
1564                 talloc_free(tmp_ctx);
1565                 return -1;
1566         }
1567
1568         talloc_free(tmp_ctx);
1569         return 0;
1570 }
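
/* Note: the reclock file name (or an empty blob when none is configured) is
 * pushed to every active node above, so the whole cluster follows the
 * recovery master's reclock setting.
 */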
1571
1572
1573 /*
1574  * this callback is called for every node that failed to execute ctdb_takeover_run();
1575  * when banning credits were requested, the failing node is recorded as a culprit.
1576  */
1577 static void takeover_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
1578 {
1579         DEBUG(DEBUG_ERR, ("Node %u failed the takeover run\n", node_pnn));
1580
1581         if (callback_data != NULL) {
1582                 struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
1583
1584                 DEBUG(DEBUG_ERR, ("Setting node %u as recovery fail culprit\n", node_pnn));
1585
1586                 ctdb_set_culprit(rec, node_pnn);
1587         }
1588 }
1589
1590
1591 static void ban_misbehaving_nodes(struct ctdb_recoverd *rec, bool *self_ban)
1592 {
1593         struct ctdb_context *ctdb = rec->ctdb;
1594         int i;
1595         struct ctdb_banning_state *ban_state;
1596
1597         *self_ban = false;
1598         for (i=0; i<ctdb->num_nodes; i++) {
1599                 if (ctdb->nodes[i]->ban_state == NULL) {
1600                         continue;
1601                 }
1602                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
1603                 if (ban_state->count < 2*ctdb->num_nodes) {
1604                         continue;
1605                 }
1606
1607                 DEBUG(DEBUG_NOTICE,("Node %u reached %u banning credits - banning it for %u seconds\n",
1608                         ctdb->nodes[i]->pnn, ban_state->count,
1609                         ctdb->tunable.recovery_ban_period));
1610                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
1611                 ban_state->count = 0;
1612
1613                 /* Banning ourself? */
1614                 if (ctdb->nodes[i]->pnn == rec->ctdb->pnn) {
1615                         *self_ban = true;
1616                 }
1617         }
1618 }
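
/* The banning threshold above is 2 * num_nodes banning credits, e.g. 8
 * credits in a 4 node cluster.  A minimal sketch of that rule (hypothetical
 * helper, shown only for illustration and not called anywhere in ctdb):
 *
 *	static bool would_be_banned(unsigned int credits, unsigned int num_nodes)
 *	{
 *		return credits >= 2 * num_nodes;
 *	}
 */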
1619
1620 static bool do_takeover_run(struct ctdb_recoverd *rec,
1621                             struct ctdb_node_map_old *nodemap,
1622                             bool banning_credits_on_fail)
1623 {
1624         uint32_t *nodes = NULL;
1625         struct ctdb_disable_message dtr;
1626         TDB_DATA data;
1627         int i;
1628         uint32_t *rebalance_nodes = rec->force_rebalance_nodes;
1629         int ret;
1630         bool ok;
1631
1632         DEBUG(DEBUG_NOTICE, ("Takeover run starting\n"));
1633
1634         if (ctdb_op_is_in_progress(rec->takeover_run)) {
1635                 DEBUG(DEBUG_ERR, (__location__
1636                                   " takeover run already in progress \n"));
1637                 ok = false;
1638                 goto done;
1639         }
1640
1641         if (!ctdb_op_begin(rec->takeover_run)) {
1642                 ok = false;
1643                 goto done;
1644         }
1645
1646         /* Disable IP checks (takeover runs, really) on other nodes
1647          * while doing this takeover run.  This will stop those other
1648          * nodes from triggering takeover runs when they think they should
1649          * be hosting an IP but it isn't yet on an interface.  Don't
1650          * wait for replies since a failure here might cause some
1651          * noise in the logs but will not actually cause a problem.
1652          */
1653         dtr.srvid = 0; /* No reply */
1654         dtr.pnn = -1;
1655
1656         data.dptr  = (uint8_t*)&dtr;
1657         data.dsize = sizeof(dtr);
1658
1659         nodes = list_of_connected_nodes(rec->ctdb, nodemap, rec, false);
1660
1661         /* Disable for 60 seconds.  This can be a tunable later if
1662          * necessary.
1663          */
1664         dtr.timeout = 60;
1665         for (i = 0; i < talloc_array_length(nodes); i++) {
1666                 if (ctdb_client_send_message(rec->ctdb, nodes[i],
1667                                              CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
1668                                              data) != 0) {
1669                         DEBUG(DEBUG_INFO,("Failed to disable takeover runs\n"));
1670                 }
1671         }
1672
1673         ret = ctdb_takeover_run(rec->ctdb, nodemap,
1674                                 rec->force_rebalance_nodes,
1675                                 takeover_fail_callback,
1676                                 banning_credits_on_fail ? rec : NULL);
1677
1678         /* Reenable takeover runs and IP checks on other nodes */
1679         dtr.timeout = 0;
1680         for (i = 0; i < talloc_array_length(nodes); i++) {
1681                 if (ctdb_client_send_message(rec->ctdb, nodes[i],
1682                                              CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
1683                                              data) != 0) {
1684                         DEBUG(DEBUG_INFO,("Failed to re-enable takeover runs\n"));
1685                 }
1686         }
1687
1688         if (ret != 0) {
1689                 DEBUG(DEBUG_ERR, ("ctdb_takeover_run() failed\n"));
1690                 ok = false;
1691                 goto done;
1692         }
1693
1694         ok = true;
1695         /* Takeover run was successful so clear force rebalance targets */
1696         if (rebalance_nodes == rec->force_rebalance_nodes) {
1697                 TALLOC_FREE(rec->force_rebalance_nodes);
1698         } else {
1699                 DEBUG(DEBUG_WARNING,
1700                       ("Rebalance target nodes changed during takeover run - not clearing\n"));
1701         }
1702 done:
1703         rec->need_takeover_run = !ok;
1704         talloc_free(nodes);
1705         ctdb_op_end(rec->takeover_run);
1706
1707         DEBUG(DEBUG_NOTICE, ("Takeover run %s\n", ok ? "completed successfully" : "unsuccessful"));
1708         return ok;
1709 }
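
/* Note on the disable/re-enable pattern above: CTDB_SRVID_DISABLE_TAKEOVER_RUNS
 * carries a ctdb_disable_message whose timeout the receiving node treats as
 * "suppress takeover runs for this many seconds", while a timeout of 0
 * re-enables them; see srvid_disable_and_reply() and
 * disable_takeover_runs_handler() further down for the receiving side.
 */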
1710
1711 struct recovery_helper_state {
1712         int fd[2];
1713         pid_t pid;
1714         int result;
1715         bool done;
1716 };
1717
1718 static void ctdb_recovery_handler(struct tevent_context *ev,
1719                                   struct tevent_fd *fde,
1720                                   uint16_t flags, void *private_data)
1721 {
1722         struct recovery_helper_state *state = talloc_get_type_abort(
1723                 private_data, struct recovery_helper_state);
1724         int ret;
1725
1726         ret = sys_read(state->fd[0], &state->result, sizeof(state->result));
1727         if (ret != sizeof(state->result)) {
1728                 state->result = EPIPE;
1729         }
1730
1731         state->done = true;
1732 }
1733
1734
1735 static int db_recovery_parallel(struct ctdb_recoverd *rec, TALLOC_CTX *mem_ctx)
1736 {
1737         static char prog[PATH_MAX+1] = "";
1738         const char **args;
1739         struct recovery_helper_state *state;
1740         struct tevent_fd *fde;
1741         int nargs, ret;
1742
1743         if (!ctdb_set_helper("recovery_helper", prog, sizeof(prog),
1744                              "CTDB_RECOVERY_HELPER", CTDB_HELPER_BINDIR,
1745                              "ctdb_recovery_helper")) {
1746                 ctdb_die(rec->ctdb, "Unable to set recovery helper\n");
1747         }
1748
1749         state = talloc_zero(mem_ctx, struct recovery_helper_state);
1750         if (state == NULL) {
1751                 DEBUG(DEBUG_ERR, (__location__ " memory error\n"));
1752                 return -1;
1753         }
1754
1755         state->pid = -1;
             state->fd[0] = -1;
             state->fd[1] = -1;
1756
1757         ret = pipe(state->fd);
1758         if (ret != 0) {
1759                 DEBUG(DEBUG_ERR,
1760                       ("Failed to create pipe for recovery helper\n"));
1761                 goto fail;
1762         }
1763
1764         set_close_on_exec(state->fd[0]);
1765
1766         nargs = 4;
1767         args = talloc_array(state, const char *, nargs);
1768         if (args == NULL) {
1769                 DEBUG(DEBUG_ERR, (__location__ " memory error\n"));
1770                 goto fail;
1771         }
1772
1773         args[0] = talloc_asprintf(args, "%d", state->fd[1]);
1774         args[1] = rec->ctdb->daemon.name;
1775         args[2] = talloc_asprintf(args, "%u", new_generation());
1776         args[3] = NULL;
1777
1778         if (args[0] == NULL || args[2] == NULL) {
1779                 DEBUG(DEBUG_ERR, (__location__ " memory error\n"));
1780                 goto fail;
1781         }
1782
1783         if (!ctdb_vfork_with_logging(state, rec->ctdb, "recovery", prog, nargs,
1784                                      args, NULL, NULL, &state->pid)) {
1785                 DEBUG(DEBUG_ERR,
1786                       ("Failed to create child for recovery helper\n"));
1787                 goto fail;
1788         }
1789
1790         close(state->fd[1]);
1791         state->fd[1] = -1;
1792
1793         state->done = false;
1794
1795         fde = tevent_add_fd(rec->ctdb->ev, rec->ctdb, state->fd[0],
1796                             TEVENT_FD_READ, ctdb_recovery_handler, state);
1797         if (fde == NULL) {
1798                 goto fail;
1799         }
1800         tevent_fd_set_auto_close(fde);
1801
1802         while (!state->done) {
1803                 tevent_loop_once(rec->ctdb->ev);
1804         }
1805
1806         close(state->fd[0]);
1807         state->fd[0] = -1;
1808
1809         if (state->result != 0) {
1810                 goto fail;
1811         }
1812
1813         ctdb_kill(rec->ctdb, state->pid, SIGKILL);
1814         talloc_free(state);
1815         return 0;
1816
1817 fail:
1818         if (state->fd[0] != -1) {
1819                 close(state->fd[0]);
1820         }
1821         if (state->fd[1] != -1) {
1822                 close(state->fd[1]);
1823         }
1824         if (state->pid != -1) {
1825                 ctdb_kill(rec->ctdb, state->pid, SIGKILL);
1826         }
1827         talloc_free(state);
1828         return -1;
1829 }
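
/* The recovery helper above is passed the write end of the pipe, the ctdb
 * daemon socket name and a freshly generated database generation number.
 * The parent then loops in tevent_loop_once() until the helper writes an int
 * result back over the pipe; a short read is treated as EPIPE.
 */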
1830
1831 static int db_recovery_serial(struct ctdb_recoverd *rec, TALLOC_CTX *mem_ctx,
1832                               uint32_t pnn, struct ctdb_node_map_old *nodemap,
1833                               struct ctdb_vnn_map *vnnmap,
1834                               struct ctdb_dbid_map_old *dbmap)
1835 {
1836         struct ctdb_context *ctdb = rec->ctdb;
1837         uint32_t generation;
1838         TDB_DATA data;
1839         uint32_t *nodes;
1840         int ret, i, j;
1841
1842         /* set recovery mode to active on all nodes */
1843         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE, true);
1844         if (ret != 0) {
1845                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1846                 return -1;
1847         }
1848
1849         /* execute the "startrecovery" event script on all nodes */
1850         ret = run_startrecovery_eventscript(rec, nodemap);
1851         if (ret!=0) {
1852                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
1853                 return -1;
1854         }
1855
1856         /* pick a new generation number */
1857         generation = new_generation();
1858
1859         /* change the vnnmap on this node to use the new generation 
1860            number but not on any other nodes.
1861            this guarantees that if we abort the recovery prematurely
1862            for some reason (a node stops responding?)
1863            that we can just return immediately and we will reenter
1864            recovery shortly again.
1865            I.e. we deliberately leave the cluster with an inconsistent
1866            generation id to allow us to abort recovery at any stage and
1867            just restart it from scratch.
1868          */
1869         vnnmap->generation = generation;
1870         ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, vnnmap);
1871         if (ret != 0) {
1872                 DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
1873                 return -1;
1874         }
1875
1876         /* Database generations are updated when the transaction is committed to
1877          * the databases.  So make sure to use the final generation as the
1878          * transaction id
1879          */
1880         generation = new_generation();
1881
1882         data.dptr = (void *)&generation;
1883         data.dsize = sizeof(uint32_t);
1884
1885         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
1886         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
1887                                         nodes, 0,
1888                                         CONTROL_TIMEOUT(), false, data,
1889                                         NULL,
1890                                         transaction_start_fail_callback,
1891                                         rec) != 0) {
1892                 DEBUG(DEBUG_ERR, (__location__ " Unable to start transactions. Recovery failed.\n"));
1893                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_CANCEL,
1894                                         nodes, 0,
1895                                         CONTROL_TIMEOUT(), false, tdb_null,
1896                                         NULL,
1897                                         NULL,
1898                                         NULL) != 0) {
1899                         DEBUG(DEBUG_ERR,("Failed to cancel recovery transaction\n"));
1900                 }
1901                 return -1;
1902         }
1903
1904         DEBUG(DEBUG_NOTICE,(__location__ " started transactions on all nodes\n"));
1905
1906         for (i=0;i<dbmap->num;i++) {
1907                 ret = recover_database(rec, mem_ctx,
1908                                        dbmap->dbs[i].db_id,
1909                                        dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT,
1910                                        pnn, nodemap, generation);
1911                 if (ret != 0) {
1912                         DEBUG(DEBUG_ERR, (__location__ " Failed to recover database 0x%x\n", dbmap->dbs[i].db_id));
1913                         return -1;
1914                 }
1915         }
1916
1917         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - starting database commits\n"));
1918
1919         /* commit all the changes */
1920         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
1921                                         nodes, 0,
1922                                         CONTROL_TIMEOUT(), false, data,
1923                                         NULL, NULL,
1924                                         NULL) != 0) {
1925                 DEBUG(DEBUG_ERR, (__location__ " Unable to commit recovery changes. Recovery failed.\n"));
1926                 return -1;
1927         }
1928
1929         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - committed databases\n"));
1930
1931         /* build a new vnn map with all the currently active and
1932            unbanned nodes */
1933         vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
1934         CTDB_NO_MEMORY(ctdb, vnnmap);
1935         vnnmap->generation = generation;
1936         vnnmap->size = 0;
1937         vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
1938         CTDB_NO_MEMORY(ctdb, vnnmap->map);
1939         for (i=j=0;i<nodemap->num;i++) {
1940                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1941                         continue;
1942                 }
1943                 if (!ctdb_node_has_capabilities(rec->caps,
1944                                                 ctdb->nodes[i]->pnn,
1945                                                 CTDB_CAP_LMASTER)) {
1946                         /* this node can not be an lmaster */
1947                         DEBUG(DEBUG_DEBUG, ("Node %d can't be an LMASTER, skipping it\n", i));
1948                         continue;
1949                 }
1950
1951                 vnnmap->size++;
1952                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1953                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1954                 vnnmap->map[j++] = nodemap->nodes[i].pnn;
1955
1956         }
1957         if (vnnmap->size == 0) {
1958                 DEBUG(DEBUG_NOTICE, ("No suitable lmasters found. Adding local node (recmaster) anyway.\n"));
1959                 vnnmap->size++;
1960                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1961                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1962                 vnnmap->map[0] = pnn;
1963         }
1964
1965         /* update to the new vnnmap on all nodes */
1966         ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
1967         if (ret != 0) {
1968                 DEBUG(DEBUG_ERR, (__location__ " Unable to update vnnmap on all nodes\n"));
1969                 return -1;
1970         }
1971
1972         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated vnnmap\n"));
1973
1974         /* disable recovery mode */
1975         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_NORMAL, false);
1976         if (ret != 0) {
1977                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to normal on cluster\n"));
1978                 return -1;
1979         }
1980
1981         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - disabled recovery mode\n"));
1982
1983         return 0;
1984 }
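
/* Summary of the serial recovery path above: set recovery mode active, run
 * the "startrecovery" event, bump the local vnnmap generation so an aborted
 * recovery is retried, start a cluster-wide transaction using the final
 * generation, pull/wipe/push every database via recover_database(), commit,
 * rebuild the vnnmap from the active lmaster-capable nodes and finally set
 * recovery mode back to normal.
 */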
1985
1986 /*
1987   we are the recmaster, and recovery is needed - start a recovery run
1988  */
1989 static int do_recovery(struct ctdb_recoverd *rec,
1990                        TALLOC_CTX *mem_ctx, uint32_t pnn,
1991                        struct ctdb_node_map_old *nodemap, struct ctdb_vnn_map *vnnmap)
1992 {
1993         struct ctdb_context *ctdb = rec->ctdb;
1994         int i, ret;
1995         struct ctdb_dbid_map_old *dbmap;
1996         struct timeval start_time;
1997         bool self_ban;
1998         bool par_recovery;
1999
2000         DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));
2001
2002         /* Check if the current node is still the recmaster.  It's possible that
2003          * re-election has changed the recmaster.
2004          */
2005         if (pnn != rec->recmaster) {
2006                 DEBUG(DEBUG_NOTICE,
2007                       ("Recovery master changed to %u, aborting recovery\n",
2008                        rec->recmaster));
2009                 return -1;
2010         }
2011
2012         /* if recovery fails, force it again */
2013         rec->need_recovery = true;
2014
2015         if (!ctdb_op_begin(rec->recovery)) {
2016                 return -1;
2017         }
2018
2019         if (rec->election_timeout) {
2020                 /* an election is in progress */
2021                 DEBUG(DEBUG_ERR, ("do_recovery called while election in progress - try again later\n"));
2022                 goto fail;
2023         }
2024
2025         ban_misbehaving_nodes(rec, &self_ban);
2026         if (self_ban) {
2027                 DEBUG(DEBUG_NOTICE, ("This node was banned, aborting recovery\n"));
2028                 goto fail;
2029         }
2030
2031         if (ctdb->recovery_lock_file != NULL) {
2032                 if (ctdb_recovery_have_lock(ctdb)) {
2033                         DEBUG(DEBUG_NOTICE, ("Already holding recovery lock\n"));
2034                 } else {
2035                         start_time = timeval_current();
2036                         DEBUG(DEBUG_NOTICE, ("Attempting to take recovery lock (%s)\n",
2037                                              ctdb->recovery_lock_file));
2038                         if (!ctdb_recovery_lock(ctdb)) {
2039                                 if (ctdb->runstate == CTDB_RUNSTATE_FIRST_RECOVERY) {
2040                                         /* If ctdb is trying first recovery, it's
2041                                          * possible that the current node does not know
2042                                          * yet who the recmaster is.
2043                                          */
2044                                         DEBUG(DEBUG_ERR, ("Unable to get recovery lock"
2045                                                           " - retrying recovery\n"));
2046                                         goto fail;
2047                                 }
2048
2049                                 DEBUG(DEBUG_ERR,("Unable to get recovery lock - aborting recovery "
2050                                                  "and ban ourself for %u seconds\n",
2051                                                  ctdb->tunable.recovery_ban_period));
2052                                 ctdb_ban_node(rec, pnn, ctdb->tunable.recovery_ban_period);
2053                                 goto fail;
2054                         }
2055                         ctdb_ctrl_report_recd_lock_latency(ctdb,
2056                                                            CONTROL_TIMEOUT(),
2057                                                            timeval_elapsed(&start_time));
2058                         DEBUG(DEBUG_NOTICE,
2059                               ("Recovery lock taken successfully by recovery daemon\n"));
2060                 }
2061         }
2062
2063         DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", rec->last_culprit_node));
2064
2065         /* get a list of all databases */
2066         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
2067         if (ret != 0) {
2068                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
2069                 goto fail;
2070         }
2071
2072         /* we do the db creation before we set the recovery mode, so the freeze happens
2073            on all databases we will be dealing with. */
2074
2075         /* verify that we have all the databases any other node has */
2076         ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
2077         if (ret != 0) {
2078                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing local databases\n"));
2079                 goto fail;
2080         }
2081
2082         /* verify that all other nodes have all our databases */
2083         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
2084         if (ret != 0) {
2085                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing remote databases\n"));
2086                 goto fail;
2087         }
2088         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - created remote databases\n"));
2089
2090         /* update the database priority for all remote databases */
2091         ret = update_db_priority_on_remote_nodes(ctdb, nodemap, pnn, dbmap, mem_ctx);
2092         if (ret != 0) {
2093                 DEBUG(DEBUG_ERR, (__location__ " Unable to set db priority on remote nodes\n"));
2094         }
2095         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated db priority for all databases\n"));
2096
2097
2098         /* update all other nodes to use the same setting for reclock files
2099            as the local recovery master.
2100         */
2101         sync_recovery_lock_file_across_cluster(rec);
2102
2103         /* Retrieve capabilities from all connected nodes */
2104         ret = update_capabilities(rec, nodemap);
2105         if (ret!=0) {
2106                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
2107                 goto fail;
2108         }
2109
2110         /*
2111           update all nodes to have the same flags that we have
2112          */
2113         for (i=0;i<nodemap->num;i++) {
2114                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
2115                         continue;
2116                 }
2117
2118                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
2119                 if (ret != 0) {
2120                         if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2121                                 DEBUG(DEBUG_WARNING, (__location__ "Unable to update flags on inactive node %d\n", i));
2122                         } else {
2123                                 DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
2124                                 goto fail;
2125                         }
2126                 }
2127         }
2128
2129         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
2130
2131         /* Check if all participating nodes have parallel recovery capability */
2132         par_recovery = true;
2133         for (i=0; i<nodemap->num; i++) {
2134                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2135                         continue;
2136                 }
2137
2138                 if (!(rec->caps[i].capabilities &
2139                       CTDB_CAP_PARALLEL_RECOVERY)) {
2140                         par_recovery = false;
2141                         break;
2142                 }
2143         }
2144
2145         if (par_recovery) {
2146                 ret = db_recovery_parallel(rec, mem_ctx);
2147         } else {
2148                 ret = db_recovery_serial(rec, mem_ctx, pnn, nodemap, vnnmap,
2149                                          dbmap);
2150         }
2151
2152         if (ret != 0) {
2153                 goto fail;
2154         }
2155
2156         do_takeover_run(rec, nodemap, false);
2157
2158         /* execute the "recovered" event script on all nodes */
2159         ret = run_recovered_eventscript(rec, nodemap, "do_recovery");
2160         if (ret!=0) {
2161                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Recovery process failed.\n"));
2162                 goto fail;
2163         }
2164
2165         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - finished the recovered event\n"));
2166
2167         /* send a message to all clients telling them that the cluster 
2168            has been reconfigured */
2169         ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED,
2170                                        CTDB_SRVID_RECONFIGURE, tdb_null);
2171         if (ret != 0) {
2172                 DEBUG(DEBUG_ERR, (__location__ " Failed to send reconfigure message\n"));
2173                 goto fail;
2174         }
2175
2176         DEBUG(DEBUG_NOTICE, (__location__ " Recovery complete\n"));
2177
2178         rec->need_recovery = false;
2179         ctdb_op_end(rec->recovery);
2180
2181         /* we managed to complete a full recovery, make sure to forgive
2182            any past sins by the nodes that could now participate in the
2183            recovery.
2184         */
2185         DEBUG(DEBUG_ERR,("Resetting ban count to 0 for all nodes\n"));
2186         for (i=0;i<nodemap->num;i++) {
2187                 struct ctdb_banning_state *ban_state;
2188
2189                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
2190                         continue;
2191                 }
2192
2193                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[nodemap->nodes[i].pnn]->ban_state;
2194                 if (ban_state == NULL) {
2195                         continue;
2196                 }
2197
2198                 ban_state->count = 0;
2199         }
2200
2201         /* We just finished a recovery successfully.
2202            We now wait for rerecovery_timeout before we allow
2203            another recovery to take place.
2204         */
2205         DEBUG(DEBUG_NOTICE, ("Just finished a recovery. New recoveries will now be suppressed for the rerecovery timeout (%d seconds)\n", ctdb->tunable.rerecovery_timeout));
2206         ctdb_op_disable(rec->recovery, ctdb->ev,
2207                         ctdb->tunable.rerecovery_timeout);
2208         return 0;
2209
2210 fail:
2211         ctdb_op_end(rec->recovery);
2212         return -1;
2213 }
2214
2215
2216 /*
2217   elections are won by first checking the number of connected nodes, then
2218   the priority time, then the pnn
2219  */
2220 struct election_message {
2221         uint32_t num_connected;
2222         struct timeval priority_time;
2223         uint32_t pnn;
2224         uint32_t node_flags;
2225 };
2226
2227 /*
2228   form this node's election data
2229  */
2230 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
2231 {
2232         int ret, i;
2233         struct ctdb_node_map_old *nodemap;
2234         struct ctdb_context *ctdb = rec->ctdb;
2235
2236         ZERO_STRUCTP(em);
2237
2238         em->pnn = rec->ctdb->pnn;
2239         em->priority_time = rec->priority_time;
2240
2241         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
2242         if (ret != 0) {
2243                 DEBUG(DEBUG_ERR,(__location__ " unable to get node map\n"));
2244                 return;
2245         }
2246
2247         rec->node_flags = nodemap->nodes[ctdb->pnn].flags;
2248         em->node_flags = rec->node_flags;
2249
2250         for (i=0;i<nodemap->num;i++) {
2251                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
2252                         em->num_connected++;
2253                 }
2254         }
2255
2256         /* we shouldn't try to win this election if we can't be a recmaster */
2257         if ((ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
2258                 em->num_connected = 0;
2259                 em->priority_time = timeval_current();
2260         }
2261
2262         talloc_free(nodemap);
2263 }
2264
2265 /*
2266   see if we win the election against the given election data
2267  */
2268 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
2269 {
2270         struct election_message myem;
2271         int cmp = 0;
2272
2273         ctdb_election_data(rec, &myem);
2274
2275         /* we can't win if we don't have the recmaster capability */
2276         if ((rec->ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
2277                 return false;
2278         }
2279
2280         /* we can't win if we are banned */
2281         if (rec->node_flags & NODE_FLAGS_BANNED) {
2282                 return false;
2283         }
2284
2285         /* we can't win if we are stopped */
2286         if (rec->node_flags & NODE_FLAGS_STOPPED) {
2287                 return false;
2288         }
2289
2290         /* we will automatically win if the other node is banned */
2291         if (em->node_flags & NODE_FLAGS_BANNED) {
2292                 return true;
2293         }
2294
2295         /* we will automatically win if the other node is stopped */
2296         if (em->node_flags & NODE_FLAGS_STOPPED) {
2297                 return true;
2298         }
2299
             /* try to use the most connected node */
             if (cmp == 0) {
                     cmp = (int)myem.num_connected - (int)em->num_connected;
             }

2300         /* then the longest running node */
2301         if (cmp == 0) {
2302                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
2303         }
2304
2305         if (cmp == 0) {
2306                 cmp = (int)myem.pnn - (int)em->pnn;
2307         }
2308
2309         return cmp > 0;
2310 }
2311
2312 /*
2313   send out an election request
2314  */
2315 static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn)
2316 {
2317         int ret;
2318         TDB_DATA election_data;
2319         struct election_message emsg;
2320         uint64_t srvid;
2321         struct ctdb_context *ctdb = rec->ctdb;
2322
2323         srvid = CTDB_SRVID_ELECTION;
2324
2325         ctdb_election_data(rec, &emsg);
2326
2327         election_data.dsize = sizeof(struct election_message);
2328         election_data.dptr  = (unsigned char *)&emsg;
2329
2330
2331         /* first we assume we will win the election and set 
2332            recoverymaster to be ourself on the current node
2333          */
2334         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(),
2335                                      CTDB_CURRENT_NODE, pnn);
2336         if (ret != 0) {
2337                 DEBUG(DEBUG_ERR, (__location__ " failed to set recmaster\n"));
2338                 return -1;
2339         }
2340         rec->recmaster = pnn;
2341
2342         /* send an election message to all active nodes */
2343         DEBUG(DEBUG_INFO,(__location__ " Send election request to all active nodes\n"));
2344         return ctdb_client_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
2345 }
2346
2347 /*
2348   this function will unban all nodes in the cluster
2349 */
2350 static void unban_all_nodes(struct ctdb_context *ctdb)
2351 {
2352         int ret, i;
2353         struct ctdb_node_map_old *nodemap;
2354         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2355         
2356         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2357         if (ret != 0) {
2358                 DEBUG(DEBUG_ERR,(__location__ " failed to get nodemap to unban all nodes\n"));
2359                 return;
2360         }
2361
2362         for (i=0;i<nodemap->num;i++) {
2363                 if ( (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED))
2364                   && (nodemap->nodes[i].flags & NODE_FLAGS_BANNED) ) {
2365                         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(),
2366                                                  nodemap->nodes[i].pnn, 0,
2367                                                  NODE_FLAGS_BANNED);
2368                         if (ret != 0) {
2369                                 DEBUG(DEBUG_ERR, (__location__ " failed to reset ban state\n"));
2370                         }
2371                 }
2372         }
2373
2374         talloc_free(tmp_ctx);
2375 }
2376
2377
2378 /*
2379   we think we are winning the election - send a broadcast election request
2380  */
2381 static void election_send_request(struct tevent_context *ev,
2382                                   struct tevent_timer *te,
2383                                   struct timeval t, void *p)
2384 {
2385         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
2386         int ret;
2387
2388         ret = send_election_request(rec, ctdb_get_pnn(rec->ctdb));
2389         if (ret != 0) {
2390                 DEBUG(DEBUG_ERR,("Failed to send election request!\n"));
2391         }
2392
2393         TALLOC_FREE(rec->send_election_te);
2394 }
2395
2396 /*
2397   handler for memory dumps
2398 */
2399 static void mem_dump_handler(uint64_t srvid, TDB_DATA data, void *private_data)
2400 {
2401         struct ctdb_recoverd *rec = talloc_get_type(
2402                 private_data, struct ctdb_recoverd);
2403         struct ctdb_context *ctdb = rec->ctdb;
2404         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2405         TDB_DATA *dump;
2406         int ret;
2407         struct ctdb_srvid_message *rd;
2408
2409         if (data.dsize != sizeof(struct ctdb_srvid_message)) {
2410                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2411                 talloc_free(tmp_ctx);
2412                 return;
2413         }
2414         rd = (struct ctdb_srvid_message *)data.dptr;
2415
2416         dump = talloc_zero(tmp_ctx, TDB_DATA);
2417         if (dump == NULL) {
2418                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate memory for memdump\n"));
2419                 talloc_free(tmp_ctx);
2420                 return;
2421         }
2422         ret = ctdb_dump_memory(ctdb, dump);
2423         if (ret != 0) {
2424                 DEBUG(DEBUG_ERR, (__location__ " ctdb_dump_memory() failed\n"));
2425                 talloc_free(tmp_ctx);
2426                 return;
2427         }
2428
2429         DEBUG(DEBUG_ERR, ("recovery master memory dump\n"));
2430
2431         ret = ctdb_client_send_message(ctdb, rd->pnn, rd->srvid, *dump);
2432         if (ret != 0) {
2433                 DEBUG(DEBUG_ERR,("Failed to send rd memdump reply message\n"));
2434                 talloc_free(tmp_ctx);
2435                 return;
2436         }
2437
2438         talloc_free(tmp_ctx);
2439 }
2440
2441 /*
2442   handler for reload_nodes
2443 */
2444 static void reload_nodes_handler(uint64_t srvid, TDB_DATA data,
2445                                  void *private_data)
2446 {
2447         struct ctdb_recoverd *rec = talloc_get_type(
2448                 private_data, struct ctdb_recoverd);
2449
2450         DEBUG(DEBUG_ERR, (__location__ " Reload nodes file from recovery daemon\n"));
2451
2452         ctdb_load_nodes_file(rec->ctdb);
2453 }
2454
2455
2456 static void ctdb_rebalance_timeout(struct tevent_context *ev,
2457                                    struct tevent_timer *te,
2458                                    struct timeval t, void *p)
2459 {
2460         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
2461
2462         if (rec->force_rebalance_nodes == NULL) {
2463                 DEBUG(DEBUG_ERR,
2464                       ("Rebalance timeout occurred - no nodes to rebalance\n"));
2465                 return;
2466         }
2467
2468         DEBUG(DEBUG_NOTICE,
2469               ("Rebalance timeout occurred - trigger takeover run\n"));
2470         rec->need_takeover_run = true;
2471 }
2472
2473
2474 static void recd_node_rebalance_handler(uint64_t srvid, TDB_DATA data,
2475                                         void *private_data)
2476 {
2477         struct ctdb_recoverd *rec = talloc_get_type(
2478                 private_data, struct ctdb_recoverd);
2479         struct ctdb_context *ctdb = rec->ctdb;
2480         uint32_t pnn;
2481         uint32_t *t;
2482         int len;
2483         uint32_t deferred_rebalance;
2484
2485         if (rec->recmaster != ctdb_get_pnn(ctdb)) {
2486                 return;
2487         }
2488
2489         if (data.dsize != sizeof(uint32_t)) {
2490                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of node rebalance message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(uint32_t)));
2491                 return;
2492         }
2493
2494         pnn = *(uint32_t *)&data.dptr[0];
2495
2496         DEBUG(DEBUG_NOTICE,("Setting up rebalance of IPs to node %u\n", pnn));
2497
2498         /* Copy any existing list of nodes.  There's probably some
2499          * sort of realloc variant that will do this but we need to
2500          * make sure that freeing the old array also cancels the timer
2501          * event for the timeout... not sure if realloc will do that.
2502          */
2503         len = (rec->force_rebalance_nodes != NULL) ?
2504                 talloc_array_length(rec->force_rebalance_nodes) :
2505                 0;
2506
2507         /* This allows duplicates to be added but they don't cause
2508          * harm.  A call to add a duplicate PNN arguably means that
2509          * the timeout should be reset, so this is the simplest
2510          * solution.
2511          */
2512         t = talloc_zero_array(rec, uint32_t, len+1);
2513         CTDB_NO_MEMORY_VOID(ctdb, t);
2514         if (len > 0) {
2515                 memcpy(t, rec->force_rebalance_nodes, sizeof(uint32_t) * len);
2516         }
2517         t[len] = pnn;
2518
2519         talloc_free(rec->force_rebalance_nodes);
2520
2521         rec->force_rebalance_nodes = t;
2522
2523         /* If configured, setup a deferred takeover run to make sure
2524          * that certain nodes get IPs rebalanced to them.  This will
2525          * be cancelled if a successful takeover run happens before
2526          * the timeout.  Assign tunable value to variable for
2527          * readability.
2528          */
2529         deferred_rebalance = ctdb->tunable.deferred_rebalance_on_node_add;
2530         if (deferred_rebalance != 0) {
2531                 tevent_add_timer(ctdb->ev, rec->force_rebalance_nodes,
2532                                  timeval_current_ofs(deferred_rebalance, 0),
2533                                  ctdb_rebalance_timeout, rec);
2534         }
2535 }
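
/* Note: the deferred rebalance timer above is allocated on
 * rec->force_rebalance_nodes, so freeing that list (as a successful takeover
 * run does) also cancels the pending timeout.
 */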
2536
2537
2538
2539 static void recd_update_ip_handler(uint64_t srvid, TDB_DATA data,
2540                                    void *private_data)
2541 {
2542         struct ctdb_recoverd *rec = talloc_get_type(
2543                 private_data, struct ctdb_recoverd);
2544         struct ctdb_public_ip *ip;
2545
2546         if (rec->recmaster != rec->ctdb->pnn) {
2547                 DEBUG(DEBUG_INFO,("Not recmaster, ignore update ip message\n"));
2548                 return;
2549         }
2550
2551         if (data.dsize != sizeof(struct ctdb_public_ip)) {
2552                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of recd update ip message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(struct ctdb_public_ip)));
2553                 return;
2554         }
2555
2556         ip = (struct ctdb_public_ip *)data.dptr;
2557
2558         update_ip_assignment_tree(rec->ctdb, ip);
2559 }
2560
2561 static void srvid_disable_and_reply(struct ctdb_context *ctdb,
2562                                     TDB_DATA data,
2563                                     struct ctdb_op_state *op_state)
2564 {
2565         struct ctdb_disable_message *r;
2566         uint32_t timeout;
2567         TDB_DATA result;
2568         int32_t ret = 0;
2569
2570         /* Validate input data */
2571         if (data.dsize != sizeof(struct ctdb_disable_message)) {
2572                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
2573                                  "expecting %lu\n", (long unsigned)data.dsize,
2574                                  (long unsigned)sizeof(struct ctdb_disable_message)));
2575                 return;
2576         }
2577         if (data.dptr == NULL) {
2578                 DEBUG(DEBUG_ERR,(__location__ " No data received\n"));
2579                 return;
2580         }
2581
2582         r = (struct ctdb_disable_message *)data.dptr;
2583         timeout = r->timeout;
2584
2585         ret = ctdb_op_disable(op_state, ctdb->ev, timeout);
2586         if (ret != 0) {
2587                 goto done;
2588         }
2589
2590         /* Returning our PNN tells the caller that we succeeded */
2591         ret = ctdb_get_pnn(ctdb);
2592 done:
2593         result.dsize = sizeof(int32_t);
2594         result.dptr  = (uint8_t *)&ret;
2595         srvid_request_reply(ctdb, (struct ctdb_srvid_message *)r, result);
2596 }
2597
2598 static void disable_takeover_runs_handler(uint64_t srvid, TDB_DATA data,
2599                                           void *private_data)
2600 {
2601         struct ctdb_recoverd *rec = talloc_get_type(
2602                 private_data, struct ctdb_recoverd);
2603
2604         srvid_disable_and_reply(rec->ctdb, data, rec->takeover_run);
2605 }
2606
2607 /* Backward compatibility for this SRVID */
2608 static void disable_ip_check_handler(uint64_t srvid, TDB_DATA data,
2609                                      void *private_data)
2610 {
2611         struct ctdb_recoverd *rec = talloc_get_type(
2612                 private_data, struct ctdb_recoverd);
2613         uint32_t timeout;
2614
2615         if (data.dsize != sizeof(uint32_t)) {
2616                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
2617                                  "expecting %lu\n", (long unsigned)data.dsize,
2618                                  (long unsigned)sizeof(uint32_t)));
2619                 return;
2620         }
2621         if (data.dptr == NULL) {
2622                 DEBUG(DEBUG_ERR,(__location__ " No data received\n"));
2623                 return;
2624         }
2625
2626         timeout = *((uint32_t *)data.dptr);
2627
2628         ctdb_op_disable(rec->takeover_run, rec->ctdb->ev, timeout);
2629 }
2630
2631 static void disable_recoveries_handler(uint64_t srvid, TDB_DATA data,
2632                                        void *private_data)
2633 {
2634         struct ctdb_recoverd *rec = talloc_get_type(
2635                 private_data, struct ctdb_recoverd);
2636
2637         srvid_disable_and_reply(rec->ctdb, data, rec->recovery);
2638 }
2639
2640 /*
2641   handler for ip reallocate, just add it to the list of requests and 
2642   handle this later in the monitor_cluster loop so we do not recurse
2643   with other requests to takeover_run()
2644 */
2645 static void ip_reallocate_handler(uint64_t srvid, TDB_DATA data,
2646                                   void *private_data)
2647 {
2648         struct ctdb_srvid_message *request;
2649         struct ctdb_recoverd *rec = talloc_get_type(
2650                 private_data, struct ctdb_recoverd);
2651
2652         if (data.dsize != sizeof(struct ctdb_srvid_message)) {
2653                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2654                 return;
2655         }
2656
2657         request = (struct ctdb_srvid_message *)data.dptr;
2658
2659         srvid_request_add(rec->ctdb, &rec->reallocate_requests, request);
2660 }
2661
2662 static void process_ipreallocate_requests(struct ctdb_context *ctdb,
2663                                           struct ctdb_recoverd *rec)
2664 {
2665         TDB_DATA result;
2666         int32_t ret;
2667         struct srvid_requests *current;
2668
2669         DEBUG(DEBUG_INFO, ("recovery master forced ip reallocation\n"));
2670
2671         /* Only process requests that are currently pending.  More
2672          * might come in while the takeover run is in progress and
2673          * they will need to be processed later since they might
2674          * be in response to flag changes.
2675          */
2676         current = rec->reallocate_requests;
2677         rec->reallocate_requests = NULL;
2678
2679         if (do_takeover_run(rec, rec->nodemap, false)) {
2680                 ret = ctdb_get_pnn(ctdb);
2681         } else {
2682                 ret = -1;
2683         }
2684
2685         result.dsize = sizeof(int32_t);
2686         result.dptr  = (uint8_t *)&ret;
2687
2688         srvid_requests_reply(ctdb, &current, result);
2689 }
2690
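
/* Note: every queued reallocate request is answered with the recovery
 * master's PNN when the takeover run succeeds, or with -1 when it fails.
 */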
2691
2692 /*
2693   handler for recovery master elections
2694 */
2695 static void election_handler(uint64_t srvid, TDB_DATA data, void *private_data)
2696 {
2697         struct ctdb_recoverd *rec = talloc_get_type(
2698                 private_data, struct ctdb_recoverd);
2699         struct ctdb_context *ctdb = rec->ctdb;
2700         int ret;
2701         struct election_message *em = (struct election_message *)data.dptr;
2702
2703         /* Ignore election packets from ourself */
2704         if (ctdb->pnn == em->pnn) {
2705                 return;
2706         }
2707
2708         /* we got an election packet - update the timeout for the election */
2709         talloc_free(rec->election_timeout);
2710         rec->election_timeout = tevent_add_timer(
2711                         ctdb->ev, ctdb,
2712                         fast_start ?
2713                                 timeval_current_ofs(0, 500000) :
2714                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0),
2715                         ctdb_election_timeout, rec);
2716
2717         /* someone called an election. check their election data
2718            and if we disagree and we would rather be the elected node, 
2719            send a new election message to all other nodes
2720          */
2721         if (ctdb_election_win(rec, em)) {
2722                 if (!rec->send_election_te) {
2723                         rec->send_election_te = tevent_add_timer(
2724                                         ctdb->ev, rec,
2725                                         timeval_current_ofs(0, 500000),
2726                                         election_send_request, rec);
2727                 }
2728                 /*unban_all_nodes(ctdb);*/
2729                 return;
2730         }
2731
2732         /* we didn't win */
2733         TALLOC_FREE(rec->send_election_te);
2734
2735         /* Release the recovery lock file */
2736         if (ctdb_recovery_have_lock(ctdb)) {
2737                 ctdb_recovery_unlock(ctdb);
2738                 unban_all_nodes(ctdb);
2739         }
2740
2741         clear_ip_assignment_tree(ctdb);
2742
2743         /* ok, let that guy become recmaster then */
2744         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(),
2745                                      CTDB_CURRENT_NODE, em->pnn);
2746         if (ret != 0) {
2747                 DEBUG(DEBUG_ERR, (__location__ " failed to set recmaster"));
2748                 return;
2749         }
2750         rec->recmaster = em->pnn;
2751
2752         return;
2753 }
2754
2755
2756 /*
2757   force the start of the election process
2758  */
2759 static void force_election(struct ctdb_recoverd *rec, uint32_t pnn, 
2760                            struct ctdb_node_map_old *nodemap)
2761 {
2762         int ret;
2763         struct ctdb_context *ctdb = rec->ctdb;
2764
2765         DEBUG(DEBUG_INFO,(__location__ " Force an election\n"));
2766
2767         /* set all nodes to recovery mode to stop all internode traffic */
2768         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE, false);
2769         if (ret != 0) {
2770                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
2771                 return;
2772         }
2773
2774         talloc_free(rec->election_timeout);
2775         rec->election_timeout = tevent_add_timer(
2776                         ctdb->ev, ctdb,
2777                         fast_start ?
2778                                 timeval_current_ofs(0, 500000) :
2779                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0),
2780                         ctdb_election_timeout, rec);
2781
2782         ret = send_election_request(rec, pnn);
2783         if (ret!=0) {
2784                 DEBUG(DEBUG_ERR, (__location__ " failed to initiate recmaster election"));
2785                 return;
2786         }
2787
2788         /* wait for a few seconds to collect all responses */
2789         ctdb_wait_election(rec);
2790 }
2791
2792
2793
2794 /*
2795   handler for when a node changes its flags
2796 */
2797 static void monitor_handler(uint64_t srvid, TDB_DATA data, void *private_data)
2798 {
2799         struct ctdb_recoverd *rec = talloc_get_type(
2800                 private_data, struct ctdb_recoverd);
2801         struct ctdb_context *ctdb = rec->ctdb;
2802         int ret;
2803         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2804         struct ctdb_node_map_old *nodemap=NULL;
2805         TALLOC_CTX *tmp_ctx;
2806         int i;
2807         int disabled_flag_changed;
2808
2809         if (data.dsize != sizeof(*c)) {
2810                 DEBUG(DEBUG_ERR,(__location__ "Invalid data in ctdb_node_flag_change\n"));
2811                 return;
2812         }
2813
2814         tmp_ctx = talloc_new(ctdb);
2815         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
2816
2817         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2818         if (ret != 0) {
2819                 DEBUG(DEBUG_ERR,(__location__ "ctdb_ctrl_getnodemap failed in monitor_handler\n"));
2820                 talloc_free(tmp_ctx);
2821                 return;         
2822         }
2823
2824
2825         for (i=0;i<nodemap->num;i++) {
2826                 if (nodemap->nodes[i].pnn == c->pnn) break;
2827         }
2828
2829         if (i == nodemap->num) {
2830                 DEBUG(DEBUG_CRIT,(__location__ " Flag change for non-existent node %u\n", c->pnn));
2831                 talloc_free(tmp_ctx);
2832                 return;
2833         }
2834
2835         if (c->old_flags != c->new_flags) {
2836                 DEBUG(DEBUG_NOTICE,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
2837         }
2838
2839         disabled_flag_changed =  (nodemap->nodes[i].flags ^ c->new_flags) & NODE_FLAGS_DISABLED;
2840
2841         nodemap->nodes[i].flags = c->new_flags;
2842
2843         ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, CONTROL_TIMEOUT(),
2844                                    CTDB_CURRENT_NODE, &ctdb->recovery_mode);
2845
2846         if (ret == 0 &&
2847             rec->recmaster == ctdb->pnn &&
2848             ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2849                 /* Only do the takeover run if the permanently-disabled or
2850                    unhealthy flags changed, since these cause an IP failover
2851                    but not a recovery.
2852                    If the node became disconnected or banned this also leads
2853                    to an IP address failover, but that is handled during
2854                    recovery.
2855                 */
2856                 if (disabled_flag_changed) {
2857                         rec->need_takeover_run = true;
2858                 }
2859         }
2860
2861         talloc_free(tmp_ctx);
2862 }
2863
2864 /*
2865   handler for when we need to push out flag changes to all other nodes
2866 */
2867 static void push_flags_handler(uint64_t srvid, TDB_DATA data,
2868                                void *private_data)
2869 {
2870         struct ctdb_recoverd *rec = talloc_get_type(
2871                 private_data, struct ctdb_recoverd);
2872         struct ctdb_context *ctdb = rec->ctdb;
2873         int ret;
2874         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2875         struct ctdb_node_map_old *nodemap=NULL;
2876         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2877         uint32_t *nodes;
2878
2879         /* read the node flags from the recmaster */
2880         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), rec->recmaster,
2881                                    tmp_ctx, &nodemap);
2882         if (ret != 0) {
2883                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recmaster node %u\n", rec->recmaster));
2884                 talloc_free(tmp_ctx);
2885                 return;
2886         }
2887         if (c->pnn >= nodemap->num) {
2888                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", c->pnn));
2889                 talloc_free(tmp_ctx);
2890                 return;
2891         }
2892
2893         /* send the flags update to all connected nodes */
2894         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2895
2896         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2897                                       nodes, 0, CONTROL_TIMEOUT(),
2898                                       false, data,
2899                                       NULL, NULL,
2900                                       NULL) != 0) {
2901                 DEBUG(DEBUG_ERR, (__location__ " ctdb_control to modify node flags failed\n"));
2902
2903                 talloc_free(tmp_ctx);
2904                 return;
2905         }
2906
2907         talloc_free(tmp_ctx);
2908 }
2909
2910
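     /*
       state shared by the async getrecmode checks below: the number of
       outstanding replies and the aggregated monitoring result
      */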
2911 struct verify_recmode_normal_data {
2912         uint32_t count;
2913         enum monitor_result status;
2914 };
2915
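     /*
       called for each getrecmode reply: mark the check as failed if the
       control did not complete, or flag that a recovery is needed if a node
       reports that it is not in normal recovery mode
      */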
2916 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
2917 {
2918         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmode_normal_data);
2919
2920
2921         /* one more node has responded with recmode data*/
2922         rmdata->count--;
2923
2924         /* if we failed to get the recmode, then return an error and let
2925            the main loop try again.
2926         */
2927         if (state->state != CTDB_CONTROL_DONE) {
2928                 if (rmdata->status == MONITOR_OK) {
2929                         rmdata->status = MONITOR_FAILED;
2930                 }
2931                 return;
2932         }
2933
2934         /* if we got a response, then the recmode will be stored in the
2935            status field
2936         */
2937         if (state->status != CTDB_RECOVERY_NORMAL) {
2938                 DEBUG(DEBUG_NOTICE, ("Node:%u was in recovery mode. Start recovery process\n", state->c->hdr.destnode));
2939                 rmdata->status = MONITOR_RECOVERY_NEEDED;
2940         }
2941
2942         return;
2943 }
2944
2945
2946 /* verify that all nodes are in normal recovery mode */
2947 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map_old *nodemap)
2948 {
2949         struct verify_recmode_normal_data *rmdata;
2950         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2951         struct ctdb_client_control_state *state;
2952         enum monitor_result status;
2953         int j;
2954         
2955         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
2956         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2957         rmdata->count  = 0;
2958         rmdata->status = MONITOR_OK;
2959
2960         /* loop over all active nodes and send an async getrecmode call to 
2961            them*/
2962         for (j=0; j<nodemap->num; j++) {
2963                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2964                         continue;
2965                 }
2966                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
2967                                         CONTROL_TIMEOUT(), 
2968                                         nodemap->nodes[j].pnn);
2969                 if (state == NULL) {
2970                         /* we failed to send the control, treat this as 
2971                            an error and try again next iteration
2972                         */                      
2973                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
2974                         talloc_free(mem_ctx);
2975                         return MONITOR_FAILED;
2976                 }
2977
2978                 /* set up the callback functions */
2979                 state->async.fn = verify_recmode_normal_callback;
2980                 state->async.private_data = rmdata;
2981
2982                 /* one more control to wait for to complete */
2983                 rmdata->count++;
2984         }
2985
2986
2987         /* now wait for up to the maximum number of seconds allowed
2988            or until all nodes we expect a response from have replied
2989         */
2990         while (rmdata->count > 0) {
2991                 tevent_loop_once(ctdb->ev);
2992         }
2993
2994         status = rmdata->status;
2995         talloc_free(mem_ctx);
2996         return status;
2997 }
2998
2999
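     /*
       state shared by the async getrecmaster checks below: the pnn we expect
       every node to report as recmaster, the number of outstanding replies
       and the aggregated monitoring result
      */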
3000 struct verify_recmaster_data {
3001         struct ctdb_recoverd *rec;
3002         uint32_t count;
3003         uint32_t pnn;
3004         enum monitor_result status;
3005 };
3006
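     /*
       called for each getrecmaster reply: if a node disagrees about who the
       recmaster is, record it as a culprit and request a new election
      */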
3007 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
3008 {
3009         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmaster_data);
3010
3011
3012         /* one more node has responded with recmaster data*/
3013         rmdata->count--;
3014
3015         /* if we failed to get the recmaster, then return an error and let
3016            the main loop try again.
3017         */
3018         if (state->state != CTDB_CONTROL_DONE) {
3019                 if (rmdata->status == MONITOR_OK) {
3020                         rmdata->status = MONITOR_FAILED;
3021                 }
3022                 return;
3023         }
3024
3025         /* if we got a response, then the recmaster will be stored in the
3026            status field
3027         */
3028         if (state->status != rmdata->pnn) {
3029                 DEBUG(DEBUG_ERR,("Node %d thinks node %d is recmaster. Need a new recmaster election\n", state->c->hdr.destnode, state->status));
3030                 ctdb_set_culprit(rmdata->rec, state->c->hdr.destnode);
3031                 rmdata->status = MONITOR_ELECTION_NEEDED;
3032         }
3033
3034         return;
3035 }
3036
3037
3038 /* verify that all nodes agree that we are the recmaster */
3039 static enum monitor_result verify_recmaster(struct ctdb_recoverd *rec, struct ctdb_node_map_old *nodemap, uint32_t pnn)
3040 {
3041         struct ctdb_context *ctdb = rec->ctdb;
3042         struct verify_recmaster_data *rmdata;
3043         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
3044         struct ctdb_client_control_state *state;
3045         enum monitor_result status;
3046         int j;
3047         
3048         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
3049         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
3050         rmdata->rec    = rec;
3051         rmdata->count  = 0;
3052         rmdata->pnn    = pnn;
3053         rmdata->status = MONITOR_OK;
3054
3055         /* loop over all active nodes and send an async getrecmaster call to
3056            them*/
3057         for (j=0; j<nodemap->num; j++) {
3058                 if (nodemap->nodes[j].pnn == rec->recmaster) {
3059                         continue;
3060                 }
3061                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3062                         continue;
3063                 }
3064                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
3065                                         CONTROL_TIMEOUT(),
3066                                         nodemap->nodes[j].pnn);
3067                 if (state == NULL) {
3068                         /* we failed to send the control, treat this as 
3069                            an error and try again next iteration
3070                         */                      
3071                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
3072                         talloc_free(mem_ctx);
3073                         return MONITOR_FAILED;
3074                 }
3075
3076                 /* set up the callback functions */
3077                 state->async.fn = verify_recmaster_callback;
3078                 state->async.private_data = rmdata;
3079
3080                 /* one more control to wait for to complete */
3081                 rmdata->count++;
3082         }
3083
3084
3085         /* now wait for up to the maximum number of seconds allowed
3086            or until all nodes we expect a response from have replied
3087         */
3088         while (rmdata->count > 0) {
3089                 tevent_loop_once(ctdb->ev);
3090         }
3091
3092         status = rmdata->status;
3093         talloc_free(mem_ctx);
3094         return status;
3095 }
3096
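     /*
       compare the current list of local interfaces with the list cached from
       the previous iteration; returns true if the number, names or link
       states of the interfaces have changed (or on the first call)
      */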
3097 static bool interfaces_have_changed(struct ctdb_context *ctdb,
3098                                     struct ctdb_recoverd *rec)
3099 {
3100         struct ctdb_iface_list_old *ifaces = NULL;
3101         TALLOC_CTX *mem_ctx;
3102         bool ret = false;
3103
3104         mem_ctx = talloc_new(NULL);
3105
3106         /* Read the interfaces from the local node */
3107         if (ctdb_ctrl_get_ifaces(ctdb, CONTROL_TIMEOUT(),
3108                                  CTDB_CURRENT_NODE, mem_ctx, &ifaces) != 0) {
3109                 DEBUG(DEBUG_ERR, ("Unable to get interfaces from local node %u\n", ctdb->pnn));
3110                 /* We could return an error.  However, this will be
3111                  * rare so we'll decide that the interfaces have
3112                  * actually changed, just in case.
3113                  */
3114                 talloc_free(mem_ctx);
3115                 return true;
3116         }
3117
3118         if (!rec->ifaces) {
3119                 /* We haven't been here before so things have changed */
3120                 DEBUG(DEBUG_NOTICE, ("Initial interface fetched\n"));
3121                 ret = true;
3122         } else if (rec->ifaces->num != ifaces->num) {
3123                 /* Number of interfaces has changed */
3124                 DEBUG(DEBUG_NOTICE, ("Interface count changed from %d to %d\n",
3125                                      rec->ifaces->num, ifaces->num));
3126                 ret = true;
3127         } else {
3128                 /* See if interface names or link states have changed */
3129                 int i;
3130                 for (i = 0; i < rec->ifaces->num; i++) {
3131                         struct ctdb_iface * iface = &rec->ifaces->ifaces[i];
3132                         if (strcmp(iface->name, ifaces->ifaces[i].name) != 0) {
3133                                 DEBUG(DEBUG_NOTICE,
3134                                       ("Interface in slot %d changed: %s => %s\n",
3135                                        i, iface->name, ifaces->ifaces[i].name));
3136                                 ret = true;
3137                                 break;
3138                         }
3139                         if (iface->link_state != ifaces->ifaces[i].link_state) {
3140                                 DEBUG(DEBUG_NOTICE,
3141                                       ("Interface %s changed state: %d => %d\n",
3142                                        iface->name, iface->link_state,
3143                                        ifaces->ifaces[i].link_state));
3144                                 ret = true;
3145                                 break;
3146                         }
3147                 }
3148         }
3149
3150         talloc_free(rec->ifaces);
3151         rec->ifaces = talloc_steal(rec, ifaces);
3152
3153         talloc_free(mem_ctx);
3154         return ret;
3155 }
3156
3157 /* called to check that the local allocation of public ip addresses is ok.
3158 */
3159 static int verify_local_ip_allocation(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, uint32_t pnn, struct ctdb_node_map_old *nodemap)
3160 {
3161         TALLOC_CTX *mem_ctx = talloc_new(NULL);
3162         int ret, j;
3163         bool need_takeover_run = false;
3164
3165         if (interfaces_have_changed(ctdb, rec)) {
3166                 DEBUG(DEBUG_NOTICE, ("The interface status has changed on "
3167                                      "local node %u - force takeover run\n",
3168                                      pnn));
3169                 need_takeover_run = true;
3170         }
3171
3172         /* verify that we have the IP addresses we should have
3173            and that we don't have ones we shouldn't have.
3174            If we find an inconsistency we set recmode to
3175            active on the local node and wait for the recmaster
3176            to do a full blown recovery.
3177            Also, if an IP's pnn is -1 and we are healthy and can host
3178            the IP, we request an IP reallocation.
3179         */
3180         if (ctdb->tunable.disable_ip_failover == 0) {
3181                 struct ctdb_public_ip_list_old *ips = NULL;
3182
3183                 /* read the *available* IPs from the local node */
3184                 ret = ctdb_ctrl_get_public_ips_flags(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE, &ips);
3185                 if (ret != 0) {
3186                         DEBUG(DEBUG_ERR, ("Unable to get available public IPs from local node %u\n", pnn));
3187                         talloc_free(mem_ctx);
3188                         return -1;
3189                 }
3190
3191                 for (j=0; j<ips->num; j++) {
3192                         if (ips->ips[j].pnn == -1 &&
3193                             nodemap->nodes[pnn].flags == 0) {
3194                                 DEBUG(DEBUG_CRIT,("Public IP '%s' is not assigned and we could serve it\n",
3195                                                   ctdb_addr_to_str(&ips->ips[j].addr)));
3196                                 need_takeover_run = true;
3197                         }
3198                 }
3199
3200                 talloc_free(ips);
3201
3202                 /* read the *known* IPs from the local node */
3203                 ret = ctdb_ctrl_get_public_ips_flags(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, 0, &ips);
3204                 if (ret != 0) {
3205                         DEBUG(DEBUG_ERR, ("Unable to get known public IPs from local node %u\n", pnn));
3206                         talloc_free(mem_ctx);
3207                         return -1;
3208                 }
3209
3210                 for (j=0; j<ips->num; j++) {
3211                         if (ips->ips[j].pnn == pnn) {
3212                                 if (ctdb->do_checkpublicip && !ctdb_sys_have_ip(&ips->ips[j].addr)) {
3213                                         DEBUG(DEBUG_CRIT,("Public IP '%s' is assigned to us but not on an interface\n",
3214                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
3215                                         need_takeover_run = true;
3216                                 }
3217                         } else {
3218                                 if (ctdb->do_checkpublicip &&
3219                                     ctdb_sys_have_ip(&ips->ips[j].addr)) {
3220
3221                                         DEBUG(DEBUG_CRIT,("We are still serving a public IP '%s' that we should not be serving. Removing it\n", 
3222                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
3223
3224                                         if (ctdb_ctrl_release_ip(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ips->ips[j]) != 0) {
3225                                                 DEBUG(DEBUG_ERR,("Failed to release local IP address\n"));
3226                                         }
3227                                 }
3228                         }
3229                 }
3230         }
3231
3232         if (need_takeover_run) {
3233                 struct ctdb_srvid_message rd;
3234                 TDB_DATA data;
3235
3236                 DEBUG(DEBUG_CRIT,("Trigger takeoverrun\n"));
3237
3238                 rd.pnn = ctdb->pnn;
3239                 rd.srvid = 0;
3240                 data.dptr = (uint8_t *)&rd;
3241                 data.dsize = sizeof(rd);
3242
3243                 ret = ctdb_client_send_message(ctdb, rec->recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
3244                 if (ret != 0) {
3245                         DEBUG(DEBUG_ERR,(__location__ " Failed to send ipreallocate to recmaster: %d\n", (int)rec->recmaster));
3246                 }
3247         }
3248         talloc_free(mem_ctx);
3249         return 0;
3250 }
3251
3252
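     /*
       callback for the async GET_NODEMAP controls: store each reply in the
       remote_nodemaps array, indexed by the pnn of the replying node
      */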
3253 static void async_getnodemap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
3254 {
3255         struct ctdb_node_map_old **remote_nodemaps = callback_data;
3256
3257         if (node_pnn >= ctdb->num_nodes) {
3258                 DEBUG(DEBUG_ERR,(__location__ " pnn from invalid node\n"));
3259                 return;
3260         }
3261
3262         remote_nodemaps[node_pnn] = (struct ctdb_node_map_old *)talloc_steal(remote_nodemaps, outdata.dptr);
3263
3264 }
3265
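     /*
       fetch the nodemap from all active nodes in parallel, collecting the
       results into the remote_nodemaps array
      */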
3266 static int get_remote_nodemaps(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
3267         struct ctdb_node_map_old *nodemap,
3268         struct ctdb_node_map_old **remote_nodemaps)
3269 {
3270         uint32_t *nodes;
3271
3272         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
3273         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_NODEMAP,
3274                                         nodes, 0,
3275                                         CONTROL_TIMEOUT(), false, tdb_null,
3276                                         async_getnodemap_callback,
3277                                         NULL,
3278                                         remote_nodemaps) != 0) {
3279                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull all remote nodemaps\n"));
3280
3281                 return -1;
3282         }
3283
3284         return 0;
3285 }
3286
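     /*
       re-read the recovery lock file setting from the main daemon and, if it
       has been enabled, disabled or changed, update our copy and drop any
       lock held on the old file
      */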
3287 static int update_recovery_lock_file(struct ctdb_context *ctdb)
3288 {
3289         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3290         const char *reclockfile;
3291
3292         if (ctdb_ctrl_getreclock(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &reclockfile) != 0) {
3293                 DEBUG(DEBUG_ERR,("Failed to read reclock file from daemon\n"));
3294                 talloc_free(tmp_ctx);
3295                 return -1;      
3296         }
3297
3298         if (reclockfile == NULL) {
3299                 if (ctdb->recovery_lock_file != NULL) {
3300                         DEBUG(DEBUG_NOTICE,("Recovery lock file disabled\n"));
3301                         talloc_free(ctdb->recovery_lock_file);
3302                         ctdb->recovery_lock_file = NULL;
3303                         ctdb_recovery_unlock(ctdb);
3304                 }
3305                 talloc_free(tmp_ctx);
3306                 return 0;
3307         }
3308
3309         if (ctdb->recovery_lock_file == NULL) {
3310                 DEBUG(DEBUG_NOTICE,
3311                       ("Recovery lock file enabled (%s)\n", reclockfile));
3312                 ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
3313                 ctdb_recovery_unlock(ctdb);
3314                 talloc_free(tmp_ctx);
3315                 return 0;
3316         }
3317
3318
3319         if (!strcmp(reclockfile, ctdb->recovery_lock_file)) {
3320                 talloc_free(tmp_ctx);
3321                 return 0;
3322         }
3323
3324         DEBUG(DEBUG_NOTICE,
3325               ("Recovery lock file changed (now %s)\n", reclockfile));
3326         talloc_free(ctdb->recovery_lock_file);
3327         ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
3328         ctdb_recovery_unlock(ctdb);
3329
3330         talloc_free(tmp_ctx);
3331         return 0;
3332 }
3333
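     /*
       check that the current recovery master is still usable: it must still
       exist in the nodemap, be connected and active, and should hold the
       recmaster capability if we do; otherwise force a new election
      */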
3334 static enum monitor_result validate_recovery_master(struct ctdb_recoverd *rec,
3335                                                     TALLOC_CTX *mem_ctx)
3336 {
3337         struct ctdb_context *ctdb = rec->ctdb;
3338         uint32_t pnn = ctdb_get_pnn(ctdb);
3339         struct ctdb_node_map_old *nodemap = rec->nodemap;
3340         struct ctdb_node_map_old *recmaster_nodemap = NULL;
3341         int ret;
3342
3343         /* When recovery daemon is started, recmaster is set to
3344          * "unknown" so it knows to start an election.
3345          */
3346         if (rec->recmaster == CTDB_UNKNOWN_PNN) {
3347                 DEBUG(DEBUG_NOTICE,
3348                       ("Initial recovery master set - forcing election\n"));
3349                 return MONITOR_ELECTION_NEEDED;
3350         }
3351
3352         /*
3353          * If the current recmaster does not have CTDB_CAP_RECMASTER,
3354          * but we have, then force an election and try to become the new
3355          * recmaster.
3356          */
3357         if (!ctdb_node_has_capabilities(rec->caps,
3358                                         rec->recmaster,
3359                                         CTDB_CAP_RECMASTER) &&
3360             (rec->ctdb->capabilities & CTDB_CAP_RECMASTER) &&
3361             !(nodemap->nodes[pnn].flags & NODE_FLAGS_INACTIVE)) {
3362                 DEBUG(DEBUG_ERR,
3363                       (" Current recmaster node %u does not have CAP_RECMASTER,"
3364                        " but we (node %u) have - force an election\n",
3365                        rec->recmaster, pnn));
3366                 return MONITOR_ELECTION_NEEDED;
3367         }
3368
3369         /* Verify that the master node has not been deleted.  This
3370          * should not happen because a node should always be shutdown
3371          * before being deleted, causing a new master to be elected
3372          * before now.  However, if something strange has happened
3373          * then checking here will ensure we don't index beyond the
3374          * end of the nodemap array. */
3375         if (rec->recmaster >= nodemap->num) {
3376                 DEBUG(DEBUG_ERR,
3377                       ("Recmaster node %u has been deleted. Force election\n",
3378                        rec->recmaster));
3379                 return MONITOR_ELECTION_NEEDED;
3380         }
3381
3382         /* if recovery master is disconnected/deleted we must elect a new recmaster */
3383         if (nodemap->nodes[rec->recmaster].flags &
3384             (NODE_FLAGS_DISCONNECTED|NODE_FLAGS_DELETED)) {
3385                 DEBUG(DEBUG_NOTICE,
3386                       ("Recmaster node %u is disconnected/deleted. Force election\n",
3387                        rec->recmaster));
3388                 return MONITOR_ELECTION_NEEDED;
3389         }
3390
3391         /* get nodemap from the recovery master to check if it is inactive */
3392         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), rec->recmaster,
3393                                    mem_ctx, &recmaster_nodemap);
3394         if (ret != 0) {
3395                 DEBUG(DEBUG_ERR,
3396                       (__location__
3397                        " Unable to get nodemap from recovery master %u\n",
3398                           rec->recmaster));
3399                 return MONITOR_FAILED;
3400         }
3401
3402
3403         if ((recmaster_nodemap->nodes[rec->recmaster].flags & NODE_FLAGS_INACTIVE) &&
3404             (rec->node_flags & NODE_FLAGS_INACTIVE) == 0) {
3405                 DEBUG(DEBUG_NOTICE,
3406                       ("Recmaster node %u is inactive. Force election\n",
3407                        rec->recmaster));
3408                 /*
3409                  * update our nodemap to carry the recmaster's notion of
3410                  * its own flags, so that we don't keep freezing the
3411                  * inactive recmaster node...
3412                  */
3413                 nodemap->nodes[rec->recmaster].flags =
3414                         recmaster_nodemap->nodes[rec->recmaster].flags;
3415                 return MONITOR_ELECTION_NEEDED;
3416         }
3417
3418         return MONITOR_OK;
3419 }
3420
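     /*
       one iteration of the recovery daemon's monitoring logic: refresh
       tunables and the local nodemap, make sure a valid recmaster exists
       and, if we are the recmaster, verify that nodemaps, flags and the
       vnnmap are consistent across the cluster, triggering a recovery or
       takeover run when they are not
      */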
3421 static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
3422                       TALLOC_CTX *mem_ctx)
3423 {
3424         uint32_t pnn;
3425         struct ctdb_node_map_old *nodemap=NULL;
3426         struct ctdb_node_map_old **remote_nodemaps=NULL;
3427         struct ctdb_vnn_map *vnnmap=NULL;
3428         struct ctdb_vnn_map *remote_vnnmap=NULL;
3429         uint32_t num_lmasters;
3430         int32_t debug_level;
3431         int i, j, ret;
3432         bool self_ban;
3433
3434
3435         /* verify that the main daemon is still running */
3436         if (ctdb_kill(ctdb, ctdb->ctdbd_pid, 0) != 0) {
3437                 DEBUG(DEBUG_CRIT,("CTDB daemon is no longer available. Shutting down recovery daemon\n"));
3438                 exit(-1);
3439         }
3440
3441         /* ping the local daemon to tell it we are alive */
3442         ctdb_ctrl_recd_ping(ctdb);
3443
3444         if (rec->election_timeout) {
3445                 /* an election is in progress */
3446                 return;
3447         }
3448
3449         /* read the debug level from the parent and update locally */
3450         ret = ctdb_ctrl_get_debuglevel(ctdb, CTDB_CURRENT_NODE, &debug_level);
3451         if (ret !=0) {
3452                 DEBUG(DEBUG_ERR, (__location__ " Failed to read debuglevel from parent\n"));
3453                 return;
3454         }
3455         DEBUGLEVEL = debug_level;
3456
3457         /* get relevant tunables */
3458         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
3459         if (ret != 0) {
3460                 DEBUG(DEBUG_ERR,("Failed to get tunables - retrying\n"));
3461                 return;
3462         }
3463
3464         /* get runstate */
3465         ret = ctdb_ctrl_get_runstate(ctdb, CONTROL_TIMEOUT(),
3466                                      CTDB_CURRENT_NODE, &ctdb->runstate);
3467         if (ret != 0) {
3468                 DEBUG(DEBUG_ERR, ("Failed to get runstate - retrying\n"));
3469                 return;
3470         }
3471
3472         /* get the current recovery lock file from the server */
3473         if (update_recovery_lock_file(ctdb) != 0) {
3474                 DEBUG(DEBUG_ERR,("Failed to update the recovery lock file\n"));
3475                 return;
3476         }
3477
3478         pnn = ctdb_get_pnn(ctdb);
3479
3480         /* get nodemap */
3481         TALLOC_FREE(rec->nodemap);
3482         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, rec, &rec->nodemap);
3483         if (ret != 0) {
3484                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", pnn));
3485                 return;
3486         }
3487         nodemap = rec->nodemap;
3488
3489         /* remember our own node flags */
3490         rec->node_flags = nodemap->nodes[pnn].flags;
3491
3492         ban_misbehaving_nodes(rec, &self_ban);
3493         if (self_ban) {
3494                 DEBUG(DEBUG_NOTICE, ("This node was banned, restart main_loop\n"));
3495                 return;
3496         }
3497
3498         /* if the local daemon is STOPPED or BANNED, we verify that the databases are
3499            also frozen and that the recmode is set to active.
3500         */
3501         if (rec->node_flags & (NODE_FLAGS_STOPPED | NODE_FLAGS_BANNED)) {
3502                 /* If this node has become inactive then we want to
3503                  * reduce the chances of it taking over the recovery
3504                  * master role when it becomes active again.  This
3505                  * helps to stabilise the recovery master role so that
3506                  * it stays on the most stable node.
3507                  */
3508                 rec->priority_time = timeval_current();
3509
3510                 ret = ctdb_ctrl_getrecmode(ctdb, mem_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->recovery_mode);
3511                 if (ret != 0) {
3512                         DEBUG(DEBUG_ERR,(__location__ " Failed to read recmode from local node\n"));
3513                 }
3514                 if (ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
3515                         DEBUG(DEBUG_ERR,("Node is stopped or banned but recovery mode is not active. Activate recovery mode and lock databases\n"));
3516
3517                         ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
3518                         if (ret != 0) {
3519                                 DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode in STOPPED or BANNED state\n"));
3520
3521                                 return;
3522                         }
3523                         ret = ctdb_ctrl_freeze(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
3524                         if (ret != 0) {
3525                                 DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node in STOPPED or BANNED state\n"));
3526                                 return;
3527                         }
3528                 }
3529
3530                 /* If this node is stopped or banned then it is not the recovery
3531                  * master, so don't do anything. This prevents a stopped or banned
3532                  * node from starting an election and sending unnecessary controls.
3533                  */
3534                 return;
3535         }
3536
3537         /* If we are not the recmaster then do some housekeeping */
3538         if (rec->recmaster != pnn) {
3539                 /* Ignore any IP reallocate requests - only recmaster
3540                  * processes them
3541                  */
3542                 TALLOC_FREE(rec->reallocate_requests);
3543                 /* Clear any nodes that should be force rebalanced in
3544                  * the next takeover run.  If the recovery master role
3545                  * has moved then we don't want to process these some
3546                  * time in the future.
3547                  */
3548                 TALLOC_FREE(rec->force_rebalance_nodes);
3549         }
3550
3551         /* Retrieve capabilities from all connected nodes */
3552         ret = update_capabilities(rec, nodemap);
3553         if (ret != 0) {
3554                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
3555                 return;
3556         }
3557
3558         switch (validate_recovery_master(rec, mem_ctx)) {
3559         case MONITOR_RECOVERY_NEEDED:
3560                 /* can not happen */
3561                 return;
3562         case MONITOR_ELECTION_NEEDED:
3563                 force_election(rec, pnn, nodemap);
3564                 return;
3565         case MONITOR_OK:
3566                 break;
3567         case MONITOR_FAILED:
3568                 return;
3569         }
3570
3571         /* verify that we have all the IP addresses we should have and we don't
3572          * have addresses we shouldn't have.
3573          */ 
3574         if (ctdb->tunable.disable_ip_failover == 0 &&
3575             !ctdb_op_is_disabled(rec->takeover_run)) {
3576                 if (verify_local_ip_allocation(ctdb, rec, pnn, nodemap) != 0) {
3577                         DEBUG(DEBUG_ERR, (__location__ " Public IPs were inconsistent.\n"));
3578                 }
3579         }
3580
3581
3582         /* if we are not the recmaster then we do not need to check
3583            if recovery is needed
3584          */
3585         if (pnn != rec->recmaster) {
3586                 return;
3587         }
3588
3589
3590         /* ensure our local copies of flags are right */
3591         ret = update_local_flags(rec, nodemap);
3592         if (ret == MONITOR_ELECTION_NEEDED) {
3593                 DEBUG(DEBUG_NOTICE,("update_local_flags() called for a re-election.\n"));
3594                 force_election(rec, pnn, nodemap);
3595                 return;
3596         }
3597         if (ret != MONITOR_OK) {
3598                 DEBUG(DEBUG_ERR,("Unable to update local flags\n"));
3599                 return;
3600         }
3601
3602         if (ctdb->num_nodes != nodemap->num) {
3603                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) reloading nodes file\n", ctdb->num_nodes, nodemap->num));
3604                 ctdb_load_nodes_file(ctdb);
3605                 return;
3606         }
3607
3608         /* verify that all active nodes agree that we are the recmaster */
3609         switch (verify_recmaster(rec, nodemap, pnn)) {
3610         case MONITOR_RECOVERY_NEEDED:
3611                 /* can not happen */
3612                 return;
3613         case MONITOR_ELECTION_NEEDED:
3614                 force_election(rec, pnn, nodemap);
3615                 return;
3616         case MONITOR_OK:
3617                 break;
3618         case MONITOR_FAILED:
3619                 return;
3620         }
3621
3622
3623         /* get the vnnmap */
3624         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
3625         if (ret != 0) {
3626                 DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from node %u\n", pnn));
3627                 return;
3628         }
3629
3630         if (rec->need_recovery) {
3631                 /* a previous recovery didn't finish */
3632                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3633                 return;
3634         }
3635
3636         /* verify that all active nodes are in normal mode 
3637            and not in recovery mode 
3638         */
3639         switch (verify_recmode(ctdb, nodemap)) {
3640         case MONITOR_RECOVERY_NEEDED:
3641                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3642                 return;
3643         case MONITOR_FAILED:
3644                 return;
3645         case MONITOR_ELECTION_NEEDED:
3646                 /* can not happen */
3647         case MONITOR_OK:
3648                 break;
3649         }
3650
3651
3652         if (ctdb->recovery_lock_file != NULL) {
3653                 /* We must already hold the recovery lock */
3654                 if (!ctdb_recovery_have_lock(ctdb)) {
3655                         DEBUG(DEBUG_ERR,("Failed recovery lock sanity check.  Force a recovery\n"));
3656                         ctdb_set_culprit(rec, ctdb->pnn);
3657                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3658                         return;
3659                 }
3660         }
3661
3662
3663         /* if there are takeovers requested, perform them and notify the waiters */
3664         if (!ctdb_op_is_disabled(rec->takeover_run) &&
3665             rec->reallocate_requests) {
3666                 process_ipreallocate_requests(ctdb, rec);
3667         }
3668
3669         /* If recoveries are disabled then there is no use doing any
3670          * nodemap or flags checks.  Recoveries might be disabled due
3671          * to "reloadnodes", so doing these checks might cause an
3672          * unnecessary recovery.  */
3673         if (ctdb_op_is_disabled(rec->recovery)) {
3674                 return;
3675         }
3676
3677         /* get the nodemap for all active remote nodes
3678          */
3679         remote_nodemaps = talloc_array(mem_ctx, struct ctdb_node_map_old *, nodemap->num);
3680         if (remote_nodemaps == NULL) {
3681                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate remote nodemap array\n"));
3682                 return;
3683         }
3684         for(i=0; i<nodemap->num; i++) {
3685                 remote_nodemaps[i] = NULL;
3686         }
3687         if (get_remote_nodemaps(ctdb, mem_ctx, nodemap, remote_nodemaps) != 0) {
3688                 DEBUG(DEBUG_ERR,(__location__ " Failed to read remote nodemaps\n"));
3689                 return;
3690         } 
3691
3692         /* verify that all other nodes have the same nodemap as we have
3693         */
3694         for (j=0; j<nodemap->num; j++) {
3695                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3696                         continue;
3697                 }
3698
3699                 if (remote_nodemaps[j] == NULL) {
3700                         DEBUG(DEBUG_ERR,(__location__ " Did not get a remote nodemap for node %d, restarting monitoring\n", j));
3701                         ctdb_set_culprit(rec, j);
3702
3703                         return;
3704                 }
3705
3706                 /* if the nodes disagree on how many nodes there are
3707                    then this is a good reason to try recovery
3708                  */
3709                 if (remote_nodemaps[j]->num != nodemap->num) {
3710                         DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
3711                                   nodemap->nodes[j].pnn, remote_nodemaps[j]->num, nodemap->num));
3712                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3713                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3714                         return;
3715                 }
3716
3717                 /* if the nodes disagree on which nodes exist and are
3718                    active, then that is also a good reason to do recovery
3719                  */
3720                 for (i=0;i<nodemap->num;i++) {
3721                         if (remote_nodemaps[j]->nodes[i].pnn != nodemap->nodes[i].pnn) {
3722                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
3723                                           nodemap->nodes[j].pnn, i, 
3724                                           remote_nodemaps[j]->nodes[i].pnn, nodemap->nodes[i].pnn));
3725                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3726                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3727                                             vnnmap);
3728                                 return;
3729                         }
3730                 }
3731         }
3732
3733         /*
3734          * Update node flags obtained from each active node. This ensures we have
3735          * up-to-date information for all the nodes.
3736          */
3737         for (j=0; j<nodemap->num; j++) {
3738                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3739                         continue;
3740                 }
3741                 nodemap->nodes[j].flags = remote_nodemaps[j]->nodes[j].flags;
3742         }
3743
3744         for (j=0; j<nodemap->num; j++) {
3745                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3746                         continue;
3747                 }
3748
3749                 /* verify the flags are consistent
3750                 */
3751                 for (i=0; i<nodemap->num; i++) {
3752                         if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3753                                 continue;
3754                         }
3755                         
3756                         if (nodemap->nodes[i].flags != remote_nodemaps[j]->nodes[i].flags) {
3757                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different flags for node %u. It has 0x%02x vs our 0x%02x\n", 
3758                                   nodemap->nodes[j].pnn, 
3759                                   nodemap->nodes[i].pnn, 
3760                                   remote_nodemaps[j]->nodes[i].flags,
3761                                   nodemap->nodes[i].flags));
3762                                 if (i == j) {
3763                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from remote node %d for cluster update of its own flags\n", remote_nodemaps[j]->nodes[i].flags, j));
3764                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, remote_nodemaps[j]->nodes[i].flags);
3765                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3766                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3767                                                     vnnmap);
3768                                         return;
3769                                 } else {
3770                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from local recmaster node for cluster update of node %d flags\n", nodemap->nodes[i].flags, i));
3771                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, nodemap->nodes[i].flags);
3772                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3773                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3774                                                     vnnmap);
3775                                         return;
3776                                 }
3777                         }
3778                 }
3779         }
3780
3781
3782         /* count how many active nodes with the lmaster capability there are */
3783         num_lmasters  = 0;
3784         for (i=0; i<nodemap->num; i++) {
3785                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
3786                         if (ctdb_node_has_capabilities(rec->caps,
3787                                                        ctdb->nodes[i]->pnn,
3788                                                        CTDB_CAP_LMASTER)) {
3789                                 num_lmasters++;
3790                         }
3791                 }
3792         }
3793
3794
3795         /* There must be the same number of lmasters in the vnn map as
3796          * there are active nodes with the lmaster capability...  or
3797          * do a recovery.
3798          */
3799         if (vnnmap->size != num_lmasters) {
3800                 DEBUG(DEBUG_ERR, (__location__ " The vnnmap count is different from the number of active lmaster nodes: %u vs %u\n",
3801                           vnnmap->size, num_lmasters));
3802                 ctdb_set_culprit(rec, ctdb->pnn);
3803                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3804                 return;
3805         }
3806
3807         /* verify that all active nodes in the nodemap also exist in 
3808            the vnnmap.
3809          */
3810         for (j=0; j<nodemap->num; j++) {
3811                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3812                         continue;
3813                 }
3814                 if (nodemap->nodes[j].pnn == pnn) {
3815                         continue;
3816                 }
3817
3818                 for (i=0; i<vnnmap->size; i++) {
3819                         if (vnnmap->map[i] == nodemap->nodes[j].pnn) {
3820                                 break;
3821                         }
3822                 }
3823                 if (i == vnnmap->size) {
3824                         DEBUG(DEBUG_ERR, (__location__ " Node %u is active in the nodemap but does not exist in the vnnmap\n", 
3825                                   nodemap->nodes[j].pnn));
3826                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3827                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3828                         return;
3829                 }
3830         }
3831
3832         
3833         /* verify that all other nodes have the same vnnmap
3834            and are from the same generation
3835          */
3836         for (j=0; j<nodemap->num; j++) {
3837                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3838                         continue;
3839                 }
3840                 if (nodemap->nodes[j].pnn == pnn) {
3841                         continue;
3842                 }
3843
3844                 ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3845                                           mem_ctx, &remote_vnnmap);
3846                 if (ret != 0) {
3847                         DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from remote node %u\n", 
3848                                   nodemap->nodes[j].pnn));
3849                         return;
3850                 }
3851
3852                 /* verify the vnnmap generation is the same */
3853                 if (vnnmap->generation != remote_vnnmap->generation) {
3854                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
3855                                   nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
3856                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3857                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3858                         return;
3859                 }
3860
3861                 /* verify the vnnmap size is the same */
3862                 if (vnnmap->size != remote_vnnmap->size) {
3863                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
3864                                   nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
3865                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3866                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3867                         return;
3868                 }
3869
3870                 /* verify the vnnmap is the same */
3871                 for (i=0;i<vnnmap->size;i++) {
3872                         if (remote_vnnmap->map[i] != vnnmap->map[i]) {
3873                                 DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different vnnmap.\n", 
3874                                           nodemap->nodes[j].pnn));
3875                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3876                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3877                                             vnnmap);
3878                                 return;
3879                         }
3880                 }
3881         }
3882
3883         /* we might need to change who has what IP assigned */
3884         if (rec->need_takeover_run) {
3885                 /* If takeover run fails, then the offending nodes are
3886                  * assigned ban culprit counts. And we re-try takeover.
3887                  * If takeover run fails repeatedly, the node would get
3888                  * banned.
3889                  */
3890                 do_takeover_run(rec, nodemap, true);
3891         }
3892 }
3893
3894 /*
3895   the main monitoring loop
3896  */
3897 static void monitor_cluster(struct ctdb_context *ctdb)
3898 {
3899         struct ctdb_recoverd *rec;
3900
3901         DEBUG(DEBUG_NOTICE,("monitor_cluster starting\n"));
3902
3903         rec = talloc_zero(ctdb, struct ctdb_recoverd);
3904         CTDB_NO_MEMORY_FATAL(ctdb, rec);
3905
3906         rec->ctdb = ctdb;
3907         rec->recmaster = CTDB_UNKNOWN_PNN;
3908
3909         rec->takeover_run = ctdb_op_init(rec, "takeover runs");
3910         CTDB_NO_MEMORY_FATAL(ctdb, rec->takeover_run);
3911
3912         rec->recovery = ctdb_op_init(rec, "recoveries");
3913         CTDB_NO_MEMORY_FATAL(ctdb, rec->recovery);
3914
3915         rec->priority_time = timeval_current();
3916
3917         /* register a message port for sending memory dumps */
3918         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_MEM_DUMP, mem_dump_handler, rec);
3919
3920         /* register a message port for recovery elections */
3921         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_ELECTION, election_handler, rec);
3922
3923         /* when nodes are disabled/enabled */
3924         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_SET_NODE_FLAGS, monitor_handler, rec);
3925
3926         /* when we are asked to push out a flag change */
3927         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_PUSH_NODE_FLAGS, push_flags_handler, rec);
3928
3929         /* register a message port for vacuum fetch */
3930         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_VACUUM_FETCH, vacuum_fetch_handler, rec);
3931
3932         /* register a message port for reloadnodes  */
3933         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RELOAD_NODES, reload_nodes_handler, rec);
3934
3935         /* register a message port for performing a takeover run */
3936         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_TAKEOVER_RUN, ip_reallocate_handler, rec);
3937
3938         /* register a message port for disabling the ip check for a short while */
3939         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_DISABLE_IP_CHECK, disable_ip_check_handler, rec);
3940
3941         /* register a message port for updating the recovery daemons node assignment for an ip */
3942         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECD_UPDATE_IP, recd_update_ip_handler, rec);
3943
3944         /* register a message port for forcing a rebalance of a node at
3945            the next reallocation */
3946         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_REBALANCE_NODE, recd_node_rebalance_handler, rec);
3947
3948         /* Register a message port for disabling takeover runs */
3949         ctdb_client_set_message_handler(ctdb,
3950                                         CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
3951                                         disable_takeover_runs_handler, rec);
3952
3953         /* Register a message port for disabling recoveries */
3954         ctdb_client_set_message_handler(ctdb,
3955                                         CTDB_SRVID_DISABLE_RECOVERIES,
3956                                         disable_recoveries_handler, rec);
3957
3958         /* register a message port for detaching database */
3959         ctdb_client_set_message_handler(ctdb,
3960                                         CTDB_SRVID_DETACH_DATABASE,
3961                                         detach_database_handler, rec);
3962
3963         for (;;) {
3964                 TALLOC_CTX *mem_ctx = talloc_new(ctdb);
3965                 struct timeval start;
3966                 double elapsed;
3967
3968                 if (!mem_ctx) {
3969                         DEBUG(DEBUG_CRIT,(__location__
3970                                           " Failed to create temp context\n"));
3971                         exit(-1);
3972                 }
3973
3974                 start = timeval_current();
3975                 main_loop(ctdb, rec, mem_ctx);
3976                 talloc_free(mem_ctx);
3977
3978                 /* we only check for recovery once every 'recover interval' seconds */
3979                 elapsed = timeval_elapsed(&start);
3980                 if (elapsed < ctdb->tunable.recover_interval) {
3981                         ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval
3982                                           - elapsed);
3983                 }
3984         }
3985 }
3986
3987 /*
3988   event handler for when the main ctdbd dies
3989  */
3990 static void ctdb_recoverd_parent(struct tevent_context *ev,
3991                                  struct tevent_fd *fde,
3992                                  uint16_t flags, void *private_data)
3993 {
3994         DEBUG(DEBUG_ALERT,("recovery daemon parent died - exiting\n"));
3995         _exit(1);
3996 }
3997
3998 /*
3999   called regularly to verify that the recovery daemon is still running
4000  */
4001 static void ctdb_check_recd(struct tevent_context *ev,
4002                             struct tevent_timer *te,
4003                             struct timeval yt, void *p)
4004 {
4005         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
4006
4007         if (ctdb_kill(ctdb, ctdb->recoverd_pid, 0) != 0) {
4008                 DEBUG(DEBUG_ERR,("Recovery daemon (pid:%d) is no longer running. Trying to restart recovery daemon.\n", (int)ctdb->recoverd_pid));
4009
4010                 tevent_add_timer(ctdb->ev, ctdb, timeval_zero(),
4011                                  ctdb_restart_recd, ctdb);
4012
4013                 return;
4014         }
4015
4016         tevent_add_timer(ctdb->ev, ctdb->recd_ctx,
4017                          timeval_current_ofs(30, 0),
4018                          ctdb_check_recd, ctdb);
4019 }
4020
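     /*
       SIGCHLD handler for the recovery daemon: reap all exited children so
       that helper processes do not remain as zombies
      */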
4021 static void recd_sig_child_handler(struct tevent_context *ev,
4022                                    struct tevent_signal *se, int signum,
4023                                    int count, void *dont_care,
4024                                    void *private_data)
4025 {
4026 //      struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
4027         int status;
4028         pid_t pid = -1;
4029
4030         while (pid != 0) {
4031                 pid = waitpid(-1, &status, WNOHANG);
4032                 if (pid == -1) {
4033                         if (errno != ECHILD) {
4034                                 DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%s(%d)\n", strerror(errno),errno));
4035                         }
4036                         return;
4037                 }
4038                 if (pid > 0) {
4039                         DEBUG(DEBUG_DEBUG, ("RECD SIGCHLD from %d\n", (int)pid));
4040                 }
4041         }
4042 }
4043
4044 /*
4045   startup the recovery daemon as a child of the main ctdb daemon
4046  */
4047 int ctdb_start_recoverd(struct ctdb_context *ctdb)
4048 {
4049         int fd[2];
4050         struct tevent_signal *se;
4051         struct tevent_fd *fde;
4052
4053         if (pipe(fd) != 0) {
4054                 return -1;
4055         }
4056
4057         ctdb->recoverd_pid = ctdb_fork(ctdb);
4058         if (ctdb->recoverd_pid == -1) {
4059                 return -1;
4060         }
4061
4062         if (ctdb->recoverd_pid != 0) {
4063                 talloc_free(ctdb->recd_ctx);
4064                 ctdb->recd_ctx = talloc_new(ctdb);
4065                 CTDB_NO_MEMORY(ctdb, ctdb->recd_ctx);
4066
4067                 close(fd[0]);
4068                 tevent_add_timer(ctdb->ev, ctdb->recd_ctx,
4069                                  timeval_current_ofs(30, 0),
4070                                  ctdb_check_recd, ctdb);
4071                 return 0;
4072         }
4073
4074         close(fd[1]);
4075
4076         srandom(getpid() ^ time(NULL));
4077
4078         prctl_set_comment("ctdb_recoverd");
4079         if (switch_from_server_to_client(ctdb, "recoverd") != 0) {
4080                 DEBUG(DEBUG_CRIT, (__location__ " ERROR: failed to switch recovery daemon into client mode. Shutting down.\n"));
4081                 exit(1);
4082         }
4083
4084         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to recovery daemon\n", fd[0]));
4085
4086         fde = tevent_add_fd(ctdb->ev, ctdb, fd[0], TEVENT_FD_READ,
4087                             ctdb_recoverd_parent, &fd[0]);
4088         tevent_fd_set_auto_close(fde);
4089
4090         /* set up a handler to pick up sigchld */
4091         se = tevent_add_signal(ctdb->ev, ctdb, SIGCHLD, 0,
4092                                recd_sig_child_handler, ctdb);
4093         if (se == NULL) {
4094                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD in recovery daemon\n"));
4095                 exit(1);
4096         }
4097
4098         monitor_cluster(ctdb);
4099
4100         DEBUG(DEBUG_ALERT,("ERROR: ctdb_recoverd finished!?\n"));
4101         return -1;
4102 }
4103
4104 /*
4105   shutdown the recovery daemon
4106  */
4107 void ctdb_stop_recoverd(struct ctdb_context *ctdb)
4108 {
4109         if (ctdb->recoverd_pid == 0) {
4110                 return;
4111         }
4112
4113         DEBUG(DEBUG_NOTICE,("Shutting down recovery daemon\n"));
4114         ctdb_kill(ctdb, ctdb->recoverd_pid, SIGTERM);
4115
4116         TALLOC_FREE(ctdb->recd_ctx);
4117         TALLOC_FREE(ctdb->recd_ping_count);
4118 }
4119
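     /*
       timed event scheduled by ctdb_check_recd(): stop any remaining
       recovery daemon and start a new one
      */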
4120 static void ctdb_restart_recd(struct tevent_context *ev,
4121                               struct tevent_timer *te,
4122                               struct timeval t, void *private_data)
4123 {
4124         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
4125
4126         DEBUG(DEBUG_ERR,("Restarting recovery daemon\n"));
4127         ctdb_stop_recoverd(ctdb);
4128         ctdb_start_recoverd(ctdb);
4129 }