1ef7560bc4cd6ae789bd4037b563d64c109c62e6
[obnox/samba/samba-obnox.git] / ctdb / server / ctdb_recoverd.c
1 /* 
2    ctdb recovery daemon
3
4    Copyright (C) Ronnie Sahlberg  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "replace.h"
21 #include "system/filesys.h"
22 #include "system/time.h"
23 #include "system/network.h"
24 #include "system/wait.h"
25
26 #include <popt.h>
27 #include <talloc.h>
28 #include <tevent.h>
29 #include <tdb.h>
30
31 #include "lib/tdb_wrap/tdb_wrap.h"
32 #include "lib/util/dlinklist.h"
33 #include "lib/util/debug.h"
34 #include "lib/util/samba_util.h"
35
36 #include "ctdb_private.h"
37 #include "ctdb_client.h"
38 #include "ctdb_logging.h"
39
40 #include "common/system.h"
41 #include "common/cmdline.h"
42 #include "common/common.h"
43
44
/* List of SRVID requests that need to be processed.
 * Each element is one node of a doubly-linked list. */
struct srvid_list {
	struct srvid_list *next;
	struct srvid_list *prev;
	/* The queued request message carried by this element */
	struct ctdb_srvid_message *request;
};
50
/* Container holding the head of a list of queued SRVID requests */
struct srvid_requests {
	/* First element of the list, or NULL when nothing is queued */
	struct srvid_list *requests;
};
54
55 static void srvid_request_reply(struct ctdb_context *ctdb,
56                                 struct ctdb_srvid_message *request,
57                                 TDB_DATA result)
58 {
59         /* Someone that sent srvid==0 does not want a reply */
60         if (request->srvid == 0) {
61                 talloc_free(request);
62                 return;
63         }
64
65         if (ctdb_client_send_message(ctdb, request->pnn, request->srvid,
66                                      result) == 0) {
67                 DEBUG(DEBUG_INFO,("Sent SRVID reply to %u:%llu\n",
68                                   (unsigned)request->pnn,
69                                   (unsigned long long)request->srvid));
70         } else {
71                 DEBUG(DEBUG_ERR,("Failed to send SRVID reply to %u:%llu\n",
72                                  (unsigned)request->pnn,
73                                  (unsigned long long)request->srvid));
74         }
75
76         talloc_free(request);
77 }
78
79 static void srvid_requests_reply(struct ctdb_context *ctdb,
80                                  struct srvid_requests **requests,
81                                  TDB_DATA result)
82 {
83         struct srvid_list *r;
84
85         for (r = (*requests)->requests; r != NULL; r = r->next) {
86                 srvid_request_reply(ctdb, r->request, result);
87         }
88
89         /* Free the list structure... */
90         TALLOC_FREE(*requests);
91 }
92
93 static void srvid_request_add(struct ctdb_context *ctdb,
94                               struct srvid_requests **requests,
95                               struct ctdb_srvid_message *request)
96 {
97         struct srvid_list *t;
98         int32_t ret;
99         TDB_DATA result;
100
101         if (*requests == NULL) {
102                 *requests = talloc_zero(ctdb, struct srvid_requests);
103                 if (*requests == NULL) {
104                         goto nomem;
105                 }
106         }
107
108         t = talloc_zero(*requests, struct srvid_list);
109         if (t == NULL) {
110                 /* If *requests was just allocated above then free it */
111                 if ((*requests)->requests == NULL) {
112                         TALLOC_FREE(*requests);
113                 }
114                 goto nomem;
115         }
116
117         t->request = (struct ctdb_srvid_message *)talloc_steal(t, request);
118         DLIST_ADD((*requests)->requests, t);
119
120         return;
121
122 nomem:
123         /* Failed to add the request to the list.  Send a fail. */
124         DEBUG(DEBUG_ERR, (__location__
125                           " Out of memory, failed to queue SRVID request\n"));
126         ret = -ENOMEM;
127         result.dsize = sizeof(ret);
128         result.dptr = (uint8_t *)&ret;
129         srvid_request_reply(ctdb, request, result);
130 }
131
/*
 * An abstraction to allow an operation (takeover runs, recoveries,
 * ...) to be disabled for a given timeout.
 */
struct ctdb_op_state {
	/* Non-NULL while the operation is disabled */
	struct tevent_timer *timer;
	/* Set by ctdb_op_begin(), cleared by ctdb_op_end() */
	bool in_progress;
	/* Name of the operation, used in log messages */
	const char *name;
};
139
140 static struct ctdb_op_state *ctdb_op_init(TALLOC_CTX *mem_ctx, const char *name)
141 {
142         struct ctdb_op_state *state = talloc_zero(mem_ctx, struct ctdb_op_state);
143
144         if (state != NULL) {
145                 state->in_progress = false;
146                 state->name = name;
147         }
148
149         return state;
150 }
151
152 static bool ctdb_op_is_disabled(struct ctdb_op_state *state)
153 {
154         return state->timer != NULL;
155 }
156
157 static bool ctdb_op_begin(struct ctdb_op_state *state)
158 {
159         if (ctdb_op_is_disabled(state)) {
160                 DEBUG(DEBUG_NOTICE,
161                       ("Unable to begin - %s are disabled\n", state->name));
162                 return false;
163         }
164
165         state->in_progress = true;
166         return true;
167 }
168
169 static bool ctdb_op_end(struct ctdb_op_state *state)
170 {
171         return state->in_progress = false;
172 }
173
174 static bool ctdb_op_is_in_progress(struct ctdb_op_state *state)
175 {
176         return state->in_progress;
177 }
178
179 static void ctdb_op_enable(struct ctdb_op_state *state)
180 {
181         TALLOC_FREE(state->timer);
182 }
183
184 static void ctdb_op_timeout_handler(struct tevent_context *ev,
185                                     struct tevent_timer *te,
186                                     struct timeval yt, void *p)
187 {
188         struct ctdb_op_state *state =
189                 talloc_get_type(p, struct ctdb_op_state);
190
191         DEBUG(DEBUG_NOTICE,("Reenabling %s after timeout\n", state->name));
192         ctdb_op_enable(state);
193 }
194
195 static int ctdb_op_disable(struct ctdb_op_state *state,
196                            struct tevent_context *ev,
197                            uint32_t timeout)
198 {
199         if (timeout == 0) {
200                 DEBUG(DEBUG_NOTICE,("Reenabling %s\n", state->name));
201                 ctdb_op_enable(state);
202                 return 0;
203         }
204
205         if (state->in_progress) {
206                 DEBUG(DEBUG_ERR,
207                       ("Unable to disable %s - in progress\n", state->name));
208                 return -EAGAIN;
209         }
210
211         DEBUG(DEBUG_NOTICE,("Disabling %s for %u seconds\n",
212                             state->name, timeout));
213
214         /* Clear any old timers */
215         talloc_free(state->timer);
216
217         /* Arrange for the timeout to occur */
218         state->timer = tevent_add_timer(ev, state,
219                                         timeval_current_ofs(timeout, 0),
220                                         ctdb_op_timeout_handler, state);
221         if (state->timer == NULL) {
222                 DEBUG(DEBUG_ERR,(__location__ " Unable to setup timer\n"));
223                 return -ENOMEM;
224         }
225
226         return 0;
227 }
228
/* Per-node record of accumulated misbehaviour */
struct ctdb_banning_state {
	/* Accumulated culprit count for the node */
	uint32_t count;
	/* When the node was last recorded as a culprit */
	struct timeval last_reported_time;
};
233
/*
 * Private state of the recovery daemon.
 */
struct ctdb_recoverd {
	struct ctdb_context *ctdb;
	uint32_t recmaster;
	/* Most recent node recorded by ctdb_set_culprit_count() */
	uint32_t last_culprit_node;
	struct ctdb_node_map_old *nodemap;
	struct timeval priority_time;
	bool need_takeover_run;
	bool need_recovery;
	/* Checked against NODE_FLAGS_INACTIVE before blaming other nodes */
	uint32_t node_flags;
	struct tevent_timer *send_election_te;
	struct tevent_timer *election_timeout;
	struct srvid_requests *reallocate_requests;
	struct ctdb_op_state *takeover_run;
	struct ctdb_op_state *recovery;
	struct ctdb_iface_list_old *ifaces;
	uint32_t *force_rebalance_nodes;
	/* Replaced by update_capabilities() on every successful refresh */
	struct ctdb_node_capabilities *caps;
};
255
/*
 * Timeouts derived from the recover_timeout and recover_interval
 * tunables.  Both macros read a variable named "ctdb", so they can only
 * be used where such a variable is in scope.
 */
#define CONTROL_TIMEOUT() \
	timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
#define MONITOR_TIMEOUT() \
	timeval_current_ofs(ctdb->tunable.recover_interval, 0)
258
/* Forward declaration so the function can be referenced before its
 * definition */
static void ctdb_restart_recd(struct tevent_context *ev,
			      struct tevent_timer *te,
			      struct timeval t,
			      void *private_data);
262
263 /*
264   ban a node for a period of time
265  */
266 static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
267 {
268         int ret;
269         struct ctdb_context *ctdb = rec->ctdb;
270         struct ctdb_ban_state bantime;
271
272         if (!ctdb_validate_pnn(ctdb, pnn)) {
273                 DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_ban_node\n", pnn));
274                 return;
275         }
276
277         DEBUG(DEBUG_NOTICE,("Banning node %u for %u seconds\n", pnn, ban_time));
278
279         bantime.pnn  = pnn;
280         bantime.time = ban_time;
281
282         ret = ctdb_ctrl_set_ban(ctdb, CONTROL_TIMEOUT(), pnn, &bantime);
283         if (ret != 0) {
284                 DEBUG(DEBUG_ERR,(__location__ " Failed to ban node %d\n", pnn));
285                 return;
286         }
287
288 }
289
/* Possible outcomes of a monitoring pass */
enum monitor_result {
	MONITOR_OK,
	MONITOR_RECOVERY_NEEDED,
	MONITOR_ELECTION_NEEDED,
	MONITOR_FAILED
};
291
292
293 /*
294   remember the trouble maker
295  */
296 static void ctdb_set_culprit_count(struct ctdb_recoverd *rec, uint32_t culprit, uint32_t count)
297 {
298         struct ctdb_context *ctdb = talloc_get_type(rec->ctdb, struct ctdb_context);
299         struct ctdb_banning_state *ban_state;
300
301         if (culprit > ctdb->num_nodes) {
302                 DEBUG(DEBUG_ERR,("Trying to set culprit %d but num_nodes is %d\n", culprit, ctdb->num_nodes));
303                 return;
304         }
305
306         /* If we are banned or stopped, do not set other nodes as culprits */
307         if (rec->node_flags & NODE_FLAGS_INACTIVE) {
308                 DEBUG(DEBUG_NOTICE, ("This node is INACTIVE, cannot set culprit node %d\n", culprit));
309                 return;
310         }
311
312         if (ctdb->nodes[culprit]->ban_state == NULL) {
313                 ctdb->nodes[culprit]->ban_state = talloc_zero(ctdb->nodes[culprit], struct ctdb_banning_state);
314                 CTDB_NO_MEMORY_VOID(ctdb, ctdb->nodes[culprit]->ban_state);
315
316                 
317         }
318         ban_state = ctdb->nodes[culprit]->ban_state;
319         if (timeval_elapsed(&ban_state->last_reported_time) > ctdb->tunable.recovery_grace_period) {
320                 /* this was the first time in a long while this node
321                    misbehaved so we will forgive any old transgressions.
322                 */
323                 ban_state->count = 0;
324         }
325
326         ban_state->count += count;
327         ban_state->last_reported_time = timeval_current();
328         rec->last_culprit_node = culprit;
329 }
330
/*
 * Remember the trouble maker: charge the node a single culprit credit.
 */
static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
{
	const uint32_t single_credit = 1;

	ctdb_set_culprit_count(rec, culprit, single_credit);
}
338
339
340 /* this callback is called for every node that failed to execute the
341    recovered event
342 */
343 static void recovered_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
344 {
345         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
346
347         DEBUG(DEBUG_ERR, (__location__ " Node %u failed the recovered event. Setting it as recovery fail culprit\n", node_pnn));
348
349         ctdb_set_culprit(rec, node_pnn);
350 }
351
352 /*
353   run the "recovered" eventscript on all nodes
354  */
355 static int run_recovered_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map_old *nodemap, const char *caller)
356 {
357         TALLOC_CTX *tmp_ctx;
358         uint32_t *nodes;
359         struct ctdb_context *ctdb = rec->ctdb;
360
361         tmp_ctx = talloc_new(ctdb);
362         CTDB_NO_MEMORY(ctdb, tmp_ctx);
363
364         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
365         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_END_RECOVERY,
366                                         nodes, 0,
367                                         CONTROL_TIMEOUT(), false, tdb_null,
368                                         NULL, recovered_fail_callback,
369                                         rec) != 0) {
370                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event when called from %s\n", caller));
371
372                 talloc_free(tmp_ctx);
373                 return -1;
374         }
375
376         talloc_free(tmp_ctx);
377         return 0;
378 }
379
380 /* this callback is called for every node that failed to execute the
381    start recovery event
382 */
383 static void startrecovery_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
384 {
385         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
386
387         DEBUG(DEBUG_ERR, (__location__ " Node %u failed the startrecovery event. Setting it as recovery fail culprit\n", node_pnn));
388
389         ctdb_set_culprit(rec, node_pnn);
390 }
391
392 /*
393   run the "startrecovery" eventscript on all nodes
394  */
395 static int run_startrecovery_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map_old *nodemap)
396 {
397         TALLOC_CTX *tmp_ctx;
398         uint32_t *nodes;
399         struct ctdb_context *ctdb = rec->ctdb;
400
401         tmp_ctx = talloc_new(ctdb);
402         CTDB_NO_MEMORY(ctdb, tmp_ctx);
403
404         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
405         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_START_RECOVERY,
406                                         nodes, 0,
407                                         CONTROL_TIMEOUT(), false, tdb_null,
408                                         NULL,
409                                         startrecovery_fail_callback,
410                                         rec) != 0) {
411                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event. Recovery failed.\n"));
412                 talloc_free(tmp_ctx);
413                 return -1;
414         }
415
416         talloc_free(tmp_ctx);
417         return 0;
418 }
419
420 /*
421   update the node capabilities for all connected nodes
422  */
423 static int update_capabilities(struct ctdb_recoverd *rec,
424                                struct ctdb_node_map_old *nodemap)
425 {
426         uint32_t *capp;
427         TALLOC_CTX *tmp_ctx;
428         struct ctdb_node_capabilities *caps;
429         struct ctdb_context *ctdb = rec->ctdb;
430
431         tmp_ctx = talloc_new(rec);
432         CTDB_NO_MEMORY(ctdb, tmp_ctx);
433
434         caps = ctdb_get_capabilities(ctdb, tmp_ctx,
435                                      CONTROL_TIMEOUT(), nodemap);
436
437         if (caps == NULL) {
438                 DEBUG(DEBUG_ERR,
439                       (__location__ " Failed to get node capabilities\n"));
440                 talloc_free(tmp_ctx);
441                 return -1;
442         }
443
444         capp = ctdb_get_node_capabilities(caps, ctdb_get_pnn(ctdb));
445         if (capp == NULL) {
446                 DEBUG(DEBUG_ERR,
447                       (__location__
448                        " Capabilities don't include current node.\n"));
449                 talloc_free(tmp_ctx);
450                 return -1;
451         }
452         ctdb->capabilities = *capp;
453
454         TALLOC_FREE(rec->caps);
455         rec->caps = talloc_steal(rec, caps);
456
457         talloc_free(tmp_ctx);
458         return 0;
459 }
460
461 static void set_recmode_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
462 {
463         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
464
465         DEBUG(DEBUG_ERR,("Failed to freeze node %u during recovery. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
466         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
467 }
468
469 static void transaction_start_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
470 {
471         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
472
473         DEBUG(DEBUG_ERR,("Failed to start recovery transaction on node %u. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
474         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
475 }
476
477 /*
478   change recovery mode on all nodes
479  */
480 static int set_recovery_mode(struct ctdb_context *ctdb,
481                              struct ctdb_recoverd *rec,
482                              struct ctdb_node_map_old *nodemap,
483                              uint32_t rec_mode, bool freeze)
484 {
485         TDB_DATA data;
486         uint32_t *nodes;
487         TALLOC_CTX *tmp_ctx;
488
489         tmp_ctx = talloc_new(ctdb);
490         CTDB_NO_MEMORY(ctdb, tmp_ctx);
491
492         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
493
494         data.dsize = sizeof(uint32_t);
495         data.dptr = (unsigned char *)&rec_mode;
496
497         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMODE,
498                                         nodes, 0,
499                                         CONTROL_TIMEOUT(),
500                                         false, data,
501                                         NULL, NULL,
502                                         NULL) != 0) {
503                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode. Recovery failed.\n"));
504                 talloc_free(tmp_ctx);
505                 return -1;
506         }
507
508         /* freeze all nodes */
509         if (freeze && rec_mode == CTDB_RECOVERY_ACTIVE) {
510                 int i;
511
512                 for (i=1; i<=NUM_DB_PRIORITIES; i++) {
513                         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
514                                                 nodes, i,
515                                                 CONTROL_TIMEOUT(),
516                                                 false, tdb_null,
517                                                 NULL,
518                                                 set_recmode_fail_callback,
519                                                 rec) != 0) {
520                                 DEBUG(DEBUG_ERR, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
521                                 talloc_free(tmp_ctx);
522                                 return -1;
523                         }
524                 }
525         }
526
527         talloc_free(tmp_ctx);
528         return 0;
529 }
530
531 /*
532   change recovery master on all node
533  */
534 static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map_old *nodemap, uint32_t pnn)
535 {
536         TDB_DATA data;
537         TALLOC_CTX *tmp_ctx;
538         uint32_t *nodes;
539
540         tmp_ctx = talloc_new(ctdb);
541         CTDB_NO_MEMORY(ctdb, tmp_ctx);
542
543         data.dsize = sizeof(uint32_t);
544         data.dptr = (unsigned char *)&pnn;
545
546         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
547         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMASTER,
548                                         nodes, 0,
549                                         CONTROL_TIMEOUT(), false, data,
550                                         NULL, NULL,
551                                         NULL) != 0) {
552                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recmaster. Recovery failed.\n"));
553                 talloc_free(tmp_ctx);
554                 return -1;
555         }
556
557         talloc_free(tmp_ctx);
558         return 0;
559 }
560
561 /* update all remote nodes to use the same db priority that we have
562    this can fail if the remove node has not yet been upgraded to 
563    support this function, so we always return success and never fail
564    a recovery if this call fails.
565 */
566 static int update_db_priority_on_remote_nodes(struct ctdb_context *ctdb,
567         struct ctdb_node_map_old *nodemap, 
568         uint32_t pnn, struct ctdb_dbid_map_old *dbmap, TALLOC_CTX *mem_ctx)
569 {
570         int db;
571
572         /* step through all local databases */
573         for (db=0; db<dbmap->num;db++) {
574                 struct ctdb_db_priority db_prio;
575                 int ret;
576
577                 db_prio.db_id     = dbmap->dbs[db].db_id;
578                 ret = ctdb_ctrl_get_db_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, dbmap->dbs[db].db_id, &db_prio.priority);
579                 if (ret != 0) {
580                         DEBUG(DEBUG_ERR,(__location__ " Failed to read database priority from local node for db 0x%08x\n", dbmap->dbs[db].db_id));
581                         continue;
582                 }
583
584                 DEBUG(DEBUG_INFO,("Update DB priority for db 0x%08x to %u\n", dbmap->dbs[db].db_id, db_prio.priority)); 
585
586                 ret = ctdb_ctrl_set_db_priority(ctdb, CONTROL_TIMEOUT(),
587                                                 CTDB_CURRENT_NODE, &db_prio);
588                 if (ret != 0) {
589                         DEBUG(DEBUG_ERR,(__location__ " Failed to set DB priority for 0x%08x\n",
590                                          db_prio.db_id));
591                 }
592         }
593
594         return 0;
595 }                       
596
597 /*
598   ensure all other nodes have attached to any databases that we have
599  */
600 static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map_old *nodemap, 
601                                            uint32_t pnn, struct ctdb_dbid_map_old *dbmap, TALLOC_CTX *mem_ctx)
602 {
603         int i, j, db, ret;
604         struct ctdb_dbid_map_old *remote_dbmap;
605
606         /* verify that all other nodes have all our databases */
607         for (j=0; j<nodemap->num; j++) {
608                 /* we don't need to ourself ourselves */
609                 if (nodemap->nodes[j].pnn == pnn) {
610                         continue;
611                 }
612                 /* don't check nodes that are unavailable */
613                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
614                         continue;
615                 }
616
617                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
618                                          mem_ctx, &remote_dbmap);
619                 if (ret != 0) {
620                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
621                         return -1;
622                 }
623
624                 /* step through all local databases */
625                 for (db=0; db<dbmap->num;db++) {
626                         const char *name;
627
628
629                         for (i=0;i<remote_dbmap->num;i++) {
630                                 if (dbmap->dbs[db].db_id == remote_dbmap->dbs[i].db_id) {
631                                         break;
632                                 }
633                         }
634                         /* the remote node already have this database */
635                         if (i!=remote_dbmap->num) {
636                                 continue;
637                         }
638                         /* ok so we need to create this database */
639                         ret = ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn,
640                                                   dbmap->dbs[db].db_id, mem_ctx,
641                                                   &name);
642                         if (ret != 0) {
643                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", pnn));
644                                 return -1;
645                         }
646                         ret = ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(),
647                                                  nodemap->nodes[j].pnn,
648                                                  mem_ctx, name,
649                                                  dbmap->dbs[db].flags & CTDB_DB_FLAGS_PERSISTENT);
650                         if (ret != 0) {
651                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create remote db:%s\n", name));
652                                 return -1;
653                         }
654                 }
655         }
656
657         return 0;
658 }
659
660
661 /*
662   ensure we are attached to any databases that anyone else is attached to
663  */
664 static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map_old *nodemap, 
665                                           uint32_t pnn, struct ctdb_dbid_map_old **dbmap, TALLOC_CTX *mem_ctx)
666 {
667         int i, j, db, ret;
668         struct ctdb_dbid_map_old *remote_dbmap;
669
670         /* verify that we have all database any other node has */
671         for (j=0; j<nodemap->num; j++) {
672                 /* we don't need to ourself ourselves */
673                 if (nodemap->nodes[j].pnn == pnn) {
674                         continue;
675                 }
676                 /* don't check nodes that are unavailable */
677                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
678                         continue;
679                 }
680
681                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
682                                          mem_ctx, &remote_dbmap);
683                 if (ret != 0) {
684                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
685                         return -1;
686                 }
687
688                 /* step through all databases on the remote node */
689                 for (db=0; db<remote_dbmap->num;db++) {
690                         const char *name;
691
692                         for (i=0;i<(*dbmap)->num;i++) {
693                                 if (remote_dbmap->dbs[db].db_id == (*dbmap)->dbs[i].db_id) {
694                                         break;
695                                 }
696                         }
697                         /* we already have this db locally */
698                         if (i!=(*dbmap)->num) {
699                                 continue;
700                         }
701                         /* ok so we need to create this database and
702                            rebuild dbmap
703                          */
704                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
705                                             remote_dbmap->dbs[db].db_id, mem_ctx, &name);
706                         if (ret != 0) {
707                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", 
708                                           nodemap->nodes[j].pnn));
709                                 return -1;
710                         }
711                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name, 
712                                            remote_dbmap->dbs[db].flags & CTDB_DB_FLAGS_PERSISTENT);
713                         if (ret != 0) {
714                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create local db:%s\n", name));
715                                 return -1;
716                         }
717                         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
718                         if (ret != 0) {
719                                 DEBUG(DEBUG_ERR, (__location__ " Unable to reread dbmap on node %u\n", pnn));
720                                 return -1;
721                         }
722                 }
723         }
724
725         return 0;
726 }
727
728
729 /*
730   pull the remote database contents from one node into the recdb
731  */
732 static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode,
733                                     struct tdb_wrap *recdb, uint32_t dbid)
734 {
735         int ret;
736         TDB_DATA outdata;
737         struct ctdb_marshall_buffer *reply;
738         struct ctdb_rec_data_old *recdata;
739         int i;
740         TALLOC_CTX *tmp_ctx = talloc_new(recdb);
741
742         ret = ctdb_ctrl_pulldb(ctdb, srcnode, dbid, CTDB_LMASTER_ANY, tmp_ctx,
743                                CONTROL_TIMEOUT(), &outdata);
744         if (ret != 0) {
745                 DEBUG(DEBUG_ERR,(__location__ " Unable to copy db from node %u\n", srcnode));
746                 talloc_free(tmp_ctx);
747                 return -1;
748         }
749
750         reply = (struct ctdb_marshall_buffer *)outdata.dptr;
751
752         if (outdata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
753                 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
754                 talloc_free(tmp_ctx);
755                 return -1;
756         }
757
758         recdata = (struct ctdb_rec_data_old *)&reply->data[0];
759
760         for (i=0;
761              i<reply->count;
762              recdata = (struct ctdb_rec_data_old *)(recdata->length + (uint8_t *)recdata), i++) {
763                 TDB_DATA key, data;
764                 struct ctdb_ltdb_header *hdr;
765                 TDB_DATA existing;
766
767                 key.dptr = &recdata->data[0];
768                 key.dsize = recdata->keylen;
769                 data.dptr = &recdata->data[key.dsize];
770                 data.dsize = recdata->datalen;
771
772                 hdr = (struct ctdb_ltdb_header *)data.dptr;
773
774                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
775                         DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
776                         talloc_free(tmp_ctx);
777                         return -1;
778                 }
779
780                 /* fetch the existing record, if any */
781                 existing = tdb_fetch(recdb->tdb, key);
782
783                 if (existing.dptr != NULL) {
784                         struct ctdb_ltdb_header header;
785                         if (existing.dsize < sizeof(struct ctdb_ltdb_header)) {
786                                 DEBUG(DEBUG_CRIT,(__location__ " Bad record size %u from node %u\n",
787                                          (unsigned)existing.dsize, srcnode));
788                                 free(existing.dptr);
789                                 talloc_free(tmp_ctx);
790                                 return -1;
791                         }
792                         header = *(struct ctdb_ltdb_header *)existing.dptr;
793                         free(existing.dptr);
794                         if (!(header.rsn < hdr->rsn ||
795                               (header.dmaster != ctdb_get_pnn(ctdb) &&
796                                header.rsn == hdr->rsn))) {
797                                 continue;
798                         }
799                 }
800
801                 if (tdb_store(recdb->tdb, key, data, TDB_REPLACE) != 0) {
802                         DEBUG(DEBUG_CRIT,(__location__ " Failed to store record\n"));
803                         talloc_free(tmp_ctx);
804                         return -1;
805                 }
806         }
807
808         talloc_free(tmp_ctx);
809
810         return 0;
811 }
812
813
/*
  state shared between the GET_DB_SEQNUM callbacks while searching for
  the node holding the highest sequence number of a database
 */
struct pull_seqnum_cbdata {
	/* non-zero as soon as one node has failed to report its seqnum */
	int failed;

	/* node with the best seqnum so far; -1 while no node was picked */
	uint32_t pnn;

	/* highest sequence number reported so far */
	uint64_t seqnum;
};
819
820 static void pull_seqnum_cb(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
821 {
822         struct pull_seqnum_cbdata *cb_data = talloc_get_type(callback_data, struct pull_seqnum_cbdata);
823         uint64_t seqnum;
824
825         if (cb_data->failed != 0) {
826                 DEBUG(DEBUG_ERR, ("Got seqnum from node %d but we have already failed the entire operation\n", node_pnn));
827                 return;
828         }
829
830         if (res != 0) {
831                 DEBUG(DEBUG_ERR, ("Error when pulling seqnum from node %d\n", node_pnn));
832                 cb_data->failed = 1;
833                 return;
834         }
835
836         if (outdata.dsize != sizeof(uint64_t)) {
837                 DEBUG(DEBUG_ERR, ("Error when reading pull seqnum from node %d, got %d bytes but expected %d\n", node_pnn, (int)outdata.dsize, (int)sizeof(uint64_t)));
838                 cb_data->failed = -1;
839                 return;
840         }
841
842         seqnum = *((uint64_t *)outdata.dptr);
843
844         if (seqnum > cb_data->seqnum ||
845             (cb_data->pnn == -1 && seqnum == 0)) {
846                 cb_data->seqnum = seqnum;
847                 cb_data->pnn = node_pnn;
848         }
849 }
850
851 static void pull_seqnum_fail_cb(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
852 {
853         struct pull_seqnum_cbdata *cb_data = talloc_get_type(callback_data, struct pull_seqnum_cbdata);
854
855         DEBUG(DEBUG_ERR, ("Failed to pull db seqnum from node %d\n", node_pnn));
856         cb_data->failed = 1;
857 }
858
859 static int pull_highest_seqnum_pdb(struct ctdb_context *ctdb,
860                                 struct ctdb_recoverd *rec, 
861                                 struct ctdb_node_map_old *nodemap, 
862                                 struct tdb_wrap *recdb, uint32_t dbid)
863 {
864         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
865         uint32_t *nodes;
866         TDB_DATA data;
867         uint32_t outdata[2];
868         struct pull_seqnum_cbdata *cb_data;
869
870         DEBUG(DEBUG_NOTICE, ("Scan for highest seqnum pdb for db:0x%08x\n", dbid));
871
872         outdata[0] = dbid;
873         outdata[1] = 0;
874
875         data.dsize = sizeof(outdata);
876         data.dptr  = (uint8_t *)&outdata[0];
877
878         cb_data = talloc(tmp_ctx, struct pull_seqnum_cbdata);
879         if (cb_data == NULL) {
880                 DEBUG(DEBUG_ERR, ("Failed to allocate pull highest seqnum cb_data structure\n"));
881                 talloc_free(tmp_ctx);
882                 return -1;
883         }
884
885         cb_data->failed = 0;
886         cb_data->pnn    = -1;
887         cb_data->seqnum = 0;
888         
889         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
890         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_DB_SEQNUM,
891                                         nodes, 0,
892                                         CONTROL_TIMEOUT(), false, data,
893                                         pull_seqnum_cb,
894                                         pull_seqnum_fail_cb,
895                                         cb_data) != 0) {
896                 DEBUG(DEBUG_ERR, (__location__ " Failed to run async GET_DB_SEQNUM\n"));
897
898                 talloc_free(tmp_ctx);
899                 return -1;
900         }
901
902         if (cb_data->failed != 0) {
903                 DEBUG(DEBUG_NOTICE, ("Failed to pull sequence numbers for DB 0x%08x\n", dbid));
904                 talloc_free(tmp_ctx);
905                 return -1;
906         }
907
908         if (cb_data->pnn == -1) {
909                 DEBUG(DEBUG_NOTICE, ("Failed to find a node with highest sequence numbers for DB 0x%08x\n", dbid));
910                 talloc_free(tmp_ctx);
911                 return -1;
912         }
913
914         DEBUG(DEBUG_NOTICE, ("Pull persistent db:0x%08x from node %d with highest seqnum:%lld\n", dbid, cb_data->pnn, (long long)cb_data->seqnum)); 
915
916         if (pull_one_remote_database(ctdb, cb_data->pnn, recdb, dbid) != 0) {
917                 DEBUG(DEBUG_ERR, ("Failed to pull higest seqnum database 0x%08x from node %d\n", dbid, cb_data->pnn));
918                 talloc_free(tmp_ctx);
919                 return -1;
920         }
921
922         talloc_free(tmp_ctx);
923         return 0;
924 }
925
926
927 /*
928   pull all the remote database contents into the recdb
929  */
930 static int pull_remote_database(struct ctdb_context *ctdb,
931                                 struct ctdb_recoverd *rec, 
932                                 struct ctdb_node_map_old *nodemap, 
933                                 struct tdb_wrap *recdb, uint32_t dbid,
934                                 bool persistent)
935 {
936         int j;
937
938         if (persistent && ctdb->tunable.recover_pdb_by_seqnum != 0) {
939                 int ret;
940                 ret = pull_highest_seqnum_pdb(ctdb, rec, nodemap, recdb, dbid);
941                 if (ret == 0) {
942                         return 0;
943                 }
944         }
945
946         /* pull all records from all other nodes across onto this node
947            (this merges based on rsn)
948         */
949         for (j=0; j<nodemap->num; j++) {
950                 /* don't merge from nodes that are unavailable */
951                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
952                         continue;
953                 }
954                 if (pull_one_remote_database(ctdb, nodemap->nodes[j].pnn, recdb, dbid) != 0) {
955                         DEBUG(DEBUG_ERR,(__location__ " Failed to pull remote database from node %u\n", 
956                                  nodemap->nodes[j].pnn));
957                         ctdb_set_culprit_count(rec, nodemap->nodes[j].pnn, nodemap->num);
958                         return -1;
959                 }
960         }
961         
962         return 0;
963 }
964
965
966 /*
967   update flags on all active nodes
968  */
969 static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map_old *nodemap, uint32_t pnn, uint32_t flags)
970 {
971         int ret;
972
973         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, flags, ~flags);
974                 if (ret != 0) {
975                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
976                 return -1;
977         }
978
979         return 0;
980 }
981
982 /*
983   ensure all nodes have the same vnnmap we do
984  */
985 static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map_old *nodemap, 
986                                       uint32_t pnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
987 {
988         int j, ret;
989
990         /* push the new vnn map out to all the nodes */
991         for (j=0; j<nodemap->num; j++) {
992                 /* don't push to nodes that are unavailable */
993                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
994                         continue;
995                 }
996
997                 ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, vnnmap);
998                 if (ret != 0) {
999                         DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
1000                         return -1;
1001                 }
1002         }
1003
1004         return 0;
1005 }
1006
1007
/*
  completion callback installed on the calls set up by
  vacuum_fetch_process_one(): the result is of no interest, so all
  that is left to do is to release the call state
 */
static void vacuum_fetch_callback(struct ctdb_client_call_state *state)
{
	talloc_free(state);
}
1015
1016
1017 /**
1018  * Process one elements of the vacuum fetch list:
1019  * Migrate it over to us with the special flag
1020  * CTDB_CALL_FLAG_VACUUM_MIGRATION.
1021  */
1022 static bool vacuum_fetch_process_one(struct ctdb_db_context *ctdb_db,
1023                                      uint32_t pnn,
1024                                      struct ctdb_rec_data_old *r)
1025 {
1026         struct ctdb_client_call_state *state;
1027         TDB_DATA data;
1028         struct ctdb_ltdb_header *hdr;
1029         struct ctdb_call call;
1030
1031         ZERO_STRUCT(call);
1032         call.call_id = CTDB_NULL_FUNC;
1033         call.flags = CTDB_IMMEDIATE_MIGRATION;
1034         call.flags |= CTDB_CALL_FLAG_VACUUM_MIGRATION;
1035
1036         call.key.dptr = &r->data[0];
1037         call.key.dsize = r->keylen;
1038
1039         /* ensure we don't block this daemon - just skip a record if we can't get
1040            the chainlock */
1041         if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, call.key) != 0) {
1042                 return true;
1043         }
1044
1045         data = tdb_fetch(ctdb_db->ltdb->tdb, call.key);
1046         if (data.dptr == NULL) {
1047                 tdb_chainunlock(ctdb_db->ltdb->tdb, call.key);
1048                 return true;
1049         }
1050
1051         if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
1052                 free(data.dptr);
1053                 tdb_chainunlock(ctdb_db->ltdb->tdb, call.key);
1054                 return true;
1055         }
1056
1057         hdr = (struct ctdb_ltdb_header *)data.dptr;
1058         if (hdr->dmaster == pnn) {
1059                 /* its already local */
1060                 free(data.dptr);
1061                 tdb_chainunlock(ctdb_db->ltdb->tdb, call.key);
1062                 return true;
1063         }
1064
1065         free(data.dptr);
1066
1067         state = ctdb_call_send(ctdb_db, &call);
1068         tdb_chainunlock(ctdb_db->ltdb->tdb, call.key);
1069         if (state == NULL) {
1070                 DEBUG(DEBUG_ERR,(__location__ " Failed to setup vacuum fetch call\n"));
1071                 return false;
1072         }
1073         state->async.fn = vacuum_fetch_callback;
1074         state->async.private_data = NULL;
1075
1076         return true;
1077 }
1078
1079
1080 /*
1081   handler for vacuum fetch
1082 */
1083 static void vacuum_fetch_handler(uint64_t srvid, TDB_DATA data,
1084                                  void *private_data)
1085 {
1086         struct ctdb_recoverd *rec = talloc_get_type(
1087                 private_data, struct ctdb_recoverd);
1088         struct ctdb_context *ctdb = rec->ctdb;
1089         struct ctdb_marshall_buffer *recs;
1090         int ret, i;
1091         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1092         const char *name;
1093         struct ctdb_dbid_map_old *dbmap=NULL;
1094         bool persistent = false;
1095         struct ctdb_db_context *ctdb_db;
1096         struct ctdb_rec_data_old *r;
1097
1098         recs = (struct ctdb_marshall_buffer *)data.dptr;
1099
1100         if (recs->count == 0) {
1101                 goto done;
1102         }
1103
1104         /* work out if the database is persistent */
1105         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &dbmap);
1106         if (ret != 0) {
1107                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from local node\n"));
1108                 goto done;
1109         }
1110
1111         for (i=0;i<dbmap->num;i++) {
1112                 if (dbmap->dbs[i].db_id == recs->db_id) {
1113                         persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
1114                         break;
1115                 }
1116         }
1117         if (i == dbmap->num) {
1118                 DEBUG(DEBUG_ERR, (__location__ " Unable to find db_id 0x%x on local node\n", recs->db_id));
1119                 goto done;
1120         }
1121
1122         /* find the name of this database */
1123         if (ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, recs->db_id, tmp_ctx, &name) != 0) {
1124                 DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", recs->db_id));
1125                 goto done;
1126         }
1127
1128         /* attach to it */
1129         ctdb_db = ctdb_attach(ctdb, CONTROL_TIMEOUT(), name, persistent, 0);
1130         if (ctdb_db == NULL) {
1131                 DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
1132                 goto done;
1133         }
1134
1135         r = (struct ctdb_rec_data_old *)&recs->data[0];
1136         while (recs->count) {
1137                 bool ok;
1138
1139                 ok = vacuum_fetch_process_one(ctdb_db, rec->ctdb->pnn, r);
1140                 if (!ok) {
1141                         break;
1142                 }
1143
1144                 r = (struct ctdb_rec_data_old *)(r->length + (uint8_t *)r);
1145                 recs->count--;
1146         }
1147
1148 done:
1149         talloc_free(tmp_ctx);
1150 }
1151
1152
1153 /*
1154  * handler for database detach
1155  */
1156 static void detach_database_handler(uint64_t srvid, TDB_DATA data,
1157                                     void *private_data)
1158 {
1159         struct ctdb_recoverd *rec = talloc_get_type(
1160                 private_data, struct ctdb_recoverd);
1161         struct ctdb_context *ctdb = rec->ctdb;
1162         uint32_t db_id;
1163         struct ctdb_db_context *ctdb_db;
1164
1165         if (data.dsize != sizeof(db_id)) {
1166                 return;
1167         }
1168         db_id = *(uint32_t *)data.dptr;
1169
1170         ctdb_db = find_ctdb_db(ctdb, db_id);
1171         if (ctdb_db == NULL) {
1172                 /* database is not attached */
1173                 return;
1174         }
1175
1176         DLIST_REMOVE(ctdb->db_list, ctdb_db);
1177
1178         DEBUG(DEBUG_NOTICE, ("Detached from database '%s'\n",
1179                              ctdb_db->db_name));
1180         talloc_free(ctdb_db);
1181 }
1182
/*
  timer callback used by ctdb_wait_timeout(): p points at the caller's
  uint32_t flag, which is raised to signal that the wait is over
 */
static void ctdb_wait_handler(struct tevent_context *ev,
			      struct tevent_timer *te,
			      struct timeval yt, void *p)
{
	*(uint32_t *)p = 1;
}
1193
1194 /*
1195   wait for a given number of seconds
1196  */
1197 static void ctdb_wait_timeout(struct ctdb_context *ctdb, double secs)
1198 {
1199         uint32_t timed_out = 0;
1200         time_t usecs = (secs - (time_t)secs) * 1000000;
1201         tevent_add_timer(ctdb->ev, ctdb, timeval_current_ofs(secs, usecs),
1202                          ctdb_wait_handler, &timed_out);
1203         while (!timed_out) {
1204                 tevent_loop_once(ctdb->ev);
1205         }
1206 }
1207
1208 /*
1209   called when an election times out (ends)
1210  */
1211 static void ctdb_election_timeout(struct tevent_context *ev,
1212                                   struct tevent_timer *te,
1213                                   struct timeval t, void *p)
1214 {
1215         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1216         rec->election_timeout = NULL;
1217         fast_start = false;
1218
1219         DEBUG(DEBUG_WARNING,("Election period ended\n"));
1220 }
1221
1222
1223 /*
1224   wait for an election to finish. It finished election_timeout seconds after
1225   the last election packet is received
1226  */
1227 static void ctdb_wait_election(struct ctdb_recoverd *rec)
1228 {
1229         struct ctdb_context *ctdb = rec->ctdb;
1230         while (rec->election_timeout) {
1231                 tevent_loop_once(ctdb->ev);
1232         }
1233 }
1234
1235 /*
1236   Update our local flags from all remote connected nodes. 
1237   This is only run when we are or we belive we are the recovery master
1238  */
1239 static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map_old *nodemap)
1240 {
1241         int j;
1242         struct ctdb_context *ctdb = rec->ctdb;
1243         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
1244
1245         /* get the nodemap for all active remote nodes and verify
1246            they are the same as for this node
1247          */
1248         for (j=0; j<nodemap->num; j++) {
1249                 struct ctdb_node_map_old *remote_nodemap=NULL;
1250                 int ret;
1251
1252                 if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
1253                         continue;
1254                 }
1255                 if (nodemap->nodes[j].pnn == ctdb->pnn) {
1256                         continue;
1257                 }
1258
1259                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
1260                                            mem_ctx, &remote_nodemap);
1261                 if (ret != 0) {
1262                         DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from remote node %u\n", 
1263                                   nodemap->nodes[j].pnn));
1264                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
1265                         talloc_free(mem_ctx);
1266                         return MONITOR_FAILED;
1267                 }
1268                 if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
1269                         /* We should tell our daemon about this so it
1270                            updates its flags or else we will log the same 
1271                            message again in the next iteration of recovery.
1272                            Since we are the recovery master we can just as
1273                            well update the flags on all nodes.
1274                         */
1275                         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags, ~remote_nodemap->nodes[j].flags);
1276                         if (ret != 0) {
1277                                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
1278                                 return -1;
1279                         }
1280
1281                         /* Update our local copy of the flags in the recovery
1282                            daemon.
1283                         */
1284                         DEBUG(DEBUG_NOTICE,("Remote node %u had flags 0x%x, local had 0x%x - updating local\n",
1285                                  nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags,
1286                                  nodemap->nodes[j].flags));
1287                         nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
1288                 }
1289                 talloc_free(remote_nodemap);
1290         }
1291         talloc_free(mem_ctx);
1292         return MONITOR_OK;
1293 }
1294
1295
1296 /* Create a new random generation id.
1297    The generation id can not be the INVALID_GENERATION id
1298 */
1299 static uint32_t new_generation(void)
1300 {
1301         uint32_t generation;
1302
1303         while (1) {
1304                 generation = random();
1305
1306                 if (generation != INVALID_GENERATION) {
1307                         break;
1308                 }
1309         }
1310
1311         return generation;
1312 }
1313
1314
1315 /*
1316   create a temporary working database
1317  */
1318 static struct tdb_wrap *create_recdb(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx)
1319 {
1320         char *name;
1321         struct tdb_wrap *recdb;
1322         unsigned tdb_flags;
1323
1324         /* open up the temporary recovery database */
1325         name = talloc_asprintf(mem_ctx, "%s/recdb.tdb.%u",
1326                                ctdb->db_directory_state,
1327                                ctdb->pnn);
1328         if (name == NULL) {
1329                 return NULL;
1330         }
1331         unlink(name);
1332
1333         tdb_flags = TDB_NOLOCK;
1334         if (ctdb->valgrinding) {
1335                 tdb_flags |= TDB_NOMMAP;
1336         }
1337         tdb_flags |= (TDB_INCOMPATIBLE_HASH | TDB_DISALLOW_NESTING);
1338
1339         recdb = tdb_wrap_open(mem_ctx, name, ctdb->tunable.database_hash_size, 
1340                               tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
1341         if (recdb == NULL) {
1342                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temp recovery database '%s'\n", name));
1343         }
1344
1345         talloc_free(name);
1346
1347         return recdb;
1348 }
1349
1350
/*
   state handed to the traverse function traverse_recdb(), which pulls
   all relevant records from recdb into one marshall buffer
 */
struct recdb_data {
	/* daemon context; supplies our pnn and the tunables */
	struct ctdb_context *ctdb;

	/* marshall buffer the records are appended to */
	struct ctdb_marshall_buffer *recdata;

	/* number of bytes of recdata in use */
	uint32_t len;

	/* number of bytes recdata has been sized to */
	uint32_t allocated_len;

	/* set by the traverse function when it had to give up */
	bool failed;

	/* the database being traversed is a persistent one */
	bool persistent;
};
1362
1363 static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
1364 {
1365         struct recdb_data *params = (struct recdb_data *)p;
1366         struct ctdb_rec_data_old *recdata;
1367         struct ctdb_ltdb_header *hdr;
1368
1369         /*
1370          * skip empty records - but NOT for persistent databases:
1371          *
1372          * The record-by-record mode of recovery deletes empty records.
1373          * For persistent databases, this can lead to data corruption
1374          * by deleting records that should be there:
1375          *
1376          * - Assume the cluster has been running for a while.
1377          *
1378          * - A record R in a persistent database has been created and
1379          *   deleted a couple of times, the last operation being deletion,
1380          *   leaving an empty record with a high RSN, say 10.
1381          *
1382          * - Now a node N is turned off.
1383          *
1384          * - This leaves the local database copy of D on N with the empty
1385          *   copy of R and RSN 10. On all other nodes, the recovery has deleted
1386          *   the copy of record R.
1387          *
1388          * - Now the record is created again while node N is turned off.
1389          *   This creates R with RSN = 1 on all nodes except for N.
1390          *
1391          * - Now node N is turned on again. The following recovery will chose
1392          *   the older empty copy of R due to RSN 10 > RSN 1.
1393          *
1394          * ==> Hence the record is gone after the recovery.
1395          *
1396          * On databases like Samba's registry, this can damage the higher-level
1397          * data structures built from the various tdb-level records.
1398          */
1399         if (!params->persistent && data.dsize <= sizeof(struct ctdb_ltdb_header)) {
1400                 return 0;
1401         }
1402
1403         /* update the dmaster field to point to us */
1404         hdr = (struct ctdb_ltdb_header *)data.dptr;
1405         if (!params->persistent) {
1406                 hdr->dmaster = params->ctdb->pnn;
1407                 hdr->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
1408         }
1409
1410         /* add the record to the blob ready to send to the nodes */
1411         recdata = ctdb_marshall_record(params->recdata, 0, key, NULL, data);
1412         if (recdata == NULL) {
1413                 params->failed = true;
1414                 return -1;
1415         }
1416         if (params->len + recdata->length >= params->allocated_len) {
1417                 params->allocated_len = recdata->length + params->len + params->ctdb->tunable.pulldb_preallocation_size;
1418                 params->recdata = talloc_realloc_size(NULL, params->recdata, params->allocated_len);
1419         }
1420         if (params->recdata == NULL) {
1421                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata to %u\n",
1422                          recdata->length + params->len));
1423                 params->failed = true;
1424                 return -1;
1425         }
1426         params->recdata->count++;
1427         memcpy(params->len+(uint8_t *)params->recdata, recdata, recdata->length);
1428         params->len += recdata->length;
1429         talloc_free(recdata);
1430
1431         return 0;
1432 }
1433
1434 /*
1435   push the recdb database out to all nodes
1436  */
1437 static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
1438                                bool persistent,
1439                                struct tdb_wrap *recdb, struct ctdb_node_map_old *nodemap)
1440 {
1441         struct recdb_data params;
1442         struct ctdb_marshall_buffer *recdata;
1443         TDB_DATA outdata;
1444         TALLOC_CTX *tmp_ctx;
1445         uint32_t *nodes;
1446
1447         tmp_ctx = talloc_new(ctdb);
1448         CTDB_NO_MEMORY(ctdb, tmp_ctx);
1449
1450         recdata = talloc_zero(recdb, struct ctdb_marshall_buffer);
1451         CTDB_NO_MEMORY(ctdb, recdata);
1452
1453         recdata->db_id = dbid;
1454
1455         params.ctdb = ctdb;
1456         params.recdata = recdata;
1457         params.len = offsetof(struct ctdb_marshall_buffer, data);
1458         params.allocated_len = params.len;
1459         params.failed = false;
1460         params.persistent = persistent;
1461
1462         if (tdb_traverse_read(recdb->tdb, traverse_recdb, &params) == -1) {
1463                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1464                 talloc_free(params.recdata);
1465                 talloc_free(tmp_ctx);
1466                 return -1;
1467         }
1468
1469         if (params.failed) {
1470                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1471                 talloc_free(params.recdata);
1472                 talloc_free(tmp_ctx);
1473                 return -1;              
1474         }
1475
1476         recdata = params.recdata;
1477
1478         outdata.dptr = (void *)recdata;
1479         outdata.dsize = params.len;
1480
1481         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
1482         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
1483                                         nodes, 0,
1484                                         CONTROL_TIMEOUT(), false, outdata,
1485                                         NULL, NULL,
1486                                         NULL) != 0) {
1487                 DEBUG(DEBUG_ERR,(__location__ " Failed to push recdb records to nodes for db 0x%x\n", dbid));
1488                 talloc_free(recdata);
1489                 talloc_free(tmp_ctx);
1490                 return -1;
1491         }
1492
1493         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pushed remote database 0x%x of size %u\n", 
1494                   dbid, recdata->count));
1495
1496         talloc_free(recdata);
1497         talloc_free(tmp_ctx);
1498
1499         return 0;
1500 }
1501
1502
1503 /*
1504   go through a full recovery on one database 
1505  */
1506 static int recover_database(struct ctdb_recoverd *rec, 
1507                             TALLOC_CTX *mem_ctx,
1508                             uint32_t dbid,
1509                             bool persistent,
1510                             uint32_t pnn, 
1511                             struct ctdb_node_map_old *nodemap,
1512                             uint32_t transaction_id)
1513 {
1514         struct tdb_wrap *recdb;
1515         int ret;
1516         struct ctdb_context *ctdb = rec->ctdb;
1517         TDB_DATA data;
1518         struct ctdb_transdb w;
1519         uint32_t *nodes;
1520
1521         recdb = create_recdb(ctdb, mem_ctx);
1522         if (recdb == NULL) {
1523                 return -1;
1524         }
1525
1526         /* pull all remote databases onto the recdb */
1527         ret = pull_remote_database(ctdb, rec, nodemap, recdb, dbid, persistent);
1528         if (ret != 0) {
1529                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull remote database 0x%x\n", dbid));
1530                 return -1;
1531         }
1532
1533         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pulled remote database 0x%x\n", dbid));
1534
1535         /* wipe all the remote databases. This is safe as we are in a transaction */
1536         w.db_id = dbid;
1537         w.tid = transaction_id;
1538
1539         data.dptr = (void *)&w;
1540         data.dsize = sizeof(w);
1541
1542         nodes = list_of_active_nodes(ctdb, nodemap, recdb, true);
1543         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
1544                                         nodes, 0,
1545                                         CONTROL_TIMEOUT(), false, data,
1546                                         NULL, NULL,
1547                                         NULL) != 0) {
1548                 DEBUG(DEBUG_ERR, (__location__ " Unable to wipe database. Recovery failed.\n"));
1549                 talloc_free(recdb);
1550                 return -1;
1551         }
1552         
1553         /* push out the correct database. This sets the dmaster and skips 
1554            the empty records */
1555         ret = push_recdb_database(ctdb, dbid, persistent, recdb, nodemap);
1556         if (ret != 0) {
1557                 talloc_free(recdb);
1558                 return -1;
1559         }
1560
1561         /* all done with this database */
1562         talloc_free(recdb);
1563
1564         return 0;
1565 }
1566
1567 static int ctdb_reload_remote_public_ips(struct ctdb_context *ctdb,
1568                                          struct ctdb_recoverd *rec,
1569                                          struct ctdb_node_map_old *nodemap,
1570                                          uint32_t *culprit)
1571 {
1572         int j;
1573         int ret;
1574
1575         if (ctdb->num_nodes != nodemap->num) {
1576                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) invalid param\n",
1577                                   ctdb->num_nodes, nodemap->num));
1578                 if (culprit) {
1579                         *culprit = ctdb->pnn;
1580                 }
1581                 return -1;
1582         }
1583
1584         for (j=0; j<nodemap->num; j++) {
1585                 /* For readability */
1586                 struct ctdb_node *node = ctdb->nodes[j];
1587
1588                 /* release any existing data */
1589                 TALLOC_FREE(node->known_public_ips);
1590                 TALLOC_FREE(node->available_public_ips);
1591
1592                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1593                         continue;
1594                 }
1595
1596                 /* Retrieve the list of known public IPs from the node */
1597                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1598                                         CONTROL_TIMEOUT(),
1599                                         node->pnn,
1600                                         ctdb->nodes,
1601                                         0,
1602                                         &node->known_public_ips);
1603                 if (ret != 0) {
1604                         DEBUG(DEBUG_ERR,
1605                               ("Failed to read known public IPs from node: %u\n",
1606                                node->pnn));
1607                         if (culprit) {
1608                                 *culprit = node->pnn;
1609                         }
1610                         return -1;
1611                 }
1612
1613                 if (ctdb->do_checkpublicip &&
1614                     !ctdb_op_is_disabled(rec->takeover_run) &&
1615                     verify_remote_ip_allocation(ctdb,
1616                                                  node->known_public_ips,
1617                                                  node->pnn)) {
1618                         DEBUG(DEBUG_ERR,("Trigger IP reallocation\n"));
1619                         rec->need_takeover_run = true;
1620                 }
1621
1622                 /* Retrieve the list of available public IPs from the node */
1623                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1624                                         CONTROL_TIMEOUT(),
1625                                         node->pnn,
1626                                         ctdb->nodes,
1627                                         CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE,
1628                                         &node->available_public_ips);
1629                 if (ret != 0) {
1630                         DEBUG(DEBUG_ERR,
1631                               ("Failed to read available public IPs from node: %u\n",
1632                                node->pnn));
1633                         if (culprit) {
1634                                 *culprit = node->pnn;
1635                         }
1636                         return -1;
1637                 }
1638         }
1639
1640         return 0;
1641 }
1642
1643 /* when we start a recovery, make sure all nodes use the same reclock file
1644    setting
1645 */
1646 static int sync_recovery_lock_file_across_cluster(struct ctdb_recoverd *rec)
1647 {
1648         struct ctdb_context *ctdb = rec->ctdb;
1649         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
1650         TDB_DATA data;
1651         uint32_t *nodes;
1652
1653         if (ctdb->recovery_lock_file == NULL) {
1654                 data.dptr  = NULL;
1655                 data.dsize = 0;
1656         } else {
1657                 data.dsize = strlen(ctdb->recovery_lock_file) + 1;
1658                 data.dptr  = (uint8_t *)ctdb->recovery_lock_file;
1659         }
1660
1661         nodes = list_of_active_nodes(ctdb, rec->nodemap, tmp_ctx, true);
1662         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECLOCK_FILE,
1663                                         nodes, 0,
1664                                         CONTROL_TIMEOUT(),
1665                                         false, data,
1666                                         NULL, NULL,
1667                                         rec) != 0) {
1668                 DEBUG(DEBUG_ERR, (__location__ " Failed to sync reclock file settings\n"));
1669                 talloc_free(tmp_ctx);
1670                 return -1;
1671         }
1672
1673         talloc_free(tmp_ctx);
1674         return 0;
1675 }
1676
1677
1678 /*
1679  * this callback is called for every node that failed to execute ctdb_takeover_run()
1680  * and set flag to re-run takeover run.
1681  */
1682 static void takeover_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
1683 {
1684         DEBUG(DEBUG_ERR, ("Node %u failed the takeover run\n", node_pnn));
1685
1686         if (callback_data != NULL) {
1687                 struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
1688
1689                 DEBUG(DEBUG_ERR, ("Setting node %u as recovery fail culprit\n", node_pnn));
1690
1691                 ctdb_set_culprit(rec, node_pnn);
1692         }
1693 }
1694
1695
1696 static void ban_misbehaving_nodes(struct ctdb_recoverd *rec, bool *self_ban)
1697 {
1698         struct ctdb_context *ctdb = rec->ctdb;
1699         int i;
1700         struct ctdb_banning_state *ban_state;
1701
1702         *self_ban = false;
1703         for (i=0; i<ctdb->num_nodes; i++) {
1704                 if (ctdb->nodes[i]->ban_state == NULL) {
1705                         continue;
1706                 }
1707                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
1708                 if (ban_state->count < 2*ctdb->num_nodes) {
1709                         continue;
1710                 }
1711
1712                 DEBUG(DEBUG_NOTICE,("Node %u reached %u banning credits - banning it for %u seconds\n",
1713                         ctdb->nodes[i]->pnn, ban_state->count,
1714                         ctdb->tunable.recovery_ban_period));
1715                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
1716                 ban_state->count = 0;
1717
1718                 /* Banning ourself? */
1719                 if (ctdb->nodes[i]->pnn == rec->ctdb->pnn) {
1720                         *self_ban = true;
1721                 }
1722         }
1723 }
1724
1725 static bool do_takeover_run(struct ctdb_recoverd *rec,
1726                             struct ctdb_node_map_old *nodemap,
1727                             bool banning_credits_on_fail)
1728 {
1729         uint32_t *nodes = NULL;
1730         struct ctdb_disable_message dtr;
1731         TDB_DATA data;
1732         int i;
1733         uint32_t *rebalance_nodes = rec->force_rebalance_nodes;
1734         int ret;
1735         bool ok;
1736
1737         DEBUG(DEBUG_NOTICE, ("Takeover run starting\n"));
1738
1739         if (ctdb_op_is_in_progress(rec->takeover_run)) {
1740                 DEBUG(DEBUG_ERR, (__location__
1741                                   " takeover run already in progress \n"));
1742                 ok = false;
1743                 goto done;
1744         }
1745
1746         if (!ctdb_op_begin(rec->takeover_run)) {
1747                 ok = false;
1748                 goto done;
1749         }
1750
1751         /* Disable IP checks (takeover runs, really) on other nodes
1752          * while doing this takeover run.  This will stop those other
1753          * nodes from triggering takeover runs when think they should
1754          * be hosting an IP but it isn't yet on an interface.  Don't
1755          * wait for replies since a failure here might cause some
1756          * noise in the logs but will not actually cause a problem.
1757          */
1758         dtr.srvid = 0; /* No reply */
1759         dtr.pnn = -1;
1760
1761         data.dptr  = (uint8_t*)&dtr;
1762         data.dsize = sizeof(dtr);
1763
1764         nodes = list_of_connected_nodes(rec->ctdb, nodemap, rec, false);
1765
1766         /* Disable for 60 seconds.  This can be a tunable later if
1767          * necessary.
1768          */
1769         dtr.timeout = 60;
1770         for (i = 0; i < talloc_array_length(nodes); i++) {
1771                 if (ctdb_client_send_message(rec->ctdb, nodes[i],
1772                                              CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
1773                                              data) != 0) {
1774                         DEBUG(DEBUG_INFO,("Failed to disable takeover runs\n"));
1775                 }
1776         }
1777
1778         ret = ctdb_takeover_run(rec->ctdb, nodemap,
1779                                 rec->force_rebalance_nodes,
1780                                 takeover_fail_callback,
1781                                 banning_credits_on_fail ? rec : NULL);
1782
1783         /* Reenable takeover runs and IP checks on other nodes */
1784         dtr.timeout = 0;
1785         for (i = 0; i < talloc_array_length(nodes); i++) {
1786                 if (ctdb_client_send_message(rec->ctdb, nodes[i],
1787                                              CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
1788                                              data) != 0) {
1789                         DEBUG(DEBUG_INFO,("Failed to re-enable takeover runs\n"));
1790                 }
1791         }
1792
1793         if (ret != 0) {
1794                 DEBUG(DEBUG_ERR, ("ctdb_takeover_run() failed\n"));
1795                 ok = false;
1796                 goto done;
1797         }
1798
1799         ok = true;
1800         /* Takeover run was successful so clear force rebalance targets */
1801         if (rebalance_nodes == rec->force_rebalance_nodes) {
1802                 TALLOC_FREE(rec->force_rebalance_nodes);
1803         } else {
1804                 DEBUG(DEBUG_WARNING,
1805                       ("Rebalance target nodes changed during takeover run - not clearing\n"));
1806         }
1807 done:
1808         rec->need_takeover_run = !ok;
1809         talloc_free(nodes);
1810         ctdb_op_end(rec->takeover_run);
1811
1812         DEBUG(DEBUG_NOTICE, ("Takeover run %s\n", ok ? "completed successfully" : "unsuccessful"));
1813         return ok;
1814 }
1815
/*
 * Bookkeeping for one run of the external recovery helper, shared between
 * db_recovery_parallel() and ctdb_recovery_handler().
 */
struct recovery_helper_state {
	/* Pipe to the helper: fd[0] is read by ctdb_recovery_handler(),
	 * fd[1] is passed to the helper as its first argument */
	int fd[2];
	/* Helper process id; -1 while no helper has been started */
	pid_t pid;
	/* Value read from the pipe, or EPIPE if the read was short/failed */
	int result;
	/* Set by ctdb_recovery_handler() once the pipe has been read */
	bool done;
};
1822
1823 static void ctdb_recovery_handler(struct tevent_context *ev,
1824                                   struct tevent_fd *fde,
1825                                   uint16_t flags, void *private_data)
1826 {
1827         struct recovery_helper_state *state = talloc_get_type_abort(
1828                 private_data, struct recovery_helper_state);
1829         int ret;
1830
1831         ret = sys_read(state->fd[0], &state->result, sizeof(state->result));
1832         if (ret != sizeof(state->result)) {
1833                 state->result = EPIPE;
1834         }
1835
1836         state->done = true;
1837 }
1838
1839
1840 static int db_recovery_parallel(struct ctdb_recoverd *rec, TALLOC_CTX *mem_ctx)
1841 {
1842         static char prog[PATH_MAX+1] = "";
1843         const char **args;
1844         struct recovery_helper_state *state;
1845         struct tevent_fd *fde;
1846         int nargs, ret;
1847
1848         if (!ctdb_set_helper("recovery_helper", prog, sizeof(prog),
1849                              "CTDB_RECOVERY_HELPER", CTDB_HELPER_BINDIR,
1850                              "ctdb_recovery_helper")) {
1851                 ctdb_die(rec->ctdb, "Unable to set recovery helper\n");
1852         }
1853
1854         state = talloc_zero(mem_ctx, struct recovery_helper_state);
1855         if (state == NULL) {
1856                 DEBUG(DEBUG_ERR, (__location__ " memory error\n"));
1857                 return -1;
1858         }
1859
1860         state->pid = -1;
1861
1862         ret = pipe(state->fd);
1863         if (ret != 0) {
1864                 DEBUG(DEBUG_ERR,
1865                       ("Failed to create pipe for recovery helper\n"));
1866                 goto fail;
1867         }
1868
1869         set_close_on_exec(state->fd[0]);
1870
1871         nargs = 4;
1872         args = talloc_array(state, const char *, nargs);
1873         if (args == NULL) {
1874                 DEBUG(DEBUG_ERR, (__location__ " memory error\n"));
1875                 goto fail;
1876         }
1877
1878         args[0] = talloc_asprintf(args, "%d", state->fd[1]);
1879         args[1] = rec->ctdb->daemon.name;
1880         args[2] = talloc_asprintf(args, "%u", new_generation());
1881         args[3] = NULL;
1882
1883         if (args[0] == NULL || args[2] == NULL) {
1884                 DEBUG(DEBUG_ERR, (__location__ " memory error\n"));
1885                 goto fail;
1886         }
1887
1888         if (!ctdb_vfork_with_logging(state, rec->ctdb, "recovery", prog, nargs,
1889                                      args, NULL, NULL, &state->pid)) {
1890                 DEBUG(DEBUG_ERR,
1891                       ("Failed to create child for recovery helper\n"));
1892                 goto fail;
1893         }
1894
1895         close(state->fd[1]);
1896         state->fd[1] = -1;
1897
1898         state->done = false;
1899
1900         fde = tevent_add_fd(rec->ctdb->ev, rec->ctdb, state->fd[0],
1901                             TEVENT_FD_READ, ctdb_recovery_handler, state);
1902         if (fde == NULL) {
1903                 goto fail;
1904         }
1905         tevent_fd_set_auto_close(fde);
1906
1907         while (!state->done) {
1908                 tevent_loop_once(rec->ctdb->ev);
1909         }
1910
1911         close(state->fd[0]);
1912         state->fd[0] = -1;
1913
1914         if (state->result != 0) {
1915                 goto fail;
1916         }
1917
1918         ctdb_kill(rec->ctdb, state->pid, SIGKILL);
1919         talloc_free(state);
1920         return 0;
1921
1922 fail:
1923         if (state->fd[0] != -1) {
1924                 close(state->fd[0]);
1925         }
1926         if (state->fd[1] != -1) {
1927                 close(state->fd[1]);
1928         }
1929         if (state->pid != -1) {
1930                 ctdb_kill(rec->ctdb, state->pid, SIGKILL);
1931         }
1932         talloc_free(state);
1933         return -1;
1934 }
1935
1936 static int db_recovery_serial(struct ctdb_recoverd *rec, TALLOC_CTX *mem_ctx,
1937                               uint32_t pnn, struct ctdb_node_map_old *nodemap,
1938                               struct ctdb_vnn_map *vnnmap,
1939                               struct ctdb_dbid_map_old *dbmap)
1940 {
1941         struct ctdb_context *ctdb = rec->ctdb;
1942         uint32_t generation;
1943         TDB_DATA data;
1944         uint32_t *nodes;
1945         int ret, i, j;
1946
1947         /* set recovery mode to active on all nodes */
1948         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE, true);
1949         if (ret != 0) {
1950                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1951                 return -1;
1952         }
1953
1954         /* execute the "startrecovery" event script on all nodes */
1955         ret = run_startrecovery_eventscript(rec, nodemap);
1956         if (ret!=0) {
1957                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
1958                 return -1;
1959         }
1960
1961         /* pick a new generation number */
1962         generation = new_generation();
1963
1964         /* change the vnnmap on this node to use the new generation 
1965            number but not on any other nodes.
1966            this guarantees that if we abort the recovery prematurely
1967            for some reason (a node stops responding?)
1968            that we can just return immediately and we will reenter
1969            recovery shortly again.
1970            I.e. we deliberately leave the cluster with an inconsistent
1971            generation id to allow us to abort recovery at any stage and
1972            just restart it from scratch.
1973          */
1974         vnnmap->generation = generation;
1975         ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, vnnmap);
1976         if (ret != 0) {
1977                 DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
1978                 return -1;
1979         }
1980
1981         /* Database generations are updated when the transaction is commited to
1982          * the databases.  So make sure to use the final generation as the
1983          * transaction id
1984          */
1985         generation = new_generation();
1986
1987         data.dptr = (void *)&generation;
1988         data.dsize = sizeof(uint32_t);
1989
1990         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
1991         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
1992                                         nodes, 0,
1993                                         CONTROL_TIMEOUT(), false, data,
1994                                         NULL,
1995                                         transaction_start_fail_callback,
1996                                         rec) != 0) {
1997                 DEBUG(DEBUG_ERR, (__location__ " Unable to start transactions. Recovery failed.\n"));
1998                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_CANCEL,
1999                                         nodes, 0,
2000                                         CONTROL_TIMEOUT(), false, tdb_null,
2001                                         NULL,
2002                                         NULL,
2003                                         NULL) != 0) {
2004                         DEBUG(DEBUG_ERR,("Failed to cancel recovery transaction\n"));
2005                 }
2006                 return -1;
2007         }
2008
2009         DEBUG(DEBUG_NOTICE,(__location__ " started transactions on all nodes\n"));
2010
2011         for (i=0;i<dbmap->num;i++) {
2012                 ret = recover_database(rec, mem_ctx,
2013                                        dbmap->dbs[i].db_id,
2014                                        dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT,
2015                                        pnn, nodemap, generation);
2016                 if (ret != 0) {
2017                         DEBUG(DEBUG_ERR, (__location__ " Failed to recover database 0x%x\n", dbmap->dbs[i].db_id));
2018                         return -1;
2019                 }
2020         }
2021
2022         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - starting database commits\n"));
2023
2024         /* commit all the changes */
2025         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
2026                                         nodes, 0,
2027                                         CONTROL_TIMEOUT(), false, data,
2028                                         NULL, NULL,
2029                                         NULL) != 0) {
2030                 DEBUG(DEBUG_ERR, (__location__ " Unable to commit recovery changes. Recovery failed.\n"));
2031                 return -1;
2032         }
2033
2034         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - committed databases\n"));
2035
2036         /* build a new vnn map with all the currently active and
2037            unbanned nodes */
2038         vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
2039         CTDB_NO_MEMORY(ctdb, vnnmap);
2040         vnnmap->generation = generation;
2041         vnnmap->size = 0;
2042         vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
2043         CTDB_NO_MEMORY(ctdb, vnnmap->map);
2044         for (i=j=0;i<nodemap->num;i++) {
2045                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2046                         continue;
2047                 }
2048                 if (!ctdb_node_has_capabilities(rec->caps,
2049                                                 ctdb->nodes[i]->pnn,
2050                                                 CTDB_CAP_LMASTER)) {
2051                         /* this node can not be an lmaster */
2052                         DEBUG(DEBUG_DEBUG, ("Node %d cant be a LMASTER, skipping it\n", i));
2053                         continue;
2054                 }
2055
2056                 vnnmap->size++;
2057                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
2058                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
2059                 vnnmap->map[j++] = nodemap->nodes[i].pnn;
2060
2061         }
2062         if (vnnmap->size == 0) {
2063                 DEBUG(DEBUG_NOTICE, ("No suitable lmasters found. Adding local node (recmaster) anyway.\n"));
2064                 vnnmap->size++;
2065                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
2066                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
2067                 vnnmap->map[0] = pnn;
2068         }
2069
2070         /* update to the new vnnmap on all nodes */
2071         ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
2072         if (ret != 0) {
2073                 DEBUG(DEBUG_ERR, (__location__ " Unable to update vnnmap on all nodes\n"));
2074                 return -1;
2075         }
2076
2077         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated vnnmap\n"));
2078
2079         /* update recmaster to point to us for all nodes */
2080         ret = set_recovery_master(ctdb, nodemap, pnn);
2081         if (ret!=0) {
2082                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery master\n"));
2083                 return -1;
2084         }
2085
2086         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated recmaster\n"));
2087
2088         /* disable recovery mode */
2089         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_NORMAL, false);
2090         if (ret != 0) {
2091                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to normal on cluster\n"));
2092                 return -1;
2093         }
2094
2095         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - disabled recovery mode\n"));
2096
2097         return 0;
2098 }
2099
2100 /*
2101   we are the recmaster, and recovery is needed - start a recovery run
2102  */
2103 static int do_recovery(struct ctdb_recoverd *rec,
2104                        TALLOC_CTX *mem_ctx, uint32_t pnn,
2105                        struct ctdb_node_map_old *nodemap, struct ctdb_vnn_map *vnnmap)
2106 {
2107         struct ctdb_context *ctdb = rec->ctdb;
2108         int i, ret;
2109         struct ctdb_dbid_map_old *dbmap;
2110         struct timeval start_time;
2111         uint32_t culprit = (uint32_t)-1;
2112         bool self_ban;
2113         bool par_recovery;
2114
2115         DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));
2116
2117         /* Check if the current node is still the recmaster.  It's possible that
2118          * re-election has changed the recmaster, but we have not yet updated
2119          * that information.
2120          */
2121         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2122                                      pnn, &ctdb->recovery_master);
2123         if (ret != 0) {
2124                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster\n"));
2125                 return -1;
2126         }
2127
2128         if (pnn != ctdb->recovery_master) {
2129                 DEBUG(DEBUG_NOTICE,
2130                       ("Recovery master changed to %u, aborting recovery\n",
2131                        ctdb->recovery_master));
2132                 return -1;
2133         }
2134
2135         /* if recovery fails, force it again */
2136         rec->need_recovery = true;
2137
2138         if (!ctdb_op_begin(rec->recovery)) {
2139                 return -1;
2140         }
2141
2142         if (rec->election_timeout) {
2143                 /* an election is in progress */
2144                 DEBUG(DEBUG_ERR, ("do_recovery called while election in progress - try again later\n"));
2145                 goto fail;
2146         }
2147
2148         ban_misbehaving_nodes(rec, &self_ban);
2149         if (self_ban) {
2150                 DEBUG(DEBUG_NOTICE, ("This node was banned, aborting recovery\n"));
2151                 goto fail;
2152         }
2153
2154         if (ctdb->recovery_lock_file != NULL) {
2155                 if (ctdb_recovery_have_lock(ctdb)) {
2156                         DEBUG(DEBUG_NOTICE, ("Already holding recovery lock\n"));
2157                 } else {
2158                         start_time = timeval_current();
2159                         DEBUG(DEBUG_NOTICE, ("Attempting to take recovery lock (%s)\n",
2160                                              ctdb->recovery_lock_file));
2161                         if (!ctdb_recovery_lock(ctdb)) {
2162                                 if (ctdb->runstate == CTDB_RUNSTATE_FIRST_RECOVERY) {
2163                                         /* If ctdb is trying first recovery, it's
2164                                          * possible that current node does not know
2165                                          * yet who the recmaster is.
2166                                          */
2167                                         DEBUG(DEBUG_ERR, ("Unable to get recovery lock"
2168                                                           " - retrying recovery\n"));
2169                                         goto fail;
2170                                 }
2171
2172                                 DEBUG(DEBUG_ERR,("Unable to get recovery lock - aborting recovery "
2173                                                  "and ban ourself for %u seconds\n",
2174                                                  ctdb->tunable.recovery_ban_period));
2175                                 ctdb_ban_node(rec, pnn, ctdb->tunable.recovery_ban_period);
2176                                 goto fail;
2177                         }
2178                         ctdb_ctrl_report_recd_lock_latency(ctdb,
2179                                                            CONTROL_TIMEOUT(),
2180                                                            timeval_elapsed(&start_time));
2181                         DEBUG(DEBUG_NOTICE,
2182                               ("Recovery lock taken successfully by recovery daemon\n"));
2183                 }
2184         }
2185
2186         DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", rec->last_culprit_node));
2187
2188         /* get a list of all databases */
2189         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
2190         if (ret != 0) {
2191                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node :%u\n", pnn));
2192                 goto fail;
2193         }
2194
2195         /* we do the db creation before we set the recovery mode, so the freeze happens
2196            on all databases we will be dealing with. */
2197
2198         /* verify that we have all the databases any other node has */
2199         ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
2200         if (ret != 0) {
2201                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing local databases\n"));
2202                 goto fail;
2203         }
2204
2205         /* verify that all other nodes have all our databases */
2206         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
2207         if (ret != 0) {
2208                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing remote databases\n"));
2209                 goto fail;
2210         }
2211         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - created remote databases\n"));
2212
2213         /* update the database priority for all remote databases */
2214         ret = update_db_priority_on_remote_nodes(ctdb, nodemap, pnn, dbmap, mem_ctx);
2215         if (ret != 0) {
2216                 DEBUG(DEBUG_ERR, (__location__ " Unable to set db priority on remote nodes\n"));
2217         }
2218         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated db priority for all databases\n"));
2219
2220
2221         /* update all other nodes to use the same setting for reclock files
2222            as the local recovery master.
2223         */
2224         sync_recovery_lock_file_across_cluster(rec);
2225
2226         /* update the capabilities for all nodes */
2227         ret = update_capabilities(rec, nodemap);
2228         if (ret!=0) {
2229                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
2230                 return -1;
2231         }
2232
2233         /*
2234           update all nodes to have the same flags that we have
2235          */
2236         for (i=0;i<nodemap->num;i++) {
2237                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
2238                         continue;
2239                 }
2240
2241                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
2242                 if (ret != 0) {
2243                         if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2244                                 DEBUG(DEBUG_WARNING, (__location__ "Unable to update flags on inactive node %d\n", i));
2245                         } else {
2246                                 DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
2247                                 return -1;
2248                         }
2249                 }
2250         }
2251
2252         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
2253
2254         /* Check if all participating nodes have parallel recovery capability */
2255         par_recovery = true;
2256         for (i=0; i<nodemap->num; i++) {
2257                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2258                         continue;
2259                 }
2260
2261                 if (!(rec->caps[i].capabilities &
2262                       CTDB_CAP_PARALLEL_RECOVERY)) {
2263                         par_recovery = false;
2264                         break;
2265                 }
2266         }
2267
2268         if (par_recovery) {
2269                 ret = db_recovery_parallel(rec, mem_ctx);
2270         } else {
2271                 ret = db_recovery_serial(rec, mem_ctx, pnn, nodemap, vnnmap,
2272                                          dbmap);
2273         }
2274
2275         if (ret != 0) {
2276                 goto fail;
2277         }
2278
2279         /* Fetch known/available public IPs from each active node */
2280         ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
2281         if (ret != 0) {
2282                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
2283                                  culprit));
2284                 rec->need_takeover_run = true;
2285                 goto fail;
2286         }
2287
2288         do_takeover_run(rec, nodemap, false);
2289
2290         /* execute the "recovered" event script on all nodes */
2291         ret = run_recovered_eventscript(rec, nodemap, "do_recovery");
2292         if (ret!=0) {
2293                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Recovery process failed.\n"));
2294                 goto fail;
2295         }
2296
2297         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - finished the recovered event\n"));
2298
2299         /* send a message to all clients telling them that the cluster 
2300            has been reconfigured */
2301         ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED,
2302                                        CTDB_SRVID_RECONFIGURE, tdb_null);
2303         if (ret != 0) {
2304                 DEBUG(DEBUG_ERR, (__location__ " Failed to send reconfigure message\n"));
2305                 goto fail;
2306         }
2307
2308         DEBUG(DEBUG_NOTICE, (__location__ " Recovery complete\n"));
2309
2310         rec->need_recovery = false;
2311         ctdb_op_end(rec->recovery);
2312
2313         /* we managed to complete a full recovery, make sure to forgive
2314            any past sins by the nodes that could now participate in the
2315            recovery.
2316         */
2317         DEBUG(DEBUG_ERR,("Resetting ban count to 0 for all nodes\n"));
2318         for (i=0;i<nodemap->num;i++) {
2319                 struct ctdb_banning_state *ban_state;
2320
2321                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
2322                         continue;
2323                 }
2324
2325                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[nodemap->nodes[i].pnn]->ban_state;
2326                 if (ban_state == NULL) {
2327                         continue;
2328                 }
2329
2330                 ban_state->count = 0;
2331         }
2332
2333         /* We just finished a recovery successfully.
2334            We now wait for rerecovery_timeout before we allow
2335            another recovery to take place.
2336         */
2337         DEBUG(DEBUG_NOTICE, ("Just finished a recovery. New recoveries will now be supressed for the rerecovery timeout (%d seconds)\n", ctdb->tunable.rerecovery_timeout));
2338         ctdb_op_disable(rec->recovery, ctdb->ev,
2339                         ctdb->tunable.rerecovery_timeout);
2340         return 0;
2341
2342 fail:
2343         ctdb_op_end(rec->recovery);
2344         return -1;
2345 }
2346
2347
/*
  Election data exchanged between recovery daemons.

  ctdb_election_data() fills this in for the local node and
  send_election_request() ships the raw bytes of the structure, so the
  field order and types must not be changed.

  ctdb_election_win() decides a contest by comparing priority_time and,
  if those are equal, the pnn.  node_flags lets the receiver see whether
  the sender is banned or stopped.
 */
struct election_message {
	uint32_t num_connected;		/* nodes not flagged DISCONNECTED in the sender's nodemap */
	struct timeval priority_time;	/* sender's rec->priority_time */
	uint32_t pnn;			/* sender's node number */
	uint32_t node_flags;		/* sender's own flags from its nodemap */
};
2358
2359 /*
2360   form this nodes election data
2361  */
2362 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
2363 {
2364         int ret, i;
2365         struct ctdb_node_map_old *nodemap;
2366         struct ctdb_context *ctdb = rec->ctdb;
2367
2368         ZERO_STRUCTP(em);
2369
2370         em->pnn = rec->ctdb->pnn;
2371         em->priority_time = rec->priority_time;
2372
2373         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
2374         if (ret != 0) {
2375                 DEBUG(DEBUG_ERR,(__location__ " unable to get node map\n"));
2376                 return;
2377         }
2378
2379         rec->node_flags = nodemap->nodes[ctdb->pnn].flags;
2380         em->node_flags = rec->node_flags;
2381
2382         for (i=0;i<nodemap->num;i++) {
2383                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
2384                         em->num_connected++;
2385                 }
2386         }
2387
2388         /* we shouldnt try to win this election if we cant be a recmaster */
2389         if ((ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
2390                 em->num_connected = 0;
2391                 em->priority_time = timeval_current();
2392         }
2393
2394         talloc_free(nodemap);
2395 }
2396
2397 /*
2398   see if the given election data wins
2399  */
2400 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
2401 {
2402         struct election_message myem;
2403         int cmp = 0;
2404
2405         ctdb_election_data(rec, &myem);
2406
2407         /* we cant win if we don't have the recmaster capability */
2408         if ((rec->ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
2409                 return false;
2410         }
2411
2412         /* we cant win if we are banned */
2413         if (rec->node_flags & NODE_FLAGS_BANNED) {
2414                 return false;
2415         }
2416
2417         /* we cant win if we are stopped */
2418         if (rec->node_flags & NODE_FLAGS_STOPPED) {
2419                 return false;
2420         }
2421
2422         /* we will automatically win if the other node is banned */
2423         if (em->node_flags & NODE_FLAGS_BANNED) {
2424                 return true;
2425         }
2426
2427         /* we will automatically win if the other node is banned */
2428         if (em->node_flags & NODE_FLAGS_STOPPED) {
2429                 return true;
2430         }
2431
2432         /* then the longest running node */
2433         if (cmp == 0) {
2434                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
2435         }
2436
2437         if (cmp == 0) {
2438                 cmp = (int)myem.pnn - (int)em->pnn;
2439         }
2440
2441         return cmp > 0;
2442 }
2443
2444 /*
2445   send out an election request
2446  */
2447 static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn)
2448 {
2449         int ret;
2450         TDB_DATA election_data;
2451         struct election_message emsg;
2452         uint64_t srvid;
2453         struct ctdb_context *ctdb = rec->ctdb;
2454
2455         srvid = CTDB_SRVID_ELECTION;
2456
2457         ctdb_election_data(rec, &emsg);
2458
2459         election_data.dsize = sizeof(struct election_message);
2460         election_data.dptr  = (unsigned char *)&emsg;
2461
2462
2463         /* first we assume we will win the election and set 
2464            recoverymaster to be ourself on the current node
2465          */
2466         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), pnn, pnn);
2467         if (ret != 0) {
2468                 DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request\n"));
2469                 return -1;
2470         }
2471
2472
2473         /* send an election message to all active nodes */
2474         DEBUG(DEBUG_INFO,(__location__ " Send election request to all active nodes\n"));
2475         return ctdb_client_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
2476 }
2477
2478 /*
2479   this function will unban all nodes in the cluster
2480 */
2481 static void unban_all_nodes(struct ctdb_context *ctdb)
2482 {
2483         int ret, i;
2484         struct ctdb_node_map_old *nodemap;
2485         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2486         
2487         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2488         if (ret != 0) {
2489                 DEBUG(DEBUG_ERR,(__location__ " failed to get nodemap to unban all nodes\n"));
2490                 return;
2491         }
2492
2493         for (i=0;i<nodemap->num;i++) {
2494                 if ( (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED))
2495                   && (nodemap->nodes[i].flags & NODE_FLAGS_BANNED) ) {
2496                         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(),
2497                                                  nodemap->nodes[i].pnn, 0,
2498                                                  NODE_FLAGS_BANNED);
2499                         if (ret != 0) {
2500                                 DEBUG(DEBUG_ERR, (__location__ " failed to reset ban state\n"));
2501                         }
2502                 }
2503         }
2504
2505         talloc_free(tmp_ctx);
2506 }
2507
2508
2509 /*
2510   we think we are winning the election - send a broadcast election request
2511  */
2512 static void election_send_request(struct tevent_context *ev,
2513                                   struct tevent_timer *te,
2514                                   struct timeval t, void *p)
2515 {
2516         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
2517         int ret;
2518
2519         ret = send_election_request(rec, ctdb_get_pnn(rec->ctdb));
2520         if (ret != 0) {
2521                 DEBUG(DEBUG_ERR,("Failed to send election request!\n"));
2522         }
2523
2524         TALLOC_FREE(rec->send_election_te);
2525 }
2526
2527 /*
2528   handler for memory dumps
2529 */
2530 static void mem_dump_handler(uint64_t srvid, TDB_DATA data, void *private_data)
2531 {
2532         struct ctdb_recoverd *rec = talloc_get_type(
2533                 private_data, struct ctdb_recoverd);
2534         struct ctdb_context *ctdb = rec->ctdb;
2535         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2536         TDB_DATA *dump;
2537         int ret;
2538         struct ctdb_srvid_message *rd;
2539
2540         if (data.dsize != sizeof(struct ctdb_srvid_message)) {
2541                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2542                 talloc_free(tmp_ctx);
2543                 return;
2544         }
2545         rd = (struct ctdb_srvid_message *)data.dptr;
2546
2547         dump = talloc_zero(tmp_ctx, TDB_DATA);
2548         if (dump == NULL) {
2549                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate memory for memdump\n"));
2550                 talloc_free(tmp_ctx);
2551                 return;
2552         }
2553         ret = ctdb_dump_memory(ctdb, dump);
2554         if (ret != 0) {
2555                 DEBUG(DEBUG_ERR, (__location__ " ctdb_dump_memory() failed\n"));
2556                 talloc_free(tmp_ctx);
2557                 return;
2558         }
2559
2560 DEBUG(DEBUG_ERR, ("recovery master memory dump\n"));            
2561
2562         ret = ctdb_client_send_message(ctdb, rd->pnn, rd->srvid, *dump);
2563         if (ret != 0) {
2564                 DEBUG(DEBUG_ERR,("Failed to send rd memdump reply message\n"));
2565                 talloc_free(tmp_ctx);
2566                 return;
2567         }
2568
2569         talloc_free(tmp_ctx);
2570 }
2571
2572 /*
2573   handler for reload_nodes
2574 */
2575 static void reload_nodes_handler(uint64_t srvid, TDB_DATA data,
2576                                  void *private_data)
2577 {
2578         struct ctdb_recoverd *rec = talloc_get_type(
2579                 private_data, struct ctdb_recoverd);
2580
2581         DEBUG(DEBUG_ERR, (__location__ " Reload nodes file from recovery daemon\n"));
2582
2583         ctdb_load_nodes_file(rec->ctdb);
2584 }
2585
2586
2587 static void ctdb_rebalance_timeout(struct tevent_context *ev,
2588                                    struct tevent_timer *te,
2589                                    struct timeval t, void *p)
2590 {
2591         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
2592
2593         if (rec->force_rebalance_nodes == NULL) {
2594                 DEBUG(DEBUG_ERR,
2595                       ("Rebalance timeout occurred - no nodes to rebalance\n"));
2596                 return;
2597         }
2598
2599         DEBUG(DEBUG_NOTICE,
2600               ("Rebalance timeout occurred - do takeover run\n"));
2601         do_takeover_run(rec, rec->nodemap, false);
2602 }
2603
2604
2605 static void recd_node_rebalance_handler(uint64_t srvid, TDB_DATA data,
2606                                         void *private_data)
2607 {
2608         struct ctdb_recoverd *rec = talloc_get_type(
2609                 private_data, struct ctdb_recoverd);
2610         struct ctdb_context *ctdb = rec->ctdb;
2611         uint32_t pnn;
2612         uint32_t *t;
2613         int len;
2614         uint32_t deferred_rebalance;
2615
2616         if (rec->recmaster != ctdb_get_pnn(ctdb)) {
2617                 return;
2618         }
2619
2620         if (data.dsize != sizeof(uint32_t)) {
2621                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of node rebalance message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(uint32_t)));
2622                 return;
2623         }
2624
2625         pnn = *(uint32_t *)&data.dptr[0];
2626
2627         DEBUG(DEBUG_NOTICE,("Setting up rebalance of IPs to node %u\n", pnn));
2628
2629         /* Copy any existing list of nodes.  There's probably some
2630          * sort of realloc variant that will do this but we need to
2631          * make sure that freeing the old array also cancels the timer
2632          * event for the timeout... not sure if realloc will do that.
2633          */
2634         len = (rec->force_rebalance_nodes != NULL) ?
2635                 talloc_array_length(rec->force_rebalance_nodes) :
2636                 0;
2637
2638         /* This allows duplicates to be added but they don't cause
2639          * harm.  A call to add a duplicate PNN arguably means that
2640          * the timeout should be reset, so this is the simplest
2641          * solution.
2642          */
2643         t = talloc_zero_array(rec, uint32_t, len+1);
2644         CTDB_NO_MEMORY_VOID(ctdb, t);
2645         if (len > 0) {
2646                 memcpy(t, rec->force_rebalance_nodes, sizeof(uint32_t) * len);
2647         }
2648         t[len] = pnn;
2649
2650         talloc_free(rec->force_rebalance_nodes);
2651
2652         rec->force_rebalance_nodes = t;
2653
2654         /* If configured, setup a deferred takeover run to make sure
2655          * that certain nodes get IPs rebalanced to them.  This will
2656          * be cancelled if a successful takeover run happens before
2657          * the timeout.  Assign tunable value to variable for
2658          * readability.
2659          */
2660         deferred_rebalance = ctdb->tunable.deferred_rebalance_on_node_add;
2661         if (deferred_rebalance != 0) {
2662                 tevent_add_timer(ctdb->ev, rec->force_rebalance_nodes,
2663                                  timeval_current_ofs(deferred_rebalance, 0),
2664                                  ctdb_rebalance_timeout, rec);
2665         }
2666 }
2667
2668
2669
2670 static void recd_update_ip_handler(uint64_t srvid, TDB_DATA data,
2671                                    void *private_data)
2672 {
2673         struct ctdb_recoverd *rec = talloc_get_type(
2674                 private_data, struct ctdb_recoverd);
2675         struct ctdb_public_ip *ip;
2676
2677         if (rec->recmaster != rec->ctdb->pnn) {
2678                 DEBUG(DEBUG_INFO,("Not recmaster, ignore update ip message\n"));
2679                 return;
2680         }
2681
2682         if (data.dsize != sizeof(struct ctdb_public_ip)) {
2683                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of recd update ip message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(struct ctdb_public_ip)));
2684                 return;
2685         }
2686
2687         ip = (struct ctdb_public_ip *)data.dptr;
2688
2689         update_ip_assignment_tree(rec->ctdb, ip);
2690 }
2691
2692 static void srvid_disable_and_reply(struct ctdb_context *ctdb,
2693                                     TDB_DATA data,
2694                                     struct ctdb_op_state *op_state)
2695 {
2696         struct ctdb_disable_message *r;
2697         uint32_t timeout;
2698         TDB_DATA result;
2699         int32_t ret = 0;
2700
2701         /* Validate input data */
2702         if (data.dsize != sizeof(struct ctdb_disable_message)) {
2703                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
2704                                  "expecting %lu\n", (long unsigned)data.dsize,
2705                                  (long unsigned)sizeof(struct ctdb_srvid_message)));
2706                 return;
2707         }
2708         if (data.dptr == NULL) {
2709                 DEBUG(DEBUG_ERR,(__location__ " No data received\n"));
2710                 return;
2711         }
2712
2713         r = (struct ctdb_disable_message *)data.dptr;
2714         timeout = r->timeout;
2715
2716         ret = ctdb_op_disable(op_state, ctdb->ev, timeout);
2717         if (ret != 0) {
2718                 goto done;
2719         }
2720
2721         /* Returning our PNN tells the caller that we succeeded */
2722         ret = ctdb_get_pnn(ctdb);
2723 done:
2724         result.dsize = sizeof(int32_t);
2725         result.dptr  = (uint8_t *)&ret;
2726         srvid_request_reply(ctdb, (struct ctdb_srvid_message *)r, result);
2727 }
2728
2729 static void disable_takeover_runs_handler(uint64_t srvid, TDB_DATA data,
2730                                           void *private_data)
2731 {
2732         struct ctdb_recoverd *rec = talloc_get_type(
2733                 private_data, struct ctdb_recoverd);
2734
2735         srvid_disable_and_reply(rec->ctdb, data, rec->takeover_run);
2736 }
2737
2738 /* Backward compatibility for this SRVID */
2739 static void disable_ip_check_handler(uint64_t srvid, TDB_DATA data,
2740                                      void *private_data)
2741 {
2742         struct ctdb_recoverd *rec = talloc_get_type(
2743                 private_data, struct ctdb_recoverd);
2744         uint32_t timeout;
2745
2746         if (data.dsize != sizeof(uint32_t)) {
2747                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
2748                                  "expecting %lu\n", (long unsigned)data.dsize,
2749                                  (long unsigned)sizeof(uint32_t)));
2750                 return;
2751         }
2752         if (data.dptr == NULL) {
2753                 DEBUG(DEBUG_ERR,(__location__ " No data received\n"));
2754                 return;
2755         }
2756
2757         timeout = *((uint32_t *)data.dptr);
2758
2759         ctdb_op_disable(rec->takeover_run, rec->ctdb->ev, timeout);
2760 }
2761
2762 static void disable_recoveries_handler(uint64_t srvid, TDB_DATA data,
2763                                        void *private_data)
2764 {
2765         struct ctdb_recoverd *rec = talloc_get_type(
2766                 private_data, struct ctdb_recoverd);
2767
2768         srvid_disable_and_reply(rec->ctdb, data, rec->recovery);
2769 }
2770
2771 /*
2772   handler for ip reallocate, just add it to the list of requests and 
2773   handle this later in the monitor_cluster loop so we do not recurse
2774   with other requests to takeover_run()
2775 */
2776 static void ip_reallocate_handler(uint64_t srvid, TDB_DATA data,
2777                                   void *private_data)
2778 {
2779         struct ctdb_srvid_message *request;
2780         struct ctdb_recoverd *rec = talloc_get_type(
2781                 private_data, struct ctdb_recoverd);
2782
2783         if (data.dsize != sizeof(struct ctdb_srvid_message)) {
2784                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2785                 return;
2786         }
2787
2788         request = (struct ctdb_srvid_message *)data.dptr;
2789
2790         srvid_request_add(rec->ctdb, &rec->reallocate_requests, request);
2791 }
2792
2793 static void process_ipreallocate_requests(struct ctdb_context *ctdb,
2794                                           struct ctdb_recoverd *rec)
2795 {
2796         TDB_DATA result;
2797         int32_t ret;
2798         uint32_t culprit;
2799         struct srvid_requests *current;
2800
2801         DEBUG(DEBUG_INFO, ("recovery master forced ip reallocation\n"));
2802
2803         /* Only process requests that are currently pending.  More
2804          * might come in while the takeover run is in progress and
2805          * they will need to be processed later since they might
2806          * be in response flag changes.
2807          */
2808         current = rec->reallocate_requests;
2809         rec->reallocate_requests = NULL;
2810
2811         /* update the list of public ips that a node can handle for
2812            all connected nodes
2813         */
2814         ret = ctdb_reload_remote_public_ips(ctdb, rec, rec->nodemap, &culprit);
2815         if (ret != 0) {
2816                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
2817                                  culprit));
2818                 rec->need_takeover_run = true;
2819         }
2820         if (ret == 0) {
2821                 if (do_takeover_run(rec, rec->nodemap, false)) {
2822                         ret = ctdb_get_pnn(ctdb);
2823                 } else {
2824                         ret = -1;
2825                 }
2826         }
2827
2828         result.dsize = sizeof(int32_t);
2829         result.dptr  = (uint8_t *)&ret;
2830
2831         srvid_requests_reply(ctdb, &current, result);
2832 }
2833
2834
2835 /*
2836   handler for recovery master elections
2837 */
2838 static void election_handler(uint64_t srvid, TDB_DATA data, void *private_data)
2839 {
2840         struct ctdb_recoverd *rec = talloc_get_type(
2841                 private_data, struct ctdb_recoverd);
2842         struct ctdb_context *ctdb = rec->ctdb;
2843         int ret;
2844         struct election_message *em = (struct election_message *)data.dptr;
2845
2846         /* Ignore election packets from ourself */
2847         if (ctdb->pnn == em->pnn) {
2848                 return;
2849         }
2850
2851         /* we got an election packet - update the timeout for the election */
2852         talloc_free(rec->election_timeout);
2853         rec->election_timeout = tevent_add_timer(
2854                         ctdb->ev, ctdb,
2855                         fast_start ?
2856                                 timeval_current_ofs(0, 500000) :
2857                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0),
2858                         ctdb_election_timeout, rec);
2859
2860         /* someone called an election. check their election data
2861            and if we disagree and we would rather be the elected node, 
2862            send a new election message to all other nodes
2863          */
2864         if (ctdb_election_win(rec, em)) {
2865                 if (!rec->send_election_te) {
2866                         rec->send_election_te = tevent_add_timer(
2867                                         ctdb->ev, rec,
2868                                         timeval_current_ofs(0, 500000),
2869                                         election_send_request, rec);
2870                 }
2871                 /*unban_all_nodes(ctdb);*/
2872                 return;
2873         }
2874
2875         /* we didn't win */
2876         TALLOC_FREE(rec->send_election_te);
2877
2878         /* Release the recovery lock file */
2879         if (ctdb_recovery_have_lock(ctdb)) {
2880                 ctdb_recovery_unlock(ctdb);
2881                 unban_all_nodes(ctdb);
2882         }
2883
2884         clear_ip_assignment_tree(ctdb);
2885
2886         /* ok, let that guy become recmaster then */
2887         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), ctdb_get_pnn(ctdb), em->pnn);
2888         if (ret != 0) {
2889                 DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request"));
2890                 return;
2891         }
2892
2893         return;
2894 }
2895
2896
2897 /*
2898   force the start of the election process
2899  */
2900 static void force_election(struct ctdb_recoverd *rec, uint32_t pnn, 
2901                            struct ctdb_node_map_old *nodemap)
2902 {
2903         int ret;
2904         struct ctdb_context *ctdb = rec->ctdb;
2905
2906         DEBUG(DEBUG_INFO,(__location__ " Force an election\n"));
2907
2908         /* set all nodes to recovery mode to stop all internode traffic */
2909         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE, false);
2910         if (ret != 0) {
2911                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
2912                 return;
2913         }
2914
2915         talloc_free(rec->election_timeout);
2916         rec->election_timeout = tevent_add_timer(
2917                         ctdb->ev, ctdb,
2918                         fast_start ?
2919                                 timeval_current_ofs(0, 500000) :
2920                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0),
2921                         ctdb_election_timeout, rec);
2922
2923         ret = send_election_request(rec, pnn);
2924         if (ret!=0) {
2925                 DEBUG(DEBUG_ERR, (__location__ " failed to initiate recmaster election"));
2926                 return;
2927         }
2928
2929         /* wait for a few seconds to collect all responses */
2930         ctdb_wait_election(rec);
2931 }
2932
2933
2934
2935 /*
2936   handler for when a node changes its flags
2937 */
2938 static void monitor_handler(uint64_t srvid, TDB_DATA data, void *private_data)
2939 {
2940         struct ctdb_recoverd *rec = talloc_get_type(
2941                 private_data, struct ctdb_recoverd);
2942         struct ctdb_context *ctdb = rec->ctdb;
2943         int ret;
2944         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2945         struct ctdb_node_map_old *nodemap=NULL;
2946         TALLOC_CTX *tmp_ctx;
2947         int i;
2948         int disabled_flag_changed;
2949
2950         if (data.dsize != sizeof(*c)) {
2951                 DEBUG(DEBUG_ERR,(__location__ "Invalid data in ctdb_node_flag_change\n"));
2952                 return;
2953         }
2954
2955         tmp_ctx = talloc_new(ctdb);
2956         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
2957
2958         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2959         if (ret != 0) {
2960                 DEBUG(DEBUG_ERR,(__location__ "ctdb_ctrl_getnodemap failed in monitor_handler\n"));
2961                 talloc_free(tmp_ctx);
2962                 return;         
2963         }
2964
2965
2966         for (i=0;i<nodemap->num;i++) {
2967                 if (nodemap->nodes[i].pnn == c->pnn) break;
2968         }
2969
2970         if (i == nodemap->num) {
2971                 DEBUG(DEBUG_CRIT,(__location__ "Flag change for non-existant node %u\n", c->pnn));
2972                 talloc_free(tmp_ctx);
2973                 return;
2974         }
2975
2976         if (c->old_flags != c->new_flags) {
2977                 DEBUG(DEBUG_NOTICE,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
2978         }
2979
2980         disabled_flag_changed =  (nodemap->nodes[i].flags ^ c->new_flags) & NODE_FLAGS_DISABLED;
2981
2982         nodemap->nodes[i].flags = c->new_flags;
2983
2984         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2985                                      CTDB_CURRENT_NODE, &ctdb->recovery_master);
2986
2987         if (ret == 0) {
2988                 ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2989                                            CTDB_CURRENT_NODE, &ctdb->recovery_mode);
2990         }
2991         
2992         if (ret == 0 &&
2993             ctdb->recovery_master == ctdb->pnn &&
2994             ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2995                 /* Only do the takeover run if the perm disabled or unhealthy
2996                    flags changed since these will cause an ip failover but not
2997                    a recovery.
2998                    If the node became disconnected or banned this will also
2999                    lead to an ip address failover but that is handled 
3000                    during recovery
3001                 */
3002                 if (disabled_flag_changed) {
3003                         rec->need_takeover_run = true;
3004                 }
3005         }
3006
3007         talloc_free(tmp_ctx);
3008 }
3009
3010 /*
3011   handler for when we need to push out flag changes ot all other nodes
3012 */
3013 static void push_flags_handler(uint64_t srvid, TDB_DATA data,
3014                                void *private_data)
3015 {
3016         struct ctdb_recoverd *rec = talloc_get_type(
3017                 private_data, struct ctdb_recoverd);
3018         struct ctdb_context *ctdb = rec->ctdb;
3019         int ret;
3020         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
3021         struct ctdb_node_map_old *nodemap=NULL;
3022         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3023         uint32_t recmaster;
3024         uint32_t *nodes;
3025
3026         /* find the recovery master */
3027         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &recmaster);
3028         if (ret != 0) {
3029                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
3030                 talloc_free(tmp_ctx);
3031                 return;
3032         }
3033
3034         /* read the node flags from the recmaster */
3035         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), recmaster, tmp_ctx, &nodemap);
3036         if (ret != 0) {
3037                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", c->pnn));
3038                 talloc_free(tmp_ctx);
3039                 return;
3040         }
3041         if (c->pnn >= nodemap->num) {
3042                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", c->pnn));
3043                 talloc_free(tmp_ctx);
3044                 return;
3045         }
3046
3047         /* send the flags update to all connected nodes */
3048         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
3049
3050         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
3051                                       nodes, 0, CONTROL_TIMEOUT(),
3052                                       false, data,
3053                                       NULL, NULL,
3054                                       NULL) != 0) {
3055                 DEBUG(DEBUG_ERR, (__location__ " ctdb_control to modify node flags failed\n"));
3056
3057                 talloc_free(tmp_ctx);
3058                 return;
3059         }
3060
3061         talloc_free(tmp_ctx);
3062 }
3063
3064
3065 struct verify_recmode_normal_data {
3066         uint32_t count;
3067         enum monitor_result status;
3068 };
3069
3070 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
3071 {
3072         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmode_normal_data);
3073
3074
3075         /* one more node has responded with recmode data*/
3076         rmdata->count--;
3077
3078         /* if we failed to get the recmode, then return an error and let
3079            the main loop try again.
3080         */
3081         if (state->state != CTDB_CONTROL_DONE) {
3082                 if (rmdata->status == MONITOR_OK) {
3083                         rmdata->status = MONITOR_FAILED;
3084                 }
3085                 return;
3086         }
3087
3088         /* if we got a response, then the recmode will be stored in the
3089            status field
3090         */
3091         if (state->status != CTDB_RECOVERY_NORMAL) {
3092                 DEBUG(DEBUG_NOTICE, ("Node:%u was in recovery mode. Start recovery process\n", state->c->hdr.destnode));
3093                 rmdata->status = MONITOR_RECOVERY_NEEDED;
3094         }
3095
3096         return;
3097 }
3098
3099
3100 /* verify that all nodes are in normal recovery mode */
3101 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map_old *nodemap)
3102 {
3103         struct verify_recmode_normal_data *rmdata;
3104         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
3105         struct ctdb_client_control_state *state;
3106         enum monitor_result status;
3107         int j;
3108         
3109         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
3110         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
3111         rmdata->count  = 0;
3112         rmdata->status = MONITOR_OK;
3113
3114         /* loop over all active nodes and send an async getrecmode call to 
3115            them*/
3116         for (j=0; j<nodemap->num; j++) {
3117                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3118                         continue;
3119                 }
3120                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
3121                                         CONTROL_TIMEOUT(), 
3122                                         nodemap->nodes[j].pnn);
3123                 if (state == NULL) {
3124                         /* we failed to send the control, treat this as 
3125                            an error and try again next iteration
3126                         */                      
3127                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
3128                         talloc_free(mem_ctx);
3129                         return MONITOR_FAILED;
3130                 }
3131
3132                 /* set up the callback functions */
3133                 state->async.fn = verify_recmode_normal_callback;
3134                 state->async.private_data = rmdata;
3135
3136                 /* one more control to wait for to complete */
3137                 rmdata->count++;
3138         }
3139
3140
3141         /* now wait for up to the maximum number of seconds allowed
3142            or until all nodes we expect a response from has replied
3143         */
3144         while (rmdata->count > 0) {
3145                 tevent_loop_once(ctdb->ev);
3146         }
3147
3148         status = rmdata->status;
3149         talloc_free(mem_ctx);
3150         return status;
3151 }
3152
3153
3154 struct verify_recmaster_data {
3155         struct ctdb_recoverd *rec;
3156         uint32_t count;
3157         uint32_t pnn;
3158         enum monitor_result status;
3159 };
3160
3161 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
3162 {
3163         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmaster_data);
3164
3165
3166         /* one more node has responded with recmaster data*/
3167         rmdata->count--;
3168
3169         /* if we failed to get the recmaster, then return an error and let
3170            the main loop try again.
3171         */
3172         if (state->state != CTDB_CONTROL_DONE) {
3173                 if (rmdata->status == MONITOR_OK) {
3174                         rmdata->status = MONITOR_FAILED;
3175                 }
3176                 return;
3177         }
3178
3179         /* if we got a response, then the recmaster will be stored in the
3180            status field
3181         */
3182         if (state->status != rmdata->pnn) {
3183                 DEBUG(DEBUG_ERR,("Node %d thinks node %d is recmaster. Need a new recmaster election\n", state->c->hdr.destnode, state->status));
3184                 ctdb_set_culprit(rmdata->rec, state->c->hdr.destnode);
3185                 rmdata->status = MONITOR_ELECTION_NEEDED;
3186         }
3187
3188         return;
3189 }
3190
3191
3192 /* verify that all nodes agree that we are the recmaster */
3193 static enum monitor_result verify_recmaster(struct ctdb_recoverd *rec, struct ctdb_node_map_old *nodemap, uint32_t pnn)
3194 {
3195         struct ctdb_context *ctdb = rec->ctdb;
3196         struct verify_recmaster_data *rmdata;
3197         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
3198         struct ctdb_client_control_state *state;
3199         enum monitor_result status;
3200         int j;
3201         
3202         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
3203         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
3204         rmdata->rec    = rec;
3205         rmdata->count  = 0;
3206         rmdata->pnn    = pnn;
3207         rmdata->status = MONITOR_OK;
3208
3209         /* loop over all active nodes and send an async getrecmaster call to 
3210            them*/
3211         for (j=0; j<nodemap->num; j++) {
3212                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3213                         continue;
3214                 }
3215                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
3216                                         CONTROL_TIMEOUT(),
3217                                         nodemap->nodes[j].pnn);
3218                 if (state == NULL) {
3219                         /* we failed to send the control, treat this as 
3220                            an error and try again next iteration
3221                         */                      
3222                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
3223                         talloc_free(mem_ctx);
3224                         return MONITOR_FAILED;
3225                 }
3226
3227                 /* set up the callback functions */
3228                 state->async.fn = verify_recmaster_callback;
3229                 state->async.private_data = rmdata;
3230
3231                 /* one more control to wait for to complete */
3232                 rmdata->count++;
3233         }
3234
3235
3236         /* now wait for up to the maximum number of seconds allowed
3237            or until all nodes we expect a response from has replied
3238         */
3239         while (rmdata->count > 0) {
3240                 tevent_loop_once(ctdb->ev);
3241         }
3242
3243         status = rmdata->status;
3244         talloc_free(mem_ctx);
3245         return status;
3246 }
3247
3248 static bool interfaces_have_changed(struct ctdb_context *ctdb,
3249                                     struct ctdb_recoverd *rec)
3250 {
3251         struct ctdb_iface_list_old *ifaces = NULL;
3252         TALLOC_CTX *mem_ctx;
3253         bool ret = false;
3254
3255         mem_ctx = talloc_new(NULL);
3256
3257         /* Read the interfaces from the local node */
3258         if (ctdb_ctrl_get_ifaces(ctdb, CONTROL_TIMEOUT(),
3259                                  CTDB_CURRENT_NODE, mem_ctx, &ifaces) != 0) {
3260                 DEBUG(DEBUG_ERR, ("Unable to get interfaces from local node %u\n", ctdb->pnn));
3261                 /* We could return an error.  However, this will be
3262                  * rare so we'll decide that the interfaces have
3263                  * actually changed, just in case.
3264                  */
3265                 talloc_free(mem_ctx);
3266                 return true;
3267         }
3268
3269         if (!rec->ifaces) {
3270                 /* We haven't been here before so things have changed */
3271                 DEBUG(DEBUG_NOTICE, ("Initial interface fetched\n"));
3272                 ret = true;
3273         } else if (rec->ifaces->num != ifaces->num) {
3274                 /* Number of interfaces has changed */
3275                 DEBUG(DEBUG_NOTICE, ("Interface count changed from %d to %d\n",
3276                                      rec->ifaces->num, ifaces->num));
3277                 ret = true;
3278         } else {
3279                 /* See if interface names or link states have changed */
3280                 int i;
3281                 for (i = 0; i < rec->ifaces->num; i++) {
3282                         struct ctdb_iface * iface = &rec->ifaces->ifaces[i];
3283                         if (strcmp(iface->name, ifaces->ifaces[i].name) != 0) {
3284                                 DEBUG(DEBUG_NOTICE,
3285                                       ("Interface in slot %d changed: %s => %s\n",
3286                                        i, iface->name, ifaces->ifaces[i].name));
3287                                 ret = true;
3288                                 break;
3289                         }
3290                         if (iface->link_state != ifaces->ifaces[i].link_state) {
3291                                 DEBUG(DEBUG_NOTICE,
3292                                       ("Interface %s changed state: %d => %d\n",
3293                                        iface->name, iface->link_state,
3294                                        ifaces->ifaces[i].link_state));
3295                                 ret = true;
3296                                 break;
3297                         }
3298                 }
3299         }
3300
3301         talloc_free(rec->ifaces);
3302         rec->ifaces = talloc_steal(rec, ifaces);
3303
3304         talloc_free(mem_ctx);
3305         return ret;
3306 }
3307
3308 /* called to check that the local allocation of public ip addresses is ok.
3309 */
3310 static int verify_local_ip_allocation(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, uint32_t pnn, struct ctdb_node_map_old *nodemap)
3311 {
3312         TALLOC_CTX *mem_ctx = talloc_new(NULL);
3313         int ret, j;
3314         bool need_takeover_run = false;
3315
3316         if (interfaces_have_changed(ctdb, rec)) {
3317                 DEBUG(DEBUG_NOTICE, ("The interfaces status has changed on "
3318                                      "local node %u - force takeover run\n",
3319                                      pnn));
3320                 need_takeover_run = true;
3321         }
3322
3323         /* verify that we have the ip addresses we should have
3324            and we don't have ones we shouldnt have.
3325            if we find an inconsistency we set recmode to
3326            active on the local node and wait for the recmaster
3327            to do a full blown recovery.
3328            also if the pnn is -1 and we are healthy and can host the ip
3329            we also request a ip reallocation.
3330         */
3331         if (ctdb->tunable.disable_ip_failover == 0) {
3332                 struct ctdb_public_ip_list_old *ips = NULL;
3333
3334                 /* read the *available* IPs from the local node */
3335                 ret = ctdb_ctrl_get_public_ips_flags(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE, &ips);
3336                 if (ret != 0) {
3337                         DEBUG(DEBUG_ERR, ("Unable to get available public IPs from local node %u\n", pnn));
3338                         talloc_free(mem_ctx);
3339                         return -1;
3340                 }
3341
3342                 for (j=0; j<ips->num; j++) {
3343                         if (ips->ips[j].pnn == -1 &&
3344                             nodemap->nodes[pnn].flags == 0) {
3345                                 DEBUG(DEBUG_CRIT,("Public IP '%s' is not assigned and we could serve it\n",
3346                                                   ctdb_addr_to_str(&ips->ips[j].addr)));
3347                                 need_takeover_run = true;
3348                         }
3349                 }
3350
3351                 talloc_free(ips);
3352
3353                 /* read the *known* IPs from the local node */
3354                 ret = ctdb_ctrl_get_public_ips_flags(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, 0, &ips);
3355                 if (ret != 0) {
3356                         DEBUG(DEBUG_ERR, ("Unable to get known public IPs from local node %u\n", pnn));
3357                         talloc_free(mem_ctx);
3358                         return -1;
3359                 }
3360
3361                 for (j=0; j<ips->num; j++) {
3362                         if (ips->ips[j].pnn == pnn) {
3363                                 if (ctdb->do_checkpublicip && !ctdb_sys_have_ip(&ips->ips[j].addr)) {
3364                                         DEBUG(DEBUG_CRIT,("Public IP '%s' is assigned to us but not on an interface\n",
3365                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
3366                                         need_takeover_run = true;
3367                                 }
3368                         } else {
3369                                 if (ctdb->do_checkpublicip &&
3370                                     ctdb_sys_have_ip(&ips->ips[j].addr)) {
3371
3372                                         DEBUG(DEBUG_CRIT,("We are still serving a public IP '%s' that we should not be serving. Removing it\n", 
3373                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
3374
3375                                         if (ctdb_ctrl_release_ip(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ips->ips[j]) != 0) {
3376                                                 DEBUG(DEBUG_ERR,("Failed to release local IP address\n"));
3377                                         }
3378                                 }
3379                         }
3380                 }
3381         }
3382
3383         if (need_takeover_run) {
3384                 struct ctdb_srvid_message rd;
3385                 TDB_DATA data;
3386
3387                 DEBUG(DEBUG_CRIT,("Trigger takeoverrun\n"));
3388
3389                 rd.pnn = ctdb->pnn;
3390                 rd.srvid = 0;
3391                 data.dptr = (uint8_t *)&rd;
3392                 data.dsize = sizeof(rd);
3393
3394                 ret = ctdb_client_send_message(ctdb, rec->recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
3395                 if (ret != 0) {
3396                         DEBUG(DEBUG_ERR,(__location__ " Failed to send ipreallocate to recmaster :%d\n", (int)rec->recmaster));
3397                 }
3398         }
3399         talloc_free(mem_ctx);
3400         return 0;
3401 }
3402
3403
3404 static void async_getnodemap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
3405 {
3406         struct ctdb_node_map_old **remote_nodemaps = callback_data;
3407
3408         if (node_pnn >= ctdb->num_nodes) {
3409                 DEBUG(DEBUG_ERR,(__location__ " pnn from invalid node\n"));
3410                 return;
3411         }
3412
3413         remote_nodemaps[node_pnn] = (struct ctdb_node_map_old *)talloc_steal(remote_nodemaps, outdata.dptr);
3414
3415 }
3416
3417 static int get_remote_nodemaps(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
3418         struct ctdb_node_map_old *nodemap,
3419         struct ctdb_node_map_old **remote_nodemaps)
3420 {
3421         uint32_t *nodes;
3422
3423         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
3424         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_NODEMAP,
3425                                         nodes, 0,
3426                                         CONTROL_TIMEOUT(), false, tdb_null,
3427                                         async_getnodemap_callback,
3428                                         NULL,
3429                                         remote_nodemaps) != 0) {
3430                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull all remote nodemaps\n"));
3431
3432                 return -1;
3433         }
3434
3435         return 0;
3436 }
3437
3438 static int update_recovery_lock_file(struct ctdb_context *ctdb)
3439 {
3440         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3441         const char *reclockfile;
3442
3443         if (ctdb_ctrl_getreclock(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &reclockfile) != 0) {
3444                 DEBUG(DEBUG_ERR,("Failed to read reclock file from daemon\n"));
3445                 talloc_free(tmp_ctx);
3446                 return -1;      
3447         }
3448
3449         if (reclockfile == NULL) {
3450                 if (ctdb->recovery_lock_file != NULL) {
3451                         DEBUG(DEBUG_NOTICE,("Recovery lock file disabled\n"));
3452                         talloc_free(ctdb->recovery_lock_file);
3453                         ctdb->recovery_lock_file = NULL;
3454                         ctdb_recovery_unlock(ctdb);
3455                 }
3456                 talloc_free(tmp_ctx);
3457                 return 0;
3458         }
3459
3460         if (ctdb->recovery_lock_file == NULL) {
3461                 DEBUG(DEBUG_NOTICE,
3462                       ("Recovery lock file enabled (%s)\n", reclockfile));
3463                 ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
3464                 ctdb_recovery_unlock(ctdb);
3465                 talloc_free(tmp_ctx);
3466                 return 0;
3467         }
3468
3469
3470         if (!strcmp(reclockfile, ctdb->recovery_lock_file)) {
3471                 talloc_free(tmp_ctx);
3472                 return 0;
3473         }
3474
3475         DEBUG(DEBUG_NOTICE,
3476               ("Recovery lock file changed (now %s)\n", reclockfile));
3477         talloc_free(ctdb->recovery_lock_file);
3478         ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
3479         ctdb_recovery_unlock(ctdb);
3480
3481         talloc_free(tmp_ctx);
3482         return 0;
3483 }
3484
3485 static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
3486                       TALLOC_CTX *mem_ctx)
3487 {
3488         uint32_t pnn;
3489         struct ctdb_node_map_old *nodemap=NULL;
3490         struct ctdb_node_map_old *recmaster_nodemap=NULL;
3491         struct ctdb_node_map_old **remote_nodemaps=NULL;
3492         struct ctdb_vnn_map *vnnmap=NULL;
3493         struct ctdb_vnn_map *remote_vnnmap=NULL;
3494         uint32_t num_lmasters;
3495         int32_t debug_level;
3496         int i, j, ret;
3497         bool self_ban;
3498
3499
3500         /* verify that the main daemon is still running */
3501         if (ctdb_kill(ctdb, ctdb->ctdbd_pid, 0) != 0) {
3502                 DEBUG(DEBUG_CRIT,("CTDB daemon is no longer available. Shutting down recovery daemon\n"));
3503                 exit(-1);
3504         }
3505
3506         /* ping the local daemon to tell it we are alive */
3507         ctdb_ctrl_recd_ping(ctdb);
3508
3509         if (rec->election_timeout) {
3510                 /* an election is in progress */
3511                 return;
3512         }
3513
3514         /* read the debug level from the parent and update locally */
3515         ret = ctdb_ctrl_get_debuglevel(ctdb, CTDB_CURRENT_NODE, &debug_level);
3516         if (ret !=0) {
3517                 DEBUG(DEBUG_ERR, (__location__ " Failed to read debuglevel from parent\n"));
3518                 return;
3519         }
3520         DEBUGLEVEL = debug_level;
3521
3522         /* get relevant tunables */
3523         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
3524         if (ret != 0) {
3525                 DEBUG(DEBUG_ERR,("Failed to get tunables - retrying\n"));
3526                 return;
3527         }
3528
3529         /* get runstate */
3530         ret = ctdb_ctrl_get_runstate(ctdb, CONTROL_TIMEOUT(),
3531                                      CTDB_CURRENT_NODE, &ctdb->runstate);
3532         if (ret != 0) {
3533                 DEBUG(DEBUG_ERR, ("Failed to get runstate - retrying\n"));
3534                 return;
3535         }
3536
3537         /* get the current recovery lock file from the server */
3538         if (update_recovery_lock_file(ctdb) != 0) {
3539                 DEBUG(DEBUG_ERR,("Failed to update the recovery lock file\n"));
3540                 return;
3541         }
3542
3543         /* Make sure that if recovery lock verification becomes disabled when
3544            we close the file
3545         */
3546         if (ctdb->recovery_lock_file == NULL) {
3547                 ctdb_recovery_unlock(ctdb);
3548         }
3549
3550         pnn = ctdb_get_pnn(ctdb);
3551
3552         /* get the vnnmap */
3553         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
3554         if (ret != 0) {
3555                 DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from node %u\n", pnn));
3556                 return;
3557         }
3558
3559
3560         /* get number of nodes */
3561         if (rec->nodemap) {
3562                 talloc_free(rec->nodemap);
3563                 rec->nodemap = NULL;
3564                 nodemap=NULL;
3565         }
3566         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, rec, &rec->nodemap);
3567         if (ret != 0) {
3568                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", pnn));
3569                 return;
3570         }
3571         nodemap = rec->nodemap;
3572
3573         /* remember our own node flags */
3574         rec->node_flags = nodemap->nodes[pnn].flags;
3575
3576         ban_misbehaving_nodes(rec, &self_ban);
3577         if (self_ban) {
3578                 DEBUG(DEBUG_NOTICE, ("This node was banned, restart main_loop\n"));
3579                 return;
3580         }
3581
3582         /* if the local daemon is STOPPED or BANNED, we verify that the databases are
3583            also frozen and that the recmode is set to active.
3584         */
3585         if (rec->node_flags & (NODE_FLAGS_STOPPED | NODE_FLAGS_BANNED)) {
3586                 /* If this node has become inactive then we want to
3587                  * reduce the chances of it taking over the recovery
3588                  * master role when it becomes active again.  This
3589                  * helps to stabilise the recovery master role so that
3590                  * it stays on the most stable node.
3591                  */
3592                 rec->priority_time = timeval_current();
3593
3594                 ret = ctdb_ctrl_getrecmode(ctdb, mem_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->recovery_mode);
3595                 if (ret != 0) {
3596                         DEBUG(DEBUG_ERR,(__location__ " Failed to read recmode from local node\n"));
3597                 }
3598                 if (ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
3599                         DEBUG(DEBUG_ERR,("Node is stopped or banned but recovery mode is not active. Activate recovery mode and lock databases\n"));
3600
3601                         ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
3602                         if (ret != 0) {
3603                                 DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode in STOPPED or BANNED state\n"));
3604
3605                                 return;
3606                         }
3607                         ret = ctdb_ctrl_freeze(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
3608                         if (ret != 0) {
3609                                 DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node in STOPPED or BANNED state\n"));
3610                                 return;
3611                         }
3612                 }
3613
3614                 /* If this node is stopped or banned then it is not the recovery
3615                  * master, so don't do anything. This prevents stopped or banned
3616                  * node from starting election and sending unnecessary controls.
3617                  */
3618                 return;
3619         }
3620
3621         /* check which node is the recovery master */
3622         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), pnn, &rec->recmaster);
3623         if (ret != 0) {
3624                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from node %u\n", pnn));
3625                 return;
3626         }
3627
3628         /* If we are not the recmaster then do some housekeeping */
3629         if (rec->recmaster != pnn) {
3630                 /* Ignore any IP reallocate requests - only recmaster
3631                  * processes them
3632                  */
3633                 TALLOC_FREE(rec->reallocate_requests);
3634                 /* Clear any nodes that should be force rebalanced in
3635                  * the next takeover run.  If the recovery master role
3636                  * has moved then we don't want to process these some
3637                  * time in the future.
3638                  */
3639                 TALLOC_FREE(rec->force_rebalance_nodes);
3640         }
3641
3642         /* This is a special case.  When recovery daemon is started, recmaster
3643          * is set to -1.  If a node is not started in stopped state, then
3644          * start election to decide recovery master
3645          */
3646         if (rec->recmaster == (uint32_t)-1) {
3647                 DEBUG(DEBUG_NOTICE,(__location__ " Initial recovery master set - forcing election\n"));
3648                 force_election(rec, pnn, nodemap);
3649                 return;
3650         }
3651
3652         /* update the capabilities for all nodes */
3653         ret = update_capabilities(rec, nodemap);
3654         if (ret != 0) {
3655                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
3656                 return;
3657         }
3658
3659         /*
3660          * If the current recmaster does not have CTDB_CAP_RECMASTER,
3661          * but we have, then force an election and try to become the new
3662          * recmaster.
3663          */
3664         if (!ctdb_node_has_capabilities(rec->caps,
3665                                         rec->recmaster,
3666                                         CTDB_CAP_RECMASTER) &&
3667             (rec->ctdb->capabilities & CTDB_CAP_RECMASTER) &&
3668             !(nodemap->nodes[pnn].flags & NODE_FLAGS_INACTIVE)) {
3669                 DEBUG(DEBUG_ERR, (__location__ " Current recmaster node %u does not have CAP_RECMASTER,"
3670                                   " but we (node %u) have - force an election\n",
3671                                   rec->recmaster, pnn));
3672                 force_election(rec, pnn, nodemap);
3673                 return;
3674         }
3675
3676         /* verify that the recmaster node is still active */
3677         for (j=0; j<nodemap->num; j++) {
3678                 if (nodemap->nodes[j].pnn==rec->recmaster) {
3679                         break;
3680                 }
3681         }
3682
3683         if (j == nodemap->num) {
3684                 DEBUG(DEBUG_ERR, ("Recmaster node %u not in list. Force reelection\n", rec->recmaster));
3685                 force_election(rec, pnn, nodemap);
3686                 return;
3687         }
3688
3689         /* if recovery master is disconnected we must elect a new recmaster */
3690         if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
3691                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u is disconnected. Force reelection\n", nodemap->nodes[j].pnn));
3692                 force_election(rec, pnn, nodemap);
3693                 return;
3694         }
3695
3696         /* get nodemap from the recovery master to check if it is inactive */
3697         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3698                                    mem_ctx, &recmaster_nodemap);
3699         if (ret != 0) {
3700                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recovery master %u\n", 
3701                           nodemap->nodes[j].pnn));
3702                 return;
3703         }
3704
3705
3706         if ((recmaster_nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) &&
3707             (rec->node_flags & NODE_FLAGS_INACTIVE) == 0) {
3708                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].pnn));
3709                 /*
3710                  * update our nodemap to carry the recmaster's notion of
3711                  * its own flags, so that we don't keep freezing the
3712                  * inactive recmaster node...
3713                  */
3714                 nodemap->nodes[j].flags = recmaster_nodemap->nodes[j].flags;
3715                 force_election(rec, pnn, nodemap);
3716                 return;
3717         }
3718
3719         /* verify that we have all ip addresses we should have and we dont
3720          * have addresses we shouldnt have.
3721          */ 
3722         if (ctdb->tunable.disable_ip_failover == 0 &&
3723             !ctdb_op_is_disabled(rec->takeover_run)) {
3724                 if (verify_local_ip_allocation(ctdb, rec, pnn, nodemap) != 0) {
3725                         DEBUG(DEBUG_ERR, (__location__ " Public IPs were inconsistent.\n"));
3726                 }
3727         }
3728
3729
3730         /* if we are not the recmaster then we do not need to check
3731            if recovery is needed
3732          */
3733         if (pnn != rec->recmaster) {
3734                 return;
3735         }
3736
3737
3738         /* ensure our local copies of flags are right */
3739         ret = update_local_flags(rec, nodemap);
3740         if (ret == MONITOR_ELECTION_NEEDED) {
3741                 DEBUG(DEBUG_NOTICE,("update_local_flags() called for a re-election.\n"));
3742                 force_election(rec, pnn, nodemap);
3743                 return;
3744         }
3745         if (ret != MONITOR_OK) {
3746                 DEBUG(DEBUG_ERR,("Unable to update local flags\n"));
3747                 return;
3748         }
3749
3750         if (ctdb->num_nodes != nodemap->num) {
3751                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) reloading nodes file\n", ctdb->num_nodes, nodemap->num));
3752                 ctdb_load_nodes_file(ctdb);
3753                 return;
3754         }
3755
3756         /* verify that all active nodes agree that we are the recmaster */
3757         switch (verify_recmaster(rec, nodemap, pnn)) {
3758         case MONITOR_RECOVERY_NEEDED:
3759                 /* can not happen */
3760                 return;
3761         case MONITOR_ELECTION_NEEDED:
3762                 force_election(rec, pnn, nodemap);
3763                 return;
3764         case MONITOR_OK:
3765                 break;
3766         case MONITOR_FAILED:
3767                 return;
3768         }
3769
3770
3771         if (rec->need_recovery) {
3772                 /* a previous recovery didn't finish */
3773                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3774                 return;
3775         }
3776
3777         /* verify that all active nodes are in normal mode 
3778            and not in recovery mode 
3779         */
3780         switch (verify_recmode(ctdb, nodemap)) {
3781         case MONITOR_RECOVERY_NEEDED:
3782                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3783                 return;
3784         case MONITOR_FAILED:
3785                 return;
3786         case MONITOR_ELECTION_NEEDED:
3787                 /* can not happen */
3788         case MONITOR_OK:
3789                 break;
3790         }
3791
3792
3793         if (ctdb->recovery_lock_file != NULL) {
3794                 /* We must already hold the recovery lock */
3795                 if (!ctdb_recovery_have_lock(ctdb)) {
3796                         DEBUG(DEBUG_ERR,("Failed recovery lock sanity check.  Force a recovery\n"));
3797                         ctdb_set_culprit(rec, ctdb->pnn);
3798                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3799                         return;
3800                 }
3801         }
3802
3803
3804         /* if there are takeovers requested, perform it and notify the waiters */
3805         if (!ctdb_op_is_disabled(rec->takeover_run) &&
3806             rec->reallocate_requests) {
3807                 process_ipreallocate_requests(ctdb, rec);
3808         }
3809
3810         /* If recoveries are disabled then there is no use doing any
3811          * nodemap or flags checks.  Recoveries might be disabled due
3812          * to "reloadnodes", so doing these checks might cause an
3813          * unnecessary recovery.  */
3814         if (ctdb_op_is_disabled(rec->recovery)) {
3815                 return;
3816         }
3817
3818         /* get the nodemap for all active remote nodes
3819          */
3820         remote_nodemaps = talloc_array(mem_ctx, struct ctdb_node_map_old *, nodemap->num);
3821         if (remote_nodemaps == NULL) {
3822                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate remote nodemap array\n"));
3823                 return;
3824         }
3825         for(i=0; i<nodemap->num; i++) {
3826                 remote_nodemaps[i] = NULL;
3827         }
3828         if (get_remote_nodemaps(ctdb, mem_ctx, nodemap, remote_nodemaps) != 0) {
3829                 DEBUG(DEBUG_ERR,(__location__ " Failed to read remote nodemaps\n"));
3830                 return;
3831         } 
3832
3833         /* verify that all other nodes have the same nodemap as we have
3834         */
3835         for (j=0; j<nodemap->num; j++) {
3836                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3837                         continue;
3838                 }
3839
3840                 if (remote_nodemaps[j] == NULL) {
3841                         DEBUG(DEBUG_ERR,(__location__ " Did not get a remote nodemap for node %d, restarting monitoring\n", j));
3842                         ctdb_set_culprit(rec, j);
3843
3844                         return;
3845                 }
3846
3847                 /* if the nodes disagree on how many nodes there are
3848                    then this is a good reason to try recovery
3849                  */
3850                 if (remote_nodemaps[j]->num != nodemap->num) {
3851                         DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
3852                                   nodemap->nodes[j].pnn, remote_nodemaps[j]->num, nodemap->num));
3853                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3854                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3855                         return;
3856                 }
3857
3858                 /* if the nodes disagree on which nodes exist and are
3859                    active, then that is also a good reason to do recovery
3860                  */
3861                 for (i=0;i<nodemap->num;i++) {
3862                         if (remote_nodemaps[j]->nodes[i].pnn != nodemap->nodes[i].pnn) {
3863                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
3864                                           nodemap->nodes[j].pnn, i, 
3865                                           remote_nodemaps[j]->nodes[i].pnn, nodemap->nodes[i].pnn));
3866                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3867                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3868                                             vnnmap);
3869                                 return;
3870                         }
3871                 }
3872         }
3873
3874         /*
3875          * Update node flags obtained from each active node. This ensure we have
3876          * up-to-date information for all the nodes.
3877          */
3878         for (j=0; j<nodemap->num; j++) {
3879                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3880                         continue;
3881                 }
3882                 nodemap->nodes[j].flags = remote_nodemaps[j]->nodes[j].flags;
3883         }
3884
3885         for (j=0; j<nodemap->num; j++) {
3886                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3887                         continue;
3888                 }
3889
3890                 /* verify the flags are consistent
3891                 */
3892                 for (i=0; i<nodemap->num; i++) {
3893                         if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3894                                 continue;
3895                         }
3896                         
3897                         if (nodemap->nodes[i].flags != remote_nodemaps[j]->nodes[i].flags) {
3898                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different flags for node %u. It has 0x%02x vs our 0x%02x\n", 
3899                                   nodemap->nodes[j].pnn, 
3900                                   nodemap->nodes[i].pnn, 
3901                                   remote_nodemaps[j]->nodes[i].flags,
3902                                   nodemap->nodes[i].flags));
3903                                 if (i == j) {
3904                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from remote node %d for cluster update of its own flags\n", remote_nodemaps[j]->nodes[i].flags, j));
3905                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, remote_nodemaps[j]->nodes[i].flags);
3906                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3907                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3908                                                     vnnmap);
3909                                         return;
3910                                 } else {
3911                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from local recmaster node for cluster update of node %d flags\n", nodemap->nodes[i].flags, i));
3912                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, nodemap->nodes[i].flags);
3913                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3914                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3915                                                     vnnmap);
3916                                         return;
3917                                 }
3918                         }
3919                 }
3920         }
3921
3922
3923         /* count how many active nodes there are */
3924         num_lmasters  = 0;
3925         for (i=0; i<nodemap->num; i++) {
3926                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
3927                         if (ctdb_node_has_capabilities(rec->caps,
3928                                                        ctdb->nodes[i]->pnn,
3929                                                        CTDB_CAP_LMASTER)) {
3930                                 num_lmasters++;
3931                         }
3932                 }
3933         }
3934
3935
3936         /* There must be the same number of lmasters in the vnn map as
3937          * there are active nodes with the lmaster capability...  or
3938          * do a recovery.
3939          */
3940         if (vnnmap->size != num_lmasters) {
3941                 DEBUG(DEBUG_ERR, (__location__ " The vnnmap count is different from the number of active lmaster nodes: %u vs %u\n",
3942                           vnnmap->size, num_lmasters));
3943                 ctdb_set_culprit(rec, ctdb->pnn);
3944                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3945                 return;
3946         }
3947
3948         /* verify that all active nodes in the nodemap also exist in 
3949            the vnnmap.
3950          */
3951         for (j=0; j<nodemap->num; j++) {
3952                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3953                         continue;
3954                 }
3955                 if (nodemap->nodes[j].pnn == pnn) {
3956                         continue;
3957                 }
3958
3959                 for (i=0; i<vnnmap->size; i++) {
3960                         if (vnnmap->map[i] == nodemap->nodes[j].pnn) {
3961                                 break;
3962                         }
3963                 }
3964                 if (i == vnnmap->size) {
3965                         DEBUG(DEBUG_ERR, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", 
3966                                   nodemap->nodes[j].pnn));
3967                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3968                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3969                         return;
3970                 }
3971         }
3972
3973         
3974         /* verify that all other nodes have the same vnnmap
3975            and are from the same generation
3976          */
3977         for (j=0; j<nodemap->num; j++) {
3978                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3979                         continue;
3980                 }
3981                 if (nodemap->nodes[j].pnn == pnn) {
3982                         continue;
3983                 }
3984
3985                 ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3986                                           mem_ctx, &remote_vnnmap);
3987                 if (ret != 0) {
3988                         DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from remote node %u\n", 
3989                                   nodemap->nodes[j].pnn));
3990                         return;
3991                 }
3992
3993                 /* verify the vnnmap generation is the same */
3994                 if (vnnmap->generation != remote_vnnmap->generation) {
3995                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
3996                                   nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
3997                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3998                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3999                         return;
4000                 }
4001
4002                 /* verify the vnnmap size is the same */
4003                 if (vnnmap->size != remote_vnnmap->size) {
4004                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
4005                                   nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
4006                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
4007                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
4008                         return;
4009                 }
4010
4011                 /* verify the vnnmap is the same */
4012                 for (i=0;i<vnnmap->size;i++) {
4013                         if (remote_vnnmap->map[i] != vnnmap->map[i]) {
4014                                 DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different vnnmap.\n", 
4015                                           nodemap->nodes[j].pnn));
4016                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
4017                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
4018                                             vnnmap);
4019                                 return;
4020                         }
4021                 }
4022         }
4023
4024         /* we might need to change who has what IP assigned */
4025         if (rec->need_takeover_run) {
4026                 uint32_t culprit = (uint32_t)-1;
4027
4028                 rec->need_takeover_run = false;
4029
4030                 /* update the list of public ips that a node can handle for
4031                    all connected nodes
4032                 */
4033                 ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
4034                 if (ret != 0) {
4035                         DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
4036                                          culprit));
4037                         rec->need_takeover_run = true;
4038                         return;
4039                 }
4040
4041                 /* If takeover run fails, then the offending nodes are
4042                  * assigned ban culprit counts. And we re-try takeover.
4043                  * If takeover run fails repeatedly, the node would get
4044                  * banned.
4045                  */
4046                 do_takeover_run(rec, nodemap, true);
4047         }
4048 }
4049
4050 /*
4051   the main monitoring loop
4052  */
4053 static void monitor_cluster(struct ctdb_context *ctdb)
4054 {
4055         struct ctdb_recoverd *rec;
4056
4057         DEBUG(DEBUG_NOTICE,("monitor_cluster starting\n"));
4058
4059         rec = talloc_zero(ctdb, struct ctdb_recoverd);
4060         CTDB_NO_MEMORY_FATAL(ctdb, rec);
4061
4062         rec->ctdb = ctdb;
4063
4064         rec->takeover_run = ctdb_op_init(rec, "takeover runs");
4065         CTDB_NO_MEMORY_FATAL(ctdb, rec->takeover_run);
4066
4067         rec->recovery = ctdb_op_init(rec, "recoveries");
4068         CTDB_NO_MEMORY_FATAL(ctdb, rec->recovery);
4069
4070         rec->priority_time = timeval_current();
4071
4072         /* register a message port for sending memory dumps */
4073         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_MEM_DUMP, mem_dump_handler, rec);
4074
4075         /* register a message port for recovery elections */
4076         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_ELECTION, election_handler, rec);
4077
4078         /* when nodes are disabled/enabled */
4079         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_SET_NODE_FLAGS, monitor_handler, rec);
4080
4081         /* when we are asked to puch out a flag change */
4082         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_PUSH_NODE_FLAGS, push_flags_handler, rec);
4083
4084         /* register a message port for vacuum fetch */
4085         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_VACUUM_FETCH, vacuum_fetch_handler, rec);
4086
4087         /* register a message port for reloadnodes  */
4088         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RELOAD_NODES, reload_nodes_handler, rec);
4089
4090         /* register a message port for performing a takeover run */
4091         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_TAKEOVER_RUN, ip_reallocate_handler, rec);
4092
4093         /* register a message port for disabling the ip check for a short while */
4094         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_DISABLE_IP_CHECK, disable_ip_check_handler, rec);
4095
4096         /* register a message port for updating the recovery daemons node assignment for an ip */
4097         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECD_UPDATE_IP, recd_update_ip_handler, rec);
4098
4099         /* register a message port for forcing a rebalance of a node next
4100            reallocation */
4101         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_REBALANCE_NODE, recd_node_rebalance_handler, rec);
4102
4103         /* Register a message port for disabling takeover runs */
4104         ctdb_client_set_message_handler(ctdb,
4105                                         CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
4106                                         disable_takeover_runs_handler, rec);
4107
4108         /* Register a message port for disabling recoveries */
4109         ctdb_client_set_message_handler(ctdb,
4110                                         CTDB_SRVID_DISABLE_RECOVERIES,
4111                                         disable_recoveries_handler, rec);
4112
4113         /* register a message port for detaching database */
4114         ctdb_client_set_message_handler(ctdb,
4115                                         CTDB_SRVID_DETACH_DATABASE,
4116                                         detach_database_handler, rec);
4117
4118         for (;;) {
4119                 TALLOC_CTX *mem_ctx = talloc_new(ctdb);
4120                 struct timeval start;
4121                 double elapsed;
4122
4123                 if (!mem_ctx) {
4124                         DEBUG(DEBUG_CRIT,(__location__
4125                                           " Failed to create temp context\n"));
4126                         exit(-1);
4127                 }
4128
4129                 start = timeval_current();
4130                 main_loop(ctdb, rec, mem_ctx);
4131                 talloc_free(mem_ctx);
4132
4133                 /* we only check for recovery once every second */
4134                 elapsed = timeval_elapsed(&start);
4135                 if (elapsed < ctdb->tunable.recover_interval) {
4136                         ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval
4137                                           - elapsed);
4138                 }
4139         }
4140 }
4141
4142 /*
4143   event handler for when the main ctdbd dies
4144  */
4145 static void ctdb_recoverd_parent(struct tevent_context *ev,
4146                                  struct tevent_fd *fde,
4147                                  uint16_t flags, void *private_data)
4148 {
4149         DEBUG(DEBUG_ALERT,("recovery daemon parent died - exiting\n"));
4150         _exit(1);
4151 }
4152
4153 /*
4154   called regularly to verify that the recovery daemon is still running
4155  */
4156 static void ctdb_check_recd(struct tevent_context *ev,
4157                             struct tevent_timer *te,
4158                             struct timeval yt, void *p)
4159 {
4160         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
4161
4162         if (ctdb_kill(ctdb, ctdb->recoverd_pid, 0) != 0) {
4163                 DEBUG(DEBUG_ERR,("Recovery daemon (pid:%d) is no longer running. Trying to restart recovery daemon.\n", (int)ctdb->recoverd_pid));
4164
4165                 tevent_add_timer(ctdb->ev, ctdb, timeval_zero(),
4166                                  ctdb_restart_recd, ctdb);
4167
4168                 return;
4169         }
4170
4171         tevent_add_timer(ctdb->ev, ctdb->recd_ctx,
4172                          timeval_current_ofs(30, 0),
4173                          ctdb_check_recd, ctdb);
4174 }
4175
4176 static void recd_sig_child_handler(struct tevent_context *ev,
4177                                    struct tevent_signal *se, int signum,
4178                                    int count, void *dont_care,
4179                                    void *private_data)
4180 {
4181 //      struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
4182         int status;
4183         pid_t pid = -1;
4184
4185         while (pid != 0) {
4186                 pid = waitpid(-1, &status, WNOHANG);
4187                 if (pid == -1) {
4188                         if (errno != ECHILD) {
4189                                 DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%s(%d)\n", strerror(errno),errno));
4190                         }
4191                         return;
4192                 }
4193                 if (pid > 0) {
4194                         DEBUG(DEBUG_DEBUG, ("RECD SIGCHLD from %d\n", (int)pid));
4195                 }
4196         }
4197 }
4198
4199 /*
4200   startup the recovery daemon as a child of the main ctdb daemon
4201  */
4202 int ctdb_start_recoverd(struct ctdb_context *ctdb)
4203 {
4204         int fd[2];
4205         struct tevent_signal *se;
4206         struct tevent_fd *fde;
4207
4208         if (pipe(fd) != 0) {
4209                 return -1;
4210         }
4211
4212         ctdb->recoverd_pid = ctdb_fork(ctdb);
4213         if (ctdb->recoverd_pid == -1) {
4214                 return -1;
4215         }
4216
4217         if (ctdb->recoverd_pid != 0) {
4218                 talloc_free(ctdb->recd_ctx);
4219                 ctdb->recd_ctx = talloc_new(ctdb);
4220                 CTDB_NO_MEMORY(ctdb, ctdb->recd_ctx);
4221
4222                 close(fd[0]);
4223                 tevent_add_timer(ctdb->ev, ctdb->recd_ctx,
4224                                  timeval_current_ofs(30, 0),
4225                                  ctdb_check_recd, ctdb);
4226                 return 0;
4227         }
4228
4229         close(fd[1]);
4230
4231         srandom(getpid() ^ time(NULL));
4232
4233         ctdb_set_process_name("ctdb_recovered");
4234         if (switch_from_server_to_client(ctdb, "recoverd") != 0) {
4235                 DEBUG(DEBUG_CRIT, (__location__ "ERROR: failed to switch recovery daemon into client mode. shutting down.\n"));
4236                 exit(1);
4237         }
4238
4239         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to recovery daemon\n", fd[0]));
4240
4241         fde = tevent_add_fd(ctdb->ev, ctdb, fd[0], TEVENT_FD_READ,
4242                             ctdb_recoverd_parent, &fd[0]);
4243         tevent_fd_set_auto_close(fde);
4244
4245         /* set up a handler to pick up sigchld */
4246         se = tevent_add_signal(ctdb->ev, ctdb, SIGCHLD, 0,
4247                                recd_sig_child_handler, ctdb);
4248         if (se == NULL) {
4249                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD in recovery daemon\n"));
4250                 exit(1);
4251         }
4252
4253         monitor_cluster(ctdb);
4254
4255         DEBUG(DEBUG_ALERT,("ERROR: ctdb_recoverd finished!?\n"));
4256         return -1;
4257 }
4258
4259 /*
4260   shutdown the recovery daemon
4261  */
4262 void ctdb_stop_recoverd(struct ctdb_context *ctdb)
4263 {
4264         if (ctdb->recoverd_pid == 0) {
4265                 return;
4266         }
4267
4268         DEBUG(DEBUG_NOTICE,("Shutting down recovery daemon\n"));
4269         ctdb_kill(ctdb, ctdb->recoverd_pid, SIGTERM);
4270
4271         TALLOC_FREE(ctdb->recd_ctx);
4272         TALLOC_FREE(ctdb->recd_ping_count);
4273 }
4274
4275 static void ctdb_restart_recd(struct tevent_context *ev,
4276                               struct tevent_timer *te,
4277                               struct timeval t, void *private_data)
4278 {
4279         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
4280
4281         DEBUG(DEBUG_ERR,("Restarting recovery daemon\n"));
4282         ctdb_stop_recoverd(ctdb);
4283         ctdb_start_recoverd(ctdb);
4284 }